Pipeline Transformation Engine

The Pipeline Transformation Engine is the second processing layer. It runs after the Organisation Data Layer, once per integration that the event has been routed to. Each integration has its own pipeline configuration that transforms the canonical Datafly event into the exact format required by the vendor’s API.

How It Works

Enriched event (from Org Data Layer)
  → Pipeline for Integration A → delivery-{integration_a} topic
  → Pipeline for Integration B → delivery-{integration_b} topic
  → Pipeline for Integration C → delivery-{integration_c} topic

Each pipeline is an ordered sequence of steps. The event enters the pipeline as the canonical Datafly format and exits as the vendor-specific payload. If any step fails or a filter drops the event, processing stops for that integration without affecting others.

Pipeline Steps

A pipeline consists of one or more steps executed in order:

Step Type   Purpose                                 Example
map         Rename or restructure fields            Map properties.revenue to params.value
filter      Drop events that don’t match criteria   Only send track events named Purchase
transform   Modify field values                     Convert currency from cents to dollars
enrich      Add static or computed fields           Add measurement_id to every GA4 event
compute     Calculate new fields from expressions   Compute lifetime_value from multiple fields
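The step types above combine into a single ordered pipeline definition. As an illustrative sketch only (the field names `steps`, `condition`, `from`, `to`, `expression`, and `set` are assumptions here, not the authoritative schema — see the Pipeline Configuration reference):

```yaml
# Hypothetical pipeline configuration showing all five step types.
steps:
  - type: filter
    condition: event == "Purchase"    # drop everything except Purchase events
  - type: map
    from: properties.revenue          # rename/restructure a field
    to: params.value
  - type: transform
    field: params.value
    expression: value / 100           # e.g. convert cents to dollars
  - type: enrich
    set:
      measurement_id: G-XXXXXXX       # add a static field to every event
  - type: compute
    field: params.lifetime_value
    expression: params.value + previous_lifetime_value
```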

Each step type is described in detail in the Pipeline Configuration reference.

Execution Model

Step Ordering

Steps execute in the order they are defined in the configuration. The output of each step becomes the input of the next:

Input event → map → filter → transform → enrich → compute → Output payload

Early Termination

If a filter step drops the event, the pipeline stops immediately. No subsequent steps run, and no message is published to the delivery topic.

Error Handling

If a step encounters an error (e.g. a referenced field does not exist), the behaviour depends on the step’s on_error setting:

Setting          Behaviour
skip (default)   Skip this step and continue with the next
drop             Drop the event entirely for this integration
fail             Send the event to the dead-letter topic for investigation
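Because on_error is set per step, a single pipeline can mix strict and lenient behaviour. A sketch of how it might appear in a step definition (field placement is an assumption):

```yaml
steps:
  - type: transform
    field: properties.revenue
    expression: value / 100
    on_error: drop       # if revenue is missing, drop the event for this integration
  - type: enrich
    set:
      measurement_id: G-XXXXXXX
    on_error: fail       # a failure here goes to the dead-letter topic
```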

Example: GA4 Pipeline

Here is how a pipeline transforms a canonical “Purchase” event into the GA4 Measurement Protocol format:

Input (canonical event)

{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD",
    "products": [
      { "product_id": "SKU-1234", "name": "Headphones", "price": 79.99 },
      { "product_id": "SKU-5678", "name": "Cable", "price": 14.99 }
    ]
  },
  "vendorIds": {
    "ga4_client_id": "1234567890.1740000000"
  }
}

Pipeline steps

  1. map — Restructure to GA4 format
  2. transform — Convert event name to lowercase
  3. enrich — Add measurement ID and API secret
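Those three steps might look like this in configuration. This is a sketch: the GA4 field names (client_id, events[].params, transaction_id) follow the Measurement Protocol payload shown below, but the step schema and the secret-reference syntax are assumptions:

```yaml
steps:
  - type: map
    mappings:
      vendorIds.ga4_client_id: client_id
      userId: user_id
      properties.order_id: events[0].params.transaction_id
      properties.revenue: events[0].params.value
      properties.currency: events[0].params.currency
  - type: transform
    field: events[0].name
    expression: lowercase(event)        # "Purchase" -> "purchase"
  - type: enrich
    set:
      measurement_id: G-XXXXXXX         # hypothetical value
      api_secret: "{{secrets.ga4_api_secret}}"
```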

Output (GA4 Measurement Protocol)

{
  "client_id": "1234567890.1740000000",
  "user_id": "user_98765",
  "events": [
    {
      "name": "purchase",
      "params": {
        "transaction_id": "ORD-001",
        "value": 149.99,
        "currency": "USD",
        "items": [
          { "item_id": "SKU-1234", "item_name": "Headphones", "price": 79.99 },
          { "item_id": "SKU-5678", "item_name": "Cable", "price": 14.99 }
        ]
      }
    }
  ]
}

Example: Meta Conversions API Pipeline

Input (canonical event)

{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD"
  },
  "traits": {
    "email": "jane@example.com",
    "phone": "+15551234567"
  },
  "vendorIds": {
    "fbp": "fb.1.1740000000.987654321"
  },
  "clickIds": {
    "fbclid": "IwAR3..."
  },
  "context": {
    "page": {
      "url": "https://shop.example.com/checkout/success"
    }
  }
}

Output (Meta CAPI)

{
  "data": [
    {
      "event_name": "Purchase",
      "event_time": 1740493800,
      "action_source": "website",
      "event_source_url": "https://shop.example.com/checkout/success",
      "user_data": {
        "em": ["a1b2c3..."],
        "ph": ["d4e5f6..."],
        "fbp": "fb.1.1740000000.987654321",
        "fbc": "fb.1.1740493800.IwAR3...",
        "external_id": ["user_98765"]
      },
      "custom_data": {
        "order_id": "ORD-001",
        "value": 149.99,
        "currency": "USD"
      }
    }
  ]
}

Note how the pipeline automatically hashes PII fields (em, ph) using SHA-256, as required by the Meta Conversions API.
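Meta requires PII to be normalised before hashing: emails trimmed and lowercased, phone numbers reduced to digits only (including country code). A minimal sketch of what that hashing step does — the helper names are illustrative, not the engine’s actual API:

```python
import hashlib
import re


def hash_email(email: str) -> str:
    """Normalise (trim, lowercase) then SHA-256 hash an email for Meta's `em` field."""
    normalised = email.strip().lower()
    return hashlib.sha256(normalised.encode("utf-8")).hexdigest()


def hash_phone(phone: str) -> str:
    """Strip to digits only (keeping the country code) then hash for `ph`."""
    digits = re.sub(r"\D", "", phone)
    return hashlib.sha256(digits.encode("utf-8")).hexdigest()


# Produces the hashed user_data fields shown in the output above
user_data = {
    "em": [hash_email("jane@example.com")],
    "ph": [hash_phone("+15551234567")],
}
```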

Pipeline Storage

Pipeline configurations are stored in PostgreSQL as YAML or JSON documents. They are associated with a specific integration and can be versioned. The Management API and Management UI both provide interfaces for creating and editing pipelines.

Field            Description
id               Unique pipeline identifier
integration_id   The integration this pipeline belongs to
version          Pipeline version number (auto-incremented)
config           YAML or JSON pipeline configuration
created_at       Creation timestamp
updated_at       Last modification timestamp
is_active        Whether this version is currently live

When you update a pipeline, the previous version is retained. You can roll back to any previous version via the Management UI or API. Only one version per integration is active at any time.

Dry-Run Testing

You can test a pipeline against a sample event without publishing to Kafka:

POST /v1/admin/transformations/{id}/dry-run

This executes the pipeline and returns the output payload without side effects. See Pipeline Configuration for the full dry-run API reference.
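For example, a dry-run request body might carry the sample event under an event key (this shape is an assumption, not the authoritative schema):

```json
{
  "event": {
    "type": "track",
    "event": "Purchase",
    "userId": "user_98765",
    "properties": {
      "revenue": 149.99,
      "currency": "USD"
    }
  }
}
```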

Performance

The Pipeline Engine is designed for high throughput:

  • Pipelines are compiled into an optimised execution plan on load
  • Compiled pipelines are cached in memory and invalidated on config change
  • Each pipeline step operates on a shared event object to minimise memory allocation
  • Typical pipelines execute end to end in under 1 ms
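The compile-and-cache behaviour can be sketched as follows. All names here are illustrative, assuming a compile function that turns a config into a callable execution plan; the engine’s real types and compilation strategy are not shown in this document:

```python
from typing import Callable

# An execution plan: a callable that transforms an event dict into a payload dict
Plan = Callable[[dict], dict]


class PipelineCache:
    """Cache compiled pipelines keyed by (integration_id, version) — a sketch."""

    def __init__(self, compile_fn: Callable[[dict], Plan]):
        self._compile = compile_fn
        self._cache: dict = {}

    def get(self, integration_id: str, version: int, config: dict) -> Plan:
        key = (integration_id, version)
        if key not in self._cache:
            # Compile once into an execution plan; subsequent events reuse it
            self._cache[key] = self._compile(config)
        return self._cache[key]

    def invalidate(self, integration_id: str) -> None:
        # Called when the integration's config changes
        self._cache = {k: v for k, v in self._cache.items() if k[0] != integration_id}
```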