Skip to content

Commit

Permalink
Merge pull request #852 from lsst/tickets/DM-43385
Browse files Browse the repository at this point in the history
Tickets/dm 43385
  • Loading branch information
jgates108 authored Apr 15, 2024
2 parents 4e03cf1 + 7a4e5d0 commit 4626bde
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 44 deletions.
1 change: 1 addition & 0 deletions src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "ccontrol/UserQueryType.h"
#include "css/CssAccess.h"
#include "css/KvInterfaceImplMem.h"
#include "czar/Czar.h"
#include "mysql/MySqlConfig.h"
#include "parser/ParseException.h"
#include "qdisp/Executive.h"
Expand Down
1 change: 1 addition & 0 deletions src/czar/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_dependencies(czar proto)
target_sources(czar PRIVATE
Czar.cc
CzarChunkMap.cc
CzarRegistry.cc
HttpCzarSvc.cc
HttpCzarQueryModule.cc
HttpModule.cc
Expand Down
45 changes: 2 additions & 43 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "czar/CzarErrors.h"
#include "czar/HttpSvc.h"
#include "czar/MessageTable.h"
#include "czar/CzarRegistry.h"
#include "global/LogContext.h"
#include "http/Client.h"
#include "http/Method.h"
Expand Down Expand Up @@ -83,45 +84,6 @@ string const createAsyncResultTmpl(

LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.Czar");

/**
 * Periodically (re-)register this Czar in the Replication System's Registry.
 *
 * The function builds a single POST request to "http://<registry-host>:<registry-port>/czar"
 * carrying the instance identifier, the authorization key and the Czar's name, id,
 * management port and host name, and then re-sends that same request forever.
 * It never returns, so it is meant to be run in its own thread.
 *
 * @param czarConfig A pointer to the Czar configuration service.
 * @note The thread will terminate the process if the registration request to the Registry
 *   was explicitly denied by the service. This means the application may be misconfigured.
 *   Transient communication errors when attempting to connect or send requests to
 *   the Registry will be posted into the log stream and ignored.
 */
void registryUpdateLoop(shared_ptr<cconfig::CzarConfig> const& czarConfig) {
auto const method = http::Method::POST;
string const url = "http://" + czarConfig->replicationRegistryHost() + ":" +
to_string(czarConfig->replicationRegistryPort()) + "/czar";
vector<string> const headers = {"Content-Type: application/json"};
// The request body is built once, so the values read from the configuration here
// are the ones sent with every heartbeat.
json const request = json::object({{"instance_id", czarConfig->replicationInstanceId()},
{"auth_key", czarConfig->replicationAuthKey()},
{"czar",
{{"name", czarConfig->name()},
{"id", czarConfig->id()},
{"management-port", czarConfig->replicationHttpPort()},
{"management-host-name", util::get_current_host_fqdn()}}}});
// Prefix shared by all log messages posted by this loop.
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
http::Client client(method, url, request.dump(), headers);
while (true) {
try {
json const response = client.readAsJson();
if (0 == response.at("success").get<int>()) {
string const error = response.at("error").get<string>();
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
// An explicit denial is treated as fatal: terminate the whole process.
abort();
}
} catch (exception const& ex) {
// Any exception raised while sending the request or decoding the response
// is only logged; the request is retried after the sleep below.
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
}
// Wait for the configured heartbeat interval, but never less than one second.
this_thread::sleep_for(chrono::seconds(max(1U, czarConfig->replicationRegistryHearbeatIvalSec())));
}
}

} // anonymous namespace

namespace lsst::qserv::czar {
Expand Down Expand Up @@ -228,10 +190,7 @@ Czar::Czar(string const& configFilePath, string const& czarName)
auto const port = _controlHttpSvc->start();
_czarConfig->setReplicationHttpPort(port);

// Begin periodically updating worker's status in the Replication System's registry
// in the detached thread. This will continue before the application gets terminated.
thread registryUpdateThread(::registryUpdateLoop, _czarConfig);
registryUpdateThread.detach();
_czarRegistry = CzarRegistry::create(_czarConfig);
}

SubmitResult Czar::submitQuery(string const& query, map<string, string> const& hints) {
Expand Down
8 changes: 8 additions & 0 deletions src/czar/Czar.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class FileMonitor;
namespace lsst::qserv::czar {

class CzarChunkMap;
class CzarRegistry;

/// @addtogroup czar

Expand Down Expand Up @@ -127,6 +128,10 @@ class Czar {
/// @return The reconstructed info for the query
SubmitResult getQueryInfo(QueryId queryId) const;

/// @return A shared pointer to the chunk map object held by this Czar (`_czarChunkMap`).
std::shared_ptr<CzarChunkMap> getCzarChunkMap() const { return _czarChunkMap; }

/// @return A shared pointer to the registry connection object held by this Czar (`_czarRegistry`).
std::shared_ptr<CzarRegistry> getCzarRegistry() const { return _czarRegistry; }

private:
/// Private constructor for singleton.
Czar(std::string const& configFilePath, std::string const& czarName);
Expand Down Expand Up @@ -184,6 +189,9 @@ class Czar {

/// Map of which chunks on which workers and shared scan order.
std::shared_ptr<CzarChunkMap> _czarChunkMap;

/// Connection to the registry to register the czar and get worker contact information.
std::shared_ptr<CzarRegistry> _czarRegistry;
};

} // namespace lsst::qserv::czar
Expand Down
3 changes: 3 additions & 0 deletions src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "lsst/log/Log.h"

// Qserv headers
#include "czar/Czar.h"
#include "czar/CzarRegistry.h"
#include "qmeta/Exceptions.h"
#include "util/Bug.h"
#include "util/TimeUtils.h"
Expand Down Expand Up @@ -72,6 +74,7 @@ bool CzarChunkMap::_read() {
auto [chunkMapPtr, wcMapPtr] = makeNewMaps(qChunkMap);

verify(*chunkMapPtr, *wcMapPtr);

LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));

Expand Down
13 changes: 12 additions & 1 deletion src/czar/CzarChunkMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class ChunkMapException : public util::Issue {
/// This class is used to organize worker chunk table information so that it
/// can be used to send jobs to the appropriate worker and inform workers
/// what chunks they can expect to handle in shared scans.
/// The data for the maps is provided by the Replicator and stored in QMeta.
/// The data for the maps is provided by the Replicator and stored in the
/// QMeta database.
/// When the data is changed, there is a timestamp that is updated, which
/// will cause new maps to be made by this class.
///
Expand All @@ -62,6 +63,16 @@ class ChunkMapException : public util::Issue {
/// the worker should handle during a shared scan.
/// `getMaps() -> ChunkMap` is expected to be more useful if there is a
/// failure and a chunk query needs to go to a different worker.
///
/// Worker failures and the addition of new workers are expected to be rare events.
/// The current algorithm to split chunks between the workers tries to split
/// the work evenly. However, if a new worker is added, it's likely that
/// the new distribution of chunks for shared scans will put the chunks on
/// different workers than previously, which in turn will result in the system
/// being less efficient until all the old scans are complete. If adding or
/// removing workers becomes frequent, the algorithm should probably be changed
/// to try to maintain some chunk location consistency once the system is up.
class CzarChunkMap {
public:
using Ptr = std::shared_ptr<CzarChunkMap>;
Expand Down
191 changes: 191 additions & 0 deletions src/czar/CzarRegistry.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "czar/CzarRegistry.h"

// System headers
#include <stdexcept>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "cconfig/CzarConfig.h"
#include "czar/CzarChunkMap.h"
#include "czar/Czar.h"
#include "http/Client.h"
#include "http/Method.h"
#include "util/common.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace std;
using namespace nlohmann;

namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarRegistry");
} // namespace

namespace lsst::qserv::czar {

/// Store the configuration and launch the two background threads of the registry client.
/// Both threads run member functions of this object and are joined in the destructor.
/// @param czarConfig A pointer to the Czar configuration service.
CzarRegistry::CzarRegistry(std::shared_ptr<cconfig::CzarConfig> const& czarConfig) : _czarConfig(czarConfig) {
    // Heartbeat: keeps periodically updating this czar's status in the Replication
    // System's registry until the loop flag is cleared.
    _czarHeartbeatThrd = thread(&CzarRegistry::_registryUpdateLoop, this);

    // Worker info: keeps periodically pulling the workers' contact information
    // from the registry.
    _czarWorkerInfoThrd = thread(&CzarRegistry::_registryWorkerInfoLoop, this);
}

/// Ask both background loops to stop and wait for their threads to finish.
/// The loops test the flag only once per iteration, after their sleep, so this
/// call can block until the current sleep of each loop is over.
CzarRegistry::~CzarRegistry() {
    auto const joinIfRunning = [](auto& thrd) {
        if (thrd.joinable()) {
            thrd.join();
        }
    };

    _loop = false;
    joinIfRunning(_czarHeartbeatThrd);
    joinIfRunning(_czarWorkerInfoThrd);
}

void CzarRegistry::_registryUpdateLoop() {
auto const method = http::Method::POST;
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
to_string(_czarConfig->replicationRegistryPort()) + "/czar";
vector<string> const headers = {"Content-Type: application/json"};
json const request = json::object({{"instance_id", _czarConfig->replicationInstanceId()},
{"auth_key", _czarConfig->replicationAuthKey()},
{"czar",
{{"name", _czarConfig->name()},
{"id", _czarConfig->id()},
{"management-port", _czarConfig->replicationHttpPort()},
{"management-host-name", util::get_current_host_fqdn()}}}});
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
LOGS(_log, LOG_LVL_TRACE,
__func__ << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]);
http::Client client(method, url, request.dump(), headers);
while (_loop) {
LOGS(_log, LOG_LVL_TRACE,
__func__ << " loop url=" << url << " request=" << request.dump() << " headers=" << headers[0]);
try {
json const response = client.readAsJson();
if (0 == response.at("success").get<int>()) {
string const error = response.at("error").get<string>();
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
// TODO: Is there a better thing to do than just log this here?
}
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
}
this_thread::sleep_for(chrono::seconds(max(1U, _czarConfig->replicationRegistryHearbeatIvalSec())));
}
}

void CzarRegistry::_registryWorkerInfoLoop() {
// Get worker information from the registry
vector<string> const headers;
auto const method = http::Method::GET;
string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" +
to_string(_czarConfig->replicationRegistryPort()) +
"/services?instance_id=" + _czarConfig->replicationInstanceId();
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
LOGS(_log, LOG_LVL_TRACE, __func__ << " url=" << url);
http::Client client(method, url, string(), headers);
while (_loop) {
try {
json const response = client.readAsJson();
if (0 == response.at("success").get<int>()) {
string const error = response.at("error").get<string>();
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
// TODO: Is there a better thing to do than just log this here?
} else {
WorkerContactMapPtr wMap = _buildMapFromJson(response);
// Compare the new map to the existing map and replace if different.
{
lock_guard<mutex> lck(_mapMtx);
if (wMap != nullptr && !_compareMap(*wMap)) {
_contactMap = wMap;
_latestUpdate = CLOCK::now();
}
}
}
LOGS(_log, LOG_LVL_TRACE, __func__ << " resp=" << response);
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
}
this_thread::sleep_for(chrono::seconds(15));
}
}

/// Build a worker contact map from the registry's response.
///
/// Every entry of `response["services"]["workers"]` must provide a "qserv" object with
/// "host-addr", "management-host-name", "management-port" and "update-time-ms"
/// ("update-time-ms" is converted to a TIMEPOINT via chrono::milliseconds).
/// The fields are read with `at()`, so a missing field raises a JSON exception
/// that is left for the caller to handle.
///
/// @param response The JSON object returned by the registry.
/// @return The new map, or nullptr if a key is seen twice with contact information
///   that does not match the entry already in the map.
CzarRegistry::WorkerContactMapPtr CzarRegistry::_buildMapFromJson(nlohmann::json const& response) {
    auto const& workersJson = response.at("services").at("workers");
    auto result = WorkerContactMapPtr(new WorkerContactMap());
    for (auto const& [key, value] : workersJson.items()) {
        auto const& jsQserv = value.at("qserv");
        LOGS(_log, LOG_LVL_DEBUG, __func__ << " key=" << key << " jsQ=" << jsQserv);

        string host = jsQserv.at("host-addr").get<string>();
        string managementHost = jsQserv.at("management-host-name").get<string>();
        int port = jsQserv.at("management-port").get<int>();
        uint64_t updateTimeMs = jsQserv.at("update-time-ms").get<uint64_t>();
        TIMEPOINT updateTime = TIMEPOINT(chrono::milliseconds(updateTimeMs));
        WorkerContactInfo contact(key, host, managementHost, port, updateTime);
        LOGS(_log, LOG_LVL_DEBUG,
             __func__ << " wHost=" << host << " wPort=" << port << " updateTime=" << updateTimeMs);

        auto existing = result->find(key);
        if (existing == result->end()) {
            // The usual case: first time this worker is seen.
            result->insert({key, contact});
            continue;
        }

        // The key was already in the map. A matching duplicate is ignored,
        // a conflicting one invalidates the whole response.
        LOGS(_log, LOG_LVL_ERROR, __func__ << " duplicate key " << key << " in " << response);
        if (!contact.sameContactInfo(existing->second)) {
            LOGS(_log, LOG_LVL_ERROR, __func__ << " incongruent key " << key << " in " << response);
            return nullptr;
        }
    }
    return result;
}

/// Check whether `other` holds the same worker contact information as `_contactMap`.
///
/// @param other The map to compare with the current one.
/// @return true only if `_contactMap` is set, both maps have the same size, and every
///   entry of `_contactMap` has an entry in `other` under the same key for which
///   `sameContactInfo()` is true. A false result means `_contactMap` needs replacing.
bool CzarRegistry::_compareMap(WorkerContactMap const& other) const {
    // Without a current map, or with a different number of workers, the maps differ.
    if (_contactMap == nullptr || other.size() != _contactMap->size()) {
        return false;
    }
    for (auto const& [workerKey, currentInfo] : *_contactMap) {
        auto const match = other.find(workerKey);
        if (match == other.end() || !(match->second.sameContactInfo(currentInfo))) {
            return false;
        }
    }
    return true;
}

} // namespace lsst::qserv::czar
Loading

0 comments on commit 4626bde

Please sign in to comment.