Skip to content

Commit

Permalink
Rebase.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Jul 30, 2024
1 parent 904a9e1 commit f4b1fe6
Show file tree
Hide file tree
Showing 11 changed files with 24 additions and 131 deletions.
1 change: 0 additions & 1 deletion src/czar/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ target_sources(czar PRIVATE
HttpCzarQueryModule.cc
HttpCzarSvc.cc
HttpCzarWorkerModule.cc
HttpModule.cc
HttpMonitorModule.cc
HttpSvc.cc
MessageTable.cc
Expand Down
78 changes: 0 additions & 78 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,84 +79,6 @@ namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.Czar");

/**
* This function will keep periodically updating Czar's info in the Replication
* System's Registry.
* @param name The unique identifier of the Czar to be registered.
* @param czarConfig A pointer to the Czar configuration service.
* @note The thread will terminate the process if the registraton request to the Registry
* was explicitly denied by the service. This means the application may be misconfigured.
* Transient communication errors when attempting to connect or send requests to
* the Registry will be posted into the log stream and ignored.
*/
void registryUpdateLoop(shared_ptr<cconfig::CzarConfig> const& czarConfig) {
auto const method = http::Method::POST;
string const url = "http://" + czarConfig->replicationRegistryHost() + ":" +
to_string(czarConfig->replicationRegistryPort()) + "/czar";
vector<string> const headers = {"Content-Type: application/json"};
json const request = json::object({{"version", http::MetaModule::version},
{"instance_id", czarConfig->replicationInstanceId()},
{"auth_key", czarConfig->replicationAuthKey()},
{"czar",
{{"name", czarConfig->name()},
{"id", czarConfig->id()},
{"management-port", czarConfig->replicationHttpPort()},
{"management-host-name", util::get_current_host_fqdn()}}}});
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
LOGS(_log, LOG_LVL_WARN, "&&&czarPost url=" << url);
LOGS(_log, LOG_LVL_WARN, "&&&czarPost request=" << request.dump());
LOGS(_log, LOG_LVL_WARN, "&&&czarPost headers=" << headers[0]);
http::Client client(method, url, request.dump(), headers);
while (true) {
try {
json const response = client.readAsJson();
if (0 == response.at("success").get<int>()) {
string const error = response.at("error").get<string>();
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
abort();
}
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
}
this_thread::sleep_for(chrono::seconds(max(1U, czarConfig->replicationRegistryHearbeatIvalSec())));
}
}


// &&& doc
void registryWorkerInfoLoop(shared_ptr<cconfig::CzarConfig> const& czarConfig) {
// Get worker information from the registry
auto const method = http::Method::GET;
string const url = "http://" + czarConfig->replicationRegistryHost() + ":" +
to_string(czarConfig->replicationRegistryPort())
+ "/services?instance_id=" + czarConfig->replicationInstanceId(); // &&& what is this value supposed to be to get worker info?
vector<string> const headers = {"Content-Type: application/json"};
json request = nlohmann::json();
string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'";
LOGS(_log, LOG_LVL_WARN, "&&&czarGet url=" << url);
LOGS(_log, LOG_LVL_WARN, "&&&czarGet request=" << request.dump());
LOGS(_log, LOG_LVL_WARN, "&&&czarGet headers=" << headers[0]);
http::Client client(method, url, request.dump(), headers);
while (true) {
LOGS(_log, LOG_LVL_WARN, "&&&czarGet loop start");
try {
json const response = client.readAsJson();
/* &&&
if (0 == response.at("success").get<int>()) {
string const error = response.at("error").get<string>();
LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'.");
abort();
}
*/
LOGS(_log, LOG_LVL_WARN, "&&&czarGet resp=" << response);
} catch (exception const& ex) {
LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what());
LOGS(_log, LOG_LVL_WARN, requestContext + " &&& failed, ex: " + ex.what());
}
this_thread::sleep_for(chrono::seconds(15));
}
}

} // anonymous namespace

namespace lsst::qserv::czar {
Expand Down
3 changes: 1 addition & 2 deletions src/czar/CzarChunkMap.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ class CzarFamilyMap {
typedef std::map<std::string, CzarChunkMap::Ptr> FamilyMapType;
typedef std::map<std::string, std::string> DbNameToFamilyNameType;

static Ptr create(
std::shared_ptr<qmeta::QMeta> const& qmeta); //&&& { return Ptr(new CzarFamilyMap(qmeta)); }
static Ptr create(std::shared_ptr<qmeta::QMeta> const& qmeta);

CzarFamilyMap() = delete;
CzarFamilyMap(CzarFamilyMap const&) = delete;
Expand Down
2 changes: 1 addition & 1 deletion src/czar/HttpCzarWorkerModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void HttpCzarWorkerModule::process(string const& context, shared_ptr<qhttp::Requ

HttpCzarWorkerModule::HttpCzarWorkerModule(string const& context, shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp)
: HttpModule(context, req, resp) {}
: QhttpModule(context, req, resp) {}

json HttpCzarWorkerModule::executeImpl(string const& subModuleName) {
string const func = string(__func__) + "[sub-module='" + subModuleName + "']";
Expand Down
4 changes: 2 additions & 2 deletions src/czar/HttpCzarWorkerModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "nlohmann/json.hpp"

// Qserv headers
#include "czar/HttpModule.h"
#include "czar/QhttpModule.h"

// Forward declarations
namespace lsst::qserv::qhttp {
Expand All @@ -41,7 +41,7 @@ class Response;
namespace lsst::qserv::czar {

/// This class is used to handle messages to this czar from the workers.
class HttpCzarWorkerModule : public czar::HttpModule {
class HttpCzarWorkerModule : public QhttpModule {
public:
/// @note supported values for parameter 'subModuleName' are:
/// 'QUERYJOB-ERROR' - error in a QUERYJOB
Expand Down
3 changes: 1 addition & 2 deletions src/http/Module.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ class Module {
*/
virtual void sendResponse(std::string const& content, std::string const& contentType) = 0;

std::string authKey() const { return _authKey; } // &&&uj

std::string authKey() const { return _authKey; }

private:
/**
Expand Down
18 changes: 8 additions & 10 deletions src/http/RequestBodyJSON.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
// This header declarations
namespace lsst::qserv::http {

// TODO:UJ This should be renamed RequestBodyJson, coding standards.

/**
* Class RequestBodyJSON represents the request body parsed into a JSON object.
* This type of an object is only available for requests that have the following
Expand All @@ -42,16 +44,16 @@ class RequestBodyJSON {
/// parsed body of the request
nlohmann::json objJson = nlohmann::json::object();

RequestBody() = default;
RequestBody(RequestBody const&) = default;
RequestBody& operator=(RequestBody const&) = default;
RequestBodyJSON() = default;
RequestBodyJSON(RequestBodyJSON const&) = default;
RequestBodyJSON& operator=(RequestBodyJSON const&) = default;

~RequestBody() = default;
~RequestBodyJSON() = default;

/// Make a new RequestBody based on `js`
/// TODO:UJ This would be much more efficient if this class had objJson defined as
/// a const reference or pointer to const, but implementation is likely ugly.
RequestBody(nlohmann::json const& js) : objJson(js) {}
RequestBodyJSON(nlohmann::json const& js) : objJson(js) {}

/**
* Check if the specified parameter is present in the input JSON object.
Expand Down Expand Up @@ -84,15 +86,11 @@ class RequestBodyJSON {
throw std::invalid_argument("RequestBodyJSON::" + std::string(__func__) +
"<T>[static] parameter 'obj' is not a valid JSON object");
}
<<<<<<< HEAD:src/http/RequestBodyJSON.h
if (obj.find(name) != obj.end()) return obj[name];
throw std::invalid_argument("RequestBodyJSON::" + std::string(__func__) +
=======

if (auto const iter = obj.find(name); iter != obj.end()) {
return *iter;
}
throw std::invalid_argument("RequestBody::" + std::string(__func__) +
>>>>>>> 4c670c16d (Czar and workers can send http messages to each other.):src/http/RequestBody.h
"<T>[static] required parameter " + name +
" is missing in the request body");
}
Expand Down
24 changes: 0 additions & 24 deletions src/qhttp/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,31 +277,7 @@ std::shared_ptr<Server::PathHandler> Server::_findPathHandler(Request::Ptr reque
for (auto& pathHandler : pathHandlersIt->second) {
if (boost::regex_match(request->path, pathMatch, pathHandler.path.regex)) {
pathHandler.path.updateParamsFromMatch(request, pathMatch);
#if 0 // &&& <<<<<<< HEAD
return std::make_shared<PathHandler>(pathHandler);
#else // &&& =====
LOGLS_DEBUG(_log, logger(this) << logger(request->_socket) << "invoking handler for "
<< pathHandler.path.regex);
try {
pathHandler.handler(request, response);
} catch (boost::system::system_error const& e) {
LOGLS_ERROR(_log, logger(this) << logger(request->_socket)
<< "exception thrown from handler: " << e.what());
switch (e.code().value()) {
case errc::permission_denied:
response->sendStatus(STATUS_FORBIDDEN);
break;
default:
response->sendStatus(STATUS_INTERNAL_SERVER_ERR);
break;
}
} catch (std::exception const& e) {
LOGLS_ERROR(_log, logger(this) << logger(request->_socket)
<< "exception thrown from handler: " << e.what());
response->sendStatus(STATUS_INTERNAL_SERVER_ERR);
}
return;
#endif //&&& >>>>>>> ca9f7b24f (Added some error handling.)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/wbase/Task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
#include "global/constants.h"
#include "global/LogContext.h"
#include "global/UnsupportedError.h"
#include "http/RequestBody.h"
#include "http/RequestBodyJSON.h"
#include "mysql/MySqlConfig.h"
#include "proto/worker.pb.h"
#include "util/Bug.h"
Expand Down Expand Up @@ -371,7 +371,7 @@ std::vector<Task::Ptr> Task::createTasksForChunk(
vector<Task::Ptr> vect;
for (auto const& job : jsJobs) {
json const& jsJobDesc = job["jobdesc"];
http::RequestBody rbJobDesc(jsJobDesc);
http::RequestBodyJSON rbJobDesc(jsJobDesc);
// See qproc::TaskMsgFactory::makeMsgJson for message construction.
auto const jdCzarId = rbJobDesc.required<qmeta::CzarId>("czarId");
auto const jdQueryId = rbJobDesc.required<QueryId>("queryId");
Expand Down Expand Up @@ -399,11 +399,11 @@ std::vector<Task::Ptr> Task::createTasksForChunk(
vector<int> fragSubchunkIds;
vector<TaskDbTbl> fragSubTables;
LOGS(_log, LOG_LVL_DEBUG, funcN << " frag=" << frag);
http::RequestBody rbFrag(frag);
http::RequestBodyJSON rbFrag(frag);
auto const& jsQueries = rbFrag.required<json>("queries");
// TODO:UJ move to uberjob???, these should be the same for all jobs
for (auto const& subQ : jsQueries) {
http::RequestBody rbSubQ(subQ);
http::RequestBodyJSON rbSubQ(subQ);
auto const subQuery = rbSubQ.required<string>("subQuery");
LOGS(_log, LOG_LVL_DEBUG, funcN << " subQuery=" << subQuery);
fragSubQueries.push_back(subQuery);
Expand All @@ -416,7 +416,7 @@ std::vector<Task::Ptr> Task::createTasksForChunk(
auto const& jsSubTables = rbFrag.required<json>("subchunkTables");

for (auto const& scDbTable : jsSubTables) { // TODO:UJ are these the same for all jobs?
http::RequestBody rbScDbTable(scDbTable);
http::RequestBodyJSON rbScDbTable(scDbTable);
string scDb = rbScDbTable.required<string>("scDb");
string scTable = rbScDbTable.required<string>("scTable");
TaskDbTbl scDbTbl(scDb, scTable);
Expand Down
2 changes: 1 addition & 1 deletion src/wbase/UberJobData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "http/Exceptions.h"
#include "http/MetaModule.h"
#include "http/Method.h"
#include "http/RequestBody.h"
#include "http/RequestBodyJSON.h"
#include "http/RequestQuery.h"
#include "util/Bug.h"
#include "util/MultiError.h"
Expand Down
10 changes: 5 additions & 5 deletions src/xrdsvc/HttpWorkerCzarModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "http/Client.h" // TODO:UJ will probably need to be removed
#include "http/Exceptions.h"
#include "http/MetaModule.h"
#include "http/RequestBody.h"
#include "http/RequestBodyJSON.h"
#include "http/RequestQuery.h"
#include "mysql/MySqlUtils.h"
#include "qmeta/types.h"
Expand Down Expand Up @@ -108,7 +108,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) {
auto const& jsReq = body().objJson;
string const targetWorkerId = body().required<string>("worker");

http::RequestBody rbCzar(body().required<json>("czar"));
http::RequestBodyJSON rbCzar(body().required<json>("czar"));
auto czarName = rbCzar.required<string>("name");
auto czarId = rbCzar.required<qmeta::CzarId>("id");
auto czarPort = rbCzar.required<int>("management-port");
Expand All @@ -117,7 +117,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) {
__func__ << " czar n=" << czarName << " id=" << czarId << " p=" << czarPort
<< " h=" << czarHostName);

http::RequestBody rbUberJob(body().required<json>("uberjob"));
http::RequestBodyJSON rbUberJob(body().required<json>("uberjob"));
auto ujQueryId = rbUberJob.required<QueryId>("queryid");
auto ujId = rbUberJob.required<UberJobId>("uberjobid");
auto ujCzarId = rbUberJob.required<int>("czarid");
Expand Down Expand Up @@ -146,7 +146,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) {

for (auto const& job : ujJobs) {
json const& jsJobDesc = job["jobdesc"];
http::RequestBody rbJobDesc(jsJobDesc);
http::RequestBodyJSON rbJobDesc(jsJobDesc);
// See qproc::TaskMsgFactory::makeMsgJson for message construction.
auto const jdCzarId = rbJobDesc.required<qmeta::CzarId>("czarId");
jdQueryId = rbJobDesc.required<QueryId>("queryId");
Expand All @@ -166,7 +166,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) {
auto const jdChunkScanTables = rbJobDesc.required<json>("chunkScanTables");
if (!scanInfoSet) {
for (auto const& tbl : jdChunkScanTables) {
http::RequestBody rbTbl(tbl);
http::RequestBodyJSON rbTbl(tbl);
auto const& chunkScanDb = rbTbl.required<string>("db");
auto lockInMemory = rbTbl.required<bool>("lockInMemory");
auto const& chunkScanTable = rbTbl.required<string>("table");
Expand Down

0 comments on commit f4b1fe6

Please sign in to comment.