diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 380576fdc1591bd..b5d68f7c0188692 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -322,7 +322,6 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
     if (0 == _ranges.size()) {
         return;
     }
-    _file_params.file_type = _ranges[0].file_type;
     _file_params.format_type = _ranges[0].format_type;
     _file_params.src_tuple_id = _params.src_tuple_id;
     _file_params.dest_tuple_id = _params.dest_tuple_id;
@@ -336,6 +335,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
     for (int i = 0; i < _ranges.size(); ++i) {
         TFileRangeDesc file_range;
+        // TODO(cmy): In the previous implementation, file_type was set in _file_params,
+        // using _ranges[0].file_type.
+        // Later, this field was moved to TFileRangeDesc, but here we still only use
+        // _ranges[0]'s file_type.
+        // Because I don't know whether the other ranges have this field set, keep the
+        // behavior the same as before.
+        file_range.file_type = _ranges[0].file_type;
         file_range.load_id = _ranges[i].load_id;
         file_range.path = _ranges[i].path;
         file_range.start_offset = _ranges[i].start_offset;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3f19d489d9ae7a9..d3aecf2351b9b0e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -676,6 +676,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         query_ctx->query_id = query_id;
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
+
+        // set file scan range params
+        if (params.__isset.file_scan_params) {
+            query_ctx->file_scan_range_params_map = params.file_scan_params;
+        }
+
         query_ctx->coord_addr = params.coord;
         LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
                   << " coord_addr " << query_ctx->coord_addr
@@ -766,8 +772,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     cur_span->SetAttribute("query_id", print_id(params.params.query_id));
     cur_span->SetAttribute("instance_id", print_id(params.params.fragment_instance_id));
 
-    VLOG_ROW << "exec_plan_fragment params is "
-             << apache::thrift::ThriftDebugString(params).c_str();
+    LOG(INFO) << "exec_plan_fragment params is "
+              << apache::thrift::ThriftDebugString(params).c_str();
     // sometimes the TExecPlanFragmentParams debug string is too long and glog
     // will truncate the log line, so print query options separately for debugging purposes
     VLOG_ROW << "query options is "
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8ed0150db5a40f3..c7d57629a2a1796 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -197,6 +197,8 @@ class QueryContext {
 
     std::vector<TUniqueId> fragment_ids;
 
+    std::map<int, TFileScanRangeParams> file_scan_range_params_map;
+
 private:
     ExecEnv* _exec_env;
     vectorized::VecDateTimeValue _start_time;
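Every reader touched below repeats the same compatibility fallback: a new FE writes file_type (and compress_type) on each TFileRangeDesc, while an old FE only writes them on the shared TFileScanRangeParams. A minimal sketch of the rule, assuming only the thrift-generated types; the helper name resolve_file_type is hypothetical and not part of the patch:

```cpp
#include <gen_cpp/PlanNodes_types.h> // thrift-generated TFileRangeDesc / TFileScanRangeParams

namespace doris {

// Prefer the per-range value when the sender set it; otherwise fall back to
// the deprecated per-node value. Thrift `optional` fields carry an __isset
// flag that is true only when the field was explicitly set.
TFileType::type resolve_file_type(const TFileRangeDesc& range,
                                  const TFileScanRangeParams& params) {
    if (range.__isset.file_type) {
        return range.file_type; // new FE: the field lives on the range
    }
    return params.file_type; // old FE: the field still lives on the shared params
}

} // namespace doris
```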
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 9da29ed6dc10acc..063ea7a92210da5 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -88,7 +88,12 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
           _io_ctx(io_ctx) {
     _file_format_type = _params.format_type;
     _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
-    _file_compress_type = _params.compress_type;
+    if (_range.__isset.compress_type) {
+        // for compatibility
+        _file_compress_type = _range.compress_type;
+    } else {
+        _file_compress_type = _params.compress_type;
+    }
     _size = _range.size;
     _text_converter.reset(new (std::nothrow) TextConverter('\\', _array_delimiter[0]));
 
@@ -111,7 +116,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
           _decompressor(nullptr),
           _io_ctx(io_ctx) {
     _file_format_type = _params.format_type;
-    _file_compress_type = _params.compress_type;
+    if (_range.__isset.compress_type) {
+        // for compatibility
+        _file_compress_type = _range.compress_type;
+    } else {
+        _file_compress_type = _params.compress_type;
+    }
     _size = _range.size;
     _init_system_properties();
     _init_file_description();
@@ -120,7 +130,12 @@
 CsvReader::~CsvReader() = default;
 
 void CsvReader::_init_system_properties() {
-    _system_properties.system_type = _params.file_type;
+    if (_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _range.file_type;
+    } else {
+        _system_properties.system_type = _params.file_type;
+    }
     _system_properties.properties = _params.properties;
     _system_properties.hdfs_params = _params.hdfs_params;
     if (_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 7d17f650c6615b6..94c376ae697ae8a 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -135,7 +135,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams
 }
 
 void NewJsonReader::_init_system_properties() {
-    _system_properties.system_type = _params.file_type;
+    if (_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _range.file_type;
+    } else {
+        _system_properties.system_type = _params.file_type;
+    }
     _system_properties.properties = _params.properties;
     _system_properties.hdfs_params = _params.hdfs_params;
     if (_params.__isset.broker_addresses) {
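Moving compress_type onto each TFileRangeDesc also means one scan can mix files with different codecs, because the FE now infers the codec per path (Util.getOrInferCompressType) instead of once per node. A hedged sketch of what suffix-based inference looks like; the helper name and the exact TFileCompressType enum values are assumptions, not taken from this patch:

```cpp
#include <string>

#include <gen_cpp/PlanNodes_types.h> // TFileCompressType

namespace doris {

// Hypothetical per-file codec inference, in the spirit of the FE-side helper.
TFileCompressType::type infer_compress_type(const std::string& path) {
    auto ends_with = [&path](const std::string& suffix) {
        return path.size() >= suffix.size() &&
               path.compare(path.size() - suffix.size(), suffix.size(), suffix) == 0;
    };
    if (ends_with(".gz")) return TFileCompressType::GZ;
    if (ends_with(".bz2")) return TFileCompressType::BZ2;
    if (ends_with(".lz4")) return TFileCompressType::LZ4FRAME;
    return TFileCompressType::PLAIN; // no recognized suffix: treat as plain
}

} // namespace doris
```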
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 7f87f77590d553b..001d32b821e2b1f 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -181,7 +181,9 @@ void OrcReader::_collect_profile_on_close() {
         COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
         COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
         COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
-        COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time);
+        COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time);
+        COUNTER_UPDATE(_orc_profile.init_column_time, _statistics.init_column_time);
+        COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
         COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
         COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
     }
@@ -200,7 +202,11 @@ void OrcReader::_init_profile() {
         _orc_profile.read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES);
         _orc_profile.column_read_time = ADD_CHILD_TIMER(_profile, "ColumnReadTime", orc_profile);
         _orc_profile.get_batch_time = ADD_CHILD_TIMER(_profile, "GetBatchTime", orc_profile);
-        _orc_profile.parse_meta_time = ADD_CHILD_TIMER(_profile, "ParseMetaTime", orc_profile);
+        _orc_profile.create_reader_time =
+                ADD_CHILD_TIMER(_profile, "CreateReaderTime", orc_profile);
+        _orc_profile.init_column_time = ADD_CHILD_TIMER(_profile, "InitColumnTime", orc_profile);
+        _orc_profile.set_fill_column_time =
+                ADD_CHILD_TIMER(_profile, "SetFillColumnTime", orc_profile);
         _orc_profile.decode_value_time = ADD_CHILD_TIMER(_profile, "DecodeValueTime", orc_profile);
         _orc_profile.decode_null_map_time =
                 ADD_CHILD_TIMER(_profile, "DecodeNullMapTime", orc_profile);
@@ -254,9 +260,14 @@ Status OrcReader::init_reader(
                                                    not_single_slot_filter_conjuncts->end());
     }
     _obj_pool = std::make_shared<ObjectPool>();
-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
-    RETURN_IF_ERROR(_create_file_reader());
-    RETURN_IF_ERROR(_init_read_columns());
+    {
+        SCOPED_RAW_TIMER(&_statistics.create_reader_time);
+        RETURN_IF_ERROR(_create_file_reader());
+    }
+    {
+        SCOPED_RAW_TIMER(&_statistics.init_column_time);
+        RETURN_IF_ERROR(_init_read_columns());
+    }
     return Status::OK();
 }
 
@@ -646,7 +657,7 @@ Status OrcReader::set_fill_columns(
         const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                 partition_columns,
         const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
-    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+    SCOPED_RAW_TIMER(&_statistics.set_fill_column_time);
     // std::unordered_map<column_name, std::pair<col_id, slot_id>>
     std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
@@ -864,7 +875,12 @@ void OrcReader::_init_bloom_filter(
 }
 
 void OrcReader::_init_system_properties() {
-    _system_properties.system_type = _scan_params.file_type;
+    if (_scan_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _scan_range.file_type;
+    } else {
+        _system_properties.system_type = _scan_params.file_type;
+    }
     _system_properties.properties = _scan_params.properties;
     _system_properties.hdfs_params = _scan_params.hdfs_params;
     if (_scan_params.__isset.broker_addresses) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 85a73e0e21effd5..d05fb4c47822699 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -126,7 +126,9 @@ class OrcReader : public GenericReader {
         int64_t fs_read_bytes = 0;
         int64_t column_read_time = 0;
         int64_t get_batch_time = 0;
-        int64_t parse_meta_time = 0;
+        int64_t create_reader_time = 0;
+        int64_t init_column_time = 0;
+        int64_t set_fill_column_time = 0;
         int64_t decode_value_time = 0;
         int64_t decode_null_map_time = 0;
     };
@@ -200,7 +202,9 @@ class OrcReader : public GenericReader {
         RuntimeProfile::Counter* read_bytes;
         RuntimeProfile::Counter* column_read_time;
         RuntimeProfile::Counter* get_batch_time;
-        RuntimeProfile::Counter* parse_meta_time;
+        RuntimeProfile::Counter* create_reader_time;
+        RuntimeProfile::Counter* init_column_time;
+        RuntimeProfile::Counter* set_fill_column_time;
         RuntimeProfile::Counter* decode_value_time;
         RuntimeProfile::Counter* decode_null_map_time;
     };
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index fed33b6d28d5485..ed4aa6e6ae3777c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -269,7 +269,12 @@ Status ParquetReader::open() {
 }
 
 void ParquetReader::_init_system_properties() {
-    _system_properties.system_type = _scan_params.file_type;
+    if (_scan_range.__isset.file_type) {
+        // for compatibility
+        _system_properties.system_type = _scan_range.file_type;
+    } else {
+        _system_properties.system_type = _scan_params.file_type;
+    }
     _system_properties.properties = _scan_params.properties;
     _system_properties.hdfs_params = _scan_params.hdfs_params;
     if (_scan_params.__isset.broker_addresses) {
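The ORC changes above split the old ParseMetaTime counter into CreateReaderTime, InitColumnTime, and SetFillColumnTime. This works because SCOPED_RAW_TIMER is a scope guard that adds the elapsed nanoseconds to the given int64_t when the block exits, so wrapping each phase in its own braces attributes its cost separately. A simplified stand-in for the macro, not Doris's actual implementation:

```cpp
#include <chrono>
#include <cstdint>

// Accumulates elapsed nanoseconds into *target on scope exit, mimicking the
// behavior the patch relies on when it splits one timer into three blocks.
class ScopedRawTimer {
public:
    explicit ScopedRawTimer(int64_t* target)
            : _target(target), _start(std::chrono::steady_clock::now()) {}
    ~ScopedRawTimer() {
        *_target += std::chrono::duration_cast<std::chrono::nanoseconds>(
                            std::chrono::steady_clock::now() - _start)
                            .count();
    }

private:
    int64_t* _target;
    std::chrono::steady_clock::time_point _start;
};
```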
diff --git a/be/src/vec/exec/scan/avro_jni_reader.cpp b/be/src/vec/exec/scan/avro_jni_reader.cpp
index 5d1ef40cbc87aca..ffca8682c488009 100644
--- a/be/src/vec/exec/scan/avro_jni_reader.cpp
+++ b/be/src/vec/exec/scan/avro_jni_reader.cpp
@@ -74,7 +74,13 @@ Status AvroJNIReader::init_fetch_table_reader(
         index++;
     }
 
-    TFileType::type type = _params.file_type;
+    TFileType::type type;
+    if (_range.__isset.file_type) {
+        // for compatibility
+        type = _range.file_type;
+    } else {
+        type = _params.file_type;
+    }
     std::map<std::string, std::string> required_param = {
             {"required_fields", required_fields.str()},
             {"columns_types", columns_types.str()},
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 127539d26e4b14c..0b8d3185519f2eb 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -49,6 +49,12 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status NewFileScanNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(VScanNode::prepare(state));
+    if (state->get_query_ctx() != nullptr &&
+        state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
+        TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[id()];
+        _input_tuple_id = params.src_tuple_id;
+        _output_tuple_id = params.dest_tuple_id;
+    }
     return Status::OK();
 }
 
@@ -74,7 +80,10 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
         _scan_ranges.shrink_to_fit();
         LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
     }
-    if (scan_ranges.size() > 0) {
+    if (scan_ranges.size() > 0 &&
+        scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
+        // For compatibility: in the new implementation, the tuple ids are set
+        // in the prepare phase.
         _input_tuple_id =
                 scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
         _output_tuple_id =
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b95d2aa773b0645..b00fc492b296ec8 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -89,7 +89,6 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
                            const TFileScanRange& scan_range, RuntimeProfile* profile,
                            ShardedKVCache* kv_cache)
         : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
-          _params(scan_range.params),
           _ranges(scan_range.ranges),
           _next_range(0),
          _cur_reader(nullptr),
@@ -99,6 +98,14 @@
     if (scan_range.params.__isset.strict_mode) {
         _strict_mode = scan_range.params.strict_mode;
     }
+
+    if (state->get_query_ctx() != nullptr &&
+        state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 0) {
+        _params = &(state->get_query_ctx()->file_scan_range_params_map[parent->id()]);
+    } else {
+        CHECK(scan_range.__isset.params);
+        _params = &(scan_range.params);
+    }
 }
 
 Status VFileScanner::prepare(
@@ -133,13 +140,13 @@
                                        std::vector<TupleId>({_input_tuple_desc->id()}),
                                        std::vector<bool>({false})));
 
     // prepare pre filters
-    if (_params.__isset.pre_filter_exprs_list) {
+    if (_params->__isset.pre_filter_exprs_list) {
         RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_trees(
-                _params.pre_filter_exprs_list, _pre_conjunct_ctxs));
-    } else if (_params.__isset.pre_filter_exprs) {
+                _params->pre_filter_exprs_list, _pre_conjunct_ctxs));
+    } else if (_params->__isset.pre_filter_exprs) {
         VExprContextSPtr context;
         RETURN_IF_ERROR(
-                doris::vectorized::VExpr::create_expr_tree(_params.pre_filter_exprs, context));
+                doris::vectorized::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
         _pre_conjunct_ctxs.emplace_back(context);
     }
 
@@ -569,9 +576,10 @@ Status VFileScanner::_get_next_reader() {
 
     // create reader for specific format
     Status init_status;
-    TFileFormatType::type format_type = _params.format_type;
+    TFileFormatType::type format_type = _params->format_type;
     // JNI reader can only push down column value range
-    bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI;
+    bool push_down_predicates =
+            !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
     if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
         range.table_format_params.table_format_type == "hudi") {
         if (range.table_format_params.hudi_params.delta_logs.empty()) {
@@ -598,9 +606,9 @@ Status VFileScanner::_get_next_reader() {
                             ->init_reader(_colname_to_value_range);
         } else if (range.__isset.table_format_params &&
                    range.table_format_params.table_format_type == "hudi") {
-            _cur_reader =
-                    HudiJniReader::create_unique(_params, range.table_format_params.hudi_params,
-                                                 _file_slot_descs, _state, _profile);
+            _cur_reader = HudiJniReader::create_unique(*_params,
+                                                       range.table_format_params.hudi_params,
+                                                       _file_slot_descs, _state, _profile);
             init_status =
                     ((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
         }
         break;
     }
     case TFileFormatType::FORMAT_PARQUET: {
         std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
-                _profile, _params, range, _state->query_options().batch_size,
+                _profile, *_params, range, _state->query_options().batch_size,
                 const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state,
-                ExecEnv::GetInstance()->file_meta_cache(),
+                config::max_external_file_meta_cache_num <= 0 ? nullptr
+                        : ExecEnv::GetInstance()->file_meta_cache(),
                 _state->query_options().enable_parquet_lazy_mat);
         {
             SCOPED_TIMER(_open_reader_timer);
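The ternary added to the ParquetReader construction above is a behavior switch, not just plumbing. A sketch of the guard, assuming only what the patch shows (a non-positive max_external_file_meta_cache_num disables caching, and a null cache pointer makes the reader parse each footer directly):

```cpp
// Pass the process-wide meta cache only when it is enabled; a null pointer
// tells ParquetReader to skip footer caching entirely.
auto* meta_cache = config::max_external_file_meta_cache_num <= 0
        ? nullptr
        : ExecEnv::GetInstance()->file_meta_cache();
```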
@@ -627,7 +636,7 @@ Status VFileScanner::_get_next_reader() {
                    range.table_format_params.table_format_type == "iceberg") {
             std::unique_ptr<IcebergTableReader> iceberg_reader =
                     IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
-                                                      _state, _params, range, _kv_cache,
+                                                      _state, *_params, range, _kv_cache,
                                                       _io_ctx.get());
             init_status = iceberg_reader->init_reader(
                     _file_col_names, _col_id_name_map, _colname_to_value_range,
@@ -649,7 +658,7 @@ Status VFileScanner::_get_next_reader() {
     }
     case TFileFormatType::FORMAT_ORC: {
         std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
-                _profile, _state, _params, range, _state->query_options().batch_size,
+                _profile, _state, *_params, range, _state->query_options().batch_size,
                 _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
         if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
             _push_down_conjuncts.resize(_conjuncts.size());
@@ -662,7 +671,7 @@ Status VFileScanner::_get_next_reader() {
                    range.table_format_params.table_format_type == "transactional_hive") {
             std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
                     TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
-                                                           _state, _params, range,
+                                                           _state, *_params, range,
                                                            _io_ctx.get());
             init_status = tran_orc_reader->init_reader(
                     _file_col_names, _colname_to_value_range, _push_down_conjuncts,
@@ -686,13 +695,13 @@ Status VFileScanner::_get_next_reader() {
     case TFileFormatType::FORMAT_CSV_LZOP:
     case TFileFormatType::FORMAT_CSV_DEFLATE:
     case TFileFormatType::FORMAT_PROTO: {
-        _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, _params, range,
+        _cur_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                _file_slot_descs, _io_ctx.get());
         init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
         break;
     }
     case TFileFormatType::FORMAT_JSON: {
-        _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, _params, range,
+        _cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
                                                    _file_slot_descs, &_scanner_eof,
                                                    _io_ctx.get(), _is_dynamic_schema);
         init_status =
@@ -700,13 +709,14 @@ Status VFileScanner::_get_next_reader() {
         break;
     }
     case TFileFormatType::FORMAT_AVRO: {
-        _cur_reader = AvroJNIReader::create_unique(_state, _profile, _params, _file_slot_descs);
+        _cur_reader =
+                AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs);
         init_status = ((AvroJNIReader*)(_cur_reader.get()))
                               ->init_fetch_table_reader(_colname_to_value_range);
         break;
     }
     default:
-        return Status::InternalError("Not supported file format: {}", _params.format_type);
+        return Status::InternalError("Not supported file format: {}", _params->format_type);
     }
 
     if (init_status.is<ErrorCode::END_OF_FILE>()) {
@@ -810,8 +820,8 @@ Status VFileScanner::_init_expr_ctxes() {
         }
     }
 
-    _num_of_columns_from_file = _params.num_of_columns_from_file;
-    for (const auto& slot_info : _params.required_slots) {
+    _num_of_columns_from_file = _params->num_of_columns_from_file;
+    for (const auto& slot_info : _params->required_slots) {
         auto slot_id = slot_info.slot_id;
         auto it = full_src_slot_map.find(slot_id);
         if (it == std::end(full_src_slot_map)) {
@@ -843,8 +853,8 @@ Status VFileScanner::_init_expr_ctxes() {
             continue;
         }
         vectorized::VExprContextSPtr ctx;
-        auto it = _params.default_value_of_src_slot.find(slot_desc->id());
-        if (it != std::end(_params.default_value_of_src_slot)) {
+        auto it = _params->default_value_of_src_slot.find(slot_desc->id());
+        if (it != std::end(_params->default_value_of_src_slot)) {
             if (!it->second.nodes.empty()) {
                 RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
                 RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
@@ -857,14 +867,14 @@ Status VFileScanner::_init_expr_ctxes() {
 
     if (_is_load) {
         // the following dest expr map is only for load tasks.
-        bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
+        bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
         int idx = 0;
         for (auto slot_desc : _output_tuple_desc->slots()) {
             if (!slot_desc->is_materialized()) {
                 continue;
             }
-            auto it = _params.expr_of_dest_slot.find(slot_desc->id());
-            if (it == std::end(_params.expr_of_dest_slot)) {
+            auto it = _params->expr_of_dest_slot.find(slot_desc->id());
+            if (it == std::end(_params->expr_of_dest_slot)) {
                 return Status::InternalError("No expr for dest slot, id={}, name={}",
                                              slot_desc->id(), slot_desc->col_name());
             }
@@ -879,8 +889,8 @@ Status VFileScanner::_init_expr_ctxes() {
             _dest_slot_name_to_idx[slot_desc->col_name()] = idx++;
 
             if (has_slot_id_map) {
-                auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
-                if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
+                auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
+                if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
                     _src_slot_descs_order_by_dest.emplace_back(nullptr);
                 } else {
                     auto _src_slot_it = full_src_slot_map.find(it1->second);
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 0b1d7a558a02481..9a16d056edba9d1 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -89,7 +89,7 @@ class VFileScanner : public VScanner {
 protected:
     std::unique_ptr<TextConverter> _text_converter;
-    const TFileScanRangeParams& _params;
+    const TFileScanRangeParams* _params;
     const std::vector<TFileRangeDesc>& _ranges;
     int _next_range;
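The header change above (reference to pointer for _params) is forced by the constructor change: a reference member must be bound in the initializer list, before the query-context lookup can run, while a pointer can be assigned in the constructor body after choosing a source. A generic illustration of the constraint, with toy types rather than Doris code:

```cpp
struct Shared {
    int v = 0;
};

struct UsesRef {
    const Shared& s; // must be bound unconditionally, in the initializer list
    explicit UsesRef(const Shared& in) : s(in) {}
};

struct UsesPtr {
    const Shared* s = nullptr; // can be chosen in the constructor body
    UsesPtr(const Shared* from_query_ctx, const Shared& from_range) {
        s = from_query_ctx != nullptr ? from_query_ctx : &from_range;
    }
};
```

Lifetime still holds either way: the params map lives in QueryContext, which outlives every scanner of that query, so the stored pointer stays valid.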
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 09ed6361e5a8ff2..5b6238bf14ce5ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -180,11 +180,6 @@ private void updateRequiredSlots() throws UserException {
         }
         // Update required slots and column_idxs in scanRangeLocations.
         setColumnPositionMapping();
-        for (TScanRangeLocations location : scanRangeLocations) {
-            TFileScanRangeParams rangeParams = location.getScanRange().getExtScanRange().getFileScanRange().getParams();
-            rangeParams.setRequiredSlots(params.getRequiredSlots());
-            rangeParams.setColumnIdxs(params.getColumnIdxs());
-        }
     }
 
     @Override
@@ -227,6 +222,10 @@ private void setColumnPositionMapping()
         params.setColumnIdxs(columnIdxs);
     }
 
+    public TFileScanRangeParams getFileScanRangeParams() {
+        return params;
+    }
+
     @Override
     public void createScanRangeLocations() throws UserException {
         long start = System.currentTimeMillis();
@@ -237,41 +236,24 @@ public void createScanRangeLocations() throws UserException {
         }
         TFileFormatType fileFormatType = getFileFormatType();
         params.setFormatType(fileFormatType);
+        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
+        if (isCsvOrJson) {
+            params.setFileAttributes(getFileAttributes());
+        }
+
+        Map<String, String> locationProperties = getLocationProperties();
+        // for JNI, only need to set properties
+        if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+            params.setProperties(locationProperties);
+        }
+
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
-            TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params);
             FileSplit fileSplit = (FileSplit) split;
             TFileType locationType = getLocationType(fileSplit.getPath().toString());
-            scanRangeParams.setFileType(locationType);
-            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
-            scanRangeParams.setCompressType(fileCompressType);
-            boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
-            if (isCsvOrJson) {
-                scanRangeParams.setFileAttributes(getFileAttributes());
-            }
-
-            // set hdfs params for hdfs file type.
-            Map<String, String> locationProperties = getLocationProperties();
-            if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
-                scanRangeParams.setProperties(locationProperties);
-            }
-            if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
-                String fsName = getFsName(fileSplit);
-                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
-                tHdfsParams.setFsName(fsName);
-                scanRangeParams.setHdfsParams(tHdfsParams);
-
-                if (locationType == TFileType.FILE_BROKER) {
-                    FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                    if (broker == null) {
-                        throw new UserException("No alive broker.");
-                    }
-                    scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
-                }
-            }
-
-            TScanRangeLocations curLocations = newLocations(scanRangeParams);
+            setLocationPropertiesIfNecessary(locationType, fileSplit, locationProperties);
+            TScanRangeLocations curLocations = newLocations();
 
             // If fileSplit has partition values, use the values collected from hive partitions.
             // Otherwise, use the values in file path.
             boolean isACID = false;
@@ -285,6 +267,8 @@ public void createScanRangeLocations() throws UserException {
 
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath,
                     pathPartitionKeys, locationType);
+            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
+            rangeDesc.setCompressType(fileCompressType);
             if (isACID) {
                 HiveSplit hiveSplit = (HiveSplit) split;
                 hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -332,11 +316,31 @@ public void createScanRangeLocations() throws UserException {
                 scanRangeLocations.size(), (System.currentTimeMillis() - start));
     }
 
-    private TScanRangeLocations newLocations(TFileScanRangeParams params) {
+    private void setLocationPropertiesIfNecessary(TFileType locationType, FileSplit fileSplit,
+            Map<String, String> locationProperties) throws UserException {
+        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+            if (!params.isSetHdfsParams()) {
+                String fsName = getFsName(fileSplit);
+                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
+                tHdfsParams.setFsName(fsName);
+                params.setHdfsParams(tHdfsParams);
+            }
+
+            if (locationType == TFileType.FILE_BROKER && !params.isSetBrokerAddresses()) {
+                FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                if (broker == null) {
+                    throw new UserException("No alive broker.");
+                }
+                params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
+            }
+        } else if (locationType == TFileType.FILE_S3 && !params.isSetProperties()) {
+            params.setProperties(locationProperties);
+        }
+    }
+
+    private TScanRangeLocations newLocations() {
         // Generate one file scan range
         TFileScanRange fileScanRange = new TFileScanRange();
-        fileScanRange.setParams(params);
-
         // Scan range
         TExternalScanRange externalScanRange = new TExternalScanRange();
         externalScanRange.setFileScanRange(fileScanRange);
@@ -361,6 +365,7 @@ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> col
         rangeDesc.setColumnsFromPath(columnsFromPath);
         rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
 
+        rangeDesc.setFileType(locationType);
         if (locationType == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
         } else if (locationType == TFileType.FILE_S3
@@ -424,3 +429,4 @@ protected static Optional<TFileType> getTFileType(String location) {
         }
     }
+
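The FileQueryScanNode change above is the heart of the patch: the FE used to clone TFileScanRangeParams for every split, while now the node-level params are built once and only per-file facts (path, offsets, file_type, compress_type) travel with each TFileRangeDesc. Roughly, the request a BE now receives has this shape, sketched with thrift-generated C++ setters; all values are illustrative:

```cpp
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>

doris::TExecPlanFragmentParams make_example_request() {
    doris::TFileScanRangeParams shared_params; // built once per scan node
    shared_params.__set_format_type(doris::TFileFormatType::FORMAT_PARQUET);

    doris::TFileRangeDesc range; // built once per split
    range.__set_path("s3://bucket/part-000.parquet"); // illustrative path
    range.__set_start_offset(0);
    range.__set_size(128L * 1024 * 1024);
    range.__set_file_type(doris::TFileType::FILE_S3);
    range.__set_compress_type(doris::TFileCompressType::PLAIN);
    // (each TFileRangeDesc then travels inside its TFileScanRange; wiring omitted)

    doris::TExecPlanFragmentParams rpc;
    rpc.__set_file_scan_params({{/* scan node id */ 0, shared_params}});
    return rpc;
}
```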
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 180e3e1c75fef75..a81bd1984bee95c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -61,6 +61,7 @@
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.planner.external.ExternalScanNode;
+import org.apache.doris.planner.external.FileQueryScanNode;
 import org.apache.doris.planner.external.FileScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -83,6 +84,7 @@
 import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
+import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPipelineFragmentParams;
@@ -165,6 +167,9 @@ public class Coordinator {
     // copied from TQueryExecRequest; constant across all fragments
     private final TDescriptorTable descTable;
 
+    // scan node id -> TFileScanRangeParams
+    private Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap = Maps.newHashMap();
+
     // Why do we use query global?
     // When `NOW()` function is in sql, we need only one now(),
     // but, we execute `NOW()` distributed.
@@ -1937,6 +1942,11 @@ private void computeScanRangeAssignment() throws Exception {
                     k -> Sets.newHashSet());
             scanNodeIds.add(scanNode.getId().asInt());
 
+            if (scanNode instanceof FileQueryScanNode) {
+                fileScanRangeParamsMap.put(
+                        scanNode.getId().asInt(), ((FileQueryScanNode) scanNode).getFileScanRangeParams());
+            }
+
             FragmentScanRangeAssignment assignment
                     = fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
             boolean fragmentContainsColocateJoin = isColocateFragment(scanNode.getFragment(),
@@ -2294,7 +2304,7 @@ public boolean isDone() {
         return executionProfile.isAllInstancesDone();
     }
 
-    // map from an impalad host address to the per-node assigned scan ranges;
+    // map from a BE host address to the per-node assigned scan ranges;
     // records scan range assignment for a single fragment
     class FragmentScanRangeAssignment
             extends HashMap<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> {
@@ -2580,6 +2590,7 @@ public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFr
          */
         public void unsetFields() {
             this.rpcParams.unsetDescTbl();
+            this.rpcParams.unsetFileScanParams();
             this.rpcParams.unsetCoord();
             this.rpcParams.unsetQueryGlobals();
             this.rpcParams.unsetResourceInfo();
@@ -2731,6 +2742,7 @@ public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
          */
         public void unsetFields() {
             this.rpcParams.unsetDescTbl();
+            this.rpcParams.unsetFileScanParams();
             this.rpcParams.unsetCoord();
             this.rpcParams.unsetQueryGlobals();
             this.rpcParams.unsetResourceInfo();
@@ -3148,6 +3160,8 @@ List<TExecPlanFragmentParams> toThrift(int backendNum) {
                         rf.getFilterId().asInt(), rf.toThrift());
                 }
             }
+
+            params.setFileScanParams(fileScanRangeParamsMap);
             paramsList.add(params);
         }
         return paramsList;
@@ -3186,6 +3200,8 @@ Map<TNetworkAddress, TPipelineFragmentParams> toTPipelineParams(int backendNum)
             if (tWorkloadGroups != null) {
                 params.setWorkloadGroups(tWorkloadGroups);
             }
+
+            params.setFileScanParams(fileScanRangeParamsMap);
             res.put(instanceExecParam.host, params);
         }
         TPipelineFragmentParams params = res.get(instanceExecParam.host);
@@ -3420,3 +3436,4 @@ public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index ddc28de961cfba3..49961afcbc0f012 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -467,7 +467,6 @@ private void fillColumns(InternalService.PFetchTableSchemaResult result)
     private PFetchTableSchemaRequest getFetchTableStructureRequest() throws AnalysisException, TException {
         // set TFileScanRangeParams
         TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
-        fileScanRangeParams.setFileType(getTFileType());
         fileScanRangeParams.setFormatType(fileFormatType);
         fileScanRangeParams.setProperties(locationProperties);
         fileScanRangeParams.setFileAttributes(getFileAttributes());
@@ -491,9 +490,10 @@ private PFetchTableSchemaRequest getFetchTableStructureRequest() throws Analysis
             throw new AnalysisException("Can not get first file, please check uri.");
         }
 
-        fileScanRangeParams.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
         // set TFileRangeDesc
         TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
+        fileRangeDesc.setFileType(getTFileType());
+        fileRangeDesc.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
         fileRangeDesc.setPath(firstFile.getPath());
         fileRangeDesc.setStartOffset(0);
         fileRangeDesc.setSize(firstFile.getSize());
@@ -508,3 +508,4 @@ private PFetchTableSchemaRequest getFetchTableStructureRequest() throws Analysis
         }
     }
+
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 40457481b7bef47..95dc716b7ba1eb7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -420,6 +420,9 @@ struct TExecPlanFragmentParams {
   22: optional list<Types.TUniqueId> instances_sharing_hash_table;
   23: optional string table_name;
+
+  // scan node id -> scan range params, only for external file scan
+  24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
 }
 
 struct TExecPlanFragmentParamsList {
@@ -632,6 +635,8 @@ struct TPipelineFragmentParams {
   26: optional list<TPipelineWorkloadGroup> workload_groups
   27: optional TTxnParams txn_conf
   28: optional string table_name
+  // scan node id -> scan range params, only for external file scan
+  29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
 }
 
 struct TPipelineFragmentParamsList {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1b74b0cc70a8c00..9dacaac8893e5d9 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -335,8 +335,10 @@ struct TTableFormatFileDesc {
 }
 
 struct TFileScanRangeParams {
+    // deprecated, moved to TFileRangeDesc
     1: optional Types.TFileType file_type;
     2: optional TFileFormatType format_type;
+    // deprecated, moved to TFileRangeDesc
     3: optional TFileCompressType compress_type;
     // If this is for a load job, src points to the source table and dest points to the doris table.
     // If this is for a query, only dest_tuple_id is set, including both file slots and partition slots.
@@ -395,6 +397,8 @@ struct TFileRangeDesc {
     8: optional TTableFormatFileDesc table_format_params
     // Use modification time to determine whether the file has changed
     9: optional i64 modification_time
+    10: optional Types.TFileType file_type;
+    11: optional TFileCompressType compress_type;
 }
 
 // TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.
 //   list<TFileRangeDesc>: file locations and ranges
 struct TFileScanRange {
     1: optional list<TFileRangeDesc> ranges
+    // If file_scan_params is set in TExecPlanFragmentParams, the BE will use that field;
+    // otherwise, it uses this one.
+    // file_scan_params in TExecPlanFragmentParams is always set for query requests, while the
+    // TFileScanRangeParams here is used for other requests, such as fetching a table schema for a tvf.
     2: optional TFileScanRangeParams params
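Putting the two thrift comments together, the BE-side resolution order is: query requests carry one TFileScanRangeParams per scan node in file_scan_params, while other requests (a tvf schema fetch, broker load) still embed params in the TFileScanRange itself. A hedged sketch of that order; the helper name is hypothetical, and the key type is shown as i32 for brevity:

```cpp
#include <cstdint>
#include <map>

#include <gen_cpp/PlanNodes_types.h>

namespace doris {

// Resolution order described by the thrift comments above: per-query map
// first, per-scan-range params as the fallback.
const TFileScanRangeParams& resolve_params(
        const std::map<int32_t, TFileScanRangeParams>& file_scan_params,
        int32_t scan_node_id, const TFileScanRange& scan_range) {
    auto it = file_scan_params.find(scan_node_id);
    if (it != file_scan_params.end()) {
        return it->second; // new FE, query path
    }
    return scan_range.params; // old FE, or a non-query request
}

} // namespace doris
```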