Skip to content

Commit

Permalink
refactor: move pipeline evaluation logic to framework (SRP)
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Oct 5, 2023
1 parent ba190ba commit 02e62c7
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public void BatchJobTest()
Assert.Equal("batch_job", pipeline.Name);
Assert.Equal(11, pipeline.Activities.Count);

var activities = testFramework.Evaluate(pipeline, new List<IRunParameter>
var activities = testFramework.EvaluateWithEnumerator(pipeline, new List<IRunParameter>
{
new RunParameter<string>(ParameterType.Parameter, "BatchPoolId", "batch-pool-id"),
new RunParameter<string>(ParameterType.Parameter, "WorkloadApplicationPackageName", "test-application"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public void WhenExecutePipelineActivityIsCalled_ThenChildPipelineActivitiesAreEx
var pipeline = testFramework.Repository.GetPipelineByName("main");

// Act
var activities = testFramework.Evaluate(pipeline, new List<IRunParameter>()
var activities = testFramework.EvaluateWithEnumerator(pipeline, new List<IRunParameter>()
{
new RunParameter<string>(ParameterType.Parameter, "Url", "https://example.com"),
new RunParameter<string>(ParameterType.Parameter, "Body", "{ \"key\": \"value\" }")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class ForEachActivityTests
public void WhenEvaluateChildActivities_ThenShouldReturnTheActivityWithItemExpressionEvaluated()
{
// Arrange
var testFramework = new TestFramework();
var forEachActivity = new ForEachActivity("ForEachActivity",
new DataFactoryExpression(DataFactoryExpressionType.Expression, "@split('a,b,c', ',')"),
new List<PipelineActivity>()
Expand All @@ -28,7 +29,7 @@ public void WhenEvaluateChildActivities_ThenShouldReturnTheActivityWithItemExpre

// Act
forEachActivity.Evaluate(state);
var childActivities = forEachActivity.EvaluateChildActivities(state, new TestFramework());
var childActivities = testFramework.Evaluate(forEachActivity, state);

// Assert
using var enumarator = childActivities.GetEnumerator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public void WhenEvaluated_ShouldEvaluateExpression()
public void WhenEvaluated_ShouldEvaluateCorrectChildActivities(bool expressionOutcome, string expectedActivityName)
{
// Arrange
var testFramework = new TestFramework();
var expression = expressionOutcome ? "@equals(1, 1)" : "@equals(1, 2)";
var activity = new IfConditionActivity("IfConditionActivity",
new DataFactoryExpression(DataFactoryExpressionType.Expression, expression))
Expand All @@ -41,7 +42,7 @@ public void WhenEvaluated_ShouldEvaluateCorrectChildActivities(bool expressionOu
activity.Evaluate(state);

// Act
var childActivities = activity.EvaluateChildActivities(state).ToList();
var childActivities = testFramework.Evaluate(activity, state).ToList();

// Assert
Assert.Single(childActivities);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@ public class PipelineTests
public void WhenEvaluatingPipelineWithMissingParameters_ShouldThrowException()
{
// Arrange
var testFramework = new TestFramework();
var pipeline = new Pipeline();
pipeline.Parameters.Add("key1", new EntityParameterSpecification(EntityParameterType.String));
pipeline.Parameters.Add("key2", new EntityParameterSpecification(EntityParameterType.String));

// Assert
Assert.Throws<PipelineParameterNotProvidedException>(() => pipeline.Evaluate(new List<IRunParameter>()).ToList());
Assert.Throws<PipelineParameterNotProvidedException>(() => testFramework.Evaluate(pipeline, new List<IRunParameter>()).ToList());
}

[Fact]
public void WhenEvaluatingPipeline_ShouldReturnActivities()
{
// Arrange
var testFramework = new TestFramework();
var pipeline = new Pipeline();
pipeline.Parameters.Add("key1", new EntityParameterSpecification(EntityParameterType.String));

// Act
var activities = pipeline.Evaluate(new List<IRunParameter>()
var activities = testFramework.EvaluateAll(pipeline, new List<IRunParameter>()
{
new RunParameter<string>(ParameterType.Parameter, "key1", "value1")
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void EvaluateWithoutIterationActivities_ShouldEvaluateAccordingToDependen
// Act
var state = new PipelineRunState();
state.Variables.Add(new PipelineRunVariable<string>("variable1", string.Empty));
var evaluatedActivities = _testFramework.EvaluateActivities(_activities, state).ToList();
var evaluatedActivities = _testFramework.Evaluate(_activities, state).ToList();

// Assert
Assert.NotNull(evaluatedActivities);
Expand All @@ -59,7 +59,7 @@ public void EvaluateWithCircularDependencies_ShouldThrowActivitiesEvaluatorInval
_setVariableActivity.DependsOn.Add(new PipelineActivityDependency("webActivity", new[] { DependencyCondition.Succeeded }));

// Assert
Assert.Throws<ActivitiesEvaluatorInvalidDependencyException>(() => _testFramework.EvaluateActivities(_activities, new PipelineRunState()).ToList());
Assert.Throws<ActivitiesEvaluatorInvalidDependencyException>(() => _testFramework.Evaluate(_activities, new PipelineRunState()).ToList());
}

[Fact]
Expand All @@ -75,7 +75,7 @@ public void EvaluateWithForeachActivities_ShouldEvaluateAccordingToDependencies(
_webActivity.Uri = new DataFactoryElement<string>("@concat('https://www.example.com/', item())", DataFactoryElementKind.Expression);

// Act
var evaluatedActivities = _testFramework.EvaluateActivities(new List<PipelineActivity> { foreachActivity }, state);
var evaluatedActivities = _testFramework.Evaluate(new List<PipelineActivity> { foreachActivity }, state);

// Assert
using var enumerator = evaluatedActivities.GetEnumerator();
Expand Down Expand Up @@ -108,7 +108,7 @@ public void EvaluateWithUntilActivities_ShouldEvaluateAccordingToDependencies()
_activities);

// Act
var evaluatedActivities = _testFramework.EvaluateActivities(new List<PipelineActivity> { untilActivity }, state);
var evaluatedActivities = _testFramework.Evaluate(new List<PipelineActivity> { untilActivity }, state);

// Assert
using var enumerator = evaluatedActivities.GetEnumerator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<PropertyGroup>
<PackageId>AzureDataFactory.TestingFramework</PackageId>
<PackageVersion>0.1.6-alpha</PackageVersion>
<PackageVersion>0.1.7-alpha</PackageVersion>
<Authors>arjendev</Authors>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,10 @@ namespace AzureDataFactory.TestingFramework.Models;

public partial class ControlActivity
{
protected virtual List<PipelineActivity> GetNextActivities()
internal delegate IEnumerable<PipelineActivity> EvaluateActivitiesDelegate(List<PipelineActivity> activities, PipelineRunState state);
internal virtual IEnumerable<PipelineActivity> EvaluateControlActivityIterations(PipelineRunState state, EvaluateActivitiesDelegate evaluateActivities)
{
// Note: unfortunately cannot use abstract method
return new List<PipelineActivity>();
}

internal virtual IEnumerable<PipelineActivity> EvaluateChildActivities(PipelineRunState state, TestFramework testFramework)
{
var scopedState = state.CreateIterationScope(null);
var activities = GetNextActivities();
foreach (var activity in testFramework.EvaluateActivities(activities, scopedState))
{
yield return activity;
}

state.AddScopedActivityResultsFromScopedState(scopedState);
}

internal virtual IEnumerable<PipelineActivity> EvaluateChildActivities(PipelineRunState state)
{
return EvaluateChildActivities(state, new TestFramework());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ namespace AzureDataFactory.TestingFramework.Models;

public partial class ForEachActivity : IIterationActivity
{
protected override List<PipelineActivity> GetNextActivities()
{
return Activities.ToList();
}

private List<string>? _items;
public List<string> IterationItems => _items ?? throw new InvalidOperationException("Items have not been evaluated yet.");
public override DataFactoryEntity Evaluate(PipelineRunState state)
Expand All @@ -23,14 +18,17 @@ public override DataFactoryEntity Evaluate(PipelineRunState state)
return base.Evaluate(state);
}

internal override IEnumerable<PipelineActivity> EvaluateChildActivities(PipelineRunState state, TestFramework testFramework)
internal override IEnumerable<PipelineActivity> EvaluateControlActivityIterations(PipelineRunState state, EvaluateActivitiesDelegate evaluateActivities)
{
var activities = GetNextActivities();

return IterationItems.SelectMany(item =>
// Note: using enumerator to support yield return in foreach
using var enumerator = IterationItems.GetEnumerator();
while (enumerator.MoveNext())
{
var scopedState = state.CreateIterationScope(item);
return testFramework.EvaluateActivities(activities, scopedState);
});
var scopedState = state.CreateIterationScope(enumerator.Current);
foreach (var activity in evaluateActivities(Activities.ToList(), scopedState))
yield return activity;

state.AddScopedActivityResultsFromScopedState(scopedState);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ namespace AzureDataFactory.TestingFramework.Models;

public partial class IfConditionActivity : IIterationActivity
{
protected override List<PipelineActivity> GetNextActivities()
{
return EvaluatedExpression ? IfTrueActivities.ToList() : IfFalseActivities.ToList();
}

private bool? _evaluatedExpression;
public bool EvaluatedExpression => _evaluatedExpression ?? throw new InvalidOperationException("Expression has not been evaluated yet.");
public override DataFactoryEntity Evaluate(PipelineRunState state)
Expand All @@ -24,4 +19,14 @@ public override DataFactoryEntity Evaluate(PipelineRunState state)

return this;
}

internal override IEnumerable<PipelineActivity> EvaluateControlActivityIterations(PipelineRunState state, EvaluateActivitiesDelegate evaluateActivities)
{
var scopedState = state.CreateIterationScope(null);
var activities = EvaluatedExpression ? IfTrueActivities.ToList() : IfFalseActivities.ToList();
foreach (var activity in evaluateActivities(activities, scopedState))
yield return activity;

state.AddScopedActivityResultsFromScopedState(scopedState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@ namespace AzureDataFactory.TestingFramework.Models;

public partial class UntilActivity : IIterationActivity
{
protected override List<PipelineActivity> GetNextActivities()
internal override IEnumerable<PipelineActivity> EvaluateControlActivityIterations(PipelineRunState state, EvaluateActivitiesDelegate evaluateActivities)
{
return Activities.ToList();
do
{
var scopedState = state.CreateIterationScope(null);
foreach (var child in evaluateActivities(Activities.ToList(), scopedState))
yield return child;

state.AddScopedActivityResultsFromScopedState(scopedState);
} while (!Expression.Evaluate<bool>(state));
}
}
42 changes: 5 additions & 37 deletions src/AzureDataFactory.TestingFramework/Models/Pipelines/Pipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ public PipelineActivity GetActivityByName(string activityName)
}

/// <summary>
/// Evaluates the pipeline with the provided parameters. The order of activity execution is simulated based on the dependencies. Any expression part of the activity is evaluated based on the state of the pipeline.
/// Validates whether the list of parameters are complete and not duplicate.
/// </summary>
/// <param name="parameters">The global and regular parameters to be used for evaluating expressions.</param>
/// <param name="testFramework">Reference to the test framework containing the repository and configuration</param>
/// <returns></returns>
/// <exception cref="PipelineParameterNotProvidedException">Thrown if a required pipeline parameter is not required</exception>
/// <exception cref="PipelineDuplicateParameterProvidedException">Thrown if a pipeline parameter is provided more than once</exception>
internal IEnumerable<PipelineActivity> Evaluate(List<IRunParameter> parameters, TestFramework testFramework)
/// <param name="parameters"></param>
/// <exception cref="PipelineParameterNotProvidedException"></exception>
/// <exception cref="PipelineDuplicateParameterProvidedException"></exception>
internal void ValidateParameters(List<IRunParameter> parameters)
{
//Check if all parameters are provided
foreach (var parameter in Parameters.Where(parameter => parameters.All(p => p.Name != parameter.Key)))
Expand All @@ -40,35 +38,5 @@ internal IEnumerable<PipelineActivity> Evaluate(List<IRunParameter> parameters,
var duplicateParameters = parameters.GroupBy(x => new { x.Name, x.Type }).Where(g => g.Count() > 1).Select(y => y.Key).ToList();
if (duplicateParameters.Any())
throw new PipelineDuplicateParameterProvidedException($"Duplicate parameters provided: {string.Join(", ", duplicateParameters.Select(x => $"{x.Name} ({x.Type})"))}");

var state = new PipelineRunState(parameters, Variables);
foreach (var activity in testFramework.EvaluateActivities(Activities.ToList(), state))
yield return activity;
}

/// <summary>
/// Evaluates the pipeline with the provided parameters. The order of activity execution is simulated based on the dependencies. Any expression part of the activity is evaluated based on the state of the pipeline.
/// </summary>
/// <param name="parameters">The global and regular parameters to be used for evaluating expressions.</param>
/// <returns></returns>
/// <exception cref="PipelineParameterNotProvidedException">Thrown if a required pipeline parameter is not required</exception>
/// <exception cref="PipelineDuplicateParameterProvidedException">Thrown if a pipeline parameter is provided more than once</exception>
internal IEnumerable<PipelineActivity> Evaluate(List<IRunParameter> parameters)
{
return Evaluate(parameters, new TestFramework());
}

/// <summary>
/// Evaluates the pipeline with the provider parameters and returns an enumerator to easily iterate over the activities. The order of activity execution is simulated based on the dependencies. Any expression part of the activity is evaluated based on the state of the pipeline.
/// </summary>
/// <param name="parameters">The global and regular parameters to be used for evaluating expressions.</param>
/// <param name="testFramework">Reference to the test framework containing the repository and configuration</param>
/// <returns></returns>
/// <exception cref="PipelineParameterNotProvidedException">Thrown if a required pipeline parameter is not required</exception>
/// <exception cref="PipelineDuplicateParameterProvidedException">Thrown if a pipeline parameter is provided more than once</exception>
internal ActivityEnumerator EvaluateWithActivityEnumerator(List<IRunParameter> parameters, TestFramework testFramework)
{
var activities = Evaluate(parameters, testFramework);
return new ActivityEnumerator(activities);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class DataFactoryRepository
/// <param name="linkedServices">List of deserialized linkedServices</param>
/// <param name="datasets">List of deserialized datasets</param>
/// <param name="triggers">List of deserialized triggers</param>
public DataFactoryRepository(List<Pipeline> pipelines, List<DataFactoryLinkedServiceProperties> linkedServices, List<DataFactoryDatasetProperties> datasets, List<DataFactoryTriggerProperties> triggers)
public DataFactoryRepository(List<Pipeline> pipelines, List<DataFactoryLinkedServiceData> linkedServices, List<DataFactoryDatasetData> datasets, List<DataFactoryTriggerData> triggers)
{
Pipelines = pipelines;
LinkedServices = linkedServices;
Expand All @@ -26,15 +26,15 @@ public DataFactoryRepository(List<Pipeline> pipelines, List<DataFactoryLinkedSer
public DataFactoryRepository()
{
Pipelines = new List<Pipeline>();
LinkedServices = new List<DataFactoryLinkedServiceProperties>();
Datasets = new List<DataFactoryDatasetProperties>();
Triggers = new List<DataFactoryTriggerProperties>();
LinkedServices = new List<DataFactoryLinkedServiceData>();
Datasets = new List<DataFactoryDatasetData>();
Triggers = new List<DataFactoryTriggerData>();
}

public List<Pipeline> Pipelines { get; private set; }
public List<DataFactoryLinkedServiceProperties> LinkedServices { get; private set; }
public List<DataFactoryDatasetProperties> Datasets { get; private set; }
public List<DataFactoryTriggerProperties> Triggers { get; private set; }
public List<DataFactoryLinkedServiceData> LinkedServices { get; private set; }
public List<DataFactoryDatasetData> Datasets { get; private set; }
public List<DataFactoryTriggerData> Triggers { get; private set; }

public Pipeline GetPipelineByName(string name)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public static class DataFactoryRepositoryFactory
public static DataFactoryRepository ParseFromFolder(string folderPath)
{
var pipelines = GetDataFactoryEntitiesByFolderPath(Path.Combine(folderPath, "pipeline"), Pipeline.DeserializeDataFactoryPipelineData);
var linkedServices = GetDataFactoryEntitiesByFolderPath(Path.Combine(folderPath, "linkedService"), DataFactoryLinkedServiceProperties.DeserializeDataFactoryLinkedServiceProperties);
var datasets = GetDataFactoryEntitiesByFolderPath(Path.Combine(folderPath, "dataset"), DataFactoryDatasetProperties.DeserializeDataFactoryDatasetProperties);
var triggers = GetDataFactoryEntitiesByFolderPath(Path.Combine(folderPath, "trigger"), DataFactoryTriggerProperties.DeserializeDataFactoryTriggerProperties);
var linkedServices = GetDataFactoryEntitiesByFolderPath(Path.Combine(folderPath, "linkedService"), DataFactoryLinkedServiceData.DeserializeDataFactoryLinkedServiceData);
var datasets = GetDataFactoryEntitiesByFolderPath(Path.Combine(folderPath, "dataset"), DataFactoryDatasetData.DeserializeDataFactoryDatasetData);
var triggers = GetDataFactoryEntitiesByFolderPath(Path.Combine(folderPath, "trigger"), DataFactoryTriggerData.DeserializeDataFactoryTriggerData);

return new DataFactoryRepository(pipelines, linkedServices, datasets, triggers);
}
Expand Down
Loading

0 comments on commit 02e62c7

Please sign in to comment.