Skip to content

Commit

Permalink
* incomplete start for a file pollset alternative
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan Eissing committed Sep 28, 2021
1 parent 74e775f commit 8b01a49
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 139 deletions.
6 changes: 6 additions & 0 deletions mod_http2/h2.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
struct h2_session;
struct h2_stream;

/*
* When working, should be this:
#define H2_CAN_POLL_FILES APR_FILES_AS_SOCKETS
*/
#define H2_CAN_POLL_FILES 1

/**
* The magic PRIamble of RFC 7540 that is always sent when starting
* a h2 communication.
Expand Down
25 changes: 13 additions & 12 deletions mod_http2/h2_c2.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,21 @@ static apr_status_t h2_c2_filter_in(ap_filter_t* f,
conn_ctx->id, conn_ctx->stream_id, block, (long)readbytes);
}
if (conn_ctx->beam_in) {
if (conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
receive:
status = h2_beam_receive(conn_ctx->beam_in, f->c, fctx->bb, APR_NONBLOCK_READ,
conn_ctx->mplx->stream_max_mem);
if (APR_STATUS_IS_EAGAIN(status) && APR_BLOCK_READ == block) {
status = h2_util_wait_on_pipe(conn_ctx->pipe_in_prod[H2_PIPE_OUT]);
if (APR_SUCCESS == status) {
goto receive;
status = h2_beam_receive(conn_ctx->beam_in, f->c, fctx->bb, APR_NONBLOCK_READ,
conn_ctx->mplx->stream_max_mem);
if (APR_STATUS_IS_EAGAIN(status) && APR_BLOCK_READ == block) {
status = h2_util_wait_on_pipe(conn_ctx->pipe_in_prod[H2_PIPE_OUT]);
if (APR_SUCCESS == status) {
goto receive;
}
}
}
else {
status = h2_beam_receive(conn_ctx->beam_in, f->c, fctx->bb, block,
conn_ctx->mplx->stream_max_mem);
}
}
else {
status = APR_EOF;
Expand Down Expand Up @@ -569,13 +575,8 @@ static apr_status_t c2_process(h2_conn_ctx_t *conn_ctx, conn_rec *c)
conn_ctx->server = r->server;

/* the request_rec->server carries the timeout value that applies */
h2_beam_timeout_set(conn_ctx->beam_out, r->server->timeout);
apr_file_pipe_timeout_set(conn_ctx->pipe_out_prod[H2_PIPE_OUT], r->server->timeout);
h2_conn_ctx_set_timeout(conn_ctx, r->server->timeout);

if (conn_ctx->beam_in) {
h2_beam_timeout_set(conn_ctx->beam_in, r->server->timeout);
apr_file_pipe_timeout_set(conn_ctx->pipe_in_prod[H2_PIPE_OUT], r->server->timeout);
}
if (h2_config_sgeti(conn_ctx->server, H2_CONF_COPY_FILES)) {
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c,
"h2_mplx(%s-%d): copy_files in output",
Expand Down
120 changes: 19 additions & 101 deletions mod_http2/h2_conn_ctx.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,42 +65,11 @@ h2_conn_ctx_t *h2_conn_ctx_create_for_c1(conn_rec *c1, server_rec *s, const char
return ctx;
}

static void input_write_notify(void *ctx, h2_bucket_beam *beam)
{
h2_conn_ctx_t *conn_ctx = ctx;

(void)beam;
if (conn_ctx->pipe_in_prod[H2_PIPE_IN]) {
apr_file_putc(1, conn_ctx->pipe_in_prod[H2_PIPE_IN]);
}
}

static void input_read_notify(void *ctx, h2_bucket_beam *beam)
{
h2_conn_ctx_t *conn_ctx = ctx;

(void)beam;
if (conn_ctx->pipe_in_drain[H2_PIPE_IN]) {
apr_file_putc(1, conn_ctx->pipe_in_drain[H2_PIPE_IN]);
}
}

static void output_notify(void *ctx, h2_bucket_beam *beam)
{
h2_conn_ctx_t *conn_ctx = ctx;

(void)beam;
if (conn_ctx && conn_ctx->pipe_out_prod[H2_PIPE_IN]) {
apr_file_putc(1, conn_ctx->pipe_out_prod[H2_PIPE_IN]);
}
}

apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2,
struct h2_mplx *mplx, struct h2_stream *stream)
{
h2_conn_ctx_t *conn_ctx;
apr_status_t rv = APR_SUCCESS;
const char *action = "init";

ap_assert(c2->master);
conn_ctx = h2_conn_ctx_get(c2);
Expand All @@ -124,75 +93,7 @@ apr_status_t h2_conn_ctx_init_for_c2(h2_conn_ctx_t **pctx, conn_rec *c2,
conn_ctx->done = 0;
conn_ctx->done_at = 0;

if (!conn_ctx->mplx_pool) {
apr_pool_create(&conn_ctx->mplx_pool, mplx->pool);
apr_pool_tag(conn_ctx->mplx_pool, "H2_MPLX_C2");
}

if (!conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
action = "create output pipe";
rv = apr_file_pipe_create_pools(&conn_ctx->pipe_out_prod[H2_PIPE_OUT],
&conn_ctx->pipe_out_prod[H2_PIPE_IN],
APR_FULL_NONBLOCK,
conn_ctx->mplx_pool, c2->pool);
if (APR_SUCCESS != rv) goto cleanup;
}
conn_ctx->pfd_out_prod.desc_type = APR_POLL_FILE;
conn_ctx->pfd_out_prod.desc.f = conn_ctx->pipe_out_prod[H2_PIPE_OUT];
conn_ctx->pfd_out_prod.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
conn_ctx->pfd_out_prod.client_data = conn_ctx;

if (!conn_ctx->beam_out) {
action = "create output beam";
rv = h2_beam_create(&conn_ctx->beam_out, c2, conn_ctx->req_pool,
stream->id, "output", 0, c2->base_server->timeout);
if (APR_SUCCESS != rv) goto cleanup;

h2_beam_buffer_size_set(conn_ctx->beam_out, mplx->stream_max_mem);
h2_beam_on_was_empty(conn_ctx->beam_out, output_notify, conn_ctx);
}

ap_assert(conn_ctx->beam_in == NULL);
if (stream->input) {
if (!conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
action = "create input write pipe";
rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_prod[H2_PIPE_OUT],
&conn_ctx->pipe_in_prod[H2_PIPE_IN],
APR_READ_BLOCK,
c2->pool, conn_ctx->mplx_pool);
if (APR_SUCCESS != rv) goto cleanup;
}

if (!conn_ctx->pipe_in_drain[H2_PIPE_OUT]) {
action = "create input read pipe";
rv = apr_file_pipe_create_pools(&conn_ctx->pipe_in_drain[H2_PIPE_OUT],
&conn_ctx->pipe_in_drain[H2_PIPE_IN],
APR_FULL_NONBLOCK,
c2->pool, conn_ctx->mplx_pool);
if (APR_SUCCESS != rv) goto cleanup;
}
conn_ctx->pfd_in_drain.desc_type = APR_POLL_FILE;
conn_ctx->pfd_in_drain.desc.f = conn_ctx->pipe_in_drain[H2_PIPE_OUT];
conn_ctx->pfd_in_drain.reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP;
conn_ctx->pfd_in_drain.client_data = conn_ctx;

h2_beam_on_was_empty(stream->input, input_write_notify, conn_ctx);
h2_beam_on_received(stream->input, input_read_notify, conn_ctx);
conn_ctx->beam_in = stream->input;
}
else {
memset(&conn_ctx->pfd_in_drain, 0, sizeof(conn_ctx->pfd_in_drain));
}

cleanup:
*pctx = (APR_SUCCESS == rv)? conn_ctx : NULL;
stream->output = (APR_SUCCESS == rv)? conn_ctx->beam_out : NULL;
if (APR_SUCCESS != rv) {
ap_log_cerror(APLOG_MARK, APLOG_ERR, rv, c2,
H2_STRM_LOG(APLOGNO(), stream,
"error %s"), action);
goto cleanup;
}
*pctx = conn_ctx;
return rv;
}

Expand Down Expand Up @@ -226,4 +127,21 @@ void h2_conn_ctx_destroy(conn_rec *c)
}
ap_set_module_config(c->conn_config, &http2_module, NULL);
}
}
}

void h2_conn_ctx_set_timeout(h2_conn_ctx_t *conn_ctx, apr_interval_time_t timeout)
{
if (conn_ctx->beam_out) {
h2_beam_timeout_set(conn_ctx->beam_out, timeout);
}
if (conn_ctx->pipe_out_prod[H2_PIPE_OUT]) {
apr_file_pipe_timeout_set(conn_ctx->pipe_out_prod[H2_PIPE_OUT], timeout);
}

if (conn_ctx->beam_in) {
h2_beam_timeout_set(conn_ctx->beam_in, timeout);
}
if (conn_ctx->pipe_in_prod[H2_PIPE_OUT]) {
apr_file_pipe_timeout_set(conn_ctx->pipe_in_prod[H2_PIPE_OUT], timeout);
}
}
2 changes: 2 additions & 0 deletions mod_http2/h2_conn_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ void h2_conn_ctx_detach(conn_rec *c);

void h2_conn_ctx_destroy(conn_rec *c);

void h2_conn_ctx_set_timeout(h2_conn_ctx_t *conn_ctx, apr_interval_time_t timeout);

#endif /* defined(__mod_h2__h2_conn_ctx__) */
Loading

0 comments on commit 8b01a49

Please sign in to comment.