diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index e9935b7f4..45e76291b 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -419,23 +419,27 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next }; bool success = false; if (!_response->result.fileresource_xroot().empty()) { - success = ::readXrootFileResourceAndMerge( - _response->result, [&](char const* buf, uint32_t messageLength) -> bool { - if (_response->result.ParseFromArray(buf, messageLength) && - _response->result.IsInitialized()) { - return mergeCurrentResult(); - } - throw runtime_error("MergingHandler::flush ** message deserialization failed **"); - }); + success = _noErrorsInResult() && + ::readXrootFileResourceAndMerge( + _response->result, [&](char const* buf, uint32_t messageLength) -> bool { + if (_response->result.ParseFromArray(buf, messageLength) && + _response->result.IsInitialized()) { + return mergeCurrentResult(); + } + throw runtime_error( + "MergingHandler::flush ** message deserialization failed **"); + }); } else if (!_response->result.fileresource_http().empty()) { - success = ::readHttpFileAndMerge( - _response->result, [&](char const* buf, uint32_t messageLength) -> bool { - if (_response->result.ParseFromArray(buf, messageLength) && - _response->result.IsInitialized()) { - return mergeCurrentResult(); - } - throw runtime_error("MergingHandler::flush ** message deserialization failed **"); - }); + success = _noErrorsInResult() && + ::readHttpFileAndMerge( + _response->result, [&](char const* buf, uint32_t messageLength) -> bool { + if (_response->result.ParseFromArray(buf, messageLength) && + _response->result.IsInitialized()) { + return mergeCurrentResult(); + } + throw runtime_error( + "MergingHandler::flush ** message deserialization failed **"); + }); } else { success = mergeCurrentResult(); } @@ -461,6 +465,16 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& next return false; } +bool MergingHandler::_noErrorsInResult() { + if (_response->result.has_errorcode() || _response->result.has_errormsg()) { + _setError(_response->result.errorcode(), _response->result.errormsg()); + LOGS(_log, LOG_LVL_ERROR, + "Error from worker:" << _response->protoHeader.wname() << " in response data: " << _error); + return false; + } + return true; +} + void MergingHandler::errorFlush(std::string const& msg, int code) { _setError(code, msg); // Might want more info from result service. diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index bddbdbd73..dd387ece2 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -100,6 +100,7 @@ class MergingHandler : public qdisp::ResponseHandler { void _setError(int code, std::string const& msg); ///< Set error code and string bool _setResult(BufPtr const& bufPtr, int blen); ///< Extract the result from the protobuffer. bool _verifyResult(BufPtr const& bufPtr, int blen); ///< Check the result against hash in the header. + bool _noErrorsInResult(); ///< Check if the rsult message object has no errors, report the ones (if any). std::shared_ptr _msgReceiver; ///< Message code receiver std::shared_ptr _infileMerger; ///< Merging delegate