diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 88c69ab5c9a584a..1419c8cdce892b3 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -70,7 +70,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list& continue; } - int segment_start = 0; + int64_t segment_start = 0; auto split = RowSetSplits(reader->clone()); for (size_t i = 0; i != segments_rows.size(); ++i) { @@ -179,14 +179,15 @@ Status ParallelScannerBuilder::_load() { auto rowset = rs_split.rs_reader->rowset(); RETURN_IF_ERROR(rowset->load()); const auto rowset_id = rowset->rowset_id(); - SegmentCacheHandle segment_cache_handle; - RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( - std::dynamic_pointer_cast(rowset), &segment_cache_handle, - enable_segment_cache, false)); - - for (const auto& segment : segment_cache_handle.get_segments()) { - _all_segments_rows[rowset_id].emplace_back(segment->num_rows()); + auto beta_rowset = std::dynamic_pointer_cast(rowset); + auto segment_count = rowset->num_segments(); + for (int64_t i = 0; i != segment_count; i++) { + SegmentCacheHandle segment_cache_handle; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( + beta_rowset, i, &segment_cache_handle, enable_segment_cache, false)); + const auto& segments = segment_cache_handle.get_segments(); + _all_segments_rows[rowset_id].emplace_back(segments[0]->num_rows()); } _total_rows += rowset->num_rows(); } diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 9a4d71587a02c10..1f90a700578f1a3 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -226,32 +226,44 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context // When reader type is for query, session variable `enable_segment_cache` should be respected. bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY && enable_segment_cache); - SegmentCacheHandle segment_cache_handle; - { - SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns); - RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( - _rowset, &segment_cache_handle, should_use_cache, - /*need_load_pk_index_and_bf*/ false)); - } + auto segment_count = _rowset->num_segments(); - // create iterator for each segment - auto& segments = segment_cache_handle.get_segments(); - _segments_rows.resize(segments.size()); - for (size_t i = 0; i < segments.size(); i++) { - _segments_rows[i] = segments[i]->num_rows(); - } - if (_read_context->record_rowids) { - // init segment rowid map for rowid conversion - std::vector segment_num_rows; - RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows)); - RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), - segment_num_rows)); - } + std::vector segments(segment_count); auto [seg_start, seg_end] = _segment_offsets; if (seg_start == seg_end) { seg_start = 0; - seg_end = segments.size(); + seg_end = segment_count; + } + if (_read_context->record_rowids) { + _segments_rows.resize(segment_count); + { + SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns); + for (int64_t i = 0; i != segment_count; ++i) { + SegmentCacheHandle segment_cache_handle; + RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( + _rowset, i, &segment_cache_handle, should_use_cache, + /*need_load_pk_index_and_bf*/ false)); + const auto& tmp_segments = segment_cache_handle.get_segments(); + _segments_rows[i] = tmp_segments[0]->num_rows(); + if (i >= seg_start && i < seg_end) { + segments[i] = tmp_segments[0]; + } + } + } + + // init segment rowid map for rowid conversion + RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(), + _segments_rows)); + } else { + SegmentCacheHandle segment_cache_handle; + for (int64_t i = seg_start; i != seg_end; ++i) { + RETURN_IF_ERROR(SegmentLoader::instance()->load_segment( + _rowset, i, &segment_cache_handle, should_use_cache, + /*need_load_pk_index_and_bf*/ false)); + const auto& tmp_segments = segment_cache_handle.get_segments(); + segments[i] = tmp_segments[0]; + } } const bool is_merge_iterator = _is_merge_iterator(); diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 33b2fb6a58c08ba..a88d907ff02df5b 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -80,6 +80,7 @@ class BetaRowsetReader : public RowsetReader { return _iterator->current_block_row_locations(locations); } + // Currently only used in UT Status get_segment_num_rows(std::vector* segment_num_rows) override; bool update_profile(RuntimeProfile* profile) override; @@ -97,7 +98,7 @@ class BetaRowsetReader : public RowsetReader { _rowset->rowset_meta()->is_segments_overlapping() && _get_segment_num() > 1; } - int32_t _get_segment_num() const { + int64_t _get_segment_num() const { auto [seg_start, seg_end] = _segment_offsets; if (seg_start == seg_end) { seg_start = 0; @@ -108,7 +109,7 @@ class BetaRowsetReader : public RowsetReader { DorisCallOnce _init_iter_once; - std::pair _segment_offsets; + std::pair _segment_offsets; std::vector _segment_row_ranges; SchemaSPtr _input_schema; @@ -120,6 +121,7 @@ class BetaRowsetReader : public RowsetReader { std::unique_ptr _iterator; + // Currently only used when _read_context->record_rowids is true std::vector _segments_rows; StorageReadOptions _read_options; diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 58c0f592b9c5451..6ce6e7e5dac155a 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -40,7 +40,7 @@ struct RowSetSplits { // if segment_offsets is not empty, means we only scan // [pair.first, pair.second) segment in rs_reader, only effective in dup key // and pipeline - std::pair segment_offsets; + std::pair segment_offsets; // RowRanges of each segment. std::vector segment_row_ranges; diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 4240f7e250a06b0..1bd21ad4e553c0a 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -52,6 +52,37 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) { LRUCachePolicy::erase(key.encode()); } +Status SegmentLoader::load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id, + SegmentCacheHandle* cache_handle, bool use_cache, + bool need_load_pk_index_and_bf, + OlapReaderStatistics* index_load_stats) { + SegmentCache::CacheKey cache_key(rowset->rowset_id(), segment_id); + if (_segment_cache->lookup(cache_key, cache_handle)) { + // Has to check the segment status here, because the segment in cache may has something wrong during + // load index or create column reader. + // Not merge this if logic with previous to make the logic more clear. + if (cache_handle->pop_unhealthy_segment() == nullptr) { + return Status::OK(); + } + } + // If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache. + segment_v2::SegmentSharedPtr segment; + RETURN_IF_ERROR(rowset->load_segment(segment_id, &segment)); + if (need_load_pk_index_and_bf) { + RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats)); + } + if (use_cache && !config::disable_segment_cache) { + // memory of SegmentCache::CacheValue will be handled by SegmentCache + auto* cache_value = new SegmentCache::CacheValue(segment); + _cache_mem_usage += segment->meta_mem_usage(); + _segment_cache->insert(cache_key, *cache_value, cache_handle); + } else { + cache_handle->push_segment(std::move(segment)); + } + + return Status::OK(); +} + Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle, bool use_cache, bool need_load_pk_index_and_bf, @@ -60,29 +91,8 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, return Status::OK(); } for (int64_t i = 0; i < rowset->num_segments(); i++) { - SegmentCache::CacheKey cache_key(rowset->rowset_id(), i); - if (_segment_cache->lookup(cache_key, cache_handle)) { - // Has to check the segment status here, because the segment in cache may has something wrong during - // load index or create column reader. - // Not merge this if logic with previous to make the logic more clear. - if (cache_handle->pop_unhealthy_segment() == nullptr) { - continue; - } - } - // If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache. - segment_v2::SegmentSharedPtr segment; - RETURN_IF_ERROR(rowset->load_segment(i, &segment)); - if (need_load_pk_index_and_bf) { - RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats)); - } - if (use_cache && !config::disable_segment_cache) { - // memory of SegmentCache::CacheValue will be handled by SegmentCache - auto* cache_value = new SegmentCache::CacheValue(segment); - _cache_mem_usage += segment->meta_mem_usage(); - _segment_cache->insert(cache_key, *cache_value, cache_handle); - } else { - cache_handle->push_segment(std::move(segment)); - } + RETURN_IF_ERROR(load_segment(rowset, i, cache_handle, use_cache, need_load_pk_index_and_bf, + index_load_stats)); } cache_handle->set_inited(); return Status::OK(); diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index 2c5b1ed200dde78..d1813f434cb2789 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -120,6 +120,13 @@ class SegmentLoader { bool use_cache = false, bool need_load_pk_index_and_bf = false, OlapReaderStatistics* index_load_stats = nullptr); + // Load one segment of "rowset", return the "cache_handle" which contains segments. + // If use_cache is true, it will be loaded from _cache. + Status load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id, + SegmentCacheHandle* cache_handle, bool use_cache = false, + bool need_load_pk_index_and_bf = false, + OlapReaderStatistics* index_load_stats = nullptr); + void erase_segment(const SegmentCache::CacheKey& key); void erase_segments(const RowsetId& rowset_id, int64_t num_segments); diff --git a/be/test/olap/ordered_data_compaction_test.cpp b/be/test/olap/ordered_data_compaction_test.cpp index 058ed52dd995dc7..831e3031378e4e5 100644 --- a/be/test/olap/ordered_data_compaction_test.cpp +++ b/be/test/olap/ordered_data_compaction_test.cpp @@ -464,8 +464,6 @@ TEST_F(OrderedDataCompactionTest, test_01) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index df56cd0559e4d87..175c25aa5418ec2 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -361,6 +361,7 @@ class TestRowIdConversion : public testing::TestWithParam return_columns = {0, 1}; diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp index 62a3232889dedee..336524c6448cefe 100644 --- a/be/test/olap/segcompaction_mow_test.cpp +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -230,6 +230,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam { int expect_total_rows, int rows_mark_deleted, bool skip_value_check = false) { RowsetReaderContext reader_context; + reader_context.record_rowids = true; reader_context.tablet_schema = tablet_schema; // use this type to avoid cache from other ut reader_context.reader_type = ReaderType::READER_QUERY; diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp index 4029467dd42fbb8..f41e9dacf0c3ca3 100644 --- a/be/test/olap/segcompaction_test.cpp +++ b/be/test/olap/segcompaction_test.cpp @@ -344,6 +344,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) { { // read RowsetReaderContext reader_context; + reader_context.record_rowids = true; reader_context.tablet_schema = tablet_schema; // use this type to avoid cache from other ut reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION; @@ -849,6 +850,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) { { // read RowsetReaderContext reader_context; + reader_context.record_rowids = true; reader_context.tablet_schema = tablet_schema; // use this type to avoid cache from other ut reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION; @@ -1113,6 +1115,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) { { // read RowsetReaderContext reader_context; + reader_context.record_rowids = true; reader_context.tablet_schema = tablet_schema; // use this type to avoid cache from other ut reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION; diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index 4c4409a75068c1f..dd6f6932efcb266 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -521,8 +521,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " @@ -628,8 +626,6 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " @@ -736,8 +732,6 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " " @@ -848,8 +842,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment - num_input_rowset * 100); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // All keys less than 1000 are deleted by delete handler for (auto& item : output_data) { ASSERT_GE(std::get<0>(item), 100); @@ -951,8 +943,6 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment - num_input_rowset * 100); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // All keys less than 1000 are deleted by delete handler for (auto& item : output_data) { ASSERT_GE(std::get<0>(item), 100); @@ -1042,8 +1032,6 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { EXPECT_EQ(Status::Error(""), s); EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size()); EXPECT_EQ(output_data.size(), num_segments * rows_per_segment); - std::vector segment_num_rows; - EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok()); // check vertical compaction result for (auto id = 0; id < output_data.size(); id++) { LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "