Skip to content

Commit

Permalink
Ydb stable 22-5-10
Browse files Browse the repository at this point in the history
x-stable-origin-commit: f696baac1a4b8d48eb52b52b35930eef6d0eab42
  • Loading branch information
dcherednik committed Feb 9, 2023
1 parent 9b78acb commit b0967c3
Show file tree
Hide file tree
Showing 304 changed files with 11,843 additions and 3,143 deletions.
1 change: 1 addition & 0 deletions library/cpp/actors/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ target_sources(cpp-actors-core PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_io.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_pool_united.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/executor_thread.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/harmonizer.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/interconnect.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/io_dispatcher.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/log.cpp
Expand Down
6 changes: 5 additions & 1 deletion library/cpp/actors/core/actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,12 @@ Y_UNIT_TEST_SUITE(TestDecorator) {
setup->NodeId = 0;
setup->ExecutorsCount = 1;
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);

ui64 ts = GetCycleCountFast();
THolder<IHarmonizer> harmonizer(MakeHarmonizer(ts));
for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic");
setup->Executors[i] = new TBasicExecutorPool(i, 1, 10, "basic", harmonizer.Get());
harmonizer->AddPool(setup->Executors[i].Get());
}
setup->Scheduler = new TBasicSchedulerThread;

Expand Down
45 changes: 45 additions & 0 deletions library/cpp/actors/core/actorsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,55 @@ namespace NActors {
return 1;
}

virtual i16 GetPriority() const {
return 0;
}

// generic
virtual TAffinity* Affinity() const = 0;

virtual void SetRealTimeMode() const {}

virtual ui32 GetThreadCount() const {
return 1;
};

virtual void SetThreadCount(ui32 threads) {
Y_UNUSED(threads);
}

virtual i16 GetBlockingThreadCount() const {
return 0;
}

virtual i16 GetDefaultThreadCount() const {
return 1;
}

virtual i16 GetMinThreadCount() const {
return 1;
}

virtual i16 GetMaxThreadCount() const {
return 1;

}

virtual bool IsThreadBeingStopped(i16 threadIdx) const {
Y_UNUSED(threadIdx);
return false;
}

virtual double GetThreadConsumedUs(i16 threadIdx) {
Y_UNUSED(threadIdx);
return 0.0;
}

virtual double GetThreadBookedUs(i16 threadIdx) {
Y_UNUSED(threadIdx);
return 0.0;
}

};

// could be proxy to in-pool schedulers (for NUMA-aware executors)
Expand Down
26 changes: 18 additions & 8 deletions library/cpp/actors/core/balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

#include "probes.h"

#include <library/cpp/actors/util/intrinsics.h>
#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/intrinsics.h>

#include <util/system/spinlock.h>

Expand All @@ -27,11 +28,11 @@ namespace NActors {

TLevel() {}

TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) {
TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) {
ScaleFactor = double(currentCpus) / cfg.Cpus;
if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu
if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // Uderload criterion, based on estimated latency w/o 1 cpu
LoadClass = Underloaded;
} else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency
} else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // Overload criterion, based on latency
LoadClass = Overloaded;
} else {
LoadClass = Moderate;
Expand Down Expand Up @@ -82,6 +83,8 @@ namespace NActors {
TBalancerConfig Config;

public:

ui64 GetPeriodUs() override;
// Setup
TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
Expand Down Expand Up @@ -238,9 +241,12 @@ namespace NActors {
}

// Compute levels
pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle);
pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized
pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1);
pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle,
pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle,
0, pool.Next.WorstActivationTimeUs); // we expect taken cpu to became utilized
pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1,
pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);

// Prepare for balancing
pool.PrevCpus = pool.CurrentCpus;
Expand All @@ -263,7 +269,7 @@ namespace NActors {
TPool& from = **fromIter;
if (from.CurrentCpus == from.PrevCpus && // if not balanced yet
from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
from.SubLevel.Importance <= to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
{
MoveCpu(from, to);
from.CurrentCpus--;
Expand Down Expand Up @@ -295,6 +301,10 @@ namespace NActors {
Lock.Release();
}

ui64 TBalancer::GetPeriodUs() {
return Config.PeriodUs;
}

IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
return new TBalancer(config, unitedPools, ts);
}
Expand Down
3 changes: 3 additions & 0 deletions library/cpp/actors/core/balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ namespace NActors {
ui64 Ts = 0; // Measurement timestamp
ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start
ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex
ui64 WorstActivationTimeUs = 0;
ui64 ExpectedLatencyIncreaseUs = 0;
};

// Pool cpu balancer
Expand All @@ -20,6 +22,7 @@ namespace NActors {
virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0;
virtual void Balance() = 0;
virtual void Unlock() = 0;
virtual ui64 GetPeriodUs() = 0;
// TODO: add method for reconfiguration on fly
};

Expand Down
11 changes: 11 additions & 0 deletions library/cpp/actors/core/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ namespace NActors {
ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
int RealtimePriority = 0;
ui32 MaxActivityType = 5;
i16 MinThreadCount = 0;
i16 MaxThreadCount = 0;
i16 DefaultThreadCount = 0;
i16 Priority = 0;
};

struct TIOExecutorPoolConfig {
Expand Down Expand Up @@ -88,11 +92,18 @@ namespace NActors {
TBalancerConfig Balancer;
};

struct TSelfPingInfo {
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
ui32 MaxAvgPingUs;
};

struct TCpuManagerConfig {
TUnitedWorkersConfig UnitedWorkers;
TVector<TBasicExecutorPoolConfig> Basic;
TVector<TIOExecutorPoolConfig> IO;
TVector<TUnitedExecutorPoolConfig> United;
TVector<TSelfPingInfo> PingInfoByPool;

ui32 GetExecutorsCount() const {
return Basic.size() + IO.size() + United.size();
Expand Down
10 changes: 9 additions & 1 deletion library/cpp/actors/core/cpu_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ namespace NActors {
UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
}

ui64 ts = GetCycleCountFast();
Harmonizer.Reset(MakeHarmonizer(ts));

Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);

for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
Executors[excIdx].Reset(CreateExecutorPool(excIdx));
if (excIdx < Config.PingInfoByPool.size()) {
Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]);
} else {
Harmonizer->AddPool(Executors[excIdx].Get());
}
}
}

Expand Down Expand Up @@ -89,7 +97,7 @@ namespace NActors {
IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
if (cfg.PoolId == poolId) {
return new TBasicExecutorPool(cfg);
return new TBasicExecutorPool(cfg, Harmonizer.Get());
}
}
for (TIOExecutorPoolConfig& cfg : Config.IO) {
Expand Down
2 changes: 2 additions & 0 deletions library/cpp/actors/core/cpu_manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "actorsystem.h"
#include "harmonizer.h"
#include "executor_pool_basic.h"
#include "executor_pool_io.h"
#include "executor_pool_united.h"
Expand All @@ -11,6 +12,7 @@ namespace NActors {
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
THolder<TUnitedWorkers> UnitedWorkers;
THolder<IBalancer> Balancer;
THolder<IHarmonizer> Harmonizer;
TCpuManagerConfig Config;
public:
explicit TCpuManager(THolder<TActorSystemSetup>& setup)
Expand Down
Loading

0 comments on commit b0967c3

Please sign in to comment.