Read Models
In Event Sourcing, rebuilding state from events works well for a single entity (a shopping cart might have 10-50 events). But showing a list of all shopping carts? You'd need to read events from thousands of streams and rebuild each cart in memory on every page load.
Read models solve this. They are queryable representations of your data, stored in the database of your choice and shaped for specific questions your application needs to answer. A projection is the event handler that builds a read model. It takes events and transforms them into the stored result. Each projection is a different interpretation of the same facts: a shopping cart summary for the menu bar, a client analytics dashboard, a product sales report. Same events, different read models shaped for different questions.
This guide shows you how to build projections and query the read models they produce.
Project Events into Read Models
From a Single Stream
One event stream maps to one document. The document ID equals the stream ID.
Use when: your read model represents a single entity, like a shopping cart summary, an order status, or a user profile.
import { pongoSingleStreamProjection } from '@event-driven-io/emmett-postgresql';
type ShoppingCartSummary = {
_id: string;
productItemsCount: number;
totalAmount: number;
};
const cartSummaryProjection = pongoSingleStreamProjection({
collectionName: 'shopping_cart_summaries',
canHandle: [
'ProductItemAddedToShoppingCart',
'ProductItemRemovedFromShoppingCart',
],
evolve: (document: ShoppingCartSummary | null, event: ShoppingCartEvent) => {
const current = document ?? {
_id: event.data.shoppingCartId,
productItemsCount: 0,
totalAmount: 0,
};
switch (event.type) {
case 'ProductItemAddedToShoppingCart':
return {
...current,
productItemsCount:
current.productItemsCount + event.data.productItem.quantity,
totalAmount:
current.totalAmount +
event.data.productItem.price * event.data.productItem.quantity,
};
case 'ProductItemRemovedFromShoppingCart':
return {
...current,
productItemsCount:
current.productItemsCount - event.data.productItem.quantity,
totalAmount:
current.totalAmount -
event.data.productItem.price * event.data.productItem.quantity,
};
}
},
});From Multiple Streams
Events from multiple streams combine into documents with custom IDs.
Use when: your read model aggregates across entities, like a client's total spending across all their carts, product statistics from all orders, or system-wide dashboards.
import { pongoMultiStreamProjection } from '@event-driven-io/emmett-postgresql';
type ClientShoppingSummary = {
_id: string;
clientId: string;
totalOrders: number;
totalSpent: number;
lastOrderDate: Date | null;
};
const clientSummaryProjection = pongoMultiStreamProjection({
collectionName: 'client_summaries',
canHandle: ['ShoppingCartConfirmed'],
// Extract document ID from event metadata
getDocumentId: (event) => event.metadata.clientId,
evolve: (
document: ClientShoppingSummary | null,
event: ShoppingCartConfirmed,
) => {
const current = document ?? {
_id: event.metadata.clientId,
clientId: event.metadata.clientId,
totalOrders: 0,
totalSpent: 0,
lastOrderDate: null,
};
return {
...current,
totalOrders: current.totalOrders + 1,
totalSpent: current.totalSpent + event.data.totalAmount,
lastOrderDate: event.data.confirmedAt,
};
},
});Inline vs. Async Registration
Inline Projections
Use inline when consistency matters more than write speed. Inline projections run in the same database transaction as the event append. Either both succeed or both fail, so your read model is always consistent with your events. For single-stream projections this is often the right default.
Be careful with inline multi-stream projections under high write load. When multiple streams update the same document concurrently, they can overwrite each other's changes. If that's a concern, switch to async.
import { projections } from '@event-driven-io/emmett';
import {
getPostgreSQLEventStore,
pongoMultiStreamProjection,
pongoSingleStreamProjection,
} from '@event-driven-io/emmett-postgresql';
const eventStore = getPostgreSQLEventStore(connectionString, {
// register inline projection
projections: projections.inline([cartSummaryProjection]),
});Async Projections
Use async when you need faster appends, when you're projecting to external systems, or when you have multi-stream projections that would suffer from concurrent write conflicts. Async projections process events in a background process, decoupled from the append. The tradeoff is eventual consistency: your read model may lag behind by a short window.
// create event store events consumer
const consumer = eventStore.consumer();
// register async projection
consumer.projector({ projection: clientSummaryProjection });
// start consuming events and projecting
await consumer.start();Common Patterns
Provide a Default State
If you'd rather not deal with null in your evolve function, provide an initialState. Emmett will use it when the document doesn't exist yet:
const projectionWithDefault = pongoMultiStreamProjection({
collectionName: 'client_summaries',
canHandle: ['ShoppingCartConfirmed'],
getDocumentId: (event) => event.metadata.clientId,
// Provide initial state for the document,
// so document is never null in evolve function
initialState: () => ({
_id: 'unknown',
clientId: 'unknown',
totalOrders: 0,
totalSpent: 0,
lastOrderDate: new Date(0),
}),
// Look Ma, no nulls! Initial state is used when document doesn't exist,
// so document is never null in evolve function
evolve: (document: ClientShoppingSummary, event: ShoppingCartConfirmed) => {
return {
...document,
_id: event.metadata.clientId,
clientId: event.metadata.clientId,
totalOrders: document.totalOrders + 1,
totalSpent: document.totalSpent + event.data.totalAmount,
lastOrderDate: event.data.confirmedAt,
};
},
});This works for both single-stream and multi-stream projections.
Delete a Document
Return null from evolve to delete the document. This is useful when a process completes and the read model should be cleared. For example, removing a pending cart summary after confirmation, so the next shopping session starts fresh:
const evolve = (
document: ShoppingCartShortInfo,
{ type, data: event }: ShoppingCartEvent,
): ShoppingCartShortInfo | null => {
switch (type) {
case 'ProductItemAddedToShoppingCart':
return {
totalAmount:
document.totalAmount +
event.productItem.unitPrice * event.productItem.quantity,
productItemsCount:
document.productItemsCount + event.productItem.quantity,
};
case 'ProductItemRemovedFromShoppingCart':
return {
totalAmount:
document.totalAmount -
event.productItem.unitPrice * event.productItem.quantity,
productItemsCount:
document.productItemsCount - event.productItem.quantity,
};
case 'ShoppingCartConfirmed':
case 'ShoppingCartCancelled':
// Delete the pending cart document
return null;
default:
return document;
}
};Filter Events
Your projection doesn't need to handle every event type in the stream. List the event types you care about in canHandle. Everything else is silently ignored. A shopping cart summary only needs product additions and removals; it doesn't need to know about confirmation or cancellation:
const shoppingCartShortInfoProjection = pongoSingleStreamProjection({
collectionName: 'shoppingCartShortInfo',
// Only events listed here will reach the evolve function.
// All other event types in the stream are ignored.
canHandle: [
'ProductItemAddedToShoppingCart',
'ProductItemRemovedFromShoppingCart',
'ShoppingCartConfirmed',
'ShoppingCartCancelled',
],
evolve: (
document: ShoppingCartShortInfo,
{ type, data: event }: ShoppingCartEvent,
): ShoppingCartShortInfo => {
switch (type) {
case 'ProductItemAddedToShoppingCart':
return {
totalAmount:
document.totalAmount +
event.productItem.unitPrice * event.productItem.quantity,
productItemsCount:
document.productItemsCount + event.productItem.quantity,
};
case 'ProductItemRemovedFromShoppingCart':
return {
totalAmount:
document.totalAmount -
event.productItem.unitPrice * event.productItem.quantity,
productItemsCount:
document.productItemsCount - event.productItem.quantity,
};
default:
return document;
}
},
initialState: () => ({ productItemsCount: 0, totalAmount: 0 }),
});Use Event Metadata
When building a multi-stream projection, you need a way to correlate events from different streams into the right document. If your read model groups data by client, but the client ID isn't in every event's data payload, you can pull it from event metadata. Be careful not to turn metadata into a bag for random data. Context like client ID, tenant, or correlation ID is a reasonable fit, especially if it's already available in your request pipeline for authorisation or routing:
const clientShoppingSummaryProjection = pongoMultiStreamProjection({
collectionName: 'clientShoppingSummary',
// Use event metadata to route events from different streams
// into the same document, grouped by client
getDocumentId: (event) => event.metadata.clientId,
canHandle: ['ShoppingCartConfirmed'],
evolve: (
document: ClientShoppingSummary,
event: ShoppingCartConfirmed,
): ClientShoppingSummary => ({
...document,
clientId: event.metadata.clientId,
totalOrders: document.totalOrders + 1,
totalSpent: document.totalSpent + event.data.totalAmount,
}),
initialState: () => ({
clientId: '',
totalOrders: 0,
totalSpent: 0,
}),
});Query Read Models
With Pongo (PostgreSQL)
Projected read models are stored in Pongo collections, PostgreSQL tables with a JSONB column for your document data. Query them using the Pongo client with MongoDB-like syntax:
const getShortInfoById = (
db: PongoDb,
shoppingCartId: string,
): Promise<ShoppingCartShortInfo | null> =>
db
.collection<ShoppingCartShortInfo>(shoppingCartShortInfoCollectionName)
.findOne({ _id: shoppingCartId });
const getDetailsById = (
db: PongoDb,
shoppingCartId: string,
): Promise<ShoppingCartDetails | null> =>
db
.collection<ShoppingCartDetails>(shoppingCartDetailsCollectionName)
.findOne({ _id: shoppingCartId });In API Routes
Wire Pongo queries into your Express route handlers to serve the read models:
const shoppingCartApi =
(readStore: PongoDb): WebApiSetup =>
(router: Router) => {
router.get(
'/clients/:clientId/shopping-carts/current',
on(async (request: Request) => {
const shoppingCartId = `shopping_cart:${request.params.clientId}:current`;
const result = await getDetailsById(readStore, shoppingCartId);
if (result === null) return NotFound();
if (result.status !== 'Opened') return NotFound();
return OK({ body: result });
}),
);
router.get(
'/clients/:clientId/shopping-carts/current/short-info',
on(async (request: Request) => {
const shoppingCartId = `shopping_cart:${request.params.clientId}:current`;
const result = await getShortInfoById(readStore, shoppingCartId);
if (result === null) return NotFound();
return OK({ body: result });
}),
);
};Test a Projection
Projection tests should run against a real database. Both querying behaviour and JSON serialisation can surprise you, so in-memory fakes won't give you enough confidence. Use PostgreSQLProjectionSpec with a test container for BDD-style given/when/then tests:
void describe('Shopping Cart Short Info Projection', () => {
let postgres: StartedPostgreSqlContainer;
let given: PostgreSQLProjectionSpec<ProductItemAdded | DiscountApplied>;
let shoppingCartId: string;
beforeAll(async () => {
postgres = await getPostgreSQLStartedContainer();
given = PostgreSQLProjectionSpec.for({
projection: shoppingCartShortInfoProjection,
connectionString: postgres.getConnectionUri(),
});
});
beforeEach(() => (shoppingCartId = `shoppingCart:${uuid()}:${uuid()}`));
afterAll(async () => {
await postgres.stop();
});
void it('creates summary from first event', () =>
given([])
.when([
eventInStream(shoppingCartId, {
type: 'ProductItemAdded',
data: {
productItem: { price: 100, productId: 'shoes', quantity: 100 },
},
}),
])
.then(
expectPongoDocuments
.fromCollection<ShoppingCartShortInfo>(
shoppingCartShortInfoCollectionName,
)
.withId(shoppingCartId)
.toBeEqual({
productItemsCount: 100,
totalAmount: 10000,
appliedDiscounts: [],
}),
));
void it('accumulates across events', () => {
const couponId = uuid();
return given(
eventsInStream<ProductItemAdded>(shoppingCartId, [
{
type: 'ProductItemAdded',
data: {
productItem: { price: 100, productId: 'shoes', quantity: 100 },
},
},
]),
)
.when(
newEventsInStream(shoppingCartId, [
{
type: 'DiscountApplied',
data: { percent: 10, couponId },
},
]),
)
.then(
expectPongoDocuments
.fromCollection<ShoppingCartShortInfo>(
shoppingCartShortInfoCollectionName,
)
.withId(shoppingCartId)
.toBeEqual({
productItemsCount: 100,
totalAmount: 9000,
appliedDiscounts: [couponId],
}),
);
});
});Best Practices
Shape Read Models for Queries
Start from the query your UI or API needs to serve, then shape your read model to match. If you're showing a "best-selling products" list, your read model should have fields you can sort and filter directly: total quantity sold, revenue, last sold date.
We recommend starting with one read model, so one projection per query.
A shopping cart summary for the menu bar only needs item count and total amount. If you need cart details for a different view, create a separate projection. Multiple focused projections are easier to maintain and rebuild than one that tries to answer every question:
// Read model for the menu bar: just count and total
type CartSummary = {
productItemsCount: number;
totalAmount: number;
};
const cartSummaryProjection = pongoSingleStreamProjection({
collectionName: 'cart_summaries',
canHandle: [
'ProductItemAddedToShoppingCart',
'ProductItemRemovedFromShoppingCart',
],
evolve: (
document: CartSummary,
{ type, data: event }: ShoppingCartEvent,
): CartSummary => {
switch (type) {
case 'ProductItemAddedToShoppingCart':
return {
productItemsCount:
document.productItemsCount + event.productItem.quantity,
totalAmount:
document.totalAmount +
event.productItem.unitPrice * event.productItem.quantity,
};
case 'ProductItemRemovedFromShoppingCart':
return {
productItemsCount:
document.productItemsCount - event.productItem.quantity,
totalAmount:
document.totalAmount -
event.productItem.unitPrice * event.productItem.quantity,
};
default:
return document;
}
},
initialState: () => ({ productItemsCount: 0, totalAmount: 0 }),
});
// Read model for the cart detail page: full product list
type CartDetails = {
productItems: Map<
string,
{ productId: string; quantity: number; unitPrice: number }
>;
status: 'open' | 'confirmed';
confirmedAt: Date | null;
};
const cartDetailsProjection = pongoSingleStreamProjection({
collectionName: 'cart_details',
canHandle: [
'ProductItemAddedToShoppingCart',
'ProductItemRemovedFromShoppingCart',
'ShoppingCartConfirmed',
],
evolve: (
document: CartDetails,
{ type, data: event }: ShoppingCartEvent,
): CartDetails => {
switch (type) {
case 'ProductItemAddedToShoppingCart': {
const existing = document.productItems.get(event.productItem.productId);
document.productItems.set(event.productItem.productId, {
productId: event.productItem.productId,
unitPrice: event.productItem.unitPrice,
quantity: (existing?.quantity ?? 0) + event.productItem.quantity,
});
return document;
}
case 'ProductItemRemovedFromShoppingCart': {
const existing = document.productItems.get(event.productItem.productId);
if (existing) {
existing.quantity -= event.productItem.quantity;
if (existing.quantity <= 0) {
document.productItems.delete(event.productItem.productId);
}
}
return document;
}
case 'ShoppingCartConfirmed':
return {
...document,
status: 'confirmed',
confirmedAt: event.confirmedAt,
};
default:
return document;
}
},
initialState: () => ({
productItems: new Map(),
status: 'open' as const,
confirmedAt: null,
}),
});The whole point of a projection is to pre-shape data for the questions you'll ask. The CartSummary vs CartDetails example above shows this: the summary pre-computes count and total for the menu bar, while the details projection keeps the full product list for the cart page.
Version Your Projections
When your projection logic changes (new fields, different calculations, a bug fix), existing documents were built with the old logic. You can't just update the code. Set the version property and rebuild from events. Emmett appends it to the collection name automatically:
const cartSummariesV2 = pongoSingleStreamProjection({
collectionName: 'cart_summaries',
// Emmett appends the version to the collection name automatically,
// creating 'cart_summaries_v2' in the database
version: 2,
canHandle: [
'ProductItemAddedToShoppingCart',
'ProductItemRemovedFromShoppingCart',
],
evolve: (
document: ShoppingCartShortInfo,
{ type, data: event }: ShoppingCartEvent,
): ShoppingCartShortInfo => {
switch (type) {
case 'ProductItemAddedToShoppingCart':
return {
totalAmount:
document.totalAmount +
event.productItem.unitPrice * event.productItem.quantity,
productItemsCount:
document.productItemsCount + event.productItem.quantity,
};
case 'ProductItemRemovedFromShoppingCart':
return {
totalAmount:
document.totalAmount -
event.productItem.unitPrice * event.productItem.quantity,
productItemsCount:
document.productItemsCount - event.productItem.quantity,
};
default:
return document;
}
},
initialState: () => ({ productItemsCount: 0, totalAmount: 0 }),
});Rebuild From Events
Events are your source of truth; read models are derived data you can always rebuild. For production changes without downtime, use a blue-green approach:
- Deploy the new projection writing to a new collection alongside the old one
- Let it catch up by processing the event history
- Once it's current, switch your queries to the new collection
- Remove the old projection
If you can afford a brief window of incomplete data, the simpler path is to truncate the old collection and replay all events through the updated projection logic.
const consumer = rebuildPostgreSQLProjections({
connectionString,
projection,
});
await consumer.start();Troubleshooting
Projection Not Updating
If your read model isn't reflecting new events, the most common cause is a missing event type in canHandle. The projection silently ignores any event type not listed there. For multi-stream projections, also verify that getDocumentId returns the correct ID. If it pulls from metadata, make sure the metadata is actually being set when events are appended.
Stale Multi-Stream Data
If your multi-stream projection shows outdated data under concurrent writes, you're likely hitting write conflicts. When two streams both update the same document inline, the second write can overwrite the first. Switch the projection to async registration, which processes events sequentially and avoids this.
Inconsistent After Redeployment
If you changed your projection logic but existing documents still reflect the old calculations, you need to rebuild. Existing documents were created with the previous logic and won't update themselves. Version your collection name and replay from the event history. See Version Your Projections above.
