Create callback based on file descriptor monitoring (#190)
Massive feature by @shikokuchuo to enable later to schedule callbacks
based not on the passing of time, but on the state of file descriptors
(and Win32 SOCKETs). The entry point is `later::later_fd()`.

---------

Co-authored-by: shikokuchuo <[email protected]>
jcheng5 and shikokuchuo authored Nov 8, 2024
1 parent 968542a commit 8210bc8
Showing 19 changed files with 711 additions and 8 deletions.
4 changes: 3 additions & 1 deletion DESCRIPTION
@@ -5,6 +5,7 @@ Version: 1.3.2.9000
Authors@R: c(
person("Winston", "Chang", role = c("aut", "cre"), email = "[email protected]"),
person("Joe", "Cheng", role = c("aut"), email = "[email protected]"),
person("Charlie", "Gao", role = c("aut"), email = "[email protected]", comment = c(ORCID = "0000-0002-0750-061X")),
person(family = "Posit Software, PBC", role = "cph"),
person("Marcus", "Geelnard", role = c("ctb", "cph"), comment = "TinyCThread library, https://tinycthread.github.io/"),
person("Evan", "Nemerson", role = c("ctb", "cph"), comment = "TinyCThread library, https://tinycthread.github.io/")
@@ -20,9 +21,10 @@ Imports:
rlang
LinkingTo: Rcpp
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.3.1
RoxygenNote: 7.3.2
Suggests:
knitr,
nanonext,
rmarkdown,
testthat (>= 2.1.0)
VignetteBuilder: knitr
1 change: 1 addition & 0 deletions NAMESPACE
@@ -8,6 +8,7 @@ export(destroy_loop)
export(exists_loop)
export(global_loop)
export(later)
export(later_fd)
export(loop_empty)
export(next_op_secs)
export(run_now)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,5 +1,7 @@
# later (development version)

* Adds `later_fd()` which executes a function when a file descriptor is ready for reading or writing, at some indeterminate time in the future (subject to an optional timeout). This facilitates an event-driven approach to asynchronous or streaming downloads. (@shikokuchuo and @jcheng5, #190)

* Fixed #186: Improvements to package load time as `rlang` is now only loaded when used. This is a notable efficiency for packages with only a 'linking to' dependency on `later`. Also updates to native symbol registration from dynamic lookup. (@shikokuchuo and @wch, #187)

# later 1.3.2
8 changes: 8 additions & 0 deletions R/RcppExports.R
@@ -13,6 +13,14 @@ using_ubsan <- function() {
.Call(`_later_using_ubsan`)
}

execLater_fd <- function(callback, readfds, writefds, exceptfds, timeoutSecs, loop_id) {
.Call(`_later_execLater_fd`, callback, readfds, writefds, exceptfds, timeoutSecs, loop_id)
}

fd_cancel <- function(xptr) {
.Call(`_later_fd_cancel`, xptr)
}

setCurrentRegistryId <- function(id) {
invisible(.Call(`_later_setCurrentRegistryId`, id))
}
92 changes: 92 additions & 0 deletions R/later.R
@@ -280,6 +280,98 @@ create_canceller <- function(id, loop_id) {
}
}

#' Executes a function when a file descriptor is ready
#'
#' Schedule an R function or formula to run after an indeterminate amount of
#' time when file descriptors are ready for reading or writing, subject to an
#' optional timeout.
#'
#' On the occasion the system-level `poll` (on Windows `WSAPoll`) returns an
#' error, the callback will be made on a vector of all `NA`s. This is
#' indistinguishable from a case where the `poll` succeeds but there are error
#' conditions pending against each file descriptor.
#'
#' If no file descriptors are supplied, the callback is scheduled for immediate
#' execution and made on the empty logical vector `logical(0)`.
#'
#' @param func A function that takes a single argument, a logical vector that
#' indicates which file descriptors are ready (a concatenation of `readfds`,
#' `writefds` and `exceptfds`). This may be all `FALSE` if the
#' `timeout` argument is non-`Inf`. File descriptors with error conditions
#' pending are represented as `NA`, as are invalid file descriptors such as
#' those already closed.
#' @param readfds Integer vector of file descriptors, or Windows SOCKETs, to
#' monitor for being ready to read.
#' @param writefds Integer vector of file descriptors, or Windows SOCKETs, to
#' monitor for being ready to write.
#' @param exceptfds Integer vector of file descriptors, or Windows SOCKETs, to
#' monitor for error conditions pending.
#' @param timeout Number of seconds to wait before giving up, and calling `func`
#' with all `FALSE`. The default `Inf` implies waiting indefinitely.
#' Specifying `0` will check once without blocking, and supplying a negative
#' value defaults to a timeout of 1s.
#' @param loop A handle to an event loop. Defaults to the currently-active loop.
#'
#' @inherit later return note
#'
#' @examplesIf requireNamespace("nanonext", quietly = TRUE)
#' # create nanonext sockets
#' s1 <- nanonext::socket(listen = "inproc://nano")
#' s2 <- nanonext::socket(dial = "inproc://nano")
#' fd1 <- nanonext::opt(s1, "recv-fd")
#' fd2 <- nanonext::opt(s2, "recv-fd")
#'
#' # 1. timeout: prints FALSE, FALSE
#' later_fd(print, c(fd1, fd2), timeout = 0.1)
#' Sys.sleep(0.2)
#' run_now()
#'
#' # 2. fd1 ready: prints TRUE, FALSE
#' later_fd(print, c(fd1, fd2), timeout = 1)
#' res <- nanonext::send(s2, "msg")
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 3. both ready: prints TRUE, TRUE
#' res <- nanonext::send(s1, "msg")
#' later_fd(print, c(fd1, fd2), timeout = 1)
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 4. fd2 ready: prints FALSE, TRUE
#' res <- nanonext::recv(s1)
#' later_fd(print, c(fd1, fd2), timeout = 1)
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 5. fds invalid: prints NA, NA
#' close(s2)
#' close(s1)
#' later_fd(print, c(fd1, fd2), timeout = 0)
#' Sys.sleep(0.1)
#' run_now()
#'
#' @export
later_fd <- function(func, readfds = integer(), writefds = integer(), exceptfds = integer(),
timeout = Inf, loop = current_loop()) {
if (!is.function(func)) {
func <- rlang::as_function(func)
}
xptr <- execLater_fd(func, readfds, writefds, exceptfds, timeout, loop$id)

invisible(create_fd_canceller(xptr))
}

# Returns a function that will cancel a callback with the given external
# pointer. If the callback has already been executed or canceled, then the
# function has no effect.
create_fd_canceller <- function(xptr) {
force(xptr)
function() {
invisible(fd_cancel(xptr))
}
}

#' Execute scheduled operations
#'
#' Normally, operations scheduled with [later()] will not execute unless/until
20 changes: 19 additions & 1 deletion README.md
@@ -32,6 +32,17 @@ Or a formula (in this case, run as soon as control returns to the top-level):
```r
later::later(~print("Got here!"))
```
### File Descriptor Readiness

It is also possible to have a function run based on when file descriptors are ready for reading or writing, at some indeterminate time in the future.

Below, a logical vector is printed indicating which of file descriptors 21 or 22 were ready, subject to a timeout of 1s. Instead of just printing the result, the supplied function can also do something more useful such as reading from the descriptor.

```r
later::later_fd(print, c(21L, 22L), timeout = 1)
```

This is particularly useful for asynchronous or streaming data transfer over the network, so that reads can be made from TCP sockets as soon as data is available. `later::later_fd()` pairs well with functions such as `curl::multi_fdset()` that return the relevant file descriptors to be monitored.
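
The roxygen documentation in this commit notes that readiness is determined with the system-level `poll` (`WSAPoll` on Windows). As a rough illustration of that kind of check, here is a minimal POSIX-only sketch that is independent of later's actual implementation. The names `poll_readable`, `demo_with_pipe` and `kNaInteger` are hypothetical; `kNaInteger` stands in for R's `NA_INTEGER` so that the sketch compiles without R headers.

```cpp
#include <poll.h>
#include <unistd.h>

#include <cassert>
#include <climits>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for R's NA_INTEGER (which R defines as INT_MIN).
constexpr int kNaInteger = INT_MIN;

// Polls `fds` for read readiness for up to `timeout_ms` milliseconds.
// Returns one entry per descriptor: 1 = ready to read, 0 = not ready,
// kNaInteger = error condition or invalid descriptor. If poll() itself
// fails, every entry is kNaInteger.
std::vector<int> poll_readable(const std::vector<int>& fds, int timeout_ms) {
  assert(timeout_ms >= 0);  // this sketch does not support indefinite waits

  std::vector<pollfd> pfds(fds.size());
  for (std::size_t i = 0; i < fds.size(); ++i) {
    pfds[i].fd = fds[i];
    pfds[i].events = POLLIN;
    pfds[i].revents = 0;
  }

  std::vector<int> results(fds.size(), 0);
  int rc = poll(pfds.data(), static_cast<nfds_t>(pfds.size()), timeout_ms);
  if (rc < 0) {
    results.assign(fds.size(), kNaInteger);
    return results;
  }

  for (std::size_t i = 0; i < pfds.size(); ++i) {
    if (pfds[i].revents & (POLLERR | POLLNVAL)) {
      results[i] = kNaInteger;  // error pending or descriptor not open
    } else if (pfds[i].revents & POLLIN) {
      results[i] = 1;           // data available to read
    }
  }
  return results;
}

// Usage sketch: checks a pipe's read end before a write, after a write,
// and after both ends have been closed. Returns the three results in order.
std::vector<int> demo_with_pipe() {
  int p[2];
  if (pipe(p) != 0) return {};

  std::vector<int> out;
  out.push_back(poll_readable({p[0]}, 0)[0]);    // nothing written yet

  ssize_t n = write(p[1], "x", 1);
  (void)n;
  out.push_back(poll_readable({p[0]}, 100)[0]);  // one byte is waiting

  close(p[0]);
  close(p[1]);
  out.push_back(poll_readable({p[0]}, 0)[0]);    // descriptor now closed
  return out;
}
```

The three outcomes correspond to the `FALSE`, `TRUE` and `NA` values that the R-level callback receives. The sketch blocks the calling thread for the duration of the timeout, which is precisely what scheduling through `later_fd()` is meant to avoid.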

## Usage from C++

@@ -45,6 +56,13 @@ void later(void (*func)(void*), void* data, double secs)
The first argument is a pointer to a function that takes one `void*` argument and returns void. The second argument is a `void*` that will be passed to the function when it's called back. And the third argument is the number of seconds to wait (at a minimum) before invoking.
`later::later_fd` is also accessible from `later_api.h` and its prototype looks like this:
```cpp
void later_fd(void (*func)(int *, void *), void *data, int num_fds, struct pollfd *fds, double secs)
```
The first argument is a pointer to a function that takes two arguments: the first is an `int*` array provided by `later_fd()` when the function is called back, and the second is a `void*`. The `int*` array has length `num_fds` and contains the values `0`, `1`, or `NA_INTEGER`, indicating that the corresponding file descriptor is not ready, is ready, or has an error condition, respectively. The second argument, `data`, is passed to the `void*` argument of the function when it's called back. The third is the total number of file descriptors being passed, the fourth is a pointer to an array of `struct pollfd`, and the fifth is the number of seconds to wait before timing out.
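
As an illustration of the callback side, here is a minimal sketch of a function with the `void (*)(int *, void *)` signature that tallies the results array. It is written without R or later headers so that it stands alone: `FdSummary`, `summarize_fds` and `kNaInteger` are hypothetical names, and `kNaInteger` stands in for `NA_INTEGER`. Because the callback does not receive the array length, this sketch assumes the caller carries it in the user data.

```cpp
#include <cassert>
#include <climits>

// Hypothetical stand-in for R's NA_INTEGER (which R defines as INT_MIN).
constexpr int kNaInteger = INT_MIN;

// Hypothetical user data handed to the callback through its void* argument.
struct FdSummary {
  int num_fds;    // length of the results array; must match num_fds
  int ready;      // entries equal to 1
  int not_ready;  // entries equal to 0
  int errored;    // entries equal to kNaInteger
};

// Callback matching the type void (*)(int *, void *): counts how many
// descriptors were ready, not ready, or in an error state.
void summarize_fds(int* results, void* data) {
  assert(results != nullptr && data != nullptr);
  FdSummary* summary = static_cast<FdSummary*>(data);
  summary->ready = 0;
  summary->not_ready = 0;
  summary->errored = 0;
  for (int i = 0; i < summary->num_fds; ++i) {
    if (results[i] == kNaInteger) {
      ++summary->errored;
    } else if (results[i] == 1) {
      ++summary->ready;
    } else {
      ++summary->not_ready;
    }
  }
}
```

In a package that includes `later_api.h`, such a callback would presumably be scheduled with something like `later::later_fd(summarize_fds, &summary, num_fds, fds, secs)`, following the prototype above. The `FdSummary` object would need to outlive the call, since the callback runs at some later time.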

To use the C++ interface, you'll need to add `later` to your `DESCRIPTION` file under both `LinkingTo` and `Imports`, and also make sure that your `NAMESPACE` file has an `import(later)` entry.

### Background tasks
@@ -121,4 +139,4 @@ void asyncMean(Rcpp::NumericVector data) {
}
```

It's not very useful to execute tasks on background threads if you can't get access to the results back in R. We'll soon be introducing a complementary R package that provides a suitable "promise" or "future" abstraction.
It's not very useful to execute tasks on background threads if you can't get access to the results back in R. The [promises](https://github.com/rstudio/promises) package complements later by providing a "promise" abstraction.
36 changes: 36 additions & 0 deletions inst/include/later.h
@@ -13,6 +13,10 @@
#endif

#ifdef _WIN32
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600 // so R <= 4.1 can find WSAPoll() on Windows
#endif
#include <winsock2.h>
#define WIN32_LEAN_AND_MEAN
// Taken from http://tolstoy.newcastle.edu.au/R/e2/devel/06/11/1242.html
// Undefine the Realloc macro, which is defined by both R and by Windows stuff
@@ -21,6 +25,7 @@
#undef Free
#include <windows.h>
#else // _WIN32
#include <poll.h>
#include <pthread.h>
#endif // _WIN32

@@ -91,6 +96,37 @@ inline void later(void (*func)(void*), void* data, double secs) {
later(func, data, secs, GLOBAL_LOOP);
}

inline void later_fd(void (*func)(int *, void *), void *data, int num_fds, struct pollfd *fds, double secs, int loop_id) {
// See above note for later()

// The function type for the real execLaterFdNative
typedef void (*elfdnfun)(void (*)(int *, void *), void *, int, struct pollfd *, double, int);
static elfdnfun elfdn = NULL;
if (!elfdn) {
// Initialize if necessary
if (func) {
// We're not initialized but someone's trying to actually schedule
// some code to be executed!
REprintf(
"Warning: later::execLaterFdNative called in uninitialized state. "
"If you're using <later.h>, please switch to <later_api.h>.\n"
);
}
elfdn = (elfdnfun) R_GetCCallable("later", "execLaterFdNative");
}

// We didn't want to execute anything, just initialize
if (!func) {
return;
}

elfdn(func, data, num_fds, fds, secs, loop_id);
}

inline void later_fd(void (*func)(int *, void *), void *data, int num_fds, struct pollfd *fds, double secs) {
later_fd(func, data, num_fds, fds, secs, GLOBAL_LOOP);
}


class BackgroundTask {

3 changes: 2 additions & 1 deletion inst/include/later_api.h
@@ -11,9 +11,10 @@ namespace {
// See comment in execLaterNative to learn why we need to do this
// in a statically initialized object
later::later(NULL, NULL, 0);
later::later_fd(NULL, NULL, 0, NULL, 0);
}
};

static LaterInitializer init;

} // namespace
110 changes: 110 additions & 0 deletions man/later_fd.Rd

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions src/Makevars.win
@@ -1,4 +1,5 @@
PKG_CPPFLAGS = -DSTRICT_R_HEADERS
PKG_LIBS = -lWs2_32

#### Debugging flags ####
# Uncomment to enable thread assertions