Skip to content

Commit

Permalink
Merge branch 'tickets/DM-39819'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Jul 22, 2023
2 parents fe21aa3 + 044b146 commit 655206f
Show file tree
Hide file tree
Showing 23 changed files with 207 additions and 107 deletions.
46 changes: 30 additions & 16 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,23 +419,27 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next
};
bool success = false;
if (!_response->result.fileresource_xroot().empty()) {
success = ::readXrootFileResourceAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error("MergingHandler::flush ** message deserialization failed **");
});
success = _noErrorsInResult() &&
::readXrootFileResourceAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error(
"MergingHandler::flush ** message deserialization failed **");
});
} else if (!_response->result.fileresource_http().empty()) {
success = ::readHttpFileAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error("MergingHandler::flush ** message deserialization failed **");
});
success = _noErrorsInResult() &&
::readHttpFileAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error(
"MergingHandler::flush ** message deserialization failed **");
});
} else {
success = mergeCurrentResult();
}
Expand All @@ -461,6 +465,16 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next
return false;
}

bool MergingHandler::_noErrorsInResult() {
if (_response->result.has_errorcode() || _response->result.has_errormsg()) {
_setError(_response->result.errorcode(), _response->result.errormsg());
LOGS(_log, LOG_LVL_ERROR,
"Error from worker:" << _response->protoHeader.wname() << " in response data: " << _error);
return false;
}
return true;
}

void MergingHandler::errorFlush(std::string const& msg, int code) {
_setError(code, msg);
// Might want more info from result service.
Expand Down
1 change: 1 addition & 0 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class MergingHandler : public qdisp::ResponseHandler {
void _setError(int code, std::string const& msg); ///< Set error code and string
bool _setResult(BufPtr const& bufPtr, int blen); ///< Extract the result from the protobuffer.
bool _verifyResult(BufPtr const& bufPtr, int blen); ///< Check the result against hash in the header.
bool _noErrorsInResult(); ///< Check if the result message has no errors, report the ones (if any).

std::shared_ptr<MsgReceiver> _msgReceiver; ///< Message code receiver
std::shared_ptr<rproc::InfileMerger> _infileMerger; ///< Merging delegate
Expand Down
37 changes: 18 additions & 19 deletions src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt,
auto const& fromTable = tableRefPtr->getTable();
rowsTable = fromDb + "__" + fromTable + "__rows";
// TODO consider using QMetaSelect instead of making a new connection.
auto cnx = sql::SqlConnectionFactory::make(sharedResources->czarConfig.getMySqlQmetaConfig());
auto cnx = sql::SqlConnectionFactory::make(czar::CzarConfig::instance()->getMySqlQmetaConfig());
sql::SqlErrorObject err;
auto tableExists = cnx->tableExists(rowsTable, err);
LOGS(_log, LOG_LVL_DEBUG,
Expand All @@ -161,28 +161,27 @@ bool qmetaHasDataForSelectCountStarQuery(query::SelectStmt::Ptr const& stmt,
}

std::shared_ptr<UserQuerySharedResources> makeUserQuerySharedResources(
czar::CzarConfig const& czarConfig, std::shared_ptr<qproc::DatabaseModels> const& dbModels,
std::string const& czarName) {
std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
return std::make_shared<UserQuerySharedResources>(
czarConfig,
css::CssAccess::createFromConfig(czarConfig.getCssConfigMap(), czarConfig.getEmptyChunkPath()),
czarConfig.getMySqlResultConfig(),
std::make_shared<qproc::SecondaryIndex>(czarConfig.getMySqlQmetaConfig()),
std::make_shared<qmeta::QMetaMysql>(czarConfig.getMySqlQmetaConfig(),
czarConfig.getMaxMsgSourceStore()),
std::make_shared<qmeta::QStatusMysql>(czarConfig.getMySqlQStatusDataConfig()),
std::make_shared<qmeta::QMetaSelect>(czarConfig.getMySqlQmetaConfig()),
sql::SqlConnectionFactory::make(czarConfig.getMySqlResultConfig()), dbModels, czarName,
czarConfig.getInteractiveChunkLimit());
css::CssAccess::createFromConfig(czarConfig->getCssConfigMap(), czarConfig->getEmptyChunkPath()),
czarConfig->getMySqlResultConfig(),
std::make_shared<qproc::SecondaryIndex>(czarConfig->getMySqlQmetaConfig()),
std::make_shared<qmeta::QMetaMysql>(czarConfig->getMySqlQmetaConfig(),
czarConfig->getMaxMsgSourceStore()),
std::make_shared<qmeta::QStatusMysql>(czarConfig->getMySqlQStatusDataConfig()),
std::make_shared<qmeta::QMetaSelect>(czarConfig->getMySqlQmetaConfig()),
sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig()), dbModels, czarName,
czarConfig->getInteractiveChunkLimit());
}

////////////////////////////////////////////////////////////////////////
UserQueryFactory::UserQueryFactory(czar::CzarConfig const& czarConfig,
qproc::DatabaseModels::Ptr const& dbModels, std::string const& czarName)
: _userQuerySharedResources(makeUserQuerySharedResources(czarConfig, dbModels, czarName)),
UserQueryFactory::UserQueryFactory(qproc::DatabaseModels::Ptr const& dbModels, std::string const& czarName)
: _userQuerySharedResources(makeUserQuerySharedResources(dbModels, czarName)),
_useQservRowCounterOptimization(true) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
_executiveConfig = std::make_shared<qdisp::ExecutiveConfig>(
czarConfig.getXrootdFrontendUrl(), czarConfig.getQMetaSecondsBetweenChunkUpdates());
czarConfig->getXrootdFrontendUrl(), czarConfig->getQMetaSecondsBetweenChunkUpdates());

// When czar crashes/exits while some queries are still in flight they
// are left in EXECUTING state in QMeta. We want to cleanup that state
Expand Down Expand Up @@ -290,8 +289,8 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
if (sessionValid) {
executive = qdisp::Executive::create(*_executiveConfig, messageStore, qdispSharedResources,
_userQuerySharedResources->queryStatsData, qs);
infileMergerConfig = std::make_shared<rproc::InfileMergerConfig>(
_userQuerySharedResources->czarConfig, _userQuerySharedResources->mysqlResultConfig);
infileMergerConfig =
std::make_shared<rproc::InfileMergerConfig>(_userQuerySharedResources->mysqlResultConfig);
infileMergerConfig->debugNoMerge = _debugNoMerge;
}

Expand Down
7 changes: 1 addition & 6 deletions src/ccontrol/UserQueryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ class UserQuery;
class UserQuerySharedResources;
} // namespace lsst::qserv::ccontrol

namespace lsst::qserv::czar {
class CzarConfig;
}

namespace lsst::qserv::qdisp {
class ExecutiveConfig;
}
Expand All @@ -71,8 +67,7 @@ namespace lsst::qserv::ccontrol {
/// constant between successive user queries.
class UserQueryFactory : private boost::noncopyable {
public:
UserQueryFactory(czar::CzarConfig const& czarConfig,
std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName);
UserQueryFactory(std::shared_ptr<qproc::DatabaseModels> const& dbModels, std::string const& czarName);

/// @param query: Query text
/// @param defaultDb: Default database name, may be empty
Expand Down
8 changes: 3 additions & 5 deletions src/ccontrol/UserQueryResources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,15 @@
namespace lsst::qserv::ccontrol {

UserQuerySharedResources::UserQuerySharedResources(
czar::CzarConfig const& czarConfig_, std::shared_ptr<css::CssAccess> const& css_,
mysql::MySqlConfig const& mysqlResultConfig_,
std::shared_ptr<css::CssAccess> const& css_, mysql::MySqlConfig const& mysqlResultConfig_,
std::shared_ptr<qproc::SecondaryIndex> const& secondaryIndex_,
std::shared_ptr<qmeta::QMeta> const& queryMetadata_,
std::shared_ptr<qmeta::QStatus> const& queryStatsData_,
std::shared_ptr<qmeta::QMetaSelect> const& qMetaSelect_,
std::shared_ptr<sql::SqlConnection> const& resultDbConn_,
std::shared_ptr<qproc::DatabaseModels> const& dbModels_, std::string const& czarName,
int interactiveChunkLimit_)
: czarConfig(czarConfig_),
css(css_),
: css(css_),
mysqlResultConfig(mysqlResultConfig_),
secondaryIndex(secondaryIndex_),
queryMetadata(queryMetadata_),
Expand All @@ -52,7 +50,7 @@ UserQuerySharedResources::UserQuerySharedResources(
resultDbConn(resultDbConn_),
databaseModels(dbModels_),
interactiveChunkLimit(interactiveChunkLimit_),
semaMgrConnections(new util::SemaMgr(czarConfig.getResultMaxConnections())) {
semaMgrConnections(new util::SemaMgr(czar::CzarConfig::instance()->getResultMaxConnections())) {
// register czar in QMeta
// TODO: check that czar with the same name is not active already?
qMetaCzarId = queryMetadata->registerCzar(czarName);
Expand Down
7 changes: 1 addition & 6 deletions src/ccontrol/UserQueryResources.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ namespace lsst::qserv::css {
class CssAccess;
}

namespace lsst::qserv::czar {
class CzarConfig;
}

namespace lsst::qserv::mysql {
class MySqlConfig;
}
Expand Down Expand Up @@ -72,7 +68,7 @@ namespace lsst::qserv::ccontrol {
*/
class UserQuerySharedResources {
public:
UserQuerySharedResources(czar::CzarConfig const& czarConfig_, std::shared_ptr<css::CssAccess> const& css_,
UserQuerySharedResources(std::shared_ptr<css::CssAccess> const& css_,
mysql::MySqlConfig const& mysqlResultConfig_,
std::shared_ptr<qproc::SecondaryIndex> const& secondaryIndex_,
std::shared_ptr<qmeta::QMeta> const& queryMetadata_,
Expand All @@ -84,7 +80,6 @@ class UserQuerySharedResources {

UserQuerySharedResources(UserQuerySharedResources const& rhs) = default;
UserQuerySharedResources& operator=(UserQuerySharedResources const& rhs) = delete;
czar::CzarConfig const& czarConfig;
std::shared_ptr<css::CssAccess> css;
mysql::MySqlConfig const mysqlResultConfig;
std::shared_ptr<qproc::SecondaryIndex> secondaryIndex;
Expand Down
9 changes: 4 additions & 5 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,6 @@ QueryState UserQuerySelect::join() {
}
_executive->updateProxyMessages();

// Capture these parameters before discarding the merger which would also reset the config.
bool const notifyWorkersOnQueryFinish = _infileMergerConfig->czarConfig.notifyWorkersOnQueryFinish();
std::string const xrootdFrontendUrl = _infileMergerConfig->czarConfig.getXrootdFrontendUrl();
try {
_discardMerger();
} catch (std::exception const& exc) {
Expand Down Expand Up @@ -391,9 +388,11 @@ QueryState UserQuerySelect::join() {
operation = proto::QueryManagement::CANCEL;
state = ERROR;
}
if (notifyWorkersOnQueryFinish) {
std::shared_ptr<czar::CzarConfig> const czarConfig = czar::CzarConfig::instance();
if (czarConfig->notifyWorkersOnQueryFinish()) {
try {
xrdreq::QueryManagementAction::notifyAllWorkers(xrootdFrontendUrl, operation, _qMetaQueryId);
xrdreq::QueryManagementAction::notifyAllWorkers(czarConfig->getXrootdFrontendUrl(), operation,
_qMetaQueryId);
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
Expand Down
Loading

0 comments on commit 655206f

Please sign in to comment.