Example: Testing an ETL notebook

In this example, we show how to test an ETL notebook using DatabricksTestingTools.

Context

The ETL process consists of reading a file from Azure Data Lake Storage and uploading the data into a Snowflake table. The example notebook and its test notebook are available in the example directory of the repository PR.Data.DataFoundation.DatabricksTestingTools.

Here is an overview of the notebook to test:

# Step 1: extract
data_lake = ADLS(
    azure_client_id=azure_client_id,
    azure_client_secret=azure_client_secret,
    azure_tenant_id=azure_tenant_id,
    azure_storage_account_name=azure_storage_account_name,
    azure_datalake_file_system=azure_datalake_file_system
)
data = data_lake.read_file_as(key=adls_file, input_format='csv', output_format='pandas')

# COMMAND ----------

# Step 2: transform
data = data[data["category"] == "software"]

# COMMAND ----------

# Step 3: load
from snowflake.connector.pandas_tools import pd_writer  # writer method used by to_sql below

snowflake = Snowflake(snowflake_username,
                      snowflake_password,
                      snowflake_database,
                      snowflake_warehouse,
                      snowflake_schema,
                      snowflake_role,
                      snowflake_account)
snowflake.connect()
data.to_sql(name='tech_crunch', con=snowflake.connection, if_exists="replace", index=False, method=pd_writer)

The test notebook is built in the following steps:

Step 1: Create the test notebook

To test the notebook, we create a test notebook whose name starts with test_.
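
For instance, given the relative path used later in this example (../notebooks/etl_notebook), a hypothetical workspace layout could be:

notebooks/etl_notebook       # notebook under test
tests/test_etl_notebook      # test notebook, name starts with test_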

Step 2: [Optional] Declare the output widget parameter

The test notebook may declare a widget parameter called output, which represents the directory where the XML test results and coverage reports will be generated. The output parameter is optional; if it is not provided, test and coverage results are printed to the console.

# COMMAND ----------

dbutils.widgets.text("output", "")

# COMMAND ----------

output = dbutils.widgets.get("output")
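
When the test notebook is launched from another notebook or a job, output can be passed like any other widget argument. A hypothetical invocation (the path and output directory are illustrative) could look like:

dbutils.notebook.run("tests/test_etl_notebook", 0, {"output": "dbfs:/tmp/test_results"})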

Step 3: Define the Test Class

The test notebook must define a test class that extends the DatabricksTestCase Python class. DatabricksTestCase is a Python unittest base class.

from databricks_testing_tools.test_case import DatabricksTestCase

class TestEtl(DatabricksTestCase):
    ...

This test class must contain all the tests of the notebook under test.

In this example, we can test that the upload succeeded by checking the content of the Snowflake table. Here we check that the CATEGORY column is present in the table and only contains the value software.

from databricks_testing_tools.test_case import DatabricksTestCase

class TestEtl(DatabricksTestCase):
    def test_data_load(self):
        # launch etl notebook
        dbutils.notebook.run("../notebooks/etl_notebook", 0, {"adls_file": "test/TechCrunchcontinentalUSA.csv"})

        # get data from the database
        cur = self.snowflake_connector.cursor()
        cur.execute(f"SELECT * FROM {self.snowflake_database}.{self.snowflake_schema}.{self.table_name}")
        result_df = cur.fetch_pandas_all()

        # check that CATEGORY column is present and only contains software
        self.assertIn("CATEGORY", list(result_df.columns))
        self.assertEqual(["software"], list(result_df['CATEGORY'].unique()))

Step 4: Execute the tests

Finally, the test notebook must trigger the test execution by calling the execute_tests method.

TestEtl().execute_tests(output=output)

You are now ready to launch the test notebook by deploying it to your Databricks workspace with DatabricksTestingTools, either from your laptop or from an Azure DevOps pipeline using the provided template.