Skip to content

Commit

Permalink
Extended Replication Framework to pull status of worker databases
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Aug 1, 2023
1 parent 94847e6 commit d0976ac
Show file tree
Hide file tree
Showing 13 changed files with 642 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/proto/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ message WorkerCommandH {

// Return various status info on a worker
GET_STATUS = 7;

// Return various info on the worker database service, including the ongoing
// queries, etc.
GET_DATABASE_STATUS = 8;
}
required Command command = 1;
}
Expand Down Expand Up @@ -360,6 +364,13 @@ message WorkerCommandGetStatusR {
required string info = 1;
}

// The message to be sent back in response to the 'GET_DATABASE_STATUS' command
//
message WorkerCommandGetDbStatusR {

// Status info serialized from a JSON object
required string info = 1;
}

/////////////////////////////////////////////////////////////////
// Protocol definition for the query management requests. These
Expand Down
2 changes: 2 additions & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ target_sources(replica PRIVATE
FixUpJob.h
GetReplicasQservMgtRequest.cc
GetReplicasQservMgtRequest.h
GetDbStatusQservMgtRequest.cc
GetDbStatusQservMgtRequest.h
GetStatusQservMgtRequest.cc
GetStatusQservMgtRequest.h
HealthMonitorTask.cc
Expand Down
129 changes: 129 additions & 0 deletions src/replica/GetDbStatusQservMgtRequest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/GetDbStatusQservMgtRequest.h"

// System headers
#include <set>
#include <stdexcept>

// Third party headers
#include "XrdSsi/XrdSsiProvider.hh"
#include "XrdSsi/XrdSsiService.hh"

// Qserv headers
#include "global/ResourceUnit.h"
#include "replica/ServiceProvider.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace nlohmann;
using namespace std;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetDbStatusQservMgtRequest");

} // namespace

namespace lsst::qserv::replica {

GetDbStatusQservMgtRequest::Ptr GetDbStatusQservMgtRequest::create(
ServiceProvider::Ptr const& serviceProvider, string const& worker,
GetDbStatusQservMgtRequest::CallbackType const& onFinish) {
return GetDbStatusQservMgtRequest::Ptr(new GetDbStatusQservMgtRequest(serviceProvider, worker, onFinish));
}

GetDbStatusQservMgtRequest::GetDbStatusQservMgtRequest(
ServiceProvider::Ptr const& serviceProvider, string const& worker,
GetDbStatusQservMgtRequest::CallbackType const& onFinish)
: QservMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {}

json const& GetDbStatusQservMgtRequest::info() const {
if (!((state() == State::FINISHED) && (extendedState() == ExtendedState::SUCCESS))) {
throw logic_error("GetDbStatusQservMgtRequest::" + string(__func__) +
" no info available in state: " + state2string(state(), extendedState()));
}
return _info;
}

list<pair<string, string>> GetDbStatusQservMgtRequest::extendedPersistentState() const {
list<pair<string, string>> result;
return result;
}

void GetDbStatusQservMgtRequest::startImpl(replica::Lock const& lock) {
auto const request = shared_from_base<GetDbStatusQservMgtRequest>();
_qservRequest =
xrdreq::GetDbStatusQservRequest::create([request](xrdreq::GetDbStatusQservRequest::Status status,
string const& error, string const& info) {
if (request->state() == State::FINISHED) return;
replica::Lock const lock(request->_mtx, request->context() + string(__func__) + "[callback]");
if (request->state() == State::FINISHED) return;

switch (status) {
case xrdreq::GetDbStatusQservRequest::Status::SUCCESS:
try {
request->_setInfo(lock, info);
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
} catch (exception const& ex) {
string const msg = "failed to parse worker response, ex: " + string(ex.what());
LOGS(_log, LOG_LVL_ERROR,
"GetDbStatusQservMgtRequest::" << __func__ << " " << msg);
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD_RESPONSE, msg);
}
break;
case xrdreq::GetDbStatusQservRequest::Status::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;
default:
throw logic_error("GetDbStatusQservMgtRequest::" + string(__func__) +
" unhandled server status: " +
xrdreq::GetDbStatusQservRequest::status2str(status));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
service()->ProcessRequest(*_qservRequest, resource);
}

void GetDbStatusQservMgtRequest::finishImpl(replica::Lock const& lock) {
switch (extendedState()) {
case ExtendedState::CANCELLED:
case ExtendedState::TIMEOUT_EXPIRED:
if (_qservRequest) _qservRequest->cancel();
break;
default:
break;
}
}

void GetDbStatusQservMgtRequest::notify(replica::Lock const& lock) {
LOGS(_log, LOG_LVL_TRACE, context() << __func__);
notifyDefaultImpl<GetDbStatusQservMgtRequest>(lock, _onFinish);
}

void GetDbStatusQservMgtRequest::_setInfo(replica::Lock const& lock, string const& info) {
_info = json::parse(info);
}

} // namespace lsst::qserv::replica
115 changes: 115 additions & 0 deletions src/replica/GetDbStatusQservMgtRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_REPLICA_GETDBSTATUSQSERVMGTREQUEST_H
#define LSST_QSERV_REPLICA_GETDBSTATUSQSERVMGTREQUEST_H

// System headers
#include <memory>
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "replica/QservMgtRequest.h"
#include "replica/ServiceProvider.h"
#include "xrdreq/GetDbStatusQservRequest.h"

// This header declarations
namespace lsst::qserv::replica {

/**
* Class GetDbStatusQservMgtRequest is a request for obtaining various info
* on the database service of the Qserv worker.
*/
class GetDbStatusQservMgtRequest : public QservMgtRequest {
public:
typedef std::shared_ptr<GetDbStatusQservMgtRequest> Ptr;

/// The function type for notifications on the completion of the request
typedef std::function<void(Ptr)> CallbackType;

GetDbStatusQservMgtRequest() = delete;
GetDbStatusQservMgtRequest(GetDbStatusQservMgtRequest const&) = delete;
GetDbStatusQservMgtRequest& operator=(GetDbStatusQservMgtRequest const&) = delete;

~GetDbStatusQservMgtRequest() final = default;

/**
* Static factory method is needed to prevent issues with the lifespan
* and memory management of instances created otherwise (as values or via
* low-level pointers).
* @param serviceProvider A reference to a provider of services for accessing
* Configuration, saving the request's persistent state to the database.
* @param worker The name of a worker to send the request to.
* @param onFinish (optional) callback function to be called upon request completion.
* @return A pointer to the created object.
*/
static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
CallbackType const& onFinish = nullptr);

/**
* @return The info object returned back by the worker.
* @note The method will throw exception std::logic_error if called before
* the request finishes or if it's finished with any status but SUCCESS.
*/
nlohmann::json const& info() const;

/// @see QservMgtRequest::extendedPersistentState()
std::list<std::pair<std::string, std::string>> extendedPersistentState() const override;

protected:
/// @see QservMgtRequest::startImpl()
void startImpl(replica::Lock const& lock) final;

/// @see QservMgtRequest::finishImpl()
void finishImpl(replica::Lock const& lock) final;

/// @see QservMgtRequest::notify()
void notify(replica::Lock const& lock) final;

private:
/// @see GetDbStatusQservMgtRequest::create()
GetDbStatusQservMgtRequest(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
CallbackType const& onFinish);

/**
* Carry over results of the request into a local storage.
* @param lock A lock on QservMgtRequest::_mtx must be acquired by a caller of the method.
* @param info The data string returned by a worker.
*/
void _setInfo(replica::Lock const& lock, std::string const& info);

// Input parameters

std::string const _data;
CallbackType _onFinish; ///< this object is reset after finishing the request

/// A request to the remote services
xrdreq::GetDbStatusQservRequest::Ptr _qservRequest;

/// The info object returned by the Qserv worker
nlohmann::json _info;
};

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_REPLICA_GETDBSTATUSQSERVMGTREQUEST_H
34 changes: 34 additions & 0 deletions src/replica/QservMgtServices.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,40 @@ GetStatusQservMgtRequest::Ptr QservMgtServices::status(std::string const& worker
return request;
}

GetDbStatusQservMgtRequest::Ptr QservMgtServices::databaseStatus(
std::string const& worker, std::string const& jobId,
GetDbStatusQservMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) {
GetDbStatusQservMgtRequest::Ptr request;

// Make sure the XROOTD/SSI service is available before attempting
// any operations on requests

XrdSsiService* service = _xrdSsiService();
if (not service) {
return request;
} else {
replica::Lock lock(_mtx, "QservMgtServices::" + string(__func__));

auto const manager = shared_from_this();

request = GetDbStatusQservMgtRequest::create(
serviceProvider(), worker,
[manager](QservMgtRequest::Ptr const& request) { manager->_finish(request->id()); });

// Register the request (along with its callback) by its unique
// identifier in the local registry. Once it's complete it'll
// be automatically removed from the Registry.
_registry[request->id()] =
make_shared<QservMgtRequestWrapperImpl<GetDbStatusQservMgtRequest>>(request, onFinish);
}

// Initiate the request in the lock-free zone to avoid blocking the service
// from initiating other requests which this one is starting.
request->start(service, jobId, requestExpirationIvalSec);

return request;
}

void QservMgtServices::_finish(string const& id) {
string const context = id + " QservMgtServices::" + string(__func__) + " ";

Expand Down
18 changes: 18 additions & 0 deletions src/replica/QservMgtServices.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
// Qserv headers
#include "replica/AddReplicaQservMgtRequest.h"
#include "replica/GetReplicasQservMgtRequest.h"
#include "replica/GetDbStatusQservMgtRequest.h"
#include "replica/GetStatusQservMgtRequest.h"
#include "replica/RemoveReplicaQservMgtRequest.h"
#include "replica/ServiceProvider.h"
Expand Down Expand Up @@ -218,6 +219,23 @@ class QservMgtServices : public std::enable_shared_from_this<QservMgtServices> {
GetStatusQservMgtRequest::CallbackType const& onFinish = nullptr,
unsigned int requestExpirationIvalSec = 0);

/**
* Request detailed status on the database service of a Qserv worker
*
* @param worker The name of a worker.
* @param jobId An optional identifier of a job specifying a context in which
* a request will be executed.
* @param onFinish A callback function to be called upon request completion.
* @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing
* to override the default value of the corresponding parameter from the Configuration.
* @return A pointer to the request object if the request was made. Return
* nullptr otherwise.
*/
GetDbStatusQservMgtRequest::Ptr databaseStatus(
std::string const& worker, std::string const& jobId = "",
GetDbStatusQservMgtRequest::CallbackType const& onFinish = nullptr,
unsigned int requestExpirationIvalSec = 0);

private:
/**
* @param serviceProvider Is required for accessing configuration parameters.
Expand Down
1 change: 1 addition & 0 deletions src/wpublish/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ target_sources(wpublish PRIVATE
ChunkInventory.cc
ChunkListCommand.cc
GetChunkListCommand.cc
GetDbStatusCommand.cc
GetStatusCommand.cc
QueriesAndChunks.cc
RemoveChunkGroupCommand.cc
Expand Down
Loading

0 comments on commit d0976ac

Please sign in to comment.