Skip to content

Commit

Permalink
Seems like a big f.ck up
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed May 11, 2022
1 parent 500f95a commit acde578
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
23 changes: 14 additions & 9 deletions src/Core/src/Eventuous.Subscriptions/Filters/ConcurrentFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ namespace Eventuous.Subscriptions.Filters;
public sealed class ConcurrentFilter : ConsumeFilter<DelayedAckConsumeContext>, IAsyncDisposable {
readonly ConcurrentChannelWorker<WorkerTask> _worker;

public ConcurrentFilter(uint concurrencyLimit, uint bufferSize = 10)
=> _worker = new ConcurrentChannelWorker<WorkerTask>(
Channel.CreateBounded<WorkerTask>((int)(concurrencyLimit * bufferSize)),
public ConcurrentFilter(uint concurrencyLimit, uint bufferSize = 10) {
var capacity = (int)(concurrencyLimit * bufferSize);

var options = new BoundedChannelOptions(capacity) {
SingleReader = true, SingleWriter = true
};

_worker = new ConcurrentChannelWorker<WorkerTask>(
Channel.CreateBounded<WorkerTask>(options),
DelayedConsume,
(int)concurrencyLimit
);
}

static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken ct) {
var ctx = workerTask.Context;
Expand All @@ -32,15 +39,13 @@ static async ValueTask DelayedConsume(WorkerTask workerTask, CancellationToken c
var exception = ctx.HandlingResults.GetException();

switch (exception) {
case TaskCanceledException:
break;
case null: throw new ApplicationException("Event handler failed");
default: throw exception;
case TaskCanceledException: break;
case null: throw new ApplicationException("Event handler failed");
default: throw exception;
}
}

if (!ctx.HandlingResults.IsPending())
await ctx.Acknowledge().NoContext();
if (!ctx.HandlingResults.IsPending()) await ctx.Acknowledge().NoContext();
}
catch (TaskCanceledException) {
ctx.Ignore<ConcurrentFilter>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ConsumePipe consumePipe
// we won't add the concurrent filter, so it won't clash with any custom setup
static ConsumePipe ConfigurePipe(ConsumePipe pipe, CatchUpSubscriptionOptions options)
=> options.ConcurrencyLimit > 1 && pipe.RegisteredFilters.All(x => x is not ConcurrentFilter)
? pipe.AddFilterFirst(new ConcurrentFilter(options.ConcurrencyLimit))
? pipe.AddFilterFirst(new PartitioningFilter(options.ConcurrencyLimit))
: pipe;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down

0 comments on commit acde578

Please sign in to comment.