Skip to content

Commit

Permalink
feat: foreach support and refactor model patching
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Nov 9, 2023
1 parent 29c9ff6 commit 314a1f4
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ def get_scoped_activity_result_by_name(self, name: str, state: PipelineRunState)
return next((activity_result for activity_result in state.scoped_pipeline_activity_results if activity_result.name == name), None)

def are_dependency_condition_met(self, state: PipelineRunState):
if not self.depends_on:
return True

for dependency in self.depends_on:
dependency_activity = self.get_scoped_activity_result_by_name(dependency.activity, state)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Callable
from typing import Callable, Generator

from data_factory_testing_framework.generated.models import Activity
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


Expand All @@ -9,5 +10,5 @@ class ControlActivity:
def patch_generated_models(models):
models.ControlActivity.evaluate_control_activity_iterations = ControlActivity.evaluate_control_activity_iterations

def evaluate_control_activity_iterations(self, state: PipelineRunState, evaluate: Callable):
def evaluate_control_activity_iterations(self, state: PipelineRunState, evaluate_activities: Callable[[PipelineRunState], Generator[Activity, None, None]]):
return []
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import Callable, Generator

from data_factory_testing_framework.generated.models import ForEachActivity, Activity, ControlActivity
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class ForEachActivity:

@staticmethod
def patch_generated_models(models):
models.ForEachActivity.evaluate = ForEachActivity.evaluate
models.ForEachActivity.evaluate_control_activity_iterations = ForEachActivity.evaluate_control_activity_iterations

def evaluate(self: ForEachActivity, state: PipelineRunState):
self.items.evaluate(state)

return super(ControlActivity, self).evaluate(state)

def evaluate_control_activity_iterations(self: ForEachActivity, state: PipelineRunState, evaluate_activities: Callable[[PipelineRunState], Generator[Activity, None, None]]):
for item in self.items.evaluated_items:
scoped_state = state.create_iteration_scope(item)
for activity in evaluate_activities(self.activities, scoped_state):
yield activity

state.add_scoped_activity_results_from_scoped_state(scoped_state)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
T = TypeVar("T")


class RunVariable(Generic[T]):
class PipelineRunVariable(Generic[T]):
def __init__(self, name: str, default_value: T = None):
self.name = name
self.value = default_value
20 changes: 20 additions & 0 deletions src/python/data_factory_testing_framework/models/expression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import List

from data_factory_testing_framework.generated.models import Expression
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState


class Expression:

evaluated_items: List[str] = []

@staticmethod
def patch_generated_models(models):
models.Expression.evaluate = Expression.evaluate

def evaluate(self: Expression, state: PipelineRunState):
self.evaluated_items = [
"item1",
"item2",
"item3"
]
17 changes: 17 additions & 0 deletions src/python/data_factory_testing_framework/models/patch_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from data_factory_testing_framework.generated import models as _models

from data_factory_testing_framework.models.activities.base import Activity
from data_factory_testing_framework.models.activities.control_activities.control_activity import ControlActivity
from data_factory_testing_framework.models.activities.control_activities.execute_pipeline_activity import \
ExecutePipelineActivity
from data_factory_testing_framework.models.activities.control_activities.for_each_activity import ForEachActivity
from data_factory_testing_framework.models.expression import Expression


# Patch models with our custom classes
def patch_models():
Activity.patch_generated_models(_models)
ExecutePipelineActivity.patch_generated_models(_models)
ControlActivity.patch_generated_models(_models)
ForEachActivity.patch_generated_models(_models)
Expression.patch_generated_models(_models)
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import os

from data_factory_testing_framework.generated import Deserializer
from data_factory_testing_framework.models.patch_models import patch_models
from data_factory_testing_framework.models.repositories.data_factory_repository import DataFactoryRepository
from data_factory_testing_framework.models.repositories.models_repository import ModelsRepository
from data_factory_testing_framework.generated import models as _models

models_repository = ModelsRepository()
deserializer = Deserializer(models_repository.get_models())
patch_models()
models = {k: v for k, v in _models.__dict__.items() if isinstance(v, type)}
deserializer = Deserializer(models)


class DataFactoryRepositoryFactory:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,27 @@

from data_factory_testing_framework.generated.models import VariableSpecification
from data_factory_testing_framework.models.base.run_parameter import RunParameter
from data_factory_testing_framework.models.base.run_variable import RunVariable
from data_factory_testing_framework.models.base.pipeline_run_variable import PipelineRunVariable
from data_factory_testing_framework.models.state.run_state import RunState


class PipelineRunState(RunState):
def __init__(self, parameters: List[RunParameter] = [], variables: Dict[str, VariableSpecification] = []):
def __init__(self, parameters: List[RunParameter] = [], variables: Dict[str, VariableSpecification] = [], pipeline_activity_results: List[RunState] = [], iteration_item: str = None):
super().__init__(parameters)
self.variables = []
for variable in variables:
self.variables.append(RunVariable(variable.name, variable.default_value))
self.variables.append(PipelineRunVariable(variable.name, variable.value))

self.pipeline_activity_results = []
self.pipeline_activity_results = pipeline_activity_results
self.scoped_pipeline_activity_results = []
self.iteration_item = None
self.iteration_item = iteration_item

def add_activity_result(self, activity):
self.pipeline_activity_results.append(activity)
self.scoped_pipeline_activity_results.append(activity)

def create_iteration_scope(self, iteration_item: str):
return PipelineRunState(self.parameters, self.variables, self.pipeline_activity_results, iteration_item)

def add_scoped_activity_results_from_scoped_state(self, scoped_state):
self.pipeline_activity_results.extend(scoped_state.pipeline_activity_results)
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,17 @@

class TestFramework:

def __init__(self, data_factory_folder_path: str, should_evaluate_child_pipelines: bool = False):
self.repository = DataFactoryRepositoryFactory.parse_from_folder(data_factory_folder_path)
def __init__(self, data_factory_folder_path: str = None, should_evaluate_child_pipelines: bool = False):
self.repository = data_factory_folder_path is not None and DataFactoryRepositoryFactory.parse_from_folder(data_factory_folder_path)
self.should_evaluate_child_pipelines = should_evaluate_child_pipelines

def evaluate_activity(self, activity: Activity) -> DependencyCondition:
self.evaluate_activities([activity])
return activity.status
def evaluate_activity(self, activity: Activity, state: PipelineRunState) -> List[Activity]:
return self.evaluate_activities([activity], state)

def evaluate_pipeline(self, pipeline: PipelineResource, state: PipelineRunState):
def evaluate_pipeline(self, pipeline: PipelineResource, state: PipelineRunState) -> List[Activity]:
return self.evaluate_activities(pipeline.activities, state)

def evaluate_activities(self, activities: List[Activity], state: PipelineRunState) -> DependencyCondition:
print("hi")
def evaluate_activities(self, activities: List[Activity], state: PipelineRunState) -> List[Activity]:
while len(state.scoped_pipeline_activity_results) != len(activities):
any_activity_evaluated = False
for activity in filter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from data_factory_testing_framework.generated.models import ForEachActivity, Expression, ExpressionType, \
SetVariableActivity, DataFactoryElement
from data_factory_testing_framework.models.base.pipeline_run_variable import PipelineRunVariable
from data_factory_testing_framework.models.state.pipeline_run_state import PipelineRunState
from data_factory_testing_framework.models.test_framework import TestFramework


class TestForEachActivity:

def test_when_evaluate_child_activities_then_should_return_the_activity_with_item_expression_evaluated(self):
# Arrange
test_framework = TestFramework()
for_each_activity = ForEachActivity(name="ForEachActivity",
items=Expression(type=ExpressionType.EXPRESSION,
value="@split('a,b,c', ',')"),
activities=[
SetVariableActivity(name="setVariable", variable_name="variable",
value=DataFactoryElement[str]("item()"),
depends_on=[])
],
depends_on=[])
state = PipelineRunState()
state.variables.append(PipelineRunVariable("variable", ""))

# Act
activities = test_framework.evaluate_activity(for_each_activity, state)

# Assert
set_variable_activity = next(activities)
assert set_variable_activity is not None
assert set_variable_activity.name == "setVariable"
# assert set_variable_activity.Value == "a"

set_variable_activity = next(activities)
assert set_variable_activity is not None
assert set_variable_activity.name == "setVariable"
# assert set_variable_activity.Value == "b"

set_variable_activity = next(activities)
assert set_variable_activity is not None
assert set_variable_activity.name == "setVariable"
# assert set_variable_activity.Value == "c"

# Assert that there are no more activities
try:
next(activities)
assert False # This line should not be reached, an exception should be raised
except StopIteration:
pass

0 comments on commit 314a1f4

Please sign in to comment.