Skip to content

Commit

Permalink
Enforce large result limit at workers when fetching result sets
Browse files Browse the repository at this point in the history
Note that the enforcement only applies to the file-based result
delivery protocol as this mechanism makes no sense when results
are streamed to Czar over the SSI protocol.

The large result conditions are reported to Czar with the code
util::ErrorCode::WORKER_RESULT_TOO_LARGE which would help Czar
to identify this specific condition and properly report the one
to a user.
  • Loading branch information
iagaponenko committed Jul 20, 2023
1 parent 66b9ab8 commit bec0146
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 2 deletions.
4 changes: 3 additions & 1 deletion src/util/Error.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ struct ErrorCode {
CREATE_TABLE,
MYSQLCONNECT,
MYSQLEXEC,
INTERNAL
INTERNAL,
// Worker errors:
WORKER_RESULT_TOO_LARGE
};
};

Expand Down
2 changes: 2 additions & 0 deletions src/util/MultiError.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ std::string MultiError::toOneLineString() const {
return oss.str();
}

int MultiError::firstErrorCode() const { return empty() ? ErrorCode::NONE : _errorVector.front().getCode(); }

bool MultiError::empty() const { return _errorVector.empty(); }

std::vector<Error>::size_type MultiError::size() const { return _errorVector.size(); }
Expand Down
9 changes: 9 additions & 0 deletions src/util/MultiError.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ class MultiError : public std::exception {
*/
std::string toOneLineString() const;

/** Return the first error code (if any)
*
* The idea is to return the first code that might trigger the "chain" reaction.
* An interpretation of the code depns on a context.
*
* @return the code or ErrorCode::NONE if the collection of errors is empty
*/
int firstErrorCode() const;

virtual ~MultiError() throw() {}

/** Overload output operator for this class
Expand Down
15 changes: 14 additions & 1 deletion src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
transmitT.start();

double bufferFillSecs = 0.0;
int bytesTransmitted = 0;
int64_t bytesTransmitted = 0;
int rowsTransmitted = 0;

// Keep reading rows and converting those into messages while any
Expand Down Expand Up @@ -259,6 +259,19 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
_rowcount += rows;
_transmitsize += bytes;

// Fail the operation if the amount of data in the result set exceeds the requested
// "large result" limit (in case if the one was specified).
if (int64_t const maxTableSize = task->getMaxTableSize();
maxTableSize > 0 && bytesTransmitted > maxTableSize) {
string const err = "The result set size " + to_string(bytesTransmitted) +
" of a job exceeds the requested limit of " + to_string(maxTableSize) +
" bytes, task: " + task->getIdStr();
multiErr.push_back(util::Error(util::ErrorCode::WORKER_RESULT_TOO_LARGE, err));
LOGS(_log, LOG_LVL_ERROR, err);
erred = true;
break;
}

// If no more rows are left in the task's result set then we need to check
// if this is last task in a logical group of ones created for processing
// the current request (note that certain classes of requests may require
Expand Down
5 changes: 5 additions & 0 deletions src/wbase/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ string buildResultFilePath(shared_ptr<lsst::qserv::proto::TaskMsg> const& taskMs
to_string(taskMsg->chunkid()) + "-" + to_string(taskMsg->attemptcount()) + ".proto";
return path.string();
}

size_t const MB_SIZE_BYTES = 1024 * 1024;

} // namespace

namespace lsst::qserv::wbase {
Expand Down Expand Up @@ -184,6 +187,7 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, std::shared_ptr<UserQueryInf
_scanInfo.scanRating = t->scanpriority();
_scanInfo.sortTablesSlowestFirst();
_scanInteractive = t->scaninteractive();
_maxTableSize = t->maxtablesize_mb() * ::MB_SIZE_BYTES;

// Create sets and vectors for 'aquiring' subchunk temporary tables.
proto::TaskMsg_Fragment const& fragment(t->fragment(_queryFragmentNum));
Expand Down Expand Up @@ -473,6 +477,7 @@ nlohmann::json Task::getJson() const {
js["attemptId"] = _attemptCount;
js["sequenceId"] = _tSeq;
js["scanInteractive"] = _scanInteractive;
js["maxTableSize"] = _maxTableSize;
js["cancelled"] = to_string(_cancelled);
js["state"] = static_cast<uint64_t>(_state.load());
js["createTime_msec"] = util::TimeUtils::tp2ms(_createTime);
Expand Down
2 changes: 2 additions & 0 deletions src/wbase/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class Task : public util::CommandForThreadPool {
int getJobId() const { return _jId; }
int getAttemptCount() const { return _attemptCount; }
bool getScanInteractive() { return _scanInteractive; }
int64_t getMaxTableSize() const { return _maxTableSize; }
proto::ScanInfo& getScanInfo() { return _scanInfo; }
void setOnInteractive(bool val) { _onInteractive = val; }
bool getOnInteractive() { return _onInteractive; }
Expand Down Expand Up @@ -322,6 +323,7 @@ class Task : public util::CommandForThreadPool {
bool _scanInteractive; ///< True if the czar thinks this query should be interactive.
bool _onInteractive{
false}; ///< True if the scheduler put this task on the interactive (group) scheduler.
int64_t _maxTableSize = 0;
std::atomic<memman::MemMan::Handle> _memHandle{memman::MemMan::HandleType::INVALID};
memman::MemMan::Ptr _memMan;

Expand Down
1 change: 1 addition & 0 deletions src/wbase/TransmitData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ void TransmitData::_buildDataMsg(lock_guard<mutex> const& lock, Task const& task
string msg = "Error(s) in result for chunk #" + to_string(task.getChunkId()) + ": " +
multiErr.toOneLineString();
_result->set_errormsg(msg);
_result->set_errorcode(multiErr.firstErrorCode());
LOGS(_log, LOG_LVL_ERROR, _idStr << "buildDataMsg adding " << msg);
}
_result->SerializeToString(&_dataMsg);
Expand Down

0 comments on commit bec0146

Please sign in to comment.