Skip to content

Commit

Permalink
fix oom
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jul 14, 2023
1 parent b013f80 commit e8042ae
Show file tree
Hide file tree
Showing 17 changed files with 210 additions and 89 deletions.
7 changes: 6 additions & 1 deletion be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
if (0 == _ranges.size()) {
return;
}
_file_params.file_type = _ranges[0].file_type;
_file_params.format_type = _ranges[0].format_type;
_file_params.src_tuple_id = _params.src_tuple_id;
_file_params.dest_tuple_id = _params.dest_tuple_id;
Expand All @@ -336,6 +335,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&

for (int i = 0; i < _ranges.size(); ++i) {
TFileRangeDesc file_range;
// TODO(cmy): in previous implementation, the file_type is set in _file_params
// and it use _ranges[0].file_type.
// Later, this field is moved to TFileRangeDesc, but here we still only use _ranges[0]'s
// file_type.
// Because I don't know if othere range has this field, so just keep it same as before.
file_range.file_type = _ranges[0].file_type;
file_range.load_id = _ranges[i].load_id;
file_range.path = _ranges[i].path;
file_range.start_offset = _ranges[i].start_offset;
Expand Down
10 changes: 8 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
query_ctx->query_id = query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
&(query_ctx->desc_tbl)));

// set file scan range params
if (params.__isset.file_scan_params) {
query_ctx->file_scan_range_params_map = params.file_scan_params;
}

query_ctx->coord_addr = params.coord;
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
<< " coord_addr " << query_ctx->coord_addr
Expand Down Expand Up @@ -766,8 +772,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
cur_span->SetAttribute("query_id", print_id(params.params.query_id));
cur_span->SetAttribute("instance_id", print_id(params.params.fragment_instance_id));

VLOG_ROW << "exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
LOG(INFO) << "exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options seperately for debuggin purpose
VLOG_ROW << "query options is "
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ class QueryContext {

std::vector<TUniqueId> fragment_ids;

std::map<int, TFileScanRangeParams> file_scan_range_params_map;

private:
ExecEnv* _exec_env;
vectorized::VecDateTimeValue _start_time;
Expand Down
21 changes: 18 additions & 3 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,12 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
_io_ctx(io_ctx) {
_file_format_type = _params.format_type;
_is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
_file_compress_type = _params.compress_type;
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_size = _range.size;

_text_converter.reset(new (std::nothrow) TextConverter('\\', _array_delimiter[0]));
Expand All @@ -111,7 +116,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
_decompressor(nullptr),
_io_ctx(io_ctx) {
_file_format_type = _params.format_type;
_file_compress_type = _params.compress_type;
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
} else {
_file_compress_type = _params.compress_type;
}
_size = _range.size;
_init_system_properties();
_init_file_description();
Expand All @@ -120,7 +130,12 @@ CsvReader::CsvReader(RuntimeProfile* profile, const TFileScanRangeParams& params
CsvReader::~CsvReader() = default;

void CsvReader::_init_system_properties() {
_system_properties.system_type = _params.file_type;
if (_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _range.file_type;
} else {
_system_properties.system_type = _params.file_type;
}
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams
}

void NewJsonReader::_init_system_properties() {
_system_properties.system_type = _params.file_type;
if (_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _range.file_type;
} else {
_system_properties.system_type = _params.file_type;
}
_system_properties.properties = _params.properties;
_system_properties.hdfs_params = _params.hdfs_params;
if (_params.__isset.broker_addresses) {
Expand Down
30 changes: 23 additions & 7 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ void OrcReader::_collect_profile_on_close() {
COUNTER_UPDATE(_orc_profile.read_bytes, _statistics.fs_read_bytes);
COUNTER_UPDATE(_orc_profile.column_read_time, _statistics.column_read_time);
COUNTER_UPDATE(_orc_profile.get_batch_time, _statistics.get_batch_time);
COUNTER_UPDATE(_orc_profile.parse_meta_time, _statistics.parse_meta_time);
COUNTER_UPDATE(_orc_profile.create_reader_time, _statistics.create_reader_time);
COUNTER_UPDATE(_orc_profile.init_column_time, _statistics.init_column_time);
COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
}
Expand All @@ -200,7 +202,11 @@ void OrcReader::_init_profile() {
_orc_profile.read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES);
_orc_profile.column_read_time = ADD_CHILD_TIMER(_profile, "ColumnReadTime", orc_profile);
_orc_profile.get_batch_time = ADD_CHILD_TIMER(_profile, "GetBatchTime", orc_profile);
_orc_profile.parse_meta_time = ADD_CHILD_TIMER(_profile, "ParseMetaTime", orc_profile);
_orc_profile.create_reader_time =
ADD_CHILD_TIMER(_profile, "CreateReaderTime", orc_profile);
_orc_profile.init_column_time = ADD_CHILD_TIMER(_profile, "InitColumnTime", orc_profile);
_orc_profile.set_fill_column_time =
ADD_CHILD_TIMER(_profile, "SetFillColumnTime", orc_profile);
_orc_profile.decode_value_time = ADD_CHILD_TIMER(_profile, "DecodeValueTime", orc_profile);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER(_profile, "DecodeNullMapTime", orc_profile);
Expand Down Expand Up @@ -254,9 +260,14 @@ Status OrcReader::init_reader(
not_single_slot_filter_conjuncts->end());
}
_obj_pool = std::make_shared<ObjectPool>();
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
RETURN_IF_ERROR(_create_file_reader());
RETURN_IF_ERROR(_init_read_columns());
{
SCOPED_RAW_TIMER(&_statistics.create_reader_time);
RETURN_IF_ERROR(_create_file_reader());
}
{
SCOPED_RAW_TIMER(&_statistics.init_column_time);
RETURN_IF_ERROR(_init_read_columns());
}
return Status::OK();
}

Expand Down Expand Up @@ -646,7 +657,7 @@ Status OrcReader::set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
SCOPED_RAW_TIMER(&_statistics.set_fill_column_time);

// std::unordered_map<column_name, std::pair<col_id, slot_id>>
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
Expand Down Expand Up @@ -864,7 +875,12 @@ void OrcReader::_init_bloom_filter(
}

void OrcReader::_init_system_properties() {
_system_properties.system_type = _scan_params.file_type;
if (_scan_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _scan_range.file_type;
} else {
_system_properties.system_type = _scan_params.file_type;
}
_system_properties.properties = _scan_params.properties;
_system_properties.hdfs_params = _scan_params.hdfs_params;
if (_scan_params.__isset.broker_addresses) {
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ class OrcReader : public GenericReader {
int64_t fs_read_bytes = 0;
int64_t column_read_time = 0;
int64_t get_batch_time = 0;
int64_t parse_meta_time = 0;
int64_t create_reader_time = 0;
int64_t init_column_time = 0;
int64_t set_fill_column_time = 0;
int64_t decode_value_time = 0;
int64_t decode_null_map_time = 0;
};
Expand Down Expand Up @@ -200,7 +202,9 @@ class OrcReader : public GenericReader {
RuntimeProfile::Counter* read_bytes;
RuntimeProfile::Counter* column_read_time;
RuntimeProfile::Counter* get_batch_time;
RuntimeProfile::Counter* parse_meta_time;
RuntimeProfile::Counter* create_reader_time;
RuntimeProfile::Counter* init_column_time;
RuntimeProfile::Counter* set_fill_column_time;
RuntimeProfile::Counter* decode_value_time;
RuntimeProfile::Counter* decode_null_map_time;
};
Expand Down
7 changes: 6 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,12 @@ Status ParquetReader::open() {
}

void ParquetReader::_init_system_properties() {
_system_properties.system_type = _scan_params.file_type;
if (_scan_range.__isset.file_type) {
// for compatibility
_system_properties.system_type = _scan_range.file_type;
} else {
_system_properties.system_type = _scan_params.file_type;
}
_system_properties.properties = _scan_params.properties;
_system_properties.hdfs_params = _scan_params.hdfs_params;
if (_scan_params.__isset.broker_addresses) {
Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/exec/scan/avro_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,13 @@ Status AvroJNIReader::init_fetch_table_reader(
index++;
}

TFileType::type type = _params.file_type;
TFileType::type type;
if (_range.__isset.file_type) {
// for compatibility
type = _range.file_type;
} else {
type = _params.file_type;
}
std::map<String, String> required_param = {
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()},
Expand Down
11 changes: 10 additions & 1 deletion be/src/vec/exec/scan/new_file_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ Status NewFileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {

Status NewFileScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.count(id()) > 0) {
TFileScanRangeParams& params = state->get_query_ctx()->file_scan_range_params_map[id()];
_input_tuple_id = params.src_tuple_id;
_output_tuple_id = params.dest_tuple_id;
}
return Status::OK();
}

Expand All @@ -74,7 +80,10 @@ void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_
_scan_ranges.shrink_to_fit();
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
}
if (scan_ranges.size() > 0) {
if (scan_ranges.size() > 0 &&
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
// for compatibility.
// in new implement, the tuple id is set in prepare phase
_input_tuple_id =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
_output_tuple_id =
Expand Down
Loading

0 comments on commit e8042ae

Please sign in to comment.