Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jul 13, 2023
1 parent 5b1cbe8 commit 48d1c01
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 17 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
// Later, this field is moved to TFileRangeDesc, but here we still only use _ranges[0]'s
// file_type.
// Because I don't know if othere range has this field, so just keep it same as before.
file_range.file_type = _ranges[0].file_type;
file_range.file_type = _ranges[0].file_type;
file_range.load_id = _ranges[i].load_id;
file_range.path = _ranges[i].path;
file_range.start_offset = _ranges[i].start_offset;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
cur_span->SetAttribute("instance_id", print_id(params.params.fragment_instance_id));

LOG(INFO) << "exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
<< apache::thrift::ThriftDebugString(params).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options seperately for debuggin purpose
VLOG_ROW << "query options is "
Expand Down
1 change: 0 additions & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,6 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_request,
PFragmentRequestVersion version,
bool compact) {
LOG(INFO) << "yy debug _exec_plan_fragment_impl ser_request size: " << ser_request.size();
// Sometimes the BE do not receive the first heartbeat message and it receives request from FE
// If BE execute this fragment, it will core when it wants to get some property from master info.
if (ExecEnv::GetInstance()->master_info() == nullptr) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,11 @@ void OrcReader::_init_profile() {
_orc_profile.read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES);
_orc_profile.column_read_time = ADD_CHILD_TIMER(_profile, "ColumnReadTime", orc_profile);
_orc_profile.get_batch_time = ADD_CHILD_TIMER(_profile, "GetBatchTime", orc_profile);
_orc_profile.create_reader_time = ADD_CHILD_TIMER(_profile, "CreateReaderTime", orc_profile);
_orc_profile.create_reader_time =
ADD_CHILD_TIMER(_profile, "CreateReaderTime", orc_profile);
_orc_profile.init_column_time = ADD_CHILD_TIMER(_profile, "InitColumnTime", orc_profile);
_orc_profile.set_fill_column_time = ADD_CHILD_TIMER(_profile, "SetFillColumnTime", orc_profile);
_orc_profile.set_fill_column_time =
ADD_CHILD_TIMER(_profile, "SetFillColumnTime", orc_profile);
_orc_profile.decode_value_time = ADD_CHILD_TIMER(_profile, "DecodeValueTime", orc_profile);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER(_profile, "DecodeNullMapTime", orc_profile);
Expand Down
9 changes: 5 additions & 4 deletions be/src/vec/exec/scan/new_file_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
if (state->get_query_ctx() != nullptr
&& state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[id()];
_input_tuple_id = params.src_tuple_id;
_output_tuple_id = params.dest_tuple_id;
Expand Down Expand Up @@ -80,10 +80,11 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
if (scan_ranges.size() > 0 && scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
if (scan_ranges.size() > 0 &&
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
// for compatibility.
// in new implement, the tuple id is set in prepare phase
_input_tuple_id =
_input_tuple_id =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
_output_tuple_id =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.dest_tuple_id;
Expand Down
17 changes: 9 additions & 8 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
_strict_mode = scan_range.params.strict_mode;
}

if (state->get_query_ctx() != nullptr
&& state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 0) {
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.count(parent->id()) > 0) {
_params = &(state->get_query_ctx()->file_scan_range_params_map[parent->id()]);
} else {
CHECK(scan_range.__isset.params);
Expand Down Expand Up @@ -559,7 +559,6 @@ Status VFileScanner::_convert_to_output_block(Block* block) {
}

Status VFileScanner::_get_next_reader() {
LOG(INFO) << "yy debug _get_next_reader";
while (true) {
_cur_reader.reset(nullptr);
_src_block_init = false;
Expand All @@ -579,7 +578,8 @@ Status VFileScanner::_get_next_reader() {
Status init_status;
TFileFormatType::type format_type = _params->format_type;
// JNI reader can only push down column value range
bool push_down_predicates = !_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
bool push_down_predicates =
!_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
if (range.table_format_params.hudi_params.delta_logs.empty()) {
Expand All @@ -606,9 +606,9 @@ Status VFileScanner::_get_next_reader() {
->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
_cur_reader =
HudiJniReader::create_unique(*_params, range.table_format_params.hudi_params,
_file_slot_descs, _state, _profile);
_cur_reader = HudiJniReader::create_unique(*_params,
range.table_format_params.hudi_params,
_file_slot_descs, _state, _profile);
init_status =
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
}
Expand Down Expand Up @@ -708,7 +708,8 @@ Status VFileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_AVRO: {
_cur_reader = AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs);
_cur_reader =
AvroJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs);
init_status = ((AvroJNIReader*)(_cur_reader.get()))
->init_fetch_table_reader(_colname_to_value_range);
break;
Expand Down

0 comments on commit 48d1c01

Please sign in to comment.