Skip to content

Commit

Permalink
Merge
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 27, 2023
1 parent 25e57b8 commit 8bbf488
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 11 deletions.
4 changes: 3 additions & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
var range = System.Reactive.Linq.Observable.Range(1, 10);




// range.Catch(
// range.Append(


// Enumerable.Range(1,10).Min(
// range.SelectMany(

// range.TakeLast(
Expand Down
4 changes: 1 addition & 3 deletions src/R3/Factories/Amb.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@


using static System.Runtime.InteropServices.JavaScript.JSType;
namespace R3;

public static partial class Observable
{
Expand Down
121 changes: 121 additions & 0 deletions src/R3/Factories/Merge.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
namespace R3;

public static partial class Observable
{
public static Observable<T> Merge<T>(params Observable<T>[] sources)
{
return new Merge<T>(sources);
}

public static Observable<T> Merge<T>(IEnumerable<Observable<T>> sources)
{
return new Merge<T>(sources);
}
}

public static partial class ObservableExtensions
{
public static Observable<T> Merge<T>(this Observable<T> source, Observable<T> second)
{
return new Merge<T>(new[] { source, second });
}
}

internal sealed class Merge<T>(IEnumerable<Observable<T>> sources) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
var merge = new _Merge(observer);
var builder = Disposable.CreateBuilder();

var count = 0;
foreach (var item in sources)
{
item.Subscribe(new _MergeObserver(merge)).AddTo(ref builder);
count++;
}

merge.disposable.Disposable = builder.Build();

merge.SetSourceCount(count);

return merge;
}

sealed class _Merge(Observer<T> observer) : IDisposable
{
public Observer<T> observer = observer;
public SingleAssignmentDisposableCore disposable;
public readonly object gate = new object();

int sourceCount = -1; // not set yet.
int completeCount;

public void SetSourceCount(int count)
{
lock (gate)
{
sourceCount = count;
if (sourceCount == completeCount)
{
observer.OnCompleted();
Dispose();
}
}
}

// when all sources are completed, then this observer is completed
public void TryPublishCompleted()
{
lock (gate)
{
completeCount++;
if (completeCount == sourceCount)
{
observer.OnCompleted();
Dispose();
}
}
}

public void Dispose()
{
disposable.Dispose();
}
}

sealed class _MergeObserver(_Merge parent) : Observer<T>
{
protected override void OnNextCore(T value)
{
lock (parent.gate)
{
parent.observer.OnNext(value);
}
}

protected override void OnErrorResumeCore(Exception error)
{
lock (parent.gate)
{
parent.observer.OnErrorResume(error);
}
}

protected override void OnCompletedCore(Result result)
{
if (result.IsFailure)
{
// when error, publish OnCompleted immediately
lock (parent.gate)
{
parent.observer.OnCompleted(result);
}
}
else
{
parent.TryPublishCompleted();
}
}
}
}
2 changes: 1 addition & 1 deletion src/R3/Operators/AggregateOperators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace R3;

// TODO: ToDictionary
// TODO: ToLookup
// TODO: Selector APIs

public static partial class ObservableExtensions
{
Expand Down Expand Up @@ -40,7 +41,6 @@ public static Task<HashSet<T>> ToHashSetAsync<T>(this Observable<T> source, IEqu
}, (value) => value, cancellationToken); // ignore complete
}


// CountAsync using AggregateAsync
public static Task<int> CountAsync<T>(this Observable<T> source, CancellationToken cancellationToken = default)
{
Expand Down
8 changes: 2 additions & 6 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@ public static partial class ObservableExtensions

// TImeInterval <-> FrameInterval

// Buffer + BUfferFrame

// OnErrorStop

// Observe
// Buffer + BUfferFrame => Chunk, ChunkFrame

// Rx Merging:
//CombineLatest, Merge, Zip, WithLatestFrom, ZipLatest, Switch

// Standard Query:
// Distinct, DistinctUntilChanged, Scan
// Distinct, DistinctBy, DistinctUntilChanged, Scan

// SkipTake:
// Skip, SkipLast, SkipUntil, SkipWhile
Expand Down
59 changes: 59 additions & 0 deletions tests/R3.Tests/OperatorTests/MergeTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace R3.Tests.OperatorTests;

public class MergeTest
{
[Fact]
public void Std()
{
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();
var subject3 = new Subject<int>();

using var list = Observable.Merge(subject1, subject2, subject3).ToLiveList();

subject1.OnNext(10);
subject2.OnNext(20);
subject2.OnNext(21);
subject3.OnNext(30);
subject1.OnNext(11);
subject3.OnNext(31);

subject2.OnCompleted();

subject1.OnNext(12);
subject1.OnCompleted();

subject3.OnNext(32);

list.AssertIsNotCompleted();

subject3.OnCompleted();

list.AssertEqual([10, 20, 21, 30, 11, 31, 12, 32]);

list.AssertIsCompleted();
}

[Fact]
public void CompleteFirst()
{
var subject1 = new Subject<int>();
var subject2 = new Subject<int>();
var subject3 = new Subject<int>();

subject1.OnCompleted();
subject2.OnCompleted();
subject3.OnCompleted();
using var list = Observable.Merge(subject1, subject2, subject3).ToLiveList();

list.AssertIsCompleted();
}

[Fact]
public void Empty()
{
using var list = Observable.Empty<int>().ToLiveList();

list.AssertIsCompleted();
}
}

0 comments on commit 8bbf488

Please sign in to comment.