Skip to content


Merge pull request #84 from CloudFormations/develop
Browse files Browse the repository at this point in the history
  • Loading branch information
MattPCollins committed Jul 22, 2024
2 parents edf5559 + ad8f100 commit 8996208
Show file tree
Hide file tree
Showing 6 changed files with 628 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# Databricks notebook source
# MAGIC %md
# MAGIC #Transform Check Functionality
# MAGIC - Check payload validity
# MAGIC - Confirm storage is accessible
# MAGIC - Create Delta Table, if required
# MAGIC #TODO items:
# MAGIC - Unit tests

# COMMAND ----------

# MAGIC %run ../utils/Initialise

# COMMAND ----------

# MAGIC %run ../utils/CheckPayloadFunctions

# COMMAND ----------

# MAGIC %run ./utils/CheckPayloadFunctions

# COMMAND ----------

# MAGIC %run ./utils/ConfigurePayloadVariables

# COMMAND ----------

dbutils.widgets.text("Notebook Payload","")
dbutils.widgets.text("Pipeline Run Id","")
#Remove Widgets
#dbutils.widgets.remove("<widget name>")

# COMMAND ----------

import json

# payload = json.loads(dbutils.widgets.get("Notebook Payload"))
payload = {
"ComputeWorkspaceURL": "",
"ComputeClusterId": "0428-080256-zv6uka2b",
"ComputeSize": "Standard_D4ds_v4",
"ComputeVersion": "14.3.x-scala2.12",
"CountNodes": 1,
"ComputeLinkedServiceName": "Transform_LS_Databricks_Cluster_MIAuth",
"ComputeResourceName": "cumulusdatabricksdev",
"ResourceGroupName": "CumulusFrameworkDev",
"SubscriptionId": "1b2b1db2-3735-4a51-86a5-18fa41b8bb49",
"CuratedStorageName": "cumulusframeworkdev",
"CuratedContainerName": "curated",
"CleansedStorageName": "cumulusframeworkdev",
"CleansedContainerName": "cleansed",
"CuratedStorageAccessKey": "cumulusframeworkdevcuratedaccesskey",
"CleansedStorageAccessKey": "cumulusframeworkdevcleansedaccesskey",
"DatasetName": "WordDictionary",
"SchemaName": "Dimensions",
"BusinessLogicNotebookPath": "/Workspace/Repos/[email protected]/CF.Cumulus/src/azure.databricks/python/notebooks/transform/businesslogicnotebooks/Dimensions/WordDictionary",
"ExecutionNotebookPath": "/Workspace/Repos/[email protected]/CF.Cumulus/src/azure.databricks/python/notebooks/transform/CreateDimensionTable",
"ColumnsList": "WordIndex,English,Hiragana,Kanji,DateAdded,Rank,Active",
"SurrogateKey": "WordDictionaryId",
"BkAttributesList": "WordIndex",
"PartitionByAttributesList": "",
"LoadType": "I",
"LastLoadDate": "2024-06-21T09:03:29.9233333Z"

# COMMAND ----------

[cleansedSecret, cleansedStorageName, cleansedContainerName, curatedSecret, curatedStorageName, curatedContainerName, curatedSchemaName, curatedDatasetName, columnsList, columnTypeList, bkList, partitionList, surrogateKey, loadType, businessLogicNotebookPath]= getTransformPayloadVariables(payload)

# COMMAND ----------

# MAGIC %md
# MAGIC # Initialisation

# COMMAND ----------

print("Setting cleansed ABFSS config...")
setAbfssSparkConfig(cleansedSecret, cleansedStorageName)

print("Setting curated ABFSS config...")
setAbfssSparkConfig(curatedSecret, curatedStorageName)

# COMMAND ----------

print("Setting cleansed ABFSS path...")
cleansedAbfssPath = setAbfssPath(cleansedStorageName, cleansedContainerName)

print("Setting curated ABFSS path...")
curatedAbfssPath = setAbfssPath(curatedStorageName, curatedContainerName)

# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Payload Validity

# COMMAND ----------

# Check data types and nullability of each dictionary element
checkLoadAction(loadAction = loadType)

# COMMAND ----------

checkMergeAndPKConditions(loadAction = loadType, pkList=bkList)

# COMMAND ----------

checkContainerName(containerName = cleansedContainerName)

# COMMAND ----------

checkContainerName(containerName = curatedContainerName)

# COMMAND ----------


# COMMAND ----------

sizeInBytes = spark.sql("describe detail dimensions.worddictionary").select("sizeInBytes").collect()[0][0]
partitionByThreshold = (1024**4) # 1TB
compareDeltaTableSizeVsPartitionThreshold(sizeInBytes, partitionByThreshold)

# COMMAND ----------


# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Storage accessibility

# COMMAND ----------

# Check cleansed storage account exists and is accessible.

# Check curated storage account exists and is accessible.

# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Delta Schema created

# COMMAND ----------

schemaExists = checkExistsDeltaSchema(schemaName = curatedSchemaName)

# COMMAND ----------

# MAGIC %md
# MAGIC # Check: Delta Table created

# COMMAND ----------

curatedTablePath = setTablePath(schemaName =curatedSchemaName, tableName =curatedDatasetName)

# COMMAND ----------

# add loadtype to sp results + variables
# rename existing loadType to loadAction in the prevtests
tableExists = checkExistsDeltaTable(tablePath = curatedTablePath, loadAction = loadType, loadType = loadType)

# COMMAND ----------

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Databricks notebook source
# MAGIC %run ../utils/CheckPayloadFunctions

# COMMAND ----------

import unittest

# COMMAND ----------

class TestCheckSurrogateKey(unittest.TestCase):
def test_checkSurrogateKeyValidString(self):
actual = checkSurrogateKey(surrogateKey="StringValue")
expected = True
self.assertEqual(actual, expected)

def test_checkSurrogateKeyBlankString(self):
with self.assertRaises(Exception) as context:
expected = "Surrogate Key is a blank string. Please ensure this is populated for Curated tables."
self.assertTrue(expected in str(context.exception))

# COMMAND ----------

# class TestCheckExistsNotebook(unittest.TestCase):
# def test_checkExistsNotebookValidPath(self):
# actual = checkExistsNotebook(notebookPath="StringValue")
# expected = True
# self.assertEqual(actual, expected)

# def test_checkExistsNotebookInvalidPath(self):
# with self.assertRaises(Exception) as context:
# checkExistsNotebook(notebookPath="StringValue2")
# expected = "Notebook cannot be found at this path. Please ensure this has been created in the appropriate directory."
# self.assertTrue(expected in str(context.exception))

# COMMAND ----------

class TestCompareDeltaTableSizeVsPartitionThreshold(unittest.TestCase):

def test_compareDeltaTableSizeVsPartitionThresholdTableSmaller(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=100, partitionByThreshold=(1024**4))
expected = 'Delta table samller than partition by threshold. Partitions not advised unless other requirements, such as RLS.'
self.assertEqual(actual, expected)

def test_compareDeltaTableSizeVsPartitionThresholdTableSmaller2(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=(1024**3), partitionByThreshold=(1024**4))
expected = 'Delta table samller than partition by threshold. Partitions not advised unless other requirements, such as RLS.'
self.assertEqual(actual, expected)

def test_compareDeltaTableSizeVsPartitionThresholdTableSmaller3(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=100, partitionByThreshold=101)
expected = 'Delta table samller than partition by threshold. Partitions not advised unless other requirements, such as RLS.'
self.assertEqual(actual, expected)

def test_compareDeltaTableSizeVsPartitionThresholdEqualSize(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=(1024**4), partitionByThreshold=(1024**4))
expected = 'Delta table equal size to partition by threshold. Partitions may be used.'
self.assertEqual(actual, expected)

def test_compareDeltaTableSizeVsPartitionThresholdEqualSize2(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=100, partitionByThreshold=100)
expected = 'Delta table equal size to partition by threshold. Partitions may be used.'
self.assertEqual(actual, expected)

def test_compareDeltaTableSizeVsPartitionThresholdEqualSize3(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=1099511627776, partitionByThreshold=(1024**4))
expected = 'Delta table equal size to partition by threshold. Partitions may be used.'
self.assertEqual(actual, expected)

def test_compareDeltaTableSizeVsPartitionThresholdTableBigger(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=101, partitionByThreshold=100)
expected = 'Delta table bigger than partition by threshold. Partitions may be used'
self.assertEqual(actual, expected)

def test_compareDeltaTableSizeVsPartitionThresholdTableBigger2(self):
actual = compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes=1099511627777, partitionByThreshold=(1024**4))
expected = 'Delta table bigger than partition by threshold. Partitions may be used'
self.assertEqual(actual, expected)

# COMMAND ----------

class TestCheckEmptyPartitionByFields(unittest.TestCase):

def test_checkEmptyPartitionByFieldsTableExistsSingleElement(self):
actual = checkEmptyPartitionByFields(partitionList=['sampleField'])
expected = False
self.assertEqual(actual, expected)

def test_checkEmptyPartitionByFieldsTableExistsMultipleElements(self):
actual = checkEmptyPartitionByFields(partitionList=['sampleField1','sampleField2'])
expected = False
self.assertEqual(actual, expected)

def test_checkEmptyPartitionByFieldsTableEmpty(self):
actual = checkEmptyPartitionByFields(partitionList=[])
expected = True
self.assertEqual(actual, expected)

def test_checkEmptyPartitionByFieldsPartitionFieldsNone(self):
with self.assertRaises(Exception) as context:
expected = "PartitionBy fields input as None value. Please review."
self.assertTrue(expected in str(context.exception))

# COMMAND ----------

r = unittest.main(argv=[''], verbosity=2, exit=False)
assert r.result.wasSuccessful(), 'Test failed; see logs above'
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Databricks notebook source
def checkSurrogateKey(surrogateKey: str) -> bool:
if surrogateKey == "":
raise Exception("Surrogate Key is a blank string. Please ensure this is populated for Curated tables.")
elif surrogateKey != "":
print(f"SurrogateKey value {surrogateKey} is non-blank.")
return True
raise Exception("Unexpected state.")

# COMMAND ----------

# # Requires API as can't iterate through notebooks...
# def checkExistsNotebook(notebookPath: str) -> bool:
# raise NotImplementedError

# class TestCheckExistsNotebook(unittest.TestCase):
# def test_checkExistsNotebookValidPath(self):
# actual = checkExistsNotebook(notebookPath="StringValue")
# expected = True
# self.assertEqual(actual, expected)

# def test_checkExistsNotebookInvalidPath(self):
# with self.assertRaises(Exception) as context:
# checkExistsNotebook(notebookPath="StringValue2")
# expected = "Notebook cannot be found at this path. Please ensure this has been created in the appropriate directory."
# self.assertTrue(expected in str(context.exception))

# r = unittest.main(argv=[''], verbosity=2, exit=False)
# assert r.result.wasSuccessful(), 'Test failed; see logs above'

# COMMAND ----------

def compareDeltaTableSizeVsPartitionThreshold(deltaTableSizeInBytes:int, partitionByThreshold: int):
if deltaTableSizeInBytes > partitionByThreshold:
return 'Delta table bigger than partition by threshold. Partitions may be used'
elif deltaTableSizeInBytes < partitionByThreshold:
return 'Delta table samller than partition by threshold. Partitions not advised unless other requirements, such as RLS.'
elif deltaTableSizeInBytes == partitionByThreshold:
return 'Delta table equal size to partition by threshold. Partitions may be used.'
raise Exception('Unexpected state. Please investigate value provided for sizeInBytes and partitionByThreshold.')

# COMMAND ----------

# Advisory check partitionby not used for small tables (may have RLS use case)
def checkEmptyPartitionByFields(partitionList:list()) -> bool:
if partitionList == []:
print('Empty list passed to partition fields value. No action required.')
return True
elif partitionList is None:
raise Exception('PartitionBy fields input as None value. Please review.')
elif partitionList != []:
print('Non-empty list passed to partition fields value. Please confirm partitioning is required based on Delta table size and RLS requirements.')
return False
raise Exception('Unexpected state. Please investigate value provided for partitionList.')
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ def checkMergeAndPKConditions(loadAction:str, pkList: list()) -> None:
pkList (list): The primary keys supplied from the payload.
if loadAction.upper() == "I" and len(pkList) > 0:
print(f'Incremental loading configured with primary keys. This is a valid combination.')
print(f'Incremental loading configured with primary/business keys. This is a valid combination.')
elif loadAction.upper() == "F" and len(pkList) > 0:
print(f'Full loading configured with primary keys. This is a valid combination.')
print(f'Full loading configured with primary/business keys. This is a valid combination.')
elif loadAction.upper() == "F" and len(pkList) == 0:
print(f'Full loading configured with no primary keys. This is a valid combination, assuming no subsequent incremental loads are due to take place.')
print(f'Full loading configured with no primary/business keys. This is a valid combination, assuming no subsequent incremental loads are due to take place.')
elif loadAction.upper() == "I" and len(pkList) == 0:
raise Exception(f'Incremental loading configured with no primary keys. This is not a valid combination and will result in merge failures as no merge criteria can be specified.')
raise Exception(f'Incremental loading configured with no primary/business keys. This is not a valid combination and will result in merge failures as no merge criteria can be specified.')
raise Exception('Unexpected state.')

Expand All @@ -52,7 +52,7 @@ def checkContainerName(containerName: str) -> None:
containers = [
if containerName in containers:
print(f'container name {containerName} is supported.')
Expand Down

0 comments on commit 8996208

Please sign in to comment.