Skip to content

Commit

Permalink
Do not block worker threads forever if no event is received (#329)
Browse files Browse the repository at this point in the history
Make sure that long-poll request handlers do not block forever when no
corresponding event is received. Otherwise the clients will send one
request after another once their request times out client-side, until
the worker threads are exhausted and the server stops responding.

It would be great if we could also detect the client connection being
dropped using Keep-Alive, but cpp-httplib doesn't seem to have an API for
that.
  • Loading branch information
frankosterfeld committed Dec 1, 2023
1 parent 52770cc commit 188590a
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 22 deletions.
58 changes: 43 additions & 15 deletions src/majordomo/include/majordomo/RestBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ using namespace std::chrono_literals;

// HTTP status codes assigned to REST responses.
constexpr auto HTTP_OK = 200;
constexpr auto HTTP_ERROR = 500;
constexpr auto HTTP_GATEWAY_TIMEOUT = 504;
// Default port and std::chrono durations (see `using namespace std::chrono_literals`).
constexpr auto DEFAULT_REST_PORT = 8080;
constexpr auto REST_POLLING_TIME = 10s;
constexpr auto UPDATER_POLLING_TIME = 1s;
constexpr auto LONG_POLL_SERVER_TIMEOUT = 30s;
constexpr auto UNUSED_SUBSCRIPTION_EXPIRATION_TIME = 30s;
// NOTE(review): presumably the upper bound of replies kept per subscription cache — confirm against its use.
constexpr std::size_t MAX_CACHED_REPLIES = 32;

Expand Down Expand Up @@ -155,8 +156,8 @@ std::string_view acceptedMimeForRequest(const auto &request) {
return acceptableMimeTypes[0];
}

bool respondWithError(auto &response, std::string_view message) {
response.status = HTTP_ERROR;
/**
 * Turns `response` into an error reply: sets its status code and uses `message`
 * as the body, with the MIME::TEXT type name as content type.
 *
 * @param response reply object to fill in (needs a `status` member and `set_content()`)
 * @param message  text sent as the response body
 * @param status   HTTP status code to report; defaults to HTTP_ERROR
 * @return always true
 */
bool respondWithError(auto &response, std::string_view message, int status = HTTP_ERROR) {
    response.status = status;
    // Build the body from the view's full extent: a std::string_view is not guaranteed
    // to be NUL-terminated, so handing over a bare message.data() pointer could read past it.
    response.set_content(std::string{ message }, MIME::TEXT.typeName().data());
    return true;
}
Expand Down Expand Up @@ -287,9 +288,18 @@ struct Connection {
return ReadLock(_cachedRepliesMutex);
}

void waitForUpdate() {
auto temporaryLock = writeLock();
_pollingIndexCV.wait(temporaryLock);
bool waitForUpdate(std::chrono::milliseconds timeout) {
// This could also periodically check for the client connection being dropped (e.g. due to client-side timeout)
// if cpp-httplib had API for that.
auto temporaryLock = writeLock();
const auto next = _nextPollingIndex;
while (_nextPollingIndex == next) {
if (_pollingIndexCV.wait_for(temporaryLock, timeout) == std::cv_status::timeout) {
return false;
}
}

return true;
}

std::size_t cachedRepliesSize(ReadLock & /*lock*/) const {
Expand Down Expand Up @@ -321,9 +331,10 @@ struct Connection {
template<typename Mode, typename VirtualFS, role... Roles>
class RestBackend : public Mode {
protected:
Broker<Roles...> &_broker;
const VirtualFS &_vfs;
URI<> _restAddress;
Broker<Roles...> &_broker;
const VirtualFS &_vfs;
URI<> _restAddress;
std::atomic<std::chrono::milliseconds> _majordomoTimeout = 30000ms;

private:
std::jthread _connectionUpdaterThread;
Expand All @@ -332,6 +343,19 @@ class RestBackend : public Mode {
std::map<detail::SubscriptionInfo, std::unique_ptr<detail::Connection>> _connectionForService;

public:
/**
 * Sets the timeout applied when talking to majordomo workers: how long to
 * wait for a subscription notification (long-polling) and how long to wait
 * for the reply to a Get/Set request.
 */
void setMajordomoTimeout(std::chrono::milliseconds timeout) {
    _majordomoTimeout.store(timeout);
}

/// Returns the timeout currently stored in `_majordomoTimeout`.
std::chrono::milliseconds majordomoTimeout() const {
    return _majordomoTimeout.load();
}

using BrokerType = Broker<Roles...>;
// returns a connection with refcount 1. Make sure you lower it to
// zero at some point
Expand Down Expand Up @@ -711,9 +735,9 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {
}

pollItem.socket = connection.notificationSubscriptionSocket.zmq_ptr;
auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast<std::chrono::milliseconds>(REST_POLLING_TIME).count());
auto pollResult = zmq::invoke(zmq_poll, &pollItem, 1, std::chrono::duration_cast<std::chrono::milliseconds>(restBackend.majordomoTimeout()).count());
if (!pollResult || pollResult.value() == 0) {
detail::respondWithError(response, "Error: No response from broker\n");
detail::respondWithError(response, "Error: No response from broker\n", HTTP_GATEWAY_TIMEOUT);
} else if (auto responseMessage = zmq::receive<mdp::MessageFormat::WithoutSourceId>(connection.notificationSubscriptionSocket); !responseMessage) {
detail::respondWithError(response, "Error: Empty response from broker\n");
} else if (!responseMessage->error.empty()) {
Expand All @@ -740,14 +764,16 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {

auto *connection = restBackend.notificationSubscriptionConnectionFor(subscriptionInfo);
assert(connection);

const auto majordomoTimeout = restBackend.majordomoTimeout();

Check warning on line 767 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L767

Added line #L767 was not covered by tests
response.set_header("Access-Control-Allow-Origin", "*");
response.set_chunked_content_provider(
"application/json",
[connection](std::size_t /*offset*/, httplib::DataSink &sink) mutable {
[connection, majordomoTimeout](std::size_t /*offset*/, httplib::DataSink &sink) mutable {

Check warning on line 771 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L771

Added line #L771 was not covered by tests
std::cerr << "Chunked reply...\n";

connection->waitForUpdate();
if (!connection->waitForUpdate(majordomoTimeout)) {
return false;

Check warning on line 775 in src/majordomo/include/majordomo/RestBackend.hpp

View check run for this annotation

Codecov / codecov/patch

src/majordomo/include/majordomo/RestBackend.hpp#L775

Added line #L775 was not covered by tests
}

auto connectionCacheLock = connection->readLock();
auto lastIndex = connection->nextPollingIndex(connectionCacheLock) - 1;
Expand Down Expand Up @@ -871,7 +897,9 @@ struct RestBackend<Mode, VirtualFS, Roles...>::RestWorker {
// Since we use KeepAlive object, the inital refCount can go away
connection->decreaseReferenceCount();

connection->waitForUpdate();
if (!connection->waitForUpdate(restBackend.majordomoTimeout())) {
return detail::respondWithError(response, "Timeout waiting for update", HTTP_GATEWAY_TIMEOUT);
}

const auto newCache = fetchCache();

Expand Down
85 changes: 78 additions & 7 deletions src/majordomo/test/majordomoworker_rest_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
// Concepts and tests use common types
#include <concepts/majordomo/helpers.hpp>

// Starts a thread that sends one GET for `address` to localhost:DEFAULT_REST_PORT per entry
// of `requiredResponses` and checks every reply: the status has to equal the entry of
// `requiredStatusCodes` at the same index (200 when no entry exists for that index) and
// the body has to contain the required text. `location` is only used to report the call
// site of a failed requirement.
std::jthread makeGetRequestResponseCheckerThread(const std::string &address, const std::vector<std::string> &requiredResponses, [[maybe_unused]] std::source_location location = std::source_location::current()) {
std::jthread makeGetRequestResponseCheckerThread(const std::string &address, const std::vector<std::string> &requiredResponses, const std::vector<int> &requiredStatusCodes = {}, [[maybe_unused]] std::source_location location = std::source_location::current()) {
return std::jthread([=] {
httplib::Client http("localhost", majordomo::DEFAULT_REST_PORT);
http.set_follow_location(true);
http.set_keep_alive(true);
#define requireWithSource(arg) \
if (!(arg)) opencmw::zmq::debug::withLocation(location) << "<- call got a failed requirement:"; \
REQUIRE(arg)
for (const auto &requiredResponse : requiredResponses) {
for (std::size_t i = 0; i < requiredResponses.size(); ++i) {
const auto response = http.Get(address);
requireWithSource(response);
if (response->status != 200) {
fmt::println(std::cerr, "Unexpected error: {}", response->body);
}
requireWithSource(response->status == 200);
requireWithSource(response->body.find(requiredResponse) != std::string::npos);
// Status codes are optional per request: indices without an entry expect 200.
const auto requiredStatusCode = i < requiredStatusCodes.size() ? requiredStatusCodes[i] : 200;
requireWithSource(response->status == requiredStatusCode);
requireWithSource(response->body.find(requiredResponses[i]) != std::string::npos);
}
#undef requireWithSource
});
Expand Down Expand Up @@ -75,6 +73,29 @@ class ColorWorker : public majordomo::Worker<serviceName, ColorContext, majordom
}
};

// Request/reply context of the WaitingWorker test service.
struct WaitingContext {
// Time in milliseconds the worker callback sleeps before it replies.
int32_t timeoutMs = 0;
// Content type of the exchange; JSON unless set otherwise.
opencmw::MIME::MimeType contentType = opencmw::MIME::JSON;
};
ENABLE_REFLECTION_FOR(WaitingContext, timeoutMs, contentType)

/**
 * Test worker whose callback sleeps for the number of milliseconds given in the
 * request context (`timeoutMs`), copies the request context into the reply
 * context, and answers with the input text prefixed by "You said: ".
 */
template<units::basic_fixed_string serviceName, typename... Meta>
class WaitingWorker : public majordomo::Worker<serviceName, WaitingContext, SingleString, SingleString, Meta...> {
public:
    using super_t = majordomo::Worker<serviceName, WaitingContext, SingleString, SingleString, Meta...>;

    template<typename BrokerType>
    explicit WaitingWorker(const BrokerType &broker)
        : super_t(broker, {}) {
        super_t::setCallback([](majordomo::RequestContext & /*rawCtx*/, const WaitingContext &requestCtx, const SingleString &request, WaitingContext &replyCtx, SingleString &reply) {
            fmt::println("Sleep for {}", requestCtx.timeoutMs);
            // Delay the reply by the requested amount before filling it in.
            const std::chrono::milliseconds delay{ requestCtx.timeoutMs };
            std::this_thread::sleep_for(delay);
            replyCtx    = requestCtx;
            reply.value = fmt::format("You said: {}", request.value);
        });
    }
};

TEST_CASE("Simple MajordomoWorker example showing its usage", "[majordomo][majordomoworker][simple_example]") {
// We run both broker and worker inproc
majordomo::Broker broker("TestBroker", testSettings());
Expand Down Expand Up @@ -181,3 +202,53 @@ TEST_CASE("Subscriptions", "[majordomo][majordomoworker][subscription]") {
std::ranges::sort(subscriptions);
REQUIRE(subscriptions == std::vector<std::string>{ "/notifications", "/notifications?blue&green&red", "/notifications?green&red", "/notifications?red" });
}

// Checks the REST backend's majordomo timeout: long-poll and Get/Set requests that are not
// answered within the timeout must yield HTTP 504, while timely answers are delivered normally.
TEST_CASE("Majordomo timeouts", "[majordomo][majordomoworker][rest]") {
majordomo::Broker broker("TestBroker", testSettings());
auto fs = cmrc::assets::get_filesystem();
FileServerRestBackend<majordomo::PLAIN_HTTP, decltype(fs)> rest(broker, fs);
RunInThread restServerRun(rest);

opencmw::query::registerTypes(WaitingContext(), broker);

WaitingWorker<"waiter"> worker(broker);

RunInThread brokerRun(broker);
RunInThread workerRun(worker);

REQUIRE(waitUntilServiceAvailable(broker.context, "waiter"));

// set timeout to unit-test friendly interval
rest.setMajordomoTimeout(800ms);

SECTION("Waiting for notification that doesn't happen in time returns 504 message") {
// 16 concurrent long-poll clients; no notification is sent, so each one expects a 504 reply.
std::vector<std::jthread> clientThreads;
for (int i = 0; i < 16; ++i) {
clientThreads.push_back(makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", {"Timeout"}, {504}));
}
// Destroying the jthreads joins them, i.e. waits until every client has finished its checks.
clientThreads.clear();
}

SECTION("Waiting for notification that happens in time gives expected response") {
auto client = makeGetRequestResponseCheckerThread("/waiter?LongPollingIdx=Next", {"This is a notification"});
// Notify after 400ms, which is within the 800ms timeout set above.
std::this_thread::sleep_for(400ms);
worker.notify({}, { "This is a notification" });
}

SECTION("Response to request takes too long, timeout status is returned") {
// timeoutMs=1200 makes the worker sleep longer than the 800ms timeout set above.
httplib::Client postData{ "http://localhost:8080" };
auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=1200", "{\"value\": \"Hello!\"}", "application/json");
REQUIRE(reply);
REQUIRE(reply->status == 504);
REQUIRE(reply->body.find("No response") != std::string::npos);
}

SECTION("Response to request arrives in time") {
// timeoutMs=0: the worker replies without sleeping.
httplib::Client postData{ "http://localhost:8080" };
auto reply = postData.Post("/waiter?contentType=application%2Fjson&timeoutMs=0", "{\"value\": \"Hello!\"}", "application/json");
REQUIRE(reply);
REQUIRE(reply->status == 200);
REQUIRE(reply->body.find("You said: Hello!") != std::string::npos);
}

Check notice on line 253 in src/majordomo/test/majordomoworker_rest_tests.cpp

View check run for this annotation

codefactor.io / CodeFactor

src/majordomo/test/majordomoworker_rest_tests.cpp#L253

Redundant blank line at the end of a code block should be deleted. (whitespace/blank_line)
}

0 comments on commit 188590a

Please sign in to comment.