Skip to content

Tpcc cli and import cleanup (#17333) #19771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/library/workload/tpcc/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,7 @@ enum class ETransactionType {

constexpr const size_t TUI_LOG_LINES = 10;

// lower limit, real number is higher
constexpr const size_t WAREHOUSES_PER_CPU_CORE = 1500;

} // namespace NYdb::NTPCC
40 changes: 25 additions & 15 deletions ydb/library/workload/tpcc/import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,12 @@ class TPCCLoader {
}

void ImportSync() {
Config.SetDisplayUpdateInterval();
if (Config.WarehouseCount == 0) {
std::cerr << "Specified zero warehouses" << std::endl;
std::exit(1);
}

Config.SetDisplay();
CalculateApproximateDataSize();

// we want to switch buffers and draw UI ASAP to properly display logs
Expand All @@ -801,12 +806,19 @@ class TPCCLoader {
UpdateDisplayIfNeeded(Clock::now());
}

// TODO: detect number of threads
if (Config.LoadThreadCount == 0) {
LOG_W("Automatic calculation of loading threads is not implemented, falling back to the default");
Config.LoadThreadCount = DEFAULT_LOAD_THREAD_COUNT;
}

// in particular this log message
LOG_I("Starting TPC-C data import for " << Config.WarehouseCount << " warehouses using " <<
Config.ThreadCount << " threads. Approximate data size: " << GetFormattedSize(LoadState.ApproximateDataSize));
Config.LoadThreadCount << " threads. Approximate data size: "
<< GetFormattedSize(LoadState.ApproximateDataSize));

// TODO: detect number of threads
size_t threadCount = std::min(Config.WarehouseCount, Config.ThreadCount );
size_t threadCount = std::min(Config.WarehouseCount, Config.LoadThreadCount);
threadCount = std::max(threadCount, size_t(1));

// TODO: calculate optimal number of drivers (but per thread looks good)
Expand Down Expand Up @@ -1071,7 +1083,7 @@ class TPCCLoader {

std::stringstream headerSs;
headerSs << "TPC-C Import: " << Config.WarehouseCount << " warehouses, "
<< Config.ThreadCount << " threads Estimated size: "
<< Config.LoadThreadCount << " threads Estimated size: "
<< GetFormattedSize(LoadState.ApproximateDataSize);

std::stringstream progressSs;
Expand Down Expand Up @@ -1101,21 +1113,19 @@ class TPCCLoader {
text(speedSs.str())
});

auto topRow = hbox({
importDetails | flex,
separator()
});
auto topRow = window(text("TPC-C data upload"), hbox({
importDetails
}));

// Index progress section (always shown)

Elements indexElements;
TString indexText;
if (LoadState.IndexBuildStates.empty()) {
indexText = "Index Creation Progress didn't start";
indexText = "Index Creation Didn't Start";
} else {
indexText = "Index Creation Progress:";
indexText = "Index Creation";
}
indexElements.push_back(text(indexText));

if (LoadState.IndexBuildStates.empty()) {
// Index building not started yet, need to leave enough space
Expand Down Expand Up @@ -1152,11 +1162,13 @@ class TPCCLoader {
}
}

auto indicesRow = window(text(indexText), vbox(indexElements));

// Create scrollable logs panel

Elements logElements;
LogBackend->GetLogLines([&](const std::string& line) {
logElements.push_back(text(line));
logElements.push_back(paragraph(line));
});

auto logsContent = vbox(logElements);
Expand All @@ -1166,9 +1178,7 @@ class TPCCLoader {

auto layout = vbox({
topRow,
separator(),
vbox(indexElements),
separator(),
indicesRow,
logsPanel | flex
});

Expand Down
85 changes: 64 additions & 21 deletions ydb/library/workload/tpcc/runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "transactions.h"

#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
#include <ydb/public/lib/ydb_cli/common/interactive.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h>

#include <library/cpp/logger/log.h>
Expand Down Expand Up @@ -216,14 +217,27 @@ TPCCRunner::TPCCRunner(const NConsoleClient::TClientCommand::TConfig& connection
std::exit(1);
}

const size_t networkThreadCount = NConsoleClient::TYdbCommand::GetNetworkThreadNum(ConnectionConfig);
const size_t maxTerminalThreadCount = cpuCount > networkThreadCount ? cpuCount - networkThreadCount : 1;
if (Config.WarehouseCount == 0) {
std::cerr << "Specified zero warehouses" << std::endl;
std::exit(1);
}

const size_t terminalsCount = Config.WarehouseCount * TERMINALS_PER_WAREHOUSE;

// we might consider using less than maxTerminalThreads
const size_t threadCount = Config.ThreadCount == 0 ?
std::min(maxTerminalThreadCount, terminalsCount) : Config.ThreadCount;
size_t threadCount = 0;
if (Config.ThreadCount == 0) {
// here we calculate max possible efficient thread number
const size_t networkThreadCount = NConsoleClient::TYdbCommand::GetNetworkThreadNum(ConnectionConfig);
const size_t maxTerminalThreadCount = cpuCount > networkThreadCount ? cpuCount - networkThreadCount : 1;
threadCount = std::min(maxTerminalThreadCount, terminalsCount);

// usually this allows to lower number of threads
const size_t recommendedThreadCount =
(Config.WarehouseCount + WAREHOUSES_PER_CPU_CORE - 1) / WAREHOUSES_PER_CPU_CORE;
threadCount = std::min(threadCount, recommendedThreadCount);
} else {
threadCount = Config.ThreadCount;
}

// The number of terminals might be hundreds of thousands.
// For now, we don't have more than 32 network threads (check TYdbCommand::GetNetworkThreadNum()),
Expand Down Expand Up @@ -351,7 +365,7 @@ void TPCCRunner::Join() {
}

void TPCCRunner::RunSync() {
Config.SetDisplayUpdateInterval();
Config.SetDisplay();

Clock::time_point now = Clock::now();

Expand All @@ -363,14 +377,14 @@ void TPCCRunner::RunSync() {
// We don't want to start all terminals at the same time, because then there will be
// a huge queue of ready terminals, which we can't handle
bool forcedWarmup = false;
int minWarmupSeconds = Terminals.size() * MinWarmupPerTerminal.count() / 1000 + 1;
int minWarmupMinutes = (minWarmupSeconds + 59) / 60;
int warmupMinutes;
if (Config.WarmupMinutes < minWarmupMinutes) {
uint32_t minWarmupSeconds = Terminals.size() * MinWarmupPerTerminal.count() / 1000 + 1;
uint32_t minWarmupMinutes = (minWarmupSeconds + 59) / 60;
uint32_t warmupMinutes;
if (Config.WarmupDuration.Minutes() < minWarmupMinutes) {
forcedWarmup = true; // we must print log message later after display update
warmupMinutes = minWarmupMinutes;
} else {
warmupMinutes = Config.WarmupMinutes;
warmupMinutes = Config.WarmupDuration.Minutes();
}

WarmupStartTs = Clock::now();
Expand Down Expand Up @@ -411,14 +425,14 @@ void TPCCRunner::RunSync() {

StopWarmup.store(true, std::memory_order_relaxed);

LOG_I("Measuring during " << Config.RunMinutes << " minutes");
LOG_I("Measuring during " << Config.RunDuration);

MeasurementsStartTs = Clock::now();

// reset statistics
LastStatisticsSnapshot = std::make_unique<TAllStatistics>(PerThreadTerminalStats.size(), MeasurementsStartTs);

StopDeadline = MeasurementsStartTs + std::chrono::minutes(Config.RunMinutes);
StopDeadline = MeasurementsStartTs + std::chrono::seconds(Config.RunDuration.Seconds());
while (!GetGlobalInterruptSource().stop_requested()) {
if (now >= StopDeadline) {
break;
Expand Down Expand Up @@ -469,10 +483,12 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) {
ss << std::endl << "Efficiency: " << std::setprecision(1) << data.Efficiency << "% | "
<< "tpmC: " << std::setprecision(1) << data.Tpmc;

std::cout << ss.str();
LOG_I(ss.str());

// Per thread statistics (two columns)
std::cout << "\nPer thread statistics:" << std::endl;

std::stringstream debugSs;
debugSs << "\nPer thread statistics:" << std::endl;

size_t threadCount = LastStatisticsSnapshot->StatVec.size();
size_t halfCount = (threadCount + 1) / 2;
Expand All @@ -487,10 +503,10 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) {
<< std::setw(15) << "queue p90, ms";

// Print headers side by side
std::cout << threadsHeader.str() << " | " << threadsHeader.str() << std::endl;
debugSs << threadsHeader.str() << " | " << threadsHeader.str() << std::endl;

size_t totalWidth = threadsHeader.str().length() * 2 + 3;
std::cout << std::string(totalWidth, '-') << std::endl;
debugSs << std::string(totalWidth, '-') << std::endl;

// Print thread data in two columns
for (size_t i = 0; i < halfCount; ++i) {
Expand Down Expand Up @@ -525,13 +541,15 @@ void TPCCRunner::UpdateDisplayTextMode(const TCalculatedStatusData& data) {
rightLine << std::string(threadsHeader.str().length(), ' ');
}

std::cout << leftLine.str() << " | " << rightLine.str() << std::endl;
debugSs << leftLine.str() << " | " << rightLine.str() << std::endl;
}
std::cout << std::string(totalWidth, '-') << std::endl;
debugSs << std::string(totalWidth, '-') << std::endl;

// Transaction statistics
std::cout << "\n\n";
PrintTransactionStatisticsPretty(std::cout);
debugSs << "\n";
PrintTransactionStatisticsPretty(debugSs);

LOG_D(debugSs.str());
}

void TPCCRunner::UpdateDisplayTuiMode(const TCalculatedStatusData& data) {
Expand Down Expand Up @@ -903,6 +921,31 @@ void TPCCRunner::PrintFinalResultPretty() {

//-----------------------------------------------------------------------------

void TRunConfig::SetDisplay() {
if (NoTui) {
DisplayMode = EDisplayMode::Text;
} else {
if (NConsoleClient::IsStdoutInteractive()) {
DisplayMode = EDisplayMode::Tui;
} else {
DisplayMode = EDisplayMode::Text;
}
}

switch (DisplayMode) {
case EDisplayMode::None:
return;
case EDisplayMode::Text:
DisplayUpdateInterval = DisplayUpdateTextInterval;
return;
case EDisplayMode::Tui:
DisplayUpdateInterval = DisplayUpdateTuiInterval;
return;
}
}

//-----------------------------------------------------------------------------

void RunSync(const NConsoleClient::TClientCommand::TConfig& connectionConfig, const TRunConfig& runConfig) {
TPCCRunner runner(connectionConfig, runConfig);
runner.RunSync();
Expand Down
39 changes: 21 additions & 18 deletions ydb/library/workload/tpcc/runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@

#include <library/cpp/logger/priority.h>

#include <util/datetime/base.h>

#include <stop_token>

namespace NYdb::NTPCC {

constexpr int DEFAULT_WAREHOUSE_COUNT = 1;
constexpr int DEFAULT_WARMUP_MINUTES = 1; // TODO
constexpr int DEFAULT_RUN_MINUTES = 2; // TODO
constexpr TDuration DEFAULT_WARMUP_DURATION = TDuration::Minutes(1); // TODO
constexpr TDuration DEFAULT_RUN_DURATION = TDuration::Minutes(2); // TODO

constexpr int DEFAULT_MAX_SESSIONS = 100; // TODO
constexpr int DEFAULT_THREAD_COUNT = 0; // autodetect
constexpr int DEFAULT_LOAD_THREAD_COUNT = 10;

constexpr int DEFAULT_LOG_LEVEL = 6; // TODO: properly use enum

struct TRunConfig {
Expand All @@ -21,6 +27,11 @@ struct TRunConfig {
Tui,
};

enum class EFormat {
Pretty = 0,
Json,
};

TRunConfig() = default;
void SetFullPath(const NConsoleClient::TClientCommand::TConfig& connectionConfig) {
if (Path.empty()) {
Expand All @@ -35,37 +46,29 @@ struct TRunConfig {
Path = connectionConfig.Database + '/' + Path;
}

void SetDisplayUpdateInterval() {
switch (DisplayMode) {
case EDisplayMode::None:
return;
case EDisplayMode::Text:
DisplayUpdateInterval = DisplayUpdateTextInterval;
return;
case EDisplayMode::Tui:
DisplayUpdateInterval = DisplayUpdateTuiInterval;
return;
break;
}
}
void SetDisplay();

int WarehouseCount = DEFAULT_WAREHOUSE_COUNT;
int WarmupMinutes = DEFAULT_WARMUP_MINUTES;
int RunMinutes = DEFAULT_RUN_MINUTES;
TDuration WarmupDuration = DEFAULT_WARMUP_DURATION;
TDuration RunDuration = DEFAULT_RUN_DURATION;

int MaxInflight = DEFAULT_MAX_SESSIONS;

TString Path;

EFormat Format = EFormat::Pretty;

TString JsonResultPath;

// advanced settings (normally, used by developer only)

int ThreadCount = 0;
int ThreadCount = DEFAULT_THREAD_COUNT;
int LoadThreadCount = DEFAULT_LOAD_THREAD_COUNT;
int DriverCount = 0;
ELogPriority LogPriority = static_cast<ELogPriority>(DEFAULT_LOG_LEVEL);
bool NoDelays = false;
bool ExtendedStats = false;
bool NoTui = false;
EDisplayMode DisplayMode = EDisplayMode::None;

// instead of actual transaction just async sleep and return SUCCESS
Expand Down
Loading
Loading