Event Store
Emmett is an Event Sourcing framework, so we need an event store to store events, aye? Event stores are key-value databases. The key is a record id, and the value is an ordered list of events. Such a sequence of events is called Event Stream. One stream keeps all events recorded for a particular business process or entity.
The essential difference between Event Sourcing and Event Streaming is that in Event Sourcing, events are the state. There's no other state. We use recorded events to get the state and make the next decisions, resulting in more events. Plus, as you'd expect from the database, we get strong consistency on writes and reads. Read more in article.
Emmett provides a lightweight abstraction for event stores. We don't intend to provide the lowest common denominator but streamline the typical usage patterns. It's OK if you use your preferred event store or client for the cases where those parts do not suffice your needs. Still, what's there should take you far enough.
Usage
Here is the general definition of it:
export interface EventStore<
ReadEventMetadataType extends AnyReadEventMetadata = AnyReadEventMetadata,
> {
aggregateStream<State, EventType extends Event>(
streamName: string,
options: AggregateStreamOptions<State, EventType, ReadEventMetadataType>,
): Promise<
AggregateStreamResult<
State,
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>
>;
readStream<EventType extends Event>(
streamName: string,
options?: ReadStreamOptions<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>,
): Promise<ReadStreamResult<EventType, ReadEventMetadataType>>;
appendToStream<EventType extends Event>(
streamName: string,
events: EventType[],
options?: AppendToStreamOptions<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>,
): Promise<
AppendToStreamResult<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>
>;
// streamEvents(): ReadableStream<
// ReadEvent<Event, ReadEventMetadataType> | GlobalSubscriptionEvent
// >;
}
export type EventStoreReadEventMetadata<Store extends EventStore> =
Store extends EventStore<infer T>
? T extends CommonReadEventMetadata<infer SP>
? T extends WithGlobalPosition<infer GP>
? ReadEventMetadata<GP, SP> & T
: ReadEventMetadata<undefined, SP> & T
: never
: never;
export type GlobalPositionTypeOfEventStore<Store extends EventStore> =
GlobalPositionTypeOfReadEventMetadata<EventStoreReadEventMetadata<Store>>;
export type StreamPositionTypeOfEventStore<Store extends EventStore> =
StreamPositionTypeOfReadEventMetadata<EventStoreReadEventMetadata<Store>>;
export type EventStoreSession<EventStoreType extends EventStore> = {
eventStore: EventStoreType;
close: () => Promise<void>;
};
export interface EventStoreSessionFactory<EventStoreType extends EventStore> {
withSession<T = unknown>(
callback: (session: EventStoreSession<EventStoreType>) => Promise<T>,
): Promise<T>;
}
It brings you three most important methods:
readStream
- reads events for the specific stream. By default, it reads all events, but through options, you can specify the event range you want to get (from
,to
,maxCount
). You can also specify the expected stream version.appendToStream
- appends new events at the end of the stream. All events should be appended as an atomic operation. You can specify the expected stream version for an optimistic concurrency check. We're also getting the next stream version as a result.aggregateStream
- builds the current state from events. Internally, event store implementation should read all events in the stream based on the passed initial state and theevolve
function. It also supports all the same options as thereadStream
method.
Definition
export interface EventStore<
ReadEventMetadataType extends AnyReadEventMetadata = AnyReadEventMetadata,
> {
aggregateStream<State, EventType extends Event>(
streamName: string,
options: AggregateStreamOptions<State, EventType, ReadEventMetadataType>,
): Promise<
AggregateStreamResult<
State,
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>
>;
readStream<EventType extends Event>(
streamName: string,
options?: ReadStreamOptions<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>,
): Promise<ReadStreamResult<EventType, ReadEventMetadataType>>;
appendToStream<EventType extends Event>(
streamName: string,
events: EventType[],
options?: AppendToStreamOptions<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>,
): Promise<
AppendToStreamResult<
StreamPositionTypeOfReadEventMetadata<ReadEventMetadataType>
>
>;
// streamEvents(): ReadableStream<
// ReadEvent<Event, ReadEventMetadataType> | GlobalSubscriptionEvent
// >;
}
export type EventStoreReadEventMetadata<Store extends EventStore> =
Store extends EventStore<infer T>
? T extends CommonReadEventMetadata<infer SP>
? T extends WithGlobalPosition<infer GP>
? ReadEventMetadata<GP, SP> & T
: ReadEventMetadata<undefined, SP> & T
: never
: never;
export type GlobalPositionTypeOfEventStore<Store extends EventStore> =
GlobalPositionTypeOfReadEventMetadata<EventStoreReadEventMetadata<Store>>;
export type StreamPositionTypeOfEventStore<Store extends EventStore> =
StreamPositionTypeOfReadEventMetadata<EventStoreReadEventMetadata<Store>>;
export type EventStoreSession<EventStoreType extends EventStore> = {
eventStore: EventStoreType;
close: () => Promise<void>;
};
export interface EventStoreSessionFactory<EventStoreType extends EventStore> {
withSession<T = unknown>(
callback: (session: EventStoreSession<EventStoreType>) => Promise<T>,
): Promise<T>;
}
See also
Read more about how event stores are built in the article.