From 70f714a29f14e28322082798b4b90ad3eb077a0d Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Mon, 9 Dec 2024 13:52:32 -0800 Subject: [PATCH] libflux: convert reactor to libuv This is a partial conversion with the following gaps: - no periodic watchers - no child watchers - no flux_reactor_active_incref/decref - no flux_watcher_set_priority Memory will leak if watchers are destroyed outside of the reactor because uv_close() works asynchronously. Currently there is an unexplained segfault in the composite watcher unit test. See issue #6492 --- src/common/libflux/Makefile.am | 2 + src/common/libflux/reactor.c | 63 ++-- src/common/libflux/test/reactor.c | 25 +- src/common/libflux/watcher_wrap.c | 596 +++++++++++++++++++----------- 4 files changed, 441 insertions(+), 245 deletions(-) diff --git a/src/common/libflux/Makefile.am b/src/common/libflux/Makefile.am index f48e7c38620e..e3bcd388a88d 100644 --- a/src/common/libflux/Makefile.am +++ b/src/common/libflux/Makefile.am @@ -17,6 +17,7 @@ AM_CPPFLAGS = \ -DLUADIR=\"$(luadir)\" \ -DLUAEXECDIR=\"$(luaexecdir)\" \ $(JANSSON_CFLAGS) \ + $(LIBUV_CFLAGS) \ $(LIBUUID_CFLAGS) fluxcoreinclude_HEADERS = \ @@ -137,6 +138,7 @@ test_ldadd = \ $(top_builddir)/src/common/libtap/libtap.la \ $(LIBUUID_LIBS) \ $(JANSSON_LIBS) \ + $(LIBUV_LIBS) \ $(LIBPTHREAD) \ $(LIBDL) diff --git a/src/common/libflux/reactor.c b/src/common/libflux/reactor.c index 29f2f11f2ebc..11405e3dbdb0 100644 --- a/src/common/libflux/reactor.c +++ b/src/common/libflux/reactor.c @@ -16,14 +16,13 @@ #include #include #include +#include #include -#include "src/common/libev/ev.h" - #include "reactor_private.h" struct flux_reactor { - struct ev_loop *loop; + uv_loop_t loop; int usecount; unsigned int errflag:1; }; @@ -41,12 +40,7 @@ void flux_reactor_decref (flux_reactor_t *r) { if (r && --r->usecount == 0) { int saved_errno = errno; - if (r->loop) { - if (ev_is_default_loop (r->loop)) - ev_default_destroy (); - else - ev_loop_destroy (r->loop); - } + (void)uv_loop_close (&r->loop); // could return -EBUSY free (r); errno = saved_errno; } @@ -66,83 +60,82 @@ void flux_reactor_destroy (flux_reactor_t *r) flux_reactor_t *flux_reactor_create (int flags) { flux_reactor_t *r; + int uverr; if (valid_flags (flags, FLUX_REACTOR_SIGCHLD) < 0) return NULL; if (!(r = calloc (1, sizeof (*r)))) return NULL; - if ((flags & FLUX_REACTOR_SIGCHLD)) - r->loop = ev_default_loop (EVFLAG_SIGNALFD); - else - r->loop = ev_loop_new (EVFLAG_NOSIGMASK); - if (!r->loop) { - errno = ENOMEM; - flux_reactor_destroy (r); + uverr = uv_loop_init (&r->loop); + if (uverr < 0) { + free (r); + errno = -uverr; return NULL; } - ev_set_userdata (r->loop, r); r->usecount = 1; return r; } int flux_reactor_run (flux_reactor_t *r, int flags) { - int ev_flags = 0; + uv_run_mode mode; int count; - if (valid_flags (flags, FLUX_REACTOR_NOWAIT | FLUX_REACTOR_ONCE) < 0) - return -1; - if (flags & FLUX_REACTOR_NOWAIT) - ev_flags |= EVRUN_NOWAIT; - if (flags & FLUX_REACTOR_ONCE) - ev_flags |= EVRUN_ONCE; + if (flags == FLUX_REACTOR_NOWAIT) + mode = UV_RUN_NOWAIT; + else if (flags == FLUX_REACTOR_ONCE) + mode = UV_RUN_ONCE; + else if (flags == 0) + mode = UV_RUN_DEFAULT; + else { + errno = EINVAL; + return-1; + } r->errflag = 0; - count = ev_run (r->loop, ev_flags); + count = uv_run (&r->loop, mode); return (r->errflag ? -1 : count); } double flux_reactor_time (void) { - return ev_time (); + return 1E-9 * uv_hrtime (); } double flux_reactor_now (flux_reactor_t *r) { - return ev_now (r->loop); + return 1E-3 * uv_now (&r->loop); } void flux_reactor_now_update (flux_reactor_t *r) { - return ev_now_update (r->loop); + uv_update_time (&r->loop); } void flux_reactor_stop (flux_reactor_t *r) { r->errflag = 0; - ev_break (r->loop, EVBREAK_ALL); + uv_stop (&r->loop); } void flux_reactor_stop_error (flux_reactor_t *r) { r->errflag = 1; - ev_break (r->loop, EVBREAK_ALL); + uv_stop (&r->loop); } void flux_reactor_active_incref (flux_reactor_t *r) { - if (r) - ev_ref (r->loop); + // FIXME - see https://docs.libuv.org/en/v1.x/handle.html#refcount } void flux_reactor_active_decref (flux_reactor_t *r) { - if (r) - ev_unref (r->loop); + // FIXME } void *reactor_get_loop (flux_reactor_t *r) { - return r ? r->loop : NULL; + return r ? &r->loop : NULL; } /* diff --git a/src/common/libflux/test/reactor.c b/src/common/libflux/test/reactor.c index d500464cef38..af808c23a562 100644 --- a/src/common/libflux/test/reactor.c +++ b/src/common/libflux/test/reactor.c @@ -161,7 +161,7 @@ static void oneshot (flux_reactor_t *r, static void test_timer (flux_reactor_t *reactor) { flux_watcher_t *w; - double elapsed, t0, t[] = { 0.001, 0.010, 0.050, 0.100, 0.200 }; + double elapsed, t0, t[] = { 0.005, 0.010, 0.050, 0.100, 0.200 }; int i, rc; /* in case this test runs a while after last reactor run. @@ -229,13 +229,15 @@ static void test_timer (flux_reactor_t *reactor) oneshot_runs = 0; rc = flux_reactor_run (reactor, 0); elapsed = flux_reactor_now (reactor) - t0; - ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i], - "timer: reactor ran %.3fs oneshot at >= time (%.3fs)", t[i], elapsed); + // libuv timer rez is 1ms so allow event to fire up to 1ms early + ok (rc == 0 && oneshot_runs == 1 && elapsed >= t[i] - 0.001, + "timer: reactor ran %.3fs oneshot punctually", t[i]); + diag ("elapsed time was %.3fs", elapsed); } flux_watcher_destroy (w); } - +#if TODO_PERIODIC /* A reactor callback that immediately stops reactor without error */ static bool do_stop_callback_ran = false; static void do_stop_reactor (flux_reactor_t *r, @@ -362,6 +364,7 @@ static void test_periodic (flux_reactor_t *reactor) flux_watcher_destroy (w); } +#endif static int idle_count = 0; static void idle_cb (flux_reactor_t *r, @@ -515,6 +518,7 @@ static void test_signal (flux_reactor_t *reactor) flux_watcher_destroy (idle); } +#if TODO_CHILD static pid_t child_pid = -1; static void child_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -563,6 +567,7 @@ static void test_child (flux_reactor_t *reactor) flux_watcher_destroy (w); flux_reactor_destroy (r); } +#endif struct stat_ctx { int fd; @@ -654,6 +659,7 @@ static void test_stat (flux_reactor_t *reactor) free (ctx.path); } +#if TODO_ACTIVE_REF static void active_idle_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, @@ -702,6 +708,7 @@ static void test_active_ref (flux_reactor_t *r) flux_watcher_destroy (w); } +#endif static void reactor_destroy_early (void) { @@ -728,6 +735,7 @@ static void test_reactor_flags (flux_reactor_t *r) "flux_reactor_create flags=0xffff fails with EINVAL"); } +#if TODO_PRIORITY static char cblist[6] = {0}; static int cblist_index = 0; static flux_watcher_t *priority_prep = NULL; @@ -800,6 +808,7 @@ static void test_priority (flux_reactor_t *r) flux_watcher_destroy (priority_prep); flux_watcher_destroy (priority_idle); } +#endif int main (int argc, char *argv[]) { @@ -819,16 +828,24 @@ int main (int argc, char *argv[]) "flux_watcher_is_active (NULL) returns false"); test_timer (reactor); +#if TODO_PERIODIC test_periodic (reactor); +#endif test_fd (reactor); test_idle (reactor); test_prepcheck (reactor); test_signal (reactor); +#if TODO_CHILD test_child (reactor); +#endif test_stat (reactor); +#if TODO_ACTIVE_REF test_active_ref (reactor); +#endif test_reactor_flags (reactor); +#if TODO_PRIORITY test_priority (reactor); +#endif flux_reactor_destroy (reactor); diff --git a/src/common/libflux/watcher_wrap.c b/src/common/libflux/watcher_wrap.c index 42299c6a276f..82a2b48099bf 100644 --- a/src/common/libflux/watcher_wrap.c +++ b/src/common/libflux/watcher_wrap.c @@ -8,112 +8,106 @@ * SPDX-License-Identifier: LGPL-3.0 \************************************************************/ -/* watcher_wrap.c - wrapped libev watchers */ +/* watcher_wrap.c - wrapped libuv watchers */ + +/* CAVEAT: a libuv handle (the libuv name for watcher) cannot be directly + * destroyed. In the Flux watcher destroy callback, we call uv_close(), + * registering libuv_close_cb(), then in libuv_close_cb(), the handle is freed. + * If the reactor doesn't run to allow the callback to run, memory is leaked. + */ #if HAVE_CONFIG_H #include "config.h" #endif #include +#include #include -#include "src/common/libev/ev.h" +#include "src/common/libutil/errno_safe.h" #include "reactor_private.h" #include "watcher_private.h" -static inline int events_to_libev (int events) +static inline int events_to_libuv (int events) { int e = 0; if (events & FLUX_POLLIN) - e |= EV_READ; + e |= UV_READABLE; if (events & FLUX_POLLOUT) - e |= EV_WRITE; + e |= UV_WRITABLE; if (events & FLUX_POLLERR) - e |= EV_ERROR; + e |= UV_DISCONNECT; return e; } -static inline int libev_to_events (int events) +static inline int libuv_to_events (int events) { int e = 0; - if (events & EV_READ) + if (events & UV_READABLE) e |= FLUX_POLLIN; - if (events & EV_WRITE) + if (events & UV_WRITABLE) e |= FLUX_POLLOUT; - if (events & EV_ERROR) + if (events & UV_DISCONNECT) e |= FLUX_POLLERR; return e; } -static void watcher_call_ev (flux_watcher_t *w, int revents) -{ - watcher_call (w, libev_to_events (revents)); -} - -static struct ev_loop *watcher_get_ev (flux_watcher_t *w) +static void libuv_close_cb (uv_handle_t *uvh) { - return reactor_get_loop (watcher_get_reactor (w)); + free (uvh); } -static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +static void watcher_call_uv (flux_watcher_t *w, int revents) { - flux_watcher_stop ((flux_watcher_t *)pw->data); - ev_prepare_stop (loop, pw); - free (pw); + watcher_call (w, libuv_to_events (revents)); } -/* Stop a watcher in the next ev_prepare callback. To be used from periodics - * reschedule callback or other ev callbacks in which it is documented - * unsafe to modify the ev_loop or any watcher. +/* file descriptors */ -static void watcher_stop_safe (flux_watcher_t *w) + +struct fdwatcher { + uv_poll_t *uvh; + int revents; +}; + +static void fdwatcher_cb (uv_poll_t *uvh, int status, int events) { - if (w) { - ev_prepare *pw = calloc (1, sizeof (*pw)); - if (!pw) /* On ENOMEM, we just have to give up */ - return; - ev_prepare_init (pw, safe_stop_cb); - pw->data = w; - ev_prepare_start (watcher_get_ev (w), pw); - } -} + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + if (status < 0) + watcher_call_uv (w, FLUX_POLLERR); + else + watcher_call_uv (w, events); + } -/* This is_active() callback works for "native" libev watchers, where - * watcher data points to a struct ev_TYPE. - */ -static bool wrap_ev_active (flux_watcher_t *w) +static void fdwatcher_start (flux_watcher_t *w) { - return ev_is_active (watcher_get_data (w)); + struct fdwatcher *fdw = watcher_get_data (w); + uv_poll_start (fdw->uvh, fdw->revents, fdwatcher_cb); } -/* file descriptors - */ - -static void fd_start (flux_watcher_t *w) +static void fdwatcher_stop (flux_watcher_t *w) { - ev_io *iow = watcher_get_data (w); - struct ev_loop *loop = watcher_get_ev (w); - ev_io_start (loop, iow); + struct fdwatcher *fdw = watcher_get_data (w); + uv_poll_stop (fdw->uvh); } -static void fd_stop (flux_watcher_t *w) +static bool fdwatcher_is_active (flux_watcher_t *w) { - ev_io *iow = watcher_get_data (w); - struct ev_loop *loop = watcher_get_ev (w); - ev_io_stop (loop, iow); + struct fdwatcher *fdw = watcher_get_data (w); + return uv_is_active ((uv_handle_t *)fdw->uvh); } -static void fd_cb (struct ev_loop *loop, ev_io *iow, int revents) +static void fdwatcher_destroy (flux_watcher_t *w) { - struct flux_watcher *w = iow->data; - watcher_call_ev (w, revents); + struct fdwatcher *fdw = watcher_get_data (w); + uv_close ((uv_handle_t *)fdw->uvh, libuv_close_cb); } -static struct flux_watcher_ops fd_watcher = { - .start = fd_start, - .stop = fd_stop, - .destroy = NULL, - .is_active = wrap_ev_active, +static struct flux_watcher_ops fdwatcher_ops = { + .start = fdwatcher_start, + .stop = fdwatcher_stop, + .is_active = fdwatcher_is_active, + .destroy = fdwatcher_destroy, }; flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, @@ -122,53 +116,85 @@ flux_watcher_t *flux_fd_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_io *iow; + uv_loop_t *loop = reactor_get_loop (r); + struct fdwatcher *fdw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*iow), &fd_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*fdw), &fdwatcher_ops, cb, arg))) return NULL; - iow = watcher_get_data (w); - ev_io_init (iow, fd_cb, fd, events_to_libev (events) & ~EV_ERROR); - iow->data = w; + fdw = watcher_get_data (w); + fdw->revents = events_to_libuv (events); + if (!(fdw->uvh = calloc (1, sizeof (*fdw->uvh)))) + goto error; + uv_poll_init (loop, fdw->uvh, fd); + uv_handle_set_data ((uv_handle_t *)fdw->uvh, w); return w; +error: + flux_watcher_destroy (w); + return NULL; } int flux_fd_watcher_get_fd (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &fd_watcher); - ev_io *iow = watcher_get_data (w); - return iow->fd; + if (watcher_get_ops (w) != &fdwatcher_ops) { + errno = EINVAL; + return -1; + } + struct fdwatcher *fdw = watcher_get_data (w); + int uverr; + int fd; + if ((uverr = uv_fileno ((uv_handle_t *)fdw->uvh, &fd)) < 0) { + errno = -uverr; + return -1; + } + return fd; } /* Timer */ -static void timer_start (flux_watcher_t *w) +struct tmwatcher { + uv_timer_t *uvh; + uint64_t timeout; + uint64_t repeat; +}; + +static void tmwatcher_cb (uv_timer_t *uvh) { - struct ev_loop *loop = watcher_get_ev (w); - struct ev_timer *tw = watcher_get_data (w); - ev_timer_start (loop, tw); + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call_uv (w, 0); } -static void timer_stop (flux_watcher_t *w) +static void tmwatcher_start (flux_watcher_t *w) { - struct ev_loop *loop = watcher_get_ev (w); - struct ev_timer *tw = watcher_get_data (w); - ev_timer_stop (loop, tw); + struct tmwatcher *tmw = watcher_get_data (w); + uv_timer_start (tmw->uvh, tmwatcher_cb, tmw->timeout, tmw->repeat); } -static void timer_cb (struct ev_loop *loop, ev_timer *tw, int revents) +static void tmwatcher_stop (flux_watcher_t *w) { - struct flux_watcher *w = tw->data; - watcher_call_ev (w, revents); + struct tmwatcher *tmw = watcher_get_data (w); + uv_timer_stop (tmw->uvh); } -static struct flux_watcher_ops timer_watcher = { - .start = timer_start, - .stop = timer_stop, - .destroy = NULL, - .is_active = wrap_ev_active, +static bool tmwatcher_is_active (flux_watcher_t *w) +{ + struct tmwatcher *tmw = watcher_get_data (w); + return uv_is_active ((uv_handle_t *)tmw->uvh); +} + +static void tmwatcher_destroy (flux_watcher_t *w) +{ + struct tmwatcher *tmw = watcher_get_data (w); + uv_close ((uv_handle_t *)tmw->uvh, libuv_close_cb); +} + +static struct flux_watcher_ops tmwatcher_ops = { + .start = tmwatcher_start, + .stop = tmwatcher_stop, + .is_active = tmwatcher_is_active, + .destroy = tmwatcher_destroy, }; flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, @@ -177,38 +203,80 @@ flux_watcher_t *flux_timer_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_timer *tw; + uv_loop_t *loop = reactor_get_loop (r); + struct tmwatcher *tmw; flux_watcher_t *w; + if (after < 0 || repeat < 0) { errno = EINVAL; return NULL; } - if (!(w = watcher_create (r, sizeof (*tw), &timer_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*tmw), &tmwatcher_ops, cb, arg))) return NULL; - tw = watcher_get_data (w); - ev_timer_init (tw, timer_cb, after, repeat); - tw->data = w; + tmw = watcher_get_data (w); + tmw->timeout = 1000ULL * after; + tmw->repeat = 1000ULL * repeat; + if (!(tmw->uvh = calloc (1, sizeof (*tmw->uvh)))) + goto error; + uv_timer_init (loop, tmw->uvh); + uv_handle_set_data ((uv_handle_t *)tmw->uvh, w); return w; +error: + flux_watcher_destroy (w); + return NULL; } void flux_timer_watcher_reset (flux_watcher_t *w, double after, double repeat) { - assert (watcher_get_ops (w) == &timer_watcher); - ev_timer *tw = watcher_get_data (w); - ev_timer_set (tw, after, repeat); + if (watcher_get_ops (w) != &tmwatcher_ops) + return; + struct tmwatcher *tmw = watcher_get_data (w); + tmw->timeout = 1000ULL * after; + tmw->repeat = 1000ULL * repeat; } void flux_timer_watcher_again (flux_watcher_t *w) { - assert (watcher_get_ops (w) == &timer_watcher); - struct ev_loop *loop = watcher_get_ev (w); - ev_timer *tw = watcher_get_data (w); - ev_timer_again (loop, tw); + if (watcher_get_ops (w) != &tmwatcher_ops) + return; + struct tmwatcher *tmw = watcher_get_data (w); + /* in future.c::then_context_set_timeout() we assume that 'again' can be + * run on a timer that hasn't been started. That was apparently allowed + * by libev, but is not allowed by libev + */ + if (uv_timer_again (tmw->uvh) == UV_EINVAL) { + if (tmw->repeat > 0) + flux_watcher_start (w); + } } +#if TODO_PERIODIC /* Periodic */ + +static void safe_stop_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +{ + flux_watcher_stop ((flux_watcher_t *)pw->data); + ev_prepare_stop (loop, pw); + free (pw); +} + +/* Stop a watcher in the next ev_prepare callback. To be used from periodics + * reschedule callback or other ev callbacks in which it is documented + * unsafe to modify the ev_loop or any watcher. + */ +static void watcher_stop_safe (flux_watcher_t *w) +{ + if (w) { + ev_prepare *pw = calloc (1, sizeof (*pw)); + if (!pw) /* On ENOMEM, we just have to give up */ + return; + ev_prepare_init (pw, safe_stop_cb); + pw->data = w; + ev_prepare_start (watcher_get_ev (w), pw); + } +} struct f_periodic { struct flux_watcher *w; ev_periodic evp; @@ -330,151 +398,206 @@ double flux_watcher_next_wakeup (flux_watcher_t *w) errno = EINVAL; return (-1.); } +#endif /* Prepare */ -static void prepare_start (flux_watcher_t *w) + +struct prepwatcher { + uv_prepare_t *uvh; +}; + +static void prepwatcher_cb (uv_prepare_t *uvh) { - struct ev_loop *loop = watcher_get_ev (w); - ev_prepare *pw = watcher_get_data (w); - ev_prepare_start (loop, pw); + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call_uv (w, 0); } -static void prepare_stop (flux_watcher_t *w) +static void prepwatcher_start (flux_watcher_t *w) { - struct ev_loop *loop = watcher_get_ev (w); - ev_prepare *pw = watcher_get_data (w); - ev_prepare_stop (loop, pw); + struct prepwatcher *pw = watcher_get_data (w); + uv_prepare_start (pw->uvh, prepwatcher_cb); } -static void prepare_cb (struct ev_loop *loop, ev_prepare *pw, int revents) +static void prepwatcher_stop (flux_watcher_t *w) { - struct flux_watcher *w = pw->data; - watcher_call_ev (w, revents); + struct prepwatcher *pw = watcher_get_data (w); + uv_prepare_stop (pw->uvh); } -static struct flux_watcher_ops prepare_watcher = { - .start = prepare_start, - .stop = prepare_stop, - .destroy = NULL, - .is_active = wrap_ev_active, +static bool prepwatcher_is_active (flux_watcher_t *w) +{ + struct prepwatcher *pw = watcher_get_data (w); + return uv_is_active ((uv_handle_t *)pw->uvh); +} + +static void prepwatcher_destroy (flux_watcher_t *w) +{ + struct prepwatcher *pw = watcher_get_data (w); + uv_close ((uv_handle_t *)pw->uvh, libuv_close_cb); +} + +static struct flux_watcher_ops prepwatcher_ops = { + .start = prepwatcher_start, + .stop = prepwatcher_stop, + .is_active = prepwatcher_is_active, + .destroy = prepwatcher_destroy, }; flux_watcher_t *flux_prepare_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_prepare *pw; + uv_loop_t *loop = reactor_get_loop (r); + struct prepwatcher *pw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*pw), &prepare_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*pw), &prepwatcher_ops, cb, arg))) return NULL; pw = watcher_get_data (w); - ev_prepare_init (pw, prepare_cb); - pw->data = w; + if (!(pw->uvh = calloc (1, sizeof (*pw->uvh)))) + goto error; + uv_prepare_init (loop, pw->uvh); + uv_handle_set_data ((uv_handle_t *)pw->uvh, w); return w; +error: + flux_watcher_destroy (w); + return NULL; } /* Check */ -static void check_set_priority (flux_watcher_t *w, int priority) +struct chkwatcher { + uv_check_t *uvh; +}; + +static void chkwatcher_cb (uv_check_t *uvh) { - ev_check *cw = watcher_get_data (w); - ev_set_priority (cw, priority); + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call_uv (w, 0); } -static void check_start (flux_watcher_t *w) +static void chkwatcher_start (flux_watcher_t *w) { - struct ev_loop *loop = watcher_get_ev (w); - ev_check *cw = watcher_get_data (w); - ev_check_start (loop, cw); + struct chkwatcher *cw = watcher_get_data (w); + uv_check_start (cw->uvh, chkwatcher_cb); } -static void check_stop (flux_watcher_t *w) +static void chkwatcher_stop (flux_watcher_t *w) { - struct ev_loop *loop = watcher_get_ev (w); - ev_check *cw = watcher_get_data (w); - ev_check_stop (loop, cw); + struct chkwatcher *cw = watcher_get_data (w); + uv_check_stop (cw->uvh); } -static void check_cb (struct ev_loop *loop, ev_check *cw, int revents) +static bool chkwatcher_is_active (flux_watcher_t *w) { - struct flux_watcher *w = cw->data; - watcher_call_ev (w, revents); + struct chkwatcher *cw = watcher_get_data (w); + return uv_is_active ((uv_handle_t *)cw->uvh); } -static struct flux_watcher_ops check_watcher = { - .set_priority = check_set_priority, - .start = check_start, - .stop = check_stop, - .destroy = NULL, - .is_active = wrap_ev_active, +static void chkwatcher_destroy (flux_watcher_t *w) +{ + struct chkwatcher *cw = watcher_get_data (w); + uv_close ((uv_handle_t *)cw->uvh, libuv_close_cb); +} + +static struct flux_watcher_ops chkwatcher_ops = { + .start = chkwatcher_start, + .stop = chkwatcher_stop, + .is_active = chkwatcher_is_active, + .destroy = chkwatcher_destroy, }; flux_watcher_t *flux_check_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_check *cw; + uv_loop_t *loop = reactor_get_loop (r); + struct chkwatcher *cw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*cw), &check_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*cw), &chkwatcher_ops, cb, arg))) return NULL; cw = watcher_get_data (w); - ev_check_init (cw, check_cb); - cw->data = w; + if (!(cw->uvh = calloc (1, sizeof (*cw->uvh)))) + goto error; + uv_check_init (loop, cw->uvh); + uv_handle_set_data ((uv_handle_t *)cw->uvh, w); return w; +error: + flux_watcher_destroy (w); + return NULL; } /* Idle */ +struct idlewatcher { + uv_idle_t *uvh; +}; -static void idle_start (flux_watcher_t *w) +static void idlewatcher_cb (uv_idle_t *uvh) { - struct ev_loop *loop = watcher_get_ev (w); - ev_idle *iw = watcher_get_data (w); - ev_idle_start (loop, iw); + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call_uv (w, 0); } -static void idle_stop (flux_watcher_t *w) +static void idlewatcher_start (flux_watcher_t *w) { - struct ev_loop *loop = watcher_get_ev (w); - ev_idle *iw = watcher_get_data (w); - ev_idle_stop (loop, iw); + struct idlewatcher *iw = watcher_get_data (w); + uv_idle_start (iw->uvh, idlewatcher_cb); } -static void idle_cb (struct ev_loop *loop, ev_idle *iw, int revents) +static void idlewatcher_stop (flux_watcher_t *w) { - struct flux_watcher *w = iw->data; - watcher_call_ev (w, revents); + struct idlewatcher *iw = watcher_get_data (w); + uv_idle_stop (iw->uvh); } -static struct flux_watcher_ops idle_watcher = { - .start = idle_start, - .stop = idle_stop, - .destroy = NULL, - .is_active = wrap_ev_active, +static bool idlewatcher_is_active (flux_watcher_t *w) +{ + struct idlewatcher *iw = watcher_get_data (w); + return uv_is_active ((uv_handle_t *)iw->uvh); +} + +static void idlewatcher_destroy (flux_watcher_t *w) +{ + struct idlewatcher *iw = watcher_get_data (w); + uv_close ((uv_handle_t *)iw->uvh, libuv_close_cb); +} + +static struct flux_watcher_ops idlewatcher_ops = { + .start = idlewatcher_start, + .stop = idlewatcher_stop, + .is_active = idlewatcher_is_active, + .destroy = idlewatcher_destroy, }; flux_watcher_t *flux_idle_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { - ev_idle *iw; + uv_loop_t *loop = reactor_get_loop (r); + struct idlewatcher *iw; flux_watcher_t *w; - if (!(w = watcher_create (r, sizeof (*iw), &idle_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*iw), &idlewatcher_ops, cb, arg))) return NULL; iw = watcher_get_data (w); - ev_idle_init (iw, idle_cb); - iw->data = w; + if (!(iw->uvh = calloc (1, sizeof (*iw->uvh)))) + goto error; + uv_idle_init (loop, iw->uvh); + uv_handle_set_data ((uv_handle_t *)iw->uvh, w); return w; +error: + flux_watcher_destroy (w); + return NULL; } +#if TODO_CHILD /* Child */ @@ -547,35 +670,50 @@ int flux_child_watcher_get_rstatus (flux_watcher_t *w) ev_child *cw = watcher_get_data (w); return cw->rstatus; } +#endif /* Signal */ +struct sigwatcher { + uv_signal_t *uvh; + int signum; +}; -static void signal_start (flux_watcher_t *w) +static void sigwatcher_cb (uv_signal_t *uvh, int signum) { - struct ev_loop *loop = watcher_get_ev (w); - ev_signal *sw = watcher_get_data (w); - ev_signal_start (loop, sw); + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + watcher_call_uv (w, 0); } -static void signal_stop (flux_watcher_t *w) +static void sigwatcher_start (flux_watcher_t *w) { - struct ev_loop *loop = watcher_get_ev (w); - ev_signal *sw = watcher_get_data (w); - ev_signal_stop (loop, sw); + struct sigwatcher *sw = watcher_get_data (w); + uv_signal_start (sw->uvh, sigwatcher_cb, sw->signum); } -static void signal_cb (struct ev_loop *loop, ev_signal *sw, int revents) +static void sigwatcher_stop (flux_watcher_t *w) { - struct flux_watcher *w = sw->data; - watcher_call_ev (w, revents); + struct sigwatcher *sw = watcher_get_data (w); + uv_signal_stop (sw->uvh); } -static struct flux_watcher_ops signal_watcher = { - .start = signal_start, - .stop = signal_stop, - .destroy = NULL, - .is_active = wrap_ev_active, +static bool sigwatcher_is_active (flux_watcher_t *w) +{ + struct sigwatcher *sw = watcher_get_data (w); + return uv_is_active ((uv_handle_t *)sw->uvh); +} + +static void sigwatcher_destroy (flux_watcher_t *w) +{ + struct sigwatcher *sw = watcher_get_data (w); + uv_close ((uv_handle_t *)sw->uvh, libuv_close_cb); +} + +static struct flux_watcher_ops sigwatcher_ops = { + .start = sigwatcher_start, + .stop = sigwatcher_stop, + .is_active = sigwatcher_is_active, + .destroy = sigwatcher_destroy, }; flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, @@ -583,84 +721,130 @@ flux_watcher_t *flux_signal_watcher_create (flux_reactor_t *r, flux_watcher_f cb, void *arg) { + uv_loop_t *loop = reactor_get_loop (r); flux_watcher_t *w; - ev_signal *sw; + struct sigwatcher *sw; - if (!(w = watcher_create (r, sizeof (*sw), &signal_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*sw), &sigwatcher_ops, cb, arg))) return NULL; sw = watcher_get_data (w); - ev_signal_init (sw, signal_cb, signum); - sw->data = w; + sw->signum = signum; + if (!(sw->uvh = calloc (1, sizeof (*sw->uvh)))) + goto error; + uv_signal_init (loop, sw->uvh); + uv_handle_set_data ((uv_handle_t *)sw->uvh, w); return w; +error: + flux_watcher_destroy (w); + return NULL; } int flux_signal_watcher_get_signum (flux_watcher_t *w) { - if (watcher_get_ops (w) != &signal_watcher) { + if (watcher_get_ops (w) != &sigwatcher_ops) { errno = EINVAL; - return (-1); + return -1; } - ev_signal *sw = watcher_get_data (w); - return sw->signum; + struct sigwatcher *sw = watcher_get_data (w); + return sw->uvh->signum; } /* Stat */ -static void stat_start (flux_watcher_t *w) +struct statwatcher { + uv_fs_event_t *uvh; + char *path; + struct stat prev; + struct stat stat; +}; + +static void statwatcher_cb (uv_fs_event_t *uvh, + const char *filename, + int events, + int status) { - struct ev_loop *loop = watcher_get_ev (w); - ev_stat *sw = watcher_get_data (w); - ev_stat_start (loop, sw); + struct flux_watcher *w = uv_handle_get_data ((uv_handle_t *)uvh); + struct statwatcher *sw = watcher_get_data (w); + sw->prev = sw->stat; + if (stat (sw->path, &sw->stat) < 0) + sw->stat.st_nlink = 0; + watcher_call_uv (w, 0); } -static void stat_stop (flux_watcher_t *w) +static void statwatcher_start (flux_watcher_t *w) { - struct ev_loop *loop = watcher_get_ev (w); - ev_stat *sw = watcher_get_data (w); - ev_stat_stop (loop, sw); + struct statwatcher *sw = watcher_get_data (w); + uv_fs_event_start (sw->uvh, + statwatcher_cb, + sw->path, + UV_FS_EVENT_WATCH_ENTRY); } -static void stat_cb (struct ev_loop *loop, ev_stat *sw, int revents) +static void statwatcher_stop (flux_watcher_t *w) { - struct flux_watcher *w = sw->data; - watcher_call_ev (w, revents); + struct statwatcher *sw = watcher_get_data (w); + uv_fs_event_stop (sw->uvh); } -static struct flux_watcher_ops stat_watcher = { - .start = stat_start, - .stop = stat_stop, - .destroy = NULL, - .is_active = wrap_ev_active, +static bool statwatcher_is_active (flux_watcher_t *w) +{ + struct statwatcher *sw = watcher_get_data (w); + return uv_is_active ((uv_handle_t *)sw->uvh); +} + +static void statwatcher_destroy (flux_watcher_t *w) +{ + struct statwatcher *sw = watcher_get_data (w); + uv_close ((uv_handle_t *)sw->uvh, libuv_close_cb); + ERRNO_SAFE_WRAP (free, sw->path); +} + +static struct flux_watcher_ops statwatcher_ops = { + .start = statwatcher_start, + .stop = statwatcher_stop, + .is_active = statwatcher_is_active, + .destroy = statwatcher_destroy, }; flux_watcher_t *flux_stat_watcher_create (flux_reactor_t *r, const char *path, - double interval, + double interval, // ignored flux_watcher_f cb, void *arg) { + uv_loop_t *loop = reactor_get_loop (r); flux_watcher_t *w; - ev_stat *sw; + struct statwatcher *sw; - if (!(w = watcher_create (r, sizeof (*sw), &stat_watcher, cb, arg))) + if (!(w = watcher_create (r, sizeof (*sw), &statwatcher_ops, cb, arg))) return NULL; sw = watcher_get_data (w); - ev_stat_init (sw, stat_cb, path, interval); - sw->data = w; + if (!(sw->path = strdup (path)) + || !(sw->uvh = calloc (1, sizeof (*(sw->uvh))))) + goto error; + if (stat (path, &sw->stat) < 0) + sw->stat.st_nlink = 0; + sw->prev = sw->stat; + uv_fs_event_init (loop, sw->uvh); + uv_handle_set_data ((uv_handle_t *)sw->uvh, w); return w; +error: + flux_watcher_destroy (w); + return NULL; } void flux_stat_watcher_get_rstat (flux_watcher_t *w, struct stat *stat, struct stat *prev) { - ev_stat *sw = watcher_get_data (w); - assert (watcher_get_ops (w) == &stat_watcher); + if (watcher_get_ops (w) != &statwatcher_ops) + return; + struct statwatcher *sw = watcher_get_data (w); if (stat) - *stat = sw->attr; + *stat = sw->stat; if (prev) *prev = sw->prev; }