diff --git a/src/replica/ingest/IngestDataHttpSvcMod.cc b/src/replica/ingest/IngestDataHttpSvcMod.cc index 7007dbef3..32807b96b 100644 --- a/src/replica/ingest/IngestDataHttpSvcMod.cc +++ b/src/replica/ingest/IngestDataHttpSvcMod.cc @@ -26,10 +26,9 @@ #include "http/BinaryEncoding.h" #include "http/Exceptions.h" #include "http/Method.h" -#include "qhttp/Request.h" -#include "qhttp/Response.h" #include "replica/config/Configuration.h" #include "replica/services/DatabaseServices.h" +#include "replica/services/ServiceProvider.h" #include "replica/util/Csv.h" #include "util/String.h" @@ -38,18 +37,14 @@ #include #include +// Third party headers +#include "httplib.h" + using namespace std; using json = nlohmann::json; -namespace qhttp = lsst::qserv::qhttp; namespace util = lsst::qserv::util; namespace { -/// @return requestor's IP address -string senderIpAddr(shared_ptr const& req) { - ostringstream ss; - ss << req->remoteAddr.address(); - return ss.str(); -} /// These keywords are found in all known binary columns types of MySQL. vector const binColTypePatterns = {"BIT", "BINARY", "BLOB"}; @@ -70,18 +65,18 @@ bool isBinaryColumnType(string const& type) { namespace lsst::qserv::replica { -void IngestDataHttpSvcMod::process(ServiceProvider::Ptr const& serviceProvider, string const& workerName, - shared_ptr const& req, - shared_ptr const& resp, string const& subModuleName, +void IngestDataHttpSvcMod::process(shared_ptr const& serviceProvider, + string const& workerName, httplib::Request const& req, + httplib::Response& resp, string const& subModuleName, http::AuthType const authType) { IngestDataHttpSvcMod module(serviceProvider, workerName, req, resp); module.execute(subModuleName, authType); } -IngestDataHttpSvcMod::IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, - string const& workerName, shared_ptr const& req, - shared_ptr const& resp) - : http::QhttpModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp), +IngestDataHttpSvcMod::IngestDataHttpSvcMod(shared_ptr const& serviceProvider, + string const& workerName, httplib::Request const& req, + httplib::Response& resp) + : http::ChttpModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp), IngestFileSvc(serviceProvider, workerName) {} string IngestDataHttpSvcMod::context() const { return "INGEST-DATA-HTTP-SVC "; } @@ -112,7 +107,7 @@ json IngestDataHttpSvcMod::_syncProcessData() { _contrib.worker = workerName(); // To indicate the JSON-formatted data were streamed directly into the service - _contrib.url = "data-json://" + ::senderIpAddr(req()) + "/"; + _contrib.url = "data-json://" + req().remote_addr + "/"; _contrib.charsetName = body().optional("charset_name", config->get("worker", "ingest-charset-name")); diff --git a/src/replica/ingest/IngestDataHttpSvcMod.h b/src/replica/ingest/IngestDataHttpSvcMod.h index b92d3ae93..e4fb19758 100644 --- a/src/replica/ingest/IngestDataHttpSvcMod.h +++ b/src/replica/ingest/IngestDataHttpSvcMod.h @@ -22,23 +22,21 @@ #define LSST_QSERV_INGESTDATAHTTPSVCMOD_H // System headers +#include #include // Third party headers #include "nlohmann/json.hpp" // Qserv headers -#include "http/QhttpModule.h" +#include "http/ChttpModule.h" #include "replica/ingest/IngestFileSvc.h" #include "replica/ingest/TransactionContrib.h" -#include "replica/services/ServiceProvider.h" // Forward declarations - -namespace lsst::qserv::qhttp { -class Request; -class Response; -} // namespace lsst::qserv::qhttp +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica // This header declarations namespace lsst::qserv::replica { @@ -49,7 +47,7 @@ namespace lsst::qserv::replica { * Unlike class IngestHttpSvcMod, the current class is meant to be used for ingesting * payloads that are pushed directly into the service over the HTTP protocol. */ -class IngestDataHttpSvcMod : public http::QhttpModule, public IngestFileSvc { +class IngestDataHttpSvcMod : public http::ChttpModule, public IngestFileSvc { public: IngestDataHttpSvcMod() = delete; IngestDataHttpSvcMod(IngestDataHttpSvcMod const&) = delete; @@ -74,9 +72,9 @@ class IngestDataHttpSvcMod : public http::QhttpModule, public IngestFileSvc { * @param authType The authorization requirements for the module * @throws std::invalid_argument for unknown values of parameter 'subModuleName' */ - static void process(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName, - std::shared_ptr const& req, - std::shared_ptr const& resp, std::string const& subModuleName, + static void process(std::shared_ptr const& serviceProvider, + std::string const& workerName, httplib::Request const& req, httplib::Response& resp, + std::string const& subModuleName, http::AuthType const authType = http::AuthType::REQUIRED); protected: @@ -88,9 +86,8 @@ class IngestDataHttpSvcMod : public http::QhttpModule, public IngestFileSvc { private: /// @see method IngestDataHttpSvcMod::create() - IngestDataHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName, - std::shared_ptr const& req, - std::shared_ptr const& resp); + IngestDataHttpSvcMod(std::shared_ptr const& serviceProvider, + std::string const& workerName, httplib::Request const& req, httplib::Response& resp); /// Process a table contribution request (SYNC). nlohmann::json _syncProcessData(); diff --git a/src/replica/ingest/IngestHttpSvc.cc b/src/replica/ingest/IngestHttpSvc.cc index aa47c6999..24d816fd7 100644 --- a/src/replica/ingest/IngestHttpSvc.cc +++ b/src/replica/ingest/IngestHttpSvc.cc @@ -24,21 +24,23 @@ // System headers #include +#include // Qserv headers -#include "http/MetaModule.h" -#include "qhttp/Request.h" -#include "qhttp/Response.h" +#include "http/ChttpMetaModule.h" #include "replica/config/Configuration.h" #include "replica/ingest/IngestDataHttpSvcMod.h" #include "replica/ingest/IngestHttpSvcMod.h" +#include "replica/ingest/IngestRequest.h" #include "replica/ingest/IngestRequestMgr.h" +#include "replica/services/ServiceProvider.h" +#include "replica/util/Common.h" // LSST headers #include "lsst/log/Log.h" // Third party headers -#include "boost/filesystem.hpp" +#include "httplib.h" #include "nlohmann/json.hpp" using namespace nlohmann; @@ -51,76 +53,67 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.IngestHttpSvc"); namespace lsst::qserv::replica { -IngestHttpSvc::Ptr IngestHttpSvc::create(ServiceProvider::Ptr const& serviceProvider, - string const& workerName) { - return IngestHttpSvc::Ptr(new IngestHttpSvc(serviceProvider, workerName)); +shared_ptr IngestHttpSvc::create(shared_ptr const& serviceProvider, + string const& workerName) { + return shared_ptr(new IngestHttpSvc(serviceProvider, workerName)); } -IngestHttpSvc::IngestHttpSvc(ServiceProvider::Ptr const& serviceProvider, string const& workerName) - : HttpSvc(serviceProvider, serviceProvider->config()->get("worker", "http-loader-port"), - serviceProvider->config()->get("worker", "http-max-listen-conn"), - serviceProvider->config()->get("worker", "num-http-loader-processing-threads")), +IngestHttpSvc::IngestHttpSvc(shared_ptr const& serviceProvider, string const& workerName) + : ChttpSvc(context_, serviceProvider, + serviceProvider->config()->get("worker", "http-loader-port"), + serviceProvider->config()->get("worker", "http-max-queued-requests"), + serviceProvider->config()->get("worker", "num-http-loader-processing-threads")), _workerName(workerName), _requestMgr(IngestRequestMgr::create(serviceProvider, workerName)), _threads(serviceProvider->config()->get("worker", "num-async-loader-processing-threads")) {} -string const& IngestHttpSvc::context() const { return context_; } - -void IngestHttpSvc::registerServices() { +void IngestHttpSvc::registerServices(unique_ptr const& server) { + throwIf(server == nullptr, context_ + "the server is not initialized"); auto const self = shared_from_base(); - httpServer()->addHandlers( - {{"GET", "/meta/version", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - json const info = json::object({{"kind", "replication-worker-ingest"}, - {"id", self->_workerName}, - {"instance_id", self->serviceProvider()->instanceId()}}); - http::MetaModule::process(::context_, info, req, resp, "VERSION"); - }}, - {"POST", "/ingest/data", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp, - "SYNC-PROCESS-DATA"); - }}, - {"POST", "/ingest/file", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "SYNC-PROCESS"); - }}, - {"PUT", "/ingest/file/:id", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "SYNC-RETRY"); - }}, - {"POST", "/ingest/file-async", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "ASYNC-SUBMIT"); - }}, - {"PUT", "/ingest/file-async/:id", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "ASYNC-RETRY"); - }}, - {"GET", "/ingest/file-async/:id", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "ASYNC-STATUS-BY-ID", http::AuthType::NONE); - }}, - {"DELETE", "/ingest/file-async/:id", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "ASYNC-CANCEL-BY-ID"); - }}, - {"GET", "/ingest/file-async/trans/:id", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "ASYNC-STATUS-BY-TRANS-ID", http::AuthType::NONE); - }}, - {"DELETE", "/ingest/file-async/trans/:id", - [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { - IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, - req, resp, "ASYNC-CANCEL-BY-TRANS-ID"); - }}}); + server->Get("/meta/version", [self](httplib::Request const& req, httplib::Response& resp) { + json const info = json::object({{"kind", "replication-worker-ingest"}, + {"id", self->_workerName}, + {"instance_id", self->serviceProvider()->instanceId()}}); + http::ChttpMetaModule::process(context_, info, req, resp, "VERSION"); + }); + + server->Post("/ingest/data", [self](httplib::Request const& req, httplib::Response& resp) { + IngestDataHttpSvcMod::process(self->serviceProvider(), self->_workerName, req, resp, + "SYNC-PROCESS-DATA"); + }); + server->Post("/ingest/file", [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp, + "SYNC-PROCESS"); + }); + server->Put("/ingest/file/:id", [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp, + "SYNC-RETRY"); + }); + server->Post("/ingest/file-async", [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp, + "ASYNC-SUBMIT"); + }); + server->Put("/ingest/file-async/:id", [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp, + "ASYNC-RETRY"); + }); + server->Get("/ingest/file-async/:id", [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp, + "ASYNC-STATUS-BY-ID", http::AuthType::NONE); + }); + server->Delete("/ingest/file-async/:id", [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp, + "ASYNC-CANCEL-BY-ID"); + }); + server->Get("/ingest/file-async/trans/:id", [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, self->_workerName, req, resp, + "ASYNC-STATUS-BY-TRANS-ID", http::AuthType::NONE); + }); + server->Delete("/ingest/file-async/trans/:id", + [self](httplib::Request const& req, httplib::Response& resp) { + IngestHttpSvcMod::process(self->serviceProvider(), self->_requestMgr, + self->_workerName, req, resp, "ASYNC-CANCEL-BY-TRANS-ID"); + }); // Create the thread pool for processing asynchronous loading requests. for (auto&& ptr : _threads) { diff --git a/src/replica/ingest/IngestHttpSvc.h b/src/replica/ingest/IngestHttpSvc.h index 339744bae..401a043c6 100644 --- a/src/replica/ingest/IngestHttpSvc.h +++ b/src/replica/ingest/IngestHttpSvc.h @@ -25,16 +25,21 @@ #include #include #include +#include // Qserv headers -#include "replica/services/ServiceProvider.h" -#include "replica/util/HttpSvc.h" +#include "replica/util/ChttpSvc.h" // Forward declarations namespace lsst::qserv::replica { class IngestRequestMgr; +class ServiceProvider; } // namespace lsst::qserv::replica +namespace httplib { +class Server; +} // namespace httplib + // This header declarations namespace lsst::qserv::replica { @@ -47,10 +52,8 @@ namespace lsst::qserv::replica { * service threads as configured in Configuration. * @note The implementation of the class is not thread-safe. */ -class IngestHttpSvc : public HttpSvc { +class IngestHttpSvc : public ChttpSvc { public: - typedef std::shared_ptr Ptr; - /** * Create an instance of the service. * @@ -59,7 +62,8 @@ class IngestHttpSvc : public HttpSvc { * checking consistency of the protocol). * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName); + static std::shared_ptr create(std::shared_ptr const& serviceProvider, + std::string const& workerName); IngestHttpSvc() = delete; IngestHttpSvc(IngestHttpSvc const&) = delete; @@ -68,15 +72,12 @@ class IngestHttpSvc : public HttpSvc { virtual ~IngestHttpSvc() = default; protected: - /// @see HttpSvc::context() - virtual std::string const& context() const; - /// @see HttpSvc::registerServices() - virtual void registerServices(); + virtual void registerServices(std::unique_ptr const& server) override; private: /// @see IngestHttpSvc::create() - IngestHttpSvc(ServiceProvider::Ptr const& serviceProvider, std::string const& workerName); + IngestHttpSvc(std::shared_ptr const& serviceProvider, std::string const& workerName); // Input parameters std::string const _workerName; diff --git a/src/replica/ingest/IngestHttpSvcMod.cc b/src/replica/ingest/IngestHttpSvcMod.cc index 2782c442e..bd6ab43f0 100644 --- a/src/replica/ingest/IngestHttpSvcMod.cc +++ b/src/replica/ingest/IngestHttpSvcMod.cc @@ -24,8 +24,9 @@ // Qserv header #include "http/Method.h" -#include "qhttp/Request.h" -#include "qhttp/Response.h" +#include "replica/ingest/IngestRequest.h" +#include "replica/ingest/IngestRequestMgr.h" +#include "replica/services/ServiceProvider.h" #include "replica/util/Csv.h" // System headers @@ -37,18 +38,19 @@ using json = nlohmann::json; namespace lsst::qserv::replica { -void IngestHttpSvcMod::process(ServiceProvider::Ptr const& serviceProvider, - IngestRequestMgr::Ptr const& ingestRequestMgr, string const& workerName, - qhttp::Request::Ptr const& req, shared_ptr const& resp, +void IngestHttpSvcMod::process(shared_ptr const& serviceProvider, + shared_ptr const& ingestRequestMgr, string const& workerName, + httplib::Request const& req, httplib::Response& resp, string const& subModuleName, http::AuthType const authType) { IngestHttpSvcMod module(serviceProvider, ingestRequestMgr, workerName, req, resp); module.execute(subModuleName, authType); } -IngestHttpSvcMod::IngestHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, - IngestRequestMgr::Ptr const& ingestRequestMgr, string const& workerName, - qhttp::Request::Ptr const& req, shared_ptr const& resp) - : http::QhttpModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp), +IngestHttpSvcMod::IngestHttpSvcMod(shared_ptr const& serviceProvider, + shared_ptr const& ingestRequestMgr, + string const& workerName, httplib::Request const& req, + httplib::Response& resp) + : http::ChttpModule(serviceProvider->authKey(), serviceProvider->adminAuthKey(), req, resp), _serviceProvider(serviceProvider), _ingestRequestMgr(ingestRequestMgr), _workerName(workerName) {} @@ -170,7 +172,7 @@ json IngestHttpSvcMod::_asyncTransCancelRequests() const { return json::object({{"contribs", contribsJson}}); } -IngestRequest::Ptr IngestHttpSvcMod::_createRequest(bool async) const { +shared_ptr IngestHttpSvcMod::_createRequest(bool async) const { auto const config = _serviceProvider->config(); TransactionId const transactionId = body().required("transaction_id"); string const table = body().required("table"); @@ -226,13 +228,13 @@ IngestRequest::Ptr IngestHttpSvcMod::_createRequest(bool async) const { debug(__func__, "max_num_warnings: " + to_string(maxNumWarnings)); debug(__func__, "max_retries: " + to_string(maxRetries)); - IngestRequest::Ptr const request = IngestRequest::create( - _serviceProvider, _workerName, transactionId, table, chunk, isOverlap, url, charsetName, async, - dialectInput, httpMethod, httpData, httpHeaders, maxNumWarnings, maxRetries); + auto const request = IngestRequest::create(_serviceProvider, _workerName, transactionId, table, chunk, + isOverlap, url, charsetName, async, dialectInput, httpMethod, + httpData, httpHeaders, maxNumWarnings, maxRetries); return request; } -IngestRequest::Ptr IngestHttpSvcMod::_createRetry(bool async) const { +shared_ptr IngestHttpSvcMod::_createRetry(bool async) const { unsigned int const id = stoul(params().at("id")); debug(__func__, "id: " + to_string(id)); return IngestRequest::createRetry(_serviceProvider, _workerName, id, async); diff --git a/src/replica/ingest/IngestHttpSvcMod.h b/src/replica/ingest/IngestHttpSvcMod.h index 0e24f322b..a3ce84227 100644 --- a/src/replica/ingest/IngestHttpSvcMod.h +++ b/src/replica/ingest/IngestHttpSvcMod.h @@ -28,10 +28,14 @@ #include "nlohmann/json.hpp" // Qserv headers -#include "http/QhttpModule.h" -#include "replica/ingest/IngestRequest.h" -#include "replica/ingest/IngestRequestMgr.h" -#include "replica/services/ServiceProvider.h" +#include "http/ChttpModule.h" + +// Forward declarations +namespace lsst::qserv::replica { +class IngestRequest; +class IngestRequestMgr; +class ServiceProvider; +} // namespace lsst::qserv::replica // This header declarations namespace lsst::qserv::replica { @@ -40,7 +44,7 @@ namespace lsst::qserv::replica { * Class IngestHttpSvcMod processes chunk/table contribution requests made over HTTP. * The class is used by the HTTP server built into the worker Ingest service. */ -class IngestHttpSvcMod : public http::QhttpModule { +class IngestHttpSvcMod : public http::ChttpModule { public: IngestHttpSvcMod() = delete; IngestHttpSvcMod(IngestHttpSvcMod const&) = delete; @@ -77,10 +81,10 @@ class IngestHttpSvcMod : public http::QhttpModule { * @param authType The authorization requirements for the module * @throws std::invalid_argument for unknown values of parameter 'subModuleName' */ - static void process(ServiceProvider::Ptr const& serviceProvider, - IngestRequestMgr::Ptr const& ingestRequestMgr, std::string const& workerName, - std::shared_ptr const& req, - std::shared_ptr const& resp, std::string const& subModuleName, + static void process(std::shared_ptr const& serviceProvider, + std::shared_ptr const& ingestRequestMgr, + std::string const& workerName, httplib::Request const& req, httplib::Response& resp, + std::string const& subModuleName, http::AuthType const authType = http::AuthType::REQUIRED); protected: @@ -92,10 +96,9 @@ class IngestHttpSvcMod : public http::QhttpModule { private: /// @see method IngestHttpSvcMod::create() - IngestHttpSvcMod(ServiceProvider::Ptr const& serviceProvider, - IngestRequestMgr::Ptr const& ingestRequestMgr, std::string const& workerName, - std::shared_ptr const& req, - std::shared_ptr const& resp); + IngestHttpSvcMod(std::shared_ptr const& serviceProvider, + std::shared_ptr const& ingestRequestMgr, std::string const& workerName, + httplib::Request const& req, httplib::Response& resp); /// Process a table contribution request (SYNC). nlohmann::json _syncProcessRequest() const; @@ -131,7 +134,7 @@ class IngestHttpSvcMod : public http::QhttpModule { * @param async The optional type of a request to be created. * @return A pointer to the created request. */ - IngestRequest::Ptr _createRequest(bool async = false) const; + std::shared_ptr _createRequest(bool async = false) const; /** * Locate and evaluate the specified table contribution request, and if it's @@ -140,11 +143,11 @@ class IngestHttpSvcMod : public http::QhttpModule { * with parameters request creation, or database services interactions. * @return A pointer to the prepared request. */ - IngestRequest::Ptr _createRetry(bool async = false) const; + std::shared_ptr _createRetry(bool async = false) const; // Input parameters - ServiceProvider::Ptr const _serviceProvider; - IngestRequestMgr::Ptr const _ingestRequestMgr; + std::shared_ptr const _serviceProvider; + std::shared_ptr const _ingestRequestMgr; std::string const _workerName; }; diff --git a/src/replica/util/CMakeLists.txt b/src/replica/util/CMakeLists.txt index c17ddd476..4dab0e615 100644 --- a/src/replica/util/CMakeLists.txt +++ b/src/replica/util/CMakeLists.txt @@ -1,6 +1,7 @@ add_library(replica_util OBJECT) add_dependencies(replica_util replica_proto) target_sources(replica_util PRIVATE + ChttpSvc.cc ChunkNumber.cc ChunkedTable.cc Common.cc diff --git a/src/replica/util/ChttpSvc.cc b/src/replica/util/ChttpSvc.cc new file mode 100644 index 000000000..de24c4b98 --- /dev/null +++ b/src/replica/util/ChttpSvc.cc @@ -0,0 +1,84 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/util/ChttpSvc.h" + +// System headers +#include +#include +#include + +// Qserv headers +#include "replica/util/Common.h" + +// Third-party headers +#include "httplib.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.ChttpSvc"); + +} // namespace + +namespace lsst::qserv::replica { + +ChttpSvc::ChttpSvc(std::string const& context, shared_ptr const& serviceProvider, + uint16_t port, size_t maxQueuedRequests, size_t numThreads) + : _context(context), + _serviceProvider(serviceProvider), + _port(port), + _maxQueuedRequests(maxQueuedRequests), + _numThreads(numThreads) { + _createAndConfigure(); +} + +void ChttpSvc::run() { + // IMPORTANT: Request handlers can't be registered in the constructor + // since it's not allowed to make calls to shared_from_this() from there. + registerServices(_server); + + bool const started = _server->listen_after_bind(); + throwIf(!started, _context + "Failed to start the server"); +} + +void ChttpSvc::_createAndConfigure() { + _server = make_unique(); + throwIf(!_server->is_valid(), _context + "Failed to create the server"); + + _server->new_task_queue = [&] { return new httplib::ThreadPool(_numThreads, _maxQueuedRequests); }; + if (_port == 0) { + _port = _server->bind_to_any_port(_bindAddr, _port); + throwIf(_port < 0, _context + "Failed to bind the server to any port"); + } else { + bool const bound = _server->bind_to_port(_bindAddr, _port); + throwIf(!bound, + _context + "Failed to bind the server to the port: " + to_string(_port)); + } + LOGS(_log, LOG_LVL_INFO, _context + "started on port " + to_string(_port)); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/util/ChttpSvc.h b/src/replica/util/ChttpSvc.h new file mode 100644 index 000000000..e16e332ea --- /dev/null +++ b/src/replica/util/ChttpSvc.h @@ -0,0 +1,109 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_CHTTPSVC_H +#define LSST_QSERV_REPLICA_CHTTPSVC_H + +// System headers +#include +#include + +// Forward declarations +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica + +namespace httplib { +class Server; +} // namespace httplib + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class ChttpSvc is a base class for HTTP servers of various components of the system. + * + * @note The class's implementation runs the server witing its own collection + * of service threads. The number of threads is specified via the corresponding + * parameter of the class's constructor. + * @note The implementation of the class is not thread-safe. + */ +class ChttpSvc : public std::enable_shared_from_this { +public: + ChttpSvc() = delete; + ChttpSvc(ChttpSvc const&) = delete; + ChttpSvc& operator=(ChttpSvc const&) = delete; + + ~ChttpSvc() = default; + + /// @return Return the port number the server is bound to. + int port() const { return _port; } + + /** + * Register REST handlers, start threads and run the server in the thread pool. + * @note This is the blocking operation. Please, run it within its own thread if needed. + * @throw std::runtime_error If the server can't be started. + */ + void run(); + +protected: + /** + * The constructor won't start any threads. + * + * @param context The context string to be used for the message logging. + * @param serviceProvider For configuration, etc. services. + * @param port The number of a port to bind to (passing 0 would result in allocating the first available + * port). + * @param maxQueuedRequests The maximum number of queued requests (accepted()ed and waiting to be routed). + * @param numThreads The number of BOOST ASIO threads. + * @throws std::runtime_error If the server can't be created. + */ + ChttpSvc(std::string const& context, std::shared_ptr const& serviceProvider, + std::uint16_t port, std::size_t maxQueuedRequests, std::size_t numThreads); + + std::shared_ptr const& serviceProvider() const { return _serviceProvider; } + + /// @return A shared pointer of the desired subclass (no dynamic type checking) + template + std::shared_ptr shared_from_base() { + return std::static_pointer_cast(shared_from_this()); + } + + /// @return The context string to be used for the message logging. + std::string const& context() const { return _context; } + + /// Register subclass-specific REST services. + virtual void registerServices(std::unique_ptr const& server) = 0; + +private: + void _createAndConfigure(); + + std::string const _context; + std::shared_ptr const _serviceProvider; + int _port; + std::size_t const _maxQueuedRequests = 0; // 0 means unlimited + std::size_t const _numThreads; + std::string const _bindAddr = "0.0.0.0"; + std::unique_ptr _server; +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_CHTTPSVC_H