Skip to content

Commit

Permalink
[Enhancement] Skip load primary index for empty rowset (backport #55628
Browse files Browse the repository at this point in the history
…) (#55806)

Co-authored-by: zhangqiang <[email protected]>
  • Loading branch information
mergify[bot] and sevev authored Feb 11, 2025
1 parent 2c37614 commit 82a9613
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 3 deletions.
13 changes: 10 additions & 3 deletions be/src/storage/lake/txn_log_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,11 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
SCOPED_THREAD_LOCAL_CHECK_MEM_LIMIT_SETTER(true);
SCOPED_THREAD_LOCAL_SINGLETON_CHECK_MEM_TRACKER_SETTER(
config::enable_pk_strict_memcheck ? _tablet.update_mgr()->mem_tracker() : nullptr);
// local persistent index will update index version, so we need to load first
// still need prepre primary index even there is an empty compaction
if (_index_entry == nullptr && _has_empty_compaction) {
if (_index_entry == nullptr &&
(_has_empty_compaction || (_metadata->enable_persistent_index() &&
_metadata->persistent_index_type() == PersistentIndexTypePB::LOCAL))) {
// get lock to avoid gc
_tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id());
DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); });
Expand Down Expand Up @@ -248,11 +251,11 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
_tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id());
DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); });

RETURN_IF_ERROR(prepare_primary_index());
if (op_write.dels_size() == 0 && op_write.rowset().num_rows() == 0 &&
!op_write.rowset().has_delete_predicate()) {
return Status::OK();
}
RETURN_IF_ERROR(prepare_primary_index());
if (is_column_mode_partial_update(op_write)) {
return _tablet.update_mgr()->publish_column_mode_partial_update(op_write, txn_id, _metadata, &_tablet,
&_builder, _base_version);
Expand All @@ -267,15 +270,19 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
_tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id());
DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); });

RETURN_IF_ERROR(prepare_primary_index());
if (op_compaction.input_rowsets().empty()) {
DCHECK(!op_compaction.has_output_rowset() || op_compaction.output_rowset().num_rows() == 0);
// Apply the compaction operation to the cloud native pk index.
// This ensures that the pk index is updated with the compaction changes.
_builder.remove_compacted_sst(op_compaction);
if (op_compaction.input_sstables().empty() || !op_compaction.has_output_sstable()) {
return Status::OK();
}
RETURN_IF_ERROR(prepare_primary_index());
RETURN_IF_ERROR(_index_entry->value().apply_opcompaction(*_metadata, op_compaction));
return Status::OK();
}
RETURN_IF_ERROR(prepare_primary_index());
return _tablet.update_mgr()->publish_primary_compaction(op_compaction, txn_id, *_metadata, _tablet,
_index_entry, &_builder, _base_version);
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1122,4 +1122,13 @@ Status UpdateManager::pk_index_major_compaction(int64_t tablet_id, DataDir* data
return Status::OK();
}

bool UpdateManager::TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt) {
auto index_entry = _index_cache.get(tablet_id);
if (index_entry == nullptr) {
return expected_cnt == 0;
}
_index_cache.release(index_entry);
return index_entry->get_ref() == expected_cnt;
}

} // namespace starrocks::lake
2 changes: 2 additions & 0 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ class UpdateManager {

Status pk_index_major_compaction(int64_t tablet_id, DataDir* data_dir);

bool TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt);

private:
// print memory tracker state
void _print_memory_stats();
Expand Down
32 changes: 32 additions & 0 deletions be/test/storage/lake/alter_tablet_meta_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,4 +566,36 @@ TEST_F(AlterTabletMetaTest, test_alter_persistent_index_type) {
ASSERT_TRUE(tablet_meta4->orphan_files_size() > 0);
}

TEST_F(AlterTabletMetaTest, test_skip_load_pindex) {
std::shared_ptr<TabletMetadata> tablet_metadata = generate_simple_tablet_metadata(PRIMARY_KEYS);
ASSERT_OK(_tablet_mgr->put_tablet_metadata(*tablet_metadata));
// write empty rowset
{
TxnLogPB log;
auto op_write_meta = log.mutable_op_write();
auto rs_meta = op_write_meta->mutable_rowset();
rs_meta->set_id(next_id());
rs_meta->set_num_rows(0);

auto tablet_id = tablet_metadata->id();
auto version = tablet_metadata->version() + 1;
std::unique_ptr<TxnLogApplier> log_applier =
new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version, false);
ASSERT_OK(log_applier->apply(log));
ASSERT_TRUE(_tablet_mgr->update_mgr()->TEST_primary_index_refcnt(tablet_metadata->id(), 0));
}

{
TxnLogPB log;
auto op_compaction_meta = log.mutable_op_compaction();
auto tablet_id = tablet_metadata->id();
auto version = tablet_metadata->version() + 1;
op_compaction_meta->set_compact_version(version);
std::unique_ptr<TxnLogApplier> log_applier =
new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version, false);
ASSERT_OK(log_applier->apply(log));
ASSERT_TRUE(_tablet_mgr->update_mgr()->TEST_primary_index_refcnt(tablet_metadata->id(), 0));
}
}

} // namespace starrocks::lake

0 comments on commit 82a9613

Please sign in to comment.