Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffered Reads #60

Merged
merged 6 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ cmake_minimum_required( VERSION 3.13 )

project( xrootd-http/s3 )

option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the scitokens-cpp unit tests" OFF )
option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the xrootd-s3-http unit tests" OFF )
option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF )
option( VALGRIND "Run select unit tests under valgrind" OFF )
option( ASAN "Build the plugin with the address sanitizer" OFF )

set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake )
set( CMAKE_BUILD_TYPE Debug )
Expand All @@ -25,6 +26,12 @@ if(VALGRIND)
find_program(VALGRIND_BIN valgrind REQUIRED)
endif()

if(ASAN)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address")
set(CMAKE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -fsanitize=address")
endif()

macro(use_cxx17)
if (CMAKE_VERSION VERSION_LESS "3.1")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
Expand Down
6 changes: 3 additions & 3 deletions src/CurlUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ HTTPRequest *HandlerQueue::TryConsume() {
void CurlWorker::RunStatic(CurlWorker *myself) {
try {
myself->Run();
} catch (...) {
myself->m_logger.Log(LogMask::Debug, "CurlWorker::RunStatic",
"Curl worker got an exception");
} catch (std::exception &exc) {
myself->m_logger.Log(LogMask::Error, "CurlWorker::RunStatic",
"Curl worker got an exception:", exc.what());
}
}

Expand Down
110 changes: 91 additions & 19 deletions src/HTTPCommands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ std::vector<CurlWorker *> HTTPRequest::m_workers;
std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration =
std::chrono::seconds(10);

namespace {

//
// "This function gets called by libcurl as soon as there is data received
// that needs to be saved. The size of the data pointed to by ptr is size
Expand All @@ -59,20 +57,56 @@ namespace {
// We also make extensive use of this function in the XML parsing code,
// for pretty much exactly the same reason.
//
size_t appendToString(const void *ptr, size_t size, size_t nmemb, void *str) {
size_t HTTPRequest::handleResults(const void *ptr, size_t size, size_t nmemb,
void *me_ptr) {
if (size == 0 || nmemb == 0) {
return 0;
}

std::string source((const char *)ptr, size * nmemb);
std::string *ssptr = (std::string *)str;
ssptr->append(source);

auto me = reinterpret_cast<HTTPRequest *>(me_ptr);
if (!me) {
return 0;
}
std::string_view source(static_cast<const char *>(ptr), size * nmemb);

// std::cout << "Handling results with size " << (size * nmemb) << " and
// HTTP verb " << me->httpVerb << "\n";
if (me->httpVerb == "GET") {
if (!me->responseCode) {
auto rv = curl_easy_getinfo(
me->m_curl_handle, CURLINFO_RESPONSE_CODE, &(me->responseCode));
if (rv != CURLE_OK) {
me->errorCode = "E_CURL_LIB";
me->errorMessage = "curl_easy_getinfo() failed.";
return 0;
}
}
if (me->getResponseCode() == me->expectedResponseCode &&
me->requestResult() != nullptr) {
if (!me->m_result_buffer_initialized) {
me->m_result_buffer_initialized = true;
me->m_result_buffer = *me->requestResult();
// std::cout << "Handling data for GET with response code " <<
// me->responseCode << "and expected response size " <<
// me->m_result.size() << "\n";
}
if (me->m_result_buffer.size() < source.size()) {
me->errorCode = "E_CURL_LIB";
me->errorMessage = "Curl had response with too-long result.";
return 0;
}
memcpy(const_cast<char *>(me->m_result_buffer.data()),
source.data(), source.size());
me->m_result_buffer = me->m_result_buffer.substr(source.size());
} else {
me->m_result.append(source);
}
} else {
me->m_result.append(source);
}
return (size * nmemb);
}

} // namespace

HTTPRequest::~HTTPRequest() {}

#define SET_CURL_SECURITY_OPTION(A, B, C) \
Expand Down Expand Up @@ -268,9 +302,26 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) {
return request;
}

bool HTTPRequest::sendPreparedRequest(const std::string &uri,
const std::string_view payload,
off_t payload_size, bool final) {
int HTTPRequest::XferInfoCallback(void *clientp, curl_off_t dltotal,
curl_off_t /*dlnow*/, curl_off_t ultotal,
curl_off_t /*ulnow*/) {
auto me = reinterpret_cast<HTTPRequest *>(clientp);
if ((me->m_bytes_recv != dltotal) || (me->m_bytes_sent != ultotal)) {
me->m_last_movement = std::chrono::steady_clock::now();
} else if (std::chrono::steady_clock::now() - me->m_last_movement >
m_transfer_stall) {
me->errorCode = "E_TIMEOUT";
me->errorMessage = "I/O stall during transfer";
return 1;
}
me->m_bytes_recv = dltotal;
me->m_bytes_sent = ultotal;
return 0;
}
bool HTTPRequest::sendPreparedRequestNonblocking(const std::string &uri,
const std::string_view payload,
off_t payload_size,
bool final) {
m_uri = uri;
m_payload = payload;
m_payload_size = payload_size;
Expand Down Expand Up @@ -306,6 +357,15 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri,
} else {
m_queue->Produce(this);
}
return true;
}

bool HTTPRequest::sendPreparedRequest(const std::string &uri,
const std::string_view payload,
off_t payload_size, bool final) {
if (!sendPreparedRequestNonblocking(uri, payload, payload_size, final)) {
return false;
}
std::unique_lock<std::mutex> lk(m_mtx);
m_cv.wait(lk, [&] { return m_result_ready; });

Expand Down Expand Up @@ -347,11 +407,11 @@ bool HTTPRequest::ReleaseHandle(CURL *curl) {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr);
curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, nullptr);
curl_easy_setopt(curl, CURLOPT_DEBUGDATA, nullptr);
curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L);
curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
curl_easy_setopt(curl, CURLOPT_POST, 0);
Expand Down Expand Up @@ -486,25 +546,37 @@ bool HTTPRequest::SetupHandle(CURL *curl) {
}
}

rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &appendToString);
rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &handleResults);
if (rv != CURLE_OK) {
this->errorCode = "E_CURL_LIB";
this->errorMessage =
"curl_easy_setopt( CURLOPT_WRITEFUNCTION ) failed.";
return false;
}

rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, &m_result);
rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
if (rv != CURLE_OK) {
this->errorCode = "E_CURL_LIB";
this->errorMessage = "curl_easy_setopt( CURLOPT_WRITEDATA ) failed.";
return false;
}

if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1) != CURLE_OK) {
this->errorCode = "E_CURL_LIB";
this->errorMessage =
"curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed.";
errorCode = "E_CURL_LIB";
errorMessage = "curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed.";
return false;
}

if (curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION,
HTTPRequest::XferInfoCallback) != CURLE_OK) {
errorCode = "E_CURL_LIB";
errorMessage = "Failed to set the transfer info callback function.";
return false;
}

if (curl_easy_setopt(curl, CURLOPT_XFERINFODATA, this) != CURLE_OK) {
errorCode = "E_CURL_LIB";
errorMessage = "Failed to set the transfer info callback data.";
return false;
}

Expand Down
56 changes: 56 additions & 0 deletions src/HTTPCommands.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,36 @@ class HTTPRequest {
const std::string_view payload, off_t payload_size,
bool final);

// Send the request to the HTTP server.
// Returns immediately, not waiting for the result.
//
// If `final` is set to `false`, the HTTPRequest object will start streaming
// a request and assume that `sendPreparedRequest` will be repeated until
// all data is provided (the sum total of the chunks given is the
// payload_size). If payload_size is 0 and final is false, this indicates
// the complete size of the PUT is unknown and chunked encoding will be
// used.
//
// - url: URL, including query parameters, to use.
// - payload: The payload contents when uploading.
// - payload_size: Size of the entire payload (not just the current chunk).
// - final: True if this is the last or only payload for the request. False
// otherwise.
bool sendPreparedRequestNonblocking(const std::string &uri,
const std::string_view payload,
off_t payload_size, bool final);

// Called by the curl handler thread that the request has been finished.
virtual void Notify();

// Returns the standalone buffer if a sub-classe's externally-managed one
// is supposed to be used.
//
// If the std::string_view is empty, then it's assumed the HTTPRequest
// itself owns the result buffer and should create one. Note that,
// on errors, the HTTPRequest result buffer is still used.
virtual std::string_view *requestResult() { return nullptr; }

const std::string &getProtocol() { return m_protocol; }

// Returns true if the command is a streaming/partial request.
Expand Down Expand Up @@ -147,6 +174,9 @@ class HTTPRequest {
std::string errorMessage;
std::string errorCode;

// The contents of the result from the HTTP server.
// If this is a GET and we got the expectedResponseCode, then
// the results are populated in the m_result_buffer instead.
std::string m_result;
unsigned long responseCode{0};
unsigned long expectedResponseCode = 200;
Expand Down Expand Up @@ -184,6 +214,15 @@ class HTTPRequest {
// buffer.
static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v);

// Handle the callback from libcurl
static size_t handleResults(const void *ptr, size_t size, size_t nmemb,
void *me_ptr);

// Transfer information callback from libcurl
static int XferInfoCallback(void *clientp, curl_off_t dltotal,
curl_off_t dlnow, curl_off_t ultotal,
curl_off_t ulnow);

const TokenFile *m_token{nullptr};

// The following members manage the work queue and workers.
Expand All @@ -209,10 +248,27 @@ class HTTPRequest {
false}; // Flag indicating this command is a streaming request.
bool m_timeout{false}; // Flag indicating the request has timed out.
bool m_result_ready{false}; // Flag indicating the results data is ready.
bool m_result_buffer_initialized{
false}; // Flag indicating whether the result buffer view has been
// initialized.
off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown.
std::string m_protocol;
std::string m_uri; // URL to request from libcurl
std::string_view m_payload;

// Total number of bytes received from the server
off_t m_bytes_recv{0};
// Total number of bytes sent to server
off_t m_bytes_sent{0};
// Time of last data movement (upload or download). Used to detect transfer
// stalls
std::chrono::steady_clock::time_point m_last_movement;
// Transfer stall timeout
static constexpr std::chrono::steady_clock::duration m_transfer_stall{
std::chrono::seconds(10)};

// The contents of a successful GET request.
std::string_view m_result_buffer;
CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request
char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl
unsigned m_retry_count{0};
Expand Down
2 changes: 1 addition & 1 deletion src/HTTPFile.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ int HTTPFile::Fstat(struct stat *buff) {
size_t next_newline = std::string::npos;
size_t last_character = headers.size();
while (current_newline != std::string::npos &&
current_newline != last_character - 1) {
current_newline != last_character - 1 && last_character) {
next_newline = headers.find("\r\n", current_newline + 2);
std::string line =
substring(headers, current_newline + 2, next_newline);
Expand Down
Loading
Loading