diff --git a/CMakeLists.txt b/CMakeLists.txt index 1238fa4..4578acf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,8 +57,8 @@ endif() include_directories(${XROOTD_INCLUDES} ${CURL_INCLUDE_DIRS} ${LIBCRYPTO_INCLUDE_DIRS}) -add_library(XrdS3 SHARED src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) -add_library(XrdHTTPServer SHARED src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +add_library(XrdS3 SHARED src/CurlUtil.cc src/S3File.cc src/S3Directory.cc src/S3AccessInfo.cc src/S3FileSystem.cc src/AWSv4-impl.cc src/S3Commands.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) +add_library(XrdHTTPServer SHARED src/CurlUtil.cc src/HTTPFile.cc src/HTTPFileSystem.cc src/HTTPCommands.cc src/TokenFile.cc src/stl_string_utils.cc src/shortfile.cc src/logging.cc) target_link_libraries(XrdS3 -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} tinyxml2::tinyxml2 Threads::Threads) target_link_libraries(XrdHTTPServer -ldl ${XROOTD_UTILS_LIB} ${XROOTD_SERVER_LIB} ${CURL_LIBRARIES} ${LIBCRYPTO_LIBRARIES} Threads::Threads) diff --git a/src/CurlUtil.cc b/src/CurlUtil.cc new file mode 100644 index 0000000..296f997 --- /dev/null +++ b/src/CurlUtil.cc @@ -0,0 +1,325 @@ +/*************************************************************** + * + * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +#include "CurlUtil.hh" +#include "CurlWorker.hh" +#include "HTTPCommands.hh" +#include "logging.hh" + +#include +#include + +#include +#include + +#include +#include +#include +#include + +using namespace XrdHTTPServer; + +thread_local std::vector HandlerQueue::m_handles; + +HandlerQueue::HandlerQueue() { + int filedes[2]; + auto result = pipe(filedes); + if (result == -1) { + throw std::runtime_error(strerror(errno)); + } + m_read_fd = filedes[0]; + m_write_fd = filedes[1]; +}; + +namespace { + +// Simple debug function for getting information from libcurl; to enable, you +// need to recompile with GetHandle(true); +int dump_header(CURL *handle, curl_infotype type, char *data, size_t size, + void *clientp) { + (void)handle; + (void)clientp; + + switch (type) { + case CURLINFO_HEADER_OUT: + printf("Header > %s\n", std::string(data, size).c_str()); + break; + default: + printf("Info: %s", std::string(data, size).c_str()); + break; + } + return 0; +} + +} // namespace + +CURL *GetHandle(bool verbose) { + auto result = curl_easy_init(); + if (result == nullptr) { + return result; + } + + curl_easy_setopt(result, CURLOPT_USERAGENT, "xrootd-s3/devel"); + curl_easy_setopt(result, CURLOPT_DEBUGFUNCTION, dump_header); + if (verbose) + curl_easy_setopt(result, CURLOPT_VERBOSE, 1L); + + curl_easy_setopt(result, CURLOPT_BUFFERSIZE, 32 * 1024); + + return result; +} + +CURL *HandlerQueue::GetHandle() { + if (m_handles.size()) { + auto result = m_handles.back(); + m_handles.pop_back(); + 
return result;
+    }
+
+    return ::GetHandle(false);
+}
+
+// Return a finished handle to this thread's cache for later reuse.
+void HandlerQueue::RecycleHandle(CURL *curl) { m_handles.push_back(curl); }
+
+// Enqueue `handler`, blocking while m_max_pending_ops requests are already
+// pending.  One byte is written to the pipe per queued request so that the
+// descriptor returned by PollFD() is readable while work is available.
+void HandlerQueue::Produce(HTTPRequest *handler) {
+    std::unique_lock lk{m_mutex};
+    m_cv.wait(lk, [&] { return m_ops.size() < m_max_pending_ops; });
+
+    m_ops.push_back(handler);
+    char ready[] = "1";
+    while (true) {
+        auto result = write(m_write_fd, ready, 1);
+        if (result == -1) {
+            if (errno == EINTR) {
+                continue;
+            }
+            throw std::runtime_error(strerror(errno));
+        }
+        break;
+    }
+
+    lk.unlock();
+    // Producers and consumers wait on the same condition variable, so wake
+    // every waiter; notify_one could wake one whose predicate is still false.
+    m_cv.notify_all();
+}
+
+// Block until a request is queued, then dequeue and return it.
+HTTPRequest *HandlerQueue::Consume() {
+    std::unique_lock lk(m_mutex);
+    m_cv.wait(lk, [&] { return m_ops.size() > 0; });
+
+    auto result = std::move(m_ops.front());
+    m_ops.pop_front();
+
+    // Drain the pipe byte that Produce() wrote for this request.
+    char ready[1];
+    while (true) {
+        auto nread = read(m_read_fd, ready, 1);
+        if (nread == -1) {
+            if (errno == EINTR) {
+                continue;
+            }
+            throw std::runtime_error(strerror(errno));
+        }
+        break;
+    }
+
+    lk.unlock();
+    m_cv.notify_all();
+
+    return result;
+}
+
+// Non-blocking variant of Consume(): returns nullptr when nothing is queued.
+HTTPRequest *HandlerQueue::TryConsume() {
+    std::unique_lock lk(m_mutex);
+    if (m_ops.size() == 0) {
+        return nullptr;
+    }
+
+    auto result = std::move(m_ops.front());
+    m_ops.pop_front();
+
+    // Drain the pipe byte that Produce() wrote for this request.
+    char ready[1];
+    while (true) {
+        auto nread = read(m_read_fd, ready, 1);
+        if (nread == -1) {
+            if (errno == EINTR) {
+                continue;
+            }
+            throw std::runtime_error(strerror(errno));
+        }
+        break;
+    }
+
+    lk.unlock();
+    m_cv.notify_all();
+
+    return result;
+}
+
+// Thread entry point: runs the worker and keeps any exception from escaping
+// the thread.
+void CurlWorker::RunStatic(CurlWorker *myself) {
+    try {
+        myself->Run();
+    } catch (...) {
+        // Nothing restarts a worker after this point, so report the failure
+        // as an error instead of hiding it at debug level.
+        myself->m_logger.Log(LogMask::Error, "CurlWorker::RunStatic",
+                             "Curl worker got an exception");
+    }
+}
+
+void CurlWorker::Run() {
+    // Create a copy of the shared_ptr here.  Otherwise, when the main thread's
+    // destructors run, there won't be any other live references to the
+    // shared_ptr, triggering cleanup of the condition variable.
Because we
+    // purposely don't shutdown the worker threads, those threads may be waiting
+    // on the condition variable; destroying a condition variable while a thread
+    // is waiting on it is undefined behavior.
+    auto queue_ref = m_queue;
+    auto &queue = *queue_ref.get();
+    m_logger.Log(LogMask::Debug, "CurlWorker::Run", "Started a curl worker");
+
+    CURLM *multi_handle = curl_multi_init();
+    if (multi_handle == nullptr) {
+        throw std::runtime_error("Failed to create curl multi-handle");
+    }
+
+    int running_handles = 0;
+    time_t last_marker = time(NULL);
+    CURLMcode mres = CURLM_OK;
+
+    // The queue's pipe is polled alongside libcurl's sockets so that newly
+    // produced requests wake the worker.
+    std::vector<curl_waitfd> waitfds;
+    waitfds.resize(1);
+    waitfds[0].fd = queue.PollFD();
+    waitfds[0].events = CURL_WAIT_POLLIN;
+    waitfds[0].revents = 0;
+
+    while (true) {
+        // Pull in new work: block when idle, otherwise only take what is
+        // immediately available, up to m_max_ops concurrent transfers.
+        while (running_handles < static_cast<int>(m_max_ops)) {
+            auto op =
+                running_handles == 0 ? queue.Consume() : queue.TryConsume();
+            if (!op) {
+                break;
+            }
+            auto curl = queue.GetHandle();
+            if (curl == nullptr) {
+                m_logger.Log(LogMask::Debug, "CurlWorker",
+                             "Unable to allocate a curl handle");
+                op->Fail("E_NOMEM", "Unable to allocate a curl handle");
+                continue;
+            }
+            try {
+                // SetupHandle signals most failures through its return value
+                // rather than by throwing; route both to the same handler.
+                if (!op->SetupHandle(curl)) {
+                    throw std::runtime_error("SetupHandle failed");
+                }
+            } catch (...) {
+                m_logger.Log(LogMask::Debug, "CurlWorker",
+                             "Unable to setup the curl handle");
+                // The handle may be partially configured; do not recycle it.
+                curl_easy_cleanup(curl);
+                op->Fail("E_NOMEM",
+                         "Failed to setup the curl handle for the operation");
+                continue;
+            }
+            auto add_res = curl_multi_add_handle(multi_handle, curl);
+            if (add_res != CURLM_OK) {
+                if (m_logger.getMsgMask() & LogMask::Debug) {
+                    std::stringstream ss;
+                    ss << "Unable to add operation to the curl multi-handle: "
+                       << curl_multi_strerror(add_res);
+                    m_logger.Log(LogMask::Debug, "CurlWorker",
+                                 ss.str().c_str());
+                }
+                curl_easy_cleanup(curl);
+                op->Fail("E_CURL_LIB",
+                         "Unable to add operation to the curl multi-handle");
+                continue;
+            }
+            // Track the operation only once libcurl owns the transfer, so the
+            // map never holds a request that has already been failed.
+            m_op_map[curl] = op;
+            running_handles += 1;
+        }
+
+        // Maintain the periodic reporting of thread activity
+        time_t now = time(NULL);
+        time_t next_marker = last_marker + m_marker_period;
+        if (now >= next_marker) {
+            if (m_logger.getMsgMask() & LogMask::Debug) {
+                std::stringstream ss;
+                ss << "Curl worker thread " << getpid() << " is running "
+                   << running_handles << " operations";
+                m_logger.Log(LogMask::Debug, "CurlWorker", ss.str().c_str());
+            }
+            last_marker = now;
+        }
+
+        // Wait until there is activity to perform.
+        mres = curl_multi_wait(multi_handle, &waitfds[0], waitfds.size(), 50,
+                               nullptr);
+        if (mres != CURLM_OK) {
+            if (m_logger.getMsgMask() & LogMask::Warning) {
+                std::stringstream ss;
+                ss << "Failed to wait on multi-handle: " << mres;
+                m_logger.Log(LogMask::Warning, "CurlWorker", ss.str().c_str());
+            }
+        }
+
+        // Do maintenance on the multi-handle.  The result is stored in the
+        // function-level `mres` so the exit path below reports the real error.
+        int still_running;
+        mres = curl_multi_perform(multi_handle, &still_running);
+        if (mres == CURLM_CALL_MULTI_PERFORM) {
+            continue;
+        } else if (mres != CURLM_OK) {
+            if (m_logger.getMsgMask() & LogMask::Warning) {
+                std::stringstream ss;
+                ss << "Failed to perform multi-handle operation: " << mres;
+                m_logger.Log(LogMask::Warning, "CurlWorker", ss.str().c_str());
+            }
+            break;
+        }
+
+        CURLMsg *msg;
+        do {
+            int msgq = 0;
+            msg = curl_multi_info_read(multi_handle, &msgq);
+            if (msg && (msg->msg == CURLMSG_DONE)) {
+                auto iter = m_op_map.find(msg->easy_handle);
+                if (iter == m_op_map.end()) {
+                    m_logger.Log(LogMask::Error, "CurlWorker",
+                                 "Logic error: got a callback for an entry "
+                                 "that doesn't exist");
+                    mres = CURLM_BAD_EASY_HANDLE;
+                    break;
+                }
+                // Copy everything out of `msg` and the map first: `msg` does
+                // not survive curl_multi_remove_handle, and the entry is
+                // erased so a finished request is never looked up again.
+                auto handle = iter->first;
+                auto op = iter->second;
+                auto res = msg->data.result;
+                m_op_map.erase(iter);
+                running_handles -= 1;
+                curl_multi_remove_handle(multi_handle, handle);
+                op->ReleaseHandle(handle);
+                // ProcessCurlResult notifies the thread waiting on the request,
+                // which may then destroy it; `op` must not be used afterwards.
+                // NOTE(review): a CurlResult::Retry return value is currently
+                // not acted upon here.
+                op->ProcessCurlResult(handle, res);
+                if (res == CURLE_OK) {
+                    // If the handle was successful, then we can recycle it.
+                    queue.RecycleHandle(handle);
+                } else {
+                    curl_easy_cleanup(handle);
+                }
+            }
+        } while (msg);
+    }
+
+    // The outer loop only exits after curl_multi_perform failed; fail whatever
+    // is still in flight and release the libcurl resources of this worker.
+    for (auto &map_entry : m_op_map) {
+        curl_multi_remove_handle(multi_handle, map_entry.first);
+        curl_easy_cleanup(map_entry.first);
+        map_entry.second->Fail("E_CURL_LIB", curl_multi_strerror(mres));
+    }
+    m_op_map.clear();
+    curl_multi_cleanup(multi_handle);
+}
diff --git a/src/CurlUtil.hh b/src/CurlUtil.hh new file mode 100644 index 0000000..d97fbe9 --- /dev/null +++ b/src/CurlUtil.hh @@ -0,0 +1,67 @@ +/*************************************************************** + * + * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +#pragma once + +#include +#include +#include +#include +#include +#include + +// Forward dec'ls +typedef void CURL; +struct curl_slist; + +class HTTPRequest; + +// Returns a newly-created curl handle (no internal caching) +CURL *GetHandle(bool verbose); + +/** + * HandlerQueue is a deque of curl operations that need + * to be performed. The object is thread safe and can + * be waited on via poll().
+ * + * The fact that it's poll'able is necessary because the + * multi-curl driver thread is based on polling FD's + */ +class HandlerQueue { +public: + HandlerQueue(); + + void Produce(HTTPRequest *handler); + + HTTPRequest *Consume(); + HTTPRequest *TryConsume(); + + int PollFD() const {return m_read_fd;} + + CURL *GetHandle(); + void RecycleHandle(CURL *); + +private: + std::deque m_ops; + thread_local static std::vector m_handles; + std::condition_variable m_cv; + std::mutex m_mutex; + const static unsigned m_max_pending_ops{20}; + int m_read_fd{-1}; + int m_write_fd{-1}; +}; diff --git a/src/CurlWorker.hh b/src/CurlWorker.hh new file mode 100644 index 0000000..928dd11 --- /dev/null +++ b/src/CurlWorker.hh @@ -0,0 +1,52 @@ +/*************************************************************** + * + * Copyright (C) 2024, Pelican Project, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ***************************************************************/ + +#pragma once + +#include +#include + +typedef void CURL; + +class XrdSysError; + +class HTTPRequest; +class HandlerQueue; + +class CurlWorker { +public: + CurlWorker(std::shared_ptr queue, XrdSysError &logger) : + m_queue(queue), + m_logger(logger) + {} + + CurlWorker(const CurlWorker &) = delete; + + void Run(); + static void RunStatic(CurlWorker *myself); + static unsigned GetPollThreads() {return m_workers;} + +private: + std::shared_ptr m_queue; + std::unordered_map m_op_map; + XrdSysError &m_logger; + + const static unsigned m_workers{5}; + const static unsigned m_max_ops{20}; + const static unsigned m_marker_period{5}; +}; diff --git a/src/HTTPCommands.cc b/src/HTTPCommands.cc index cdbe22d..ac604f4 100644 --- a/src/HTTPCommands.cc +++ b/src/HTTPCommands.cc @@ -24,11 +24,14 @@ #include #include #include +#include #include #include #include +#include "CurlUtil.hh" +#include "CurlWorker.hh" #include "HTTPCommands.hh" #include "logging.hh" #include "shortfile.hh" @@ -36,6 +39,13 @@ using namespace XrdHTTPServer; +std::shared_ptr HTTPRequest::m_queue = + std::make_unique(); +bool HTTPRequest::m_workers_initialized = false; +std::vector HTTPRequest::m_workers; + +namespace { + // // "This function gets called by libcurl as soon as there is data received // that needs to be saved. 
The size of the data pointed to by ptr is size @@ -59,14 +69,16 @@ size_t appendToString(const void *ptr, size_t size, size_t nmemb, void *str) { return (size * nmemb); } +} // namespace + HTTPRequest::~HTTPRequest() {} #define SET_CURL_SECURITY_OPTION(A, B, C) \ { \ CURLcode rv##B = curl_easy_setopt(A, B, C); \ if (rv##B != CURLE_OK) { \ - this->errorCode = "E_CURL_LIB"; \ - this->errorMessage = "curl_easy_setopt( " #B " ) failed."; \ + errorCode = "E_CURL_LIB"; \ + errorMessage = "curl_easy_setopt( " #B " ) failed."; \ return false; \ } \ } @@ -83,9 +95,9 @@ bool HTTPRequest::parseProtocol(const std::string &url, std::string &protocol) { } bool HTTPRequest::SendHTTPRequest(const std::string &payload) { - if ((protocol != "http") && (protocol != "https")) { - this->errorCode = "E_INVALID_SERVICE_URL"; - this->errorMessage = "Service URL not of a known protocol (http[s])."; + if ((m_protocol != "http") && (m_protocol != "https")) { + errorCode = "E_INVALID_SERVICE_URL"; + errorMessage = "Service URL not of a known protocol (http[s])."; m_log.Log(LogMask::Warning, "HTTPRequest::SendHTTPRequest", "Host URL '", hostUrl.c_str(), "' not of a known protocol (http[s])."); @@ -100,7 +112,7 @@ bool HTTPRequest::SendHTTPRequest(const std::string &payload) { // by default for "PUT", which we really don't want. 
headers["Transfer-Encoding"] = ""; - return sendPreparedRequest(protocol, hostUrl, payload); + return sendPreparedRequest(hostUrl, payload); } static void dump(const char *text, FILE *stream, unsigned char *ptr, @@ -208,31 +220,68 @@ size_t read_callback(char *buffer, size_t size, size_t n, void *v) { return request; } -bool HTTPRequest::sendPreparedRequest(const std::string &protocol, - const std::string &uri, +bool HTTPRequest::sendPreparedRequest(const std::string &uri, const std::string &payload) { + m_uri = uri; + m_payload = payload; + // Re-arm the completion flag so a reused request waits for this transfer. + m_result_ready = false; - m_log.Log(XrdHTTPServer::Debug, "SendRequest", "Sending HTTP request", - uri.c_str()); + m_queue->Produce(this); + std::unique_lock lk(m_mtx); + m_cv.wait(lk, [&] { return m_result_ready; }); - std::unique_ptr curl( - curl_easy_init(), &curl_easy_cleanup); + return errorCode.empty(); +} - if (curl.get() == NULL) { - this->errorCode = "E_CURL_LIB"; - this->errorMessage = "curl_easy_init() failed."; +bool HTTPRequest::ReleaseHandle(CURL *curl) { + if (curl == nullptr) + return false; + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, nullptr); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, nullptr); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, nullptr); + curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, nullptr); + curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, nullptr); + curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, nullptr); + curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA, nullptr); + curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, nullptr); + curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L); + // A recycled handle keeps every option until it is overwritten, so also + // clear the per-request state SetupHandle may have installed: pointers + // into this object and the HEAD/POST/PUT method selection. + curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, nullptr); + curl_easy_setopt(curl, CURLOPT_READFUNCTION, nullptr); + curl_easy_setopt(curl, CURLOPT_READDATA, nullptr); + curl_easy_setopt(curl, CURLOPT_HEADER, 0L); + curl_easy_setopt(curl, CURLOPT_NOBODY, 0L); + curl_easy_setopt(curl, CURLOPT_UPLOAD, 0L); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, nullptr); + // CURLOPT_POSTFIELDS selects POST even when cleared, so restore the + // default GET method last. + curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L); + m_header_list.reset(); + + return true; +} + +bool HTTPRequest::SetupHandle(CURL *curl) { + m_log.Log(XrdHTTPServer::Debug, "SetupHandle", "Sending HTTP request", + m_uri.c_str()); + + if (curl == nullptr) { + errorCode = "E_CURL_LIB"; + errorMessage = "curl_easy_init() failed."; return false; } - char errorBuffer[CURL_ERROR_SIZE]; - auto rv = curl_easy_setopt(curl.get(), CURLOPT_ERRORBUFFER, errorBuffer); + auto rv = curl_easy_setopt(curl, 
CURLOPT_ERRORBUFFER, m_errorBuffer); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_ERRORBUFFER ) failed."; return false; } - rv = curl_easy_setopt(curl.get(), CURLOPT_URL, uri.c_str()); + rv = curl_easy_setopt(curl, CURLOPT_URL, m_uri.c_str()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_URL ) failed."; @@ -240,7 +274,7 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, } if (httpVerb == "HEAD") { - rv = curl_easy_setopt(curl.get(), CURLOPT_NOBODY, 1); + rv = curl_easy_setopt(curl, CURLOPT_NOBODY, 1); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_HEAD ) failed."; @@ -249,14 +283,14 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, } if (httpVerb == "POST") { - rv = curl_easy_setopt(curl.get(), CURLOPT_POST, 1); + rv = curl_easy_setopt(curl, CURLOPT_POST, 1); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_POST ) failed."; return false; } - rv = curl_easy_setopt(curl.get(), CURLOPT_POSTFIELDS, payload.c_str()); + rv = curl_easy_setopt(curl, CURLOPT_POSTFIELDS, m_payload.c_str()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = @@ -266,7 +300,7 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, } if (httpVerb == "PUT") { - rv = curl_easy_setopt(curl.get(), CURLOPT_UPLOAD, 1); + rv = curl_easy_setopt(curl, CURLOPT_UPLOAD, 1); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_UPLOAD ) failed."; @@ -276,17 +310,16 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, // Our HTTPRequest instance should have a pointer to the payload data // and the offset of the data Here, we tell curl_easy_setopt to use the // read_callback function to read the data from the payload - this->callback_payload = 
std::unique_ptr( - new HTTPRequest::Payload{&payload, 0}); - rv = curl_easy_setopt(curl.get(), CURLOPT_READDATA, - callback_payload.get()); + m_callback_payload = std::unique_ptr( + new HTTPRequest::Payload{&m_payload, 0}); + rv = curl_easy_setopt(curl, CURLOPT_READDATA, m_callback_payload.get()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_READDATA ) failed."; return false; } - rv = curl_easy_setopt(curl.get(), CURLOPT_READFUNCTION, read_callback); + rv = curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = @@ -295,7 +328,7 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, } } - rv = curl_easy_setopt(curl.get(), CURLOPT_NOPROGRESS, 1); + rv = curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_NOPROGRESS ) failed."; @@ -303,7 +336,7 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, } if (includeResponseHeader) { - rv = curl_easy_setopt(curl.get(), CURLOPT_HEADER, 1); + rv = curl_easy_setopt(curl, CURLOPT_HEADER, 1); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_HEADER ) failed."; @@ -311,7 +344,7 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, } } - rv = curl_easy_setopt(curl.get(), CURLOPT_WRITEFUNCTION, &appendToString); + rv = curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &appendToString); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = @@ -319,14 +352,14 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, return false; } - rv = curl_easy_setopt(curl.get(), CURLOPT_WRITEDATA, &this->resultString); + rv = curl_easy_setopt(curl, CURLOPT_WRITEDATA, &m_result); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_WRITEDATA 
) failed."; return false; } - if (curl_easy_setopt(curl.get(), CURLOPT_FOLLOWLOCATION, 1) != CURLE_OK) { + if (curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L) != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_FOLLOWLOCATION ) failed."; @@ -336,53 +369,36 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, // // Set security options. // - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_SSL_VERIFYPEER, 1); - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_SSL_VERIFYHOST, 2); - - // NB: Contrary to libcurl's manual, it doesn't strdup() strings passed - // to it, so they MUST remain in scope until after we call - // curl_easy_cleanup(). Otherwise, curl_perform() will fail with - // a completely bogus error, number 60, claiming that there's a - // 'problem with the SSL CA cert'. + SET_CURL_SECURITY_OPTION(curl, CURLOPT_SSL_VERIFYPEER, 1L); + SET_CURL_SECURITY_OPTION(curl, CURLOPT_SSL_VERIFYHOST, 2L); + std::string CAFile = ""; std::string CAPath = ""; - - char *x509_ca_dir = getenv("X509_CERT_DIR"); - if (x509_ca_dir != NULL) { - CAPath = x509_ca_dir; - } - - char *x509_ca_file = getenv("X509_CERT_FILE"); - if (x509_ca_file != NULL) { - CAFile = x509_ca_file; + auto x509_ca_dir = getenv("X509_CERT_DIR"); + if (x509_ca_dir != nullptr && x509_ca_dir[0] != '\0') { + SET_CURL_SECURITY_OPTION(curl, CURLOPT_CAPATH, x509_ca_dir); } - if (!CAPath.empty()) { - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_CAPATH, CAPath.c_str()); - } - - if (!CAFile.empty()) { - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_CAINFO, CAFile.c_str()); - } - - if (setenv("OPENSSL_ALLOW_PROXY", "1", 0) != 0) { + auto x509_ca_file = getenv("X509_CERT_FILE"); + if (x509_ca_file != nullptr && x509_ca_file[0] != '\0') { + SET_CURL_SECURITY_OPTION(curl, CURLOPT_CAINFO, x509_ca_file); } // // Configure for x.509 operation. 
// - if (protocol == "x509" && requiresSignature) { - const std::string *accessKeyFilePtr = this->getAccessKey(); - const std::string *secretKeyFilePtr = this->getSecretKey(); + if (m_protocol == "x509" && requiresSignature) { + auto accessKeyFilePtr = getAccessKey(); + auto secretKeyFilePtr = getSecretKey(); if (accessKeyFilePtr && secretKeyFilePtr) { - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_SSLKEYTYPE, "PEM"); - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_SSLKEY, - *secretKeyFilePtr->c_str()); + SET_CURL_SECURITY_OPTION(curl, CURLOPT_SSLKEYTYPE, "PEM"); + SET_CURL_SECURITY_OPTION(curl, CURLOPT_SSLKEY, + secretKeyFilePtr->c_str()); - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_SSLCERTTYPE, "PEM"); - SET_CURL_SECURITY_OPTION(curl.get(), CURLOPT_SSLCERT, - *accessKeyFilePtr->c_str()); + SET_CURL_SECURITY_OPTION(curl, CURLOPT_SSLCERTTYPE, "PEM"); + SET_CURL_SECURITY_OPTION(curl, CURLOPT_SSLCERT, + accessKeyFilePtr->c_str()); } } @@ -399,98 +415,103 @@ bool HTTPRequest::sendPreparedRequest(const std::string &protocol, } } } - { - const auto iter = headers.find("User-Agent"); - if (iter == headers.end()) { - headers["User-Agent"] = "xrootd-http/devel"; - } - } std::string headerPair; - struct curl_slist *header_slist = NULL; + m_header_list.reset(); for (auto i = headers.begin(); i != headers.end(); ++i) { formatstr(headerPair, "%s: %s", i->first.c_str(), i->second.c_str()); - header_slist = curl_slist_append(header_slist, headerPair.c_str()); - if (header_slist == NULL) { + auto tmp_headers = + curl_slist_append(m_header_list.get(), headerPair.c_str()); + if (tmp_headers == nullptr) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_slist_append() failed."; return false; } + m_header_list.release(); + m_header_list.reset(tmp_headers); } - rv = curl_easy_setopt(curl.get(), CURLOPT_HTTPHEADER, header_slist); + rv = curl_easy_setopt(curl, CURLOPT_HTTPHEADER, m_header_list.get()); if (rv != CURLE_OK) { this->errorCode = "E_CURL_LIB"; this->errorMessage = "curl_easy_setopt( CURLOPT_HTTPHEADER ) failed."; - if 
(header_slist) { - curl_slist_free_all(header_slist); - } return false; } if (m_log.getMsgMask() & LogMask::Dump) { - rv = curl_easy_setopt(curl.get(), CURLOPT_DEBUGFUNCTION, debugCallback); - rv = curl_easy_setopt(curl.get(), CURLOPT_VERBOSE, 1L); + rv = curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debugCallback); + rv = curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); } -retry: - rv = curl_easy_perform(curl.get()); + return true; +} - if (rv != 0) { +bool HTTPRequest::Fail(const std::string &ecode, const std::string &emsg) { + errorCode = ecode; + errorMessage = emsg; + + Notify(); + return true; +} - this->errorCode = "E_CURL_IO"; +void HTTPRequest::Notify() { + std::lock_guard lk(m_mtx); + m_result_ready = true; + m_cv.notify_one(); +} + +HTTPRequest::CurlResult HTTPRequest::ProcessCurlResult(CURL *curl, + CURLcode rv) { + + auto cleaner = [&](void *) { Notify(); }; + auto unique = std::unique_ptr((void *)1, cleaner); + + if (rv != 0) { + errorCode = "E_CURL_IO"; std::ostringstream error; - error << "curl_easy_perform() failed (" << rv << "): '" - << curl_easy_strerror(rv) << "'."; - this->errorMessage = error.str(); - if (header_slist) { - curl_slist_free_all(header_slist); - } + error << "curl failed (" << rv << "): '" << curl_easy_strerror(rv) + << "'."; + errorMessage = error.str(); - return false; + return CurlResult::Fail; } responseCode = 0; - rv = curl_easy_getinfo(curl.get(), CURLINFO_RESPONSE_CODE, &responseCode); + rv = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode); if (rv != CURLE_OK) { // So we contacted the server but it returned such gibberish that // CURL couldn't identify the response code. Let's assume that's // bad news. Since we're already terminally failing the request, // don't bother to check if this was our last chance at retrying. 
- this->errorCode = "E_CURL_LIB"; - this->errorMessage = "curl_easy_getinfo() failed."; - if (header_slist) { - curl_slist_free_all(header_slist); - } + errorCode = "E_CURL_LIB"; + errorMessage = "curl_easy_getinfo() failed."; - return false; + return CurlResult::Fail; } if (responseCode == 503 && - (resultString.find("RequestLimitExceeded") != - std::string::npos)) { - resultString.clear(); - goto retry; - } - - if (header_slist) { - curl_slist_free_all(header_slist); + (m_result.find("RequestLimitExceeded") != + std::string::npos) && + m_retry_count == 0) { + m_result.clear(); + m_retry_count++; + return CurlResult::Retry; } - if (responseCode != this->expectedResponseCode) { - formatstr(this->errorCode, + if (responseCode != expectedResponseCode) { + formatstr(errorCode, "E_HTTP_RESPONSE_NOT_EXPECTED (response %lu != expected %lu)", - responseCode, this->expectedResponseCode); - this->errorMessage = resultString; - if (this->errorMessage.empty()) { + responseCode, expectedResponseCode); + errorMessage = m_result; + if (errorMessage.empty()) { formatstr( - this->errorMessage, + errorMessage, "HTTP response was %lu, not %lu, and no body was returned.", - responseCode, this->expectedResponseCode); + responseCode, expectedResponseCode); } - return false; + return CurlResult::Fail; } - return true; + return CurlResult::Ok; } // --------------------------------------------------------------------------- @@ -510,7 +531,22 @@ bool HTTPUpload::SendRequest(const std::string &payload, off_t offset, return SendHTTPRequest(payload); } -void HTTPRequest::init() { +void HTTPRequest::Init(XrdSysError &log) { + if (!m_workers_initialized) { + // libcurl must be globally initialized before any worker thread + // touches it; curl_global_init may be called more than once, so the + // unconditional call below stays harmless. + if (curl_global_init(CURL_GLOBAL_ALL) != CURLE_OK) { + throw std::runtime_error("libcurl failed to initialize"); + } + for (unsigned idx = 0; idx < CurlWorker::GetPollThreads(); idx++) { + m_workers.push_back(new CurlWorker(m_queue, log)); + std::thread t(CurlWorker::RunStatic, m_workers.back()); + t.detach(); + } + m_workers_initialized = true; + } + CURLcode rv = curl_global_init(CURL_GLOBAL_ALL); if (rv != 0) { throw std::runtime_error("libcurl failed to 
initialize"); diff --git a/src/HTTPCommands.hh b/src/HTTPCommands.hh index cd8bb42..26ffb84 100644 --- a/src/HTTPCommands.hh +++ b/src/HTTPCommands.hh @@ -20,21 +20,30 @@ #include "TokenFile.hh" +#include #include #include +#include #include +#include + class XrdSysError; +class HandlerQueue; +class CurlWorker; class HTTPRequest { + friend class CurlWorker; + public: HTTPRequest(const std::string &hostUrl, XrdSysError &log, const TokenFile *token) - : hostUrl(hostUrl), m_log(log), m_token(token) { + : hostUrl(hostUrl), m_header_list(nullptr, &curl_slist_free_all), + m_log(log), m_token(token) { // Parse the URL and populate // What to do if the function returns false? // TODO: Figure out best way to deal with this - if (!parseProtocol(hostUrl, protocol)) { + if (!parseProtocol(hostUrl, m_protocol)) { errorCode = "E_INVALID_HOST_URL"; errorMessage = "Failed to parse protocol from host/service URL."; } @@ -51,7 +60,7 @@ class HTTPRequest { unsigned long getResponseCode() const { return responseCode; } const std::string &getErrorCode() const { return errorCode; } const std::string &getErrorMessage() const { return errorMessage; } - const std::string &getResultString() const { return resultString; } + const std::string &getResultString() const { return m_result; } // Currently only used in PUTS, but potentially useful elsewhere struct Payload { @@ -63,19 +72,19 @@ class HTTPRequest { // // Should be called at least once per application from a non-threaded // context. 
- static void init(); + static void Init(XrdSysError &); protected: - bool sendPreparedRequest(const std::string &protocol, - const std::string &uri, + bool sendPreparedRequest(const std::string &uri, const std::string &payload); + const std::string &getProtocol() { return m_protocol; } + typedef std::map AttributeValueMap; AttributeValueMap query_parameters; AttributeValueMap headers; std::string hostUrl; - std::string protocol; bool requiresSignature{false}; struct timespec signatureTime; @@ -83,18 +92,57 @@ class HTTPRequest { std::string errorMessage; std::string errorCode; - std::string resultString; + std::string m_result; unsigned long responseCode{0}; unsigned long expectedResponseCode = 200; bool includeResponseHeader{false}; std::string httpVerb{"POST"}; - std::unique_ptr callback_payload; + std::unique_ptr m_callback_payload; + + std::unique_ptr + m_header_list; // Headers associated with the request XrdSysError &m_log; private: - const TokenFile *m_token; + enum class CurlResult { Ok, Fail, Retry }; + + void Notify(); // Notify the main request thread the request has been + // processed by a worker + virtual bool SetupHandle( + CURL *curl); // Configure the curl handle to be used by a given request. + CurlResult ProcessCurlResult( + CURL *curl, + CURLcode rv); // Process a curl command that ran to completion. + bool + Fail(const std::string &ecode, + const std::string &emsg); // Record a failure occurring for the request + // (curl request did not complete) + bool ReleaseHandle( + CURL *curl); // Cleanup any resources associated with the curl handle + + const TokenFile *m_token{nullptr}; + + // The following members manage the work queue and workers. + static bool + m_workers_initialized; // The global state of the worker initialization. + static std::shared_ptr + m_queue; // Global queue for all HTTP requests to be processed. + static std::vector + m_workers; // Set of all the curl worker threads. 
+ + // The following variables manage the state of the request. + std::mutex + m_mtx; // Mutex guarding the results from the curl worker's callback + std::condition_variable m_cv; // Condition variable to notify the curl + // worker completed the callback + bool m_result_ready{false}; // Flag indicating the results data is ready. + std::string m_protocol; + std::string m_uri; // URL to request from libcurl + std::string m_payload; + char m_errorBuffer[CURL_ERROR_SIZE]; // Static error buffer for libcurl + unsigned m_retry_count{0}; }; class HTTPUpload : public HTTPRequest { diff --git a/src/HTTPFile.cc b/src/HTTPFile.cc index e15e9c7..1a0dd58 100644 --- a/src/HTTPFile.cc +++ b/src/HTTPFile.cc @@ -293,7 +293,7 @@ XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, XrdSysLogger *Logger, envP->Export("XRDXROOTD_NOPOSC", "1"); try { - HTTPRequest::init(); + HTTPRequest::Init(log); g_http_oss = new HTTPFileSystem(Logger, config_fn, envP); return g_http_oss; } catch (std::runtime_error &re) { diff --git a/src/S3Commands.cc b/src/S3Commands.cc index 7cdcb24..587994b 100644 --- a/src/S3Commands.cc +++ b/src/S3Commands.cc @@ -410,7 +410,7 @@ bool AmazonRequest::createV4Signature(const std::string &payload, bool AmazonRequest::sendV4Request(const std::string &payload, bool sendContentSHA) { - if ((protocol != "http") && (protocol != "https")) { + if ((getProtocol() != "http") && (getProtocol() != "https")) { this->errorCode = "E_INVALID_SERVICE_URL"; this->errorMessage = "Service URL not of a known protocol (http[s])."; return false; @@ -438,7 +438,7 @@ bool AmazonRequest::sendV4Request(const std::string &payload, if (!canonicalQueryString.empty()) { url += "?" 
+ canonicalQueryString; } - return sendPreparedRequest(protocol, url, payload); + return sendPreparedRequest(url, payload); } // It's stated in the API documentation that you can upload to any region @@ -563,7 +563,7 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) { httpVerb = "GET"; // Operation is on the bucket itself; alter the URL to remove the object - hostUrl = protocol + "://" + host + bucketPath; + hostUrl = getProtocol() + "://" + host + bucketPath; return SendS3Request(""); } @@ -571,7 +571,7 @@ bool AmazonS3List::SendRequest(const std::string &continuationToken) { bool AmazonS3CreateMultipartUpload::Results(std::string &uploadId, std::string &errMsg) { tinyxml2::XMLDocument doc; - auto err = doc.Parse(resultString.c_str()); + auto err = doc.Parse(getResultString().c_str()); if (err != tinyxml2::XML_SUCCESS) { errMsg = doc.ErrorStr(); return false; @@ -614,7 +614,7 @@ bool AmazonS3List::Results(std::vector &objInfo, std::vector &commonPrefixes, std::string &ct, std::string &errMsg) { tinyxml2::XMLDocument doc; - auto err = doc.Parse(resultString.c_str()); + auto err = doc.Parse(m_result.c_str()); if (err != tinyxml2::XML_SUCCESS) { errMsg = doc.ErrorStr(); return false; diff --git a/src/S3Commands.hh b/src/S3Commands.hh index 71e1a5c..56a5a29 100644 --- a/src/S3Commands.hh +++ b/src/S3Commands.hh @@ -64,7 +64,7 @@ class AmazonRequest : public HTTPRequest { // requests, and // --> "https://my-url.com:443/my-bucket/my-object" for path style // requests. - hostUrl = protocol + "://" + host + canonicalURI; + hostUrl = getProtocol() + "://" + host + canonicalURI; // If we can, set the region based on the host. 
size_t secondDot = host.find(".", 2 + 1); @@ -83,7 +83,7 @@ class AmazonRequest : public HTTPRequest { virtual bool SendRequest(); virtual bool SendS3Request(const std::string &payload); - static void init() { HTTPRequest::init(); } + static void Init(XrdSysError &log) { HTTPRequest::Init(log); } protected: bool sendV4Request(const std::string &payload, bool sendContentSHA = false); diff --git a/src/S3File.cc b/src/S3File.cc index b92318c..56e8f98 100644 --- a/src/S3File.cc +++ b/src/S3File.cc @@ -17,6 +17,7 @@ ***************************************************************/ #include "S3File.hh" +#include "CurlWorker.hh" #include "S3Commands.hh" #include "S3FileSystem.hh" #include "logging.hh" @@ -51,10 +52,10 @@ S3File::S3File(XrdSysError &log, S3FileSystem *oss) write_buffer(""), partNumber(1) {} int S3File::Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env) { - if (Oflag && O_CREAT) { + if (Oflag & O_CREAT) { m_log.Log(LogMask::Info, "File opened for creation: ", path); } - if (Oflag && O_APPEND) { + if (Oflag & O_APPEND) { m_log.Log(LogMask::Info, "File opened for append: ", path); } @@ -299,7 +300,7 @@ XrdOss *XrdOssGetStorageSystem2(XrdOss *native_oss, XrdSysLogger *Logger, envP->Export("XRDXROOTD_NOPOSC", "1"); try { - AmazonRequest::init(); + AmazonRequest::Init(log); g_s3_oss = new S3FileSystem(Logger, config_fn, envP); return g_s3_oss; } catch (std::runtime_error &re) { diff --git a/test/s3_tests.cc b/test/s3_tests.cc index 4c346fc..099b334 100644 --- a/test/s3_tests.cc +++ b/test/s3_tests.cc @@ -309,6 +309,9 @@ TEST_F(FileSystemS3PathBucketSlash, List) { } int main(int argc, char **argv) { + auto logger = new XrdSysLogger(2, 0); + auto log = new XrdSysError(logger, "curl_"); + AmazonRequest::Init(*log); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }