diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index e9935b7f45..45e76291bf 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -419,23 +419,27 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next }; bool success = false; if (!_response->result.fileresource_xroot().empty()) { - success = ::readXrootFileResourceAndMerge( - _response->result, [&](char const* buf, uint32_t messageLength) -> bool { - if (_response->result.ParseFromArray(buf, messageLength) && - _response->result.IsInitialized()) { - return mergeCurrentResult(); - } - throw runtime_error("MergingHandler::flush ** message deserialization failed **"); - }); + success = _noErrorsInResult() && + ::readXrootFileResourceAndMerge( + _response->result, [&](char const* buf, uint32_t messageLength) -> bool { + if (_response->result.ParseFromArray(buf, messageLength) && + _response->result.IsInitialized()) { + return mergeCurrentResult(); + } + throw runtime_error( + "MergingHandler::flush ** message deserialization failed **"); + }); } else if (!_response->result.fileresource_http().empty()) { - success = ::readHttpFileAndMerge( - _response->result, [&](char const* buf, uint32_t messageLength) -> bool { - if (_response->result.ParseFromArray(buf, messageLength) && - _response->result.IsInitialized()) { - return mergeCurrentResult(); - } - throw runtime_error("MergingHandler::flush ** message deserialization failed **"); - }); + success = _noErrorsInResult() && + ::readHttpFileAndMerge( + _response->result, [&](char const* buf, uint32_t messageLength) -> bool { + if (_response->result.ParseFromArray(buf, messageLength) && + _response->result.IsInitialized()) { + return mergeCurrentResult(); + } + throw runtime_error( + "MergingHandler::flush ** message deserialization failed **"); + }); } else { success = mergeCurrentResult(); } @@ -461,6 +465,16 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, 
int& next return false; } +bool MergingHandler::_noErrorsInResult() { + if (_response->result.has_errorcode() || _response->result.has_errormsg()) { + _setError(_response->result.errorcode(), _response->result.errormsg()); + LOGS(_log, LOG_LVL_ERROR, + "Error from worker:" << _response->protoHeader.wname() << " in response data: " << _error); + return false; + } + return true; +} + void MergingHandler::errorFlush(std::string const& msg, int code) { _setError(code, msg); // Might want more info from result service. diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index bddbdbd735..7ce5f39c2f 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -100,6 +100,7 @@ class MergingHandler : public qdisp::ResponseHandler { void _setError(int code, std::string const& msg); ///< Set error code and string bool _setResult(BufPtr const& bufPtr, int blen); ///< Extract the result from the protobuffer. bool _verifyResult(BufPtr const& bufPtr, int blen); ///< Check the result against hash in the header. + bool _noErrorsInResult(); ///< Check if the result message has no errors, report the ones (if any). std::shared_ptr _msgReceiver; ///< Message code receiver std::shared_ptr _infileMerger; ///< Merging delegate diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index 351c3fe948..870f28a928 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -151,7 +151,7 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt, auto const& fromTable = tableRefPtr->getTable(); rowsTable = fromDb + "__" + fromTable + "__rows"; // TODO consider using QMetaSelect instead of making a new connection. 
- auto cnx = sql::SqlConnectionFactory::make(sharedResources->czarConfig.getMySqlQmetaConfig()); + auto cnx = sql::SqlConnectionFactory::make(czar::CzarConfig::instance()->getMySqlQmetaConfig()); sql::SqlErrorObject err; auto tableExists = cnx->tableExists(rowsTable, err); LOGS(_log, LOG_LVL_DEBUG, @@ -161,28 +161,27 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt, } std::shared_ptr makeUserQuerySharedResources( - czar::CzarConfig const& czarConfig, std::shared_ptr const& dbModels, - std::string const& czarName) { + std::shared_ptr const& dbModels, std::string const& czarName) { + std::shared_ptr const czarConfig = czar::CzarConfig::instance(); return std::make_shared( - czarConfig, - css::CssAccess::createFromConfig(czarConfig.getCssConfigMap(), czarConfig.getEmptyChunkPath()), - czarConfig.getMySqlResultConfig(), - std::make_shared(czarConfig.getMySqlQmetaConfig()), - std::make_shared(czarConfig.getMySqlQmetaConfig(), - czarConfig.getMaxMsgSourceStore()), - std::make_shared(czarConfig.getMySqlQStatusDataConfig()), - std::make_shared(czarConfig.getMySqlQmetaConfig()), - sql::SqlConnectionFactory::make(czarConfig.getMySqlResultConfig()), dbModels, czarName, - czarConfig.getInteractiveChunkLimit()); + css::CssAccess::createFromConfig(czarConfig->getCssConfigMap(), czarConfig->getEmptyChunkPath()), + czarConfig->getMySqlResultConfig(), + std::make_shared(czarConfig->getMySqlQmetaConfig()), + std::make_shared(czarConfig->getMySqlQmetaConfig(), + czarConfig->getMaxMsgSourceStore()), + std::make_shared(czarConfig->getMySqlQStatusDataConfig()), + std::make_shared(czarConfig->getMySqlQmetaConfig()), + sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig()), dbModels, czarName, + czarConfig->getInteractiveChunkLimit()); } //////////////////////////////////////////////////////////////////////// -UserQueryFactory::UserQueryFactory(czar::CzarConfig const& czarConfig, - qproc::DatabaseModels::Ptr const& dbModels, std::string const& 
czarName) - : _userQuerySharedResources(makeUserQuerySharedResources(czarConfig, dbModels, czarName)), +UserQueryFactory::UserQueryFactory(qproc::DatabaseModels::Ptr const& dbModels, std::string const& czarName) + : _userQuerySharedResources(makeUserQuerySharedResources(dbModels, czarName)), _useQservRowCounterOptimization(true) { + std::shared_ptr const czarConfig = czar::CzarConfig::instance(); _executiveConfig = std::make_shared( - czarConfig.getXrootdFrontendUrl(), czarConfig.getQMetaSecondsBetweenChunkUpdates()); + czarConfig->getXrootdFrontendUrl(), czarConfig->getQMetaSecondsBetweenChunkUpdates()); // When czar crashes/exits while some queries are still in flight they // are left in EXECUTING state in QMeta. We want to cleanup that state @@ -290,8 +289,8 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st if (sessionValid) { executive = qdisp::Executive::create(*_executiveConfig, messageStore, qdispSharedResources, _userQuerySharedResources->queryStatsData, qs); - infileMergerConfig = std::make_shared( - _userQuerySharedResources->czarConfig, _userQuerySharedResources->mysqlResultConfig); + infileMergerConfig = + std::make_shared(_userQuerySharedResources->mysqlResultConfig); infileMergerConfig->debugNoMerge = _debugNoMerge; } diff --git a/src/ccontrol/UserQueryFactory.h b/src/ccontrol/UserQueryFactory.h index 1dcc896ec7..b813544fa4 100644 --- a/src/ccontrol/UserQueryFactory.h +++ b/src/ccontrol/UserQueryFactory.h @@ -47,10 +47,6 @@ class UserQuery; class UserQuerySharedResources; } // namespace lsst::qserv::ccontrol -namespace lsst::qserv::czar { -class CzarConfig; -} - namespace lsst::qserv::qdisp { class ExecutiveConfig; } @@ -71,8 +67,7 @@ namespace lsst::qserv::ccontrol { /// constant between successive user queries. 
class UserQueryFactory : private boost::noncopyable { public: - UserQueryFactory(czar::CzarConfig const& czarConfig, - std::shared_ptr const& dbModels, std::string const& czarName); + UserQueryFactory(std::shared_ptr const& dbModels, std::string const& czarName); /// @param query: Query text /// @param defaultDb: Default database name, may be empty diff --git a/src/ccontrol/UserQueryResources.cc b/src/ccontrol/UserQueryResources.cc index 3baf12d9a6..8bef7c4030 100644 --- a/src/ccontrol/UserQueryResources.cc +++ b/src/ccontrol/UserQueryResources.cc @@ -33,8 +33,7 @@ namespace lsst::qserv::ccontrol { UserQuerySharedResources::UserQuerySharedResources( - czar::CzarConfig const& czarConfig_, std::shared_ptr const& css_, - mysql::MySqlConfig const& mysqlResultConfig_, + std::shared_ptr const& css_, mysql::MySqlConfig const& mysqlResultConfig_, std::shared_ptr const& secondaryIndex_, std::shared_ptr const& queryMetadata_, std::shared_ptr const& queryStatsData_, @@ -42,8 +41,7 @@ UserQuerySharedResources::UserQuerySharedResources( std::shared_ptr const& resultDbConn_, std::shared_ptr const& dbModels_, std::string const& czarName, int interactiveChunkLimit_) - : czarConfig(czarConfig_), - css(css_), + : css(css_), mysqlResultConfig(mysqlResultConfig_), secondaryIndex(secondaryIndex_), queryMetadata(queryMetadata_), @@ -52,7 +50,7 @@ UserQuerySharedResources::UserQuerySharedResources( resultDbConn(resultDbConn_), databaseModels(dbModels_), interactiveChunkLimit(interactiveChunkLimit_), - semaMgrConnections(new util::SemaMgr(czarConfig.getResultMaxConnections())) { + semaMgrConnections(new util::SemaMgr(czar::CzarConfig::instance()->getResultMaxConnections())) { // register czar in QMeta // TODO: check that czar with the same name is not active already? 
qMetaCzarId = queryMetadata->registerCzar(czarName); diff --git a/src/ccontrol/UserQueryResources.h b/src/ccontrol/UserQueryResources.h index 97324fb4e7..98d57d33f9 100644 --- a/src/ccontrol/UserQueryResources.h +++ b/src/ccontrol/UserQueryResources.h @@ -38,10 +38,6 @@ namespace lsst::qserv::css { class CssAccess; } -namespace lsst::qserv::czar { -class CzarConfig; -} - namespace lsst::qserv::mysql { class MySqlConfig; } @@ -72,7 +68,7 @@ namespace lsst::qserv::ccontrol { */ class UserQuerySharedResources { public: - UserQuerySharedResources(czar::CzarConfig const& czarConfig_, std::shared_ptr const& css_, + UserQuerySharedResources(std::shared_ptr const& css_, mysql::MySqlConfig const& mysqlResultConfig_, std::shared_ptr const& secondaryIndex_, std::shared_ptr const& queryMetadata_, @@ -84,7 +80,6 @@ class UserQuerySharedResources { UserQuerySharedResources(UserQuerySharedResources const& rhs) = default; UserQuerySharedResources& operator=(UserQuerySharedResources const& rhs) = delete; - czar::CzarConfig const& czarConfig; std::shared_ptr css; mysql::MySqlConfig const mysqlResultConfig; std::shared_ptr secondaryIndex; diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index 242d30e16f..79d08994cc 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -355,9 +355,6 @@ QueryState UserQuerySelect::join() { } _executive->updateProxyMessages(); - // Capture these parameters before discarding the merger which would also reset the config. 
- bool const notifyWorkersOnQueryFinish = _infileMergerConfig->czarConfig.notifyWorkersOnQueryFinish(); - std::string const xrootdFrontendUrl = _infileMergerConfig->czarConfig.getXrootdFrontendUrl(); try { _discardMerger(); } catch (std::exception const& exc) { @@ -391,9 +388,11 @@ QueryState UserQuerySelect::join() { operation = proto::QueryManagement::CANCEL; state = ERROR; } - if (notifyWorkersOnQueryFinish) { + std::shared_ptr const czarConfig = czar::CzarConfig::instance(); + if (czarConfig->notifyWorkersOnQueryFinish()) { try { - xrdreq::QueryManagementAction::notifyAllWorkers(xrootdFrontendUrl, operation, _qMetaQueryId); + xrdreq::QueryManagementAction::notifyAllWorkers(czarConfig->getXrootdFrontendUrl(), operation, + _qMetaQueryId); } catch (std::exception const& ex) { LOGS(_log, LOG_LVL_WARN, ex.what()); } diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index 4e85dc8183..084db248b1 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -24,15 +24,14 @@ #include "czar/Czar.h" // System headers -#include #include +#include #include // Third-party headers #include "boost/format.hpp" #include "boost/lexical_cast.hpp" -#include "../qdisp/CzarStats.h" // LSST headers #include "lsst/log/Log.h" @@ -40,10 +39,12 @@ #include "ccontrol/ConfigMap.h" #include "ccontrol/UserQuerySelect.h" #include "ccontrol/UserQueryType.h" +#include "czar/CzarConfig.h" #include "czar/CzarErrors.h" #include "czar/MessageTable.h" #include "global/LogContext.h" #include "proto/worker.pb.h" +#include "qdisp/CzarStats.h" #include "qdisp/PseudoFifo.h" #include "qdisp/QdispPool.h" #include "qdisp/SharedResources.h" @@ -88,7 +89,7 @@ Czar::Ptr Czar::createCzar(string const& configPath, string const& czarName) { // Constructors Czar::Czar(string const& configPath, string const& czarName) : _czarName(czarName), - _czarConfig(configPath), + _czarConfig(CzarConfig::create(configPath)), _idCounter(), _uqFactory(), _clientToQuery(), @@ -104,9 +105,9 @@ Czar::Czar(string const& configPath, string 
const& czarName) // The id will be used as the high-watermark for queries that need to be cancelled. // All queries that have identifiers that are strictly less than this one will // be affected by the operation. - if (_czarConfig.notifyWorkersOnCzarRestart()) { + if (_czarConfig->notifyWorkersOnCzarRestart()) { try { - xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig.getXrootdFrontendUrl(), + xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig->getXrootdFrontendUrl(), proto::QueryManagement::CANCEL_AFTER_RESTART, _lastQueryIdBeforeRestart()); } catch (std::exception const& ex) { @@ -114,17 +115,17 @@ Czar::Czar(string const& configPath, string const& czarName) } } - auto databaseModels = - qproc::DatabaseModels::create(_czarConfig.getCssConfigMap(), _czarConfig.getMySqlResultConfig()); + auto databaseModels = qproc::DatabaseModels::create(_czarConfig->getCssConfigMap(), + _czarConfig->getMySqlResultConfig()); // Need to be done first as it adds logging context for new threads - _uqFactory.reset(new ccontrol::UserQueryFactory(_czarConfig, databaseModels, _czarName)); + _uqFactory.reset(new ccontrol::UserQueryFactory(databaseModels, _czarName)); - int qPoolSize = _czarConfig.getQdispPoolSize(); - int maxPriority = std::max(0, _czarConfig.getQdispMaxPriority()); - string vectRunSizesStr = _czarConfig.getQdispVectRunSizes(); + int qPoolSize = _czarConfig->getQdispPoolSize(); + int maxPriority = std::max(0, _czarConfig->getQdispMaxPriority()); + string vectRunSizesStr = _czarConfig->getQdispVectRunSizes(); vector vectRunSizes = util::StringHelper::getIntVectFromStr(vectRunSizesStr, ":", 1); - string vectMinRunningSizesStr = _czarConfig.getQdispVectMinRunningSizes(); + string vectMinRunningSizesStr = _czarConfig->getQdispVectMinRunningSizes(); vector vectMinRunningSizes = util::StringHelper::getIntVectFromStr(vectMinRunningSizesStr, ":", 0); LOGS(_log, LOG_LVL_INFO, "INFO qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " 
vectRunSizes=" @@ -135,22 +136,22 @@ Czar::Czar(string const& configPath, string const& czarName) make_shared(qPoolSize, maxPriority, vectRunSizes, vectMinRunningSizes); qdisp::CzarStats::setup(qdispPool); - int qReqPseudoMaxRunning = _czarConfig.getQReqPseudoFifoMaxRunning(); + int qReqPseudoMaxRunning = _czarConfig->getQReqPseudoFifoMaxRunning(); qdisp::PseudoFifo::Ptr queryRequestPseudoFifo = make_shared(qReqPseudoMaxRunning); _qdispSharedResources = qdisp::SharedResources::create(qdispPool, queryRequestPseudoFifo); - int xrootdCBThreadsMax = _czarConfig.getXrootdCBThreadsMax(); - int xrootdCBThreadsInit = _czarConfig.getXrootdCBThreadsInit(); + int xrootdCBThreadsMax = _czarConfig->getXrootdCBThreadsMax(); + int xrootdCBThreadsInit = _czarConfig->getXrootdCBThreadsInit(); LOGS(_log, LOG_LVL_INFO, "config xrootdCBThreadsMax=" << xrootdCBThreadsMax); LOGS(_log, LOG_LVL_INFO, "config xrootdCBThreadsInit=" << xrootdCBThreadsInit); XrdSsiProviderClient->SetCBThreads(xrootdCBThreadsMax, xrootdCBThreadsInit); - int const xrootdSpread = _czarConfig.getXrootdSpread(); + int const xrootdSpread = _czarConfig->getXrootdSpread(); LOGS(_log, LOG_LVL_INFO, "config xrootdSpread=" << xrootdSpread); XrdSsiProviderClient->SetSpread(xrootdSpread); - _queryDistributionTestVer = _czarConfig.getQueryDistributionTestVer(); + _queryDistributionTestVer = _czarConfig->getQueryDistributionTestVer(); LOGS(_log, LOG_LVL_INFO, "Creating czar instance with name " << czarName); - LOGS(_log, LOG_LVL_INFO, "Czar config: " << _czarConfig); + LOGS(_log, LOG_LVL_INFO, "Czar config: " << *_czarConfig); // Watch to see if the log configuration is changed. 
// If LSST_LOG_CONFIG is not defined, there's no good way to know what log @@ -187,7 +188,7 @@ SubmitResult Czar::submitQuery(string const& query, map const& h // make message table name string userQueryId = to_string(_idCounter++); LOGS(_log, LOG_LVL_DEBUG, "userQueryId: " << userQueryId); - string resultDb = _czarConfig.getMySqlResultConfig().dbName; + string resultDb = _czarConfig->getMySqlResultConfig().dbName; string const msgTableName = "message_" + userQueryId; string const lockName = resultDb + "." + msgTableName; @@ -197,7 +198,7 @@ SubmitResult Czar::submitQuery(string const& query, map const& h SubmitResult result; // instantiate message table manager - MessageTable msgTable(lockName, _czarConfig.getMySqlResultConfig()); + MessageTable msgTable(lockName, _czarConfig->getMySqlResultConfig()); try { msgTable.lock(); } catch (std::exception const& exc) { @@ -261,7 +262,7 @@ SubmitResult Czar::submitQuery(string const& query, map const& h // we do not need to lock message because result is ready before we return string const resultTableName = resultDb + ".result_async_" + userQueryId; string const asyncLockName = resultDb + ".message_async_" + userQueryId; - MessageTable msgTable(asyncLockName, _czarConfig.getMySqlResultConfig()); + MessageTable msgTable(asyncLockName, _czarConfig->getMySqlResultConfig()); try { _makeAsyncResult(resultTableName, uq->getQueryId(), uq->getResultLocation()); msgTable.create(); @@ -397,7 +398,7 @@ void Czar::_updateQueryHistory(string const& clientId, int threadId, ccontrol::U } void Czar::_makeAsyncResult(string const& asyncResultTable, QueryId queryId, string const& resultLoc) { - auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlResultConfig()); + auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlResultConfig()); LOGS(_log, LOG_LVL_DEBUG, "creating async result table " << asyncResultTable); sql::SqlErrorObject sqlErr; @@ -431,9 +432,9 @@ void Czar::removeOldResultTables() { // Run in a 
separate thread in the off chance this takes a while. thread t([this]() { LOGS(_log, LOG_LVL_INFO, "Removing old result database tables."); - auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlResultConfig()); - string dbName = _czarConfig.getMySqlResultConfig().dbName; - string dStr = to_string(_czarConfig.getOldestResultKeptDays()); + auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlResultConfig()); + string dbName = _czarConfig->getMySqlResultConfig().dbName; + string dStr = to_string(_czarConfig->getOldestResultKeptDays()); // Find result related tables that haven't been updated in a long time. string sql = @@ -481,7 +482,7 @@ void Czar::removeOldResultTables() { QueryId Czar::_lastQueryIdBeforeRestart() const { string const context = "Czar::" + string(__func__) + " "; - auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlQmetaConfig()); + auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig->getMySqlQmetaConfig()); string const sql = "SELECT MAX(queryId) FROM QInfo"; sql::SqlResults results; sql::SqlErrorObject err; diff --git a/src/czar/Czar.h b/src/czar/Czar.h index f0acfcf734..54a2b71e5e 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -37,7 +37,6 @@ // Qserv headers #include "ccontrol/UserQuery.h" #include "ccontrol/UserQueryFactory.h" -#include "czar/CzarConfig.h" #include "czar/SubmitResult.h" #include "global/intTypes.h" #include "global/stringTypes.h" @@ -48,6 +47,10 @@ namespace lsst::qserv { +namespace czar { +class CzarConfig; +} + namespace qdisp { class PseudoFifo; } @@ -61,10 +64,8 @@ namespace czar { /// @addtogroup czar /** - * @ingroup czar - * - * @brief Class representing czar "entry points". - * + * @ingroup czar + * @brief Class representing czar "entry points". 
*/ class Czar { @@ -148,7 +149,7 @@ class Czar { typedef std::map> IdToQuery; std::string const _czarName; ///< Unique czar name - CzarConfig const _czarConfig; + std::shared_ptr const _czarConfig; std::atomic _idCounter; ///< Query/task identifier for next query std::unique_ptr _uqFactory; diff --git a/src/czar/CzarConfig.cc b/src/czar/CzarConfig.cc index a42847349c..c4d68072d4 100644 --- a/src/czar/CzarConfig.cc +++ b/src/czar/CzarConfig.cc @@ -58,6 +58,26 @@ bool dummy = XrdSsiLogger::SetMCB(QservLogger, XrdSsiLogger::mcbClient); namespace lsst::qserv::czar { +std::mutex CzarConfig::_mtxOnInstance; + +std::shared_ptr CzarConfig::_instance; + +std::shared_ptr CzarConfig::create(std::string const& configFileName) { + std::lock_guard const lock(_mtxOnInstance); + if (_instance == nullptr) { + _instance = std::shared_ptr(new CzarConfig(util::ConfigStore(configFileName))); + } + return _instance; +} + +std::shared_ptr CzarConfig::instance() { + std::lock_guard const lock(_mtxOnInstance); + if (_instance == nullptr) { + throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created."); + } + return _instance; +} + CzarConfig::CzarConfig(util::ConfigStore const& configStore) : _mySqlResultConfig(configStore.get("resultdb.user", "qsmaster"), configStore.getRequired("resultdb.passwd"), diff --git a/src/czar/CzarConfig.h b/src/czar/CzarConfig.h index 64bc8f035a..31bf39a3fb 100644 --- a/src/czar/CzarConfig.h +++ b/src/czar/CzarConfig.h @@ -25,6 +25,11 @@ #define LSST_QSERV_CZAR_CZARCONFIG_H // System headers +#include +#include +#include +#include +#include // Qserv headers #include "mysql/MySqlConfig.h" @@ -44,15 +49,33 @@ namespace lsst::qserv::czar { */ class CzarConfig { public: - CzarConfig(std::string configFileName) : CzarConfig(util::ConfigStore(configFileName)) {} + /** + * Create an instance of CzarConfig and load parameters from the specified file.
+ * @note One has to call this method at least once before trying to obtain + * a pointer of the instance by calling 'instance()'. The method 'create()' + * can be called many times. Only the first call creates an instance; any + * later call returns that same instance and ignores 'configFileName'. + * @param configFileName - path to czar INI configuration file + * @return the shared pointer to the configuration object + */ + static std::shared_ptr create(std::string const& configFileName); + + /** + * Get a pointer to an instance that was created by the first call to + * the method 'create'. + * @return the shared pointer to the configuration object + * @throws std::logic_error when attempting to call the method before creating an instance. + */ + static std::shared_ptr instance(); + CzarConfig() = delete; CzarConfig(CzarConfig const&) = delete; CzarConfig& operator=(CzarConfig const&) = delete; /** Overload output operator for current class * * @param out - * @param workerConfig + * @param czarConfig * @return an output stream */ friend std::ostream& operator<<(std::ostream& out, CzarConfig const& czarConfig); @@ -174,6 +197,12 @@ class CzarConfig { private: CzarConfig(util::ConfigStore const& ConfigStore); + /// This mutex is needed for managing a state of the static member _instance. + static std::mutex _mtxOnInstance; + + /// The configuration object created by the first call to the method 'create()'. + static std::shared_ptr _instance; + // Parameters below used in czar::Czar mysql::MySqlConfig const _mySqlResultConfig; diff --git a/src/mysql/MySqlConnection.cc b/src/mysql/MySqlConnection.cc index 48a88858af..f9532242ac 100644 --- a/src/mysql/MySqlConnection.cc +++ b/src/mysql/MySqlConnection.cc @@ -155,7 +155,7 @@ bool MySqlConnection::queryUnbuffered(std::string const& query) { int MySqlConnection::cancel() { std::lock_guard lock(_interruptMutex); int rc; - if (!_isExecuting || _interrupted) { + if (_interrupted) { // Should we log this? return -1; // No further action needed.
} @@ -172,6 +172,11 @@ int MySqlConnection::cancel() { rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size()); - mysql_close(killMysql); if (rc) { + // Log while the handle is still open: it must not be used after mysql_close(). + LOGS(_log, LOG_LVL_WARN, + "failed to kill MySQL thread: " << threadId << ", error: " << std::string(mysql_error(killMysql)) + << ", errno: " << std::to_string(mysql_errno(killMysql))); + mysql_close(killMysql); return 2; } + mysql_close(killMysql); return 0; diff --git a/src/proto/worker.proto b/src/proto/worker.proto index d61459cb6c..74be7d4126 100644 --- a/src/proto/worker.proto +++ b/src/proto/worker.proto @@ -74,6 +74,7 @@ message TaskMsg { optional bool scaninteractive = 12; optional int32 attemptcount = 13; optional uint32 czarid = 14; + optional int32 maxtablesize_mb = 15 [default = 0]; } // Result message received from worker diff --git a/src/qproc/TaskMsgFactory.cc b/src/qproc/TaskMsgFactory.cc index b6a4ade2d6..90a091d355 100644 --- a/src/qproc/TaskMsgFactory.cc +++ b/src/qproc/TaskMsgFactory.cc @@ -41,6 +41,7 @@ #include "lsst/log/Log.h" // Qserv headers +#include "czar/CzarConfig.h" #include "global/intTypes.h" #include "qmeta/types.h" #include "qproc/ChunkQuerySpec.h" @@ -84,6 +85,7 @@ std::shared_ptr TaskMsgFactory::_makeMsg(ChunkQuerySpec const& c taskMsg->set_scanpriority(chunkQuerySpec.scanInfo.scanRating); taskMsg->set_scaninteractive(chunkQuerySpec.scanInteractive); + taskMsg->set_maxtablesize_mb(czar::CzarConfig::instance()->getMaxTableSizeMB()); // per-chunk taskMsg->set_chunkid(chunkQuerySpec.chunkId); diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index 1375008696..f928ec3f61 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -55,6 +55,7 @@ // Qserv headers #include "czar/Czar.h" +#include "czar/CzarConfig.h" #include "global/intTypes.h" #include "proto/WorkerResponse.h" #include "proto/ProtoImporter.h" @@ -101,6 +102,11 @@ std::string getTimeStampId() { const char JOB_ID_BASE_NAME[] = "jobId"; size_t const MB_SIZE_BYTES = 1024 * 1024; + +/// @return Error info on the last operation with MySQL
+string lastMysqlError(MYSQL* mysql) { + return "error: " + string(mysql_error(mysql)) + ", errno: " + to_string(mysql_errno(mysql)); +} } // anonymous namespace namespace lsst::qserv::rproc { @@ -114,11 +120,11 @@ InfileMerger::InfileMerger(InfileMergerConfig const& c, std::shared_ptrgetMaxSqlConnectionAttempts()), + _maxResultTableSizeBytes(czar::CzarConfig::instance()->getMaxTableSizeMB() * MB_SIZE_BYTES), _semaMgrConn(semaMgrConn) { _fixupTargetName(); - _setEngineFromStr(_config.czarConfig.getResultEngine()); + _setEngineFromStr(czar::CzarConfig::instance()->getResultEngine()); if (_dbEngine == MYISAM) { LOGS(_log, LOG_LVL_INFO, "Engine is MYISAM, serial"); if (!_setupConnectionMyIsam()) { @@ -348,27 +354,34 @@ bool InfileMerger::_applyMysqlMyIsam(std::string const& query) { sleep(1); lock.lock(); } else { - LOGS(_log, LOG_LVL_ERROR, "InfileMerger::_applyMysql _setupConnection() failed!!!"); + LOGS(_log, LOG_LVL_ERROR, + "InfileMerger::_applyMysqlMyIsam _setupConnectionMyIsam() failed!!!"); return false; // Reconnection failed. This is an error. } } } int rc = mysql_real_query(_mysqlConn.getMySql(), query.data(), query.size()); - return rc == 0; + if (rc == 0) return true; + LOGS(_log, LOG_LVL_ERROR, + "InfileMerger::_applyMysqlMyIsam mysql_real_query() " + ::lastMysqlError(_mysqlConn.getMySql())); + return false; } bool InfileMerger::_applyMysqlInnoDb(std::string const& query) { mysql::MySqlConnection mySConn(_config.mySqlConfig); if (!mySConn.connected()) { if (!_setupConnectionInnoDb(mySConn)) { - LOGS(_log, LOG_LVL_ERROR, "InfileMerger::_applyMysql _setupConnection() failed!!!"); + LOGS(_log, LOG_LVL_ERROR, "InfileMerger::_applyMysqlInnoDb _setupConnectionInnoDb() failed!!!"); return false; // Reconnection failed. This is an error. 
} } int rc = mysql_real_query(mySConn.getMySql(), query.data(), query.size()); - return rc == 0; + if (rc == 0) return true; + LOGS(_log, LOG_LVL_ERROR, + "InfileMerger::_applyMysqlInnoDb mysql_real_query() " + ::lastMysqlError(mySConn.getMySql())); + return false; } bool InfileMerger::_setupConnectionInnoDb(mysql::MySqlConnection& mySConn) { diff --git a/src/rproc/InfileMerger.h b/src/rproc/InfileMerger.h index 6d093a3cb9..77ab5b7079 100644 --- a/src/rproc/InfileMerger.h +++ b/src/rproc/InfileMerger.h @@ -46,9 +46,6 @@ // Forward declarations namespace lsst::qserv { -namespace czar { -class CzarConfig; -} namespace mysql { class MysqlConfig; } @@ -88,11 +85,9 @@ typedef util::Error InfileMergerError; class InfileMergerConfig { public: InfileMergerConfig() = delete; - InfileMergerConfig(czar::CzarConfig const& czarConfig_, mysql::MySqlConfig const& mySqlConfig_) - : czarConfig(czarConfig_), mySqlConfig(mySqlConfig_) {} + InfileMergerConfig(mysql::MySqlConfig const& mySqlConfig_) : mySqlConfig(mySqlConfig_) {} // for final result, and imported result - czar::CzarConfig const& czarConfig; mysql::MySqlConfig const mySqlConfig; std::string targetTable; std::shared_ptr mergeStmt; diff --git a/src/util/Error.h b/src/util/Error.h index d4bf22e7a3..c95ec76b06 100644 --- a/src/util/Error.h +++ b/src/util/Error.h @@ -60,7 +60,9 @@ struct ErrorCode { CREATE_TABLE, MYSQLCONNECT, MYSQLEXEC, - INTERNAL + INTERNAL, + // Worker errors: + WORKER_RESULT_TOO_LARGE }; }; diff --git a/src/util/MultiError.cc b/src/util/MultiError.cc index da0723c418..bf1ac13616 100644 --- a/src/util/MultiError.cc +++ b/src/util/MultiError.cc @@ -52,6 +52,8 @@ std::string MultiError::toOneLineString() const { return oss.str(); } +int MultiError::firstErrorCode() const { return empty() ? 
ErrorCode::NONE : _errorVector.front().getCode(); } + bool MultiError::empty() const { return _errorVector.empty(); } std::vector::size_type MultiError::size() const { return _errorVector.size(); } diff --git a/src/util/MultiError.h b/src/util/MultiError.h index 678b7171a0..ea0d620488 100644 --- a/src/util/MultiError.h +++ b/src/util/MultiError.h @@ -69,6 +69,15 @@ class MultiError : public std::exception { */ std::string toOneLineString() const; + /** Return the first error code (if any) + * + * The idea is to return the first code that might trigger the "chain" reaction. + * The interpretation of the code depends on the context. + * + * @return the code or ErrorCode::NONE if the collection of errors is empty + */ + int firstErrorCode() const; + virtual ~MultiError() throw() {} /** Overload output operator for this class diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 6b3e23ee67..2254c705d7 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -216,7 +216,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptrgetMaxTableSize(); + maxTableSize > 0 && bytesTransmitted > maxTableSize) { + string const err = "The result set size " + to_string(bytesTransmitted) + + " of a job exceeds the requested limit of " + to_string(maxTableSize) + + " bytes, task: " + task->getIdStr(); + multiErr.push_back(util::Error(util::ErrorCode::WORKER_RESULT_TOO_LARGE, err)); + LOGS(_log, LOG_LVL_ERROR, err); + erred = true; + break; + } + // If no more rows are left in the task's result set then we need to check // if this is last task in a logical group of ones created for processing // the current request (note that certain classes of requests may require diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index 1871c67b2d..1bca050aa2 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -89,6 +89,9 @@ string buildResultFilePath(shared_ptr const& taskMs to_string(taskMsg->chunkid()) + "-" +
to_string(taskMsg->attemptcount()) + ".proto"; return path.string(); } + +size_t const MB_SIZE_BYTES = 1024 * 1024; + } // namespace namespace lsst::qserv::wbase { @@ -184,6 +187,7 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, std::shared_ptrscanpriority(); _scanInfo.sortTablesSlowestFirst(); _scanInteractive = t->scaninteractive(); + _maxTableSize = t->maxtablesize_mb() * ::MB_SIZE_BYTES; // Create sets and vectors for 'aquiring' subchunk temporary tables. proto::TaskMsg_Fragment const& fragment(t->fragment(_queryFragmentNum)); @@ -473,6 +477,7 @@ nlohmann::json Task::getJson() const { js["attemptId"] = _attemptCount; js["sequenceId"] = _tSeq; js["scanInteractive"] = _scanInteractive; + js["maxTableSize"] = _maxTableSize; js["cancelled"] = to_string(_cancelled); js["state"] = static_cast(_state.load()); js["createTime_msec"] = util::TimeUtils::tp2ms(_createTime); diff --git a/src/wbase/Task.h b/src/wbase/Task.h index eba22bcdd2..f68d6622a9 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -220,6 +220,7 @@ class Task : public util::CommandForThreadPool { int getJobId() const { return _jId; } int getAttemptCount() const { return _attemptCount; } bool getScanInteractive() { return _scanInteractive; } + int64_t getMaxTableSize() const { return _maxTableSize; } proto::ScanInfo& getScanInfo() { return _scanInfo; } void setOnInteractive(bool val) { _onInteractive = val; } bool getOnInteractive() { return _onInteractive; } @@ -322,6 +323,7 @@ class Task : public util::CommandForThreadPool { bool _scanInteractive; ///< True if the czar thinks this query should be interactive. bool _onInteractive{ false}; ///< True if the scheduler put this task on the interactive (group) scheduler. 
+ int64_t _maxTableSize = 0; std::atomic _memHandle{memman::MemMan::HandleType::INVALID}; memman::MemMan::Ptr _memMan; diff --git a/src/wbase/TransmitData.cc b/src/wbase/TransmitData.cc index 4cbe64f922..9ae3923f9e 100644 --- a/src/wbase/TransmitData.cc +++ b/src/wbase/TransmitData.cc @@ -173,6 +173,7 @@ void TransmitData::_buildDataMsg(lock_guard const& lock, Task const& task string msg = "Error(s) in result for chunk #" + to_string(task.getChunkId()) + ": " + multiErr.toOneLineString(); _result->set_errormsg(msg); + _result->set_errorcode(multiErr.firstErrorCode()); LOGS(_log, LOG_LVL_ERROR, _idStr << "buildDataMsg adding " << msg); } _result->SerializeToString(&_dataMsg);