Skip to content

Commit

Permalink
FromEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 22, 2023
1 parent b48113d commit f56d015
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 10 deletions.
4 changes: 2 additions & 2 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
var repeat = System.Reactive.Linq.Observable.Repeat("foo", 10);
// repeat.TakeWhile(



// System.Reactive.Linq.Observable.FromEvent(

var rp = new ReactiveProperty<int>(999);
rp.Value += 10;
Expand All @@ -63,6 +62,7 @@ public static IDisposable WriteLine<T>(this Observable<T> source)
}



class TestDisposable : IDisposable
{
public int CalledCount = 0;
Expand Down
114 changes: 114 additions & 0 deletions src/R3/Factories/FromEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
namespace R3;

public static partial class Observable
{
public static Observable<(object? sender, EventArgs e)> FromEventHandler(Action<EventHandler> addHandler, Action<EventHandler> removeHandler)
{
return new FromEvent<EventHandler, (object? sender, EventArgs e)>(h => (sender, e) => h((sender, e)), addHandler, removeHandler);
}

public static Observable<(object? sender, TEventArgs e)> FromEventHandler<TEventArgs>(Action<EventHandler<TEventArgs>> addHandler, Action<EventHandler<TEventArgs>> removeHandler)
{
return new FromEvent<EventHandler<TEventArgs>, (object? sender, TEventArgs e)>(h => (sender, e) => h((sender, e)), addHandler, removeHandler);
}

public static Observable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler)
{
return new FromEvent<Action>(static h => h, addHandler, removeHandler);
}

public static Observable<T> FromEvent<T>(Action<Action<T>> addHandler, Action<Action<T>> removeHandler)
{
return new FromEvent<Action<T>, T>(static h => h, addHandler, removeHandler);
}

public static Observable<Unit> FromEvent<TDelegate>(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
{
return new FromEvent<TDelegate>(conversion, addHandler, removeHandler);
}

public static Observable<T> FromEvent<TDelegate, T>(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
{
return new FromEvent<TDelegate, T>(conversion, addHandler, removeHandler);
}
}

internal sealed class FromEvent<TDelegate>(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
: Observable<Unit>
{
protected override IDisposable SubscribeCore(Observer<Unit> observer)
{
return new _FromEventPattern(conversion, addHandler, removeHandler, observer);
}

sealed class _FromEventPattern : IDisposable
{
Observer<Unit>? observer;
Action<TDelegate>? removeHandler;
TDelegate registeredHandler;

public _FromEventPattern(Func<Action, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, Observer<Unit> observer)
{
this.observer = observer;
this.removeHandler = removeHandler;
this.registeredHandler = conversion(OnNext);
addHandler(this.registeredHandler);
}

void OnNext()
{
observer?.OnNext(default);
}

public void Dispose()
{
var handler = Interlocked.Exchange(ref removeHandler, null);
if (handler != null)
{
observer = null;
removeHandler = null;
handler(this.registeredHandler);
}
}
}
}

internal sealed class FromEvent<TDelegate, T>(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler)
: Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return new _FromEventPattern(conversion, addHandler, removeHandler, observer);
}

sealed class _FromEventPattern : IDisposable
{
Observer<T>? observer;
Action<TDelegate>? removeHandler;
TDelegate registeredHandler;

public _FromEventPattern(Func<Action<T>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, Observer<T> observer)
{
this.observer = observer;
this.removeHandler = removeHandler;
this.registeredHandler = conversion(OnNext);
addHandler(this.registeredHandler);
}

void OnNext(T value)
{
observer?.OnNext(value);
}

public void Dispose()
{
var handler = Interlocked.Exchange(ref removeHandler, null);
if (handler != null)
{
observer = null;
removeHandler = null;
handler(this.registeredHandler);
}
}
}
}
20 changes: 19 additions & 1 deletion src/R3/Factories/ToObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ internal class IObservableToObservable<T>(IObservable<T> source) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
{
return source.Subscribe(observer);
return source.Subscribe(new ObserverToObserver(observer));
}

sealed class ObserverToObserver(Observer<T> observer) : IObserver<T>
{
public void OnNext(T value)
{
observer.OnNext(value);
}

public void OnError(Exception error)
{
observer.OnCompleted(error);
}

public void OnCompleted()
{
observer.OnCompleted();
}
}
}
14 changes: 7 additions & 7 deletions src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@

using System.Diagnostics.CodeAnalysis;
using System.Text;
using System.Threading;

namespace R3;
namespace R3;

public static partial class Observable
{
// TODO: this is working space, will remove this file after complete.

// TODO: Defer, DeferAsync, FromAsync, FromAsyncPattern, FromEvent, FromEventPattern, Start, Using, Create
// TODO: Defer, DeferAsync, Start, Using, Create
// Timer, Interval, TimerFrame, IntervalFrame, ToObservable(ToEvent)


Expand All @@ -28,5 +23,10 @@ public static partial class Observable
// AsNeverComplete

// TODO: use SystemDefault




}


1 change: 1 addition & 0 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ public static partial class ObservableExtensions
// return tasks:
// All, Any, Contains, SequenceEqual, IsEmpty, MaxBy, MinBy, ToDictionary, ToLookup, ForEachAsync
}

82 changes: 82 additions & 0 deletions tests/R3.Tests/FactoryTests/FromEventTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
namespace R3.Tests.FactoryTests;

public class FromEventTest
{

[Fact]
public void Event()
{
var ev = new EventPattern();

var l1 = Observable.FromEventHandler(h => ev.E1 += h, h => ev.E1 -= h).ToLiveList();
var l2 = Observable.FromEventHandler<int>(h => ev.E2 += h, h => ev.E2 -= h).ToLiveList();
var l3 = Observable.FromEvent(h => ev.A1 += h, h => ev.A1 -= h).ToLiveList();
var l4 = Observable.FromEvent<int>(h => ev.A2 += h, h => ev.A2 -= h).ToLiveList();
var l5 = Observable.FromEvent<MyDelegate1>(h => new MyDelegate1(h), h => ev.M1 += h, h => ev.M1 -= h).ToLiveList();
var l6 = Observable.FromEvent<MyDelegate2, int>(h => new MyDelegate2(h), h => ev.M2 += h, h => ev.M2 -= h).ToLiveList();
var l7 = Observable.FromEvent<MyDelegate3, (int x, int y)>(h => (x, y) => h((x, y)), h => ev.M3 += h, h => ev.M3 -= h).ToLiveList();

ev.Raise(10, 20);
ev.Raise(100, 200);

l1.Should().HaveCount(2);
l3.Should().HaveCount(2);
l5.Should().HaveCount(2);

l2.Select(x => x.e).Should().Equal([10, 100]);
l4.AssertEqual([10, 100]);
l6.AssertEqual([10, 100]);
l7.AssertEqual([(10, 20), (100, 200)]);

ev.InvocationListCount().Should().Be((1, 1, 1, 1, 1, 1, 1));

l1.Dispose();
l2.Dispose();
l3.Dispose();
l4.Dispose();
l5.Dispose();
l6.Dispose();
l7.Dispose();

ev.InvocationListCount().Should().Be((0, 0, 0, 0, 0, 0, 0));
}


class EventPattern
{
public event EventHandler? E1;
public event EventHandler<int>? E2;
public event Action? A1;
public event Action<int>? A2;
public event MyDelegate1? M1;
public event MyDelegate2? M2;
public event MyDelegate3? M3;

public void Raise(int x, int y)
{
E1?.Invoke(this, new EventArgs());
E2?.Invoke(this, x);
A1?.Invoke();
A2?.Invoke(x);
M1?.Invoke();
M2?.Invoke(x);
M3?.Invoke(x, y);
}

public (int, int, int, int, int, int, int) InvocationListCount()
{
return (
E1?.GetInvocationList().Length ?? 0,
E2?.GetInvocationList().Length ?? 0,
A1?.GetInvocationList().Length ?? 0,
A2?.GetInvocationList().Length ?? 0,
M1?.GetInvocationList().Length ?? 0,
M2?.GetInvocationList().Length ?? 0,
M3?.GetInvocationList().Length ?? 0);
}
}

public delegate void MyDelegate1();
public delegate void MyDelegate2(int x);
public delegate void MyDelegate3(int x, int y);
}

0 comments on commit f56d015

Please sign in to comment.