diff --git a/src/proto/worker.proto b/src/proto/worker.proto index 548591bc5..ee6049b0c 100644 --- a/src/proto/worker.proto +++ b/src/proto/worker.proto @@ -150,13 +150,14 @@ message Result { // separate messages (of the corresponding types). message WorkerCommandH { enum Command { - TEST_ECHO = 1; // Return back a value sent to the command processor. - ADD_CHUNK_GROUP = 2; // Add a group of collocated chunks. - REMOVE_CHUNK_GROUP = 3; // Remove a group of collocated chunks. - UPDATE_CHUNK_LIST = 4; // Update (rebuild and/or reload) the list of available chunks. - GET_CHUNK_LIST = 5; // Return a list of chunks known to a worker. - SET_CHUNK_LIST = 6; // Set a new list of chunks. - GET_STATUS = 7; // Return various status info on a worker. + TEST_ECHO = 1; // Return back a value sent to the command processor. + ADD_CHUNK_GROUP = 2; // Add a group of collocated chunks. + REMOVE_CHUNK_GROUP = 3; // Remove a group of collocated chunks. + UPDATE_CHUNK_LIST = 4; // Update (rebuild and/or reload) the list of available chunks. + GET_CHUNK_LIST = 5; // Return a list of chunks known to a worker. + SET_CHUNK_LIST = 6; // Set a new list of chunks. + GET_STATUS = 7; // Return various status info on a worker. + GET_DATABASE_STATUS = 8; // Return various info on the worker database service. } required Command command = 1; } @@ -274,6 +275,13 @@ message WorkerCommandGetStatusR { required string info = 1; // Status info serialized from a JSON object } +// The message to be sent back in response to the 'GET_DATABASE_STATUS' command +// +message WorkerCommandGetDbStatusR { + required WorkerCommandStatus status = 1; // Completion status of the operation + required string info = 2; // Status info serialized from a JSON object +} + ///////////////////////////////////////////////////////////////// // Protocol definition for the query management requests. These // requests do not require any response messages to be explicitly diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index cd9163d18..28e8855cf 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -144,6 +144,8 @@ target_sources(replica PRIVATE FixUpJob.h GetReplicasQservMgtRequest.cc GetReplicasQservMgtRequest.h + GetDbStatusQservMgtRequest.cc + GetDbStatusQservMgtRequest.h GetStatusQservMgtRequest.cc GetStatusQservMgtRequest.h HealthMonitorTask.cc diff --git a/src/replica/GetDbStatusQservMgtRequest.cc b/src/replica/GetDbStatusQservMgtRequest.cc new file mode 100644 index 000000000..c1e89bd52 --- /dev/null +++ b/src/replica/GetDbStatusQservMgtRequest.cc @@ -0,0 +1,128 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/GetDbStatusQservMgtRequest.h" + +// System headers +#include +#include + +// Third party headers +#include "XrdSsi/XrdSsiProvider.hh" +#include "XrdSsi/XrdSsiService.hh" + +// Qserv headers +#include "global/ResourceUnit.h" +#include "proto/worker.pb.h" +#include "replica/ServiceProvider.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace nlohmann; +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetDbStatusQservMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +GetDbStatusQservMgtRequest::Ptr GetDbStatusQservMgtRequest::create( + ServiceProvider::Ptr const& serviceProvider, string const& worker, + GetDbStatusQservMgtRequest::CallbackType const& onFinish) { + return GetDbStatusQservMgtRequest::Ptr(new GetDbStatusQservMgtRequest(serviceProvider, worker, onFinish)); +} + +GetDbStatusQservMgtRequest::GetDbStatusQservMgtRequest( + ServiceProvider::Ptr const& serviceProvider, string const& worker, + GetDbStatusQservMgtRequest::CallbackType const& onFinish) + : QservMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {} + +json const& GetDbStatusQservMgtRequest::info() const { + if (!((state() == State::FINISHED) && (extendedState() == ExtendedState::SUCCESS))) { + throw logic_error("GetDbStatusQservMgtRequest::" + string(__func__) + + " no info available in state: " + state2string(state(), extendedState())); + } + return _info; +} + +list> GetDbStatusQservMgtRequest::extendedPersistentState() const { + list> result; + return result; +} + +void GetDbStatusQservMgtRequest::startImpl(replica::Lock const& lock) { + auto const request = shared_from_base(); + _qservRequest = xrdreq::GetDbStatusQservRequest::create([request](proto::WorkerCommandStatus::Code code, + string const& error, + string const& info) { + if (request->state() == State::FINISHED) return; + replica::Lock const lock(request->_mtx, request->context() + string(__func__) + "[callback]"); + if (request->state() == State::FINISHED) return; + + switch (code) { + case proto::WorkerCommandStatus::SUCCESS: + try { + request->_setInfo(lock, info); + request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS); + } catch (exception const& ex) { + string const msg = "failed to parse worker response, ex: " + string(ex.what()); + LOGS(_log, LOG_LVL_ERROR, "GetDbStatusQservMgtRequest::" << __func__ << " " << msg); + request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD_RESPONSE, msg); + } + break; + case proto::WorkerCommandStatus::ERROR: + request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error); + break; + default: + throw logic_error("GetDbStatusQservMgtRequest::" + string(__func__) + + " unhandled server status: " + proto::WorkerCommandStatus_Code_Name(code)); + } + }); + XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker())); + service()->ProcessRequest(*_qservRequest, resource); +} + +void GetDbStatusQservMgtRequest::finishImpl(replica::Lock const& lock) { + switch (extendedState()) { + case ExtendedState::CANCELLED: + case ExtendedState::TIMEOUT_EXPIRED: + if (_qservRequest) _qservRequest->cancel(); + break; + default: + break; + } +} + +void GetDbStatusQservMgtRequest::notify(replica::Lock const& lock) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__); + notifyDefaultImpl(lock, _onFinish); +} + +void GetDbStatusQservMgtRequest::_setInfo(replica::Lock const& lock, string const& info) { + _info = json::parse(info); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/GetDbStatusQservMgtRequest.h b/src/replica/GetDbStatusQservMgtRequest.h new file mode 100644 index 000000000..57a7d8b42 --- /dev/null +++ b/src/replica/GetDbStatusQservMgtRequest.h @@ -0,0 +1,115 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_GETDBSTATUSQSERVMGTREQUEST_H +#define LSST_QSERV_REPLICA_GETDBSTATUSQSERVMGTREQUEST_H + +// System headers +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "replica/QservMgtRequest.h" +#include "replica/ServiceProvider.h" +#include "xrdreq/GetDbStatusQservRequest.h" + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class GetDbStatusQservMgtRequest is a request for obtaining various info + * on the database service of the Qserv worker. + */ +class GetDbStatusQservMgtRequest : public QservMgtRequest { +public: + typedef std::shared_ptr Ptr; + + /// The function type for notifications on the completion of the request + typedef std::function CallbackType; + + GetDbStatusQservMgtRequest() = delete; + GetDbStatusQservMgtRequest(GetDbStatusQservMgtRequest const&) = delete; + GetDbStatusQservMgtRequest& operator=(GetDbStatusQservMgtRequest const&) = delete; + + ~GetDbStatusQservMgtRequest() final = default; + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param serviceProvider A reference to a provider of services for accessing + * Configuration, saving the request's persistent state to the database. + * @param worker The name of a worker to send the request to. + * @param onFinish (optional) callback function to be called upon request completion. + * @return A pointer to the created object. + */ + static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, + CallbackType const& onFinish = nullptr); + + /** + * @return The info object returned back by the worker. + * @note The method will throw exception std::logic_error if called before + * the request finishes or if it's finished with any status but SUCCESS. + */ + nlohmann::json const& info() const; + + /// @see QservMgtRequest::extendedPersistentState() + std::list> extendedPersistentState() const override; + +protected: + /// @see QservMgtRequest::startImpl() + void startImpl(replica::Lock const& lock) final; + + /// @see QservMgtRequest::finishImpl() + void finishImpl(replica::Lock const& lock) final; + + /// @see QservMgtRequest::notify() + void notify(replica::Lock const& lock) final; + +private: + /// @see GetDbStatusQservMgtRequest::create() + GetDbStatusQservMgtRequest(ServiceProvider::Ptr const& serviceProvider, std::string const& worker, + CallbackType const& onFinish); + + /** + * Carry over results of the request into a local storage. + * @param lock A lock on QservMgtRequest::_mtx must be acquired by a caller of the method. + * @param info The data string returned by a worker. + */ + void _setInfo(replica::Lock const& lock, std::string const& info); + + // Input parameters + + std::string const _data; + CallbackType _onFinish; ///< this object is reset after finishing the request + + /// A request to the remote services + xrdreq::GetDbStatusQservRequest::Ptr _qservRequest; + + /// The info object returned by the Qserv worker + nlohmann::json _info; +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_GETDBSTATUSQSERVMGTREQUEST_H diff --git a/src/replica/QservMgtServices.cc b/src/replica/QservMgtServices.cc index 2e363d5d0..7422b6910 100644 --- a/src/replica/QservMgtServices.cc +++ b/src/replica/QservMgtServices.cc @@ -312,6 +312,40 @@ GetStatusQservMgtRequest::Ptr QservMgtServices::status(std::string const& worker return request; } +GetDbStatusQservMgtRequest::Ptr QservMgtServices::databaseStatus( + std::string const& worker, std::string const& jobId, + GetDbStatusQservMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) { + GetDbStatusQservMgtRequest::Ptr request; + + // Make sure the XROOTD/SSI service is available before attempting + // any operations on requests + + XrdSsiService* service = _xrdSsiService(); + if (not service) { + return request; + } else { + replica::Lock lock(_mtx, "QservMgtServices::" + string(__func__)); + + auto const manager = shared_from_this(); + + request = GetDbStatusQservMgtRequest::create( + serviceProvider(), worker, + [manager](QservMgtRequest::Ptr const& request) { manager->_finish(request->id()); }); + + // Register the request (along with its callback) by its unique + // identifier in the local registry. Once it's complete it'll + // be automatically removed from the Registry. + _registry[request->id()] = + make_shared>(request, onFinish); + } + + // Initiate the request in the lock-free zone to avoid blocking the service + // from initiating other requests which this one is starting. + request->start(service, jobId, requestExpirationIvalSec); + + return request; +} + void QservMgtServices::_finish(string const& id) { string const context = id + " QservMgtServices::" + string(__func__) + " "; diff --git a/src/replica/QservMgtServices.h b/src/replica/QservMgtServices.h index eb1b973f1..be747a3a1 100644 --- a/src/replica/QservMgtServices.h +++ b/src/replica/QservMgtServices.h @@ -30,6 +30,7 @@ // Qserv headers #include "replica/AddReplicaQservMgtRequest.h" #include "replica/GetReplicasQservMgtRequest.h" +#include "replica/GetDbStatusQservMgtRequest.h" #include "replica/GetStatusQservMgtRequest.h" #include "replica/RemoveReplicaQservMgtRequest.h" #include "replica/ServiceProvider.h" @@ -218,6 +219,23 @@ class QservMgtServices : public std::enable_shared_from_this { GetStatusQservMgtRequest::CallbackType const& onFinish = nullptr, unsigned int requestExpirationIvalSec = 0); + /** + * Request detailed status on the database service of a Qserv worker + * + * @param worker The name of a worker. + * @param jobId An optional identifier of a job specifying a context in which + * a request will be executed. + * @param onFinish A callback function to be called upon request completion. + * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing + * to override the default value of the corresponding parameter from the Configuration. + * @return A pointer to the request object if the request was made. Return + * nullptr otherwise. + */ + GetDbStatusQservMgtRequest::Ptr databaseStatus( + std::string const& worker, std::string const& jobId = "", + GetDbStatusQservMgtRequest::CallbackType const& onFinish = nullptr, + unsigned int requestExpirationIvalSec = 0); + private: /** * @param serviceProvider Is required for accessing configuration parameters. diff --git a/src/wpublish/CMakeLists.txt b/src/wpublish/CMakeLists.txt index 985020656..2bce3cc79 100644 --- a/src/wpublish/CMakeLists.txt +++ b/src/wpublish/CMakeLists.txt @@ -6,6 +6,7 @@ target_sources(wpublish PRIVATE ChunkInventory.cc ChunkListCommand.cc GetChunkListCommand.cc + GetDbStatusCommand.cc GetStatusCommand.cc QueriesAndChunks.cc RemoveChunkGroupCommand.cc diff --git a/src/wpublish/GetDbStatusCommand.cc b/src/wpublish/GetDbStatusCommand.cc new file mode 100644 index 000000000..4231a10de --- /dev/null +++ b/src/wpublish/GetDbStatusCommand.cc @@ -0,0 +1,124 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "wpublish/GetDbStatusCommand.h" + +// System headers +#include +#include +#include + +// Third party headers +#include +#include +#include +#include "nlohmann/json.hpp" + +// Qserv headers +#include "mysql/MySqlConfig.h" +#include "mysql/MySqlConnection.h" +#include "proto/worker.pb.h" +#include "wbase/SendChannel.h" +#include "wconfig/WorkerConfig.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; +using json = nlohmann::json; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.wpublish.GetDbStatusCommand"); + +string errInfo(lsst::qserv::mysql::MySqlConnection const& conn) { + return "errno: " + to_string(conn.getErrno()) + ", error: " + conn.getError(); +} + +} // anonymous namespace + +namespace lsst::qserv::wpublish { + +GetDbStatusCommand::GetDbStatusCommand(shared_ptr const& sendChannel) + : wbase::WorkerCommand(sendChannel) {} + +void GetDbStatusCommand::run() { + string const context = "GetDbStatusCommand::" + string(__func__); + LOGS(_log, LOG_LVL_DEBUG, context); + + string const query = "SHOW FULL PROCESSLIST"; + + mysql::MySqlConnection conn(wconfig::WorkerConfig::instance()->getMySqlConfig()); + if (!conn.connect()) { + string const err = "failed to connect to the worker database, " + ::errInfo(conn); + LOGS(_log, LOG_LVL_ERROR, context << " " << err); + reportError(err); + return; + } + if (!conn.queryUnbuffered(query)) { + string const err = "failed to execute the query: '" + query + "', " + ::errInfo(conn); + LOGS(_log, LOG_LVL_ERROR, context << " " << err); + reportError(err); + return; + } + json result; + result["queries"] = json::object({{"columns", json::array()}, {"rows", json::array()}}); + int const numFields = conn.getResultFieldCount(); + if (numFields > 0) { + result["queries"]["columns"] = conn.getColumnNames(); + auto& rows = result["queries"]["rows"]; + MYSQL_RES* mysqlResult = conn.getResult(); + while (true) { + MYSQL_ROW mysqlRow = mysql_fetch_row(mysqlResult); + if (!mysqlRow) { + if (0 == conn.getErrno()) { + // End of iteration if no specific error was reported. + break; + } + string const err = "failed to fetch next row for query: '" + query + "', " + ::errInfo(conn); + LOGS(_log, LOG_LVL_ERROR, context << " " << err); + reportError(err); + return; + } + size_t const* lengths = mysql_fetch_lengths(mysqlResult); + json row = json::array(); + for (int i = 0; i < numFields; i++) { + // Report the empty string for SQL NULL. + auto const length = lengths[i]; + row.push_back(length == 0 ? string() : string(mysqlRow[i], length)); + } + rows.push_back(row); + } + } + proto::WorkerCommandGetDbStatusR reply; + reply.mutable_status(); + reply.set_info(result.dump()); + + _frameBuf.serialize(reply); + string str(_frameBuf.data(), _frameBuf.size()); + _sendChannel->sendStream(xrdsvc::StreamBuffer::createWithMove(str), true); + + LOGS(_log, LOG_LVL_DEBUG, context << " ** SENT **"); +} + +} // namespace lsst::qserv::wpublish diff --git a/src/wpublish/GetDbStatusCommand.h b/src/wpublish/GetDbStatusCommand.h new file mode 100644 index 000000000..97cab8826 --- /dev/null +++ b/src/wpublish/GetDbStatusCommand.h @@ -0,0 +1,61 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_WPUBLISH_GET_DB_STATUS_COMMAND_H +#define LSST_QSERV_WPUBLISH_GET_DB_STATUS_COMMAND_H + +// System headers +#include + +// Qserv headers +#include "wbase/WorkerCommand.h" + +// Forward declarations +namespace lsst::qserv::wbase { +class SendChannel; +} // namespace lsst::qserv::wbase + +// This header declarations +namespace lsst::qserv::wpublish { + +/** + * Class GetDbStatusCommand returns various info on the status of the database + * service of the Qserv worker. + */ +class GetDbStatusCommand : public wbase::WorkerCommand { +public: + /** + * @param sendChannel The communication channel for reporting results. + */ + GetDbStatusCommand(std::shared_ptr const& sendChannel); + + GetDbStatusCommand() = delete; + GetDbStatusCommand& operator=(GetDbStatusCommand const&) = delete; + GetDbStatusCommand(GetDbStatusCommand const&) = delete; + + virtual ~GetDbStatusCommand() override = default; + + virtual void run() override; +}; + +} // namespace lsst::qserv::wpublish + +#endif // LSST_QSERV_WPUBLISH_GET_DB_STATUS_COMMAND_H diff --git a/src/xrdreq/CMakeLists.txt b/src/xrdreq/CMakeLists.txt index 6aa8cf95d..e63e1e58b 100644 --- a/src/xrdreq/CMakeLists.txt +++ b/src/xrdreq/CMakeLists.txt @@ -5,6 +5,7 @@ target_sources(xrdreq PRIVATE ChunkGroupQservRequest.cc ChunkListQservRequest.cc GetChunkListQservRequest.cc + GetDbStatusQservRequest.cc GetStatusQservRequest.cc QservRequest.cc QueryManagementAction.cc diff --git a/src/xrdreq/GetDbStatusQservRequest.cc b/src/xrdreq/GetDbStatusQservRequest.cc new file mode 100644 index 000000000..f781dc6cd --- /dev/null +++ b/src/xrdreq/GetDbStatusQservRequest.cc @@ -0,0 +1,93 @@ +/* + * LSST Data Management System + * Copyright 2018 LSST Corporation. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "xrdreq/GetDbStatusQservRequest.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdreq.GetDbStatusQservRequest"); + +} // namespace + +namespace lsst::qserv::xrdreq { + +GetDbStatusQservRequest::Ptr GetDbStatusQservRequest::create(GetDbStatusQservRequest::CallbackType onFinish) { + GetDbStatusQservRequest::Ptr ptr(new GetDbStatusQservRequest(onFinish)); + ptr->setRefToSelf4keepAlive(ptr); + return ptr; +} + +GetDbStatusQservRequest::GetDbStatusQservRequest(GetDbStatusQservRequest::CallbackType onFinish) + : _onFinish(onFinish) { + LOGS(_log, LOG_LVL_DEBUG, "GetDbStatusQservRequest ** CONSTRUCTED **"); +} + +GetDbStatusQservRequest::~GetDbStatusQservRequest() { + LOGS(_log, LOG_LVL_DEBUG, "GetDbStatusQservRequest ** DELETED **"); +} + +void GetDbStatusQservRequest::onRequest(proto::FrameBuffer& buf) { + proto::WorkerCommandH header; + header.set_command(proto::WorkerCommandH::GET_DATABASE_STATUS); + buf.serialize(header); +} + +void GetDbStatusQservRequest::onResponse(proto::FrameBufferView& view) { + proto::WorkerCommandGetDbStatusR reply; + view.parse(reply); + + if (nullptr != _onFinish) { + // Clearing the stored callback after finishing the up-stream notification + // has two purposes: + // + // 1. it guaranties (exactly) one time notification + // 2. it breaks the up-stream dependency on a caller object if a shared + // pointer to the object was mentioned as the lambda-function's closure + + auto onFinish = move(_onFinish); + _onFinish = nullptr; + onFinish(proto::WorkerCommandStatus::SUCCESS, string(), reply.info()); + } +} + +void GetDbStatusQservRequest::onError(string const& error) { + if (nullptr != _onFinish) { + // Clearing the stored callback after finishing the up-stream notification + // has two purposes: + // + // 1. it guaranties (exactly) one time notification + // 2. it breaks the up-stream dependency on a caller object if a shared + // pointer to the object was mentioned as the lambda-function's closure + + auto onFinish = move(_onFinish); + _onFinish = nullptr; + onFinish(proto::WorkerCommandStatus::ERROR, error, string()); + } +} + +} // namespace lsst::qserv::xrdreq diff --git a/src/xrdreq/GetDbStatusQservRequest.h b/src/xrdreq/GetDbStatusQservRequest.h new file mode 100644 index 000000000..be75c53a6 --- /dev/null +++ b/src/xrdreq/GetDbStatusQservRequest.h @@ -0,0 +1,87 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_XRDREQ_GET_DB_STATUS_QSERV_REQUEST_H +#define LSST_QSERV_XRDREQ_GET_DB_STATUS_QSERV_REQUEST_H + +// System headers +#include +#include +#include +#include +#include + +// Qserv headers +#include "proto/worker.pb.h" +#include "xrdreq/QservRequest.h" + +namespace lsst::qserv::xrdreq { + +/** + * Class GetDbStatusQservRequest represents a request returning various info + * on the status of the database service of the Qserv worker. + */ +class GetDbStatusQservRequest : public QservRequest { +public: + /// The pointer type for instances of the class + typedef std::shared_ptr Ptr; + + /// The callback function type to be used for notifications on + /// the operation completion. + using CallbackType = std::function; // worker info received (if success) + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * + * @param onFinish (optional )callback function to be called upon the completion + * (successful or not) of the request. + * @see wbase::Task::Status + * @return the smart pointer to the object of the class + */ + static Ptr create(CallbackType onFinish = nullptr); + + GetDbStatusQservRequest() = delete; + GetDbStatusQservRequest(GetDbStatusQservRequest const&) = delete; + GetDbStatusQservRequest& operator=(GetDbStatusQservRequest const&) = delete; + + virtual ~GetDbStatusQservRequest() override; + +protected: + /// @see GetDbStatusQservRequest::create() + GetDbStatusQservRequest(CallbackType onFinish); + + virtual void onRequest(proto::FrameBuffer& buf) override; + virtual void onResponse(proto::FrameBufferView& view) override; + virtual void onError(std::string const& error) override; + +private: + // Parameters of the object + + CallbackType _onFinish; +}; + +} // namespace lsst::qserv::xrdreq + +#endif // LSST_QSERV_XRDREQ_GET_DB_STATUS_QSERV_REQUEST_H diff --git a/src/xrdsvc/SsiRequest.cc b/src/xrdsvc/SsiRequest.cc index 7dec4df11..a3359b8a3 100644 --- a/src/xrdsvc/SsiRequest.cc +++ b/src/xrdsvc/SsiRequest.cc @@ -53,6 +53,7 @@ #include "wpublish/AddChunkGroupCommand.h" #include "wpublish/ChunkListCommand.h" #include "wpublish/GetChunkListCommand.h" +#include "wpublish/GetDbStatusCommand.h" #include "wpublish/GetStatusCommand.h" #include "wpublish/RemoveChunkGroupCommand.h" #include "wpublish/ResourceMonitor.h" @@ -392,6 +393,10 @@ wbase::WorkerCommand::Ptr SsiRequest::parseWorkerCommand( sendChannel, _foreman, _resourceMonitor, ::proto2taskSelector(message)); break; } + case proto::WorkerCommandH::GET_DATABASE_STATUS: { + command = std::make_shared(sendChannel); + break; + } default: reportError("Unsupported command " + proto::WorkerCommandH_Command_Name(header.command()) + " found in WorkerCommandH on worker resource=" + _resourceName);