Data Transformation (T)
Databricks Development
Workflow for creating and testing new Notebooks
This guide outlines the standard workflow for developing data pipelines in Databricks, moving from an experimental “sandbox” environment to a formalized production job or Delta Live Tables (DLT) pipeline.
Phase 1: The “Sandbox” (Interactive Prototyping)
Goal: Experiment with code and validate logic with immediate visual feedback.
- Create a Scratchpad Notebook: Start in a standard Databricks notebook. This is your experimental zone. Do not worry about code structure or modularity here; the focus is on getting the logic right.
- Iterative Development: Write your code in small chunks. Run individual cells to test specific transformations (e.g., column renaming, casting types, or cleaning strings) one step at a time.
- Visual Validation (a sketch of this pattern follows the list):
  - Heavily utilize the `display()` command or `.show()` method after every significant transformation.
  - Why? You need to visually confirm that your joins didn’t explode the row count and that your filters aren’t removing valid data.
  - Action: Check the data grid output to ensure the schema looks correct before moving to the next step.
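The cell below is a minimal sketch of that loop in a Databricks notebook (where `spark` and `display()` are available); the table and column names are placeholders, not real pipeline sources.

```python
# Sandbox sketch: run each step in its own cell and eyeball the output before moving on.
from pyspark.sql.functions import col

df_raw = spark.read.table("sandbox_db.example_source")        # placeholder source table
display(df_raw)                                               # check the raw schema and row count

df_clean = (
    df_raw
        .withColumnRenamed("cust_id", "customer_id")          # rename a column
        .withColumn("amount", col("amount").cast("double"))   # cast a type
        .filter(col("status") == "active")                    # clean / filter strings
)
display(df_clean)                                             # confirm the filter did not drop valid rows
```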
Phase 2: The Migration (Clean Slate)
Goal: Establish a clean, version-controllable environment for the final pipeline.
- Open a Production Notebook: Create a new notebook dedicated to the pipeline (or Job). Do not simply rename the scratchpad; starting fresh ensures you don’t accidentally carry over temporary variables or debug code.
- Set Up the Structure: Prepare the new notebook to receive the code. If you are building a DLT pipeline, import the necessary libraries (`dlt`) and set up your function decorators. If building a Job, set up your imports.
Setting up Delta Live Tables (DLT) pipelines
Prerequisites
- A Databricks workspace.
- Existing PySpark transformation logic (tested in a standard notebook).
Step 1: Initialize the Pipeline
- Navigate to Workflows (or Jobs) in the sidebar.
- Select the Delta Live Tables tab.
- Click the Create Pipeline button.
- Configure the General settings:
- Pipeline Name: Give your pipeline a descriptive name.
- Product Edition: Select “Advanced” or “Pro” (depending on your needs).
- Pipeline Mode: Choose Triggered (for batch processing) or Continuous (for low-latency streaming).
- Configure Source Code:
- Under “Paths”, you can select an existing notebook or start fresh.
- Select Create new (start with an empty file).
- Choose a directory/folder path where the code will be stored.
- Select Python as the language.
Step 2: Implement the DLT Logic
Once the pipeline creation screen opens the editor, you need to adapt your standard PySpark code for DLT.
- Import DLT: Ensure you import the library at the top of your file.
```python
import dlt
from pyspark.sql.functions import *
```
- Add the Decorator: Instead of just writing a dataframe transformation, wrap your logic in a function decorated with `@dlt.table`.
- Define Properties: Inside the decorator, define the properties of the target table.
  - `name`: The name of the table to be created in the database.
  - `comment`: A description of what the data represents.
- Return the Data: Unlike standard notebooks where you might use `display()` or `.write.save()`, a DLT function must return the final DataFrame.
General Code Template:
```python
import dlt
from pyspark.sql.functions import *

# Define the DLT Table
@dlt.table(
    name="target_table_name",
    comment="Description of what this table contains"
)
def create_metric_table():
    # 1. Read your source data
    df_source = spark.read.table("source_database.source_table")

    # 2. Apply your transformations (Joins, filters, aggregations)
    df_transformed = df_source.filter(col("status") == "active") \
        .join(...)

    # 3. Return the final dataframe (Do not use .write or .save)
    return df_transformed
```

Step 3: Configure Target Settings
On the right-hand panel of the DLT editor (Pipeline Settings):
- Target Schema: Enter the name of the database (schema) where you want your new tables to be published.
- Note: If you do not set this, the tables will be transient and not queryable outside the pipeline.
- Storage Location: (Optional but recommended) Define a specific cloud storage path for the tables.
Step 4: Validate and Run
- Dry Run: Before processing data, click the arrow next to the “Start” button and select Validate (or “Dry Run”).
- Databricks will verify the code syntax and generate a DAG (Directed Acyclic Graph) visualization showing the flow of data.
- Start: If the graph looks correct, click Start.
- This will provision the cluster and run the pipeline.
- It will create the “Materialized View,” meaning it will calculate the current state of the data based on the source.
Step 5: Scheduling (Optional)
To automate the pipeline:
- Click the Schedule button in the top right corner.
- Trigger: Choose Scheduled.
- Frequency: Set the interval (e.g., Every 1 Day, Every Hour) or use Cron syntax for specific times.
- Click Create to save the schedule.
Managing Compute resources
This guide outlines how to navigate, select, and configure compute resources within the Databricks environment.
1. Accessing Compute Resources
To view all available infrastructure:
- Navigate to the left-hand sidebar menu.
- Select the Compute tab.
2. Types of Compute
The Compute section is divided into specific tabs based on the workload type:
A. SQL Warehouses
Best for: General catalog browsing, SQL queries, and external connections (e.g., Power BI, Rudderstack).
- Serverless Options: These are generally available and recommended for most standard tasks.
- External Connections: It is a recommended best practice to provision a separate compute resource for each external connection (e.g., one warehouse for Power BI, a different one for Rudderstack). This ensures that heavy loads on one tool do not impact the performance of another.
B. All-Purpose Compute
Best for: Specific configurations and custom environments.
- Use this when you need granular control over the environment that is not available in SQL Warehouses.
- Required for certain specific notebook execution scenarios where standard SQL warehouses lack necessary libraries or configuration settings.
C. Serverless Jobs
- Can be added to notebooks or Delta Live Tables (DLT).
- Benefit: significantly faster startup times and execution for automated jobs.
3. Configuring and Editing Compute
To modify an existing resource (e.g., a SQL Warehouse):
- Select the Resource: From the list (e.g., under the SQL Warehouses tab), locate the specific compute instance you wish to manage.
- Enter Edit Mode: Click on the resource name, then click the Edit button in the top right corner.
- Adjust Settings:
  - Cluster Size: Select the “T-Shirt size” (e.g., 2X-Small, Small, Medium) appropriate for the data volume.
  - Auto-Stop: Define the idle time limit (in minutes). Setting this aggressively helps prevent unnecessary costs when the warehouse is not in use.
  - Scaling: Set the Minimum and Maximum number of clusters. This allows the warehouse to scale up during high concurrency and scale down when demand drops.
  - Type: Toggle between Serverless or Classic depending on your requirements.
- Save: Click Save to apply the configuration changes.
Updating specific logic for key transformations
Website Analytics
This document outlines the standard operating procedure for adding new data attributes (e.g., User Geo, Device Type, Custom Dimensions) to the reporting datasets used in Power BI.
1. System Architecture Overview
The data flow consists of two primary pipelines that transform raw event data into aggregated reporting tables. To add a new field to Power BI, the data must be passed sequentially through this chain.
- Source: Raw GA4 Event Data
- Pipeline 1 (`analytics_attribution`): Transforms raw events into Sessions, Devices, and Vehicle Events.
- Pipeline 2 (`lead_funnel`): Consumes the output of the attribution pipeline to create high-level aggregates (e.g., Page Summaries, Daily Funnels).
- Destination: Power BI Reporting Tables.
2. General Update Workflow
When adding a new attribute (referred to here as [New_Attribute]), you must begin at the lowest level of granularity (Events) and propagate the field forward through every subsequent transformation table.
Phase 1: Update analytics_attribution Pipeline
Navigate to the analytics_attribution notebook/pipeline.
Step 1.1: Add to Base Events (mv_events)
Locate the code block for mv_events. This is where raw data is extracted.
- Identify the source field in the raw data (e.g., `geo.country` or `device.category`).
- Add the field to the select statement or dataframe transformation.
- Example: ensure `events.geo.country` is selected and aliased if necessary.
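As a rough sketch of Step 1.1 in PySpark (the GA4 field paths come from the example above; the source table name and aliases are assumptions):

```python
# Sketch for Step 1.1: pull the nested GA4 fields into mv_events (names are illustrative).
from pyspark.sql.functions import col

df_events = (
    spark.read.table("raw_ga4.events")                        # assumed raw GA4 events table
        .select(
            col("event_name"),
            col("event_timestamp"),
            col("geo.country").alias("geo_country"),          # new attribute
            col("device.category").alias("device_category"),  # new attribute
        )
)
```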
Step 1.2: Propagate to Sessions (mv_sessions)
Locate the mv_sessions table definition. Since sessions are grouped events:
- Add `[New_Attribute]` to the selection list reading from `mv_events`.
- Ensure the logic handles the aggregation (e.g., taking the `first()` value, `max()`, or grouping by the attribute if it defines the session).
- Key Action: Ensure the field is present in the final DataFrame returned for sessions.
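A sketch of Step 1.2, assuming `mv_events` exposes a `session_id` key and the `geo_country` alias from the previous sketch:

```python
# Sketch for Step 1.2: carry the new attribute from events up to the session grain.
from pyspark.sql.functions import first, max as max_

df_sessions = (
    spark.read.table("mv_events")                             # assumed output of Step 1.1
        .groupBy("session_id")
        .agg(
            first("geo_country", ignorenulls=True).alias("geo_country"),   # new attribute
            max_("event_timestamp").alias("last_event_timestamp"),
        )
)
```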
Step 1.3: Propagate to Devices/Vehicles (If applicable)
If the attribute relates specifically to device or vehicle metadata:
- Update `mv_devices` or `mv_vehicle_events` following the same logic as Step 1.2.
Phase 2: Update lead_funnel Pipeline
Once the attributes are available in the attribution tables, navigate to the lead_funnel pipeline. These tables read from the outputs of Phase 1.
Step 2.1: Update Intermediate Aggregates
Locate tables that aggregate session data, such as mv_website_engagement or mv_page_summary.
- Add `[New_Attribute]` to the import logic reading from the `analytics_attribution` tables (see the sketch after this list).
- Verify the field is included in the DataFrame schema.
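For illustration only (the published schema name and grouping keys are assumptions), Step 2.1 might read the attribution output like this:

```python
# Sketch for Step 2.1: include the new attribute when aggregating sessions into a summary.
from pyspark.sql import functions as F

df_page_summary = (
    spark.read.table("analytics_attribution.mv_sessions")     # assumed published table from Phase 1
        .groupBy("session_date", "geo_country")               # new attribute kept as a grouping key
        .agg(F.countDistinct("session_id").alias("sessions"))
)
```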
Step 2.2: Update Final Reporting Tables
Locate the final destination tables used by Power BI. Common examples include:
- `mv_online_lead_funnel_daily`
- `mv_full_funnel_opportunities`
- `mv_page_summary_daily`
Update the final SELECT or DataFrame operations to include [New_Attribute].
3. Deployment & Verification
Section titled “3. Deployment & Verification”- Run Pipelines: Execute the pipelines starting from
analytics_attributionfollowed bylead_funnelto refresh the data tables. - Verify Schema: Check the output of the final table (e.g.,
mv_online_lead_funnel_daily) to ensure the column is populated. - Power BI Refresh: Once the Databricks tables are updated, refresh the data model in Power BI to see the new field available for reporting.
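A quick interactive check along these lines confirms the column landed (the column name is a placeholder):

```python
# Verify the new column exists and is populated in the final reporting table.
from pyspark.sql import functions as F

df = spark.table("mv_online_lead_funnel_daily")
df.printSchema()                                              # the new column should appear here
print(df.where(F.col("new_attribute").isNotNull()).count())   # should be > 0 after the refresh
```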
Context: This process outlines how to update attributes in the CRM semantic model (e.g., adding a new field to the Customer or Contact view). This usually involves propagating a column from a raw activity table to a rolled-up customer view.
Phase 1: Verification & Prerequisites
Before modifying the transformation logic, ensure the data exists upstream.
- Check the Upstream Table: Verify that the new attribute/column is already present in the staging table (e.g., `mv_crm_activities`); a quick check is sketched after this list.
- Handle Missing Data:
  - If the column is missing: This indicates a gap in the ingestion process. You must first update the upstream ingestion process that calls the API to import these events and add the new fields there.
  - If the column exists: Proceed to Phase 2.
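A minimal check, assuming the placeholder column name `new_attribute`:

```python
# Quick check: is the new attribute already present in the staging table?
print("new_attribute" in spark.table("mv_crm_activities").columns)
```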
Phase 2: Updating the Transformation Logic
- Locate the DLT Notebook: Open the Databricks notebook containing the Delta Live Tables (DLT) definitions (e.g., `the_crm`).
- Find the Target Table Definition: Scroll to the specific materialized view definition you are updating (e.g., `mv_crm_customers` or `mv_crm_contacts`).
  - Note: These tables typically aggregate data so that there is one row per individual customer.
- Update the Aggregation (a sketch follows this list):
  - Locate the aggregation block (where the code defines logic like `first()`, `last()`, `count()`, etc.).
  - Add the new column to this list. Ensure you are applying the correct aggregation logic (e.g., grabbing the “most recent” value or “first” value seen).
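A minimal sketch of what the updated aggregation could look like, assuming hypothetical column names (`customer_id`, `activity_timestamp`, `new_attribute`) in `mv_crm_activities`:

```python
# Sketch: add the new column to the per-customer aggregation with appropriate logic.
from pyspark.sql import functions as F

df_customers = (
    spark.read.table("mv_crm_activities")                     # staging table verified in Phase 1
        .groupBy("customer_id")                               # one row per customer
        .agg(
            F.count("*").alias("activity_count"),
            F.min("activity_timestamp").alias("first_activity_at"),
            F.first("new_attribute", ignorenulls=True).alias("new_attribute"),  # newly added column
        )
)
```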
Phase 3: Validation (Dry Run)
Critical Step: Do not run the pipeline immediately. Validate the code syntax and output first.
- Isolate the Code: Copy the entire function/logic block from within the DLT definition.
- Create a Scratch Cell: Paste the code into a new, separate notebook cell.
- Modify for Display:
  - Remove or comment out the `return dlt_dataframe` statement.
  - Replace it with a display command to visualize the output (e.g., `df_final.display()`).
- Execute: Run the cell interactively. Check the output to ensure the new column appears as expected and the data looks correct (an example scratch cell is sketched below).
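For example, the scratch cell might end up looking like this (the body shown is the placeholder aggregation from Phase 2, not the real definition):

```python
# Scratch-cell sketch: the DLT function body pasted out for interactive validation.
from pyspark.sql import functions as F

df_final = (
    spark.read.table("mv_crm_activities")
        .groupBy("customer_id")
        .agg(F.first("new_attribute", ignorenulls=True).alias("new_attribute"))
)

# return df_final           # commented out: only meaningful inside the @dlt.table function
df_final.display()          # visual check of the new column instead
```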
Phase 4: Deployment & Refresh
Once validated, move the code back to production.
- Apply Changes: Paste the tested code back into the original DLT definition block.
- Select Update Strategy:
- Standard Update: If the change only affects new data moving forward, you can run a standard pipeline update.
- Full Refresh (Important): If the change alters historical logic (e.g., changing how a “first source” is calculated) or significantly changes the schema, you must perform a Full Refresh.
- Warning: Ensure you select “Full Refresh” for the specific table and any downstream tables that rely on this data.