Skip to content

Commit

Permalink
[improvement](segment) optimize segments load
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Jan 9, 2025
1 parent 12e9429 commit edb84dc
Show file tree
Hide file tree
Showing 19 changed files with 146 additions and 166 deletions.
18 changes: 7 additions & 11 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
continue;
}

int segment_start = 0;
int64_t segment_start = 0;
auto split = RowSetSplits(reader->clone());

for (size_t i = 0; i != segments_rows.size(); ++i) {
Expand Down Expand Up @@ -171,22 +171,18 @@ Status ParallelScannerBuilder::_load() {
if (!_state->skip_delete_predicate()) {
read_source.fill_delete_predicates();
}
bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
? _state->query_options().enable_segment_cache
: true;

for (auto& rs_split : read_source.rs_splits) {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
SegmentCacheHandle segment_cache_handle;

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false));

for (const auto& segment : segment_cache_handle.get_segments()) {
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
RETURN_IF_ERROR(beta_rowset->load_segment_num_rows());
const auto& segment_rows = beta_rowset->get_segment_num_rows();
auto segment_count = rowset->num_segments();
for (int64_t i = 0; i != segment_count; i++) {
_all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
}
_total_rows += rowset->num_rows();
}
Expand Down
26 changes: 22 additions & 4 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/crc32c.h"
Expand All @@ -68,10 +69,27 @@ Status BetaRowset::init() {
return Status::OK(); // no op
}

Status BetaRowset::do_load(bool /*use_cache*/) {
// do nothing.
// the segments in this rowset will be loaded by calling load_segments() explicitly.
return Status::OK();
Status BetaRowset::load_segment_num_rows() {
DCHECK(_rowset_state_machine.rowset_state() == ROWSET_LOADED);

return _load_segment_rows_once.call([this] {
auto segment_count = num_segments();
_segments_rows.resize(segment_count);
for (int64_t i = 0; i != segment_count; ++i) {
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
std::static_pointer_cast<BetaRowset>(shared_from_this()), i,
&segment_cache_handle, false, false));
const auto& tmp_segments = segment_cache_handle.get_segments();
_segments_rows[i] = tmp_segments[0]->num_rows();
}
return Status::OK();
});
}

const std::vector<uint32_t>& BetaRowset::get_segment_num_rows() {
DCHECK(_load_segment_rows_once.has_called() && _load_segment_rows_once.stored_result().ok());
return _segments_rows;
}

Status BetaRowset::get_inverted_index_size(size_t* index_size) {
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,16 @@ class BetaRowset final : public Rowset {
Status show_nested_index_file(rapidjson::Value* rowset_value,
rapidjson::Document::AllocatorType& allocator);

Status load_segment_num_rows();
const std::vector<uint32_t>& get_segment_num_rows();

protected:
BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta,
std::string tablet_path);

// init segment groups
Status init() override;

Status do_load(bool use_cache) override;

void do_close() override;

Status check_current_rowset_segment() override;
Expand All @@ -107,6 +108,9 @@ class BetaRowset final : public Rowset {
private:
friend class RowsetFactory;
friend class BetaRowsetReader;

DorisCallOnce<Status> _load_segment_rows_once;
std::vector<uint32_t> _segments_rows;
};

} // namespace doris
Expand Down
91 changes: 24 additions & 67 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.io_ctx.expiration_time = 0;
}

// load segments
bool enable_segment_cache = true;
auto* state = read_context->runtime_state;
if (state != nullptr) {
Expand All @@ -226,76 +225,40 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// When reader type is for query, session variable `enable_segment_cache` should be respected.
bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
enable_segment_cache);
SegmentCacheHandle segment_cache_handle;
{
SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
_rowset, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false));
}

// create iterator for each segment
auto& segments = segment_cache_handle.get_segments();
_segments_rows.resize(segments.size());
for (size_t i = 0; i < segments.size(); i++) {
_segments_rows[i] = segments[i]->num_rows();
}
if (_read_context->record_rowids) {
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_num_rows));
}

auto segment_count = _rowset->num_segments();
auto [seg_start, seg_end] = _segment_offsets;
// If seg_start == seg_end, it means that the segments of a rowset is not
// split scanned by multiple scanners, and the rowset reader is used to read the whole rowset.
if (seg_start == seg_end) {
seg_start = 0;
seg_end = segments.size();
seg_end = segment_count;
}
if (_read_context->record_rowids && _read_context->rowid_conversion) {
// init segment rowid map for rowid conversion
RETURN_IF_ERROR(_rowset->load_segment_num_rows());
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(
rowset()->rowset_id(), _rowset->get_segment_num_rows()));
}

const bool is_merge_iterator = _is_merge_iterator();
const bool use_lazy_init_iterators =
!is_merge_iterator && _read_context->reader_type == ReaderType::READER_QUERY;
for (int i = seg_start; i < seg_end; i++) {
for (int64_t i = seg_start; i < seg_end; i++) {
SCOPED_RAW_TIMER(&_stats->rowset_reader_create_iterators_timer_ns);
auto& seg_ptr = segments[i];
std::unique_ptr<RowwiseIterator> iter;

if (use_lazy_init_iterators) {
/// For non-merging iterators, we don't need to initialize them all at once when creating them.
/// Instead, we should initialize each iterator separately when really using them.
/// This optimization minimizes the lifecycle of resources like column readers
/// and prevents excessive memory consumption, especially for wide tables.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema,
_read_options);
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema,
local_options);
}
/// For iterators, we don't need to initialize them all at once when creating them.
/// Instead, we should initialize each iterator separately when really using them.
/// This optimization minimizes the lifecycle of resources like column readers
/// and prevents excessive memory consumption, especially for wide tables.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
_input_schema, _read_options);
} else {
Status status;
/// If `_segment_row_ranges` is empty, the segment is not split.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
status = seg_ptr->new_iterator(_input_schema, _read_options, &iter);
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
status = seg_ptr->new_iterator(_input_schema, local_options, &iter);
}

if (!status.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
<< "]: " << status.to_string();
return Status::Error<ROWSET_READER_INIT>(status.to_string());
}
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
_input_schema, local_options);
}

if (iter->empty()) {
Expand Down Expand Up @@ -423,10 +386,4 @@ bool BetaRowsetReader::_should_push_down_value_predicates() const {
_read_context->sequence_id_idx == -1) ||
_read_context->enable_unique_key_merge_on_write);
}

Status BetaRowsetReader::get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) {
segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
return Status::OK();
}

} // namespace doris
8 changes: 2 additions & 6 deletions be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ class BetaRowsetReader : public RowsetReader {
return _iterator->current_block_row_locations(locations);
}

Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) override;

bool update_profile(RuntimeProfile* profile) override;

RowsetReaderSharedPtr clone() override;
Expand All @@ -97,7 +95,7 @@ class BetaRowsetReader : public RowsetReader {
_rowset->rowset_meta()->is_segments_overlapping() && _get_segment_num() > 1;
}

int32_t _get_segment_num() const {
int64_t _get_segment_num() const {
auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
Expand All @@ -108,7 +106,7 @@ class BetaRowsetReader : public RowsetReader {

DorisCallOnce<Status> _init_iter_once;

std::pair<int, int> _segment_offsets;
std::pair<int64_t, int64_t> _segment_offsets;
std::vector<RowRanges> _segment_row_ranges;

SchemaSPtr _input_schema;
Expand All @@ -120,8 +118,6 @@ class BetaRowsetReader : public RowsetReader {

std::unique_ptr<RowwiseIterator> _iterator;

std::vector<uint32_t> _segments_rows;

StorageReadOptions _read_options;

bool _empty = false;
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ Status Rowset::load(bool use_cache) {
std::lock_guard load_lock(_lock);
// after lock, if rowset state is ROWSET_UNLOADING, it is ok to return
if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
// first do load, then change the state
RETURN_IF_ERROR(do_load(use_cache));
RETURN_IF_ERROR(_rowset_state_machine.on_load());
}
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
// this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
virtual Status init() = 0;

// The actual implementation of load(). Guaranteed by to called exactly once.
virtual Status do_load(bool use_cache) = 0;

// release resources in this api
virtual void do_close() = 0;

Expand Down
6 changes: 1 addition & 5 deletions be/src/olap/rowset/rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct RowSetSplits {
// if segment_offsets is not empty, means we only scan
// [pair.first, pair.second) segment in rs_reader, only effective in dup key
// and pipeline
std::pair<int, int> segment_offsets;
std::pair<int64_t, int64_t> segment_offsets;

// RowRanges of each segment.
std::vector<RowRanges> segment_row_ranges;
Expand Down Expand Up @@ -83,10 +83,6 @@ class RowsetReader {
return Status::NotSupported("to be implemented");
}

virtual Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) {
return Status::NotSupported("to be implemented");
}

virtual bool update_profile(RuntimeProfile* profile) = 0;

virtual RowsetReaderSharedPtr clone() = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct RowsetReaderContext {
bool enable_unique_key_merge_on_write = false;
const DeleteBitmap* delete_bitmap = nullptr;
bool record_rowids = false;
RowIdConversion* rowid_conversion;
RowIdConversion* rowid_conversion = nullptr;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
RowsetId rowset_id;
Expand Down
23 changes: 19 additions & 4 deletions be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@

#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"

#include "olap/segment_loader.h"

namespace doris::segment_v2 {

LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr<Segment> segment,
SchemaSPtr schema, const StorageReadOptions& opts)
: _schema(std::move(schema)), _segment(std::move(segment)), _read_options(opts) {}
LazyInitSegmentIterator::LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id,
bool should_use_cache, SchemaSPtr schema,
const StorageReadOptions& opts)
: _rowset(std::move(rowset)),
_segment_id(segment_id),
_should_use_cache(should_use_cache),
_schema(std::move(schema)),
_read_options(opts) {}

/// Here do not use the argument of `opts`,
/// see where the iterator is created in `BetaRowsetReader::get_segment_iterators`
Expand All @@ -31,7 +38,15 @@ Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) {
return Status::OK();
}

RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options, &_inner_iterator));
std::shared_ptr<Segment> segment;
{
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
_rowset, _segment_id, &segment_cache_handle, _should_use_cache, false));
const auto& tmp_segments = segment_cache_handle.get_segments();
segment = tmp_segments[0];
}
RETURN_IF_ERROR(segment->new_iterator(_schema, _read_options, &_inner_iterator));
return _inner_iterator->init(_read_options);
}

Expand Down
12 changes: 9 additions & 3 deletions be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "vec/core/block.h"

namespace doris {
class BetaRowset;
using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>;
}; // namespace doris
namespace doris::segment_v2 {

using namespace vectorized;

class LazyInitSegmentIterator : public RowwiseIterator {
public:
LazyInitSegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema,
const StorageReadOptions& opts);
LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id, bool should_use_cache,
SchemaSPtr schema, const StorageReadOptions& opts);

~LazyInitSegmentIterator() override = default;

Expand Down Expand Up @@ -59,8 +63,10 @@ class LazyInitSegmentIterator : public RowwiseIterator {

private:
bool _need_lazy_init {true};
BetaRowsetSharedPtr _rowset;
int64_t _segment_id {-1};
bool _should_use_cache {false};
SchemaSPtr _schema = nullptr;
std::shared_ptr<Segment> _segment;
StorageReadOptions _read_options;
RowwiseIteratorUPtr _inner_iterator;
};
Expand Down
Loading

0 comments on commit edb84dc

Please sign in to comment.