
Data Ingestion (EL)

  • From the Airbyte dashboard, click on Sources in the left-hand navigation menu.
  • Click the + New source button in the top right corner.
  • Search for and select the desired connector from the list (e.g., LinkedIn Ads, Bing Ads).
  • Authenticate: Follow the prompts to authorize the connection to the third-party account.
  • Start Date: Enter the date from which you want to begin ingesting historical data.
  • Consult Documentation: Use the setup guide panel on the right side of the screen for specific prerequisites and API requirements for your chosen source.
  • After setting up the source, proceed to the next step and select your destination (e.g., databricks_production).

4. Configure Schema (Critical for Billing)

  • Navigate to the Schema tab of your connection.
  • Review the list of available streams (tables).
  • Action: Uncheck any streams or fields (columns) that are not strictly necessary.
  • Reasoning: Airbyte billing is based on the number of rows synced. Syncing unnecessary data will significantly increase costs.
  • Run a Test sync to ensure the connection is valid.
  • Once confirmed, enable the connection.
  • Log in to your Azure Databricks environment.
  • Navigate to the Catalog.
  • Go to the Raw schema (or your configured ingestion location).
  • Locate the namespace that matches the destination name you set in Airbyte (e.g., bing_ads).
  • Click on a table (stream) to view the schema and verify that:
    • The data columns are present.
    • The Airbyte metadata columns (e.g., _airbyte_extracted_at) are visible.
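
The metadata check above can also be scripted. The following is a minimal sketch, not part of the original walkthrough: only `_airbyte_extracted_at` is named in this guide, and the other two column names are assumptions based on recent Airbyte destinations. The sample column list is hypothetical.

```python
# `_airbyte_extracted_at` comes from the guide; the other two names are
# assumptions and may differ depending on your destination version.
EXPECTED_AIRBYTE_COLUMNS = {
    "_airbyte_raw_id",
    "_airbyte_extracted_at",
    "_airbyte_meta",
}


def missing_airbyte_columns(table_columns):
    """Return the expected Airbyte metadata columns absent from a table."""
    present = {name.lower() for name in table_columns}
    return sorted(EXPECTED_AIRBYTE_COLUMNS - present)


# Hypothetical column list, e.g. copied from the Catalog UI or taken from
# a DataFrame's `.columns` attribute in a notebook.
columns = ["account_id", "clicks", "_airbyte_raw_id", "_airbyte_extracted_at"]
print(missing_airbyte_columns(columns))  # ['_airbyte_meta']
```

An empty list means every expected metadata column is present.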

Summary: This guide explains how to investigate a failed sync within Airbyte, locate specific error logs, and re-trigger the sync after applying a fix.

  • Navigate to the Connections dashboard.
  • Look for connections marked with a red status bar or “Sync Failed” (e.g., Google Ads -> databricks_production).
  • Tip: You can use the 30-day history bar to visually spot past failures.
  1. Click on the name of the failed connection to open its details.
  2. Select the Timeline tab from the top menu.
  3. Scroll to find the specific Sync failed event entry.

You have two options to view errors:

  • Option A: Quick View
    • Click the See more link next to the error message.
    • Example: In the video, this revealed that a \t (tab) character was used where a space was expected.
  • Option B: View Full Logs
    • If the quick view is insufficient, click the three vertical dots (kebab menu) on the right of the error row.
    • Select View logs.
    • Review the generated log file to find the specific exception or error message.
  1. Apply the necessary fix to your source data or connector settings.
  2. Return to the connection dashboard.
  3. Click the Sync now button in the top right corner.
  4. Monitor the Streams status or Timeline to ensure the sync completes successfully.
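
If you save the output of View logs to a text file, a short script can narrow it down to the lines worth reading. This is only a sketch: the search pattern is an assumption about how failure lines are worded, and the sample text is made up for illustration rather than real Airbyte output.

```python
import re

# Assumed keywords; adjust them to the wording used in your own logs.
ERROR_PATTERN = re.compile(r"ERROR|Exception|Traceback", re.IGNORECASE)


def find_error_lines(log_text):
    """Return (line_number, text) pairs for lines that look like errors."""
    hits = []
    for number, line in enumerate(log_text.splitlines(), start=1):
        if ERROR_PATTERN.search(line):
            hits.append((number, line.strip()))
    return hits


# Made-up sample text, not a real log.
sample = "INFO sync started\nERROR could not parse record\nINFO sync finished"
for number, line in find_error_lines(sample):
    # repr() makes hidden characters such as \t visible in the output.
    print(number, repr(line))
```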

When a pre-built connector (e.g., for a CRM or Criteo) is not available in the standard Airbyte catalog, you must build a Custom Connector. This process involves defining the API base URL, authentication method, and the specific data streams you wish to replicate.

  1. Navigate to the Builder tab in the Airbyte sidebar.
  2. Click to create a new connector or edit an existing one (the video demonstrates editing an existing “Criteo” connector).
  3. The builder is divided into two main sections: Global Configuration and Streams.

Step 2: Global Configuration & Authentication


You must set up how Airbyte connects to the API globally.

  • API Base URL: Enter the root URL for the API.
  • Authentication Method: Configure the authentication flow (The video demonstrates OAuth2).
    • Headers: Set up standard headers (e.g., accept: application/json, Content-Type).
    • Grant Type: Specify the grant type (e.g., client_credentials or authorization_code).
    • Token Refresh: Since this uses OAuth2, you must configure the Refresh Token Endpoint so Airbyte can maintain access without manual intervention.
    • Variables: Input the Client ID and Client Secret as variables in the configuration form.
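
For orientation, the request that a client_credentials configuration typically produces can be sketched in plain Python. This is an illustration under assumptions, not the builder's internal implementation: the token URL and credential values are placeholders, and some APIs expect the client ID and secret in a Basic auth header instead of the form body.

```python
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret):
    """Build (but do not send) an OAuth2 client_credentials token request."""
    body = urllib.parse.urlencode(
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
        }
    ).encode("utf-8")
    return urllib.request.Request(
        token_url,
        data=body,
        headers={
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )


# Placeholder URL and credentials, for illustration only.
request = build_token_request(
    "https://api.example.com/oauth2/token", "my-client-id", "my-client-secret"
)
print(request.get_method(), request.full_url)
```

The function only builds the request object, so it can be inspected without contacting any API.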

This step defines the specific data endpoint you want to pull (in this example, a statistics report).

  • URL Path: Define the specific endpoint (e.g., /2021-01/statistics/report).
  • HTTP Method: Set to POST.
  • Request Body: Select JSON Object Body and define the payload required by the API. In the video, the following keys are configured:
    • format: “csv”
    • metrics: List of metrics (Clicks, Display, AdvertiserCost, Visits, etc.)
    • currency: “USD”
    • dimensions: List of dimensions (Adset, AdvertiserId, CampaignId, etc.)
    • advertiserIds: The specific ID for the account (e.g., “Blue Compass RV”).
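
Assembled as JSON, the payload described above might look like the following sketch. The metric and dimension lists are the partial ones named in this guide, the advertiser ID is a placeholder, and any other fields the API requires are deliberately left out.

```python
import json


def build_report_body(advertiser_id, metrics, dimensions, currency="USD"):
    """Assemble the request body using only the keys named in the guide."""
    return {
        "format": "csv",
        "metrics": list(metrics),
        "currency": currency,
        "dimensions": list(dimensions),
        "advertiserIds": advertiser_id,
    }


body = build_report_body(
    advertiser_id="<advertiser_id>",  # placeholder, replace with your own
    metrics=["Clicks", "Display", "AdvertiserCost", "Visits"],
    dimensions=["Adset", "AdvertiserId", "CampaignId"],
)
print(json.dumps(body, indent=2))
```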

To ensure efficient data loading, configure the sync to only pull new data rather than a full refresh every time.

  • Sync Mode: Select Incremental Sync.
  • Cursor: Use a Datetime Based Cursor.
  • Start/End Date Logic:
    • The video shows a configuration ensuring the sync finishes at midnight of the previous day (min_max_datetime logic) to ensure a full 24-hour period is captured.
  • Granularity: Set to 1 Day.
  • Event Frequency: The system is set to check for events every 1 Second to ensure no data is missed.
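
The windowing behind a one-day granularity can be sketched in plain Python. This assumes that "midnight of the previous day" means the last window covers all of yesterday and stops at the start of today; the function name is hypothetical and this is not the builder's own code.

```python
from datetime import date, timedelta


def daily_slices(start_date, today):
    """Return one (slice_start, slice_end) pair per full day before `today`.

    Each slice covers [slice_start, slice_end), so the last one ends at
    the start of `today` and the current, unfinished day is never requested.
    """
    slices = []
    current = start_date
    while current < today:
        next_day = current + timedelta(days=1)
        slices.append((current, next_day))
        current = next_day
    return slices


for slice_start, slice_end in daily_slices(date(2024, 1, 1), date(2024, 1, 4)):
    print(slice_start.isoformat(), "->", slice_end.isoformat())
```

On the next run, only the slices after the stored cursor value need to be requested, which is what makes the sync incremental.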

Configure how the connector reacts to API failures:

  • Retry Count: Set to 3 times.
  • Backoff Time: Set to 5 seconds.
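
The retry settings above correspond to a constant backoff policy. A minimal sketch of that behavior follows; it treats "3 times" as three retries after the first attempt, which is an assumption, and `call_with_retries` is a hypothetical helper rather than an Airbyte function.

```python
import time


def call_with_retries(request_fn, retries=3, backoff_seconds=5, sleep=time.sleep):
    """Call request_fn, retrying failed calls with a fixed pause in between."""
    last_error = None
    for attempt in range(retries + 1):  # first attempt plus `retries` retries
        try:
            return request_fn()
        except Exception as error:
            last_error = error
            if attempt < retries:
                sleep(backoff_seconds)  # constant backoff between attempts
    raise last_error
```

The `sleep` parameter exists so the pause can be replaced in tests; in normal use the default `time.sleep` applies.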
  1. Test/Preview: Use the “Test” button in the builder to preview the data and ensure the stream is working correctly.
  2. Publish: Once verified, publish the connector.
  3. Add Source:
    • Go to the Sources tab.
    • Search for your newly created custom connector title.
    • Configure it with the destination (in the video, the destination is Databricks Production).
  4. Destination Settings:
    • Verify the Namespace and Schema.
    • The video shows the data being written to the raw catalog under the criteo schema.

Before querying data programmatically, you must establish a secure connection object within the Databricks UI.

  1. Navigate to External Data:

    • In the Databricks left-hand sidebar, click on Catalog.
    • Select the External Data tab at the top of the pane.
  2. Create Connection:

    • Click on Connections.
    • Click the Create connection button in the top right corner.
    • Note: In the video example, a connection named external_bigquery has already been established.

Once this connection is set up, you can reference it in notebooks or SQL queries just as you would a local table.


The video demonstrates a notebook workflow for loading Google Analytics 4 (GA4) raw data from BigQuery into Databricks.

To read from the external source, use the spark.read command with the BigQuery format. You must provide the credentials and the specific source table.

Key Configuration Options:

  • Format: bigquery
  • parentProject: The Google Cloud Project ID.
  • table: The specific BigQuery table you are querying.
  • Filter (Optional): You can apply filters (e.g., col("event_name") != "user_engagement") directly after reading to limit data volume.
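
Put together, the options above could be used as in the sketch below. It assumes the `spark` session that Databricks notebooks provide and leaves credential handling out, since that depends on how your workspace is configured; the project and table names in the usage comment are placeholders.

```python
def read_ga4_events(spark, parent_project, table):
    """Read one BigQuery table and drop user_engagement events.

    `spark` is the SparkSession available in a Databricks notebook.
    """
    return (
        spark.read.format("bigquery")
        .option("parentProject", parent_project)
        .option("table", table)
        .load()
        # SQL-string form of col("event_name") != "user_engagement".
        .filter("event_name != 'user_engagement'")
    )


# Usage in a notebook (placeholder names):
# df = read_ga4_events(spark, "<gcp_project_id>", "<project>.<dataset>.<table>")
```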

To ensure efficiency, do not reload the entire dataset every time. The video demonstrates a logic flow to handle “materialization” and incremental loading:

  • Log Check: The script checks a log to see which dates have already been loaded.
  • Filter by Date: It filters the source query to only fetch data for dates that do not exist in the destination table.
  • Intraday vs. Daily: The pipeline distinguishes between “intraday” tables (events happening today that aren’t finalized) and historical daily tables.
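
The date bookkeeping can be sketched without Spark. The helper names are hypothetical, and the table naming follows the standard GA4 BigQuery export convention (`events_YYYYMMDD` for finalized days, `events_intraday_YYYYMMDD` for the current day); treating every day before today as finalized is a simplifying assumption.

```python
from datetime import date, timedelta


def dates_to_load(start, today, loaded_dates):
    """Return days in [start, today) that are not yet in the load log."""
    loaded = set(loaded_dates)
    candidates = (start + timedelta(days=i) for i in range((today - start).days))
    return [day for day in candidates if day not in loaded]


def source_table_name(day, today):
    """Pick the daily or intraday GA4 export table for a given day."""
    prefix = "events_intraday_" if day >= today else "events_"
    return prefix + day.strftime("%Y%m%d")


today = date(2024, 1, 4)
already_loaded = [date(2024, 1, 2)]  # hypothetical contents of the load log
for day in dates_to_load(date(2024, 1, 1), today, already_loaded):
    print(source_table_name(day, today))
```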

Perform necessary transformations on the DataFrame df.

  • Example: Transforming timestamp formats or adding specific metadata columns.

Once the data is read and transformed, it is written to a Databricks Delta table.

The video highlights a specific write command pattern to ensure performance and data integrity.

(df.write
    .format("delta")
    .partitionBy("event_date")  # Crucial for performance on large datasets
    .mode("append")  # Use 'append' to add new rows, avoiding full overwrites
    .option("mergeSchema", "true")  # Allows the schema to evolve if new columns appear
    .saveAsTable("catalog.schema.table_name")
)
  • partitionBy("event_date"): Because the dataset is large (GA4 events), partitioning by date organizes the files physically, speeding up future queries that filter by date.
  • mode("append"): Since this is a log of events, you want to add new records to the bottom of the table rather than overwriting the whole history.
  • saveAsTable: This registers the data in the Databricks Hive metastore/Unity Catalog (e.g., raw.ga4.ga4_events), making it accessible via SQL.

This process utilizes the Databricks Auto Loader feature (cloudFiles) within a Delta Live Tables pipeline. It is designed to incrementally ingest data from cloud storage as new files arrive.


Before writing the ingestion code, you must locate the path to your external data.

  1. Navigate to Catalog in the Databricks sidebar.
  2. Select External Data.
  3. Locate the specific connection (e.g., braze-currents).
  4. Copy the URL (Base Path) provided in the details view (e.g., abfss://container@storage.dfs.core.windows.net/).

Create a new notebook or use an existing DLT notebook. The ingestion logic uses the @dlt.table decorator and spark.readStream.

import dlt

@dlt.table(
    name="target_table_name",
    table_properties={"quality": "bronze"},  # Optional properties
)
def target_function_name():
    return (
        spark.readStream
        .format("cloudFiles")  # Activates Auto Loader
        .option("cloudFiles.format", "avro")  # Specifies source file format (Avro, JSON, CSV, etc.)
        .load("abfss://<path_to_data>/*/*/*/*.avro")  # Path with wildcards
    )

Using .format("cloudFiles") tells Spark to use Auto Loader. This automatically handles schema inference and state management for file ingestion.

You must specify the format of the raw files sitting in cloud storage using the option .option("cloudFiles.format", "avro").

Note: Update “avro” to “json”, “csv”, or “parquet” depending on your source data.

When defining the .load() path, use asterisks (*) as wildcards to navigate folder structures.

  • Example: .../date=2023-10-01/event_type=user/...
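  • Usage: If your data is nested in dynamic folders (like dates or categories), use one * per folder level. Each * matches a single directory level rather than searching recursively, so the number of wildcards must match the folder depth (e.g., /*/*/*.avro reaches files two folders below the base path).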
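
Because each wildcard stands for one folder level, the load path can be built from the base path and the folder depth. The helper below is a hypothetical convenience, shown with the example base path from Step 1.

```python
def build_load_path(base_path, depth, extension):
    """Join a base path with one `*` per folder level and a file pattern."""
    base = base_path.rstrip("/")
    levels = "/".join(["*"] * depth)
    if levels:
        return f"{base}/{levels}/*.{extension}"
    return f"{base}/*.{extension}"


# Three folder levels below the base path, then the Avro files themselves.
print(build_load_path("abfss://container@storage.dfs.core.windows.net/", 3, "avro"))
# abfss://container@storage.dfs.core.windows.net/*/*/*/*.avro
```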

Step 4: Operational Behavior & Limitations

  • Incremental Ingestion: Auto Loader tracks which files have been processed. When the pipeline runs again, it will only ingest new files added to the storage since the last run.
  • Append-Only: This method is optimized for append-only workflows.
    • Warning: Auto Loader does not automatically handle updates to existing rows or deletions of rows in the source files. If your use case requires updates (CDC), you will need downstream logic (like dlt.apply_changes) to handle merging.
  1. Navigate to the Delta Live Tables pipeline view.
  2. You will see the directed graph (DAG) of your tables (e.g., subscription_actions, subscription_group_actions).
  3. Click on a table node to view:
    • Schema: The columns detected from the source files.
    • Metrics: Processing rates and row counts.
    • Status: Ensure the compute starts and the state transitions to “Running” or “Completed”.


Before starting the import process, ensure your new dataset matches the existing schema found in raw.customer_imports.

You should expect the following standard columns from the dealership: first_name, last_name, phone, email, street, home_city, state, and zip_code.
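
A quick header check before you start can catch schema mismatches early. This sketch uses the column list above and assumes the file is a CSV with a header row; the sample header is made up.

```python
import csv
import io

EXPECTED_COLUMNS = [
    "first_name", "last_name", "phone", "email",
    "street", "home_city", "state", "zip_code",
]


def check_header(csv_text):
    """Return (missing, unexpected) column names for the CSV's header row."""
    header = next(csv.reader(io.StringIO(csv_text)), [])
    found = [name.strip().lower() for name in header]
    missing = [name for name in EXPECTED_COLUMNS if name not in found]
    unexpected = [name for name in found if name not in EXPECTED_COLUMNS]
    return missing, unexpected


# Made-up header in which `home_city` was exported as `city`.
sample = "first_name,last_name,phone,email,street,city,state,zip_code\n"
print(check_header(sample))  # (['home_city'], ['city'])
```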


Step 1: Clean Phone and Email Data

  • Phone Numbers: Standardize all phone numbers. Remove any country codes (e.g., no “+1”) and ensure every number is exactly 10 digits long.
  • Emails: Perform a basic clean of the email addresses to ensure there are no obvious formatting errors.
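
Both cleaning rules can be sketched with the standard library. The email pattern is a deliberately loose assumption that only catches obvious formatting errors (missing @, missing domain, spaces); deliverability is checked later with Zerobounce.

```python
import re

# Loose shape check: something@something.something, with no spaces.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def clean_phone(raw):
    """Return a 10-digit phone number, or None if one cannot be derived."""
    digits = re.sub(r"\D", "", raw or "")
    if len(digits) == 11 and digits.startswith("1"):
        digits = digits[1:]  # drop the leading country code
    return digits if len(digits) == 10 else None


def clean_email(raw):
    """Return a trimmed, lowercased email, or None if it is malformed."""
    email = (raw or "").strip().lower()
    return email if EMAIL_PATTERN.match(email) else None


print(clean_phone("+1 (555) 123-4567"))   # 5551234567
print(clean_email(" Jane@Example.com "))  # jane@example.com
```

Values that come back as None need manual review before the file moves on to the next step.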

Step 2: Append Location Information

  • Access your Trusted Feed to find the specific details for the new dealership location.
  • For every customer row in your dataset, populate the following new columns with the exact data from the Trusted Feed:
    • location_uuid
    • location_state
    • location_region
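
If the dataset is loaded as a list of row dictionaries (for example with `csv.DictReader`), the three columns can be appended in one pass. The values shown are placeholders; the real ones must be copied from the Trusted Feed.

```python
def add_location(rows, location_uuid, location_state, location_region):
    """Return new rows with the same three location columns on each."""
    location = {
        "location_uuid": location_uuid,
        "location_state": location_state,
        "location_region": location_region,
    }
    return [{**row, **location} for row in rows]


customers = [{"first_name": "Jane", "email": "jane@example.com"}]
enriched = add_location(customers, "<uuid>", "<state>", "<region>")  # placeholders
print(enriched[0])
```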

Step 3: Validate Email Deliverability (Zerobounce)

  • Log in to Zerobounce.

  • Navigate to Email Validation and select Validate.

  • Upload your current CSV file.

  • Zerobounce will process the file and return it with additional columns. Ensure your dataset now includes and matches the schema for:
    • zerobounce_status
    • sub_status
    • free_email
    • domain

  • Crucial Step: Filter your list to remove any customer records with invalid email addresses. You only want to upload valid, marketable contacts.
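
The filter can be sketched as follows, assuming the rows are dictionaries and that `zerobounce_status` holds the value "invalid" for addresses to remove. Whether other statuses should also be dropped is a business decision, so the set of blocked statuses is left as a parameter.

```python
def drop_invalid(rows, drop_statuses=("invalid",)):
    """Keep rows whose zerobounce_status is not in `drop_statuses`."""
    blocked = {status.lower() for status in drop_statuses}
    return [
        row
        for row in rows
        if (row.get("zerobounce_status") or "").strip().lower() not in blocked
    ]


# Made-up rows for illustration.
rows = [
    {"email": "a@example.com", "zerobounce_status": "valid"},
    {"email": "b@example.com", "zerobounce_status": "invalid"},
]
print(drop_invalid(rows))
```

Rows with a missing status are kept by this sketch, so check for blanks separately if that matters for your list.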

Step 4: Ingest Data into Databricks

  • Log in to your Databricks environment.
  • Go to Data Engineering and select Data Ingestion.
  • Click on Create or modify a table.
  • Drag and drop your finalized, validated CSV file.
  • Set the target destination to raw.customer_imports.[your_new_list_name] (replace the bracketed text with the actual name of your file).
  • Perform a final visual check to ensure all columns align perfectly with the existing schema.

Step 5: Update the Pipeline Workflow

  • In Databricks, go to your Workspace, then Pipelines, and select Workflows.
  • Locate and open the joined_customer_imports file.
  • You need to append your new table to the existing query. Add a new line with a SQL UNION followed by a SELECT from your new table. It should look something like this: UNION SELECT * FROM raw.customer_imports.[your_new_list_name].
  • Run the workflow.
  • The successful output will be routed to clean.customer_imports.joined_customers.
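
To see the shape of the combined query, the sketch below composes one from a list of table names. The table names are placeholders and the actual contents of joined_customer_imports may differ, so treat this as an illustration of the UNION pattern only.

```python
def build_union_query(tables):
    """Compose a query that unions SELECT * from every listed table."""
    if not tables:
        raise ValueError("at least one table is required")
    return "\nUNION\n".join(f"SELECT * FROM {table}" for table in tables)


print(
    build_union_query(
        [
            "raw.customer_imports.existing_list",  # placeholder name
            "raw.customer_imports.your_new_list_name",  # placeholder name
        ]
    )
)
```

UNION removes duplicate rows across the tables, while UNION ALL keeps them; follow whichever form the existing query already uses.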

Once the workflow runs successfully in Databricks, Rudderstack will automatically take over the rest of the ingestion and routing processes.