Skip to content

Commit abeada2

Browse files
authored
Tpcc cli and import cleanup (#17333) (#19771)
1 parent 3335cfb commit abeada2

File tree

5 files changed

+200
-103
lines changed

5 files changed

+200
-103
lines changed

ydb/library/workload/tpcc/constants.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,7 @@ enum class ETransactionType {
8585

8686
constexpr const size_t TUI_LOG_LINES = 10;
8787

88+
// lower limit, real number is higher
89+
constexpr const size_t WAREHOUSES_PER_CPU_CORE = 1500;
90+
8891
} // namespace NYdb::NTPCC

ydb/library/workload/tpcc/import.cpp

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,12 @@ class TPCCLoader {
792792
}
793793

794794
void ImportSync() {
795-
Config.SetDisplayUpdateInterval();
795+
if (Config.WarehouseCount == 0) {
796+
std::cerr << "Specified zero warehouses" << std::endl;
797+
std::exit(1);
798+
}
799+
800+
Config.SetDisplay();
796801
CalculateApproximateDataSize();
797802

798803
// we want to switch buffers and draw UI ASAP to properly display logs
@@ -801,12 +806,19 @@ class TPCCLoader {
801806
UpdateDisplayIfNeeded(Clock::now());
802807
}
803808

809+
// TODO: detect number of threads
810+
if (Config.LoadThreadCount == 0) {
811+
LOG_W("Automatic calculation of loading threads is not implemented, falling back to the default");
812+
Config.LoadThreadCount = DEFAULT_LOAD_THREAD_COUNT;
813+
}
814+
804815
// in particular this log message
805816
LOG_I("Starting TPC-C data import for " << Config.WarehouseCount << " warehouses using " <<
806-
Config.ThreadCount << " threads. Approximate data size: " << GetFormattedSize(LoadState.ApproximateDataSize));
817+
Config.LoadThreadCount << " threads. Approximate data size: "
818+
<< GetFormattedSize(LoadState.ApproximateDataSize));
807819

808820
// TODO: detect number of threads
809-
size_t threadCount = std::min(Config.WarehouseCount, Config.ThreadCount );
821+
size_t threadCount = std::min(Config.WarehouseCount, Config.LoadThreadCount);
810822
threadCount = std::max(threadCount, size_t(1));
811823

812824
// TODO: calculate optimal number of drivers (but per thread looks good)
@@ -1071,7 +1083,7 @@ class TPCCLoader {
10711083

10721084
std::stringstream headerSs;
10731085
headerSs << "TPC-C Import: " << Config.WarehouseCount << " warehouses, "
1074-
<< Config.ThreadCount << " threads Estimated size: "
1086+
<< Config.LoadThreadCount << " threads Estimated size: "
10751087
<< GetFormattedSize(LoadState.ApproximateDataSize);
10761088

10771089
std::stringstream progressSs;
@@ -1101,21 +1113,19 @@ class TPCCLoader {
11011113
text(speedSs.str())
11021114
});
11031115

1104-
auto topRow = hbox({
1105-
importDetails | flex,
1106-
separator()
1107-
});
1116+
auto topRow = window(text("TPC-C data upload"), hbox({
1117+
importDetails
1118+
}));
11081119

11091120
// Index progress section (always shown)
11101121

11111122
Elements indexElements;
11121123
TString indexText;
11131124
if (LoadState.IndexBuildStates.empty()) {
1114-
indexText = "Index Creation Progress didn't start";
1125+
indexText = "Index Creation Didn't Start";
11151126
} else {
1116-
indexText = "Index Creation Progress:";
1127+
indexText = "Index Creation";
11171128
}
1118-
indexElements.push_back(text(indexText));
11191129

11201130
if (LoadState.IndexBuildStates.empty()) {
11211131
// Index building not started yet, need to leave enough space
@@ -1152,11 +1162,13 @@ class TPCCLoader {
11521162
}
11531163
}
11541164

1165+
auto indicesRow = window(text(indexText), vbox(indexElements));
1166+
11551167
// Create scrollable logs panel
11561168

11571169
Elements logElements;
11581170
LogBackend->GetLogLines([&](const std::string& line) {
1159-
logElements.push_back(text(line));
1171+
logElements.push_back(paragraph(line));
11601172
});
11611173

11621174
auto logsContent = vbox(logElements);
@@ -1166,9 +1178,7 @@ class TPCCLoader {
11661178

11671179
auto layout = vbox({
11681180
topRow,
1169-
separator(),
1170-
vbox(indexElements),
1171-
separator(),
1181+
indicesRow,
11721182
logsPanel | flex
11731183
});
11741184

ydb/library/workload/tpcc/runner.cpp

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "transactions.h"
99

1010
#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
11+
#include <ydb/public/lib/ydb_cli/common/interactive.h>
1112
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h>
1213

1314
#include <library/cpp/logger/log.h>
@@ -216,14 +217,27 @@ TPCCRunner::TPCCRunner(const NConsoleClient::TClientCommand::TConfig& connection
216217
std::exit(1);
217218
}
218219

219-
const size_t networkThreadCount = NConsoleClient::TYdbCommand::GetNetworkThreadNum(ConnectionConfig);
220-
const size_t maxTerminalThreadCount = cpuCount > networkThreadCount ? cpuCount - networkThreadCount : 1;
220+
if (Config.WarehouseCount == 0) {
221+
std::cerr << "Specified zero warehouses" << std::endl;
222+
std::exit(1);
223+
}
221224

222225
const size_t terminalsCount = Config.WarehouseCount * TERMINALS_PER_WAREHOUSE;
223226

224-
// we might consider using less than maxTerminalThreads
225-
const size_t threadCount = Config.ThreadCount == 0 ?
226-
std::min(maxTerminalThreadCount, terminalsCount) : Config.ThreadCount;
227+
size_t threadCount = 0;
228+
if (Config.ThreadCount == 0) {
229+
// here we calculate max possible efficient thread number
230+
const size_t networkThreadCount = NConsoleClient::TYdbCommand::GetNetworkThreadNum(ConnectionConfig);
231+
const size_t maxTerminalThreadCount = cpuCount > networkThreadCount ? cpuCount - networkThreadCount : 1;
232+
threadCount = std::min(maxTerminalThreadCount, terminalsCount);
233+
234+
// usually this allows to lower number of threads
235+
const size_t recommendedThreadCount =
236+
(Config.WarehouseCount + WAREHOUSES_PER_CPU_CORE - 1) / WAREHOUSES_PER_CPU_CORE;
237+
threadCount = std::min(threadCount, recommendedThreadCount);
238+
} else {
239+
threadCount = Config.ThreadCount;
240+
}
227241

228242
// The number of terminals might be hundreds of thousands.
229243
// For now, we don't have more than 32 network threads (check TYdbCommand::GetNetworkThreadNum()),
@@ -351,7 +365,7 @@ void TPCCRunner::Join() {
351365
}
352366

353367
void TPCCRunner::RunSync() {
354-
Config.SetDisplayUpdateInterval();
368+
Config.SetDisplay();
355369

356370
Clock::time_point now = Clock::now();
357371

@@ -363,14 +377,14 @@ void TPCCRunner::RunSync() {
363377
// We don't want to start all terminals at the same time, because then there will be
364378
// a huge queue of ready terminals, which we can't handle
365379
bool forcedWarmup = false;
366-
int minWarmupSeconds = Terminals.size() * MinWarmupPerTerminal.count() / 1000 + 1;
367-
int minWarmupMinutes = (minWarmupSeconds + 59) / 60;
368-
int warmupMinutes;
369-
if (Config.WarmupMinutes < minWarmupMinutes) {
380+
uint32_t minWarmupSeconds = Terminals.size() * MinWarmupPerTerminal.count() / 1000 + 1;
381+
uint32_t minWarmupMinutes = (minWarmupSeconds + 59) / 60;
382+
uint32_t warmupMinutes;
383+
if (Config.WarmupDuration.Minutes() < minWarmupMinutes) {
370384
forcedWarmup = true; // we must print log message later after display update
371385
warmupMinutes = minWarmupMinutes;
372386
} else {
373-
warmupMinutes = Config.WarmupMinutes;
387+
warmupMinutes = Config.WarmupDuration.Minutes();
374388
}
375389

376390
WarmupStartTs = Clock::now();
@@ -411,14 +425,14 @@ void TPCCRunner::RunSync() {
411425

412426
StopWarmup.store(true, std::memory_order_relaxed);
413427

414-
LOG_I("Measuring during " << Config.RunMinutes << " minutes");
428+
LOG_I("Measuring during " << Config.RunDuration);
415429

416430
MeasurementsStartTs = Clock::now();
417431

418432
// reset statistics
419433
LastStatisticsSnapshot = std::make_unique<TAllStatistics>(PerThreadTerminalStats.size(), MeasurementsStartTs);
420434

421-
StopDeadline = MeasurementsStartTs + std::chrono::minutes(Config.RunMinutes);
435+
StopDeadline = MeasurementsStartTs + std::chrono::seconds(Config.RunDuration.Seconds());
422436
while (!GetGlobalInterruptSource().stop_requested()) {
423437
if (now >= StopDeadline) {
424438
break;
@@ -469,10 +483,12 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) {
469483
ss << std::endl << "Efficiency: " << std::setprecision(1) << data.Efficiency << "% | "
470484
<< "tpmC: " << std::setprecision(1) << data.Tpmc;
471485

472-
std::cout << ss.str();
486+
LOG_I(ss.str());
473487

474488
// Per thread statistics (two columns)
475-
std::cout << "\nPer thread statistics:" << std::endl;
489+
490+
std::stringstream debugSs;
491+
debugSs << "\nPer thread statistics:" << std::endl;
476492

477493
size_t threadCount = LastStatisticsSnapshot->StatVec.size();
478494
size_t halfCount = (threadCount + 1) / 2;
@@ -487,10 +503,10 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) {
487503
<< std::setw(15) << "queue p90, ms";
488504

489505
// Print headers side by side
490-
std::cout << threadsHeader.str() << " | " << threadsHeader.str() << std::endl;
506+
debugSs << threadsHeader.str() << " | " << threadsHeader.str() << std::endl;
491507

492508
size_t totalWidth = threadsHeader.str().length() * 2 + 3;
493-
std::cout << std::string(totalWidth, '-') << std::endl;
509+
debugSs << std::string(totalWidth, '-') << std::endl;
494510

495511
// Print thread data in two columns
496512
for (size_t i = 0; i < halfCount; ++i) {
@@ -525,13 +541,15 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) {
525541
rightLine << std::string(threadsHeader.str().length(), ' ');
526542
}
527543

528-
std::cout << leftLine.str() << " | " << rightLine.str() << std::endl;
544+
debugSs << leftLine.str() << " | " << rightLine.str() << std::endl;
529545
}
530-
std::cout << std::string(totalWidth, '-') << std::endl;
546+
debugSs << std::string(totalWidth, '-') << std::endl;
531547

532548
// Transaction statistics
533-
std::cout << "\n\n";
534-
PrintTransactionStatisticsPretty(std::cout);
549+
debugSs << "\n";
550+
PrintTransactionStatisticsPretty(debugSs);
551+
552+
LOG_D(debugSs.str());
535553
}
536554

537555
void TPCCRunner::UpdateDisplayTuiMode(const TCalculatedStatusData& data) {
@@ -903,6 +921,31 @@ void TPCCRunner::PrintFinalResultPretty() {
903921

904922
//-----------------------------------------------------------------------------
905923

924+
void TRunConfig::SetDisplay() {
925+
if (NoTui) {
926+
DisplayMode = EDisplayMode::Text;
927+
} else {
928+
if (NConsoleClient::IsStdoutInteractive()) {
929+
DisplayMode = EDisplayMode::Tui;
930+
} else {
931+
DisplayMode = EDisplayMode::Text;
932+
}
933+
}
934+
935+
switch (DisplayMode) {
936+
case EDisplayMode::None:
937+
return;
938+
case EDisplayMode::Text:
939+
DisplayUpdateInterval = DisplayUpdateTextInterval;
940+
return;
941+
case EDisplayMode::Tui:
942+
DisplayUpdateInterval = DisplayUpdateTuiInterval;
943+
return;
944+
}
945+
}
946+
947+
//-----------------------------------------------------------------------------
948+
906949
void RunSync(const NConsoleClient::TClientCommand::TConfig& connectionConfig, const TRunConfig& runConfig) {
907950
TPCCRunner runner(connectionConfig, runConfig);
908951
runner.RunSync();

ydb/library/workload/tpcc/runner.h

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,20 @@
44

55
#include <library/cpp/logger/priority.h>
66

7+
#include <util/datetime/base.h>
8+
79
#include <stop_token>
810

911
namespace NYdb::NTPCC {
1012

1113
constexpr int DEFAULT_WAREHOUSE_COUNT = 1;
12-
constexpr int DEFAULT_WARMUP_MINUTES = 1; // TODO
13-
constexpr int DEFAULT_RUN_MINUTES = 2; // TODO
14+
constexpr TDuration DEFAULT_WARMUP_DURATION = TDuration::Minutes(1); // TODO
15+
constexpr TDuration DEFAULT_RUN_DURATION = TDuration::Minutes(2); // TODO
16+
1417
constexpr int DEFAULT_MAX_SESSIONS = 100; // TODO
18+
constexpr int DEFAULT_THREAD_COUNT = 0; // autodetect
19+
constexpr int DEFAULT_LOAD_THREAD_COUNT = 10;
20+
1521
constexpr int DEFAULT_LOG_LEVEL = 6; // TODO: properly use enum
1622

1723
struct TRunConfig {
@@ -21,6 +27,11 @@ struct TRunConfig {
2127
Tui,
2228
};
2329

30+
enum class EFormat {
31+
Pretty = 0,
32+
Json,
33+
};
34+
2435
TRunConfig() = default;
2536
void SetFullPath(const NConsoleClient::TClientCommand::TConfig& connectionConfig) {
2637
if (Path.empty()) {
@@ -35,37 +46,29 @@ struct TRunConfig {
3546
Path = connectionConfig.Database + '/' + Path;
3647
}
3748

38-
void SetDisplayUpdateInterval() {
39-
switch (DisplayMode) {
40-
case EDisplayMode::None:
41-
return;
42-
case EDisplayMode::Text:
43-
DisplayUpdateInterval = DisplayUpdateTextInterval;
44-
return;
45-
case EDisplayMode::Tui:
46-
DisplayUpdateInterval = DisplayUpdateTuiInterval;
47-
return;
48-
break;
49-
}
50-
}
49+
void SetDisplay();
5150

5251
int WarehouseCount = DEFAULT_WAREHOUSE_COUNT;
53-
int WarmupMinutes = DEFAULT_WARMUP_MINUTES;
54-
int RunMinutes = DEFAULT_RUN_MINUTES;
52+
TDuration WarmupDuration = DEFAULT_WARMUP_DURATION;
53+
TDuration RunDuration = DEFAULT_RUN_DURATION;
5554

5655
int MaxInflight = DEFAULT_MAX_SESSIONS;
5756

5857
TString Path;
5958

59+
EFormat Format = EFormat::Pretty;
60+
6061
TString JsonResultPath;
6162

6263
// advanced settings (normally, used by developer only)
6364

64-
int ThreadCount = 0;
65+
int ThreadCount = DEFAULT_THREAD_COUNT;
66+
int LoadThreadCount = DEFAULT_LOAD_THREAD_COUNT;
6567
int DriverCount = 0;
6668
ELogPriority LogPriority = static_cast<ELogPriority>(DEFAULT_LOG_LEVEL);
6769
bool NoDelays = false;
6870
bool ExtendedStats = false;
71+
bool NoTui = false;
6972
EDisplayMode DisplayMode = EDisplayMode::None;
7073

7174
// instead of actual transaction just async sleep and return SUCCESS

0 commit comments

Comments
 (0)