From bb80f0b4166e39b714cb7ca9d88034502fa79686 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Wed, 28 Jun 2023 02:53:37 +0000 Subject: [PATCH 1/6] Refactored configuration service of Czar The service is now implemented as a singleton which is initialized and loaded with the configuration parameters at a time when Czar is being created. The rest of the Czar fetches values of the parameters from the singleton. The refactoring aims at reducing the number of parameters sent around in the code and to improve readability of the code. --- src/ccontrol/UserQueryFactory.cc | 37 ++++++++++----------- src/ccontrol/UserQueryFactory.h | 7 +--- src/ccontrol/UserQueryResources.cc | 8 ++--- src/ccontrol/UserQueryResources.h | 7 +--- src/ccontrol/UserQuerySelect.cc | 9 +++-- src/czar/Czar.cc | 53 +++++++++++++++--------------- src/czar/Czar.h | 13 ++++---- src/czar/CzarConfig.cc | 20 +++++++++++ src/czar/CzarConfig.h | 33 +++++++++++++++++-- src/rproc/InfileMerger.cc | 7 ++-- src/rproc/InfileMerger.h | 7 +--- 11 files changed, 117 insertions(+), 84 deletions(-) diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index 351c3fe948..870f28a928 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -151,7 +151,7 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt, auto const& fromTable = tableRefPtr->getTable(); rowsTable = fromDb + "__" + fromTable + "__rows"; // TODO consider using QMetaSelect instead of making a new connection. 
- auto cnx = sql::SqlConnectionFactory::make(sharedResources->czarConfig.getMySqlQmetaConfig()); + auto cnx = sql::SqlConnectionFactory::make(czar::CzarConfig::instance()->getMySqlQmetaConfig()); sql::SqlErrorObject err; auto tableExists = cnx->tableExists(rowsTable, err); LOGS(_log, LOG_LVL_DEBUG, @@ -161,28 +161,27 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt, } std::shared_ptr makeUserQuerySharedResources( - czar::CzarConfig const& czarConfig, std::shared_ptr const& dbModels, - std::string const& czarName) { + std::shared_ptr const& dbModels, std::string const& czarName) { + std::shared_ptr const czarConfig = czar::CzarConfig::instance(); return std::make_shared( - czarConfig, - css::CssAccess::createFromConfig(czarConfig.getCssConfigMap(), czarConfig.getEmptyChunkPath()), - czarConfig.getMySqlResultConfig(), - std::make_shared(czarConfig.getMySqlQmetaConfig()), - std::make_shared(czarConfig.getMySqlQmetaConfig(), - czarConfig.getMaxMsgSourceStore()), - std::make_shared(czarConfig.getMySqlQStatusDataConfig()), - std::make_shared(czarConfig.getMySqlQmetaConfig()), - sql::SqlConnectionFactory::make(czarConfig.getMySqlResultConfig()), dbModels, czarName, - czarConfig.getInteractiveChunkLimit()); + css::CssAccess::createFromConfig(czarConfig->getCssConfigMap(), czarConfig->getEmptyChunkPath()), + czarConfig->getMySqlResultConfig(), + std::make_shared(czarConfig->getMySqlQmetaConfig()), + std::make_shared(czarConfig->getMySqlQmetaConfig(), + czarConfig->getMaxMsgSourceStore()), + std::make_shared(czarConfig->getMySqlQStatusDataConfig()), + std::make_shared(czarConfig->getMySqlQmetaConfig()), + sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig()), dbModels, czarName, + czarConfig->getInteractiveChunkLimit()); } //////////////////////////////////////////////////////////////////////// -UserQueryFactory::UserQueryFactory(czar::CzarConfig const& czarConfig, - qproc::DatabaseModels::Ptr const& dbModels, std::string const& 
czarName) - : _userQuerySharedResources(makeUserQuerySharedResources(czarConfig, dbModels, czarName)), +UserQueryFactory::UserQueryFactory(qproc::DatabaseModels::Ptr const& dbModels, std::string const& czarName) + : _userQuerySharedResources(makeUserQuerySharedResources(dbModels, czarName)), _useQservRowCounterOptimization(true) { + std::shared_ptr const czarConfig = czar::CzarConfig::instance(); _executiveConfig = std::make_shared( - czarConfig.getXrootdFrontendUrl(), czarConfig.getQMetaSecondsBetweenChunkUpdates()); + czarConfig->getXrootdFrontendUrl(), czarConfig->getQMetaSecondsBetweenChunkUpdates()); // When czar crashes/exits while some queries are still in flight they // are left in EXECUTING state in QMeta. We want to cleanup that state @@ -290,8 +289,8 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st if (sessionValid) { executive = qdisp::Executive::create(*_executiveConfig, messageStore, qdispSharedResources, _userQuerySharedResources->queryStatsData, qs); - infileMergerConfig = std::make_shared( - _userQuerySharedResources->czarConfig, _userQuerySharedResources->mysqlResultConfig); + infileMergerConfig = + std::make_shared(_userQuerySharedResources->mysqlResultConfig); infileMergerConfig->debugNoMerge = _debugNoMerge; } diff --git a/src/ccontrol/UserQueryFactory.h b/src/ccontrol/UserQueryFactory.h index 1dcc896ec7..b813544fa4 100644 --- a/src/ccontrol/UserQueryFactory.h +++ b/src/ccontrol/UserQueryFactory.h @@ -47,10 +47,6 @@ class UserQuery; class UserQuerySharedResources; } // namespace lsst::qserv::ccontrol -namespace lsst::qserv::czar { -class CzarConfig; -} - namespace lsst::qserv::qdisp { class ExecutiveConfig; } @@ -71,8 +67,7 @@ namespace lsst::qserv::ccontrol { /// constant between successive user queries. 
class UserQueryFactory : private boost::noncopyable { public: - UserQueryFactory(czar::CzarConfig const& czarConfig, - std::shared_ptr const& dbModels, std::string const& czarName); + UserQueryFactory(std::shared_ptr const& dbModels, std::string const& czarName); /// @param query: Query text /// @param defaultDb: Default database name, may be empty diff --git a/src/ccontrol/UserQueryResources.cc b/src/ccontrol/UserQueryResources.cc index 3baf12d9a6..8bef7c4030 100644 --- a/src/ccontrol/UserQueryResources.cc +++ b/src/ccontrol/UserQueryResources.cc @@ -33,8 +33,7 @@ namespace lsst::qserv::ccontrol { UserQuerySharedResources::UserQuerySharedResources( - czar::CzarConfig const& czarConfig_, std::shared_ptr const& css_, - mysql::MySqlConfig const& mysqlResultConfig_, + std::shared_ptr const& css_, mysql::MySqlConfig const& mysqlResultConfig_, std::shared_ptr const& secondaryIndex_, std::shared_ptr const& queryMetadata_, std::shared_ptr const& queryStatsData_, @@ -42,8 +41,7 @@ UserQuerySharedResources::UserQuerySharedResources( std::shared_ptr const& resultDbConn_, std::shared_ptr const& dbModels_, std::string const& czarName, int interactiveChunkLimit_) - : czarConfig(czarConfig_), - css(css_), + : css(css_), mysqlResultConfig(mysqlResultConfig_), secondaryIndex(secondaryIndex_), queryMetadata(queryMetadata_), @@ -52,7 +50,7 @@ UserQuerySharedResources::UserQuerySharedResources( resultDbConn(resultDbConn_), databaseModels(dbModels_), interactiveChunkLimit(interactiveChunkLimit_), - semaMgrConnections(new util::SemaMgr(czarConfig.getResultMaxConnections())) { + semaMgrConnections(new util::SemaMgr(czar::CzarConfig::instance()->getResultMaxConnections())) { // register czar in QMeta // TODO: check that czar with the same name is not active already? 
qMetaCzarId = queryMetadata->registerCzar(czarName); diff --git a/src/ccontrol/UserQueryResources.h b/src/ccontrol/UserQueryResources.h index 97324fb4e7..98d57d33f9 100644 --- a/src/ccontrol/UserQueryResources.h +++ b/src/ccontrol/UserQueryResources.h @@ -38,10 +38,6 @@ namespace lsst::qserv::css { class CssAccess; } -namespace lsst::qserv::czar { -class CzarConfig; -} - namespace lsst::qserv::mysql { class MySqlConfig; } @@ -72,7 +68,7 @@ namespace lsst::qserv::ccontrol { */ class UserQuerySharedResources { public: - UserQuerySharedResources(czar::CzarConfig const& czarConfig_, std::shared_ptr const& css_, + UserQuerySharedResources(std::shared_ptr const& css_, mysql::MySqlConfig const& mysqlResultConfig_, std::shared_ptr const& secondaryIndex_, std::shared_ptr const& queryMetadata_, @@ -84,7 +80,6 @@ class UserQuerySharedResources { UserQuerySharedResources(UserQuerySharedResources const& rhs) = default; UserQuerySharedResources& operator=(UserQuerySharedResources const& rhs) = delete; - czar::CzarConfig const& czarConfig; std::shared_ptr css; mysql::MySqlConfig const mysqlResultConfig; std::shared_ptr secondaryIndex; diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index 242d30e16f..79d08994cc 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -355,9 +355,6 @@ QueryState UserQuerySelect::join() { } _executive->updateProxyMessages(); - // Capture these parameters before discarding the merger which would also reset the config. 
- bool const notifyWorkersOnQueryFinish = _infileMergerConfig->czarConfig.notifyWorkersOnQueryFinish(); - std::string const xrootdFrontendUrl = _infileMergerConfig->czarConfig.getXrootdFrontendUrl(); try { _discardMerger(); } catch (std::exception const& exc) { @@ -391,9 +388,11 @@ QueryState UserQuerySelect::join() { operation = proto::QueryManagement::CANCEL; state = ERROR; } - if (notifyWorkersOnQueryFinish) { + std::shared_ptr const czarConfig = czar::CzarConfig::instance(); + if (czarConfig->notifyWorkersOnQueryFinish()) { try { - xrdreq::QueryManagementAction::notifyAllWorkers(xrootdFrontendUrl, operation, _qMetaQueryId); + xrdreq::QueryManagementAction::notifyAllWorkers(czarConfig->getXrootdFrontendUrl(), operation, + _qMetaQueryId); } catch (std::exception const& ex) { LOGS(_log, LOG_LVL_WARN, ex.what()); } diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index 4e85dc8183..084db248b1 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -24,15 +24,14 @@ #include "czar/Czar.h" // System headers -#include #include +#include #include // Third-party headers #include "boost/format.hpp" #include "boost/lexical_cast.hpp" -#include "../qdisp/CzarStats.h" // LSST headers #include "lsst/log/Log.h" @@ -40,10 +39,12 @@ #include "ccontrol/ConfigMap.h" #include "ccontrol/UserQuerySelect.h" #include "ccontrol/UserQueryType.h" +#include "czar/CzarConfig.h" #include "czar/CzarErrors.h" #include "czar/MessageTable.h" #include "global/LogContext.h" #include "proto/worker.pb.h" +#include "qdisp/CzarStats.h" #include "qdisp/PseudoFifo.h" #include "qdisp/QdispPool.h" #include "qdisp/SharedResources.h" @@ -88,7 +89,7 @@ Czar::Ptr Czar::createCzar(string const& configPath, string const& czarName) { // Constructors Czar::Czar(string const& configPath, string const& czarName) : _czarName(czarName), - _czarConfig(configPath), + _czarConfig(CzarConfig::create(configPath)), _idCounter(), _uqFactory(), _clientToQuery(), @@ -104,9 +105,9 @@ Czar::Czar(string const& configPath, string 
const& czarName) // The id will be used as the high-watermark for queries that need to be cancelled. // All queries that have identifiers that are strictly less than this one will // be affected by the operation. - if (_czarConfig.notifyWorkersOnCzarRestart()) { + if (_czarConfig->notifyWorkersOnCzarRestart()) { try { - xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig.getXrootdFrontendUrl(), + xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig->getXrootdFrontendUrl(), proto::QueryManagement::CANCEL_AFTER_RESTART, _lastQueryIdBeforeRestart()); } catch (std::exception const& ex) { @@ -114,17 +115,17 @@ Czar::Czar(string const& configPath, string const& czarName) } } - auto databaseModels = - qproc::DatabaseModels::create(_czarConfig.getCssConfigMap(), _czarConfig.getMySqlResultConfig()); + auto databaseModels = qproc::DatabaseModels::create(_czarConfig->getCssConfigMap(), + _czarConfig->getMySqlResultConfig()); // Need to be done first as it adds logging context for new threads - _uqFactory.reset(new ccontrol::UserQueryFactory(_czarConfig, databaseModels, _czarName)); + _uqFactory.reset(new ccontrol::UserQueryFactory(databaseModels, _czarName)); - int qPoolSize = _czarConfig.getQdispPoolSize(); - int maxPriority = std::max(0, _czarConfig.getQdispMaxPriority()); - string vectRunSizesStr = _czarConfig.getQdispVectRunSizes(); + int qPoolSize = _czarConfig->getQdispPoolSize(); + int maxPriority = std::max(0, _czarConfig->getQdispMaxPriority()); + string vectRunSizesStr = _czarConfig->getQdispVectRunSizes(); vector vectRunSizes = util::StringHelper::getIntVectFromStr(vectRunSizesStr, ":", 1); - string vectMinRunningSizesStr = _czarConfig.getQdispVectMinRunningSizes(); + string vectMinRunningSizesStr = _czarConfig->getQdispVectMinRunningSizes(); vector vectMinRunningSizes = util::StringHelper::getIntVectFromStr(vectMinRunningSizesStr, ":", 0); LOGS(_log, LOG_LVL_INFO, "INFO qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " 
vectRunSizes=" @@ -135,22 +136,22 @@ Czar::Czar(string const& configPath, string const& czarName) make_shared(qPoolSize, maxPriority, vectRunSizes, vectMinRunningSizes); qdisp::CzarStats::setup(qdispPool); - int qReqPseudoMaxRunning = _czarConfig.getQReqPseudoFifoMaxRunning(); + int qReqPseudoMaxRunning = _czarConfig->getQReqPseudoFifoMaxRunning(); qdisp::PseudoFifo::Ptr queryRequestPseudoFifo = make_shared(qReqPseudoMaxRunning); _qdispSharedResources = qdisp::SharedResources::create(qdispPool, queryRequestPseudoFifo); - int xrootdCBThreadsMax = _czarConfig.getXrootdCBThreadsMax(); - int xrootdCBThreadsInit = _czarConfig.getXrootdCBThreadsInit(); + int xrootdCBThreadsMax = _czarConfig->getXrootdCBThreadsMax(); + int xrootdCBThreadsInit = _czarConfig->getXrootdCBThreadsInit(); LOGS(_log, LOG_LVL_INFO, "config xrootdCBThreadsMax=" << xrootdCBThreadsMax); LOGS(_log, LOG_LVL_INFO, "config xrootdCBThreadsInit=" << xrootdCBThreadsInit); XrdSsiProviderClient->SetCBThreads(xrootdCBThreadsMax, xrootdCBThreadsInit); - int const xrootdSpread = _czarConfig.getXrootdSpread(); + int const xrootdSpread = _czarConfig->getXrootdSpread(); LOGS(_log, LOG_LVL_INFO, "config xrootdSpread=" << xrootdSpread); XrdSsiProviderClient->SetSpread(xrootdSpread); - _queryDistributionTestVer = _czarConfig.getQueryDistributionTestVer(); + _queryDistributionTestVer = _czarConfig->getQueryDistributionTestVer(); LOGS(_log, LOG_LVL_INFO, "Creating czar instance with name " << czarName); - LOGS(_log, LOG_LVL_INFO, "Czar config: " << _czarConfig); + LOGS(_log, LOG_LVL_INFO, "Czar config: " << *_czarConfig); // Watch to see if the log configuration is changed. 
// If LSST_LOG_CONFIG is not defined, there's no good way to know what log @@ -187,7 +188,7 @@ SubmitResult Czar::submitQuery(string const& query, map const& h // make message table name string userQueryId = to_string(_idCounter++); LOGS(_log, LOG_LVL_DEBUG, "userQueryId: " << userQueryId); - string resultDb = _czarConfig.getMySqlResultConfig().dbName; + string resultDb = _czarConfig->getMySqlResultConfig().dbName; string const msgTableName = "message_" + userQueryId; string const lockName = resultDb + "." + msgTableName; @@ -197,7 +198,7 @@ SubmitResult Czar::submitQuery(string const& query, map const& h SubmitResult result; // instantiate message table manager - MessageTable msgTable(lockName, _czarConfig.getMySqlResultConfig()); + MessageTable msgTable(lockName, _czarConfig->getMySqlResultConfig()); try { msgTable.lock(); } catch (std::exception const& exc) { @@ -261,7 +262,7 @@ SubmitResult Czar::submitQuery(string const& query, map const& h // we do not need to lock message because result is ready before we return string const resultTableName = resultDb + ".result_async_" + userQueryId; string const asyncLockName = resultDb + ".message_async_" + userQueryId; - MessageTable msgTable(asyncLockName, _czarConfig.getMySqlResultConfig()); + MessageTable msgTable(asyncLockName, _czarConfig->getMySqlResultConfig()); try { _makeAsyncResult(resultTableName, uq->getQueryId(), uq->getResultLocation()); msgTable.create(); @@ -397,7 +398,7 @@ void Czar::_updateQueryHistory(string const& clientId, int threadId, ccontrol::U } void Czar::_makeAsyncResult(string const& asyncResultTable, QueryId queryId, string const& resultLoc) { - auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlResultConfig()); + auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlResultConfig()); LOGS(_log, LOG_LVL_DEBUG, "creating async result table " << asyncResultTable); sql::SqlErrorObject sqlErr; @@ -431,9 +432,9 @@ void Czar::removeOldResultTables() { // Run in a 
separate thread in the off chance this takes a while. thread t([this]() { LOGS(_log, LOG_LVL_INFO, "Removing old result database tables."); - auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlResultConfig()); - string dbName = _czarConfig.getMySqlResultConfig().dbName; - string dStr = to_string(_czarConfig.getOldestResultKeptDays()); + auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlResultConfig()); + string dbName = _czarConfig->getMySqlResultConfig().dbName; + string dStr = to_string(_czarConfig->getOldestResultKeptDays()); // Find result related tables that haven't been updated in a long time. string sql = @@ -481,7 +482,7 @@ void Czar::removeOldResultTables() { QueryId Czar::_lastQueryIdBeforeRestart() const { string const context = "Czar::" + string(__func__) + " "; - auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlQmetaConfig()); + auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlQmetaConfig()); string const sql = "SELECT MAX(queryId) FROM QInfo"; sql::SqlResults results; sql::SqlErrorObject err; diff --git a/src/czar/Czar.h b/src/czar/Czar.h index f0acfcf734..54a2b71e5e 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -37,7 +37,6 @@ // Qserv headers #include "ccontrol/UserQuery.h" #include "ccontrol/UserQueryFactory.h" -#include "czar/CzarConfig.h" #include "czar/SubmitResult.h" #include "global/intTypes.h" #include "global/stringTypes.h" @@ -48,6 +47,10 @@ namespace lsst::qserv { +namespace czar { +class CzarConfig; +} + namespace qdisp { class PseudoFifo; } @@ -61,10 +64,8 @@ namespace czar { /// @addtogroup czar /** - * @ingroup czar - * - * @brief Class representing czar "entry points". - * + * @ingroup czar + * @brief Class representing czar "entry points". 
*/ class Czar { @@ -148,7 +149,7 @@ class Czar { typedef std::map> IdToQuery; std::string const _czarName; ///< Unique czar name - CzarConfig const _czarConfig; + std::shared_ptr const _czarConfig; std::atomic _idCounter; ///< Query/task identifier for next query std::unique_ptr _uqFactory; diff --git a/src/czar/CzarConfig.cc b/src/czar/CzarConfig.cc index a42847349c..c4d68072d4 100644 --- a/src/czar/CzarConfig.cc +++ b/src/czar/CzarConfig.cc @@ -58,6 +58,26 @@ bool dummy = XrdSsiLogger::SetMCB(QservLogger, XrdSsiLogger::mcbClient); namespace lsst::qserv::czar { +std::mutex CzarConfig::_mtxOnInstance; + +std::shared_ptr CzarConfig::_instance; + +std::shared_ptr CzarConfig::create(std::string const& configFileName) { + std::lock_guard const lock(_mtxOnInstance); + if (_instance == nullptr) { + _instance = std::shared_ptr(new CzarConfig(util::ConfigStore(configFileName))); + } + return _instance; +} + +std::shared_ptr CzarConfig::instance() { + std::lock_guard const lock(_mtxOnInstance); + if (_instance == nullptr) { + throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created."); + } + return _instance; +} + CzarConfig::CzarConfig(util::ConfigStore const& configStore) : _mySqlResultConfig(configStore.get("resultdb.user", "qsmaster"), configStore.getRequired("resultdb.passwd"), diff --git a/src/czar/CzarConfig.h b/src/czar/CzarConfig.h index 64bc8f035a..31bf39a3fb 100644 --- a/src/czar/CzarConfig.h +++ b/src/czar/CzarConfig.h @@ -25,6 +25,11 @@ #define LSST_QSERV_CZAR_CZARCONFIG_H // System headers +#include +#include +#include +#include +#include // Qserv headers #include "mysql/MySqlConfig.h" @@ -44,15 +49,33 @@ namespace lsst::qserv::czar { */ class CzarConfig { public: - CzarConfig(std::string configFileName) : CzarConfig(util::ConfigStore(configFileName)) {} + /** + * Create an instance of CzarConfig and load parameters from the specified file. 
+ * @note One has to call this method at least once before trying to obtain + * a pointer of the instance by calling 'instance()'. The method 'create()' + * can be called many times. An instance is created by the first call only; + * subsequent calls return the same instance stored within the class. + * @param configFileName - path to Czar INI configuration file + * @return the shared pointer to the configuration object + */ + static std::shared_ptr create(std::string const& configFileName); + + /** + * Get a pointer to an instance that was created by the first call to + * the method 'create'. + * @return the shared pointer to the configuration object + * @throws std::logic_error when attempting to call the method before creating an instance. + */ + static std::shared_ptr instance(); + CzarConfig() = delete; CzarConfig(CzarConfig const&) = delete; CzarConfig& operator=(CzarConfig const&) = delete; /** Overload output operator for current class * * @param out - * @param workerConfig + * @param czarConfig * @return an output stream */ friend std::ostream& operator<<(std::ostream& out, CzarConfig const& czarConfig); @@ -174,6 +197,12 @@ class CzarConfig { private: CzarConfig(util::ConfigStore const& ConfigStore); + /// This mutex is needed for managing a state of the static member _instance. + static std::mutex _mtxOnInstance; + + /// The configuration object created by the first call to the method 'create()'. 
+ static std::shared_ptr _instance; + // Parameters below used in czar::Czar mysql::MySqlConfig const _mySqlResultConfig; diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index 1375008696..487355722c 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -55,6 +55,7 @@ // Qserv headers #include "czar/Czar.h" +#include "czar/CzarConfig.h" #include "global/intTypes.h" #include "proto/WorkerResponse.h" #include "proto/ProtoImporter.h" @@ -114,11 +115,11 @@ InfileMerger::InfileMerger(InfileMergerConfig const& c, std::shared_ptrgetMaxSqlConnectionAttempts()), + _maxResultTableSizeBytes(czar::CzarConfig::instance()->getMaxTableSizeMB() * MB_SIZE_BYTES), _semaMgrConn(semaMgrConn) { _fixupTargetName(); - _setEngineFromStr(_config.czarConfig.getResultEngine()); + _setEngineFromStr(czar::CzarConfig::instance()->getResultEngine()); if (_dbEngine == MYISAM) { LOGS(_log, LOG_LVL_INFO, "Engine is MYISAM, serial"); if (!_setupConnectionMyIsam()) { diff --git a/src/rproc/InfileMerger.h b/src/rproc/InfileMerger.h index 6d093a3cb9..77ab5b7079 100644 --- a/src/rproc/InfileMerger.h +++ b/src/rproc/InfileMerger.h @@ -46,9 +46,6 @@ // Forward declarations namespace lsst::qserv { -namespace czar { -class CzarConfig; -} namespace mysql { class MysqlConfig; } @@ -88,11 +85,9 @@ typedef util::Error InfileMergerError; class InfileMergerConfig { public: InfileMergerConfig() = delete; - InfileMergerConfig(czar::CzarConfig const& czarConfig_, mysql::MySqlConfig const& mySqlConfig_) - : czarConfig(czarConfig_), mySqlConfig(mySqlConfig_) {} + InfileMergerConfig(mysql::MySqlConfig const& mySqlConfig_) : mySqlConfig(mySqlConfig_) {} // for final result, and imported result - czar::CzarConfig const& czarConfig; mysql::MySqlConfig const mySqlConfig; std::string targetTable; std::shared_ptr mergeStmt; From 38bac576de78290e474dc999aa293614f266ca37 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Wed, 28 Jun 2023 02:03:08 +0000 Subject: [PATCH 2/6] Extended 
protobuf definition to pass query result limit to workers --- src/proto/worker.proto | 1 + src/qproc/TaskMsgFactory.cc | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/proto/worker.proto b/src/proto/worker.proto index d61459cb6c..74be7d4126 100644 --- a/src/proto/worker.proto +++ b/src/proto/worker.proto @@ -74,6 +74,7 @@ message TaskMsg { optional bool scaninteractive = 12; optional int32 attemptcount = 13; optional uint32 czarid = 14; + optional int32 maxtablesize_mb = 15 [default = 0]; } // Result message received from worker diff --git a/src/qproc/TaskMsgFactory.cc b/src/qproc/TaskMsgFactory.cc index b6a4ade2d6..90a091d355 100644 --- a/src/qproc/TaskMsgFactory.cc +++ b/src/qproc/TaskMsgFactory.cc @@ -41,6 +41,7 @@ #include "lsst/log/Log.h" // Qserv headers +#include "czar/CzarConfig.h" #include "global/intTypes.h" #include "qmeta/types.h" #include "qproc/ChunkQuerySpec.h" @@ -84,6 +85,7 @@ std::shared_ptr TaskMsgFactory::_makeMsg(ChunkQuerySpec const& c taskMsg->set_scanpriority(chunkQuerySpec.scanInfo.scanRating); taskMsg->set_scaninteractive(chunkQuerySpec.scanInteractive); + taskMsg->set_maxtablesize_mb(czar::CzarConfig::instance()->getMaxTableSizeMB()); // per-chunk taskMsg->set_chunkid(chunkQuerySpec.chunkId); From 53b951a76e2a4b47338815eb4735359b929a9f58 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 29 Jun 2023 04:09:55 +0000 Subject: [PATCH 3/6] Enforce large result limit at workers when fetching result sets Note that the enforcement only applies to the file-based result delivery protocol as this mechanism makes no sense when results are streamed to Czar over the SSI protocol. The large result conditions are reported to Czar with the code util::ErrorCode::WORKER_RESULT_TOO_LARGE which would help Czar to identify this specific condition and properly report the one to a user. 
--- src/util/Error.h | 4 +++- src/util/MultiError.cc | 2 ++ src/util/MultiError.h | 9 +++++++++ src/wbase/FileChannelShared.cc | 15 ++++++++++++++- src/wbase/Task.cc | 5 +++++ src/wbase/Task.h | 2 ++ src/wbase/TransmitData.cc | 1 + 7 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/util/Error.h b/src/util/Error.h index d4bf22e7a3..c95ec76b06 100644 --- a/src/util/Error.h +++ b/src/util/Error.h @@ -60,7 +60,9 @@ struct ErrorCode { CREATE_TABLE, MYSQLCONNECT, MYSQLEXEC, - INTERNAL + INTERNAL, + // Worker errors: + WORKER_RESULT_TOO_LARGE }; }; diff --git a/src/util/MultiError.cc b/src/util/MultiError.cc index da0723c418..bf1ac13616 100644 --- a/src/util/MultiError.cc +++ b/src/util/MultiError.cc @@ -52,6 +52,8 @@ std::string MultiError::toOneLineString() const { return oss.str(); } +int MultiError::firstErrorCode() const { return empty() ? ErrorCode::NONE : _errorVector.front().getCode(); } + bool MultiError::empty() const { return _errorVector.empty(); } std::vector::size_type MultiError::size() const { return _errorVector.size(); } diff --git a/src/util/MultiError.h b/src/util/MultiError.h index 678b7171a0..ea0d620488 100644 --- a/src/util/MultiError.h +++ b/src/util/MultiError.h @@ -69,6 +69,15 @@ class MultiError : public std::exception { */ std::string toOneLineString() const; + /** Return the first error code (if any) + * + * The idea is to return the first code that might trigger the "chain" reaction. + * An interpretation of the code depends on a context. 
+ * + * @return the code or ErrorCode::NONE if the collection of errors is empty + */ + int firstErrorCode() const; + virtual ~MultiError() throw() {} /** Overload output operator for this class diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 6b3e23ee67..2254c705d7 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -216,7 +216,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptrgetMaxTableSize(); + maxTableSize > 0 && bytesTransmitted > maxTableSize) { + string const err = "The result set size " + to_string(bytesTransmitted) + + " of a job exceeds the requested limit of " + to_string(maxTableSize) + + " bytes, task: " + task->getIdStr(); + multiErr.push_back(util::Error(util::ErrorCode::WORKER_RESULT_TOO_LARGE, err)); + LOGS(_log, LOG_LVL_ERROR, err); + erred = true; + break; + } + // If no more rows are left in the task's result set then we need to check // if this is last task in a logical group of ones created for processing // the current request (note that certain classes of requests may require diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index 1871c67b2d..1bca050aa2 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -89,6 +89,9 @@ string buildResultFilePath(shared_ptr const& taskMs to_string(taskMsg->chunkid()) + "-" + to_string(taskMsg->attemptcount()) + ".proto"; return path.string(); } + +size_t const MB_SIZE_BYTES = 1024 * 1024; + } // namespace namespace lsst::qserv::wbase { @@ -184,6 +187,7 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, std::shared_ptrscanpriority(); _scanInfo.sortTablesSlowestFirst(); _scanInteractive = t->scaninteractive(); + _maxTableSize = t->maxtablesize_mb() * ::MB_SIZE_BYTES; // Create sets and vectors for 'aquiring' subchunk temporary tables. 
proto::TaskMsg_Fragment const& fragment(t->fragment(_queryFragmentNum)); @@ -473,6 +477,7 @@ nlohmann::json Task::getJson() const { js["attemptId"] = _attemptCount; js["sequenceId"] = _tSeq; js["scanInteractive"] = _scanInteractive; + js["maxTableSize"] = _maxTableSize; js["cancelled"] = to_string(_cancelled); js["state"] = static_cast(_state.load()); js["createTime_msec"] = util::TimeUtils::tp2ms(_createTime); diff --git a/src/wbase/Task.h b/src/wbase/Task.h index eba22bcdd2..f68d6622a9 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -220,6 +220,7 @@ class Task : public util::CommandForThreadPool { int getJobId() const { return _jId; } int getAttemptCount() const { return _attemptCount; } bool getScanInteractive() { return _scanInteractive; } + int64_t getMaxTableSize() const { return _maxTableSize; } proto::ScanInfo& getScanInfo() { return _scanInfo; } void setOnInteractive(bool val) { _onInteractive = val; } bool getOnInteractive() { return _onInteractive; } @@ -322,6 +323,7 @@ class Task : public util::CommandForThreadPool { bool _scanInteractive; ///< True if the czar thinks this query should be interactive. bool _onInteractive{ false}; ///< True if the scheduler put this task on the interactive (group) scheduler. 
+ int64_t _maxTableSize = 0; std::atomic _memHandle{memman::MemMan::HandleType::INVALID}; memman::MemMan::Ptr _memMan; diff --git a/src/wbase/TransmitData.cc b/src/wbase/TransmitData.cc index 4cbe64f922..9ae3923f9e 100644 --- a/src/wbase/TransmitData.cc +++ b/src/wbase/TransmitData.cc @@ -173,6 +173,7 @@ void TransmitData::_buildDataMsg(lock_guard const& lock, Task const& task string msg = "Error(s) in result for chunk #" + to_string(task.getChunkId()) + ": " + multiErr.toOneLineString(); _result->set_errormsg(msg); + _result->set_errorcode(multiErr.firstErrorCode()); LOGS(_log, LOG_LVL_ERROR, _idStr << "buildDataMsg adding " << msg); } _result->SerializeToString(&_dataMsg); From 77b1680c50f4a13ac560a151dd9f78bba71b0b51 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Mon, 3 Jul 2023 21:29:33 +0000 Subject: [PATCH 4/6] Reinforced response processing in the result merger The new code verifies if the summary messages reported for the file-based results don't have any error contexts. Abort result merging should any such contexts be found. 
--- src/ccontrol/MergingHandler.cc | 46 ++++++++++++++++++++++------------ src/ccontrol/MergingHandler.h | 1 + 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index e9935b7f45..45e76291bf 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -419,23 +419,27 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next }; bool success = false; if (!_response->result.fileresource_xroot().empty()) { - success = ::readXrootFileResourceAndMerge( - _response->result, [&](char const* buf, uint32_t messageLength) -> bool { - if (_response->result.ParseFromArray(buf, messageLength) && - _response->result.IsInitialized()) { - return mergeCurrentResult(); - } - throw runtime_error("MergingHandler::flush ** message deserialization failed **"); - }); + success = _noErrorsInResult() && + ::readXrootFileResourceAndMerge( + _response->result, [&](char const* buf, uint32_t messageLength) -> bool { + if (_response->result.ParseFromArray(buf, messageLength) && + _response->result.IsInitialized()) { + return mergeCurrentResult(); + } + throw runtime_error( + "MergingHandler::flush ** message deserialization failed **"); + }); } else if (!_response->result.fileresource_http().empty()) { - success = ::readHttpFileAndMerge( - _response->result, [&](char const* buf, uint32_t messageLength) -> bool { - if (_response->result.ParseFromArray(buf, messageLength) && - _response->result.IsInitialized()) { - return mergeCurrentResult(); - } - throw runtime_error("MergingHandler::flush ** message deserialization failed **"); - }); + success = _noErrorsInResult() && + ::readHttpFileAndMerge( + _response->result, [&](char const* buf, uint32_t messageLength) -> bool { + if (_response->result.ParseFromArray(buf, messageLength) && + _response->result.IsInitialized()) { + return mergeCurrentResult(); + } + throw runtime_error( + "MergingHandler::flush ** message 
deserialization failed **"); + }); } else { success = mergeCurrentResult(); } @@ -461,6 +465,16 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next return false; } +bool MergingHandler::_noErrorsInResult() { + if (_response->result.has_errorcode() || _response->result.has_errormsg()) { + _setError(_response->result.errorcode(), _response->result.errormsg()); + LOGS(_log, LOG_LVL_ERROR, + "Error from worker:" << _response->protoHeader.wname() << " in response data: " << _error); + return false; + } + return true; +} + void MergingHandler::errorFlush(std::string const& msg, int code) { _setError(code, msg); // Might want more info from result service. diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index bddbdbd735..7ce5f39c2f 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -100,6 +100,7 @@ class MergingHandler : public qdisp::ResponseHandler { void _setError(int code, std::string const& msg); ///< Set error code and string bool _setResult(BufPtr const& bufPtr, int blen); ///< Extract the result from the protobuffer. bool _verifyResult(BufPtr const& bufPtr, int blen); ///< Check the result against hash in the header. + bool _noErrorsInResult(); ///< Check if the result message has no errors, report the ones (if any). 
std::shared_ptr _msgReceiver; ///< Message code receiver std::shared_ptr _infileMerger; ///< Merging delegate From 951c35d7016a7983779d6dc5f647284d2ee282eb Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Wed, 19 Jul 2023 01:35:42 +0000 Subject: [PATCH 5/6] Improved error reporting on MySQL query failures --- src/rproc/InfileMerger.cc | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index 487355722c..f928ec3f61 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -102,6 +102,11 @@ std::string getTimeStampId() { const char JOB_ID_BASE_NAME[] = "jobId"; size_t const MB_SIZE_BYTES = 1024 * 1024; + +/// @return Error info on the last operation with MySQL +string lastMysqlError(MYSQL* mysql) { + return "error: " + string(mysql_error(mysql)) + ", errno: " + to_string(mysql_errno(mysql)); +} } // anonymous namespace namespace lsst::qserv::rproc { @@ -349,27 +354,34 @@ bool InfileMerger::_applyMysqlMyIsam(std::string const& query) { sleep(1); lock.lock(); } else { - LOGS(_log, LOG_LVL_ERROR, "InfileMerger::_applyMysql _setupConnection() failed!!!"); + LOGS(_log, LOG_LVL_ERROR, + "InfileMerger::_applyMysqlMyIsam _setupConnectionMyIsam() failed!!!"); return false; // Reconnection failed. This is an error. 
} } } int rc = mysql_real_query(_mysqlConn.getMySql(), query.data(), query.size()); - return rc == 0; + if (rc == 0) return true; + LOGS(_log, LOG_LVL_ERROR, + "InfileMerger::_applyMysqlMyIsam mysql_real_query() " + ::lastMysqlError(_mysqlConn.getMySql())); + return false; } bool InfileMerger::_applyMysqlInnoDb(std::string const& query) { mysql::MySqlConnection mySConn(_config.mySqlConfig); if (!mySConn.connected()) { if (!_setupConnectionInnoDb(mySConn)) { - LOGS(_log, LOG_LVL_ERROR, "InfileMerger::_applyMysql _setupConnection() failed!!!"); + LOGS(_log, LOG_LVL_ERROR, "InfileMerger::_applyMysqlInnoDb _setupConnectionInnoDb() failed!!!"); return false; // Reconnection failed. This is an error. } } int rc = mysql_real_query(mySConn.getMySql(), query.data(), query.size()); - return rc == 0; + if (rc == 0) return true; + LOGS(_log, LOG_LVL_ERROR, + "InfileMerger::_applyMysqlInnoDb mysql_real_query() " + ::lastMysqlError(mySConn.getMySql())); + return false; } bool InfileMerger::_setupConnectionInnoDb(mysql::MySqlConnection& mySConn) { From 044b146c555409059e2d2e9fe02ac7ed201c13b4 Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 21 Jul 2023 02:01:41 +0000 Subject: [PATCH 6/6] Fixed a candidate bug in the MySQL query cancellation Also, added a logging statement on failures to do so. --- src/mysql/MySqlConnection.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/mysql/MySqlConnection.cc b/src/mysql/MySqlConnection.cc index 48a88858af..f9532242ac 100644 --- a/src/mysql/MySqlConnection.cc +++ b/src/mysql/MySqlConnection.cc @@ -155,7 +155,7 @@ bool MySqlConnection::queryUnbuffered(std::string const& query) { int MySqlConnection::cancel() { std::lock_guard lock(_interruptMutex); int rc; - if (!_isExecuting || _interrupted) { + if (_interrupted) { // Should we log this? return -1; // No further action needed. 
} @@ -172,6 +172,12 @@ int MySqlConnection::cancel() { rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size()); + if (rc) { + // Fetch error details before mysql_close(): the handle must not be used after it is closed. + LOGS(_log, LOG_LVL_WARN, + "failed to kill MySQL thread: " << threadId << ", error: " << std::string(mysql_error(killMysql)) + << ", errno: " << std::to_string(mysql_errno(killMysql))); + } mysql_close(killMysql); if (rc) { return 2; } return 0;