Skip to content

Commit

Permalink
Refactored configuration service of Czar
Browse files Browse the repository at this point in the history
The service is now implemented as a singleton which is initialized
and loaded with the configuration parameters at a time when Czar
is being created. The rest of the Czar fetches values of the parameters
from the singleton. The refactoring aims at reducing the number of parameters
sent around in the code and to improve readability of the code.
  • Loading branch information
iagaponenko committed Jun 28, 2023
1 parent d2a1f55 commit 0e20434
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 84 deletions.
37 changes: 18 additions & 19 deletions src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt,
auto const& fromTable = tableRefPtr->getTable();
rowsTable = fromDb + "__" + fromTable + "__rows";
// TODO consider using QMetaSelect instead of making a new connection.
auto cnx = sql::SqlConnectionFactory::make(sharedResources->czarConfig.getMySqlQmetaConfig());
auto cnx = sql::SqlConnectionFactory::make(czar::CzarConfig::instance()->getMySqlQmetaConfig());
sql::SqlErrorObject err;
auto tableExists = cnx->tableExists(rowsTable, err);
LOGS(_log, LOG_LVL_DEBUG,
Expand All @@ -161,28 +161,27 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt,
}

std::shared_ptr<UserQuerySharedResources> makeUserQuerySharedResources(
czar::CzarConfig const& czarConfig, std::shared_ptr<qproc::DatabaseModels> const& dbModels,
std::string const& czarName) {
std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
return std::make_shared<UserQuerySharedResources>(
czarConfig,
css::CssAccess::createFromConfig(czarConfig.getCssConfigMap(), czarConfig.getEmptyChunkPath()),
czarConfig.getMySqlResultConfig(),
std::make_shared<qproc::SecondaryIndex>(czarConfig.getMySqlQmetaConfig()),
std::make_shared<qmeta::QMetaMysql>(czarConfig.getMySqlQmetaConfig(),
czarConfig.getMaxMsgSourceStore()),
std::make_shared<qmeta::QStatusMysql>(czarConfig.getMySqlQStatusDataConfig()),
std::make_shared<qmeta::QMetaSelect>(czarConfig.getMySqlQmetaConfig()),
sql::SqlConnectionFactory::make(czarConfig.getMySqlResultConfig()), dbModels, czarName,
czarConfig.getInteractiveChunkLimit());
css::CssAccess::createFromConfig(czarConfig->getCssConfigMap(), czarConfig->getEmptyChunkPath()),
czarConfig->getMySqlResultConfig(),
std::make_shared<qproc::SecondaryIndex>(czarConfig->getMySqlQmetaConfig()),
std::make_shared<qmeta::QMetaMysql>(czarConfig->getMySqlQmetaConfig(),
czarConfig->getMaxMsgSourceStore()),
std::make_shared<qmeta::QStatusMysql>(czarConfig->getMySqlQStatusDataConfig()),
std::make_shared<qmeta::QMetaSelect>(czarConfig->getMySqlQmetaConfig()),
sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig()), dbModels, czarName,
czarConfig->getInteractiveChunkLimit());
}

////////////////////////////////////////////////////////////////////////
UserQueryFactory::UserQueryFactory(czar::CzarConfig const& czarConfig,
qproc::DatabaseModels::Ptr const& dbModels, std::string const& czarName)
: _userQuerySharedResources(makeUserQuerySharedResources(czarConfig, dbModels, czarName)),
UserQueryFactory::UserQueryFactory(qproc::DatabaseModels::Ptr const& dbModels, std::string const& czarName)
: _userQuerySharedResources(makeUserQuerySharedResources(dbModels, czarName)),
_useQservRowCounterOptimization(true) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
_executiveConfig = std::make_shared<qdisp::ExecutiveConfig>(
czarConfig.getXrootdFrontendUrl(), czarConfig.getQMetaSecondsBetweenChunkUpdates());
czarConfig->getXrootdFrontendUrl(), czarConfig->getQMetaSecondsBetweenChunkUpdates());

// When czar crashes/exits while some queries are still in flight they
// are left in EXECUTING state in QMeta. We want to cleanup that state
Expand Down Expand Up @@ -290,8 +289,8 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
if (sessionValid) {
executive = qdisp::Executive::create(*_executiveConfig, messageStore, qdispSharedResources,
_userQuerySharedResources->queryStatsData, qs);
infileMergerConfig = std::make_shared<rproc::InfileMergerConfig>(
_userQuerySharedResources->czarConfig, _userQuerySharedResources->mysqlResultConfig);
infileMergerConfig =
std::make_shared<rproc::InfileMergerConfig>(_userQuerySharedResources->mysqlResultConfig);
infileMergerConfig->debugNoMerge = _debugNoMerge;
}

Expand Down
7 changes: 1 addition & 6 deletions src/ccontrol/UserQueryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ class UserQuery;
class UserQuerySharedResources;
} // namespace lsst::qserv::ccontrol

namespace lsst::qserv::czar {
class CzarConfig;
}

namespace lsst::qserv::qdisp {
class ExecutiveConfig;
}
Expand All @@ -71,8 +67,7 @@ namespace lsst::qserv::ccontrol {
/// constant between successive user queries.
class UserQueryFactory : private boost::noncopyable {
public:
UserQueryFactory(czar::CzarConfig const& czarConfig,
std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName);
UserQueryFactory(std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName);

/// @param query: Query text
/// @param defaultDb: Default database name, may be empty
Expand Down
8 changes: 3 additions & 5 deletions src/ccontrol/UserQueryResources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@
namespace lsst::qserv::ccontrol {

UserQuerySharedResources::UserQuerySharedResources(
czar::CzarConfig const& czarConfig_, std::shared_ptr<css::CssAccess> const& css_,
mysql::MySqlConfig const& mysqlResultConfig_,
std::shared_ptr<css::CssAccess> const& css_, mysql::MySqlConfig const& mysqlResultConfig_,
std::shared_ptr<qproc::SecondaryIndex> const& secondaryIndex_,
std::shared_ptr<qmeta::QMeta> const& queryMetadata_,
std::shared_ptr<qmeta::QStatus> const& queryStatsData_,
std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect_,
std::shared_ptr<sql::SqlConnection> const& resultDbConn_,
std::shared_ptr<qproc::DatabaseModels> const& dbModels_, std::string const& czarName,
int interactiveChunkLimit_)
: czarConfig(czarConfig_),
css(css_),
: css(css_),
mysqlResultConfig(mysqlResultConfig_),
secondaryIndex(secondaryIndex_),
queryMetadata(queryMetadata_),
Expand All @@ -52,7 +50,7 @@ UserQuerySharedResources::UserQuerySharedResources(
resultDbConn(resultDbConn_),
databaseModels(dbModels_),
interactiveChunkLimit(interactiveChunkLimit_),
semaMgrConnections(new util::SemaMgr(czarConfig.getResultMaxConnections())) {
semaMgrConnections(new util::SemaMgr(czar::CzarConfig::instance()->getResultMaxConnections())) {
// register czar in QMeta
// TODO: check that czar with the same name is not active already?
qMetaCzarId = queryMetadata->registerCzar(czarName);
Expand Down
7 changes: 1 addition & 6 deletions src/ccontrol/UserQueryResources.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ namespace lsst::qserv::css {
class CssAccess;
}

namespace lsst::qserv::czar {
class CzarConfig;
}

namespace lsst::qserv::mysql {
class MySqlConfig;
}
Expand Down Expand Up @@ -72,7 +68,7 @@ namespace lsst::qserv::ccontrol {
*/
class UserQuerySharedResources {
public:
UserQuerySharedResources(czar::CzarConfig const& czarConfig_, std::shared_ptr<css::CssAccess> const& css_,
UserQuerySharedResources(std::shared_ptr<css::CssAccess> const& css_,
mysql::MySqlConfig const& mysqlResultConfig_,
std::shared_ptr<qproc::SecondaryIndex> const& secondaryIndex_,
std::shared_ptr<qmeta::QMeta> const& queryMetadata_,
Expand All @@ -84,7 +80,6 @@ class UserQuerySharedResources {

UserQuerySharedResources(UserQuerySharedResources const& rhs) = default;
UserQuerySharedResources& operator=(UserQuerySharedResources const& rhs) = delete;
czar::CzarConfig const& czarConfig;
std::shared_ptr<css::CssAccess> css;
mysql::MySqlConfig const mysqlResultConfig;
std::shared_ptr<qproc::SecondaryIndex> secondaryIndex;
Expand Down
9 changes: 4 additions & 5 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,6 @@ QueryState UserQuerySelect::join() {
}
_executive->updateProxyMessages();

// Capture these parameters before discarding the merger which would also reset the config.
bool const notifyWorkersOnQueryFinish = _infileMergerConfig->czarConfig.notifyWorkersOnQueryFinish();
std::string const xrootdFrontendUrl = _infileMergerConfig->czarConfig.getXrootdFrontendUrl();
try {
_discardMerger();
} catch (std::exception const& exc) {
Expand Down Expand Up @@ -391,9 +388,11 @@ QueryState UserQuerySelect::join() {
operation = proto::QueryManagement::CANCEL;
state = ERROR;
}
if (notifyWorkersOnQueryFinish) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
if (czarConfig->notifyWorkersOnQueryFinish()) {
try {
xrdreq::QueryManagementAction::notifyAllWorkers(xrootdFrontendUrl, operation, _qMetaQueryId);
xrdreq::QueryManagementAction::notifyAllWorkers(czarConfig->getXrootdFrontendUrl(), operation,
_qMetaQueryId);
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
Expand Down
53 changes: 27 additions & 26 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,27 @@
#include "czar/Czar.h"

// System headers
#include <sys/time.h>
#include <stdexcept>
#include <sys/time.h>
#include <thread>

// Third-party headers
#include "boost/format.hpp"
#include "boost/lexical_cast.hpp"

#include "../qdisp/CzarStats.h"
// LSST headers
#include "lsst/log/Log.h"

// Qserv headers
#include "ccontrol/ConfigMap.h"
#include "ccontrol/UserQuerySelect.h"
#include "ccontrol/UserQueryType.h"
#include "czar/CzarConfig.h"
#include "czar/CzarErrors.h"
#include "czar/MessageTable.h"
#include "global/LogContext.h"
#include "proto/worker.pb.h"
#include "qdisp/CzarStats.h"
#include "qdisp/PseudoFifo.h"
#include "qdisp/QdispPool.h"
#include "qdisp/SharedResources.h"
Expand Down Expand Up @@ -88,7 +89,7 @@ Czar::Ptr Czar::createCzar(string const& configPath, string const& czarName) {
// Constructors
Czar::Czar(string const& configPath, string const& czarName)
: _czarName(czarName),
_czarConfig(configPath),
_czarConfig(CzarConfig::create(configPath)),
_idCounter(),
_uqFactory(),
_clientToQuery(),
Expand All @@ -104,27 +105,27 @@ Czar::Czar(string const& configPath, string const& czarName)
// The id will be used as the high-watermark for queries that need to be cancelled.
// All queries that have identifiers that are strictly less than this one will
// be affected by the operation.
if (_czarConfig.notifyWorkersOnCzarRestart()) {
if (_czarConfig->notifyWorkersOnCzarRestart()) {
try {
xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig.getXrootdFrontendUrl(),
xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig->getXrootdFrontendUrl(),
proto::QueryManagement::CANCEL_AFTER_RESTART,
_lastQueryIdBeforeRestart());
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
}

auto databaseModels =
qproc::DatabaseModels::create(_czarConfig.getCssConfigMap(), _czarConfig.getMySqlResultConfig());
auto databaseModels = qproc::DatabaseModels::create(_czarConfig->getCssConfigMap(),
_czarConfig->getMySqlResultConfig());

// Need to be done first as it adds logging context for new threads
_uqFactory.reset(new ccontrol::UserQueryFactory(_czarConfig, databaseModels, _czarName));
_uqFactory.reset(new ccontrol::UserQueryFactory(databaseModels, _czarName));

int qPoolSize = _czarConfig.getQdispPoolSize();
int maxPriority = std::max(0, _czarConfig.getQdispMaxPriority());
string vectRunSizesStr = _czarConfig.getQdispVectRunSizes();
int qPoolSize = _czarConfig->getQdispPoolSize();
int maxPriority = std::max(0, _czarConfig->getQdispMaxPriority());
string vectRunSizesStr = _czarConfig->getQdispVectRunSizes();
vector<int> vectRunSizes = util::StringHelper::getIntVectFromStr(vectRunSizesStr, ":", 1);
string vectMinRunningSizesStr = _czarConfig.getQdispVectMinRunningSizes();
string vectMinRunningSizesStr = _czarConfig->getQdispVectMinRunningSizes();
vector<int> vectMinRunningSizes = util::StringHelper::getIntVectFromStr(vectMinRunningSizesStr, ":", 0);
LOGS(_log, LOG_LVL_INFO,
"INFO qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes="
Expand All @@ -135,22 +136,22 @@ Czar::Czar(string const& configPath, string const& czarName)
make_shared<qdisp::QdispPool>(qPoolSize, maxPriority, vectRunSizes, vectMinRunningSizes);
qdisp::CzarStats::setup(qdispPool);

int qReqPseudoMaxRunning = _czarConfig.getQReqPseudoFifoMaxRunning();
int qReqPseudoMaxRunning = _czarConfig->getQReqPseudoFifoMaxRunning();
qdisp::PseudoFifo::Ptr queryRequestPseudoFifo = make_shared<qdisp::PseudoFifo>(qReqPseudoMaxRunning);
_qdispSharedResources = qdisp::SharedResources::create(qdispPool, queryRequestPseudoFifo);

int xrootdCBThreadsMax = _czarConfig.getXrootdCBThreadsMax();
int xrootdCBThreadsInit = _czarConfig.getXrootdCBThreadsInit();
int xrootdCBThreadsMax = _czarConfig->getXrootdCBThreadsMax();
int xrootdCBThreadsInit = _czarConfig->getXrootdCBThreadsInit();
LOGS(_log, LOG_LVL_INFO, "config xrootdCBThreadsMax=" << xrootdCBThreadsMax);
LOGS(_log, LOG_LVL_INFO, "config xrootdCBThreadsInit=" << xrootdCBThreadsInit);
XrdSsiProviderClient->SetCBThreads(xrootdCBThreadsMax, xrootdCBThreadsInit);
int const xrootdSpread = _czarConfig.getXrootdSpread();
int const xrootdSpread = _czarConfig->getXrootdSpread();
LOGS(_log, LOG_LVL_INFO, "config xrootdSpread=" << xrootdSpread);
XrdSsiProviderClient->SetSpread(xrootdSpread);
_queryDistributionTestVer = _czarConfig.getQueryDistributionTestVer();
_queryDistributionTestVer = _czarConfig->getQueryDistributionTestVer();

LOGS(_log, LOG_LVL_INFO, "Creating czar instance with name " << czarName);
LOGS(_log, LOG_LVL_INFO, "Czar config: " << _czarConfig);
LOGS(_log, LOG_LVL_INFO, "Czar config: " << *_czarConfig);

// Watch to see if the log configuration is changed.
// If LSST_LOG_CONFIG is not defined, there's no good way to know what log
Expand Down Expand Up @@ -187,7 +188,7 @@ SubmitResult Czar::submitQuery(string const& query, map<string, string> const& h
// make message table name
string userQueryId = to_string(_idCounter++);
LOGS(_log, LOG_LVL_DEBUG, "userQueryId: " << userQueryId);
string resultDb = _czarConfig.getMySqlResultConfig().dbName;
string resultDb = _czarConfig->getMySqlResultConfig().dbName;
string const msgTableName = "message_" + userQueryId;
string const lockName = resultDb + "." + msgTableName;

Expand All @@ -197,7 +198,7 @@ SubmitResult Czar::submitQuery(string const& query, map<string, string> const& h
SubmitResult result;

// instantiate message table manager
MessageTable msgTable(lockName, _czarConfig.getMySqlResultConfig());
MessageTable msgTable(lockName, _czarConfig->getMySqlResultConfig());
try {
msgTable.lock();
} catch (std::exception const& exc) {
Expand Down Expand Up @@ -261,7 +262,7 @@ SubmitResult Czar::submitQuery(string const& query, map<string, string> const& h
// we do not need to lock message because result is ready before we return
string const resultTableName = resultDb + ".result_async_" + userQueryId;
string const asyncLockName = resultDb + ".message_async_" + userQueryId;
MessageTable msgTable(asyncLockName, _czarConfig.getMySqlResultConfig());
MessageTable msgTable(asyncLockName, _czarConfig->getMySqlResultConfig());
try {
_makeAsyncResult(resultTableName, uq->getQueryId(), uq->getResultLocation());
msgTable.create();
Expand Down Expand Up @@ -397,7 +398,7 @@ void Czar::_updateQueryHistory(string const& clientId, int threadId, ccontrol::U
}

void Czar::_makeAsyncResult(string const& asyncResultTable, QueryId queryId, string const& resultLoc) {
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlResultConfig());
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlResultConfig());
LOGS(_log, LOG_LVL_DEBUG, "creating async result table " << asyncResultTable);

sql::SqlErrorObject sqlErr;
Expand Down Expand Up @@ -431,9 +432,9 @@ void Czar::removeOldResultTables() {
// Run in a separate thread in the off chance this takes a while.
thread t([this]() {
LOGS(_log, LOG_LVL_INFO, "Removing old result database tables.");
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlResultConfig());
string dbName = _czarConfig.getMySqlResultConfig().dbName;
string dStr = to_string(_czarConfig.getOldestResultKeptDays());
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlResultConfig());
string dbName = _czarConfig->getMySqlResultConfig().dbName;
string dStr = to_string(_czarConfig->getOldestResultKeptDays());

// Find result related tables that haven't been updated in a long time.
string sql =
Expand Down Expand Up @@ -481,7 +482,7 @@ void Czar::removeOldResultTables() {

QueryId Czar::_lastQueryIdBeforeRestart() const {
string const context = "Czar::" + string(__func__) + " ";
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlQmetaConfig());
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlQmetaConfig());
string const sql = "SELECT MAX(queryId) FROM QInfo";
sql::SqlResults results;
sql::SqlErrorObject err;
Expand Down
13 changes: 7 additions & 6 deletions src/czar/Czar.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
// Qserv headers
#include "ccontrol/UserQuery.h"
#include "ccontrol/UserQueryFactory.h"
#include "czar/CzarConfig.h"
#include "czar/SubmitResult.h"
#include "global/intTypes.h"
#include "global/stringTypes.h"
Expand All @@ -48,6 +47,10 @@

namespace lsst::qserv {

namespace czar {
class CzarConfig;
}

namespace qdisp {
class PseudoFifo;
}
Expand All @@ -61,10 +64,8 @@ namespace czar {
/// @addtogroup czar

/**
* @ingroup czar
*
* @brief Class representing czar "entry points".
*
* @ingroup czar
* @brief Class representing czar "entry points".
*/

class Czar {
Expand Down Expand Up @@ -148,7 +149,7 @@ class Czar {
typedef std::map<QueryId, std::weak_ptr<ccontrol::UserQuery>> IdToQuery;

std::string const _czarName; ///< Unique czar name
CzarConfig const _czarConfig;
std::shared_ptr<CzarConfig> const _czarConfig;

std::atomic<uint64_t> _idCounter; ///< Query/task identifier for next query
std::unique_ptr<ccontrol::UserQueryFactory> _uqFactory;
Expand Down
20 changes: 20 additions & 0 deletions src/czar/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ bool dummy = XrdSsiLogger::SetMCB(QservLogger, XrdSsiLogger::mcbClient);

namespace lsst::qserv::czar {

std::mutex CzarConfig::_mtxOnInstance;

std::shared_ptr<CzarConfig> CzarConfig::_instance;

std::shared_ptr<CzarConfig> CzarConfig::create(std::string const& configFileName) {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
std::shared_ptr<CzarConfig> const ptr =
std::shared_ptr<CzarConfig>(new CzarConfig(util::ConfigStore(configFileName)));
_instance = ptr;
return ptr;
}

std::shared_ptr<CzarConfig> CzarConfig::instance() {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created.");
}
return _instance;
}

CzarConfig::CzarConfig(util::ConfigStore const& configStore)
: _mySqlResultConfig(configStore.get("resultdb.user", "qsmaster"),
configStore.getRequired("resultdb.passwd"),
Expand Down
Loading

0 comments on commit 0e20434

Please sign in to comment.