
Event Stores

WARNING

We created this page with the help of a GenAI tool.

We're currently double-checking it to ensure the information is 100% correct and free of hallucinations.

Emmett provides multiple event store implementations to fit your infrastructure and requirements.

Overview

All event stores implement the same EventStore interface, making it easy to switch between them:

typescript
interface EventStore {
  // Append events to a stream
  appendToStream(
    streamName: string,
    events: Event[],
    options?: AppendOptions,
  ): Promise<AppendResult>;

  // Read events from a stream
  readStream(streamName: string, options?: ReadOptions): Promise<ReadResult>;

  // Aggregate events into state
  aggregateStream(
    streamName: string,
    options: AggregateOptions,
  ): Promise<AggregateResult>;
}
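
Because application code only needs these methods, it can be written against the interface and handed any store. The sketch below illustrates the idea with simplified local types and a toy Map-backed store. `ToyEvent`, `ToyEventStore`, `createToyStore`, and `addProductItem` are hypothetical names used for illustration and are not part of Emmett; the real interface is richer (options, versions, metadata).

```typescript
// Simplified stand-ins for the Emmett types (assumptions for illustration only).
type ToyEvent = { type: string; data: Record<string, unknown> };

interface ToyEventStore {
  appendToStream(streamName: string, events: ToyEvent[]): Promise<void>;
  readStream(streamName: string): Promise<{ events: ToyEvent[] }>;
}

// Toy store backed by a Map; a real store would persist to a database.
function createToyStore(): ToyEventStore {
  const streams = new Map<string, ToyEvent[]>();
  return {
    async appendToStream(streamName, events) {
      const existing = streams.get(streamName) ?? [];
      streams.set(streamName, [...existing, ...events]);
    },
    async readStream(streamName) {
      return { events: [...(streams.get(streamName) ?? [])] };
    },
  };
}

// Application code depends only on the interface, so any store can be passed in.
async function addProductItem(
  store: ToyEventStore,
  cartId: string,
  productId: string,
): Promise<number> {
  const streamName = `shopping_cart-${cartId}`;
  await store.appendToStream(streamName, [
    { type: 'ProductItemAdded', data: { productId, quantity: 1 } },
  ]);
  const { events } = await store.readStream(streamName);
  return events.length; // number of events now in the stream
}
```

Here `addProductItem` never names a concrete store, which is the property that makes switching implementations a matter of changing only the line that constructs the store.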

Quick Comparison

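| Store | Best For | Persistence | Transactions | Scaling |
|---|---|---|---|---|
| PostgreSQL | Production apps | Yes | Full ACID | Horizontal |
| EventStoreDB | Native event sourcing | Yes | Stream-level | Cluster |
| MongoDB | Document-centric | Yes | Document-level | Horizontal |
| SQLite | Dev/embedded | Yes | Full ACID | Single node |
| In-Memory | Testing | No | N/A | N/A |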

Installation

Each event store is a separate package:

bash
# PostgreSQL
npm install @event-driven-io/emmett-postgresql
bash
# EventStoreDB
npm install @event-driven-io/emmett-esdb
bash
# MongoDB
npm install @event-driven-io/emmett-mongodb
bash
# SQLite
npm install @event-driven-io/emmett-sqlite
bash
# In-memory (included in the core package)
npm install @event-driven-io/emmett

Quick Setup

typescript
// PostgreSQL
import { getPostgreSQLEventStore } from '@event-driven-io/emmett-postgresql';

const eventStore = getPostgreSQLEventStore(
  'postgresql://user:password@localhost:5432/mydb',
);
typescript
// EventStoreDB
import { getEventStoreDBEventStore } from '@event-driven-io/emmett-esdb';
import { EventStoreDBClient } from '@eventstore/db-client';

const client = EventStoreDBClient.connectionString(
  'esdb://localhost:2113?tls=false',
);
const eventStore = getEventStoreDBEventStore(client);
typescript
// MongoDB
import { getMongoDBEventStore } from '@event-driven-io/emmett-mongodb';

const eventStore = getMongoDBEventStore({
  connectionString: 'mongodb://localhost:27017',
  database: 'events',
});
typescript
// SQLite
import { getSQLiteEventStore } from '@event-driven-io/emmett-sqlite';

const eventStore = getSQLiteEventStore('./events.db');
// or in-memory: getSQLiteEventStore(':memory:')
typescript
// In-memory
import { getInMemoryEventStore } from '@event-driven-io/emmett';

const eventStore = getInMemoryEventStore();

Common Operations

Appending Events

typescript
const result = await eventStore.appendToStream(
  'shopping_cart-123',
  [
    {
      type: 'ProductItemAdded',
      data: { productId: 'shoes', quantity: 1, price: 99 },
    },
    {
      type: 'ProductItemAdded',
      data: { productId: 'shirt', quantity: 2, price: 49 },
    },
  ],
  { expectedStreamVersion: 0n },
);

console.log(result.nextExpectedStreamVersion); // 2n

Reading Events

typescript
const { events, currentStreamVersion } =
  await eventStore.readStream('shopping_cart-123');

for (const event of events) {
  console.log(`${event.type}: ${JSON.stringify(event.data)}`);
}

Aggregating State

typescript
const { state, currentStreamVersion } = await eventStore.aggregateStream(
  'shopping_cart-123',
  {
    evolve: (state, event) => {
      switch (event.type) {
        case 'ProductItemAdded':
          return {
            ...state,
            items: [...state.items, event.data],
            total: state.total + event.data.price * event.data.quantity,
          };
        default:
          return state;
      }
    },
    initialState: () => ({ items: [], total: 0 }),
  },
);
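
Conceptually, aggregation is a left fold: start from `initialState()` and apply `evolve` to each event in stream order. The sketch below reproduces the cart example with a plain `reduce` and no store at all. The `Cart` and `ProductItemAdded` types and the `foldEvents` helper are hypothetical names for illustration; how each store loads the events is not shown.

```typescript
// Hypothetical types mirroring the shopping cart example above.
type ProductItemAdded = {
  type: 'ProductItemAdded';
  data: { productId: string; quantity: number; price: number };
};
type Cart = { items: ProductItemAdded['data'][]; total: number };

const initialState = (): Cart => ({ items: [], total: 0 });

const evolve = (state: Cart, event: ProductItemAdded): Cart => {
  switch (event.type) {
    case 'ProductItemAdded':
      return {
        items: [...state.items, event.data],
        total: state.total + event.data.price * event.data.quantity,
      };
    default:
      return state; // unknown event types leave the state untouched
  }
};

// Left fold over the events, oldest first (illustrative helper, not an Emmett API).
const foldEvents = (events: ProductItemAdded[]): Cart =>
  events.reduce(evolve, initialState());

const cart = foldEvents([
  { type: 'ProductItemAdded', data: { productId: 'shoes', quantity: 1, price: 99 } },
  { type: 'ProductItemAdded', data: { productId: 'shirt', quantity: 2, price: 49 } },
]);

console.log(cart.total); // 197
```

Keeping `evolve` a pure function of state and event means it can be unit-tested this way, without any event store.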

Optimistic Concurrency

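All event stores support optimistic concurrency control through the expectedStreamVersion append option. The append succeeds only if the stream's current state matches the expectation: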

typescript
import {
  NO_CONCURRENCY_CHECK,
  STREAM_DOES_NOT_EXIST,
  STREAM_EXISTS,
} from '@event-driven-io/emmett';

// Require stream to not exist (create new)
await eventStore.appendToStream(streamName, events, {
  expectedStreamVersion: STREAM_DOES_NOT_EXIST,
});

// Require stream to exist (update)
await eventStore.appendToStream(streamName, events, {
  expectedStreamVersion: STREAM_EXISTS,
});

// Require specific version
await eventStore.appendToStream(streamName, events, {
  expectedStreamVersion: 5n,
});

// Skip concurrency check
await eventStore.appendToStream(streamName, events, {
  expectedStreamVersion: NO_CONCURRENCY_CHECK,
});
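
To make the four options concrete, the sketch below shows one way an expected version could be compared with a stream's current version before an append. It is a simplified model and not Emmett's implementation: the string constants are local stand-ins for the exported ones, and `versionMatches`, `EMPTY_STREAM_VERSION`, and the convention that version 0 means "no events yet" are assumptions made for this example.

```typescript
// Local stand-ins for the constants exported by Emmett (values are assumptions).
const STREAM_DOES_NOT_EXIST = 'STREAM_DOES_NOT_EXIST' as const;
const STREAM_EXISTS = 'STREAM_EXISTS' as const;
const NO_CONCURRENCY_CHECK = 'NO_CONCURRENCY_CHECK' as const;

type ExpectedVersion =
  | bigint
  | typeof STREAM_DOES_NOT_EXIST
  | typeof STREAM_EXISTS
  | typeof NO_CONCURRENCY_CHECK;

// Sketch convention: a stream with no events is at version 0.
const EMPTY_STREAM_VERSION = BigInt(0);

// Returns true when an append with `expected` should be allowed.
function versionMatches(current: bigint, expected: ExpectedVersion): boolean {
  if (expected === NO_CONCURRENCY_CHECK) return true;
  if (expected === STREAM_DOES_NOT_EXIST) return current === EMPTY_STREAM_VERSION;
  if (expected === STREAM_EXISTS) return current !== EMPTY_STREAM_VERSION;
  return current === expected; // a specific version must match exactly
}

// A writer that read the stream at version 5 conflicts once someone else appends.
console.log(versionMatches(BigInt(5), BigInt(5))); // true
console.log(versionMatches(BigInt(6), BigInt(5))); // false
```

A real store has to perform this kind of check atomically with the write, so that two writers who both expect the same version cannot both succeed.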

Choosing an Event Store

See Choosing an Event Store for detailed guidance on selecting the right event store for your project.

Detailed Documentation