Skip to content

Commit

Permalink
feat: scoped pipeline activities and added support for more functions
Browse files Browse the repository at this point in the history
  • Loading branch information
arjendev committed Sep 20, 2023
1 parent 02bf906 commit 2d53cfd
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 33 deletions.
17 changes: 13 additions & 4 deletions src/AzureDataFactory.TestingFramework.sln
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AzureDataFactory.TestingFramework", "AzureDataFactory.TestingFramework\AzureDataFactory.TestingFramework.csproj", "{F5C4C98C-B717-4ED8-95EA-5DD51CB87C92}"
# Visual Studio Version 17
VisualStudioVersion = 17.7.34031.279
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureDataFactory.TestingFramework", "AzureDataFactory.TestingFramework\AzureDataFactory.TestingFramework.csproj", "{F5C4C98C-B717-4ED8-95EA-5DD51CB87C92}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AzureDataFactory.TestingFramework.Tests", "AzureDataFactory.TestingFramework.Tests\AzureDataFactory.TestingFramework.Tests.csproj", "{76089DF4-314B-4556-9ED7-D2B2F3BBA87A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureDataFactory.TestingFramework.Tests", "AzureDataFactory.TestingFramework.Tests\AzureDataFactory.TestingFramework.Tests.csproj", "{76089DF4-314B-4556-9ED7-D2B2F3BBA87A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AzureDataFactory.TestingFramework.Example", "AzureDataFactory.TestingFramework.Example\AzureDataFactory.TestingFramework.Example.csproj", "{F044EB13-DF36-43FB-90E8-9ADDE0E03821}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureDataFactory.TestingFramework.Example", "AzureDataFactory.TestingFramework.Example\AzureDataFactory.TestingFramework.Example.csproj", "{F044EB13-DF36-43FB-90E8-9ADDE0E03821}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{2EC1EFEA-98DC-47FC-9EAD-69541C65ACA1}"
ProjectSection(SolutionItems) = preProject
..\README.md = ..\README.md
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand All @@ -25,6 +33,7 @@ Global
{F044EB13-DF36-43FB-90E8-9ADDE0E03821}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F044EB13-DF36-43FB-90E8-9ADDE0E03821}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

<PropertyGroup>
<PackageId>AzureDataFactory.TestingFramework</PackageId>
<PackageVersion>0.1.3-alpha</PackageVersion>
<PackageVersion>0.1.4-alpha</PackageVersion>
<Authors>arjendev</Authors>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public ActivityExpression(string expression) : base(expression)
public TType Evaluate<TType>(PipelineRunState state)
{
var (activityName, fields) = GetActivityNameAndFields();
var activity = state.PipelineActivityResults.SingleOrDefault(x => string.Equals(x.Name, activityName, StringComparison.CurrentCultureIgnoreCase)) ??
var activity = state.PipelineActivityResults.LastOrDefault(x => string.Equals(x.Name, activityName, StringComparison.CurrentCultureIgnoreCase)) ??
throw new ActivityNotFoundException(activityName);
if (activity.Status == null)
throw new ActivityNotEvaluatedException(activity.Name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public TType Evaluate<TType>(RunState state)
{
{ } type when type == typeof(bool) && bool.TryParse(evalExpression, out var boolValue) => (TType)(object) boolValue,
{ } type when type == typeof(int) && int.TryParse(evalExpression, out var intValue) => (TType)(object) intValue,
{ } type when type == typeof(long) && long.TryParse(evalExpression, out var longValue) => (TType)(object) longValue,
{ } type when type == typeof(string) => (TType)(object) evalExpression.TrimOneChar('\''),
{ } type => throw new ArgumentException($"The result {evalExpression} with DataType: {type} could not be parsed accordingly.")
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public TType Evaluate<TType>(RunState state)
{ } when parameterType == typeof(float) => argument.Evaluate<float>(state),
{ } when parameterType == typeof(double) => argument.Evaluate<double>(state),
{ } when parameterType == typeof(object) => argument.Evaluate<object>(state),
{ IsArray: true } => argument.Evaluate<object[]>(state),
_ => throw new Exception($"Unsupported parameter type: {parameterType}")
};
}).ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,16 @@ public static class FunctionsRepository
{ "replace", (string input, string pattern, string replacement) => input.Replace(pattern, replacement) },
{ "string", (string input) => input },
{ "union" , (string arg0, string arg1) => JsonSerializer.Serialize(JsonSerializer.Deserialize<JsonArray>(arg0).Union(JsonSerializer.Deserialize<JsonArray>((arg1)))) },
{ "coalesce", (IEnumerable<string> args) => args.FirstOrDefault(arg => !string.IsNullOrEmpty(arg)) }

{ "coalesce", (IEnumerable<string> args) => args.FirstOrDefault(arg => !string.IsNullOrEmpty(arg)) },
{ "or", (bool a, bool b) => a || b },
{ "utcnow", () => DateTime.UtcNow.ToString("o") },
{ "utcNow", () => DateTime.UtcNow.ToString("o") },
{ "ticks", (string dateTime) => DateTime.Parse(dateTime).Ticks },
{ "sub", (long a, long b) => a - b },
{ "div", (long a, long b) => a / b },
{ "greaterOrEquals", (long a, long b) => a >= b },
{ "not", (bool value) => !value },
{ "empty", (object[] array) => array.Length == 0 },
};

public static void Register(string functionName, Delegate function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,31 @@ public static class ActivitiesEvaluator
{
public static IEnumerable<PipelineActivity> Evaluate(List<PipelineActivity> activities, PipelineRunState state)
{
while (state.PipelineActivityResults.Count != activities.Count)
while (state.ScopedPipelineActivityResults.Count != activities.Count)
{
var anyActivityEvaluated = false;
foreach (var activity in activities
.Where(activity => !state.PipelineActivityResults.Contains(activity))
.Where(activity => !state.ScopedPipelineActivityResults.Contains(activity))
.Where(activity => activity.AreDependencyConditionMet(state)))
{
yield return (PipelineActivity) activity.Evaluate(state);
var evaluatedActivity = (PipelineActivity) activity.Evaluate(state);
if (evaluatedActivity is not IIterationActivity)
yield return evaluatedActivity;

anyActivityEvaluated = true;
state.AddActivityResult(activity);

if (activity is ControlActivity controlActivity)
if (activity is IIterationActivity)
{
if (controlActivity is UntilActivity untilActivity)
if (activity is UntilActivity untilActivity)
{
while (untilActivity.Expression.Evaluate<bool>(state))
do
{
foreach (var child in untilActivity.EvaluateChildActivities(state))
yield return child;
} while (!untilActivity.Expression.Evaluate<bool>(state));
}
else
else if (activity is ControlActivity controlActivity)
{
foreach (var child in controlActivity.EvaluateChildActivities(state))
yield return child;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public bool AreDependencyConditionMet(PipelineRunState state)
{
foreach (var dependency in DependsOn)
{
var dependencyActivity = state.PipelineActivityResults.SingleOrDefault(a => a.Name == dependency.Activity);
var dependencyActivity = state.ScopedPipelineActivityResults.SingleOrDefault(a => a.Name == dependency.Activity);

// If dependency is not yet evaluated, conditions are not met
if (dependencyActivity == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ public virtual IEnumerable<PipelineActivity> EvaluateChildActivities(PipelineRun
{
yield return activity;
}

state.AddScopedActivityResultsFromScopedState(scopedState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace AzureDataFactory.TestingFramework.Models;

public partial class ForEachActivity
public partial class ForEachActivity : IIterationActivity
{
protected override List<PipelineActivity> GetNextActivities()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace AzureDataFactory.TestingFramework.Models;

public interface IIterationActivity
{

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@

namespace AzureDataFactory.TestingFramework.Models;

public partial class IfConditionActivity
public partial class IfConditionActivity : IIterationActivity
{
protected override List<PipelineActivity> GetNextActivities()
{
return _evaluatedExpression.Value ? IfTrueActivities.ToList() : IfFalseActivities.ToList();
return EvaluatedExpression ? IfTrueActivities.ToList() : IfFalseActivities.ToList();
}

public bool? _evaluatedExpression;
private bool? _evaluatedExpression;
public bool EvaluatedExpression => _evaluatedExpression ?? throw new InvalidOperationException("Expression has not been evaluated yet.");
public override DataFactoryEntity Evaluate(PipelineRunState state)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,10 @@

namespace AzureDataFactory.TestingFramework.Models;

public partial class UntilActivity
public partial class UntilActivity : IIterationActivity
{
protected override List<PipelineActivity> GetNextActivities()
{
return Activities.ToList();
}

private bool? _evaluatedExpression;
public override PipelineActivity Evaluate(PipelineRunState state)
{
base.Evaluate(state);

_evaluatedExpression = Expression.Evaluate<bool>(state);

return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class PipelineRunState : RunState
public List<IPipelineRunVariable> Variables { get; }
public string? IterationItem { get; set; }
public List<IPipelineActivityResult> PipelineActivityResults { get; }
public List<IPipelineActivityResult> ScopedPipelineActivityResults { get; }

public PipelineRunState(List<IRunParameter> parameters, IDictionary<string, PipelineVariableSpecification> variables) : base(parameters)
{
Expand All @@ -27,30 +28,40 @@ public PipelineRunState(List<IRunParameter> parameters, IDictionary<string, Pipe
throw new Exception($"Unknown variable type: {variable.Value.VariableType}");
}).ToList();;
PipelineActivityResults = new List<IPipelineActivityResult>();
ScopedPipelineActivityResults = new List<IPipelineActivityResult>();
IterationItem = null;
}

public PipelineRunState(List<IRunParameter> parameters, List<IPipelineRunVariable> variables, string? iterationItem) : base(parameters)
public PipelineRunState(List<IRunParameter> parameters, List<IPipelineRunVariable> variables, List<IPipelineActivityResult> activityResults, string? iterationItem) : base(parameters)
{
Variables = variables;
PipelineActivityResults = new List<IPipelineActivityResult>();
PipelineActivityResults.AddRange(activityResults);
ScopedPipelineActivityResults = new List<IPipelineActivityResult>();
IterationItem = iterationItem;
}

public PipelineRunState(): base(new List<IRunParameter>())
{
Variables = new List<IPipelineRunVariable>();
PipelineActivityResults = new List<IPipelineActivityResult>();
ScopedPipelineActivityResults = new List<IPipelineActivityResult>();
IterationItem = null;
}

public void AddActivityResult(IPipelineActivityResult pipelineActivityResult)
{
ScopedPipelineActivityResults.Add(pipelineActivityResult);
PipelineActivityResults.Add(pipelineActivityResult);
}

public void AddScopedActivityResultsFromScopedState(PipelineRunState scopedState)
{
PipelineActivityResults.AddRange(scopedState.ScopedPipelineActivityResults);
}

public PipelineRunState CreateIterationScope(string? iterationItem)
{
return new PipelineRunState(Parameters, Variables, iterationItem);
return new PipelineRunState(Parameters, Variables, PipelineActivityResults, iterationItem);
}
}

0 comments on commit 2d53cfd

Please sign in to comment.