Skip to content

Commit

Permalink
[chore](conf)remove unused doris_max_scan_key_num and max_send_batch_…
Browse files Browse the repository at this point in the history
…parallelism_per_job conf (apache#39219)

Use SessionVariable to control it, rather than be conf.

update doc apache/doris-website#1001
  • Loading branch information
Mryange authored Aug 16, 2024
1 parent 8306a21 commit 7a09202
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 43 deletions.
13 changes: 0 additions & 13 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ DEFINE_mInt32(doris_scanner_row_num, "16384");
// single read execute fragment row bytes
DEFINE_mInt32(doris_scanner_row_bytes, "10485760");
DEFINE_mInt32(min_bytes_in_scanner_queue, "67108864");
// number of max scan keys
DEFINE_mInt32(doris_max_scan_key_num, "48");
// the max number of push down values of a single column.
// if exceed, no conditions will be pushed down for that column.
DEFINE_mInt32(max_pushdown_conditions_per_column, "1024");
// (Advanced) Maximum size of per-query receive-side buffer
DEFINE_mInt32(exchg_node_buffer_size_bytes, "20485760");
DEFINE_mInt32(exchg_buffer_queue_capacity_factor, "64");
Expand Down Expand Up @@ -810,14 +805,6 @@ DEFINE_Int32(load_stream_eagain_wait_seconds, "600");
DEFINE_Int32(load_stream_flush_token_max_tasks, "15");
// max wait flush token time in load stream
DEFINE_Int32(load_stream_max_wait_flush_token_time_ms, "600000");

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
DEFINE_mInt32(max_send_batch_parallelism_per_job, "5");
DEFINE_Validator(max_send_batch_parallelism_per_job,
[](const int config) -> bool { return config >= 1; });

// number of send batch thread pool size
DEFINE_Int32(send_batch_thread_pool_thread_num, "64");
// number of send batch thread pool queue size
Expand Down
11 changes: 0 additions & 11 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ DECLARE_mInt32(doris_scanner_row_num);
// single read execute fragment row bytes
DECLARE_mInt32(doris_scanner_row_bytes);
DECLARE_mInt32(min_bytes_in_scanner_queue);
// number of max scan keys
DECLARE_mInt32(doris_max_scan_key_num);
// the max number of push down values of a single column.
// if exceed, no conditions will be pushed down for that column.
DECLARE_mInt32(max_pushdown_conditions_per_column);
// (Advanced) Maximum size of per-query receive-side buffer
DECLARE_mInt32(exchg_node_buffer_size_bytes);
DECLARE_mInt32(exchg_buffer_queue_capacity_factor);
Expand Down Expand Up @@ -872,12 +867,6 @@ DECLARE_Int32(load_stream_eagain_wait_seconds);
DECLARE_Int32(load_stream_flush_token_max_tasks);
// max wait flush token time in load stream
DECLARE_Int32(load_stream_max_wait_flush_token_time_ms);

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
// if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job
DECLARE_mInt32(max_send_batch_parallelism_per_job);

// number of send batch thread pool size
DECLARE_Int32(send_batch_thread_pool_thread_num);
// number of send batch thread pool queue size
Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1393,13 +1393,9 @@ Status ScanOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState*
const TQueryOptions& query_options = state->query_options();
if (query_options.__isset.max_scan_key_num) {
_max_scan_key_num = query_options.max_scan_key_num;
} else {
_max_scan_key_num = config::doris_max_scan_key_num;
}
if (query_options.__isset.max_pushdown_conditions_per_column) {
_max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column;
} else {
_max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column;
}
// tnode.olap_scan_node.push_down_agg_type_opt field is deprecated
// Introduced a new field : tnode.push_down_agg_type_opt
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
std::unordered_map<std::string, int> _colname_to_slot_id;

// These two values are from query_options
int _max_scan_key_num;
int _max_pushdown_conditions_per_column;
int _max_scan_key_num = 48;
int _max_pushdown_conditions_per_column = 1024;

// If the query like select * from table limit 10; then the query should run in
// single scanner to avoid too many scanners which will cause lots of useless read.
Expand Down
4 changes: 1 addition & 3 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1077,10 +1077,8 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr

RETURN_IF_ERROR(index_channel->check_intolerable_failure());
}
int32_t send_batch_parallelism =
MIN(_send_batch_parallelism, config::max_send_batch_parallelism_per_job);
_send_batch_thread_pool_token = state->exec_env()->send_batch_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT, send_batch_parallelism);
ThreadPool::ExecutionMode::CONCURRENT, _send_batch_parallelism);

// start to send batch continually. this must be called after _init
if (bthread_start_background(&_sender_thread, nullptr, periodic_send_batch, (void*)this) != 0) {
Expand Down
14 changes: 4 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -951,11 +951,10 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = DEFAULT_TMP_STORAGE_ENGINE)
public String defaultTmpStorageEngine = "olap";

// -1 means unset, BE will use its config value
@VariableMgr.VarAttr(name = MAX_SCAN_KEY_NUM)
public int maxScanKeyNum = -1;
public int maxScanKeyNum = 48;
@VariableMgr.VarAttr(name = MAX_PUSHDOWN_CONDITIONS_PER_COLUMN)
public int maxPushdownConditionsPerColumn = -1;
public int maxPushdownConditionsPerColumn = 1024;
@VariableMgr.VarAttr(name = SHOW_HIDDEN_COLUMNS, flag = VariableMgr.SESSION_ONLY)
public boolean showHiddenColumns = false;

Expand Down Expand Up @@ -3585,13 +3584,8 @@ public TQueryOptions toThrift() {
tResult.setBatchSize(batchSize);
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);

if (maxScanKeyNum > -1) {
tResult.setMaxScanKeyNum(maxScanKeyNum);
}
if (maxPushdownConditionsPerColumn > -1) {
tResult.setMaxPushdownConditionsPerColumn(maxPushdownConditionsPerColumn);
}
tResult.setMaxScanKeyNum(maxScanKeyNum);
tResult.setMaxPushdownConditionsPerColumn(maxPushdownConditionsPerColumn);

tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
Expand Down

0 comments on commit 7a09202

Please sign in to comment.