Commit 26d928f
Added code for testing.
jgates108 committed Jun 18, 2021
1 parent 0356e4f commit 26d928f
Showing 24 changed files with 325 additions and 95 deletions.
admin/templates/configuration/etc/log4cxx.czar.properties (4 changes: 3 additions & 1 deletion)
@@ -26,4 +26,6 @@ log4j.appender.FILE.layout.conversionPattern=%d{yyyy-MM-ddTHH:mm:ss.SSSZ} %X %-5
 #log4j.logger.lsst.qserv.qproc=DEBUG
 #log4j.logger.lsst.qserv.util=DEBUG
 #log4j.logger.lsst.qserv.qana=DEBUG
-log4j.logger.lsst.qserv.xrdssi.msgs=WARN
+#log4j.logger.lsst.qserv.xrdssi.msgs=WARN //&&&
+log4j.logger.lsst.qserv.xrdssi.msgs=DEBUG

admin/templates/configuration/etc/log4cxx.worker.properties (3 changes: 2 additions & 1 deletion)
@@ -9,4 +9,5 @@ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
 log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=[%d{yyyy-MM-ddTHH:mm:ss.SSSZ}] %X %-5p %c{2} (%F:%L) - %m%n
 
-log4j.logger.lsst.qserv.xrdssi.msgs=WARN
+#log4j.logger.lsst.qserv.xrdssi.msgs=WARN &&&
+log4j.logger.lsst.qserv.xrdssi.msgs=DEBUG
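
Both properties changes flip the lsst.qserv.xrdssi.msgs logger from WARN to DEBUG, so low-level xrdssi message traffic now reaches the configured appenders. A minimal sketch of the effect at a call site, using the LOG_GET/LOGS macros that appear elsewhere in this commit (the reportXrdMsg function is hypothetical):

    #include "lsst/log/Log.h"

    namespace {
    // Same logger name the properties files above configure.
    LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdssi.msgs");
    }

    // Hypothetical call site: under the old WARN threshold this DEBUG message
    // was filtered out; under the new DEBUG threshold it reaches the appenders.
    void reportXrdMsg(int bytes) {
        LOGS(_log, LOG_LVL_DEBUG, "xrdssi msg bytes=" << bytes);
    }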
admin/templates/configuration/etc/qserv-czar.cnf (6 changes: 4 additions & 2 deletions)
@@ -106,14 +106,16 @@ qMetaSecsBetweenChunkCompletionUpdates = 59
 # Please see qdisp/QdispPool.h QdispPool::QdispPool for more information
 [qdisppool]
 #size of the pool
-poolSize = 50
+#poolSize = 50
+poolSize = 1500
 # Low numbers are higher priority. Largest priority 3 creates 4 priority queues 0, 1, 2, 3
 # Must be greater than 0.
 largestPriority = 3
 # Maximum number of threads running for each queue. No spaces. Values separated by ':'
 # Using largestPriority = 2 and vectRunSizes = 3:5:8
 # queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8.
-vectRunSizes = 50:50:50:50
+#vectRunSizes = 50:50:50:50
+vectRunSizes = 1500:1500:1500:1500
 # Minimum number of threads running for each queue. No spaces. Values separated by ':'
 vectMinRunningSizes = 0:1:3:3

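With poolSize raised to 1500 and every queue's run size raised to 1500, any one priority queue may now occupy the entire pool. A standalone sketch of parsing the ':'-separated run-size format documented above (parseRunSizes is a hypothetical helper, not the actual QdispPool parser, and assumes well-formed input):

    #include <string>
    #include <vector>

    // Hypothetical helper: "1500:1500:1500:1500" -> {1500, 1500, 1500, 1500}.
    // Element i is the run size for priority queue i (queues 0..largestPriority).
    std::vector<int> parseRunSizes(std::string const& val) {
        std::vector<int> sizes;
        std::string::size_type start = 0;
        while (start <= val.size()) {
            auto colon = val.find(':', start);
            if (colon == std::string::npos) colon = val.size();
            sizes.push_back(std::stoi(val.substr(start, colon - start)));
            start = colon + 1;
        }
        return sizes;
    }
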
core/modules/ccontrol/UserQuerySelect.cc (6 changes: 5 additions & 1 deletion)
@@ -365,7 +365,7 @@ void UserQuerySelect::submit() {
     // &&& TODO:UJ skipping in proof of concept: get a list of all databases in jobs (all jobs should use the same databases). Use this to check for conflicts.
 
     // assign jobs to uberJobs
-    int maxChunksPerUber = 10;
+    int maxChunksPerUber = 50;
     // keep cycling through workers until no more chunks to place.
 
     /// make a map<worker, deque<chunkId>> that will be destroyed as chunks are checked/used
@@ -404,6 +404,7 @@ void UserQuerySelect::submit() {
         }
     }
 
+    //LOGS(_log, LOG_LVL_INFO, "&&& making UberJob " << uberResultName << " chunks=" << chunksInUber);
     if (chunksInUber > 0) {
         uberJobs.push_back(uJob);
     }
@@ -424,10 +425,13 @@ void UserQuerySelect::submit() {
     for (auto&& uJob:uberJobs) {
         uJob->runUberJob();
     }
+    LOGS(_log, LOG_LVL_INFO, "&&& All UberJobs sent.");
     // If any chunks in the query were not found on a worker's list, run them individually.
     for (auto& ciq:chunksInQuery) {
+        LOGS(_log, LOG_LVL_INFO, "&&& running remaining jobs ");
         qdisp::JobQuery* jqRaw = ciq.second;
         qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw);
+        LOGS(_log, LOG_LVL_INFO, "&&& running remaining jobs " << job->getIdStr());
         std::function<void(util::CmdData*)> funcBuildJob =
                 [this, job{move(job)}](util::CmdData*) { // references in captures cause races
             QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
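For orientation, the grouping submit() performs above can be reduced to a standalone sketch: each worker's chunk list is cut into batches of at most maxChunksPerUber (raised to 50 here), and each batch becomes one UberJob. makeUberJobs and the plain-int chunk model are hypothetical simplifications of the qdisp::UberJob machinery:

    #include <deque>
    #include <map>
    #include <string>
    #include <vector>

    // Hypothetical reduction of the chunk -> UberJob grouping: one inner vector
    // per UberJob, at most maxChunksPerUber chunks each, grouped per worker.
    std::map<std::string, std::vector<std::vector<int>>> makeUberJobs(
            std::map<std::string, std::deque<int>>& workerChunks, int maxChunksPerUber) {
        std::map<std::string, std::vector<std::vector<int>>> uberJobs;
        for (auto& [worker, chunks] : workerChunks) {
            while (!chunks.empty()) {
                std::vector<int> batch;
                while (!chunks.empty() && static_cast<int>(batch.size()) < maxChunksPerUber) {
                    batch.push_back(chunks.front());
                    chunks.pop_front();
                }
                uberJobs[worker].push_back(std::move(batch));  // becomes one UberJob
            }
        }
        return uberJobs;
    }

Chunks that appear on no worker's list fall through to the per-chunk path above, which is why the loop over chunksInQuery follows the UberJob launches.
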
core/modules/czar/WorkerResourceLists.cc (54 changes: 51 additions & 3 deletions)
@@ -26,6 +26,7 @@
 #include <fstream>
 #include <map>
 #include <set>
+#include <list>
 
 // Third-party headers

@@ -188,13 +189,15 @@ bool WorkerResourceLists::readIn(std::string const& fName) {
     map<string, deque<string>> resourceMap;
     if (fs.is_open()){
         string line;
+        map<string, set<int>> tmpMap;
         while(getline(fs, line)) {
             auto pos = line.find_first_of(" ");
             string wNameShort = line.substr(0, pos);
             string chunkIdStr = line.substr(pos+1);
             int chunkId = std::stoi(chunkIdStr);
-            LOGS(_log, LOG_LVL_INFO, "&&& line='" << line << "' name=" << wNameShort << " chunk=" << chunkIdStr << " c=" << chunkId);
+            //LOGS(_log, LOG_LVL_INFO, "&&& line='" << line << "' name=" << wNameShort << " chunk=" << chunkIdStr << " c=" << chunkId);
 
+            /* &&&
             // Avoid making duplicate chunk entries
             auto ret = foundChunks.insert(chunkId);
             bool elementWasInserted = ret.second;
@@ -203,15 +206,61 @@
             set<int>& chunkSet = workerChunkMap[wNameShort];
             chunkSet.insert(chunkId);
             }
+            */
+            // need to add entry to the worker
+            tmpMap[wNameShort].insert(chunkId);
 
         }
 
+        // Try to make a fairly even distribution across workers.
+        list<string> workerNames;
+        for (auto const& elem:tmpMap) {
+            string name = elem.first;
+            workerNames.push_back(name);
+        }
+
+        while (!workerNames.empty()) {
+            for (auto wIter = workerNames.begin(); wIter != workerNames.end();) {
+                auto wIterAdvanced = false;
+                string wName = *wIter;
+                set<int>& tmpChunkSet = tmpMap[wName];
+                set<int>& chunkSet = workerChunkMap[wName];
+                bool done = false;
+                auto cIter = tmpChunkSet.begin();
+                for (int j=0; j<10 && !done;) {
+                    if (cIter == tmpChunkSet.end()) {
+                        // no more chunks on this worker
+                        done = true;
+                        LOGS(_log, LOG_LVL_INFO, "&&& worker empty " << wName << " elemCount=" << chunkSet.size());
+                        auto wIterToDel = wIter++;
+                        wIterAdvanced = true;
+                        workerNames.erase(wIterToDel);
+                    } else {
+                        int chunkId = *cIter;
+                        auto ret = foundChunks.insert(chunkId);
+                        bool elementInserted = ret.second;
+                        if (elementInserted) {
+                            ++j;
+                            chunkSet.insert(chunkId);
+                        }
+                        auto cIterToDel = cIter++;
+                        tmpChunkSet.erase(cIterToDel);
+                    }
+                }
+                if (!wIterAdvanced) ++wIter;
+            }
+        }
 
         // At this point, there's a map of short worker names and integer chunkIds.
         // This needs to be turned into a map of sets of chunk resource names keyed by
         // worker resource name.
+        LOGS(_log, LOG_LVL_INFO, "&&& workerChunkMap sz=" << workerChunkMap.size());
         for (auto const& elem:workerChunkMap) {
             string shortName = elem.first;
             set<int> const& chunkInts = elem.second;
-            string workerResourceN = "/worker/worker-" + shortName;
+            string workerResourceN = "/worker/" + shortName;
             auto& chunkStrs = resourceMap[workerResourceN];
             for (int j:chunkInts) {
                 string chunkResourceN = "/chk/wise_01/" + to_string(j);
@@ -235,7 +284,6 @@ bool WorkerResourceLists::readIn(std::string const& fName) {
         deque<string> const& dq = elem.second;
         for(auto const& res:dq) {
             wr->insert(res);
-            LOGS(_log, LOG_LVL_INFO, "&&& wName=" << wName << " res=" << res);
         }
     }
 }
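The new readIn() logic parses lines of the form "<workerName> <chunkId>" and then deals chunks out in rounds of up to ten per worker, so a chunk replicated on several workers is claimed by whichever worker reaches it first and the assignment stays roughly even. A standalone sketch of that pass (distribute is a hypothetical reduction; the real code accumulates into workerChunkMap and then builds "/worker/<name>" and "/chk/wise_01/<chunkId>" resource names):

    #include <list>
    #include <map>
    #include <set>
    #include <string>

    // Hypothetical reduction of the even-distribution pass: visit workers
    // round-robin, each claiming up to 10 not-yet-claimed chunks per visit.
    std::map<std::string, std::set<int>> distribute(std::map<std::string, std::set<int>> pending) {
        std::map<std::string, std::set<int>> assigned;
        std::set<int> claimed;  // chunks already handed to some worker
        std::list<std::string> workers;
        for (auto const& elem : pending) workers.push_back(elem.first);
        while (!workers.empty()) {
            for (auto wIt = workers.begin(); wIt != workers.end();) {
                std::set<int>& src = pending[*wIt];
                int taken = 0;
                while (taken < 10 && !src.empty()) {
                    int chunk = *src.begin();
                    src.erase(src.begin());
                    if (claimed.insert(chunk).second) {  // first claim wins
                        assigned[*wIt].insert(chunk);
                        ++taken;
                    }
                }
                if (src.empty()) {
                    wIt = workers.erase(wIt);  // this worker's list is exhausted
                } else {
                    ++wIt;
                }
            }
        }
        return assigned;
    }
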
core/modules/proto/worker.proto (4 changes: 3 additions & 1 deletion)
@@ -83,7 +83,9 @@ message TaskMsg {
 message UberJobMsg {
   required uint64 queryid = 1;
   required uint32 czarid = 2;
-  repeated TaskMsg taskMsgs = 3;
+  required uint32 uberjobid = 3;
+  repeated TaskMsg taskmsgs = 4;
+  required uint32 magicnumber = 5;
 }
 
 // Result message received from worker
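The UberJobMsg now carries its own id and a magic number, and the repeated field was renamed to lower-case taskmsgs. A sketch of filling the revised message with the standard protoc-generated C++ accessors (the header path, namespace, buildUberJobMsg wrapper, and magic-number value are assumptions, not code from this commit):

    #include <cstdint>
    #include <string>

    #include "proto/worker.pb.h"  // assumed protoc output for worker.proto

    // Hypothetical builder: setters follow protoc's generated naming,
    // set_<lowercased field name> and add_<repeated field name>.
    std::string buildUberJobMsg(std::uint64_t queryId, std::uint32_t czarId,
                                std::uint32_t uberJobId) {
        lsst::qserv::proto::UberJobMsg msg;
        msg.set_queryid(queryId);
        msg.set_czarid(czarId);
        msg.set_uberjobid(uberJobId);  // new in this commit
        msg.set_magicnumber(42);       // new in this commit; 42 is a placeholder
        msg.add_taskmsgs();            // renamed from taskMsgs; one empty TaskMsg
        return msg.SerializeAsString();
    }
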
core/modules/qdisp/Executive.cc (6 changes: 3 additions & 3 deletions)
@@ -292,7 +292,7 @@ bool Executive::join() {
 }
 
 void Executive::markCompleted(int jobId, bool success) {
-    LOGS(_log, LOG_LVL_WARN, "&&& Executive::markCompleted jobId=" << jobId << " success=" << success);
+    //LOGS(_log, LOG_LVL_WARN, "&&& Executive::markCompleted jobId=" << jobId << " success=" << success);
     ResponseHandler::Error err;
     string idStr = QueryIdHelper::makeIdStr(_id, jobId);
     LOGS(_log, LOG_LVL_DEBUG, "Executive::markCompleted " << success);
@@ -571,7 +571,7 @@ void Executive::_waitAllUntilEmpty() {
 void Executive::_addToChunkJobMap(JobQuery::Ptr const& job) {
     int chunkId = job->getDescription()->resource().chunk();
     auto entry = pair<ChunkIdType, JobQuery*>(chunkId, job.get());
-    LOGS(_log, LOG_LVL_WARN, "&&& _addToChunkJobMap chunkId=" << chunkId);
+    //LOGS(_log, LOG_LVL_WARN, "&&& _addToChunkJobMap chunkId=" << chunkId);
     lock_guard<mutex> lck(_chunkToJobMapMtx);
     if (_chunkToJobMapInvalid) {
         throw Bug("map insert FAILED, map is already invalid");
@@ -611,7 +611,7 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) {
     // Construct a temporary resource object to pass to ProcessRequest().
     // Affinity should be meaningless here as there should only be one instance of each worker.
     XrdSsiResource::Affinity affinity = XrdSsiResource::Affinity::Default;
-    LOGS(_log, LOG_LVL_INFO, "&&& uJob->workerResource=" << uJob->getWorkerResource());
+    LOGS(_log, LOG_LVL_INFO, "&&& startUberJob uJob->workerResource=" << uJob->getWorkerResource());
     XrdSsiResource uJobResource(uJob->getWorkerResource(), "", uJob->getIdStr(), "", 0, affinity);
 
     // Now construct the actual query request and tie it to the jobQuery. The
core/modules/qdisp/QdispPool.cc (11 changes: 10 additions & 1 deletion)
@@ -29,6 +29,7 @@

 // Qserv headers
 #include "util/common.h"
+#include "util/InstanceCount.h" //&&&
 
 namespace {
 LOG_LOGGER _log = LOG_GET("lsst.qserv.qdisp.QdispPool");
@@ -70,6 +71,7 @@ void PriorityQueue::queCmd(util::Command::Ptr const& cmd) {

 void PriorityQueue::queCmd(PriorityCommand::Ptr const& cmd, int priority) {
     {
+        //util::InstanceCount ic("PQ:queCmd&&&");
         std::lock_guard<std::mutex> lock(_mtx);
         auto iter = _queues.find(priority);
         if (iter == _queues.end()) {
@@ -82,25 +84,28 @@ void PriorityQueue::queCmd(PriorityCommand::Ptr const& cmd, int priority) {
             }
         }
         cmd->_priority = priority;
+        //LOGS (_log, LOG_LVL_INFO, "&&&priQue p=" << priority << _statsStr());
         iter->second->queCmd(cmd);
         LOGS (_log, LOG_LVL_DEBUG, "priQue p=" << priority << _statsStr());
         _changed = true;
     }
     _cv.notify_one();
+    _cv.notify_one(); //&&&
 }


 std::atomic<unsigned int> localLogLimiter(0);

 util::Command::Ptr PriorityQueue::getCmd(bool wait){
+    //util::InstanceCount ic("PQ:getCmd&&&");
     util::Command::Ptr ptr;
     std::unique_lock<std::mutex> uLock(_mtx);
     while (true) {
         _changed = false;
         ++localLogLimiter;
         // Log this every once in a while at INFO so there's some idea of system
         // load without generating crushing amounts of log messages.
-        if (localLogLimiter % 500 == 0) {
+        if (true) { //&&& localLogLimiter % 500 == 0) { // &&& revert
             LOGS(_log, LOG_LVL_INFO, "priQueGet " << _statsStr());
         } else {
             LOGS(_log, LOG_LVL_DEBUG, "priQueGet " << _statsStr());
@@ -115,6 +120,8 @@ util::Command::Ptr PriorityQueue::getCmd(bool wait){
             if (que->running < que->getMinRunning()) {
                 ptr = que->getCmd(false); // no wait
                 if (ptr != nullptr) {
+                    _changed = true;
+                    _cv.notify_one();
                     return ptr;
                 }
             }
@@ -130,13 +137,15 @@ util::Command::Ptr PriorityQueue::getCmd(bool wait){
                 if (ptr != nullptr) {
                     _changed = true;
                     _cv.notify_one();
+                    _cv.notify_one(); //&&&
                     return ptr;
                 }
             }
         }
 
         // If nothing was found, wait or return nullptr.
         if (wait) {
+            //util::InstanceCount icWait("PQ:getCmd&&&WAIT");
             LOGS (_log, LOG_LVL_DEBUG, "getCmd wait " << _statsStr());
             _cv.wait(uLock, [this](){ return _changed; });
         } else {
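The dequeue paths now set _changed and call _cv.notify_one() when a command is handed out, so other consumers blocked in getCmd(wait=true) re-examine the queues instead of sleeping until the next queCmd(). A generic standalone sketch of that wait/notify pattern (SimpleQueue is illustrative, not the QdispPool class):

    #include <condition_variable>
    #include <mutex>
    #include <queue>

    // Illustrative wait/notify pattern: consumers block on a condition variable
    // and are re-woken both when work arrives and when a peer dequeues.
    class SimpleQueue {
    public:
        void put(int cmd) {
            {
                std::lock_guard<std::mutex> lock(_mtx);
                _q.push(cmd);
                _changed = true;
            }
            _cv.notify_one();
        }

        int get() {
            std::unique_lock<std::mutex> lock(_mtx);
            while (true) {
                _changed = false;
                if (!_q.empty()) {
                    int cmd = _q.front();
                    _q.pop();
                    _changed = true;
                    _cv.notify_one();  // counterpart of the notify added above
                    return cmd;
                }
                _cv.wait(lock, [this] { return _changed; });
            }
        }

    private:
        std::mutex _mtx;
        std::condition_variable _cv;
        std::queue<int> _q;
        bool _changed = false;
    };
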
(Diff truncated: 16 of the 24 changed files are not shown.)
