From d0cbd50a0b71ecdd64a12acd33b6ee24647179a2 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Tue, 14 Jan 2025 16:57:56 +0100 Subject: [PATCH 01/10] fix(remoteFDB): consted connection read/write --- src/fdb5/remote/Connection.cc | 37 ++++---- src/fdb5/remote/Connection.h | 47 +++++++---- src/fdb5/remote/client/Client.cc | 42 +++++++--- src/fdb5/remote/client/Client.h | 33 +++++--- src/fdb5/remote/client/ClientConnection.cc | 47 +++++++---- src/fdb5/remote/client/ClientConnection.h | 34 ++++---- src/fdb5/remote/client/RemoteStore.cc | 9 +- src/fdb5/remote/server/ServerConnection.cc | 98 ++++++++++++++-------- src/fdb5/remote/server/ServerConnection.h | 23 ++--- 9 files changed, 231 insertions(+), 139 deletions(-) diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index f4468f85f..bfacf9e34 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -3,13 +3,13 @@ #include "fdb5/LibFdb5.h" #include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -Connection::Connection() : single_(false) {} -Connection::~Connection() {} +Connection::Connection() : single_(false) { } void Connection::teardown() { @@ -19,20 +19,20 @@ void Connection::teardown() { // all done - disconnecting Connection::write(Message::Exit, false, 0, 0); } catch(...) { - // if connection is already down, no need to escalate + // if connection is already down, no need to escalate } } try { // all done - disconnecting Connection::write(Message::Exit, true, 0, 0); } catch(...) { - // if connection is already down, no need to escalate + // if connection is already down, no need to escalate } } //---------------------------------------------------------------------------------------------------------------------- -void Connection::writeUnsafe(bool control, const void* data, size_t length) { +void Connection::writeUnsafe(const bool control, const void* data, const size_t length) const { long written = 0; if (control || single_) { written = controlSocket().write(data, length); @@ -51,7 +51,7 @@ void Connection::writeUnsafe(bool control, const void* data, size_t length) { } } -void Connection::readUnsafe(bool control, void* data, size_t length) { +void Connection::readUnsafe(bool control, void* data, size_t length) const { long read = 0; if (control || single_) { read = controlSocket().read(data, length); @@ -70,7 +70,7 @@ void Connection::readUnsafe(bool control, void* data, size_t length) { } } -eckit::Buffer Connection::read(bool control, MessageHeader& hdr) { +eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { eckit::FixedString<4> tail; std::lock_guard lock((control || single_) ? readControlMutex_ : readDataMutex_); @@ -99,11 +99,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) { return payload; } -void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) { - write(msg, control, clientID, requestID, std::vector>{{data, length}}); -} - -void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector> data) { +void Connection::write(Message msg, const bool control, const uint32_t clientID, const uint32_t requestID, Payload data) const { uint32_t payloadLength = 0; for (auto d: data) { @@ -123,16 +119,25 @@ void Connection::write(remote::Message msg, bool control, uint32_t clientID, uin writeUnsafe(control, &EndMarker, sizeof(EndMarker)); } -void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) { +void Connection::write(Message msg, + const bool control, + const uint32_t clientID, + const uint32_t requestID, + const void* data, + const uint32_t length) const { + write(msg, control, clientID, requestID, {{data, length}}); +} + +void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) const { eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl; - write(Message::Error, false, clientID, requestID, std::vector>{{msg.c_str(), msg.length()}}); + write(Message::Error, false, clientID, requestID, {{msg.c_str(), msg.length()}}); } -eckit::Buffer Connection::readControl(MessageHeader& hdr) { +eckit::Buffer Connection::readControl(MessageHeader& hdr) const { return read(true, hdr); } -eckit::Buffer Connection::readData(MessageHeader& hdr) { +eckit::Buffer Connection::readData(MessageHeader& hdr) const { return read(false, hdr); } diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index 560f8b01c..10393cc0f 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -10,11 +10,17 @@ #pragma once +#include "fdb5/remote/Messages.h" + #include "eckit/exception/Exceptions.h" #include "eckit/net/TCPSocket.h" #include "eckit/os/BackTrace.h" -#include "fdb5/remote/Messages.h" +#include +#include +#include +#include +#include namespace eckit { @@ -40,41 +46,46 @@ class TCPException : public eckit::Exception { class Connection : eckit::NonCopyable { +public: // types + using Payload = std::vector>; + public: // methods Connection(); - virtual ~Connection(); - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length); - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector> data = {}); + virtual ~Connection() = default; + + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, Payload data = {}) const; - void error(const std::string& msg, uint32_t clientID, uint32_t requestID); + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const; - eckit::Buffer readControl(MessageHeader& hdr); - eckit::Buffer readData(MessageHeader& hdr); + void error(const std::string& msg, uint32_t clientID, uint32_t requestID) const; + + eckit::Buffer readControl(MessageHeader& hdr) const; + + eckit::Buffer readData(MessageHeader& hdr) const; void teardown(); private: // methods + eckit::Buffer read(bool control, MessageHeader& hdr) const; + + void writeUnsafe(bool control, const void* data, size_t length) const; - eckit::Buffer read(bool control, MessageHeader& hdr); + void readUnsafe(bool control, void* data, size_t length) const; - void writeUnsafe(bool control, const void* data, size_t length); - void readUnsafe(bool control, void* data, size_t length); + virtual const eckit::net::TCPSocket& controlSocket() const = 0; - virtual eckit::net::TCPSocket& controlSocket() = 0; - virtual eckit::net::TCPSocket& dataSocket() = 0; + virtual const eckit::net::TCPSocket& dataSocket() const = 0; protected: // members bool single_; private: // members - - std::mutex controlMutex_; - std::mutex dataMutex_; - std::mutex readControlMutex_; - std::mutex readDataMutex_; - + mutable std::mutex controlMutex_; + mutable std::mutex dataMutex_; + mutable std::mutex readControlMutex_; + mutable std::mutex readDataMutex_; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index d443e37ea..123d7ccfa 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -8,12 +8,23 @@ * does it submit to any jurisdiction. */ - -#include "fdb5/LibFdb5.h" - #include "fdb5/remote/client/Client.h" + +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnectionRouter.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/io/Buffer.h" +#include "eckit/net/Endpoint.h" + +#include +#include +#include +#include +#include +#include + namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -44,29 +55,34 @@ Client::~Client() { connection_.remove(id_); } -void Client::controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, const void* payload, uint32_t payloadLength) { +void Client::controlWriteCheckResponse(Message msg, + uint32_t requestID, + bool dataListener, + const void* payload, + uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - std::vector> data; - if (payloadLength) { - data.push_back(std::make_pair(payload, payloadLength)); - } + Connection::Payload data; + if (payloadLength != 0) { data.push_back(std::make_pair(payload, payloadLength)); } - std::future f = connection_.controlWrite(*this, msg, requestID, dataListener, data); + auto f = connection_.controlWrite(*this, msg, requestID, dataListener, data); f.wait(); ASSERT(f.get().size() == 0); } -eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID, const void* payload, uint32_t payloadLength) { +eckit::Buffer Client::controlWriteReadResponse(Message msg, + uint32_t requestID, + const void* payload, + uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - - std::vector> data{}; + + Connection::Payload data {}; if (payloadLength) { data.push_back(std::make_pair(payload, payloadLength)); } @@ -76,7 +92,7 @@ eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID, return eckit::Buffer{f.get()}; } -void Client::dataWrite(remote::Message msg, uint32_t requestID, std::vector> data) { +void Client::dataWrite(remote::Message msg, uint32_t requestID, Connection::Payload data) { connection_.dataWrite(*this, msg, requestID, data); } diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 689e42607..4e1bc4e6f 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -10,11 +10,10 @@ #pragma once -#include "eckit/config/Configuration.h" #include "eckit/memory/NonCopyable.h" #include "eckit/net/Endpoint.h" -#include "fdb5/database/Key.h" +#include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnection.h" @@ -33,28 +32,40 @@ class RemoteFDBException : public eckit::RemoteException { class Client : eckit::NonCopyable { public: Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); + Client(const std::vector>& endpoints); + virtual ~Client(); uint32_t clientId() const { return id_; } + uint32_t id() const { return id_; } + const eckit::net::Endpoint& controlEndpoint() const { return connection_.controlEndpoint(); } + const std::string& defaultEndpoint() const { return connection_.defaultEndpoint(); } - uint32_t generateRequestID() { return connection_.generateRequestID(); } + uint32_t generateRequestID() const { return connection_.generateRequestID(); } // blocking requests - void controlWriteCheckResponse(Message msg, uint32_t requestID, bool dataListener, const void* payload=nullptr, uint32_t payloadLength=0); - eckit::Buffer controlWriteReadResponse (Message msg, uint32_t requestID, const void* payload=nullptr, uint32_t payloadLength=0); + void controlWriteCheckResponse(Message msg, + uint32_t requestID, + bool dataListener, + const void* payload = nullptr, + uint32_t payloadLength = 0) const; + + eckit::Buffer controlWriteReadResponse(Message msg, + uint32_t requestID, + const void* payload = nullptr, + uint32_t payloadLength = 0) const; + + void dataWrite(Message msg, uint32_t requestID, Connection::Payload data = {}); - void dataWrite(remote::Message msg, uint32_t requestID, std::vector> data={}); - // handlers for incoming messages - to be defined in the client class - virtual bool handle(Message message, uint32_t requestID) = 0; + virtual bool handle(Message message, uint32_t requestID) = 0; virtual bool handle(Message message, uint32_t requestID, eckit::Buffer&& payload) = 0; protected: - ClientConnection& connection_; private: @@ -63,7 +74,7 @@ class Client : eckit::NonCopyable { private: uint32_t id_; - std::mutex blockingRequestMutex_; + mutable std::mutex blockingRequestMutex_; }; -} \ No newline at end of file +} // namespace fdb5::remote diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index f85707156..689444edb 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -1,23 +1,33 @@ - -#include -#include +#include "fdb5/remote/client/ClientConnection.h" +#include "fdb5/LibFdb5.h" +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" +#include "fdb5/remote/client/ClientConnectionRouter.h" #include "eckit/config/LocalConfiguration.h" #include "eckit/config/Resource.h" +#include "eckit/container/Queue.h" +#include "eckit/exception/Exceptions.h" #include "eckit/io/Buffer.h" #include "eckit/log/Bytes.h" +#include "eckit/log/CodeLocation.h" #include "eckit/log/Log.h" -#include "eckit/message/Message.h" -#include "eckit/runtime/Main.h" +#include "eckit/net/Endpoint.h" +#include "eckit/runtime/SessionID.h" #include "eckit/serialisation/MemoryStream.h" -#include "eckit/utils/Translator.h" -#include "fdb5/LibFdb5.h" -#include "fdb5/remote/Messages.h" -#include "fdb5/remote/RemoteFieldLocation.h" -#include "fdb5/remote/client/ClientConnection.h" -#include "fdb5/remote/client/ClientConnectionRouter.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace fdb5::remote { @@ -162,9 +172,13 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const { return conf; } -// ----------------------------------------------------------------------------------------------------- +//---------------------------------------------------------------------------------------------------------------------- -std::future ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector> data) { +std::future ClientConnection::controlWrite(const Client& client, + Message msg, + const uint32_t requestID, + const bool /*dataListener*/, + Payload data) const { std::future f; { std::lock_guard lock(promisesMutex_); @@ -176,11 +190,12 @@ std::future ClientConnection::controlWrite(Client& client, Messag return f; } -void ClientConnection::dataWrite(DataWriteRequest& r) { - Connection::write(r.msg_, false, r.client_->clientId(), r.id_, r.data_.data(), r.data_.size()); +void ClientConnection::dataWrite(DataWriteRequest& request) const { + Connection::write(request.msg_, false, request.client_->clientId(), request.id_, request.data_.data(), + request.data_.size()); } -void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, std::vector> data) { +void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, Payload data) { static size_t maxQueueLength = eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320); { diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index f24cb8112..9a5bb870f 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -10,19 +10,19 @@ #pragma once -#include -#include +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" #include "eckit/config/LocalConfiguration.h" #include "eckit/container/Queue.h" #include "eckit/io/Buffer.h" -#include "eckit/io/Length.h" +#include "eckit/net/Endpoint.h" #include "eckit/net/TCPClient.h" -#include "eckit/net/TCPStream.h" +#include "eckit/net/TCPSocket.h" #include "eckit/runtime/SessionID.h" -#include "fdb5/remote/Messages.h" -#include "fdb5/remote/Connection.h" +#include +#include namespace fdb5::remote { @@ -35,11 +35,15 @@ class DataWriteRequest; class ClientConnection : protected Connection { public: // methods + ~ClientConnection() override; - virtual ~ClientConnection(); + std::future controlWrite(const Client& client, + Message msg, + uint32_t requestID, + bool startDataListener, + Payload data = {}) const; - std::future controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector> data={}); - void dataWrite(Client& client, Message msg, uint32_t requestID, std::vector> data={}); + void dataWrite(Client& client, Message msg, uint32_t requestID, Payload data = {}); void add(Client& client); bool remove(uint32_t clientID); @@ -57,7 +61,7 @@ class ClientConnection : protected Connection { ClientConnection(const eckit::net::Endpoint& controlEndpoint, const std::string& defaultEndpoint); - void dataWrite(DataWriteRequest& dataWriteRequest); + void dataWrite(DataWriteRequest& request) const; // construct dictionary for protocol negotiation - to be defined in the client class eckit::LocalConfiguration availableFunctionality() const; @@ -73,8 +77,9 @@ class ClientConnection : protected Connection { void listeningDataThreadLoop(); void dataWriteThreadLoop(); - eckit::net::TCPSocket& controlSocket() override { return controlClient_; } - eckit::net::TCPSocket& dataSocket() override { return dataClient_; } + const eckit::net::TCPSocket& controlSocket() const override { return controlClient_; } + + const eckit::net::TCPSocket& dataSocket() const override { return dataClient_; } private: // members @@ -104,8 +109,9 @@ class ClientConnection : protected Connection { bool controlStopping_; bool dataStopping_; - std::mutex promisesMutex_; - std::map> promises_; + mutable std::mutex promisesMutex_; + + mutable std::map> promises_; std::mutex dataWriteMutex_; std::unique_ptr> dataWriteQueue_; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index d1951f77b..e86d4a55c 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -20,11 +20,12 @@ #include "eckit/serialisation/MemoryStream.h" #include "fdb5/LibFdb5.h" -#include "fdb5/rules/Rule.h" #include "fdb5/database/FieldLocation.h" -#include "fdb5/remote/client/RemoteStore.h" -#include "fdb5/remote/RemoteFieldLocation.h" #include "fdb5/io/FDBFileHandle.h" +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/RemoteFieldLocation.h" +#include "fdb5/remote/client/RemoteStore.h" +#include "fdb5/rules/Rule.h" #include @@ -251,7 +252,7 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length keyStream << dbKey_; keyStream << key; - std::vector> payloads; + Connection::Payload payloads; payloads.push_back(std::pair{keyBuffer, keyStream.position()}); payloads.push_back(std::pair{data, length}); diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 8eaf0c5fc..01d3c845c 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -13,33 +13,51 @@ * (Project ID: 671951) www.nextgenio.eu */ -#include -#include +#include "fdb5/remote/server/ServerConnection.h" #include "eckit/config/Resource.h" -#include "eckit/maths/Functions.h" -#include "eckit/net/Endpoint.h" -#include "eckit/runtime/Main.h" -#include "eckit/runtime/SessionID.h" -#include "eckit/serialisation/MemoryStream.h" -#include "eckit/log/Log.h" - #include "fdb5/LibFdb5.h" -#include "fdb5/fdb5_version.h" #include "fdb5/api/helpers/FDBToolRequest.h" -#include "fdb5/database/Key.h" -#include "fdb5/remote/server/AvailablePortList.h" +#include "fdb5/fdb5_version.h" +#include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" -#include "fdb5/remote/RemoteFieldLocation.h" -#include "fdb5/api/FDB.h" +#include "fdb5/remote/server/AvailablePortList.h" -#include "fdb5/remote/server/ServerConnection.h" +#include "eckit/config/LocalConfiguration.h" +#include "eckit/exception/Exceptions.h" +#include "eckit/log/CodeLocation.h" +#include "eckit/log/Log.h" +#include "eckit/net/Endpoint.h" +#include "eckit/net/TCPServer.h" +#include "eckit/net/TCPSocket.h" +#include "eckit/runtime/SessionID.h" +#include "eckit/serialisation/MemoryStream.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- // helpers + namespace { +constexpr const auto defaultRetrieveQueueSize = 10000; +constexpr const auto defaultArchiveQueueSize = 320; + std::vector intersection(const eckit::LocalConfiguration& c1, const eckit::LocalConfiguration& c2, const std::string& field){ std::vector v1 = c1.getIntVector(field); @@ -57,13 +75,14 @@ std::vector intersection(const eckit::LocalConfiguration& c1, const eckit:: } // namespace -ServerConnection::ServerConnection(eckit::net::TCPSocket& socket, const Config& config) : - Connection(), config_(config), - dataListenHostname_(config.getString("dataListenHostname", "")), - readLocationQueue_(eckit::Resource("fdbRetrieveQueueSize", 10000)), - archiveQueue_(eckit::Resource("fdbServerMaxQueueSize", 320)), - controlSocket_(socket), numControlConnection_(0), numDataConnection_(0), - dataSocket_(nullptr), dataListener_(0) { +//---------------------------------------------------------------------------------------------------------------------- + +ServerConnection::ServerConnection(eckit::net::TCPSocket& socket, const Config& config) + : config_(config), + dataListenHostname_(config.getString("dataListenHostname", "")), + readLocationQueue_(eckit::Resource("fdbRetrieveQueueSize", defaultRetrieveQueueSize)), + archiveQueue_(eckit::Resource("fdbServerMaxQueueSize", defaultArchiveQueueSize)), + controlSocket_(socket) { LOG_DEBUG_LIB(LibFdb5) << "ServerConnection::ServerConnection initialized" << std::endl; } @@ -75,10 +94,22 @@ ServerConnection::~ServerConnection() { if (archiveFuture_.valid()) { archiveFuture_.wait(); } - + eckit::Log::info() << "Done" << std::endl; } +// //---------------------------------------------------------------------------------------------------------------------- +// +// uint32_t ServerConnection::writeSocketControl(const void* buf, const uint32_t length) { +// return controlSocket_.write(buf, length); +// } +// +// uint32_t ServerConnection::writeSocketData(const void* buf, const uint32_t length) { +// ASSERT(dataSocket_); +// return dataSocket_->write(buf, length); +// } + +//---------------------------------------------------------------------------------------------------------------------- Handled ServerConnection::handleData(Message message, uint32_t clientID, uint32_t requestID) { try { @@ -187,7 +218,6 @@ void ServerConnection::initialiseConnections() { agreedConf_ = eckit::LocalConfiguration(); bool compatibleProtocol = true; - std::vector rflCommon = intersection(clientAvailableFunctionality, serverConf, "RemoteFieldLocation"); if (rflCommon.size() > 0) { LOG_DEBUG_LIB(LibFdb5) << "Protocol negotiation - RemoteFieldLocation version " << rflCommon.back() << std::endl; @@ -291,7 +321,7 @@ void ServerConnection::initialiseConnections() { std::stringstream ss; ss << "Session IDs do not match: " << serverSession << " != " << sessionID_; throw eckit::BadValue(ss.str(), Here()); - } + } } if (!errorMsg.empty()) { @@ -360,8 +390,6 @@ size_t ServerConnection::archiveThreadLoop() { return totalArchived; } - - void ServerConnection::listeningThreadLoopData() { MessageHeader hdr; @@ -382,7 +410,7 @@ void ServerConnection::listeningThreadLoopData() { break; } else { - + Handled handled; if (payload.size() == 0) { handled = handleData(hdr.message, hdr.clientID(), hdr.requestID); @@ -396,10 +424,8 @@ void ServerConnection::listeningThreadLoopData() { case Handled::YesRemoveReadListener: { std::lock_guard lock(handlerMutex_); - dataListener_--; - if (dataListener_ == 0) { - return; - } + numDataListener_--; + if (numDataListener_ == 0) { return; } break; } case Handled::Replied: // nothing to do @@ -428,7 +454,7 @@ void ServerConnection::listeningThreadLoopData() { void ServerConnection::handle() { initialiseConnections(); - + std::thread listeningThreadData; MessageHeader hdr; @@ -469,7 +495,6 @@ void ServerConnection::handle() { handled = handleData(hdr.message, hdr.clientID(), hdr.requestID); } } - switch (handled) { @@ -479,8 +504,8 @@ void ServerConnection::handle() { case Handled::YesAddReadListener: { std::lock_guard lock(handlerMutex_); - dataListener_++; - if (dataListener_ == 1 && !single_) { + numDataListener_++; + if (numDataListener_ == 1 && !single_) { listeningThreadData = std::thread([this] { listeningThreadLoopData(); }); } } @@ -540,7 +565,6 @@ void ServerConnection::tidyWorkers() { } } - void ServerConnection::archiver() { // Ensure that we aren't already running a catalogue/store diff --git a/src/fdb5/remote/server/ServerConnection.h b/src/fdb5/remote/server/ServerConnection.h index 66ef81f56..6a4ba27e6 100644 --- a/src/fdb5/remote/server/ServerConnection.h +++ b/src/fdb5/remote/server/ServerConnection.h @@ -19,8 +19,8 @@ #include #include -#include +#include "eckit/container/Queue.h" #include "eckit/io/Buffer.h" #include "eckit/io/DataHandle.h" #include "eckit/net/TCPServer.h" @@ -66,7 +66,7 @@ struct readLocationElem { std::unique_ptr readLocation; readLocationElem() : clientID(0), requestID(0), readLocation(nullptr) {} - + readLocationElem(uint32_t clientID, uint32_t requestID, std::unique_ptr readLocation) : clientID(clientID), requestID(requestID), readLocation(std::move(readLocation)) {} }; @@ -105,14 +105,15 @@ class ServerConnection : public Connection, public Handler { // socket methods int selectDataPort(); eckit::LocalConfiguration availableFunctionality() const; - + // Worker functionality void tidyWorkers(); void waitForWorkers(); // archival thread size_t archiveThreadLoop(); - virtual void archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) = 0; + + virtual void archiveBlob(uint32_t clientID, uint32_t requestID, const void* data, size_t length) = 0; // archival helper methods void archiver(); @@ -124,8 +125,9 @@ class ServerConnection : public Connection, public Handler { void listeningThreadLoopData(); - eckit::net::TCPSocket& controlSocket() override { return controlSocket_; } - eckit::net::TCPSocket& dataSocket() override { + const eckit::net::TCPSocket& controlSocket() const override { return controlSocket_; } + + const eckit::net::TCPSocket& dataSocket() const override { ASSERT(dataSocket_); return *dataSocket_; } @@ -143,7 +145,7 @@ class ServerConnection : public Connection, public Handler { eckit::LocalConfiguration agreedConf_; std::mutex readLocationMutex_; std::thread readLocationWorker_; - + std::map> workerThreads_; eckit::Queue archiveQueue_; std::future archiveFuture_; @@ -151,8 +153,8 @@ class ServerConnection : public Connection, public Handler { eckit::net::TCPSocket controlSocket_; std::mutex handlerMutex_; - size_t numControlConnection_; - size_t numDataConnection_; + size_t numControlConnection_ {0}; + size_t numDataConnection_ {0}; private: @@ -160,7 +162,8 @@ class ServerConnection : public Connection, public Handler { // data connection std::unique_ptr dataSocket_; - size_t dataListener_; + + size_t numDataListener_ {0}; }; //---------------------------------------------------------------------------------------------------------------------- From e28bde68dcf478b993b5e214165ab5fab66f8eff Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Tue, 14 Jan 2025 20:03:02 +0100 Subject: [PATCH 02/10] chore(remoteFDB): removed commented code --- src/fdb5/remote/server/ServerConnection.cc | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 01d3c845c..1a6d540a3 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -98,17 +98,6 @@ ServerConnection::~ServerConnection() { eckit::Log::info() << "Done" << std::endl; } -// //---------------------------------------------------------------------------------------------------------------------- -// -// uint32_t ServerConnection::writeSocketControl(const void* buf, const uint32_t length) { -// return controlSocket_.write(buf, length); -// } -// -// uint32_t ServerConnection::writeSocketData(const void* buf, const uint32_t length) { -// ASSERT(dataSocket_); -// return dataSocket_->write(buf, length); -// } - //---------------------------------------------------------------------------------------------------------------------- Handled ServerConnection::handleData(Message message, uint32_t clientID, uint32_t requestID) { From 59f6bc917c2e2a06ae159faefb4e94ad16a0e0ca Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Wed, 15 Jan 2025 15:51:08 +0100 Subject: [PATCH 03/10] fix(remoteFDB): Message class type payload and other improvements --- src/fdb5/remote/Connection.cc | 49 +++++++++--------- src/fdb5/remote/Connection.h | 21 ++++---- src/fdb5/remote/Messages.cc | 4 +- src/fdb5/remote/Messages.h | 59 ++++++++++++---------- src/fdb5/remote/client/Client.cc | 36 +++++++------ src/fdb5/remote/client/Client.h | 8 ++- src/fdb5/remote/client/ClientConnection.cc | 32 ++++++------ src/fdb5/remote/client/ClientConnection.h | 6 +-- src/fdb5/remote/client/RemoteCatalogue.cc | 43 ++++++++++++---- src/fdb5/remote/client/RemoteCatalogue.h | 16 ++++-- src/fdb5/remote/client/RemoteStore.cc | 50 ++++++++++++------ src/fdb5/remote/server/CatalogueHandler.cc | 19 ++++--- src/fdb5/remote/server/CatalogueHandler.h | 7 ++- src/fdb5/remote/server/ServerConnection.cc | 10 ++-- src/fdb5/remote/server/StoreHandler.cc | 10 ++-- 15 files changed, 222 insertions(+), 148 deletions(-) diff --git a/src/fdb5/remote/Connection.cc b/src/fdb5/remote/Connection.cc index bfacf9e34..796b423e7 100644 --- a/src/fdb5/remote/Connection.cc +++ b/src/fdb5/remote/Connection.cc @@ -4,6 +4,9 @@ #include "fdb5/LibFdb5.h" #include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" +#include +#include +#include namespace fdb5::remote { @@ -32,7 +35,7 @@ void Connection::teardown() { //---------------------------------------------------------------------------------------------------------------------- -void Connection::writeUnsafe(const bool control, const void* data, const size_t length) const { +void Connection::writeUnsafe(const bool control, const void* const data, const size_t length) const { long written = 0; if (control || single_) { written = controlSocket().write(data, length); @@ -70,14 +73,14 @@ void Connection::readUnsafe(bool control, void* data, size_t length) const { } } -eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { +eckit::Buffer Connection::read(const bool control, MessageHeader& hdr) const { eckit::FixedString<4> tail; std::lock_guard lock((control || single_) ? readControlMutex_ : readDataMutex_); readUnsafe(control, &hdr, sizeof(hdr)); - ASSERT(hdr.marker == StartMarker); - ASSERT(hdr.version == CurrentVersion); + ASSERT(hdr.marker == MessageHeader::StartMarker); + ASSERT(hdr.version == MessageHeader::currentVersion); ASSERT(single_ || hdr.control() == control); eckit::Buffer payload{hdr.payloadSize}; @@ -86,7 +89,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { } // Ensure we have consumed exactly the correct amount from the socket. readUnsafe(control, &tail, sizeof(tail)); - ASSERT(tail == EndMarker); + ASSERT(tail == MessageHeader::EndMarker); if (hdr.message == Message::Error) { @@ -99,38 +102,36 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) const { return payload; } -void Connection::write(Message msg, const bool control, const uint32_t clientID, const uint32_t requestID, Payload data) const { +void Connection::write(const Message msg, + const bool control, + const uint32_t clientID, + const uint32_t requestID, + const PayloadList payloads) const { uint32_t payloadLength = 0; - for (auto d: data) { - ASSERT(d.first); - payloadLength += d.second; + for (const auto& payload : payloads) { + ASSERT(payload.data); + payloadLength += payload.length; } MessageHeader message{msg, control, clientID, requestID, payloadLength}; - LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl; + LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() + << ",control=" << control << ",requestID=" << requestID << ",payloadsSize=" << payloads.size() + << ",payloadLength=" << payloadLength << "]" << std::endl; std::lock_guard lock((control || single_) ? controlMutex_ : dataMutex_); + writeUnsafe(control, &message, sizeof(message)); - for (auto d: data) { - writeUnsafe(control, d.first, d.second); - } - writeUnsafe(control, &EndMarker, sizeof(EndMarker)); -} -void Connection::write(Message msg, - const bool control, - const uint32_t clientID, - const uint32_t requestID, - const void* data, - const uint32_t length) const { - write(msg, control, clientID, requestID, {{data, length}}); + for (const auto& payload : payloads) { writeUnsafe(control, payload.data, payload.length); } + + writeUnsafe(control, &MessageHeader::EndMarker, MessageHeader::markerBytes); } -void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) const { +void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const { eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl; - write(Message::Error, false, clientID, requestID, {{msg.c_str(), msg.length()}}); + write(Message::Error, false, clientID, requestID, msg.data(), msg.length()); } eckit::Buffer Connection::readControl(MessageHeader& hdr) const { diff --git a/src/fdb5/remote/Connection.h b/src/fdb5/remote/Connection.h index 10393cc0f..5dad425c0 100644 --- a/src/fdb5/remote/Connection.h +++ b/src/fdb5/remote/Connection.h @@ -10,16 +10,18 @@ #pragma once +#include "eckit/serialisation/MemoryStream.h" #include "fdb5/remote/Messages.h" #include "eckit/exception/Exceptions.h" #include "eckit/net/TCPSocket.h" #include "eckit/os/BackTrace.h" +#include #include #include #include -#include +#include #include namespace eckit { @@ -47,18 +49,20 @@ class TCPException : public eckit::Exception { class Connection : eckit::NonCopyable { public: // types - using Payload = std::vector>; + using PayloadList = std::vector; public: // methods Connection(); virtual ~Connection() = default; - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, Payload data = {}) const; + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, PayloadList payloads = {}) const; - void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const; + void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const { + write(msg, control, clientID, requestID, {{length, data}}); + } - void error(const std::string& msg, uint32_t clientID, uint32_t requestID) const; + void error(std::string_view msg, uint32_t clientID, uint32_t requestID) const; eckit::Buffer readControl(MessageHeader& hdr) const; @@ -66,7 +70,7 @@ class Connection : eckit::NonCopyable { void teardown(); -private: // methods +private: // methods eckit::Buffer read(bool control, MessageHeader& hdr) const; void writeUnsafe(bool control, const void* data, size_t length) const; @@ -77,11 +81,10 @@ class Connection : eckit::NonCopyable { virtual const eckit::net::TCPSocket& dataSocket() const = 0; -protected: // members - +protected: // members bool single_; -private: // members +private: // members mutable std::mutex controlMutex_; mutable std::mutex dataMutex_; mutable std::mutex readControlMutex_; diff --git a/src/fdb5/remote/Messages.cc b/src/fdb5/remote/Messages.cc index 5e96fdab6..04e316312 100644 --- a/src/fdb5/remote/Messages.cc +++ b/src/fdb5/remote/Messages.cc @@ -15,8 +15,6 @@ #include "fdb5/remote/Messages.h" -using namespace eckit; - namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -62,7 +60,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) { MessageHeader::MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize) : marker(StartMarker), - version(CurrentVersion), + version(currentVersion), message(message), clientID_((clientID<<1) + (control ? 1 : 0)), requestID(requestID), diff --git a/src/fdb5/remote/Messages.h b/src/fdb5/remote/Messages.h index ce203b39b..6f3a2ee1d 100644 --- a/src/fdb5/remote/Messages.h +++ b/src/fdb5/remote/Messages.h @@ -18,8 +18,9 @@ #pragma once -#include #include +#include +#include #include "eckit/types/FixedString.h" @@ -31,10 +32,12 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -const static eckit::FixedString<4> StartMarker {"SFDB"}; -const static eckit::FixedString<4> EndMarker {"EFDB"}; +struct Payload { + Payload(std::size_t length, const void* data) : length {length}, data {data} { } -constexpr uint16_t CurrentVersion = 12; + std::size_t length {0}; + const void* data {nullptr}; +}; enum class Message : uint16_t { @@ -76,21 +79,31 @@ enum class Message : uint16_t { std::ostream& operator<<(std::ostream& s, const Message& m); +//---------------------------------------------------------------------------------------------------------------------- + // Header used for all messages class MessageHeader { -public: // methods +public: // types + constexpr static uint16_t currentVersion = 12; + + constexpr static const auto hashBytes = 16; + + constexpr static const auto markerBytes = 4; + + using MarkerType = eckit::FixedString; + + using HashType = eckit::FixedString; - MessageHeader() : - version(CurrentVersion), - message(Message::None), - clientID_(0), - requestID(0), - payloadSize(0) {} + inline static const MarkerType StartMarker {"SFDB"}; + inline static const MarkerType EndMarker {"EFDB"}; + +public: // methods + MessageHeader() = default; MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize); - + bool control() const { return ((clientID_ & 0x00000001) == 1); } @@ -99,21 +112,13 @@ class MessageHeader { } public: - - eckit::FixedString<4> marker; // 4 bytes --> 4 - - uint16_t version; // 2 bytes --> 6 - - Message message; // 2 bytes --> 8 - - uint32_t clientID_; // 4 bytes --> 12 - - uint32_t requestID; // 4 bytes --> 16 - - uint32_t payloadSize; // 4 bytes --> 20 - - eckit::FixedString<16> hash; // 16 bytes --> 36 - + MarkerType marker; // 4 bytes --> 4 + uint16_t version {currentVersion}; // 2 bytes --> 6 + Message message {Message::None}; // 2 bytes --> 8 + uint32_t clientID_ {0}; // 4 bytes --> 12 + uint32_t requestID {0}; // 4 bytes --> 16 + uint32_t payloadSize {0}; // 4 bytes --> 20 + HashType hash; // 16 bytes --> 36 }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 123d7ccfa..44257edb9 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -55,45 +55,43 @@ Client::~Client() { connection_.remove(id_); } -void Client::controlWriteCheckResponse(Message msg, - uint32_t requestID, - bool dataListener, - const void* payload, - uint32_t payloadLength) const { +void Client::controlWriteCheckResponse(const Message msg, + const uint32_t requestID, + const bool dataListener, + const void* const payload, + const uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - Connection::Payload data; - if (payloadLength != 0) { data.push_back(std::make_pair(payload, payloadLength)); } + PayloadList payloads; + if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } - auto f = connection_.controlWrite(*this, msg, requestID, dataListener, data); + auto f = connection_.controlWrite(*this, msg, requestID, dataListener, payloads); f.wait(); ASSERT(f.get().size() == 0); } -eckit::Buffer Client::controlWriteReadResponse(Message msg, - uint32_t requestID, - const void* payload, - uint32_t payloadLength) const { +eckit::Buffer Client::controlWriteReadResponse(const Message msg, + const uint32_t requestID, + const void* const payload, + const uint32_t payloadLength) const { ASSERT(requestID); ASSERT(!(!payloadLength ^ !payload)); std::lock_guard lock(blockingRequestMutex_); - Connection::Payload data {}; - if (payloadLength) { - data.push_back(std::make_pair(payload, payloadLength)); - } + PayloadList payloads; + if (payloadLength > 0) { payloads.emplace_back(payloadLength, payload); } - std::future f = connection_.controlWrite(*this, msg, requestID, false, data); + auto f = connection_.controlWrite(*this, msg, requestID, false, payloads); f.wait(); return eckit::Buffer{f.get()}; } -void Client::dataWrite(remote::Message msg, uint32_t requestID, Connection::Payload data) { - connection_.dataWrite(*this, msg, requestID, data); +void Client::dataWrite(remote::Message msg, uint32_t requestID, PayloadList payloads) { + connection_.dataWrite(*this, msg, requestID, std::move(payloads)); } } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 4e1bc4e6f..360daa72b 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -17,6 +17,10 @@ #include "fdb5/remote/Messages.h" #include "fdb5/remote/client/ClientConnection.h" +#include +#include // std::pair +#include + namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- @@ -30,6 +34,8 @@ class RemoteFDBException : public eckit::RemoteException { //---------------------------------------------------------------------------------------------------------------------- class Client : eckit::NonCopyable { + using PayloadList = Connection::PayloadList; + public: Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); @@ -59,7 +65,7 @@ class Client : eckit::NonCopyable { const void* payload = nullptr, uint32_t payloadLength = 0) const; - void dataWrite(Message msg, uint32_t requestID, Connection::Payload data = {}); + void dataWrite(Message msg, uint32_t requestID, PayloadList payloads = {}); // handlers for incoming messages - to be defined in the client class virtual bool handle(Message message, uint32_t requestID) = 0; diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 689444edb..1a0fe06d9 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -175,17 +176,17 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const { //---------------------------------------------------------------------------------------------------------------------- std::future ClientConnection::controlWrite(const Client& client, - Message msg, + const Message msg, const uint32_t requestID, const bool /*dataListener*/, - Payload data) const { + const PayloadList payloads) const { std::future f; { std::lock_guard lock(promisesMutex_); auto pp = promises_.emplace(requestID, std::promise{}).first; f = pp->second.get_future(); } - Connection::write(msg, true, client.clientId(), requestID, data); + Connection::write(msg, true, client.clientId(), requestID, payloads); return f; } @@ -195,42 +196,43 @@ void ClientConnection::dataWrite(DataWriteRequest& request) const { request.data_.size()); } -void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, Payload data) { +void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t requestID, PayloadList payloads) { static size_t maxQueueLength = eckit::Resource("fdbDataWriteQueueLength;$FDB_DATA_WRITE_QUEUE_LENGTH", 320); + { // retrieve or add client to the list - std::lock_guard lock(clientsMutex_); - auto it = clients_.find(client.clientId()); - ASSERT(it != clients_.end()); + std::lock_guard lock(clientsMutex_); + ASSERT(clients_.find(client.clientId()) != clients_.end()); } + { std::lock_guard lock(dataWriteMutex_); if (!dataWriteThread_.joinable()) { // Reset the queue after previous done/errors ASSERT(!dataWriteQueue_); - dataWriteQueue_.reset(new eckit::Queue{maxQueueLength}); + dataWriteQueue_ = std::make_unique>(maxQueueLength); dataWriteThread_ = std::thread([this] { dataWriteThreadLoop(); }); } } + uint32_t payloadLength = 0; - for (auto d: data) { - ASSERT(d.first); - payloadLength += d.second; + for (auto payload : payloads) { + ASSERT(payload.data); + payloadLength += payload.length; } eckit::Buffer buffer{payloadLength}; uint32_t offset = 0; - for (auto d: data) { - buffer.copy(d.first, d.second, offset); - offset += d.second; + for (auto payload : payloads) { + buffer.copy(payload.data, payload.length, offset); + offset += payload.length; } dataWriteQueue_->emplace(&client, msg, requestID, std::move(buffer)); } - void ClientConnection::dataWriteThreadLoop() { eckit::Timer timer; diff --git a/src/fdb5/remote/client/ClientConnection.h b/src/fdb5/remote/client/ClientConnection.h index 9a5bb870f..d2e362fc6 100644 --- a/src/fdb5/remote/client/ClientConnection.h +++ b/src/fdb5/remote/client/ClientConnection.h @@ -40,10 +40,10 @@ class ClientConnection : protected Connection { std::future controlWrite(const Client& client, Message msg, uint32_t requestID, - bool startDataListener, - Payload data = {}) const; + bool /*dataListener*/, + PayloadList payload = {}) const; - void dataWrite(Client& client, Message msg, uint32_t requestID, Payload data = {}); + void dataWrite(Client& client, Message msg, uint32_t requestID, PayloadList payloads = {}); void add(Client& client); bool remove(uint32_t clientID); diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index e0a93d854..0b813102b 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -8,18 +8,35 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" -#include "eckit/log/Log.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/client/RemoteCatalogue.h" #include "fdb5/LibFdb5.h" -#include "fdb5/remote/client/RemoteCatalogue.h" +#include "fdb5/database/Key.h" +#include "fdb5/remote/Messages.h" + +#include "eckit/filesystem/URI.h" +#include "eckit/log/Log.h" +#include "eckit/serialisation/MemoryStream.h" -#include +#include +#include +#include using namespace eckit; + namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- + +namespace { + +constexpr size_t archivePayloadSize = 8192; +constexpr size_t keyPayloadSize = 4096; + +} + +//---------------------------------------------------------------------------------------------------------------------- + RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config): CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here... Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), @@ -49,14 +66,14 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(8192); + Buffer buffer(archivePayloadSize); MemoryStream stream(buffer); stream << idxKey; stream << datumKey; stream << *fieldLocation; - std::vector> payloads; - payloads.push_back(std::pair{buffer, stream.position()}); + std::vector payloads; + payloads.emplace_back(stream.position(), buffer.data()); dataWrite(Message::Blob, id, payloads); @@ -124,7 +141,7 @@ void RemoteCatalogue::loadSchema() { LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; // send dbkey to remote. - eckit::Buffer keyBuffer(4096); + eckit::Buffer keyBuffer(keyPayloadSize); eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; @@ -139,6 +156,7 @@ bool RemoteCatalogue::handle(Message message, uint32_t requestID) { Log::warning() << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << "]" << std::endl; return false; } + bool RemoteCatalogue::handle(Message message, uint32_t requestID, eckit::Buffer&& payload) { LOG_DEBUG_LIB(LibFdb5) << *this << " - Received [message=" << ((uint) message) << ",requestID=" << requestID << ",payloadSize=" << payload.size() << "]" << std::endl; return false; @@ -162,14 +180,19 @@ void RemoteCatalogue::print( std::ostream &out ) const { out << "RemoteCatalogue(endpoint=" << controlEndpoint() << ",clientID=" << clientId() << ")"; } - std::string RemoteCatalogue::type() const { return "remote"; } + bool RemoteCatalogue::open() { return true; } +//---------------------------------------------------------------------------------------------------------------------- + static CatalogueReaderBuilder reader("remote"); static CatalogueWriterBuilder writer("remote"); + +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index 1d10b9716..29631dab5 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -1,12 +1,22 @@ #pragma once -#include "fdb5/api/FDBStats.h" +#include "fdb5/api/helpers/ControlIterator.h" +#include "fdb5/config/Config.h" #include "fdb5/database/Catalogue.h" +#include "fdb5/database/DbStats.h" #include "fdb5/database/Index.h" #include "fdb5/database/Store.h" #include "fdb5/remote/client/Client.h" +#include "eckit/filesystem/URI.h" + +#include +#include +#include +#include +#include + namespace fdb5::remote { // class RemoteCatalogueArchiver; @@ -29,8 +39,8 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C void reconsolidate() override; //From CatalogueReader - DbStats stats() const override { return DbStats(); } - bool retrieve(const Key& key, Field& field) const override { return false; } + DbStats stats() const override { return {}; } + bool retrieve(const Key& /*key*/, Field& /*field*/) const override { return false; } // From Catalogue bool selectIndex(const Key& idxKey) override; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index e86d4a55c..4f2fcec2d 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -8,26 +8,44 @@ * does it submit to any jurisdiction. */ -#include -#include - -#include "eckit/log/Timer.h" - -#include "eckit/config/Resource.h" -#include "eckit/io/AIOHandle.h" -#include "eckit/io/EmptyHandle.h" -#include "eckit/runtime/Main.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/client/RemoteStore.h" #include "fdb5/LibFdb5.h" +#include "fdb5/database/Field.h" #include "fdb5/database/FieldLocation.h" -#include "fdb5/io/FDBFileHandle.h" +#include "fdb5/database/Store.h" #include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" #include "fdb5/remote/RemoteFieldLocation.h" -#include "fdb5/remote/client/RemoteStore.h" +#include "fdb5/remote/client/Client.h" #include "fdb5/rules/Rule.h" -#include +#include "eckit/exception/Exceptions.h" +#include "eckit/filesystem/URI.h" +#include "eckit/io/Length.h" +#include "eckit/io/Offset.h" +#include "eckit/log/Log.h" +#include "eckit/net/Endpoint.h" +#include "eckit/runtime/Main.h" +#include "eckit/serialisation/MemoryStream.h" +#include "eckit/serialisation/Reanimator.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include using namespace eckit; @@ -252,9 +270,9 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length keyStream << dbKey_; keyStream << key; - Connection::Payload payloads; - payloads.push_back(std::pair{keyBuffer, keyStream.position()}); - payloads.push_back(std::pair{data, length}); + std::vector payloads; + payloads.emplace_back(keyStream.position(), keyBuffer.data()); + payloads.emplace_back(length, data); dataWrite(Message::Blob, id, payloads); diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 22cccf76b..9a3dc8f80 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -8,16 +8,22 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" +#include "fdb5/remote/server/CatalogueHandler.h" +#include "fdb5/LibFdb5.h" +#include "fdb5/api/helpers/FDBToolRequest.h" +#include "fdb5/remote/Connection.h" +#include "fdb5/remote/Messages.h" +#include "fdb5/remote/server/ServerConnection.h" + #include "eckit/net/NetMask.h" +#include "eckit/net/TCPSocket.h" #include "eckit/serialisation/MemoryStream.h" -#include "fdb5/LibFdb5.h" -#include "fdb5/api/helpers/FDBToolRequest.h" -#include "fdb5/remote/server/CatalogueHandler.h" +#include +#include +#include using namespace eckit; -using metkit::mars::MarsRequest; namespace fdb5::remote { @@ -161,6 +167,7 @@ struct BaseHelper { struct ListHelper : public BaseHelper { ListIterator apiCall(FDB& fdb, const FDBToolRequest& request) const { + /// @todo remember to add level_ to this helper return fdb.list(request); } }; @@ -226,7 +233,7 @@ void CatalogueHandler::forwardApiCall(uint32_t clientID, uint32_t requestID, eck typename decltype(iterator)::value_type elem; while (iterator.next(elem)) { auto encoded(helper.encode(elem, *this)); - write(Message::Blob, false, clientID, requestID, std::vector>{{encoded.buf, encoded.position}}); + write(Message::Blob, false, clientID, requestID, encoded.buf, encoded.position); } write(Message::Complete, false, clientID, requestID); } diff --git a/src/fdb5/remote/server/CatalogueHandler.h b/src/fdb5/remote/server/CatalogueHandler.h index f63055b3a..33fdf825e 100644 --- a/src/fdb5/remote/server/CatalogueHandler.h +++ b/src/fdb5/remote/server/CatalogueHandler.h @@ -10,8 +10,11 @@ #pragma once -#include "fdb5/remote/server/ServerConnection.h" #include "fdb5/api/FDB.h" +#include "fdb5/database/Catalogue.h" +#include "fdb5/remote/server/ServerConnection.h" + +#include namespace fdb5::remote { @@ -24,7 +27,7 @@ struct CatalogueArchiver { bool controlConnection; bool dataConnection; - + std::unique_ptr catalogue; size_t locationsExpected; size_t locationsArchived; diff --git a/src/fdb5/remote/server/ServerConnection.cc b/src/fdb5/remote/server/ServerConnection.cc index 1a6d540a3..85d24e25b 100644 --- a/src/fdb5/remote/server/ServerConnection.cc +++ b/src/fdb5/remote/server/ServerConnection.cc @@ -278,7 +278,7 @@ void ServerConnection::initialiseConnections() { LOG_DEBUG_LIB(LibFdb5) << "Protocol negotiation - configuration: " << agreedConf_ <>{{startupBuffer.data(), s.position()}}); + write(Message::Startup, true, 0, 0, startupBuffer.data(), s.position()); if (!single_) { ASSERT(dataSocketFuture.valid()); @@ -292,7 +292,7 @@ void ServerConnection::initialiseConnections() { MessageHeader dataHdr; eckit::Buffer payload2 = readData(dataHdr); - ASSERT(dataHdr.version == CurrentVersion); + ASSERT(dataHdr.version == MessageHeader::currentVersion); ASSERT(dataHdr.message == Message::Startup); ASSERT(dataHdr.requestID == 0); @@ -355,9 +355,9 @@ size_t ServerConnection::archiveThreadLoop() { const void* payloadData = charData; charData += hdr->payloadSize; - const decltype(EndMarker)* e = static_cast(static_cast(charData)); - ASSERT(*e == EndMarker); - charData += sizeof(EndMarker); + const auto* e = static_cast(static_cast(charData)); + ASSERT(*e == MessageHeader::EndMarker); + charData += MessageHeader::markerBytes; archiveBlob(elem.clientID_, elem.requestID_, payloadData, hdr->payloadSize); totalArchived += 1; diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 39a9cbbec..1baa108b0 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -34,7 +34,7 @@ Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t case Message::Store: // notification that the client is starting to send data for archival archiver(); return Handled::YesAddArchiveListener; - + default: { std::stringstream ss; ss << "ERROR: Unexpected message recieved (" << message << "). ABORTING"; @@ -132,7 +132,7 @@ void StoreHandler::writeToParent(const uint32_t clientID, const uint32_t request << std::endl; while ((dataRead = dh->read(writeBuffer, writeBuffer.size())) != 0) { - write(Message::Blob, false, clientID, requestID, std::vector>{{writeBuffer, dataRead}}); + write(Message::Blob, false, clientID, requestID, writeBuffer, dataRead); } // And when we are done, add a complete message. @@ -157,7 +157,7 @@ void StoreHandler::writeToParent(const uint32_t clientID, const uint32_t request void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) { - + MemoryStream s(data, length); fdb5::Key dbKey(s); @@ -168,7 +168,7 @@ void StoreHandler::archiveBlob(const uint32_t clientID, const uint32_t requestID const char* charData = static_cast(data); // To allow pointer arithmetic Log::status() << "Archiving data: " << ss_key.str() << std::endl; - + Store& ss = store(clientID, dbKey); std::shared_ptr location = ss.archive(idxKey, charData + s.position(), length - s.position()); @@ -217,7 +217,7 @@ void StoreHandler::flush(uint32_t clientID, uint32_t requestID, const eckit::Buf } bool StoreHandler::remove(bool control, uint32_t clientID) { - + std::lock_guard lock(handlerMutex_); auto it = stores_.find(clientID); if (it != stores_.end()) { From 1c573a650d59109b372271903794e31b2a277f33 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 16 Jan 2025 11:24:18 +0100 Subject: [PATCH 04/10] fix(remoteFDB): cleanup remote/client --- src/fdb5/remote/client/Client.cc | 2 +- src/fdb5/remote/client/Client.h | 3 ++- src/fdb5/remote/client/RemoteCatalogue.cc | 19 +++++++------------ src/fdb5/remote/client/RemoteCatalogue.h | 12 +++++++----- src/fdb5/remote/client/RemoteStore.cc | 2 +- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/fdb5/remote/client/Client.cc b/src/fdb5/remote/client/Client.cc index 44257edb9..728f8ae9d 100644 --- a/src/fdb5/remote/client/Client.cc +++ b/src/fdb5/remote/client/Client.cc @@ -90,7 +90,7 @@ eckit::Buffer Client::controlWriteReadResponse(const Message msg, return eckit::Buffer{f.get()}; } -void Client::dataWrite(remote::Message msg, uint32_t requestID, PayloadList payloads) { +void Client::dataWrite(Message msg, uint32_t requestID, PayloadList payloads) { connection_.dataWrite(*this, msg, requestID, std::move(payloads)); } diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 360daa72b..72594a405 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -34,9 +34,10 @@ class RemoteFDBException : public eckit::RemoteException { //---------------------------------------------------------------------------------------------------------------------- class Client : eckit::NonCopyable { +public: // types using PayloadList = Connection::PayloadList; -public: +public: // methods Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); Client(const std::vector>& endpoints); diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index 0b813102b..fcde73a82 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -30,9 +30,8 @@ namespace fdb5::remote { namespace { -constexpr size_t archivePayloadSize = 8192; -constexpr size_t keyPayloadSize = 4096; - +constexpr size_t archiveBufferSize = 8192; +constexpr size_t keyBufferSize = 4096; } //---------------------------------------------------------------------------------------------------------------------- @@ -66,13 +65,13 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(archivePayloadSize); + Buffer buffer(archiveBufferSize); MemoryStream stream(buffer); stream << idxKey; stream << datumKey; stream << *fieldLocation; - std::vector payloads; + PayloadList payloads; payloads.emplace_back(stream.position(), buffer.data()); dataWrite(Message::Blob, id, payloads); @@ -141,7 +140,7 @@ void RemoteCatalogue::loadSchema() { LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; // send dbkey to remote. - eckit::Buffer keyBuffer(keyPayloadSize); + eckit::Buffer keyBuffer(keyBufferSize); eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; @@ -180,18 +179,14 @@ void RemoteCatalogue::print( std::ostream &out ) const { out << "RemoteCatalogue(endpoint=" << controlEndpoint() << ",clientID=" << clientId() << ")"; } -std::string RemoteCatalogue::type() const { - return "remote"; -} - bool RemoteCatalogue::open() { return true; } //---------------------------------------------------------------------------------------------------------------------- -static CatalogueReaderBuilder reader("remote"); -static CatalogueWriterBuilder writer("remote"); +static CatalogueReaderBuilder reader(RemoteCatalogue::typeName()); +static CatalogueWriterBuilder writer(RemoteCatalogue::typeName()); //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index 29631dab5..b51009780 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -24,13 +24,13 @@ namespace fdb5::remote { class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public CatalogueImpl, public Client { -public: +public: // types + static const char* typeName() { return "remote"; } +public: // methods RemoteCatalogue(const Key& key, const Config& config); RemoteCatalogue(const eckit::URI& uri, const Config& config); - ~RemoteCatalogue() override = default; - // From CatalogueWriter const Index& currentIndex() override; void archive(const Key& idxKey, const Key& datumKey, std::shared_ptr fieldLocation) override; @@ -59,8 +59,10 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C std::vector indexes(bool sorted=false) const override; void maskIndexEntry(const Index& index) const override; void allMasked(std::set>& metadata, std::set& data) const override; - void print( std::ostream &out ) const override; - std::string type() const override; + void print(std::ostream& out) const override; + + std::string type() const override { return typeName(); } + bool open() override; void flush(size_t archivedFields) override; void clean() override; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 4f2fcec2d..524a90337 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -270,7 +270,7 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length keyStream << dbKey_; keyStream << key; - std::vector payloads; + PayloadList payloads; payloads.emplace_back(keyStream.position(), keyBuffer.data()); payloads.emplace_back(length, data); From 428b5549eb4d8674d11da207d1a6dded1a06e839 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 16 Jan 2025 11:52:25 +0100 Subject: [PATCH 05/10] feat(remoteFDB): add client side catalogue exists client-server comm that checks server catalogue exists --- src/fdb5/remote/Messages.cc | 1 + src/fdb5/remote/Messages.h | 1 + src/fdb5/remote/client/RemoteCatalogue.cc | 20 ++++++++++++++++++-- src/fdb5/remote/server/CatalogueHandler.cc | 19 +++++++++++++++++++ src/fdb5/remote/server/CatalogueHandler.h | 1 + 5 files changed, 40 insertions(+), 2 deletions(-) diff --git a/src/fdb5/remote/Messages.cc b/src/fdb5/remote/Messages.cc index 04e316312..638154b2c 100644 --- a/src/fdb5/remote/Messages.cc +++ b/src/fdb5/remote/Messages.cc @@ -45,6 +45,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) { case Message::Move: s << "Move"; break; case Message::Store: s << "Store"; break; case Message::Axes: s << "Axes"; break; + case Message::Exists: s << "Exists"; break; // Responses case Message::Received: s << "Received"; break; diff --git a/src/fdb5/remote/Messages.h b/src/fdb5/remote/Messages.h index 6f3a2ee1d..e8efc8271 100644 --- a/src/fdb5/remote/Messages.h +++ b/src/fdb5/remote/Messages.h @@ -66,6 +66,7 @@ enum class Message : uint16_t { Move, Store, Axes, + Exists, // Responses Received = 200, diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index fcde73a82..0725419eb 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -124,9 +124,25 @@ void RemoteCatalogue::clean() {NOTIMP;} void RemoteCatalogue::close() {NOTIMP;} -bool RemoteCatalogue::exists() const {NOTIMP;} +bool RemoteCatalogue::exists() const { -void RemoteCatalogue::checkUID() const {} + bool exists = false; + + Buffer sendBuf(keyBufferSize); + MemoryStream sms(sendBuf); + sms << dbKey_; + + eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + + eckit::MemoryStream rms(recvBuf); + rms >> exists; + + return exists; +} + +void RemoteCatalogue::checkUID() const { + LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::checkUID() is noop!" << std::endl; +} eckit::URI RemoteCatalogue::uri() const { return eckit::URI("fdb", controlEndpoint().host(), controlEndpoint().port()); diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 9a3dc8f80..46a504bac 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -11,6 +11,8 @@ #include "fdb5/remote/server/CatalogueHandler.h" #include "fdb5/LibFdb5.h" #include "fdb5/api/helpers/FDBToolRequest.h" +#include "fdb5/database/Catalogue.h" +#include "fdb5/database/Key.h" #include "fdb5/remote/Connection.h" #include "fdb5/remote/Messages.h" #include "fdb5/remote/server/ServerConnection.h" @@ -117,6 +119,10 @@ Handled CatalogueHandler::handleControl(Message message, uint32_t clientID, uint flush(clientID, requestID, std::move(payload)); return Handled::Yes; + case Message::Exists: // check if catalogue exists + exists(clientID, requestID, std::move(payload)); + return Handled::Replied; + default: { std::stringstream ss; ss << "ERROR: Unexpected message recieved (" << message << "). ABORTING"; @@ -348,11 +354,24 @@ void CatalogueHandler::stores(uint32_t clientID, uint32_t requestID) { } } +void CatalogueHandler::exists(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) const { + ASSERT(payload.size() > 0); + bool exists = false; + { + MemoryStream stream(payload); + const Key dbKey(stream); + exists = CatalogueReaderFactory::instance().build(dbKey, config_)->exists(); + } + eckit::Buffer existBuf(5); + eckit::MemoryStream stream(existBuf); + stream << exists; + write(Message::Received, true, clientID, requestID, existBuf.data(), stream.position()); +} void CatalogueHandler::flush(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) { diff --git a/src/fdb5/remote/server/CatalogueHandler.h b/src/fdb5/remote/server/CatalogueHandler.h index 33fdf825e..9d10888af 100644 --- a/src/fdb5/remote/server/CatalogueHandler.h +++ b/src/fdb5/remote/server/CatalogueHandler.h @@ -59,6 +59,7 @@ class CatalogueHandler : public ServerConnection { void stats(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload); void schema(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload); void stores(uint32_t clientID, uint32_t requestID); + void exists(uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) const; void archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) override; From fbbbd3c83306240d2c02542e499f156f8316b386 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 16 Jan 2025 12:59:57 +0100 Subject: [PATCH 06/10] feat(remoteFDB): initial fixup before RemoteStore::exists formatting, unnamed namespace, typename, use make_unique, etc --- src/fdb5/remote/client/RemoteStore.cc | 56 +++++++++++++++------------ src/fdb5/remote/client/RemoteStore.h | 20 ++++++---- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 524a90337..0a04270ab 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -49,11 +49,11 @@ using namespace eckit; -//---------------------------------------------------------------------------------------------------------------------- - namespace fdb5::remote { -// ----------------------------------------------------------------------------------------------------- +//---------------------------------------------------------------------------------------------------------------------- + +namespace { // /// @note The DataHandles returned by retrieve() MUST STRICTLY be read in order. @@ -211,6 +211,8 @@ std::vector> storeEndpoints(const C return out; } +} // namespace + //---------------------------------------------------------------------------------------------------------------------- RemoteStore::RemoteStore(const Key& dbKey, const Config& config) : @@ -219,10 +221,8 @@ RemoteStore::RemoteStore(const Key& dbKey, const Config& config) : {} // this is used only in retrieval, with an URI already referring to an accessible Store -RemoteStore::RemoteStore(const eckit::URI& uri, const Config& config) : - Client(eckit::net::Endpoint(uri.hostport()), uri.hostport()), - dbKey_(Key()), config_(config) - { +RemoteStore::RemoteStore(const eckit::URI& uri, const Config& config) + : Client(eckit::net::Endpoint(uri.hostport()), uri.hostport()), config_(config) { // no need to set the local_ flag on the read path ASSERT(uri.scheme() == "fdb"); } @@ -312,11 +312,14 @@ size_t RemoteStore::flush() { void RemoteStore::close() { } -void RemoteStore::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const { +void RemoteStore::remove(const eckit::URI& /*uri*/, + std::ostream& /*logAlways*/, + std::ostream& /*logVerbose*/, + const bool /*doit*/) const { NOTIMP; } -void RemoteStore::remove(const Key& key) const { +void RemoteStore::remove(const Key& /*key*/) const { NOTIMP; } @@ -453,24 +456,29 @@ RemoteStore& RemoteStore::get(const eckit::URI& uri) { return *(it->second); } - return *(readStores_[endpoint] = std::unique_ptr(new RemoteStore(uri, Config()))); + return *(readStores_[endpoint] = std::make_unique(uri, Config())); } - bool RemoteStore::uriBelongs(const eckit::URI&) const { - NOTIMP; - } - bool RemoteStore::uriExists(const eckit::URI&) const { - NOTIMP; - } - std::vector RemoteStore::collocatedDataURIs() const { - NOTIMP; - } - std::set RemoteStore::asCollocatedDataURIs(const std::vector&) const { - NOTIMP; - } +bool RemoteStore::uriBelongs(const eckit::URI&) const { + NOTIMP; +} + +bool RemoteStore::uriExists(const eckit::URI&) const { + NOTIMP; +} + +std::vector RemoteStore::collocatedDataURIs() const { + NOTIMP; +} + +std::set RemoteStore::asCollocatedDataURIs(const std::vector&) const { + NOTIMP; +} + +//---------------------------------------------------------------------------------------------------------------------- -static StoreBuilder builder("remote"); +static StoreBuilder builder(RemoteStore::typeName()); //---------------------------------------------------------------------------------------------------------------------- -} // namespace fdb5::remote +} // namespace fdb5::remote diff --git a/src/fdb5/remote/client/RemoteStore.h b/src/fdb5/remote/client/RemoteStore.h index 3446d318e..50b7e5f96 100644 --- a/src/fdb5/remote/client/RemoteStore.h +++ b/src/fdb5/remote/client/RemoteStore.h @@ -14,12 +14,17 @@ #pragma once -#include "fdb5/api/FDBStats.h" #include "fdb5/database/Catalogue.h" -#include "fdb5/database/Index.h" +#include "fdb5/database/Key.h" #include "fdb5/database/Store.h" +#include "fdb5/remote/Messages.h" #include "fdb5/remote/client/Client.h" +#include +#include +#include +#include + namespace fdb5::remote { class Locations { @@ -100,9 +105,10 @@ class Locations { class RemoteStore : public Store, public Client { public: // types + using StoredMessage = std::pair; + using MessageQueue = eckit::Queue; - using StoredMessage = std::pair; - using MessageQueue = eckit::Queue; + static const char* typeName() { return "remote"; } public: // methods @@ -132,12 +138,10 @@ class RemoteStore : public Store, public Client { std::vector collocatedDataURIs() const override; std::set asCollocatedDataURIs(const std::vector&) const override; - const Config& config() { return config_; } - + const Config& config() const { return config_; } protected: // methods - - std::string type() const override { return "remote"; } + std::string type() const override { return typeName(); } bool exists() const override; From 9bf825bb3d7d5644fa33fac2c57bd390613b7eab Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 16 Jan 2025 13:18:05 +0100 Subject: [PATCH 07/10] fix(remoteFDB): loop in storeEndpoints --- src/fdb5/remote/client/RemoteStore.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 0a04270ab..d1e545f99 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -196,7 +196,9 @@ class FDBRemoteDataHandle : public DataHandle { bool complete_; }; -std::vector> storeEndpoints(const Config& config) { +using EndPointList = std::vector>; + +EndPointList storeEndpoints(const Config& config) { ASSERT(config.has("stores")); ASSERT(config.has("fieldLocationEndpoints")); @@ -204,9 +206,11 @@ std::vector> storeEndpoints(const C std::vector fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints"); ASSERT(stores.size() == fieldLocationEndpoints.size()); - std::vector> out; - for (size_t i=0; i Date: Thu, 16 Jan 2025 15:37:21 +0100 Subject: [PATCH 08/10] chore(remoteFDB): some improvements --- src/fdb5/remote/client/Client.h | 9 +++-- src/fdb5/remote/client/RemoteCatalogue.cc | 26 +++++--------- src/fdb5/remote/client/RemoteStore.cc | 16 ++++----- src/fdb5/remote/server/CatalogueHandler.cc | 4 +-- src/fdb5/remote/server/StoreHandler.cc | 20 +++++++---- src/fdb5/remote/server/StoreHandler.h | 42 ++++++++++++---------- 6 files changed, 63 insertions(+), 54 deletions(-) diff --git a/src/fdb5/remote/client/Client.h b/src/fdb5/remote/client/Client.h index 72594a405..dfaa5829b 100644 --- a/src/fdb5/remote/client/Client.h +++ b/src/fdb5/remote/client/Client.h @@ -35,12 +35,17 @@ class RemoteFDBException : public eckit::RemoteException { class Client : eckit::NonCopyable { public: // types - using PayloadList = Connection::PayloadList; + using PayloadList = Connection::PayloadList; + using EndpointList = std::vector>; + + static constexpr size_t defaultBufferSizeArchive = 8192; + static constexpr size_t defaultBufferSizeFlush = 1024; + static constexpr size_t defaultBufferSizeKey = 4096; public: // methods Client(const eckit::net::Endpoint& endpoint, const std::string& defaultEndpoint); - Client(const std::vector>& endpoints); + Client(const EndpointList& endpoints); virtual ~Client(); diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index 0725419eb..7a5ce9a6a 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -28,14 +28,6 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -namespace { - -constexpr size_t archiveBufferSize = 8192; -constexpr size_t keyBufferSize = 4096; -} - -//---------------------------------------------------------------------------------------------------------------------- - RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config): CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here... Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), @@ -65,7 +57,7 @@ void RemoteCatalogue::archive(const Key& idxKey, const Key& datumKey, std::share numLocations_++; } - Buffer buffer(archiveBufferSize); + Buffer buffer(defaultBufferSizeArchive); MemoryStream stream(buffer); stream << idxKey; stream << datumKey; @@ -107,8 +99,8 @@ void RemoteCatalogue::flush(size_t archivedFields) { // Flush only does anything if there is an ongoing archive(); if (numLocations_ > 0) { - Buffer sendBuf(1024); - MemoryStream s(sendBuf); + eckit::Buffer sendBuf(defaultBufferSizeFlush); + eckit::MemoryStream s(sendBuf); s << numLocations_; LOG_DEBUG_LIB(LibFdb5) << " RemoteCatalogue::flush - flushing " << numLocations_ << " fields" << std::endl; @@ -126,18 +118,18 @@ void RemoteCatalogue::close() {NOTIMP;} bool RemoteCatalogue::exists() const { - bool exists = false; + bool result = false; - Buffer sendBuf(keyBufferSize); - MemoryStream sms(sendBuf); + eckit::Buffer sendBuf(defaultBufferSizeKey); + eckit::MemoryStream sms(sendBuf); sms << dbKey_; eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); eckit::MemoryStream rms(recvBuf); - rms >> exists; + rms >> result; - return exists; + return result; } void RemoteCatalogue::checkUID() const { @@ -156,7 +148,7 @@ void RemoteCatalogue::loadSchema() { LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; // send dbkey to remote. - eckit::Buffer keyBuffer(keyBufferSize); + eckit::Buffer keyBuffer(defaultBufferSizeKey); eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index d1e545f99..8df0bb25e 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -196,18 +196,16 @@ class FDBRemoteDataHandle : public DataHandle { bool complete_; }; -using EndPointList = std::vector>; - -EndPointList storeEndpoints(const Config& config) { +Client::EndpointList storeEndpoints(const Config& config) { ASSERT(config.has("stores")); ASSERT(config.has("fieldLocationEndpoints")); - std::vector stores = config.getStringVector("stores"); - std::vector fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints"); + const auto stores = config.getStringVector("stores"); + const auto fieldLocationEndpoints = config.getStringVector("fieldLocationEndpoints"); ASSERT(stores.size() == fieldLocationEndpoints.size()); - EndPointList out; + Client::EndpointList out; out.reserve(stores.size()); for (size_t i = 0; i < stores.size(); ++i) { out.emplace_back(eckit::net::Endpoint {stores.at(i)}, fieldLocationEndpoints.at(i)); @@ -269,8 +267,8 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length // store the callback, associated with the request id - to be done BEFORE sending the data locations_.archive(id, catalogue_archive); - Buffer keyBuffer(4096); - MemoryStream keyStream(keyBuffer); + eckit::Buffer keyBuffer(defaultBufferSizeKey); + eckit::MemoryStream keyStream(keyBuffer); keyStream << dbKey_; keyStream << key; @@ -300,7 +298,7 @@ size_t RemoteStore::flush() { size_t locations = complete ? locations_.archived() : locations_.wait(); - Buffer sendBuf(1024); + Buffer sendBuf(defaultBufferSizeFlush); MemoryStream s(sendBuf); s << locations; diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc index 46a504bac..75ac5c01a 100644 --- a/src/fdb5/remote/server/CatalogueHandler.cc +++ b/src/fdb5/remote/server/CatalogueHandler.cc @@ -361,8 +361,8 @@ void CatalogueHandler::exists(uint32_t clientID, uint32_t requestID, eckit::Buff bool exists = false; { - MemoryStream stream(payload); - const Key dbKey(stream); + eckit::MemoryStream stream(payload); + const Key dbKey(stream); exists = CatalogueReaderFactory::instance().build(dbKey, config_)->exists(); } diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index 1baa108b0..e11d4e491 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -8,25 +8,33 @@ * does it submit to any jurisdiction. */ -#include "eckit/config/Resource.h" -#include "eckit/serialisation/MemoryStream.h" +#include "fdb5/remote/server/StoreHandler.h" #include "fdb5/LibFdb5.h" +#include "fdb5/database/Key.h" #include "fdb5/database/Store.h" -#include "fdb5/remote/server/StoreHandler.h" +#include "fdb5/remote/Messages.h" +#include "fdb5/remote/server/ServerConnection.h" + +#include "eckit/net/TCPSocket.h" +#include "eckit/serialisation/MemoryStream.h" + +#include +#include +#include +#include using namespace eckit; -using metkit::mars::MarsRequest; namespace fdb5::remote { +//---------------------------------------------------------------------------------------------------------------------- + StoreHandler::StoreHandler(eckit::net::TCPSocket& socket, const Config& config): ServerConnection(socket, config) { LibFdb5::instance().constructorCallback()(*this); } -StoreHandler::~StoreHandler() {} - Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t requestID) { try { diff --git a/src/fdb5/remote/server/StoreHandler.h b/src/fdb5/remote/server/StoreHandler.h index 40362b09a..4f342589a 100644 --- a/src/fdb5/remote/server/StoreHandler.h +++ b/src/fdb5/remote/server/StoreHandler.h @@ -14,27 +14,16 @@ #include "fdb5/database/Store.h" #include "fdb5/remote/server/ServerConnection.h" -namespace fdb5::remote { - -//---------------------------------------------------------------------------------------------------------------------- - -struct StoreHelper { - StoreHelper(bool dataConnection, const Key& dbKey, const Config& config) : - controlConnection(true), dataConnection(dataConnection), - store(StoreFactory::instance().build(dbKey, config)) {} +#include +#include +#include - bool controlConnection; - bool dataConnection; - - std::unique_ptr store; -}; +namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- class StoreHandler : public ServerConnection, public CallbackRegistry { public: // methods - StoreHandler(eckit::net::TCPSocket& socket, const Config& config); - ~StoreHandler() override; private: // methods @@ -42,24 +31,41 @@ class StoreHandler : public ServerConnection, public CallbackRegistry { Handled handleControl(Message message, uint32_t clientID, uint32_t requestID, eckit::Buffer&& payload) override; void flush(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload); + void read(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload); - void archiveBlob(const uint32_t clientID, const uint32_t requestID, const void* data, size_t length) override; + void exists(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload) const; + + void archiveBlob(uint32_t clientID, uint32_t requestID, const void* data, size_t length) override; void readLocationThreadLoop(); - void writeToParent(const uint32_t clientID, const uint32_t requestID, std::unique_ptr dh); + + void writeToParent(uint32_t clientID, uint32_t requestID, std::unique_ptr dh); bool remove(bool control, uint32_t clientID) override; Store& store(uint32_t clientID); + Store& store(uint32_t clientID, const Key& dbKey); private: // members - + struct StoreHelper; // clientID --> Store std::map stores_; }; //---------------------------------------------------------------------------------------------------------------------- +struct StoreHandler::StoreHelper { + StoreHelper(bool dataConnection, const Key& dbKey, const Config& config) + : dataConnection(dataConnection), store(StoreFactory::instance().build(dbKey, config)) { } + + bool controlConnection {true}; + bool dataConnection {false}; + + std::unique_ptr store; +}; + +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote From 77b365c133ed7e0e7378c1dd953e5e4cb3f5eafb Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Thu, 16 Jan 2025 15:40:16 +0100 Subject: [PATCH 09/10] feat(remoteFDB): add exists to remote store --- src/fdb5/remote/client/RemoteStore.cc | 14 ++++++++++++- src/fdb5/remote/server/StoreHandler.cc | 27 ++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 8df0bb25e..946862239 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -244,7 +244,19 @@ eckit::URI RemoteStore::uri() const { } bool RemoteStore::exists() const { - return true; + + bool result = false; + + eckit::Buffer sendBuf(defaultBufferSizeKey); + eckit::MemoryStream sms(sendBuf); + sms << dbKey_; + + eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + + eckit::MemoryStream rms(recvBuf); + rms >> result; + + return result; } eckit::DataHandle* RemoteStore::retrieve(Field& field) const { diff --git a/src/fdb5/remote/server/StoreHandler.cc b/src/fdb5/remote/server/StoreHandler.cc index e11d4e491..c0c290e47 100644 --- a/src/fdb5/remote/server/StoreHandler.cc +++ b/src/fdb5/remote/server/StoreHandler.cc @@ -75,6 +75,10 @@ Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t flush(clientID, requestID, payload); return Handled::Yes; + case Message::Exists: // given key (payload), check if store exists + exists(clientID, requestID, payload); + return Handled::Replied; + default: { std::stringstream ss; ss << "ERROR: Unexpected message recieved (" << message << "). ABORTING"; @@ -94,6 +98,8 @@ Handled StoreHandler::handleControl(Message message, uint32_t clientID, uint32_t return Handled::No; } +//---------------------------------------------------------------------------------------------------------------------- + void StoreHandler::read(uint32_t clientID, uint32_t requestID, const eckit::Buffer& payload) { { @@ -272,4 +278,25 @@ Store& StoreHandler::store(uint32_t clientID, const Key& dbKey) { return *((stores_.emplace(clientID, StoreHelper(!single_, dbKey, config_)).first)->second.store); } +void StoreHandler::exists(const uint32_t clientID, const uint32_t requestID, const eckit::Buffer& payload) const { + + ASSERT(payload.size() > 0); + + bool exists = false; + + { + eckit::MemoryStream stream(payload); + const Key dbKey(stream); + exists = StoreFactory::instance().build(dbKey, config_)->exists(); + } + + eckit::Buffer existBuf(5); + eckit::MemoryStream stream(existBuf); + stream << exists; + + write(Message::Received, true, clientID, requestID, existBuf.data(), stream.position()); +} + +//---------------------------------------------------------------------------------------------------------------------- + } // namespace fdb5::remote From 5c18872086ba5a1c8a20d5c04feb612c5b3add56 Mon Sep 17 00:00:00 2001 From: Metin Cakircali Date: Fri, 17 Jan 2025 12:32:34 +0100 Subject: [PATCH 10/10] feat(remoteFDB): RemoteCatalogue loads schema lazily related to issue of creating empty DBs --- src/fdb5/remote/client/ClientConnection.cc | 4 +- src/fdb5/remote/client/RemoteCatalogue.cc | 59 +++++++++++++--------- src/fdb5/remote/client/RemoteCatalogue.h | 6 ++- src/fdb5/remote/client/RemoteStore.cc | 2 +- 4 files changed, 43 insertions(+), 28 deletions(-) diff --git a/src/fdb5/remote/client/ClientConnection.cc b/src/fdb5/remote/client/ClientConnection.cc index 1a0fe06d9..2501f7b83 100644 --- a/src/fdb5/remote/client/ClientConnection.cc +++ b/src/fdb5/remote/client/ClientConnection.cc @@ -218,14 +218,14 @@ void ClientConnection::dataWrite(Client& client, remote::Message msg, uint32_t r } uint32_t payloadLength = 0; - for (auto payload : payloads) { + for (const auto& payload : payloads) { ASSERT(payload.data); payloadLength += payload.length; } eckit::Buffer buffer{payloadLength}; uint32_t offset = 0; - for (auto payload : payloads) { + for (const auto& payload : payloads) { buffer.copy(payload.data, payload.length, offset); offset += payload.length; } diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc index 7a5ce9a6a..319ae9c97 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.cc +++ b/src/fdb5/remote/client/RemoteCatalogue.cc @@ -17,8 +17,10 @@ #include "eckit/filesystem/URI.h" #include "eckit/log/Log.h" #include "eckit/serialisation/MemoryStream.h" +#include "fdb5/rules/Schema.h" #include +#include #include #include @@ -28,18 +30,38 @@ namespace fdb5::remote { //---------------------------------------------------------------------------------------------------------------------- -RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config): - CatalogueImpl(key, ControlIdentifiers(), config), // xxx what are control identifiers? Setting empty here... - Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), - config_(config), schema_(nullptr), numLocations_(0) { +namespace { - loadSchema(); +Schema* fetchSchema(const Key& dbKey, const RemoteCatalogue& catalogue) { + LOG_DEBUG_LIB(LibFdb5) << "Fetching schema from remote catalogue: " << catalogue.controlEndpoint() << std::endl; + + // send dbkey to remote + eckit::Buffer keyBuffer(RemoteCatalogue::defaultBufferSizeKey); + eckit::MemoryStream keyStream(keyBuffer); + keyStream << dbKey; + + const auto requestID = catalogue.generateRequestID(); + + // receive schema from remote + auto recvBuf = catalogue.controlWriteReadResponse(Message::Schema, requestID, keyBuffer, keyStream.position()); + + eckit::MemoryStream schemaStream(recvBuf); + return eckit::Reanimator::reanimate(schemaStream); } +} // namespace + +//---------------------------------------------------------------------------------------------------------------------- + +RemoteCatalogue::RemoteCatalogue(const Key& key, const Config& config) + : CatalogueImpl(key, {}, config), // xxx what are control identifiers? Setting empty here... + Client({config.getString("host"), config.getInt("port")}, ""), + config_(config) { } + // Catalogue(URI, Config) is only used by the Visitors to traverse the catalogue. In the remote, we use the RemoteFDB for catalogue traversal // this ctor is here only to comply with the factory -RemoteCatalogue::RemoteCatalogue(const eckit::URI& uri, const Config& config): - Client(eckit::net::Endpoint(config.getString("host"), config.getInt("port")), ""), config_(config), schema_(nullptr), numLocations_(0) { +RemoteCatalogue::RemoteCatalogue(const eckit::URI& /*uri*/, const Config& config) + : Client({config.getString("host"), config.getInt("port")}, ""), config_(config) { NOTIMP; } @@ -87,7 +109,11 @@ void RemoteCatalogue::deselectIndex() { currentIndexKey_ = Key(); } const Schema& RemoteCatalogue::schema() const { - ASSERT(schema_); + // lazy loading schema + if (!schema_) { + schema_.reset(fetchSchema(dbKey_, *this)); + ASSERT(schema_); + } return *schema_; } @@ -124,7 +150,7 @@ bool RemoteCatalogue::exists() const { eckit::MemoryStream sms(sendBuf); sms << dbKey_; - eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); eckit::MemoryStream rms(recvBuf); rms >> result; @@ -143,20 +169,7 @@ eckit::URI RemoteCatalogue::uri() const { void RemoteCatalogue::loadSchema() { // NB we're at the db level, so get the db schema. We will want to get the master schema beforehand. // (outside of the catalogue) - - if (!schema_) { - LOG_DEBUG_LIB(LibFdb5) << "RemoteCatalogue::loadSchema()" << std::endl; - - // send dbkey to remote. - eckit::Buffer keyBuffer(defaultBufferSizeKey); - eckit::MemoryStream keyStream(keyBuffer); - keyStream << dbKey_; - - eckit::Buffer buf = controlWriteReadResponse(Message::Schema, generateRequestID(), keyBuffer, keyStream.position()); - - eckit::MemoryStream s(buf); - schema_.reset(eckit::Reanimator::reanimate(s)); - } + if (!schema_) { schema_.reset(fetchSchema(dbKey_, *this)); } } bool RemoteCatalogue::handle(Message message, uint32_t requestID) { diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h index b51009780..dca8b7e84 100644 --- a/src/fdb5/remote/client/RemoteCatalogue.h +++ b/src/fdb5/remote/client/RemoteCatalogue.h @@ -6,6 +6,7 @@ #include "fdb5/database/Catalogue.h" #include "fdb5/database/DbStats.h" #include "fdb5/database/Index.h" +#include "fdb5/database/Key.h" #include "fdb5/database/Store.h" #include "fdb5/remote/client/Client.h" @@ -14,6 +15,7 @@ #include #include #include +#include #include #include @@ -89,10 +91,10 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C private: Key currentIndexKey_; - std::unique_ptr schema_; + mutable std::unique_ptr schema_; std::mutex archiveMutex_; - size_t numLocations_; + size_t numLocations_ {0}; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc index 946862239..ea340595a 100644 --- a/src/fdb5/remote/client/RemoteStore.cc +++ b/src/fdb5/remote/client/RemoteStore.cc @@ -251,7 +251,7 @@ bool RemoteStore::exists() const { eckit::MemoryStream sms(sendBuf); sms << dbKey_; - eckit::Buffer recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); + auto recvBuf = controlWriteReadResponse(Message::Exists, generateRequestID(), sendBuf, sms.position()); eckit::MemoryStream rms(recvBuf); rms >> result;