Data Warehouses
Datafly Signal can deliver events to data warehouses and cloud storage for analytics, reporting, and long-term retention. Events are buffered and flushed in batches for efficient ingestion.
Supported Destinations
| Destination | Method | Status |
|---|---|---|
| Google BigQuery | Streaming Insert API | GA |
| Snowflake | Snowpipe | Planned |
| Amazon S3 | PUT Object | Planned |
| Amazon Redshift | COPY via S3 staging | Planned |
Batching
All data warehouse integrations use batching to optimise throughput and reduce API calls. Events are buffered in memory and flushed when either condition is met:
| Setting | Default | Description |
|---|---|---|
| batch_size | 500 | Maximum number of events per batch |
| batch_interval_seconds | 60 | Maximum time (seconds) before flushing a batch |
Configure these per integration:
```json
{
  "config": {
    "batch_size": 1000,
    "batch_interval_seconds": 30
  }
}
```

Smaller batch intervals reduce delivery latency but increase the number of API calls. For most use cases, the defaults provide a good balance between latency and efficiency.
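The two flush conditions can be sketched as a simple in-memory buffer. This is an illustrative sketch only; the class and method names (`EventBuffer`, `maybe_flush`) are assumptions, not the actual implementation:

```python
import time

class EventBuffer:
    """Buffers events and flushes when either limit is reached (illustrative sketch)."""

    def __init__(self, batch_size=500, batch_interval_seconds=60, flush=print):
        self.batch_size = batch_size
        self.batch_interval_seconds = batch_interval_seconds
        self.flush = flush  # callback that delivers a completed batch downstream
        self.events = []
        self.last_flush = time.monotonic()

    def add(self, event):
        self.events.append(event)
        self.maybe_flush()

    def maybe_flush(self):
        # Flush when the batch is full OR the interval has elapsed.
        full = len(self.events) >= self.batch_size
        stale = time.monotonic() - self.last_flush >= self.batch_interval_seconds
        if self.events and (full or stale):
            self.flush(self.events)
            self.events = []
            self.last_flush = time.monotonic()
```

In practice a background timer would also call `maybe_flush()` periodically, so that a quiet source still flushes on the interval even when no new events arrive.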
Event Schema
All data warehouse destinations receive events in a consistent schema. The event properties are stored as a JSON column (JSONB in PostgreSQL/Redshift, VARIANT in Snowflake, JSON string in BigQuery/S3) to accommodate any event structure without schema migrations.
| Column | Type | Description |
|---|---|---|
| event_id | STRING | Unique event identifier |
| type | STRING | Event type: page, track, identify, group |
| event | STRING | Event name (e.g. Order Completed, Page Viewed) |
| anonymous_id | STRING | Anonymous visitor identifier |
| user_id | STRING | Identified user ID (if available) |
| timestamp | TIMESTAMP | Event timestamp (ISO 8601) |
| received_at | TIMESTAMP | When the Ingestion Gateway received the event |
| sent_at | TIMESTAMP | When Datafly.js sent the event |
| context | JSON / VARIANT | Full context object (page, IP, user agent, locale, consent, etc.) |
| properties | JSON / VARIANT | Event properties (all custom data) |
| traits | JSON / VARIANT | User traits from identify calls |
| source_id | STRING | Datafly source identifier |
| integration_id | STRING | Datafly integration identifier |
Storing properties, context, and traits as JSON columns means you never need to alter your warehouse table schema when new event properties are added. Use your warehouse’s JSON query functions to extract specific fields.
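As an illustration of this layout, a delivered event can be flattened into a warehouse row by serializing the nested objects. The function name `to_warehouse_row` and the exact mapping are assumptions for the sketch, not the actual implementation:

```python
import json

def to_warehouse_row(event: dict) -> dict:
    """Map an incoming event to the warehouse column layout (sketch).

    Nested objects (context, properties, traits) are serialized to JSON
    strings so the table schema never changes when new keys appear.
    """
    return {
        "event_id": event["event_id"],
        "type": event.get("type"),
        "event": event.get("event"),
        "anonymous_id": event.get("anonymous_id"),
        "user_id": event.get("user_id"),
        "timestamp": event.get("timestamp"),
        "received_at": event.get("received_at"),
        "sent_at": event.get("sent_at"),
        "context": json.dumps(event.get("context", {})),
        "properties": json.dumps(event.get("properties", {})),
        "traits": json.dumps(event.get("traits", {})),
        "source_id": event.get("source_id"),
        "integration_id": event.get("integration_id"),
    }
```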
Google BigQuery
Deliver events to BigQuery using the Streaming Insert API (`tabledata.insertAll`).
Configuration
| Field | Required | Description |
|---|---|---|
| project_id | Yes | Google Cloud project ID |
| dataset | Yes | BigQuery dataset name |
| table | Yes | BigQuery table name |
| service_account_json | Yes | Service account key JSON with the `bigquery.tables.updateData` permission |
Management API Setup
```bash
curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "bigquery",
    "name": "BigQuery Events",
    "enabled": true,
    "config": {
      "project_id": "my-gcp-project",
      "dataset": "datafly_events",
      "table": "events",
      "service_account_json": "{\"type\": \"service_account\", \"project_id\": \"...\", ...}",
      "batch_size": 500,
      "batch_interval_seconds": 60
    }
  }'
```

Table Setup
Create the destination table in BigQuery before enabling the integration:
```sql
CREATE TABLE `my-gcp-project.datafly_events.events` (
  event_id STRING NOT NULL,
  type STRING,
  event STRING,
  anonymous_id STRING,
  user_id STRING,
  timestamp TIMESTAMP,
  received_at TIMESTAMP,
  sent_at TIMESTAMP,
  context JSON,
  properties JSON,
  traits JSON,
  source_id STRING,
  integration_id STRING
)
PARTITION BY DATE(timestamp)
CLUSTER BY type, event;
```

Partitioning by timestamp and clustering by type and event significantly improves query performance and reduces costs for large event volumes.
Querying Events
Extract specific properties using BigQuery’s JSON functions:
```sql
-- Revenue by day
SELECT
  DATE(timestamp) AS day,
  SUM(CAST(JSON_VALUE(properties, '$.total') AS FLOAT64)) AS revenue
FROM `my-gcp-project.datafly_events.events`
WHERE event = 'Order Completed'
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY day
ORDER BY day DESC;

-- Top pages by views
SELECT
  JSON_VALUE(context, '$.page.url') AS page_url,
  COUNT(*) AS views
FROM `my-gcp-project.datafly_events.events`
WHERE type = 'page'
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY page_url
ORDER BY views DESC
LIMIT 20;
```

Service Account Permissions
The service account requires the following BigQuery IAM role:
`roles/bigquery.dataEditor`: allows inserting rows into the table.
For least-privilege access, create a custom role with only the `bigquery.tables.updateData` permission.
Store the service account JSON securely. The key is encrypted at rest in Datafly’s configuration database and is never exposed in API responses or logs.
Snowflake
Deliver events to Snowflake using Snowpipe for continuous, near-real-time loading.
Snowflake integration is currently planned and not yet available. The configuration below describes the intended implementation.
Configuration
| Field | Required | Description |
|---|---|---|
| account | Yes | Snowflake account identifier (e.g. abc12345.us-east-1) |
| warehouse | Yes | Snowflake warehouse name |
| database | Yes | Snowflake database name |
| schema | Yes | Snowflake schema name |
| table | Yes | Destination table name |
| stage | Yes | Snowflake stage name for Snowpipe |
| pipe | Yes | Snowpipe name |
| user | Yes | Snowflake user with write permissions |
| private_key | Yes | RSA private key for key-pair authentication |
Management API Setup
```bash
curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "snowflake",
    "name": "Snowflake Events",
    "enabled": true,
    "config": {
      "account": "abc12345.us-east-1",
      "warehouse": "DATAFLY_WH",
      "database": "ANALYTICS",
      "schema": "DATAFLY",
      "table": "EVENTS",
      "stage": "DATAFLY_STAGE",
      "pipe": "DATAFLY_PIPE",
      "user": "DATAFLY_SERVICE",
      "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----",
      "batch_size": 1000,
      "batch_interval_seconds": 60
    }
  }'
```

How Snowpipe Works
- Datafly Signal writes batches of events as JSON files to the Snowflake internal stage.
- Snowpipe detects new files and loads them into the destination table automatically.
- Typical latency from event to queryable row is 1-3 minutes.
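Step 1 above (writing a batch as a JSON file) might look like the following sketch. The helper name `write_stage_file` is an assumption, and the actual upload to the internal stage (e.g. a `PUT` through the Snowflake connector) is omitted:

```python
import gzip
import json

def write_stage_file(events: list, path: str) -> None:
    """Serialize a batch as gzipped newline-delimited JSON, a format
    Snowpipe can load with a JSON file format (sketch; upload omitted)."""
    with gzip.open(path, "wt", encoding="utf-8") as f:
        for event in events:
            f.write(json.dumps(event) + "\n")
```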
Table Setup
```sql
CREATE TABLE ANALYTICS.DATAFLY.EVENTS (
  event_id STRING NOT NULL,
  type STRING,
  event STRING,
  anonymous_id STRING,
  user_id STRING,
  timestamp TIMESTAMP_NTZ,
  received_at TIMESTAMP_NTZ,
  sent_at TIMESTAMP_NTZ,
  context VARIANT,
  properties VARIANT,
  traits VARIANT,
  source_id STRING,
  integration_id STRING
);
```

Querying Events
Extract specific properties using Snowflake’s VARIANT query syntax:
```sql
-- Revenue by day
SELECT
  DATE(timestamp) AS day,
  SUM(properties:total::FLOAT) AS revenue
FROM ANALYTICS.DATAFLY.EVENTS
WHERE event = 'Order Completed'
  AND timestamp >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY day
ORDER BY day DESC;
```

Amazon S3
Deliver events as files to an Amazon S3 bucket for data lake architectures, downstream ETL, or archival.
Amazon S3 integration is currently planned and not yet available. The configuration below describes the intended implementation.
Configuration
| Field | Required | Default | Description |
|---|---|---|---|
| bucket | Yes | — | S3 bucket name |
| prefix | No | datafly/events/ | Key prefix (folder path) |
| region | Yes | — | AWS region (e.g. us-east-1) |
| access_key_id | Yes | — | IAM access key ID |
| secret_access_key | Yes | — | IAM secret access key |
| file_format | No | json | Output format: json (newline-delimited JSON) or parquet |
| compression | No | gzip | Compression: gzip, snappy (Parquet only), or none |
Management API Setup
```bash
curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "s3",
    "name": "S3 Event Archive",
    "enabled": true,
    "config": {
      "bucket": "my-analytics-bucket",
      "prefix": "datafly/events/",
      "region": "us-east-1",
      "access_key_id": "AKIAIOSFODNN7EXAMPLE",
      "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
      "file_format": "json",
      "compression": "gzip",
      "batch_size": 1000,
      "batch_interval_seconds": 300
    }
  }'
```

File Layout
Files are written with the following key structure:
```
s3://my-analytics-bucket/datafly/events/year=2026/month=01/day=29/hour=14/events-1706540000-abc123.json.gz
```

The Hive-style partitioning (`year=`, `month=`, `day=`, `hour=`) allows efficient querying with tools like Athena, Spark, or Presto.
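The partitioned key can be derived from the event timestamp. A sketch, where the function name `object_key` and the exact suffix scheme are assumptions:

```python
from datetime import datetime, timezone

def object_key(prefix: str, ts: datetime, suffix: str) -> str:
    """Build a Hive-partitioned S3 key (year=/month=/day=/hour=) from a timestamp."""
    return (
        f"{prefix}"
        f"year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}/hour={ts.hour:02d}/"
        f"events-{int(ts.timestamp())}-{suffix}.json.gz"
    )
```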
JSON Format (Newline-Delimited)
Each line is a complete JSON event:
```json
{"event_id":"evt_001","type":"track","event":"Order Completed","anonymous_id":"anon_123","timestamp":"2026-01-29T14:30:00Z","properties":{"order_id":"ORD-001","total":129.99}}
{"event_id":"evt_002","type":"page","event":"Page Viewed","anonymous_id":"anon_456","timestamp":"2026-01-29T14:30:01Z","properties":{}}
```

Parquet Format
Parquet files use the same column schema as described in the Event Schema section, with context, properties, and traits stored as STRING columns containing JSON.
IAM Policy
The IAM user or role requires the following S3 permissions:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::my-analytics-bucket",
        "arn:aws:s3:::my-analytics-bucket/datafly/events/*"
      ]
    }
  ]
}
```

Amazon Redshift
Deliver events to Amazon Redshift using the COPY command via S3 staging.
Amazon Redshift integration is currently planned and not yet available. The configuration below describes the intended implementation.
Configuration
| Field | Required | Description |
|---|---|---|
| host | Yes | Redshift cluster endpoint |
| port | No | Redshift port (default 5439) |
| database | Yes | Database name |
| schema | Yes | Schema name |
| table | Yes | Destination table name |
| user | Yes | Redshift user |
| password | Yes | Redshift password |
| s3_bucket | Yes | S3 bucket for staging files |
| s3_prefix | No | S3 key prefix for staging files |
| s3_region | Yes | AWS region of the S3 bucket |
| iam_role_arn | Yes | IAM role ARN for Redshift to access S3 |
Management API Setup
```bash
curl -X POST http://localhost:8084/v1/admin/integrations \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "source_id": "src_abc123",
    "vendor": "redshift",
    "name": "Redshift Events",
    "enabled": true,
    "config": {
      "host": "my-cluster.abc123.us-east-1.redshift.amazonaws.com",
      "port": 5439,
      "database": "analytics",
      "schema": "datafly",
      "table": "events",
      "user": "datafly_service",
      "password": "your_password",
      "s3_bucket": "my-redshift-staging",
      "s3_prefix": "datafly/staging/",
      "s3_region": "us-east-1",
      "iam_role_arn": "arn:aws:iam::123456789012:role/RedshiftS3Access",
      "batch_size": 1000,
      "batch_interval_seconds": 120
    }
  }'
```

How It Works
- Datafly Signal writes batches of events as JSON files to the S3 staging bucket.
- A `COPY` command is issued to Redshift to load the staged files into the destination table.
- Staging files are deleted after successful loading.
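Step 2 amounts to issuing a COPY statement against the staged files. A sketch of building that statement, where the helper name `build_copy_sql` is an assumption (see the Redshift COPY documentation for the full option set):

```python
def build_copy_sql(schema: str, table: str, bucket: str, prefix: str,
                   iam_role_arn: str, region: str) -> str:
    """Build a Redshift COPY statement that loads gzipped JSON staging
    files from S3, authenticating via an attached IAM role."""
    return (
        f"COPY {schema}.{table}\n"
        f"FROM 's3://{bucket}/{prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        f"REGION '{region}'\n"
        f"FORMAT AS JSON 'auto' GZIP;"
    )
```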
Table Setup
```sql
CREATE TABLE datafly.events (
  event_id VARCHAR(64) NOT NULL,
  type VARCHAR(20),
  event VARCHAR(256),
  anonymous_id VARCHAR(64),
  user_id VARCHAR(256),
  timestamp TIMESTAMP,
  received_at TIMESTAMP,
  sent_at TIMESTAMP,
  context SUPER,
  properties SUPER,
  traits SUPER,
  source_id VARCHAR(64),
  integration_id VARCHAR(64)
)
DISTKEY(anonymous_id)
SORTKEY(timestamp);
```

Using `anonymous_id` as the distribution key and `timestamp` as the sort key optimises both join performance (by user) and time-range query performance.
Querying Events
Extract specific properties using Redshift’s SUPER type functions:
```sql
-- Revenue by day
SELECT
  DATE(timestamp) AS day,
  SUM(properties.total::FLOAT) AS revenue
FROM datafly.events
WHERE event = 'Order Completed'
  AND timestamp >= GETDATE() - INTERVAL '30 days'
GROUP BY day
ORDER BY day DESC;
```