Patterns
Event-Driven Patterns
Patterns for asynchronous, event-driven business architectures
Event-driven architecture enables loosely coupled, scalable business systems.
Event Basics
Events are semantic notifications that something happened:
import $, { on, send } from 'sdk.do'
// Listen for events
on($.Order.created, async (order) => {
console.log('Order created:', order.orderNumber)
})
// Emit events
await send($.Order.created, {
$type: 'Order',
orderNumber: 'ORD-001',
customer: customer.$id,
totalPrice: 99.99,
})Pub-Sub Pattern
Multiple subscribers react to single event:
// Multiple handlers for same event
on($.Order.created, async (order) => {
// Handler 1: Process payment
await send($.Payment.process, { order })
})
on($.Order.created, async (order) => {
// Handler 2: Update analytics
await db.increment($.Analytics, 'totalOrders')
await db.increment($.Customer, order.customer.$id, {
orderCount: 1,
})
})
on($.Order.created, async (order) => {
// Handler 3: Notify warehouse
await send($.Warehouse.notifyNewOrder, { order })
})
on($.Order.created, async (order) => {
// Handler 4: Send confirmation
await send($.Email.send, {
to: order.customer.email,
template: 'order-created',
})
})Event Sourcing
Store state as sequence of events:
// @errors: 7006
// @strict: true
import { $, send, db } from 'sdk.do'
type Event = {
type: string
data: any
timestamp: Date
}
type Order = {
$id: string
status: string
payment?: any
shipment?: any
deliveredAt?: Date
}
type Payment = { id: string; amount: number }
declare const customer: any
declare const items: any
declare const payment: Payment
// Event store with full type safety
class OrderAggregate {
private events: Event[] = []
private state: Order
constructor(orderId: string) {
this.state = { $id: orderId, status: 'new' }
}
// Apply event to change state - TypeScript ensures type safety
private apply(event: Event) {
this.events.push(event)
switch (event.type) {
case 'OrderCreated':
this.state = { ...this.state, ...event.data, status: 'created' }
break
case 'PaymentProcessed':
this.state.status = 'paid'
this.state.payment = event.data
break
case 'OrderShipped':
this.state.status = 'shipped'
this.state.shipment = event.data
break
case 'OrderDelivered':
this.state.status = 'delivered'
this.state.deliveredAt = event.timestamp
break
}
}
// Commands create events - return type is inferred
async create(orderData: Partial<Order>) {
this.apply({
type: 'OrderCreated',
data: orderData,
timestamp: new Date(),
})
await send($.Order.created, this.state)
}
async processPayment(payment: Payment) {
this.apply({
type: 'PaymentProcessed',
data: payment,
timestamp: new Date(),
})
await send($.Payment.processed, { order: this.state, payment })
}
// Reconstruct state from events - fully typed reconstruction
static async load(orderId: string): Promise<OrderAggregate> {
const aggregate = new OrderAggregate(orderId)
// ^?
const events = await db.list($.Event, {
where: { aggregateId: orderId },
})
for (const event of events) {
aggregate.apply(event)
}
return aggregate
}
getCurrentState(): Order {
return this.state
// ^?
}
getEvents(): Event[] {
return this.events
}
}
// Usage - TypeScript tracks types through event sourcing
const order = new OrderAggregate('order-001')
await order.create({ customer, items, totalPrice: 99.99 })
await order.processPayment(payment)
const currentState = order.getCurrentState() // Type: Order
// ^?
const allEvents = order.getEvents() // Type: Event[]
// ^?CQRS (Command Query Responsibility Segregation)
Separate reads and writes:
// Write model (commands)
class OrderCommandService {
async createOrder(data: CreateOrderCommand) {
// Validate
if (!data.items?.length) {
throw new Error('Order must have items')
}
// Create order
const order = await $.Order.create({
$type: 'Order',
customer: data.customer,
orderedItem: data.items,
totalPrice: calculateTotal(data.items),
})
// Emit event
await send($.Order.created, order)
return order.$id
}
async cancelOrder(orderId: string, reason: string) {
const order = await db.get($.Order, orderId)
if (order.status === 'shipped') {
throw new Error('Cannot cancel shipped order')
}
await db.update($.Order, orderId, {
status: 'cancelled',
cancelReason: reason,
})
await send($.Order.cancelled, { order, reason })
}
}
// Read model (queries)
class OrderQueryService {
async getOrderById(orderId: string) {
return await db.get($.Order, orderId)
}
async getCustomerOrders(customerId: string) {
return await db.list($.Order, {
where: { customer: customerId },
sort: { orderDate: -1 },
})
}
async getOrderStats(timeRange: string) {
return await db.aggregate($.Order, {
where: { orderDate: { $gte: parseTimeRange(timeRange) } },
aggregations: {
totalOrders: { $count: '*' },
totalRevenue: { $sum: 'totalPrice' },
avgOrderValue: { $avg: 'totalPrice' },
},
})
}
// Denormalized read model
async getOrderWithDetails(orderId: string) {
const order = await db.get($.Order, orderId)
// Fetch related data in parallel
const [customer, items, shipment] = await Promise.all([
db.get($.Person, order.customer),
Promise.all(order.orderedItem.map((item) => db.get($.Product, item.orderItem))),
db.related(order, $.shippedAs, $.Shipment).then((s) => s[0]),
])
return {
order,
customer,
items,
shipment,
}
}
}
// Usage
const commands = new OrderCommandService()
const queries = new OrderQueryService()
// Write
const orderId = await commands.createOrder({
customer: customerId,
items: cartItems,
})
// Read
const order = await queries.getOrderById(orderId)
const customerOrders = await queries.getCustomerOrders(customerId)
const stats = await queries.getOrderStats('30d')Event Replay
Replay events to rebuild state or test changes:
// Replay all events for an entity
async function replayEvents(aggregateId: string) {
const events = await db.list($.Event, {
where: { aggregateId },
sort: { timestamp: 1 },
})
let state = {}
for (const event of events) {
state = applyEvent(state, event)
console.log(`After ${event.type}:`, state)
}
return state
}
// Replay events to specific point in time
async function replayToTimestamp(aggregateId: string, timestamp: Date) {
const events = await db.list($.Event, {
where: {
aggregateId,
timestamp: { $lte: timestamp },
},
sort: { timestamp: 1 },
})
let state = {}
for (const event of events) {
state = applyEvent(state, event)
}
return state
}
// Test business logic changes by replaying
async function testNewBusinessLogic(aggregateId: string) {
const events = await db.list($.Event, {
where: { aggregateId },
sort: { timestamp: 1 },
})
// Apply with old logic
const oldState = events.reduce((state, event) => {
return applyEventOldLogic(state, event)
}, {})
// Apply with new logic
const newState = events.reduce((state, event) => {
return applyEventNewLogic(state, event)
}, {})
// Compare results
console.log('Differences:', diff(oldState, newState))
}Event Versioning
Handle schema evolution:
// Event with version
interface OrderCreatedV1 {
version: 1
type: 'OrderCreated'
data: {
customer: string
items: string[]
total: number
}
}
interface OrderCreatedV2 {
version: 2
type: 'OrderCreated'
data: {
customer: string
orderedItem: OrderItem[] // More detailed
totalPrice: number // Renamed
tax: number // New field
}
}
// Upcaster: Convert old events to new format
function upcastEvent(event: OrderCreatedV1 | OrderCreatedV2): OrderCreatedV2 {
if (event.version === 2) {
return event
}
// Convert v1 to v2
return {
version: 2,
type: 'OrderCreated',
data: {
customer: event.data.customer,
orderedItem: event.data.items.map((id) => ({
orderItem: id,
orderQuantity: 1,
})),
totalPrice: event.data.total,
tax: event.data.total * 0.08, // Calculate tax
},
}
}
// Always work with latest version
on($.Order.created, async (event) => {
const latestEvent = upcastEvent(event)
await processOrder(latestEvent.data)
})Temporal Events
Events that are scheduled or recurring:
// Schedule future event
await send($.Email.send, {
to: customer.email,
template: 'renewal-reminder',
scheduledFor: addDays(subscription.renewalDate, -7),
})
// Recurring events
on($.Subscription.created, async (sub) => {
// Check subscription health daily
await send($.Task.schedule, {
action: 'check-subscription-health',
subscription: sub.$id,
recurring: 'daily',
until: sub.endDate,
})
})
// Cancel scheduled events
on($.Subscription.cancelled, async (sub) => {
await send($.Task.cancel, {
where: {
action: 'check-subscription-health',
subscription: sub.$id,
},
})
})Event Correlation
Link related events:
// Correlation ID tracks related events
const correlationId = generateId()
// Start workflow
await send($.Order.created, {
...order,
correlationId,
})
on($.Order.created, async (order) => {
// All subsequent events share correlation ID
await send($.Payment.process, {
order: order.$id,
correlationId: order.correlationId,
})
})
on($.Payment.processed, async (payment) => {
await send($.Inventory.decrement, {
items: payment.order.items,
correlationId: payment.correlationId,
})
})
// Query all events in a workflow
async function getWorkflowEvents(correlationId: string) {
return await db.list($.Event, {
where: { correlationId },
sort: { timestamp: 1 },
})
}
// Visualize workflow
const events = await getWorkflowEvents(correlationId)
console.log(events.map((e) => `${e.timestamp}: ${e.type}`))Event Filtering
Subscribe to specific events only:
// Filter by entity type
on(
$.Order.created,
{
where: {
customer: { membershipLevel: 'premium' },
},
},
async (order) => {
// Only premium orders
await processPremiumOrder(order)
}
)
// Filter by value
on(
$.Order.created,
{
where: {
totalPrice: { $gte: 1000 },
},
},
async (order) => {
// High-value orders only
await send($.Notification.send, {
to: $.Role.SalesManager,
message: `High-value order: ${order.orderNumber}`,
})
}
)
// Complex filters
on(
$.SupportTicket.created,
{
where: {
priority: { $in: ['high', 'urgent'] },
category: 'technical',
customer: {
plan: 'enterprise',
},
},
},
async (ticket) => {
// Enterprise technical tickets only
await send($.Ticket.escalate, { ticket })
}
)Event Aggregation
Combine multiple events:
// Batch processing
const orderBatch: Order[] = []
on($.Order.created, async (order) => {
orderBatch.push(order)
// Process every 100 orders or every 5 minutes
if (orderBatch.length >= 100 || shouldFlush()) {
await processOrderBatch(orderBatch)
orderBatch.length = 0
}
})
// Time-window aggregation
const windowedEvents = new Map<string, any[]>()
on($.Product.viewed, async (event) => {
const window = Math.floor(Date.now() / 60000) // 1-minute windows
const key = `${event.product.$id}-${window}`
if (!windowedEvents.has(key)) {
windowedEvents.set(key, [])
}
windowedEvents.get(key)!.push(event)
// Process window when it closes
setTimeout(async () => {
const events = windowedEvents.get(key)
if (events) {
await send($.Analytics.productViews, {
product: event.product,
viewCount: events.length,
window: new Date(window * 60000),
})
windowedEvents.delete(key)
}
}, 60000)
})Dead Letter Queue
Handle failed events:
const MAX_RETRIES = 3
on($.Order.created, async (order) => {
let attempts = 0
while (attempts < MAX_RETRIES) {
try {
await processOrder(order)
return
} catch (error) {
attempts++
if (attempts >= MAX_RETRIES) {
// Send to DLQ
await db.create($.DeadLetter, {
eventType: 'Order.created',
payload: order,
error: error.message,
attempts,
timestamp: new Date(),
})
// Alert operations
await send($.Alert.send, {
severity: 'high',
message: `Order processing failed after ${attempts} attempts`,
order: order.$id,
})
return
}
// Exponential backoff
await sleep(Math.pow(2, attempts) * 1000)
}
}
})
// Process DLQ periodically
setInterval(async () => {
const dlqItems = await db.list($.DeadLetter, {
where: { processed: false },
limit: 10,
})
for (const item of dlqItems) {
try {
// Retry processing
await processEvent(item.eventType, item.payload)
// Mark as processed
await db.update($.DeadLetter, item.$id, {
processed: true,
processedAt: new Date(),
})
} catch (error) {
console.error('DLQ item still failing:', item.$id)
}
}
}, 300000) // Every 5 minutesBest Practices
1. Event Naming
// Good: Past tense, specific
$.Order.created
$.Payment.processed
$.Shipment.delivered
// Avoid: Present tense
$.Order.create
$.Payment.process2. Event Payload
// Good: Include complete data
await send($.Order.created, {
$type: 'Order',
$id: order.$id,
orderNumber: order.orderNumber,
customer: order.customer,
items: order.orderedItem,
totalPrice: order.totalPrice,
timestamp: new Date(),
})
// Avoid: Just IDs
await send($.Order.created, {
orderId: '123',
})3. Idempotency
// Handle duplicate events
const processedEvents = new Set()
on($.Order.created, async (order) => {
const eventId = `${order.$id}-created`
if (processedEvents.has(eventId)) {
return // Already processed
}
await processOrder(order)
processedEvents.add(eventId)
})4. Event Schema
// Consistent event structure
interface DomainEvent<T> {
$id: string
$type: string
type: string // Event type (e.g., 'Order.created')
version: number
data: T
metadata: {
timestamp: Date
correlationId?: string
causationId?: string
userId?: string
}
}
// Usage
const event: DomainEvent<Order> = {
$id: generateId(),
$type: 'Event',
type: 'Order.created',
version: 1,
data: order,
metadata: {
timestamp: new Date(),
correlationId: correlationId,
userId: user.$id,
},
}Summary
Event-driven patterns enable:
- Pub-Sub - Multiple reactions to events
- Event Sourcing - State as event sequence
- CQRS - Separate reads/writes
- Temporal Events - Scheduled/recurring
- Correlation - Link related events
- Aggregation - Batch processing
- DLQ - Failed event handling
Choose patterns based on your scalability, consistency, and complexity needs.