Event Store
WARNING
We created this page with the help of the GenAI tool.
We're currently double-checking it to ensure the information is 100% correct and free of hallucinations.
The event store is the persistence layer for Event Sourcing. It stores events in append-only streams.
Overview
Event stores are fundamentally key-value databases where:
- Key = Stream name (e.g.,
shopping_cart-123) - Value = Ordered list of events
Each stream represents a single aggregate's history. Events are appended atomically and can never be modified or deleted.
Read more: Event stores are key-value databases
Interface
interface EventStore<ReadEventMetadataType extends AnyReadEventMetadata> {
readStream<EventType extends Event>(
streamName: string,
options?: ReadStreamOptions,
): Promise<ReadStreamResult<EventType, ReadEventMetadataType>>;
appendToStream<EventType extends Event>(
streamName: string,
events: EventType[],
options?: AppendToStreamOptions,
): Promise<AppendToStreamResult>;
aggregateStream<State, EventType extends Event>(
streamName: string,
options: AggregateStreamOptions<State, EventType, ReadEventMetadataType>,
): Promise<AggregateStreamResult<State>>;
}Core Methods
readStream
Reads events from a stream.
const result =
await eventStore.readStream<ShoppingCartEvent>('shopping_cart-123');
console.log(result.events); // Array of events
console.log(result.currentStreamVersion); // Current stream version (bigint)
console.log(result.streamExists); // true if stream existsReading Options
// Read from specific position
const { events } = await eventStore.readStream('stream-1', {
from: 5n, // Start from position 5
});
// Read up to specific position
const { events } = await eventStore.readStream('stream-1', {
to: 10n, // Read up to position 10
});
// Read with limit
const { events } = await eventStore.readStream('stream-1', {
from: 0n,
maxCount: 100n, // Max 100 events
});
// With expected version (for optimistic concurrency)
const { events } = await eventStore.readStream('stream-1', {
expectedStreamVersion: 5n, // Throws if version doesn't match
});appendToStream
Appends events to the end of a stream.
const result = await eventStore.appendToStream<ShoppingCartEvent>(
'shopping_cart-123',
[
{
type: 'ProductItemAdded',
data: { productId: 'shoes-1', quantity: 2, price: 99.99 },
},
],
);
console.log(result.nextExpectedStreamVersion); // Version after append (bigint)
console.log(result.createdNewStream); // true if stream was createdOptimistic Concurrency
// Require specific version before appending
const result = await eventStore.appendToStream(
'shopping_cart-123',
[newEvent],
{ expectedStreamVersion: 5n },
);
// Throws ConcurrencyError if current version !== 5n
// Require stream doesn't exist
await eventStore.appendToStream('new-stream', [firstEvent], {
expectedStreamVersion: 'no_stream',
});
// Require stream exists
await eventStore.appendToStream('existing-stream', [event], {
expectedStreamVersion: 'stream_exists',
});aggregateStream
Rebuilds state from events using an evolve function.
interface ShoppingCart {
items: ProductItem[];
status: 'Open' | 'Confirmed' | 'Cancelled';
}
const { state, currentStreamVersion } = await eventStore.aggregateStream<
ShoppingCart,
ShoppingCartEvent
>('shopping_cart-123', {
evolve: (state, event) => {
switch (event.type) {
case 'ProductItemAdded':
return { ...state, items: [...state.items, event.data] };
case 'ShoppingCartConfirmed':
return { ...state, status: 'Confirmed' };
default:
return state;
}
},
initialState: () => ({ items: [], status: 'Open' }),
});Result Types
ReadStreamResult
type ReadStreamResult<EventType, MetadataType> = {
events: ReadEvent<EventType, MetadataType>[];
currentStreamVersion: bigint;
streamExists: boolean;
};AppendToStreamResult
type AppendToStreamResult = {
nextExpectedStreamVersion: bigint;
createdNewStream: boolean;
};
// Some stores include global position
type AppendToStreamResultWithGlobalPosition = AppendToStreamResult & {
lastEventGlobalPosition: bigint;
};AggregateStreamResult
type AggregateStreamResult<State> = {
state: State;
currentStreamVersion: bigint;
streamExists: boolean;
};Expected Stream Version
Control concurrency with expected versions:
| Value | Meaning |
|---|---|
bigint (e.g., 5n) | Exact version required |
'no_stream' | Stream must not exist |
'stream_exists' | Stream must exist (any version) |
import { ConcurrencyError } from '@event-driven-io/emmett';
try {
await eventStore.appendToStream('cart-123', [event], {
expectedStreamVersion: 5n,
});
} catch (error) {
if (error instanceof ConcurrencyError) {
console.log(`Expected: ${error.expected}, Actual: ${error.actual}`);
// Retry with fresh state
}
}Event Store Implementations
Emmett provides multiple implementations:
| Implementation | Package | Use Case |
|---|---|---|
| In-Memory | @event-driven-io/emmett | Testing |
| PostgreSQL | @event-driven-io/emmett-postgresql | Production |
| EventStoreDB | @event-driven-io/emmett-esdb | Production |
| MongoDB | @event-driven-io/emmett-mongodb | Production |
| SQLite | @event-driven-io/emmett-sqlite | Development |
In-Memory (Testing)
import { getInMemoryEventStore } from '@event-driven-io/emmett';
const eventStore = getInMemoryEventStore();PostgreSQL
import { getPostgreSQLEventStore } from '@event-driven-io/emmett-postgresql';
const eventStore = getPostgreSQLEventStore(connectionString);EventStoreDB
import { getEventStoreDBEventStore } from '@event-driven-io/emmett-esdb';
const eventStore = getEventStoreDBEventStore(client);Session Factory
For transaction management:
interface EventStoreSessionFactory<Store extends EventStore> {
withSession<T>(
callback: (session: EventStoreSession<Store>) => Promise<T>,
): Promise<T>;
}
// Usage
await sessionFactory.withSession(async (session) => {
await session.eventStore.appendToStream('stream-1', [event1]);
await session.eventStore.appendToStream('stream-2', [event2]);
// Both appends are part of the same transaction
});Hooks
Configure behavior after commits:
const eventStore = getPostgreSQLEventStore(connectionString, {
hooks: {
onAfterCommit: async ({ events, streamName }) => {
// Called after successful append
// Warning: May not be called if process crashes
await notifyExternalSystem(events);
},
},
});WARNING
onAfterCommit is called exactly once if append succeeds, but:
- If the hook fails, the append still succeeds
- If process crashes after commit but before hook, delivery is not retried
- Race conditions may cause ordering issues under high concurrency
Best Practices
1. Use Meaningful Stream Names
// ✅ Good: Type + ID pattern
const streamName = `shopping_cart-${cartId}`;
const streamName = `user-${userId}`;
// ❌ Bad: Just ID
const streamName = cartId;2. Always Handle Concurrency
// ✅ Good: Handle version conflicts
try {
await eventStore.appendToStream(streamName, events, {
expectedStreamVersion: currentVersion,
});
} catch (error) {
if (error instanceof ConcurrencyError) {
// Reload and retry
}
}
// ❌ Bad: Ignore concurrency
await eventStore.appendToStream(streamName, events);3. Use aggregateStream for Commands
// ✅ Good: Aggregate then decide
const { state } = await eventStore.aggregateStream(streamName, {
evolve,
initialState,
});
const newEvents = decide(command, state);
await eventStore.appendToStream(streamName, newEvents);Type Source
export interface EventStore<
ReadEventMetadataType extends AnyReadEventMetadata = AnyReadEventMetadata,
> {
aggregateStream<
State,
EventType extends Event,
EventPayloadType extends Event = EventType,
>(
streamName: string,
options: AggregateStreamOptions<
State,
EventType,
ReadEventMetadataType,
EventPayloadType
>,
): Promise<AggregateStreamResult<State>>;
readStream<
EventType extends Event,
EventPayloadType extends Event = EventType,
>(
streamName: string,
options?: ReadStreamOptions<EventType, EventPayloadType>,
): Promise<ReadStreamResult<EventType, ReadEventMetadataType>>;
appendToStream<
EventType extends Event,
EventPayloadType extends Event = EventType,
>(
streamName: string,
events: EventType[],
options?: AppendToStreamOptions<EventType, EventPayloadType>,
): Promise<AppendToStreamResult>;
streamExists(streamName: string): Promise<StreamExistsResult>;
// streamEvents(): ReadableStream<
// ReadEvent<Event, ReadEventMetadataType> | GlobalSubscriptionEvent
// >;
}
export type EventStoreReadEventMetadata<Store extends EventStore> =
Store extends EventStore<infer T>
? T extends CommonReadEventMetadata
? T extends WithGlobalPosition
? ReadEventMetadata<true> & T
: ReadEventMetadata<undefined> & T
: never
: never;
export type EventStoreSession<EventStoreType extends EventStore> = {
eventStore: EventStoreType;
close: () => Promise<void>;
};
export interface EventStoreSessionFactory<EventStoreType extends EventStore> {
withSession<T = unknown>(
callback: (session: EventStoreSession<EventStoreType>) => Promise<T>,
): Promise<T>;
}See Also
- Choosing an Event Store
- PostgreSQL Event Store
- EventStoreDB
- Command Handler - Uses event store internally
- Let's build an Event Store in one hour!
