
Ingestion & ETL Manual


Airbyte consists of sources, destinations, and connections.

  • Connections have a defined schema for saving data. This schema is determined by the source and can be updated whenever the source updates its schema.
  • A connection reads the source on the defined schedule, looks for new records since the last read, deduplicates them, and saves them to the destination.
  • You can set the sync schedule using any cron expression (see the example after this list).
  • You can view an overview of the last 30 syncs in Airbyte, as well as logs for each of those syncs. This is helpful when figuring out why a sync failed (which is rare).
  • Airbyte bills based on the number of rows synced, so it is often better to sync daily aggregated data (unless you need something more granular).
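
As an illustration of the schedule format (Airbyte's cron scheduling generally follows Quartz-style syntax, which includes a leading seconds field), an expression like the one below would run a sync every day at 06:00 UTC:

0 0 6 * * ?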

Airbyte offers 200+ open source pre-built connectors. We use these for the following sources:

  • Google Search Console
  • Marketing Budget
  • Trusted Feed
  • Google Ads
  • Bing Ads

When the source schema is updated (new columns added or data types changed), Airbyte automatically updates the destination schema in Databricks. For certain sources that use OAuth 2.0, you will have to re-authenticate the Airbyte connector about every 3 months. The current connections and their destinations are:

  • Bing Ads → databricks_production
  • Criteo → databricks_production
  • Google Ads → databricks_production
  • Google Search Console → databricks_production
  • Marketing Budget → databricks_production
  • theCRM → databricks_production
  • Trusted Feed → databricks_production

To add a new source, start by checking the Airbyte Integrations page. Airbyte provides a comprehensive catalog of pre-built connectors for many popular APIs, databases, and file systems. These connectors are categorized by support level: Airbyte-maintained, Community-maintained (Marketplace), and Enterprise-only.

If you find a suitable connector in the catalog, you can simply configure it within your Airbyte workspace. If the connector you need is not available, or if existing connectors do not meet your specific requirements, you may need to build a custom solution.

In that case, you can build a custom connector using the Connector Builder. The Connector Builder is a no-code tool built into the Airbyte UI that allows you to create source connectors through an intuitive interface.

Behind the scenes, the Connector Builder generates a standard low-code YAML manifest. You can define authentication methods, pagination, and record processing logic without writing code. Once your connector is built and tested, you can publish it to your workspace to use in your data pipelines, or even contribute it to the Airbyte Marketplace.

For sources that do not have a pre-built connector but where we still want Airbyte’s deduplication and automatic schema management, we use custom connectors for:

  • Criteo
  • theCRM

A custom connector consists of two parts: the authentication and the streams to sync. You can test changes and preview the returned data before publishing changes to a custom connector.

You can see more documentation around custom connectors here.

When a sync fails, follow these steps to diagnose the issue using the Airbyte interface.

Navigate to the specific connection that is experiencing issues. In the connection dashboard, look for the ‘Job History’ or ‘Timeline’ tab. This section provides a historical view of all sync attempts, displaying their status, start time, and duration. Locate the failed sync attempt, which will typically be marked with a red status indicator. This view helps you establish a pattern of failures or identify if the issue is an isolated incident.

Click on the failed job entry to access its details. Look for a ‘Logs’ button or tab to view the detailed execution logs for that specific attempt. The logs contain the raw output from the sync process, including information from both the source and destination connectors. Scroll through the logs or search for keywords like “ERROR”, “Exception”, or “Traceback”. These messages will usually point to the specific cause of the failure, such as invalid credentials, network timeouts, or schema mismatches.

The main Databricks catalog to look in is clean.

  • The raw catalog is where imported data is first stored.
  • rudderstack_prod has all of the user stitching data.
  • After validation, enrichment, and aggregation, data is stored in the clean catalog.
  • You can see the schema and sample data for each table (see the notebook sketch after this list).
  • You have read access to the clean and raw catalogs, and editor permission in the uni_gw catalog.
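
As a minimal sketch of previewing data from a notebook once compute is attached (clean.marketing.daily_ad_spend is a hypothetical placeholder, not a real table; substitute a schema and table from the clean catalog):

# Hypothetical table name for illustration only; replace with a real
# schema.table from the clean catalog.
df = spark.table("clean.marketing.daily_ad_spend")
df.printSchema()        # inspect column names and types
display(df.limit(10))   # preview sample rows in the notebook UI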

Attach compute resources to run jobs and view data

  • Serverless compute can be used for viewing sample data in tables and running SQL queries. It is ready in a few seconds.
  • All-purpose compute is used to run notebooks using Python, R, or Scala. It takes a few minutes to be ready.
  • Both power down after 10 minutes of inactivity, but cost accrues for the entire time compute is active, so don’t keep them running unless you have a query or notebook to run.
  • Try to use the Local Pyspark Cluster & Local Queries Serverless compute whenever possible so that scheduled jobs & data loading are not affected.
  • The Repos folder is for connected GitHub or other code repositories.
  • The Shared folder can be used for notebooks or queries. There is a umich folder in there to use.
  • You can also store notebooks and queries in your own folder if you are not ready to share them.

Databricks Autoloader reads data from cloud storage and ingests only new files added to a storage location. It keeps a key-value store of which files have already been read, so each run processes only new files.

  • While extremely useful, Autoloader should only be used for append-only data sources, since it will not check already-read files for changes.
  • If you delete the checkpoint (the KV store), you will re-ingest all files that have already been read.
  • When pointing Autoloader at a location, you can use wildcards to ingest files across a variety of folders, for example:
    • /export/*/*/*.json reads all JSON files nested two levels deep in the export folder.
    • /export/*/*/users_*.json reads all files nested two levels deep whose names start with users_.
    • /{2024,2025}/*.csv reads all CSV files in the 2024 and 2025 folders.

For example, the Blueshift raw tables are defined as Delta Live Tables streaming tables that ingest from S3 with Autoloader:

import dlt

@dlt.table(
    comment="Blueshift user actions",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "tag.level": "raw",
        "tag.source": "blueshift"
    }
)
def st_user_actions():
    # Incrementally ingest gzipped JSON files with Autoloader (cloudFiles)
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("s3a://bsft-customers/rvretailer.net/useractions/json/{2024,2025}/*/*/*.json.gz")
    )

@dlt.table(
    comment="Blueshift campaign activity",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        "tag.level": "raw",
        "tag.source": "blueshift"
    }
)
def st_campaign_activity():
    # Incrementally ingest gzipped CSVs; values that fail to parse are captured
    # in the rescued data column.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("multiLine", "true")
        .option("escape", "\"")
        .option("quote", "\"")
        .option("rescuedDataColumn", "_rescued_csv_data")
        .load("s3a://bsft-customers/rvretailer.net/campaign_activity/{2024,2025}/*/*/*.csv.gz")
    )
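
Note that the {2024,2025} segments in the .load() paths above follow the brace pattern described earlier, so they match only the 2024 and 2025 folders; if the bucket starts receiving files for later years, these paths will need to be updated.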

Each raw data source, its ingestion method, and the notebook that loads it:

Source                 | Ingestion Method    | Notebook
GA4 Events (BigQuery)  | External Catalog    | ga4_raw_loader
Braze Currents (ADLS)  | Autoloader          | braze_autoloader
Blueshift Events (S3)  | Autoloader          | blueshift_autoloader
Blueshift Users (S3)   | External Connection | blueshift_user_import
Facebook Ads (ADLS)    | Autoloader          | facebook_ads_import
IDS (ADLS)             | External Connection | ids_import
Web Inventory (R2)     | Autoloader          | web_inventory_import
Web Leads (BigQuery)   | External Catalog    | web_lead_raw_loader