
Processing

The Event Processor is a Go service (port 8081) that consumes raw events from Kafka and transforms them into vendor-ready payloads. It applies a two-layer processing architecture before publishing events to per-integration delivery topics.

Architecture

Kafka raw-events
  → Org Data Layer (11 steps, tenant-wide)
    → Pipeline Transformation Engine (per-integration)
      → Kafka delivery-{integration_id} topics
        → Delivery Workers

Every event passes through both layers sequentially. The Org Data Layer applies tenant-wide rules that are consistent across all integrations, while the Pipeline Engine transforms each event into the specific format required by each destination.
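The sequential flow can be sketched as a single processing function. All names here (`Event`, `orgDataLayer`, `pipelineTransform`) are illustrative stand-ins, not the service's real identifiers:

```go
package main

import "fmt"

// Event is a simplified canonical event; the real schema is larger.
type Event struct {
	Name    string
	Routing []string // integration IDs set by the routing step
}

// orgDataLayer stands in for the 11 tenant-wide steps; it returns
// false when a step drops the event (bot traffic, duplicate, etc.).
func orgDataLayer(e *Event) bool {
	e.Routing = []string{"integration_ga4_001"} // routing decision
	return true
}

// pipelineTransform stands in for the per-integration engine.
func pipelineTransform(e Event, integrationID string) string {
	return fmt.Sprintf("payload(%s) for %s", e.Name, integrationID)
}

func process(e Event) []string {
	// Layer 1 runs once per event, tenant-wide.
	if !orgDataLayer(&e) {
		return nil
	}
	// Layer 2 runs once per routed integration; each result is
	// published to its delivery-{integration_id} topic.
	var payloads []string
	for _, id := range e.Routing {
		payloads = append(payloads, pipelineTransform(e, id))
	}
	return payloads
}

func main() {
	fmt.Println(process(Event{Name: "Purchase"}))
}
```

Note that an event dropped in Layer 1 never reaches any pipeline, which is why tenant-wide rules such as consent belong there rather than in per-integration configuration.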

Two-Layer Design

Layer 1: Organisation Data Layer

The Org Data Layer runs first and applies tenant-wide processing that is consistent across all integrations. This is where you enforce consent, filter bots, detect PII, resolve identities, and enrich events with geolocation and device data.

Think of this layer as “what happens to every event regardless of where it’s going.”

| Step | Purpose |
| --- | --- |
| Schema validation | Ensure events conform to the expected structure |
| Consent enforcement | Apply consent decisions per category |
| Bot/spam filtering | Remove non-human traffic |
| PII detection | Auto-detect, hash, redact, or pass through PII |
| IP geolocation | Enrich with country, region, city |
| Device/browser parsing | Parse user agent into structured device data |
| Session stitching | Group events into sessions |
| Identity resolution | Attach known user IDs and vendor IDs |
| Event deduplication | Prevent duplicate processing via idempotency keys |
| Custom JavaScript | Execute org-level transformation scripts |
| Event routing | Determine which integrations receive this event |

See Organisation Data Layer for details on each step.
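One way to picture the step sequence is as an ordered chain of functions that short-circuits the moment any step drops the event. This is a sketch under assumed names (`Step`, `runOrgLayer` are not the service's actual API):

```go
package main

import "fmt"

// Event is a loosely-typed event for illustration.
type Event map[string]any

// Step returns the (possibly enriched) event, or ok=false to drop it.
type Step func(Event) (Event, bool)

// runOrgLayer applies the steps in order and short-circuits as soon
// as one drops the event (bot filtering, deduplication, consent, ...).
func runOrgLayer(e Event, steps []Step) (Event, bool) {
	for _, step := range steps {
		var ok bool
		if e, ok = step(e); !ok {
			return nil, false
		}
	}
	return e, true
}

func main() {
	// Two illustrative steps: a bot filter that drops flagged events,
	// and a geolocation enrichment that adds a country field.
	botFilter := func(e Event) (Event, bool) { return e, e["isBot"] != true }
	geoEnrich := func(e Event) (Event, bool) { e["country"] = "US"; return e, true }

	out, ok := runOrgLayer(Event{"event": "Purchase"}, []Step{botFilter, geoEnrich})
	fmt.Println(ok, out["country"])
}
```

The ordering matters: filtering steps run before expensive enrichment so that bot traffic and duplicates never pay the enrichment cost.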

Layer 2: Pipeline Transformation Engine

The Pipeline Engine runs after the Org Data Layer, once per integration that the event is routed to. Each integration has its own pipeline configuration that transforms the canonical event into the vendor’s required format.

Think of this layer as “how do we shape this event for Google Analytics 4, Meta, etc.”

| Capability | Description |
| --- | --- |
| Field mapping | Map canonical fields to vendor-specific fields |
| Value transformation | Convert values (e.g. cents to dollars, enum mappings) |
| Enrichment | Add static or computed fields |
| Filtering | Drop events that don’t apply to this integration |
| Formatting | Output the vendor’s required payload structure |

Pipelines are configured as YAML or JSON documents stored in PostgreSQL. See Pipeline Engine and Pipeline Configuration for details.
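A pipeline document might look something like the following. This is a hypothetical sketch — the field names and step types are invented for illustration; see Pipeline Configuration for the actual schema:

```yaml
# Hypothetical pipeline configuration for a GA4 integration.
# Step types ("filter", "field_mapping") and path syntax are
# illustrative, not the real configuration schema.
integration_id: integration_ga4_001
steps:
  - type: filter
    condition: event.type == "track"
  - type: field_mapping
    mappings:
      client_id: vendorIds.ga4_client_id
      user_id: userId
      events[0].name: event | lowercase
      events[0].params.transaction_id: properties.order_id
      events[0].params.value: properties.revenue
      events[0].params.currency: properties.currency
```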

Data Flow Example

Here is how a “Purchase” event flows through both layers:

1. Raw event arrives from Kafka:

{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "anonymousId": "dfid_abc123",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD"
  },
  "context": {
    "ip": "203.0.113.42",
    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ..."
  }
}

2. After Org Data Layer:

{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "anonymousId": "dfid_abc123",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD"
  },
  "context": {
    "ip": "203.0.113.42",
    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ...",
    "geo": {
      "country": "US",
      "region": "CA",
      "city": "San Francisco"
    },
    "device": {
      "type": "desktop",
      "browser": "Chrome",
      "browserVersion": "121.0",
      "os": "macOS",
      "osVersion": "14.3"
    },
    "session": {
      "id": "sess_xyz789",
      "eventIndex": 5
    }
  },
  "vendorIds": {
    "ga4_client_id": "1234567890.1740000000",
    "fbp": "fb.1.1740000000.987654321"
  },
  "_routing": ["integration_ga4_001", "integration_meta_001"]
}

3. After Pipeline Engine (GA4 output):

{
  "client_id": "1234567890.1740000000",
  "user_id": "user_98765",
  "events": [
    {
      "name": "purchase",
      "params": {
        "transaction_id": "ORD-001",
        "value": 149.99,
        "currency": "USD"
      }
    }
  ]
}

The GA4-formatted payload is published to the delivery-integration_ga4_001 Kafka topic, where a Delivery Worker picks it up and sends it to the GA4 Measurement Protocol API.
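The GA4 mapping in step 3 can be expressed as a small transformation function. The struct and function names below are illustrative, not the engine's real types; only the field renames (`order_id` → `transaction_id`, `revenue` → `value`, lower-cased event name) come from the example above:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// CanonicalEvent holds just the fields the GA4 mapping needs;
// the real canonical schema is larger.
type CanonicalEvent struct {
	Event      string            `json:"event"`
	UserID     string            `json:"userId"`
	Properties map[string]any    `json:"properties"`
	VendorIDs  map[string]string `json:"vendorIds"`
}

// GA4Payload mirrors the shape of the payload in step 3.
type GA4Payload struct {
	ClientID string     `json:"client_id"`
	UserID   string     `json:"user_id"`
	Events   []GA4Event `json:"events"`
}

type GA4Event struct {
	Name   string         `json:"name"`
	Params map[string]any `json:"params"`
}

// toGA4 sketches the pipeline's field mapping: canonical fields are
// renamed and the event name is lower-cased, as in the example above.
func toGA4(e CanonicalEvent) GA4Payload {
	return GA4Payload{
		ClientID: e.VendorIDs["ga4_client_id"],
		UserID:   e.UserID,
		Events: []GA4Event{{
			Name: strings.ToLower(e.Event),
			Params: map[string]any{
				"transaction_id": e.Properties["order_id"],
				"value":          e.Properties["revenue"],
				"currency":       e.Properties["currency"],
			},
		}},
	}
}

func main() {
	p := toGA4(CanonicalEvent{
		Event:      "Purchase",
		UserID:     "user_98765",
		Properties: map[string]any{"order_id": "ORD-001", "revenue": 149.99, "currency": "USD"},
		VendorIDs:  map[string]string{"ga4_client_id": "1234567890.1740000000"},
	})
	b, _ := json.Marshal(p)
	fmt.Println(string(b))
}
```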

Stateless Design

The Event Processor is stateless — it reads from Kafka, processes, and writes back to Kafka. All state (identity graphs, session data, deduplication keys) is stored in Redis. This allows horizontal scaling by adding more processor instances.