Skip to content

Commit

Permalink
[Profile](runtimefilter) fix merge time of runtime filter (apache#21654)
Browse files Browse the repository at this point in the history
  • Loading branch information
HappenLee authored Jul 10, 2023
1 parent a1a8ee8 commit 1a08c81
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
1 change: 1 addition & 0 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class BloomFilterFuncBase {
if (_inited) {
return Status::OK();
}
// TODO: really need the lock?
std::lock_guard<std::mutex> l(_lock);
if (_inited) {
return Status::OK();
Expand Down
7 changes: 5 additions & 2 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cntVal;
int merged_size = 0;
int64_t merge_time = 0;
{
int64_t start_merge = MonotonicMillis();
std::lock_guard<std::mutex> guard(_filter_map_mutex);
Expand All @@ -334,9 +335,11 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
// TODO: avoid log when we had acquired a lock
VLOG_ROW << "merge size:" << merged_size << ":" << cntVal->producer_size;
DCHECK_LE(merged_size, cntVal->producer_size);
_merge_timer += (MonotonicMillis() - start_merge);
iter->second->merge_time += (MonotonicMillis() - start_merge);
if (merged_size < cntVal->producer_size) {
return Status::OK();
} else {
merge_time = iter->second->merge_time;
}
}

Expand Down Expand Up @@ -374,7 +377,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
rpc_contexts[cur]->request.set_filter_id(request->filter_id());
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
request->is_pipeline());
rpc_contexts[cur]->request.set_merge_time(_merge_timer);
rpc_contexts[cur]->request.set_merge_time(merge_time);
*rpc_contexts[cur]->request.mutable_query_id() = request->query_id();
if (has_attachment) {
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class RuntimeFilterMergeControllerEntity {
UniqueId instance_id() const { return _fragment_instance_id; }

struct RuntimeFilterCntlVal {
int64_t create_time;
int64_t merge_time;
int producer_size;
TRuntimeFilterDesc runtime_filter_desc;
std::vector<doris::TRuntimeFilterTargetParams> target_info;
Expand Down Expand Up @@ -173,7 +173,6 @@ class RuntimeFilterMergeControllerEntity {
std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
RuntimeState* _state;
bool _opt_remote_rf = true;
int64_t _merge_timer = 0;
};

// RuntimeFilterMergeController has a map query-id -> entity
Expand Down

0 comments on commit 1a08c81

Please sign in to comment.