Commit
Minor changes for integration tests.
jgates108 committed Jun 8, 2021
1 parent 56677ed commit c2263f7
Showing 5 changed files with 63 additions and 22 deletions.
47 changes: 28 additions & 19 deletions core/modules/ccontrol/UserQuerySelect.cc
@@ -368,31 +368,24 @@ void UserQuerySelect::submit() {

if (!uberJobsEnabled) {
std::function<void(util::CmdData*)> funcBuildJob =
[this, sequence, job{move(job)}](util::CmdData*) { // references in captures cause races
//&&&[this, sequence, job{move(job)}](util::CmdData*) { // references in captures cause races
[this, job{move(job)}](util::CmdData*) { // references in captures cause races
QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
job->runJob();
};
auto cmd = std::make_shared<qdisp::PriorityCommand>(funcBuildJob);
_executive->queueJobStart(cmd);

}
++sequence;
}
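The capture list above is the point of this hunk: the loop counter is dropped and job is moved into the closure, because the queued command runs later on a pool thread, where a reference capture (or a counter that keeps mutating) can race with the submitting thread. Below is a minimal, self-contained sketch of that idea; the Job type and the queue are illustrative stand-ins, not the Qserv PriorityCommand/queueJobStart API.

#include <functional>
#include <iostream>
#include <memory>
#include <queue>
#include <string>

struct Job {
    std::string id;
    void run() { std::cout << "running " << id << "\n"; }
};

int main() {
    std::queue<std::function<void()>> startQueue;  // stand-in for queueJobStart()
    for (int sequence = 0; sequence < 3; ++sequence) {
        auto job = std::make_shared<Job>(Job{"job-" + std::to_string(sequence)});
        // Capture the shared_ptr by value; capturing sequence (or anything by
        // reference) would be read later, after the loop has moved on.
        startQueue.push([job] { job->run(); });
    }
    // A pool thread would normally drain this queue; done inline here.
    while (!startQueue.empty()) {
        startQueue.front()();
        startQueue.pop();
    }
    return 0;
}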

if (uberJobsEnabled) {
vector<qdisp::UberJob::Ptr> uberJobs;
/* &&&
vector<czar::WorkerResource> workers; // &&& delete and replace with a real list of workers
throw Bug("&&&NEED_CODE to find all workers"); // workers = all workers found in database
for (auto&& worker:workers) {
worker.fillChunkIdSet();
}
*/

czar::WorkerResources workerResources;
workerResources.setMonoNodeTest(); //&&& TODO:UJ only good for mono-node test.
workerResources.setMonoNodeTest(); //&&& TODO:UJ only good for mono-node test. Need a real list of workers and their chunks. ******

// &&& make a map of all jobs in the executive.
// Make a map of all jobs in the executive.
// &&& TODO:UJ for now, just using ints. At some point, need to check that ResourceUnit databases can be found for all databases in the query
qdisp::Executive::ChunkIdJobMapType chunksInQuery = _executive->getChunkJobMapAndInvalidate();

@@ -405,13 +398,6 @@ void UserQuerySelect::submit() {
/// make a map<worker, deque<chunkId> that will be destroyed as chunks are checked/used
map<string, deque<int>> tmpWorkerList = workerResources.getDequesFor(dbName);

/* &&&
list<std::reference_wrapper<czar::WorkerResource>> tmpWorkerList;
for(auto&& worker:workers) {
tmpWorkerList.push_back(worker);
}
*/

// TODO:UJ So UberJobIds don't conflict with chunk numbers or jobIds, start at a large number.
// This could use some refinement.
int uberJobId = qdisp::UberJob::getFirstIdNumber();
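The worker/chunk bookkeeping above boils down to a map from worker resource name to a deque of chunk IDs, returned by getDequesFor(dbName) and consumed as UberJobs are assembled. The following sketch shows only that assumed consumption pattern, with made-up worker names and the grouping reduced to a plain vector; the real code builds UberJob objects and numbers them starting from getFirstIdNumber().

#include <deque>
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
    // Assumed shape of the getDequesFor() result: worker resource -> chunk IDs it holds.
    std::map<std::string, std::deque<int>> workerChunks = {
        {"/worker/a", {1, 2, 3}},
        {"/worker/b", {4, 5}},
    };

    std::map<std::string, std::vector<int>> perWorkerUberJob;  // chunks grouped per worker
    for (auto& entry : workerChunks) {
        auto& chunks = entry.second;
        while (!chunks.empty()) {
            perWorkerUberJob[entry.first].push_back(chunks.front());
            chunks.pop_front();  // popped so the same chunk cannot be claimed twice
        }
    }

    for (auto const& entry : perWorkerUberJob) {
        std::cout << entry.first << " -> " << entry.second.size() << " chunks\n";
    }
    return 0;
}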
@@ -461,26 +447,49 @@ void UserQuerySelect::submit() {
workerIter = tmpWorkerList.begin();
}
}
LOGS(_log, LOG_LVL_INFO, "&&& submit m");
_executive->addUberJobs(uberJobs);
LOGS(_log, LOG_LVL_INFO, "&&& submit n");
for (auto&& uJob:uberJobs) {
LOGS(_log, LOG_LVL_INFO, "&&& submit o");
uJob->runUberJob();
LOGS(_log, LOG_LVL_INFO, "&&& submit p");
}
_executive->startRemainingJobs();
LOGS(_log, LOG_LVL_INFO, "&&& submit q");
// If any chunks in the query were not found on a worker's list, run them individually.
//&&&_executive->startRemainingJobs(chunksInQuery); //&&& delete func in Executive.
for (auto& ciq:chunksInQuery) {
qdisp::JobQuery* jqRaw = ciq.second;
qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw);
std::function<void(util::CmdData*)> funcBuildJob =
[this, job{move(job)}](util::CmdData*) { // references in captures cause races
QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
job->runJob();
};
auto cmd = std::make_shared<qdisp::PriorityCommand>(funcBuildJob);
_executive->queueJobStart(cmd);
}
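The loop above is the fallback path: any chunk still present in chunksInQuery after UberJob assembly (that is, not found on any worker's list) is recovered as an ordinary per-chunk job and queued with the same priority-command pattern as the non-UberJob branch. A compressed sketch follows, assuming chunks claimed by UberJobs were already erased from the map in the assembly loop that is collapsed in this diff view; the types are placeholders, not the real JobQuery/Executive API.

#include <functional>
#include <iostream>
#include <map>
#include <queue>
#include <set>
#include <string>

int main() {
    // chunkId -> job handle (placeholder for qdisp::JobQuery*)
    std::map<int, std::string> chunksInQuery = {{1, "jobA"}, {2, "jobB"}, {7, "jobC"}};
    std::set<int> claimedByUberJobs = {1, 2};

    for (int chunkId : claimedByUberJobs) chunksInQuery.erase(chunkId);

    std::queue<std::function<void()>> startQueue;  // stand-in for queueJobStart()
    for (auto const& entry : chunksInQuery) {
        int chunkId = entry.first;
        std::string jobName = entry.second;
        startQueue.push([chunkId, jobName] {
            std::cout << "leftover chunk " << chunkId << " runs as " << jobName << "\n";
        });
    }
    while (!startQueue.empty()) {
        startQueue.front()();
        startQueue.pop();
    }
    return 0;
}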

LOGS(_log, LOG_LVL_INFO, "&&& submit r");
}

// attempt to restore original thread priority, requires root
if (increaseThreadPriority) {
threadPriority.restoreOriginalValues();
}
LOGS(_log, LOG_LVL_INFO, "&&& submit s");

LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
_executive->waitForAllJobsToStart();
LOGS(_log, LOG_LVL_INFO, "&&& submit t");

// we only care about per-chunk info for ASYNC queries
if (_async) {
LOGS(_log, LOG_LVL_INFO, "&&& submit u");
std::lock_guard<std::mutex> lock(chunksMtx);
_qMetaAddChunks(chunks);
}
LOGS(_log, LOG_LVL_INFO, "&&& submit v");
}


16 changes: 15 additions & 1 deletion core/modules/qdisp/Executive.cc
@@ -246,6 +246,20 @@ bool Executive::_addJobToMap(JobQuery::Ptr const& job) {
return res;
}


JobQuery::Ptr Executive::getSharedPtrForRawJobPtr(JobQuery* jqRaw) {
assert(jqRaw != nullptr);
int jobId = jqRaw->getIdInt();
lock_guard<recursive_mutex> lockJobMap(_jobMapMtx);
auto iter = _jobMap.find(jobId);
if (iter == _jobMap.end()) {
throw Bug("Could not find the entry for jobId=" + to_string(jobId));
}
JobQuery::Ptr jq = iter->second;
return jq;
}
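The new getSharedPtrForRawJobPtr recovers the owning shared_ptr for a raw JobQuery pointer by going back through the id-keyed job map while holding the map mutex, rather than wrapping the raw pointer in a fresh shared_ptr (which would create a second, unrelated control block and eventually a double delete). A self-contained sketch of the same lookup pattern, with placeholder types and a plain std::mutex in place of the recursive_mutex used above:

#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>

struct Job { int id; };

class Exec {
public:
    std::shared_ptr<Job> add(int id) {
        std::lock_guard<std::mutex> lock(_mtx);
        auto jq = std::make_shared<Job>(Job{id});
        _jobMap[id] = jq;
        return jq;
    }
    std::shared_ptr<Job> sharedFromRaw(Job* raw) {
        assert(raw != nullptr);
        std::lock_guard<std::mutex> lock(_mtx);
        auto iter = _jobMap.find(raw->id);
        if (iter == _jobMap.end()) {
            throw std::runtime_error("no entry for jobId=" + std::to_string(raw->id));
        }
        return iter->second;  // the existing owner, not a new control block
    }
private:
    std::mutex _mtx;
    std::map<int, std::shared_ptr<Job>> _jobMap;
};

int main() {
    Exec e;
    auto owned = e.add(42);
    auto again = e.sharedFromRaw(owned.get());
    return (owned == again) ? 0 : 1;  // same control block, shared use count
}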


bool Executive::join() {
// To join, we make sure that all of the chunks added so far are complete.
// Check to see if _requesters is empty, if not, then sleep on a condition.
@@ -616,7 +630,7 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) {
}


void Executive::startRemainingJobs() {
void Executive::startRemainingJobs(ChunkIdJobMapType& remainingChunks) {
throw Bug("&&&NEED_CODE executive start remaining jobs");
}

3 changes: 2 additions & 1 deletion core/modules/qdisp/Executive.h
@@ -141,12 +141,13 @@ class Executive : public std::enable_shared_from_this<Executive> {
bool startQuery(std::shared_ptr<JobQuery> const& jobQuery);

/// Start any jobs that were not started as part of UberJobs.
void startRemainingJobs();
void startRemainingJobs(ChunkIdJobMapType& remainingJobs); // &&& delete

///&&& TODO:UJ UberJob
void addUberJobs(std::vector<std::shared_ptr<UberJob>> const& jobsToAdd);
ChunkIdJobMapType& getChunkJobMapAndInvalidate();
bool startUberJob(std::shared_ptr<UberJob> const& uJob);
std::shared_ptr<JobQuery> getSharedPtrForRawJobPtr(JobQuery* jqRaw);

private:
Executive(ExecutiveConfig const& c, std::shared_ptr<MessageStore> const& ms,
18 changes: 17 additions & 1 deletion core/modules/qdisp/UberJob.cc
@@ -60,6 +60,7 @@ UberJob::UberJob(Executive::Ptr const& executive,
: JobBase(), _executive(executive), _respHandler(respHandler), _queryId(queryId), _uberJobId(uberJobId),
_czarId(czarId), _idStr("QID=" + to_string(_queryId) + ":uber=" + to_string(uberJobId)) {
_qdispPool = executive->getQdispPool();
_jobStatus = make_shared<JobStatus>();
}


@@ -77,6 +78,7 @@ bool UberJob::addJob(JobQuery* job) {

bool UberJob::runUberJob() {
QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getIdInt());
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob a");
// Build the uberjob payload.
// TODO:UJ For simplicity in the first pass, just make a TaskMsg for each Job and append it to the UberJobMsg.
// This is terribly inefficient and should be replaced by using a template and list of chunks that the
@@ -86,22 +88,32 @@ bool UberJob::runUberJob() {
proto::UberJobMsg* ujMsg = google::protobuf::Arena::CreateMessage<proto::UberJobMsg>(&arena);
ujMsg->set_queryid(getQueryId());
ujMsg->set_czarid(_czarId);
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b");
for (auto&& job:_jobs) {
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b1");
proto::TaskMsg* tMsg = ujMsg->add_taskmsgs();
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b2");
job->getDescription()->fillTaskMsg(tMsg);
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob b3");
}
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob c");
ujMsg->SerializeToString(&_payload);
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob d");
}

LOGS(_log, LOG_LVL_INFO, "&&& runUberJob e");
auto executive = _executive.lock();
if (executive == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "runUberJob failed executive==nullptr");
return false;
}
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob f");
bool cancelled = executive->getCancelled();
bool handlerReset = _respHandler->reset();
bool started = _started.exchange(true);
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob g");
if (!cancelled && handlerReset && !started) {
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob h");
auto criticalErr = [this, &executive](std::string const& msg) {
LOGS(_log, LOG_LVL_ERROR, msg << " "
<< *this << " Canceling user query!");
@@ -114,16 +126,20 @@ bool UberJob::runUberJob() {
return false;
}

LOGS(_log, LOG_LVL_INFO, "&&& runUberJob i");
// At this point we are all set to actually run the queries. We create a
// a shared pointer to this object to prevent it from escaping while we
// are trying to start this whole process. We also make sure we record
// whether or not we are in SSI as cancellation handling differs.
//
LOGS(_log, LOG_LVL_TRACE, "runJob calls StartQuery()");
LOGS(_log, LOG_LVL_TRACE, "runUberJob calls StartQuery()");
std::shared_ptr<UberJob> uJob(dynamic_pointer_cast<UberJob>(shared_from_this()));
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob j");
_inSsi = true;
if (executive->startUberJob(uJob)) {
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob k");
_jobStatus->updateInfo(_idStr, JobStatus::REQUEST);
LOGS(_log, LOG_LVL_INFO, "&&& runUberJob l");
return true;
}
_inSsi = false;
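The started/cancelled gate near the top of runUberJob() is a start-once idiom: _started.exchange(true) sets the flag and returns its previous value, so only the first caller observes false and is allowed to proceed, while a cancelled executive or a response handler that fails to reset short-circuits the start entirely. A minimal sketch of that gate in isolation (placeholder class, not the UberJob API):

#include <atomic>
#include <iostream>

class OnceRunner {
public:
    bool run(bool cancelled) {
        bool alreadyStarted = _started.exchange(true);  // set the flag, get the old value
        if (cancelled || alreadyStarted) {
            return false;  // someone else already started it, or the query was cancelled
        }
        std::cout << "starting the payload exactly once\n";
        return true;
    }
private:
    std::atomic<bool> _started{false};
};

int main() {
    OnceRunner r;
    bool first = r.run(false);
    bool second = r.run(false);
    std::cout << first << " " << second << "\n";  // prints "1 0": only the first call starts it
    return 0;
}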
1 change: 1 addition & 0 deletions core/modules/qproc/TaskMsgFactory.cc
@@ -114,6 +114,7 @@ bool TaskMsgFactory::fillTaskMsg(proto::TaskMsg* taskMsg, ChunkQuerySpec const&
_addFragment(*taskMsg, resultTable, chunkQuerySpec.subChunkTables,
chunkQuerySpec.subChunkIds, chunkQuerySpec.queries);
}
LOGS(_log, LOG_LVL_WARN, "&&& _makeMsg end chunkId=" << chunkQuerySpec.chunkId);
return true;
}

