diff --git a/sandbox/ConsoleApp1/Program.cs b/sandbox/ConsoleApp1/Program.cs index 1ce3d30e..40ea821c 100644 --- a/sandbox/ConsoleApp1/Program.cs +++ b/sandbox/ConsoleApp1/Program.cs @@ -30,7 +30,7 @@ var range = System.Reactive.Linq.Observable.Range(1, 10); -System.Reactive.Linq.Observable.Defer( + // range.TakeLast( @@ -42,6 +42,8 @@ //var xs = await publisher.Take(TimeSpan.FromSeconds(5)); + + foreach (var item in Enumerable.Range(1, 10).TakeWhile(x => x <= 3)) { Console.WriteLine(item); diff --git a/src/R3/Factories/Create.cs b/src/R3/Factories/Create.cs new file mode 100644 index 00000000..7e023d97 --- /dev/null +++ b/src/R3/Factories/Create.cs @@ -0,0 +1,31 @@ +namespace R3; + +public static partial class Observable +{ + public static Observable Create(Func, IDisposable> subscribe) + { + return new AnonymousObservable(subscribe); + } + + public static Observable Create(TState state, Func, TState, IDisposable> subscribe) + { + return new AnonymousObservable(state, subscribe); + } +} + +internal sealed class AnonymousObservable(Func, IDisposable> subscribe) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return subscribe(observer); + } +} + +internal sealed class AnonymousObservable(TState state, Func, TState, IDisposable> subscribe) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return subscribe(observer, state); + } +} + diff --git a/src/R3/Factories/_EventFactory.cs b/src/R3/Factories/_EventFactory.cs deleted file mode 100644 index 64893424..00000000 --- a/src/R3/Factories/_EventFactory.cs +++ /dev/null @@ -1,16 +0,0 @@ -namespace R3; - -public static partial class Observable -{ - // TODO: this is working space, will remove this file after complete. - - // TODO: Defer, DeferAsync, Start, Using, Create - - - // ToObservable(ToEvent) - // ToAsyncEnumerable? - // AsObservable - // AsSingleUnitObservable - // AsUnitObservable - // AsUniResult -} diff --git a/src/R3/ObserverExtensions.cs b/src/R3/ObserverExtensions.cs index 682fff90..737c8a36 100644 --- a/src/R3/ObserverExtensions.cs +++ b/src/R3/ObserverExtensions.cs @@ -12,6 +12,12 @@ public static void OnCompleted(this Observer observer, Exception exception observer.OnCompleted(Result.Failure(exception)); } + public static Observer Wrap(this Observer observer) + { + return new WrappedObserver(observer); + } + + public static Observer ToObserver(this IObserver observer) { return new IObserverToObserver(observer); @@ -42,3 +48,26 @@ protected override void OnCompletedCore(Result result) } } } + +internal sealed class WrappedObserver(Observer observer) : Observer +{ + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void DisposeCore() + { + observer.Dispose(); + } +} diff --git a/src/R3/Operators/AsObservable.cs b/src/R3/Operators/AsObservable.cs index 70874939..1bfed5ce 100644 --- a/src/R3/Operators/AsObservable.cs +++ b/src/R3/Operators/AsObservable.cs @@ -2,14 +2,33 @@ public static partial class ObservableExtensions { - // TODO: more overload? - public static IObservable AsObservable(this Observable source) + // TODO: test + + public static Observable AsObservable(this Observable source) { + if (source is AsObservable) // already hide + { + return source; + } + return new AsObservable(source); } + + public static IObservable AsIObservable(this Observable source) + { + return new AsIObservable(source); + } +} + +internal sealed class AsObservable(Observable observable) : Observable +{ + protected override IDisposable SubscribeCore(Observer observer) + { + return observable.Subscribe(observer.Wrap()); + } } -internal sealed class AsObservable(Observable source) : IObservable +internal sealed class AsIObservable(Observable source) : IObservable { public IDisposable Subscribe(IObserver observer) { diff --git a/src/R3/Operators/_Operators.cs b/src/R3/Operators/_Operators.cs index a45a0e88..aeb28c28 100644 --- a/src/R3/Operators/_Operators.cs +++ b/src/R3/Operators/_Operators.cs @@ -5,6 +5,8 @@ public static partial class ObservableExtensions // TODO: this is working space, will remove this file after complete. + // AsUnitObservable + // Time based // Frame based // OnErrorStop diff --git a/tests/R3.Tests/FactoryTests/CreateTest.cs b/tests/R3.Tests/FactoryTests/CreateTest.cs new file mode 100644 index 00000000..9bb30765 --- /dev/null +++ b/tests/R3.Tests/FactoryTests/CreateTest.cs @@ -0,0 +1,60 @@ + +namespace R3.Tests.FactoryTests; + +public class CreateTest +{ + [Fact] + public void Create() + { + var source = Observable.Create(observer => + { + observer.OnNext(1); + observer.OnNext(10); + observer.OnNext(100); + observer.OnCompleted(); + return Disposable.Empty; + }); + + source.ToLiveList().AssertEqual([1, 10, 100]); + } + + [Fact] + public void CreateS() + { + using var publisher = new Subject(); + var source = Observable.Create>(publisher, (observer, state) => + { + return state.Subscribe(new Wrap(observer)); + }); + + using var list = source.ToLiveList(); + + publisher.OnNext(1); + list.AssertEqual([1]); + publisher.OnNext(10); + list.AssertEqual([1, 10]); + publisher.OnNext(100); + list.AssertEqual([1, 10, 100]); + + publisher.OnCompleted(); + list.AssertIsCompleted(); + } +} + +file class Wrap(Observer observer) : Observer +{ + protected override void OnCompletedCore(Result result) + { + observer.OnCompleted(result); + } + + protected override void OnErrorResumeCore(Exception error) + { + observer.OnErrorResume(error); + } + + protected override void OnNextCore(T value) + { + observer.OnNext(value); + } +} diff --git a/tests/R3.Tests/FactoryTests/DeferTest.cs b/tests/R3.Tests/FactoryTests/DeferTest.cs index ecfb7094..2fd490f2 100644 --- a/tests/R3.Tests/FactoryTests/DeferTest.cs +++ b/tests/R3.Tests/FactoryTests/DeferTest.cs @@ -21,3 +21,5 @@ public void Test() list.AssertEqual([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); } } + +