Skip to content

Commit

Permalink
Merge pull request #856 from lsst/tickets/DM-43386
Browse files Browse the repository at this point in the history
Tickets/dm 43386
  • Loading branch information
jgates108 authored Aug 1, 2024
2 parents 04b828f + 5a502cc commit 79bafad
Show file tree
Hide file tree
Showing 86 changed files with 4,259 additions and 812 deletions.
1 change: 1 addition & 0 deletions src/admin/python/lsst/qserv/admin/itest.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,7 @@ def compareQueryResults(run_cases: List[str], outputs_dir: str) -> List[ITestCas
if not os.path.exists(os.path.join(outputs_dir, case)):
_log.warn("There are no query results to compare for %s", case)
continue

comparisons = (
(query_mode_mysql, query_mode_qserv_attached),
(query_mode_mysql, query_mode_qserv_detached),
Expand Down
2 changes: 2 additions & 0 deletions src/admin/python/lsst/qserv/admin/qservCli/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ def cmake(
build_image,
"cmake",
"..",
"-DCMAKE_BUILD_TYPE=Debug"
]
# "-DCMAKE_BUILD_TYPE=Debug"
if dry:
print(" ".join(args))
return
Expand Down
2 changes: 2 additions & 0 deletions src/admin/python/lsst/qserv/admin/replicationInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ def __init__(
self.repl_ctrl = urlparse(repl_ctrl_uri)
self.auth_key = auth_key
self.admin_auth_key = admin_auth_key

# Must match MetaModule::version in http/MetaModule.cc
self.repl_api_version = 35
_log.debug(f"ReplicationInterface %s", self.repl_ctrl)

Expand Down
242 changes: 237 additions & 5 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "proto/worker.pb.h"
#include "qdisp/CzarStats.h"
#include "qdisp/JobQuery.h"
#include "qdisp/UberJob.h"
#include "rproc/InfileMerger.h"
#include "util/Bug.h"
#include "util/common.h"
Expand Down Expand Up @@ -360,7 +361,7 @@ bool readHttpFileAndMerge(string const& httpUrl,
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
}
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_ERROR, ex.what());
LOGS(_log, LOG_LVL_ERROR, string(__func__) + " " + ex.what());
success = false;
}

Expand All @@ -375,6 +376,166 @@ bool readHttpFileAndMerge(string const& httpUrl,
return success;
}

/// Read the result file for 'uberJob' from the worker at 'httpUrl' and feed each
/// framed message to 'messageIsReady'.
///
/// The file is a sequence of records, each a uint32 length header (native byte
/// order, matching the writer) followed by a serialized message body of that
/// length. Records may be split arbitrarily across network reads, so the parser
/// keeps partial header/body state between callback invocations. After reading,
/// the file is deleted from the worker on a best-effort basis (workers also
/// garbage-collect leftovers).
///
/// @param uberJob       UberJob the file belongs to (used for log context).
/// @param httpUrl       Location of the result file on the worker.
/// @param messageIsReady Called with each complete message; sets its 'last'
///                      out-parameter to stop reading; returns false on merge failure.
/// @param httpConnPool  Connection pool shared by result-file readers.
/// @return {success, mergeSuccess}: 'success' is false on any read/parse/merge
///         failure; 'mergeSuccess' is false only when 'messageIsReady' rejected a
///         message, which likely dooms the user query (see caller).
std::tuple<bool, bool> readHttpFileAndMergeHttp(
        lsst::qserv::qdisp::UberJob::Ptr const& uberJob, string const& httpUrl,
        function<bool(char const*, uint32_t, bool&)> const& messageIsReady,
        shared_ptr<http::ClientConnPool> const& httpConnPool) {
    string const context = "MergingHandler::" + string(__func__) + " qid=" + uberJob->getIdStr() + " ";

    LOGS(_log, LOG_LVL_DEBUG, context << "httpUrl=" << httpUrl);

    // Track the file while the control flow is staying within the function.
    ResultFileTracker const resultFileTracker;

    // The data transmit rate tracker is set up before reading each data message.
    unique_ptr<lsst::qserv::TimeCountTracker<double>> transmitRateTracker;

    // A location of the next byte to be read from the input file. The variable
    // is used for error reporting.
    uint64_t offset = 0;

    // Temporary buffer for messages read from the file. The buffer gets automatically
    // resized to fit the largest message.
    unique_ptr<char[]> msgBuf;
    size_t msgBufSize = 0;
    size_t msgBufNext = 0;  // An index of the next character in the buffer.

    // Fixed-size buffer to store the message size.
    string msgSizeBuf(sizeof(uint32_t), '\0');
    size_t msgSizeBufNext = 0;  // An index of the next character in the buffer.

    // The size of the next/current message. The variable is set after successfully parsing
    // the message length header and is reset back to 0 after parsing the message body.
    // The value stays 0 while reading the frame header.
    uint32_t msgSizeBytes = 0;
    bool success = true;
    bool mergeSuccess = true;
    int headerCount = 0;
    uint64_t totalBytesRead = 0;
    try {
        string const noClientData;
        vector<string> const noClientHeaders;
        http::ClientConfig clientConfig;
        clientConfig.httpVersion = CURL_HTTP_VERSION_1_1;  // same as in qhttp
        clientConfig.bufferSize = CURL_MAX_READ_SIZE;      // 10 MB in the current version of libcurl
        clientConfig.tcpKeepAlive = true;
        clientConfig.tcpKeepIdle = 5;   // the default is 60 sec
        clientConfig.tcpKeepIntvl = 5;  // the default is 60 sec
        http::Client reader(http::Method::GET, httpUrl, noClientData, noClientHeaders, clientConfig,
                            httpConnPool);
        reader.read([&](char const* inBuf, size_t inBufSize) {
            // A value of the flag is set by the message processor when it's time to finish
            // or abort reading the file.
            bool last = false;
            char const* next = inBuf;
            char const* const end = inBuf + inBufSize;
            while ((next < end) && !last) {
                // Per-chunk trace of parser progress. Kept at TRACE: this fires for
                // every network read and would flood the log at higher levels.
                LOGS(_log, LOG_LVL_TRACE,
                     context << "TODO:UJ next=" << (uint64_t)next << " end=" << (uint64_t)end
                             << " last=" << last);
                if (msgSizeBytes == 0) {
                    // Continue or finish reading the frame header.
                    size_t const bytes2read =
                            std::min(sizeof(uint32_t) - msgSizeBufNext, (size_t)(end - next));
                    std::memcpy(msgSizeBuf.data() + msgSizeBufNext, next, bytes2read);
                    next += bytes2read;
                    offset += bytes2read;
                    msgSizeBufNext += bytes2read;
                    if (msgSizeBufNext == sizeof(uint32_t)) {
                        ++headerCount;
                        // Done reading the frame header.
                        msgSizeBufNext = 0;
                        // Parse and evaluate the message length.
                        msgSizeBytes = *(reinterpret_cast<uint32_t*>(msgSizeBuf.data()));
                        if (msgSizeBytes == 0) {
                            throw runtime_error("message size is 0 at offset " +
                                                to_string(offset - sizeof(uint32_t)) + ", file: " + httpUrl);
                        }
                        if (msgSizeBytes > ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) {
                            throw runtime_error("message size " + to_string(msgSizeBytes) + " at offset " +
                                                to_string(offset - sizeof(uint32_t)) +
                                                " exceeds the hard limit of " +
                                                to_string(ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) +
                                                ", file: " + httpUrl);
                        }
                        // Extend the message buffer (if needed). Note that buffer never gets
                        // truncated to avoid excessive memory deallocations/allocations.
                        if (msgBufSize < msgSizeBytes) {
                            msgBufSize = msgSizeBytes;
                            msgBuf.reset(new char[msgBufSize]);
                        }
                        // Starts the tracker to measure the performance of the network I/O.
                        transmitRateTracker =
                                make_unique<lsst::qserv::TimeCountTracker<double>>(reportFileRecvRate);
                    }
                } else {
                    // Continue or finish reading the message body.
                    size_t const bytes2read =
                            std::min((size_t)msgSizeBytes - msgBufNext, (size_t)(end - next));
                    std::memcpy(msgBuf.get() + msgBufNext, next, bytes2read);
                    next += bytes2read;
                    offset += bytes2read;
                    msgBufNext += bytes2read;
                    if (msgBufNext == msgSizeBytes) {
                        // Done reading message body.
                        msgBufNext = 0;

                        // Destroying the tracker will result in stopping the tracker's timer and
                        // reporting the file read rate before proceeding to the merge.
                        if (transmitRateTracker != nullptr) {
                            transmitRateTracker->addToValue(msgSizeBytes);
                            transmitRateTracker->setSuccess();
                            transmitRateTracker.reset();
                        }

                        // Parse and evaluate the message.
                        mergeSuccess = messageIsReady(msgBuf.get(), msgSizeBytes, last);
                        totalBytesRead += msgSizeBytes;
                        if (!mergeSuccess) {
                            success = false;
                            throw runtime_error("message processing failed at offset " +
                                                to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
                        }
                        // Reset the variable to prepare for reading the next header & message (if any).
                        msgSizeBytes = 0;
                    } else {
                        // A partial body read is normal for large messages; log at DEBUG,
                        // not WARN, as this is not an error condition.
                        LOGS(_log, LOG_LVL_DEBUG,
                             context << " headerCount=" << headerCount
                                     << " incomplete read diff=" << (msgSizeBytes - msgBufNext));
                    }
                }
            }
        });
        LOGS(_log, LOG_LVL_DEBUG,
             context << " headerCount=" << headerCount << " msgSizeBytes=" << msgSizeBytes
                     << " totalBytesRead=" << totalBytesRead);
        // Leftover parser state after EOF means the file was truncated mid-record.
        if (msgSizeBufNext != 0) {
            throw runtime_error("short read of the message header at offset " +
                                to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
        }
        if (msgBufNext != 0) {
            throw runtime_error("short read of the message body at offset " +
                                to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
        }
    } catch (exception const& ex) {
        LOGS(_log, LOG_LVL_ERROR, context + " " + ex.what());
        success = false;
    }

    // Remove the file from the worker if it still exists. Report and ignore errors.
    // The files will be garbage-collected by workers.
    try {
        http::Client remover(http::Method::DELETE, httpUrl);
        remover.read([](char const* inBuf, size_t inBufSize) {});
    } catch (exception const& ex) {
        LOGS(_log, LOG_LVL_WARN, context << "failed to remove " << httpUrl << ", ex: " << ex.what());
    }
    // If the merge failed, that indicates something went wrong in the local database table,
    // is likely this user query is doomed and should be cancelled.
    LOGS(_log, LOG_LVL_DEBUG, context << " end success=" << success << " mergeSuccess=" << mergeSuccess);
    return {success, mergeSuccess};
}

} // namespace

namespace lsst::qserv::ccontrol {
Expand All @@ -396,18 +557,20 @@ MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> merger, std:
_initState();
}

MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__); }
MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__ << " " << _tableName); }

bool MergingHandler::flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) {
_wName = responseSummary.wname();

// This is needed to ensure the job query would be staying alive for the duration
// of the operation to prevent inconsistency witin the application.
auto const jobQuery = getJobQuery().lock();
if (jobQuery == nullptr) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobQuery was NULL");
auto const jobBase = getJobBase().lock();
if (jobBase == nullptr) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobBase was NULL");
return false;
}
auto const jobQuery = std::dynamic_pointer_cast<qdisp::JobQuery>(jobBase);

LOGS(_log, LOG_LVL_TRACE,
"MergingHandler::" << __func__ << " jobid=" << responseSummary.jobid()
<< " transmitsize=" << responseSummary.transmitsize()
Expand Down Expand Up @@ -508,10 +671,79 @@ bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary,
return success;
}

/// Merge one block of rows received for 'uberJob' into the result table via
/// the InfileMerger delegate. On merge failure the merger's error is copied
/// into this handler's error state.
/// @throws util::Bug if called after the handler has already been flushed.
/// @return true when the merge succeeded.
bool MergingHandler::_mergeHttp(shared_ptr<qdisp::UberJob> const& uberJob,
                                proto::ResponseData const& responseData) {
    if (_flushed) {
        throw util::Bug(ERR_LOC, "already flushed");
    }
    if (_infileMerger->mergeHttp(uberJob, responseData)) {
        return true;
    }
    // The merge failed; record the merger's error for later reporting.
    LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
    util::Error const& err = _infileMerger->getError();
    _setError(ccontrol::MSG_RESULT_ERROR, err.getMsg());
    return false;
}

/// Record an error code and message, replacing any previous error.
/// Thread safe: _errorMutex protects readers from seeing a partial update.
void MergingHandler::_setError(int code, std::string const& msg) {
    LOGS(_log, LOG_LVL_DEBUG, "_setErr: code: " << code << ", message: " << msg);
    std::lock_guard<std::mutex> const errLock(_errorMutex);
    _error = Error(code, msg);
}

/// Read the result file at 'fileUrl' and merge its contents into the result table.
/// @param fileUrl      Location of the worker-side result file.
/// @param expectedRows Total number of rows the file is expected to contain;
///                     reading stops once this many rows have been merged.
/// @param resultRows   Incremented by the number of rows merged (in/out).
/// @return {success, shouldCancel}: 'success' is true when the whole file was read
///         and merged; 'shouldCancel' is true when the merge itself failed, which
///         likely means the user query is doomed and should be cancelled.
tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expectedRows,
                                            uint64_t& resultRows) {
    bool success = false;
    bool shouldCancel = false;

    // This is needed to ensure the job query would be staying alive for the duration
    // of the operation to prevent inconsistency within the application.
    auto const jobBase = getJobBase().lock();
    if (jobBase == nullptr) {
        LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobBase was NULL");
        return {success, shouldCancel};  // both should still be false
    }
    auto const uberJob = std::dynamic_pointer_cast<qdisp::UberJob>(jobBase);
    if (uberJob == nullptr) {
        // Guard against a JobBase subclass other than UberJob reaching this path;
        // dereferencing the failed cast below would crash.
        LOGS(_log, LOG_LVL_ERROR, __func__ << " failed, jobBase was not an UberJob");
        return {success, shouldCancel};  // both should still be false
    }

    LOGS(_log, LOG_LVL_TRACE,
         "MergingHandler::" << __func__ << " uberJob=" << uberJob->getIdStr() << " fileUrl=" << fileUrl);

    // Dispatch result processing to the corresponding method which depends on
    // the result delivery protocol configured at the worker.
    // Notify the file reader when all rows have been read by setting 'last = true'.
    auto const dataMergerHttp = [&](char const* buf, uint32_t bufSize, bool& last) {
        LOGS(_log, LOG_LVL_TRACE, "dataMergerHttp");
        last = true;
        proto::ResponseData responseData;
        if (responseData.ParseFromArray(buf, bufSize) && responseData.IsInitialized()) {
            bool const mergeSuccess = _mergeHttp(uberJob, responseData);
            if (mergeSuccess) {
                resultRows += responseData.row_size();
                last = resultRows >= expectedRows;
            }
            return mergeSuccess;
        }
        throw runtime_error("MergingHandler::flushHttp ** message deserialization failed **");
    };

    tie(success, shouldCancel) =
            ::readHttpFileAndMergeHttp(uberJob, fileUrl, dataMergerHttp, MergingHandler::_getHttpConnPool());

    if (!success || shouldCancel) {
        LOGS(_log, LOG_LVL_WARN, __func__ << " success=" << success << " shouldCancel=" << shouldCancel);
    }

    if (success) {
        _infileMerger->mergeCompleteFor(uberJob->getJobId());
    }
    return {success, shouldCancel};
}

/// Record an error reported by the worker over the HTTP result protocol.
/// Only the first reported error is kept; subsequent calls are no-ops
/// (_errorSet.exchange makes the first-writer-wins check atomic).
/// NOTE(review): 'status' is not used here — confirm whether it should be
/// folded into the stored error.
void MergingHandler::flushHttpError(int errorCode, std::string const& errorMsg, int status) {
    if (_errorSet.exchange(true)) {
        return;
    }
    _error = util::Error(errorCode, errorMsg, util::ErrorCode::MYSQLEXEC);
    _setError(ccontrol::MSG_RESULT_ERROR, _error.getMsg());
}

} // namespace lsst::qserv::ccontrol
13 changes: 13 additions & 0 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ResponseSummary;

namespace lsst::qserv::qdisp {
class JobQuery;
class UberJob;
} // namespace lsst::qserv::qdisp

namespace lsst::qserv::rproc {
Expand Down Expand Up @@ -74,6 +75,14 @@ class MergingHandler : public qdisp::ResponseHandler {
/// @return true if successful (no error)
bool flush(proto::ResponseSummary const& responseSummary, uint32_t& resultRows) override;

/// @see ResponseHandler::flushHttp
/// @see MerginHandler::_mergeHttp
std::tuple<bool, bool> flushHttp(std::string const& fileUrl, uint64_t expectedRows,
uint64_t& resultRows) override;

/// @see ResponseHandler::flushHttpError
void flushHttpError(int errorCode, std::string const& errorMsg, int status) override;

/// Signal an unrecoverable error condition. No further calls are expected.
void errorFlush(std::string const& msg, int code) override;

Expand Down Expand Up @@ -101,6 +110,9 @@ class MergingHandler : public qdisp::ResponseHandler {
bool _merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData,
std::shared_ptr<qdisp::JobQuery> const& jobQuery);

/// Call InfileMerger to do the work of merging this data to the result.
bool _mergeHttp(std::shared_ptr<qdisp::UberJob> const& uberJob, proto::ResponseData const& responseData);

/// Set error code and string.
void _setError(int code, std::string const& msg);

Expand All @@ -115,6 +127,7 @@ class MergingHandler : public qdisp::ResponseHandler {
std::shared_ptr<rproc::InfileMerger> _infileMerger; ///< Merging delegate
std::string _tableName; ///< Target table name
Error _error; ///< Error description
std::atomic<bool> _errorSet{false}; ///< Set to true when an error is set.
mutable std::mutex _errorMutex; ///< Protect readers from partial updates
bool _flushed{false}; ///< flushed to InfileMerger?
std::string _wName{"~"}; ///< worker name
Expand Down
6 changes: 3 additions & 3 deletions src/ccontrol/UserQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
#include "qmeta/types.h"

// Forward decl
namespace lsst::qserv::qdisp {
namespace lsst::qserv::qmeta {
class MessageStore;
} // namespace lsst::qserv::qdisp
} // namespace lsst::qserv::qmeta

namespace lsst::qserv::ccontrol {

Expand Down Expand Up @@ -74,7 +74,7 @@ class UserQuery {
virtual void discard() = 0;

// Delegate objects
virtual std::shared_ptr<qdisp::MessageStore> getMessageStore() = 0;
virtual std::shared_ptr<qmeta::MessageStore> getMessageStore() = 0;

/// This method should disappear when we start supporting results
/// in locations other than MySQL tables. We'll switch to getResultLocation()
Expand Down
Loading

0 comments on commit 79bafad

Please sign in to comment.