Skip to content

Command Handler

WARNING

We created this page with the help of the GenAI tool.

We're currently double-checking it to ensure the information is 100% correct and free of hallucinations.

The Command Handler encapsulates the standard Event Sourcing pattern: read state, run business logic, append events.

Overview

Command handling follows a repeatable pattern:

  1. Aggregate stream - Read events and build current state
  2. Execute business logic - Run the handler with command and state
  3. Append events - Store the resulting events with optimistic concurrency
typescript
// The pattern Command Handler automates:
const { state, currentStreamVersion } = await eventStore.aggregateStream(
  streamName,
  { evolve, initialState },
);

const events = handle(command, state);

await eventStore.appendToStream(streamName, events, {
  expectedStreamVersion: currentStreamVersion,
});

Basic Usage

Creating a Command Handler

ts
import { CommandHandler } from '@event-driven-io/emmett';
import { evolve, initialState } from './shoppingCart';

export const handle = CommandHandler({ evolve, initialState });

Using the Handler

ts

import { getInMemoryEventStore } from '@event-driven-io/emmett';

const eventStore = getInMemoryEventStore();

const command: AddProductItemToShoppingCart = {
  type: 'AddProductItemToShoppingCart',
  data: {
    shoppingCartId,
    productItem,
  },
};

const { nextExpectedStreamVersion } = await handle(
  eventStore,
  shoppingCartId,
  (state) => addProductItem(command, state),
);

Type Definitions

CommandHandler

typescript
const CommandHandler = <State, StreamEvent extends Event>(
  options: CommandHandlerOptions<State, StreamEvent>
) => async <Store extends EventStore>(
  store: Store,
  id: string,
  handle: CommandHandlerFunction<State, StreamEvent>,
  handleOptions?: HandleOptions<Store>
): Promise<CommandHandlerResult<State, StreamEvent, Store>>;

CommandHandlerOptions

typescript
type CommandHandlerOptions<State, StreamEvent extends Event> = {
  evolve: (state: State, event: StreamEvent) => State;
  initialState: () => State;
  mapToStreamId?: (id: string) => string;
  retry?: CommandHandlerRetryOptions;
};
PropertyTypeDescription
evolve(state, event) => stateState evolution function
initialState() => StateFactory for initial state
mapToStreamId(id: string) => stringMaps ID to stream name (default: identity)
retryCommandHandlerRetryOptionsRetry configuration

CommandHandlerResult

typescript
type CommandHandlerResult<State, StreamEvent, Store> = {
  newState: State;
  newEvents: StreamEvent[];
  nextExpectedStreamVersion: bigint;
  createdNewStream: boolean;
};

Stream ID Mapping

Map business IDs to stream names:

typescript
const handle = CommandHandler({
  evolve,
  initialState,
  mapToStreamId: (id) => `shopping_cart-${id}`,
});

// Called with business ID
await handle(eventStore, 'cart-123', (state) => [...]);
// Internally uses stream: 'shopping_cart-cart-123'

Handler Functions

Handlers receive state and return events:

Single Event

typescript
await handle(eventStore, cartId, (state) => ({
  type: 'ProductItemAdded',
  data: { productId, quantity, price },
}));

Multiple Events

typescript
await handle(eventStore, cartId, (state) => [
  { type: 'DiscountApplied', data: { code: 'SAVE10' } },
  { type: 'TaxCalculated', data: { amount: 15.5 } },
]);

Async Handler

typescript
await handle(eventStore, cartId, async (state) => {
  const price = await lookupPrice(productId);
  return {
    type: 'ProductItemAdded',
    data: { productId, quantity, price },
  };
});

Multiple Handlers (Sequential)

Execute multiple handlers in sequence, each seeing the updated state:

typescript
await handle(eventStore, cartId, [
  (state) => ({ type: 'ProductItemAdded', data: productData }),
  (state) => {
    // state now includes the effect of ProductItemAdded
    if (state.items.length >= 3) {
      return { type: 'BulkDiscountApplied', data: { discount: 10 } };
    }
    return [];
  },
]);

Optimistic Concurrency

Automatic Version Tracking

By default, Command Handler tracks versions automatically:

typescript
// First call: creates stream at version 0
await handle(eventStore, 'cart-123', (state) => firstEvent);

// Second call: expects version 0, appends at version 1
await handle(eventStore, 'cart-123', (state) => secondEvent);

// Concurrent call: fails if version changed
await handle(eventStore, 'cart-123', (state) => thirdEvent);
// Throws ConcurrencyError if stream was modified

Explicit Version

typescript
await handle(eventStore, cartId, (state) => events, {
  expectedStreamVersion: 5n,
});

Require New Stream

typescript
await handle(
  eventStore,
  cartId,
  (state) => [{ type: 'CartOpened', data: {} }],
  { expectedStreamVersion: 'no_stream' },
);

Retry Configuration

Retry on Version Conflict

typescript
const handle = CommandHandler({
  evolve,
  initialState,
  retry: { onVersionConflict: true }, // 3 retries with backoff
});

// Or specify retry count
const handle = CommandHandler({
  evolve,
  initialState,
  retry: { onVersionConflict: 5 },
});

Custom Retry Options

typescript
const handle = CommandHandler({
  evolve,
  initialState,
  retry: {
    retries: 5,
    minTimeout: 100,
    factor: 2,
    shouldRetryError: (error) => error instanceof ConcurrencyError,
  },
});

Per-Call Retry Override

typescript
await handle(eventStore, cartId, (state) => events, {
  retry: { onVersionConflict: 10 },
});

No-Op Handling

If handler returns empty array, no append occurs:

typescript
await handle(eventStore, cartId, (state) => {
  // Already confirmed, do nothing
  if (state.status === 'Confirmed') {
    return [];
  }
  return [{ type: 'CartConfirmed', data: {} }];
});

Error Handling

Business Errors

Throw errors for business rule violations:

typescript
await handle(eventStore, cartId, (state) => {
  if (state.status !== 'Open') {
    throw new IllegalStateError('Cart is not open');
  }

  if (command.quantity <= 0) {
    throw new ValidationError('Quantity must be positive');
  }

  return [{ type: 'ProductItemAdded', data: command }];
});

Concurrency Errors

typescript
import { ConcurrencyError } from '@event-driven-io/emmett';

try {
  await handle(eventStore, cartId, (state) => events);
} catch (error) {
  if (error instanceof ConcurrencyError) {
    // Stream was modified by another process
    console.log(
      `Version conflict: expected ${error.expected}, got ${error.actual}`,
    );
    // Retry with fresh state or notify user
  }
  throw error;
}

Integration with Web Frameworks

Express.js

typescript
import { on, ok } from '@event-driven-io/emmett-expressjs';

router.post(
  '/carts/:id/items',
  on(async (request) => {
    const { id } = request.params;
    const { productId, quantity } = request.body;

    const result = await handle(eventStore, id, (state) => ({
      type: 'ProductItemAdded',
      data: { productId, quantity, price: await getPrice(productId) },
    }));

    return ok({
      status: 'Added',
      version: result.nextExpectedStreamVersion.toString(),
    });
  }),
);

With ETag Concurrency

typescript
router.post(
  '/carts/:id/confirm',
  on(async (request) => {
    const { id } = request.params;
    const expectedVersion = getExpectedVersionFromRequest(request);

    const result = await handle(
      eventStore,
      id,
      (state) => ({ type: 'CartConfirmed', data: { confirmedAt: new Date() } }),
      { expectedStreamVersion: expectedVersion },
    );

    return ok({ status: 'Confirmed' });
  }),
);

Best Practices

1. Keep Handlers Pure When Possible

typescript
// ✅ Good: Pure handler
await handle(eventStore, cartId, (state) => ({
  type: 'ProductItemAdded',
  data: { ...command.data, addedAt: command.metadata.now },
}));

// ⚠️ Less ideal: Side effects in handler
await handle(eventStore, cartId, async (state) => {
  await externalService.notify(); // Side effect
  return [event];
});

2. Validate Before Deciding

typescript
// ✅ Good: Guard clauses first
await handle(eventStore, cartId, (state) => {
  if (state.status === 'Confirmed') {
    throw new IllegalStateError('Cannot modify confirmed cart');
  }
  return [event];
});

3. Use Type-Safe Event Unions

typescript
// ✅ Good: Discriminated union of events
type ShoppingCartEvent =
  | Event<'ProductItemAdded', {...}>
  | Event<'CartConfirmed', {...}>;

const handle = CommandHandler<ShoppingCart, ShoppingCartEvent>({
  evolve,
  initialState,
});

Type Source

ts
export type CommandHandlerResult<
  State,
  StreamEvent extends Event,
  Store extends EventStore,
> = AppendStreamResultOfEventStore<Store> & {
  newState: State;
  newEvents: StreamEvent[];
};

export type CommandHandlerOptions<
  State,
  StreamEvent extends Event,
  StoredEvent extends Event = StreamEvent,
> = {
  evolve: (state: State, event: StreamEvent) => State;
  initialState: () => State;
  mapToStreamId?: (id: string) => string;
  retry?: CommandHandlerRetryOptions;
  schema?: {
    versioning?: {
      upcast?: (event: StoredEvent) => StreamEvent;
      downcast?: (event: StreamEvent) => StoredEvent;
    };
  };
} & JSONSerializationOptions;

export type HandleOptions<Store extends EventStore> = Parameters<
  Store['appendToStream']
>[2] &
  (
    | {
        expectedStreamVersion?: ExpectedStreamVersion;
      }
    | {
        retry?: CommandHandlerRetryOptions;
      }
  );

type CommandHandlerFunction<State, StreamEvent extends Event> = (
  state: State,
) => StreamEvent | StreamEvent[] | Promise<StreamEvent | StreamEvent[]>;

export const CommandHandler =
  <
    State,
    StreamEvent extends Event,
    EventPayloadType extends Event = StreamEvent,
  >(
    options: CommandHandlerOptions<State, StreamEvent, EventPayloadType>,
  ) =>
  async <Store extends EventStore>(
    store: Store,
    id: string,
    handle:
      | CommandHandlerFunction<State, StreamEvent>
      | CommandHandlerFunction<State, StreamEvent>[],
    handleOptions?: HandleOptions<Store>,
  ): Promise<CommandHandlerResult<State, StreamEvent, Store>> =>
    asyncRetry(
      async () => {
        const result = await withSession<
          Store,
          CommandHandlerResult<State, StreamEvent, Store>
        >(store, async ({ eventStore }) => {
          const { evolve, initialState } = options;
          const mapToStreamId = options.mapToStreamId ?? ((id) => id);

          const streamName = mapToStreamId(id);

          // 1. Aggregate the stream
          const aggregationResult = await eventStore.aggregateStream<
            State,
            StreamEvent,
            EventPayloadType
          >(streamName, {
            evolve,
            initialState,
            read: {
              schema: options.schema,
              ...(handleOptions as ReadStreamOptions<
                StreamEvent,
                EventPayloadType
              >),
              serialization: options.serialization,
              // expected stream version is passed to fail fast
              // if stream is in the wrong state
              expectedStreamVersion:
                handleOptions?.expectedStreamVersion ?? NO_CONCURRENCY_CHECK,
            },
          });

          // 2. Use the aggregate state

          const {
            currentStreamVersion,
            streamExists: _streamExists,
            ...restOfAggregationResult
          } = aggregationResult;

          let state = aggregationResult.state;

          const handlers = Array.isArray(handle) ? handle : [handle];
          let eventsToAppend: StreamEvent[] = [];

          // 3. Run business logic
          for (const handler of handlers) {
            const result = await handler(state);

            const newEvents = Array.isArray(result) ? result : [result];

            if (newEvents.length > 0) {
              state = newEvents.reduce(evolve, state);
            }

            eventsToAppend = [...eventsToAppend, ...newEvents];
          }

          //const newEvents = Array.isArray(result) ? result : [result];

          if (eventsToAppend.length === 0) {
            return {
              ...restOfAggregationResult,
              newEvents: [],
              newState: state,

              nextExpectedStreamVersion: currentStreamVersion,
              createdNewStream: false,
            } as unknown as CommandHandlerResult<State, StreamEvent, Store>;
          }

          // Either use:
          // - provided expected stream version,
          // - current stream version got from stream aggregation,
          // - or expect stream not to exists otherwise.

          const expectedStreamVersion: ExpectedStreamVersion =
            handleOptions?.expectedStreamVersion ??
            (aggregationResult.streamExists
              ? currentStreamVersion
              : STREAM_DOES_NOT_EXIST);

          // 4. Append result to the stream
          const appendResult = await eventStore.appendToStream(
            streamName,
            eventsToAppend,
            {
              ...(handleOptions as AppendToStreamOptions<
                StreamEvent,
                EventPayloadType
              >),
              expectedStreamVersion,
            },
          );

          // 5. Return result with updated state
          return {
            ...appendResult,
            newEvents: eventsToAppend,
            newState: state,
          } as unknown as CommandHandlerResult<State, StreamEvent, Store>;
        });

        return result;
      },
      fromCommandHandlerRetryOptions(
        handleOptions && 'retry' in handleOptions
          ? handleOptions.retry
          : options.retry,
      ),
    );

See Also