Pipeline Configuration

Pipelines are defined as YAML or JSON documents that describe how to transform a canonical Datafly event into a vendor-specific payload. This page is the configuration reference.

Structure

A pipeline configuration has a top-level steps array. Each step has a type and type-specific parameters:

name: GA4 Purchase Pipeline
description: Transforms purchase events for GA4 Measurement Protocol
steps:
  - type: filter
    condition: "event.type = 'track' AND event.event = 'Purchase'"
 
  - type: map
    mappings:
      client_id: "event.vendorIds.ga4_client_id"
      user_id: "event.userId"
      events[0].name: "'purchase'"
      events[0].params.transaction_id: "event.properties.order_id"
      events[0].params.value: "event.properties.revenue"
      events[0].params.currency: "event.properties.currency"
 
  - type: enrich
    fields:
      events[0].params.engagement_time_msec: 100
 
  - type: compute
    fields:
      events[0].params.value:
        expression: "COALESCE(event.properties.revenue, 0)"
        type: number

Step Types

filter

Evaluates a condition and drops the event if the condition evaluates to false. Conditions are written in the expression language.

- type: filter
  condition: "event.type = 'track' AND event.event IN ('Purchase', 'Add to Cart', 'Checkout Started')"
  on_error: skip

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| condition | string | Yes | Expression that must evaluate to true |
| on_error | string | No | skip, drop, or fail (default: skip) |
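
As a rough illustration of what the example condition checks, the following Python sketch writes the same predicate by hand. It is not how Datafly evaluates expressions; the function name `passes_filter` and the dict-shaped event are assumptions made for this example.

```python
# Hypothetical hand-written equivalent of the example filter condition.
ALLOWED_EVENTS = {"Purchase", "Add to Cart", "Checkout Started"}


def passes_filter(event: dict) -> bool:
    """Return True if the event would be kept by the example condition."""
    return event.get("type") == "track" and event.get("event") in ALLOWED_EVENTS


kept = passes_filter({"type": "track", "event": "Purchase"})
dropped = passes_filter({"type": "page", "event": "Purchase"})
```

Here `kept` is true because both clauses hold, while `dropped` is false because the event type is not track.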

map

Maps fields from the canonical event to the output payload. The left side is the destination path; the right side is the source path or a literal value.

- type: map
  mappings:
    client_id: "event.vendorIds.ga4_client_id"
    user_id: "event.userId"
    events[0].name: "event.event"
    events[0].params.transaction_id: "event.properties.order_id"
    events[0].params.value: "event.properties.revenue"
    events[0].params.currency: "event.properties.currency"
  on_missing: skip

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| mappings | object | Yes | Destination-to-source field mapping |
| on_missing | string | No | Behaviour when a source field is missing: skip (omit the field), null (set it to null), or error (default: skip) |
| on_error | string | No | skip, drop, or fail (default: skip) |
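
The three on_missing behaviours can be sketched in Python as follows. This is a simplified model, not the actual implementation: it handles only dotted source paths and flat destination keys, and the names `MISSING`, `lookup`, and `apply_mappings` are hypothetical. Raising `KeyError` for the error setting is also an assumption; the reference does not say how that error is surfaced.

```python
MISSING = object()  # sentinel: distinguishes "absent" from an explicit None


def lookup(event: dict, path: str):
    """Resolve a dotted path such as 'event.properties.revenue' (no array syntax)."""
    current = event
    for key in path.split(".")[1:]:  # skip the leading 'event' segment
        if not isinstance(current, dict) or key not in current:
            return MISSING
        current = current[key]
    return current


def apply_mappings(event: dict, mappings: dict, on_missing: str = "skip") -> dict:
    """Copy source fields to flat destination keys, honouring on_missing."""
    output = {}
    for dest, source in mappings.items():
        value = lookup(event, source)
        if value is MISSING:
            if on_missing == "skip":
                continue        # omit the field entirely
            if on_missing == "null":
                value = None    # emit the field with a null value
            else:
                # Assumption: 'error' is modelled here as a raised exception.
                raise KeyError(f"source field not found: {source}")
        output[dest] = value
    return output


event = {"userId": "user_98765", "properties": {"revenue": 149.99}}
mappings = {"user_id": "event.userId", "currency": "event.properties.currency"}

skipped = apply_mappings(event, mappings, on_missing="skip")
nulled = apply_mappings(event, mappings, on_missing="null")
```

With skip, the output contains only user_id; with null, it also contains currency set to null.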

Source path syntax

| Syntax | Description | Example |
|--------|-------------|---------|
| event.field | Reference a field on the canonical event | event.properties.revenue |
| event.field[0] | Array index | event.properties.products[0].name |
| event.field[*].sub | All items in an array | event.properties.products[*].product_id |
| 'literal' | Static string value | 'purchase' |
| 123 | Static numeric value | 100 |
| true / false | Static boolean value | true |
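
To make the syntax above concrete, here is a minimal Python sketch of a resolver for these source expressions. The function name `resolve` and the choice to return None for a missing field are assumptions for illustration; the real resolver may differ in edge cases such as nested wildcards.

```python
import re

# Matches either a field name or a bracketed index such as [0] or [*].
_SEGMENT = re.compile(r"([^.\[\]]+)|\[(\d+|\*)\]")


def resolve(source, event: dict):
    """Resolve a source expression: literal, boolean, number, or event path."""
    if not isinstance(source, str):
        return source                      # already a number or boolean
    if len(source) >= 2 and source.startswith("'") and source.endswith("'"):
        return source[1:-1]                # 'literal' -> static string
    if source in ("true", "false"):
        return source == "true"
    try:
        return float(source) if "." in source else int(source)
    except ValueError:
        pass                               # not a number, so treat it as a path

    values = [event]                       # current set of matched nodes
    fan_out = False                        # True once a [*] has been seen
    for name, index in _SEGMENT.findall(source)[1:]:  # skip leading 'event'
        matched = []
        for node in values:
            if name:
                if isinstance(node, dict) and name in node:
                    matched.append(node[name])
            elif index == "*":
                if isinstance(node, list):
                    matched.extend(node)
                fan_out = True
            elif isinstance(node, list) and int(index) < len(node):
                matched.append(node[int(index)])
        values = matched
    if fan_out:
        return values
    return values[0] if values else None


event = {
    "properties": {
        "revenue": 149.99,
        "products": [
            {"product_id": "A", "name": "Shoe"},
            {"product_id": "B", "name": "Hat"},
        ],
    }
}
first_name = resolve("event.properties.products[0].name", event)
all_ids = resolve("event.properties.products[*].product_id", event)
```

In this sketch a `[*]` segment always yields a list, while any other path yields a single value.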

transform

Modifies field values using built-in transformation functions.

- type: transform
  transforms:
    - field: "events[0].name"
      function: lowercase
    - field: "user_data.em"
      function: sha256_hash
    - field: "user_data.ph"
      function: sha256_hash
    - field: "custom_data.value"
      function: divide
      args: { divisor: 100 }

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| transforms | array | Yes | List of field transformations |
| transforms[].field | string | Yes | Path to the field to transform |
| transforms[].function | string | Yes | Transformation function name |
| transforms[].args | object | No | Function-specific arguments |
| on_error | string | No | skip, drop, or fail (default: skip) |

Available functions

| Function | Description | Args |
|----------|-------------|------|
| lowercase | Convert to lowercase | |
| uppercase | Convert to uppercase | |
| trim | Remove leading/trailing whitespace | |
| sha256_hash | SHA-256 hash (hex output) | |
| md5_hash | MD5 hash (hex output) | |
| base64_encode | Base64 encode | |
| base64_decode | Base64 decode | |
| to_string | Convert to string | |
| to_number | Convert to number | |
| to_boolean | Convert to boolean | |
| to_unix_seconds | Convert ISO timestamp to Unix seconds | |
| to_unix_millis | Convert ISO timestamp to Unix milliseconds | |
| to_iso8601 | Convert Unix timestamp to ISO 8601 | |
| multiply | Multiply by a factor | { factor: 100 } |
| divide | Divide by a divisor | { divisor: 100 } |
| round | Round to N decimal places | { decimals: 2 } |
| replace | String replacement | { pattern: " ", replacement: "_" } |
| regex_extract | Extract via regex capture group | { pattern: "(\\d+)", group: 1 } |
| default | Set a default value if null/missing | { value: "unknown" } |
| array_join | Join array elements | { separator: "," } |
| array_map | Map each array element | { field: "product_id" } |
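
The sketch below models a handful of these functions in Python to show how successive transforms on the same field compose. It covers only a subset and rests on assumptions: `replace` is treated as a literal substring replacement (the table does not say whether the pattern is a regex), and the names `FUNCTIONS` and `apply_chain` are hypothetical.

```python
import hashlib
import re
from datetime import datetime


def to_unix_seconds(value: str) -> int:
    """Parse an ISO 8601 timestamp (with offset or trailing Z) to Unix seconds."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return int(parsed.timestamp())


def regex_extract(value: str, pattern: str, group: int = 1):
    """Return the requested capture group, or None if the pattern does not match."""
    match = re.search(pattern, value)
    return match.group(group) if match else None


# Each entry takes the current value plus the step's args as keyword arguments.
FUNCTIONS = {
    "lowercase": lambda v: v.lower(),
    "trim": lambda v: v.strip(),
    "sha256_hash": lambda v: hashlib.sha256(v.encode("utf-8")).hexdigest(),
    "to_unix_seconds": to_unix_seconds,
    "divide": lambda v, divisor: v / divisor,
    "replace": lambda v, pattern, replacement: v.replace(pattern, replacement),
    "regex_extract": regex_extract,
}


def apply_chain(value, chain: list):
    """Apply transforms in order, feeding each result into the next function."""
    for step in chain:
        value = FUNCTIONS[step["function"]](value, **step.get("args", {}))
    return value


event_name = apply_chain("Add to Cart", [
    {"function": "lowercase"},
    {"function": "replace", "args": {"pattern": " ", "replacement": "_"}},
])
hashed_email = apply_chain(" Jane@Example.com ", [
    {"function": "lowercase"},
    {"function": "trim"},
    {"function": "sha256_hash"},
])
amount = apply_chain(14999, [{"function": "divide", "args": {"divisor": 100}}])
```

Order matters when chaining: hashing must come last, because lowercasing or trimming a hash would not normalise the original value.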

enrich

Adds static or computed fields to the output. Unlike map, this step does not reference the source event; it adds values directly.

- type: enrich
  fields:
    api_secret: "GA4_API_SECRET"
    measurement_id: "G-XXXXXXXXXX"
    events[0].params.engagement_time_msec: 100
    data[0].action_source: "website"

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| fields | object | Yes | Field-to-value mapping |
| on_error | string | No | skip, drop, or fail (default: skip) |

Use enrich to inject secrets and static configuration values (API keys, measurement IDs, pixel IDs) that are the same for every event sent to an integration.

Pipeline parameter resolution: If the integration revision config contains {{pipeline_param.KEY}} placeholders, these are resolved from the pipeline’s parameters object before the config is parsed. This happens at config load time (every 5 seconds), not per-event. See Pipeline Parameters for details.
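
A minimal sketch of this kind of placeholder substitution, written in Python, might look like the following. It operates on the raw config text before parsing, as described above. The function name `resolve_placeholders`, the allowed key characters, and the choice to raise on an unknown key are all assumptions, not documented behaviour.

```python
import re

# Assumption: keys consist of letters, digits, and underscores.
_PLACEHOLDER = re.compile(r"\{\{pipeline_param\.(\w+)\}\}")


def resolve_placeholders(raw_config: str, parameters: dict) -> str:
    """Replace {{pipeline_param.KEY}} in the raw config text, before parsing."""
    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in parameters:
            # Assumption: an unknown key is treated as an error.
            raise KeyError(f"no pipeline parameter named {key!r}")
        return str(parameters[key])

    return _PLACEHOLDER.sub(substitute, raw_config)


raw = 'measurement_id: "{{pipeline_param.MEASUREMENT_ID}}"'
resolved = resolve_placeholders(raw, {"MEASUREMENT_ID": "G-XXXXXXXXXX"})
```

Because substitution happens on the text before parsing, the parsed config never contains a placeholder.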

compute

Creates new fields using the expression language. Computed fields can reference both the source event and previously mapped output fields.

- type: compute
  fields:
    custom_data.value:
      expression: "COALESCE(event.properties.revenue, event.properties.total, 0)"
      type: number
    user_data.fbc:
      expression: "CONCAT('fb.1.', event.context.session.startedAt, '.', event.clickIds.fbclid)"
      type: string
    events[0].params.session_engaged:
      expression: "event.context.session.eventIndex > 1"
      type: boolean

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| fields | object | Yes | Field definitions with expression and type |
| fields.{name}.expression | string | Yes | Expression to evaluate |
| fields.{name}.type | string | Yes | Output type: string, number, boolean |
| on_error | string | No | skip, drop, or fail (default: skip) |
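
The three example expressions can be read as the following Python, assuming COALESCE returns its first non-null argument and CONCAT joins the string forms of its arguments (the usual SQL-style meanings; the expression language reference is authoritative). The helper names and the sample event values are hypothetical, and null handling inside CONCAT is not modelled.

```python
def coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None


def concat(*values) -> str:
    """Join the string form of each argument."""
    return "".join(str(value) for value in values)


# Hypothetical sample event: revenue is absent, so total is used instead.
event = {
    "properties": {"total": 42.5},
    "context": {"session": {"startedAt": 1740000000, "eventIndex": 3}},
    "clickIds": {"fbclid": "AbC123"},
}
props = event["properties"]
session = event["context"]["session"]

value = coalesce(props.get("revenue"), props.get("total"), 0)
fbc = concat("fb.1.", session["startedAt"], ".", event["clickIds"]["fbclid"])
session_engaged = session["eventIndex"] > 1
```

The trailing 0 in the COALESCE call guarantees a numeric result even when neither source field is present.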

Complete Example: GA4 Pipeline

name: GA4 Measurement Protocol
description: Transforms events for Google Analytics 4
 
steps:
  # Only process track and page events
  - type: filter
    condition: "event.type IN ('track', 'page')"
 
  # Map canonical fields to GA4 format
  - type: map
    mappings:
      client_id: "event.vendorIds.ga4_client_id"
      user_id: "event.userId"
      events[0].params.session_id: "event.context.session.id"
      events[0].params.engagement_time_msec: 100
    on_missing: skip
 
  # Map page events
  - type: map
    condition: "event.type = 'page'"
    mappings:
      events[0].name: "'page_view'"
      events[0].params.page_location: "event.properties.url"
      events[0].params.page_title: "event.properties.title"
      events[0].params.page_referrer: "event.properties.referrer"
 
  # Map purchase events
  - type: map
    condition: "event.event = 'Purchase'"
    mappings:
      events[0].name: "'purchase'"
      events[0].params.transaction_id: "event.properties.order_id"
      events[0].params.value: "event.properties.revenue"
      events[0].params.currency: "event.properties.currency"
      events[0].params.items: "event.properties.products"
 
  # Map generic track events (fallback)
  - type: map
    condition: "event.type = 'track' AND event.event != 'Purchase'"
    mappings:
      events[0].name: "event.event"
 
  # Lowercase event names and replace spaces with underscores (GA4 convention)
  - type: transform
    transforms:
      - field: "events[0].name"
        function: lowercase
      - field: "events[0].name"
        function: replace
        args: { pattern: " ", replacement: "_" }
 
  # Compute session engagement
  - type: compute
    fields:
      events[0].params.session_engaged:
        expression: "event.context.session.eventIndex > 1"
        type: boolean
 
  # Add static configuration
  - type: enrich
    fields:
      measurement_id: "G-XXXXXXXXXX"
      api_secret: "your_api_secret"

Complete Example: Meta Conversions API Pipeline

name: Meta Conversions API
description: Transforms events for Meta CAPI
 
steps:
  # Only process specific conversion events
  - type: filter
    condition: "event.type = 'track' AND event.event IN ('Purchase', 'Add to Cart', 'Initiate Checkout', 'Lead', 'Page View')"
 
  # Map core fields
  - type: map
    mappings:
      data[0].event_name: "event.event"
      data[0].event_source_url: "event.context.page.url"
      data[0].user_data.external_id[0]: "event.userId"
      data[0].user_data.fbp: "event.vendorIds.fbp"
      data[0].user_data.em[0]: "event.traits.email"
      data[0].user_data.ph[0]: "event.traits.phone"
      data[0].custom_data.order_id: "event.properties.order_id"
      data[0].custom_data.value: "event.properties.revenue"
      data[0].custom_data.currency: "event.properties.currency"
 
  # Hash PII fields as required by Meta
  - type: transform
    transforms:
      - field: "data[0].user_data.em[0]"
        function: lowercase
      - field: "data[0].user_data.em[0]"
        function: trim
      - field: "data[0].user_data.em[0]"
        function: sha256_hash
      - field: "data[0].user_data.ph[0]"
        function: sha256_hash
 
  # Convert timestamp to Unix seconds (expects data[0].event_time to hold an ISO timestamp)
  - type: transform
    transforms:
      - field: "data[0].event_time"
        function: to_unix_seconds
 
  # Compute fbc from click ID
  - type: compute
    fields:
      data[0].user_data.fbc:
        expression: "CONCAT('fb.1.', event.context.session.startedAt, '.', event.clickIds.fbclid)"
        type: string
 
  # Add static fields
  - type: enrich
    fields:
      data[0].action_source: "website"
      access_token: "your_access_token"
      pixel_id: "your_pixel_id"

Dry-Run API

Test a pipeline against a sample event without publishing to Kafka:

POST /v1/admin/transformations/{id}/dry-run

Request

{
  "event": {
    "type": "track",
    "event": "Purchase",
    "userId": "user_98765",
    "properties": {
      "order_id": "ORD-001",
      "revenue": 149.99,
      "currency": "USD"
    },
    "vendorIds": {
      "ga4_client_id": "1234567890.1740000000"
    }
  }
}

Response

{
  "success": true,
  "output": {
    "client_id": "1234567890.1740000000",
    "user_id": "user_98765",
    "events": [
      {
        "name": "purchase",
        "params": {
          "transaction_id": "ORD-001",
          "value": 149.99,
          "currency": "USD"
        }
      }
    ]
  },
  "steps_executed": 5,
  "steps_skipped": 1,
  "duration_ms": 0.42
}

The dry-run endpoint is part of the Management API. The Management UI’s pipeline editor uses it to provide a real-time preview of transformations.
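
For scripting against the endpoint, a request can be assembled with Python's standard library as sketched below. The base URL and transformation ID are placeholders, and authentication is deliberately left out because this page does not describe it; check the Management API documentation for the required headers.

```python
import json
import urllib.request


def build_dry_run_request(base_url: str, transformation_id: str,
                          event: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request for the dry-run endpoint."""
    url = f"{base_url.rstrip('/')}/v1/admin/transformations/{transformation_id}/dry-run"
    body = json.dumps({"event": event}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical host and ID, used only to show the resulting URL shape.
request = build_dry_run_request(
    "https://datafly.example.com",
    "tr_123",
    {"type": "track", "event": "Purchase", "userId": "user_98765"},
)
# To send it, pass the request to urllib.request.urlopen() and parse the JSON reply.
```

Building the request separately from sending it makes the payload easy to inspect or log before it reaches the API.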

Validation

Pipeline configurations are validated when saved. The validator checks for:

  • Valid step types
  • Required parameters per step type
  • Valid field path syntax
  • Valid expression syntax (for filter and compute steps)
  • No circular references in computed fields

Invalid configurations are rejected with a detailed error message indicating the step and field that failed validation.
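
The first two checks in the list above can be approximated with a short Python sketch. The required parameters come from the step tables on this page; the function name `validate_pipeline` and the error message format are assumptions, and path, expression, and circular-reference checks are not covered.

```python
# Required parameters per step type, as listed in the step reference tables.
REQUIRED_PARAMS = {
    "filter": ["condition"],
    "map": ["mappings"],
    "transform": ["transforms"],
    "enrich": ["fields"],
    "compute": ["fields"],
}


def validate_pipeline(config: dict) -> list[str]:
    """Return a list of error messages; an empty list means no problems found."""
    steps = config.get("steps")
    if not isinstance(steps, list):
        return ["steps: must be an array"]

    errors = []
    for i, step in enumerate(steps):
        step_type = step.get("type")
        if step_type not in REQUIRED_PARAMS:
            errors.append(f"steps[{i}].type: unknown step type {step_type!r}")
            continue
        for param in REQUIRED_PARAMS[step_type]:
            if param not in step:
                errors.append(f"steps[{i}].{param}: required for {step_type} steps")
        if step_type == "compute":
            # Each computed field needs both an expression and an output type.
            for name, spec in step.get("fields", {}).items():
                for key in ("expression", "type"):
                    if key not in spec:
                        errors.append(f"steps[{i}].fields.{name}.{key}: required")
    return errors


errors = validate_pipeline({"steps": [{"type": "filter"}, {"type": "merge"}]})
```

Collecting all errors rather than stopping at the first one lets an editor report every failing step and field in a single pass.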