Skip to content

Commit

Permalink
Merge branch 'tickets/DM-39464'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Jun 20, 2023
2 parents b65efbc + a0b20e5 commit 962db0b
Show file tree
Hide file tree
Showing 59 changed files with 1,175 additions and 396 deletions.
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ add_subdirectory(wpublish)
add_subdirectory(wsched)
add_subdirectory(www)
add_subdirectory(xrdlog)
add_subdirectory(xrdreq)
add_subdirectory(xrdsvc)

#-----------------------------------------------------------------------------
Expand Down Expand Up @@ -127,6 +128,7 @@ target_link_libraries(qserv_czar PUBLIC
rproc
qserv_css
qserv_meta
xrdreq
)

install(
Expand Down
12 changes: 12 additions & 0 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ xrootdSpread = 0
# This is per user query and important milestones ignore this limit.
qMetaSecsBetweenChunkCompletionUpdates = 59


# If not 0 then broadcast query completion/cancellation events to all workers
# so that they would do proper garbage collection and resource recycling.
notifyWorkersOnQueryFinish = 1

# If not 0 then broadcast this event to all workers to let them cancel any older
# that were submitted before the restart. The first query identifier in the new
# series will be reported to the workers. The identifier will be used as
# a high watermark for diffirentiating between the older (to be cancelled)
# and the newer queries.
notifyWorkersOnCzarRestart = 1

#[debug]
#chunkLimit = -1

Expand Down
2 changes: 2 additions & 0 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ target_link_libraries(ccontrol PUBLIC
log
parser
sphgeom
xrdreq
)

FUNCTION(ccontrol_tests)
Expand All @@ -45,6 +46,7 @@ FUNCTION(ccontrol_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
25 changes: 22 additions & 3 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include <cassert>
#include <chrono>
#include <memory>
#include <stdexcept>

// Third-party headers
#include <boost/algorithm/string/replace.hpp>
Expand All @@ -78,6 +79,7 @@
#include "ccontrol/MergingHandler.h"
#include "ccontrol/TmpTableName.h"
#include "ccontrol/UserQueryError.h"
#include "czar/CzarConfig.h"
#include "global/constants.h"
#include "global/LogContext.h"
#include "global/MsgReceiver.h"
Expand All @@ -103,6 +105,7 @@
#include "sql/Schema.h"
#include "util/IterableFormatter.h"
#include "util/ThreadPriority.h"
#include "xrdreq/QueryManagementAction.h"

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.ccontrol.UserQuerySelect");
Expand Down Expand Up @@ -352,6 +355,9 @@ QueryState UserQuerySelect::join() {
}
_executive->updateProxyMessages();

// Capture these parameters before discarding the merger which would also reset the config.
bool const notifyWorkersOnQueryFinish = _infileMergerConfig->czarConfig.notifyWorkersOnQueryFinish();
std::string const xrootdFrontendUrl = _infileMergerConfig->czarConfig.getXrootdFrontendUrl();
try {
_discardMerger();
} catch (std::exception const& exc) {
Expand All @@ -367,19 +373,32 @@ QueryState UserQuerySelect::join() {
// finalRows < 0 indicates there was no postprocessing, so collected rows and final rows should be the
// same.
if (finalRows < 0) finalRows = collectedRows;
// Notify workers on the query completion/cancellation to ensure
// resources are properly cleaned over there as well.
proto::QueryManagement::Operation operation = proto::QueryManagement::COMPLETE;
QueryState state = SUCCESS;
if (successful) {
_qMetaUpdateStatus(qmeta::QInfo::COMPLETED, collectedRows, collectedBytes, finalRows);
LOGS(_log, LOG_LVL_INFO, "Joined everything (success)");
return SUCCESS;
} else if (_killed) {
// status is already set to ABORTED
LOGS(_log, LOG_LVL_ERROR, "Joined everything (killed)");
return ERROR;
operation = proto::QueryManagement::CANCEL;
state = ERROR;
} else {
_qMetaUpdateStatus(qmeta::QInfo::FAILED, collectedRows, collectedBytes, finalRows);
LOGS(_log, LOG_LVL_ERROR, "Joined everything (failure!)");
return ERROR;
operation = proto::QueryManagement::CANCEL;
state = ERROR;
}
if (notifyWorkersOnQueryFinish) {
try {
xrdreq::QueryManagementAction::notifyAllWorkers(xrootdFrontendUrl, operation, _qMetaQueryId);
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
}
return state;
}

/// Release resources held by the merger
Expand Down
38 changes: 38 additions & 0 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

// System headers
#include <sys/time.h>
#include <stdexcept>
#include <thread>

// Third-party headers
Expand All @@ -42,6 +43,7 @@
#include "czar/CzarErrors.h"
#include "czar/MessageTable.h"
#include "global/LogContext.h"
#include "proto/worker.pb.h"
#include "qdisp/PseudoFifo.h"
#include "qdisp/QdispPool.h"
#include "qdisp/SharedResources.h"
Expand All @@ -54,6 +56,7 @@
#include "util/FileMonitor.h"
#include "util/IterableFormatter.h"
#include "util/StringHelper.h"
#include "xrdreq/QueryManagementAction.h"
#include "XrdSsi/XrdSsiProvider.hh"

using namespace std;
Expand Down Expand Up @@ -96,6 +99,21 @@ Czar::Czar(string const& configPath, string const& czarName)
const int year = 60 * 60 * 24 * 365;
_idCounter = uint64_t(tv.tv_sec % year) * 1000 + tv.tv_usec / 1000;

// Tell workers to cancel any queries that were submitted before this restart of Czar.
// Figure out which query (if any) was recorded in Czar database before the restart.
// The id will be used as the high-watermark for queries that need to be cancelled.
// All queries that have identifiers that are strictly less than this one will
// be affected by the operation.
if (_czarConfig.notifyWorkersOnCzarRestart()) {
try {
xrdreq::QueryManagementAction::notifyAllWorkers(_czarConfig.getXrootdFrontendUrl(),
proto::QueryManagement::CANCEL_AFTER_RESTART,
_lastQueryIdBeforeRestart());
} catch (std::exception const& ex) {
LOGS(_log, LOG_LVL_WARN, ex.what());
}
}

auto databaseModels =
qproc::DatabaseModels::create(_czarConfig.getCssConfigMap(), _czarConfig.getMySqlResultConfig());

Expand Down Expand Up @@ -461,4 +479,24 @@ void Czar::removeOldResultTables() {
_oldTableRemovalThread = std::move(t);
}

QueryId Czar::_lastQueryIdBeforeRestart() const {
string const context = "Czar::" + string(__func__) + " ";
auto sqlConn = sql::SqlConnectionFactory::make(_czarConfig.getMySqlQmetaConfig());
string const sql = "SELECT MAX(queryId) FROM QInfo";
sql::SqlResults results;
sql::SqlErrorObject err;
if (!sqlConn->runQuery(sql, results, err)) {
string const msg =
context + "Query to find the last query id failed, err=" + err.printErrMsg() + ", sql=" + sql;
throw runtime_error(msg);
}
string queryIdStr;
if (!results.extractFirstValue(queryIdStr, err)) {
string const msg = context + "Failed to extract the last query id from the result set, err=" +
err.printErrMsg() + ", sql=" + sql;
throw runtime_error(msg);
}
return stoull(queryIdStr);
}

} // namespace lsst::qserv::czar
4 changes: 4 additions & 0 deletions src/czar/Czar.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "ccontrol/UserQueryFactory.h"
#include "czar/CzarConfig.h"
#include "czar/SubmitResult.h"
#include "global/intTypes.h"
#include "global/stringTypes.h"
#include "mysql/MySqlConfig.h"
#include "qdisp/SharedResources.h"
Expand Down Expand Up @@ -136,6 +137,9 @@ class Czar {
/// Create and fill async result table
void _makeAsyncResult(std::string const& asyncResultTable, QueryId queryId, std::string const& resultLoc);

/// @return An identifier of the last query that was recorded in the query metadata table
QueryId _lastQueryIdBeforeRestart() const;

static Ptr _czar; ///< Pointer to single instance of the Czar.

// combines client name (ID) and its thread ID into one unique ID
Expand Down
4 changes: 3 additions & 1 deletion src/czar/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ CzarConfig::CzarConfig(util::ConfigStore const& configStore)
_qdispMaxPriority(configStore.getInt("qdisppool.largestPriority", 2)),
_qdispVectRunSizes(configStore.get("qdisppool.vectRunSizes", "50:50:50:50")),
_qdispVectMinRunningSizes(configStore.get("qdisppool.vectMinRunningSizes", "0:1:3:3")),
_qReqPseudoFifoMaxRunning(configStore.getInt("qdisppool.qReqPseudoFifoMaxRunning", 300)) {}
_qReqPseudoFifoMaxRunning(configStore.getInt("qdisppool.qReqPseudoFifoMaxRunning", 300)),
_notifyWorkersOnQueryFinish(configStore.getInt("tuning.notifyWorkersOnQueryFinish", 1)),
_notifyWorkersOnCzarRestart(configStore.getInt("tuning.notifyWorkersOnCzarRestart", 1)) {}

std::ostream& operator<<(std::ostream& out, CzarConfig const& czarConfig) {
out << "[cssConfigMap=" << util::printable(czarConfig._cssConfigMap)
Expand Down
15 changes: 15 additions & 0 deletions src/czar/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ class CzarConfig {

int getOldestResultKeptDays() const { return _oldestResultKeptDays; }

/// @return 'true' to allow broadcasting query completion/cancellation events
/// to all workers so that they would do proper garbage collection and resource recycling.
bool notifyWorkersOnQueryFinish() const { return _notifyWorkersOnQueryFinish != 0; }

/// @return 'true' to allow broadcasting this event to all workers to let them cancel
/// any older that were submitted before the restart. The first query identifier in the new
/// series will be reported to the workers. The identifier will be used as
/// a high watermark for diffirentiating between the older (to be cancelled)
/// and the newer queries.
bool notifyWorkersOnCzarRestart() const { return _notifyWorkersOnCzarRestart != 0; }

private:
CzarConfig(util::ConfigStore const& ConfigStore);

Expand Down Expand Up @@ -196,6 +207,10 @@ class CzarConfig {

// Parameters for QueryRequest PseudoFifo
int const _qReqPseudoFifoMaxRunning;

// Events sent to workers
int const _notifyWorkersOnQueryFinish; ///< Sent by cccontrol::UserQuerySelect
int const _notifyWorkersOnCzarRestart; ///< Sent by czar::Czar
};

} // namespace lsst::qserv::czar
Expand Down
65 changes: 14 additions & 51 deletions src/global/ResourceUnit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ std::string ResourceUnit::path() const {
switch (_unitType) {
case GARBAGE:
return "/GARBAGE";
case DBCHUNK: // For now, DBCHUNK is handled the same as CQUERY
case CQUERY:
case DBCHUNK:
ss << _pathSep << _db;
if (_chunk != -1) {
ss << _pathSep << _chunk;
Expand All @@ -84,9 +83,8 @@ std::string ResourceUnit::path() const {
case UNKNOWN:
ss << _pathSep << "UNKNOWN_RESOURCE_UNIT";
break;
case RESULT:
case WORKER:
ss << _hashName;
ss << _workerId;
break;
default:
::abort();
Expand All @@ -107,25 +105,24 @@ std::string ResourceUnit::prefix(UnitType const& r) {
switch (r) {
case DBCHUNK:
return "chk";
case CQUERY:
return "q";
case UNKNOWN:
return "UNKNOWN";
case RESULT:
return "result";
case WORKER:
return "worker";
case QUERY:
return "query";
case GARBAGE:
default:
return "GARBAGE";
}
}

std::string ResourceUnit::makePath(int chunk, std::string const& db) {
return "/" + prefix(UnitType::DBCHUNK) + "/" + db + "/" + std::to_string(chunk);
return _pathSep + prefix(UnitType::DBCHUNK) + _pathSep + db + _pathSep + std::to_string(chunk);
}

std::string ResourceUnit::makeWorkerPath(std::string const& id) {
return "/" + prefix(UnitType::WORKER) + "/" + id;
return _pathSep + prefix(UnitType::WORKER) + _pathSep + id;
}

void ResourceUnit::setAsDbChunk(std::string const& db, int chunk) {
Expand All @@ -134,12 +131,6 @@ void ResourceUnit::setAsDbChunk(std::string const& db, int chunk) {
_chunk = chunk;
}

void ResourceUnit::setAsCquery(std::string const& db, int chunk) {
_unitType = CQUERY;
_db = db;
_chunk = chunk;
}

bool ResourceUnit::_markGarbageIfDone(Tokenizer& t) {
if (t.done()) {
_unitType = GARBAGE;
Expand Down Expand Up @@ -182,48 +173,20 @@ void ResourceUnit::_setFromPath(std::string const& path) {
}
_chunk = t.tokenAsInt();
_ingestLeafAndKeys(t.token());
} else if (rTypeString == prefix(CQUERY)) {
// Import as chunk query
_unitType = CQUERY;
if (_markGarbageIfDone(t)) {
return;
}
t.next();
_db = t.token();
if (_db.empty()) {
_unitType = GARBAGE;
return;
}
if (_markGarbageIfDone(t)) {
return;
}
t.next();
if (t.token().empty()) {
_unitType = GARBAGE;
return;
}
_chunk = t.tokenAsInt();
_ingestLeafAndKeys(t.token());

} else if (rTypeString == prefix(RESULT)) {
_unitType = RESULT;
} else if (rTypeString == prefix(WORKER)) {
_unitType = WORKER;
if (_markGarbageIfDone(t)) {
return;
}
t.next();
_hashName = t.token();
if (_hashName.empty()) {
_workerId = t.token();
if (_workerId.empty()) {
_unitType = GARBAGE;
return;
}
} else if (rTypeString == prefix(WORKER)) {
_unitType = WORKER;
if (_markGarbageIfDone(t)) {
return;
}
t.next();
_hashName = t.token();
if (_hashName.empty()) {
} else if (rTypeString == prefix(QUERY)) {
_unitType = QUERY;
if (!t.done()) {
_unitType = GARBAGE;
return;
}
Expand Down
Loading

0 comments on commit 962db0b

Please sign in to comment.