From 5d548935e0cdcfd39f9b003875abda82948eb397 Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Fri, 17 Nov 2023 21:41:38 +0800 Subject: [PATCH] [improvement](insert) support schema change and decommission for group commit (#26359) --- be/src/common/config.cpp | 1 - be/src/common/config.h | 1 - be/src/olap/wal_manager.cpp | 138 ++++++++++++- be/src/olap/wal_manager.h | 26 ++- be/src/olap/wal_reader.cpp | 35 +++- be/src/olap/wal_reader.h | 2 + be/src/olap/wal_table.cpp | 192 ++++++++++++++++-- be/src/olap/wal_table.h | 23 ++- be/src/olap/wal_writer.cpp | 68 +++++-- be/src/olap/wal_writer.h | 8 +- be/src/runtime/group_commit_mgr.cpp | 15 +- be/src/service/internal_service.cpp | 16 ++ be/src/service/internal_service.h | 5 + be/src/vec/exec/format/wal/wal_reader.cpp | 26 ++- be/src/vec/exec/format/wal/wal_reader.h | 4 + be/src/vec/exec/scan/vfile_scanner.cpp | 4 - be/src/vec/sink/writer/vtablet_writer.cpp | 66 +----- be/src/vec/sink/writer/vtablet_writer.h | 16 +- be/src/vec/sink/writer/vwal_writer.cpp | 120 +++++++++++ be/src/vec/sink/writer/vwal_writer.h | 108 ++++++++++ be/test/exec/test_data/wal_scanner/wal | Bin 132 -> 180 bytes be/test/vec/exec/vtablet_sink_test.cpp | 8 + be/test/vec/exec/vwal_scanner_test.cpp | 20 +- .../java/org/apache/doris/common/Config.java | 4 + .../apache/doris/alter/SchemaChangeJobV2.java | 39 +++- .../org/apache/doris/alter/SystemHandler.java | 7 +- .../java/org/apache/doris/catalog/Env.java | 7 + .../doris/httpv2/rest/CheckWalSizeAction.java | 113 +++++++++++ .../apache/doris/httpv2/rest/LoadAction.java | 119 +++++++++-- .../apache/doris/load/GroupCommitManager.java | 145 +++++++++++++ .../org/apache/doris/qe/StmtExecutor.java | 27 +++ .../doris/rpc/BackendServiceClient.java | 6 + .../apache/doris/rpc/BackendServiceProxy.java | 16 ++ .../doris/service/FrontendServiceImpl.java | 41 ++++ .../cluster/DecommissionBackendTest.java | 41 +++- gensrc/proto/internal_service.proto | 11 + gensrc/thrift/FrontendService.thrift | 12 
++ 37 files changed, 1305 insertions(+), 185 deletions(-) create mode 100644 be/src/vec/sink/writer/vwal_writer.cpp create mode 100644 be/src/vec/sink/writer/vwal_writer.h create mode 100644 fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1f0796e9832892..870ba5c9b0a720 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1076,7 +1076,6 @@ DEFINE_Int16(bitmap_serialize_version, "1"); DEFINE_String(group_commit_replay_wal_dir, "./wal"); DEFINE_Int32(group_commit_replay_wal_retry_num, "10"); DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5"); -DEFINE_Int32(group_commit_sync_wal_batch, "10"); DEFINE_Bool(wait_internal_group_commit_finish, "false"); // the count of thread to group commit insert diff --git a/be/src/common/config.h b/be/src/common/config.h index 6544ed9ebc8d18..ac560d1f1c607a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1145,7 +1145,6 @@ DECLARE_Int16(bitmap_serialize_version); DECLARE_String(group_commit_replay_wal_dir); DECLARE_Int32(group_commit_replay_wal_retry_num); DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds); -DECLARE_Int32(group_commit_sync_wal_batch); DECLARE_Bool(wait_internal_group_commit_finish); // This config can be set to limit thread number in group commit insert thread pool. 
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index 1eb91d85e0a4b7..10af1494456e94 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -38,7 +38,7 @@ namespace doris { WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) - : _exec_env(exec_env), _stop_background_threads_latch(1) { + : _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) { doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs); _all_wal_disk_bytes = std::make_shared(0); } @@ -48,12 +48,15 @@ WalManager::~WalManager() { } void WalManager::stop() { - _stop = true; - _stop_background_threads_latch.count_down(); - if (_replay_thread) { - _replay_thread->join(); + if (!this->_stop.load()) { + this->_stop.store(true); + stop_relay_wal(); + _stop_background_threads_latch.count_down(); + if (_replay_thread) { + _replay_thread->join(); + } + LOG(INFO) << "WalManager is stopped"; } - LOG(INFO) << "WalManager is stopped"; } Status WalManager::init() { @@ -76,6 +79,78 @@ Status WalManager::init() { &_replay_thread); } +void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) { + std::lock_guard wrlock(_wal_status_lock); + LOG(INFO) << "add wal queue " + << ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status; + auto it = _wal_status_queues.find(table_id); + if (it == _wal_status_queues.end()) { + std::unordered_map tmp; + tmp.emplace(wal_id, wal_status); + _wal_status_queues.emplace(table_id, tmp); + } else { + it->second.emplace(wal_id, wal_status); + } +} + +Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) { + std::lock_guard wrlock(_wal_status_lock); + auto it = _wal_status_queues.find(table_id); + LOG(INFO) << "remove wal queue " + << ",table_id:" << table_id << ",wal_id:" << wal_id; + if (it == _wal_status_queues.end()) { + return Status::InternalError("table_id " + std::to_string(table_id) + + " not found in wal 
status queue"); + } else { + it->second.erase(wal_id); + if (it->second.empty()) { + _wal_status_queues.erase(table_id); + } + } + return Status::OK(); +} + +Status WalManager::get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, + PGetWalQueueSizeResponse* response) { + std::lock_guard wrlock(_wal_status_lock); + size_t count = 0; + auto table_id = request->table_id(); + auto txn_id = request->txn_id(); + if (table_id > 0 && txn_id > 0) { + auto it = _wal_status_queues.find(table_id); + if (it == _wal_status_queues.end()) { + LOG(INFO) << ("table_id " + std::to_string(table_id) + + " not found in wal status queue"); + } else { + for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) { + if (wal_it->first <= txn_id) { + count += 1; + } + } + } + } else { + for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); it++) { + count += it->second.size(); + } + } + response->set_size(count); + if (count > 0) { + print_wal_status_queue(); + } + return Status::OK(); +} + +void WalManager::print_wal_status_queue() { + std::stringstream ss; + for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); ++it) { + ss << "table_id:" << it->first << std::endl; + for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) { + ss << "wal_id:" << wal_it->first << ",status:" << wal_it->second << std::endl; + } + } + LOG(INFO) << ss.str(); +} + Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label) { std::string base_path = @@ -132,7 +207,6 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr& } return Status::OK(); } - Status WalManager::scan_wals(const std::string& wal_path) { size_t count = 0; bool exists = true; @@ -174,8 +248,16 @@ Status WalManager::scan_wals(const std::string& wal_path) { res.emplace_back(wal_file); { std::lock_guard wrlock(_wal_lock); - int64_t wal_id = std::strtoll(wal.file_name.c_str(), NULL, 10); - 
_wal_path_map.emplace(wal_id, wal_file); + auto pos = wal.file_name.find("_"); + try { + int64_t wal_id = + std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10); + _wal_path_map.emplace(wal_id, wal_file); + int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10); + add_wal_status_queue(tb_id, wal_id, WalManager::WAL_STATUS::REPLAY); + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid format, {}", e.what()); + } } } st = add_recover_wal(db_id.file_name, table_id.file_name, res); @@ -193,11 +275,12 @@ Status WalManager::scan_wals(const std::string& wal_path) { Status WalManager::replay() { do { - if (_stop || _exec_env->master_info() == nullptr) { + if (_stop.load()) { break; } // port == 0 means not received heartbeat yet - if (_exec_env->master_info()->network_address.port == 0) { + if (_exec_env->master_info() != nullptr && + _exec_env->master_info()->network_address.port == 0) { continue; } std::vector replay_tables; @@ -276,4 +359,37 @@ Status WalManager::delete_wal(int64_t wal_id) { return Status::OK(); } +bool WalManager::is_running() { + return !_stop.load(); +} + +void WalManager::stop_relay_wal() { + std::lock_guard wrlock(_lock); + for (auto it = _table_map.begin(); it != _table_map.end(); it++) { + it->second->stop(); + } +} + +void WalManager::add_wal_column_index(int64_t wal_id, std::vector& column_index) { + _wal_column_id_map.emplace(wal_id, column_index); +} + +void WalManager::erase_wal_column_index(int64_t wal_id) { + if (_wal_column_id_map.erase(wal_id)) { + LOG(INFO) << "erase " << wal_id << " from wal_column_id_map"; + } else { + LOG(WARNING) << "fail to erase wal " << wal_id << " from wal_column_id_map"; + } +} + +Status WalManager::get_wal_column_index(int64_t wal_id, std::vector& column_index) { + auto it = _wal_column_id_map.find(wal_id); + if (it != _wal_column_id_map.end()) { + column_index = it->second; + } else { + return Status::InternalError("cannot find wal {} in 
wal_column_id_map", wal_id); + } + return Status::OK(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index d8fa4a705278d2..cf4589fbf00112 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include + #include #include "common/config.h" @@ -32,6 +34,13 @@ namespace doris { class WalManager { ENABLE_FACTORY_CREATOR(WalManager); +public: + enum WAL_STATUS { + PREPARE = 0, + REPLAY, + CREATE, + }; + public: WalManager(ExecEnv* exec_env, const std::string& wal_dir); ~WalManager(); @@ -47,7 +56,19 @@ class WalManager { std::vector wals); Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label); Status get_wal_path(int64_t wal_id, std::string& wal_path); + Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request, + PGetWalQueueSizeResponse* response); + Status get_all_wal_status_queue_size(const PGetWalQueueSizeRequest* request, + PGetWalQueueSizeResponse* response); + void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status); + Status erase_wal_status_queue(int64_t table_id, int64_t wal_id); + void print_wal_status_queue(); void stop(); + bool is_running(); + void stop_relay_wal(); + void add_wal_column_index(int64_t wal_id, std::vector& column_index); + void erase_wal_column_index(int64_t wal_id); + Status get_wal_column_index(int64_t wal_id, std::vector& column_index); private: ExecEnv* _exec_env; @@ -57,9 +78,12 @@ class WalManager { std::map> _table_map; std::vector _wal_dirs; std::shared_mutex _wal_lock; + std::shared_mutex _wal_status_lock; std::unordered_map _wal_path_map; std::unordered_map> _wal_id_to_writer_map; std::shared_ptr _all_wal_disk_bytes; - bool _stop = false; + std::unordered_map> _wal_status_queues; + std::atomic _stop; + std::unordered_map&> _wal_column_id_map; }; } // namespace 
doris \ No newline at end of file diff --git a/be/src/olap/wal_reader.cpp b/be/src/olap/wal_reader.cpp index 180e3910167900..c3a9b225dd7f20 100644 --- a/be/src/olap/wal_reader.cpp +++ b/be/src/olap/wal_reader.cpp @@ -21,6 +21,7 @@ #include "io/fs/file_reader.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" +#include "util/coding.h" #include "util/crc32c.h" #include "wal_writer.h" @@ -52,8 +53,7 @@ Status WalReader::read_block(PBlock& block) { RETURN_IF_ERROR( file_reader->read_at(_offset, {row_len_buf, WalWriter::LENGTH_SIZE}, &bytes_read)); _offset += WalWriter::LENGTH_SIZE; - size_t block_len; - memcpy(&block_len, row_len_buf, WalWriter::LENGTH_SIZE); + size_t block_len = decode_fixed64_le(row_len_buf); // read block std::string block_buf; block_buf.resize(block_len); @@ -65,12 +65,39 @@ Status WalReader::read_block(PBlock& block) { RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, WalWriter::CHECKSUM_SIZE}, &bytes_read)); _offset += WalWriter::CHECKSUM_SIZE; - uint32_t checksum; - memcpy(&checksum, checksum_len_buf, WalWriter::CHECKSUM_SIZE); + uint32_t checksum = decode_fixed32_le(checksum_len_buf); RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum)); return Status::OK(); } +Status WalReader::read_header(uint32_t& version, std::string& col_ids) { + size_t bytes_read = 0; + std::string magic_str; + magic_str.resize(k_wal_magic_length); + RETURN_IF_ERROR(file_reader->read_at(_offset, magic_str, &bytes_read)); + if (strcmp(magic_str.c_str(), k_wal_magic) != 0) { + return Status::Corruption("Bad wal file {}: magic number not match", _file_name); + } + _offset += k_wal_magic_length; + uint8_t version_buf[WalWriter::VERSION_SIZE]; + RETURN_IF_ERROR( + file_reader->read_at(_offset, {version_buf, WalWriter::VERSION_SIZE}, &bytes_read)); + _offset += WalWriter::VERSION_SIZE; + version = decode_fixed32_le(version_buf); + uint8_t len_buf[WalWriter::LENGTH_SIZE]; + RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, 
WalWriter::LENGTH_SIZE}, &bytes_read)); + _offset += WalWriter::LENGTH_SIZE; + size_t len = decode_fixed64_le(len_buf); + col_ids.resize(len); + RETURN_IF_ERROR(file_reader->read_at(_offset, col_ids, &bytes_read)); + _offset += len; + if (len != bytes_read) { + return Status::InternalError("failed to read header expected= " + std::to_string(len) + + ",actually=" + std::to_string(bytes_read)); + } + return Status::OK(); +} + Status WalReader::_deserialize(PBlock& block, std::string& buf) { if (UNLIKELY(!block.ParseFromString(buf))) { return Status::InternalError("failed to deserialize row"); diff --git a/be/src/olap/wal_reader.h b/be/src/olap/wal_reader.h index 825d11fae3016b..f68a031c09ba05 100644 --- a/be/src/olap/wal_reader.h +++ b/be/src/olap/wal_reader.h @@ -32,11 +32,13 @@ class WalReader { Status finalize(); Status read_block(PBlock& block); + Status read_header(uint32_t& version, std::string& col_ids); private: Status _deserialize(PBlock& block, std::string& buf); Status _check_checksum(const char* binary, size_t size, uint32_t checksum); +private: std::string _file_name; size_t _offset; io::FileReaderSPtr file_reader; diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp index fd21fafef0ea2c..4bfcec502a8c58 100644 --- a/be/src/olap/wal_table.cpp +++ b/be/src/olap/wal_table.cpp @@ -41,7 +41,7 @@ namespace doris { WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) - : _exec_env(exec_env), _db_id(db_id), _table_id(table_id) {} + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {} WalTable::~WalTable() {} #ifdef BE_TEST @@ -73,21 +73,32 @@ Status WalTable::replay_wals() { LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id << ", wal=" << wal << ", retry_num=" << config::group_commit_replay_wal_retry_num; - std::string rename_path = get_tmp_path(wal); + std::string rename_path = _get_tmp_path(wal); LOG(INFO) << "rename wal from " << wal << " to " << rename_path; 
std::rename(wal.c_str(), rename_path.c_str()); _replay_wal_map.erase(wal); continue; } - if (need_replay(info)) { - replaying = true; + if (_need_replay(info)) { need_replay_wals.push_back(wal); } } std::sort(need_replay_wals.begin(), need_replay_wals.end()); } for (const auto& wal : need_replay_wals) { - auto st = replay_wal_internal(wal); + { + std::lock_guard lock(_replay_wal_lock); + if (_stop.load()) { + break; + } else { + auto it = _replay_wal_map.find(wal); + if (it != _replay_wal_map.end()) { + auto& [retry_num, start_time, replaying] = it->second; + replaying = true; + } + } + } + auto st = _replay_wal_internal(wal); if (!st.ok()) { std::lock_guard lock(_replay_wal_lock); auto it = _replay_wal_map.find(wal); @@ -105,7 +116,7 @@ Status WalTable::replay_wals() { return Status::OK(); } -std::string WalTable::get_tmp_path(const std::string wal) { +std::string WalTable::_get_tmp_path(const std::string wal) { std::vector path_element; doris::vectorized::WalReader::string_split(wal, "/", path_element); std::stringstream ss; @@ -126,7 +137,7 @@ std::string WalTable::get_tmp_path(const std::string wal) { return ss.str(); } -bool WalTable::need_replay(const doris::WalTable::replay_wal_info& info) { +bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) { #ifndef BE_TEST auto& [retry_num, start_ts, replaying] = info; auto replay_interval = @@ -137,7 +148,27 @@ bool WalTable::need_replay(const doris::WalTable::replay_wal_info& info) { #endif } -Status WalTable::replay_wal_internal(const std::string& wal) { +Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) { + TLoadTxnRollbackRequest request; + request.__set_auth_code(0); // this is a fake, fe not check it now + request.__set_db_id(db_id); + request.__set_txnId(wal_id); + std::string reason = "relay wal " + std::to_string(wal_id); + request.__set_reason(reason); + TLoadTxnRollbackResult result; + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + auto st = 
ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->loadTxnRollback(result, request); + }, + 10000L); + auto result_status = Status::create(result.status); + LOG(INFO) << "abort txn " << wal_id << ",st:" << st << ",result_status:" << result_status; + return result_status; +} + +Status WalTable::_replay_wal_internal(const std::string& wal) { LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal; // start a new stream load { @@ -153,28 +184,42 @@ Status WalTable::replay_wal_internal(const std::string& wal) { return Status::OK(); } } - auto pair = get_wal_info(wal); - auto wal_id = pair.first; - auto label = pair.second; - RETURN_IF_ERROR(send_request(wal_id, wal, label)); + std::shared_ptr> pair = nullptr; + RETURN_IF_ERROR(_get_wal_info(wal, pair)); + auto wal_id = pair->first; + auto label = pair->second; +#ifndef BE_TEST + auto st = _abort_txn(_db_id, wal_id); + if (!st.ok()) { + LOG(WARNING) << "abort txn " << wal_id << " fail"; + } + RETURN_IF_ERROR(_get_column_info(_db_id, _table_id)); +#endif + RETURN_IF_ERROR(_send_request(wal_id, wal, label)); return Status::OK(); } -std::pair WalTable::get_wal_info(const std::string& wal) { +Status WalTable::_get_wal_info(const std::string& wal, + std::shared_ptr>& pair) { std::vector path_element; doris::vectorized::WalReader::string_split(wal, "/", path_element); auto pos = path_element[path_element.size() - 1].find("_"); - int64_t wal_id = - std::strtoll(path_element[path_element.size() - 1].substr(0, pos).c_str(), NULL, 10); - auto label = path_element[path_element.size() - 1].substr(pos + 1); - return std::make_pair(wal_id, label); + try { + int64_t wal_id = std::strtoll(path_element[path_element.size() - 1].substr(0, pos).c_str(), + NULL, 10); + auto label = path_element[path_element.size() - 1].substr(pos + 1); + pair = std::make_shared>(std::make_pair(wal_id, label)); + } catch (const 
std::invalid_argument& e) { + return Status::InvalidArgument("Invalid format, {}", e.what()); + } + return Status::OK(); } void http_request_done(struct evhttp_request* req, void* arg) { event_base_loopbreak((struct event_base*)arg); } -Status WalTable::send_request(int64_t wal_id, const std::string& wal, const std::string& label) { +Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) { #ifndef BE_TEST struct event_base* base = nullptr; struct evhttp_connection* conn = nullptr; @@ -187,9 +232,35 @@ Status WalTable::send_request(int64_t wal_id, const std::string& wal, const std: evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), label.c_str()); evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(), std::to_string(wal_id).c_str()); evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), std::to_string(wal_id).c_str()); + std::string columns; + RETURN_IF_ERROR(_read_wal_header(wal, columns)); + std::vector column_id_element; + doris::vectorized::WalReader::string_split(columns, ",", column_id_element); + std::vector index_vector; + std::stringstream ss_name; + std::stringstream ss_id; + int index = 0; + for (auto column_id_str : column_id_element) { + try { + int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10); + auto it = _column_id_name_map.find(column_id); + if (it != _column_id_name_map.end()) { + ss_name << it->second << ","; + ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ","; + index_vector.emplace_back(index); + _column_id_name_map.erase(column_id); + } + index++; + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid format, {}", e.what()); + } + } + _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector); + auto name = ss_name.str().substr(0, ss_name.str().size() - 1); + auto id = ss_id.str().substr(0, ss_id.str().size() - 1); std::stringstream ss; - ss << "insert into doris_internal_table_id(" << _table_id << ") 
WITH LABEL " << label - << " select * from http_stream(\"format\" = \"wal\", \"table_id\" = \"" + ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " (" + << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" << std::to_string(_table_id) << "\")"; evhttp_add_header(req->output_headers, HTTP_SQL.c_str(), ss.str().c_str()); evbuffer* output = evhttp_request_get_output_buffer(req); @@ -251,14 +322,93 @@ Status WalTable::send_request(int64_t wal_id, const std::string& wal, const std: } else { LOG(INFO) << "success to replay wal =" << wal << ",status:" << status << ",msg:" << msg; RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(wal_id)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(_table_id, wal_id)); std::lock_guard lock(_replay_wal_lock); - _replay_wal_map.erase(wal); + if (_replay_wal_map.erase(wal)) { + LOG(INFO) << "erase " << wal << " from _replay_wal_map"; + } else { + LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map"; + } } + _exec_env->wal_mgr()->erase_wal_column_index(wal_id); return Status::OK(); } +void WalTable::stop() { + bool done = true; + do { + { + std::lock_guard lock(_replay_wal_lock); + if (!this->_stop.load()) { + this->_stop.store(true); + } + auto it = _replay_wal_map.begin(); + for (; it != _replay_wal_map.end(); it++) { + auto& [retry_num, start_time, replaying] = it->second; + if (replaying) { + break; + } + } + if (it != _replay_wal_map.end()) { + done = false; + } else { + done = true; + } + } + if (!done) { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } while (!done); +} + size_t WalTable::size() { std::lock_guard lock(_replay_wal_lock); return _replay_wal_map.size(); } + +Status WalTable::_get_column_info(int64_t db_id, int64_t tb_id) { + TGetColumnInfoRequest request; + request.__set_db_id(db_id); + request.__set_table_id(tb_id); + TGetColumnInfoResult result; + Status status; + TNetworkAddress 
master_addr = _exec_env->master_info()->network_address; + if (master_addr.hostname.empty() || master_addr.port == 0) { + status = Status::InternalError("Have not get FE Master heartbeat yet"); + } else { + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->getColumnInfo(result, request); + })); + std::string columns_str = result.column_info; + std::vector column_element; + doris::vectorized::WalReader::string_split(columns_str, ",", column_element); + int64_t index = 1; + for (auto column : column_element) { + auto pos = column.find(":"); + try { + auto column_name = column.substr(0, pos); + int64_t column_id = std::strtoll(column.substr(pos + 1).c_str(), NULL, 10); + _column_id_name_map.emplace(column_id, column_name); + _column_id_index_map.emplace(column_id, index++); + } catch (const std::invalid_argument& e) { + return Status::InvalidArgument("Invalid format, {}", e.what()); + } + } + + status = Status::create(result.status); + } + return status; +} + +Status WalTable::_read_wal_header(const std::string& wal_path, std::string& columns) { + std::shared_ptr wal_reader; + RETURN_IF_ERROR(_exec_env->wal_mgr()->create_wal_reader(wal_path, wal_reader)); + uint32_t version = 0; + RETURN_IF_ERROR(wal_reader->read_header(version, columns)); + RETURN_IF_ERROR(wal_reader->finalize()); + return Status::OK(); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h index 2dd63240d3547e..354f4f16b05cc4 100644 --- a/be/src/olap/wal_table.h +++ b/be/src/olap/wal_table.h @@ -31,17 +31,25 @@ class WalTable { public: WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id); ~WalTable(); - // - using replay_wal_info = std::tuple; // used when be start and there are wals need to do recovery void add_wals(std::vector wals); Status replay_wals(); size_t size(); + void stop(); + +public: + // + using replay_wal_info = 
std::tuple; private: - std::pair get_wal_info(const std::string& wal); - std::string get_tmp_path(const std::string wal); - Status send_request(int64_t wal_id, const std::string& wal, const std::string& label); + Status _get_wal_info(const std::string& wal, std::shared_ptr>&); + std::string _get_tmp_path(const std::string wal); + Status _send_request(int64_t wal_id, const std::string& wal, const std::string& label); + Status _abort_txn(int64_t db_id, int64_t wal_id); + Status _get_column_info(int64_t db_id, int64_t tb_id); + Status _read_wal_header(const std::string& wal, std::string& columns); + bool _need_replay(const replay_wal_info& info); + Status _replay_wal_internal(const std::string& wal); private: ExecEnv* _exec_env; @@ -52,7 +60,8 @@ class WalTable { mutable std::mutex _replay_wal_lock; // key is wal_id std::map _replay_wal_map; - bool need_replay(const replay_wal_info& info); - Status replay_wal_internal(const std::string& wal); + std::atomic _stop; + std::map _column_id_name_map; + std::map _column_id_index_map; }; } // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_writer.cpp b/be/src/olap/wal_writer.cpp index b433c7aaaa2f13..52202b35c3d65d 100644 --- a/be/src/olap/wal_writer.cpp +++ b/be/src/olap/wal_writer.cpp @@ -28,17 +28,16 @@ namespace doris { +const char* k_wal_magic = "WAL1"; +const uint32_t k_wal_magic_length = 4; + WalWriter::WalWriter(const std::string& file_name, const std::shared_ptr& all_wal_disk_bytes) - : _file_name(file_name), - _count(0), - _disk_bytes(0), - _all_wal_disk_bytes(all_wal_disk_bytes) {} + : _file_name(file_name), _disk_bytes(0), _all_wal_disk_bytes(all_wal_disk_bytes) {} WalWriter::~WalWriter() {} Status WalWriter::init() { - _batch = config::group_commit_sync_wal_batch; RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_name, &_file_writer)); return Status::OK(); } @@ -62,31 +61,60 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) { for (const auto& block : blocks) { 
total_size += LENGTH_SIZE + block->ByteSizeLong() + CHECKSUM_SIZE; } - std::string binary(total_size, '\0'); - char* row_binary = binary.data(); size_t offset = 0; for (const auto& block : blocks) { - unsigned long row_length = block->GetCachedSize(); - memcpy(row_binary + offset, &row_length, LENGTH_SIZE); + uint8_t len_buf[sizeof(uint64_t)]; + uint64_t block_length = block->ByteSizeLong(); + encode_fixed64_le(len_buf, block_length); + RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)})); offset += LENGTH_SIZE; - memcpy(row_binary + offset, block->SerializeAsString().data(), row_length); - offset += row_length; - uint32_t checksum = crc32c::Value(block->SerializeAsString().data(), row_length); - memcpy(row_binary + offset, &checksum, CHECKSUM_SIZE); + std::string content = block->SerializeAsString(); + RETURN_IF_ERROR(_file_writer->append(content)); + offset += block_length; + uint8_t checksum_buf[sizeof(uint32_t)]; + uint32_t checksum = crc32c::Value(content.data(), block_length); + encode_fixed32_le(checksum_buf, checksum); + RETURN_IF_ERROR(_file_writer->append({checksum_buf, sizeof(uint32_t)})); offset += CHECKSUM_SIZE; } - DCHECK(offset == total_size); + if (offset != total_size) { + return Status::InternalError( + "failed to write block to wal expected= " + std::to_string(total_size) + + ",actually=" + std::to_string(offset)); + } _disk_bytes.store(_disk_bytes.fetch_add(total_size, std::memory_order_relaxed), std::memory_order_relaxed); _all_wal_disk_bytes->store( _all_wal_disk_bytes->fetch_add(total_size, std::memory_order_relaxed), std::memory_order_relaxed); - // write rows - RETURN_IF_ERROR(_file_writer->append({row_binary, offset})); - _count++; - if (_count % _batch == 0) { - //todo sync data - //LOG(INFO) << "count=" << count << ",do sync"; + return Status::OK(); +} + +Status WalWriter::append_header(uint32_t version, std::string col_ids) { + size_t total_size = 0; + uint64_t length = col_ids.size(); + total_size += k_wal_magic_length; + 
total_size += VERSION_SIZE; + total_size += LENGTH_SIZE; + total_size += length; + size_t offset = 0; + RETURN_IF_ERROR(_file_writer->append({k_wal_magic, k_wal_magic_length})); + offset += k_wal_magic_length; + + uint8_t version_buf[sizeof(uint32_t)]; + encode_fixed32_le(version_buf, version); + RETURN_IF_ERROR(_file_writer->append({version_buf, sizeof(uint32_t)})); + offset += VERSION_SIZE; + uint8_t len_buf[sizeof(uint64_t)]; + encode_fixed64_le(len_buf, length); + RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)})); + offset += LENGTH_SIZE; + RETURN_IF_ERROR(_file_writer->append(col_ids)); + offset += length; + if (offset != total_size) { + return Status::InternalError( + "failed to write header to wal expected= " + std::to_string(total_size) + + ",actually=" + std::to_string(offset)); } return Status::OK(); } diff --git a/be/src/olap/wal_writer.h b/be/src/olap/wal_writer.h index f11dbc20eb05cd..4c070c0aeafe41 100644 --- a/be/src/olap/wal_writer.h +++ b/be/src/olap/wal_writer.h @@ -28,6 +28,8 @@ namespace doris { using PBlockArray = std::vector; +extern const char* k_wal_magic; +extern const uint32_t k_wal_magic_length; class WalWriter { public: @@ -40,18 +42,20 @@ class WalWriter { Status append_blocks(const PBlockArray& blocks); size_t disk_bytes() const { return _disk_bytes.load(std::memory_order_relaxed); }; + Status append_header(uint32_t version, std::string col_ids); std::string file_name() { return _file_name; }; + +public: static const int64_t LENGTH_SIZE = 8; static const int64_t CHECKSUM_SIZE = 4; doris::ConditionVariable cv; + static const int64_t VERSION_SIZE = 4; private: static constexpr size_t MAX_WAL_WRITE_WAIT_TIME = 1000; std::string _file_name; io::FileWriterPtr _file_writer; - int64_t _count; - int64_t _batch; std::atomic_size_t _disk_bytes; std::shared_ptr _all_wal_disk_bytes; doris::Mutex _mutex; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 77995da8d43072..00226d9dd785a5 
100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -257,8 +257,14 @@ Status GroupCommitTable::_create_group_commit_load( _need_plan_fragment = false; _cv.notify_all(); } - st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, - pipeline_params); + if (_exec_env->wal_mgr()->is_running()) { + _exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id, + WalManager::WAL_STATUS::PREPARE); + st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline, params, + pipeline_params); + } else { + st = Status::InternalError("be is stopping"); + } if (!st.ok()) { static_cast(_finish_group_commit_load(_db_id, _table_id, label, txn_id, instance_id, st, true, nullptr)); @@ -336,6 +342,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal( std::to_string(db_id), std::to_string(table_id), std::vector {wal_path})); + _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, + WalManager::WAL_STATUS::REPLAY); } return st; } @@ -347,8 +355,11 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_ RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(std::to_string(db_id), std::to_string(table_id), std::vector {wal_path})); + _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id, + WalManager::WAL_STATUS::REPLAY); } else { RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(txn_id)); + RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id, txn_id)); } std::stringstream ss; ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id << ", label=" << label diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 0c12e910c3391f..ce249536e763b9 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -79,6 +79,7 @@ #include "olap/tablet_schema.h" #include "olap/txn_manager.h" #include 
"olap/utils.h" +#include "olap/wal_manager.h" #include "runtime/buffer_control_block.h" #include "runtime/cache/result_cache.h" #include "runtime/define_primitive_type.h" @@ -1849,4 +1850,19 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController* } }; +void PInternalServiceImpl::get_wal_queue_size(google::protobuf::RpcController* controller, + const PGetWalQueueSizeRequest* request, + PGetWalQueueSizeResponse* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + Status st = Status::OK(); + st = _exec_env->wal_mgr()->get_wal_status_queue_size(request, response); + response->mutable_status()->set_status_code(st.code()); + }); + if (!ret) { + offer_failed(response, done, _light_work_pool); + } +} + } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index ca28c8b8b06c52..c324986f9bd15b 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -205,6 +205,11 @@ class PInternalServiceImpl : public PBackendService { PGroupCommitInsertResponse* response, google::protobuf::Closure* done) override; + void get_wal_queue_size(google::protobuf::RpcController* controller, + const PGetWalQueueSizeRequest* request, + PGetWalQueueSizeResponse* response, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp b/be/src/vec/exec/format/wal/wal_reader.cpp index 9c1f164b9e1f64..035ce2cd82cb88 100644 --- a/be/src/vec/exec/format/wal/wal_reader.cpp +++ b/be/src/vec/exec/format/wal/wal_reader.cpp @@ -20,6 +20,7 @@ #include "common/logging.h" #include "olap/wal_manager.h" #include "runtime/runtime_state.h" +#include "vec/data_types/data_type_string.h" namespace doris::vectorized { 
WalReader::WalReader(RuntimeState* state) : _state(state) { _wal_id = state->wal_id(); @@ -35,6 +36,7 @@ Status WalReader::init_reader() { return Status::OK(); } Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + //read src block PBlock pblock; auto st = _wal_reader->read_block(pblock); if (st.is()) { @@ -47,9 +49,23 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { LOG(WARNING) << "Failed to read wal on path = " << _wal_path; return st; } - vectorized::Block tmp_block; - static_cast(tmp_block.deserialize(pblock)); - block->swap(tmp_block); + vectorized::Block src_block; + RETURN_IF_ERROR(src_block.deserialize(pblock)); + //convert to dst block + vectorized::Block dst_block; + int index = 0; + auto columns = block->get_columns_with_type_and_name(); + for (auto column : columns) { + auto pos = _column_index[index]; + vectorized::ColumnPtr column_ptr = src_block.get_by_position(pos).column; + if (column.column->is_nullable()) { + column_ptr = make_nullable(column_ptr); + } + dst_block.insert(index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), + column.type, column.name)); + index++; + } + block->swap(dst_block); *read_rows = block->rows(); VLOG_DEBUG << "read block rows:" << *read_rows; return Status::OK(); @@ -71,6 +87,10 @@ void WalReader::string_split(const std::string& str, const std::string& splits, Status WalReader::get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) { + RETURN_IF_ERROR(_wal_reader->read_header(_version, _col_ids)); + std::vector col_element; + string_split(_col_ids, ",", col_element); + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_column_index(_wal_id, _column_index)); return Status::OK(); } diff --git a/be/src/vec/exec/format/wal/wal_reader.h b/be/src/vec/exec/format/wal/wal_reader.h index c204cd9b5e5a4e..bee48fe146d09d 100644 --- a/be/src/vec/exec/format/wal/wal_reader.h +++ b/be/src/vec/exec/format/wal/wal_reader.h @@ -31,6 +31,7 
@@ class WalReader : public GenericReader { std::unordered_set* missing_cols) override; static void string_split(const std::string& str, const std::string& splits, std::vector& res); + std::vector get_index() { return _column_index; } private: RuntimeState* _state; @@ -38,6 +39,9 @@ class WalReader : public GenericReader { std::string _path_split = "/"; int64_t _wal_id; std::shared_ptr _wal_reader = nullptr; + uint32_t _version = 0; + std::string _col_ids; + std::vector _column_index; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 697859f4644d49..7f6c344091b01f 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -329,10 +329,6 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo RETURN_IF_ERROR( _cur_reader->get_next_block(_src_block_ptr, &read_rows, &_cur_reader_eof)); } - if (_params->format_type == TFileFormatType::FORMAT_WAL) { - block->swap(*_src_block_ptr); - break; - } // use read_rows instead of _src_block_ptr->rows(), because the first column of _src_block_ptr // may not be filled after calling `get_next_block()`, so _src_block_ptr->rows() may return wrong result. 
if (read_rows > 0) { diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index d82168eff35deb..14e5402e0323a5 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -931,11 +931,6 @@ void VNodeChannel::mark_close() { VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) : AsyncResultWriter(output_exprs), _t_sink(t_sink) { _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc; - DCHECK(t_sink.__isset.olap_table_sink); - auto& table_sink = t_sink.olap_table_sink; - _db_id = table_sink.db_id; - _tb_id = table_sink.table_id; - _wal_id = table_sink.txn_id; } Status VTabletWriter::init_properties(doris::ObjectPool* pool, bool group_commit) { @@ -1222,9 +1217,9 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { } if (_group_commit) { - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id, - _state->import_label())); - RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); + _v_wal_writer = std::make_shared(table_sink.db_id, table_sink.table_id, + table_sink.txn_id, _state, _output_tuple_desc); + RETURN_IF_ERROR(_v_wal_writer->init()); } RETURN_IF_ERROR(_init_row_distribution()); @@ -1523,8 +1518,8 @@ Status VTabletWriter::close(Status exec_status) { [](const std::shared_ptr& ch) { ch->clear_all_blocks(); }); } - if (_wal_writer != nullptr) { - static_cast(_wal_writer->finalize()); + if (_v_wal_writer != nullptr) { + RETURN_IF_ERROR(_v_wal_writer->close()); } return _close_status; } @@ -1624,9 +1619,10 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { } } - if (_group_commit) { - _group_commit_block(&input_block, block->rows(), filtered_rows, _state, block.get(), - _block_convertor.get(), _tablet_finder.get()); + if (_v_wal_writer != nullptr) { + RETURN_IF_ERROR(_v_wal_writer->append_block(&input_block, 
block->rows(), filtered_rows, + block.get(), _block_convertor.get(), + _tablet_finder.get())); } // TODO: Before load, we need to projection unuseful column // auto slots = _schema->tuple_desc()->slots(); @@ -1657,49 +1653,5 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) { return Status::OK(); } -Status VTabletWriter::write_wal(OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder, vectorized::Block* block, - RuntimeState* state, int64_t num_rows, int64_t filtered_rows) { - PBlock pblock; - size_t uncompressed_bytes = 0, compressed_bytes = 0; - if (filtered_rows == 0) { - RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, - &compressed_bytes, segment_v2::CompressionTypePB::SNAPPY)); - RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); - } else { - auto cloneBlock = block->clone_without_columns(); - auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); - for (int i = 0; i < num_rows; ++i) { - if (block_convertor->num_filtered_rows() > 0 && block_convertor->filter_map()[i]) { - continue; - } - if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) { - continue; - } - res_block.add_row(block, i); - } - RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock, - &uncompressed_bytes, &compressed_bytes, - segment_v2::CompressionTypePB::SNAPPY)); - RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); - } - return Status::OK(); -} - -void VTabletWriter::_group_commit_block(vectorized::Block* input_block, int64_t num_rows, - int64_t filter_rows, RuntimeState* state, - vectorized::Block* block, - OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder) { - static_cast( - write_wal(block_convertor, tablet_finder, block, state, num_rows, filter_rows)); -#ifndef BE_TEST - auto* future_block = assert_cast(input_block); - std::unique_lock l(*(future_block->lock)); 
- future_block->set_result(Status::OK(), num_rows, num_rows - filter_rows); - future_block->cv->notify_all(); -#endif -} - } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index dc68d4d61a4f6d..e9f6ae13f437dd 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -35,6 +35,7 @@ #include #include "olap/wal_writer.h" +#include "vwal_writer.h" // IWYU pragma: no_include #include // IWYU pragma: keep #include @@ -573,15 +574,6 @@ class VTabletWriter final : public AsyncResultWriter { Status _incremental_open_node_channel(const std::vector& partitions); - Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder, - vectorized::Block* block, RuntimeState* state, int64_t num_rows, - int64_t filtered_rows); - - void _group_commit_block(vectorized::Block* input_block, int64_t num_rows, int64_t filter_rows, - RuntimeState* state, vectorized::Block* block, - OlapTableBlockConvertor* block_convertor, - OlapTabletFinder* tablet_finder); - TDataSink _t_sink; std::shared_ptr _mem_tracker; @@ -681,14 +673,10 @@ class VTabletWriter final : public AsyncResultWriter { RuntimeState* _state = nullptr; // not owned, set when open RuntimeProfile* _profile = nullptr; // not owned, set when open bool _group_commit = false; - std::shared_ptr _wal_writer = nullptr; VRowDistribution _row_distribution; // reuse to avoid frequent memory allocation and release. 
std::vector _row_part_tablet_ids; - - int64_t _tb_id; - int64_t _db_id; - int64_t _wal_id; + std::shared_ptr _v_wal_writer = nullptr; }; } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vwal_writer.cpp b/be/src/vec/sink/writer/vwal_writer.cpp new file mode 100644 index 00000000000000..81ee92eb00bd15 --- /dev/null +++ b/be/src/vec/sink/writer/vwal_writer.cpp @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vwal_writer.h" + +#include + +#include +#include +#include +#include +#include + +#include "common/compiler_util.h" +#include "common/status.h" +#include "olap/wal_manager.h" +#include "runtime/client_cache.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "util/doris_metrics.h" +#include "util/network_util.h" +#include "util/proto_util.h" +#include "util/thrift_util.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/future_block.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" + +namespace doris { +namespace vectorized { + +VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state, + TupleDescriptor* output_tuple_desc) + : _db_id(db_id), + _tb_id(tb_id), + _wal_id(wal_id), + _state(state), + _output_tuple_desc(output_tuple_desc) {} + +VWalWriter::~VWalWriter() {} + +Status VWalWriter::init() { + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->add_wal_path(_db_id, _tb_id, _wal_id, + _state->import_label())); + RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->create_wal_writer(_wal_id, _wal_writer)); + _state->exec_env()->wal_mgr()->add_wal_status_queue(_tb_id, _wal_id, + WalManager::WAL_STATUS::CREATE); + std::stringstream ss; + for (auto slot_desc : _output_tuple_desc->slots()) { + ss << std::to_string(slot_desc->col_unique_id()) << ","; + } + std::string col_ids = ss.str().substr(0, ss.str().size() - 1); + RETURN_IF_ERROR(_wal_writer->append_header(_version, col_ids)); + return Status::OK(); +} +Status VWalWriter::write_wal(OlapTableBlockConvertor* block_convertor, + OlapTabletFinder* tablet_finder, vectorized::Block* block, + RuntimeState* state, int64_t num_rows, int64_t filtered_rows) { + PBlock pblock; + size_t uncompressed_bytes = 0, compressed_bytes = 0; + if (filtered_rows == 0) { + RETURN_IF_ERROR(block->serialize(state->be_exec_version(), &pblock, &uncompressed_bytes, + &compressed_bytes, 
segment_v2::CompressionTypePB::SNAPPY)); + RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); + } else { + auto cloneBlock = block->clone_without_columns(); + auto res_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + for (int i = 0; i < num_rows; ++i) { + if (block_convertor->num_filtered_rows() > 0 && block_convertor->filter_map()[i]) { + continue; + } + if (tablet_finder->num_filtered_rows() > 0 && tablet_finder->filter_bitmap().Get(i)) { + continue; + } + res_block.add_row(block, i); + } + RETURN_IF_ERROR(res_block.to_block().serialize(state->be_exec_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, + segment_v2::CompressionTypePB::SNAPPY)); + RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector {&pblock})); + } + return Status::OK(); +} +Status VWalWriter::append_block(vectorized::Block* input_block, int64_t num_rows, + int64_t filter_rows, vectorized::Block* block, + OlapTableBlockConvertor* block_convertor, + OlapTabletFinder* tablet_finder) { + RETURN_IF_ERROR( + write_wal(block_convertor, tablet_finder, block, _state, num_rows, filter_rows)); +#ifndef BE_TEST + auto* future_block = assert_cast(input_block); + std::unique_lock l(*(future_block->lock)); + future_block->set_result(Status::OK(), num_rows, num_rows - filter_rows); + future_block->cv->notify_all(); +#endif + return Status::OK(); +} +Status VWalWriter::close() { + if (_wal_writer != nullptr) { + RETURN_IF_ERROR(_wal_writer->finalize()); + } + return Status::OK(); +} +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/sink/writer/vwal_writer.h b/be/src/vec/sink/writer/vwal_writer.h new file mode 100644 index 00000000000000..017f447c32979a --- /dev/null +++ b/be/src/vec/sink/writer/vwal_writer.h @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" +#include "common/status.h" +#include "exec/data_sink.h" +#include "exec/tablet_info.h" +#include "gutil/ref_counted.h" +#include "olap/wal_writer.h" +#include "runtime/decimalv2_value.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "runtime/thread_context.h" +#include "runtime/types.h" +#include "util/countdown_latch.h" +#include "util/ref_count_closure.h" +#include "util/runtime_profile.h" +#include "util/spinlock.h" +#include "util/stopwatch.hpp" +#include "vec/columns/column.h" +#include "vec/common/allocator.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/runtime/vfile_format_transformer.h" +#include "vec/sink/vrow_distribution.h" +#include "vec/sink/vtablet_block_convertor.h" +#include "vec/sink/vtablet_finder.h" +#include "vec/sink/writer/async_result_writer.h" +namespace doris { +namespace 
vectorized { + +class VWalWriter { +public: + VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, RuntimeState* state, + TupleDescriptor* output_tuple_desc); + ~VWalWriter(); + Status init(); + Status write_wal(OlapTableBlockConvertor* block_convertor, OlapTabletFinder* tablet_finder, + vectorized::Block* block, RuntimeState* state, int64_t num_rows, + int64_t filtered_rows); + Status append_block(vectorized::Block* input_block, int64_t num_rows, int64_t filter_rows, + vectorized::Block* block, OlapTableBlockConvertor* block_convertor, + OlapTabletFinder* tablet_finder); + Status close(); + +private: + int64_t _db_id; + int64_t _tb_id; + int64_t _wal_id; + uint32_t _version = 0; + std::string _label; + RuntimeState* _state = nullptr; + TupleDescriptor* _output_tuple_desc = nullptr; + std::shared_ptr _wal_writer = nullptr; +}; +} // namespace vectorized +} // namespace doris \ No newline at end of file diff --git a/be/test/exec/test_data/wal_scanner/wal b/be/test/exec/test_data/wal_scanner/wal index 976c60e8489e470836a46ed1e700b58b0dab4745..2c5fe90963de85c6b38499113bc13f01060b24ef 100644 GIT binary patch literal 180 zcmWG{^f6=r0#*0bX_n zPDW0UkR&53h%#bi<(1)O=U@TKs&j|{Nft&%j>M#7Mizq<216rb6H_yD3rhnA2}Xrn M4Q2xdC%67+0DFWB^#A|> diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 4daa9505ce5fec..c310c8a41f294a 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -960,6 +960,10 @@ TEST_F(VOlapTableSinkTest, group_commit) { auto wal_reader = WalReader(wal_path); st = wal_reader.init(); ASSERT_TRUE(st.ok()); + uint32_t version; + std::string col_ids; + st = wal_reader.read_header(version, col_ids); + ASSERT_TRUE(st.ok()); st = wal_reader.read_block(pblock); ASSERT_TRUE(st.ok()); vectorized::Block wal_block; @@ -1087,6 +1091,10 @@ TEST_F(VOlapTableSinkTest, group_commit_with_filter_row) { auto wal_reader = WalReader(wal_path); st = wal_reader.init(); ASSERT_TRUE(st.ok()); + 
uint32_t version; + std::string col_ids; + st = wal_reader.read_header(version, col_ids); + ASSERT_TRUE(st.ok()); st = wal_reader.read_block(pblock); ASSERT_TRUE(st.ok()); vectorized::Block wal_block; diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 30f494cf5b8d48..a239741a5cb341 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -106,7 +106,8 @@ void VWalScannerTest::init_desc_table() { TTypeNode node; node.__set_type(TTypeNodeType::SCALAR); TScalarType scalar_type; - scalar_type.__set_type(TPrimitiveType::INT); + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(32); node.__set_scalar_type(scalar_type); type.types.push_back(node); } @@ -132,7 +133,8 @@ void VWalScannerTest::init_desc_table() { TTypeNode node; node.__set_type(TTypeNodeType::SCALAR); TScalarType scalar_type; - scalar_type.__set_type(TPrimitiveType::BIGINT); + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(32); node.__set_scalar_type(scalar_type); type.types.push_back(node); } @@ -147,7 +149,7 @@ void VWalScannerTest::init_desc_table() { t_desc_table.slotDescriptors.push_back(slot_desc); } - // k3 + // c3 { TSlotDescriptor slot_desc; @@ -159,7 +161,7 @@ void VWalScannerTest::init_desc_table() { node.__set_type(TTypeNodeType::SCALAR); TScalarType scalar_type; scalar_type.__set_type(TPrimitiveType::VARCHAR); - scalar_type.__set_len(10); + scalar_type.__set_len(32); node.__set_scalar_type(scalar_type); type.types.push_back(node); } @@ -187,7 +189,7 @@ void VWalScannerTest::init_desc_table() { t_desc_table.tupleDescriptors.push_back(t_tuple_desc); } - static_cast(DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl)); + auto st = DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl); _runtime_state.set_desc_tbl(_desc_tbl); } @@ -213,10 +215,15 @@ void VWalScannerTest::init() { _env = ExecEnv::GetInstance(); _env->_wal_manager = 
WalManager::create_shared(_env, wal_dir); - static_cast(_env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label)); + auto st = _env->_wal_manager->add_wal_path(db_id, tb_id, txn_id, label); } TEST_F(VWalScannerTest, normal) { + std::vector index_vector; + index_vector.emplace_back(0); + index_vector.emplace_back(1); + index_vector.emplace_back(2); + _env->_wal_manager->add_wal_column_index(txn_id, index_vector); // config::group_commit_replay_wal_dir = wal_dir; NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl); scan_node._output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id); @@ -239,6 +246,7 @@ TEST_F(VWalScannerTest, normal) { _kv_cache.reset(new ShardedKVCache(48)); _runtime_state._wal_id = txn_id; VFileScanner scanner(&_runtime_state, &scan_node, -1, scan_range, _profile, _kv_cache.get()); + scanner._is_load = false; vectorized::VExprContextSPtrs _conjuncts; std::unordered_map _colname_to_value_range; std::unordered_map _colname_to_slot_id; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index b62faee7aaad9a..424897a5c86d83 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2264,6 +2264,10 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int publish_topic_info_interval_ms = 30000; // 30s + @ConfField(description = {"查询be wal_queue 的超时阈值(ms)", + "the timeout threshold of checking wal_queue on be(ms)"}) + public static int check_wal_queue_timeout_threshold = 180000; // 3 min + @ConfField(mutable = true, masterOnly = true, description = { "对于自动分区表,防止用户意外创建大量分区,每个OLAP表允许的分区数量为`max_auto_partition_num`。默认2000。", "For auto-partitioned tables to prevent users from accidentally creating a large number of partitions, " diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index f5b13a16c1df51..5ac163bd666f47 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -40,6 +40,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.MetaNotFoundException; @@ -47,6 +48,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DbUtil; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.GroupCommitManager.SchemaChangeStatus; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; @@ -322,7 +324,7 @@ protected void runPendingJob() throws AlterCancelException { } else { // only show at most 3 results List subList = countDownLatch.getLeftMarks().stream().limit(3) - .map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")") + .map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")") .collect(Collectors.toList()); errMsg = "Error replicas:" + Joiner.on(", ").join(subList); } @@ -529,12 +531,15 @@ protected void runRunningJob() throws AlterCancelException { int failedTaskCount = failedAgentTasks.get(task.getTabletId()).size(); if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) { throw new AlterCancelException("schema change tasks failed on same tablet reach threshold " - + failedAgentTasks.get(task.getTabletId())); + + failedAgentTasks.get(task.getTabletId())); } } } return; } + long maxWalId = Env.getCurrentGlobalTransactionMgr() + 
.getTransactionIDGenerator().getNextTransactionId(); + waitWalFinished(maxWalId); /* * all tasks are finished. check the integrity. * we just check whether all new replicas are healthy. @@ -580,7 +585,6 @@ protected void runRunningJob() throws AlterCancelException { } // end for tablets } } // end for partitions - // all partitions are good onFinished(tbl); } finally { @@ -598,6 +602,35 @@ protected void runRunningJob() throws AlterCancelException { LOG.info("set table's state to NORMAL, table id: {}, job id: {}", tableId, jobId); } + private void waitWalFinished(long maxWalId) { + // wait wal done here + Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.BLOCK); + LOG.info("block table {}", tableId); + List aliveBeIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; + boolean walFinished = false; + while (System.currentTimeMillis() < expireTime) { + LOG.info("wai for wal queue size to be empty"); + walFinished = Env.getCurrentEnv().getGroupCommitManager() + .isPreviousWalFinished(tableId, maxWalId, aliveBeIds); + if (walFinished) { + LOG.info("all wal is finished"); + break; + } else { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("schema change job sleep wait for wal InterruptedException: ", ie); + } + } + } + if (!walFinished) { + LOG.warn("waitWalFinished time out"); + } + Env.getCurrentEnv().getGroupCommitManager().setStatus(tableId, SchemaChangeStatus.NORMAL); + LOG.info("release table {}", tableId); + } + private void onFinished(OlapTable tbl) { // replace the origin index with shadow index, set index state as NORMAL for (Partition partition : tbl.getPartitions()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index a146a72869a977..51eb0f83ad6ce0 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -83,7 +83,7 @@ private void runAlterJobV2() { } List backendTabletIds = invertedIndex.getTabletIdsByBackendId(beId); - if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds)) { + if (Config.drop_backend_after_decommission && checkTablets(beId, backendTabletIds) && checkWal(backend)) { try { systemInfoService.dropBackend(beId); LOG.info("no available tablet on decommission backend {}, drop it", beId); @@ -196,6 +196,11 @@ private boolean checkTablets(Long beId, List backendTabletIds) { return false; } + private boolean checkWal(Backend backend) { + return Env.getCurrentEnv().getGroupCommitManager() + .getAllWalQueueSize(backend) == 0; + } + private List checkDecommission(DecommissionBackendClause decommissionBackendClause) throws DdlException { if (decommissionBackendClause.getHostInfos().isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 24945649912a04..b835476dcbc4e0 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -155,6 +155,7 @@ import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; +import org.apache.doris.load.GroupCommitManager; import org.apache.doris.load.Load; import org.apache.doris.load.StreamLoadRecordMgr; import org.apache.doris.load.loadv2.LoadEtlChecker; @@ -333,6 +334,7 @@ public class Env { private StreamLoadRecordMgr streamLoadRecordMgr; private TabletLoadIndexRecorderMgr tabletLoadIndexRecorderMgr; private RoutineLoadManager routineLoadManager; + private GroupCommitManager groupCommitManager; private SqlBlockRuleMgr sqlBlockRuleMgr; private ExportMgr exportMgr; private SyncJobManager syncJobManager; @@ -603,6 +605,7 @@ 
private Env(boolean isCheckpointCatalog) { this.catalogMgr = new CatalogMgr(); this.load = new Load(); this.routineLoadManager = new RoutineLoadManager(); + this.groupCommitManager = new GroupCommitManager(); this.sqlBlockRuleMgr = new SqlBlockRuleMgr(); this.exportMgr = new ExportMgr(); this.syncJobManager = new SyncJobManager(); @@ -3778,6 +3781,10 @@ public RoutineLoadManager getRoutineLoadManager() { return routineLoadManager; } + public GroupCommitManager getGroupCommitManager() { + return groupCommitManager; + } + public SqlBlockRuleMgr getSqlBlockRuleMgr() { return sqlBlockRuleMgr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java new file mode 100644 index 00000000000000..f7822580fb760b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/CheckWalSizeAction.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.ArrayList; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * calculate wal size of specific be + * fe_host:fe_http_port/api/get_wal_size?host_ports=host:port,host2:port2...
+ * return: + * { + * "msg": "OK", + * "code": 0, + * "data": ["192.168.10.11:9050:1", "192.168.10.11:9050:0"], + * "count": 0 + * } + */ + +@RestController +public class CheckWalSizeAction extends RestBaseController { + public static final String HOST_PORTS = "host_ports"; + + @RequestMapping(path = "/api/get_wal_size", method = RequestMethod.GET) + public Object execute(HttpServletRequest request, HttpServletResponse response) { + // check user auth + executeCheckPassword(request, response); + checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.OPERATOR); + + String hostPorts = request.getParameter(HOST_PORTS); + if (Strings.isNullOrEmpty(hostPorts)) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + String[] hostPortArr = hostPorts.split(","); + if (hostPortArr.length == 0) { + return ResponseEntityBuilder.badRequest("No host:port specified"); + } + + List hostInfos = Lists.newArrayList(); + for (String hostPort : hostPortArr) { + try { + HostInfo hostInfo = SystemInfoService.getHostAndPort(hostPort); + hostInfos.add(hostInfo); + } catch (AnalysisException e) { + return ResponseEntityBuilder.badRequest(e.getMessage()); + } + } + + try { + List backends = getBackends(hostInfos); + List backendsList = new ArrayList<>(); + for (Backend backend : backends) { + long size = Env.getCurrentEnv().getGroupCommitManager() + .getAllWalQueueSize(backend); + backendsList.add(backend.getHost() + ":" + backend.getHeartbeatPort() + ":" + size); + } + return ResponseEntityBuilder.ok(backendsList); + } catch (DdlException e) { + return ResponseEntityBuilder.okWithCommonError(e.getMessage()); + } + } + + private List getBackends(List hostInfos) throws DdlException { + SystemInfoService infoService = Env.getCurrentSystemInfo(); + List backends = Lists.newArrayList(); + // check if exist + for (HostInfo hostInfo : hostInfos) { + Backend backend = infoService.getBackendWithHeartbeatPort(hostInfo.getHost(), + hostInfo.getPort()); + 
if (backend == null) { + throw new DdlException("Backend does not exist[" + + hostInfo.getHost() + + ":" + hostInfo.getPort() + "]"); + } + backends.add(backend); + } + return backends; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 994b7788f45079..32757f2894f714 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -17,7 +17,9 @@ package org.apache.doris.httpv2.rest; +import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -38,6 +40,7 @@ import io.netty.handler.codec.http.HttpHeaderNames; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; @@ -73,7 +76,7 @@ public Object load(HttpServletRequest request, HttpServletResponse response, return entity; } else { executeCheckPassword(request, response); - return executeWithoutPassword(request, response, db, table, false); + return executeWithoutPassword(request, response, db, table, false, false); } } @@ -81,6 +84,22 @@ public Object load(HttpServletRequest request, HttpServletResponse response, public Object streamLoad(HttpServletRequest request, HttpServletResponse response, @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) { + boolean groupCommit = false; + String groupCommitStr = request.getHeader("group_commit"); + if (groupCommitStr != null && groupCommitStr.equals("true")) { + groupCommit = true; + try { + String[] 
pair = new String[] {db, table}; + LOG.info(pair[0] + ":" + pair[1]); + if (isGroupCommitBlock(pair)) { + String msg = "insert table " + pair[1] + " is blocked on schema change"; + return new RestBaseResult(msg); + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); + } + } if (needRedirect(request.getScheme())) { return redirectToHttps(request); } @@ -99,7 +118,7 @@ public Object streamLoad(HttpServletRequest request, } } - return executeWithoutPassword(request, response, db, table, true); + return executeWithoutPassword(request, response, db, table, true, groupCommit); } @RequestMapping(path = "/api/_http_stream", @@ -108,6 +127,21 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse response) { String sql = request.getHeader("sql"); LOG.info("streaming load sql={}", sql); + boolean groupCommit = false; + String groupCommitStr = request.getHeader("group_commit"); + if (groupCommitStr != null && groupCommitStr.equals("true")) { + groupCommit = true; + try { + String[] pair = parseDbAndTb(sql); + if (isGroupCommitBlock(pair)) { + String msg = "insert table " + pair[1] + " is blocked on schema change"; + return new RestBaseResult(msg); + } + } catch (Exception e) { + LOG.info("exception:" + e); + return new RestBaseResult(e.getMessage()); + } + } executeCheckPassword(request, response); try { // A 'Load' request must have 100-continue header @@ -122,7 +156,7 @@ public Object streamLoadWithSql(HttpServletRequest request, String label = request.getHeader(LABEL_KEY); TNetworkAddress redirectAddr; - redirectAddr = selectRedirectBackend(clusterName); + redirectAddr = selectRedirectBackend(clusterName, groupCommit); LOG.info("redirect load action to destination={}, label: {}", redirectAddr.toString(), label); @@ -134,6 +168,43 @@ public Object streamLoadWithSql(HttpServletRequest request, } } + private boolean isGroupCommitBlock(String[] pair) throws TException { + String fullDbName = 
getFullDbName(pair[0]); + Database dbObj = Env.getCurrentInternalCatalog() + .getDbOrException(fullDbName, s -> new TException("database is invalid for dbName: " + s)); + Table tblObj = dbObj.getTableOrException(pair[1], s -> new TException("table is invalid: " + s)); + return Env.getCurrentEnv().getGroupCommitManager().isBlock(tblObj.getId()); + } + + private String[] parseDbAndTb(String sql) throws Exception { + String[] array = sql.split(" "); + String tmp = null; + int count = 0; + for (String s : array) { + if (!s.equals("")) { + count++; + if (count == 3) { + tmp = s; + break; + } + } + } + if (tmp == null) { + throw new Exception("parse db and tb with wrong sql:" + sql); + } + String pairStr = null; + if (tmp.contains("(")) { + pairStr = tmp.split("\\(")[0]; + } else { + pairStr = tmp; + } + String[] pair = pairStr.split("\\."); + if (pair.length != 2) { + throw new Exception("parse db and tb with wrong sql:" + sql); + } + return pair; + } + @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT) public Object streamLoad2PC(HttpServletRequest request, HttpServletResponse response, @@ -162,7 +233,7 @@ public Object streamLoad2PC_table(HttpServletRequest request, // Same as Multi load, to be compatible with http v1's response body, // we return error by using RestBaseResult. 
private Object executeWithoutPassword(HttpServletRequest request, - HttpServletResponse response, String db, String table, boolean isStreamLoad) { + HttpServletResponse response, String db, String table, boolean isStreamLoad, boolean groupCommit) { try { String dbName = db; String tableName = table; @@ -213,7 +284,7 @@ private Object executeWithoutPassword(HttpServletRequest request, return new RestBaseResult(e.getMessage()); } } else { - redirectAddr = selectRedirectBackend(clusterName); + redirectAddr = selectRedirectBackend(clusterName, groupCommit); } LOG.info("redirect load action to destination={}, stream: {}, db: {}, tbl: {}, label: {}", @@ -249,7 +320,7 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { return new RestBaseResult("No transaction operation(\'commit\' or \'abort\') selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(clusterName); + TNetworkAddress redirectAddr = selectRedirectBackend(clusterName, false); LOG.info("redirect stream load 2PC action to destination={}, db: {}, txn: {}, operation: {}", redirectAddr.toString(), dbName, request.getHeader(TXN_ID_KEY), txnOperation); @@ -261,18 +332,30 @@ private Object executeStreamLoad2PC(HttpServletRequest request, String db) { } } - private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadException { - String qualifiedUser = ConnectContext.get().getQualifiedUser(); - Set userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); - BeSelectionPolicy policy = new BeSelectionPolicy.Builder() - .addTags(userTags) - .needLoadAvailable().build(); - List backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); - if (backendIds.isEmpty()) { - throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + private TNetworkAddress selectRedirectBackend(String clusterName, boolean groupCommit) throws LoadException { + Backend backend = null; + BeSelectionPolicy policy = null; 
+ if (groupCommit) { + List allBackendIds = Env.getCurrentSystemInfo().getAllBackendIds(true); + for (Long backendId : allBackendIds) { + Backend candidateBe = Env.getCurrentSystemInfo().getBackend(backendId); + if (!candidateBe.isDecommissioned()) { + backend = candidateBe; + break; + } + } + } else { + String qualifiedUser = ConnectContext.get().getQualifiedUser(); + Set userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); + policy = new BeSelectionPolicy.Builder() + .addTags(userTags) + .needLoadAvailable().build(); + List backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1); + if (backendIds.isEmpty()) { + throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); + } + backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); } - - Backend backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0)); if (backend == null) { throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy); } @@ -340,7 +423,7 @@ private Object executeWithClusterToken(HttpServletRequest request, String db, return new RestBaseResult("No label selected."); } - TNetworkAddress redirectAddr = selectRedirectBackend(clusterName); + TNetworkAddress redirectAddr = selectRedirectBackend(clusterName, false); LOG.info("Redirect load action with auth token to destination={}," + "stream: {}, db: {}, tbl: {}, label: {}", diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java new file mode 100644 index 00000000000000..ec4e01a0b4da13 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; + +public class GroupCommitManager { + + public enum SchemaChangeStatus { + BLOCK, NORMAL + } + + private static final Logger LOG = LogManager.getLogger(GroupCommitManager.class); + + private final Map statusMap = new ConcurrentHashMap<>(); + + public boolean isBlock(long tableId) { + if (statusMap.containsKey(tableId)) { + return statusMap.get(tableId) == SchemaChangeStatus.BLOCK; + } + return false; + } + + public void setStatus(long tableId, SchemaChangeStatus status) { + LOG.debug("Setting status for tableId {}: {}", tableId, status); + statusMap.put(tableId, status); + } + + /** + * Check the wal before the endTransactionId is finished or not. 
+ */ + public boolean isPreviousWalFinished(long tableId, long endTransactionId, List aliveBeIds) { + boolean empty = true; + for (int i = 0; i < aliveBeIds.size(); i++) { + Backend backend = Env.getCurrentSystemInfo().getBackend(aliveBeIds.get(i)); + // in ut port is -1, skip checking + if (backend.getBrpcPort() < 0) { + return true; + } + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(tableId) + .setTxnId(endTransactionId) + .build(); + long size = getWallQueueSize(backend, request); + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",wal size:" + size); + empty = false; + } + } + return empty; + } + + public long getAllWalQueueSize(Backend backend) { + PGetWalQueueSizeRequest request = PGetWalQueueSizeRequest.newBuilder() + .setTableId(-1) + .setTxnId(-1) + .build(); + long size = getWallQueueSize(backend, request); + if (size > 0) { + LOG.info("backend id:" + backend.getId() + ",all wal size:" + size); + } + return size; + } + + public long getWallQueueSize(Backend backend, PGetWalQueueSizeRequest request) { + PGetWalQueueSizeResponse response = null; + long expireTime = System.currentTimeMillis() + Config.check_wal_queue_timeout_threshold; + long size = 0; + while (System.currentTimeMillis() <= expireTime) { + if (!backend.isAlive()) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + try { + Future future = BackendServiceProxy.getInstance() + .getWalQueueSize(new TNetworkAddress(backend.getHost(), backend.getBrpcPort()), request); + response = future.get(); + } catch (Exception e) { + LOG.warn("encounter exception while getting wal queue size on backend id: " + backend.getId() + + ",exception:" + e); + String msg = e.getMessage(); + if (msg.contains("Method") && msg.contains("unimplemented")) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit 
manager sleep wait InterruptedException: ", ie); + } + continue; + } + TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + String msg = "get all queue size fail,backend id: " + backend.getId() + ", status: " + + response.getStatus(); + LOG.warn(msg); + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.info("group commit manager sleep wait InterruptedException: ", ie); + } + continue; + } + size = response.getSize(); + break; + } + return size; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index f77a3b9fdca31c..ff2964feb88a4f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -149,6 +149,7 @@ import org.apache.doris.service.FrontendOptions; import org.apache.doris.statistics.ResultRow; import org.apache.doris.statistics.util.InternalQueryBuffer; +import org.apache.doris.system.Backend; import org.apache.doris.task.LoadEtlTask; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; @@ -1854,7 +1855,33 @@ private void handleInsertStmt() throws Exception { txnId = context.getTxnEntry().getTxnConf().getTxnId(); } else if (insertStmt instanceof NativeInsertStmt && ((NativeInsertStmt) insertStmt).isGroupCommit()) { isGroupCommit = true; + if (Env.getCurrentEnv().getGroupCommitManager().isBlock(insertStmt.getTargetTable().getId())) { + String msg = "insert table " + insertStmt.getTargetTable().getId() + " is blocked on schema change"; + LOG.info(msg); + throw new DdlException(msg); + } NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) insertStmt; + Backend backend = context.getInsertGroupCommit(insertStmt.getTargetTable().getId()); + if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { + List allBackendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true); + if (allBackendIds.isEmpty()) { + throw new DdlException("No alive backend"); + } + Collections.shuffle(allBackendIds); + boolean find = false; + for (Long beId : allBackendIds) { + backend = Env.getCurrentSystemInfo().getBackend(beId); + if (!backend.isDecommissioned()) { + context.setInsertGroupCommit(insertStmt.getTargetTable().getId(), backend); + find = true; + LOG.debug("choose new be {}", backend.getId()); + break; + } + } + if (!find) { + throw new DdlException("No suitable backend"); + } + } int maxRetry = 3; for (int i = 0; i < maxRetry; i++) { GroupCommitPlanner groupCommitPlanner = nativeInsertStmt.planForGroupCommit(context.queryId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 17ba83eb63d714..9535d075e24645 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -161,6 +161,12 @@ public Future groupCommitInsert( return stub.groupCommitInsert(request); } + public Future getWalQueueSize( + InternalService.PGetWalQueueSizeRequest request) { + return stub.getWalQueueSize(request); + } + + public void shutdown() { if (!channel.isShutdown()) { channel.shutdown(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 229391a3dc298f..80767acecfdf8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -23,6 +23,8 @@ import org.apache.doris.metric.MetricRepo; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; +import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest; +import 
org.apache.doris.proto.InternalService.PGetWalQueueSizeResponse; import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest; import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse; import org.apache.doris.proto.Types; @@ -451,4 +453,18 @@ public Future groupCommitInsert(TNetworkAddress addr throw new RpcException(address.hostname, e.getMessage()); } } + + public Future getWalQueueSize(TNetworkAddress address, + PGetWalQueueSizeRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.getWalQueueSize(request); + } catch (Throwable e) { + LOG.warn("failed to get wal queue size from address={}:{}", address.getHostname(), + address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 65fc162279b5d9..b589b0fee06f09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -143,6 +143,8 @@ import org.apache.doris.thrift.TGetBinlogLagResult; import org.apache.doris.thrift.TGetBinlogRequest; import org.apache.doris.thrift.TGetBinlogResult; +import org.apache.doris.thrift.TGetColumnInfoRequest; +import org.apache.doris.thrift.TGetColumnInfoResult; import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; import org.apache.doris.thrift.TGetMasterTokenRequest; @@ -3479,6 +3481,45 @@ private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp) } } + + @Override + public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) { + TGetColumnInfoResult result = new TGetColumnInfoResult(); + TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR); + long dbId = request.getDbId(); + long tableId = 
request.getTableId(); + if (!Env.getCurrentEnv().isMaster()) { + errorStatus.setStatusCode(TStatusCode.NOT_MASTER); + errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG); + LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG); + return result; + } + + Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId); + if (db == null) { + errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId))); + result.setStatus(errorStatus); + return result; + } + + Table table = db.getTable(tableId).get(); + if (table == null) { + errorStatus.setErrorMsgs( + (Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId)))); + result.setStatus(errorStatus); + return result; + } + StringBuilder sb = new StringBuilder(); + for (Column column : table.getFullSchema()) { + sb.append(column.getName() + ":" + column.getUniqueId() + ","); + } + String columnInfo = sb.toString(); + columnInfo = columnInfo.substring(0, columnInfo.length() - 1); + result.setStatus(new TStatus(TStatusCode.OK)); + result.setColumnInfo(columnInfo); + return result; + } + public TGetBackendMetaResult getBackendMeta(TGetBackendMetaRequest request) throws TException { String clientAddr = getClientAddrAsString(); LOG.debug("receive get backend meta request: {}", request); diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 6d2722ad4ec9c9..2bc85f183dec58 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -114,10 +114,33 @@ public void testDecommissionBackend() throws Exception { @Test public void testDecommissionBackendById() throws Exception { - + // 1. 
create connect context + connectContext = createDefaultCtx(); ImmutableMap idToBackendRef = Env.getCurrentSystemInfo().getIdToBackend(); - Backend srcBackend = idToBackendRef.values().asList().get(0); + Assertions.assertEquals(backendNum(), idToBackendRef.size()); + + // 2. create database db2 + createDatabase("db2"); + System.out.println(Env.getCurrentInternalCatalog().getDbNames()); + + // 3. create table tbl1 + createTable("create table db2.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"); + + // 4. query tablet num + int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size(); + Assertions.assertTrue(tabletNum > 0); + + // 5. execute decommission + Backend srcBackend = null; + for (Backend backend : idToBackendRef.values()) { + if (!Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).isEmpty()) { + srcBackend = backend; + break; + } + } + Assertions.assertNotNull(srcBackend); + // decommission backend by id String decommissionByIdStmtStr = "alter system decommission backend \"" + srcBackend.getId() + "\""; AlterSystemStmt decommissionByIdStmt = (AlterSystemStmt) parseAndAnalyzeStmt(decommissionByIdStmtStr); @@ -148,17 +171,17 @@ public void testDecommissionBackendWithDropTable() throws Exception { ImmutableMap idToBackendRef = infoService.getIdToBackend(); Assertions.assertEquals(backendNum(), idToBackendRef.size()); - // 2. create database db2 - createDatabase("db2"); + // 2. create database db3 + createDatabase("db3"); System.out.println(Env.getCurrentInternalCatalog().getDbNames()); long availableBeNum = infoService.getAllBackendIds(true).stream() .filter(beId -> infoService.checkBackendScheduleAvailable(beId)).count(); // 3.
create table tbl1 tbl2 - createTable("create table db2.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '" + createTable("create table db3.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '" + availableBeNum + "');"); - createTable("create table db2.tbl2(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"); + createTable("create table db3.tbl2(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');"); // 4. query tablet num int tabletNum = Env.getCurrentInvertedIndex().getTabletMetaMap().size(); @@ -177,7 +200,7 @@ public void testDecommissionBackendWithDropTable() throws Exception { Assertions.assertNotNull(srcBackend); // 5. drop table tbl1 - dropTable("db2.tbl1", false); + dropTable("db3.tbl1", false); // 6. execute decommission String decommissionStmtStr = "alter system decommission backend \"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\""; @@ -195,7 +218,7 @@ public void testDecommissionBackendWithDropTable() throws Exception { Assertions.assertEquals(backendNum() - 1, Env.getCurrentSystemInfo().getIdToBackend().size()); // tbl1 has been dropped successfully - final String sql = "show create table db2.tbl1;"; + final String sql = "show create table db3.tbl1;"; Assertions.assertThrows(AnalysisException.class, () -> showCreateTable(sql)); // TabletInvertedIndex still holds these tablets of srcBackend, but they are all in recycled status @@ -204,7 +227,7 @@ public void testDecommissionBackendWithDropTable() throws Exception { Assertions.assertTrue(Env.getCurrentRecycleBin().allTabletsInRecycledStatus(tabletList)); // recover tbl1, because tbl1 has more than one replica, so it still can be recovered - Assertions.assertDoesNotThrow(() -> recoverTable("db2.tbl1")); + Assertions.assertDoesNotThrow(() -> recoverTable("db3.tbl1")); Assertions.assertDoesNotThrow(() -> showCreateTable(sql)); addNewBackend(); diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto index cf45d08a336a69..2a9260c450ac9b 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -790,6 +790,16 @@ message PStreamHeader { repeated PTabletID tablets = 10; } +message PGetWalQueueSizeRequest{ + optional int64 table_id = 1; + optional int64 txn_id = 2; +} + +message PGetWalQueueSizeResponse{ + required PStatus status = 1; + optional int64 size = 2; +} + service PBackendService { rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult); rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult); @@ -831,6 +841,7 @@ service PBackendService { rpc report_stream_load_status(PReportStreamLoadStatusRequest) returns (PReportStreamLoadStatusResponse); rpc glob(PGlobRequest) returns (PGlobResponse); rpc group_commit_insert(PGroupCommitInsertRequest) returns (PGroupCommitInsertResponse); + rpc get_wal_queue_size(PGetWalQueueSizeRequest) returns(PGetWalQueueSizeResponse); rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); }; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 42f4016d10d056..7552167ff9213d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1292,6 +1292,16 @@ struct TGetBackendMetaResult { 2: optional list backends } +struct TGetColumnInfoRequest { + 1: optional i64 db_id + 2: optional i64 table_id +} + +struct TGetColumnInfoResult { + 1: optional Status.TStatus status + 2: optional string column_info +} + service FrontendService { TGetDbsResult getDbNames(1: TGetDbsParams params) TGetTablesResult getTableNames(1: TGetTablesParams params) @@ -1367,4 +1377,6 @@ service FrontendService { TGetMetaResult getMeta(1: TGetMetaRequest request) TGetBackendMetaResult getBackendMeta(1: TGetBackendMetaRequest request) + + TGetColumnInfoResult getColumnInfo(1: TGetColumnInfoRequest request) }