diff --git a/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py b/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py index 4548aff1..ef260970 100644 --- a/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py +++ b/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py @@ -27,9 +27,6 @@ def test_list_blobs(pipeline: Pipeline) -> None: # Arrange activity = pipeline.get_activity_by_name("List Folders") state = PipelineRunState( - variables=[ - PipelineRunVariable(name="SourceContainerName", default_value="source"), - ], parameters=[ RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"), RunParameter( @@ -55,9 +52,6 @@ def test_for_each(pipeline: Pipeline) -> None: # Arrange activity = pipeline.get_activity_by_name("For Each SourceFolder") state = PipelineRunState( - variables=[ - PipelineRunVariable(name="SourceContainerName", default_value="source"), - ], parameters=[ RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"), RunParameter( diff --git a/examples/synapse/copy_blobs/pipeline/copy_blobs.json b/examples/synapse/copy_blobs/pipeline/copy_blobs.json new file mode 100644 index 00000000..ec07c6d9 --- /dev/null +++ b/examples/synapse/copy_blobs/pipeline/copy_blobs.json @@ -0,0 +1,172 @@ +{ + "name": "copy_blobs", + "properties": { + "activities": [ + { + "name": "List Folders", + "type": "WebActivity", + "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "method": "GET", + "headers": { + "x-ms-version": "2023-01-03" + }, + "url": { + "value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net/',pipeline().parameters.SourceContainerName,'?restype=container&comp=list&prefix=',pipeline().parameters.SourceFolderPrefix,'&delimiter=$SourceBlobDelimiter')", + "type": "Expression" + }, + "connectVia": { + "referenceName": "AutoResolveIntegrationRuntime", + "type": "IntegrationRuntimeReference" + }, + "authentication": { + "type": "MSI", + "resource": "https://storage.azure.com" + } + } + }, + { + "name": "For Each SourceFolder", + "type": "ForEach", + "dependsOn": [ + { + "activity": "List Folders", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "items": { + "value": "@xpath(xml(activity('List Folders').output.Response),'/EnumerationResults/Blobs/BlobPrefix/Name/text()')", + "type": "Expression" + }, + "activities": [ + { + "name": "Copy files to Destination", + "type": "Copy", + "dependsOn": [], + "policy": { + "timeout": "0.12:00:00", + "retry": 0, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "source": { + "type": "ParquetSource", + "storeSettings": { + "type": "AzureBlobStorageReadSettings", + "recursive": true, + "wildcardFolderPath": { + "value": "@item()", + "type": "Expression" + }, + "wildcardFileName": "*.parquet" + }, + "formatSettings": { + "type": "ParquetReadSettings" + } + }, + "sink": { + "type": "ParquetSink", + "storeSettings": { + "type": "AzureBlobStorageWriteSettings", + "copyBehavior": "FlattenHierarchy" + }, + "formatSettings": { + "type": "ParquetWriteSettings" + } + }, + "enableStaging": false, + "translator": { + "type": "TabularTranslator", + "typeConversion": true, + "typeConversionSettings": { + "allowDataTruncation": true, + "treatBooleanAsNumber": false + } + } + }, + "inputs": [ + { + "referenceName": "Binary", + "type": "DatasetReference", + "parameters": { + "ServiceURI": { + "value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net')", + "type": "Expression" + }, + "ContainerName": { + "value": "@pipeline().parameters.SourceContainerName", + "type": "Expression" + }, + "FolderName": { + "value": "@pipeline().parameters.SourceFolderPrefix", + "type": "Expression" + } + } + } + ], + "outputs": [ + { + "referenceName": "Binary", + "type": "DatasetReference", + "parameters": { + "ServiceURI": { + "value": "@concat('https://',pipeline().parameters.SinkStorageAccountName,'.blob.core.windows.net')", + "type": "Expression" + }, + "ContainerName": { + "value": "@pipeline().parameters.SinkContainerName", + "type": "Expression" + }, + "FolderName": { + "value": "@pipeline().parameters.SinkFolderName", + "type": "Expression" + } + } + } + ] + } + ] + } + } + ], + "parameters": { + "SourceContainerName": { + "type": "string" + }, + "SourceFolderPrefix": { + "type": "string" + }, + "SinkStorageAccountName": { + "type": "string" + }, + "SinkContainerName": { + "type": "string" + }, + "SinkFolderName": { + "type": "string" + }, + "SourceStorageAccountName": { + "type": "string" + } + }, + "folder": { + "name": "batch" + }, + "annotations": [] + } +} \ No newline at end of file diff --git a/examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py b/examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py new file mode 100644 index 00000000..50911328 --- /dev/null +++ b/examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py @@ -0,0 +1,75 @@ +import pytest +from data_factory_testing_framework import TestFramework, TestFrameworkType +from data_factory_testing_framework.state import ( + DependencyCondition, + RunParameter, + RunParameterType, +) + + +def test_copy_blobs_pipeline(request: pytest.FixtureRequest) -> None: + # Arrange + test_framework = TestFramework( + framework_type=TestFrameworkType.DataFactory, root_folder_path=request.fspath.dirname + ) + pipeline = test_framework.get_pipeline_by_name("copy_blobs") + + # Act + activities = test_framework.evaluate_pipeline( + pipeline=pipeline, + parameters=[ + RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorageaccount"), + RunParameter(RunParameterType.Pipeline, "SourceContainerName", "sourcecontainer"), + RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "sourcefolder"), + RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorageaccount"), + RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"), + RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"), + ], + ) + + # Assert + list_folder_activity = next(activities) + assert list_folder_activity.name == "List Folders" + assert ( + list_folder_activity.type_properties["url"].result + == "https://sourcestorageaccount.blob.core.windows.net/sourcecontainer?restype=container&comp=list&prefix=sourcefolder&delimiter=$SourceBlobDelimiter" + ) + assert list_folder_activity.type_properties["method"] == "GET" + list_folder_activity.set_result( + result=DependencyCondition.SUCCEEDED, + output={ + "Response": """ + + testfolder + $SourceBlobDelimiter + + + testfolder_1/$SourceBlobDelimiter + + + testfolder_2/$SourceBlobDelimiter + + + + """ + }, + ) + + copy_activity = next(activities) + + assert copy_activity.name == "Copy files to Destination" + assert copy_activity.type == "Copy" + assert ( + copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result + == "testfolder_1/$SourceBlobDelimiter" + ) + + copy_activity = next(activities) + assert copy_activity.name == "Copy files to Destination" + assert copy_activity.type == "Copy" + assert ( + copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result + == "testfolder_2/$SourceBlobDelimiter" + ) + + pytest.raises(StopIteration, lambda: next(activities)) diff --git a/examples/synapse/copy_blobs/test_synapse_copy_blobs_unit.py b/examples/synapse/copy_blobs/test_synapse_copy_blobs_unit.py new file mode 100644 index 00000000..825a464c --- /dev/null +++ b/examples/synapse/copy_blobs/test_synapse_copy_blobs_unit.py @@ -0,0 +1,131 @@ +import pytest +from data_factory_testing_framework import TestFramework, TestFrameworkType +from data_factory_testing_framework.models import Pipeline +from data_factory_testing_framework.models.activities import Activity, ForEachActivity +from data_factory_testing_framework.state import ( + PipelineRunState, + PipelineRunVariable, + RunParameter, + RunParameterType, +) + + +@pytest.fixture +def test_framework(request: pytest.FixtureRequest) -> TestFramework: + return TestFramework( + framework_type=TestFrameworkType.Synapse, + root_folder_path=request.fspath.dirname, + ) + + +@pytest.fixture +def pipeline(test_framework: TestFramework) -> Pipeline: + return test_framework.get_pipeline_by_name("copy_blobs") + + +def test_list_blobs(pipeline: Pipeline) -> None: + # Arrange + activity = pipeline.get_activity_by_name("List Folders") + state = PipelineRunState( + parameters=[ + RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorage"), + RunParameter( + RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba" + ), + RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder"), + ], + ) + + # Act + activity.evaluate(state) + + # Assert + assert activity.name == "List Folders" + assert ( + activity.type_properties["url"].result + == "https://sourcestorage.blob.core.windows.net/container-8b6b545b-c583-4a06-adf7-19ff41370aba?restype=container&comp=list&prefix=testfolder&delimiter=$SourceBlobDelimiter" + ) + assert activity.type_properties["method"] == "GET" + +def test_for_each(pipeline: Pipeline) -> None: + #Arrange + activity = pipeline.get_activity_by_name("For Each SourceFolder") + state = PipelineRunState( + parameters=[ + RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorage"), + RunParameter( + RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba" + ), + RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder") + ] + ) + state.add_activity_result( + activity_name="List Folders", + status="Succeeded", + output={ + "Response": """ + + testfolder + $SourceBlobDelimiter + + + testfolder_1/$SourceBlobDelimiter + + + testfolder_2/$SourceBlobDelimiter + + + + """ + }, + ) + + # Act + activity.evaluate(state) + + # Assert + assert activity.name == "For Each SourceFolder" + assert activity.type_properties["items"].result == [ + "testfolder_1/$SourceBlobDelimiter", + "testfolder_2/$SourceBlobDelimiter", + ] + assert len(activity.type_properties["activities"]) == 1 + assert activity.type_properties["activities"][0]["name"] == "Copy files to Destination" + assert activity.type_properties["activities"][0]["type"] == "Copy" + + +def _get_child_activity_by_name(foreach_activity: ForEachActivity, name: str) -> Activity: + return next(activity for activity in foreach_activity.activities if activity.name == name) + + +@pytest.mark.parametrize( + "wildcardfolderpath", + [ + ("testfolder_1/$SourceBlobDelimiter"), + ("testfolder_2/$SourceBlobDelimiter"), + ], +) +def test_copy_blobs_activity(pipeline: Pipeline, wildcardfolderpath: str) -> None: + # Arrange + foreach_activity = pipeline.get_activity_by_name("For Each SourceFolder") + activity = _get_child_activity_by_name(foreach_activity, "Copy files to Destination") + state = PipelineRunState( + parameters=[ + RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorage"), + RunParameter( + RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba" + ), + RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder"), + RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorage"), + RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"), + RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"), + ], + iteration_item=wildcardfolderpath, + ) + + # Act + activity.evaluate(state) + + # Assert + assert activity.name == "Copy files to Destination" + assert activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result == wildcardfolderpath