diff --git a/core/modules/ccontrol/MergingHandler.cc b/core/modules/ccontrol/MergingHandler.cc
index 18d5495799..e0e152e14c 100644
--- a/core/modules/ccontrol/MergingHandler.cc
+++ b/core/modules/ccontrol/MergingHandler.cc
@@ -97,8 +97,10 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
         throw Bug("MergingHandler invalid blen=" + to_string(bLen) + " from " + _wName);
     }
 
+    LOGS(_log, LOG_LVL_INFO, "&&& MH::flush a");
     switch(_state) {
     case MsgState::HEADER_WAIT:
+        LOGS(_log, LOG_LVL_INFO, "&&& MH::flush b");
         _response->headerSize = static_cast((*bufPtr)[0]);
         if (!proto::ProtoHeaderWrap::unwrap(_response, *bufPtr)) {
             std::string sErr = "From:" + _wName + "Error decoding proto header for " + getStateStr(_state);
@@ -119,13 +121,17 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
                  << " endNoData=" << endNoData);
 
         _state = MsgState::RESULT_WAIT;
+        LOGS(_log, LOG_LVL_INFO, "&&& MH::flush c");
         if (endNoData || nextBufSize == 0) {
+            LOGS(_log, LOG_LVL_INFO, "&&& MH::flush d");
             if (!endNoData || nextBufSize != 0 ) {
                 throw Bug("inconsistent msg termination endNoData=" + std::to_string(endNoData)
                           + " nextBufSize=" + std::to_string(nextBufSize));
             }
+            LOGS(_log, LOG_LVL_INFO, "&&& MH::flush e");
             // Nothing to merge, but some bookkeeping needs to be done.
             _infileMerger->mergeCompleteFor(_jobIds);
+            LOGS(_log, LOG_LVL_INFO, "&&& MH::flush f");
             last = true;
             _state = MsgState::RESULT_RECV;
         }
@@ -133,9 +139,11 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
         return true;
     case MsgState::RESULT_WAIT:
         {
+            LOGS(_log, LOG_LVL_INFO, "&&& MH::flush g");
             nextBufSize = proto::ProtoHeaderWrap::getProtoHeaderSize();
             auto job = getJobBase().lock();
             if (!_verifyResult(bufPtr, bLen)) { return false; }
+            LOGS(_log, LOG_LVL_INFO, "&&& MH::flush h");
             if (!_setResult(bufPtr, bLen)) {  // This sets _response->result
                 LOGS(_log, LOG_LVL_WARN, "setResult failure " << _wName);
                 return false;
@@ -147,7 +155,9 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
             _jobIds.insert(jobId);
             LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last
                  << " for tableName=" << _tableName);
+            LOGS(_log, LOG_LVL_INFO, "&&& MH::flush i");
             auto success = _merge();
+            LOGS(_log, LOG_LVL_INFO, "&&& MH::flush j");
             _response.reset(new WorkerResponse());
             return success;
         }
@@ -218,11 +228,16 @@ void MergingHandler::_initState() {
 }
 
 bool MergingHandler::_merge() {
-    if (auto job = getJobBase().lock()) {
+    LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge a");
+    auto job = getJobBase().lock();
+    if (job != nullptr) {
+        LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge b");
         if (_flushed) {
             throw Bug("MergingRequester::_merge : already flushed");
         }
+        LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge c");
         bool success = _infileMerger->merge(_response);
+        LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge d");
         if (!success) {
             LOGS(_log, LOG_LVL_WARN, "_merge() failed");
             rproc::InfileMergerError const& err = _infileMerger->getError();
@@ -230,6 +245,7 @@ bool MergingHandler::_merge() {
             _state = MsgState::RESULT_ERR;
         }
         _response.reset();
+        LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge end");
        return success;
     }
     LOGS(_log, LOG_LVL_ERROR, "MergingHandler::_merge() failed, jobQuery was NULL");
diff --git a/core/modules/ccontrol/UserQuerySelect.cc b/core/modules/ccontrol/UserQuerySelect.cc
index 4038d71e64..72004ae90b 100644
--- a/core/modules/ccontrol/UserQuerySelect.cc
+++ b/core/modules/ccontrol/UserQuerySelect.cc
@@ -414,12 +414,12 @@ void UserQuerySelect::submit() {
         std::shared_ptr cmr
                 = ChunkMsgReceiver::newInstance(uberJobId, _messageStore);
         auto respHandler = std::make_shared(cmr, _infileMerger, uberResultName);
+        string workerResourceName = workerIter->first;
+        deque& dq = workerIter->second;
         auto uJob = qdisp::UberJob::create(_executive, respHandler, _qMetaQueryId,
-                                           uberJobId++, _qMetaCzarId);
+                                           uberJobId++, _qMetaCzarId, workerResourceName);
         int chunksInUber = 0;
-        deque& dq = workerIter->second;
-
         while (!dq.empty() && !chunksInQuery.empty() && chunksInUber < maxChunksPerUber) {
             int chunkIdWorker = dq.front();
             dq.pop_front();
@@ -459,15 +459,22 @@ void UserQuerySelect::submit() {
     // If any chunks in the query were not found on a worker's list, run them individually.
     //&&&_executive->startRemainingJobs(chunksInQuery); //&&& delete func in Executive.
     for (auto& ciq:chunksInQuery) {
+        LOGS(_log, LOG_LVL_INFO, "&&& submit q1");
         qdisp::JobQuery* jqRaw = ciq.second;
+        LOGS(_log, LOG_LVL_INFO, "&&& submit q2");
         qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw);
+        LOGS(_log, LOG_LVL_INFO, "&&& submit q3");
         std::function funcBuildJob =
                 [this, job{move(job)}](util::CmdData*) { // references in captures cause races
             QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
+            LOGS(_log, LOG_LVL_INFO, "&&& submit q run1");
             job->runJob();
+            LOGS(_log, LOG_LVL_INFO, "&&& submit q run2");
         };
         auto cmd = std::make_shared(funcBuildJob);
+        LOGS(_log, LOG_LVL_INFO, "&&& submit q4");
         _executive->queueJobStart(cmd);
+        LOGS(_log, LOG_LVL_INFO, "&&& submit q5");
     }
 
     LOGS(_log, LOG_LVL_INFO, "&&& submit r");
diff --git a/core/modules/czar/WorkerResources.cc b/core/modules/czar/WorkerResources.cc
index 9c65a4e62b..6eecbd50e5 100644
--- a/core/modules/czar/WorkerResources.cc
+++ b/core/modules/czar/WorkerResources.cc
@@ -108,7 +108,7 @@ map> WorkerResources::getDequesFor(string const& dbName) {
 
 
 void WorkerResources::setMonoNodeTest() {
-    string wName("/worker/worker");
+    string wName("/worker/5257fbab-c49c-11eb-ba7a-1856802308a2");
     std::lock_guard lg(_workerMapMtx);
     _insertWorker(wName);
     auto iter = _workers.find(wName);
diff --git a/core/modules/qdisp/Executive.cc b/core/modules/qdisp/Executive.cc
index 3bea3858b6..59c3a07d30 100644
--- a/core/modules/qdisp/Executive.cc
+++ b/core/modules/qdisp/Executive.cc
@@ -614,7 +614,8 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) {
     //&&&XrdSsiResource jobResource(jobQuery->getDescription()->resource().path(), "", jobQuery->getIdStr(), "", 0, affinity);
     // Affinity should be meaningless here as there should only be one instance of each worker.
     XrdSsiResource::Affinity affinity = XrdSsiResource::Affinity::Default;
-    XrdSsiResource uJobResource(uJob->workerResource, "", uJob->getIdStr(), "", 0, affinity);
+    LOGS(_log, LOG_LVL_INFO, "&&& uJob->workerResource=" << uJob->getWorkerResource());
+    XrdSsiResource uJobResource(uJob->getWorkerResource(), "", uJob->getIdStr(), "", 0, affinity);
 
     // Now construct the actual query request and tie it to the jobQuery. The
     // shared pointer is used by QueryRequest to keep itself alive, sloppy design.
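
Not part of the patch: a minimal sketch (illustrative names, not Qserv code) of the capture style used by the funcBuildJob lambda in UserQuerySelect::submit() above. The JobQuery shared_ptr is moved into the closure so the queued command co-owns the job, because, as the in-line comment notes, reference captures would dangle and race once the command runs later on a pool thread.

    #include <functional>
    #include <memory>
    #include <utility>

    struct JobSketch {
        void runJob() {}
    };

    // Build a command whose closure keeps the job alive no matter when it runs.
    std::function<void()> makeRunCmd(std::shared_ptr<JobSketch> job) {
        // Move the shared_ptr into the closure instead of capturing by reference.
        return [job = std::move(job)]() { job->runJob(); };
    }
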
diff --git a/core/modules/qdisp/QueryRequest.cc b/core/modules/qdisp/QueryRequest.cc
index 1d384c2f8e..970b3e582c 100644
--- a/core/modules/qdisp/QueryRequest.cc
+++ b/core/modules/qdisp/QueryRequest.cc
@@ -222,6 +222,7 @@ QueryRequest::~QueryRequest() {
 // content of request data
 char* QueryRequest::GetRequest(int& requestLength) {
     QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid);
+    LOGS(_log, LOG_LVL_INFO, "&&& QueryRequest::GetRequest");
     lock_guard lock(_finishStatusMutex);
     auto jq = _job;
     if (_finishStatus != ACTIVE || jq == nullptr) {
@@ -448,7 +449,7 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
     ResponseHandler::BufPtr bufPtr = _askForResponseDataCmd->getBufPtr();
     _askForResponseDataCmd.reset(); // No longer need it, and don't want the destructor calling _errorFinish().
-
+    LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData a");
     int const protoHeaderSize = proto::ProtoHeaderWrap::getProtoHeaderSize();
     ResponseHandler::BufPtr nextHeaderBufPtr;
 
@@ -463,19 +464,20 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
         //   - The first (bytes = blen - ProtoHeaderWrap::getProtheaderSize())
         //     is the result associated with the previously received header.
         //   - The second is the header for the next message.
-
         int respSize = blen - protoHeaderSize;
         nextHeaderBufPtr = make_shared>(bufPtr->begin() + respSize, bufPtr->end());
-
+        LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData b");
         // Read the result
         /* &&& rebase
         flushOk = jq->getDescription()->respHandler()->flush(respSize, bufPtr, last,
                                                              largeResult, nextBufSize);
         */
-        bool largeResult = false;
-        int nextBufSize = 0;
-        bool last = false;
-        bool flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, largeResult, nextBufSize);
+        //&&& bool largeResult = false;
+        //&&& int nextBufSize = 0;
+        //&&& bool last = false;
+        // Values for last, largeResult, and nextBufSize filled in by flush
+        flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, largeResult, nextBufSize);
+        LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData c");
         if (last) {
             // Last should only be true when the header is read, not the result.
             throw Bug("_processData result had 'last' true, which cannot be allowed.");
@@ -486,7 +488,7 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
             throw Bug("Unexpected header size from flush(result) call QID=" + to_string(_qid)
                       + "#" + to_string(_jobid));
         }
-
+        LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData d");
         if (!flushOk) {
             _flushError(jq);
             return;
@@ -497,11 +499,14 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
         // Values for largeResult, last, and nextBufSize will be filled in by flush().
         flushOk = jq->getDescription()->respHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last,
                                                              largeResult, nextBufSize);
-        */
+
         largeResult = false;
         nextBufSize = 0;
+        */
+        // Values for last, largeResult, and nextBufSize filled in by flush
+        LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData e");
         flushOk = jq->getRespHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last, largeResult, nextBufSize);
-
+        LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData f");
         if (largeResult) {
             if (!_largeResult) LOGS(_log, LOG_LVL_DEBUG, "holdState largeResult set to true");
             _largeResult = true; // Once the worker indicates it's a large result, it stays that way.
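
Not part of the patch: a minimal sketch (illustrative) of the buffer split that _processData performs above. The first (blen - protoHeaderSize) bytes are the result that belongs to the previously received header, and the trailing protoHeaderSize bytes are the header of the next message. BufPtr is assumed here to be std::shared_ptr<std::vector<char>>; the real typedef is ResponseHandler::BufPtr.

    #include <memory>
    #include <vector>

    using BufPtr = std::shared_ptr<std::vector<char>>;

    void splitResponseBuffer(BufPtr const& bufPtr, int blen, int protoHeaderSize,
                             int& respSize, BufPtr& nextHeaderBufPtr) {
        respSize = blen - protoHeaderSize;
        // Copy the trailing header bytes into their own buffer; the leading respSize
        // bytes stay in bufPtr and are handed to the response handler's flush().
        nextHeaderBufPtr = std::make_shared<std::vector<char>>(bufPtr->begin() + respSize,
                                                               bufPtr->end());
    }
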
@@ -512,22 +517,28 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
             LOGS(_log, LOG_LVL_DEBUG, "processData disagreement between last=" << last
                  << " and xrdLast=" << xrdLast);
         }
+        LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData g");
         if (last) {
+            LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData h");
             jq->getStatus()->updateInfo(_jobIdStr, JobStatus::COMPLETE);
             _finish();
             // At this point all blocks for this job have been read, there's no point in
             // having XrdSsi wait for anything.
             return;
         } else {
+            LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData i");
             _askForResponseDataCmd = make_shared(shared_from_this(), jq, nextBufSize);
             LOGS(_log, LOG_LVL_DEBUG, "queuing askForResponseDataCmd bufSize=" << nextBufSize);
+            LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData j");
             _queueAskForResponse(_askForResponseDataCmd, jq, false);
         }
     } else {
+        LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData k");
         LOGS(_log, LOG_LVL_WARN, "flushOk = false");
         _flushError(jq);
         return;
     }
+    LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData end");
     return;
 }
diff --git a/core/modules/qdisp/UberJob.cc b/core/modules/qdisp/UberJob.cc
index 67b6b5d1e3..f57c0baa63 100644
--- a/core/modules/qdisp/UberJob.cc
+++ b/core/modules/qdisp/UberJob.cc
@@ -49,15 +49,17 @@ namespace qdisp {
 
 UberJob::Ptr UberJob::create(Executive::Ptr const& executive,
                              std::shared_ptr const& respHandler,
-                             int queryId, int uberJobId, qmeta::CzarId czarId) {
-    UberJob::Ptr uJob(new UberJob(executive, respHandler, queryId, uberJobId, czarId));
+                             int queryId, int uberJobId, qmeta::CzarId czarId, string const& workerResource) {
+    UberJob::Ptr uJob(new UberJob(executive, respHandler, queryId, uberJobId, czarId, workerResource));
+    uJob->_setup();
     return uJob;
 }
 
 UberJob::UberJob(Executive::Ptr const& executive, std::shared_ptr const& respHandler,
-                 int queryId, int uberJobId, qmeta::CzarId czarId)
-    : JobBase(), _executive(executive), _respHandler(respHandler), _queryId(queryId), _uberJobId(uberJobId),
+                 int queryId, int uberJobId, qmeta::CzarId czarId, string const& workerResource)
+    : JobBase(), _workerResource(workerResource), _executive(executive),
+      _respHandler(respHandler), _queryId(queryId), _uberJobId(uberJobId),
       _czarId(czarId), _idStr("QID=" + to_string(_queryId) + ":uber=" + to_string(uberJobId)) {
     _qdispPool = executive->getQdispPool();
     _jobStatus = make_shared();
@@ -182,13 +184,15 @@ void UberJob::callMarkCompleteFunc(bool success) {
         throw Bug("&&&NEED_CODE may need code to properly handle failed uberjob");
     }
     for (auto&& job:_jobs) {
+        string idStr = job->getIdStr();
+        job->getStatus()->updateInfo(idStr, JobStatus::COMPLETE);
         job->callMarkCompleteFunc(success);
     }
 }
 
 std::ostream& UberJob::dumpOS(std::ostream &os) const {
-    os << "(workerResource=" << workerResource
+    os << "(workerResource=" << _workerResource
        << " jobs sz=" << _jobs.size() << "(";
     for (auto const& job:_jobs) {
         JobDescription::Ptr desc = job->getDescription();
diff --git a/core/modules/qdisp/UberJob.h b/core/modules/qdisp/UberJob.h
index 498b58b736..46b2bd85bb 100644
--- a/core/modules/qdisp/UberJob.h
+++ b/core/modules/qdisp/UberJob.h
@@ -43,7 +43,7 @@ class UberJob : public JobBase {
 
     static Ptr create(Executive::Ptr const& executive,
                       std::shared_ptr const& respHandler,
-                      int queryId, int uberJobId, qmeta::CzarId czarId);
+                      int queryId, int uberJobId, qmeta::CzarId czarId, std::string const& workerResource);
     UberJob() = delete;
     UberJob(UberJob const&) = delete;
     UberJob& operator=(UberJob const&) = delete;
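
Not part of the patch: a minimal sketch (illustrative, not Qserv code) of why UberJob::create() now calls _setup() after construction instead of doing that work in the constructor. _setup() uses shared_from_this() (see the next UberJob.h hunk below), which is only valid once a shared_ptr already owns the object.

    #include <memory>

    class HandlerSketch;  // stand-in for the response handler type

    class JobSketch : public std::enable_shared_from_this<JobSketch> {
    public:
        static std::shared_ptr<JobSketch> create(std::shared_ptr<HandlerSketch> const& handler) {
            std::shared_ptr<JobSketch> job(new JobSketch(handler));
            job->_setup();  // safe: a shared_ptr owns *job by the time _setup() runs
            return job;
        }
    private:
        explicit JobSketch(std::shared_ptr<HandlerSketch> const& handler) : _handler(handler) {}
        void _setup() {
            auto self = shared_from_this();  // would throw std::bad_weak_ptr inside the constructor
            // _handler->setJobQuery(self);  // wiring analogous to UberJob::_setup()
            (void)self;
        }
        std::shared_ptr<HandlerSketch> _handler;
    };
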
@@ -73,17 +73,22 @@ class UberJob : public JobBase {
 
     bool verifyPayload() const;
 
+    std::string getWorkerResource() { return _workerResource; }
+    /// &&& TODO:UJ may not need,
     void prepScrubResults();
 
-    std::string workerResource; // TODO:UJ make private
-
     std::ostream& dumpOS(std::ostream &os) const override;
 
 private:
     UberJob(Executive::Ptr const& executive, std::shared_ptr const& respHandler,
-            int queryId, int uberJobId, qmeta::CzarId czarId);
+            int queryId, int uberJobId, qmeta::CzarId czarId, std::string const& workerResource);
+
+    void _setup() {
+        JobBase::Ptr jbPtr = shared_from_this();
+        _respHandler->setJobQuery(jbPtr);
+    }
 
     std::vector _jobs;
     std::atomic _started{false};
@@ -93,7 +98,8 @@ class UberJob : public JobBase {
     std::shared_ptr _queryRequestPtr;
     std::mutex _qrMtx;
 
-    std::string _payload; ///< XrdSsi message to be sent to the worker resource.
+    std::string const _workerResource;
+    std::string _payload; ///< XrdSsi message to be sent to the _workerResource.
 
     std::weak_ptr _executive;
     std::shared_ptr _respHandler;
diff --git a/core/modules/wbase/SendChannelShared.cc b/core/modules/wbase/SendChannelShared.cc
index 44bc13b206..032ce08c15 100644
--- a/core/modules/wbase/SendChannelShared.cc
+++ b/core/modules/wbase/SendChannelShared.cc
@@ -63,6 +63,10 @@ void SendChannelShared::setTaskCount(int taskCount) {
     _taskCount = taskCount;
 }
 
+void SendChannelShared::incrTaskCountBy(int partialCount) {
+    _taskCount += partialCount;
+}
+
 bool SendChannelShared::transmitTaskLast(StreamGuard sLock, bool inLast) {
     /// _caller must have locked _streamMutex before calling this.
diff --git a/core/modules/wbase/SendChannelShared.h b/core/modules/wbase/SendChannelShared.h
index f6e96e741f..27a4125b85 100644
--- a/core/modules/wbase/SendChannelShared.h
+++ b/core/modules/wbase/SendChannelShared.h
@@ -43,6 +43,9 @@ namespace wbase {
 /// A class that provides a SendChannel object with synchronization so it can be
 /// shared by across multiple threads. Due to what may be sent, the synchronization locking
 /// is needs to be available outside of the class.
+/// Note: Tasks on a SendChannelShared cannot start processing until the total number of
+/// Tasks using the SendChannelShared is known. Otherwise, there is a race condition
+/// which could close the channel too soon.
 class SendChannelShared {
 public:
     using Ptr = std::shared_ptr;
@@ -95,6 +98,9 @@ class SendChannelShared {
     /// This should not be changed once set.
     void setTaskCount(int taskCount);
 
+    /// All of the tasks that use this SendChannel must be added
+    /// to the scheduler queue at the same time to avoid a race condition.
+    void incrTaskCountBy(int subCount);
 
     /// Try to transmit the data in tData.
     /// If the queue already has at least 2 TransmitData objects, addTransmit
@@ -151,7 +157,7 @@ class SendChannelShared {
     /// metadata buffer. Once set, it cannot change until after Finish() has been called.
     std::string _metadataBuf;
 
-    int _taskCount = 0; ///< The number of tasks to be sent over this SendChannel.
+    std::atomic _taskCount{0}; ///< The number of tasks to be sent over this SendChannel.
     int _lastCount = 0; ///< Then number of 'last' buffers received.
     std::atomic _lastRecvd{false}; ///< The truly 'last' transmit message is in the queue.
     std::atomic _firstTransmit{true}; ///< True until the first transmit has been sent.
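
Not part of the patch: a minimal sketch (illustrative, not the real SendChannelShared) of the counting change above. With UberJobs, several batches of Tasks can share one channel, so the total is accumulated with incrTaskCountBy() on an atomic counter instead of being set once via setTaskCount() (see the Task::createTasks() hunk below). The "all last buffers received" test here is an assumption made for illustration; the real check lives in transmitTaskLast() and runs under the stream mutex.

    #include <atomic>

    struct TaskCountSketch {
        std::atomic<int> taskCount{0};
        int lastCount = 0;  // incremented under the stream mutex in the real class

        // Each batch of Tasks registers its size; increments from concurrent batches are safe.
        void incrTaskCountBy(int partialCount) { taskCount += partialCount; }

        // Returns true once every registered task has reported its 'last' buffer
        // (an illustrative stand-in for the real transmitTaskLast() logic).
        bool allLastReceived(bool inLast) {
            if (inLast) ++lastCount;
            return lastCount >= taskCount.load();
        }
    };
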
diff --git a/core/modules/wbase/Task.cc b/core/modules/wbase/Task.cc
index 3d560d5afd..99ec755790 100644
--- a/core/modules/wbase/Task.cc
+++ b/core/modules/wbase/Task.cc
@@ -166,6 +166,7 @@ std::vector Task::createTasks(proto::TaskMsg const& taskMsg,
             vect.push_back(task);
         }
     } else {
+        LOGS(_log, LOG_LVL_INFO, "&&& Task::createTasks queryStr=" << queryStr);
         auto task = std::make_shared(taskMsg, queryStr, fragNum, sendChannel, gArena, rmLock);
         //TODO: Maybe? Is it better to move fragment info from
         //      ChunkResource getResourceFragment(int i) to here???
@@ -176,7 +177,9 @@ std::vector Task::createTasks(proto::TaskMsg const& taskMsg,
         }
     }
 
-    sendChannel->setTaskCount(vect.size());
+    LOGS(_log, LOG_LVL_INFO, "&&& Task::createTasks vect.size=" << vect.size());
+    //&&&sendChannel->setTaskCount(vect.size());
+    sendChannel->incrTaskCountBy(vect.size());
 
     return vect;
 }
diff --git a/core/modules/xrdsvc/SsiRequest.cc b/core/modules/xrdsvc/SsiRequest.cc
index d824bcd048..a4e1a2d157 100644
--- a/core/modules/xrdsvc/SsiRequest.cc
+++ b/core/modules/xrdsvc/SsiRequest.cc
@@ -513,7 +513,9 @@ void SsiRequest::_handleUberJob(proto::UberJobMsg* uberJobMsg,
 
     // Make a Task for each TaskMsg in the UberJobMsg
     vector tasks;
+    LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ tSize=" << tSize);
     for (int j=0; j < tSize; ++j) {
+        LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ j=" << j);
         proto::TaskMsg const& taskMsg = uberJobMsg->taskmsgs(j);
 
         if (!taskMsg.has_db() || !taskMsg.has_chunkid()) {
@@ -525,6 +527,7 @@ void SsiRequest::_handleUberJob(proto::UberJobMsg* uberJobMsg,
         int chunkId = taskMsg.chunkid();
         //&&&string resourcePath = "/" + db + "/" + to_string(chunkId);
         string resourcePath = ResourceUnit::makePath(chunkId, db);
+        LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ resourcePath=" << resourcePath);
         ResourceUnit ru(resourcePath);
         if (ru.db() != db || ru.chunk() != chunkId) {
             throw Bug("resource path didn't match ru");
@@ -532,6 +535,7 @@ void SsiRequest::_handleUberJob(proto::UberJobMsg* uberJobMsg,
         auto resourceLock = std::make_shared(*(_resourceMonitor.get()), resourcePath);
 
         // If the query uses subchunks, the taskMsg will return multiple Tasks. Otherwise, one task.
+        LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ creating tasks");
         auto nTasks = wbase::Task::createTasks(taskMsg, sendChannel, gArena, resourceLock);
         // Move nTasks into tasks
         tasks.insert(tasks.end(), std::make_move_iterator(nTasks.begin()),
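
Not part of the patch: a minimal sketch assuming chunk resource paths have the form "/<db>/<chunkId>", as the commented-out line in _handleUberJob suggests; the real code uses ResourceUnit::makePath() and ResourceUnit. It mirrors the round-trip check performed above before any Tasks are created for a TaskMsg.

    #include <stdexcept>
    #include <string>

    std::string makeChunkPath(int chunkId, std::string const& db) {
        return "/" + db + "/" + std::to_string(chunkId);
    }

    void parseChunkPath(std::string const& path, std::string& db, int& chunkId) {
        auto const first = path.find('/');
        auto const second = path.find('/', first + 1);
        db = path.substr(first + 1, second - first - 1);
        chunkId = std::stoi(path.substr(second + 1));
    }

    // Build a path and verify it parses back to the same db/chunk, analogous to
    // the ResourceUnit check that throws Bug("resource path didn't match ru").
    void checkRoundTrip(std::string const& db, int chunkId) {
        std::string const path = makeChunkPath(chunkId, db);
        std::string dbOut;
        int chunkOut = -1;
        parseChunkPath(path, dbOut, chunkOut);
        if (dbOut != db || chunkOut != chunkId) {
            throw std::runtime_error("resource path didn't match ru");
        }
    }
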