From 188590ac2b6e26241657d64d6117c8d94e835d7c Mon Sep 17 00:00:00 2001 From: Frank Osterfeld Date: Mon, 27 Nov 2023 12:47:34 +0100 Subject: [PATCH] Do not block worker threads forever if no event is received (#329) Make sure that long-poll request handlers do not block forever when no corresponding event is received. Otherwise the clients will send one request after another once their request times out client-side, until the worker threads are exhausted and the server stops responding. It would be great if we could also detect the client connection being dropped using Keep-Alive, but cpp-httplib doesn't seem to have an API for that. --- .../include/majordomo/RestBackend.hpp | 58 +++++++++---- .../test/majordomoworker_rest_tests.cpp | 85 +++++++++++++++++-- 2 files changed, 121 insertions(+), 22 deletions(-) diff --git a/src/majordomo/include/majordomo/RestBackend.hpp b/src/majordomo/include/majordomo/RestBackend.hpp index c5079855..d6dfd06a 100644 --- a/src/majordomo/include/majordomo/RestBackend.hpp +++ b/src/majordomo/include/majordomo/RestBackend.hpp @@ -62,9 +62,10 @@ using namespace std::chrono_literals; constexpr auto HTTP_OK = 200; constexpr auto HTTP_ERROR = 500; +constexpr auto HTTP_GATEWAY_TIMEOUT = 504; constexpr auto DEFAULT_REST_PORT = 8080; -constexpr auto REST_POLLING_TIME = 10s; constexpr auto UPDATER_POLLING_TIME = 1s; +constexpr auto LONG_POLL_SERVER_TIMEOUT = 30s; constexpr auto UNUSED_SUBSCRIPTION_EXPIRATION_TIME = 30s; constexpr std::size_t MAX_CACHED_REPLIES = 32; @@ -155,8 +156,8 @@ std::string_view acceptedMimeForRequest(const auto &request) { return acceptableMimeTypes[0]; } -bool respondWithError(auto &response, std::string_view message) { - response.status = HTTP_ERROR; +bool respondWithError(auto &response, std::string_view message, int status = HTTP_ERROR) { + response.status = status; response.set_content(message.data(), MIME::TEXT.typeName().data()); return true; }; @@ -287,9 +288,18 @@ struct Connection { return 
ReadLock(_cachedRepliesMutex); } - void waitForUpdate() { - auto temporaryLock = writeLock(); - _pollingIndexCV.wait(temporaryLock); + bool waitForUpdate(std::chrono::milliseconds timeout) { + // This could also periodically check for the client connection being dropped (e.g. due to client-side timeout) + // if cpp-httplib had API for that. + auto temporaryLock = writeLock(); + const auto next = _nextPollingIndex; + while (_nextPollingIndex == next) { + if (_pollingIndexCV.wait_for(temporaryLock, timeout) == std::cv_status::timeout) { + return false; + } + } + + return true; } std::size_t cachedRepliesSize(ReadLock & /*lock*/) const { @@ -321,9 +331,10 @@ struct Connection { template class RestBackend : public Mode { protected: - Broker &_broker; - const VirtualFS &_vfs; - URI<> _restAddress; + Broker &_broker; + const VirtualFS &_vfs; + URI<> _restAddress; + std::atomic _majordomoTimeout = 30000ms; private: std::jthread _connectionUpdaterThread; @@ -332,6 +343,19 @@ class RestBackend : public Mode { std::map> _connectionForService; public: + /** + * Timeout used for interaction with majordomo workers, i.e. the time to wait + * for notifications on subscriptions (long-polling) and for responses to Get/Set + * requests. + */ + void setMajordomoTimeout(std::chrono::milliseconds timeout) { + _majordomoTimeout = timeout; + } + + std::chrono::milliseconds majordomoTimeout() const { + return _majordomoTimeout; + } + using BrokerType = Broker; // returns a connection with refcount 1. 
Make sure you lower it to // zero at some point @@ -711,9 +735,9 @@ struct RestBackend::RestWorker { } pollItem.socket = connection.notificationSubscriptionSocket.zmq_ptr; - auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast(REST_POLLING_TIME).count()); + auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast(restBackend.majordomoTimeout()).count()); if (!pollResult || pollResult.value() == 0) { - detail::respondWithError(response, "Error: No response from broker\n"); + detail::respondWithError(response, "Error: No response from broker\n", HTTP_GATEWAY_TIMEOUT); } else if (auto responseMessage = zmq::receive(connection.notificationSubscriptionSocket); !responseMessage) { detail::respondWithError(response, "Error: Empty response from broker\n"); } else if (!responseMessage->error.empty()) { @@ -740,14 +764,16 @@ struct RestBackend::RestWorker { auto *connection = restBackend.notificationSubscriptionConnectionFor(subscriptionInfo); assert(connection); - + const auto majordomoTimeout = restBackend.majordomoTimeout(); response.set_header("Access-Control-Allow-Origin", "*"); response.set_chunked_content_provider( "application/json", - [connection](std::size_t /*offset*/, httplib::DataSink &sink) mutable { + [connection, majordomoTimeout](std::size_t /*offset*/, httplib::DataSink &sink) mutable { std::cerr << "Chunked reply...\n"; - connection->waitForUpdate(); + if (!connection->waitForUpdate(majordomoTimeout)) { + return false; + } auto connectionCacheLock = connection->readLock(); auto lastIndex = connection->nextPollingIndex(connectionCacheLock) - 1; @@ -871,7 +897,9 @@ struct RestBackend::RestWorker { // Since we use KeepAlive object, the inital refCount can go away connection->decreaseReferenceCount(); - connection->waitForUpdate(); + if (!connection->waitForUpdate(restBackend.majordomoTimeout())) { + return detail::respondWithError(response, "Timeout waiting for update", HTTP_GATEWAY_TIMEOUT); + } const auto 
newCache = fetchCache(); diff --git a/src/majordomo/test/majordomoworker_rest_tests.cpp b/src/majordomo/test/majordomoworker_rest_tests.cpp index 8fbf4ae7..e9ad1099 100644 --- a/src/majordomo/test/majordomoworker_rest_tests.cpp +++ b/src/majordomo/test/majordomoworker_rest_tests.cpp @@ -19,7 +19,7 @@ // Concepts and tests use common types #include -std::jthread makeGetRequestResponseCheckerThread(const std::string &address, const std::vector &requiredResponses, [[maybe_unused]] std::source_location location = std::source_location::current()) { +std::jthread makeGetRequestResponseCheckerThread(const std::string &address, const std::vector &requiredResponses, const std::vector &requiredStatusCodes = {}, [[maybe_unused]] std::source_location location = std::source_location::current()) { return std::jthread([=] { httplib::Client http("localhost", majordomo::DEFAULT_REST_PORT); http.set_follow_location(true); @@ -27,14 +27,12 @@ std::jthread makeGetRequestResponseCheckerThread(const std::string &address, con #define requireWithSource(arg) \ if (!(arg)) opencmw::zmq::debug::withLocation(location) << "<- call got a failed requirement:"; \ REQUIRE(arg) - for (const auto &requiredResponse : requiredResponses) { + for (std::size_t i = 0; i < requiredResponses.size(); ++i) { const auto response = http.Get(address); requireWithSource(response); - if (response->status != 200) { - fmt::println(std::cerr, "Unexpected error: {}", response->body); - } - requireWithSource(response->status == 200); - requireWithSource(response->body.find(requiredResponse) != std::string::npos); + const auto requiredStatusCode = i < requiredStatusCodes.size() ? 
requiredStatusCodes[i] : 200; + requireWithSource(response->status == requiredStatusCode); + requireWithSource(response->body.find(requiredResponses[i]) != std::string::npos); } #undef requireWithSource }); @@ -75,6 +73,29 @@ class ColorWorker : public majordomo::Worker +class WaitingWorker : public majordomo::Worker { +public: + using super_t = majordomo::Worker; + + template + explicit WaitingWorker(const BrokerType &broker) + : super_t(broker, {}) { + super_t::setCallback([](majordomo::RequestContext &, const WaitingContext &inCtx, const SingleString &in, WaitingContext &outCtx, SingleString &out) { + fmt::println("Sleep for {}", inCtx.timeoutMs); + std::this_thread::sleep_for(std::chrono::milliseconds(inCtx.timeoutMs)); + outCtx = inCtx; + out.value = fmt::format("You said: {}", in.value); + }); + } +}; + TEST_CASE("Simple MajordomoWorker example showing its usage", "[majordomo][majordomoworker][simple_example]") { // We run both broker and worker inproc majordomo::Broker broker("TestBroker", testSettings()); @@ -181,3 +202,53 @@ TEST_CASE("Subscriptions", "[majordomo][majordomoworker][subscription]") { std::ranges::sort(subscriptions); REQUIRE(subscriptions == std::vector{ "/notifications", "/notifications?blue&green&red", "/notifications?green&red", "/notifications?red" }); } + +TEST_CASE("Majordomo timeouts", "[majordomo][majordomoworker][rest]") { + majordomo::Broker broker("TestBroker", testSettings()); + auto fs = cmrc::assets::get_filesystem(); + FileServerRestBackend rest(broker, fs); + RunInThread restServerRun(rest); + + opencmw::query::registerTypes(WaitingContext(), broker); + + WaitingWorker<"waiter"> worker(broker); + + RunInThread brokerRun(broker); + RunInThread workerRun(worker); + + REQUIRE(waitUntilServiceAvailable(broker.context, "waiter")); + + // set timeout to unit-test friendly interval + rest.setMajordomoTimeout(800ms); + + SECTION("Waiting for notification that doesn't happen in time returns 504 message") { + std::vector clientThreads; 
+ for (int i = 0; i < 16; ++i) { + clientThreads.push_back(makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", {"Timeout"}, {504})); + } + clientThreads.clear(); + } + + SECTION("Waiting for notification that happens in time gives expected response") { + auto client = makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", {"This is a notification"}); + std::this_thread::sleep_for(400ms); + worker.notify({}, { "This is a notification" }); + } + + SECTION("Response to request takes too long, timeout status is returned") { + httplib::Client postData{ "http://localhost:8080" }; + auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=1200", "{\"value\": \"Hello!\"}", "application/json"); + REQUIRE(reply); + REQUIRE(reply->status == 504); + REQUIRE(reply->body.find("No response") != std::string::npos); + } + + SECTION("Response to request arrives in time") { + httplib::Client postData{ "http://localhost:8080" }; + auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=0", "{\"value\": \"Hello!\"}", "application/json"); + REQUIRE(reply); + REQUIRE(reply->status == 200); + REQUIRE(reply->body.find("You said: Hello!") != std::string::npos); + } + +}