Skip to content

Support for external data #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,33 @@ int main()
}
);

/// Select the values inserted in the previous step, filtering them with two
/// external tables that are sent to the server together with the query.
/// See https://clickhouse.com/docs/engines/table-engines/special/external-data
{
// One block per external table: block1 holds the ids, block2 holds the names.
Block block1, block2;
auto id = std::make_shared<ColumnUInt64>();
id->Append(1);
block1.AppendColumn("id" , id);

auto name = std::make_shared<ColumnString>();
name->Append("seven");
block2.AppendColumn("name", name);

// Names under which the query refers to the external tables.
// ExternalTable keeps only a std::string_view and a Block reference, so these
// strings and the blocks above must stay alive until the call below returns.
const std::string _1 = "_1";
const std::string _2 = "_2";

const ExternalTables external = {{_1, block1}, {_2, block2}};
client.SelectWithExternalData("SELECT id, name FROM default.numbers where id in (_1) or name in (_2)",
external, [] (const Block& block)
{
// Column 0 is read as the UInt64 `id`, column 1 as the String `name`.
for (size_t i = 0; i < block.GetRowCount(); ++i) {
std::cout << block[0]->As<ColumnUInt64>()->At(i) << " "
<< block[1]->As<ColumnString>()->At(i) << "\n";
}
}
);
}

/// Delete table.
client.Execute("DROP TABLE default.numbers");

Expand Down
88 changes: 76 additions & 12 deletions clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class Client::Impl {

void ExecuteQuery(Query query);

// Sends `query`, then the rows of `external_tables`, then the end-of-data marker,
// and processes the response until ReceivePacket() returns false.
// Throws UnimplementedError when the server revision predates temporary tables.
void SelectWithExternalData(Query query, const ExternalTables& external_tables);

void SendCancel();

void Insert(const std::string& table_name, const std::string& query_id, const Block& block);
Expand All @@ -174,10 +176,14 @@ class Client::Impl {

bool ReceivePacket(uint64_t* server_packet = nullptr);

void SendQuery(const Query& query);
// Writes the Query packet; when `finalize` is true (the default) it also calls
// FinalizeQuery(), otherwise the caller is expected to do so after sending extra data.
void SendQuery(const Query& query, bool finalize = true);
// Sends an empty block, which marks the end of the data accompanying a query.
void FinalizeQuery();

void SendData(const Block& block);

// Writes `block` to the output, through a compressing stream when compression is enabled.
void SendBlockData(const Block& block);
// Sends every non-empty external table as a Data packet tagged with the table's name.
void SendExternalData(const ExternalTables& external_tables);

bool SendHello();

bool ReadBlock(InputStream& input, Block* block);
Expand Down Expand Up @@ -291,6 +297,51 @@ void Client::Impl::ExecuteQuery(Query query) {
}
}


// Executes a query that refers to external (temporary) tables.
//
// The Query packet is written without its terminating empty block, the external
// tables follow, and only then is the query finalized. Afterwards incoming
// packets are processed until ReceivePacket() returns false.
//
// Throws UnimplementedError if the server revision predates temporary-table support.
void Client::Impl::SelectWithExternalData(Query query, const ExternalTables& external_tables) {
    const bool temporary_tables_supported =
        server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES;
    if (!temporary_tables_supported) {
        throw UnimplementedError("This version of ClickHouse server doesn't support temporary tables");
    }

    // NOTE(review): presumably routes events to `query` for the duration of this
    // call and resets events_ on scope exit -- confirm against EnsureNull.
    EnsureNull events_guard(static_cast<QueryEvents*>(&query), &events_);

    if (options_.ping_before_query) {
        RetryGuard([this] { Ping(); });
    }

    // The end-of-data marker must come after the external tables, so the query
    // is sent unfinalized first.
    SendQuery(query, /*finalize=*/false);
    SendExternalData(external_tables);
    FinalizeQuery();

    // Drain the response; ReceivePacket() returns false once there is nothing left to process.
    do {
    } while (ReceivePacket());
}

void Client::Impl::SendBlockData(const Block& block) {
if (compression_ == CompressionState::Enable) {
std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get(), options_.max_compression_chunk_size, options_.compression_method);
BufferedOutput buffered(std::move(compressed_output), options_.max_compression_chunk_size);

WriteBlock(block, buffered);
} else {
WriteBlock(block, *output_);
}
}

// Sends every non-empty table of `external_tables` as a Data packet:
// packet code, table name, then the block itself.
//
// The output is not flushed here; the caller is expected to follow up with
// FinalizeQuery(), which sends the terminating empty block.
void Client::Impl::SendExternalData(const ExternalTables& external_tables) {
    for (const auto& table : external_tables) {
        // Skip empty blocks to keep the connection in a consistent state: an
        // empty block would mark the current request as finished.
        if (table.data.GetRowCount() == 0) {
            continue;
        }

        // Packet codes are written with WriteUInt64 elsewhere in this class
        // (see SendQuery); use the same encoder here so all packet headers
        // are produced the same way.
        WireFormat::WriteUInt64(*output_, ClientCodes::Data);
        WireFormat::WriteString(*output_, table.name);
        SendBlockData(table.data);
    }
}


std::string NameToQueryString(const std::string &input)
{
std::string output;
Expand Down Expand Up @@ -753,7 +804,7 @@ void Client::Impl::SendCancel() {
output_->Flush();
}

void Client::Impl::SendQuery(const Query& query) {
void Client::Impl::SendQuery(const Query& query, bool finalize) {
WireFormat::WriteUInt64(*output_, ClientCodes::Query);
WireFormat::WriteString(*output_, query.GetQueryID());

Expand Down Expand Up @@ -858,7 +909,13 @@ void Client::Impl::SendQuery(const Query& query) {
}
WireFormat::WriteString(*output_, std::string()); // empty string after last param
}

if (finalize) {
FinalizeQuery();
}
}

void Client::Impl::FinalizeQuery() {
// Send empty block as marker of
// end of data
SendData(Block());
Expand Down Expand Up @@ -905,16 +962,7 @@ void Client::Impl::SendData(const Block& block) {
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) {
WireFormat::WriteString(*output_, std::string());
}

if (compression_ == CompressionState::Enable) {

std::unique_ptr<OutputStream> compressed_output = std::make_unique<CompressedOutput>(output_.get(), options_.max_compression_chunk_size, options_.compression_method);
BufferedOutput buffered(std::move(compressed_output), options_.max_compression_chunk_size);

WriteBlock(block, buffered);
} else {
WriteBlock(block, *output_);
}
SendBlockData(block);

output_->Flush();
}
Expand Down Expand Up @@ -1077,6 +1125,22 @@ void Client::Select(const Query& query) {
Execute(query);
}

void Client::SelectWithExternalData(const std::string& query, const ExternalTables& external_tables, SelectCallback cb) {
impl_->SelectWithExternalData(Query(query).OnData(std::move(cb)), external_tables);
}

void Client::SelectWithExternalData(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCallback cb) {
impl_->SelectWithExternalData(Query(query, query_id).OnData(std::move(cb)), external_tables);
}

void Client::SelectWithExternalDataCancelable(const std::string& query, const ExternalTables& external_tables, SelectCancelableCallback cb) {
impl_->SelectWithExternalData(Query(query).OnDataCancelable(std::move(cb)), external_tables);
}

void Client::SelectWithExternalDataCancelable(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCancelableCallback cb) {
impl_->SelectWithExternalData(Query(query, query_id).OnDataCancelable(std::move(cb)), external_tables);
}

void Client::Insert(const std::string& table_name, const Block& block) {
impl_->Insert(table_name, Query::default_query_id, block);
}
Expand Down
17 changes: 17 additions & 0 deletions clickhouse/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ std::ostream& operator<<(std::ostream& os, const Endpoint& options);

class SocketFactory;

// Describes one external table sent along with a query: its name and the block
// holding its rows.
//
// Both members are non-owning (`name` is a view, `data` is a reference), so the
// string and the Block they refer to must stay alive for as long as the
// ExternalTable is used.
struct ExternalTable {
// Name written to the server ahead of the table's data.
const std::string_view name;
// Rows of the table; tables whose block has no rows are skipped when sending.
const Block& data;
};

// Set of external tables passed to the SelectWithExternalData* methods.
using ExternalTables = std::vector<ExternalTable>;

/**
*
*/
Expand All @@ -248,6 +255,16 @@ class Client {
void SelectCancelable(const std::string& query, SelectCancelableCallback cb);
void SelectCancelable(const std::string& query, const std::string& query_id, SelectCancelableCallback cb);

// Same as Select, but also sends the external tables that the query refers to;
// see https://clickhouse.com/docs/engines/table-engines/special/external-data
// Each ExternalTable only references its name and Block, so both must outlive the call.
void SelectWithExternalData(const std::string& query, const ExternalTables& external_tables, SelectCallback cb);
void SelectWithExternalData(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCallback cb);

// Same as SelectWithExternalData, but the query can be canceled by returning
// false from the data handler function \p cb.
void SelectWithExternalDataCancelable(const std::string& query, const ExternalTables& external_tables, SelectCancelableCallback cb);
void SelectWithExternalDataCancelable(const std::string& query, const std::string& query_id, const ExternalTables& external_tables, SelectCancelableCallback cb);

/// Alias for Execute.
void Select(const Query& query);

Expand Down