Processing
The Event Processor is a Go service (port 8081) that consumes raw events from Kafka and transforms them into vendor-ready payloads. It applies a two-layer processing architecture before publishing events to per-integration delivery topics.
Architecture
```
Kafka raw-events
  → Org Data Layer (11 steps, tenant-wide)
  → Pipeline Transformation Engine (per-integration)
  → Kafka delivery-{integration_id} topics
  → Delivery Workers
```
Every event passes through both layers sequentially. The Org Data Layer applies tenant-wide rules that are consistent across all integrations, while the Pipeline Engine transforms each event into the specific format required by each destination.
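The sequential flow can be sketched as a composition of the two layers. This is an illustrative sketch with hypothetical types and names (`Event`, `orgDataLayer`, `pipelineEngine`), not the service's actual code; the real processor reads from and writes back to Kafka rather than returning a map.

```go
package main

import "fmt"

// Event is a simplified canonical event (illustrative only).
type Event struct {
	Name    string
	Routing []string // integration IDs set by the Org Data Layer's routing step
}

// orgDataLayer stands in for the 11-step tenant-wide pipeline.
// Here it only performs the final routing decision.
func orgDataLayer(evt Event) Event {
	evt.Routing = []string{"integration_ga4_001", "integration_meta_001"}
	return evt
}

// pipelineEngine stands in for the per-integration transformation,
// producing a vendor-ready payload for one integration.
func pipelineEngine(integrationID string, evt Event) string {
	return fmt.Sprintf("payload(%s for %s)", evt.Name, integrationID)
}

// process runs both layers and returns payloads keyed by delivery topic.
func process(evt Event) map[string]string {
	evt = orgDataLayer(evt)
	out := make(map[string]string)
	for _, id := range evt.Routing {
		out["delivery-"+id] = pipelineEngine(id, evt)
	}
	return out
}

func main() {
	for topic, payload := range process(Event{Name: "Purchase"}) {
		fmt.Println(topic, payload)
	}
}
```

Note that the Org Data Layer runs once per event, while the Pipeline Engine runs once per routed integration, producing one payload per delivery topic.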
Two-Layer Design
Layer 1: Organisation Data Layer
The Org Data Layer runs first and applies tenant-wide processing that is consistent across all integrations. This is where you enforce consent, filter bots, detect PII, resolve identities, and enrich events with geolocation and device data.
Think of this layer as “what happens to every event regardless of where it’s going.”
| Step | Purpose |
|---|---|
| Schema validation | Ensure events conform to the expected structure |
| Consent enforcement | Apply consent decisions per category |
| Bot/spam filtering | Remove non-human traffic |
| PII detection | Auto-detect, hash, redact, or pass through PII |
| IP geolocation | Enrich with country, region, city |
| Device/browser parsing | Parse user agent into structured device data |
| Session stitching | Group events into sessions |
| Identity resolution | Attach known user IDs and vendor IDs |
| Event deduplication | Prevent duplicate processing via idempotency keys |
| Custom JavaScript | Execute org-level transformation scripts |
| Event routing | Determine which integrations receive this event |
See Organisation Data Layer for details on each step.
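One way to picture the ordered steps is a slice of step functions applied in sequence, where any step may drop the event (as bot filtering or consent enforcement would). This is a sketch with hypothetical names and hard-coded results, not the layer's actual implementation:

```go
package main

import "fmt"

// Event carries the small subset of fields the sketch touches.
type Event struct {
	IP      string
	Country string
	Dropped bool
}

// step is one stage of the Org Data Layer; it may enrich or drop the event.
type step func(Event) Event

// filterBots drops events with no IP as a stand-in for real bot detection.
func filterBots(evt Event) Event {
	if evt.IP == "" {
		evt.Dropped = true
	}
	return evt
}

// geolocate enriches the event from its IP (result hard-coded for the sketch).
func geolocate(evt Event) Event {
	if evt.IP != "" {
		evt.Country = "US"
	}
	return evt
}

// orgDataLayer applies each step in order, stopping once an event is dropped.
func orgDataLayer(evt Event, steps []step) Event {
	for _, s := range steps {
		evt = s(evt)
		if evt.Dropped {
			break
		}
	}
	return evt
}

func main() {
	evt := orgDataLayer(Event{IP: "203.0.113.42"}, []step{filterBots, geolocate})
	fmt.Println(evt.Country) // enriched by the geolocation step
}
```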
Layer 2: Pipeline Transformation Engine
The Pipeline Engine runs after the Org Data Layer, once per integration that the event is routed to. Each integration has its own pipeline configuration that transforms the canonical event into the vendor’s required format.
Think of this layer as “how do we shape this event for Google Analytics 4, Meta, etc.”
| Capability | Description |
|---|---|
| Field mapping | Map canonical fields to vendor-specific fields |
| Value transformation | Convert values (e.g. cents to dollars, enum mappings) |
| Enrichment | Add static or computed fields |
| Filtering | Drop events that don’t apply to this integration |
| Formatting | Output the vendor’s required payload structure |
Pipelines are configured as YAML or JSON documents stored in PostgreSQL. See Pipeline Engine and Pipeline Configuration for details.
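A pipeline configuration for the GA4 example later on this page might look roughly like this. The field names (`filter`, `mappings`, `enrich`) are illustrative assumptions, not the actual schema — see Pipeline Configuration for the real reference:

```yaml
# Hypothetical pipeline config for a GA4 integration
integration: integration_ga4_001
filter: event == "Purchase"          # filtering: drop events that don't apply
mappings:                            # field mapping: canonical → vendor fields
  - from: properties.order_id
    to: events[0].params.transaction_id
  - from: properties.revenue
    to: events[0].params.value
  - from: properties.currency
    to: events[0].params.currency
enrich:                              # enrichment: add a static field
  - to: events[0].name
    value: purchase
```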
Data Flow Example
Here is how a “Purchase” event flows through both layers:
1. Raw event arrives from Kafka:

```json
{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "anonymousId": "dfid_abc123",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD"
  },
  "context": {
    "ip": "203.0.113.42",
    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ..."
  }
}
```

2. After Org Data Layer:
```json
{
  "type": "track",
  "event": "Purchase",
  "userId": "user_98765",
  "anonymousId": "dfid_abc123",
  "properties": {
    "order_id": "ORD-001",
    "revenue": 149.99,
    "currency": "USD"
  },
  "context": {
    "ip": "203.0.113.42",
    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) ...",
    "geo": {
      "country": "US",
      "region": "CA",
      "city": "San Francisco"
    },
    "device": {
      "type": "desktop",
      "browser": "Chrome",
      "browserVersion": "121.0",
      "os": "macOS",
      "osVersion": "14.3"
    },
    "session": {
      "id": "sess_xyz789",
      "eventIndex": 5
    }
  },
  "vendorIds": {
    "ga4_client_id": "1234567890.1740000000",
    "fbp": "fb.1.1740000000.987654321"
  },
  "_routing": ["integration_ga4_001", "integration_meta_001"]
}
```

3. After Pipeline Engine (GA4 output):
```json
{
  "client_id": "1234567890.1740000000",
  "user_id": "user_98765",
  "events": [
    {
      "name": "purchase",
      "params": {
        "transaction_id": "ORD-001",
        "value": 149.99,
        "currency": "USD"
      }
    }
  ]
}
```

The GA4-formatted payload is published to the `delivery-integration_ga4_001` Kafka topic, where a Delivery Worker picks it up and sends it to the GA4 Measurement Protocol API.
Sections
- Organisation Data Layer — The 11-step tenant-wide processing pipeline
- Pipeline Engine — Per-integration transformation engine
- Pipeline Configuration — YAML/JSON configuration reference
- Expressions — SQL-like expression language for conditions and computed fields
- Custom Code — Sandboxed JavaScript execution within the processing pipeline
The Event Processor is stateless — it reads from Kafka, processes, and writes back to Kafka. All state (identity graphs, session data, deduplication keys) is stored in Redis. This allows horizontal scaling by adding more processor instances.
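Because all state lives outside the process, a component like event deduplication reduces to a set-if-absent check against the shared store. The sketch below uses an in-memory map standing in for Redis (names are hypothetical; a real implementation would use a Redis client and set a TTL on each key):

```go
package main

import "fmt"

// KeyStore abstracts the shared state backend — Redis in production.
type KeyStore interface {
	// SetNX stores the key if absent and reports whether it was stored.
	SetNX(key string) bool
}

// memStore is a process-local stand-in for Redis, for illustration only.
type memStore struct{ seen map[string]bool }

func (m *memStore) SetNX(key string) bool {
	if m.seen[key] {
		return false
	}
	m.seen[key] = true
	return true
}

// isDuplicate checks an event's idempotency key against the shared store.
// Any processor instance sharing the store makes the same decision, which
// is what lets the service scale horizontally while staying stateless.
func isDuplicate(store KeyStore, idempotencyKey string) bool {
	return !store.SetNX("dedup:" + idempotencyKey)
}

func main() {
	store := &memStore{seen: map[string]bool{}}
	fmt.Println(isDuplicate(store, "evt_123")) // false: first time seen
	fmt.Println(isDuplicate(store, "evt_123")) // true: duplicate
}
```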