diff --git a/src/czar/CMakeLists.txt b/src/czar/CMakeLists.txt index f0157c57f..9c856bedc 100644 --- a/src/czar/CMakeLists.txt +++ b/src/czar/CMakeLists.txt @@ -11,7 +11,6 @@ target_sources(czar PRIVATE HttpCzarQueryModule.cc HttpCzarSvc.cc HttpCzarWorkerModule.cc - HttpModule.cc HttpMonitorModule.cc HttpSvc.cc MessageTable.cc diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index ab48c063d..bc73e2eca 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -79,84 +79,6 @@ namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.Czar"); -/** - * This function will keep periodically updating Czar's info in the Replication - * System's Registry. - * @param name The unique identifier of the Czar to be registered. - * @param czarConfig A pointer to the Czar configuration service. - * @note The thread will terminate the process if the registraton request to the Registry - * was explicitly denied by the service. This means the application may be misconfigured. - * Transient communication errors when attempting to connect or send requests to - * the Registry will be posted into the log stream and ignored. - */ -void registryUpdateLoop(shared_ptr const& czarConfig) { - auto const method = http::Method::POST; - string const url = "http://" + czarConfig->replicationRegistryHost() + ":" + - to_string(czarConfig->replicationRegistryPort()) + "/czar"; - vector const headers = {"Content-Type: application/json"}; - json const request = json::object({{"version", http::MetaModule::version}, - {"instance_id", czarConfig->replicationInstanceId()}, - {"auth_key", czarConfig->replicationAuthKey()}, - {"czar", - {{"name", czarConfig->name()}, - {"id", czarConfig->id()}, - {"management-port", czarConfig->replicationHttpPort()}, - {"management-host-name", util::get_current_host_fqdn()}}}}); - string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; - LOGS(_log, LOG_LVL_WARN, "&&&czarPost url=" << url); - LOGS(_log, LOG_LVL_WARN, "&&&czarPost request=" << request.dump()); - LOGS(_log, LOG_LVL_WARN, "&&&czarPost headers=" << headers[0]); - http::Client client(method, url, request.dump(), headers); - while (true) { - try { - json const response = client.readAsJson(); - if (0 == response.at("success").get()) { - string const error = response.at("error").get(); - LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'."); - abort(); - } - } catch (exception const& ex) { - LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what()); - } - this_thread::sleep_for(chrono::seconds(max(1U, czarConfig->replicationRegistryHearbeatIvalSec()))); - } -} - - -// &&& doc -void registryWorkerInfoLoop(shared_ptr const& czarConfig) { - // Get worker information from the registry - auto const method = http::Method::GET; - string const url = "http://" + czarConfig->replicationRegistryHost() + ":" + - to_string(czarConfig->replicationRegistryPort()) - + "/services?instance_id=" + czarConfig->replicationInstanceId(); // &&& what is this value supposed to be to get worker info? - vector const headers = {"Content-Type: application/json"}; - json request = nlohmann::json(); - string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; - LOGS(_log, LOG_LVL_WARN, "&&&czarGet url=" << url); - LOGS(_log, LOG_LVL_WARN, "&&&czarGet request=" << request.dump()); - LOGS(_log, LOG_LVL_WARN, "&&&czarGet headers=" << headers[0]); - http::Client client(method, url, request.dump(), headers); - while (true) { - LOGS(_log, LOG_LVL_WARN, "&&&czarGet loop start"); - try { - json const response = client.readAsJson(); - /* &&& - if (0 == response.at("success").get()) { - string const error = response.at("error").get(); - LOGS(_log, LOG_LVL_ERROR, requestContext + " was denied, error: '" + error + "'."); - abort(); - } - */ - LOGS(_log, LOG_LVL_WARN, "&&&czarGet resp=" << response); - } catch (exception const& ex) { - LOGS(_log, LOG_LVL_WARN, requestContext + " failed, ex: " + ex.what()); - LOGS(_log, LOG_LVL_WARN, requestContext + " &&& failed, ex: " + ex.what()); - } - this_thread::sleep_for(chrono::seconds(15)); - } -} - } // anonymous namespace namespace lsst::qserv::czar { diff --git a/src/czar/CzarChunkMap.h b/src/czar/CzarChunkMap.h index 82ce15c7b..f0b85a1d3 100644 --- a/src/czar/CzarChunkMap.h +++ b/src/czar/CzarChunkMap.h @@ -244,8 +244,7 @@ class CzarFamilyMap { typedef std::map FamilyMapType; typedef std::map DbNameToFamilyNameType; - static Ptr create( - std::shared_ptr const& qmeta); //&&& { return Ptr(new CzarFamilyMap(qmeta)); } + static Ptr create(std::shared_ptr const& qmeta); CzarFamilyMap() = delete; CzarFamilyMap(CzarFamilyMap const&) = delete; diff --git a/src/czar/HttpCzarWorkerModule.cc b/src/czar/HttpCzarWorkerModule.cc index 74a3cea3f..471bacee2 100644 --- a/src/czar/HttpCzarWorkerModule.cc +++ b/src/czar/HttpCzarWorkerModule.cc @@ -57,7 +57,7 @@ void HttpCzarWorkerModule::process(string const& context, shared_ptr const& req, shared_ptr const& resp) - : HttpModule(context, req, resp) {} + : QhttpModule(context, req, resp) {} json HttpCzarWorkerModule::executeImpl(string const& subModuleName) { string const func = string(__func__) + "[sub-module='" + subModuleName + "']"; diff --git a/src/czar/HttpCzarWorkerModule.h b/src/czar/HttpCzarWorkerModule.h index ef0a74509..69f4a3fef 100644 --- a/src/czar/HttpCzarWorkerModule.h +++ b/src/czar/HttpCzarWorkerModule.h @@ -29,7 +29,7 @@ #include "nlohmann/json.hpp" // Qserv headers -#include "czar/HttpModule.h" +#include "czar/QhttpModule.h" // Forward declarations namespace lsst::qserv::qhttp { @@ -41,7 +41,7 @@ class Response; namespace lsst::qserv::czar { /// This class is used to handle messages to this czar from the workers. -class HttpCzarWorkerModule : public czar::HttpModule { +class HttpCzarWorkerModule : public QhttpModule { public: /// @note supported values for parameter 'subModuleName' are: /// 'QUERYJOB-ERROR' - error in a QUERYJOB diff --git a/src/http/Module.h b/src/http/Module.h index 2864496bf..0a8d0ac95 100644 --- a/src/http/Module.h +++ b/src/http/Module.h @@ -208,8 +208,7 @@ class Module { */ virtual void sendResponse(std::string const& content, std::string const& contentType) = 0; - std::string authKey() const { return _authKey; } // &&&uj - + std::string authKey() const { return _authKey; } private: /** diff --git a/src/http/RequestBodyJSON.h b/src/http/RequestBodyJSON.h index fa738e855..a3363e341 100644 --- a/src/http/RequestBodyJSON.h +++ b/src/http/RequestBodyJSON.h @@ -32,6 +32,8 @@ // This header declarations namespace lsst::qserv::http { +// TODO:UJ This should be renamed RequestBodyJson, coding standards. + /** * Class RequestBodyJSON represents the request body parsed into a JSON object. * This type of an object is only available for requests that have the following @@ -42,16 +44,16 @@ class RequestBodyJSON { /// parsed body of the request nlohmann::json objJson = nlohmann::json::object(); - RequestBody() = default; - RequestBody(RequestBody const&) = default; - RequestBody& operator=(RequestBody const&) = default; + RequestBodyJSON() = default; + RequestBodyJSON(RequestBodyJSON const&) = default; + RequestBodyJSON& operator=(RequestBodyJSON const&) = default; - ~RequestBody() = default; + ~RequestBodyJSON() = default; /// Make a new RequestBody based on `js` /// TODO:UJ This would be much more efficient if this class had objJson defined as /// a const reference or pointer to const, but implementation is likely ugly. - RequestBody(nlohmann::json const& js) : objJson(js) {} + RequestBodyJSON(nlohmann::json const& js) : objJson(js) {} /** * Check if the specified parameter is present in the input JSON object. @@ -84,15 +86,11 @@ class RequestBodyJSON { throw std::invalid_argument("RequestBodyJSON::" + std::string(__func__) + "[static] parameter 'obj' is not a valid JSON object"); } -<<<<<<< HEAD:src/http/RequestBodyJSON.h - if (obj.find(name) != obj.end()) return obj[name]; - throw std::invalid_argument("RequestBodyJSON::" + std::string(__func__) + -======= + if (auto const iter = obj.find(name); iter != obj.end()) { return *iter; } throw std::invalid_argument("RequestBody::" + std::string(__func__) + ->>>>>>> 4c670c16d (Czar and workers can send http messages to each other.):src/http/RequestBody.h "[static] required parameter " + name + " is missing in the request body"); } diff --git a/src/qhttp/Server.cc b/src/qhttp/Server.cc index 0cb23ae7e..5116255cb 100644 --- a/src/qhttp/Server.cc +++ b/src/qhttp/Server.cc @@ -277,31 +277,7 @@ std::shared_ptr Server::_findPathHandler(Request::Ptr reque for (auto& pathHandler : pathHandlersIt->second) { if (boost::regex_match(request->path, pathMatch, pathHandler.path.regex)) { pathHandler.path.updateParamsFromMatch(request, pathMatch); -#if 0 // &&& <<<<<<< HEAD return std::make_shared(pathHandler); -#else // &&& ===== - LOGLS_DEBUG(_log, logger(this) << logger(request->_socket) << "invoking handler for " - << pathHandler.path.regex); - try { - pathHandler.handler(request, response); - } catch (boost::system::system_error const& e) { - LOGLS_ERROR(_log, logger(this) << logger(request->_socket) - << "exception thrown from handler: " << e.what()); - switch (e.code().value()) { - case errc::permission_denied: - response->sendStatus(STATUS_FORBIDDEN); - break; - default: - response->sendStatus(STATUS_INTERNAL_SERVER_ERR); - break; - } - } catch (std::exception const& e) { - LOGLS_ERROR(_log, logger(this) << logger(request->_socket) - << "exception thrown from handler: " << e.what()); - response->sendStatus(STATUS_INTERNAL_SERVER_ERR); - } - return; -#endif //&&& >>>>>>> ca9f7b24f (Added some error handling.) } } } diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index 4e4153b81..0448a6af7 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -46,7 +46,7 @@ #include "global/constants.h" #include "global/LogContext.h" #include "global/UnsupportedError.h" -#include "http/RequestBody.h" +#include "http/RequestBodyJSON.h" #include "mysql/MySqlConfig.h" #include "proto/worker.pb.h" #include "util/Bug.h" @@ -371,7 +371,7 @@ std::vector Task::createTasksForChunk( vector vect; for (auto const& job : jsJobs) { json const& jsJobDesc = job["jobdesc"]; - http::RequestBody rbJobDesc(jsJobDesc); + http::RequestBodyJSON rbJobDesc(jsJobDesc); // See qproc::TaskMsgFactory::makeMsgJson for message construction. auto const jdCzarId = rbJobDesc.required("czarId"); auto const jdQueryId = rbJobDesc.required("queryId"); @@ -399,11 +399,11 @@ std::vector Task::createTasksForChunk( vector fragSubchunkIds; vector fragSubTables; LOGS(_log, LOG_LVL_DEBUG, funcN << " frag=" << frag); - http::RequestBody rbFrag(frag); + http::RequestBodyJSON rbFrag(frag); auto const& jsQueries = rbFrag.required("queries"); // TODO:UJ move to uberjob???, these should be the same for all jobs for (auto const& subQ : jsQueries) { - http::RequestBody rbSubQ(subQ); + http::RequestBodyJSON rbSubQ(subQ); auto const subQuery = rbSubQ.required("subQuery"); LOGS(_log, LOG_LVL_DEBUG, funcN << " subQuery=" << subQuery); fragSubQueries.push_back(subQuery); @@ -416,7 +416,7 @@ std::vector Task::createTasksForChunk( auto const& jsSubTables = rbFrag.required("subchunkTables"); for (auto const& scDbTable : jsSubTables) { // TODO:UJ are these the same for all jobs? - http::RequestBody rbScDbTable(scDbTable); + http::RequestBodyJSON rbScDbTable(scDbTable); string scDb = rbScDbTable.required("scDb"); string scTable = rbScDbTable.required("scTable"); TaskDbTbl scDbTbl(scDb, scTable); diff --git a/src/wbase/UberJobData.cc b/src/wbase/UberJobData.cc index 598727a37..d969b80b7 100644 --- a/src/wbase/UberJobData.cc +++ b/src/wbase/UberJobData.cc @@ -34,7 +34,7 @@ #include "http/Exceptions.h" #include "http/MetaModule.h" #include "http/Method.h" -#include "http/RequestBody.h" +#include "http/RequestBodyJSON.h" #include "http/RequestQuery.h" #include "util/Bug.h" #include "util/MultiError.h" diff --git a/src/xrdsvc/HttpWorkerCzarModule.cc b/src/xrdsvc/HttpWorkerCzarModule.cc index 2399ec538..af6f741da 100644 --- a/src/xrdsvc/HttpWorkerCzarModule.cc +++ b/src/xrdsvc/HttpWorkerCzarModule.cc @@ -34,7 +34,7 @@ #include "http/Client.h" // TODO:UJ will probably need to be removed #include "http/Exceptions.h" #include "http/MetaModule.h" -#include "http/RequestBody.h" +#include "http/RequestBodyJSON.h" #include "http/RequestQuery.h" #include "mysql/MySqlUtils.h" #include "qmeta/types.h" @@ -108,7 +108,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { auto const& jsReq = body().objJson; string const targetWorkerId = body().required("worker"); - http::RequestBody rbCzar(body().required("czar")); + http::RequestBodyJSON rbCzar(body().required("czar")); auto czarName = rbCzar.required("name"); auto czarId = rbCzar.required("id"); auto czarPort = rbCzar.required("management-port"); @@ -117,7 +117,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { __func__ << " czar n=" << czarName << " id=" << czarId << " p=" << czarPort << " h=" << czarHostName); - http::RequestBody rbUberJob(body().required("uberjob")); + http::RequestBodyJSON rbUberJob(body().required("uberjob")); auto ujQueryId = rbUberJob.required("queryid"); auto ujId = rbUberJob.required("uberjobid"); auto ujCzarId = rbUberJob.required("czarid"); @@ -146,7 +146,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { for (auto const& job : ujJobs) { json const& jsJobDesc = job["jobdesc"]; - http::RequestBody rbJobDesc(jsJobDesc); + http::RequestBodyJSON rbJobDesc(jsJobDesc); // See qproc::TaskMsgFactory::makeMsgJson for message construction. auto const jdCzarId = rbJobDesc.required("czarId"); jdQueryId = rbJobDesc.required("queryId"); @@ -166,7 +166,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { auto const jdChunkScanTables = rbJobDesc.required("chunkScanTables"); if (!scanInfoSet) { for (auto const& tbl : jdChunkScanTables) { - http::RequestBody rbTbl(tbl); + http::RequestBodyJSON rbTbl(tbl); auto const& chunkScanDb = rbTbl.required("db"); auto lockInMemory = rbTbl.required("lockInMemory"); auto const& chunkScanTable = rbTbl.required("table");