diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 7d6f44be6..58c79ba34 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -779,7 +779,7 @@ void MergingHandler::flushHttpError(int errorCode, std::string const& errorMsg, _flushError(jq); */ - if(!_errorSet.exchange(true)) { + if (!_errorSet.exchange(true)) { _error = util::Error(errorCode, errorMsg, util::ErrorCode::MYSQLEXEC); _setError(ccontrol::MSG_RESULT_ERROR, _error.getMsg()); } diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index 2a228c88f..edc5f5a68 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -130,7 +130,7 @@ class MergingHandler : public qdisp::ResponseHandler { std::shared_ptr _infileMerger; ///< Merging delegate std::string _tableName; ///< Target table name Error _error; ///< Error description - std::atomic _errorSet{false}; ///< &&& doc + std::atomic _errorSet{false}; ///< &&& doc mutable std::mutex _errorMutex; ///< Protect readers from partial updates bool _flushed{false}; ///< flushed to InfileMerger? std::string _wName{"~"}; ///< worker name diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index a90885dcf..2300055e4 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -359,7 +359,7 @@ void UserQuerySelect::submit() { //&&&uj LOGS(_log, LOG_LVL_WARN, "&&& UserQuerySelect::submitNew g"); //&&&uj LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence); - _executive->waitForAllJobsToStart(); // &&& this may not be needed anymore? + _executive->waitForAllJobsToStart(); // &&& this may not be needed anymore? // we only care about per-chunk info for ASYNC queries if (_async) { @@ -377,7 +377,8 @@ void UserQuerySelect::buildAndSendUberJobs() { // Ensure `_monitor()` doesn't do anything until everything is ready. if (!_executive->isReadyToExecute()) { - LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " executive isn't ready to generate UberJobs."); + LOGS(_log, LOG_LVL_DEBUG, + "UserQuerySelect::" << __func__ << " executive isn't ready to generate UberJobs."); return; } diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index cd86a3573..1c8c49a59 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -91,12 +91,11 @@ Czar::Ptr Czar::createCzar(string const& configFilePath, string const& czarName) void Czar::_monitor() { LOGS(_log, LOG_LVL_WARN, "&&& Czar::_monitor a"); - while(_monitorLoop) { + while (_monitorLoop) { LOGS(_log, LOG_LVL_WARN, "&&& Czar::_monitor b"); this_thread::sleep_for(_monitorSleepTime); LOGS(_log, LOG_LVL_WARN, "&&& Czar::_monitor c"); - /// Check database for changes in worker chunk assignments and aliveness //&&&_czarChunkMap->read(); _czarFamilyMap->read(); @@ -113,7 +112,7 @@ void Czar::_monitor() { // Make a copy of all valid Executives lock_guard execMapLock(_executiveMapMtx); auto iter = _executiveMap.begin(); - while(iter != _executiveMap.end()) { + while (iter != _executiveMap.end()) { auto qIdKey = iter->first; shared_ptr exec = iter->second.lock(); if (exec == nullptr) { @@ -246,7 +245,6 @@ Czar::Czar(string const& configFilePath, string const& czarName) // Start the monitor thread thread monitorThrd(&Czar::_monitor, this); _monitorThrd = move(monitorThrd); - } Czar::~Czar() { diff --git a/src/czar/Czar.h b/src/czar/Czar.h index dd6fea3fb..74eacdb5f 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -63,7 +63,6 @@ namespace lsst::qserv::qdisp { class Executive; } // namespace lsst::qserv::qdisp - namespace lsst::qserv::czar { class CzarFamilyMap; @@ -209,14 +208,13 @@ class Czar { /// Connection to the registry to register the czar and get worker contact information. std::shared_ptr _czarRegistry; + std::mutex _executiveMapMtx; ///< protects _executiveMap + std::map> + _executiveMap; ///< Map of executives for queries in progress. - std::mutex _executiveMapMtx; ///< protects _executiveMap - std::map> _executiveMap; ///< Map of executives for queries in progress. - - std::thread _monitorThrd; ///< &&& doc - std::atomic _monitorLoop{true}; ///< &&& doc + std::thread _monitorThrd; ///< &&& doc + std::atomic _monitorLoop{true}; ///< &&& doc std::chrono::milliseconds _monitorSleepTime{15000}; ///< Wait time between checks. &&& set from config - }; } // namespace lsst::qserv::czar diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index bc2cec67a..59c0e2871 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -59,8 +59,7 @@ CzarChunkMap::CzarChunkMap(std::shared_ptr const& qmeta) : _qmeta( } */ -CzarChunkMap::CzarChunkMap() { -} +CzarChunkMap::CzarChunkMap() {} CzarChunkMap::~CzarChunkMap() { LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap::~CzarChunkMap()"); } @@ -106,8 +105,8 @@ bool CzarChunkMap::_readOld() { } */ -pair, shared_ptr> CzarChunkMap::makeNewMapsOld( - qmeta::QMetaChunkMap const& qChunkMap) { +pair, shared_ptr> +CzarChunkMap::makeNewMapsOld(qmeta::QMetaChunkMap const& qChunkMap) { // Create new maps. auto wcMapPtr = make_shared(); auto chunkMapPtr = make_shared(); @@ -314,10 +313,8 @@ void CzarChunkMap::verify() { // it's easier to isolate problems. throw ChunkMapException(ERR_LOC, "verification failed with " + to_string(errorCount) + " errors"); } - } - string CzarChunkMap::dumpChunkMap(ChunkMap const& chunkMap) { stringstream os; os << "ChunkMap{"; @@ -362,7 +359,6 @@ CzarChunkMap::ChunkData::getWorkerHasThisMapCopy() const { } void CzarChunkMap::organize() { - auto chunksSortedBySize = make_shared(); //&&&calcChunkMap(*chunkMapPtr, *chunksSortedBySize); @@ -385,8 +381,8 @@ void CzarChunkMap::organize() { continue; // maybe the next one will be okay. } LOGS(_log, LOG_LVL_DEBUG, - __func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize - << " smallest=" << smallest); + __func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize + << " smallest=" << smallest); if (wkrData->_sharedScanTotalSize < smallest) { smallestWkr = wkrData; smallest = smallestWkr->_sharedScanTotalSize; @@ -394,13 +390,13 @@ void CzarChunkMap::organize() { } if (smallestWkr == nullptr) { throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" + - to_string(chunkData->_chunkId)); + to_string(chunkData->_chunkId)); } smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData; smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes; chunkData->_primaryScanWorker = smallestWkr; LOGS(_log, LOG_LVL_DEBUG, - " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId); + " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId); } } @@ -436,8 +432,6 @@ string CzarChunkMap::WorkerChunksData::dump() const { return os.str(); } - - CzarFamilyMap::CzarFamilyMap(std::shared_ptr const& qmeta) : _qmeta(qmeta) { try { auto mapsSet = _read(); @@ -469,8 +463,8 @@ bool CzarFamilyMap::_read() { if (_lastUpdateTime >= qChunkMap.updateTime) { LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " no need to read " - << util::TimeUtils::timePointToDateTimeString(_lastUpdateTime) - << " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime)); + << util::TimeUtils::timePointToDateTimeString(_lastUpdateTime) + << " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime)); return false; } @@ -501,10 +495,12 @@ std::shared_ptr CzarFamilyMap::makeNewMaps( LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& c " << workerId << " dbs.sz=" << dbs.size()); // Databases -> Tables map for (auto const& [dbName, tables] : dbs) { - LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& d " << dbName << " tbls.sz=" << tables.size() ); + LOGS(_log, LOG_LVL_WARN, + "CzarFamilyMap::makeNewMaps &&& d " << dbName << " tbls.sz=" << tables.size()); // Tables -> Chunks map for (auto const& [tableName, chunks] : tables) { - LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& e " << tableName << " chunks.sz=" << chunks.size()); + LOGS(_log, LOG_LVL_WARN, + "CzarFamilyMap::makeNewMaps &&& e " << tableName << " chunks.sz=" << chunks.size()); // vector of ChunkInfo for (qmeta::QMetaChunkMap::ChunkInfo const& chunkInfo : chunks) { LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& f"); @@ -542,7 +538,7 @@ std::shared_ptr CzarFamilyMap::makeNewMaps( // this needs to be done for each CzarChunkMap in the family map. LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& i"); - for(auto&& [familyName, chunkMapPtr] : *newFamilyMap) { + for (auto&& [familyName, chunkMapPtr] : *newFamilyMap) { LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& j"); LOGS(_log, LOG_LVL_DEBUG, "CzarFamilyMap::makeNewMaps working on " << familyName); LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& k"); @@ -556,13 +552,15 @@ std::shared_ptr CzarFamilyMap::makeNewMaps( } void CzarFamilyMap::insertIntoMaps(std::shared_ptr const& newFamilyMap, string const& workerId, - string const& dbName, string const& tableName, int64_t chunkIdNum, - CzarChunkMap::SizeT sz) { + string const& dbName, string const& tableName, int64_t chunkIdNum, + CzarChunkMap::SizeT sz) { LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::insertIntoMaps &&& a"); // Get the CzarChunkMap for this family auto familyName = getFamilyNameFromDbName(dbName); LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::insertIntoMaps &&& b"); - LOGS(_log, LOG_LVL_INFO, cName(__func__) << " familyInsrt{w=" << workerId << " fN=" << familyName << " dbN=" << dbName << " tblN=" << tableName << " chunk=" << chunkIdNum << " sz=" << sz << "}"); + LOGS(_log, LOG_LVL_INFO, + cName(__func__) << " familyInsrt{w=" << workerId << " fN=" << familyName << " dbN=" << dbName + << " tblN=" << tableName << " chunk=" << chunkIdNum << " sz=" << sz << "}"); auto& nfMap = *newFamilyMap; CzarChunkMap::Ptr czarChunkMap; LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::insertIntoMaps &&& c"); diff --git a/src/czar/CzarChunkMap.h b/src/czar/CzarChunkMap.h index 54057e836..de9f8a609 100644 --- a/src/czar/CzarChunkMap.h +++ b/src/czar/CzarChunkMap.h @@ -91,7 +91,7 @@ class CzarChunkMap { CzarChunkMap(CzarChunkMap const&) = delete; CzarChunkMap& operator=(CzarChunkMap const&) = delete; - //static Ptr create(std::shared_ptr const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); } + // static Ptr create(std::shared_ptr const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); } static Ptr create() { return Ptr(new CzarChunkMap()); } ~CzarChunkMap(); @@ -196,7 +196,7 @@ class CzarChunkMap { /// @param `sz` - size in bytes of the table being inserted. static void insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, std::string const& workerId, std::string const& dbName, std::string const& tableName, - int64_t chunkIdNum, SizeT sz); /// &&& delete + int64_t chunkIdNum, SizeT sz); /// &&& delete /// Calculate the total bytes in each chunk and then sort the resulting ChunkVector by chunk size, /// descending. @@ -204,15 +204,16 @@ class CzarChunkMap { /// Make new ChunkMap and WorkerChunkMap from the data in `qChunkMap`. static std::pair, std::shared_ptr> - makeNewMapsOld(qmeta::QMetaChunkMap const& qChunkMap); //&&& delete + makeNewMapsOld(qmeta::QMetaChunkMap const& qChunkMap); //&&& delete /// Verify that all chunks belong to at least one worker and that all chunks are represented in shared /// scans. /// @throws ChunkMapException - static void verifyOld(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap); // &&& delete + static void verifyOld(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap); // &&& delete void verify(); - //void setMaps(std::shared_ptr const& chunkMapPtr, std::shared_ptr const& wcMapPtr); // &&& delete + // void setMaps(std::shared_ptr const& chunkMapPtr, std::shared_ptr const& + // wcMapPtr); // &&& delete static std::string dumpChunkMap(ChunkMap const& chunkMap); @@ -242,14 +243,12 @@ class CzarChunkMap { /// Return shared pointers to `_chunkMap` and `_workerChunkMap`, which should be held until /// finished with the data. - std::pair, - std::shared_ptr> + std::pair, std::shared_ptr> _getMaps() const { std::lock_guard lck(_mapMtx); return {_chunkMap, _workerChunkMap}; } - /// Read the json worker list from the database and update the maps if there's a new /// version since the `_lastUpdateTime`. /// @throws `qmeta::QMetaError` @@ -269,7 +268,7 @@ class CzarChunkMap { /// The last time the maps were updated with information from the replicator. TIMEPOINT _lastUpdateTime; // initialized to 0; */ - mutable std::mutex _mapMtx; ///< protects _workerChunkMap, _chunkMap (&&& still needed???) + mutable std::mutex _mapMtx; ///< protects _workerChunkMap, _chunkMap (&&& still needed???) friend CzarFamilyMap; }; @@ -312,30 +311,30 @@ class CzarFamilyMap { return _getChunkMap(familyName); } - /// &&& doc bool read(); /// &&& doc /// Make new ChunkMap and WorkerChunkMap from the data in `qChunkMap`. - //&&&static std::pair, std::shared_ptr> + //&&&static std::pair, + //std::shared_ptr> std::shared_ptr makeNewMaps(qmeta::QMetaChunkMap const& qChunkMap); // &&& doc - void insertIntoMaps(std::shared_ptr const& newFamilyMap, std::string const& workerId, std::string const& dbName, std::string const& tableName, int64_t chunkIdNum, CzarChunkMap::SizeT sz); + void insertIntoMaps(std::shared_ptr const& newFamilyMap, std::string const& workerId, + std::string const& dbName, std::string const& tableName, int64_t chunkIdNum, + CzarChunkMap::SizeT sz); // &&& static void verify(std::shared_ptr const& familyMap); private: - /// Try to `_read` values for maps from `qmeta`. CzarFamilyMap(std::shared_ptr const& qmeta); /// &&& doc bool _read(); - /// Return the chunk map for the `familyName` CzarChunkMap::Ptr _getChunkMap(std::string const& familyName) const { std::lock_guard familyLock(_familyMapMtx); @@ -352,7 +351,7 @@ class CzarFamilyMap { TIMEPOINT _lastUpdateTime; // initialized to 0; std::shared_ptr _familyMap{new FamilyMapType()}; - mutable std::mutex _familyMapMtx; ///< protects _familyMap, _timeStamp, and _qmeta. + mutable std::mutex _familyMapMtx; ///< protects _familyMap, _timeStamp, and _qmeta. }; } // namespace lsst::qserv::czar diff --git a/src/czar/testCzar.cc b/src/czar/testCzar.cc index 9ab805282..96ee576d4 100644 --- a/src/czar/testCzar.cc +++ b/src/czar/testCzar.cc @@ -203,7 +203,6 @@ BOOST_AUTO_TEST_CASE(CzarChunkMap) { auto dbToFamily = make_shared(); czar::CzarFamilyMap czFamMap(dbToFamily); - auto jsTest1 = nlohmann::json::parse(test1); qmeta::QMetaChunkMap qChunkMap1 = convertJsonToChunkMap(jsTest1); //&&&auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(qChunkMap1); diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 05dc89b87..9d005ac30 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -102,7 +102,8 @@ namespace lsst::qserv::qdisp { /* &&& mutex Executive::_executiveMapMtx; ///< protects _executiveMap -map> Executive::_executiveMap; ///< Map of executives for queries in progress. +map> Executive::_executiveMap; ///< Map of executives for queries in +progress. */ //////////////////////////////////////////////////////////////////////// @@ -304,11 +305,10 @@ void Executive::queueFileCollect(PriorityCommand::Ptr const& cmd) { LOGS(_log, LOG_LVL_WARN, "&&& Executive::queueFileCollect end"); } - void Executive::runUberJob(std::shared_ptr const& uberJob) { LOGS(_log, LOG_LVL_WARN, "&&& Executive::runUberJob start"); - bool const useqdisppool = true; /// &&& delete + bool const useqdisppool = true; /// &&& delete if (useqdisppool) { auto runUberJobFunc = [uberJob](util::CmdData*) { LOGS(_log, LOG_LVL_WARN, "&&&uj Executive::runUberJob::runUberJobFunc a"); @@ -454,7 +454,8 @@ void Executive::addMultiError(int errorCode, std::string const& errorMsg, int er { lock_guard lock(_errorsMutex); _multiError.push_back(err); - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) + " multiError:" << _multiError.size() << ":" << _multiError); + LOGS(_log, LOG_LVL_DEBUG, + cName(__func__) + " multiError:" << _multiError.size() << ":" << _multiError); } } @@ -618,7 +619,9 @@ void Executive::_squashSuperfluous() { } void Executive::sendWorkerCancelMsg(bool deleteResults) { - LOGS(_log, LOG_LVL_ERROR, "&&& NEED CODE Executive::sendWorkerCancelMsg to send messages to workers to cancel this czarId + queryId."); + LOGS(_log, LOG_LVL_ERROR, + "&&& NEED CODE Executive::sendWorkerCancelMsg to send messages to workers to cancel this czarId + " + "queryId."); } int Executive::getNumInflight() const { diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index 73c4ca43d..3930d7df8 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -122,9 +122,7 @@ class Executive : public std::enable_shared_from_this { static Ptr getExecutiveFromMap(QueryId qId); */ - std::string cName(const char* funcName="") { - return std::string("Executive::") + funcName; - } + std::string cName(const char* funcName = "") { return std::string("Executive::") + funcName; } /// &&&uj doc void setUserQuerySelect(std::shared_ptr const& uqs) { _userQuerySelect = uqs; } @@ -174,7 +172,7 @@ class Executive : public std::enable_shared_from_this { //&&&void setTmpTableNameGenerator(std::shared_ptr const& ttn) { _ttn = ttn; } //&&&void setInfileMerger(std::shared_ptr infileMerger) { _infileMerger = - //infileMerger; } + // infileMerger; } QueryId getId() const { return _id; } std::string const& getIdStr() const { return _idStr; } @@ -237,7 +235,7 @@ class Executive : public std::enable_shared_from_this { // The below value should probably be based on the user query, with longer sleeps for slower queries. int getAttemptSleepSeconds() const { return 15; } // As above or until added to config file. - int getMaxAttempts() const { return 5; } // Should be set by config + int getMaxAttempts() const { return 5; } // Should be set by config /// Calling this indicates the executive is ready to create and execute UberJobs. void setReadyToExecute() { _readyToExecute = true; } @@ -362,7 +360,6 @@ class Executive : public std::enable_shared_from_this { /// Flag that is set to true when ready to create and run UberJobs. std::atomic _readyToExecute{false}; - }; /// &&&uj MarkCompleteFunc is not needed with uberjobs. diff --git a/src/qdisp/JobDescription.cc b/src/qdisp/JobDescription.cc index fd3b64c16..de8db6044 100644 --- a/src/qdisp/JobDescription.cc +++ b/src/qdisp/JobDescription.cc @@ -65,7 +65,7 @@ JobDescription::JobDescription(qmeta::CzarId czarId, QueryId qId, JobId jobId, R _chunkResultName(chunkResultName), _mock(mock) {} -bool JobDescription::incrAttemptCountScrubResults() { // &&& to be deleted +bool JobDescription::incrAttemptCountScrubResults() { // &&& to be deleted if (_attemptCount >= 0) { _respHandler->prepScrubResults(_jobId, _attemptCount); // Registers the job-attempt as invalid } @@ -79,7 +79,6 @@ bool JobDescription::incrAttemptCountScrubResults() { // &&& to be deleted } bool JobDescription::incrAttemptCountScrubResultsJson(std::shared_ptr const& exec, bool increase) { - if (increase) { ++_attemptCount; } @@ -92,8 +91,12 @@ bool JobDescription::incrAttemptCountScrubResultsJson(std::shared_ptr int maxAttempts = exec->getMaxAttempts(); LOGS(_log, LOG_LVL_INFO, "JoQDescription::" << __func__ << " attempts=" << _attemptCount); if (_attemptCount > maxAttempts) { - LOGS(_log, LOG_LVL_ERROR, "JoQDescription::" << __func__ << " attempts(" << _attemptCount << ") > maxAttempts(" << maxAttempts << ") cancelling"); - exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "max attempts reached " + to_string(_attemptCount) + " " + _qIdStr, util::ErrorCode::INTERNAL); + LOGS(_log, LOG_LVL_ERROR, + "JoQDescription::" << __func__ << " attempts(" << _attemptCount << ") > maxAttempts(" + << maxAttempts << ") cancelling"); + exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, + "max attempts reached " + to_string(_attemptCount) + " " + _qIdStr, + util::ErrorCode::INTERNAL); exec->squash(); return false; } diff --git a/src/qdisp/JobDescription.h b/src/qdisp/JobDescription.h index 106e4b9ed..88aa48c3b 100644 --- a/src/qdisp/JobDescription.h +++ b/src/qdisp/JobDescription.h @@ -90,10 +90,10 @@ class JobDescription { /// @returns true when _attemptCount is incremented correctly and the payload is built. /// If the starting value of _attemptCount was greater than or equal to zero, that /// attempt is scrubbed from the result table. - bool incrAttemptCountScrubResults(); // &&&uj - to be deleted + bool incrAttemptCountScrubResults(); // &&&uj - to be deleted /// doc &&&uj - scrubbing results probably unneeded with uj. This should be renamed. bool incrAttemptCountScrubResultsJson(std::shared_ptr const& exec, bool increase); - bool verifyPayload() const; ///< @return true if the payload is acceptable to protobufs. + bool verifyPayload() const; ///< @return true if the payload is acceptable to protobufs. //&&&bool fillTaskMsg(proto::TaskMsg* tMsg); //&&&uj diff --git a/src/qdisp/JobQuery.cc b/src/qdisp/JobQuery.cc index 0ba2b3258..5ebc7c6d2 100644 --- a/src/qdisp/JobQuery.cc +++ b/src/qdisp/JobQuery.cc @@ -60,7 +60,7 @@ JobQuery::JobQuery(Executive::Ptr const& executive, JobDescription::Ptr const& j JobQuery::~JobQuery() { LOGS(_log, LOG_LVL_DEBUG, "~JobQuery"); - LOGS(_log, LOG_LVL_WARN, "~JobQuery QID=" <<_idStr); + LOGS(_log, LOG_LVL_WARN, "~JobQuery QID=" << _idStr); } /** Attempt to run the job on a worker. diff --git a/src/qdisp/UberJob.cc b/src/qdisp/UberJob.cc index 3a8bcc896..5aae13d33 100644 --- a/src/qdisp/UberJob.cc +++ b/src/qdisp/UberJob.cc @@ -146,7 +146,8 @@ bool UberJob::runUberJob() { auto const description = jbPtr->getDescription(); LOGS(_log, LOG_LVL_WARN, getIdStr() << "&&& UberJob ::runUberJob() f1a"); if (description == nullptr) { - throw util::Bug(ERR_LOC, cName(__func__) + " &&&uj description=null for job=" + jbPtr->getIdStr()); + throw util::Bug(ERR_LOC, + cName(__func__) + " &&&uj description=null for job=" + jbPtr->getIdStr()); } auto const jsForWorker = jbPtr->getDescription()->getJsForWorker(); LOGS(_log, LOG_LVL_WARN, getIdStr() << "&&& UberJob ::runUberJob() f1b"); @@ -164,7 +165,8 @@ bool UberJob::runUberJob() { LOGS(_log, LOG_LVL_WARN, __func__ << " &&&REQ " << request); string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; LOGS(_log, LOG_LVL_TRACE, - cName(__func__) << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]); + cName(__func__) << " czarPost url=" << url << " request=" << request.dump() + << " headers=" << headers[0]); http::Client client(method, url, request.dump(), headers); bool transmitSuccess = false; string exceptionWhat; @@ -186,7 +188,7 @@ bool UberJob::runUberJob() { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " transmit failure, try to send jobs elsewhere"); _unassignJobs(); // locks _jobsMtx setStatusIfOk(qmeta::JobStatus::RESPONSE_ERROR, - cName(__func__) + " not transmitSuccess " + exceptionWhat); + cName(__func__) + " not transmitSuccess " + exceptionWhat); } else { LOGS(_log, LOG_LVL_WARN, "&&&uj UberJob::runUberJob() register all jobs as transmitted to worker"); @@ -215,20 +217,21 @@ void UberJob::_unassignJobs() { string jid = job->getIdStr(); if (!job->unassignFromUberJob(getJobId())) { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " could not unassign job=" << jid << " cancelling"); - exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "unable to re-assign " + jid, util::ErrorCode::INTERNAL); + exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "unable to re-assign " + jid, + util::ErrorCode::INTERNAL); exec->squash(); return; } /* &&& auto attempts = job->getAttemptCount(); if (attempts > maxAttempts) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " job=" << jid << " attempts=" << attempts << " maxAttempts reached, cancelling"); - exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "max attempts reached " + to_string(attempts) + " job=" + jid, util::ErrorCode::INTERNAL); - exec->squash(); - return; + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " job=" << jid << " attempts=" << attempts << " + maxAttempts reached, cancelling"); exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "max attempts + reached " + to_string(attempts) + " job=" + jid, util::ErrorCode::INTERNAL); exec->squash(); return; } */ - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " job=" << jid << " attempts=" << job->getAttemptCount()); + LOGS(_log, LOG_LVL_DEBUG, + cName(__func__) << " job=" << jid << " attempts=" << job->getAttemptCount()); } _jobs.clear(); bool const setFlag = true; @@ -260,16 +263,16 @@ bool UberJob::_setStatusIfOk(qmeta::JobStatus::State newState, string const& msg // has already done, so doing it a second time would be an error. if (newState <= currentState) { LOGS(_log, LOG_LVL_WARN, - cName(__func__) << " could not change from state=" - << _jobStatus->stateStr(currentState) << " to " << _jobStatus->stateStr(newState)); + cName(__func__) << " could not change from state=" << _jobStatus->stateStr(currentState) + << " to " << _jobStatus->stateStr(newState)); return false; } // Overwriting errors is probably not a good idea. if (currentState >= qmeta::JobStatus::CANCEL && currentState < qmeta::JobStatus::COMPLETE) { LOGS(_log, LOG_LVL_WARN, - cName(__func__) << " already error current=" - << _jobStatus->stateStr(currentState) << " new=" << _jobStatus->stateStr(newState)); + cName(__func__) << " already error current=" << _jobStatus->stateStr(currentState) + << " new=" << _jobStatus->stateStr(newState)); return false; } @@ -311,9 +314,9 @@ json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_ LOGS(_log, LOG_LVL_WARN, "&&&uj UberJob::importResultFile a"); LOGS(_log, LOG_LVL_WARN, cName(__func__) << "&&&uj fileUrl=" << fileUrl << " rowCount=" << rowCount - << " fileSize=" << fileSize); - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " fileUrl=" << fileUrl - << " rowCount=" << rowCount << " fileSize=" << fileSize); + << " fileSize=" << fileSize); + LOGS(_log, LOG_LVL_DEBUG, + cName(__func__) << " fileUrl=" << fileUrl << " rowCount=" << rowCount << " fileSize=" << fileSize); if (isQueryCancelled()) { LOGS(_log, LOG_LVL_WARN, "UberJob::importResultFile import job was cancelled."); @@ -344,7 +347,8 @@ json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_ bool const statusSet = setStatusIfOk(qmeta::JobStatus::RESPONSE_READY, getIdStr() + " " + fileUrl); if (!statusSet) { - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&&uj setStatusFail could not set status to RESPONSE_READY"); + LOGS(_log, LOG_LVL_WARN, + cName(__func__) << " &&&uj setStatusFail could not set status to RESPONSE_READY"); return _importResultError(false, "setStatusFail", "could not set status to RESPONSE_READY"); } @@ -357,7 +361,8 @@ json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_ LOGS(_log, LOG_LVL_WARN, "&&&uj UberJob::importResultFile::fileCollectFunc a"); auto ujPtr = ujThis.lock(); if (ujPtr == nullptr) { - LOGS(_log, LOG_LVL_DEBUG, "UberJob::importResultFile::fileCollectFunction uberjob ptr is null " << fileUrl); + LOGS(_log, LOG_LVL_DEBUG, + "UberJob::importResultFile::fileCollectFunction uberjob ptr is null " << fileUrl); return; } uint64_t resultRows = 0; @@ -407,7 +412,7 @@ json UberJob::workerError(int errorCode, string const& errorMsg) { if ((dataIgnored - 1) % 1000 == 0) { LOGS(_log, LOG_LVL_INFO, cName(__func__) << " ignoring, enough rows already " - << "dataIgnored=" << dataIgnored); + << "dataIgnored=" << dataIgnored); } return _workerErrorFinish(keepData, "none", "limitRowComplete"); } @@ -421,7 +426,7 @@ json UberJob::workerError(int errorCode, string const& errorMsg) { // TODO:UJ see if recoverable errors can be detected on the workers, or // maybe allow a single retry before sending the error back to the user? bool recoverableError = false; - recoverableError = true; //&&& delete after testing &&&&&&& + recoverableError = true; //&&& delete after testing &&&&&&& if (recoverableError) { // The czar should have new maps before the the new UberJob(s) for // these Jobs are created. (see Czar::_monitor) @@ -446,8 +451,8 @@ json UberJob::_importResultError(bool shouldCancel, string const& errorType, str auto exec = _executive.lock(); if (exec != nullptr) { LOGS(_log, LOG_LVL_ERROR, - cName(__func__) << " shouldCancel=" << shouldCancel - << " errorType=" << errorType << " " << note); + cName(__func__) << " shouldCancel=" << shouldCancel << " errorType=" << errorType << " " + << note); if (shouldCancel) { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " failing jobs"); callMarkCompleteFunc(false); // all jobs failed, no retry @@ -464,15 +469,15 @@ json UberJob::_importResultError(bool shouldCancel, string const& errorType, str } } else { LOGS(_log, LOG_LVL_INFO, - cName(__func__) << " already cancelled shouldCancel=" - << shouldCancel << " errorType=" << errorType << " " << note); + cName(__func__) << " already cancelled shouldCancel=" << shouldCancel + << " errorType=" << errorType << " " << note); } return jsRet; } nlohmann::json UberJob::_importResultFinish(uint64_t resultRows) { LOGS(_log, LOG_LVL_WARN, "&&&uj UberJob::_importResultFinish a"); - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << "&&&uj start"); // &&& keep + LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << "&&&uj start"); // &&& keep /// If this is called, the file has been collected and the worker should delete it /// /// This function should call markComplete for all jobs in the uberjob diff --git a/src/qdisp/UberJob.h b/src/qdisp/UberJob.h index 6942b3b75..ad25e645a 100644 --- a/src/qdisp/UberJob.h +++ b/src/qdisp/UberJob.h @@ -60,9 +60,7 @@ class UberJob : public JobBase { bool addJob(std::shared_ptr const& job); bool runUberJob(); - std::string cName(const char* funcN) const { - return std::string("UberJob::") + funcN + " " + getIdStr(); - } + std::string cName(const char* funcN) const { return std::string("UberJob::") + funcN + " " + getIdStr(); } QueryId getQueryId() const override { return _queryId; } // TODO:UJ relocate to JobBase UberJobId getJobId() const override { return _uberJobId; } // &&&uj change name diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index 8b06966b4..8fff95cd5 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -883,7 +883,9 @@ QMetaChunkMap QMetaMysql::getChunkMap(chrono::time_point c unsigned int chunk = lsst::qserv::stoui(row[3]); size_t const size = stoull(row[4]); chunkMap.workers[worker][database][table].push_back(QMetaChunkMap::ChunkInfo{chunk, size}); - LOGS(_log, LOG_LVL_WARN, "&&& QMetaInsrt{worker=" << worker << " dbN=" << database << " tblN=" << table << " chunk=" << chunk << " sz=" << size); + LOGS(_log, LOG_LVL_WARN, + "&&& QMetaInsrt{worker=" << worker << " dbN=" << database << " tblN=" << table + << " chunk=" << chunk << " sz=" << size); } chunkMap.updateTime = updateTime; } catch (exception const& ex) { @@ -897,7 +899,8 @@ chrono::time_point QMetaMysql::_getChunkMapUpdateTime(lock sql::SqlErrorObject errObj; sql::SqlResults results; string const tableName = "chunkMapStatus"; - string const query = "SELECT TIME_TO_SEC(`update_time`) FROM `" + tableName + "` ORDER BY `update_time` DESC LIMIT 1"; + string const query = + "SELECT TIME_TO_SEC(`update_time`) FROM `" + tableName + "` ORDER BY `update_time` DESC LIMIT 1"; LOGS(_log, LOG_LVL_DEBUG, "Executing query: " << query); if (!_conn->runQuery(query, results, errObj)) { LOGS(_log, LOG_LVL_ERROR, "query failed: " << query); @@ -914,8 +917,8 @@ chrono::time_point QMetaMysql::_getChunkMapUpdateTime(lock throw ConsistencyError(ERR_LOC, "Too many rows in result set of query " + query); } try { - int j=0; // &&& del - for (auto const& str : updateTime) { // &&& del + int j = 0; // &&& del + for (auto const& str : updateTime) { // &&& del LOGS(_log, LOG_LVL_WARN, "&&& _updatetime j=" << j << " Insrt=" << str << " stol=" << stol(str)); ++j; } diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index ac933e076..a9a46ad2f 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -418,61 +418,61 @@ std::vector Task::createTasksForChunk( // See qproc::TaskMsgFactory::makeMsgJson for message construction. LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k1"); auto const jdCzarId = rbJobDesc.required("czarId"); - //LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k2"); + // LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k2"); auto const jdQueryId = rbJobDesc.required("queryId"); if (jdQueryId != qId) { throw TaskException(ERR_LOC, string("ujId=") + to_string(ujId) + " qId=" + to_string(qId) + " QueryId mismatch Job qId=" + to_string(jdQueryId)); } - //LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k3"); + // LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k3"); auto const jdJobId = rbJobDesc.required("jobId"); - //LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k4"); + // LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k4"); auto const jdAttemptCount = rbJobDesc.required("attemptCount"); - //LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k5"); + // LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k5"); auto const jdQuerySpecDb = rbJobDesc.required("querySpecDb"); - //LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k6"); + // LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k6"); auto const jdScanPriority = rbJobDesc.required("scanPriority"); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k7"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k7"); auto const jdScanInteractive = rbJobDesc.required("scanInteractive"); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k8"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k8"); auto const jdMaxTableSizeMb = rbJobDesc.required("maxTableSize"); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k9"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k9"); auto const jdChunkId = rbJobDesc.required("chunkId"); LOGS(_log, LOG_LVL_WARN, - funcN << "&&&SUBC jd cid=" << jdCzarId << " jdQId=" << jdQueryId << " jdJobId=" << jdJobId - << " jdAtt=" << jdAttemptCount << " jdQDb=" << jdQuerySpecDb - << " jdScanPri=" << jdScanPriority << " interactive=" << jdScanInteractive - << " maxTblSz=" << jdMaxTableSizeMb << " chunkId=" << jdChunkId); + funcN << "&&&SUBC jd cid=" << jdCzarId << " jdQId=" << jdQueryId << " jdJobId=" << jdJobId + << " jdAtt=" << jdAttemptCount << " jdQDb=" << jdQuerySpecDb + << " jdScanPri=" << jdScanPriority << " interactive=" << jdScanInteractive + << " maxTblSz=" << jdMaxTableSizeMb << " chunkId=" << jdChunkId); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10"); auto const jdQueryFragments = rbJobDesc.required("queryFragments"); int fragmentNumber = 0; //&&&uj should this be 1??? Is this at all useful? for (auto const& frag : jdQueryFragments) { vector fragSubQueries; vector fragSubchunkIds; vector fragSubTables; - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10a"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10a"); LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC frag=" << frag); http::RequestBody rbFrag(frag); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10b"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10b"); auto const& jsQueries = rbFrag.required("queries"); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10c"); - // &&&uj move to uberjob???, these should be the same for all jobs + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10c"); + // &&&uj move to uberjob???, these should be the same for all jobs for (auto const& subQ : jsQueries) { LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10c1"); http::RequestBody rbSubQ(subQ); auto const subQuery = rbSubQ.required("subQuery"); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10c2"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10c2"); LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC subQuery=" << subQuery); fragSubQueries.push_back(subQuery); } LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC k10d1"); auto const& resultTable = rbFrag.required("resultTable"); - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10d2"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10d2"); auto const& jsSubIds = rbFrag.required("subchunkIds"); LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC scId jsSubIds=" << jsSubIds); for (auto const& scId : jsSubIds) { - //LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10e1"); + // LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10e1"); LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC scId=" << scId); fragSubchunkIds.push_back(scId); } @@ -501,7 +501,9 @@ std::vector Task::createTasksForChunk( fragSubTables, fragSubchunkIds, sendChannel, resultsHttpPort)); // &&& change to make_shared vect.push_back(task); - LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC fragSubchunkIds.empty()==true vect.sz=" << vect.size() << " fragNum=" << fragmentNumber); + LOGS(_log, LOG_LVL_WARN, + funcN << "&&&SUBC fragSubchunkIds.empty()==true vect.sz=" << vect.size() + << " fragNum=" << fragmentNumber); } else { for (auto subchunkId : fragSubchunkIds) { bool const hasSubchunks = true; @@ -511,7 +513,9 @@ std::vector Task::createTasksForChunk( scanInteractive, maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, resultsHttpPort)); // &&& change to make_shared vect.push_back(task); - LOGS(_log, LOG_LVL_WARN, funcN << "&&&SUBC fragSubchunkIds.empty()==false vect.sz=" << vect.size() << " fragNum=" << fragmentNumber); + LOGS(_log, LOG_LVL_WARN, + funcN << "&&&SUBC fragSubchunkIds.empty()==false vect.sz=" << vect.size() + << " fragNum=" << fragmentNumber); } } } diff --git a/src/wbase/UberJobData.h b/src/wbase/UberJobData.h index c12ba2ff9..8316eadee 100644 --- a/src/wbase/UberJobData.h +++ b/src/wbase/UberJobData.h @@ -92,9 +92,7 @@ class UberJobData { bool responseError(util::MultiError& multiErr, std::shared_ptr const& task, bool cancelled); std::string getIdStr() const { return _idStr; } - std::string cName(std::string const& funcName) { - return "UberJobData::" + funcName + " " + getIdStr(); - } + std::string cName(std::string const& funcName) { return "UberJobData::" + funcName + " " + getIdStr(); } private: UberJobData(UberJobId uberJobId, std::string const& czarName, qmeta::CzarId czarId, std::string czarHost, diff --git a/src/xrdsvc/HttpWorkerCzarModule.cc b/src/xrdsvc/HttpWorkerCzarModule.cc index dcc95bee8..4514dfb7f 100644 --- a/src/xrdsvc/HttpWorkerCzarModule.cc +++ b/src/xrdsvc/HttpWorkerCzarModule.cc @@ -159,7 +159,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { ujData->setFileChannelShared(channelShared); QueryId jdQueryId = 0; - proto::ScanInfo scanInfo; // &&& + proto::ScanInfo scanInfo; // &&& bool scanInfoSet = false; bool jdScanInteractive = false; int jdMaxTableSize = 0; @@ -213,9 +213,10 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { auto const& tblScanRating = rbTbl.required("tblScanRating"); LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k11a5"); LOGS(_log, LOG_LVL_WARN, - __func__ << "&&&SUBC chunkSDb=" << chunkScanDb << " lockinmem=" << lockInMemory - << " csTble=" << chunkScanTable << " tblScanRating=" << tblScanRating); - scanInfo.infoTables.emplace_back(chunkScanDb, chunkScanTable, lockInMemory, tblScanRating); + __func__ << "&&&SUBC chunkSDb=" << chunkScanDb << " lockinmem=" << lockInMemory + << " csTble=" << chunkScanTable << " tblScanRating=" << tblScanRating); + scanInfo.infoTables.emplace_back(chunkScanDb, chunkScanTable, lockInMemory, + tblScanRating); scanInfoSet = true; } } @@ -223,15 +224,16 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { scanInfo.scanRating = jdScanPriority; LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC k10"); - } // create tasks and add them to ujData - auto chunkTasks = wbase::Task::createTasksForChunk( // &&& getting called twice when it should only be called once + auto chunkTasks = wbase::Task::createTasksForChunk( // &&& getting called twice when it should only + // be called once ujData, ujJobs, channelShared, scanInfo, jdScanInteractive, jdMaxTableSize, foreman()->chunkResourceMgr(), foreman()->mySqlConfig(), foreman()->sqlConnMgr(), foreman()->queriesAndChunks(), foreman()->httpPort()); - LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC chunkTasks.sz=" << chunkTasks.size() << " QID=" << jdQueryId); + LOGS(_log, LOG_LVL_WARN, + __func__ << "&&&SUBC chunkTasks.sz=" << chunkTasks.size() << " QID=" << jdQueryId); ujTasks.insert(ujTasks.end(), chunkTasks.begin(), chunkTasks.end()); LOGS(_log, LOG_LVL_WARN, __func__ << "&&&SUBC ujTasks.sz=" << ujTasks.size() << " QID=" << jdQueryId);