Skip to content

Commit

Permalink
Merge branch 'master' into pipelineX-union
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Nov 21, 2023
2 parents 3775a4d + 6e86bf5 commit e93ce01
Show file tree
Hide file tree
Showing 670 changed files with 68,089 additions and 1,432 deletions.
1 change: 0 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,6 @@ DEFINE_Bool(wait_internal_group_commit_finish, "false");

// the count of thread to group commit insert
DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_mInt32(group_commit_interval_ms, "10000");

DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
Expand Down
1 change: 0 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,6 @@ DECLARE_Bool(wait_internal_group_commit_finish);

// This config can be set to limit thread number in group commit insert thread pool.
DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_interval_ms);

// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
Expand Down
4 changes: 0 additions & 4 deletions be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,6 @@ class ComparisonPredicateBase : public ColumnPredicate {
return false;
}

DCHECK(sizeof(T) <= statistic.first->size() || Type == TYPE_DATE)
<< " Type: " << Type << " sizeof(T): " << sizeof(T)
<< " statistic.first->size(): " << statistic.first->size();

T tmp_min_value = get_zone_map_value<Type, T>(statistic.first->cell_ptr());
T tmp_max_value = get_zone_map_value<Type, T>(statistic.second->cell_ptr());

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ vectorized::IColumn::MutablePtr Schema::get_predicate_column_ptr(const Field& fi
break;
case FieldType::OLAP_FIELD_TYPE_VARCHAR:
case FieldType::OLAP_FIELD_TYPE_STRING:
case FieldType::OLAP_FIELD_TYPE_JSONB:
if (config::enable_low_cardinality_optimize && reader_type == ReaderType::READER_QUERY) {
ptr = doris::vectorized::ColumnDictionary<doris::vectorized::Int32>::create(
field.type());
Expand Down
79 changes: 72 additions & 7 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,58 @@ using strings::SkipWhitespace;
namespace doris {
using namespace ErrorCode;

namespace {
/// if binlog file exist, then check if binlog file md5sum equal
/// if equal, then skip link file
/// if not equal, then return error
/// return value: if binlog file not exist, then return to binlog file path
Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
const std::string& clone_file, bool* skip_link_file) {
// change clone_file suffix .binlog to .dat
std::string new_clone_file = clone_file;
new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);

// check to to file exist
bool exists = true;
auto status = io::global_local_filesystem()->exists(to, &exists);
if (!status.ok()) {
return ResultError(std::move(status));
}

if (!exists) {
return to;
}

LOG(WARNING) << "binlog file already exist. "
<< "tablet_dir=" << tablet_dir << ", clone_file=" << clone_file;

std::string clone_file_md5sum;
status = io::global_local_filesystem()->md5sum(clone_file, &clone_file_md5sum);
if (!status.ok()) {
return ResultError(std::move(status));
}
std::string to_file_md5sum;
status = io::global_local_filesystem()->md5sum(to, &to_file_md5sum);
if (!status.ok()) {
return ResultError(std::move(status));
}

if (clone_file_md5sum == to_file_md5sum) {
// if md5sum equal, then skip link file
*skip_link_file = true;
return to;
} else {
auto err_msg = fmt::format(
"binlog file already exist, but md5sum not equal. "
"tablet_dir={}, clone_file={}",
tablet_dir, clone_file);
LOG(WARNING) << err_msg;
return ResultError(Status::InternalError(std::move(err_msg)));
}
}
} // namespace

#define RETURN_IF_ERROR_(status, stmt) \
do { \
status = (stmt); \
Expand Down Expand Up @@ -599,6 +651,8 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
/// Traverse all downloaded clone files in CLONE dir.
/// If it does not exist in local tablet dir, link the file to local tablet dir
/// And save all linked files in linked_success_files.
/// if binlog exist in clone dir and md5sum equal, then skip link file
bool skip_link_file = false;
for (const string& clone_file : clone_file_names) {
if (local_file_names.find(clone_file) != local_file_names.end()) {
VLOG_NOTICE << "find same file when clone, skip it. "
Expand All @@ -615,19 +669,30 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
break;
}

// change clone_file suffix .binlog to .dat
std::string new_clone_file = clone_file;
new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
if (auto&& result = check_dest_binlog_valid(tablet_dir, clone_file, &skip_link_file);
result) {
to = std::move(result.value());
} else {
status = std::move(result.error());
return status;
}
} else {
to = fmt::format("{}/{}", tablet_dir, clone_file);
}

RETURN_IF_ERROR(io::global_local_filesystem()->link_file(from, to));
linked_success_files.emplace_back(std::move(to));
if (!skip_link_file) {
status = io::global_local_filesystem()->link_file(from, to);
if (!status.ok()) {
return status;
}
linked_success_files.emplace_back(std::move(to));
}
}
if (contain_binlog) {
RETURN_IF_ERROR(tablet->ingest_binlog_metas(&rowset_binlog_metas_pb));
status = tablet->ingest_binlog_metas(&rowset_binlog_metas_pb);
if (!status.ok()) {
return status;
}
}

// clone and compaction operation should be performed sequentially
Expand Down
16 changes: 9 additions & 7 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,15 @@ Status EnginePublishVersionTask::finish() {
(*_succ_tablets)[tablet_id] = 0;
} else {
add_error_tablet_id(tablet_id);
LOG(WARNING)
<< "publish version failed on transaction, tablet version not "
"exists. "
<< "transaction_id=" << transaction_id
<< ", tablet_id=" << tablet_id
<< ", tablet_state=" << tablet_state_name(tablet->tablet_state())
<< ", version=" << par_ver_info.version;
if (!res.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
LOG(WARNING)
<< "publish version failed on transaction, tablet version not "
"exists. "
<< "transaction_id=" << transaction_id
<< ", tablet_id=" << tablet_id << ", tablet_state="
<< tablet_state_name(tablet->tablet_state())
<< ", version=" << par_ver_info.version;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ Status ScanLocalState<Derived>::_normalize_noneq_binary_predicate(
DCHECK(expr->children().size() == 2);

auto noneq_checker = [](const std::string& fn_name) {
return fn_name != "ne" && fn_name != "eq";
return fn_name != "ne" && fn_name != "eq" && fn_name != "eq_for_null";
};
StringRef value;
int slot_ref_child = -1;
Expand Down
12 changes: 9 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ Status PipelineFragmentContext::submit() {
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks == _total_tasks) {
std::call_once(_close_once_flag, [this] { _close_action(); });
_close_fragment_instance();
}
return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
BackendOptions::get_localhost());
Expand Down Expand Up @@ -851,7 +851,13 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
return _root_pipeline->set_sink(sink_);
}

void PipelineFragmentContext::_close_action() {
// If all pipeline tasks binded to the fragment instance are finished, then we could
// close the fragment instance.
void PipelineFragmentContext::_close_fragment_instance() {
if (_is_fragment_instance_closed) {
return;
}
Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// all submitted tasks done
Expand All @@ -862,7 +868,7 @@ void PipelineFragmentContext::close_a_pipeline() {
std::lock_guard<std::mutex> l(_task_mutex);
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
std::call_once(_close_once_flag, [this] { _close_action(); });
_close_fragment_instance();
}
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
virtual Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
virtual void _close_action();
virtual void _close_fragment_instance();
void _init_next_report_time();
void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }

Expand Down Expand Up @@ -205,7 +205,7 @@ class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFrag
RuntimeProfile::Counter* _prepare_timer;

std::function<void(RuntimeState*, Status*)> _call_back;
std::once_flag _close_once_flag;
bool _is_fragment_instance_closed = false;

// If this is set to false, and '_is_report_success' is false as well,
// This executor will not report status to FE on being cancelled.
Expand Down
8 changes: 6 additions & 2 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ Status PipelineXFragmentContext::submit() {
if (!st.ok()) {
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks == _total_tasks) {
std::call_once(_close_once_flag, [this] { _close_action(); });
_close_fragment_instance();
}
return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
BackendOptions::get_localhost());
Expand Down Expand Up @@ -969,7 +969,11 @@ void PipelineXFragmentContext::close_if_prepare_failed() {
}
}

void PipelineXFragmentContext::_close_action() {
void PipelineXFragmentContext::_close_fragment_instance() {
if (_is_fragment_instance_closed) {
return;
}
Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// all submitted tasks done
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
std::string debug_string() override;

private:
void _close_action() override;
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe,
const std::vector<TExpr>& texprs);
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
task->set_close_pipeline_time();
task->release_dependency();
task->set_running(false);
// close_a_pipeline may delete fragment context and will core in some defer
// code, because the defer code will access fragment context it self.
std::shared_ptr<PipelineFragmentContext> lock_for_context =
task->fragment_context()->shared_from_this();
task->fragment_context()->close_a_pipeline();
}

Expand Down
14 changes: 7 additions & 7 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
*eos = false;
std::unique_lock l(mutex);
if (!need_commit) {
auto left_milliseconds = config::group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
auto left_milliseconds =
_group_commit_interval_ms - std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_milliseconds <= 0) {
need_commit = true;
}
}
while (_status.ok() && _block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
CHECK(*_single_block_queue_bytes == 0);
auto left_milliseconds = config::group_commit_interval_ms;
auto left_milliseconds = _group_commit_interval_ms;
if (!need_commit) {
left_milliseconds = config::group_commit_interval_ms -
left_milliseconds = _group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
Expand Down Expand Up @@ -251,7 +251,7 @@ Status GroupCommitTable::_create_group_commit_load(
{
load_block_queue = std::make_shared<LoadBlockQueue>(
instance_id, label, txn_id, schema_version, _all_block_queues_bytes,
result.wait_internal_group_commit_finish);
result.wait_internal_group_commit_finish, result.group_commit_interval_ms);
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
Expand Down
7 changes: 5 additions & 2 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ class LoadBlockQueue {
LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
int64_t schema_version,
std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
bool wait_internal_group_commit_finish)
bool wait_internal_group_commit_finish, int64_t group_commit_interval_ms)
: load_instance_id(load_instance_id),
label(label),
txn_id(txn_id),
schema_version(schema_version),
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_start_time(std::chrono::steady_clock::now()),
_all_block_queues_bytes(all_block_queues_bytes) {
_all_block_queues_bytes(all_block_queues_bytes),
_group_commit_interval_ms(group_commit_interval_ms) {
_single_block_queue_bytes = std::make_shared<std::atomic_size_t>(0);
};

Expand Down Expand Up @@ -79,6 +80,8 @@ class LoadBlockQueue {
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
// memory consumption of one load block queue, used for correctness check.
std::shared_ptr<std::atomic_size_t> _single_block_queue_bytes;
// group commit interval in ms, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
};

class GroupCommitTable {
Expand Down
2 changes: 0 additions & 2 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ static const std::string THREAD_TOTAL_TIME = "TotalWallClockTime";
static const std::string THREAD_VOLUNTARY_CONTEXT_SWITCHES = "VoluntaryContextSwitches";
static const std::string THREAD_INVOLUNTARY_CONTEXT_SWITCHES = "InvoluntaryContextSwitches";

static const std::string SPAN_ATTRIBUTE_KEY_SEPARATOR = "-";

// The root counter name for all top level counters.
static const std::string ROOT_COUNTER;

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ Status VScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* e
DCHECK(expr->children().size() == 2);

auto noneq_checker = [](const std::string& fn_name) {
return fn_name != "ne" && fn_name != "eq";
return fn_name != "ne" && fn_name != "eq" && fn_name != "eq_for_null";
};
StringRef value;
int slot_ref_child = -1;
Expand Down
2 changes: 1 addition & 1 deletion docs/en/docs/advanced/partition/auto-partition.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ under the License.

# AUTO PARTITION

<version since="2.0.3">
<version since="2.1">

</version>

Expand Down
2 changes: 1 addition & 1 deletion docs/zh-CN/docs/advanced/partition/auto-partition.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ under the License.

# 自动分区

<version since="2.0.3">
<version since="2.1">

</version>

Expand Down
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.doris.journal.bdbje.BDBToolOptions;
import org.apache.doris.persist.meta.MetaReader;
import org.apache.doris.qe.QeService;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FeServer;
import org.apache.doris.service.FrontendOptions;
Expand Down Expand Up @@ -194,6 +195,8 @@ public static void start(String dorisHomeDir, String pidDir, String[] args, Star
Env.getCurrentEnv().setHttpReady(true);
}

SimpleScheduler.init();

if (options.enableQeService) {
QeService qeService = new QeService(Config.query_port, Config.arrow_flight_sql_port,
ExecuteEnv.getInstance().getScheduler());
Expand Down
2 changes: 2 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
|| properties
.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
|| properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,7 @@ public void updateTableProperties(Database db, String tableName, Map<String, Str
if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty()
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
&& !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
LOG.info("Properties already up-to-date");
return;
Expand Down
Loading

0 comments on commit e93ce01

Please sign in to comment.