API docs
Event
Events are the centrepiece of event-sourced systems. They represent critical points of the business process and are also used as the state. That lets your code reflect the business more closely. Let's model a simple business process: a shopping cart. You can open it, add or remove a product item, and confirm or cancel it.
The Event type helps to keep the event definition aligned. It's not a must, but it helps to ensure that each event has a type name defined (e.g. ProductItemAddedToShoppingCart) and read-only payload data.
You can use it as follows:
import type { Event } from '@event-driven-io/emmett';

type ProductItemAddedToShoppingCart = Event<
  'ProductItemAddedToShoppingCart',
  {
    shoppingCartId: string;
    productItem: PricedProductItem;
  }
>;
The type is a simple wrapper to ensure the structure's correctness. It defines:
- type - event type name,
- data - represents the business data the event contains. It has to be a record structure; primitives are not allowed,
- metadata - represents the generic data the event contains. It can represent telemetry, user id, tenant id, timestamps and other information that can be useful for running infrastructure. It has to be a record structure; primitives are not allowed.
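To illustrate how events can double as the state, here is a minimal sketch that folds a list of events into a shopping cart. It does not import Emmett: `DomainEvent` is a simplified local stand-in for the `Event` type, and the `PricedProductItem` shape, the `ShoppingCart` state, `ShoppingCartConfirmed`, `initialState` and `evolve` are assumptions made for this example only.

```typescript
// Simplified local stand-in for Emmett's Event type (no Flavour branding, no metadata).
// Named DomainEvent to avoid clashing with the DOM `Event` global.
type DomainEvent<
  EventType extends string = string,
  EventData extends Record<string, unknown> = Record<string, unknown>,
> = Readonly<{ type: EventType; data: EventData }>;

// Hypothetical product item shape, assumed for this example only.
type PricedProductItem = { productId: string; quantity: number; unitPrice: number };

type ProductItemAddedToShoppingCart = DomainEvent<
  'ProductItemAddedToShoppingCart',
  { shoppingCartId: string; productItem: PricedProductItem }
>;
type ShoppingCartConfirmed = DomainEvent<'ShoppingCartConfirmed', { shoppingCartId: string }>;
type ShoppingCartEvent = ProductItemAddedToShoppingCart | ShoppingCartConfirmed;

// Hypothetical state that is built purely from events.
type ShoppingCart = { status: 'Empty' | 'Opened' | 'Confirmed'; totalAmount: number };
const initialState = (): ShoppingCart => ({ status: 'Empty', totalAmount: 0 });

// The `type` field discriminates the union, so `event.data` is narrowed in each case.
const evolve = (state: ShoppingCart, event: ShoppingCartEvent): ShoppingCart => {
  switch (event.type) {
    case 'ProductItemAddedToShoppingCart': {
      const { quantity, unitPrice } = event.data.productItem;
      return { status: 'Opened', totalAmount: state.totalAmount + quantity * unitPrice };
    }
    case 'ShoppingCartConfirmed':
      return { ...state, status: 'Confirmed' };
  }
};

const recordedEvents: ShoppingCartEvent[] = [
  {
    type: 'ProductItemAddedToShoppingCart',
    data: {
      shoppingCartId: 'cart-1',
      productItem: { productId: 'shoes', quantity: 2, unitPrice: 50 },
    },
  },
  { type: 'ShoppingCartConfirmed', data: { shoppingCartId: 'cart-1' } },
];

// Replaying the events in order yields the current state.
const cart = recordedEvents.reduce<ShoppingCart>(evolve, initialState());
console.log(cart);
```

Because each event carries a literal type name, the compiler can check that every case reads only the data that event actually has.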
See more context in the getting started guide.
import type { DefaultRecord, Flavour } from './';

export type Event<
  EventType extends string = string,
  EventData extends DefaultRecord = DefaultRecord,
  EventMetaData extends DefaultRecord = DefaultRecord,
> = Flavour<
  Readonly<{
    type: EventType;
    data: EventData;
    metadata?: EventMetaData;
  }>,
  'Event'
>;

export type EventTypeOf<T extends Event> = T['type'];
export type EventDataOf<T extends Event> = T['data'];
export type EventMetaDataOf<T extends Event> = T['metadata'];

export type CanHandle<T extends Event> = EventTypeOf<T>[];

export type CreateEventType<
  EventType extends string,
  EventData extends DefaultRecord,
  EventMetaData extends DefaultRecord | undefined,
> = Readonly<{
  type: EventType;
  data: EventData;
  metadata?: EventMetaData;
}>;

export const event = <EventType extends Event>(
  type: EventTypeOf<EventType>,
  data: EventDataOf<EventType>,
  metadata?: EventMetaDataOf<EventType>,
): CreateEventType<
  EventTypeOf<EventType>,
  EventDataOf<EventType>,
  EventMetaDataOf<EventType>
> => {
  return {
    type,
    data,
    metadata,
  };
};

export type ReadEvent<
  EventType extends Event = Event,
  EventMetaDataType extends EventMetaDataOf<EventType> &
    ReadEventMetadata = EventMetaDataOf<EventType> & ReadEventMetadata,
> = CreateEventType<
  EventTypeOf<EventType>,
  EventDataOf<EventType>,
  EventMetaDataType
> &
  EventType & { metadata: EventMetaDataType };

export type ReadEventMetadata = Readonly<{
  eventId: string;
  streamPosition: bigint;
  streamName: string;
}>;

export type ReadEventMetadataWithGlobalPosition = ReadEventMetadata & {
  globalPosition: bigint;
};
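As a rough illustration of how the factory and the read-side metadata from the definitions above fit together, the sketch below builds an event and then shows what it might look like once read back from a store. Everything is declared locally: `DomainEvent`, `makeEvent` and `ReadDomainEvent` are simplified stand-ins for `Event`, `event` and `ReadEvent`, and the `userId` metadata, `describeRead` and all values are assumptions for the example.

```typescript
// Local stand-ins mirroring the definitions above; renamed so they do not clash
// with the DOM `Event` type and `event` variable when run outside a module.
type DomainEvent<
  Type extends string = string,
  Data extends Record<string, unknown> = Record<string, unknown>,
  Meta extends Record<string, unknown> = Record<string, unknown>,
> = Readonly<{ type: Type; data: Data; metadata?: Meta }>;

type ReadEventMetadata = Readonly<{
  eventId: string;
  streamPosition: bigint;
  streamName: string;
}>;

// Simplified read-side shape: metadata becomes required and gains the store fields.
type ReadDomainEvent<E extends DomainEvent> = E & {
  metadata: NonNullable<E['metadata']> & ReadEventMetadata;
};

// Simplified stand-in for the `event` factory.
const makeEvent = <E extends DomainEvent>(
  type: E['type'],
  data: E['data'],
  metadata?: E['metadata'],
): Readonly<{ type: E['type']; data: E['data']; metadata?: E['metadata'] }> => ({
  type,
  data,
  metadata,
});

type ShoppingCartConfirmed = DomainEvent<
  'ShoppingCartConfirmed',
  { shoppingCartId: string },
  { userId: string } // hypothetical custom metadata
>;

// The type argument pins the allowed type name, data and metadata.
const confirmed = makeEvent<ShoppingCartConfirmed>(
  'ShoppingCartConfirmed',
  { shoppingCartId: 'cart-1' },
  { userId: 'user-7' },
);

// What the same event might look like after a store has recorded it (values assumed).
const stored: ReadDomainEvent<ShoppingCartConfirmed> = {
  type: confirmed.type,
  data: confirmed.data,
  metadata: {
    userId: 'user-7',
    eventId: 'evt-1',
    streamName: 'shopping_cart-cart-1',
    streamPosition: 1n,
  },
};

const describeRead = (e: ReadDomainEvent<ShoppingCartConfirmed>): string =>
  `${e.type} #${e.metadata.streamPosition} in ${e.metadata.streamName} by ${e.metadata.userId}`;

console.log(describeRead(stored));
```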
Command
Commands represent the intention to perform a business operation. A command targets a specific recipient, for example an application service handling a request to “add user” or “change the order status to confirmed”. So the sender of the command must know the recipient and expects the request to be executed. Of course, the recipient may refuse, either by rejecting the request or by throwing an exception while handling it.
The Command type helps to keep the command definition aligned. It's not a must, but it helps to ensure that each command has a type name defined (e.g. AddProductItemToShoppingCart) and read-only payload data.
You can use it as follows:
import type { Command } from '@event-driven-io/emmett';

type AddProductItemToShoppingCart = Command<
  'AddProductItemToShoppingCart',
  {
    shoppingCartId: string;
    productItem: PricedProductItem;
  }
>;
The type is a simple wrapper to ensure the structure's correctness. It defines:
- type - command type name,
- data - represents the business data the command contains. It has to be a record structure; primitives are not allowed,
- metadata - represents the generic data the command contains. It can represent telemetry, user id, tenant id, timestamps and other information that can be useful for running infrastructure. It has to be a record structure; primitives are not allowed.
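A command is a request, so the business logic receiving it can accept it (producing an event) or refuse it. The sketch below shows one way that might look. It does not import Emmett: `Command` here is a simplified local stand-in, and `ConfirmShoppingCart`, the `ShoppingCart` state, the `ShoppingCartConfirmed` event and `confirmCart` are hypothetical names introduced for the example.

```typescript
// Simplified local stand-in for Emmett's Command type (no Flavour branding, no metadata).
type Command<
  CommandType extends string = string,
  CommandData extends Record<string, unknown> = Record<string, unknown>,
> = Readonly<{ type: CommandType; data: Readonly<CommandData> }>;

type ConfirmShoppingCart = Command<'ConfirmShoppingCart', { shoppingCartId: string }>;

// Hypothetical state and resulting event, assumed for this example only.
type ShoppingCart = { status: 'Opened' | 'Confirmed'; itemCount: number };
type ShoppingCartConfirmed = Readonly<{
  type: 'ShoppingCartConfirmed';
  data: { shoppingCartId: string };
}>;

// Business logic: either returns the resulting event or refuses by throwing.
const confirmCart = (
  command: ConfirmShoppingCart,
  state: ShoppingCart,
): ShoppingCartConfirmed => {
  if (state.status === 'Confirmed')
    throw new Error('Shopping cart is already confirmed');
  if (state.itemCount === 0)
    throw new Error('Cannot confirm an empty shopping cart');

  return {
    type: 'ShoppingCartConfirmed',
    data: { shoppingCartId: command.data.shoppingCartId },
  };
};

const command: ConfirmShoppingCart = {
  type: 'ConfirmShoppingCart',
  data: { shoppingCartId: 'cart-1' },
};

// Accepted: the cart is open and has items.
const accepted = confirmCart(command, { status: 'Opened', itemCount: 2 });

// Refused: the same command against an already confirmed cart.
let refusal = 'none';
try {
  confirmCart(command, { status: 'Confirmed', itemCount: 2 });
} catch (error) {
  refusal = error instanceof Error ? error.message : String(error);
}

console.log(accepted.type, '|', refusal);
```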
See more context in the getting started guide.
import type { DefaultRecord, Flavour } from './';

export type Command<
  CommandType extends string = string,
  CommandData extends DefaultRecord = DefaultRecord,
  CommandMetaData extends DefaultRecord = DefaultCommandMetadata,
> = Flavour<
  Readonly<{
    type: CommandType;
    data: Readonly<CommandData>;
    metadata?: CommandMetaData | undefined;
  }>,
  'Command'
>;

export type CommandTypeOf<T extends Command> = T['type'];
export type CommandDataOf<T extends Command> = T['data'];
export type CommandMetaDataOf<T extends Command> = T['metadata'];

export type CreateCommandType<
  CommandType extends string,
  CommandData extends DefaultRecord,
  CommandMetaData extends DefaultRecord | undefined,
> = Readonly<{
  type: CommandType;
  data: CommandData;
  metadata?: CommandMetaData;
}>;

export const command = <CommandType extends Command>(
  type: CommandTypeOf<CommandType>,
  data: CommandDataOf<CommandType>,
  metadata?: CommandMetaDataOf<CommandType>,
): CreateCommandType<
  CommandTypeOf<CommandType>,
  CommandDataOf<CommandType>,
  CommandMetaDataOf<CommandType>
> => {
  return {
    type,
    data,
    metadata,
  };
};

export type DefaultCommandMetadata = { now: Date };
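The default command metadata carries a `now` date. One plausible use, sketched below, is letting business logic take the current time from the command instead of calling `new Date()` itself, which keeps it deterministic in tests. The types and the `command` factory are simplified local copies of the definitions above; `CancelShoppingCart`, `decideCancellation` and the `ShoppingCartCancelled` event are hypothetical.

```typescript
// Simplified local copies of the definitions above (no Flavour branding).
type DefaultCommandMetadata = { now: Date };

type Command<
  CommandType extends string = string,
  CommandData extends Record<string, unknown> = Record<string, unknown>,
  CommandMetaData extends Record<string, unknown> = DefaultCommandMetadata,
> = Readonly<{
  type: CommandType;
  data: Readonly<CommandData>;
  metadata?: CommandMetaData | undefined;
}>;

const command = <C extends Command>(
  type: C['type'],
  data: C['data'],
  metadata?: C['metadata'],
): Readonly<{ type: C['type']; data: C['data']; metadata?: C['metadata'] }> => ({
  type,
  data,
  metadata,
});

// Hypothetical command for this example.
type CancelShoppingCart = Command<'CancelShoppingCart', { shoppingCartId: string }>;

// The caller supplies the timestamp once, at the edge of the system.
const cancel = command<CancelShoppingCart>(
  'CancelShoppingCart',
  { shoppingCartId: 'cart-1' },
  { now: new Date('2024-01-01T12:00:00Z') },
);

// Hypothetical business logic: it reads time from metadata and only falls back
// to the system clock when no metadata was provided.
const decideCancellation = (c: CancelShoppingCart) => ({
  type: 'ShoppingCartCancelled' as const,
  data: {
    shoppingCartId: c.data.shoppingCartId,
    cancelledAt: c.metadata?.now ?? new Date(),
  },
});

const cancelled = decideCancellation(cancel);
console.log(cancelled.type, cancelled.data.cancelledAt.toISOString());
```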
Event Store
Emmett is an Event Sourcing framework, so we need an event store to store events, aye? Event stores are key-value databases. The key is a record id, and the value is an ordered list of events. Such a sequence of events is called an Event Stream. One stream keeps all events recorded for a particular business process or entity.
The essential difference between Event Sourcing and Event Streaming is that in Event Sourcing, events are the state. There's no other state. We use recorded events to get the state and make the next decisions, resulting in more events. Plus, as you'd expect from a database, we get strong consistency on writes and reads. Read more in the article.
Emmett provides a lightweight abstraction for event stores. We don't intend to provide the lowest common denominator but to streamline the typical usage patterns. It's OK to use your preferred event store or client directly in the cases where this abstraction does not meet your needs. Still, what's there should take you far enough.
Here is the general definition of it:
export interface EventStore<
  StreamVersion = DefaultStreamVersionType,
  ReadEventMetadataType extends ReadEventMetadata = ReadEventMetadata,
> {
  aggregateStream<State, EventType extends Event>(
    streamName: string,
    options: AggregateStreamOptions<
      State,
      EventType,
      StreamVersion,
      ReadEventMetadataType
    >,
  ): Promise<AggregateStreamResult<State, StreamVersion>>;

  readStream<EventType extends Event>(
    streamName: string,
    options?: ReadStreamOptions<StreamVersion>,
  ): Promise<ReadStreamResult<EventType, StreamVersion, ReadEventMetadataType>>;

  appendToStream<EventType extends Event>(
    streamName: string,
    events: EventType[],
    options?: AppendToStreamOptions<StreamVersion>,
  ): Promise<AppendToStreamResult<StreamVersion>>;

  // streamEvents(): ReadableStream<
  //   ReadEvent<Event, ReadEventMetadataType> | GlobalSubscriptionEvent
  // >;
}

export type EventStoreSession<
  EventStoreType extends EventStore<StreamVersion>,
  StreamVersion = DefaultStreamVersionType,
> = {
  eventStore: EventStoreType;
  close: () => Promise<void>;
};

export interface EventStoreSessionFactory<
  EventStoreType extends EventStore<StreamVersion>,
  StreamVersion = DefaultStreamVersionType,
> {
  withSession<T = unknown>(
    callback: (
      session: EventStoreSession<EventStoreType, StreamVersion>,
    ) => Promise<T>,
  ): Promise<T>;
}

export type DefaultStreamVersionType = bigint;
It brings you the three most important methods:
- readStream - reads events for the specific stream. By default, it reads all events, but through options, you can specify the event range you want to get (from, to, maxCount). You can also specify the expected stream version.
- appendToStream - appends new events at the end of the stream. All events should be appended as an atomic operation. You can specify the expected stream version for an optimistic concurrency check. We're also getting the next stream version as a result.
- aggregateStream - builds the current state from events. Internally, the event store implementation should read all events in the stream and build the state from them using the passed initial state and the evolve function. It also supports all the same options as the readStream method.
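To make the three methods described above more tangible, here is a toy in-memory sketch of them. It is not Emmett's implementation and skips most options (no from, to or maxCount). `createToyEventStore` and `DomainEvent` are hypothetical names, the stream version is assumed to be the number of events in the stream, the `currentStreamVersion` result field is an assumption, and a conflict is reported with a plain `Error`.

```typescript
// Simplified local event shape; named DomainEvent to avoid the DOM `Event` global.
type DomainEvent = Readonly<{ type: string; data: Record<string, unknown> }>;

// Toy store: stream name -> ordered list of events.
const createToyEventStore = () => {
  const streams = new Map<string, DomainEvent[]>();

  const readStream = async <E extends DomainEvent>(streamName: string) => {
    const events = (streams.get(streamName) ?? []) as E[];
    // Assumption: the stream version equals the number of recorded events.
    return { events, currentStreamVersion: BigInt(events.length) };
  };

  const appendToStream = async <E extends DomainEvent>(
    streamName: string,
    events: E[],
    options?: { expectedStreamVersion?: bigint },
  ) => {
    const current = streams.get(streamName) ?? [];
    const currentVersion = BigInt(current.length);
    const expected = options?.expectedStreamVersion;

    // Optimistic concurrency check: refuse if someone else appended in the meantime.
    if (expected !== undefined && expected !== currentVersion)
      throw new Error(`Expected version ${expected} but stream is at ${currentVersion}`);

    // Replacing the whole array in one step keeps the append all-or-nothing.
    streams.set(streamName, [...current, ...events]);
    return { nextExpectedStreamVersion: currentVersion + BigInt(events.length) };
  };

  const aggregateStream = async <State, E extends DomainEvent>(
    streamName: string,
    options: { evolve: (state: State, event: E) => State; initialState: () => State },
  ) => {
    const { events, currentStreamVersion } = await readStream<E>(streamName);
    const state = events.reduce<State>(options.evolve, options.initialState());
    return { state, currentStreamVersion };
  };

  return { readStream, appendToStream, aggregateStream };
};

const demo = async () => {
  const store = createToyEventStore();
  const stream = 'shopping_cart-1';

  const first = await store.appendToStream(stream, [
    { type: 'ProductItemAdded', data: { quantity: 2 } },
  ]);
  await store.appendToStream(
    stream,
    [{ type: 'ProductItemAdded', data: { quantity: 3 } }],
    { expectedStreamVersion: first.nextExpectedStreamVersion },
  );

  const { state, currentStreamVersion } = await store.aggregateStream<number, DomainEvent>(
    stream,
    {
      evolve: (total, event) => total + Number(event.data.quantity ?? 0),
      initialState: () => 0,
    },
  );

  // Appending with a stale expected version is rejected.
  const conflict = await store
    .appendToStream(stream, [{ type: 'ShoppingCartConfirmed', data: {} }], {
      expectedStreamVersion: 0n,
    })
    .then(() => false, () => true);

  return { state, currentStreamVersion, conflict };
};

demo().then((result) => console.log(result));
```

A real event store has to make the version check and the write atomic at the storage level; the single-threaded `Map` here only imitates that.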
Read more about how event stores are built in the article.
Command Handler
Event Sourcing brings a repeatable pattern for handling business logic. We can expand that to application logic.
Command Handling can be described by the following steps:
- Read events from the stream and build the state from them (in other words, aggregate the stream). Also get the current version of the stream.
- Run the business logic using the command and the state. Use the default (initial) state if the stream does not exist.
- Append the result of the business logic (so events) at the end of the stream from which you've read events. Use the read version (or the one provided by the user) for an optimistic concurrency check.
In pseudo-code, this could look as follows:
const { state, expectedStreamVersion } = await eventStore.aggregateStream(
  streamName,
  {
    evolve,
    initialState,
  },
);

// Run the business logic; the events it returns are what gets appended.
const events = handle(command, state);

await eventStore.appendToStream(streamName, events, { expectedStreamVersion });
That looks quite simple, but generalising it and making it robust requires some experience. That's why you have Emmett: the intention is to cut the learning curve for you and help you with basic abstractions.
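To give a feel for what generalising those steps involves, here is a minimal runnable sketch of a reusable handler over a toy in-memory store. It is only an illustration of the read, decide, append loop and not how Emmett implements it: `ToyStore`, `createToyStore`, `createHandler` and `DomainEvent` are hypothetical names, and the stream version is assumed to be the number of events read.

```typescript
// Simplified local event shape; named DomainEvent to avoid the DOM `Event` global.
type DomainEvent = Readonly<{ type: string; data: Record<string, unknown> }>;

// Minimal store surface assumed by this sketch (not Emmett's EventStore interface).
type ToyStore = {
  read: (streamName: string) => Promise<DomainEvent[]>;
  append: (
    streamName: string,
    events: DomainEvent[],
    expectedVersion: bigint,
  ) => Promise<bigint>;
};

const createToyStore = (): ToyStore => {
  const streams = new Map<string, DomainEvent[]>();
  return {
    read: async (name) => streams.get(name) ?? [],
    append: async (name, events, expectedVersion) => {
      const current = streams.get(name) ?? [];
      if (BigInt(current.length) !== expectedVersion) throw new Error('Version conflict');
      streams.set(name, [...current, ...events]);
      return BigInt(current.length + events.length);
    },
  };
};

// Generic handler: configured once per stream type, reused for every command.
const createHandler =
  <State>(options: {
    evolve: (state: State, event: DomainEvent) => State;
    initialState: () => State;
  }) =>
  async (
    store: ToyStore,
    streamName: string,
    decide: (state: State) => DomainEvent | DomainEvent[],
  ) => {
    // 1. Read events and build the state; a missing stream yields the initial state.
    const recorded = await store.read(streamName);
    const state = recorded.reduce<State>(options.evolve, options.initialState());

    // 2. Run the business logic; it may throw to refuse the command.
    const result = decide(state);
    const newEvents = Array.isArray(result) ? result : [result];

    // 3. Append, using the version we read for the optimistic concurrency check.
    const nextExpectedStreamVersion = await store.append(
      streamName,
      newEvents,
      BigInt(recorded.length),
    );
    return { newEvents, nextExpectedStreamVersion };
  };

type Cart = { itemCount: number };

const handleCart = createHandler<Cart>({
  evolve: (state, event) =>
    event.type === 'ProductItemAdded' ? { itemCount: state.itemCount + 1 } : state,
  initialState: () => ({ itemCount: 0 }),
});

const demo = async () => {
  const store = createToyStore();
  // Hypothetical rule: a cart holds at most two items.
  const addItem = (state: Cart): DomainEvent => {
    if (state.itemCount >= 2) throw new Error('Cart is full');
    return { type: 'ProductItemAdded', data: { productId: 'shoes' } };
  };

  await handleCart(store, 'shopping_cart-1', addItem);
  const second = await handleCart(store, 'shopping_cart-1', addItem);
  const refused = await handleCart(store, 'shopping_cart-1', addItem).then(
    () => false,
    () => true,
  );
  return { version: second.nextExpectedStreamVersion, refused };
};

demo().then((result) => console.log(result));
```

The sketch leaves out what makes a production handler robust, such as retries on version conflicts and richer error types.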
You can use the CommandHandler method to set up a command handler for you:
import { CommandHandler } from '@event-driven-io/emmett';
import { evolve, initialState } from './shoppingCart';
export const handle = CommandHandler({ evolve, initialState });
Such handlers should be defined per stream type (e.g., one for Shopping Cart, another for Orders, etc.). They can be used later in the application code as:
import { getInMemoryEventStore } from '@event-driven-io/emmett';

const eventStore = getInMemoryEventStore();

// shoppingCartId and productItem are assumed to come from the incoming request.
const command: AddProductItemToShoppingCart = {
  type: 'AddProductItemToShoppingCart',
  data: {
    shoppingCartId,
    productItem,
  },
};

// `handle` is the command handler created with CommandHandler above.
const { nextExpectedStreamVersion } = await handle(
  eventStore,
  shoppingCartId,
  (state) => addProductItem(command, state),
);