diff --git a/Streamistry.Core/Fluent/BasePipeBuilder.cs b/Streamistry.Core/Fluent/BasePipeBuilder.cs index 995d536..b40f093 100644 --- a/Streamistry.Core/Fluent/BasePipeBuilder.cs +++ b/Streamistry.Core/Fluent/BasePipeBuilder.cs @@ -53,4 +53,7 @@ public ParserBuilder Parse(ParserDelegate => new(this, parser); public ParserBuilder Parse() => new(this); + + public BinderBuilder Bind(Segment segment) + => new(this, segment); } diff --git a/Streamistry.Core/Fluent/BinderBuilder.cs b/Streamistry.Core/Fluent/BinderBuilder.cs new file mode 100644 index 0000000..ff8ce50 --- /dev/null +++ b/Streamistry.Core/Fluent/BinderBuilder.cs @@ -0,0 +1,23 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Fluent; +public class BinderBuilder : PipeElementBuilder +{ + protected Segment Segment { get; } + + public BinderBuilder(IPipeBuilder upstream, Segment segment) + : base(upstream) + => (Segment) = (segment); + + public override IChainablePort OnBuildPipeElement() + { + var upstream = Upstream.BuildPipeElement(); + var (input, output) = Segment.Craft(upstream.Pipe.Pipeline!); + input.Bind(upstream); + return output; + } +} diff --git a/Streamistry.Core/Fluent/Segment.cs b/Streamistry.Core/Fluent/Segment.cs new file mode 100644 index 0000000..75cef9b --- /dev/null +++ b/Streamistry.Core/Fluent/Segment.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability; + +namespace Streamistry.Fluent; + +public class Segment : IPipeBuilder +{ + private VirtualInput Input { get; set; } = new(); + internal Func, BasePipeBuilder> Builder { get; } + + public Segment(Func, BasePipeBuilder> builder) + => Builder = builder; + + public IChainablePort BuildPipeElement() + => Input ??= (VirtualInput)OnBuildPipeElement(); + + public IChainablePort OnBuildPipeElement() + => new VirtualInput(); + + public (IBindablePipe input, IChainablePort) Craft(Pipeline pipeline) + { + Input.Pipeline = pipeline; + var builder = Builder.Invoke(Input); + builder.Build(); + return (Input.GetTarget(), builder.BuildPipeElement()); + } + + private class VirtualInput : BasePipeBuilder, IChainablePort, IChainablePipe + { + private IBindablePipe? Target { get; set; } + + public IChainablePipe Pipe + => this; + + public Pipeline? Pipeline { get; set; } + + public IBindablePipe GetTarget() + => Target ?? throw new InvalidOperationException(); + + public void RegisterDownstream(Action downstream) + => Target ??= (IBindablePipe)downstream.Target!; + + public void UnregisterDownstream(Action downstream) + => Target ??= (IBindablePipe)downstream.Target!; + + + public void RegisterOnCompleted(Action? complete) + { + if (complete is not null) + Target ??= (IBindablePipe)complete.Target!; + } + + public override IChainablePort OnBuildPipeElement() + => this; + + public void RegisterObservability(ObservabilityProvider? provider) => throw new NotImplementedException(); + public ObservabilityProvider? GetObservabilityProvider() => null; + } +} + + diff --git a/Streamistry.SourceGenerator/BasePipeBuilder.scriban b/Streamistry.SourceGenerator/BasePipeBuilder.scriban index fc51bb7..f48f8fa 100644 --- a/Streamistry.SourceGenerator/BasePipeBuilder.scriban +++ b/Streamistry.SourceGenerator/BasePipeBuilder.scriban @@ -6,4 +6,15 @@ indexes | array.each @concat | array.join ", " }}) => new(this, upstream{{ indexes | array.join ", upstream" }}); + + public BranchesBuilder Branch<{{ generics | array.join ", " }}>( + {{- + func concat; ret string.append "Segment upstream" | string.append $0; end + indexes | array.each @concat | array.join ", " + }}) + => new(this, + {{- + func concat; ret string.append "upstream" $0 | string.append ".Builder"; end + indexes | array.each @concat | array.join ", " + }}); } diff --git a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs index 347d6de..dad7228 100644 --- a/Streamistry.Testing/Fluent/PipelineBuilderTests.cs +++ b/Streamistry.Testing/Fluent/PipelineBuilderTests.cs @@ -518,7 +518,7 @@ public void Build_InBranchOfBranchCheckpointForAllPortsTypedAllAsserted_Success( day => day.Map(x => x.AddDays(1)).Pluck(x => x.Day).Branch( day1 => day1.Map(x => x + 1) , day2 => day2.Map(x => x + 2) - ).Zip((x,y)=> x + y) + ).Zip((x, y) => x + y) , month => month.Map(x => x.ToString("MMMM", CultureInfo.InvariantCulture)) ).Checkpoints(out var portDay, out var portMonth) .Build(); @@ -556,4 +556,55 @@ public void Build_CombineFiveUpstreamsCheckpoint_Success() Assert.That(output, Does.Contain(25)); Assert.That(output, Does.Contain(30)); } + + [Test] + public void Build_WithSingleSegment_Success() + { + var segment = new Segment(x => x.Map(y => y * 2).Filter(y => y % 5 == 3)); + + var pipeline = new PipelineBuilder() + .Source([1, 2, 3]) + .Map(x => x * 3) + .Bind(segment) + .Map(x => x % 4).Checkpoint(out var sink) + .Build(); + + var output = sink.GetOutputs(pipeline.Start); + Assert.That(output, Does.Contain(2)); + } + + [Test] + public void Build_WithSingleSegmentChangeType_Success() + { + var segment = new Segment(x => x.Map(y => y + 3).Map(y => new string('*', y))); + + var pipeline = new PipelineBuilder() + .Source([1, 2, 3]) + .Map(x => x - 2) + .Bind(segment) + .Map(x => x!.PadLeft(4, '-')).Checkpoint(out var sink) + .Build(); + + var output = sink.GetOutputs(pipeline.Start); + Assert.That(output, Does.Contain("--**")); + Assert.That(output, Does.Contain("-***")); + Assert.That(output, Does.Contain("****")); + } + + [Test] + public void Build_WithManySegments_Success() + { + var odd = new Segment(x => x.Filter(y => y % 2 == 1).Map(y => y * 2)); + var even = new Segment(x => x.Filter(y => y % 2 == 0).Map(y => y * 5)); + + var pipeline = new PipelineBuilder() + .Source([1, 2, 3, 6]) + .Branch(odd, even) + .Zip((x, y) => x + y).Checkpoint(out var zip) + .Build(); + + var output = zip.GetOutputs(pipeline.Start); + Assert.That(output, Does.Contain(12)); + Assert.That(output, Does.Contain(36)); + } }