Getting Started
Event Sourcing
Event Sourcing is architecting for tomorrow's questions. Which is essential as today's decisions are tomorrow's context. We keep all the facts that happened in our system. Facts are stored as events that can be used to make decisions, fine-tune read models, integrate our systems, and enhance our analytics and tracking. All in one package, wash and go!
This simple pattern allows easier integration, building applications from smaller building blocks, keeping the cognitive load and coupling on a leash.
Yet, some say that's complex and complicated; Emmett aims to prove that it doesn't have to be like that. We cut the boilerplate and layered madness, letting you focus on delivery. We're opinionated but focus on composition, not magic. Let me show you how.
Installation
Before we dive into the real world's scenario, let's add Emmett to your project, as we'll use its types and tooling to streamline the development effort.
I assume that you have Node.js installed. Add the package from the terminal using your favourite package manager:
$ npm add @event-driven-io/emmett
$ pnpm add @event-driven-io/emmett
$ yarn add @event-driven-io/emmett
$ bun add @event-driven-io/emmett
Events
Events are the centrepiece of event-sourced systems. They represent both critical points of the business process but are also used as the state. That enables you to reflect your business into the code better, getting the synergy. Let's model a simple business process: a shopping cart. You can open it, add or remove the product from it and confirm or cancel.
We could define it as follows:
import type { Event } from '@event-driven-io/emmett';
export type ProductItemAddedToShoppingCart = Event<
'ProductItemAddedToShoppingCart',
{
shoppingCartId: string;
productItem: PricedProductItem;
addedAt: Date;
}
>;
export type ProductItemRemovedFromShoppingCart = Event<
'ProductItemRemovedFromShoppingCart',
{
shoppingCartId: string;
productItem: PricedProductItem;
removedAt: Date;
}
>;
export type ShoppingCartConfirmed = Event<
'ShoppingCartConfirmed',
{
shoppingCartId: string;
confirmedAt: Date;
}
>;
export type ShoppingCartCancelled = Event<
'ShoppingCartCancelled',
{
shoppingCartId: string;
canceledAt: Date;
}
>;
export type ShoppingCartEvent =
| ProductItemAddedToShoppingCart
| ProductItemRemovedFromShoppingCart
| ShoppingCartConfirmed
| ShoppingCartCancelled;
export interface ProductItem {
productId: string;
quantity: number;
}
export type PricedProductItem = ProductItem & {
unitPrice: number;
};
It shows that clients can add or remove products to our shopping cart and confirm or cancel them. All events represent facts that happened and tell the story of the shopping cart. To highlight that, we're grouping all type definitions with the ShoppingCartEvent
union type. It tells that either of those events may happen.
We're using Event type, which helps to keep the event definition aligned. It's not a must, but it helps to ensure that it has a type name defined (e.g. ProductItemAddedToShoppingCart
) and read-only payload data.
Commands
We need to provide a clear business intention to capture a meaningful business event. We can declare it by defining the set of commands to tell what business logic we'll be handling:
import type { Command } from '@event-driven-io/emmett';
import type { PricedProductItem } from './events';
export type AddProductItemToShoppingCart = Command<
'AddProductItemToShoppingCart',
{
shoppingCartId: string;
productItem: PricedProductItem;
}
>;
export type RemoveProductItemFromShoppingCart = Command<
'RemoveProductItemFromShoppingCart',
{
shoppingCartId: string;
productItem: PricedProductItem;
}
>;
export type ConfirmShoppingCart = Command<
'ConfirmShoppingCart',
{
shoppingCartId: string;
}
>;
export type CancelShoppingCart = Command<
'CancelShoppingCart',
{
shoppingCartId: string;
}
>;
export type ShoppingCartCommand =
| AddProductItemToShoppingCart
| RemoveProductItemFromShoppingCart
| ConfirmShoppingCart
| CancelShoppingCart;
Accordingly, to define the event, we're using the Command
type, ensuring that our type definition is aligned. Using the Command
type is optional, as Emmett won't force you to use any marker types, but with TypeScript structural typing, this is quite useful to align the type definition.
Business logic and decisions
Knowing what may happen (events) and our business intentions (commands), we can define our business logic.
Let's say that we have the following business rules:
- The customer must provide the quantity when selecting and adding a product to the basket. The system calculates the product price based on the current price list.
- The customer may remove a product with a given price from the cart.
- The customer can confirm the shopping cart and start the order fulfilment process.
- The customer may cancel the shopping cart and reject all selected products.
- After shopping cart confirmation or cancellation, the product can no longer be added or removed from the cart.
To evaluate business rules, it'd be helpful if we had the current state in which we could make our decisions. It could look as follows:
export type EmptyShoppingCart = {
status: 'Empty';
};
export type OpenedShoppingCart = {
status: 'Opened';
productItems: ProductItems;
};
export type ClosedShoppingCart = {
status: 'Closed';
};
export type ShoppingCart =
| EmptyShoppingCart
| OpenedShoppingCart
| ClosedShoppingCart;
export type ProductItems = Map<string, number>;
Simple as that. No additional classes are needed. Our shopping cart can be either Empty (initial state), Opened (we added or removed items) or Closed (confirmed or cancelled). Based on the above rules, it's enough to keep product items as a simple map with product id and the record with price and quantity.
Keep your state slimmed down
It's essential to keep our state focused on decision-making. We should trim it to only contain data used in our business rules evaluation. Read more in the article
Now, let's define our business logic! We can do it through a set of functions:
import { EmmettError, IllegalStateError, sum } from '@event-driven-io/emmett';
export const addProductItem = (
command: AddProductItemToShoppingCart,
state: ShoppingCart,
): ProductItemAddedToShoppingCart => {
if (state.status === 'Closed')
throw new IllegalStateError('Shopping Cart already closed');
const {
data: { shoppingCartId, productItem },
metadata,
} = command;
return {
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: metadata?.now ?? new Date(),
},
};
};
export const removeProductItem = (
command: RemoveProductItemFromShoppingCart,
state: ShoppingCart,
): ProductItemRemovedFromShoppingCart => {
if (state.status !== 'Opened')
throw new IllegalStateError('Shopping Cart is not opened');
const {
data: { shoppingCartId, productItem },
metadata,
} = command;
const currentQuantity = state.productItems.get(productItem.productId) ?? 0;
if (currentQuantity < productItem.quantity)
throw new IllegalStateError('Not enough products');
return {
type: 'ProductItemRemovedFromShoppingCart',
data: {
shoppingCartId,
productItem,
removedAt: metadata?.now ?? new Date(),
},
};
};
export const confirm = (
command: ConfirmShoppingCart,
state: ShoppingCart,
): ShoppingCartConfirmed => {
if (state.status !== 'Opened')
throw new IllegalStateError('Shopping Cart is not opened');
const totalQuantityOfAllProductItems = sum(state.productItems.values());
if (totalQuantityOfAllProductItems <= 0)
throw new IllegalStateError('Shopping Cart is empty');
const {
data: { shoppingCartId },
metadata,
} = command;
return {
type: 'ShoppingCartConfirmed',
data: {
shoppingCartId,
confirmedAt: metadata?.now ?? new Date(),
},
};
};
export const cancel = (
command: CancelShoppingCart,
state: ShoppingCart,
): ShoppingCartCancelled => {
if (state.status !== 'Opened')
throw new IllegalStateError('Shopping Cart is not opened');
const {
data: { shoppingCartId },
metadata,
} = command;
return {
type: 'ShoppingCartCancelled',
data: {
shoppingCartId,
canceledAt: metadata?.now ?? new Date(),
},
};
};
As you see, this is a simple set of functions with a repeatable pattern. They take command and the state and a make decision. The result of business logic is always an event (or multiple events). If business rules validation fails, you can either throw an error or return a failure result (it can also be another event), but I'll let you decide on your preferences.
We can also wrap the whole processing into a single function:
export const decide = (command: ShoppingCartCommand, state: ShoppingCart) => {
const { type } = command;
switch (type) {
case 'AddProductItemToShoppingCart':
return addProductItem(command, state);
case 'RemoveProductItemFromShoppingCart':
return removeProductItem(command, state);
case 'ConfirmShoppingCart':
return confirm(command, state);
case 'CancelShoppingCart':
return cancel(command, state);
default: {
const _notExistingCommandType: never = type;
throw new EmmettError(`Unknown command type`);
}
}
};
Such repeatable patterns are powerful, as they open easier composition, which is the base of the Emmett way.
Building state from events
We know how to run business logic based on state and command. Yet, in event sourcing, events are the state.
Each event recorded due to the business logic is appended to the event stream. An event stream is an ordered sequence of events. Event stream id equals the entity id (e.g. shopping cart id). To get the current state of events, we need to read all events previously recorded. Then, we take the initial state and apply it one by one to get the current state at the time. Yes, the state we'll use in business logic.
The state aggregation can be coded as:
const currentState = events.reduce<State>(evolve, initialState());
For our case initial state can look like:
export const initialState = (): ShoppingCart => {
return {
status: 'Empty',
};
};
Now let's define the evolve
function that will evolve our state based on events:
export const evolve = (
state: ShoppingCart,
event: ShoppingCartEvent,
): ShoppingCart => {
const { type, data } = event;
switch (type) {
case 'ProductItemAddedToShoppingCart':
case 'ProductItemRemovedFromShoppingCart': {
if (state.status !== 'Opened' && state.status !== 'Empty') return state;
const {
productItem: { productId, quantity },
} = data;
const productItems =
state.status === 'Opened'
? state.productItems
: new Map<string, number>();
const plusOrMinus = type == 'ProductItemAddedToShoppingCart' ? 1 : -1;
return {
status: 'Opened',
productItems: withUpdatedQuantity(
productItems,
productId,
plusOrMinus * quantity,
),
};
}
case 'ShoppingCartConfirmed':
case 'ShoppingCartCancelled':
return { status: 'Closed' };
default:
return state;
}
};
const withUpdatedQuantity = (
current: ProductItems,
productId: string,
quantity: number,
) => {
const productItems = new Map(current);
const currentQuantity = productItems.get(productId) ?? 0;
productItems.set(productId, currentQuantity + quantity);
return productItems;
};
Read also more in article How to get the current entity state from events? and follow up on Should you throw an exception when rebuilding the state from events?.
Unit Testing
One of the mentioned benefits is testing, which Emmett helps to do out of the box.
For Event Sourcing, the testing pattern looks like this:
- GIVEN set of events recorded for the entity,
- WHEN we run the command on the state built from events,
- THEN we’re getting new event(s) as a result of business logic. Or the exception is thrown.
Tests for our Shopping Cart business logic can look like this:
import { DeciderSpecification } from '@event-driven-io/emmett';
import { randomUUID } from 'node:crypto';
import type { PricedProductItem } from './events';
const given = DeciderSpecification.for({
decide,
evolve,
initialState: initialState,
});
void describe('ShoppingCart', () => {
void describe('When empty', () => {
void it('should add product item', () => {
given([])
.when({
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem,
},
metadata: { now },
})
.then([
{
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: now,
},
},
]);
});
});
void describe('When opened', () => {
void it('should confirm', () => {
given({
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: oldTime,
},
})
.when({
type: 'ConfirmShoppingCart',
data: {
shoppingCartId,
},
metadata: { now },
})
.then([
{
type: 'ShoppingCartConfirmed',
data: {
shoppingCartId,
confirmedAt: now,
},
},
]);
});
});
void describe('When confirmed', () => {
void it('should not add products', () => {
given([
{
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: oldTime,
},
},
{
type: 'ShoppingCartConfirmed',
data: { shoppingCartId, confirmedAt: oldTime },
},
])
.when({
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem,
},
metadata: { now },
})
.thenThrows(
(error: Error) => error.message === 'Shopping Cart already closed',
);
});
});
const getRandomProduct = (): PricedProductItem => {
return {
productId: randomUUID(),
unitPrice: Math.random() * 10,
quantity: Math.random() * 10,
};
};
const oldTime = new Date();
const now = new Date();
const shoppingCartId = randomUUID();
const productItem = getRandomProduct();
});
Event store
Emmett is an Event Sourcing framework, so we need an event store to store events, aye? Event stores are key-value databases. The key is a record id, and the value is an ordered list of events. Such a sequence of events is called Event Stream. One stream keeps all events recorded for a particular business process or entity.
The essential difference between Event Sourcing and Event Streaming is that in Event Sourcing, events are the state. There's no other state. We use recorded events to get the state and make the next decisions, resulting in more events. Plus, as you'd expect from the database, we get strong consistency on writes and reads. Read more in article.
Emmett provides a lightweight abstraction for event stores. We don't intend to provide the lowest common denominator but streamline the typical usage patterns. It's OK if you use your preferred event store or client for the cases where those parts do not suffice your needs. Still, what's there should take you far enough.
Here is the general definition of it:
export interface EventStore<
ReadEventMetadataType extends AnyReadEventMetadata = AnyReadEventMetadata,
> {
aggregateStream<State, EventType extends Event>(
streamName: string,
options: AggregateStreamOptions<State, EventType, ReadEventMetadataType>,
): Promise<
AggregateStreamResult<
State,
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>
>;
readStream<EventType extends Event>(
streamName: string,
options?: ReadStreamOptions<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>,
): Promise<ReadStreamResult<EventType, ReadEventMetadataType>>;
appendToStream<EventType extends Event>(
streamName: string,
events: EventType[],
options?: AppendToStreamOptions<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>,
): Promise<
AppendToStreamResult<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>
>;
// streamEvents(): ReadableStream<
// ReadEvent<Event, ReadEventMetadataType> | GlobalSubscriptionEvent
// >;
}
export type EventStoreReadEventMetadata<Store extends EventStore> =
Store extends EventStore<infer ReadEventMetadataType>
? ReadEventMetadataType extends ReadEventMetadata<infer GV, infer SV>
? ReadEventMetadata<GV, SV> & ReadEventMetadataType
: never
: never;
export type GlobalPositionTypeOfEventStore<Store extends EventStore> =
GlobalPositionTypeOfReadEventMetadata<EventStoreReadEventMetadata<Store>>;
export type StreamPositionTypeOfEventStore<Store extends EventStore> =
StreamPositionTypeOfReadEventMetadata<EventStoreReadEventMetadata<Store>>;
export type EventStoreSession<EventStoreType extends EventStore> = {
eventStore: EventStoreType;
close: () => Promise<void>;
};
export interface EventStoreSessionFactory<EventStoreType extends EventStore> {
withSession<T = unknown>(
callback: (session: EventStoreSession<EventStoreType>) => Promise<T>,
): Promise<T>;
}
It brings you three most important methods:
readStream
- reads events for the specific stream. By default, it reads all events, but through options, you can specify the event range you want to get (from
,to
,maxCount
). You can also specify the expected stream version.appendToStream
- appends new events at the end of the stream. All events should be appended as an atomic operation. You can specify the expected stream version for an optimistic concurrency check. We're also getting the next stream version as a result.aggregateStream
- builds the current state from events. Internally, event store implementation should read all events in the stream based on the passed initial state and theevolve
function. It also supports all the same options as thereadStream
method.
Read more about how event stores are built in the article.
Command Handling
As you saw in the unit tests example, Event Sourcing brings a repeatable pattern for handling business logic. We can expand that to application logic.
Command Handling can be described by the following steps:
- Read events from the stream and build the state from them (in other words aggregate stream). Get also the current version of the stream.
- Run the business logic using the command and the state. Use the default (initial) state if the stream does not exist.
- Append the result of the business logic (so events) at the end of the stream from which you've read events. Use the read version (or the one provided by the user) for an optimistic concurrency check.
In pseudo-code, this could look as follows:
const { state, expectedStreamVersion } = await eventStore.aggregateStream(
streamName,
{
evolve,
initialState,
},
);
const events = handle(command, state);
await eventStore.appendToStream(streamName, result, { expectedStreamVersion });
That looks quite simple, but generalising it and making it robust requires some experience. But that's why you have Emmett, the intention is to cut the learning curve for you and help you with basic abstractions.
You can use the CommandHandler
method to set up a command handler for you:
import { CommandHandler } from '@event-driven-io/emmett';
import { evolve, initialState } from './shoppingCart';
export const handle = CommandHandler({ evolve, initialState });
Such handlers should be defined per stream type (e.g., one for Shopping Cart, the other for Orders, etc.). It can be used later in the application code as:
import { getInMemoryEventStore } from '@event-driven-io/emmett';
const eventStore = getInMemoryEventStore();
const command: AddProductItemToShoppingCart = {
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem,
},
};
const { nextExpectedStreamVersion } = await handle(
eventStore,
shoppingCartId,
(state) => addProductItem(command, state),
);
You could put such code, e.g. in your WebApi endpoint. Let's go to the next step and use that in practice in the real web application.
Application Setup
Seems like we have our business rules modelled, business logic reflected in code, and even tested. You also know how to write application code for handling commands. Isn't that cool? That's nice, but we need to build real applications, which nowadays typically mean a Web Application. Let's try to do it as well.
Node.js is a great, lightweight environment that doesn't require much ceremony. Some tools try to bring, but we want to keep it simple in Emmett. What could be simpler than building an Express.js application?
Let's add some flavour to the classical Express.js app by installing the emmett-expressjs
package:
$ npm add @event-driven-io/emmett-expressjs
$ pnpm add @event-driven-io/emmett-expressjs
$ yarn add @event-driven-io/emmett-expressjs
$ bun add @event-driven-io/emmett-expressjs
We don't want to replace your favourite frameworks but get synergy with them. We want to help you cut the boilerplate by providing safe defaults to the configuration and simple wrappers. The example?
import { getInMemoryEventStore } from '@event-driven-io/emmett';
import { getApplication, startAPI } from '@event-driven-io/emmett-expressjs';
import type { Application } from 'express';
import type { Server } from 'http';
const eventStore = getInMemoryEventStore();
const shoppingCarts = shoppingCartApi(
eventStore,
getUnitPrice,
() => new Date(),
);
const application: Application = getApplication({
apis: [shoppingCarts],
});
const server: Server = startAPI(application);
Those are just a few lines, but there are a few things to discuss here. Let's tackle them one by one.
Application setup
Emmett provides the getApplication method that sets up the recommended configuration of the Express.js application. By calling it, you'll get:
JSON and Url Encoding middlewares set up needed for WebApi request processing,
Problem details middleware. Why reinvent the wheel if there's now an industry standard for handling error responses? See RFC 9457 - Problem Details for HTTP APIs) - we're implementing it for you out of the box. We've also set up a basic error-to-status mapping convention. For instance, for Emmett built-in error types:
ValidationError
to400
,IllegalStateError
to403
,NotFoundError
to404
,ConcurrencyError
to412
.
You can also customise that and provide your custom mapping.
Default setup for using ETag headers for optimistic concurrency.
Unified way of setting up WebApis via providing a set of router configurations.
Of course, you can disable all of that or use your current setup. All of that is optional for Emmett to work. We just want to make things easier for you and help you speed up your development using industry standards. We prefer composition over replacement.
Starting Application
The startAPI
method encapsulates the default startup options like the default port (in Emmett's case, it's 3000
). A separate startAPI
method allows you to customise the default application setup and makes it easier to run integration tests, as you'll see in a moment.
Router configuration
To configure API, we need to provide router configuration. We can do it via the apis
property of the getApplication
options. WebApi setup is a simple function that takes the router and defines needed routings on it.
export type WebApiSetup = (router: Router) => void;
We recommend providing different web app configurations for different endpoints' logical groupings. It's also worth injecting all needed dependencies from the top, as that will make integration testing easier.
That's what we did in our case. We've set up our Shopping Carts API and injected external dependencies:
- event store to store and retrieve events,
- The
getUnitPrice
method represents a call to an external service to get the price of a product added to the shopping cart, - We're also passing the current date generator. Embracing this non-deterministic dependency will be helpful for integration testing later on.
That clearly explains what dependencies this API needs, and by reading the file, you can understand what your application technology needs. That should cut the onboarding time for new people grasping our system setup.
import { getInMemoryEventStore } from '@event-driven-io/emmett';
const eventStore = getInMemoryEventStore();
const shoppingCarts = shoppingCartApi(
eventStore,
getUnitPrice,
() => new Date(),
);
We're using the simplest option for this guide: an in-memory event store. For a real application, you'd need to use another, e.g. EventStoreDB implementation.
Sounds like we have all the building blocks to define our API; let's do it!
WebAPI definition
Let's define our Shopping Cart WebApi. As mentioned before, we'll need two external dependencies: event store and query for product price:
import { type WebApiSetup } from '@event-driven-io/emmett-expressjs';
import { Router } from 'express';
import { evolve, initialState } from '../shoppingCart';
// Let's setup the command handler, we'll use it in endpoints
const handle = CommandHandler({ evolve, initialState });
export const shoppingCartApi =
(
// external dependencies
eventStore: EventStore,
getUnitPrice: (productId: string) => Promise<number>,
): WebApiSetup =>
(router: Router): void => {
// We'll setup routes here
};
The API definition is a function taking external dependencies and returning the Web API setup. We're also setting up Command Handler (as explained in the previous section). Let's not keep it empty for too long and define our first endpoint!
We'll start with adding product items to the shopping cart and vanilla Express.js syntax.
import type { Request, Response } from 'express';
type AddProductItemRequest = Request<
Partial<{ clientId: string; shoppingCartId: string }>,
unknown,
Partial<{ productId: string; quantity: number }>
>;
router.post(
'/clients/:clientId/shopping-carts/current/product-items',
async (request: AddProductItemRequest, response: Response) => {
// 1. Translate request params to the command
const shoppingCartId = getShoppingCartId(
assertNotEmptyString(request.params.clientId),
);
const productId = assertNotEmptyString(request.body.productId);
const command: AddProductItemToShoppingCart = {
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem: {
productId,
quantity: assertPositiveNumber(request.body.quantity),
unitPrice: await getUnitPrice(productId),
},
},
};
// 2. Handle command
await handle(eventStore, shoppingCartId, (state) =>
addProductItem(command, state),
);
// 3. Send response status
response.sendStatus(204);
},
);
Web Api Command Handling can be described by the following steps:
- Translate and request params to the command. This is also a place to run necessary validation. Thanks to that, once we created our command, we can trust that it's validated and semantically correct. We don't need to repeat that in the business logic. That reduces the number of IFs and, eventually, the number of unit tests.
- Run command handler on top of business logic. As you see, we keep things explicit; you can still run Go to definition in your IDE and understand what's being run. So we're keeping things that should be explicit, explicit and hiding boilerplate that can be implicit.
- Return the proper HTTP response.
As you see, it's just a regular Express.js syntax. Still, in the longer term, it's better to have a way to make it a more scalable approach and unify intended usage patterns. That's what we're here for, right?
In Emmett, you can define it also like that:
router.post(
'/clients/:clientId/shopping-carts/current/product-items',
on(async (request: AddProductItemRequest) => {
// 1. Translate request params to the command
const shoppingCartId = getShoppingCartId(
assertNotEmptyString(request.params.clientId),
);
const productId = assertNotEmptyString(request.body.productId);
const command: AddProductItemToShoppingCart = {
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem: {
productId,
quantity: assertPositiveNumber(request.body.quantity),
unitPrice: await getUnitPrice(productId),
},
},
};
// 2. Handle command
await handle(eventStore, shoppingCartId, (state) =>
addProductItem(command, state),
);
// 3. Return response status
return NoContent();
}),
);
on
is a simple wrapper:
export type HttpResponse = (response: Response) => void;
export type HttpHandler<RequestType extends Request> = (
request: RequestType,
) => Promise<HttpResponse> | HttpResponse;
export const on =
<RequestType extends Request>(handle: HttpHandler<RequestType>) =>
async (
request: RequestType,
response: Response,
_next: NextFunction,
): Promise<void> => {
const setResponse = await Promise.resolve(handle(request));
return setResponse(response);
};
But this simplicity is powerful as:
- it makes code more explicit on what we have as input and what is output. Emmett also defines the explicit signatures for the most common
- unifies request processing, which should enable better handling of telemetry, logging, OpenApi, etc.
- enables keeping endpoint handlers even in different files, so enables organisation,
- you could even unit test it without running the whole application.
If you still don't buy that, check a more advanced scenario showing a different flow, where shopping cart opening should happen explicitly:
router.post(
'/clients/:clientId/shopping-carts/',
on(async (request: Request) => {
const clientId = assertNotEmptyString(request.params.clientId);
const shoppingCartId = clientId;
const result = await handle(
eventStore,
shoppingCartId,
{
type: 'OpenShoppingCart',
data: { clientId, shoppingCartId, now: new Date() },
},
{ expectedStreamVersion: STREAM_DOES_NOT_EXIST },
);
return Created({
createdId: shoppingCartId,
eTag: toWeakETag(result.nextExpectedStreamVersion),
});
}),
);
Yes, Emmett provides more built-in response helpers together with the explicit options. Created will generate the location header. If you're returning the error status (e.g. 404 Not Found
), you can add problem details, information, etc.
What's also sweet is that you can use Emmett's Express.js helpers even without an Event Sourcing code; Bon Appétit!
Still, we're here for the Event Sourcing, so let's see the whole API:
// Add Product Item
router.post(
'/clients/:clientId/shopping-carts/current/product-items',
on(async (request: AddProductItemRequest) => {
const shoppingCartId = getShoppingCartId(
assertNotEmptyString(request.params.clientId),
);
const productId = assertNotEmptyString(request.body.productId);
const command: AddProductItemToShoppingCart = {
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem: {
productId,
quantity: assertPositiveNumber(request.body.quantity),
unitPrice: await getUnitPrice(productId),
},
},
metadata: { now: getCurrentTime() },
};
await handle(eventStore, shoppingCartId, (state) =>
addProductItem(command, state),
);
return NoContent();
}),
);
// Remove Product Item
router.delete(
'/clients/:clientId/shopping-carts/current/product-items',
on(async (request: Request) => {
const shoppingCartId = getShoppingCartId(
assertNotEmptyString(request.params.clientId),
);
const command: RemoveProductItemFromShoppingCart = {
type: 'RemoveProductItemFromShoppingCart',
data: {
shoppingCartId,
productItem: {
productId: assertNotEmptyString(request.query.productId),
quantity: assertPositiveNumber(Number(request.query.quantity)),
unitPrice: assertPositiveNumber(Number(request.query.unitPrice)),
},
},
metadata: { now: getCurrentTime() },
};
await handle(eventStore, shoppingCartId, (state) =>
removeProductItem(command, state),
);
return NoContent();
}),
);
// Confirm Shopping Cart
router.post(
'/clients/:clientId/shopping-carts/current/confirm',
on(async (request: Request) => {
const shoppingCartId = getShoppingCartId(
assertNotEmptyString(request.params.clientId),
);
const command: ConfirmShoppingCart = {
type: 'ConfirmShoppingCart',
data: { shoppingCartId },
metadata: { now: getCurrentTime() },
};
await handle(eventStore, shoppingCartId, (state) =>
confirm(command, state),
);
return NoContent();
}),
);
// Cancel Shopping Cart
router.delete(
'/clients/:clientId/shopping-carts/current',
on(async (request: Request) => {
const shoppingCartId = getShoppingCartId(
assertNotEmptyString(request.params.clientId),
);
const command: CancelShoppingCart = {
type: 'CancelShoppingCart',
data: { shoppingCartId },
metadata: { now: getCurrentTime() },
};
await handle(eventStore, shoppingCartId, (state) =>
cancel(command, state),
);
return NoContent();
}),
);
Of course, we could make it even crisper and automagically do the request mapping, more conventional-based status resolution, decorators, and fire-command-and-forget, but we won't. Why?
Emmett prefers composability over magical glue. We believe that a healthy amount of copy-paste won't harm you. We target removability and segregation of the code and making things explicit that should be explicit.
Still, Emmett won't tell you how to live! If you want to add more, feel free to do it. We want to give you basic building blocks and recommendations so you can build on top of that!
Integration Testing
Cool, we now have API up and running. We even tested our domain logic with unit tests. That's great, but as you know, a lot can happen in the meantime. The request mapping or validation may fail; middlewares (like auth one) can say no. It'd be great to test it.
There are many different shapes of Testing: Pyramids, Honeycombs, Thropies etc. All of them share the goal of having them reliable and fast. However, agreeing on where the compromise is and where to put the biggest effort is, of course, challenging to agree.
No matter what your preference is, Emmett has got you covered.
Let's say that you're a fan of Hexagonal/Ports & Adapters Architecture and you'd like to test the whole flow being able to replace dependencies (adapters) with in-memory implementations to have your tests running in-memory. Such approaches have tradeoffs. The pros are that they run smoothly, allow a fast feedback loop, and run tests continuously. The downside is that they don't validate all integration scenarios with real tools. Don't worry, we'll cover that later!
I heard that one picture could speak more than a thousand words, so let's look at this one:
The picture shows the boundaries between the business logic and our application layer.
Our application layer is thin; it's a vertical slice to which the entry point (port) is the WebApi endpoint. Inside it, we can do additional stuff like getting product prices and mapping requests with additional data added to the command. We handle commands in the business logic that return event(s). We're storing them in the event store. This looks like that in the already known adding product to shopping cart code:
const handle = CommandHandler({ evolve, initialState });
type AddProductItemRequest = Request<
Partial<{ clientId: string; shoppingCartId: string }>,
unknown,
Partial<{ productId: string; quantity: number }>
>;
////////////////////////////////////////////////////
// Web Api
////////////////////////////////////////////////////
export const shoppingCartApi =
(
// external dependencies
eventStore: EventStore,
getUnitPrice: (productId: string) => Promise<number>,
): WebApiSetup =>
(router: Router): void => {
////////////////////////////////////////////////////
// Endpoint
////////////////////////////////////////////////////
router.post(
'/clients/:clientId/shopping-carts/current/product-items',
on(async (request: AddProductItemRequest) => {
const shoppingCartId = getShoppingCartId(
assertNotEmptyString(request.params.clientId),
);
const productId = assertNotEmptyString(request.body.productId);
const command: AddProductItemToShoppingCart = {
type: 'AddProductItemToShoppingCart',
data: {
shoppingCartId,
productItem: {
productId,
quantity: assertPositiveNumber(request.body.quantity),
unitPrice: await getUnitPrice(productId),
},
},
};
await handle(eventStore, shoppingCartId, (state) =>
addProductItem(command, state),
);
return NoContent();
}),
);
// (...) other endpoints
};
////////////////////////////////////////////////////
// Business Logic
////////////////////////////////////////////////////
export type AddProductItemToShoppingCart = Command<
'AddProductItemToShoppingCart',
{
shoppingCartId: string;
productItem: PricedProductItem;
}
>;
export const addProductItem = (
command: AddProductItemToShoppingCart,
state: ShoppingCart,
): ProductItemAddedToShoppingCart => {
if (state.status === 'Closed')
throw new IllegalStateError('Shopping Cart already closed');
const {
data: { shoppingCartId, productItem },
metadata,
} = command;
return {
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: metadata?.now ?? new Date(),
},
};
};
Our slice has 3 ports that one can plug in:
- WebApi endpoint, where the user can send a request (which will be translated to a command).
getUnitPrice
function that gets the product price. Depending on our design, it may represent a call to an external service (e.g. with HTTP), calling a database (e.g. read model) or just running some computation. We're also using it as an input to the command.- Event store, from which we load events and store new facts.
We already made those dependencies explicit, allowing us to replace the real ones in the tests. Also, as we know that we're doing Event Sourcing then, why not take advantage of that and write our tests in the following style:
TIP
- GIVEN set of events recorded for the entity,
- WHEN we run the web API request,
- THEN we’re getting new event(s) as a result of business logic. Or the error status is returned with Problem Details.
Let's start with defining our WebApi specification. We're using ApiSpecification
class from @event-driven-io/emmett-expressjs
.
import { getInMemoryEventStore } from '@event-driven-io/emmett';
import {
ApiSpecification,
getApplication,
} from '@event-driven-io/emmett-expressjs';
const unitPrice = 100;
const now = new Date();
const given = ApiSpecification.for<ShoppingCartEvent>(
() => getInMemoryEventStore(),
(eventStore) =>
getApplication({
apis: [
shoppingCartApi(
eventStore,
() => Promise.resolve(unitPrice),
() => now,
),
],
}),
);
We're also using the same getApplication
known from the previous steps. The only difference is that we replaced real dependencies with the in-memory ones. ApiSpecification
uses internally SuperTest package. It allows straightforward testing of Express.js, e.g. it ensures that server starts in a random port and provides the helpers for building test requests, which we'll use in our tests.
Having it, we can define our tests as:
import {
existingStream,
expectNewEvents,
expectResponse,
} from '@event-driven-io/emmett-expressjs';
void describe('When opened with product item', () => {
void it('should confirm', () => {
return given(
existingStream(shoppingCartId, [
{
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: oldTime,
},
},
]),
)
.when((request) =>
request.post(`/clients/${clientId}/shopping-carts/current/confirm`),
)
.then([
expectResponse(204),
expectNewEvents(shoppingCartId, [
{
type: 'ShoppingCartConfirmed',
data: {
shoppingCartId,
confirmedAt: now,
},
},
]),
]);
});
});
The test follows the similar Given/When/Then pattern as our unit tests but uses HTTP request to trigger the command handling and uses additional helpers to set up and verify data:
exisitingStream
- allows you to specify the stream id and events existing in the stream. You can set more than one stream if your command handling logic requires that,expectResponse
- verifies the HTTP response with expected criteria like status code. You can also check the response body, headers, etc. For expected errors you can useexpectError
accordingly.expectEvents
- ensures that new events are appended to the specific streams in the event store.
Complete tests will look like this:
import { getInMemoryEventStore } from '@event-driven-io/emmett';
import {
ApiSpecification,
existingStream,
expectError,
expectNewEvents,
expectResponse,
getApplication,
} from '@event-driven-io/emmett-expressjs';
import { randomUUID } from 'node:crypto';
void describe('ShoppingCart', () => {
let clientId: string;
let shoppingCartId: string;
beforeEach(() => {
clientId = randomUUID();
shoppingCartId = `shopping_cart:${clientId}:current`;
});
void describe('When empty', () => {
void it('should add product item', () => {
return given()
.when((request) =>
request
.post(`/clients/${clientId}/shopping-carts/current/product-items`)
.send(productItem),
)
.then([
expectNewEvents(shoppingCartId, [
{
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: now,
},
},
]),
]);
});
});
void describe('When opened with product item', () => {
void it('should confirm', () => {
return given(
existingStream(shoppingCartId, [
{
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: oldTime,
},
},
]),
)
.when((request) =>
request.post(`/clients/${clientId}/shopping-carts/current/confirm`),
)
.then([
expectResponse(204),
expectNewEvents(shoppingCartId, [
{
type: 'ShoppingCartConfirmed',
data: {
shoppingCartId,
confirmedAt: now,
},
},
]),
]);
});
});
void describe('When confirmed', () => {
void it('should not add products', () => {
return given(
existingStream(shoppingCartId, [
{
type: 'ProductItemAddedToShoppingCart',
data: {
shoppingCartId,
productItem,
addedAt: oldTime,
},
},
{
type: 'ShoppingCartConfirmed',
data: { shoppingCartId, confirmedAt: oldTime },
},
]),
)
.when((request) =>
request
.post(`/clients/${clientId}/shopping-carts/current/product-items`)
.send(productItem),
)
.then(
expectError(403, {
detail: 'Shopping Cart already closed',
status: 403,
title: 'Forbidden',
type: 'about:blank',
}),
);
});
});
const oldTime = new Date();
const now = new Date();
const unitPrice = Math.random() * 10;
const given = ApiSpecification.for<ShoppingCartEvent>(
() => getInMemoryEventStore(),
(eventStore) =>
getApplication({
apis: [
shoppingCartApi(
eventStore,
() => Promise.resolve(unitPrice),
() => now,
),
],
}),
);
const getRandomProduct = (): PricedProductItem => {
return {
productId: randomUUID(),
unitPrice,
quantity: Math.random() * 10,
};
};
const productItem = getRandomProduct();
});
You can use those tests as complementary to the business logic (e.g., testing the most important scenarios), or you may even replace unit tests with them. As they're in memory, they're fast enough to be run continuously.
You can also replace the in-memory store with the real one (e.g. EventStoreDB) and test your module in isolation from other modules. The choice is yours!
Again, in Emmett, we don't want to force you to anything but give you options and the recommended safe path.
TIP
We encourage you to watch Martin Thwaites' talk "Building Operable Software with TDD (but not the way you think)". It nicely explains why we can now shift the balance from unit testing to integration testing.
End-to-End Testing
You may say:
Those tests are cool, but I'd like to either test business logic with unit tests or run my end-to-end tests treating the WebApi as a black box.
And we answer: sure, why not! We also give you help with that.
Let's start by adding some flavour and use EventStoreDB this time. We need to install two more packages. One for adding implementation of the EventStoreDB event store:
$ npm add @event-driven-io/@event-driven-io/emmett-esdb
$ pnpm add @event-driven-io/@event-driven-io/emmett-esdb
$ yarn add @event-driven-io/@event-driven-io/emmett-esdb
$ bun add @event-driven-io/@event-driven-io/emmett-esdb
Now, we need to switch the in-memory implementation to EventStoreDB in WebApi setup. Updated will look as follows:
import { getEventStoreDBEventStore } from '@event-driven-io/emmett-esdb';
import { EventStoreDBClient } from '@eventstore/db-client';
const eventStoreDBClient = EventStoreDBClient.connectionString(
`esdb://localhost:2113?tls=false`,
);
const eventStore = getEventStoreDBEventStore(eventStoreDBClient);
const shoppingCarts = shoppingCartApi(
eventStore,
getUnitPrice,
() => new Date(),
);
It's as simple as that; we're injecting just a different implementation.
As EventStoreDB is a real database, we need to set it up for our tests. The simplest option is to use a Docker container. You can do it in multiple ways, but the fastest can be using TestContainers. The library allows us to easily set up containers for our tests. It automatically randomise ports, helps in teardown etc.
Emmett provides the package with additional test containers like the one for EventStoreDB. You need to install:
$ npm add @event-driven-io/@event-driven-io/emmett-testcontainers
$ pnpm add @event-driven-io/@event-driven-io/emmett-testcontainers
$ yarn add @event-driven-io/@event-driven-io/emmett-testcontainers
$ bun add @event-driven-io/@event-driven-io/emmett-testcontainers
Having that, we can set our test container with:
import {
EventStoreDBContainer,
type StartedEventStoreDBContainer,
} from '@event-driven-io/emmett-testcontainers';
let esdbContainer: StartedEventStoreDBContainer;
void describe('ShoppingCart E2E', () => {
// Set up a container before all tests
before(async () => {
esdbContainer = await new EventStoreDBContainer().start();
});
// Stop container once we finished testing
after(() => {
return esdbContainer.stop();
});
// (...) Tests will go here
});
And create our test specification using the ApiE2ESpecification
type:
import { getEventStoreDBEventStore } from '@event-driven-io/emmett-esdb';
import {
ApiE2ESpecification,
getApplication,
} from '@event-driven-io/emmett-expressjs';
const given = ApiE2ESpecification.for(
() => getEventStoreDBEventStore(esdbContainer.getClient()),
(eventStore) =>
getApplication({
apis: [
shoppingCartApi(
eventStore,
() => Promise.resolve(unitPrice),
() => now,
),
],
}),
);
The test will look accordingly to the integration tests, with the distinction that we also use HTTP requests for the setup. We're also checking only responses; we treat the WebApi as a black box.
import { getEventStoreDBEventStore } from '@event-driven-io/emmett-esdb';
import { expectResponse } from '@event-driven-io/emmett-expressjs';
import type { StartedEventStoreDBContainer } from '@event-driven-io/emmett-testcontainers';
void describe('When opened with product item', () => {
const openedShoppingCartWithProduct: TestRequest = (request) =>
request
.post(`/clients/${clientId}/shopping-carts/current/product-items`)
.send(productItem);
void it('should confirm', () => {
return given(openedShoppingCartWithProduct)
.when((request) =>
request.post(`/clients/${clientId}/shopping-carts/current/confirm`),
)
.then([expectResponse(204)]);
});
});
Complete tests will look like this:
import { getEventStoreDBEventStore } from '@event-driven-io/emmett-esdb';
import {
ApiE2ESpecification,
expectResponse,
getApplication,
type TestRequest,
} from '@event-driven-io/emmett-expressjs';
import {
EventStoreDBContainer,
StartedEventStoreDBContainer,
} from '@event-driven-io/emmett-testcontainers';
import { randomUUID } from 'node:crypto';
void describe('ShoppingCart E2E', () => {
const unitPrice = 100;
let clientId: string;
let shoppingCartId: string;
let esdbContainer: StartedEventStoreDBContainer;
let given: ApiE2ESpecification;
before(async () => {
esdbContainer = await new EventStoreDBContainer().start();
given = ApiE2ESpecification.for(
() => getEventStoreDBEventStore(esdbContainer.getClient()),
(eventStore) =>
getApplication({
apis: [
shoppingCartApi(
eventStore,
() => Promise.resolve(unitPrice),
() => now,
),
],
}),
);
});
beforeEach(() => {
clientId = randomUUID();
shoppingCartId = `shopping_cart:${clientId}:current`;
});
after(() => {
return esdbContainer.stop();
});
void describe('When opened with product item', () => {
const openedShoppingCartWithProduct: TestRequest = (request) =>
request
.post(`/clients/${clientId}/shopping-carts/current/product-items`)
.send(productItem);
void it('should confirm', () =>
given(openedShoppingCartWithProduct)
.when((request) =>
request.post(`/clients/${clientId}/shopping-carts/current/confirm`),
)
.then([expectResponse(204)]));
void it('returns details', () =>
given(openedShoppingCartWithProduct)
.when((request) =>
request.get(`/clients/${clientId}/shopping-carts/current`).send(),
)
.then([
expectResponse(200, {
body: {
clientId,
id: shoppingCartId,
productItems: [
{
quantity: productItem.quantity,
productId: productItem.productId,
},
],
status: ShoppingCartStatus.Opened,
},
}),
]));
});
const now = new Date();
const getRandomProduct = (): PricedProductItem => {
return {
productId: randomUUID(),
unitPrice: unitPrice,
quantity: Math.random() * 10,
};
};
const productItem = getRandomProduct();
});
Check also the full sample in Emmett repository.