diff --git a/ChangeLog b/ChangeLog index 97af3149..c90625e7 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,21 @@ +v1.5.3 +-------------------------------------------------------------------------------- + * slave connections have conn_rec->aborted flag set when a stream + has been reset by the client. + * Small fixes in bucket beams when forwarding file buckets. Output handling + on master connection uses less FLUSH and passes automatically when more + than half of H2StreamMaxMemSize bytes have accumulated. + Workaround for http: when forwarding partial file buckets to keep the + output filter from closing these too early. + * elimination of fixed master connectin buffer for TLS connections. New + scratch bucket handling optimized for TLS write sizes. + File bucket data read directly into scratch buffers, avoiding one + copy. Non-TLS connections continue to pass buckets unchanged to the core + filters to allow sendfile() usage. + * h2_request.c is no longer shared between these modules. This simplifies + building on platforms such as Windows, as module reference used in + logging is now clear. + v1.5.2 -------------------------------------------------------------------------------- * fixed connection shutdown deadlock on linux when client closed early diff --git a/configure.ac b/configure.ac index 1099b31f..e8a3f811 100644 --- a/configure.ac +++ b/configure.ac @@ -14,7 +14,7 @@ # AC_PREREQ([2.69]) -AC_INIT([mod_http2], [1.5.2], [stefan.eissing@greenbytes.de]) +AC_INIT([mod_http2], [1.5.3], [stefan.eissing@greenbytes.de]) LT_PREREQ([2.2.6]) LT_INIT() diff --git a/mod_http2/h2_bucket_beam.c b/mod_http2/h2_bucket_beam.c index 6d41fadc..65f9906a 100644 --- a/mod_http2/h2_bucket_beam.c +++ b/mod_http2/h2_bucket_beam.c @@ -204,8 +204,12 @@ apr_size_t h2_util_bl_print(char *buffer, apr_size_t bmax, static apr_status_t enter_yellow(h2_bucket_beam *beam, h2_beam_lock *pbl) { - if (beam->m_enter) { - return beam->m_enter(beam->m_ctx, pbl); + h2_beam_mutex_enter *enter = beam->m_enter; + if (enter) { + void *ctx = beam->m_ctx; + if (ctx) { + return enter(ctx, pbl); + } } pbl->mutex = NULL; pbl->leave = NULL; @@ -335,7 +339,7 @@ static void h2_beam_emitted(h2_bucket_beam *beam, h2_beam_proxy *proxy) else { /* it should be there unless we screwed up */ ap_log_perror(APLOG_MARK, APLOG_WARNING, 0, beam->red_pool, - APLOGNO() "h2_beam(%d-%s): emitted bucket not " + APLOGNO(03384) "h2_beam(%d-%s): emitted bucket not " "in hold, n=%d", beam->id, beam->tag, (int)proxy->n); AP_DEBUG_ASSERT(!proxy->bred); @@ -535,6 +539,9 @@ apr_status_t h2_beam_shutdown(h2_bucket_beam *beam, apr_read_type_e block) status = APR_EAGAIN; break; } + if (beam->m_cond) { + apr_thread_cond_broadcast(beam->m_cond); + } status = wait_cond(beam, bl.mutex); } leave_yellow(beam, &bl); @@ -716,6 +723,9 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam, if (enter_yellow(beam, &bl) == APR_SUCCESS) { transfer: if (beam->aborted) { + if (!APR_BRIGADE_EMPTY(beam->green)) { + apr_brigade_cleanup(beam->green); + } status = APR_ECONNABORTED; goto leave; } @@ -781,6 +791,10 @@ apr_status_t h2_beam_receive(h2_bucket_beam *beam, #endif remain -= bred->length; ++transferred; + APR_BUCKET_REMOVE(bred); + H2_BLIST_INSERT_TAIL(&beam->hold, bred); + ++transferred; + continue; } else { /* create a "green" standin bucket. we took care about the diff --git a/mod_http2/h2_conn_io.c b/mod_http2/h2_conn_io.c index badee774..fb679ad3 100644 --- a/mod_http2/h2_conn_io.c +++ b/mod_http2/h2_conn_io.c @@ -45,7 +45,6 @@ * which seems to create less TCP packets overall */ #define WRITE_SIZE_MAX (TLS_DATA_MAX - 100) -#define WRITE_BUFFER_SIZE (5*WRITE_SIZE_MAX) static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, @@ -127,22 +126,13 @@ static void h2_conn_io_bb_log(conn_rec *c, int stream_id, int level, } apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, - const h2_config *cfg, - apr_pool_t *pool) + const h2_config *cfg) { io->c = c; - io->output = apr_brigade_create(pool, c->bucket_alloc); - io->buflen = 0; + io->output = apr_brigade_create(c->pool, c->bucket_alloc); io->is_tls = h2_h2_is_tls(c); io->buffer_output = io->is_tls; - - if (io->buffer_output) { - io->bufsize = WRITE_BUFFER_SIZE; - io->buffer = apr_pcalloc(pool, io->bufsize); - } - else { - io->bufsize = 0; - } + io->pass_threshold = h2_config_geti64(cfg, H2_CONF_STREAM_MAX_MEM) / 2; if (io->is_tls) { /* This is what we start with, @@ -151,12 +141,13 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, io->warmup_size = h2_config_geti64(cfg, H2_CONF_TLS_WARMUP_SIZE); io->cooldown_usecs = (h2_config_geti(cfg, H2_CONF_TLS_COOLDOWN_SECS) * APR_USEC_PER_SEC); - io->write_size = WRITE_SIZE_INITIAL; + io->write_size = (io->cooldown_usecs > 0? + WRITE_SIZE_INITIAL : WRITE_SIZE_MAX); } else { io->warmup_size = 0; io->cooldown_usecs = 0; - io->write_size = io->bufsize; + io->write_size = 0; } if (APLOGctrace1(c)) { @@ -170,54 +161,94 @@ apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, return APR_SUCCESS; } -int h2_conn_io_is_buffered(h2_conn_io *io) +#define LOG_SCRATCH 0 + +static void append_scratch(h2_conn_io *io) { - return io->bufsize > 0; + if (io->scratch && io->slen > 0) { + apr_bucket *b = apr_bucket_heap_create(io->scratch, io->slen, + apr_bucket_free, + io->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(io->output, b); +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): append_scratch(%ld)", + io->c->id, (long)io->slen); +#endif + io->scratch = NULL; + io->slen = io->ssize = 0; + } } -typedef struct { - conn_rec *c; - h2_conn_io *io; -} pass_out_ctx; - -static apr_status_t pass_out(apr_bucket_brigade *bb, void *ctx) +static apr_size_t assure_scratch_space(h2_conn_io *io) { + apr_size_t remain = io->ssize - io->slen; + if (io->scratch && remain == 0) { + append_scratch(io); + } + if (!io->scratch) { + /* we control the size and it is larger than what buckets usually + * allocate. */ + io->scratch = apr_bucket_alloc(io->write_size, io->c->bucket_alloc); + io->ssize = io->write_size; + io->slen = 0; + remain = io->ssize; + } + return remain; +} + +static apr_status_t read_to_scratch(h2_conn_io *io, apr_bucket *b) { - pass_out_ctx *pctx = ctx; - conn_rec *c = pctx->c; apr_status_t status; - apr_off_t bblen; + const char *data; + apr_size_t len; - if (APR_BRIGADE_EMPTY(bb)) { + if (!b->length) { return APR_SUCCESS; } - ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_WRITE, c); - apr_brigade_length(bb, 0, &bblen); - h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", bb); - status = ap_pass_brigade(c->output_filters, bb); - if (status == APR_SUCCESS && pctx->io) { - pctx->io->bytes_written += (apr_size_t)bblen; - pctx->io->last_write = apr_time_now(); + AP_DEBUG_ASSERT(b->length <= (io->ssize - io->slen)); + if (APR_BUCKET_IS_FILE(b)) { + apr_bucket_file *f = (apr_bucket_file *)b->data; + apr_file_t *fd = f->fd; + apr_off_t offset = b->start; + apr_size_t len = b->length; + + /* file buckets will either mmap (which we do not want) or + * read 8000 byte chunks and split themself. However, we do + * know *exactly* how many bytes we need where. + */ + status = apr_file_seek(fd, APR_SET, &offset); + if (status != APR_SUCCESS) { + return status; + } + status = apr_file_read(fd, io->scratch + io->slen, &len); +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, status, io->c, + "h2_conn_io(%ld): FILE_to_scratch(%ld)", + io->c->id, (long)len); +#endif + if (status != APR_SUCCESS && status != APR_EOF) { + return status; + } + io->slen += len; } - if (status != APR_SUCCESS) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044) - "h2_conn_io(%ld): pass_out brigade %ld bytes", - c->id, (long)bblen); + else { + status = apr_bucket_read(b, &data, &len, APR_BLOCK_READ); + if (status == APR_SUCCESS) { +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): read_to_scratch(%ld)", + io->c->id, (long)b->length); +#endif + memcpy(io->scratch+io->slen, data, len); + io->slen += len; + } } - apr_brigade_cleanup(bb); return status; } -/* Bring the current buffer content into the output brigade, appropriately - * chunked. - */ -static apr_status_t bucketeer_buffer(h2_conn_io *io) +static void check_write_size(h2_conn_io *io) { - const char *data = io->buffer; - apr_size_t remaining = io->buflen; - apr_bucket *b; - int bcount, i; - if (io->write_size > WRITE_SIZE_INITIAL && (io->cooldown_usecs > 0) && (apr_time_now() - io->last_write) >= io->cooldown_usecs) { @@ -236,134 +267,156 @@ static apr_status_t bucketeer_buffer(h2_conn_io *io) "h2_conn_io(%ld): threshold reached, write size now %ld", (long)io->c->id, (long)io->write_size); } - - bcount = (int)(remaining / io->write_size); - for (i = 0; i < bcount; ++i) { - b = apr_bucket_transient_create(data, io->write_size, - io->output->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - data += io->write_size; - remaining -= io->write_size; - } - - if (remaining > 0) { - b = apr_bucket_transient_create(data, remaining, - io->output->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - } - return APR_SUCCESS; } -apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush) +static apr_status_t pass_output(h2_conn_io *io, int flush, int eoc) { - APR_BRIGADE_INSERT_TAIL(io->output, b); + conn_rec *c = io->c; + apr_bucket *b; + apr_off_t bblen; + apr_status_t status; + + append_scratch(io); if (flush) { - b = apr_bucket_flush_create(io->c->bucket_alloc); + b = apr_bucket_flush_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(io->output, b); } - return APR_SUCCESS; -} - -static apr_status_t h2_conn_io_flush_int(h2_conn_io *io, int flush, int eoc) -{ - pass_out_ctx ctx; - apr_bucket *b; - if (io->buflen == 0 && APR_BRIGADE_EMPTY(io->output)) { + if (APR_BRIGADE_EMPTY(io->output)) { return APR_SUCCESS; } - - if (io->buflen > 0) { - /* something in the buffer, put it in the output brigade */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, - "h2_conn_io: flush, flushing %ld bytes", - (long)io->buflen); - bucketeer_buffer(io); - } - - if (flush) { - b = apr_bucket_flush_create(io->c->bucket_alloc); - APR_BRIGADE_INSERT_TAIL(io->output, b); - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, "h2_conn_io: flush"); - io->buflen = 0; - ctx.c = io->c; - ctx.io = eoc? NULL : io; + ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, c, "h2_conn_io: pass_output"); + ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, NULL); + apr_brigade_length(io->output, 0, &bblen); - return pass_out(io->output, &ctx); - /* no more access after this, as we might have flushed an EOC bucket + h2_conn_io_bb_log(c, 0, APLOG_TRACE2, "master conn pass", io->output); + status = ap_pass_brigade(c->output_filters, io->output); + + /* careful with access after this, as we might have flushed an EOC bucket * that de-allocated us all. */ + if (!eoc) { + apr_brigade_cleanup(io->output); + if (status == APR_SUCCESS) { + io->bytes_written += (apr_size_t)bblen; + io->last_write = apr_time_now(); + } + } + + if (status != APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, APLOGNO(03044) + "h2_conn_io(%ld): pass_out brigade %ld bytes", + c->id, (long)bblen); + } + return status; } apr_status_t h2_conn_io_flush(h2_conn_io *io) { - return h2_conn_io_flush_int(io, 1, 0); -} - -apr_status_t h2_conn_io_consider_pass(h2_conn_io *io) -{ - apr_off_t len = 0; - - if (!APR_BRIGADE_EMPTY(io->output)) { - len = h2_brigade_mem_size(io->output); - } - len += io->buflen; - if (len >= WRITE_BUFFER_SIZE) { - return h2_conn_io_flush_int(io, 1, 0); - } - return APR_SUCCESS; + return pass_output(io, 1, 0); } apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, h2_session *session) { apr_bucket *b = h2_bucket_eoc_create(io->c->bucket_alloc, session); APR_BRIGADE_INSERT_TAIL(io->output, b); - return h2_conn_io_flush_int(io, 1, 1); + return pass_output(io, 1, 1); } -apr_status_t h2_conn_io_write(h2_conn_io *io, - const char *buf, size_t length) +apr_status_t h2_conn_io_write(h2_conn_io *io, const char *data, size_t length) { apr_status_t status = APR_SUCCESS; - pass_out_ctx ctx; + apr_size_t remain; - ctx.c = io->c; - ctx.io = io; - if (io->bufsize > 0) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, 0, io->c, - "h2_conn_io: buffering %ld bytes", (long)length); - - if (!APR_BRIGADE_EMPTY(io->output)) { - status = h2_conn_io_flush_int(io, 0, 0); - } - - while (length > 0 && (status == APR_SUCCESS)) { - apr_size_t avail = io->bufsize - io->buflen; - if (avail <= 0) { - status = h2_conn_io_flush_int(io, 0, 0); - } - else if (length > avail) { - memcpy(io->buffer + io->buflen, buf, avail); - io->buflen += avail; - length -= avail; - buf += avail; + if (io->buffer_output) { + while (length > 0) { + remain = assure_scratch_space(io); + if (remain >= length) { +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): write_to_scratch(%ld)", + io->c->id, (long)length); +#endif + memcpy(io->scratch + io->slen, data, length); + io->slen += length; + length = 0; } else { - memcpy(io->buffer + io->buflen, buf, length); - io->buflen += length; - length = 0; - break; +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): write_to_scratch(%ld)", + io->c->id, (long)remain); +#endif + memcpy(io->scratch + io->slen, data, remain); + io->slen += remain; + data += remain; + length -= remain; } } - } else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE4, status, io->c, - "h2_conn_io: writing %ld bytes to brigade", (long)length); - status = apr_brigade_write(io->output, pass_out, &ctx, buf, length); + status = apr_brigade_write(io->output, NULL, NULL, data, length); + } + return status; +} + +apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb) +{ + apr_bucket *b; + apr_status_t status = APR_SUCCESS; + + check_write_size(io); + while (!APR_BRIGADE_EMPTY(bb) && status == APR_SUCCESS) { + b = APR_BRIGADE_FIRST(bb); + + if (APR_BUCKET_IS_METADATA(b)) { + /* need to finish any open scratch bucket, as meta data + * needs to be forward "in order". */ + append_scratch(io); + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); + } + else if (io->buffer_output) { + apr_size_t remain = assure_scratch_space(io); + if (b->length > remain) { + apr_bucket_split(b, remain); + if (io->slen == 0) { + /* complete write_size bucket, append unchanged */ + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); +#if LOG_SCRATCH + ap_log_cerror(APLOG_MARK, APLOG_INFO, 0, io->c, + "h2_conn_io(%ld): pass bucket(%ld)", + io->c->id, (long)b->length); +#endif + continue; + } + } + else { + /* bucket fits in remain, copy to scratch */ + read_to_scratch(io, b); + apr_bucket_delete(b); + continue; + } + } + else { + /* no buffering, forward buckets setaside on flush */ + if (APR_BUCKET_IS_TRANSIENT(b)) { + apr_bucket_setaside(b, io->c->pool); + } + APR_BUCKET_REMOVE(b); + APR_BRIGADE_INSERT_TAIL(io->output, b); + } } + if (status == APR_SUCCESS) { + if (!APR_BRIGADE_EMPTY(io->output)) { + apr_off_t len = h2_brigade_mem_size(io->output); + if (len >= io->pass_threshold) { + return pass_output(io, 0, 0); + } + } + } return status; } diff --git a/mod_http2/h2_conn_io.h b/mod_http2/h2_conn_io.h index c397e9f6..4ccf0070 100644 --- a/mod_http2/h2_conn_io.h +++ b/mod_http2/h2_conn_io.h @@ -39,16 +39,15 @@ typedef struct { apr_int64_t bytes_written; int buffer_output; - char *buffer; - apr_size_t buflen; - apr_size_t bufsize; + apr_size_t pass_threshold; + + char *scratch; + apr_size_t ssize; + apr_size_t slen; } h2_conn_io; apr_status_t h2_conn_io_init(h2_conn_io *io, conn_rec *c, - const struct h2_config *cfg, - apr_pool_t *pool); - -int h2_conn_io_is_buffered(h2_conn_io *io); + const struct h2_config *cfg); /** * Append data to the buffered output. @@ -59,12 +58,7 @@ apr_status_t h2_conn_io_write(h2_conn_io *io, const char *buf, size_t length); -/** - * Append a bucket to the buffered output. - * @param io the connection io - * @param b the bucket to append - */ -apr_status_t h2_conn_io_writeb(h2_conn_io *io, apr_bucket *b, int flush); +apr_status_t h2_conn_io_pass(h2_conn_io *io, apr_bucket_brigade *bb); /** * Append an End-Of-Connection bucket to the output that, once destroyed, @@ -79,11 +73,4 @@ apr_status_t h2_conn_io_write_eoc(h2_conn_io *io, struct h2_session *session); */ apr_status_t h2_conn_io_flush(h2_conn_io *io); -/** - * Check the amount of buffered output and pass it on if enough has accumulated. - * @param io the connection io - * @param flush if a flush bucket should be appended to any output - */ -apr_status_t h2_conn_io_consider_pass(h2_conn_io *io); - #endif /* defined(__mod_h2__h2_conn_io__) */ diff --git a/mod_http2/h2_ctx.c b/mod_http2/h2_ctx.c index 8b786b94..4b596a3d 100644 --- a/mod_http2/h2_ctx.c +++ b/mod_http2/h2_ctx.c @@ -23,7 +23,6 @@ #include "h2_session.h" #include "h2_task.h" #include "h2_ctx.h" -#include "h2_private.h" static h2_ctx *h2_ctx_create(const conn_rec *c) { diff --git a/mod_http2/h2_mplx.c b/mod_http2/h2_mplx.c index bbc8d703..3ae02f4f 100644 --- a/mod_http2/h2_mplx.c +++ b/mod_http2/h2_mplx.c @@ -90,6 +90,7 @@ static apr_status_t enter_mutex(h2_mplx *m, int *pacquired) * This allow recursive entering of the mutex from the saem thread, * which is what we need in certain situations involving callbacks */ + AP_DEBUG_ASSERT(m); apr_threadkey_private_get(&mutex, thread_lock); if (mutex == m->lock) { *pacquired = 0; @@ -167,6 +168,7 @@ static int can_beam_file(void *ctx, h2_bucket_beam *beam, apr_file_t *file) } static void have_out_data_for(h2_mplx *m, int stream_id); +static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master); static void check_tx_reservation(h2_mplx *m) { @@ -189,6 +191,29 @@ static void check_tx_free(h2_mplx *m) } } +static int purge_stream(void *ctx, void *val) +{ + h2_mplx *m = ctx; + h2_stream *stream = val; + h2_task *task = h2_ihash_get(m->tasks, stream->id); + h2_ihash_remove(m->spurge, stream->id); + h2_stream_destroy(stream); + if (task) { + task_destroy(m, task, 1); + } + return 0; +} + +static void purge_streams(h2_mplx *m) +{ + if (!h2_ihash_empty(m->spurge)) { + while(!h2_ihash_iter(m->spurge, purge_stream, m)) { + /* repeat until empty */ + } + h2_ihash_clear(m->spurge); + } +} + static void h2_mplx_destroy(h2_mplx *m) { AP_DEBUG_ASSERT(m); @@ -257,6 +282,8 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent, m->stream_max_mem = h2_config_geti(conf, H2_CONF_STREAM_MAX_MEM); m->streams = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->shold = h2_ihash_create(m->pool, offsetof(h2_stream,id)); + m->spurge = h2_ihash_create(m->pool, offsetof(h2_stream,id)); m->q = h2_iq_create(m->pool, m->max_streams); m->tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); m->ready_tasks = h2_ihash_create(m->pool, offsetof(h2_task,stream_id)); @@ -294,10 +321,10 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m) return max_stream_started; } -static void input_consumed_signal(h2_mplx *m, h2_task *task) +static void input_consumed_signal(h2_mplx *m, h2_stream *stream) { - if (task->input.beam && task->worker_started) { - h2_beam_send(task->input.beam, NULL, 0); /* trigger updates */ + if (stream->input) { + h2_beam_send(stream->input, NULL, 0); /* trigger updates */ } } @@ -310,31 +337,32 @@ static int output_consumed_signal(h2_mplx *m, h2_task *task) } -static void task_destroy(h2_mplx *m, h2_task *task, int events) +static void task_destroy(h2_mplx *m, h2_task *task, int called_from_master) { conn_rec *slave = NULL; int reuse_slave = 0; apr_status_t status; + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_task(%s): destroy", task->id); /* cleanup any buffered input */ status = h2_task_shutdown(task, 0); if (status != APR_SUCCESS){ - ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO() + ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, m->c, APLOGNO(03385) "h2_task(%s): shutdown", task->id); } - if (events) { + if (called_from_master) { /* Process outstanding events before destruction */ - input_consumed_signal(m, task); + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + if (stream) { + input_consumed_signal(m, stream); + } } /* The pool is cleared/destroyed which also closes all * allocated file handles. Give this count back to our * file handle pool. */ - if (task->input.beam) { - m->tx_handles_reserved += - h2_beam_get_files_beamed(task->input.beam); - } if (task->output.beam) { m->tx_handles_reserved += h2_beam_get_files_beamed(task->output.beam); @@ -368,47 +396,68 @@ static void stream_done(h2_mplx *m, h2_stream *stream, int rst_error) { h2_task *task; + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, m->c, + "h2_stream(%ld-%d): done", m->c->id, stream->id); + /* Situation: we are, on the master connection, done with processing + * the stream. Either we have handled it successfully, or the stream + * was reset by the client or the connection is gone and we are + * shutting down the whole session. + * + * We possibly have created a task for this stream to be processed + * on a slave connection. The processing might actually be ongoing + * right now or has already finished. A finished task waits for its + * stream to be done. This is the common case. + * + * If the stream had input (e.g. the request had a body), a task + * may have read, or is still reading buckets from the input beam. + * This means that the task is referencing memory from the stream's + * pool (or the master connection bucket alloc). Before we can free + * the stream pool, we need to make sure that those references are + * gone. This is what h2_beam_shutdown() on the input waits for. + * + * With the input handled, we can tear down that beam and care + * about the output beam. The stream might still have buffered some + * buckets read from the output, so we need to get rid of those. That + * is done by h2_stream_cleanup(). + * + * Now it is save to destroy the task (if it exists and is finished). + * + * FIXME: we currently destroy the stream, even if the task is still + * ongoing. This is not ok, since task->request is coming from stream + * memory. We should either copy it on task creation or wait with the + * stream destruction until the task is done. + */ + h2_iq_remove(m->q, stream->id); + h2_ihash_remove(m->ready_tasks, stream->id); h2_ihash_remove(m->streams, stream->id); if (stream->input) { - apr_status_t status; - status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ); - if (status == APR_EAGAIN) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_stream(%ld-%d): wait on input shutdown", - m->id, stream->id); - status = h2_beam_shutdown(stream->input, APR_BLOCK_READ); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, m->c, - "h2_stream(%ld-%d): input shutdown returned", - m->id, stream->id); - } + m->tx_handles_reserved += h2_beam_get_files_beamed(stream->input); } + h2_stream_cleanup(stream); task = h2_ihash_get(m->tasks, stream->id); if (task) { - /* Remove task from ready set, we will never submit it */ - h2_ihash_remove(m->ready_tasks, stream->id); - - if (task->worker_done) { - /* already finished or not even started yet */ - h2_iq_remove(m->q, task->stream_id); - task_destroy(m, task, 0); - } - else { + if (!task->worker_done) { /* task still running, cleanup once it is done */ - task->orphaned = 1; - task->input.beam = NULL; if (rst_error) { h2_task_rst(task, rst_error); } + /* FIXME: this should work, but does not + h2_ihash_add(m->shold, stream); + return;*/ + task->input.beam = NULL; + } + else { + /* already finished */ + task_destroy(m, task, 0); } } + h2_stream_destroy(stream); } static int stream_done_iter(void *ctx, void *val) { - h2_stream *stream = val; stream_done((h2_mplx*)ctx, val, 0); - h2_stream_destroy(stream); return 0; } @@ -416,6 +465,7 @@ static int task_print(void *ctx, void *val) { h2_mplx *m = ctx; h2_task *task = val; + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); if (task->request) { ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, /* NO APLOGNO */ "->03198: h2_stream(%s): %s %s %s -> %s %d" @@ -424,7 +474,7 @@ static int task_print(void *ctx, void *val) task->request->authority, task->request->path, task->response? "http" : (task->rst_error? "reset" : "?"), task->response? task->response->http_status : task->rst_error, - task->orphaned, task->worker_started, + (stream? 0 : 1), task->worker_started, task->worker_done); } else if (task) { @@ -451,6 +501,17 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) /* disable WINDOW_UPDATE callbacks */ h2_mplx_set_consumed_cb(m, NULL, NULL); + if (!h2_ihash_empty(m->shold)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): start release_join with %d streams in hold", + m->id, (int)h2_ihash_count(m->shold)); + } + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): start release_join with %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + } + h2_iq_clear(m->q); apr_thread_cond_broadcast(m->task_thawed); while (!h2_ihash_iter(m->streams, stream_done_iter, m)) { @@ -458,19 +519,25 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } AP_DEBUG_ASSERT(h2_ihash_empty(m->streams)); + if (!h2_ihash_empty(m->shold)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): 2. release_join with %d streams in hold", + m->id, (int)h2_ihash_count(m->shold)); + } + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): 2. release_join with %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + } + /* If we still have busy workers, we cannot release our memory - * pool yet, as slave connections have child pools of their respective - * h2_io's. - * Any remaining ios are processed in these workers. Any operation - * they do on their input/outputs will be errored ECONNRESET/ABORTED, - * so processing them should fail and workers *should* return. + * pool yet, as tasks have references to us. + * Any operation on the task slave connection will from now on + * be errored ECONNRESET/ABORTED, so processing them should fail + * and workers *should* return in a timely fashion. */ for (i = 0; m->workers_busy > 0; ++i) { m->join_wait = wait; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): release_join, waiting on %d tasks to report back", - m->id, (int)h2_ihash_count(m->tasks)); - status = apr_thread_cond_timedwait(wait, m->lock, apr_time_from_sec(wait_secs)); if (APR_STATUS_IS_TIMEUP(status)) { @@ -494,9 +561,22 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) } } - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, m->c, APLOGNO(03056) - "h2_mplx(%ld): release_join (%d tasks left) -> destroy", - m->id, (int)h2_ihash_count(m->tasks)); + AP_DEBUG_ASSERT(h2_ihash_empty(m->shold)); + if (!h2_ihash_empty(m->spurge)) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld): release_join %d streams to purge", + m->id, (int)h2_ihash_count(m->spurge)); + purge_streams(m); + } + AP_DEBUG_ASSERT(h2_ihash_empty(m->spurge)); + AP_DEBUG_ASSERT(h2_ihash_empty(m->tasks)); + + if (!h2_ihash_empty(m->tasks)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, APLOGNO(03056) + "h2_mplx(%ld): release_join -> destroy, " + "%d tasks still present", + m->id, (int)h2_ihash_count(m->tasks)); + } leave_mutex(m, acquired); h2_mplx_destroy(m); /* all gone */ @@ -516,24 +596,17 @@ void h2_mplx_abort(h2_mplx *m) } } -apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error) +apr_status_t h2_mplx_stream_done(h2_mplx *m, h2_stream *stream) { apr_status_t status = APR_SUCCESS; int acquired; - /* This maybe called from inside callbacks that already hold the lock. - * E.g. when we are streaming out DATA and the EOF triggers the stream - * release. - */ AP_DEBUG_ASSERT(m); if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_stream *stream = h2_ihash_get(m->streams, stream_id); - if (stream) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, - "h2_mplx(%ld-%d): marking stream as done.", - m->id, stream_id); - stream_done(m, stream, rst_error); - } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%ld-%d): marking stream as done.", + m->id, stream->id); + stream_done(m, stream, stream->rst_error); leave_mutex(m, acquired); } return status; @@ -547,8 +620,7 @@ void h2_mplx_set_consumed_cb(h2_mplx *m, h2_mplx_consumed_cb *cb, void *ctx) static int update_window(void *ctx, void *val) { - h2_mplx *m = ctx; - input_consumed_signal(m, val); + input_consumed_signal(ctx, val); return 1; } @@ -562,7 +634,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m) return APR_ECONNABORTED; } if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - h2_ihash_iter(m->tasks, update_window, m); + h2_ihash_iter(m->streams, update_window, m); ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, "h2_session(%ld): windows updated", m->id); @@ -580,7 +652,7 @@ static int task_iter_first(void *ctx, void *val) return 0; } -h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) +h2_stream *h2_mplx_next_submit(h2_mplx *m) { apr_status_t status; h2_stream *stream = NULL; @@ -597,7 +669,7 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) h2_task *task = ctx.task; h2_ihash_remove(m->ready_tasks, task->stream_id); - stream = h2_ihash_get(streams, task->stream_id); + stream = h2_ihash_get(m->streams, task->stream_id); if (stream && task) { task->submitted = 1; if (task->rst_error) { @@ -618,16 +690,14 @@ h2_stream *h2_mplx_next_submit(h2_mplx *m, h2_ihash_t *streams) "h2_mplx(%s): stream for response closed, " "resetting io to close request processing", task->id); - task->orphaned = 1; h2_task_rst(task, H2_ERR_STREAM_CLOSED); if (!task->worker_started || task->worker_done) { task_destroy(m, task, 1); } else { /* hang around until the h2_task is done, but - * shutdown input/output and send out any events asap. */ + * shutdown output */ h2_task_shutdown(task, 0); - input_consumed_signal(m, task); } } } @@ -640,8 +710,9 @@ static apr_status_t out_open(h2_mplx *m, int stream_id, h2_response *response) { apr_status_t status = APR_SUCCESS; h2_task *task = h2_ihash_get(m->tasks, stream_id); + h2_stream *stream = h2_ihash_get(m->streams, stream_id); - if (!task || task->orphaned) { + if (!task || !stream) { return APR_ECONNABORTED; } @@ -691,8 +762,9 @@ apr_status_t h2_mplx_out_open(h2_mplx *m, int stream_id, h2_response *response) static apr_status_t out_close(h2_mplx *m, h2_task *task) { apr_status_t status = APR_SUCCESS; + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); - if (!task || task->orphaned) { + if (!task || !stream) { return APR_ECONNABORTED; } @@ -835,6 +907,7 @@ static h2_task *pop_task(h2_mplx *m) } slave->sbh = m->c->sbh; + slave->aborted = 0; task = h2_task_create(slave, stream->request, stream->input, m); h2_ihash_add(m->tasks, task); @@ -888,93 +961,115 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more) static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn) { - if (task) { - if (task->frozen) { - /* this task was handed over to an engine for processing - * and the original worker has finished. That means the - * engine may start processing now. */ - h2_task_thaw(task); - /* we do not want the task to block on writing response - * bodies into the mplx. */ - /* FIXME: this implementation is incomplete. */ - h2_task_set_io_blocking(task, 0); - apr_thread_cond_broadcast(m->task_thawed); - return; - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): task(%s) done", m->id, task->id); - out_close(m, task); - - if (ngn) { - apr_off_t bytes = 0; - if (task->output.beam) { - h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); - bytes += h2_beam_get_buffered(task->output.beam); - } - if (bytes > 0) { - /* we need to report consumed and current buffered output - * to the engine. The request will be streamed out or cancelled, - * no more data is coming from it and the engine should update - * its calculations before we destroy this information. */ - h2_req_engine_out_consumed(ngn, task->c, bytes); - } + if (task->frozen) { + /* this task was handed over to an engine for processing + * and the original worker has finished. That means the + * engine may start processing now. */ + h2_task_thaw(task); + /* we do not want the task to block on writing response + * bodies into the mplx. */ + h2_task_set_io_blocking(task, 0); + apr_thread_cond_broadcast(m->task_thawed); + return; + } + else { + h2_stream *stream; + + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): task(%s) done", m->id, task->id); + out_close(m, task); + stream = h2_ihash_get(m->streams, task->stream_id); + + if (ngn) { + apr_off_t bytes = 0; + if (task->output.beam) { + h2_beam_send(task->output.beam, NULL, APR_NONBLOCK_READ); + bytes += h2_beam_get_buffered(task->output.beam); } - - if (task->engine) { - if (!h2_req_engine_is_shutdown(task->engine)) { - ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, - "h2_mplx(%ld): task(%s) has not-shutdown " - "engine(%s)", m->id, task->id, - h2_req_engine_get_id(task->engine)); - } - h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); + if (bytes > 0) { + /* we need to report consumed and current buffered output + * to the engine. The request will be streamed out or cancelled, + * no more data is coming from it and the engine should update + * its calculations before we destroy this information. */ + h2_req_engine_out_consumed(ngn, task->c, bytes); } - - if (!m->aborted && !task->orphaned && m->redo_tasks - && h2_ihash_get(m->redo_tasks, task->stream_id)) { - /* reset and schedule again */ - h2_task_redo(task); - h2_ihash_remove(m->redo_tasks, task->stream_id); - h2_iq_add(m->q, task->stream_id, NULL, NULL); - return; + } + + if (task->engine) { + if (!h2_req_engine_is_shutdown(task->engine)) { + ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, m->c, + "h2_mplx(%ld): task(%s) has not-shutdown " + "engine(%s)", m->id, task->id, + h2_req_engine_get_id(task->engine)); } - - task->worker_done = 1; - task->done_at = apr_time_now(); - if (task->output.beam) { - h2_beam_on_consumed(task->output.beam, NULL, NULL); - h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); + h2_ngn_shed_done_ngn(m->ngn_shed, task->engine); + } + + if (!m->aborted && stream && m->redo_tasks + && h2_ihash_get(m->redo_tasks, task->stream_id)) { + /* reset and schedule again */ + h2_task_redo(task); + h2_ihash_remove(m->redo_tasks, task->stream_id); + h2_iq_add(m->q, task->stream_id, NULL, NULL); + return; + } + + task->worker_done = 1; + task->done_at = apr_time_now(); + if (task->output.beam) { + h2_beam_on_consumed(task->output.beam, NULL, NULL); + h2_beam_mutex_set(task->output.beam, NULL, NULL, NULL); + } + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): request done, %f ms elapsed", task->id, + (task->done_at - task->started_at) / 1000.0); + if (task->started_at > m->last_idle_block) { + /* this task finished without causing an 'idle block', e.g. + * a block by flow control. + */ + if (task->done_at- m->last_limit_change >= m->limit_change_interval + && m->workers_limit < m->workers_max) { + /* Well behaving stream, allow it more workers */ + m->workers_limit = H2MIN(m->workers_limit * 2, + m->workers_max); + m->last_limit_change = task->done_at; + m->need_registration = 1; + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, + "h2_mplx(%ld): increase worker limit to %d", + m->id, m->workers_limit); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%s): request done, %f ms" - " elapsed", task->id, - (task->done_at - task->started_at) / 1000.0); - if (task->started_at > m->last_idle_block) { - /* this task finished without causing an 'idle block', e.g. - * a block by flow control. - */ - if (task->done_at- m->last_limit_change >= m->limit_change_interval - && m->workers_limit < m->workers_max) { - /* Well behaving stream, allow it more workers */ - m->workers_limit = H2MIN(m->workers_limit * 2, - m->workers_max); - m->last_limit_change = task->done_at; - m->need_registration = 1; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c, - "h2_mplx(%ld): increase worker limit to %d", - m->id, m->workers_limit); - } + } + + if (stream) { + /* hang around until the stream deregisters */ + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream still open", + task->id); + } + else { + /* stream done, was it placed in hold? */ + stream = h2_ihash_get(m->shold, task->stream_id); + if (stream) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream in hold", + task->id); + stream->response = NULL; /* ref from task memory */ + /* We cannot destroy the stream here since this is + * called from a worker thread and freeing memory pools + * is only safe in the only thread using it (and its + * parent pool / allocator) */ + h2_ihash_remove(m->shold, stream->id); + h2_ihash_add(m->spurge, stream); } - - if (task->orphaned) { + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, m->c, + "h2_mplx(%s): task_done, stream not found", + task->id); task_destroy(m, task, 0); - if (m->join_wait) { - apr_thread_cond_signal(m->join_wait); - } } - else { - /* hang around until the stream deregisters */ + + if (m->join_wait) { + apr_thread_cond_signal(m->join_wait); } } } @@ -1180,11 +1275,13 @@ apr_status_t h2_mplx_req_engine_push(const char *ngn_type, task->r = r; if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) { - if (task->orphaned) { - status = APR_ECONNABORTED; + h2_stream *stream = h2_ihash_get(m->streams, task->stream_id); + + if (stream) { + status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); } else { - status = h2_ngn_shed_push_task(m->ngn_shed, ngn_type, task, einit); + status = APR_ECONNABORTED; } leave_mutex(m, acquired); } diff --git a/mod_http2/h2_mplx.h b/mod_http2/h2_mplx.h index a6fe12a3..9b316b0b 100644 --- a/mod_http2/h2_mplx.h +++ b/mod_http2/h2_mplx.h @@ -73,6 +73,8 @@ struct h2_mplx { unsigned int need_registration : 1; struct h2_ihash_t *streams; /* all streams currently processing */ + struct h2_ihash_t *shold; /* all streams done with task ongoing */ + struct h2_ihash_t *spurge; /* all streams done, ready for destroy */ struct h2_iqueue *q; /* all stream ids that need to be started */ struct h2_ihash_t *tasks; /* all tasks started and not destroyed */ @@ -167,7 +169,7 @@ apr_uint32_t h2_mplx_shutdown(h2_mplx *m); * @param rst_error if != 0, the stream was reset with the error given * */ -apr_status_t h2_mplx_stream_done(h2_mplx *m, int stream_id, int rst_error); +apr_status_t h2_mplx_stream_done(h2_mplx *m, struct h2_stream *stream); /** * Waits on output data from any stream in this session to become available. @@ -235,8 +237,7 @@ apr_status_t h2_mplx_in_update_windows(h2_mplx *m); * @param m the mplxer to get a response from * @param bb the brigade to place any existing repsonse body data into */ -struct h2_stream *h2_mplx_next_submit(h2_mplx *m, - struct h2_ihash_t *streams); +struct h2_stream *h2_mplx_next_submit(h2_mplx *m); /** * Opens the output for the given stream with the specified response. diff --git a/mod_http2/h2_proxy_session.c b/mod_http2/h2_proxy_session.c index 7febb4df..a2a5631f 100644 --- a/mod_http2/h2_proxy_session.c +++ b/mod_http2/h2_proxy_session.c @@ -23,7 +23,6 @@ #include "mod_http2.h" #include "h2.h" -#include "h2_request.h" #include "h2_util.h" #include "h2_proxy_session.h" @@ -553,9 +552,11 @@ static apr_status_t session_start(h2_proxy_session *session) apr_socket_t *s; s = ap_get_conn_socket(session->c); +#if (!defined(WIN32) && !defined(NETWARE)) || defined(DOXYGEN) if (s) { ap_sock_disable_nagle(s); } +#endif settings[0].settings_id = NGHTTP2_SETTINGS_ENABLE_PUSH; settings[0].value = 0; @@ -592,7 +593,7 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url, stream->input = apr_brigade_create(stream->pool, session->c->bucket_alloc); stream->output = apr_brigade_create(stream->pool, session->c->bucket_alloc); - stream->req = h2_request_create(1, stream->pool, 0); + stream->req = h2_req_create(1, stream->pool, 0); apr_uri_parse(stream->pool, url, &puri); scheme = (strcmp(puri.scheme, "h2")? "http" : "https"); @@ -603,8 +604,8 @@ static apr_status_t open_stream(h2_proxy_session *session, const char *url, authority = apr_psprintf(stream->pool, "%s:%d", authority, puri.port); } path = apr_uri_unparse(stream->pool, &puri, APR_URI_UNP_OMITSITEPART); - h2_request_make(stream->req, stream->pool, r->method, scheme, - authority, path, r->headers_in); + h2_req_make(stream->req, stream->pool, r->method, scheme, + authority, path, r->headers_in); /* Tuck away all already existing cookies */ stream->saves = apr_table_make(r->pool, 2); diff --git a/mod_http2/h2_push.c b/mod_http2/h2_push.c index 748e32ab..977fab58 100644 --- a/mod_http2/h2_push.c +++ b/mod_http2/h2_push.c @@ -346,9 +346,9 @@ static int add_push(link_ctx *ctx) } headers = apr_table_make(ctx->pool, 5); apr_table_do(set_push_header, headers, ctx->req->headers, NULL); - req = h2_request_createn(0, ctx->pool, method, ctx->req->scheme, - ctx->req->authority, path, headers, - ctx->req->serialize); + req = h2_req_createn(0, ctx->pool, method, ctx->req->scheme, + ctx->req->authority, path, headers, + ctx->req->serialize); /* atm, we do not push on pushes */ h2_request_end_headers(req, ctx->pool, 1, 0); push->req = req; diff --git a/mod_http2/h2_request.c b/mod_http2/h2_request.c index f8c00413..d213e167 100644 --- a/mod_http2/h2_request.c +++ b/mod_http2/h2_request.c @@ -35,31 +35,6 @@ #include "h2_util.h" -h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize) -{ - return h2_request_createn(id, pool, NULL, NULL, NULL, NULL, NULL, - serialize); -} - -h2_request *h2_request_createn(int id, apr_pool_t *pool, - const char *method, const char *scheme, - const char *authority, const char *path, - apr_table_t *header, int serialize) -{ - h2_request *req = apr_pcalloc(pool, sizeof(h2_request)); - - req->id = id; - req->method = method; - req->scheme = scheme; - req->authority = authority; - req->path = path; - req->headers = header? header : apr_table_make(pool, 10); - req->request_time = apr_time_now(); - req->serialize = serialize; - - return req; -} - static apr_status_t inspect_clen(h2_request *req, const char *s) { char *end; @@ -67,111 +42,28 @@ static apr_status_t inspect_clen(h2_request *req, const char *s) return (s == end)? APR_EINVAL : APR_SUCCESS; } -static apr_status_t add_h1_header(h2_request *req, apr_pool_t *pool, - const char *name, size_t nlen, - const char *value, size_t vlen) -{ - char *hname, *hvalue; - - if (h2_req_ignore_header(name, nlen)) { - return APR_SUCCESS; - } - else if (H2_HD_MATCH_LIT("cookie", name, nlen)) { - const char *existing = apr_table_get(req->headers, "cookie"); - if (existing) { - char *nval; - - /* Cookie header come separately in HTTP/2, but need - * to be merged by "; " (instead of default ", ") - */ - hvalue = apr_pstrndup(pool, value, vlen); - nval = apr_psprintf(pool, "%s; %s", existing, hvalue); - apr_table_setn(req->headers, "Cookie", nval); - return APR_SUCCESS; - } - } - else if (H2_HD_MATCH_LIT("host", name, nlen)) { - if (apr_table_get(req->headers, "Host")) { - return APR_SUCCESS; /* ignore duplicate */ - } - } - - hname = apr_pstrndup(pool, name, nlen); - hvalue = apr_pstrndup(pool, value, vlen); - h2_util_camel_case_header(hname, nlen); - apr_table_mergen(req->headers, hname, hvalue); - - return APR_SUCCESS; -} - -typedef struct { - h2_request *req; - apr_pool_t *pool; -} h1_ctx; - -static int set_h1_header(void *ctx, const char *key, const char *value) -{ - h1_ctx *x = ctx; - size_t klen = strlen(key); - if (!h2_req_ignore_header(key, klen)) { - add_h1_header(x->req, x->pool, key, klen, value, strlen(value)); - } - return 1; -} - -static apr_status_t add_all_h1_header(h2_request *req, apr_pool_t *pool, - apr_table_t *header) -{ - h1_ctx x; - x.req = req; - x.pool = pool; - apr_table_do(set_h1_header, &x, header, NULL); - return APR_SUCCESS; -} - - -apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool, - const char *method, const char *scheme, - const char *authority, const char *path, - apr_table_t *headers) -{ - req->method = method; - req->scheme = scheme; - req->authority = authority; - req->path = path; - - AP_DEBUG_ASSERT(req->scheme); - AP_DEBUG_ASSERT(req->authority); - AP_DEBUG_ASSERT(req->path); - AP_DEBUG_ASSERT(req->method); - - return add_all_h1_header(req, pool, headers); -} - -apr_status_t h2_request_rwrite(h2_request *req, request_rec *r) +apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, + request_rec *r) { apr_status_t status; const char *scheme, *authority; - scheme = (r->parsed_uri.scheme? r->parsed_uri.scheme + scheme = apr_pstrdup(pool, r->parsed_uri.scheme? r->parsed_uri.scheme : ap_http_scheme(r)); - authority = r->hostname; + authority = apr_pstrdup(pool, r->hostname); if (!ap_strchr_c(authority, ':') && r->server && r->server->port) { apr_port_t defport = apr_uri_port_of_scheme(scheme); if (defport != r->server->port) { /* port info missing and port is not default for scheme: append */ - authority = apr_psprintf(r->pool, "%s:%d", authority, + authority = apr_psprintf(pool, "%s:%d", authority, (int)r->server->port); } } - status = h2_request_make(req, r->pool, r->method, scheme, authority, - apr_uri_unparse(r->pool, &r->parsed_uri, - APR_URI_UNP_OMITSITEPART), - r->headers_in); - ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058) - "h2_request(%d): rwrite %s host=%s://%s%s", - req->id, req->method, req->scheme, req->authority, req->path); + status = h2_req_make(req, pool, apr_pstrdup(pool, r->method), scheme, + authority, apr_uri_unparse(pool, &r->parsed_uri, + APR_URI_UNP_OMITSITEPART), + r->headers_in); return status; } @@ -223,7 +115,7 @@ apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, } else { /* non-pseudo header, append to work bucket of stream */ - status = add_h1_header(req, pool, name, nlen, value, vlen); + status = h2_headers_add_h1(req->headers, pool, name, nlen, value, vlen); } return status; diff --git a/mod_http2/h2_request.h b/mod_http2/h2_request.h index 4288dfec..ba48f4a1 100644 --- a/mod_http2/h2_request.h +++ b/mod_http2/h2_request.h @@ -18,19 +18,8 @@ #include "h2.h" -h2_request *h2_request_create(int id, apr_pool_t *pool, int serialize); - -h2_request *h2_request_createn(int id, apr_pool_t *pool, - const char *method, const char *scheme, - const char *authority, const char *path, - apr_table_t *headers, int serialize); - -apr_status_t h2_request_make(h2_request *req, apr_pool_t *pool, - const char *method, const char *scheme, - const char *authority, const char *path, - apr_table_t *headers); - -apr_status_t h2_request_rwrite(h2_request *req, request_rec *r); +apr_status_t h2_request_rwrite(h2_request *req, apr_pool_t *pool, + request_rec *r); apr_status_t h2_request_add_header(h2_request *req, apr_pool_t *pool, const char *name, size_t nlen, diff --git a/mod_http2/h2_session.c b/mod_http2/h2_session.c index f96c5096..5ee45acf 100644 --- a/mod_http2/h2_session.c +++ b/mod_http2/h2_session.c @@ -128,19 +128,16 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id, h2_stream * stream; apr_pool_t *stream_pool; - if (session->spare) { - stream_pool = session->spare; - session->spare = NULL; - } - else { - apr_pool_create(&stream_pool, session->pool); - apr_pool_tag(stream_pool, "h2_stream"); - } + apr_pool_create(&stream_pool, session->pool); + apr_pool_tag(stream_pool, "h2_stream"); stream = h2_stream_open(stream_id, stream_pool, session, initiated_on, req); - + ++session->open_streams; + ++session->unanswered_streams; + nghttp2_session_set_stream_user_data(session->ngh2, stream_id, stream); h2_ihash_add(session->streams, stream); + if (H2_STREAM_CLIENT_INITIATED(stream_id)) { if (stream_id > session->remote.emitted_max) { ++session->remote.emitted_count; @@ -262,6 +259,11 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2, return 0; } +static h2_stream *get_stream(h2_session *session, int stream_id) +{ + return nghttp2_session_get_stream_user_data(session->ngh2, stream_id); +} + static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int32_t stream_id, const uint8_t *data, size_t len, void *userp) @@ -277,7 +279,7 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, return 0; } - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03064) "h2_stream(%ld-%d): on_data_chunk for unknown stream", @@ -313,6 +315,9 @@ static apr_status_t stream_release(h2_session *session, uint32_t error_code) { conn_rec *c = session->c; + apr_bucket *b; + apr_status_t status; + if (!error_code) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_stream(%ld-%d): handled, closing", @@ -331,8 +336,11 @@ static apr_status_t stream_release(h2_session *session, h2_stream_rst(stream, error_code); } - return h2_conn_io_writeb(&session->io, - h2_bucket_eos_create(c->bucket_alloc, stream), 0); + b = h2_bucket_eos_create(c->bucket_alloc, stream); + APR_BRIGADE_INSERT_TAIL(session->bbtmp, b); + status = h2_conn_io_pass(&session->io, session->bbtmp); + apr_brigade_cleanup(session->bbtmp); + return status; } static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, @@ -342,7 +350,7 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, h2_stream *stream; (void)ngh2; - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (stream) { stream_release(session, stream, error_code); } @@ -358,7 +366,7 @@ static int on_begin_headers_cb(nghttp2_session *ngh2, /* We may see HEADERs at the start of a stream or after all DATA * streams to carry trailers. */ (void)ngh2; - s = h2_session_get_stream(session, frame->hd.stream_id); + s = get_stream(session, frame->hd.stream_id); if (s) { /* nop */ } @@ -385,7 +393,7 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, return 0; } - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, APLOGNO(02920) @@ -432,7 +440,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, /* This can be HEADERS for a new stream, defining the request, * or HEADER may come after DATA at the end of a stream as in * trailers */ - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); @@ -456,7 +464,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, } break; case NGHTTP2_DATA: - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (stream) { int eos = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, @@ -493,7 +501,7 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, "h2_session(%ld-%d): RST_STREAM by client, errror=%d", session->id, (int)frame->hd.stream_id, (int)frame->rst_stream.error_code); - stream = h2_session_get_stream(session, frame->hd.stream_id); + stream = get_stream(session, frame->hd.stream_id); if (stream && stream->request && stream->request->initiated_on) { ++session->pushes_reset; } @@ -536,13 +544,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, return 0; } -static apr_status_t pass_data(void *ctx, - const char *data, apr_off_t length) -{ - return h2_conn_io_write(&((h2_session*)ctx)->io, data, length); -} - - static char immortal_zeros[H2_MAX_PADLEN]; static int on_send_data_cb(nghttp2_session *ngh2, @@ -567,7 +568,7 @@ static int on_send_data_cb(nghttp2_session *ngh2, } padlen = (unsigned char)frame->data.padlen; - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_NOTFOUND, session->c, APLOGNO(02924) @@ -580,52 +581,32 @@ static int on_send_data_cb(nghttp2_session *ngh2, "h2_stream(%ld-%d): send_data_cb for %ld bytes", session->id, (int)stream_id, (long)length); - if (h2_conn_io_is_buffered(&session->io)) { - status = h2_conn_io_write(&session->io, (const char *)framehd, 9); - if (status == APR_SUCCESS) { - if (padlen) { - status = h2_conn_io_write(&session->io, (const char *)&padlen, 1); - } - - if (status == APR_SUCCESS) { - apr_off_t len = length; - status = h2_stream_readx(stream, pass_data, session, &len, &eos); - if (status == APR_SUCCESS && len != length) { - status = APR_EINVAL; - } - } - - if (status == APR_SUCCESS && padlen) { - if (padlen) { - status = h2_conn_io_write(&session->io, immortal_zeros, padlen); - } - } - } + status = h2_conn_io_write(&session->io, (const char *)framehd, 9); + if (padlen && status == APR_SUCCESS) { + status = h2_conn_io_write(&session->io, (const char *)&padlen, 1); } - else { - status = h2_conn_io_write(&session->io, (const char *)framehd, 9); - if (padlen && status == APR_SUCCESS) { - status = h2_conn_io_write(&session->io, (const char *)&padlen, 1); - } - if (status == APR_SUCCESS) { - apr_off_t len = length; - status = h2_stream_read_to(stream, session->io.output, &len, &eos); - if (status == APR_SUCCESS && len != length) { - status = APR_EINVAL; - } - } - - if (status == APR_SUCCESS && padlen) { - b = apr_bucket_immortal_create(immortal_zeros, padlen, - session->c->bucket_alloc); - status = h2_conn_io_writeb(&session->io, b, 0); + + if (status == APR_SUCCESS) { + apr_off_t len = length; + status = h2_stream_read_to(stream, session->bbtmp, &len, &eos); + if (status == APR_SUCCESS && len != length) { + status = APR_EINVAL; } } + if (status == APR_SUCCESS && padlen) { + b = apr_bucket_immortal_create(immortal_zeros, padlen, + session->c->bucket_alloc); + APR_BRIGADE_INSERT_TAIL(session->bbtmp, b); + } + if (status == APR_SUCCESS) { + status = h2_conn_io_pass(&session->io, session->bbtmp); + } + + apr_brigade_cleanup(session->bbtmp); if (status == APR_SUCCESS) { stream->data_frames_sent++; - h2_conn_io_consider_pass(&session->io); return 0; } else { @@ -682,45 +663,31 @@ static apr_status_t init_callbacks(conn_rec *c, nghttp2_session_callbacks **pcb) return APR_SUCCESS; } -static void h2_session_cleanup(h2_session *session) +static void h2_session_destroy(h2_session *session) { - AP_DEBUG_ASSERT(session); - /* This is an early cleanup of the session that may - * discard what is no longer necessary for *new* streams - * and general HTTP/2 processing. - * At this point, all frames are in transit or somehwere in - * our buffers or passed down output filters. - * h2 streams might still being written out. - */ - if (session->c) { - h2_ctx_clear(session->c); + AP_DEBUG_ASSERT(session); + + h2_ihash_clear(session->streams); + if (session->mplx) { + h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); + h2_mplx_release_and_join(session->mplx, session->iowait); + session->mplx = NULL; } + + ap_remove_input_filter_byhandle((session->r? session->r->input_filters : + session->c->input_filters), "H2_IN"); if (session->ngh2) { nghttp2_session_del(session->ngh2); session->ngh2 = NULL; } - if (session->spare) { - apr_pool_destroy(session->spare); - session->spare = NULL; + if (session->c) { + h2_ctx_clear(session->c); } -} -static void h2_session_destroy(h2_session *session) -{ - AP_DEBUG_ASSERT(session); - - h2_session_cleanup(session); - h2_ihash_clear(session->streams); - if (APLOGctrace1(session->c)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, "h2_session(%ld): destroy", session->id); } - if (session->mplx) { - h2_mplx_set_consumed_cb(session->mplx, NULL, NULL); - h2_mplx_release_and_join(session->mplx, session->iowait); - session->mplx = NULL; - } if (session->pool) { apr_pool_destroy(session->pool); } @@ -901,7 +868,7 @@ static h2_session *h2_session_create_int(conn_rec *c, h2_session_receive, session); ap_add_input_filter("H2_IN", session->cin, r, c); - h2_conn_io_init(&session->io, c, session->config, session->pool); + h2_conn_io_init(&session->io, c, session->config); session->bbtmp = apr_brigade_create(session->pool, c->bucket_alloc); status = init_callbacks(c, &callbacks); @@ -1138,10 +1105,8 @@ static int resume_on_data(void *ctx, void *val) static int h2_session_resume_streams_with_data(h2_session *session) { AP_DEBUG_ASSERT(session); - if (!h2_ihash_empty(session->streams) - && session->mplx && !session->mplx->aborted) { + if (session->open_streams && !session->mplx->aborted) { resume_ctx ctx; - ctx.session = session; ctx.resume_count = 0; @@ -1153,11 +1118,6 @@ static int h2_session_resume_streams_with_data(h2_session *session) return 0; } -h2_stream *h2_session_get_stream(h2_session *session, int stream_id) -{ - return h2_ihash_get(session->streams, stream_id); -} - static ssize_t stream_data_cb(nghttp2_session *ng2s, int32_t stream_id, uint8_t *buf, @@ -1183,7 +1143,7 @@ static ssize_t stream_data_cb(nghttp2_session *ng2s, (void)ng2s; (void)buf; (void)source; - stream = h2_session_get_stream(session, stream_id); + stream = get_stream(session, stream_id); if (!stream) { ap_log_cerror(APLOG_MARK, APLOG_ERR, 0, session->c, APLOGNO(02937) @@ -1334,7 +1294,7 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) stream->id, err); } - stream->submitted = 1; + --session->unanswered_streams; if (stream->request && stream->request->initiated_on) { ++session->pushes_submitted; } @@ -1384,7 +1344,6 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is, ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, session->c, "h2_stream(%ld-%d): scheduling push stream", session->id, stream->id); - h2_stream_cleanup(stream); stream = NULL; } ++session->unsent_promises; @@ -1509,29 +1468,14 @@ apr_status_t h2_session_set_prio(h2_session *session, h2_stream *stream, apr_status_t h2_session_stream_done(h2_session *session, h2_stream *stream) { - apr_pool_t *pool = h2_stream_detach_pool(stream); - int stream_id = stream->id; - int rst_error = stream->rst_error; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_stream(%ld-%d): cleanup by EOS bucket destroy", - session->id, stream_id); - if (session->streams) { - h2_ihash_remove(session->streams, stream_id); - } + "h2_stream(%ld-%d): EOS bucket cleanup -> done", + session->id, stream->id); + h2_ihash_remove(session->streams, stream->id); + --session->open_streams; + --session->unanswered_streams; + h2_mplx_stream_done(session->mplx, stream); - h2_stream_cleanup(stream); - h2_mplx_stream_done(session->mplx, stream_id, rst_error); - h2_stream_destroy(stream); - - if (pool) { - apr_pool_clear(pool); - if (session->spare) { - apr_pool_destroy(session->spare); - } - session->spare = pool; - } - return APR_SUCCESS; } @@ -1708,7 +1652,7 @@ static apr_status_t h2_session_submit(h2_session *session) if (has_unsubmitted_streams(session)) { /* If we have responses ready, submit them now. */ - while ((stream = h2_mplx_next_submit(session->mplx, session->streams))) { + while ((stream = h2_mplx_next_submit(session->mplx))) { status = submit_response(session, stream); ++session->unsent_submits; @@ -1770,7 +1714,7 @@ static void update_child_status(h2_session *session, int status, const char *msg apr_snprintf(session->status, sizeof(session->status), "%s, streams: %d/%d/%d/%d/%d (open/recv/resp/push/rst)", msg? msg : "-", - (int)h2_ihash_count(session->streams), + (int)session->open_streams, (int)session->remote.emitted_count, (int)session->responses_submitted, (int)session->pushes_submitted, @@ -1788,7 +1732,7 @@ static void transit(h2_session *session, const char *action, h2_session_state ns session->state = nstate; switch (session->state) { case H2_SESSION_ST_IDLE: - update_child_status(session, (h2_ihash_empty(session->streams)? + update_child_status(session, (session->open_streams == 0? SERVER_BUSY_KEEPALIVE : SERVER_BUSY_READ), "idle"); break; @@ -1917,10 +1861,7 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) * CPU cycles. Ideally, we'd like to do a blocking read, but that * is not possible if we have scheduled tasks and wait * for them to produce something. */ - if (h2_conn_io_flush(&session->io) != APR_SUCCESS) { - dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); - } - if (h2_ihash_empty(session->streams)) { + if (!session->open_streams) { if (!is_accepting_streams(session)) { /* We are no longer accepting new streams and have * finished processing existing ones. Time to leave. */ @@ -1944,6 +1885,10 @@ static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) * new output data from task processing, * switch to blocking reads. We are probably waiting on * window updates. */ + if (h2_conn_io_flush(&session->io) != APR_SUCCESS) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + return; + } transit(session, "no io", H2_SESSION_ST_IDLE); session->idle_until = apr_time_now() + session->s->timeout; session->keep_sync_until = session->idle_until; @@ -2126,9 +2071,8 @@ apr_status_t h2_session_process(h2_session *session, int async) break; case H2_SESSION_ST_IDLE: - /* make certain, the client receives everything before we idle */ - if (!session->keep_sync_until - && async && h2_ihash_empty(session->streams) + /* make certain, we send everything before we idle */ + if (!session->keep_sync_until && async && !session->open_streams && !session->r && session->remote.emitted_count) { ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): async idle, nonblock read", session->id); @@ -2226,8 +2170,8 @@ apr_status_t h2_session_process(h2_session *session, int async) } } - if (!h2_ihash_empty(session->streams)) { - /* resume any streams for which data is available again */ + if (session->open_streams) { + /* resume any streams with output data */ h2_session_resume_streams_with_data(session); /* Submit any responses/push_promises that are ready */ status = h2_session_submit(session); @@ -2278,6 +2222,7 @@ apr_status_t h2_session_process(h2_session *session, int async) session->start_wait = apr_time_now(); if (h2_conn_io_flush(&session->io) != APR_SUCCESS) { dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + break; } } else if ((apr_time_now() - session->start_wait) >= session->s->timeout) { @@ -2303,11 +2248,15 @@ apr_status_t h2_session_process(h2_session *session, int async) session->wait_us = 0; dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL); } - else if (status == APR_TIMEUP) { + else if (APR_STATUS_IS_TIMEUP(status)) { /* go back to checking all inputs again */ transit(session, "wait cycle", session->local.accepting? H2_SESSION_ST_BUSY : H2_SESSION_ST_LOCAL_SHUTDOWN); } + else if (APR_STATUS_IS_ECONNRESET(status) + || APR_STATUS_IS_ECONNABORTED(status)) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } else { ap_log_cerror(APLOG_MARK, APLOG_WARNING, status, c, "h2_session(%ld): waiting on conditional", @@ -2343,7 +2292,7 @@ apr_status_t h2_session_process(h2_session *session, int async) ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): [%s] process returns", session->id, state_name(session->state)); - + if ((session->state != H2_SESSION_ST_DONE) && (APR_STATUS_IS_EOF(status) || APR_STATUS_IS_ECONNRESET(status) diff --git a/mod_http2/h2_session.h b/mod_http2/h2_session.h index bf4ded33..32202dc3 100644 --- a/mod_http2/h2_session.h +++ b/mod_http2/h2_session.h @@ -100,6 +100,8 @@ typedef struct h2_session { struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */ + int open_streams; /* number of streams open */ + int unanswered_streams; /* number of streams waiting for response */ int unsent_submits; /* number of submitted, but not yet written responses. */ int unsent_promises; /* number of submitted, but not yet written push promised */ @@ -122,8 +124,6 @@ typedef struct h2_session { apr_bucket_brigade *bbtmp; /* brigade for keeping temporary data */ struct apr_thread_cond_t *iowait; /* our cond when trywaiting for data */ - apr_pool_t *spare; /* spare stream pool */ - char status[64]; /* status message for scoreboard */ int last_status_code; /* the one already reported */ const char *last_status_msg; /* the one already reported */ @@ -190,9 +190,6 @@ void h2_session_close(h2_session *session); apr_status_t h2_session_handle_response(h2_session *session, struct h2_stream *stream); -/* Get the h2_stream for the given stream idenrtifier. */ -struct h2_stream *h2_session_get_stream(h2_session *session, int stream_id); - /** * Create and register a new stream under the given id. * diff --git a/mod_http2/h2_stream.c b/mod_http2/h2_stream.c index c8635aee..dcc25da4 100644 --- a/mod_http2/h2_stream.c +++ b/mod_http2/h2_stream.c @@ -53,12 +53,19 @@ static int state_transition[][7] = { /*CL*/{ 1, 1, 0, 0, 1, 1, 1 }, }; -#define H2_STREAM_OUT_LOG(lvl,s,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ - h2_util_bb_log((s)->session->c,(s)->session->id,lvl,msg,(s)->buffer); \ - } while(0) - +static void H2_STREAM_OUT_LOG(int lvl, h2_stream *s, char *tag) +{ + if (APLOG_C_IS_LEVEL(s->session->c, lvl)) { + conn_rec *c = s->session->c; + char buffer[4 * 1024]; + const char *line = "(null)"; + apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); + + len = h2_util_bb_print(buffer, bmax, tag, "", s->buffer); + ap_log_cerror(APLOG_MARK, lvl, 0, c, "bb_dump(%ld-%d): %s", + c->id, s->id, len? buffer : line); + } +} static int set_state(h2_stream *stream, h2_stream_state_t state) { @@ -143,6 +150,30 @@ static int output_open(h2_stream *stream) } } +static apr_status_t stream_pool_cleanup(void *ctx) +{ + h2_stream *stream = ctx; + apr_status_t status; + + if (stream->input) { + h2_beam_destroy(stream->input); + stream->input = NULL; + } + if (stream->files) { + apr_file_t *file; + int i; + for (i = 0; i < stream->files->nelts; ++i) { + file = APR_ARRAY_IDX(stream->files, i, apr_file_t*); + status = apr_file_close(file); + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, stream->session->c, + "h2_stream(%ld-%d): destroy, closed file %d", + stream->session->id, stream->id, i); + } + stream->files = NULL; + } + return APR_SUCCESS; +} + h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, int initiated_on, const h2_request *creq) { @@ -162,11 +193,13 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, req->initiated_on = initiated_on; } else { - req = h2_request_create(id, pool, + req = h2_req_create(id, pool, h2_config_geti(session->config, H2_CONF_SER_HEADERS)); } stream->request = req; + apr_pool_cleanup_register(pool, stream, stream_pool_cleanup, + apr_pool_cleanup_null); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082) "h2_stream(%ld-%d): opened", session->id, stream->id); return stream; @@ -175,19 +208,30 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session, void h2_stream_cleanup(h2_stream *stream) { AP_DEBUG_ASSERT(stream); - if (stream->input) { - h2_beam_destroy(stream->input); - stream->input = NULL; - } if (stream->buffer) { apr_brigade_cleanup(stream->buffer); } + if (stream->input) { + apr_status_t status; + status = h2_beam_shutdown(stream->input, APR_NONBLOCK_READ); + if (status == APR_EAGAIN) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + "h2_stream(%ld-%d): wait on input shutdown", + stream->session->id, stream->id); + status = h2_beam_shutdown(stream->input, APR_BLOCK_READ); + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, stream->session->c, + "h2_stream(%ld-%d): input shutdown returned", + stream->session->id, stream->id); + } + } } void h2_stream_destroy(h2_stream *stream) { AP_DEBUG_ASSERT(stream); - h2_stream_cleanup(stream); + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, 0, stream->session->c, + "h2_stream(%ld-%d): destroy", + stream->session->id, stream->id); if (stream->pool) { apr_pool_destroy(stream->pool); } @@ -229,9 +273,14 @@ apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r) return APR_ECONNRESET; } set_state(stream, H2_STREAM_ST_OPEN); - status = h2_request_rwrite(stream->request, r); + status = h2_request_rwrite(stream->request, stream->pool, r); stream->request->serialize = h2_config_geti(h2_config_rget(r), H2_CONF_SER_HEADERS); + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, status, r, APLOGNO(03058) + "h2_request(%d): rwrite %s host=%s://%s%s", + stream->request->id, stream->request->method, + stream->request->scheme, stream->request->authority, + stream->request->path); return status; } @@ -394,11 +443,43 @@ int h2_stream_is_suspended(const h2_stream *stream) static apr_status_t fill_buffer(h2_stream *stream, apr_size_t amount) { + conn_rec *c = stream->session->c; + apr_bucket *b; + apr_status_t status; + if (!stream->output) { return APR_EOF; } - return h2_beam_receive(stream->output, stream->buffer, - APR_NONBLOCK_READ, amount); + status = h2_beam_receive(stream->output, stream->buffer, + APR_NONBLOCK_READ, amount); + /* The buckets we reveive are using the stream->buffer pool as + * lifetime which is exactly what we want since this is stream->pool. + * + * However: when we send these buckets down the core output filters, the + * filter might decide to setaside them into a pool of its own. And it + * might decide, after having sent the buckets, to clear its pool. + * + * This is problematic for file buckets because it then closed the contained + * file. Any split off buckets we sent afterwards will result in a + * APR_EBADF. + */ + for (b = APR_BRIGADE_FIRST(stream->buffer); + b != APR_BRIGADE_SENTINEL(stream->buffer); + b = APR_BUCKET_NEXT(b)) { + if (APR_BUCKET_IS_FILE(b)) { + apr_bucket_file *f = (apr_bucket_file *)b->data; + apr_pool_t *fpool = apr_file_pool_get(f->fd); + if (fpool != c->pool) { + apr_bucket_setaside(b, c->pool); + if (!stream->files) { + stream->files = apr_array_make(stream->pool, + 5, sizeof(apr_file_t*)); + } + APR_ARRAY_PUSH(stream->files, apr_file_t*) = f->fd; + } + } + } + return status; } apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, @@ -429,12 +510,14 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, return status; } +static const apr_size_t DATA_CHUNK_SIZE = ((16*1024) - 100 - 9); + apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, int *peos) { conn_rec *c = stream->session->c; apr_status_t status = APR_SUCCESS; - apr_off_t requested = (*plen > 0)? *plen : 32*1024; + apr_off_t requested; if (stream->rst_error) { *plen = 0; @@ -442,11 +525,19 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, return APR_ECONNRESET; } + if (*plen > 0) { + requested = H2MIN(*plen, DATA_CHUNK_SIZE); + } + else { + requested = DATA_CHUNK_SIZE; + } + *plen = requested; + H2_STREAM_OUT_LOG(APLOG_TRACE2, stream, "h2_stream_out_prepare_pre"); h2_util_bb_avail(stream->buffer, plen, peos); - if (!*peos && !*plen) { + if (!*peos && *plen < requested) { /* try to get more data */ - status = fill_buffer(stream, H2MIN(requested, 32*1024)); + status = fill_buffer(stream, (requested - *plen) + DATA_CHUNK_SIZE); if (APR_STATUS_IS_EOF(status)) { apr_bucket *eos = apr_bucket_eos_create(c->bucket_alloc); APR_BRIGADE_INSERT_TAIL(stream->buffer, eos); @@ -467,27 +558,6 @@ apr_status_t h2_stream_out_prepare(h2_stream *stream, } -apr_status_t h2_stream_readx(h2_stream *stream, - h2_io_data_cb *cb, void *ctx, - apr_off_t *plen, int *peos) -{ - conn_rec *c = stream->session->c; - apr_status_t status = APR_SUCCESS; - - if (stream->rst_error) { - return APR_ECONNRESET; - } - status = h2_util_bb_readx(stream->buffer, cb, ctx, plen, peos); - if (status == APR_SUCCESS && !*peos && !*plen) { - status = APR_EAGAIN; - } - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, - "h2_stream(%ld-%d): readx, len=%ld eos=%d", - c->id, stream->id, (long)*plen, *peos); - return status; -} - - apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) { diff --git a/mod_http2/h2_stream.h b/mod_http2/h2_stream.h index 8ae600c7..33f28f6e 100644 --- a/mod_http2/h2_stream.h +++ b/mod_http2/h2_stream.h @@ -54,6 +54,7 @@ struct h2_stream { struct h2_bucket_beam *output; apr_bucket_brigade *buffer; apr_bucket_brigade *tmp; + apr_array_header_t *files; /* apr_file_t* we collected during I/O */ int rst_error; /* stream error for RST_STREAM */ unsigned int aborted : 1; /* was aborted */ @@ -62,7 +63,6 @@ struct h2_stream { unsigned int submitted : 1; /* response HEADER has been sent */ apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ - apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */ }; @@ -204,23 +204,6 @@ apr_status_t h2_stream_set_response(h2_stream *stream, apr_status_t h2_stream_out_prepare(h2_stream *stream, apr_off_t *plen, int *peos); -/** - * Read data from the stream output. - * - * @param stream the stream to read from - * @param cb callback to invoke for byte chunks read. Might be invoked - * multiple times (with different values) for one read operation. - * @param ctx context data for callback - * @param plen (in-/out) max. number of bytes to read and on return actual - * number of bytes read - * @param peos (out) != 0 iff end of stream has been reached while reading - * @return APR_SUCCESS if out information was computed successfully. - * APR_EAGAIN if not data is available and end of stream has not been - * reached yet. - */ -apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, - void *ctx, apr_off_t *plen, int *peos); - /** * Read a maximum number of bytes into the bucket brigade. * diff --git a/mod_http2/h2_task.c b/mod_http2/h2_task.c index fe0cca25..92029d89 100644 --- a/mod_http2/h2_task.c +++ b/mod_http2/h2_task.c @@ -485,6 +485,9 @@ void h2_task_rst(h2_task *task, int error) if (task->output.beam) { h2_beam_abort(task->output.beam); } + if (task->c) { + task->c->aborted = 1; + } } apr_status_t h2_task_shutdown(h2_task *task, int block) @@ -682,7 +685,7 @@ static apr_status_t h2_task_process_request(h2_task *task, conn_rec *c) "h2_task(%s): create request_rec", task->id); r = h2_request_create_rec(req, c); if (r && (r->status == HTTP_OK)) { - ap_update_child_status(c->sbh, SERVER_BUSY_READ, r); + ap_update_child_status(c->sbh, SERVER_BUSY_WRITE, r); if (cs) { cs->state = CONN_STATE_HANDLER; diff --git a/mod_http2/h2_task.h b/mod_http2/h2_task.h index 58b64b0a..454bc376 100644 --- a/mod_http2/h2_task.h +++ b/mod_http2/h2_task.h @@ -83,7 +83,6 @@ struct h2_task { unsigned int frozen : 1; unsigned int blocking : 1; unsigned int detached : 1; - unsigned int orphaned : 1; /* h2_stream is gone for this task */ unsigned int submitted : 1; /* response has been submitted to client */ unsigned int worker_started : 1; /* h2_worker started processing for this io */ unsigned int worker_done : 1; /* h2_worker finished for this io */ diff --git a/mod_http2/h2_util.c b/mod_http2/h2_util.c index 64830524..e6fe4596 100644 --- a/mod_http2/h2_util.c +++ b/mod_http2/h2_util.c @@ -23,8 +23,7 @@ #include -#include "h2_private.h" -#include "h2_request.h" +#include "h2.h" #include "h2_util.h" /* h2_log2(n) iff n is a power of 2 */ @@ -1036,19 +1035,6 @@ apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax, return off; } -void h2_util_bb_log(conn_rec *c, int stream_id, int level, - const char *tag, apr_bucket_brigade *bb) -{ - char buffer[4 * 1024]; - const char *line = "(null)"; - apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); - - len = h2_util_bb_print(buffer, bmax, tag, "", bb); - /* Intentional no APLOGNO */ - ap_log_cerror(APLOG_MARK, level, 0, c, "bb_dump(%ld-%d): %s", - c->id, stream_id, len? buffer : line); -} - apr_status_t h2_append_brigade(apr_bucket_brigade *to, apr_bucket_brigade *from, apr_off_t *plen, @@ -1066,6 +1052,8 @@ apr_status_t h2_append_brigade(apr_bucket_brigade *to, if (APR_BUCKET_IS_METADATA(e)) { if (APR_BUCKET_IS_EOS(e)) { *peos = 1; + apr_bucket_delete(e); + continue; } } else { @@ -1313,6 +1301,107 @@ int h2_proxy_res_ignore_header(const char *name, size_t len) || ignore_header(H2_LIT_ARGS(IgnoredProxyRespHds), name, len)); } +apr_status_t h2_headers_add_h1(apr_table_t *headers, apr_pool_t *pool, + const char *name, size_t nlen, + const char *value, size_t vlen) +{ + char *hname, *hvalue; + + if (h2_req_ignore_header(name, nlen)) { + return APR_SUCCESS; + } + else if (H2_HD_MATCH_LIT("cookie", name, nlen)) { + const char *existing = apr_table_get(headers, "cookie"); + if (existing) { + char *nval; + + /* Cookie header come separately in HTTP/2, but need + * to be merged by "; " (instead of default ", ") + */ + hvalue = apr_pstrndup(pool, value, vlen); + nval = apr_psprintf(pool, "%s; %s", existing, hvalue); + apr_table_setn(headers, "Cookie", nval); + return APR_SUCCESS; + } + } + else if (H2_HD_MATCH_LIT("host", name, nlen)) { + if (apr_table_get(headers, "Host")) { + return APR_SUCCESS; /* ignore duplicate */ + } + } + + hname = apr_pstrndup(pool, name, nlen); + hvalue = apr_pstrndup(pool, value, vlen); + h2_util_camel_case_header(hname, nlen); + apr_table_mergen(headers, hname, hvalue); + + return APR_SUCCESS; +} + +/******************************************************************************* + * h2 request handling + ******************************************************************************/ + +h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method, + const char *scheme, const char *authority, + const char *path, apr_table_t *header, int serialize) +{ + h2_request *req = apr_pcalloc(pool, sizeof(h2_request)); + + req->id = id; + req->method = method; + req->scheme = scheme; + req->authority = authority; + req->path = path; + req->headers = header? header : apr_table_make(pool, 10); + req->request_time = apr_time_now(); + req->serialize = serialize; + + return req; +} + +h2_request *h2_req_create(int id, apr_pool_t *pool, int serialize) +{ + return h2_req_createn(id, pool, NULL, NULL, NULL, NULL, NULL, serialize); +} + +typedef struct { + apr_table_t *headers; + apr_pool_t *pool; +} h1_ctx; + +static int set_h1_header(void *ctx, const char *key, const char *value) +{ + h1_ctx *x = ctx; + size_t klen = strlen(key); + if (!h2_req_ignore_header(key, klen)) { + h2_headers_add_h1(x->headers, x->pool, key, klen, value, strlen(value)); + } + return 1; +} + +apr_status_t h2_req_make(h2_request *req, apr_pool_t *pool, + const char *method, const char *scheme, + const char *authority, const char *path, + apr_table_t *headers) +{ + h1_ctx x; + + req->method = method; + req->scheme = scheme; + req->authority = authority; + req->path = path; + + AP_DEBUG_ASSERT(req->scheme); + AP_DEBUG_ASSERT(req->authority); + AP_DEBUG_ASSERT(req->path); + AP_DEBUG_ASSERT(req->method); + + x.pool = pool; + x.headers = req->headers; + apr_table_do(set_h1_header, &x, headers, NULL); + return APR_SUCCESS; +} /******************************************************************************* * frame logging diff --git a/mod_http2/h2_util.h b/mod_http2/h2_util.h index 8e7e2795..56614766 100644 --- a/mod_http2/h2_util.h +++ b/mod_http2/h2_util.h @@ -276,6 +276,25 @@ h2_ngheader *h2_util_ngheader_make_res(apr_pool_t *p, h2_ngheader *h2_util_ngheader_make_req(apr_pool_t *p, const struct h2_request *req); +apr_status_t h2_headers_add_h1(apr_table_t *headers, apr_pool_t *pool, + const char *name, size_t nlen, + const char *value, size_t vlen); + +/******************************************************************************* + * h2_request helpers + ******************************************************************************/ + +struct h2_request *h2_req_createn(int id, apr_pool_t *pool, const char *method, + const char *scheme, const char *authority, + const char *path, apr_table_t *header, + int serialize); +struct h2_request *h2_req_create(int id, apr_pool_t *pool, int serialize); + +apr_status_t h2_req_make(struct h2_request *req, apr_pool_t *pool, + const char *method, const char *scheme, + const char *authority, const char *path, + apr_table_t *headers); + /******************************************************************************* * apr brigade helpers ******************************************************************************/ @@ -357,8 +376,16 @@ apr_size_t h2_util_bb_print(char *buffer, apr_size_t bmax, * @param tag a short message text about the context * @param bb the brigade to log */ -void h2_util_bb_log(conn_rec *c, int stream_id, int level, - const char *tag, apr_bucket_brigade *bb); +#define h2_util_bb_log(c, i, level, tag, bb) \ +do { \ + char buffer[4 * 1024]; \ + const char *line = "(null)"; \ + apr_size_t len, bmax = sizeof(buffer)/sizeof(buffer[0]); \ + len = h2_util_bb_print(buffer, bmax, (tag), "", (bb)); \ + ap_log_cerror(APLOG_MARK, level, 0, (c), "bb_dump(%ld-%d): %s", \ + (c)->id, (int)(i), (len? buffer : line)); \ +} while(0) + /** * Transfer buckets from one brigade to another with a limit on the diff --git a/mod_http2/h2_version.h b/mod_http2/h2_version.h index 30c0acf4..13cd3df2 100644 --- a/mod_http2/h2_version.h +++ b/mod_http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.5.2" +#define MOD_HTTP2_VERSION "1.5.3" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010502 +#define MOD_HTTP2_VERSION_NUM 0x010503 #endif /* mod_h2_h2_version_h */