
Saga Pattern for Distributed Services

A guide to managing distributed transactions across services with the Saga pattern, covering compensating transactions, orchestration versus choreography, and error recovery strategies.

Sagas coordinate business transactions that span multiple services and undo completed work through compensating transactions when a later step fails, which keeps data eventually consistent in microservices architectures.

Overview

The Saga pattern breaks a distributed transaction into a series of local transactions, each executed by a different service. If any step fails, the saga executes compensating transactions to undo the work of preceding steps, ensuring eventual consistency across services without requiring distributed locks or two-phase commits.
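
As a rough illustration of that flow, the sketch below runs steps in order and, when one throws, calls the compensations of the completed steps in reverse. It is an in-memory toy: `ToyStep`, `runToySaga`, and the step names are hypothetical and are not part of `sdk.do`.

```typescript
// Hypothetical in-memory sketch; ToyStep and runToySaga are illustrative names.
type ToyStep = {
  name: string
  action: () => Promise<void>
  compensate: () => Promise<void>
}

async function runToySaga(steps: ToyStep[]): Promise<{ success: boolean; log: string[] }> {
  const log: string[] = []
  const completed: ToyStep[] = []

  try {
    for (const step of steps) {
      await step.action()
      completed.push(step)
      log.push(`done:${step.name}`)
    }
    return { success: true, log }
  } catch (error) {
    log.push(`failed:${(error as Error).message}`)
    // Undo only the steps that completed, newest first.
    for (const step of [...completed].reverse()) {
      await step.compensate()
      log.push(`undone:${step.name}`)
    }
    return { success: false, log }
  }
}

// Three local transactions; the third one fails.
const toySteps: ToyStep[] = [
  { name: 'reserve', action: async () => {}, compensate: async () => {} },
  { name: 'charge', action: async () => {}, compensate: async () => {} },
  {
    name: 'ship',
    action: async () => {
      throw new Error('ship unavailable')
    },
    compensate: async () => {},
  },
]

runToySaga(toySteps).then((outcome) => console.log(outcome.log.join(' -> ')))
// done:reserve -> done:charge -> failed:ship unavailable -> undone:charge -> undone:reserve
```

The failed step itself is not compensated in this sketch, because it never completed; only `charge` and `reserve` are undone, in that order.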

When to Use Saga Pattern

Use the Saga pattern when:

  • Distributed Transactions: Multiple services must participate in a single business transaction
  • No 2PC: Two-phase commit is unavailable or undesirable
  • Eventual Consistency: The system can tolerate eventual consistency
  • Long-Running: Transactions span multiple services and may run for an extended time
  • Rollback Required: Failed transactions must be undone across services
  • Service Independence: Services should remain loosely coupled
  • Business Process: The workflow is a complex, multi-step business process
  • Compensation Logic: Clear compensating actions can be defined for each step

Avoid this pattern when:

  • Strong ACID guarantees are required
  • Compensating transactions are impossible to define
  • The system is simple and has few services
  • All operations can run within a single service
  • Operations are read-only (no state changes)

Architecture

Orchestration vs Choreography

graph TB
  subgraph "Orchestration (Centralized)"
    O[Saga Orchestrator] --> S1[Service 1]
    O --> S2[Service 2]
    O --> S3[Service 3]
    S1 -.->|result| O
    S2 -.->|result| O
    S3 -.->|result| O
  end
  subgraph "Choreography (Decentralized)"
    SV1[Service 1] -->|event| SV2[Service 2]
    SV2 -->|event| SV3[Service 3]
    SV3 -.->|compensation| SV2
    SV2 -.->|compensation| SV1
  end

Core Implementation

Saga Orchestrator

import $, { db, on, send } from 'sdk.do'

export interface SagaStep {
  name: string
  serviceId: string
  inputMapper: (context: any) => any
  outputMapper?: (output: any) => any
  compensate: (context: any, stepResult: any) => Promise<void>
  retryable?: boolean
  timeout?: number
}

export interface SagaDefinition {
  name: string
  steps: SagaStep[]
  timeout?: number
}

export class SagaOrchestrator {
  async execute(saga: SagaDefinition, initialInput: any): Promise<{ success: boolean; result?: any; error?: string }> {
    const sagaExecution = await this.initializeSaga(saga, initialInput)
    const executedSteps: Array<{
      step: SagaStep
      result: any
      timestamp: Date
    }> = []

    try {
      // Execute each step sequentially
      for (const step of saga.steps) {
        const stepResult = await this.executeStep(step, sagaExecution.context, executedSteps)

        executedSteps.push({
          step,
          result: stepResult,
          timestamp: new Date(),
        })

        // Update saga context with step result
        sagaExecution.context = {
          ...sagaExecution.context,
          [step.name]: stepResult,
        }

        // Save checkpoint
        await this.saveCheckpoint(sagaExecution.id, {
          completedSteps: executedSteps.length,
          context: sagaExecution.context,
        })
      }

      // All steps completed successfully
      await this.completeSaga(sagaExecution.id)

      return {
        success: true,
        result: sagaExecution.context,
      }
    } catch (error) {
      // Saga failed - execute compensating transactions
      console.error('Saga failed, initiating rollback:', error)

      await this.rollback(sagaExecution.id, executedSteps)

      return {
        success: false,
        error: (error as Error).message,
      }
    }
  }

  private async executeStep(step: SagaStep, context: any, executedSteps: any[]): Promise<any> {
    const input = step.inputMapper(context)

    try {
      const execution = await $.ServiceExecution.start({
        serviceId: step.serviceId,
        inputs: input,
        timeout: step.timeout || 30000,
      })

      const result = await execution.waitForCompletion()

      const output = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs

      return output
    } catch (error) {
      // Step failed
      if (step.retryable) {
        // Retry step
        return await this.retryStep(step, context)
      }

      throw new Error(`Saga step "${step.name}" failed: ${(error as Error).message}`)
    }
  }

  private async retryStep(step: SagaStep, context: any): Promise<any> {
    const maxRetries = 3
    let lastError: Error | null = null

    for (let attempt = 1; attempt <= maxRetries; attempt++) {
      try {
        const input = step.inputMapper(context)

        const execution = await $.ServiceExecution.start({
          serviceId: step.serviceId,
          inputs: input,
          timeout: step.timeout || 30000,
        })

        const result = await execution.waitForCompletion()

        return step.outputMapper ? step.outputMapper(result.outputs) : result.outputs
      } catch (error) {
        lastError = error as Error
        console.log(`Retry ${attempt}/${maxRetries} for step ${step.name}`)

        if (attempt < maxRetries) {
          await new Promise((resolve) => setTimeout(resolve, Math.pow(2, attempt) * 1000))
        }
      }
    }

    throw new Error(`Step ${step.name} failed after ${maxRetries} retries: ${lastError?.message}`)
  }

  private async rollback(sagaId: string, executedSteps: Array<{ step: SagaStep; result: any }>): Promise<void> {
    console.log(`Rolling back ${executedSteps.length} steps...`)

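    // Execute compensating transactions in reverse order.
    // Copy before reversing so the caller's array is not mutated.
    for (const { step, result } of [...executedSteps].reverse()) {
      try {
        console.log(`Compensating step: ${step.name}`)

        // Load the latest saga context. Step results are persisted only in the
        // checkpoint, so prefer it over the initial context.
        const saga = await db.findOne($.SagaExecution, { id: sagaId })
        const context = saga.checkpoint?.context ?? saga.context

        await step.compensate(context, result)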

        console.log(`Successfully compensated step: ${step.name}`)
      } catch (error) {
        // Compensation failed - log for manual intervention
        console.error(`CRITICAL: Failed to compensate step ${step.name}:`, error)

        await this.logCompensationFailure(sagaId, step, error as Error)
      }
    }

    await this.markSagaFailed(sagaId)
  }

  private async initializeSaga(saga: SagaDefinition, initialInput: any): Promise<any> {
    const sagaExecution = await db.create($.SagaExecution, {
      name: saga.name,
      status: 'running',
      context: initialInput,
      startedAt: new Date(),
    })

    return sagaExecution
  }

  private async saveCheckpoint(sagaId: string, checkpoint: any): Promise<void> {
    await db.update($.SagaExecution, sagaId, {
      checkpoint,
      lastCheckpointAt: new Date(),
    })
  }

  private async completeSaga(sagaId: string): Promise<void> {
    await db.update($.SagaExecution, sagaId, {
      status: 'completed',
      completedAt: new Date(),
    })
  }

  private async markSagaFailed(sagaId: string): Promise<void> {
    await db.update($.SagaExecution, sagaId, {
      status: 'failed',
      failedAt: new Date(),
    })
  }

  private async logCompensationFailure(sagaId: string, step: SagaStep, error: Error): Promise<void> {
    await db.create($.CompensationFailure, {
      sagaId,
      stepName: step.name,
      error: error.message,
      stack: error.stack,
      requiresManualIntervention: true,
      createdAt: new Date(),
    })

    // Send alert to operations team
    await send($.Alert.create, {
      severity: 'critical',
      title: 'Saga Compensation Failed',
      message: `Failed to compensate step "${step.name}" in saga ${sagaId}`,
      data: {
        sagaId,
        stepName: step.name,
        error: error.message,
      },
    })
  }
}
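
For reference, the wait that `retryStep` applies between attempts is `Math.pow(2, attempt) * 1000` ms, and no wait follows the final attempt. The helper below only computes that schedule; `retryDelaysMs` is a hypothetical name and is not part of the orchestrator above.

```typescript
// Hypothetical helper: lists the waits between attempts for the
// exponential backoff used in retryStep (2^attempt * baseMs).
function retryDelaysMs(maxRetries: number, baseMs = 1000): number[] {
  const delays: number[] = []
  // No wait follows the last attempt, hence attempt < maxRetries.
  for (let attempt = 1; attempt < maxRetries; attempt++) {
    delays.push(Math.pow(2, attempt) * baseMs)
  }
  return delays
}

console.log(retryDelaysMs(3)) // [ 2000, 4000 ]
```

With `maxRetries = 3`, a retryable step is attempted once by `executeStep` and up to three more times by `retryStep`, with waits of 2 s and 4 s between the retries.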

Example 1: E-Commerce Order Processing

Complete order processing saga with payment, inventory, and shipping.

import $, { db, on, send } from 'sdk.do'

export const orderProcessingSaga: SagaDefinition = {
  name: 'Order Processing Saga',
  steps: [
    {
      name: 'validate-payment',
      serviceId: 'payment-validator',
      inputMapper: (context) => ({
        customerId: context.customerId,
        amount: context.amount,
        paymentMethod: context.paymentMethod,
      }),
      outputMapper: (output) => ({
        paymentValidationId: output.validationId,
        isValid: output.isValid,
      }),
      compensate: async (context, stepResult) => {
        // Release payment validation hold
        await $.ServiceExecution.start({
          serviceId: 'payment-validation-releaser',
          inputs: {
            validationId: stepResult.paymentValidationId,
          },
        })
      },
      retryable: true,
    },

    {
      name: 'reserve-inventory',
      serviceId: 'inventory-reserver',
      inputMapper: (context) => ({
        items: context.items,
        orderId: context.orderId,
      }),
      outputMapper: (output) => ({
        reservationId: output.reservationId,
        reservedItems: output.items,
      }),
      compensate: async (context, stepResult) => {
        // Release inventory reservation
        await $.ServiceExecution.start({
          serviceId: 'inventory-releaser',
          inputs: {
            reservationId: stepResult.reservationId,
          },
        })
      },
      retryable: true,
    },

    {
      name: 'charge-payment',
      serviceId: 'payment-processor',
      inputMapper: (context) => ({
        customerId: context.customerId,
        amount: context.amount,
        paymentMethod: context.paymentMethod,
        orderId: context.orderId,
      }),
      outputMapper: (output) => ({
        chargeId: output.chargeId,
        transactionId: output.transactionId,
      }),
      compensate: async (context, stepResult) => {
        // Refund payment
        await $.ServiceExecution.start({
          serviceId: 'payment-refunder',
          inputs: {
            chargeId: stepResult.chargeId,
            amount: context.amount,
            reason: 'order_cancelled',
          },
        })
      },
      retryable: false, // Don't retry payment charges
    },

    {
      name: 'commit-inventory',
      serviceId: 'inventory-committer',
      inputMapper: (context) => ({
        reservationId: context['reserve-inventory'].reservationId,
      }),
      outputMapper: (output) => ({
        committed: output.success,
      }),
      compensate: async (context, stepResult) => {
        // Return items to inventory
        await $.ServiceExecution.start({
          serviceId: 'inventory-returner',
          inputs: {
            items: context['reserve-inventory'].reservedItems,
            orderId: context.orderId,
          },
        })
      },
      retryable: true,
    },

    {
      name: 'create-shipment',
      serviceId: 'shipment-creator',
      inputMapper: (context) => ({
        orderId: context.orderId,
        items: context.items,
        shippingAddress: context.shippingAddress,
      }),
      outputMapper: (output) => ({
        shipmentId: output.shipmentId,
        trackingNumber: output.trackingNumber,
        carrier: output.carrier,
      }),
      compensate: async (context, stepResult) => {
        // Cancel shipment
        await $.ServiceExecution.start({
          serviceId: 'shipment-canceller',
          inputs: {
            shipmentId: stepResult.shipmentId,
          },
        })
      },
      retryable: true,
    },

    {
      name: 'send-confirmation',
      serviceId: 'email-sender',
      inputMapper: (context) => ({
        to: context.customerEmail,
        template: 'order-confirmation',
        data: {
          orderId: context.orderId,
          trackingNumber: context['create-shipment'].trackingNumber,
          items: context.items,
        },
      }),
      outputMapper: (output) => ({
        emailSent: output.sent,
      }),
      compensate: async (context, stepResult) => {
        // Send cancellation email
        await $.ServiceExecution.start({
          serviceId: 'email-sender',
          inputs: {
            to: context.customerEmail,
            template: 'order-cancelled',
            data: {
              orderId: context.orderId,
              reason: 'processing_error',
            },
          },
        })
      },
      retryable: true,
    },
  ],
}

// Execute the saga
export const processOrder = async (orderData: {
  orderId: string
  customerId: string
  customerEmail: string
  items: any[]
  amount: number
  paymentMethod: string
  shippingAddress: any
}) => {
  const orchestrator = new SagaOrchestrator()

  const result = await orchestrator.execute(orderProcessingSaga, orderData)

  if (result.success) {
    console.log('Order processed successfully:', result.result)

    // Update order status
    await db.update($.Order, orderData.orderId, {
      status: 'completed',
      completedAt: new Date(),
    })
  } else {
    console.error('Order processing failed:', result.error)

    // Update order status
    await db.update($.Order, orderData.orderId, {
      status: 'failed',
      failureReason: result.error,
      failedAt: new Date(),
    })
  }

  return result
}
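
Later steps such as `commit-inventory` read earlier results through `context['reserve-inventory']`. That works because the orchestrator stores each step's mapped output under the step's name. A minimal sketch of that accumulation, using made-up values and a hypothetical `addStepResult` helper:

```typescript
type SagaContext = Record<string, unknown>

// Mirrors the orchestrator's context update: the step output is stored
// under the step name, and the previous context object is left untouched.
function addStepResult(context: SagaContext, stepName: string, result: unknown): SagaContext {
  return { ...context, [stepName]: result }
}

// Illustrative values only.
const initialContext: SagaContext = { orderId: 'order-1', amount: 42 }
const afterReserve = addStepResult(initialContext, 'reserve-inventory', { reservationId: 'res-1' })
const afterCharge = addStepResult(afterReserve, 'charge-payment', { chargeId: 'ch-1' })

console.log(Object.keys(afterCharge))
// [ 'orderId', 'amount', 'reserve-inventory', 'charge-payment' ]
```

One consequence of this layout is that a step whose name matches an input field would overwrite that field, so step names should be distinct from the keys of the initial input.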

Example 2: Multi-Service Account Creation

Create accounts across multiple services with rollback.

import $, { db, on, send } from 'sdk.do'

export const accountCreationSaga: SagaDefinition = {
  name: 'Account Creation Saga',
  steps: [
    {
      name: 'create-user-profile',
      serviceId: 'user-profile-creator',
      inputMapper: (context) => ({
        email: context.email,
        name: context.name,
        metadata: context.metadata,
      }),
      outputMapper: (output) => ({
        userId: output.userId,
      }),
      compensate: async (context, stepResult) => {
        // Delete user profile
        await $.ServiceExecution.start({
          serviceId: 'user-profile-deleter',
          inputs: {
            userId: stepResult.userId,
          },
        })
      },
    },

    {
      name: 'create-auth-account',
      serviceId: 'auth-account-creator',
      inputMapper: (context) => ({
        userId: context['create-user-profile'].userId,
        email: context.email,
        password: context.password,
      }),
      outputMapper: (output) => ({
        authId: output.authId,
      }),
      compensate: async (context, stepResult) => {
        // Delete auth account
        await $.ServiceExecution.start({
          serviceId: 'auth-account-deleter',
          inputs: {
            authId: stepResult.authId,
          },
        })
      },
    },

    {
      name: 'create-billing-account',
      serviceId: 'billing-account-creator',
      inputMapper: (context) => ({
        userId: context['create-user-profile'].userId,
        email: context.email,
        plan: context.plan,
      }),
      outputMapper: (output) => ({
        billingAccountId: output.accountId,
      }),
      compensate: async (context, stepResult) => {
        // Delete billing account
        await $.ServiceExecution.start({
          serviceId: 'billing-account-deleter',
          inputs: {
            accountId: stepResult.billingAccountId,
          },
        })
      },
    },

    {
      name: 'provision-workspace',
      serviceId: 'workspace-provisioner',
      inputMapper: (context) => ({
        userId: context['create-user-profile'].userId,
        workspaceName: context.workspaceName,
      }),
      outputMapper: (output) => ({
        workspaceId: output.workspaceId,
      }),
      compensate: async (context, stepResult) => {
        // Delete workspace
        await $.ServiceExecution.start({
          serviceId: 'workspace-deleter',
          inputs: {
            workspaceId: stepResult.workspaceId,
          },
        })
      },
    },

    {
      name: 'setup-notifications',
      serviceId: 'notification-setup',
      inputMapper: (context) => ({
        userId: context['create-user-profile'].userId,
        email: context.email,
        preferences: context.notificationPreferences || {},
      }),
      outputMapper: (output) => ({
        notificationConfigured: output.success,
      }),
      compensate: async (context, stepResult) => {
        // Remove notification configuration
        await $.ServiceExecution.start({
          serviceId: 'notification-cleanup',
          inputs: {
            userId: context['create-user-profile'].userId,
          },
        })
      },
    },

    {
      name: 'send-welcome-email',
      serviceId: 'email-sender',
      inputMapper: (context) => ({
        to: context.email,
        template: 'welcome',
        data: {
          name: context.name,
          workspaceId: context['provision-workspace'].workspaceId,
        },
      }),
      outputMapper: (output) => ({
        emailSent: output.sent,
      }),
      compensate: async (context, stepResult) => {
        // No compensation needed for welcome email
        console.log('Welcome email was sent but account creation failed')
      },
    },
  ],
}

// Execute account creation saga
export const createAccount = async (accountData: {
  email: string
  name: string
  password: string
  plan: string
  workspaceName: string
  metadata?: any
  notificationPreferences?: any
}) => {
  const orchestrator = new SagaOrchestrator()

  const result = await orchestrator.execute(accountCreationSaga, accountData)

  if (result.success) {
    console.log('Account created successfully:', result.result)

    return {
      success: true,
      userId: result.result['create-user-profile'].userId,
      workspaceId: result.result['provision-workspace'].workspaceId,
    }
  } else {
    console.error('Account creation failed:', result.error)

    return {
      success: false,
      error: result.error,
    }
  }
}

Example 3: Travel Booking Saga

Book flights, hotels, and car rentals with coordinated rollback.

import $, { db, on, send } from 'sdk.do'

export const travelBookingSaga: SagaDefinition = {
  name: 'Travel Booking Saga',
  steps: [
    {
      name: 'reserve-flight',
      serviceId: 'flight-reserver',
      inputMapper: (context) => ({
        departureCity: context.departureCity,
        arrivalCity: context.arrivalCity,
        departureDate: context.departureDate,
        returnDate: context.returnDate,
        passengers: context.passengers,
      }),
      outputMapper: (output) => ({
        flightReservationId: output.reservationId,
        flightPrice: output.price,
        flightDetails: output.details,
      }),
      compensate: async (context, stepResult) => {
        // Cancel flight reservation
        await $.ServiceExecution.start({
          serviceId: 'flight-canceller',
          inputs: {
            reservationId: stepResult.flightReservationId,
          },
        })
      },
      timeout: 60000,
    },

    {
      name: 'reserve-hotel',
      serviceId: 'hotel-reserver',
      inputMapper: (context) => ({
        city: context.arrivalCity,
        checkInDate: context.departureDate,
        checkOutDate: context.returnDate,
        guests: context.passengers.length,
        roomType: context.roomType,
      }),
      outputMapper: (output) => ({
        hotelReservationId: output.reservationId,
        hotelPrice: output.price,
        hotelDetails: output.details,
      }),
      compensate: async (context, stepResult) => {
        // Cancel hotel reservation
        await $.ServiceExecution.start({
          serviceId: 'hotel-canceller',
          inputs: {
            reservationId: stepResult.hotelReservationId,
          },
        })
      },
      timeout: 60000,
    },

    {
      name: 'reserve-car',
      serviceId: 'car-reserver',
      inputMapper: (context) => ({
        pickupLocation: context.arrivalCity,
        pickupDate: context.departureDate,
        returnDate: context.returnDate,
        carType: context.carType || 'economy',
      }),
      outputMapper: (output) => ({
        carReservationId: output.reservationId,
        carPrice: output.price,
        carDetails: output.details,
      }),
      compensate: async (context, stepResult) => {
        // Cancel car reservation
        await $.ServiceExecution.start({
          serviceId: 'car-canceller',
          inputs: {
            reservationId: stepResult.carReservationId,
          },
        })
      },
      timeout: 60000,
    },

    {
      name: 'calculate-total',
      serviceId: 'price-calculator',
      inputMapper: (context) => ({
        flightPrice: context['reserve-flight'].flightPrice,
        hotelPrice: context['reserve-hotel'].hotelPrice,
        carPrice: context['reserve-car'].carPrice,
        taxes: true,
        fees: true,
      }),
      outputMapper: (output) => ({
        totalPrice: output.total,
        breakdown: output.breakdown,
      }),
      compensate: async () => {
        // No compensation needed for calculation
      },
    },

    {
      name: 'process-payment',
      serviceId: 'payment-processor',
      inputMapper: (context) => ({
        amount: context['calculate-total'].totalPrice,
        currency: 'USD',
        paymentMethod: context.paymentMethod,
        customerId: context.customerId,
      }),
      outputMapper: (output) => ({
        paymentId: output.paymentId,
        transactionId: output.transactionId,
      }),
      compensate: async (context, stepResult) => {
        // Refund payment
        await $.ServiceExecution.start({
          serviceId: 'payment-refunder',
          inputs: {
            paymentId: stepResult.paymentId,
            amount: context['calculate-total'].totalPrice,
            reason: 'booking_cancelled',
          },
        })
      },
    },

    {
      name: 'confirm-flight',
      serviceId: 'flight-confirmer',
      inputMapper: (context) => ({
        reservationId: context['reserve-flight'].flightReservationId,
        paymentId: context['process-payment'].paymentId,
      }),
      outputMapper: (output) => ({
        flightConfirmationNumber: output.confirmationNumber,
      }),
      compensate: async (context, stepResult) => {
        // Cancel confirmed flight (may incur fees)
        await $.ServiceExecution.start({
          serviceId: 'flight-canceller',
          inputs: {
            confirmationNumber: stepResult.flightConfirmationNumber,
          },
        })
      },
    },

    {
      name: 'confirm-hotel',
      serviceId: 'hotel-confirmer',
      inputMapper: (context) => ({
        reservationId: context['reserve-hotel'].hotelReservationId,
        paymentId: context['process-payment'].paymentId,
      }),
      outputMapper: (output) => ({
        hotelConfirmationNumber: output.confirmationNumber,
      }),
      compensate: async (context, stepResult) => {
        // Cancel confirmed hotel
        await $.ServiceExecution.start({
          serviceId: 'hotel-canceller',
          inputs: {
            confirmationNumber: stepResult.hotelConfirmationNumber,
          },
        })
      },
    },

    {
      name: 'confirm-car',
      serviceId: 'car-confirmer',
      inputMapper: (context) => ({
        reservationId: context['reserve-car'].carReservationId,
        paymentId: context['process-payment'].paymentId,
      }),
      outputMapper: (output) => ({
        carConfirmationNumber: output.confirmationNumber,
      }),
      compensate: async (context, stepResult) => {
        // Cancel confirmed car rental
        await $.ServiceExecution.start({
          serviceId: 'car-canceller',
          inputs: {
            confirmationNumber: stepResult.carConfirmationNumber,
          },
        })
      },
    },

    {
      name: 'send-itinerary',
      serviceId: 'itinerary-sender',
      inputMapper: (context) => ({
        email: context.customerEmail,
        flightDetails: context['reserve-flight'].flightDetails,
        flightConfirmation: context['confirm-flight'].flightConfirmationNumber,
        hotelDetails: context['reserve-hotel'].hotelDetails,
        hotelConfirmation: context['confirm-hotel'].hotelConfirmationNumber,
        carDetails: context['reserve-car'].carDetails,
        carConfirmation: context['confirm-car'].carConfirmationNumber,
        totalPrice: context['calculate-total'].totalPrice,
      }),
      outputMapper: (output) => ({
        itinerarySent: output.sent,
      }),
      compensate: async (context) => {
        // Send cancellation notice
        await $.ServiceExecution.start({
          serviceId: 'cancellation-sender',
          inputs: {
            email: context.customerEmail,
            bookingId: context.bookingId,
          },
        })
      },
    },
  ],
}

// Execute travel booking saga
export const bookTravel = async (bookingData: {
  customerId: string
  customerEmail: string
  departureCity: string
  arrivalCity: string
  departureDate: string
  returnDate: string
  passengers: any[]
  roomType: string
  carType?: string
  paymentMethod: string
  bookingId: string
}) => {
  const orchestrator = new SagaOrchestrator()

  const result = await orchestrator.execute(travelBookingSaga, bookingData)

  if (result.success) {
    console.log('Travel booking completed successfully')

    return {
      success: true,
      bookingId: bookingData.bookingId,
      confirmations: {
        flight: result.result['confirm-flight'].flightConfirmationNumber,
        hotel: result.result['confirm-hotel'].hotelConfirmationNumber,
        car: result.result['confirm-car'].carConfirmationNumber,
      },
      totalPrice: result.result['calculate-total'].totalPrice,
    }
  } else {
    console.error('Travel booking failed:', result.error)

    return {
      success: false,
      error: result.error,
    }
  }
}
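
In this saga, a rollback that starts after `confirm-flight` calls `flight-canceller` twice: once from the `confirm-flight` compensation and once from the `reserve-flight` compensation. The same holds for the hotel and car steps, so the cancel services need to tolerate repeated calls. The sketch below shows one way a canceller could be made idempotent. The `FlightCanceller` class and its in-memory store are hypothetical stand-ins for the real service, and the sketch assumes both calls identify the booking by the same key, whereas the saga above passes `reservationId` in one call and `confirmationNumber` in the other.

```typescript
type CancelOutcome = 'cancelled' | 'already-cancelled' | 'not-found'

// Hypothetical stand-in for a cancel service backed by an in-memory store.
class FlightCanceller {
  private readonly active = new Set<string>()
  private readonly cancelled = new Set<string>()

  book(bookingKey: string): void {
    this.active.add(bookingKey)
  }

  // Safe to call more than once for the same booking.
  cancel(bookingKey: string): CancelOutcome {
    if (this.cancelled.has(bookingKey)) return 'already-cancelled'
    if (!this.active.delete(bookingKey)) return 'not-found'
    this.cancelled.add(bookingKey)
    return 'cancelled'
  }
}

const canceller = new FlightCanceller()
canceller.book('flight-123')
console.log(canceller.cancel('flight-123')) // cancelled
console.log(canceller.cancel('flight-123')) // already-cancelled
```

Returning a distinct outcome for the repeated call, instead of throwing, keeps the second compensation from being logged as a compensation failure.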

Choreography Pattern

Saga implementation using event-driven choreography instead of central orchestration.

import $, { db, on, send } from 'sdk.do'

// Define saga steps as event handlers

// Step 1: Order created → Reserve inventory
on($.Order.created, async (order) => {
  try {
    const reservation = await $.ServiceExecution.start({
      serviceId: 'inventory-reserver',
      inputs: {
        orderId: order.id,
        items: order.items,
      },
    }).then((e) => e.waitForCompletion())

    // Emit success event
    await send($.Event.publish, {
      type: 'inventory.reserved',
      data: {
        orderId: order.id,
        reservationId: reservation.outputs.reservationId,
      },
      metadata: {
        correlationId: order.id,
      },
    })
  } catch (error) {
    // Emit failure event
    await send($.Event.publish, {
      type: 'inventory.reservation.failed',
      data: {
        orderId: order.id,
        error: (error as Error).message,
      },
      metadata: {
        correlationId: order.id,
      },
    })
  }
})

// Step 2: Inventory reserved → Process payment
on('inventory.reserved', async (event) => {
  const order = await db.findOne($.Order, { id: event.data.orderId })

  try {
    const payment = await $.ServiceExecution.start({
      serviceId: 'payment-processor',
      inputs: {
        orderId: order.id,
        amount: order.total,
        paymentMethod: order.paymentMethod,
      },
    }).then((e) => e.waitForCompletion())

    // Emit success event
    await send($.Event.publish, {
      type: 'payment.processed',
      data: {
        orderId: order.id,
        paymentId: payment.outputs.paymentId,
      },
      metadata: {
        correlationId: order.id,
      },
    })
  } catch (error) {
    // Compensate: Release inventory
    await $.ServiceExecution.start({
      serviceId: 'inventory-releaser',
      inputs: {
        reservationId: event.data.reservationId,
      },
    })

    // Emit failure event
    await send($.Event.publish, {
      type: 'payment.failed',
      data: {
        orderId: order.id,
        error: (error as Error).message,
      },
      metadata: {
        correlationId: order.id,
      },
    })
  }
})

// Step 3: Payment processed → Create shipment
on('payment.processed', async (event) => {
  const order = await db.findOne($.Order, { id: event.data.orderId })

  try {
    const shipment = await $.ServiceExecution.start({
      serviceId: 'shipment-creator',
      inputs: {
        orderId: order.id,
        items: order.items,
        address: order.shippingAddress,
      },
    }).then((e) => e.waitForCompletion())

    // Emit success event
    await send($.Event.publish, {
      type: 'shipment.created',
      data: {
        orderId: order.id,
        shipmentId: shipment.outputs.shipmentId,
      },
      metadata: {
        correlationId: order.id,
      },
    })
  } catch (error) {
    // Compensate: Refund payment and release inventory
    await Promise.all([
      $.ServiceExecution.start({
        serviceId: 'payment-refunder',
        inputs: {
          paymentId: event.data.paymentId,
        },
      }),
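      // The payment.processed event does not carry reservationId, so this
      // assumes the releaser can resolve the reservation from the order ID
      $.ServiceExecution.start({
        serviceId: 'inventory-releaser',
        inputs: {
          orderId: order.id,
        },
      }),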
    ])

    // Emit failure event
    await send($.Event.publish, {
      type: 'shipment.failed',
      data: {
        orderId: order.id,
        error: (error as Error).message,
      },
      metadata: {
        correlationId: order.id,
      },
    })
  }
})

// Handle compensation events
on('inventory.reservation.failed', async (event) => {
  // Update order status
  await db.update($.Order, event.data.orderId, {
    status: 'failed',
    failureReason: 'inventory_unavailable',
  })
})

on('payment.failed', async (event) => {
  // Update order status
  await db.update($.Order, event.data.orderId, {
    status: 'failed',
    failureReason: 'payment_failed',
  })
})

on('shipment.failed', async (event) => {
  // Update order status
  await db.update($.Order, event.data.orderId, {
    status: 'failed',
    failureReason: 'shipment_creation_failed',
  })
})
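
To make the event chain easier to follow, the sketch below replays the same flow on a tiny synchronous in-memory bus and records the order of events. `subscribe`, `publish`, and the `paymentShouldFail` flag are hypothetical and are not the `sdk.do` `on`/`send` API; the inventory release, which is a direct service call in the handlers above, is recorded here as an `inventory.released` event purely for visibility.

```typescript
// Hypothetical in-memory event bus used only to trace the choreography.
type BusEvent = { orderId: string; paymentShouldFail?: boolean }
type BusHandler = (event: BusEvent) => void

const busHandlers = new Map<string, BusHandler[]>()
const eventTrace: string[] = []

function subscribe(type: string, handler: BusHandler): void {
  busHandlers.set(type, [...(busHandlers.get(type) ?? []), handler])
}

function publish(type: string, event: BusEvent): void {
  eventTrace.push(type)
  for (const handler of busHandlers.get(type) ?? []) handler(event)
}

// Each service reacts to the previous event and emits the next one.
subscribe('order.created', (event) => publish('inventory.reserved', event))

subscribe('inventory.reserved', (event) => {
  if (event.paymentShouldFail) {
    // Compensate the earlier step, then announce the failure.
    publish('inventory.released', event)
    publish('payment.failed', event)
  } else {
    publish('payment.processed', event)
  }
})

subscribe('payment.processed', (event) => publish('shipment.created', event))

publish('order.created', { orderId: 'order-1', paymentShouldFail: true })
console.log(eventTrace.join(' -> '))
// order.created -> inventory.reserved -> inventory.released -> payment.failed
```

No component in this sketch knows the whole sequence; each handler knows only which event it consumes and which it emits, which is the defining trade-off of choreography compared with the orchestrator shown earlier.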

Advanced Patterns

Pattern 1: Saga State Machine

Model the saga as a state machine for complex workflows.

import $, { db } from 'sdk.do'

export class SagaStateMachine {
  private state: string = 'initial'
  private context: any = {}

  constructor(
    private definition: {
      states: Record<
        string,
        {
          onEnter: (context: any) => Promise<any>
          onExit?: (context: any) => Promise<void>
          transitions: Record<string, string>
          compensation?: (context: any) => Promise<void>
        }
      >
      initialState: string
    }
  ) {
    this.state = definition.initialState
  }

  async execute(initialContext: any): Promise<any> {
    this.context = initialContext
    const stateHistory: string[] = []

    try {
      while (this.state !== 'completed' && this.state !== 'failed') {
        const currentState = this.definition.states[this.state]

        if (!currentState) {
          throw new Error(`Invalid state: ${this.state}`)
        }

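        // Execute state
        const result = await currentState.onEnter(this.context)

        // Record the state only after onEnter succeeds, so a state that threw
        // is not compensated for work it never completed
        stateHistory.push(this.state)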

        // Update context
        this.context = { ...this.context, ...result }

        // Determine next state
        const nextState = this.determineNextState(currentState, result)

        // Exit current state
        if (currentState.onExit) {
          await currentState.onExit(this.context)
        }

        // Transition to next state
        this.state = nextState
      }

      return {
        success: this.state === 'completed',
        context: this.context,
      }
    } catch (error) {
      // Compensate executed states
      await this.compensate(stateHistory)

      throw error
    }
  }

  private determineNextState(currentState: any, result: any): string {
    // Use result to determine transition
    for (const [condition, nextState] of Object.entries(currentState.transitions)) {
      if (this.evaluateCondition(condition, result)) {
        return nextState as string
      }
    }

    throw new Error('No valid transition found')
  }

  private evaluateCondition(condition: string, result: any): boolean {
    // Simple condition evaluation
    if (condition === 'success' && result.success) return true
    if (condition === 'failure' && !result.success) return true
    if (condition === 'default') return true

    return false
  }

  private async compensate(stateHistory: string[]): Promise<void> {
    // Compensate in reverse order
    // Copy before reversing so the caller's history array is not mutated
    for (const stateName of [...stateHistory].reverse()) {
      const state = this.definition.states[stateName]

      if (state.compensation) {
        try {
          await state.compensation(this.context)
        } catch (error) {
          console.error(`Failed to compensate state ${stateName}:`, error)
        }
      }
    }
  }
}

// Usage
const orderStateMachine = new SagaStateMachine({
  initialState: 'validating',
  states: {
    validating: {
      onEnter: async (context) => {
        // Validate order
        const isValid = await validateOrder(context.order)
        return { success: isValid }
      },
      transitions: {
        success: 'reserving-inventory',
        failure: 'failed',
      },
    },
    'reserving-inventory': {
      onEnter: async (context) => {
        const reservation = await reserveInventory(context.order)
        return { success: true, reservationId: reservation.id }
      },
      transitions: {
        success: 'processing-payment',
        failure: 'failed',
      },
      compensation: async (context) => {
        await releaseInventory(context.reservationId)
      },
    },
    'processing-payment': {
      onEnter: async (context) => {
        const payment = await processPayment(context.order)
        return { success: true, paymentId: payment.id }
      },
      transitions: {
        success: 'completed',
        failure: 'failed',
      },
      compensation: async (context) => {
        await refundPayment(context.paymentId)
      },
    },
    completed: {
      onEnter: async () => ({ success: true }),
      transitions: {},
    },
    failed: {
      onEnter: async () => ({ success: false }),
      transitions: {},
    },
  },
})

async function validateOrder(order: any): Promise<boolean> {
  return true
}

async function reserveInventory(order: any): Promise<any> {
  return { id: 'res_123' }
}

async function releaseInventory(reservationId: string): Promise<void> {}

async function processPayment(order: any): Promise<any> {
  return { id: 'pay_123' }
}

async function refundPayment(paymentId: string): Promise<void> {}
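
SagaStateMachine only discovers a misspelled state name when execute() reaches it and throws `Invalid state`. A definition check run at startup can catch that earlier. The following is a minimal sketch; `findDefinitionErrors` and its types are hypothetical helpers for illustration, not part of sdk.do, and the check assumes 'completed' and 'failed' are the only terminal states, as in the class above.

```typescript
// Hypothetical helper: checks a state machine definition before it runs
type StateDefinition = { transitions: Record<string, string> }

type MachineDefinition = {
  initialState: string
  states: Record<string, StateDefinition>
}

export function findDefinitionErrors(definition: MachineDefinition): string[] {
  const errors: string[] = []
  const known = new Set(Object.keys(definition.states))
  // 'completed' and 'failed' end the loop in execute(), so they are always valid targets
  const terminal = new Set(['completed', 'failed'])

  if (!known.has(definition.initialState)) {
    errors.push(`Unknown initial state: ${definition.initialState}`)
  }

  for (const [name, state] of Object.entries(definition.states)) {
    for (const [condition, target] of Object.entries(state.transitions)) {
      if (!known.has(target) && !terminal.has(target)) {
        errors.push(`${name} --${condition}--> ${target}: unknown target state`)
      }
    }
    // A non-terminal state without transitions would make execute() throw 'No valid transition found'
    if (!terminal.has(name) && Object.keys(state.transitions).length === 0) {
      errors.push(`${name}: no transitions defined`)
    }
  }

  return errors
}
```

An empty array means every transition points at a defined or terminal state. Calling this once when the definition is created keeps typos from surfacing halfway through a live saga.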

Pattern 2: Saga Timeout Handling

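Handle timeouts at both the saga level and the step level. The orchestrator below enforces a saga-level deadline. Promise.race only stops waiting: the underlying saga is not cancelled and may still finish after the timeout.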

import $, { db } from 'sdk.do'

export class TimeoutAwareSagaOrchestrator extends SagaOrchestrator {
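  async executeWithTimeout(saga: SagaDefinition, initialInput: any, sagaTimeout: number): Promise<any> {
    let timer: ReturnType<typeof setTimeout> | undefined

    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Saga timeout exceeded')), sagaTimeout)
    })

    try {
      // Losing the race does not cancel the saga; execute() keeps running in the background
      return await Promise.race([this.execute(saga, initialInput), timeout])
    } finally {
      // Clear the timer so it cannot fire after the saga has settled
      clearTimeout(timer)
    }
  }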
}
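
The class above bounds the saga as a whole. For the step level, the same race can be wrapped in a small helper and applied to each step call. This is a minimal sketch under stated assumptions: `withTimeout` and `TimeoutError` are hypothetical names introduced for illustration and are not part of sdk.do or SagaOrchestrator.

```typescript
// Hypothetical helper: not part of sdk.do or the SagaOrchestrator shown above
export class TimeoutError extends Error {
  constructor(label: string, ms: number) {
    super(`${label} timed out after ${ms}ms`)
    this.name = 'TimeoutError'
  }
}

export async function withTimeout<T>(work: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined

  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new TimeoutError(label, ms)), ms)
  })

  try {
    // Whichever settles first wins; the losing promise is not cancelled
    return await Promise.race([work, deadline])
  } finally {
    // Always clear the timer so it cannot fire after the step has settled
    clearTimeout(timer)
  }
}
```

A step that times out should be treated as failed and trigger compensation. Because the timed-out call may still complete later on the remote service, its compensation needs to be idempotent and safe to run against work that has or has not landed.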

Pattern 3: Saga Monitoring and Observability

Track saga execution with metrics and logging.

import $, { db, send } from 'sdk.do'

export class ObservableSagaOrchestrator extends SagaOrchestrator {
  async execute(saga: SagaDefinition, initialInput: any): Promise<any> {
    const startTime = Date.now()
    const sagaId = generateId()

    // Log saga start
    await send($.Log.create, {
      level: 'info',
      message: `Saga started: ${saga.name}`,
      metadata: {
        sagaId,
        sagaName: saga.name,
      },
    })

    try {
      const result = await super.execute(saga, initialInput)

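      // Compute the duration once so the log entry and the metric agree
      const duration = Date.now() - startTime
      // execute() can resolve with success: false after a rollback, so report the real outcome
      const succeeded = result?.success !== false

      // Log saga completion
      await send($.Log.create, {
        level: succeeded ? 'info' : 'error',
        message: `Saga ${succeeded ? 'completed' : 'rolled back'}: ${saga.name}`,
        metadata: {
          sagaId,
          sagaName: saga.name,
          duration,
        },
      })

      // Record metrics
      await send($.Metric.record, {
        name: succeeded ? 'saga.execution.success' : 'saga.execution.failure',
        value: 1,
        tags: {
          sagaName: saga.name,
          duration,
        },
      })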

      return result
    } catch (error) {
      // Log saga failure
      await send($.Log.create, {
        level: 'error',
        message: `Saga failed: ${saga.name}`,
        metadata: {
          sagaId,
          sagaName: saga.name,
          error: (error as Error).message,
          duration: Date.now() - startTime,
        },
      })

      // Record failure metric
      await send($.Metric.record, {
        name: 'saga.execution.failure',
        value: 1,
        tags: {
          sagaName: saga.name,
          error: (error as Error).message,
        },
      })

      throw error
    }
  }
}

function generateId(): string {
  // slice(2, 11) keeps nine characters after the "0." prefix; substr is deprecated
  return `saga_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
}
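
In the class above every `send` call is awaited inside the same try block as the saga, so a logging or metrics outage would surface as a saga failure. One way to avoid that is to make telemetry best-effort. The sketch below assumes a hypothetical `bestEffort` helper, which is not part of sdk.do.

```typescript
// Hypothetical helper: runs a telemetry call without letting its failure affect the saga
export async function bestEffort(label: string, emit: () => Promise<unknown>): Promise<boolean> {
  try {
    await emit()
    return true
  } catch (error) {
    // Report locally and carry on; the saga result must not depend on telemetry
    console.error(`Telemetry call failed (${label}):`, error)
    return false
  }
}
```

Each call in the orchestrator could then be wrapped, for example `await bestEffort('saga.start', () => send($.Log.create, { ... }))`, so that only errors from the saga itself reach the catch block.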

Testing Saga Patterns

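import { describe, it, expect } from 'vitest'
import $, { db } from 'sdk.do'
// SagaOrchestrator and SagaDefinition are the orchestrator class and type defined earlier in this guide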

describe('Saga Pattern', () => {
  it('should execute all steps successfully', async () => {
    const mockSaga: SagaDefinition = {
      name: 'Test Saga',
      steps: [
        {
          name: 'step1',
          serviceId: 'service-1',
          inputMapper: (ctx) => ctx,
          compensate: async () => {},
        },
        {
          name: 'step2',
          serviceId: 'service-2',
          inputMapper: (ctx) => ctx,
          compensate: async () => {},
        },
      ],
    }

    const orchestrator = new SagaOrchestrator()
    const result = await orchestrator.execute(mockSaga, { data: 'test' })

    expect(result.success).toBe(true)
  })

  it('should rollback on failure', async () => {
    const compensations: string[] = []

    const mockSaga: SagaDefinition = {
      name: 'Failing Saga',
      steps: [
        {
          name: 'step1',
          serviceId: 'service-1',
          inputMapper: (ctx) => ctx,
          compensate: async () => {
            compensations.push('step1')
          },
        },
        {
          name: 'failing-step',
          serviceId: 'failing-service',
          inputMapper: (ctx) => ctx,
          compensate: async () => {},
        },
      ],
    }

    const orchestrator = new SagaOrchestrator()
    const result = await orchestrator.execute(mockSaga, { data: 'test' })

    expect(result.success).toBe(false)
    expect(compensations).toContain('step1')
  })

  it('should handle compensation failures', async () => {
    const mockSaga: SagaDefinition = {
      name: 'Saga with failing compensation',
      steps: [
        {
          name: 'step1',
          serviceId: 'service-1',
          inputMapper: (ctx) => ctx,
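          compensate: async () => {
            throw new Error('Compensation failed')
          },
        },
        // A later step must fail, otherwise step1's compensation never runs
        {
          name: 'failing-step',
          serviceId: 'failing-service',
          inputMapper: (ctx) => ctx,
          compensate: async () => {},
        },
      ],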
    }

    const orchestrator = new SagaOrchestrator()
    const result = await orchestrator.execute(mockSaga, { data: 'test' })

    // Should log compensation failure
    const failures = await db.query($.CompensationFailure, {
      where: { sagaId: result.sagaId },
    })

    expect(failures.length).toBeGreaterThan(0)
  })
})

Best Practices

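  1. Idempotency: Make every step and compensation idempotent so retries and duplicate deliveries are safe
  2. Compensation Design: Define a compensation for every step that changes state, and verify that it semantically undoes that step
  3. Timeout Management: Set timeouts for the saga as a whole and for each step
  4. State Persistence: Persist saga state after each step so an interrupted saga can be resumed or compensated
  5. Monitoring: Track saga executions, durations, and failures
  6. Manual Intervention: Plan for compensations that fail, for example by recording them for operator review
  7. Testing: Test both success and failure paths, including failed compensations
  8. Documentation: Document saga flows and compensation logic
  9. Isolation: Keep saga steps isolated and independent
  10. Error Handling: Handle errors explicitly at each step and decide whether to retry or compensate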
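
The idempotency practice in the list above can be sketched as a guard that runs each step or compensation at most once per key. This is a minimal illustration, not the guide's implementation: `IdempotentExecutor` is a hypothetical class, the key format is an assumption, and the in-memory Map stands in for the durable storage a real saga would need.

```typescript
// Hypothetical in-memory guard; a real saga would keep these records in durable storage
export class IdempotentExecutor {
  private results = new Map<string, Promise<unknown>>()

  // Runs `action` at most once per key and returns the stored outcome for repeats
  run<T>(key: string, action: () => Promise<T>): Promise<T> {
    const existing = this.results.get(key)
    if (existing) {
      return existing as Promise<T>
    }

    const pending = action().catch((error) => {
      // Forget failed attempts so a retry can run the action again
      this.results.delete(key)
      throw error
    })
    this.results.set(key, pending)
    return pending
  }
}
```

A key such as `${sagaId}:${stepName}` (or `${sagaId}:${stepName}:compensate`) lets a retried message or a repeated compensation return the first outcome instead of repeating the side effect.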

Orchestration vs Choreography

| Aspect | Orchestration | Choreography |
| --- | --- | --- |
| Control | Centralized | Decentralized |
| Complexity | Easier to understand | Harder to track |
| Coupling | Higher coupling | Lower coupling |
| Monitoring | Easier to monitor | Harder to monitor |
| Flexibility | Less flexible | More flexible |
| Use When | Complex workflows | Simple, event-driven |

Additional Resources