Skip to content

Commit

Permalink
feat: pipeline activity
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Nov 10, 2023
1 parent 036c370 commit 4a5f63f
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class ActivityNotFoundException(Exception):
def __init__(self, activity_name):
super().__init__(f"Activity with name {activity_name} not found")
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
class PipelineNotFoundException(Exception):
def __init__(self, pipeline_name):
super().__init__(f"Pipeline with name {pipeline_name} not found")

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import List

from data_factory_testing_framework.generated.models import ExecutePipelineActivity
from data_factory_testing_framework.models.base.parameter_type import ParameterType
from data_factory_testing_framework.models.base.run_parameter_type import RunParameterType
from data_factory_testing_framework.models.base.run_parameter import RunParameter
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState

Expand All @@ -11,7 +11,7 @@ class ExecutePipelineActivity:
def get_child_run_parameters(self, state: PipelineRunState) -> List[RunParameter]:
child_parameters = []
for parameter in state.parameters:
if parameter.type == ParameterType.Global:
child_parameters.append(RunParameter(ParameterType.Global, parameter.name, parameter.value))
if parameter.type == RunParameterType.Global:
child_parameters.append(RunParameter(RunParameterType.Global, parameter.name, parameter.value))

return child_parameters
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import TypeVar, Generic

from data_factory_testing_framework.models.base.parameter_type import ParameterType
from data_factory_testing_framework.models.base.run_parameter_type import RunParameterType

T = TypeVar("T")


class RunParameter(Generic[T]):
def __init__(self, type: ParameterType, name: str, value: T):
def __init__(self, type: RunParameterType, name: str, value: T):
self.type = type
self.name = name
self.value = value
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from azure.core import CaseInsensitiveEnumMeta


class ParameterType(str, Enum, metaclass=CaseInsensitiveEnumMeta):
class RunParameterType(str, Enum, metaclass=CaseInsensitiveEnumMeta):
Pipeline = "Pipeline"
Global = "Global"
Dataset = "Dataset"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from data_factory_testing_framework.models.activities.control_activities.until_activity import UntilActivity
from data_factory_testing_framework.models.activities.set_variable_activity import SetVariableActivity
from data_factory_testing_framework.models.expression import Expression
from data_factory_testing_framework.models.pipelines.pipeline_resource import PipelineResource


# Patch models with our custom classes
Expand All @@ -22,6 +23,7 @@ def patch_models():
patch_model(_models.IfConditionActivity, IfConditionActivity)
patch_model(_models.UntilActivity, UntilActivity)
patch_model(_models.SetVariableActivity, SetVariableActivity)
patch_model(_models.PipelineResource, PipelineResource)


def patch_model(main_class, partial_class):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from typing import List

from data_factory_testing_framework.exceptions.activity_not_found_exception import ActivityNotFoundException
from data_factory_testing_framework.generated.models import PipelineResource, Activity
from data_factory_testing_framework.models.base.run_parameter import RunParameter


class PipelineResource:
def get_activity_by_name(self: PipelineResource, name: str) -> Activity:
for activity in self.activities:
if activity.name == name:
return activity

raise ActivityNotFoundException(f"Activity with name {name} not found")

def validate_parameters(self: PipelineResource, parameters: List[RunParameter]):
# Check if all parameters are provided
for pipeline_parameter_name, pipeline_parameter_specification in self.parameters.items():
found = False
for parameter in parameters:
if pipeline_parameter_name == parameter.name and pipeline_parameter_specification.type == parameter.type:
found = True
break

if not found:
raise ValueError(f"Parameter with name '{pipeline_parameter_name}' and type '{pipeline_parameter_specification.type}' not found in pipeline '{self.name}'")

# Check if no duplicate parameters are provided
for parameter in parameters:
if sum(1 for p in parameters if p.name == parameter.name and p.type == parameter.type) > 1:
raise ValueError(f"Duplicate parameter with name '{parameter.name}' and type '{parameter.type}' found in pipeline '{self.name}'")
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import List

from data_factory_testing_framework.exceptions.pipeline_not_found_exception import PipelineNotFoundException
from data_factory_testing_framework.generated.models import PipelineResource


Expand All @@ -12,4 +13,4 @@ def get_pipeline_by_name(self, name: str) -> PipelineResource:
if pipeline.name == name:
return pipeline

raise Exception(f"Pipeline with name {name} not found")
raise PipelineNotFoundException(f"Pipeline with name {name} not found")
46 changes: 23 additions & 23 deletions src/python/example/batch_job/batchjob_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from data_factory_testing_framework.generated.models import WebActivity, DependencyCondition, SetVariableActivity
from data_factory_testing_framework.models.base.parameter_type import ParameterType
from data_factory_testing_framework.models.base.run_parameter_type import RunParameterType
from data_factory_testing_framework.models.base.run_parameter import RunParameter
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState
from data_factory_testing_framework.models.test_framework import TestFramework
Expand All @@ -12,28 +12,28 @@ def test_batch_job_pipeline(self):
test_framework = TestFramework("pipelines")
pipeline = test_framework.repository.get_pipeline_by_name("batch_job")
state = PipelineRunState(parameters=[
RunParameter(ParameterType.Pipeline, "BatchPoolId", "batch-pool-id"),
RunParameter(ParameterType.Pipeline, "WorkloadApplicationPackageName", "test-application"),
RunParameter(ParameterType.Pipeline, "WorkloadApplicationPackageVersion", "1.5.0"),
RunParameter(ParameterType.Pipeline, "ManagerApplicationPackageName", "batchmanager"),
RunParameter(ParameterType.Pipeline, "ManagerApplicationPackageVersion", "2.0.0"),
RunParameter(ParameterType.Pipeline, "ManagerTaskParameters", "--parameter1 dummy --parameter2 another-dummy"),
RunParameter(ParameterType.Pipeline, "JobId", "802100a5-ec79-4a52-be62-8d6109f3ff9a"),
RunParameter(ParameterType.Pipeline, "TaskOutputFolderPrefix", "TASKOUTPUT_"),
RunParameter(ParameterType.Pipeline, "WorkloadUserAssignedIdentityName", "test-application-identity-name"),
RunParameter(ParameterType.Pipeline, "WorkloadUserAssignedIdentityClientId", "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name"),
RunParameter(ParameterType.Pipeline, "JobAdditionalEnvironmentSettings", "[]"),
RunParameter(ParameterType.Pipeline, "OutputStorageAccountName", "test-application-output-storage-account-name"),
RunParameter(ParameterType.Pipeline, "OutputContainerName", "test-application-output-container-name"),
RunParameter(ParameterType.Pipeline, "OutputFolderName", "TEMP"),
RunParameter(ParameterType.Pipeline, "BatchJobTimeout", "PT4H"),
RunParameter(ParameterType.Global, "BatchStorageAccountName", "batch-account-name"),
RunParameter(ParameterType.Global, "BatchAccountSubscription", "SUBSCRIPTION_ID"),
RunParameter(ParameterType.Global, "BatchAccountResourceGroup", "RESOURCE_GROUP"),
RunParameter(ParameterType.Global, "BatchURI", "https://batch-account-name.westeurope.batch.azure.com"),
RunParameter(ParameterType.Global, "ADFSubscription", "bd19dba4-89ad-4976-b862-848bf43a4340"),
RunParameter(ParameterType.Global, "ADFResourceGroup", "adf-rg"),
RunParameter(ParameterType.Global, "ADFName", "adf-name"),
RunParameter(RunParameterType.Pipeline, "BatchPoolId", "batch-pool-id"),
RunParameter(RunParameterType.Pipeline, "WorkloadApplicationPackageName", "test-application"),
RunParameter(RunParameterType.Pipeline, "WorkloadApplicationPackageVersion", "1.5.0"),
RunParameter(RunParameterType.Pipeline, "ManagerApplicationPackageName", "batchmanager"),
RunParameter(RunParameterType.Pipeline, "ManagerApplicationPackageVersion", "2.0.0"),
RunParameter(RunParameterType.Pipeline, "ManagerTaskParameters", "--parameter1 dummy --parameter2 another-dummy"),
RunParameter(RunParameterType.Pipeline, "JobId", "802100a5-ec79-4a52-be62-8d6109f3ff9a"),
RunParameter(RunParameterType.Pipeline, "TaskOutputFolderPrefix", "TASKOUTPUT_"),
RunParameter(RunParameterType.Pipeline, "WorkloadUserAssignedIdentityName", "test-application-identity-name"),
RunParameter(RunParameterType.Pipeline, "WorkloadUserAssignedIdentityClientId", "/subscriptions/SUBSCRIPTION_ID/resourcegroups/RESOURCE_GROUP/providers/Microsoft.ManagedIdentity/userAssignedIdentities/test-application-identity-name"),
RunParameter(RunParameterType.Pipeline, "JobAdditionalEnvironmentSettings", "[]"),
RunParameter(RunParameterType.Pipeline, "OutputStorageAccountName", "test-application-output-storage-account-name"),
RunParameter(RunParameterType.Pipeline, "OutputContainerName", "test-application-output-container-name"),
RunParameter(RunParameterType.Pipeline, "OutputFolderName", "TEMP"),
RunParameter(RunParameterType.Pipeline, "BatchJobTimeout", "PT4H"),
RunParameter(RunParameterType.Global, "BatchStorageAccountName", "batch-account-name"),
RunParameter(RunParameterType.Global, "BatchAccountSubscription", "SUBSCRIPTION_ID"),
RunParameter(RunParameterType.Global, "BatchAccountResourceGroup", "RESOURCE_GROUP"),
RunParameter(RunParameterType.Global, "BatchURI", "https://batch-account-name.westeurope.batch.azure.com"),
RunParameter(RunParameterType.Global, "ADFSubscription", "bd19dba4-89ad-4976-b862-848bf43a4340"),
RunParameter(RunParameterType.Global, "ADFResourceGroup", "adf-rg"),
RunParameter(RunParameterType.Global, "ADFName", "adf-name"),
])

# Act
Expand Down
65 changes: 65 additions & 0 deletions src/python/tests/models/pipelines/test_pipeline_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pytest

from data_factory_testing_framework.generated.models import PipelineResource, ParameterSpecification
from data_factory_testing_framework.models.base.run_parameter_type import RunParameterType
from data_factory_testing_framework.models.base.run_parameter import RunParameter
from data_factory_testing_framework.models.test_framework import TestFramework

TestFramework()

class TestPipeline:
def test_when_validate_parameters_is_accurate_should_pass(self):
# Arrange
pipeline = PipelineResource(
name="pipeline",
parameters={
"pipelineParameterName": ParameterSpecification(type=RunParameterType.Pipeline),
"pipelineParameterName2": ParameterSpecification(type=RunParameterType.Pipeline)
}
)

# Act
pipeline.validate_parameters([
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
RunParameter(RunParameterType.Pipeline, "pipelineParameterName2", "pipelineParameterValue2")
])

def test_when_validate_parameters_is_missing_run_parameter_should_throw_error(self):
# Arrange
pipeline = PipelineResource(
parameters={
"pipelineParameterName": ParameterSpecification(type=RunParameterType.Pipeline),
"pipelineParameterName2": ParameterSpecification(type=RunParameterType.Pipeline)
}
)
pipeline.name = "pipeline"

# Act
with pytest.raises(ValueError) as exception_info:
pipeline.validate_parameters([
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
])

# Assert
assert exception_info.value.args[0] == "Parameter with name 'pipelineParameterName2' and type 'RunParameterType.Pipeline' not found in pipeline 'pipeline'"

def test_when_duplicate_parameters_supplied_should_throw_error(self):
# Arrange
pipeline = PipelineResource(
parameters={
"pipelineParameterName": ParameterSpecification(type=RunParameterType.Pipeline),
"pipelineParameterName2": ParameterSpecification(type=RunParameterType.Pipeline)
}
)
pipeline.name = "pipeline"

# Act
with pytest.raises(ValueError) as exception_info:
pipeline.validate_parameters([
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
RunParameter(RunParameterType.Pipeline, "pipelineParameterName", "pipelineParameterValue"),
RunParameter(RunParameterType.Pipeline, "pipelineParameterName2", "pipelineParameterValue2")
])

# Assert
assert exception_info.value.args[0] == "Duplicate parameter with name 'pipelineParameterName' and type 'RunParameterType.Pipeline' found in pipeline 'pipeline'"

0 comments on commit 4a5f63f

Please sign in to comment.