diff --git a/src/Interpreters/AsynchronousInsertQueue.cpp b/src/Interpreters/AsynchronousInsertQueue.cpp
index c7a39ad610bd..7d56dbabe3cf 100644
--- a/src/Interpreters/AsynchronousInsertQueue.cpp
+++ b/src/Interpreters/AsynchronousInsertQueue.cpp
@@ -214,44 +214,57 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo
 
 AsynchronousInsertQueue::~AsynchronousInsertQueue()
 {
-    LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
-    shutdown = true;
-
-    for (size_t i = 0; i < pool_size; ++i)
+    try
     {
-        auto & shard = queue_shards[i];
-
-        shard.are_tasks_available.notify_one();
-        assert(dump_by_first_update_threads[i].joinable());
-        dump_by_first_update_threads[i].join();
+        LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
+        shutdown = true;
 
-        if (flush_on_shutdown)
-        {
-            for (auto & [_, elem] : shard.queue)
-                scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext(), i);
-        }
-        else
+        for (size_t i = 0; i < pool_size; ++i)
         {
+            auto & shard = queue_shards[i];
 
-            for (auto & [_, elem] : shard.queue)
-                for (const auto & entry : elem.data->entries)
-                    entry->finish(std::make_exception_ptr(Exception(
-                        ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)")));
+            shard.are_tasks_available.notify_one();
+            assert(dump_by_first_update_threads[i].joinable());
+            dump_by_first_update_threads[i].join();
+
+            if (flush_on_shutdown)
+            {
+                for (auto & [_, elem] : shard.queue)
+                    scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext(), i);
+            }
+            else
+            {
+                for (auto & [_, elem] : shard.queue)
+                    for (const auto & entry : elem.data->entries)
+                        entry->finish(
+                            std::make_exception_ptr(Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)")));
+            }
         }
-    }
 
-    pool.wait();
-    LOG_TRACE(log, "Asynchronous insertion queue finished");
+        pool.wait();
+        LOG_TRACE(log, "Asynchronous insertion queue finished");
+    }
+    catch (...)
+    {
+        tryLogCurrentException(log);
+        pool.wait();
+    }
 }
 
 void AsynchronousInsertQueue::scheduleDataProcessingJob(
     const InsertQuery & key, InsertDataPtr data, ContextPtr global_context, size_t shard_num)
 {
+    /// Intuitively it seems reasonable to process first inserted blocks first.
+    /// We add new chunks in the end of entries list, so they are automatically ordered by creation time
+    chassert(!data->entries.empty());
+    const auto priority = Priority{data->entries.front()->create_time.time_since_epoch().count()};
+
     /// Wrap 'unique_ptr' with 'shared_ptr' to make this
     /// lambda copyable and allow to save it to the thread pool.
     pool.scheduleOrThrowOnError(
         [this, key, global_context, shard_num, my_data = std::make_shared<InsertDataPtr>(std::move(data))]() mutable
-        { processData(key, std::move(*my_data), std::move(global_context), flush_time_history_per_queue_shard[shard_num]); });
+        { processData(key, std::move(*my_data), std::move(global_context), flush_time_history_per_queue_shard[shard_num]); },
+        priority);
 }
 
 void AsynchronousInsertQueue::preprocessInsertQuery(const ASTPtr & query, const ContextPtr & query_context)
@@ -375,6 +388,7 @@ AsynchronousInsertQueue::pushDataChunk(ASTPtr query, DataChunk chunk, ContextPtr
         assert(data);
         auto size_in_bytes = data->size_in_bytes;
         data->size_in_bytes += entry_data_size;
+        /// We rely on the fact that entries are being added to the list in order of creation time in `scheduleDataProcessingJob()`
         data->entries.emplace_back(entry);
 
         insert_future = entry->getFuture();
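
The core idea of the scheduling change above is that each flush job carries a priority equal to the creation time of the oldest entry in its batch, so batches whose data arrived first are flushed first (assuming, as the diff's comment implies, that the pool picks jobs with smaller priority values earlier). The stand-alone sketch below illustrates only that ordering idea with a plain std::priority_queue; it is not the ClickHouse ThreadPool or Priority API, and the Job type and main() driver are invented for illustration.

// Minimal sketch of "oldest batch first" scheduling, assuming lower priority
// values are served first (as in the diff). Not the ClickHouse ThreadPool API.
#include <chrono>
#include <functional>
#include <iostream>
#include <queue>
#include <vector>

struct Job
{
    long long priority;           // creation time of the batch's oldest entry, ticks since epoch
    std::function<void()> work;   // the flush callback

    // Invert the comparison so the smallest priority value sits on top of the heap.
    bool operator<(const Job & other) const { return priority > other.priority; }
};

int main()
{
    std::priority_queue<Job> jobs;

    auto now = std::chrono::steady_clock::now().time_since_epoch().count();

    // Jobs are pushed out of order; the one whose data was created earliest must run first.
    jobs.push({now + 200, [] { std::cout << "flush newest batch\n"; }});
    jobs.push({now,       [] { std::cout << "flush oldest batch\n"; }});
    jobs.push({now + 100, [] { std::cout << "flush middle batch\n"; }});

    while (!jobs.empty())
    {
        jobs.top().work();   // prints: oldest batch, middle batch, newest batch
        jobs.pop();
    }
}

This also explains the comment added in pushDataChunk(): because new entries are always appended to the end of the list, the first entry of a batch is its oldest one, so its create_time is a valid priority key for the whole batch.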