Skip to content

Commit

Permalink
Factory complete
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 23, 2023
1 parent ede36e5 commit c21b34a
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 20 deletions.
4 changes: 3 additions & 1 deletion sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

var range = System.Reactive.Linq.Observable.Range(1, 10);

System.Reactive.Linq.Observable.Defer(


// range.TakeLast(

Expand All @@ -42,6 +42,8 @@

//var xs = await publisher.Take(TimeSpan.FromSeconds(5));



foreach (var item in Enumerable.Range(1, 10).TakeWhile(x => x <= 3))
{
Console.WriteLine(item);
Expand Down
31 changes: 31 additions & 0 deletions src/R3/Factories/Create.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace R3;

public static partial class Observable
{
public static Observable<T> Create<T>(Func<Observer<T>, IDisposable> subscribe)
{
return new AnonymousObservable<T>(subscribe);
}

public static Observable<T> Create<T, TState>(TState state, Func<Observer<T>, TState, IDisposable> subscribe)
{
return new AnonymousObservable<T, TState>(state, subscribe);
}
}

internal sealed class AnonymousObservable<T>(Func<Observer<T>, IDisposable> subscribe) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return subscribe(observer);
}
}

internal sealed class AnonymousObservable<T, TState>(TState state, Func<Observer<T>, TState, IDisposable> subscribe) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return subscribe(observer, state);
}
}

16 changes: 0 additions & 16 deletions src/R3/Factories/_EventFactory.cs

This file was deleted.

29 changes: 29 additions & 0 deletions src/R3/ObserverExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ public static void OnCompleted<T>(this Observer<T> observer, Exception exception
observer.OnCompleted(Result.Failure(exception));
}

public static Observer<T> Wrap<T>(this Observer<T> observer)
{
return new WrappedObserver<T>(observer);
}


public static Observer<T> ToObserver<T>(this IObserver<T> observer)
{
return new IObserverToObserver<T>(observer);
Expand Down Expand Up @@ -42,3 +48,26 @@ protected override void OnCompletedCore(Result result)
}
}
}

internal sealed class WrappedObserver<T>(Observer<T> observer) : Observer<T>
{
protected override void OnNextCore(T value)
{
observer.OnNext(value);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void DisposeCore()
{
observer.Dispose();
}
}
25 changes: 22 additions & 3 deletions src/R3/Operators/AsObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,33 @@

public static partial class ObservableExtensions
{
// TODO: more overload?
public static IObservable<T> AsObservable<T>(this Observable<T> source)
// TODO: test

public static Observable<T> AsObservable<T>(this Observable<T> source)
{
if (source is AsObservable<T>) // already hide
{
return source;
}

return new AsObservable<T>(source);
}

public static IObservable<T> AsIObservable<T>(this Observable<T> source)
{
return new AsIObservable<T>(source);
}
}

internal sealed class AsObservable<T>(Observable<T> observable) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return observable.Subscribe(observer.Wrap());
}
}

internal sealed class AsObservable<T>(Observable<T> source) : IObservable<T>
internal sealed class AsIObservable<T>(Observable<T> source) : IObservable<T>
{
public IDisposable Subscribe(IObserver<T> observer)
{
Expand Down
2 changes: 2 additions & 0 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public static partial class ObservableExtensions
// TODO: this is working space, will remove this file after complete.


// AsUnitObservable

// Time based
// Frame based
// OnErrorStop
Expand Down
60 changes: 60 additions & 0 deletions tests/R3.Tests/FactoryTests/CreateTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

namespace R3.Tests.FactoryTests;

public class CreateTest
{
[Fact]
public void Create()
{
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(10);
observer.OnNext(100);
observer.OnCompleted();
return Disposable.Empty;
});

source.ToLiveList().AssertEqual([1, 10, 100]);
}

[Fact]
public void CreateS()
{
using var publisher = new Subject<int>();
var source = Observable.Create<int, Subject<int>>(publisher, (observer, state) =>
{
return state.Subscribe(new Wrap<int>(observer));
});

using var list = source.ToLiveList();

publisher.OnNext(1);
list.AssertEqual([1]);
publisher.OnNext(10);
list.AssertEqual([1, 10]);
publisher.OnNext(100);
list.AssertEqual([1, 10, 100]);

publisher.OnCompleted();
list.AssertIsCompleted();
}
}

file class Wrap<T>(Observer<T> observer) : Observer<T>
{
protected override void OnCompletedCore(Result result)
{
observer.OnCompleted(result);
}

protected override void OnErrorResumeCore(Exception error)
{
observer.OnErrorResume(error);
}

protected override void OnNextCore(T value)
{
observer.OnNext(value);
}
}
2 changes: 2 additions & 0 deletions tests/R3.Tests/FactoryTests/DeferTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ public void Test()
list.AssertEqual([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}
}


0 comments on commit c21b34a

Please sign in to comment.