diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b2deac..b17c8a1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,9 +2,10 @@ cmake_minimum_required( VERSION 3.13 ) project( xrootd-http/s3 ) -option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the scitokens-cpp unit tests" OFF ) +option( XROOTD_PLUGINS_BUILD_UNITTESTS "Build the xrootd-s3-http unit tests" OFF ) option( XROOTD_PLUGINS_EXTERNAL_GTEST "Use an external/pre-installed copy of GTest" OFF ) option( VALGRIND "Run select unit tests under valgrind" OFF ) +option( ASAN "Build the plugin with the address sanitizer" OFF ) set( CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake ) set( CMAKE_BUILD_TYPE Debug ) @@ -25,6 +26,12 @@ if(VALGRIND) find_program(VALGRIND_BIN valgrind REQUIRED) endif() +if(ASAN) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=address") + set(CMAKE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -fsanitize=address") +endif() + macro(use_cxx17) if (CMAKE_VERSION VERSION_LESS "3.1") if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc index 66b7ee8..d1a28f7 100644 --- a/src/CurlUtil.cc +++ b/src/CurlUtil.cc @@ -172,9 +172,9 @@ HTTPRequest *HandlerQueue::TryConsume() { void CurlWorker::RunStatic(CurlWorker *myself) { try { myself->Run(); - } catch (...) { - myself->m_logger.Log(LogMask::Debug, "CurlWorker::RunStatic", - "Curl worker got an exception"); + } catch (std::exception &exc) { + myself->m_logger.Log(LogMask::Error, "CurlWorker::RunStatic", + "Curl worker got an exception:", exc.what()); } } diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index 9cdf2b5..7e24305 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -46,8 +46,6 @@ std::vector HTTPRequest::m_workers; std::chrono::steady_clock::duration HTTPRequest::m_timeout_duration = std::chrono::seconds(10); -namespace { - // // "This function gets called by libcurl as soon as there is data received // that needs to be saved. The size of the data pointed to by ptr is size @@ -59,20 +57,56 @@ namespace { // We also make extensive use of this function in the XML parsing code, // for pretty much exactly the same reason. 
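// ---------------------------------------------------------------------------
// Illustrative sketch (not part of this patch): the libcurl write-callback
// contract described in the comment above. libcurl hands the callback
// size * nmemb bytes that are not NUL-terminated; returning any other value
// makes libcurl abort the transfer with CURLE_WRITE_ERROR. The hypothetical
// fetchToString() helper below is a minimal version of the appendToString()
// pattern that this patch replaces with HTTPRequest::handleResults().
//
//   #include <curl/curl.h>
//   #include <string>
//
//   static size_t appendToStringSketch(char *ptr, size_t size, size_t nmemb,
//                                      void *userdata) {
//       auto str = static_cast<std::string *>(userdata);
//       str->append(ptr, size * nmemb);
//       return size * nmemb; // any other return value aborts the transfer
//   }
//
//   bool fetchToString(const std::string &url, std::string &out) {
//       CURL *curl = curl_easy_init();
//       if (!curl) {
//           return false;
//       }
//       curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
//       curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, appendToStringSketch);
//       curl_easy_setopt(curl, CURLOPT_WRITEDATA, &out);
//       auto rv = curl_easy_perform(curl);
//       curl_easy_cleanup(curl);
//       return rv == CURLE_OK;
//   }
// ---------------------------------------------------------------------------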
 //
-size_t appendToString(const void *ptr, size_t size, size_t nmemb, void *str) {
+size_t HTTPRequest::handleResults(const void *ptr, size_t size, size_t nmemb,
+								  void *me_ptr) {
 	if (size == 0 || nmemb == 0) {
 		return 0;
 	}
 
-	std::string source((const char *)ptr, size * nmemb);
-	std::string *ssptr = (std::string *)str;
-	ssptr->append(source);
-
+	auto me = reinterpret_cast<HTTPRequest *>(me_ptr);
+	if (!me) {
+		return 0;
+	}
+	std::string_view source(static_cast<const char *>(ptr), size * nmemb);
+
+	// std::cout << "Handling results with size " << (size * nmemb) << " and
+	// HTTP verb " << me->httpVerb << "\n";
+	if (me->httpVerb == "GET") {
+		if (!me->responseCode) {
+			auto rv = curl_easy_getinfo(
+				me->m_curl_handle, CURLINFO_RESPONSE_CODE, &(me->responseCode));
+			if (rv != CURLE_OK) {
+				me->errorCode = "E_CURL_LIB";
+				me->errorMessage = "curl_easy_getinfo() failed.";
+				return 0;
+			}
+		}
+		if (me->getResponseCode() == me->expectedResponseCode &&
+			me->requestResult() != nullptr) {
+			if (!me->m_result_buffer_initialized) {
+				me->m_result_buffer_initialized = true;
+				me->m_result_buffer = *me->requestResult();
+				// std::cout << "Handling data for GET with response code " <<
+				// me->responseCode << "and expected response size " <<
+				// me->m_result.size() << "\n";
+			}
+			if (me->m_result_buffer.size() < source.size()) {
+				me->errorCode = "E_CURL_LIB";
+				me->errorMessage = "Curl had response with too-long result.";
+				return 0;
+			}
+			memcpy(const_cast<char *>(me->m_result_buffer.data()),
+				   source.data(), source.size());
+			me->m_result_buffer = me->m_result_buffer.substr(source.size());
+		} else {
+			me->m_result.append(source);
+		}
+	} else {
+		me->m_result.append(source);
+	}
 	return (size * nmemb);
 }
 
-} // namespace
-
 HTTPRequest::~HTTPRequest() {}
 
 #define SET_CURL_SECURITY_OPTION(A, B, C) \
@@ -268,9 +302,26 @@ size_t HTTPRequest::ReadCallback(char *buffer, size_t size, size_t n, void *v) {
 	return request;
 }
 
-bool HTTPRequest::sendPreparedRequest(const std::string &uri,
-                                      const std::string_view payload,
-                                      off_t payload_size, bool final) {
+int HTTPRequest::XferInfoCallback(void *clientp, curl_off_t dltotal,
+								  curl_off_t /*dlnow*/, curl_off_t ultotal,
+								  curl_off_t /*ulnow*/) {
+	auto me = reinterpret_cast<HTTPRequest *>(clientp);
+	if ((me->m_bytes_recv != dltotal) || (me->m_bytes_sent != ultotal)) {
+		me->m_last_movement = std::chrono::steady_clock::now();
+	} else if (std::chrono::steady_clock::now() - me->m_last_movement >
+			   m_transfer_stall) {
+		me->errorCode = "E_TIMEOUT";
+		me->errorMessage = "I/O stall during transfer";
+		return 1;
+	}
+	me->m_bytes_recv = dltotal;
+	me->m_bytes_sent = ultotal;
+	return 0;
+}
+bool HTTPRequest::sendPreparedRequestNonblocking(const std::string &uri,
+												 const std::string_view payload,
+												 off_t payload_size,
+												 bool final) {
 	m_uri = uri;
 	m_payload = payload;
 	m_payload_size = payload_size;
@@ -306,6 +357,15 @@ bool HTTPRequest::sendPreparedRequest(const std::string &uri,
 	} else {
 		m_queue->Produce(this);
 	}
+	return true;
+}
+
+bool HTTPRequest::sendPreparedRequest(const std::string &uri,
+									  const std::string_view payload,
+									  off_t payload_size, bool final) {
+	if (!sendPreparedRequestNonblocking(uri, payload, payload_size, final)) {
+		return false;
+	}
 
 	std::unique_lock lk(m_mtx);
 	m_cv.wait(lk, [&] { return m_result_ready; });
@@ -347,11 +407,11 @@ bool HTTPRequest::ReleaseHandle(CURL *curl) {
 	curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, nullptr);
 	curl_easy_setopt(curl, CURLOPT_WRITEDATA, nullptr);
 	curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr);
-	curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION,
nullptr); - curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, nullptr); + curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, nullptr); curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, nullptr); curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, nullptr); curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, nullptr); + curl_easy_setopt(curl, CURLOPT_DEBUGDATA, nullptr); curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L); curl_easy_setopt(curl, CURLOPT_NOBODY, 0); curl_easy_setopt(curl, CURLOPT_POST, 0); @@ -486,7 +546,7 @@ bool HTTPRequest::SetupHandle(CURL *curl) { } } - rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &appendToString); + rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &handleResults); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = @@ -494,7 +554,7 @@ bool HTTPRequest::SetupHandle(CURL *curl) { return false; } - rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, &m_result); + rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, this); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_WRITEDATA ) failed."; @@ -502,9 +562,21 @@ bool HTTPRequest::SetupHandle(CURL *curl) { } if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1) != CURLE_OK) { - this->errorCode = "E_CURL_LIB"; - this->errorMessage = - "curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed."; + errorCode = "E_CURL_LIB"; + errorMessage = "curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed."; + return false; + } + + if (curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, + HTTPRequest::XferInfoCallback) != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the transfer info callback function."; + return false; + } + + if (curl_easy_setopt(curl, CURLOPT_XFERINFODATA, this) != CURLE_OK) { + errorCode = "E_CURL_LIB"; + errorMessage = "Failed to set the transfer info callback data."; return false; } diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index 6f8f39d..3e33f64 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -114,9 +114,36 @@ class HTTPRequest { const std::string_view payload, off_t payload_size, bool final); + // Send the request to the HTTP server. + // Returns immediately, not waiting for the result. + // + // If `final` is set to `false`, the HTTPRequest object will start streaming + // a request and assume that `sendPreparedRequest` will be repeated until + // all data is provided (the sum total of the chunks given is the + // payload_size). If payload_size is 0 and final is false, this indicates + // the complete size of the PUT is unknown and chunked encoding will be + // used. + // + // - url: URL, including query parameters, to use. + // - payload: The payload contents when uploading. + // - payload_size: Size of the entire payload (not just the current chunk). + // - final: True if this is the last or only payload for the request. False + // otherwise. + bool sendPreparedRequestNonblocking(const std::string &uri, + const std::string_view payload, + off_t payload_size, bool final); + // Called by the curl handler thread that the request has been finished. virtual void Notify(); + // Returns the standalone buffer if a sub-classe's externally-managed one + // is supposed to be used. + // + // If the std::string_view is empty, then it's assumed the HTTPRequest + // itself owns the result buffer and should create one. Note that, + // on errors, the HTTPRequest result buffer is still used. 
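	// Illustrative sketch (not from the patch): how a subclass can hand the
	// base class an externally-managed buffer through the requestResult()
	// hook declared below. The override simply exposes a std::string_view
	// covering a caller-owned buffer; handleResults() then copies GET payload
	// bytes straight into it instead of growing m_result. (This is the
	// pattern AmazonS3Download uses later in this patch.) The class below is
	// a hypothetical stand-in with its constructor omitted.
	//
	//   class CallerBufferedGet : public HTTPRequest {
	//     public:
	//       void SetBuffer(char *buf, size_t len) {
	//           m_view = std::string_view(buf, len);
	//       }
	//     protected:
	//       std::string_view *requestResult() override { return &m_view; }
	//     private:
	//       std::string_view m_view; // window over the caller-owned buffer
	//   };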
+ virtual std::string_view *requestResult() { return nullptr; } + const std::string &getProtocol() { return m_protocol; } // Returns true if the command is a streaming/partial request. @@ -147,6 +174,9 @@ class HTTPRequest { std::string errorMessage; std::string errorCode; + // The contents of the result from the HTTP server. + // If this is a GET and we got the expectedResponseCode, then + // the results are populated in the m_result_buffer instead. std::string m_result; unsigned long responseCode{0}; unsigned long expectedResponseCode = 200; @@ -184,6 +214,15 @@ class HTTPRequest { // buffer. static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v); + // Handle the callback from libcurl + static size_t handleResults(const void *ptr, size_t size, size_t nmemb, + void *me_ptr); + + // Transfer information callback from libcurl + static int XferInfoCallback(void *clientp, curl_off_t dltotal, + curl_off_t dlnow, curl_off_t ultotal, + curl_off_t ulnow); + const TokenFile *m_token{nullptr}; // The following members manage the work queue and workers. @@ -209,10 +248,27 @@ class HTTPRequest { false}; // Flag indicating this command is a streaming request. bool m_timeout{false}; // Flag indicating the request has timed out. bool m_result_ready{false}; // Flag indicating the results data is ready. + bool m_result_buffer_initialized{ + false}; // Flag indicating whether the result buffer view has been + // initialized. off_t m_payload_size{0}; // Size of the entire upload payload; 0 if unknown. std::string m_protocol; std::string m_uri; // URL to request from libcurl std::string_view m_payload; + + // Total number of bytes received from the server + off_t m_bytes_recv{0}; + // Total number of bytes sent to server + off_t m_bytes_sent{0}; + // Time of last data movement (upload or download). Used to detect transfer + // stalls + std::chrono::steady_clock::time_point m_last_movement; + // Transfer stall timeout + static constexpr std::chrono::steady_clock::duration m_transfer_stall{ + std::chrono::seconds(10)}; + + // The contents of a successful GET request. 
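	// Illustrative sketch (not from the patch): the "shrinking window"
	// technique handleResults() applies to the m_result_buffer view declared
	// below. The view starts out covering the caller-owned buffer; each
	// incoming chunk is copied to the front of the view, which is then
	// advanced past the bytes just written, so size() is always the space
	// that remains.
	//
	//   bool appendChunkSketch(std::string_view &window,
	//                          std::string_view chunk) {
	//       if (window.size() < chunk.size()) {
	//           return false; // chunk would overrun the caller's buffer
	//       }
	//       memcpy(const_cast<char *>(window.data()), chunk.data(),
	//              chunk.size());
	//       window = window.substr(chunk.size());
	//       return true;
	//   }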
+ std::string_view m_result_buffer; CURL *m_curl_handle{nullptr}; // The curl handle for the ongoing request char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl unsigned m_retry_count{0}; diff --git a/src/HTTPFile.cc b/src/HTTPFile.cc index 0506de7..dff96ee 100644 --- a/src/HTTPFile.cc +++ b/src/HTTPFile.cc @@ -202,7 +202,7 @@ int HTTPFile::Fstat(struct stat *buff) { size_t next_newline = std::string::npos; size_t last_character = headers.size(); while (current_newline != std::string::npos && - current_newline != last_character - 1) { + current_newline != last_character - 1 && last_character) { next_newline = headers.find("\r\n", current_newline + 2); std::string line = substring(headers, current_newline + 2, next_newline); diff --git a/src/S3Commands.cc b/src/S3Commands.cc index b48d1e4..4a6ab9a 100644 --- a/src/S3Commands.cc +++ b/src/S3Commands.cc @@ -18,6 +18,7 @@ #include "S3Commands.hh" #include "AWSv4-impl.hh" +#include "S3File.hh" #include "shortfile.hh" #include "stl_string_utils.hh" @@ -43,7 +44,7 @@ bool AmazonRequest::SendRequest() { switch (signatureVersion) { case 4: { auto qs = canonicalizeQueryString(); - return sendV4Request(qs, qs.size(), true, true); + return sendV4Request(qs, qs.size(), true, true, true); } default: this->errorCode = "E_INTERNAL"; @@ -418,7 +419,7 @@ bool AmazonRequest::createV4Signature(const std::string_view payload, bool AmazonRequest::sendV4Request(const std::string_view payload, off_t payload_size, bool sendContentSHA, - bool final) { + bool final, bool blocking) { if ((getProtocol() != "http") && (getProtocol() != "https")) { this->errorCode = "E_INVALID_SERVICE_URL"; this->errorMessage = "Service URL not of a known protocol (http[s])."; @@ -447,13 +448,18 @@ bool AmazonRequest::sendV4Request(const std::string_view payload, if (!canonicalQueryString.empty()) { url += "?" + canonicalQueryString; } - return sendPreparedRequest(url, payload, payload_size, final); + if (blocking) { + return sendPreparedRequest(url, payload, payload_size, final); + } else { + return sendPreparedRequestNonblocking(url, payload, payload_size, + final); + } } -// It's stated in the API documentation that you can upload to any region -// via us-east-1, which is moderately crazy. 
+// Send a request to a S3 backend bool AmazonRequest::SendS3Request(const std::string_view payload, - off_t payload_size, bool final) { + off_t payload_size, bool final, + bool blocking) { if (!m_streamingRequest && !final) { if (payload_size == 0) { errorCode = "E_INTERNAL"; @@ -469,7 +475,8 @@ bool AmazonRequest::SendS3Request(const std::string_view payload, if (region.empty()) { region = "us-east-1"; } - return sendV4Request(payload, payload_size, !m_streamingRequest, final); + return sendV4Request(payload, payload_size, !m_streamingRequest, final, + blocking); } // --------------------------------------------------------------------------- @@ -478,7 +485,7 @@ AmazonS3Upload::~AmazonS3Upload() {} bool AmazonS3Upload::SendRequest(const std::string_view &payload) { httpVerb = "PUT"; - return SendS3Request(payload, payload.size(), true); + return SendS3Request(payload, payload.size(), true, true); } // --------------------------------------------------------------------------- @@ -502,7 +509,7 @@ bool AmazonS3CompleteMultipartUpload::SendRequest( } payload += ""; - return SendS3Request(payload, payload.size(), true); + return SendS3Request(payload, payload.size(), true, true); } // --------------------------------------------------------------------------- @@ -514,7 +521,7 @@ bool AmazonS3CreateMultipartUpload::SendRequest() { query_parameters["x-id"] = "CreateMultipartUpload"; httpVerb = "POST"; - return SendS3Request("", 0, true); + return SendS3Request("", 0, true, true); } bool AmazonS3SendMultipartPart::SendRequest(const std::string_view payload, @@ -525,7 +532,7 @@ bool AmazonS3SendMultipartPart::SendRequest(const std::string_view payload, query_parameters["uploadId"] = uploadId; includeResponseHeader = true; httpVerb = "PUT"; - return SendS3Request(payload, payloadSize, final); + return SendS3Request(payload, payloadSize, final, true); } // --------------------------------------------------------------------------- @@ -540,21 +547,29 @@ bool AmazonS3Download::SendRequest(off_t offset, size_t size) { headers["Range"] = range.c_str(); this->expectedResponseCode = 206; } + if (size && m_buffer) { + m_buffer_view = std::string_view(m_buffer, size); + } httpVerb = "GET"; - std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed, 0, true); + return SendS3Request("", 0, true, IsBlocking()); } // --------------------------------------------------------------------------- +template +AmazonS3NonblockingDownload::~AmazonS3NonblockingDownload() {} +template class AmazonS3NonblockingDownload; + +// --------------------------------------------------------------------------- + AmazonS3Head::~AmazonS3Head() {} bool AmazonS3Head::SendRequest() { httpVerb = "HEAD"; includeResponseHeader = true; std::string noPayloadAllowed; - return SendS3Request(noPayloadAllowed, 0, true); + return SendS3Request(noPayloadAllowed, 0, true, true); } void AmazonS3Head::parseResponse() { @@ -615,7 +630,7 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) { hostUrl = getProtocol() + "://" + host + bucketPath; canonicalURI = bucketPath; - return SendS3Request("", 0, true); + return SendS3Request("", 0, true, true); } bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId, diff --git a/src/S3Commands.hh b/src/S3Commands.hh index 8f60133..a86dfe3 100644 --- a/src/S3Commands.hh +++ b/src/S3Commands.hh @@ -92,8 +92,10 @@ class AmazonRequest : public HTTPRequest { // - payload_size: final size of the payload for uploads; 0 if unknown. 
// - final: True if this is the last (or only) payload of the request; false // otherwise + // - blocking: True if the method should block on a response; false + // otherwise virtual bool SendS3Request(const std::string_view payload, - off_t payload_size, bool final); + off_t payload_size, bool final, bool blocking); static void Init(XrdSysError &log) { HTTPRequest::Init(log); } @@ -107,8 +109,10 @@ class AmazonRequest : public HTTPRequest { // the final payload. Servers may verify this is what they received. // - final: True if this is the last (or only) payload of the request; false // otherwise. + // - blocking: True if this method should block until a response; false + // otherwise bool sendV4Request(const std::string_view payload, off_t payload_size, - bool sendContentSHA, bool final); + bool sendContentSHA, bool final, bool blocking); bool retainObject; bool m_streamingRequest{ @@ -246,23 +250,52 @@ class AmazonS3SendMultipartPart final : public AmazonRequest { protected: }; -class AmazonS3Download final : public AmazonRequest { +class AmazonS3Download : public AmazonRequest { using AmazonRequest::SendRequest; public: AmazonS3Download(const S3AccessInfo &ai, const std::string &objectName, - XrdSysError &log) - : AmazonRequest(ai, objectName, log) {} + XrdSysError &log, char *buffer) + : AmazonRequest(ai, objectName, log), m_buffer(buffer) {} AmazonS3Download(const std::string &s, const std::string &akf, const std::string &skf, const std::string &b, const std::string &o, const std::string &style, - XrdSysError &log) - : AmazonRequest(s, akf, skf, b, o, style, 4, log) {} + XrdSysError &log, char *buffer) + : AmazonRequest(s, akf, skf, b, o, style, 4, log), m_buffer(buffer) {} virtual ~AmazonS3Download(); virtual bool SendRequest(off_t offset, size_t size); + + protected: + virtual bool IsBlocking() { return true; } + virtual std::string_view *requestResult() override { + return &m_buffer_view; + } + + private: + char *m_buffer{nullptr}; + std::string_view m_buffer_view; +}; + +template +class AmazonS3NonblockingDownload final : public AmazonS3Download { + + public: + AmazonS3NonblockingDownload(const S3AccessInfo &ai, + const std::string &objectName, XrdSysError &log, + char *buffer, T ¬ifier) + : AmazonS3Download(ai, objectName, log, buffer), m_notifier(notifier) {} + + virtual ~AmazonS3NonblockingDownload(); + + protected: + virtual bool IsBlocking() override { return false; } + virtual void Notify() override { m_notifier.Notify(); } + + private: + T &m_notifier; }; class AmazonS3Head final : public AmazonRequest { diff --git a/src/S3File.cc b/src/S3File.cc index 81871e0..feb7779 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -114,31 +114,17 @@ int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { // This flag is not set when it's going to be a read operation // so we check if the file exists in order to be able to return a 404 if (!Oflag || (Oflag & O_APPEND)) { - AmazonS3Head head(m_ai, m_object, m_log); - - if (!head.SendRequest()) { - return -ENOENT; + auto res = Fstat(nullptr); + if (res < 0) { + return res; } - head.getSize(); } return 0; } ssize_t S3File::Read(void *buffer, off_t offset, size_t size) { - AmazonS3Download download(m_ai, m_object, m_log); - - if (!download.SendRequest(offset, size)) { - std::stringstream ss; - ss << "Failed to send GetObject command: " << download.getResponseCode() - << "'" << download.getResultString() << "'"; - m_log.Log(LogMask::Warning, "S3File::Read", ss.str().c_str()); - return 0; - } - - const std::string &bytes 
= download.getResultString(); - memcpy(buffer, bytes.data(), bytes.size()); - return bytes.size(); + return m_cache.Read(*this, static_cast(buffer), offset, size); } int S3File::Fstat(struct stat *buff) { @@ -324,11 +310,11 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { } auto is_final = (m_part_size > 0 && m_part_written == m_part_size) || m_part_written == m_s3_part_size; - if (m_log.getMsgMask() & LogMask::Dump) { + if (m_log.getMsgMask() & LogMask::Debug) { std::stringstream ss; ss << "Sending request with buffer of size=" << write_size - << " and is_final=" << is_final; - m_log.Log(LogMask::Dump, "ContinueSendPart", ss.str().c_str()); + << ", offset=" << m_write_offset << " and is_final=" << is_final; + m_log.Log(LogMask::Debug, "ContinueSendPart", ss.str().c_str()); } if (!m_write_op->SendRequest( std::string_view(static_cast(buffer), write_size), @@ -356,7 +342,7 @@ ssize_t S3File::ContinueSendPart(const void *buffer, size_t size) { m_write_offset = -1; return -EIO; } - std::size_t endPos = resultString.find("\"", startPos + 7); + std::size_t endPos = resultString.find('"', startPos + 7); if (startPos == std::string::npos) { m_log.Emsg("Write", "Result from S3 does not include ETag end-character:", @@ -486,6 +472,530 @@ int S3File::Close(long long *retsz) { return 0; } +// Copy any overlapping data from the cache buffer into the request buffer, +// returning the remaining data necessary to fill the request. +// +// - `req_off`: File offset of the beginning of the request buffer. +// - `req_size`: Size of the request buffer +// - `req_buf`: Request buffer to copy data into +// - `cache_off`: File offset of the beginning of the cache buffer. +// - `cache_size`: Size of the cache buffer +// - `cache_buf`: Cache buffer to copy data from. +// - `used` (output): Incremented by the number of bytes copied from the cache +// buffer +// - Returns the (offset, size) of the remaining reads needed to satisfy the +// request. If there is only one (or no!) remaining reads, then the +// corresponding tuple returned is (-1, 0). 
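// Illustrative usage sketch (not from the patch) for the helper defined
// below: a 4 KiB read starting at offset 0 against a 1 KiB cache entry that
// holds file bytes [1024, 2048). The cached bytes land in the middle of the
// request buffer, and the two returned (offset, size) pairs describe the
// leading and trailing holes that still have to be fetched. The same cases
// are exercised by the OverlapCopy unit tests added at the bottom of this
// patch.
//
//   std::vector<char> req(4096);
//   std::vector<char> cache(1024, 'b'); // file bytes [1024, 2048)
//   size_t used = 0;
//   auto [hole1_off, hole1_size, hole2_off, hole2_size] =
//       OverlapCopy(/*req_off=*/0, /*req_size=*/4096, req.data(),
//                   /*cache_off=*/1024, /*cache_size=*/1024, cache.data(),
//                   used);
//   // hole1 == (0, 1024), hole2 == (2048, 2048), used == 1024,
//   // and req[1024..2047] now holds the cached bytes.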
+std::tuple<off_t, size_t, off_t, size_t>
+OverlapCopy(off_t req_off, size_t req_size, char *req_buf, off_t cache_off,
+			size_t cache_size, char *cache_buf, size_t &used) {
+	if (req_off < 0) {
+		return std::make_tuple(req_off, req_size, -1, 0);
+	}
+	if (cache_off < 0) {
+		return std::make_tuple(req_off, req_size, -1, 0);
+	}
+
+	if (cache_off <= req_off) {
+		auto cache_end = cache_off + static_cast<off_t>(cache_size);
+		if (cache_end > req_off) {
+			auto cache_buf_off = static_cast<size_t>(req_off - cache_off);
+			auto cache_copy_bytes =
+				std::min(static_cast<size_t>(cache_end - req_off), req_size);
+			memcpy(req_buf, cache_buf + cache_buf_off, cache_copy_bytes);
+			used += cache_copy_bytes;
+			return std::make_tuple(req_off + cache_copy_bytes,
+								   req_size - cache_copy_bytes, -1, 0);
+		}
+	}
+	if (req_off < cache_off) {
+		auto req_end = static_cast<off_t>(req_off + req_size);
+		if (req_end > cache_off) {
+			auto req_buf_off = static_cast<size_t>(cache_off - req_off);
+			auto cache_end = static_cast<off_t>(cache_off + cache_size);
+			auto trailing_bytes = static_cast<ssize_t>(req_end - cache_end);
+			if (trailing_bytes > 0) {
+				memcpy(req_buf + req_buf_off, cache_buf, cache_size);
+				used += cache_size;
+				return std::make_tuple(req_off, req_buf_off, cache_end,
+									   trailing_bytes);
+			}
+			memcpy(req_buf + req_buf_off, cache_buf, req_end - cache_off);
+			used += req_end - cache_off;
+			return std::make_tuple(req_off, req_buf_off, -1, 0);
+		}
+	}
+	return std::make_tuple(req_off, req_size, -1, 0);
+}
+
+std::tuple<off_t, size_t, off_t, size_t>
+S3File::S3Cache::Entry::OverlapCopy(off_t req_off, size_t req_size,
+									char *req_buf) {
+	size_t bytes_copied = 0;
+	auto results =
+		::OverlapCopy(req_off, req_size, req_buf, m_off, m_cache_entry_size,
+					  m_data.data(), bytes_copied);
+	m_parent.m_hit_bytes += bytes_copied;
+	m_used += bytes_copied;
+	return results;
+}
+
+std::tuple<off_t, size_t, bool>
+S3File::DownloadBypass(off_t offset, size_t size, char *buffer) {
+	if (size <= m_cache_entry_size) {
+		return std::make_tuple(offset, size, false);
+	}
+	AmazonS3Download download(m_ai, m_object, m_log, buffer);
+	if (!download.SendRequest(offset, size)) {
+		std::stringstream ss;
+		ss << "Failed to send GetObject command: " << download.getResponseCode()
+		   << "'" << download.getResultString() << "'";
+		m_log.Log(LogMask::Warning, "S3File::Read", ss.str().c_str());
+		return std::make_tuple(0, -1, false);
+	}
+	return std::make_tuple(-1, 0, true);
+}
+
+bool S3File::S3Cache::CouldUseAligned(off_t req, off_t cache) {
+	if (req < 0 || cache < 0) {
+		return false;
+	}
+	return (req >= cache) &&
+		   (req < cache + static_cast<off_t>(S3File::m_cache_entry_size));
+}
+
+bool S3File::S3Cache::CouldUse(off_t req_off, size_t req_size,
+							   off_t cache_off) {
+	if (req_off < 0 || cache_off < 0) {
+		return false;
+	}
+	auto cache_end = cache_off + static_cast<off_t>(m_cache_entry_size);
+	if (req_off >= cache_off) {
+		return req_off < cache_end;
+	} else {
+		return req_off + static_cast<off_t>(req_size) > cache_off;
+	}
+}
+
+void S3File::S3Cache::DownloadCaches(S3File &file, bool download_a,
+									 bool download_b, bool locked) {
+	if (!download_a && !download_b) {
+		return;
+	}
+
+	std::unique_lock lk(m_mutex, std::defer_lock);
+	if (!locked) {
+		lk.lock();
+	}
+	if (download_a) {
+		m_a.Download(file);
+	}
+	if (download_b) {
+		m_b.Download(file);
+	}
+}
+
+ssize_t S3File::S3Cache::Read(S3File &file, char *buffer, off_t offset,
+							  size_t size) {
+	if (offset >= file.content_length) {
+		return 0;
+	}
+	if (offset + static_cast<off_t>(size) > file.content_length) {
+		size = file.content_length - offset;
+	}
+	if (file.m_log.getMsgMask() & LogMask::Debug) {
+		std::stringstream ss;
+		ss << "Read 
request for offset=" << offset << ", size=" << size; + file.m_log.Log(LogMask::Debug, "cache", ss.str().c_str()); + } + + off_t req3_off, req4_off, req5_off, req6_off; + size_t req3_size, req4_size, req5_size, req6_size; + // Copy as much data out of the cache as possible; wait for the caches to + // finish their downloads if a cache fill is in progress and we could + // utilize the cache fill. + { + std::unique_lock lk{m_mutex}; + if (m_a.m_inprogress) { + m_cv.wait(lk, [&] { + return !m_a.m_inprogress || !CouldUse(offset, size, m_a.m_off); + }); + } + off_t req1_off, req2_off; + size_t req1_size, req2_size; + std::tie(req1_off, req1_size, req2_off, req2_size) = + m_a.OverlapCopy(offset, size, buffer); + if (m_b.m_inprogress) { + m_cv.wait(lk, [&] { + return !m_b.m_inprogress || + !(CouldUse(req1_off, req1_size, m_b.m_off) || + CouldUse(req2_off, req2_size, m_b.m_off)); + }); + } + std::tie(req3_off, req3_size, req4_off, req4_size) = + m_b.OverlapCopy(req1_off, req1_size, buffer + req1_off - offset); + std::tie(req5_off, req5_size, req6_off, req6_size) = + m_b.OverlapCopy(req2_off, req2_size, buffer + req2_off - offset); + } + // If any of the remaining missing bytes are bigger than a single chunk, + // download those bypassing the cache. + bool downloaded; + size_t bypass_size = req3_size; + std::tie(req3_off, req3_size, downloaded) = + file.DownloadBypass(req3_off, req3_size, buffer + req3_off - offset); + if (req3_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + m_bypass_count += 1; + } + bypass_size = req4_size; + std::tie(req4_off, req4_size, downloaded) = + file.DownloadBypass(req4_off, req4_size, buffer + req4_off - offset); + if (req4_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + m_bypass_count += 1; + } + bypass_size = req5_size; + std::tie(req5_off, req5_size, downloaded) = + file.DownloadBypass(req5_off, req5_size, buffer + req5_off - offset); + if (req5_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + m_bypass_count += 1; + } + bypass_size = req6_size; + std::tie(req6_off, req6_size, downloaded) = + file.DownloadBypass(req6_off, req6_size, buffer + req6_off - offset); + if (req6_size < 0) { + m_errors += 1; + return -1; + } + if (downloaded) { + m_bypass_bytes += bypass_size; + m_bypass_count += 1; + } + if (req3_size == 0 && req4_size == 0 && req5_size == 0 && req6_size == 0) { + m_full_hit_count += 1; + // We've used more bytes in the cache, potentially all of the bytes. + // In that case, we could drop one of the cache entries and prefetch + // more of the object. 
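	// Illustrative sketch (not from the patch) of the prefetch-advance rule
	// described above, pulled out of the surrounding locking and statistics.
	// start_download() is a hypothetical stand-in for Entry::Download(). An
	// entry is advanced only once it has been fully consumed
	// (m_used >= m_cache_entry_size) and only while the next aligned chunk
	// still lies inside the object:
	//
	//   off_t next_offset = std::max(a.off, b.off) + chunk_size;
	//   if (next_offset < content_length) {
	//       if (!a.inprogress && a.used >= chunk_size) {
	//           a.off = next_offset;   // refill A with the next chunk...
	//           start_download(a);
	//           next_offset += chunk_size;
	//       }
	//       if (!b.inprogress && b.used >= chunk_size) {
	//           b.off = next_offset;   // ...and B with the one after it
	//           start_download(b);
	//       }
	//   }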
+ bool download_a = false, download_b = false; + { + std::unique_lock lk{m_mutex}; + auto next_offset = std::max(m_a.m_off, m_b.m_off) + + static_cast(m_cache_entry_size); + if (next_offset < file.content_length) { + if (!m_a.m_inprogress && m_a.m_used >= m_cache_entry_size) { + m_a.m_inprogress = true; + m_a.m_off = next_offset; + download_a = true; + next_offset += m_cache_entry_size; + } + if (!m_b.m_inprogress && m_b.m_used >= m_cache_entry_size) { + m_b.m_inprogress = true; + m_b.m_off = next_offset; + download_b = true; + } + } + } + if (download_a) { + m_prefetch_count++; + m_prefetch_bytes += m_cache_entry_size; + } + if (download_b) { + m_prefetch_count++; + m_prefetch_bytes += m_cache_entry_size; + } + DownloadCaches(file, download_a, download_b, false); + return size; + } + // At this point, the only remaining data requests must be less than the + // size of the cache chunk, implying it's a partial request at the beginning + // or end of the range -- hence only two can exist. + off_t req1_off = -1, req2_off = -1; + off_t *req_off = &req1_off; + size_t req1_size = 0, req2_size = 0; + size_t *req_size = &req1_size; + if (req3_off != -1) { + *req_off = req3_off; + *req_size = req3_size; + req_off = &req2_off; + req_size = &req2_size; + } + if (req4_off != -1) { + *req_off = req4_off; + *req_size = req4_size; + req_off = &req2_off; + req_size = &req2_size; + } + if (req5_off != -1) { + *req_off = req5_off; + *req_size = req5_size; + req_off = &req2_off; + req_size = &req2_size; + } + if (req6_off != -1) { + *req_off = req6_off; + *req_size = req6_size; + } + if (req1_off != -1 && req2_off == -1) { + auto chunk_off = static_cast(req1_off / m_cache_entry_size * + m_cache_entry_size + + m_cache_entry_size); + auto req_end = static_cast(req1_off + req1_size); + + if (req_end > chunk_off) { + req2_off = chunk_off; + req2_size = req_end - chunk_off; + req1_size = chunk_off - req1_off; + } + } + size_t miss_bytes = req1_size + req2_size; + if (miss_bytes == size) { + m_miss_count += 1; + } else { + m_partial_hit_count += 1; + } + m_miss_bytes += miss_bytes; + unsigned fetch_attempts = 0; + while (req1_off != -1) { + std::unique_lock lk(m_mutex); + m_cv.wait(lk, [&] { + bool req1waitOnA = + m_a.m_inprogress && CouldUseAligned(req1_off, m_a.m_off); + bool req2waitOnA = + m_a.m_inprogress && CouldUseAligned(req2_off, m_a.m_off); + bool req1waitOnB = + m_b.m_inprogress && CouldUseAligned(req1_off, m_b.m_off); + bool req2waitOnB = + m_b.m_inprogress && CouldUseAligned(req2_off, m_b.m_off); + // If there's an idle cache entry, use it -- unless the other cache + // entry is working on this request. + if (!m_a.m_inprogress && !req1waitOnB && !req2waitOnB) { + return true; + } + if (!m_b.m_inprogress && !req1waitOnA && !req2waitOnA) { + return true; + } + // If an idle cache entry can immediately satisfy the request, we + // use it. + if (!m_a.m_inprogress && (CouldUseAligned(req1_off, m_a.m_off) || + CouldUseAligned(req2_off, m_a.m_off))) { + return true; + } + if (!m_b.m_inprogress && (CouldUseAligned(req1_off, m_b.m_off) || + CouldUseAligned(req2_off, m_b.m_off))) { + return true; + } + // If either request is in progress, we continue to wait. + if (req1waitOnA || req1waitOnB || req2waitOnA || req2waitOnB) { + return false; + } + // If either cache is idle, we will use it. 
+ return !m_a.m_inprogress || !m_b.m_inprogress; + }); + // std::cout << "A entry in progress: " << m_a.m_inprogress + // << ", with offset " << m_a.m_off << "\n"; + // std::cout << "B entry in progress: " << m_b.m_inprogress + // << ", with offset " << m_b.m_off << "\n"; + // Test to see if any of the buffers could immediately fulfill the + // requests. + auto consumed_req = false; + if (!m_a.m_inprogress) { + if (CouldUseAligned(req2_off, m_a.m_off)) { + if (m_a.m_failed) { + m_a.m_failed = false; + m_a.m_off = -1; + m_errors += 1; + return -1; + } + m_a.OverlapCopy(req2_off, req2_size, + buffer + req2_off - offset); + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + if (CouldUseAligned(req1_off, m_a.m_off)) { + if (m_a.m_failed) { + m_a.m_failed = false; + m_a.m_off = -1; + m_errors += 1; + return -1; + } + m_a.OverlapCopy(req1_off, req1_size, + buffer + req1_off - offset); + req1_off = req2_off; + req1_size = req2_size; + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + } + if (!m_b.m_inprogress) { + if (CouldUseAligned(req2_off, m_b.m_off)) { + if (m_b.m_failed) { + m_b.m_failed = false; + m_b.m_off = -1; + m_errors += 1; + return -1; + } + m_b.OverlapCopy(req2_off, req2_size, + buffer + req2_off - offset); + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + if (CouldUseAligned(req1_off, m_b.m_off)) { + if (m_b.m_failed) { + m_b.m_failed = false; + m_b.m_off = -1; + m_errors += 1; + return -1; + } + m_b.OverlapCopy(req1_off, req1_size, + buffer + req1_off - offset); + req1_off = req2_off; + req1_size = req2_size; + req2_off = -1; + req2_size = 0; + consumed_req = true; + } + } + if (consumed_req) { + continue; + } + + // No caches serve our requests - we must kick off a new download + // std::cout << "Will download data via cache; req1 offset=" << req1_off + // << ", req2 offset=" << req2_off << "\n"; + fetch_attempts++; + bool download_a = false, download_b = false, prefetch_b = false; + if (!m_a.m_inprogress && m_b.m_inprogress) { + m_a.m_off = req1_off / m_cache_entry_size * m_cache_entry_size; + m_a.m_inprogress = true; + download_a = true; + } else if (m_a.m_inprogress && !m_b.m_inprogress) { + m_b.m_off = req1_off / m_cache_entry_size * m_cache_entry_size; + m_b.m_inprogress = true; + download_b = true; + } else if (!m_a.m_inprogress && !m_b.m_inprogress) { + if (req2_off != -1) { + m_a.m_off = req1_off / m_cache_entry_size * m_cache_entry_size; + m_a.m_inprogress = true; + download_a = true; + m_b.m_off = req2_off / m_cache_entry_size * m_cache_entry_size; + m_b.m_inprogress = true; + download_b = true; + } else { + if (m_a.m_used >= m_cache_entry_size) { + // Cache A is fully read -- let's empty it + m_a.m_off = m_b.m_off; + m_b.m_off = -1; + m_a.m_used = m_b.m_used; + m_b.m_used = 0; + std::swap(m_a.m_data, m_b.m_data); + } + if (m_a.m_used >= m_cache_entry_size) { + // Both caches were fully read -- empty the second one. + m_a.m_off = -1; + m_a.m_used = 0; + } + if (m_a.m_off == -1 && m_b.m_off == -1) { + // Prefetch both caches at once + m_a.m_off = req1_off / + static_cast(m_cache_entry_size) * + static_cast(m_cache_entry_size); + auto prefetch_offset = + m_a.m_off + static_cast(m_cache_entry_size); + ; + download_a = true; + m_a.m_inprogress = true; + if (prefetch_offset < file.content_length) { + m_b.m_off = prefetch_offset; + prefetch_b = true; + m_b.m_inprogress = true; + } + } else { + // Select one cache entry to fetch data. 
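					// Illustrative restatement (not from the patch) of the
					// selection rule implemented just below, with align_down()
					// and fill() as hypothetical shorthands: align the first
					// outstanding request down to a chunk boundary, then
					// recycle whichever entry does not hold the lower offset.
					//
					//   off_t needed = align_down(req1_off, chunk_size);
					//   if (needed > a.off) { fill(b, needed); } // recycle B
					//   else                { fill(a, needed); } // recycle A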
+ auto needed_off = req1_off / + static_cast(m_cache_entry_size) * + static_cast(m_cache_entry_size); + if (needed_off > m_a.m_off) { + m_b.m_off = needed_off; + download_b = true; + m_b.m_inprogress = true; + auto bytes_unused = + static_cast(m_cache_entry_size) - + static_cast(m_b.m_used); + m_unused_bytes += bytes_unused < 0 ? 0 : bytes_unused; + } else { + m_a.m_off = needed_off; + download_a = true; + m_a.m_inprogress = true; + auto bytes_unused = + static_cast(m_cache_entry_size) - + static_cast(m_a.m_used); + m_unused_bytes += bytes_unused < 0 ? 0 : bytes_unused; + } + } + } + } // else both caches are in-progress and neither satisfied our needs + if (download_a) { + m_fetch_count += 1; + m_fetch_bytes += m_cache_entry_size; + } + if (download_b) { + m_fetch_count += 1; + m_fetch_bytes += m_cache_entry_size; + } + if (prefetch_b) { + m_prefetch_count += 1; + m_prefetch_bytes += m_cache_entry_size; + } + DownloadCaches(file, download_a, download_b || prefetch_b, true); + } + return size; +} + +void S3File::S3Cache::Entry::Notify() { + std::unique_lock lk(m_parent.m_mutex); + m_inprogress = false; + m_failed = !m_request->getErrorCode().empty(); + m_request = nullptr; + + m_parent.m_cv.notify_all(); +} + +void S3File::S3Cache::Entry::Download(S3File &file) { + m_used = false; + m_data.resize(m_cache_entry_size); + m_request.reset(new AmazonS3NonblockingDownload( + file.m_ai, file.m_object, file.m_log, m_data.data(), *this)); + size_t request_size = m_cache_entry_size; + if (m_off + static_cast(request_size) > file.content_length) { + request_size = file.content_length - m_off; + } + if (!m_request->SendRequest(m_off, m_cache_entry_size)) { + std::stringstream ss; + ss << "Failed to send GetObject command: " + << m_request->getResponseCode() << "'" + << m_request->getResultString() << "'"; + file.m_log.Log(LogMask::Warning, "S3File::Read", ss.str().c_str()); + m_failed = true; + m_request.reset(); + } +} + extern "C" { /* diff --git a/src/S3File.hh b/src/S3File.hh index abd69ee..060f628 100644 --- a/src/S3File.hh +++ b/src/S3File.hh @@ -26,6 +26,8 @@ #include #include +#include +#include #include #include #include @@ -36,6 +38,7 @@ int parse_path(const S3FileSystem &fs, const char *path, std::string &exposedPath, std::string &object); class AmazonS3SendMultipartPart; +template class AmazonS3NonblockingDownload; class S3File : public XrdOssDF { public: @@ -124,18 +127,34 @@ class S3File : public XrdOssDF { ssize_t SendPartStreaming(); ssize_t ContinueSendPart(const void *buffer, size_t size); + + // Download data synchronously, bypassing the cache. + // The download is only performed if the request size is larger than a cache + // entry. + // + // - `offset`: File offset of the request. + // - `size`: Size of the request. + // - `buffer`: Buffer to place resulting data into. + // - Returns the (offset, size) of any remaining read and `true` if a + // download occured. + std::tuple DownloadBypass(off_t offset, size_t size, + char *buffer); + XrdSysError &m_log; S3FileSystem *m_oss; std::string m_object; S3AccessInfo m_ai; - size_t content_length; + off_t content_length; time_t last_modified; static const size_t m_s3_part_size = 100'000'000; // The size of each S3 chunk. + static constexpr size_t m_cache_entry_size = + (2 * 1024 * 1024); // Size of the buffer associated with the cache + bool m_create{false}; int partNumber; size_t m_part_written{ @@ -182,4 +201,96 @@ class S3File : public XrdOssDF { // Flag determining whether the monitoring thread has been launched. 
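	// Illustrative sketch (not from the patch): the std::call_once pattern
	// that the m_monitor_launch flag below supports -- launch a single
	// background monitoring thread the first time it is needed, no matter
	// how many file handles race to do so. Names are hypothetical stand-ins.
	//
	//   static std::once_flag monitor_launch;
	//   void EnsureMonitorSketch() {
	//       std::call_once(monitor_launch, [] {
	//           std::thread([] {
	//               // ... periodic maintenance loop ...
	//           }).detach();
	//       });
	//   }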
static std::once_flag m_monitor_launch; + + // The double-buffering component for the file handle. Reads are rounded up + // to a particular size and kept in the file handle; before requesting new + // data, the cache is searched to see if the read can be serviced from + // memory. When possible, a forward prefetch is done + struct S3Cache { + struct Entry { + bool m_failed{false}; // Indication as to whether last download + // attempt failed for cache entry. + bool m_inprogress{ + false}; // Indication as to whether a download is in-progress. + off_t m_off{-1}; // File offset of the beginning of the cache entry. + // -1 signifies unused entry + size_t m_used{ + 0}; // The number of bytes read out of the current cache entry. + std::vector m_data; // Contents of cache entry + S3Cache &m_parent; // Reference to owning object + std::unique_ptr> + m_request; // In-progress request to fill entry. + + Entry(S3Cache &cache) : m_parent(cache) {} + void Download( + S3File &); // Trigger download request for this cache entry. + void Notify(); // Notify containing cache that the entry's + // in-progress operation has completed. + + // Copy any overlapping data from the cache buffer into the request + // buffer, returning the remaining data necessary to fill the + // request. + // + // - `req_off`: File offset of the beginning of the request buffer. + // - `req_size`: Size of the request buffer + // - `req_buf`: Request buffer to copy data into + // - Returns the (offset, size) of the remaining reads needed to + // satisfy the request. If there is only one (or no!) remaining + // reads, then the corresponding tuple returned is (-1, 0). + std::tuple + OverlapCopy(off_t req_off, size_t req_size, char *req_buf); + }; + friend class AmazonS3NonblockingDownload; + + std::atomic m_hit_bytes{0}; // Bytes served from the cache. + std::atomic m_miss_bytes{ + 0}; // Bytes that resulted in a cache miss. + std::atomic m_full_hit_count{ + 0}; // Requests completely served from the cache. + std::atomic m_partial_hit_count{ + 0}; // Requests partially served from the cache. + std::atomic m_miss_count{ + 0}; // Requests that had no data served from the cache. + std::atomic m_bypass_bytes{ + 0}; // Bytes for requests that were large enough they bypassed the + // cache and fetched directly from S3. + std::atomic m_bypass_count{ + 0}; // Requests that were large enough they (at least partially) + // bypassed the cache and fetched directly from S3. + std::atomic m_fetch_bytes{ + 0}; // Bytes that were fetched from S3 to serve a cache miss. + std::atomic m_fetch_count{ + 0}; // Requests sent to S3 to serve a cache miss. + std::atomic m_unused_bytes{ + 0}; // Bytes that were unused at cache eviction. + std::atomic m_prefetch_bytes{0}; // Bytes prefetched + std::atomic m_prefetch_count{0}; // Number of prefetch requests + std::atomic m_errors{0}; // Count of errors encountered by cache. + + Entry m_a{*this}; // Cache entry A. Protected by m_mutex. + Entry m_b{*this}; // Cache entry B. Protected by m_mutex. + std::mutex m_mutex; // Mutex protecting the data in the S3Cache object + std::condition_variable m_cv; // Condition variable for notifying that + // new downloaded data is available. + + // Returns `true` if the request offset would be inside the cache entry. + // The request offset is assumed to be aligned to be inside a single + // cache entry (that is, smaller than a cache entry and not spanning two + // entries). 
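		// Illustrative examples (not from the patch), using the 2 MiB
		// m_cache_entry_size defined above:
		//
		//   CouldUseAligned(0,       0)       -> true  (first byte of entry)
		//   CouldUseAligned(2097151, 0)       -> true  (last byte of entry)
		//   CouldUseAligned(2097152, 0)       -> false (first byte of next entry)
		//   CouldUseAligned(0,       2097152) -> false (before the entry)
		//   CouldUseAligned(-1,      0)       -> false (unused request slot)
		//   CouldUseAligned(0,       -1)      -> false (unused cache entry)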
+ bool CouldUseAligned(off_t req, off_t cache); + + // Returns true if the specified request, [req_off, req_off + req_size), + // has any bytes inside the cache entry starting at `cache_off`. + bool CouldUse(off_t req_off, size_t req_size, off_t cache_off); + + // Trigger the non-blocking download into the cache entries. + // The condition variable will be notified when one of the caches + // finishes. + void DownloadCaches(S3File &file, bool download_a, bool download_b, + bool locked); + + // Trigger a blocking read from a given file + ssize_t Read(S3File &file, char *buffer, off_t offset, size_t size); + }; + S3Cache m_cache; }; diff --git a/test/s3_unit_tests.cc b/test/s3_unit_tests.cc index a39ab89..8fcf1cb 100644 --- a/test/s3_unit_tests.cc +++ b/test/s3_unit_tests.cc @@ -33,6 +33,7 @@ #include #include #include +#include std::once_flag g_init_once; std::string g_ca_file; @@ -155,7 +156,75 @@ s3.end VerifyContents(fs, name, writeSize, chunkByte, chunkSize); } + void RandomRead(const std::string &name, unsigned char chunkByte, + size_t chunkSize, + std::chrono::steady_clock::duration testLength) { + XrdSysLogger log; + S3FileSystem fs(&log, m_configfn.c_str(), nullptr); + + std::unique_ptr fh(fs.newFile()); + ASSERT_TRUE(fh); + + XrdOucEnv env; + auto rv = fh->Open(name.c_str(), O_CREAT | O_WRONLY, 0755, env); + ASSERT_EQ(rv, 0); + + struct stat buf; + rv = fh->Fstat(&buf); + ASSERT_EQ(rv, 0); + auto objSize = buf.st_size; + + auto startTime = std::chrono::steady_clock::now(); + size_t maxReadSize = 5'000'000; + std::string readBuf; + readBuf.resize(maxReadSize); + std::string correctContents; + correctContents.resize(maxReadSize); + while (std::chrono::steady_clock::now() - startTime < testLength) { + size_t readSize = std::rand() % maxReadSize; + off_t off = std::rand() % objSize; + ssize_t expectedReadSize = + (static_cast(readSize) + off - objSize > 0) + ? (objSize - off) + : readSize; + readBuf.resize(expectedReadSize); + rv = fh->Read(readBuf.data(), off, readSize); + ASSERT_EQ(rv, expectedReadSize); + GenCorrectContents(correctContents, off, expectedReadSize, + chunkByte, chunkSize, objSize); + ASSERT_EQ(readBuf, correctContents); + } + } + private: + void GenCorrectContents(std::string &correctContents, off_t off, + size_t size, unsigned char chunkByte, + size_t chunkSize, size_t objSize) { + auto chunkNum = static_cast(off / chunkSize); + auto curChunkByte = static_cast(chunkByte + chunkNum); + off_t chunkBoundary = (chunkNum + 1) * chunkSize; + correctContents.resize(size); + if (chunkBoundary < off + static_cast(size)) { + size_t firstLen = chunkBoundary - off; + std::string firstChunk(firstLen, curChunkByte); + correctContents.replace(0, firstLen, firstChunk); + auto iter = correctContents.begin() + firstLen; + off_t remaining = size - firstLen; + while (remaining) { + curChunkByte++; + auto chunkLen = (remaining > static_cast(chunkSize)) + ? chunkSize + : remaining; + std::string chunk(chunkLen, curChunkByte); + std::copy(chunk.begin(), chunk.end(), iter); + iter += chunkLen; + remaining -= chunkLen; + } + } else { + correctContents = std::string(size, curChunkByte); + } + } + void VerifyContents(S3FileSystem &fs, const std::string &obj, off_t expectedSize, unsigned char chunkByte, size_t chunkSize) { @@ -400,6 +469,113 @@ TEST_F(FileSystemS3Fixture, InvalidObject) { ASSERT_EQ(fs.Stat("/test/trailing/", &buf, 0, nullptr), -ENOENT); } +// Check out the logic of the overlap copy routine. 
+std::tuple +OverlapCopy(off_t req_off, size_t req_size, char *req_buf, off_t cache_off, + size_t cache_size, char *cache_buf, size_t &used); +TEST(OverlapCopy, Simple) { + std::string repeatA(4096, 'a'); + std::string repeatB(4096, 'b'); + size_t used{0}; + auto [req1_off, req1_size, req2_off, req2_size] = + OverlapCopy(0, 4096, repeatA.data(), 4096, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 4096U); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); + + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(0, 4096, repeatA.data(), 2048, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 2048U); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 2048U); + auto correctOverlap = std::string(2048, 'a') + std::string(2048, 'b'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(0, 4096, repeatA.data(), 1024, 1024, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 1024U); + ASSERT_EQ(req2_off, 2048); + ASSERT_EQ(req2_size, 2048U); + ASSERT_EQ(used, 1024U); + correctOverlap = std::string(1024, 'a') + std::string(1024, 'b') + + std::string(2048, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(1024, 4096, repeatA.data(), 0, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 4096); + ASSERT_EQ(req1_size, 1024U); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 3072U); + correctOverlap = std::string(3072, 'b') + std::string(1024, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(4096, 4096, repeatA.data(), 0, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, 4096); + ASSERT_EQ(req1_size, 4096U); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); + correctOverlap = std::string(4096, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(-1, 0, repeatA.data(), 0, 4096, repeatB.data(), used); + ASSERT_EQ(req1_off, -1); + ASSERT_EQ(req1_size, 0U); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); + correctOverlap = std::string(4096, 'a'); + ASSERT_EQ(correctOverlap, repeatA); + + used = 0; + repeatA = std::string(4096, 'a'); + std::tie(req1_off, req1_size, req2_off, req2_size) = + OverlapCopy(0, 4096, repeatA.data(), -1, 0, repeatB.data(), used); + ASSERT_EQ(req1_off, 0); + ASSERT_EQ(req1_size, 4096U); + ASSERT_EQ(req2_off, -1); + ASSERT_EQ(req2_size, 0U); + ASSERT_EQ(used, 0U); + correctOverlap = std::string(4096, 'a'); + ASSERT_EQ(correctOverlap, repeatA); +} + +TEST_F(FileSystemS3Fixture, StressGet) { + // Upload a file + auto name = "/test/write_stress.txt"; + WritePattern(name, 100'000'000, 'a', 1'000'000, true); + + static const int workerThreads = 10; + std::vector> threads; + threads.resize(workerThreads); + for (auto &tptr : threads) { + tptr.reset(new std::thread([&] { + RandomRead(name, 'a', 1'000'000, std::chrono::seconds(5)); + })); + } + std::cout << "Launched all " << workerThreads << " threads" << std::endl; + for (const auto &tptr : threads) { + tptr->join(); + } +} + int main(int argc, char **argv) { 
 	::testing::InitGoogleTest(&argc, argv);
 	return RUN_ALL_TESTS();