Data Transformation (T)

Workflow for creating and testing new Notebooks

This guide outlines the standard workflow for developing data pipelines in Databricks, moving from an experimental “sandbox” environment to a formalized production job or Delta Live Tables (DLT) pipeline.

Phase 1: The “Sandbox” (Interactive Prototyping)

Goal: Experiment with code and validate logic with immediate visual feedback.

  1. Create a Scratchpad Notebook: Start in a standard Databricks notebook. This is your experimental zone. Do not worry about code structure or modularity here; the focus is on getting the logic right.
  2. Iterative Development: Write your code in small chunks. Run individual cells to test specific transformations (e.g., column renaming, casting types, or cleaning strings) one step at a time.
  3. Visual Validation: Call display() or .show() after every significant transformation.
    • Why? You need to visually confirm that your joins didn’t explode the row count and that your filters aren’t removing valid data.
    • Action: Check the data grid output to ensure the schema looks correct before moving to the next step.
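
The row-count check described above can be made explicit instead of relying on inspection alone. The following is a minimal sketch, not a Databricks feature: `assert_no_fanout` is a hypothetical helper name, and it assumes a left join against a table whose join key is unique, so the output should have exactly as many rows as the input. It relies only on the `.count()` method that PySpark DataFrames provide.

```python
def assert_no_fanout(df_before, df_after, label="join"):
    """Raise if a transformation changed the row count.

    Intended for a left join that should keep exactly one output row per
    input row. Both arguments only need a .count() method, which PySpark
    DataFrames provide.
    """
    rows_before = df_before.count()
    rows_after = df_after.count()
    if rows_after != rows_before:
        raise ValueError(
            f"{label}: row count changed from {rows_before} to {rows_after}"
        )
    return rows_after
```

In a scratchpad cell this might be called as `assert_no_fanout(df_orders, df_orders.join(df_customers, "customer_id", "left"))`, where the DataFrame and column names are placeholders.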

Goal: Establish a clean, version-controllable environment for the final pipeline.

  1. Open a Production Notebook: Create a new notebook dedicated to the pipeline (or Job). Do not simply rename the scratchpad; starting fresh ensures you don’t accidentally carry over temporary variables or debug code.
  2. Set Up the Structure: Prepare the new notebook to receive the code. If you are building a DLT pipeline, import the necessary libraries (dlt) and set up your function decorators. If building a Job, set up your imports.

Setting up Delta Live Tables (DLT) pipelines

  • A Databricks workspace.
  • Existing PySpark transformation logic (tested in a standard notebook).
  1. Navigate to Workflows (or Jobs) in the sidebar.
  2. Select the Delta Live Tables tab.
  3. Click the Create Pipeline button.
  4. Configure the General settings:
    • Pipeline Name: Give your pipeline a descriptive name.
    • Product Edition: Select “Advanced” or “Pro” (depending on your needs).
    • Pipeline Mode: Choose Triggered (for batch processing) or Continuous (for low-latency streaming).
  5. Configure Source Code:
    • Under “Paths”, you can select an existing notebook or start fresh.
    • Select Create new (start with an empty file).
    • Choose a directory/folder path where the code will be stored.
    • Select Python as the language.

Once the pipeline creation screen opens the editor, you need to adapt your standard PySpark code for DLT.

  1. Import DLT: Ensure you import the library at the top of your file.
    import dlt
    from pyspark.sql.functions import *
  2. Add the Decorator: Instead of just writing a dataframe transformation, wrap your logic in a function decorated with @dlt.table.
  3. Define Properties: Inside the decorator, define the properties of the target table.
    • name: The name of the table to be created in the database.
    • comment: A description of what the data represents.
  4. Return the Data: Unlike standard notebooks, where you might use display() or .write.save(), a DLT function must return the final DataFrame.

General Code Template:

import dlt
from pyspark.sql.functions import *

# Define the DLT Table
@dlt.table(
    name="target_table_name",
    comment="Description of what this table contains"
)
def create_metric_table():
    # 1. Read your source data
    df_source = spark.read.table("source_database.source_table")
    # 2. Apply your transformations (Joins, filters, aggregations)
    df_transformed = df_source.filter(col("status") == "active") \
        .join(...)
    # 3. Return the final dataframe (Do not use .write or .save)
    return df_transformed

On the right-hand panel of the DLT editor (Pipeline Settings):

  1. Target Schema: Enter the name of the database (schema) where you want your new tables to be published.
    • Note: If you do not set this, the tables will be transient and not queryable outside the pipeline.
  2. Storage Location: (Optional but recommended) define a specific cloud storage path for the tables.
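
The choices made in the UI (name, edition, mode, source notebook, target schema, storage location) can also be written down as a settings document for review or version control. The sketch below expresses them as a Python dict. The key names are assumed to match the Pipelines API settings object and should be checked against the JSON view of your own pipeline; every value is a hypothetical placeholder.

```python
import json

# All values are hypothetical; only the key names are meant to mirror the
# pipeline settings JSON (an assumption to verify in your workspace).
pipeline_settings = {
    "name": "example_pipeline",      # Pipeline Name
    "edition": "ADVANCED",           # Product Edition
    "continuous": False,             # False = Triggered, True = Continuous
    "libraries": [
        {"notebook": {"path": "/Workspace/Shared/example_pipeline/transformations"}}
    ],
    "target": "reporting",           # Target Schema
    "storage": "s3://example-bucket/dlt/example_pipeline",  # Storage Location
}

print(json.dumps(pipeline_settings, indent=2))
```
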
  1. Dry Run: Before processing data, click the arrow next to the “Start” button and select Validate (or “Dry Run”).
    • Databricks will verify the code syntax and generate a DAG (Directed Acyclic Graph) visualization showing the flow of data.
  2. Start: If the graph looks correct, click Start.
    • This will provision the cluster and run the pipeline.
    • This creates the materialized view, which means the pipeline computes the current state of the table from its source data.

To automate the pipeline:

  1. Click the Schedule button in the top right corner.
  2. Trigger: Choose Scheduled.
  3. Frequency: Set the interval (e.g., Every 1 Day, Every Hour) or use Cron syntax for specific times.
  4. Click Create to save the schedule.
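
For the Cron option, Databricks schedules use Quartz cron syntax, which has a leading seconds field and so differs from the five-field Unix format. The helper below is a small illustrative sketch: `daily_quartz_cron` is a hypothetical name, and the `schedule` dict assumes the key names used by the Jobs API schedule object.

```python
def daily_quartz_cron(hour: int, minute: int = 0) -> str:
    """Build a Quartz cron expression that fires once a day at hour:minute.

    Quartz field order: seconds minutes hours day-of-month month day-of-week.
    """
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    return f"0 {minute} {hour} * * ?"


# Hypothetical schedule: every day at 06:30 UTC.
schedule = {
    "quartz_cron_expression": daily_quartz_cron(6, 30),
    "timezone_id": "UTC",
}
```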

This guide outlines how to navigate, select, and configure compute resources within the Databricks environment.

To view all available infrastructure:

  1. Navigate to the left-hand sidebar menu.
  2. Select the Compute tab.

The Compute section is divided into specific tabs based on the workload type:

Best for: General catalog browsing, SQL queries, and external connections (e.g., Power BI, Rudderstack).

  • Serverless Options: These are generally available and recommended for most standard tasks.
  • External Connections: Provision a separate compute resource for each external connection (e.g., one warehouse for Power BI, a different one for Rudderstack). This ensures that a heavy load from one tool does not degrade the performance of another.

Best for: Specific configurations and custom environments.

  • Use this when you need granular control over the environment that is not available in SQL Warehouses.
  • Required for certain specific notebook execution scenarios where standard SQL warehouses lack necessary libraries or configuration settings.
  • Can be added to notebooks or Delta Live Tables (DLT).
  • Benefit: significantly faster startup times and execution for automated jobs.

To modify an existing resource (e.g., a SQL Warehouse):

  1. Select the Resource: From the list (e.g., under the SQL Warehouses tab), locate the specific compute instance you wish to manage.

  2. Enter Edit Mode: Click on the resource name, then click the Edit button in the top right corner.

  3. Adjust Settings:

    • Cluster Size: Select the “T-Shirt size” (e.g., 2X-Small, Small, Medium) appropriate for the data volume.
    • Auto-Stop: Define the idle time limit (in minutes). A short limit helps prevent unnecessary costs when the warehouse is not in use.
    • Scaling: Set the Minimum and Maximum number of clusters. This allows the warehouse to scale up during high concurrency and scale down when demand drops.
    • Type: Toggle between Serverless or Classic depending on your requirements.
  4. Save: Click Save to apply the configuration changes.
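
The same settings can be recorded as data, for example when documenting one warehouse per external connection. This is a minimal sketch: the key names are assumed to follow the SQL Warehouses API, the values are hypothetical, and `validate_warehouse_settings` is an illustrative helper rather than part of any Databricks library.

```python
def validate_warehouse_settings(settings: dict) -> dict:
    """Run basic sanity checks before the settings are applied."""
    if settings["min_num_clusters"] > settings["max_num_clusters"]:
        raise ValueError("min_num_clusters cannot exceed max_num_clusters")
    if settings["auto_stop_mins"] < 0:
        raise ValueError("auto_stop_mins cannot be negative")
    return settings


# Hypothetical warehouse dedicated to a single external connection.
powerbi_warehouse = validate_warehouse_settings({
    "name": "powerbi_warehouse",
    "cluster_size": "2X-Small",         # Cluster Size ("T-Shirt size")
    "auto_stop_mins": 10,               # Auto-Stop idle limit in minutes
    "min_num_clusters": 1,              # Scaling: minimum
    "max_num_clusters": 3,              # Scaling: maximum
    "enable_serverless_compute": True,  # Type: Serverless rather than Classic
})
```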

Updating specific logic for key transformations

Section titled “Updating specific logic for key transformations”

This document outlines the standard operating procedure for adding new data attributes (e.g., User Geo, Device Type, Custom Dimensions) to the reporting datasets used in Power BI.

The data flow consists of two primary pipelines that transform raw event data into aggregated reporting tables. To add a new field to Power BI, the data must be passed sequentially through this chain.

  • Source: Raw GA4 Event Data
  • Pipeline 1 (analytics_attribution): Transforms raw events into Sessions, Devices, and Vehicle Events.
  • Pipeline 2 (lead_funnel): Consumes the output of the attribution pipeline to create high-level aggregates (e.g., Page Summaries, Daily Funnels).
  • Destination: Power BI Reporting Tables.

When adding a new attribute (referred to here as [New_Attribute]), you must begin at the lowest level of granularity (Events) and propagate the field forward through every subsequent transformation table.

Phase 1: Update analytics_attribution Pipeline

Navigate to the analytics_attribution notebook/pipeline.

Step 1.1: Add to Base Events (mv_events)

Locate the code block for mv_events. This is where raw data is extracted.

  • Identify the source field in the raw data (e.g., geo.country or device.category).
  • Add the field to the select statement or dataframe transformation.
  • Example: ensure events.geo.country is selected and aliased if necessary.
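
One way to keep this step repeatable is to hold the new attributes in a single mapping and generate the select expressions from it. The sketch below is illustrative only: the base column names are assumptions about the raw GA4 export, the function names are hypothetical, and the actual mv_events definition may be structured differently. It uses `DataFrame.selectExpr`, which accepts SQL expression strings.

```python
# Assumed base columns of the raw GA4 export; adjust to the real schema.
BASE_COLUMNS = ["event_date", "event_timestamp", "event_name", "user_pseudo_id"]

# New attributes: output column name -> source field in the raw data.
NEW_ATTRIBUTES = {
    "geo_country": "geo.country",
    "device_category": "device.category",
}


def event_select_exprs(base_columns, new_attributes):
    """Return SQL expressions that keep the base columns and alias each new field."""
    aliased = [f"{source} AS {alias}" for alias, source in new_attributes.items()]
    return list(base_columns) + aliased


def select_events(df_raw):
    """Apply the expressions to the raw events DataFrame."""
    return df_raw.selectExpr(*event_select_exprs(BASE_COLUMNS, NEW_ATTRIBUTES))
```

Inside the mv_events function, the final line would then be along the lines of `return select_events(df_raw)`, and adding another attribute means adding one entry to `NEW_ATTRIBUTES`.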

Step 1.2: Propagate to Sessions (mv_sessions)

Locate the mv_sessions table definition. Since sessions are grouped events:

  • Add [New_Attribute] to the selection list reading from mv_events.
  • Ensure the logic handles the aggregation (e.g., taking the first() value, max(), or grouping by the attribute if it defines the session).
  • Key Action: Ensure the field is present in the final DataFrame returned for sessions.

Step 1.3: Propagate to Devices/Vehicles (If applicable)

If the attribute relates specifically to device or vehicle metadata:

  • Update mv_devices or mv_vehicle_events following the same logic as Step 1.2.

Once the attributes are available in the attribution tables, navigate to the lead_funnel pipeline. These tables read from the outputs of Phase 1.

Step 2.1: Update Intermediate Aggregates

Locate tables that aggregate session data, such as mv_website_engagement or mv_page_summary.

  • Add [New_Attribute] to the import logic reading from the analytics_attribution tables.
  • Verify the field is included in the DataFrame schema.

Step 2.2: Update Final Reporting Tables

Locate the final destination tables used by Power BI. Common examples include:

  • mv_online_lead_funnel_daily
  • mv_full_funnel_opportunities
  • mv_page_summary_daily

Update the final SELECT or DataFrame operations to include [New_Attribute].

  1. Run Pipelines: Execute the pipelines starting from analytics_attribution followed by lead_funnel to refresh the data tables.
  2. Verify Schema: Check the output of the final table (e.g., mv_online_lead_funnel_daily) to ensure the column is populated.
  3. Power BI Refresh: Once the Databricks tables are updated, refresh the data model in Power BI to see the new field available for reporting.
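
The schema check in step 2 can be run across the whole chain at once, which makes it easier to spot the table where the field was dropped. The following is a hedged sketch: `column_fill_report` is a hypothetical helper, and it assumes the tables can be read with `spark.table()` under the names you pass in (schema-qualified if necessary).

```python
def column_fill_report(spark, table_names, column):
    """Return {table: non-null row count} for `column`.

    A value of None means the column is missing from that table's schema.
    """
    report = {}
    for table_name in table_names:
        df = spark.table(table_name)
        if column not in df.columns:
            report[table_name] = None
            continue
        # filter() accepts a SQL expression string.
        report[table_name] = df.filter(f"{column} IS NOT NULL").count()
    return report
```

For example, `column_fill_report(spark, ["mv_events", "mv_sessions", "mv_online_lead_funnel_daily"], "geo_country")` would show `None` for the first table in which the column is absent, and `0` for a table where the column exists but was never populated. The column name here is a placeholder.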

Context: This process outlines how to update attributes in the CRM semantic model (e.g., adding a new field to the Customer or Contact view). This usually involves propagating a column from a raw activity table to a rolled-up customer view.

Before modifying the transformation logic, ensure the data exists upstream.

  1. Check the Upstream Table: Verify that the new attribute/column is already present in the staging table (e.g., mv_crm_activities).
  2. Handle Missing Data:
    • If the column is missing: This indicates a gap in the ingestion process. You must first update the upstream process that calls the API to import these events and add the new fields there.
    • If the column exists: Proceed to Phase 2.

Phase 2: Updating the Transformation Logic

  1. Locate the DLT Notebook: Open the Databricks notebook containing the Delta Live Tables (DLT) definitions (e.g., the_crm).
  2. Find the Target Table Definition: Scroll to the specific materialized view definition you are updating (e.g., mv_crm_customers or mv_crm_contacts).
    • Note: These tables typically aggregate data so that there is exactly one row per customer.
  3. Update the Aggregation:
    • Locate the aggregation block (where the code defines logic like first(), last(), count(), etc.).
    • Add the new column to this list. Ensure you are applying the correct aggregation logic (e.g., grabbing the “most recent” value or “first” value seen).
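
When choosing the aggregation, note that first() and last() depend on row order, which Spark does not guarantee after a shuffle. The sketch below swaps in the Spark SQL functions min_by() and max_by(), which pick the value at the earliest or latest timestamp explicitly. It is an illustration, not the existing pipeline code: the helper names and the `customer_id` and `activity_timestamp` columns are assumptions.

```python
def rollup_exprs(first_seen=(), most_recent=(), order_column="activity_timestamp"):
    """Build Spark SQL aggregate expressions for a one-row-per-customer rollup.

    first_seen:  columns that keep the value at the earliest order_column
    most_recent: columns that keep the value at the latest order_column
    """
    exprs = [f"min_by({c}, {order_column}) AS {c}" for c in first_seen]
    exprs += [f"max_by({c}, {order_column}) AS {c}" for c in most_recent]
    return exprs


def rollup_customers(df_activities, key="customer_id", **columns):
    """Group by the customer key and apply the generated expressions."""
    # Imported lazily so the expression builder can be used without Spark installed.
    from pyspark.sql import functions as F

    return df_activities.groupBy(key).agg(
        *[F.expr(e) for e in rollup_exprs(**columns)]
    )
```

Adding a new column then means adding its name to either `first_seen` or `most_recent`, for example `rollup_customers(df, first_seen=["first_source"], most_recent=["region"])`.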

Critical Step: Do not run the pipeline immediately. Validate the code syntax and output first.

  1. Isolate the Code: Copy the entire function/logic block from within the DLT definition.
  2. Create a Scratch Cell: Paste the code into a new, separate notebook cell.
  3. Modify for Display:
    • Remove or comment out the return dlt_dataframe statement.
    • Replace it with a display command to visualize the output (e.g., df_final.display()).
  4. Execute: Run the cell interactively. Check the output to ensure the new column appears as expected and the data looks correct.

Once validated, move the code back to production.

  1. Apply Changes: Paste the tested code back into the original DLT definition block.
  2. Select Update Strategy:
    • Standard Update: If the change only affects new data moving forward, you can run a standard pipeline update.
    • Full Refresh (Important): If the change alters historical logic (e.g., changing how a “first source” is calculated) or significantly changes the schema, you must perform a Full Refresh.
      • Warning: Ensure you select “Full Refresh” for the specific table and any downstream tables that rely on this data.
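
If updates are started programmatically rather than from the UI, the choice between the two strategies comes down to the request body. The sketch below only builds that body. It assumes the `full_refresh` and `full_refresh_selection` fields of the Pipelines API "start update" request, which should be verified against the current API reference; `update_request` is a hypothetical helper name.

```python
def update_request(full_refresh_tables=()):
    """Build a request body for starting a pipeline update.

    With no tables listed, the body requests a standard update.
    With tables listed, those tables are fully refreshed, so the list
    should include every downstream table that depends on the change.
    """
    tables = list(full_refresh_tables)
    if not tables:
        return {"full_refresh": False}
    return {"full_refresh_selection": tables}


# Standard update: the change only affects new data going forward.
standard = update_request()

# Full refresh of the changed table and a downstream dependent.
historical = update_request(["mv_crm_customers", "mv_crm_contacts"])
```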