
Data Ingestion (EL)

  • From the Airbyte dashboard, click on Sources in the left-hand navigation menu.
  • Click the + New source button in the top right corner.
  • Search for and select the desired connector from the list (e.g., LinkedIn Ads, Bing Ads).
  • Authenticate: Follow the prompts to authorize the connection to the third-party account.
  • Start Date: Input the date from which you want to begin ingesting historical data.
  • Consult Documentation: Use the setup guide panel on the right side of the screen for specific prerequisites and API requirements for your chosen source.
  • After setting up the source, proceed to the next step and select your destination (e.g., databricks_production).

4. Configure Schema (Critical for Billing)

  • Navigate to the Schema tab of your connection.
  • Review the list of available streams (tables).
  • Action: Uncheck any streams or rows that are not strictly necessary.
  • Reasoning: Airbyte billing is based on the number of rows synced. Syncing unnecessary data will significantly increase costs.
  • Run a Test sync to ensure the connection is valid.
  • Once confirmed, enable the connection.
  • Log in to your Azure Databricks environment.
  • Navigate to the Catalog.
  • Go to the Raw schema (or your configured ingestion location).
  • Locate the namespace that matches the destination name you set in Airbyte (e.g., bing_ads).
  • Click on a table (stream) to view the schema and verify that:
    • The data columns are present.
    • The Airbyte metadata columns (e.g., _airbyte_extracted_at) are visible (a notebook check for this is sketched below).
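
As a complement to clicking through the Catalog UI, the same check can be run from a notebook. This is a minimal sketch; the table name raw.bing_ads.campaigns is a placeholder for whichever stream you ingested.

# Sketch: verify an Airbyte-ingested table from a Databricks notebook.
# "raw.bing_ads.campaigns" is a hypothetical name; substitute your own table.
df = spark.table("raw.bing_ads.campaigns")
df.printSchema()  # confirm the data columns are present

# Confirm the Airbyte metadata columns exist and are populated
df.select("_airbyte_extracted_at").orderBy("_airbyte_extracted_at", ascending=False).show(5)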

Summary: This guide explains how to investigate a failed sync within Airbyte, locate specific error logs, and re-trigger the sync after applying a fix.

  • Navigate to the Connections dashboard.
  • Look for connections marked with a red status bar or “Sync Failed” (e.g., Google Ads -> databricks_production).
  • Tip: You can use the 30-day history bar to visually spot past failures.
  1. Click on the name of the failed connection to open its details.
  2. Select the Timeline tab from the top menu.
  3. Scroll to find the specific Sync failed event entry.

You have two options to view errors:

  • Option A: Quick View
    • Click the See more link next to the error message.
    • Example: In the video, this revealed a \t (tab) character was used where a space was expected.
  • Option B: View Full Logs
    • If the quick view is insufficient, click the three vertical dots (kebab menu) on the right of the error row.
    • Select View logs.
    • Review the generated log file to find the specific exception or error message.
  1. Apply the necessary fix to your source data or connector settings.
  2. Return to the connection dashboard.
  3. Click the Sync now button in the top right corner.
  4. Monitor the Streams status or Timeline to ensure the sync completes successfully.

When a pre-built connector (e.g., for a CRM or Criteo) is not available in the standard Airbyte catalog, you must build a Custom Connector. This process involves defining the API base URL, authentication method, and the specific data streams you wish to replicate.

  1. Navigate to the Builder tab in the Airbyte sidebar.
  2. Click to create a new connector or edit an existing one (the video demonstrates editing an existing “Criteo” connector).
  3. The builder is divided into two main sections: Global Configuration and Streams.

Step 2: Global Configuration & Authentication


You must set up how Airbyte connects to the API globally.

  • API Base URL: Enter the root URL for the API.
  • Authentication Method: Configure the authentication flow (the video demonstrates OAuth2); a sketch of the underlying token exchange follows this list.
    • Headers: Set up standard headers (e.g., accept: application/json, Content-Type).
    • Grant Type: Specify the grant type (e.g., client_credentials or authorization_code).
    • Token Refresh: Since this uses OAuth2, you must configure the Refresh Token Endpoint so Airbyte can maintain access without manual intervention.
    • Variables: Input the Client ID and Client Secret as variables in the configuration form.
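
For context, the token exchange that Airbyte automates with these settings looks roughly like the sketch below. It uses the requests library with a generic OAuth2 client_credentials flow; the token URL and credential values are placeholders, not the actual endpoints of any specific API.

import requests

# Hypothetical token endpoint; substitute the one documented by your API.
TOKEN_URL = "https://api.example.com/oauth2/token"

def fetch_access_token(client_id: str, client_secret: str) -> str:
    """Exchange client credentials for a short-lived access token."""
    response = requests.post(
        TOKEN_URL,
        headers={"Accept": "application/json"},
        data={
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        },
    )
    response.raise_for_status()
    return response.json()["access_token"]

# Airbyte's OAuth authenticator repeats this exchange whenever the token expires,
# which is what the Token Refresh settings above configure.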

This step defines the specific data endpoint you want to pull (in this example, a statistics report).

  • URL Path: Define the specific endpoint (e.g., /2021-01/statistics/report).
  • HTTP Method: Set to POST.
  • Request Body: Select JSON Object Body and define the payload required by the API. In the video, the following keys are configured (an equivalent request is sketched after this list):
    • format: “csv”
    • metrics: List of metrics (Clicks, Display, AdvertiserCost, Visits, etc.)
    • currency: “USD”
    • dimensions: List of dimensions (Adset, AdvertiserId, CampaignId, etc.)
    • advertiserIds: The specific ID for the account (e.g., “Blue Compass RV”).
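
The request the connector ends up issuing is roughly equivalent to the sketch below. It is illustrative only: the base URL, bearer token, metric and dimension lists, and advertiser ID are placeholders mirroring the bullet list above, not a verified payload for the real API.

import requests

payload = {
    "format": "csv",
    "metrics": ["Clicks", "Display", "AdvertiserCost", "Visits"],
    "currency": "USD",
    "dimensions": ["Adset", "AdvertiserId", "CampaignId"],
    "advertiserIds": "<your_advertiser_id>",
}

response = requests.post(
    "https://api.example.com/2021-01/statistics/report",  # placeholder base URL
    headers={
        "Authorization": "Bearer <access_token>",
        "Accept": "application/json",
        "Content-Type": "application/json",
    },
    json=payload,  # sent as a JSON object body, matching the builder configuration
)
response.raise_for_status()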

To ensure efficient data loading, configure the sync to only pull new data rather than a full refresh every time.

  • Sync Mode: Select Incremental Sync.
  • Cursor: Use a Datetime Based Cursor.
  • Start/End Date Logic:
    • The video shows a configuration that ends the sync window at midnight of the previous day (min_max_datetime logic) so that only complete 24-hour periods are captured; a date-window sketch follows this list.
  • Granularity: Set to 1 Day.
  • Event Frequency: The system is set to check for events every 1 Second to ensure no data is missed.
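
A rough Python equivalent of that date-window logic might look like the sketch below; it illustrates the idea (end the window at the most recent midnight so only complete days are synced) rather than the connector's actual implementation.

from datetime import datetime, timedelta, timezone

# Sketch: end the incremental window at the most recent midnight (UTC),
# i.e., the close of the previous full day, so partial days are never synced.
now = datetime.now(timezone.utc)
window_end = now.replace(hour=0, minute=0, second=0, microsecond=0)
window_start = window_end - timedelta(days=1)  # previous full 24-hour period

print(f"Sync window: {window_start.isoformat()} -> {window_end.isoformat()}")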

Configure how the connector reacts to API failures:

  • Retry Count: Set to 3 times.
  • Backoff Time: Set to 5 seconds.
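
These two settings correspond to behavior roughly like the sketch below (3 attempts with a fixed 5-second backoff). Airbyte applies this policy for you, so the code is purely illustrative.

import time
import requests

def request_with_retries(url: str, retries: int = 3, backoff_seconds: int = 5):
    """Sketch of a fixed-backoff retry policy: up to 3 attempts, 5 seconds apart."""
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            return response
        except requests.RequestException:
            if attempt == retries:
                raise  # give up after the final attempt
            time.sleep(backoff_seconds)
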
  1. Test/Preview: Use the “Test” button in the builder to preview the data and ensure the stream is working correctly.
  2. Publish: Once verified, publish the connector.
  3. Add Source:
    • Go to the Sources tab.
    • Search for your newly created custom connector title.
    • Configure it with the destination (in the video, the destination is Databricks Production).
  4. Destination Settings:
    • Verify the Namespace and Schema.
    • The video shows the data being written to the raw catalog under the criteo schema.

Before querying data programmatically, you must establish a secure connection object within the Databricks UI.

  1. Navigate to External Data:

    • In the Databricks left-hand sidebar, click on Catalog.
    • Select the External Data tab at the top of the pane.
  2. Create Connection:

    • Click on Connections.
    • Click the Create connection button in the top right corner.
    • Note: In the video example, a connection named external_bigquery has already been established.

Once this connection is set up, you can reference it in notebooks or SQL queries just as you would a local table.
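
For example, assuming a foreign catalog (hypothetically named external_bigquery, like the connection) has been created from it, a notebook query might look like this; the dataset and table names are placeholders.

# Sketch: query a federated table through the connection. The catalog, schema,
# and table names below are placeholders.
df = spark.sql("""
    SELECT *
    FROM external_bigquery.analytics_dataset.events
    LIMIT 10
""")
df.show()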


The video demonstrates a notebook workflow for loading Google Analytics 4 (GA4) raw data from BigQuery into Databricks.

To read from the external source, use the spark.read command with the BigQuery format. You must provide the credentials and the specific source table; a minimal read sketch follows the option list below.

Key Configuration Options:

  • Format: bigquery
  • parentProject: The Google Cloud Project ID.
  • table: The specific BigQuery table you are querying.
  • Filter (Optional): You can apply filters (e.g., col("event_name") != "user_engagement") directly after reading to limit data volume.
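
Putting those options together, a minimal read might look like the sketch below. The project ID, table identifier, and credential handling are placeholders; check the BigQuery connector documentation for the credential options your workspace requires.

from pyspark.sql import functions as F

# Sketch: read a GA4 events table from BigQuery. Project and table names are placeholders.
df = (
    spark.read
        .format("bigquery")
        .option("parentProject", "my-gcp-project-id")  # Google Cloud Project ID
        .option("table", "my-gcp-project-id.analytics_123456.events_20231001")
        .load()
)

# Optional filter applied right after reading to limit data volume
df = df.filter(F.col("event_name") != "user_engagement")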

To ensure efficiency, do not reload the entire dataset every time. The video demonstrates a logic flow to handle “materialization” and incremental loading (a simplified sketch follows the list below):

  • Log Check: The script checks a log to see which dates have already been loaded.
  • Filter by Date: It filters the source query to only fetch data for dates that do not exist in the destination table.
  • Intraday vs. Daily: The pipeline distinguishes between “intraday” tables (events happening today that aren’t finalized) and historical daily tables.
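
A simplified sketch of that incremental check is shown below; it assumes the destination table already exists and that event_date is the column being compared (the table name matches the one used later in this guide).

from pyspark.sql import functions as F

# Sketch: only keep source dates that are not already present in the destination.
existing_dates = {
    row["event_date"]
    for row in spark.table("raw.ga4.ga4_events").select("event_date").distinct().collect()
}

if existing_dates:
    df = df.filter(~F.col("event_date").isin(list(existing_dates)))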

Perform necessary transformations on the DataFrame df.

  • Example: Transforming timestamp formats or adding specific metadata columns.
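
For instance, a couple of typical transformations might look like this (column names are illustrative, assuming a GA4-style microsecond event_timestamp):

from pyspark.sql import functions as F

# Sketch: derive a proper timestamp column and add ingestion metadata.
df = (
    df
    .withColumn("event_ts", (F.col("event_timestamp") / 1_000_000).cast("timestamp"))
    .withColumn("_ingested_at", F.current_timestamp())
)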

Once the data is read and transformed, it is written to a Databricks Delta table.

The video highlights a specific write command pattern to ensure performance and data integrity.

(df.write
    .format("delta")
    .partitionBy("event_date")      # Crucial for performance on large datasets
    .mode("append")                 # Use 'append' to add new rows, avoiding full overwrites
    .option("mergeSchema", "true")  # Allows the schema to evolve if new columns appear
    .saveAsTable("catalog.schema.table_name")
)
  • partitionBy("event_date"): Because the dataset is large (GA4 events), partitioning by date organizes the files physically, speeding up future queries that filter by date.
  • mode("append"): Since this is a log of events, you want to add new records to the bottom of the table rather than overwriting the whole history.
  • saveAsTable: This registers the data in the Databricks Hive metastore/Unity Catalog (e.g., raw.ga4.ga4_events), making it accessible via SQL.

This process utilizes the Databricks Auto Loader feature (cloudFiles) within a Delta Live Tables pipeline. It is designed to incrementally ingest data from cloud storage as new files arrive.


Before writing the ingestion code, you must locate the path to your external data.

  1. Navigate to Catalog in the Databricks sidebar.
  2. Select External Data.
  3. Locate the specific connection (e.g., braze-currents).
  4. Copy the URL (Base Path) provided in the details view (e.g., abfss://container@storage.dfs.core.windows.net/).

Create a new notebook or use an existing DLT notebook. The ingestion logic uses the @dlt.table decorator and spark.readStream.

import dlt

@dlt.table(
    name="target_table_name",
    table_properties={"quality": "bronze"}  # Optional properties
)
def target_function_name():
    return (
        spark.readStream
            .format("cloudFiles")                 # Activates Auto Loader
            .option("cloudFiles.format", "avro")  # Specifies source file format (Avro, JSON, CSV, etc.)
            .load("abfss://<path_to_data>/*/*/*/*.avro")  # Path with wildcards
    )

Using .format("cloudFiles") tells Spark to use Auto Loader. This automatically handles schema inference and state management for file ingestion.

You must specify the format of the raw files sitting in cloud storage using the option: .option("cloudFiles.format", "avro") Note: Update “avro” to “json”, “csv”, or “parquet” depending on your source data.

When defining the .load() path, use asterisks (*) as wildcards to navigate folder structures.

  • Example: .../date=2023-10-01/event_type=user/...
  • Usage: If your data is nested in dynamic folders (like dates or categories), use * to tell Auto Loader to search recursively through those directories (e.g., /*/*/*.avro).

Step 4: Operational Behavior & Limitations

  • Incremental Ingestion: Auto Loader tracks which files have been processed. When the pipeline runs again, it will only ingest new files added to the storage since the last run.
  • Append-Only: This method is optimized for append-only workflows.
    • Warning: Auto Loader does not automatically handle updates to existing rows or deletions of rows in the source files. If your use case requires updates (CDC), you will need downstream logic (like dlt.apply_changes) to handle merging; see the sketch after this list.
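
If updates do need to be merged downstream, a change-data-capture step can be layered on top of the Auto Loader table. The sketch below uses the DLT apply_changes API with hypothetical target, key, and sequence column names.

import dlt
from pyspark.sql.functions import col

# Sketch: merge changes from the Auto Loader (bronze) table into a current-state table.
# Table and column names ("user_id", "updated_at") are placeholders.
dlt.create_streaming_table("target_table_current")

dlt.apply_changes(
    target="target_table_current",
    source="target_table_name",     # the Auto Loader table defined earlier
    keys=["user_id"],               # primary key used to match rows
    sequence_by=col("updated_at"),  # orders changes so the latest version wins
)
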
  1. Navigate to the Delta Live Tables pipeline view.
  2. You will see the directed graph (DAG) of your tables (e.g., subscription_actions, subscription_group_actions).
  3. Click on a table node to view:
    • Schema: The columns detected from the source files.
    • Metrics: Processing rates and row counts.
    • Status: Ensure the compute starts and the state transitions to “Running” or “Completed”.
