Skip to content

Commit

Permalink
do
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 13, 2023
1 parent 7b6d3c0 commit 624b4ad
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 298 deletions.
172 changes: 20 additions & 152 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,6 @@
using System.Reactive.Subjects;
using System.Threading.Channels;
using ZLogger;
//using System.Reactive.Disposables;
//using System.Reactive.Subjects;
//using System.Threading.Channels;








var disposables = Enumerable.Range(1, 100).Select(x => new TestDisposable()).ToArray();
var composite = new System.Reactive.Disposables.CompositeDisposable(disposables);

foreach (var item in disposables)
{
composite.Remove(item);
}













SubscriptionTracker.EnableTracking = true;
SubscriptionTracker.EnableStackTrace = true;
Expand All @@ -51,145 +21,43 @@



var publisher = new Publisher<int, R3.Unit>();

var d = publisher
.Where(x => true)
.Select(x =>
{
//if (x == 2) throw new Exception("e");
return x;
})
.Take(5)
.OnErrorAsComplete()
.Subscribe(x =>
{
if (x == 2) throw new Exception("e");
logger.ZLogInformation($"OnNext: {x}");
}, e =>
{
logger.ZLogInformation($"failure resume");
},
x =>
{
//logger.ZLogInformation($"end:{x}");
x.TryThrow();
});

SubscriptionTracker.ForEachActiveTask(x =>
{
// logger.ZLogInformation($"{x.TrackingId,3}: {Environment.NewLine}{x.StackTrace.Replace("R2.", "").Replace("C:\\MyGit\\R2\\sandbox\\ConsoleApp1\\", "").Replace("C:\\MyGit\\R2\\src\\R2\\", "")}");


// logger.ZLogInformation($"{x.TrackingId,3}: {x.FormattedType}");
});

//publisher.PublishOnNext(1);
//publisher.PublishOnNext(2);
//publisher.PublishOnNext(3);

//publisher.PublishOnErrorResume(new Exception("ERROR"));

//publisher.PublishOnNext(4);
//publisher.PublishOnNext(5);
//publisher.PublishOnNext(6);
//publisher.PublishOnNext(7);


//d.Dispose();

var ct = new CancellationTokenSource(1000);
EventSystem.DefaultFrameProvider = new ThreadSleepFrameProvider(60);

var iiii = Enumerable.Range(1, 10).ElementAt(^12);
Console.WriteLine(iiii);


// System.Reactive.Linq.Observable.Empty<int>(

var s = new System.Reactive.Subjects.Subject<string>();

// Console.WriteLine($"Average: {Enumerable.Empty<int>().Average()}");

// s.ToListObservable();

// Observable.Throw(
// s.Where(

// new Result<int>(

// s.ObserveOn(
// Observable.FromEventPattern(



//foreach (var item in typeof(System.Reactive.Linq.Observable).GetMethods().Select(x => x.Name).Distinct().OrderBy(x => x))
//var t = new Thread(() =>
//{
// if (item == "ToString" || item == "Equals" || item == "GetHashCode" || item == "GetType")
// while (true)
// {
// continue;
// Console.WriteLine("loop"); Thread.Sleep(60);
// }
// Console.WriteLine("- [ ] " + item);
//}







var subject = new Subject<int>();

subject.OnCompleted();

Console.WriteLine("Subscribe");
subject.Subscribe(x => Console.WriteLine(x));
subject.OnNext(99);




// subject.ForEachAsync(






//p.PublishOnNext(4);
//p.PublishOnNext(5);

//Console.WriteLine("-------------------------");


//SubscriptionTracker.ForEachActiveTask(x =>
//{
// Console.WriteLine($"{x.TrackingId,3}: {x.FirstLine}");
//});
//t.IsBackground = true;
//t.Start();

//var s = new NewThreadScheduler(_ => new Thread(() => { while (true) { Console.WriteLine("loop"); Thread.Sleep(60); } }));

//s.Schedule(() => Console.WriteLine("do once"));
//using var f = new ThreadSleepFrameProvider(60);

//Event.Return(10, TimeProvider.System)
// .WriteLine();

//Console.ReadLine();

//var a = new ReactiveProperty<int>(100);
//var b = new ReactiveProperty<int>(999);
var source = Event.EveryUpdate(ct.Token);


//a.CombineLatest(b, (x, y) => (x, y)).WriteLine();

source.DoOnDisposed(() => {/*Console.WriteLine("DISPOSED")*/}).WriteLine();

//a.Value = 3;
//a.Value = 4;
//b.Value = 99999;
//b.Value = 1111;


// TODO: if WaitAsync is using, not disposed???
await source.WaitAsync();
Console.WriteLine("Press Key to done.");

//Observable.event


Console.ReadLine();

SubscriptionTracker.ForEachActiveTask(x =>
{
Console.WriteLine(x);
});

public static class Extensions
{
Expand Down
18 changes: 17 additions & 1 deletion src/R3/EventSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ public class EventSystem
{
public static ILogger<EventSystem> Logger { get; set; } = NullLogger<EventSystem>.Instance;

public static TimeProvider DefaultTimeProvider { get; set; } = TimeProvider.System;
public static FrameProvider DefaultFrameProvider { get; set; } = new NotSupportedFrameProvider();

static Action<Exception> unhandledException = WriteLog;

// Prevent +=, use Set and Get method.
public static void SetUnhandledExceptionHandler(Action<Exception> unhandledExceptionHandler)
public static void RegisterUnhandledExceptionHandler(Action<Exception> unhandledExceptionHandler)
{
unhandledException = unhandledExceptionHandler;
}
Expand All @@ -34,6 +37,19 @@ static void WriteLog(Exception exception)
}
}

internal sealed class NotSupportedFrameProvider : FrameProvider
{
public override long GetFrameCount()
{
throw new NotSupportedException("EventSystem.DefaultFrameProvider is not set. Please set EventSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...).");
}

public override void Register(IFrameRunnerWorkItem callback)
{
throw new NotSupportedException("EventSystem.DefaultFrameProvider is not set. Please set EventSystem.DefaultFrameProvider to a valid FrameProvider(ThreadSleepFrameProvider, etc...).");
}
}

internal static partial class SystemLoggerExtensions
{
[LoggerMessage(Trace, "Add subscription tracking TrackingId: {TrackingId}.")]
Expand Down
129 changes: 59 additions & 70 deletions src/R3/Factories/_EventFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,76 +26,65 @@ public static partial class Event
// AsUnitComplete
// AsNeverComplete

// TODO: use SystemDefault

public static Event<Unit, Unit> EveryUpdate()
{
return new EveryUpdate(EventSystem.DefaultFrameProvider, CancellationToken.None);
}

public static Event<Unit, Unit> EveryUpdate(CancellationToken cancellationToken)
{
return new EveryUpdate(EventSystem.DefaultFrameProvider, cancellationToken);
}

public static Event<Unit, Unit> EveryUpdate(FrameProvider frameProvider)
{
return new EveryUpdate(frameProvider, CancellationToken.None);
}

public static Event<Unit, Unit> EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken)
{
return new EveryUpdate(frameProvider, cancellationToken);
}
}

//public static Event<Unit> EveryUpdate(FrameProvider frameProvider)
//{
// return new R3.Factories.EveryUpdate(frameProvider);
//}

//public static CompletableEvent<Unit> EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken)
//{
// return new R3.Factories.EveryUpdate(frameProvider);
//}
}

//internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event<Unit>
//{
// protected override IDisposable SubscribeCore(Subscriber<Unit> subscriber)
// {
// var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken);
// frameProvider.Register(runner);
// return runner;
// }

// class EveryUpdateRunnerWorkItem(Subscriber<Unit> subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable
// {
// bool isDisposed;

// public bool MoveNext(long frameCount)
// {
// if (isDisposed || cancellationToken.IsCancellationRequested)
// {
// return false;
// }

// subscriber.OnNext(default);
// return true;
// }

// public void Dispose()
// {
// isDisposed = true;
// }
// }
//}

//internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event<Unit>
//{
// protected override IDisposable SubscribeCore(Subscriber<Unit> subscriber)
// {
// var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken);
// frameProvider.Register(runner);
// return runner;
// }

// class EveryUpdateRunnerWorkItem(Subscriber<Unit> subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable
// {
// bool isDisposed;

// public bool MoveNext(long frameCount)
// {
// if (isDisposed || cancellationToken.IsCancellationRequested)
// {
// return false;
// }

// subscriber.OnNext(default);
// return true;
// }

// public void Dispose()
// {
// isDisposed = true;
// }
// }
//}
internal sealed class EveryUpdate(FrameProvider frameProvider, CancellationToken cancellationToken) : Event<Unit, Unit>
{
protected override IDisposable SubscribeCore(Subscriber<Unit, Unit> subscriber)
{
var runner = new EveryUpdateRunnerWorkItem(subscriber, cancellationToken);
frameProvider.Register(runner);
return runner;
}

class EveryUpdateRunnerWorkItem(Subscriber<Unit, Unit> subscriber, CancellationToken cancellationToken) : IFrameRunnerWorkItem, IDisposable
{
bool isDisposed;

public bool MoveNext(long frameCount)
{
if (isDisposed)
{
return false;
}

if (cancellationToken.IsCancellationRequested)
{
subscriber.OnCompleted();
Dispose();
return false;
}

subscriber.OnNext(default);
return true;
}

public void Dispose()
{
isDisposed = true;
}
}
}
Loading

0 comments on commit 624b4ad

Please sign in to comment.