Event-Driven Service Orchestration
Complete guide to building event-driven service architectures with event routing, fan-out/fan-in patterns, event sourcing, and reactive service orchestration
Build loosely coupled, scalable service architectures where services react to events rather than direct calls, enabling real-time processing, flexible workflows, and system-wide decoupling.
Overview
Event-driven architecture (EDA) is a design pattern where services communicate through events rather than direct calls. Services publish events when something happens, and other services subscribe to events they're interested in. This creates highly decoupled, scalable systems that can evolve independently.
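The publish/subscribe relationship described above can be sketched without any SDK. The following is a minimal, synchronous, in-memory illustration; `TinyBus`, `ShopEvents`, and the two subscribers are hypothetical names chosen for this sketch and are not part of `sdk.do`.

```typescript
// Minimal in-memory publish/subscribe sketch (hypothetical names, not the sdk.do API).
type Handler<T> = (payload: T) => void

class TinyBus<Events extends Record<string, unknown>> {
  private handlers = new Map<keyof Events, Handler<any>[]>()

  // Register interest in one event type; the publisher never sees this list.
  subscribe<K extends keyof Events>(type: K, handler: Handler<Events[K]>): void {
    const list = this.handlers.get(type) ?? []
    list.push(handler)
    this.handlers.set(type, list)
  }

  // Deliver the payload to every subscriber and report how many were notified.
  publish<K extends keyof Events>(type: K, payload: Events[K]): number {
    const list = this.handlers.get(type) ?? []
    for (const handler of list) handler(payload)
    return list.length
  }
}

// Two independent "services" react to the same event without knowing each other.
type ShopEvents = { 'order.created': { orderId: string; total: number } }

const bus = new TinyBus<ShopEvents>()
const emailsQueued: string[] = []
let revenue = 0

bus.subscribe('order.created', (e) => emailsQueued.push(e.orderId))
bus.subscribe('order.created', (e) => {
  revenue += e.total
})

const notified = bus.publish('order.created', { orderId: 'o-1', total: 40 })
```

Adding a third subscriber requires no change to the code that calls `publish`, which is the decoupling property the rest of this guide builds on.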
When to Use Event-Driven Architecture
Use event-driven patterns when:
- Loose Coupling: Services should be independent and not know about each other
- Scalability: System needs to handle high volumes of concurrent operations
- Flexibility: New services need to be added without changing existing ones
- Real-Time: Immediate reactions to events are required
- Audit Trail: Complete history of system changes must be maintained
- Event Replay: Need to reconstruct system state from event history
- Asynchronous: Operations don't need immediate responses
- Fan-Out: One event should trigger multiple independent actions
Avoid this pattern when:
- Immediate, synchronous responses are required
- System is simple with few services
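- Strong consistency is critical (subscribers process events asynchronously, so state is only eventually consistent)
- Strict event ordering is required but difficult to guarantee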
- Debugging complexity is a major concern
Architecture
Core Implementation
Event Bus
import $, { db, on, send } from 'sdk.do'
export class EventBus {
private subscribers = new Map<string, Array<(event: any) => Promise<void>>>()
subscribe(eventType: string, handler: (event: any) => Promise<void>): void {
if (!this.subscribers.has(eventType)) {
this.subscribers.set(eventType, [])
}
this.subscribers.get(eventType)!.push(handler)
}
async publish(event: { type: string; data: any; metadata?: any }): Promise<void> {
const fullEvent = {
id: generateEventId(),
timestamp: new Date().toISOString(),
...event,
metadata: {
...event.metadata,
publishedAt: new Date().toISOString(),
},
}
// Store event for audit trail
await this.storeEvent(fullEvent)
// Get subscribers for this event type
const handlers = this.subscribers.get(event.type) || []
// Execute all handlers in parallel
await Promise.allSettled(
handlers.map(async (handler) => {
try {
await handler(fullEvent)
} catch (error) {
// Send to dead letter queue
await this.handleFailedEvent(fullEvent, handler, error as Error)
}
})
)
}
private async storeEvent(event: any): Promise<void> {
await db.create($.Event, event)
}
private async handleFailedEvent(event: any, handler: Function, error: Error): Promise<void> {
await db.create($.DeadLetterEvent, {
event,
handler: handler.name || 'anonymous', // arrow-function subscribers have an empty name
error: error.message,
failedAt: new Date().toISOString(),
})
}
}
function generateEventId(): string {
return `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
}
// Global event bus instance
export const eventBus = new EventBus()
Event Schema
import $, { db } from 'sdk.do'
export interface ServiceEvent<T = any> {
id: string
type: string
version: string
timestamp: string
source: string
data: T
metadata?: {
correlationId?: string
causationId?: string
userId?: string
traceId?: string
[key: string]: any
}
}
// Event type definitions
export namespace EventTypes {
// Order events
export interface OrderCreated {
orderId: string
customerId: string
items: Array<{
productId: string
quantity: number
price: number
}>
total: number
currency: string
}
export interface OrderPaid {
orderId: string
paymentId: string
amount: number
currency: string
}
export interface OrderShipped {
orderId: string
trackingNumber: string
carrier: string
estimatedDelivery: string
}
// Service execution events
export interface ServiceExecutionStarted {
executionId: string
serviceId: string
inputs: any
}
export interface ServiceExecutionCompleted {
executionId: string
serviceId: string
outputs: any
duration: number
}
export interface ServiceExecutionFailed {
executionId: string
serviceId: string
error: string
retryable: boolean
}
}
Example 1: Order Processing System
Complete event-driven order processing with multiple services.
import $, { db, on, send, ai } from 'sdk.do'
import { eventBus } from './event-bus'
// Order Service - Publishes order events
export const orderService = {
async createOrder(orderData: any): Promise<any> {
// Create order in database
const order = await db.create($.Order, {
...orderData,
status: 'created',
createdAt: new Date(),
})
// Publish order.created event
await eventBus.publish({
type: 'order.created',
data: {
orderId: order.id,
customerId: order.customerId,
items: order.items,
total: order.total,
currency: order.currency,
shippingAddress: order.shippingAddress,
},
metadata: {
source: 'order-service',
correlationId: order.id,
},
})
return order
},
}
// Email Service - Reacts to order events
eventBus.subscribe('order.created', async (event) => {
const { orderId, customerId } = event.data
// Get customer details
const customer = await db.findOne($.Customer, { id: customerId })
// Send order confirmation email
await $.ServiceExecution.start({
serviceId: 'email-sender',
inputs: {
to: customer.email,
template: 'order-confirmation',
data: {
orderId,
customerName: customer.name,
orderDetails: event.data,
},
},
})
console.log(`Order confirmation email sent for order ${orderId}`)
})
// Analytics Service - Tracks order metrics
eventBus.subscribe('order.created', async (event) => {
const { orderId, total, currency, items } = event.data
// Track order in analytics
await $.ServiceExecution.start({
serviceId: 'analytics-tracker',
inputs: {
event: 'order_created',
properties: {
orderId,
revenue: total,
currency,
itemCount: items.length,
timestamp: event.timestamp,
},
},
})
console.log(`Order ${orderId} tracked in analytics`)
})
// Inventory Service - Updates stock levels
eventBus.subscribe('order.created', async (event) => {
const { orderId, items } = event.data
try {
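// Reserve inventory for each item, waiting for each reservation to finish
// so that a failed reservation reaches the catch block below
// (this assumes waitForCompletion rejects when the execution fails)
for (const item of items) {
await $.ServiceExecution.start({
serviceId: 'inventory-reserver',
inputs: {
productId: item.productId,
quantity: item.quantity,
orderId,
},
}).then((e) => e.waitForCompletion())
}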
// Publish inventory.reserved event
await eventBus.publish({
type: 'inventory.reserved',
data: {
orderId,
items,
},
metadata: {
source: 'inventory-service',
correlationId: orderId,
},
})
console.log(`Inventory reserved for order ${orderId}`)
} catch (error) {
// Publish inventory.reservation.failed event
await eventBus.publish({
type: 'inventory.reservation.failed',
data: {
orderId,
reason: (error as Error).message,
},
metadata: {
source: 'inventory-service',
correlationId: orderId,
},
})
}
})
// Fraud Detection Service - Checks for fraud
eventBus.subscribe('order.created', async (event) => {
const { orderId, customerId, total } = event.data
// Run fraud detection
const fraudCheck = await $.ServiceExecution.start({
serviceId: 'fraud-detector',
inputs: {
orderId,
customerId,
amount: total,
shippingAddress: event.data.shippingAddress,
},
}).then((e) => e.waitForCompletion())
if (fraudCheck.outputs.isFraudulent) {
// Publish order.flagged event
await eventBus.publish({
type: 'order.flagged',
data: {
orderId,
reason: 'fraud',
riskScore: fraudCheck.outputs.riskScore,
},
metadata: {
source: 'fraud-detection-service',
correlationId: orderId,
},
})
} else {
// Publish order.verified event
await eventBus.publish({
type: 'order.verified',
data: {
orderId,
riskScore: fraudCheck.outputs.riskScore,
},
metadata: {
source: 'fraud-detection-service',
correlationId: orderId,
},
})
}
})
// Payment Service - Processes payment after verification
eventBus.subscribe('order.verified', async (event) => {
const { orderId } = event.data
// Get order details
const order = await db.findOne($.Order, { id: orderId })
try {
// Process payment
const payment = await $.ServiceExecution.start({
serviceId: 'payment-processor',
inputs: {
orderId,
amount: order.total,
currency: order.currency,
paymentMethod: order.paymentMethod,
},
}).then((e) => e.waitForCompletion())
// Publish payment.processed event
await eventBus.publish({
type: 'payment.processed',
data: {
orderId,
paymentId: payment.outputs.paymentId,
amount: order.total,
currency: order.currency,
},
metadata: {
source: 'payment-service',
correlationId: orderId,
},
})
// Update order status
await db.update($.Order, orderId, { status: 'paid' })
} catch (error) {
// Publish payment.failed event
await eventBus.publish({
type: 'payment.failed',
data: {
orderId,
reason: (error as Error).message,
},
metadata: {
source: 'payment-service',
correlationId: orderId,
},
})
}
})
// Fulfillment Service - Starts fulfillment after payment
eventBus.subscribe('payment.processed', async (event) => {
const { orderId } = event.data
// Start fulfillment workflow
const fulfillment = await $.ServiceExecution.start({
serviceId: 'fulfillment-orchestrator',
inputs: { orderId },
})
// Publish fulfillment.started event
await eventBus.publish({
type: 'fulfillment.started',
data: {
orderId,
fulfillmentId: fulfillment.id,
},
metadata: {
source: 'fulfillment-service',
correlationId: orderId,
},
})
})
// Shipping Service - Creates shipment
eventBus.subscribe('fulfillment.started', async (event) => {
const { orderId, fulfillmentId } = event.data
const order = await db.findOne($.Order, { id: orderId })
// Create shipment
const shipment = await $.ServiceExecution.start({
serviceId: 'shipment-creator',
inputs: {
orderId,
fulfillmentId,
address: order.shippingAddress,
items: order.items,
},
}).then((e) => e.waitForCompletion())
// Publish shipment.created event
await eventBus.publish({
type: 'shipment.created',
data: {
orderId,
shipmentId: shipment.outputs.shipmentId,
trackingNumber: shipment.outputs.trackingNumber,
carrier: shipment.outputs.carrier,
estimatedDelivery: shipment.outputs.estimatedDelivery,
},
metadata: {
source: 'shipping-service',
correlationId: orderId,
},
})
})
// Notification Service - Sends shipping notification
eventBus.subscribe('shipment.created', async (event) => {
const { orderId, trackingNumber, carrier, estimatedDelivery } = event.data
const order = await db.findOne($.Order, { id: orderId })
const customer = await db.findOne($.Customer, { id: order.customerId })
// Send shipping notification
await $.ServiceExecution.start({
serviceId: 'email-sender',
inputs: {
to: customer.email,
template: 'shipping-notification',
data: {
customerName: customer.name,
orderId,
trackingNumber,
carrier,
estimatedDelivery,
},
},
})
})
Example 2: Real-Time Analytics Pipeline
Event-driven analytics processing with aggregation and alerts.
import $, { db, on, send, ai } from 'sdk.do'
import { eventBus } from './event-bus'
// User Activity Tracking
export const trackUserActivity = async (activity: { userId: string; action: string; resource: string; metadata?: any }) => {
await eventBus.publish({
type: 'user.activity',
data: activity,
metadata: {
source: 'activity-tracker',
},
})
}
// Real-time aggregation service
eventBus.subscribe('user.activity', async (event) => {
const { userId, action, resource } = event.data
// Update user activity counters in real-time
await db.increment($.UserMetrics, userId, {
[`actions.${action}`]: 1,
[`resources.${resource}`]: 1,
totalActivity: 1,
lastActivityAt: new Date(),
})
// Check for activity milestones
const metrics = await db.findOne($.UserMetrics, { userId })
if (metrics.totalActivity % 100 === 0) {
await eventBus.publish({
type: 'user.milestone.reached',
data: {
userId,
milestone: 'activity',
count: metrics.totalActivity,
},
})
}
})
// Behavioral analysis service
eventBus.subscribe('user.activity', async (event) => {
const { userId, action } = event.data
// Get recent activity
const recentActivity = await db.query($.Event, {
where: {
type: 'user.activity',
'data.userId': userId,
timestamp: { $gte: new Date(Date.now() - 3600000) }, // Last hour
},
limit: 100,
})
// Analyze patterns
const analysis = await ai.generate({
model: 'gpt-5',
prompt: `Analyze this user activity and identify:
1. Behavior patterns
2. User intent
3. Potential issues
4. Recommendations
Activity: ${JSON.stringify(
recentActivity.map((e: any) => e.data),
null,
2
)}`,
maxTokens: 500,
})
// Store analysis
await db.create($.UserBehaviorAnalysis, {
userId,
timestamp: new Date(),
patterns: analysis.text,
activityCount: recentActivity.length,
})
})
// Anomaly detection service
eventBus.subscribe('user.activity', async (event) => {
const { userId, action } = event.data
// Get user's typical behavior
const historicalMetrics = await db.findOne($.UserMetrics, { userId })
// Calculate current activity rate
const recentEvents = await db.count($.Event, {
where: {
type: 'user.activity',
'data.userId': userId,
timestamp: { $gte: new Date(Date.now() - 300000) }, // Last 5 minutes
},
})
const currentRate = recentEvents / 5 // per minute
const averageRate = historicalMetrics.averageActivityRate || 1
// Detect anomalies
if (currentRate > averageRate * 5) {
await eventBus.publish({
type: 'anomaly.detected',
data: {
userId,
anomalyType: 'high-activity',
currentRate,
averageRate,
severity: 'high',
},
metadata: {
source: 'anomaly-detection-service',
},
})
}
})
// Alert service - Sends alerts for anomalies
eventBus.subscribe('anomaly.detected', async (event) => {
const { userId, anomalyType, severity } = event.data
// Send alert to admin
await $.ServiceExecution.start({
serviceId: 'alert-sender',
inputs: {
channel: 'slack',
severity,
message: `Anomaly detected for user ${userId}: ${anomalyType}`,
data: event.data,
},
})
// Log anomaly
await db.create($.SecurityAlert, {
userId,
type: anomalyType,
severity,
detectedAt: new Date(),
data: event.data,
})
})
// Report generation service
eventBus.subscribe('user.milestone.reached', async (event) => {
const { userId, milestone, count } = event.data
// Generate milestone report
const report = await $.ServiceExecution.start({
serviceId: 'report-generator',
inputs: {
type: 'milestone',
userId,
milestone,
count,
},
}).then((e) => e.waitForCompletion())
// Send congratulations email
const user = await db.findOne($.User, { id: userId })
await $.ServiceExecution.start({
serviceId: 'email-sender',
inputs: {
to: user.email,
template: 'milestone-celebration',
data: {
userName: user.name,
milestone,
count,
reportUrl: report.outputs.url,
},
},
})
})
Example 3: Content Publishing Workflow
Event-driven content creation and distribution pipeline.
import $, { db, on, send, ai } from 'sdk.do'
import { eventBus } from './event-bus'
// Content creation service
export const createContent = async (contentData: { title: string; author: string; type: string; draft: string }) => {
const content = await db.create($.Content, {
...contentData,
status: 'draft',
createdAt: new Date(),
})
await eventBus.publish({
type: 'content.created',
data: {
contentId: content.id,
title: content.title,
author: content.author,
type: content.type,
},
metadata: {
source: 'content-service',
},
})
return content
}
// AI enhancement service
eventBus.subscribe('content.created', async (event) => {
const { contentId } = event.data
const content = await db.findOne($.Content, { id: contentId })
// Enhance content with AI
const enhanced = await ai.generate({
model: 'gpt-5',
prompt: `Enhance this content by:
1. Improving clarity and readability
2. Adding relevant details
3. Fixing grammar and style issues
4. Suggesting better structure
Content:
${content.draft}`,
maxTokens: 2000,
})
// Update content
await db.update($.Content, contentId, {
enhanced: enhanced.text,
enhancedAt: new Date(),
})
// Publish enhanced event
await eventBus.publish({
type: 'content.enhanced',
data: {
contentId,
enhancementsApplied: true,
},
metadata: {
source: 'ai-enhancement-service',
correlationId: contentId,
},
})
})
// SEO optimization service
eventBus.subscribe('content.enhanced', async (event) => {
const { contentId } = event.data
const content = await db.findOne($.Content, { id: contentId })
const seoOptimized = await $.ServiceExecution.start({
serviceId: 'seo-optimizer',
inputs: {
title: content.title,
content: content.enhanced,
targetKeywords: content.keywords || [],
},
}).then((e) => e.waitForCompletion())
await db.update($.Content, contentId, {
optimizedTitle: seoOptimized.outputs.title,
optimizedContent: seoOptimized.outputs.content,
metaDescription: seoOptimized.outputs.metaDescription,
seoScore: seoOptimized.outputs.score,
})
await eventBus.publish({
type: 'content.optimized',
data: {
contentId,
seoScore: seoOptimized.outputs.score,
},
metadata: {
source: 'seo-service',
correlationId: contentId,
},
})
})
// Image generation service
eventBus.subscribe('content.optimized', async (event) => {
const { contentId } = event.data
const content = await db.findOne($.Content, { id: contentId })
const images = await $.ServiceExecution.start({
serviceId: 'image-generator',
inputs: {
content: content.optimizedContent,
count: 3,
style: 'professional',
},
}).then((e) => e.waitForCompletion())
await db.update($.Content, contentId, {
images: images.outputs.urls,
})
await eventBus.publish({
type: 'content.images.generated',
data: {
contentId,
imageCount: images.outputs.urls.length,
},
metadata: {
source: 'image-service',
correlationId: contentId,
},
})
})
// Quality assurance service
eventBus.subscribe('content.images.generated', async (event) => {
const { contentId } = event.data
const content = await db.findOne($.Content, { id: contentId })
const qa = await $.ServiceExecution.start({
serviceId: 'content-qa',
inputs: {
title: content.optimizedTitle,
content: content.optimizedContent,
images: content.images,
seoScore: content.seoScore,
},
}).then((e) => e.waitForCompletion())
await db.update($.Content, contentId, {
qaScore: qa.outputs.score,
qaIssues: qa.outputs.issues,
qaStatus: qa.outputs.passed ? 'passed' : 'failed',
})
if (qa.outputs.passed) {
await eventBus.publish({
type: 'content.ready.for.review',
data: {
contentId,
qaScore: qa.outputs.score,
},
metadata: {
source: 'qa-service',
correlationId: contentId,
},
})
} else {
await eventBus.publish({
type: 'content.qa.failed',
data: {
contentId,
issues: qa.outputs.issues,
},
metadata: {
source: 'qa-service',
correlationId: contentId,
},
})
}
})
// Review notification service
eventBus.subscribe('content.ready.for.review', async (event) => {
const { contentId } = event.data
const content = await db.findOne($.Content, { id: contentId })
const author = await db.findOne($.User, { id: content.author })
await $.ServiceExecution.start({
serviceId: 'notification-sender',
inputs: {
recipient: content.reviewerId,
type: 'content-review-request',
data: {
contentId,
title: content.optimizedTitle,
author: author.name,
qaScore: content.qaScore,
},
},
})
})
// Publishing service
export const publishContent = async (contentId: string) => {
const content = await db.findOne($.Content, { id: contentId })
// Update status
await db.update($.Content, contentId, {
status: 'published',
publishedAt: new Date(),
})
// Publish event
await eventBus.publish({
type: 'content.published',
data: {
contentId,
title: content.optimizedTitle,
author: content.author,
url: content.url,
},
metadata: {
source: 'publishing-service',
},
})
}
// Distribution service - Distributes to multiple channels
eventBus.subscribe('content.published', async (event) => {
const { contentId, title, url } = event.data
const content = await db.findOne($.Content, { id: contentId })
// Distribute to configured channels
const channels = content.distributionChannels || ['blog', 'social']
await Promise.all(
channels.map((channel) =>
$.ServiceExecution.start({
serviceId: 'content-distributor',
inputs: {
channel,
contentId,
title: content.optimizedTitle,
content: content.optimizedContent,
images: content.images,
url,
},
})
)
)
await eventBus.publish({
type: 'content.distributed',
data: {
contentId,
channels,
},
metadata: {
source: 'distribution-service',
correlationId: contentId,
},
})
})
// Analytics tracking
eventBus.subscribe('content.published', async (event) => {
const { contentId, title, author } = event.data
await $.ServiceExecution.start({
serviceId: 'analytics-tracker',
inputs: {
event: 'content_published',
properties: {
contentId,
title,
author,
publishedAt: new Date().toISOString(),
},
},
})
})
Advanced Patterns
Pattern 1: Event Sourcing
Store all changes as events and reconstruct state from event history.
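Before the database-backed classes, the core idea can be shown as a pure fold over an event log. This is a storage-free sketch: the event names mirror this guide, while `OrderEvent`, `OrderState`, `evolve`, and `replay` are hypothetical helpers introduced only for illustration. The version-gap check is an added safeguard, not something the SDK is known to provide.

```typescript
// Event sourcing in miniature: state is never stored, only derived from events.
type OrderEvent =
  | { type: 'order.created'; version: number; data: { total: number } }
  | { type: 'order.paid'; version: number; data: { paymentId: string } }
  | { type: 'order.shipped'; version: number; data: { trackingNumber: string } }

interface OrderState {
  status: 'pending' | 'created' | 'paid' | 'shipped'
  total: number
  version: number
}

const initialOrder: OrderState = { status: 'pending', total: 0, version: 0 }

// Pure transition function: (state, event) -> next state.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case 'order.created':
      return { ...state, status: 'created', total: event.data.total, version: event.version }
    case 'order.paid':
      return { ...state, status: 'paid', version: event.version }
    case 'order.shipped':
      return { ...state, status: 'shipped', version: event.version }
  }
}

// Replay in version order and refuse to continue if an event is missing.
function replay(events: OrderEvent[]): OrderState {
  return [...events]
    .sort((a, b) => a.version - b.version)
    .reduce<OrderState>((state, event) => {
      if (event.version !== state.version + 1) {
        throw new Error(`Version gap: expected ${state.version + 1}, got ${event.version}`)
      }
      return evolve(state, event)
    }, initialOrder)
}

// Events may be loaded in any order; replay sorts a copy by version first.
const orderLog: OrderEvent[] = [
  { type: 'order.paid', version: 2, data: { paymentId: 'pay_1' } },
  { type: 'order.created', version: 1, data: { total: 25 } },
]
const rebuilt = replay(orderLog)
```

Keeping `evolve` free of I/O means the same function can serve both live updates and history replay.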
import $, { db } from 'sdk.do'
export class EventSourcedAggregate {
private events: any[] = []
private version = 0
async applyEvent(event: any): Promise<void> {
// Store event
await db.create($.Event, {
...event,
aggregateId: this.getId(),
version: this.version + 1,
})
// Apply to current state
this.apply(event)
this.version++
}
async loadFromHistory(): Promise<void> {
// Load all events for this aggregate
const events = await db.query($.Event, {
where: { aggregateId: this.getId() },
orderBy: { version: 'asc' },
})
// Replay events to rebuild state
for (const event of events) {
this.apply(event)
this.version = event.version
}
}
protected apply(event: any): void {
// Override in subclass to apply event to state
}
protected getId(): string {
// Override in subclass
return ''
}
}
// Example: Order aggregate
export class OrderAggregate extends EventSourcedAggregate {
private orderId: string
private status: string = 'pending'
private items: any[] = []
private total: number = 0
constructor(orderId: string) {
super()
this.orderId = orderId
}
protected getId(): string {
return this.orderId
}
protected apply(event: any): void {
switch (event.type) {
case 'order.created':
this.status = 'created'
this.items = event.data.items
this.total = event.data.total
break
case 'order.paid':
this.status = 'paid'
break
case 'order.shipped':
this.status = 'shipped'
break
case 'order.delivered':
this.status = 'delivered'
break
case 'order.cancelled':
this.status = 'cancelled'
break
}
}
// Commands that generate events
async create(orderData: any): Promise<void> {
await this.applyEvent({
type: 'order.created',
data: orderData,
})
}
async markPaid(paymentId: string): Promise<void> {
await this.applyEvent({
type: 'order.paid',
data: { paymentId },
})
}
async ship(trackingNumber: string): Promise<void> {
await this.applyEvent({
type: 'order.shipped',
data: { trackingNumber },
})
}
getStatus(): string {
return this.status
}
}
Pattern 2: Event Replay and Time Travel
Replay events to rebuild system state at any point in time.
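"Any point in time" comes down to folding only the events recorded up to a cutoff. The sketch below assumes events carry an ISO 8601 `timestamp` string, as the event bus in this guide produces; `StoredEvent` and `stateAt` are hypothetical names, and the merge step mirrors the simple `{ ...state, ...event.data }` reducer used in this section.

```typescript
// Time travel as a pure function: fold only the events recorded up to `asOf`.
interface StoredEvent {
  id: string
  type: string
  timestamp: string // ISO 8601
  data: Record<string, unknown>
}

function stateAt(events: StoredEvent[], asOf: Date): Record<string, unknown> {
  const cutoff = asOf.getTime()
  return events
    .filter((e) => Date.parse(e.timestamp) <= cutoff) // filter returns a copy, so sorting is safe
    .sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp))
    .reduce<Record<string, unknown>>((state, e) => ({ ...state, ...e.data }), {})
}

const timeline: StoredEvent[] = [
  { id: 'evt_1', type: 'order.created', timestamp: '2024-01-01T10:00:00.000Z', data: { orderId: '123', status: 'created' } },
  { id: 'evt_2', type: 'order.paid', timestamp: '2024-01-01T10:05:00.000Z', data: { status: 'paid', paymentId: 'pay_1' } },
  { id: 'evt_3', type: 'order.shipped', timestamp: '2024-01-01T12:00:00.000Z', data: { status: 'shipped' } },
]

// State one hour before the shipment event was recorded.
const beforeShipping = stateAt(timeline, new Date('2024-01-01T11:00:00.000Z'))
```

A real domain would usually replace the shallow merge with a per-event-type reducer, since merging payloads cannot express removals or derived fields.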
import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'
export class EventReplayer {
async replayEvents(startTime: Date, endTime: Date, eventTypes?: string[]): Promise<void> {
// Get events in time range
const query: any = {
timestamp: {
$gte: startTime,
$lte: endTime,
},
}
if (eventTypes) {
query.type = { $in: eventTypes }
}
const events = await db.query($.Event, {
where: query,
orderBy: { timestamp: 'asc' },
})
console.log(`Replaying ${events.length} events...`)
// Replay each event
for (const event of events) {
await this.replayEvent(event)
}
console.log('Replay complete')
}
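private async replayEvent(event: any): Promise<void> {
// Re-publish event to event bus.
// Caution: with the EventBus above this stores the event again and re-runs
// every subscriber, including side effects such as emails and payments,
// so replay only against idempotent handlers or in an isolated environment
await eventBus.publish(event)
}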
async rebuildState(aggregateId: string): Promise<any> {
// Get all events for aggregate
const events = await db.query($.Event, {
where: { aggregateId },
orderBy: { version: 'asc' },
})
// Replay events to rebuild state
let state = {}
for (const event of events) {
state = this.applyEvent(state, event)
}
return state
}
private applyEvent(state: any, event: any): any {
// Apply event transformations to state
// This would be specific to your domain
return { ...state, ...event.data }
}
}
Pattern 3: Event Correlation and Causation
Track relationships between events.
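The two identifiers play different roles: the correlation ID stays constant across a whole workflow, while the causation ID points at the single event that directly triggered the current one. A small sketch of how a handler might derive both when it publishes a follow-up event is shown here; `TracedEvent`, `causedBy`, and `causalPath` are hypothetical helpers, not SDK functions.

```typescript
interface EventMeta {
  correlationId?: string
  causationId?: string
}

interface TracedEvent {
  id: string
  type: string
  metadata?: EventMeta
}

// Metadata for an event published in reaction to `cause`: inherit the workflow's
// correlation ID (or start one from the root event) and point back at the cause.
function causedBy(cause: TracedEvent): Required<EventMeta> {
  return {
    correlationId: cause.metadata?.correlationId ?? cause.id,
    causationId: cause.id,
  }
}

// Follow causationId links from an event back to the root of its chain.
function causalPath(event: TracedEvent, all: TracedEvent[]): string[] {
  const byId = new Map<string, TracedEvent>()
  for (const e of all) byId.set(e.id, e)
  const path: string[] = []
  const seen = new Set<string>() // guards against accidental cycles
  let current: TracedEvent | undefined = event
  while (current !== undefined && !seen.has(current.id)) {
    seen.add(current.id)
    path.unshift(current.type)
    const parentId: string | undefined = current.metadata?.causationId
    current = parentId === undefined ? undefined : byId.get(parentId)
  }
  return path
}

const orderCreated: TracedEvent = { id: 'evt_1', type: 'order.created', metadata: { correlationId: 'order-123' } }
const paymentInitiated: TracedEvent = { id: 'evt_2', type: 'payment.initiated', metadata: causedBy(orderCreated) }
const paymentProcessed: TracedEvent = { id: 'evt_3', type: 'payment.processed', metadata: causedBy(paymentInitiated) }

const chainTypes = causalPath(paymentProcessed, [orderCreated, paymentInitiated, paymentProcessed])
```

Deriving metadata from the triggering event, rather than passing IDs by hand, keeps the chain intact even when a handler is several hops away from the original request.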
import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'
// Correlation data lives in `metadata`, matching the ServiceEvent schema and
// the 'metadata.correlationId' query used by getEventChain below
export interface CorrelatedEvent {
id: string
type: string
data: any
timestamp: string
metadata: {
correlationId: string // Groups related events
causationId?: string // Links cause and effect
}
}
export class EventCorrelator {
async publishCorrelatedEvent(
event: {
type: string
data: any
},
correlationId: string,
causationId?: string
): Promise<void> {
// The event bus assigns `id` and `timestamp` when it publishes
await eventBus.publish({
...event,
metadata: { correlationId, causationId },
})
}
async getEventChain(correlationId: string): Promise<any[]> {
// Get all events with this correlation ID
const events = await db.query($.Event, {
where: { 'metadata.correlationId': correlationId },
orderBy: { timestamp: 'asc' },
})
// Build causal chain
return this.buildCausalChain(events)
}
private buildCausalChain(events: any[]): any[] {
const eventMap = new Map(events.map((e) => [e.id, e]))
const roots: any[] = []
// Find root events (no causation)
for (const event of events) {
if (!event.metadata?.causationId) {
roots.push(this.buildEventTree(event, eventMap))
}
}
return roots
}
private buildEventTree(event: any, eventMap: Map<string, any>): any {
const children = Array.from(eventMap.values())
.filter((e) => e.metadata?.causationId === event.id)
.map((e) => this.buildEventTree(e, eventMap))
return {
...event,
children,
}
}
}
// Usage
const correlator = new EventCorrelator()
// Original event
await correlator.publishCorrelatedEvent(
{
type: 'order.created',
data: { orderId: '123' },
},
'order-123' // correlation ID
)
// Caused by order.created
await correlator.publishCorrelatedEvent(
{
type: 'payment.initiated',
data: { orderId: '123' },
},
'order-123', // same correlation ID
'evt_order_created_id' // causation ID
)
Pattern 4: Fan-Out and Fan-In
One event triggers multiple services (fan-out); their results are then aggregated (fan-in).
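The fan-in half is essentially bookkeeping: remember which result types are expected, record each arrival, and report completion once none are missing. The sketch below isolates that logic from the event bus and from timers so it can be tested deterministically. `FanInCollector` is a hypothetical class, the deadline is passed as an absolute timestamp, and `expected` is assumed to contain no duplicates.

```typescript
class FanInCollector<T> {
  private readonly expected: readonly string[]
  private readonly deadlineMs: number // absolute time in ms since epoch
  private readonly results = new Map<string, T>()

  constructor(expected: readonly string[], deadlineMs: number) {
    this.expected = expected
    this.deadlineMs = deadlineMs
  }

  // Record one result. Returns the aggregated results once every expected
  // event type has arrived, otherwise undefined.
  accept(eventType: string, data: T, nowMs: number = Date.now()): Record<string, T> | undefined {
    if (nowMs > this.deadlineMs) throw new Error('Fan-in timeout')
    if (this.expected.indexOf(eventType) === -1) return undefined // not part of this fan-in
    this.results.set(eventType, data) // a redelivered event overwrites its earlier result
    if (this.results.size < this.expected.length) return undefined
    const aggregated: Record<string, T> = {}
    this.results.forEach((value, key) => {
      aggregated[key] = value
    })
    return aggregated
  }

  // Event types that have not arrived yet.
  pending(): string[] {
    return this.expected.filter((t) => !this.results.has(t))
  }
}

const collector = new FanInCollector<{ score: number }>(['email.enriched', 'company.enriched'], 1000)
const afterFirst = collector.accept('email.enriched', { score: 1 }, 100)
const afterDuplicate = collector.accept('email.enriched', { score: 2 }, 200)
const stillPending = collector.pending()
const afterAll = collector.accept('company.enriched', { score: 3 }, 300)
```

Because duplicates overwrite rather than double-count, the collector tolerates at-least-once delivery, which is the delivery guarantee most event systems offer.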
import $, { db, on, send } from 'sdk.do'
import { eventBus } from './event-bus'
// Fan-Out: One event triggers multiple services
export class FanOutPattern {
async execute(event: any, services: string[]): Promise<void> {
// Publish fan-out initiated event
await eventBus.publish({
type: 'fanout.initiated',
data: {
originalEvent: event,
targetServices: services,
fanoutId: generateId(),
},
})
// Trigger all services in parallel
await Promise.all(
services.map((serviceId) =>
$.ServiceExecution.start({
serviceId,
inputs: event.data,
})
)
)
}
}
// Fan-In: Aggregate results from multiple events
export class FanInPattern {
private aggregations = new Map<string, any>()
async waitForAll(correlationId: string, expectedEventTypes: string[], timeout: number = 30000): Promise<any> {
return new Promise((resolve, reject) => {
const results: any = {}
const received = new Set<string>()
// Set timeout
const timeoutId = setTimeout(() => {
reject(new Error('Fan-in timeout'))
}, timeout)
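// Subscribe to expected event types
// Note: the EventBus above has no unsubscribe, so these handlers stay registered
// after the promise settles; a production bus should remove them
for (const eventType of expectedEventTypes) {
eventBus.subscribe(eventType, async (event) => {
if (event.metadata?.correlationId === correlationId) {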
received.add(eventType)
results[eventType] = event.data
// Check if all events received
if (received.size === expectedEventTypes.length) {
clearTimeout(timeoutId)
resolve(results)
}
}
})
}
})
}
}
// Usage example: Multi-service data enrichment
export const fanOutFanInExample = async (leadData: any) => {
const correlationId = generateId()
const fanIn = new FanInPattern()
// Array elements are evaluated in order, so waitForAll registers its
// subscriptions before any service starts and no result event is missed
const [results] = await Promise.all([
// Fan-In: Wait for all results
fanIn.waitForAll(correlationId, ['email.enriched', 'company.enriched', 'social.enriched'], 60000),
// Fan-Out: Trigger multiple enrichment services
$.ServiceExecution.start({
serviceId: 'email-enrichment',
inputs: { ...leadData, correlationId },
}),
$.ServiceExecution.start({
serviceId: 'company-enrichment',
inputs: { ...leadData, correlationId },
}),
$.ServiceExecution.start({
serviceId: 'social-enrichment',
inputs: { ...leadData, correlationId },
}),
])
// Aggregate results
return {
...leadData,
email: results['email.enriched'],
company: results['company.enriched'],
social: results['social.enriched'],
}
}
function generateId(): string {
return `${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
}
Error Handling
Dead Letter Queue
import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'
export class DeadLetterQueue {
async add(event: any, handler: string, error: Error): Promise<void> {
await db.create($.DeadLetterEvent, {
event,
handler,
error: error.message,
stack: error.stack,
failedAt: new Date(),
retryCount: 0,
})
}
async retry(deadLetterEventId: string): Promise<void> {
const dlEvent = await db.findOne($.DeadLetterEvent, { id: deadLetterEventId })
try {
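// Republish event. This re-runs every subscriber for the event type, not just
// the one that failed, so handlers should be idempotent. publish() catches
// handler errors itself, so the catch below only sees failures such as storage errors
await eventBus.publish(dlEvent.event)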
// Mark as retried
await db.update($.DeadLetterEvent, deadLetterEventId, {
retryCount: dlEvent.retryCount + 1,
lastRetryAt: new Date(),
})
} catch (error) {
// Retry failed
await db.update($.DeadLetterEvent, deadLetterEventId, {
retryCount: dlEvent.retryCount + 1,
lastRetryAt: new Date(),
lastError: (error as Error).message,
})
}
}
async retryAll(maxRetries: number = 3): Promise<void> {
const dlEvents = await db.query($.DeadLetterEvent, {
where: {
retryCount: { $lt: maxRetries },
},
})
for (const dlEvent of dlEvents) {
await this.retry(dlEvent.id)
}
}
}
Idempotent Event Handlers
import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'
export class IdempotentHandler {
private processedEvents = new Set<string>()
async handle(event: any, handler: (event: any) => Promise<void>): Promise<void> {
// Check if already processed
if (this.processedEvents.has(event.id)) {
console.log(`Event ${event.id} already processed, skipping`)
return
}
// Check database for processed events
const processed = await db.findOne($.ProcessedEvent, { eventId: event.id })
if (processed) {
this.processedEvents.add(event.id)
return
}
try {
// Process event
await handler(event)
// Mark as processed
await db.create($.ProcessedEvent, {
eventId: event.id,
eventType: event.type,
processedAt: new Date(),
})
this.processedEvents.add(event.id)
} catch (error) {
// Don't mark as processed if error
throw error
}
}
}
// Usage
const idempotentHandler = new IdempotentHandler()
eventBus.subscribe('order.created', async (event) => {
await idempotentHandler.handle(event, async (e) => {
// Runs once per event ID after a successful attempt; a failed attempt is
// not recorded and may run again. processOrder stands in for your domain logic
await processOrder(e.data)
})
})
Testing Event-Driven Systems
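import { describe, it, expect } from 'vitest'
import $, { db } from 'sdk.do'
import { eventBus } from './event-bus'
// Assumed module path for the EventCorrelator class from Pattern 3
import { EventCorrelator } from './event-correlator'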
describe('Event-Driven Architecture', () => {
it('should publish and subscribe to events', async () => {
const received: any[] = []
// Subscribe to event
eventBus.subscribe('test.event', async (event) => {
received.push(event)
})
// Publish event
await eventBus.publish({
type: 'test.event',
data: { message: 'Hello' },
})
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 100))
expect(received).toHaveLength(1)
expect(received[0].data.message).toBe('Hello')
})
it('should handle multiple subscribers', async () => {
const subscriber1Results: any[] = []
const subscriber2Results: any[] = []
eventBus.subscribe('multi.test', async (event) => {
subscriber1Results.push(event)
})
eventBus.subscribe('multi.test', async (event) => {
subscriber2Results.push(event)
})
await eventBus.publish({
type: 'multi.test',
data: { value: 42 },
})
await new Promise((resolve) => setTimeout(resolve, 100))
expect(subscriber1Results).toHaveLength(1)
expect(subscriber2Results).toHaveLength(1)
})
it('should correlate related events', async () => {
const correlationId = 'test-correlation-123'
await eventBus.publish({
type: 'order.created',
data: { orderId: '123' },
metadata: { correlationId },
})
await eventBus.publish({
type: 'payment.processed',
data: { orderId: '123' },
metadata: { correlationId },
})
const correlator = new EventCorrelator()
const chain = await correlator.getEventChain(correlationId)
expect(chain.length).toBeGreaterThan(0)
})
})
Best Practices
- Event Design: Design events to be immutable and self-contained
- Idempotency: Make event handlers idempotent
- Event Versioning: Version events for backward compatibility
- Ordering: Don't assume event order unless guaranteed
- Error Handling: Use dead letter queues for failed events
- Correlation: Track related events with correlation IDs
- Monitoring: Monitor event flow and processing times
- Testing: Test event handlers in isolation
- Documentation: Document event schemas and contracts
- Replay: Design for event replay and debugging
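Of the practices above, event versioning is the one the earlier examples never show in code. One common approach is "upcasting": old events are upgraded to the current shape when they are read, so handlers only ever deal with the latest version. The sketch below is illustrative only. `VersionedEvent`, the `upcasters` registry, and the v1 to v2 change (defaulting a missing `currency` to `'USD'`) are all hypothetical.

```typescript
interface VersionedEvent {
  type: string
  version: string
  data: Record<string, unknown>
}

interface UpcastStep {
  to: string
  migrate: (data: Record<string, unknown>) => Record<string, unknown>
}

// Registry keyed by `${type}@${fromVersion}`; each step upgrades exactly one version.
const upcasters = new Map<string, UpcastStep>([
  [
    'order.created@1',
    {
      to: '2',
      // Hypothetical schema change: v2 requires a currency, v1 events had none.
      migrate: (data: Record<string, unknown>) => ({ ...data, currency: 'USD' }),
    },
  ],
])

// Apply upgrade steps until no further step is registered for the event's version.
function upcast(event: VersionedEvent): VersionedEvent {
  let current = event
  let step = upcasters.get(`${current.type}@${current.version}`)
  while (step !== undefined) {
    current = { ...current, version: step.to, data: step.migrate(current.data) }
    step = upcasters.get(`${current.type}@${current.version}`)
  }
  return current
}

const legacyEvent: VersionedEvent = {
  type: 'order.created',
  version: '1',
  data: { orderId: '123', total: 25 },
}
const upgradedEvent = upcast(legacyEvent)
```

Stored events are left untouched, which keeps the event log immutable while still letting the schema evolve.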
Related Patterns
- Multi-Service Coordination - Parallel service execution
- Service Chaining - Sequential workflows
- Saga Pattern - Distributed transactions
Additional Resources
- Service Types - Understanding different service types
- Composition - Building composite services
- Best Practices - Service development guidelines
- API Reference - Complete API documentation