Service Chaining Patterns

A complete guide to sequential service execution patterns, output-to-input mapping, error recovery, conditional branching, and chain optimization strategies.

Build sophisticated multi-step workflows by chaining services together, where each service's output flows into the next service's input, creating powerful data transformation and processing pipelines.

Overview

Service chaining enables you to compose complex workflows from simple, reusable services. Each service in the chain performs a specific transformation or operation, passing its results to the next service in sequence. This pattern is fundamental for building data pipelines, content workflows, and multi-stage business processes.
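
Stripped of any SDK, the idea can be sketched as an ordered list of async functions folded over an initial value. The `Step` type and `runChain` helper below are illustrative names chosen for this sketch and are not part of `sdk.do`.

```typescript
// Hypothetical types for illustration; not part of sdk.do.
type Step = (input: unknown) => Promise<unknown>

// Run the steps in order, feeding each result into the next step.
async function runChain(steps: Step[], initial: unknown): Promise<unknown> {
  let current = initial
  for (const step of steps) {
    current = await step(current)
  }
  return current
}

// Three toy "services": trim, uppercase, wrap in an object.
const trim: Step = async (s) => String(s).trim()
const upper: Step = async (s) => String(s).toUpperCase()
const wrap: Step = async (s) => ({ text: s })

const chained = runChain([trim, upper, wrap], '  hello  ')
// `chained` resolves to { text: 'HELLO' }
```

Each function only needs to agree with its neighbors on the shape of the data it receives and returns, which is what makes the individual services reusable.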

When to Use Service Chaining

Use service chaining when:

  • Sequential Dependencies: Each step requires the output of the previous step
  • Data Transformation: Data needs to pass through multiple transformation stages
  • Workflow Automation: Business processes have clear sequential steps
  • Pipeline Processing: Building ETL (Extract, Transform, Load) pipelines
  • Quality Gates: Each stage validates and enriches data before proceeding
  • Staged Execution: Complex operations broken into manageable stages

Avoid this pattern when:

  • Services can execute independently in parallel
  • No data dependencies exist between services
  • A real-time, event-driven architecture is more appropriate
  • The latency overhead of sequential execution is prohibitive

Architecture

graph LR
  Input[Input Data] --> S1[Service 1<br/>Transform]
  S1 --> |Output 1| S2[Service 2<br/>Validate]
  S2 --> |Output 2| S3[Service 3<br/>Enrich]
  S3 --> |Output 3| S4[Service 4<br/>Store]
  S4 --> |Output 4| Result[Final Result]
  S1 -.->|Checkpoint 1| CP[Checkpoint Store]
  S2 -.->|Checkpoint 2| CP
  S3 -.->|Checkpoint 3| CP

Core Implementation

Basic Service Chain

import $, { db, on, send } from 'sdk.do'

export class ServiceChain {
  // protected (not private) so that subclasses such as CheckpointServiceChain can read the steps
  protected steps: Array<{
    name: string
    serviceId: string
    inputMapper?: (prevOutput: any, context: any) => any
    outputMapper?: (output: any) => any
    retryable?: boolean
  }> = []

  addStep(
    name: string,
    serviceId: string,
    options?: {
      inputMapper?: (prevOutput: any, context: any) => any
      outputMapper?: (output: any) => any
      retryable?: boolean
    }
  ): ServiceChain {
    this.steps.push({
      name,
      serviceId,
      ...options,
    })
    return this
  }

  async execute(initialInput: any): Promise<any> {
    let currentData = initialInput
    const context = {
      startTime: Date.now(),
      executionId: generateId(),
      stepResults: [] as any[],
    }

    for (let i = 0; i < this.steps.length; i++) {
      const step = this.steps[i]
      const stepStartTime = Date.now()

      try {
        // Map input for this step (awaited so that async mappers also work)
        const stepInput = step.inputMapper ? await step.inputMapper(currentData, context) : currentData

        // Execute service
        const execution = await $.ServiceExecution.start({
          serviceId: step.serviceId,
          inputs: stepInput,
          metadata: {
            chainExecutionId: context.executionId,
            stepIndex: i,
            stepName: step.name,
          },
        })

        const result = await execution.waitForCompletion()

        // Map output
        const stepOutput = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs

        // Save step result
        context.stepResults.push({
          stepName: step.name,
          stepIndex: i,
          input: stepInput,
          output: stepOutput,
          duration: Date.now() - stepStartTime,
          timestamp: new Date().toISOString(),
        })

        // Update current data for next step
        currentData = stepOutput

        // Emit progress event
        await send($.Event.publish, {
          type: 'chain.step.completed',
          data: {
            executionId: context.executionId,
            stepName: step.name,
            stepIndex: i,
            totalSteps: this.steps.length,
            progress: (i + 1) / this.steps.length,
          },
        })
      } catch (error) {
        throw new Error(`Chain failed at step ${i + 1} (${step.name}): ${(error as Error).message}`)
      }
    }

    return {
      result: currentData,
      metadata: {
        executionId: context.executionId,
        totalDuration: Date.now() - context.startTime,
        stepsExecuted: this.steps.length,
        stepResults: context.stepResults,
      },
    }
  }
}

function generateId(): string {
  // slice(2, 11) keeps the same nine characters as the deprecated substr(2, 9)
  return `chain_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
}
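
Each step stores a `retryable` flag, but `execute` above never acts on it. One way to honor it, sketched here under assumptions, is to wrap the service call in a retry helper with exponential backoff. The `withRetry` name, its signature, and its defaults are hypothetical and not part of `sdk.do`.

```typescript
// Hypothetical helper; the name, signature, and defaults are assumptions.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      lastError = error
      if (attempt < maxAttempts) {
        // Exponential backoff: baseDelayMs, then 2x, then 4x, ...
        const delayMs = baseDelayMs * 2 ** (attempt - 1)
        await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError
}

// Toy operation that fails twice, then succeeds.
let callCount = 0
const flaky = async (): Promise<string> => {
  callCount++
  if (callCount < 3) throw new Error(`transient failure ${callCount}`)
  return 'ok'
}

const retried = withRetry(flaky, 3, 1)
// `retried` resolves to 'ok' on the third call
```

Inside `execute`, a step marked `retryable` could pass its start-and-wait logic to such a helper, while non-retryable steps would keep failing fast.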

Example 1: Content Production Pipeline

Complete pipeline from topic research to published article.

import $, { db, on, send, ai } from 'sdk.do'

export const contentProductionPipeline = await $.Service.create({
  name: 'Content Production Pipeline',
  description: 'End-to-end content creation from topic to published article',
  type: $.ServiceType.ContentGeneration,

  pricing: {
    model: 'per-execution',
    basePrice: 8.0,
  },

  workflow: async (inputs: { topic: string; targetAudience: string; wordCount: number; tone: string; keywords: string[]; publishTo?: string[] }) => {
    const chain = new ServiceChain()

    // Step 1: Topic Research
    chain.addStep('research', 'topic-researcher', {
      inputMapper: (_, __) => ({
        topic: inputs.topic,
        depth: 'comprehensive',
        sources: ['academic', 'industry', 'news'],
      }),
      outputMapper: (output) => ({
        ...output,
        research: output.findings,
      }),
    })

    // Step 2: Outline Generation
    chain.addStep('outline', 'outline-generator', {
      inputMapper: (prevOutput) => ({
        topic: inputs.topic,
        research: prevOutput.research,
        targetLength: inputs.wordCount,
        audience: inputs.targetAudience,
      }),
      outputMapper: (output) => ({
        outline: output.sections,
        estimatedWordCount: output.wordCount,
      }),
    })

    // Step 3: Content Writing
    chain.addStep('writing', 'content-writer', {
      inputMapper: (prevOutput) => ({
        outline: prevOutput.outline,
        tone: inputs.tone,
        targetAudience: inputs.targetAudience,
        keywords: inputs.keywords,
      }),
      outputMapper: (output) => ({
        content: output.text,
        title: output.title,
        excerpt: output.excerpt,
        wordCount: output.wordCount,
      }),
    })

    // Step 4: Grammar and Style Check
    chain.addStep('editing', 'grammar-checker', {
      inputMapper: (prevOutput) => ({
        text: prevOutput.content,
        style: inputs.tone,
        checkLevel: 'thorough',
      }),
      outputMapper: (output) => ({
        content: output.correctedText,
        title: output.title || 'Untitled',
        corrections: output.corrections,
        readabilityScore: output.readabilityScore,
      }),
    })

    // Step 5: SEO Optimization
    chain.addStep('seo', 'seo-optimizer', {
      inputMapper: (prevOutput) => ({
        title: prevOutput.title,
        content: prevOutput.content,
        keywords: inputs.keywords,
        targetAudience: inputs.targetAudience,
      }),
      outputMapper: (output) => ({
        title: output.optimizedTitle,
        content: output.optimizedContent,
        metaDescription: output.metaDescription,
        seoScore: output.score,
        suggestions: output.suggestions,
      }),
    })

    // Step 6: Image Generation
    chain.addStep('images', 'image-generator', {
      inputMapper: (prevOutput) => ({
        content: prevOutput.content,
        title: prevOutput.title,
        count: 3,
        style: 'professional',
        aspectRatio: '16:9',
      }),
      outputMapper: (output) => ({
        images: output.urls,
        altTexts: output.altTexts,
      }),
    })

    // Step 7: Plagiarism Check
    chain.addStep('plagiarism', 'plagiarism-checker', {
      inputMapper: (prevOutput, context) => ({
        text: context.stepResults[4].output.content, // Use SEO-optimized content
        checkAgainst: ['web', 'academic', 'publications'],
      }),
      outputMapper: (output) => ({
        isPlagiarismFree: output.similarity < 10,
        similarityScore: output.similarity,
        sources: output.matchedSources,
      }),
    })

    // Step 8: Quality Assurance
    chain.addStep('qa', 'content-qa-checker', {
      inputMapper: (prevOutput, context) => {
        const contentStep = context.stepResults[4] // SEO step
        const imagesStep = context.stepResults[5] // Images step
        const plagiarismStep = context.stepResults[6] // Plagiarism step

        return {
          title: contentStep.output.title,
          content: contentStep.output.content,
          wordCount: context.stepResults[2].output.wordCount,
          targetWordCount: inputs.wordCount,
          images: imagesStep.output.images,
          seoScore: contentStep.output.seoScore,
          readabilityScore: context.stepResults[3].output.readabilityScore,
          isPlagiarismFree: plagiarismStep.output.isPlagiarismFree,
        }
      },
      outputMapper: (output) => ({
        passedQA: output.passed,
        qualityScore: output.score,
        issues: output.issues,
      }),
    })

    // Step 9: Formatting
    chain.addStep('formatting', 'content-formatter', {
      inputMapper: (prevOutput, context) => {
        const contentStep = context.stepResults[4] // SEO step
        const imagesStep = context.stepResults[5] // Images step

        return {
          title: contentStep.output.title,
          content: contentStep.output.content,
          images: imagesStep.output.images,
          altTexts: imagesStep.output.altTexts,
          format: 'html',
        }
      },
      outputMapper: (output) => ({
        formattedContent: output.html,
        plainText: output.plainText,
      }),
    })

    // Step 10: Publishing (if configured)
    if (inputs.publishTo && inputs.publishTo.length > 0) {
      chain.addStep('publishing', 'content-publisher', {
        inputMapper: (prevOutput, context) => {
          const contentStep = context.stepResults[4] // SEO step
          const imagesStep = context.stepResults[5] // Images step
          const formattedStep = context.stepResults[8] // Formatting step

          return {
            title: contentStep.output.title,
            content: formattedStep.output.formattedContent,
            excerpt: contentStep.output.metaDescription,
            images: imagesStep.output.images,
            keywords: inputs.keywords,
            platforms: inputs.publishTo,
          }
        },
        outputMapper: (output) => ({
          published: true,
          publishedUrls: output.urls,
          publishedAt: output.timestamp,
        }),
      })
    }

    // Execute chain
    const result = await chain.execute(inputs)

    return {
      article: {
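        // The last step's output holds only formatting or publishing fields,
        // so read the title and formatted body from the steps that produced them
        title: result.metadata.stepResults[4].output.title,
        content: result.metadata.stepResults[8].output.formattedContent,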
        excerpt: result.metadata.stepResults[4].output.metaDescription,
        images: result.metadata.stepResults[5].output.images,
        wordCount: result.metadata.stepResults[2].output.wordCount,
      },
      quality: {
        seoScore: result.metadata.stepResults[4].output.seoScore,
        readabilityScore: result.metadata.stepResults[3].output.readabilityScore,
        qualityScore: result.metadata.stepResults[7].output.qualityScore,
        isPlagiarismFree: result.metadata.stepResults[6].output.isPlagiarismFree,
      },
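      // Mirror the condition used when adding the publishing step (an empty array adds no step)
      publishing: inputs.publishTo && inputs.publishTo.length > 0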
        ? {
            published: result.result.published,
            urls: result.result.publishedUrls,
          }
        : null,
      pipeline: {
        executionId: result.metadata.executionId,
        totalDuration: result.metadata.totalDuration,
        stepsExecuted: result.metadata.stepsExecuted,
        stepBreakdown: result.metadata.stepResults.map((step: any) => ({
          name: step.stepName,
          duration: step.duration,
          timestamp: step.timestamp,
        })),
      },
    }
  },
})
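
The pipeline above reads earlier outputs by position (`context.stepResults[4]`), which silently breaks if a step is inserted or reordered. A possible alternative, sketched here with a hypothetical `outputOf` helper, is to look results up by step name. The `StepResult` shape mirrors a subset of the fields that `execute` records.

```typescript
// Mirrors a subset of the entries pushed to `context.stepResults`.
interface StepResult {
  stepName: string
  stepIndex: number
  output: Record<string, unknown>
}

// Hypothetical helper: look up a step's output by name instead of by index.
function outputOf(stepResults: StepResult[], stepName: string): Record<string, unknown> {
  const found = stepResults.find((r) => r.stepName === stepName)
  if (!found) {
    throw new Error(`No result recorded for step "${stepName}"`)
  }
  return found.output
}

// Made-up sample data for illustration.
const sampleResults: StepResult[] = [
  { stepName: 'research', stepIndex: 0, output: { research: ['finding'] } },
  { stepName: 'seo', stepIndex: 1, output: { title: 'Optimized title', seoScore: 92 } },
]

const seoTitle = outputOf(sampleResults, 'seo').title
// seoTitle === 'Optimized title'
```

With such a helper, `context.stepResults[4].output.content` could become `outputOf(context.stepResults, 'seo').content`, and a missing step would fail with a clear message.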

Example 2: Lead Qualification Chain

Multi-stage lead qualification and nurturing pipeline.

import $, { db, on, send, ai } from 'sdk.do'

export const leadQualificationChain = await $.Service.create({
  name: 'Lead Qualification Chain',
  description: 'Multi-stage lead qualification, scoring, and routing pipeline',
  type: $.ServiceType.LeadProcessing,

  pricing: {
    model: 'per-execution',
    basePrice: 1.5,
  },

  workflow: async (inputs: { leadId: string; email: string; name?: string; company?: string; source: string }) => {
    const chain = new ServiceChain()

    // Step 1: Email Validation
    chain.addStep('email-validation', 'email-validator', {
      inputMapper: () => ({ email: inputs.email }),
      outputMapper: (output) => ({
        emailValid: output.isValid,
        emailScore: output.score,
        emailType: output.type,
      }),
    })

    // Step 2: Data Enrichment
    chain.addStep('enrichment', 'lead-enrichment', {
      inputMapper: () => ({
        email: inputs.email,
        name: inputs.name,
        company: inputs.company,
      }),
      outputMapper: (output) => ({
        enrichedData: output.profile,
        confidence: output.confidence,
      }),
    })

    // Step 3: Company Analysis
    chain.addStep('company-analysis', 'company-analyzer', {
      inputMapper: (prevOutput) => ({
        company: prevOutput.enrichedData.company || inputs.company,
        domain: prevOutput.enrichedData.companyDomain,
      }),
      outputMapper: (output) => ({
        companyData: output.company,
        companyScore: output.fitScore,
      }),
    })

    // Step 4: Lead Scoring
    chain.addStep('scoring', 'lead-scorer', {
      inputMapper: (prevOutput, context) => {
        const emailStep = context.stepResults[0]
        const enrichmentStep = context.stepResults[1]
        const companyStep = context.stepResults[2]

        return {
          email: {
            valid: emailStep.output.emailValid,
            score: emailStep.output.emailScore,
            type: emailStep.output.emailType,
          },
          profile: enrichmentStep.output.enrichedData,
          company: companyStep.output.companyData,
          source: inputs.source,
        }
      },
      outputMapper: (output) => ({
        leadScore: output.score,
        scoreBreakdown: output.breakdown,
        qualification: output.qualification,
      }),
    })

    // Step 5: Intent Analysis
    chain.addStep('intent-analysis', 'intent-analyzer', {
      inputMapper: (prevOutput, context) => {
        const enrichmentStep = context.stepResults[1]
        const companyStep = context.stepResults[2]

        return {
          profile: enrichmentStep.output.enrichedData,
          company: companyStep.output.companyData,
          recentActivity: enrichmentStep.output.enrichedData.recentActivity,
        }
      },
      outputMapper: (output) => ({
        buyingIntent: output.intentLevel,
        signals: output.signals,
        recommendedApproach: output.approach,
      }),
    })

    // Step 6: Segmentation
    chain.addStep('segmentation', 'lead-segmenter', {
      inputMapper: (prevOutput, context) => {
        const scoringStep = context.stepResults[3]
        const intentStep = context.stepResults[4]
        const companyStep = context.stepResults[2]

        return {
          leadScore: scoringStep.output.leadScore,
          qualification: scoringStep.output.qualification,
          buyingIntent: intentStep.output.buyingIntent,
          companySize: companyStep.output.companyData.size,
          industry: companyStep.output.companyData.industry,
        }
      },
      outputMapper: (output) => ({
        segment: output.segment,
        priority: output.priority,
        recommendedCampaigns: output.campaigns,
      }),
    })

    // Step 7: Routing Decision
    chain.addStep('routing', 'lead-router', {
      inputMapper: (prevOutput, context) => {
        const scoringStep = context.stepResults[3]
        const segmentStep = context.stepResults[5]
        const intentStep = context.stepResults[4]

        return {
          leadScore: scoringStep.output.leadScore,
          segment: segmentStep.output.segment,
          priority: segmentStep.output.priority,
          buyingIntent: intentStep.output.buyingIntent,
        }
      },
      outputMapper: (output) => ({
        assignTo: output.assignedTo,
        routingReason: output.reason,
        recommendedActions: output.actions,
      }),
    })

    // Step 8: CRM Update
    chain.addStep('crm-update', 'crm-updater', {
      inputMapper: (prevOutput, context) => {
        const allSteps = context.stepResults

        return {
          leadId: inputs.leadId,
          updates: {
            emailValidation: allSteps[0].output,
            enrichedData: allSteps[1].output.enrichedData,
            companyData: allSteps[2].output.companyData,
            leadScore: allSteps[3].output.leadScore,
            scoreBreakdown: allSteps[3].output.scoreBreakdown,
            qualification: allSteps[3].output.qualification,
            buyingIntent: allSteps[4].output.buyingIntent,
            signals: allSteps[4].output.signals,
            segment: allSteps[5].output.segment,
            priority: allSteps[5].output.priority,
            assignedTo: allSteps[6].output.assignTo,
            lastProcessed: new Date().toISOString(),
          },
        }
      },
      outputMapper: (output) => ({
        updated: output.success,
        crmRecordId: output.recordId,
      }),
    })

    // Step 9: Notification
    chain.addStep('notification', 'notification-sender', {
      inputMapper: (prevOutput, context) => {
        const routingStep = context.stepResults[6]
        const scoringStep = context.stepResults[3]
        const enrichmentStep = context.stepResults[1]

        return {
          recipient: routingStep.output.assignTo,
          type: 'new-lead-assigned',
          data: {
            leadId: inputs.leadId,
            leadName: enrichmentStep.output.enrichedData.name || inputs.name,
            leadScore: scoringStep.output.leadScore,
            qualification: scoringStep.output.qualification,
            priority: context.stepResults[5].output.priority,
            recommendedActions: routingStep.output.recommendedActions,
          },
        }
      },
      outputMapper: (output) => ({
        notificationSent: output.sent,
        notificationId: output.id,
      }),
    })

    // Execute chain
    const result = await chain.execute(inputs)

    // Extract key results
    const stepResults = result.metadata.stepResults

    return {
      leadId: inputs.leadId,
      qualification: {
        score: stepResults[3].output.leadScore,
        qualification: stepResults[3].output.qualification,
        breakdown: stepResults[3].output.scoreBreakdown,
      },
      enrichedProfile: stepResults[1].output.enrichedData,
      company: stepResults[2].output.companyData,
      intent: {
        level: stepResults[4].output.buyingIntent,
        signals: stepResults[4].output.signals,
        approach: stepResults[4].output.recommendedApproach,
      },
      routing: {
        segment: stepResults[5].output.segment,
        priority: stepResults[5].output.priority,
        assignedTo: stepResults[6].output.assignTo,
        reason: stepResults[6].output.routingReason,
        actions: stepResults[6].output.recommendedActions,
      },
      processing: {
        executionId: result.metadata.executionId,
        duration: result.metadata.totalDuration,
        stepsCompleted: result.metadata.stepsExecuted,
        processedAt: new Date().toISOString(),
      },
    }
  },
})
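
The chain above records the email validation result but still runs every later step when the address is invalid. A quality gate can stop the chain early. The sketch below uses a hypothetical `emailGate` function, and the 0 to 1 score scale and the `0.5` threshold are assumptions, since the validator's actual scale is not specified here.

```typescript
// Shape matches the email-validation outputMapper above (subset).
interface EmailCheck {
  emailValid: boolean
  emailScore: number // assumed to be on a 0..1 scale
}

// Hypothetical gate: throws so that the chain stops before later, costlier steps run.
function emailGate(check: EmailCheck, minScore = 0.5): EmailCheck {
  if (!check.emailValid) {
    throw new Error('Gate "email-validation" failed: address is not valid')
  }
  if (check.emailScore < minScore) {
    throw new Error(`Gate "email-validation" failed: score ${check.emailScore} is below ${minScore}`)
  }
  return check
}

// A passing check is returned unchanged.
const passed = emailGate({ emailValid: true, emailScore: 0.9 })

// A failing check raises an error that names the gate.
let gateMessage = ''
try {
  emailGate({ emailValid: false, emailScore: 0.9 })
} catch (error) {
  gateMessage = (error as Error).message
}
```

Because `execute` calls each `outputMapper` inside its `try` block, calling a gate like this from the first step's `outputMapper` would surface as a "Chain failed at step 1" error.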

Example 3: Data Ingestion Pipeline

ETL pipeline for data extraction, transformation, and loading.

import $, { db, on, send } from 'sdk.do'

export const dataIngestionPipeline = await $.Service.create({
  name: 'Data Ingestion Pipeline',
  description: 'ETL pipeline for extracting, transforming, and loading data',
  type: $.ServiceType.DataProcessing,

  pricing: {
    model: 'per-record',
    pricePerUnit: 0.005,
  },

  workflow: async (inputs: {
    source: string
    sourceType: 'api' | 'file' | 'database'
    destination: string
    transformations: string[]
    validations: string[]
  }) => {
    const chain = new ServiceChain()

    // Step 1: Data Extraction
    chain.addStep('extract', 'data-extractor', {
      inputMapper: () => ({
        source: inputs.source,
        sourceType: inputs.sourceType,
        batchSize: 1000,
      }),
      outputMapper: (output) => ({
        records: output.data,
        metadata: output.metadata,
        recordCount: output.data.length,
      }),
    })

    // Step 2: Schema Validation
    chain.addStep('schema-validation', 'schema-validator', {
      inputMapper: (prevOutput) => ({
        records: prevOutput.records,
        expectedSchema: inputs.sourceType,
      }),
      outputMapper: (output) => ({
        validRecords: output.valid,
        invalidRecords: output.invalid,
        validationErrors: output.errors,
      }),
    })

    // Step 3: Data Cleaning
    chain.addStep('cleaning', 'data-cleaner', {
      inputMapper: (prevOutput) => ({
        records: prevOutput.validRecords,
      }),
      outputMapper: (output) => ({
        cleanedRecords: output.cleaned,
        removedRecords: output.removed,
        cleaningLog: output.log,
      }),
    })

    // Step 4: Data Transformation
    chain.addStep('transformation', 'data-transformer', {
      inputMapper: (prevOutput) => ({
        records: prevOutput.cleanedRecords,
        transformations: inputs.transformations,
      }),
      outputMapper: (output) => ({
        transformedRecords: output.transformed,
        transformationLog: output.log,
      }),
    })

    // Step 5: Data Enrichment
    chain.addStep('enrichment', 'data-enricher', {
      inputMapper: (prevOutput) => ({
        records: prevOutput.transformedRecords,
        enrichmentSources: ['geo', 'demographics', 'firmographics'],
      }),
      outputMapper: (output) => ({
        enrichedRecords: output.enriched,
        enrichmentRate: output.enrichmentRate,
      }),
    })

    // Step 6: Business Rules Validation
    chain.addStep('business-validation', 'business-rules-validator', {
      inputMapper: (prevOutput) => ({
        records: prevOutput.enrichedRecords,
        rules: inputs.validations,
      }),
      outputMapper: (output) => ({
        validRecords: output.passed,
        failedRecords: output.failed,
        validationReport: output.report,
      }),
    })

    // Step 7: Deduplication
    chain.addStep('deduplication', 'deduplicator', {
      inputMapper: (prevOutput) => ({
        records: prevOutput.validRecords,
        matchFields: ['email', 'phone', 'customerId'],
      }),
      outputMapper: (output) => ({
        uniqueRecords: output.unique,
        duplicatesFound: output.duplicates,
        deduplicationLog: output.log,
      }),
    })

    // Step 8: Data Loading
    chain.addStep('load', 'data-loader', {
      inputMapper: (prevOutput) => ({
        records: prevOutput.uniqueRecords,
        destination: inputs.destination,
        mode: 'upsert',
      }),
      outputMapper: (output) => ({
        loaded: output.success,
        loadedCount: output.count,
        loadErrors: output.errors,
      }),
    })

    // Step 9: Quality Metrics
    chain.addStep('quality-metrics', 'quality-calculator', {
      inputMapper: (prevOutput, context) => {
        const extractStep = context.stepResults[0]
        const validationStep = context.stepResults[1]
        const cleaningStep = context.stepResults[2]
        const businessValidationStep = context.stepResults[5]
        const deduplicationStep = context.stepResults[6]
        const loadStep = context.stepResults[7]

        return {
          totalRecords: extractStep.output.recordCount,
          validRecords: validationStep.output.validRecords.length,
          invalidRecords: validationStep.output.invalidRecords.length,
          cleanedRecords: cleaningStep.output.cleanedRecords.length,
          removedRecords: cleaningStep.output.removedRecords.length,
          failedBusinessRules: businessValidationStep.output.failedRecords.length,
          duplicates: deduplicationStep.output.duplicatesFound.length,
          loadedRecords: loadStep.output.loadedCount,
        }
      },
      outputMapper: (output) => ({
        qualityScore: output.score,
        completeness: output.completeness,
        accuracy: output.accuracy,
        consistency: output.consistency,
      }),
    })

    // Step 10: Reporting
    chain.addStep('reporting', 'pipeline-reporter', {
      inputMapper: (prevOutput, context) => ({
        pipelineResults: context.stepResults,
        qualityMetrics: prevOutput,
      }),
      outputMapper: (output) => ({
        reportUrl: output.url,
        summary: output.summary,
      }),
    })

    // Execute chain
    const result = await chain.execute(inputs)

    const stepResults = result.metadata.stepResults

    return {
      pipeline: {
        executionId: result.metadata.executionId,
        duration: result.metadata.totalDuration,
        status: 'completed',
      },
      statistics: {
        extracted: stepResults[0].output.recordCount,
        validated: stepResults[1].output.validRecords.length,
        cleaned: stepResults[2].output.cleanedRecords.length,
        transformed: stepResults[3].output.transformedRecords.length,
        enriched: stepResults[4].output.enrichedRecords.length,
        businessValidated: stepResults[5].output.validRecords.length,
        deduplicated: stepResults[6].output.uniqueRecords.length,
        loaded: stepResults[7].output.loadedCount,
      },
      quality: stepResults[8].output,
      report: stepResults[9].output.reportUrl,
      errors: {
        validation: stepResults[1].output.validationErrors,
        businessRules: stepResults[5].output.validationReport,
        loading: stepResults[7].output.loadErrors,
      },
    }
  },
})
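
The `statistics` object returned above lists one record count per stage. A small helper can turn those counts into per-stage losses, which makes it easier to see where records are dropped. The `dropOff` function and the sample counts below are illustrative assumptions, not output from a real run.

```typescript
interface StageCount {
  stage: string
  count: number
}

// Hypothetical helper: for ordered stage counts, report how many records each
// stage dropped relative to the stage before it.
function dropOff(stages: StageCount[]): Array<{ stage: string; dropped: number }> {
  const losses: Array<{ stage: string; dropped: number }> = []
  let previous: StageCount | undefined = undefined
  for (const current of stages) {
    if (previous) {
      losses.push({ stage: current.stage, dropped: previous.count - current.count })
    }
    previous = current
  }
  return losses
}

// Made-up counts in the shape of the `statistics` object above.
const funnel = dropOff([
  { stage: 'extracted', count: 1000 },
  { stage: 'validated', count: 950 },
  { stage: 'cleaned', count: 940 },
  { stage: 'loaded', count: 900 },
])
// funnel: validated dropped 50, cleaned dropped 10, loaded dropped 40
```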

Advanced Patterns

Pattern 1: Conditional Branching

Execute different service chains based on conditions.

import $, { db } from 'sdk.do'

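type BranchPoint = { name: string; condition: (prevOutput: any, context: any) => boolean; trueServiceId: string; falseServiceId: string }

export class ConditionalServiceChain extends ServiceChain {
  // Plain sub-chains interleaved with branch points. The base class needs a
  // fixed serviceId per step, so each branch is resolved at execution time.
  private segments: Array<ServiceChain | BranchPoint> = []

  // Overridden so that fluent calls keep the ConditionalServiceChain type
  addStep(name: string, serviceId: string, options?: Parameters<ServiceChain['addStep']>[2]): ConditionalServiceChain {
    let last = this.segments[this.segments.length - 1]
    if (!(last instanceof ServiceChain)) {
      last = new ServiceChain()
      this.segments.push(last)
    }
    last.addStep(name, serviceId, options)
    return this
  }

  addConditionalStep(
    name: string,
    condition: (prevOutput: any, context: any) => boolean,
    trueServiceId: string,
    falseServiceId: string
  ): ConditionalServiceChain {
    this.segments.push({ name, condition, trueServiceId, falseServiceId })
    return this
  }

  async execute(initialInput: any): Promise<any> {
    let currentData = initialInput
    const stepResults: any[] = []

    for (const segment of this.segments) {
      // A branch point becomes a one-step chain for whichever service the condition selects
      const chain =
        segment instanceof ServiceChain
          ? segment
          : new ServiceChain().addStep(segment.name, segment.condition(currentData, { stepResults }) ? segment.trueServiceId : segment.falseServiceId)
      const run = await chain.execute(currentData)
      stepResults.push(...run.metadata.stepResults)
      currentData = run.result
    }

    return { result: currentData, metadata: { stepsExecuted: stepResults.length, stepResults } }
  }
}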

// Usage
const chain = new ConditionalServiceChain()

chain
  .addStep('analyze', 'content-analyzer')
  .addConditionalStep(
    'quality-check',
    (output) => output.qualityScore >= 80,
    'simple-proofreader', // High quality: simple check
    'deep-editor' // Low quality: deep editing
  )
  .addStep('publish', 'content-publisher')

const result = await chain.execute({ content: 'Article text...' })
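
The branching idea itself does not depend on the SDK. As a self-contained sketch, a branch can be written as a combinator that takes a condition and two services and returns a single service. The `Service` type, the `branch` function, and the two toy services below are hypothetical stand-ins for illustration.

```typescript
// Hypothetical service signature for illustration; not part of sdk.do.
type Service = (input: Record<string, unknown>) => Promise<Record<string, unknown>>

// Hypothetical combinator: pick one of two services based on the previous output.
function branch(
  condition: (input: Record<string, unknown>) => boolean,
  whenTrue: Service,
  whenFalse: Service
): Service {
  return (input) => (condition(input) ? whenTrue(input) : whenFalse(input))
}

// Toy stand-ins for 'simple-proofreader' and 'deep-editor'.
const simpleProofreader: Service = async (input) => ({ ...input, editedBy: 'simple-proofreader' })
const deepEditor: Service = async (input) => ({ ...input, editedBy: 'deep-editor' })

const qualityCheck = branch((o) => Number(o.qualityScore) >= 80, simpleProofreader, deepEditor)

const highQuality = qualityCheck({ qualityScore: 91 })
const lowQuality = qualityCheck({ qualityScore: 42 })
// highQuality resolves with editedBy 'simple-proofreader',
// lowQuality resolves with editedBy 'deep-editor'
```

Because the combinator returns an ordinary service, a branch can sit anywhere in a chain without the chain runner knowing that a decision was made.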

Pattern 2: Parallel Sub-Chains

Execute multiple chains in parallel within a step.

import $, { db } from 'sdk.do'

export class ParallelSubChainStep {
  static create(
    name: string,
    subChains: Array<{
      name: string
      chain: ServiceChain
      inputMapper: (parentOutput: any) => any
    }>,
    aggregator: (results: any[]) => any
  ) {
    return {
      name,
      execute: async (prevOutput: any) => {
        const results = await Promise.all(
          subChains.map(async (subChain) => {
            const subChainInput = subChain.inputMapper(prevOutput)
            return await subChain.chain.execute(subChainInput)
          })
        )

        return aggregator(results.map((r) => r.result))
      },
    }
  }
}

// Usage
const contentChain = new ServiceChain().addStep('research', 'topic-researcher').addStep('outline', 'outline-generator')

const imageChain = new ServiceChain().addStep('concept', 'image-concept-generator').addStep('generate', 'image-generator')

const seoChain = new ServiceChain().addStep('keywords', 'keyword-researcher').addStep('optimize', 'seo-optimizer')

const parallelStep = ParallelSubChainStep.create(
  'parallel-processing',
  [
    {
      name: 'content',
      chain: contentChain,
      inputMapper: (input) => ({ topic: input.topic }),
    },
    {
      name: 'images',
      chain: imageChain,
      inputMapper: (input) => ({ topic: input.topic }),
    },
    {
      name: 'seo',
      chain: seoChain,
      inputMapper: (input) => ({ topic: input.topic }),
    },
  ],
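  (results) => ({
    content: results[0],
    images: results[1],
    seo: results[2],
  })
)

// Run all three sub-chains concurrently and merge their results
const combined = await parallelStep.execute({ topic: 'Service chaining' })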
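
`Promise.all` rejects as soon as any sub-chain fails, which discards the results of the sub-chains that succeeded. When partial results are acceptable, each branch can be settled independently. The `runBranches` helper and the toy branches below are a hypothetical sketch and not part of `sdk.do`.

```typescript
type BranchOutcome<T> =
  | { branchName: string; ok: true; value: T }
  | { branchName: string; ok: false; reason: string }

// Hypothetical helper: run named branches concurrently and settle each one
// independently, so one failure does not discard the others' results.
async function runBranches<T>(
  branches: Record<string, () => Promise<T>>
): Promise<{ succeeded: Record<string, T>; failed: Record<string, string> }> {
  const outcomes = await Promise.all(
    Object.entries(branches).map(async ([branchName, run]): Promise<BranchOutcome<T>> => {
      try {
        return { branchName, ok: true, value: await run() }
      } catch (error) {
        const reason = error instanceof Error ? error.message : String(error)
        return { branchName, ok: false, reason }
      }
    })
  )

  const succeeded: Record<string, T> = {}
  const failed: Record<string, string> = {}
  for (const outcome of outcomes) {
    if (outcome.ok) succeeded[outcome.branchName] = outcome.value
    else failed[outcome.branchName] = outcome.reason
  }
  return { succeeded, failed }
}

// Toy branches: the image branch fails, the other two succeed.
const settled = runBranches<string>({
  content: async () => 'draft text',
  images: async () => {
    throw new Error('image service unavailable')
  },
  seo: async () => 'keyword list',
})
// `settled` resolves with content and seo under `succeeded`, images under `failed`
```

An aggregator can then decide whether the missing branch is fatal or whether the workflow can continue with a fallback.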

Pattern 3: Checkpoint and Recovery

Save checkpoints and recover from failures.

import $, { db } from 'sdk.do'

export class CheckpointServiceChain extends ServiceChain {
  private checkpointStore = new Map<string, any>()

  async executeWithCheckpoints(initialInput: any, checkpointId?: string): Promise<any> {
    // Load from checkpoint if exists
    if (checkpointId && this.checkpointStore.has(checkpointId)) {
      const checkpoint = this.checkpointStore.get(checkpointId)
      console.log(`Resuming after checkpointed step ${checkpoint.stepIndex}`)

      return this.resumeFromCheckpoint(checkpoint)
    }

    // Normal execution with checkpoints
    const executionId = generateId()
    let currentData = initialInput
    const context = {
      startTime: Date.now(),
      executionId,
      stepResults: [] as any[],
    }

    for (let i = 0; i < this.steps.length; i++) {
      const step = this.steps[i]

      try {
        // Execute step
        const stepInput = step.inputMapper ? step.inputMapper(currentData, context) : currentData

        const execution = await $.ServiceExecution.start({
          serviceId: step.serviceId,
          inputs: stepInput,
        })

        const result = await execution.waitForCompletion()
        const stepOutput = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs

        context.stepResults.push({
          stepName: step.name,
          stepIndex: i,
          output: stepOutput,
        })

        currentData = stepOutput

        // Save checkpoint after each step
        await this.saveCheckpoint(executionId, {
          stepIndex: i,
          currentData,
          context,
        })
      } catch (error) {
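        // Save error checkpoint. Record the last completed step (i - 1) so that
        // resumeFromCheckpoint, which restarts at stepIndex + 1, reruns the failed step
        await this.saveCheckpoint(executionId, {
          stepIndex: i - 1,
          failedStepIndex: i,
          currentData,
          context,
          error: (error as Error).message,
        })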

        if (step.retryable) {
          console.log(`Step ${step.name} failed but is retryable. Checkpoint saved.`)
        }

        throw error
      }
    }

    return {
      result: currentData,
      metadata: {
        executionId: context.executionId,
        totalDuration: Date.now() - context.startTime,
        stepsExecuted: this.steps.length,
        stepResults: context.stepResults,
      },
    }
  }

  private async saveCheckpoint(executionId: string, checkpoint: any): Promise<void> {
    this.checkpointStore.set(executionId, checkpoint)

    // Persist to database
    await db.create($.Checkpoint, {
      executionId,
      data: checkpoint,
      createdAt: new Date(),
    })
  }

  private async resumeFromCheckpoint(checkpoint: any): Promise<any> {
    // Resume from checkpoint step
    const startIndex = checkpoint.stepIndex + 1
    let currentData = checkpoint.currentData
    const context = checkpoint.context

    for (let i = startIndex; i < this.steps.length; i++) {
      const step = this.steps[i]

      // Await the mapper so that async input mappers resolve before the service starts
      const stepInput = step.inputMapper ? await step.inputMapper(currentData, context) : currentData

      const execution = await $.ServiceExecution.start({
        serviceId: step.serviceId,
        inputs: stepInput,
      })

      const result = await execution.waitForCompletion()
      const stepOutput = step.outputMapper ? step.outputMapper(result.outputs) : result.outputs

      context.stepResults.push({
        stepName: step.name,
        stepIndex: i,
        output: stepOutput,
      })

      currentData = stepOutput
    }

    return {
      result: currentData,
      metadata: {
        executionId: context.executionId,
        totalDuration: Date.now() - context.startTime,
        stepsExecuted: this.steps.length,
        stepResults: context.stepResults,
        resumedFromCheckpoint: true,
      },
    }
  }
}

Pattern 4: Dynamic Chain Construction

Build chains dynamically based on input or runtime conditions.

import $, { db, ai } from 'sdk.do'

export class DynamicChainBuilder {
  static async buildChain(goal: string, constraints: any): Promise<ServiceChain> {
    // Use AI to determine optimal service chain
    const chainPlan = await ai.generate({
      model: 'gpt-5',
      prompt: `Given the following goal and constraints, design an optimal service chain.

Goal: ${goal}

Constraints:
${JSON.stringify(constraints, null, 2)}

Available Services:
- data-extractor: Extract data from sources
- data-transformer: Transform data structure
- data-validator: Validate data quality
- data-enricher: Enrich data from external sources
- sentiment-analyzer: Analyze sentiment
- content-generator: Generate content
- summarizer: Summarize text
- translator: Translate languages
- classifier: Classify data
- predictor: Make predictions

Return a JSON array of service steps in order, where each step has:
{
  "serviceId": "service-id",
  "purpose": "what this step accomplishes",
  "inputMapper": "description of input transformation needed"
}`,
      maxTokens: 1000,
    })

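    // Parse AI response. JSON.parse throws if the model returns anything other than
    // bare JSON, and the parsed steps are not yet checked against the known services.
    const steps = JSON.parse(chainPlan.text)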

    // Build chain
    const chain = new ServiceChain()

    for (const step of steps) {
      chain.addStep(step.purpose, step.serviceId, {
        inputMapper: createInputMapper(step.inputMapper),
      })
    }

    return chain
  }
}

function createInputMapper(description: string): (prevOutput: any) => any {
  // Create input mapper based on description
  // This could use AI or template-based mapping
  return (prevOutput: any) => {
    // Implementation based on description
    return prevOutput
  }
}

// Usage
const chain = await DynamicChainBuilder.buildChain('Extract customer data, validate it, enrich with firmographics, and generate insights', {
  maxDuration: 60000,
  budget: 5.0,
  quality: 'high',
})

const result = await chain.execute({ source: 'customers.csv' })
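
A model-generated plan is untrusted input, so it may help to validate it before building the chain. The following is a minimal sketch, not part of sdk.do: `PlannedStep`, `KNOWN_SERVICES`, and `parseChainPlan` are hypothetical names, and the service list simply mirrors the one in the prompt above.

```typescript
// Hypothetical shape of one planned step, mirroring the JSON requested in the prompt.
interface PlannedStep {
  serviceId: string
  purpose: string
  inputMapper: string
}

// Assumption: these IDs match the "Available Services" list in the prompt.
const KNOWN_SERVICES: ReadonlySet<string> = new Set([
  'data-extractor',
  'data-transformer',
  'data-validator',
  'data-enricher',
  'sentiment-analyzer',
  'content-generator',
  'summarizer',
  'translator',
  'classifier',
  'predictor',
])

function parseChainPlan(text: string, known: ReadonlySet<string> = KNOWN_SERVICES): PlannedStep[] {
  // Models sometimes wrap JSON in a Markdown code fence; strip it if present.
  const cleaned = text
    .trim()
    .replace(/^`{3}(?:json)?\s*/i, '')
    .replace(/\s*`{3}$/, '')

  let parsed: unknown
  try {
    parsed = JSON.parse(cleaned)
  } catch {
    throw new Error('Chain plan is not valid JSON')
  }

  if (!Array.isArray(parsed) || parsed.length === 0) {
    throw new Error('Chain plan must be a non-empty JSON array')
  }

  return parsed.map((raw, index) => {
    const step = raw as Partial<PlannedStep> | null
    if (!step || typeof step.serviceId !== 'string' || typeof step.purpose !== 'string') {
      throw new Error(`Step ${index} is missing a string serviceId or purpose`)
    }
    // Reject services the model invented.
    if (!known.has(step.serviceId)) {
      throw new Error(`Step ${index} references unknown service "${step.serviceId}"`)
    }
    return {
      serviceId: step.serviceId,
      purpose: step.purpose,
      inputMapper: typeof step.inputMapper === 'string' ? step.inputMapper : '',
    }
  })
}
```

With a helper like this, `buildChain` could call `parseChainPlan(chainPlan.text)` in place of the bare `JSON.parse`, so a malformed or hallucinated plan fails before any service runs.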

Error Handling and Retry Strategies

Retry with Exponential Backoff

import $, { db } from 'sdk.do'

export class RetryableServiceChain extends ServiceChain {
  async executeStepWithRetry(step: any, input: any, maxRetries: number = 3): Promise<any> {
    let lastError: Error | null = null
    let attempt = 0

    while (attempt < maxRetries) {
      try {
        const execution = await $.ServiceExecution.start({
          serviceId: step.serviceId,
          inputs: input,
        })

        return await execution.waitForCompletion()
      } catch (error) {
        lastError = error as Error
        attempt++

        if (attempt < maxRetries) {
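          // Exponential backoff. attempt was already incremented, so the waits are
          // 2s, then 4s, and so on; no wait follows the final attempt.
          const delay = Math.pow(2, attempt) * 1000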
          console.log(`Retry ${attempt}/${maxRetries} after ${delay}ms`)
          await new Promise((resolve) => setTimeout(resolve, delay))
        }
      }
    }

    throw new Error(`Step ${step.name} failed after ${maxRetries} attempts: ${lastError?.message}`)
  }
}
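
The loop above doubles the delay on every retry without an upper bound, and all callers retry on the same schedule. A common refinement is to cap the delay and add random jitter. The sketch below is one way to do that under stated assumptions: `BackoffOptions`, `backoffDelay`, and `retryWithBackoff` are hypothetical names, and `random` and `sleep` are injectable only to make the behavior testable.

```typescript
// Hypothetical options; none of these names come from sdk.do.
interface BackoffOptions {
  maxAttempts: number
  baseDelayMs: number
  maxDelayMs: number
  random?: () => number // returns a value in [0, 1); injectable for deterministic tests
  sleep?: (ms: number) => Promise<void>
}

// Delay before retry number `retry` (1-based): a random value in
// [0, ceiling), where ceiling = min(maxDelayMs, baseDelayMs * 2^(retry - 1)).
function backoffDelay(retry: number, opts: BackoffOptions): number {
  const ceiling = Math.min(opts.maxDelayMs, opts.baseDelayMs * Math.pow(2, retry - 1))
  const random = opts.random ?? Math.random
  return Math.floor(random() * ceiling)
}

async function retryWithBackoff<T>(operation: () => Promise<T>, opts: BackoffOptions): Promise<T> {
  const sleep = opts.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)))
  let lastError: unknown

  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      lastError = error
      // Sleep only if another attempt will follow.
      if (attempt < opts.maxAttempts) {
        await sleep(backoffDelay(attempt, opts))
      }
    }
  }

  const message = lastError instanceof Error ? lastError.message : String(lastError)
  throw new Error(`Operation failed after ${opts.maxAttempts} attempts: ${message}`)
}
```

The operation passed in would typically wrap the `$.ServiceExecution.start(...)` and `waitForCompletion()` calls used throughout this guide. Retrying is only safe when the step is idempotent.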

Fallback Services

import $, { db } from 'sdk.do'

export class FallbackServiceChain extends ServiceChain {
  addStepWithFallback(name: string, primaryServiceId: string, fallbackServiceId: string): FallbackServiceChain {
    this.addStep(name, primaryServiceId, {
      inputMapper: async (prevOutput) => {
        try {
          const execution = await $.ServiceExecution.start({
            serviceId: primaryServiceId,
            inputs: prevOutput,
          })

          return await execution.waitForCompletion()
        } catch (error) {
          console.log(`Primary service ${primaryServiceId} failed, using fallback`)

          const execution = await $.ServiceExecution.start({
            serviceId: fallbackServiceId,
            inputs: prevOutput,
          })

          return await execution.waitForCompletion()
        }
      },
    })

    return this
  }
}
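
Because the fallback above runs inside `inputMapper`, it relies on the chain awaiting async mappers, and the mapper's return value is still handed to the step's own service call afterwards. An alternative is to put the fallback in the execution path itself. The sketch below assumes a hypothetical `Executor` function that runs one service and resolves with its outputs; `executeWithFallback` and `FallbackResult` are illustrative names, not sdk.do APIs.

```typescript
// Hypothetical executor signature: runs one service and resolves with its outputs.
type Executor = (serviceId: string, inputs: unknown) => Promise<unknown>

interface FallbackResult {
  outputs: unknown
  servedBy: string // which service actually produced the outputs
}

// Try each service in order and return the first successful result.
async function executeWithFallback(execute: Executor, serviceIds: string[], inputs: unknown): Promise<FallbackResult> {
  const failures: string[] = []

  for (const serviceId of serviceIds) {
    try {
      const outputs = await execute(serviceId, inputs)
      return { outputs, servedBy: serviceId }
    } catch (error) {
      // Record the failure and move on to the next candidate.
      const message = error instanceof Error ? error.message : String(error)
      failures.push(`${serviceId}: ${message}`)
    }
  }

  throw new Error(`All services failed (${failures.join('; ')})`)
}
```

Returning `servedBy` makes it visible downstream when a fallback produced the data, which matters if the fallback service yields lower-quality output.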

Performance Optimization

Lazy Evaluation

import $, { db } from 'sdk.do'

export class LazyServiceChain extends ServiceChain {
  addLazyStep(name: string, serviceId: string, condition: (prevOutput: any) => boolean): LazyServiceChain {
    this.addStep(name, serviceId, {
      inputMapper: async (prevOutput) => {
        // Only execute if condition is met
        if (!condition(prevOutput)) {
          console.log(`Skipping step ${name} - condition not met`)
          return prevOutput
        }

        const execution = await $.ServiceExecution.start({
          serviceId,
          inputs: prevOutput,
        })

        return await execution.waitForCompletion()
      },
    })

    return this
  }
}

// Usage
const chain = new LazyServiceChain()

chain
  .addStep('analyze', 'content-analyzer')
  .addLazyStep(
    'deep-check',
    'expensive-quality-checker',
    (output) => output.initialQuality < 70 // Only run if quality is low
  )
  .addStep('publish', 'publisher')
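
Note that when a skip decision is made inside an input mapper, the step's own service call can still run on the mapper's return value. For a step to be truly skipped, the condition has to be checked by whatever executes the step. The following is a small standalone sketch of that idea; `ConditionalStep`, `StepExecutor`, and `runConditionalSteps` are hypothetical names, and the executor is injected rather than taken from sdk.do.

```typescript
// Hypothetical executor signature: runs one service and resolves with its outputs.
type StepExecutor = (serviceId: string, inputs: any) => Promise<any>

interface ConditionalStep {
  name: string
  serviceId: string
  // When present and false for the previous output, the step is skipped entirely.
  shouldRun?: (prevOutput: any) => boolean
}

async function runConditionalSteps(
  execute: StepExecutor,
  steps: ConditionalStep[],
  input: any
): Promise<{ result: any; skipped: string[] }> {
  let current = input
  const skipped: string[] = []

  for (const step of steps) {
    if (step.shouldRun && !step.shouldRun(current)) {
      // Pass the data through unchanged without calling the service.
      skipped.push(step.name)
      continue
    }
    current = await execute(step.serviceId, current)
  }

  return { result: current, skipped }
}
```

Recording the skipped step names keeps the execution metadata honest about which services actually ran.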

Caching Intermediate Results

import $, { db } from 'sdk.do'

export class CachedServiceChain extends ServiceChain {
  private cache = new Map<string, any>()

  addCachedStep(
    name: string,
    serviceId: string,
    cacheDuration: number = 3600000 // 1 hour
  ): CachedServiceChain {
    this.addStep(name, serviceId, {
      inputMapper: async (prevOutput) => {
        const cacheKey = `${serviceId}:${JSON.stringify(prevOutput)}`

        // Check cache
        if (this.cache.has(cacheKey)) {
          const cached = this.cache.get(cacheKey)
          if (Date.now() - cached.timestamp < cacheDuration) {
            console.log(`Using cached result for ${name}`)
            return cached.data
          }
        }

        // Execute service
        const execution = await $.ServiceExecution.start({
          serviceId,
          inputs: prevOutput,
        })

        const result = await execution.waitForCompletion()

        // Cache result
        this.cache.set(cacheKey, {
          data: result,
          timestamp: Date.now(),
        })

        return result
      },
    })

    return this
  }
}
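
Two details of the cache above are worth tightening. `JSON.stringify` follows property insertion order, so `{ a: 1, b: 2 }` and `{ b: 2, a: 1 }` produce different cache keys, and entries that are never requested again are never removed from the map. The sketch below shows one possible approach for JSON-compatible inputs; `stableStringify` and `TtlCache` are hypothetical helpers, not part of sdk.do.

```typescript
// Serialize a JSON-compatible value with object keys sorted, so that
// { a: 1, b: 2 } and { b: 2, a: 1 } produce the same string.
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    return JSON.stringify(value) ?? 'undefined'
  }
  if (Array.isArray(value)) {
    // Array order is significant and is preserved; undefined becomes null, as in JSON.
    return `[${value.map((item) => (item === undefined ? 'null' : stableStringify(item))).join(',')}]`
  }
  const record = value as Record<string, unknown>
  const entries = Object.keys(record)
    .sort()
    .filter((key) => record[key] !== undefined)
    .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`)
  return `{${entries.join(',')}}`
}

// In-memory cache whose entries expire after ttlMs and are evicted on read.
class TtlCache<T> {
  private entries = new Map<string, { data: T; expiresAt: number }>()
  private ttlMs: number
  private now: () => number

  // `now` is injectable so expiry can be tested without waiting.
  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs
    this.now = now
  }

  get(key: string): T | undefined {
    const entry = this.entries.get(key)
    if (!entry) return undefined
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key)
      return undefined
    }
    return entry.data
  }

  set(key: string, data: T): void {
    this.entries.set(key, { data, expiresAt: this.now() + this.ttlMs })
  }
}
```

A cache key could then be built as `${serviceId}:${stableStringify(prevOutput)}`. For large inputs, hashing the serialized string would keep keys short.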

Testing Service Chains

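import { describe, it, expect, vi } from 'vitest'
// Assumption: ServiceChain is exported from a local module; adjust the path to your project.
import { ServiceChain } from './service-chain'

// Replace the SDK with an echo service so the tests exercise only chain logic:
// every service returns its inputs, except 'failing-service', which throws.
// db, on, and send are stubbed because the chain module imports them.
vi.mock('sdk.do', () => ({
  default: {
    ServiceExecution: {
      start: async ({ serviceId, inputs }: { serviceId: string; inputs: unknown }) => ({
        waitForCompletion: async () => {
          if (serviceId === 'failing-service') throw new Error('service failed')
          return { outputs: inputs }
        },
      }),
    },
  },
  db: {},
  on: vi.fn(),
  send: vi.fn(),
}))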

describe('Service Chaining', () => {
  it('should execute steps in sequence', async () => {
    const chain = new ServiceChain()

    const executionOrder: string[] = []

    chain
      .addStep('step1', 'service-1', {
        outputMapper: (output) => {
          executionOrder.push('step1')
          return { ...output, step: 1 }
        },
      })
      .addStep('step2', 'service-2', {
        outputMapper: (output) => {
          executionOrder.push('step2')
          return { ...output, step: 2 }
        },
      })
      .addStep('step3', 'service-3', {
        outputMapper: (output) => {
          executionOrder.push('step3')
          return { ...output, step: 3 }
        },
      })

    await chain.execute({ initial: 'data' })

    expect(executionOrder).toEqual(['step1', 'step2', 'step3'])
  })

  it('should pass output from one step to next', async () => {
    const chain = new ServiceChain()

    chain
      .addStep('double', 'doubler', {
        inputMapper: (input) => input,
        outputMapper: (output) => ({ value: output.value * 2 }),
      })
      .addStep('add10', 'adder', {
        inputMapper: (input) => input,
        outputMapper: (output) => ({ value: output.value + 10 }),
      })

    const result = await chain.execute({ value: 5 })

    expect(result.result.value).toBe(20) // (5 * 2) + 10
  })

  it('should handle errors in chain', async () => {
    const chain = new ServiceChain()

    chain.addStep('step1', 'service-1').addStep('failing-step', 'failing-service').addStep('step3', 'service-3')

    await expect(chain.execute({ data: 'test' })).rejects.toThrow('Chain failed at step 2')
  })
})

Best Practices

  1. Clear Step Names: Use descriptive names for each step
  2. Input/Output Contracts: Define clear data contracts between steps
  3. Error Handling: Implement retry logic and fallbacks
  4. Checkpointing: Save progress for long-running chains
  5. Monitoring: Track execution time and success rates for each step
  6. Idempotency: Make steps idempotent for safe retries
  7. Testing: Test individual steps and full chains
  8. Documentation: Document the purpose and requirements of each step
  9. Optimization: Cache results and skip unnecessary steps
  10. Observability: Emit events for monitoring and debugging
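
As one illustration of the idempotency practice, a step can be keyed by its execution ID and step index so that a retry reuses the recorded result instead of repeating the side effect. This is a minimal in-memory sketch for sequential execution only; `IdempotentRunner` is a hypothetical helper, and a real chain would persist the ledger alongside its checkpoints.

```typescript
// Remembers the result of each (executionId, stepIndex) pair so that
// re-running a completed step returns the stored result.
class IdempotentRunner {
  private completed = new Map<string, unknown>()

  async run<T>(executionId: string, stepIndex: number, action: () => Promise<T>): Promise<T> {
    const key = `${executionId}:${stepIndex}`

    if (this.completed.has(key)) {
      return this.completed.get(key) as T
    }

    // A failed action stores nothing, so a later retry executes it again.
    const result = await action()
    this.completed.set(key, result)
    return result
  }
}
```

The same key can also be sent to downstream services that accept an idempotency token, so that duplicates are rejected on their side as well.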

Additional Resources