Skip to content

Commit

Permalink
Merge pull request #60 from bbockelm/buffered_reads_v1
Browse files Browse the repository at this point in the history
Add buffering and read-ahead to the read workflow.
  • Loading branch information
bbockelm authored Dec 14, 2024
2 parents ab22c54 + 74656d0 commit 21b3e77
Show file tree
Hide file tree
Showing 10 changed files with 1,049 additions and 69 deletions.
9 changes: 8 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ cmake_minimum_required( VERSION 3.13 )

project( xrootd-http/s3 )

option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the scitokens-cpp unit tests" OFF )
option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the xrootd-s3-http unit tests" OFF )
option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF )
option( VALGRIND "Run select unit tests under valgrind" OFF )
option( ASAN "Build the plugin with the address sanitizer" OFF )

set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake )
set( CMAKE_BUILD_TYPE Debug )
Expand All @@ -25,6 +26,12 @@ if(VALGRIND)
find_program(VALGRIND_BIN valgrind REQUIRED)
endif()

# Optional AddressSanitizer build: instrument the C and C++ sources and pass
# the sanitizer flag at link time for every kind of target.
if(ASAN)
  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
  set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address")
  # CMAKE_LINKER_FLAGS is not a variable CMake consults; link flags are kept
  # per target type (executables, shared libraries, loadable modules).
  set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
  set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -fsanitize=address")
  set(CMAKE_MODULE_LINKER_FLAGS "${CMAKE_MODULE_LINKER_FLAGS} -fsanitize=address")
endif()

macro(use_cxx17)
if (CMAKE_VERSION VERSION_LESS "3.1")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
Expand Down
6 changes: 3 additions & 3 deletions src/CurlUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ HTTPRequest *HandlerQueue::TryConsume() {
// Static trampoline that invokes `myself->Run()` and logs any exception
// instead of letting it propagate to the caller.
// NOTE(review): presumably this is the worker thread's entry point, in which
// case an escaping exception would terminate the process -- confirm at the
// call site.
void CurlWorker::RunStatic(CurlWorker *myself) {
    try {
        myself->Run();
    } catch (const std::exception &exc) {
        // Standard exceptions carry a message worth logging.
        myself->m_logger.Log(LogMask::Error, "CurlWorker::RunStatic",
                             "Curl worker got an exception:", exc.what());
    } catch (...) {
        // The catch-all must be the last handler; it covers anything that
        // does not derive from std::exception.
        myself->m_logger.Log(LogMask::Error, "CurlWorker::RunStatic",
                             "Curl worker got an exception");
    }
}

Expand Down
110 changes: 91 additions & 19 deletions src/HTTPCommands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ std::vector<CurlWorker *> HTTPRequest::m_workers;
std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration =
std::chrono::seconds(10);

namespace {

//
// "This function gets called by libcurl as soon as there is data received
// that needs to be saved. The size of the data pointed to by ptr is size
Expand All @@ -59,20 +57,56 @@ namespace {
// We also make extensive use of this function in the XML parsing code,
// for pretty much exactly the same reason.
//
size_t appendToString(const void *ptr, size_t size, size_t nmemb, void *str) {
size_t HTTPRequest::handleResults(const void *ptr, size_t size, size_t nmemb,
void *me_ptr) {
if (size == 0 || nmemb == 0) {
return 0;
}

std::string source((const char *)ptr, size * nmemb);
std::string *ssptr = (std::string *)str;
ssptr->append(source);

auto me = reinterpret_cast<HTTPRequest *>(me_ptr);
if (!me) {
return 0;
}
std::string_view source(static_cast<const char *>(ptr), size * nmemb);

// std::cout << "Handling results with size " << (size * nmemb) << " and
// HTTP verb " << me->httpVerb << "\n";
if (me->httpVerb == "GET") {
if (!me->responseCode) {
auto rv = curl_easy_getinfo(
me->m_curl_handle, CURLINFO_RESPONSE_CODE, &(me->responseCode));
if (rv != CURLE_OK) {
me->errorCode = "E_CURL_LIB";
me->errorMessage = "curl_easy_getinfo() failed.";
return 0;
}
}
if (me->getResponseCode() == me->expectedResponseCode &&
me->requestResult() != nullptr) {
if (!me->m_result_buffer_initialized) {
me->m_result_buffer_initialized = true;
me->m_result_buffer = *me->requestResult();
// std::cout << "Handling data for GET with response code " <<
// me->responseCode << "and expected response size " <<
// me->m_result.size() << "\n";
}
if (me->m_result_buffer.size() < source.size()) {
me->errorCode = "E_CURL_LIB";
me->errorMessage = "Curl had response with too-long result.";
return 0;
}
memcpy(const_cast<char *>(me->m_result_buffer.data()),
source.data(), source.size());
me->m_result_buffer = me->m_result_buffer.substr(source.size());
} else {
me->m_result.append(source);
}
} else {
me->m_result.append(source);
}
return (size * nmemb);
}

} // namespace

HTTPRequest::~HTTPRequest() {}

#define SET_CURL_SECURITY_OPTION(A, B, C) \
Expand Down Expand Up @@ -268,9 +302,26 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) {
return request;
}

bool HTTPRequest::sendPreparedRequest(const std::string &uri,
const std::string_view payload,
off_t payload_size, bool final) {
// libcurl progress callback (installed via CURLOPT_XFERINFOFUNCTION);
// `clientp` is the HTTPRequest registered through CURLOPT_XFERINFODATA.
//
// Detects stalled transfers: if neither the downloaded nor the uploaded byte
// count has changed for longer than m_transfer_stall, the request is marked
// with E_TIMEOUT and the transfer is aborted.
//
// - dlnow / ulnow: bytes transferred so far. These are the values that show
//   progress. dltotal / ultotal are the *expected* totals, which stay
//   constant during a transfer, so they are deliberately ignored.
//
// Returns 0 to let the transfer continue, non-zero to make libcurl abort it.
int HTTPRequest::XferInfoCallback(void *clientp, curl_off_t /*dltotal*/,
                                  curl_off_t dlnow, curl_off_t /*ultotal*/,
                                  curl_off_t ulnow) {
    auto me = static_cast<HTTPRequest *>(clientp);
    if (!me) {
        return 0;
    }

    const auto now = std::chrono::steady_clock::now();
    // A default-constructed time_point means the stall clock has not been
    // started yet; start it here so the first callback cannot report a
    // stall measured from the clock's epoch.
    if (me->m_last_movement == std::chrono::steady_clock::time_point{}) {
        me->m_last_movement = now;
    }

    const bool moved =
        (me->m_bytes_recv != dlnow) || (me->m_bytes_sent != ulnow);
    if (moved) {
        me->m_last_movement = now;
    } else if (now - me->m_last_movement > m_transfer_stall) {
        me->errorCode = "E_TIMEOUT";
        me->errorMessage = "I/O stall during transfer";
        return 1;
    }
    me->m_bytes_recv = dlnow;
    me->m_bytes_sent = ulnow;
    return 0;
}
bool HTTPRequest::sendPreparedRequestNonblocking(const std::string &uri,
const std::string_view payload,
off_t payload_size,
bool final) {
m_uri = uri;
m_payload = payload;
m_payload_size = payload_size;
Expand Down Expand Up @@ -306,6 +357,15 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri,
} else {
m_queue->Produce(this);
}
return true;
}

bool HTTPRequest::sendPreparedRequest(const std::string &uri,
const std::string_view payload,
off_t payload_size, bool final) {
if (!sendPreparedRequestNonblocking(uri, payload, payload_size, final)) {
return false;
}
std::unique_lock<std::mutex> lk(m_mtx);
m_cv.wait(lk, [&] { return m_result_ready; });

Expand Down Expand Up @@ -347,11 +407,11 @@ bool HTTPRequest::ReleaseHandle(CURL *curl) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr);
curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_DEBUGDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
curl_easy_setopt(curl, CURLOPT_POST, 0);
Expand Down Expand Up @@ -486,25 +546,37 @@ bool HTTPRequest::SetupHandle(CURL *curl) {
}
}

rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &appendToString);
rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &handleResults);
if (rv != CURLE_OK) {
this->errorCode = "E_CURL_LIB";
this->errorMessage =
"curl_easy_setopt( CURLOPT_WRITEFUNCTION ) failed.";
return false;
}

rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, &m_result);
rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
if (rv != CURLE_OK) {
this->errorCode = "E_CURL_LIB";
this->errorMessage = "curl_easy_setopt( CURLOPT_WRITEDATA ) failed.";
return false;
}

if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1) != CURLE_OK) {
this->errorCode = "E_CURL_LIB";
this->errorMessage =
"curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed.";
errorCode = "E_CURL_LIB";
errorMessage = "curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed.";
return false;
}

if (curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION,
HTTPRequest::XferInfoCallback) != CURLE_OK) {
errorCode = "E_CURL_LIB";
errorMessage = "Failed to set the transfer info callback function.";
return false;
}

if (curl_easy_setopt(curl, CURLOPT_XFERINFODATA, this) != CURLE_OK) {
errorCode = "E_CURL_LIB";
errorMessage = "Failed to set the transfer info callback data.";
return false;
}

Expand Down
56 changes: 56 additions & 0 deletions src/HTTPCommands.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,36 @@ class HTTPRequest {
const std::string_view payload, off_t payload_size,
bool final);

// Send the request to the HTTP server.
// Returns immediately, not waiting for the result.
//
// If `final` is set to `false`, the HTTPRequest object will start streaming
// a request and assume that `sendPreparedRequest` will be repeated until
// all data is provided (the sum total of the chunks given is the
// payload_size). If payload_size is 0 and final is false, this indicates
// the complete size of the PUT is unknown and chunked encoding will be
// used.
//
// - uri: URL, including query parameters, to use.
// - payload: The payload contents when uploading.
// - payload_size: Size of the entire payload (not just the current chunk).
// - final: True if this is the last or only payload for the request. False
// otherwise.
bool sendPreparedRequestNonblocking(const std::string &uri,
const std::string_view payload,
off_t payload_size, bool final);

// Called by the curl handler thread to signal that the request has finished.
virtual void Notify();

// Returns the standalone buffer if a subclass's externally-managed one
// is supposed to be used.
//
// If nullptr is returned (the default), then it's assumed the HTTPRequest
// itself owns the result buffer and should create one. Note that,
// on errors, the HTTPRequest result buffer is still used.
virtual std::string_view *requestResult() { return nullptr; }

const std::string &getProtocol() { return m_protocol; }

// Returns true if the command is a streaming/partial request.
Expand Down Expand Up @@ -147,6 +174,9 @@ class HTTPRequest {
std::string errorMessage;
std::string errorCode;

// The contents of the result from the HTTP server.
// If this is a GET and we got the expectedResponseCode, then
// the results are populated in the m_result_buffer instead.
std::string m_result;
unsigned long responseCode{0};
unsigned long expectedResponseCode = 200;
Expand Down Expand Up @@ -184,6 +214,15 @@ class HTTPRequest {
// buffer.
static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v);

// Handle the callback from libcurl
static size_t handleResults(const void *ptr, size_t size, size_t nmemb,
void *me_ptr);

// Transfer information callback from libcurl
static int XferInfoCallback(void *clientp, curl_off_t dltotal,
curl_off_t dlnow, curl_off_t ultotal,
curl_off_t ulnow);

const TokenFile *m_token{nullptr};

// The following members manage the work queue and workers.
Expand All @@ -209,10 +248,27 @@ class HTTPRequest {
false}; // Flag indicating this command is a streaming request.
bool m_timeout{false}; // Flag indicating the request has timed out.
bool m_result_ready{false}; // Flag indicating the results data is ready.
bool m_result_buffer_initialized{
false}; // Flag indicating whether the result buffer view has been
// initialized.
off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown.
std::string m_protocol;
std::string m_uri; // URL to request from libcurl
std::string_view m_payload;

// Total number of bytes received from the server
off_t m_bytes_recv{0};
// Total number of bytes sent to server
off_t m_bytes_sent{0};
// Time of last data movement (upload or download). Used to detect transfer
// stalls
std::chrono::steady_clock::time_point m_last_movement;
// Transfer stall timeout
static constexpr std::chrono::steady_clock::duration m_transfer_stall{
std::chrono::seconds(10)};

// The contents of a successful GET request.
std::string_view m_result_buffer;
CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request
char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl
unsigned m_retry_count{0};
Expand Down
2 changes: 1 addition & 1 deletion src/HTTPFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ int HTTPFile::Fstat(struct stat *buff) {
size_t next_newline = std::string::npos;
size_t last_character = headers.size();
while (current_newline != std::string::npos &&
current_newline != last_character - 1) {
current_newline != last_character - 1 && last_character) {
next_newline = headers.find("\r\n", current_newline + 2);
std::string line =
substring(headers, current_newline + 2, next_newline);
Expand Down
Loading

0 comments on commit 21b3e77

Please sign in to comment.