Skip to content

Commit

Permalink
[Chore](runtime filter) change runtime filter dcheck to error status …
Browse files Browse the repository at this point in the history
…or exception (apache#21475)

change runtime filter dcheck to error status or exception
  • Loading branch information
BiteTheDDDDt authored Jul 5, 2023
1 parent d3eeb23 commit f02bec8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
41 changes: 26 additions & 15 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "common/exception.h"
#include "common/status.h"
#include "exprs/runtime_filter.h"
#include "runtime/runtime_filter_mgr.h"
Expand All @@ -28,20 +29,24 @@
#include "vec/runtime/shared_hash_table_controller.h"

namespace doris {
// this class used in a hash join node
// Provide a unified interface for other classes
template <typename ExprCtxType>
class RuntimeFilterSlotsBase {
// this class used in hash join node
class VRuntimeFilterSlots {
public:
RuntimeFilterSlotsBase(const std::vector<std::shared_ptr<ExprCtxType>>& prob_expr_ctxs,
const std::vector<std::shared_ptr<ExprCtxType>>& build_expr_ctxs,
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>& prob_expr_ctxs,
const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs,
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
: _probe_expr_context(prob_expr_ctxs),
_build_expr_context(build_expr_ctxs),
_runtime_filter_descs(runtime_filter_descs) {}

Status init(RuntimeState* state, int64_t hash_table_size, size_t build_bf_cardinality) {
DCHECK(_probe_expr_context.size() == _build_expr_context.size());
if (_probe_expr_context.size() != _build_expr_context.size()) {
return Status::InternalError(
"_probe_expr_context.size() != _build_expr_context.size(), "
"_probe_expr_context.size()={}, _build_expr_context.size()={}",
_probe_expr_context.size(), _build_expr_context.size());
}

// runtime filter effect strategy
// 1. we will ignore IN filter when hash_table_size is too big
Expand All @@ -53,7 +58,10 @@ class RuntimeFilterSlotsBase {
auto ignore_local_filter = [state](int filter_id) {
std::vector<IRuntimeFilter*> filters;
state->runtime_filter_mgr()->get_consume_filters(filter_id, filters);
DCHECK(!filters.empty());
if (filters.empty()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, filter_id={}",
filter_id);
}
for (auto filter : filters) {
filter->set_ignored();
filter->signal();
Expand Down Expand Up @@ -92,9 +100,13 @@ class RuntimeFilterSlotsBase {
IRuntimeFilter* runtime_filter = nullptr;
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
&runtime_filter));
DCHECK(runtime_filter != nullptr);
DCHECK(runtime_filter->expr_order() >= 0);
DCHECK(runtime_filter->expr_order() < _probe_expr_context.size());
if (runtime_filter->expr_order() < 0 ||
runtime_filter->expr_order() >= _probe_expr_context.size()) {
return Status::InternalError(
"runtime_filter meet invalid expr_order, expr_order={}, "
"probe_expr_context.size={}",
runtime_filter->expr_order(), _probe_expr_context.size());
}

// do not create 'in filter' when hash_table size over limit
auto max_in_num = state->runtime_filter_max_in_num();
Expand Down Expand Up @@ -250,12 +262,11 @@ class RuntimeFilterSlotsBase {
bool empty() { return !_runtime_filters.size(); }

private:
const std::vector<std::shared_ptr<ExprCtxType>>& _probe_expr_context;
const std::vector<std::shared_ptr<ExprCtxType>>& _build_expr_context;
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _probe_expr_context;
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context;
const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
// prob_contition index -> [IRuntimeFilter]
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
};

using VRuntimeFilterSlots = RuntimeFilterSlotsBase<vectorized::VExprContext>;
} // namespace doris
18 changes: 10 additions & 8 deletions be/src/exprs/runtime_filter_slots_cross.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,26 @@

namespace doris {
// this class used in cross join node
template <typename ExprCtxType = vectorized::VExprContext>
class RuntimeFilterSlotsCross {
class VRuntimeFilterSlotsCross {
public:
RuntimeFilterSlotsCross(const std::vector<TRuntimeFilterDesc>& runtime_filter_descs,
const vectorized::VExprContextSPtrs& src_expr_ctxs)
VRuntimeFilterSlotsCross(const std::vector<TRuntimeFilterDesc>& runtime_filter_descs,
const vectorized::VExprContextSPtrs& src_expr_ctxs)
: _runtime_filter_descs(runtime_filter_descs), filter_src_expr_ctxs(src_expr_ctxs) {}

~RuntimeFilterSlotsCross() = default;
~VRuntimeFilterSlotsCross() = default;

Status init(RuntimeState* state) {
for (auto& filter_desc : _runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(filter_desc.filter_id,
&runtime_filter));
DCHECK(runtime_filter != nullptr);
if (runtime_filter == nullptr) {
return Status::InternalError("runtime filter is nullptr");
}
// cross join has not remote filter
DCHECK(!runtime_filter->has_remote_target());
if (runtime_filter->has_remote_target()) {
return Status::InternalError("cross join runtime filter has remote target");
}
_runtime_filters.push_back(runtime_filter);
}
return Status::OK();
Expand Down Expand Up @@ -104,5 +107,4 @@ class RuntimeFilterSlotsCross {
std::vector<IRuntimeFilter*> _runtime_filters;
};

using VRuntimeFilterSlotsCross = RuntimeFilterSlotsCross<vectorized::VExprContext>;
} // namespace doris

0 comments on commit f02bec8

Please sign in to comment.