Pipeline Configuration
Pipelines are defined as YAML or JSON documents that describe how to transform a canonical Datafly event into a vendor-specific payload. This page is the configuration reference.
Structure
A pipeline configuration has a top-level steps array. Each step has a type and type-specific parameters:
name: GA4 Purchase Pipeline
description: Transforms purchase events for GA4 Measurement Protocol
steps:
  - type: filter
    condition: "event.type = 'track' AND event.event = 'Purchase'"
  - type: map
    mappings:
      client_id: "event.vendorIds.ga4_client_id"
      user_id: "event.userId"
      events[0].name: "'purchase'"
      events[0].params.transaction_id: "event.properties.order_id"
      events[0].params.value: "event.properties.revenue"
      events[0].params.currency: "event.properties.currency"
  - type: enrich
    fields:
      events[0].params.engagement_time_msec: 100
  - type: compute
    fields:
      events[0].params.value:
        expression: "COALESCE(event.properties.revenue, 0)"
        type: number

Step Types
filter
Evaluates a condition and drops the event if it returns false. Use the expression language for conditions.
- type: filter
  condition: "event.type = 'track' AND event.event IN ('Purchase', 'Add to Cart', 'Checkout Started')"
  on_error: skip

| Parameter | Type | Required | Description |
|---|---|---|---|
| condition | string | Yes | Expression that must evaluate to true |
| on_error | string | No | skip, drop, or fail (default: skip) |
map
Maps fields from the canonical event to the output payload. The left side is the destination path, the right side is the source path or a literal value.
- type: map
  mappings:
    client_id: "event.vendorIds.ga4_client_id"
    user_id: "event.userId"
    events[0].name: "event.event"
    events[0].params.transaction_id: "event.properties.order_id"
    events[0].params.value: "event.properties.revenue"
    events[0].params.currency: "event.properties.currency"
  on_missing: skip

| Parameter | Type | Required | Description |
|---|---|---|---|
| mappings | object | Yes | Destination-to-source field mapping |
| on_missing | string | No | Behaviour when a source field is missing: skip (omit field), null (set to null), error (default: skip) |
| on_error | string | No | skip, drop, or fail (default: skip) |
Source path syntax
| Syntax | Description | Example |
|---|---|---|
| event.field | Reference a field on the canonical event | event.properties.revenue |
| event.field[0] | Array index | event.properties.products[0].name |
| event.field[*].sub | All items in an array | event.properties.products[*].product_id |
| 'literal' | Static string value | 'purchase' |
| 123 | Static numeric value | 100 |
| true / false | Static boolean value | true |
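The path rules in the table above can be read as a small resolver. The following Python sketch is only an illustration of those rules, not Datafly's implementation: `resolve_source`, `set_path`, and `apply_map` are hypothetical names, and the sketch assumes that a missing source field resolves to `None` and that `on_missing` behaves as described in the map parameter table.

```python
import re

# Hypothetical helpers, not part of Datafly: they only illustrate the path rules.
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+|\*)\]")

def _tokens(path):
    """Split 'a.b[0].c' into ['a', 'b', 0, 'c']; '[*]' becomes the token '*'."""
    out = []
    for name, index in _TOKEN.findall(path):
        if name:
            out.append(name)
        else:
            out.append("*" if index == "*" else int(index))
    return out

def _walk(value, tokens):
    """Follow tokens through nested dicts and lists; None if anything is missing."""
    for i, tok in enumerate(tokens):
        if tok == "*":
            if not isinstance(value, list):
                return None
            return [_walk(item, tokens[i + 1:]) for item in value]
        if isinstance(tok, int):
            if not isinstance(value, list) or tok >= len(value):
                return None
            value = value[tok]
        else:
            if not isinstance(value, dict) or tok not in value:
                return None
            value = value[tok]
    return value

def resolve_source(expr, event):
    """Resolve the right-hand side of a mapping: a literal or an event path."""
    if isinstance(expr, (int, float, bool)):
        return expr                       # YAML already parsed a bare 100 or true
    if len(expr) >= 2 and expr[0] == expr[-1] == "'":
        return expr[1:-1]                 # 'literal' string
    if expr in ("true", "false"):
        return expr == "true"
    tokens = _tokens(expr)
    if tokens and tokens[0] == "event":
        return _walk(event, tokens[1:])
    raise ValueError(f"unsupported source expression: {expr!r}")

def set_path(output, path, value):
    """Write value at a destination path such as events[0].params.value."""
    tokens = _tokens(path)
    node = output
    for tok, nxt in zip(tokens, tokens[1:] + [None]):
        last = nxt is None
        child = value if last else ([] if isinstance(nxt, int) else {})
        if isinstance(tok, int):
            while len(node) <= tok:
                node.append(None)         # pad the list up to the index
            if last or node[tok] is None:
                node[tok] = child
        elif last or tok not in node:
            node[tok] = child
        node = node[tok]
    return output

def apply_map(mappings, event, on_missing="skip"):
    output = {}
    for dest, source in mappings.items():
        value = resolve_source(source, event)
        if value is None:
            if on_missing == "skip":
                continue                  # omit the field
            if on_missing == "error":
                raise KeyError(f"missing source field for {dest}")
            # on_missing == "null": fall through and write None
        set_path(output, dest, value)
    return output

event = {
    "userId": "user_98765",
    "properties": {
        "order_id": "ORD-001",
        "products": [{"product_id": "A"}, {"product_id": "B"}],
    },
}
payload = apply_map({
    "user_id": "event.userId",
    "events[0].name": "'purchase'",
    "events[0].params.transaction_id": "event.properties.order_id",
    "events[0].params.value": "event.properties.revenue",  # missing, so skipped
    "events[0].params.item_ids": "event.properties.products[*].product_id",
}, event)
print(payload)
```

Because `revenue` is absent from the sample event and `on_missing` defaults to `skip`, the resulting payload has no `value` key; with `on_missing="null"` the key would be present with a null value.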
transform
Modifies field values using built-in transformation functions.
- type: transform
  transforms:
    - field: "events[0].name"
      function: lowercase
    - field: "user_data.em"
      function: sha256_hash
    - field: "user_data.ph"
      function: sha256_hash
    - field: "custom_data.value"
      function: divide
      args: { divisor: 100 }

| Parameter | Type | Required | Description |
|---|---|---|---|
| transforms | array | Yes | List of field transformations |
| transforms[].field | string | Yes | Path to the field to transform |
| transforms[].function | string | Yes | Transformation function name |
| transforms[].args | object | No | Function-specific arguments |
| on_error | string | No | skip, drop, or fail (default: skip) |
Available functions
| Function | Description | Args |
|---|---|---|
| lowercase | Convert to lowercase | none |
| uppercase | Convert to uppercase | none |
| trim | Remove leading/trailing whitespace | none |
| sha256_hash | SHA-256 hash (hex output) | none |
| md5_hash | MD5 hash (hex output) | none |
| base64_encode | Base64 encode | none |
| base64_decode | Base64 decode | none |
| to_string | Convert to string | none |
| to_number | Convert to number | none |
| to_boolean | Convert to boolean | none |
| to_unix_seconds | Convert ISO timestamp to Unix seconds | none |
| to_unix_millis | Convert ISO timestamp to Unix milliseconds | none |
| to_iso8601 | Convert Unix timestamp to ISO 8601 | none |
| multiply | Multiply by a factor | { factor: 100 } |
| divide | Divide by a divisor | { divisor: 100 } |
| round | Round to N decimal places | { decimals: 2 } |
| replace | String replacement | { pattern: " ", replacement: "_" } |
| regex_extract | Extract via regex capture group | { pattern: "(\\d+)", group: 1 } |
| default | Set a default value if null/missing | { value: "unknown" } |
| array_join | Join array elements | { separator: "," } |
| array_map | Map each array element | { field: "product_id" } |
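To make the effect of chaining functions concrete, here is a rough Python equivalent of a handful of the functions in the table. It is a sketch under assumptions, not the real implementation: it assumes transforms on the same field run in list order (as the examples on this page suggest), treats the `replace` pattern as a literal substring, and ignores null handling. `FUNCTIONS` and `apply_transforms` are hypothetical names.

```python
import hashlib

# Hypothetical re-implementations of a few functions from the table;
# the real ones may differ in edge cases (nulls, number formatting, regex support).
FUNCTIONS = {
    "lowercase": lambda v, **_: v.lower(),
    "trim": lambda v, **_: v.strip(),
    "sha256_hash": lambda v, **_: hashlib.sha256(v.encode("utf-8")).hexdigest(),
    "divide": lambda v, divisor, **_: v / divisor,
    "round": lambda v, decimals, **_: round(v, decimals),
    "replace": lambda v, pattern, replacement, **_: v.replace(pattern, replacement),
    "default": lambda v, value, **_: value if v is None else v,
}

def apply_transforms(value, transforms):
    """Apply transforms in list order; each one receives the previous result."""
    for t in transforms:
        value = FUNCTIONS[t["function"]](value, **t.get("args", {}))
    return value

# Normalise before hashing so that equivalent emails produce the same digest.
email = apply_transforms("  Jane.Doe@Example.com ", [
    {"function": "trim"},
    {"function": "lowercase"},
    {"function": "sha256_hash"},
])

# Turn a display name into a GA4-style event name.
name = apply_transforms("Add to Cart", [
    {"function": "lowercase"},
    {"function": "replace", "args": {"pattern": " ", "replacement": "_"}},
])

# Convert an amount in cents to a decimal amount.
amount = apply_transforms(14999, [{"function": "divide", "args": {"divisor": 100}}])

print(name, amount, len(email))
```

Order matters for hashing: placing `sha256_hash` before `trim` or `lowercase` would hash the raw input, and the later string functions would then operate on the hex digest instead of the email.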
enrich
Adds static or computed fields to the output. Unlike map, enrich does not reference the source event; it writes the configured values directly.
- type: enrich
  fields:
    api_secret: "GA4_API_SECRET"
    measurement_id: "G-XXXXXXXXXX"
    events[0].params.engagement_time_msec: 100
    data[0].action_source: "website"

| Parameter | Type | Required | Description |
|---|---|---|---|
| fields | object | Yes | Field-to-value mapping |
| on_error | string | No | skip, drop, or fail (default: skip) |
Use enrich to inject secrets and static configuration values (API keys, measurement IDs, pixel IDs) that are the same for every event sent to an integration.
Pipeline parameter resolution: If the integration revision config contains {{pipeline_param.KEY}} placeholders, these are resolved from the pipeline’s parameters object before the config is parsed. This happens at config load time (every 5 seconds), not per-event. See Pipeline Parameters for details.
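As an illustration of resolving placeholders before parsing, the Python sketch below substitutes `{{pipeline_param.KEY}}` in the raw config text. Everything except the placeholder syntax is an assumption: the function name `resolve_params`, the tolerance for whitespace inside the braces, and raising an error for an unknown key are illustrative choices, not documented behaviour.

```python
import re

# Hypothetical sketch: substitute {{pipeline_param.KEY}} in raw config text
# before the text is parsed as YAML or JSON.
_PLACEHOLDER = re.compile(r"\{\{\s*pipeline_param\.([A-Za-z0-9_]+)\s*\}\}")

def resolve_params(raw_config, parameters):
    def substitute(match):
        key = match.group(1)
        if key not in parameters:
            # Assumption: an unknown key is an error; the real behaviour may differ.
            raise KeyError(f"no pipeline parameter named {key!r}")
        return str(parameters[key])
    return _PLACEHOLDER.sub(substitute, raw_config)

raw = 'measurement_id: "{{pipeline_param.MEASUREMENT_ID}}"'
resolved = resolve_params(raw, {"MEASUREMENT_ID": "G-XXXXXXXXXX"})
print(resolved)
```

Because substitution happens on the text at load time, every event processed until the next reload sees the same resolved values.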
compute
Creates new fields using the expression language. Computed fields can reference both the source event and previously mapped output fields.
- type: compute
  fields:
    custom_data.value:
      expression: "COALESCE(event.properties.revenue, event.properties.total, 0)"
      type: number
    user_data.fbc:
      expression: "CONCAT('fb.1.', event.context.session.startedAt, '.', event.clickIds.fbclid)"
      type: string
    events[0].params.session_engaged:
      expression: "event.context.session.eventIndex > 1"
      type: boolean

| Parameter | Type | Required | Description |
|---|---|---|---|
| fields | object | Yes | Field definitions with expression and type |
| fields.{name}.expression | string | Yes | Expression to evaluate |
| fields.{name}.type | string | Yes | Output type: string, number, boolean |
| on_error | string | No | skip, drop, or fail (default: skip) |
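For readers unfamiliar with the two functions used in the compute example, the Python sketch below shows what they are assumed to do: COALESCE returns its first non-null argument and CONCAT joins the string form of its arguments. How the expression language treats null arguments to CONCAT is not stated on this page, so that case is left out. The sample values are made up.

```python
# Hypothetical Python equivalents of two expression-language functions.
def coalesce(*values):
    """Return the first argument that is not None, or None if all are."""
    return next((v for v in values if v is not None), None)

def concat(*values):
    """Join the string form of every argument."""
    return "".join(str(v) for v in values)

properties = {"total": 42.5}  # sample event properties without a "revenue" key

# COALESCE(event.properties.revenue, event.properties.total, 0)
value = coalesce(properties.get("revenue"), properties.get("total"), 0)

# CONCAT('fb.1.', event.context.session.startedAt, '.', event.clickIds.fbclid)
fbc = concat("fb.1.", 1740000000000, ".", "AbCdEf")

print(value, fbc)
```

Note that a revenue of `0` is not null, so `coalesce(0, 42.5, 0)` returns `0` and does not fall through to the next argument.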
Complete Example: GA4 Pipeline
name: GA4 Measurement Protocol
description: Transforms events for Google Analytics 4
steps:
  # Only process track and page events
  - type: filter
    condition: "event.type IN ('track', 'page')"
  # Map canonical fields to GA4 format
  - type: map
    mappings:
      client_id: "event.vendorIds.ga4_client_id"
      user_id: "event.userId"
      events[0].params.session_id: "event.context.session.id"
      events[0].params.engagement_time_msec: 100
    on_missing: skip
  # Map page events
  - type: map
    condition: "event.type = 'page'"
    mappings:
      events[0].name: "'page_view'"
      events[0].params.page_location: "event.properties.url"
      events[0].params.page_title: "event.properties.title"
      events[0].params.page_referrer: "event.properties.referrer"
  # Map purchase events
  - type: map
    condition: "event.event = 'Purchase'"
    mappings:
      events[0].name: "'purchase'"
      events[0].params.transaction_id: "event.properties.order_id"
      events[0].params.value: "event.properties.revenue"
      events[0].params.currency: "event.properties.currency"
      events[0].params.items: "event.properties.products"
  # Map generic track events (fallback)
  - type: map
    condition: "event.type = 'track' AND event.event != 'Purchase'"
    mappings:
      events[0].name: "event.event"
  # Lowercase event names (GA4 convention)
  - type: transform
    transforms:
      - field: "events[0].name"
        function: lowercase
      - field: "events[0].name"
        function: replace
        args: { pattern: " ", replacement: "_" }
  # Compute session engagement
  - type: compute
    fields:
      events[0].params.session_engaged:
        expression: "event.context.session.eventIndex > 1"
        type: boolean
  # Add static configuration
  - type: enrich
    fields:
      measurement_id: "G-XXXXXXXXXX"
      api_secret: "your_api_secret"

Complete Example: Meta Conversions API Pipeline
name: Meta Conversions API
description: Transforms events for Meta CAPI
steps:
  # Only process specific conversion events
  - type: filter
    condition: "event.type = 'track' AND event.event IN ('Purchase', 'Add to Cart', 'Initiate Checkout', 'Lead', 'Page View')"
  # Map core fields
  - type: map
    mappings:
      data[0].event_name: "event.event"
      data[0].event_source_url: "event.context.page.url"
      data[0].user_data.external_id[0]: "event.userId"
      data[0].user_data.fbp: "event.vendorIds.fbp"
      data[0].user_data.em[0]: "event.traits.email"
      data[0].user_data.ph[0]: "event.traits.phone"
      data[0].custom_data.order_id: "event.properties.order_id"
      data[0].custom_data.value: "event.properties.revenue"
      data[0].custom_data.currency: "event.properties.currency"
  # Hash PII fields as required by Meta
  - type: transform
    transforms:
      - field: "data[0].user_data.em[0]"
        function: lowercase
      - field: "data[0].user_data.em[0]"
        function: trim
      - field: "data[0].user_data.em[0]"
        function: sha256_hash
      - field: "data[0].user_data.ph[0]"
        function: sha256_hash
  # Convert timestamp to Unix seconds
  - type: transform
    transforms:
      - field: "data[0].event_time"
        function: to_unix_seconds
  # Compute fbc from click ID
  - type: compute
    fields:
      data[0].user_data.fbc:
        expression: "CONCAT('fb.1.', event.context.session.startedAt, '.', event.clickIds.fbclid)"
        type: string
  # Add static fields
  - type: enrich
    fields:
      data[0].action_source: "website"
      access_token: "your_access_token"
      pixel_id: "your_pixel_id"

Dry-Run API
Test a pipeline against a sample event without publishing to Kafka:
POST /v1/admin/transformations/{id}/dry-run

Request
{
  "event": {
    "type": "track",
    "event": "Purchase",
    "userId": "user_98765",
    "properties": {
      "order_id": "ORD-001",
      "revenue": 149.99,
      "currency": "USD"
    },
    "vendorIds": {
      "ga4_client_id": "1234567890.1740000000"
    }
  }
}

Response
{
  "success": true,
  "output": {
    "client_id": "1234567890.1740000000",
    "user_id": "user_98765",
    "events": [
      {
        "name": "purchase",
        "params": {
          "transaction_id": "ORD-001",
          "value": 149.99,
          "currency": "USD"
        }
      }
    ]
  },
  "steps_executed": 5,
  "steps_skipped": 1,
  "duration_ms": 0.42
}

The dry-run endpoint is available in the Management API and is used by the Management UI’s pipeline editor to provide real-time preview of transformations.
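A dry-run request could be built from a script roughly as follows. This is a sketch using only the Python standard library; the base URL, the bearer-token header, and the transformation id `tr_123` are placeholders, since this page documents only the path and the shape of the request body.

```python
import json
import urllib.request

# Assumptions: BASE_URL, the Authorization header, and the id are placeholders.
# Only the path and the {"event": ...} body shape come from this page.
BASE_URL = "https://datafly.example.com"

def build_dry_run_request(transformation_id, event, token):
    """Build (but do not send) a POST request for the dry-run endpoint."""
    body = json.dumps({"event": event}).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/v1/admin/transformations/{transformation_id}/dry-run",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )

request = build_dry_run_request(
    "tr_123",
    {"type": "track", "event": "Purchase", "properties": {"order_id": "ORD-001"}},
    token="YOUR_TOKEN",
)
print(request.get_method(), request.full_url)

# To actually send it against a real deployment:
# with urllib.request.urlopen(request) as response:
#     result = json.load(response)
#     print(result["success"], result["output"])
```

Keeping request construction separate from sending makes it easy to inspect the payload in a test before pointing the script at a live Management API.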
Validation
Pipeline configurations are validated when saved. The validator checks for:
- Valid step types
- Required parameters per step type
- Valid field path syntax
- Valid expression syntax (for filter and compute steps)
- No circular references in computed fields
Invalid configurations are rejected with a detailed error message indicating the step and field that failed validation.
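The first two checks in the list can be approximated locally before saving a configuration. The Python sketch below is a hypothetical pre-flight check, not the validator itself: the required parameters are taken from the parameter tables on this page, the error message format is invented, and path syntax, expression syntax, and circular references are not checked.

```python
# Hypothetical pre-flight validator: known step types and required parameters only.
REQUIRED = {
    "filter": ["condition"],
    "map": ["mappings"],
    "transform": ["transforms"],
    "enrich": ["fields"],
    "compute": ["fields"],
}

def validate_pipeline(config):
    """Return a list of error strings; an empty list means these checks passed."""
    steps = config.get("steps")
    if not isinstance(steps, list):
        return ["pipeline: 'steps' must be an array"]
    errors = []
    for i, step in enumerate(steps):
        step_type = step.get("type")
        if step_type not in REQUIRED:
            errors.append(f"steps[{i}]: unknown step type {step_type!r}")
            continue
        for param in REQUIRED[step_type]:
            if param not in step:
                errors.append(
                    f"steps[{i}] ({step_type}): missing required parameter '{param}'"
                )
        if step_type == "compute":
            # Each computed field needs both an expression and an output type.
            for name, spec in step.get("fields", {}).items():
                for key in ("expression", "type"):
                    if key not in spec:
                        errors.append(f"steps[{i}].fields.{name}: missing '{key}'")
    return errors

errors = validate_pipeline({"steps": [
    {"type": "filter"},
    {"type": "compute", "fields": {"custom_data.value": {"expression": "0"}}},
    {"type": "lookup"},
]})
for message in errors:
    print(message)
```

Each message names the step index and, where relevant, the field, mirroring the kind of detail the validator is described as returning.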