diff --git a/src/util/Error.h b/src/util/Error.h index d4bf22e7a..c95ec76b0 100644 --- a/src/util/Error.h +++ b/src/util/Error.h @@ -60,7 +60,9 @@ struct ErrorCode { CREATE_TABLE, MYSQLCONNECT, MYSQLEXEC, - INTERNAL + INTERNAL, + // Worker errors: + WORKER_RESULT_TOO_LARGE }; }; diff --git a/src/util/MultiError.cc b/src/util/MultiError.cc index da0723c41..bf1ac1361 100644 --- a/src/util/MultiError.cc +++ b/src/util/MultiError.cc @@ -52,6 +52,8 @@ std::string MultiError::toOneLineString() const { return oss.str(); } +int MultiError::firstErrorCode() const { return empty() ? ErrorCode::NONE : _errorVector.front().getCode(); } + bool MultiError::empty() const { return _errorVector.empty(); } std::vector::size_type MultiError::size() const { return _errorVector.size(); } diff --git a/src/util/MultiError.h b/src/util/MultiError.h index 678b7171a..ea0d62048 100644 --- a/src/util/MultiError.h +++ b/src/util/MultiError.h @@ -69,6 +69,15 @@ class MultiError : public std::exception { */ std::string toOneLineString() const; + /** Return the first error code (if any) + * + * The idea is to return the first code that might trigger the "chain" reaction. + * An interpretation of the code depns on a context. + * + * @return the code or ErrorCode::NONE if the collection of errors is empty + */ + int firstErrorCode() const; + virtual ~MultiError() throw() {} /** Overload output operator for this class diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 6b3e23ee6..2254c705d 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -216,7 +216,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptrgetMaxTableSize(); + maxTableSize > 0 && bytesTransmitted > maxTableSize) { + string const err = "The result set size " + to_string(bytesTransmitted) + + " of a job exceeds the requested limit of " + to_string(maxTableSize) + + " bytes, task: " + task->getIdStr(); + multiErr.push_back(util::Error(util::ErrorCode::WORKER_RESULT_TOO_LARGE, err)); + LOGS(_log, LOG_LVL_ERROR, err); + erred = true; + break; + } + // If no more rows are left in the task's result set then we need to check // if this is last task in a logical group of ones created for processing // the current request (note that certain classes of requests may require diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index 1871c67b2..1bca050aa 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -89,6 +89,9 @@ string buildResultFilePath(shared_ptr const& taskMs to_string(taskMsg->chunkid()) + "-" + to_string(taskMsg->attemptcount()) + ".proto"; return path.string(); } + +size_t const MB_SIZE_BYTES = 1024 * 1024; + } // namespace namespace lsst::qserv::wbase { @@ -184,6 +187,7 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, std::shared_ptrscanpriority(); _scanInfo.sortTablesSlowestFirst(); _scanInteractive = t->scaninteractive(); + _maxTableSize = t->maxtablesize_mb() * ::MB_SIZE_BYTES; // Create sets and vectors for 'aquiring' subchunk temporary tables. proto::TaskMsg_Fragment const& fragment(t->fragment(_queryFragmentNum)); @@ -473,6 +477,7 @@ nlohmann::json Task::getJson() const { js["attemptId"] = _attemptCount; js["sequenceId"] = _tSeq; js["scanInteractive"] = _scanInteractive; + js["maxTableSize"] = _maxTableSize; js["cancelled"] = to_string(_cancelled); js["state"] = static_cast(_state.load()); js["createTime_msec"] = util::TimeUtils::tp2ms(_createTime); diff --git a/src/wbase/Task.h b/src/wbase/Task.h index eba22bcdd..f68d6622a 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -220,6 +220,7 @@ class Task : public util::CommandForThreadPool { int getJobId() const { return _jId; } int getAttemptCount() const { return _attemptCount; } bool getScanInteractive() { return _scanInteractive; } + int64_t getMaxTableSize() const { return _maxTableSize; } proto::ScanInfo& getScanInfo() { return _scanInfo; } void setOnInteractive(bool val) { _onInteractive = val; } bool getOnInteractive() { return _onInteractive; } @@ -322,6 +323,7 @@ class Task : public util::CommandForThreadPool { bool _scanInteractive; ///< True if the czar thinks this query should be interactive. bool _onInteractive{ false}; ///< True if the scheduler put this task on the interactive (group) scheduler. + int64_t _maxTableSize = 0; std::atomic _memHandle{memman::MemMan::HandleType::INVALID}; memman::MemMan::Ptr _memMan; diff --git a/src/wbase/TransmitData.cc b/src/wbase/TransmitData.cc index 4cbe64f92..9ae3923f9 100644 --- a/src/wbase/TransmitData.cc +++ b/src/wbase/TransmitData.cc @@ -173,6 +173,7 @@ void TransmitData::_buildDataMsg(lock_guard const& lock, Task const& task string msg = "Error(s) in result for chunk #" + to_string(task.getChunkId()) + ": " + multiErr.toOneLineString(); _result->set_errormsg(msg); + _result->set_errorcode(multiErr.firstErrorCode()); LOGS(_log, LOG_LVL_ERROR, _idStr << "buildDataMsg adding " << msg); } _result->SerializeToString(&_dataMsg);