From 34fad9276fdbdcc300f1ca08da3bc6d11b2e9d1b Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 11:38:10 +0200 Subject: [PATCH 1/9] Implemented setting query_id for any query. Added tests for query_id Added pretty-printing for the Block Added more tests + minor fixes --- clickhouse/block.cpp | 16 ++++- clickhouse/block.h | 31 +++++++- clickhouse/client.cpp | 134 +++++++++++++++++++---------------- clickhouse/client.h | 3 + clickhouse/columns/array.cpp | 3 +- clickhouse/query.cpp | 8 ++- clickhouse/query.h | 15 ++-- tests/simple/CMakeLists.txt | 4 +- tests/simple/main.cpp | 2 +- ut/CMakeLists.txt | 1 + ut/block_ut.cpp | 85 ++++++++++++++++++++++ ut/client_ut.cpp | 49 ++++++++++--- ut/columns_ut.cpp | 13 ++++ ut/readonly_client_test.cpp | 2 +- ut/socket_ut.cpp | 55 +++++++++----- ut/tcp_server.cpp | 9 ++- ut/tcp_server.h | 7 +- ut/types_ut.cpp | 1 + ut/utils.cpp | 87 +++++++++++++++++++++++ ut/utils.h | 9 +++ 20 files changed, 426 insertions(+), 108 deletions(-) create mode 100644 ut/block_ut.cpp diff --git a/clickhouse/block.cpp b/clickhouse/block.cpp index 685173c9..c4ddb855 100644 --- a/clickhouse/block.cpp +++ b/clickhouse/block.cpp @@ -10,6 +10,11 @@ Block::Iterator::Iterator(const Block& block) { } +Block::Iterator::Iterator(const Block& block, Block::Iterator::ConstructAtEndTag /*at_end*/) + : block_(block) + , idx_(block.GetColumnCount()) +{} + const std::string& Block::Iterator::Name() const { return block_.columns_[idx_].name; } @@ -22,8 +27,9 @@ ColumnRef Block::Iterator::Column() const { return block_.columns_[idx_].column; } -void Block::Iterator::Next() { +bool Block::Iterator::Next() { ++idx_; + return IsValid(); } bool Block::Iterator::IsValid() const { @@ -95,4 +101,12 @@ ColumnRef Block::operator [] (size_t idx) const { throw std::out_of_range("column index is out of range. Index: ["+std::to_string(idx)+"], columns: [" + std::to_string(columns_.size())+"]"); } +Block::Iterator Block::begin() const { + return Iterator(*this); +} + +Block::Iterator Block::end() const { + return Iterator(*this, Iterator::ConstructAtEndTag{}); +} + } diff --git a/clickhouse/block.h b/clickhouse/block.h index b2b2d14e..a647f12d 100644 --- a/clickhouse/block.h +++ b/clickhouse/block.h @@ -25,13 +25,35 @@ class Block { /// Reference to column object. ColumnRef Column() const; - /// Move to next column. - void Next(); + /// Move to next column, returns false if next call to IsValid() would return false; + bool Next(); /// Is the iterator still valid. bool IsValid() const; + size_t ColumnIndex() const { + return idx_; + } + + Iterator& operator*() { return *this; } + const Iterator& operator*() const { return *this; } + + bool operator==(const Iterator & other) const { + return &block_ == &other.block_ && idx_ == other.idx_; + } + bool operator!=(const Iterator & other) const { + return !(*this == other); + } + + Iterator& operator++() { + this->Next(); + return *this; + } + private: + friend class Block; + struct ConstructAtEndTag {}; + Iterator(const Block& block, ConstructAtEndTag at_end); Iterator() = delete; const Block& block_; @@ -63,6 +85,11 @@ class Block { /// Reference to column by index in the block. ColumnRef operator [] (size_t idx) const; + Iterator begin() const; + Iterator end() const; + Iterator cbegin() const { return begin(); } + Iterator cend() const { return end(); } + private: struct ColumnItem { std::string name; diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 3b763fb4..958f9fec 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -116,7 +116,7 @@ class Client::Impl { void SendCancel(); - void Insert(const std::string& table_name, const Block& block); + void Insert(const std::string& table_name, const std::string& query_id, const Block& block); void Ping(); @@ -129,7 +129,7 @@ class Client::Impl { bool ReceivePacket(uint64_t* server_packet = nullptr); - void SendQuery(const std::string& query); + void SendQuery(const std::string& query, const std::string& query_id); void SendData(const Block& block); @@ -232,7 +232,7 @@ void Client::Impl::ExecuteQuery(Query query) { RetryGuard([this]() { Ping(); }); } - SendQuery(query.GetText()); + SendQuery(query.GetText(), query.GetQueryID()); while (ReceivePacket()) { ; @@ -258,13 +258,13 @@ std::string NameToQueryString(const std::string &input) return output; } -void Client::Impl::Insert(const std::string& table_name, const Block& block) { +void Client::Impl::Insert(const std::string& table_name, const std::string& query_id, const Block& block) { if (options_.ping_before_query) { RetryGuard([this]() { Ping(); }); } std::stringstream fields_section; - const auto num_columns = block.GetColumnCount(); + const auto num_columns = block.GetColumnCount(); for (unsigned int i = 0; i < num_columns; ++i) { if (i == num_columns - 1) { @@ -274,7 +274,7 @@ void Client::Impl::Insert(const std::string& table_name, const Block& block) { } } - SendQuery("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES"); + SendQuery("INSERT INTO " + table_name + " ( " + fields_section.str() + " ) VALUES", query_id); uint64_t server_packet; // Receive data packet. @@ -371,22 +371,22 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { case ServerCodes::ProfileInfo: { Profile profile; - if (!WireFormat::ReadUInt64(*input_, &profile.rows)) { + if (!WireFormat::ReadUInt64(*input_, &profile.rows)) { return false; } - if (!WireFormat::ReadUInt64(*input_, &profile.blocks)) { + if (!WireFormat::ReadUInt64(*input_, &profile.blocks)) { return false; } - if (!WireFormat::ReadUInt64(*input_, &profile.bytes)) { + if (!WireFormat::ReadUInt64(*input_, &profile.bytes)) { return false; } - if (!WireFormat::ReadFixed(*input_, &profile.applied_limit)) { + if (!WireFormat::ReadFixed(*input_, &profile.applied_limit)) { return false; } - if (!WireFormat::ReadUInt64(*input_, &profile.rows_before_limit)) { + if (!WireFormat::ReadUInt64(*input_, &profile.rows_before_limit)) { return false; } - if (!WireFormat::ReadFixed(*input_, &profile.calculated_rows_before_limit)) { + if (!WireFormat::ReadFixed(*input_, &profile.calculated_rows_before_limit)) { return false; } @@ -400,14 +400,14 @@ bool Client::Impl::ReceivePacket(uint64_t* server_packet) { case ServerCodes::Progress: { Progress info; - if (!WireFormat::ReadUInt64(*input_, &info.rows)) { + if (!WireFormat::ReadUInt64(*input_, &info.rows)) { return false; } - if (!WireFormat::ReadUInt64(*input_, &info.bytes)) { + if (!WireFormat::ReadUInt64(*input_, &info.bytes)) { return false; } if (REVISION >= DBMS_MIN_REVISION_WITH_TOTAL_ROWS_IN_PROGRESS) { - if (!WireFormat::ReadUInt64(*input_, &info.total_rows)) { + if (!WireFormat::ReadUInt64(*input_, &info.total_rows)) { return false; } } @@ -489,7 +489,7 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) { if (ColumnRef col = CreateColumnByType(type, create_column_settings)) { if (num_rows && !col->Load(&input, num_rows)) { - throw std::runtime_error("can't load"); + throw std::runtime_error("can't load column '" + name + "' of type " + type); } block->AppendColumn(name, col); @@ -539,23 +539,23 @@ bool Client::Impl::ReceiveException(bool rethrow) { do { bool has_nested = false; - if (!WireFormat::ReadFixed(*input_, ¤t->code)) { + if (!WireFormat::ReadFixed(*input_, ¤t->code)) { exception_received = false; break; } - if (!WireFormat::ReadString(*input_, ¤t->name)) { + if (!WireFormat::ReadString(*input_, ¤t->name)) { exception_received = false; break; } - if (!WireFormat::ReadString(*input_, ¤t->display_text)) { + if (!WireFormat::ReadString(*input_, ¤t->display_text)) { exception_received = false; break; } - if (!WireFormat::ReadString(*input_, ¤t->stack_trace)) { + if (!WireFormat::ReadString(*input_, ¤t->stack_trace)) { exception_received = false; break; } - if (!WireFormat::ReadFixed(*input_, &has_nested)) { + if (!WireFormat::ReadFixed(*input_, &has_nested)) { exception_received = false; break; } @@ -580,13 +580,13 @@ bool Client::Impl::ReceiveException(bool rethrow) { } void Client::Impl::SendCancel() { - WireFormat::WriteUInt64(*output_, ClientCodes::Cancel); + WireFormat::WriteUInt64(*output_, ClientCodes::Cancel); output_->Flush(); } -void Client::Impl::SendQuery(const std::string& query) { - WireFormat::WriteUInt64(*output_, ClientCodes::Query); - WireFormat::WriteString(*output_, std::string()); +void Client::Impl::SendQuery(const std::string& query, const std::string& query_id) { + WireFormat::WriteUInt64(*output_, ClientCodes::Query); + WireFormat::WriteString(*output_, query_id); /// Client info. if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) { @@ -599,23 +599,23 @@ void Client::Impl::SendQuery(const std::string& query) { info.client_revision = REVISION; - WireFormat::WriteFixed(*output_, info.query_kind); - WireFormat::WriteString(*output_, info.initial_user); - WireFormat::WriteString(*output_, info.initial_query_id); - WireFormat::WriteString(*output_, info.initial_address); - WireFormat::WriteFixed(*output_, info.iface_type); + WireFormat::WriteFixed(*output_, info.query_kind); + WireFormat::WriteString(*output_, info.initial_user); + WireFormat::WriteString(*output_, info.initial_query_id); + WireFormat::WriteString(*output_, info.initial_address); + WireFormat::WriteFixed(*output_, info.iface_type); - WireFormat::WriteString(*output_, info.os_user); - WireFormat::WriteString(*output_, info.client_hostname); - WireFormat::WriteString(*output_, info.client_name); - WireFormat::WriteUInt64(*output_, info.client_version_major); - WireFormat::WriteUInt64(*output_, info.client_version_minor); - WireFormat::WriteUInt64(*output_, info.client_revision); + WireFormat::WriteString(*output_, info.os_user); + WireFormat::WriteString(*output_, info.client_hostname); + WireFormat::WriteString(*output_, info.client_name); + WireFormat::WriteUInt64(*output_, info.client_version_major); + WireFormat::WriteUInt64(*output_, info.client_version_minor); + WireFormat::WriteUInt64(*output_, info.client_revision); if (server_info_.revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO) - WireFormat::WriteString(*output_, info.quota_key); + WireFormat::WriteString(*output_, info.quota_key); if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) { - WireFormat::WriteUInt64(*output_, info.client_version_patch); + WireFormat::WriteUInt64(*output_, info.client_version_patch); } } @@ -623,11 +623,11 @@ void Client::Impl::SendQuery(const std::string& query) { //if (settings) // settings->serialize(*out); //else - WireFormat::WriteString(*output_, std::string()); + WireFormat::WriteString(*output_, std::string()); - WireFormat::WriteUInt64(*output_, Stages::Complete); - WireFormat::WriteUInt64(*output_, compression_); - WireFormat::WriteString(*output_, query); + WireFormat::WriteUInt64(*output_, Stages::Complete); + WireFormat::WriteUInt64(*output_, compression_); + WireFormat::WriteString(*output_, query); // Send empty block as marker of // end of data SendData(Block()); @@ -659,10 +659,10 @@ void Client::Impl::WriteBlock(const Block& block, OutputStream& output) { } void Client::Impl::SendData(const Block& block) { - WireFormat::WriteUInt64(*output_, ClientCodes::Data); + WireFormat::WriteUInt64(*output_, ClientCodes::Data); if (server_info_.revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES) { - WireFormat::WriteString(*output_, std::string()); + WireFormat::WriteString(*output_, std::string()); } if (compression_ == CompressionState::Enable) { @@ -689,14 +689,14 @@ void Client::Impl::InitializeStreams(std::unique_ptr&& socket) { } bool Client::Impl::SendHello() { - WireFormat::WriteUInt64(*output_, ClientCodes::Hello); - WireFormat::WriteString(*output_, std::string(DBMS_NAME) + " client"); - WireFormat::WriteUInt64(*output_, DBMS_VERSION_MAJOR); - WireFormat::WriteUInt64(*output_, DBMS_VERSION_MINOR); - WireFormat::WriteUInt64(*output_, REVISION); - WireFormat::WriteString(*output_, options_.default_database); - WireFormat::WriteString(*output_, options_.user); - WireFormat::WriteString(*output_, options_.password); + WireFormat::WriteUInt64(*output_, ClientCodes::Hello); + WireFormat::WriteString(*output_, std::string(DBMS_NAME) + " client"); + WireFormat::WriteUInt64(*output_, DBMS_VERSION_MAJOR); + WireFormat::WriteUInt64(*output_, DBMS_VERSION_MINOR); + WireFormat::WriteUInt64(*output_, REVISION); + WireFormat::WriteString(*output_, options_.default_database); + WireFormat::WriteString(*output_, options_.user); + WireFormat::WriteString(*output_, options_.password); output_->Flush(); @@ -706,38 +706,38 @@ bool Client::Impl::SendHello() { bool Client::Impl::ReceiveHello() { uint64_t packet_type = 0; - if (!WireFormat::ReadVarint64(*input_, &packet_type)) { + if (!WireFormat::ReadVarint64(*input_, &packet_type)) { return false; } if (packet_type == ServerCodes::Hello) { - if (!WireFormat::ReadString(*input_, &server_info_.name)) { + if (!WireFormat::ReadString(*input_, &server_info_.name)) { return false; } - if (!WireFormat::ReadUInt64(*input_, &server_info_.version_major)) { + if (!WireFormat::ReadUInt64(*input_, &server_info_.version_major)) { return false; } - if (!WireFormat::ReadUInt64(*input_, &server_info_.version_minor)) { + if (!WireFormat::ReadUInt64(*input_, &server_info_.version_minor)) { return false; } - if (!WireFormat::ReadUInt64(*input_, &server_info_.revision)) { + if (!WireFormat::ReadUInt64(*input_, &server_info_.revision)) { return false; } if (server_info_.revision >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE) { - if (!WireFormat::ReadString(*input_, &server_info_.timezone)) { + if (!WireFormat::ReadString(*input_, &server_info_.timezone)) { return false; } } if (server_info_.revision >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME) { - if (!WireFormat::ReadString(*input_, &server_info_.display_name)) { + if (!WireFormat::ReadString(*input_, &server_info_.display_name)) { return false; } } if (server_info_.revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH) { - if (!WireFormat::ReadUInt64(*input_, &server_info_.version_patch)) { + if (!WireFormat::ReadUInt64(*input_, &server_info_.version_patch)) { return false; } } @@ -797,16 +797,28 @@ void Client::Select(const std::string& query, SelectCallback cb) { Execute(Query(query).OnData(cb)); } +void Client::Select(const std::string& query, const std::string& query_id, SelectCallback cb) { + Execute(Query(query, query_id).OnData(cb)); +} + void Client::SelectCancelable(const std::string& query, SelectCancelableCallback cb) { Execute(Query(query).OnDataCancelable(cb)); } +void Client::SelectCancelable(const std::string& query, const std::string& query_id, SelectCancelableCallback cb) { + Execute(Query(query, query_id).OnDataCancelable(cb)); +} + void Client::Select(const Query& query) { Execute(query); } void Client::Insert(const std::string& table_name, const Block& block) { - impl_->Insert(table_name, block); + impl_->Insert(table_name, Query::default_query_id, block); +} + +void Client::Insert(const std::string& table_name, const std::string& query_id, const Block& block) { + impl_->Insert(table_name, query_id, block); } void Client::Ping() { diff --git a/clickhouse/client.h b/clickhouse/client.h index 91f3d082..7f2b97dd 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -209,16 +209,19 @@ class Client { /// Intends for execute select queries. Data will be returned with /// one or more call of \p cb. void Select(const std::string& query, SelectCallback cb); + void Select(const std::string& query, const std::string& query_id, SelectCallback cb); /// Executes a select query which can be canceled by returning false from /// the data handler function \p cb. void SelectCancelable(const std::string& query, SelectCancelableCallback cb); + void SelectCancelable(const std::string& query, const std::string& query_id, SelectCancelableCallback cb); /// Alias for Execute. void Select(const Query& query); /// Intends for insert block of data into a table \p table_name. void Insert(const std::string& table_name, const Block& block); + void Insert(const std::string& table_name, const std::string& query_id, const Block& block); /// Ping server for aliveness. void Ping(); diff --git a/clickhouse/columns/array.cpp b/clickhouse/columns/array.cpp index e9d5a4d3..a83ba045 100644 --- a/clickhouse/columns/array.cpp +++ b/clickhouse/columns/array.cpp @@ -35,8 +35,7 @@ ColumnRef ColumnArray::Slice(size_t begin, size_t size) const { auto result = std::make_shared(GetAsColumn(begin)); result->OffsetsIncrease(1); - for (size_t i = 1; i < size; i++) - { + for (size_t i = 1; i < size; i++) { result->Append(std::make_shared(GetAsColumn(begin + i))); } diff --git a/clickhouse/query.cpp b/clickhouse/query.cpp index 89a9037b..3986064c 100644 --- a/clickhouse/query.cpp +++ b/clickhouse/query.cpp @@ -2,16 +2,20 @@ namespace clickhouse { +const std::string Query::default_query_id = {}; + Query::Query() { } -Query::Query(const char* query) +Query::Query(const char* query, const char* query_id) : query_(query) + , query_id_(query_id ? std::string(query_id): default_query_id) { } -Query::Query(const std::string& query) +Query::Query(const std::string& query, const std::string& query_id) : query_(query) + , query_id_(query_id) { } diff --git a/clickhouse/query.h b/clickhouse/query.h index c9d7e68e..e9e12856 100644 --- a/clickhouse/query.h +++ b/clickhouse/query.h @@ -88,15 +88,19 @@ using SelectCancelableCallback = std::function; class Query : public QueryEvents { public: Query(); - Query(const char* query); - Query(const std::string& query); + Query(const char* query, const char* query_id = nullptr); + Query(const std::string& query, const std::string& query_id = default_query_id); ~Query(); /// - inline std::string GetText() const { + inline const std::string& GetText() const { return query_; } + inline const std::string& GetQueryID() const { + return query_id_; + } + /// Set handler for receiving result data. inline Query& OnData(SelectCallback cb) { select_cb_ = cb; @@ -121,6 +125,8 @@ class Query : public QueryEvents { return *this; } + static const std::string default_query_id; + private: void OnData(const Block& block) override { if (select_cb_) { @@ -156,7 +162,8 @@ class Query : public QueryEvents { } private: - std::string query_; + const std::string query_; + const std::string query_id_; ExceptionCallback exception_cb_; ProgressCallback progress_cb_; SelectCallback select_cb_; diff --git a/tests/simple/CMakeLists.txt b/tests/simple/CMakeLists.txt index e562f1db..cd102ec3 100644 --- a/tests/simple/CMakeLists.txt +++ b/tests/simple/CMakeLists.txt @@ -1,10 +1,10 @@ ADD_EXECUTABLE (simple-test ../../ut/utils.cpp - main.cpp + main.cpp ) TARGET_LINK_LIBRARIES (simple-test - clickhouse-cpp-lib-static + clickhouse-cpp-lib-static ) IF (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 470e9e4e..51340a86 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -225,7 +225,7 @@ inline void GenericExample(Client& client) { /// Select values inserted in the previous step. client.Select("SELECT id, name FROM test_client", [](const Block& block) { - PrintBlock(block); + std::cout << PrettyPrintBlock{block} << std::endl; } ); diff --git a/ut/CMakeLists.txt b/ut/CMakeLists.txt index bdb9bc98..baa72b91 100644 --- a/ut/CMakeLists.txt +++ b/ut/CMakeLists.txt @@ -1,6 +1,7 @@ SET ( clickhouse-cpp-ut-src main.cpp + block_ut.cpp client_ut.cpp columns_ut.cpp itemview_ut.cpp diff --git a/ut/block_ut.cpp b/ut/block_ut.cpp new file mode 100644 index 00000000..3828cb65 --- /dev/null +++ b/ut/block_ut.cpp @@ -0,0 +1,85 @@ +#include +#include "readonly_client_test.h" +#include "connection_failed_client_test.h" +#include "utils.h" + +#include + +namespace { +using namespace clickhouse; + +Block MakeBlock(std::vector> columns) { + Block result; + + for (const auto & name_and_col : columns) { + result.AppendColumn(name_and_col.first, name_and_col.second); + } + + result.RefreshRowCount(); + return result; +} + +} + +TEST(BlockTest, Iterator) { + const auto block = MakeBlock({ + {"foo", std::make_shared(std::vector{1, 2, 3, 4, 5})}, + {"bar", std::make_shared(std::vector{"1", "2", "3", "4", "5"})}, + }); + const char* names[] = {"foo", "bar"}; + + ASSERT_EQ(2u, block.GetColumnCount()); + ASSERT_EQ(5u, block.GetRowCount()); + + size_t col_index = 0; + Block::Iterator i(block); + while (i.IsValid()) { + + const auto& name_and_col = i; + ASSERT_EQ(col_index, name_and_col.ColumnIndex()); + ASSERT_EQ(block[col_index].get(), name_and_col.Column().get()); + ASSERT_EQ(names[col_index], name_and_col.Name()); + + i.Next(); + ++col_index; + } +} + +TEST(BlockTest, RangeBasedForLoop) { + const auto block = MakeBlock({ + {"foo", std::make_shared(std::vector{1, 2, 3, 4, 5})}, + {"bar", std::make_shared(std::vector{"1", "2", "3", "4", "5"})}, + }); + const char* names[] = {"foo", "bar"}; + + ASSERT_EQ(2u, block.GetColumnCount()); + ASSERT_EQ(5u, block.GetRowCount()); + + size_t col_index = 0; + for (const auto & name_and_col : block) { + ASSERT_EQ(col_index, name_and_col.ColumnIndex()); + ASSERT_EQ(block[col_index].get(), name_and_col.Column().get()); + ASSERT_EQ(names[col_index], name_and_col.Name()); + ++col_index; + } +} + +TEST(BlockTest, Iterators) { + Block block; + // Empty block, all iterators point to 'end' + ASSERT_EQ(block.begin(), block.cbegin()); + ASSERT_EQ(block.end(), block.cend()); + ASSERT_EQ(block.begin(), block.end()); + + block = MakeBlock({ + {"foo", std::make_shared(std::vector{1, 2, 3, 4, 5})}, + {"bar", std::make_shared(std::vector{"1", "2", "3", "4", "5"})}, + }); + + // Non-empty block + ASSERT_EQ(block.begin(), block.cbegin()); + ASSERT_EQ(block.end(), block.cend()); + ASSERT_NE(block.begin(), block.end()); + ASSERT_NE(block.cbegin(), block.cend()); +} + diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 0c8c8a6c..f8a9ecb7 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -52,13 +52,9 @@ class ClientCase : public testing::TestWithParam { protected: void SetUp() override { client_ = std::make_unique(GetParam()); - // client_->Execute("CREATE DATABASE IF NOT EXISTS test_clickhouse_cpp"); } - void TearDown() override { - //if (client_) - // client_->Execute("DROP DATABASE test_clickhouse_cpp"); - } + void TearDown() override {} template std::shared_ptr createTableWithOneColumn(Block & block) @@ -429,8 +425,7 @@ TEST_P(ClientCase, Numbers) { TEST_P(ClientCase, SimpleAggregateFunction) { const auto & server_info = client_->GetServerInfo(); if (versionNumber(server_info) < versionNumber(19, 9)) { - std::cout << "Test is skipped since server '" << server_info << "' does not support SimpleAggregateFunction" << std::endl; - return; + GTEST_SKIP() << "Test is skipped since server '" << server_info << "' does not support SimpleAggregateFunction" << std::endl; } client_->Execute("DROP TEMPORARY TABLE IF EXISTS test_clickhouse_cpp_SimpleAggregateFunction"); @@ -806,8 +801,7 @@ TEST_P(ClientCase, ColEscapeNameTest) { TEST_P(ClientCase, DateTime64) { const auto & server_info = client_->GetServerInfo(); if (versionNumber(server_info) < versionNumber(20, 1)) { - std::cout << "Test is skipped since server '" << server_info << "' does not support DateTime64" << std::endl; - return; + GTEST_SKIP() << "Test is skipped since server '" << server_info << "' does not support DateTime64" << std::endl; } Block block; @@ -866,6 +860,43 @@ TEST_P(ClientCase, DateTime64) { ASSERT_EQ(total_rows, data.size()); } +TEST_P(ClientCase, Query_ID) { + const auto server_info = client_->GetServerInfo(); + + std::srand(std::time(nullptr) + reinterpret_cast(&server_info)); + const auto * test_info = ::testing::UnitTest::GetInstance()->current_test_info(); + const std::string query_id = std::to_string(std::rand()) + "-" + test_info->test_suite_name() + "/" + test_info->name(); + + SCOPED_TRACE(query_id); + + const std::string table_name = "test_clickhouse_cpp_query_id_test"; + client_->Execute(Query("CREATE TEMPORARY TABLE IF NOT EXISTS " + table_name + " (a Int64)", query_id)); + + { + Block b; + b.AppendColumn("a", std::make_shared(std::vector{1, 2, 3})); + client_->Insert(table_name, query_id, b); + } + + client_->Select("SELECT 'a', count(*) FROM " + table_name, query_id, [](const Block &) {}); + client_->SelectCancelable("SELECT 'b', count(*) FROM " + table_name, query_id, [](const Block &) { return true; }); + client_->Execute(Query("TRUNCATE TABLE " + table_name, query_id)); + + client_->Execute("SYSTEM FLUSH LOGS"); + + size_t total_count = 0; + client_->Select("SELECT type, query_kind, query_id, query " + " FROM system.query_log " + " WHERE type = 'QueryStart' AND query_id == '" + query_id +"'", + [&total_count](const Block & block) { + total_count += block.GetRowCount(); + std::cerr << PrettyPrintBlock{block} << std::endl; + }); + + // We've executed 5 queries with explicit query_id, hence we expect to see 5 entries in logs. + EXPECT_EQ(5u, total_count); +} + const auto LocalHostEndpoint = ClientOptions() .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000")) diff --git a/ut/columns_ut.cpp b/ut/columns_ut.cpp index cb8d6d76..125ebbab 100644 --- a/ut/columns_ut.cpp +++ b/ut/columns_ut.cpp @@ -952,6 +952,19 @@ TEST(ColumnsCase, LowCardinalityAsWrappedColumn) { ASSERT_EQ(Type::FixedString, CreateColumnByType("LowCardinality(FixedString(10000))", create_column_settings)->As()->GetType().GetCode()); } +TEST(ColumnsCase, ArrayOfDecimal) { + auto column = std::make_shared(18, 10); + auto array = std::make_shared(column->Slice(0, 0)); + + column->Append("1"); + column->Append("2"); + EXPECT_EQ(2u, column->Size()); + + array->AppendAsColumn(column); + ASSERT_EQ(1u, array->Size()); + EXPECT_EQ(2u, array->GetAsColumn(0)->Size()); +} + class ColumnsCaseWithName : public ::testing::TestWithParam {}; diff --git a/ut/readonly_client_test.cpp b/ut/readonly_client_test.cpp index 181a25de..a1963424 100644 --- a/ut/readonly_client_test.cpp +++ b/ut/readonly_client_test.cpp @@ -31,7 +31,7 @@ TEST_P(ReadonlyClientTest, Select) { std::cout << query << " => " << "\n\trows: " << block.GetRowCount() << ", columns: " << block.GetColumnCount() - << ", data:\n\t" << block << std::endl; + << ", data:\n" << PrettyPrintBlock{block} << std::endl; } ); } diff --git a/ut/socket_ut.cpp b/ut/socket_ut.cpp index 4523387b..6f428428 100644 --- a/ut/socket_ut.cpp +++ b/ut/socket_ut.cpp @@ -11,22 +11,41 @@ using namespace clickhouse; TEST(Socketcase, connecterror) { - int port = 9978; - NetworkAddress addr("localhost", std::to_string(port)); - LocalTcpServer server(port); - server.start(); - std::this_thread::sleep_for(std::chrono::seconds(1)); - try { - Socket socket(addr); - } catch (const std::system_error& e) { - FAIL(); - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - server.stop(); - try { - Socket socket(addr); - FAIL(); - } catch (const std::system_error& e) { - ASSERT_NE(EINPROGRESS,e.code().value()); - } + int port = 19978; + NetworkAddress addr("localhost", std::to_string(port)); + LocalTcpServer server(port); + server.start(); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + try { + Socket socket(addr); + } catch (const std::system_error& e) { + FAIL(); + } + + std::this_thread::sleep_for(std::chrono::seconds(1)); + server.stop(); + try { + Socket socket(addr); + FAIL(); + } catch (const std::system_error& e) { + ASSERT_NE(EINPROGRESS,e.code().value()); + } } + +// Test to verify that reading from empty socket doesn't hangs. +//TEST(Socketcase, ReadFromEmptySocket) { +// const int port = 12345; +// const NetworkAddress addr("127.0.0.1", std::to_string(port)); + +// LocalTcpServer server(port); +// server.start(); + +// std::this_thread::sleep_for(std::chrono::seconds(1)); + +// char buffer[1024]; +// Socket socket(addr); +// socket.SetTcpNoDelay(true); +// auto input = socket.makeInputStream(); +// input->Read(buffer, sizeof(buffer)); +//} diff --git a/ut/tcp_server.cpp b/ut/tcp_server.cpp index 24794e09..c5eabc71 100644 --- a/ut/tcp_server.cpp +++ b/ut/tcp_server.cpp @@ -31,7 +31,7 @@ void LocalTcpServer::start() { sockaddr_in servAddr; memset((char*)&servAddr, 0, sizeof(servAddr)); servAddr.sin_family = AF_INET; - servAddr.sin_addr.s_addr = htonl(INADDR_ANY); + servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); servAddr.sin_port = htons(port_); serverSd_ = socket(AF_INET, SOCK_STREAM, 0); if (serverSd_ < 0) { @@ -52,8 +52,11 @@ void LocalTcpServer::start() { //bind the socket to its local address int bindStatus = bind(serverSd_, (struct sockaddr*) &servAddr, sizeof(servAddr)); if (bindStatus < 0) { - std::cerr << "Error binding socket to local address" << std::endl; - throw std::runtime_error("Error binding socket to local address"); + auto err = errno; + const char * error = strerror(err); + + std::cerr << "Error binding socket to local address: " << error << std::endl; + throw std::runtime_error("Error binding socket to local address: " + std::string(error ? error : "")); } listen(serverSd_, 3); } diff --git a/ut/tcp_server.h b/ut/tcp_server.h index d8f3cd57..2c2bce67 100644 --- a/ut/tcp_server.h +++ b/ut/tcp_server.h @@ -1,7 +1,12 @@ #pragma once +#include + namespace clickhouse { +class InputStream; +class OutputStream; + class LocalTcpServer { public: LocalTcpServer(int port); @@ -11,9 +16,7 @@ class LocalTcpServer { void stop(); private: - void startImpl(); -private: int port_; int serverSd_; }; diff --git a/ut/types_ut.cpp b/ut/types_ut.cpp index 11182314..8e355ee1 100644 --- a/ut/types_ut.cpp +++ b/ut/types_ut.cpp @@ -1,5 +1,6 @@ #include #include +#include #include diff --git a/ut/utils.cpp b/ut/utils.cpp index 9debe5e4..e24bccc7 100644 --- a/ut/utils.cpp +++ b/ut/utils.cpp @@ -12,6 +12,8 @@ #include // for ipv4-ipv6 platform-specific stuff +#include + namespace { using namespace clickhouse; struct DateTimeValue { @@ -44,6 +46,29 @@ bool doPrintValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { return false; } +template <> +bool doPrintValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { + if (const auto & casted_c = c->As()) { + // via temporary stream to preserve fill and alignment of the ostr + std::stringstream sstr; + sstr << casted_c->NameAt(row) << " (" << static_cast(casted_c->At(row)) << ")"; + ostr << sstr.str(); + return true; + } + return false; +} + +template <> +bool doPrintValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { + if (const auto & casted_c = c->As()) { + std::stringstream sstr; + sstr << casted_c->NameAt(row) << " (" << static_cast(casted_c->At(row)) << ")"; + ostr << sstr.str(); + return true; + } + return false; +} + std::ostream & printColumnValue(const ColumnRef& c, const size_t row, std::ostream & ostr) { const auto r = false @@ -73,6 +98,15 @@ std::ostream & printColumnValue(const ColumnRef& c, const size_t row, std::ostre return ostr; } +struct ColumnValue { + const ColumnRef& c; + size_t row; +}; + +std::ostream & operator<<(std::ostream & ostr, const ColumnValue& v) { + return printColumnValue(v.c, v.row, ostr); +} + } std::ostream& operator<<(std::ostream & ostr, const Block & block) { @@ -97,6 +131,59 @@ std::ostream& operator<<(std::ostream & ostr, const Block & block) { return ostr; } +std::ostream& operator<<(std::ostream & ostr, const PrettyPrintBlock & pretty_print_block) { + // Pretty-print block: + // - names of each column + // - types of each column + // - values of column row-by-row + + const auto & block = pretty_print_block.block; + if (block.GetRowCount() == 0 || block.GetColumnCount() == 0) + return ostr; + + std::vector column_width(block.GetColumnCount()); + const auto horizontal_bar = '|'; + const auto cross = '+'; + const auto vertical_bar = '-'; + + std::stringstream sstr; + for (auto i = block.begin(); i != block.end(); ++i) { + auto width = column_width[i.ColumnIndex()] = std::max(i.Type()->GetName().size(), i.Name().size()); + sstr << cross << std::setw(width + 2) << std::setfill(vertical_bar) << vertical_bar; + } + sstr << cross; + const auto split_line = sstr.str(); + + ostr << split_line << std::endl; + // column name + for (auto i = block.begin(); i != block.end(); ++i) { + auto width = column_width[i.ColumnIndex()]; + ostr << horizontal_bar << ' ' << std::setw(width) << i.Name() << ' '; + } + ostr << horizontal_bar << std::endl;; + ostr << split_line << std::endl; + + // column type + for (auto i = block.begin(); i != block.end(); ++i) { + auto width = column_width[i.ColumnIndex()]; + ostr << horizontal_bar << ' ' << std::setw(width) << i.Type()->GetName() << ' '; + } + ostr << horizontal_bar << std::endl;; + ostr << split_line << std::endl; + + // values + for (size_t row_index = 0; row_index < block.GetRowCount(); ++row_index) { + for (auto i = block.begin(); i != block.end(); ++i) { + auto width = column_width[i.ColumnIndex()]; + ostr << horizontal_bar << ' ' << std::setw(width) << ColumnValue{i.Column(), row_index} << ' '; + } + ostr << horizontal_bar << std::endl; + } + ostr << split_line << std::endl; + + return ostr; +} + std::ostream& operator<<(std::ostream& ostr, const in_addr& addr) { char buf[INET_ADDRSTRLEN]; const char* ip_str = inet_ntop(AF_INET, &addr, buf, sizeof(buf)); diff --git a/ut/utils.h b/ut/utils.h index 9014d1f1..e96ffb84 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -110,7 +110,13 @@ MeasuresCollector collect(MeasureFunc && f) { struct in_addr; struct in6_addr; +// Helper for pretty-printing of the Block +struct PrettyPrintBlock { + const clickhouse::Block & block; +}; + std::ostream& operator<<(std::ostream & ostr, const clickhouse::Block & block); +std::ostream& operator<<(std::ostream & ostr, const PrettyPrintBlock & block); std::ostream& operator<<(std::ostream& ostr, const in_addr& addr); std::ostream& operator<<(std::ostream& ostr, const in6_addr& addr); @@ -118,6 +124,9 @@ std::ostream& operator<<(std::ostream& ostr, const in6_addr& addr); template auto getEnvOrDefault(const std::string& env, const char * default_val) { const char* v = std::getenv(env.c_str()); + if (!v && !default_val) + throw std::runtime_error("Environment var '" + env + "' is not set."); + const std::string value = v ? v : default_val; if constexpr (std::is_same_v) { From d6a0c7d67b528980f5e0191312fcf046ff9d6ec6 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 11:41:41 +0200 Subject: [PATCH 2/9] Fixed test --- ut/client_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index f8a9ecb7..6adb55ed 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -890,7 +890,7 @@ TEST_P(ClientCase, Query_ID) { " WHERE type = 'QueryStart' AND query_id == '" + query_id +"'", [&total_count](const Block & block) { total_count += block.GetRowCount(); - std::cerr << PrettyPrintBlock{block} << std::endl; +// std::cerr << PrettyPrintBlock{block} << std::endl; }); // We've executed 5 queries with explicit query_id, hence we expect to see 5 entries in logs. From 1db2f787d0c8c960edf17dc32185e49351b3631f Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 11:44:38 +0200 Subject: [PATCH 3/9] Explicit memory order for storing unique_type_id_ value --- clickhouse/types/types.cpp | 4 ++-- clickhouse/types/types.h | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/clickhouse/types/types.cpp b/clickhouse/types/types.cpp index f9ac8ca9..32ef9321 100644 --- a/clickhouse/types/types.cpp +++ b/clickhouse/types/types.cpp @@ -121,9 +121,9 @@ uint64_t Type::GetTypeUniqueId() const { // 1. going to be the same // 2. going to be stored atomically - if (type_unique_id_ == 0) { + if (type_unique_id_.load(std::memory_order::memory_order_relaxed) == 0) { const auto name = GetName(); - type_unique_id_ = CityHash64WithSeed(name.c_str(), name.size(), code_); + type_unique_id_.store(CityHash64WithSeed(name.c_str(), name.size(), code_), std::memory_order::memory_order_relaxed); } return type_unique_id_; diff --git a/clickhouse/types/types.h b/clickhouse/types/types.h index 8f4e50e4..173edd96 100644 --- a/clickhouse/types/types.h +++ b/clickhouse/types/types.h @@ -75,6 +75,7 @@ class Type { /// Is given type same as current one. bool IsEqual(const Type& other) const { + // Types are equal only if both code_ and type_unique_id_ are equal. return this == &other // GetTypeUniqueId() is relatively heavy, so avoid calling it when comparing obviously different types. || (this->GetCode() == other.GetCode() && this->GetTypeUniqueId() == other.GetTypeUniqueId()); From 18e3da290253d2456411a189ff1b43297e0f8469 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 12:51:17 +0200 Subject: [PATCH 4/9] Fixed build on Windows --- ut/utils.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ut/utils.h b/ut/utils.h index e96ffb84..bf8c4869 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -69,6 +71,9 @@ inline ostream & operator<<(ostream & ostr, const chrono::duration & d) { } } +#ifdef _win_ +#define _SILENCE_CXX17_RESULT_OF_DEPRECATION_WARNING +#endif // Since result_of is deprecated in C++20, and invoke_result_of is unavailable until C++20... template using my_result_of_t = From 8638aa3636cb12fde8ed4b5440d0cce08e9b64ef Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 13:06:32 +0200 Subject: [PATCH 5/9] Second attempt to fix win builds --- ut/utils.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ut/utils.h b/ut/utils.h index bf8c4869..d785b39b 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -72,7 +72,7 @@ inline ostream & operator<<(ostream & ostr, const chrono::duration & d) { } #ifdef _win_ -#define _SILENCE_CXX17_RESULT_OF_DEPRECATION_WARNING +#define _SILENCE_CXX17_RESULT_OF_DEPRECATION_WARNING 1 #endif // Since result_of is deprecated in C++20, and invoke_result_of is unavailable until C++20... template From 4fc2b182e0cf6517c0b42de344a9ec4d08cfcca7 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 14:22:53 +0200 Subject: [PATCH 6/9] Fixed win build, attempt #3 --- ut/utils.h | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/ut/utils.h b/ut/utils.h index d785b39b..138ad160 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -71,22 +72,19 @@ inline ostream & operator<<(ostream & ostr, const chrono::duration & d) { } } -#ifdef _win_ -#define _SILENCE_CXX17_RESULT_OF_DEPRECATION_WARNING 1 -#endif -// Since result_of is deprecated in C++20, and invoke_result_of is unavailable until C++20... +// Since result_of is deprecated in C++17, and invoke_result_of is unavailable until C++20... template using my_result_of_t = -#if __cplusplus >= 202002L - std::invoke_result_of_t; +#if __cplusplus >= 201703L + std::invoke_result_t; #else - std::result_of_t; + std::result_of_t; #endif template class MeasuresCollector { public: - using Result = my_result_of_t; + using Result = my_result_of_t; explicit MeasuresCollector(MeasureFunc && measurment_func, const size_t preallocate_results = 10) : measurment_func_(std::move(measurment_func)) From 9c411b54c5a2d881065827b51f89e2f0a6a455cd Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 14:29:48 +0200 Subject: [PATCH 7/9] Fix win build, attempt #4 --- ut/columns_ut.cpp | 2 +- ut/utils.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ut/columns_ut.cpp b/ut/columns_ut.cpp index 125ebbab..1ab7209d 100644 --- a/ut/columns_ut.cpp +++ b/ut/columns_ut.cpp @@ -95,7 +95,7 @@ static const auto LOWCARDINALITY_STRING_FOOBAR_10_ITEMS_BINARY = template auto GenerateVector(size_t items, Generator && gen) { - std::vector> result; + std::vector> result; result.reserve(items); for (size_t i = 0; i < items; ++i) { result.push_back(std::move(gen(i))); diff --git a/ut/utils.h b/ut/utils.h index 138ad160..82446d13 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -78,7 +78,7 @@ using my_result_of_t = #if __cplusplus >= 201703L std::invoke_result_t; #else - std::result_of_t; + std::result_of_t; #endif template From f810c16f9093daa8540c1cce3a4981489b0f5dbc Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 15:08:19 +0200 Subject: [PATCH 8/9] Fix build attempt 5 --- ut/columns_ut.cpp | 3 ++- ut/utils.cpp | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/ut/columns_ut.cpp b/ut/columns_ut.cpp index 1ab7209d..28c6f88d 100644 --- a/ut/columns_ut.cpp +++ b/ut/columns_ut.cpp @@ -12,13 +12,14 @@ #include #include #include +#include // for ipv4-ipv6 platform-specific stuff #include #include "utils.h" #include +#include -#include // for ipv4-ipv6 platform-specific stuff // only compare PODs of equal size this way template // for ipv4-ipv6 platform-specific stuff #include +#include namespace { using namespace clickhouse; @@ -46,9 +47,9 @@ bool doPrintValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { return false; } -template <> -bool doPrintValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { - if (const auto & casted_c = c->As()) { +template +bool doPrintEnumValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { + if (const auto & casted_c = c->As()) { // via temporary stream to preserve fill and alignment of the ostr std::stringstream sstr; sstr << casted_c->NameAt(row) << " (" << static_cast(casted_c->At(row)) << ")"; @@ -58,15 +59,14 @@ bool doPrintValue(const ColumnRef & c, const size_t row, std::ostre return false; } +template <> +bool doPrintValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { + return doPrintEnumValue(c, row, ostr); +} + template <> bool doPrintValue(const ColumnRef & c, const size_t row, std::ostream & ostr) { - if (const auto & casted_c = c->As()) { - std::stringstream sstr; - sstr << casted_c->NameAt(row) << " (" << static_cast(casted_c->At(row)) << ")"; - ostr << sstr.str(); - return true; - } - return false; + return doPrintEnumValue(c, row, ostr); } std::ostream & printColumnValue(const ColumnRef& c, const size_t row, std::ostream & ostr) { @@ -152,7 +152,7 @@ std::ostream& operator<<(std::ostream & ostr, const PrettyPrintBlock & pretty_pr sstr << cross << std::setw(width + 2) << std::setfill(vertical_bar) << vertical_bar; } sstr << cross; - const auto split_line = sstr.str(); + const std::string split_line(sstr.str()); ostr << split_line << std::endl; // column name From 07a928f75ce62f3c9ac4224bc53c8daaba48164b Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 8 Mar 2022 15:28:25 +0200 Subject: [PATCH 9/9] Fix test with SYSTEM FLUSH LOGS; On some systems in CI/CD we don't have enough privileges to flush logs and have to wait for this to occur automatically. --- ut/client_ut.cpp | 18 +++++++++++++++++- ut/utils.h | 2 ++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 6adb55ed..9a8be221 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1,10 +1,14 @@ #include + #include "readonly_client_test.h" #include "connection_failed_client_test.h" #include "utils.h" + #include #include +#include +#include using namespace clickhouse; @@ -882,7 +886,19 @@ TEST_P(ClientCase, Query_ID) { client_->SelectCancelable("SELECT 'b', count(*) FROM " + table_name, query_id, [](const Block &) { return true; }); client_->Execute(Query("TRUNCATE TABLE " + table_name, query_id)); - client_->Execute("SYSTEM FLUSH LOGS"); + try { + client_->Execute("SYSTEM FLUSH LOGS"); + } catch (const std::exception & e) { + // DB::Exception: clickhouse_cpp_cicd: Not enough privileges. To execute this query it's necessary to have grant SYSTEM FLUSH LOGS ON + if (std::string(e.what()).find("To execute this query it's necessary to have grant SYSTEM FLUSH LOGS ON") != std::string::npos) { + // Insufficient privileges, the only safe way is to wait long enough for system + // to flush the logs automaticaly. Usualy it takes 7.5 seconds, so just in case, + // wait 3 times that to ensure that all previously executed queries are in the logs now. + const auto wait_duration = std::chrono::seconds(23); + std::cerr << "Got error while flushing logs, now we wait " << wait_duration << "..." << std::endl; + std::this_thread::sleep_for(wait_duration); + } + } size_t total_count = 0; client_->Select("SELECT type, query_kind, query_id, query " diff --git a/ut/utils.h b/ut/utils.h index 82446d13..834b0bcf 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -58,6 +58,8 @@ inline const char * getPrefix() { prefix = "c"; } else if constexpr (std::ratio_equal_v) { prefix = "d"; + } else if constexpr (std::ratio_equal_v>) { + prefix = ""; } else { static_assert("Unsupported ratio"); }