Ingestion & ETL Manual
Airbyte
General Information
Airbyte consists of sources, destinations, and connections.
- Connections have a defined schema for saving data. This schema is determined by the source and can be updated whenever the source updates its schema.
- It reads the source at the defined schedule, looks for new records since the last read, deduplicates records, and saves them in the destination.
- You can set the sync schedule using any cron expression (see the example after this list).
- You can view an overview of the last 30 syncs in Airbyte, as well as logs for all of those syncs. This is helpful when figuring out why a sync failed (very rare).
- Airbyte bills based on the number of rows synced, so it is often better to get daily aggregated data synced (unless you need something more granular).
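For reference, here is a hedged example of a schedule expression, assuming the Quartz-style, seconds-first cron syntax that Airbyte's cron scheduling accepts; verify it against the validator shown in the connection settings:

```python
# Hypothetical schedule string, for illustration only.
# Fields: seconds, minutes, hours, day-of-month, month, day-of-week
daily_sync_at_6am_utc = "0 0 6 * * ?"   # run the sync once per day at 06:00 UTC
```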
Airbyte offers 200+ open source pre-built connectors. We use these for the following sources:
- Google Search Console
- Marketing Budget
- Trusted Feed
- Google Ads
- Bing Ads
When the source schema is updated (new columns added or data type changes), Airbyte automatically updates the destination schema in Databricks. For certain sources that use OAuth 2.0, you will have to re-authenticate the Airbyte connector about every 3 months.
List of Active Connections
- Bing Ads → databricks_production
- Criteo → databricks_production
- Google Ads → databricks_production
- Google Search Console → databricks_production
- Marketing Budget → databricks_production
- theCRM → databricks_production
- Trusted Feed → databricks_production
Guide: How to add a new Source
To add a new source, start by checking the Airbyte Integrations page. Airbyte provides a comprehensive catalog of pre-built connectors for many popular APIs, databases, and file systems. These connectors are categorized by support level: Airbyte-maintained, Community-maintained (Marketplace), and Enterprise-only.
If you find a suitable connector in the catalog, you can simply configure it within your Airbyte workspace. If the connector you need is not available, or if existing connectors do not meet your specific requirements, you may need to build a custom solution.
Custom Connectors
If a pre-built connector is not available, you can build a custom connector using the Connector Builder. The Connector Builder is a no-code tool built into the Airbyte UI that allows you to create source connectors using an intuitive interface.
Behind the scenes, the Connector Builder generates a standard low-code YAML manifest. You can define authentication methods, pagination, and record processing logic without writing code. Once your connector is built and tested, you can publish it to your workspace to use in your data pipelines, or even contribute it to the Airbyte Marketplace.
For sources that do not have a pre-built connector but where we still want Airbyte’s deduplication & automatic schema management, we use custom connectors for:
- Criteo
- theCRM
A custom connector consists of 2 parts: the authentication & the streams to sync. You can test changes and see the data returned before publishing changes to custom connectors.
You can see more documentation around custom connectors here.
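To make those two parts concrete, here is a minimal, hypothetical Python sketch of what a typical REST source does: an authenticated requester plus a paginated stream of records. The base URL, endpoint, parameters, and response shape are invented for illustration; in practice the Connector Builder configures the equivalent behaviour declaratively, so you do not write this code yourself.

```python
import requests

API_BASE = "https://api.example.com"   # hypothetical source API
TOKEN = "..."                          # authentication part: e.g. a bearer token or API key

def read_stream(path: str, page_size: int = 100):
    """Yield records from one paginated stream, page by page (hypothetical API)."""
    headers = {"Authorization": f"Bearer {TOKEN}"}
    page = 1
    while True:
        resp = requests.get(
            f"{API_BASE}{path}",
            headers=headers,
            params={"page": page, "per_page": page_size},
            timeout=30,
        )
        resp.raise_for_status()
        records = resp.json().get("data", [])
        if not records:
            break            # no more pages -> stream is exhausted
        yield from records
        page += 1

# Example: pull a hypothetical "campaigns" stream.
for record in read_stream("/v1/campaigns"):
    print(record)
```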
Guide: Troubleshooting Sync Failures
When a sync fails, follow these steps to diagnose the issue using the Airbyte interface.
Step 1: Open the Timeline
Navigate to the specific connection that is experiencing issues. In the connection dashboard, look for the ‘Job History’ or ‘Timeline’ tab. This section provides a historical view of all sync attempts, displaying their status, start time, and duration. Locate the failed sync attempt, which will typically be marked with a red status indicator. This view helps you establish a pattern of failures or identify if the issue is an isolated incident.
Step 2: View the Logs
Click on the failed job entry to access its details. Look for a ‘Logs’ button or tab to view the detailed execution logs for that specific attempt. The logs contain the raw output from the sync process, including information from both the source and destination connectors. Scroll through the logs or search for keywords like “ERROR”, “Exception”, or “Traceback”. These messages will usually point to the specific cause of the failure, such as invalid credentials, network timeouts, or schema mismatches.
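If you prefer to inspect a downloaded copy of the logs locally, a quick scan like this (the file name is a placeholder) surfaces the relevant lines:

```python
# Print only the lines that usually explain a failure.
# "airbyte_sync.log" is a placeholder for whatever log file you downloaded.
keywords = ("ERROR", "Exception", "Traceback")

with open("airbyte_sync.log", encoding="utf-8") as f:
    for line_number, line in enumerate(f, start=1):
        if any(keyword in line for keyword in keywords):
            print(f"{line_number}: {line.rstrip()}")
```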
Databricks Ingestion
Explore data in the Catalog
The main catalog to look in is `clean`.
- The `raw` catalog is where imported data is first stored. `rudderstack_prod` has all of the user stitching data.
- After validation, enrichment, & aggregation, data is stored in `clean`.
- You can see the schema and sample data for each table (see the snippet after this list).
- You have read access to the `clean` & `raw` catalogs, and editor permission in the `uni_gw` catalog.
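As a minimal sketch (the schema and table names below are hypothetical; substitute a real table), you can inspect a table’s schema and sample a few rows from a notebook like this:

```python
# "spark" is the SparkSession already available in Databricks notebooks.
# Replace the schema/table with a real table from the clean catalog.
df = spark.table("clean.marketing.google_ads_daily")

df.printSchema()                    # inspect column names and types
df.limit(10).show(truncate=False)   # peek at a few sample rows

# List everything available in a given schema of the clean catalog.
spark.sql("SHOW TABLES IN clean.marketing").show()
```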
Attach compute resources to run jobs and view data
- Serverless compute can be used for viewing sample data in tables and running SQL queries. It takes a few seconds to be ready.
- All-purpose compute is used to run notebooks using Python, R, or Scala. It takes a few minutes to be ready.
- Both power down after 10 minutes of inactivity, but cost accrues for the time compute is active, so don’t keep them running unless you have a query or notebook to run.
- Try to use the Local Pyspark Cluster & Local Queries Serverless compute whenever possible so that scheduled jobs & data loading are not affected.
Organize notebooks & queries in Workspace
Section titled “Organize notebooks & queries in Workspace”- Repos folder is for connected Github or other code repositories.
- Shared folder can be used for notebooks or queries. There is a
umichfolder in there to use. - You can also store notebooks and queries in your own folder if you are not ready to share it.
Autoloader patterns and best practices
Databricks Autoloader reads data from cloud storage incrementally: it ingests only new files added to a storage location. It keeps a key-value (KV) store (the checkpoint) of files that have already been read, so each run processes only new files.
- While extremely useful, Autoloader should only be used for append-only data sources, since it will not re-check files it has already read for changes.
- If you delete the checkpoint (the KV store), you will re-ingest all files that have already been read.
- When pointing Autoloader at a location, you can use wildcards to ingest files across a variety of folders, such as:
  - `/export/*/*/*.json` will read all files nested 2 levels deep in the export folder.
  - `/export/*/*/users_*.json` will read all files nested 2 levels deep whose names start with `users_`.
  - `/{2024,2025}/*.csv` will read all CSV files in the 2024 and 2025 folders.
Example Usage
```python
import dlt

@dlt.table(
    comment="Blueshift user actions",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "tag.level": "raw",
        "tag.source": "blueshift",
    },
)
def st_user_actions():
    # Autoloader ("cloudFiles") incrementally picks up new JSON files from S3.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3a://bsft-customers/rvretailer.net/useractions/json/{2024,2025}/*/*/*.json.gz")
    )
```
@dlt.table( comment="Blueshift campaign activity", table_properties={ "delta.autoOptimize.optimizeWrite": "true", "delta.autoOptimize.autoCompact": "true", "tag.level": "raw", "tag.source": "blueshift" })def st_campaign_activity(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "csv") \ .option("header", "true") \ .option("multiLine", "true") \ .option("escape", "\"") \ .option("quote", "\"") \ .option("rescuedDataColumn", "_rescued_csv_data") \ .load("s3a://bsft-customers/rvretailer.net/campaign_activity/{2024,2025}/*/*/*.csv.gz") )External Catalog configurations
| Source | Ingestion Method | Notebook |
|---|---|---|
| GA4 Events (BigQuery) | External Catalog | ga4_raw_loader |
| Braze Currents (ALDS) | Autoloader | braze_autoloader |
| Blueshift Events (S3) | Autoloader | blueshift_autoloader |
| Blueshift Users (S3) | External Connection | blueshift_user_import |
| Facebook Ads (ALDS) | Autoloader | facebook_ads_import |
| IDS (ALDS) | External Connection | ids_import |
| Web Inventory (R2) | Autoloader | web_inventory_import |
| Web Leads (BigQuery) | External Catalog | web_lead_raw_loader |
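For the rows marked External Catalog, no files are copied by a loader; the foreign catalog is queried directly through Unity Catalog’s three-level namespace. A minimal sketch, assuming a hypothetical federated catalog name and table:

```python
# "ga4_bigquery" and the dataset/table names are placeholders for the actual
# federated catalog registered in Unity Catalog.
ga4_events = spark.table("ga4_bigquery.analytics_123456789.events_20250101")

# Column names follow the standard GA4 BigQuery export schema.
ga4_events.select("event_name", "event_timestamp").limit(10).show()
```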