
Data Warehouses

Datafly Signal can deliver events to data warehouses and cloud storage for analytics, reporting, and long-term retention. Events are buffered and flushed in batches for efficient ingestion.

Supported Destinations

| Destination | Method | Status |
| --- | --- | --- |
| Google BigQuery | Streaming Insert API | GA |
| Snowflake | Snowpipe | Planned |
| Amazon S3 | PUT Object | Planned |
| Amazon Redshift | COPY via S3 staging | Planned |

Batching

All data warehouse integrations use batching to optimise throughput and reduce API calls. Events are buffered in memory and flushed when either condition is met:

| Setting | Default | Description |
| --- | --- | --- |
| batch_size | 500 | Maximum number of events per batch |
| batch_interval_seconds | 60 | Maximum time (seconds) before flushing a batch |

Configure these per integration:

{
  "config": {
    "batch_size": 1000,
    "batch_interval_seconds": 30
  }
}

Smaller batch intervals reduce delivery latency but increase the number of API calls. For most use cases, the defaults provide a good balance between latency and efficiency.
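The flush-on-size-or-interval behaviour described above can be sketched as a simple in-memory buffer. This is a minimal illustration; the class and method names are assumptions, not Datafly Signal's implementation:

```python
import time

class EventBuffer:
    """Illustrative sketch of flush-on-size-or-interval batching.
    Names and structure are hypothetical, not Datafly Signal's code."""

    def __init__(self, batch_size=500, batch_interval_seconds=60):
        self.batch_size = batch_size
        self.batch_interval = batch_interval_seconds
        self.events = []
        self.last_flush = time.monotonic()
        self.flushed_batches = []  # stand-in for writes to the destination

    def add(self, event):
        self.events.append(event)
        if self.should_flush():
            self.flush()

    def should_flush(self):
        # Flush when either limit is reached, whichever comes first.
        return (len(self.events) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.batch_interval)

    def flush(self):
        if self.events:
            self.flushed_batches.append(self.events)
            self.events = []
        self.last_flush = time.monotonic()

buf = EventBuffer(batch_size=3, batch_interval_seconds=60)
for i in range(7):
    buf.add({"event_id": f"evt_{i}"})
# Two full batches of 3 have flushed; one event is still buffered.
```

A real implementation would also need a timer to flush a partially filled buffer once the interval elapses even when no new events arrive.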

Event Schema

All data warehouse destinations receive events in a consistent schema. The event properties are stored as a JSON column (JSON in BigQuery, VARIANT in Snowflake, SUPER in Redshift, JSON strings in S3 files) to accommodate any event structure without schema migrations.

| Column | Type | Description |
| --- | --- | --- |
| event_id | STRING | Unique event identifier |
| type | STRING | Event type: page, track, identify, group |
| event | STRING | Event name (e.g. Order Completed, Page Viewed) |
| anonymous_id | STRING | Anonymous visitor identifier |
| user_id | STRING | Identified user ID (if available) |
| timestamp | TIMESTAMP | Event timestamp (ISO 8601) |
| received_at | TIMESTAMP | When the Ingestion Gateway received the event |
| sent_at | TIMESTAMP | When Datafly.js sent the event |
| context | JSON / VARIANT | Full context object (page, IP, user agent, locale, consent, etc.) |
| properties | JSON / VARIANT | Event properties (all custom data) |
| traits | JSON / VARIANT | User traits from identify calls |
| source_id | STRING | Datafly source identifier |
| integration_id | STRING | Datafly integration identifier |

Storing properties, context, and traits as JSON columns means you never need to alter your warehouse table schema when new event properties are added. Use your warehouse’s JSON query functions to extract specific fields.
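The mapping from an incoming event to this flat row can be sketched as follows. The function name and exact field handling are illustrative assumptions, not Datafly Signal's internals:

```python
import json

def to_warehouse_row(event: dict) -> dict:
    """Hypothetical sketch: fixed columns stay scalar; open-ended
    objects are JSON-encoded so new keys never require DDL changes."""
    return {
        "event_id": event["event_id"],
        "type": event["type"],
        "event": event.get("event"),
        "anonymous_id": event.get("anonymous_id"),
        "user_id": event.get("user_id"),
        "timestamp": event["timestamp"],
        # Serialized as JSON strings; the warehouse parses them on query.
        "context": json.dumps(event.get("context", {})),
        "properties": json.dumps(event.get("properties", {})),
        "traits": json.dumps(event.get("traits", {})),
    }

row = to_warehouse_row({
    "event_id": "evt_001",
    "type": "track",
    "event": "Order Completed",
    "anonymous_id": "anon_123",
    "timestamp": "2026-01-29T14:30:00Z",
    "properties": {"order_id": "ORD-001", "total": 129.99},
})
```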


Google BigQuery

Deliver events to BigQuery using the Streaming Insert API (tabledata.insertAll).

Configuration

| Field | Required | Description |
| --- | --- | --- |
| project_id | Yes | Google Cloud project ID |
| dataset | Yes | BigQuery dataset name |
| table | Yes | BigQuery table name |
| service_account_json | Yes | Service account key JSON with permission to insert rows (bigquery.tables.updateData) |

Management API Setup

curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "bigquery",
    "name": "BigQuery Events",
    "enabled": true,
    "config": {
      "project_id": "my-gcp-project",
      "dataset": "datafly_events",
      "table": "events",
      "service_account_json": "{\"type\": \"service_account\", \"project_id\": \"...\", ...}",
      "batch_size": 500,
      "batch_interval_seconds": 60
    }
  }'

Table Setup

Create the destination table in BigQuery before enabling the integration:

CREATE TABLE `my-gcp-project.datafly_events.events` (
  event_id STRING NOT NULL,
  type STRING,
  event STRING,
  anonymous_id STRING,
  user_id STRING,
  timestamp TIMESTAMP,
  received_at TIMESTAMP,
  sent_at TIMESTAMP,
  context JSON,
  properties JSON,
  traits JSON,
  source_id STRING,
  integration_id STRING
)
PARTITION BY DATE(timestamp)
CLUSTER BY type, event;

Partitioning by timestamp and clustering by type and event significantly improves query performance and reduces costs for large event volumes.

Querying Events

Extract specific properties using BigQuery’s JSON functions:

-- Revenue by day
SELECT
  DATE(timestamp) AS day,
  SUM(CAST(JSON_VALUE(properties, '$.total') AS FLOAT64)) AS revenue
FROM `my-gcp-project.datafly_events.events`
WHERE event = 'Order Completed'
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY day
ORDER BY day DESC;
 
-- Top pages by views
SELECT
  JSON_VALUE(context, '$.page.url') AS page_url,
  COUNT(*) AS views
FROM `my-gcp-project.datafly_events.events`
WHERE type = 'page'
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY page_url
ORDER BY views DESC
LIMIT 20;

Service Account Permissions

The service account requires the following BigQuery IAM role:

  • roles/bigquery.dataEditor — Allows inserting rows into the table.

For least-privilege access, create a custom role with only the bigquery.tables.updateData permission.

⚠️ Store the service account JSON securely. The key is encrypted at rest in Datafly’s configuration database and is never exposed in API responses or logs.


Snowflake

Deliver events to Snowflake using Snowpipe for continuous, near-real-time loading.

⚠️ Snowflake integration is currently planned and not yet available. The configuration below describes the intended implementation.

Configuration

| Field | Required | Description |
| --- | --- | --- |
| account | Yes | Snowflake account identifier (e.g. abc12345.us-east-1) |
| warehouse | Yes | Snowflake warehouse name |
| database | Yes | Snowflake database name |
| schema | Yes | Snowflake schema name |
| table | Yes | Destination table name |
| stage | Yes | Snowflake stage name for Snowpipe |
| pipe | Yes | Snowpipe name |
| user | Yes | Snowflake user with write permissions |
| private_key | Yes | RSA private key for key-pair authentication |

Management API Setup

curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "snowflake",
    "name": "Snowflake Events",
    "enabled": true,
    "config": {
      "account": "abc12345.us-east-1",
      "warehouse": "DATAFLY_WH",
      "database": "ANALYTICS",
      "schema": "DATAFLY",
      "table": "EVENTS",
      "stage": "DATAFLY_STAGE",
      "pipe": "DATAFLY_PIPE",
      "user": "DATAFLY_SERVICE",
      "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----",
      "batch_size": 1000,
      "batch_interval_seconds": 60
    }
  }'

How Snowpipe Works

  1. Datafly Signal writes batches of events as JSON files to the Snowflake internal stage.
  2. Snowpipe detects new files and loads them into the destination table automatically.
  3. Typical latency from event to queryable row is 1-3 minutes.
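Step 1 above can be sketched as serializing a batch to gzip-compressed newline-delimited JSON, the kind of file Snowpipe would load from the stage. This is illustrative only; the actual staging code is not part of these docs:

```python
import gzip
import io
import json

def encode_batch_ndjson_gz(events: list[dict]) -> bytes:
    """Serialize a batch as gzipped newline-delimited JSON,
    one complete event object per line."""
    buf = io.BytesIO()
    with gzip.open(buf, "wt", encoding="utf-8") as f:
        for event in events:
            f.write(json.dumps(event, separators=(",", ":")) + "\n")
    return buf.getvalue()

payload = encode_batch_ndjson_gz([
    {"event_id": "evt_001", "type": "track", "event": "Order Completed"},
    {"event_id": "evt_002", "type": "page", "event": "Page Viewed"},
])
lines = gzip.decompress(payload).decode("utf-8").splitlines()
```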

Table Setup

CREATE TABLE ANALYTICS.DATAFLY.EVENTS (
  event_id STRING NOT NULL,
  type STRING,
  event STRING,
  anonymous_id STRING,
  user_id STRING,
  timestamp TIMESTAMP_NTZ,
  received_at TIMESTAMP_NTZ,
  sent_at TIMESTAMP_NTZ,
  context VARIANT,
  properties VARIANT,
  traits VARIANT,
  source_id STRING,
  integration_id STRING
);

Querying Events

Extract specific properties using Snowflake’s VARIANT query syntax:

-- Revenue by day
SELECT
  DATE(timestamp) AS day,
  SUM(properties:total::FLOAT) AS revenue
FROM ANALYTICS.DATAFLY.EVENTS
WHERE event = 'Order Completed'
  AND timestamp >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY day
ORDER BY day DESC;

Amazon S3

Deliver events as files to an Amazon S3 bucket for data lake architectures, downstream ETL, or archival.

⚠️ Amazon S3 integration is currently planned and not yet available. The configuration below describes the intended implementation.

Configuration

| Field | Required | Default | Description |
| --- | --- | --- | --- |
| bucket | Yes | — | S3 bucket name |
| prefix | No | datafly/events/ | Key prefix (folder path) |
| region | Yes | — | AWS region (e.g. us-east-1) |
| access_key_id | Yes | — | IAM access key ID |
| secret_access_key | Yes | — | IAM secret access key |
| file_format | No | json | Output format: json (newline-delimited JSON) or parquet |
| compression | No | gzip | Compression: gzip, snappy (Parquet only), or none |

Management API Setup

curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "s3",
    "name": "S3 Event Archive",
    "enabled": true,
    "config": {
      "bucket": "my-analytics-bucket",
      "prefix": "datafly/events/",
      "region": "us-east-1",
      "access_key_id": "AKIAIOSFODNN7EXAMPLE",
      "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
      "file_format": "json",
      "compression": "gzip",
      "batch_size": 1000,
      "batch_interval_seconds": 300
    }
  }'

File Layout

Files are written with the following key structure:

s3://my-analytics-bucket/datafly/events/year=2026/month=01/day=29/hour=14/events-1706540000-abc123.json.gz

The Hive-style partitioning (year=, month=, day=, hour=) allows efficient querying with tools like Athena, Spark, or Presto.
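The key layout can be sketched as a small helper; the epoch-seconds-plus-suffix file name is an assumption based on the example above:

```python
from datetime import datetime, timezone

def object_key(prefix: str, ts: datetime, batch_id: str) -> str:
    """Build a Hive-partitioned S3 key for a batch file.
    File-name scheme (epoch seconds + batch suffix) is hypothetical."""
    return (
        f"{prefix}"
        f"year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}/hour={ts.hour:02d}/"
        f"events-{int(ts.timestamp())}-{batch_id}.json.gz"
    )

key = object_key(
    "datafly/events/",
    datetime(2026, 1, 29, 14, 30, tzinfo=timezone.utc),
    "abc123",
)
```

Because the partition values appear in the key itself, query engines such as Athena can prune by year/month/day/hour without listing the whole bucket.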

JSON Format (Newline-Delimited)

Each line is a complete JSON event:

{"event_id":"evt_001","type":"track","event":"Order Completed","anonymous_id":"anon_123","timestamp":"2026-01-29T14:30:00Z","properties":{"order_id":"ORD-001","total":129.99}}
{"event_id":"evt_002","type":"page","event":"Page Viewed","anonymous_id":"anon_456","timestamp":"2026-01-29T14:30:01Z","properties":{}}

Parquet Format

Parquet files use the same column schema as described in the Event Schema section, with context, properties, and traits stored as STRING columns containing JSON.

IAM Policy

The IAM user or role requires the following S3 permissions:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::my-analytics-bucket",
        "arn:aws:s3:::my-analytics-bucket/datafly/events/*"
      ]
    }
  ]
}

Amazon Redshift

Deliver events to Amazon Redshift using the COPY command via S3 staging.

⚠️ Amazon Redshift integration is currently planned and not yet available. The configuration below describes the intended implementation.

Configuration

| Field | Required | Description |
| --- | --- | --- |
| host | Yes | Redshift cluster endpoint |
| port | No | Redshift port (default 5439) |
| database | Yes | Database name |
| schema | Yes | Schema name |
| table | Yes | Destination table name |
| user | Yes | Redshift user |
| password | Yes | Redshift password |
| s3_bucket | Yes | S3 bucket for staging files |
| s3_prefix | No | S3 key prefix for staging files |
| s3_region | Yes | AWS region of the S3 bucket |
| iam_role_arn | Yes | IAM role ARN for Redshift to access S3 |

Management API Setup

curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "redshift",
    "name": "Redshift Events",
    "enabled": true,
    "config": {
      "host": "my-cluster.abc123.us-east-1.redshift.amazonaws.com",
      "port": 5439,
      "database": "analytics",
      "schema": "datafly",
      "table": "events",
      "user": "datafly_service",
      "password": "your_password",
      "s3_bucket": "my-redshift-staging",
      "s3_prefix": "datafly/staging/",
      "s3_region": "us-east-1",
      "iam_role_arn": "arn:aws:iam::123456789012:role/RedshiftS3Access",
      "batch_size": 1000,
      "batch_interval_seconds": 120
    }
  }'

How It Works

  1. Datafly Signal writes batches of events as JSON files to the S3 staging bucket.
  2. A COPY command is issued to Redshift to load the staged files into the destination table.
  3. Staging files are deleted after successful loading.
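The COPY command in step 2 might look like the following sketch. The exact options are assumptions consistent with gzipped newline-delimited JSON staging files, not a finalized implementation:

```python
def build_copy_sql(schema: str, table: str, s3_uri: str,
                   iam_role_arn: str, region: str) -> str:
    """Compose a Redshift COPY statement for gzipped NDJSON staging files.
    Option choices (JSON 'auto', GZIP, TIMEFORMAT 'auto') are assumptions."""
    return (
        f"COPY {schema}.{table}\n"
        f"FROM '{s3_uri}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"REGION '{region}'\n"
        f"FORMAT AS JSON 'auto'\n"
        f"GZIP\n"
        f"TIMEFORMAT 'auto';"
    )

sql = build_copy_sql(
    "datafly", "events",
    "s3://my-redshift-staging/datafly/staging/",
    "arn:aws:iam::123456789012:role/RedshiftS3Access",
    "us-east-1",
)
```

JSON 'auto' matches staged object keys to table columns by name, so the staged files can use the same column names as the destination table.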

Table Setup

CREATE TABLE datafly.events (
  event_id VARCHAR(64) NOT NULL,
  type VARCHAR(20),
  event VARCHAR(256),
  anonymous_id VARCHAR(64),
  user_id VARCHAR(256),
  "timestamp" TIMESTAMP,
  received_at TIMESTAMP,
  sent_at TIMESTAMP,
  context SUPER,
  properties SUPER,
  traits SUPER,
  source_id VARCHAR(64),
  integration_id VARCHAR(64)
)
DISTKEY(anonymous_id)
SORTKEY("timestamp");

Note that TIMESTAMP is a reserved word in Redshift, so the timestamp column name must be quoted.

Using anonymous_id as the distribution key and timestamp as the sort key optimises both join performance (by user) and time-range query performance.

Querying Events

Extract specific properties using Redshift’s SUPER type functions:

-- Revenue by day
SELECT
  DATE("timestamp") AS day,
  SUM(properties.total::FLOAT) AS revenue
FROM datafly.events
WHERE event = 'Order Completed'
  AND "timestamp" >= GETDATE() - INTERVAL '30 days'
GROUP BY day
ORDER BY day DESC;