Concurrency Handling #11

Open
StephanGeorg opened this issue Nov 7, 2024 · 2 comments
Labels: help wanted (Extra attention is needed), question (Further information is requested)

Comments

@StephanGeorg

I am trying to understand how simultaneous HTTP requests are handled within this project and was hoping for some guidance. Specifically, I am interested in knowing:

  1. Concurrency Handling: Does httpclient handle multiple requests concurrently, and if so, are there any built-in limits on simultaneous requests?
  2. Queueing and Throttling: Is there any form of queueing or throttling mechanism for high traffic situations, or does it rely on external configuration for these aspects?

Additional Context:
I'm using a macro to make the requests:

```sql
CREATE OR REPLACE MACRO call_endpoint(a) AS (
  SELECT
    http_post(
      'https://domain.lol/api/endpoint',
      headers => MAP {
        'accept': 'application/json',
      },
      params => a
    )
    AS data);
```

and then apply it to every row of a table:

```sql
SELECT call_endpoint(MAP {
  'countryCode': country,
  'street': street,
  'houseNumber': house_number,
  'postCode': postcode,
  'locality': city
}) AS output_data
FROM '/path/to/input.csv';
```

If /path/to/input.csv had millions of rows, would all requests be fired at the same time?

@lmangani
Collaborator

lmangani commented Nov 7, 2024

Hi @StephanGeorg, thanks for opening this thread; I'm sure it will interest others too. TL;DR: let's find out.

The extension simply wraps the cpp-httplib implementation bundled with DuckDB, so the library's behavior applies whether or not a given feature is currently used. Right now the client does nothing fancy (yet): it fires a request on every invocation, subject only to whatever defaults the library applies. We can build a test with incrementally increasing load to find out whether there are settings we need to expose to tweak limits and/or behavior.
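For reference, here is what a built-in cap on simultaneous requests could look like. This is a minimal sketch, not part of the extension or of cpp-httplib: `RequestLimiter` is a hypothetical counting semaphore built from `std::mutex` and `std::condition_variable`, and a worker would call `Acquire()` before its HTTP call and `Release()` after it. `MaxObservedInFlight` is only a demo that replaces the HTTP call with a 5 ms sleep and reports the peak number of requests in flight.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the extension): caps how many requests may
// be in flight at once. Works with C++11; C++20 could use std::counting_semaphore.
class RequestLimiter {
public:
    explicit RequestLimiter(std::size_t max_in_flight) : available_(max_in_flight) {
        assert(max_in_flight > 0 && "limit must allow at least one request");
    }

    // Blocks until a slot is free, then claims it.
    void Acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return available_ > 0; });
        --available_;
    }

    // Returns a slot and wakes one waiting thread.
    void Release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++available_;
        }
        cv_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t available_;
};

// Demo: `workers` threads each perform one simulated request through the
// limiter; returns the highest number of requests observed in flight at once.
std::size_t MaxObservedInFlight(std::size_t workers, std::size_t limit) {
    RequestLimiter limiter(limit);
    std::atomic<std::size_t> in_flight{0};
    std::atomic<std::size_t> peak{0};
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < workers; ++i) {
        threads.emplace_back([&] {
            limiter.Acquire();
            std::size_t now = ++in_flight;
            std::size_t prev = peak.load();
            // Atomically raise `peak` to `now` if `now` is larger.
            while (now > prev && !peak.compare_exchange_weak(prev, now)) {
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(5)); // stand-in for the HTTP call
            --in_flight;
            limiter.Release();
        });
    }
    for (auto &t : threads) {
        t.join();
    }
    return peak.load();
}
```

A cap like this is independent of how many threads DuckDB or the extension spawns, so it could be tuned to a server's published rate limit rather than to the machine's core count.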

@lmangani added the help wanted (Extra attention is needed) and question (Further information is requested) labels on Nov 9, 2024
@falling-springs
Contributor

@lmangani @StephanGeorg

From my own quick exploration:

a) as currently implemented, this extension does not execute requests concurrently*
b) it is easy to adapt the extension to send requests across multiple threads (see example below)
c) handling concurrency, threading, rate limits, and retries will add complexity to this extension; from a quick look at the core httpfs code, it seems they've already handled a number of these issues, so it may be worth borrowing directly from that extension.

*DuckDB itself may split a query across multiple threads, but this only applies to queries over many thousands of rows, which is probably not relevant for most use cases of this particular extension.
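Point (c) mentions retries. As an illustration only, assuming a hypothetical `HttpOutcome` struct that mirrors the (status, body) pair the extension hands to `GetJsonResponse()`, a retry loop with exponential backoff could wrap each request. It retries transport failures (status -1, the value used in this thread for requests that never completed), 429 Too Many Requests, and 5xx responses. This is not how httpfs implements retries; it only shows the technique.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>
#include <thread>

// Hypothetical result type: status == -1 means the request never completed.
struct HttpOutcome {
    int status;
    std::string body;
};

// Retry on transport failures, 429 Too Many Requests, and 5xx server errors.
bool IsRetryable(int status) {
    return status == -1 || status == 429 || (status >= 500 && status <= 599);
}

// Calls `attempt` up to `max_attempts` times, doubling the wait after each
// retryable failure (base, 2*base, 4*base, ...). Returns the last outcome.
HttpOutcome RetryWithBackoff(const std::function<HttpOutcome()> &attempt, int max_attempts,
                             std::chrono::milliseconds base_delay) {
    assert(max_attempts >= 1 && "need at least one attempt");
    HttpOutcome outcome = attempt();
    std::chrono::milliseconds delay = base_delay;
    for (int tries = 1; tries < max_attempts && IsRetryable(outcome.status); ++tries) {
        std::this_thread::sleep_for(delay);
        delay *= 2;
        outcome = attempt();
    }
    return outcome;
}
```

Non-retryable responses such as 404 are returned immediately, so a bad address in the input table costs one request rather than `max_attempts`.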

Below is a version of HTTPPostRequestFunction that splits requests across threads. In my testing this sped up queries by roughly the thread count (~10x in my case). It is quick and dirty, so there may be a better or more performant way to do this.

```cpp
#include <algorithm>
#include <exception>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Parallel variant of HTTPPostRequestFunction: the rows of each DataChunk
// are split across worker threads.
static void HTTPPostRequestFunctionParallel(DataChunk &args, ExpressionState &state, Vector &result) {
    D_ASSERT(args.data.size() == 3);

    auto &url_vector = args.data[0];
    auto &headers_vector = args.data[1];
    auto &body_vector = args.data[2];

    idx_t count = args.size();
    if (count == 0) {
        return;
    }

    // Pre-extract input data into plain STL containers so that the worker
    // threads never touch DuckDB Vectors.
    std::vector<std::string> url_inputs(count);
    std::vector<std::vector<std::pair<std::string, std::string>>> header_inputs(count);
    std::vector<std::string> body_inputs(count);

    for (idx_t i = 0; i < count; ++i) {
        url_inputs[i] = url_vector.GetValue(i).ToString();

        // Headers arrive as a MAP, i.e. a list of {key, value} structs
        auto headers_value = headers_vector.GetValue(i);
        if (!headers_value.IsNull()) {
            for (const auto &child : ListValue::GetChildren(headers_value)) {
                auto &entry = StructValue::GetChildren(child);
                header_inputs[i].emplace_back(entry[0].ToString(), entry[1].ToString());
            }
        }

        body_inputs[i] = body_vector.GetValue(i).ToString();
    }

    std::vector<std::string> responses(count);

    // hardware_concurrency() may return 0 if the value cannot be determined
    const idx_t max_threads = std::max<idx_t>(1, std::thread::hardware_concurrency());
    const idx_t num_threads = std::min(count, max_threads);
    const idx_t chunk_size = (count + num_threads - 1) / num_threads;

    std::vector<std::thread> threads;
    threads.reserve(num_threads);

    // Each thread handles the contiguous range [start, end) and writes only to
    // its own slots in `responses`, so no locking is needed.
    for (idx_t t = 0; t < num_threads; ++t) {
        idx_t start = t * chunk_size;
        idx_t end = std::min(start + chunk_size, count);

        threads.emplace_back([&, start, end]() {
            for (idx_t i = start; i < end; ++i) {
                // An exception escaping a std::thread calls std::terminate,
                // so convert failures into an error response instead.
                try {
                    auto client_and_path = SetupHttpClient(url_inputs[i]);
                    auto &client = client_and_path.first;
                    auto &path = client_and_path.second;

                    duckdb_httplib_openssl::Headers header_map;
                    for (const auto &header : header_inputs[i]) {
                        header_map.emplace(header.first, header.second);
                    }

                    auto res = client.Post(path.c_str(), header_map, body_inputs[i], "application/json");
                    if (res) {
                        responses[i] = GetJsonResponse(res->status, res->reason, res->body);
                    } else {
                        responses[i] = GetJsonResponse(-1, GetHttpErrorMessage(res, "POST"), "");
                    }
                } catch (const std::exception &ex) {
                    responses[i] = GetJsonResponse(-1, ex.what(), "");
                }
            }
        });
    }

    // Wait for all threads to finish
    for (auto &thread : threads) {
        thread.join();
    }

    // Populate the result vector in input order
    for (idx_t i = 0; i < count; ++i) {
        result.SetValue(i, Value(responses[i]));
    }
}
```
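The static chunking above gives each thread a fixed slice of rows, so one slow endpoint can leave the other threads idle while their slices are done. A minimal alternative sketch, assuming a hypothetical `RunRequestsInParallel` helper and a thread-safe `send` callback standing in for the `SetupHttpClient` + `client.Post` code: workers pull the next row index from a shared `std::atomic` counter, and the caller chooses the thread cap.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Runs `send(i)` for every i in [0, count) on at most `max_threads` workers and
// returns the responses in input order. `send` is a hypothetical stand-in for
// the per-row HTTP request; it must be thread-safe and must not throw.
std::vector<std::string> RunRequestsInParallel(std::size_t count, std::size_t max_threads,
                                               const std::function<std::string(std::size_t)> &send) {
    std::vector<std::string> responses(count);
    if (count == 0) {
        return responses;
    }
    // Never start more workers than rows, and always at least one.
    const std::size_t num_threads = std::min(count, std::max<std::size_t>(1, max_threads));

    std::atomic<std::size_t> next{0};
    std::vector<std::thread> workers;
    workers.reserve(num_threads);
    for (std::size_t t = 0; t < num_threads; ++t) {
        workers.emplace_back([&] {
            // Each fetch-and-increment claims a unique row index, so every
            // slot in `responses` is written by exactly one worker.
            for (std::size_t i = next++; i < count; i = next++) {
                responses[i] = send(i);
            }
        });
    }
    for (auto &w : workers) {
        w.join();
    }
    assert(next.load() >= count && "every row index was claimed");
    return responses;
}
```

Because HTTP requests are I/O-bound rather than CPU-bound, the cap does not have to equal `std::thread::hardware_concurrency()`; it could instead be set from the target server's rate limit.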


3 participants