Saga Pattern for Distributed Services
Complete guide to managing distributed transactions across services using the Saga pattern with compensating transactions, orchestration vs choreography, and error recovery strategies
Manage complex distributed transactions across multiple services with automatic rollback through compensating transactions, ensuring data consistency in microservices architectures.
Overview
The Saga pattern breaks a distributed transaction into a series of local transactions, each executed by a different service. If any step fails, the saga executes compensating transactions to undo the work of preceding steps, ensuring eventual consistency across services without requiring distributed locks or two-phase commits.
When to Use Saga Pattern
Use the Saga pattern when:
- Distributed Transactions: Multiple services need to participate in a single business transaction
- No 2PC: Two-phase commit is not available or not desired
- Eventual Consistency: System can tolerate eventual consistency
- Long-Running: Transactions span multiple services and time
- Rollback Required: Failed transactions must be undone across services
- Service Independence: Services should remain loosely coupled
- Business Process: Complex multi-step business processes
- Compensation Logic: Clear compensating actions can be defined
Avoid this pattern when:
- Strong ACID guarantees are required
- Compensating transactions are impossible to define
- System is simple with few services
- All operations can be in a single service
- Read-only operations (no state changes)
Architecture
Orchestration vs Choreography
Core Implementation
Saga Orchestrator
import $, { db, on, send } from 'sdk.do'
export interface SagaStep {
name: string
serviceId: string
inputMapper: (context: any) => any
outputMapper?: (output: any) => any
compensate: (context: any, stepResult: any) => Promise<void>
retryable?: boolean
timeout?: number
}
export interface SagaDefinition {
name: string
steps: SagaStep[]
timeout?: number
}
export class SagaOrchestrator {
async execute(saga: SagaDefinition, initialInput: any): Promise<{ success: boolean; result?: any; error?: string }> {
const sagaExecution = await this.initializeSaga(saga, initialInput)
const executedSteps: Array<{
step: SagaStep
result: any
timestamp: Date
}> = []
try {
// Execute each step sequentially
for (const step of saga.steps) {
const stepResult = await this.executeStep(step, sagaExecution.context, executedSteps)
executedSteps.push({
step,
result: stepResult,
timestamp: new Date(),
})
// Update saga context with step result
sagaExecution.context = {
...sagaExecution.context,
[step.name]: stepResult,
}
// Save checkpoint
await this.saveCheckpoint(sagaExecution.id, {
completedSteps: executedSteps.length,
context: sagaExecution.context,
})
}
// All steps completed successfully
await this.completeSaga(sagaExecution.id)
return {
success: true,
result: sagaExecution.context,
}
} catch (error) {
// Saga failed - execute compensating transactions
console.error('Saga failed, initiating rollback:', error)
await this.rollback(sagaExecution.id, executedSteps)
return {
success: false,
error: (error as Error).message,
}
}
}
private async executeStep(step: SagaStep, context: any, executedSteps: any[]): Promise<any> {
const input = step.inputMapper(context)
try {
const execution = await $.ServiceExecution.start({
serviceId: step.serviceId,
inputs: input,
timeout: step.timeout || 30000,
})
const result = await execution.waitForCompletion()
const output = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs
return output
} catch (error) {
// Step failed
if (step.retryable) {
// Retry step
return await this.retryStep(step, context)
}
throw new Error(`Saga step "${step.name}" failed: ${(error as Error).message}`)
}
}
private async retryStep(step: SagaStep, context: any): Promise<any> {
const maxRetries = 3
let lastError: Error | null = null
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const input = step.inputMapper(context)
const execution = await $.ServiceExecution.start({
serviceId: step.serviceId,
inputs: input,
timeout: step.timeout || 30000,
})
const result = await execution.waitForCompletion()
return step.outputMapper ? step.outputMapper(result.outputs) : result.outputs
} catch (error) {
lastError = error as Error
console.log(`Retry ${attempt}/${maxRetries} for step ${step.name}`)
if (attempt < maxRetries) {
await new Promise((resolve) => setTimeout(resolve, Math.pow(2, attempt) * 1000))
}
}
}
throw new Error(`Step ${step.name} failed after ${maxRetries} retries: ${lastError?.message}`)
}
private async rollback(sagaId: string, executedSteps: Array<{ step: SagaStep; result: any }>): Promise<void> {
console.log(`Rolling back ${executedSteps.length} steps...`)
// Execute compensating transactions in reverse order
for (const { step, result } of executedSteps.reverse()) {
try {
console.log(`Compensating step: ${step.name}`)
// Get current context
const saga = await db.findOne($.SagaExecution, { id: sagaId })
await step.compensate(saga.context, result)
console.log(`Successfully compensated step: ${step.name}`)
} catch (error) {
// Compensation failed - log for manual intervention
console.error(`CRITICAL: Failed to compensate step ${step.name}:`, error)
await this.logCompensationFailure(sagaId, step, error as Error)
}
}
await this.markSagaFailed(sagaId)
}
private async initializeSaga(saga: SagaDefinition, initialInput: any): Promise<any> {
const sagaExecution = await db.create($.SagaExecution, {
name: saga.name,
status: 'running',
context: initialInput,
startedAt: new Date(),
})
return sagaExecution
}
private async saveCheckpoint(sagaId: string, checkpoint: any): Promise<void> {
await db.update($.SagaExecution, sagaId, {
checkpoint,
lastCheckpointAt: new Date(),
})
}
private async completeSaga(sagaId: string): Promise<void> {
await db.update($.SagaExecution, sagaId, {
status: 'completed',
completedAt: new Date(),
})
}
private async markSagaFailed(sagaId: string): Promise<void> {
await db.update($.SagaExecution, sagaId, {
status: 'failed',
failedAt: new Date(),
})
}
private async logCompensationFailure(sagaId: string, step: SagaStep, error: Error): Promise<void> {
await db.create($.CompensationFailure, {
sagaId,
stepName: step.name,
error: error.message,
stack: error.stack,
requiresManualIntervention: true,
createdAt: new Date(),
})
// Send alert to operations team
await send($.Alert.create, {
severity: 'critical',
title: 'Saga Compensation Failed',
message: `Failed to compensate step "${step.name}" in saga ${sagaId}`,
data: {
sagaId,
stepName: step.name,
error: error.message,
},
})
}
}Example 1: E-Commerce Order Processing
Complete order processing saga with payment, inventory, and shipping.
import $, { db, on, send } from 'sdk.do'
export const orderProcessingSaga: SagaDefinition = {
name: 'Order Processing Saga',
steps: [
{
name: 'validate-payment',
serviceId: 'payment-validator',
inputMapper: (context) => ({
customerId: context.customerId,
amount: context.amount,
paymentMethod: context.paymentMethod,
}),
outputMapper: (output) => ({
paymentValidationId: output.validationId,
isValid: output.isValid,
}),
compensate: async (context, stepResult) => {
// Release payment validation hold
await $.ServiceExecution.start({
serviceId: 'payment-validation-releaser',
inputs: {
validationId: stepResult.paymentValidationId,
},
})
},
retryable: true,
},
{
name: 'reserve-inventory',
serviceId: 'inventory-reserver',
inputMapper: (context) => ({
items: context.items,
orderId: context.orderId,
}),
outputMapper: (output) => ({
reservationId: output.reservationId,
reservedItems: output.items,
}),
compensate: async (context, stepResult) => {
// Release inventory reservation
await $.ServiceExecution.start({
serviceId: 'inventory-releaser',
inputs: {
reservationId: stepResult.reservationId,
},
})
},
retryable: true,
},
{
name: 'charge-payment',
serviceId: 'payment-processor',
inputMapper: (context) => ({
customerId: context.customerId,
amount: context.amount,
paymentMethod: context.paymentMethod,
orderId: context.orderId,
}),
outputMapper: (output) => ({
chargeId: output.chargeId,
transactionId: output.transactionId,
}),
compensate: async (context, stepResult) => {
// Refund payment
await $.ServiceExecution.start({
serviceId: 'payment-refunder',
inputs: {
chargeId: stepResult.chargeId,
amount: context.amount,
reason: 'order_cancelled',
},
})
},
retryable: false, // Don't retry payment charges
},
{
name: 'commit-inventory',
serviceId: 'inventory-committer',
inputMapper: (context) => ({
reservationId: context['reserve-inventory'].reservationId,
}),
outputMapper: (output) => ({
committed: output.success,
}),
compensate: async (context, stepResult) => {
// Return items to inventory
await $.ServiceExecution.start({
serviceId: 'inventory-returner',
inputs: {
items: context['reserve-inventory'].reservedItems,
orderId: context.orderId,
},
})
},
retryable: true,
},
{
name: 'create-shipment',
serviceId: 'shipment-creator',
inputMapper: (context) => ({
orderId: context.orderId,
items: context.items,
shippingAddress: context.shippingAddress,
}),
outputMapper: (output) => ({
shipmentId: output.shipmentId,
trackingNumber: output.trackingNumber,
carrier: output.carrier,
}),
compensate: async (context, stepResult) => {
// Cancel shipment
await $.ServiceExecution.start({
serviceId: 'shipment-canceller',
inputs: {
shipmentId: stepResult.shipmentId,
},
})
},
retryable: true,
},
{
name: 'send-confirmation',
serviceId: 'email-sender',
inputMapper: (context) => ({
to: context.customerEmail,
template: 'order-confirmation',
data: {
orderId: context.orderId,
trackingNumber: context['create-shipment'].trackingNumber,
items: context.items,
},
}),
outputMapper: (output) => ({
emailSent: output.sent,
}),
compensate: async (context, stepResult) => {
// Send cancellation email
await $.ServiceExecution.start({
serviceId: 'email-sender',
inputs: {
to: context.customerEmail,
template: 'order-cancelled',
data: {
orderId: context.orderId,
reason: 'processing_error',
},
},
})
},
retryable: true,
},
],
}
// Execute the saga
export const processOrder = async (orderData: {
orderId: string
customerId: string
customerEmail: string
items: any[]
amount: number
paymentMethod: string
shippingAddress: any
}) => {
const orchestrator = new SagaOrchestrator()
const result = await orchestrator.execute(orderProcessingSaga, orderData)
if (result.success) {
console.log('Order processed successfully:', result.result)
// Update order status
await db.update($.Order, orderData.orderId, {
status: 'completed',
completedAt: new Date(),
})
} else {
console.error('Order processing failed:', result.error)
// Update order status
await db.update($.Order, orderData.orderId, {
status: 'failed',
failureReason: result.error,
failedAt: new Date(),
})
}
return result
}Example 2: Multi-Service Account Creation
Create accounts across multiple services with rollback.
import $, { db, on, send } from 'sdk.do'
export const accountCreationSaga: SagaDefinition = {
name: 'Account Creation Saga',
steps: [
{
name: 'create-user-profile',
serviceId: 'user-profile-creator',
inputMapper: (context) => ({
email: context.email,
name: context.name,
metadata: context.metadata,
}),
outputMapper: (output) => ({
userId: output.userId,
}),
compensate: async (context, stepResult) => {
// Delete user profile
await $.ServiceExecution.start({
serviceId: 'user-profile-deleter',
inputs: {
userId: stepResult.userId,
},
})
},
},
{
name: 'create-auth-account',
serviceId: 'auth-account-creator',
inputMapper: (context) => ({
userId: context['create-user-profile'].userId,
email: context.email,
password: context.password,
}),
outputMapper: (output) => ({
authId: output.authId,
}),
compensate: async (context, stepResult) => {
// Delete auth account
await $.ServiceExecution.start({
serviceId: 'auth-account-deleter',
inputs: {
authId: stepResult.authId,
},
})
},
},
{
name: 'create-billing-account',
serviceId: 'billing-account-creator',
inputMapper: (context) => ({
userId: context['create-user-profile'].userId,
email: context.email,
plan: context.plan,
}),
outputMapper: (output) => ({
billingAccountId: output.accountId,
}),
compensate: async (context, stepResult) => {
// Delete billing account
await $.ServiceExecution.start({
serviceId: 'billing-account-deleter',
inputs: {
accountId: stepResult.billingAccountId,
},
})
},
},
{
name: 'provision-workspace',
serviceId: 'workspace-provisioner',
inputMapper: (context) => ({
userId: context['create-user-profile'].userId,
workspaceName: context.workspaceName,
}),
outputMapper: (output) => ({
workspaceId: output.workspaceId,
}),
compensate: async (context, stepResult) => {
// Delete workspace
await $.ServiceExecution.start({
serviceId: 'workspace-deleter',
inputs: {
workspaceId: stepResult.workspaceId,
},
})
},
},
{
name: 'setup-notifications',
serviceId: 'notification-setup',
inputMapper: (context) => ({
userId: context['create-user-profile'].userId,
email: context.email,
preferences: context.notificationPreferences || {},
}),
outputMapper: (output) => ({
notificationConfigured: output.success,
}),
compensate: async (context, stepResult) => {
// Remove notification configuration
await $.ServiceExecution.start({
serviceId: 'notification-cleanup',
inputs: {
userId: context['create-user-profile'].userId,
},
})
},
},
{
name: 'send-welcome-email',
serviceId: 'email-sender',
inputMapper: (context) => ({
to: context.email,
template: 'welcome',
data: {
name: context.name,
workspaceId: context['provision-workspace'].workspaceId,
},
}),
outputMapper: (output) => ({
emailSent: output.sent,
}),
compensate: async (context, stepResult) => {
// No compensation needed for welcome email
console.log('Welcome email was sent but account creation failed')
},
},
],
}
// Execute account creation saga
export const createAccount = async (accountData: {
email: string
name: string
password: string
plan: string
workspaceName: string
metadata?: any
notificationPreferences?: any
}) => {
const orchestrator = new SagaOrchestrator()
const result = await orchestrator.execute(accountCreationSaga, accountData)
if (result.success) {
console.log('Account created successfully:', result.result)
return {
success: true,
userId: result.result['create-user-profile'].userId,
workspaceId: result.result['provision-workspace'].workspaceId,
}
} else {
console.error('Account creation failed:', result.error)
return {
success: false,
error: result.error,
}
}
}Example 3: Travel Booking Saga
Book flights, hotels, and car rentals with coordinated rollback.
import $, { db, on, send } from 'sdk.do'
export const travelBookingSaga: SagaDefinition = {
name: 'Travel Booking Saga',
steps: [
{
name: 'reserve-flight',
serviceId: 'flight-reserver',
inputMapper: (context) => ({
departureCity: context.departureCity,
arrivalCity: context.arrivalCity,
departureDate: context.departureDate,
returnDate: context.returnDate,
passengers: context.passengers,
}),
outputMapper: (output) => ({
flightReservationId: output.reservationId,
flightPrice: output.price,
flightDetails: output.details,
}),
compensate: async (context, stepResult) => {
// Cancel flight reservation
await $.ServiceExecution.start({
serviceId: 'flight-canceller',
inputs: {
reservationId: stepResult.flightReservationId,
},
})
},
timeout: 60000,
},
{
name: 'reserve-hotel',
serviceId: 'hotel-reserver',
inputMapper: (context) => ({
city: context.arrivalCity,
checkInDate: context.departureDate,
checkOutDate: context.returnDate,
guests: context.passengers.length,
roomType: context.roomType,
}),
outputMapper: (output) => ({
hotelReservationId: output.reservationId,
hotelPrice: output.price,
hotelDetails: output.details,
}),
compensate: async (context, stepResult) => {
// Cancel hotel reservation
await $.ServiceExecution.start({
serviceId: 'hotel-canceller',
inputs: {
reservationId: stepResult.hotelReservationId,
},
})
},
timeout: 60000,
},
{
name: 'reserve-car',
serviceId: 'car-reserver',
inputMapper: (context) => ({
pickupLocation: context.arrivalCity,
pickupDate: context.departureDate,
returnDate: context.returnDate,
carType: context.carType || 'economy',
}),
outputMapper: (output) => ({
carReservationId: output.reservationId,
carPrice: output.price,
carDetails: output.details,
}),
compensate: async (context, stepResult) => {
// Cancel car reservation
await $.ServiceExecution.start({
serviceId: 'car-canceller',
inputs: {
reservationId: stepResult.carReservationId,
},
})
},
timeout: 60000,
},
{
name: 'calculate-total',
serviceId: 'price-calculator',
inputMapper: (context) => ({
flightPrice: context['reserve-flight'].flightPrice,
hotelPrice: context['reserve-hotel'].hotelPrice,
carPrice: context['reserve-car'].carPrice,
taxes: true,
fees: true,
}),
outputMapper: (output) => ({
totalPrice: output.total,
breakdown: output.breakdown,
}),
compensate: async () => {
// No compensation needed for calculation
},
},
{
name: 'process-payment',
serviceId: 'payment-processor',
inputMapper: (context) => ({
amount: context['calculate-total'].totalPrice,
currency: 'USD',
paymentMethod: context.paymentMethod,
customerId: context.customerId,
}),
outputMapper: (output) => ({
paymentId: output.paymentId,
transactionId: output.transactionId,
}),
compensate: async (context, stepResult) => {
// Refund payment
await $.ServiceExecution.start({
serviceId: 'payment-refunder',
inputs: {
paymentId: stepResult.paymentId,
amount: context['calculate-total'].totalPrice,
reason: 'booking_cancelled',
},
})
},
},
{
name: 'confirm-flight',
serviceId: 'flight-confirmer',
inputMapper: (context) => ({
reservationId: context['reserve-flight'].flightReservationId,
paymentId: context['process-payment'].paymentId,
}),
outputMapper: (output) => ({
flightConfirmationNumber: output.confirmationNumber,
}),
compensate: async (context, stepResult) => {
// Cancel confirmed flight (may incur fees)
await $.ServiceExecution.start({
serviceId: 'flight-canceller',
inputs: {
confirmationNumber: stepResult.flightConfirmationNumber,
},
})
},
},
{
name: 'confirm-hotel',
serviceId: 'hotel-confirmer',
inputMapper: (context) => ({
reservationId: context['reserve-hotel'].hotelReservationId,
paymentId: context['process-payment'].paymentId,
}),
outputMapper: (output) => ({
hotelConfirmationNumber: output.confirmationNumber,
}),
compensate: async (context, stepResult) => {
// Cancel confirmed hotel
await $.ServiceExecution.start({
serviceId: 'hotel-canceller',
inputs: {
confirmationNumber: stepResult.hotelConfirmationNumber,
},
})
},
},
{
name: 'confirm-car',
serviceId: 'car-confirmer',
inputMapper: (context) => ({
reservationId: context['reserve-car'].carReservationId,
paymentId: context['process-payment'].paymentId,
}),
outputMapper: (output) => ({
carConfirmationNumber: output.confirmationNumber,
}),
compensate: async (context, stepResult) => {
// Cancel confirmed car rental
await $.ServiceExecution.start({
serviceId: 'car-canceller',
inputs: {
confirmationNumber: stepResult.carConfirmationNumber,
},
})
},
},
{
name: 'send-itinerary',
serviceId: 'itinerary-sender',
inputMapper: (context) => ({
email: context.customerEmail,
flightDetails: context['reserve-flight'].flightDetails,
flightConfirmation: context['confirm-flight'].flightConfirmationNumber,
hotelDetails: context['reserve-hotel'].hotelDetails,
hotelConfirmation: context['confirm-hotel'].hotelConfirmationNumber,
carDetails: context['reserve-car'].carDetails,
carConfirmation: context['confirm-car'].carConfirmationNumber,
totalPrice: context['calculate-total'].totalPrice,
}),
outputMapper: (output) => ({
itinerarySent: output.sent,
}),
compensate: async (context) => {
// Send cancellation notice
await $.ServiceExecution.start({
serviceId: 'cancellation-sender',
inputs: {
email: context.customerEmail,
bookingId: context.bookingId,
},
})
},
},
],
}
// Execute travel booking saga
export const bookTravel = async (bookingData: {
customerId: string
customerEmail: string
departureCity: string
arrivalCity: string
departureDate: string
returnDate: string
passengers: any[]
roomType: string
carType?: string
paymentMethod: string
bookingId: string
}) => {
const orchestrator = new SagaOrchestrator()
const result = await orchestrator.execute(travelBookingSaga, bookingData)
if (result.success) {
console.log('Travel booking completed successfully')
return {
success: true,
bookingId: bookingData.bookingId,
confirmations: {
flight: result.result['confirm-flight'].flightConfirmationNumber,
hotel: result.result['confirm-hotel'].hotelConfirmationNumber,
car: result.result['confirm-car'].carConfirmationNumber,
},
totalPrice: result.result['calculate-total'].totalPrice,
}
} else {
console.error('Travel booking failed:', result.error)
return {
success: false,
error: result.error,
}
}
}Choreography Pattern
Saga implementation using event-driven choreography instead of central orchestration.
import $, { db, on, send } from 'sdk.do'
import { eventBus } from './event-bus'
// Define saga steps as event handlers
// Step 1: Order created → Reserve inventory
on($.Order.created, async (order) => {
try {
const reservation = await $.ServiceExecution.start({
serviceId: 'inventory-reserver',
inputs: {
orderId: order.id,
items: order.items,
},
}).then((e) => e.waitForCompletion())
// Emit success event
await send($.Event.publish, {
type: 'inventory.reserved',
data: {
orderId: order.id,
reservationId: reservation.outputs.reservationId,
},
metadata: {
correlationId: order.id,
},
})
} catch (error) {
// Emit failure event
await send($.Event.publish, {
type: 'inventory.reservation.failed',
data: {
orderId: order.id,
error: (error as Error).message,
},
metadata: {
correlationId: order.id,
},
})
}
})
// Step 2: Inventory reserved → Process payment
on('inventory.reserved', async (event) => {
const order = await db.findOne($.Order, { id: event.data.orderId })
try {
const payment = await $.ServiceExecution.start({
serviceId: 'payment-processor',
inputs: {
orderId: order.id,
amount: order.total,
paymentMethod: order.paymentMethod,
},
}).then((e) => e.waitForCompletion())
// Emit success event
await send($.Event.publish, {
type: 'payment.processed',
data: {
orderId: order.id,
paymentId: payment.outputs.paymentId,
},
metadata: {
correlationId: order.id,
},
})
} catch (error) {
// Compensate: Release inventory
await $.ServiceExecution.start({
serviceId: 'inventory-releaser',
inputs: {
reservationId: event.data.reservationId,
},
})
// Emit failure event
await send($.Event.publish, {
type: 'payment.failed',
data: {
orderId: order.id,
error: (error as Error).message,
},
metadata: {
correlationId: order.id,
},
})
}
})
// Step 3: Payment processed → Create shipment
on('payment.processed', async (event) => {
const order = await db.findOne($.Order, { id: event.data.orderId })
try {
const shipment = await $.ServiceExecution.start({
serviceId: 'shipment-creator',
inputs: {
orderId: order.id,
items: order.items,
address: order.shippingAddress,
},
}).then((e) => e.waitForCompletion())
// Emit success event
await send($.Event.publish, {
type: 'shipment.created',
data: {
orderId: order.id,
shipmentId: shipment.outputs.shipmentId,
},
metadata: {
correlationId: order.id,
},
})
} catch (error) {
// Compensate: Refund payment and release inventory
await Promise.all([
$.ServiceExecution.start({
serviceId: 'payment-refunder',
inputs: {
paymentId: event.data.paymentId,
},
}),
$.ServiceExecution.start({
serviceId: 'inventory-releaser',
inputs: {
orderId: order.id,
},
}),
])
// Emit failure event
await send($.Event.publish, {
type: 'shipment.failed',
data: {
orderId: order.id,
error: (error as Error).message,
},
metadata: {
correlationId: order.id,
},
})
}
})
// Handle compensation events
on('inventory.reservation.failed', async (event) => {
// Update order status
await db.update($.Order, event.data.orderId, {
status: 'failed',
failureReason: 'inventory_unavailable',
})
})
on('payment.failed', async (event) => {
// Update order status
await db.update($.Order, event.data.orderId, {
status: 'failed',
failureReason: 'payment_failed',
})
})
on('shipment.failed', async (event) => {
// Update order status
await db.update($.Order, event.data.orderId, {
status: 'failed',
failureReason: 'shipment_creation_failed',
})
})Advanced Patterns
Pattern 1: Saga State Machine
Model saga as a state machine for complex workflows.
import $, { db } from 'sdk.do'
export class SagaStateMachine {
private state: string = 'initial'
private context: any = {}
constructor(
private definition: {
states: Record<
string,
{
onEnter: (context: any) => Promise<any>
onExit?: (context: any) => Promise<void>
transitions: Record<string, string>
compensation?: (context: any) => Promise<void>
}
>
initialState: string
}
) {
this.state = definition.initialState
}
async execute(initialContext: any): Promise<any> {
this.context = initialContext
const stateHistory: string[] = []
try {
while (this.state !== 'completed' && this.state !== 'failed') {
const currentState = this.definition.states[this.state]
if (!currentState) {
throw new Error(`Invalid state: ${this.state}`)
}
stateHistory.push(this.state)
// Execute state
const result = await currentState.onEnter(this.context)
// Update context
this.context = { ...this.context, ...result }
// Determine next state
const nextState = this.determineNextState(currentState, result)
// Exit current state
if (currentState.onExit) {
await currentState.onExit(this.context)
}
// Transition to next state
this.state = nextState
}
return {
success: this.state === 'completed',
context: this.context,
}
} catch (error) {
// Compensate executed states
await this.compensate(stateHistory)
throw error
}
}
private determineNextState(currentState: any, result: any): string {
// Use result to determine transition
for (const [condition, nextState] of Object.entries(currentState.transitions)) {
if (this.evaluateCondition(condition, result)) {
return nextState as string
}
}
throw new Error('No valid transition found')
}
private evaluateCondition(condition: string, result: any): boolean {
// Simple condition evaluation
if (condition === 'success' && result.success) return true
if (condition === 'failure' && !result.success) return true
if (condition === 'default') return true
return false
}
private async compensate(stateHistory: string[]): Promise<void> {
// Compensate in reverse order
for (const stateName of stateHistory.reverse()) {
const state = this.definition.states[stateName]
if (state.compensation) {
try {
await state.compensation(this.context)
} catch (error) {
console.error(`Failed to compensate state ${stateName}:`, error)
}
}
}
}
}
// Usage
const orderStateMachine = new SagaStateMachine({
initialState: 'validating',
states: {
validating: {
onEnter: async (context) => {
// Validate order
const isValid = await validateOrder(context.order)
return { success: isValid }
},
transitions: {
success: 'reserving-inventory',
failure: 'failed',
},
},
'reserving-inventory': {
onEnter: async (context) => {
const reservation = await reserveInventory(context.order)
return { success: true, reservationId: reservation.id }
},
transitions: {
success: 'processing-payment',
failure: 'failed',
},
compensation: async (context) => {
await releaseInventory(context.reservationId)
},
},
'processing-payment': {
onEnter: async (context) => {
const payment = await processPayment(context.order)
return { success: true, paymentId: payment.id }
},
transitions: {
success: 'completed',
failure: 'failed',
},
compensation: async (context) => {
await refundPayment(context.paymentId)
},
},
completed: {
onEnter: async () => ({ success: true }),
transitions: {},
},
failed: {
onEnter: async () => ({ success: false }),
transitions: {},
},
},
})
async function validateOrder(order: any): Promise<boolean> {
return true
}
async function reserveInventory(order: any): Promise<any> {
return { id: 'res_123' }
}
async function releaseInventory(reservationId: string): Promise<void> {}
async function processPayment(order: any): Promise<any> {
return { id: 'pay_123' }
}
async function refundPayment(paymentId: string): Promise<void> {}Pattern 2: Saga Timeout Handling
Handle timeouts at saga and step level.
import $, { db } from 'sdk.do'
export class TimeoutAwareSagaOrchestrator extends SagaOrchestrator {
async executeWithTimeout(saga: SagaDefinition, initialInput: any, sagaTimeout: number): Promise<any> {
return Promise.race([
this.execute(saga, initialInput),
new Promise((_, reject) => setTimeout(() => reject(new Error('Saga timeout exceeded')), sagaTimeout)),
])
}
}Pattern 3: Saga Monitoring and Observability
Track saga execution with metrics and logging.
import $, { db, send } from 'sdk.do'
export class ObservableSagaOrchestrator extends SagaOrchestrator {
async execute(saga: SagaDefinition, initialInput: any): Promise<any> {
const startTime = Date.now()
const sagaId = generateId()
// Log saga start
await send($.Log.create, {
level: 'info',
message: `Saga started: ${saga.name}`,
metadata: {
sagaId,
sagaName: saga.name,
},
})
try {
const result = await super.execute(saga, initialInput)
// Log saga completion
await send($.Log.create, {
level: 'info',
message: `Saga completed: ${saga.name}`,
metadata: {
sagaId,
sagaName: saga.name,
duration: Date.now() - startTime,
},
})
// Record metrics
await send($.Metric.record, {
name: 'saga.execution.success',
value: 1,
tags: {
sagaName: saga.name,
duration: Date.now() - startTime,
},
})
return result
} catch (error) {
// Log saga failure
await send($.Log.create, {
level: 'error',
message: `Saga failed: ${saga.name}`,
metadata: {
sagaId,
sagaName: saga.name,
error: (error as Error).message,
duration: Date.now() - startTime,
},
})
// Record failure metric
await send($.Metric.record, {
name: 'saga.execution.failure',
value: 1,
tags: {
sagaName: saga.name,
error: (error as Error).message,
},
})
throw error
}
}
}
function generateId(): string {
return `saga_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
}Testing Saga Patterns
import { describe, it, expect, vi } from 'vitest'
import $, { db } from 'sdk.do'
describe('Saga Pattern', () => {
it('should execute all steps successfully', async () => {
const mockSaga: SagaDefinition = {
name: 'Test Saga',
steps: [
{
name: 'step1',
serviceId: 'service-1',
inputMapper: (ctx) => ctx,
compensate: async () => {},
},
{
name: 'step2',
serviceId: 'service-2',
inputMapper: (ctx) => ctx,
compensate: async () => {},
},
],
}
const orchestrator = new SagaOrchestrator()
const result = await orchestrator.execute(mockSaga, { data: 'test' })
expect(result.success).toBe(true)
})
it('should rollback on failure', async () => {
const compensations: string[] = []
const mockSaga: SagaDefinition = {
name: 'Failing Saga',
steps: [
{
name: 'step1',
serviceId: 'service-1',
inputMapper: (ctx) => ctx,
compensate: async () => {
compensations.push('step1')
},
},
{
name: 'failing-step',
serviceId: 'failing-service',
inputMapper: (ctx) => ctx,
compensate: async () => {},
},
],
}
const orchestrator = new SagaOrchestrator()
const result = await orchestrator.execute(mockSaga, { data: 'test' })
expect(result.success).toBe(false)
expect(compensations).toContain('step1')
})
it('should handle compensation failures', async () => {
const mockSaga: SagaDefinition = {
name: 'Saga with failing compensation',
steps: [
{
name: 'step1',
serviceId: 'service-1',
inputMapper: (ctx) => ctx,
compensate: async () => {
throw new Error('Compensation failed')
},
},
],
}
const orchestrator = new SagaOrchestrator()
const result = await orchestrator.execute(mockSaga, { data: 'test' })
// Should log compensation failure
const failures = await db.query($.CompensationFailure, {
where: { sagaId: result.sagaId },
})
expect(failures.length).toBeGreaterThan(0)
})
})Best Practices
- Idempotency: Make all steps and compensations idempotent
- Compensation Design: Design careful and complete compensation logic
- Timeout Management: Set appropriate timeouts for saga and steps
- State Persistence: Save saga state for recovery
- Monitoring: Track saga execution and failures
- Manual Intervention: Plan for compensation failures
- Testing: Test both success and failure paths
- Documentation: Document saga flows and compensation logic
- Isolation: Keep saga steps isolated and independent
- Error Handling: Handle errors gracefully at each step
Orchestration vs Choreography
| Aspect | Orchestration | Choreography |
|---|---|---|
| Control | Centralized | Decentralized |
| Complexity | Easier to understand | Harder to track |
| Coupling | Higher coupling | Lower coupling |
| Monitoring | Easier to monitor | Harder to monitor |
| Flexibility | Less flexible | More flexible |
| Use When | Complex workflows | Simple, event-driven |
Related Patterns
- Multi-Service Coordination - Parallel service execution
- Service Chaining - Sequential workflows
- Event-Driven - Reactive architectures
Additional Resources
- Service Types - Understanding different service types
- Composition - Building composite services
- Best Practices - Service development guidelines
- API Reference - Complete API documentation
Event-Driven Service Orchestration
Complete guide to building event-driven service architectures with event routing, fan-out/fan-in patterns, event sourcing, and reactive service orchestration
AI Analysis Services
Build AI-powered analysis services that extract insights from data, perform sentiment analysis, detect trends, identify anomalies, and provide predictive analytics using the Services-as-Software framework.