Command Handler
Event Sourcing brings a repeatable pattern for handling business logic. We can expand that to application logic.
Command Handling can be described by the following steps:
- Read events from the stream and build the state from them (in other words aggregate stream). Get also the current version of the stream.
- Run the business logic using the command and the state. Use the default (initial) state if the stream does not exist.
- Append the result of the business logic (so events) at the end of the stream from which you've read events. Use the read version (or the one provided by the user) for an optimistic concurrency check.
In pseudo-code, this could look as follows:
ts
const { state, expectedStreamVersion } = await eventStore.aggregateStream(
streamName,
{
evolve,
initialState,
},
);
const events = handle(command, state);
await eventStore.appendToStream(streamName, result, { expectedStreamVersion });
That looks quite simple, but generalising it and making it robust requires some experience. But that's why you have Emmett, the intention is to cut the learning curve for you and help you with basic abstractions.
Usage
You can use the CommandHandler
method to set up a command handler for you:
ts
import { CommandHandler } from '@event-driven-io/emmett';
import { evolve, initialState } from './shoppingCart';
export const handle = CommandHandler({ evolve, initialState });
Such handlers should be defined per stream type (e.g., one for Shopping Cart, the other for Orders, etc.). It can be used later in the application code as:
ts
import { getInMemoryEventStore } from '@event-driven-io/emmett';
const eventStore = getInMemoryEventStore();
const command: AddProductItemToShoppingCart = {
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem,
},
};
const { nextExpectedStreamVersion } = await handle(
eventStore,
shoppingCartId,
(state) => addProductItem(command, state),
);
Definition
ts
export type CommandHandlerResult<
State,
StreamEvent extends Event,
Store extends EventStore,
> = AppendStreamResultOfEventStore<Store> & {
newState: State;
newEvents: StreamEvent[];
};
export type CommandHandlerOptions<State, StreamEvent extends Event> = {
evolve: (state: State, event: StreamEvent) => State;
initialState: () => State;
mapToStreamId?: (id: string) => string;
retry?: CommandHandlerRetryOptions;
};
export type HandleOptions<Store extends EventStore> = Parameters<
Store['appendToStream']
>[2] &
(
| {
expectedStreamVersion?: ExpectedStreamVersion<
StreamPositionTypeOfEventStore<Store>
>;
}
| {
retry?: CommandHandlerRetryOptions;
}
);
type CommandHandlerFunction<State, StreamEvent extends Event> = (
state: State,
) => StreamEvent | StreamEvent[] | Promise<StreamEvent | StreamEvent[]>;
export const CommandHandler =
<State, StreamEvent extends Event>(
options: CommandHandlerOptions<State, StreamEvent>,
) =>
async <Store extends EventStore>(
store: Store,
id: string,
handle:
| CommandHandlerFunction<State, StreamEvent>
| CommandHandlerFunction<State, StreamEvent>[],
handleOptions?: HandleOptions<Store>,
): Promise<CommandHandlerResult<State, StreamEvent, Store>> =>
asyncRetry(
async () => {
const result = await withSession<
Store,
CommandHandlerResult<
State,
StreamEvent,
StreamPositionTypeOfEventStore<Store>
>
>(store, async ({ eventStore }) => {
const { evolve, initialState } = options;
const mapToStreamId = options.mapToStreamId ?? ((id) => id);
const streamName = mapToStreamId(id);
// 1. Aggregate the stream
const aggregationResult = await eventStore.aggregateStream<
State,
StreamEvent
>(streamName, {
evolve,
initialState,
read: {
// expected stream version is passed to fail fast
// if stream is in the wrong state
expectedStreamVersion:
handleOptions?.expectedStreamVersion ?? NO_CONCURRENCY_CHECK,
},
});
// 2. Use the aggregate state
const {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
currentStreamVersion,
streamExists: _streamExists,
...restOfAggregationResult
} = aggregationResult;
let state = aggregationResult.state;
const handlers = Array.isArray(handle) ? handle : [handle];
let eventsToAppend: StreamEvent[] = [];
// 3. Run business logic
for (const handler of handlers) {
const result = await handler(state);
const newEvents = Array.isArray(result) ? result : [result];
if (newEvents.length > 0) {
state = newEvents.reduce(evolve, state);
}
eventsToAppend = [...eventsToAppend, ...newEvents];
}
//const newEvents = Array.isArray(result) ? result : [result];
if (eventsToAppend.length === 0) {
return {
...restOfAggregationResult,
newEvents: [],
newState: state,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
nextExpectedStreamVersion: currentStreamVersion,
createdNewStream: false,
} as unknown as CommandHandlerResult<State, StreamEvent, Store>;
}
// Either use:
// - provided expected stream version,
// - current stream version got from stream aggregation,
// - or expect stream not to exists otherwise.
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const expectedStreamVersion: ExpectedStreamVersion<
StreamPositionTypeOfEventStore<Store>
> =
handleOptions?.expectedStreamVersion ??
(aggregationResult.streamExists
? (currentStreamVersion as ExpectedStreamVersion<
StreamPositionTypeOfEventStore<Store>
>)
: STREAM_DOES_NOT_EXIST);
// 4. Append result to the stream
const appendResult = await eventStore.appendToStream(
streamName,
eventsToAppend,
{
...handleOptions,
expectedStreamVersion,
},
);
// 5. Return result with updated state
return {
...appendResult,
newEvents: eventsToAppend,
newState: state,
} as unknown as CommandHandlerResult<State, StreamEvent, Store>;
});
return result;
},
fromCommandHandlerRetryOptions(
handleOptions && 'retry' in handleOptions
? handleOptions.retry
: options.retry,
),
);