Skip to content

Commit

Permalink
OnErrorResume
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 11, 2023
1 parent 26cb113 commit 09064e1
Show file tree
Hide file tree
Showing 22 changed files with 398 additions and 153 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,35 @@
# R3

Third Generation of Reactive Extensions.

```csharp
public abstract class Event<TMessage>
{
public IDisposable Subscribe(Subscriber<TMessage> subscriber);
}

public abstract class Subscriber<TMessage> : IDisposable
{
public void OnNext(TMessage message);
public void OnErrorResume(Exception error);
}

// Completable
public abstract class CompletableEvent<TMessage, TComplete>
{
public IDisposable Subscribe(Subscriber<TMessage, TComplete> subscriber)
}

public abstract class Subscriber<TMessage, TComplete> : IDisposable
{
public void OnNext(TMessage message);
public void OnErrorResume(Exception error);
public void OnCompleted(TComplete complete);
}
```

```csharp
// similar as IObserver<T>
CompletableEvent<TMessage, Result<TComplete>>
```

39 changes: 32 additions & 7 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,41 @@
EventSystem.Logger = factory.CreateLogger<EventSystem>();
var logger = factory.CreateLogger<Program>();


EventSystem.SetUnhandledExceptionHandler(e =>
{
logger.ZLogError($"{e}");
});



var publisher = new Publisher<int>();

var d = publisher
.Where(x => true)
.Select(x => x)
.Select(x =>
{
//if (x == 2) throw new Exception("e");
return x;
})
.Take(5)
.OnErrorAsComplete()
.Subscribe(x =>
{
if (x == 2) throw new Exception("e");
logger.ZLogInformation($"OnNext: {x}");
}, e =>
{
logger.ZLogInformation($"failure resume");
},
x =>
{
logger.ZLogInformation($"end:{x}");
});

SubscriptionTracker.ForEachActiveTask(x =>
{
logger.ZLogInformation($"{x.TrackingId,3}: {Environment.NewLine}{x.StackTrace.Replace("R2.", "").Replace("C:\\MyGit\\R2\\sandbox\\ConsoleApp1\\", "").Replace("C:\\MyGit\\R2\\src\\R2\\", "")}");
// logger.ZLogInformation($"{x.TrackingId,3}: {Environment.NewLine}{x.StackTrace.Replace("R2.", "").Replace("C:\\MyGit\\R2\\sandbox\\ConsoleApp1\\", "").Replace("C:\\MyGit\\R2\\src\\R2\\", "")}");


// logger.ZLogInformation($"{x.TrackingId,3}: {x.FormattedType}");
Expand All @@ -46,14 +68,15 @@
publisher.PublishOnNext(2);
publisher.PublishOnNext(3);

d.Dispose();

publisher.PublishOnErrorResume(new Exception("ERROR"));

publisher.PublishOnNext(4);
publisher.PublishOnNext(5);
publisher.PublishOnNext(6);
publisher.PublishOnNext(7);


Observable.Range(1, 10, Scheduler.CurrentThread)
.Take(3)
.Subscribe();
d.Dispose();



Expand All @@ -63,6 +86,7 @@




// Observable.Throw(
// s.Where(

Expand Down Expand Up @@ -154,3 +178,4 @@ public static IDisposable WriteLine<T, U>(this CompletableEvent<T, U> source)
}
}


15 changes: 15 additions & 0 deletions src/R3/CancellationDisposable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace R3;

public sealed class CancellationDisposable(CancellationTokenSource cancellationTokenSource) : IDisposable
{
public CancellationDisposable()
: this(new CancellationTokenSource())
{
}

public CancellationToken Token => cancellationTokenSource.Token;

public bool IsDisposed => cancellationTokenSource.IsCancellationRequested;

public void Dispose() => cancellationTokenSource.Cancel();
}
22 changes: 10 additions & 12 deletions src/R3/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,20 @@ public void OnNext(TMessage message)
}
catch (Exception ex)
{
OnError(ex);
OnErrorResume(ex);
}
}

public abstract void OnNextCore(TMessage message);
protected abstract void OnNextCore(TMessage message);

[StackTraceHidden, DebuggerStepThrough]
public void OnError(Exception error)
public void OnErrorResume(Exception error)
{
if (IsDisposed) return;
OnErrorCore(error);
OnErrorResumeCore(error);
}

[StackTraceHidden, DebuggerStepThrough]
public virtual void OnErrorCore(Exception error) { }
protected abstract void OnErrorResumeCore(Exception error);

[StackTraceHidden, DebuggerStepThrough]
public void Dispose()
Expand Down Expand Up @@ -141,22 +140,21 @@ public void OnNext(TMessage message)
}
catch (Exception ex)
{
OnError(ex);
OnErrorResume(ex);
}
}

public abstract void OnNextCore(TMessage message);
protected abstract void OnNextCore(TMessage message);

[StackTraceHidden, DebuggerStepThrough]
public void OnError(Exception error)
public void OnErrorResume(Exception error)
{
if (IsDisposed || IsCalledCompleted) return;

OnErrorCore(error);
OnErrorResumeCore(error);
}

[StackTraceHidden, DebuggerStepThrough]
public virtual void OnErrorCore(Exception error) { }
protected abstract void OnErrorResumeCore(Exception error);

[StackTraceHidden, DebuggerStepThrough]
public void OnCompleted(TComplete complete)
Expand Down
11 changes: 9 additions & 2 deletions src/R3/EventSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,17 @@ public class EventSystem
{
public static ILogger<EventSystem> Logger { get; set; } = NullLogger<EventSystem>.Instance;

public static Action<Exception> UnhandledException { get; set; } = Throw;
static Action<Exception> unhandledException = Throw;

EventSystem()
// Prevent +=, use Set and Get method.
public static void SetUnhandledExceptionHandler(Action<Exception> unhandledExceptionHandler)
{
unhandledException = unhandledExceptionHandler;
}

public static Action<Exception> GetUnhandledExceptionHandler()
{
return unhandledException;
}

static void Throw(Exception exception)
Expand Down
40 changes: 40 additions & 0 deletions src/R3/Factories/ToCompletableEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
namespace R3
{
public static partial class EventFactory
{
public static CompletableEvent<TMessage, Result<Unit>> ToCompletableEvent<TMessage>(this Task<TMessage> task)
{
return new R3.Factories.ToCompletableEvent<TMessage>(task);
}
}
}

namespace R3.Factories
{
internal sealed class ToCompletableEvent<TMessage>(Task<TMessage> task) : CompletableEvent<TMessage, Result<Unit>>
{
protected override IDisposable SubscribeCore(Subscriber<TMessage, Result<Unit>> subscriber)
{
var subscription = new CancellationDisposable();
SubscribeTask(subscriber, subscription.Token);
return subscription;
}

async void SubscribeTask(Subscriber<TMessage, Result<Unit>> subscriber, CancellationToken cancellationToken)
{
TMessage? result;
try
{
result = await task.WaitAsync(cancellationToken);
}
catch (Exception ex)
{
subscriber.OnCompleted(Result.Failure<Unit>(ex));
return;
}

subscriber.OnNext(result);
subscriber.OnCompleted(Result.Success<Unit>(default));
}
}
}
12 changes: 6 additions & 6 deletions src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace R3

using System.Diagnostics.CodeAnalysis;

namespace R3
{
public static partial class EventFactory
{
Expand Down Expand Up @@ -27,11 +30,8 @@ public static partial class EventFactory
// AsUnitComplete
// AsNeverComplete

// Repeat
public static CompletableEvent<TMessage, Unit> Repeat<TMessage>(TMessage value)
{
return new ImmediateScheduleReturn<TMessage, Unit>(value, default); // immediate
}


}
}

Expand Down
19 changes: 17 additions & 2 deletions src/R3/Operators/CombineLatest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public void OnNext(TRight message)
}
}

public void OnError(Exception error)
{
subscriber.OnErrorResume(error);
}

void Publish(TLeft m1, TRight m2)
{
var result = selector(m1, m2);
Expand All @@ -90,18 +95,28 @@ void Publish(TLeft m1, TRight m2)

sealed class LeftSubscriber(_CombineLatest parent) : Subscriber<TLeft>
{
public override void OnNextCore(TLeft message)
protected override void OnNextCore(TLeft message)
{
parent.OnNext(message);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.OnError(error);
}
}

sealed class RightSubscriber(_CombineLatest parent) : Subscriber<TRight>
{
public override void OnNextCore(TRight message)
protected override void OnNextCore(TRight message)
{
parent.OnNext(message);
}

protected override void OnErrorResumeCore(Exception error)
{
parent.OnError(error);
}
}
}
}
19 changes: 16 additions & 3 deletions src/R3/Operators/CountAsync.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
namespace R3

namespace R3
{
public static partial class EventExtensions
{
Expand Down Expand Up @@ -36,14 +37,20 @@ internal sealed class CountAsync<TMessage, TComplete>(TaskCompletionSource<int>
{
int count;

public override void OnNextCore(TMessage message)
protected override void OnNextCore(TMessage message)
{
checked
{
count++;
}
}

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
this.Dispose(); // stop subscription.
}

protected override void OnCompletedCore(TComplete complete)
{
tcs.TrySetResult(count);
Expand All @@ -54,13 +61,19 @@ internal sealed class CountUAsync<TMessage, TComplete>(TaskCompletionSource<int>
{
int count;

public override void OnNextCore(TMessage message)
protected override void OnNextCore(TMessage message)
{
checked
{
count++;
}
}

protected override void OnErrorResumeCore(Exception error)
{
tcs.TrySetException(error);
this.Dispose(); // stop subscription.
}

protected override void OnCompletedCore(Result<TComplete> complete)
{
Expand Down
8 changes: 7 additions & 1 deletion src/R3/Operators/Delay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public _Delay(Subscriber<TMessage> subscriber, TimeSpan dueTime, TimeProvider ti
this.timer = timeProvider.CreateStoppedTimer(timerCallback, this);
}

public override void OnNextCore(TMessage message)
protected override void OnNextCore(TMessage message)
{
var timestamp = timeProvider.GetTimestamp();
lock (queue)
Expand All @@ -68,6 +68,12 @@ public override void OnNextCore(TMessage message)
}
}

protected override void OnErrorResumeCore(Exception error)
{
// TODO: what should we do?
throw new NotImplementedException();
}

static void DrainMessages(object? state)
{
var self = (_Delay)state!;
Expand Down
Loading

0 comments on commit 09064e1

Please sign in to comment.