Skip to content

Commit

Permalink
pass
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jul 12, 2023
1 parent fdf69e9 commit 7bbdc36
Show file tree
Hide file tree
Showing 17 changed files with 121 additions and 48 deletions.
7 changes: 6 additions & 1 deletion be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
if (0 == _ranges.size()) {
return;
}
_file_params.file_type = _ranges[0].file_type;
_file_params.format_type = _ranges[0].format_type;
_file_params.src_tuple_id = _params.src_tuple_id;
_file_params.dest_tuple_id = _params.dest_tuple_id;
Expand All @@ -337,6 +336,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&

for (int i = 0; i < _ranges.size(); ++i) {
TFileRangeDesc file_range;
// TODO(cmy): in previous implementation, the file_type is set in _file_params
// and it use _ranges[0].file_type.
// Later, this field is moved to TFileRangeDesc, but here we still only use _ranges[0]'s
// file_type.
// Because I don't know if othere range has this field, so just keep it same as before.
file_range.file_type = _ranges[0].file_type;
file_range.load_id = _ranges[i].load_id;
file_range.path = _ranges[i].path;
file_range.start_offset = _ranges[i].start_offset;
Expand Down
9 changes: 8 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,13 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx->query_id = query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
&(query_ctx->desc_tbl)));

// set file scan range params
if (params.__isset.file_scan_params) {
LOG(INFO) << "yy debug set file_scan_range_params_map size: " << params.file_scan_params.size();
query_ctx->file_scan_range_params_map = params.file_scan_params;
}

query_ctx->coord_addr = params.coord;
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
<< " coord_addr " << query_ctx->coord_addr
Expand Down Expand Up @@ -762,7 +769,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
cur_span->SetAttribute("query_id", print_id(params.params.query_id));
cur_span->SetAttribute("instance_id", print_id(params.params.fragment_instance_id));

VLOG_ROW << "exec_plan_fragment params is "
LOG(INFO) << "exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options seperately for debuggin purpose
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ class QueryContext {

std::vector<TUniqueId> fragment_ids;

std::map<int, TFileScanRangeParams> file_scan_range_params_map;

private:
ExecEnv* _exec_env;
vectorized::VecDateTimeValue _start_time;
Expand Down
1 change: 1 addition & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_request,
PFragmentRequestVersion version,
bool compact) {
LOG(INFO) << "yy debug _exec_plan_fragment_impl ser_request size: " << ser_request.size();
// Sometimes the BE do not receive the first heartbeat message and it receives request from FE
// If BE execute this fragment, it will core when it wants to get some property from master info.
if (ExecEnv::GetInstance()->master_info() == nullptr) {
Expand Down
21 changes: 18 additions & 3 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
_io_ctx(io_ctx) {
_file_format_type = _params.format_type;
_is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
_file_compress_type = _params.compress_type;
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_size = _range.size;

_text_converter.reset(new (std::nothrow) TextConverter('\\'));
Expand All @@ -111,7 +116,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
_decompressor(nullptr),
_io_ctx(io_ctx) {
_file_format_type = _params.format_type;
_file_compress_type = _params.compress_type;
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_size = _range.size;
_init_system_properties();
_init_file_description();
Expand All @@ -120,7 +130,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
CsvReader::~CsvReader() = default;

void CsvReader::_init_system_properties() {
_system_properties.system_type = _params.file_type;
if (_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _range.file_type;
} else {
_system_properties.system_type = _params.file_type;
}
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams
}

void NewJsonReader::_init_system_properties() {
_system_properties.system_type = _params.file_type;
if (_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _range.file_type;
} else {
_system_properties.system_type = _params.file_type;
}
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,12 @@ void OrcReader::_init_bloom_filter(
}

void OrcReader::_init_system_properties() {
_system_properties.system_type = _scan_params.file_type;
if (_scan_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _scan_range.file_type;
} else {
_system_properties.system_type = _scan_params.file_type;
}
_system_properties.properties = _scan_params.properties;
_system_properties.hdfs_params = _scan_params.hdfs_params;
if (_scan_params.__isset.broker_addresses) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,12 @@ Status ParquetReader::open() {
}

void ParquetReader::_init_system_properties() {
_system_properties.system_type = _scan_params.file_type;
if (_scan_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _scan_range.file_type;
} else {
_system_properties.system_type = _scan_params.file_type;
}
_system_properties.properties = _scan_params.properties;
_system_properties.hdfs_params = _scan_params.hdfs_params;
if (_scan_params.__isset.broker_addresses) {
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/exec/scan/avro_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ Status AvroJNIReader::init_fetch_table_reader(
index++;
}

TFileType::type type = _params.file_type;
TFileType::type type;
if (_range.__isset.file_type) {
// for compatibility
type = _range.file_type;
} else {
type = _params.file_type;
}
std::map<String, String> required_param = {
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()},
Expand Down
12 changes: 10 additions & 2 deletions be/src/vec/exec/scan/new_file_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
if (state->get_query_ctx() != nullptr
&& state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[id()];
_input_tuple_id = params.src_tuple_id;
_output_tuple_id = params.dest_tuple_id;
}
return Status::OK();
}

Expand All @@ -74,8 +80,10 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
if (scan_ranges.size() > 0) {
_input_tuple_id =
if (scan_ranges.size() > 0 && scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
// for compatibility.
// in new implement, the tuple id is set in prepare phase
_input_tuple_id =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
_output_tuple_id =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.dest_tuple_id;
Expand Down
61 changes: 36 additions & 25 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
const TFileScanRange& scan_range, RuntimeProfile* profile,
ShardedKVCache* kv_cache)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
_params(scan_range.params),
_ranges(scan_range.ranges),
_next_range(0),
_cur_reader(nullptr),
Expand All @@ -99,6 +98,18 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
if (scan_range.params.__isset.strict_mode) {
_strict_mode = scan_range.params.strict_mode;
}

LOG(INFO) << "yy debug is null: " << (state->get_query_ctx() != nullptr)
<< ", find: " << state->get_query_ctx()->file_scan_range_params_map.count(parent->id())
<< ", id: " << parent->id();
if (state->get_query_ctx() != nullptr
&& state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 0) {
LOG(INFO) << "yy debug got from query context";
_params = &(state->get_query_ctx()->file_scan_range_params_map[parent->id()]);
} else {
CHECK(scan_range.__isset.params);
_params = &(scan_range.params);
}
}

Status VFileScanner::prepare(
Expand Down Expand Up @@ -133,13 +144,13 @@ Status VFileScanner::prepare(
std::vector<TupleId>({_input_tuple_desc->id()}),
std::vector<bool>({false})));
// prepare pre filters
if (_params.__isset.pre_filter_exprs_list) {
if (_params->__isset.pre_filter_exprs_list) {
RETURN_IF_ERROR(doris::vectorized::VExpr::create_expr_trees(
_params.pre_filter_exprs_list, _pre_conjunct_ctxs));
} else if (_params.__isset.pre_filter_exprs) {
_params->pre_filter_exprs_list, _pre_conjunct_ctxs));
} else if (_params->__isset.pre_filter_exprs) {
VExprContextSPtr context;
RETURN_IF_ERROR(
doris::vectorized::VExpr::create_expr_tree(_params.pre_filter_exprs, context));
doris::vectorized::VExpr::create_expr_tree(_params->pre_filter_exprs, context));
_pre_conjunct_ctxs.emplace_back(context);
}

Expand Down Expand Up @@ -569,9 +580,9 @@ Status VFileScanner::_get_next_reader() {

// create reader for specific format
Status init_status;
TFileFormatType::type format_type = _params.format_type;
TFileFormatType::type format_type = _params->format_type;
// JNI reader can only push down column value range
bool push_down_predicates = !_is_load && _params.format_type != TFileFormatType::FORMAT_JNI;
bool push_down_predicates = !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
if (range.table_format_params.hudi_params.delta_logs.empty()) {
Expand Down Expand Up @@ -599,7 +610,7 @@ Status VFileScanner::_get_next_reader() {
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
_cur_reader =
HudiJniReader::create_unique(_params, range.table_format_params.hudi_params,
HudiJniReader::create_unique(*_params, range.table_format_params.hudi_params,
_file_slot_descs, _state, _profile);
init_status =
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
Expand All @@ -608,7 +619,7 @@ Status VFileScanner::_get_next_reader() {
}
case TFileFormatType::FORMAT_PARQUET: {
std::unique_ptr<ParquetReader> parquet_reader = ParquetReader::create_unique(
_profile, _params, range, _state->query_options().batch_size,
_profile, *_params, range, _state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get(), _state,
ExecEnv::GetInstance()->file_meta_cache(),
_state->query_options().enable_parquet_lazy_mat);
Expand All @@ -627,7 +638,7 @@ Status VFileScanner::_get_next_reader() {
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergTableReader> iceberg_reader =
IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
_state, _params, range, _kv_cache,
_state, *_params, range, _kv_cache,
_io_ctx.get());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
Expand All @@ -649,7 +660,7 @@ Status VFileScanner::_get_next_reader() {
}
case TFileFormatType::FORMAT_ORC: {
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, _params, range, _state->query_options().batch_size,
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
_push_down_conjuncts.resize(_conjuncts.size());
Expand All @@ -662,7 +673,7 @@ Status VFileScanner::_get_next_reader() {
range.table_format_params.table_format_type == "transactional_hive") {
std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
_state, _params, range,
_state, *_params, range,
_io_ctx.get());
init_status = tran_orc_reader->init_reader(
_file_col_names, _colname_to_value_range, _push_down_conjuncts,
Expand All @@ -686,27 +697,27 @@ Status VFileScanner::_get_next_reader() {
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
case TFileFormatType::FORMAT_PROTO: {
_cur_reader = CsvReader::create_unique(_state, _profile, &_counter, _params, range,
_cur_reader = CsvReader::create_unique(_state, _profile, &_counter, *_params, range,
_file_slot_descs, _io_ctx.get());
init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
break;
}
case TFileFormatType::FORMAT_JSON: {
_cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, _params, range,
_cur_reader = NewJsonReader::create_unique(_state, _profile, &_counter, *_params, range,
_file_slot_descs, &_scanner_eof,
_io_ctx.get(), _is_dynamic_schema);
init_status =
((NewJsonReader*)(_cur_reader.get()))->init_reader(_col_default_value_ctx);
break;
}
case TFileFormatType::FORMAT_AVRO: {
_cur_reader = AvroJNIReader::create_unique(_state, _profile, _params, _file_slot_descs);
_cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs);
init_status = ((AvroJNIReader*)(_cur_reader.get()))
->init_fetch_table_reader(_colname_to_value_range);
break;
}
default:
return Status::InternalError("Not supported file format: {}", _params.format_type);
return Status::InternalError("Not supported file format: {}", _params->format_type);
}

if (init_status.is<END_OF_FILE>()) {
Expand Down Expand Up @@ -810,8 +821,8 @@ Status VFileScanner::_init_expr_ctxes() {
}
}

_num_of_columns_from_file = _params.num_of_columns_from_file;
for (const auto& slot_info : _params.required_slots) {
_num_of_columns_from_file = _params->num_of_columns_from_file;
for (const auto& slot_info : _params->required_slots) {
auto slot_id = slot_info.slot_id;
auto it = full_src_slot_map.find(slot_id);
if (it == std::end(full_src_slot_map)) {
Expand Down Expand Up @@ -843,8 +854,8 @@ Status VFileScanner::_init_expr_ctxes() {
continue;
}
vectorized::VExprContextSPtr ctx;
auto it = _params.default_value_of_src_slot.find(slot_desc->id());
if (it != std::end(_params.default_value_of_src_slot)) {
auto it = _params->default_value_of_src_slot.find(slot_desc->id());
if (it != std::end(_params->default_value_of_src_slot)) {
if (!it->second.nodes.empty()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(it->second, ctx));
RETURN_IF_ERROR(ctx->prepare(_state, *_default_val_row_desc));
Expand All @@ -857,14 +868,14 @@ Status VFileScanner::_init_expr_ctxes() {

if (_is_load) {
// follow desc expr map is only for load task.
bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
bool has_slot_id_map = _params->__isset.dest_sid_to_src_sid_without_trans;
int idx = 0;
for (auto slot_desc : _output_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
auto it = _params.expr_of_dest_slot.find(slot_desc->id());
if (it == std::end(_params.expr_of_dest_slot)) {
auto it = _params->expr_of_dest_slot.find(slot_desc->id());
if (it == std::end(_params->expr_of_dest_slot)) {
return Status::InternalError("No expr for dest slot, id={}, name={}",
slot_desc->id(), slot_desc->col_name());
}
Expand All @@ -879,8 +890,8 @@ Status VFileScanner::_init_expr_ctxes() {
_dest_slot_name_to_idx[slot_desc->col_name()] = idx++;

if (has_slot_id_map) {
auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
auto it1 = _params->dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it1 == std::end(_params->dest_sid_to_src_sid_without_trans)) {
_src_slot_descs_order_by_dest.emplace_back(nullptr);
} else {
auto _src_slot_it = full_src_slot_map.find(it1->second);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class VFileScanner : public VScanner {

protected:
std::unique_ptr<TextConverter> _text_converter;
const TFileScanRangeParams& _params;
const TFileScanRangeParams* _params;
const std::vector<TFileRangeDesc>& _ranges;
int _next_range;

Expand Down
Loading

0 comments on commit 7bbdc36

Please sign in to comment.