Skip to content

Commit

Permalink
s3 refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
SuhorukovAnton committed Dec 11, 2023
1 parent dfd244e commit 0c7805a
Showing 1 changed file with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class S3TarWriteOperator : IDataWriteOperator
private readonly List<Task> _tasks = new();
private readonly TaskScheduler _scheduler = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, Limit).ConcurrentScheduler;
private readonly ConcurrentQueue<int> _queue = new();
private readonly CancellationTokenSource _cts = new();

public string Hash { get; private set; }
public string StoragePath { get; private set; }
Expand Down Expand Up @@ -76,7 +77,21 @@ public async Task WriteEntryAsync(string tarKey, string domain, string path, IDa
if (store is S3Storage s3Store)
{
var fullPath = s3Store.MakePath(domain, path);
var task = new Task(_store.ConcatFileAsync(fullPath, tarKey, _domain, _key, _queue).Wait);
var task = new Task(() =>
{
try
{
if (!_cts.Token.IsCancellationRequested)
{
_store.ConcatFileAsync(fullPath, tarKey, _domain, _key, _queue).Wait();
}
}
catch
{
_cts.Cancel();
throw;
}
});
_tasks.Add(task);
task.Start(_scheduler);
}
Expand All @@ -92,16 +107,29 @@ public async Task WriteEntryAsync(string tarKey, string domain, string path, IDa
}
}

public Task WriteEntryAsync(string tarKey, Stream stream)
public async Task WriteEntryAsync(string tarKey, Stream stream)
{
var tStream = _tempStream.Create();
stream.Position = 0;
stream.CopyTo(tStream);
await stream.CopyToAsync(tStream);

var task = new Task(_store.ConcatFileStreamAsync(tStream, tarKey, _domain, _key, _queue).Wait);
var task = new Task(() =>
{
try
{
if (!_cts.Token.IsCancellationRequested)
{
_store.ConcatFileStreamAsync(tStream, tarKey, _domain, _key, _queue).Wait();
}
}
catch
{
_cts.Cancel();
throw;
}
});
_tasks.Add(task);
task.Start(_scheduler);
return Task.CompletedTask;
}

public async ValueTask DisposeAsync()
Expand Down

0 comments on commit 0c7805a

Please sign in to comment.