diff --git a/src/Raven.Client/Documents/Subscriptions/SubscriptionWorker.cs b/src/Raven.Client/Documents/Subscriptions/SubscriptionWorker.cs index b780f59aa12..bea2af20277 100644 --- a/src/Raven.Client/Documents/Subscriptions/SubscriptionWorker.cs +++ b/src/Raven.Client/Documents/Subscriptions/SubscriptionWorker.cs @@ -480,12 +480,13 @@ private void AssertConnectionState(SubscriptionConnectionServerMessage connectio private async Task ProcessSubscriptionAsync() { + Task notifiedSubscriber = Task.CompletedTask; + try { _processingCts.Token.ThrowIfCancellationRequested(); var contextPool = _store.GetRequestExecutor(_dbName).ContextPool; - using (contextPool.AllocateOperationContext(out JsonOperationContext context)) using (context.GetMemoryBuffer(out var buffer)) using (var tcpStream = await ConnectToServer(_processingCts.Token).ConfigureAwait(false)) @@ -508,8 +509,6 @@ private async Task ProcessSubscriptionAsync() OnEstablishedSubscriptionConnection?.Invoke(); - Task notifiedSubscriber = Task.CompletedTask; - var batch = new SubscriptionBatch(_subscriptionLocalRequestExecutor, _store, _dbName, _logger); while (_processingCts.IsCancellationRequested == false) @@ -610,6 +609,20 @@ private async Task ProcessSubscriptionAsync() // otherwise this is thrown when shutting down, // it isn't an error, so we don't need to treat it as such } + finally + { + try + { + if (notifiedSubscriber is { IsCompleted: false }) + { + await notifiedSubscriber.WaitWithTimeout(TimeSpan.FromSeconds(15)).ConfigureAwait(false); + } + } + catch + { + // ignored + } + } } private async Task ReadSingleSubscriptionBatchFromServer(JsonContextPool contextPool, Stream tcpStream, JsonOperationContext.MemoryBuffer buffer, SubscriptionBatch batch)