Pipeline Transformation Engine
The Pipeline Transformation Engine is the second processing layer. It runs after the Organisation Data Layer, once per integration that the event has been routed to. Each integration has its own pipeline configuration that transforms the canonical Datafly event into the exact format required by the vendor’s API.
How It Works
```
Enriched event (from Org Data Layer)
  → Pipeline for Integration A → delivery-{integration_a} topic
  → Pipeline for Integration B → delivery-{integration_b} topic
  → Pipeline for Integration C → delivery-{integration_c} topic
```
Each pipeline is an ordered sequence of steps. The event enters the pipeline in the canonical Datafly format and exits as the vendor-specific payload. If any step fails or a filter drops the event, processing stops for that integration without affecting the others.
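The fan-out above can be sketched as a loop over the integrations an event was routed to. This is illustrative Python only: the `delivery-{integration_id}` topic pattern comes from the diagram, while `run_pipeline`, `publish`, and the returned status strings are hypothetical names for this sketch.

```python
def fan_out(enriched_event, integration_ids, run_pipeline, publish):
    """Run each integration's pipeline independently; a failure or a
    filter drop in one pipeline must not affect the others."""
    results = {}
    for integration_id in integration_ids:
        try:
            payload = run_pipeline(integration_id, enriched_event)
        except Exception:
            # This pipeline failed; the other integrations still run.
            results[integration_id] = "error"
            continue
        if payload is None:  # a filter step dropped the event
            results[integration_id] = "dropped"
            continue
        publish(f"delivery-{integration_id}", payload)
        results[integration_id] = "delivered"
    return results
```

The key design point is isolation: each integration's outcome is recorded separately, so one bad pipeline never blocks delivery to the rest.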
Pipeline Steps
A pipeline consists of one or more steps executed in order:
| Step Type | Purpose | Example |
|---|---|---|
| map | Rename or restructure fields | Map properties.revenue to params.value |
| filter | Drop events that don’t match criteria | Only send track events named Purchase |
| transform | Modify field values | Convert currency from cents to dollars |
| enrich | Add static or computed fields | Add measurement_id to every GA4 event |
| compute | Calculate new fields from expressions | Compute lifetime_value from multiple fields |
Each step type is described in detail in the Pipeline Configuration reference.
Execution Model
Step Ordering
Steps execute in the order they are defined in the configuration. The output of each step becomes the input of the next:
```
Input event → map → filter → transform → enrich → compute → Output payload
```
Early Termination
If a filter step drops the event, the pipeline stops immediately. No subsequent steps run, and no message is published to the delivery topic.
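The ordering and early-termination rules can be sketched as a simple fold over the step list. This is illustrative Python, not the engine's actual API: steps are modelled as plain event-to-event functions, and returning None is the sketch's convention for "filter dropped the event".

```python
def execute(steps, event):
    """Run steps in declared order; each step's output is the next
    step's input. A filter that drops the event returns None, which
    stops the pipeline immediately -- no later step runs, and nothing
    is published to the delivery topic."""
    for step in steps:
        event = step(event)
        if event is None:  # filter dropped the event
            return None
    return event

# In this sketch, steps are just event -> event functions.
lowercase_name = lambda e: {**e, "event": e["event"].lower()}
only_purchases = lambda e: e if e["event"] == "purchase" else None
```

Because a drop short-circuits the loop, steps placed after a filter pay no cost for events the filter rejects.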
Error Handling
If a step encounters an error (e.g. a referenced field does not exist), the behaviour depends on the step’s on_error setting:
| Setting | Behaviour |
|---|---|
| skip (default) | Skip this step and continue with the next |
| drop | Drop the event entirely for this integration |
| fail | Send the event to the dead-letter topic for investigation |
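The three on_error behaviours might be wired in like this. A sketch only: the DropEvent exception and the dead_letter callback are invented here to make the control flow concrete.

```python
class DropEvent(Exception):
    """Raised internally to drop the event for this integration."""

def apply_step(step, event, on_error="skip", dead_letter=None):
    try:
        return step(event)
    except Exception:
        if on_error == "skip":
            # Skip this step; the unmodified event feeds the next step.
            return event
        if on_error == "drop":
            # Abandon the event for this integration only.
            raise DropEvent()
        # on_error == "fail": route the event to the dead-letter topic
        # for investigation, then surface the failure.
        if dead_letter is not None:
            dead_letter(event)
        raise
```

Note that skip passes the *input* event through unchanged, which is what lets a missing optional field degrade gracefully instead of killing the whole pipeline.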
Example: GA4 Pipeline
Here is how a pipeline transforms a canonical “Purchase” event into the GA4 Measurement Protocol format:
Input (canonical event)
```json
{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD",
    "products": [
      { "product_id": "SKU-1234", "name": "Headphones", "price": 79.99 },
      { "product_id": "SKU-5678", "name": "Cable", "price": 14.99 }
    ]
  },
  "vendorIds": {
    "ga4_client_id": "1234567890.1740000000"
  }
}
```
Pipeline steps
- map — Restructure to GA4 format
- transform — Convert event name to lowercase
- enrich — Add measurement ID and API secret
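Sketched in Python, those three steps applied to the canonical event might look like the following. The field mappings mirror this example; the measurement ID value is a placeholder, and attaching it to the payload is a simplification (in the real GA4 Measurement Protocol it travels as a query parameter).

```python
def to_ga4(event, measurement_id):
    props = event["properties"]
    # map: restructure canonical fields into the GA4 payload shape
    payload = {
        "client_id": event["vendorIds"]["ga4_client_id"],
        "user_id": event["userId"],
        "events": [{
            # transform: convert the event name to lowercase
            "name": event["event"].lower(),
            "params": {
                "transaction_id": props["order_id"],
                "value": props["revenue"],
                "currency": props["currency"],
                "items": [
                    {"item_id": p["product_id"],
                     "item_name": p["name"],
                     "price": p["price"]}
                    for p in props.get("products", [])
                ],
            },
        }],
    }
    # enrich: attach the integration's measurement ID (illustrative;
    # GA4 actually takes it as a query parameter on the request URL)
    payload["measurement_id"] = measurement_id
    return payload
```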
Output (GA4 Measurement Protocol)
```json
{
  "client_id": "1234567890.1740000000",
  "user_id": "user_98765",
  "events": [
    {
      "name": "purchase",
      "params": {
        "transaction_id": "ORD-001",
        "value": 149.99,
        "currency": "USD",
        "items": [
          { "item_id": "SKU-1234", "item_name": "Headphones", "price": 79.99 },
          { "item_id": "SKU-5678", "item_name": "Cable", "price": 14.99 }
        ]
      }
    }
  ]
}
```
Example: Meta Conversions API Pipeline
Input (canonical event)
```json
{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD"
  },
  "traits": {
    "email": "jane@example.com",
    "phone": "+15551234567"
  },
  "vendorIds": {
    "fbp": "fb.1.1740000000.987654321"
  },
  "clickIds": {
    "fbclid": "IwAR3..."
  },
  "context": {
    "page": {
      "url": "https://shop.example.com/checkout/success"
    }
  }
}
```
Output (Meta CAPI)
```json
{
  "data": [
    {
      "event_name": "Purchase",
      "event_time": 1740493800,
      "action_source": "website",
      "event_source_url": "https://shop.example.com/checkout/success",
      "user_data": {
        "em": ["a1b2c3..."],
        "ph": ["d4e5f6..."],
        "fbp": "fb.1.1740000000.987654321",
        "fbc": "fb.1.1740493800.IwAR3...",
        "external_id": ["user_98765"]
      },
      "custom_data": {
        "order_id": "ORD-001",
        "value": 149.99,
        "currency": "USD"
      }
    }
  ]
}
```
Note how the pipeline automatically hashes PII fields (em, ph) using SHA-256, as required by the Meta Conversions API.
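The hashing the pipeline applies can be sketched as follows. Meta requires SHA-256 hashes of normalised values; the normalisation shown here (lowercase and trim for email, digits only for phone) follows Meta's documented rules, but treat the exact details as an approximation of what the engine does.

```python
import hashlib

def sha256_hex(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

def hash_email(email: str) -> str:
    # Meta expects the email lowercased and trimmed before hashing
    return sha256_hex(email.strip().lower())

def hash_phone(phone: str) -> str:
    # Meta expects digits only (no '+', spaces, or punctuation)
    return sha256_hex("".join(ch for ch in phone if ch.isdigit()))
```

Normalising before hashing is what makes match rates work: "Jane@Example.com " and "jane@example.com" must produce the same hash or Meta cannot join them to a user.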
Pipeline Storage
Pipeline configurations are stored in PostgreSQL as YAML or JSON documents. They are associated with a specific integration and can be versioned. The Management API and Management UI both provide interfaces for creating and editing pipelines.
| Field | Description |
|---|---|
| id | Unique pipeline identifier |
| integration_id | The integration this pipeline belongs to |
| version | Pipeline version number (auto-incremented) |
| config | YAML or JSON pipeline configuration |
| created_at | Creation timestamp |
| updated_at | Last modification timestamp |
| is_active | Whether this version is currently live |
When you update a pipeline, the previous version is retained. You can roll back to any previous version via the Management UI or API. Only one version per integration is active at any time.
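The versioning rule — previous versions retained, exactly one active per integration — might be modelled like this. A sketch over in-memory rows only; the real store is PostgreSQL, and the row shape follows the field table above.

```python
def activate(versions, target_version):
    """Make target_version the single active version for one
    integration. 'versions' is that integration's list of row dicts,
    each with 'version' and 'is_active' fields."""
    if not any(v["version"] == target_version for v in versions):
        raise ValueError(f"unknown version {target_version}")
    for v in versions:
        # Flipping every flag in one pass keeps the invariant:
        # exactly one active version, older ones retained for rollback.
        v["is_active"] = (v["version"] == target_version)
    return versions
```

Rolling back is then just activating an older version number; nothing is ever deleted.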
Dry-Run Testing
You can test a pipeline against a sample event without publishing to Kafka:
```
POST /v1/admin/transformations/{id}/dry-run
```
This executes the pipeline and returns the output payload without side effects. See Pipeline Configuration for the full dry-run API reference.
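Conceptually, a dry run executes the same step sequence with publishing disabled and reports the result. A local sketch (step functions and the returned dict shape are assumptions for illustration, not the endpoint's response schema):

```python
def dry_run(steps, sample_event):
    """Execute the pipeline against a sample event and return the
    output payload, without publishing anything to Kafka."""
    event = dict(sample_event)  # never mutate the caller's sample
    for step in steps:
        event = step(event)
        if event is None:  # a filter step dropped the event
            return {"dropped": True, "output": None}
    return {"dropped": False, "output": event}
```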
Performance
The Pipeline Engine is designed for high throughput:
- Pipelines are compiled into an optimised execution plan on load
- Compiled pipelines are cached in memory and invalidated on config change
- Each pipeline step operates on a shared event object to minimise memory allocation
- Pipelines execute in under 1ms for typical configurations
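The compile-once, cache, invalidate-on-change behaviour in the bullets above can be sketched with a version-keyed cache. Illustrative only: the cache key, the compile function, and the plan representation are all placeholders.

```python
_cache = {}  # integration_id -> (version, compiled plan)

def get_compiled(integration_id, config, version, compile_fn):
    """Return the compiled execution plan for this config version,
    compiling at most once; a new version number evicts the old plan."""
    cached = _cache.get(integration_id)
    if cached is None or cached[0] != version:
        _cache[integration_id] = (version, compile_fn(config))
    return _cache[integration_id][1]
```

Keying the cache on the version number means a config update invalidates the plan implicitly — the next event simply sees a version mismatch and recompiles.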