Event-Driven Service Orchestration

A complete guide to building event-driven service architectures with event routing, fan-out/fan-in patterns, event sourcing, and reactive service orchestration.

Build loosely coupled, scalable service architectures where services react to events rather than direct calls, enabling real-time processing, flexible workflows, and system-wide decoupling.

Overview

Event-driven architecture (EDA) is a design pattern where services communicate through events rather than direct calls. Services publish events when something happens, and other services subscribe to events they're interested in. This creates highly decoupled, scalable systems that can evolve independently.
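
The publish/subscribe relationship described above can be sketched with a minimal in-memory bus. `TinyBus`, `OrderCreated`, and the two sample subscribers are hypothetical names used only for this illustration; they are not part of the SDK, and the guide's own `EventBus` appears under Core Implementation.

```typescript
// Hypothetical minimal bus for illustration only; not part of sdk.do.
type OrderCreated = { orderId: string; total: number }
type Handler = (payload: OrderCreated) => void

class TinyBus {
  private handlers = new Map<string, Handler[]>()

  subscribe(type: string, handler: Handler): void {
    const list: Handler[] = this.handlers.get(type) ?? []
    list.push(handler)
    this.handlers.set(type, list)
  }

  // Returns how many subscribers received the event.
  publish(type: string, payload: OrderCreated): number {
    const list: Handler[] = this.handlers.get(type) ?? []
    for (const handler of list) handler(payload)
    return list.length
  }
}

const bus = new TinyBus()
const emailed: string[] = []
const revenue: number[] = []

// Two independent subscribers; the publisher knows about neither of them.
bus.subscribe('order.created', (order) => emailed.push(order.orderId))
bus.subscribe('order.created', (order) => revenue.push(order.total))

// Both subscribers receive this event.
const delivered = bus.publish('order.created', { orderId: 'ord_1', total: 42 })
// Nobody subscribed to this type, so nothing happens.
const ignored = bus.publish('order.cancelled', { orderId: 'ord_1', total: 42 })
```

Adding a third subscriber would require no change to the publishing code, which is the decoupling the pattern is after.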

When to Use Event-Driven Architecture

Use event-driven patterns when:

  • Loose Coupling: Services should be independent and not know about each other
  • Scalability: System needs to handle high volumes of concurrent operations
  • Flexibility: New services need to be added without changing existing ones
  • Real-Time: Immediate reactions to events are required
  • Audit Trail: Complete history of system changes must be maintained
  • Event Replay: Need to reconstruct system state from event history
  • Asynchronous: Operations don't need immediate responses
  • Fan-Out: One event should trigger multiple independent actions

Avoid this pattern when:

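  • Callers need an immediate, synchronous response
  • The system is simple and has only a few services
  • Strong consistency is critical (event-driven systems are usually eventually consistent)
  • Strict event ordering is required but difficult to guarantee
  • Debugging complexity is a major concern, since one request fans out across many asynchronous handlers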

Architecture

graph TB
  P1[Publisher Service 1] --> EB[Event Bus]
  P2[Publisher Service 2] --> EB
  P3[Publisher Service 3] --> EB
  EB --> |order.created| S1[Email Service]
  EB --> |order.created| S2[Analytics Service]
  EB --> |order.created| S3[Inventory Service]
  EB --> |order.created| S4[Fulfillment Service]
  S4 --> |fulfillment.started| EB
  EB --> |fulfillment.started| S5[Shipping Service]
  EB --> |fulfillment.started| S6[Notification Service]
  EB --> DLQ[Dead Letter Queue]
  EB --> ES[Event Store]

Core Implementation

Event Bus

import $, { db, on, send } from 'sdk.do'

export class EventBus {
  private subscribers = new Map<string, Array<(event: any) => Promise<void>>>()

  subscribe(eventType: string, handler: (event: any) => Promise<void>): void {
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, [])
    }

    this.subscribers.get(eventType)!.push(handler)
  }

  async publish(event: { type: string; data: any; metadata?: any }): Promise<void> {
    const fullEvent = {
      id: generateEventId(),
      timestamp: new Date().toISOString(),
      ...event,
      metadata: {
        ...event.metadata,
        publishedAt: new Date().toISOString(),
      },
    }

    // Store event for audit trail
    await this.storeEvent(fullEvent)

    // Get subscribers for this event type
    const handlers = this.subscribers.get(event.type) || []

    // Execute all handlers in parallel
    await Promise.allSettled(
      handlers.map(async (handler) => {
        try {
          await handler(fullEvent)
        } catch (error) {
          // Send to dead letter queue
          await this.handleFailedEvent(fullEvent, handler, error as Error)
        }
      })
    )
  }

  private async storeEvent(event: any): Promise<void> {
    await db.create($.Event, event)
  }

  private async handleFailedEvent(event: any, handler: Function, error: Error): Promise<void> {
    await db.create($.DeadLetterEvent, {
      event,
      // Inline arrow-function handlers have an empty name, so fall back to a placeholder
      handler: handler.name || 'anonymous',
      error: error.message,
      failedAt: new Date().toISOString(),
    })
  }
}

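function generateEventId(): string {
  // slice(2, 11) keeps the same 9 characters as the deprecated substr(2, 9).
  // Math.random() is not collision-proof; prefer a UUID where uniqueness matters.
  return `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
}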

// Global event bus instance
export const eventBus = new EventBus()
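
The failure isolation in `publish` can be examined on its own. The sketch below is a simplified, in-memory stand-in rather than the class above: `dispatch`, the `DeadLetter` shape, and the three sample handlers are assumptions made for illustration. It collects each handler's outcome with `Promise.allSettled` and records rejected handlers instead of letting one failure abort the rest.

```typescript
type BusEvent = { type: string; data: unknown }
type AsyncHandler = (event: BusEvent) => Promise<void>

interface DeadLetter {
  eventType: string
  handler: string
  error: string
}

// Runs every handler, records failures, and reports how many succeeded.
async function dispatch(event: BusEvent, handlers: AsyncHandler[], deadLetters: DeadLetter[]): Promise<number> {
  // Wrapping each call in an async arrow turns synchronous throws into rejections too.
  const results = await Promise.allSettled(handlers.map(async (handler) => handler(event)))

  let succeeded = 0
  results.forEach((result, index) => {
    if (result.status === 'fulfilled') {
      succeeded += 1
      return
    }
    const reason: unknown = result.reason
    deadLetters.push({
      eventType: event.type,
      handler: handlers[index]?.name || 'anonymous',
      error: reason instanceof Error ? reason.message : String(reason),
    })
  })
  return succeeded
}

// Sample handlers (hypothetical): the second one always fails.
const sendEmail: AsyncHandler = async () => {}
const trackAnalytics: AsyncHandler = async () => {
  throw new Error('analytics backend unavailable')
}
const reserveInventory: AsyncHandler = async () => {}
```

Calling `dispatch` with these three handlers lets the email and inventory handlers complete while the analytics failure lands in `deadLetters`. Handlers held in named variables report their name; handlers passed inline as arrow functions fall back to `'anonymous'`.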

Event Schema

import $, { db } from 'sdk.do'

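// Note: EventBus.publish() above fills in only id, timestamp, and metadata.publishedAt,
// and the examples in this guide pass source under metadata rather than at the top level.
export interface ServiceEvent<T = any> {
  id: string // Unique event ID
  type: string // Dot-separated event name, e.g. 'order.created'
  version: string // Schema version of the payload
  timestamp: string // ISO 8601 creation time
  source: string // Name of the publishing service
  data: T // Event payload (see EventTypes below)
  metadata?: {
    correlationId?: string // Shared by all events in one workflow
    causationId?: string // ID of the event that triggered this one
    userId?: string
    traceId?: string
    [key: string]: any
  }
}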

// Event type definitions
export namespace EventTypes {
  // Order events
  export interface OrderCreated {
    orderId: string
    customerId: string
    items: Array<{
      productId: string
      quantity: number
      price: number
    }>
    total: number
    currency: string
  }

  export interface OrderPaid {
    orderId: string
    paymentId: string
    amount: number
    currency: string
  }

  export interface OrderShipped {
    orderId: string
    trackingNumber: string
    carrier: string
    estimatedDelivery: string
  }

  // Service execution events
  export interface ServiceExecutionStarted {
    executionId: string
    serviceId: string
    inputs: any
  }

  export interface ServiceExecutionCompleted {
    executionId: string
    serviceId: string
    outputs: any
    duration: number
  }

  export interface ServiceExecutionFailed {
    executionId: string
    serviceId: string
    error: string
    retryable: boolean
  }
}
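
TypeScript interfaces are erased at runtime, so an event loaded from storage or received from another process is not checked against `ServiceEvent` automatically. A minimal sketch of a runtime guard follows, assuming the required fields listed in the interface above; `ServiceEventEnvelope`, `REQUIRED_STRING_FIELDS`, and `isServiceEvent` are hypothetical names introduced here.

```typescript
// Envelope shape copied from the ServiceEvent interface above (metadata omitted for brevity).
interface ServiceEventEnvelope {
  id: string
  type: string
  version: string
  timestamp: string
  source: string
  data: unknown
}

const REQUIRED_STRING_FIELDS = ['id', 'type', 'version', 'timestamp', 'source'] as const

// Type guard: true only when every required envelope field is present and well-formed.
function isServiceEvent(value: unknown): value is ServiceEventEnvelope {
  if (typeof value !== 'object' || value === null) return false
  const record = value as Record<string, unknown>

  for (const field of REQUIRED_STRING_FIELDS) {
    const fieldValue = record[field]
    if (typeof fieldValue !== 'string' || fieldValue.length === 0) return false
  }

  // The timestamp must parse as a date (the guide uses ISO 8601 strings).
  if (Number.isNaN(Date.parse(record['timestamp'] as string))) return false

  return 'data' in record
}

const valid = isServiceEvent({
  id: 'evt_1',
  type: 'order.created',
  version: '1.0',
  timestamp: '2024-01-01T00:00:00.000Z',
  source: 'order-service',
  data: { orderId: 'ord_1' },
})

// Same event without the source field: rejected by the guard.
const missingSource = isServiceEvent({
  id: 'evt_2',
  type: 'order.created',
  version: '1.0',
  timestamp: '2024-01-01T00:00:00.000Z',
  data: {},
})
```

The guard checks only the envelope. Validating `data` against a specific payload type such as `EventTypes.OrderCreated` would need a separate check per event type.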

Example 1: Order Processing System

Complete event-driven order processing with multiple services.

import $, { db, on, send, ai } from 'sdk.do'
import { eventBus } from './event-bus'

// Order Service - Publishes order events
export const orderService = {
  async createOrder(orderData: any): Promise<any> {
    // Create order in database
    const order = await db.create($.Order, {
      ...orderData,
      status: 'created',
      createdAt: new Date(),
    })

    // Publish order.created event
    await eventBus.publish({
      type: 'order.created',
      data: {
        orderId: order.id,
        customerId: order.customerId,
        items: order.items,
        total: order.total,
        currency: order.currency,
        shippingAddress: order.shippingAddress,
      },
      metadata: {
        source: 'order-service',
        correlationId: order.id,
      },
    })

    return order
  },
}

// Email Service - Reacts to order events
eventBus.subscribe('order.created', async (event) => {
  const { orderId, customerId } = event.data

  // Get customer details
  const customer = await db.findOne($.Customer, { id: customerId })

  // Send order confirmation email
  await $.ServiceExecution.start({
    serviceId: 'email-sender',
    inputs: {
      to: customer.email,
      template: 'order-confirmation',
      data: {
        orderId,
        customerName: customer.name,
        orderDetails: event.data,
      },
    },
  })

  console.log(`Order confirmation email sent for order ${orderId}`)
})

// Analytics Service - Tracks order metrics
eventBus.subscribe('order.created', async (event) => {
  const { orderId, total, currency, items } = event.data

  // Track order in analytics
  await $.ServiceExecution.start({
    serviceId: 'analytics-tracker',
    inputs: {
      event: 'order_created',
      properties: {
        orderId,
        revenue: total,
        currency,
        itemCount: items.length,
        timestamp: event.timestamp,
      },
    },
  })

  console.log(`Order ${orderId} tracked in analytics`)
})

// Inventory Service - Updates stock levels
eventBus.subscribe('order.created', async (event) => {
  const { orderId, items } = event.data

  try {
    // Reserve inventory for each item
    for (const item of items) {
      await $.ServiceExecution.start({
        serviceId: 'inventory-reserver',
        inputs: {
          productId: item.productId,
          quantity: item.quantity,
          orderId,
        },
      })
    }

    // Publish inventory.reserved event
    await eventBus.publish({
      type: 'inventory.reserved',
      data: {
        orderId,
        items,
      },
      metadata: {
        source: 'inventory-service',
        correlationId: orderId,
      },
    })

    console.log(`Inventory reserved for order ${orderId}`)
  } catch (error) {
    // Publish inventory.reservation.failed event
    await eventBus.publish({
      type: 'inventory.reservation.failed',
      data: {
        orderId,
        reason: (error as Error).message,
      },
      metadata: {
        source: 'inventory-service',
        correlationId: orderId,
      },
    })
  }
})

// Fraud Detection Service - Checks for fraud
eventBus.subscribe('order.created', async (event) => {
  const { orderId, customerId, total } = event.data

  // Run fraud detection
  const fraudCheck = await $.ServiceExecution.start({
    serviceId: 'fraud-detector',
    inputs: {
      orderId,
      customerId,
      amount: total,
      shippingAddress: event.data.shippingAddress,
    },
  }).then((e) => e.waitForCompletion())

  if (fraudCheck.outputs.isFraudulent) {
    // Publish order.flagged event
    await eventBus.publish({
      type: 'order.flagged',
      data: {
        orderId,
        reason: 'fraud',
        riskScore: fraudCheck.outputs.riskScore,
      },
      metadata: {
        source: 'fraud-detection-service',
        correlationId: orderId,
      },
    })
  } else {
    // Publish order.verified event
    await eventBus.publish({
      type: 'order.verified',
      data: {
        orderId,
        riskScore: fraudCheck.outputs.riskScore,
      },
      metadata: {
        source: 'fraud-detection-service',
        correlationId: orderId,
      },
    })
  }
})

// Payment Service - Processes payment after verification
eventBus.subscribe('order.verified', async (event) => {
  const { orderId } = event.data

  // Get order details
  const order = await db.findOne($.Order, { id: orderId })

  // Process payment. Only the payment call sits inside the try block, so a later
  // failure cannot publish payment.failed for a payment that actually succeeded.
  let payment
  try {
    payment = await $.ServiceExecution.start({
      serviceId: 'payment-processor',
      inputs: {
        orderId,
        amount: order.total,
        currency: order.currency,
        paymentMethod: order.paymentMethod,
      },
    }).then((e) => e.waitForCompletion())
  } catch (error) {
    // Publish payment.failed event
    await eventBus.publish({
      type: 'payment.failed',
      data: {
        orderId,
        reason: (error as Error).message,
      },
      metadata: {
        source: 'payment-service',
        correlationId: orderId,
      },
    })
    return
  }

  // Update order status first so subscribers of payment.processed read the paid order
  await db.update($.Order, orderId, { status: 'paid' })

  // Publish payment.processed event
  await eventBus.publish({
    type: 'payment.processed',
    data: {
      orderId,
      paymentId: payment.outputs.paymentId,
      amount: order.total,
      currency: order.currency,
    },
    metadata: {
      source: 'payment-service',
      correlationId: orderId,
    },
  })
})

// Fulfillment Service - Starts fulfillment after payment
eventBus.subscribe('payment.processed', async (event) => {
  const { orderId } = event.data

  // Start fulfillment workflow
  const fulfillment = await $.ServiceExecution.start({
    serviceId: 'fulfillment-orchestrator',
    inputs: { orderId },
  })

  // Publish fulfillment.started event
  await eventBus.publish({
    type: 'fulfillment.started',
    data: {
      orderId,
      fulfillmentId: fulfillment.id,
    },
    metadata: {
      source: 'fulfillment-service',
      correlationId: orderId,
    },
  })
})

// Shipping Service - Creates shipment
eventBus.subscribe('fulfillment.started', async (event) => {
  const { orderId, fulfillmentId } = event.data

  const order = await db.findOne($.Order, { id: orderId })

  // Create shipment
  const shipment = await $.ServiceExecution.start({
    serviceId: 'shipment-creator',
    inputs: {
      orderId,
      fulfillmentId,
      address: order.shippingAddress,
      items: order.items,
    },
  }).then((e) => e.waitForCompletion())

  // Publish shipment.created event
  await eventBus.publish({
    type: 'shipment.created',
    data: {
      orderId,
      shipmentId: shipment.outputs.shipmentId,
      trackingNumber: shipment.outputs.trackingNumber,
      carrier: shipment.outputs.carrier,
      estimatedDelivery: shipment.outputs.estimatedDelivery,
    },
    metadata: {
      source: 'shipping-service',
      correlationId: orderId,
    },
  })
})

// Notification Service - Sends shipping notification
eventBus.subscribe('shipment.created', async (event) => {
  const { orderId, trackingNumber, carrier, estimatedDelivery } = event.data

  const order = await db.findOne($.Order, { id: orderId })
  const customer = await db.findOne($.Customer, { id: order.customerId })

  // Send shipping notification
  await $.ServiceExecution.start({
    serviceId: 'email-sender',
    inputs: {
      to: customer.email,
      template: 'shipping-notification',
      data: {
        customerName: customer.name,
        orderId,
        trackingNumber,
        carrier,
        estimatedDelivery,
      },
    },
  })
})
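
Because the flow in this example is spread across many handlers, it can help to write down which event types each subscriber may publish and walk that graph. The sketch below is a hedged illustration: the `emits` table is transcribed by hand from the handlers above, and `downstream` and `unhandled` are hypothetical helpers, not SDK functions.

```typescript
// Event types that the subscribers of each event in Example 1 may publish.
// Transcribed by hand from the handlers above; an empty list means the
// subscribers of that event publish nothing further.
const emits: Record<string, string[]> = {
  'order.created': ['inventory.reserved', 'inventory.reservation.failed', 'order.flagged', 'order.verified'],
  'order.verified': ['payment.processed', 'payment.failed'],
  'payment.processed': ['fulfillment.started'],
  'fulfillment.started': ['shipment.created'],
  'shipment.created': [],
}

// Breadth-first walk: every event type that can follow `start`.
function downstream(start: string): string[] {
  const seen = new Set<string>([start])
  const queue: string[] = [start]
  const result: string[] = []

  while (queue.length > 0) {
    const current = queue.shift() as string
    for (const next of emits[current] ?? []) {
      if (!seen.has(next)) {
        seen.add(next)
        queue.push(next)
        result.push(next)
      }
    }
  }
  return result
}

// Event types that are published but have no subscriber in the example.
function unhandled(): string[] {
  const result: string[] = []
  for (const key of Object.keys(emits)) {
    for (const type of emits[key] ?? []) {
      if (!(type in emits) && result.indexOf(type) === -1) result.push(type)
    }
  }
  return result
}

const afterVerification = downstream('order.verified')
const withoutSubscribers = unhandled()
```

For this table, `withoutSubscribers` contains `inventory.reserved`, `inventory.reservation.failed`, `order.flagged`, and `payment.failed`: the example publishes these events but nothing reacts to them, so a production system would still need handlers for the failure paths.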

Example 2: Real-Time Analytics Pipeline

Event-driven analytics processing with aggregation and alerts.

import $, { db, on, send, ai } from 'sdk.do'
import { eventBus } from './event-bus'

// User Activity Tracking
export const trackUserActivity = async (activity: { userId: string; action: string; resource: string; metadata?: any }) => {
  await eventBus.publish({
    type: 'user.activity',
    data: activity,
    metadata: {
      source: 'activity-tracker',
    },
  })
}

// Real-time aggregation service
eventBus.subscribe('user.activity', async (event) => {
  const { userId, action, resource } = event.data

  // Update user activity counters in real-time
  await db.increment($.UserMetrics, userId, {
    [`actions.${action}`]: 1,
    [`resources.${resource}`]: 1,
    totalActivity: 1,
    lastActivityAt: new Date(),
  })

  // Check for activity milestones
  const metrics = await db.findOne($.UserMetrics, { userId })

  if (metrics.totalActivity % 100 === 0) {
    await eventBus.publish({
      type: 'user.milestone.reached',
      data: {
        userId,
        milestone: 'activity',
        count: metrics.totalActivity,
      },
    })
  }
})

// Behavioral analysis service
eventBus.subscribe('user.activity', async (event) => {
  const { userId, action } = event.data

  // Get recent activity
  const recentActivity = await db.query($.Event, {
    where: {
      type: 'user.activity',
      'data.userId': userId,
      // Event timestamps are stored as ISO 8601 strings, so compare against a string
      timestamp: { $gte: new Date(Date.now() - 3600000).toISOString() }, // Last hour
    },
    limit: 100,
  })

  // Analyze patterns
  const analysis = await ai.generate({
    model: 'gpt-5',
    prompt: `Analyze this user activity and identify:
1. Behavior patterns
2. User intent
3. Potential issues
4. Recommendations

Activity: ${JSON.stringify(
      recentActivity.map((e: any) => e.data),
      null,
      2
    )}`,
    maxTokens: 500,
  })

  // Store analysis
  await db.create($.UserBehaviorAnalysis, {
    userId,
    timestamp: new Date(),
    patterns: analysis.text,
    activityCount: recentActivity.length,
  })
})

// Anomaly detection service
eventBus.subscribe('user.activity', async (event) => {
  const { userId, action } = event.data

  // Get user's typical behavior
  const historicalMetrics = await db.findOne($.UserMetrics, { userId })

  // Calculate current activity rate
  const recentEvents = await db.count($.Event, {
    where: {
      type: 'user.activity',
      'data.userId': userId,
      // Event timestamps are stored as ISO 8601 strings, so compare against a string
      timestamp: { $gte: new Date(Date.now() - 300000).toISOString() }, // Last 5 minutes
    },
  })

  const currentRate = recentEvents / 5 // per minute
  const averageRate = historicalMetrics.averageActivityRate || 1

  // Detect anomalies
  if (currentRate > averageRate * 5) {
    await eventBus.publish({
      type: 'anomaly.detected',
      data: {
        userId,
        anomalyType: 'high-activity',
        currentRate,
        averageRate,
        severity: 'high',
      },
      metadata: {
        source: 'anomaly-detection-service',
      },
    })
  }
})

// Alert service - Sends alerts for anomalies
eventBus.subscribe('anomaly.detected', async (event) => {
  const { userId, anomalyType, severity } = event.data

  // Send alert to admin
  await $.ServiceExecution.start({
    serviceId: 'alert-sender',
    inputs: {
      channel: 'slack',
      severity,
      message: `Anomaly detected for user ${userId}: ${anomalyType}`,
      data: event.data,
    },
  })

  // Log anomaly
  await db.create($.SecurityAlert, {
    userId,
    type: anomalyType,
    severity,
    detectedAt: new Date(),
    data: event.data,
  })
})

// Report generation service
eventBus.subscribe('user.milestone.reached', async (event) => {
  const { userId, milestone, count } = event.data

  // Generate milestone report
  const report = await $.ServiceExecution.start({
    serviceId: 'report-generator',
    inputs: {
      type: 'milestone',
      userId,
      milestone,
      count,
    },
  }).then((e) => e.waitForCompletion())

  // Send congratulations email
  const user = await db.findOne($.User, { id: userId })

  await $.ServiceExecution.start({
    serviceId: 'email-sender',
    inputs: {
      to: user.email,
      template: 'milestone-celebration',
      data: {
        userName: user.name,
        milestone,
        count,
        reportUrl: report.outputs.url,
      },
    },
  })
})
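
The arithmetic inside the anomaly detection handler is easier to test when pulled out of the subscriber. A minimal sketch, assuming the same rule as the handler above (events per minute compared against five times the historical average); `checkActivityRate` and `RateCheck` are hypothetical names.

```typescript
interface RateCheck {
  currentRate: number // events per minute in the recent window
  averageRate: number // historical events per minute used for comparison
  isAnomaly: boolean
}

// Mirrors the handler's rule: flag the user when the recent rate exceeds
// `threshold` times the historical average.
function checkActivityRate(
  eventsInWindow: number,
  windowMinutes: number,
  historicalAverage: number | undefined,
  threshold: number = 5
): RateCheck {
  const currentRate = eventsInWindow / windowMinutes
  // Same fallback as the handler: a missing or zero average is treated as 1 per minute.
  const averageRate = historicalAverage || 1
  return { currentRate, averageRate, isAnomaly: currentRate > averageRate * threshold }
}

// 30 events in 5 minutes is 6 per minute, above the limit of 5 per minute.
const burst = checkActivityRate(30, 5, 1)
// 25 events in 5 minutes is exactly 5 per minute; the comparison is strict, so no anomaly.
const atLimit = checkActivityRate(25, 5, 1)
// A user without history falls back to an average of 1 per minute.
const newUser = checkActivityRate(30, 5, undefined)
```

Keeping the rule in a pure function lets the threshold be tuned and tested without a database or event bus.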

Example 3: Content Publishing Workflow

Event-driven content creation and distribution pipeline.

import $, { db, on, send, ai } from 'sdk.do'
import { eventBus } from './event-bus'

// Content creation service
export const createContent = async (contentData: { title: string; author: string; type: string; draft: string }) => {
  const content = await db.create($.Content, {
    ...contentData,
    status: 'draft',
    createdAt: new Date(),
  })

  await eventBus.publish({
    type: 'content.created',
    data: {
      contentId: content.id,
      title: content.title,
      author: content.author,
      type: content.type,
    },
    metadata: {
      source: 'content-service',
    },
  })

  return content
}

// AI enhancement service
eventBus.subscribe('content.created', async (event) => {
  const { contentId } = event.data

  const content = await db.findOne($.Content, { id: contentId })

  // Enhance content with AI
  const enhanced = await ai.generate({
    model: 'gpt-5',
    prompt: `Enhance this content by:
1. Improving clarity and readability
2. Adding relevant details
3. Fixing grammar and style issues
4. Suggesting better structure

Content:
${content.draft}`,
    maxTokens: 2000,
  })

  // Update content
  await db.update($.Content, contentId, {
    enhanced: enhanced.text,
    enhancedAt: new Date(),
  })

  // Publish enhanced event
  await eventBus.publish({
    type: 'content.enhanced',
    data: {
      contentId,
      enhancementsApplied: true,
    },
    metadata: {
      source: 'ai-enhancement-service',
      correlationId: contentId,
    },
  })
})

// SEO optimization service
eventBus.subscribe('content.enhanced', async (event) => {
  const { contentId } = event.data

  const content = await db.findOne($.Content, { id: contentId })

  const seoOptimized = await $.ServiceExecution.start({
    serviceId: 'seo-optimizer',
    inputs: {
      title: content.title,
      content: content.enhanced,
      targetKeywords: content.keywords || [],
    },
  }).then((e) => e.waitForCompletion())

  await db.update($.Content, contentId, {
    optimizedTitle: seoOptimized.outputs.title,
    optimizedContent: seoOptimized.outputs.content,
    metaDescription: seoOptimized.outputs.metaDescription,
    seoScore: seoOptimized.outputs.score,
  })

  await eventBus.publish({
    type: 'content.optimized',
    data: {
      contentId,
      seoScore: seoOptimized.outputs.score,
    },
    metadata: {
      source: 'seo-service',
      correlationId: contentId,
    },
  })
})

// Image generation service
eventBus.subscribe('content.optimized', async (event) => {
  const { contentId } = event.data

  const content = await db.findOne($.Content, { id: contentId })

  const images = await $.ServiceExecution.start({
    serviceId: 'image-generator',
    inputs: {
      content: content.optimizedContent,
      count: 3,
      style: 'professional',
    },
  }).then((e) => e.waitForCompletion())

  await db.update($.Content, contentId, {
    images: images.outputs.urls,
  })

  await eventBus.publish({
    type: 'content.images.generated',
    data: {
      contentId,
      imageCount: images.outputs.urls.length,
    },
    metadata: {
      source: 'image-service',
      correlationId: contentId,
    },
  })
})

// Quality assurance service
eventBus.subscribe('content.images.generated', async (event) => {
  const { contentId } = event.data

  const content = await db.findOne($.Content, { id: contentId })

  const qa = await $.ServiceExecution.start({
    serviceId: 'content-qa',
    inputs: {
      title: content.optimizedTitle,
      content: content.optimizedContent,
      images: content.images,
      seoScore: content.seoScore,
    },
  }).then((e) => e.waitForCompletion())

  await db.update($.Content, contentId, {
    qaScore: qa.outputs.score,
    qaIssues: qa.outputs.issues,
    qaStatus: qa.outputs.passed ? 'passed' : 'failed',
  })

  if (qa.outputs.passed) {
    await eventBus.publish({
      type: 'content.ready.for.review',
      data: {
        contentId,
        qaScore: qa.outputs.score,
      },
      metadata: {
        source: 'qa-service',
        correlationId: contentId,
      },
    })
  } else {
    await eventBus.publish({
      type: 'content.qa.failed',
      data: {
        contentId,
        issues: qa.outputs.issues,
      },
      metadata: {
        source: 'qa-service',
        correlationId: contentId,
      },
    })
  }
})

// Review notification service
eventBus.subscribe('content.ready.for.review', async (event) => {
  const { contentId } = event.data

  const content = await db.findOne($.Content, { id: contentId })
  const author = await db.findOne($.User, { id: content.author })

  await $.ServiceExecution.start({
    serviceId: 'notification-sender',
    inputs: {
      recipient: content.reviewerId,
      type: 'content-review-request',
      data: {
        contentId,
        title: content.optimizedTitle,
        author: author.name,
        qaScore: content.qaScore,
      },
    },
  })
})

// Publishing service
export const publishContent = async (contentId: string) => {
  const content = await db.findOne($.Content, { id: contentId })

  // Update status
  await db.update($.Content, contentId, {
    status: 'published',
    publishedAt: new Date(),
  })

  // Publish event
  await eventBus.publish({
    type: 'content.published',
    data: {
      contentId,
      title: content.optimizedTitle,
      author: content.author,
      url: content.url,
    },
    metadata: {
      source: 'publishing-service',
    },
  })
}

// Distribution service - Distributes to multiple channels
eventBus.subscribe('content.published', async (event) => {
  const { contentId, title, url } = event.data

  const content = await db.findOne($.Content, { id: contentId })

  // Distribute to configured channels
  const channels = content.distributionChannels || ['blog', 'social']

  await Promise.all(
    channels.map((channel) =>
      $.ServiceExecution.start({
        serviceId: 'content-distributor',
        inputs: {
          channel,
          contentId,
          title: content.optimizedTitle,
          content: content.optimizedContent,
          images: content.images,
          url,
        },
      })
    )
  )

  await eventBus.publish({
    type: 'content.distributed',
    data: {
      contentId,
      channels,
    },
    metadata: {
      source: 'distribution-service',
      correlationId: contentId,
    },
  })
})

// Analytics tracking
eventBus.subscribe('content.published', async (event) => {
  const { contentId, title, author } = event.data

  await $.ServiceExecution.start({
    serviceId: 'analytics-tracker',
    inputs: {
      event: 'content_published',
      properties: {
        contentId,
        title,
        author,
        publishedAt: new Date().toISOString(),
      },
    },
  })
})
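
Since each stage in this example is triggered by the previous stage's event, the set of event types recorded for a content item shows how far it has progressed. The sketch below is one hedged way to compute that; `PIPELINE` is transcribed by hand from the handlers above, and `pipelineProgress` is a hypothetical helper.

```typescript
// Happy-path order of event types in Example 3, transcribed from the handlers above.
const PIPELINE: string[] = [
  'content.created',
  'content.enhanced',
  'content.optimized',
  'content.images.generated',
  'content.ready.for.review',
  'content.published',
  'content.distributed',
]

interface Progress {
  completed: number // how many pipeline stages have been reached, in order
  next: string | undefined // the event type expected next, if any
  failed: boolean // true once content.qa.failed has been seen
}

function pipelineProgress(seenTypes: string[]): Progress {
  const failed = seenTypes.indexOf('content.qa.failed') !== -1

  // Count leading stages that have all been seen; stop at the first gap.
  let completed = 0
  while (completed < PIPELINE.length && seenTypes.indexOf(PIPELINE[completed] as string) !== -1) {
    completed += 1
  }

  return { completed, next: failed ? undefined : PIPELINE[completed], failed }
}

const drafting = pipelineProgress(['content.created', 'content.enhanced'])
const rejected = pipelineProgress([
  'content.created',
  'content.enhanced',
  'content.optimized',
  'content.images.generated',
  'content.qa.failed',
])
```

Here `drafting` reports two completed stages with `content.optimized` expected next, while `rejected` reports four completed stages, `failed: true`, and no next stage.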

Advanced Patterns

Pattern 1: Event Sourcing

Store all changes as events and reconstruct state from event history.

import $, { db } from 'sdk.do'

export abstract class EventSourcedAggregate {
  private version = 0

  async applyEvent(event: any): Promise<void> {
    // Store event with the next version number
    await db.create($.Event, {
      ...event,
      aggregateId: this.getId(),
      version: this.version + 1,
    })

    // Apply to current state only after the event has been stored
    this.apply(event)
    this.version++
  }

  async loadFromHistory(): Promise<void> {
    // Load all events for this aggregate
    const events = await db.query($.Event, {
      where: { aggregateId: this.getId() },
      orderBy: { version: 'asc' },
    })

    // Replay events to rebuild state
    for (const event of events) {
      this.apply(event)
      this.version = event.version
    }
  }

  // Subclasses apply the event to their own state
  protected abstract apply(event: any): void

  // Subclasses return the aggregate's ID
  protected abstract getId(): string
}

// Example: Order aggregate
export class OrderAggregate extends EventSourcedAggregate {
  private orderId: string
  private status: string = 'pending'
  private items: any[] = []
  private total: number = 0

  constructor(orderId: string) {
    super()
    this.orderId = orderId
  }

  protected getId(): string {
    return this.orderId
  }

  protected apply(event: any): void {
    switch (event.type) {
      case 'order.created':
        this.status = 'created'
        this.items = event.data.items
        this.total = event.data.total
        break

      case 'order.paid':
        this.status = 'paid'
        break

      case 'order.shipped':
        this.status = 'shipped'
        break

      case 'order.delivered':
        this.status = 'delivered'
        break

      case 'order.cancelled':
        this.status = 'cancelled'
        break
    }
  }

  // Commands that generate events
  async create(orderData: any): Promise<void> {
    await this.applyEvent({
      type: 'order.created',
      data: orderData,
    })
  }

  async markPaid(paymentId: string): Promise<void> {
    await this.applyEvent({
      type: 'order.paid',
      data: { paymentId },
    })
  }

  async ship(trackingNumber: string): Promise<void> {
    await this.applyEvent({
      type: 'order.shipped',
      data: { trackingNumber },
    })
  }

  getStatus(): string {
    return this.status
  }
}
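
The core of event sourcing, rebuilding state by folding over the event history, can also be expressed as a pure function with no database involved. The sketch below is an illustration under simplified assumptions: `OrderEvent`, `OrderState`, `applyOrderEvent`, and `rebuild` are hypothetical names, and only three of the order events from the aggregate above are modeled.

```typescript
type OrderEvent =
  | { type: 'order.created'; data: { items: string[]; total: number } }
  | { type: 'order.paid'; data: { paymentId: string } }
  | { type: 'order.shipped'; data: { trackingNumber: string } }

interface OrderState {
  status: string
  items: string[]
  total: number
  version: number
}

const initialState: OrderState = { status: 'pending', items: [], total: 0, version: 0 }

// Pure transition: returns a new state and never mutates its input.
function applyOrderEvent(state: OrderState, event: OrderEvent): OrderState {
  const version = state.version + 1
  switch (event.type) {
    case 'order.created':
      return { status: 'created', items: event.data.items, total: event.data.total, version }
    case 'order.paid':
      return { ...state, status: 'paid', version }
    case 'order.shipped':
      return { ...state, status: 'shipped', version }
  }
}

// Replaying the full history from the initial state yields the current state.
function rebuild(events: OrderEvent[]): OrderState {
  return events.reduce(applyOrderEvent, initialState)
}

const orderEvents: OrderEvent[] = [
  { type: 'order.created', data: { items: ['sku_1', 'sku_2'], total: 59 } },
  { type: 'order.paid', data: { paymentId: 'pay_1' } },
  { type: 'order.shipped', data: { trackingNumber: 'trk_1' } },
]

const current = rebuild(orderEvents)
const afterPayment = rebuild(orderEvents.slice(0, 2))
```

Replaying a prefix of the history, as `afterPayment` does, gives the state as it was after those events, which is the property the class-based `loadFromHistory` relies on.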

Pattern 2: Event Replay and Time Travel

Replay events to rebuild system state at any point in time.

import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'

export class EventReplayer {
  async replayEvents(startTime: Date, endTime: Date, eventTypes?: string[]): Promise<void> {
    // Get events in time range
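    // Event timestamps are stored as ISO 8601 strings, so compare against strings
    const query: any = {
      timestamp: {
        $gte: startTime.toISOString(),
        $lte: endTime.toISOString(),
      },
    }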

    if (eventTypes) {
      query.type = { $in: eventTypes }
    }

    const events = await db.query($.Event, {
      where: query,
      orderBy: { timestamp: 'asc' },
    })

    console.log(`Replaying ${events.length} events...`)

    // Replay each event
    for (const event of events) {
      await this.replayEvent(event)
    }

    console.log('Replay complete')
  }

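  private async replayEvent(event: any): Promise<void> {
    // Re-publish event to event bus. Note that publish() stores the event again
    // and re-runs every subscriber, so handlers with side effects (emails,
    // payments) must be idempotent or disabled during replay.
    await eventBus.publish(event)
  }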

  async rebuildState(aggregateId: string): Promise<any> {
    // Get all events for aggregate
    const events = await db.query($.Event, {
      where: { aggregateId },
      orderBy: { version: 'asc' },
    })

    // Replay events to rebuild state
    let state = {}

    for (const event of events) {
      state = this.applyEvent(state, event)
    }

    return state
  }

  private applyEvent(state: any, event: any): any {
    // Apply event transformations to state
    // This would be specific to your domain
    return { ...state, ...event.data }
  }
}
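
The "time travel" part of this pattern amounts to folding only the events recorded up to a chosen moment. A minimal in-memory sketch follows, assuming ISO 8601 timestamps and the same merge rule as `applyEvent` above; `StoredEvent` and `stateAt` are hypothetical names.

```typescript
interface StoredEvent {
  type: string
  timestamp: string // ISO 8601
  data: Record<string, unknown>
}

// Folds every event recorded up to and including `pointInTime`,
// merging each event's data into the state in timestamp order.
function stateAt(events: StoredEvent[], pointInTime: Date): Record<string, unknown> {
  const cutoff = pointInTime.getTime()
  return events
    .filter((e) => Date.parse(e.timestamp) <= cutoff) // filter() copies, so the sort below leaves the input untouched
    .sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp))
    .reduce<Record<string, unknown>>((state, e) => ({ ...state, ...e.data }), {})
}

const timeline: StoredEvent[] = [
  { type: 'order.created', timestamp: '2024-01-01T10:00:00.000Z', data: { status: 'created', total: 59 } },
  { type: 'order.paid', timestamp: '2024-01-01T10:05:00.000Z', data: { status: 'paid', paymentId: 'pay_1' } },
  { type: 'order.shipped', timestamp: '2024-01-01T10:30:00.000Z', data: { status: 'shipped' } },
]

// State as of 10:10, after payment but before shipping.
const midMorning = stateAt(timeline, new Date('2024-01-01T10:10:00.000Z'))
// State before any event was recorded.
const beforeAnything = stateAt(timeline, new Date('2024-01-01T09:00:00.000Z'))
```

Unlike `replayEvents`, this approach never re-publishes anything, so it has no side effects and is safe to run against production history.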

Pattern 3: Event Correlation and Causation

Track relationships between events.

import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'

export interface CorrelatedEvent {
  id: string
  type: string
  data: any
  metadata: {
    correlationId: string // Groups related events
    causationId?: string // Links cause and effect
  }
  timestamp: string
}

export class EventCorrelator {
  async publishCorrelatedEvent(
    event: {
      type: string
      data: any
    },
    correlationId: string,
    causationId?: string
  ): Promise<void> {
    // eventBus.publish() assigns the id and timestamp. The correlation fields go
    // under metadata because getEventChain() and buildCausalChain() read them there.
    await eventBus.publish({
      ...event,
      metadata: { correlationId, causationId },
    })
  }

  async getEventChain(correlationId: string): Promise<any[]> {
    // Get all events with this correlation ID
    const events = await db.query($.Event, {
      where: { 'metadata.correlationId': correlationId },
      orderBy: { timestamp: 'asc' },
    })

    // Build causal chain
    return this.buildCausalChain(events)
  }

  private buildCausalChain(events: any[]): any[] {
    const eventMap = new Map(events.map((e) => [e.id, e]))
    const roots: any[] = []

    // Find root events (no causation)
    for (const event of events) {
      if (!event.metadata.causationId) {
        roots.push(this.buildEventTree(event, eventMap))
      }
    }

    return roots
  }

  private buildEventTree(event: any, eventMap: Map<string, any>): any {
    const children = Array.from(eventMap.values())
      .filter((e) => e.metadata.causationId === event.id)
      .map((e) => this.buildEventTree(e, eventMap))

    return {
      ...event,
      children,
    }
  }
}

// Usage
const correlator = new EventCorrelator()

// Original event
await correlator.publishCorrelatedEvent(
  {
    type: 'order.created',
    data: { orderId: '123' },
  },
  'order-123' // correlation ID
)

// Caused by order.created
await correlator.publishCorrelatedEvent(
  {
    type: 'payment.initiated',
    data: { orderId: '123' },
  },
  'order-123', // same correlation ID
  'evt_order_created_id' // causation ID
)
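
The usage above passes the causation ID as a hand-written string. A sketch of deriving it from the parent event instead is shown below, so that a follow-up event cannot point at the wrong cause. `TracedEvent`, `nextEventId`, `rootEvent`, and `causedBy` are hypothetical helpers, and the sequential IDs are an assumption chosen to keep the example deterministic.

```typescript
interface TracedEvent {
  id: string
  type: string
  data: unknown
  metadata: { correlationId: string; causationId?: string }
}

let sequence = 0

// Deterministic ids keep the sketch easy to follow; a real system would use unique ids.
function nextEventId(): string {
  sequence += 1
  return `evt_${sequence}`
}

// A root event starts a new correlation group and has no cause.
function rootEvent(type: string, data: unknown, correlationId: string): TracedEvent {
  return { id: nextEventId(), type, data, metadata: { correlationId } }
}

// A follow-up event inherits the group and points at the event that caused it.
function causedBy(parentEvent: TracedEvent, type: string, data: unknown): TracedEvent {
  return {
    id: nextEventId(),
    type,
    data,
    metadata: {
      correlationId: parentEvent.metadata.correlationId,
      causationId: parentEvent.id,
    },
  }
}

const created = rootEvent('order.created', { orderId: '123' }, 'order-123')
const initiated = causedBy(created, 'payment.initiated', { orderId: '123' })
const processed = causedBy(initiated, 'payment.processed', { orderId: '123' })
```

All three events share the correlation ID `order-123`, and each follow-up event's `causationId` is the `id` of its direct parent, which is the structure `buildCausalChain` expects.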

Pattern 4: Fan-Out and Fan-In

One event triggers multiple services, and their results are then aggregated.

import $, { db, on, send } from 'sdk.do'
import { eventBus } from './event-bus'

// Fan-Out: One event triggers multiple services
export class FanOutPattern {
  async execute(event: any, services: string[]): Promise<void> {
    // Publish fan-out initiated event
    await eventBus.publish({
      type: 'fanout.initiated',
      data: {
        originalEvent: event,
        targetServices: services,
        fanoutId: generateId(),
      },
    })

    // Trigger all services in parallel
    await Promise.all(
      services.map((serviceId) =>
        $.ServiceExecution.start({
          serviceId,
          inputs: event.data,
        })
      )
    )
  }
}

// Fan-In: Aggregate results from multiple events
export class FanInPattern {
  async waitForAll(correlationId: string, expectedEventTypes: string[], timeout: number = 30000): Promise<any> {
    return new Promise((resolve, reject) => {
      const results: any = {}
      const received = new Set<string>()
      let settled = false

      // Set timeout
      const timeoutId = setTimeout(() => {
        settled = true
        reject(new Error('Fan-in timeout'))
      }, timeout)

      // Subscribe to expected event types.
      // EventBus has no unsubscribe, so these handlers stay registered;
      // the settled flag turns them into no-ops once the wait is over.
      for (const eventType of expectedEventTypes) {
        eventBus.subscribe(eventType, async (event) => {
          if (settled || event.metadata?.correlationId !== correlationId) return

          received.add(eventType)
          results[eventType] = event.data

          // Check if all events received
          if (received.size === expectedEventTypes.length) {
            settled = true
            clearTimeout(timeoutId)
            resolve(results)
          }
        })
      }
    })
  }
}

// Usage example: Multi-service data enrichment
export const fanOutFanInExample = async (leadData: any) => {
  const correlationId = generateId()

  // Fan-In: Subscribe before fanning out so that no result event is missed
  const fanIn = new FanInPattern()
  const pendingResults = fanIn.waitForAll(correlationId, ['email.enriched', 'company.enriched', 'social.enriched'], 60000)

  // Fan-Out: Trigger multiple enrichment services
  await Promise.all([
    $.ServiceExecution.start({
      serviceId: 'email-enrichment',
      inputs: { ...leadData, correlationId },
    }),
    $.ServiceExecution.start({
      serviceId: 'company-enrichment',
      inputs: { ...leadData, correlationId },
    }),
    $.ServiceExecution.start({
      serviceId: 'social-enrichment',
      inputs: { ...leadData, correlationId },
    }),
  ])

  // Wait for all results
  const results = await pendingResults

  // Aggregate results
  return {
    ...leadData,
    email: results['email.enriched'],
    company: results['company.enriched'],
    social: results['social.enriched'],
  }
}

function generateId(): string {
  // slice(2, 11) keeps the same 9 characters as the deprecated substr(2, 9)
  return `${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
}
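
The `FanInPattern` above leaves its subscriptions registered after it resolves or times out. The following is a minimal sketch of fan-in with cleanup, under the assumption that `subscribe` returns an unsubscribe function. `InMemoryBus` and the standalone `waitForAll` are hypothetical stand-ins written for this example; the `eventBus` used in this guide may expose a different API.

```typescript
// Hypothetical in-memory bus used only for this sketch
type BusEvent = { type: string; data: unknown; metadata: { correlationId: string } }
type BusHandler = (event: BusEvent) => void

class InMemoryBus {
  private handlers = new Map<string, Set<BusHandler>>()

  // Assumption: subscribe returns a function that removes the handler again
  subscribe(type: string, handler: BusHandler): () => void {
    const set = this.handlers.get(type) ?? new Set<BusHandler>()
    set.add(handler)
    this.handlers.set(type, set)
    return () => {
      set.delete(handler)
    }
  }

  publish(event: BusEvent): void {
    // Copy first so handlers may unsubscribe while we iterate
    Array.from(this.handlers.get(event.type) ?? []).forEach((handler) => handler(event))
  }

  handlerCount(type: string): number {
    return this.handlers.get(type)?.size ?? 0
  }
}

function waitForAll(
  bus: InMemoryBus,
  correlationId: string,
  expectedTypes: string[],
  timeoutMs = 30000
): Promise<Record<string, unknown>> {
  return new Promise((resolve, reject) => {
    const expected = Array.from(new Set(expectedTypes)) // tolerate duplicate type names
    const received = new Set<string>()
    const results: Record<string, unknown> = {}
    const unsubscribers: Array<() => void> = []

    // Runs on both success and timeout, so no handler outlives the wait
    const cleanup = () => {
      clearTimeout(timer)
      unsubscribers.forEach((unsubscribe) => unsubscribe())
    }

    const timer = setTimeout(() => {
      cleanup()
      const missing = expected.filter((type) => !received.has(type))
      reject(new Error(`Fan-in timeout, missing: ${missing.join(', ')}`))
    }, timeoutMs)

    expected.forEach((type) => {
      unsubscribers.push(
        bus.subscribe(type, (event) => {
          if (event.metadata.correlationId !== correlationId) return
          received.add(type)
          results[type] = event.data
          if (received.size === expected.length) {
            cleanup()
            resolve(results)
          }
        })
      )
    })
  })
}
```

Naming the missing event types in the timeout error makes it easier to see which downstream service failed to respond.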

Error Handling

Dead Letter Queue

import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'

export class DeadLetterQueue {
  async add(event: any, handler: string, error: Error): Promise<void> {
    await db.create($.DeadLetterEvent, {
      event,
      handler,
      error: error.message,
      stack: error.stack,
      failedAt: new Date(),
      retryCount: 0,
    })
  }

  async retry(deadLetterEventId: string): Promise<void> {
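    const dlEvent = await db.findOne($.DeadLetterEvent, { id: deadLetterEventId })

    // Guard against an unknown ID before touching dlEvent.event
    if (!dlEvent) {
      throw new Error(`Dead letter event ${deadLetterEventId} not found`)
    }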

    try {
      // Republish event
      await eventBus.publish(dlEvent.event)

      // Mark as retried
      await db.update($.DeadLetterEvent, deadLetterEventId, {
        retryCount: dlEvent.retryCount + 1,
        lastRetryAt: new Date(),
      })
    } catch (error) {
      // Retry failed
      await db.update($.DeadLetterEvent, deadLetterEventId, {
        retryCount: dlEvent.retryCount + 1,
        lastRetryAt: new Date(),
        lastError: (error as Error).message,
      })
    }
  }

  async retryAll(maxRetries: number = 3): Promise<void> {
    const dlEvents = await db.query($.DeadLetterEvent, {
      where: {
        retryCount: { $lt: maxRetries },
      },
    })

    for (const dlEvent of dlEvents) {
      await this.retry(dlEvent.id)
    }
  }
}
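
The class above stores and retries failed events, but does not show how a failure reaches it. One possible wiring is sketched below: a wrapper that retries a handler a few times and then hands the event to the queue. `withDeadLetter`, `DeadLetterSink`, and the event shape are hypothetical names introduced for this example; `DeadLetterSink` only mirrors the signature of `DeadLetterQueue.add`.

```typescript
type FailedEvent = { id: string; type: string; data: unknown }
type EventHandler = (event: FailedEvent) => Promise<void>

// Hypothetical interface with the same shape as DeadLetterQueue.add
interface DeadLetterSink {
  add(event: FailedEvent, handler: string, error: Error): Promise<void>
}

// Wraps a handler: try up to maxAttempts times, then record the event in the DLQ
function withDeadLetter(
  name: string,
  handler: EventHandler,
  dlq: DeadLetterSink,
  maxAttempts = 3
): EventHandler {
  return async (event) => {
    let lastError = new Error('handler never ran')

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        await handler(event)
        return // success: nothing to record
      } catch (error) {
        lastError = error instanceof Error ? error : new Error(String(error))
      }
    }

    // All attempts failed: record instead of throwing, so other subscribers are unaffected
    await dlq.add(event, name, lastError)
  }
}
```

A subscriber could then be registered as `eventBus.subscribe('order.created', withDeadLetter('order-handler', handleOrder, new DeadLetterQueue()))`, where `handleOrder` stands for an application-defined handler.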

Idempotent Event Handlers

import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'

export class IdempotentHandler {
  private processedEvents = new Set<string>()

  async handle(event: any, handler: (event: any) => Promise<void>): Promise<void> {
    // Check if already processed
    if (this.processedEvents.has(event.id)) {
      console.log(`Event ${event.id} already processed, skipping`)
      return
    }

    // Check database for processed events
    const processed = await db.findOne($.ProcessedEvent, { eventId: event.id })

    if (processed) {
      this.processedEvents.add(event.id)
      return
    }

    try {
      // Process event
      await handler(event)

      // Mark as processed
      await db.create($.ProcessedEvent, {
        eventId: event.id,
        eventType: event.type,
        processedAt: new Date(),
      })

      this.processedEvents.add(event.id)
    } catch (error) {
      // Don't mark as processed if error
      throw error
    }
  }
}

// Usage (processOrder stands for your application's order-processing function)
const idempotentHandler = new IdempotentHandler()

eventBus.subscribe('order.created', async (event) => {
  await idempotentHandler.handle(event, async (e) => {
    // This will only execute once per event ID
    await processOrder(e.data)
  })
})
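
The check-then-mark sequence in `IdempotentHandler` leaves a window in which two concurrent deliveries of the same event both pass the check and both run the handler. A minimal sketch of one way to close that window within a single process is to track in-flight work by event ID. `ProcessedStore`, `InMemoryProcessedStore`, and `ConcurrentSafeHandler` are hypothetical names for this example, and the in-memory store stands in for the `$.ProcessedEvent` table.

```typescript
type IncomingEvent = { id: string; type: string; data: unknown }

// Hypothetical store; in production this would be a table with a unique key on eventId
interface ProcessedStore {
  has(eventId: string): Promise<boolean>
  mark(eventId: string): Promise<void>
}

class InMemoryProcessedStore implements ProcessedStore {
  private ids = new Set<string>()

  async has(eventId: string): Promise<boolean> {
    return this.ids.has(eventId)
  }

  async mark(eventId: string): Promise<void> {
    this.ids.add(eventId)
  }
}

class ConcurrentSafeHandler {
  // Event IDs currently being processed by this instance, with their pending result
  private inFlight = new Map<string, Promise<void>>()
  private store: ProcessedStore

  constructor(store: ProcessedStore) {
    this.store = store
  }

  handle(event: IncomingEvent, handler: (event: IncomingEvent) => Promise<void>): Promise<void> {
    // A duplicate delivery joins the attempt that is already running
    const running = this.inFlight.get(event.id)
    if (running) return running

    const attempt = (async () => {
      try {
        if (await this.store.has(event.id)) return
        await handler(event)
        // Only mark after success, so a failed event can be retried
        await this.store.mark(event.id)
      } finally {
        this.inFlight.delete(event.id)
      }
    })()

    this.inFlight.set(event.id, attempt)
    return attempt
  }
}
```

This only guards a single process. When several instances consume the same events, the store itself has to reject duplicates, for example through a unique constraint on the event ID.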

Testing Event-Driven Systems

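import { describe, it, expect } from 'vitest'
import { eventBus } from './event-bus'
// EventCorrelator is the class from the event correlation pattern;
// the path below is an assumption, adjust it to where the class lives in your project
import { EventCorrelator } from './event-correlator'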

describe('Event-Driven Architecture', () => {
  it('should publish and subscribe to events', async () => {
    const received: any[] = []

    // Subscribe to event
    eventBus.subscribe('test.event', async (event) => {
      received.push(event)
    })

    // Publish event
    await eventBus.publish({
      type: 'test.event',
      data: { message: 'Hello' },
    })

    // Wait for async processing
    await new Promise((resolve) => setTimeout(resolve, 100))

    expect(received).toHaveLength(1)
    expect(received[0].data.message).toBe('Hello')
  })

  it('should handle multiple subscribers', async () => {
    const subscriber1Results: any[] = []
    const subscriber2Results: any[] = []

    eventBus.subscribe('multi.test', async (event) => {
      subscriber1Results.push(event)
    })

    eventBus.subscribe('multi.test', async (event) => {
      subscriber2Results.push(event)
    })

    await eventBus.publish({
      type: 'multi.test',
      data: { value: 42 },
    })

    await new Promise((resolve) => setTimeout(resolve, 100))

    expect(subscriber1Results).toHaveLength(1)
    expect(subscriber2Results).toHaveLength(1)
  })

  it('should correlate related events', async () => {
    const correlationId = 'test-correlation-123'

    await eventBus.publish({
      type: 'order.created',
      data: { orderId: '123' },
      metadata: { correlationId },
    })

    await eventBus.publish({
      type: 'payment.processed',
      data: { orderId: '123' },
      metadata: { correlationId },
    })

    const correlator = new EventCorrelator()
    const chain = await correlator.getEventChain(correlationId)

    expect(chain.length).toBeGreaterThan(0)
  })
})
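
The tests above wait a fixed 100 ms for asynchronous delivery, which can be flaky on a slow machine and wastes time on a fast one. A minimal sketch of an alternative is to poll for the expected condition. `waitUntil` is a hypothetical helper written for this example, not part of Vitest or `sdk.do`.

```typescript
// Hypothetical test helper: polls a condition instead of sleeping for a fixed time
async function waitUntil(condition: () => boolean, timeoutMs = 1000, intervalMs = 5): Promise<void> {
  const deadline = Date.now() + timeoutMs

  while (!condition()) {
    if (Date.now() > deadline) {
      throw new Error(`Condition not met within ${timeoutMs}ms`)
    }
    // Yield to the event loop so pending handlers can run
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs))
  }
}
```

In the first test, `await waitUntil(() => received.length === 1)` could replace the fixed sleep; the test then finishes as soon as the event arrives and fails with a clear message if it never does.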

Best Practices

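  1. Event Design: Design events to be immutable and self-contained, so subscribers never need to call the publisher back for context
  2. Idempotency: Make event handlers idempotent, because retries and replays can deliver the same event more than once
  3. Event Versioning: Version event schemas so that existing subscribers keep working when a payload changes
  4. Ordering: Don't assume events arrive in order unless the event bus guarantees it
  5. Error Handling: Route events that fail processing to a dead letter queue instead of dropping them
  6. Correlation: Track related events with correlation and causation IDs
  7. Monitoring: Monitor event flow and processing times
  8. Testing: Test event handlers in isolation
  9. Documentation: Document event schemas and contracts
  10. Replay: Design handlers so that events can be replayed safely for recovery and debugging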
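
Event versioning (item 3) is often handled by upcasting: older stored versions are converted to the latest shape before they reach a handler, so handlers only deal with one version and replay of old events keeps working. The sketch below illustrates the idea; the event shapes, the `version` field, and `upcastOrderCreated` are assumptions made for this example.

```typescript
// Hypothetical event shapes for illustration; the field names are assumptions
type OrderCreatedV1 = {
  type: 'order.created'
  version: 1
  data: { orderId: string; total: number }
}

type OrderCreatedV2 = {
  type: 'order.created'
  version: 2
  data: { orderId: string; total: { amount: number; currency: string } }
}

type OrderCreated = OrderCreatedV1 | OrderCreatedV2

// Upcast any stored version to the latest shape before it reaches a handler
function upcastOrderCreated(event: OrderCreated): OrderCreatedV2 {
  if (event.version === 2) return event

  return {
    type: 'order.created',
    version: 2,
    data: {
      orderId: event.data.orderId,
      // Assumption for this sketch: v1 totals were always in USD
      total: { amount: event.data.total, currency: 'USD' },
    },
  }
}
```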

Additional Resources