Integration tests working.
jgates108 committed Jun 8, 2021
1 parent c2263f7 commit c0a2a2c
Showing 11 changed files with 90 additions and 28 deletions.
18 changes: 17 additions & 1 deletion core/modules/ccontrol/MergingHandler.cc
@@ -97,8 +97,10 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
throw Bug("MergingHandler invalid blen=" + to_string(bLen) + " from " + _wName);
}

LOGS(_log, LOG_LVL_INFO, "&&& MH::flush a");
switch(_state) {
case MsgState::HEADER_WAIT:
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush b");
_response->headerSize = static_cast<unsigned char>((*bufPtr)[0]);
if (!proto::ProtoHeaderWrap::unwrap(_response, *bufPtr)) {
std::string sErr = "From:" + _wName + "Error decoding proto header for " + getStateStr(_state);
@@ -119,23 +121,29 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
<< " endNoData=" << endNoData);

_state = MsgState::RESULT_WAIT;
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush c");
if (endNoData || nextBufSize == 0) {
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush d");
if (!endNoData || nextBufSize != 0 ) {
throw Bug("inconsistent msg termination endNoData=" + std::to_string(endNoData)
+ " nextBufSize=" + std::to_string(nextBufSize));
}
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush e");
// Nothing to merge, but some bookkeeping needs to be done.
_infileMerger->mergeCompleteFor(_jobIds);
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush f");
last = true;
_state = MsgState::RESULT_RECV;
}
}
return true;
case MsgState::RESULT_WAIT:
{
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush g");
nextBufSize = proto::ProtoHeaderWrap::getProtoHeaderSize();
auto job = getJobBase().lock();
if (!_verifyResult(bufPtr, bLen)) { return false; }
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush h");
if (!_setResult(bufPtr, bLen)) { // This sets _response->result
LOGS(_log, LOG_LVL_WARN, "setResult failure " << _wName);
return false;
@@ -147,7 +155,9 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
_jobIds.insert(jobId);
LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last << " for tableName=" << _tableName);

LOGS(_log, LOG_LVL_INFO, "&&& MH::flush i");
auto success = _merge();
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush j");
_response.reset(new WorkerResponse());
return success;
}
@@ -218,18 +228,24 @@ void MergingHandler::_initState() {
}

bool MergingHandler::_merge() {
if (auto job = getJobBase().lock()) {
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge a");
auto job = getJobBase().lock();
if (job != nullptr) {
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge b");
if (_flushed) {
throw Bug("MergingRequester::_merge : already flushed");
}
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge c");
bool success = _infileMerger->merge(_response);
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge d");
if (!success) {
LOGS(_log, LOG_LVL_WARN, "_merge() failed");
rproc::InfileMergerError const& err = _infileMerger->getError();
_setError(ccontrol::MSG_RESULT_ERROR, err.getMsg());
_state = MsgState::RESULT_ERR;
}
_response.reset();
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge end");
return success;
}
LOGS(_log, LOG_LVL_ERROR, "MergingHandler::_merge() failed, jobQuery was NULL");
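The flush() logic above alternates between reading a protobuf header and reading the result body that header announces. A minimal standalone sketch of that state machine, using illustrative stand-in types rather than the real Qserv classes (an empty buffer stands in for the endNoData / nextBufSize == 0 case):

#include <iostream>
#include <stdexcept>
#include <vector>

// Stand-in for the handler's message-cycle states.
enum class MsgState { HEADER_WAIT, RESULT_WAIT, RESULT_RECV, RESULT_ERR };

class FlushSketch {
public:
    // Headers and result bodies alternate, as in MergingHandler::flush().
    bool flush(std::vector<char> const& buf, bool& last) {
        switch (_state) {
            case MsgState::HEADER_WAIT:
                if (buf.empty()) {
                    last = true;                    // nothing to merge, bookkeeping only
                    _state = MsgState::RESULT_RECV;
                } else {
                    _state = MsgState::RESULT_WAIT; // a result body follows this header
                }
                return true;
            case MsgState::RESULT_WAIT:
                // Merge the result body here, then wait for the next header.
                _state = MsgState::HEADER_WAIT;
                return true;
            default:
                throw std::logic_error("flush() called in a terminal state");
        }
    }

private:
    MsgState _state = MsgState::HEADER_WAIT;
};

int main() {
    FlushSketch handler;
    bool last = false;
    handler.flush({'h'}, last); // header announcing a result
    handler.flush({'r'}, last); // the result body
    handler.flush({}, last);    // terminating header: no more data
    std::cout << "last=" << std::boolalpha << last << '\n';
}

The terminal states here mirror MsgState::RESULT_RECV and MsgState::RESULT_ERR in the real handler, which additionally verifies buffers and calls _infileMerger.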
13 changes: 10 additions & 3 deletions core/modules/ccontrol/UserQuerySelect.cc
@@ -414,12 +414,12 @@ void UserQuerySelect::submit() {
std::shared_ptr<ChunkMsgReceiver> cmr = ChunkMsgReceiver::newInstance(uberJobId, _messageStore);
auto respHandler = std::make_shared<MergingHandler>(cmr, _infileMerger, uberResultName);

string workerResourceName = workerIter->first;
deque<int>& dq = workerIter->second;
auto uJob = qdisp::UberJob::create(_executive, respHandler, _qMetaQueryId,
uberJobId++, _qMetaCzarId);
uberJobId++, _qMetaCzarId, workerResourceName);

int chunksInUber = 0;
deque<int>& dq = workerIter->second;

while (!dq.empty() && !chunksInQuery.empty() && chunksInUber < maxChunksPerUber) {
int chunkIdWorker = dq.front();
dq.pop_front();
@@ -459,15 +459,22 @@ void UserQuerySelect::submit() {
// If any chunks in the query were not found on a worker's list, run them individually.
//&&&_executive->startRemainingJobs(chunksInQuery); //&&& delete func in Executive.
for (auto& ciq:chunksInQuery) {
LOGS(_log, LOG_LVL_INFO, "&&& submit q1");
qdisp::JobQuery* jqRaw = ciq.second;
LOGS(_log, LOG_LVL_INFO, "&&& submit q2");
qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw);
LOGS(_log, LOG_LVL_INFO, "&&& submit q3");
std::function<void(util::CmdData*)> funcBuildJob =
[this, job{move(job)}](util::CmdData*) { // references in captures cause races
QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
LOGS(_log, LOG_LVL_INFO, "&&& submit q run1");
job->runJob();
LOGS(_log, LOG_LVL_INFO, "&&& submit q run2");
};
auto cmd = std::make_shared<qdisp::PriorityCommand>(funcBuildJob);
LOGS(_log, LOG_LVL_INFO, "&&& submit q4");
_executive->queueJobStart(cmd);
LOGS(_log, LOG_LVL_INFO, "&&& submit q5");
}

LOGS(_log, LOG_LVL_INFO, "&&& submit r");
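submit() above drains each worker's chunk deque into uber jobs of at most maxChunksPerUber chunks, removing claimed chunks from chunksInQuery so that anything left over is run as an individual job. A self-contained sketch of that grouping loop; the container contents, maxChunksPerUber value, and UberJobSketch type are illustrative, not the Qserv ones:

#include <deque>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <vector>

// Illustrative stand-in for an uber job: one worker, up to N chunks.
struct UberJobSketch {
    std::string workerResource;
    std::vector<int> chunkIds;
};

int main() {
    int const maxChunksPerUber = 2;  // illustrative value
    std::map<std::string, std::deque<int>> workerChunks{
            {"/worker/a", {1, 2, 3}}, {"/worker/b", {4, 5}}};
    std::set<int> chunksInQuery{1, 2, 3, 4, 5};

    std::vector<UberJobSketch> uberJobs;
    for (auto& [workerResourceName, dq] : workerChunks) {
        while (!dq.empty() && !chunksInQuery.empty()) {
            UberJobSketch uj{workerResourceName, {}};
            while (!dq.empty() && static_cast<int>(uj.chunkIds.size()) < maxChunksPerUber) {
                int chunkId = dq.front();
                dq.pop_front();
                // Claim the chunk only if another worker's uber job hasn't taken it.
                if (chunksInQuery.erase(chunkId) > 0) uj.chunkIds.push_back(chunkId);
            }
            if (!uj.chunkIds.empty()) uberJobs.push_back(std::move(uj));
        }
    }
    // Whatever remains in chunksInQuery would be run as individual jobs.
    for (auto const& uj : uberJobs) {
        std::cout << uj.workerResource << " handles " << uj.chunkIds.size() << " chunks\n";
    }
}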
2 changes: 1 addition & 1 deletion core/modules/czar/WorkerResources.cc
@@ -108,7 +108,7 @@ map<string, deque<int>> WorkerResources::getDequesFor(string const& dbName) {


void WorkerResources::setMonoNodeTest() {
string wName("/worker/worker");
string wName("/worker/5257fbab-c49c-11eb-ba7a-1856802308a2");
std::lock_guard<std::mutex> lg(_workerMapMtx);
_insertWorker(wName);
auto iter = _workers.find(wName);
3 changes: 2 additions & 1 deletion core/modules/qdisp/Executive.cc
@@ -614,7 +614,8 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) {
//&&&XrdSsiResource jobResource(jobQuery->getDescription()->resource().path(), "", jobQuery->getIdStr(), "", 0, affinity);
// Affinity should be meaningless here as there should only be one instance of each worker.
XrdSsiResource::Affinity affinity = XrdSsiResource::Affinity::Default;
XrdSsiResource uJobResource(uJob->workerResource, "", uJob->getIdStr(), "", 0, affinity);
LOGS(_log, LOG_LVL_INFO, "&&& uJob->workerResource=" << uJob->getWorkerResource());
XrdSsiResource uJobResource(uJob->getWorkerResource(), "", uJob->getIdStr(), "", 0, affinity);

// Now construct the actual query request and tie it to the jobQuery. The
// shared pointer is used by QueryRequest to keep itself alive, sloppy design.
31 changes: 21 additions & 10 deletions core/modules/qdisp/QueryRequest.cc
@@ -222,6 +222,7 @@ QueryRequest::~QueryRequest() {
// content of request data
char* QueryRequest::GetRequest(int& requestLength) {
QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid);
LOGS(_log, LOG_LVL_INFO, "&&& QueryRequest::GetRequest");
lock_guard<mutex> lock(_finishStatusMutex);
auto jq = _job;
if (_finishStatus != ACTIVE || jq == nullptr) {
@@ -448,7 +449,7 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
ResponseHandler::BufPtr bufPtr = _askForResponseDataCmd->getBufPtr();
_askForResponseDataCmd.reset(); // No longer need it, and don't want the destructor calling _errorFinish().


LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData a");
int const protoHeaderSize = proto::ProtoHeaderWrap::getProtoHeaderSize();
ResponseHandler::BufPtr nextHeaderBufPtr;

@@ -463,19 +464,20 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
// - The first (bytes = blen - ProtoHeaderWrap::getProtheaderSize())
// is the result associated with the previously received header.
// - The second is the header for the next message.

int respSize = blen - protoHeaderSize;
nextHeaderBufPtr = make_shared<vector<char>>(bufPtr->begin() + respSize, bufPtr->end());

LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData b");
// Read the result
/* &&& rebase
flushOk = jq->getDescription()->respHandler()->flush(respSize, bufPtr, last,
largeResult, nextBufSize);
*/
bool largeResult = false;
int nextBufSize = 0;
bool last = false;
bool flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, largeResult, nextBufSize);
//&&& bool largeResult = false;
//&&& int nextBufSize = 0;
//&&& bool last = false;
// Values for last, largeResult, and nextBufSize filled in by flush
flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, largeResult, nextBufSize);
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData c");
if (last) {
// Last should only be true when the header is read, not the result.
throw Bug("_processData result had 'last' true, which cannot be allowed.");
@@ -486,7 +488,7 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
throw Bug("Unexpected header size from flush(result) call QID="
+ to_string(_qid) + "#" + to_string(_jobid));
}

LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData d");
if (!flushOk) {
_flushError(jq);
return;
@@ -497,11 +499,14 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
// Values for largeResult, last, and nextBufSize will be filled in by flush().
flushOk = jq->getDescription()->respHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last,
largeResult, nextBufSize);
*/
largeResult = false;
nextBufSize = 0;
*/
// Values for last, largeResult, and nextBufSize filled in by flush
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData e");
flushOk = jq->getRespHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last, largeResult, nextBufSize);

LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData f");
if (largeResult) {
if (!_largeResult) LOGS(_log, LOG_LVL_DEBUG, "holdState largeResult set to true");
_largeResult = true; // Once the worker indicates it's a large result, it stays that way.
@@ -512,22 +517,28 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast)
LOGS(_log, LOG_LVL_DEBUG, "processData disagreement between last=" << last
<< " and xrdLast=" << xrdLast);
}
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData g");
if (last) {
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData h");
jq->getStatus()->updateInfo(_jobIdStr, JobStatus::COMPLETE);
_finish();
// At this point all blocks for this job have been read, there's no point in
// having XrdSsi wait for anything.
return;
} else {
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData i");
_askForResponseDataCmd = make_shared<AskForResponseDataCmd>(shared_from_this(), jq, nextBufSize);
LOGS(_log, LOG_LVL_DEBUG, "queuing askForResponseDataCmd bufSize=" << nextBufSize);
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData j");
_queueAskForResponse(_askForResponseDataCmd, jq, false);
}
} else {
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData k");
LOGS(_log, LOG_LVL_WARN, "flushOk = false");
_flushError(jq);
return;
}
LOGS(_log, LOG_LVL_INFO, "&&&QueryRequest::_processData end");
return;
}

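_processData() above relies on each transmission carrying two pieces: the result bytes for the previously received header, followed by a fixed-size header announcing the next message, so flush() is called twice per buffer. A small sketch of the split; the header size and buffer contents are made up for illustration:

#include <cassert>
#include <memory>
#include <vector>

using BufPtr = std::shared_ptr<std::vector<char>>;

int main() {
    // Stand-in for proto::ProtoHeaderWrap::getProtoHeaderSize(); the real
    // value comes from the protobuf framing, not a literal.
    int const protoHeaderSize = 4;

    // A received buffer: result bytes followed by the next 4-byte header.
    auto bufPtr = std::make_shared<std::vector<char>>(
            std::vector<char>{'r', 'e', 's', 'u', 'l', 't', 'h', 'd', 'r', '!'});
    int const blen = static_cast<int>(bufPtr->size());

    // First (blen - protoHeaderSize) bytes: result for the previous header.
    int const respSize = blen - protoHeaderSize;
    // Remaining bytes: the header announcing the next message.
    BufPtr nextHeaderBufPtr = std::make_shared<std::vector<char>>(
            bufPtr->begin() + respSize, bufPtr->end());
    assert(static_cast<int>(nextHeaderBufPtr->size()) == protoHeaderSize);

    // flush(respSize, bufPtr, ...) would merge the result; a second
    // flush(protoHeaderSize, nextHeaderBufPtr, ...) decodes the next header
    // and fills in last/largeResult/nextBufSize for the caller.
}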
14 changes: 9 additions & 5 deletions core/modules/qdisp/UberJob.cc
@@ -49,15 +49,17 @@ namespace qdisp {

UberJob::Ptr UberJob::create(Executive::Ptr const& executive,
std::shared_ptr<ResponseHandler> const& respHandler,
int queryId, int uberJobId, qmeta::CzarId czarId) {
UberJob::Ptr uJob(new UberJob(executive, respHandler, queryId, uberJobId, czarId));
int queryId, int uberJobId, qmeta::CzarId czarId, string const& workerResource) {
UberJob::Ptr uJob(new UberJob(executive, respHandler, queryId, uberJobId, czarId, workerResource));
uJob->_setup();
return uJob;
}

UberJob::UberJob(Executive::Ptr const& executive,
std::shared_ptr<ResponseHandler> const& respHandler,
int queryId, int uberJobId, qmeta::CzarId czarId)
: JobBase(), _executive(executive), _respHandler(respHandler), _queryId(queryId), _uberJobId(uberJobId),
int queryId, int uberJobId, qmeta::CzarId czarId, string const& workerResource)
: JobBase(), _workerResource(workerResource), _executive(executive),
_respHandler(respHandler), _queryId(queryId), _uberJobId(uberJobId),
_czarId(czarId), _idStr("QID=" + to_string(_queryId) + ":uber=" + to_string(uberJobId)) {
_qdispPool = executive->getQdispPool();
_jobStatus = make_shared<JobStatus>();
@@ -182,13 +184,15 @@ void UberJob::callMarkCompleteFunc(bool success) {
throw Bug("&&&NEED_CODE may need code to properly handle failed uberjob");
}
for (auto&& job:_jobs) {
string idStr = job->getIdStr();
job->getStatus()->updateInfo(idStr, JobStatus::COMPLETE);
job->callMarkCompleteFunc(success);
}
}


std::ostream& UberJob::dumpOS(std::ostream &os) const {
os << "(workerResource=" << workerResource
os << "(workerResource=" << _workerResource
<< " jobs sz=" << _jobs.size() << "(";
for (auto const& job:_jobs) {
JobDescription::Ptr desc = job->getDescription();
16 changes: 11 additions & 5 deletions core/modules/qdisp/UberJob.h
@@ -43,7 +43,7 @@ class UberJob : public JobBase {

static Ptr create(Executive::Ptr const& executive,
std::shared_ptr<ResponseHandler> const& respHandler,
int queryId, int uberJobId, qmeta::CzarId czarId);
int queryId, int uberJobId, qmeta::CzarId czarId, std::string const& workerResource);
UberJob() = delete;
UberJob(UberJob const&) = delete;
UberJob& operator=(UberJob const&) = delete;
@@ -73,17 +73,22 @@ class UberJob : public JobBase {

bool verifyPayload() const;

std::string getWorkerResource() { return _workerResource; }

/// &&& TODO:UJ may not need,
void prepScrubResults();

std::string workerResource; // TODO:UJ make private

std::ostream& dumpOS(std::ostream &os) const override;

private:
UberJob(Executive::Ptr const& executive,
std::shared_ptr<ResponseHandler> const& respHandler,
int queryId, int uberJobId, qmeta::CzarId czarId);
int queryId, int uberJobId, qmeta::CzarId czarId, std::string const& workerResource);

void _setup() {
JobBase::Ptr jbPtr = shared_from_this();
_respHandler->setJobQuery(jbPtr);
}

std::vector<JobQuery*> _jobs;
std::atomic<bool> _started{false};
@@ -93,7 +98,8 @@ class UberJob : public JobBase {
std::shared_ptr<QueryRequest> _queryRequestPtr;
std::mutex _qrMtx;

std::string _payload; ///< XrdSsi message to be sent to the worker resource.
std::string const _workerResource;
std::string _payload; ///< XrdSsi message to be sent to the _workerResource.

std::weak_ptr<Executive> _executive;
std::shared_ptr<ResponseHandler> _respHandler;
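The header change above replaces the public workerResource field (and its TODO) with a const private member set at construction, exposed read-only through getWorkerResource(). A minimal sketch of the resulting pattern, using an illustrative class rather than the real UberJob:

#include <iostream>
#include <string>
#include <utility>

class UberJobSketch {
public:
    explicit UberJobSketch(std::string workerResource)
            : _workerResource(std::move(workerResource)) {}
    std::string getWorkerResource() const { return _workerResource; }

private:
    std::string const _workerResource;  // fixed at construction, cannot drift
};

int main() {
    UberJobSketch uj("/worker/5257fbab-c49c-11eb-ba7a-1856802308a2");
    std::cout << uj.getWorkerResource() << '\n';
}

Since an uber job is bound to exactly one worker for its lifetime, making the member const lets Executive::startUberJob() build the XrdSsiResource from a value that cannot change underneath it.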
4 changes: 4 additions & 0 deletions core/modules/wbase/SendChannelShared.cc
@@ -63,6 +63,10 @@ void SendChannelShared::setTaskCount(int taskCount) {
_taskCount = taskCount;
}

void SendChannelShared::incrTaskCountBy(int partialCount) {
_taskCount += partialCount;
}


bool SendChannelShared::transmitTaskLast(StreamGuard sLock, bool inLast) {
/// The caller must have locked _streamMutex before calling this.
8 changes: 7 additions & 1 deletion core/modules/wbase/SendChannelShared.h
@@ -43,6 +43,9 @@ namespace wbase {
/// A class that provides a SendChannel object with synchronization so it can be
/// shared across multiple threads. Due to what may be sent, the synchronization locking
/// needs to be available outside of the class.
/// Note: Tasks on a SendChannelShared cannot start processing until the total number of
/// Tasks using the SendChannelShared is known. Otherwise, there is a race condition
/// which could close the channel too soon.
class SendChannelShared {
public:
using Ptr = std::shared_ptr<SendChannelShared>;
@@ -95,6 +98,9 @@ class SendChannelShared {
/// This should not be changed once set.
void setTaskCount(int taskCount);

/// All of the tasks that use this SendChannel must be added
/// to the scheduler queue at the same time; otherwise there is a risk of a race condition.
void incrTaskCountBy(int subCount);

/// Try to transmit the data in tData.
/// If the queue already has at least 2 TransmitData objects, addTransmit
@@ -151,7 +157,7 @@ class SendChannelShared {
/// metadata buffer. Once set, it cannot change until after Finish() has been called.
std::string _metadataBuf;

int _taskCount = 0; ///< The number of tasks to be sent over this SendChannel.
std::atomic<int> _taskCount{0}; ///< The number of tasks to be sent over this SendChannel.
int _lastCount = 0; ///< The number of 'last' buffers received.
std::atomic<bool> _lastRecvd{false}; ///< The truly 'last' transmit message is in the queue.
std::atomic<bool> _firstTransmit{true}; ///< True until the first transmit has been sent.
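Since multiple task batches can share one SendChannelShared, Task::createTasks() (below) now accumulates counts with incrTaskCountBy() instead of overwriting the total with setTaskCount(), and _taskCount becomes atomic. A sketch of the accounting this protects — the channel may only close once the running total is complete; method names mirror the diff, the class body is illustrative:

#include <atomic>
#include <iostream>

class SendChannelSketch {
public:
    // Each batch of tasks sharing the channel adds to the running total.
    void incrTaskCountBy(int partialCount) { _taskCount += partialCount; }

    // Called once per task when its final buffer has been queued; the
    // channel may only close when every counted task has reported in.
    bool transmitTaskLast() {
        int const done = ++_lastCount;
        return done == _taskCount.load();
    }

private:
    std::atomic<int> _taskCount{0};
    std::atomic<int> _lastCount{0};
};

int main() {
    SendChannelSketch channel;
    channel.incrTaskCountBy(2);  // tasks from the first TaskMsg
    channel.incrTaskCountBy(1);  // tasks from a later TaskMsg on the same channel
    std::cout << channel.transmitTaskLast() << channel.transmitTaskLast()
              << channel.transmitTaskLast() << '\n';  // prints 001: closes on the third
}

If the channel instead closed as soon as _lastCount reached an early, partial setTaskCount() value, tasks added later would find the channel already gone — the race the header comment warns about.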
5 changes: 4 additions & 1 deletion core/modules/wbase/Task.cc
@@ -166,6 +166,7 @@ std::vector<Task::Ptr> Task::createTasks(proto::TaskMsg const& taskMsg,
vect.push_back(task);
}
} else {
LOGS(_log, LOG_LVL_INFO, "&&& Task::createTasks queryStr=" << queryStr);
auto task = std::make_shared<wbase::Task>(taskMsg, queryStr, fragNum, sendChannel, gArena, rmLock);
//TODO: Maybe? Is it better to move fragment info from
// ChunkResource getResourceFragment(int i) to here???
@@ -176,7 +177,9 @@ std::vector<Task::Ptr> Task::createTasks(proto::TaskMsg const& taskMsg,

}
}
sendChannel->setTaskCount(vect.size());
LOGS(_log, LOG_LVL_INFO, "&&& Task::createTasks vect.size=" << vect.size());
//&&&sendChannel->setTaskCount(vect.size());
sendChannel->incrTaskCountBy(vect.size());
return vect;
}
