Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Skip load primary index for empty rowset #55628

Merged
merged 4 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions be/src/storage/lake/txn_log_applier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,11 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
SCOPED_THREAD_LOCAL_CHECK_MEM_LIMIT_SETTER(true);
SCOPED_THREAD_LOCAL_SINGLETON_CHECK_MEM_TRACKER_SETTER(
config::enable_pk_strict_memcheck ? _tablet.update_mgr()->mem_tracker() : nullptr);
// local persistent index will update index version, so we need to load first
// still need prepre primary index even there is an empty compaction
if (_index_entry == nullptr && _has_empty_compaction) {
if (_index_entry == nullptr &&
(_has_empty_compaction || (_metadata->enable_persistent_index() &&
_metadata->persistent_index_type() == PersistentIndexTypePB::LOCAL))) {
// get lock to avoid gc
_tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id());
DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); });
Expand Down Expand Up @@ -249,11 +252,11 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
_tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id());
DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); });

RETURN_IF_ERROR(prepare_primary_index());
if (op_write.dels_size() == 0 && op_write.rowset().num_rows() == 0 &&
!op_write.rowset().has_delete_predicate()) {
return Status::OK();
}
RETURN_IF_ERROR(prepare_primary_index());
if (is_column_mode_partial_update(op_write)) {
return _tablet.update_mgr()->publish_column_mode_partial_update(op_write, txn_id, _metadata, &_tablet,
&_builder, _base_version);
Expand All @@ -268,15 +271,19 @@ class PrimaryKeyTxnLogApplier : public TxnLogApplier {
_tablet.update_mgr()->lock_shard_pk_index_shard(_tablet.id());
DeferOp defer([&]() { _tablet.update_mgr()->unlock_shard_pk_index_shard(_tablet.id()); });

RETURN_IF_ERROR(prepare_primary_index());
if (op_compaction.input_rowsets().empty()) {
DCHECK(!op_compaction.has_output_rowset() || op_compaction.output_rowset().num_rows() == 0);
// Apply the compaction operation to the cloud native pk index.
// This ensures that the pk index is updated with the compaction changes.
_builder.remove_compacted_sst(op_compaction);
if (op_compaction.input_sstables().empty() || !op_compaction.has_output_sstable()) {
return Status::OK();
}
RETURN_IF_ERROR(prepare_primary_index());
RETURN_IF_ERROR(_index_entry->value().apply_opcompaction(*_metadata, op_compaction));
return Status::OK();
}
RETURN_IF_ERROR(prepare_primary_index());
return _tablet.update_mgr()->publish_primary_compaction(op_compaction, txn_id, *_metadata, _tablet,
_index_entry, &_builder, _base_version);
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1125,4 +1125,13 @@ Status UpdateManager::pk_index_major_compaction(int64_t tablet_id, DataDir* data
return Status::OK();
}

bool UpdateManager::TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt) {
auto index_entry = _index_cache.get(tablet_id);
if (index_entry == nullptr) {
return expected_cnt == 0;
}
_index_cache.release(index_entry);
return index_entry->get_ref() == expected_cnt;
}

} // namespace starrocks::lake
2 changes: 2 additions & 0 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ class UpdateManager {

Status pk_index_major_compaction(int64_t tablet_id, DataDir* data_dir);

bool TEST_primary_index_refcnt(int64_t tablet_id, uint32_t expected_cnt);

private:
// print memory tracker state
void _print_memory_stats();
Expand Down
32 changes: 32 additions & 0 deletions be/test/storage/lake/alter_tablet_meta_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,4 +565,36 @@ TEST_F(AlterTabletMetaTest, test_alter_persistent_index_type) {
ASSERT_TRUE(tablet_meta4->orphan_files_size() > 0);
}

TEST_F(AlterTabletMetaTest, test_skip_load_pindex) {
std::shared_ptr<TabletMetadata> tablet_metadata = generate_simple_tablet_metadata(PRIMARY_KEYS);
ASSERT_OK(_tablet_mgr->put_tablet_metadata(*tablet_metadata));
// write empty rowset
{
TxnLogPB log;
auto op_write_meta = log.mutable_op_write();
auto rs_meta = op_write_meta->mutable_rowset();
rs_meta->set_id(next_id());
rs_meta->set_num_rows(0);

auto tablet_id = tablet_metadata->id();
auto version = tablet_metadata->version() + 1;
std::unique_ptr<TxnLogApplier> log_applier =
new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version, false);
ASSERT_OK(log_applier->apply(log));
ASSERT_TRUE(_tablet_mgr->update_mgr()->TEST_primary_index_refcnt(tablet_metadata->id(), 0));
}

{
TxnLogPB log;
auto op_compaction_meta = log.mutable_op_compaction();
auto tablet_id = tablet_metadata->id();
auto version = tablet_metadata->version() + 1;
op_compaction_meta->set_compact_version(version);
std::unique_ptr<TxnLogApplier> log_applier =
new_txn_log_applier(Tablet(_tablet_mgr.get(), tablet_id), tablet_metadata, version, false);
ASSERT_OK(log_applier->apply(log));
ASSERT_TRUE(_tablet_mgr->update_mgr()->TEST_primary_index_refcnt(tablet_metadata->id(), 0));
}
}

} // namespace starrocks::lake
Loading