Skip to content

Commit

Permalink
TODO:ObserveOn
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 26, 2023
1 parent 42513bd commit d106a82
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 25 deletions.
2 changes: 0 additions & 2 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,13 @@
//var xs = await publisher.Take(TimeSpan.FromSeconds(5));



foreach (var item in Enumerable.Range(1, 10).TakeWhile(x => x <= 3))
{
Console.WriteLine(item);
}

var repeat = System.Reactive.Linq.Observable.Repeat("foo", 10);


// System.Reactive.Linq.Observable.Append(

// repeat.TakeWhile(
Expand Down
75 changes: 52 additions & 23 deletions src/R3/Operators/ObserveOn.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Runtime.InteropServices;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;

namespace R3;

Expand Down Expand Up @@ -30,6 +31,7 @@ public static Observable<T> ObserveOn<T>(this Observable<T> source, FrameProvide
}
}

// TODO: use local-queue(careful re-entrant) implementation
internal sealed class ObserveOnSynchronizationContext<T>(Observable<T> source, SynchronizationContext synchronizationContext) : Observable<T>
{
protected override IDisposable SubscribeCore(Observer<T> observer)
Expand Down Expand Up @@ -103,51 +105,78 @@ protected override IDisposable SubscribeCore(Observer<T> observer)

sealed class _ObserveOn(Observer<T> observer) : Observer<T>
{
static readonly Action<(_ObserveOn, T)> onNext = PostOnNext;
static readonly Action<(_ObserveOn, Exception)> onErrorResume = PostOnErrorResume;
static readonly Action<(_ObserveOn, Result)> onCompleted = PostOnCompleted;
static readonly Action<_ObserveOn> drainMessages = DrainMessages;

Observer<T> observer = observer;
ConcurrentQueue<Notification<T>> q = new();
bool running = false;

protected override bool AutoDisposeOnCompleted => false;

protected override void OnNextCore(T value)
{
var item = PooledThreadPoolWorkItem<(_ObserveOn, T)>.Create(onNext, (this, value));
ThreadPool.UnsafeQueueUserWorkItem(item, preferLocal: false);
q.Enqueue(new(value));
TryStartWorker();
}

protected override void OnErrorResumeCore(Exception error)
{
var item = PooledThreadPoolWorkItem<(_ObserveOn, Exception)>.Create(onErrorResume, (this, error));
ThreadPool.UnsafeQueueUserWorkItem(item, preferLocal: false);
q.Enqueue(new(error));
TryStartWorker();
}

protected override void OnCompletedCore(Result result)
{
var item = PooledThreadPoolWorkItem<(_ObserveOn, Result)>.Create(onCompleted, (this, result));
ThreadPool.UnsafeQueueUserWorkItem(item, preferLocal: false);
q.Enqueue(new(result));
TryStartWorker();
}

static void PostOnNext((_ObserveOn, T) state)
void TryStartWorker()
{
state.Item1.observer.OnNext(state.Item2);
}

static void PostOnErrorResume((_ObserveOn, Exception) state)
{
state.Item1.observer.OnErrorResume(state.Item2);
lock (q)
{
if (!running)
{
running = true;
ThreadPool.UnsafeQueueUserWorkItem(drainMessages, this, preferLocal: false);
}
}
}

static void PostOnCompleted((_ObserveOn, Result) state)
static void DrainMessages(_ObserveOn state)
{
try
AGAIN:
while (state.q.TryDequeue(out var item))
{
state.Item1.observer.OnCompleted(state.Item2);
switch (item.Kind)
{
case NotificationKind.OnNext:
state.observer.OnNext(item.Value!);
break;
case NotificationKind.OnErrorResume:
state.observer.OnErrorResume(item.Error!);
break;
case NotificationKind.OnCompleted:
try
{
state.observer.OnCompleted(item.Result!.Value);
}
finally
{
state.Dispose();
}
break;
}
}
finally

lock (state.q)
{
state.Item1.Dispose();
if (state.q.Count != 0)
{
goto AGAIN;
}
state.running = false;
return;
}
}
}
Expand Down Expand Up @@ -433,7 +462,7 @@ public bool MoveNext(long frameCount)
{
list.Clear();
}

if (IsDisposed)
{
running = false;
Expand Down

0 comments on commit d106a82

Please sign in to comment.