Skip to content

Commit

Permalink
Modified Czar to allow pulling result files from workers
Browse files Browse the repository at this point in the history
The new version of the result merger at Czar would dynamically determine
which protocol should be used for pulling results from workers: SSI stream,
a file read via the XROOTD file protocol, or a file read via the HTTP
protocol. In case of the last two options, the merger would also also
utomatically tell a worker to delete the result file upon completion of
the merge (including the unsuccessful ones).

A choice of the delivery method is based on the optional
fileds in the Protobuf messages received from workers.
It's up to the workers to decide on what method to select.
  • Loading branch information
iagaponenko committed Jul 7, 2023
1 parent 3b0efd4 commit 62238ca
Show file tree
Hide file tree
Showing 2 changed files with 287 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_dependencies(ccontrol proto)

target_include_directories(ccontrol PRIVATE
${ANTLR4_INCLUDE_DIR}
${XROOTD_INCLUDE_DIRS}
)

target_sources(ccontrol PRIVATE
Expand All @@ -28,8 +29,10 @@ target_link_libraries(ccontrol PUBLIC
boost_regex
log
parser
replica
sphgeom
xrdreq
XrdCl
)

FUNCTION(ccontrol_tests)
Expand Down
287 changes: 284 additions & 3 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
#include "ccontrol/MergingHandler.h"

// System headers
#include <algorithm>
#include <cassert>
#include <cstring>

// Third-party headers
#include "XrdCl/XrdClFile.hh"

// LSST headers
#include "lsst/log/Log.h"
Expand All @@ -38,22 +43,269 @@
#include "proto/ProtoImporter.h"
#include "proto/WorkerResponse.h"
#include "qdisp/JobQuery.h"
#include "replica/HttpClient.h"
#include "rproc/InfileMerger.h"
#include "util/Bug.h"
#include "util/common.h"
#include "util/StringHash.h"

using lsst::qserv::proto::ProtoHeader;
using lsst::qserv::proto::ProtoHeaderWrap;
using lsst::qserv::proto::ProtoImporter;
using lsst::qserv::proto::Result;
using lsst::qserv::proto::WorkerResponse;
using lsst::qserv::replica::HttpClient;

using namespace std;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.ccontrol.MergingHandler");

string xrootdStatus2str(XrdCl::XRootDStatus const& s) {
return "status=" + to_string(s.status) + ", code=" + to_string(s.code) + ", errNo=" + to_string(s.errNo) +
", message='" + s.GetErrorMessage() + "'";
}

/**
* Extract the file path (including both slashes) from the XROOTD-style URL.
* Input:
* @code
* "xroot://<server>:<port>//<path>""
* @code
* Output:
* @code
* "//<path>""
* @code
*/
string xrootUrl2path(string const& xrootUrl) {
string const delim = "//";
auto firstPos = xrootUrl.find(delim, 0);
if (string::npos != firstPos) {
// Resume serching at the first character following the delimiter.
auto secondPos = xrootUrl.find(delim, firstPos + 2);
if (string::npos != secondPos) {
return xrootUrl.substr(secondPos);
}
}
throw runtime_error("MergingHandler::" + string(__func__) + " illegal file resource url: " + xrootUrl);
}

bool readXrootFileResourceAndMerge(lsst::qserv::proto::Result const& result,
function<bool(char const*, uint32_t)> const& messageIsReady) {
string const context = "MergingHandler::" + string(__func__) + " ";

// Extract data from the input result object before modifying the one.
string const xrootUrl = result.fileresource_xroot();

// The algorithm will read the input file to locate result objects containing rows
// and call the provided callback for each such row.
XrdCl::File file;
XrdCl::XRootDStatus status;
status = file.Open(xrootUrl, XrdCl::OpenFlags::Read);
if (!status.IsOK()) {
LOGS(_log, LOG_LVL_ERROR,
context << "failed to open " << xrootUrl << ", " << xrootdStatus2str(status));
return false;
}

// Temporary buffer for messages read from the file. The buffer will be (re-)allocated
// as needed to get the largest message. Note that a size of the messages won't exceed
// a limit set in ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT.
unique_ptr<char[]> buf;
size_t bufSize = 0;

uint64_t offset = 0; // A location of the next byte to be read from the input file.
bool success = true;
try {
while (true) {
// Read the frame header that carries a size of the subsequent message.
uint32_t msgSizeBytes = 0;
uint32_t bytesRead = 0;
status = file.Read(offset, sizeof(uint32_t), reinterpret_cast<char*>(&msgSizeBytes), bytesRead);
if (!status.IsOK()) {
throw runtime_error(context + "failed to read next frame header (" +
to_string(sizeof(uint32_t)) + " bytes) at offset " + to_string(offset) +
" from " + xrootUrl + ", " + xrootdStatus2str(status));
}
offset += bytesRead;

if (bytesRead == 0) break;
if (bytesRead != sizeof(uint32_t)) {
throw runtime_error(context + "read " + to_string(bytesRead) + " bytes instead of " +
to_string(sizeof(uint32_t)) +
" bytes when reading next frame header at offset " +
to_string(offset - bytesRead) + " from " + xrootUrl + ", " +
xrootdStatus2str(status));
}
if (msgSizeBytes == 0) break;
if (msgSizeBytes > ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) {
throw runtime_error(context + "message size of " + to_string(msgSizeBytes) +
" bytes at the frame header read at offset " +
to_string(offset - bytesRead) + " exceeds the hard limit set to " +
to_string(ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) + " bytes, from " +
xrootUrl + ", " + xrootdStatus2str(status));
}

// (Re-)allocate the buffer if needed.
if (bufSize < msgSizeBytes) {
bufSize = msgSizeBytes;
buf.reset(new char[bufSize]);
}

// Read the message.
size_t bytes2read = msgSizeBytes;
while (bytes2read != 0) {
uint32_t bytesRead = 0;
status = file.Read(offset, bytes2read, buf.get(), bytesRead);
if (!status.IsOK()) {
throw runtime_error(context + "failed to read " + to_string(bytes2read) +
" bytes at offset " + to_string(offset) + " from " + xrootUrl + ", " +
xrootdStatus2str(status));
}
if (bytesRead == 0) {
throw runtime_error(context + "read 0 bytes instead of " + to_string(bytes2read) +
" bytes at offset " + to_string(offset) + " from " + xrootUrl + ", " +
xrootdStatus2str(status));
}
offset += bytesRead;
bytes2read -= bytesRead;
}
success = messageIsReady(buf.get(), msgSizeBytes);
if (!success) break;
}
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_ERROR, ex.what());
success = false;
}
status = file.Close();
if (!status.IsOK()) {
LOGS(_log, LOG_LVL_WARN,
context << "failed to close " << xrootUrl << ", " << xrootdStatus2str(status));
}

// Remove the file from the worker if it still exists. Report and ignore errors.
// The files will be garbage-collected by workers.
XrdCl::FileSystem fileSystem(xrootUrl);
status = fileSystem.Rm(xrootUrl2path(xrootUrl));
if (!status.IsOK()) {
LOGS(_log, LOG_LVL_WARN,
context << "failed to remove " << xrootUrl << ", " << xrootdStatus2str(status));
}
return success;
}

bool readHttpFileAndMerge(lsst::qserv::proto::Result const& result,
function<bool(char const*, uint32_t)> const& messageIsReady) {
string const context = "MergingHandler::" + string(__func__) + " ";

// Extract data from the input result object before modifying the one.
string const httpUrl = result.fileresource_http();

// A location of the next byte to be read from the input file. The variable
// is used for error reporting.
uint64_t offset = 0;

// Temporary buffer for messages read from the file. The buffer gets automatically
// resized to fit the largest message.
unique_ptr<char[]> msgBuf;
size_t msgBufSize = 0;
size_t msgBufNext = 0; // An index of the next character in the buffer.

// Fixed-size buffer to store the message size.
string msgSizeBuf(sizeof(uint32_t), '\0');
size_t msgSizeBufNext = 0; // An index of the next character in the buffer.

// The size of the next/current message. The variable is set after succesfully parsing
// the message length header and is reset back to 0 after parsing the message body.
// The value is stays 0 while reading the frame header.
uint32_t msgSizeBytes = 0;
bool success = true;
try {
HttpClient reader("GET", httpUrl);
reader.read([&](char const* inBuf, size_t inBufSize) {
char const* next = inBuf;
char const* const end = inBuf + inBufSize;
while (next < end) {
if (msgSizeBytes == 0) {
// Continue or finish reading the frame header.
size_t const bytes2read =
std::min(sizeof(uint32_t) - msgSizeBufNext, (size_t)(end - next));
std::memcpy(msgSizeBuf.data() + msgSizeBufNext, next, bytes2read);
next += bytes2read;
offset += bytes2read;
msgSizeBufNext += bytes2read;
if (msgSizeBufNext == sizeof(uint32_t)) {
// Done reading the frame header.
msgSizeBufNext = 0;
// Parse and evaluate the message length.
msgSizeBytes = *(reinterpret_cast<uint32_t*>(msgSizeBuf.data()));
if (msgSizeBytes == 0) {
throw runtime_error(context + "message size is 0 at offset " +
to_string(offset - sizeof(uint32_t)) + ", file: " + httpUrl);
}
if (msgSizeBytes > ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) {
throw runtime_error(context + "message size " + to_string(msgSizeBytes) +
" at offset " + to_string(offset - sizeof(uint32_t)) +
" exceeds the hard limit of " +
to_string(ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) +
", file: " + httpUrl);
}
// Extend the message buffer (if needed). Note that buffer never gets
// truncated to avoid excessive memory deallocations/allocations.
if (msgBufSize < msgSizeBytes) {
msgBufSize = msgSizeBytes;
msgBuf.reset(new char[msgBufSize]);
}
}
} else {
// Continue or finish reading the message body.
size_t const bytes2read =
std::min((size_t)msgSizeBytes - msgBufNext, (size_t)(end - next));
std::memcpy(msgBuf.get() + msgBufNext, next, bytes2read);
next += bytes2read;
offset += bytes2read;
msgBufNext += bytes2read;
if (msgBufNext == msgSizeBytes) {
// Done reading message body.
msgBufNext = 0;
// Parse and evaluate the message.
bool const success = messageIsReady(msgBuf.get(), msgSizeBytes);
if (!success) {
throw runtime_error(context + "message processing failed at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
}
// Reset the variable to prepare for reading the next header & message (if any).
msgSizeBytes = 0;
}
}
}
});
if (msgSizeBufNext != 0) {
throw runtime_error(context + "short read of the message header at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
}
if (msgBufNext != 0) {
throw runtime_error(context + "short read of the message body at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
}
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_ERROR, ex.what());
success = false;
}

// Remove the file from the worker if it still exists. Report and ignore errors.
// The files will be garbage-collected by workers.
try {
HttpClient remover("DELETE", httpUrl);
remover.read([](char const* inBuf, size_t inBufSize) {});
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_WARN, context << "failed to remove " << httpUrl << ", ex: " << ex.what());
}
return success;
}

} // namespace

namespace lsst::qserv::ccontrol {

////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -148,16 +400,45 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next
LOGS(_log, LOG_LVL_WARN, "setResult failure " << _wName);
return false;
}
resultRows = _response->result.row_size();
LOGS(_log, LOG_LVL_DEBUG, "From:" << _wName << " _mBuf " << util::prettyCharList(*bufPtr, 5));
_state = MsgState::HEADER_WAIT;

int jobId = _response->result.jobid();
_jobIds.insert(jobId);
LOGS(_log, LOG_LVL_DEBUG, "Flushed last=" << last << " for tableName=" << _tableName);

auto success = _merge();
_response.reset(new WorkerResponse());
// Dispatch result processing to the corresponidng method which depends on
// the result delivery protocol configured at the worker.
auto const mergeCurrentResult = [this, &resultRows]() {
resultRows += _response->result.row_size();
bool const success = _merge();
// A fresh instance may be needed to process the next message of the results stream.
// Note that _merge() resets the object.
_response.reset(new WorkerResponse());
return success;
};
bool success = false;
if (!_response->result.fileresource_xroot().empty()) {
success = ::readXrootFileResourceAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error("MergingHandler::flush ** message deserialization failed **");
});
} else if (!_response->result.fileresource_http().empty()) {
success = ::readHttpFileAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error("MergingHandler::flush ** message deserialization failed **");
});
} else {
success = mergeCurrentResult();
}
return success;
}
case MsgState::RESULT_RECV:
Expand Down

0 comments on commit 62238ca

Please sign in to comment.