From 0c7805a2aa2ced284632e3bb884f13c0d9a7b45f Mon Sep 17 00:00:00 2001 From: Sukhorukov Anton Date: Mon, 11 Dec 2023 14:56:58 +0300 Subject: [PATCH] s3 refactoring --- .../WriteOperators/S3/S3TarWriteOperator.cs | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/common/ASC.Data.Storage/DataOperators/WriteOperators/S3/S3TarWriteOperator.cs b/common/ASC.Data.Storage/DataOperators/WriteOperators/S3/S3TarWriteOperator.cs index 67f1c0e7994..dbe25e75e56 100644 --- a/common/ASC.Data.Storage/DataOperators/WriteOperators/S3/S3TarWriteOperator.cs +++ b/common/ASC.Data.Storage/DataOperators/WriteOperators/S3/S3TarWriteOperator.cs @@ -37,6 +37,7 @@ public class S3TarWriteOperator : IDataWriteOperator private readonly List _tasks = new(); private readonly TaskScheduler _scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, Limit).ConcurrentScheduler; private readonly ConcurrentQueue _queue = new(); + private readonly CancellationTokenSource _cts = new(); public string Hash { get; private set; } public string StoragePath { get; private set; } @@ -76,7 +77,21 @@ public async Task WriteEntryAsync(string tarKey, string domain, string path, IDa if (store is S3Storage s3Store) { var fullPath = s3Store.MakePath(domain, path); - var task = new Task(_store.ConcatFileAsync(fullPath, tarKey, _domain, _key, _queue).Wait); + var task = new Task(() => + { + try + { + if (!_cts.Token.IsCancellationRequested) + { + _store.ConcatFileAsync(fullPath, tarKey, _domain, _key, _queue).Wait(); + } + } + catch + { + _cts.Cancel(); + throw; + } + }); _tasks.Add(task); task.Start(_scheduler); } @@ -92,16 +107,29 @@ public async Task WriteEntryAsync(string tarKey, string domain, string path, IDa } } - public Task WriteEntryAsync(string tarKey, Stream stream) + public async Task WriteEntryAsync(string tarKey, Stream stream) { var tStream = _tempStream.Create(); stream.Position = 0; - stream.CopyTo(tStream); + await stream.CopyToAsync(tStream); - var task = new Task(_store.ConcatFileStreamAsync(tStream, tarKey, _domain, _key, _queue).Wait); + var task = new Task(() => + { + try + { + if (!_cts.Token.IsCancellationRequested) + { + _store.ConcatFileStreamAsync(tStream, tarKey, _domain, _key, _queue).Wait(); + } + } + catch + { + _cts.Cancel(); + throw; + } + }); _tasks.Add(task); task.Start(_scheduler); - return Task.CompletedTask; } public async ValueTask DisposeAsync()