Data Ingestion (EL)
Airbyte Operations
Setting up a new Connection
1. Add a New Source
- From the Airbyte dashboard, click on Sources in the left-hand navigation menu.
- Click the + New source button in the top right corner.
- Search for and select the desired connector from the list (e.g., LinkedIn Ads, Bing Ads).
2. Configure the Source
- Authenticate: Follow the prompts to authorize the connection to the third-party account.
- Start Date: Input the date from which you want to begin ingesting historical data.
- Consult Documentation: Use the setup guide panel on the right side of the screen for specific prerequisites and API requirements for your chosen source.
3. Select Destination
- After setting up the source, proceed to the next step and select your destination (e.g., databricks_production).
4. Configure Schema (Critical for Billing)
- Navigate to the Schema tab of your connection.
- Review the list of available streams (tables).
- Action: Uncheck any streams (or individual fields) that are not strictly necessary.
- Reasoning: Airbyte billing is based on the number of rows synced. Syncing unnecessary data will significantly increase costs.
5. Test and Sync
- Run a Test sync to ensure the connection is valid.
- Once confirmed, enable the connection.
6. Verify in Databricks
- Log in to your Azure Databricks environment.
- Navigate to the Catalog.
- Go to the Raw schema (or your configured ingestion location).
- Locate the namespace that matches the destination name you set in Airbyte (e.g., bing_ads).
- Click on a table (stream) to view the schema and verify that:
- The data columns are present.
- The Airbyte metadata columns (e.g., _airbyte_extracted_at) are visible.
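A quick way to spot-check the landed data is to query it from a notebook. This is a minimal sketch; the catalog, schema, and table names below are hypothetical examples, so substitute the namespace and stream you configured in Airbyte.

# Minimal spot-check from a Databricks notebook ("spark" is predefined there).
# raw.bing_ads.campaigns is a hypothetical namespace/stream -- use your own.
df = spark.table("raw.bing_ads.campaigns")
df.printSchema()  # should list the data columns plus the Airbyte metadata columns
df.select("_airbyte_extracted_at").orderBy("_airbyte_extracted_at", ascending=False).show(5)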
Fixing an Existing Connection
Summary: This guide explains how to investigate a failed sync within Airbyte, locate specific error logs, and re-trigger the sync after applying a fix.
Instructions
1. Identify the Failed Connection
- Navigate to the Connections dashboard.
- Look for connections marked with a red status bar or “Sync Failed” (e.g., Google Ads -> databricks_production).
- Tip: You can use the 30-day history bar to visually spot past failures.
2. Locate Error Details
- Click on the name of the failed connection to open its details.
- Select the Timeline tab from the top menu.
- Scroll to find the specific Sync failed event entry.
3. Investigate the Cause
You have two options to view errors:
- Option A: Quick View
- Click the See more link next to the error message.
- Example: In the video, this revealed a \t (tab) character was used where a space was expected.
- Option B: View Full Logs
- If the quick view is insufficient, click the three vertical dots (kebab menu) on the right of the error row.
- Select View logs.
- Review the generated log file to find the specific exception or error message.
4. Fix and Retry
- Apply the necessary fix to your source data or connector settings.
- Return to the connection dashboard.
- Click the Sync now button in the top right corner.
- Monitor the Streams status or Timeline to ensure the sync completes successfully.
Custom Connectors
Overview
When a pre-built connector (e.g., for a CRM or Criteo) is not available in the standard Airbyte catalog, you must build a Custom Connector. This process involves defining the API base URL, authentication method, and the specific data streams you wish to replicate.
Step 1: Access the Connector Builder
- Navigate to the Builder tab in the Airbyte sidebar.
- Click to create a new connector or edit an existing one (the video demonstrates editing an existing “Criteo” connector).
- The builder is divided into two main sections: Global Configuration and Streams.
Step 2: Global Configuration & Authentication
You must set up how Airbyte connects to the API globally.
- API Base URL: Enter the root URL for the API.
- Authentication Method: Configure the authentication flow (The video demonstrates OAuth2).
  - Headers: Set up standard headers (e.g., accept: application/json, Content-Type).
  - Grant Type: Specify the grant type (e.g., client_credentials or authorization_code).
  - Token Refresh: Since this uses OAuth2, you must configure the Refresh Token Endpoint so Airbyte can maintain access without manual intervention.
  - Variables: Input the Client ID and Client Secret as variables in the configuration form.
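For context, the sketch below shows the kind of OAuth2 token request the builder automates once the grant type, refresh endpoint, and credentials are configured. The token URL, client ID, and client secret are placeholders, not the actual API's values.

import requests

# Illustrative only: Airbyte's Connector Builder performs this exchange for you.
response = requests.post(
    "https://api.example.com/oauth2/token",   # hypothetical token/refresh endpoint
    headers={"Accept": "application/json",
             "Content-Type": "application/x-www-form-urlencoded"},
    data={
        "grant_type": "client_credentials",   # or "authorization_code", depending on the API
        "client_id": "<CLIENT_ID>",
        "client_secret": "<CLIENT_SECRET>",
    },
    timeout=30,
)
response.raise_for_status()
access_token = response.json()["access_token"]  # Airbyte stores and refreshes this automatically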
Step 3: Stream Configuration
This step defines the specific data endpoint you want to pull (in this example, a statistics report).
1. Request Setup
- URL Path: Define the specific endpoint (e.g., /2021-01/statistics/report).
- HTTP Method: Set to POST.
- Request Body: Select JSON Object Body and define the payload required by the API. In the video, the following keys are configured:
- format: “csv”
- metrics: List of metrics (Clicks, Display, AdvertiserCost, Visits, etc.)
- currency: “USD”
- dimensions: List of dimensions (Adset, AdvertiserId, CampaignId, etc.)
- advertiserIds: The specific ID for the account (e.g., “Blue Compass RV”).
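Assembled as JSON, the payload above might look roughly like the sketch below. This is a hypothetical reconstruction; adjust the metric and dimension names to your API, and the advertiser ID is a placeholder.

# Hypothetical reconstruction of the JSON Object Body configured in the builder.
request_body = {
    "format": "csv",
    "metrics": ["Clicks", "Display", "AdvertiserCost", "Visits"],
    "currency": "USD",
    "dimensions": ["Adset", "AdvertiserId", "CampaignId"],
    "advertiserIds": "<ADVERTISER_ID>",  # e.g., the ID for the "Blue Compass RV" account
}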
2. Incremental Sync Configuration
To ensure efficient data loading, configure the sync to only pull new data rather than a full refresh every time.
- Sync Mode: Select Incremental Sync.
- Cursor: Use a Datetime Based Cursor.
- Start/End Date Logic:
  - The video shows a configuration ensuring the sync window ends at midnight of the previous day (min_max_datetime logic) so that a full 24-hour period is captured.
- Granularity: Set to 1 Day.
- Event Frequency: The system is set to check for events every 1 Second to ensure no data is missed.
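As a rough illustration of the cursor behaviour described above (placeholder logic, not the builder's generated configuration), the end of the sync window is clamped to the most recent midnight so that only complete 24-hour days are requested:

from datetime import datetime, timedelta, timezone

# Clamp the cursor's end datetime to today's midnight (UTC assumed here),
# so the last 1-day slice covers the previous full day.
now = datetime.now(timezone.utc)
window_end = now.replace(hour=0, minute=0, second=0, microsecond=0)
last_full_day_start = window_end - timedelta(days=1)
print(f"1-day slices stop at {window_end.isoformat()}; last full day starts {last_full_day_start.isoformat()}")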
3. Error Handling
Configure how the connector reacts to API failures:
- Retry Count: Set to 3 times.
- Backoff Time: Set to 5 seconds.
Step 4: Publish and Connect
- Test/Preview: Use the “Test” button in the builder to preview the data and ensure the stream is working correctly.
- Publish: Once verified, publish the connector.
- Add Source:
- Go to the Sources tab.
- Search for your newly created custom connector title.
- Configure it with the destination (in the video, the destination is Databricks Production).
- Destination Settings:
- Verify the Namespace and Schema.
- The video shows the data being written to the raw catalog under the criteo schema.
Databricks Ingestion
Adding an External Database Source
Part 1: Establish the External Connection
Before querying data programmatically, you must establish a secure connection object within the Databricks UI.
- Navigate to External Data:
- In the Databricks left-hand sidebar, click on Catalog.
- Select the External Data tab at the top of the pane.
- Create Connection:
- Click on Connections.
- Click the Create connection button in the top right corner.
- Note: In the video example, a connection named external_bigquery has already been established.
Once this connection is set up, you can reference it in notebooks or SQL queries just as you would a local table.
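For example, once a foreign catalog has been created from the connection, a notebook query looks the same as for a local table. The catalog, schema, and table names below are hypothetical.

# Hypothetical example: query a table exposed through the external connection
# exactly like a local Unity Catalog table.
df = spark.sql("SELECT * FROM external_bigquery.analytics.events LIMIT 10")
df.show()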
Part 2: Loading Data with PySpark
The video demonstrates a notebook workflow for loading Google Analytics 4 (GA4) raw data from BigQuery into Databricks.
1. Read Configuration
To read from the external source, use the spark.read command with the BigQuery format. You must provide the credentials and the specific source table.
Key Configuration Options:
- Format: bigquery
- parentProject: The Google Cloud Project ID.
- table: The specific BigQuery table you are querying.
- Filter (Optional): You can apply filters (e.g., col("event_name") != "user_engagement") directly after reading to limit data volume.
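A minimal sketch of this read, assuming a hypothetical project ID and table name (credential handling depends on your workspace setup):

from pyspark.sql.functions import col

# Read the GA4 export table from BigQuery; project and table are placeholders.
df = (
    spark.read.format("bigquery")
    .option("parentProject", "my-gcp-project-id")            # Google Cloud project ID
    .option("table", "analytics_123456789.events_20240101")  # BigQuery dataset.table
    .load()
    .filter(col("event_name") != "user_engagement")          # optional: trim volume early
)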
2. Incremental Load Logic
To ensure efficiency, do not reload the entire dataset every time. The video demonstrates a logic flow to handle “materialization” and incremental loading:
- Log Check: The script checks a log to see which dates have already been loaded.
- Filter by Date: It filters the source query to only fetch data for dates that do not exist in the destination table.
- Intraday vs. Daily: The pipeline distinguishes between “intraday” tables (events happening today that aren’t finalized) and historical daily tables.
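A simplified sketch of the log/date check, assuming the destination table and an event_date column (the video's actual log-based approach may differ):

from pyspark.sql.functions import col

# Dates already materialized in the destination Delta table.
target_table = "raw.ga4.ga4_events"
loaded_dates = [r["event_date"] for r in
                spark.table(target_table).select("event_date").distinct().collect()]

# Keep only source rows for dates that have not been loaded yet.
if loaded_dates:
    df = df.filter(~col("event_date").isin(loaded_dates))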
3. Transformation
Perform necessary transformations on the DataFrame df.
- Example: Transforming timestamp formats or adding specific metadata columns.
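For instance (an illustrative sketch only; GA4's event_timestamp is typically epoch microseconds, but verify against your actual schema):

from pyspark.sql import functions as F

df = (
    df
    # Convert the epoch-microsecond timestamp into a proper timestamp column.
    .withColumn("event_ts", (F.col("event_timestamp") / 1_000_000).cast("timestamp"))
    # Add a metadata column recording when this batch was loaded.
    .withColumn("_loaded_at", F.current_timestamp())
)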
Part 3: Writing to Delta Tables
Once the data is read and transformed, it is written to a Databricks Delta table.
Write Command Syntax
The video highlights a specific write command pattern to ensure performance and data integrity.
(df.write
    .format("delta")
    .partitionBy("event_date")       # Crucial for performance on large datasets
    .mode("append")                  # Use 'append' to add new rows, avoiding full overwrites
    .option("mergeSchema", "true")   # Allows the schema to evolve if new columns appear
    .saveAsTable("catalog.schema.table_name"))

Key Parameters Explained
- partitionBy("event_date"): Because the dataset is large (GA4 events), partitioning by date organizes the files physically, speeding up future queries that filter by date.
- mode("append"): Since this is a log of events, you want to add new records to the table rather than overwriting the whole history.
- saveAsTable: This registers the data in the Databricks Hive metastore/Unity Catalog (e.g., raw.ga4.ga4_events), making it accessible via SQL.
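As a usage note, downstream queries that filter on the partition column only scan the matching files; the date value below is a placeholder.

# Partition pruning: only the files for the requested event_date are read.
daily = spark.table("raw.ga4.ga4_events").where("event_date = '2024-01-01'")
print(daily.count())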
Adding a Cloud File Source (Auto Loader)
Overview
This process utilizes the Databricks Auto Loader feature (cloudFiles) within a Delta Live Tables pipeline. It is designed to incrementally ingest data from cloud storage as new files arrive.
Step 1: Identify External Data Source
Before writing the ingestion code, you must locate the path to your external data.
- Navigate to Catalog in the Databricks sidebar.
- Select External Data.
- Locate the specific connection (e.g., braze-currents).
- Copy the URL (Base Path) provided in the details view (e.g., abfss://container@storage.dfs.core.windows.net/).
Step 2: Define the DLT Pipeline (Python)
Create a new notebook or use an existing DLT notebook. The ingestion logic uses the @dlt.table decorator and spark.readStream.
Code Structure

import dlt

@dlt.table(
    name="target_table_name",
    table_properties={"quality": "bronze"}  # Optional properties
)
def target_function_name():
    return (
        spark.readStream
        .format("cloudFiles")                          # Activates Auto Loader
        .option("cloudFiles.format", "avro")           # Specifies source file format (Avro, JSON, CSV, etc.)
        .load("abfss://<path_to_data>/*/*/*/*.avro")   # Path with wildcards
    )

Step 3: Configuration Details
1. The cloudFiles Format
Using .format("cloudFiles") tells Spark to use Auto Loader. This automatically handles schema inference and state management for file ingestion.
2. Source Format
You must specify the format of the raw files sitting in cloud storage using the option:
.option("cloudFiles.format", "avro")
Note: Update “avro” to “json”, “csv”, or “parquet” depending on your source data.
3. Path and Wildcards (Globbing)
When defining the .load() path, use asterisks (*) as wildcards to navigate folder structures.
- Example: .../date=2023-10-01/event_type=user/...
- Usage: If your data is nested in dynamic folders (like dates or categories), use * so Auto Loader traverses each of those directory levels (e.g., /*/*/*.avro).
Step 4: Operational Behavior & Limitations
- Incremental Ingestion: Auto Loader tracks which files have been processed. When the pipeline runs again, it will only ingest new files added to the storage since the last run.
- Append-Only: This method is optimized for append-only workflows.
  - Warning: Auto Loader does not automatically handle updates to existing rows or deletions of rows in the source files. If your use case requires updates (CDC), you will need downstream logic (like dlt.apply_changes) to handle merging.
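If merging is needed downstream, a sketch with apply_changes might look like the following; the key and sequencing columns are placeholders specific to your data.

import dlt
from pyspark.sql.functions import col

# Target table that will hold the merged, current-state rows.
dlt.create_streaming_table("subscription_actions_current")

dlt.apply_changes(
    target="subscription_actions_current",
    source="target_table_name",   # the Auto Loader (bronze) table defined earlier
    keys=["id"],                  # primary-key column(s) -- placeholder
    sequence_by=col("time"),      # ordering column for late/duplicate events -- placeholder
)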
Step 5: Validation and Monitoring
- Navigate to the Delta Live Tables pipeline view.
- You will see the directed acyclic graph (DAG) of your tables (e.g., subscription_actions, subscription_group_actions).
- Click on a table node to view:
- Schema: The columns detected from the source files.
- Metrics: Processing rates and row counts.
- Status: Ensure the compute starts and the state transitions to “Running” or “Completed”.