From 99b50e8b3cde376595814e6113294dba0ac512ff Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 14 Dec 2024 14:13:37 -0600 Subject: [PATCH 1/6] Add buffering + prefetch scheme to S3 reads This keeps a 2-entry cache inside the S3File object. These cache entries round up small reads and allow prefetching in the case of linear reads. The goal is to establish pipelining when doing linear reads, allowing data to be read to the S3File cache entry before it is requested. --- src/HTTPCommands.cc | 110 +++++++-- src/HTTPCommands.hh | 56 +++++ src/S3Commands.cc | 45 ++-- src/S3Commands.hh | 47 +++- src/S3File.cc | 553 ++++++++++++++++++++++++++++++++++++++++-- src/S3File.hh | 110 +++++++++ test/s3_unit_tests.cc | 174 +++++++++++++ 7 files changed, 1032 insertions(+), 63 deletions(-) diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index 9cdf2b5..7e24305 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -46,8 +46,6 @@ std::vector HTTPRequest::m_workers; std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration = std::chrono::seconds(10); -namespace { - // // "This function gets called by libcurl as soon as there is data received // that needs to be saved. The size of the data pointed to by ptr is size @@ -59,20 +57,56 @@ namespace { // We also make extensive use of this function in the XML parsing code, // for pretty much exactly the same reason. 
// -size_t appendToString(const void *ptr, size_t size, size_t nmemb, void *str) { +size_t HTTPRequest::handleResults(const void *ptr, size_t size, size_t nmemb, + void *me_ptr) { if (size == 0 || nmemb == 0) { return 0; } - std::string source((const char *)ptr, size * nmemb); - std::string *ssptr = (std::string *)str; - ssptr->append(source); - + auto me = reinterpret_cast(me_ptr); + if (!me) { + return 0; + } + std::string_view source(static_cast(ptr), size * nmemb); + + // std::cout << "Handling results with size " << (size * nmemb) << " and + // HTTP verb " << me->httpVerb << "\n"; + if (me->httpVerb == "GET") { + if (!me->responseCode) { + auto rv = curl_easy_getinfo( + me->m_curl_handle, CURLINFO_RESPONSE_CODE, &(me->responseCode)); + if (rv != CURLE_OK) { + me->errorCode = "E_CURL_LIB"; + me->errorMessage = "curl_easy_getinfo() failed."; + return 0; + } + } + if (me->getResponseCode() == me->expectedResponseCode && + me->requestResult() != nullptr) { + if (!me->m_result_buffer_initialized) { + me->m_result_buffer_initialized = true; + me->m_result_buffer = *me->requestResult(); + // std::cout << "Handling data for GET with response code " << + // me->responseCode << "and expected response size " << + // me->m_result.size() << "\n"; + } + if (me->m_result_buffer.size() < source.size()) { + me->errorCode = "E_CURL_LIB"; + me->errorMessage = "Curl had response with too-long result."; + return 0; + } + memcpy(const_cast(me->m_result_buffer.data()), + source.data(), source.size()); + me->m_result_buffer = me->m_result_buffer.substr(source.size()); + } else { + me->m_result.append(source); + } + } else { + me->m_result.append(source); + } return (size * nmemb); } -} // namespace - HTTPRequest::~HTTPRequest() {} #define SET_CURL_SECURITY_OPTION(A, B, C) \ @@ -268,9 +302,26 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) { return request; } -bool HTTPRequest::sendPreparedRequest(const std::string &uri, - const std::string_view 
payload, - off_t payload_size, bool final) { +int HTTPRequest::XferInfoCallback(void *clientp, curl_off_t dltotal, + curl_off_t /*dlnow*/, curl_off_t ultotal, + curl_off_t /*ulnow*/) { + auto me = reinterpret_cast(clientp); + if ((me->m_bytes_recv != dltotal) || (me->m_bytes_sent != ultotal)) { + me->m_last_movement = std::chrono::steady_clock::now(); + } else if (std::chrono::steady_clock::now() - me->m_last_movement > + m_transfer_stall) { + me->errorCode = "E_TIMEOUT"; + me->errorMessage = "I/O stall during transfer"; + return 1; + } + me->m_bytes_recv = dltotal; + me->m_bytes_sent = ultotal; + return 0; +} +bool HTTPRequest::sendPreparedRequestNonblocking(const std::string &uri, + const std::string_view payload, + off_t payload_size, + bool final) { m_uri = uri; m_payload = payload; m_payload_size = payload_size; @@ -306,6 +357,15 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri, } else { m_queue->Produce(this); } + return true; +} + +bool HTTPRequest::sendPreparedRequest(const std::string &uri, + const std::string_view payload, + off_t payload_size, bool final) { + if (!sendPreparedRequestNonblocking(uri, payload, payload_size, final)) { + return false; + } std::unique_lock lk(m_mtx); m_cv.wait(lk, [&] { return m_result_ready; }); @@ -347,11 +407,11 @@ bool HTTPRequest::ReleaseHandle(CURL *curl) { curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, nullptr); curl_easy_setopt(curl, CURLOPT_WRITEDATA, nullptr); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr); - curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, nullptr); - curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, nullptr); + curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, nullptr); curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, nullptr); curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, nullptr); curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, nullptr); + curl_easy_setopt(curl, CURLOPT_DEBUGDATA, nullptr); curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L); curl_easy_setopt(curl, CURLOPT_NOBODY, 0); 
curl_easy_setopt(curl, CURLOPT_POST, 0); @@ -486,7 +546,7 @@ bool HTTPRequest::SetupHandle(CURL *curl) { } } - rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &appendToString); + rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &handleResults); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = @@ -494,7 +554,7 @@ bool HTTPRequest::SetupHandle(CURL *curl) { return false; } - rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, &m_result); + rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_WRITEDATA ) failed."; @@ -502,9 +562,21 @@ bool HTTPRequest::SetupHandle(CURL *curl) { } if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1) != CURLE_OK) { - this->errorCode = "E_CURL_LIB"; - this->errorMessage = - "curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed."; + errorCode = "E_CURL_LIB"; + errorMessage = "curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed."; + return false; + } + + if (curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, + HTTPRequest::XferInfoCallback) != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the transfer info callback function."; + return false; + } + + if (curl_easy_setopt(curl, CURLOPT_XFERINFODATA, this) != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the transfer info callback data."; return false; } diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index 6f8f39d..3e33f64 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -114,9 +114,36 @@ class HTTPRequest { const std::string_view payload, off_t payload_size, bool final); + // Send the request to the HTTP server. + // Returns immediately, not waiting for the result. 
+ // + // If `final` is set to `false`, the HTTPRequest object will start streaming + // a request and assume that `sendPreparedRequest` will be repeated until + // all data is provided (the sum total of the chunks given is the + // payload_size). If payload_size is 0 and final is false, this indicates + // the complete size of the PUT is unknown and chunked encoding will be + // used. + // + // - url: URL, including query parameters, to use. + // - payload: The payload contents when uploading. + // - payload_size: Size of the entire payload (not just the current chunk). + // - final: True if this is the last or only payload for the request. False + // otherwise. + bool sendPreparedRequestNonblocking(const std::string &uri, + const std::string_view payload, + off_t payload_size, bool final); + // Called by the curl handler thread that the request has been finished. virtual void Notify(); + // Returns the standalone buffer if a sub-classe's externally-managed one + // is supposed to be used. + // + // If the std::string_view is empty, then it's assumed the HTTPRequest + // itself owns the result buffer and should create one. Note that, + // on errors, the HTTPRequest result buffer is still used. + virtual std::string_view *requestResult() { return nullptr; } + const std::string &getProtocol() { return m_protocol; } // Returns true if the command is a streaming/partial request. @@ -147,6 +174,9 @@ class HTTPRequest { std::string errorMessage; std::string errorCode; + // The contents of the result from the HTTP server. + // If this is a GET and we got the expectedResponseCode, then + // the results are populated in the m_result_buffer instead. std::string m_result; unsigned long responseCode{0}; unsigned long expectedResponseCode = 200; @@ -184,6 +214,15 @@ class HTTPRequest { // buffer. 
static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v); + // Handle the callback from libcurl + static size_t handleResults(const void *ptr, size_t size, size_t nmemb, + void *me_ptr); + + // Transfer information callback from libcurl + static int XferInfoCallback(void *clientp, curl_off_t dltotal, + curl_off_t dlnow, curl_off_t ultotal, + curl_off_t ulnow); + const TokenFile *m_token{nullptr}; // The following members manage the work queue and workers. @@ -209,10 +248,27 @@ class HTTPRequest { false}; // Flag indicating this command is a streaming request. bool m_timeout{false}; // Flag indicating the request has timed out. bool m_result_ready{false}; // Flag indicating the results data is ready. + bool m_result_buffer_initialized{ + false}; // Flag indicating whether the result buffer view has been + // initialized. off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown. std::string m_protocol; std::string m_uri; // URL to request from libcurl std::string_view m_payload; + + // Total number of bytes received from the server + off_t m_bytes_recv{0}; + // Total number of bytes sent to server + off_t m_bytes_sent{0}; + // Time of last data movement (upload or download). Used to detect transfer + // stalls + std::chrono::steady_clock::time_point m_last_movement; + // Transfer stall timeout + static constexpr std::chrono::steady_clock::duration m_transfer_stall{ + std::chrono::seconds(10)}; + + // The contents of a successful GET request. 
+ std::string_view m_result_buffer; CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl unsigned m_retry_count{0}; diff --git a/src/S3Commands.cc b/src/S3Commands.cc index b48d1e4..4a6ab9a 100644 --- a/src/S3Commands.cc +++ b/src/S3Commands.cc @@ -18,6 +18,7 @@ #include "S3Commands.hh" #include "AWSv4-impl.hh" +#include "S3File.hh" #include "shortfile.hh" #include "stl_string_utils.hh" @@ -43,7 +44,7 @@ bool AmazonRequest::SendRequest() { switch (signatureVersion) { case 4: { auto qs = canonicalizeQueryString(); - return sendV4Request(qs, qs.size(), true, true); + return sendV4Request(qs, qs.size(), true, true, true); } default: this->errorCode = "E_INTERNAL"; @@ -418,7 +419,7 @@ bool AmazonRequest::createV4Signature(const std::string_view payload, bool AmazonRequest::sendV4Request(const std::string_view payload, off_t payload_size, bool sendContentSHA, - bool final) { + bool final, bool blocking) { if ((getProtocol() != "http") && (getProtocol() != "https")) { this->errorCode = "E_INVALID_SERVICE_URL"; this->errorMessage = "Service URL not of a known protocol (http[s])."; @@ -447,13 +448,18 @@ bool AmazonRequest::sendV4Request(const std::string_view payload, if (!canonicalQueryString.empty()) { url += "?" + canonicalQueryString; } - return sendPreparedRequest(url, payload, payload_size, final); + if (blocking) { + return sendPreparedRequest(url, payload, payload_size, final); + } else { + return sendPreparedRequestNonblocking(url, payload, payload_size, + final); + } } -// It's stated in the API documentation that you can upload to any region -// via us-east-1, which is moderately crazy. 
+// Send a request to a S3 backend bool AmazonRequest::SendS3Request(const std::string_view payload, - off_t payload_size, bool final) { + off_t payload_size, bool final, + bool blocking) { if (!m_streamingRequest && !final) { if (payload_size == 0) { errorCode = "E_INTERNAL"; @@ -469,7 +475,8 @@ bool AmazonRequest::SendS3Request(const std::string_view payload, if (region.empty()) { region = "us-east-1"; } - return sendV4Request(payload, payload_size, !m_streamingRequest, final); + return sendV4Request(payload, payload_size, !m_streamingRequest, final, + blocking); } // --------------------------------------------------------------------------- @@ -478,7 +485,7 @@ AmazonS3Upload::~AmazonS3Upload() {} bool AmazonS3Upload::SendRequest(const std::string_view &payload) { httpVerb = "PUT"; - return SendS3Request(payload, payload.size(), true); + return SendS3Request(payload, payload.size(), true, true); } // --------------------------------------------------------------------------- @@ -502,7 +509,7 @@ bool AmazonS3CompleteMultipartUpload::SendRequest( } payload += ""; - return SendS3Request(payload, payload.size(), true); + return SendS3Request(payload, payload.size(), true, true); } // --------------------------------------------------------------------------- @@ -514,7 +521,7 @@ bool AmazonS3CreateMultipartUpload::SendRequest() { query_parameters["x-id"] = "CreateMultipartUpload"; httpVerb = "POST"; - return SendS3Request("", 0, true); + return SendS3Request("", 0, true, true); } bool AmazonS3SendMultipartPart::SendRequest(const std::string_view payload, @@ -525,7 +532,7 @@ bool AmazonS3SendMultipartPart::SendRequest(const std::string_view payload, query_parameters["uploadId"] = uploadId; includeResponseHeader = true; httpVerb = "PUT"; - return SendS3Request(payload, payloadSize, final); + return SendS3Request(payload, payloadSize, final, true); } // --------------------------------------------------------------------------- @@ -540,21 +547,29 @@ bool 
AmazonS3Download::SendRequest(off_t offset, size_t size) { headers["Range"] = range.c_str(); this->expectedResponseCode = 206; } + if (size && m_buffer) { + m_buffer_view = std::string_view(m_buffer, size); + } httpVerb = "GET"; - std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed, 0, true); + return SendS3Request("", 0, true, IsBlocking()); } // --------------------------------------------------------------------------- +template +AmazonS3NonblockingDownload::~AmazonS3NonblockingDownload() {} +template class AmazonS3NonblockingDownload; + +// --------------------------------------------------------------------------- + AmazonS3Head::~AmazonS3Head() {} bool AmazonS3Head::SendRequest() { httpVerb = "HEAD"; includeResponseHeader = true; std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed, 0, true); + return SendS3Request(noPayloadAllowed, 0, true, true); } void AmazonS3Head::parseResponse() { @@ -615,7 +630,7 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) { hostUrl = getProtocol() + "://" + host + bucketPath; canonicalURI = bucketPath; - return SendS3Request("", 0, true); + return SendS3Request("", 0, true, true); } bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId, diff --git a/src/S3Commands.hh b/src/S3Commands.hh index 8f60133..a86dfe3 100644 --- a/src/S3Commands.hh +++ b/src/S3Commands.hh @@ -92,8 +92,10 @@ class AmazonRequest : public HTTPRequest { // - payload_size: final size of the payload for uploads; 0 if unknown. // - final: True if this is the last (or only) payload of the request; false // otherwise + // - blocking: True if the method should block on a response; false + // otherwise virtual bool SendS3Request(const std::string_view payload, - off_t payload_size, bool final); + off_t payload_size, bool final, bool blocking); static void Init(XrdSysError &log) { HTTPRequest::Init(log); } @@ -107,8 +109,10 @@ class AmazonRequest : public HTTPRequest { // the final payload. 
Servers may verify this is what they received. // - final: True if this is the last (or only) payload of the request; false // otherwise. + // - blocking: True if this method should block until a response; false + // otherwise bool sendV4Request(const std::string_view payload, off_t payload_size, - bool sendContentSHA, bool final); + bool sendContentSHA, bool final, bool blocking); bool retainObject; bool m_streamingRequest{ @@ -246,23 +250,52 @@ class AmazonS3SendMultipartPart final : public AmazonRequest { protected: }; -class AmazonS3Download final : public AmazonRequest { +class AmazonS3Download : public AmazonRequest { using AmazonRequest::SendRequest; public: AmazonS3Download(const S3AccessInfo &ai, const std::string &objectName, - XrdSysError &log) - : AmazonRequest(ai, objectName, log) {} + XrdSysError &log, char *buffer) + : AmazonRequest(ai, objectName, log), m_buffer(buffer) {} AmazonS3Download(const std::string &s, const std::string &akf, const std::string &skf, const std::string &b, const std::string &o, const std::string &style, - XrdSysError &log) - : AmazonRequest(s, akf, skf, b, o, style, 4, log) {} + XrdSysError &log, char *buffer) + : AmazonRequest(s, akf, skf, b, o, style, 4, log), m_buffer(buffer) {} virtual ~AmazonS3Download(); virtual bool SendRequest(off_t offset, size_t size); + + protected: + virtual bool IsBlocking() { return true; } + virtual std::string_view *requestResult() override { + return &m_buffer_view; + } + + private: + char *m_buffer{nullptr}; + std::string_view m_buffer_view; +}; + +template +class AmazonS3NonblockingDownload final : public AmazonS3Download { + + public: + AmazonS3NonblockingDownload(const S3AccessInfo &ai, + const std::string &objectName, XrdSysError &log, + char *buffer, T ¬ifier) + : AmazonS3Download(ai, objectName, log, buffer), m_notifier(notifier) {} + + virtual ~AmazonS3NonblockingDownload(); + + protected: + virtual bool IsBlocking() override { return false; } + virtual void Notify() override { 
m_notifier.Notify(); } + + private: + T &m_notifier; }; class AmazonS3Head final : public AmazonRequest { diff --git a/src/S3File.cc b/src/S3File.cc index 81871e0..b71543e 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -114,31 +114,17 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { // This flag is not set when it's going to be a read operation // so we check if the file exists in order to be able to return a 404 if (!Oflag || (Oflag & O_APPEND)) { - AmazonS3Head head(m_ai, m_object, m_log); - - if (!head.SendRequest()) { - return -ENOENT; + auto res = Fstat(nullptr); + if (res < 0) { + return res; } - head.getSize(); } return 0; } ssize_t S3File::Read(void *buffer, off_t offset, size_t size) { - AmazonS3Download download(m_ai, m_object, m_log); - - if (!download.SendRequest(offset, size)) { - std::stringstream ss; - ss << "Failed to send GetObject command: " << download.getResponseCode() - << "'" << download.getResultString() << "'"; - m_log.Log(LogMask::Warning, "S3File::Read", ss.str().c_str()); - return 0; - } - - const std::string &bytes = download.getResultString(); - memcpy(buffer, bytes.data(), bytes.size()); - return bytes.size(); + return m_cache.Read(*this, static_cast(buffer), offset, size); } int S3File::Fstat(struct stat *buff) { @@ -186,6 +172,9 @@ int S3File::Fstat(struct stat *buff) { buff->st_dev = 0; buff->st_ino = 0; } + if (!buff) { + return 0; + } return 0; } @@ -324,11 +313,11 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { } auto is_final = (m_part_size > 0 && m_part_written == m_part_size) || m_part_written == m_s3_part_size; - if (m_log.getMsgMask() & LogMask::Dump) { + if (m_log.getMsgMask() & LogMask::Debug) { std::stringstream ss; ss << "Sending request with buffer of size=" << write_size - << " and is_final=" << is_final; - m_log.Log(LogMask::Dump, "ContinueSendPart", ss.str().c_str()); + << ", offset=" << m_write_offset << " and is_final=" << is_final; + 
m_log.Log(LogMask::Debug, "ContinueSendPart", ss.str().c_str()); } if (!m_write_op->SendRequest( std::string_view(static_cast(buffer), write_size), @@ -356,7 +345,7 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { m_write_offset = -1; return -EIO; } - std::size_t endPos = resultString.find("\"", startPos + 7); + std::size_t endPos = resultString.find('"', startPos + 7); if (startPos == std::string::npos) { m_log.Emsg("Write", "Result from S3 does not include ETag end-character:", @@ -486,6 +475,526 @@ int S3File::Close(long long *retsz) { return 0; } +// Copy any overlapping data from the cache buffer into the request buffer, +// returning the remaining data necessary to fill the request. +// +// - `req_off`: File offset of the beginning of the request buffer. +// - `req_size`: Size of the request buffer +// - `req_buf`: Request buffer to copy data into +// - `cache_off`: File offset of the beginning of the cache buffer. +// - `cache_size`: Size of the cache buffer +// - `cache_buf`: Cache buffer to copy data from. +// - `used` (output): Incremented by the number of bytes copied from the cache +// buffer +// - Returns the (offset, size) of the remaining reads needed to satisfy the +// request. If there is only one (or no!) remaining reads, then the +// corresponding tuple returned is (-1, 0). 
+std::tuple +OverlapCopy(off_t req_off, size_t req_size, char *req_buf, off_t cache_off, + size_t cache_size, char *cache_buf, size_t &used) { + if (req_off < 0) { + return std::make_tuple(req_off, req_size, -1, 0); + } + if (cache_off < 0) { + return std::make_tuple(req_off, req_size, -1, 0); + } + + if (cache_off <= req_off) { + auto cache_end = cache_off + cache_size; + if (cache_end > req_off) { + auto cache_buf_off = static_cast(req_off - cache_off); + auto cache_copy_bytes = + std::min(static_cast(cache_end - req_off), req_size); + memcpy(req_buf, cache_buf + cache_buf_off, cache_copy_bytes); + used += cache_copy_bytes; + return std::make_tuple(req_off + cache_copy_bytes, + req_size - cache_copy_bytes, -1, 0); + } + } + if (req_off < cache_off) { + auto req_end = static_cast(req_off + req_size); + if (req_end > cache_off) { + auto req_buf_off = static_cast(cache_off - req_off); + auto cache_end = static_cast(cache_off + cache_size); + auto trailing_bytes = static_cast(req_end - cache_end); + if (trailing_bytes > 0) { + memcpy(req_buf + req_buf_off, cache_buf, cache_size); + used += cache_size; + return std::make_tuple(req_off, req_buf_off, cache_end, + trailing_bytes); + } + memcpy(req_buf + req_buf_off, cache_buf, req_end - cache_off); + used += req_end - cache_off; + return std::make_tuple(req_off, req_buf_off, -1, 0); + } + } + return std::make_tuple(req_off, req_size, -1, 0); +} + +std::tuple +S3File::S3Cache::Entry::OverlapCopy(off_t req_off, size_t req_size, + char *req_buf) { + size_t bytes_copied = 0; + auto results = + ::OverlapCopy(req_off, req_size, req_buf, m_off, m_cache_entry_size, + m_data.data(), bytes_copied); + m_parent.m_hit_bytes += bytes_copied; + m_used += bytes_copied; + return results; +} + +std::tuple +S3File::DownloadBypass(off_t offset, size_t size, char *buffer) { + if (size <= m_cache_entry_size) { + return std::make_tuple(offset, size, false); + } + AmazonS3Download download(m_ai, m_object, m_log, buffer); + if 
(!download.SendRequest(offset, size)) { + std::stringstream ss; + ss << "Failed to send GetObject command: " << download.getResponseCode() + << "'" << download.getResultString() << "'"; + m_log.Log(LogMask::Warning, "S3File::Read", ss.str().c_str()); + return std::make_tuple(0, -1, false); + } + return std::make_tuple(-1, 0, true); +} + +bool S3File::S3Cache::CouldUseAligned(off_t req, off_t cache) { + if (req < 0 || cache < 0) { + return false; + } + return (req >= cache) && (req < cache + S3File::m_cache_entry_size); +} + +bool S3File::S3Cache::CouldUse(off_t req_off, size_t req_size, + off_t cache_off) { + if (req_off < 0 || cache_off < 0) { + return false; + } + auto cache_end = cache_off + m_cache_entry_size; + if (req_off >= cache_off) { + return req_off < cache_end; + } else { + return req_off + req_size > cache_off; + } +} + +void S3File::S3Cache::DownloadCaches(S3File &file, bool download_a, + bool download_b, bool locked) { + if (!download_a && !download_b) { + return; + } + + std::unique_lock lk(m_mutex, std::defer_lock); + if (!locked) { + lk.lock(); + } + if (download_a) { + m_a.Download(file); + } + if (download_b) { + m_b.Download(file); + } +} + +ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset, + size_t size) { + if (offset >= file.content_length) { + return 0; + } + if (offset + size > file.content_length) { + size = file.content_length - offset; + } + if (file.m_log.getMsgMask() & LogMask::Debug) { + std::stringstream ss; + ss << "Read request for offset=" << offset << ", size=" << size; + file.m_log.Log(LogMask::Debug, "cache", ss.str().c_str()); + } + + off_t req3_off, req4_off, req5_off, req6_off; + size_t req3_size, req4_size, req5_size, req6_size; + // Copy as much data out of the cache as possible; wait for the caches to + // finish their downloads if a cache fill is in progress and we could + // utilize the cache fill. 
+ { + std::unique_lock lk{m_mutex}; + if (m_a.m_inprogress) { + m_cv.wait(lk, [&] { + return !m_a.m_inprogress || !CouldUse(offset, size, m_a.m_off); + }); + } + off_t req1_off, req2_off; + size_t req1_size, req2_size; + std::tie(req1_off, req1_size, req2_off, req2_size) = + m_a.OverlapCopy(offset, size, buffer); + if (m_b.m_inprogress) { + m_cv.wait(lk, [&] { + return !m_b.m_inprogress || + !(CouldUse(req1_off, req1_size, m_b.m_off) || + CouldUse(req2_off, req2_size, m_b.m_off)); + }); + } + std::tie(req3_off, req3_size, req4_off, req4_size) = + m_b.OverlapCopy(req1_off, req1_size, buffer + req1_off - offset); + std::tie(req5_off, req5_size, req6_off, req6_size) = + m_b.OverlapCopy(req2_off, req2_size, buffer + req2_off - offset); + } + // If any of the remaining missing bytes are bigger than a single chunk, + // download those bypassing the cache. + bool downloaded; + size_t bypass_size = req3_size; + std::tie(req3_off, req3_size, downloaded) = + file.DownloadBypass(req3_off, req3_size, buffer + req3_off - offset); + if (req3_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + m_bypass_count += 1; + } + bypass_size = req4_size; + std::tie(req4_off, req4_size, downloaded) = + file.DownloadBypass(req4_off, req4_size, buffer + req4_off - offset); + if (req4_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + m_bypass_count += 1; + } + bypass_size = req5_size; + std::tie(req5_off, req5_size, downloaded) = + file.DownloadBypass(req5_off, req5_size, buffer + req5_off - offset); + if (req5_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + m_bypass_count += 1; + } + bypass_size = req6_size; + std::tie(req6_off, req6_size, downloaded) = + file.DownloadBypass(req6_off, req6_size, buffer + req6_off - offset); + if (req6_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + 
m_bypass_count += 1; + } + if (req3_size == 0 && req4_size == 0 && req5_size == 0 && req6_size == 0) { + m_full_hit_count += 1; + // We've used more bytes in the cache, potentially all of the bytes. + // In that case, we could drop one of the cache entries and prefetch + // more of the object. + bool download_a = false, download_b = false; + { + std::unique_lock lk{m_mutex}; + auto next_offset = + std::max(m_a.m_off, m_b.m_off) + m_cache_entry_size; + if (next_offset < file.content_length) { + if (!m_a.m_inprogress && m_a.m_used >= m_cache_entry_size) { + m_a.m_inprogress = true; + m_a.m_off = next_offset; + download_a = true; + next_offset += m_cache_entry_size; + } + if (!m_b.m_inprogress && m_b.m_used >= m_cache_entry_size) { + m_b.m_inprogress = true; + m_b.m_off = next_offset; + download_b = true; + } + } + } + if (download_a) { + m_prefetch_count++; + m_prefetch_bytes += m_cache_entry_size; + } + if (download_b) { + m_prefetch_count++; + m_prefetch_bytes += m_cache_entry_size; + } + DownloadCaches(file, download_a, download_b, false); + return size; + } + // At this point, the only remaining data requests must be less than the + // size of the cache chunk, implying it's a partial request at the beginning + // or end of the range -- hence only two can exist. 
+ off_t req1_off = -1, req2_off = -1; + off_t *req_off = &req1_off; + size_t req1_size = 0, req2_size = 0; + size_t *req_size = &req1_size; + if (req3_off != -1) { + *req_off = req3_off; + *req_size = req3_size; + req_off = &req2_off; + req_size = &req2_size; + } + if (req4_off != -1) { + *req_off = req4_off; + *req_size = req4_size; + req_off = &req2_off; + req_size = &req2_size; + } + if (req5_off != -1) { + *req_off = req5_off; + *req_size = req5_size; + req_off = &req2_off; + req_size = &req2_size; + } + if (req6_off != -1) { + *req_off = req6_off; + *req_size = req6_size; + } + if (req1_off != -1 && req2_off == -1) { + auto chunk_off = static_cast(req1_off / m_cache_entry_size * + m_cache_entry_size + + m_cache_entry_size); + auto req_end = static_cast(req1_off + req1_size); + + if (req_end > chunk_off) { + req2_off = chunk_off; + req2_size = req_end - chunk_off; + req1_size = chunk_off - req1_off; + } + } + size_t miss_bytes = req1_size + req2_size; + if (miss_bytes == size) { + m_miss_count += 1; + } else { + m_partial_hit_count += 1; + } + m_miss_bytes += miss_bytes; + unsigned fetch_attempts = 0; + while (req1_off != -1) { + std::unique_lock lk(m_mutex); + m_cv.wait(lk, [&] { + bool req1waitOnA = + m_a.m_inprogress && CouldUseAligned(req1_off, m_a.m_off); + bool req2waitOnA = + m_a.m_inprogress && CouldUseAligned(req2_off, m_a.m_off); + bool req1waitOnB = + m_b.m_inprogress && CouldUseAligned(req1_off, m_b.m_off); + bool req2waitOnB = + m_b.m_inprogress && CouldUseAligned(req2_off, m_b.m_off); + // If there's an idle cache entry, use it -- unless the other cache + // entry is working on this request. + if (!m_a.m_inprogress && !req1waitOnB && !req2waitOnB) { + return true; + } + if (!m_b.m_inprogress && !req1waitOnA && !req2waitOnA) { + return true; + } + // If an idle cache entry can immediately satisfy the request, we + // use it. 
+ if (!m_a.m_inprogress && (CouldUseAligned(req1_off, m_a.m_off) || + CouldUseAligned(req2_off, m_a.m_off))) { + return true; + } + if (!m_b.m_inprogress && (CouldUseAligned(req1_off, m_b.m_off) || + CouldUseAligned(req2_off, m_b.m_off))) { + return true; + } + // If either request is in progress, we continue to wait. + if (req1waitOnA || req1waitOnB || req2waitOnA || req2waitOnB) { + return false; + } + // If either cache is idle, we will use it. + return !m_a.m_inprogress || !m_b.m_inprogress; + }); + // std::cout << "A entry in progress: " << m_a.m_inprogress + // << ", with offset " << m_a.m_off << "\n"; + // std::cout << "B entry in progress: " << m_b.m_inprogress + // << ", with offset " << m_b.m_off << "\n"; + // Test to see if any of the buffers could immediately fulfill the + // requests. + auto consumed_req = false; + if (!m_a.m_inprogress) { + if (CouldUseAligned(req2_off, m_a.m_off)) { + if (m_a.m_failed) { + m_a.m_failed = false; + m_a.m_off = -1; + m_errors += 1; + return -1; + } + m_a.OverlapCopy(req2_off, req2_size, + buffer + req2_off - offset); + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + if (CouldUseAligned(req1_off, m_a.m_off)) { + if (m_a.m_failed) { + m_a.m_failed = false; + m_a.m_off = -1; + m_errors += 1; + return -1; + } + m_a.OverlapCopy(req1_off, req1_size, + buffer + req1_off - offset); + req1_off = req2_off; + req1_size = req2_size; + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + } + if (!m_b.m_inprogress) { + if (CouldUseAligned(req2_off, m_b.m_off)) { + if (m_b.m_failed) { + m_b.m_failed = false; + m_b.m_off = -1; + m_errors += 1; + return -1; + } + m_b.OverlapCopy(req2_off, req2_size, + buffer + req2_off - offset); + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + if (CouldUseAligned(req1_off, m_b.m_off)) { + if (m_b.m_failed) { + m_b.m_failed = false; + m_b.m_off = -1; + m_errors += 1; + return -1; + } + m_b.OverlapCopy(req1_off, req1_size, + buffer + req1_off - offset); + req1_off = 
req2_off; + req1_size = req2_size; + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + } + if (consumed_req) { + continue; + } + + // No caches serve our requests - we must kick off a new download + // std::cout << "Will download data via cache; req1 offset=" << req1_off + // << ", req2 offset=" << req2_off << "\n"; + fetch_attempts++; + bool download_a = false, download_b = false, prefetch_b = false; + if (!m_a.m_inprogress && m_b.m_inprogress) { + m_a.m_off = req1_off / m_cache_entry_size * m_cache_entry_size; + m_a.m_inprogress = true; + download_a = true; + } else if (m_a.m_inprogress && !m_b.m_inprogress) { + m_b.m_off = req1_off / m_cache_entry_size * m_cache_entry_size; + m_b.m_inprogress = true; + download_b = true; + } else if (!m_a.m_inprogress && !m_b.m_inprogress) { + if (req2_off != -1) { + m_a.m_off = req1_off / m_cache_entry_size * m_cache_entry_size; + m_a.m_inprogress = true; + download_a = true; + m_b.m_off = req2_off / m_cache_entry_size * m_cache_entry_size; + m_b.m_inprogress = true; + download_b = true; + } else { + if (m_a.m_used >= m_cache_entry_size) { + // Cache A is fully read -- let's empty it + m_a.m_off = m_b.m_off; + m_b.m_off = -1; + m_a.m_used = m_b.m_used; + m_b.m_used = 0; + std::swap(m_a.m_data, m_b.m_data); + } + if (m_a.m_used >= m_cache_entry_size) { + // Both caches were fully read -- empty the second one. + m_a.m_off = -1; + m_a.m_used = 0; + } + if (m_a.m_off == -1 && m_b.m_off == -1) { + // Prefetch both caches at once + m_a.m_off = + req1_off / m_cache_entry_size * m_cache_entry_size; + auto prefetch_offset = m_a.m_off + m_cache_entry_size; + ; + download_a = true; + m_a.m_inprogress = true; + if (prefetch_offset < file.content_length) { + m_b.m_off = prefetch_offset; + prefetch_b = true; + m_b.m_inprogress = true; + } + } else { + // Select one cache entry to fetch data. 
+ auto needed_off = + req1_off / m_cache_entry_size * m_cache_entry_size; + if (needed_off > m_a.m_off) { + m_b.m_off = needed_off; + download_b = true; + m_b.m_inprogress = true; + auto bytes_unused = + static_cast(m_cache_entry_size) - + static_cast(m_b.m_used); + m_unused_bytes += bytes_unused < 0 ? 0 : bytes_unused; + } else { + m_a.m_off = needed_off; + download_a = true; + m_a.m_inprogress = true; + auto bytes_unused = + static_cast(m_cache_entry_size) - + static_cast(m_a.m_used); + m_unused_bytes += bytes_unused < 0 ? 0 : bytes_unused; + } + } + } + } // else both caches are in-progress and neither satisfied our needs + if (download_a) { + m_fetch_count += 1; + m_fetch_bytes += m_cache_entry_size; + } + if (download_b) { + m_fetch_count += 1; + m_fetch_bytes += m_cache_entry_size; + } + if (prefetch_b) { + m_prefetch_count += 1; + m_prefetch_bytes += m_cache_entry_size; + } + DownloadCaches(file, download_a, download_b || prefetch_b, true); + } + return size; +} + +void S3File::S3Cache::Entry::Notify() { + std::unique_lock lk(m_parent.m_mutex); + m_inprogress = false; + m_failed = !m_request->getErrorCode().empty(); + m_request = nullptr; + + m_parent.m_cv.notify_all(); +} + +void S3File::S3Cache::Entry::Download(S3File &file) { + m_used = false; + m_data.resize(m_cache_entry_size); + m_request.reset(new AmazonS3NonblockingDownload( + file.m_ai, file.m_object, file.m_log, m_data.data(), *this)); + size_t request_size = m_cache_entry_size; + if (m_off + request_size > file.content_length) { + request_size = file.content_length - m_off; + } + if (!m_request->SendRequest(m_off, m_cache_entry_size)) { + std::stringstream ss; + ss << "Failed to send GetObject command: " + << m_request->getResponseCode() << "'" + << m_request->getResultString() << "'"; + file.m_log.Log(LogMask::Warning, "S3File::Read", ss.str().c_str()); + m_failed = true; + m_request.reset(); + } +} + extern "C" { /* diff --git a/src/S3File.hh b/src/S3File.hh index abd69ee..622cbfb 100644 --- 
a/src/S3File.hh +++ b/src/S3File.hh @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ int parse_path(const S3FileSystem &fs, const char *path, std::string &exposedPath, std::string &object); class AmazonS3SendMultipartPart; +template class AmazonS3NonblockingDownload; class S3File : public XrdOssDF { public: @@ -124,6 +126,19 @@ class S3File : public XrdOssDF { ssize_t SendPartStreaming(); ssize_t ContinueSendPart(const void *buffer, size_t size); + + // Download data synchronously, bypassing the cache. + // The download is only performed if the request size is larger than a cache + // entry. + // + // - `offset`: File offset of the request. + // - `size`: Size of the request. + // - `buffer`: Buffer to place resulting data into. + // - Returns the (offset, size) of any remaining read and `true` if a + // download occurred. + std::tuple DownloadBypass(off_t offset, size_t size, + char *buffer); + XrdSysError &m_log; S3FileSystem *m_oss; @@ -136,6 +151,9 @@ class S3File : public XrdOssDF { static const size_t m_s3_part_size = 100'000'000; // The size of each S3 chunk. + static constexpr size_t m_cache_entry_size = + (2 * 1024 * 1024); // Size of the buffer associated with the cache + bool m_create{false}; int partNumber; size_t m_part_written{ @@ -182,4 +200,96 @@ class S3File : public XrdOssDF { // Flag determining whether the monitoring thread has been launched. static std::once_flag m_monitor_launch; + + // The double-buffering component for the file handle. Reads are rounded up + // to a particular size and kept in the file handle; before requesting new + // data, the cache is searched to see if the read can be serviced from + // memory. When possible, a forward prefetch is done + struct S3Cache { + struct Entry { + bool m_failed{false}; // Indication as to whether last download + // attempt failed for cache entry. + bool m_inprogress{ + false}; // Indication as to whether a download is in-progress.
+ off_t m_off{-1}; // File offset of the beginning of the cache entry. + // -1 signifies unused entry + size_t m_used{ + 0}; // The number of bytes read out of the current cache entry. + std::vector m_data; // Contents of cache entry + S3Cache &m_parent; // Reference to owning object + std::unique_ptr> + m_request; // In-progress request to fill entry. + + Entry(S3Cache &cache) : m_parent(cache) {} + void Download( + S3File &); // Trigger download request for this cache entry. + void Notify(); // Notify containing cache that the entry's + // in-progress operation has completed. + + // Copy any overlapping data from the cache buffer into the request + // buffer, returning the remaining data necessary to fill the + // request. + // + // - `req_off`: File offset of the beginning of the request buffer. + // - `req_size`: Size of the request buffer + // - `req_buf`: Request buffer to copy data into + // - Returns the (offset, size) of the remaining reads needed to + // satisfy the request. If there is only one (or no!) remaining + // reads, then the corresponding tuple returned is (-1, 0). + std::tuple + OverlapCopy(off_t req_off, size_t req_size, char *req_buf); + }; + friend class AmazonS3NonblockingDownload; + + std::atomic m_hit_bytes{0}; // Bytes served from the cache. + std::atomic m_miss_bytes{ + 0}; // Bytes that resulted in a cache miss. + std::atomic m_full_hit_count{ + 0}; // Requests completely served from the cache. + std::atomic m_partial_hit_count{ + 0}; // Requests partially served from the cache. + std::atomic m_miss_count{ + 0}; // Requests that had no data served from the cache. + std::atomic m_bypass_bytes{ + 0}; // Bytes for requests that were large enough they bypassed the + // cache and fetched directly from S3. + std::atomic m_bypass_count{ + 0}; // Requests that were large enough they (at least partially) + // bypassed the cache and fetched directly from S3. 
+ std::atomic m_fetch_bytes{ + 0}; // Bytes that were fetched from S3 to serve a cache miss. + std::atomic m_fetch_count{ + 0}; // Requests sent to S3 to serve a cache miss. + std::atomic m_unused_bytes{ + 0}; // Bytes that were unused at cache eviction. + std::atomic m_prefetch_bytes{0}; // Bytes prefetched + std::atomic m_prefetch_count{0}; // Number of prefetch requests + std::atomic m_errors{0}; // Count of errors encountered by cache. + + Entry m_a{*this}; // Cache entry A. Protected by m_mutex. + Entry m_b{*this}; // Cache entry B. Protected by m_mutex. + std::mutex m_mutex; // Mutex protecting the data in the S3Cache object + std::condition_variable m_cv; // Condition variable for notifying that + // new downloaded data is available. + + // Returns `true` if the request offset would be inside the cache entry. + // The request offset is assumed to be aligned to be inside a single + // cache entry (that is, smaller than a cache entry and not spanning two + // entries). + bool CouldUseAligned(off_t req, off_t cache); + + // Returns true if the specified request, [req_off, req_off + req_size), + // has any bytes inside the cache entry starting at `cache_off`. + bool CouldUse(off_t req_off, size_t req_size, off_t cache_off); + + // Trigger the non-blocking download into the cache entries. + // The condition variable will be notified when one of the caches + // finishes. 
+ void DownloadCaches(S3File &file, bool download_a, bool download_b, + bool locked); + + // Trigger a blocking read from a given file + ssize_t Read(S3File &file, char *buffer, off_t offset, size_t size); + }; + S3Cache m_cache; }; diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index a39ab89..92b83f2 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -33,6 +33,7 @@ #include #include #include +#include std::once_flag g_init_once; std::string g_ca_file; @@ -155,7 +156,73 @@ s3.end VerifyContents(fs, name, writeSize, chunkByte, chunkSize); } + void RandomRead(const std::string &name, unsigned char chunkByte, + size_t chunkSize, + std::chrono::steady_clock::duration testLength) { + XrdSysLogger log; + S3FileSystem fs(&log, m_configfn.c_str(), nullptr); + + std::unique_ptr fh(fs.newFile()); + ASSERT_TRUE(fh); + + XrdOucEnv env; + auto rv = fh->Open(name.c_str(), O_CREAT | O_WRONLY, 0755, env); + ASSERT_EQ(rv, 0); + + struct stat buf; + rv = fh->Fstat(&buf); + ASSERT_EQ(rv, 0); + auto objSize = buf.st_size; + + auto startTime = std::chrono::steady_clock::now(); + size_t maxReadSize = 5'000'000; + std::string readBuf; + readBuf.resize(maxReadSize); + std::string correctContents; + correctContents.resize(maxReadSize); + while (std::chrono::steady_clock::now() - startTime < testLength) { + size_t readSize = std::rand() % maxReadSize; + off_t off = std::rand() % objSize; + size_t expectedReadSize = + (static_cast(readSize) + off - objSize > 0) + ? 
(objSize - off) + : readSize; + readBuf.resize(expectedReadSize); + rv = fh->Read(readBuf.data(), off, readSize); + ASSERT_EQ(rv, expectedReadSize); + GenCorrectContents(correctContents, off, expectedReadSize, + chunkByte, chunkSize, objSize); + ASSERT_EQ(readBuf, correctContents); + } + } + private: + void GenCorrectContents(std::string &correctContents, off_t off, + size_t size, unsigned char chunkByte, + size_t chunkSize, size_t objSize) { + auto chunkNum = static_cast(off / chunkSize); + auto curChunkByte = static_cast(chunkByte + chunkNum); + off_t chunkBoundary = (chunkNum + 1) * chunkSize; + correctContents.resize(size); + if (chunkBoundary < off + size) { + size_t firstLen = chunkBoundary - off; + std::string firstChunk(firstLen, curChunkByte); + correctContents.replace(0, firstLen, firstChunk); + auto iter = correctContents.begin() + firstLen; + off_t remaining = size - firstLen; + while (remaining) { + curChunkByte++; + auto chunkLen = (remaining > chunkSize) ? chunkSize : remaining; + std::string chunk(chunkLen, curChunkByte); + std::copy(chunk.begin(), chunk.end(), iter); + iter += chunkLen; + remaining -= chunkLen; + } + } else { + correctContents = std::string(size, curChunkByte); + } + } + void VerifyContents(S3FileSystem &fs, const std::string &obj, off_t expectedSize, unsigned char chunkByte, size_t chunkSize) { @@ -400,6 +467,113 @@ TEST_F(FileSystemS3Fixture, InvalidObject) { ASSERT_EQ(fs.Stat("/test/trailing/", &buf, 0, nullptr), -ENOENT); } +// Check out the logic of the overlap copy routine. 
+std::tuple +OverlapCopy(off_t req_off, size_t req_size, char *req_buf, off_t cache_off, + size_t cache_size, char *cache_buf, size_t &used); +TEST(OverlapCopy, Simple) { + std::string repeatA(4096, 'a'); + std::string repeatB(4096, 'b'); + size_t used{0}; + auto [req1_off, req1_size, req2_off, req2_size] = + OverlapCopy(0, 4096, repeatA.data(), 4096, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 4096); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0); + ASSERT_EQ(used, 0); + + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(0, 4096, repeatA.data(), 2048, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 2048); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0); + ASSERT_EQ(used, 2048); + auto correctOverlap = std::string(2048, 'a') + std::string(2048, 'b'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(0, 4096, repeatA.data(), 1024, 1024, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 1024); + ASSERT_EQ(req2_off, 2048); + ASSERT_EQ(req2_size, 2048); + ASSERT_EQ(used, 1024); + correctOverlap = std::string(1024, 'a') + std::string(1024, 'b') + + std::string(2048, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(1024, 4096, repeatA.data(), 0, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 4096); + ASSERT_EQ(req1_size, 1024); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0); + ASSERT_EQ(used, 3072); + correctOverlap = std::string(3072, 'b') + std::string(1024, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(4096, 4096, repeatA.data(), 0, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 4096); + ASSERT_EQ(req1_size, 
4096); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0); + ASSERT_EQ(used, 0); + correctOverlap = std::string(4096, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(-1, 0, repeatA.data(), 0, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, -1); + ASSERT_EQ(req1_size, 0); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0); + ASSERT_EQ(used, 0); + correctOverlap = std::string(4096, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(0, 4096, repeatA.data(), -1, 0, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 4096); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0); + ASSERT_EQ(used, 0); + correctOverlap = std::string(4096, 'a'); + ASSERT_EQ(correctOverlap, repeatA); +} + +TEST_F(FileSystemS3Fixture, StressGet) { + // Upload a file + auto name = "/test/write_stress.txt"; + WritePattern(name, 100'000'000, 'a', 1'000'000, true); + + static const int workerThreads = 10; + std::vector> threads; + threads.resize(workerThreads); + for (auto &tptr : threads) { + tptr.reset(new std::thread([&] { + RandomRead(name, 'a', 1'000'000, std::chrono::seconds(5)); + })); + } + std::cout << "Launched all " << workerThreads << " threads" << std::endl; + for (const auto &tptr : threads) { + tptr->join(); + } +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); From 271b803e3170a935c78fea2af418da0cc585819c Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 14 Dec 2024 14:14:40 -0600 Subject: [PATCH 2/6] Log exceptions in the curl worker at the error level --- src/CurlUtil.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc index 66b7ee8..d1a28f7 100644 --- a/src/CurlUtil.cc +++ b/src/CurlUtil.cc @@ -172,9 +172,9 @@ 
HTTPRequest *HandlerQueue::TryConsume() { void CurlWorker::RunStatic(CurlWorker *myself) { try { myself->Run(); - } catch (...) { - myself->m_logger.Log(LogMask::Debug, "CurlWorker::RunStatic", - "Curl worker got an exception"); + } catch (std::exception &exc) { + myself->m_logger.Log(LogMask::Error, "CurlWorker::RunStatic", + "Curl worker got an exception:", exc.what()); } } From dcf151830d46ece6cbf8d1ae0d9d6aa814c60ccc Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 23 Nov 2024 23:31:07 -0600 Subject: [PATCH 3/6] Prevent segfault if there's no header passed. --- src/HTTPFile.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/HTTPFile.cc b/src/HTTPFile.cc index 0506de7..dff96ee 100644 --- a/src/HTTPFile.cc +++ b/src/HTTPFile.cc @@ -202,7 +202,7 @@ int HTTPFile::Fstat(struct stat *buff) { size_t next_newline = std::string::npos; size_t last_character = headers.size(); while (current_newline != std::string::npos && - current_newline != last_character - 1) { + current_newline != last_character - 1 && last_character) { next_newline = headers.find("\r\n", current_newline + 2); std::string line = substring(headers, current_newline + 2, next_newline); From a69b1add025a6471ab86bfc0a78a049b93e6e44d Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 24 Nov 2024 19:06:42 -0600 Subject: [PATCH 4/6] Fix warnings on Linux - Missing imports that weren't necessary on Mac OS X. - Signed comparison issues. - Content-length was wrong type -- should really be off_t.
--- src/S3File.cc | 30 +++++++++++++++----------- src/S3File.hh | 3 ++- test/s3_unit_tests.cc | 50 ++++++++++++++++++++++--------------------- 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/src/S3File.cc b/src/S3File.cc index b71543e..b6ab9da 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -500,7 +500,7 @@ OverlapCopy(off_t req_off, size_t req_size, char *req_buf, off_t cache_off, } if (cache_off <= req_off) { - auto cache_end = cache_off + cache_size; + auto cache_end = cache_off + static_cast(cache_size); if (cache_end > req_off) { auto cache_buf_off = static_cast(req_off - cache_off); auto cache_copy_bytes = @@ -563,7 +563,8 @@ bool S3File::S3Cache::CouldUseAligned(off_t req, off_t cache) { if (req < 0 || cache < 0) { return false; } - return (req >= cache) && (req < cache + S3File::m_cache_entry_size); + return (req >= cache) && + (req < cache + static_cast(S3File::m_cache_entry_size)); } bool S3File::S3Cache::CouldUse(off_t req_off, size_t req_size, @@ -571,11 +572,11 @@ bool S3File::S3Cache::CouldUse(off_t req_off, size_t req_size, if (req_off < 0 || cache_off < 0) { return false; } - auto cache_end = cache_off + m_cache_entry_size; + auto cache_end = cache_off + static_cast(m_cache_entry_size); if (req_off >= cache_off) { return req_off < cache_end; } else { - return req_off + req_size > cache_off; + return req_off + static_cast(req_size) > cache_off; } } @@ -602,7 +603,7 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset, if (offset >= file.content_length) { return 0; } - if (offset + size > file.content_length) { + if (offset + static_cast(size) > file.content_length) { size = file.content_length - offset; } if (file.m_log.getMsgMask() & LogMask::Debug) { @@ -694,8 +695,8 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset, bool download_a = false, download_b = false; { std::unique_lock lk{m_mutex}; - auto next_offset = - std::max(m_a.m_off, m_b.m_off) + m_cache_entry_size; + auto next_offset 
= std::max(m_a.m_off, m_b.m_off) + + static_cast(m_cache_entry_size); if (next_offset < file.content_length) { if (!m_a.m_inprogress && m_a.m_used >= m_cache_entry_size) { m_a.m_inprogress = true; @@ -914,9 +915,11 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset, } if (m_a.m_off == -1 && m_b.m_off == -1) { // Prefetch both caches at once - m_a.m_off = - req1_off / m_cache_entry_size * m_cache_entry_size; - auto prefetch_offset = m_a.m_off + m_cache_entry_size; + m_a.m_off = req1_off / + static_cast(m_cache_entry_size) * + static_cast(m_cache_entry_size); + auto prefetch_offset = + m_a.m_off + static_cast(m_cache_entry_size); ; download_a = true; m_a.m_inprogress = true; @@ -927,8 +930,9 @@ ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset, } } else { // Select one cache entry to fetch data. - auto needed_off = - req1_off / m_cache_entry_size * m_cache_entry_size; + auto needed_off = req1_off / + static_cast(m_cache_entry_size) * + static_cast(m_cache_entry_size); if (needed_off > m_a.m_off) { m_b.m_off = needed_off; download_b = true; @@ -981,7 +985,7 @@ void S3File::S3Cache::Entry::Download(S3File &file) { m_request.reset(new AmazonS3NonblockingDownload( file.m_ai, file.m_object, file.m_log, m_data.data(), *this)); size_t request_size = m_cache_entry_size; - if (m_off + request_size > file.content_length) { + if (m_off + static_cast(request_size) > file.content_length) { request_size = file.content_length - m_off; } if (!m_request->SendRequest(m_off, m_cache_entry_size)) { diff --git a/src/S3File.hh b/src/S3File.hh index 622cbfb..060f628 100644 --- a/src/S3File.hh +++ b/src/S3File.hh @@ -26,6 +26,7 @@ #include #include +#include #include #include #include @@ -145,7 +146,7 @@ class S3File : public XrdOssDF { std::string m_object; S3AccessInfo m_ai; - size_t content_length; + off_t content_length; time_t last_modified; static const size_t m_s3_part_size = diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index 
92b83f2..8fcf1cb 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -183,7 +183,7 @@ s3.end while (std::chrono::steady_clock::now() - startTime < testLength) { size_t readSize = std::rand() % maxReadSize; off_t off = std::rand() % objSize; - size_t expectedReadSize = + ssize_t expectedReadSize = (static_cast(readSize) + off - objSize > 0) ? (objSize - off) : readSize; @@ -204,7 +204,7 @@ s3.end auto curChunkByte = static_cast(chunkByte + chunkNum); off_t chunkBoundary = (chunkNum + 1) * chunkSize; correctContents.resize(size); - if (chunkBoundary < off + size) { + if (chunkBoundary < off + static_cast(size)) { size_t firstLen = chunkBoundary - off; std::string firstChunk(firstLen, curChunkByte); correctContents.replace(0, firstLen, firstChunk); @@ -212,7 +212,9 @@ s3.end off_t remaining = size - firstLen; while (remaining) { curChunkByte++; - auto chunkLen = (remaining > chunkSize) ? chunkSize : remaining; + auto chunkLen = (remaining > static_cast(chunkSize)) + ? chunkSize + : remaining; std::string chunk(chunkLen, curChunkByte); std::copy(chunk.begin(), chunk.end(), iter); iter += chunkLen; @@ -478,18 +480,18 @@ TEST(OverlapCopy, Simple) { auto [req1_off, req1_size, req2_off, req2_size] = OverlapCopy(0, 4096, repeatA.data(), 4096, 4096, repeatB.data(), used); ASSERT_EQ(req1_off, 0); - ASSERT_EQ(req1_size, 4096); + ASSERT_EQ(req1_size, 4096U); ASSERT_EQ(req2_off, -1); - ASSERT_EQ(req2_size, 0); - ASSERT_EQ(used, 0); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); std::tie(req1_off, req1_size, req2_off, req2_size) = OverlapCopy(0, 4096, repeatA.data(), 2048, 4096, repeatB.data(), used); ASSERT_EQ(req1_off, 0); - ASSERT_EQ(req1_size, 2048); + ASSERT_EQ(req1_size, 2048U); ASSERT_EQ(req2_off, -1); - ASSERT_EQ(req2_size, 0); - ASSERT_EQ(used, 2048); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 2048U); auto correctOverlap = std::string(2048, 'a') + std::string(2048, 'b'); ASSERT_EQ(correctOverlap, repeatA); @@ -498,10 +500,10 @@ TEST(OverlapCopy, 
Simple) { std::tie(req1_off, req1_size, req2_off, req2_size) = OverlapCopy(0, 4096, repeatA.data(), 1024, 1024, repeatB.data(), used); ASSERT_EQ(req1_off, 0); - ASSERT_EQ(req1_size, 1024); + ASSERT_EQ(req1_size, 1024U); ASSERT_EQ(req2_off, 2048); - ASSERT_EQ(req2_size, 2048); - ASSERT_EQ(used, 1024); + ASSERT_EQ(req2_size, 2048U); + ASSERT_EQ(used, 1024U); correctOverlap = std::string(1024, 'a') + std::string(1024, 'b') + std::string(2048, 'a'); ASSERT_EQ(correctOverlap, repeatA); @@ -511,10 +513,10 @@ TEST(OverlapCopy, Simple) { std::tie(req1_off, req1_size, req2_off, req2_size) = OverlapCopy(1024, 4096, repeatA.data(), 0, 4096, repeatB.data(), used); ASSERT_EQ(req1_off, 4096); - ASSERT_EQ(req1_size, 1024); + ASSERT_EQ(req1_size, 1024U); ASSERT_EQ(req2_off, -1); - ASSERT_EQ(req2_size, 0); - ASSERT_EQ(used, 3072); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 3072U); correctOverlap = std::string(3072, 'b') + std::string(1024, 'a'); ASSERT_EQ(correctOverlap, repeatA); @@ -523,10 +525,10 @@ TEST(OverlapCopy, Simple) { std::tie(req1_off, req1_size, req2_off, req2_size) = OverlapCopy(4096, 4096, repeatA.data(), 0, 4096, repeatB.data(), used); ASSERT_EQ(req1_off, 4096); - ASSERT_EQ(req1_size, 4096); + ASSERT_EQ(req1_size, 4096U); ASSERT_EQ(req2_off, -1); - ASSERT_EQ(req2_size, 0); - ASSERT_EQ(used, 0); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); correctOverlap = std::string(4096, 'a'); ASSERT_EQ(correctOverlap, repeatA); @@ -535,10 +537,10 @@ TEST(OverlapCopy, Simple) { std::tie(req1_off, req1_size, req2_off, req2_size) = OverlapCopy(-1, 0, repeatA.data(), 0, 4096, repeatB.data(), used); ASSERT_EQ(req1_off, -1); - ASSERT_EQ(req1_size, 0); + ASSERT_EQ(req1_size, 0U); ASSERT_EQ(req2_off, -1); - ASSERT_EQ(req2_size, 0); - ASSERT_EQ(used, 0); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); correctOverlap = std::string(4096, 'a'); ASSERT_EQ(correctOverlap, repeatA); @@ -547,10 +549,10 @@ TEST(OverlapCopy, Simple) { std::tie(req1_off, req1_size, req2_off, req2_size) 
= OverlapCopy(0, 4096, repeatA.data(), -1, 0, repeatB.data(), used); ASSERT_EQ(req1_off, 0); - ASSERT_EQ(req1_size, 4096); + ASSERT_EQ(req1_size, 4096U); ASSERT_EQ(req2_off, -1); - ASSERT_EQ(req2_size, 0); - ASSERT_EQ(used, 0); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); correctOverlap = std::string(4096, 'a'); ASSERT_EQ(correctOverlap, repeatA); } From 96e87480face7a7b0582a8ec6c1afa4506357cb0 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 24 Nov 2024 20:23:46 -0600 Subject: [PATCH 5/6] Enable address sanitizer for debug builds --- CMakeLists.txt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b2deac..b17c8a1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,9 +2,10 @@ cmake_minimum_required( VERSION 3.13 ) project( xrootd-http/s3 ) -option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the scitokens-cpp unit tests" OFF ) +option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the xrootd-s3-http unit tests" OFF ) option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF ) option( VALGRIND "Run select unit tests under valgrind" OFF ) +option( ASAN "Build the plugin with the address sanitizer" OFF ) set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake ) set( CMAKE_BUILD_TYPE Debug ) @@ -25,6 +26,12 @@ if(VALGRIND) find_program(VALGRIND_BIN valgrind REQUIRED) endif() +if(ASAN) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address") + set(CMAKE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -fsanitize=address") +endif() + macro(use_cxx17) if (CMAKE_VERSION VERSION_LESS "3.1") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") From 74656d0638adff1252d75ec059400bab50f768cf Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 14 Dec 2024 14:25:47 -0600 Subject: [PATCH 6/6] Fixup: remove unnecessary code --- src/S3File.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/S3File.cc b/src/S3File.cc index b6ab9da..feb7779 
100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -172,9 +172,6 @@ int S3File::Fstat(struct stat *buff) { buff->st_dev = 0; buff->st_ino = 0; } - if (!buff) { - return 0; - } return 0; }