diff --git a/core/modules/ccontrol/MergingHandler.cc b/core/modules/ccontrol/MergingHandler.cc index e0e152e14c..05faf15c0e 100644 --- a/core/modules/ccontrol/MergingHandler.cc +++ b/core/modules/ccontrol/MergingHandler.cc @@ -97,10 +97,8 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar throw Bug("MergingHandler invalid blen=" + to_string(bLen) + " from " + _wName); } - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush a"); switch(_state) { case MsgState::HEADER_WAIT: - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush b"); _response->headerSize = static_cast((*bufPtr)[0]); if (!proto::ProtoHeaderWrap::unwrap(_response, *bufPtr)) { std::string sErr = "From:" + _wName + "Error decoding proto header for " + getStateStr(_state); @@ -121,17 +119,13 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar << " endNoData=" << endNoData); _state = MsgState::RESULT_WAIT; - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush c"); if (endNoData || nextBufSize == 0) { - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush d"); if (!endNoData || nextBufSize != 0 ) { throw Bug("inconsistent msg termination endNoData=" + std::to_string(endNoData) + " nextBufSize=" + std::to_string(nextBufSize)); } - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush e"); // Nothing to merge, but some bookkeeping needs to be done. _infileMerger->mergeCompleteFor(_jobIds); - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush f"); last = true; _state = MsgState::RESULT_RECV; } @@ -139,11 +133,9 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar return true; case MsgState::RESULT_WAIT: { - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush g"); nextBufSize = proto::ProtoHeaderWrap::getProtoHeaderSize(); auto job = getJobBase().lock(); if (!_verifyResult(bufPtr, bLen)) { return false; } - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush h"); if (!_setResult(bufPtr, bLen)) { // This sets _response->result LOGS(_log, LOG_LVL_WARN, "setResult failure " << _wName); return false; @@ -155,9 +147,7 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar _jobIds.insert(jobId); LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last << " for tableName=" << _tableName); - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush i"); auto success = _merge(); - LOGS(_log, LOG_LVL_INFO, "&&& MH::flush j"); _response.reset(new WorkerResponse()); return success; } @@ -228,16 +218,12 @@ void MergingHandler::_initState() { } bool MergingHandler::_merge() { - LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge a"); auto job = getJobBase().lock(); if (job != nullptr) { - LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge b"); if (_flushed) { throw Bug("MergingRequester::_merge : already flushed"); } - LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge c"); bool success = _infileMerger->merge(_response); - LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge d"); if (!success) { LOGS(_log, LOG_LVL_WARN, "_merge() failed"); rproc::InfileMergerError const& err = _infileMerger->getError(); @@ -245,7 +231,6 @@ bool MergingHandler::_merge() { _state = MsgState::RESULT_ERR; } _response.reset(); - LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge end"); return success; } LOGS(_log, LOG_LVL_ERROR, "MergingHandler::_merge() failed, jobQuery was NULL"); diff --git a/core/modules/ccontrol/UserQuerySelect.cc b/core/modules/ccontrol/UserQuerySelect.cc index 72004ae90b..4294485c6d 100644 --- a/core/modules/ccontrol/UserQuerySelect.cc +++ b/core/modules/ccontrol/UserQuerySelect.cc @@ -304,37 +304,6 @@ void UserQuerySelect::submit() { i != e && !_executive->getCancelled(); ++i) { auto& chunkSpec = *i; - /* &&& - std::function funcBuildJob = - [this, sequence, // sequence must be a copy - &chunkSpec, &queryTemplates, - &chunks, &chunksMtx, &ttn, - &taskMsgFactory](util::CmdData*) { - - QSERV_LOGCONTEXT_QUERY(_qMetaQueryId); - - qproc::ChunkQuerySpec::Ptr cs; - { - std::lock_guard lock(chunksMtx); - cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec); - chunks.push_back(cs->chunkId); - } - std::string chunkResultName = ttn.make(cs->chunkId); - - std::shared_ptr cmr = ChunkMsgReceiver::newInstance(cs->chunkId, _messageStore); - ResourceUnit ru; - ru.setAsDbChunk(cs->db, cs->chunkId); - qdisp::JobDescription::Ptr jobDesc = qdisp::JobDescription::create(_qMetaCzarId, - _executive->getId(), sequence, ru, - std::make_shared(cmr, _infileMerger, chunkResultName), - taskMsgFactory, cs, chunkResultName); - _executive->add(jobDesc); - }; - - auto cmd = std::make_shared(funcBuildJob); - _executive->queueJobStart(cmd); - */ - // Make the JobQuery now QSERV_LOGCONTEXT_QUERY(_qMetaQueryId); @@ -368,7 +337,6 @@ void UserQuerySelect::submit() { if (!uberJobsEnabled) { std::function funcBuildJob = - //&&&[this, sequence, job{move(job)}](util::CmdData*) { // references in captures cause races [this, job{move(job)}](util::CmdData*) { // references in captures cause races QSERV_LOGCONTEXT_QUERY(_qMetaQueryId); job->runJob(); @@ -447,56 +415,37 @@ void UserQuerySelect::submit() { workerIter = tmpWorkerList.begin(); } } - LOGS(_log, LOG_LVL_INFO, "&&& submit m"); _executive->addUberJobs(uberJobs); - LOGS(_log, LOG_LVL_INFO, "&&& submit n"); for (auto&& uJob:uberJobs) { - LOGS(_log, LOG_LVL_INFO, "&&& submit o"); uJob->runUberJob(); - LOGS(_log, LOG_LVL_INFO, "&&& submit p"); } - LOGS(_log, LOG_LVL_INFO, "&&& submit q"); // If any chunks in the query were not found on a worker's list, run them individually. - //&&&_executive->startRemainingJobs(chunksInQuery); //&&& delete func in Executive. for (auto& ciq:chunksInQuery) { - LOGS(_log, LOG_LVL_INFO, "&&& submit q1"); qdisp::JobQuery* jqRaw = ciq.second; - LOGS(_log, LOG_LVL_INFO, "&&& submit q2"); qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw); - LOGS(_log, LOG_LVL_INFO, "&&& submit q3"); std::function funcBuildJob = [this, job{move(job)}](util::CmdData*) { // references in captures cause races QSERV_LOGCONTEXT_QUERY(_qMetaQueryId); - LOGS(_log, LOG_LVL_INFO, "&&& submit q run1"); job->runJob(); - LOGS(_log, LOG_LVL_INFO, "&&& submit q run2"); }; auto cmd = std::make_shared(funcBuildJob); - LOGS(_log, LOG_LVL_INFO, "&&& submit q4"); _executive->queueJobStart(cmd); - LOGS(_log, LOG_LVL_INFO, "&&& submit q5"); } - - LOGS(_log, LOG_LVL_INFO, "&&& submit r"); } // attempt to restore original thread priority, requires root if (increaseThreadPriority) { threadPriority.restoreOriginalValues(); } - LOGS(_log, LOG_LVL_INFO, "&&& submit s"); LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence); _executive->waitForAllJobsToStart(); - LOGS(_log, LOG_LVL_INFO, "&&& submit t"); // we only care about per-chunk info for ASYNC queries if (_async) { - LOGS(_log, LOG_LVL_INFO, "&&& submit u"); std::lock_guard lock(chunksMtx); _qMetaAddChunks(chunks); } - LOGS(_log, LOG_LVL_INFO, "&&& submit v"); } diff --git a/core/modules/czar/WorkerResources.cc b/core/modules/czar/WorkerResources.cc index 6eecbd50e5..81f5bb5fd2 100644 --- a/core/modules/czar/WorkerResources.cc +++ b/core/modules/czar/WorkerResources.cc @@ -100,7 +100,6 @@ map> WorkerResources::getDequesFor(string const& dbName) { for (auto&& elem:_workers) { string wName = elem.first; WorkerResource::Ptr const& wr = elem.second; - //&&& deque dq = wr.getDequeFor(dbName); dqMap.emplace(wName, wr->getDequeFor(dbName)); } return dqMap; diff --git a/core/modules/czar/WorkerResources.h b/core/modules/czar/WorkerResources.h index 2e9233b17d..b96455ad2a 100644 --- a/core/modules/czar/WorkerResources.h +++ b/core/modules/czar/WorkerResources.h @@ -84,7 +84,8 @@ class WorkerResources { public: WorkerResources() = default; - //&&& other constructors + WorkerResources(WorkerResources const&) = delete; + WorkerResources& operator=(WorkerResources const&) = delete; ~WorkerResources() = default; diff --git a/core/modules/global/ResourceUnit.h b/core/modules/global/ResourceUnit.h index 01bedf7959..d21a29e91e 100644 --- a/core/modules/global/ResourceUnit.h +++ b/core/modules/global/ResourceUnit.h @@ -48,13 +48,6 @@ class ResourceUnit { class Checker; enum UnitType {GARBAGE, DBCHUNK, CQUERY, UNKNOWN, RESULT, WORKER}; - /* &&& - static std::string makeChunkResourceName(std::string const& db, int chunkId) { - std::string str = "/chk/" + db + "/" + std::to_string(chunkId); - return str; - } - */ - ResourceUnit() : _unitType(GARBAGE), _chunk(-1) {} explicit ResourceUnit(std::string const& path); diff --git a/core/modules/qdisp/Executive.cc b/core/modules/qdisp/Executive.cc index 59c3a07d30..502df157ce 100644 --- a/core/modules/qdisp/Executive.cc +++ b/core/modules/qdisp/Executive.cc @@ -206,7 +206,6 @@ void Executive::waitForAllJobsToStart() { // @return true if query was actually started (i.e. we were not cancelled) // bool Executive::startQuery(shared_ptr const& jobQuery) { - LOGS(_log, LOG_LVL_WARN, "&&& Executive::startQuery"); lock_guard lock(_cancelled.getMutex()); // If we have been cancelled, then return false. @@ -238,7 +237,6 @@ bool Executive::startQuery(shared_ptr const& jobQuery) { /// Return true if it was successfully added to the map. /// bool Executive::_addJobToMap(JobQuery::Ptr const& job) { - LOGS(_log, LOG_LVL_WARN, "&&& Executive::_addJobToMap jobId=" << job->getIdInt()); auto entry = pair(job->getIdInt(), job); lock_guard lockJobMap(_jobMapMtx); bool res = _jobMap.insert(entry).second; @@ -611,7 +609,6 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) { if (_cancelled) return false; // Construct a temporary resource object to pass to ProcessRequest(). - //&&&XrdSsiResource jobResource(jobQuery->getDescription()->resource().path(), "", jobQuery->getIdStr(), "", 0, affinity); // Affinity should be meaningless here as there should only be one instance of each worker. XrdSsiResource::Affinity affinity = XrdSsiResource::Affinity::Default; LOGS(_log, LOG_LVL_INFO, "&&& uJob->workerResource=" << uJob->getWorkerResource()); @@ -631,10 +628,6 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) { } -void Executive::startRemainingJobs(ChunkIdJobMapType& remainingChunks) { - throw Bug("&&&NEED_CODE executive start remaining jobs"); -} - ostream& operator<<(ostream& os, Executive::JobMap::value_type const& v) { JobStatus::Ptr status = v.second->getStatus(); os << v.first << ": " << *status; diff --git a/core/modules/qdisp/Executive.h b/core/modules/qdisp/Executive.h index 4dcbb90154..16672ad996 100644 --- a/core/modules/qdisp/Executive.h +++ b/core/modules/qdisp/Executive.h @@ -85,7 +85,6 @@ class Executive : public std::enable_shared_from_this { typedef std::shared_ptr Ptr; typedef std::unordered_map> JobMap; typedef int ChunkIdType; //&&& TODO:UJ probably needs to be ResourceUnit - //&&&typedef std::map ChunkIdJobMapType; typedef std::unordered_map ChunkIdJobMapType; /// Construct an Executive. @@ -140,10 +139,7 @@ class Executive : public std::enable_shared_from_this { bool startQuery(std::shared_ptr const& jobQuery); - /// Start any jobs that were not started as part of UberJobs. - void startRemainingJobs(ChunkIdJobMapType& remainingJobs); // &&& delete - - ///&&& TODO:UJ UberJob + /// Add UbjerJobs to this user query. void addUberJobs(std::vector> const& jobsToAdd); ChunkIdJobMapType& getChunkJobMapAndInvalidate(); bool startUberJob(std::shared_ptr const& uJob); @@ -212,7 +208,7 @@ class Executive : public std::enable_shared_from_this { bool _scanInteractive = false; ///< true for interactive scans. - // &&& TODO UberJob + // Add a job to the _chunkToJobMap void _addToChunkJobMap(std::shared_ptr const& job); /// _chunkToJobMap is created once and then destroyed when used. std::atomic _chunkToJobMapInvalid{false}; ///< true indicates the map is no longer valid. diff --git a/core/modules/qdisp/JobQuery.cc b/core/modules/qdisp/JobQuery.cc index e5fcb2cc65..0eff4b569e 100644 --- a/core/modules/qdisp/JobQuery.cc +++ b/core/modules/qdisp/JobQuery.cc @@ -70,7 +70,6 @@ JobQuery::~JobQuery() { bool JobQuery::runJob() { QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getIdInt()); LOGS(_log, LOG_LVL_DEBUG, " runJob " << *this); - LOGS(_log, LOG_LVL_WARN, "&&& runJob " << *this); auto executive = _executive.lock(); if (executive == nullptr) { LOGS(_log, LOG_LVL_ERROR, "runJob failed executive==nullptr"); diff --git a/core/modules/qdisp/JobQuery.h b/core/modules/qdisp/JobQuery.h index 8914e5fc88..4342b12019 100644 --- a/core/modules/qdisp/JobQuery.h +++ b/core/modules/qdisp/JobQuery.h @@ -66,7 +66,6 @@ class JobQuery : public JobBase { QueryId getQueryId() const override {return _qid; } int getIdInt() const override { return _jobDescription->id(); } - //&&& std::string getPayload() const override { return _jobDescription->payload(); } std::string const& getPayload() const override; std::string const& getIdStr() const override { return _idStr; } std::shared_ptr getRespHandler() override { return _jobDescription->respHandler(); } @@ -101,8 +100,10 @@ class JobQuery : public JobBase { JobStatus::Ptr const& jobStatus, std::shared_ptr const& markCompleteFunc, QueryId qid); - /// &&& TODO:UJ UberJob functions + /// Set to true if this job is part of an UberJob void setInUberJob(bool inUberJob) { _inUberJob = inUberJob; }; + + /// @return true if this job is part of an UberJob. bool inUberJob() const { return _inUberJob; } protected: @@ -143,7 +144,7 @@ class JobQuery : public JobBase { std::shared_ptr _qdispPool; - /// &&& TODO:UJ UberJob + /// True if this job is part of an UberJob. std::atomic _inUberJob{false}; ///< TODO:UJ There are probably several places this should be checked }; diff --git a/core/modules/qdisp/QueryRequest.cc b/core/modules/qdisp/QueryRequest.cc index 970b3e582c..02ad058b3f 100644 --- a/core/modules/qdisp/QueryRequest.cc +++ b/core/modules/qdisp/QueryRequest.cc @@ -140,7 +140,6 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // _processData will have created another AskForResponseDataCmd object if was needed. tTotal.stop(); } - LOGS(_log, LOG_LVL_WARN, "&&& QR after _processData"); _setState(State::DONE2); LOGS(_log, LOG_LVL_DEBUG, "Ask data is done wait=" << tWaiting.getElapsed() << " total=" << tTotal.getElapsed()); @@ -222,7 +221,6 @@ QueryRequest::~QueryRequest() { // content of request data char* QueryRequest::GetRequest(int& requestLength) { QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); - LOGS(_log, LOG_LVL_INFO, "&&& QueryRequest::GetRequest"); lock_guard lock(_finishStatusMutex); auto jq = _job; if (_finishStatus != ACTIVE || jq == nullptr) { @@ -449,7 +447,6 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast) ResponseHandler::BufPtr bufPtr = _askForResponseDataCmd->getBufPtr(); _askForResponseDataCmd.reset(); // No longer need it, and don't want the destructor calling _errorFinish(). - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData a"); int const protoHeaderSize = proto::ProtoHeaderWrap::getProtoHeaderSize(); ResponseHandler::BufPtr nextHeaderBufPtr; @@ -466,18 +463,9 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast) // - The second is the header for the next message. int respSize = blen - protoHeaderSize; nextHeaderBufPtr = make_shared>(bufPtr->begin() + respSize, bufPtr->end()); - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData b"); // Read the result - /* &&& rebase - flushOk = jq->getDescription()->respHandler()->flush(respSize, bufPtr, last, - largeResult, nextBufSize); - */ - //&&& bool largeResult = false; - //&&& int nextBufSize = 0; - //&&& bool last = false; // Values for last, largeResult, and nextBufSize filled in by flush flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, largeResult, nextBufSize); - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData c"); if (last) { // Last should only be true when the header is read, not the result. throw Bug("_processData result had 'last' true, which cannot be allowed."); @@ -488,25 +476,14 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast) throw Bug("Unexpected header size from flush(result) call QID=" + to_string(_qid) + "#" + to_string(_jobid)); } - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData d"); if (!flushOk) { _flushError(jq); return; } // Read the next header - /* &&& rebase - // Values for largeResult, last, and nextBufSize will be filled in by flush(). - flushOk = jq->getDescription()->respHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last, - largeResult, nextBufSize); - - largeResult = false; - nextBufSize = 0; - */ // Values for last, largeResult, and nextBufSize filled in by flush - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData e"); flushOk = jq->getRespHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last, largeResult, nextBufSize); - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData f"); if (largeResult) { if (!_largeResult) LOGS(_log, LOG_LVL_DEBUG, "holdState largeResult set to true"); _largeResult = true; // Once the worker indicates it's a large result, it stays that way. @@ -517,28 +494,22 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast) LOGS(_log, LOG_LVL_DEBUG, "processData disagreement between last=" << last << " and xrdLast=" << xrdLast); } - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData g"); if (last) { - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData h"); jq->getStatus()->updateInfo(_jobIdStr, JobStatus::COMPLETE); _finish(); // At this point all blocks for this job have been read, there's no point in // having XrdSsi wait for anything. return; } else { - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData i"); _askForResponseDataCmd = make_shared(shared_from_this(), jq, nextBufSize); LOGS(_log, LOG_LVL_DEBUG, "queuing askForResponseDataCmd bufSize=" << nextBufSize); - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData j"); _queueAskForResponse(_askForResponseDataCmd, jq, false); } } else { - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData k"); LOGS(_log, LOG_LVL_WARN, "flushOk = false"); _flushError(jq); return; } - LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData end"); return; } @@ -708,17 +679,12 @@ void QueryRequest::_finish() { /// Inform the Executive that this query completed, and // Call MarkCompleteFunc only once, it should only be called from _finish() or _errorFinish. void QueryRequest::_callMarkComplete(bool success) { - LOGS(_log, LOG_LVL_WARN, "&&& QueryRequest::_callMarkComplete a _calledMarkComplete=" << _calledMarkComplete); if (!_calledMarkComplete.exchange(true)) { - LOGS(_log, LOG_LVL_WARN, "&&& QueryRequest::_callMarkComplete b"); auto jq = _job; - //&&&if (jq != nullptr) jq->getMarkCompleteFunc()->operator()(success); if (jq != nullptr) { - LOGS(_log, LOG_LVL_WARN, "&&& QueryRequest::_callMarkComplete c"); jq->callMarkCompleteFunc(success); } } - LOGS(_log, LOG_LVL_WARN, "&&& QueryRequest::_callMarkComplete end"); } ostream& operator<<(ostream& os, QueryRequest const& qr) { diff --git a/core/modules/qdisp/QueryRequest.h b/core/modules/qdisp/QueryRequest.h index 0988f687ba..ead942ce04 100644 --- a/core/modules/qdisp/QueryRequest.h +++ b/core/modules/qdisp/QueryRequest.h @@ -97,13 +97,6 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this Ptr; - /* &&& - static Ptr create(std::shared_ptr const& jobQuery) { - Ptr newQueryRequest(new QueryRequest(jobQuery)); - return newQueryRequest; - } - */ - static Ptr create(std::shared_ptr const& JobBase) { Ptr newQueryRequest(new QueryRequest(JobBase)); return newQueryRequest; diff --git a/core/modules/qdisp/UberJob.cc b/core/modules/qdisp/UberJob.cc index f57c0baa63..6cf555040f 100644 --- a/core/modules/qdisp/UberJob.cc +++ b/core/modules/qdisp/UberJob.cc @@ -80,7 +80,6 @@ bool UberJob::addJob(JobQuery* job) { bool UberJob::runUberJob() { QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getIdInt()); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob a"); // Build the uberjob payload. // TODO:UJ For simplicity in the first pass, just make a TaskMsg for each Job and append it to the UberJobMsg. // This is terribly inefficient and should be replaced by using a template and list of chunks that the @@ -90,32 +89,22 @@ bool UberJob::runUberJob() { proto::UberJobMsg* ujMsg = google::protobuf::Arena::CreateMessage(&arena); ujMsg->set_queryid(getQueryId()); ujMsg->set_czarid(_czarId); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b"); for (auto&& job:_jobs) { - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b1"); proto::TaskMsg* tMsg = ujMsg->add_taskmsgs(); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b2"); job->getDescription()->fillTaskMsg(tMsg); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b3"); } - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob c"); ujMsg->SerializeToString(&_payload); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob d"); } - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob e"); auto executive = _executive.lock(); if (executive == nullptr) { LOGS(_log, LOG_LVL_ERROR, "runUberJob failed executive==nullptr"); return false; } - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob f"); bool cancelled = executive->getCancelled(); bool handlerReset = _respHandler->reset(); bool started = _started.exchange(true); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob g"); if (!cancelled && handlerReset && !started) { - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob h"); auto criticalErr = [this, &executive](std::string const& msg) { LOGS(_log, LOG_LVL_ERROR, msg << " " << *this << " Canceling user query!"); @@ -128,7 +117,6 @@ bool UberJob::runUberJob() { return false; } - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob i"); // At this point we are all set to actually run the queries. We create a // a shared pointer to this object to prevent it from escaping while we // are trying to start this whole process. We also make sure we record @@ -136,12 +124,9 @@ bool UberJob::runUberJob() { // LOGS(_log, LOG_LVL_TRACE, "runUberJob calls StartQuery()"); std::shared_ptr uJob(dynamic_pointer_cast(shared_from_this())); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob j"); _inSsi = true; if (executive->startUberJob(uJob)) { - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob k"); _jobStatus->updateInfo(_idStr, JobStatus::REQUEST); - LOGS(_log, LOG_LVL_INFO, "&&& runUberJob l"); return true; } _inSsi = false; diff --git a/core/modules/qproc/TaskMsgFactory.cc b/core/modules/qproc/TaskMsgFactory.cc index c85cb28908..79efc91a0d 100644 --- a/core/modules/qproc/TaskMsgFactory.cc +++ b/core/modules/qproc/TaskMsgFactory.cc @@ -70,7 +70,7 @@ bool TaskMsgFactory::fillTaskMsg(proto::TaskMsg* taskMsg, ChunkQuerySpec const& taskMsg->set_jobid(jobId); taskMsg->set_attemptcount(attemptCount); taskMsg->set_czarid(czarId); - LOGS(_log, LOG_LVL_WARN, "&&& _makeMsg ses=" << _session << " db=" << chunkQuerySpec.db << " qId=" << queryId << " jId=" << jobId << " att=" << attemptCount << " cz=" << czarId); + LOGS(_log, LOG_LVL_INFO, "&&& _makeMsg ses=" << _session << " db=" << chunkQuerySpec.db << " qId=" << queryId << " jId=" << jobId << " att=" << attemptCount << " cz=" << czarId); // scanTables (for shared scans) // check if more than 1 db in scanInfo std::string db; diff --git a/core/modules/wbase/SendChannelShared.cc b/core/modules/wbase/SendChannelShared.cc index 032ce08c15..4f8240e6d4 100644 --- a/core/modules/wbase/SendChannelShared.cc +++ b/core/modules/wbase/SendChannelShared.cc @@ -59,10 +59,6 @@ SendChannelShared::~SendChannelShared() { } -void SendChannelShared::setTaskCount(int taskCount) { - _taskCount = taskCount; -} - void SendChannelShared::incrTaskCountBy(int partialCount) { _taskCount += partialCount; } @@ -153,7 +149,7 @@ bool SendChannelShared::_transmit(bool erred, bool scanInteractive, bool largeRe auto sz = _transmitQueue.size(); // Is this really the last message for this SharedSendChannel? bool reallyLast = (_lastRecvd && sz == 0); - LOGS(_log, LOG_LVL_WARN, "&&& reallyLast=" << reallyLast << " _lastRecvd=" << _lastRecvd << " sz=" << sz); + LOGS(_log, LOG_LVL_INFO, "&&& reallyLast=" << reallyLast << " _lastRecvd=" << _lastRecvd << " sz=" << sz); // Append the header for the next message to thisTransmit->dataMsg. proto::ProtoHeader* nextPHdr; diff --git a/core/modules/wbase/SendChannelShared.h b/core/modules/wbase/SendChannelShared.h index 27a4125b85..4cfbcc4a36 100644 --- a/core/modules/wbase/SendChannelShared.h +++ b/core/modules/wbase/SendChannelShared.h @@ -93,11 +93,6 @@ class SendChannelShared { return _sendChannel->isDead(); } - - /// Set the number of Tasks that will be sent using this SendChannel. - /// This should not be changed once set. - void setTaskCount(int taskCount); - /// All of the tasks that use this SendChannel must be added /// to the scheduler queue at the same time or it risks a race condition. void incrTaskCountBy(int subCount);