Skip to content

Commit

Permalink
[improvement](insert) support schema change and decommission for grou…
Browse files Browse the repository at this point in the history
…p commit (apache#26359)
  • Loading branch information
hust-hhb authored Nov 17, 2023
1 parent e3e249c commit 5d54893
Show file tree
Hide file tree
Showing 37 changed files with 1,305 additions and 185 deletions.
1 change: 0 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,6 @@ DEFINE_Int16(bitmap_serialize_version, "1");
DEFINE_String(group_commit_replay_wal_dir, "./wal");
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
DEFINE_Int32(group_commit_sync_wal_batch, "10");
DEFINE_Bool(wait_internal_group_commit_finish, "false");

// the count of thread to group commit insert
Expand Down
1 change: 0 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,6 @@ DECLARE_Int16(bitmap_serialize_version);
DECLARE_String(group_commit_replay_wal_dir);
DECLARE_Int32(group_commit_replay_wal_retry_num);
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
DECLARE_Int32(group_commit_sync_wal_batch);
DECLARE_Bool(wait_internal_group_commit_finish);

// This config can be set to limit thread number in group commit insert thread pool.
Expand Down
138 changes: 127 additions & 11 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

namespace doris {
WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
: _exec_env(exec_env), _stop_background_threads_latch(1), _stop(false) {
doris::vectorized::WalReader::string_split(wal_dir_list, ",", _wal_dirs);
_all_wal_disk_bytes = std::make_shared<std::atomic_size_t>(0);
}
Expand All @@ -48,12 +48,15 @@ WalManager::~WalManager() {
}

void WalManager::stop() {
_stop = true;
_stop_background_threads_latch.count_down();
if (_replay_thread) {
_replay_thread->join();
if (!this->_stop.load()) {
this->_stop.store(true);
stop_relay_wal();
_stop_background_threads_latch.count_down();
if (_replay_thread) {
_replay_thread->join();
}
LOG(INFO) << "WalManager is stopped";
}
LOG(INFO) << "WalManager is stopped";
}

Status WalManager::init() {
Expand All @@ -76,6 +79,78 @@ Status WalManager::init() {
&_replay_thread);
}

void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
LOG(INFO) << "add wal queue "
<< ",table_id:" << table_id << ",wal_id:" << wal_id << ",status:" << wal_status;
auto it = _wal_status_queues.find(table_id);
if (it == _wal_status_queues.end()) {
std::unordered_map<int64_t, WAL_STATUS> tmp;
tmp.emplace(wal_id, wal_status);
_wal_status_queues.emplace(table_id, tmp);
} else {
it->second.emplace(wal_id, wal_status);
}
}

Status WalManager::erase_wal_status_queue(int64_t table_id, int64_t wal_id) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
auto it = _wal_status_queues.find(table_id);
LOG(INFO) << "remove wal queue "
<< ",table_id:" << table_id << ",wal_id:" << wal_id;
if (it == _wal_status_queues.end()) {
return Status::InternalError("table_id " + std::to_string(table_id) +
" not found in wal status queue");
} else {
it->second.erase(wal_id);
if (it->second.empty()) {
_wal_status_queues.erase(table_id);
}
}
return Status::OK();
}

Status WalManager::get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
PGetWalQueueSizeResponse* response) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
size_t count = 0;
auto table_id = request->table_id();
auto txn_id = request->txn_id();
if (table_id > 0 && txn_id > 0) {
auto it = _wal_status_queues.find(table_id);
if (it == _wal_status_queues.end()) {
LOG(INFO) << ("table_id " + std::to_string(table_id) +
" not found in wal status queue");
} else {
for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) {
if (wal_it->first <= txn_id) {
count += 1;
}
}
}
} else {
for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); it++) {
count += it->second.size();
}
}
response->set_size(count);
if (count > 0) {
print_wal_status_queue();
}
return Status::OK();
}

void WalManager::print_wal_status_queue() {
std::stringstream ss;
for (auto it = _wal_status_queues.begin(); it != _wal_status_queues.end(); ++it) {
ss << "table_id:" << it->first << std::endl;
for (auto wal_it = it->second.begin(); wal_it != it->second.end(); ++wal_it) {
ss << "wal_id:" << wal_it->first << ",status:" << wal_it->second << std::endl;
}
}
LOG(INFO) << ss.str();
}

Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id,
const std::string& label) {
std::string base_path =
Expand Down Expand Up @@ -132,7 +207,6 @@ Status WalManager::create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
}
return Status::OK();
}

Status WalManager::scan_wals(const std::string& wal_path) {
size_t count = 0;
bool exists = true;
Expand Down Expand Up @@ -174,8 +248,16 @@ Status WalManager::scan_wals(const std::string& wal_path) {
res.emplace_back(wal_file);
{
std::lock_guard<std::shared_mutex> wrlock(_wal_lock);
int64_t wal_id = std::strtoll(wal.file_name.c_str(), NULL, 10);
_wal_path_map.emplace(wal_id, wal_file);
auto pos = wal.file_name.find("_");
try {
int64_t wal_id =
std::strtoll(wal.file_name.substr(0, pos).c_str(), NULL, 10);
_wal_path_map.emplace(wal_id, wal_file);
int64_t tb_id = std::strtoll(table_id.file_name.c_str(), NULL, 10);
add_wal_status_queue(tb_id, wal_id, WalManager::WAL_STATUS::REPLAY);
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
}
}
st = add_recover_wal(db_id.file_name, table_id.file_name, res);
Expand All @@ -193,11 +275,12 @@ Status WalManager::scan_wals(const std::string& wal_path) {

Status WalManager::replay() {
do {
if (_stop || _exec_env->master_info() == nullptr) {
if (_stop.load()) {
break;
}
// port == 0 means not received heartbeat yet
if (_exec_env->master_info()->network_address.port == 0) {
if (_exec_env->master_info() != nullptr &&
_exec_env->master_info()->network_address.port == 0) {
continue;
}
std::vector<std::string> replay_tables;
Expand Down Expand Up @@ -276,4 +359,37 @@ Status WalManager::delete_wal(int64_t wal_id) {
return Status::OK();
}

bool WalManager::is_running() {
return !_stop.load();
}

void WalManager::stop_relay_wal() {
std::lock_guard<std::shared_mutex> wrlock(_lock);
for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
it->second->stop();
}
}

void WalManager::add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
_wal_column_id_map.emplace(wal_id, column_index);
}

void WalManager::erase_wal_column_index(int64_t wal_id) {
if (_wal_column_id_map.erase(wal_id)) {
LOG(INFO) << "erase " << wal_id << " from wal_column_id_map";
} else {
LOG(WARNING) << "fail to erase wal " << wal_id << " from wal_column_id_map";
}
}

Status WalManager::get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index) {
auto it = _wal_column_id_map.find(wal_id);
if (it != _wal_column_id_map.end()) {
column_index = it->second;
} else {
return Status::InternalError("cannot find wal {} in wal_column_id_map", wal_id);
}
return Status::OK();
}

} // namespace doris
26 changes: 25 additions & 1 deletion be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <gen_cpp/PaloInternalService_types.h>

#include <memory>

#include "common/config.h"
Expand All @@ -32,6 +34,13 @@ namespace doris {
class WalManager {
ENABLE_FACTORY_CREATOR(WalManager);

public:
enum WAL_STATUS {
PREPARE = 0,
REPLAY,
CREATE,
};

public:
WalManager(ExecEnv* exec_env, const std::string& wal_dir);
~WalManager();
Expand All @@ -47,7 +56,19 @@ class WalManager {
std::vector<std::string> wals);
Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
PGetWalQueueSizeResponse* response);
Status get_all_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
PGetWalQueueSizeResponse* response);
void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status);
Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
void print_wal_status_queue();
void stop();
bool is_running();
void stop_relay_wal();
void add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
void erase_wal_column_index(int64_t wal_id);
Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);

private:
ExecEnv* _exec_env;
Expand All @@ -57,9 +78,12 @@ class WalManager {
std::map<std::string, std::shared_ptr<WalTable>> _table_map;
std::vector<std::string> _wal_dirs;
std::shared_mutex _wal_lock;
std::shared_mutex _wal_status_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
std::unordered_map<int64_t, std::shared_ptr<WalWriter>> _wal_id_to_writer_map;
std::shared_ptr<std::atomic_size_t> _all_wal_disk_bytes;
bool _stop = false;
std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>> _wal_status_queues;
std::atomic<bool> _stop;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
};
} // namespace doris
35 changes: 31 additions & 4 deletions be/src/olap/wal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "io/fs/file_reader.h"
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "wal_writer.h"

Expand Down Expand Up @@ -52,8 +53,7 @@ Status WalReader::read_block(PBlock& block) {
RETURN_IF_ERROR(
file_reader->read_at(_offset, {row_len_buf, WalWriter::LENGTH_SIZE}, &bytes_read));
_offset += WalWriter::LENGTH_SIZE;
size_t block_len;
memcpy(&block_len, row_len_buf, WalWriter::LENGTH_SIZE);
size_t block_len = decode_fixed64_le(row_len_buf);
// read block
std::string block_buf;
block_buf.resize(block_len);
Expand All @@ -65,12 +65,39 @@ Status WalReader::read_block(PBlock& block) {
RETURN_IF_ERROR(file_reader->read_at(_offset, {checksum_len_buf, WalWriter::CHECKSUM_SIZE},
&bytes_read));
_offset += WalWriter::CHECKSUM_SIZE;
uint32_t checksum;
memcpy(&checksum, checksum_len_buf, WalWriter::CHECKSUM_SIZE);
uint32_t checksum = decode_fixed32_le(checksum_len_buf);
RETURN_IF_ERROR(_check_checksum(block_buf.data(), block_len, checksum));
return Status::OK();
}

Status WalReader::read_header(uint32_t& version, std::string& col_ids) {
size_t bytes_read = 0;
std::string magic_str;
magic_str.resize(k_wal_magic_length);
RETURN_IF_ERROR(file_reader->read_at(_offset, magic_str, &bytes_read));
if (strcmp(magic_str.c_str(), k_wal_magic) != 0) {
return Status::Corruption("Bad wal file {}: magic number not match", _file_name);
}
_offset += k_wal_magic_length;
uint8_t version_buf[WalWriter::VERSION_SIZE];
RETURN_IF_ERROR(
file_reader->read_at(_offset, {version_buf, WalWriter::VERSION_SIZE}, &bytes_read));
_offset += WalWriter::VERSION_SIZE;
version = decode_fixed32_le(version_buf);
uint8_t len_buf[WalWriter::LENGTH_SIZE];
RETURN_IF_ERROR(file_reader->read_at(_offset, {len_buf, WalWriter::LENGTH_SIZE}, &bytes_read));
_offset += WalWriter::LENGTH_SIZE;
size_t len = decode_fixed64_le(len_buf);
col_ids.resize(len);
RETURN_IF_ERROR(file_reader->read_at(_offset, col_ids, &bytes_read));
_offset += len;
if (len != bytes_read) {
return Status::InternalError("failed to read header expected= " + std::to_string(len) +
",actually=" + std::to_string(bytes_read));
}
return Status::OK();
}

Status WalReader::_deserialize(PBlock& block, std::string& buf) {
if (UNLIKELY(!block.ParseFromString(buf))) {
return Status::InternalError("failed to deserialize row");
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/wal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ class WalReader {
Status finalize();

Status read_block(PBlock& block);
Status read_header(uint32_t& version, std::string& col_ids);

private:
Status _deserialize(PBlock& block, std::string& buf);
Status _check_checksum(const char* binary, size_t size, uint32_t checksum);

private:
std::string _file_name;
size_t _offset;
io::FileReaderSPtr file_reader;
Expand Down
Loading

0 comments on commit 5d54893

Please sign in to comment.