Pipeline API
REST API reference for pipeline.do - A data ingestion and transformation service that processes streaming events with SQL and delivers them to object storage in analytics-ready formats.
Endpoint
POST https://api.do/pipeline
Authentication
All API requests must include an API key, sent as a bearer token in the Authorization header:
curl https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json"
Request
Headers
Authorization: Bearer YOUR_API_KEY
Content-Type: application/json
X-Request-ID: unique-request-id (optional)
Request Body
{
"operation": "pipeline",
"parameters": {
// Operation-specific parameters
},
"options": {
"timeout": 30000,
"retries": 3
}
}
Response
Success Response
{
"success": true,
"data": {
// Response data
},
"meta": {
"requestId": "req_123",
"timestamp": "2025-01-01T12:00:00Z",
"duration": 145
}
}
Error Response
{
"success": false,
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid parameters",
"details": {
// Error details
}
},
"meta": {
"requestId": "req_123",
"timestamp": "2025-01-01T12:00:00Z"
}
}
Operations
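Every operation in this section is sent in the same request envelope, so the payload can be built by one small helper. The following is a minimal sketch rather than an official client: `build_request` is a hypothetical name, the default `timeout` and `retries` values are simply copied from the Request Body example, and the contents of `parameters` for each operation are not specified in this reference.

```python
import json


def build_request(operation, parameters=None, timeout=30000, retries=3):
    """Build the JSON envelope shown under Request Body (hypothetical helper)."""
    return {
        "operation": operation,
        # Operation-specific parameters; left empty when none are given.
        "parameters": parameters if parameters is not None else {},
        "options": {"timeout": timeout, "retries": retries},
    }


# Build the body for the "start" operation and show it as JSON.
payload = build_request("start")
print(json.dumps(payload, indent=2))
```

The resulting dictionary can be passed as the `json=` argument of `requests.post`, as in the Python example later in this reference.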
create
Create a new data pipeline.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "create",
"parameters": {}
}'
delete
Permanently remove the pipeline.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "delete",
"parameters": {}
}'
start
Start processing events through the pipeline.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "start",
"parameters": {}
}'
pause
Temporarily stop processing events.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "pause",
"parameters": {}
}'
resume
Resume a paused pipeline.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "resume",
"parameters": {}
}'
stop
Stop the pipeline and flush remaining events.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "stop",
"parameters": {}
}'
ingest
Send an event to the pipeline for processing.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "ingest",
"parameters": {}
}'
transform
Update the SQL transformation query.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "transform",
"parameters": {}
}'
monitor
Get pipeline metrics (throughput, errors, latency).
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "monitor",
"parameters": {}
}'
logs
Retrieve pipeline execution logs.
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "logs",
"parameters": {}
}'
Examples
cURL
curl -X POST https://api.do/pipeline \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"operation": "pipeline",
"parameters": {}
}'
JavaScript/TypeScript
const response = await fetch('https://api.do/pipeline', {
  method: 'POST',
  headers: {
    Authorization: 'Bearer YOUR_API_KEY',
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    operation: 'pipeline',
    parameters: {},
  }),
})
const data = await response.json()
Python
import requests

response = requests.post(
    'https://api.do/pipeline',
    headers={
        'Authorization': 'Bearer YOUR_API_KEY',
        'Content-Type': 'application/json',
    },
    json={
        'operation': 'pipeline',
        'parameters': {},
    },
)
data = response.json()
Rate Limiting
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1609459200
Status Codes
- 200 OK - Success
- 400 Bad Request - Invalid request
- 401 Unauthorized - Missing/invalid API key
- 403 Forbidden - Insufficient permissions
- 404 Not Found - Resource not found
- 429 Too Many Requests - Rate limit exceeded
- 500 Internal Server Error - Server error
- 503 Service Unavailable - Service temporarily unavailable
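One way to act on these status codes together with the rate-limit headers is sketched below. The retry policy is an assumption, since this reference does not say which statuses are safe to retry. `X-RateLimit-Reset` is assumed to be a Unix timestamp in seconds, which is consistent with the example value but not confirmed here. `should_retry` and `seconds_until_reset` are hypothetical helpers.

```python
import time

# Assumption: these statuses indicate transient conditions worth retrying.
RETRYABLE_STATUSES = {429, 500, 503}


def should_retry(status_code):
    """Return True for statuses this sketch treats as transient."""
    return status_code in RETRYABLE_STATUSES


def seconds_until_reset(headers, now=None):
    """Seconds to wait until the rate-limit window resets (never negative).

    Assumes X-RateLimit-Reset holds a Unix timestamp in seconds.
    """
    now = time.time() if now is None else now
    reset_at = float(headers.get("X-RateLimit-Reset", now))
    return max(0.0, reset_at - now)


# Header values taken from the Rate Limiting example.
headers = {
    "X-RateLimit-Limit": "1000",
    "X-RateLimit-Remaining": "999",
    "X-RateLimit-Reset": "1609459200",
}
print(should_retry(429), seconds_until_reset(headers, now=1609459140))
```

Passing `now` explicitly keeps the helper deterministic in tests; in normal use it falls back to the current time.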
Error Codes
- VALIDATION_ERROR - Invalid parameters
- AUTHENTICATION_ERROR - Invalid API key
- AUTHORIZATION_ERROR - Insufficient permissions
- NOT_FOUND - Resource not found
- RATE_LIMIT_EXCEEDED - Too many requests
- TIMEOUT - Operation timeout
- INTERNAL_ERROR - Server error
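A client can surface these codes by turning the error envelope into an exception. The sketch below assumes the response body has already been parsed into a dictionary with the shape shown under Error Response. `PipelineAPIError` and `raise_for_error` are hypothetical names, and falling back to `INTERNAL_ERROR` when no code is present is a choice made for this sketch, not documented behavior.

```python
class PipelineAPIError(Exception):
    """Hypothetical exception carrying the fields of the error envelope."""

    def __init__(self, code, message, details=None, request_id=None):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message
        self.details = details or {}
        self.request_id = request_id


def raise_for_error(body):
    """Return `body` unchanged on success; raise PipelineAPIError otherwise."""
    if body.get("success"):
        return body
    error = body.get("error", {})
    meta = body.get("meta", {})
    raise PipelineAPIError(
        code=error.get("code", "INTERNAL_ERROR"),  # fallback is an assumption
        message=error.get("message", ""),
        details=error.get("details"),
        request_id=meta.get("requestId"),
    )


# Example using the error body from this reference.
error_body = {
    "success": False,
    "error": {"code": "VALIDATION_ERROR", "message": "Invalid parameters", "details": {}},
    "meta": {"requestId": "req_123", "timestamp": "2025-01-01T12:00:00Z"},
}
try:
    raise_for_error(error_body)
except PipelineAPIError as exc:
    print(exc.code, exc.request_id)
```

Keeping `requestId` on the exception makes it easier to quote the failing request when debugging or contacting support.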
Webhooks
Subscribe to events:
curl -X POST https://api.do/webhooks \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"url": "https://your-app.com/webhook",
"events": ["pipeline.created", "pipeline.deleted", "pipeline.started", "pipeline.paused", "pipeline.resumed", "pipeline.stopped", "pipeline.ingested", "pipeline.transformed", "pipeline.written", "pipeline.failed"]
}'
Best Practices
- API Keys - Store securely, never commit to git
- Error Handling - Handle all error codes gracefully
- Retries - Implement exponential backoff
- Rate Limiting - Respect rate limits
- Idempotency - Use the X-Idempotency-Key header
- Logging - Log requests for debugging
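The retry and idempotency recommendations above can be sketched together as follows. This is an illustration under assumptions, not a prescribed policy: the base delay, cap, and jitter strategy are arbitrary choices, using a UUID as the key value is one common convention rather than a documented requirement, and `backoff_delays`, `with_jitter`, and `request_headers` are hypothetical helpers.

```python
import random
import uuid


def backoff_delays(max_retries=3, base=0.5, cap=30.0):
    """Return exponential delays in seconds: base, 2*base, 4*base, ... capped."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(max_retries)]


def with_jitter(delay, rng=random):
    """Full jitter: pick a random wait between 0 and the computed delay."""
    return rng.uniform(0, delay)


def request_headers(api_key, idempotency_key=None):
    """Headers for one logical request; reuse the same key across its retries."""
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        # Assumption: any unique string is accepted; a UUID is used here.
        "X-Idempotency-Key": idempotency_key or str(uuid.uuid4()),
    }


# Generate the key once, then send identical headers on every retry.
key = str(uuid.uuid4())
for delay in backoff_delays(max_retries=3):
    headers = request_headers("YOUR_API_KEY", idempotency_key=key)
    wait = with_jitter(delay)
    # A real client would send the request here and sleep for `wait` on failure.
```

Generating the key once per logical request, rather than once per attempt, is what allows a server to recognize a retry as a repeat of the same request.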