diff --git a/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py b/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py
index 4548aff1..6b8feebf 100644
--- a/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py
+++ b/examples/data_factory/copy_blobs/test_data_factory_copy_blobs_unit.py
@@ -4,7 +4,6 @@
from data_factory_testing_framework.models.activities import Activity, ForEachActivity
from data_factory_testing_framework.state import (
PipelineRunState,
- PipelineRunVariable,
RunParameter,
RunParameterType,
)
@@ -27,9 +26,6 @@ def test_list_blobs(pipeline: Pipeline) -> None:
# Arrange
activity = pipeline.get_activity_by_name("List Folders")
state = PipelineRunState(
- variables=[
- PipelineRunVariable(name="SourceContainerName", default_value="source"),
- ],
parameters=[
RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
RunParameter(
@@ -55,9 +51,6 @@ def test_for_each(pipeline: Pipeline) -> None:
# Arrange
activity = pipeline.get_activity_by_name("For Each SourceFolder")
state = PipelineRunState(
- variables=[
- PipelineRunVariable(name="SourceContainerName", default_value="source"),
- ],
parameters=[
RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
RunParameter(
diff --git a/examples/synapse/copy_blobs/README.md b/examples/synapse/copy_blobs/README.md
new file mode 100644
index 00000000..83c4f396
--- /dev/null
+++ b/examples/synapse/copy_blobs/README.md
@@ -0,0 +1,12 @@
+# Copy Blobs
+
+This example pipeline lists all the blobs in a given container and copies them to another container.
+
+![image](copy_blobs.png)
+
+The pipeline has two activities:
+
+1. **List folders**: Web activity that lists the blobs in a container whose names start with a given prefix.
+2. **For each activity**: Iterates over each item in the list returned above and executes the sub-activity on each item.
+
+ 2.1. **Copy files to destination**: Copy activity which copies the blobs to a given destination.
diff --git a/examples/synapse/copy_blobs/copy_blobs.png b/examples/synapse/copy_blobs/copy_blobs.png
new file mode 100644
index 00000000..068725e4
Binary files /dev/null and b/examples/synapse/copy_blobs/copy_blobs.png differ
diff --git a/examples/synapse/copy_blobs/pipeline/copy_blobs.json b/examples/synapse/copy_blobs/pipeline/copy_blobs.json
new file mode 100644
index 00000000..3fcaaf00
--- /dev/null
+++ b/examples/synapse/copy_blobs/pipeline/copy_blobs.json
@@ -0,0 +1,172 @@
+{
+ "name": "copy_blobs",
+ "properties": {
+ "activities": [
+ {
+ "name": "List Folders",
+ "type": "WebActivity",
+ "dependsOn": [],
+ "policy": {
+ "timeout": "0.12:00:00",
+ "retry": 0,
+ "retryIntervalInSeconds": 30,
+ "secureOutput": false,
+ "secureInput": false
+ },
+ "userProperties": [],
+ "typeProperties": {
+ "method": "GET",
+ "headers": {
+ "x-ms-version": "2023-01-03"
+ },
+ "url": {
+ "value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net/',pipeline().parameters.SourceContainerName,'?restype=container&comp=list&prefix=',pipeline().parameters.SourceFolderPrefix,'&delimiter=$SourceBlobDelimiter')",
+ "type": "Expression"
+ },
+ "connectVia": {
+ "referenceName": "AutoResolveIntegrationRuntime",
+ "type": "IntegrationRuntimeReference"
+ },
+ "authentication": {
+ "type": "MSI",
+ "resource": "https://storage.azure.com"
+ }
+ }
+ },
+ {
+ "name": "For Each SourceFolder",
+ "type": "ForEach",
+ "dependsOn": [
+ {
+ "activity": "List Folders",
+ "dependencyConditions": [
+ "Succeeded"
+ ]
+ }
+ ],
+ "userProperties": [],
+ "typeProperties": {
+ "items": {
+ "value": "@xpath(xml(activity('List Folders').output.Response),'/EnumerationResults/Blobs/BlobPrefix/Name/text()')",
+ "type": "Expression"
+ },
+ "activities": [
+ {
+ "name": "Copy files to Destination",
+ "type": "Copy",
+ "dependsOn": [],
+ "policy": {
+ "timeout": "0.12:00:00",
+ "retry": 0,
+ "retryIntervalInSeconds": 30,
+ "secureOutput": false,
+ "secureInput": false
+ },
+ "userProperties": [],
+ "typeProperties": {
+ "source": {
+ "type": "ParquetSource",
+ "storeSettings": {
+ "type": "AzureBlobStorageReadSettings",
+ "recursive": true,
+ "wildcardFolderPath": {
+ "value": "@item()",
+ "type": "Expression"
+ },
+ "wildcardFileName": "*.parquet"
+ },
+ "formatSettings": {
+ "type": "ParquetReadSettings"
+ }
+ },
+ "sink": {
+ "type": "ParquetSink",
+ "storeSettings": {
+ "type": "AzureBlobStorageWriteSettings",
+ "copyBehavior": "FlattenHierarchy"
+ },
+ "formatSettings": {
+ "type": "ParquetWriteSettings"
+ }
+ },
+ "enableStaging": false,
+ "translator": {
+ "type": "TabularTranslator",
+ "typeConversion": true,
+ "typeConversionSettings": {
+ "allowDataTruncation": true,
+ "treatBooleanAsNumber": false
+ }
+ }
+ },
+ "inputs": [
+ {
+ "referenceName": "Binary",
+ "type": "DatasetReference",
+ "parameters": {
+ "ServiceURI": {
+ "value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net')",
+ "type": "Expression"
+ },
+ "ContainerName": {
+ "value": "@pipeline().parameters.SourceContainerName",
+ "type": "Expression"
+ },
+ "FolderName": {
+ "value": "@pipeline().parameters.SourceFolderPrefix",
+ "type": "Expression"
+ }
+ }
+ }
+ ],
+ "outputs": [
+ {
+ "referenceName": "Binary",
+ "type": "DatasetReference",
+ "parameters": {
+ "ServiceURI": {
+ "value": "@concat('https://',pipeline().parameters.SinkStorageAccountName,'.blob.core.windows.net')",
+ "type": "Expression"
+ },
+ "ContainerName": {
+ "value": "@pipeline().parameters.SinkContainerName",
+ "type": "Expression"
+ },
+ "FolderName": {
+ "value": "@pipeline().parameters.SinkFolderName",
+ "type": "Expression"
+ }
+ }
+ }
+ ]
+ }
+ ]
+ }
+ }
+ ],
+ "parameters": {
+ "SourceContainerName": {
+ "type": "string"
+ },
+ "SourceFolderPrefix": {
+ "type": "string"
+ },
+ "SinkStorageAccountName": {
+ "type": "string"
+ },
+ "SinkContainerName": {
+ "type": "string"
+ },
+ "SinkFolderName": {
+ "type": "string"
+ },
+ "SourceStorageAccountName": {
+ "type": "string"
+ }
+ },
+ "folder": {
+ "name": "batch"
+ },
+ "annotations": []
+ }
+}
diff --git a/examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py b/examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py
new file mode 100644
index 00000000..50911328
--- /dev/null
+++ b/examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py
@@ -0,0 +1,75 @@
+import pytest
+from data_factory_testing_framework import TestFramework, TestFrameworkType
+from data_factory_testing_framework.state import (
+ DependencyCondition,
+ RunParameter,
+ RunParameterType,
+)
+
+
+def test_copy_blobs_pipeline(request: pytest.FixtureRequest) -> None:
+ # Arrange
+ test_framework = TestFramework(
+ framework_type=TestFrameworkType.DataFactory, root_folder_path=request.fspath.dirname
+ )
+ pipeline = test_framework.get_pipeline_by_name("copy_blobs")
+
+ # Act
+ activities = test_framework.evaluate_pipeline(
+ pipeline=pipeline,
+ parameters=[
+ RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorageaccount"),
+ RunParameter(RunParameterType.Pipeline, "SourceContainerName", "sourcecontainer"),
+ RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "sourcefolder"),
+ RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorageaccount"),
+ RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"),
+ RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"),
+ ],
+ )
+
+ # Assert
+ list_folder_activity = next(activities)
+ assert list_folder_activity.name == "List Folders"
+ assert (
+ list_folder_activity.type_properties["url"].result
+ == "https://sourcestorageaccount.blob.core.windows.net/sourcecontainer?restype=container&comp=list&prefix=sourcefolder&delimiter=$SourceBlobDelimiter"
+ )
+ assert list_folder_activity.type_properties["method"] == "GET"
+ list_folder_activity.set_result(
+ result=DependencyCondition.SUCCEEDED,
+ output={
+ "Response": """
+
+ testfolder
+ $SourceBlobDelimiter
+
+
+ testfolder_1/$SourceBlobDelimiter
+
+
+ testfolder_2/$SourceBlobDelimiter
+
+
+
+ """
+ },
+ )
+
+ copy_activity = next(activities)
+
+ assert copy_activity.name == "Copy files to Destination"
+ assert copy_activity.type == "Copy"
+ assert (
+ copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result
+ == "testfolder_1/$SourceBlobDelimiter"
+ )
+
+ copy_activity = next(activities)
+ assert copy_activity.name == "Copy files to Destination"
+ assert copy_activity.type == "Copy"
+ assert (
+ copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result
+ == "testfolder_2/$SourceBlobDelimiter"
+ )
+
+ pytest.raises(StopIteration, lambda: next(activities))
diff --git a/examples/synapse/copy_blobs/test_synapse_copy_blobs_unit.py b/examples/synapse/copy_blobs/test_synapse_copy_blobs_unit.py
new file mode 100644
index 00000000..4870fd8b
--- /dev/null
+++ b/examples/synapse/copy_blobs/test_synapse_copy_blobs_unit.py
@@ -0,0 +1,131 @@
+import pytest
+from data_factory_testing_framework import TestFramework, TestFrameworkType
+from data_factory_testing_framework.models import Pipeline
+from data_factory_testing_framework.models.activities import Activity, ForEachActivity
+from data_factory_testing_framework.state import (
+ PipelineRunState,
+ RunParameter,
+ RunParameterType,
+)
+
+
+@pytest.fixture
+def test_framework(request: pytest.FixtureRequest) -> TestFramework:
+ return TestFramework(
+ framework_type=TestFrameworkType.Synapse,
+ root_folder_path=request.fspath.dirname,
+ )
+
+
+@pytest.fixture
+def pipeline(test_framework: TestFramework) -> Pipeline:
+ return test_framework.get_pipeline_by_name("copy_blobs")  # the pipeline every test below exercises
+
+
+def test_list_blobs(pipeline: Pipeline) -> None:
+ # Arrange
+ activity = pipeline.get_activity_by_name("List Folders")
+ state = PipelineRunState(
+ parameters=[
+ RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorage"),
+ RunParameter(
+ RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba"
+ ),
+ RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder"),
+ ],
+ )
+
+ # Act
+ activity.evaluate(state)
+
+ # Assert
+ assert activity.name == "List Folders"
+ assert (
+ activity.type_properties["url"].result
+ == "https://sourcestorage.blob.core.windows.net/container-8b6b545b-c583-4a06-adf7-19ff41370aba?restype=container&comp=list&prefix=testfolder&delimiter=$SourceBlobDelimiter"
+ )
+ assert activity.type_properties["method"] == "GET"
+
+
+def test_for_each(pipeline: Pipeline) -> None:
+ # Arrange
+ activity = pipeline.get_activity_by_name("For Each SourceFolder")
+ state = PipelineRunState(
+ parameters=[
+ RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorage"),
+ RunParameter(
+ RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba"
+ ),
+ RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder"),
+ ]
+ )
+ state.add_activity_result(
+ activity_name="List Folders",
+ status="Succeeded",
+ output={
+ "Response": """
+
+ testfolder
+ $SourceBlobDelimiter
+
+
+ testfolder_1/$SourceBlobDelimiter
+
+
+ testfolder_2/$SourceBlobDelimiter
+
+
+
+ """
+ },
+ )
+
+ # Act
+ activity.evaluate(state)
+
+ # Assert
+ assert activity.name == "For Each SourceFolder"
+ assert activity.type_properties["items"].result == [
+ "testfolder_1/$SourceBlobDelimiter",
+ "testfolder_2/$SourceBlobDelimiter",
+ ]
+ assert len(activity.type_properties["activities"]) == 1
+ assert activity.type_properties["activities"][0]["name"] == "Copy files to Destination"
+ assert activity.type_properties["activities"][0]["type"] == "Copy"
+
+
+def _get_child_activity_by_name(foreach_activity: ForEachActivity, name: str) -> Activity:
+ return next(activity for activity in foreach_activity.activities if activity.name == name)
+
+
+@pytest.mark.parametrize(
+ "wildcardfolderpath",
+ [
+ ("testfolder_1/$SourceBlobDelimiter"),
+ ("testfolder_2/$SourceBlobDelimiter"),
+ ],
+)
+def test_copy_blobs_activity(pipeline: Pipeline, wildcardfolderpath: str) -> None:
+ # Arrange
+ foreach_activity = pipeline.get_activity_by_name("For Each SourceFolder")
+ activity = _get_child_activity_by_name(foreach_activity, "Copy files to Destination")
+ state = PipelineRunState(
+ parameters=[
+ RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorage"),
+ RunParameter(
+ RunParameterType.Pipeline, "SourceContainerName", "container-8b6b545b-c583-4a06-adf7-19ff41370aba"
+ ),
+ RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "testfolder"),
+ RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorage"),
+ RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"),
+ RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"),
+ ],
+ iteration_item=wildcardfolderpath,
+ )
+
+ # Act
+ activity.evaluate(state)
+
+ # Assert
+ assert activity.name == "Copy files to Destination"
+ assert activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result == wildcardfolderpath