GCP BigQuery Integration

This guide shows how to automatically load ABsmartly experiment data exports into Google BigQuery using a Cloud Function triggered by the ExperimentDataExportReady webhook event.

When you download experiment event data from an experiment's page in the ABsmartly Web Console, the platform generates a ZIP file containing the exported data as CSV files. Once the export is ready, the ExperimentDataExportReady webhook fires with the download URL. A Cloud Function receives it, downloads the ZIP, extracts the CSV files, and loads them into BigQuery — giving your data team direct SQL access to experiment data.

Architecture

ABsmartly Web Console (export ready) → ExperimentDataExportReady webhook → Cloud Function (download ZIP, extract CSVs) → BigQuery load jobs

Prerequisites

  1. ABsmartly account with access to the Web Console and a valid API Key. API Keys can be created under User Profile > API Keys in the ABsmartly Web Console.
  2. Google Cloud project with billing enabled.
  3. BigQuery API and Cloud Functions API enabled in your GCP project.
  4. gcloud CLI installed and authenticated (install guide).
  5. The ExperimentDataExportReady webhook event must be available on your ABsmartly instance (requires platform version with this event — contact support if you don't see it).

Step 1 — Create a BigQuery Dataset

If you don't already have a dataset for ABsmartly data, create one:

gcloud config set project YOUR_PROJECT_ID

bq mk --dataset \
  --location=US \
  --description="ABsmartly experiment data exports" \
  YOUR_PROJECT_ID:absmartly

You don't need to create the table manually — the Cloud Function uses BigQuery's schema auto-detection on the first load and appends to the table on subsequent loads.

tip
If you prefer a fixed schema, create the table upfront with the columns matching your export configuration and set autodetect=False in the Cloud Function's LoadJobConfig.

Step 2 — Create the Cloud Function

Create a directory for your function:

mkdir absmartly-export-to-bigquery && cd absmartly-export-to-bigquery

main.py

This function handles the webhook, downloads the export ZIP, extracts and sorts the CSV files by name, and loads them into BigQuery in order.

info
Large exports are split into multiple CSV files (at 5 million rows each). The files are named so that sorting alphabetically preserves the original row order. The function sorts and loads them sequentially to ensure deterministic ingestion order. When querying in BigQuery, use an explicit ORDER BY clause to guarantee result ordering.
import io
import json
import os
import zipfile

import functions_framework
import requests
from google.cloud import bigquery


ABSMARTLY_API_URL = os.environ["ABSMARTLY_API_URL"]  # e.g. https://your-subdomain.absmartly.com
ABSMARTLY_API_KEY = os.environ["ABSMARTLY_API_KEY"]
BIGQUERY_DATASET = os.environ["BIGQUERY_DATASET"]  # e.g. absmartly
BIGQUERY_TABLE = os.environ.get("BIGQUERY_TABLE", "experiment_exports")
TABLE_PER_EXPERIMENT = os.environ.get("TABLE_PER_EXPERIMENT", "false").lower() == "true"
DOWNLOAD_TIMEOUT = int(os.environ.get("DOWNLOAD_TIMEOUT", "300"))


@functions_framework.http
def handle_webhook(request):
    """Cloud Function entry point for the ExperimentDataExportReady webhook."""
    if request.method != "POST":
        return ("Method not allowed", 405)

    payload = request.get_json(silent=True)
    if not payload:
        return ("Bad request: missing JSON body", 400)

    if payload.get("event_name") != "ExperimentDataExportReady":
        return ("Ignored: not an ExperimentDataExportReady event", 200)

    metadata = payload.get("metadata")
    if not metadata:
        return ("Bad request: missing metadata", 400)

    download_url = metadata.get("download_url")
    if not download_url:
        return ("Bad request: missing download_url in metadata", 400)

    experiment_id = payload.get("id")
    if TABLE_PER_EXPERIMENT and not experiment_id:
        return ("Bad request: TABLE_PER_EXPERIMENT is enabled but payload has no experiment id", 400)

    zip_bytes = download_export(download_url)
    csv_files = extract_and_sort_csvs(zip_bytes)
    rows_loaded = load_csvs_into_bigquery(csv_files, experiment_id)

    return (json.dumps({
        "status": "ok",
        "files_processed": len(csv_files),
        "rows_loaded": rows_loaded,
    }), 200, {"Content-Type": "application/json"})


def download_export(download_url: str) -> bytes:
    """Download the export ZIP file from the ABsmartly API."""
    url = f"{ABSMARTLY_API_URL.rstrip('/')}{download_url}"
    response = requests.get(
        url,
        headers={"Authorization": f"Api-Key {ABSMARTLY_API_KEY}"},
        timeout=DOWNLOAD_TIMEOUT,
    )
    response.raise_for_status()
    return response.content


def extract_and_sort_csvs(zip_bytes: bytes) -> list[tuple[str, bytes]]:
    """Extract CSV files from a ZIP archive and return them sorted by name.

    Exports with more than 5 million rows are split into multiple CSVs.
    Sorting by filename preserves the original row order across files.
    """
    csv_files = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            if name.lower().endswith(".csv"):
                csv_files.append((name, zf.read(name)))
    csv_files.sort(key=lambda entry: entry[0])
    return csv_files


def load_csvs_into_bigquery(csv_files: list[tuple[str, bytes]], experiment_id: int) -> int:
    """Load CSV files into BigQuery in order, return total rows loaded."""
    client = bigquery.Client()

    if TABLE_PER_EXPERIMENT and experiment_id:
        table_name = f"experiment_{experiment_id}_exports"
    else:
        table_name = BIGQUERY_TABLE

    table_ref = f"{client.project}.{BIGQUERY_DATASET}.{table_name}"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    total_rows = 0
    for filename, content in csv_files:
        load_job = client.load_table_from_file(
            io.BytesIO(content),
            table_ref,
            job_config=job_config,
        )
        load_job.result()  # wait for completion
        total_rows += load_job.output_rows
        print(f"Loaded {load_job.output_rows} rows from {filename}")

    print(f"Total: {total_rows} rows loaded into {table_ref}")
    return total_rows
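Before deploying, you can sanity-check the extraction-and-ordering logic locally with an in-memory ZIP. The sketch below re-implements extract_and_sort_csvs standalone (no GCP dependencies); the part filenames are illustrative, not the exact names ABsmartly produces:

```python
import io
import zipfile


def extract_and_sort_csvs(zip_bytes: bytes) -> list[tuple[str, bytes]]:
    """Standalone copy of the extraction function, for local testing."""
    csv_files = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            if name.lower().endswith(".csv"):
                csv_files.append((name, zf.read(name)))
    csv_files.sort(key=lambda entry: entry[0])
    return csv_files


# Build a fake export ZIP with CSV parts added out of order,
# plus a non-CSV file that should be ignored.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("export_part_002.csv", "unit_uid,variant\nu3,1\n")
    zf.writestr("manifest.json", "{}")
    zf.writestr("export_part_001.csv", "unit_uid,variant\nu1,0\nu2,1\n")

files = extract_and_sort_csvs(buf.getvalue())
# Parts come back in filename order; non-CSV entries are skipped.
print([name for name, _ in files])
```

This confirms the ingestion order is determined purely by filename sorting, which is what the 5-million-row split relies on.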

requirements.txt

functions-framework>=3.0
google-cloud-bigquery>=3.0
requests>=2.31

Step 3 — Deploy the Cloud Function

Deploy with gcloud, setting the required environment variables:

gcloud functions deploy absmartly-export-to-bigquery \
  --gen2 \
  --region=us-central1 \
  --runtime=python312 \
  --entry-point=handle_webhook \
  --trigger-http \
  --memory=512Mi \
  --timeout=540s \
  --set-env-vars="\
ABSMARTLY_API_URL=https://your-subdomain.absmartly.com,\
BIGQUERY_DATASET=absmartly,\
BIGQUERY_TABLE=experiment_exports,\
TABLE_PER_EXPERIMENT=false" \
  --set-secrets="ABSMARTLY_API_KEY=absmartly-api-key:latest"

info
The --set-secrets flag references a secret stored in Google Secret Manager. Create the secret first:

echo -n "YOUR_API_KEY" | gcloud secrets create absmartly-api-key --data-file=-

This avoids exposing the API key in shell history or CI logs.

tip
By default, Cloud Functions require authenticated invocation. If you need the function to accept unauthenticated requests (e.g. because ABsmartly cannot authenticate to GCP), add --allow-unauthenticated to the deploy command.

Table strategy

The TABLE_PER_EXPERIMENT environment variable controls how data is organized in BigQuery:

  • false (default) — All exports go into a single table (set by BIGQUERY_TABLE, e.g. experiment_exports). The experiment ID is included as a column in the data, so you can filter by experiment. This is simpler to manage and query across experiments.
  • true — Each experiment gets its own table, named experiment_{id}_exports (e.g. experiment_42_exports). This gives better isolation between experiments and avoids mixing schemas if different experiments export different columns.

After deployment, gcloud prints the function's URL. Copy it — you'll need it in the next step.

info
The --timeout=540s (9 minutes) allows time for large exports to download and load. Adjust based on your typical export sizes. The --memory=512Mi should be sufficient for most exports; increase it if you see out-of-memory errors in the Cloud Function logs.

Step 4 — Configure the Webhook in ABsmartly

  1. In the ABsmartly Web Console, go to Settings > Webhooks.
  2. Click Create Webhook.
  3. Fill in the details:
    • URL: The Cloud Function URL from the previous step.
    • Events: Select ExperimentDataExportReady.
  4. Save the webhook.

Now, whenever you download experiment event data from an experiment's page and the export completes, ABsmartly will POST the event to your Cloud Function, which will automatically download and load the data into BigQuery.

Webhook Payload Reference

The ExperimentDataExportReady event is fired when an experiment data export finishes and is ready for download. The payload has the following shape:

{
  "event_name": "ExperimentDataExportReady",
  "event_at": 1712937600000,
  "id": 42,
  "user_id": 7,
  "metadata": {
    "download_url": "/v1/experiments/exports/15/experiment_42_export.zip",
    "export_config_id": 15,
    "file_name": "experiment_42_export.zip"
  }
}

| Field | Description |
|---|---|
| event_name | Always ExperimentDataExportReady |
| event_at | Timestamp (Unix ms) of when the export completed |
| id | The experiment ID |
| user_id | The ID of the user who triggered the export |
| metadata.download_url | Relative path to download the ZIP file from the ABsmartly API |
| metadata.export_config_id | The ID of the export configuration |
| metadata.file_name | The filename of the ZIP archive |
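As a quick illustration of how the Cloud Function consumes this payload, the sketch below validates the fields and joins the relative download_url onto the API base URL (the base URL here is just an example value):

```python
def parse_export_event(payload: dict, api_url: str) -> tuple[int, str]:
    """Validate an ExperimentDataExportReady payload and build the full download URL."""
    if payload.get("event_name") != "ExperimentDataExportReady":
        raise ValueError("not an ExperimentDataExportReady event")
    metadata = payload.get("metadata") or {}
    download_url = metadata.get("download_url")
    if not download_url:
        raise ValueError("missing download_url in metadata")
    # download_url is a relative path; prepend the API base URL
    return payload["id"], f"{api_url.rstrip('/')}{download_url}"


payload = {
    "event_name": "ExperimentDataExportReady",
    "event_at": 1712937600000,
    "id": 42,
    "user_id": 7,
    "metadata": {
        "download_url": "/v1/experiments/exports/15/experiment_42_export.zip",
        "export_config_id": 15,
        "file_name": "experiment_42_export.zip",
    },
}

experiment_id, url = parse_export_event(payload, "https://your-subdomain.absmartly.com")
print(experiment_id, url)
```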

Alternative: Direct Cloud Storage Access

If your ABsmartly instance is configured to store exports in Google Cloud Storage (or S3) and you have direct access to the bucket, you can skip the API download and read the file directly from storage.

Replace the download_export function with:

Google Cloud Storage

from google.cloud import storage

def download_export(download_url: str) -> bytes:
    """Download the export ZIP directly from GCS."""
    bucket_name = os.environ["EXPORT_BUCKET"]  # e.g. absmartly-exports
    # Extract the object key from the download_url or file_name
    blob_name = download_url.rsplit("/", 1)[-1]

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    return blob.download_as_bytes()

Add google-cloud-storage==2.* to requirements.txt.

Amazon S3

import boto3

def download_export(download_url: str) -> bytes:
    """Download the export ZIP directly from S3."""
    bucket_name = os.environ["EXPORT_BUCKET"]  # e.g. absmartly-exports
    object_key = download_url.rsplit("/", 1)[-1]

    s3 = boto3.client("s3")
    response = s3.get_object(Bucket=bucket_name, Key=object_key)
    return response["Body"].read()

Add boto3==1.* to requirements.txt.

info
When using direct storage access, the Cloud Function's service account needs read permissions on the bucket. For GCS, grant the Storage Object Viewer role. For S3, configure appropriate IAM credentials.
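Both variants above derive the object key the same way: assuming the export objects sit at the top level of the bucket, the key is just the last path segment of download_url. A minimal sketch of that assumption:

```python
def object_key(download_url: str) -> str:
    """Take the final path segment of the download URL as the storage object key.

    Assumes exports are stored at the bucket root; if your bucket uses a
    prefix, keep more of the path instead.
    """
    return download_url.rsplit("/", 1)[-1]


print(object_key("/v1/experiments/exports/15/experiment_42_export.zip"))
```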

Filtering Exports

You may not want every experiment export to land in BigQuery. The webhook payload contains the experiment ID, so the Cloud Function can call the ABsmartly Web Console API to fetch experiment details and decide whether to proceed based on tags, owners, or teams.

Add this helper function to main.py:

def get_experiment(experiment_id: int) -> dict:
    """Fetch experiment details from the ABsmartly Web Console API."""
    url = f"{ABSMARTLY_API_URL.rstrip('/')}/v1/experiments/{experiment_id}"
    response = requests.get(
        url,
        headers={"Authorization": f"Api-Key {ABSMARTLY_API_KEY}"},
        timeout=DOWNLOAD_TIMEOUT,
    )
    response.raise_for_status()
    return response.json().get("experiment", {})

Then add a filter check in handle_webhook before the download:

    experiment_id = payload.get("id")
    experiment = get_experiment(experiment_id)

    if not should_export(experiment):
        return (f"Skipped: experiment {experiment_id} does not match filter", 200)

    zip_bytes = download_export(download_url)
    # ...

Below are examples of should_export functions you can mix and match.

Filter by experiment tag

Only export experiments tagged with a specific tag (e.g. bigquery):

REQUIRED_TAG = os.environ.get("EXPORT_FILTER_TAG", "bigquery")

def should_export(experiment: dict) -> bool:
    """Only export if the experiment has the required tag."""
    tags = {
        t["experiment_tag"]["tag"]
        for t in experiment.get("experiment_tags", [])
    }
    return REQUIRED_TAG in tags

Filter by experiment owner

Only export experiments owned by a specific user:

REQUIRED_OWNER_EMAIL = os.environ.get("EXPORT_FILTER_OWNER", "")

def should_export(experiment: dict) -> bool:
    """Only export if the experiment is owned by the specified user."""
    if not REQUIRED_OWNER_EMAIL:
        return True
    owner_emails = {
        o["user"]["email"]
        for o in experiment.get("owners", [])
    }
    return REQUIRED_OWNER_EMAIL in owner_emails

Filter by team

Only export experiments belonging to a specific team:

REQUIRED_TEAM = os.environ.get("EXPORT_FILTER_TEAM", "")

def should_export(experiment: dict) -> bool:
    """Only export if the experiment belongs to the specified team."""
    if not REQUIRED_TEAM:
        return True
    team_names = {
        t["team"]["name"]
        for t in experiment.get("teams", [])
    }
    return REQUIRED_TEAM in team_names

tip
You can combine multiple filters. Set the environment variables in the deploy command (e.g. EXPORT_FILTER_TAG=bigquery) and chain the conditions in a single should_export function.
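A combined version chaining the tag and team checks under the same environment variables might look like this; the sample experiment dict at the bottom is illustrative, not a real API response:

```python
import os

REQUIRED_TAG = os.environ.get("EXPORT_FILTER_TAG", "")
REQUIRED_TEAM = os.environ.get("EXPORT_FILTER_TEAM", "")


def should_export(experiment: dict) -> bool:
    """Export only if all configured filters match; unset filters always pass."""
    if REQUIRED_TAG:
        tags = {t["experiment_tag"]["tag"] for t in experiment.get("experiment_tags", [])}
        if REQUIRED_TAG not in tags:
            return False
    if REQUIRED_TEAM:
        team_names = {t["team"]["name"] for t in experiment.get("teams", [])}
        if REQUIRED_TEAM not in team_names:
            return False
    return True


# Illustrative experiment shape matching the filter examples above.
experiment = {
    "experiment_tags": [{"experiment_tag": {"tag": "bigquery"}}],
    "teams": [{"team": {"name": "growth"}}],
}
print(should_export(experiment))  # True when the filters are unset or match
```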

Verifying the Integration

  1. In the ABsmartly Web Console, open an experiment and download its event data from the experiment page.
  2. Wait for the export to complete — this fires the ExperimentDataExportReady webhook.
  3. Check the Cloud Function logs for the download and load output:
    gcloud functions logs read absmartly-export-to-bigquery --gen2 --region=us-central1
  4. Query the data in BigQuery:
    SELECT * FROM `YOUR_PROJECT_ID.absmartly.experiment_exports` LIMIT 10;

Querying Experiment Data

The exported data contains events — exposures, goals, and attributes — all in the same table, distinguished by the event_type column. Key columns include:

| Column | Description |
|---|---|
| unit_uid | The unit identifier (e.g. user ID) |
| unit_type | The unit type (e.g. user_id, anonymous_id) |
| event_type | exposure, goal, or attribute |
| event_at | Event timestamp |
| experiment_id | The experiment ID (exposure events) |
| experiment_name | The experiment name (exposure events) |
| variant | The assigned variant (exposure events) |
| goal_name | The goal name (goal events) |
| properties | Goal properties as JSON (goal events) |

Count unique participants per variant

Each unit may have multiple exposure events. To count unique participants, deduplicate by unit_uid using the first exposure (earliest event_at):

SELECT
  variant,
  COUNT(*) AS unique_participants
FROM (
  SELECT
    unit_uid,
    variant,
    ROW_NUMBER() OVER (PARTITION BY unit_uid ORDER BY event_at ASC) AS rn
  FROM `YOUR_PROJECT_ID.absmartly.experiment_exports`
  WHERE event_type = 'exposure'
    AND experiment_id = 42
)
WHERE rn = 1
GROUP BY variant
ORDER BY variant;

Conversion rate per variant

To calculate conversion rates, join each unit's first exposure with whether they achieved a goal. A unit counts as a converter if they triggered the goal after their first exposure:

WITH first_exposures AS (
  SELECT
    unit_uid,
    variant,
    MIN(event_at) AS first_exposed_at
  FROM `YOUR_PROJECT_ID.absmartly.experiment_exports`
  WHERE event_type = 'exposure'
    AND experiment_id = 42
  GROUP BY unit_uid, variant
),
conversions AS (
  SELECT DISTINCT
    g.unit_uid
  FROM `YOUR_PROJECT_ID.absmartly.experiment_exports` g
  INNER JOIN first_exposures e ON g.unit_uid = e.unit_uid
  WHERE g.event_type = 'goal'
    AND g.goal_name = 'your_goal_name'
    AND g.event_at >= e.first_exposed_at
)
SELECT
  e.variant,
  COUNT(*) AS unique_participants,
  COUNTIF(c.unit_uid IS NOT NULL) AS converters,
  ROUND(COUNTIF(c.unit_uid IS NOT NULL) / COUNT(*), 4) AS conversion_rate
FROM first_exposures e
LEFT JOIN conversions c ON e.unit_uid = c.unit_uid
GROUP BY e.variant
ORDER BY e.variant;

Per-experiment table mode

When using TABLE_PER_EXPERIMENT=true, replace the table name with the experiment-specific table and remove the experiment_id filter:

-- Unique participants per variant
SELECT
  variant,
  COUNT(*) AS unique_participants
FROM (
  SELECT
    unit_uid,
    variant,
    ROW_NUMBER() OVER (PARTITION BY unit_uid ORDER BY event_at ASC) AS rn
  FROM `YOUR_PROJECT_ID.absmartly.experiment_42_exports`
  WHERE event_type = 'exposure'
)
WHERE rn = 1
GROUP BY variant
ORDER BY variant;

Troubleshooting

| Symptom | Possible cause |
|---|---|
| Function returns 405 | Webhook is sending a GET instead of POST — check the webhook URL |
| 401 Unauthorized on download | ABSMARTLY_API_KEY is incorrect or missing |
| 404 Not Found on download | Export file may have expired — check ABSMARTLY_API_URL and the download_url |
| BigQuery permission error | Cloud Function's service account needs the BigQuery Data Editor role on the dataset |
| Timeout errors | Increase --timeout and --memory in the deployment command |
| Rows appear out of order | Ensure CSV files are sorted by name before loading (the provided function does this) |

If you have any issues setting up this integration, please contact us at [email protected].