From 5afb024dfc58fd0777ef4005b7f67b35e737eda7 Mon Sep 17 00:00:00 2001 From: John Gates Date: Tue, 9 Apr 2024 16:33:55 -0700 Subject: [PATCH 1/4] Added a basic test, which isn't working. --- src/czar/Czar.cc | 41 ++++++++++++++++++++++++++++++++ src/replica/contr/Controller.cc | 2 +- src/replica/registry/Registry.cc | 2 +- src/xrdsvc/SsiService.cc | 4 ++-- 4 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index 0e3bab5d8..cfa7f67d6 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -106,6 +106,9 @@ void registryUpdateLoop(shared_ptr const& czarConfig) { {"management-port", czarConfig->replicationHttpPort()}, {"management-host-name", util::get_current_host_fqdn()}}}}); string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; + LOGS(_log, LOG_LVL_WARN, "&&&czarPost url=" << url); + LOGS(_log, LOG_LVL_WARN, "&&&czarPost request=" << request.dump()); + LOGS(_log, LOG_LVL_WARN, "&&&czarPost headers=" << headers[0]); http::Client client(method, url, request.dump(), headers); while (true) { try { @@ -122,6 +125,41 @@ void registryUpdateLoop(shared_ptr const& czarConfig) { } } + +// &&& doc +void registryWorkerInfoLoop(shared_ptr const& czarConfig) { + // Get worker information from the registry + auto const method = http::Method::GET; + string const url = "http://" + czarConfig->replicationRegistryHost() + ":" + + to_string(czarConfig->replicationRegistryPort()) + + "/services?instance_id=" + czarConfig->replicationInstanceId(); // &&& what is this value supposed to be to get worker info? + vector const headers = {"Content-Type: application/json"}; + json request = nlohmann::json(); + string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; + LOGS(_log, LOG_LVL_WARN, "&&&czarGet url=" << url); + LOGS(_log, LOG_LVL_WARN, "&&&czarGet request=" << request.dump()); + LOGS(_log, LOG_LVL_WARN, "&&&czarGet headers=" << headers[0]); + http::Client client(method, url, request.dump(), headers); + while (true) { + LOGS(_log, LOG_LVL_WARN, "&&&czarGet loop start"); + try { + json const response = client.readAsJson(); + /* &&& + if (0 == response.at("success").get()) { + string const error = response.at("error").get(); + LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'."); + abort(); + } + */ + LOGS(_log, LOG_LVL_WARN, "&&&czarGet resp=" << response); + } catch (exception const& ex) { + LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what()); + LOGS(_log, LOG_LVL_WARN, requestContext + " &&& failed, ex: " + ex.what()); + } + this_thread::sleep_for(chrono::seconds(15)); + } +} + } // anonymous namespace namespace lsst::qserv::czar { @@ -232,6 +270,9 @@ Czar::Czar(string const& configFilePath, string const& czarName) // in the detached thread. This will continue before the application gets terminated. 
thread registryUpdateThread(::registryUpdateLoop, _czarConfig); registryUpdateThread.detach(); + + thread registryWorkerUpdateThread(::registryWorkerInfoLoop, _czarConfig); //&&& + registryWorkerUpdateThread.detach(); //&&& } SubmitResult Czar::submitQuery(string const& query, map const& hints) { diff --git a/src/replica/contr/Controller.cc b/src/replica/contr/Controller.cc index 5ff0a0871..2d746b628 100644 --- a/src/replica/contr/Controller.cc +++ b/src/replica/contr/Controller.cc @@ -90,7 +90,7 @@ void tracker(weak_ptr const& controller, string const& context) { config->get("controller", "auto-register-workers") != 0; vector workers; try { - workers = ptr->serviceProvider()->registry()->workers(); + workers = ptr->serviceProvider()->registry()->workers(); //&&& important? } catch (exception const& ex) { LOGS(_log, LOG_LVL_WARN, context << "failed to pull worker info from the registry, ex: " << ex.what()); diff --git a/src/replica/registry/Registry.cc b/src/replica/registry/Registry.cc index 74264a117..00dd6bc62 100644 --- a/src/replica/registry/Registry.cc +++ b/src/replica/registry/Registry.cc @@ -59,7 +59,7 @@ Registry::Registry(ServiceProvider::Ptr const& serviceProvider) _baseUrl("http://" + serviceProvider->config()->get("registry", "host") + ":" + to_string(serviceProvider->config()->get("registry", "port"))) {} -vector Registry::workers() const { +vector Registry::workers() const { //&&& important? vector coll; string const resource = "/services?instance_id=" + _serviceProvider->instanceId(); json const resultJson = _request(http::Method::GET, resource); diff --git a/src/xrdsvc/SsiService.cc b/src/xrdsvc/SsiService.cc index 5f7b8c636..4855ffad0 100644 --- a/src/xrdsvc/SsiService.cc +++ b/src/xrdsvc/SsiService.cc @@ -111,7 +111,7 @@ std::shared_ptr makeChunkInventory(mysql::MySqlConfig * Transient communication errors when attempting to connect or send requests to * the Registry will be posted into the log stream and ignored. */ -void registryUpdateLoop(string const& id) { +void registryUpdateLoop(string const& id) { // &&& important? auto const workerConfig = wconfig::WorkerConfig::instance(); auto const method = http::Method::POST; string const url = "http://" + workerConfig->replicationRegistryHost() + ":" + @@ -266,7 +266,7 @@ SsiService::SsiService(XrdSsiLogger* log) { _controlHttpSvc = HttpSvc::create(_foreman, workerConfig->replicationHttpPort(), workerConfig->replicationNumHttpThreads()); auto const port = _controlHttpSvc->start(); - workerConfig->setReplicationHttpPort(port); + workerConfig->setReplicationHttpPort(port); // &&& important? // Begin periodically updating worker's status in the Replication System's registry // in the detached thread. This will continue before the application gets terminated. From 8b7d91a1d5a30999be0acc362e9814b3d3b7656f Mon Sep 17 00:00:00 2001 From: John Gates Date: Mon, 15 Apr 2024 12:31:57 -0700 Subject: [PATCH 2/4] Added CzarRegistry. 
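For reference, the new CzarRegistry::_registryWorkerInfoLoop() polls the Registry's "/services?instance_id=..." endpoint and hands the JSON reply to _buildMapFromJson(), which keys the resulting WorkerContactMap by worker id and pulls the contact fields out of each worker's "qserv" section. The sketch below shows just that parsing step against a hand-built payload of the same shape. The field names are the ones _buildMapFromJson() reads; the worker id, hosts, port and timestamp are invented, and ContactInfo is a cut-down stand-in for WorkerContactInfo, not the real struct.

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    #include "nlohmann/json.hpp"

    using json = nlohmann::json;

    // Cut-down stand-in for CzarRegistry::WorkerContactInfo (illustration only).
    struct ContactInfo {
        std::string host;                // "host-addr"
        std::string managementHost;      // "management-host-name"
        int managementPort = 0;          // "management-port"
        std::uint64_t updateTimeMs = 0;  // "update-time-ms"
    };

    int main() {
        // Build a reply shaped like the one _buildMapFromJson() walks through;
        // every value here is made up for the example.
        json response;
        response["success"] = 1;
        json qserv;
        qserv["host-addr"] = "worker-0.example.org";
        qserv["management-host-name"] = "worker-0.example.org";
        qserv["management-port"] = 25004;
        qserv["update-time-ms"] = 1713200000000ULL;
        response["services"]["workers"]["worker-0"]["qserv"] = qserv;

        // Reduce the reply to a map keyed by worker id, as the registry loop does.
        std::map<std::string, ContactInfo> contactMap;
        for (auto const& [wId, entry] : response.at("services").at("workers").items()) {
            auto const& q = entry.at("qserv");
            ContactInfo info;
            info.host = q.at("host-addr").get<std::string>();
            info.managementHost = q.at("management-host-name").get<std::string>();
            info.managementPort = q.at("management-port").get<int>();
            info.updateTimeMs = q.at("update-time-ms").get<std::uint64_t>();
            contactMap.emplace(wId, info);
        }

        for (auto const& [wId, info] : contactMap) {
            std::cout << wId << " -> " << info.host << ":" << info.managementPort << "\n";
        }
        return 0;
    }

In the class itself the freshly built map only replaces _contactMap, under _mapMtx, when _compareMap() reports a difference.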
--- src/ccontrol/UserQueryFactory.cc | 4 + src/czar/CMakeLists.txt | 1 + src/czar/Czar.cc | 86 +------------- src/czar/Czar.h | 8 ++ src/czar/CzarChunkMap.cc | 3 + src/czar/CzarChunkMap.h | 13 ++- src/czar/CzarRegistry.cc | 191 +++++++++++++++++++++++++++++++ src/czar/CzarRegistry.h | 127 ++++++++++++++++++++ src/replica/contr/Controller.cc | 2 +- src/replica/registry/Registry.cc | 2 +- src/xrdsvc/SsiService.cc | 4 +- 11 files changed, 352 insertions(+), 89 deletions(-) create mode 100644 src/czar/CzarRegistry.cc create mode 100644 src/czar/CzarRegistry.h diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index 68761c196..0c508fafc 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -52,6 +52,7 @@ #include "ccontrol/UserQueryType.h" #include "css/CssAccess.h" #include "css/KvInterfaceImplMem.h" +#include "czar/Czar.h" #include "mysql/MySqlConfig.h" #include "parser/ParseException.h" #include "qdisp/Executive.h" @@ -225,6 +226,9 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st std::string query = aQuery; // TODO: DM-43386 need to have WorkerChunkMap info at this point + auto cz = czar::Czar::getCzar(); + auto czarChunkMap = cz->getCzarChunkMap(); + auto czarRegistry = cz->getCzarRegistry(); std::string stripped; bool async = false; diff --git a/src/czar/CMakeLists.txt b/src/czar/CMakeLists.txt index 3b9d63e40..c97b83636 100644 --- a/src/czar/CMakeLists.txt +++ b/src/czar/CMakeLists.txt @@ -4,6 +4,7 @@ add_dependencies(czar proto) target_sources(czar PRIVATE Czar.cc CzarChunkMap.cc + CzarRegistry.cc HttpCzarSvc.cc HttpCzarQueryModule.cc HttpModule.cc diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index cfa7f67d6..57845d18a 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -47,6 +47,7 @@ #include "czar/CzarErrors.h" #include "czar/HttpSvc.h" #include "czar/MessageTable.h" +#include "czar/CzarRegistry.h" #include "global/LogContext.h" #include "http/Client.h" #include "http/Method.h" @@ -83,83 +84,6 @@ string const createAsyncResultTmpl( LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.Czar"); -/** - * This function will keep periodically updating Czar's info in the Replication - * System's Registry. - * @param name The unique identifier of the Czar to be registered. - * @param czarConfig A pointer to the Czar configuration service. - * @note The thread will terminate the process if the registraton request to the Registry - * was explicitly denied by the service. This means the application may be misconfigured. - * Transient communication errors when attempting to connect or send requests to - * the Registry will be posted into the log stream and ignored. 
- */ -void registryUpdateLoop(shared_ptr const& czarConfig) { - auto const method = http::Method::POST; - string const url = "http://" + czarConfig->replicationRegistryHost() + ":" + - to_string(czarConfig->replicationRegistryPort()) + "/czar"; - vector const headers = {"Content-Type: application/json"}; - json const request = json::object({{"instance_id", czarConfig->replicationInstanceId()}, - {"auth_key", czarConfig->replicationAuthKey()}, - {"czar", - {{"name", czarConfig->name()}, - {"id", czarConfig->id()}, - {"management-port", czarConfig->replicationHttpPort()}, - {"management-host-name", util::get_current_host_fqdn()}}}}); - string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; - LOGS(_log, LOG_LVL_WARN, "&&&czarPost url=" << url); - LOGS(_log, LOG_LVL_WARN, "&&&czarPost request=" << request.dump()); - LOGS(_log, LOG_LVL_WARN, "&&&czarPost headers=" << headers[0]); - http::Client client(method, url, request.dump(), headers); - while (true) { - try { - json const response = client.readAsJson(); - if (0 == response.at("success").get()) { - string const error = response.at("error").get(); - LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'."); - abort(); - } - } catch (exception const& ex) { - LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what()); - } - this_thread::sleep_for(chrono::seconds(max(1U, czarConfig->replicationRegistryHearbeatIvalSec()))); - } -} - - -// &&& doc -void registryWorkerInfoLoop(shared_ptr const& czarConfig) { - // Get worker information from the registry - auto const method = http::Method::GET; - string const url = "http://" + czarConfig->replicationRegistryHost() + ":" + - to_string(czarConfig->replicationRegistryPort()) - + "/services?instance_id=" + czarConfig->replicationInstanceId(); // &&& what is this value supposed to be to get worker info? - vector const headers = {"Content-Type: application/json"}; - json request = nlohmann::json(); - string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; - LOGS(_log, LOG_LVL_WARN, "&&&czarGet url=" << url); - LOGS(_log, LOG_LVL_WARN, "&&&czarGet request=" << request.dump()); - LOGS(_log, LOG_LVL_WARN, "&&&czarGet headers=" << headers[0]); - http::Client client(method, url, request.dump(), headers); - while (true) { - LOGS(_log, LOG_LVL_WARN, "&&&czarGet loop start"); - try { - json const response = client.readAsJson(); - /* &&& - if (0 == response.at("success").get()) { - string const error = response.at("error").get(); - LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'."); - abort(); - } - */ - LOGS(_log, LOG_LVL_WARN, "&&&czarGet resp=" << response); - } catch (exception const& ex) { - LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what()); - LOGS(_log, LOG_LVL_WARN, requestContext + " &&& failed, ex: " + ex.what()); - } - this_thread::sleep_for(chrono::seconds(15)); - } -} - } // anonymous namespace namespace lsst::qserv::czar { @@ -266,13 +190,7 @@ Czar::Czar(string const& configFilePath, string const& czarName) auto const port = _controlHttpSvc->start(); _czarConfig->setReplicationHttpPort(port); - // Begin periodically updating worker's status in the Replication System's registry - // in the detached thread. This will continue before the application gets terminated. 
- thread registryUpdateThread(::registryUpdateLoop, _czarConfig); - registryUpdateThread.detach(); - - thread registryWorkerUpdateThread(::registryWorkerInfoLoop, _czarConfig); //&&& - registryWorkerUpdateThread.detach(); //&&& + _czarRegistry = CzarRegistry::create(_czarConfig); } SubmitResult Czar::submitQuery(string const& query, map const& hints) { diff --git a/src/czar/Czar.h b/src/czar/Czar.h index 32549fe9d..d33ce5c73 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -62,6 +62,7 @@ class FileMonitor; namespace lsst::qserv::czar { class CzarChunkMap; +class CzarRegistry; /// @addtogroup czar @@ -127,6 +128,10 @@ class Czar { /// @return The reconstructed info for the query SubmitResult getQueryInfo(QueryId queryId) const; + std::shared_ptr getCzarChunkMap() { return _czarChunkMap; } + + std::shared_ptr getCzarRegistry() { return _czarRegistry; } + private: /// Private constructor for singleton. Czar(std::string const& configFilePath, std::string const& czarName); @@ -184,6 +189,9 @@ class Czar { /// Map of which chunks on which workers and shared scan order. std::shared_ptr _czarChunkMap; + + /// Connection to the registry to register the czar and get worker contact information. + std::shared_ptr _czarRegistry; }; } // namespace lsst::qserv::czar diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index ef7bb85d3..164ad9dd3 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -30,6 +30,8 @@ #include "lsst/log/Log.h" // Qserv headers +#include "czar/Czar.h" +#include "czar/CzarRegistry.h" #include "qmeta/Exceptions.h" #include "util/Bug.h" #include "util/TimeUtils.h" @@ -72,6 +74,7 @@ bool CzarChunkMap::_read() { auto [chunkMapPtr, wcMapPtr] = makeNewMaps(qChunkMap); verify(*chunkMapPtr, *wcMapPtr); + LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr)); LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr)); diff --git a/src/czar/CzarChunkMap.h b/src/czar/CzarChunkMap.h index ba4ca8968..f15eda58f 100644 --- a/src/czar/CzarChunkMap.h +++ b/src/czar/CzarChunkMap.h @@ -47,7 +47,8 @@ class ChunkMapException : public util::Issue { /// This class is used to organize worker chunk table information so that it /// can be used to send jobs to the appropriate worker and inform workers /// what chunks they can expect to handle in shared scans. -/// The data for the maps is provided by the Replicator and stored in QMeta. +/// The data for the maps is provided by the Replicator and stored in the +/// QMeta database. /// When the data is changed, there is a timestamp that is updated, which /// will cause new maps to be made by this class. /// @@ -62,6 +63,16 @@ class ChunkMapException : public util::Issue { /// the worker should handle during a shared scan. /// `getMaps() -> ChunkMap` is expected to be more useful if there is a /// failure and a chunk query needs to go to a different worker. +/// +/// Workers failing or new workers being added is expected to be a rare event. +/// The current algorithm to split chunks between the workers tries to split +/// the work evenly. However, if a new worker is added, it's likely that +/// the new distribution of chunks for shared scans will put the chunks on +/// different workers than previously, which in turn will result in the system +/// being less efficient until all the old scans are complete. 
If workers +/// being added or removed from the system becomes frequent, the algorithm should +/// probably change to try to maintain some chunk location consistency once +/// the system is up. class CzarChunkMap { public: using Ptr = std::shared_ptr; diff --git a/src/czar/CzarRegistry.cc b/src/czar/CzarRegistry.cc new file mode 100644 index 000000000..76440084e --- /dev/null +++ b/src/czar/CzarRegistry.cc @@ -0,0 +1,191 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "czar/CzarRegistry.h" + +// System headers +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "cconfig/CzarConfig.h" +#include "czar/CzarChunkMap.h" +#include "czar/Czar.h" +#include "http/Client.h" +#include "http/Method.h" +#include "util/common.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; +using namespace nlohmann; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarRegistry"); +} // namespace + +namespace lsst::qserv::czar { + +CzarRegistry::CzarRegistry(std::shared_ptr const& czarConfig) : _czarConfig(czarConfig) { + // Begin periodically updating worker's status in the Replication System's registry + // in the detached thread. This will continue before the application gets terminated. 
+ thread _registryUpdateThread(&CzarRegistry::_registryUpdateLoop, this); + _czarHeartbeatThrd = move(_registryUpdateThread); + + thread _registryWorkerUpdateThread(&CzarRegistry::_registryWorkerInfoLoop, this); + _czarWorkerInfoThrd = move(_registryWorkerUpdateThread); +} + +CzarRegistry::~CzarRegistry() { + _loop = false; + if (_czarHeartbeatThrd.joinable()) { + _czarHeartbeatThrd.join(); + } + if (_czarWorkerInfoThrd.joinable()) { + _czarWorkerInfoThrd.join(); + } +} + +void CzarRegistry::_registryUpdateLoop() { + auto const method = http::Method::POST; + string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" + + to_string(_czarConfig->replicationRegistryPort()) + "/czar"; + vector const headers = {"Content-Type: application/json"}; + json const request = json::object({{"instance_id", _czarConfig->replicationInstanceId()}, + {"auth_key", _czarConfig->replicationAuthKey()}, + {"czar", + {{"name", _czarConfig->name()}, + {"id", _czarConfig->id()}, + {"management-port", _czarConfig->replicationHttpPort()}, + {"management-host-name", util::get_current_host_fqdn()}}}}); + string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; + LOGS(_log, LOG_LVL_TRACE, + __func__ << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]); + http::Client client(method, url, request.dump(), headers); + while (_loop) { + LOGS(_log, LOG_LVL_TRACE, + __func__ << " loop url=" << url << " request=" << request.dump() << " headers=" << headers[0]); + try { + json const response = client.readAsJson(); + if (0 == response.at("success").get()) { + string const error = response.at("error").get(); + LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'."); + // TODO: Is there a better thing to do than just log this here? + } + } catch (exception const& ex) { + LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what()); + } + this_thread::sleep_for(chrono::seconds(max(1U, _czarConfig->replicationRegistryHearbeatIvalSec()))); + } +} + +void CzarRegistry::_registryWorkerInfoLoop() { + // Get worker information from the registry + vector const headers; + auto const method = http::Method::GET; + string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" + + to_string(_czarConfig->replicationRegistryPort()) + + "/services?instance_id=" + _czarConfig->replicationInstanceId(); + string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; + LOGS(_log, LOG_LVL_TRACE, __func__ << " url=" << url); + http::Client client(method, url, string(), headers); + while (_loop) { + try { + json const response = client.readAsJson(); + if (0 == response.at("success").get()) { + string const error = response.at("error").get(); + LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'."); + // TODO: Is there a better thing to do than just log this here? + } else { + WorkerContactMapPtr wMap = _buildMapFromJson(response); + // Compare the new map to the existing map and replace if different. 
+ { + lock_guard lck(_mapMtx); + if (wMap != nullptr && !_compareMap(*wMap)) { + _contactMap = wMap; + _latestUpdate = CLOCK::now(); + } + } + } + LOGS(_log, LOG_LVL_TRACE, __func__ << " resp=" << response); + } catch (exception const& ex) { + LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what()); + } + this_thread::sleep_for(chrono::seconds(15)); + } +} + +CzarRegistry::WorkerContactMapPtr CzarRegistry::_buildMapFromJson(nlohmann::json const& response) { + auto const& jsServices = response.at("services"); + auto const& jsWorkers = jsServices.at("workers"); + auto wMap = WorkerContactMapPtr(new WorkerContactMap()); + for (auto const& [key, value] : jsWorkers.items()) { + auto const& jsQserv = value.at("qserv"); + LOGS(_log, LOG_LVL_DEBUG, __func__ << " key=" << key << " jsQ=" << jsQserv); + string wHost = jsQserv.at("host-addr").get(); + string wManagementHost = jsQserv.at("management-host-name").get(); + int wPort = jsQserv.at("management-port").get(); + uint64_t updateTimeInt = jsQserv.at("update-time-ms").get(); + TIMEPOINT updateTime = TIMEPOINT(chrono::milliseconds(updateTimeInt)); + WorkerContactInfo wInfo(key, wHost, wManagementHost, wPort, updateTime); + LOGS(_log, LOG_LVL_DEBUG, + __func__ << " wHost=" << wHost << " wPort=" << wPort << " updateTime=" << updateTimeInt); + auto iter = wMap->find(key); + if (iter != wMap->end()) { + LOGS(_log, LOG_LVL_ERROR, __func__ << " duplicate key " << key << " in " << response); + if (!wInfo.same(iter->second)) { + LOGS(_log, LOG_LVL_ERROR, __func__ << " incongruent key " << key << " in " << response); + return nullptr; + } + // ignore the duplicate, since it matches the previous one. + } else { + wMap->insert({key, wInfo}); + } + } + return wMap; +} + +bool CzarRegistry::_compareMap(WorkerContactMap const& other) const { + if (_contactMap == nullptr) { + // If _contactMap is null, it needs to be replaced. + return false; + } + if (other.size() != _contactMap->size()) { + return false; + } + for (auto const& [key, wInfo] : *_contactMap) { + auto iter = other.find(key); + if (iter == other.end()) { + return false; + } else { + if (!(iter->second.same(wInfo))) { + return false; + } + } + } + return true; +} + +} // namespace lsst::qserv::czar diff --git a/src/czar/CzarRegistry.h b/src/czar/CzarRegistry.h new file mode 100644 index 000000000..4cf31a266 --- /dev/null +++ b/src/czar/CzarRegistry.h @@ -0,0 +1,127 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ +#ifndef LSST_QSERV_CZAR_CZARREGISTRY_H +#define LSST_QSERV_CZAR_CZARREGISTRY_H + +// System headers +#include +#include +#include +#include +#include +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "global/clock_defs.h" + +namespace lsst::qserv::cconfig { +class CzarConfig; +} // namespace lsst::qserv::cconfig + +namespace lsst::qserv::czar { + +/// This class connects to the Replication System's Registry to register this czar and get +/// worker contact information. +/// The assumptions going forward are that the CzarChunkMap provides the real location of +/// where all chunks are located and any workers in that map that are missing from this +/// map are just temporary communications problems. A real prolonged failure of a worker +/// will result in a new CzarChunkMap being created. As such, problems with missing +/// worker contact information will be handled in Job creation +/// in UserQueryFactory::newUserQuery and will be treated in similar manner as not being +/// able to contact a worker. +/// +/// There really shouldn't be communications problems, but there are, the best course of +/// action would probably be to destroy the first instance of this and create a new one. +/// +class CzarRegistry { +public: + using Ptr = std::shared_ptr; + + /// Return a pointer to a new CzarRegistry object. + static Ptr create(std::shared_ptr const& czarConfig) { + return Ptr(new CzarRegistry(czarConfig)); + } + + ~CzarRegistry(); + + struct WorkerContactInfo { + WorkerContactInfo(std::string const& wId_, std::string const& wHost_, + std::string const& wManagementHost_, int wPort_, TIMEPOINT updateTime_) + : wId(wId_), + wHost(wHost_), + wManagementHost(wManagementHost_), + wPort(wPort_), + updateTime(updateTime_) {} + std::string const wId; ///< key + std::string const wHost; ///< "host-addr" entry. + std::string const wManagementHost; ///< "management-host-name" entry. + int const wPort; ///< "management-port" entry. + TIMEPOINT const updateTime; ///< "update-time-ms" entry. + + /// Return true if all members, aside from updateTime, are equal. + bool same(WorkerContactInfo const& other) const { + return (wId == other.wId && wHost == other.wHost && wManagementHost == other.wManagementHost && + wPort == other.wPort); + } + }; + + using WorkerContactMap = std::unordered_map; + using WorkerContactMapPtr = std::shared_ptr; + +private: + CzarRegistry() = delete; + CzarRegistry(std::shared_ptr const& czarConfig); + + /// This function will keep periodically updating Czar's info in the Replication System's Registry + /// until _loop is set to false. + /// Communications problems are logged but ignored. This should probably change. + void _registryUpdateLoop(); + + /// This function collects worker contact information from the Replication System's Registry + /// until _loop is set to false. + /// Communications problems are logged but ignored. This should probably change. + void _registryWorkerInfoLoop(); + + /// Build a new WorkerContactMap from the json `response` + WorkerContactMapPtr _buildMapFromJson(nlohmann::json const& response); + + /// Return true if maps are the same size and all of the elements are the same(). + bool _compareMap(WorkerContactMap const& other) const; + + std::shared_ptr const _czarConfig; ///< Pointer to the CzarConfig. + + std::atomic _loop{true}; ///< Threads will continue to run until this is set false. + std::thread _czarHeartbeatThrd; ///< This thread continually registers this czar with the registry. 
+ std::thread _czarWorkerInfoThrd; ///< This thread continuously collects worker contact information. + + /// Pointer to the map of worker contact information. + WorkerContactMapPtr _contactMap; + TIMEPOINT _latestUpdate; ///< The last time the _contactMap was updated. + std::mutex _mapMtx; /// Protects _contactMap, _latestUpdate. +}; + +} // namespace lsst::qserv::czar + +#endif // LSST_QSERV_CZAR_CZARREGISTRY_H diff --git a/src/replica/contr/Controller.cc b/src/replica/contr/Controller.cc index 2d746b628..5ff0a0871 100644 --- a/src/replica/contr/Controller.cc +++ b/src/replica/contr/Controller.cc @@ -90,7 +90,7 @@ void tracker(weak_ptr const& controller, string const& context) { config->get("controller", "auto-register-workers") != 0; vector workers; try { - workers = ptr->serviceProvider()->registry()->workers(); //&&& important? + workers = ptr->serviceProvider()->registry()->workers(); } catch (exception const& ex) { LOGS(_log, LOG_LVL_WARN, context << "failed to pull worker info from the registry, ex: " << ex.what()); diff --git a/src/replica/registry/Registry.cc b/src/replica/registry/Registry.cc index 00dd6bc62..74264a117 100644 --- a/src/replica/registry/Registry.cc +++ b/src/replica/registry/Registry.cc @@ -59,7 +59,7 @@ Registry::Registry(ServiceProvider::Ptr const& serviceProvider) _baseUrl("http://" + serviceProvider->config()->get("registry", "host") + ":" + to_string(serviceProvider->config()->get("registry", "port"))) {} -vector Registry::workers() const { //&&& important? +vector Registry::workers() const { vector coll; string const resource = "/services?instance_id=" + _serviceProvider->instanceId(); json const resultJson = _request(http::Method::GET, resource); diff --git a/src/xrdsvc/SsiService.cc b/src/xrdsvc/SsiService.cc index 4855ffad0..5f7b8c636 100644 --- a/src/xrdsvc/SsiService.cc +++ b/src/xrdsvc/SsiService.cc @@ -111,7 +111,7 @@ std::shared_ptr makeChunkInventory(mysql::MySqlConfig * Transient communication errors when attempting to connect or send requests to * the Registry will be posted into the log stream and ignored. */ -void registryUpdateLoop(string const& id) { // &&& important? +void registryUpdateLoop(string const& id) { auto const workerConfig = wconfig::WorkerConfig::instance(); auto const method = http::Method::POST; string const url = "http://" + workerConfig->replicationRegistryHost() + ":" + @@ -266,7 +266,7 @@ SsiService::SsiService(XrdSsiLogger* log) { _controlHttpSvc = HttpSvc::create(_foreman, workerConfig->replicationHttpPort(), workerConfig->replicationNumHttpThreads()); auto const port = _controlHttpSvc->start(); - workerConfig->setReplicationHttpPort(port); // &&& important? + workerConfig->setReplicationHttpPort(port); // Begin periodically updating worker's status in the Replication System's registry // in the detached thread. This will continue before the application gets terminated. From 599000a41191687e12931474d432a8182fa654ae Mon Sep 17 00:00:00 2001 From: John Gates Date: Mon, 15 Apr 2024 14:30:03 -0700 Subject: [PATCH 3/4] Review changes. 
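The rename from same() to sameContactInfo() makes it explicit that only the contact fields take part in the comparison; updateTime is deliberately left out, so a later registry heartbeat for an unchanged worker still compares equal. A small self-contained illustration of that rule, using a cut-down stand-in struct with invented hosts and ports:

    #include <cassert>
    #include <chrono>
    #include <string>

    using TimePoint = std::chrono::system_clock::time_point;  // stand-in for TIMEPOINT

    // Simplified stand-in for WorkerContactInfo (no const members, illustration only).
    struct ContactInfo {
        std::string wId;
        std::string wHost;
        std::string wManagementHost;
        int wPort;
        TimePoint updateTime;

        // Mirrors sameContactInfo(): updateTime is not part of the comparison.
        bool sameContactInfo(ContactInfo const& other) const {
            return wId == other.wId && wHost == other.wHost &&
                   wManagementHost == other.wManagementHost && wPort == other.wPort;
        }
    };

    int main() {
        auto const now = std::chrono::system_clock::now();
        ContactInfo a{"worker-0", "w0.example.org", "w0.example.org", 25004, now};

        ContactInfo b = a;
        b.updateTime = now + std::chrono::seconds(15);  // a later heartbeat
        assert(a.sameContactInfo(b));                   // same contact info

        ContactInfo c = a;
        c.wPort = 25005;                // the management port moved
        assert(!a.sameContactInfo(c));  // treated as different
        return 0;
    }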
--- src/ccontrol/UserQueryFactory.cc | 3 --- src/czar/Czar.h | 4 ++-- src/czar/CzarRegistry.cc | 4 ++-- src/czar/CzarRegistry.h | 2 +- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index 0c508fafc..44aeda0c1 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -226,9 +226,6 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st std::string query = aQuery; // TODO: DM-43386 need to have WorkerChunkMap info at this point - auto cz = czar::Czar::getCzar(); - auto czarChunkMap = cz->getCzarChunkMap(); - auto czarRegistry = cz->getCzarRegistry(); std::string stripped; bool async = false; diff --git a/src/czar/Czar.h b/src/czar/Czar.h index d33ce5c73..36878c9aa 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -128,9 +128,9 @@ class Czar { /// @return The reconstructed info for the query SubmitResult getQueryInfo(QueryId queryId) const; - std::shared_ptr getCzarChunkMap() { return _czarChunkMap; } + std::shared_ptr getCzarChunkMap() const { return _czarChunkMap; } - std::shared_ptr getCzarRegistry() { return _czarRegistry; } + std::shared_ptr getCzarRegistry() const { return _czarRegistry; } private: /// Private constructor for singleton. diff --git a/src/czar/CzarRegistry.cc b/src/czar/CzarRegistry.cc index 76440084e..7ed4ccb64 100644 --- a/src/czar/CzarRegistry.cc +++ b/src/czar/CzarRegistry.cc @@ -155,7 +155,7 @@ CzarRegistry::WorkerContactMapPtr CzarRegistry::_buildMapFromJson(nlohmann::json auto iter = wMap->find(key); if (iter != wMap->end()) { LOGS(_log, LOG_LVL_ERROR, __func__ << " duplicate key " << key << " in " << response); - if (!wInfo.same(iter->second)) { + if (!wInfo.sameContactInfo(iter->second)) { LOGS(_log, LOG_LVL_ERROR, __func__ << " incongruent key " << key << " in " << response); return nullptr; } @@ -180,7 +180,7 @@ bool CzarRegistry::_compareMap(WorkerContactMap const& other) const { if (iter == other.end()) { return false; } else { - if (!(iter->second.same(wInfo))) { + if (!(iter->second.sameContactInfo(wInfo))) { return false; } } diff --git a/src/czar/CzarRegistry.h b/src/czar/CzarRegistry.h index 4cf31a266..dd51d4409 100644 --- a/src/czar/CzarRegistry.h +++ b/src/czar/CzarRegistry.h @@ -81,7 +81,7 @@ class CzarRegistry { TIMEPOINT const updateTime; ///< "update-time-ms" entry. /// Return true if all members, aside from updateTime, are equal. - bool same(WorkerContactInfo const& other) const { + bool sameContactInfo(WorkerContactInfo const& other) const { return (wId == other.wId && wHost == other.wHost && wManagementHost == other.wManagementHost && wPort == other.wPort); } From 7a4e5d0aa63c382b368ea9ce94f6c21009ced363 Mon Sep 17 00:00:00 2001 From: John Gates Date: Mon, 15 Apr 2024 14:47:46 -0700 Subject: [PATCH 4/4] Review changes. --- src/czar/CzarRegistry.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/czar/CzarRegistry.cc b/src/czar/CzarRegistry.cc index 7ed4ccb64..074ba9bba 100644 --- a/src/czar/CzarRegistry.cc +++ b/src/czar/CzarRegistry.cc @@ -51,11 +51,11 @@ namespace lsst::qserv::czar { CzarRegistry::CzarRegistry(std::shared_ptr const& czarConfig) : _czarConfig(czarConfig) { // Begin periodically updating worker's status in the Replication System's registry // in the detached thread. This will continue before the application gets terminated. 
-    thread _registryUpdateThread(&CzarRegistry::_registryUpdateLoop, this);
-    _czarHeartbeatThrd = move(_registryUpdateThread);
+    thread registryUpdateThread(&CzarRegistry::_registryUpdateLoop, this);
+    _czarHeartbeatThrd = move(registryUpdateThread);
 
-    thread _registryWorkerUpdateThread(&CzarRegistry::_registryWorkerInfoLoop, this);
-    _czarWorkerInfoThrd = move(_registryWorkerUpdateThread);
+    thread registryWorkerUpdateThread(&CzarRegistry::_registryWorkerInfoLoop, this);
+    _czarWorkerInfoThrd = move(registryWorkerUpdateThread);
 }
 
 CzarRegistry::~CzarRegistry() {
     _loop = false;
     if (_czarHeartbeatThrd.joinable()) {
         _czarHeartbeatThrd.join();
     }
     if (_czarWorkerInfoThrd.joinable()) {
         _czarWorkerInfoThrd.join();
     }
 }
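The fix above drops the leading underscore from the local thread variables, keeping that prefix for data members, and leaves the start/stop layout as is: construct each thread as a local, move it into the member, and join the member in the destructor once _loop has been cleared. A minimal sketch of that layout with a generic polling class; the class and names below are illustrative, not Qserv code:

    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <thread>
    #include <utility>

    // Illustrative polling class using the same start/stop layout as CzarRegistry.
    class Poller {
    public:
        Poller() {
            // Local gets no leading underscore; the member it moves into does.
            std::thread pollThread(&Poller::_pollLoop, this);
            _pollThrd = std::move(pollThread);
        }

        ~Poller() {
            _loop = false;  // ask the loop to wind down
            if (_pollThrd.joinable()) {
                _pollThrd.join();
            }
        }

    private:
        void _pollLoop() {
            while (_loop) {
                std::cout << "poll tick\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        }

        std::atomic<bool> _loop{true};
        std::thread _pollThrd;
    };

    int main() {
        Poller p;
        std::this_thread::sleep_for(std::chrono::milliseconds(350));
        return 0;  // ~Poller() joins the polling thread before exit
    }

After the std::move only the member is joinable (the local is left empty), which is why the destructor checks joinable() on the members rather than on anything local.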