Removed dead code.
jgates108 committed Jun 8, 2021
1 parent c0a2a2c commit 7804023
Showing 15 changed files with 10 additions and 159 deletions.
15 changes: 0 additions & 15 deletions core/modules/ccontrol/MergingHandler.cc
@@ -97,10 +97,8 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
throw Bug("MergingHandler invalid blen=" + to_string(bLen) + " from " + _wName);
}

LOGS(_log, LOG_LVL_INFO, "&&& MH::flush a");
switch(_state) {
case MsgState::HEADER_WAIT:
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush b");
_response->headerSize = static_cast<unsigned char>((*bufPtr)[0]);
if (!proto::ProtoHeaderWrap::unwrap(_response, *bufPtr)) {
std::string sErr = "From:" + _wName + "Error decoding proto header for " + getStateStr(_state);
@@ -121,29 +119,23 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
<< " endNoData=" << endNoData);

_state = MsgState::RESULT_WAIT;
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush c");
if (endNoData || nextBufSize == 0) {
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush d");
if (!endNoData || nextBufSize != 0 ) {
throw Bug("inconsistent msg termination endNoData=" + std::to_string(endNoData)
+ " nextBufSize=" + std::to_string(nextBufSize));
}
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush e");
// Nothing to merge, but some bookkeeping needs to be done.
_infileMerger->mergeCompleteFor(_jobIds);
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush f");
last = true;
_state = MsgState::RESULT_RECV;
}
}
return true;
case MsgState::RESULT_WAIT:
{
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush g");
nextBufSize = proto::ProtoHeaderWrap::getProtoHeaderSize();
auto job = getJobBase().lock();
if (!_verifyResult(bufPtr, bLen)) { return false; }
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush h");
if (!_setResult(bufPtr, bLen)) { // This sets _response->result
LOGS(_log, LOG_LVL_WARN, "setResult failure " << _wName);
return false;
@@ -155,9 +147,7 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
_jobIds.insert(jobId);
LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last << " for tableName=" << _tableName);

LOGS(_log, LOG_LVL_INFO, "&&& MH::flush i");
auto success = _merge();
LOGS(_log, LOG_LVL_INFO, "&&& MH::flush j");
_response.reset(new WorkerResponse());
return success;
}
@@ -228,24 +218,19 @@ void MergingHandler::_initState() {
}

bool MergingHandler::_merge() {
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge a");
auto job = getJobBase().lock();
if (job != nullptr) {
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge b");
if (_flushed) {
throw Bug("MergingRequester::_merge : already flushed");
}
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge c");
bool success = _infileMerger->merge(_response);
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge d");
if (!success) {
LOGS(_log, LOG_LVL_WARN, "_merge() failed");
rproc::InfileMergerError const& err = _infileMerger->getError();
_setError(ccontrol::MSG_RESULT_ERROR, err.getMsg());
_state = MsgState::RESULT_ERR;
}
_response.reset();
LOGS(_log, LOG_LVL_INFO, "&&& MH::_merge end");
return success;
}
LOGS(_log, LOG_LVL_ERROR, "MergingHandler::_merge() failed, jobQuery was NULL");
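The flush() method above is a small two-phase state machine: a buffer received in HEADER_WAIT announces the size of the result to follow, and the buffer received in RESULT_WAIT is verified and merged. A minimal, self-contained sketch of that pattern follows; Handler, decodeHeader, and mergeBuffer are simplified stand-ins, not the actual qserv definitions.

#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <vector>

enum class MsgState { HEADER_WAIT, RESULT_WAIT, RESULT_RECV, RESULT_ERR };

class Handler {
public:
    // Consume one buffer per call: first a header announcing the payload
    // size, then the payload itself. Returns false on a protocol error.
    bool flush(std::vector<char> const& buf, bool& last) {
        switch (_state) {
        case MsgState::HEADER_WAIT:
            _expected = decodeHeader(buf);
            if (_expected == 0) {            // empty result: bookkeeping only
                last = true;
                _state = MsgState::RESULT_RECV;
            } else {
                _state = MsgState::RESULT_WAIT;
            }
            return true;
        case MsgState::RESULT_WAIT:
            if (buf.size() != _expected) {
                _state = MsgState::RESULT_ERR; // size mismatch: give up
                return false;
            }
            mergeBuffer(buf);
            _state = MsgState::HEADER_WAIT;    // ready for the next header
            return true;
        default:
            throw std::logic_error("flush() called in a terminal state");
        }
    }

private:
    // Stand-in for proto::ProtoHeaderWrap::unwrap(): first byte is the size.
    std::size_t decodeHeader(std::vector<char> const& buf) {
        return buf.empty() ? 0 : static_cast<unsigned char>(buf[0]);
    }
    void mergeBuffer(std::vector<char> const& buf) {
        std::cout << "merged " << buf.size() << " bytes\n";
    }

    MsgState _state = MsgState::HEADER_WAIT;
    std::size_t _expected = 0;
};

int main() {
    Handler h;
    bool last = false;
    h.flush({3}, last);             // header: expect 3 bytes
    h.flush({'a', 'b', 'c'}, last); // payload: merged
    h.flush({0}, last);             // header announcing no data: done
    std::cout << "last=" << std::boolalpha << last << '\n';
    return 0;
}

The real handler decodes a protobuf header and merges through _infileMerger rather than printing, but the state transitions follow this shape.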
51 changes: 0 additions & 51 deletions core/modules/ccontrol/UserQuerySelect.cc
@@ -304,37 +304,6 @@ void UserQuerySelect::submit() {
i != e && !_executive->getCancelled(); ++i) {
auto& chunkSpec = *i;

/* &&&
std::function<void(util::CmdData*)> funcBuildJob =
[this, sequence, // sequence must be a copy
&chunkSpec, &queryTemplates,
&chunks, &chunksMtx, &ttn,
&taskMsgFactory](util::CmdData*) {
QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
qproc::ChunkQuerySpec::Ptr cs;
{
std::lock_guard<std::mutex> lock(chunksMtx);
cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec);
chunks.push_back(cs->chunkId);
}
std::string chunkResultName = ttn.make(cs->chunkId);
std::shared_ptr<ChunkMsgReceiver> cmr = ChunkMsgReceiver::newInstance(cs->chunkId, _messageStore);
ResourceUnit ru;
ru.setAsDbChunk(cs->db, cs->chunkId);
qdisp::JobDescription::Ptr jobDesc = qdisp::JobDescription::create(_qMetaCzarId,
_executive->getId(), sequence, ru,
std::make_shared<MergingHandler>(cmr, _infileMerger, chunkResultName),
taskMsgFactory, cs, chunkResultName);
_executive->add(jobDesc);
};
auto cmd = std::make_shared<qdisp::PriorityCommand>(funcBuildJob);
_executive->queueJobStart(cmd);
*/

// Make the JobQuery now
QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);

@@ -368,7 +337,6 @@ void UserQuerySelect::submit() {

if (!uberJobsEnabled) {
std::function<void(util::CmdData*)> funcBuildJob =
//&&&[this, sequence, job{move(job)}](util::CmdData*) { // references in captures cause races
[this, job{move(job)}](util::CmdData*) { // references in captures cause races
QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
job->runJob();
@@ -447,56 +415,37 @@ void UserQuerySelect::submit() {
workerIter = tmpWorkerList.begin();
}
}
LOGS(_log, LOG_LVL_INFO, "&&& submit m");
_executive->addUberJobs(uberJobs);
LOGS(_log, LOG_LVL_INFO, "&&& submit n");
for (auto&& uJob:uberJobs) {
LOGS(_log, LOG_LVL_INFO, "&&& submit o");
uJob->runUberJob();
LOGS(_log, LOG_LVL_INFO, "&&& submit p");
}
LOGS(_log, LOG_LVL_INFO, "&&& submit q");
// If any chunks in the query were not found on a worker's list, run them individually.
//&&&_executive->startRemainingJobs(chunksInQuery); //&&& delete func in Executive.
for (auto& ciq:chunksInQuery) {
LOGS(_log, LOG_LVL_INFO, "&&& submit q1");
qdisp::JobQuery* jqRaw = ciq.second;
LOGS(_log, LOG_LVL_INFO, "&&& submit q2");
qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw);
LOGS(_log, LOG_LVL_INFO, "&&& submit q3");
std::function<void(util::CmdData*)> funcBuildJob =
[this, job{move(job)}](util::CmdData*) { // references in captures cause races
QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
LOGS(_log, LOG_LVL_INFO, "&&& submit q run1");
job->runJob();
LOGS(_log, LOG_LVL_INFO, "&&& submit q run2");
};
auto cmd = std::make_shared<qdisp::PriorityCommand>(funcBuildJob);
LOGS(_log, LOG_LVL_INFO, "&&& submit q4");
_executive->queueJobStart(cmd);
LOGS(_log, LOG_LVL_INFO, "&&& submit q5");
}

LOGS(_log, LOG_LVL_INFO, "&&& submit r");
}

// attempt to restore original thread priority, requires root
if (increaseThreadPriority) {
threadPriority.restoreOriginalValues();
}
LOGS(_log, LOG_LVL_INFO, "&&& submit s");

LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
_executive->waitForAllJobsToStart();
LOGS(_log, LOG_LVL_INFO, "&&& submit t");

// we only care about per-chunk info for ASYNC queries
if (_async) {
LOGS(_log, LOG_LVL_INFO, "&&& submit u");
std::lock_guard<std::mutex> lock(chunksMtx);
_qMetaAddChunks(chunks);
}
LOGS(_log, LOG_LVL_INFO, "&&& submit v");
}


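The comment retained above, "references in captures cause races", is the reason each lambda captures job by move instead of referencing loop locals: the queued command may execute on a pool thread after the loop iteration that created it has ended. A standalone illustration of the safe capture pattern (not qserv code; names are hypothetical):

#include <functional>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

int main() {
    std::vector<std::thread> pool;
    for (int i = 0; i < 3; ++i) {
        auto job = std::make_shared<int>(i);
        // Capturing `job` (or `i`) by reference would leave the closure
        // pointing at a loop variable that changes or dies before the
        // task runs; moving the shared_ptr gives the task its own copy.
        std::function<void()> run = [job = std::move(job)]() {
            std::cout << "running job " << *job << '\n';
        };
        pool.emplace_back(std::move(run));
    }
    for (auto& t : pool) t.join();
    return 0;
}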
1 change: 0 additions & 1 deletion core/modules/czar/WorkerResources.cc
@@ -100,7 +100,6 @@ map<string, deque<int>> WorkerResources::getDequesFor(string const& dbName) {
for (auto&& elem:_workers) {
string wName = elem.first;
WorkerResource::Ptr const& wr = elem.second;
//&&& deque<int> dq = wr.getDequeFor(dbName);
dqMap.emplace(wName, wr->getDequeFor(dbName));
}
return dqMap;
3 changes: 2 additions & 1 deletion core/modules/czar/WorkerResources.h
@@ -84,7 +84,8 @@ class WorkerResources {
public:

WorkerResources() = default;
//&&& other constructors
WorkerResources(WorkerResources const&) = delete;
WorkerResources& operator=(WorkerResources const&) = delete;

~WorkerResources() = default;

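The two added lines above replace a "//&&& other constructors" placeholder with explicitly deleted copy operations, making WorkerResources non-copyable at compile time. A minimal sketch of the idiom (Registry is a hypothetical class name):

class Registry {
public:
    Registry() = default;
    Registry(Registry const&) = delete;             // copying forbidden
    Registry& operator=(Registry const&) = delete;  // copy-assignment forbidden
    ~Registry() = default;
};

int main() {
    Registry a;
    // Registry b = a;  // error: use of deleted copy constructor
    return 0;
}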
7 changes: 0 additions & 7 deletions core/modules/global/ResourceUnit.h
@@ -48,13 +48,6 @@ class ResourceUnit {
class Checker;
enum UnitType {GARBAGE, DBCHUNK, CQUERY, UNKNOWN, RESULT, WORKER};

/* &&&
static std::string makeChunkResourceName(std::string const& db, int chunkId) {
std::string str = "/chk/" + db + "/" + std::to_string(chunkId);
return str;
}
*/

ResourceUnit() : _unitType(GARBAGE), _chunk(-1) {}

explicit ResourceUnit(std::string const& path);
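The deleted helper above composed chunk-resource paths of the form /chk/<db>/<chunkId>. A standalone sketch of that scheme, kept here only as illustration (makeChunkPath is a hypothetical stand-in):

#include <iostream>
#include <string>

// Compose the resource path a DBCHUNK unit is addressed by.
std::string makeChunkPath(std::string const& db, int chunkId) {
    return "/chk/" + db + "/" + std::to_string(chunkId);
}

int main() {
    std::cout << makeChunkPath("LSST", 6630) << '\n';  // prints /chk/LSST/6630
    return 0;
}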
7 changes: 0 additions & 7 deletions core/modules/qdisp/Executive.cc
@@ -206,7 +206,6 @@ void Executive::waitForAllJobsToStart() {
// @return true if query was actually started (i.e. we were not cancelled)
//
bool Executive::startQuery(shared_ptr<JobQuery> const& jobQuery) {
LOGS(_log, LOG_LVL_WARN, "&&& Executive::startQuery");
lock_guard<recursive_mutex> lock(_cancelled.getMutex());

// If we have been cancelled, then return false.
@@ -238,7 +237,6 @@ bool Executive::startQuery(shared_ptr<JobQuery> const& jobQuery) {
/// Return true if it was successfully added to the map.
///
bool Executive::_addJobToMap(JobQuery::Ptr const& job) {
LOGS(_log, LOG_LVL_WARN, "&&& Executive::_addJobToMap jobId=" << job->getIdInt());
auto entry = pair<int, JobQuery::Ptr>(job->getIdInt(), job);
lock_guard<recursive_mutex> lockJobMap(_jobMapMtx);
bool res = _jobMap.insert(entry).second;
@@ -611,7 +609,6 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) {
if (_cancelled) return false;

// Construct a temporary resource object to pass to ProcessRequest().
//&&&XrdSsiResource jobResource(jobQuery->getDescription()->resource().path(), "", jobQuery->getIdStr(), "", 0, affinity);
// Affinity should be meaningless here as there should only be one instance of each worker.
XrdSsiResource::Affinity affinity = XrdSsiResource::Affinity::Default;
LOGS(_log, LOG_LVL_INFO, "&&& uJob->workerResource=" << uJob->getWorkerResource());
@@ -631,10 +628,6 @@
}


void Executive::startRemainingJobs(ChunkIdJobMapType& remainingChunks) {
throw Bug("&&&NEED_CODE executive start remaining jobs");
}

ostream& operator<<(ostream& os, Executive::JobMap::value_type const& v) {
JobStatus::Ptr status = v.second->getStatus();
os << v.first << ": " << *status;
8 changes: 2 additions & 6 deletions core/modules/qdisp/Executive.h
@@ -85,7 +85,6 @@ class Executive : public std::enable_shared_from_this<Executive> {
typedef std::shared_ptr<Executive> Ptr;
typedef std::unordered_map<int, std::shared_ptr<JobQuery>> JobMap;
typedef int ChunkIdType; //&&& TODO:UJ probably needs to be ResourceUnit
//&&&typedef std::map<ChunkIdType, JobQuery*> ChunkIdJobMapType;
typedef std::unordered_map<ChunkIdType, JobQuery*> ChunkIdJobMapType;

/// Construct an Executive.
@@ -140,10 +139,7 @@

bool startQuery(std::shared_ptr<JobQuery> const& jobQuery);

/// Start any jobs that were not started as part of UberJobs.
void startRemainingJobs(ChunkIdJobMapType& remainingJobs); // &&& delete

///&&& TODO:UJ UberJob
/// Add UberJobs to this user query.
void addUberJobs(std::vector<std::shared_ptr<UberJob>> const& jobsToAdd);
ChunkIdJobMapType& getChunkJobMapAndInvalidate();
bool startUberJob(std::shared_ptr<UberJob> const& uJob);
@@ -212,7 +208,7 @@

bool _scanInteractive = false; ///< true for interactive scans.

// &&& TODO UberJob
// Add a job to the _chunkToJobMap
void _addToChunkJobMap(std::shared_ptr<JobQuery> const& job);
/// _chunkToJobMap is created once and then destroyed when used.
std::atomic<bool> _chunkToJobMapInvalid{false}; ///< true indicates the map is no longer valid.
1 change: 0 additions & 1 deletion core/modules/qdisp/JobQuery.cc
@@ -70,7 +70,6 @@ JobQuery::~JobQuery() {
bool JobQuery::runJob() {
QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getIdInt());
LOGS(_log, LOG_LVL_DEBUG, " runJob " << *this);
LOGS(_log, LOG_LVL_WARN, "&&& runJob " << *this);
auto executive = _executive.lock();
if (executive == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "runJob failed executive==nullptr");
7 changes: 4 additions & 3 deletions core/modules/qdisp/JobQuery.h
@@ -66,7 +66,6 @@ class JobQuery : public JobBase {

QueryId getQueryId() const override {return _qid; }
int getIdInt() const override { return _jobDescription->id(); }
//&&& std::string getPayload() const override { return _jobDescription->payload(); }
std::string const& getPayload() const override;
std::string const& getIdStr() const override { return _idStr; }
std::shared_ptr<ResponseHandler> getRespHandler() override { return _jobDescription->respHandler(); }
@@ -101,8 +100,10 @@
JobStatus::Ptr const& jobStatus, std::shared_ptr<MarkCompleteFunc> const& markCompleteFunc,
QueryId qid);

/// &&& TODO:UJ UberJob functions
/// Set to true if this job is part of an UberJob
void setInUberJob(bool inUberJob) { _inUberJob = inUberJob; };

/// @return true if this job is part of an UberJob.
bool inUberJob() const { return _inUberJob; }

protected:
@@ -143,7 +144,7 @@

std::shared_ptr<QdispPool> _qdispPool;

/// &&& TODO:UJ UberJob
/// True if this job is part of an UberJob.
std::atomic<bool> _inUberJob{false}; ///< TODO:UJ There are probably several places this should be checked
};

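The _inUberJob member above is a std::atomic<bool>, so one thread can flag a job as belonging to an UberJob while other threads read the flag without taking a lock. A small self-contained sketch of that pattern (the Job class here is illustrative, not the qserv one):

#include <atomic>
#include <iostream>
#include <thread>

class Job {
public:
    void setInUberJob(bool v) { _inUberJob = v; }
    bool inUberJob() const { return _inUberJob; }
private:
    std::atomic<bool> _inUberJob{false};  // safe to set and read across threads
};

int main() {
    Job job;
    std::thread writer([&job] { job.setInUberJob(true); });
    writer.join();  // join guarantees the write is visible below
    std::cout << std::boolalpha << job.inUberJob() << '\n';  // prints true
    return 0;
}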