Skip to content

Commit

Permalink
Extended qhttp::Server API with the body stream read option
Browse files Browse the repository at this point in the history
Refactored request handling flow of qhttp
Added unit tests for the new API.
  • Loading branch information
iagaponenko committed Jun 30, 2024
1 parent d92777c commit 27ea6c0
Show file tree
Hide file tree
Showing 6 changed files with 384 additions and 111 deletions.
2 changes: 1 addition & 1 deletion src/qhttp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ target_link_libraries(testqhttp PUBLIC
Threads::Threads
)

add_test(NAME testqhttp COMMAND testqhttp -- --data=${CMAKE_CURRENT_SOURCE_DIR}/testdata/ --retries=2 --retry-delay=1 --threads=2)
add_test(NAME testqhttp COMMAND testqhttp -- --data=${CMAKE_CURRENT_SOURCE_DIR}/testdata/ --retries=2 --retry-delay=1 --threads=2 --client-threads=4)
73 changes: 64 additions & 9 deletions src/qhttp/Request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "qhttp/Request.h"

// System headers
#include <algorithm>
#include <cstdlib>
#include <utility>

Expand All @@ -34,6 +35,7 @@
#include "lsst/log/Log.h"
#include "qhttp/LogHelpers.h"

namespace asio = boost::asio;
namespace ip = boost::asio::ip;

namespace {
Expand All @@ -42,14 +44,23 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.qhttp");

namespace lsst::qserv::qhttp {

Request::Request(std::shared_ptr<Server> const server, std::shared_ptr<ip::tcp::socket> const socket)
: content(&_requestbuf), _server(std::move(server)), _socket(std::move(socket)) {
std::size_t const Request::defaultRecordSizeBytes = 1024 * 1024;

Request::Request(std::shared_ptr<Server> const server, std::shared_ptr<Response> const response,
std::shared_ptr<ip::tcp::socket> const socket)
: content(&_requestbuf),
_server(std::move(server)),
_response(std::move(response)),
_socket(std::move(socket)) {
boost::system::error_code ignore;
localAddr = _socket->local_endpoint(ignore);
remoteAddr = _socket->remote_endpoint(ignore);
}

bool Request::_parseHeader() {
bool Request::_parseHeader(std::size_t headerSizeBytes) {
// Note: a value of the attribute must be computed before parsing the header.
_contentReadBytes = _requestbuf.size() - headerSizeBytes;

std::string line;
if (!getline(content, line)) return false;

Expand Down Expand Up @@ -88,7 +99,16 @@ bool Request::_parseHeader() {
}
header[headerMatch[1]] = headerMatch[2];
}

if (header.count("Content-Length") > 0) {
try {
_contentLengthBytes = stoull(header["Content-Length"]);
} catch (std::exception const& e) {
LOGLS_WARN(_log, logger(_server)
<< logger(_socket) << "rejecting request with bad Content-Length: "
<< ctrlquote(header["Content-Length"]));
return false;
}
}
return true;
}

Expand Down Expand Up @@ -123,11 +143,6 @@ bool Request::_parseUri() {
return true;
}

bool Request::_parseBody() {
// TODO: implement application/x-www-form-urlencoded body -> body
return true;
}

std::string Request::_percentDecode(std::string const& encoded, bool exceptPathDelimeters, bool& hasNULs) {
std::string decoded;
hasNULs = false;
Expand Down Expand Up @@ -163,4 +178,44 @@ std::string Request::_percentDecode(std::string const& encoded, bool exceptPathD
return decoded;
}

void Request::readEntireBodyAsync(BodyReadCallback onFinished) {
std::size_t const bytesToRead = _contentLengthBytes - _contentReadBytes;
_readBodyAsync(_server->_startTimer(_socket), bytesToRead, onFinished);
}

void Request::readPartialBodyAsync(BodyReadCallback onFinished, size_t numBytes) {
std::size_t const bytesToRead =
std::min(_contentLengthBytes - _contentReadBytes, numBytes == 0 ? _recordSizeBytes : numBytes);
_readBodyAsync(_server->_startTimer(_socket), bytesToRead, onFinished);
}

void Request::_readBodyAsync(std::shared_ptr<asio::steady_timer> timer_, std::size_t bytesToRead,
BodyReadCallback onFinished) {
auto timer = std::move(timer_);
auto self = shared_from_this();
if (bytesToRead > 0) {
_contentReadBytes += bytesToRead;
LOGLS_INFO(_log, logger(_server) << logger(_socket) << method << " " << target << " " << version
<< " + " << bytesToRead << " bytes");
asio::async_read(*_socket, _requestbuf, asio::transfer_exactly(bytesToRead),
[self, timer, onFinished](boost::system::error_code const& ec, size_t bytesRead) {
timer->cancel();
if (ec == asio::error::operation_aborted) {
LOGLS_ERROR(_log, logger(self->_server) << logger(self->_socket)
<< "request body read canceled");
} else if (ec) {
LOGLS_ERROR(_log, logger(self->_server)
<< logger(self->_socket)
<< "request body read failed: " << ec.message());
}
onFinished(self, self->_response, !ec, bytesRead);
self->content.clear();
});
} else {
LOGLS_INFO(_log, logger(_server) << logger(_socket) << method << " " << target << " " << version);
timer->cancel();
onFinished(self, _response, true, 0);
}
}

} // namespace lsst::qserv::qhttp
71 changes: 66 additions & 5 deletions src/qhttp/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#define LSST_QSERV_QHTTP_REQUEST_H

// System headers
#include <functional>
#include <iostream>
#include <memory>
#include <string>
Expand All @@ -39,10 +40,18 @@
namespace lsst::qserv::qhttp {

class Server;
class Response;

class Request : public std::enable_shared_from_this<Request> {
public:
using Ptr = std::shared_ptr<Request>;
using BodyReadCallback =
std::function<void(std::shared_ptr<Request>, std::shared_ptr<Response>, bool, std::size_t)>;

// ----- The default size of the body read requests in stream mode (when
// calling method readPartialBodyAsync()).

static std::size_t const defaultRecordSizeBytes;

//----- The local address on which this request was accepted

Expand Down Expand Up @@ -70,27 +79,79 @@ class Request : public std::enable_shared_from_this<Request> {

//----- Body content for this request

std::istream content; // unparsed body
std::unordered_map<std::string, std::string> body; // parsed body, if x-www-form-urlencoded
std::istream content; // unparsed body

//----- Read the body of the request asynchronously. The onFinished callback will be called
// with a boolean argument that is true if the body was read successfully.
// The length of the body is always available to the request handler via contentLengthBytes().
// The actual number of bytes read so far is available via contentReadBytes(). This parameter
// is set if the body is read automatically or manually by calling method readEntireBodyAsync().
// Any differene between contentLengthBytes() and contentReadBytes() indicates that the body
// is not fully read yet.

void readEntireBodyAsync(BodyReadCallback onFinished);

std::size_t contentLengthBytes() const { return _contentLengthBytes; }
std::size_t contentReadBytes() const { return _contentReadBytes; }

//----- Read the next chunk of the body of the request asynchronously. The onFinished callback
// will be called with a boolean argument that is true if the body was read successfully.
// The caller is responsible for calling this method repeatedly until the body is fully read
// which is indicated by comparing contentLengthBytes() and contentReadBytes().
// The number of bytes read in each chunk is determined by the value of the optional
// parameter numBytes. If the value is 0, the size of the chunk is determined by the value
// of the recordSizeBytes() method.

void readPartialBodyAsync(BodyReadCallback onFinished, std::size_t numBytes = 0);

//----- Get/set the size of the body read requests in stream mode.
// The default value of 0 will reset the current setting to the implementation default.

std::size_t recordSizeBytes() const { return _recordSizeBytes; }
void setRecordSize(std::size_t bytes = 0) {
_recordSizeBytes = bytes == 0 ? defaultRecordSizeBytes : bytes;
}

private:
friend class Server;

Request(Request const&) = delete;
Request& operator=(Request const&) = delete;

explicit Request(std::shared_ptr<Server> const server,
explicit Request(std::shared_ptr<Server> const server, std::shared_ptr<Response> const response,
std::shared_ptr<boost::asio::ip::tcp::socket> const socket);

bool _parseHeader();
bool _parseHeader(std::size_t headerSizeBytes);
bool _parseUri();
bool _parseBody();

std::string _percentDecode(std::string const& encoded, bool exceptPathDelimeters, bool& hasNULs);

//----- Read the specified number of bytes of the request's body asynchronously. The onFinished
// callback will be called with a boolean argument that is true if the body was read successfully.

void _readBodyAsync(std::shared_ptr<boost::asio::steady_timer> timer_, std::size_t bytesToRead,
BodyReadCallback onFinished);

std::shared_ptr<Server> const _server;
std::shared_ptr<Response> const _response;
std::shared_ptr<boost::asio::ip::tcp::socket> const _socket;
boost::asio::streambuf _requestbuf;

// ----- The current size of the body read requests in stream mode (when calling method
// readPartialBodyAsync()). The size can be changed by calling method setRecordSize().

std::size_t _recordSizeBytes = defaultRecordSizeBytes;

// ----- Content-Length header value, if present is set by _parseHeader() and used to
// read the request's body.

std::size_t _contentLengthBytes = 0;

// ----- The number of content bytes read so far. It is used to determine when the body
// has been fully read and to prevent reading beyond the end of the body.
// The value is set by _parseHeader() and considerd when reading the request's body.

std::size_t _contentReadBytes = 0;
};

} // namespace lsst::qserv::qhttp
Expand Down
Loading

0 comments on commit 27ea6c0

Please sign in to comment.