Skip to content

Commit

Permalink
Ydb stable 23-4-11
Browse files Browse the repository at this point in the history
x-stable-origin-commit: 9cc0616f07997e08c92266a30796bc4ff25aea66
  • Loading branch information
Gazizonoki committed Apr 17, 2024
1 parent 31a6b06 commit daa563b
Show file tree
Hide file tree
Showing 37 changed files with 693 additions and 382 deletions.
87 changes: 29 additions & 58 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -779,64 +779,35 @@ struct TEvBlobStorage {
EvBunchOfEvents,

// blobstorage controller interface
// EvControllerReadSchemeString = EvPut + 11 * 512,
// EvControllerReadDataString,
EvControllerRegisterNode = EvPut + 11 * 512 + 2,
EvControllerCreatePDisk,
EvControllerCreateVDiskSlots,
EvControllerCreateGroup,
EvControllerSelectGroups,
EvControllerGetGroup,
EvControllerUpdateDiskStatus,
EvControllerUpdateGroupsUsage, // Not used.
EvControllerConfigRequest,
EvControllerConfigResponse,
EvControllerProposeRequest,
EvControllerProposeResponse,
EvControllerVDiskStatusSubscribeRequest,
EvControllerVDiskStatusReport,
EvControllerGroupStatusRequest,
EvControllerGroupStatusResponse,
EvControllerUpdateGroup,
EvControllerUpdateFaultyDisks,
EvControllerProposeGroupKey,
EvControllerUpdateGroupLatencies, // Not used.
EvControllerUpdateGroupStat,
EvControllerNotifyGroupChange,
EvControllerCommitGroupLatencies,
EvControllerUpdateSelfHealInfo,
EvControllerScrubQueryStartQuantum,
EvControllerScrubQuantumFinished,
EvControllerScrubReportQuantumInProgress,
EvControllerUpdateNodeDrives,
EvControllerGroupDecommittedNotify,
EvControllerGroupDecommittedResponse,
EvControllerGroupMetricsExchange,

// EvControllerReadSchemeStringResult = EvPut + 12 * 512,
// EvControllerReadDataStringResult,
EvControllerNodeServiceSetUpdate = EvPut + 12 * 512 + 2,
EvControllerCreatePDiskResult,
EvControllerCreateVDiskSlotsResult,
EvControllerCreateGroupResult,
EvControllerSelectGroupsResult,
EvRequestControllerInfo,
EvResponseControllerInfo,
EvControllerGroupReconfigureReplace, // Not used.
EvControllerGroupReconfigureReplaceResult, // Not used.
EvControllerGroupReconfigureWipe,
EvControllerGroupReconfigureWipeResult,
EvControllerNodeReport,
EvControllerScrubStartQuantum,

EvControllerMigrationPause,
EvControllerMigrationContinue,
EvControllerMigrationFinished,
EvControllerMigrationBatch,
EvControllerMigrationBatchRequest,
EvControllerMigrationDone,

EvControllerUpdateSystemViews,
EvControllerRegisterNode = 0x10031602,
EvControllerSelectGroups = 0x10031606,
EvControllerGetGroup = 0x10031607,
EvControllerUpdateDiskStatus = 0x10031608,
EvControllerConfigRequest = 0x1003160a,
EvControllerConfigResponse = 0x1003160b,
EvControllerProposeGroupKey = 0x10031614,
EvControllerUpdateGroupStat = 0x10031616,
EvControllerNotifyGroupChange = 0x10031617,
EvControllerCommitGroupLatencies = 0x10031618,
EvControllerUpdateSelfHealInfo = 0x10031619,
EvControllerScrubQueryStartQuantum = 0x1003161a,
EvControllerScrubQuantumFinished = 0x1003161b,
EvControllerScrubReportQuantumInProgress = 0x1003161c,
EvControllerUpdateNodeDrives = 0x1003161d,
EvControllerGroupDecommittedNotify = 0x1003161e,
EvControllerGroupDecommittedResponse = 0x1003161f,
EvControllerGroupMetricsExchange = 0x10031620,

// BSC interface result section
EvControllerNodeServiceSetUpdate = 0x10031802,
EvControllerSelectGroupsResult = 0x10031806,
EvRequestControllerInfo = 0x10031807,
EvResponseControllerInfo = 0x10031808,
EvControllerGroupReconfigureWipe = 0x1003180b,
EvControllerGroupReconfigureWipeResult = 0x1003180c,
EvControllerNodeReport = 0x1003180d,
EvControllerScrubStartQuantum = 0x1003180e,
EvControllerUpdateSystemViews = 0x10031815,

// proxy - node controller interface
EvConfigureProxy = EvPut + 13 * 512,
Expand Down
12 changes: 7 additions & 5 deletions ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_blobmap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,14 @@ namespace NKikimr {
TMirror3dcMapper(const TBlobStorageGroupInfo::TTopology *topology)
: Topology(topology)
, NumFailRealms(Topology->FailRealms.size())
, NumFailDomainsPerFailRealm(Topology->FailRealms[0].FailDomains.size())
, NumVDisksPerFailDomain(Topology->FailRealms[0].FailDomains[0].VDisks.size())
, NumFailDomainsPerFailRealm(NumFailRealms ? Topology->FailRealms[0].FailDomains.size() : 0)
, NumVDisksPerFailDomain(NumFailDomainsPerFailRealm ? Topology->FailRealms[0].FailDomains[0].VDisks.size() : 0)
{
Y_VERIFY(NumFailRealms >= NumFailRealmsInSubgroup &&
NumFailDomainsPerFailRealm >= NumFailDomainsPerFailRealmInSubgroup,
"mirror-3-dc group tolopogy is invalid: %s", topology->ToString().data());
if (NumFailRealms && NumFailDomainsPerFailRealm && NumVDisksPerFailDomain) {
Y_VERIFY(NumFailRealms >= NumFailRealmsInSubgroup &&
NumFailDomainsPerFailRealm >= NumFailDomainsPerFailRealmInSubgroup,
"mirror-3-dc group tolopogy is invalid: %s", topology->ToString().data());
}
}

void PickSubgroup(ui32 hash, TBlobStorageGroupInfo::TOrderNums &orderNums) override final {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ Y_UNIT_TEST_SUITE(GroupReconfiguration) {
Timer.Reset();
auto ev = std::make_unique<TEvBlobStorage::TEvControllerGetGroup>();
ev->Record.AddGroupIDs(GroupId);
NTabletPipe::SendData(SelfId(), ClientId, ev.release(), 0);
NTabletPipe::SendData(SelfId(), ClientId, ev.release(), Max<ui64>());
}
}

Expand Down
19 changes: 11 additions & 8 deletions ydb/core/blobstorage/ut_blobstorage/lib/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -712,17 +712,20 @@ struct TEnvironmentSetup {
Sim(TDuration::Seconds(15));
}

void Wipe(ui32 nodeId, ui32 pdiskId, ui32 vslotId) {
const TActorId self = Runtime->AllocateEdgeActor(Settings.ControllerNodeId, __FILE__, __LINE__);
auto ev = std::make_unique<TEvBlobStorage::TEvControllerGroupReconfigureWipe>();
auto& record = ev->Record;
auto *vslot = record.MutableVSlotId();
void Wipe(ui32 nodeId, ui32 pdiskId, ui32 vslotId, const TVDiskID& vdiskId) {
NKikimrBlobStorage::TConfigRequest request;
request.SetIgnoreGroupFailModelChecks(true);
request.SetIgnoreDegradedGroupsChecks(true);
request.SetIgnoreDisintegratedGroupsChecks(true);
auto *cmd = request.AddCommand();
auto *wipe = cmd->MutableWipeVDisk();
auto *vslot = wipe->MutableVSlotId();
vslot->SetNodeId(nodeId);
vslot->SetPDiskId(pdiskId);
vslot->SetVSlotId(vslotId);
Runtime->SendToPipe(TabletId, self, ev.release(), 0, TTestActorSystem::GetPipeConfigWithRetries());
auto response = WaitForEdgeActorEvent<TEvBlobStorage::TEvControllerGroupReconfigureWipeResult>(self);
UNIT_ASSERT_VALUES_EQUAL(response->Get()->Record.GetStatus(), NKikimrProto::OK);
VDiskIDFromVDiskID(vdiskId, wipe->MutableVDiskId());
auto response = Invoke(request);
UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription());
}

void WaitForVDiskToGetRunning(const TVDiskID& vdiskId, TActorId actorId) {
Expand Down
20 changes: 2 additions & 18 deletions ydb/core/blobstorage/ut_blobstorage/mirror3of4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,10 @@ Y_UNIT_TEST_SUITE(Mirror3of4) {
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}
if (i == 500) {
const TActorId self = runtime->AllocateEdgeActor(1);
auto ev = std::make_unique<TEvBlobStorage::TEvControllerGroupReconfigureWipe>();
ev->Record.MutableVSlotId()->SetNodeId(2);
ev->Record.MutableVSlotId()->SetPDiskId(1000);
ev->Record.MutableVSlotId()->SetVSlotId(1000);
runtime->SendToPipe(env.TabletId, self, ev.release(), 0, TTestActorSystem::GetPipeConfigWithRetries());
auto response = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvControllerGroupReconfigureWipeResult>(self);
auto& r = response->Get()->Record;
UNIT_ASSERT_EQUAL(r.GetStatus(), NKikimrProto::OK);
env.Wipe(2, 1000, 1000, TVDiskID(groupId, 1, 0, 1, 0));
}
if (i == 600) {
const TActorId self = runtime->AllocateEdgeActor(1);
auto ev = std::make_unique<TEvBlobStorage::TEvControllerGroupReconfigureWipe>();
ev->Record.MutableVSlotId()->SetNodeId(3);
ev->Record.MutableVSlotId()->SetPDiskId(1000);
ev->Record.MutableVSlotId()->SetVSlotId(1000);
runtime->SendToPipe(env.TabletId, self, ev.release(), 0, TTestActorSystem::GetPipeConfigWithRetries());
auto response = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvControllerGroupReconfigureWipeResult>(self);
auto& r = response->Get()->Record;
UNIT_ASSERT_EQUAL(r.GetStatus(), NKikimrProto::OK);
env.Wipe(3, 1000, 1000, TVDiskID(groupId, 1, 0, 2, 0));
}
}

Expand Down
11 changes: 2 additions & 9 deletions ydb/core/blobstorage/ut_blobstorage/osiris.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,8 @@ bool DoTestCase(TBlobStorageGroupType::EErasureSpecies erasure, const std::set<s
for (const auto& vslot : response.GetStatus(0).GetBaseConfig().GetVSlot()) {
const TVDiskID vdiskId(vslot.GetGroupId(), vslot.GetGroupGeneration(), vslot.GetFailRealmIdx(),
vslot.GetFailDomainIdx(), vslot.GetVDiskIdx());

const TActorId sender = env.Runtime->AllocateEdgeActor(1);
auto ev = std::make_unique<TEvBlobStorage::TEvControllerGroupReconfigureWipe>();
auto *slotId = ev->Record.MutableVSlotId();
slotId->CopyFrom(vslot.GetVSlotId());
env.Runtime->SendToPipe(env.TabletId, sender, ev.release(), 0, TTestActorSystem::GetPipeConfigWithRetries());
auto response = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvControllerGroupReconfigureWipeResult>(sender);
Y_VERIFY(response->Get()->Record.GetStatus() == NKikimrProto::OK);

const auto& v = vslot.GetVSlotId();
env.Wipe(v.GetNodeId(), v.GetPDiskId(), v.GetVSlotId(), vdiskId);
env.Sim(TDuration::Seconds(30));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ namespace NKikimr {
TEvBlobStorage::EvControllerUpdateDiskStatus,
TEvBlobStorage::EvControllerConfigRequest,
TEvBlobStorage::EvControllerConfigResponse,
TEvBlobStorage::EvControllerVDiskStatusSubscribeRequest,
TEvBlobStorage::EvControllerVDiskStatusReport,
TEvBlobStorage::EvControllerGroupStatusRequest,
TEvBlobStorage::EvControllerGroupStatusResponse,
TEvBlobStorage::EvControllerUpdateGroup,
TEvBlobStorage::EvControllerUpdateFaultyDisks,
TEvBlobStorage::EvControllerUpdateGroupStat,

TEvBlobStorage::EvControllerSelectGroupsResult,
Expand Down
25 changes: 13 additions & 12 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo
return;
}

for (auto& topicToPartitions: NewPartitionsToLockOnTime) {
auto& partitions = topicToPartitions.second;
for (auto& [topicName, partitions]: NewPartitionsToLockOnTime) {
for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) {
if (partitionsIt->LockOn <= ctx.Now()) {
TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId);
TopicPartitions[topicName].ToLock.emplace(partitionsIt->PartitionId);
NeedRebalance = true;
partitionsIt = partitions.erase(partitionsIt);
} else {
Expand Down Expand Up @@ -408,6 +407,8 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy
}

void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
NewPartitionsToLockOnTime.clear();

for (auto& [topicName, topicInfo] : TopicsInfo) {
if (topicInfo.TabletID == tabletId) {
auto partitionsIt = TopicPartitions.find(topicName);
Expand Down Expand Up @@ -579,8 +580,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
auto newPartitionsToLockCount = newPartitionsToLockIt == NewPartitionsToLockOnTime.end() ? 0 : newPartitionsToLockIt->second.size();

auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
Y_ABORT_UNLESS(topicPartitionsIt != TopicPartitions.end());
Y_ABORT_UNLESS(record.GetCount() <= topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size() + newPartitionsToLockCount);
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);

for (ui32 c = 0; c < record.GetCount(); ++c) {
// if some partition not locked yet, then release it without rebalance
Expand All @@ -599,18 +599,19 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
}

NeedRebalance = true;
size_t partitionToReleaseIndex = 0;
size_t i = 0;
ui32 partitionToRelease = 0;
ui32 i = 0;

for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) {
if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) {
for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
++i;
if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
partitionToReleaseIndex = partIndex;
if (rand() % i == 0) {
partitionToRelease = curPartition;
}
}
}
topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex);

topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
}
}

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev)
YQL_ENSURE(state->Generation == msg.GetGeneration());

if (state->State == EShardState::Starting) {
++TotalRetries;
if (TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
CA_LOG_E("TKqpScanFetcherActor: broken tablet for this request " << state->TabletId
<< ", retries limit exceeded (" << state->TotalRetries << "/" << TotalRetries << ")");
SendGlobalFail(NDqProto::COMPUTE_STATE_FAILURE, YdbStatusToDqStatus(status), issues);
return PassAway();
}

if (FindSchemeErrorInIssues(status, issues)) {
return EnqueueResolveShard(state);
}
Expand Down
32 changes: 17 additions & 15 deletions ydb/core/mind/bscontroller/bsc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,23 @@ void TBlobStorageController::TGroupInfo::CalculateGroupStatus() {
failedByPDisk |= {Topology.get(), slot->GetShortVDiskId()};
}
}
auto deriveStatus = [&](const auto& failed) {
auto& checker = *Topology->QuorumChecker;
if (!failed.GetNumSetItems()) { // all disks of group are operational
return NKikimrBlobStorage::TGroupStatus::FULL;
} else if (!checker.CheckFailModelForGroup(failed)) { // fail model exceeded
return NKikimrBlobStorage::TGroupStatus::DISINTEGRATED;
} else if (checker.IsDegraded(failed)) { // group degraded
return NKikimrBlobStorage::TGroupStatus::DEGRADED;
} else if (failed.GetNumSetItems()) { // group partially available, but not degraded
return NKikimrBlobStorage::TGroupStatus::PARTIAL;
} else {
Y_FAIL("unexpected case");
}
};
Status.MakeWorst(deriveStatus(failed), deriveStatus(failed | failedByPDisk));
Status.MakeWorst(DeriveStatus(Topology.get(), failed), DeriveStatus(Topology.get(), failed | failedByPDisk));
}
}

// Maps a set of failed VDisks of a group onto the group status.
//
// topology -- group topology; its QuorumChecker is used to evaluate the failure set.
// failed   -- set of VDisks of the group that are considered failed.
//
// Returns:
//   FULL          -- the set is empty;
//   DISINTEGRATED -- the quorum checker reports that the fail model check does not pass;
//   DEGRADED      -- the fail model check passes, but the checker reports the set as degraded;
//   PARTIAL       -- any other non-empty set.
NKikimrBlobStorage::TGroupStatus::E TBlobStorageController::DeriveStatus(const TBlobStorageGroupInfo::TTopology *topology,
        const TBlobStorageGroupInfo::TGroupVDisks& failed) {
    auto& checker = *topology->QuorumChecker;

    // The emptiness check is done once; every branch below therefore deals with a non-empty set.
    const bool noneFailed = !failed.GetNumSetItems();
    if (noneFailed) { // all disks of group are operational
        return NKikimrBlobStorage::TGroupStatus::FULL;
    }
    if (!checker.CheckFailModelForGroup(failed)) { // fail model exceeded
        return NKikimrBlobStorage::TGroupStatus::DISINTEGRATED;
    }
    if (checker.IsDegraded(failed)) { // group degraded
        return NKikimrBlobStorage::TGroupStatus::DEGRADED;
    }
    // group partially available, but not degraded; the set is known to be non-empty here, so no
    // further check (and no "unexpected case" fallback) is needed
    return NKikimrBlobStorage::TGroupStatus::PARTIAL;
}

Expand Down
Loading

0 comments on commit daa563b

Please sign in to comment.