Skip to content

Commit

Permalink
Merge branch 'master' into nereids_sql_block_rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Vallishp authored Nov 15, 2024
2 parents 80dd59a + 608b0a8 commit 5f9049a
Show file tree
Hide file tree
Showing 53 changed files with 733 additions and 523 deletions.
1 change: 1 addition & 0 deletions be/src/exec/lzo_decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ Status LzopDecompressor::decompress(uint8_t* input, size_t input_len, size_t* in
ptr = get_uint32(ptr, &uncompressed_size);
left_input_len -= sizeof(uint32_t);
if (uncompressed_size == 0) {
*input_bytes_read += sizeof(uint32_t);
*stream_end = true;
return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/tablet_info.h"

#include <butil/logging.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Partitions_types.h>
Expand Down Expand Up @@ -180,6 +181,17 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
auto it = slots_map.find(to_lower(pcolumn_desc.name()) + "+" + data_type_str +
is_null_str);
if (it == std::end(slots_map)) {
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\npschema={}",
pcolumn_desc.name(), pcolumn_desc.type(), data_type_str, is_null_str,
keys, debug_string(), pschema.ShortDebugString());

return Status::InternalError("unknown index column, column={}, type={}",
pcolumn_desc.name(), pcolumn_desc.type());
}
Expand Down Expand Up @@ -286,6 +298,18 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema) {
auto it = slots_map.find(to_lower(tcolumn_desc.column_name) + "+" + data_type_str +
is_null_str);
if (it == slots_map.end()) {
std::stringstream ss;
ss << tschema;
std::string keys {};
for (const auto& [key, _] : slots_map) {
keys += fmt::format("{},", key);
}
LOG_EVERY_SECOND(WARNING) << fmt::format(
"[OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema)]: "
"unknown index column, column={}, type={}, data_type_str={}, "
"is_null_str={}, slots_map.keys()=[{}], {}\ntschema={}",
tcolumn_desc.column_name, tcolumn_desc.column_type.type, data_type_str,
is_null_str, keys, debug_string(), ss.str());
return Status::InternalError("unknown index column, column={}, type={}",
tcolumn_desc.column_name,
tcolumn_desc.column_type.type);
Expand Down
38 changes: 0 additions & 38 deletions be/src/exprs/bitmapfilter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ namespace doris {
// only used in Runtime Filter
class BitmapFilterFuncBase : public RuntimeFilterFuncBase {
public:
virtual void insert(const void* data) = 0;
virtual void insert_many(const std::vector<const BitmapValue*>& bitmaps) = 0;
virtual bool empty() = 0;
virtual Status assign(BitmapValue* bitmap_value) = 0;
virtual void light_copy(BitmapFilterFuncBase* other) { _not_in = other->_not_in; }
virtual uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap,
uint16_t* offsets, int number) = 0;
virtual void find_batch(const char* data, const uint8* nullmap, size_t number,
Expand All @@ -58,8 +54,6 @@ class BitmapFilterFunc : public BitmapFilterFuncBase {

~BitmapFilterFunc() override = default;

void insert(const void* data) override;

void insert_many(const std::vector<const BitmapValue*>& bitmaps) override;

uint16_t find_fixed_len_olap_engine(const char* data, const uint8* nullmap, uint16_t* offsets,
Expand All @@ -68,45 +62,21 @@ class BitmapFilterFunc : public BitmapFilterFuncBase {
void find_batch(const char* data, const uint8* nullmap, size_t number,
uint8* results) const override;

bool empty() override { return _bitmap_value->empty(); }

Status assign(BitmapValue* bitmap_value) override {
*_bitmap_value = *bitmap_value;
return Status::OK();
}

void light_copy(BitmapFilterFuncBase* bitmapfilter_func) override;

size_t size() const override { return _bitmap_value->cardinality(); }

uint64_t max() { return _bitmap_value->max(nullptr); }

uint64_t min() { return _bitmap_value->min(nullptr); }

bool contains_any(CppType left, CppType right) {
if (right < 0) {
return false;
}
return _bitmap_value->contains_any(std::max(left, (CppType)0), right);
}

std::shared_ptr<BitmapValue> get_inner_bitmap() { return _bitmap_value; }

private:
std::shared_ptr<BitmapValue> _bitmap_value;

bool find(CppType data) const { return _not_in ^ (data >= 0 && _bitmap_value->contains(data)); }
};

template <PrimitiveType type>
void BitmapFilterFunc<type>::insert(const void* data) {
if (data == nullptr) {
return;
}

*_bitmap_value |= *reinterpret_cast<const BitmapValue*>(data);
}

template <PrimitiveType type>
void BitmapFilterFunc<type>::insert_many(const std::vector<const BitmapValue*>& bitmaps) {
if (bitmaps.empty()) {
Expand Down Expand Up @@ -147,12 +117,4 @@ void BitmapFilterFunc<type>::find_batch(const char* data, const uint8* nullmap,
}
}

template <PrimitiveType type>
void BitmapFilterFunc<type>::light_copy(BitmapFilterFuncBase* bitmapfilter_func) {
BitmapFilterFuncBase::light_copy(bitmapfilter_func);
auto other_func = reinterpret_cast<BitmapFilterFunc*>(bitmapfilter_func);
_bitmap_value = other_func->_bitmap_value;
set_filter_id(bitmapfilter_func->get_filter_id());
}

} // namespace doris
24 changes: 10 additions & 14 deletions be/src/exprs/create_predicate_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include "common/exception.h"
#include "common/status.h"
#include "exprs/hybrid_set.h"
#include "exprs/minmax_predicate.h"
#include "function_filter.h"
Expand Down Expand Up @@ -244,12 +246,9 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
int be_exec_version, const TabletColumn*) {
if constexpr (PT == TYPE_TINYINT || PT == TYPE_SMALLINT || PT == TYPE_INT ||
PT == TYPE_BIGINT) {
std::shared_ptr<BitmapFilterFuncBase> filter_olap;
filter_olap.reset(create_bitmap_filter(PT));
filter_olap->light_copy(filter.get());
return new BitmapFilterColumnPredicate<PT>(column_id, filter, be_exec_version);
} else {
return nullptr;
throw Exception(ErrorCode::INTERNAL_ERROR, "bitmap filter do not support type {}", PT);
}
}

Expand All @@ -266,17 +265,14 @@ ColumnPredicate* create_olap_column_predicate(uint32_t column_id,
const std::shared_ptr<FunctionFilter>& filter, int,
const TabletColumn* column = nullptr) {
// currently only support like predicate
if constexpr (PT == TYPE_CHAR || PT == TYPE_VARCHAR || PT == TYPE_STRING) {
if constexpr (PT == TYPE_CHAR) {
return new LikeColumnPredicate<TYPE_CHAR>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
} else {
return new LikeColumnPredicate<TYPE_STRING>(filter->_opposite, column_id,
filter->_fn_ctx, filter->_string_param);
}
} else {
return nullptr;
if constexpr (PT == TYPE_CHAR) {
return new LikeColumnPredicate<TYPE_CHAR>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
} else if constexpr (PT == TYPE_VARCHAR || PT == TYPE_STRING) {
return new LikeColumnPredicate<TYPE_STRING>(filter->_opposite, column_id, filter->_fn_ctx,
filter->_string_param);
}
throw Exception(ErrorCode::INTERNAL_ERROR, "function filter do not support type {}", PT);
}

template <typename T>
Expand Down
22 changes: 18 additions & 4 deletions be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "http/http_headers.h"
#include "http/http_status.h"
#include "runtime/exec_env.h"
#include "util/security.h"
#include "util/stack_util.h"

namespace doris {
Expand Down Expand Up @@ -205,9 +206,11 @@ Status HttpClient::execute(const std::function<bool(const void* data, size_t len
_callback = &callback;
auto code = curl_easy_perform(_curl);
if (code != CURLE_OK) {
std::string url = mask_token(_get_url());
LOG(WARNING) << "fail to execute HTTP client, errmsg=" << _to_errmsg(code)
<< ", trace=" << get_stack_trace();
return Status::HttpError(_to_errmsg(code));
<< ", trace=" << get_stack_trace() << ", url=" << url;
std::string errmsg = fmt::format("{}, url={}", _to_errmsg(code), url);
return Status::HttpError(std::move(errmsg));
}
return Status::OK();
}
Expand Down Expand Up @@ -275,13 +278,22 @@ Status HttpClient::execute(std::string* response) {
return execute(callback);
}

const char* HttpClient::_to_errmsg(CURLcode code) {
const char* HttpClient::_to_errmsg(CURLcode code) const {
if (_error_buf[0] == 0) {
return curl_easy_strerror(code);
}
return _error_buf;
}

const char* HttpClient::_get_url() const {
const char* url = nullptr;
curl_easy_getinfo(_curl, CURLINFO_EFFECTIVE_URL, &url);
if (!url) {
url = "<unknown>";
}
return url;
}

Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
const std::function<Status(HttpClient*)>& callback) {
Status status;
Expand All @@ -293,7 +305,9 @@ Status HttpClient::execute_with_retry(int retry_times, int sleep_time,
if (http_status == 200) {
return status;
} else {
auto error_msg = fmt::format("http status code is not 200, code={}", http_status);
std::string url = mask_token(client._get_url());
auto error_msg = fmt::format("http status code is not 200, code={}, url={}",
http_status, url);
LOG(WARNING) << error_msg;
return Status::HttpError(error_msg);
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ class HttpClient {
Status _escape_url(const std::string& url, std::string* escaped_url);

private:
const char* _to_errmsg(CURLcode code);
const char* _to_errmsg(CURLcode code) const;
const char* _get_url() const;

private:
CURL* _curl = nullptr;
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/bloom_filter_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ class BloomFilterColumnPredicate : public ColumnPredicate {
uint16_t evaluate(const vectorized::IColumn& column, const uint8_t* null_map, uint16_t* sel,
uint16_t size) const {
if constexpr (is_nullable) {
DCHECK(null_map);
if (!null_map) {
throw Exception(ErrorCode::INTERNAL_ERROR, "null_map is nullptr");
}
}

uint16_t new_size = 0;
Expand All @@ -91,7 +93,9 @@ class BloomFilterColumnPredicate : public ColumnPredicate {

int get_filter_id() const override {
int filter_id = _filter->get_filter_id();
DCHECK(filter_id != -1);
if (filter_id == 1) {
throw Exception(ErrorCode::INTERNAL_ERROR, "filter_id is -1");
}
return filter_id;
}

Expand Down
14 changes: 5 additions & 9 deletions be/src/olap/single_replica_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "task/engine_clone_task.h"
#include "util/brpc_client_cache.h"
#include "util/doris_metrics.h"
#include "util/security.h"
#include "util/thrift_rpc_helper.h"
#include "util/trace.h"

Expand Down Expand Up @@ -373,7 +374,7 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
// then it will try to clone from BE 2, but it will find the file 1 already exist, but file 1 with same
// name may have different versions.
VLOG_DEBUG << "single replica compaction begin to download files, remote path="
<< _mask_token(remote_url_prefix) << " local_path=" << local_path;
<< mask_token(remote_url_prefix) << " local_path=" << local_path;
RETURN_IF_ERROR(io::global_local_filesystem()->delete_directory(local_path));
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(local_path));

Expand Down Expand Up @@ -438,10 +439,10 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
std::string local_file_path = local_path + file_name;

LOG(INFO) << "single replica compaction begin to download file from: "
<< _mask_token(remote_file_url) << " to: " << local_file_path
<< mask_token(remote_file_url) << " to: " << local_file_path
<< ". size(B): " << file_size << ", timeout(s): " << estimate_timeout;

auto download_cb = [this, &remote_file_url, estimate_timeout, &local_file_path,
auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_file_url));
client->set_timeout_ms(estimate_timeout * 1000);
Expand All @@ -453,7 +454,7 @@ Status SingleReplicaCompaction::_download_files(DataDir* data_dir,
uint64_t local_file_size = std::filesystem::file_size(local_file_path);
if (local_file_size != file_size) {
LOG(WARNING) << "download file length error"
<< ", remote_path=" << _mask_token(remote_file_url)
<< ", remote_path=" << mask_token(remote_file_url)
<< ", file_size=" << file_size
<< ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not equal");
Expand Down Expand Up @@ -585,9 +586,4 @@ Status SingleReplicaCompaction::_finish_clone(const string& clone_dir,
return res;
}

std::string SingleReplicaCompaction::_mask_token(const std::string& str) {
std::regex pattern("token=[\\w|-]+");
return regex_replace(str, pattern, "token=******");
}

} // namespace doris
1 change: 0 additions & 1 deletion be/src/olap/single_replica_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class SingleReplicaCompaction final : public CompactionMixin {
const std::string& local_path);
Status _release_snapshot(const std::string& ip, int port, const std::string& snapshot_path);
Status _finish_clone(const std::string& clone_dir, const Version& version);
std::string _mask_token(const std::string& str);
CompactionType _compaction_type;

std::vector<PendingRowsetGuard> _pending_rs_guards;
Expand Down
17 changes: 6 additions & 11 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include <memory>
#include <mutex>
#include <ostream>
#include <regex>
#include <set>
#include <shared_mutex>
#include <system_error>
Expand Down Expand Up @@ -64,6 +63,7 @@
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/network_util.h"
#include "util/security.h"
#include "util/stopwatch.hpp"
#include "util/thrift_rpc_helper.h"
#include "util/trace.h"
Expand Down Expand Up @@ -415,7 +415,7 @@ Status EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
status = _download_files(&data_dir, remote_url_prefix, local_data_path);
if (!status.ok()) [[unlikely]] {
LOG_WARNING("failed to download snapshot from remote BE")
.tag("url", _mask_token(remote_url_prefix))
.tag("url", mask_token(remote_url_prefix))
.error(status);
continue; // Try another BE
}
Expand Down Expand Up @@ -552,11 +552,11 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re

std::string local_file_path = local_path + "/" + file_name;

LOG(INFO) << "clone begin to download file from: " << _mask_token(remote_file_url)
LOG(INFO) << "clone begin to download file from: " << mask_token(remote_file_url)
<< " to: " << local_file_path << ". size(B): " << file_size
<< ", timeout(s): " << estimate_timeout;

auto download_cb = [this, &remote_file_url, estimate_timeout, &local_file_path,
auto download_cb = [&remote_file_url, estimate_timeout, &local_file_path,
file_size](HttpClient* client) {
RETURN_IF_ERROR(client->init(remote_file_url));
client->set_timeout_ms(estimate_timeout * 1000);
Expand All @@ -572,7 +572,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re
}
if (local_file_size != file_size) {
LOG(WARNING) << "download file length error"
<< ", remote_path=" << _mask_token(remote_file_url)
<< ", remote_path=" << mask_token(remote_file_url)
<< ", file_size=" << file_size
<< ", local_file_size=" << local_file_size;
return Status::InternalError("downloaded file size is not equal");
Expand Down Expand Up @@ -600,7 +600,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re

/// This method will only be called if tablet already exist in this BE when doing clone.
/// This method will do the following things:
/// 1. Linke all files from CLONE dir to tablet dir if file does not exist in tablet dir
/// 1. Link all files from CLONE dir to tablet dir if file does not exist in tablet dir
/// 2. Call _finish_xx_clone() to revise the tablet meta.
Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_dir, int64_t version,
bool is_incremental_clone) {
Expand Down Expand Up @@ -864,9 +864,4 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
// TODO(plat1ko): write cooldown meta to remote if this replica is cooldown replica
}

std::string EngineCloneTask::_mask_token(const std::string& str) {
std::regex pattern("token=[\\w|-]+");
return regex_replace(str, pattern, "token=******");
}

} // namespace doris
Loading

0 comments on commit 5f9049a

Please sign in to comment.