Skip to content

Commit

Permalink
Do
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 23, 2023
1 parent c21b34a commit a7149ec
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 146 deletions.
89 changes: 89 additions & 0 deletions src/R3/Operators/Do.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<T> Do<T>(this Observable<T> source, Action<T>? onNext = null, Action<Exception>? onErrorResume = null, Action<Result>? onCompleted = null, Action? onDispose = null, Action? onSubscribe = null)
{
return new Do<T>(source, onNext, onErrorResume, onCompleted, onDispose, onSubscribe);
}

public static Observable<T> Do<T, TState>(this Observable<T> source, TState state, Action<T, TState>? onNext = null, Action<Exception, TState>? onErrorResume = null, Action<Result, TState>? onCompleted = null, Action<TState>? onDispose = null, Action<TState>? onSubscribe = null)
{
return new Do<T, TState>(source, state, onNext, onErrorResume, onCompleted, onDispose, onSubscribe);
}

public static Observable<T> CancelOnCompleted<T>(this Observable<T> source, CancellationTokenSource cancellationTokenSource)
{
return Do(source, cancellationTokenSource, onCompleted: (_, state) => state.Cancel());
}
}

internal sealed class Do<T>(Observable<T> source, Action<T>? onNext, Action<Exception>? onErrorResume, Action<Result>? onCompleted, Action? onDispose, Action? onSubscribe) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
onSubscribe?.Invoke();
return source.Subscribe(new _Do(observer, onNext, onErrorResume, onCompleted, onDispose));
}

internal sealed class _Do(Observer<T> observer, Action<T>? onNext, Action<Exception>? onErrorResume, Action<Result>? onCompleted, Action? onDispose) : Observer<T>
{
protected override void OnNextCore(T value)
{
onNext?.Invoke(value);
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
onErrorResume?.Invoke(error);
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
onCompleted?.Invoke(result);
observer.OnCompleted();
}

protected override void DisposeCore()
{
onDispose?.Invoke();
}
}
}

internal sealed class Do<T, TState>(Observable<T> source, TState state, Action<T, TState>? onNext, Action<Exception, TState>? onErrorResume, Action<Result, TState>? onCompleted, Action<TState>? onDispose, Action<TState>? onSubscribe) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
onSubscribe?.Invoke(state);
return source.Subscribe(new _Do(observer, state, onNext, onErrorResume, onCompleted, onDispose));
}

internal sealed class _Do(Observer<T> observer, TState state, Action<T, TState>? onNext, Action<Exception, TState>? onErrorResume, Action<Result, TState>? onCompleted, Action<TState>? onDispose) : Observer<T>
{
protected override void OnNextCore(T value)
{
onNext?.Invoke(value, state);
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
onErrorResume?.Invoke(error, state);
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
onCompleted?.Invoke(result, state);
observer.OnCompleted();
}

protected override void DisposeCore()
{
onDispose?.Invoke(state);
}
}
}
51 changes: 0 additions & 51 deletions src/R3/Operators/DoOnCompleted.cs

This file was deleted.

89 changes: 0 additions & 89 deletions src/R3/Operators/DoOnDisposed.cs

This file was deleted.

9 changes: 6 additions & 3 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ public static partial class ObservableExtensions
// AsUnitObservable

// Time based
// Frame based
// Debounce, Throttle, ThrottleFirst, Sample, Delay, DelaySubscription
// + frame variation

// TImeInterval <-> FrameInterval

// OnErrorStop

// Rx Merging:
Expand All @@ -21,6 +25,5 @@ public static partial class ObservableExtensions
// Skip, SkipLast, SkipUntil, SkipWhile

// return tasks:
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup,
}

2 changes: 1 addition & 1 deletion tests/R3.Tests/OperatorTests/AggregateTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task BeforeCanceled()
var isDisposed = false;

var listTask = publisher
.DoOnDisposed(() => isDisposed = true)
.Do(onDispose: () => isDisposed = true)
.AggregateAsync(new List<int>(), (x, i) => { x.Add(i); return x; }, (x) => x, cts.Token);


Expand Down
41 changes: 41 additions & 0 deletions tests/R3.Tests/OperatorTests/DoTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace R3.Tests.OperatorTests;

public class DoTest
{
[Fact]
public void Test()
{
var subject = new Subject<int>();

List<int> onNext = new();
List<Exception> onErrorResume = new();
List<Result> onCompleted = new();
bool disposeCalled = false;
bool subscribeCalled = false;
var source = subject.Do(onNext.Add, onErrorResume.Add, onCompleted.Add, () => disposeCalled = true, () => subscribeCalled = true);

subscribeCalled.Should().BeFalse();

var list = source.ToLiveList();

subscribeCalled.Should().BeTrue();

subject.OnNext(10);
subject.OnNext(20);
subject.OnNext(30);
subject.OnErrorResume(new Exception("a"));
subject.OnErrorResume(new Exception("b"));
subject.OnErrorResume(new Exception("c"));

onNext.Should().Equal([10, 20, 30]);
onErrorResume.Select(x => x.Message).Should().Equal(["a", "b", "c"]);


disposeCalled.Should().BeFalse();

subject.OnCompleted();
onCompleted.Should().ContainSingle();

disposeCalled.Should().BeTrue();
}
}
2 changes: 1 addition & 1 deletion tests/R3.Tests/OperatorTests/TakeUntilTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public void EventOther()
var publisher1 = new Subject<int>();
var publisher2 = new Subject<int>();
var isDisposed = false;
var list = publisher1.TakeUntil(publisher2.DoOnDisposed(() => { isDisposed = true; })).ToLiveList();
var list = publisher1.TakeUntil(publisher2.Do(onDispose: () => { isDisposed = true; })).ToLiveList();

publisher1.OnNext(1);
publisher1.OnNext(2);
Expand Down
2 changes: 1 addition & 1 deletion tests/R3.Tests/OperatorTests/ToListTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public async Task ResultCompletableCancel()

var publisher = new Subject<int>();

var listTask = publisher.DoOnDisposed(() => isDisposed = true).ToListAsync(cts.Token);
var listTask = publisher.Do(onDispose: () => isDisposed = true).ToListAsync(cts.Token);

publisher.OnNext(1);
publisher.OnNext(2);
Expand Down

0 comments on commit a7149ec

Please sign in to comment.