on - Event Handlers

Subscribe to and handle domain events using semantic event patterns in the SDK.do event system

The on primitive enables you to subscribe to domain events using semantic event patterns. It provides a declarative way to build event-driven architectures where handlers automatically execute in response to events published throughout your system.

Type Signature

function on<T = unknown>(event: PathProxy, handler: EventHandler<T>, options?: EventOptions): Subscription

type EventHandler<T = unknown> = (data: T) => Promise<void> | void

interface EventOptions {
  priority?: number
  delay?: number
  retry?: {
    maxAttempts?: number
    backoff?: 'linear' | 'exponential'
  }
  source?: string
  traceId?: string
  correlationId?: string
}

interface Subscription {
  unsubscribe: () => Promise<void>
  pause: () => void
  resume: () => void
  isActive: () => boolean
  getMetadata: () => {
    subscriberId: string
    pattern: string
    subscribedAt: Date
    active: boolean
  }
}

Basic Usage

Simple Event Handler

Subscribe to a specific event and execute a handler when it occurs:

import { $, on } from 'sdk.do'

// Subscribe to order creation events
on($.Order.created, async (order) => {
  console.log('New order received:', order.id)
  console.log('Customer:', order.customerId)
  console.log('Total:', order.total)
})

Type-Safe Handlers

Use TypeScript generics for type-safe event data:

interface OrderCreatedEvent {
  id: string
  customerId: string
  total: number
  items: Array<{
    productId: string
    quantity: number
    price: number
  }>
  createdAt: Date
}

on<OrderCreatedEvent>($.Order.created, async (order) => {
  // TypeScript knows the shape of 'order'
  const itemCount = order.items.length
  // db.get returns the Customer record, not just the email address
  const customer = await db.get('Customer', order.customerId)
})
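
The generic parameter is checked only at compile time: TypeScript types are erased before the code runs, so a malformed payload would still reach the handler. A minimal sketch of a runtime guard follows. The `isRecord` and `isOrderCreatedEvent` helpers are hypothetical (they are not part of sdk.do), and the guard checks only a subset of the fields as an illustration.

```typescript
// Hypothetical helpers, not part of sdk.do: narrow an unknown payload at runtime.
interface OrderCreatedEvent {
  id: string
  customerId: string
  total: number
  items: Array<{ productId: string; quantity: number; price: number }>
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null
}

function isOrderCreatedEvent(value: unknown): value is OrderCreatedEvent {
  if (!isRecord(value)) return false
  return (
    typeof value.id === 'string' &&
    typeof value.customerId === 'string' &&
    typeof value.total === 'number' &&
    Array.isArray(value.items) &&
    value.items.every(
      (item) =>
        isRecord(item) &&
        typeof item.productId === 'string' &&
        typeof item.quantity === 'number' &&
        typeof item.price === 'number'
    )
  )
}
```

A handler could call the guard first and return early (or publish an error event) when it fails, so the rest of the handler only ever sees well-formed data.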

Handler with Error Handling

Implement robust error handling in your event handlers:

on($.Payment.succeeded, async (payment) => {
  try {
    // Update order status
    await db.update('Order', payment.orderId, {
      status: 'paid',
      paidAt: new Date(),
    })

    // Send confirmation email
    await send($.Email.send, {
      to: payment.customerEmail,
      subject: 'Payment Confirmed',
      template: 'payment-confirmation',
      data: { payment },
    })

    console.log(`Payment processed: ${payment.id}`)
  } catch (error) {
    console.error('Payment processing failed:', error)

    // Publish error event for monitoring
    await send($.Payment.processingFailed, {
      paymentId: payment.id,
      error: error instanceof Error ? error.message : String(error),
      timestamp: new Date(),
    })

    // Retry logic could be implemented here
    throw error
  }
})

Semantic Event Patterns

Event patterns follow the $.Subject.predicate.Object semantic triple format, providing a natural way to express domain events.
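
To make the pattern format concrete, here is a minimal sketch of how a path proxy could turn property access into a dotted pattern string. This is an assumption for illustration only: `createPathProxy` and `patternOf` are hypothetical names, and the real `$` exported by sdk.do may be implemented differently.

```typescript
// Hypothetical sketch: each property access adds one segment to the path.
interface PathNode {
  readonly [segment: string]: PathNode
}

// Segments are stored outside the proxy so every property name stays usable as a segment
const segmentsOf = new WeakMap<object, string[]>()

function createPathProxy(segments: string[] = []): PathNode {
  const proxy: PathNode = new Proxy({} as PathNode, {
    get(_target, prop) {
      // Symbol keys (for example from console inspection) do not extend the path
      if (typeof prop === 'symbol') return undefined
      return createPathProxy([...segments, prop])
    },
  })
  segmentsOf.set(proxy, segments)
  return proxy
}

function patternOf(node: PathNode): string {
  return (segmentsOf.get(node) ?? []).join('.')
}

const $ = createPathProxy()
patternOf($.Order.created) // 'Order.created'
patternOf($.User.email.verified) // 'User.email.verified'
patternOf($.Order['*']) // 'Order.*'
```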

Entity Lifecycle Events

Track the complete lifecycle of entities:

// Creation events
on($.Order.created, async (order) => {
  console.log('Order created:', order.id)
})

// Update events
on($.Order.updated, async (order) => {
  console.log('Order updated:', order.id)
})

// State transition events
on($.Order.shipped, async (order) => {
  await send($.Email.send, {
    to: order.customerEmail,
    subject: 'Your order has shipped',
    template: 'order-shipped',
    data: { order, trackingNumber: order.trackingNumber },
  })
})

// Completion events
on($.Order.completed, async (order) => {
  await db.create('Review', {
    orderId: order.id,
    customerId: order.customerId,
    requestedAt: new Date(),
  })
})

// Deletion events
on($.Order.deleted, async (order) => {
  console.log('Order deleted:', order.id)
  await db.delete('OrderItems', { orderId: order.id }, { cascade: true })
})

Action-Based Events

Handle events that represent actions or operations:

// User actions
on($.User.loggedIn, async (user) => {
  await db.update('User', user.id, {
    lastLoginAt: new Date(),
    loginCount: user.loginCount + 1,
  })
})

on($.User.passwordChanged, async (user) => {
  await send($.Email.send, {
    to: user.email,
    subject: 'Password Changed',
    template: 'password-change-notification',
  })
})

// System actions
on($.System.backupCompleted, async (backup) => {
  console.log('Backup completed:', backup.filename)
  await db.create('BackupLog', backup)
})

on($.System.maintenanceScheduled, async (maintenance) => {
  await send($.Notification.broadcast, {
    message: `Scheduled maintenance: ${maintenance.description}`,
    scheduledFor: maintenance.startTime,
  })
})

Nested Semantic Events

Use deeply nested semantic patterns for precise event targeting:

// User email verification
on($.User.email.verified, async (user) => {
  await db.update('User', user.id, {
    emailVerified: true,
    emailVerifiedAt: new Date(),
  })

  await send($.Welcome.email, {
    userId: user.id,
    email: user.email,
  })
})

// Product inventory updates
on($.Product.inventory.updated, async (product) => {
  if (product.inventory.quantity < product.inventory.threshold) {
    await send($.Alert.lowInventory, {
      productId: product.id,
      currentQuantity: product.inventory.quantity,
      threshold: product.inventory.threshold,
    })
  }
})

// Payment card expiration
on($.Payment.card.expiring, async (card) => {
  await send($.Email.send, {
    to: card.customerEmail,
    subject: 'Your payment card is expiring soon',
    template: 'card-expiring',
    data: { card },
  })
})

Wildcard Patterns

Subscribe to multiple related events using wildcard patterns:

// Subscribe to all Order events
on($.Order['*'], async (order) => {
  console.log('Order event occurred:', order)
  await db.create('OrderAuditLog', {
    orderId: order.id,
    event: order.eventType,
    timestamp: new Date(),
  })
})

// Subscribe to all Payment events
on($.Payment['*'], async (payment) => {
  // Track all payment activity
  await db.create('PaymentEvent', {
    paymentId: payment.id,
    type: payment.eventType,
    amount: payment.amount,
    timestamp: new Date(),
  })
})

// Subscribe to all User email events
on($.User.email['*'], async (event) => {
  console.log('Email event:', event.type)
})
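
The exact matching rules are not spelled out above, so the following is a sketch under one plausible assumption: patterns and event names are dotted strings, and `*` matches exactly one segment. `matchesPattern` is a hypothetical helper, not an sdk.do API.

```typescript
// Hypothetical helper: '*' is assumed to match exactly one dotted segment.
function matchesPattern(pattern: string, eventName: string): boolean {
  const patternParts = pattern.split('.')
  const eventParts = eventName.split('.')
  if (patternParts.length !== eventParts.length) return false
  return patternParts.every((part, i) => part === '*' || part === eventParts[i])
}

matchesPattern('Order.*', 'Order.created') // true
matchesPattern('Order.*', 'Payment.succeeded') // false
matchesPattern('User.email.*', 'User.email.verified') // true
```

Under this assumption `Order.*` would not match a deeper event such as `Order.item.added`; if the SDK treats `*` as matching any number of segments, the length check would need to be relaxed.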

Handler Composition and Chaining

Multiple Handlers for Same Event

Register multiple handlers for a single event type:

// First handler: Update database
on($.Order.created, async (order) => {
  await db.create('OrderMetrics', {
    orderId: order.id,
    createdAt: new Date(),
    total: order.total,
  })
})

// Second handler: Send notification
on($.Order.created, async (order) => {
  await send($.Email.send, {
    to: order.customerEmail,
    subject: 'Order Confirmation',
    template: 'order-confirmation',
    data: { order },
  })
})

// Third handler: Update analytics
on($.Order.created, async (order) => {
  await api.fetch('https://analytics.example.com/track', {
    method: 'POST',
    body: JSON.stringify({
      event: 'order_created',
      properties: {
        orderId: order.id,
        total: order.total,
        customerId: order.customerId,
      },
    }),
  })
})
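
How the SDK dispatches an event to several handlers is not described here. As a sketch of one possible strategy, the hypothetical `dispatchAll` below runs every handler concurrently and collects failures, so one rejected handler does not stop the others.

```typescript
type Handler<T> = (data: T) => Promise<void> | void

// Hypothetical dispatcher: runs all handlers and returns the errors that occurred.
async function dispatchAll<T>(handlers: Handler<T>[], data: T): Promise<unknown[]> {
  const results = await Promise.allSettled(
    // Wrapping in an async function turns synchronous throws into rejections
    handlers.map(async (handler) => handler(data))
  )
  return results
    .filter((r): r is PromiseRejectedResult => r.status === 'rejected')
    .map((r) => r.reason)
}
```

Whatever the SDK does internally, it is safer to write each handler so that it does not depend on another handler for the same event having run first.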

Event Handler Orchestration

Create complex workflows by chaining event handlers:

// Step 1: User registers
on($.User.registered, async (user) => {
  // Create user profile
  await db.create('Profile', {
    userId: user.id,
    createdAt: new Date(),
  })

  // Trigger email verification
  await send($.User.email.verificationRequested, {
    userId: user.id,
    email: user.email,
  })
})

// Step 2: Email verification requested
on($.User.email.verificationRequested, async (data) => {
  const token = crypto.randomUUID()

  await db.create('VerificationToken', {
    userId: data.userId,
    token,
    expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000), // 24 hours
  })

  await send($.Email.send, {
    to: data.email,
    subject: 'Verify your email',
    template: 'email-verification',
    data: { token },
  })
})

// Step 3: Email verified
on($.User.email.verified, async (user) => {
  // Activate account
  await db.update('User', user.id, {
    status: 'active',
    emailVerified: true,
  })

  // Send welcome email
  await send($.Welcome.email, {
    userId: user.id,
    email: user.email,
  })

  // Trigger onboarding
  await send($.User.onboarding.started, {
    userId: user.id,
  })
})

Subscription Management

Managing Subscription Lifecycle

Control when event handlers are active:

// Create subscription
const subscription = on($.Order.created, async (order) => {
  console.log('Processing order:', order.id)
})

// Check if active
if (subscription.isActive()) {
  console.log('Subscription is active')
}

// Pause subscription temporarily
subscription.pause()
console.log('Subscription paused')

// Resume subscription
subscription.resume()
console.log('Subscription resumed')

// Unsubscribe permanently
await subscription.unsubscribe()
console.log('Subscription removed')
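
The lifecycle above can be modeled as a small state machine. The sketch below is an in-memory stand-in written for illustration, not the SDK's implementation. In particular, it assumes that `isActive()` returns false while paused and that `unsubscribe()` is final; neither is confirmed by the type signature.

```typescript
type State = 'active' | 'paused' | 'closed'

// Hypothetical in-memory stand-in for the Subscription interface
class LocalSubscription {
  private state: State = 'active'
  private readonly subscribedAt = new Date()
  private readonly subscriberId: string
  private readonly pattern: string

  constructor(subscriberId: string, pattern: string) {
    this.subscriberId = subscriberId
    this.pattern = pattern
  }

  pause(): void {
    if (this.state === 'active') this.state = 'paused'
  }

  resume(): void {
    // A closed subscription stays closed; only a paused one can resume
    if (this.state === 'paused') this.state = 'active'
  }

  async unsubscribe(): Promise<void> {
    this.state = 'closed'
  }

  isActive(): boolean {
    return this.state === 'active'
  }

  getMetadata() {
    return {
      subscriberId: this.subscriberId,
      pattern: this.pattern,
      subscribedAt: this.subscribedAt,
      active: this.isActive(),
    }
  }
}
```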

Conditional Subscriptions

Create subscriptions that activate based on conditions:

let orderSubscription: Subscription | null = null

// Activate order processing during business hours
function activateOrderProcessing() {
  if (orderSubscription) {
    // Already subscribed: resume the paused subscription instead of creating
    // a second one (assumes resume() is a no-op when already active)
    orderSubscription.resume()
  } else {
    orderSubscription = on($.Order.created, async (order) => {
      await processOrder(order)
    })
  }

  console.log('Order processing activated')
}

// Deactivate order processing outside business hours
function deactivateOrderProcessing() {
  if (orderSubscription) {
    orderSubscription.pause()
    console.log('Order processing paused')
  }
}

// Schedule activation/deactivation
every('0 9 * * MON-FRI', activateOrderProcessing) // 9 AM weekdays
every('0 17 * * MON-FRI', deactivateOrderProcessing) // 5 PM weekdays

Subscription Metadata

Access subscription information for monitoring and debugging:

const subscription = on($.Payment.succeeded, async (payment) => {
  await processPayment(payment)
})

const metadata = subscription.getMetadata()
console.log('Subscriber ID:', metadata.subscriberId)
console.log('Event pattern:', metadata.pattern)
console.log('Subscribed at:', metadata.subscribedAt)
console.log('Active:', metadata.active)

// Store for monitoring
await db.create('EventSubscription', {
  subscriberId: metadata.subscriberId,
  pattern: metadata.pattern,
  subscribedAt: metadata.subscribedAt,
  service: 'payment-processor',
})

Integration with Other Primitives

Event Handlers with Database Operations

Combine on with db for data-driven event handling:

on($.Order.created, async (order) => {
  // Get customer details
  const customer = await db.get('Customer', order.customerId)

  // Check inventory for all items
  for (const item of order.items) {
    const product = await db.get('Product', item.productId)

    if (product.inventory < item.quantity) {
      throw new Error(`Insufficient inventory for ${product.name}`)
    }

    // Update inventory
    await db.update('Product', item.productId, {
      inventory: product.inventory - item.quantity,
    })
  }

  // Create invoice
  const invoice = await db.create('Invoice', {
    orderId: order.id,
    customerId: order.customerId,
    total: order.total,
    createdAt: new Date(),
  })

  // Publish invoice created event
  await send($.Invoice.created, { invoiceId: invoice.id })
})

Event Handlers with AI Operations

Use ai within event handlers for intelligent processing:

on($.Support.ticketCreated, async (ticket) => {
  // Analyze ticket sentiment and urgency
  const analysis = await ai.generate({
    prompt: `Analyze this support ticket and extract:
    - Sentiment (positive/neutral/negative)
    - Urgency (low/medium/high/critical)
    - Category (technical/billing/account/general)

    Ticket: "${ticket.description}"`,
    model: 'gpt-5',
    schema: {
      sentiment: 'string',
      urgency: 'string',
      category: 'string',
    },
  })

  // Update ticket with AI analysis
  await db.update('Ticket', ticket.id, {
    sentiment: analysis.sentiment,
    urgency: analysis.urgency,
    category: analysis.category,
  })

  // Auto-assign based on urgency
  if (analysis.urgency === 'critical') {
    await send($.Ticket.escalated, {
      ticketId: ticket.id,
      assignToTeam: 'senior-support',
    })
  }
})

Event Handlers Triggering Other Events

Create event chains using send within handlers:

on($.Invoice.created, async (invoice) => {
  // Send invoice to customer
  await send($.Email.send, {
    to: invoice.customerEmail,
    subject: `Invoice #${invoice.number}`,
    template: 'invoice',
    data: { invoice },
  })

  // Schedule payment reminder
  await send(
    $.Payment.reminderScheduled,
    {
      invoiceId: invoice.id,
      dueDate: invoice.dueDate,
      amount: invoice.total,
    },
    {
      delay: 7 * 24 * 60 * 60 * 1000, // 7 days
    }
  )

  // Update accounting system
  await send($.Accounting.invoiceRecorded, {
    invoiceId: invoice.id,
    amount: invoice.total,
    customerId: invoice.customerId,
  })
})

Real-World Examples

Example 1: Order Processing Workflow

Complete order processing pipeline with multiple event handlers:

// 1. Order created - validate and initialize
on($.Order.created, async (order) => {
  console.log('Starting order processing:', order.id)

  // Validate customer
  const customer = await db.get('Customer', order.customerId)
  if (!customer) {
    throw new Error('Invalid customer')
  }

  // Validate products and check inventory
  let allAvailable = true
  for (const item of order.items) {
    const product = await db.get('Product', item.productId)
    if (!product || product.inventory < item.quantity) {
      allAvailable = false
      break
    }
  }

  if (allAvailable) {
    await send($.Order.validated, { orderId: order.id })
  } else {
    await send($.Order.validationFailed, {
      orderId: order.id,
      reason: 'Insufficient inventory',
    })
  }
})

// 2. Order validated - reserve inventory
on($.Order.validated, async (data) => {
  const order = await db.get('Order', data.orderId)

  for (const item of order.items) {
    await db.update('Product', item.productId, {
      reservedInventory: (product) => product.reservedInventory + item.quantity,
    })
  }

  await db.update('Order', data.orderId, {
    status: 'inventory_reserved',
  })

  await send($.Payment.requested, {
    orderId: data.orderId,
    amount: order.total,
  })
})

// 3. Payment succeeded - fulfill order
on($.Payment.succeeded, async (payment) => {
  const order = await db.get('Order', payment.orderId)

  // Update inventory
  for (const item of order.items) {
    await db.update('Product', item.productId, {
      inventory: (product) => product.inventory - item.quantity,
      reservedInventory: (product) => product.reservedInventory - item.quantity,
    })
  }

  // Update order status
  await db.update('Order', payment.orderId, {
    status: 'paid',
    paidAt: new Date(),
  })

  // Trigger fulfillment
  await send($.Order.fulfillmentRequested, {
    orderId: payment.orderId,
  })
})

// 4. Order fulfilled - ship and notify
on($.Order.fulfilled, async (data) => {
  const order = await db.get('Order', data.orderId)

  await db.update('Order', data.orderId, {
    status: 'shipped',
    shippedAt: new Date(),
    trackingNumber: data.trackingNumber,
  })

  await send($.Email.send, {
    to: order.customerEmail,
    subject: 'Your order has shipped!',
    template: 'order-shipped',
    data: {
      orderNumber: order.number,
      trackingNumber: data.trackingNumber,
    },
  })
})

Example 2: User Notification System

Multi-channel notification system with preferences:

// Subscribe to notification events
on($.Notification.requested, async (notification) => {
  const user = await db.get('User', notification.userId)
  const preferences = await db.get('NotificationPreferences', notification.userId)

  // Email notifications
  if (preferences.email && notification.channels.includes('email')) {
    await send($.Email.send, {
      to: user.email,
      subject: notification.title,
      body: notification.message,
      template: notification.template,
    })
  }

  // SMS notifications
  if (preferences.sms && notification.channels.includes('sms')) {
    await send($.SMS.send, {
      to: user.phone,
      message: notification.message,
    })
  }

  // Push notifications
  if (preferences.push && notification.channels.includes('push')) {
    await send($.Push.send, {
      userId: user.id,
      title: notification.title,
      message: notification.message,
    })
  }

  // In-app notifications
  await db.create('InAppNotification', {
    userId: user.id,
    title: notification.title,
    message: notification.message,
    read: false,
    createdAt: new Date(),
  })

  // Log notification
  await db.create('NotificationLog', {
    userId: user.id,
    type: notification.type,
    channels: notification.channels,
    sentAt: new Date(),
  })
})

Example 3: Real-Time Analytics Pipeline

Event-driven analytics with aggregation:

// Track page views
on($.Analytics.pageView, async (event) => {
  await db.create('PageView', {
    userId: event.userId,
    path: event.path,
    timestamp: new Date(),
    sessionId: event.sessionId,
    referrer: event.referrer,
  })

  // Update session
  await db.update('Session', event.sessionId, {
    lastActivityAt: new Date(),
    pageViewCount: (session) => session.pageViewCount + 1,
  })
})

// Track conversions
on($.Analytics.conversion, async (event) => {
  await db.create('Conversion', {
    userId: event.userId,
    type: event.type,
    value: event.value,
    timestamp: new Date(),
  })

  // Update user lifetime value
  await db.update('User', event.userId, {
    lifetimeValue: (user) => user.lifetimeValue + event.value,
  })

  // Check for milestones
  const user = await db.get('User', event.userId)
  if (user.lifetimeValue >= 1000 && !user.vipStatus) {
    await send($.User.vipStatusAchieved, {
      userId: event.userId,
      lifetimeValue: user.lifetimeValue,
    })
  }
})

// Aggregate metrics hourly
every('0 * * * *', async () => {
  // The job fires at the top of the hour, so aggregate the hour that just ended
  const hourEnd = new Date()
  hourEnd.setMinutes(0, 0, 0)

  const hourStart = new Date(hourEnd)
  hourStart.setHours(hourStart.getHours() - 1)

  const pageViews = await db.list('PageView', {
    where: {
      timestamp: { gte: hourStart, lt: hourEnd },
    },
  })

  const conversions = await db.list('Conversion', {
    where: {
      timestamp: { gte: hourStart, lt: hourEnd },
    },
  })

  await db.create('HourlyMetrics', {
    hour: hourStart,
    pageViews: pageViews.length,
    conversions: conversions.length,
    // Avoid dividing by zero when the hour had no page views
    conversionRate: pageViews.length > 0 ? conversions.length / pageViews.length : 0,
    revenue: conversions.reduce((sum, c) => sum + c.value, 0),
  })
})

Performance Considerations

Handler Execution

Event handlers execute asynchronously and don't block event publishing:

// Fast handler - executes quickly
on($.Order.created, async (order) => {
  console.log('Order created:', order.id)
  await db.update('Order', order.id, { processedAt: new Date() })
})

// Slow handler - doesn't block other handlers
on($.Order.created, async (order) => {
  // Generate PDF invoice (slow operation)
  const pdf = await generateInvoicePDF(order)
  await send($.Email.send, {
    to: order.customerEmail,
    attachments: [{ filename: 'invoice.pdf', content: pdf }],
  })
})

Error Handling and Retries

Implement retry logic for transient failures:

on($.Payment.process, async (payment) => {
  const maxAttempts = 3

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const result = await api.fetch('https://payment-gateway.example.com/charge', {
        method: 'POST',
        body: JSON.stringify({
          amount: payment.amount,
          customerId: payment.customerId,
        }),
      })

      if (result.ok) {
        await send($.Payment.succeeded, {
          paymentId: payment.id,
          transactionId: result.data.id,
        })
        return
      }

      // Treat a non-OK response as a failed attempt so the loop always advances
      throw new Error('Payment gateway returned a non-OK response')
    } catch (error) {
      if (attempt === maxAttempts) {
        await send($.Payment.failed, {
          paymentId: payment.id,
          error: error instanceof Error ? error.message : String(error),
          attempts: attempt,
        })
        throw error
      }

      // Exponential backoff: 2 seconds, then 4 seconds
      await new Promise((resolve) => setTimeout(resolve, Math.pow(2, attempt) * 1000))
    }
  }
})
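
The retry loop can be factored into a reusable helper. The sketch below is hypothetical (`withRetry` and `backoffDelay` are not sdk.do APIs). Its options mirror the shape of `EventOptions.retry` from the type signature, with a `baseDelayMs` field added as an assumption so the delays are configurable.

```typescript
interface RetryOptions {
  maxAttempts: number
  backoff: 'linear' | 'exponential'
  baseDelayMs: number // assumption: not part of EventOptions.retry
}

// Delay before the next attempt, given the number of attempts already made
function backoffDelay(attempt: number, options: RetryOptions): number {
  return options.backoff === 'linear'
    ? options.baseDelayMs * attempt
    : options.baseDelayMs * Math.pow(2, attempt - 1)
}

async function withRetry<T>(operation: () => Promise<T>, options: RetryOptions): Promise<T> {
  let lastError: unknown = new Error('maxAttempts must be at least 1')
  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (error) {
      lastError = error
      // Wait only if another attempt will follow
      if (attempt < options.maxAttempts) {
        const delay = backoffDelay(attempt, options)
        await new Promise<void>((resolve) => setTimeout(resolve, delay))
      }
    }
  }
  throw lastError
}
```

Retrying is only safe for operations that are idempotent or protected by an idempotency key; a payment charge that succeeded but whose response was lost should not be sent again blindly.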

Batching Operations

Batch database operations for efficiency:

on($.Order.created, async (order) => {
  // Batch inventory updates
  const updates = order.items.map((item) => ({
    type: 'Product',
    id: item.productId,
    data: {
      inventory: (product) => product.inventory - item.quantity,
    },
  }))

  await db.batchUpdate(updates)

  // Batch analytics events
  const events = order.items.map((item) => ({
    type: 'product_sold',
    productId: item.productId,
    quantity: item.quantity,
    timestamp: new Date(),
  }))

  await db.batchCreate('AnalyticsEvent', events)
})

Authentication Requirements

Event subscriptions require authentication because they create persistent server-side resources.

Readonly Mode

Anonymous users cannot create event subscriptions:

// ❌ Requires authentication
on($.Order.created, async (order) => {
  console.log(order)
})

Authenticated Mode

With a valid API key, event subscriptions work normally:

import { $, configure, on } from 'sdk.do'

configure({
  apiUrl: 'https://api.do',
  apiKey: process.env.SDK_DO_API_KEY,
})

// ✅ Works with authentication
on($.Order.created, async (order) => {
  console.log('Order created:', order.id)
})

Common Pitfalls

Infinite Event Loops

Avoid creating circular event chains:

// ❌ Bad: Creates infinite loop
on($.Order.updated, async (order) => {
  await db.update('Order', order.id, {
    lastModified: new Date(),
  })
  // This triggers another $.Order.updated event!
})

// ✅ Good: Check if update is needed
on($.Order.updated, async (order) => {
  const now = new Date()
  const lastModified = new Date(order.lastModified)

  if (now.getTime() - lastModified.getTime() > 1000) {
    // More than 1 second
    await db.update(
      'Order',
      order.id,
      {
        lastModified: now,
      },
      { silent: true }
    ) // Use silent option if available
  }
})
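
When a silent update option is not available, another way to break a cycle is to carry a hop counter with each event and stop propagating once it exceeds a limit. The sketch below is a self-contained illustration using a hypothetical in-memory bus; `TinyBus`, `hops`, and `MAX_HOPS` are assumptions for this example, not sdk.do features.

```typescript
interface Envelope<T> {
  data: T
  hops: number // how many times handlers have re-published this event
}

const MAX_HOPS = 3

type Listener<T> = (envelope: Envelope<T>) => void

// Hypothetical synchronous bus used only to demonstrate the guard
class TinyBus<T> {
  private listeners: Listener<T>[] = []
  delivered = 0

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener)
  }

  publish(data: T, hops = 0): void {
    if (hops > MAX_HOPS) return // drop the event instead of looping forever
    this.delivered++
    for (const listener of this.listeners) listener({ data, hops })
  }
}

const bus = new TinyBus<{ id: string }>()
// This handler re-publishes its own event; without the guard it would never stop
bus.subscribe((envelope) => bus.publish(envelope.data, envelope.hops + 1))
bus.publish({ id: 'o1' })
// bus.delivered is now 4: the original event plus three re-publications
```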

Memory Leaks

Always clean up subscriptions when done:

// ❌ Bad: Never cleaned up
function startListening() {
  on($.Order.created, async (order) => {
    console.log(order)
  })
}

// Called multiple times creates multiple subscriptions
startListening()
startListening()

// ✅ Good: Store and clean up
let subscription: Subscription | null = null

async function startListening() {
  if (subscription) {
    await subscription.unsubscribe()
  }

  subscription = on($.Order.created, async (order) => {
    console.log(order)
  })
}

async function stopListening() {
  if (subscription) {
    await subscription.unsubscribe()
    subscription = null
  }
}
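
When a component owns several subscriptions, collecting them in one place makes cleanup harder to forget. A minimal sketch, relying only on the `unsubscribe` method from the `Subscription` interface; `SubscriptionGroup` is a hypothetical helper, not part of sdk.do.

```typescript
interface Unsubscribable {
  unsubscribe: () => Promise<void>
}

class SubscriptionGroup {
  private subscriptions: Unsubscribable[] = []

  // Returns the subscription so the call can wrap an existing expression
  add<S extends Unsubscribable>(subscription: S): S {
    this.subscriptions.push(subscription)
    return subscription
  }

  get size(): number {
    return this.subscriptions.length
  }

  // Attempts every unsubscribe, even if some of them reject
  async unsubscribeAll(): Promise<void> {
    const pending = this.subscriptions
    this.subscriptions = []
    await Promise.allSettled(pending.map((s) => s.unsubscribe()))
  }
}
```

With this in place, a module could register each handler as `group.add(on(...))` and call `await group.unsubscribeAll()` once during shutdown.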

Blocking Operations

Don't block the event loop with synchronous operations:

// ❌ Bad: Synchronous heavy computation
on($.Order.created, (order) => {
  // Heavy synchronous processing
  for (let i = 0; i < 1000000; i++) {
    // Complex calculation
  }
})

// ✅ Good: Offload to worker or batch
on($.Order.created, async (order) => {
  await send($.Order.processInBackground, {
    orderId: order.id,
  })
})

WebSocket Connection

The on primitive uses a WebSocket connection for real-time event delivery.

Connection Management

The SDK automatically manages WebSocket connections:

  • Connects on first subscription
  • Automatically reconnects on disconnect
  • Resubscribes to all active patterns on reconnection
  • Closes when all subscriptions are removed
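
The behavior in the list above can be sketched with a reference-counting manager. This is an illustration built on a hypothetical `Transport` interface, not the SDK's actual WebSocket code, and it leaves out reconnection timing and error handling.

```typescript
// Hypothetical abstraction over the underlying socket
interface Transport {
  open(): void
  close(): void
  subscribe(pattern: string): void
}

class ConnectionManager {
  private readonly patterns = new Map<string, number>() // pattern -> subscriber count
  private connected = false
  private readonly transport: Transport

  constructor(transport: Transport) {
    this.transport = transport
  }

  add(pattern: string): void {
    if (!this.connected) {
      this.transport.open() // connect on first subscription
      this.connected = true
    }
    const count = this.patterns.get(pattern) ?? 0
    if (count === 0) this.transport.subscribe(pattern)
    this.patterns.set(pattern, count + 1)
  }

  remove(pattern: string): void {
    const count = this.patterns.get(pattern) ?? 0
    if (count <= 1) this.patterns.delete(pattern)
    else this.patterns.set(pattern, count - 1)
    if (this.patterns.size === 0 && this.connected) {
      this.transport.close() // close when the last subscription is removed
      this.connected = false
    }
  }

  // Called after the transport has re-established a dropped connection
  onReconnected(): void {
    this.patterns.forEach((_count, pattern) => this.transport.subscribe(pattern))
  }
}
```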

Connection Status

Monitor connection status through the subscription lifecycle:

const subscription = on($.Order.created, async (order) => {
  console.log('Processing order:', order.id)
})

// Subscription is active even during reconnection
console.log('Active:', subscription.isActive())

// WebSocket reconnection is automatic and transparent
// Handlers continue to receive events after reconnection

Related Primitives

  • send - Publish events to trigger handlers
  • every - Schedule recurring event-driven tasks
  • db - Database operations for event data
  • ai - AI operations within event handlers

Best Practices

  1. Keep handlers focused: Each handler should do one thing well
  2. Handle errors gracefully: Always implement error handling
  3. Use semantic patterns: Follow $.Subject.predicate.Object convention
  4. Clean up subscriptions: Unsubscribe when no longer needed
  5. Avoid infinite loops: Be careful with handlers that publish events
  6. Batch operations: Combine multiple database operations when possible
  7. Log events: Track event processing for debugging and monitoring
  8. Use type safety: Leverage TypeScript for type-safe event data
  9. Test handlers: Unit test event handlers in isolation
  10. Monitor performance: Track handler execution times and failures
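
For item 9, one approach is to write the handler as a plain function that receives its dependencies, so a test can pass in fakes. The names below (`makeOrderCreatedHandler`, `Deps`) are hypothetical, and the fake `db` only mimics the `create` call used in this page's examples.

```typescript
interface Order {
  id: string
  total: number
}

interface Deps {
  db: { create: (type: string, data: Record<string, unknown>) => Promise<void> }
}

// Factory returns the handler; production code would pass the real db
function makeOrderCreatedHandler(deps: Deps) {
  return async (order: Order): Promise<void> => {
    await deps.db.create('OrderMetrics', { orderId: order.id, total: order.total })
  }
}

// In a test, a fake db records calls instead of touching storage
async function testOrderCreatedHandler(): Promise<void> {
  const calls: Array<{ type: string; data: Record<string, unknown> }> = []
  const fakeDb = {
    create: async (type: string, data: Record<string, unknown>) => {
      calls.push({ type, data })
    },
  }

  await makeOrderCreatedHandler({ db: fakeDb })({ id: 'o1', total: 42 })

  if (calls.length !== 1 || calls[0]?.type !== 'OrderMetrics') {
    throw new Error('expected one OrderMetrics record')
  }
}
```

The same factory output can then be passed to `on` as the handler, which keeps the subscription wiring separate from the logic under test.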

Next Steps