Service Chaining Patterns
Complete guide to sequential service execution patterns, output-to-input mapping, error recovery, conditional branching, and chain optimization strategies
Build sophisticated multi-step workflows by chaining services together, where each service's output flows into the next service's input, creating powerful data transformation and processing pipelines.
Overview
Service chaining enables you to compose complex workflows from simple, reusable services. Each service in the chain performs a specific transformation or operation, passing its results to the next service in sequence. This pattern is fundamental for building data pipelines, content workflows, and multi-stage business processes.
When to Use Service Chaining
Use service chaining when:
- Sequential Dependencies: Each step requires the output of the previous step
- Data Transformation: Data needs to pass through multiple transformation stages
- Workflow Automation: Business processes have clear sequential steps
- Pipeline Processing: Building ETL (Extract, Transform, Load) pipelines
- Quality Gates: Each stage validates and enriches data before proceeding
- Staged Execution: Complex operations broken into manageable stages
Avoid this pattern when:
- Services can execute independently in parallel
- No data dependencies exist between services
- Real-time event-driven architecture is more appropriate
- Overhead of sequential execution is prohibitive
Architecture
Core Implementation
Basic Service Chain
import $, { db, on, send } from 'sdk.do'
export class ServiceChain {
private steps: Array<{
name: string
serviceId: string
inputMapper?: (prevOutput: any, context: any) => any
outputMapper?: (output: any) => any
retryable?: boolean
}> = []
addStep(
name: string,
serviceId: string,
options?: {
inputMapper?: (prevOutput: any, context: any) => any
outputMapper?: (output: any) => any
retryable?: boolean
}
): ServiceChain {
this.steps.push({
name,
serviceId,
...options,
})
return this
}
async execute(initialInput: any): Promise<any> {
let currentData = initialInput
const context = {
startTime: Date.now(),
executionId: generateId(),
stepResults: [] as any[],
}
for (let i = 0; i < this.steps.length; i++) {
const step = this.steps[i]
const stepStartTime = Date.now()
try {
// Map input for this step
const stepInput = step.inputMapper ? step.inputMapper(currentData, context) : currentData
// Execute service
const execution = await $.ServiceExecution.start({
serviceId: step.serviceId,
inputs: stepInput,
metadata: {
chainExecutionId: context.executionId,
stepIndex: i,
stepName: step.name,
},
})
const result = await execution.waitForCompletion()
// Map output
const stepOutput = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs
// Save step result
context.stepResults.push({
stepName: step.name,
stepIndex: i,
input: stepInput,
output: stepOutput,
duration: Date.now() - stepStartTime,
timestamp: new Date().toISOString(),
})
// Update current data for next step
currentData = stepOutput
// Emit progress event
await send($.Event.publish, {
type: 'chain.step.completed',
data: {
executionId: context.executionId,
stepName: step.name,
stepIndex: i,
totalSteps: this.steps.length,
progress: (i + 1) / this.steps.length,
},
})
} catch (error) {
throw new Error(`Chain failed at step ${i + 1} (${step.name}): ${(error as Error).message}`)
}
}
return {
result: currentData,
metadata: {
executionId: context.executionId,
totalDuration: Date.now() - context.startTime,
stepsExecuted: this.steps.length,
stepResults: context.stepResults,
},
}
}
}
function generateId(): string {
return `chain_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
}Example 1: Content Production Pipeline
Complete pipeline from topic research to published article.
import $, { db, on, send, ai } from 'sdk.do'
export const contentProductionPipeline = await $.Service.create({
name: 'Content Production Pipeline',
description: 'End-to-end content creation from topic to published article',
type: $.ServiceType.ContentGeneration,
pricing: {
model: 'per-execution',
basePrice: 8.0,
},
workflow: async (inputs: { topic: string; targetAudience: string; wordCount: number; tone: string; keywords: string[]; publishTo?: string[] }) => {
const chain = new ServiceChain()
// Step 1: Topic Research
chain.addStep('research', 'topic-researcher', {
inputMapper: (_, __) => ({
topic: inputs.topic,
depth: 'comprehensive',
sources: ['academic', 'industry', 'news'],
}),
outputMapper: (output) => ({
...output,
research: output.findings,
}),
})
// Step 2: Outline Generation
chain.addStep('outline', 'outline-generator', {
inputMapper: (prevOutput) => ({
topic: inputs.topic,
research: prevOutput.research,
targetLength: inputs.wordCount,
audience: inputs.targetAudience,
}),
outputMapper: (output) => ({
outline: output.sections,
estimatedWordCount: output.wordCount,
}),
})
// Step 3: Content Writing
chain.addStep('writing', 'content-writer', {
inputMapper: (prevOutput) => ({
outline: prevOutput.outline,
tone: inputs.tone,
targetAudience: inputs.targetAudience,
keywords: inputs.keywords,
}),
outputMapper: (output) => ({
content: output.text,
title: output.title,
excerpt: output.excerpt,
wordCount: output.wordCount,
}),
})
// Step 4: Grammar and Style Check
chain.addStep('editing', 'grammar-checker', {
inputMapper: (prevOutput) => ({
text: prevOutput.content,
style: inputs.tone,
checkLevel: 'thorough',
}),
outputMapper: (output) => ({
content: output.correctedText,
title: output.title || 'Untitled',
corrections: output.corrections,
readabilityScore: output.readabilityScore,
}),
})
// Step 5: SEO Optimization
chain.addStep('seo', 'seo-optimizer', {
inputMapper: (prevOutput) => ({
title: prevOutput.title,
content: prevOutput.content,
keywords: inputs.keywords,
targetAudience: inputs.targetAudience,
}),
outputMapper: (output) => ({
title: output.optimizedTitle,
content: output.optimizedContent,
metaDescription: output.metaDescription,
seoScore: output.score,
suggestions: output.suggestions,
}),
})
// Step 6: Image Generation
chain.addStep('images', 'image-generator', {
inputMapper: (prevOutput) => ({
content: prevOutput.content,
title: prevOutput.title,
count: 3,
style: 'professional',
aspectRatio: '16:9',
}),
outputMapper: (output) => ({
images: output.urls,
altTexts: output.altTexts,
}),
})
// Step 7: Plagiarism Check
chain.addStep('plagiarism', 'plagiarism-checker', {
inputMapper: (prevOutput, context) => ({
text: context.stepResults[4].output.content, // Use SEO-optimized content
checkAgainst: ['web', 'academic', 'publications'],
}),
outputMapper: (output) => ({
isPlagiarismFree: output.similarity < 10,
similarityScore: output.similarity,
sources: output.matchedSources,
}),
})
// Step 8: Quality Assurance
chain.addStep('qa', 'content-qa-checker', {
inputMapper: (prevOutput, context) => {
const contentStep = context.stepResults[4] // SEO step
const imagesStep = context.stepResults[5] // Images step
const plagiarismStep = context.stepResults[6] // Plagiarism step
return {
title: contentStep.output.title,
content: contentStep.output.content,
wordCount: context.stepResults[2].output.wordCount,
targetWordCount: inputs.wordCount,
images: imagesStep.output.images,
seoScore: contentStep.output.seoScore,
readabilityScore: context.stepResults[3].output.readabilityScore,
isPlagiarismFree: plagiarismStep.output.isPlagiarismFree,
}
},
outputMapper: (output) => ({
passedQA: output.passed,
qualityScore: output.score,
issues: output.issues,
}),
})
// Step 9: Formatting
chain.addStep('formatting', 'content-formatter', {
inputMapper: (prevOutput, context) => {
const contentStep = context.stepResults[4] // SEO step
const imagesStep = context.stepResults[5] // Images step
return {
title: contentStep.output.title,
content: contentStep.output.content,
images: imagesStep.output.images,
altTexts: imagesStep.output.altTexts,
format: 'html',
}
},
outputMapper: (output) => ({
formattedContent: output.html,
plainText: output.plainText,
}),
})
// Step 10: Publishing (if configured)
if (inputs.publishTo && inputs.publishTo.length > 0) {
chain.addStep('publishing', 'content-publisher', {
inputMapper: (prevOutput, context) => {
const contentStep = context.stepResults[4] // SEO step
const imagesStep = context.stepResults[5] // Images step
const formattedStep = context.stepResults[8] // Formatting step
return {
title: contentStep.output.title,
content: formattedStep.output.formattedContent,
excerpt: contentStep.output.metaDescription,
images: imagesStep.output.images,
keywords: inputs.keywords,
platforms: inputs.publishTo,
}
},
outputMapper: (output) => ({
published: true,
publishedUrls: output.urls,
publishedAt: output.timestamp,
}),
})
}
// Execute chain
const result = await chain.execute(inputs)
return {
article: {
title: result.result.title,
content: result.result.formattedContent,
excerpt: result.metadata.stepResults[4].output.metaDescription,
images: result.metadata.stepResults[5].output.images,
wordCount: result.metadata.stepResults[2].output.wordCount,
},
quality: {
seoScore: result.metadata.stepResults[4].output.seoScore,
readabilityScore: result.metadata.stepResults[3].output.readabilityScore,
qualityScore: result.metadata.stepResults[7].output.qualityScore,
isPlagiarismFree: result.metadata.stepResults[6].output.isPlagiarismFree,
},
publishing: inputs.publishTo
? {
published: result.result.published,
urls: result.result.publishedUrls,
}
: null,
pipeline: {
executionId: result.metadata.executionId,
totalDuration: result.metadata.totalDuration,
stepsExecuted: result.metadata.stepsExecuted,
stepBreakdown: result.metadata.stepResults.map((step: any) => ({
name: step.stepName,
duration: step.duration,
timestamp: step.timestamp,
})),
},
}
},
})Example 2: Lead Qualification Chain
Multi-stage lead qualification and nurturing pipeline.
import $, { db, on, send, ai } from 'sdk.do'
export const leadQualificationChain = await $.Service.create({
name: 'Lead Qualification Chain',
description: 'Multi-stage lead qualification, scoring, and routing pipeline',
type: $.ServiceType.LeadProcessing,
pricing: {
model: 'per-execution',
basePrice: 1.5,
},
workflow: async (inputs: { leadId: string; email: string; name?: string; company?: string; source: string }) => {
const chain = new ServiceChain()
// Step 1: Email Validation
chain.addStep('email-validation', 'email-validator', {
inputMapper: () => ({ email: inputs.email }),
outputMapper: (output) => ({
emailValid: output.isValid,
emailScore: output.score,
emailType: output.type,
}),
})
// Step 2: Data Enrichment
chain.addStep('enrichment', 'lead-enrichment', {
inputMapper: () => ({
email: inputs.email,
name: inputs.name,
company: inputs.company,
}),
outputMapper: (output) => ({
enrichedData: output.profile,
confidence: output.confidence,
}),
})
// Step 3: Company Analysis
chain.addStep('company-analysis', 'company-analyzer', {
inputMapper: (prevOutput) => ({
company: prevOutput.enrichedData.company || inputs.company,
domain: prevOutput.enrichedData.companyDomain,
}),
outputMapper: (output) => ({
companyData: output.company,
companyScore: output.fitScore,
}),
})
// Step 4: Lead Scoring
chain.addStep('scoring', 'lead-scorer', {
inputMapper: (prevOutput, context) => {
const emailStep = context.stepResults[0]
const enrichmentStep = context.stepResults[1]
const companyStep = context.stepResults[2]
return {
email: {
valid: emailStep.output.emailValid,
score: emailStep.output.emailScore,
type: emailStep.output.emailType,
},
profile: enrichmentStep.output.enrichedData,
company: companyStep.output.companyData,
source: inputs.source,
}
},
outputMapper: (output) => ({
leadScore: output.score,
scoreBreakdown: output.breakdown,
qualification: output.qualification,
}),
})
// Step 5: Intent Analysis
chain.addStep('intent-analysis', 'intent-analyzer', {
inputMapper: (prevOutput, context) => {
const enrichmentStep = context.stepResults[1]
const companyStep = context.stepResults[2]
return {
profile: enrichmentStep.output.enrichedData,
company: companyStep.output.companyData,
recentActivity: enrichmentStep.output.enrichedData.recentActivity,
}
},
outputMapper: (output) => ({
buyingIntent: output.intentLevel,
signals: output.signals,
recommendedApproach: output.approach,
}),
})
// Step 6: Segmentation
chain.addStep('segmentation', 'lead-segmenter', {
inputMapper: (prevOutput, context) => {
const scoringStep = context.stepResults[3]
const intentStep = context.stepResults[4]
const companyStep = context.stepResults[2]
return {
leadScore: scoringStep.output.leadScore,
qualification: scoringStep.output.qualification,
buyingIntent: intentStep.output.buyingIntent,
companySize: companyStep.output.companyData.size,
industry: companyStep.output.companyData.industry,
}
},
outputMapper: (output) => ({
segment: output.segment,
priority: output.priority,
recommendedCampaigns: output.campaigns,
}),
})
// Step 7: Routing Decision
chain.addStep('routing', 'lead-router', {
inputMapper: (prevOutput, context) => {
const scoringStep = context.stepResults[3]
const segmentStep = context.stepResults[5]
const intentStep = context.stepResults[4]
return {
leadScore: scoringStep.output.leadScore,
segment: segmentStep.output.segment,
priority: segmentStep.output.priority,
buyingIntent: intentStep.output.buyingIntent,
}
},
outputMapper: (output) => ({
assignTo: output.assignedTo,
routingReason: output.reason,
recommendedActions: output.actions,
}),
})
// Step 8: CRM Update
chain.addStep('crm-update', 'crm-updater', {
inputMapper: (prevOutput, context) => {
const allSteps = context.stepResults
return {
leadId: inputs.leadId,
updates: {
emailValidation: allSteps[0].output,
enrichedData: allSteps[1].output.enrichedData,
companyData: allSteps[2].output.companyData,
leadScore: allSteps[3].output.leadScore,
scoreBreakdown: allSteps[3].output.scoreBreakdown,
qualification: allSteps[3].output.qualification,
buyingIntent: allSteps[4].output.buyingIntent,
signals: allSteps[4].output.signals,
segment: allSteps[5].output.segment,
priority: allSteps[5].output.priority,
assignedTo: allSteps[6].output.assignTo,
lastProcessed: new Date().toISOString(),
},
}
},
outputMapper: (output) => ({
updated: output.success,
crmRecordId: output.recordId,
}),
})
// Step 9: Notification
chain.addStep('notification', 'notification-sender', {
inputMapper: (prevOutput, context) => {
const routingStep = context.stepResults[6]
const scoringStep = context.stepResults[3]
const enrichmentStep = context.stepResults[1]
return {
recipient: routingStep.output.assignTo,
type: 'new-lead-assigned',
data: {
leadId: inputs.leadId,
leadName: enrichmentStep.output.enrichedData.name || inputs.name,
leadScore: scoringStep.output.leadScore,
qualification: scoringStep.output.qualification,
priority: context.stepResults[5].output.priority,
recommendedActions: routingStep.output.recommendedActions,
},
}
},
outputMapper: (output) => ({
notificationSent: output.sent,
notificationId: output.id,
}),
})
// Execute chain
const result = await chain.execute(inputs)
// Extract key results
const stepResults = result.metadata.stepResults
return {
leadId: inputs.leadId,
qualification: {
score: stepResults[3].output.leadScore,
qualification: stepResults[3].output.qualification,
breakdown: stepResults[3].output.scoreBreakdown,
},
enrichedProfile: stepResults[1].output.enrichedData,
company: stepResults[2].output.companyData,
intent: {
level: stepResults[4].output.buyingIntent,
signals: stepResults[4].output.signals,
approach: stepResults[4].output.recommendedApproach,
},
routing: {
segment: stepResults[5].output.segment,
priority: stepResults[5].output.priority,
assignedTo: stepResults[6].output.assignTo,
reason: stepResults[6].output.routingReason,
actions: stepResults[6].output.recommendedActions,
},
processing: {
executionId: result.metadata.executionId,
duration: result.metadata.totalDuration,
stepsCompleted: result.metadata.stepsExecuted,
processedAt: new Date().toISOString(),
},
}
},
})Example 3: Data Ingestion Pipeline
ETL pipeline for data extraction, transformation, and loading.
import $, { db, on, send } from 'sdk.do'
export const dataIngestionPipeline = await $.Service.create({
name: 'Data Ingestion Pipeline',
description: 'ETL pipeline for extracting, transforming, and loading data',
type: $.ServiceType.DataProcessing,
pricing: {
model: 'per-record',
pricePerUnit: 0.005,
},
workflow: async (inputs: {
source: string
sourceType: 'api' | 'file' | 'database'
destination: string
transformations: string[]
validations: string[]
}) => {
const chain = new ServiceChain()
// Step 1: Data Extraction
chain.addStep('extract', 'data-extractor', {
inputMapper: () => ({
source: inputs.source,
sourceType: inputs.sourceType,
batchSize: 1000,
}),
outputMapper: (output) => ({
records: output.data,
metadata: output.metadata,
recordCount: output.data.length,
}),
})
// Step 2: Schema Validation
chain.addStep('schema-validation', 'schema-validator', {
inputMapper: (prevOutput) => ({
records: prevOutput.records,
expectedSchema: inputs.sourceType,
}),
outputMapper: (output) => ({
validRecords: output.valid,
invalidRecords: output.invalid,
validationErrors: output.errors,
}),
})
// Step 3: Data Cleaning
chain.addStep('cleaning', 'data-cleaner', {
inputMapper: (prevOutput) => ({
records: prevOutput.validRecords,
}),
outputMapper: (output) => ({
cleanedRecords: output.cleaned,
removedRecords: output.removed,
cleaningLog: output.log,
}),
})
// Step 4: Data Transformation
chain.addStep('transformation', 'data-transformer', {
inputMapper: (prevOutput) => ({
records: prevOutput.cleanedRecords,
transformations: inputs.transformations,
}),
outputMapper: (output) => ({
transformedRecords: output.transformed,
transformationLog: output.log,
}),
})
// Step 5: Data Enrichment
chain.addStep('enrichment', 'data-enricher', {
inputMapper: (prevOutput) => ({
records: prevOutput.transformedRecords,
enrichmentSources: ['geo', 'demographics', 'firmographics'],
}),
outputMapper: (output) => ({
enrichedRecords: output.enriched,
enrichmentRate: output.enrichmentRate,
}),
})
// Step 6: Business Rules Validation
chain.addStep('business-validation', 'business-rules-validator', {
inputMapper: (prevOutput) => ({
records: prevOutput.enrichedRecords,
rules: inputs.validations,
}),
outputMapper: (output) => ({
validRecords: output.passed,
failedRecords: output.failed,
validationReport: output.report,
}),
})
// Step 7: Deduplication
chain.addStep('deduplication', 'deduplicator', {
inputMapper: (prevOutput) => ({
records: prevOutput.validRecords,
matchFields: ['email', 'phone', 'customerId'],
}),
outputMapper: (output) => ({
uniqueRecords: output.unique,
duplicatesFound: output.duplicates,
deduplicationLog: output.log,
}),
})
// Step 8: Data Loading
chain.addStep('load', 'data-loader', {
inputMapper: (prevOutput) => ({
records: prevOutput.uniqueRecords,
destination: inputs.destination,
mode: 'upsert',
}),
outputMapper: (output) => ({
loaded: output.success,
loadedCount: output.count,
loadErrors: output.errors,
}),
})
// Step 9: Quality Metrics
chain.addStep('quality-metrics', 'quality-calculator', {
inputMapper: (prevOutput, context) => {
const extractStep = context.stepResults[0]
const validationStep = context.stepResults[1]
const cleaningStep = context.stepResults[2]
const businessValidationStep = context.stepResults[5]
const deduplicationStep = context.stepResults[6]
const loadStep = context.stepResults[7]
return {
totalRecords: extractStep.output.recordCount,
validRecords: validationStep.output.validRecords.length,
invalidRecords: validationStep.output.invalidRecords.length,
cleanedRecords: cleaningStep.output.cleanedRecords.length,
removedRecords: cleaningStep.output.removedRecords.length,
failedBusinessRules: businessValidationStep.output.failedRecords.length,
duplicates: deduplicationStep.output.duplicatesFound.length,
loadedRecords: loadStep.output.loadedCount,
}
},
outputMapper: (output) => ({
qualityScore: output.score,
completeness: output.completeness,
accuracy: output.accuracy,
consistency: output.consistency,
}),
})
// Step 10: Reporting
chain.addStep('reporting', 'pipeline-reporter', {
inputMapper: (prevOutput, context) => ({
pipelineResults: context.stepResults,
qualityMetrics: prevOutput,
}),
outputMapper: (output) => ({
reportUrl: output.url,
summary: output.summary,
}),
})
// Execute chain
const result = await chain.execute(inputs)
const stepResults = result.metadata.stepResults
return {
pipeline: {
executionId: result.metadata.executionId,
duration: result.metadata.totalDuration,
status: 'completed',
},
statistics: {
extracted: stepResults[0].output.recordCount,
validated: stepResults[1].output.validRecords.length,
cleaned: stepResults[2].output.cleanedRecords.length,
transformed: stepResults[3].output.transformedRecords.length,
enriched: stepResults[4].output.enrichedRecords.length,
businessValidated: stepResults[5].output.validRecords.length,
deduplicated: stepResults[6].output.uniqueRecords.length,
loaded: stepResults[7].output.loadedCount,
},
quality: stepResults[8].output,
report: stepResults[9].output.reportUrl,
errors: {
validation: stepResults[1].output.validationErrors,
businessRules: stepResults[5].output.validationReport,
loading: stepResults[7].output.loadErrors,
},
}
},
})Advanced Patterns
Pattern 1: Conditional Branching
Execute different service chains based on conditions.
import $, { db } from 'sdk.do'
export class ConditionalServiceChain extends ServiceChain {
addConditionalStep(
name: string,
condition: (prevOutput: any, context: any) => boolean,
trueServiceId: string,
falseServiceId: string
): ConditionalServiceChain {
this.addStep(name, '', {
inputMapper: async (prevOutput, context) => {
const shouldUseTrueService = condition(prevOutput, context)
const serviceId = shouldUseTrueService ? trueServiceId : falseServiceId
const execution = await $.ServiceExecution.start({
serviceId,
inputs: prevOutput,
})
return await execution.waitForCompletion()
},
})
return this
}
}
// Usage
const chain = new ConditionalServiceChain()
chain
.addStep('analyze', 'content-analyzer')
.addConditionalStep(
'quality-check',
(output) => output.qualityScore >= 80,
'simple-proofreader', // High quality: simple check
'deep-editor' // Low quality: deep editing
)
.addStep('publish', 'content-publisher')
const result = await chain.execute({ content: 'Article text...' })Pattern 2: Parallel Sub-Chains
Execute multiple chains in parallel within a step.
import $, { db } from 'sdk.do'
export class ParallelSubChainStep {
static create(
name: string,
subChains: Array<{
name: string
chain: ServiceChain
inputMapper: (parentOutput: any) => any
}>,
aggregator: (results: any[]) => any
) {
return {
name,
execute: async (prevOutput: any) => {
const results = await Promise.all(
subChains.map(async (subChain) => {
const subChainInput = subChain.inputMapper(prevOutput)
return await subChain.chain.execute(subChainInput)
})
)
return aggregator(results.map((r) => r.result))
},
}
}
}
// Usage
const contentChain = new ServiceChain().addStep('research', 'topic-researcher').addStep('outline', 'outline-generator')
const imageChain = new ServiceChain().addStep('concept', 'image-concept-generator').addStep('generate', 'image-generator')
const seoChain = new ServiceChain().addStep('keywords', 'keyword-researcher').addStep('optimize', 'seo-optimizer')
const parallelStep = ParallelSubChainStep.create(
'parallel-processing',
[
{
name: 'content',
chain: contentChain,
inputMapper: (input) => ({ topic: input.topic }),
},
{
name: 'images',
chain: imageChain,
inputMapper: (input) => ({ topic: input.topic }),
},
{
name: 'seo',
chain: seoChain,
inputMapper: (input) => ({ topic: input.topic }),
},
],
(results) => ({
content: results[0],
images: results[1],
seo: results[2],
})
)Pattern 3: Checkpoint and Recovery
Save checkpoints and recover from failures.
import $, { db } from 'sdk.do'
export class CheckpointServiceChain extends ServiceChain {
private checkpointStore = new Map<string, any>()
async executeWithCheckpoints(initialInput: any, checkpointId?: string): Promise<any> {
// Load from checkpoint if exists
if (checkpointId && this.checkpointStore.has(checkpointId)) {
const checkpoint = this.checkpointStore.get(checkpointId)
console.log(`Resuming from checkpoint: step ${checkpoint.stepIndex}`)
return this.resumeFromCheckpoint(checkpoint)
}
// Normal execution with checkpoints
const executionId = generateId()
let currentData = initialInput
const context = {
startTime: Date.now(),
executionId,
stepResults: [] as any[],
}
for (let i = 0; i < this.steps.length; i++) {
const step = this.steps[i]
try {
// Execute step
const stepInput = step.inputMapper ? step.inputMapper(currentData, context) : currentData
const execution = await $.ServiceExecution.start({
serviceId: step.serviceId,
inputs: stepInput,
})
const result = await execution.waitForCompletion()
const stepOutput = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs
context.stepResults.push({
stepName: step.name,
stepIndex: i,
output: stepOutput,
})
currentData = stepOutput
// Save checkpoint after each step
await this.saveCheckpoint(executionId, {
stepIndex: i,
currentData,
context,
})
} catch (error) {
// Save error checkpoint
await this.saveCheckpoint(executionId, {
stepIndex: i,
currentData,
context,
error: (error as Error).message,
})
if (step.retryable) {
console.log(`Step ${step.name} failed but is retryable. Checkpoint saved.`)
}
throw error
}
}
return {
result: currentData,
metadata: {
executionId: context.executionId,
totalDuration: Date.now() - context.startTime,
stepsExecuted: this.steps.length,
stepResults: context.stepResults,
},
}
}
private async saveCheckpoint(executionId: string, checkpoint: any): Promise<void> {
this.checkpointStore.set(executionId, checkpoint)
// Persist to database
await db.create($.Checkpoint, {
executionId,
data: checkpoint,
createdAt: new Date(),
})
}
private async resumeFromCheckpoint(checkpoint: any): Promise<any> {
// Resume from checkpoint step
const startIndex = checkpoint.stepIndex + 1
let currentData = checkpoint.currentData
const context = checkpoint.context
for (let i = startIndex; i < this.steps.length; i++) {
const step = this.steps[i]
const stepInput = step.inputMapper ? step.inputMapper(currentData, context) : currentData
const execution = await $.ServiceExecution.start({
serviceId: step.serviceId,
inputs: stepInput,
})
const result = await execution.waitForCompletion()
const stepOutput = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs
context.stepResults.push({
stepName: step.name,
stepIndex: i,
output: stepOutput,
})
currentData = stepOutput
}
return {
result: currentData,
metadata: {
executionId: context.executionId,
totalDuration: Date.now() - context.startTime,
stepsExecuted: this.steps.length,
stepResults: context.stepResults,
resumedFromCheckpoint: true,
},
}
}
}Pattern 4: Dynamic Chain Construction
Build chains dynamically based on input or runtime conditions.
import $, { db, ai } from 'sdk.do'
export class DynamicChainBuilder {
static async buildChain(goal: string, constraints: any): Promise<ServiceChain> {
// Use AI to determine optimal service chain
const chainPlan = await ai.generate({
model: 'gpt-5',
prompt: `Given the following goal and constraints, design an optimal service chain.
Goal: ${goal}
Constraints:
${JSON.stringify(constraints, null, 2)}
Available Services:
- data-extractor: Extract data from sources
- data-transformer: Transform data structure
- data-validator: Validate data quality
- data-enricher: Enrich data from external sources
- sentiment-analyzer: Analyze sentiment
- content-generator: Generate content
- summarizer: Summarize text
- translator: Translate languages
- classifier: Classify data
- predictor: Make predictions
Return a JSON array of service steps in order, where each step has:
{
"serviceId": "service-id",
"purpose": "what this step accomplishes",
"inputMapper": "description of input transformation needed"
}`,
maxTokens: 1000,
})
// Parse AI response
const steps = JSON.parse(chainPlan.text)
// Build chain
const chain = new ServiceChain()
for (const step of steps) {
chain.addStep(step.purpose, step.serviceId, {
inputMapper: createInputMapper(step.inputMapper),
})
}
return chain
}
}
function createInputMapper(description: string): (prevOutput: any) => any {
// Create input mapper based on description
// This could use AI or template-based mapping
return (prevOutput: any) => {
// Implementation based on description
return prevOutput
}
}
// Usage
const chain = await DynamicChainBuilder.buildChain('Extract customer data, validate it, enrich with firmographics, and generate insights', {
maxDuration: 60000,
budget: 5.0,
quality: 'high',
})
const result = await chain.execute({ source: 'customers.csv' })Error Handling and Retry Strategies
Retry with Exponential Backoff
import $, { db } from 'sdk.do'
export class RetryableServiceChain extends ServiceChain {
async executeStepWithRetry(step: any, input: any, maxRetries: number = 3): Promise<any> {
let lastError: Error | null = null
let attempt = 0
while (attempt < maxRetries) {
try {
const execution = await $.ServiceExecution.start({
serviceId: step.serviceId,
inputs: input,
})
return await execution.waitForCompletion()
} catch (error) {
lastError = error as Error
attempt++
if (attempt < maxRetries) {
const delay = Math.pow(2, attempt) * 1000
console.log(`Retry ${attempt}/${maxRetries} after ${delay}ms`)
await new Promise((resolve) => setTimeout(resolve, delay))
}
}
}
throw new Error(`Step ${step.name} failed after ${maxRetries} attempts: ${lastError?.message}`)
}
}Fallback Services
import $, { db } from 'sdk.do'
export class FallbackServiceChain extends ServiceChain {
addStepWithFallback(name: string, primaryServiceId: string, fallbackServiceId: string): FallbackServiceChain {
this.addStep(name, primaryServiceId, {
inputMapper: async (prevOutput) => {
try {
const execution = await $.ServiceExecution.start({
serviceId: primaryServiceId,
inputs: prevOutput,
})
return await execution.waitForCompletion()
} catch (error) {
console.log(`Primary service ${primaryServiceId} failed, using fallback`)
const execution = await $.ServiceExecution.start({
serviceId: fallbackServiceId,
inputs: prevOutput,
})
return await execution.waitForCompletion()
}
},
})
return this
}
}Performance Optimization
Lazy Evaluation
import $, { db } from 'sdk.do'
export class LazyServiceChain extends ServiceChain {
addLazyStep(name: string, serviceId: string, condition: (prevOutput: any) => boolean): LazyServiceChain {
this.addStep(name, serviceId, {
inputMapper: async (prevOutput) => {
// Only execute if condition is met
if (!condition(prevOutput)) {
console.log(`Skipping step ${name} - condition not met`)
return prevOutput
}
const execution = await $.ServiceExecution.start({
serviceId,
inputs: prevOutput,
})
return await execution.waitForCompletion()
},
})
return this
}
}
// Usage
const chain = new LazyServiceChain()
chain
.addStep('analyze', 'content-analyzer')
.addLazyStep(
'deep-check',
'expensive-quality-checker',
(output) => output.initialQuality < 70 // Only run if quality is low
)
.addStep('publish', 'publisher')Caching Intermediate Results
import $, { db } from 'sdk.do'
export class CachedServiceChain extends ServiceChain {
private cache = new Map<string, any>()
addCachedStep(
name: string,
serviceId: string,
cacheDuration: number = 3600000 // 1 hour
): CachedServiceChain {
this.addStep(name, serviceId, {
inputMapper: async (prevOutput) => {
const cacheKey = `${serviceId}:${JSON.stringify(prevOutput)}`
// Check cache
if (this.cache.has(cacheKey)) {
const cached = this.cache.get(cacheKey)
if (Date.now() - cached.timestamp < cacheDuration) {
console.log(`Using cached result for ${name}`)
return cached.data
}
}
// Execute service
const execution = await $.ServiceExecution.start({
serviceId,
inputs: prevOutput,
})
const result = await execution.waitForCompletion()
// Cache result
this.cache.set(cacheKey, {
data: result,
timestamp: Date.now(),
})
return result
},
})
return this
}
}Testing Service Chains
import { describe, it, expect, vi } from 'vitest'
import $, { db } from 'sdk.do'
describe('Service Chaining', () => {
it('should execute steps in sequence', async () => {
const chain = new ServiceChain()
const executionOrder: string[] = []
chain
.addStep('step1', 'service-1', {
outputMapper: (output) => {
executionOrder.push('step1')
return { ...output, step: 1 }
},
})
.addStep('step2', 'service-2', {
outputMapper: (output) => {
executionOrder.push('step2')
return { ...output, step: 2 }
},
})
.addStep('step3', 'service-3', {
outputMapper: (output) => {
executionOrder.push('step3')
return { ...output, step: 3 }
},
})
await chain.execute({ initial: 'data' })
expect(executionOrder).toEqual(['step1', 'step2', 'step3'])
})
it('should pass output from one step to next', async () => {
const chain = new ServiceChain()
chain
.addStep('double', 'doubler', {
inputMapper: (input) => input,
outputMapper: (output) => ({ value: output.value * 2 }),
})
.addStep('add10', 'adder', {
inputMapper: (input) => input,
outputMapper: (output) => ({ value: output.value + 10 }),
})
const result = await chain.execute({ value: 5 })
expect(result.result.value).toBe(20) // (5 * 2) + 10
})
it('should handle errors in chain', async () => {
const chain = new ServiceChain()
chain.addStep('step1', 'service-1').addStep('failing-step', 'failing-service').addStep('step3', 'service-3')
await expect(chain.execute({ data: 'test' })).rejects.toThrow('Chain failed at step 2')
})
})Best Practices
- Clear Step Names: Use descriptive names for each step
- Input/Output Contracts: Define clear data contracts between steps
- Error Handling: Implement retry logic and fallbacks
- Checkpointing: Save progress for long-running chains
- Monitoring: Track execution time and success rates for each step
- Idempotency: Make steps idempotent for safe retries
- Testing: Test individual steps and full chains
- Documentation: Document the purpose and requirements of each step
- Optimization: Cache results and skip unnecessary steps
- Observability: Emit events for monitoring and debugging
Related Patterns
- Multi-Service Coordination - Parallel service execution
- Event-Driven - Reactive service orchestration
- Saga Pattern - Distributed transaction management
Additional Resources
- Service Types - Understanding different service types
- Composition - Building composite services
- Best Practices - Service development guidelines
- API Reference - Complete API documentation
Multi-Service Coordination
Comprehensive guide to coordinating multiple services in parallel for complex workflows, including service dependencies, error handling, and result aggregation patterns
Event-Driven Service Orchestration
Complete guide to building event-driven service architectures with event routing, fan-out/fan-in patterns, event sourcing, and reactive service orchestration