.do
Service Types › Data Services

Data Transformation Services

Build powerful data transformation services that convert, normalize, and cleanse data across formats and schemas

Data transformation services convert data from one format, structure, or representation to another. These services are essential for data integration, migration, and ensuring data quality across systems.

Overview

Data transformation is the backbone of modern data operations. Whether you're moving data between systems, preparing data for analysis, or ensuring consistency across platforms, transformation services automate and standardize these critical operations.

Key Capabilities

  • Format Conversion: Transform data between JSON, XML, CSV, YAML, Parquet, Avro, and more
  • Schema Mapping: Map fields from source to target schemas automatically
  • Data Normalization: Standardize formats, units, and representations
  • Data Cleansing: Remove duplicates, fix errors, and handle missing values
  • Aggregation: Combine, group, and summarize data
  • Enrichment Integration: Add calculated fields and derived values

Common Use Cases

  1. ETL Pipelines: Extract, transform, and load data between systems
  2. Data Migration: Move data from legacy to modern systems
  3. API Integration: Transform API responses to match internal schemas
  4. Report Generation: Convert raw data into formatted reports
  5. Data Standardization: Ensure consistent data across departments
  6. Real-time Processing: Transform streaming data on-the-fly

Building Your First Transformation Service

Let's start with a simple CSV to JSON converter:

import $, { db, on, send } from 'sdk.do'

// Register a paid CSV-to-JSON conversion service. The configuration is
// assembled from named parts but is identical to the original contract.
const csvToJsonInput = {
  required: ['csvData', 'schema'],
  optional: ['delimiter', 'hasHeaders', 'encoding'],
}

const csvToJsonOutput = {
  jsonData: 'array',
  recordCount: 'number',
  validationErrors: 'array',
}

const csvToJsonPricing = {
  model: 'per-record',
  rate: 0.001, // $0.001 per record
  minimumCharge: 0.1, // $0.10 minimum
}

const csvToJsonService = await $.Service.create({
  name: 'CSV to JSON Converter',
  description: 'Convert CSV files to JSON format with schema validation',
  type: $.ServiceType.DataTransformation,
  subtype: 'format-conversion',
  input: csvToJsonInput,
  output: csvToJsonOutput,
  pricing: csvToJsonPricing,
})

// Handle incoming requests for the CSV-to-JSON service: parse the CSV,
// apply per-field schema transformations, validate, deliver the result,
// and charge per record (honoring the service's minimum charge).
on.ServiceRequest.created(async (request) => {
  // Ignore requests addressed to other services.
  if (request.serviceId !== csvToJsonService.id) return

  const { csvData, schema, delimiter = ',', hasHeaders = true } = request.inputs

  try {
    // Split on LF or CRLF so Windows-style line endings don't leave a
    // stray '\r' on the last column of every row.
    const lines = csvData.trim().split(/\r?\n/)
    const headers = hasHeaders ? lines[0].split(delimiter) : schema.map((_, i) => `column_${i}`)
    const dataLines = hasHeaders ? lines.slice(1) : lines

    // Transform each CSV row into a keyed record.
    const jsonData = dataLines.map((line) => {
      const values = line.split(delimiter)
      const record: Record<string, any> = {}

      headers.forEach((header, index) => {
        const fieldName = header.trim()
        const fieldValue = values[index]?.trim()

        // Apply the schema's declared type when the field is described;
        // otherwise keep the raw (possibly undefined) string value.
        const fieldSchema = schema.find((s) => s.name === fieldName)
        if (fieldSchema) {
          record[fieldName] = transformValue(fieldValue, fieldSchema.type)
        } else {
          record[fieldName] = fieldValue
        }
      })

      return record
    })

    // Validate against schema; errors are reported, not treated as failure.
    const validationErrors = validateRecords(jsonData, schema)

    // Deliver results
    await send.ServiceResult.deliver({
      requestId: request.id,
      outputs: {
        jsonData,
        recordCount: jsonData.length,
        validationErrors,
      },
    })

    // Charge per record, with the configured minimum charge as a floor.
    const recordCount = jsonData.length
    const cost = Math.max(recordCount * csvToJsonService.pricing.rate, csvToJsonService.pricing.minimumCharge)

    await send.Payment.charge({
      customerId: request.customerId,
      amount: cost,
      description: `CSV to JSON conversion (${recordCount} records)`,
    })
  } catch (error) {
    // Parsing failures are deterministic: the same input fails again,
    // so the request is not retryable.
    await send.ServiceRequest.fail({
      requestId: request.id,
      error: error.message,
      retryable: false,
    })
  }
})

// Coerce a raw CSV cell (a string) into the type declared by the schema.
// Unknown types pass the value through unchanged.
function transformValue(value: string, type: string): any {
  // Short rows can produce undefined cells; pass them through rather than
  // crashing in the branches below (e.g. value.toLowerCase()).
  if (value == null) return value

  switch (type) {
    case 'number':
      return parseFloat(value) // NaN when the cell isn't numeric
    case 'integer':
      return parseInt(value, 10)
    case 'boolean':
      // Only the literal string 'true' (any case) maps to true.
      return value.toLowerCase() === 'true'
    case 'date':
      return new Date(value) // Invalid Date when unparseable
    default:
      return value
  }
}

// Check each record against the schema, collecting human-readable error
// strings. Does not throw; callers decide what to do with the errors.
function validateRecords(records: any[], schema: any[]): string[] {
  const errors: string[] = []

  records.forEach((record, index) => {
    schema.forEach((field) => {
      const value = record[field.name]

      // Compare against null/undefined/'' explicitly: a plain truthiness
      // test would wrongly report legitimate 0 and false values as missing.
      if (field.required && (value == null || value === '')) {
        errors.push(`Record ${index}: Missing required field ${field.name}`)
      }

      // Type-check every present value, including falsy ones like 0 and false.
      if (value != null && !isValidType(value, field.type)) {
        errors.push(`Record ${index}: Invalid type for ${field.name}`)
      }
    })
  })

  return errors
}

Advanced Format Conversion

Create a universal format converter that supports multiple formats:

// Register the multi-format conversion service. Both directions support the
// same format list, so it is declared once and spread into each side.
const universalFormats = ['json', 'xml', 'csv', 'yaml', 'toml', 'parquet', 'avro', 'protobuf']

const universalConverterService = await $.Service.create({
  name: 'Universal Data Converter',
  description: 'Convert between any supported data format',
  type: $.ServiceType.DataTransformation,
  subtype: 'universal-conversion',

  supportedFormats: {
    input: [...universalFormats],
    output: [...universalFormats],
  },

  input: {
    required: ['data', 'sourceFormat', 'targetFormat'],
    optional: ['sourceSchema', 'targetSchema', 'transformations', 'options'],
  },

  pricing: {
    model: 'per-mb',
    rate: 0.1, // $0.10 per MB
    complexityMultipliers: {
      simple: 1.0, // same structure (e.g. JSON to YAML)
      moderate: 1.5, // schema mapping required
      complex: 2.0, // custom transformations
    },
  },
})

// Handle universal conversion requests: parse -> transform -> map -> serialize,
// then bill by payload size with a complexity multiplier.
on.ServiceRequest.created(async (request) => {
  if (request.serviceId !== universalConverterService.id) return

  const { data, sourceFormat, targetFormat, sourceSchema, targetSchema, transformations, options } = request.inputs

  try {
    // Step 1: Parse source data
    const parsed = await parseFormat(data, sourceFormat, sourceSchema)

    // Step 2: Apply custom transformations in the order supplied
    let transformed = parsed
    if (transformations && transformations.length > 0) {
      for (const transform of transformations) {
        transformed = await applyTransformation(transformed, transform)
      }
    }

    // Step 3: Map to target schema if provided
    if (targetSchema) {
      transformed = await mapSchema(transformed, sourceSchema, targetSchema)
    }

    // Step 4: Convert to target format
    const converted = await convertFormat(transformed, targetFormat, options)

    // Classify the job so pricing can apply the right multiplier.
    const complexity = determineComplexity(sourceFormat, targetFormat, transformations, sourceSchema, targetSchema)

    // Deliver results. NOTE(review): sizes use string length, which only
    // approximates bytes for multi-byte characters — confirm acceptable.
    await send.ServiceResult.deliver({
      requestId: request.id,
      outputs: {
        data: converted,
        format: targetFormat,
        originalSize: data.length,
        convertedSize: converted.length,
        compressionRatio: converted.length / data.length,
        complexity,
      },
    })

    // Bill by size and complexity. Fall back to a 1.0 multiplier if the
    // classifier returns a tier with no configured multiplier, so the
    // charge can never become NaN.
    const sizeInMB = data.length / (1024 * 1024)
    const multiplier = universalConverterService.pricing.complexityMultipliers[complexity] ?? 1.0
    const cost = sizeInMB * universalConverterService.pricing.rate * multiplier

    await send.Payment.charge({
      customerId: request.customerId,
      amount: cost,
      description: `Data conversion: ${sourceFormat} to ${targetFormat} (${sizeInMB.toFixed(2)} MB, ${complexity})`,
    })
  } catch (error) {
    // Conversion failures may be transient, so allow the caller to retry.
    await send.ServiceRequest.fail({
      requestId: request.id,
      error: error.message,
      retryable: true,
    })
  }
})

async function parseFormat(data: string, format: string, schema?: any): Promise<any> {
  switch (format) {
    case 'json':
      return JSON.parse(data)
    case 'xml':
      return parseXML(data)
    case 'csv':
      return parseCSV(data, schema)
    case 'yaml':
      return parseYAML(data)
    case 'parquet':
      return parseParquet(data)
    case 'avro':
      return parseAvro(data, schema)
    default:
      throw new Error(`Unsupported source format: ${format}`)
  }
}

// Serialize parsed data into the requested output format, dispatching on
// the target format name. Unknown formats reject with the original error.
async function convertFormat(data: any, format: string, options?: any): Promise<string> {
  const serializers: Record<string, () => any> = {
    json: () => JSON.stringify(data, null, options?.indent || 2),
    xml: () => convertToXML(data, options),
    csv: () => convertToCSV(data, options),
    yaml: () => convertToYAML(data),
    parquet: () => convertToParquet(data),
    avro: () => convertToAvro(data, options?.schema),
  }

  const serialize = serializers[format]
  if (!serialize) {
    throw new Error(`Unsupported target format: ${format}`)
  }
  return serialize()
}

Data Normalization Service

Standardize data formats, units, and representations:

// Register the normalization service; the supported normalization catalog
// is declared separately for readability. Same contract as before.
const supportedNormalizations = {
  dates: ['ISO-8601', 'unix-timestamp', 'relative'],
  addresses: ['standard', 'postal', 'geographic'],
  phoneNumbers: ['E.164', 'national', 'international'],
  currencies: ['ISO-4217', 'decimal'],
  names: ['titlecase', 'uppercase', 'normalized'],
  units: ['metric', 'imperial', 'standard'],
}

const normalizationService = await $.Service.create({
  name: 'Data Normalizer',
  description: 'Standardize data formats, units, and representations',
  type: $.ServiceType.DataTransformation,
  subtype: 'normalization',
  normalizations: supportedNormalizations,
  pricing: {
    model: 'per-field',
    rate: 0.0001, // $0.0001 per normalized field
  },
})

// Handle normalization requests. Accepts a single record or an array and
// charges per field actually normalized.
on.ServiceRequest.created(async (request) => {
  if (request.serviceId !== normalizationService.id) return

  const { data, normalizations, rules } = request.inputs
  let fieldCount = 0

  try {
    // Work on an array internally; the original shape is restored on reply.
    const normalized = Array.isArray(data) ? data : [data]

    const results = normalized.map((record) => {
      const normalizedRecord = { ...record }

      for (const [field, normType] of Object.entries(normalizations)) {
        // Use a nullish check (not truthiness) so legitimate falsy values
        // such as 0, false, and '' are still normalized rather than skipped.
        if (normalizedRecord[field] != null) {
          normalizedRecord[field] = normalizeField(normalizedRecord[field], normType, rules)
          fieldCount++
        }
      }

      return normalizedRecord
    })

    // Deliver results, unwrapping single-record input back to an object.
    await send.ServiceResult.deliver({
      requestId: request.id,
      outputs: {
        data: Array.isArray(data) ? results : results[0],
        recordCount: results.length,
        fieldsNormalized: fieldCount,
      },
    })

    // Charge based on the number of fields that were normalized.
    const cost = fieldCount * normalizationService.pricing.rate

    await send.Payment.charge({
      customerId: request.customerId,
      amount: cost,
      description: `Data normalization (${fieldCount} fields)`,
    })
  } catch (error) {
    await send.ServiceRequest.fail({
      requestId: request.id,
      error: error.message,
      retryable: true,
    })
  }
})

// Normalize a single field value according to the requested normalization
// type. Unrecognized types return the value unchanged.
function normalizeField(value: any, type: string, rules?: any): any {
  switch (type) {
    case 'date-iso8601':
      return new Date(value).toISOString()

    case 'phone-e164':
      return normalizePhoneNumber(value, 'E.164')

    case 'address-standard':
      return normalizeAddress(value)

    case 'currency-decimal':
      return normalizeCurrency(value)

    case 'name-titlecase':
      // Capitalize each word and lowercase the remainder; slice() replaces
      // the deprecated String.prototype.substr().
      return value.replace(/\w\S*/g, (txt) => txt.charAt(0).toUpperCase() + txt.slice(1).toLowerCase())

    case 'email-lowercase':
      return value.toLowerCase().trim()

    case 'unit-metric':
      return convertToMetric(value, rules?.sourceUnit)

    default:
      return value
  }
}

Schema Mapping Service

Automatically map fields between different schemas:

import $, { ai, on, send } from 'sdk.do'

// Register the AI-assisted schema mapping service (contract unchanged).
const schemaMappingFeatures = [
  'automatic-field-mapping',
  'type-conversion',
  'nested-object-support',
  'array-handling',
  'custom-transformations',
]

const schemaMappingService = await $.Service.create({
  name: 'Intelligent Schema Mapper',
  description: 'AI-powered schema mapping and field transformation',
  type: $.ServiceType.DataTransformation,
  subtype: 'schema-mapping',
  features: schemaMappingFeatures,
  pricing: {
    model: 'hybrid',
    baseRate: 5.0, // $5 for the initial mapping analysis
    perRecord: 0.01, // $0.01 per transformed record
  },
})

// Handle schema-mapping requests: optionally let the AI propose a
// source→target field mapping, transform every record accordingly,
// validate, deliver, and bill (base fee + per-record fee).
on.ServiceRequest.created(async (request) => {
  // Ignore requests addressed to other services.
  if (request.serviceId !== schemaMappingService.id) return

  const { sourceData, sourceSchema, targetSchema, mappingRules, automap } = request.inputs

  try {
    // Step 1: Analyze schemas.
    // Caller-supplied rules are used as-is unless automap is requested.
    let mapping = mappingRules

    if (automap) {
      // Use AI to generate field mappings.
      // NOTE(review): assumes ai.generate returns a plain
      // { sourceField: targetField } object as instructed — verify the
      // response shape against the sdk.do documentation.
      mapping = await ai.generate({
        model: 'gpt-5',
        type: 'schema-mapping',
        context: {
          sourceSchema,
          targetSchema,
          sampleData: sourceData.slice(0, 5), // small sample keeps the prompt cheap
        },
        instructions: `
          Analyze the source and target schemas and generate field mappings.
          Consider:
          - Exact name matches
          - Semantic similarity (e.g., "customer_name" -> "clientName")
          - Type compatibility
          - Nested structures
          - Array fields
          Return mappings as: { "sourceField": "targetField" }
        `,
      })
    }

    // Step 2: Transform data. Dotted paths are supported on both sides via
    // the nested get/set helpers.
    const transformed = sourceData.map((record) => {
      const targetRecord = {}

      for (const [sourceField, targetField] of Object.entries(mapping)) {
        const value = getNestedValue(record, sourceField)

        // Only undefined is treated as "absent"; null values are copied.
        if (value !== undefined) {
          const sourceType = getFieldType(sourceSchema, sourceField)
          const targetType = getFieldType(targetSchema, targetField)

          // Convert type if needed
          const convertedValue = convertType(value, sourceType, targetType)

          setNestedValue(targetRecord, targetField, convertedValue)
        }
      }

      // Apply default values for missing required fields
      applyDefaults(targetRecord, targetSchema)

      return targetRecord
    })

    // Step 3: Validate transformed data (issues are reported, not fatal).
    const validation = validateAgainstSchema(transformed, targetSchema)

    // Deliver results
    await send.ServiceResult.deliver({
      requestId: request.id,
      outputs: {
        data: transformed,
        mapping,
        recordCount: transformed.length,
        validation,
      },
    })

    // Calculate cost: flat analysis fee plus a per-record transformation fee.
    const cost = schemaMappingService.pricing.baseRate + transformed.length * schemaMappingService.pricing.perRecord

    await send.Payment.charge({
      customerId: request.customerId,
      amount: cost,
      description: `Schema mapping (${transformed.length} records)`,
    })
  } catch (error) {
    // Failures (including AI errors) may be transient, so allow retry.
    await send.ServiceRequest.fail({
      requestId: request.id,
      error: error.message,
      retryable: true,
    })
  }
})

// Walk a dotted path ('a.b.c') into an object, returning undefined if any
// intermediate segment is missing or nullish.
function getNestedValue(obj: any, path: string): any {
  let cursor = obj
  for (const segment of path.split('.')) {
    cursor = cursor?.[segment]
  }
  return cursor
}

// Assign a value at a dotted path ('a.b.c'), creating intermediate objects
// as needed. Mutates obj in place.
function setNestedValue(obj: any, path: string, value: any): void {
  const segments = path.split('.')
  let cursor = obj
  for (let i = 0; i < segments.length - 1; i++) {
    const segment = segments[i]
    if (!cursor[segment]) cursor[segment] = {}
    cursor = cursor[segment]
  }
  cursor[segments[segments.length - 1]] = value
}

// Convert a value between primitive type names. Matching types pass
// through untouched; unhandled targets return the value unchanged.
function convertType(value: any, fromType: string, toType: string): any {
  if (fromType === toType) return value

  switch (toType) {
    case 'string':
      return String(value)
    case 'number':
      return Number(value)
    case 'boolean':
      return Boolean(value)
    case 'date':
      return new Date(value)
    case 'array':
      return Array.isArray(value) ? value : [value]
    default:
      return value
  }
}

Data Cleansing Service

Remove duplicates, fix errors, and handle missing values:

// Register the cleansing service. The operation list and volume discount
// tiers are named for readability; the resulting contract is unchanged.
const cleansingOperations = [
  'remove-duplicates',
  'fix-formatting',
  'handle-missing-values',
  'remove-outliers',
  'standardize-values',
  'validate-constraints',
]

// Per-record rate drops as the batch grows.
const cleansingVolumeTiers = [
  { min: 0, max: 1000, rate: 0.005 },
  { min: 1001, max: 10000, rate: 0.003 },
  { min: 10001, max: Infinity, rate: 0.001 },
]

const cleansingService = await $.Service.create({
  name: 'Data Cleanser',
  description: 'Remove duplicates, fix errors, and handle missing values',
  type: $.ServiceType.DataTransformation,
  subtype: 'data-cleansing',
  operations: cleansingOperations,
  pricing: {
    model: 'per-record',
    rate: 0.005, // $0.005 per record
    volume: cleansingVolumeTiers,
  },
})

// Run the requested cleansing operations in a fixed pipeline order,
// building a report of what changed, then bill by input volume.
on.ServiceRequest.created(async (request) => {
  if (request.serviceId !== cleansingService.id) return

  const { data, operations, rules } = request.inputs
  const originalCount = data.length

  try {
    let cleaned = [...data]
    // Declare every report field up front — including finalRecords and
    // cleanedPercentage, which are only filled in at the end — so the
    // object keeps one fully-typed shape under strict mode.
    const report = {
      originalRecords: originalCount,
      duplicatesRemoved: 0,
      errorsFixed: 0,
      missingValuesHandled: 0,
      outliersRemoved: 0,
      operations: [] as string[],
      finalRecords: 0,
      cleanedPercentage: '0.00',
    }

    // Step 1: Remove duplicates
    if (operations.includes('remove-duplicates')) {
      const unique = removeDuplicates(cleaned, rules.uniqueFields)
      report.duplicatesRemoved = cleaned.length - unique.length
      cleaned = unique
      report.operations.push('remove-duplicates')
    }

    // Step 2: Fix formatting errors
    if (operations.includes('fix-formatting')) {
      cleaned = cleaned.map((record) => fixFormatting(record, rules.formatting))
      report.errorsFixed += cleaned.length
      report.operations.push('fix-formatting')
    }

    // Step 3: Handle missing values
    if (operations.includes('handle-missing-values')) {
      cleaned = cleaned.map((record) => handleMissingValues(record, rules.missingValues))
      report.missingValuesHandled += cleaned.length
      report.operations.push('handle-missing-values')
    }

    // Step 4: Remove outliers
    if (operations.includes('remove-outliers')) {
      const filtered = removeOutliers(cleaned, rules.outliers)
      report.outliersRemoved = cleaned.length - filtered.length
      cleaned = filtered
      report.operations.push('remove-outliers')
    }

    // Step 5: Standardize values
    if (operations.includes('standardize-values')) {
      cleaned = cleaned.map((record) => standardizeValues(record, rules.standardization))
      report.operations.push('standardize-values')
    }

    // Step 6: Validate constraints (drops records that fail)
    if (operations.includes('validate-constraints')) {
      cleaned = cleaned.filter((record) => validateConstraints(record, rules.constraints))
      report.operations.push('validate-constraints')
    }

    report.finalRecords = cleaned.length
    // Guard against division by zero for an empty input batch.
    report.cleanedPercentage = originalCount > 0 ? ((report.finalRecords / originalCount) * 100).toFixed(2) : '100.00'

    // Deliver results
    await send.ServiceResult.deliver({
      requestId: request.id,
      outputs: {
        data: cleaned,
        report,
      },
    })

    // Bill using the volume tier that matches the input size.
    const rate = getVolumeRate(originalCount, cleansingService.pricing.volume)
    const cost = originalCount * rate

    await send.Payment.charge({
      customerId: request.customerId,
      amount: cost,
      description: `Data cleansing (${originalCount} records)`,
    })
  } catch (error) {
    await send.ServiceRequest.fail({
      requestId: request.id,
      error: error.message,
      retryable: true,
    })
  }
})

// Keep the first occurrence of each record, where identity is the composite
// of the given fields joined with '|'.
function removeDuplicates(data: any[], fields: string[]): any[] {
  const kept: any[] = []
  const seenKeys = new Set<string>()

  for (const record of data) {
    const compositeKey = fields.map((field) => record[field]).join('|')
    if (!seenKeys.has(compositeKey)) {
      seenKeys.add(compositeKey)
      kept.push(record)
    }
  }

  return kept
}

// Apply per-field missing-value rules to a shallow copy of the record.
// A field counts as missing when it is null, undefined, or ''.
// Strategies: 'default' (substitute a value), 'remove' (drop the key),
// 'interpolate' (compute from the original record).
function handleMissingValues(record: any, rules: any): any {
  const cleaned = { ...record }

  for (const field of Object.keys(rules)) {
    const rule = rules[field]
    const current = cleaned[field]
    const isMissing = current === null || current === undefined || current === ''
    if (!isMissing) continue

    if (rule.strategy === 'default') {
      cleaned[field] = rule.value
    } else if (rule.strategy === 'remove') {
      delete cleaned[field]
    } else if (rule.strategy === 'interpolate') {
      cleaned[field] = rule.interpolationFn(record)
    }
  }

  return cleaned
}

Data Aggregation Service

Combine, group, and summarize data:

// Register the aggregation service; billed by the amount of data processed.
const aggregationOperations = ['group-by', 'sum', 'average', 'count', 'min', 'max', 'median', 'percentile', 'distinct', 'custom']

const aggregationService = await $.Service.create({
  name: 'Data Aggregator',
  description: 'Aggregate, group, and summarize large datasets',
  type: $.ServiceType.DataTransformation,
  subtype: 'aggregation',
  operations: aggregationOperations,
  pricing: {
    model: 'per-gb',
    rate: 1.0, // $1.00 per GB processed
  },
})

// Filter, group, aggregate, and sort a dataset, then bill by data volume.
on.ServiceRequest.created(async (request) => {
  if (request.serviceId !== aggregationService.id) return

  const { data, groupBy, aggregations, filters, sort } = request.inputs

  try {
    // Step 1: Apply filters
    let filtered = data
    if (filters) {
      filtered = data.filter((record) => evaluateFilters(record, filters))
    }

    // Step 2: Group data
    const grouped = groupData(filtered, groupBy)

    // Step 3: Apply aggregations
    const aggregated = Object.entries(grouped).map(([key, records]) => {
      // NOTE(review): the computed property assumes groupBy is a single
      // field name; an array groupBy yields a comma-joined property key.
      const result: Record<string, any> = {
        [groupBy]: key,
        recordCount: records.length,
      }

      for (const [field, operations] of Object.entries(aggregations)) {
        // `operations` is a const loop binding — reassigning it (as the
        // previous code did) throws "Assignment to constant variable" at
        // runtime. Normalize into a fresh array instead.
        const ops = Array.isArray(operations) ? operations : [operations]

        ops.forEach((op) => {
          const aggKey = `${field}_${op}`
          result[aggKey] = calculateAggregation(records, field, op)
        })
      }

      return result
    })

    // Step 4: Sort results (numeric comparison — assumes a numeric sort field)
    if (sort) {
      aggregated.sort((a, b) => {
        const aVal = a[sort.field]
        const bVal = b[sort.field]
        return sort.order === 'asc' ? aVal - bVal : bVal - aVal
      })
    }

    // Measure the input payload size in GB for billing.
    const dataSize = new Blob([JSON.stringify(data)]).size / (1024 * 1024 * 1024) // GB

    // Deliver results
    await send.ServiceResult.deliver({
      requestId: request.id,
      outputs: {
        data: aggregated,
        summary: {
          originalRecords: data.length,
          filteredRecords: filtered.length,
          groups: aggregated.length,
          dataSize: `${dataSize.toFixed(4)} GB`,
        },
      },
    })

    // Calculate cost with a $0.01 floor.
    const cost = Math.max(dataSize * aggregationService.pricing.rate, 0.01)

    await send.Payment.charge({
      customerId: request.customerId,
      amount: cost,
      description: `Data aggregation (${dataSize.toFixed(4)} GB)`,
    })
  } catch (error) {
    await send.ServiceRequest.fail({
      requestId: request.id,
      error: error.message,
      retryable: true,
    })
  }
})

// Bucket records by one or more fields; the bucket key is the field values
// joined with '|'.
function groupData(data: any[], groupBy: string | string[]): Record<string, any[]> {
  const keyFields = Array.isArray(groupBy) ? groupBy : [groupBy]

  return data.reduce<Record<string, any[]>>((groups, record) => {
    const key = keyFields.map((field) => record[field]).join('|')
    if (!groups[key]) {
      groups[key] = []
    }
    groups[key].push(record)
    return groups
  }, {})
}

// Compute one aggregate over a single field of a group's records.
// Null/undefined values are excluded first. Returns null for an
// unrecognized operation, or when the operation is undefined on an
// empty value set (previously average yielded NaN and min/max ±Infinity).
function calculateAggregation(records: any[], field: string, operation: string): any {
  const values = records.map((r) => r[field]).filter((v) => v !== null && v !== undefined)
  const isEmpty = values.length === 0

  switch (operation) {
    case 'sum':
      return values.reduce((sum, val) => sum + val, 0)
    case 'average':
      return isEmpty ? null : values.reduce((sum, val) => sum + val, 0) / values.length
    case 'count':
      return values.length
    case 'min':
      return isEmpty ? null : Math.min(...values)
    case 'max':
      return isEmpty ? null : Math.max(...values)
    case 'median':
      return isEmpty ? null : calculateMedian(values)
    case 'distinct':
      return new Set(values).size
    default:
      return null
  }
}

Real-Time Transformation Pipeline

Build a streaming transformation service:

// Register the streaming transformer; sold as a throughput-tiered subscription.
const streamingTiers = [
  { name: 'starter', price: 100, throughput: '1000 records/sec' },
  { name: 'professional', price: 500, throughput: '10000 records/sec' },
  { name: 'enterprise', price: 2000, throughput: '100000 records/sec' },
]

const streamingTransformService = await $.Service.create({
  name: 'Streaming Data Transformer',
  description: 'Real-time data transformation for streaming pipelines',
  type: $.ServiceType.DataTransformation,
  subtype: 'streaming',
  throughput: '10000 records/second',
  latency: '<100ms',
  pricing: {
    model: 'subscription',
    tiers: streamingTiers,
  },
})

// For each record arriving on a stream, run every enabled transformation
// registered for that stream in sequence, then re-emit the result.
// Per-record failures are reported without stopping the stream.
on.Stream.data(async (stream) => {
  const enabledTransformations = await db.Transformation.list({
    where: { streamId: stream.id, enabled: true },
  })

  for await (const incoming of stream) {
    try {
      let current = incoming

      for (const transformation of enabledTransformations) {
        current = await applyStreamTransformation(current, transformation)
      }

      await send.Stream.emit({
        streamId: stream.id,
        data: current,
      })
    } catch (error) {
      await send.Stream.error({
        streamId: stream.id,
        record: incoming,
        error: error.message,
      })
    }
  }
})

// Apply a single streaming transformation to one record.
// Returns the transformed record, or null when a filter drops it.
async function applyStreamTransformation(record: any, transform: any): Promise<any> {
  // A prior 'filter' step may have dropped the record; propagate the null
  // instead of passing it to transformations that expect an object.
  if (record == null) return null

  switch (transform.type) {
    case 'filter':
      return evaluateFilters(record, transform.conditions) ? record : null

    case 'map':
      return mapFields(record, transform.mapping)

    case 'enrich': {
      // Braces scope the const to this case (avoids leaking the binding
      // into sibling cases).
      const enrichment = await lookupEnrichmentData(record, transform.source)
      return { ...record, ...enrichment }
    }

    case 'aggregate':
      return updateAggregation(record, transform.window)

    case 'custom':
      return await executeCustomTransformation(record, transform.code)

    default:
      return record
  }
}

Pricing Models for Transformation Services

Per-Record Pricing

Best for: High-volume, simple transformations

pricing: {
  model: 'per-record',
  rate: 0.001,
  volume: [
    { min: 0, max: 10000, rate: 0.001 },
    { min: 10001, max: 100000, rate: 0.0005 },
    { min: 100001, max: Infinity, rate: 0.0001 },
  ],
}

Per-GB Pricing

Best for: Large file conversions, data warehousing

pricing: {
  model: 'per-gb',
  rate: 1.0,
  minimumCharge: 0.10,
}

Complexity-Based Pricing

Best for: Schema mapping, custom transformations

pricing: {
  model: 'complexity-based',
  base: 5.0,
  multipliers: {
    simple: 1.0,
    moderate: 2.0,
    complex: 4.0,
  },
}

Subscription Pricing

Best for: Streaming pipelines, continuous processing

pricing: {
  model: 'subscription',
  tiers: [
    { name: 'starter', price: 50, volume: 100000 },
    { name: 'professional', price: 200, volume: 1000000 },
    { name: 'enterprise', price: 1000, volume: Infinity },
  ],
}

Best Practices

1. Schema Validation

Always validate data against schemas:

// Validate an object against a lightweight schema: required-field presence
// plus per-field type checks via the schema's `properties` map.
// Returns a validity flag alongside all collected error messages.
function validateSchema(data: any, schema: any): { valid: boolean; errors: string[] } {
  // Explicit string[] annotation: a bare [] infers never[] under strict
  // mode and rejects the pushes below.
  const errors: string[] = []

  for (const field of schema.required || []) {
    if (!(field in data)) {
      errors.push(`Missing required field: ${field}`)
    }
  }

  for (const [field, value] of Object.entries(data)) {
    const fieldSchema = schema.properties?.[field]
    if (fieldSchema && !validateFieldType(value, fieldSchema.type)) {
      errors.push(`Invalid type for field ${field}: expected ${fieldSchema.type}`)
    }
  }

  return {
    valid: errors.length === 0,
    errors,
  }
}

2. Error Recovery

Handle transformation errors gracefully:

// Apply a transformation with layered recovery: log the failure, try the
// configured fallback transformation if any, and otherwise either skip the
// record (null) or pass it through unchanged, per `skipOnError`.
async function transformWithRecovery(record: any, transformation: any): Promise<any> {
  try {
    return await applyTransformation(record, transformation)
  } catch (error) {
    // Persist the failure for later inspection before attempting recovery.
    await db.create($.TransformationError, {
      record,
      transformation,
      error: error.message,
      timestamp: new Date(),
    })

    const fallbackTransformation = transformation.fallback
    if (fallbackTransformation) {
      return applyTransformation(record, fallbackTransformation)
    }

    if (transformation.skipOnError) return null
    return record
  }
}

3. Performance Optimization

Batch processing for large datasets:

// Transform a large dataset in batches, reporting progress after each batch.
// Records within a batch run in parallel; batches run sequentially to bound
// memory use and concurrency.
async function batchTransform(data: any[], transformation: any, batchSize = 1000): Promise<any[]> {
  const results: any[] = []

  for (let i = 0; i < data.length; i += batchSize) {
    const batch = data.slice(i, i + batchSize)
    const transformed = await Promise.all(batch.map((record) => applyTransformation(record, transformation)))

    results.push(...transformed)

    // Clamp the processed count so the final (possibly partial) batch
    // reports progress of exactly 1.0 instead of overshooting it.
    const processed = Math.min(i + batchSize, data.length)
    await send.ServiceProgress.updated({
      progress: processed / data.length,
      message: `Processed ${processed} of ${data.length} records`,
    })
  }

  return results
}

Next Steps