Skip to content

Commit

Permalink
AsUnitObservable, Cast, OfType
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 27, 2023
1 parent c14cbfd commit ee71370
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 1 deletion.
2 changes: 1 addition & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
var range = System.Reactive.Linq.Observable.Range(1, 10);



Enumerable.Range(1, 10).Cast<int>();

// range.Catch(
// range.Append(
Expand Down
41 changes: 41 additions & 0 deletions src/R3/Operators/AsUnitObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<Unit> AsUnitObservable<T>(this Observable<T> source)
{
if (source is Observable<Unit> unit)
{
return unit;
}

return new AsUnitObservable<T>(source);
}
}

internal sealed class AsUnitObservable<T>(Observable<T> source) : Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
return source.Subscribe(new _AsUnitObservable(observer));
}

sealed class _AsUnitObservable(Observer<Unit> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(default);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}

37 changes: 37 additions & 0 deletions src/R3/Operators/Cast.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<TResult> Cast<T, TResult>(this Observable<T> source)
{
return new Cast<T, TResult>(source);
}
}

internal sealed class Cast<T, TResult>(Observable<T> source) : Observable<TResult>
{
protected override IDisposable SubscribeCore(Observer<TResult> observer)
{
return source.Subscribe(new _Cast(observer));
}

sealed class _Cast(Observer<TResult> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
var v = (TResult?)(object?)value;
observer.OnNext(v!);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}

39 changes: 39 additions & 0 deletions src/R3/Operators/OfType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace R3;

public static partial class ObservableExtensions
{
public static Observable<TResult> OfType<T, TResult>(this Observable<T> source)
{
return new OfType<T, TResult>(source);
}
}

internal sealed class OfType<T, TResult>(Observable<T> source) : Observable<TResult>
{
protected override IDisposable SubscribeCore(Observer<TResult> observer)
{
return source.Subscribe(new _OfType(observer));
}

sealed class _OfType(Observer<TResult> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
if (value is TResult v)
{
observer.OnNext(v);
}
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}
}
}

20 changes: 20 additions & 0 deletions tests/R3.Tests/OperatorTests/AsUnitObservableTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
namespace R3.Tests.OperatorTests;

public class AsUnitObservableTest
{
[Fact]
public void Test()
{
var subject = new Subject<int>();
using var list = subject.AsUnitObservable().ToLiveList();

subject.OnNext(10);
subject.OnNext(20);

list.AssertEqual([Unit.Default, Unit.Default]);

subject.OnCompleted();

list.AssertIsCompleted();
}
}
21 changes: 21 additions & 0 deletions tests/R3.Tests/OperatorTests/CastTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace R3.Tests.OperatorTests;

public class CastTest
{
[Fact]
public void Cast()
{
var subject = new Subject<object>();
using var list = subject.Cast<object, int>().ToLiveList();

subject.OnNext(10);
subject.OnNext(20);
subject.OnNext(30);

list.AssertEqual([10, 20, 30]);

subject.OnCompleted();

list.AssertIsCompleted();
}
}
23 changes: 23 additions & 0 deletions tests/R3.Tests/OperatorTests/OfTypeTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace R3.Tests.OperatorTests;

public class OfTypeTest
{
[Fact]
public void Test()
{
var subject = new Subject<object>();
using var list = subject.OfType<object, int>().ToLiveList();

subject.OnNext(10);
subject.OnNext("hello");
subject.OnNext(20);
subject.OnNext(30);
subject.OnNext("world");
subject.OnNext(40);

list.AssertEqual([10, 20, 30, 40]);

subject.OnCompleted();
list.AssertIsCompleted();
}
}

0 comments on commit ee71370

Please sign in to comment.