From be6ed7f3af69c4ade0b8aa959a9cef94ee8423cb Mon Sep 17 00:00:00 2001 From: John Gates Date: Tue, 3 Sep 2024 11:52:38 -0700 Subject: [PATCH] Added cancellation code and for queries, uberjobs, and czar restart. --- src/ccontrol/UserQuerySelect.cc | 10 +- src/czar/ActiveWorker.cc | 284 ++++++++++------------------ src/czar/ActiveWorker.h | 148 ++++++--------- src/czar/Czar.cc | 23 ++- src/czar/CzarChunkMap.cc | 6 +- src/czar/CzarRegistry.cc | 21 +- src/czar/CzarRegistry.h | 8 +- src/global/ResourceUnit.h | 2 +- src/global/intTypes.h | 4 +- src/http/WorkerQueryStatusData.cc | 230 ++++++++++++++-------- src/http/WorkerQueryStatusData.h | 142 ++++++++------ src/http/testStatusData.cc | 61 +++--- src/proto/ScanTableInfo.h | 2 +- src/proto/worker.proto | 2 +- src/qdisp/Executive.cc | 8 +- src/qdisp/Executive.h | 8 +- src/qdisp/JobBase.h | 3 +- src/qdisp/JobDescription.cc | 4 +- src/qdisp/JobQuery.cc | 2 +- src/qdisp/UberJob.h | 5 +- src/qdisp/testQDisp.cc | 6 +- src/wbase/FileChannelShared.cc | 15 +- src/wbase/FileChannelShared.h | 2 +- src/wbase/MsgProcessor.h | 10 +- src/wbase/Task.cc | 63 +++++- src/wbase/Task.h | 22 ++- src/wbase/UberJobData.cc | 8 + src/wbase/UberJobData.h | 6 + src/wbase/UserQueryInfo.cc | 76 ++++---- src/wbase/UserQueryInfo.h | 40 ++-- src/wcontrol/Foreman.cc | 2 + src/wcontrol/Foreman.h | 8 +- src/wdb/QueryRunner.cc | 3 +- src/wdb/testQueryRunner.cc | 32 ++-- src/wpublish/QueriesAndChunks.cc | 20 +- src/wpublish/QueriesAndChunks.h | 7 +- src/wpublish/QueryStatistics.cc | 10 +- src/wpublish/QueryStatistics.h | 12 +- src/wsched/testSchedulers.cc | 23 ++- src/xrdreq/QueryManagementAction.h | 2 +- src/xrdreq/QueryManagementRequest.h | 2 +- src/xrdsvc/ChannelStream.h | 2 +- src/xrdsvc/HttpSvc.cc | 4 +- src/xrdsvc/HttpWorkerCzarModule.cc | 119 +++++++++++- src/xrdsvc/SsiRequest.cc | 13 +- src/xrdsvc/SsiRequest.h | 3 +- 46 files changed, 862 insertions(+), 621 deletions(-) diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index d7d4fb5a0..7627fb960 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -364,8 +364,6 @@ void UserQuerySelect::buildAndSendUberJobs() { // Make a map of all jobs in the executive. // TODO:UJ Maybe a check should be made that all databases are in the same family? - - // keep cycling through workers until no more chunks to place. // - create a map of UberJobs key=, val=> // - for chunkId in `unassignedChunksInQuery` @@ -509,7 +507,7 @@ QueryState UserQuerySelect::join() { if (finalRows < 0) finalRows = collectedRows; // Notify workers on the query completion/cancellation to ensure // resources are properly cleaned over there as well. - proto::QueryManagement::Operation operation = proto::QueryManagement::COMPLETE; + proto::QueryManagement::Operation operation = proto::QueryManagement::COMPLETE; //&&&QM QueryState state = SUCCESS; if (successful) { _qMetaUpdateStatus(qmeta::QInfo::COMPLETED, collectedRows, collectedBytes, finalRows); @@ -517,18 +515,18 @@ QueryState UserQuerySelect::join() { } else if (_killed) { // status is already set to ABORTED LOGS(_log, LOG_LVL_ERROR, "Joined everything (killed)"); - operation = proto::QueryManagement::CANCEL; + operation = proto::QueryManagement::CANCEL; //&&&QM state = ERROR; } else { _qMetaUpdateStatus(qmeta::QInfo::FAILED, collectedRows, collectedBytes, finalRows); LOGS(_log, LOG_LVL_ERROR, "Joined everything (failure!)"); - operation = proto::QueryManagement::CANCEL; + operation = proto::QueryManagement::CANCEL; //&&&QM state = ERROR; } auto const czarConfig = cconfig::CzarConfig::instance(); if (czarConfig->notifyWorkersOnQueryFinish()) { try { - // &&& do this another way, also see executive::squash + // &&& do this another way, also see executive::squash &&&QM xrdreq::QueryManagementAction::notifyAllWorkers(czarConfig->getXrootdFrontendUrl(), operation, _qMetaCzarId, _qMetaQueryId); } catch (std::exception const& ex) { diff --git a/src/czar/ActiveWorker.cc b/src/czar/ActiveWorker.cc index 78b7d04f0..39aa042ef 100644 --- a/src/czar/ActiveWorker.cc +++ b/src/czar/ActiveWorker.cc @@ -54,9 +54,12 @@ string WorkerContactInfo::dump() const { string ActiveWorker::getStateStr(State st) { switch (st) { - case ALIVE: return string("ALIVE"); - case QUESTIONABLE: return string("QUESTIONABLE"); - case DEAD: return string("DEAD"); + case ALIVE: + return string("ALIVE"); + case QUESTIONABLE: + return string("QUESTIONABLE"); + case DEAD: + return string("DEAD"); } return string("unknown"); } @@ -74,202 +77,74 @@ void ActiveWorker::setWorkerContactInfo(http::WorkerContactInfo::Ptr const& wcIn void ActiveWorker::_changeStateTo(State newState, double secsSinceUpdate, string const& note) { auto lLvl = (newState == DEAD) ? LOG_LVL_ERROR : LOG_LVL_INFO; - LOGS(_log, lLvl, note << " oldState=" << getStateStr(_state) << " newState=" << getStateStr(newState) << " secsSince=" << secsSinceUpdate); + LOGS(_log, lLvl, + note << " oldState=" << getStateStr(_state) << " newState=" << getStateStr(newState) + << " secsSince=" << secsSinceUpdate); _state = newState; } -void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double timeoutDeadSecs, double maxLifetime) { - // &&& function too long +void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double timeoutDeadSecs, + double maxLifetime) { lock_guard lg(_aMtx); double secsSinceUpdate = _wqsData->_wInfo->timeSinceRegUpdateSeconds(); // Update the last time the registry contacted this worker. switch (_state) { - case ALIVE: { - if (secsSinceUpdate > timeoutAliveSecs) { - _changeStateTo(QUESTIONABLE, secsSinceUpdate, cName(__func__)); - // Anything that should be done here? - } - break; - } - case QUESTIONABLE: { - if (secsSinceUpdate < timeoutAliveSecs) { - _changeStateTo(ALIVE, secsSinceUpdate, cName(__func__)); - } - if (secsSinceUpdate > timeoutDeadSecs) { - _changeStateTo(DEAD, secsSinceUpdate, cName(__func__)); - // &&& TODO:UJ all uberjobs for this worker need to die. - } - break; - } - case DEAD: { - LOGS(_log, LOG_LVL_ERROR, "&&& NEED CODE"); - if (secsSinceUpdate < timeoutAliveSecs) { - _changeStateTo(ALIVE, secsSinceUpdate, cName(__func__)); - } else { - // Don't waste time on this worker until the registry has heard from it. - return; + case ALIVE: { + if (secsSinceUpdate > timeoutAliveSecs) { + _changeStateTo(QUESTIONABLE, secsSinceUpdate, cName(__func__)); + // &&& Anything else that should be done here? + } + break; } - break; - } - - } - - // Check how many messages are currently being sent to the worker, if at the limit, return - if (_wqsData->_qIdDoneKeepFiles.empty() && _wqsData->_qIdDoneDeleteFiles.empty() && _wqsData->_qIdDeadUberJobs.empty()) { - return; - } - int tCount = _conThreadCount; - if (tCount > _maxConThreadCount) { - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " not sending message since at max threads " << tCount); - return; - } - - // Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a - // message to send to the worker. -#if 0 // &&& - auto now = CLOCK::now(); - auto const czarConfig = cconfig::CzarConfig::instance(); - - shared_ptr jsWorkerReqPtr = make_shared(); - json& jsWorkerR = *jsWorkerReqPtr; - jsWorkerR["version"] = http::MetaModule::version; - jsWorkerR["instance_id"] = czarConfig->replicationInstanceId(); - jsWorkerR["auth_key"] = czarConfig->replicationAuthKey(); - jsWorkerR["worker"] = _wInfo->wId; - jsWorkerR["qiddonekeepfiles"] = json::array(); - jsWorkerR["qiddonedeletefiles"] = json::array(); - jsWorkerR["qiddeaduberjobs"] = json::array(); - jsWorkerR["czar"] = json::object(); - auto& jsWCzar = jsWorkerR["czar"]; - jsWCzar["name"] = czarConfig->name(); - jsWCzar["id"]= czarConfig->id(); - jsWCzar["management-port"] = czarConfig->replicationHttpPort(); - jsWCzar["management-host-name"] = util::get_current_host_fqdn(); - - - { - auto& jsDoneKeep = jsWorkerR["qiddonekeepfiles"]; - auto iterDoneKeep = _qIdDoneKeepFiles.begin(); - while (iterDoneKeep != _qIdDoneKeepFiles.end()) { - auto qId = iterDoneKeep->first; - jsDoneKeep.push_back(qId); - auto tmStamp = iterDoneKeep->second; - double ageSecs = std::chrono::duration(now - tmStamp).count(); - if (ageSecs > maxLifetime) { - iterDoneKeep = _qIdDoneKeepFiles.erase(iterDoneKeep); - } else { - ++iterDoneKeep; + case QUESTIONABLE: { + if (secsSinceUpdate < timeoutAliveSecs) { + _changeStateTo(ALIVE, secsSinceUpdate, cName(__func__)); + } + if (secsSinceUpdate > timeoutDeadSecs) { + _changeStateTo(DEAD, secsSinceUpdate, cName(__func__)); + // &&& TODO:UJ all uberjobs for this worker need to die. } + break; } - } - { - auto& jsDoneDelete = jsWorkerR["qiddonedeletefiles"]; - auto iterDoneDelete = _qIdDoneDeleteFiles.begin(); - while (iterDoneDelete != _qIdDoneDeleteFiles.end()) { - auto qId = iterDoneDelete->first; - jsDoneDelete.push_back(qId); - auto tmStamp = iterDoneDelete->second; - double ageSecs = std::chrono::duration(now - tmStamp).count(); - if (ageSecs > maxLifetime) { - iterDoneDelete = _qIdDoneDeleteFiles.erase(iterDoneDelete); + case DEAD: { + LOGS(_log, LOG_LVL_ERROR, "&&& NEED CODE"); + if (secsSinceUpdate < timeoutAliveSecs) { + _changeStateTo(ALIVE, secsSinceUpdate, cName(__func__)); } else { - ++iterDoneDelete; + // Don't waste time on this worker until the registry has heard from it. + return; } + break; } } - { - auto& jsDeadUj = jsWorkerR["qiddeaduberjobs"]; - auto iterDeadUjQid = _qIdDeadUberJobs.begin(); - while (iterDeadUjQid != _qIdDeadUberJobs.end()) { - TIMEPOINT oldestTm; // default is zero - auto qId = iterDeadUjQid->first; - auto& ujIdMap = iterDeadUjQid->second; - - json jsQidUj = {{"qid", qId}, {"ujids", json::array()}}; - auto& jsUjIds = jsQidUj["ujids"]; - - auto iterUjId = ujIdMap.begin(); - bool addedUjId = false; - while (iterUjId != ujIdMap.end()) { - UberJobId ujId = iterUjId->first; - auto tmStamp = iterUjId->second; - if (tmStamp > oldestTm) { - oldestTm = tmStamp; - } - - jsUjIds.push_back(ujId); - addedUjId = true; - double ageSecs = std::chrono::duration(now - tmStamp).count(); - if (ageSecs > maxLifetime) { - iterUjId = ujIdMap.erase(iterUjId); - } else { - ++iterUjId; - } - } - - if (addedUjId) { - jsDeadUj.push_back(jsQidUj); - } - if (ujIdMap.empty() - || std::chrono::duration(now - oldestTm).count() > maxLifetime) { - iterDeadUjQid = _qIdDeadUberJobs.erase(iterDeadUjQid); - } else { - ++iterDeadUjQid; - } + shared_ptr jsWorkerReqPtr; + { + lock_guard mapLg(_wqsData->_mapMtx); + // Check how many messages are currently being sent to the worker, if at the limit, return + if (_wqsData->_qIdDoneKeepFiles.empty() && _wqsData->_qIdDoneDeleteFiles.empty() && + _wqsData->_qIdDeadUberJobs.empty()) { + return; + } + int tCount = _conThreadCount; + if (tCount > _maxConThreadCount) { + LOGS(_log, LOG_LVL_DEBUG, + cName(__func__) << " not sending message since at max threads " << tCount); + return; } - } -#endif // &&& - auto jsWorkerReqPtr = _wqsData->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + // Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a + // message to send to the worker. + jsWorkerReqPtr = _wqsData->serializeJson(maxLifetime); + } + // &&& Maybe only send the status message if the lists are not empty ??? // Start a thread to send the message. (Maybe these should go on the qdisppool? &&&) // put this in a different function and start the thread.&&&; _sendStatusMsg(jsWorkerReqPtr); } -#if 0 // &&& -bool ActiveWorker::_parse(nlohmann::json const& jsWorkerReq) { - auto const czarConfig = cconfig::CzarConfig::instance(); - - http::RequestBodyJSON rbWReq(jsWorkerReq); - if (jsWorkerReq["version"] != http::MetaModule::version) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " bad version"); - return false; - } - - - http::RequestBodyJSON rbCzar(rbWReq.required("czar")); - auto czarName = rbCzar.required("name"); - auto czarId = rbCzar.required("id"); - auto czarPort = rbCzar.required("management-port"); - auto czarHostName = rbCzar.required("management-host-name"); - /* &&& - jsWorkerReq["instance_id"] != czarConfig->replicationInstanceId(); - jsWorkerReq["auth_key"] != czarConfig->replicationAuthKey(); - jsWorkerReq["worker"] != _wInfo->wId; - auto& jsWCzar = jsWorkerReq["czar"]; - jsWCzar["name"] != czarConfig->name(); - jsWCzar["id"] != czarConfig->id(); - jsWCzar["management-port"] != czarConfig->replicationHttpPort(); - jsWCzar["management-host-name"] != util::get_current_host_fqdn(); - */ - - - auto& jsQIdDoneKeepFiles = jsWorkerReq["qiddonekeepfiles"]; - for (auto const& qidKeep : jsQIdDoneKeepFiles) { - - } - - auto& jsQIdDoneDeleteFiles = jsWorkerReq["qiddonedeletefiles"]; - - auto& jsQIdDeadUberJobs = jsWorkerReq["qiddeaduberjobs"]; - -} -#endif // &&& - void ActiveWorker::_sendStatusMsg(std::shared_ptr const& jsWorkerReqPtr) { - auto& jsWorkerReq = *jsWorkerReqPtr; auto const method = http::Method::POST; auto const& wInf = _wqsData->_wInfo; @@ -277,7 +152,6 @@ void ActiveWorker::_sendStatusMsg(std::shared_ptr const& jsWorke vector const headers = {"Content-Type: application/json"}; auto const& czarConfig = cconfig::CzarConfig::instance(); - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " REQ " << jsWorkerReq); string const requestContext = "Czar: '" + http::method2string(method) + "' stat request to '" + url + "'"; LOGS(_log, LOG_LVL_TRACE, @@ -289,7 +163,7 @@ void ActiveWorker::_sendStatusMsg(std::shared_ptr const& jsWorke try { json const response = client.readAsJson(); if (0 != response.at("success").get()) { - transmitSuccess = true; + transmitSuccess = _wqsData->handleResponseJson(response); } else { LOGS(_log, LOG_LVL_WARN, cName(__func__) << " response success=0"); } @@ -299,12 +173,14 @@ void ActiveWorker::_sendStatusMsg(std::shared_ptr const& jsWorke } if (!transmitSuccess) { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " transmit failure"); - } else { - // parse the return statement and remove the indicated entries from the list - //HERE &&&; } } +void ActiveWorker::addToDoneDeleteFiles(QueryId qId) { _wqsData->addToDoneDeleteFiles(qId); } + +void ActiveWorker::addToDoneKeepFiles(QueryId qId) { _wqsData->addToDoneKeepFiles(qId); } + +void ActiveWorker::removeDeadUberJobsFor(QueryId qId) { _wqsData->removeDeadUberJobsFor(qId); } string ActiveWorker::dump() const { lock_guard lg(_aMtx); @@ -317,8 +193,10 @@ string ActiveWorker::_dump() const { return os.str(); } - -void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap, http::CzarContactInfo::Ptr const& czInfo, std::string const& replicationInstanceId, std::string const& replicationAuthKey) { +void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap, + http::CzarContactInfo::Ptr const& czInfo, + std::string const& replicationInstanceId, + std::string const& replicationAuthKey) { // Go through wcMap, update existing entries in _awMap, create new entries for those that don't exist, lock_guard awLg(_awMapMtx); for (auto const& [wcKey, wcVal] : wcMap) { @@ -326,11 +204,16 @@ void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap, htt if (iter == _awMap.end()) { auto newAW = ActiveWorker::create(wcVal, czInfo, replicationInstanceId, replicationAuthKey); _awMap[wcKey] = newAW; + if (_czarCancelAfterRestart) { + newAW->setCzarCancelAfterRestart(_czarCancelAfterRestartCzId, _czarCancelAfterRestartQId); + } } else { auto aWorker = iter->second; if (!aWorker->compareContactInfo(*wcVal)) { // This should not happen, but try to handle it gracefully if it does. - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " worker contact info changed for " << wcKey << " new=" << wcVal->dump() << " old=" << aWorker->dump()); + LOGS(_log, LOG_LVL_WARN, + cName(__func__) << " worker contact info changed for " << wcKey + << " new=" << wcVal->dump() << " old=" << aWorker->dump()); aWorker->setWorkerContactInfo(wcVal); } } @@ -351,13 +234,46 @@ void ActiveWorkerMap::pruneMap() { } */ +void ActiveWorkerMap::setCzarCancelAfterRestart(CzarIdType czId, QueryId lastQId) { + _czarCancelAfterRestart = true; + _czarCancelAfterRestartCzId = czId; + _czarCancelAfterRestartQId = lastQId; +} + void ActiveWorkerMap::sendActiveWorkersMessages() { // Send messages to each active worker as needed lock_guard lck(_awMapMtx); - for(auto&& [wName, awPtr] : _awMap) { + for (auto&& [wName, awPtr] : _awMap) { awPtr->updateStateAndSendMessages(_timeoutAliveSecs, _timeoutDeadSecs, _maxLifetime); } } +/// &&& doc +void ActiveWorkerMap::addToDoneDeleteFiles(QueryId qId) { + lock_guard lck(_awMapMtx); + for (auto const& [wName, awPtr] : _awMap) { + awPtr->addToDoneDeleteFiles(qId); + awPtr->removeDeadUberJobsFor(qId); + } +} + +/// &&& doc +void ActiveWorkerMap::addToDoneKeepFiles(QueryId qId) { + lock_guard lck(_awMapMtx); + for (auto const& [wName, awPtr] : _awMap) { + awPtr->addToDoneKeepFiles(qId); + awPtr->removeDeadUberJobsFor(qId); + } +} + +/* &&& +/// &&& doc +void ActiveWorkerMap::removeDeadUberJobsFor(QueryId qId) { + lock_guard lck(_awMapMtx); + for (auto const& [wName, awPtr] : _awMap) { + awPtr->removeDeadUberJobsFor(qId); + } +} +*/ } // namespace lsst::qserv::czar diff --git a/src/czar/ActiveWorker.h b/src/czar/ActiveWorker.h index 0db7a0d76..0c05e0180 100644 --- a/src/czar/ActiveWorker.h +++ b/src/czar/ActiveWorker.h @@ -32,74 +32,12 @@ #include "nlohmann/json.hpp" // qserv headers -// &&& #include "global/clock_defs.h" -// &&& #include "global/intTypes.h" #include "http/WorkerQueryStatusData.h" - +#include "util/Bug.h" // This header declarations namespace lsst::qserv::czar { - -/* &&& -/// &&& doc This class just contains the worker id and network communication -/// information, but it may be desirable to store connections to the -/// worker here as well. -class WorkerContactInfo { -public: - using Ptr = std::shared_ptr; - - using WCMap = std::unordered_map; - using WCMapPtr = std::shared_ptr; - - WorkerContactInfo(std::string const& wId_, std::string const& wHost_, - std::string const& wManagementHost_, int wPort_, TIMEPOINT updateTime_) - : wId(wId_), - wHost(wHost_), - wManagementHost(wManagementHost_), - wPort(wPort_) { - regUpdateTime(updateTime_); - } - std::string const wId; ///< key - std::string const wHost; ///< "host-addr" entry. - std::string const wManagementHost; ///< "management-host-name" entry. - int const wPort; ///< "management-port" entry. - - - /// Return true if all members, aside from updateTime, are equal. - bool isSameContactInfo(WorkerContactInfo const& other) const { - return (wId == other.wId && wHost == other.wHost && wManagementHost == other.wManagementHost && - wPort == other.wPort); - } - - void regUpdateTime(TIMEPOINT updateTime) { - std::lock_guard lg(_rMtx); - _regUpdate = updateTime; - } - - double timeSinceRegUpdateSeconds() const { - std::lock_guard lg(_rMtx); - double secs = std::chrono::duration(CLOCK::now() - _regUpdate).count(); - return secs; - } - - TIMEPOINT getRegUpdate() const { - std::lock_guard lg(_rMtx); - return _regUpdate; - } - - std::string dump() const; - -private: - /// Last time the registry heard from this worker. The ActiveWorker class - /// will use this to determine the worker's state. - /// &&& Store in seconds since epoch to make atomic? - TIMEPOINT _regUpdate; - - mutable std::mutex _rMtx; ///< protects _regUpdate -}; -*/ - /// &&& doc - maintain list of done/cancelled queries for an active worker, and send /// that list to the worker. Once the worker has accepted the list, remove /// all of those queryId's from the list. @@ -137,11 +75,7 @@ class ActiveWorker : public std::enable_shared_from_this { public: using Ptr = std::shared_ptr; - enum State { - ALIVE = 0, - QUESTIONABLE, - DEAD - }; + enum State { ALIVE = 0, QUESTIONABLE, DEAD }; ActiveWorker() = delete; ActiveWorker(ActiveWorker const&) = delete; @@ -154,10 +88,18 @@ class ActiveWorker : public std::enable_shared_from_this { static std::string getStateStr(State st); static Ptr create(http::WorkerContactInfo::Ptr const& wInfo, http::CzarContactInfo::Ptr const& czInfo, - std::string const& replicationInstanceId, std::string const& replicationAuthKey) { + std::string const& replicationInstanceId, std::string const& replicationAuthKey) { return Ptr(new ActiveWorker(wInfo, czInfo, replicationInstanceId, replicationAuthKey)); } + /// This function should only be called before the _monitor thread is started + /// and shortly after czar startup: it tells all workers to delete all + /// query information for queries with czarId `czId` and queryId less than + /// or equal to `lastQId`. + void setCzarCancelAfterRestart(CzarIdType czId, QueryId lastQId) { + _wqsData->setCzarCancelAfterRestart(czId, lastQId); + } + http::WorkerContactInfo::Ptr getWInfo() const { if (_wqsData == nullptr) return nullptr; return _wqsData->_wInfo; @@ -173,13 +115,27 @@ class ActiveWorker : public std::enable_shared_from_this { /// &&& doc void updateStateAndSendMessages(double timeoutAliveSecs, double timeoutDeadSecs, double maxLifetime); + /// &&& doc + void addToDoneDeleteFiles(QueryId qId); + + /// &&& doc + void addToDoneKeepFiles(QueryId qId); + + /// &&& doc + void removeDeadUberJobsFor(QueryId qId); + std::string dump() const; private: ///&&&ActiveWorker(WorkerContactInfo::Ptr const& wInfo) : _wInfo(wInfo) {} ActiveWorker(http::WorkerContactInfo::Ptr const& wInfo, http::CzarContactInfo::Ptr const& czInfo, - std::string const& replicationInstanceId, std::string const& replicationAuthKey) - : _wqsData(http::WorkerQueryStatusData::create(wInfo, czInfo, replicationInstanceId, replicationAuthKey)) {} + std::string const& replicationInstanceId, std::string const& replicationAuthKey) + : _wqsData(http::WorkerQueryStatusData::create(wInfo, czInfo, replicationInstanceId, + replicationAuthKey)) { + if (_wqsData == nullptr) { + throw util::Bug(ERR_LOC, "ActiveWorker _wqsData null"); + } + } /// &&& doc /// _aMtx must be held before calling. @@ -192,21 +148,13 @@ class ActiveWorker : public std::enable_shared_from_this { /// _aMtx must be held before calling. std::string _dump() const; - /* &&& - std::map _qIdDoneKeepFiles; ///< &&& doc - limit reached - std::map _qIdDoneDeleteFiles; ///< &&& doc -cancelled/finished - std::map> _qIdDeadUberJobs; ///< &&& doc + /// Contains data that needs to be sent to workers about finished/cancelled + /// user queries and UberJobs. It must not be null. + http::WorkerQueryStatusData::Ptr const _wqsData; - /// &&& TODO:UJ Worth the effort to inform worker of killed UberJobs? - //std::map> _killedUberJobs; + State _state{QUESTIONABLE}; ///< current state of this worker. - WorkerContactInfo::Ptr _wInfo; ///< &&& doc - */ - http::WorkerQueryStatusData::Ptr _wqsData; ///< &&& doc - - State _state{QUESTIONABLE}; ///< current state of this worker. - - mutable std::mutex _aMtx; ///< protects _wInfo, _state, _qIdDoneKeepFiles, _qIdDoneDeleteFiles + mutable std::mutex _aMtx; ///< protects _wInfo, _state, _qIdDoneKeepFiles, _qIdDoneDeleteFiles /// The number of communication threads currently in use by this class instance. std::atomic _conThreadCount{0}; @@ -214,7 +162,7 @@ class ActiveWorker : public std::enable_shared_from_this { /// &&& doc /// @throws std::invalid_argument - bool _parse(nlohmann::json const& jsWorkerReq); // &&& delete after basic testing + bool _parse(nlohmann::json const& jsWorkerReq); // &&& delete after basic testing }; /// &&& doc @@ -229,26 +177,38 @@ class ActiveWorkerMap { ActiveWorkerMap operator=(ActiveWorkerMap const&) = delete; ~ActiveWorkerMap() = default; - std::string cName(const char* fName) { - return std::string("ActiveWorkerMap::") + fName + " "; - } + std::string cName(const char* fName) { return std::string("ActiveWorkerMap::") + fName + " "; } /// &&& doc - void updateMap(http::WorkerContactInfo::WCMap const& wcMap, http::CzarContactInfo::Ptr const& czInfo, std::string const& replicationInstanceId, std::string const& replicationAuthKey); + void updateMap(http::WorkerContactInfo::WCMap const& wcMap, http::CzarContactInfo::Ptr const& czInfo, + std::string const& replicationInstanceId, std::string const& replicationAuthKey); - //&&&void pruneMap(); /// &&& may not be needed ??? + /// If this is to be called, it must be called before Czar::_monitor is started: + /// It tells the workers all queries from `czId` with QueryIds less than `lastQId` + /// should be cancelled. + void setCzarCancelAfterRestart(CzarIdType czId, QueryId lastQId); // &&& doc void sendActiveWorkersMessages(); + /// &&& doc + void addToDoneDeleteFiles(QueryId qId); + + /// &&& doc + void addToDoneKeepFiles(QueryId qId); + private: std::map _awMap; - std::mutex _awMapMtx; ///< protects _awMap; + std::mutex _awMapMtx; ///< protects _awMap; //&&&double const _maxDeadTimeSeconds = 60.0 * 15.0; ///< &&& set from config. - double _timeoutAliveSecs = 60.0 * 5.0; ///< &&& set from config. 5min - double _timeoutDeadSecs = 60.0 * 10.0; ///< &&& set from config. 10min - double _maxLifetime = 60.0 * 60.0; ///< &&& set from config. 1hr + double _timeoutAliveSecs = 60.0 * 5.0; ///< &&& set from config. 5min + double _timeoutDeadSecs = 60.0 * 10.0; ///< &&& set from config. 10min + double _maxLifetime = 60.0 * 60.0; ///< &&& set from config. 1hr + + bool _czarCancelAfterRestart = false; + CzarIdType _czarCancelAfterRestartCzId = 0; + QueryId _czarCancelAfterRestartQId = 0; }; } // namespace lsst::qserv::czar diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index 75bae4107..b9f35cb98 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -146,8 +146,6 @@ void Czar::_monitor() { // &&& Go through the ActiveWorkerMap. Each ActiveWorker instance has a list of QueryIds // that have not yet been acknowledged by the worker, so send a message to each worker // with that list. - - } } @@ -158,7 +156,7 @@ Czar::Czar(string const& configFilePath, string const& czarName) _idCounter(), _uqFactory(), _clientToQuery(), - _activeWorkerMap(new ActiveWorkerMap()){ + _activeWorkerMap(new ActiveWorkerMap()) { // set id counter to milliseconds since the epoch, mod 1 year. struct timeval tv; gettimeofday(&tv, nullptr); @@ -175,9 +173,6 @@ Czar::Czar(string const& configFilePath, string const& czarName) // the name of the Czar gets translated into a numeric identifier. _czarConfig->setId(_uqFactory->userQuerySharedResources()->qMetaCzarId); - // This will block until there is a successful read of the database tables. - _czarFamilyMap = CzarFamilyMap::create(_uqFactory->userQuerySharedResources()->queryMetadata); - // Tell workers to cancel any queries that were submitted before this restart of Czar. // Figure out which query (if any) was recorded in Czar database before the restart. // The id will be used as the high-watermark for queries that need to be cancelled. @@ -186,6 +181,18 @@ Czar::Czar(string const& configFilePath, string const& czarName) // if (_czarConfig->notifyWorkersOnCzarRestart()) { try { + QueryId lastQId = _lastQueryIdBeforeRestart(); + _activeWorkerMap->setCzarCancelAfterRestart(_czarConfig->id(), lastQId); + } catch (std::exception const& ex) { + LOGS(_log, LOG_LVL_WARN, ex.what()); + } + } + /* &&& (moved this and czar crashed instantly, why?) + + if (_czarConfig->notifyWorkersOnCzarRestart()) { + try { + // &&&QM use http - Add flag to each worker in _activeWorkerMap + // TODO:UJ - Workers need to contact the registry and kill queries if the associated czar dies. xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig->getXrootdFrontendUrl(), proto::QueryManagement::CANCEL_AFTER_RESTART, _czarConfig->id(), _lastQueryIdBeforeRestart()); @@ -193,6 +200,10 @@ Czar::Czar(string const& configFilePath, string const& czarName) LOGS(_log, LOG_LVL_WARN, ex.what()); } } + */ + + // This will block until there is a successful read of the database tables. + _czarFamilyMap = CzarFamilyMap::create(_uqFactory->userQuerySharedResources()->queryMetadata); int qPoolSize = _czarConfig->getQdispPoolSize(); int maxPriority = std::max(0, _czarConfig->getQdispMaxPriority()); diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index 7116aa1cc..3e8607b54 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -300,8 +300,10 @@ bool CzarFamilyMap::_read() { return false; } - // &&& TODO:UJ Before makeNewMaps(), get a list of workers considered to be alive by czar::_activeWorkerMap - // give that list to makeNewMaps, and don't and workers to the maps that aren't on the list.&&& !!! + // &&& TODO:UJ Before makeNewMaps(), get a list of workers considered to be alive by + // czar::_activeWorkerMap + // give that list to makeNewMaps, and don't and workers to the maps that aren't on the + // list.&&& !!! // Make the new maps. shared_ptr familyMapPtr = makeNewMaps(qChunkMap); diff --git a/src/czar/CzarRegistry.cc b/src/czar/CzarRegistry.cc index 0824d0ad8..c37b5da47 100644 --- a/src/czar/CzarRegistry.cc +++ b/src/czar/CzarRegistry.cc @@ -125,13 +125,15 @@ void CzarRegistry::_registryWorkerInfoLoop() { http::WorkerContactInfo::WCMapPtr wMap = _buildMapFromJson(response); // Compare the new map to the existing map and replace if different. { - auto czInfo = http::CzarContactInfo::create(_czarConfig->name(), _czarConfig->id(), _czarConfig->replicationHttpPort(), util::get_current_host_fqdn()); + auto czInfo = http::CzarContactInfo::create(_czarConfig->name(), _czarConfig->id(), + _czarConfig->replicationHttpPort(), + util::get_current_host_fqdn()); lock_guard lck(_mapMtx); if (wMap != nullptr && !_compareMapContactInfo(*wMap)) { _contactMap = wMap; _latestMapUpdate = CLOCK::now(); - _activeWorkerMap.updateMap(*_contactMap, czInfo, replicationInstanceId, replicationAuthKey); - + _activeWorkerMap.updateMap(*_contactMap, czInfo, replicationInstanceId, + replicationAuthKey); } } } @@ -200,4 +202,17 @@ void CzarRegistry::sendActiveWorkersMessages() { _activeWorkerMap.sendActiveWorkersMessages(); } +void CzarRegistry::endUserQuery(QueryId qId, bool deleteWorkerResults) { + lock_guard lck(_mapMtx); + // Add query id to the appropriate list. + if (deleteWorkerResults) { + _activeWorkerMap.addToDoneDeleteFiles(qId); + } else { + _activeWorkerMap.addToDoneKeepFiles(qId); + } + + // With lists updated, send out messages. + _activeWorkerMap.sendActiveWorkersMessages(); +} + } // namespace lsst::qserv::czar diff --git a/src/czar/CzarRegistry.h b/src/czar/CzarRegistry.h index e1e52a6e1..c743c6001 100644 --- a/src/czar/CzarRegistry.h +++ b/src/czar/CzarRegistry.h @@ -77,6 +77,9 @@ class CzarRegistry { /// &&& doc void sendActiveWorkersMessages(); + /// &&& doc + void endUserQuery(QueryId qId, bool deleteWorkerResults); + private: CzarRegistry() = delete; CzarRegistry(std::shared_ptr const& czarConfig); @@ -105,9 +108,10 @@ class CzarRegistry { /// Pointer to the map of worker contact information. http::WorkerContactInfo::WCMapPtr _contactMap; - TIMEPOINT _latestMapUpdate; ///< The last time the _contactMap was updated, unrelated to WorkerContactInfo update. + TIMEPOINT _latestMapUpdate; ///< The last time the _contactMap was updated, unrelated to + ///< WorkerContactInfo update. // &&& review how this _mapMtx is used, probably locks for too long a period. - std::mutex _mapMtx; /// Protects _contactMap, _latestUpdate, _activeWorkerMap + std::mutex _mapMtx; /// Protects _contactMap, _latestUpdate, _activeWorkerMap ActiveWorkerMap _activeWorkerMap; ///< Map of workers czar considers active. }; diff --git a/src/global/ResourceUnit.h b/src/global/ResourceUnit.h index ad4a1ef0b..c9f983740 100644 --- a/src/global/ResourceUnit.h +++ b/src/global/ResourceUnit.h @@ -42,7 +42,7 @@ namespace lsst::qserv { /// construction, the code for generating a path that includes the key-value /// portion is not implemented. It is unclear whether we need the generation /// capability, now that key-value pairs can be packed in protobufs messages. -class ResourceUnit { +class ResourceUnit { // TODO:UJ &&& delete if possible public: class Checker; enum UnitType { GARBAGE, DBCHUNK, UNKNOWN, QUERY }; diff --git a/src/global/intTypes.h b/src/global/intTypes.h index f4b4197f7..8463644e5 100644 --- a/src/global/intTypes.h +++ b/src/global/intTypes.h @@ -38,8 +38,8 @@ typedef std::vector Int32Vector; /// Typedef for Query ID in query metadata. typedef std::uint64_t QueryId; typedef std::int64_t JobId; -typedef JobId UberJobId; // These must be the same type. -typedef std::uint32_t CzarIdType; // TODO:UJ remove qmeta::CzarId and rename this CzarId +typedef JobId UberJobId; // These must be the same type. +typedef std::uint32_t CzarIdType; // TODO:UJ remove qmeta::CzarId and rename this CzarId /// Class to provide a consistent format for QueryIds in the log file class QueryIdHelper { diff --git a/src/http/WorkerQueryStatusData.cc b/src/http/WorkerQueryStatusData.cc index cd254f7c0..aed6bf73b 100644 --- a/src/http/WorkerQueryStatusData.cc +++ b/src/http/WorkerQueryStatusData.cc @@ -46,7 +46,7 @@ namespace lsst::qserv::http { json CzarContactInfo::serializeJson() const { json jsCzar; jsCzar["name"] = czName; - jsCzar["id"]= czId; + jsCzar["id"] = czId; jsCzar["management-port"] = czPort; jsCzar["management-host-name"] = czHostName; return jsCzar; @@ -71,11 +71,9 @@ std::string CzarContactInfo::dump() const { return os.str(); } - - json WorkerContactInfo::serializeJson() const { json jsWorker; - jsWorker["id"]= wId; + jsWorker["id"] = wId; jsWorker["host"] = wHost; jsWorker["management-host-name"] = wManagementHost; jsWorker["management-port"] = wPort; @@ -100,8 +98,6 @@ WorkerContactInfo::Ptr WorkerContactInfo::createJson(nlohmann::json const& wJson return nullptr; } - - string WorkerContactInfo::dump() const { stringstream os; os << "workerContactInfo{" @@ -130,41 +126,44 @@ void WorkerQueryStatusData::setWorkerContactInfo(WorkerContactInfo::Ptr const& w } */ - -shared_ptr WorkerQueryStatusData::serializeJson(double timeoutAliveSecs, double timeoutDeadSecs, double maxLifetime) { - +shared_ptr WorkerQueryStatusData::serializeJson(double maxLifetime) { // Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a // message to send to the worker. auto now = CLOCK::now(); - //&&&auto const czarConfig = cconfig::CzarConfig::instance(); - shared_ptr jsWorkerReqPtr = make_shared(); json& jsWorkerR = *jsWorkerReqPtr; jsWorkerR["version"] = http::MetaModule::version; - /* &&& - jsWorkerR["instance_id"] = czarConfig->replicationInstanceId(); - jsWorkerR["auth_key"] = czarConfig->replicationAuthKey(); - */ jsWorkerR["instance_id"] = _replicationInstanceId; jsWorkerR["auth_key"] = _replicationAuthKey; - //&&&jsWorkerR["worker"] = _wInfo->wId; - jsWorkerR["qiddonekeepfiles"] = json::array(); - jsWorkerR["qiddonedeletefiles"] = json::array(); - jsWorkerR["qiddeaduberjobs"] = json::array(); - //&&&jsWorkerR["czar"] = json::object(); jsWorkerR["czar"] = _czInfo->serializeJson(); - //&&&jsWorkerR["worker"] = json::object(); jsWorkerR["worker"] = _wInfo->serializeJson(); + addListsToJson(jsWorkerR, now, maxLifetime); + if (_czarCancelAfterRestart) { + jsWorkerR["czarrestart"] = true; + lock_guard mapLg(_mapMtx); + jsWorkerR["czarrestartcancelczid"] = _czarCancelAfterRestartCzId; + jsWorkerR["czarrestartcancelqid"] = _czarCancelAfterRestartQId; + } else { + jsWorkerR["czarrestart"] = false; + } + + return jsWorkerReqPtr; +} +void WorkerQueryStatusData::addListsToJson(json& jsWR, TIMEPOINT tm, double maxLifetime) { + jsWR["qiddonekeepfiles"] = json::array(); + jsWR["qiddonedeletefiles"] = json::array(); + jsWR["qiddeaduberjobs"] = json::array(); + lock_guard mapLg(_mapMtx); { - auto& jsDoneKeep = jsWorkerR["qiddonekeepfiles"]; + auto& jsDoneKeep = jsWR["qiddonekeepfiles"]; auto iterDoneKeep = _qIdDoneKeepFiles.begin(); while (iterDoneKeep != _qIdDoneKeepFiles.end()) { auto qId = iterDoneKeep->first; jsDoneKeep.push_back(qId); auto tmStamp = iterDoneKeep->second; - double ageSecs = std::chrono::duration(now - tmStamp).count(); + double ageSecs = std::chrono::duration(tm - tmStamp).count(); if (ageSecs > maxLifetime) { iterDoneKeep = _qIdDoneKeepFiles.erase(iterDoneKeep); } else { @@ -173,13 +172,13 @@ shared_ptr WorkerQueryStatusData::serializeJson(double timeoutAliveSecs, d } } { - auto& jsDoneDelete = jsWorkerR["qiddonedeletefiles"]; + auto& jsDoneDelete = jsWR["qiddonedeletefiles"]; auto iterDoneDelete = _qIdDoneDeleteFiles.begin(); while (iterDoneDelete != _qIdDoneDeleteFiles.end()) { auto qId = iterDoneDelete->first; jsDoneDelete.push_back(qId); auto tmStamp = iterDoneDelete->second; - double ageSecs = std::chrono::duration(now - tmStamp).count(); + double ageSecs = std::chrono::duration(tm - tmStamp).count(); if (ageSecs > maxLifetime) { iterDoneDelete = _qIdDoneDeleteFiles.erase(iterDoneDelete); } else { @@ -188,10 +187,10 @@ shared_ptr WorkerQueryStatusData::serializeJson(double timeoutAliveSecs, d } } { - auto& jsDeadUj = jsWorkerR["qiddeaduberjobs"]; + auto& jsDeadUj = jsWR["qiddeaduberjobs"]; auto iterDeadUjQid = _qIdDeadUberJobs.begin(); while (iterDeadUjQid != _qIdDeadUberJobs.end()) { - TIMEPOINT oldestTm; // default is zero + TIMEPOINT oldestTm; // default is zero auto qId = iterDeadUjQid->first; auto& ujIdMap = iterDeadUjQid->second; @@ -209,7 +208,7 @@ shared_ptr WorkerQueryStatusData::serializeJson(double timeoutAliveSecs, d jsUjIds.push_back(ujId); addedUjId = true; - double ageSecs = std::chrono::duration(now - tmStamp).count(); + double ageSecs = std::chrono::duration(tm - tmStamp).count(); if (ageSecs > maxLifetime) { iterUjId = ujIdMap.erase(iterUjId); } else { @@ -221,25 +220,19 @@ shared_ptr WorkerQueryStatusData::serializeJson(double timeoutAliveSecs, d jsDeadUj.push_back(jsQidUj); } - if (ujIdMap.empty() - || std::chrono::duration(now - oldestTm).count() > maxLifetime) { + if (ujIdMap.empty() || std::chrono::duration(tm - oldestTm).count() > maxLifetime) { iterDeadUjQid = _qIdDeadUberJobs.erase(iterDeadUjQid); } else { ++iterDeadUjQid; } } } - - /* &&& happens in the caller now. - // Start a thread to send the message. (Maybe these should go on the qdisppool? &&&) - // put this in a different function and start the thread.&&&; - _sendStatusMsg(jsWorkerReqPtr); - */ - return jsWorkerReqPtr; } WorkerQueryStatusData::Ptr WorkerQueryStatusData::createJson(nlohmann::json const& jsWorkerReq, - std::string const& replicationInstanceId, std::string const& replicationAuthKey, TIMEPOINT updateTm) { + std::string const& replicationInstanceId, + std::string const& replicationAuthKey, + TIMEPOINT updateTm) { LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& a"); try { if (jsWorkerReq["version"] != http::MetaModule::version) { @@ -253,17 +246,21 @@ WorkerQueryStatusData::Ptr WorkerQueryStatusData::createJson(nlohmann::json cons auto wInfo_ = WorkerContactInfo::createJson(jsWorkerReq["worker"], updateTm); LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& d"); if (czInfo_ == nullptr || wInfo_ == nullptr) { - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson czar or worker info could not be parsed in " << jsWorkerReq); + LOGS(_log, LOG_LVL_ERROR, + "WorkerQueryStatusData::createJson czar or worker info could not be parsed in " + << jsWorkerReq); } - auto wqsData = WorkerQueryStatusData::create(wInfo_, czInfo_, replicationInstanceId, replicationAuthKey); + auto wqsData = + WorkerQueryStatusData::create(wInfo_, czInfo_, replicationInstanceId, replicationAuthKey); LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& e"); - - auto parseRes = wqsData->_parseLists(jsWorkerReq, updateTm); - if (!parseRes) { - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson error reading lists in " << jsWorkerReq); - return nullptr; - } + wqsData->parseLists(jsWorkerReq, updateTm); LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& end"); + bool czarRestart = RequestBodyJSON::required(jsWorkerReq, "czarrestart"); + if (czarRestart) { + auto restartCzarId = RequestBodyJSON::required(jsWorkerReq, "czarrestartcancelczid"); + auto restartQueryId = RequestBodyJSON::required(jsWorkerReq, "czarrestartcancelqid"); + wqsData->setCzarCancelAfterRestart(restartCzarId, restartQueryId); + } return wqsData; } catch (invalid_argument const& exc) { LOGS(_log, LOG_LVL_ERROR, string("WorkerQueryStatusData::createJson invalid ") << exc.what()); @@ -271,48 +268,52 @@ WorkerQueryStatusData::Ptr WorkerQueryStatusData::createJson(nlohmann::json cons return nullptr; } -bool WorkerQueryStatusData::_parseLists(nlohmann::json const& jsWorkerReq, TIMEPOINT updateTm) { - try { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& a"); - auto& jsQIdDoneKeepFiles = jsWorkerReq["qiddonekeepfiles"]; - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& b"); - for (auto const& qidKeep : jsQIdDoneKeepFiles) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& b1"); - _qIdDoneKeepFiles[qidKeep] = updateTm; - } +void WorkerQueryStatusData::parseLists(nlohmann::json const& jsWR, TIMEPOINT updateTm) { + lock_guard mapLg(_mapMtx); + parseListsInto(jsWR, updateTm, _qIdDoneKeepFiles, _qIdDoneDeleteFiles, _qIdDeadUberJobs); +} - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& c"); - auto& jsQIdDoneDeleteFiles = jsWorkerReq["qiddonedeletefiles"]; - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& d"); - for (auto const& qidDelete : jsQIdDoneDeleteFiles) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& d1"); - _qIdDoneDeleteFiles[qidDelete] = updateTm; - } +void WorkerQueryStatusData::parseListsInto(nlohmann::json const& jsWR, TIMEPOINT updateTm, + std::map& doneKeepF, + std::map& doneDeleteF, + std::map>& deadUberJobs) { + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& a"); + auto& jsQIdDoneKeepFiles = jsWR["qiddonekeepfiles"]; + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& b"); + for (auto const& qidKeep : jsQIdDoneKeepFiles) { + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& b1"); + doneKeepF[qidKeep] = updateTm; + } - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& e"); - auto& jsQIdDeadUberJobs = jsWorkerReq["qiddeaduberjobs"]; - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& f jsQIdDeadUberJobs=" << jsQIdDeadUberJobs); - // Interestingly, !jsQIdDeadUberJobs.empty() doesn't work, but .size() > 0 does. - // Not having the size() check causes issues with the for loop trying to read the - // first element of an empty list, which goes badly. - if (jsQIdDeadUberJobs.size() > 0) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& f1"); - for (auto const& qDeadUjs : jsQIdDeadUberJobs) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& f1a qDeadUjs=" << qDeadUjs); - QueryId qId = qDeadUjs["qid"]; - auto const& ujIds = qDeadUjs["ujids"]; - auto& mapOfUj = _qIdDeadUberJobs[qId]; - for (auto const& ujId : ujIds) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& f1d1 qId=" << qId << " ujId=" << ujId); - mapOfUj[ujId] = updateTm; - } + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& c"); + auto& jsQIdDoneDeleteFiles = jsWR["qiddonedeletefiles"]; + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& d"); + for (auto const& qidDelete : jsQIdDoneDeleteFiles) { + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& d1"); + doneDeleteF[qidDelete] = updateTm; + } + + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& e"); + auto& jsQIdDeadUberJobs = jsWR["qiddeaduberjobs"]; + LOGS(_log, LOG_LVL_ERROR, + "WorkerQueryStatusData::parseListsInto &&& f jsQIdDeadUberJobs=" << jsQIdDeadUberJobs); + // Interestingly, !jsQIdDeadUberJobs.empty() doesn't work, but .size() > 0 does. + // Not having the size() check causes issues with the for loop trying to read the + // first element of an empty list, which goes badly. + if (jsQIdDeadUberJobs.size() > 0) { + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& f1"); + for (auto const& qDeadUjs : jsQIdDeadUberJobs) { + LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& f1a qDeadUjs=" << qDeadUjs); + QueryId qId = qDeadUjs["qid"]; + auto const& ujIds = qDeadUjs["ujids"]; + auto& mapOfUj = deadUberJobs[qId]; + for (auto const& ujId : ujIds) { + LOGS(_log, LOG_LVL_ERROR, + "WorkerQueryStatusData::parseListsInto &&& f1d1 qId=" << qId << " ujId=" << ujId); + mapOfUj[ujId] = updateTm; } } - } catch (invalid_argument const& exc) { - LOGS(_log, LOG_LVL_ERROR, string("WorkerQueryStatusData::_parseLists invalid ") << exc.what()); - return false; } - return true; } void WorkerQueryStatusData::addDeadUberJobs(QueryId qId, std::vector ujIds, TIMEPOINT tm) { @@ -322,10 +323,71 @@ void WorkerQueryStatusData::addDeadUberJobs(QueryId qId, std::vector } } +void WorkerQueryStatusData::addToDoneDeleteFiles(QueryId qId) { + lock_guard mapLg(_mapMtx); + _qIdDoneDeleteFiles[qId] = CLOCK::now(); +} + +void WorkerQueryStatusData::addToDoneKeepFiles(QueryId qId) { + lock_guard mapLg(_mapMtx); + _qIdDoneKeepFiles[qId] = CLOCK::now(); +} + +void WorkerQueryStatusData::removeDeadUberJobsFor(QueryId qId) { + lock_guard mapLg(_mapMtx); + _qIdDeadUberJobs.erase(qId); +} + +json WorkerQueryStatusData::serializeResponseJson() { + // Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a + // reponse. Nothing should be deleted and time is irrelevant for this, so maxLifetime is enormous + // and any time could be used, but now is easy. + double maxLifetime = std::numeric_limits::max(); + auto now = CLOCK::now(); + json jsResp = {{"success", 1}, {"errortype", "none"}, {"note", ""}}; + addListsToJson(jsResp, now, maxLifetime); + return jsResp; +} + +bool WorkerQueryStatusData::handleResponseJson(nlohmann::json const& jsResp) { + auto now = CLOCK::now(); + std::map doneKeepF; + std::map doneDeleteF; + std::map> deadUberJobs; + parseListsInto(jsResp, now, doneKeepF, doneDeleteF, deadUberJobs); + + lock_guard mapLg(_mapMtx); + // Remove entries from _qIdDoneKeepFiles + for (auto const& [qId, tm] : doneKeepF) { + _qIdDoneKeepFiles.erase(qId); + } + + // Remove entries from _qIdDoneDeleteFiles + for (auto const& [qId, tm] : doneDeleteF) { + _qIdDoneDeleteFiles.erase(qId); + } + + // Remove entries from _qIdDeadUberJobs + for (auto const& [qId, ujMap] : deadUberJobs) { + auto iter = _qIdDeadUberJobs.find(qId); + if (iter != _qIdDeadUberJobs.end()) { + auto& deadMap = iter->second; + for (auto const& [ujId, tm] : ujMap) { + deadMap.erase(ujId); + } + if (deadMap.empty()) { + _qIdDeadUberJobs.erase(iter); + } + } + } + + return true; +} + string WorkerQueryStatusData::dump() const { stringstream os; os << "ActiveWorker " << ((_wInfo == nullptr) ? "?" : _wInfo->dump()); return os.str(); } -} // namespace lsst::qserv::czar +} // namespace lsst::qserv::http diff --git a/src/http/WorkerQueryStatusData.h b/src/http/WorkerQueryStatusData.h index f0f6c1aaa..44282462c 100644 --- a/src/http/WorkerQueryStatusData.h +++ b/src/http/WorkerQueryStatusData.h @@ -35,7 +35,6 @@ #include "global/clock_defs.h" #include "global/intTypes.h" - // This header declarations namespace lsst::qserv::http { @@ -43,9 +42,7 @@ namespace lsst::qserv::http { class CzarContactInfo { public: using Ptr = std::shared_ptr; - std::string cName(const char* fnc) const { - return std::string("CzarContactInfo") + fnc; - } + std::string cName(const char* fnc) const { return std::string("CzarContactInfo") + fnc; } CzarContactInfo() = delete; CzarContactInfo(CzarContactInfo const&) = default; @@ -53,19 +50,21 @@ class CzarContactInfo { /// &&& doc bool compare(CzarContactInfo const& other) { - return (czName == other.czName && czId == other.czId && czPort == other.czPort && czHostName == other.czHostName); + return (czName == other.czName && czId == other.czId && czPort == other.czPort && + czHostName == other.czHostName); } - static Ptr create(std::string const& czName_, CzarIdType czId_, int czPort_, std::string const& czHostName_) { + static Ptr create(std::string const& czName_, CzarIdType czId_, int czPort_, + std::string const& czHostName_) { return Ptr(new CzarContactInfo(czName_, czId_, czPort_, czHostName_)); } static Ptr createJson(nlohmann::json const& czarJson); - std::string const czName; ///< czar "name" - CzarIdType const czId; ///< czar "id" - int const czPort; ///< czar "management-port" - std::string const czHostName; ///< czar "management-host-name" + std::string const czName; ///< czar "name" + CzarIdType const czId; ///< czar "id" + int const czPort; ///< czar "management-port" + std::string const czHostName; ///< czar "management-host-name" /// &&& doc nlohmann::json serializeJson() const; @@ -83,10 +82,9 @@ class CzarContactInfo { */ private: CzarContactInfo(std::string const& czName_, CzarIdType czId_, int czPort_, std::string const& czHostName_) - : czName(czName_), czId(czId_), czPort(czPort_), czHostName(czHostName_) {} + : czName(czName_), czId(czId_), czPort(czPort_), czHostName(czHostName_) {} }; - /// &&& doc This class just contains the worker id and network communication /// information, but it may be desirable to store connections to the /// worker here as well. @@ -97,10 +95,9 @@ class WorkerContactInfo { using WCMap = std::unordered_map; using WCMapPtr = std::shared_ptr; - static Ptr create(std::string const& wId_, std::string const& wHost_, - std::string const& wManagementHost_, int wPort_, TIMEPOINT updateTime_) { - return Ptr(new WorkerContactInfo(wId_, wHost_, - wManagementHost_, wPort_, updateTime_)); + static Ptr create(std::string const& wId_, std::string const& wHost_, std::string const& wManagementHost_, + int wPort_, TIMEPOINT updateTime_) { + return Ptr(new WorkerContactInfo(wId_, wHost_, wManagementHost_, wPort_, updateTime_)); } /// &&& doc @@ -112,12 +109,9 @@ class WorkerContactInfo { std::string cName(const char* fn) { return std::string("WorkerContactInfo::") + fn; } /// &&& make private - WorkerContactInfo(std::string const& wId_, std::string const& wHost_, - std::string const& wManagementHost_, int wPort_, TIMEPOINT updateTime_) - : wId(wId_), - wHost(wHost_), - wManagementHost(wManagementHost_), - wPort(wPort_) { + WorkerContactInfo(std::string const& wId_, std::string const& wHost_, std::string const& wManagementHost_, + int wPort_, TIMEPOINT updateTime_) + : wId(wId_), wHost(wHost_), wManagementHost(wManagementHost_), wPort(wPort_) { regUpdateTime(updateTime_); } std::string const wId; ///< key @@ -125,7 +119,6 @@ class WorkerContactInfo { std::string const wManagementHost; ///< "management-host-name" entry. int const wPort; ///< "management-port" entry. - /// Return true if all members, aside from updateTime, are equal. bool isSameContactInfo(WorkerContactInfo const& other) const { return (wId == other.wId && wHost == other.wHost && wManagementHost == other.wManagementHost && @@ -156,42 +149,31 @@ class WorkerContactInfo { /// &&& Store in seconds since epoch to make atomic? TIMEPOINT _regUpdate; - mutable std::mutex _rMtx; ///< protects _regUpdate + mutable std::mutex _rMtx; ///< protects _regUpdate }; - /// &&& doc -class WorkerQueryStatusData { +class WorkerQueryStatusData { public: using Ptr = std::shared_ptr; - /* &&& - enum State { - ALIVE = 0, - QUESTIONABLE, - DEAD - }; - */ - WorkerQueryStatusData() = delete; WorkerQueryStatusData(WorkerQueryStatusData const&) = delete; WorkerQueryStatusData& operator=(WorkerQueryStatusData const&) = delete; std::string cName(const char* fName) { - return std::string("WorkerQueryStatusData::") + fName + " " + ((_wInfo == nullptr) ? "?" : _wInfo->wId); + return std::string("WorkerQueryStatusData::") + fName + " " + + ((_wInfo == nullptr) ? "?" : _wInfo->wId); } - //&&&static std::string getStateStr(State st); - static Ptr create(WorkerContactInfo::Ptr const& wInfo, CzarContactInfo::Ptr const& czInfo, - std::string const& replicationInstanceId, std::string const& replicationAuthKey) { + std::string const& replicationInstanceId, std::string const& replicationAuthKey) { return Ptr(new WorkerQueryStatusData(wInfo, czInfo, replicationInstanceId, replicationAuthKey)); } /// &&& doc - static Ptr createJson(nlohmann::json const& czarJson, - std::string const& replicationInstanceId, std::string const& replicationAuthKey, TIMEPOINT updateTm); - + static Ptr createJson(nlohmann::json const& czarJson, std::string const& replicationInstanceId, + std::string const& replicationAuthKey, TIMEPOINT updateTm); ~WorkerQueryStatusData() = default; @@ -200,33 +182,79 @@ class WorkerQueryStatusData { /// &&& doc void addDeadUberJobs(QueryId qId, std::vector ujIds, TIMEPOINT tm); + /// &&& doc + void addToDoneDeleteFiles(QueryId qId); + + /// &&& doc + void addToDoneKeepFiles(QueryId qId); + + /// &&& doc + void removeDeadUberJobsFor(QueryId qId); + + void setCzarCancelAfterRestart(CzarIdType czId, QueryId lastQId) { + std::lock_guard mapLg(_mapMtx); + _czarCancelAfterRestart = true; + _czarCancelAfterRestartCzId = czId; + _czarCancelAfterRestartQId = lastQId; + } + + bool isCzarRestart() const { return _czarCancelAfterRestart; } + CzarIdType getCzarRestartCzarId() const { return _czarCancelAfterRestartCzId; } + QueryId getCzarRestartQueryId() const { return _czarCancelAfterRestartQId; } + std::string dump() const; -//&&&private: + //&&&private: // &&& Most of this needs to be made private again. WorkerQueryStatusData(WorkerContactInfo::Ptr const& wInfo, CzarContactInfo::Ptr const& czInfo, - std::string const& replicationInstanceId, std::string const& replicationAuthKey) - : _wInfo(wInfo), _czInfo(czInfo), - _replicationInstanceId(replicationInstanceId), _replicationAuthKey(replicationAuthKey) {} + std::string const& replicationInstanceId, std::string const& replicationAuthKey) + : _wInfo(wInfo), + _czInfo(czInfo), + _replicationInstanceId(replicationInstanceId), + _replicationAuthKey(replicationAuthKey) {} - std::map _qIdDoneKeepFiles; ///< &&& doc - limit reached - std::map _qIdDoneDeleteFiles; ///< &&& doc -cancelled/finished - std::map> _qIdDeadUberJobs; ///< &&& doc + std::map _qIdDoneKeepFiles; ///< &&& doc - limit reached + std::map _qIdDoneDeleteFiles; ///< &&& doc -cancelled/finished + std::map> _qIdDeadUberJobs; ///< &&& doc + std::atomic _czarCancelAfterRestart = false; + CzarIdType _czarCancelAfterRestartCzId = 0; + QueryId _czarCancelAfterRestartQId = 0; - /// &&& TODO:UJ Worth the effort to inform worker of killed UberJobs? - //std::map> _killedUberJobs; + /// Protects _qIdDoneKeepFiles, _qIdDoneDeleteFiles, _qIdDeadUberJobs, + /// and czarCancelAfter variables. + mutable std::mutex _mapMtx; - WorkerContactInfo::Ptr _wInfo; ///< &&& doc - CzarContactInfo::Ptr _czInfo; //< &&& doc + WorkerContactInfo::Ptr _wInfo; ///< &&& doc make const??? + CzarContactInfo::Ptr _czInfo; //< &&& doc make const??? - std::string const _replicationInstanceId; ///< &&& doc - std::string const _replicationAuthKey; ///< &&& doc + std::string const _replicationInstanceId; ///< &&& doc + std::string const _replicationAuthKey; ///< &&& doc - /// &&& doc - std::shared_ptr serializeJson(double timeoutAliveSecs, double timeoutDeadSecs, double maxLifetime); + /// Create a json object held by a shared pointer to use as a message. + /// Old objects in this instance will be removed after being added to the + /// json message. + std::shared_ptr serializeJson(double maxLifetime); + + /// Add contents of qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs to `jsWR` + void addListsToJson(nlohmann::json& jsWR, TIMEPOINT tm, double maxLifetime); /// &&& doc /// @throws std::invalid_argument - bool _parseLists(nlohmann::json const& jsWorkerReq, TIMEPOINT updateTm); // &&& delete after basic testing + void parseLists(nlohmann::json const& jsWR, TIMEPOINT updateTm); + + /// &&& doc + nlohmann::json serializeResponseJson(); + + /// &&& doc + bool handleResponseJson(nlohmann::json const& jsResp); + + /// &&& doc + ///&&&void handleCzarRestart(); + + /// &&& doc + static void parseListsInto(nlohmann::json const& jsWR, TIMEPOINT updateTm, + std::map& doneKeepF, + std::map& doneDeleteF, + std::map>& deadUberJobs); }; } // namespace lsst::qserv::http diff --git a/src/http/testStatusData.cc b/src/http/testStatusData.cc index 97767dd9f..191053631 100644 --- a/src/http/testStatusData.cc +++ b/src/http/testStatusData.cc @@ -44,16 +44,15 @@ using namespace lsst::qserv::http; BOOST_AUTO_TEST_SUITE(Suite) BOOST_AUTO_TEST_CASE(CzarContactInfo) { - string const replicationInstanceId = "repliInstId"; string const replicationAuthKey = "repliIAuthKey"; - string const cName("czar_name"); - lsst::qserv::CzarIdType const cId = 32; - int cPort = 2022; - string const cHost("cz_host"); + string const czrName("czar_name"); + lsst::qserv::CzarIdType const czrId = 32; + int czrPort = 2022; + string const czrHost("cz_host"); - auto czarA = lsst::qserv::http::CzarContactInfo::create(cName, cId, cPort, cHost); + auto czarA = lsst::qserv::http::CzarContactInfo::create(czrName, czrId, czrPort, czrHost); LOGS_ERROR("&&& a czarA=" << czarA->dump()); auto czarAJs = czarA->serializeJson(); @@ -63,7 +62,7 @@ BOOST_AUTO_TEST_CASE(CzarContactInfo) { LOGS_ERROR("&&& c czarB=" << czarB); BOOST_REQUIRE(czarA->compare(*czarB)); - auto czarC = lsst::qserv::http::CzarContactInfo::create("different", cId, cPort, cHost); + auto czarC = lsst::qserv::http::CzarContactInfo::create("different", czrId, czrPort, czrHost); BOOST_REQUIRE(!czarA->compare(*czarC)); auto start = lsst::qserv::CLOCK::now(); @@ -80,29 +79,30 @@ BOOST_AUTO_TEST_CASE(CzarContactInfo) { BOOST_REQUIRE(workerA->isSameContactInfo(*workerA1)); // WorkerQueryStatusData - auto wqsdA = lsst::qserv::http::WorkerQueryStatusData::create(workerA, czarA, replicationInstanceId, replicationAuthKey); + auto wqsdA = lsst::qserv::http::WorkerQueryStatusData::create(workerA, czarA, replicationInstanceId, + replicationAuthKey); LOGS_ERROR("&&& g wqsdA=" << wqsdA->dump()); - double timeoutAliveSecs = 100.0; - double timeoutDeadSecs = 2*timeoutAliveSecs; + //&&&double timeoutAliveSecs = 100.0; + //&&&double timeoutDeadSecs = 2*timeoutAliveSecs; double maxLifetime = 300.0; - auto jsDataA = wqsdA->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + auto jsDataA = wqsdA->serializeJson(maxLifetime); LOGS_ERROR("&&& h jsDataA=" << *jsDataA); // Check that empty lists work. - auto wqsdA1 = lsst::qserv::http::WorkerQueryStatusData::createJson(*jsDataA, replicationInstanceId, replicationAuthKey, start1Sec); + auto wqsdA1 = lsst::qserv::http::WorkerQueryStatusData::createJson(*jsDataA, replicationInstanceId, + replicationAuthKey, start1Sec); LOGS_ERROR("&&& i wqsdA1=" << wqsdA1->dump()); - auto jsDataA1 = wqsdA1->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + auto jsDataA1 = wqsdA1->serializeJson(maxLifetime); BOOST_REQUIRE(*jsDataA == *jsDataA1); - - vector qIdsDelFiles = { 7, 8, 9, 15, 25, 26, 27, 30 }; - vector qIdsKeepFiles = { 1, 2, 3, 4, 6, 10, 13, 19, 33 }; + vector qIdsDelFiles = {7, 8, 9, 15, 25, 26, 27, 30}; + vector qIdsKeepFiles = {1, 2, 3, 4, 6, 10, 13, 19, 33}; for (auto const qIdDF : qIdsDelFiles) { wqsdA->_qIdDoneDeleteFiles[qIdDF] = start; } - jsDataA = wqsdA->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + jsDataA = wqsdA->serializeJson(maxLifetime); LOGS_ERROR("&&& j jsDataA=" << jsDataA); BOOST_REQUIRE(*jsDataA != *jsDataA1); @@ -114,27 +114,42 @@ BOOST_AUTO_TEST_CASE(CzarContactInfo) { LOGS_ERROR("&&& i wqsdA=" << wqsdA->dump()); - jsDataA = wqsdA->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + jsDataA = wqsdA->serializeJson(maxLifetime); LOGS_ERROR("&&& j jsDataA=" << *jsDataA); auto start5Sec = start + 5s; - auto workerAFromJson = lsst::qserv::http::WorkerQueryStatusData::createJson(*jsDataA, replicationInstanceId, replicationAuthKey, start5Sec); - auto jsWorkerAFromJson = workerAFromJson->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + auto workerAFromJson = lsst::qserv::http::WorkerQueryStatusData::createJson( + *jsDataA, replicationInstanceId, replicationAuthKey, start5Sec); + auto jsWorkerAFromJson = workerAFromJson->serializeJson(maxLifetime); BOOST_REQUIRE(*jsDataA == *jsWorkerAFromJson); wqsdA->addDeadUberJobs(12, {34}, start5Sec); wqsdA->addDeadUberJobs(91, {77}, start5Sec); wqsdA->addDeadUberJobs(1059, {1, 4, 6, 7, 8, 10, 3, 22, 93}, start5Sec); - jsDataA = wqsdA->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + jsDataA = wqsdA->serializeJson(maxLifetime); LOGS_ERROR("&&& k jsDataA=" << *jsDataA); BOOST_REQUIRE(*jsDataA != *jsWorkerAFromJson); - workerAFromJson = lsst::qserv::http::WorkerQueryStatusData::createJson(*jsDataA, replicationInstanceId, replicationAuthKey, start5Sec); - jsWorkerAFromJson = workerAFromJson->serializeJson(timeoutAliveSecs, timeoutDeadSecs, maxLifetime); + workerAFromJson = lsst::qserv::http::WorkerQueryStatusData::createJson(*jsDataA, replicationInstanceId, + replicationAuthKey, start5Sec); + jsWorkerAFromJson = workerAFromJson->serializeJson(maxLifetime); LOGS_ERROR("&&& l jsWorkerAFromJson=" << *jsWorkerAFromJson); BOOST_REQUIRE(*jsDataA == *jsWorkerAFromJson); + // Make the response, which contains lists of the items handled by the workers. + auto jsWorkerResp = workerAFromJson->serializeResponseJson(); + + // test removal of elements after response. + BOOST_REQUIRE(!wqsdA->_qIdDoneDeleteFiles.empty()); + BOOST_REQUIRE(!wqsdA->_qIdDoneKeepFiles.empty()); + BOOST_REQUIRE(!wqsdA->_qIdDeadUberJobs.empty()); + + wqsdA->handleResponseJson(jsWorkerResp); + + BOOST_REQUIRE(wqsdA->_qIdDoneDeleteFiles.empty()); + BOOST_REQUIRE(wqsdA->_qIdDoneKeepFiles.empty()); + BOOST_REQUIRE(wqsdA->_qIdDeadUberJobs.empty()); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/proto/ScanTableInfo.h b/src/proto/ScanTableInfo.h index bb362c51d..76d03e5f4 100644 --- a/src/proto/ScanTableInfo.h +++ b/src/proto/ScanTableInfo.h @@ -35,7 +35,7 @@ namespace lsst::qserv::proto { /// Structure to store shared scan information for a single table. /// -struct ScanTableInfo { // &&& check if still useful +struct ScanTableInfo { // &&& check if still useful using ListOf = std::vector; ScanTableInfo() = default; diff --git a/src/proto/worker.proto b/src/proto/worker.proto index 4ef2ae4e7..0310420ed 100644 --- a/src/proto/worker.proto +++ b/src/proto/worker.proto @@ -89,7 +89,7 @@ message WorkerCommandStatus { optional string error = 2 [default = ""]; // Optional error message (depends on the code) } -// &&& try to eliminate this +// &&&QM try to eliminate this message QueryManagement { enum Operation { CANCEL_AFTER_RESTART = 1; // Cancel older queries before the specified query (excluding that one). diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 9edc44ed3..edc05bde3 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -489,14 +489,20 @@ void Executive::_squashSuperfluous() { LOGS(_log, LOG_LVL_DEBUG, "Executive::squashSuperfluous done"); } -void Executive::sendWorkerCancelMsg(bool deleteResults) { +void Executive::sendWorkerCancelMsg(bool deleteResults) { // &&&QM rename sendEndMsgs // TODO:UJ need to send a message to the worker that the query is cancelled and all result files // should be delete + // &&&QM + // TODO:UJ &&& worker needs to monitor registry to see if czar dies + // &&& - worker will need to kill related queries/uberjobs and store info to send to the + // &&& dead czar in case it comes back to life. LOGS(_log, LOG_LVL_ERROR, "TODO:UJ NEED CODE Executive::sendWorkerCancelMsg to send messages to workers to cancel this czarId " "+ " "queryId. " << deleteResults); + + czar::Czar::getCzar()->getCzarRegistry()->endUserQuery(_id, deleteResults); // &&&QM } int Executive::getNumInflight() const { diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index 5c4beba84..cdbb967f6 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -52,7 +52,7 @@ #include "util/ThreadPool.h" // TODO:UJ replace with better enable/disable feature, or just use only UberJobs -#define uberJobsEnabled 1 // &&& delete +#define uberJobsEnabled 1 // &&& delete namespace lsst::qserv { @@ -255,8 +255,8 @@ class Executive : public std::enable_shared_from_this { std::atomic _empty{true}; std::shared_ptr _messageStore; ///< MessageStore for logging - JobMap _jobMap; ///< Contains information about all jobs. - JobMap _incompleteJobs; ///< Map of incomplete jobs. + JobMap _jobMap; ///< Contains information about all jobs. + JobMap _incompleteJobs; ///< Map of incomplete jobs. /// How many jobs are used in this query. 1 avoids possible 0 of 0 jobs completed race condition. /// The correct value is set when it is available. std::atomic _totalJobs{1}; @@ -267,7 +267,7 @@ class Executive : public std::enable_shared_from_this { /** Execution errors */ util::MultiError _multiError; - std::atomic _requestCount{0}; ///< Count of submitted jobs + std::atomic _requestCount{0}; ///< Count of submitted jobs util::Flag _cancelled{false}; ///< Has execution been cancelled. // Mutexes diff --git a/src/qdisp/JobBase.h b/src/qdisp/JobBase.h index 1a4239457..a77476daa 100644 --- a/src/qdisp/JobBase.h +++ b/src/qdisp/JobBase.h @@ -59,7 +59,8 @@ class JobBase : public std::enable_shared_from_this { virtual UberJobId getJobId() const = 0; virtual std::string const& getIdStr() const = 0; virtual std::shared_ptr getQdispPool() = 0; - //&&& virtual std::string const& getPayload() const = 0; ///< const& in return type is essential for xrootd + //&&& virtual std::string const& getPayload() const = 0; ///< const& in return type is essential for + //xrootd virtual std::shared_ptr getRespHandler() = 0; virtual std::shared_ptr getStatus() = 0; virtual bool getScanInteractive() const = 0; diff --git a/src/qdisp/JobDescription.cc b/src/qdisp/JobDescription.cc index ad8d3e62b..ab4234545 100644 --- a/src/qdisp/JobDescription.cc +++ b/src/qdisp/JobDescription.cc @@ -102,8 +102,8 @@ bool JobDescription::getScanInteractive() const { return _chunkQuerySpec->scanIn int JobDescription::getScanRating() const { return _chunkQuerySpec->scanInfo.scanRating; } ostream& operator<<(ostream& os, JobDescription const& jd) { - os << "job(id=" << jd._jobId << " ru=" << jd._resource.path() - << " attemptCount=" << jd._attemptCount << ")"; + os << "job(id=" << jd._jobId << " ru=" << jd._resource.path() << " attemptCount=" << jd._attemptCount + << ")"; return os; } diff --git a/src/qdisp/JobQuery.cc b/src/qdisp/JobQuery.cc index 7245e60a2..ad28f5c7e 100644 --- a/src/qdisp/JobQuery.cc +++ b/src/qdisp/JobQuery.cc @@ -63,7 +63,7 @@ JobQuery::~JobQuery() { } /// Cancel response handling. Return true if this is the first time cancel has been called. -bool JobQuery::cancel(bool superfluous) { /// &&& This can probably be simplified more +bool JobQuery::cancel(bool superfluous) { /// &&& This can probably be simplified more QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getJobId()); LOGS(_log, LOG_LVL_DEBUG, "JobQuery::cancel()"); if (_cancelled.exchange(true) == false) { diff --git a/src/qdisp/UberJob.h b/src/qdisp/UberJob.h index b0f6fc24e..5416c260d 100644 --- a/src/qdisp/UberJob.h +++ b/src/qdisp/UberJob.h @@ -95,7 +95,8 @@ class UberJob : public JobBase { /// Set the worker information needed to send messages to the worker believed to /// be responsible for the chunks handled in this UberJob. - void setWorkerContactInfo(http::WorkerContactInfo::Ptr const& wContactInfo) { // Change to ActiveWorker &&& ??? + void setWorkerContactInfo( + http::WorkerContactInfo::Ptr const& wContactInfo) { // Change to ActiveWorker &&& ??? _wContactInfo = wContactInfo; } @@ -159,7 +160,7 @@ class UberJob : public JobBase { czar::CzarChunkMap::WorkerChunksData::Ptr _workerData; // TODO:UJ this may not be needed // Contact information for the target worker. - http::WorkerContactInfo::Ptr _wContactInfo; // Change to ActiveWorker &&& ??? + http::WorkerContactInfo::Ptr _wContactInfo; // Change to ActiveWorker &&& ??? }; } // namespace lsst::qserv::qdisp diff --git a/src/qdisp/testQDisp.cc b/src/qdisp/testQDisp.cc index b750af776..ce3a4069d 100644 --- a/src/qdisp/testQDisp.cc +++ b/src/qdisp/testQDisp.cc @@ -337,7 +337,8 @@ BOOST_AUTO_TEST_CASE(ExecutiveCancel) { { LOGS_DEBUG("ExecutiveCancel: squash it test"); SetupTest tEnv("respdata"); - //&&&qdisp::XrdSsiServiceMock::setGo(false); // Can't let jobs run or they are untracked before squash + //&&&qdisp::XrdSsiServiceMock::setGo(false); // Can't let jobs run or they are untracked before + //squash SequentialInt sequence(0); tEnv.jqTest = executiveTest(tEnv.ex, sequence, chunkId, tEnv.qrMsg, 1); tEnv.ex->squash(); @@ -355,7 +356,8 @@ BOOST_AUTO_TEST_CASE(ExecutiveCancel) { { LOGS_DEBUG("ExecutiveCancel: squash 20 test"); SetupTest tEnv("respdata"); - //&&&qdisp::XrdSsiServiceMock::setGo(false); // Can't let jobs run or they are untracked before squash + //&&&qdisp::XrdSsiServiceMock::setGo(false); // Can't let jobs run or they are untracked before + //squash SequentialInt sequence(0); executiveTest(tEnv.ex, sequence, chunkId, tEnv.qrMsg, 20); tEnv.ex->squash(); diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 722d4ea0c..cf8c06fc3 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -365,15 +365,9 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared } return true; } else { - auto ujData = _uberJobData.lock(); - if (ujData == nullptr) { - LOGS(_log, LOG_LVL_WARN, - __func__ << " not sending error as ujData is null " << multiErr.toString()); - return false; - } // Delete the result file as nobody will come looking for it. _kill(tMtxLock, " buildAndTransmitError"); - return ujData->responseError(multiErr, task, cancelled); + return _uberJobData->responseError(multiErr, task, cancelled); } return false; } @@ -660,13 +654,8 @@ bool FileChannelShared::_sendResponse(lock_guard const& tMtxLock, shared_ return false; } } else { - auto ujData = _uberJobData.lock(); - if (ujData == nullptr) { - LOGS(_log, LOG_LVL_WARN, __func__ << " uberJobData is nullptr for ujId=" << _uberJobId); - return false; - } string httpFileUrl = task->resultFileHttpUrl(); - ujData->responseFileReady(httpFileUrl, _rowcount, _transmitsize, _headerCount); + _uberJobData->responseFileReady(httpFileUrl, _rowcount, _transmitsize, _headerCount); } return true; } diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index 102f87fe2..69e4268fe 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -243,7 +243,7 @@ class FileChannelShared { bool _isUberJob; ///< true if this is using UberJob http. To be removed when _sendChannel goes away. std::shared_ptr const _sendChannel; ///< Used to pass encoded information to XrdSsi. - std::weak_ptr _uberJobData; ///< Pointer to UberJobData + std::shared_ptr _uberJobData; ///< Contains czar contact info. UberJobId const _uberJobId; ///< The UberJobId qmeta::CzarId const _czarId; ///< id of the czar that requested this task(s). TODO:UJ delete diff --git a/src/wbase/MsgProcessor.h b/src/wbase/MsgProcessor.h index 8b48de7ec..4f875f93e 100644 --- a/src/wbase/MsgProcessor.h +++ b/src/wbase/MsgProcessor.h @@ -42,21 +42,23 @@ class WorkerCommand; namespace lsst::qserv::wbase { /// MsgProcessor implementations handle incoming Task objects. -struct MsgProcessor { // &&& delete file if possible +struct MsgProcessor { // &&& delete file if possible virtual ~MsgProcessor() {} /// Process a group of query processing tasks. - virtual void processTasks(std::vector> const& tasks) = 0; // &&& delete + virtual void processTasks(std::vector> const& tasks) = 0; // &&& delete /// Process a managememt command - virtual void processCommand(std::shared_ptr const& command) = 0; // &&& can this be deleted + virtual void processCommand( + std::shared_ptr const& command) = 0; // &&& can this be deleted /** * Retreive the status of queries being processed by the worker. * @param taskSelector Task selection criterias. * @return a JSON representation of the object's status for the monitoring */ - virtual nlohmann::json statusToJson(wbase::TaskSelector const& taskSelector) = 0; // &&& can this be deleted + virtual nlohmann::json statusToJson( + wbase::TaskSelector const& taskSelector) = 0; // &&& can this be deleted }; } // namespace lsst::qserv::wbase diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index cc7c1668f..b3c4f8818 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -120,13 +120,21 @@ atomic taskSequence{0}; ///< Unique identifier source for Task. /// available to define the action to take when this task is run, so /// Command::setFunc() is used set the action later. This is why /// the util::CommandThreadPool is not called here. +/* &&& Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chunkId, int fragmentNumber, shared_ptr const& userQueryInfo, size_t templateId, bool hasSubchunks, int subchunkId, string const& db, proto::ScanInfo const& scanInfo, bool scanInteractive, int maxTableSize, vector const& fragSubTables, vector const& fragSubchunkIds, - shared_ptr const& sc, uint16_t resultsHttpPort) - : _userQueryInfo(userQueryInfo), - _sendChannel(sc), + shared_ptr const& sc, std::shared_ptr const& +queryStats_, uint16_t resultsHttpPort) : _userQueryInfo(userQueryInfo), +*/ +Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chunkId, int fragmentNumber, + size_t templateId, bool hasSubchunks, int subchunkId, string const& db, + proto::ScanInfo const& scanInfo, bool scanInteractive, int maxTableSize, + vector const& fragSubTables, vector const& fragSubchunkIds, + shared_ptr const& sc, + std::shared_ptr const& queryStats_, uint16_t resultsHttpPort) + : _sendChannel(sc), _tSeq(++taskSequence), _qId(ujData->getQueryId()), _templateId(templateId), @@ -141,6 +149,7 @@ Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chun _czarId(ujData->getCzarId()), _scanInfo(scanInfo), _scanInteractive(scanInteractive), + _queryStats(queryStats_), _maxTableSize(maxTableSize * ::MB_SIZE_BYTES) { // These attributes will be passed back to Czar in the Protobuf response // to advice which result delivery channel to use. @@ -191,14 +200,15 @@ Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chun } Task::~Task() { + /* &&& _userQueryInfo.reset(); UserQueryInfo::uqMapErase(_qId); if (UserQueryInfo::uqMapGet(_qId) == nullptr) { LOGS(_log, LOG_LVL_TRACE, "~Task Cleared uqMap entry for _qId=" << _qId); } + */ } - std::vector Task::createTasksForChunk( std::shared_ptr const& ujData, nlohmann::json const& jsJobs, std::shared_ptr const& sendChannel, proto::ScanInfo const& scanInfo, @@ -209,7 +219,9 @@ std::vector Task::createTasksForChunk( QueryId qId = ujData->getQueryId(); UberJobId ujId = ujData->getUberJobId(); - UserQueryInfo::Ptr userQueryInfo = UserQueryInfo::uqMapInsert(qId); + //&&&UserQueryInfo::Ptr userQueryInfo = UserQueryInfo::uqMapInsert(qId); + wpublish::QueryStatistics::Ptr queryStats = queriesAndChunks->addQueryId(qId); + UserQueryInfo::Ptr userQueryInfo = queryStats->getUserQueryInfo(); string funcN(__func__); funcN += " QID=" + to_string(qId) + " "; @@ -274,19 +286,35 @@ std::vector Task::createTasksForChunk( if (fragSubchunkIds.empty()) { bool const noSubchunks = false; int const subchunkId = -1; + /* &&& auto task = Task::Ptr(new Task( ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, userQueryInfo, templateId, noSubchunks, subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, - maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, resultsHttpPort)); + maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, queryStats, + resultsHttpPort)); + */ + auto task = Task::Ptr(new Task( + ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, templateId, + noSubchunks, subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, maxTableSizeMb, + fragSubTables, fragSubchunkIds, sendChannel, queryStats, resultsHttpPort)); + vect.push_back(task); } else { for (auto subchunkId : fragSubchunkIds) { bool const hasSubchunks = true; + auto task = Task::Ptr(new Task(ujData, jdJobId, jdAttemptCount, jdChunkId, + fragmentNumber, templateId, hasSubchunks, subchunkId, + jdQuerySpecDb, scanInfo, scanInteractive, + maxTableSizeMb, fragSubTables, fragSubchunkIds, + sendChannel, queryStats, resultsHttpPort)); + /* &&& auto task = Task::Ptr(new Task(ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, userQueryInfo, templateId, hasSubchunks, subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, maxTableSizeMb, fragSubTables, - fragSubchunkIds, sendChannel, resultsHttpPort)); + fragSubchunkIds, sendChannel, queryStats, + resultsHttpPort)); + */ vect.push_back(task); } } @@ -340,13 +368,30 @@ void Task::action(util::CmdData* data) { } string Task::getQueryString() const { - string qs = _userQueryInfo->getTemplate(_templateId); + //&&&string qs = _userQueryInfo->getTemplate(_templateId); + auto qStats = _queryStats.lock(); + if (qStats == nullptr) { + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " _queryStats could not be locked"); + return string(""); + } + + // auto uQInfo = _userQueryInfo.lock(); + auto uQInfo = qStats->getUserQueryInfo(); + /* &&& + if (uQInfo == nullptr) { + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " _userQueryInfo could not be locked"); + return string(""); + } + */ + string qs = uQInfo->getTemplate(_templateId); + LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& a qs=" << qs); boost::algorithm::replace_all(qs, CHUNK_TAG, to_string(_chunkId)); boost::algorithm::replace_all(qs, SUBCHUNK_TAG, to_string(_subchunkId)); + LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& b qs=" << qs); return qs; } -void Task::setQueryStatistics(wpublish::QueryStatistics::Ptr const& qStats) { _queryStats = qStats; } +//&&&void Task::setQueryStatistics(wpublish::QueryStatistics::Ptr const& qStats) { _queryStats = qStats; } wpublish::QueryStatistics::Ptr Task::getQueryStats() const { auto qStats = _queryStats.lock(); diff --git a/src/wbase/Task.h b/src/wbase/Task.h index bb37949dd..47ec091ad 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -167,18 +167,29 @@ class Task : public util::CommandForThreadPool { bool operator()(Ptr const& x, Ptr const& y); }; + std::string cName(const char* func) const { return std::string("Task::") + func; } + // TODO:UJ too many parameters. // - fragmentNumber seems pointless // - hasSubchunks seems redundant. // Hopefully, many are the same for all tasks and can be moved to ujData and userQueryInfo. // Candidates: scanInfo, maxTableSizeMb, FileChannelShared, resultsHttpPort. // Unfortunately, this will be much easier if it is done after xrootd method is removed. + /* &&& Task(std::shared_ptr const& ujData, int jobId, int attemptCount, int chunkId, int fragmentNumber, std::shared_ptr const& userQueryInfo, size_t templateId, bool hasSubchunks, int subchunkId, std::string const& db, proto::ScanInfo const& scanInfo, bool scanInteractive, int maxTableSizeMb, std::vector const& fragSubTables, std::vector const& fragSubchunkIds, std::shared_ptr const& sc, + std::shared_ptr const& queryStats_, uint16_t resultsHttpPort = 8080); + */ + Task(std::shared_ptr const& ujData, int jobId, int attemptCount, int chunkId, + int fragmentNumber, size_t templateId, bool hasSubchunks, int subchunkId, std::string const& db, + proto::ScanInfo const& scanInfo, bool scanInteractive, int maxTableSizeMb, + std::vector const& fragSubTables, std::vector const& fragSubchunkIds, + std::shared_ptr const& sc, + std::shared_ptr const& queryStats_, uint16_t resultsHttpPort = 8080); Task& operator=(const Task&) = delete; Task(const Task&) = delete; @@ -194,7 +205,7 @@ class Task : public util::CommandForThreadPool { std::shared_ptr const& queriesAndChunks, uint16_t resultsHttpPort = 8080); - void setQueryStatistics(std::shared_ptr const& qC); + //&&&void setQueryStatistics(std::shared_ptr const& qC); std::shared_ptr getSendChannel() const { return _sendChannel; } void resetSendChannel() { _sendChannel.reset(); } ///< reset the shared pointer for FileChannelShared @@ -334,7 +345,7 @@ class Task : public util::CommandForThreadPool { } private: - std::shared_ptr _userQueryInfo; ///< Details common to Tasks in this UserQuery. + //&&&std::weak_ptr _userQueryInfo; ///< Details common to Tasks in this UserQuery. std::shared_ptr _sendChannel; ///< Send channel. uint64_t const _tSeq = 0; ///< identifier for the specific task @@ -372,6 +383,10 @@ class Task : public util::CommandForThreadPool { bool _scanInteractive; ///< True if the czar thinks this query should be interactive. bool _onInteractive{ false}; ///< True if the scheduler put this task on the interactive (group) scheduler. + + /// Stores information on the query's resource usage. + std::weak_ptr const _queryStats; + int64_t _maxTableSize = 0; std::atomic _memHandle{memman::MemMan::HandleType::INVALID}; memman::MemMan::Ptr _memMan; @@ -387,9 +402,6 @@ class Task : public util::CommandForThreadPool { std::chrono::system_clock::time_point _finishTime; ///< data transmission to Czar fiished size_t _totalSize = 0; ///< Total size of the result so far. - /// Stores information on the query's resource usage. - std::weak_ptr _queryStats; - std::atomic _mysqlThreadId{0}; ///< 0 if not connected to MySQL std::atomic _booted{false}; ///< Set to true if this task takes too long and is booted. diff --git a/src/wbase/UberJobData.cc b/src/wbase/UberJobData.cc index 64538fc6c..ac828fa4d 100644 --- a/src/wbase/UberJobData.cc +++ b/src/wbase/UberJobData.cc @@ -185,4 +185,12 @@ bool UberJobData::responseError(util::MultiError& multiErr, std::shared_ptr lg(_ujTasksMtx); + for (auto const& task : _ujTasks) { + task->cancel(); + } +} + } // namespace lsst::qserv::wbase diff --git a/src/wbase/UberJobData.h b/src/wbase/UberJobData.h index f4ab4e303..03813979e 100644 --- a/src/wbase/UberJobData.h +++ b/src/wbase/UberJobData.h @@ -81,6 +81,7 @@ class UberJobData { /// Add the tasks defined in the UberJob to this UberJobData object. void addTasks(std::vector> const& tasks) { + std::lock_guard tLg(_ujTasksMtx); _ujTasks.insert(_ujTasks.end(), tasks.begin(), tasks.end()); } @@ -94,6 +95,9 @@ class UberJobData { std::string getIdStr() const { return _idStr; } std::string cName(std::string const& funcName) { return "UberJobData::" + funcName + " " + getIdStr(); } + /// &&& doc + void cancelAllTasks(); + private: UberJobData(UberJobId uberJobId, std::string const& czarName, qmeta::CzarId czarId, std::string czarHost, int czarPort, uint64_t queryId, std::string const& workerId, @@ -113,6 +117,8 @@ class UberJobData { std::vector> _ujTasks; std::shared_ptr _fileChannelShared; + std::mutex _ujTasksMtx; ///< Protects _ujTasks. + std::string const _idStr; }; diff --git a/src/wbase/UserQueryInfo.cc b/src/wbase/UserQueryInfo.cc index 79c24f07e..72d148060 100644 --- a/src/wbase/UserQueryInfo.cc +++ b/src/wbase/UserQueryInfo.cc @@ -39,45 +39,6 @@ namespace lsst::qserv::wbase { UserQueryInfo::UserQueryInfo(QueryId qId) : _qId(qId) {} -UserQueryInfo::Ptr UserQueryInfo::uqMapInsert(QueryId qId) { - Ptr uqi; - lock_guard lg(_uqMapMtx); - auto iter = _uqMap.find(qId); - if (iter != _uqMap.end()) { - uqi = iter->second.lock(); - } - // If uqi is invalid at this point, a new one needs to be made. - if (uqi == nullptr) { - uqi = make_shared(qId); - _uqMap[qId] = uqi; - } - return uqi; -} - -UserQueryInfo::Ptr UserQueryInfo::uqMapGet(QueryId qId) { - lock_guard lg(_uqMapMtx); - auto iter = _uqMap.find(qId); - if (iter != _uqMap.end()) { - return iter->second.lock(); - } - return nullptr; -} - -void UserQueryInfo::uqMapErase(QueryId qId) { - lock_guard lg(_uqMapMtx); - auto iter = _uqMap.find(qId); - if (iter != _uqMap.end()) { - // If the weak pointer has 0 real references - if (iter->second.expired()) { - _uqMap.erase(qId); - } - } -} - -UserQueryInfo::Map UserQueryInfo::_uqMap; - -mutex UserQueryInfo::_uqMapMtx; - size_t UserQueryInfo::addTemplate(std::string const& templateStr) { size_t j = 0; { @@ -108,4 +69,41 @@ void UserQueryInfo::addUberJob(std::shared_ptr const& ujData) { _uberJobMap[ujId] = ujData; } +/// &&& doc +void UserQueryInfo::cancelFromCzar() { + if (_cancelledByCzar.exchange(true)) { + LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " already cancelledByCzar"); + return; + } + lock_guard lockUq(_uberJobMapMtx); + for (auto const& [ujId, weakUjPtr] : _uberJobMap) { + LOGS(_log, LOG_LVL_INFO, cName(__func__) << " cancelling ujId=" << ujId); + auto ujPtr = weakUjPtr.lock(); + if (ujPtr != nullptr) { + ujPtr->cancelAllTasks(); + } + } +} + +/// &&& doc +void UserQueryInfo::cancelUberJob(UberJobId ujId) { + LOGS(_log, LOG_LVL_INFO, cName(__func__) << " cancelling ujId=" << ujId); + lock_guard lockUq(_uberJobMapMtx); + _deadUberJobSet.insert(ujId); + auto iter = _uberJobMap.find(ujId); + if (iter != _uberJobMap.end()) { + auto weakUjPtr = iter->second; + auto ujPtr = weakUjPtr.lock(); + if (ujPtr != nullptr) { + ujPtr->cancelAllTasks(); + } + } +} + +bool UserQueryInfo::isUberJobDead(UberJobId ujId) const { + lock_guard lockUq(_uberJobMapMtx); + auto iter = _deadUberJobSet.find(ujId); + return iter != _deadUberJobSet.end(); +} + } // namespace lsst::qserv::wbase diff --git a/src/wbase/UserQueryInfo.h b/src/wbase/UserQueryInfo.h index 4b7a799f0..4694d8834 100644 --- a/src/wbase/UserQueryInfo.h +++ b/src/wbase/UserQueryInfo.h @@ -24,6 +24,7 @@ #define LSST_QSERV_WBASE_USERQUERYINFO_H // System headers +#include #include #include #include @@ -44,20 +45,18 @@ class UserQueryInfo { using Ptr = std::shared_ptr; using Map = std::map>; - static Ptr uqMapInsert(QueryId qId); - static Ptr uqMapGet(QueryId qId); - /// Erase the entry for `qId` in the map, as long as there are only - /// weak references to the UserQueryInfoObject. - /// Clear appropriate local and member references before calling this. - static void uqMapErase(QueryId qId); - - UserQueryInfo(QueryId qId); UserQueryInfo() = delete; UserQueryInfo(UserQueryInfo const&) = delete; UserQueryInfo& operator=(UserQueryInfo const&) = delete; + static Ptr create(QueryId qId) { return std::shared_ptr(new UserQueryInfo(qId)); } + ~UserQueryInfo() = default; + std::string cName(const char* func) { + return std::string("UserQueryInfo::") + func + " qId=" + std::to_string(_qId); + } + /// Add a query template to the map of templates for this user query. size_t addTemplate(std::string const& templateStr); @@ -68,9 +67,21 @@ class UserQueryInfo { /// Add an UberJobData object to the UserQueryInfo. void addUberJob(std::shared_ptr const& ujData); + /// &&& doc + bool getCancelledByCzar() const { return _cancelledByCzar; } + + /// &&& doc + void cancelFromCzar(); + + /// &&& doc + void cancelUberJob(UberJobId ujId); + + bool isUberJobDead(UberJobId ujId) const; + + QueryId getQueryId() const { return _qId; } + private: - static Map _uqMap; - static std::mutex _uqMapMtx; ///< protects _uqMap + UserQueryInfo(QueryId qId); QueryId const _qId; ///< The User Query Id number. @@ -78,11 +89,14 @@ class UserQueryInfo { /// This must be a vector. New entries are always added to the end so as not /// to alter existing indexes into the vector. std::vector _templates; - std::mutex _uqMtx; ///< protects _templates; + std::mutex _uqMtx; ///< protects _templates /// Map of all UberJobData objects on this worker for this User Query. - std::map> _uberJobMap; - std::mutex _uberJobMapMtx; ///< protects _uberJobMap; + std::map> _uberJobMap; + std::set _deadUberJobSet; ///< Set of cancelled UberJob Ids. + mutable std::mutex _uberJobMapMtx; ///< protects _uberJobMap, _deadUberJobSet + + std::atomic _cancelledByCzar{false}; }; } // namespace lsst::qserv::wbase diff --git a/src/wcontrol/Foreman.cc b/src/wcontrol/Foreman.cc index df3ed4063..288ed67e8 100644 --- a/src/wcontrol/Foreman.cc +++ b/src/wcontrol/Foreman.cc @@ -146,6 +146,8 @@ Foreman::~Foreman() { _httpServer->stop(); } +wpublish::QueryStatistics::Ptr Foreman::addQueryId(QueryId qId) { return _queries->addQueryId(qId); } + void Foreman::processTasks(vector const& tasks) { std::vector cmds; for (auto const& task : tasks) { diff --git a/src/wcontrol/Foreman.h b/src/wcontrol/Foreman.h index 6fe5ca439..5045cfe96 100644 --- a/src/wcontrol/Foreman.h +++ b/src/wcontrol/Foreman.h @@ -66,6 +66,7 @@ class QueryRunner; namespace lsst::qserv::wpublish { class ChunkInventory; class QueriesAndChunks; +class QueryStatistics; } // namespace lsst::qserv::wpublish // This header declarations @@ -128,11 +129,14 @@ class Foreman : public wbase::MsgProcessor { /// Process a group of query processing tasks. /// @see MsgProcessor::processTasks() - void processTasks(std::vector> const& tasks) override; // &&& delete + void processTasks(std::vector> const& tasks) override; // &&& delete /// Implement the corresponding method of the base class /// @see MsgProcessor::processCommand() - void processCommand(std::shared_ptr const& command) override; // &&& delete + void processCommand(std::shared_ptr const& command) override; // &&& delete + + /// &&& doc + std::shared_ptr addQueryId(QueryId qId); /// Implement the corresponding method of the base class /// @see MsgProcessor::statusToJson() diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index 0e73f664d..eb76be91b 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -281,7 +281,8 @@ bool QueryRunner::_dispatchChannel() { if (taskSched != nullptr) { taskSched->histTimeOfRunningTasks->addEntry(primeT.getElapsed()); LOGS(_log, LOG_LVL_DEBUG, "QR " << taskSched->histTimeOfRunningTasks->getString("run")); - LOGS(_log, LOG_LVL_WARN, "&&&DASH QR " << taskSched->histTimeOfRunningTasks->getString("run")); + LOGS(_log, LOG_LVL_WARN, + "&&&DASH QR " << taskSched->histTimeOfRunningTasks->getString("run")); } else { LOGS(_log, LOG_LVL_ERROR, "QR runtaskSched == nullptr"); LOGS(_log, LOG_LVL_ERROR, "&&&DASH QR runtaskSched == nullptr"); diff --git a/src/wdb/testQueryRunner.cc b/src/wdb/testQueryRunner.cc index 276beaace..c59182858 100644 --- a/src/wdb/testQueryRunner.cc +++ b/src/wdb/testQueryRunner.cc @@ -146,7 +146,6 @@ struct Fixture { {"tblScanRating", mInfo.scanRating}}; chunkScanTables.push_back(move(cst)); - auto& jsFragments = jsJobMsg["queryFragments"]; /* &&& if (chunkQuerySpec.nextFragment.get()) { @@ -170,8 +169,8 @@ struct Fixture { for (unsigned int t = 0; t < (chunkQuerySpec.queries).size(); t++) { LOGS(_log, LOG_LVL_TRACE, (chunkQuerySpec.queries).at(t)); } - _addFragmentJson(jsFragments, resultTable, chunkQuerySpec.subChunkTables, chunkQuerySpec.subChunkIds, - chunkQuerySpec.queries); + _addFragmentJson(jsFragments, resultTable, chunkQuerySpec.subChunkTables, + chunkQuerySpec.subChunkIds, chunkQuerySpec.queries); } */ nlohmann::json jsFrag = {{"resultTable", mInfo.resultName}, @@ -230,22 +229,20 @@ BOOST_AUTO_TEST_CASE(Simple) { MsgInfo mInfo; auto msgJson = newTaskJson(mInfo); shared_ptr sendC(SendChannel::newNopChannel()); - auto sc = FileChannelShared::create(sendC, mInfo.czarId); + auto sChannel = FileChannelShared::create(sendC, mInfo.czarId); FakeBackend::Ptr backend = make_shared(); shared_ptr crm = ChunkResourceMgr::newMgr(backend); SqlConnMgr::Ptr sqlConnMgr = make_shared(20, 15); auto const queries = queriesAndChunks(); auto ujData = lsst::qserv::wbase::UberJobData::create(mInfo.uberJobId, mInfo.czarName, mInfo.czarId, - mInfo.czarHostName, mInfo.czarPort, mInfo.queryId, mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); + mInfo.czarHostName, mInfo.czarPort, mInfo.queryId, + mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); lsst::qserv::proto::ScanInfo scanInfo; scanInfo.scanRating = mInfo.scanRating; scanInfo.infoTables.emplace_back(mInfo.db, mInfo.table, mInfo.lockInMemory, mInfo.scanRating); - vector taskVect = Task::createTasksForChunk( - ujData, *msgJson, sc, scanInfo, - mInfo.scanInteractive, mInfo.maxTableSize, - crm, - newMySqlConfig(), sqlConnMgr, - queries); + vector taskVect = + Task::createTasksForChunk(ujData, *msgJson, sChannel, scanInfo, mInfo.scanInteractive, + mInfo.maxTableSize, crm, newMySqlConfig(), sqlConnMgr, queries); Task::Ptr task = taskVect[0]; QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries)); BOOST_CHECK(a->runQuery()); @@ -278,20 +275,17 @@ BOOST_AUTO_TEST_CASE(Output) { SqlConnMgr::Ptr sqlConnMgr = make_shared(20, 15); auto const queries = queriesAndChunks(); auto ujData = lsst::qserv::wbase::UberJobData::create(mInfo.uberJobId, mInfo.czarName, mInfo.czarId, - mInfo.czarHostName, mInfo.czarPort, mInfo.queryId, mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); + mInfo.czarHostName, mInfo.czarPort, mInfo.queryId, + mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); lsst::qserv::proto::ScanInfo scanInfo; scanInfo.scanRating = mInfo.scanRating; scanInfo.infoTables.emplace_back(mInfo.db, mInfo.table, mInfo.lockInMemory, mInfo.scanRating); - vector taskVect = Task::createTasksForChunk( - ujData, *msgJson, sc, scanInfo, - mInfo.scanInteractive, mInfo.maxTableSize, - crm, - newMySqlConfig(), sqlConnMgr, - queries); + vector taskVect = + Task::createTasksForChunk(ujData, *msgJson, sc, scanInfo, mInfo.scanInteractive, + mInfo.maxTableSize, crm, newMySqlConfig(), sqlConnMgr, queries); Task::Ptr task = taskVect[0]; QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries)); BOOST_CHECK(a->runQuery()); - } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/wpublish/QueriesAndChunks.cc b/src/wpublish/QueriesAndChunks.cc index 2499a6267..940be6698 100644 --- a/src/wpublish/QueriesAndChunks.cc +++ b/src/wpublish/QueriesAndChunks.cc @@ -119,21 +119,39 @@ void QueriesAndChunks::setBlendScheduler(shared_ptr cons void QueriesAndChunks::setRequiredTasksCompleted(unsigned int value) { _requiredTasksCompleted = value; } +QueryStatistics::Ptr QueriesAndChunks::addQueryId(QueryId qId) { + unique_lock guardStats(_queryStatsMapMtx); + auto itr = _queryStatsMap.find(qId); + QueryStatistics::Ptr stats; + if (_queryStatsMap.end() == itr) { + stats = QueryStatistics::create(qId); + _queryStatsMap[qId] = stats; + } else { + stats = itr->second; + } + return stats; +} + /// Add statistics for the Task, creating a QueryStatistics object if needed. void QueriesAndChunks::addTask(wbase::Task::Ptr const& task) { auto qid = task->getQueryId(); +#if 0 // &&& delete upper block unique_lock guardStats(_queryStatsMapMtx); auto itr = _queryStatsMap.find(qid); QueryStatistics::Ptr stats; if (_queryStatsMap.end() == itr) { stats = QueryStatistics::create(qid); _queryStatsMap[qid] = stats; + throw util::Bug(ERR_LOC, "&&& QueriesAndChunks::addTask entry should already be there"); // &&& replace with error message ??? } else { stats = itr->second; } guardStats.unlock(); +#else // &&& + auto stats = addQueryId(qid); +#endif // &&& stats->addTask(task); - task->setQueryStatistics(stats); + //&&&task->setQueryStatistics(stats); } /// Update statistics for the Task that was just queued. diff --git a/src/wpublish/QueriesAndChunks.h b/src/wpublish/QueriesAndChunks.h index a51e1d24d..83bcddf36 100644 --- a/src/wpublish/QueriesAndChunks.h +++ b/src/wpublish/QueriesAndChunks.h @@ -193,9 +193,14 @@ class QueriesAndChunks { void removeDead(); void removeDead(QueryStatistics::Ptr const& queryStats); - /// Return the statistics for a user query. + /// Return the statistics for a user query, may be nullptr + /// @see addQueryId() QueryStatistics::Ptr getStats(QueryId const& qId) const; + /// Return the statistics for a user query, creating if needed. + /// @see getStats() + QueryStatistics::Ptr addQueryId(QueryId qId); + void addTask(wbase::Task::Ptr const& task); void queuedTask(wbase::Task::Ptr const& task); void startedTask(wbase::Task::Ptr const& task); diff --git a/src/wpublish/QueryStatistics.cc b/src/wpublish/QueryStatistics.cc index 576effdee..607288658 100644 --- a/src/wpublish/QueryStatistics.cc +++ b/src/wpublish/QueryStatistics.cc @@ -50,7 +50,8 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.QueriesAndChunks"); namespace lsst::qserv::wpublish { -QueryStatistics::QueryStatistics(QueryId const& qId_) : creationTime(CLOCK::now()), queryId(qId_) { +QueryStatistics::QueryStatistics(QueryId const& qId_) + : creationTime(CLOCK::now()), queryId(qId_), _userQueryInfo(wbase::UserQueryInfo::create(qId_)) { /// For all of the histograms, all entries should be kept at least until the work is finished. string qidStr = to_string(queryId); _histSizePerTask = util::Histogram::Ptr(new util::Histogram( @@ -186,6 +187,13 @@ QueryStatistics::SchedTasksInfoMap QueryStatistics::getSchedulerTasksInfoMap() { return _taskSchedInfoMap; } +/* &&& +void QueryStatistics::touch(TIMEPOINT const now) { + lock_guard lock(_qStatsMtx); + _touched = now; +} +*/ + void QueryStatistics::addTask(TIMEPOINT const now) { lock_guard lock(_qStatsMtx); _touched = now; diff --git a/src/wpublish/QueryStatistics.h b/src/wpublish/QueryStatistics.h index dc26a9da4..dbacd5d53 100644 --- a/src/wpublish/QueryStatistics.h +++ b/src/wpublish/QueryStatistics.h @@ -41,10 +41,12 @@ #include "global/intTypes.h" #include "wbase/Task.h" #include "wsched/SchedulerBase.h" +#include "util/InstanceCount.h" //&&& namespace lsst::qserv::wbase { -class Histogram; -} +//&&&class Histogram; +class UserQueryInfo; +} // namespace lsst::qserv::wbase // This header declarations namespace lsst::qserv::wpublish { @@ -73,6 +75,8 @@ class QueryStatistics { return _queryBooted; } + std::shared_ptr getUserQueryInfo() const { return _userQueryInfo; } + void setQueryBooted(bool booted, TIMEPOINT now); /// Add statistics related to the running of the query in the task. @@ -93,6 +97,7 @@ class QueryStatistics { void addTaskTransmit(double timeSeconds, int64_t bytesTransmitted, int64_t rowsTransmitted, double bufferFillSecs); + //&&&void touch(TIMEPOINT const now); void addTask(TIMEPOINT const now); void addTaskRunning(TIMEPOINT const now); bool addTaskCompleted(TIMEPOINT const now, double const taskDuration); @@ -194,6 +199,9 @@ class QueryStatistics { std::shared_ptr _histRowsPerTask; ///< Histogram of rows per Task. SchedTasksInfoMap _taskSchedInfoMap; ///< Map of task information ordered by scheduler name. + + std::shared_ptr const _userQueryInfo; ///< &&& doc + util::InstanceCount _ic{"QueryStatiscs_&&&"}; }; } // namespace lsst::qserv::wpublish diff --git a/src/wsched/testSchedulers.cc b/src/wsched/testSchedulers.cc index 2b3c4df5b..4bf41ec08 100644 --- a/src/wsched/testSchedulers.cc +++ b/src/wsched/testSchedulers.cc @@ -162,7 +162,6 @@ struct SchedulerFixture { } */ - int counter; }; @@ -222,7 +221,7 @@ struct SchedFixture { // TODO: DM-33302 replace this test case BOOST_AUTO_TEST_CASE(Grouping) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable SchedFixture f(60.0, 1); // Values to keep QueriesAndChunk from triggering. LOGS(_log, LOG_LVL_DEBUG, "Test_case grouping"); @@ -307,7 +306,7 @@ BOOST_AUTO_TEST_CASE(Grouping) { } BOOST_AUTO_TEST_CASE(GroupMaxThread) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable // Test that maxThreads is meaningful. LOGS(_log, LOG_LVL_WARN, "Test_case GroupMaxThread"); auto queries = QueriesAndChunks::setupGlobal(chrono::seconds(1), chrono::seconds(300), maxBootedC, @@ -342,7 +341,7 @@ BOOST_AUTO_TEST_CASE(GroupMaxThread) { } BOOST_AUTO_TEST_CASE(ScanScheduleTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable LOGS(_log, LOG_LVL_DEBUG, "Test_case ScanScheduleTest"); auto queries = QueriesAndChunks::setupGlobal(chrono::seconds(1), chrono::seconds(300), maxBootedC, maxDarkTasksC, resetForTestingC); @@ -407,7 +406,7 @@ BOOST_AUTO_TEST_CASE(ScanScheduleTest) { } BOOST_AUTO_TEST_CASE(BlendScheduleTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable LOGS(_log, LOG_LVL_DEBUG, "Test_case BlendScheduleTest"); // Test that space is appropriately reserved for each scheduler as Tasks are started and finished. // In this case, memMan->lock(..) always returns true (really HandleType::ISEMPTY). @@ -606,11 +605,11 @@ BOOST_AUTO_TEST_CASE(BlendScheduleTest) { BOOST_CHECK(f.blend->calcAvailableTheads() == 5); BOOST_CHECK(f.blend->getInFlight() == 0); LOGS(_log, LOG_LVL_DEBUG, "BlendScheduleTest-1 done"); -#endif // &&& fix and re-enable +#endif // &&& fix and re-enable } BOOST_AUTO_TEST_CASE(BlendScheduleThreadLimitingTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable LOGS(_log, LOG_LVL_DEBUG, "Test_case BlendScheduleThreadLimitingTest"); SchedFixture f(60.0, 1); // Values to keep QueriesAndChunk from triggering. // Test that only 6 threads can be started on a single ScanScheduler @@ -682,7 +681,7 @@ BOOST_AUTO_TEST_CASE(BlendScheduleThreadLimitingTest) { } BOOST_AUTO_TEST_CASE(BlendScheduleQueryRemovalTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable // Test that space is appropriately reserved for each scheduler as Tasks are started and finished. // In this case, memMan->lock(..) always returns true (really HandleType::ISEMPTY). // ChunkIds matter as they control the order Tasks come off individual schedulers. @@ -744,7 +743,7 @@ BOOST_AUTO_TEST_CASE(BlendScheduleQueryRemovalTest) { } BOOST_AUTO_TEST_CASE(BlendScheduleQueryBootTaskTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable // Test if a task is removed if it takes takes too long. // Give the user query 0.1 seconds to run and run it for a second, it should get removed. double tenthOfSecInMinutes = 1.0 / 600.0; // task @@ -830,7 +829,7 @@ BOOST_AUTO_TEST_CASE(BlendScheduleQueryBootTaskTest) { } BOOST_AUTO_TEST_CASE(SlowTableHeapTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable LOGS(_log, LOG_LVL_DEBUG, "Test_case SlowTableHeapTest start"); auto queries = QueriesAndChunks::setupGlobal(chrono::seconds(1), chrono::seconds(300), maxBootedC, maxDarkTasksC, resetForTestingC); @@ -867,7 +866,7 @@ BOOST_AUTO_TEST_CASE(SlowTableHeapTest) { } BOOST_AUTO_TEST_CASE(ChunkTasksTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable LOGS(_log, LOG_LVL_DEBUG, "Test_case ChunkTasksTest start"); auto queries = QueriesAndChunks::setupGlobal(chrono::seconds(1), chrono::seconds(300), maxBootedC, maxDarkTasksC, resetForTestingC); @@ -942,7 +941,7 @@ BOOST_AUTO_TEST_CASE(ChunkTasksTest) { } BOOST_AUTO_TEST_CASE(ChunkTasksQueueTest) { -#if 0 // &&& fix and re-enable +#if 0 // &&& fix and re-enable LOGS(_log, LOG_LVL_DEBUG, "Test_case ChunkTasksQueueTest start"); auto queries = QueriesAndChunks::setupGlobal(chrono::seconds(1), chrono::seconds(300), maxBootedC, maxDarkTasksC, resetForTestingC); diff --git a/src/xrdreq/QueryManagementAction.h b/src/xrdreq/QueryManagementAction.h index ec5ff9158..c624ecf88 100644 --- a/src/xrdreq/QueryManagementAction.h +++ b/src/xrdreq/QueryManagementAction.h @@ -39,7 +39,7 @@ namespace lsst::qserv::xrdreq { * Class QueryManagementAction is an interface for managing query completion/cancellation * at all Qserv workers that are connected as "publishers" to the XROOTD redirector. */ -// &&& need to get the same functionality using json messages, and not in xrdreq. +// &&&QM need to get the same functionality using json messages, and not in xrdreq. class QueryManagementAction : public std::enable_shared_from_this { public: /// The reponse type represents errors reported by the workers, where worker diff --git a/src/xrdreq/QueryManagementRequest.h b/src/xrdreq/QueryManagementRequest.h index 9c92fcfe6..0e366afe2 100644 --- a/src/xrdreq/QueryManagementRequest.h +++ b/src/xrdreq/QueryManagementRequest.h @@ -41,7 +41,7 @@ namespace lsst::qserv::xrdreq { * the error messages in case of any problems in delivering or processing * notifications. */ -class QueryManagementRequest : public QservRequest { +class QueryManagementRequest : public QservRequest { //&&&QM public: /// The pointer type for instances of the class typedef std::shared_ptr Ptr; diff --git a/src/xrdsvc/ChannelStream.h b/src/xrdsvc/ChannelStream.h index 61c8777e7..db9290fb9 100644 --- a/src/xrdsvc/ChannelStream.h +++ b/src/xrdsvc/ChannelStream.h @@ -40,7 +40,7 @@ namespace lsst::qserv::xrdsvc { /// ChannelStream is an implementation of an XrdSsiStream that accepts /// SendChannel streamed data. -class ChannelStream : public XrdSsiStream { // &&& delete +class ChannelStream : public XrdSsiStream { // &&& delete public: ChannelStream(); virtual ~ChannelStream(); diff --git a/src/xrdsvc/HttpSvc.cc b/src/xrdsvc/HttpSvc.cc index 392f5e6b8..0908efcaa 100644 --- a/src/xrdsvc/HttpSvc.cc +++ b/src/xrdsvc/HttpSvc.cc @@ -138,13 +138,13 @@ uint16_t HttpSvc::start() { _httpServerPtr->addHandlers( {{"POST", "/queryjob", [self](shared_ptr const& req, shared_ptr const& resp) { - HttpWorkerCzarModule::process(::serviceName, self->_foreman, req, resp, "QUERYJOB", + HttpWorkerCzarModule::process(::serviceName, self->_foreman, req, resp, "/queryjob", http::AuthType::REQUIRED); }}}); _httpServerPtr->addHandlers( {{"POST", "/querystatus", [self](shared_ptr const& req, shared_ptr const& resp) { - HttpWorkerCzarModule::process(::serviceName, self->_foreman, req, resp, "QUERYSTATUS", + HttpWorkerCzarModule::process(::serviceName, self->_foreman, req, resp, "/querystatus", http::AuthType::REQUIRED); }}}); _httpServerPtr->start(); diff --git a/src/xrdsvc/HttpWorkerCzarModule.cc b/src/xrdsvc/HttpWorkerCzarModule.cc index 856bd4455..3408aa4cd 100644 --- a/src/xrdsvc/HttpWorkerCzarModule.cc +++ b/src/xrdsvc/HttpWorkerCzarModule.cc @@ -36,6 +36,7 @@ #include "http/MetaModule.h" #include "http/RequestBodyJSON.h" #include "http/RequestQuery.h" +#include "http/WorkerQueryStatusData.h" #include "mysql/MySqlUtils.h" #include "qmeta/types.h" #include "util/String.h" @@ -48,6 +49,8 @@ #include "wcontrol/Foreman.h" #include "wcontrol/ResourceMonitor.h" #include "wpublish/ChunkInventory.h" +#include "wpublish/QueriesAndChunks.h" +#include "wpublish/QueryStatistics.h" #include "xrdsvc/SsiProvider.h" #include "xrdsvc/XrdName.h" @@ -88,8 +91,8 @@ json HttpWorkerCzarModule::executeImpl(string const& subModuleName) { string const func = string(__func__) + "[sub-module='" + subModuleName + "']"; enforceInstanceId(func, wconfig::WorkerConfig::instance()->replicationInstanceId()); enforceWorkerId(func); - if (subModuleName == "QUERYJOB") return _queryJob(); - if (subModuleName == "QUERYSTATUS") return _queryStatus(); + if (subModuleName == "/queryjob") return _queryJob(); + if (subModuleName == "/querystatus") return _queryStatus(); throw invalid_argument(context() + func + " unsupported sub-module"); } @@ -126,13 +129,24 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { LOGS(_log, LOG_LVL_TRACE, __func__ << " uj qid=" << ujQueryId << " ujid=" << ujId << " czid=" << ujCzarId); + // Get or create QueryStatistics and UserQueryInfo instances. + auto queryStats = foreman()->addQueryId(ujQueryId); + auto userQueryInfo = queryStats->getUserQueryInfo(); + + if (userQueryInfo->getCancelledByCzar()) { + throw wbase::TaskException( + ERR_LOC, string("Already cancelled by czar. ujQueryId=") + to_string(ujQueryId)); + } + if (userQueryInfo->isUberJobDead(ujId)) { + throw wbase::TaskException(ERR_LOC, string("UberJob already dead. ujQueryId=") + + to_string(ujQueryId) + " ujId=" + to_string(ujId)); + } + auto ujData = wbase::UberJobData::create(ujId, czarName, czarId, czarHostName, czarPort, ujQueryId, targetWorkerId, foreman(), authKey()); // Find the entry for this queryId, creat a new one if needed. - wbase::UserQueryInfo::Ptr userQueryInfo = wbase::UserQueryInfo::uqMapInsert(ujQueryId); userQueryInfo->addUberJob(ujData); - auto channelShared = wbase::FileChannelShared::create(ujData, czarId, czarHostName, czarPort, targetWorkerId); ujData->setFileChannelShared(channelShared); @@ -220,9 +234,100 @@ json HttpWorkerCzarModule::_queryStatus() { } json HttpWorkerCzarModule::_handleQueryStatus(std::string const& func) { - LOGS(_log, LOG_LVL_ERROR, "&&& NEED CODE HttpWorkerCzarModule::_handleQueryStatus"); - throw util::Bug(ERR_LOC, "&&& NEED CODE HttpWorkerCzarModule::_handleQueryStatus"); -} + LOGS(_log, LOG_LVL_ERROR, "&&& HttpWorkerCzarModule::_handleQueryStatus"); + + json jsRet; + auto now = CLOCK::now(); + auto const workerConfig = wconfig::WorkerConfig::instance(); + auto const replicationInstanceId = workerConfig->replicationInstanceId(); + auto const replicationAuthKey = workerConfig->replicationAuthKey(); + + auto const& jsReq = body().objJson; + auto wqsData = + http::WorkerQueryStatusData::createJson(jsReq, replicationInstanceId, replicationAuthKey, now); + + // For all queryId and czarId items, if the item can't be found, it is simply ignored. Anything that + // is missed will eventually be picked up by other mechanisms, such as results being rejected + // by the czar. + + // If a czar was restarted, cancel and/or delete the abandoned items. + if (wqsData->isCzarRestart()) { + auto restartCzarId = wqsData->getCzarRestartCzarId(); + auto restartQId = wqsData->getCzarRestartQueryId(); + if (restartCzarId > 0 && restartQId > 0) { + wbase::FileChannelShared::cleanUpResultsOnCzarRestart(wqsData->getCzarRestartCzarId(), + wqsData->getCzarRestartQueryId()); + } + } + + // Take the values from the lists in the message to cancel the + // appropriate queries and tasks as needed. + auto const queriesAndChunks = foreman()->queriesAndChunks(); + vector cancelledList; + // Cancelled queries where we want to keep the files + lock_guard mapLg(wqsData->_mapMtx); + for (auto const& [dkQid, dkTm] : wqsData->_qIdDoneKeepFiles) { + auto qStats = queriesAndChunks->addQueryId(dkQid); + if (qStats != nullptr) { + auto uqInfo = qStats->getUserQueryInfo(); + if (uqInfo != nullptr) { + if (!uqInfo->getCancelledByCzar()) { + cancelledList.push_back(uqInfo); + } + } + } + } + vector deleteFilesList; + for (auto const& [dkQid, dkTm] : wqsData->_qIdDoneDeleteFiles) { + auto qStats = queriesAndChunks->addQueryId(dkQid); + if (qStats != nullptr) { + auto uqInfo = qStats->getUserQueryInfo(); + if (uqInfo != nullptr) { + if (!uqInfo->getCancelledByCzar()) { + cancelledList.push_back(uqInfo); + } + deleteFilesList.push_back(uqInfo); + } + } + } + + // Cancel everything in the cancelled list. + for (auto const& canUqInfo : cancelledList) { + canUqInfo->cancelFromCzar(); + } + + // For dead UberJobs, add them to a list of dead uberjobs within UserQueryInfo. + // UserQueryInfo will cancel the tasks in the uberjobs if they exist. + // New UberJob Id's will be checked against the list, and immediately be + // killed if they are on it. (see HttpWorkerCzarModule::_handleQueryJob) + for (auto const& [ujQid, ujIdMap] : wqsData->_qIdDeadUberJobs) { + auto qStats = queriesAndChunks->addQueryId(ujQid); + if (qStats != nullptr) { + auto uqInfo = qStats->getUserQueryInfo(); + if (uqInfo != nullptr) { + if (!uqInfo->getCancelledByCzar()) { + for (auto const& [ujId, tm] : ujIdMap) { + uqInfo->cancelUberJob(ujId); + } + } + } + } + } + + // Delete files that should be deleted + CzarIdType czarId = wqsData->_czInfo->czId; + for (wbase::UserQueryInfo::Ptr uqiPtr : deleteFilesList) { + if (uqiPtr == nullptr) continue; + QueryId qId = uqiPtr->getQueryId(); + wbase::FileChannelShared::cleanUpResults(czarId, qId); + } + + // Syntax errors in the message would throw invalid_argument, which is handled elsewhere. + + // Return a message containing lists of the queries that were cancelled. + jsRet = wqsData->serializeResponseJson(); + return jsRet; +} } // namespace lsst::qserv::xrdsvc diff --git a/src/xrdsvc/SsiRequest.cc b/src/xrdsvc/SsiRequest.cc index 1b4ca9aeb..724c098f9 100644 --- a/src/xrdsvc/SsiRequest.cc +++ b/src/xrdsvc/SsiRequest.cc @@ -115,20 +115,22 @@ void SsiRequest::execute(XrdSsiRequest& req) { // Process the request switch (ru.unitType()) { - case ResourceUnit::DBCHUNK: { // &&& delete + case ResourceUnit::DBCHUNK: { // &&& delete // Increment the counter of the database/chunk resources in use - _foreman->resourceMonitor()->increment(_resourceName); // &&& TODO:UJ make sure this is implemented elsewhere. + _foreman->resourceMonitor()->increment( + _resourceName); // &&& TODO:UJ make sure this is implemented elsewhere. reportError("&&& DBCHUNK requests are no longer available resource db=" + ru.db() + - " chunkId=" + std::to_string(ru.chunk())); + " chunkId=" + std::to_string(ru.chunk())); + throw util::Bug(ERR_LOC, "&&& ResourceUnit::DBCHUNK"); break; } - case ResourceUnit::QUERY: { // &&& delete + case ResourceUnit::QUERY: { // &&& delete LOGS(_log, LOG_LVL_DEBUG, "Parsing request details for resource=" << _resourceName); reportError("&&& QUERY requests are no longer available"); - /* &&& + /* &&&QM proto::QueryManagement request; try { // reqData has the entire request, so we can unpack it without waiting for @@ -187,7 +189,6 @@ void SsiRequest::execute(XrdSsiRequest& req) { } // Note that upon exit the _finMutex will be unlocked allowing Finished() // to actually do something once everything is actually setup. - } /// Called by SSI to free resources. diff --git a/src/xrdsvc/SsiRequest.h b/src/xrdsvc/SsiRequest.h index 5850d18bf..29a600bd3 100644 --- a/src/xrdsvc/SsiRequest.h +++ b/src/xrdsvc/SsiRequest.h @@ -60,7 +60,8 @@ class StreamBuffer; /// qserv worker services. The SSI interface encourages such an approach, and /// object lifetimes are explicitly stated in the documentation which we /// adhere to using BindRequest() and UnBindRequest() responder methods. -class SsiRequest : public XrdSsiResponder, public std::enable_shared_from_this { // &&& delete if possible +class SsiRequest : public XrdSsiResponder, + public std::enable_shared_from_this { // &&& delete if possible public: // Smart pointer definitions