Skip to content

Commit

Permalink
Fix summary reporting with parallel replicas with LIMIT (ClickHouse#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
Algunenano authored Sep 4, 2023
1 parent a663f7e commit e192d4c
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 50 deletions.
6 changes: 3 additions & 3 deletions src/Client/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1013,8 +1013,8 @@ Packet Connection::receivePacket()
case Protocol::Server::ReadTaskRequest:
return res;

case Protocol::Server::MergeTreeAllRangesAnnounecement:
res.announcement = receiveInitialParallelReadAnnounecement();
case Protocol::Server::MergeTreeAllRangesAnnouncement:
res.announcement = receiveInitialParallelReadAnnouncement();
return res;

case Protocol::Server::MergeTreeReadTaskRequest:
Expand Down Expand Up @@ -1181,7 +1181,7 @@ ParallelReadRequest Connection::receiveParallelReadRequest() const
return ParallelReadRequest::deserialize(*in);
}

InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnounecement() const
InitialAllRangesAnnouncement Connection::receiveInitialParallelReadAnnouncement() const
{
return InitialAllRangesAnnouncement::deserialize(*in);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Client/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ class Connection : public IServerConnection
std::unique_ptr<Exception> receiveException() const;
Progress receiveProgress() const;
ParallelReadRequest receiveParallelReadRequest() const;
InitialAllRangesAnnouncement receiveInitialParallelReadAnnounecement() const;
InitialAllRangesAnnouncement receiveInitialParallelReadAnnouncement() const;
ProfileInfo receiveProfileInfo() const;

void initInputBuffers();
Expand Down
4 changes: 2 additions & 2 deletions src/Client/MultiplexedConnections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ Packet MultiplexedConnections::drain()
switch (packet.type)
{
case Protocol::Server::TimezoneUpdate:
case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeAllRangesAnnouncement:
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:
case Protocol::Server::PartUUIDs:
Expand Down Expand Up @@ -339,7 +339,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
switch (packet.type)
{
case Protocol::Server::TimezoneUpdate:
case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeAllRangesAnnouncement:
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:
case Protocol::Server::PartUUIDs:
Expand Down
4 changes: 2 additions & 2 deletions src/Core/Protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ namespace Protocol
/// This is such an inverted logic, where server sends requests
/// And client returns back response
ProfileEvents = 14, /// Packet with profile events from server.
MergeTreeAllRangesAnnounecement = 15,
MergeTreeAllRangesAnnouncement = 15,
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
MAX = TimezoneUpdate,
Expand Down Expand Up @@ -110,7 +110,7 @@ namespace Protocol
"PartUUIDs",
"ReadTaskRequest",
"ProfileEvents",
"MergeTreeAllRangesAnnounecement",
"MergeTreeAllRangesAnnouncement",
"MergeTreeReadTaskRequest",
"TimezoneUpdate",
};
Expand Down
21 changes: 21 additions & 0 deletions src/Processors/Executors/PipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,27 @@ void PipelineExecutor::finalizeExecution()
all_processors_finished = false;
break;
}
else if (node->processor && read_progress_callback)
{
/// Some executors might have reported progress as part of their finish() call
/// For example, when reading from parallel replicas the coordinator will cancel the queries as soon as it
/// has enough data (on LIMIT), but as the progress report is asynchronous it might not be reported until the
/// connection is cancelled and all packets drained
/// To cover these cases we check if there is any pending progress in the processors to report
if (auto read_progress = node->processor->getReadProgress())
{
if (read_progress->counters.total_rows_approx)
read_progress_callback->addTotalRowsApprox(read_progress->counters.total_rows_approx);

if (read_progress->counters.total_bytes)
read_progress_callback->addTotalBytes(read_progress->counters.total_bytes);

/// We are finalizing the execution, so no need to call onProgress if there is nothing to report
if (read_progress->counters.read_rows || read_progress->counters.read_bytes)
read_progress_callback->onProgress(
read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits);
}
}
}

if (!all_processors_finished)
Expand Down
14 changes: 14 additions & 0 deletions src/Processors/ISource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ void ISource::progress(size_t read_rows, size_t read_bytes)
{
//std::cerr << "========= Progress " << read_rows << " from " << getName() << std::endl << StackTrace().toString() << std::endl;
read_progress_was_set = true;
std::lock_guard lock(read_progress_mutex);
read_progress.read_rows += read_rows;
read_progress.read_bytes += read_bytes;
}

std::optional<ISource::ReadProgress> ISource::getReadProgress()
{
std::lock_guard lock(read_progress_mutex);
if (finished && read_progress.read_bytes == 0 && read_progress.total_rows_approx == 0)
return {};

Expand All @@ -85,6 +87,18 @@ std::optional<ISource::ReadProgress> ISource::getReadProgress()
return ReadProgress{res_progress, empty_limits};
}

/// Increases the approximate total row count stored in this source's read progress by `value`.
/// The counter is updated while holding `read_progress_mutex`, the same mutex taken by
/// progress() and getReadProgress().
void ISource::addTotalRowsApprox(size_t value)
{
    std::lock_guard<std::mutex> guard(read_progress_mutex);
    read_progress.total_rows_approx += value;
}

/// Increases the total byte count stored in this source's read progress by `value`.
/// The counter is updated while holding `read_progress_mutex`, the same mutex taken by
/// progress() and getReadProgress().
void ISource::addTotalBytes(size_t value)
{
    std::lock_guard<std::mutex> guard(read_progress_mutex);
    read_progress.total_bytes += value;
}

void ISource::work()
{
try
Expand Down
10 changes: 7 additions & 3 deletions src/Processors/ISource.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

#include <Processors/IProcessor.h>

#include <atomic>
#include <mutex>


namespace DB
{

class ISource : public IProcessor
{
private:
std::mutex read_progress_mutex;
ReadProgressCounters read_progress;
bool read_progress_was_set = false;
std::atomic_bool read_progress_was_set = false;
bool auto_progress;

protected:
Expand Down Expand Up @@ -42,8 +46,8 @@ class ISource : public IProcessor
/// Default implementation for all the sources.
std::optional<ReadProgress> getReadProgress() final;

void addTotalRowsApprox(size_t value) { read_progress.total_rows_approx += value; }
void addTotalBytes(size_t value) { read_progress.total_bytes += value; }
void addTotalRowsApprox(size_t value);
void addTotalBytes(size_t value);
};

using SourcePtr = std::shared_ptr<ISource>;
Expand Down
80 changes: 46 additions & 34 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
processMergeTreeReadTaskRequest(packet.request.value());
return ReadResult(ReadResult::Type::ParallelReplicasToken);

case Protocol::Server::MergeTreeAllRangesAnnounecement:
case Protocol::Server::MergeTreeAllRangesAnnouncement:
chassert(packet.announcement.has_value());
processMergeTreeInitialReadAnnounecement(packet.announcement.value());
processMergeTreeInitialReadAnnouncement(packet.announcement.value());
return ReadResult(ReadResult::Type::ParallelReplicasToken);

case Protocol::Server::ReadTaskRequest:
Expand Down Expand Up @@ -568,7 +568,7 @@ void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest re
connections->sendMergeTreeReadTaskResponse(response);
}

void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
void RemoteQueryExecutor::processMergeTreeInitialReadAnnouncement(InitialAllRangesAnnouncement announcement)
{
if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
Expand Down Expand Up @@ -602,39 +602,51 @@ void RemoteQueryExecutor::finish()
return;

/// Get the remaining packets so that there is no out of sync in the connections to the replicas.
Packet packet = connections->drain();
switch (packet.type)
/// We do this manually instead of calling drain() because we want to process Log, ProfileEvents and Progress
/// packets that had been sent before the connection is fully finished in order to have final statistics of what
/// was executed in the remote queries
while (connections->hasActiveConnections() && !finished)
{
case Protocol::Server::EndOfStream:
finished = true;
break;

case Protocol::Server::Log:
/// Pass logs from remote server to client
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
break;

case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;

case Protocol::Server::ProfileEvents:
/// Pass profile events from remote server to client
if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
if (!profile_queue->emplace(std::move(packet.block)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
break;
Packet packet = connections->receivePacket();

case Protocol::Server::TimezoneUpdate:
break;

default:
got_unknown_packet_from_replica = true;
throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
toString(packet.type),
connections->dumpAddresses());
switch (packet.type)
{
case Protocol::Server::EndOfStream:
finished = true;
break;

case Protocol::Server::Exception:
got_exception_from_replica = true;
packet.exception->rethrow();
break;

case Protocol::Server::Log:
/// Pass logs from remote server to client
if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
log_queue->pushBlock(std::move(packet.block));
break;

case Protocol::Server::ProfileEvents:
/// Pass profile events from remote server to client
if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
if (!profile_queue->emplace(std::move(packet.block)))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
break;

case Protocol::Server::ProfileInfo:
/// Use own (client-side) info about read bytes, it is more correct info than server-side one.
if (profile_info_callback)
profile_info_callback(packet.profile_info);
break;

case Protocol::Server::Progress:
if (progress_callback)
progress_callback(packet.progress);
break;

default:
break;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class RemoteQueryExecutor
void processReadTaskRequest();

void processMergeTreeReadTaskRequest(ParallelReadRequest request);
void processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement);
void processMergeTreeInitialReadAnnouncement(InitialAllRangesAnnouncement announcement);

/// Cancel query and restart it with info about duplicate UUIDs
/// only for `allow_experimental_query_deduplication`.
Expand Down
6 changes: 3 additions & 3 deletions src/Server/TCPHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ void TCPHandler::runImpl()
if (state.cancellation_status == CancellationStatus::FULLY_CANCELLED)
return;

sendMergeTreeAllRangesAnnounecementAssumeLocked(announcement);
sendMergeTreeAllRangesAnnouncementAssumeLocked(announcement);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSent);
ProfileEvents::increment(ProfileEvents::MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, watch.elapsedMicroseconds());
});
Expand Down Expand Up @@ -1044,9 +1044,9 @@ void TCPHandler::sendReadTaskRequestAssumeLocked()
}


void TCPHandler::sendMergeTreeAllRangesAnnounecementAssumeLocked(InitialAllRangesAnnouncement announcement)
void TCPHandler::sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement)
{
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnounecement, *out);
writeVarUInt(Protocol::Server::MergeTreeAllRangesAnnouncement, *out);
announcement.serialize(*out);
out->next();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Server/TCPHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class TCPHandler : public Poco::Net::TCPServerConnection
void sendEndOfStream();
void sendPartUUIDs();
void sendReadTaskRequestAssumeLocked();
void sendMergeTreeAllRangesAnnounecementAssumeLocked(InitialAllRangesAnnouncement announcement);
void sendMergeTreeAllRangesAnnouncementAssumeLocked(InitialAllRangesAnnouncement announcement);
void sendMergeTreeReadTaskRequestAssumeLocked(ParallelReadRequest request);
void sendProfileInfo(const ProfileInfo & info);
void sendTotals(const Block & totals);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
1
1
02841_summary_default_interactive_0 2
02841_summary_default_interactive_high 2
61 changes: 61 additions & 0 deletions tests/queries/0_stateless/02841_parallel_replicas_summary.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env bash

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# Prints one TSV row per initial_query_id in system.query_log whose id starts with the
# prefix passed as $1: the id followed by (count() - 2) / 2 as number_of_parallel_replicas.
# Rows are ordered by the earliest event_time_microseconds of each group.
# NOTE(review): the "(count() - 2) / 2" formula presumably discounts the initiator's own
# log rows and assumes two log rows per replica query -- confirm against query_log semantics.
involved_parallel_replicas() {
    local query_id_prefix="$1"

    # The filter is on the query id only: current_database = '$CLICKHOUSE_DATABASE' is not
    # usable here because nested parallel queries aren't run with it.
    $CLICKHOUSE_CLIENT --query "
SELECT
initial_query_id,
(count() - 2) / 2 as number_of_parallel_replicas
FROM system.query_log
WHERE event_date >= yesterday()
AND initial_query_id LIKE '${query_id_prefix}%'
GROUP BY initial_query_id
ORDER BY min(event_time_microseconds) ASC
FORMAT TSV"
}

# Create the MergeTree table read by the queries below, pre-filled with 100_000 rows from numbers().
$CLICKHOUSE_CLIENT --query "CREATE TABLE replicas_summary (n Int64) ENGINE = MergeTree() ORDER BY n AS Select * from numbers(100_000)"

# Note that we are not verifying the exact read rows and bytes (apart from not being 0) for 2 reasons:
# - Different block sizes lead to different read rows
# - Depending on how fast the replicas are they might need data that ends up being discarded because the coordinator
# already has enough (but it has been read in parallel, so it's reported).

# Common prefix of the query ids used by both runs; it is also the prefix looked up in
# system.query_log at the end of the script.
query_id_base="02841_summary_$CLICKHOUSE_DATABASE"

# Run 1: interactive_delay=0.
# The query text is piped to curl, which posts it with wait_end_of_query=1 and an explicit query_id.
# curl runs verbosely with stderr merged into stdout, so the response headers end up in the pipe.
# The final grep prints how many lines containing "Summary" do NOT contain "read_rows":"0"
# (presumably the line is the X-ClickHouse-Summary response header -- confirm).
echo "
SELECT *
FROM replicas_summary
LIMIT 100
SETTINGS
max_parallel_replicas = 2,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost',
allow_experimental_parallel_reading_from_replicas = 1,
parallel_replicas_for_non_replicated_merge_tree = 1,
use_hedged_requests = 0,
interactive_delay=0
"\
| ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&wait_end_of_query=1&query_id=${query_id_base}_interactive_0" --data-binary @- -vvv 2>&1 \
| grep "Summary" | grep -cv '"read_rows":"0"'

# Run 2: the same query and the same check, but with a very large interactive_delay
# (99999999999) and a different query_id suffix.
echo "
SELECT *
FROM replicas_summary
LIMIT 100
SETTINGS
max_parallel_replicas = 2,
cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost',
allow_experimental_parallel_reading_from_replicas = 1,
parallel_replicas_for_non_replicated_merge_tree = 1,
use_hedged_requests = 0,
interactive_delay=99999999999
"\
| ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&wait_end_of_query=1&query_id=${query_id_base}_interactive_high" --data-binary @- -vvv 2>&1 \
| grep "Summary" | grep -cv '"read_rows":"0"'

# Flush the server logs before reading system.query_log, then print the number of
# parallel replicas computed for each of the two queries above.
$CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS"
involved_parallel_replicas "${query_id_base}"

0 comments on commit e192d4c

Please sign in to comment.