Skip to content

Commit

Permalink
Reinforced response processing in the result merger
Browse files Browse the repository at this point in the history
The new code verifies if the summary messages reported for the file-based
results don't have any error contexts. Abort result merging should
any such contexts be found.
  • Loading branch information
iagaponenko committed Jul 3, 2023
1 parent e75839a commit f906869
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
46 changes: 30 additions & 16 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,23 +419,27 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next
};
bool success = false;
if (!_response->result.fileresource_xroot().empty()) {
success = ::readXrootFileResourceAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error("MergingHandler::flush ** message deserialization failed **");
});
success = _noErrorsInResult() &&
::readXrootFileResourceAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error(
"MergingHandler::flush ** message deserialization failed **");
});
} else if (!_response->result.fileresource_http().empty()) {
success = ::readHttpFileAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error("MergingHandler::flush ** message deserialization failed **");
});
success = _noErrorsInResult() &&
::readHttpFileAndMerge(
_response->result, [&](char const* buf, uint32_t messageLength) -> bool {
if (_response->result.ParseFromArray(buf, messageLength) &&
_response->result.IsInitialized()) {
return mergeCurrentResult();
}
throw runtime_error(
"MergingHandler::flush ** message deserialization failed **");
});
} else {
success = mergeCurrentResult();
}
Expand All @@ -461,6 +465,16 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next
return false;
}

bool MergingHandler::_noErrorsInResult() {
if (_response->result.has_errorcode() || _response->result.has_errormsg()) {
_setError(_response->result.errorcode(), _response->result.errormsg());
LOGS(_log, LOG_LVL_ERROR,
"Error from worker:" << _response->protoHeader.wname() << " in response data: " << _error);
return false;
}
return true;
}

void MergingHandler::errorFlush(std::string const& msg, int code) {
_setError(code, msg);
// Might want more info from result service.
Expand Down
1 change: 1 addition & 0 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class MergingHandler : public qdisp::ResponseHandler {
void _setError(int code, std::string const& msg); ///< Set error code and string
bool _setResult(BufPtr const& bufPtr, int blen); ///< Extract the result from the protobuffer.
bool _verifyResult(BufPtr const& bufPtr, int blen); ///< Check the result against hash in the header.
bool _noErrorsInResult(); ///< Check if the rsult message object has no errors, report the ones (if any).

std::shared_ptr<MsgReceiver> _msgReceiver; ///< Message code receiver
std::shared_ptr<rproc::InfileMerger> _infileMerger; ///< Merging delegate
Expand Down

0 comments on commit f906869

Please sign in to comment.