Skip to content

Commit

Permalink
Merge branch 'tickets/DM-39704'
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Jun 20, 2023
2 parents 6c9b570 + fbe0972 commit b65efbc
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 97 deletions.
2 changes: 1 addition & 1 deletion src/admin/python/lsst/qserv/admin/replicationInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def __init__(
self.repl_ctrl = urlparse(repl_ctrl_uri)
self.auth_key = auth_key
self.admin_auth_key = admin_auth_key
self.repl_api_version = 19
self.repl_api_version = 20
_log.debug(f"ReplicationInterface %s", self.repl_ctrl)

def version(self) -> str:
Expand Down
15 changes: 5 additions & 10 deletions src/replica/DirectorIndexApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ DirectorIndexApp::DirectorIndexApp(int argc, char* argv[])
" the table will be scanned, and the scan won't include the super-transaction"
" column 'qserv_trans_id'.",
_transactionId)
.flag("local",
"This flag is used to load contributions using 'LOAD DATA LOCAL INFILE' protocol"
" instead of 'LOAD DATA INFILE'. See MySQL documentation for further details"
" on this subject.",
_localFile)
.flag("all-workers",
"The flag for selecting all workers regardless of their status (DISABLED or READ-ONLY).",
_allWorkers)
Expand All @@ -104,11 +99,11 @@ int DirectorIndexApp::runImpl() {
auto const controller = Controller::create(serviceProvider());

string const noParentJobId;
auto const job = DirectorIndexJob::create(
_database, _table, _transactionId != numeric_limits<TransactionId>::max(), _transactionId,
_allWorkers, _localFile, controller, noParentJobId,
nullptr, // no callback
PRIORITY_NORMAL);
auto const job = DirectorIndexJob::create(_database, _table,
_transactionId != numeric_limits<TransactionId>::max(),
_transactionId, _allWorkers, controller, noParentJobId,
nullptr, // no callback
PRIORITY_NORMAL);
job->start();
job->wait();

Expand Down
5 changes: 0 additions & 5 deletions src/replica/DirectorIndexApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ class DirectorIndexApp : public Application {
/// A unique identifier of a super-transaction (not used if its value stays default)
TransactionId _transactionId = std::numeric_limits<TransactionId>::max();

/// This flag is used to load contributions using "LOAD DATA LOCAL INFILE" protocol
/// instead of just "LOAD DATA INFILE". See MySQL documentation for further details
/// on this subject.
bool _localFile = false;

/// A connection URL to the MySQL service of the Qserv master database.
std::string _qservCzarDbUrl;

Expand Down
35 changes: 16 additions & 19 deletions src/replica/DirectorIndexJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,22 @@ string DirectorIndexJob::typeName() { return "DirectorIndexJob"; }

DirectorIndexJob::Ptr DirectorIndexJob::create(string const& databaseName, string const& directorTableName,
bool hasTransactions, TransactionId transactionId,
bool allWorkers, bool localFile,
Controller::Ptr const& controller, string const& parentJobId,
CallbackType const& onFinish, int priority) {
bool allWorkers, Controller::Ptr const& controller,
string const& parentJobId, CallbackType const& onFinish,
int priority) {
return Ptr(new DirectorIndexJob(databaseName, directorTableName, hasTransactions, transactionId,
allWorkers, localFile, controller, parentJobId, onFinish, priority));
allWorkers, controller, parentJobId, onFinish, priority));
}

DirectorIndexJob::DirectorIndexJob(string const& databaseName, string const& directorTableName,
bool hasTransactions, TransactionId transactionId, bool allWorkers,
bool localFile, Controller::Ptr const& controller,
string const& parentJobId, CallbackType const& onFinish, int priority)
Controller::Ptr const& controller, string const& parentJobId,
CallbackType const& onFinish, int priority)
: Job(controller, parentJobId, "INDEX", priority),
_directorTableName(directorTableName),
_hasTransactions(hasTransactions),
_transactionId(transactionId),
_allWorkers(allWorkers),
_localFile(localFile),
_onFinish(onFinish) {
try {
_database = controller->serviceProvider()->config()->databaseInfo(databaseName);
Expand Down Expand Up @@ -127,7 +126,6 @@ list<std::pair<string, string>> DirectorIndexJob::extendedPersistentState() cons
result.emplace_back("has_transactions", bool2str(hasTransactions()));
result.emplace_back("transaction_id", to_string(transactionId()));
result.emplace_back("all_workers", bool2str(allWorkers()));
result.emplace_back("local_file", bool2str(localFile()));
return result;
}

Expand Down Expand Up @@ -388,27 +386,26 @@ void DirectorIndexJob::_loadDataIntoTable() {
if (request == nullptr) break;

// Load request's data into the destination table.
bool const localFile = true;
try {
string const query = g.loadDataInfile(
request->responseData().fileName, directorIndexTableName(database(), directorTable()),
controller()->serviceProvider()->config()->get<string>("worker", "ingest-charset-name"),
localFile());
localFile);
h.conn->executeInOwnTransaction([&](auto conn) {
conn->execute(query);
// Loading operations based on this mechanism won't result in throwing exceptions in
// case of certain types of problems encountered during the loading, such as
// out-of-range data, duplicate keys, etc. These errors are reported as warnings
// which need to be retrieved using a special call to the database API.
if (localFile()) {
auto const warnings = conn->warnings();
if (!warnings.empty()) {
auto const& w = warnings.front();
throw database::mysql::Error(
"query: " + query +
" failed with total number of problems: " + to_string(warnings.size()) +
", first problem (Level,Code,Message) was: " + w.level + "," +
to_string(w.code) + "," + w.message);
}
auto const warnings = conn->warnings();
if (!warnings.empty()) {
auto const& w = warnings.front();
throw database::mysql::Error(
"query: " + query +
" failed with total number of problems: " + to_string(warnings.size()) +
", first problem (Level,Code,Message) was: " + w.level + "," + to_string(w.code) +
"," + w.message);
}
});

Expand Down
25 changes: 10 additions & 15 deletions src/replica/DirectorIndexJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ namespace lsst::qserv::replica {
* the "director" index retrieval requests for the relevant chunks to
* the workers. Results are directly loaded into the "director" index of
* the specified director table.
*
* Contributions are always loaded into the index table using the "LOCAL" attribute
* of the query:
* @code
* LOAD DATA LOCAL INFILE ...
* @endcode
*/
class DirectorIndexJob : public Job {
public:
Expand Down Expand Up @@ -82,21 +88,13 @@ class DirectorIndexJob : public Job {
* @param allWorkers engage all known workers regardless of their status.
* If the flag is set to 'false' then only 'ENABLED' workers which are not
* in the 'READ-ONLY' state will be involved into the operation.
* @param localFile If the flag is set to 'true' then index contribution files
* retrieved from workers would be loaded into the "director" index" table using MySQL
* statement "LOAD DATA LOCAL INFILE". Otherwise, contributions will be loaded
* using "LOAD DATA INFILE", which will require the files be directly visible by
* the MySQL server where the table is residing. Note that the non-local
* option results in the better performance of the operation. On the other hand,
* the local option requires the server be properly configured to allow this
* mechanism.
* @param controller is needed launching requests and accessing the Configuration
* @param parentJobId an identifier of the parent job
* @param onFinish a function to be called upon a completion of the job
* @param priority the priority level of the job
*/
static Ptr create(std::string const& databaseName, std::string const& directorTableName,
bool hasTransactions, TransactionId transactionId, bool allWorkers, bool localFile,
bool hasTransactions, TransactionId transactionId, bool allWorkers,
Controller::Ptr const& controller, std::string const& parentJobId,
CallbackType const& onFinish, int priority);

Expand All @@ -115,7 +113,6 @@ class DirectorIndexJob : public Job {
bool hasTransactions() const { return _hasTransactions; }
TransactionId transactionId() const { return _transactionId; }
bool allWorkers() const { return _allWorkers; }
bool localFile() const { return _localFile; }

/// @see Job::progress
virtual Job::Progress progress() const override;
Expand Down Expand Up @@ -156,7 +153,7 @@ class DirectorIndexJob : public Job {

private:
DirectorIndexJob(std::string const& databaseName, std::string const& directorTableName,
bool hasTransactions, TransactionId transactionId, bool allWorkers, bool localFile,
bool hasTransactions, TransactionId transactionId, bool allWorkers,
Controller::Ptr const& controller, std::string const& parentJobId,
CallbackType const& onFinish, int priority);

Expand Down Expand Up @@ -216,11 +213,9 @@ class DirectorIndexJob : public Job {
bool const _hasTransactions;
TransactionId const _transactionId;
bool const _allWorkers;
bool const _localFile;

CallbackType _onFinish; /// @note is reset when the job finishes

DatabaseInfo _database; /// Initialized by the c-tor
CallbackType _onFinish; ///< Is reset when the job finishes
DatabaseInfo _database; ///< Is initialized by the c-tor

/// A collection of chunks to be processed at specific workers
std::map<std::string, std::queue<unsigned int>> _chunks;
Expand Down
7 changes: 4 additions & 3 deletions src/replica/HttpDirectorIndexModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ json HttpDirectorIndexModule::_buildDirectorIndex() {
string const directorTableName = body().optional<string>("director_table", string());
bool const allowForPublished = body().optional<int>("allow_for_published", 0) != 0;
bool const rebuild = body().optional<int>("rebuild", 0) != 0;
bool const localFile = body().optional<int>("local", 0) != 0;
if (body().has("local")) {
warn("Option 'local' is obsolete as of the version 20 of the API.");
}

debug(__func__, "database=" + databaseName);
debug(__func__, "director_table=" + directorTableName);
debug(__func__, "allow_for_published=" + bool2str(allowForPublished));
debug(__func__, "rebuild=" + bool2str(rebuild));
debug(__func__, "local=" + bool2str(localFile));

auto const database = config->databaseInfo(databaseName);
if (database.isPublished and not allowForPublished) {
Expand Down Expand Up @@ -174,7 +175,7 @@ json HttpDirectorIndexModule::_buildDirectorIndex() {
string const noParentJobId;
auto const job =
DirectorIndexJob::create(database.name, tableName, noTransactions, noTransactionId,
allWorkers, localFile, controller(), noParentJobId,
allWorkers, controller(), noParentJobId,
nullptr, // no callback
config->get<int>("controller", "catalog-management-priority-level"));
job->start();
Expand Down
7 changes: 3 additions & 4 deletions src/replica/HttpIngestModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,15 @@ json HttpIngestModule::_addDatabase() {
auto const numSubStripes = body().required<unsigned int>("num_sub_stripes");
auto const overlap = body().required<double>("overlap");
auto const enableAutoBuildDirectorIndex = body().optional<unsigned int>("auto_build_secondary_index", 1);
auto const enableLocalLoadDirectorIndex = body().optional<unsigned int>("local_load_secondary_index", 0);
if (body().has("local_load_secondary_index")) {
warn("Option 'local_load_secondary_index' is obsolete as of the version 20 of the API.");
}

debug(__func__, "database=" + databaseName);
debug(__func__, "num_stripes=" + to_string(numStripes));
debug(__func__, "num_sub_stripes=" + to_string(numSubStripes));
debug(__func__, "overlap=" + to_string(overlap));
debug(__func__, "auto_build_secondary_index=" + to_string(enableAutoBuildDirectorIndex ? 1 : 0));
debug(__func__, "local_load_secondary_index=" + to_string(enableLocalLoadDirectorIndex ? 1 : 0));

if (overlap < 0) throw HttpError(__func__, "overlap can't have a negative value");

Expand Down Expand Up @@ -263,8 +264,6 @@ json HttpIngestModule::_addDatabase() {
// the index.
databaseServices->saveIngestParam(database.name, "secondary-index", "auto-build",
to_string(enableAutoBuildDirectorIndex ? 1 : 0));
databaseServices->saveIngestParam(database.name, "secondary-index", "local-load",
to_string(enableLocalLoadDirectorIndex ? 1 : 0));

// Tell workers to reload their configurations
error = reconfigureWorkers(database, allWorkers, workerReconfigTimeoutSec());
Expand Down
10 changes: 5 additions & 5 deletions src/replica/HttpIngestTransModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,11 @@ json HttpIngestTransModule::_endTransaction() {
auto const table = database.findTable(tableName);
if (table.isPublished) continue;
bool const hasTransactions = true;
auto const job = DirectorIndexJob::create(
database.name, table.name, hasTransactions, transactionId, allWorkers,
localLoadDirectorIndex(database.name), controller(), noParentJobId,
nullptr, // no callback
config->get<int>("controller", "ingest-priority-level"));
auto const job =
DirectorIndexJob::create(database.name, table.name, hasTransactions,
transactionId, allWorkers, controller(), noParentJobId,
nullptr, // no callback
config->get<int>("controller", "ingest-priority-level"));
json transEventData = {{"job", job->id()}, {"table", table.name}};
transaction = databaseServices->updateTransaction(transactionId, "begin " + transEvent,
transEventData);
Expand Down
2 changes: 1 addition & 1 deletion src/replica/HttpMetaModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using json = nlohmann::json;

namespace lsst::qserv::replica {

unsigned int const HttpMetaModule::version = 19;
unsigned int const HttpMetaModule::version = 20;

void HttpMetaModule::process(ServiceProvider::Ptr const& serviceProvider, string const& context,
qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp,
Expand Down
12 changes: 0 additions & 12 deletions src/replica/HttpModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,6 @@ bool HttpModule::autoBuildDirectorIndex(string const& databaseName) const {
return false;
}

bool HttpModule::localLoadDirectorIndex(string const& database) const {
auto const databaseServices = controller()->serviceProvider()->databaseServices();
try {
DatabaseIngestParam const paramInfo =
databaseServices->ingestParam(database, "secondary-index", "local-load");
return paramInfo.value != "0";
} catch (DatabaseServicesNotFound const& ex) {
info(__func__, "the director index local-load mode was not specified");
}
return false;
}

DatabaseInfo HttpModule::getDatabaseInfo(string const& func, bool throwIfPublished) const {
debug(func);
auto const databaseServices = controller()->serviceProvider()->databaseServices();
Expand Down
13 changes: 0 additions & 13 deletions src/replica/HttpModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,6 @@ class HttpModule : public EventLogger, public HttpModuleBase {
*/
bool autoBuildDirectorIndex(std::string const& database) const;

/**
* Fetch a mode of loading contributions into the "director" index as requested by
* a catalog ingest workflow and recorded at the database creation time. A value of
* the parameter is recorded in a database.
*
* @param database The name of a database for which a value of the parameter
* is requested.
* @return 'true' if the index was requested to be loaded using MySQL protocol
* "LOAD DATA LOCAL INFILE" instead of just "LOAD DATA INFILE". See MySQL
* documentation for further explanation of the protocol.
*/
bool localLoadDirectorIndex(std::string const& database) const;

/**
* Get database info for a database that was specified in a request, either explicitly
* in attribute "database" or implicitly in attribute "transation_id". The method may
Expand Down
24 changes: 17 additions & 7 deletions src/replica/HttpModuleBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@ using json = nlohmann::json;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.HttpModuleBase");

string packWarnings(list<string> const& warnings) {
string packed;
for (auto const& msg : warnings) {
if (!packed.empty()) packed += "; ";
packed += msg;
}
return packed;
}
} // namespace

namespace lsst::qserv::replica {

Expand Down Expand Up @@ -79,15 +88,13 @@ void HttpModuleBase::checkApiVersion(string const& func, unsigned int minVersion
try {
if (req()->method == "GET") {
if (!query().has(versionAttrName)) {
_warningOnVersionMissing = "No version number was provided in the request's query.";
warn(_warningOnVersionMissing);
warn("No version number was provided in the request's query.");
return;
}
version = query().requiredUInt(versionAttrName);
} else {
if (!body().has(versionAttrName)) {
_warningOnVersionMissing = "No version number was provided in the request's body.";
warn(_warningOnVersionMissing);
warn("No version number was provided in the request's body.");
return;
}
version = body().required<unsigned int>(versionAttrName);
Expand All @@ -107,7 +114,10 @@ void HttpModuleBase::info(string const& msg) const { LOGS(_log, LOG_LVL_INFO, co

void HttpModuleBase::debug(string const& msg) const { LOGS(_log, LOG_LVL_DEBUG, context() << msg); }

void HttpModuleBase::warn(string const& msg) const { LOGS(_log, LOG_LVL_WARN, context() << msg); }
void HttpModuleBase::warn(string const& msg) const {
LOGS(_log, LOG_LVL_WARN, context() << msg);
_warnings.push_back(msg);
}

void HttpModuleBase::error(string const& msg) const { LOGS(_log, LOG_LVL_ERROR, context() << msg); }

Expand All @@ -117,15 +127,15 @@ void HttpModuleBase::_sendError(string const& func, string const& errorMsg, json
result["success"] = 0;
result["error"] = errorMsg;
result["error_ext"] = errorExt.is_null() ? json::object() : errorExt;
result["warning"] = _warningOnVersionMissing;
result["warning"] = ::packWarnings(_warnings);
resp()->send(result.dump(), "application/json");
}

void HttpModuleBase::_sendData(json& result) {
result["success"] = 1;
result["error"] = "";
result["error_ext"] = json::object();
result["warning"] = _warningOnVersionMissing;
result["warning"] = ::packWarnings(_warnings);
resp()->send(result.dump(), "application/json");
}

Expand Down
Loading

0 comments on commit b65efbc

Please sign in to comment.