diff --git a/NAMESPACE b/NAMESPACE
index 078620a2..377e59b1 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -5,6 +5,7 @@ S3method(print,httr2_response)
 export("%>%")
 export(req)
 export(req_fetch)
+export(req_stream)
 export(resp_body_json)
 export(resp_body_raw)
 export(resp_body_string)
diff --git a/R/req-fetch.R b/R/req-fetch.R
index 0d6ad421..d237be67 100644
--- a/R/req-fetch.R
+++ b/R/req-fetch.R
@@ -34,14 +34,33 @@ req_fetch <- function(req, path = NULL, handle = NULL) {
   )
 }
 
-
-req_stream <- function(req, callback, timeout = Inf, buffer_kb = 64) {
+#' Perform a request, streaming data back to R
+#'
+#' After preparing a request, call `req_stream()` to perform the request
+#' and handle the result with a streaming callback. This is useful for
+#' streaming HTTP APIs where the stream potentially never ends.
+#'
+#' @inheritParams req_fetch
+#' @param callback A single-argument callback function. It will be called
+#'   repeatedly with a raw vector whenever there is at least `buffer_kb`
+#'   worth of data to process. It must return `TRUE` to continue streaming.
+#' @param timeout_sec Number of seconds to process the stream for.
+#' @param buffer_kb Buffer size, in kilobytes.
+#' @export
+#' @examples
+#' show_bytes <- function(x) {
+#'   cat("Got ", length(x), " bytes\n", sep = "")
+#'   TRUE
+#' }
+#' req("http://httpbin.org/stream-bytes/100000") %>%
+#'   req_stream(show_bytes, buffer_kb = 32)
+req_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64) {
   url <- req_url_get(req)
   handle <- req_handle(req)
   callback <- as_function(callback)
 
-  stopifnot(is.numeric(timeout), timeout > 0)
-  stop_time <- Sys.time() + timeout
+  stopifnot(is.numeric(timeout_sec), timeout_sec > 0)
+  stop_time <- Sys.time() + timeout_sec
 
   stream <- curl::curl(url, handle = handle)
   open(stream, "rb")
@@ -55,14 +74,14 @@ req_stream <- function(req, callback, timeout = Inf, buffer_kb = 64) {
     }
   }
 
-  res <- curl::handle_data(handle)
-
+  data <- curl::handle_data(handle)
   new_response(
-    url = res$url,
-    status_code = res$status_code,
-    headers = curl::parse_headers_list(res$headers),
-    body = res$content,
-    times = res$times
+    handle = handle,
+    url = data$url,
+    status_code = data$status_code,
+    headers = curl::parse_headers_list(data$headers),
+    body = NULL,
+    times = data$times
   )
 }
 
@@ -77,6 +96,5 @@ req_handle <- function(req) {
   handle
 }
 
-
 new_path <- function(x) structure(x, class = "httr_path")
 is_path <- function(x) inherits(x, "httr_path")
diff --git a/man/req_stream.Rd b/man/req_stream.Rd
new file mode 100644
index 00000000..fced9f6c
--- /dev/null
+++ b/man/req_stream.Rd
@@ -0,0 +1,32 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/req-fetch.R
+\name{req_stream}
+\alias{req_stream}
+\title{Perform a request, streaming data back to R}
+\usage{
+req_stream(req, callback, timeout_sec = Inf, buffer_kb = 64)
+}
+\arguments{
+\item{req}{A \link{req}uest.}
+
+\item{callback}{A single-argument callback function. It will be called
+repeatedly with a raw vector whenever there is at least \code{buffer_kb}
+worth of data to process. It must return \code{TRUE} to continue streaming.}
+
+\item{timeout_sec}{Number of seconds to process the stream for.}
+
+\item{buffer_kb}{Buffer size, in kilobytes.}
+}
+\description{
+After preparing a request, call \code{req_stream()} to perform the request
+and handle the result with a streaming callback. This is useful for
+streaming HTTP APIs where the stream potentially never ends.
+}
+\examples{
+show_bytes <- function(x) {
+  cat("Got ", length(x), " bytes\n", sep = "")
+  TRUE
+}
+req("http://httpbin.org/stream-bytes/100000") \%>\%
+  req_stream(show_bytes, buffer_kb = 32)
+}
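
A usage sketch (not part of the patch) illustrating the callback-return contract documented above: the callback keeps the stream alive only while it returns TRUE, and timeout_sec bounds how long the stream is processed. The URL, byte threshold, and helper name below are illustrative assumptions, not part of this diff.

    # Stop after roughly 1 MB has been received, or after 10 seconds,
    # whichever comes first (sketch against the req_stream() API above).
    bytes_seen <- 0
    stop_after_1mb <- function(x) {
      bytes_seen <<- bytes_seen + length(x)
      cat("Received ", bytes_seen, " bytes so far\n", sep = "")
      bytes_seen < 1e6  # returning FALSE ends the stream early
    }
    req("http://httpbin.org/stream-bytes/2000000") %>%
      req_stream(stop_after_1mb, timeout_sec = 10, buffer_kb = 64)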