Skip to content

Commit

Permalink
Refined states of the worker tasks
Browse files Browse the repository at this point in the history
Introduced an additional state STARTED to differentiate between
two events - when a task was started by a scheduler from the actual
start time of the corresponding MySQL query. The change is meant
to improve the monitoring of the worker tasks.
The corresponding monitoring page of the Web Dashboard was modified
as well.
  • Loading branch information
iagaponenko committed Aug 15, 2023
1 parent 828b6d3 commit 1c17f16
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 20 deletions.
12 changes: 10 additions & 2 deletions src/wbase/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ void Task::queued(std::chrono::system_clock::time_point const& now) {
bool Task::isRunning() const {
std::lock_guard<std::mutex> lock(_stateMtx);
switch (_state) {
case TaskState::STARTED:
case TaskState::EXECUTING_QUERY:
case TaskState::READING_DATA:
return true;
Expand All @@ -391,13 +392,18 @@ bool Task::isRunning() const {
}
}

/// Set values associated with the Task being started.
void Task::started(std::chrono::system_clock::time_point const& now) {
std::lock_guard<std::mutex> guard(_stateMtx);
_state = TaskState::EXECUTING_QUERY;
_state = TaskState::STARTED;
_startTime = now;
}

void Task::queryExecutionStarted() {
std::lock_guard<std::mutex> guard(_stateMtx);
_state = TaskState::EXECUTING_QUERY;
_queryExecTime = std::chrono::system_clock::now();
}

void Task::queried() {
std::lock_guard<std::mutex> guard(_stateMtx);
_state = TaskState::READING_DATA;
Expand Down Expand Up @@ -430,6 +436,7 @@ std::chrono::milliseconds Task::getRunTime() const {
switch (_state) {
case TaskState::FINISHED:
return std::chrono::duration_cast<std::chrono::milliseconds>(_finishTime - _startTime);
case TaskState::STARTED:
case TaskState::EXECUTING_QUERY:
case TaskState::READING_DATA:
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() -
Expand Down Expand Up @@ -483,6 +490,7 @@ nlohmann::json Task::getJson() const {
js["createTime_msec"] = util::TimeUtils::tp2ms(_createTime);
js["queueTime_msec"] = util::TimeUtils::tp2ms(_queueTime);
js["startTime_msec"] = util::TimeUtils::tp2ms(_startTime);
js["queryExecTime_msec"] = util::TimeUtils::tp2ms(_queryExecTime);
js["queryTime_msec"] = util::TimeUtils::tp2ms(_queryTime);
js["finishTime_msec"] = util::TimeUtils::tp2ms(_finishTime);
js["sizeSoFar"] = _totalSize;
Expand Down
16 changes: 10 additions & 6 deletions src/wbase/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ class Task : public util::CommandForThreadPool {
void queued(std::chrono::system_clock::time_point const& now);
void started(std::chrono::system_clock::time_point const& now);

/// The actual execution of the corresponding MySQL query (or queries) started.
void queryExecutionStarted();

/// MySQL finished executing queries.
void queried();

Expand Down Expand Up @@ -335,12 +338,13 @@ class Task : public util::CommandForThreadPool {
mutable std::mutex _stateMtx; ///< Mutex to protect state related members _state, _???Time.
std::atomic<TaskState> _state{TaskState::CREATED};
std::chrono::system_clock::time_point _createTime =
std::chrono::system_clock::now(); ///< task was created
std::chrono::system_clock::time_point _queueTime; ///< task was queued
std::chrono::system_clock::time_point _startTime; ///< task processing started
std::chrono::system_clock::time_point _queryTime; ///< MySQL finished executing queries
std::chrono::system_clock::time_point _finishTime; ///< data transmission to Czar fiished
size_t _totalSize = 0; ///< Total size of the result so far.
std::chrono::system_clock::now(); ///< task was created
std::chrono::system_clock::time_point _queueTime; ///< task was queued
std::chrono::system_clock::time_point _startTime; ///< task processing started
std::chrono::system_clock::time_point _queryExecTime; ///< query execution at MySQL started
std::chrono::system_clock::time_point _queryTime; ///< MySQL finished executing queries
std::chrono::system_clock::time_point _finishTime; ///< data transmission to Czar fiished
size_t _totalSize = 0; ///< Total size of the result so far.

/// Stores information on the query's resource usage.
std::weak_ptr<wpublish::QueryStatistics> _queryStats;
Expand Down
13 changes: 12 additions & 1 deletion src/wbase/TaskState.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ namespace lsst::qserv::wbase {
* introducing an additional (Protobuf) representation for those, or converting
* the values to strings and vs.
*/
enum class TaskState : std::uint64_t { CREATED = 0, QUEUED, EXECUTING_QUERY, READING_DATA, FINISHED };
enum class TaskState : std::uint64_t {
CREATED = 0,
QUEUED,
STARTED,
EXECUTING_QUERY,
READING_DATA,
FINISHED
};

/// @return The string representation of the input state.
/// @throw std::invalid_argument If the string can't be parsed into a valid state.
Expand All @@ -52,6 +59,8 @@ inline std::string taskState2str(TaskState state) {
return "CREATED";
case TaskState::QUEUED:
return "QUEUED";
case TaskState::STARTED:
return "STARTED";
case TaskState::EXECUTING_QUERY:
return "EXECUTING_QUERY";
case TaskState::READING_DATA:
Expand All @@ -71,6 +80,8 @@ inline TaskState str2taskState(std::string const& state) {
return TaskState::CREATED;
else if (state == "QUEUED")
return TaskState::QUEUED;
else if (state == "STARTED")
return TaskState::STARTED;
else if (state == "EXECUTING_QUERY")
return TaskState::EXECUTING_QUERY;
else if (state == "READING_DATA")
Expand Down
1 change: 1 addition & 0 deletions src/wdb/QueryRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ bool QueryRunner::_dispatchChannel() {
string const& query = _task->getQueryString();
util::Timer primeT;
primeT.start();
_task->queryExecutionStarted();
MYSQL_RES* res = _primeResult(query); // This runs the SQL query, throws SqlErrorObj on failure.
primeT.stop();
needToFreeRes = true;
Expand Down
30 changes: 19 additions & 11 deletions src/www/qserv/js/QservWorkerTasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function(CSSLoader,
<label for="num-tasks"># displayed / selected / total:</label>
<input type="text" id="num-tasks" class="form-control" value="0 / 0 / 0" disabled>
</div>
<div class="form-group col-md-3">
<div class="form-group col-md-2">
<label for="worker">Worker:</label>
<select id="worker" class="form-control form-control-selector">
</select>
Expand All @@ -62,15 +62,17 @@ function(CSSLoader,
<option value="" selected>&lt;any&gt;</option>
</select>
</div>
<div class="form-group col-md-3">
<div class="form-group col-md-4">
<label for="state">State:</label>
<select id="state" class="form-control form-control-selector">
<option value="">&lt;any&gt;</option>
<option value="CREATED">CREATED</option>
<option value="QUEUED">QUEUED</option>
<option value="STARTED">STARTED</option>
<option value="EXECUTING_QUERY">EXECUTING_QUERY</option>
<option value="READING_DATA">READING_DATA</option>
<option value="EXECUTING_QUERY,READING_DATA" selected>EXECUTING_QUERY | READING_DATA</option>
<option value="EXECUTING_QUERY,READING_DATA">EXECUTING_QUERY | READING_DATA</option>
<option value="STARTED,EXECUTING_QUERY,READING_DATA" selected>STARTED | EXECUTING_QUERY | READING_DATA</option>
<option value="FINISHED">FINISHED</option>
<option value="CREATED,QUEUED,EXECUTING_QUERY,READING_DATA">!FINISHED</option>
</select>
Expand Down Expand Up @@ -133,6 +135,8 @@ function(CSSLoader,
<th class="sticky right-aligned"><elem style="color:red;">&rarr;</elem></th>
<th class="sticky" style="text-align:right;">started</th>
<th class="sticky right-aligned"><elem style="color:red;">&rarr;</elem></th>
<th class="sticky" style="text-align:right;">query&nbsp;started</th>
<th class="sticky right-aligned"><elem style="color:red;">&rarr;</elem></th>
<th class="sticky" style="text-align:right;">queried</th>
<th class="sticky right-aligned"><elem style="color:red;">&rarr;</elem></th>
<th class="sticky" style="text-align:right;">finished</th>
Expand All @@ -150,7 +154,7 @@ function(CSSLoader,
});
cont.find("button#reset-tasks-form").click(() => {
this._set_query('');
this._set_state("EXECUTING_QUERY,READING_DATA");
this._set_state("STARTED,EXECUTING_QUERY,READING_DATA");
this._set_max_tasks(200);
this._set_update_interval_sec(10);
this._load();
Expand Down Expand Up @@ -328,7 +332,9 @@ function(CSSLoader,
<td style="text-align:right;"><pre>${task.queueTime_msec ? QservWorkerTasks._timestamp2hhmmss(task.queueTime_msec) : ""}</pre></td>
<th style="text-align:right;"><pre>${QservWorkerTasks._timestamps_diff2str(task.queueTime_msec, task.startTime_msec, snapshotTime_msec)}</pre></th>
<td style="text-align:right;"><pre>${task.startTime_msec ? QservWorkerTasks._timestamp2hhmmss(task.startTime_msec) : ""}</pre></td>
<th style="text-align:right;"><pre>${QservWorkerTasks._timestamps_diff2str(task.startTime_msec, task.queryTime_msec, snapshotTime_msec)}</pre></th>
<th style="text-align:right;"><pre>${QservWorkerTasks._timestamps_diff2str(task.startTime_msec, task.queryExecTime_msec, snapshotTime_msec)}</pre></th>
<td style="text-align:right;"><pre>${task.queryExecTime_msec ? QservWorkerTasks._timestamp2hhmmss(task.queryExecTime_msec) : ""}</pre></td>
<th style="text-align:right;"><pre>${QservWorkerTasks._timestamps_diff2str(task.queryExecTime_msec, task.queryTime_msec, snapshotTime_msec)}</pre></th>
<td style="text-align:right;"><pre>${task.queryTime_msec ? QservWorkerTasks._timestamp2hhmmss(task.queryTime_msec) : ""}</pre></td>
<th style="text-align:right;"><pre>${QservWorkerTasks._timestamps_diff2str(task.queryTime_msec, task.finishTime_msec, snapshotTime_msec)}</pre></th>
<td style="text-align:right;"><pre>${task.finishTime_msec ? QservWorkerTasks._timestamp2hhmmss(task.finishTime_msec) : ""}</pre></td>
Expand Down Expand Up @@ -362,19 +368,21 @@ function(CSSLoader,
switch (state) {
case 0: return 'table-warning';
case 1: return 'table-light';
case 2: return 'table-primary';
case 3: return 'table-info';
case 4: return 'table-secondary';
case 2: return 'table-danger';
case 3: return 'table-primary';
case 4: return 'table-info';
case 5: return 'table-secondary';
default: return 'table-warning';
}
}
static _state2str(state) {
switch (state) {
case 0: return 'CREATED';
case 1: return 'QUEUED';
case 2: return 'EXECUTING_QUERY';
case 3: return 'READING_DATA';
case 4: return 'FINISHED';
case 2: return 'STARTED';
case 3: return 'EXECUTING_QUERY';
case 4: return 'READING_DATA';
case 5: return 'FINISHED';
default: return 'UNKNOWN';
}
}
Expand Down

0 comments on commit 1c17f16

Please sign in to comment.