Multi-Service Coordination
Comprehensive guide to coordinating multiple services in parallel for complex workflows, including service dependencies, error handling, and result aggregation patterns
Execute multiple independent services in parallel to achieve complex business goals faster and more efficiently than sequential execution.
Overview
Multi-service coordination enables you to run several services simultaneously, aggregate their results, and handle failures gracefully. This pattern is essential for building high-performance, composite services that leverage multiple data sources or processing capabilities.
When to Use Multi-Service Coordination
Use this pattern when:
- Independent Operations: Services can execute without dependencies on each other's results
- Data Aggregation: You need to combine information from multiple sources
- Performance Critical: Sequential execution would be too slow
- Composite Services: Building higher-level services from existing primitives
- Parallel Processing: Tasks can benefit from concurrent execution
- Fault Tolerance: System should continue with partial results if some services fail
Avoid this pattern when:
- Services have strict ordering requirements
- One service's output is required for another's input
- Sequential execution is required for business logic
- Coordination overhead exceeds parallelization benefits
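The tradeoff above can be sketched with plain promises: three independent 200 ms lookups take roughly 600 ms sequentially but only about 200 ms in parallel, and Promise.allSettled preserves partial results when one fails. The service names and delays here are illustrative, not real sdk.do services:

```typescript
// Hypothetical service call: resolves after a fixed delay
const fakeService = (name: string, ms: number) => new Promise<string>((resolve) => setTimeout(() => resolve(`${name}:done`), ms))

async function runSequential(): Promise<number> {
  const start = Date.now()
  await fakeService('email-validator', 200)
  await fakeService('company-researcher', 200)
  await fakeService('news-aggregator', 200)
  return Date.now() - start
}

async function runParallel(): Promise<number> {
  const start = Date.now()
  // allSettled (rather than all) keeps partial results if one lookup fails
  await Promise.allSettled([
    fakeService('email-validator', 200),
    fakeService('company-researcher', 200),
    fakeService('news-aggregator', 200),
  ])
  return Date.now() - start
}

async function main() {
  const sequential = await runSequential()
  const parallel = await runParallel()
  console.log(`sequential=${sequential}ms parallel=${parallel}ms`)
}

main()
```

The coordination overhead caveat applies here too: for three 2 ms calls, the bookkeeping can cost more than the parallelism saves.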
Architecture
Core Implementation
Basic Parallel Execution
import $, { db, on, send } from 'sdk.do'
// Basic multi-service coordinator
export const createMultiServiceCoordinator = async (config: {
name: string
description: string
services: Array<{
id: string
inputs: (parentInputs: any) => any
timeout?: number
}>
aggregator: (results: any[]) => any
}) => {
return await $.Service.create({
name: config.name,
description: config.description,
type: $.ServiceType.Orchestration,
workflow: async (inputs) => {
const startTime = Date.now()
// Execute all services in parallel
const servicePromises = config.services.map(async (service) => {
const serviceStart = Date.now()
const serviceInputs = service.inputs(inputs)
try {
const execution = await $.ServiceExecution.start({
serviceId: service.id,
inputs: serviceInputs,
timeout: service.timeout || 30000,
})
const result = await execution.waitForCompletion()
return {
serviceId: service.id,
status: 'success',
data: result.outputs,
duration: result.duration,
}
} catch (error) {
return {
serviceId: service.id,
status: 'failed',
error: (error as Error).message,
duration: Date.now() - serviceStart,
}
}
})
// Wait for all services to complete
const results = await Promise.all(servicePromises)
// Aggregate results
const successfulResults = results.filter((r) => r.status === 'success')
const failedResults = results.filter((r) => r.status === 'failed')
const aggregatedData = config.aggregator(successfulResults.map((r) => r.data))
return {
data: aggregatedData,
metadata: {
totalDuration: Date.now() - startTime,
servicesExecuted: results.length,
successCount: successfulResults.length,
failureCount: failedResults.length,
failures: failedResults,
completeness: successfulResults.length / results.length,
},
}
},
})
}
Example 1: Lead Intelligence Aggregator
Combine multiple data sources to create comprehensive lead profiles.
import $, { db, on, send, ai } from 'sdk.do'
export const leadIntelligenceAggregator = await $.Service.create({
name: 'Lead Intelligence Aggregator',
description: 'Aggregate data from multiple sources to create comprehensive lead profiles',
type: $.ServiceType.DataEnrichment,
pricing: {
model: 'per-execution',
basePrice: 0.5,
},
workflow: async (inputs: { email: string; name?: string; company?: string; phone?: string }) => {
const startTime = Date.now()
// Execute all enrichment services in parallel
const [emailValidation, emailReputation, socialProfiles, professionalProfile, companyData, techStack, newsArticles, contactEnrichment] =
await Promise.allSettled([
// Service 1: Email validation
$.ServiceExecution.start({
serviceId: 'email-validator',
inputs: { email: inputs.email },
}).then((e) => e.waitForCompletion()),
// Service 2: Email reputation check
$.ServiceExecution.start({
serviceId: 'email-reputation-checker',
inputs: { email: inputs.email },
}).then((e) => e.waitForCompletion()),
// Service 3: Social media profiles
$.ServiceExecution.start({
serviceId: 'social-profile-finder',
inputs: {
email: inputs.email,
name: inputs.name,
},
}).then((e) => e.waitForCompletion()),
// Service 4: Professional profile (LinkedIn)
$.ServiceExecution.start({
serviceId: 'linkedin-enrichment',
inputs: {
email: inputs.email,
name: inputs.name,
},
}).then((e) => e.waitForCompletion()),
// Service 5: Company information
$.ServiceExecution.start({
serviceId: 'company-researcher',
inputs: {
company: inputs.company || 'unknown',
email: inputs.email,
},
}).then((e) => e.waitForCompletion()),
// Service 6: Technology stack detection
$.ServiceExecution.start({
serviceId: 'tech-stack-analyzer',
inputs: {
domain: inputs.company ? extractDomain(inputs.company) : null,
},
}).then((e) => e.waitForCompletion()),
// Service 7: Recent news and press releases
$.ServiceExecution.start({
serviceId: 'news-aggregator',
inputs: {
company: inputs.company,
person: inputs.name,
limit: 5,
},
}).then((e) => e.waitForCompletion()),
// Service 8: Additional contact info
$.ServiceExecution.start({
serviceId: 'contact-finder',
inputs: {
email: inputs.email,
phone: inputs.phone,
},
}).then((e) => e.waitForCompletion()),
])
// Extract successful results
const extractResult = (result: PromiseSettledResult<any>) => {
if (result.status === 'fulfilled') {
return result.value.outputs
}
return null
}
const emailData = extractResult(emailValidation)
const reputationData = extractResult(emailReputation)
const socialData = extractResult(socialProfiles)
const professionalData = extractResult(professionalProfile)
const companyInfo = extractResult(companyData)
const techData = extractResult(techStack)
const newsData = extractResult(newsArticles)
const contactData = extractResult(contactEnrichment)
// Calculate completeness score
const dataPoints = [
emailData?.isValid,
reputationData?.score,
socialData?.profiles?.length > 0,
professionalData?.profile,
companyInfo?.name,
techData?.technologies?.length > 0,
newsData?.articles?.length > 0,
contactData?.additionalEmails?.length > 0,
]
const completeness = dataPoints.filter(Boolean).length / dataPoints.length
// Aggregate into comprehensive profile
const profile = {
contact: {
email: inputs.email,
isValidEmail: emailData?.isValid || false,
emailReputation: reputationData?.score || 'unknown',
phone: contactData?.phone || inputs.phone,
alternateEmails: contactData?.additionalEmails || [],
social: {
linkedin: socialData?.profiles?.linkedin,
twitter: socialData?.profiles?.twitter,
github: socialData?.profiles?.github,
facebook: socialData?.profiles?.facebook,
},
},
professional: {
name: inputs.name || professionalData?.profile?.name,
title: professionalData?.profile?.title,
company: inputs.company || professionalData?.profile?.company || companyInfo?.name,
experience: professionalData?.profile?.experience || [],
education: professionalData?.profile?.education || [],
skills: professionalData?.profile?.skills || [],
},
company: companyInfo
? {
name: companyInfo.name,
domain: companyInfo.domain,
industry: companyInfo.industry,
size: companyInfo.employeeCount,
revenue: companyInfo.revenue,
founded: companyInfo.founded,
location: companyInfo.headquarters,
description: companyInfo.description,
technologies: techData?.technologies || [],
techCategories: techData?.categories || [],
}
: null,
signals: {
recentNews: newsData?.articles || [],
buyingSignals: analyzeSignals(newsData?.articles, companyInfo),
engagementScore: calculateEngagementScore({
socialPresence: socialData,
professionalActivity: professionalData,
companyGrowth: companyInfo,
}),
},
metadata: {
completeness,
dataQuality: calculateDataQuality([emailData, reputationData, socialData, professionalData, companyInfo, techData, newsData, contactData]),
enrichedAt: new Date().toISOString(),
duration: Date.now() - startTime,
sources: {
emailValidation: !!emailData,
reputation: !!reputationData,
social: !!socialData,
professional: !!professionalData,
company: !!companyInfo,
technology: !!techData,
news: !!newsData,
contacts: !!contactData,
},
},
}
// Generate AI insights
const insights = await ai.generate({
model: 'gpt-5',
prompt: `Analyze this lead profile and provide:
1. Key insights about the lead
2. Recommended engagement approach
3. Potential pain points
4. Best time to reach out
5. Personalization suggestions
Profile: ${JSON.stringify(profile, null, 2)}`,
maxTokens: 1000,
})
return {
profile,
insights: insights.text,
readyForOutreach: completeness >= 0.6 && emailData?.isValid,
}
},
})
// Helper functions
function extractDomain(companyOrEmail: string): string {
if (companyOrEmail.includes('@')) {
return companyOrEmail.split('@')[1]
}
return companyOrEmail.toLowerCase().replace(/\s+/g, '') + '.com'
}
function analyzeSignals(articles: any[], companyInfo: any): string[] {
const signals: string[] = []
if (articles?.some((a: any) => a.title.includes('funding') || a.title.includes('raised'))) {
signals.push('recent-funding')
}
if (articles?.some((a: any) => a.title.includes('hiring') || a.title.includes('expansion'))) {
signals.push('rapid-growth')
}
if (articles?.some((a: any) => a.title.includes('launch') || a.title.includes('release'))) {
signals.push('new-product')
}
if (companyInfo?.employeeCount > companyInfo?.lastYearEmployeeCount * 1.2) {
signals.push('aggressive-hiring')
}
return signals
}
function calculateEngagementScore(data: any): number {
let score = 0
if (data.socialPresence?.profiles) {
score += Object.keys(data.socialPresence.profiles).length * 10
}
if (data.professionalActivity?.profile?.skills?.length > 10) {
score += 20
}
if (data.companyGrowth?.growthRate > 0.2) {
score += 30
}
return Math.min(score, 100)
}
function calculateDataQuality(results: any[]): number {
const validResults = results.filter((r) => r !== null && Object.keys(r).length > 0)
return validResults.length / results.length
}
Example 2: Content Marketing Suite
Execute multiple content generation and distribution services in parallel.
import $, { db, on, send, ai } from 'sdk.do'
export const contentMarketingSuite = await $.Service.create({
name: 'Content Marketing Suite',
description: 'Generate and distribute content across multiple channels simultaneously',
type: $.ServiceType.ContentGeneration,
pricing: {
model: 'per-execution',
basePrice: 5.0,
},
workflow: async (inputs: { topic: string; targetAudience: string; tone: string; keywords: string[]; channels: string[] }) => {
const startTime = Date.now()
// Phase 1: Research (parallel)
const [topicResearch, competitorAnalysis, trendAnalysis, keywordResearch] = await Promise.all([
$.ServiceExecution.start({
serviceId: 'topic-researcher',
inputs: {
topic: inputs.topic,
depth: 'comprehensive',
},
}).then((e) => e.waitForCompletion()),
$.ServiceExecution.start({
serviceId: 'competitor-analyzer',
inputs: {
topic: inputs.topic,
limit: 10,
},
}).then((e) => e.waitForCompletion()),
$.ServiceExecution.start({
serviceId: 'trend-analyzer',
inputs: {
topic: inputs.topic,
timeframe: '30d',
},
}).then((e) => e.waitForCompletion()),
$.ServiceExecution.start({
serviceId: 'keyword-researcher',
inputs: {
keywords: inputs.keywords,
intent: 'informational',
},
}).then((e) => e.waitForCompletion()),
])
// Phase 2: Content generation (parallel)
const contentPromises = inputs.channels.map(async (channel) => {
const channelConfig = getChannelConfig(channel)
const content = await $.ServiceExecution.start({
serviceId: 'content-generator',
inputs: {
topic: inputs.topic,
research: topicResearch.outputs.findings,
competitorInsights: competitorAnalysis.outputs.insights,
trends: trendAnalysis.outputs.trends,
keywords: keywordResearch.outputs.keywords,
format: channelConfig.format,
length: channelConfig.length,
tone: inputs.tone,
audience: inputs.targetAudience,
},
}).then((e) => e.waitForCompletion())
return {
channel,
content: content.outputs,
}
})
const contents = await Promise.all(contentPromises)
// Phase 3: Enhancement (parallel)
const enhancementPromises = contents.map(async ({ channel, content }) => {
const [seoOptimized, images, hashtags, metadata] = await Promise.allSettled([
// SEO optimization
$.ServiceExecution.start({
serviceId: 'seo-optimizer',
inputs: {
content: content.text,
keywords: keywordResearch.outputs.keywords,
channel,
},
}).then((e) => e.waitForCompletion()),
// Image generation
$.ServiceExecution.start({
serviceId: 'image-generator',
inputs: {
content: content.text,
style: getChannelConfig(channel).imageStyle,
count: getChannelConfig(channel).imageCount,
},
}).then((e) => e.waitForCompletion()),
// Hashtag generation
$.ServiceExecution.start({
serviceId: 'hashtag-generator',
inputs: {
content: content.text,
platform: channel,
count: 10,
},
}).then((e) => e.waitForCompletion()),
// Metadata generation
$.ServiceExecution.start({
serviceId: 'metadata-generator',
inputs: {
content: content.text,
channel,
},
}).then((e) => e.waitForCompletion()),
])
return {
channel,
content: {
...content,
optimized: seoOptimized.status === 'fulfilled' ? seoOptimized.value.outputs : content,
images: images.status === 'fulfilled' ? images.value.outputs.urls : [],
hashtags: hashtags.status === 'fulfilled' ? hashtags.value.outputs.hashtags : [],
metadata: metadata.status === 'fulfilled' ? metadata.value.outputs : {},
},
}
})
const enhancedContents = await Promise.all(enhancementPromises)
// Phase 4: Scheduling (parallel)
const schedulingPromises = enhancedContents.map(async ({ channel, content }) => {
const optimalTime = await $.ServiceExecution.start({
serviceId: 'optimal-time-finder',
inputs: {
channel,
audience: inputs.targetAudience,
},
}).then((e) => e.waitForCompletion())
const scheduled = await $.ServiceExecution.start({
serviceId: 'content-scheduler',
inputs: {
channel,
content: content.optimized.text,
images: content.images,
hashtags: content.hashtags,
metadata: content.metadata,
scheduledTime: optimalTime.outputs.time,
},
}).then((e) => e.waitForCompletion())
return {
channel,
scheduledAt: scheduled.outputs.scheduledAt,
postId: scheduled.outputs.postId,
}
})
const scheduled = await Promise.all(schedulingPromises)
return {
research: {
topic: topicResearch.outputs,
competitors: competitorAnalysis.outputs,
trends: trendAnalysis.outputs,
keywords: keywordResearch.outputs,
},
content: enhancedContents.map(({ channel, content }) => ({
channel,
text: content.optimized.text,
images: content.images,
hashtags: content.hashtags,
metadata: content.metadata,
})),
scheduled,
metadata: {
totalDuration: Date.now() - startTime,
channelsProcessed: inputs.channels.length,
contentPieces: enhancedContents.length,
researchSources: 4,
generatedAt: new Date().toISOString(),
},
}
},
})
function getChannelConfig(channel: string) {
const configs: Record<string, any> = {
blog: {
format: 'long-form',
length: 2000,
imageStyle: 'professional',
imageCount: 3,
},
twitter: {
format: 'micro',
length: 280,
imageStyle: 'engaging',
imageCount: 1,
},
linkedin: {
format: 'professional',
length: 1300,
imageStyle: 'business',
imageCount: 2,
},
instagram: {
format: 'visual-first',
length: 2200,
imageStyle: 'lifestyle',
imageCount: 5,
},
facebook: {
format: 'conversational',
length: 500,
imageStyle: 'friendly',
imageCount: 2,
},
}
return configs[channel] || configs['blog']
}
Example 3: Data Processing Pipeline
Process large datasets through multiple transformation services.
import $, { db, on, send } from 'sdk.do'
export const dataProcessingPipeline = await $.Service.create({
name: 'Parallel Data Processing Pipeline',
description: 'Process data through multiple parallel transformation and enrichment services',
type: $.ServiceType.DataProcessing,
pricing: {
model: 'per-record',
pricePerUnit: 0.01,
},
workflow: async (inputs: { dataset: string; transformations: string[]; validations: string[]; enrichments: string[] }) => {
const startTime = Date.now()
// Load dataset
const data = await db.query($.Dataset, {
where: { id: inputs.dataset },
})
const records = data.records
// Process records in batches
const batchSize = 100
const batches = []
for (let i = 0; i < records.length; i += batchSize) {
batches.push(records.slice(i, i + batchSize))
}
// Process batches in parallel
const processedBatches = await Promise.all(
batches.map(async (batch, batchIndex) => {
// For each record in batch, run all services in parallel
const processedRecords = await Promise.all(
batch.map(async (record) => {
const [transformations, validations, enrichments] = await Promise.all([
// Transformations
Promise.all(
inputs.transformations.map((transformationId) =>
$.ServiceExecution.start({
serviceId: transformationId,
inputs: { record },
}).then((e) => e.waitForCompletion())
)
),
// Validations
Promise.all(
inputs.validations.map((validationId) =>
$.ServiceExecution.start({
serviceId: validationId,
inputs: { record },
}).then((e) => e.waitForCompletion())
)
),
// Enrichments
Promise.all(
inputs.enrichments.map((enrichmentId) =>
$.ServiceExecution.start({
serviceId: enrichmentId,
inputs: { record },
}).then((e) => e.waitForCompletion())
)
),
])
// Merge all results
let processedRecord = { ...record }
// Apply transformations
for (const transformation of transformations) {
processedRecord = {
...processedRecord,
...transformation.outputs.record,
}
}
// Add validation results
const validationResults = validations.map((v) => ({
validator: v.serviceId,
isValid: v.outputs.isValid,
errors: v.outputs.errors || [],
}))
processedRecord.validations = validationResults
processedRecord.isValid = validationResults.every((v) => v.isValid)
// Add enrichments
for (const enrichment of enrichments) {
processedRecord = {
...processedRecord,
...enrichment.outputs.enrichedData,
}
}
return processedRecord
})
)
return {
batchIndex,
records: processedRecords,
processedAt: new Date().toISOString(),
}
})
)
// Flatten results
const allProcessedRecords = processedBatches.flatMap((b) => b.records)
// Calculate statistics
const stats = {
totalRecords: records.length,
validRecords: allProcessedRecords.filter((r) => r.isValid).length,
invalidRecords: allProcessedRecords.filter((r) => !r.isValid).length,
transformationsApplied: inputs.transformations.length,
validationsRun: inputs.validations.length,
enrichmentsAdded: inputs.enrichments.length,
batchesProcessed: batches.length,
totalDuration: Date.now() - startTime,
recordsPerSecond: records.length / ((Date.now() - startTime) / 1000),
}
// Store results
const resultDataset = await db.create($.Dataset, {
name: `${data.name} - Processed`,
records: allProcessedRecords,
metadata: {
sourceDataset: inputs.dataset,
processedAt: new Date().toISOString(),
statistics: stats,
},
})
return {
datasetId: resultDataset.id,
statistics: stats,
sampleRecords: allProcessedRecords.slice(0, 10),
}
},
})
Advanced Patterns
Pattern 1: Graceful Degradation
Continue operation even when some services fail.
import $, { db } from 'sdk.do'
export const gracefulDegradationPattern = async (inputs: any) => {
// Execute services with different priority levels
const [critical, important, optional] = await Promise.allSettled([
// Critical services - must succeed
Promise.all([executeService('critical-service-1', inputs), executeService('critical-service-2', inputs)]),
// Important services - should succeed
Promise.all([executeService('important-service-1', inputs), executeService('important-service-2', inputs), executeService('important-service-3', inputs)]),
// Optional services - nice to have
Promise.all([
executeService('optional-service-1', inputs),
executeService('optional-service-2', inputs),
executeService('optional-service-3', inputs),
executeService('optional-service-4', inputs),
]),
])
// Check critical services
if (critical.status === 'rejected') {
throw new Error('Critical services failed - cannot continue')
}
// Warn about important service failures
if (important.status === 'rejected') {
console.warn('Some important services failed:', important.reason)
}
// Aggregate all successful results
const results = {
critical: critical.status === 'fulfilled' ? critical.value : [],
important: important.status === 'fulfilled' ? important.value : [],
optional: optional.status === 'fulfilled' ? optional.value : [],
}
return {
data: aggregateResults(results),
completeness: calculateCompleteness(results),
degraded: important.status === 'rejected' || optional.status === 'rejected',
}
}
async function executeService(serviceId: string, inputs: any) {
const execution = await $.ServiceExecution.start({
serviceId,
inputs,
timeout: 10000,
})
return await execution.waitForCompletion()
}
function calculateCompleteness(results: any): number {
const total = results.critical.length + results.important.length + results.optional.length
const expected = 2 + 3 + 4 // Based on service counts
return total / expected
}
function aggregateResults(results: any): any {
return {
...results.critical.reduce((acc: any, r: any) => ({ ...acc, ...r.outputs }), {}),
...results.important.reduce((acc: any, r: any) => ({ ...acc, ...r.outputs }), {}),
...results.optional.reduce((acc: any, r: any) => ({ ...acc, ...r.outputs }), {}),
}
}
Pattern 2: Result Caching and Deduplication
Avoid redundant service executions.
import $, { db } from 'sdk.do'
class ServiceCache {
private cache = new Map<string, any>()
private getCacheKey(serviceId: string, inputs: any): string {
return `${serviceId}:${JSON.stringify(inputs)}`
}
async execute(serviceId: string, inputs: any): Promise<any> {
const cacheKey = this.getCacheKey(serviceId, inputs)
// Check cache
if (this.cache.has(cacheKey)) {
console.log(`Cache hit for ${serviceId}`)
return this.cache.get(cacheKey)
}
// Execute service
const execution = await $.ServiceExecution.start({
serviceId,
inputs,
})
const result = await execution.waitForCompletion()
// Cache result
this.cache.set(cacheKey, result)
return result
}
async executeMultiple(services: Array<{ id: string; inputs: any }>): Promise<any[]> {
// Deduplicate service calls
const uniqueServices = new Map<string, { id: string; inputs: any }>()
for (const service of services) {
const cacheKey = this.getCacheKey(service.id, service.inputs)
if (!uniqueServices.has(cacheKey)) {
uniqueServices.set(cacheKey, service)
}
}
// Execute unique services in parallel
const results = await Promise.all(Array.from(uniqueServices.values()).map((service) => this.execute(service.id, service.inputs)))
// Map results back to original service order
return services.map((service) => {
const cacheKey = this.getCacheKey(service.id, service.inputs)
const index = Array.from(uniqueServices.keys()).indexOf(cacheKey)
return results[index]
})
}
}
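One caveat with the key scheme above: JSON.stringify is property-order sensitive, so `{ a: 1, b: 2 }` and `{ b: 2, a: 1 }` produce different cache keys and miss each other. A sketch of an order-stable key (the stableStringify helper is illustrative, not part of sdk.do):

```typescript
// Serialize with object keys sorted so logically-equal inputs
// always produce the same cache key.
function stableStringify(value: any): string {
  if (value === null || typeof value !== 'object') return JSON.stringify(value)
  if (Array.isArray(value)) return `[${value.map(stableStringify).join(',')}]`
  const keys = Object.keys(value).sort()
  return `{${keys.map((k) => `${JSON.stringify(k)}:${stableStringify(value[k])}`).join(',')}}`
}

function stableCacheKey(serviceId: string, inputs: any): string {
  return `${serviceId}:${stableStringify(inputs)}`
}

// Both calls now deduplicate to the same key
console.log(stableCacheKey('service-1', { a: 1, b: 2 }))
console.log(stableCacheKey('service-1', { b: 2, a: 1 }))
```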
// Usage
const cache = new ServiceCache()
const results = await cache.executeMultiple([
{ id: 'service-1', inputs: { data: 'A' } },
{ id: 'service-2', inputs: { data: 'B' } },
{ id: 'service-1', inputs: { data: 'A' } }, // Deduplicated
{ id: 'service-3', inputs: { data: 'C' } },
{ id: 'service-2', inputs: { data: 'B' } }, // Deduplicated
])
Pattern 3: Dynamic Service Selection
Choose services at runtime based on requirements.
import $, { db, ai } from 'sdk.do'
export const dynamicServiceSelector = async (inputs: {
task: string
requirements: {
quality: 'low' | 'medium' | 'high' | 'premium'
speed: 'fast' | 'balanced' | 'thorough'
budget: 'low' | 'medium' | 'high' | 'unlimited'
}
}) => {
// Get available services for task
const availableServices = await db.query($.Service, {
where: {
capabilities: { contains: inputs.task },
},
})
// Score and rank services
const rankedServices = availableServices.map((service) => {
let score = 0
// Quality scoring
if (inputs.requirements.quality === 'premium' && service.qualityRating >= 4.5) {
score += 40
} else if (inputs.requirements.quality === 'high' && service.qualityRating >= 4.0) {
score += 30
} else if (inputs.requirements.quality === 'medium' && service.qualityRating >= 3.5) {
score += 20
} else if (inputs.requirements.quality === 'low') {
score += 10
}
// Speed scoring
if (inputs.requirements.speed === 'fast' && service.avgExecutionTime < 5000) {
score += 30
} else if (inputs.requirements.speed === 'balanced' && service.avgExecutionTime < 15000) {
score += 20
} else if (inputs.requirements.speed === 'thorough') {
score += 10
}
// Budget scoring
const costPerExecution = service.pricing.basePrice || 0
if (inputs.requirements.budget === 'low' && costPerExecution < 0.5) {
score += 30
} else if (inputs.requirements.budget === 'medium' && costPerExecution < 2.0) {
score += 20
} else if (inputs.requirements.budget === 'high' && costPerExecution < 10.0) {
score += 10
} else if (inputs.requirements.budget === 'unlimited') {
score += 5
}
return {
service,
score,
}
})
// Sort by score
rankedServices.sort((a, b) => b.score - a.score)
// Select top N services for parallel execution
const selectedServices = rankedServices.slice(0, 3)
// Execute selected services in parallel
const results = await Promise.all(
selectedServices.map(async ({ service }) => {
try {
const execution = await $.ServiceExecution.start({
serviceId: service.id,
inputs: inputs,
})
return {
serviceId: service.id,
serviceName: service.name,
result: await execution.waitForCompletion(),
score: selectedServices.find((s) => s.service.id === service.id)?.score,
}
} catch (error) {
return {
serviceId: service.id,
serviceName: service.name,
error: (error as Error).message,
score: selectedServices.find((s) => s.service.id === service.id)?.score,
}
}
})
)
// Select best result
const successfulResults = results.filter((r) => !r.error)
if (successfulResults.length === 0) {
throw new Error('All selected services failed')
}
// Use AI to pick best result
const bestResult = await ai.generate({
model: 'gpt-5',
prompt: `Analyze these service results and pick the best one based on quality, completeness, and relevance:
${successfulResults
.map(
(r, i) => `
Result ${i + 1} (from ${r.serviceName}, score: ${r.score}):
${JSON.stringify(r.result.outputs, null, 2)}
`
)
.join('\n')}
Return only the index number (0-${successfulResults.length - 1}) of the best result.`,
maxTokens: 10,
})
const parsedIndex = parseInt(bestResult.text.trim(), 10)
// Guard against a malformed or out-of-range model response
const bestIndex = Number.isNaN(parsedIndex) || parsedIndex < 0 || parsedIndex >= successfulResults.length ? 0 : parsedIndex
return {
result: successfulResults[bestIndex].result,
selectedService: successfulResults[bestIndex].serviceName,
alternativeResults: successfulResults.filter((_, i) => i !== bestIndex),
}
}
Pattern 4: Timeout and Retry Management
Handle timeouts and retries across multiple services.
import $, { db } from 'sdk.do'
interface RetryConfig {
maxAttempts: number
backoff: 'linear' | 'exponential'
initialDelay: number
maxDelay: number
}
async function executeWithRetry(serviceId: string, inputs: any, timeout: number, retryConfig: RetryConfig): Promise<any> {
let attempt = 0
let lastError: Error | null = null
while (attempt < retryConfig.maxAttempts) {
try {
const execution = await $.ServiceExecution.start({
serviceId,
inputs,
timeout,
})
return await execution.waitForCompletion()
} catch (error) {
lastError = error as Error
attempt++
if (attempt < retryConfig.maxAttempts) {
const delay = calculateDelay(attempt, retryConfig)
console.log(`Retry ${attempt}/${retryConfig.maxAttempts} for ${serviceId} after ${delay}ms`)
await new Promise((resolve) => setTimeout(resolve, delay))
}
}
}
throw new Error(`Service ${serviceId} failed after ${attempt} attempts: ${lastError?.message}`)
}
function calculateDelay(attempt: number, config: RetryConfig): number {
let delay: number
if (config.backoff === 'linear') {
delay = config.initialDelay * attempt
} else {
delay = config.initialDelay * Math.pow(2, attempt - 1)
}
return Math.min(delay, config.maxDelay)
}
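To make the backoff concrete, the snippet below replays calculateDelay with the default config used for retries (exponential backoff, 1 s initial delay, 10 s cap); the function and interface are restated so the example runs standalone:

```typescript
// Standalone restatement of the retry config and delay calculation
interface RetryConfig {
  maxAttempts: number
  backoff: 'linear' | 'exponential'
  initialDelay: number
  maxDelay: number
}

function calculateDelay(attempt: number, config: RetryConfig): number {
  let delay: number
  if (config.backoff === 'linear') {
    delay = config.initialDelay * attempt
  } else {
    delay = config.initialDelay * Math.pow(2, attempt - 1)
  }
  return Math.min(delay, config.maxDelay)
}

const config: RetryConfig = { maxAttempts: 5, backoff: 'exponential', initialDelay: 1000, maxDelay: 10000 }
const delays = [1, 2, 3, 4, 5].map((attempt) => calculateDelay(attempt, config))
console.log(delays) // 1000, 2000, 4000, 8000, 10000 — the fifth attempt is capped at maxDelay
```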
// Multi-service execution with retry
export const multiServiceWithRetry = async (
services: Array<{
id: string
inputs: any
timeout?: number
retry?: RetryConfig
}>
) => {
const defaultRetryConfig: RetryConfig = {
maxAttempts: 3,
backoff: 'exponential',
initialDelay: 1000,
maxDelay: 10000,
}
const results = await Promise.allSettled(
services.map((service) => executeWithRetry(service.id, service.inputs, service.timeout || 30000, service.retry || defaultRetryConfig))
)
return results.map((result, index) => ({
serviceId: services[index].id,
status: result.status,
data: result.status === 'fulfilled' ? result.value : null,
error: result.status === 'rejected' ? result.reason : null,
}))
}
Performance Optimization
Batch Processing
import $, { db } from 'sdk.do'
export const batchProcessor = async (items: any[], batchSize: number, serviceId: string) => {
const batches = []
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize))
}
// Process batches in parallel
const results = await Promise.all(
batches.map(async (batch, index) => {
const execution = await $.ServiceExecution.start({
serviceId,
inputs: { batch, batchIndex: index },
})
return await execution.waitForCompletion()
})
)
// Flatten results
return results.flatMap((r) => r.outputs.processedItems)
}
Resource Pooling
import $, { db } from 'sdk.do'
class ServicePool {
private maxConcurrent: number
private activeExecutions = 0
private queue: Array<() => void> = []
constructor(maxConcurrent: number) {
this.maxConcurrent = maxConcurrent
}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.activeExecutions >= this.maxConcurrent) {
// Wait until a finishing task hands its slot to this waiter
await new Promise<void>((resolve) => this.queue.push(resolve))
} else {
this.activeExecutions++
}
try {
return await fn()
} finally {
const next = this.queue.shift()
if (next) {
// Transfer the slot directly to the next queued task so the limit holds
next()
} else {
this.activeExecutions--
}
}
}
}
}
// Usage
const pool = new ServicePool(10) // Max 10 concurrent executions
const results = await Promise.all(
services.map((service) =>
pool.execute(() =>
$.ServiceExecution.start({
serviceId: service.id,
inputs: service.inputs,
}).then((e) => e.waitForCompletion())
)
)
)
Monitoring and Observability
Execution Tracking
import $, { db, send } from 'sdk.do'
export const monitoredMultiService = async (
services: Array<{ id: string; inputs: any }>,
monitoringConfig: {
enableMetrics: boolean
enableLogging: boolean
enableTracing: boolean
}
) => {
const executionId = generateId()
const startTime = Date.now()
// Log start
if (monitoringConfig.enableLogging) {
await send($.Log.create, {
level: 'info',
message: 'Multi-service execution started',
metadata: {
executionId,
serviceCount: services.length,
},
})
}
// Execute services
const results = await Promise.allSettled(
services.map(async (service, index) => {
const serviceStartTime = Date.now()
try {
const execution = await $.ServiceExecution.start({
serviceId: service.id,
inputs: service.inputs,
metadata: {
parentExecutionId: executionId,
serviceIndex: index,
},
})
const result = await execution.waitForCompletion()
// Emit metrics
if (monitoringConfig.enableMetrics) {
await send($.Metric.record, {
name: 'service.execution.success',
value: 1,
tags: {
serviceId: service.id,
executionId,
duration: Date.now() - serviceStartTime,
},
})
}
return result
} catch (error) {
// Emit error metric
if (monitoringConfig.enableMetrics) {
await send($.Metric.record, {
name: 'service.execution.failure',
value: 1,
tags: {
serviceId: service.id,
executionId,
error: (error as Error).message,
},
})
}
throw error
}
})
)
const duration = Date.now() - startTime
// Log completion
if (monitoringConfig.enableLogging) {
await send($.Log.create, {
level: 'info',
message: 'Multi-service execution completed',
metadata: {
executionId,
duration,
successCount: results.filter((r) => r.status === 'fulfilled').length,
failureCount: results.filter((r) => r.status === 'rejected').length,
},
})
}
return {
executionId,
duration,
results,
}
}
function generateId(): string {
return `exec_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`
}
Error Handling Strategies
Circuit Breaker Pattern
class CircuitBreaker {
private failures = 0
private lastFailureTime: number | null = null
private state: 'closed' | 'open' | 'half-open' = 'closed'
constructor(
private threshold: number,
private timeout: number,
private resetTimeout: number
) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === 'open') {
if (Date.now() - (this.lastFailureTime || 0) > this.resetTimeout) {
this.state = 'half-open'
} else {
throw new Error('Circuit breaker is OPEN')
}
}
try {
const result = await Promise.race([fn(), new Promise<never>((_, reject) => setTimeout(() => reject(new Error('Timeout')), this.timeout))])
if (this.state === 'half-open') {
this.state = 'closed'
this.failures = 0
}
return result
} catch (error) {
this.failures++
this.lastFailureTime = Date.now()
// A failed probe while half-open reopens the circuit immediately
if (this.state === 'half-open' || this.failures >= this.threshold) {
this.state = 'open'
}
throw error
}
}
}
// Usage with multi-service
const breakers = new Map<string, CircuitBreaker>()
export const multiServiceWithCircuitBreaker = async (services: Array<{ id: string; inputs: any }>) => {
const results = await Promise.allSettled(
services.map(async (service) => {
if (!breakers.has(service.id)) {
breakers.set(
service.id,
new CircuitBreaker(5, 30000, 60000) // 5 failures, 30s timeout, 60s reset
)
}
const breaker = breakers.get(service.id)!
return breaker.execute(async () => {
const execution = await $.ServiceExecution.start({
serviceId: service.id,
inputs: service.inputs,
})
return await execution.waitForCompletion()
})
})
)
return results
}
Testing Multi-Service Coordination
import { describe, it, expect, vi } from 'vitest'
import $, { db } from 'sdk.do'
describe('Multi-Service Coordination', () => {
it('should execute all services in parallel', async () => {
const mockServices = [
{ id: 'service-1', inputs: { data: 'A' } },
{ id: 'service-2', inputs: { data: 'B' } },
{ id: 'service-3', inputs: { data: 'C' } },
]
const startTime = Date.now()
const results = await Promise.all(
mockServices.map((service) =>
$.ServiceExecution.start({
serviceId: service.id,
inputs: service.inputs,
}).then((e) => e.waitForCompletion())
)
)
const duration = Date.now() - startTime
expect(results).toHaveLength(3)
expect(duration).toBeLessThan(5000) // Should be faster than sequential
})
it('should handle partial failures gracefully', async () => {
const mockServices = [
{ id: 'working-service', inputs: {} },
{ id: 'failing-service', inputs: {} },
{ id: 'working-service-2', inputs: {} },
]
const results = await Promise.allSettled(
mockServices.map((service) =>
$.ServiceExecution.start({
serviceId: service.id,
inputs: service.inputs,
}).then((e) => e.waitForCompletion())
)
)
const successful = results.filter((r) => r.status === 'fulfilled')
const failed = results.filter((r) => r.status === 'rejected')
expect(successful.length).toBeGreaterThan(0)
expect(failed.length).toBeGreaterThan(0)
})
it('should aggregate results correctly', async () => {
const results = await Promise.all([Promise.resolve({ score: 85 }), Promise.resolve({ score: 92 }), Promise.resolve({ score: 78 })])
const avgScore = results.reduce((sum, r) => sum + r.score, 0) / results.length
expect(avgScore).toBe(85)
})
})
Best Practices
- Service Independence: Ensure services can execute independently without shared state
- Timeout Configuration: Set appropriate timeouts for each service
- Error Handling: Use Promise.allSettled for graceful degradation
- Result Aggregation: Define clear strategies for combining results
- Cost Management: Track and optimize costs across all services
- Monitoring: Implement comprehensive observability
- Caching: Cache results to avoid redundant executions
- Rate Limiting: Respect service rate limits and quotas
- Resource Pooling: Limit concurrent executions to prevent overload
- Testing: Test with various failure scenarios
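The timeout practice above can be complemented client-side: wrap each call in a Promise.race guard so one slow service cannot stall the whole batch. withTimeout and slow below are illustrative helpers, not sdk.do APIs:

```typescript
// Hypothetical slow service call for demonstration
const slow = (ms: number) => new Promise<string>((resolve) => setTimeout(() => resolve('ok'), ms))

// Reject if the wrapped promise does not settle within ms milliseconds
function withTimeout<T>(promise: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms)
  })
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}

async function main() {
  // One fast call and one that exceeds its budget; allSettled keeps the partial result
  const results = await Promise.allSettled([
    withTimeout(slow(50), 500, 'fast-service'),
    withTimeout(slow(500), 100, 'slow-service'),
  ])
  console.log(results.map((r) => r.status)) // ['fulfilled', 'rejected']
}

main()
```

Clearing the timer in finally matters: without it, the losing timeout keeps the event loop alive until it fires.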
Related Patterns
- Service Chaining - Sequential service execution
- Event-Driven - Reactive service orchestration
- Saga Pattern - Distributed transaction management
Additional Resources
- Service Types - Understanding different service types
- Composition - Building composite services
- Best Practices - Service development guidelines
- API Reference - Complete API documentation