Skip to content

Commit

Permalink
* Added an alternate implementation when polling of file descriptors is
Browse files Browse the repository at this point in the history
   not supported by the OS. This is automatically detected via the APR
   header files, but can be enforces by adding '--disable-poll-streams'
   to the './configure' when building the module.
   This should allow the module to work on Windows again.
  • Loading branch information
Stefan Eissing committed Sep 29, 2021
1 parent 8b01a49 commit 0282368
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 42 deletions.
8 changes: 8 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
v2.0.0-rc3
--------------------------------------------------------------------------------
* Added an alternate implementation when polling of file descriptors is
not supported by the OS. This is automatically detected via the APR
header files, but can be enforces by adding '--disable-poll-streams'
to the './configure' when building the module.
This should allow the module to work on Windows again.

v2.0.0-rc2
--------------------------------------------------------------------------------
* Switching from a thread-safe apr pollset (not supported under Windows) to
Expand Down
11 changes: 10 additions & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ AC_ARG_WITH([apxs], [AS_HELP_STRING([--with-apxs],
[Use APXS executable [default=check]])],
[request_apxs=$withval], [request_apxs=check])

AC_ARG_ENABLE([poll-streams],
[AS_HELP_STRING([--disable-poll-streams],
[Disable polling of h2 streams, even if available.])],
[poll_streams=$enableval], [poll_streams=yes])

# Checks for programs.
AC_PROG_CC
Expand Down Expand Up @@ -120,7 +124,6 @@ AC_SUBST(LOAD_WATCHDOG)
export BUILD_SUBDIRS="mod_http2"
AC_SUBST(BUILD_SUBDIRS)


# We need nghttp2 to be in our link path, check for it.
#
AC_CHECK_LIB([nghttp2], [nghttp2_session_server_new2], ,
Expand Down Expand Up @@ -288,6 +291,12 @@ AC_ARG_VAR([PKGCONFIG], [pkg-config executable])
AC_PATH_PROG([PKGCONFIG], [pkg-config])


# do we poll streams, when available, or disable this (e.g. for test runs)
if test "$poll_streams" = "no"; then
CPPFLAGS="$CPPFLAGS -DH2_NO_POLL_STREAMS"
fi


AC_CONFIG_FILES([
Makefile
mod_http2/Makefile
Expand Down
12 changes: 8 additions & 4 deletions mod_http2/h2.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ struct h2_session;
struct h2_stream;

/*
* When working, should be this:
#define H2_CAN_POLL_FILES APR_FILES_AS_SOCKETS
*/
#define H2_CAN_POLL_FILES 1
* When apr pollsets can poll file descriptors (e.g. pipes),
* we use it for polling stream input/output.
*/
#ifdef H2_NO_POLL_STREAMS
#define H2_POLL_STREAMS 0
#else
#define H2_POLL_STREAMS APR_FILES_AS_SOCKETS
#endif

/**
* The magic PRIamble of RFC 7540 that is always sent when starting
Expand Down
84 changes: 56 additions & 28 deletions mod_http2/h2_mplx.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,50 @@ static void m_stream_cleanup(h2_mplx *m, h2_stream *stream)
{
h2_conn_ctx_t *c2_ctx = stream->c2? h2_conn_ctx_get(stream->c2) : NULL;

ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, unsubscribing from beam events"));
if (stream->output) {
h2_beam_on_was_empty(stream->output, NULL, NULL);
}
if (stream->input) {
h2_beam_on_received(stream->input, NULL, NULL);
h2_beam_on_consumed(stream->input, NULL, NULL);
}

ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, removing from registries"));
ap_assert(stream->state == H2_SS_CLEANUP);
h2_stream_cleanup(stream);
h2_ihash_remove(m->streams, stream->id);
h2_iq_remove(m->q, stream->id);

if (c2_ctx) {
if (!stream_is_running(stream)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, c2 is done, move to spurge"));
/* processing has finished */
APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
}
else {
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, c2 is running, abort"));
/* c2 is still running */
stream->c2->aborted = 1;
if (stream->input) {
h2_beam_abort(stream->input, m->c1);
}
if (stream->output) {
h2_beam_abort(stream->output, m->c1);
}
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, c2 is done, move to shold"));
h2_ihash_add(m->shold, stream);
}
}
else {
/* never started */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
H2_STRM_MSG(stream, "cleanup, never started, move to spurge"));
APR_ARRAY_PUSH(m->spurge, h2_stream *) = stream;
}
}
Expand Down Expand Up @@ -231,7 +254,10 @@ h2_mplx *h2_mplx_c1_create(h2_stream *stream0, server_rec *s, apr_pool_t *parent
m->streams_ev_in = apr_array_make(m->pool, 10, sizeof(h2_stream*));
m->streams_ev_out = apr_array_make(m->pool, 10, sizeof(h2_stream*));

#if !H2_CAN_POLL_FILES
#if !H2_POLL_STREAMS
status = apr_thread_mutex_create(&m->poll_lock, APR_THREAD_MUTEX_DEFAULT,
m->pool);
if (APR_SUCCESS != status) goto failure;
m->streams_input_read = h2_iq_create(m->pool, 10);
m->streams_output_written = h2_iq_create(m->pool, 10);
#endif
Expand Down Expand Up @@ -639,7 +665,7 @@ apr_status_t h2_mplx_c1_fwd_input(h2_mplx *m, struct h2_iqueue *input_pending,
return APR_SUCCESS;
}

static void c2_io_input_write_notify(void *ctx, h2_bucket_beam *beam)
static void c2_beam_input_write_notify(void *ctx, h2_bucket_beam *beam)
{
conn_rec *c = ctx;
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
Expand All @@ -650,7 +676,7 @@ static void c2_io_input_write_notify(void *ctx, h2_bucket_beam *beam)
}
}

static void c2_io_input_read_notify(void *ctx, h2_bucket_beam *beam)
static void c2_beam_input_read_notify(void *ctx, h2_bucket_beam *beam)
{
conn_rec *c = ctx;
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
Expand All @@ -659,18 +685,18 @@ static void c2_io_input_read_notify(void *ctx, h2_bucket_beam *beam)
if (conn_ctx->pipe_in_drain[H2_PIPE_IN]) {
apr_file_putc(1, conn_ctx->pipe_in_drain[H2_PIPE_IN]);
}
#if !H2_CAN_POLL_FILES
#if !H2_POLL_STREAMS
else {
H2_MPLX_ENTER_ALWAYS(conn_ctx->mplx);
apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
h2_iq_append(conn_ctx->mplx->streams_input_read, conn_ctx->stream_id);
apr_pollset_wakeup(conn_ctx->mplx->pollset);
H2_MPLX_LEAVE(conn_ctx->mplx);
apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
}
#endif
}
}

static void c2_io_output_write_notify(void *ctx, h2_bucket_beam *beam)
static void c2_beam_output_write_notify(void *ctx, h2_bucket_beam *beam)
{
conn_rec *c = ctx;
h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c);
Expand All @@ -679,12 +705,12 @@ static void c2_io_output_write_notify(void *ctx, h2_bucket_beam *beam)
if (conn_ctx->pipe_out_prod[H2_PIPE_IN]) {
apr_file_putc(1, conn_ctx->pipe_out_prod[H2_PIPE_IN]);
}
#if !H2_CAN_POLL_FILES
#if !H2_POLL_STREAMS
else {
H2_MPLX_ENTER_ALWAYS(conn_ctx->mplx);
apr_thread_mutex_lock(conn_ctx->mplx->poll_lock);
h2_iq_append(conn_ctx->mplx->streams_output_written, conn_ctx->stream_id);
apr_pollset_wakeup(conn_ctx->mplx->pollset);
H2_MPLX_LEAVE(conn_ctx->mplx);
apr_thread_mutex_unlock(conn_ctx->mplx->poll_lock);
}
#endif
}
Expand All @@ -706,20 +732,20 @@ static apr_status_t c2_setup_io(h2_mplx *m, conn_rec *c2, h2_stream *stream)
if (APR_SUCCESS != rv) goto cleanup;

h2_beam_buffer_size_set(conn_ctx->beam_out, m->stream_max_mem);
h2_beam_on_was_empty(conn_ctx->beam_out, c2_io_output_write_notify, c2);
h2_beam_on_was_empty(conn_ctx->beam_out, c2_beam_output_write_notify, c2);
}

if (stream->input) {
conn_ctx->beam_in = stream->input;
h2_beam_on_was_empty(stream->input, c2_io_input_write_notify, c2);
h2_beam_on_received(stream->input, c2_io_input_read_notify, c2);
h2_beam_on_was_empty(stream->input, c2_beam_input_write_notify, c2);
h2_beam_on_received(stream->input, c2_beam_input_read_notify, c2);
h2_beam_on_consumed(stream->input, c1_input_consumed, stream);
}
else {
memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
}

#if H2_CAN_POLL_FILES
#if H2_POLL_STREAMS
if (!conn_ctx->mplx_pool) {
apr_pool_create(&conn_ctx->mplx_pool, m->pool);
apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2");
Expand Down Expand Up @@ -879,9 +905,7 @@ static void s_c2_done(h2_mplx *m, conn_rec *c2, h2_conn_ctx_t *conn_ctx)
* since nothing more will happening here. */
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, c2,
H2_STRM_MSG(stream, "c2_done, stream open"));
H2_MPLX_LEAVE(m);
c2_io_output_write_notify(c2, NULL);
H2_MPLX_ENTER_ALWAYS(m);
c2_beam_output_write_notify(c2, NULL);
}
else if ((stream = h2_ihash_get(m->shold, conn_ctx->stream_id)) != NULL) {
/* stream is done, was just waiting for this. */
Expand Down Expand Up @@ -1108,6 +1132,7 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c1,
"h2_mplx(%ld): enter polling timeout=%d",
m->id, (int)apr_time_sec(timeout));

apr_array_clear(m->streams_ev_in);
apr_array_clear(m->streams_ev_out);

Expand All @@ -1116,37 +1141,40 @@ static apr_status_t mplx_pollset_poll(h2_mplx *m, apr_interval_time_t timeout,
if (m->streams_to_poll->nelts) {
for (i = 0; i < m->streams_to_poll->nelts; ++i) {
stream = APR_ARRAY_IDX(m->streams_to_poll, i, h2_stream*);
if (stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) {
if (stream && stream->c2 && (conn_ctx = h2_conn_ctx_get(stream->c2))) {
mplx_pollset_add(m, conn_ctx);
}
}
apr_array_clear(m->streams_to_poll);
}

H2_MPLX_LEAVE(m);
rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
H2_MPLX_ENTER_ALWAYS(m);

#if !H2_CAN_POLL_FILES
if (APR_STATUS_IS_EINTR(rv)
&& (!h2_iq_empty(m->streams_input_read)
|| !h2_iq_empty(m->streams_output_written))) {
#if !H2_POLL_STREAMS
apr_thread_mutex_lock(m->poll_lock);
if (!h2_iq_empty(m->streams_input_read)
|| !h2_iq_empty(m->streams_output_written)) {
while ((i = h2_iq_shift(m->streams_input_read))) {
stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
stream = h2_ihash_get(m->streams, i);
if (stream) {
APR_ARRAY_PUSH(m->streams_ev_in, h2_stream*) = stream;
}
}
while ((i = h2_iq_shift(m->streams_output_written))) {
stream = h2_ihash_get(m->streams, conn_ctx->stream_id);
stream = h2_ihash_get(m->streams, i);
if (stream) {
APR_ARRAY_PUSH(m->streams_ev_out, h2_stream*) = stream;
}
}
nresults = 0;
rv = APR_SUCCESS;
apr_thread_mutex_unlock(m->poll_lock);
break;
}
apr_thread_mutex_unlock(m->poll_lock);
#endif
H2_MPLX_LEAVE(m);
rv = apr_pollset_poll(m->pollset, timeout >= 0? timeout : -1, &nresults, &results);
H2_MPLX_ENTER_ALWAYS(m);

} while (APR_STATUS_IS_EINTR(rv));

if (APR_SUCCESS != rv) {
Expand Down
3 changes: 2 additions & 1 deletion mod_http2/h2_mplx.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ struct h2_mplx {
apr_array_header_t *streams_ev_in;
apr_array_header_t *streams_ev_out;

#if !H2_CAN_POLL_FILES
#if !H2_POLL_STREAMS
apr_thread_mutex_t *poll_lock; /* not the painter */
struct h2_iqueue *streams_input_read; /* streams whose input has been read from */
struct h2_iqueue *streams_output_written; /* streams whose output has been written to */
#endif
Expand Down
6 changes: 0 additions & 6 deletions mod_http2/h2_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,6 @@ void h2_stream_cleanup(h2_stream *stream)
if (stream->out_buffer) {
apr_brigade_cleanup(stream->out_buffer);
}
if (stream->input) {
h2_beam_abort(stream->input, stream->session->c1);
}
if (stream->output) {
h2_beam_abort(stream->output, stream->session->c1);
}
}

void h2_stream_destroy(h2_stream *stream)
Expand Down
2 changes: 1 addition & 1 deletion mod_http2/h2_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* @macro
* Version number of the http2 module as c string
*/
#define MOD_HTTP2_VERSION "2.0.0-rc2-git"
#define MOD_HTTP2_VERSION "2.0.0-rc3-git"

/**
* @macro
Expand Down
2 changes: 1 addition & 1 deletion mod_http2/h2_version.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* @macro
* Version number of the http2 module as c string
*/
#define MOD_HTTP2_VERSION "@PACKAGE_VERSION@-rc2-git"
#define MOD_HTTP2_VERSION "@PACKAGE_VERSION@-rc3-git"

/**
* @macro
Expand Down

0 comments on commit 0282368

Please sign in to comment.