Command Handler
WARNING
We created this page with the help of a GenAI tool.
We're currently double-checking it to ensure the information is 100% correct and free of hallucinations.
The Command Handler encapsulates the standard Event Sourcing pattern: read state, run business logic, append events.
Overview
Command handling follows a repeatable pattern:
- Aggregate stream - Read events and build current state
- Execute business logic - Run the handler with command and state
- Append events - Store the resulting events with optimistic concurrency
typescript
// The pattern Command Handler automates:
const { state, currentStreamVersion } = await eventStore.aggregateStream(
streamName,
{ evolve, initialState },
);
const events = handle(command, state);
await eventStore.appendToStream(streamName, events, {
expectedStreamVersion: currentStreamVersion,
});

Basic Usage
Creating a Command Handler
ts
import { CommandHandler } from '@event-driven-io/emmett';
import { evolve, initialState } from './shoppingCart';
export const handle = CommandHandler({ evolve, initialState });

Using the Handler
ts
import { getInMemoryEventStore } from '@event-driven-io/emmett';
const eventStore = getInMemoryEventStore();
const command: AddProductItemToShoppingCart = {
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem,
},
};
const { nextExpectedStreamVersion } = await handle(
eventStore,
shoppingCartId,
(state) => addProductItem(command, state),
);

Type Definitions
CommandHandler
typescript
const CommandHandler = <State, StreamEvent extends Event>(
options: CommandHandlerOptions<State, StreamEvent>
) => async <Store extends EventStore>(
store: Store,
id: string,
handle:
| CommandHandlerFunction<State, StreamEvent>
| CommandHandlerFunction<State, StreamEvent>[],
handleOptions?: HandleOptions<Store>
): Promise<CommandHandlerResult<State, StreamEvent, Store>>;

CommandHandlerOptions
typescript
type CommandHandlerOptions<State, StreamEvent extends Event> = {
evolve: (state: State, event: StreamEvent) => State;
initialState: () => State;
mapToStreamId?: (id: string) => string;
retry?: CommandHandlerRetryOptions;
};

| Property | Type | Description |
|---|---|---|
| evolve | (state, event) => state | State evolution function |
| initialState | () => State | Factory for initial state |
| mapToStreamId | (id: string) => string | Maps ID to stream name (default: identity) |
| retry | CommandHandlerRetryOptions | Retry configuration |
CommandHandlerResult
typescript
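type CommandHandlerResult<State, StreamEvent extends Event, Store extends EventStore> = {
newState: State;
newEvents: StreamEvent[];
// The fields below come from the store's append result
// (AppendStreamResultOfEventStore<Store>); they are shown here in simplified form.
nextExpectedStreamVersion: bigint;
createdNewStream: boolean;
};

Stream ID Mapping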
Map business IDs to stream names:
typescript
const handle = CommandHandler({
evolve,
initialState,
mapToStreamId: (id) => `shopping_cart-${id}`,
});
// Called with business ID
await handle(eventStore, 'cart-123', (state) => [...]);
// Internally uses stream: 'shopping_cart-cart-123'

Handler Functions
Handlers receive state and return events:
Single Event
typescript
await handle(eventStore, cartId, (state) => ({
type: 'ProductItemAdded',
data: { productId, quantity, price },
}));

Multiple Events
typescript
await handle(eventStore, cartId, (state) => [
{ type: 'DiscountApplied', data: { code: 'SAVE10' } },
{ type: 'TaxCalculated', data: { amount: 15.5 } },
]);

Async Handler
typescript
await handle(eventStore, cartId, async (state) => {
const price = await lookupPrice(productId);
return {
type: 'ProductItemAdded',
data: { productId, quantity, price },
};
});

Multiple Handlers (Sequential)
Execute multiple handlers in sequence, each seeing the updated state:
typescript
await handle(eventStore, cartId, [
(state) => ({ type: 'ProductItemAdded', data: productData }),
(state) => {
// state now includes the effect of ProductItemAdded
if (state.items.length >= 3) {
return { type: 'BulkDiscountApplied', data: { discount: 10 } };
}
return [];
},
]);

Optimistic Concurrency
Automatic Version Tracking
By default, Command Handler tracks versions automatically:
typescript
// First call: creates stream at version 0
await handle(eventStore, 'cart-123', (state) => firstEvent);
// Second call: expects version 0, appends at version 1
await handle(eventStore, 'cart-123', (state) => secondEvent);
// Concurrent call: fails if version changed
await handle(eventStore, 'cart-123', (state) => thirdEvent);
// Throws ConcurrencyError if stream was modified

Explicit Version
typescript
await handle(eventStore, cartId, (state) => events, {
expectedStreamVersion: 5n,
});

Require New Stream
typescript
await handle(
eventStore,
cartId,
(state) => [{ type: 'CartOpened', data: {} }],
{ expectedStreamVersion: 'no_stream' },
);

Retry Configuration
Retry on Version Conflict
typescript
const handle = CommandHandler({
evolve,
initialState,
retry: { onVersionConflict: true }, // 3 retries with backoff
});
// Or specify retry count
const handle = CommandHandler({
evolve,
initialState,
retry: { onVersionConflict: 5 },
});

Custom Retry Options
typescript
const handle = CommandHandler({
evolve,
initialState,
retry: {
retries: 5,
minTimeout: 100,
factor: 2,
shouldRetryError: (error) => error instanceof ConcurrencyError,
},
});

Per-Call Retry Override
typescript
await handle(eventStore, cartId, (state) => events, {
retry: { onVersionConflict: 10 },
});

No-Op Handling
If the handler returns an empty array, no append occurs:
typescript
await handle(eventStore, cartId, (state) => {
// Already confirmed, do nothing
if (state.status === 'Confirmed') {
return [];
}
return [{ type: 'CartConfirmed', data: {} }];
});

Error Handling
Business Errors
Throw errors for business rule violations:
typescript
await handle(eventStore, cartId, (state) => {
if (state.status !== 'Open') {
throw new IllegalStateError('Cart is not open');
}
if (command.quantity <= 0) {
throw new ValidationError('Quantity must be positive');
}
return [{ type: 'ProductItemAdded', data: command }];
});

Concurrency Errors
typescript
import { ConcurrencyError } from '@event-driven-io/emmett';
try {
await handle(eventStore, cartId, (state) => events);
} catch (error) {
if (error instanceof ConcurrencyError) {
// Stream was modified by another process
console.log(
`Version conflict: expected ${error.expected}, got ${error.current}`,
);
// Retry with fresh state or notify user
}
throw error;
}

Integration with Web Frameworks
Express.js
typescript
import { on, ok } from '@event-driven-io/emmett-expressjs';
router.post(
'/carts/:id/items',
on(async (request) => {
const { id } = request.params;
const { productId, quantity } = request.body;
const result = await handle(eventStore, id, async (state) => ({
type: 'ProductItemAdded',
data: { productId, quantity, price: await getPrice(productId) },
}));
return ok({
status: 'Added',
version: result.nextExpectedStreamVersion.toString(),
});
}),
);

With ETag Concurrency
typescript
router.post(
'/carts/:id/confirm',
on(async (request) => {
const { id } = request.params;
const expectedVersion = getExpectedVersionFromRequest(request);
const result = await handle(
eventStore,
id,
(state) => ({ type: 'CartConfirmed', data: { confirmedAt: new Date() } }),
{ expectedStreamVersion: expectedVersion },
);
return ok({ status: 'Confirmed' });
}),
);

Best Practices
1. Keep Handlers Pure When Possible
typescript
// ✅ Good: Pure handler
await handle(eventStore, cartId, (state) => ({
type: 'ProductItemAdded',
data: { ...command.data, addedAt: command.metadata.now },
}));
// ⚠️ Less ideal: Side effects in handler
await handle(eventStore, cartId, async (state) => {
await externalService.notify(); // Side effect
return [event];
});

2. Validate Before Deciding
typescript
// ✅ Good: Guard clauses first
await handle(eventStore, cartId, (state) => {
if (state.status === 'Confirmed') {
throw new IllegalStateError('Cannot modify confirmed cart');
}
return [event];
});

3. Use Type-Safe Event Unions
typescript
// ✅ Good: Discriminated union of events
type ShoppingCartEvent =
| Event<'ProductItemAdded', {...}>
| Event<'CartConfirmed', {...}>;
const handle = CommandHandler<ShoppingCart, ShoppingCartEvent>({
evolve,
initialState,
});

Type Source
ts
/**
 * Outcome of a single command handler invocation.
 *
 * It is the append result type of the given event store, extended with the
 * events the handler(s) produced and the state after those events were
 * applied.
 */
export type CommandHandlerResult<
  State,
  StreamEvent extends Event,
  Store extends EventStore,
> = AppendStreamResultOfEventStore<Store> & {
  /** State after folding `newEvents` into the aggregated stream state. */
  newState: State;
  /** Events produced by the handler(s); empty when nothing was appended. */
  newEvents: StreamEvent[];
};
/**
 * Configuration shared by every invocation of a handler created with
 * `CommandHandler`.
 *
 * `StreamEvent` is the event type seen by `evolve`; `StoredEvent` is the type
 * used by the optional `schema.versioning` converters and defaults to
 * `StreamEvent`.
 */
export type CommandHandlerOptions<
  State,
  StreamEvent extends Event,
  StoredEvent extends Event = StreamEvent,
> = {
  /** Folds a single event into the state, returning the next state. */
  evolve: (state: State, event: StreamEvent) => State;
  /** Factory for the state used before any event has been applied. */
  initialState: () => State;
  /** Maps the handler's `id` argument to a stream name. */
  mapToStreamId?: (id: string) => string;
  /** Default retry behaviour for invocations of the handler. */
  retry?: CommandHandlerRetryOptions;
  /** Schema options forwarded to the stream read. */
  schema?: {
    versioning?: {
      // NOTE(review): presumably converts stored events to the current shape — confirm.
      upcast?: (event: StoredEvent) => StreamEvent;
      // NOTE(review): presumably the inverse conversion used when storing — confirm.
      downcast?: (event: StreamEvent) => StoredEvent;
    };
  };
} & JSONSerializationOptions;
/**
 * Per-call options accepted by a command handler.
 *
 * Built from the options type of the store's `appendToStream` (its third
 * parameter), combined with either an explicit expected stream version or a
 * retry configuration for this call.
 */
export type HandleOptions<Store extends EventStore> = Parameters<
  Store['appendToStream']
>[2] &
  (
    | {
        /** Stream version the caller expects the stream to be at. */
        expectedStreamVersion?: ExpectedStreamVersion;
      }
    | {
        /** Retry configuration for this call. */
        retry?: CommandHandlerRetryOptions;
      }
  );
/**
 * Business-logic callback: receives the current state and returns the event
 * or events to record, either directly or as a promise.
 */
type CommandHandlerFunction<State, StreamEvent extends Event> = (
  state: State,
) =>
  | StreamEvent
  | StreamEvent[]
  | Promise<StreamEvent | StreamEvent[]>;
/**
 * Creates a command handler for streams whose state is built with
 * `options.evolve`, starting from `options.initialState`.
 *
 * Each invocation of the returned function:
 * 1. maps `id` to a stream name (`options.mapToStreamId`, identity when not
 *    provided) and aggregates that stream into the current state;
 * 2. runs the handler(s) in order, folding each handler's events into the
 *    state before the next handler runs;
 * 3. appends all produced events with a single `appendToStream` call, or
 *    skips the append when no events were produced.
 *
 * The whole attempt runs inside `withSession` and is wrapped in `asyncRetry`.
 * Retry settings come from `handleOptions.retry` when `handleOptions` has a
 * `retry` key, otherwise from `options.retry`.
 *
 * @param options - State evolution, stream id mapping, schema, serialization
 *   and default retry settings.
 * @returns An async function `(store, id, handle, handleOptions?)` resolving
 *   to the append result extended with `newEvents` and `newState`.
 */
export const CommandHandler =
  <
    State,
    StreamEvent extends Event,
    EventPayloadType extends Event = StreamEvent,
  >(
    options: CommandHandlerOptions<State, StreamEvent, EventPayloadType>,
  ) =>
  async <Store extends EventStore>(
    store: Store,
    id: string,
    handle:
      | CommandHandlerFunction<State, StreamEvent>
      | CommandHandlerFunction<State, StreamEvent>[],
    handleOptions?: HandleOptions<Store>,
  ): Promise<CommandHandlerResult<State, StreamEvent, Store>> =>
    asyncRetry(
      async () =>
        withSession<Store, CommandHandlerResult<State, StreamEvent, Store>>(
          store,
          async ({ eventStore }) => {
            const { evolve, initialState } = options;
            const mapToStreamId = options.mapToStreamId ?? ((id) => id);
            const streamName = mapToStreamId(id);

            // 1. Aggregate the stream
            const aggregationResult = await eventStore.aggregateStream<
              State,
              StreamEvent,
              EventPayloadType
            >(streamName, {
              evolve,
              initialState,
              read: {
                schema: options.schema,
                ...(handleOptions as ReadStreamOptions<
                  StreamEvent,
                  EventPayloadType
                >),
                serialization: options.serialization,
                // expected stream version is passed to fail fast
                // if stream is in the wrong state
                expectedStreamVersion:
                  handleOptions?.expectedStreamVersion ?? NO_CONCURRENCY_CHECK,
              },
            });

            // 2. Use the aggregate state
            // `streamExists` is pulled out only so that it is not part of
            // `restOfAggregationResult`, which is spread into the no-op result.
            const {
              currentStreamVersion,
              streamExists: _streamExists,
              ...restOfAggregationResult
            } = aggregationResult;

            let state = aggregationResult.state;

            const handlers = Array.isArray(handle) ? handle : [handle];
            const eventsToAppend: StreamEvent[] = [];

            // 3. Run business logic; each handler sees the state evolved by
            // the events of the handlers that ran before it.
            for (const handler of handlers) {
              const handlerResult = await handler(state);
              const newEvents = Array.isArray(handlerResult)
                ? handlerResult
                : [handlerResult];

              if (newEvents.length > 0) {
                state = newEvents.reduce(evolve, state);
              }

              eventsToAppend.push(...newEvents);
            }

            // Nothing to store: report the current version without appending.
            if (eventsToAppend.length === 0) {
              return {
                ...restOfAggregationResult,
                newEvents: [],
                newState: state,
                nextExpectedStreamVersion: currentStreamVersion,
                createdNewStream: false,
              } as unknown as CommandHandlerResult<State, StreamEvent, Store>;
            }

            // Either use:
            // - provided expected stream version,
            // - current stream version got from stream aggregation,
            // - or expect stream not to exist otherwise.
            const expectedStreamVersion: ExpectedStreamVersion =
              handleOptions?.expectedStreamVersion ??
              (aggregationResult.streamExists
                ? currentStreamVersion
                : STREAM_DOES_NOT_EXIST);

            // 4. Append result to the stream
            const appendResult = await eventStore.appendToStream(
              streamName,
              eventsToAppend,
              {
                ...(handleOptions as AppendToStreamOptions<
                  StreamEvent,
                  EventPayloadType
                >),
                expectedStreamVersion,
              },
            );

            // 5. Return result with updated state
            return {
              ...appendResult,
              newEvents: eventsToAppend,
              newState: state,
            } as unknown as CommandHandlerResult<State, StreamEvent, Store>;
          },
        ),
      // Per-call retry settings take precedence whenever the `retry` key is
      // present on `handleOptions`; otherwise the handler-level default is used.
      fromCommandHandlerRetryOptions(
        handleOptions && 'retry' in handleOptions
          ? handleOptions.retry
          : options.retry,
      ),
    );