Command Handler

Event Sourcing brings a repeatable pattern for handling business logic. We can expand that to application logic.

Command Handling can be described by the following steps:

  1. Read events from the stream and build the state from them (in other words, aggregate the stream). Also get the current version of the stream.
  2. Run the business logic using the command and the state. Use the default (initial) state if the stream does not exist.
  3. Append the result of the business logic (that is, the new events) to the end of the stream you read from. Use the version you read (or one provided by the user) for an optimistic concurrency check.

In pseudo-code, this could look as follows:

ts
const { state, expectedStreamVersion } = await eventStore.aggregateStream(
  streamName,
  {
    evolve,
    initialState,
  },
);

const events = handle(command, state);

await eventStore.appendToStream(streamName, events, { expectedStreamVersion });
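
To make the three steps concrete, here is a minimal, self-contained sketch that runs without any library. Everything in it is an assumption made for illustration: the in-memory `streams` map, the synchronous `aggregateStream` and `appendToStream` helpers, the `CartState` shape and the ten-item limit are hypothetical and are not Emmett's API. A real event store would be asynchronous.

```typescript
// Hypothetical in-memory sketch of the three steps; not Emmett's API.
type ProductItemAdded = {
  type: 'ProductItemAdded';
  data: { productId: string; quantity: number };
};

type CartState = { totalQuantity: number };

const initialState = (): CartState => ({ totalQuantity: 0 });

const evolve = (state: CartState, event: ProductItemAdded): CartState => ({
  totalQuantity: state.totalQuantity + event.data.quantity,
});

// Streams are plain arrays; the stream version is the number of events.
const streams = new Map<string, ProductItemAdded[]>();

const aggregateStream = (streamName: string) => {
  const events = streams.get(streamName) ?? [];
  return {
    state: events.reduce(evolve, initialState()),
    currentStreamVersion: events.length,
  };
};

const appendToStream = (
  streamName: string,
  events: ProductItemAdded[],
  expectedStreamVersion: number,
): number => {
  const existing = streams.get(streamName) ?? [];
  // Optimistic concurrency check: reject if the stream moved since the read.
  if (existing.length !== expectedStreamVersion) {
    throw new Error(
      `Expected version ${expectedStreamVersion}, but stream is at ${existing.length}`,
    );
  }
  streams.set(streamName, [...existing, ...events]);
  return existing.length + events.length;
};

// Business logic: a pure function from command data and state to events.
// The ten-item limit is an invented rule, used only for illustration.
const addProductItem = (
  command: { productId: string; quantity: number },
  state: CartState,
): ProductItemAdded[] => {
  if (state.totalQuantity + command.quantity > 10) {
    throw new Error('Cart cannot hold more than 10 items');
  }
  return [{ type: 'ProductItemAdded', data: command }];
};

const handleAddProductItem = (
  streamName: string,
  command: { productId: string; quantity: number },
): number => {
  // 1. Read events and build the state.
  const { state, currentStreamVersion } = aggregateStream(streamName);
  // 2. Run the business logic.
  const events = addProductItem(command, state);
  // 3. Append, guarded by the version read in step 1.
  return appendToStream(streamName, events, currentStreamVersion);
};
```

Calling `handleAddProductItem` twice on the same stream returns versions 1 and 2 in this sketch, while an append with a stale expected version throws.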

That looks quite simple, but generalising it and making it robust requires some experience. That's why you have Emmett: the intention is to shorten the learning curve and provide the basic abstractions for you.

Usage

You can use the CommandHandler function to set up a command handler:

ts
import { CommandHandler } from '@event-driven-io/emmett';
import { evolve, initialState } from './shoppingCart';

export const handle = CommandHandler({ evolve, initialState });

Define one such handler per stream type (e.g., one for Shopping Cart, another for Orders). You can then use it in the application code as follows:

ts
import { getInMemoryEventStore } from '@event-driven-io/emmett';

const eventStore = getInMemoryEventStore();

const command: AddProductItemToShoppingCart = {
  type: 'AddProductItemToShoppingCart',
  data: {
    shoppingCartId,
    productItem,
  },
};

const { nextExpectedStreamVersion } = await handle(
  eventStore,
  shoppingCartId,
  (state) => addProductItem(command, state),
);
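
The returned nextExpectedStreamVersion can be passed back on a later call, so that a caller holding a stale version is rejected. The sketch below illustrates that idea and the fallback order for the expected version (provided by the caller, else the version read from the stream, else "stream must not exist"). It is a simplified, synchronous stand-in written under assumptions: the `streams` map, `append`, `resolveExpectedVersion` and this `handle` are hypothetical helpers and not Emmett's implementation.

```typescript
// Hypothetical sketch; names are illustrative and not Emmett's API.
const STREAM_DOES_NOT_EXIST = 'STREAM_DOES_NOT_EXIST' as const;
type ExpectedVersion = number | typeof STREAM_DOES_NOT_EXIST;

const streams = new Map<string, string[]>();

const append = (
  streamName: string,
  events: string[],
  expected: ExpectedVersion,
): number => {
  const existing = streams.get(streamName);
  const matches =
    expected === STREAM_DOES_NOT_EXIST
      ? existing === undefined
      : existing !== undefined && existing.length === expected;
  if (!matches) throw new Error(`Version conflict on ${streamName}`);
  const updated = [...(existing ?? []), ...events];
  streams.set(streamName, updated);
  return updated.length;
};

// Fallback order: caller-provided version first, then the version read
// from the stream, then "the stream must not exist yet".
const resolveExpectedVersion = (
  provided: ExpectedVersion | undefined,
  read: { streamExists: boolean; currentStreamVersion: number },
): ExpectedVersion =>
  provided ??
  (read.streamExists ? read.currentStreamVersion : STREAM_DOES_NOT_EXIST);

const handle = (
  streamName: string,
  decide: (events: string[]) => string[],
  options?: { expectedStreamVersion?: ExpectedVersion },
): { nextExpectedStreamVersion: number } => {
  const existing = streams.get(streamName);
  const read = {
    streamExists: existing !== undefined,
    currentStreamVersion: existing?.length ?? 0,
  };
  const newEvents = decide(existing ?? []);
  const expected = resolveExpectedVersion(options?.expectedStreamVersion, read);
  return { nextExpectedStreamVersion: append(streamName, newEvents, expected) };
};

// The first call creates the stream; the second passes the version back.
const first = handle('cart-1', () => ['ProductItemAdded']);
const second = handle('cart-1', () => ['ProductItemAdded'], {
  expectedStreamVersion: first.nextExpectedStreamVersion,
});
```

In this sketch a third call that still passes `first.nextExpectedStreamVersion` throws, because the stream has already moved on to the version returned by the second call.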

Definition

ts
export type CommandHandlerResult<
  State,
  StreamEvent extends Event,
  Store extends EventStore,
> = AppendStreamResultOfEventStore<Store> & {
  newState: State;
  newEvents: StreamEvent[];
};

export type CommandHandlerOptions<State, StreamEvent extends Event> = {
  evolve: (state: State, event: StreamEvent) => State;
  initialState: () => State;
  mapToStreamId?: (id: string) => string;
  retry?: CommandHandlerRetryOptions;
};

export type HandleOptions<Store extends EventStore> = Parameters<
  Store['appendToStream']
>[2] &
  (
    | {
        expectedStreamVersion?: ExpectedStreamVersion<
          StreamPositionTypeOfEventStore<Store>
        >;
      }
    | {
        retry?: CommandHandlerRetryOptions;
      }
  );

type CommandHandlerFunction<State, StreamEvent extends Event> = (
  state: State,
) => StreamEvent | StreamEvent[] | Promise<StreamEvent | StreamEvent[]>;

export const CommandHandler =
  <State, StreamEvent extends Event>(
    options: CommandHandlerOptions<State, StreamEvent>,
  ) =>
  async <Store extends EventStore>(
    store: Store,
    id: string,
    handle:
      | CommandHandlerFunction<State, StreamEvent>
      | CommandHandlerFunction<State, StreamEvent>[],
    handleOptions?: HandleOptions<Store>,
  ): Promise<CommandHandlerResult<State, StreamEvent, Store>> =>
    asyncRetry(
      async () => {
        const result = await withSession<
          Store,
          CommandHandlerResult<State, StreamEvent, Store>
        >(store, async ({ eventStore }) => {
          const { evolve, initialState } = options;
          const mapToStreamId = options.mapToStreamId ?? ((id) => id);

          const streamName = mapToStreamId(id);

          // 1. Aggregate the stream
          const aggregationResult = await eventStore.aggregateStream<
            State,
            StreamEvent
          >(streamName, {
            evolve,
            initialState,
            read: {
              // expected stream version is passed to fail fast
              // if stream is in the wrong state
              expectedStreamVersion:
                handleOptions?.expectedStreamVersion ?? NO_CONCURRENCY_CHECK,
            },
          });

          // 2. Use the aggregate state

          const {
            // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
            currentStreamVersion,
            streamExists: _streamExists,
            ...restOfAggregationResult
          } = aggregationResult;

          let state = aggregationResult.state;

          const handlers = Array.isArray(handle) ? handle : [handle];
          let eventsToAppend: StreamEvent[] = [];

          // 3. Run business logic
          for (const handler of handlers) {
            const result = await handler(state);

            const newEvents = Array.isArray(result) ? result : [result];

            if (newEvents.length > 0) {
              state = newEvents.reduce(evolve, state);
            }

            eventsToAppend = [...eventsToAppend, ...newEvents];
          }

          if (eventsToAppend.length === 0) {
            return {
              ...restOfAggregationResult,
              newEvents: [],
              newState: state,
              // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
              nextExpectedStreamVersion: currentStreamVersion,
              createdNewStream: false,
            } as unknown as CommandHandlerResult<State, StreamEvent, Store>;
          }

          // Either use:
          // - the provided expected stream version,
          // - the current stream version obtained from stream aggregation,
          // - or, otherwise, expect the stream not to exist.
          // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
          const expectedStreamVersion: ExpectedStreamVersion<
            StreamPositionTypeOfEventStore<Store>
          > =
            handleOptions?.expectedStreamVersion ??
            (aggregationResult.streamExists
              ? (currentStreamVersion as ExpectedStreamVersion<
                  StreamPositionTypeOfEventStore<Store>
                >)
              : STREAM_DOES_NOT_EXIST);

          // 4. Append result to the stream
          const appendResult = await eventStore.appendToStream(
            streamName,
            eventsToAppend,
            {
              ...handleOptions,
              expectedStreamVersion,
            },
          );

          // 5. Return result with updated state
          return {
            ...appendResult,
            newEvents: eventsToAppend,
            newState: state,
          } as unknown as CommandHandlerResult<State, StreamEvent, Store>;
        });

        return result;
      },
      fromCommandHandlerRetryOptions(
        handleOptions && 'retry' in handleOptions
          ? handleOptions.retry
          : options.retry,
      ),
    );
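
One detail of the definition above is easy to miss: handle may be an array of functions, and each function sees the state already evolved with the events produced by the previous ones. The sketch below isolates just that loop. It is a simplified, synchronous illustration under assumptions: `CounterEvent`, `CounterState`, `runHandlers` and the sample handlers are hypothetical names, not part of Emmett.

```typescript
// Hypothetical sketch of the handler-chaining loop; not Emmett's API.
type CounterEvent = { type: 'Incremented'; data: { by: number } };
type CounterState = { value: number };

const evolve = (state: CounterState, event: CounterEvent): CounterState => ({
  value: state.value + event.data.by,
});

type Decide = (state: CounterState) => CounterEvent | CounterEvent[];

const runHandlers = (
  initial: CounterState,
  handle: Decide | Decide[],
): { newState: CounterState; newEvents: CounterEvent[] } => {
  const handlers = Array.isArray(handle) ? handle : [handle];
  let state = initial;
  const newEvents: CounterEvent[] = [];

  for (const handler of handlers) {
    const result = handler(state);
    const events = Array.isArray(result) ? result : [result];
    // Evolve the state before the next handler runs, so it sees these events.
    state = events.reduce(evolve, state);
    newEvents.push(...events);
  }

  return { newState: state, newEvents };
};

const incrementBy = (by: number): Decide => () => ({
  type: 'Incremented',
  data: { by },
});

// Reads the state left behind by the previous handler.
const doubleIt: Decide = (state) => ({
  type: 'Incremented',
  data: { by: state.value },
});

// Returning an empty array produces no events and leaves the state as is.
const noop: Decide = () => [];

const { newState, newEvents } = runHandlers({ value: 1 }, [
  incrementBy(2),
  doubleIt,
  noop,
]);
// newState.value is 6: 1 + 2 = 3, then doubled by adding 3.
```

Here `newEvents` holds the two events in the order they were produced, which matches how the definition collects eventsToAppend before a single append.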