Skip to content

Commit

Permalink
RavenDB-16771: Unobserved exception in Subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
garayx authored and ppekrol committed Jun 29, 2023
1 parent 71746b0 commit cb53e6c
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions src/Raven.Client/Documents/Subscriptions/SubscriptionWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,13 @@ private void AssertConnectionState(SubscriptionConnectionServerMessage connectio

private async Task ProcessSubscriptionAsync()
{
Task notifiedSubscriber = Task.CompletedTask;

try
{
_processingCts.Token.ThrowIfCancellationRequested();

var contextPool = _store.GetRequestExecutor(_dbName).ContextPool;

using (contextPool.AllocateOperationContext(out JsonOperationContext context))
using (context.GetMemoryBuffer(out var buffer))
using (var tcpStream = await ConnectToServer(_processingCts.Token).ConfigureAwait(false))
Expand All @@ -508,8 +509,6 @@ private async Task ProcessSubscriptionAsync()

OnEstablishedSubscriptionConnection?.Invoke();

Task notifiedSubscriber = Task.CompletedTask;

var batch = new SubscriptionBatch<T>(_subscriptionLocalRequestExecutor, _store, _dbName, _logger);

while (_processingCts.IsCancellationRequested == false)
Expand Down Expand Up @@ -610,6 +609,20 @@ private async Task ProcessSubscriptionAsync()
// otherwise this is thrown when shutting down,
// it isn't an error, so we don't need to treat it as such
}
finally
{
try
{
if (notifiedSubscriber is { IsCompleted: false })
{
await notifiedSubscriber.WaitWithTimeout(TimeSpan.FromSeconds(15)).ConfigureAwait(false);
}
}
catch
{
// ignored
}
}
}

private async Task<BatchFromServer> ReadSingleSubscriptionBatchFromServer(JsonContextPool contextPool, Stream tcpStream, JsonOperationContext.MemoryBuffer buffer, SubscriptionBatch<T> batch)
Expand Down

0 comments on commit cb53e6c

Please sign in to comment.