Skip to content

Commit

Permalink
More cancellation code added.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Sep 19, 2024
1 parent be6ed7f commit 8f3eb8f
Show file tree
Hide file tree
Showing 60 changed files with 671 additions and 2,762 deletions.
2 changes: 0 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ add_subdirectory(wpublish)
add_subdirectory(wsched)
add_subdirectory(www)
add_subdirectory(xrdlog)
add_subdirectory(xrdreq)
add_subdirectory(xrdsvc)

#-----------------------------------------------------------------------------
Expand Down Expand Up @@ -131,7 +130,6 @@ target_link_libraries(qserv_czar PUBLIC
rproc
qserv_css
qserv_meta
xrdreq
)

install(
Expand Down
2 changes: 0 additions & 2 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ target_link_libraries(ccontrol PUBLIC
parser
replica
sphgeom
xrdreq
XrdCl
)

Expand All @@ -51,7 +50,6 @@ FUNCTION(ccontrol_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
23 changes: 7 additions & 16 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
#include "util/Bug.h"
#include "util/IterableFormatter.h"
#include "util/ThreadPriority.h"
#include "xrdreq/QueryManagementAction.h"
#include "qdisp/UberJob.h"

namespace {
Expand Down Expand Up @@ -453,7 +452,8 @@ void UserQuerySelect::buildAndSendUberJobs() {
}

// Add worker contact info to UberJobs.
auto const wContactMap = czRegistry->getWorkerContactMap();
//&&& auto const wContactMap = czRegistry->getWorkerContactMap();
auto const wContactMap = czRegistry->waitForWorkerContactMap(); //&&&Z
LOGS(_log, LOG_LVL_DEBUG, funcN << " " << _executive->dumpUberJobCounts());
for (auto const& [wIdKey, ujVect] : workerJobMap) {
auto iter = wContactMap->find(wIdKey);
Expand Down Expand Up @@ -505,34 +505,25 @@ QueryState UserQuerySelect::join() {
// finalRows < 0 indicates there was no postprocessing, so collected rows and final rows should be the
// same.
if (finalRows < 0) finalRows = collectedRows;
// Notify workers on the query completion/cancellation to ensure
// resources are properly cleaned over there as well.
proto::QueryManagement::Operation operation = proto::QueryManagement::COMPLETE; //&&&QM

QueryState state = SUCCESS;
if (successful) {
_qMetaUpdateStatus(qmeta::QInfo::COMPLETED, collectedRows, collectedBytes, finalRows);
LOGS(_log, LOG_LVL_INFO, "Joined everything (success)");
} else if (_killed) {
// status is already set to ABORTED
LOGS(_log, LOG_LVL_ERROR, "Joined everything (killed)");
operation = proto::QueryManagement::CANCEL; //&&&QM
state = ERROR;
} else {
_qMetaUpdateStatus(qmeta::QInfo::FAILED, collectedRows, collectedBytes, finalRows);
LOGS(_log, LOG_LVL_ERROR, "Joined everything (failure!)");
operation = proto::QueryManagement::CANCEL; //&&&QM
state = ERROR;
}
auto const czarConfig = cconfig::CzarConfig::instance();
if (czarConfig->notifyWorkersOnQueryFinish()) {
try {
// &&& do this another way, also see executive::squash &&&QM
xrdreq::QueryManagementAction::notifyAllWorkers(czarConfig->getXrootdFrontendUrl(), operation,
_qMetaCzarId, _qMetaQueryId);
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
}

// Notify workers on the query completion/cancellation to ensure
// resources are properly cleaned over there as well.
czar::Czar::getCzar()->getActiveWorkerMap()->addToDoneDeleteFiles(_executive->getId());
return state;
}

Expand Down
156 changes: 84 additions & 72 deletions src/czar/ActiveWorker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

// Qserv headers
#include "cconfig/CzarConfig.h"
#include "czar/Czar.h"
#include "http/Client.h"
#include "http/MetaModule.h"
#include "util/common.h"
Expand All @@ -43,15 +44,6 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.ActiveWorker");

namespace lsst::qserv::czar {

/* &&&
string WorkerContactInfo::dump() const {
stringstream os;
os << "workerContactInfo{"
<< "id=" << wId << " host=" << wHost << " mgHost=" << wManagementHost << " port=" << wPort << "}";
return os.str();
}
*/

string ActiveWorker::getStateStr(State st) {
switch (st) {
case ALIVE:
Expand All @@ -66,13 +58,15 @@ string ActiveWorker::getStateStr(State st) {

bool ActiveWorker::compareContactInfo(http::WorkerContactInfo const& wcInfo) const {
lock_guard<mutex> lg(_aMtx);
return _wqsData->_wInfo->isSameContactInfo(wcInfo);
auto wInfo_ = _wqsData->getWInfo();
if (wInfo_ == nullptr) return false;
return wInfo_->isSameContactInfo(wcInfo);
}

void ActiveWorker::setWorkerContactInfo(http::WorkerContactInfo::Ptr const& wcInfo) {
LOGS(_log, LOG_LVL_WARN, cName(__func__) << " new info=" << wcInfo->dump());
LOGS(_log, LOG_LVL_INFO, cName(__func__) << " new info=" << wcInfo->dump());
lock_guard<mutex> lg(_aMtx);
_wqsData->_wInfo = wcInfo;
_wqsData->setWInfo(wcInfo);
}

void ActiveWorker::_changeStateTo(State newState, double secsSinceUpdate, string const& note) {
Expand All @@ -85,45 +79,64 @@ void ActiveWorker::_changeStateTo(State newState, double secsSinceUpdate, string

void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double timeoutDeadSecs,
double maxLifetime) {
lock_guard<mutex> lg(_aMtx);
double secsSinceUpdate = _wqsData->_wInfo->timeSinceRegUpdateSeconds();
// Update the last time the registry contacted this worker.
switch (_state) {
case ALIVE: {
if (secsSinceUpdate > timeoutAliveSecs) {
_changeStateTo(QUESTIONABLE, secsSinceUpdate, cName(__func__));
// &&& Anything else that should be done here?
}
break;
bool newlyDeadWorker = false;
http::WorkerContactInfo::Ptr wInfo_;
{
lock_guard<mutex> lg(_aMtx);
wInfo_ = _wqsData->getWInfo();
if (wInfo_ == nullptr) {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " no WorkerContactInfo");
return;
}
case QUESTIONABLE: {
if (secsSinceUpdate < timeoutAliveSecs) {
_changeStateTo(ALIVE, secsSinceUpdate, cName(__func__));
double secsSinceUpdate = (wInfo_ == nullptr) ? timeoutDeadSecs : wInfo_->timeSinceRegUpdateSeconds();

// Update the last time the registry contacted this worker.
switch (_state) {
case ALIVE: {
if (secsSinceUpdate >= timeoutAliveSecs) {
_changeStateTo(QUESTIONABLE, secsSinceUpdate, cName(__func__));
// &&& Anything else that should be done here?
}
break;
}
if (secsSinceUpdate > timeoutDeadSecs) {
_changeStateTo(DEAD, secsSinceUpdate, cName(__func__));
// &&& TODO:UJ all uberjobs for this worker need to die.
case QUESTIONABLE: {
if (secsSinceUpdate < timeoutAliveSecs) {
_changeStateTo(ALIVE, secsSinceUpdate, cName(__func__));
}
if (secsSinceUpdate >= timeoutDeadSecs) {
_changeStateTo(DEAD, secsSinceUpdate, cName(__func__));
// All uberjobs for this worker need to die.
newlyDeadWorker = true;
}
break;
}
break;
}
case DEAD: {
LOGS(_log, LOG_LVL_ERROR, "&&& NEED CODE");
if (secsSinceUpdate < timeoutAliveSecs) {
_changeStateTo(ALIVE, secsSinceUpdate, cName(__func__));
} else {
// Don't waste time on this worker until the registry has heard from it.
return;
case DEAD: {
if (secsSinceUpdate < timeoutAliveSecs) {
_changeStateTo(ALIVE, secsSinceUpdate, cName(__func__));
} else {
// Don't waste time on this worker until the registry has heard from it.
// &&& If it's been a really really long time, maybe delete this entry ???
return;
}
break;
}
break;
}
}

// _aMtx must not be held when calling this.
if (newlyDeadWorker) {
LOGS(_log, LOG_LVL_WARN,
cName(__func__) << " worker " << wInfo_->wId << " appears to have died, reassigning its jobs.");
czar::Czar::getCzar()->killIncompleteUbjerJobsOn(wInfo_->wId);
}

shared_ptr<json> jsWorkerReqPtr;
{
lock_guard<mutex> mapLg(_wqsData->_mapMtx);
lock_guard<mutex> lg(_aMtx); //&&& needed ???
lock_guard<mutex> mapLg(_wqsData->mapMtx);
// Check how many messages are currently being sent to the worker, if at the limit, return
if (_wqsData->_qIdDoneKeepFiles.empty() && _wqsData->_qIdDoneDeleteFiles.empty() &&
_wqsData->_qIdDeadUberJobs.empty()) {
if (_wqsData->qIdDoneKeepFiles.empty() && _wqsData->qIdDoneDeleteFiles.empty() &&
_wqsData->qIdDeadUberJobs.empty()) {
return;
}
int tCount = _conThreadCount;
Expand All @@ -141,14 +154,20 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti
// &&& Maybe only send the status message if the lists are not empty ???
// Start a thread to send the message. (Maybe these should go on the qdisppool? &&&)
// put this in a different function and start the thread.&&&;
_sendStatusMsg(jsWorkerReqPtr);
_sendStatusMsg(wInfo_, jsWorkerReqPtr);
}

void ActiveWorker::_sendStatusMsg(std::shared_ptr<nlohmann::json> const& jsWorkerReqPtr) {
void ActiveWorker::_sendStatusMsg(http::WorkerContactInfo::Ptr const& wInf,
std::shared_ptr<nlohmann::json> const& jsWorkerReqPtr) {
auto& jsWorkerReq = *jsWorkerReqPtr;
auto const method = http::Method::POST;
auto const& wInf = _wqsData->_wInfo;
string const url = "http://" + wInf->wHost + ":" + to_string(wInf->wPort) + "/querystatus";
//&&&auto const wInf = _wqsData->getWInfo();
if (wInf == nullptr) {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " wInfo was null.");
return;
}
auto [ciwId, ciwHost, ciwManag, ciwPort] = wInf->getAll();
string const url = "http://" + ciwHost + ":" + to_string(ciwPort) + "/querystatus";
vector<string> const headers = {"Content-Type: application/json"};
auto const& czarConfig = cconfig::CzarConfig::instance();

Expand All @@ -163,7 +182,13 @@ void ActiveWorker::_sendStatusMsg(std::shared_ptr<nlohmann::json> const& jsWorke
try {
json const response = client.readAsJson();
if (0 != response.at("success").get<int>()) {
transmitSuccess = _wqsData->handleResponseJson(response);
bool startupTimeChanged = false;
tie(transmitSuccess, startupTimeChanged) = _wqsData->handleResponseJson(response);
if (startupTimeChanged) {
LOGS(_log, LOG_LVL_WARN, cName(__func__) << " worker startupTime changed, likely rebooted.");
// kill all incomplete UberJobs on this worker.
czar::Czar::getCzar()->killIncompleteUbjerJobsOn(wInf->wId);
}
} else {
LOGS(_log, LOG_LVL_WARN, cName(__func__) << " response success=0");
}
Expand All @@ -182,6 +207,11 @@ void ActiveWorker::addToDoneKeepFiles(QueryId qId) { _wqsData->addToDoneKeepFile

void ActiveWorker::removeDeadUberJobsFor(QueryId qId) { _wqsData->removeDeadUberJobsFor(qId); }

void ActiveWorker::addDeadUberJob(QueryId qId, UberJobId ujId) {
auto now = CLOCK::now();
_wqsData->addDeadUberJob(qId, ujId, now);
}

string ActiveWorker::dump() const {
lock_guard<mutex> lg(_aMtx);
return _dump();
Expand Down Expand Up @@ -214,32 +244,26 @@ void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap,
LOGS(_log, LOG_LVL_WARN,
cName(__func__) << " worker contact info changed for " << wcKey
<< " new=" << wcVal->dump() << " old=" << aWorker->dump());
// If there is existing information, only host and port values will change.
aWorker->setWorkerContactInfo(wcVal);
}
}
}
}

/* &&&
void ActiveWorkerMap::pruneMap() {
lock_guard<mutex> awLg(_awMapMtx);
for (auto iter = _awMap.begin(); iter != _awMap.end();) {
auto aWorker = iter->second;
if (aWorker->getWInfo()->timeSinceTouchSeconds() > _maxDeadTimeSeconds) {
iter = _awMap.erase(iter);
} else {
++iter;
}
}
}
*/

void ActiveWorkerMap::setCzarCancelAfterRestart(CzarIdType czId, QueryId lastQId) {
_czarCancelAfterRestart = true;
_czarCancelAfterRestartCzId = czId;
_czarCancelAfterRestartQId = lastQId;
}

ActiveWorker::Ptr ActiveWorkerMap::getActiveWorker(string const& workerId) const {
lock_guard<mutex> lck(_awMapMtx);
auto iter = _awMap.find(workerId);
if (iter == _awMap.end()) return nullptr;
return iter->second;
}

void ActiveWorkerMap::sendActiveWorkersMessages() {
// Send messages to each active worker as needed
lock_guard<mutex> lck(_awMapMtx);
Expand All @@ -248,7 +272,6 @@ void ActiveWorkerMap::sendActiveWorkersMessages() {
}
}

/// &&& doc
void ActiveWorkerMap::addToDoneDeleteFiles(QueryId qId) {
lock_guard<mutex> lck(_awMapMtx);
for (auto const& [wName, awPtr] : _awMap) {
Expand All @@ -257,7 +280,6 @@ void ActiveWorkerMap::addToDoneDeleteFiles(QueryId qId) {
}
}

/// &&& doc
void ActiveWorkerMap::addToDoneKeepFiles(QueryId qId) {
lock_guard<mutex> lck(_awMapMtx);
for (auto const& [wName, awPtr] : _awMap) {
Expand All @@ -266,14 +288,4 @@ void ActiveWorkerMap::addToDoneKeepFiles(QueryId qId) {
}
}

/* &&&
/// &&& doc
void ActiveWorkerMap::removeDeadUberJobsFor(QueryId qId) {
lock_guard<mutex> lck(_awMapMtx);
for (auto const& [wName, awPtr] : _awMap) {
awPtr->removeDeadUberJobsFor(qId);
}
}
*/

} // namespace lsst::qserv::czar
Loading

0 comments on commit 8f3eb8f

Please sign in to comment.