Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
artemiusgreat committed Oct 25, 2024
1 parent 666387c commit 908b9a5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Distribution/Distribution.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<PropertyGroup>
<IsPackable>true</IsPackable>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<Version>2.1.8</Version>
<Version>2.1.9</Version>
<Description>General purpose virtual actor framework for peer-to-peer microservices or in-process communication within the same app with possible extension to blockchains.</Description>
<Authors>artemiusgreat</Authors>
<Copyright>indemos.com</Copyright>
Expand Down
26 changes: 10 additions & 16 deletions Distribution/Services/ScheduleService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,22 @@ public class ScheduleService : IDisposable
/// Constructor
/// </summary>
/// <param name="count"></param>
/// <param name="scheduler"></param>
/// <param name="cancellation"></param>
public ScheduleService(int count, CancellationTokenSource cancellation)
{
_count = count;
_cancellation = cancellation;
_semaphore = new ManualResetEvent(true);
_queue = Channel.CreateBounded<ActionModel>(Environment.ProcessorCount * 100);
_option = new OptionModel
{
IsRemovable = true
};
_option = new OptionModel { IsRemovable = true };

var process = new Thread(() =>
{
while (cancellation.IsCancellationRequested is false)
while (_cancellation.IsCancellationRequested is false)
{
_semaphore?.WaitOne();

while (_queue.Reader.TryRead(out var actionModel))
{
actionModel.Action();
}

_semaphore?.Reset();
_semaphore.WaitOne();
while (_queue.Reader.TryRead(out var actionModel)) actionModel.Action();
_semaphore.Reset();
}
});

Expand All @@ -64,8 +55,6 @@ public virtual void Dispose()
_queue?.Writer?.TryComplete();
_cancellation?.Cancel();
_semaphore?.Dispose();

_semaphore = null;
}

/// <summary>
Expand Down Expand Up @@ -251,6 +240,11 @@ public virtual TaskCompletionSource<T> Send<T>(Task<T> action, OptionModel optio
/// <param name="actionModel"></param>
protected virtual void Enqueue(ActionModel actionModel)
{
while (_cancellation.IsCancellationRequested)
{
return;
}

if (_queue.Reader.TryPeek(out var previousAction))
{
if ((previousAction.Option ?? _option).IsRemovable && _queue.Reader.Count >= _count)
Expand Down

0 comments on commit 908b9a5

Please sign in to comment.