Data Transformation Services
Build powerful data transformation services that convert, normalize, and cleanse data across formats and schemas
Data transformation services convert data from one format, structure, or representation to another. These services are essential for data integration, migration, and ensuring data quality across systems.
Overview
Data transformation is the backbone of modern data operations. Whether you're moving data between systems, preparing data for analysis, or ensuring consistency across platforms, transformation services automate and standardize these critical operations.
Key Capabilities
- Format Conversion: Transform data between JSON, XML, CSV, YAML, Parquet, Avro, and more
- Schema Mapping: Map fields from source to target schemas automatically
- Data Normalization: Standardize formats, units, and representations
- Data Cleansing: Remove duplicates, fix errors, and handle missing values
- Aggregation: Combine, group, and summarize data
- Enrichment Integration: Add calculated fields and derived values
Common Use Cases
- ETL Pipelines: Extract, transform, and load data between systems
- Data Migration: Move data from legacy to modern systems
- API Integration: Transform API responses to match internal schemas
- Report Generation: Convert raw data into formatted reports
- Data Standardization: Ensure consistent data across departments
- Real-time Processing: Transform streaming data on the fly
Building Your First Transformation Service
Let's start with a simple CSV to JSON converter:
import $, { db, on, send } from 'sdk.do'
const csvToJsonService = await $.Service.create({
name: 'CSV to JSON Converter',
description: 'Convert CSV files to JSON format with schema validation',
type: $.ServiceType.DataTransformation,
subtype: 'format-conversion',
input: {
required: ['csvData', 'schema'],
optional: ['delimiter', 'hasHeaders', 'encoding'],
},
output: {
jsonData: 'array',
recordCount: 'number',
validationErrors: 'array',
},
pricing: {
model: 'per-record',
rate: 0.001, // $0.001 per record
minimumCharge: 0.1, // $0.10 minimum
},
})
on.ServiceRequest.created(async (request) => {
if (request.serviceId !== csvToJsonService.id) return
const { csvData, schema, delimiter = ',', hasHeaders = true } = request.inputs
try {
// Parse CSV data
const lines = csvData.trim().split('\n')
const headers = hasHeaders ? lines[0].split(delimiter) : schema.map((_, i) => `column_${i}`)
const dataLines = hasHeaders ? lines.slice(1) : lines
// Transform to JSON
const jsonData = dataLines.map((line) => {
const values = line.split(delimiter)
const record = {}
headers.forEach((header, index) => {
const fieldName = header.trim()
const fieldValue = values[index]?.trim()
// Apply schema transformations
const fieldSchema = schema.find((s) => s.name === fieldName)
if (fieldSchema) {
record[fieldName] = transformValue(fieldValue, fieldSchema.type)
} else {
record[fieldName] = fieldValue
}
})
return record
})
// Validate against schema
const validationErrors = validateRecords(jsonData, schema)
// Deliver results
await send.ServiceResult.deliver({
requestId: request.id,
outputs: {
jsonData,
recordCount: jsonData.length,
validationErrors,
},
})
// Calculate and charge
const recordCount = jsonData.length
const cost = Math.max(recordCount * csvToJsonService.pricing.rate, csvToJsonService.pricing.minimumCharge)
await send.Payment.charge({
customerId: request.customerId,
amount: cost,
description: `CSV to JSON conversion (${recordCount} records)`,
})
} catch (error) {
await send.ServiceRequest.fail({
requestId: request.id,
error: error.message,
retryable: false,
})
}
})
function transformValue(value: string, type: string): any {
switch (type) {
case 'number':
return parseFloat(value)
case 'integer':
return parseInt(value, 10)
case 'boolean':
return value.toLowerCase() === 'true'
case 'date':
return new Date(value)
default:
return value
}
}
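The `validateRecords` helper below checks field types through an `isValidType` function that the listing omits. A minimal sketch, matching the types that `transformValue` produces:

```typescript
// Minimal type check used by validateRecords. It mirrors the types
// produced by transformValue; extend it to match your schema vocabulary.
function isValidType(value: any, type: string): boolean {
  switch (type) {
    case 'number':
      return typeof value === 'number' && !Number.isNaN(value)
    case 'integer':
      return Number.isInteger(value)
    case 'boolean':
      return typeof value === 'boolean'
    case 'date':
      return value instanceof Date && !Number.isNaN(value.getTime())
    default:
      return typeof value === 'string'
  }
}
```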
function validateRecords(records: any[], schema: any[]): string[] {
const errors: string[] = []
records.forEach((record, index) => {
schema.forEach((field) => {
if (field.required && !record[field.name]) {
errors.push(`Record ${index}: Missing required field ${field.name}`)
}
if (record[field.name] && !isValidType(record[field.name], field.type)) {
errors.push(`Record ${index}: Invalid type for ${field.name}`)
}
})
})
return errors
}

Advanced Format Conversion
Create a universal format converter that supports multiple formats:
const universalConverterService = await $.Service.create({
name: 'Universal Data Converter',
description: 'Convert between any supported data format',
type: $.ServiceType.DataTransformation,
subtype: 'universal-conversion',
supportedFormats: {
input: ['json', 'xml', 'csv', 'yaml', 'toml', 'parquet', 'avro', 'protobuf'],
output: ['json', 'xml', 'csv', 'yaml', 'toml', 'parquet', 'avro', 'protobuf'],
},
input: {
required: ['data', 'sourceFormat', 'targetFormat'],
optional: ['sourceSchema', 'targetSchema', 'transformations', 'options'],
},
pricing: {
model: 'per-mb',
rate: 0.1, // $0.10 per MB
complexityMultipliers: {
simple: 1.0, // Same structure (JSON to YAML)
moderate: 1.5, // Schema mapping required
complex: 2.0, // Custom transformations
},
},
})
on.ServiceRequest.created(async (request) => {
if (request.serviceId !== universalConverterService.id) return
const { data, sourceFormat, targetFormat, sourceSchema, targetSchema, transformations, options } = request.inputs
try {
// Step 1: Parse source data
const parsed = await parseFormat(data, sourceFormat, sourceSchema)
// Step 2: Apply transformations
let transformed = parsed
if (transformations && transformations.length > 0) {
for (const transform of transformations) {
transformed = await applyTransformation(transformed, transform)
}
}
// Step 3: Map to target schema if provided
if (targetSchema) {
transformed = await mapSchema(transformed, sourceSchema, targetSchema)
}
// Step 4: Convert to target format
const converted = await convertFormat(transformed, targetFormat, options)
// Calculate complexity
const complexity = determineComplexity(sourceFormat, targetFormat, transformations, sourceSchema, targetSchema)
// Deliver results
await send.ServiceResult.deliver({
requestId: request.id,
outputs: {
data: converted,
format: targetFormat,
originalSize: data.length,
convertedSize: converted.length,
compressionRatio: converted.length / data.length,
complexity,
},
})
// Calculate cost based on data size and complexity
const sizeInMB = data.length / (1024 * 1024)
const multiplier = universalConverterService.pricing.complexityMultipliers[complexity]
const cost = sizeInMB * universalConverterService.pricing.rate * multiplier
await send.Payment.charge({
customerId: request.customerId,
amount: cost,
description: `Data conversion: ${sourceFormat} to ${targetFormat} (${sizeInMB.toFixed(2)} MB, ${complexity})`,
})
} catch (error) {
await send.ServiceRequest.fail({
requestId: request.id,
error: error.message,
retryable: true,
})
}
})
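The handler above charges based on `determineComplexity`, which the listing does not define. One possible heuristic, keyed to the `complexityMultipliers` tiers (`simple`, `moderate`, `complex`) configured on the service; treat it as a sketch and tune it to your own pricing policy:

```typescript
// One possible heuristic: custom transformations are 'complex', schema
// mapping is 'moderate', and a plain format change is 'simple'. The
// tier names must match the keys in pricing.complexityMultipliers.
function determineComplexity(
  sourceFormat: string,
  targetFormat: string,
  transformations?: any[],
  sourceSchema?: any,
  targetSchema?: any,
): 'simple' | 'moderate' | 'complex' {
  if (transformations && transformations.length > 0) return 'complex'
  if (sourceSchema && targetSchema) return 'moderate'
  return 'simple'
}
```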
async function parseFormat(data: string, format: string, schema?: any): Promise<any> {
switch (format) {
case 'json':
return JSON.parse(data)
case 'xml':
return parseXML(data)
case 'csv':
return parseCSV(data, schema)
case 'yaml':
return parseYAML(data)
case 'parquet':
return parseParquet(data)
case 'avro':
return parseAvro(data, schema)
default:
throw new Error(`Unsupported source format: ${format}`)
}
}
async function convertFormat(data: any, format: string, options?: any): Promise<string> {
switch (format) {
case 'json':
return JSON.stringify(data, null, options?.indent || 2)
case 'xml':
return convertToXML(data, options)
case 'csv':
return convertToCSV(data, options)
case 'yaml':
return convertToYAML(data)
case 'parquet':
return convertToParquet(data)
case 'avro':
return convertToAvro(data, options?.schema)
default:
throw new Error(`Unsupported target format: ${format}`)
}
}

Data Normalization Service
Standardize data formats, units, and representations:
const normalizationService = await $.Service.create({
name: 'Data Normalizer',
description: 'Standardize data formats, units, and representations',
type: $.ServiceType.DataTransformation,
subtype: 'normalization',
normalizations: {
dates: ['ISO-8601', 'unix-timestamp', 'relative'],
addresses: ['standard', 'postal', 'geographic'],
phoneNumbers: ['E.164', 'national', 'international'],
currencies: ['ISO-4217', 'decimal'],
names: ['titlecase', 'uppercase', 'normalized'],
units: ['metric', 'imperial', 'standard'],
},
pricing: {
model: 'per-field',
rate: 0.0001, // $0.0001 per field normalized
},
})
on.ServiceRequest.created(async (request) => {
if (request.serviceId !== normalizationService.id) return
const { data, normalizations, rules } = request.inputs
let fieldCount = 0
try {
const normalized = Array.isArray(data) ? data : [data]
const results = normalized.map((record) => {
const normalizedRecord = { ...record }
for (const [field, normType] of Object.entries(normalizations)) {
if (normalizedRecord[field]) {
normalizedRecord[field] = normalizeField(normalizedRecord[field], normType, rules)
fieldCount++
}
}
return normalizedRecord
})
// Deliver results
await send.ServiceResult.deliver({
requestId: request.id,
outputs: {
data: Array.isArray(data) ? results : results[0],
recordCount: results.length,
fieldsNormalized: fieldCount,
},
})
// Charge based on fields normalized
const cost = fieldCount * normalizationService.pricing.rate
await send.Payment.charge({
customerId: request.customerId,
amount: cost,
description: `Data normalization (${fieldCount} fields)`,
})
} catch (error) {
await send.ServiceRequest.fail({
requestId: request.id,
error: error.message,
retryable: true,
})
}
})
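The `normalizeField` function below delegates to helpers such as `normalizePhoneNumber`, which the listing omits. A deliberately simplified sketch that assumes bare 10-digit numbers are US numbers; a production version should use a dedicated library such as libphonenumber-js:

```typescript
// Simplified E.164 normalization: strips all formatting characters and
// assumes a US country code for 10-digit input. This is a sketch, not a
// full international parser.
function normalizePhoneNumber(value: string, format: string): string {
  const digits = value.replace(/\D/g, '')
  if (format === 'E.164') {
    if (digits.length === 10) return `+1${digits}`
    return `+${digits}`
  }
  return digits
}
```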
function normalizeField(value: any, type: string, rules?: any): any {
switch (type) {
case 'date-iso8601':
return new Date(value).toISOString()
case 'phone-e164':
return normalizePhoneNumber(value, 'E.164')
case 'address-standard':
return normalizeAddress(value)
case 'currency-decimal':
return normalizeCurrency(value)
case 'name-titlecase':
return value.replace(/\w\S*/g, (txt) => txt.charAt(0).toUpperCase() + txt.slice(1).toLowerCase())
case 'email-lowercase':
return value.toLowerCase().trim()
case 'unit-metric':
return convertToMetric(value, rules?.sourceUnit)
default:
return value
}
}

Schema Mapping Service
Automatically map fields between different schemas:
import $, { ai, on, send } from 'sdk.do'
const schemaMappingService = await $.Service.create({
name: 'Intelligent Schema Mapper',
description: 'AI-powered schema mapping and field transformation',
type: $.ServiceType.DataTransformation,
subtype: 'schema-mapping',
features: ['automatic-field-mapping', 'type-conversion', 'nested-object-support', 'array-handling', 'custom-transformations'],
pricing: {
model: 'hybrid',
baseRate: 5.0, // $5 for initial mapping analysis
perRecord: 0.01, // $0.01 per record transformed
},
})
on.ServiceRequest.created(async (request) => {
if (request.serviceId !== schemaMappingService.id) return
const { sourceData, sourceSchema, targetSchema, mappingRules, automap } = request.inputs
try {
// Step 1: Analyze schemas
let mapping = mappingRules
if (automap) {
// Use AI to generate field mappings
mapping = await ai.generate({
model: 'gpt-5',
type: 'schema-mapping',
context: {
sourceSchema,
targetSchema,
sampleData: sourceData.slice(0, 5),
},
instructions: `
Analyze the source and target schemas and generate field mappings.
Consider:
- Exact name matches
- Semantic similarity (e.g., "customer_name" -> "clientName")
- Type compatibility
- Nested structures
- Array fields
Return mappings as: { "sourceField": "targetField" }
`,
})
}
// Step 2: Transform data
const transformed = sourceData.map((record) => {
const targetRecord = {}
for (const [sourceField, targetField] of Object.entries(mapping)) {
const value = getNestedValue(record, sourceField)
if (value !== undefined) {
const sourceType = getFieldType(sourceSchema, sourceField)
const targetType = getFieldType(targetSchema, targetField)
// Convert type if needed
const convertedValue = convertType(value, sourceType, targetType)
setNestedValue(targetRecord, targetField, convertedValue)
}
}
// Apply default values for missing required fields
applyDefaults(targetRecord, targetSchema)
return targetRecord
})
// Step 3: Validate transformed data
const validation = validateAgainstSchema(transformed, targetSchema)
// Deliver results
await send.ServiceResult.deliver({
requestId: request.id,
outputs: {
data: transformed,
mapping,
recordCount: transformed.length,
validation,
},
})
// Calculate cost
const cost = schemaMappingService.pricing.baseRate + transformed.length * schemaMappingService.pricing.perRecord
await send.Payment.charge({
customerId: request.customerId,
amount: cost,
description: `Schema mapping (${transformed.length} records)`,
})
} catch (error) {
await send.ServiceRequest.fail({
requestId: request.id,
error: error.message,
retryable: true,
})
}
})
function getNestedValue(obj: any, path: string): any {
return path.split('.').reduce((current, part) => current?.[part], obj)
}
function setNestedValue(obj: any, path: string, value: any): void {
const parts = path.split('.')
const last = parts.pop()!
const target = parts.reduce((current, part) => {
if (!current[part]) current[part] = {}
return current[part]
}, obj)
target[last] = value
}
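The mapping handler also calls `getFieldType` to look up declared types, which the listing omits. A sketch that assumes a JSON-Schema-style shape (`{ properties: { field: { type, properties? } } }`) and descends dotted paths the same way `getNestedValue` does:

```typescript
// Looks up a field's declared type in a JSON-Schema-style definition.
// The schema shape is an assumption; adapt the lookup to your own
// schema representation. Dotted paths descend into nested objects.
function getFieldType(schema: any, path: string): string | undefined {
  let node = schema
  for (const part of path.split('.')) {
    node = node?.properties?.[part]
    if (!node) return undefined
  }
  return node.type
}
```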
function convertType(value: any, fromType: string, toType: string): any {
if (fromType === toType) return value
// Handle common conversions
if (toType === 'string') return String(value)
if (toType === 'number') return Number(value)
if (toType === 'boolean') return Boolean(value)
if (toType === 'date') return new Date(value)
if (toType === 'array' && !Array.isArray(value)) return [value]
return value
}

Data Cleansing Service
Remove duplicates, fix errors, and handle missing values:
const cleansingService = await $.Service.create({
name: 'Data Cleanser',
description: 'Remove duplicates, fix errors, and handle missing values',
type: $.ServiceType.DataTransformation,
subtype: 'data-cleansing',
operations: ['remove-duplicates', 'fix-formatting', 'handle-missing-values', 'remove-outliers', 'standardize-values', 'validate-constraints'],
pricing: {
model: 'per-record',
rate: 0.005, // $0.005 per record
volume: [
{ min: 0, max: 1000, rate: 0.005 },
{ min: 1001, max: 10000, rate: 0.003 },
{ min: 10001, max: Infinity, rate: 0.001 },
],
},
})
on.ServiceRequest.created(async (request) => {
if (request.serviceId !== cleansingService.id) return
const { data, operations, rules } = request.inputs
const originalCount = data.length
try {
let cleaned = [...data]
const report: any = {
originalRecords: originalCount,
duplicatesRemoved: 0,
errorsFixed: 0,
missingValuesHandled: 0,
outliersRemoved: 0,
operations: [],
}
// Step 1: Remove duplicates
if (operations.includes('remove-duplicates')) {
const unique = removeDuplicates(cleaned, rules.uniqueFields)
report.duplicatesRemoved = cleaned.length - unique.length
cleaned = unique
report.operations.push('remove-duplicates')
}
// Step 2: Fix formatting errors
if (operations.includes('fix-formatting')) {
cleaned = cleaned.map((record) => fixFormatting(record, rules.formatting))
report.errorsFixed += cleaned.length
report.operations.push('fix-formatting')
}
// Step 3: Handle missing values
if (operations.includes('handle-missing-values')) {
cleaned = cleaned.map((record) => handleMissingValues(record, rules.missingValues))
report.missingValuesHandled += cleaned.length
report.operations.push('handle-missing-values')
}
// Step 4: Remove outliers
if (operations.includes('remove-outliers')) {
const filtered = removeOutliers(cleaned, rules.outliers)
report.outliersRemoved = cleaned.length - filtered.length
cleaned = filtered
report.operations.push('remove-outliers')
}
// Step 5: Standardize values
if (operations.includes('standardize-values')) {
cleaned = cleaned.map((record) => standardizeValues(record, rules.standardization))
report.operations.push('standardize-values')
}
// Step 6: Validate constraints
if (operations.includes('validate-constraints')) {
cleaned = cleaned.filter((record) => validateConstraints(record, rules.constraints))
report.operations.push('validate-constraints')
}
report.finalRecords = cleaned.length
report.cleanedPercentage = ((report.finalRecords / originalCount) * 100).toFixed(2)
// Deliver results
await send.ServiceResult.deliver({
requestId: request.id,
outputs: {
data: cleaned,
report,
},
})
// Calculate cost based on volume
const rate = getVolumeRate(originalCount, cleansingService.pricing.volume)
const cost = originalCount * rate
await send.Payment.charge({
customerId: request.customerId,
amount: cost,
description: `Data cleansing (${originalCount} records)`,
})
} catch (error) {
await send.ServiceRequest.fail({
requestId: request.id,
error: error.message,
retryable: true,
})
}
})
function removeDuplicates(data: any[], fields: string[]): any[] {
const seen = new Set()
return data.filter((record) => {
const key = fields.map((f) => record[f]).join('|')
if (seen.has(key)) return false
seen.add(key)
return true
})
}
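The cleansing handler prices each request through `getVolumeRate`, which the listing omits. A minimal lookup over the `{ min, max, rate }` tiers defined in the service's pricing config:

```typescript
// Resolves the per-record rate from the volume tiers configured on the
// service ({ min, max, rate } ranges). Falls back to the last tier if
// no range matches.
function getVolumeRate(count: number, tiers: { min: number; max: number; rate: number }[]): number {
  const tier = tiers.find((t) => count >= t.min && count <= t.max)
  return tier ? tier.rate : tiers[tiers.length - 1].rate
}
```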
function handleMissingValues(record: any, rules: any): any {
const result = { ...record }
for (const [field, rule] of Object.entries(rules)) {
if (result[field] === null || result[field] === undefined || result[field] === '') {
switch (rule.strategy) {
case 'default':
result[field] = rule.value
break
case 'remove':
delete result[field]
break
case 'interpolate':
result[field] = rule.interpolationFn(record)
break
}
}
}
return result
}

Data Aggregation Service
Combine, group, and summarize data:
const aggregationService = await $.Service.create({
name: 'Data Aggregator',
description: 'Aggregate, group, and summarize large datasets',
type: $.ServiceType.DataTransformation,
subtype: 'aggregation',
operations: ['group-by', 'sum', 'average', 'count', 'min', 'max', 'median', 'percentile', 'distinct', 'custom'],
pricing: {
model: 'per-gb',
rate: 1.0, // $1.00 per GB processed
},
})
on.ServiceRequest.created(async (request) => {
if (request.serviceId !== aggregationService.id) return
const { data, groupBy, aggregations, filters, sort } = request.inputs
try {
// Step 1: Apply filters
let filtered = data
if (filters) {
filtered = data.filter((record) => evaluateFilters(record, filters))
}
// Step 2: Group data
const grouped = groupData(filtered, groupBy)
// Step 3: Apply aggregations
const aggregated = Object.entries(grouped).map(([key, records]) => {
const result = {
[groupBy]: key,
recordCount: records.length,
}
for (const [field, ops] of Object.entries(aggregations)) {
const opList = Array.isArray(ops) ? ops : [ops]
opList.forEach((op) => {
const aggKey = `${field}_${op}`
result[aggKey] = calculateAggregation(records, field, op)
})
}
return result
})
// Step 4: Sort results
if (sort) {
aggregated.sort((a, b) => {
const aVal = a[sort.field]
const bVal = b[sort.field]
return sort.order === 'asc' ? aVal - bVal : bVal - aVal
})
}
// Calculate data size (Blob is available in Node 18+ and in browsers)
const dataSize = new Blob([JSON.stringify(data)]).size / (1024 * 1024 * 1024) // GB
// Deliver results
await send.ServiceResult.deliver({
requestId: request.id,
outputs: {
data: aggregated,
summary: {
originalRecords: data.length,
filteredRecords: filtered.length,
groups: aggregated.length,
dataSize: `${dataSize.toFixed(4)} GB`,
},
},
})
// Calculate cost
const cost = Math.max(dataSize * aggregationService.pricing.rate, 0.01)
await send.Payment.charge({
customerId: request.customerId,
amount: cost,
description: `Data aggregation (${dataSize.toFixed(4)} GB)`,
})
} catch (error) {
await send.ServiceRequest.fail({
requestId: request.id,
error: error.message,
retryable: true,
})
}
})
function groupData(data: any[], groupBy: string | string[]): Record<string, any[]> {
const groups: Record<string, any[]> = {}
const fields = Array.isArray(groupBy) ? groupBy : [groupBy]
data.forEach((record) => {
const key = fields.map((f) => record[f]).join('|')
if (!groups[key]) groups[key] = []
groups[key].push(record)
})
return groups
}
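`calculateAggregation` below delegates the `median` operation to a `calculateMedian` helper that the listing omits. A straightforward implementation:

```typescript
// Median of a numeric array: the middle value, or the mean of the two
// middle values for even-length input. Copies before sorting so the
// caller's array is left untouched.
function calculateMedian(values: number[]): number {
  const sorted = [...values].sort((a, b) => a - b)
  const mid = Math.floor(sorted.length / 2)
  return sorted.length % 2 === 0 ? (sorted[mid - 1] + sorted[mid]) / 2 : sorted[mid]
}
```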
function calculateAggregation(records: any[], field: string, operation: string): any {
const values = records.map((r) => r[field]).filter((v) => v !== null && v !== undefined)
switch (operation) {
case 'sum':
return values.reduce((sum, val) => sum + val, 0)
case 'average':
return values.reduce((sum, val) => sum + val, 0) / values.length
case 'count':
return values.length
case 'min':
return Math.min(...values)
case 'max':
return Math.max(...values)
case 'median':
return calculateMedian(values)
case 'distinct':
return new Set(values).size
default:
return null
}
}

Real-Time Transformation Pipeline
Build a streaming transformation service:
const streamingTransformService = await $.Service.create({
name: 'Streaming Data Transformer',
description: 'Real-time data transformation for streaming pipelines',
type: $.ServiceType.DataTransformation,
subtype: 'streaming',
throughput: '10000 records/second',
latency: '<100ms',
pricing: {
model: 'subscription',
tiers: [
{ name: 'starter', price: 100, throughput: '1000 records/sec' },
{ name: 'professional', price: 500, throughput: '10000 records/sec' },
{ name: 'enterprise', price: 2000, throughput: '100000 records/sec' },
],
},
})
on.Stream.data(async (stream) => {
const transformations = await db.Transformation.list({
where: { streamId: stream.id, enabled: true },
})
for await (const record of stream) {
try {
let transformed = record
// Apply transformations in sequence
for (const transform of transformations) {
transformed = await applyStreamTransformation(transformed, transform)
}
// Emit transformed record
await send.Stream.emit({
streamId: stream.id,
data: transformed,
})
} catch (error) {
await send.Stream.error({
streamId: stream.id,
record,
error: error.message,
})
}
}
})
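Both the aggregation handler and the `filter` transform below rely on `evaluateFilters`, which the listings omit. A sketch under an assumed condition shape of `{ field, op, value }` with AND semantics; adapt the operators and shape to your own filter spec:

```typescript
// Evaluates a simple filter spec against a record. The condition shape
// ({ field, op, value }, all conditions ANDed) is an assumption made
// for this sketch, not a documented SDK format.
function evaluateFilters(record: any, filters: { field: string; op: string; value: any }[]): boolean {
  return filters.every(({ field, op, value }) => {
    const actual = record[field]
    switch (op) {
      case 'eq':
        return actual === value
      case 'neq':
        return actual !== value
      case 'gt':
        return actual > value
      case 'lt':
        return actual < value
      case 'contains':
        return typeof actual === 'string' && actual.includes(value)
      default:
        return false
    }
  })
}
```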
async function applyStreamTransformation(record: any, transform: any): Promise<any> {
switch (transform.type) {
case 'filter':
return evaluateFilters(record, transform.conditions) ? record : null
case 'map':
return mapFields(record, transform.mapping)
case 'enrich': {
const enrichment = await lookupEnrichmentData(record, transform.source)
return { ...record, ...enrichment }
}
case 'aggregate':
return updateAggregation(record, transform.window)
case 'custom':
return await executeCustomTransformation(record, transform.code)
default:
return record
}
}

Pricing Models for Transformation Services
Per-Record Pricing
Best for: High-volume, simple transformations
pricing: {
model: 'per-record',
rate: 0.001,
volume: [
{ min: 0, max: 10000, rate: 0.001 },
{ min: 10001, max: 100000, rate: 0.0005 },
{ min: 100001, max: Infinity, rate: 0.0001 },
],
}

Per-GB Pricing
Best for: Large file conversions, data warehousing
pricing: {
model: 'per-gb',
rate: 1.0,
minimumCharge: 0.10,
}

Complexity-Based Pricing
Best for: Schema mapping, custom transformations
pricing: {
model: 'complexity-based',
base: 5.0,
multipliers: {
simple: 1.0,
moderate: 2.0,
complex: 4.0,
},
}

Subscription Pricing
Best for: Streaming pipelines, continuous processing
pricing: {
model: 'subscription',
tiers: [
{ name: 'starter', price: 50, volume: 100000 },
{ name: 'professional', price: 200, volume: 1000000 },
{ name: 'enterprise', price: 1000, volume: Infinity },
],
}

Best Practices
1. Schema Validation
Always validate data against schemas:
function validateSchema(data: any, schema: any): { valid: boolean; errors: string[] } {
const errors: string[] = []
for (const field of schema.required || []) {
if (!(field in data)) {
errors.push(`Missing required field: ${field}`)
}
}
for (const [field, value] of Object.entries(data)) {
const fieldSchema = schema.properties?.[field]
if (fieldSchema && !validateFieldType(value, fieldSchema.type)) {
errors.push(`Invalid type for field ${field}: expected ${fieldSchema.type}`)
}
}
return {
valid: errors.length === 0,
errors,
}
}

2. Error Recovery
Handle transformation errors gracefully:
async function transformWithRecovery(record: any, transformation: any): Promise<any> {
try {
return await applyTransformation(record, transformation)
} catch (error) {
// Log error
await db.create($.TransformationError, {
record,
transformation,
error: error.message,
timestamp: new Date(),
})
// Apply fallback
if (transformation.fallback) {
return applyTransformation(record, transformation.fallback)
}
// Return original or null
return transformation.skipOnError ? null : record
}
}

3. Performance Optimization
Batch processing for large datasets:
async function batchTransform(data: any[], transformation: any, batchSize = 1000): Promise<any[]> {
const results = []
for (let i = 0; i < data.length; i += batchSize) {
const batch = data.slice(i, i + batchSize)
const transformed = await Promise.all(batch.map((record) => applyTransformation(record, transformation)))
results.push(...transformed)
// Update progress
await send.ServiceProgress.updated({
progress: (i + batchSize) / data.length,
message: `Processed ${Math.min(i + batchSize, data.length)} of ${data.length} records`,
})
}
return results
}

Next Steps
- Data Enrichment Services → Add value to your data
- Data Validation Services → Ensure data quality
- Data Analytics Services → Generate insights
- Service Composition → Combine transformation services