# Data Engineering Tricks with Databricks

# General Databricks

Include functions, variables, and the like from configuration files in Databricks notebooks, similar to doing `from library import bla` in Python:

```
%run ./<path_to_file>/<config_file>
```
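
As a minimal sketch, suppose a configuration notebook at `./config/setup` (a hypothetical path) defines the shared paths used throughout this page:

```python
# Hypothetical configuration notebook at ./config/setup
bronzePath = "dbfs:/mnt/health-tracker/bronze"
silverPath = "dbfs:/mnt/health-tracker/silver"
bronzeCheckpoint = "dbfs:/mnt/health-tracker/checkpoints/bronze"
```

Running `%run ./config/setup` in another notebook then makes `bronzePath`, `silverPath`, and `bronzeCheckpoint` available there as if they had been defined locally.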

## Databricks Utilities

The `dbutils.fs` utilities give quick file-system access from a notebook:

```python
# List the files in a directory
dbutils.fs.ls(<path>)

# View the first bytes of a file
dbutils.fs.head(<path>)
```
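
For example, to list a mount point and print each entry's name and size (the path is hypothetical; `dbutils.fs.ls` returns a list of `FileInfo` objects):

```python
# Hypothetical mount point, used for illustration only
for file_info in dbutils.fs.ls("dbfs:/mnt/health-tracker/"):
    print(file_info.name, file_info.size)
```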

# Delta Table API

Delta Lake provides programmatic APIs to examine and manipulate Delta tables.

```python
from delta.tables import DeltaTable

# Load an existing Delta table by its storage path
bronzeTable = DeltaTable.forPath(spark, bronzePath)
```
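
Once loaded, the table can be examined programmatically; a small sketch, assuming `bronzePath` points at an existing Delta table:

```python
# Inspect the table's commit history as a DataFrame
bronzeTable.history().show(truncate=False)

# Convert to a plain Spark DataFrame for ad-hoc queries
bronze_df = bronzeTable.toDF()
bronze_df.printSchema()
```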

## Delta Table Update

```python
from delta.tables import DeltaTable

silverTable = DeltaTable.forPath(spark, silverPath)

# A record matches when both the event time and the device id agree
update_match = """
  health_tracker.eventtime = updates.eventtime
  AND
  health_tracker.device_id = updates.device_id
"""

# Columns to overwrite when the match condition holds
update = {"heartrate": "updates.heartrate"}

(
    silverTable.alias("health_tracker")
    .merge(updatesDF.alias("updates"), update_match)
    .whenMatchedUpdate(set=update)
    .execute()
)
```
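
Because only `whenMatchedUpdate` is given, source rows that match nothing in the silver table are ignored. For context, a sketch of what `updatesDF` might look like (hypothetical corrected readings; it only needs the columns referenced in the merge condition and the `set` mapping):

```python
from pyspark.sql.functions import col

# Hypothetical corrected reading used as the merge source
updatesDF = (
    spark.createDataFrame(
        [("2020-02-01 00:00:00", 1, 52.0)],
        ["eventtime", "device_id", "heartrate"],
    )
    .withColumn("eventtime", col("eventtime").cast("timestamp"))
)
```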

## Prevent Duplicate Writes

```python
from delta.tables import DeltaTable

silverTable = DeltaTable.forPath(spark, silverPath)

# The qualifier must match the alias given below ("health_tracker")
existing_record_match = """
    health_tracker.value = latearrivals.value
"""

(
    silverTable.alias("health_tracker")
    .merge(lateArrivalDF.alias("latearrivals"), existing_record_match)
    .whenNotMatchedInsertAll()
    .execute()
)
```
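
Because only `whenNotMatchedInsertAll()` is specified, source rows whose `value` already exists in the table are skipped rather than inserted again, so re-running the merge with the same `lateArrivalDF` writes no duplicates.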

# Streams

TIP

Best practice is to avoid the `.table()` notation, since it creates fully managed tables by writing their output to a default location on DBFS. The examples below write to an explicit path with `.start(<path>)` instead and register the table against that location afterwards.

## Create a Stream

```python
from pyspark.sql.functions import col

(
    raw_health_tracker_data_df.select(
        "datasource", "ingesttime", "value", col("ingestdate").alias("p_ingestdate")
    )
    .writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", bronzeCheckpoint)  # required for exactly-once recovery
    .partitionBy("p_ingestdate")
    .queryName("write_raw_to_bronze")
    .start(bronzePath)  # explicit path rather than a managed table
)
```
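
For completeness, `raw_health_tracker_data_df` would itself be a streaming DataFrame. A minimal sketch, assuming the raw records land as JSON files under a hypothetical `rawPath` (streaming file sources require an explicit schema):

```python
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Hypothetical schema for the raw health-tracker feed
raw_schema = StructType([
    StructField("datasource", StringType()),
    StructField("ingesttime", TimestampType()),
    StructField("value", StringType()),
    StructField("ingestdate", StringType()),
])

raw_health_tracker_data_df = (
    spark.readStream.format("json")
    .schema(raw_schema)  # mandatory for streaming file sources
    .load(rawPath)       # hypothetical landing directory
)
```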

## Display Running Streams

```python
for stream in spark.streams.active:
    print(stream.name)
```
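
The same handles can stop queries; a small sketch that stops the stream started above by its query name:

```python
# Stop a specific query by the name set with .queryName()
for stream in spark.streams.active:
    if stream.name == "write_raw_to_bronze":
        stream.stop()
```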

## Register a Table in the Metastore

- A Delta table registered in the metastore is a reference to a physical table created in object storage.
- No table repair (such as `MSCK REPAIR TABLE`) is required when new data or partitions arrive.
- AWS Glue can be used as the metastore.
```python
# Drop any previous registration of the table
spark.sql(
    """
DROP TABLE IF EXISTS health_tracker_plus_bronze
"""
)

# Register the existing Delta files at bronzePath as a table;
# no data is copied, the metastore only records the location
spark.sql(
    f"""
CREATE TABLE health_tracker_plus_bronze
USING DELTA
LOCATION '{bronzePath}'
"""
)
```
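
Once registered, the table can be queried by name while the data itself stays at `bronzePath`:

```python
# The metastore entry resolves to the Delta files at bronzePath
spark.table("health_tracker_plus_bronze").printSchema()
spark.sql("SELECT COUNT(*) FROM health_tracker_plus_bronze").show()
```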