From a5416141723b628e70677909e4e683797fb6c077 Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Fri, 8 Nov 2024 10:03:03 +0530 Subject: [PATCH] Fix errors --- velox/functions/remote/client/Remote.cpp | 154 ++++++++++++----------- 1 file changed, 81 insertions(+), 73 deletions(-) diff --git a/velox/functions/remote/client/Remote.cpp b/velox/functions/remote/client/Remote.cpp index aad1df6a73e0a..452edaca3c5e2 100644 --- a/velox/functions/remote/client/Remote.cpp +++ b/velox/functions/remote/client/Remote.cpp @@ -29,11 +29,10 @@ #include "velox/vector/VectorStream.h" #include -#include -#include -#include -#include +#include +#include #include +#include using namespace folly; namespace facebook::velox::functions { @@ -66,38 +65,88 @@ std::string urlEncode(const std::string& value) { return escaped.str(); } -struct WriteThis { - std::unique_ptr buf; - size_t offset; -}; - static size_t read_callback(char* dest, size_t size, size_t nmemb, void* userp) { - auto* wt = static_cast(userp); + auto* inputBufQueue = static_cast(userp); size_t buffer_size = size * nmemb; - - if (wt->buf && wt->offset < wt->buf->length()) { - size_t remaining = wt->buf->length() - wt->offset; - size_t copy_this_much = remaining; - if (copy_this_much > buffer_size) - copy_this_much = buffer_size; - memcpy(dest, wt->buf->data() + wt->offset, copy_this_much); - wt->offset += copy_this_much; - - return copy_this_much; + size_t total_copied = 0; + + while (total_copied < buffer_size && !inputBufQueue->empty()) { + auto buf = inputBufQueue->front(); + size_t remaining_size = buffer_size - total_copied; + size_t copy_size = std::min(remaining_size, buf->length()); + std::memcpy(dest + total_copied, buf->data(), copy_size); + total_copied += copy_size; + inputBufQueue->pop_front(); } - return 0; -} + return total_copied; +} static size_t write_callback(char* ptr, size_t size, size_t nmemb, void* userdata) { - auto* outputBuf = static_cast(userdata); + auto* outputBuf = static_cast(userdata); size_t total_size = size * nmemb; - auto buf = IOBuf::copyBuffer(ptr, total_size); + auto buf = folly::IOBuf::copyBuffer(ptr, total_size); outputBuf->append(std::move(buf)); return total_size; } +std::unique_ptr performCurlRequest( + const std::string& fullUrl, + std::unique_ptr requestPayload) { + try { + folly::IOBufQueue inputBufQueue(folly::IOBufQueue::cacheChainLength()); + inputBufQueue.append(std::move(requestPayload)); + + CURL* curl = curl_easy_init(); + if (!curl) { + VELOX_FAIL(fmt::format( + "Error initializing CURL: {}", + curl_easy_strerror(CURLE_FAILED_INIT))); + } + + curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str()); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback); + curl_easy_setopt(curl, CURLOPT_READDATA, &inputBufQueue); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback); + + folly::IOBufQueue outputBuf(folly::IOBufQueue::cacheChainLength()); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &outputBuf); + curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); + + struct curl_slist* headers = nullptr; + headers = + curl_slist_append(headers, "Content-Type: application/X-presto-pages"); + headers = curl_slist_append(headers, "Accept: application/X-presto-pages"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + curl_easy_setopt( + curl, + CURLOPT_POSTFIELDSIZE, + static_cast(inputBufQueue.chainLength())); + + CURLcode res = curl_easy_perform(curl); + if (res != CURLE_OK) { + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + VELOX_FAIL(fmt::format( + "Error communicating with server: {}\nURL: {}\nCURL Error: {}", + curl_easy_strerror(res), + fullUrl.c_str(), + curl_easy_strerror(res))); + } + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + + return outputBuf.move(); + + } catch (const std::exception& e) { + VELOX_FAIL(fmt::format("Exception during CURL request: {}", e.what())); + } +} + class RemoteFunction : public exec::VectorFunction { public: RemoteFunction( @@ -158,24 +207,9 @@ class RemoteFunction : public exec::VectorFunction { rows.end(), std::move(args)); - // Serialize the RowVector into an IOBuf - std::unique_ptr requestBody = std::make_unique(rowVectorToIOBuf( - remoteRowVector, rows.end(), *context.pool(), &serde)); - - WriteThis wt; - wt.buf = std::move(requestBody); - wt.offset = 0; - - // Initialize CURL - CURL* curl; - CURLcode res; - res = curl_global_init(CURL_GLOBAL_DEFAULT); - if (res != CURLE_OK) { - VELOX_FAIL( - "Error initializing curl with error : %s\n", - curl_easy_strerror(res)); - } - curl = curl_easy_init(); + std::unique_ptr requestBody = + std::make_unique(rowVectorToIOBuf( + remoteRowVector, rows.end(), *context.pool(), &serde)); const std::string fullUrl = fmt::format( "{}/v1/functions/{}/{}/{}/{}", @@ -185,39 +219,13 @@ class RemoteFunction : public exec::VectorFunction { urlEncode(metadata_.functionId.value_or("default_function_id")), metadata_.version.value_or("1")); - if (curl) { - curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str()); - curl_easy_setopt(curl, CURLOPT_POST, 1L); - curl_easy_setopt(curl, CURLOPT_READFUNCTION, read_callback); - curl_easy_setopt(curl, CURLOPT_READDATA, &wt); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_callback); - - IOBufQueue outputBuf(IOBufQueue::cacheChainLength()); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &outputBuf); - curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L); - - struct curl_slist* headers = nullptr; - headers = curl_slist_append( - headers, "Content-Type: application/X-presto-pages"); - headers = - curl_slist_append(headers, "Accept: application/X-presto-pages"); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)(wt.buf->length())); - - res = curl_easy_perform(curl); - if (res != CURLE_OK) { - VELOX_FAIL( - "Error performing communicating with server : %s\n", - curl_easy_strerror(res)); - } - std::unique_ptr responseBody = outputBuf.move(); + std::unique_ptr responseBody = + performCurlRequest(fullUrl, std::move(requestBody)); - // Deserialize the response into a RowVector - auto outputRowVector = IOBufToRowVector( - *responseBody, ROW({outputType}), *context.pool(), &serde); + auto outputRowVector = IOBufToRowVector( + *responseBody, ROW({outputType}), *context.pool(), &serde); - result = outputRowVector->childAt(0); - } + result = outputRowVector->childAt(0); } catch (const std::exception& e) { VELOX_FAIL( "Error while executing remote function '{}': {}",