From 3242e3ab315ef3b5f8b2da3eb70087ec3a73643b Mon Sep 17 00:00:00 2001
From: Igor Gaponenko
Date: Fri, 1 Dec 2023 23:13:12 +0000
Subject: [PATCH] Extended the replication System's Framework to monitor result
 files

---
Reviewer notes (text between the '---' line and the first 'diff --git' is
commentary and is not part of the commit):
 * This patch was recovered from a whitespace-collapsed copy in which every
   '<...>' span had been stripped. Template arguments, the license URL and
   the hunk-header class context were restored from the surrounding code.
 * NOTE(review): the two system headers in GetResultFilesQservMgtRequest.h
   are assumed to be <memory> and <string>, and CallbackType is assumed to
   be std::function<void(Ptr)> -- TODO confirm against the original commit.
   If <functional> is not reached transitively it must be included.
 * The sender address in the 'From:' header was lost and is not restored.
 * Indentation of context lines is reconstructed (110-column clang-format
   layout assumed); use 'git apply --ignore-whitespace' if context matching
   fails.
 * Typos in the added comments are fixed; no line was added to or removed
   from any hunk, so the hunk headers and the diffstat are unchanged.

 src/replica/CMakeLists.txt                   |  1 +
 src/replica/GetResultFilesQservMgtRequest.cc | 60 +++++++++++++
 src/replica/GetResultFilesQservMgtRequest.h  | 88 ++++++++++++++++++++
 src/replica/HttpProcessor.cc                 |  6 ++
 src/replica/HttpQservMonitorModule.cc        | 52 ++++++++----
 src/replica/HttpQservMonitorModule.h         | 20 +++++
 src/replica/QservMgtServices.cc              | 12 +++
 src/replica/QservMgtServices.h               | 18 ++++
 8 files changed, 241 insertions(+), 16 deletions(-)
 create mode 100644 src/replica/GetResultFilesQservMgtRequest.cc
 create mode 100644 src/replica/GetResultFilesQservMgtRequest.h

diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt
index 8cf88f791..c27ee04bd 100644
--- a/src/replica/CMakeLists.txt
+++ b/src/replica/CMakeLists.txt
@@ -76,6 +76,7 @@ target_sources(replica PRIVATE
     GetReplicasQservMgtRequest.cc
     GetDbStatusQservMgtRequest.cc
     GetConfigQservMgtRequest.cc
+    GetResultFilesQservMgtRequest.cc
     GetStatusQservMgtRequest.cc
     HealthMonitorTask.cc
     HttpAsyncReqApp.cc
diff --git a/src/replica/GetResultFilesQservMgtRequest.cc b/src/replica/GetResultFilesQservMgtRequest.cc
new file mode 100644
index 000000000..c75293547
--- /dev/null
+++ b/src/replica/GetResultFilesQservMgtRequest.cc
@@ -0,0 +1,60 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+
+// Class header
+#include "replica/GetResultFilesQservMgtRequest.h"
+
+// LSST headers
+#include "lsst/log/Log.h"
+
+using namespace std;
+
+namespace {
+
+LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetResultFilesQservMgtRequest");
+
+}  // namespace
+
+namespace lsst::qserv::replica {
+
+shared_ptr<GetResultFilesQservMgtRequest> GetResultFilesQservMgtRequest::create(
+        shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
+        GetResultFilesQservMgtRequest::CallbackType const& onFinish) {
+    return shared_ptr<GetResultFilesQservMgtRequest>(
+            new GetResultFilesQservMgtRequest(serviceProvider, worker, onFinish));
+}
+
+GetResultFilesQservMgtRequest::GetResultFilesQservMgtRequest(
+        shared_ptr<ServiceProvider> const& serviceProvider, string const& worker,
+        GetResultFilesQservMgtRequest::CallbackType const& onFinish)
+        : QservMgtRequest(serviceProvider, "QSERV_GET_RESULT_FILES", worker), _onFinish(onFinish) {}
+
+void GetResultFilesQservMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
+    string const service = "/files";
+    createHttpReq(lock, service);
+}
+
+void GetResultFilesQservMgtRequest::notify(replica::Lock const& lock) {
+    LOGS(_log, LOG_LVL_TRACE, context() << __func__);
+    notifyDefaultImpl(lock, _onFinish);
+}
+
+}  // namespace lsst::qserv::replica
diff --git a/src/replica/GetResultFilesQservMgtRequest.h b/src/replica/GetResultFilesQservMgtRequest.h
new file mode 100644
index 000000000..c9fcf6509
--- /dev/null
+++ b/src/replica/GetResultFilesQservMgtRequest.h
@@ -0,0 +1,88 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+#ifndef LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H
+#define LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H
+
+// System headers
+#include <memory>
+#include <string>
+
+// Qserv headers
+#include "replica/QservMgtRequest.h"
+
+namespace lsst::qserv::replica {
+class ServiceProvider;
+}  // namespace lsst::qserv::replica
+
+// This header declarations
+namespace lsst::qserv::replica {
+
+/**
+ * Class GetResultFilesQservMgtRequest is a request for obtaining info
+ * on the partial result files from the Qserv worker.
+ */
+class GetResultFilesQservMgtRequest : public QservMgtRequest {
+public:
+    typedef std::shared_ptr<GetResultFilesQservMgtRequest> Ptr;
+
+    /// The function type for notifications on the completion of the request
+    typedef std::function<void(Ptr)> CallbackType;
+
+    GetResultFilesQservMgtRequest() = delete;
+    GetResultFilesQservMgtRequest(GetResultFilesQservMgtRequest const&) = delete;
+    GetResultFilesQservMgtRequest& operator=(GetResultFilesQservMgtRequest const&) = delete;
+
+    virtual ~GetResultFilesQservMgtRequest() final = default;
+
+    /**
+     * Static factory method is needed to prevent issues with the lifespan
+     * and memory management of instances created otherwise (as values or via
+     * low-level pointers).
+     * @param serviceProvider A reference to a provider of services for accessing
+     *   Configuration, saving the request's persistent state to the database.
+     * @param worker The name of a worker to send the request to.
+     * @param onFinish (optional) callback function to be called upon request completion.
+     * @return A pointer to the created object.
+     */
+    static std::shared_ptr<GetResultFilesQservMgtRequest> create(
+            std::shared_ptr<ServiceProvider> const& serviceProvider, std::string const& worker,
+            CallbackType const& onFinish = nullptr);
+
+protected:
+    /// @see QservMgtRequest::createHttpReqImpl()
+    virtual void createHttpReqImpl(replica::Lock const& lock) final;
+
+    /// @see QservMgtRequest::notify()
+    virtual void notify(replica::Lock const& lock) final;
+
+private:
+    /// @see GetResultFilesQservMgtRequest::create()
+    GetResultFilesQservMgtRequest(std::shared_ptr<ServiceProvider> const& serviceProvider,
+                                  std::string const& worker, CallbackType const& onFinish);
+
+    // Input parameters
+
+    CallbackType _onFinish;  ///< This callback is reset after finishing the request.
+};
+
+}  // namespace lsst::qserv::replica
+
+#endif  // LSST_QSERV_REPLICA_GETRESULTFILESQSERVMGTREQUEST_H
diff --git a/src/replica/HttpProcessor.cc b/src/replica/HttpProcessor.cc
index 2d5699c23..7817374f1 100644
--- a/src/replica/HttpProcessor.cc
+++ b/src/replica/HttpProcessor.cc
@@ -257,6 +257,12 @@ void HttpProcessor::registerServices() {
                                                                  self->_processorConfig, req, resp,
                                                                  "WORKER-DB");
                              });
+    httpServer()->addHandler("GET", "/replication/qserv/worker/files/:worker",
+                             [self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) {
+                                 HttpQservMonitorModule::process(self->controller(), self->name(),
+                                                                 self->_processorConfig, req, resp,
+                                                                 "WORKER-FILES");
+                             });
     httpServer()->addHandler("GET", "/replication/qserv/master/status",
                              [self](qhttp::Request::Ptr const req, qhttp::Response::Ptr const resp) {
                                  HttpQservMonitorModule::process(self->controller(), self->name(),
diff --git a/src/replica/HttpQservMonitorModule.cc b/src/replica/HttpQservMonitorModule.cc
index 439ecf81f..b25f85774 100644
--- a/src/replica/HttpQservMonitorModule.cc
+++ b/src/replica/HttpQservMonitorModule.cc
@@ -131,6 +131,15 @@ void HttpQservMonitorModule::process(Controller::Ptr const& controller, string c
     module.execute(subModuleName, authType);
 }
 
+void HttpQservMonitorModule::_throwIfNotSucceeded(string const& func,
+                                                  shared_ptr<QservMgtRequest> const& request) {
+    if (request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS) return;
+    string const msg = "request id: " + request->id() + " of type: " + request->type() +
+                       " sent to worker: " + request->worker() +
+                       " failed, error: " + QservMgtRequest::state2string(request->extendedState());
+    throw http::Error(func, msg);
+}
+
 HttpQservMonitorModule::HttpQservMonitorModule(Controller::Ptr const& controller, string const& taskName,
                                                HttpProcessorConfig const& processorConfig,
                                                qhttp::Request::Ptr const& req,
@@ -146,6 +155,8 @@ json HttpQservMonitorModule::executeImpl(string const& subModuleName) {
         return _workerConfig();
     else if (subModuleName == "WORKER-DB")
         return _workerDb();
+    else if (subModuleName == "WORKER-FILES")
+        return _workerFiles();
     else if (subModuleName == "CZAR")
         return _czar();
     else if (subModuleName == "CZAR-CONFIG")
@@ -250,15 +261,9 @@ json HttpQservMonitorModule::_workerConfig() {
     auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId,
                                                                                      onFinish, timeoutSec);
     request->wait();
+    _throwIfNotSucceeded(__func__, request);
 
-    if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
-        string const msg = "database operation failed, error: " +
-                           QservMgtRequest::state2string(request->extendedState());
-        throw http::Error(__func__, msg);
-    }
-    json result = json::object();
-    result["config"] = request->info();
-    return result;
+    return json::object({{"config", request->info()}});
 }
 
 json HttpQservMonitorModule::_workerDb() {
@@ -277,15 +282,30 @@ json HttpQservMonitorModule::_workerDb() {
     auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus(
             worker, noParentJobId, onFinish, timeoutSec);
     request->wait();
+    _throwIfNotSucceeded(__func__, request);
 
-    if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
-        string const msg = "database operation failed, error: " +
-                           QservMgtRequest::state2string(request->extendedState());
-        throw http::Error(__func__, msg);
-    }
-    json result = json::object();
-    result["status"] = request->info();
-    return result;
+    return json::object({{"status", request->info()}});
+}
+
+json HttpQservMonitorModule::_workerFiles() {
+    debug(__func__);
+    checkApiVersion(__func__, 28);
+
+    auto const worker = params().at("worker");
+    unsigned int const timeoutSec = query().optionalUInt("timeout_sec", workerResponseTimeoutSec());
+
+    debug(__func__, "worker=" + worker);
+    debug(__func__, "timeout_sec=" + to_string(timeoutSec));
+
+    string const noParentJobId;
+    GetResultFilesQservMgtRequest::CallbackType const onFinish = nullptr;
+
+    auto const request = controller()->serviceProvider()->qservMgtServices()->resultFiles(
+            worker, noParentJobId, onFinish, timeoutSec);
+    request->wait();
+
+    _throwIfNotSucceeded(__func__, request);
+    return json::object({{"status", request->info()}});
 }
 
 json HttpQservMonitorModule::_czar() {
diff --git a/src/replica/HttpQservMonitorModule.h b/src/replica/HttpQservMonitorModule.h
index 292d42c3f..d474066e1 100644
--- a/src/replica/HttpQservMonitorModule.h
+++ b/src/replica/HttpQservMonitorModule.h
@@ -40,6 +40,10 @@ namespace lsst::qserv::wbase {
 struct TaskSelector;
 }  // namespace lsst::qserv::wbase
 
+namespace lsst::qserv::replica {
+class QservMgtRequest;
+}  // namespace lsst::qserv::replica
+
 // This header declarations
 namespace lsst::qserv::replica {
 
@@ -58,6 +62,7 @@ class HttpQservMonitorModule : public HttpModule {
      *   WORKER                    - get the status info of a specific worker
      *   WORKER-CONFIG             - get configuration parameters of a specific worker
      *   WORKER-DB                 - get the database status of a specific worker
+     *   WORKER-FILES              - get a collection of partial result files from a worker
      *   CZAR                      - get the status info of Czar
      *   CZAR-CONFIG               - get configuration parameters of Czar
      *   CZAR-DB                   - get the database status of Czar
@@ -84,6 +89,15 @@ class HttpQservMonitorModule : public HttpModule {
     nlohmann::json executeImpl(std::string const& subModuleName) final;
 
 private:
+    /**
+     * The helper method for checking the completion status of a request to ensure it succeeded.
+     * @param func The calling context (for error reporting).
+     * @param request A request to be evaluated.
+     * @throw http::Error If the request didn't succeed.
+     */
+    static void _throwIfNotSucceeded(std::string const& func,
+                                     std::shared_ptr<QservMgtRequest> const& request);
+
     HttpQservMonitorModule(Controller::Ptr const& controller, std::string const& taskName,
                            HttpProcessorConfig const& processorConfig, qhttp::Request::Ptr const& req,
                            qhttp::Response::Ptr const& resp);
@@ -113,6 +127,12 @@ class HttpQservMonitorModule : public HttpModule {
      */
     nlohmann::json _workerDb();
 
+    /**
+     * Process a request for extracting info on the partial query result files
+     * from the selected Qserv worker.
+     */
+    nlohmann::json _workerFiles();
+
     /**
      * Process a request for extracting various status info of Czar.
      */
diff --git a/src/replica/QservMgtServices.cc b/src/replica/QservMgtServices.cc
index 459b7783a..d824839a3 100644
--- a/src/replica/QservMgtServices.cc
+++ b/src/replica/QservMgtServices.cc
@@ -152,6 +152,18 @@ GetConfigQservMgtRequest::Ptr QservMgtServices::config(string const& worker, str
     return request;
 }
 
+GetResultFilesQservMgtRequest::Ptr QservMgtServices::resultFiles(
+        string const& worker, string const& jobId,
+        GetResultFilesQservMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) {
+    auto const request = GetResultFilesQservMgtRequest::create(
+            serviceProvider(), worker, [self = shared_from_this()](QservMgtRequest::Ptr const& request) {
+                self->_finish(request->id());
+            });
+    _register(__func__, request, onFinish);
+    request->start(jobId, requestExpirationIvalSec);
+    return request;
+}
+
 void QservMgtServices::_finish(string const& id) {
     string const context = "QservMgtServices::" + string(__func__) + "[" + id + "] ";
     LOGS(_log, LOG_LVL_TRACE, context);
diff --git a/src/replica/QservMgtServices.h b/src/replica/QservMgtServices.h
index 9366aeda1..bcd6d4597 100644
--- a/src/replica/QservMgtServices.h
+++ b/src/replica/QservMgtServices.h
@@ -31,6 +31,7 @@
 #include "replica/GetReplicasQservMgtRequest.h"
 #include "replica/GetDbStatusQservMgtRequest.h"
 #include "replica/GetConfigQservMgtRequest.h"
+#include "replica/GetResultFilesQservMgtRequest.h"
 #include "replica/GetStatusQservMgtRequest.h"
 #include "replica/RemoveReplicaQservMgtRequest.h"
 #include "replica/SetReplicasQservMgtRequest.h"
@@ -294,6 +295,23 @@ class QservMgtServices : public std::enable_shared_from_this<QservMgtServices> {
             GetConfigQservMgtRequest::CallbackType const& onFinish = nullptr,
             unsigned int requestExpirationIvalSec = 0);
 
+    /**
+     * Request info on the partial result files of a Qserv worker.
+     * @param worker The name of a worker.
+     * @param jobId An optional identifier of a job specifying a context in which
+     *   a request will be executed.
+     * @param onFinish A callback function to be called upon request completion.
+     * @param requestExpirationIvalSec The maximum amount of time to wait before
+     *   completion of the request. If a value of the parameter is set to 0 then no
+     *   limit will be enforced.
+     * @return A pointer to the request object if the request was made. Return
+     *   nullptr otherwise.
+     */
+    GetResultFilesQservMgtRequest::Ptr resultFiles(
+            std::string const& worker, std::string const& jobId = "",
+            GetResultFilesQservMgtRequest::CallbackType const& onFinish = nullptr,
+            unsigned int requestExpirationIvalSec = 0);
+
 private:
     /**
      * @param serviceProvider Is required for accessing configuration parameters.