From 645688df68eff8f6939f2110669ba67ad24243fa Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Tue, 5 Nov 2024 10:40:32 +0100 Subject: [PATCH 1/4] IoSerialiserYaS: remove deprecated template keyword Modern LLVM errors on this deprecated use of the keyword. Signed-off-by: Alexander Krimm --- src/serialiser/include/IoSerialiserYaS.hpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/serialiser/include/IoSerialiserYaS.hpp b/src/serialiser/include/IoSerialiserYaS.hpp index 4ae7bbd3..0f9162a2 100644 --- a/src/serialiser/include/IoSerialiserYaS.hpp +++ b/src/serialiser/include/IoSerialiserYaS.hpp @@ -7,8 +7,8 @@ #include #pragma clang diagnostic push -#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" -#pragma ide diagnostic ignored "cppcoreguidelines-avoid-c-arrays" +#pragma ide diagnostic ignored "cppcoreguidelines-avoid-magic-numbers" +#pragma ide diagnostic ignored "cppcoreguidelines-avoid-c-arrays" namespace opencmw { struct YaS : Protocol<"YaS"> {}; @@ -340,7 +340,7 @@ inline DeserialiserInfo checkHeaderInfo(IoBuffer &buffer, DeserialiserInfo const auto magic = buffer.get(); if (yas::VERSION_MAGIC_NUMBER != magic) { if (check == ProtocolCheck::LENIENT) { - info.exceptions.template emplace_back(ProtocolException("Wrong serialiser magic number: {} != -1", magic)); + info.exceptions.emplace_back(ProtocolException("Wrong serialiser magic number: {} != -1", magic)); } if (check == ProtocolCheck::ALWAYS) { throw ProtocolException("Wrong serialiser magic number: {} != -1", magic); @@ -350,7 +350,7 @@ inline DeserialiserInfo checkHeaderInfo(IoBuffer &buffer, DeserialiserInfo auto proto_name = buffer.get(); if (yas::PROTOCOL_NAME != proto_name) { if (check == ProtocolCheck::LENIENT) { - info.exceptions.template emplace_back(ProtocolException("Wrong serialiser identification string: {} != YaS", proto_name)); + info.exceptions.emplace_back(ProtocolException("Wrong serialiser identification string: {} != YaS", 
proto_name)); } if (check == ProtocolCheck::ALWAYS) { throw ProtocolException("Wrong serialiser identification string: {} != YaS", proto_name); @@ -362,7 +362,7 @@ inline DeserialiserInfo checkHeaderInfo(IoBuffer &buffer, DeserialiserInfo auto ver_micro = buffer.get(); if (yas::VERSION_MAJOR != ver_major) { if (check == ProtocolCheck::LENIENT) { - info.exceptions.template emplace_back(ProtocolException("Major versions do not match, received {}.{}.{}", ver_major, ver_minor, ver_micro)); + info.exceptions.emplace_back(ProtocolException("Major versions do not match, received {}.{}.{}", ver_major, ver_minor, ver_micro)); } if (check == ProtocolCheck::ALWAYS) { throw ProtocolException("Major versions do not match, received {}.{}.{}", ver_major, ver_minor, ver_micro); From df7b1efc050c1b732c58a6c7b68920bb9e3caf7a Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Tue, 5 Nov 2024 09:20:34 +0100 Subject: [PATCH 2/4] RestClient: Correctly use LongPollingIdx Parameter Changes the RestClient to internally track the last received update index instead of just blindly always requesting the latest update. This prevents duplicate notifications and allows catching up and blocking on not yet arrived updates. For emscripten this needs a bit of trickery because the api of emscripten_fetch does not provide a way to access the final redirected URL, so we have to call out to javascript to retrieve it. 
Signed-off-by: Alexander Krimm --- src/client/include/RestClientEmscripten.hpp | 53 +++++++++++++++------ src/client/include/RestClientNative.hpp | 50 +++++++++---------- 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/src/client/include/RestClientEmscripten.hpp b/src/client/include/RestClientEmscripten.hpp index abb1eaa4..1b126434 100644 --- a/src/client/include/RestClientEmscripten.hpp +++ b/src/client/include/RestClientEmscripten.hpp @@ -18,6 +18,27 @@ using namespace opencmw; namespace opencmw::client { namespace detail { + +/*** + * Get the final URL of a possibly redirected HTTP fetch call. + * Uses Javascript to return the the url as a string. + */ +static std::string getFinalURL(std::uint32_t id) { + auto finalURLChar = static_cast(EM_ASM_PTR({ + var fetch = Fetch.xhrs.get($0); + if (fetch) { + var finalURL = fetch.responseURL; + var lengthBytes = lengthBytesUTF8(finalURL) + 1; + var stringOnWasmHeap = _malloc(lengthBytes); + stringToUTF8(finalURL, stringOnWasmHeap, lengthBytes); + return stringOnWasmHeap; + } + return 0; }, id)); + std::string finalURL{ finalURLChar, strlen(finalURLChar) }; + EM_ASM({ _free($0) }, finalURLChar); + return finalURL; +} + struct pointer_equals { using is_transparent = void; template @@ -103,6 +124,7 @@ static std::unordered_set, detail:: struct SubscriptionPayload : FetchPayload { bool _live = true; MIME::MimeType _mimeType; + std::size_t _update = 0; SubscriptionPayload(Command &&_command, MIME::MimeType mimeType) : FetchPayload(std::move(_command)) @@ -114,7 +136,7 @@ struct SubscriptionPayload : FetchPayload { SubscriptionPayload &operator=(SubscriptionPayload &&other) noexcept = default; void requestNext() { - auto uri = command.topic; + auto uri = opencmw::URI::UriFactory(command.topic).addQueryParameter("LongPollingIdx", (_update == 0) ? 
"Next" : fmt::format("{}", _update)).build(); fmt::print("URL 1 >>> {}\n", uri.relativeRef()); auto preferredHeader = detail::getPreferredContentTypeHeader(command.topic, _mimeType); std::array preferredHeaderEmscripten; @@ -140,14 +162,22 @@ struct SubscriptionPayload : FetchPayload { attr.attributes = EMSCRIPTEN_FETCH_LOAD_TO_MEMORY; attr.requestHeaders = preferredHeaderEmscripten.data(); attr.onsuccess = [](emscripten_fetch_t *fetch) { - auto payloadIt = getPayloadIt(fetch); - auto &payload = *payloadIt; + auto payloadIt = getPayloadIt(fetch); + auto &payload = *payloadIt; + std::string finalURL = getFinalURL(fetch->id); + std::string longPollingIdxString = opencmw::URI<>(finalURL).queryParamMap().at("LongPollingIdx").value_or("0"); + char *end = longPollingIdxString.data() + longPollingIdxString.size(); + std::size_t longPollingIdx = strtoull(longPollingIdxString.data(), &end, 10); + if (longPollingIdx != payload->_update) { + fmt::print("received unexpected update: {}, expected {}\n", longPollingIdx, payload->_update); + return; + } payload->onsuccess(fetch->status, std::string_view(fetch->data, detail::checkedStringViewSize(fetch->numBytes))); emscripten_fetch_close(fetch); + payload->_update++; if (payload->_live) { payload->requestNext(); } else { - emscripten_fetch_close(fetch); detail::subscriptionPayloads.erase(payloadIt); } }; @@ -195,7 +225,7 @@ class RestClient : public ClientBase { } ~RestClient() { RestClient::stop(); }; - void stop() override{}; + void stop() override {}; std::vector protocols() noexcept override { return { "http", "https" }; } @@ -273,26 +303,19 @@ class RestClient : public ClientBase { } void startSubscription(Command &&cmd) { - auto uri = opencmw::URI<>::factory(cmd.topic).addQueryParameter("LongPollingIdx", "Next").build(); - cmd.topic = uri; - auto payload = std::make_unique(std::move(cmd), _mimeType); auto rawPayload = payload.get(); detail::subscriptionPayloads.insert(std::move(payload)); + fmt::print("starting subscription: 
{}, existiing subscriptions: {}\n", cmd.topic.str(), detail::subscriptionPayloads.size()); rawPayload->requestNext(); } void stopSubscription(Command &&cmd) { - // TODO: Can we provide - // Subscription subscribe(...) - // void get(...) - // void set(...) - // instead of going through a fake generic request(...)? - auto uri = opencmw::URI<>::factory(cmd.topic).addQueryParameter("LongPollingIdx", "Next").build(); auto payloadIt = std::find_if(detail::subscriptionPayloads.begin(), detail::subscriptionPayloads.end(), [&](const auto &ptr) { - return ptr->command.topic == uri; + return ptr->command.topic == cmd.topic; }); + fmt::print("stopping subscription: {}, existiing subscriptions: {}\n", cmd.topic.str(), detail::subscriptionPayloads.size()); if (payloadIt == detail::subscriptionPayloads.end()) { return; } diff --git a/src/client/include/RestClientNative.hpp b/src/client/include/RestClientNative.hpp index 2ec18293..73b9d4a7 100644 --- a/src/client/include/RestClientNative.hpp +++ b/src/client/include/RestClientNative.hpp @@ -33,7 +33,7 @@ class MinIoThreads { public: MinIoThreads() = default; MinIoThreads(int value) noexcept - : _minThreads(value){}; + : _minThreads(value) {}; constexpr operator int() const noexcept { return _minThreads; }; }; @@ -43,7 +43,7 @@ class MaxIoThreads { public: MaxIoThreads() = default; MaxIoThreads(int value) noexcept - : _maxThreads(value){}; + : _maxThreads(value) {}; constexpr operator int() const noexcept { return _maxThreads; }; }; @@ -52,9 +52,9 @@ struct ClientCertificates { ClientCertificates() = default; ClientCertificates(const char *X509_ca_bundle) noexcept - : _certificates(X509_ca_bundle){}; + : _certificates(X509_ca_bundle) {}; ClientCertificates(const std::string &X509_ca_bundle) noexcept - : _certificates(X509_ca_bundle){}; + : _certificates(X509_ca_bundle) {}; constexpr operator std::string() const noexcept { return _certificates; }; }; @@ -313,37 +313,31 @@ class RestClient : public ClientBase { { 
client.set_follow_location(true); - auto longPollingEndpoint = [&] { - if (!cmd.topic.queryParamMap().contains(LONG_POLLING_IDX_TAG)) { - return URI<>::factory(cmd.topic).addQueryParameter(LONG_POLLING_IDX_TAG, "Next").build(); - } else { - return URI<>::factory(cmd.topic).build(); - } - }(); - - const auto pollHeaders = getPreferredContentTypeHeader(longPollingEndpoint); - auto endpoint = longPollingEndpoint.relativeRef().value(); + std::size_t longPollingIdx = 0; + const auto pollHeaders = getPreferredContentTypeHeader(cmd.topic); client.set_read_timeout(cmd.timeout); // default keep-alive value while (_run) { - auto redirect_get = [&client](auto url, auto headers) { - for (;;) { - auto result = client.Get(url, headers); - if (!result) return result; - - if (result->status >= 300 && result->status < 400) { - url = httplib::detail::decode_url(result.value().get_header_value("location"), true); - } else { - return result; - } + auto endpoint = [&]() { + if (longPollingIdx == 0UZ) { + return URI::factory(cmd.topic).addQueryParameter(LONG_POLLING_IDX_TAG, "Next").build().relativeRef().value(); + } else { + return URI::factory(cmd.topic).addQueryParameter(LONG_POLLING_IDX_TAG, fmt::format("{}", longPollingIdx)).build().relativeRef().value(); } - }; - if (const httplib::Result &result = redirect_get(endpoint, pollHeaders)) { + }(); + if (const httplib::Result &result = client.Get(endpoint, pollHeaders)) { returnMdpMessage(cmd, result); - } else { // failed or server is down -> wait until retry - std::this_thread::sleep_for(cmd.timeout); // time-out until potential retry + // update long-polling-index + std::string location = result->location.empty() ? 
endpoint : result->location; + std::string updateIdxString = URI(location).queryParamMap().at(std::string(LONG_POLLING_IDX_TAG)).value_or("0"); + char *end = updateIdxString.data() + updateIdxString.size(); + longPollingIdx = strtoull(updateIdxString.data(), &end, 10) + 1; + auto headerTimestamp = result->get_header_value_u64("X-TIMESTAMP"); + auto latency = headerTimestamp - std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count(); + } else { // failed or server is down -> wait until retry if (_run) { returnMdpMessage(cmd, result, fmt::format("Long-Polling-GET request failed for {}: {}", cmd.topic.str(), static_cast(result.error()))); } + std::this_thread::sleep_for(cmd.timeout); // time-out until potential retry } } } From ae567da6cdf016593b8d268aad35492228993d37 Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Wed, 6 Nov 2024 17:24:57 +0100 Subject: [PATCH 3/4] RestBackend: Remove artificial rate limit The poll loop that handles the messages from the broker had a 100ms sleep built-in and handled only one update per iteration and connection. This led to updates being accumulated in the RestBackend's subscription socket, whenever the underlying property would update with more than 10Hz. This commit reduces the sleep between poll loops to 10ms and handles all notifications for all connections at once, allowing the RestBackend to catch up on a burst of notifications. 
smaller changes: - make http headers case-insensitive - general code-cleanup Signed-off-by: Alexander Krimm --- .../include/majordomo/RestBackend.hpp | 169 ++++++++---------- 1 file changed, 73 insertions(+), 96 deletions(-) diff --git a/src/majordomo/include/majordomo/RestBackend.hpp b/src/majordomo/include/majordomo/RestBackend.hpp index 89ee01cf..cb955d6f 100644 --- a/src/majordomo/include/majordomo/RestBackend.hpp +++ b/src/majordomo/include/majordomo/RestBackend.hpp @@ -229,8 +229,7 @@ struct Connection { std::atomic_int _refCount = 1; - // Here be dragons! This is not to be used after - // the connection was involved in any threading code + // Here be dragons! This is not to be used after the connection was involved in any threading code Connection(Connection &&other) noexcept : notificationSubscriptionSocket(std::move(other.notificationSubscriptionSocket)) , requestResponseSocket(std::move(other.requestResponseSocket)) @@ -250,8 +249,7 @@ struct Connection { Connection &operator=(const Connection &) = delete; Connection &operator=(Connection &&) = delete; - // Here be dragons! This is not to be used after - // the connection was involved in any threading code + // Here be dragons! This is not to be used after the connection was involved in any threading code Connection unsafeMove() && { return std::move(*this); } @@ -283,8 +281,7 @@ struct Connection { } bool waitForUpdate(std::chrono::milliseconds timeout) { - // This could also periodically check for the client connection being dropped (e.g. due to client-side timeout) - // if cpp-httplib had API for that. + // This could also periodically check for the client connection being dropped (e.g. due to client-side timeout) if cpp-httplib had API for that. 
auto temporaryLock = writeLock(); const auto next = _nextPollingIndex; while (_nextPollingIndex == next) { @@ -331,9 +328,9 @@ class RestBackend : public Mode { std::atomic _majordomoTimeout = 30000ms; private: - std::jthread _connectionUpdaterThread; - std::shared_mutex _connectionsMutex; - std::map> _connectionForService; + std::jthread _mdpConnectionUpdaterThread; + std::shared_mutex _mdpConnectionsMutex; + std::map> _mdpConnectionForService; public: /** @@ -350,18 +347,17 @@ class RestBackend : public Mode { } using BrokerType = Broker; - // returns a connection with refcount 1. Make sure you lower it to - // zero at some point + // returns a connection with refcount 1. Make sure you lower it to zero at some point detail::Connection *notificationSubscriptionConnectionFor(const std::string &subscriptionKey) { - detail::WriteLock lock(_connectionsMutex); + detail::WriteLock lock(_mdpConnectionsMutex); // TODO: No need to find + emplace as separate steps - if (auto it = _connectionForService.find(subscriptionKey); it != _connectionForService.end()) { + if (auto it = _mdpConnectionForService.find(subscriptionKey); it != _mdpConnectionForService.end()) { auto *connection = it->second.get(); connection->increaseReferenceCount(); return connection; } - auto [it, inserted] = _connectionForService.emplace(std::piecewise_construct, + auto [it, inserted] = _mdpConnectionForService.emplace(std::piecewise_construct, std::forward_as_tuple(subscriptionKey), std::forward_as_tuple(std::make_unique(_broker.context, subscriptionKey))); @@ -390,18 +386,20 @@ class RestBackend : public Mode { // Starts the thread to keep the unused subscriptions alive void startUpdaterThread() { - _connectionUpdaterThread = std::jthread([this](std::stop_token stop_token) { + _mdpConnectionUpdaterThread = std::jthread([this](const std::stop_token &stopToken) { thread::setThreadName("RestBackend updater thread"); - while (!stop_token.stop_requested()) { - std::this_thread::sleep_for(100ms); + + 
std::vector connections; + std::vector pollItems; + while (!stopToken.stop_requested()) { + std::list keep; { - // This is a long lock, alternatively, message reading - // could have separate locks per connection - detail::WriteLock lock(_connectionsMutex); + // This is a long lock, alternatively, message reading could have separate locks per connection + detail::WriteLock lock(_mdpConnectionsMutex); // Expired subscriptions cleanup std::vector expiredSubscriptions; - for (auto &[subscriptionKey, connection] : _connectionForService) { + for (auto &[subscriptionKey, connection] : _mdpConnectionForService) { // fmt::print("Reference count is {}\n", connection->referenceCount()); if (connection->referenceCount() == 0) { auto connectionLock = connection->writeLock(); @@ -414,49 +412,43 @@ class RestBackend : public Mode { } } for (const auto &subscriptionKey : expiredSubscriptions) { - _connectionForService.erase(subscriptionKey); + _mdpConnectionForService.erase(subscriptionKey); } - // Reading the missed messages - const auto connectionCount = _connectionForService.size(); - - if (connectionCount != 0) { - std::vector connections; - std::vector pollItems; - connections.resize(connectionCount); - pollItems.resize(connectionCount); - - std::list keep; - - for (std::size_t i = 0; auto &kvp : _connectionForService) { - auto &[key, connection] = kvp; - - connections[i] = connection.get(); - keep.emplace_back(connection.get()); + // setup poller and socket data structures for all connections + const std::size_t connectionCount = _mdpConnectionForService.size(); + connections.resize(connectionCount); + pollItems.resize(connectionCount); + for (std::size_t i = 0UZ; auto &[key, connection] : _mdpConnectionForService) { + connections[i] = connection.get(); + keep.emplace_back(connection.get()); + pollItems[i].events = ZMQ_POLLIN; + pollItems[i].socket = connection->notificationSubscriptionSocket.zmq_ptr; + ++i; + } + } // finished copying local state, keep ensures that 
connections are kept alive, end of lock on _mdpConnectionsForService - pollItems[i].events = ZMQ_POLLIN; - pollItems[i].socket = connection->notificationSubscriptionSocket.zmq_ptr; - ++i; - } + if (pollItems.empty()) { + std::this_thread::sleep_for(100ms); // prevent spinning on connection cleanup if there are no connections to poll on + continue; + } - auto pollCount = zmq::invoke(zmq_poll, pollItems.data(), static_cast(pollItems.size()), - std::chrono::duration_cast(UPDATER_POLLING_TIME).count()); - if (!pollCount) { - std::terminate(); - } - if (pollCount.value() == 0) { - continue; - } + auto pollCount = zmq::invoke(zmq_poll, pollItems.data(), static_cast(pollItems.size()), std::chrono::duration_cast(UPDATER_POLLING_TIME).count()); + if (!pollCount) { + fmt::print("Error while polling for updates from the broker\n"); + std::terminate(); + } + if (pollCount.value() == 0) { + continue; + } - for (std::size_t i = 0; i < connectionCount; ++i) { - const auto events = pollItems[i].revents; - if (events & ZMQ_POLLIN) { - auto *currentConnection = connections[i]; - auto connectionLock = currentConnection->writeLock(); - if (auto responseMessage = zmq::receive(currentConnection->notificationSubscriptionSocket)) { - currentConnection->addCachedReply(connectionLock, std::string(responseMessage->data.asString())); - } - } + // Reading messages + for (std::size_t i = 0; i < connections.size(); ++i) { + if (pollItems[i].revents & ZMQ_POLLIN) { + detail::Connection *currentConnection = connections[i]; + std::unique_lock connectionLock = currentConnection->writeLock(); + while (auto responseMessage = zmq::receive(currentConnection->notificationSubscriptionSocket)) { + currentConnection->addCachedReply(connectionLock, std::string(responseMessage->data.asString())); } } } @@ -483,8 +475,8 @@ class RestBackend : public Mode { virtual ~RestBackend() { _svr.stop(); // shutdown thread before _connectionForService is destroyed - _connectionUpdaterThread.request_stop(); - 
_connectionUpdaterThread.join(); + _mdpConnectionUpdaterThread.request_stop(); + _mdpConnectionUpdaterThread.join(); } auto handleServiceRequest(const httplib::Request &request, httplib::Response &response, const httplib::ContentReader *content_reader_ = nullptr) { @@ -515,12 +507,8 @@ class RestBackend : public Mode { auto topic = std::move(*maybeTopic); auto restMethod = [&] { + auto methodString = request.has_header("X-OPENCMW-METHOD") ? request.get_header_value("X-OPENCMW-METHOD") : request.method; // clang-format off - auto methodString = - request.has_header("X-OPENCMW-METHOD") ? - request.get_header_value("X-OPENCMW-METHOD") : - request.method; - return methodString == "SUB" ? RestMethod::Subscribe : methodString == "POLL" ? RestMethod::LongPoll : methodString == "PUT" ? RestMethod::Post : @@ -532,10 +520,8 @@ class RestBackend : public Mode { for (const auto &[key, value] : request.params) { if (key == "LongPollingIdx") { - // This parameter is not passed on, it just means we - // want to use long polling + // This parameter is not passed on, it just means we want to use long polling restMethod = value == "Subscription" ? 
RestMethod::Subscribe : RestMethod::LongPoll; - } else if (key == "SubscriptionContext") { topic = mdp::Topic::fromString(value, {}); // params are parsed from value } @@ -550,14 +536,11 @@ class RestBackend : public Mode { switch (restMethod) { case RestMethod::Get: case RestMethod::Post: - return worker.respondWithPubSub(request, response, topic, restMethod, content_reader_); - + return worker.respondWithGetSet(request, response, topic, restMethod, content_reader_); case RestMethod::LongPoll: return worker.respondWithLongPoll(request, response, topic); - case RestMethod::Subscribe: return worker.respondWithSubscription(response, topic); - default: // std::unreachable() is C++23 assert(!"We have already checked that restMethod is not Invalid"); @@ -639,34 +622,27 @@ struct RestBackend::RestWorker { return std::move(connection).unsafeMove(); } - bool respondWithPubSub(const httplib::Request &request, httplib::Response &response, mdp::Topic topic, detail::RestMethod restMethod, const httplib::ContentReader *content_reader_ = nullptr) { - // clang-format off - const mdp::Command mdpMessageCommand = - restMethod == detail::RestMethod::Post ? mdp::Command::Set : - /* default */ mdp::Command::Get; - // clang-format on + bool respondWithGetSet(const httplib::Request &request, httplib::Response &response, mdp::Topic topic, detail::RestMethod restMethod, const httplib::ContentReader *content_reader_ = nullptr) { + const mdp::Command mdpMessageCommand = restMethod == detail::RestMethod::Post ? 
mdp::Command::Set : mdp::Command::Get; - auto uri = URI<>::factory(); - std::string bodyOverride; - std::string contentType; - int contentLength{ 0 }; + auto uri = URI<>::factory(); + std::string bodyOverride; + std::string contentType; + int contentLength{ 0 }; for (const auto &[key, value] : request.params) { if (key == "_bodyOverride") { bodyOverride = value; - } else if (key == "LongPollingIdx") { - // This parameter is not passed on, it just means we - // want to use long polling -- already handled - + // This parameter is not passed on, it just means we want to use long polling -- already handled } else { uri = std::move(uri).addQueryParameter(key, value); } } for (const auto &[key, value] : request.headers) { - if (key == "Content-Length") { + if (httplib::detail::case_ignore::equal(key, "Content-Length")) { contentLength = std::stoi(value); - } else if (key == "Content-Type") { + } else if (httplib::detail::case_ignore::equal(key, "Content-Type")) { contentType = value; } } @@ -742,6 +718,7 @@ struct RestBackend::RestWorker { response.set_header("X-OPENCMW-TOPIC", responseMessage->topic.str().data()); response.set_header("X-OPENCMW-SERVICE-NAME", responseMessage->serviceName.data()); response.set_header("Access-Control-Allow-Origin", "*"); + response.set_header("X-TIMESTAMP", fmt::format("{}", std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count())); const auto data = responseMessage->data.asString(); if (request.method != "GET") { @@ -759,6 +736,8 @@ struct RestBackend::RestWorker { assert(connection); const auto majordomoTimeout = restBackend.majordomoTimeout(); response.set_header("Access-Control-Allow-Origin", "*"); + response.set_header("X-TIMESTAMP", fmt::format("{}", std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch()).count())); + response.set_chunked_content_provider( "application/json", [connection, majordomoTimeout](std::size_t /*offset*/, httplib::DataSink &sink) 
mutable { @@ -818,8 +797,8 @@ struct RestBackend::RestWorker { detail::Connection *connection = nullptr; }; auto fetchCache = [this, &subscriptionKey] { - std::shared_lock lock(restBackend._connectionsMutex); - auto &recycledConnectionForService = restBackend._connectionForService; + std::shared_lock lock(restBackend._mdpConnectionsMutex); + auto &recycledConnectionForService = restBackend._mdpConnectionForService; if (auto it = recycledConnectionForService.find(subscriptionKey); it != recycledConnectionForService.cend()) { auto *connectionCache = it->second.get(); detail::Connection::KeepAlive keep(connectionCache); @@ -842,7 +821,7 @@ struct RestBackend::RestWorker { detail::PollingIndex requestedLongPollingIdx = 0; - // Hoping we already have the requested value in the cache + // Hoping we already have the requested value in the cache. Holding this caches blocks all cache entries, so no further updates can be received or other connections initiated. { const auto cache = fetchCache(); response.set_header("Access-Control-Allow-Origin", "*"); @@ -888,7 +867,7 @@ struct RestBackend::RestWorker { assert(connection); detail::Connection::KeepAlive keep(connection); - // Since we use KeepAlive object, the inital refCount can go away + // Since we use KeepAlive object, the initial refCount can go away connection->decreaseReferenceCount(); if (!connection->waitForUpdate(restBackend.majordomoTimeout())) { @@ -904,7 +883,6 @@ struct RestBackend::RestWorker { auto connectionCacheLock = newCache.connection->readLock(); response.set_content(newCache.connection->cachedReply(connectionCacheLock, requestedLongPollingIdx), MIME::JSON.typeName().data()); return true; - } else { return detail::respondWithError(response, "Error: We waited for the new value, but it was not found"); } @@ -914,8 +892,7 @@ struct RestBackend::RestWorker { void addParameters(const httplib::Request &request, URI<>::UriFactory &uri) { for (const auto &[key, value] : request.params) { if (key == 
"LongPollingIdx") { - // This parameter is not passed on, it just means we - // want to use long polling -- already handled + // This parameter is not passed on, it just means we want to use long polling -- already handled } else { uri = std::move(uri).addQueryParameter(key, value); } From 4e1d52ecf7c4fcaf8e2a401c8c82f195d4ee8760 Mon Sep 17 00:00:00 2001 From: Alexander Krimm Date: Wed, 6 Nov 2024 19:20:00 +0100 Subject: [PATCH 4/4] majordomoworker_rest_tests: test with LongPollIdx Fix the unittests which do not correctly handle the LongPollingIdx parameter. Signed-off-by: Alexander Krimm --- .../test/majordomoworker_rest_tests.cpp | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/src/majordomo/test/majordomoworker_rest_tests.cpp b/src/majordomo/test/majordomoworker_rest_tests.cpp index c6220e7d..948d74b3 100644 --- a/src/majordomo/test/majordomoworker_rest_tests.cpp +++ b/src/majordomo/test/majordomoworker_rest_tests.cpp @@ -38,6 +38,29 @@ std::jthread makeGetRequestResponseCheckerThread(const std::string &address, con }); } +std::jthread makeLongPollingRequestResponseCheckerThread(const std::string &address, const std::vector &requiredResponses, const std::vector &requiredStatusCodes = {}, [[maybe_unused]] std::source_location location = std::source_location::current()) { + return std::jthread([=] { + httplib::Client http("localhost", majordomo::DEFAULT_REST_PORT); + http.set_follow_location(true); + http.set_keep_alive(true); +#define requireWithSource(arg) \ + if (!(arg)) opencmw::zmq::debug::withLocation(location) << "<- call got a failed requirement:"; \ + REQUIRE(arg) + for (std::size_t i = 0; i < requiredResponses.size(); ++i) { + const std::string url = fmt::format("{}{}LongPollingIdx={}", address, address.contains('?') ? "&" : "?", i == 0 ? 
"Next" : fmt::format("{}", i)); + const auto response = http.Get(url); + if (i == 0) { // check forwarding to the explicit index + REQUIRE(response->location.find("&LongPollingIdx=0") != std::string::npos); + } + requireWithSource(response); + const auto requiredStatusCode = i < requiredStatusCodes.size() ? requiredStatusCodes[i] : 200; + requireWithSource(response->status == requiredStatusCode); + requireWithSource(response->body.find(requiredResponses[i]) != std::string::npos); + } +#undef requireWithSource + }); +} + struct ColorContext { bool red = false; bool green = false; @@ -252,12 +275,12 @@ TEST_CASE("Subscriptions", "[majordomo][majordomoworker][subscription]") { REQUIRE(waitUntilWorkerServiceAvailable(broker.context, worker)); - auto allListener = makeGetRequestResponseCheckerThread("/colors?LongPollingIdx=Next", { "0", "1", "2", "3", "4", "5", "6" }); - auto redListener = makeGetRequestResponseCheckerThread("/colors?LongPollingIdx=Next&red", { "0", "3", "4", "6" }); - auto yellowListener = makeGetRequestResponseCheckerThread("/colors?LongPollingIdx=Next&red&green", { "4", "6" }); - auto whiteListener1 = makeGetRequestResponseCheckerThread("/colors?LongPollingIdx=Next&red&green&blue", { "6" }); - auto whiteListener2 = makeGetRequestResponseCheckerThread("/colors?LongPollingIdx=Next&green&red&blue", { "6" }); - auto whiteListener3 = makeGetRequestResponseCheckerThread("/colors?LongPollingIdx=Next&blue&green&red", { "6" }); + auto allListener = makeLongPollingRequestResponseCheckerThread("/colors", { "0", "1", "2", "3", "4", "5", "6" }); + auto redListener = makeLongPollingRequestResponseCheckerThread("/colors?red", { "0", "3", "4", "6" }); + auto yellowListener = makeLongPollingRequestResponseCheckerThread("/colors?red&green", { "4", "6" }); + auto whiteListener1 = makeLongPollingRequestResponseCheckerThread("/colors?red&green&blue", { "6" }); + auto whiteListener2 = makeLongPollingRequestResponseCheckerThread("/colors?green&red&blue", { "6" }); + auto 
whiteListener3 = makeLongPollingRequestResponseCheckerThread("/colors?blue&green&red", { "6" }); std::this_thread::sleep_for(50ms); // give time for subscriptions to happen