Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pooled request #653

Merged
merged 12 commits into from
Feb 10, 2025
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# httr2 (development version)

* `req_perform_connection()` gives a better error if request fails at networking level.
* `req_throttle()` now uses a "token bucket" which preserves the average rate limit, but allows bursts of higher requests.
* `req_dry_run()` and `req_verbose()` now do a better job of displaying compressed bodies (#91, #656).
* `resp_link_url()` once again ignores the case of headers (@DavidRLovell, #655)
Expand Down
138 changes: 25 additions & 113 deletions R/multi-req.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,36 @@
config = progress
)

perfs <- vector("list", length(reqs))
for (i in seq_along(reqs)) {
perfs[[i]] <- Performance$new(
error_call <- environment()
resps <- rep_along(reqs, list())

handle_success <- function(i, resp, tries) {
progress$update()
resps[[i]] <<- resp
}
handle_problem <- function(i, error, tries) {
progress$update()
error$call <- error_call
resps[[i]] <<- error
signal("", error = error, class = "httr2_fail")

Check warning on line 78 in R/multi-req.R

View check run for this annotation

Codecov / codecov/patch

R/multi-req.R#L75-L78

Added lines #L75 - L78 were not covered by tests
}

pooled_requests <- map(seq_along(reqs), function(i) {
pooled_request(
req = reqs[[i]],
path = paths[[i]],
progress = progress,
error_call = environment()
error_call = error_call,
on_success = function(resp, tries) handle_success(i, resp, tries),
on_failure = function(error, tries) handle_problem(i, error, tries),
on_error = function(error, tries) handle_problem(i, error, tries)
)
perfs[[i]]$submit(pool)
}
})

pool_run(pool, perfs, on_error = on_error)
walk(pooled_requests, function(req) req$submit(pool))
pool_run(pool, pooled_requests, on_error = on_error)
progress$done()

map(perfs, ~ .$resp)
resps
}


Expand Down Expand Up @@ -131,117 +146,14 @@
)

try_fetch(
repeat({
run <- curl::multi_run(0.1, pool = pool, poll = TRUE)
if (run$pending == 0) {
break
}
}),
curl::multi_run(pool = pool),
interrupt = function(cnd) NULL,
httr2_fail = httr2_fail
)

invisible()
}

# Wrap up all components of request -> response in a single object
Performance <- R6Class("Performance", public = list(
req = NULL,
req_prep = NULL,
path = NULL,

handle = NULL,
resp = NULL,
pool = NULL,
error_call = NULL,
progress = NULL,

initialize = function(req, path = NULL, progress = NULL, error_call = NULL) {
self$req <- req
self$path <- path
self$progress <- progress
self$error_call <- error_call

req <- auth_sign(req)
req <- cache_pre_fetch(req, path)
if (is_response(req)) {
self$resp <- req
} else {
self$req_prep <- req_prepare(req)
self$handle <- req_handle(self$req_prep)
curl::handle_setopt(self$handle, url = req$url)
}
},

submit = function(pool = NULL) {
if (!is.null(self$resp)) {
# cached
return()
}

self$pool <- pool
curl::multi_add(
handle = self$handle,
pool = self$pool,
data = self$path,
done = self$succeed,
fail = self$fail
)
invisible(self)
},

succeed = function(res) {
self$progress$update()
req_completed(self$req_prep)

if (is.null(self$path)) {
body <- res$content
} else {
# Only needed with curl::multi_run()
if (!file.exists(self$path)) {
file.create(self$path)
}
body <- new_path(self$path)
}
resp <- new_response(
method = req_method_get(self$req),
url = res$url,
status_code = res$status_code,
headers = as_headers(res$headers),
body = body,
request = self$req
)
resp <- cache_post_fetch(self$req, resp, path = self$path)
self$resp <- tryCatch(
handle_resp(self$req, resp, error_call = self$error_call),
error = identity
)
if (is_error(self$resp)) {
signal("", error = self$resp, class = "httr2_fail")
}
},

fail = function(msg) {
self$progress$update()
req_completed(self$req_prep)

self$resp <- error_cnd(
"httr2_failure",
message = msg,
request = self$req,
call = self$error_call
)
signal("", error = self$resp, class = "httr2_fail")
},

cancel = function() {
# No handle if response was cached
if (!is.null(self$handle)) {
curl::multi_cancel(self$handle)
}
}
))

pool_cancel <- function(pool, perfs) {
walk(perfs, ~ .x$cancel())
curl::multi_run(pool = pool)
Expand Down
2 changes: 1 addition & 1 deletion R/oauth.R
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ resp_is_invalid_oauth_token <- function(req, resp) {
return(FALSE)
}

if (resp_status(resp) != 401) {
if (is_error(resp) || resp_status(resp) != 401) {
return(FALSE)
}

Expand Down
133 changes: 133 additions & 0 deletions R/pooled-request.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
pooled_request <- function(
req,
path = NULL,
on_success = NULL,
on_failure = NULL,
on_error = NULL,
error_call = caller_env()
) {

check_request(req)
check_string(path, allow_null = TRUE)
check_function2(on_success, args = c("resp", "tries"), allow_null = TRUE)
check_function2(on_failure, args = c("error", "tries"), allow_null = TRUE)
check_function2(on_error, args = c("error", "tries"), allow_null = TRUE)

PooledRequest$new(
req = req,
path = path,
error_call = error_call,
on_success = on_success,
on_failure = on_failure,
on_error = on_error
)
}

# Wrap up all components of request -> response in a single object
PooledRequest <- R6Class(
"Performance",
public = list(
req = NULL,
resp = NULL,
tries = 0,

initialize = function(
req,
path = NULL,
error_call = NULL,
on_success = NULL,
on_failure = NULL,
on_error = NULL
) {
self$req <- req
private$path <- path
private$error_call <- error_call
private$on_success <- on_success
private$on_failure <- on_failure
private$on_error <- on_error
},

submit = function(pool) {
self$tries <- self$tries + 1

req <- cache_pre_fetch(self$req, private$path)
if (is_response(req)) {
private$on_success(req, self$tries)
return()
}

private$req_prep <- req_prepare(req)
private$handle <- req_handle(private$req_prep)

Check warning on line 60 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L59-L60

Added lines #L59 - L60 were not covered by tests

curl::multi_add(
handle = private$handle,
pool = pool,
data = private$path,
done = private$succeed,
fail = private$fail
)

Check warning on line 68 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L62-L68

Added lines #L62 - L68 were not covered by tests

invisible(self)

Check warning on line 70 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L70

Added line #L70 was not covered by tests
},

cancel = function() {
# No handle if response was cached
if (!is.null(private$handle)) {
curl::multi_cancel(private$handle)

Check warning on line 76 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L76

Added line #L76 was not covered by tests
}
}

),
private = list(
path = NULL,
error_call = NULL,
pool = NULL,

req_prep = NULL,
handle = NULL,

on_success = NULL,
on_failure = NULL,
on_error = NULL,

succeed = function(curl_data) {
private$handle <- NULL
req_completed(private$req_prep)

Check warning on line 95 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L94-L95

Added lines #L94 - L95 were not covered by tests

if (is.null(private$path)) {
body <- curl_data$content

Check warning on line 98 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L97-L98

Added lines #L97 - L98 were not covered by tests
} else {
# Only needed with curl::multi_run()
if (!file.exists(private$path)) {
file.create(private$path)

Check warning on line 102 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L101-L102

Added lines #L101 - L102 were not covered by tests
}
body <- new_path(private$path)

Check warning on line 104 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L104

Added line #L104 was not covered by tests
}

resp <- create_response(self$req, curl_data, body)
resp <- cache_post_fetch(self$req, resp, path = private$path)

Check warning on line 108 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L107-L108

Added lines #L107 - L108 were not covered by tests

if (is_error(resp)) {
private$on_error(resp, tries = self$tries)
} else if (error_is_error(self$req, resp)) {
cnd <- resp_failure_cnd(self$req, resp)
private$on_failure(cnd, tries = self$tries)

Check warning on line 114 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L110-L114

Added lines #L110 - L114 were not covered by tests
} else {
private$on_success(resp)
}
},

fail = function(msg) {
private$handle <- NULL
req_completed(private$req_prep)

Check warning on line 122 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L121-L122

Added lines #L121 - L122 were not covered by tests

error <- error_cnd(
"httr2_failure",
message = msg,
request = self$req
)
private$on_error(error, tries = self$tries)

Check warning on line 129 in R/pooled-request.R

View check run for this annotation

Codecov / codecov/patch

R/pooled-request.R#L124-L129

Added lines #L124 - L129 were not covered by tests
}

)
)
2 changes: 1 addition & 1 deletion R/req-dry-run.R
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ req_dry_run <- function(req, quiet = FALSE, redact_headers = TRUE) {
}

show_body <- function(body, content_type, prefix = "") {
if (is.null(body)) {
if (!is.raw(body)) {
return(invisible())
}

Expand Down
13 changes: 13 additions & 0 deletions R/req-error.R
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,16 @@ error_body <- function(req, resp, call = caller_env()) {
}
)
}

capture_curl_error <- function(code) {
resp <- tryCatch(
code,
error = function(err) {
error_cnd(
message = "Failed to perform HTTP request.",
class = c("httr2_failure", "httr2_error"),
parent = err
)
}
)
}
Loading
Loading