
Data Engineering - Azure Databricks and DuckDB

GitHub site: https://github.com/mcliston/azure_databricks_dataengineering

Project intro:

Being a Bay Area sports fan, I've always wondered how the teams I grew up with compare to the dominant teams that came before them: the 2010-2014 San Francisco Giants vs. the Yankee teams of the past, or the 49ers vs. the 1970s Steelers.

Snooping around the Kaggle datasets, I came upon an NBA database dataset and thought it would be a good opportunity to do a little data sleuthing to see how the recent Golden State Warriors success compares to the Kobe Bryant/Shaquille O'Neal Lakers...

Note

Since this is a data engineering section, the topics will be, for the most part, data-engineering centric. A related Analysis/ML section will follow.


Tools used:

  • Azure Blob Storage
  • Azure Databricks (Spark, Workflows)
  • DuckDB (querying parquet files)
  • Python v3.12
  • Pandas

Note

The data transformations shown here are just a few of the many more that will be done in follow-on posts.

Azure Blob Storage

Like AWS S3, Azure Blob Storage can hold a variety of file types, which can be accessed quickly from a number of different tools. In this project I chose to store the data in parquet files because of their versatility and support for different types of compression (ZSTD, snappy, gzip, etc.), and because I wanted to see how DuckDB would handle the Azure environment.

I set up Azure Key Vault (possibly overkill, but once it's set up it's easy to work with) for authentication between Databricks notebooks and Blob storage.

Getting the storage account name, account key, and SAS token in a Databricks notebook looks like the following:

storage_account_name = dbutils.secrets.get(scope="key-vault-scope", key="storage-acct-name")
storage_account_key = dbutils.secrets.get(scope="key-vault-scope", key="storage-account-key")
sas_token = dbutils.secrets.get(scope="key-vault-scope", key="sastoken")

It's also essential to set up a secret scope for the vault (the scope name is a parameter of dbutils.secrets.get()). Microsoft's instructions for doing so are here.

Interacting with Blob Storage from a Databricks notebook

Along with the storage account name and keys, interfacing with Blob storage from a Databricks notebook also required installing the adlfs library. How it's used in a notebook is shown in the code snippet below:

from adlfs import AzureBlobFileSystem
abfs = AzureBlobFileSystem(account_name=storage_account_name, account_key=storage_account_key) ## name and key from previous code block

To browse the Blob filesystem you can run the following:

files = abfs.ls("<container-name>")
print(files)
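
Reading a transformed parquet file from Blob storage back into a Pandas DataFrame works the same way in reverse. A minimal sketch, assuming a file named game.parquet sitting in the container:

import pandas as pd

# open the blob through adlfs and let Pandas parse the parquet bytes
with abfs.open("az://<container-name>/game.parquet", "rb") as f:
    df = pd.read_parquet(f)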

Saving transformed parquet files back to Blob storage is done with the following:

file_path = "az://<container-name>/output.parquet"
with abfs.open(file_path, 'wb') as f:
    df.to_parquet(f)

## note that this save assumes a Pandas DataFrame; at the time of writing I could not save to parquet directly from a Spark DataFrame
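
For the Spark case, one possible workaround is to collect the Spark DataFrame onto the driver as Pandas first and then write it through adlfs. This is only a sketch, and it assumes the data fits in driver memory (spark_df is a hypothetical Spark DataFrame):

pandas_df = spark_df.toPandas()  # pulls all rows onto the driver

with abfs.open("az://<container-name>/output.parquet", 'wb') as f:
    pandas_df.to_parquet(f)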

Working with DuckDB in Azure (Transformation Task #1)

Working with DuckDB in Azure was more or less like working with it in any other environment. It was seamless, painless, and fast (albeit not as fast as Spark). I know that DuckDB can run queries in parallel, but I'm not sure of the details when it's running in an environment like Databricks (benchmarking and research to come).
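
For that eventual benchmarking, DuckDB does expose its thread count as an ordinary setting. A minimal sketch of checking and pinning it (the value 4 is arbitrary):

import duckdb

con = duckdb.connect()

# check how many threads DuckDB plans to use for queries
print(con.execute("SELECT current_setting('threads')").fetchone())

# pin the thread count explicitly
con.execute("SET threads TO 4")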

If you're familiar with DuckDB, you know that its allure is the ability to write SQL queries against almost any type of data file, and how well it's been optimized.
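
In this project that meant pointing DuckDB at the parquet files sitting in Blob storage by registering the adlfs filesystem from earlier. A minimal sketch, assuming the 'data' container and 'game' table that appear later in this post:

import duckdb

con = duckdb.connect()
con.register_filesystem(abfs)  # abfs is the AzureBlobFileSystem created earlier

# once the filesystem is registered, 'abfs://' paths can be used inside plain SQL
preview_df = con.execute("SELECT * FROM read_parquet('abfs://data/game.parquet') LIMIT 5").df()
print(preview_df)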

Because this is a data engineering project, I decided to make a small transformations library to keep things in order here.

How that library is used in a Databricks notebook is below:

import pandas as pd
from nba_analysis_tools.query_tools import PerTeamExtract
warriors = PerTeamExtract('Warriors', 2014, 2019, abfs_obj=abfs)
warriors_df, warriors_team_picks_df = warriors.run()

lakers = PerTeamExtract('Lakers', 1999, 2002, abfs_obj=abfs)
lakers_df, lakers_team_picks_df = lakers.run()
# making a general query for Kobe Bryant and Shaquille O'Neal since they were not drafted by the Lakers but spent the majority of their careers with the Lakers

draft_cols = lakers_team_picks_df.columns
col_q_lst = ['a.' + i for i in draft_cols]
table = 'draft_history'
q = f"""
    SELECT {",".join(col_q_lst)}
    FROM read_parquet('abfs://data/{table}.parquet') AS a
    WHERE a.player_name = 'Kobe Bryant' OR a.player_name LIKE 'Shaq%'
"""

kobe_shaq = lakers._general_query(q)
lakers_team_picks_df = pd.concat([lakers_team_picks_df, kobe_shaq], axis=0)

Desired output #1 - A simple join of the 'game' and 'other_stats' tables by game_id.
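
The query behind this join can be sketched with the same _general_query helper used above; game_id and the table names come from the dataset, while the bare SELECT * is just for illustration:

# a sketch of the join behind output #1
join_q = """
    SELECT *
    FROM read_parquet('abfs://data/game.parquet') AS g
    JOIN read_parquet('abfs://data/other_stats.parquet') AS o
        USING (game_id)
"""
joined_df = warriors._general_query(join_q)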

Desired output #2 - Extraction of all draft picks of a team from 6 years before the start of their dominance up until the end of their run.
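
Output #2 boils down to a filtered read of the draft_history table. Roughly, for the Warriors run (the team_name and season column names are assumptions about the dataset):

# a sketch of the draft-pick extraction behind output #2
draft_q = """
    SELECT *
    FROM read_parquet('abfs://data/draft_history.parquet')
    WHERE team_name LIKE '%Warriors%'
      AND season BETWEEN 2008 AND 2019  -- 6 years before the run started (2014) through its end (2019)
"""
warriors_draft_sketch = warriors._general_query(draft_q)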

Interacting with Databricks (PySpark)

To set up an interface for PySpark to communicate with Blob storage, mount the storage container to DBFS using the following:

dbutils.fs.mount(
    source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
    mount_point = "/mnt/mydata",
    extra_configs = {"fs.azure.account.key.<storage-account-name>.blob.core.windows.net": "<storage-account-key>"}
)

Note that the mount point name is arbitrary. The mount will persist across compute sessions until you unmount it with the following (within a Databricks notebook):

dbutils.fs.unmount("/mnt/<my-container>")

List the contents of the mount point:

dbutils.fs.ls("/mnt/<my-container>")

List available mount points:

dbutils.fs.mounts()

Data transformations with PySpark DataFrames (Transformation Task #2)

PySpark DataFrames behave similarly to Pandas and Polars DataFrames but are optimized to run on Spark clusters under the hood. Cleaning and transforming the data went as follows:

# import the transformed parquet files from the mounted container
lakers_df = spark.read.parquet("/mnt/process2/lakers_df.parquet")
lakers_draft_df = spark.read.parquet("/mnt/process2/lakers_team_picks_df.parquet")
warriors_df = spark.read.parquet("/mnt/process2/warriors_df.parquet")
warriors_draft_df = spark.read.parquet("/mnt/process2/team_picks_df.parquet")

# using the custom class "TeamStats" (warriors_id / lakers_id are the teams' IDs in the dataset)
warriors_team_stats = TeamStats(warriors_id, warriors_df, warriors_draft_df)
warriors_combined = warriors_team_stats.combine_stats()

lakers_team_stats = TeamStats(lakers_id, lakers_df, lakers_draft_df)
lakers_combined = lakers_team_stats.combine_stats()

Transformation 2 output #1 - A team's avg field goal percentage home/away, avg fast-break ratio home/away, avg points-in-the-paint home/away, and avg 3-pt percentage home/away.

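The kind of aggregation behind this output can be sketched in plain PySpark. This is not the actual TeamStats internals; the is_home flag and stat column names are assumptions:

from pyspark.sql import functions as F

# average each stat separately for home and away games
home_away_avgs = (
    warriors_df
    .withColumn("venue", F.when(F.col("is_home") == 1, "home").otherwise("away"))
    .groupBy("venue")
    .agg(
        F.avg("fg_pct").alias("avg_fg_pct"),
        F.avg("fg3_pct").alias("avg_3pt_pct"),
        F.avg("pts_in_paint").alias("avg_pts_in_paint"),
        F.avg("fast_break_pts").alias("avg_fast_break_pts"),
    )
)
home_away_avgs.show()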

Orchestrating the Transformations - Databricks Workflows

With the transformation notebooks in place, it's time to execute and automate them. With the many orchestration libraries available today (Dagster, Luigi, Prefect, Airflow, etc.), it's hard to make a decision because they all do the job well and have great features. In this case, the built-in Databricks Workflows are a great option because they're integral to Databricks itself.
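
I set the workflow up through the UI (steps below), but for reference the same two-notebook pipeline can also be defined in code. This is only a sketch using the Databricks Python SDK (databricks-sdk); the notebook paths and cluster ID are placeholders:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# two notebook tasks: the DuckDB extract runs first, the PySpark transformation depends on it
w.jobs.create(
    name="nba-transformations",
    tasks=[
        jobs.Task(
            task_key="duckdb_extract",
            notebook_task=jobs.NotebookTask(notebook_path="/Workspace/<path-to-extract-notebook>"),
            existing_cluster_id="<cluster-id>",
        ),
        jobs.Task(
            task_key="pyspark_transform",
            depends_on=[jobs.TaskDependency(task_key="duckdb_extract")],
            notebook_task=jobs.NotebookTask(notebook_path="/Workspace/<path-to-transform-notebook>"),
            existing_cluster_id="<cluster-id>",
        ),
    ],
)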


Setting up a workflow is just a couple of menu selections down from where notebooks are started:

Start the workflow

Assign notebooks to the workflow

Conclusion

Running the workflow saves the transformed parquet files back into an assigned results container in Blob storage.

Next, I'll continue subsetting the NBA dataset in search of the identities of these two basketball teams and what made them so successful. If I stay within the Azure/Databricks ecosystem, I'll probably stick solely with PySpark since it's a ton faster (though DuckDB did really well regardless).