
Commit

Code cleanup.
jgates108 committed Jul 11, 2024
1 parent 5436802 commit f65bbf9
Showing 47 changed files with 387 additions and 1,608 deletions.
67 changes: 14 additions & 53 deletions src/ccontrol/MergingHandler.cc
@@ -380,7 +380,6 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
lsst::qserv::qdisp::UberJob::Ptr const& uberJob, string const& httpUrl,
function<bool(char const*, uint32_t, bool&)> const& messageIsReady,
shared_ptr<http::ClientConnPool> const& httpConnPool) {
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp a");
string const context = "MergingHandler::" + string(__func__) + " " + " qid=" + uberJob->getIdStr() + " ";

LOGS(_log, LOG_LVL_DEBUG, context << "httpUrl=" << httpUrl);
@@ -411,9 +410,8 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
uint32_t msgSizeBytes = 0;
bool success = true;
bool mergeSuccess = true;
- LOGS(_log, LOG_LVL_WARN, context + "&&& readHttpFileAndMergeHttp b");
- int headerCount = 0; // &&& del
- uint64_t totalBytesRead = 0; /// &&& del
+ int headerCount = 0;
+ uint64_t totalBytesRead = 0;
try {
string const noClientData;
vector<string> const noClientHeaders;
@@ -423,27 +421,19 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
clientConfig.tcpKeepAlive = true;
clientConfig.tcpKeepIdle = 5; // the default is 60 sec
clientConfig.tcpKeepIntvl = 5; // the default is 60 sec
- LOGS(_log, LOG_LVL_WARN, context + "&&& readHttpFileAndMergeHttp c");
http::Client reader(http::Method::GET, httpUrl, noClientData, noClientHeaders, clientConfig,
httpConnPool);
- LOGS(_log, LOG_LVL_WARN, context + "&&& readHttpFileAndMergeHttp d");
reader.read([&](char const* inBuf, size_t inBufSize) {
- // LOGS(_log, LOG_LVL_WARN, context + "&&& readHttpFileAndMergeHttp d1 reader.read
- // ok");
// A value of the flag is set by the message processor when it's time to finish
// or abort reading the file.
bool last = false;
char const* next = inBuf;
char const* const end = inBuf + inBufSize;
- LOGS(_log, LOG_LVL_WARN,
- context + "&&& readHttpFileAndMergeHttp e next=" << (uint64_t)next << " end="
- << (uint64_t)end << " last=" << last);
while ((next < end) && !last) {
LOGS(_log, LOG_LVL_WARN,
- context + "&&& readHttpFileAndMergeHttp e1 next=" << (uint64_t)next << " end="
- << (uint64_t)end << " last=" << last);
+ context << "TODO:UJ next=" << (uint64_t)next << " end=" << (uint64_t)end
+ << " last=" << last);
if (msgSizeBytes == 0) {
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp e2");
// Continue or finish reading the frame header.
size_t const bytes2read =
std::min(sizeof(uint32_t) - msgSizeBufNext, (size_t)(end - next));
@@ -453,8 +443,6 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
msgSizeBufNext += bytes2read;
if (msgSizeBufNext == sizeof(uint32_t)) {
++headerCount;
- LOGS(_log, LOG_LVL_WARN,
- context + "&&& readHttpFileAndMergeHttp e3 &&& headerCount=" << headerCount);
// Done reading the frame header.
msgSizeBufNext = 0;
// Parse and evaluate the message length.
@@ -482,15 +470,13 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
}
} else {
// Continue or finish reading the message body.
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp e4");
size_t const bytes2read =
std::min((size_t)msgSizeBytes - msgBufNext, (size_t)(end - next));
std::memcpy(msgBuf.get() + msgBufNext, next, bytes2read);
next += bytes2read;
offset += bytes2read;
msgBufNext += bytes2read;
if (msgBufNext == msgSizeBytes) {
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp e5");
// Done reading message body.
msgBufNext = 0;

@@ -503,67 +489,50 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
}

// Parse and evaluate the message.
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp e6");
mergeSuccess = messageIsReady(msgBuf.get(), msgSizeBytes, last);
- LOGS(_log, LOG_LVL_WARN,
- context + "&&& readHttpFileAndMergeHttp e7 next="
- << (uint64_t)next << " end=" << (uint64_t)end << " last=" << last
- << " success=" << success);
totalBytesRead += msgSizeBytes;
- LOGS(_log, LOG_LVL_WARN,
- context + "&&& readHttpFileAndMergeHttp e7 headerCount="
- << headerCount << " msgSizeBytes=" << msgSizeBytes
- << " totalBytesRead=" << totalBytesRead);
if (!mergeSuccess) {
success = false;
throw runtime_error("message processing failed at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
}
// Reset the variable to prepare for reading the next header & message (if any).
msgSizeBytes = 0;
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp e8");
} else {
LOGS(_log, LOG_LVL_WARN,
- "&&&uj headerCount=" << headerCount
- << " incomplete read diff=" << (msgSizeBytes - msgBufNext));
+ context << " headerCount=" << headerCount
+ << " incomplete read diff=" << (msgSizeBytes - msgBufNext));
}
}
}
});
- LOGS(_log, LOG_LVL_WARN,
- context + "&&& readHttpFileAndMergeHttp e9 headerCount="
- << headerCount << " msgSizeBytes=" << msgSizeBytes
+ LOGS(_log, LOG_LVL_DEBUG,
+ context << " headerCount=" << headerCount << " msgSizeBytes=" << msgSizeBytes
<< " totalBytesRead=" << totalBytesRead);
if (msgSizeBufNext != 0) {
throw runtime_error("short read of the message header at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
}
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp e10");
if (msgBufNext != 0) {
throw runtime_error("short read of the message body at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
}
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp e11");
- } catch (exception const& ex) { // &&&uj anything being caught here besides runtime_error?
+ } catch (exception const& ex) {
LOGS(_log, LOG_LVL_ERROR, context + " " + ex.what());
success = false;
}

- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp f");
// Remove the file from the worker if it still exists. Report and ignore errors.
// The files will be garbage-collected by workers.
try {
http::Client remover(http::Method::DELETE, httpUrl);
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp g");
remover.read([](char const* inBuf, size_t inBufSize) {});
- LOGS(_log, LOG_LVL_WARN, "&&& readHttpFileAndMergeHttp h");
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_WARN, context << "failed to remove " << httpUrl << ", ex: " << ex.what());
}
// If the merge failed, that indicates something went wrong in the local database table,
// it is likely this user query is doomed and should be cancelled.
- LOGS(_log, LOG_LVL_WARN,
- "&&& readHttpFileAndMergeHttp end succes=" << success << " mergeSuccess=" << mergeSuccess);
+ LOGS(_log, LOG_LVL_DEBUG, context << " end succes=" << success << " mergeSuccess=" << mergeSuccess);
return {success, mergeSuccess};
}
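The loop being de-instrumented above parses a simple length-prefixed framing: the worker's result file is a sequence of [uint32_t size][size bytes of protobuf payload], and the HTTP read callback may hand over a chunk that splits a header or a body at any byte. What follows is a minimal, self-contained sketch of that state machine; FrameParser, feed, and onMessage are illustrative names rather than qserv API, and real concerns such as the maximum-message-size check, offset tracking for error reporting, and the early-stop 'last' flag are omitted.

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <functional>
    #include <vector>

    // Hypothetical stand-in for the inline parsing done by readHttpFileAndMergeHttp.
    class FrameParser {
    public:
        explicit FrameParser(std::function<void(char const*, uint32_t)> onMessage)
                : _onMessage(std::move(onMessage)) {}

        // Feed one transport chunk; it may contain any mix of partial headers,
        // partial bodies, and complete frames.
        void feed(char const* buf, std::size_t size) {
            char const* next = buf;
            char const* const end = buf + size;
            while (next < end) {
                if (!_haveHeader) {
                    // Continue or finish reading the 4-byte frame header.
                    std::size_t const n = std::min(sizeof(_sizeBuf) - _sizeBufNext,
                                                   static_cast<std::size_t>(end - next));
                    std::memcpy(_sizeBuf + _sizeBufNext, next, n);
                    next += n;
                    _sizeBufNext += n;
                    if (_sizeBufNext == sizeof(_sizeBuf)) {
                        // Done reading the header; parse the message length.
                        std::memcpy(&_bodySize, _sizeBuf, sizeof(_bodySize));
                        _sizeBufNext = 0;
                        _haveHeader = true;
                        _body.resize(_bodySize);
                        _bodyNext = 0;
                    }
                } else {
                    // Continue or finish reading the message body.
                    std::size_t const n = std::min(static_cast<std::size_t>(_bodySize) - _bodyNext,
                                                   static_cast<std::size_t>(end - next));
                    std::memcpy(_body.data() + _bodyNext, next, n);
                    next += n;
                    _bodyNext += n;
                    if (_bodyNext == _bodySize) {
                        // A whole message is buffered; hand it to the consumer
                        // (in qserv this is where the protobuf gets parsed and merged).
                        _onMessage(_body.data(), _bodySize);
                        _haveHeader = false;
                        _bodySize = 0;
                    }
                }
            }
        }

    private:
        std::function<void(char const*, uint32_t)> _onMessage;
        char _sizeBuf[sizeof(uint32_t)]{};
        std::size_t _sizeBufNext = 0;
        bool _haveHeader = false;
        uint32_t _bodySize = 0;
        std::vector<char> _body;
        std::size_t _bodyNext = 0;
    };

In the real function the role of onMessage is played by the messageIsReady callback, which additionally reports whether the merge succeeded and whether reading should stop early.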

@@ -726,8 +695,6 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
uint64_t& resultRows) {
bool success = false;
bool shouldCancel = false;
- /// &&&uj NEED CODE
- //&&& _wName = responseSummary.wname();

// This is needed to ensure the job query would be staying alive for the duration
// of the operation to prevent inconsistency within the application.
@@ -745,7 +712,7 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
// the result delivery protocol configured at the worker.
// Notify the file reader when all rows have been read by setting 'last = true'.
auto const dataMergerHttp = [&](char const* buf, uint32_t bufSize, bool& last) {
- LOGS(_log, LOG_LVL_WARN, "&&& dataMergerHttp ");
+ LOGS(_log, LOG_LVL_TRACE, "dataMergerHttp");
last = true;
proto::ResponseData responseData;
if (responseData.ParseFromArray(buf, bufSize) && responseData.IsInitialized()) {
@@ -762,8 +729,9 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
tie(success, shouldCancel) =
::readHttpFileAndMergeHttp(uberJob, fileUrl, dataMergerHttp, MergingHandler::_getHttpConnPool());

- LOGS(_log, LOG_LVL_WARN,
- "&&& MergingHandler::flushHttp success=" << success << " shouldCancel=" << shouldCancel);
+ if (!success || shouldCancel) {
+ LOGS(_log, LOG_LVL_WARN, __func__ << " success=" << success << " shouldCancel=" << shouldCancel);
+ }

if (success) {
_infileMerger->mergeCompleteFor(uberJob->getJobId());
Expand All @@ -772,13 +740,6 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
}

void MergingHandler::flushHttpError(int errorCode, std::string const& errorMsg, int status) {
- /* &&&
- _error = util::Error(responseSummary.errorcode(), responseSummary.errormsg(),
- util::ErrorCode::MYSQLEXEC);
- _setError(ccontrol::MSG_RESULT_ERROR, _error.getMsg());
- _flushError(jq);
- */

if (!_errorSet.exchange(true)) {
_error = util::Error(errorCode, errorMsg, util::ErrorCode::MYSQLEXEC);
_setError(ccontrol::MSG_RESULT_ERROR, _error.getMsg());
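flushHttpError above relies on a first-error-wins idiom: std::atomic::exchange(true) returns the previous value, so exactly one caller records its error and every later call falls through. Here is a self-contained sketch of the pattern, with the caveat that ErrorLatch is an invented name, not a qserv class:

    #include <atomic>
    #include <iostream>
    #include <mutex>
    #include <string>

    class ErrorLatch {
    public:
        // Records the error only if none has been recorded yet.
        // Returns true if this call won the race.
        bool set(int code, std::string const& msg) {
            if (_set.exchange(true)) return false;     // a previous error already latched
            std::lock_guard<std::mutex> lock(_mutex);  // protect readers from partial updates
            _code = code;
            _msg = msg;
            return true;
        }

        std::string message() const {
            std::lock_guard<std::mutex> lock(_mutex);
            return _set ? std::to_string(_code) + ": " + _msg : "no error";
        }

    private:
        std::atomic<bool> _set{false};
        mutable std::mutex _mutex;
        int _code = 0;
        std::string _msg;
    };

    int main() {
        ErrorLatch latch;
        latch.set(1001, "mysql exec failed");   // first error wins
        latch.set(1002, "connection dropped");  // silently dropped
        std::cout << latch.message() << "\n";   // prints "1001: mysql exec failed"
    }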
13 changes: 5 additions & 8 deletions src/ccontrol/MergingHandler.h
@@ -75,15 +75,12 @@ class MergingHandler : public qdisp::ResponseHandler {
/// @return true if successful (no error)
bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) override;

- /// &&&uj doc see ResponseHandler::flushHttp
- /// @return success - true if the operation was successful
- /// @return shouldCancel - if success was false, this being true indicates there
- /// was an unrecoverable error in table writing and the query
- /// should be cancelled.
+ /// @see ResponseHandler::flushHttp
+ /// @see MerginHandler::_mergeHttp
std::tuple<bool, bool> flushHttp(std::string const& fileUrl, uint64_t expectedRows,
uint64_t& resultRows) override;

- /// &&&uj doc
+ /// @see ResponseHandler::flushHttpError
void flushHttpError(int errorCode, std::string const& errorMsg, int status) override;

/// Signal an unrecoverable error condition. No further calls are expected.
@@ -113,7 +110,7 @@ class MergingHandler : public qdisp::ResponseHandler {
bool _merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData,
std::shared_ptr<qdisp::JobQuery> const& jobQuery);

- /// &&&uj doc
+ /// Call InfileMerger to do the work of merging this data to the result.
bool _mergeHttp(std::shared_ptr<qdisp::UberJob> const& uberJob, proto::ResponseData const& responseData);

/// Set error code and string.
@@ -130,7 +127,7 @@ class MergingHandler : public qdisp::ResponseHandler {
std::shared_ptr<rproc::InfileMerger> _infileMerger; ///< Merging delegate
std::string _tableName; ///< Target table name
Error _error; ///< Error description
- std::atomic<bool> _errorSet{false}; ///< &&& doc
+ std::atomic<bool> _errorSet{false}; ///< Set to true when an error is set.
mutable std::mutex _errorMutex; ///< Protect readers from partial updates
bool _flushed{false}; ///< flushed to InfileMerger?
std::string _wName{"~"}; ///< worker name
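The two booleans returned by flushHttp carry distinct meanings: the first says whether the merge succeeded, and the second, per the doc comment above, says whether a failure is unrecoverable so the whole user query should be cancelled. A hedged sketch of how a caller might branch on the pair follows; flushHttpStub and the printed actions are stand-ins, not the actual call site:

    #include <iostream>
    #include <tuple>

    // Stand-in for MergingHandler::flushHttp's (success, shouldCancel) contract.
    std::tuple<bool, bool> flushHttpStub() { return {false, true}; }

    int main() {
        auto const [success, shouldCancel] = flushHttpStub();
        if (success) {
            std::cout << "results merged; finish the UberJob\n";
        } else if (shouldCancel) {
            // Unrecoverable table-writing error: cancel the whole user query.
            std::cout << "cancel the user query\n";
        } else {
            // Recoverable failure: the work could be retried.
            std::cout << "retry\n";
        }
    }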
3 changes: 0 additions & 3 deletions src/ccontrol/UserQueryFactory.cc
@@ -225,9 +225,6 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
// First check for SUBMIT and strip it
std::string query = aQuery;

- // TODO: DM-43386 need to have WorkerChunkMap info at this point
- // &&&uj
-
std::string stripped;
bool async = false;
if (UserQueryType::isSubmit(query, stripped)) {
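The "check for SUBMIT and strip it" step shown above is what routes a statement such as SUBMIT SELECT ... into the asynchronous query path, with the stripped remainder carried forward as the real query. UserQueryType::isSubmit's implementation is not part of this diff, so the following is only a rough sketch of the idea under that assumption:

    #include <cctype>
    #include <cstddef>
    #include <string>

    // Rough illustration only; not the actual UserQueryType::isSubmit.
    bool isSubmitSketch(std::string const& query, std::string& stripped) {
        static std::string const kKeyword = "SUBMIT";
        if (query.size() <= kKeyword.size()) return false;
        // Case-insensitive match of the leading keyword.
        for (std::size_t i = 0; i < kKeyword.size(); ++i) {
            if (std::toupper(static_cast<unsigned char>(query[i])) != kKeyword[i]) return false;
        }
        // The keyword must be followed by whitespace, then the real statement.
        if (std::isspace(static_cast<unsigned char>(query[kKeyword.size()])) == 0) return false;
        auto const pos = query.find_first_not_of(" \t\r\n", kKeyword.size());
        stripped = (pos == std::string::npos) ? std::string() : query.substr(pos);
        return true;
    }

With this sketch, isSubmitSketch("SUBMIT SELECT * FROM Object", stripped) returns true and leaves stripped holding the bare SELECT, mirroring how the surrounding code sets async before continuing with the stripped query.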
