Skip to content

Commit

Permalink
Enfore large result limit at workers when fetching result sets
Browse files Browse the repository at this point in the history
Note that the enforcement only applies to the file-based result
delivery protocol as this mechanism makes no sense when results
are streamed to Czar over the SSI protocol.
  • Loading branch information
iagaponenko committed Jun 29, 2023
1 parent b8ee721 commit 6e9545a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 1 deletion.
4 changes: 3 additions & 1 deletion src/util/Error.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ struct ErrorCode {
CREATE_TABLE,
MYSQLCONNECT,
MYSQLEXEC,
INTERNAL
INTERNAL,
// Worker errors
WORKER_RESULT_TOO_LARGE
};
};

Expand Down
11 changes: 11 additions & 0 deletions src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,17 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
_rowcount += rows;
_transmitsize += bytes;

// Fail the operation if the amount of data in the result set exceeds the requested
// "large result" limit (in case if the one was specified).
if (int maxTableSize = task->getMaxTableSize();
maxTableSize > 0 && bytesTransmitted > maxTableSize) {
string const err = "The result set size " + to_string(bytesTransmitted) +
" of a job exceeds the requested limit of " + to_string(maxTableSize) +
" bytes, task: " + task->getIdStr();
multiErr.push_back(util::Error(util::ErrorCode::WORKER_RESULT_TOO_LARGE, err));
throw runtime_error("FileChannelShared::" + string(__func__) + " " + err);
}

// If no more rows are left in the task's result set then we need to check
// if this is last task in a logical group of ones created for processing
// the current request (note that certain classes of requests may require
Expand Down
5 changes: 5 additions & 0 deletions src/wbase/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ string buildResultFilePath(shared_ptr<lsst::qserv::proto::TaskMsg> const& taskMs
to_string(taskMsg->chunkid()) + "-" + to_string(taskMsg->attemptcount()) + ".proto";
return path.string();
}

size_t const MB_SIZE_BYTES = 1024 * 1024;

} // namespace

namespace lsst::qserv::wbase {
Expand Down Expand Up @@ -184,6 +187,7 @@ Task::Task(TaskMsgPtr const& t, int fragmentNumber, std::shared_ptr<UserQueryInf
_scanInfo.scanRating = t->scanpriority();
_scanInfo.sortTablesSlowestFirst();
_scanInteractive = t->scaninteractive();
_maxTableSize = t->maxtablesize_mb() * ::MB_SIZE_BYTES;

// Create sets and vectors for 'aquiring' subchunk temporary tables.
proto::TaskMsg_Fragment const& fragment(t->fragment(_queryFragmentNum));
Expand Down Expand Up @@ -473,6 +477,7 @@ nlohmann::json Task::getJson() const {
js["attemptId"] = _attemptCount;
js["sequenceId"] = _tSeq;
js["scanInteractive"] = _scanInteractive;
js["maxTableSize"] = _maxTableSize;
js["cancelled"] = to_string(_cancelled);
js["state"] = static_cast<uint64_t>(_state.load());
js["createTime_msec"] = util::TimeUtils::tp2ms(_createTime);
Expand Down
2 changes: 2 additions & 0 deletions src/wbase/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class Task : public util::CommandForThreadPool {
int getJobId() const { return _jId; }
int getAttemptCount() const { return _attemptCount; }
bool getScanInteractive() { return _scanInteractive; }
int getMaxTableSize() const { return _maxTableSize; }
proto::ScanInfo& getScanInfo() { return _scanInfo; }
void setOnInteractive(bool val) { _onInteractive = val; }
bool getOnInteractive() { return _onInteractive; }
Expand Down Expand Up @@ -322,6 +323,7 @@ class Task : public util::CommandForThreadPool {
bool _scanInteractive; ///< True if the czar thinks this query should be interactive.
bool _onInteractive{
false}; ///< True if the scheduler put this task on the interactive (group) scheduler.
int _maxTableSize = 0;
std::atomic<memman::MemMan::Handle> _memHandle{memman::MemMan::HandleType::INVALID};
memman::MemMan::Ptr _memMan;

Expand Down

0 comments on commit 6e9545a

Please sign in to comment.