From 156d64637987f0413dc9e8a568a9d9fe7bfd2297 Mon Sep 17 00:00:00 2001 From: Alek5andr-Kotov Date: Fri, 10 Oct 2025 21:37:26 +0300 Subject: [PATCH] Blob compaction stops after TEvGetWriteInfo (#26660) --- .../persqueue/pqtablet/partition/partition.cpp | 7 +++++++ ydb/core/persqueue/pqtablet/partition/partition.h | 5 +++++ .../pqtablet/partition/partition_compaction.cpp | 15 +++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/ydb/core/persqueue/pqtablet/partition/partition.cpp b/ydb/core/persqueue/pqtablet/partition/partition.cpp index 7d4844692f08..b14c23397a30 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition.cpp @@ -1472,6 +1472,13 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon "Topic.Partition.GetWriteInfo", NWilson::EFlags::AUTO_END); + StopCompaction = true; + if (CompactionInProgress) { + LOG_D("Event TEvPQ::TEvGetWriteInfoRequest will be processed later"); + PendingGetWriteInfoRequest.reset(ev->Release().Release()); + return; + } + ProcessPendingEvent(ev, ctx); } diff --git a/ydb/core/persqueue/pqtablet/partition/partition.h b/ydb/core/persqueue/pqtablet/partition/partition.h index 9d3bc0bac5a2..f278b2c1fb82 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition.h +++ b/ydb/core/persqueue/pqtablet/partition/partition.h @@ -1166,6 +1166,11 @@ class TPartition : public TBaseActor { bool WasTheLastBlobBig = true; void DumpKeysForBlobsCompaction() const; + + void TryProcessGetWriteInfoRequest(const TActorContext& ctx); + + std::unique_ptr PendingGetWriteInfoRequest; + bool StopCompaction = false; }; inline ui64 TPartition::GetStartOffset() const { diff --git a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp index 58136a8b4507..45675caeccb4 100644 --- a/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp +++ b/ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp @@ -163,6 +163,11 @@ void TPartition::DumpKeysForBlobsCompaction() const void TPartition::TryRunCompaction() { + if (StopCompaction) { + LOG_D("Blobs compaction is stopped"); + return; + } + if (CompactionInProgress) { LOG_D("Blobs compaction in progress"); return; @@ -499,10 +504,20 @@ void TPartition::BlobsForCompactionWereWrite() KeysForCompaction.clear(); CompactionBlobsCount = 0; + TryProcessGetWriteInfoRequest(ctx); + ProcessTxsAndUserActs(ctx); // Now you can delete unnecessary keys. TryRunCompaction(); } +void TPartition::TryProcessGetWriteInfoRequest(const TActorContext& ctx) +{ + if (PendingGetWriteInfoRequest) { + ProcessPendingEvent(std::move(PendingGetWriteInfoRequest), ctx); + PendingGetWriteInfoRequest = nullptr; + } +} + void TPartition::EndProcessWritesForCompaction(TEvKeyValue::TEvRequest* request, const TInstant blobCreationUnixTime, const TActorContext& ctx) { if (CompactionBlobEncoder.HeadCleared) {