Skip to content

Commit

Permalink
Reformatted.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Jul 10, 2024
1 parent 05ca17b commit 5436802
Show file tree
Hide file tree
Showing 19 changed files with 140 additions and 134 deletions.
2 changes: 1 addition & 1 deletion src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ void MergingHandler::flushHttpError(int errorCode, std::string const& errorMsg,
_flushError(jq);
*/

if(!_errorSet.exchange(true)) {
if (!_errorSet.exchange(true)) {
_error = util::Error(errorCode, errorMsg, util::ErrorCode::MYSQLEXEC);
_setError(ccontrol::MSG_RESULT_ERROR, _error.getMsg());
}
Expand Down
2 changes: 1 addition & 1 deletion src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class MergingHandler : public qdisp::ResponseHandler {
std::shared_ptr<rproc::InfileMerger> _infileMerger; ///< Merging delegate
std::string _tableName; ///< Target table name
Error _error; ///< Error description
std::atomic<bool> _errorSet{false}; ///< &&& doc
std::atomic<bool> _errorSet{false}; ///< &&& doc
mutable std::mutex _errorMutex; ///< Protect readers from partial updates
bool _flushed{false}; ///< flushed to InfileMerger?
std::string _wName{"~"}; ///< worker name
Expand Down
5 changes: 3 additions & 2 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ void UserQuerySelect::submit() { //&&&uj
LOGS(_log, LOG_LVL_WARN, "&&& UserQuerySelect::submitNew g"); //&&&uj

LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
_executive->waitForAllJobsToStart(); // &&& this may not be needed anymore?
_executive->waitForAllJobsToStart(); // &&& this may not be needed anymore?

// we only care about per-chunk info for ASYNC queries
if (_async) {
Expand All @@ -377,7 +377,8 @@ void UserQuerySelect::buildAndSendUberJobs() {

// Ensure `_monitor()` doesn't do anything until everything is ready.
if (!_executive->isReadyToExecute()) {
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " executive isn't ready to generate UberJobs.");
LOGS(_log, LOG_LVL_DEBUG,
"UserQuerySelect::" << __func__ << " executive isn't ready to generate UberJobs.");
return;
}

Expand Down
6 changes: 2 additions & 4 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ Czar::Ptr Czar::createCzar(string const& configFilePath, string const& czarName)

void Czar::_monitor() {
LOGS(_log, LOG_LVL_WARN, "&&& Czar::_monitor a");
while(_monitorLoop) {
while (_monitorLoop) {
LOGS(_log, LOG_LVL_WARN, "&&& Czar::_monitor b");
this_thread::sleep_for(_monitorSleepTime);
LOGS(_log, LOG_LVL_WARN, "&&& Czar::_monitor c");


/// Check database for changes in worker chunk assignments and aliveness
//&&&_czarChunkMap->read();
_czarFamilyMap->read();
Expand All @@ -113,7 +112,7 @@ void Czar::_monitor() {
// Make a copy of all valid Executives
lock_guard<mutex> execMapLock(_executiveMapMtx);
auto iter = _executiveMap.begin();
while(iter != _executiveMap.end()) {
while (iter != _executiveMap.end()) {
auto qIdKey = iter->first;
shared_ptr<qdisp::Executive> exec = iter->second.lock();
if (exec == nullptr) {
Expand Down Expand Up @@ -246,7 +245,6 @@ Czar::Czar(string const& configFilePath, string const& czarName)
// Start the monitor thread
thread monitorThrd(&Czar::_monitor, this);
_monitorThrd = move(monitorThrd);

}

Czar::~Czar() {
Expand Down
12 changes: 5 additions & 7 deletions src/czar/Czar.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ namespace lsst::qserv::qdisp {
class Executive;
} // namespace lsst::qserv::qdisp


namespace lsst::qserv::czar {

class CzarFamilyMap;
Expand Down Expand Up @@ -209,14 +208,13 @@ class Czar {
/// Connection to the registry to register the czar and get worker contact information.
std::shared_ptr<CzarRegistry> _czarRegistry;

std::mutex _executiveMapMtx; ///< protects _executiveMap
std::map<QueryId, std::weak_ptr<qdisp::Executive>>
_executiveMap; ///< Map of executives for queries in progress.

std::mutex _executiveMapMtx; ///< protects _executiveMap
std::map<QueryId, std::weak_ptr<qdisp::Executive>> _executiveMap; ///< Map of executives for queries in progress.

std::thread _monitorThrd; ///< &&& doc
std::atomic<bool> _monitorLoop{true}; ///< &&& doc
std::thread _monitorThrd; ///< &&& doc
std::atomic<bool> _monitorLoop{true}; ///< &&& doc
std::chrono::milliseconds _monitorSleepTime{15000}; ///< Wait time between checks. &&& set from config

};

} // namespace lsst::qserv::czar
Expand Down
40 changes: 19 additions & 21 deletions src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(
}
*/

CzarChunkMap::CzarChunkMap() {
}
CzarChunkMap::CzarChunkMap() {}

CzarChunkMap::~CzarChunkMap() { LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap::~CzarChunkMap()"); }

Expand Down Expand Up @@ -106,8 +105,8 @@ bool CzarChunkMap::_readOld() {
}
*/

pair<shared_ptr<CzarChunkMap::ChunkMap>, shared_ptr<CzarChunkMap::WorkerChunkMap>> CzarChunkMap::makeNewMapsOld(
qmeta::QMetaChunkMap const& qChunkMap) {
pair<shared_ptr<CzarChunkMap::ChunkMap>, shared_ptr<CzarChunkMap::WorkerChunkMap>>
CzarChunkMap::makeNewMapsOld(qmeta::QMetaChunkMap const& qChunkMap) {
// Create new maps.
auto wcMapPtr = make_shared<WorkerChunkMap>();
auto chunkMapPtr = make_shared<ChunkMap>();
Expand Down Expand Up @@ -314,10 +313,8 @@ void CzarChunkMap::verify() {
// it's easier to isolate problems.
throw ChunkMapException(ERR_LOC, "verification failed with " + to_string(errorCount) + " errors");
}

}


string CzarChunkMap::dumpChunkMap(ChunkMap const& chunkMap) {
stringstream os;
os << "ChunkMap{";
Expand Down Expand Up @@ -362,7 +359,6 @@ CzarChunkMap::ChunkData::getWorkerHasThisMapCopy() const {
}

void CzarChunkMap::organize() {

auto chunksSortedBySize = make_shared<ChunkVector>();

//&&&calcChunkMap(*chunkMapPtr, *chunksSortedBySize);
Expand All @@ -385,22 +381,22 @@ void CzarChunkMap::organize() {
continue; // maybe the next one will be okay.
}
LOGS(_log, LOG_LVL_DEBUG,
__func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize
<< " smallest=" << smallest);
__func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize
<< " smallest=" << smallest);
if (wkrData->_sharedScanTotalSize < smallest) {
smallestWkr = wkrData;
smallest = smallestWkr->_sharedScanTotalSize;
}
}
if (smallestWkr == nullptr) {
throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" +
to_string(chunkData->_chunkId));
to_string(chunkData->_chunkId));
}
smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData;
smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes;
chunkData->_primaryScanWorker = smallestWkr;
LOGS(_log, LOG_LVL_DEBUG,
" chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
" chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
}
}

Expand Down Expand Up @@ -436,8 +432,6 @@ string CzarChunkMap::WorkerChunksData::dump() const {
return os.str();
}



CzarFamilyMap::CzarFamilyMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(qmeta) {
try {
auto mapsSet = _read();
Expand Down Expand Up @@ -469,8 +463,8 @@ bool CzarFamilyMap::_read() {
if (_lastUpdateTime >= qChunkMap.updateTime) {
LOGS(_log, LOG_LVL_DEBUG,
cName(__func__) << " no need to read "
<< util::TimeUtils::timePointToDateTimeString(_lastUpdateTime)
<< " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime));
<< util::TimeUtils::timePointToDateTimeString(_lastUpdateTime)
<< " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime));
return false;
}

Expand Down Expand Up @@ -501,10 +495,12 @@ std::shared_ptr<CzarFamilyMap::FamilyMapType> CzarFamilyMap::makeNewMaps(
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& c " << workerId << " dbs.sz=" << dbs.size());
// Databases -> Tables map
for (auto const& [dbName, tables] : dbs) {
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& d " << dbName << " tbls.sz=" << tables.size() );
LOGS(_log, LOG_LVL_WARN,
"CzarFamilyMap::makeNewMaps &&& d " << dbName << " tbls.sz=" << tables.size());
// Tables -> Chunks map
for (auto const& [tableName, chunks] : tables) {
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& e " << tableName << " chunks.sz=" << chunks.size());
LOGS(_log, LOG_LVL_WARN,
"CzarFamilyMap::makeNewMaps &&& e " << tableName << " chunks.sz=" << chunks.size());
// vector of ChunkInfo
for (qmeta::QMetaChunkMap::ChunkInfo const& chunkInfo : chunks) {
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& f");
Expand Down Expand Up @@ -542,7 +538,7 @@ std::shared_ptr<CzarFamilyMap::FamilyMapType> CzarFamilyMap::makeNewMaps(

// this needs to be done for each CzarChunkMap in the family map.
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& i");
for(auto&& [familyName, chunkMapPtr] : *newFamilyMap) {
for (auto&& [familyName, chunkMapPtr] : *newFamilyMap) {
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& j");
LOGS(_log, LOG_LVL_DEBUG, "CzarFamilyMap::makeNewMaps working on " << familyName);
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::makeNewMaps &&& k");
Expand All @@ -556,13 +552,15 @@ std::shared_ptr<CzarFamilyMap::FamilyMapType> CzarFamilyMap::makeNewMaps(
}

void CzarFamilyMap::insertIntoMaps(std::shared_ptr<FamilyMapType> const& newFamilyMap, string const& workerId,
string const& dbName, string const& tableName, int64_t chunkIdNum,
CzarChunkMap::SizeT sz) {
string const& dbName, string const& tableName, int64_t chunkIdNum,
CzarChunkMap::SizeT sz) {
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::insertIntoMaps &&& a");
// Get the CzarChunkMap for this family
auto familyName = getFamilyNameFromDbName(dbName);
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::insertIntoMaps &&& b");
LOGS(_log, LOG_LVL_INFO, cName(__func__) << " familyInsrt{w=" << workerId << " fN=" << familyName << " dbN=" << dbName << " tblN=" << tableName << " chunk=" << chunkIdNum << " sz=" << sz << "}");
LOGS(_log, LOG_LVL_INFO,
cName(__func__) << " familyInsrt{w=" << workerId << " fN=" << familyName << " dbN=" << dbName
<< " tblN=" << tableName << " chunk=" << chunkIdNum << " sz=" << sz << "}");
auto& nfMap = *newFamilyMap;
CzarChunkMap::Ptr czarChunkMap;
LOGS(_log, LOG_LVL_WARN, "CzarFamilyMap::insertIntoMaps &&& c");
Expand Down
29 changes: 14 additions & 15 deletions src/czar/CzarChunkMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class CzarChunkMap {
CzarChunkMap(CzarChunkMap const&) = delete;
CzarChunkMap& operator=(CzarChunkMap const&) = delete;

//static Ptr create(std::shared_ptr<qmeta::QMeta> const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); }
// static Ptr create(std::shared_ptr<qmeta::QMeta> const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); }
static Ptr create() { return Ptr(new CzarChunkMap()); }

~CzarChunkMap();
Expand Down Expand Up @@ -196,23 +196,24 @@ class CzarChunkMap {
/// @param `sz` - size in bytes of the table being inserted.
static void insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, std::string const& workerId,
std::string const& dbName, std::string const& tableName,
int64_t chunkIdNum, SizeT sz); /// &&& delete
int64_t chunkIdNum, SizeT sz); /// &&& delete

/// Calculate the total bytes in each chunk and then sort the resulting ChunkVector by chunk size,
/// descending.
static void calcChunkMap(ChunkMap const& chunkMap, ChunkVector& chunksSortedBySize);

/// Make new ChunkMap and WorkerChunkMap from the data in `qChunkMap`.
static std::pair<std::shared_ptr<CzarChunkMap::ChunkMap>, std::shared_ptr<CzarChunkMap::WorkerChunkMap>>
makeNewMapsOld(qmeta::QMetaChunkMap const& qChunkMap); //&&& delete
makeNewMapsOld(qmeta::QMetaChunkMap const& qChunkMap); //&&& delete

/// Verify that all chunks belong to at least one worker and that all chunks are represented in shared
/// scans.
/// @throws ChunkMapException
static void verifyOld(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap); // &&& delete
static void verifyOld(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap); // &&& delete
void verify();

//void setMaps(std::shared_ptr<ChunkMap> const& chunkMapPtr, std::shared_ptr<WorkerChunkMap> const& wcMapPtr); // &&& delete
// void setMaps(std::shared_ptr<ChunkMap> const& chunkMapPtr, std::shared_ptr<WorkerChunkMap> const&
// wcMapPtr); // &&& delete

static std::string dumpChunkMap(ChunkMap const& chunkMap);

Expand Down Expand Up @@ -242,14 +243,12 @@ class CzarChunkMap {

/// Return shared pointers to `_chunkMap` and `_workerChunkMap`, which should be held until
/// finished with the data.
std::pair<std::shared_ptr<CzarChunkMap::ChunkMap>,
std::shared_ptr<CzarChunkMap::WorkerChunkMap>>
std::pair<std::shared_ptr<CzarChunkMap::ChunkMap>, std::shared_ptr<CzarChunkMap::WorkerChunkMap>>
_getMaps() const {
std::lock_guard<std::mutex> lck(_mapMtx);
return {_chunkMap, _workerChunkMap};
}


/// Read the json worker list from the database and update the maps if there's a new
/// version since the `_lastUpdateTime`.
/// @throws `qmeta::QMetaError`
Expand All @@ -269,7 +268,7 @@ class CzarChunkMap {
/// The last time the maps were updated with information from the replicator.
TIMEPOINT _lastUpdateTime; // initialized to 0;
*/
mutable std::mutex _mapMtx; ///< protects _workerChunkMap, _chunkMap (&&& still needed???)
mutable std::mutex _mapMtx; ///< protects _workerChunkMap, _chunkMap (&&& still needed???)

friend CzarFamilyMap;
};
Expand Down Expand Up @@ -312,30 +311,30 @@ class CzarFamilyMap {
return _getChunkMap(familyName);
}


/// &&& doc
bool read();

/// &&& doc
/// Make new ChunkMap and WorkerChunkMap from the data in `qChunkMap`.
//&&&static std::pair<std::shared_ptr<CzarChunkMap::ChunkMap>, std::shared_ptr<CzarChunkMap::WorkerChunkMap>>
//&&&static std::pair<std::shared_ptr<CzarChunkMap::ChunkMap>,
//std::shared_ptr<CzarChunkMap::WorkerChunkMap>>
std::shared_ptr<FamilyMapType> makeNewMaps(qmeta::QMetaChunkMap const& qChunkMap);

// &&& doc
void insertIntoMaps(std::shared_ptr<FamilyMapType> const& newFamilyMap, std::string const& workerId, std::string const& dbName, std::string const& tableName, int64_t chunkIdNum, CzarChunkMap::SizeT sz);
void insertIntoMaps(std::shared_ptr<FamilyMapType> const& newFamilyMap, std::string const& workerId,
std::string const& dbName, std::string const& tableName, int64_t chunkIdNum,
CzarChunkMap::SizeT sz);

// &&&
static void verify(std::shared_ptr<FamilyMapType> const& familyMap);

private:

/// Try to `_read` values for maps from `qmeta`.
CzarFamilyMap(std::shared_ptr<qmeta::QMeta> const& qmeta);

/// &&& doc
bool _read();


/// Return the chunk map for the `familyName`
CzarChunkMap::Ptr _getChunkMap(std::string const& familyName) const {
std::lock_guard<std::mutex> familyLock(_familyMapMtx);
Expand All @@ -352,7 +351,7 @@ class CzarFamilyMap {
TIMEPOINT _lastUpdateTime; // initialized to 0;

std::shared_ptr<FamilyMapType const> _familyMap{new FamilyMapType()};
mutable std::mutex _familyMapMtx; ///< protects _familyMap, _timeStamp, and _qmeta.
mutable std::mutex _familyMapMtx; ///< protects _familyMap, _timeStamp, and _qmeta.
};

} // namespace lsst::qserv::czar
Expand Down
1 change: 0 additions & 1 deletion src/czar/testCzar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ BOOST_AUTO_TEST_CASE(CzarChunkMap) {
auto dbToFamily = make_shared<czar::CzarFamilyMap::DbNameToFamilyNameType>();
czar::CzarFamilyMap czFamMap(dbToFamily);


auto jsTest1 = nlohmann::json::parse(test1);
qmeta::QMetaChunkMap qChunkMap1 = convertJsonToChunkMap(jsTest1);
//&&&auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(qChunkMap1);
Expand Down
13 changes: 8 additions & 5 deletions src/qdisp/Executive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ namespace lsst::qserv::qdisp {

/* &&&
mutex Executive::_executiveMapMtx; ///< protects _executiveMap
map<QueryId, std::weak_ptr<Executive>> Executive::_executiveMap; ///< Map of executives for queries in progress.
map<QueryId, std::weak_ptr<Executive>> Executive::_executiveMap; ///< Map of executives for queries in
progress.
*/

////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -304,11 +305,10 @@ void Executive::queueFileCollect(PriorityCommand::Ptr const& cmd) {
LOGS(_log, LOG_LVL_WARN, "&&& Executive::queueFileCollect end");
}


void Executive::runUberJob(std::shared_ptr<UberJob> const& uberJob) {
LOGS(_log, LOG_LVL_WARN, "&&& Executive::runUberJob start");

bool const useqdisppool = true; /// &&& delete
bool const useqdisppool = true; /// &&& delete
if (useqdisppool) {
auto runUberJobFunc = [uberJob](util::CmdData*) {
LOGS(_log, LOG_LVL_WARN, "&&&uj Executive::runUberJob::runUberJobFunc a");
Expand Down Expand Up @@ -454,7 +454,8 @@ void Executive::addMultiError(int errorCode, std::string const& errorMsg, int er
{
lock_guard<mutex> lock(_errorsMutex);
_multiError.push_back(err);
LOGS(_log, LOG_LVL_DEBUG, cName(__func__) + " multiError:" << _multiError.size() << ":" << _multiError);
LOGS(_log, LOG_LVL_DEBUG,
cName(__func__) + " multiError:" << _multiError.size() << ":" << _multiError);
}
}

Expand Down Expand Up @@ -618,7 +619,9 @@ void Executive::_squashSuperfluous() {
}

void Executive::sendWorkerCancelMsg(bool deleteResults) {
LOGS(_log, LOG_LVL_ERROR, "&&& NEED CODE Executive::sendWorkerCancelMsg to send messages to workers to cancel this czarId + queryId.");
LOGS(_log, LOG_LVL_ERROR,
"&&& NEED CODE Executive::sendWorkerCancelMsg to send messages to workers to cancel this czarId + "
"queryId.");
}

int Executive::getNumInflight() const {
Expand Down
Loading

0 comments on commit 5436802

Please sign in to comment.