Skip to content

Commit

Permalink
Wired the query control to notify workers on query completion/cancell…
Browse files Browse the repository at this point in the history
…ation

Added Czar configuration options to disable this feature if needed.
The options will exist for some time before the new code will be proven
to work w/o any side effects to the query processing.
  • Loading branch information
iagaponenko committed Jun 20, 2023
1 parent 19d6029 commit e503efe
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 4 deletions.
12 changes: 12 additions & 0 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ xrootdSpread = 0
# This is per user query and important milestones ignore this limit.
qMetaSecsBetweenChunkCompletionUpdates = 59


# If not 0 then broadcast query completion/cancellation events to all workers
# so that they would do proper garbage collection and resource recycling.
notifyWorkersOnQueryFinish = 1

# If not 0 then broadcast this event to all workers to let them cancel any older
# that were submitted before the restart. The first query identifier in the new
# series will be reported to the workers. The identifier will be used as
# a high watermark for diffirentiating between the older (to be cancelled)
# and the newer queries.
notifyWorkersOnCzarRestart = 1

#[debug]
#chunkLimit = -1

Expand Down
1 change: 1 addition & 0 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ target_link_libraries(ccontrol PUBLIC
log
parser
sphgeom
xrdreq
)

FUNCTION(ccontrol_tests)
Expand Down
25 changes: 22 additions & 3 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include <cassert>
#include <chrono>
#include <memory>
#include <stdexcept>

// Third-party headers
#include <boost/algorithm/string/replace.hpp>
Expand All @@ -78,6 +79,7 @@
#include "ccontrol/MergingHandler.h"
#include "ccontrol/TmpTableName.h"
#include "ccontrol/UserQueryError.h"
#include "czar/CzarConfig.h"
#include "global/constants.h"
#include "global/LogContext.h"
#include "global/MsgReceiver.h"
Expand All @@ -103,6 +105,7 @@
#include "sql/Schema.h"
#include "util/IterableFormatter.h"
#include "util/ThreadPriority.h"
#include "xrdreq/QueryManagementAction.h"

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.ccontrol.UserQuerySelect");
Expand Down Expand Up @@ -352,6 +355,9 @@ QueryState UserQuerySelect::join() {
}
_executive->updateProxyMessages();

// Capture these parameters before discarding the merger which would also reset the config.
bool const notifyWorkersOnQueryFinish = _infileMergerConfig->czarConfig.notifyWorkersOnQueryFinish();
std::string const xrootdFrontendUrl = _infileMergerConfig->czarConfig.getXrootdFrontendUrl();
try {
_discardMerger();
} catch (std::exception const& exc) {
Expand All @@ -367,19 +373,32 @@ QueryState UserQuerySelect::join() {
// finalRows < 0 indicates there was no postprocessing, so collected rows and final rows should be the
// same.
if (finalRows < 0) finalRows = collectedRows;
// Notify workers on the query completion/cancellation to ensure
// resources are properly cleaned over there as well.
proto::QueryManagement::Operation operation = proto::QueryManagement::COMPLETE;
QueryState state = SUCCESS;
if (successful) {
_qMetaUpdateStatus(qmeta::QInfo::COMPLETED, collectedRows, collectedBytes, finalRows);
LOGS(_log, LOG_LVL_INFO, "Joined everything (success)");
return SUCCESS;
} else if (_killed) {
// status is already set to ABORTED
LOGS(_log, LOG_LVL_ERROR, "Joined everything (killed)");
return ERROR;
operation = proto::QueryManagement::CANCEL;
state = ERROR;
} else {
_qMetaUpdateStatus(qmeta::QInfo::FAILED, collectedRows, collectedBytes, finalRows);
LOGS(_log, LOG_LVL_ERROR, "Joined everything (failure!)");
return ERROR;
operation = proto::QueryManagement::CANCEL;
state = ERROR;
}
if (notifyWorkersOnQueryFinish) {
try {
xrdreq::QueryManagementAction::notifyAllWorkers(xrootdFrontendUrl, operation, _qMetaQueryId);
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
}
return state;
}

/// Release resources held by the merger
Expand Down
38 changes: 38 additions & 0 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

// System headers
#include <sys/time.h>
#include <stdexcept>
#include <thread>

// Third-party headers
Expand All @@ -42,6 +43,7 @@
#include "czar/CzarErrors.h"
#include "czar/MessageTable.h"
#include "global/LogContext.h"
#include "proto/worker.pb.h"
#include "qdisp/PseudoFifo.h"
#include "qdisp/QdispPool.h"
#include "qdisp/SharedResources.h"
Expand All @@ -54,6 +56,7 @@
#include "util/FileMonitor.h"
#include "util/IterableFormatter.h"
#include "util/StringHelper.h"
#include "xrdreq/QueryManagementAction.h"
#include "XrdSsi/XrdSsiProvider.hh"

using namespace std;
Expand Down Expand Up @@ -96,6 +99,21 @@ Czar::Czar(string const& configPath, string const& czarName)
const int year = 60 * 60 * 24 * 365;
_idCounter = uint64_t(tv.tv_sec % year) * 1000 + tv.tv_usec / 1000;

// Tell workers to cancel any queries that were submitted before this restart of Czar.
// Figure out which query (if any) was recorded in Czar database before the restart.
// The id will be used as the high-watermark for queries that need to be cancelled.
// All queries that have identifiers that are strictly less than this one will
// be affected by the operation.
if (_czarConfig.notifyWorkersOnCzarRestart()) {
try {
xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig.getXrootdFrontendUrl(),
proto::QueryManagement::CANCEL_AFTER_RESTART,
_lastQueryIdBeforeRestart());
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
}

auto databaseModels =
qproc::DatabaseModels::create(_czarConfig.getCssConfigMap(), _czarConfig.getMySqlResultConfig());

Expand Down Expand Up @@ -461,4 +479,24 @@ void Czar::removeOldResultTables() {
_oldTableRemovalThread = std::move(t);
}

QueryId Czar::_lastQueryIdBeforeRestart() const {
string const context = "Czar::" + string(__func__) + " ";
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlQmetaConfig());
string const sql = "SELECT MAX(queryId) FROM QInfo";
sql::SqlResults results;
sql::SqlErrorObject err;
if (!sqlConn->runQuery(sql, results, err)) {
string const msg =
context + "Query to find the last query id failed, err=" + err.printErrMsg() + ", sql=" + sql;
throw runtime_error(msg);
}
string queryIdStr;
if (!results.extractFirstValue(queryIdStr, err)) {
string const msg = context + "Failed to extract the last query id from the result set, err=" +
err.printErrMsg() + ", sql=" + sql;
throw runtime_error(msg);
}
return stoull(queryIdStr);
}

} // namespace lsst::qserv::czar
4 changes: 4 additions & 0 deletions src/czar/Czar.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "ccontrol/UserQueryFactory.h"
#include "czar/CzarConfig.h"
#include "czar/SubmitResult.h"
#include "global/intTypes.h"
#include "global/stringTypes.h"
#include "mysql/MySqlConfig.h"
#include "qdisp/SharedResources.h"
Expand Down Expand Up @@ -136,6 +137,9 @@ class Czar {
/// Create and fill async result table
void _makeAsyncResult(std::string const& asyncResultTable, QueryId queryId, std::string const& resultLoc);

/// @return An identifier of the last query that was recorded in the query metadata table
QueryId _lastQueryIdBeforeRestart() const;

static Ptr _czar; ///< Pointer to single instance of the Czar.

// combines client name (ID) and its thread ID into one unique ID
Expand Down
4 changes: 3 additions & 1 deletion src/czar/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ CzarConfig::CzarConfig(util::ConfigStore const& configStore)
_qdispMaxPriority(configStore.getInt("qdisppool.largestPriority", 2)),
_qdispVectRunSizes(configStore.get("qdisppool.vectRunSizes", "50:50:50:50")),
_qdispVectMinRunningSizes(configStore.get("qdisppool.vectMinRunningSizes", "0:1:3:3")),
_qReqPseudoFifoMaxRunning(configStore.getInt("qdisppool.qReqPseudoFifoMaxRunning", 300)) {}
_qReqPseudoFifoMaxRunning(configStore.getInt("qdisppool.qReqPseudoFifoMaxRunning", 300)),
_notifyWorkersOnQueryFinish(configStore.getInt("tuning.notifyWorkersOnQueryFinish", 1)),
_notifyWorkersOnCzarRestart(configStore.getInt("tuning.notifyWorkersOnCzarRestart", 1)) {}

std::ostream& operator<<(std::ostream& out, CzarConfig const& czarConfig) {
out << "[cssConfigMap=" << util::printable(czarConfig._cssConfigMap)
Expand Down
15 changes: 15 additions & 0 deletions src/czar/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ class CzarConfig {

int getOldestResultKeptDays() const { return _oldestResultKeptDays; }

/// @return 'true' to allow broadcasting query completion/cancellation events
/// to all workers so that they would do proper garbage collection and resource recycling.
bool notifyWorkersOnQueryFinish() const { return _notifyWorkersOnQueryFinish != 0; }

/// @return 'true' to allow broadcasting this event to all workers to let them cancel
/// any older that were submitted before the restart. The first query identifier in the new
/// series will be reported to the workers. The identifier will be used as
/// a high watermark for diffirentiating between the older (to be cancelled)
/// and the newer queries.
bool notifyWorkersOnCzarRestart() const { return _notifyWorkersOnCzarRestart != 0; }

private:
CzarConfig(util::ConfigStore const& ConfigStore);

Expand Down Expand Up @@ -196,6 +207,10 @@ class CzarConfig {

// Parameters for QueryRequest PseudoFifo
int const _qReqPseudoFifoMaxRunning;

// Events sent to workers
int const _notifyWorkersOnQueryFinish; ///< Sent by cccontrol::UserQuerySelect
int const _notifyWorkersOnCzarRestart; ///< Sent by czar::Czar
};

} // namespace lsst::qserv::czar
Expand Down

0 comments on commit e503efe

Please sign in to comment.