Skip to content

Commit

Permalink
Merge pull request ClickHouse#60885 from nickitat/async_inserts_small_tweaks
Browse files Browse the repository at this point in the history

Small changes in async inserts code
  • Loading branch information
nickitat authored Mar 6, 2024
2 parents 6c0e9e1 + 4f6cb21 commit 4f6b1d0
Showing 1 changed file with 37 additions and 23 deletions.
60 changes: 37 additions & 23 deletions src/Interpreters/AsynchronousInsertQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,44 +214,57 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo

/// Shuts the queue down.
///
/// Sets the `shutdown` flag, then for every shard wakes and joins its dump thread.
/// Data still left in a shard's queue is either scheduled for processing
/// (when `flush_on_shutdown` is set) or every pending entry is finished with a
/// TIMEOUT_EXCEEDED exception. Finally waits for the pool.
///
/// The whole sequence is wrapped in try/catch so that an exception is logged
/// instead of escaping the destructor.
AsynchronousInsertQueue::~AsynchronousInsertQueue()
{
    try
    {
        LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
        shutdown = true;

        for (size_t i = 0; i < pool_size; ++i)
        {
            auto & shard = queue_shards[i];

            /// Wake the shard's dump thread so it can observe `shutdown`, then wait for it to exit.
            shard.are_tasks_available.notify_one();
            assert(dump_by_first_update_threads[i].joinable());
            dump_by_first_update_threads[i].join();

            if (flush_on_shutdown)
            {
                /// Hand the remaining data over to the pool.
                for (auto & [_, elem] : shard.queue)
                    scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext(), i);
            }
            else
            {
                /// Nothing will be flushed: finish every pending entry with an error.
                for (auto & [_, elem] : shard.queue)
                    for (const auto & entry : elem.data->entries)
                        entry->finish(
                            std::make_exception_ptr(Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded")));
            }
        }

        pool.wait();
        LOG_TRACE(log, "Asynchronous insertion queue finished");
    }
    catch (...)
    {
        tryLogCurrentException(log);
        /// Still wait for jobs that were already scheduled before the failure.
        /// NOTE(review): if `pool.wait()` can throw here, the exception would escape the destructor - confirm.
        pool.wait();
    }
}

/// Schedules processing of `data` (collected for insert query `key`) on the pool.
///
/// The job calls `processData()` with the flush-time history of shard `shard_num`.
/// Its priority is derived from the creation time of the first entry in `data`.
/// `data` must contain at least one entry (checked with `chassert`).
void AsynchronousInsertQueue::scheduleDataProcessingJob(
    const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num)
{
    /// Intuitively it seems reasonable to process first inserted blocks first.
    /// We add new chunks in the end of entries list, so they are automatically ordered by creation time
    chassert(!data->entries.empty());
    const auto priority = Priority{data->entries.front()->create_time.time_since_epoch().count()};

    /// Wrap 'unique_ptr' with 'shared_ptr' to make this
    /// lambda copyable and allow to save it to the thread pool.
    pool.scheduleOrThrowOnError(
        [this, key, global_context, shard_num, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
        { processData(key, std::move(*my_data), std::move(global_context), flush_time_history_per_queue_shard[shard_num]); },
        priority);
}

void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const ContextPtr & query_context)
Expand Down Expand Up @@ -375,6 +388,7 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
assert(data);
auto size_in_bytes = data->size_in_bytes;
data->size_in_bytes += entry_data_size;
/// We rely on the fact that entries are being added to the list in order of creation time in `scheduleDataProcessingJob()`
data->entries.emplace_back(entry);
insert_future = entry->getFuture();

Expand Down

0 comments on commit 4f6b1d0

Please sign in to comment.