.do
Patterns

Event-Driven Patterns

Patterns for asynchronous, event-driven business architectures

Event-driven architecture enables loosely coupled, scalable business systems.

Event Basics

Events are semantic notifications that something happened:

import $, { on, send } from 'sdk.do'

// Listen for events
on($.Order.created, async (order) => {
  console.log('Order created:', order.orderNumber)
})

// Emit events
await send($.Order.created, {
  $type: 'Order',
  orderNumber: 'ORD-001',
  customer: customer.$id,
  totalPrice: 99.99,
})

Pub-Sub Pattern

Multiple subscribers react to single event:

// Multiple handlers for same event
on($.Order.created, async (order) => {
  // Handler 1: Process payment
  await send($.Payment.process, { order })
})

on($.Order.created, async (order) => {
  // Handler 2: Update analytics
  await db.increment($.Analytics, 'totalOrders')
  await db.increment($.Customer, order.customer.$id, {
    orderCount: 1,
  })
})

on($.Order.created, async (order) => {
  // Handler 3: Notify warehouse
  await send($.Warehouse.notifyNewOrder, { order })
})

on($.Order.created, async (order) => {
  // Handler 4: Send confirmation
  await send($.Email.send, {
    to: order.customer.email,
    template: 'order-created',
  })
})

Event Sourcing

Store state as sequence of events:

// @errors: 7006
// @strict: true
import { $, send, db } from 'sdk.do'

type Event = {
  type: string
  data: any
  timestamp: Date
}

type Order = {
  $id: string
  status: string
  payment?: any
  shipment?: any
  deliveredAt?: Date
}

type Payment = { id: string; amount: number }

declare const customer: any
declare const items: any
declare const payment: Payment

// Event store with full type safety
class OrderAggregate {
  private events: Event[] = []
  private state: Order

  constructor(orderId: string) {
    this.state = { $id: orderId, status: 'new' }
  }

  // Apply event to change state - TypeScript ensures type safety
  private apply(event: Event) {
    this.events.push(event)

    switch (event.type) {
      case 'OrderCreated':
        this.state = { ...this.state, ...event.data, status: 'created' }
        break
      case 'PaymentProcessed':
        this.state.status = 'paid'
        this.state.payment = event.data
        break
      case 'OrderShipped':
        this.state.status = 'shipped'
        this.state.shipment = event.data
        break
      case 'OrderDelivered':
        this.state.status = 'delivered'
        this.state.deliveredAt = event.timestamp
        break
    }
  }

  // Commands create events - return type is inferred
  async create(orderData: Partial<Order>) {
    this.apply({
      type: 'OrderCreated',
      data: orderData,
      timestamp: new Date(),
    })

    await send($.Order.created, this.state)
  }

  async processPayment(payment: Payment) {
    this.apply({
      type: 'PaymentProcessed',
      data: payment,
      timestamp: new Date(),
    })

    await send($.Payment.processed, { order: this.state, payment })
  }

  // Reconstruct state from events - fully typed reconstruction
  static async load(orderId: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate(orderId)
    //    ^?
    const events = await db.list($.Event, {
      where: { aggregateId: orderId },
    })

    for (const event of events) {
      aggregate.apply(event)
    }

    return aggregate
  }

  getCurrentState(): Order {
    return this.state
    //            ^?
  }

  getEvents(): Event[] {
    return this.events
  }
}

// Usage - TypeScript tracks types through event sourcing
const order = new OrderAggregate('order-001')
await order.create({ customer, items, totalPrice: 99.99 })
await order.processPayment(payment)

const currentState = order.getCurrentState() // Type: Order
//    ^?
const allEvents = order.getEvents() // Type: Event[]
//    ^?

CQRS (Command Query Responsibility Segregation)

Separate reads and writes:

// Write model (commands)
class OrderCommandService {
  async createOrder(data: CreateOrderCommand) {
    // Validate
    if (!data.items?.length) {
      throw new Error('Order must have items')
    }

    // Create order
    const order = await $.Order.create({
      $type: 'Order',
      customer: data.customer,
      orderedItem: data.items,
      totalPrice: calculateTotal(data.items),
    })

    // Emit event
    await send($.Order.created, order)

    return order.$id
  }

  async cancelOrder(orderId: string, reason: string) {
    const order = await db.get($.Order, orderId)

    if (order.status === 'shipped') {
      throw new Error('Cannot cancel shipped order')
    }

    await db.update($.Order, orderId, {
      status: 'cancelled',
      cancelReason: reason,
    })

    await send($.Order.cancelled, { order, reason })
  }
}

// Read model (queries)
class OrderQueryService {
  async getOrderById(orderId: string) {
    return await db.get($.Order, orderId)
  }

  async getCustomerOrders(customerId: string) {
    return await db.list($.Order, {
      where: { customer: customerId },
      sort: { orderDate: -1 },
    })
  }

  async getOrderStats(timeRange: string) {
    return await db.aggregate($.Order, {
      where: { orderDate: { $gte: parseTimeRange(timeRange) } },
      aggregations: {
        totalOrders: { $count: '*' },
        totalRevenue: { $sum: 'totalPrice' },
        avgOrderValue: { $avg: 'totalPrice' },
      },
    })
  }

  // Denormalized read model
  async getOrderWithDetails(orderId: string) {
    const order = await db.get($.Order, orderId)

    // Fetch related data in parallel
    const [customer, items, shipment] = await Promise.all([
      db.get($.Person, order.customer),
      Promise.all(order.orderedItem.map((item) => db.get($.Product, item.orderItem))),
      db.related(order, $.shippedAs, $.Shipment).then((s) => s[0]),
    ])

    return {
      order,
      customer,
      items,
      shipment,
    }
  }
}

// Usage
const commands = new OrderCommandService()
const queries = new OrderQueryService()

// Write
const orderId = await commands.createOrder({
  customer: customerId,
  items: cartItems,
})

// Read
const order = await queries.getOrderById(orderId)
const customerOrders = await queries.getCustomerOrders(customerId)
const stats = await queries.getOrderStats('30d')

Event Replay

Replay events to rebuild state or test changes:

// Replay all events for an entity
async function replayEvents(aggregateId: string) {
  const events = await db.list($.Event, {
    where: { aggregateId },
    sort: { timestamp: 1 },
  })

  let state = {}

  for (const event of events) {
    state = applyEvent(state, event)
    console.log(`After ${event.type}:`, state)
  }

  return state
}

// Replay events to specific point in time
async function replayToTimestamp(aggregateId: string, timestamp: Date) {
  const events = await db.list($.Event, {
    where: {
      aggregateId,
      timestamp: { $lte: timestamp },
    },
    sort: { timestamp: 1 },
  })

  let state = {}
  for (const event of events) {
    state = applyEvent(state, event)
  }

  return state
}

// Test business logic changes by replaying
async function testNewBusinessLogic(aggregateId: string) {
  const events = await db.list($.Event, {
    where: { aggregateId },
    sort: { timestamp: 1 },
  })

  // Apply with old logic
  const oldState = events.reduce((state, event) => {
    return applyEventOldLogic(state, event)
  }, {})

  // Apply with new logic
  const newState = events.reduce((state, event) => {
    return applyEventNewLogic(state, event)
  }, {})

  // Compare results
  console.log('Differences:', diff(oldState, newState))
}

Event Versioning

Handle schema evolution:

// Event with version
interface OrderCreatedV1 {
  version: 1
  type: 'OrderCreated'
  data: {
    customer: string
    items: string[]
    total: number
  }
}

interface OrderCreatedV2 {
  version: 2
  type: 'OrderCreated'
  data: {
    customer: string
    orderedItem: OrderItem[] // More detailed
    totalPrice: number // Renamed
    tax: number // New field
  }
}

// Upcaster: Convert old events to new format
function upcastEvent(event: OrderCreatedV1 | OrderCreatedV2): OrderCreatedV2 {
  if (event.version === 2) {
    return event
  }

  // Convert v1 to v2
  return {
    version: 2,
    type: 'OrderCreated',
    data: {
      customer: event.data.customer,
      orderedItem: event.data.items.map((id) => ({
        orderItem: id,
        orderQuantity: 1,
      })),
      totalPrice: event.data.total,
      tax: event.data.total * 0.08, // Calculate tax
    },
  }
}

// Always work with latest version
on($.Order.created, async (event) => {
  const latestEvent = upcastEvent(event)
  await processOrder(latestEvent.data)
})

Temporal Events

Events that are scheduled or recurring:

// Schedule future event
await send($.Email.send, {
  to: customer.email,
  template: 'renewal-reminder',
  scheduledFor: addDays(subscription.renewalDate, -7),
})

// Recurring events
on($.Subscription.created, async (sub) => {
  // Check subscription health daily
  await send($.Task.schedule, {
    action: 'check-subscription-health',
    subscription: sub.$id,
    recurring: 'daily',
    until: sub.endDate,
  })
})

// Cancel scheduled events
on($.Subscription.cancelled, async (sub) => {
  await send($.Task.cancel, {
    where: {
      action: 'check-subscription-health',
      subscription: sub.$id,
    },
  })
})

Event Correlation

Link related events:

// Correlation ID tracks related events
const correlationId = generateId()

// Start workflow
await send($.Order.created, {
  ...order,
  correlationId,
})

on($.Order.created, async (order) => {
  // All subsequent events share correlation ID
  await send($.Payment.process, {
    order: order.$id,
    correlationId: order.correlationId,
  })
})

on($.Payment.processed, async (payment) => {
  await send($.Inventory.decrement, {
    items: payment.order.items,
    correlationId: payment.correlationId,
  })
})

// Query all events in a workflow
async function getWorkflowEvents(correlationId: string) {
  return await db.list($.Event, {
    where: { correlationId },
    sort: { timestamp: 1 },
  })
}

// Visualize workflow
const events = await getWorkflowEvents(correlationId)
console.log(events.map((e) => `${e.timestamp}: ${e.type}`))

Event Filtering

Subscribe to specific events only:

// Filter by entity type
on(
  $.Order.created,
  {
    where: {
      customer: { membershipLevel: 'premium' },
    },
  },
  async (order) => {
    // Only premium orders
    await processPremiumOrder(order)
  }
)

// Filter by value
on(
  $.Order.created,
  {
    where: {
      totalPrice: { $gte: 1000 },
    },
  },
  async (order) => {
    // High-value orders only
    await send($.Notification.send, {
      to: $.Role.SalesManager,
      message: `High-value order: ${order.orderNumber}`,
    })
  }
)

// Complex filters
on(
  $.SupportTicket.created,
  {
    where: {
      priority: { $in: ['high', 'urgent'] },
      category: 'technical',
      customer: {
        plan: 'enterprise',
      },
    },
  },
  async (ticket) => {
    // Enterprise technical tickets only
    await send($.Ticket.escalate, { ticket })
  }
)

Event Aggregation

Combine multiple events:

// Batch processing
const orderBatch: Order[] = []

on($.Order.created, async (order) => {
  orderBatch.push(order)

  // Process every 100 orders or every 5 minutes
  if (orderBatch.length >= 100 || shouldFlush()) {
    await processOrderBatch(orderBatch)
    orderBatch.length = 0
  }
})

// Time-window aggregation
const windowedEvents = new Map<string, any[]>()

on($.Product.viewed, async (event) => {
  const window = Math.floor(Date.now() / 60000) // 1-minute windows
  const key = `${event.product.$id}-${window}`

  if (!windowedEvents.has(key)) {
    windowedEvents.set(key, [])
  }

  windowedEvents.get(key)!.push(event)

  // Process window when it closes
  setTimeout(async () => {
    const events = windowedEvents.get(key)
    if (events) {
      await send($.Analytics.productViews, {
        product: event.product,
        viewCount: events.length,
        window: new Date(window * 60000),
      })
      windowedEvents.delete(key)
    }
  }, 60000)
})

Dead Letter Queue

Handle failed events:

const MAX_RETRIES = 3

on($.Order.created, async (order) => {
  let attempts = 0

  while (attempts < MAX_RETRIES) {
    try {
      await processOrder(order)
      return
    } catch (error) {
      attempts++

      if (attempts >= MAX_RETRIES) {
        // Send to DLQ
        await db.create($.DeadLetter, {
          eventType: 'Order.created',
          payload: order,
          error: error.message,
          attempts,
          timestamp: new Date(),
        })

        // Alert operations
        await send($.Alert.send, {
          severity: 'high',
          message: `Order processing failed after ${attempts} attempts`,
          order: order.$id,
        })

        return
      }

      // Exponential backoff
      await sleep(Math.pow(2, attempts) * 1000)
    }
  }
})

// Process DLQ periodically
setInterval(async () => {
  const dlqItems = await db.list($.DeadLetter, {
    where: { processed: false },
    limit: 10,
  })

  for (const item of dlqItems) {
    try {
      // Retry processing
      await processEvent(item.eventType, item.payload)

      // Mark as processed
      await db.update($.DeadLetter, item.$id, {
        processed: true,
        processedAt: new Date(),
      })
    } catch (error) {
      console.error('DLQ item still failing:', item.$id)
    }
  }
}, 300000) // Every 5 minutes

Best Practices

1. Event Naming

// Good: Past tense, specific
$.Order.created
$.Payment.processed
$.Shipment.delivered

// Avoid: Present tense
$.Order.create
$.Payment.process

2. Event Payload

// Good: Include complete data
await send($.Order.created, {
  $type: 'Order',
  $id: order.$id,
  orderNumber: order.orderNumber,
  customer: order.customer,
  items: order.orderedItem,
  totalPrice: order.totalPrice,
  timestamp: new Date(),
})

// Avoid: Just IDs
await send($.Order.created, {
  orderId: '123',
})

3. Idempotency

// Handle duplicate events
const processedEvents = new Set()

on($.Order.created, async (order) => {
  const eventId = `${order.$id}-created`

  if (processedEvents.has(eventId)) {
    return // Already processed
  }

  await processOrder(order)
  processedEvents.add(eventId)
})

4. Event Schema

// Consistent event structure
interface DomainEvent<T> {
  $id: string
  $type: string
  type: string // Event type (e.g., 'Order.created')
  version: number
  data: T
  metadata: {
    timestamp: Date
    correlationId?: string
    causationId?: string
    userId?: string
  }
}

// Usage
const event: DomainEvent<Order> = {
  $id: generateId(),
  $type: 'Event',
  type: 'Order.created',
  version: 1,
  data: order,
  metadata: {
    timestamp: new Date(),
    correlationId: correlationId,
    userId: user.$id,
  },
}

Summary

Event-driven patterns enable:

  • Pub-Sub - Multiple reactions to events
  • Event Sourcing - State as event sequence
  • CQRS - Separate reads/writes
  • Temporal Events - Scheduled/recurring
  • Correlation - Link related events
  • Aggregation - Batch processing
  • DLQ - Failed event handling

Choose patterns based on your scalability, consistency, and complexity needs.


Next: AI Integration Patterns →