.do
Patterns

Multi-Service Coordination

Comprehensive guide to coordinating multiple services in parallel for complex workflows, including service dependencies, error handling, and result aggregation patterns

Execute multiple independent services in parallel to achieve complex business goals faster and more efficiently than sequential execution.

Overview

Multi-service coordination enables you to run several services simultaneously, aggregate their results, and handle failures gracefully. This pattern is essential for building high-performance, composite services that leverage multiple data sources or processing capabilities.

When to Use Multi-Service Coordination

Use this pattern when:

  • Independent Operations: Services can execute without dependencies on each other's results
  • Data Aggregation: You need to combine information from multiple sources
  • Performance Critical: Sequential execution would be too slow
  • Composite Services: Building higher-level services from existing primitives
  • Parallel Processing: Tasks can benefit from concurrent execution
  • Fault Tolerance: System should continue with partial results if some services fail

Avoid this pattern when:

  • Services have strict ordering requirements
  • One service's output is required for another's input
  • Sequential execution is required for business logic
  • Coordination overhead exceeds parallelization benefits

Architecture

graph TB Input[Input Data] --> Coordinator[Service Coordinator] Coordinator --> S1[Service 1] Coordinator --> S2[Service 2] Coordinator --> S3[Service 3] Coordinator --> S4[Service 4] S1 --> |Result 1| Aggregator[Result Aggregator] S2 --> |Result 2| Aggregator S3 --> |Result 3| Aggregator S4 --> |Result 4| Aggregator Aggregator --> Validator[Result Validator] Validator --> Output[Combined Output]

Core Implementation

Basic Parallel Execution

import $, { db, on, send } from 'sdk.do'

// Basic multi-service coordinator
export const createMultiServiceCoordinator = async (config: {
  name: string
  description: string
  services: Array<{
    id: string
    inputs: (parentInputs: any) => any
    timeout?: number
  }>
  aggregator: (results: any[]) => any
}) => {
  return await $.Service.create({
    name: config.name,
    description: config.description,
    type: $.ServiceType.Orchestration,

    workflow: async (inputs) => {
      const startTime = Date.now()

      // Execute all services in parallel
      const servicePromises = config.services.map(async (service) => {
        const serviceInputs = service.inputs(inputs)

        try {
          const execution = await $.ServiceExecution.start({
            serviceId: service.id,
            inputs: serviceInputs,
            timeout: service.timeout || 30000,
          })

          const result = await execution.waitForCompletion()

          return {
            serviceId: service.id,
            status: 'success',
            data: result.outputs,
            duration: result.duration,
          }
        } catch (error) {
          return {
            serviceId: service.id,
            status: 'failed',
            error: error.message,
            duration: Date.now() - startTime,
          }
        }
      })

      // Wait for all services to complete
      const results = await Promise.all(servicePromises)

      // Aggregate results
      const successfulResults = results.filter((r) => r.status === 'success')
      const failedResults = results.filter((r) => r.status === 'failed')

      const aggregatedData = config.aggregator(successfulResults.map((r) => r.data))

      return {
        data: aggregatedData,
        metadata: {
          totalDuration: Date.now() - startTime,
          servicesExecuted: results.length,
          successCount: successfulResults.length,
          failureCount: failedResults.length,
          failures: failedResults,
          completeness: successfulResults.length / results.length,
        },
      }
    },
  })
}

Example 1: Lead Intelligence Aggregator

Combine multiple data sources to create comprehensive lead profiles.

import $, { db, on, send, ai } from 'sdk.do'

export const leadIntelligenceAggregator = await $.Service.create({
  name: 'Lead Intelligence Aggregator',
  description: 'Aggregate data from multiple sources to create comprehensive lead profiles',
  type: $.ServiceType.DataEnrichment,

  pricing: {
    model: 'per-execution',
    basePrice: 0.5,
  },

  workflow: async (inputs: { email: string; name?: string; company?: string; phone?: string }) => {
    const startTime = Date.now()

    // Execute all enrichment services in parallel
    const [emailValidation, emailReputation, socialProfiles, professionalProfile, companyData, techStack, newsArticles, contactEnrichment] =
      await Promise.allSettled([
        // Service 1: Email validation
        $.ServiceExecution.start({
          serviceId: 'email-validator',
          inputs: { email: inputs.email },
        }).then((e) => e.waitForCompletion()),

        // Service 2: Email reputation check
        $.ServiceExecution.start({
          serviceId: 'email-reputation-checker',
          inputs: { email: inputs.email },
        }).then((e) => e.waitForCompletion()),

        // Service 3: Social media profiles
        $.ServiceExecution.start({
          serviceId: 'social-profile-finder',
          inputs: {
            email: inputs.email,
            name: inputs.name,
          },
        }).then((e) => e.waitForCompletion()),

        // Service 4: Professional profile (LinkedIn)
        $.ServiceExecution.start({
          serviceId: 'linkedin-enrichment',
          inputs: {
            email: inputs.email,
            name: inputs.name,
          },
        }).then((e) => e.waitForCompletion()),

        // Service 5: Company information
        $.ServiceExecution.start({
          serviceId: 'company-researcher',
          inputs: {
            company: inputs.company || 'unknown',
            email: inputs.email,
          },
        }).then((e) => e.waitForCompletion()),

        // Service 6: Technology stack detection
        $.ServiceExecution.start({
          serviceId: 'tech-stack-analyzer',
          inputs: {
            domain: inputs.company ? extractDomain(inputs.company) : null,
          },
        }).then((e) => e.waitForCompletion()),

        // Service 7: Recent news and press releases
        $.ServiceExecution.start({
          serviceId: 'news-aggregator',
          inputs: {
            company: inputs.company,
            person: inputs.name,
            limit: 5,
          },
        }).then((e) => e.waitForCompletion()),

        // Service 8: Additional contact info
        $.ServiceExecution.start({
          serviceId: 'contact-finder',
          inputs: {
            email: inputs.email,
            phone: inputs.phone,
          },
        }).then((e) => e.waitForCompletion()),
      ])

    // Extract successful results
    const extractResult = (result: PromiseSettledResult<any>) => {
      if (result.status === 'fulfilled') {
        return result.value.outputs
      }
      return null
    }

    const emailData = extractResult(emailValidation)
    const reputationData = extractResult(emailReputation)
    const socialData = extractResult(socialProfiles)
    const professionalData = extractResult(professionalProfile)
    const companyInfo = extractResult(companyData)
    const techData = extractResult(techStack)
    const newsData = extractResult(newsArticles)
    const contactData = extractResult(contactEnrichment)

    // Calculate completeness score
    const dataPoints = [
      emailData?.isValid,
      reputationData?.score,
      socialData?.profiles?.length > 0,
      professionalData?.profile,
      companyInfo?.name,
      techData?.technologies?.length > 0,
      newsData?.articles?.length > 0,
      contactData?.additionalEmails?.length > 0,
    ]

    const completeness = dataPoints.filter(Boolean).length / dataPoints.length

    // Aggregate into comprehensive profile
    const profile = {
      contact: {
        email: inputs.email,
        isValidEmail: emailData?.isValid || false,
        emailReputation: reputationData?.score || 'unknown',
        phone: contactData?.phone || inputs.phone,
        alternateEmails: contactData?.additionalEmails || [],
        social: {
          linkedin: socialData?.profiles?.linkedin,
          twitter: socialData?.profiles?.twitter,
          github: socialData?.profiles?.github,
          facebook: socialData?.profiles?.facebook,
        },
      },

      professional: {
        name: inputs.name || professionalData?.profile?.name,
        title: professionalData?.profile?.title,
        company: inputs.company || professionalData?.profile?.company || companyInfo?.name,
        experience: professionalData?.profile?.experience || [],
        education: professionalData?.profile?.education || [],
        skills: professionalData?.profile?.skills || [],
      },

      company: companyInfo
        ? {
            name: companyInfo.name,
            domain: companyInfo.domain,
            industry: companyInfo.industry,
            size: companyInfo.employeeCount,
            revenue: companyInfo.revenue,
            founded: companyInfo.founded,
            location: companyInfo.headquarters,
            description: companyInfo.description,
            technologies: techData?.technologies || [],
            techCategories: techData?.categories || [],
          }
        : null,

      signals: {
        recentNews: newsData?.articles || [],
        buyingSignals: analyzeSignals(newsData?.articles, companyInfo),
        engagementScore: calculateEngagementScore({
          socialPresence: socialData,
          professionalActivity: professionalData,
          companyGrowth: companyInfo,
        }),
      },

      metadata: {
        completeness,
        dataQuality: calculateDataQuality([emailData, reputationData, socialData, professionalData, companyInfo, techData, newsData, contactData]),
        enrichedAt: new Date().toISOString(),
        duration: Date.now() - startTime,
        sources: {
          emailValidation: !!emailData,
          reputation: !!reputationData,
          social: !!socialData,
          professional: !!professionalData,
          company: !!companyInfo,
          technology: !!techData,
          news: !!newsData,
          contacts: !!contactData,
        },
      },
    }

    // Generate AI insights
    const insights = await ai.generate({
      model: 'gpt-5',
      prompt: `Analyze this lead profile and provide:
1. Key insights about the lead
2. Recommended engagement approach
3. Potential pain points
4. Best time to reach out
5. Personalization suggestions

Profile: ${JSON.stringify(profile, null, 2)}`,
      maxTokens: 1000,
    })

    return {
      profile,
      insights: insights.text,
      readyForOutreach: completeness >= 0.6 && emailData?.isValid,
    }
  },
})

// Helper functions
function extractDomain(companyOrEmail: string): string {
  if (companyOrEmail.includes('@')) {
    return companyOrEmail.split('@')[1]
  }
  return companyOrEmail.toLowerCase().replace(/\s+/g, '') + '.com'
}

function analyzeSignals(articles: any[], companyInfo: any): string[] {
  const signals = []

  if (articles?.some((a: any) => a.title.includes('funding') || a.title.includes('raised'))) {
    signals.push('recent-funding')
  }

  if (articles?.some((a: any) => a.title.includes('hiring') || a.title.includes('expansion'))) {
    signals.push('rapid-growth')
  }

  if (articles?.some((a: any) => a.title.includes('launch') || a.title.includes('release'))) {
    signals.push('new-product')
  }

  if (companyInfo?.employeeCount > companyInfo?.lastYearEmployeeCount * 1.2) {
    signals.push('aggressive-hiring')
  }

  return signals
}

function calculateEngagementScore(data: any): number {
  let score = 0

  if (data.socialPresence?.profiles) {
    score += Object.keys(data.socialPresence.profiles).length * 10
  }

  if (data.professionalActivity?.profile?.skills?.length > 10) {
    score += 20
  }

  if (data.companyGrowth?.growthRate > 0.2) {
    score += 30
  }

  return Math.min(score, 100)
}

function calculateDataQuality(results: any[]): number {
  const validResults = results.filter((r) => r !== null && Object.keys(r).length > 0)
  return validResults.length / results.length
}

Example 2: Content Marketing Suite

Execute multiple content generation and distribution services in parallel.

import $, { db, on, send, ai } from 'sdk.do'

export const contentMarketingSuite = await $.Service.create({
  name: 'Content Marketing Suite',
  description: 'Generate and distribute content across multiple channels simultaneously',
  type: $.ServiceType.ContentGeneration,

  pricing: {
    model: 'per-execution',
    basePrice: 5.0,
  },

  workflow: async (inputs: { topic: string; targetAudience: string; tone: string; keywords: string[]; channels: string[] }) => {
    const startTime = Date.now()

    // Phase 1: Research (parallel)
    const [topicResearch, competitorAnalysis, trendAnalysis, keywordResearch] = await Promise.all([
      $.ServiceExecution.start({
        serviceId: 'topic-researcher',
        inputs: {
          topic: inputs.topic,
          depth: 'comprehensive',
        },
      }).then((e) => e.waitForCompletion()),

      $.ServiceExecution.start({
        serviceId: 'competitor-analyzer',
        inputs: {
          topic: inputs.topic,
          limit: 10,
        },
      }).then((e) => e.waitForCompletion()),

      $.ServiceExecution.start({
        serviceId: 'trend-analyzer',
        inputs: {
          topic: inputs.topic,
          timeframe: '30d',
        },
      }).then((e) => e.waitForCompletion()),

      $.ServiceExecution.start({
        serviceId: 'keyword-researcher',
        inputs: {
          keywords: inputs.keywords,
          intent: 'informational',
        },
      }).then((e) => e.waitForCompletion()),
    ])

    // Phase 2: Content generation (parallel)
    const contentPromises = inputs.channels.map(async (channel) => {
      const channelConfig = getChannelConfig(channel)

      const content = await $.ServiceExecution.start({
        serviceId: 'content-generator',
        inputs: {
          topic: inputs.topic,
          research: topicResearch.outputs.findings,
          competitorInsights: competitorAnalysis.outputs.insights,
          trends: trendAnalysis.outputs.trends,
          keywords: keywordResearch.outputs.keywords,
          format: channelConfig.format,
          length: channelConfig.length,
          tone: inputs.tone,
          audience: inputs.targetAudience,
        },
      }).then((e) => e.waitForCompletion())

      return {
        channel,
        content: content.outputs,
      }
    })

    const contents = await Promise.all(contentPromises)

    // Phase 3: Enhancement (parallel)
    const enhancementPromises = contents.map(async ({ channel, content }) => {
      const [seoOptimized, images, hashtags, metadata] = await Promise.allSettled([
        // SEO optimization
        $.ServiceExecution.start({
          serviceId: 'seo-optimizer',
          inputs: {
            content: content.text,
            keywords: keywordResearch.outputs.keywords,
            channel,
          },
        }).then((e) => e.waitForCompletion()),

        // Image generation
        $.ServiceExecution.start({
          serviceId: 'image-generator',
          inputs: {
            content: content.text,
            style: getChannelConfig(channel).imageStyle,
            count: getChannelConfig(channel).imageCount,
          },
        }).then((e) => e.waitForCompletion()),

        // Hashtag generation
        $.ServiceExecution.start({
          serviceId: 'hashtag-generator',
          inputs: {
            content: content.text,
            platform: channel,
            count: 10,
          },
        }).then((e) => e.waitForCompletion()),

        // Metadata generation
        $.ServiceExecution.start({
          serviceId: 'metadata-generator',
          inputs: {
            content: content.text,
            channel,
          },
        }).then((e) => e.waitForCompletion()),
      ])

      return {
        channel,
        content: {
          ...content,
          optimized: seoOptimized.status === 'fulfilled' ? seoOptimized.value.outputs : content,
          images: images.status === 'fulfilled' ? images.value.outputs.urls : [],
          hashtags: hashtags.status === 'fulfilled' ? hashtags.value.outputs.hashtags : [],
          metadata: metadata.status === 'fulfilled' ? metadata.value.outputs : {},
        },
      }
    })

    const enhancedContents = await Promise.all(enhancementPromises)

    // Phase 4: Scheduling (parallel)
    const schedulingPromises = enhancedContents.map(async ({ channel, content }) => {
      const optimalTime = await $.ServiceExecution.start({
        serviceId: 'optimal-time-finder',
        inputs: {
          channel,
          audience: inputs.targetAudience,
        },
      }).then((e) => e.waitForCompletion())

      const scheduled = await $.ServiceExecution.start({
        serviceId: 'content-scheduler',
        inputs: {
          channel,
          content: content.optimized.text,
          images: content.images,
          hashtags: content.hashtags,
          metadata: content.metadata,
          scheduledTime: optimalTime.outputs.time,
        },
      }).then((e) => e.waitForCompletion())

      return {
        channel,
        scheduledAt: scheduled.outputs.scheduledAt,
        postId: scheduled.outputs.postId,
      }
    })

    const scheduled = await Promise.all(schedulingPromises)

    return {
      research: {
        topic: topicResearch.outputs,
        competitors: competitorAnalysis.outputs,
        trends: trendAnalysis.outputs,
        keywords: keywordResearch.outputs,
      },
      content: enhancedContents.map(({ channel, content }) => ({
        channel,
        text: content.optimized.text,
        images: content.images,
        hashtags: content.hashtags,
        metadata: content.metadata,
      })),
      scheduled,
      metadata: {
        totalDuration: Date.now() - startTime,
        channelsProcessed: inputs.channels.length,
        contentPieces: enhancedContents.length,
        researchSources: 4,
        generatedAt: new Date().toISOString(),
      },
    }
  },
})

function getChannelConfig(channel: string) {
  const configs: Record<string, any> = {
    blog: {
      format: 'long-form',
      length: 2000,
      imageStyle: 'professional',
      imageCount: 3,
    },
    twitter: {
      format: 'micro',
      length: 280,
      imageStyle: 'engaging',
      imageCount: 1,
    },
    linkedin: {
      format: 'professional',
      length: 1300,
      imageStyle: 'business',
      imageCount: 2,
    },
    instagram: {
      format: 'visual-first',
      length: 2200,
      imageStyle: 'lifestyle',
      imageCount: 5,
    },
    facebook: {
      format: 'conversational',
      length: 500,
      imageStyle: 'friendly',
      imageCount: 2,
    },
  }

  return configs[channel] || configs['blog']
}

Example 3: Data Processing Pipeline

Process large datasets through multiple transformation services.

import $, { db, on, send } from 'sdk.do'

export const dataProcessingPipeline = await $.Service.create({
  name: 'Parallel Data Processing Pipeline',
  description: 'Process data through multiple parallel transformation and enrichment services',
  type: $.ServiceType.DataProcessing,

  pricing: {
    model: 'per-record',
    pricePerUnit: 0.01,
  },

  workflow: async (inputs: { dataset: string; transformations: string[]; validations: string[]; enrichments: string[] }) => {
    const startTime = Date.now()

    // Load dataset
    const data = await db.query($.Dataset, {
      where: { id: inputs.dataset },
    })

    const records = data.records

    // Process records in batches
    const batchSize = 100
    const batches = []

    for (let i = 0; i < records.length; i += batchSize) {
      batches.push(records.slice(i, i + batchSize))
    }

    // Process batches in parallel
    const processedBatches = await Promise.all(
      batches.map(async (batch, batchIndex) => {
        // For each record in batch, run all services in parallel
        const processedRecords = await Promise.all(
          batch.map(async (record) => {
            const [transformations, validations, enrichments] = await Promise.all([
              // Transformations
              Promise.all(
                inputs.transformations.map((transformationId) =>
                  $.ServiceExecution.start({
                    serviceId: transformationId,
                    inputs: { record },
                  }).then((e) => e.waitForCompletion())
                )
              ),

              // Validations
              Promise.all(
                inputs.validations.map((validationId) =>
                  $.ServiceExecution.start({
                    serviceId: validationId,
                    inputs: { record },
                  }).then((e) => e.waitForCompletion())
                )
              ),

              // Enrichments
              Promise.all(
                inputs.enrichments.map((enrichmentId) =>
                  $.ServiceExecution.start({
                    serviceId: enrichmentId,
                    inputs: { record },
                  }).then((e) => e.waitForCompletion())
                )
              ),
            ])

            // Merge all results
            let processedRecord = { ...record }

            // Apply transformations
            for (const transformation of transformations) {
              processedRecord = {
                ...processedRecord,
                ...transformation.outputs.record,
              }
            }

            // Add validation results
            const validationResults = validations.map((v) => ({
              validator: v.serviceId,
              isValid: v.outputs.isValid,
              errors: v.outputs.errors || [],
            }))

            processedRecord.validations = validationResults
            processedRecord.isValid = validationResults.every((v) => v.isValid)

            // Add enrichments
            for (const enrichment of enrichments) {
              processedRecord = {
                ...processedRecord,
                ...enrichment.outputs.enrichedData,
              }
            }

            return processedRecord
          })
        )

        return {
          batchIndex,
          records: processedRecords,
          processedAt: new Date().toISOString(),
        }
      })
    )

    // Flatten results
    const allProcessedRecords = processedBatches.flatMap((b) => b.records)

    // Calculate statistics
    const stats = {
      totalRecords: records.length,
      validRecords: allProcessedRecords.filter((r) => r.isValid).length,
      invalidRecords: allProcessedRecords.filter((r) => !r.isValid).length,
      transformationsApplied: inputs.transformations.length,
      validationsRun: inputs.validations.length,
      enrichmentsAdded: inputs.enrichments.length,
      batchesProcessed: batches.length,
      totalDuration: Date.now() - startTime,
      recordsPerSecond: records.length / ((Date.now() - startTime) / 1000),
    }

    // Store results
    const resultDataset = await db.create($.Dataset, {
      name: `${data.name} - Processed`,
      records: allProcessedRecords,
      metadata: {
        sourceDataset: inputs.dataset,
        processedAt: new Date().toISOString(),
        statistics: stats,
      },
    })

    return {
      datasetId: resultDataset.id,
      statistics: stats,
      sampleRecords: allProcessedRecords.slice(0, 10),
    }
  },
})

Advanced Patterns

Pattern 1: Graceful Degradation

Continue operation even when some services fail.

import $, { db } from 'sdk.do'

export const gracefulDegradationPattern = async (inputs: any) => {
  // Execute services with different priority levels
  const [critical, important, optional] = await Promise.allSettled([
    // Critical services - must succeed
    Promise.all([executeService('critical-service-1', inputs), executeService('critical-service-2', inputs)]),

    // Important services - should succeed
    Promise.all([executeService('important-service-1', inputs), executeService('important-service-2', inputs), executeService('important-service-3', inputs)]),

    // Optional services - nice to have
    Promise.all([
      executeService('optional-service-1', inputs),
      executeService('optional-service-2', inputs),
      executeService('optional-service-3', inputs),
      executeService('optional-service-4', inputs),
    ]),
  ])

  // Check critical services
  if (critical.status === 'rejected') {
    throw new Error('Critical services failed - cannot continue')
  }

  // Warn about important service failures
  if (important.status === 'rejected') {
    console.warn('Some important services failed:', important.reason)
  }

  // Aggregate all successful results
  const results = {
    critical: critical.status === 'fulfilled' ? critical.value : [],
    important: important.status === 'fulfilled' ? important.value : [],
    optional: optional.status === 'fulfilled' ? optional.value : [],
  }

  return {
    data: aggregateResults(results),
    completeness: calculateCompleteness(results),
    degraded: important.status === 'rejected' || optional.status === 'rejected',
  }
}

async function executeService(serviceId: string, inputs: any) {
  const execution = await $.ServiceExecution.start({
    serviceId,
    inputs,
    timeout: 10000,
  })

  return await execution.waitForCompletion()
}

function calculateCompleteness(results: any): number {
  const total = results.critical.length + results.important.length + results.optional.length
  const expected = 2 + 3 + 4 // Based on service counts
  return total / expected
}

function aggregateResults(results: any): any {
  return {
    ...results.critical.reduce((acc: any, r: any) => ({ ...acc, ...r.outputs }), {}),
    ...results.important.reduce((acc: any, r: any) => ({ ...acc, ...r.outputs }), {}),
    ...results.optional.reduce((acc: any, r: any) => ({ ...acc, ...r.outputs }), {}),
  }
}

Pattern 2: Result Caching and Deduplication

Avoid redundant service executions.

import $, { db } from 'sdk.do'

class ServiceCache {
  private cache = new Map<string, any>()

  private getCacheKey(serviceId: string, inputs: any): string {
    return `${serviceId}:${JSON.stringify(inputs)}`
  }

  async execute(serviceId: string, inputs: any): Promise<any> {
    const cacheKey = this.getCacheKey(serviceId, inputs)

    // Check cache
    if (this.cache.has(cacheKey)) {
      console.log(`Cache hit for ${serviceId}`)
      return this.cache.get(cacheKey)
    }

    // Execute service
    const execution = await $.ServiceExecution.start({
      serviceId,
      inputs,
    })

    const result = await execution.waitForCompletion()

    // Cache result
    this.cache.set(cacheKey, result)

    return result
  }

  async executeMultiple(services: Array<{ id: string; inputs: any }>): Promise<any[]> {
    // Deduplicate service calls
    const uniqueServices = new Map<string, { id: string; inputs: any }>()

    for (const service of services) {
      const cacheKey = this.getCacheKey(service.id, service.inputs)
      if (!uniqueServices.has(cacheKey)) {
        uniqueServices.set(cacheKey, service)
      }
    }

    // Execute unique services in parallel
    const results = await Promise.all(Array.from(uniqueServices.values()).map((service) => this.execute(service.id, service.inputs)))

    // Map results back to original service order
    return services.map((service) => {
      const cacheKey = this.getCacheKey(service.id, service.inputs)
      const index = Array.from(uniqueServices.keys()).indexOf(cacheKey)
      return results[index]
    })
  }
}

// Usage
const cache = new ServiceCache()

const results = await cache.executeMultiple([
  { id: 'service-1', inputs: { data: 'A' } },
  { id: 'service-2', inputs: { data: 'B' } },
  { id: 'service-1', inputs: { data: 'A' } }, // Deduplicated
  { id: 'service-3', inputs: { data: 'C' } },
  { id: 'service-2', inputs: { data: 'B' } }, // Deduplicated
])

Pattern 3: Dynamic Service Selection

Choose services at runtime based on requirements.

import $, { db, ai } from 'sdk.do'

export const dynamicServiceSelector = async (inputs: {
  task: string
  requirements: {
    quality: 'low' | 'medium' | 'high' | 'premium'
    speed: 'fast' | 'balanced' | 'thorough'
    budget: 'low' | 'medium' | 'high' | 'unlimited'
  }
}) => {
  // Get available services for task
  const availableServices = await db.query($.Service, {
    where: {
      capabilities: { contains: inputs.task },
    },
  })

  // Score and rank services
  const rankedServices = availableServices.map((service) => {
    let score = 0

    // Quality scoring
    if (inputs.requirements.quality === 'premium' && service.qualityRating >= 4.5) {
      score += 40
    } else if (inputs.requirements.quality === 'high' && service.qualityRating >= 4.0) {
      score += 30
    } else if (inputs.requirements.quality === 'medium' && service.qualityRating >= 3.5) {
      score += 20
    } else if (inputs.requirements.quality === 'low') {
      score += 10
    }

    // Speed scoring
    if (inputs.requirements.speed === 'fast' && service.avgExecutionTime < 5000) {
      score += 30
    } else if (inputs.requirements.speed === 'balanced' && service.avgExecutionTime < 15000) {
      score += 20
    } else if (inputs.requirements.speed === 'thorough') {
      score += 10
    }

    // Budget scoring
    const costPerExecution = service.pricing.basePrice || 0
    if (inputs.requirements.budget === 'low' && costPerExecution < 0.5) {
      score += 30
    } else if (inputs.requirements.budget === 'medium' && costPerExecution < 2.0) {
      score += 20
    } else if (inputs.requirements.budget === 'high' && costPerExecution < 10.0) {
      score += 10
    } else if (inputs.requirements.budget === 'unlimited') {
      score += 5
    }

    return {
      service,
      score,
    }
  })

  // Sort by score
  rankedServices.sort((a, b) => b.score - a.score)

  // Select top N services for parallel execution
  const selectedServices = rankedServices.slice(0, 3)

  // Execute selected services in parallel
  const results = await Promise.all(
    selectedServices.map(async ({ service }) => {
      try {
        const execution = await $.ServiceExecution.start({
          serviceId: service.id,
          inputs: inputs,
        })

        return {
          serviceId: service.id,
          serviceName: service.name,
          result: await execution.waitForCompletion(),
          score: selectedServices.find((s) => s.service.id === service.id)?.score,
        }
      } catch (error) {
        return {
          serviceId: service.id,
          serviceName: service.name,
          error: error.message,
          score: selectedServices.find((s) => s.service.id === service.id)?.score,
        }
      }
    })
  )

  // Select best result
  const successfulResults = results.filter((r) => !r.error)

  if (successfulResults.length === 0) {
    throw new Error('All selected services failed')
  }

  // Use AI to pick best result
  const bestResult = await ai.generate({
    model: 'gpt-5',
    prompt: `Analyze these service results and pick the best one based on quality, completeness, and relevance:

${successfulResults
  .map(
    (r, i) => `
Result ${i + 1} (from ${r.serviceName}, score: ${r.score}):
${JSON.stringify(r.result.outputs, null, 2)}
`
  )
  .join('\n')}

Return only the index number (0-${successfulResults.length - 1}) of the best result.`,
    maxTokens: 10,
  })

  const bestIndex = parseInt(bestResult.text.trim())

  return {
    result: successfulResults[bestIndex].result,
    selectedService: successfulResults[bestIndex].serviceName,
    alternativeResults: successfulResults.filter((_, i) => i !== bestIndex),
  }
}

Pattern 4: Timeout and Retry Management

Handle timeouts and retries across multiple services.

import $, { db } from 'sdk.do'

interface RetryConfig {
  maxAttempts: number
  backoff: 'linear' | 'exponential'
  initialDelay: number
  maxDelay: number
}

async function executeWithRetry(serviceId: string, inputs: any, timeout: number, retryConfig: RetryConfig): Promise<any> {
  let attempt = 0
  let lastError: Error | null = null

  while (attempt < retryConfig.maxAttempts) {
    try {
      const execution = await $.ServiceExecution.start({
        serviceId,
        inputs,
        timeout,
      })

      return await execution.waitForCompletion()
    } catch (error) {
      lastError = error as Error
      attempt++

      if (attempt < retryConfig.maxAttempts) {
        const delay = calculateDelay(attempt, retryConfig)
        console.log(`Retry ${attempt}/${retryConfig.maxAttempts} for ${serviceId} after ${delay}ms`)
        await new Promise((resolve) => setTimeout(resolve, delay))
      }
    }
  }

  throw new Error(`Service ${serviceId} failed after ${attempt} attempts: ${lastError?.message}`)
}

function calculateDelay(attempt: number, config: RetryConfig): number {
  let delay: number

  if (config.backoff === 'linear') {
    delay = config.initialDelay * attempt
  } else {
    delay = config.initialDelay * Math.pow(2, attempt - 1)
  }

  return Math.min(delay, config.maxDelay)
}

// Multi-service execution with retry
export const multiServiceWithRetry = async (
  services: Array<{
    id: string
    inputs: any
    timeout?: number
    retry?: RetryConfig
  }>
) => {
  const defaultRetryConfig: RetryConfig = {
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelay: 1000,
    maxDelay: 10000,
  }

  const results = await Promise.allSettled(
    services.map((service) => executeWithRetry(service.id, service.inputs, service.timeout || 30000, service.retry || defaultRetryConfig))
  )

  return results.map((result, index) => ({
    serviceId: services[index].id,
    status: result.status,
    data: result.status === 'fulfilled' ? result.value : null,
    error: result.status === 'rejected' ? result.reason : null,
  }))
}

Performance Optimization

Batch Processing

import $, { db } from 'sdk.do'

export const batchProcessor = async (items: any[], batchSize: number, serviceId: string) => {
  const batches = []

  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize))
  }

  // Process batches in parallel
  const results = await Promise.all(
    batches.map(async (batch, index) => {
      const execution = await $.ServiceExecution.start({
        serviceId,
        inputs: { batch, batchIndex: index },
      })

      return await execution.waitForCompletion()
    })
  )

  // Flatten results
  return results.flatMap((r) => r.outputs.processedItems)
}

Resource Pooling

import $, { db } from 'sdk.do'

class ServicePool {
  private maxConcurrent: number
  private activeExecutions = 0
  private queue: Array<() => Promise<any>> = []

  constructor(maxConcurrent: number) {
    this.maxConcurrent = maxConcurrent
  }

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.activeExecutions >= this.maxConcurrent) {
      await new Promise<void>((resolve) => {
        this.queue.push(async () => {
          resolve()
          return Promise.resolve()
        })
      })
    }

    this.activeExecutions++

    try {
      return await fn()
    } finally {
      this.activeExecutions--

      if (this.queue.length > 0) {
        const next = this.queue.shift()
        if (next) next()
      }
    }
  }
}

// Usage
const pool = new ServicePool(10) // Max 10 concurrent executions

const results = await Promise.all(
  services.map((service) =>
    pool.execute(() =>
      $.ServiceExecution.start({
        serviceId: service.id,
        inputs: service.inputs,
      }).then((e) => e.waitForCompletion())
    )
  )
)

Monitoring and Observability

Execution Tracking

import $, { db, send } from 'sdk.do'

export const monitoredMultiService = async (
  services: Array<{ id: string; inputs: any }>,
  monitoringConfig: {
    enableMetrics: boolean
    enableLogging: boolean
    enableTracing: boolean
  }
) => {
  const executionId = generateId()
  const startTime = Date.now()

  // Log start
  if (monitoringConfig.enableLogging) {
    await send($.Log.create, {
      level: 'info',
      message: 'Multi-service execution started',
      metadata: {
        executionId,
        serviceCount: services.length,
      },
    })
  }

  // Execute services
  const results = await Promise.allSettled(
    services.map(async (service, index) => {
      const serviceStartTime = Date.now()

      try {
        const execution = await $.ServiceExecution.start({
          serviceId: service.id,
          inputs: service.inputs,
          metadata: {
            parentExecutionId: executionId,
            serviceIndex: index,
          },
        })

        const result = await execution.waitForCompletion()

        // Emit metrics
        if (monitoringConfig.enableMetrics) {
          await send($.Metric.record, {
            name: 'service.execution.success',
            value: 1,
            tags: {
              serviceId: service.id,
              executionId,
              duration: Date.now() - serviceStartTime,
            },
          })
        }

        return result
      } catch (error) {
        // Emit error metric
        if (monitoringConfig.enableMetrics) {
          await send($.Metric.record, {
            name: 'service.execution.failure',
            value: 1,
            tags: {
              serviceId: service.id,
              executionId,
              error: (error as Error).message,
            },
          })
        }

        throw error
      }
    })
  )

  const duration = Date.now() - startTime

  // Log completion
  if (monitoringConfig.enableLogging) {
    await send($.Log.create, {
      level: 'info',
      message: 'Multi-service execution completed',
      metadata: {
        executionId,
        duration,
        successCount: results.filter((r) => r.status === 'fulfilled').length,
        failureCount: results.filter((r) => r.status === 'rejected').length,
      },
    })
  }

  return {
    executionId,
    duration,
    results,
  }
}

function generateId(): string {
  return `exec_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`
}

Error Handling Strategies

Circuit Breaker Pattern

class CircuitBreaker {
  private failures = 0
  private lastFailureTime: number | null = null
  private state: 'closed' | 'open' | 'half-open' = 'closed'

  constructor(
    private threshold: number,
    private timeout: number,
    private resetTimeout: number
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (Date.now() - (this.lastFailureTime || 0) > this.resetTimeout) {
        this.state = 'half-open'
      } else {
        throw new Error('Circuit breaker is OPEN')
      }
    }

    try {
      const result = await Promise.race([fn(), new Promise<never>((_, reject) => setTimeout(() => reject(new Error('Timeout')), this.timeout))])

      if (this.state === 'half-open') {
        this.state = 'closed'
        this.failures = 0
      }

      return result
    } catch (error) {
      this.failures++
      this.lastFailureTime = Date.now()

      if (this.failures >= this.threshold) {
        this.state = 'open'
      }

      throw error
    }
  }
}

// Usage with multi-service
const breakers = new Map<string, CircuitBreaker>()

export const multiServiceWithCircuitBreaker = async (services: Array<{ id: string; inputs: any }>) => {
  const results = await Promise.allSettled(
    services.map(async (service) => {
      if (!breakers.has(service.id)) {
        breakers.set(
          service.id,
          new CircuitBreaker(5, 30000, 60000) // 5 failures, 30s timeout, 60s reset
        )
      }

      const breaker = breakers.get(service.id)!

      return breaker.execute(async () => {
        const execution = await $.ServiceExecution.start({
          serviceId: service.id,
          inputs: service.inputs,
        })

        return await execution.waitForCompletion()
      })
    })
  )

  return results
}

Testing Multi-Service Coordination

import { describe, it, expect, vi } from 'vitest'
import $, { db } from 'sdk.do'

describe('Multi-Service Coordination', () => {
  it('should execute all services in parallel', async () => {
    const mockServices = [
      { id: 'service-1', inputs: { data: 'A' } },
      { id: 'service-2', inputs: { data: 'B' } },
      { id: 'service-3', inputs: { data: 'C' } },
    ]

    const startTime = Date.now()

    const results = await Promise.all(
      mockServices.map((service) =>
        $.ServiceExecution.start({
          serviceId: service.id,
          inputs: service.inputs,
        }).then((e) => e.waitForCompletion())
      )
    )

    const duration = Date.now() - startTime

    expect(results).toHaveLength(3)
    expect(duration).toBeLessThan(5000) // Should be faster than sequential
  })

  it('should handle partial failures gracefully', async () => {
    const mockServices = [
      { id: 'working-service', inputs: {} },
      { id: 'failing-service', inputs: {} },
      { id: 'working-service-2', inputs: {} },
    ]

    const results = await Promise.allSettled(
      mockServices.map((service) =>
        $.ServiceExecution.start({
          serviceId: service.id,
          inputs: service.inputs,
        }).then((e) => e.waitForCompletion())
      )
    )

    const successful = results.filter((r) => r.status === 'fulfilled')
    const failed = results.filter((r) => r.status === 'rejected')

    expect(successful.length).toBeGreaterThan(0)
    expect(failed.length).toBeGreaterThan(0)
  })

  it('should aggregate results correctly', async () => {
    const results = await Promise.all([Promise.resolve({ score: 85 }), Promise.resolve({ score: 92 }), Promise.resolve({ score: 78 })])

    const avgScore = results.reduce((sum, r) => sum + r.score, 0) / results.length

    expect(avgScore).toBe(85)
  })
})

Best Practices

  1. Service Independence: Ensure services can execute independently without shared state
  2. Timeout Configuration: Set appropriate timeouts for each service
  3. Error Handling: Use Promise.allSettled for graceful degradation
  4. Result Aggregation: Define clear strategies for combining results
  5. Cost Management: Track and optimize costs across all services
  6. Monitoring: Implement comprehensive observability
  7. Caching: Cache results to avoid redundant executions
  8. Rate Limiting: Respect service rate limits and quotas
  9. Resource Pooling: Limit concurrent executions to prevent overload
  10. Testing: Test with various failure scenarios

Additional Resources