Skip to content

DSProxyDiscover: fix bugs, more tests #19764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,17 +790,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {

DSP_LOG_ALERT_S("BSD18", str.Str());

if (FromLeader) {
Sleep(TDuration::Seconds(1));

str << " logacc# ";
LogCtx.LogAcc.Output(str);
str << " verboseNoData# ";
str << msg->DebugInfo;

Y_ABORT_UNLESS(false, "%s", str.Str().data());
}

IsGetDataDone = true;
if (IsGetBlockDone) {
DSP_LOG_ERROR_S("BSD19", "Handle TEvGetResult Die. status# "
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
SendToQueue(std::move(query), 0);
++RequestsInFlight;
}
} else {
GetBlockFinished = true;
}

// initial kick for workers -- send messages to corresponding VDisks
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_counters_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Y_UNIT_TEST(PutGeneratedSubrequestBytes) {
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, erasure, 1, 0);
TTestState testState(runtime, erasure, env.Info);
TTestState testState(runtime, env.Info);

TEvBlobStorage::TEvPut::ETactic tactic = TEvBlobStorage::TEvPut::TacticDefault;
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog;
Expand Down Expand Up @@ -92,7 +92,7 @@ Y_UNIT_TEST(MultiPutGeneratedSubrequestBytes) {
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, erasure, 1, 0);
TTestState testState(runtime, erasure, env.Info);
TTestState testState(runtime, env.Info);

TEvBlobStorage::TEvPut::ETactic tactic = TEvBlobStorage::TEvPut::TacticDefault;
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog;
Expand Down
108 changes: 98 additions & 10 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_discover_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Y_UNIT_TEST(Block42Success) {
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0, TBlobStorageGroupInfo::EEM_NONE);
TTestState testState(runtime, type, env.Info);
TTestState testState(runtime, env.Info);

const ui64 tabletId = 72075186224047637;
TLogoBlobID blobId1(tabletId, 1, 1, 0, testState.BlobSize, 0);
Expand All @@ -37,6 +37,7 @@ Y_UNIT_TEST(Block42Success) {
testState.HandleVGetsWithMock(8); // get the blob

TEvBlobStorage::TEvDiscoverResult::TPtr ev = testState.GrabEventPtr<TEvBlobStorage::TEvDiscoverResult>();
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Id, blobId2);
}

Expand All @@ -46,15 +47,14 @@ Y_UNIT_TEST(Block42SuccessLastBlobMissingParts) {
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0, TBlobStorageGroupInfo::EEM_NONE);
TTestState testState(runtime, type, env.Info);
TTestState testState(runtime, env.Info);

const ui64 tabletId = 72075186224047637;
TLogoBlobID blobId1(tabletId, 1, 1, 0, testState.BlobSize, 0);
TLogoBlobID blobId2(tabletId, 1, 2, 0, testState.BlobSize, 0);

TGroupMock& groupMock = testState.GetGroupMock();
groupMock.Put(blobId1, testState.BlobData);
// FIXIT: if parts are 1 2 3 4 then 5 and 6 part are restored twice
THashSet<ui32> selectedParts{3, 4, 5, 6};
groupMock.Put(blobId2, testState.BlobData, 0, &selectedParts);

Expand All @@ -67,6 +67,7 @@ Y_UNIT_TEST(Block42SuccessLastBlobMissingParts) {
testState.HandleVPutsWithMock(2); // restore puts

TEvBlobStorage::TEvDiscoverResult::TPtr ev = testState.GrabEventPtr<TEvBlobStorage::TEvDiscoverResult>();
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Id, blobId2);
}

Expand All @@ -76,7 +77,7 @@ Y_UNIT_TEST(Block42SuccessLastBlobNotFullyWritten) {
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0, TBlobStorageGroupInfo::EEM_NONE);
TTestState testState(runtime, type, env.Info);
TTestState testState(runtime, env.Info);

const ui64 tabletId = 72075186224047637;
TLogoBlobID blobId1(tabletId, 1, 1, 0, testState.BlobSize, 0);
Expand All @@ -95,18 +96,17 @@ Y_UNIT_TEST(Block42SuccessLastBlobNotFullyWritten) {
testState.HandleVGetsWithMock(8);

TEvBlobStorage::TEvDiscoverResult::TPtr ev = testState.GrabEventPtr<TEvBlobStorage::TEvDiscoverResult>();
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Id, blobId1);
}

// FIXIT: this test crashes in DSProxy
/*
Y_UNIT_TEST(Block42ProxyCrashWhileGettingTheBlob) {
Y_UNIT_TEST(Block42ErrorWhenBlobIsLostAfterDiscover) {
TBlobStorageGroupType type(TErasureType::Erasure4Plus2Block);
TTestBasicRuntime runtime;
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0, TBlobStorageGroupInfo::EEM_NONE);
TTestState testState(runtime, type, env.Info);
TTestState testState(runtime, env.Info);

const ui64 tabletId = 72075186224047637;
TLogoBlobID blobId1(tabletId, 1, 1, 0, testState.BlobSize, 0);
Expand All @@ -126,9 +126,97 @@ Y_UNIT_TEST(Block42ProxyCrashWhileGettingTheBlob) {
testState.HandleVGetsWithMock(8);

TEvBlobStorage::TEvDiscoverResult::TPtr ev = testState.GrabEventPtr<TEvBlobStorage::TEvDiscoverResult>();
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Id, blobId1);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrProto::ERROR);
}

Y_UNIT_TEST(Mirror3dcSuccess) {
TBlobStorageGroupType type(TErasureType::ErasureMirror3dc);
TTestBasicRuntime runtime;
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0, TBlobStorageGroupInfo::EEM_NONE);
TTestState testState(runtime, env.Info);

const ui64 tabletId = 72075186224047637;
TLogoBlobID blobId1(tabletId, 1, 1, 0, testState.BlobSize, 0);
TLogoBlobID blobId2(tabletId, 1, 2, 0, testState.BlobSize, 0);

TGroupMock& groupMock = testState.GetGroupMock();
groupMock.Put(blobId1, testState.BlobData);
groupMock.Put(blobId2, testState.BlobData);

auto discover = std::make_unique<TEvBlobStorage::TEvDiscover>(
tabletId, 1, true, false, TInstant::Max(), 0, true);
runtime.Send(new IEventHandle(env.RealProxyActorId, testState.EdgeActor, discover.release()), 0, true);

testState.HandleVGetsWithMock(9);
testState.HandleVGetsWithMock(9);
testState.HandleVPutsWithMock(1);

TEvBlobStorage::TEvDiscoverResult::TPtr ev = testState.GrabEventPtr<TEvBlobStorage::TEvDiscoverResult>();
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Id, blobId2);
}

Y_UNIT_TEST(Mirror3dcSuccessLastBlobMissingParts) {
TBlobStorageGroupType type(TErasureType::ErasureMirror3dc);
TTestBasicRuntime runtime;
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0, TBlobStorageGroupInfo::EEM_NONE);
TTestState testState(runtime, env.Info);

const ui64 tabletId = 72075186224047637;
TLogoBlobID blobId1(tabletId, 1, 1, 0, testState.BlobSize, 0);
TLogoBlobID blobId2(tabletId, 1, 2, 0, testState.BlobSize, 0);

TGroupMock& groupMock = testState.GetGroupMock();
groupMock.Put(blobId1, testState.BlobData);
THashSet<ui32> selectedParts{1};
groupMock.Put(blobId2, testState.BlobData, 0, &selectedParts);

auto discover = std::make_unique<TEvBlobStorage::TEvDiscover>(
tabletId, 1, true, false, TInstant::Max(), 0, true);
runtime.Send(new IEventHandle(env.RealProxyActorId, testState.EdgeActor, discover.release()), 0, true);

testState.HandleVGetsWithMock(9);
testState.HandleVGetsWithMock(9);
testState.HandleVPutsWithMock(3);

TEvBlobStorage::TEvDiscoverResult::TPtr ev = testState.GrabEventPtr<TEvBlobStorage::TEvDiscoverResult>();
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Id, blobId2);
}

Y_UNIT_TEST(Mirror3dcErrorWhenBlobIsLostAfterDiscover) {
TBlobStorageGroupType type(TErasureType::ErasureMirror3dc);
TTestBasicRuntime runtime;
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0, TBlobStorageGroupInfo::EEM_NONE);
TTestState testState(runtime, env.Info);

const ui64 tabletId = 72075186224047637;
TLogoBlobID blobId1(tabletId, 1, 1, 0, testState.BlobSize, 0);
TLogoBlobID blobId2(tabletId, 1, 2, 0, testState.BlobSize, 0);

TGroupMock& groupMock = testState.GetGroupMock();
groupMock.Put(blobId1, testState.BlobData);
groupMock.Put(blobId2, testState.BlobData);

auto discover = std::make_unique<TEvBlobStorage::TEvDiscover>(
tabletId, 1, true, false, TInstant::Max(), 0, true);
runtime.Send(new IEventHandle(env.RealProxyActorId, testState.EdgeActor, discover.release()), 0, true);

testState.HandleVGetsWithMock(9);
testState.GroupMock.SetError(TVDiskID(0, 1, 0, 0, 0), NKikimrProto::ERROR);
testState.GroupMock.SetError(TVDiskID(0, 1, 1, 0, 0), NKikimrProto::NOT_YET);
testState.GroupMock.SetError(TVDiskID(0, 1, 2, 0, 0), NKikimrProto::NOT_YET);
testState.HandleVGetsWithMock(9);

TEvBlobStorage::TEvDiscoverResult::TPtr ev = testState.GrabEventPtr<TEvBlobStorage::TEvDiscoverResult>();
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrProto::ERROR);
}
*/

} // TDSProxyDiscover

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies)
TBlobStorageGroupType groupType(erasureSpecies);
const ui32 domainCount = groupType.BlobSubgroupSize();;

TGroupMock group(groupId, erasureSpecies, 1, domainCount, 1);
TGroupMock group(groupId, erasureSpecies, domainCount, 1, 1);
TIntrusivePtr<TGroupQueues> groupQueues = group.MakeGroupQueues();

TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(new ::NMonitoring::TDynamicCounters());
Expand Down Expand Up @@ -393,7 +393,7 @@ void TestPutResultWithVDiskResults(TBlobStorageGroupType type, TMap<TVDiskID, NK
SetupRuntime(runtime);
TDSProxyEnv env;
env.Configure(runtime, type, 0, 0);
TTestState testState(runtime, type, env.Info);
TTestState testState(runtime, env.Info);

TLogoBlobID blobId(72075186224047637, 1, 863, 1, 786, 24576);
TStringBuilder dataBuilder;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ Y_UNIT_TEST(TestBlock42PutWithChangingSlowDisk) {
TBlobStorageGroupType type = {TErasureType::Erasure4Plus2Block};
TTestBasicRuntime runtime(1, false);
Setup(runtime, type);
TTestState testState(runtime, type, DSProxyEnv.Info);
TTestState testState(runtime, DSProxyEnv.Info);

TLogoBlobID blobId(72075186224047637, 1, 863, 1, 786, 24576);

Expand Down Expand Up @@ -500,7 +500,7 @@ Y_UNIT_TEST(TestBlock42PutWithChangingSlowDisk) {
void MakeTestMultiPutItemStatuses(TTestBasicRuntime &runtime, const TBlobStorageGroupType &type,
const TBatchedVec<NKikimrProto::EReplyStatus> &statuses) {
TString data("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
TTestState testState(runtime, type, DSProxyEnv.Info);
TTestState testState(runtime, DSProxyEnv.Info);

TVector<TLogoBlobID> blobIds = {
TLogoBlobID(72075186224047637, 1, 863, 1, 786, 24576),
Expand Down Expand Up @@ -599,7 +599,7 @@ Y_UNIT_TEST(TestGivenBlock42GroupGenerationGreaterThanVDiskGenerations) {
Setup(runtime, type);

TString data("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
TTestState testState(runtime, type, DSProxyEnv.Info);
TTestState testState(runtime, DSProxyEnv.Info);

TVector<TLogoBlobID> blobIds = {
TLogoBlobID(72075186224047637, 1, 863, 1, 786, 24576),
Expand Down Expand Up @@ -649,7 +649,7 @@ Y_UNIT_TEST(TestGivenMirror3DCGetWithFirstSlowDisk) {

TLogoBlobID blobId = TLogoBlobID(72075186224047637, 1, 2194, 1, 142, 12288);

TTestState testState(runtime, type, DSProxyEnv.Info);
TTestState testState(runtime, DSProxyEnv.Info);

TEvBlobStorage::TEvGet::TPtr ev = testState.CreateGetRequest({blobId}, false);
TActorId getActorId = runtime.Register(DSProxyEnv.CreateGetRequestActor(ev, NKikimrBlobStorage::TabletLog).release());
Expand Down
25 changes: 5 additions & 20 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,17 @@ namespace NKikimr {
struct TTestState {
TTestActorRuntime &Runtime;
TActorId EdgeActor;
TBlobStorageGroupType Type;
TGroupMock GroupMock;
TIntrusivePtr<TBlobStorageGroupInfo> Info;
TGroupMock GroupMock;

const ui32 BlobSize;
TString BlobData;

TTestState(TTestActorRuntime &runtime, const TBlobStorageGroupType &type,
TIntrusivePtr<TBlobStorageGroupInfo> &info, ui64 nodeIndex = 0, ui32 blobSize = 1024)
TTestState(TTestActorRuntime &runtime, TIntrusivePtr<TBlobStorageGroupInfo> &info, ui64 nodeIndex = 0, ui32 blobSize = 1024)
: Runtime(runtime)
, EdgeActor(runtime.AllocateEdgeActor(nodeIndex))
, Type(type)
, GroupMock(0, Type.GetErasure(), Type.BlobSubgroupSize(), 1, info)
, Info(info)
, BlobSize(blobSize)
{
FillBlobData();
}

TTestState(TTestActorRuntime &runtime, const TBlobStorageGroupType &type, ui64 nodeIndex = 0,
ui32 failRealms = 1, ui32 blobSize = 1024)
: Runtime(runtime)
, EdgeActor(runtime.AllocateEdgeActor(nodeIndex))
, Type(type)
, GroupMock(0, Type.GetErasure(), failRealms, Type.BlobSubgroupSize(), 1)
, Info(GroupMock.GetInfo())
, GroupMock(0, info)
, BlobSize(blobSize)
{
FillBlobData();
Expand All @@ -58,9 +43,9 @@ struct TTestState {

TPartLocation HandoffVDiskForBlobPart(TLogoBlobID blobId, ui64 handoffIdx) {
Y_ABORT_UNLESS(blobId.PartId());
Y_ABORT_UNLESS(handoffIdx < Type.Handoff());
Y_ABORT_UNLESS(handoffIdx < Info->Type.Handoff());
TLogoBlobID origBlobId(blobId, 0);
TVDiskID vDiskId = Info->GetVDiskInSubgroup(Type.TotalPartCount() + handoffIdx, origBlobId.Hash());
TVDiskID vDiskId = Info->GetVDiskInSubgroup(Info->Type.TotalPartCount() + handoffIdx, origBlobId.Hash());
return {blobId, vDiskId};
}

Expand Down
Loading
Loading