diff --git a/ChangeLog b/ChangeLog index 9740fb13..54f786f9 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,29 @@ +v1.2.0 +-------------------------------------------------------------------------------- + * Each connection now has a push diary where already pushed resources are + recorded (as 64-bit subsets sha256 URL hashes). The maximum size of a diary + can be configured with the new directive 'H2PushDiarySize'. The default is 256. + * The module recognizes the request header 'Cache-Digest', carrying a base64url + encoded set of hash values using Golomb set encoding as described in + https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ + This is highly experimental and will most likely change in format and + interpretation as the draft evolves. For now, this value is used to replace + the current push diary. This allows clients to inform the server about + which resources they already have cached. + * module configuration now looks to the crypto library (because it wants SHA256 + from it). If it is not found, a replacement hash is used, however this is + not interoperable with cache digests from clients, e.g. hits will be missed. + * the module has a new handler named "http2-status" which exposes certain + properties and statistics of the *current* HTTP/2 connection. It can be + configured just like: + + SetHandler http2-status + + The usual precautions about exposing some internals of your server to the + outside world apply. Just as for "server-status"." + * Due to more test cases and new functions, more bugs have been exposed, + examined and exterminated. + v1.1.0 -------------------------------------------------------------------------------- * GOAWAY will be sent when a HTTP/2 connection is shutdown, whenever the diff --git a/configure.ac b/configure.ac index d4e0cc07..f8abc449 100644 --- a/configure.ac +++ b/configure.ac @@ -14,7 +14,7 @@ # AC_PREREQ([2.69]) -AC_INIT([mod_http2], [1.1.0], [stefan.eissing@greenbytes.de]) +AC_INIT([mod_http2], [1.2.0], [stefan.eissing@greenbytes.de]) LT_PREREQ([2.2.6]) LT_INIT() @@ -130,6 +130,12 @@ AC_CHECK_FUNCS([nghttp2_stream_get_weight], AC_CHECK_FUNCS([nghttp2_session_change_stream_priority], [CPPFLAGS="$CPPFLAGS -DH2_NG2_CHANGE_PRIO"], []) + +AC_CHECK_LIB([crypto], [SHA256_Init], [ +CPPFLAGS="$CPPFLAGS -DH2_OPENSSL" +LIBS="$LIBS -lcrypto"], +[AC_MSG_ERROR("libcrypto not found")]) + # Checks for header files. AC_CHECK_HEADERS([ \ assert.h \ diff --git a/mod_http2/h2_config.c b/mod_http2/h2_config.c index ecfe949d..31307e64 100644 --- a/mod_http2/h2_config.c +++ b/mod_http2/h2_config.c @@ -62,6 +62,8 @@ static h2_config defconf = { -1, /* connection timeout */ -1, /* keepalive timeout */ 0, /* stream timeout */ + 256, /* push diary size */ + }; void h2_config_init(apr_pool_t *pool) @@ -97,6 +99,7 @@ static void *h2_config_create(apr_pool_t *pool, conf->h2_timeout = DEF_VAL; conf->h2_keepalive = DEF_VAL; conf->h2_stream_timeout = DEF_VAL; + conf->push_diary_size = DEF_VAL; return conf; } @@ -145,6 +148,7 @@ void *h2_config_merge(apr_pool_t *pool, void *basev, void *addv) n->h2_timeout = H2_CONFIG_GET(add, base, h2_timeout); n->h2_keepalive = H2_CONFIG_GET(add, base, h2_keepalive); n->h2_stream_timeout = H2_CONFIG_GET(add, base, h2_stream_timeout); + n->push_diary_size = H2_CONFIG_GET(add, base, push_diary_size); return n; } @@ -193,6 +197,8 @@ apr_int64_t h2_config_geti64(const h2_config *conf, h2_config_var_t var) return H2_CONFIG_GET(conf, &defconf, h2_keepalive); case H2_CONF_STREAM_TIMEOUT_SECS: return H2_CONFIG_GET(conf, &defconf, h2_stream_timeout); + case H2_CONF_PUSH_DIARY_SIZE: + return H2_CONFIG_GET(conf, &defconf, push_diary_size); default: return DEF_VAL; } @@ -526,6 +532,23 @@ static const char *h2_conf_set_stream_timeout(cmd_parms *parms, return NULL; } +static const char *h2_conf_set_push_diary_size(cmd_parms *parms, + void *arg, const char *value) +{ + h2_config *cfg = (h2_config *)h2_config_sget(parms->server); + (void)arg; + cfg->push_diary_size = (int)apr_atoi64(value); + if (cfg->push_diary_size < 0) { + return "value must be >= 0"; + } + if (cfg->push_diary_size > 0 && (cfg->push_diary_size & (cfg->push_diary_size-1))) { + return "value must a power of 2"; + } + if (cfg->push_diary_size > (1 << 15)) { + return "value must <= 65536"; + } + return NULL; +} #define AP_END_CMD AP_INIT_TAKE1(NULL, NULL, NULL, RSRC_CONF, NULL) @@ -570,6 +593,8 @@ const command_rec h2_cmds[] = { RSRC_CONF, "timeout (seconds) for idle HTTP/2 connections, no streams open"), AP_INIT_TAKE1("H2StreamTimeout", h2_conf_set_stream_timeout, NULL, RSRC_CONF, "read/write timeout (seconds) for HTTP/2 streams"), + AP_INIT_TAKE1("H2PushDiarySize", h2_conf_set_push_diary_size, NULL, + RSRC_CONF, "size of push diary"), AP_END_CMD }; diff --git a/mod_http2/h2_config.h b/mod_http2/h2_config.h index 57d926d5..a40bb83c 100644 --- a/mod_http2/h2_config.h +++ b/mod_http2/h2_config.h @@ -42,6 +42,7 @@ typedef enum { H2_CONF_TIMEOUT_SECS, H2_CONF_KEEPALIVE_SECS, H2_CONF_STREAM_TIMEOUT_SECS, + H2_CONF_PUSH_DIARY_SIZE, } h2_config_var_t; struct apr_hash_t; @@ -72,6 +73,7 @@ typedef struct h2_config { int h2_timeout; /* timeout for http/2 connections */ int h2_keepalive; /* timeout for idle connections, no streams */ int h2_stream_timeout; /* timeout for http/2 streams, slave connections */ + int push_diary_size; /* # of entries in push diary */ } h2_config; diff --git a/mod_http2/h2_conn.c b/mod_http2/h2_conn.c index 49570c97..3e762f16 100644 --- a/mod_http2/h2_conn.c +++ b/mod_http2/h2_conn.c @@ -38,6 +38,7 @@ #include "h2_worker.h" #include "h2_workers.h" #include "h2_conn.h" +#include "h2_version.h" static struct h2_workers *workers; @@ -170,63 +171,28 @@ apr_status_t h2_conn_setup(h2_ctx *ctx, conn_rec *c, request_rec *r) return APR_SUCCESS; } -static apr_status_t h2_conn_process(h2_ctx *ctx) -{ - h2_session *session; - apr_status_t status; - - session = h2_ctx_session_get(ctx); - if (session->c->cs) { - session->c->cs->sense = CONN_SENSE_DEFAULT; - } - - status = h2_session_process(session, async_mpm); - - session->c->keepalive = AP_CONN_KEEPALIVE; - if (session->c->cs) { - session->c->cs->state = CONN_STATE_WRITE_COMPLETION; - } - - if (APR_STATUS_IS_EOF(status) - || APR_STATUS_IS_ECONNRESET(status) - || APR_STATUS_IS_ECONNABORTED(status)) { - /* fatal, probably client just closed connection. emergency shutdown */ - /* Make sure this connection gets closed properly. */ - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%ld): aborted", session->id); - session->c->keepalive = AP_CONN_CLOSE; - - h2_ctx_clear(session->c); - h2_session_abort(session, status); - h2_session_eoc_callback(session); - /* hereafter session might be gone */ - return APR_ECONNABORTED; - } - - if (session->state == H2_SESSION_ST_CLOSING) { - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%ld): closing", session->id); - /* Make sure this connection gets closed properly. */ - session->c->keepalive = AP_CONN_CLOSE; - - h2_ctx_clear(session->c); - h2_session_close(session); - /* hereafter session may be gone */ - } - else if (session->state == H2_SESSION_ST_ABORTED) { - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session(%ld): already aborted", session->id); - return APR_ECONNABORTED; - } - - return APR_SUCCESS; -} - apr_status_t h2_conn_run(struct h2_ctx *ctx, conn_rec *c) { + apr_status_t status; int mpm_state = 0; + do { - h2_conn_process(ctx); + if (c->cs) { + c->cs->sense = CONN_SENSE_DEFAULT; + } + status = h2_session_process(h2_ctx_session_get(ctx), async_mpm); + + if (c->cs) { + c->cs->state = CONN_STATE_WRITE_COMPLETION; + } + if (APR_STATUS_IS_EOF(status)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, + "h2_session(%ld): process, closing conn", c->id); + c->keepalive = AP_CONN_CLOSE; + } + else { + c->keepalive = AP_CONN_KEEPALIVE; + } if (ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state)) { break; diff --git a/mod_http2/h2_conn_io.h b/mod_http2/h2_conn_io.h index b11480ba..15457eb3 100644 --- a/mod_http2/h2_conn_io.h +++ b/mod_http2/h2_conn_io.h @@ -35,6 +35,7 @@ typedef struct { apr_size_t write_size; apr_time_t last_write; + apr_int64_t bytes_read; apr_int64_t bytes_written; int buffer_output; diff --git a/mod_http2/h2_filter.c b/mod_http2/h2_filter.c index fd8e25ce..577d2c1f 100644 --- a/mod_http2/h2_filter.c +++ b/mod_http2/h2_filter.c @@ -23,7 +23,16 @@ #include "h2_private.h" #include "h2_conn_io.h" +#include "h2_ctx.h" +#include "h2_mplx.h" +#include "h2_push.h" +#include "h2_task.h" +#include "h2_stream.h" +#include "h2_stream_set.h" +#include "h2_response.h" +#include "h2_session.h" #include "h2_util.h" +#include "h2_version.h" #include "h2_filter.h" @@ -157,3 +166,143 @@ apr_status_t h2_filter_core_input(ap_filter_t* f, } return status; } + +/******************************************************************************* + * http2 connection status handler + stream out source + ******************************************************************************/ + +static const char *H2_SOS_H2_STATUS = "http2-status"; + +int h2_filter_h2_status_handler(request_rec *r) +{ + h2_ctx *ctx = h2_ctx_rget(r); + h2_task *task; + + if (strcmp(r->handler, "http2-status")) { + return DECLINED; + } + if (r->method_number != M_GET) { + return DECLINED; + } + + task = ctx? h2_ctx_get_task(ctx) : NULL; + if (task) { + /* We need to handle the actual output on the main thread, as + * we need to access h2_session information. */ + apr_table_setn(r->notes, H2_RESP_SOS_NOTE, H2_SOS_H2_STATUS); + apr_table_setn(r->headers_out, "Content-Type", "application/json"); + r->status = 200; + return DONE; + } + return DECLINED; +} + +#define bbout(...) apr_brigade_printf(bb, NULL, NULL, __VA_ARGS__) +static apr_status_t h2_sos_h2_status_buffer(h2_sos *sos, apr_bucket_brigade *bb) +{ + h2_stream *stream = sos->stream; + h2_session *session = stream->session; + h2_mplx *mplx = session->mplx; + apr_status_t status; + + if (!bb) { + bb = apr_brigade_create(stream->pool, session->c->bucket_alloc); + } + + bbout("{\n"); + bbout(" \"HTTP2\": \"on\",\n"); + bbout(" \"H2PUSH\": \"%s\",\n", h2_session_push_enabled(session)? "on" : "off"); + bbout(" \"mod_http2_version\": \"%s\",\n", MOD_HTTP2_VERSION); + bbout(" \"session_id\": %ld,\n", (long)session->id); + bbout(" \"streams_max\": %d,\n", (int)session->max_stream_count); + bbout(" \"this_stream\": %d,\n", stream->id); + bbout(" \"streams_open\": %d,\n", (int)h2_stream_set_size(session->streams)); + bbout(" \"max_stream_started\": %d,\n", mplx->max_stream_started); + bbout(" \"requests_received\": %d,\n", session->requests_received); + bbout(" \"responses_submitted\": %d,\n", session->responses_submitted); + bbout(" \"streams_reset\": %d, \n", session->streams_reset); + bbout(" \"pushes_promised\": %d,\n", session->pushes_promised); + bbout(" \"pushes_submitted\": %d,\n", session->pushes_submitted); + bbout(" \"pushes_reset\": %d,\n", session->pushes_reset); + + if (session->push_diary) { + const char *data; + const char *base64_digest; + apr_size_t len; + + status = h2_push_diary_digest_get(session->push_diary, stream->pool, 1024, &data, &len); + if (status == APR_SUCCESS) { + base64_digest = h2_util_base64url_encode(data, len, stream->pool); + bbout(" \"cache_digest\": \"%s\",\n", base64_digest); + } + + /* try the reverse for testing purposes */ + status = h2_push_diary_digest_set(session->push_diary, data, len); + if (status == APR_SUCCESS) { + status = h2_push_diary_digest_get(session->push_diary, stream->pool, 1024, &data, &len); + if (status == APR_SUCCESS) { + base64_digest = h2_util_base64url_encode(data, len, stream->pool); + bbout(" \"cache_digest^2\": \"%s\",\n", base64_digest); + } + } + } + bbout(" \"frames_received\": %ld,\n", (long)session->frames_received); + bbout(" \"frames_sent\": %ld,\n", (long)session->frames_sent); + bbout(" \"bytes_received\": %"APR_UINT64_T_FMT",\n", session->io.bytes_read); + bbout(" \"bytes_sent\": %"APR_UINT64_T_FMT"\n", session->io.bytes_written); + bbout("}\n"); + + return sos->prev->buffer(sos->prev, bb); +} + +static apr_status_t h2_sos_h2_status_read_to(h2_sos *sos, apr_bucket_brigade *bb, + apr_off_t *plen, int *peos) +{ + return sos->prev->read_to(sos->prev, bb, plen, peos); +} + +static apr_status_t h2_sos_h2_status_prep_read(h2_sos *sos, apr_off_t *plen, int *peos) +{ + return sos->prev->prep_read(sos->prev, plen, peos); +} + +static apr_status_t h2_sos_h2_status_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx, + apr_off_t *plen, int *peos) +{ + return sos->prev->readx(sos->prev, cb, ctx, plen, peos); +} + +static apr_table_t *h2_sos_h2_status_get_trailers(h2_sos *sos) +{ + return sos->prev->get_trailers(sos->prev); +} + +static h2_sos *h2_sos_h2_status_create(h2_sos *prev) +{ + h2_sos *sos; + h2_response *response = prev->response; + + apr_table_unset(response->headers, "Content-Length"); + response->content_length = -1; + + sos = apr_pcalloc(prev->stream->pool, sizeof(*sos)); + sos->prev = prev; + sos->response = response; + sos->stream = prev->stream; + sos->buffer = h2_sos_h2_status_buffer; + sos->prep_read = h2_sos_h2_status_prep_read; + sos->readx = h2_sos_h2_status_readx; + sos->read_to = h2_sos_h2_status_read_to; + sos->get_trailers = h2_sos_h2_status_get_trailers; + + return sos; +} + +h2_sos *h2_filter_sos_create(const char *name, struct h2_sos *prev) +{ + if (!strcmp(H2_SOS_H2_STATUS, name)) { + return h2_sos_h2_status_create(prev); + } + return prev; +} + diff --git a/mod_http2/h2_filter.h b/mod_http2/h2_filter.h index f27c9ce0..401a6e0e 100644 --- a/mod_http2/h2_filter.h +++ b/mod_http2/h2_filter.h @@ -16,6 +16,7 @@ #ifndef __mod_h2__h2_filter__ #define __mod_h2__h2_filter__ +struct h2_stream; struct h2_session; typedef apr_status_t h2_filter_cin_cb(void *ctx, @@ -42,5 +43,35 @@ apr_status_t h2_filter_core_input(ap_filter_t* filter, apr_read_type_e block, apr_off_t readbytes); +typedef struct h2_sos h2_sos; +typedef apr_status_t h2_sos_data_cb(void *ctx, const char *data, apr_off_t len); + +typedef apr_status_t h2_sos_buffer(h2_sos *sos, apr_bucket_brigade *bb); +typedef apr_status_t h2_sos_prep_read(h2_sos *sos, apr_off_t *plen, int *peos); +typedef apr_status_t h2_sos_readx(h2_sos *sos, h2_sos_data_cb *cb, + void *ctx, apr_off_t *plen, int *peos); +typedef apr_status_t h2_sos_read_to(h2_sos *sos, apr_bucket_brigade *bb, + apr_off_t *plen, int *peos); +typedef apr_table_t *h2_sos_get_trailers(h2_sos *sos); + + +#define H2_RESP_SOS_NOTE "h2-sos-filter" + +struct h2_sos { + struct h2_stream *stream; + h2_sos *prev; + struct h2_response *response; + void *ctx; + h2_sos_buffer *buffer; + h2_sos_prep_read *prep_read; + h2_sos_readx *readx; + h2_sos_read_to *read_to; + h2_sos_get_trailers *get_trailers; +}; + +h2_sos *h2_filter_sos_create(const char *name, struct h2_sos *prev); + +int h2_filter_h2_status_handler(request_rec *r); + #endif /* __mod_h2__h2_filter__ */ diff --git a/mod_http2/h2_from_h1.c b/mod_http2/h2_from_h1.c index 3e372c25..f2f39f30 100644 --- a/mod_http2/h2_from_h1.c +++ b/mod_http2/h2_from_h1.c @@ -70,7 +70,9 @@ h2_response *h2_from_h1_get_response(h2_from_h1 *from_h1) static apr_status_t make_h2_headers(h2_from_h1 *from_h1, request_rec *r) { from_h1->response = h2_response_create(from_h1->stream_id, 0, - from_h1->http_status, from_h1->hlines, + from_h1->http_status, + from_h1->hlines, + r->notes, from_h1->pool); from_h1->content_length = from_h1->response->content_length; from_h1->chunked = r->chunked; diff --git a/mod_http2/h2_mplx.c b/mod_http2/h2_mplx.c index 743f7545..4fa0f591 100644 --- a/mod_http2/h2_mplx.c +++ b/mod_http2/h2_mplx.c @@ -319,11 +319,14 @@ apr_status_t h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait) void h2_mplx_abort(h2_mplx *m) { apr_status_t status; + AP_DEBUG_ASSERT(m); - status = apr_thread_mutex_lock(m->lock); - if (APR_SUCCESS == status) { - m->aborted = 1; - apr_thread_mutex_unlock(m->lock); + if (!m->aborted) { + status = apr_thread_mutex_lock(m->lock); + if (APR_SUCCESS == status) { + m->aborted = 1; + apr_thread_mutex_unlock(m->lock); + } } } diff --git a/mod_http2/h2_push.c b/mod_http2/h2_push.c index 1ba4ea6f..61b33b34 100644 --- a/mod_http2/h2_push.c +++ b/mod_http2/h2_push.c @@ -16,8 +16,14 @@ #include #include -#include #include +#include +#include +#include + +#ifdef H2_OPENSSL +#include +#endif #include #include @@ -29,6 +35,12 @@ #include "h2_push.h" #include "h2_request.h" #include "h2_response.h" +#include "h2_session.h" +#include "h2_stream.h" + +/******************************************************************************* + * link header handling + ******************************************************************************/ static const char *policy_str(h2_push_policy policy) { @@ -451,3 +463,607 @@ void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_en } req->push_policy = policy; } + +/******************************************************************************* + * push diary + ******************************************************************************/ + +typedef struct h2_push_diary_entry { + apr_uint64_t hash; +} h2_push_diary_entry; + + +#ifdef H2_OPENSSL +static void sha256_update(SHA256_CTX *ctx, const char *s) +{ + SHA256_Update(ctx, s, strlen(s)); +} + +static void calc_sha256_hash(h2_push_diary *diary, apr_uint64_t *phash, h2_push *push) +{ + SHA256_CTX sha256; + union { + unsigned char hash[SHA256_DIGEST_LENGTH]; + apr_uint64_t val; + } ctx; + + SHA256_Init(&sha256); + sha256_update(&sha256, push->req->scheme); + sha256_update(&sha256, "://"); + sha256_update(&sha256, push->req->authority); + sha256_update(&sha256, push->req->path); + SHA256_Final(ctx.hash, &sha256); + + *phash = ctx.val; +} +#endif + +static unsigned int val_apr_hash(const char *str) +{ + apr_ssize_t len = strlen(str); + return apr_hashfunc_default(str, &len); +} + +static void calc_apr_hash(h2_push_diary *diary, apr_uint64_t *phash, h2_push *push) +{ + apr_uint64_t val; +#if APR_UINT64MAX > APR_UINT_MAX + val = (val_apr_hash(push->req->scheme) << 32); + val ^= (val_apr_hash(push->req->authority) << 16); + val ^= val_apr_hash(push->req->path); +#else + val = val_apr_hash(push->req->scheme); + val ^= val_apr_hash(push->req->authority); + val ^= val_apr_hash(push->req->path); +#endif + *phash = val; +} + +static apr_int32_t ceil_power_of_2(apr_int32_t n) +{ + --n; + n |= n >> 1; + n |= n >> 2; + n |= n >> 4; + n |= n >> 8; + n |= n >> 16; + return ++n; +} + +static h2_push_diary *diary_create(apr_pool_t *p, h2_push_digest_type dtype, + apr_size_t N) +{ + h2_push_diary *diary = NULL; + + if (N > 0) { + diary = apr_pcalloc(p, sizeof(*diary)); + + diary->NMax = ceil_power_of_2(N); + diary->N = diary->NMax; + /* the mask we use in value comparision depends on where we got + * the values from. If we calculate them ourselves, we can use + * the full 64 bits. + * If we set the diary via a compressed golomb set, we have less + * relevant bits and need to use a smaller mask. */ + diary->mask = 0xffffffffffffffffu; + /* grows by doubling, start with a power of 2 */ + diary->entries = apr_array_make(p, 16, sizeof(h2_push_diary_entry)); + + switch (dtype) { +#ifdef H2_OPENSSL + case H2_PUSH_DIGEST_SHA256: + diary->dtype = H2_PUSH_DIGEST_SHA256; + diary->dcalc = calc_sha256_hash; + break; +#endif /* ifdef H2_OPENSSL */ + default: + diary->dtype = H2_PUSH_DIGEST_APR_HASH; + diary->dcalc = calc_apr_hash; + break; + } + } + + return diary; +} + +h2_push_diary *h2_push_diary_create(apr_pool_t *p, apr_size_t N) +{ + return diary_create(p, H2_PUSH_DIGEST_SHA256, N); +} + +static int h2_push_diary_find(h2_push_diary *diary, apr_uint64_t hash) +{ + if (diary) { + h2_push_diary_entry *e; + int i; + + /* search from the end, where the last accessed digests are */ + hash &= diary->mask; + for (i = diary->entries->nelts-1; i >= 0; --i) { + e = &APR_ARRAY_IDX(diary->entries, i, h2_push_diary_entry); + if (e->hash == hash) { + return i; + } + } + } + return -1; +} + +static h2_push_diary_entry *move_to_last(h2_push_diary *diary, apr_size_t idx) +{ + h2_push_diary_entry *entries = (h2_push_diary_entry*)diary->entries->elts; + h2_push_diary_entry e; + apr_size_t lastidx = diary->entries->nelts-1; + + /* move entry[idx] to the end */ + if (idx < lastidx) { + e = entries[idx]; + memmove(entries+idx, entries+idx+1, sizeof(e) * (lastidx - idx)); + entries[lastidx] = e; + } + return &entries[lastidx]; +} + +static void h2_push_diary_append(h2_push_diary *diary, h2_push_diary_entry *e) +{ + h2_push_diary_entry *ne; + + if (diary->entries->nelts < diary->N) { + /* append a new diary entry at the end */ + APR_ARRAY_PUSH(diary->entries, h2_push_diary_entry) = *e; + ne = &APR_ARRAY_IDX(diary->entries, diary->entries->nelts-1, h2_push_diary_entry); + } + else { + /* replace content with new digest. keeps memory usage constant once diary is full */ + ne = move_to_last(diary, 0); + *ne = *e; + } + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, diary->entries->pool, + "push_diary_append: masking %lx", ne->hash); + ne->hash &= diary->mask; +} + +apr_array_header_t *h2_push_diary_update(h2_session *session, apr_array_header_t *pushes) +{ + apr_array_header_t *npushes = pushes; + h2_push_diary_entry e; + int i, idx; + + if (session->push_diary && pushes) { + npushes = NULL; + + for (i = 0; i < pushes->nelts; ++i) { + h2_push *push; + + push = APR_ARRAY_IDX(pushes, i, h2_push*); + session->push_diary->dcalc(session->push_diary, &e.hash, push); + idx = h2_push_diary_find(session->push_diary, e.hash); + if (idx >= 0) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "push_diary_update: already there PUSH %s", push->req->path); + move_to_last(session->push_diary, idx); + } + else { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "push_diary_update: adding PUSH %s", push->req->path); + if (!npushes) { + npushes = apr_array_make(pushes->pool, 5, sizeof(h2_push_diary_entry*)); + } + APR_ARRAY_PUSH(npushes, h2_push*) = push; + h2_push_diary_append(session->push_diary, &e); + } + } + } + return npushes; +} + +apr_array_header_t *h2_push_collect_update(h2_stream *stream, + const struct h2_request *req, + const struct h2_response *res) +{ + h2_session *session = stream->session; + const char *cache_digest = apr_table_get(req->headers, "Cache-Digest"); + apr_array_header_t *pushes; + apr_status_t status; + + if (cache_digest && session->push_diary) { + status = h2_push_diary_digest64_set(session->push_diary, cache_digest, stream->pool); + if (status != APR_SUCCESS) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_session(%ld): push diary set from Cache-Digest: %s", + session->id, cache_digest); + } + } + pushes = h2_push_collect(stream->pool, req, res); + return h2_push_diary_update(stream->session, pushes); +} + +/* log2(n) iff n is a power of 2 */ +static unsigned char log2(apr_uint32_t n) +{ + int lz = 0; + if (!n) { + return 0; + } + if (!(n & 0xffff0000u)) { + lz += 16; + n = (n << 16); + } + if (!(n & 0xff000000u)) { + lz += 8; + n = (n << 8); + } + if (!(n & 0xf0000000u)) { + lz += 4; + n = (n << 4); + } + if (!(n & 0xc0000000u)) { + lz += 2; + n = (n << 2); + } + if (!(n & 0x80000000u)) { + lz += 1; + } + + return 31 - lz; +} + +/* log2(n) iff n is a power of 2 */ +static unsigned char log2_64(apr_uint64_t n) +{ + apr_uint32_t i = (n & 0xffffffffu); + if (i) { + return log2(i); + } + return log2((apr_uint32_t)(n >> 32)) + 32; +} + +static apr_int32_t log2inv(unsigned char log2) +{ + return log2? (1 << log2) : 1; +} + + +typedef struct { + h2_push_diary *diary; + unsigned char log2p; + apr_uint32_t mask_bits; + apr_uint64_t mask; + apr_uint32_t fixed_bits; + apr_uint64_t fixed_mask; + apr_pool_t *pool; + unsigned char *data; + apr_size_t datalen; + apr_size_t offset; + unsigned int bit; + apr_uint64_t last; +} gset_encoder; + +static int cmp_puint64(const void *p1, const void *p2) +{ + const apr_uint64_t *pu1 = p1, *pu2 = p2; + return (*pu1 > *pu2)? 1 : ((*pu1 == *pu2)? 0 : -1); +} + +/* in golomb bit stream encoding, bit 0 is the 8th of the first char, or + * more generally: + * char(bit/8) & cbit_mask[(bit % 8)] + */ +static unsigned char cbit_mask[] = { + 0x80u, + 0x40u, + 0x20u, + 0x10u, + 0x08u, + 0x04u, + 0x02u, + 0x01u, +}; + +static apr_status_t gset_encode_bit(gset_encoder *encoder, int bit) +{ + if (++encoder->bit >= 8) { + if (++encoder->offset >= encoder->datalen) { + apr_size_t nlen = encoder->datalen*2; + unsigned char *ndata = apr_pcalloc(encoder->pool, nlen); + if (!ndata) { + return APR_ENOMEM; + } + memcpy(ndata, encoder->data, encoder->datalen); + encoder->data = ndata; + encoder->datalen = nlen; + } + encoder->bit = 0; + encoder->data[encoder->offset] = 0xffu; + } + if (!bit) { + encoder->data[encoder->offset] &= ~cbit_mask[encoder->bit]; + } + return APR_SUCCESS; +} + +static apr_status_t gset_encode_next(gset_encoder *encoder, apr_uint64_t pval) +{ + apr_uint64_t delta, flex_bits; + apr_status_t status = APR_SUCCESS; + int i; + + delta = pval - encoder->last; + encoder->last = pval; + flex_bits = (delta >> encoder->fixed_bits); + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, encoder->pool, + "h2_push_diary_enc: val=%lx, delta=%lx flex_bits=%ld, " + "fixed_bits=%d, fixed_val=%lx", + pval, delta, flex_bits, encoder->fixed_bits, delta&encoder->fixed_mask); + for (; flex_bits != 0; --flex_bits) { + status = gset_encode_bit(encoder, 1); + if (status != APR_SUCCESS) { + return status; + } + } + status = gset_encode_bit(encoder, 0); + if (status != APR_SUCCESS) { + return status; + } + + for (i = encoder->fixed_bits-1; i >= 0; --i) { + status = gset_encode_bit(encoder, (delta >> i) & 1); + if (status != APR_SUCCESS) { + return status; + } + } + return APR_SUCCESS; +} + +/** + * Get a cache digest as described in + * https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ + * from the contents of the push diary. + * + * @param diary the diary to calculdate the digest from + * @param p the pool to use + * @param pdata on successful return, the binary cache digest + * @param plen on successful return, the length of the binary data + */ +apr_status_t h2_push_diary_digest_get(h2_push_diary *diary, apr_pool_t *pool, + apr_uint32_t maxP, + const char **pdata, apr_size_t *plen) +{ + apr_size_t nelts, N, i; + unsigned char log2n, log2pmax, mask_bits; + gset_encoder encoder; + apr_uint64_t *hashes; + apr_size_t hash_count; + + nelts = diary->entries->nelts; + + if (nelts > APR_UINT32_MAX) { + /* should not happen */ + return APR_ENOTIMPL; + } + N = ceil_power_of_2(nelts); + log2n = log2(N); + + mask_bits = log2_64(diary->mask + 1); + if (mask_bits <= log2n) { + /* uhm, what? */ + return APR_ENOTIMPL; + } + + /* Now log2p is the max number of relevant bits, so that + * log2p + log2n == mask_bits. We can uise a lower log2p + * and have a shorter set encoding... + */ + log2pmax = log2(ceil_power_of_2(maxP)); + + memset(&encoder, 0, sizeof(encoder)); + encoder.diary = diary; + encoder.log2p = H2MIN(mask_bits - log2n, log2pmax); + encoder.mask_bits = log2n + encoder.log2p; + encoder.mask = 1; + encoder.mask = (encoder.mask << encoder.mask_bits) - 1; + encoder.fixed_bits = encoder.log2p; + encoder.fixed_mask = 1; + encoder.fixed_mask = (encoder.fixed_mask << encoder.fixed_bits) - 1; + encoder.pool = pool; + encoder.datalen = 512; + encoder.data = apr_pcalloc(encoder.pool, encoder.datalen); + + encoder.data[0] = log2n; + encoder.data[1] = encoder.log2p; + encoder.offset = 1; + encoder.bit = 8; + encoder.last = 0; + + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, + "h2_push_diary_digest_get: %d entries, N=%d, log2n=%d, " + "mask_bits=%d, enc.mask_bits=%d, enc.log2p=%d", + (int)nelts, (int)N, (int)log2n, (int)mask_bits, + (int)encoder.mask_bits, (int)encoder.log2p); + + hash_count = diary->entries->nelts; + hashes = apr_pcalloc(encoder.pool, hash_count); + for (i = 0; i < hash_count; ++i) { + hashes[i] = ((&APR_ARRAY_IDX(diary->entries, i, h2_push_diary_entry))->hash + & encoder.mask); + } + + qsort(hashes, hash_count, sizeof(apr_uint64_t), cmp_puint64); + for (i = 0; i < hash_count; ++i) { + if (!i || (hashes[i] != hashes[i-1])) { + gset_encode_next(&encoder, hashes[i]); + } + } + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, + "h2_push_diary_digest_get: golomb compressed hashes, %d bytes", + (int)encoder.offset + 1); + + *pdata = (const char *)encoder.data; + *plen = encoder.offset + 1; + + return APR_SUCCESS; +} + +typedef struct { + h2_push_diary *diary; + apr_pool_t *pool; + unsigned char log2p; + const unsigned char *data; + apr_size_t datalen; + apr_size_t offset; + unsigned int bit; + apr_uint64_t last_val; +} gset_decoder; + +static int gset_decode_next_bit(gset_decoder *decoder) +{ + if (++decoder->bit >= 8) { + if (++decoder->offset >= decoder->datalen) { + return -1; + } + decoder->bit = 0; + } + return (decoder->data[decoder->offset] & cbit_mask[decoder->bit])? 1 : 0; +} + +static apr_status_t gset_decode_next(gset_decoder *decoder, apr_uint64_t *phash) +{ + apr_uint64_t flex = 0, fixed = 0, delta; + int i; + + /* read 1 bits until we encounter 0, then read log2n(diary-P) bits. + * On a malformed bit-string, this will not fail, but produce results + * which are pbly too large. Luckily, the diary will modulo the hash. + */ + while (1) { + int bit = gset_decode_next_bit(decoder); + if (bit == -1) { + return APR_EINVAL; + } + if (!bit) { + break; + } + ++flex; + } + + for (i = 0; i < decoder->log2p; ++i) { + int bit = gset_decode_next_bit(decoder); + if (bit == -1) { + return APR_EINVAL; + } + fixed = (fixed << 1) | bit; + } + + delta = (flex << decoder->log2p) | fixed; + *phash = delta + decoder->last_val; + decoder->last_val = *phash; + + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, decoder->pool, + "h2_push_diary_digest_dec: val=%lx, delta=%lx, flex=%d, fixed=%lx", + *phash, delta, (int)flex, fixed); + + return APR_SUCCESS; +} + +/** + * Initialize the push diary by a cache digest as described in + * https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ + * . + * @param diary the diary to set the digest into + * @param data the binary cache digest + * @param len the length of the cache digest + * @return APR_EINVAL if digest was not successfully parsed + */ +apr_status_t h2_push_diary_digest_set(h2_push_diary *diary, + const char *data, apr_size_t len) +{ + gset_decoder decoder; + unsigned char log2n, log2p; + apr_size_t N, i; + apr_pool_t *pool = diary->entries->pool; + h2_push_diary_entry e; + apr_status_t status = APR_SUCCESS; + apr_uint64_t mask; + int mask_bits; + + if (len < 2) { + /* at least this should be there */ + return APR_EINVAL; + } + log2n = data[0]; + log2p = data[1]; + mask_bits = log2n + log2p; + if (mask_bits > 64) { + /* cannot handle */ + return APR_ENOTIMPL; + } + else if (mask_bits == 64) { + mask = 0xffffffffffffffffu; + } + else { + mask = 1; + mask = (mask << mask_bits) - 1; + } + + /* whatever is in the digest, it replaces the diary entries */ + apr_array_clear(diary->entries); + + N = log2inv(log2n + log2p); + + decoder.diary = diary; + decoder.pool = pool; + decoder.log2p = log2p; + decoder.data = (const unsigned char*)data; + decoder.datalen = len; + decoder.offset = 1; + decoder.bit = 8; + decoder.last_val = 0; + + diary->N = N; + diary->mask = mask; + /* Determine effective N we use for storage */ + if (!N) { + /* a totally empty cache digest. someone tells us that she has no + * entries in the cache at all. Use our own preferences for N+mask + */ + diary->N = diary->NMax; + diary->mask = 0xffffffffffffffffu; + return APR_SUCCESS; + } + else if (N > diary->NMax) { + /* Store not more than diary is configured to hold. We open us up + * to DOS attacks otherwise. */ + diary->N = diary->NMax; + } + + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, + "h2_push_diary_digest_set: N=%d, log2n=%d, " + "diary->mask=%lx, dec.log2p=%d", + (int)diary->N, (int)log2n, diary->mask, + (int)decoder.log2p); + + for (i = 0; i < diary->N; ++i) { + if (gset_decode_next(&decoder, &e.hash) != APR_SUCCESS) { + /* the data may have less than N values */ + break; + } + h2_push_diary_append(diary, &e); + } + + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, + "h2_push_diary_digest_set: diary now with %d entries, mask=%lx", + (int)diary->entries->nelts, diary->mask); + return status; +} + +apr_status_t h2_push_diary_digest64_set(h2_push_diary *diary, const char *data64url, + apr_pool_t *pool) +{ + const char *data; + apr_size_t len = h2_util_base64url_decode(&data, data64url, pool); + ap_log_perror(APLOG_MARK, APLOG_TRACE1, 0, pool, + "h2_push_diary_digest64_set: digest=%s, dlen=%d", + data64url, (int)len); + return h2_push_diary_digest_set(diary, data, len); +} + diff --git a/mod_http2/h2_push.h b/mod_http2/h2_push.h index f3a8601c..f0a2d89d 100644 --- a/mod_http2/h2_push.h +++ b/mod_http2/h2_push.h @@ -18,6 +18,8 @@ struct h2_request; struct h2_response; struct h2_ngheader; +struct h2_session; +struct h2_stream; typedef enum { H2_PUSH_NONE, @@ -30,11 +32,97 @@ typedef struct h2_push { const struct h2_request *req; } h2_push; +typedef enum { + H2_PUSH_DIGEST_APR_HASH, + H2_PUSH_DIGEST_SHA256 +} h2_push_digest_type; + +typedef struct h2_push_diary h2_push_diary; + +typedef void h2_push_digest_calc(h2_push_diary *diary, apr_uint64_t *phash, h2_push *push); +struct h2_push_diary { + apr_array_header_t *entries; + apr_size_t NMax; /* Maximum for N, should size change be necessary */ + apr_size_t N; /* Current maximum number of entries, power of 2 */ + apr_uint64_t mask; /* applied on hash value comparision */ + h2_push_digest_type dtype; + h2_push_digest_calc *dcalc; +}; + +/** + * Determine the list of h2_push'es to send to the client on behalf of + * the given request/response pair. + * + * @param p the pool to use + * @param req the requst from the client + * @param res the response from the server + * @return array of h2_push addresses or NULL + */ apr_array_header_t *h2_push_collect(apr_pool_t *p, const struct h2_request *req, const struct h2_response *res); +/** + * Set the push policy for the given request. Takes request headers into + * account, see draft https://tools.ietf.org/html/draft-ruellan-http-accept-push-policy-00 + * for details. + * + * @param req the request to determine the policy for + * @param p the pool to use + * @param push_enabled if HTTP/2 server push is generally enabled for this request + */ void h2_push_policy_determine(struct h2_request *req, apr_pool_t *p, int push_enabled); +/** + * Create a new push diary for the given maximum number of entries. + * + * @oaram p the pool to use + * @param N the max number of entries, rounded up to 2^x + * @return the created diary, might be NULL of max_entries is 0 + */ +h2_push_diary *h2_push_diary_create(apr_pool_t *p, apr_size_t N); + +/** + * Filters the given pushes against the diary and returns only those pushes + * that were newly entered in the diary. + */ +apr_array_header_t *h2_push_diary_update(struct h2_session *session, apr_array_header_t *pushes); + +/** + * Collect pushes for the given request/response pair, enter them into the + * diary and return those pushes newly entered. + */ +apr_array_header_t *h2_push_collect_update(struct h2_stream *stream, + const struct h2_request *req, + const struct h2_response *res); +/** + * Get a cache digest as described in + * https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ + * from the contents of the push diary. + * + * @param diary the diary to calculdate the digest from + * @param p the pool to use + * @param pdata on successful return, the binary cache digest + * @param plen on successful return, the length of the binary data + */ +apr_status_t h2_push_diary_digest_get(h2_push_diary *diary, apr_pool_t *p, + apr_uint32_t maxP, const char **pdata, + apr_size_t *plen); + +/** + * Initialize the push diary by a cache digest as described in + * https://datatracker.ietf.org/doc/draft-kazuho-h2-cache-digest/ + * . + * @param diary the diary to set the digest into + * @param data the binary cache digest + * @param len the length of the cache digest + * @return APR_EINVAL if digest was not successfully parsed + */ +apr_status_t h2_push_diary_digest_set(h2_push_diary *diary, + const char *data, apr_size_t len); + +apr_status_t h2_push_diary_digest64_set(h2_push_diary *diary, const char *data64url, + apr_pool_t *pool); + #endif /* defined(__mod_h2__h2_push__) */ diff --git a/mod_http2/h2_response.c b/mod_http2/h2_response.c index d16fee29..3ef6f850 100644 --- a/mod_http2/h2_response.c +++ b/mod_http2/h2_response.c @@ -26,6 +26,7 @@ #include #include "h2_private.h" +#include "h2_filter.h" #include "h2_h2.h" #include "h2_util.h" #include "h2_request.h" @@ -64,10 +65,16 @@ static apr_table_t *parse_headers(apr_array_header_t *hlines, apr_pool_t *pool) } } +static const char *get_sos_filter(apr_table_t *notes) +{ + return notes? apr_table_get(notes, H2_RESP_SOS_NOTE) : NULL; +} + static h2_response *h2_response_create_int(int stream_id, int rst_error, int http_status, apr_table_t *headers, + apr_table_t *notes, apr_pool_t *pool) { h2_response *response; @@ -82,11 +89,12 @@ static h2_response *h2_response_create_int(int stream_id, return NULL; } - response->stream_id = stream_id; - response->rst_error = rst_error; - response->http_status = http_status? http_status : 500; + response->stream_id = stream_id; + response->rst_error = rst_error; + response->http_status = http_status? http_status : 500; response->content_length = -1; - response->headers = headers; + response->headers = headers; + response->sos_filter = get_sos_filter(notes); s = apr_table_get(headers, "Content-Length"); if (s) { @@ -109,10 +117,11 @@ h2_response *h2_response_create(int stream_id, int rst_error, int http_status, apr_array_header_t *hlines, + apr_table_t *notes, apr_pool_t *pool) { return h2_response_create_int(stream_id, rst_error, http_status, - parse_headers(hlines, pool), pool); + parse_headers(hlines, pool), notes, pool); } h2_response *h2_response_rcreate(int stream_id, request_rec *r, @@ -123,10 +132,11 @@ h2_response *h2_response_rcreate(int stream_id, request_rec *r, return NULL; } - response->stream_id = stream_id; - response->http_status = r->status; + response->stream_id = stream_id; + response->http_status = r->status; response->content_length = -1; - response->headers = header; + response->headers = header; + response->sos_filter = get_sos_filter(r->notes); if (response->http_status == HTTP_FORBIDDEN) { const char *cause = apr_table_get(r->notes, "ssl-renegotiate-forbidden"); @@ -155,20 +165,22 @@ h2_response *h2_response_die(int stream_id, apr_status_t type, apr_table_setn(headers, "Date", date); apr_table_setn(headers, "Server", ap_get_server_banner()); - return h2_response_create_int(stream_id, 0, 500, headers, pool); + return h2_response_create_int(stream_id, 0, 500, headers, NULL, pool); } h2_response *h2_response_clone(apr_pool_t *pool, h2_response *from) { h2_response *to = apr_pcalloc(pool, sizeof(h2_response)); - to->stream_id = from->stream_id; - to->http_status = from->http_status; + + to->stream_id = from->stream_id; + to->http_status = from->http_status; to->content_length = from->content_length; + to->sos_filter = from->sos_filter; if (from->headers) { - to->headers = apr_table_clone(pool, from->headers); + to->headers = apr_table_clone(pool, from->headers); } if (from->trailers) { - to->trailers = apr_table_clone(pool, from->trailers); + to->trailers = apr_table_clone(pool, from->trailers); } return to; } diff --git a/mod_http2/h2_response.h b/mod_http2/h2_response.h index 426eeead..59140ee3 100644 --- a/mod_http2/h2_response.h +++ b/mod_http2/h2_response.h @@ -20,12 +20,13 @@ struct h2_request; struct h2_push; typedef struct h2_response { - int stream_id; - int rst_error; - int http_status; - apr_off_t content_length; + int stream_id; + int rst_error; + int http_status; + apr_off_t content_length; apr_table_t *headers; apr_table_t *trailers; + const char *sos_filter; } h2_response; /** @@ -40,6 +41,7 @@ h2_response *h2_response_create(int stream_id, int rst_error, int http_status, apr_array_header_t *hlines, + apr_table_t *notes, apr_pool_t *pool); /** diff --git a/mod_http2/h2_session.c b/mod_http2/h2_session.c index de04202a..34575a6b 100644 --- a/mod_http2/h2_session.c +++ b/mod_http2/h2_session.c @@ -74,13 +74,14 @@ static apr_status_t h2_session_receive(void *ctx, const char *data, apr_size_t len, apr_size_t *readlen); +static int is_accepting_streams(h2_session *session); +static void dispatch_event(h2_session *session, h2_session_event_t ev, + int err, const char *msg); + h2_stream *h2_session_open_stream(h2_session *session, int stream_id) { h2_stream * stream; apr_pool_t *stream_pool; - if (session->aborted) { - return NULL; - } if (session->spare) { stream_pool = session->spare; @@ -95,6 +96,7 @@ h2_stream *h2_session_open_stream(h2_session *session, int stream_id) h2_stream_set_add(session->streams, stream); if (H2_STREAM_CLIENT_INITIATED(stream_id) && stream_id > session->max_stream_received) { + ++session->requests_received; session->max_stream_received = stream->id; } @@ -174,7 +176,6 @@ static apr_status_t stream_schedule(h2_session *session, h2_stream *stream, int eos) { (void)session; - ++session->requests_received; return h2_stream_schedule(stream, eos, h2_session_push_enabled(session), stream_pri_cmp, session); } @@ -210,16 +211,14 @@ static int on_invalid_frame_recv_cb(nghttp2_session *ngh2, h2_session *session = (h2_session *)userp; (void)ngh2; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - if (APLOGctrace2(session->c)) { + if (APLOGcdebug(session->c)) { char buffer[256]; frame_print(frame, buffer, sizeof(buffer)/sizeof(buffer[0])); - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, session->c, - "h2_session: callback on_invalid_frame_recv error=%d %s", - error, buffer); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld): recv unknown FRAME[%s], frames=%ld/%ld (r/s)", + session->id, buffer, (long)session->frames_received, + (long)session->frames_sent); } return 0; } @@ -234,8 +233,9 @@ static int on_data_chunk_recv_cb(nghttp2_session *ngh2, uint8_t flags, int rv; (void)flags; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; + if (!is_accepting_streams(session)) { + /* ignore */ + return 0; } stream = h2_session_get_stream(session, stream_id); @@ -298,9 +298,6 @@ static int on_stream_close_cb(nghttp2_session *ngh2, int32_t stream_id, h2_stream *stream; (void)ngh2; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } stream = h2_session_get_stream(session, stream_id); if (stream) { stream_release(session, stream, error_code); @@ -339,8 +336,9 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, (void)ngh2; (void)flags; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; + if (!is_accepting_streams(session)) { + /* just ignore */ + return 0; } stream = h2_session_get_stream(session, frame->hd.stream_id); @@ -374,10 +372,6 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, apr_status_t status = APR_SUCCESS; h2_stream *stream; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - if (APLOGcdebug(session->c)) { char buffer[256]; @@ -455,10 +449,16 @@ static int on_frame_recv_cb(nghttp2_session *ng2s, "h2_session(%ld-%d): RST_STREAM by client, errror=%d", session->id, (int)frame->hd.stream_id, (int)frame->rst_stream.error_code); - ++session->streams_reset; + stream = h2_session_get_stream(session, frame->hd.stream_id); + if (stream && stream->initiated_on) { + ++session->pushes_reset; + } + else { + ++session->streams_reset; + } break; case NGHTTP2_GOAWAY: - session->client_goaway = 1; + dispatch_event(session, H2_SESSION_EV_REMOTE_GOAWAY, 0, NULL); break; default: if (APLOGctrace2(session->c)) { @@ -515,10 +515,6 @@ static int on_send_data_cb(nghttp2_session *ngh2, (void)ngh2; (void)source; - if (session->aborted) { - return NGHTTP2_ERR_CALLBACK_FAILURE; - } - if (frame->data.padlen > H2_MAX_PADLEN) { return NGHTTP2_ERR_PROTO; } @@ -692,42 +688,24 @@ static void h2_session_destroy(h2_session *session) } } -static apr_status_t h2_session_shutdown(h2_session *session, int reason) +static apr_status_t h2_session_shutdown(h2_session *session, int reason, const char *msg) { apr_status_t status = APR_SUCCESS; + const char *err = msg; AP_DEBUG_ASSERT(session); - if (session->state != H2_SESSION_ST_CLOSING - && session->state != H2_SESSION_ST_ABORTED) { - h2_mplx_abort(session->mplx); - if (session->server_goaway) { - /* already sent one */ - } - else if (!reason) { - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, - h2_mplx_get_max_stream_started(session->mplx), - reason, NULL, 0); - status = nghttp2_session_send(session->ngh2); - session->server_goaway = 1; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "session(%ld): shutdown, no err", session->id); - } - else { - const char *err = nghttp2_strerror(reason); - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, - h2_mplx_get_max_stream_started(session->mplx), - reason, (const uint8_t *)err, - strlen(err)); - status = nghttp2_session_send(session->ngh2); - session->server_goaway = 1; - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, - "session(%ld): shutdown, err=%d '%s'", - session->id, reason, err); - } - - h2_conn_io_flush(&session->io); - session->state = H2_SESSION_ST_CLOSING; - } + if (!err && reason) { + err = nghttp2_strerror(reason); + } + nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, + h2_mplx_get_max_stream_started(session->mplx), + reason, (uint8_t*)err, err? strlen(err):0); + status = nghttp2_session_send(session->ngh2); + h2_conn_io_flush(&session->io); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "session(%ld): sent GOAWAY, err=%d, msg=%s", + session->id, reason, err? err : ""); + dispatch_event(session, H2_SESSION_EV_LOCAL_GOAWAY, reason, err); return status; } @@ -744,8 +722,8 @@ static apr_status_t session_pool_cleanup(void *data) ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, "session(%ld): pool_cleanup", session->id); - AP_DEBUG_ASSERT(session->aborted || session->server_goaway); - if (!session->aborted && !session->server_goaway) { + if (session->state != H2_SESSION_ST_DONE + && session->state != H2_SESSION_ST_LOCAL_SHUTDOWN) { /* Not good. The connection is being torn down and we have * not sent a goaway. This is considered a protocol error and * the client has to assume that any streams "in flight" may have @@ -810,6 +788,7 @@ static h2_session *h2_session_create_int(conn_rec *c, { nghttp2_session_callbacks *callbacks = NULL; nghttp2_option *options = NULL; + uint32_t n; apr_pool_t *pool = NULL; apr_status_t status = apr_pool_create(&pool, c->pool); @@ -911,13 +890,18 @@ static h2_session *h2_session_create_int(conn_rec *c, h2_session_destroy(session); return NULL; } - + + n = h2_config_geti(session->config, H2_CONF_PUSH_DIARY_SIZE); + session->push_diary = h2_push_diary_create(session->pool, n); + if (APLOGcdebug(c)) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, "session(%ld) created, timeout=%d, keepalive_timeout=%d, " - "max_streams=%d, stream_mem=%d", + "max_streams=%d, stream_mem=%d, push_diary(type=%d,N=%d)", session->id, session->timeout_secs, session->keepalive_secs, - (int)session->max_stream_count, (int)session->max_stream_mem); + (int)session->max_stream_count, (int)session->max_stream_mem, + session->push_diary->dtype, + (int)session->push_diary->N); } } return session; @@ -941,15 +925,6 @@ void h2_session_eoc_callback(h2_session *session) h2_session_destroy(session); } -void h2_session_abort(h2_session *session, apr_status_t status) -{ - AP_DEBUG_ASSERT(session); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, - "h2_session(%ld): aborting", session->id); - session->state = H2_SESSION_ST_ABORTED; - session->aborted = 1; -} - static apr_status_t h2_session_start(h2_session *session, int *rv) { apr_status_t status = APR_SUCCESS; @@ -1025,6 +1000,10 @@ static apr_status_t h2_session_start(h2_session *session, int *rv) ++slen; } + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, session->c, + "h2_session(%ld): start, INITIAL_WINDOW_SIZE=%ld, " + "MAX_CONCURRENT_STREAMS=%d", + session->id, (long)win_size, (int)session->max_stream_count); *rv = nghttp2_submit_settings(session->ngh2, NGHTTP2_FLAG_NONE, settings, slen); if (*rv != 0) { @@ -1088,7 +1067,7 @@ static int h2_session_resume_streams_with_data(h2_session *session) { AP_DEBUG_ASSERT(session); if (!h2_stream_set_is_empty(session->streams) - && session->mplx && !session->aborted) { + && session->mplx && !session->mplx->aborted) { resume_ctx ctx; ctx.session = session; @@ -1110,25 +1089,6 @@ h2_stream *h2_session_get_stream(h2_session *session, int stream_id) return session->last_stream; } -void h2_session_close(h2_session *session) -{ - apr_status_t status = APR_SUCCESS; - apr_bucket *b; - conn_rec *c = session->c; - - AP_DEBUG_ASSERT(session); - if (!session->server_goaway) { - status = h2_session_shutdown(session, 0); - } - h2_session_cleanup(session); - - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, - "h2_session(%ld): writing eoc", c->id); - b = h2_bucket_eoc_create(c->bucket_alloc, session); - h2_conn_io_write_eoc(&session->io, b); - /* and all is or will be destroyed */ -} - static ssize_t stream_data_cb(nghttp2_session *ng2s, int32_t stream_id, uint8_t *buf, @@ -1240,17 +1200,17 @@ typedef struct { static apr_status_t submit_response(h2_session *session, h2_stream *stream) { apr_status_t status = APR_SUCCESS; + h2_response *response = h2_stream_get_response(stream); int rv = 0; AP_DEBUG_ASSERT(session); AP_DEBUG_ASSERT(stream); - AP_DEBUG_ASSERT(stream->response || stream->rst_error); + AP_DEBUG_ASSERT(response || stream->rst_error); if (stream->submitted) { rv = NGHTTP2_PROTOCOL_ERROR; } - else if (stream->response && stream->response->headers) { + else if (response && response->headers) { nghttp2_data_provider provider; - h2_response *response = stream->response; h2_ngheader *ngh; const h2_priority *prio; @@ -1294,7 +1254,6 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) response->headers); rv = nghttp2_submit_response(session->ngh2, response->stream_id, ngh->nv, ngh->nvlen, &provider); - ++session->responses_sent; } else { int err = H2_STREAM_RST(stream, H2_ERR_PROTOCOL_ERROR); @@ -1305,14 +1264,19 @@ static apr_status_t submit_response(h2_session *session, h2_stream *stream) rv = nghttp2_submit_rst_stream(session->ngh2, NGHTTP2_FLAG_NONE, stream->id, err); - ++session->responses_sent; } stream->submitted = 1; + if (stream->initiated_on) { + ++session->pushes_submitted; + } + else { + ++session->responses_submitted; + } if (nghttp2_is_fatal(rv)) { status = APR_EGENERAL; - h2_session_shutdown(session, rv); + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv)); ap_log_cerror(APLOG_MARK, APLOG_ERR, status, session->c, APLOGNO(02940) "submit_response: %s", nghttp2_strerror(rv)); @@ -1338,7 +1302,7 @@ struct h2_stream *h2_session_push(h2_session *session, h2_stream *is, session->id, is->id, nghttp2_strerror(nid)); return NULL; } - ++session->streams_pushed; + ++session->pushes_promised; ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_stream(%ld-%d): SERVER_PUSH %d for %s %s on %d", @@ -1594,9 +1558,7 @@ static apr_status_t h2_session_send(h2_session *session) int rv = nghttp2_session_send(session->ngh2); if (rv != 0) { if (nghttp2_is_fatal(rv)) { - ap_log_cerror( APLOG_MARK, APLOG_DEBUG, 0, session->c, - "h2_session: send gave error=%s", nghttp2_strerror(rv)); - h2_session_shutdown(session, rv); + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, rv, nghttp2_strerror(rv)); return APR_EGENERAL; } } @@ -1619,17 +1581,14 @@ static apr_status_t h2_session_receive(void *ctx, const char *data, session->id, (long)len); n = nghttp2_session_mem_recv(session->ngh2, (const uint8_t *)data, len); if (n < 0) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, APR_EGENERAL, - session->c, - "h2_session: nghttp2_session_mem_recv error=%d", - (int)n); if (nghttp2_is_fatal((int)n)) { - h2_session_shutdown(session, (int)n); + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, (int)n, nghttp2_strerror(n)); return APR_EGENERAL; } } else { *readlen = n; + session->io.bytes_read += n; } } return APR_SUCCESS; @@ -1646,7 +1605,7 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops) * We just pull at the filter chain to make it happen */ status = ap_get_brigade(c->input_filters, session->bbtmp, AP_MODE_READBYTES, - (block && !session->aborted)? APR_BLOCK_READ : APR_NONBLOCK_READ, + block? APR_BLOCK_READ : APR_NONBLOCK_READ, APR_BUCKET_BUFF_SIZE); /* get rid of any possible data we do not expect to get */ apr_brigade_cleanup(session->bbtmp); @@ -1690,8 +1649,7 @@ static apr_status_t h2_session_read(h2_session *session, int block, int loops) * status. */ return rstatus; } - - if (session->aborted) { + if (!is_accepting_streams(session)) { break; } } @@ -1728,6 +1686,271 @@ static apr_status_t h2_session_submit(h2_session *session) return status; } +static const char *StateNames[] = { + "INIT", /* H2_SESSION_ST_INIT */ + "DONE", /* H2_SESSION_ST_DONE */ + "IDLE", /* H2_SESSION_ST_IDLE */ + "BUSY", /* H2_SESSION_ST_BUSY */ + "WAIT", /* H2_SESSION_ST_WAIT */ + "LSHUTDOWN", /* H2_SESSION_ST_LOCAL_SHUTDOWN */ + "RSHUTDOWN", /* H2_SESSION_ST_REMOTE_SHUTDOWN */ +}; + +static const char *state_name(h2_session_state state) +{ + if (state >= (sizeof(StateNames)/sizeof(StateNames[0]))) { + return "unknown"; + } + return StateNames[state]; +} + +static int is_accepting_streams(h2_session *session) +{ + switch (session->state) { + case H2_SESSION_ST_IDLE: + case H2_SESSION_ST_BUSY: + case H2_SESSION_ST_WAIT: + return 1; + default: + return 0; + } +} + +static void transit(h2_session *session, const char *action, h2_session_state nstate) +{ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, + "h2_session(%ld): transit [%s] -- %s --> [%s]", session->id, + state_name(session->state), action, state_name(nstate)); + session->state = nstate; +} + +static void h2_session_ev_init(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_INIT: + transit(session, "init", H2_SESSION_ST_BUSY); + break; + + default: + /* nop */ + break; + } +} + +static void h2_session_ev_local_goaway(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* already did that? */ + break; + case H2_SESSION_ST_IDLE: + case H2_SESSION_ST_REMOTE_SHUTDOWN: + /* all done */ + transit(session, "local goaway", H2_SESSION_ST_DONE); + break; + default: + transit(session, "local goaway", H2_SESSION_ST_LOCAL_SHUTDOWN); + break; + } +} + +static void h2_session_ev_remote_goaway(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_REMOTE_SHUTDOWN: + /* already received that? */ + break; + case H2_SESSION_ST_IDLE: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* all done */ + transit(session, "remote goaway", H2_SESSION_ST_DONE); + break; + default: + transit(session, "remote goaway", H2_SESSION_ST_REMOTE_SHUTDOWN); + break; + } +} + +static void h2_session_ev_conn_error(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_INIT: + case H2_SESSION_ST_DONE: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* just leave */ + transit(session, "conn error", H2_SESSION_ST_DONE); + break; + + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): conn error -> shutdown", session->id); + h2_session_shutdown(session, arg, msg); + break; + } +} + +static void h2_session_ev_proto_error(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_DONE: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + /* just leave */ + transit(session, "proto error", H2_SESSION_ST_DONE); + break; + + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): proto error -> shutdown", session->id); + h2_session_shutdown(session, arg, msg); + break; + } +} + +static void h2_session_ev_conn_timeout(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_LOCAL_SHUTDOWN: + transit(session, "conn timeout", H2_SESSION_ST_DONE); + break; + default: + h2_session_shutdown(session, arg, msg); + transit(session, "conn timeout", H2_SESSION_ST_DONE); + break; + } +} + +static void h2_session_ev_no_io(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_BUSY: + /* nothing for input and output to do. If we remain + * in this state, we go into a tight loop and suck up + * CPU cycles. Ideally, we'd like to do a blocking read, but that + * is not possible if we have scheduled tasks and wait + * for them to produce something. */ + if (h2_stream_set_is_empty(session->streams)) { + /* When we have no streams, no task event are possible, + * switch to blocking reads */ + transit(session, "no io", H2_SESSION_ST_IDLE); + } + else if (!h2_stream_set_has_unsubmitted(session->streams) + && !h2_stream_set_has_suspended(session->streams)) { + /* none of our streams is waiting for a response or + * new output data from task processing, + * switch to blocking reads. */ + transit(session, "no io", H2_SESSION_ST_IDLE); + } + else { + /* Unable to do blocking reads, as we wait on events from + * task processing in other threads. Do a busy wait with + * backoff timer. */ + transit(session, "no io", H2_SESSION_ST_WAIT); + } + break; + default: + /* nop */ + break; + } +} + +static void h2_session_ev_wait_timeout(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_WAIT: + transit(session, "wait timeout", H2_SESSION_ST_BUSY); + break; + default: + /* nop */ + break; + } +} + +static void h2_session_ev_stream_ready(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_WAIT: + transit(session, "stream ready", H2_SESSION_ST_BUSY); + break; + default: + /* nop */ + break; + } +} + +static void h2_session_ev_data_read(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_IDLE: + transit(session, "data read", H2_SESSION_ST_BUSY); + break; + /* fall through */ + default: + /* nop */ + break; + } +} + +static void h2_session_ev_ngh2_done(h2_session *session, int arg, const char *msg) +{ + switch (session->state) { + case H2_SESSION_ST_DONE: + /* nop */ + break; + default: + transit(session, "nghttp2 done", H2_SESSION_ST_DONE); + break; + } +} + +static void dispatch_event(h2_session *session, h2_session_event_t ev, + int arg, const char *msg) +{ + switch (ev) { + case H2_SESSION_EV_INIT: + h2_session_ev_init(session, arg, msg); + break; + case H2_SESSION_EV_LOCAL_GOAWAY: + h2_session_ev_local_goaway(session, arg, msg); + break; + case H2_SESSION_EV_REMOTE_GOAWAY: + h2_session_ev_remote_goaway(session, arg, msg); + break; + case H2_SESSION_EV_CONN_ERROR: + h2_session_ev_conn_error(session, arg, msg); + break; + case H2_SESSION_EV_PROTO_ERROR: + h2_session_ev_proto_error(session, arg, msg); + break; + case H2_SESSION_EV_CONN_TIMEOUT: + h2_session_ev_conn_timeout(session, arg, msg); + break; + case H2_SESSION_EV_NO_IO: + h2_session_ev_no_io(session, arg, msg); + break; + case H2_SESSION_EV_WAIT_TIMEOUT: + h2_session_ev_wait_timeout(session, arg, msg); + break; + case H2_SESSION_EV_STREAM_READY: + h2_session_ev_stream_ready(session, arg, msg); + break; + case H2_SESSION_EV_DATA_READ: + h2_session_ev_data_read(session, arg, msg); + break; + case H2_SESSION_EV_NGH2_DONE: + h2_session_ev_ngh2_done(session, arg, msg); + break; + default: + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, session->c, + "h2_session(%ld): unknown event %d", + session->id, ev); + break; + } + + if (session->state == H2_SESSION_ST_DONE) { + h2_mplx_abort(session->mplx); + } +} + static const int MAX_WAIT_MICROS = 200 * 1000; apr_status_t h2_session_process(h2_session *session, int async) @@ -1735,7 +1958,6 @@ apr_status_t h2_session_process(h2_session *session, int async) apr_status_t status = APR_SUCCESS; conn_rec *c = session->c; int rv, have_written, have_read; - const char *reason = ""; ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, "h2_session(%ld): process start, async=%d", session->id, async); @@ -1743,84 +1965,65 @@ apr_status_t h2_session_process(h2_session *session, int async) while (1) { have_read = have_written = 0; - if (session->aborted) { - reason = "aborted"; - status = APR_ECONNABORTED; - goto out; - } - else if (session->client_goaway) { - h2_session_shutdown(session, 0); - } - switch (session->state) { case H2_SESSION_ST_INIT: if (!h2_is_acceptable_connection(c, 1)) { - nghttp2_submit_goaway(session->ngh2, NGHTTP2_FLAG_NONE, 0, - NGHTTP2_INADEQUATE_SECURITY, NULL, 0); - nghttp2_session_send(session->ngh2); - session->server_goaway = 1; + h2_session_shutdown(session, NGHTTP2_INADEQUATE_SECURITY, NULL); } - - ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL); - status = h2_session_start(session, &rv); - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, - "h2_session(%ld): started on %s:%d", session->id, - session->s->server_hostname, - c->local_addr->port); - if (status != APR_SUCCESS) { - reason = "start failed"; - goto out; + else { + ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL); + status = h2_session_start(session, &rv); + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, + "h2_session(%ld): started on %s:%d", session->id, + session->s->server_hostname, + c->local_addr->port); + if (status != APR_SUCCESS) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } + dispatch_event(session, H2_SESSION_EV_INIT, 0, NULL); } - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): INIT -> BUSY", session->id); - session->state = H2_SESSION_ST_BUSY; break; - case H2_SESSION_ST_IDLE_READ: + case H2_SESSION_ST_IDLE: h2_filter_cin_timeout_set(session->cin, session->keepalive_secs); ap_update_child_status(c->sbh, SERVER_BUSY_KEEPALIVE, NULL); status = h2_session_read(session, 1, 10); - if (APR_STATUS_IS_TIMEUP(status)) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): IDLE -> KEEPALIVE", session->id); - h2_session_shutdown(session, 0); - goto out; - } - else if (status == APR_SUCCESS) { - /* got something, go busy again */ + if (status == APR_SUCCESS) { have_read = 1; - if (session->client_goaway) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): IDLE -> CLOSING", session->id); - h2_session_shutdown(session, 0); - } - else { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): IDLE -> BUSY", session->id); - session->state = H2_SESSION_ST_BUSY; - } + dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL); + } + else if (status == APR_EAGAIN) { + /* nothing to read */ + } + else if (APR_STATUS_IS_TIMEUP(status)) { + dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL); + break; } else { - reason = "keepalive error"; - goto out; + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); } break; case H2_SESSION_ST_BUSY: + case H2_SESSION_ST_LOCAL_SHUTDOWN: + case H2_SESSION_ST_REMOTE_SHUTDOWN: if (nghttp2_session_want_read(session->ngh2)) { ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL); h2_filter_cin_timeout_set(session->cin, session->timeout_secs); status = h2_session_read(session, 0, 10); if (status == APR_SUCCESS) { - /* got something, continue processing */ have_read = 1; + dispatch_event(session, H2_SESSION_EV_DATA_READ, 0, NULL); } else if (status == APR_EAGAIN) { /* nothing to read */ } + else if (APR_STATUS_IS_TIMEUP(status)) { + dispatch_event(session, H2_SESSION_EV_CONN_TIMEOUT, 0, NULL); + break; + } else { - reason = "busy read error"; - goto out; + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); } } @@ -1833,64 +2036,40 @@ apr_status_t h2_session_process(h2_session *session, int async) have_written = 1; } else if (status != APR_EAGAIN) { - reason = "submit error"; - goto out; + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, + H2_ERR_INTERNAL_ERROR, "submit error"); + break; } /* send out window updates for our inputs */ status = h2_mplx_in_update_windows(session->mplx); if (status != APR_SUCCESS && status != APR_EAGAIN) { - reason = "window update error"; - goto out; + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, + H2_ERR_INTERNAL_ERROR, "window update error"); + break; } } if (nghttp2_session_want_write(session->ngh2)) { status = h2_session_send(session); - if (status != APR_SUCCESS) { - reason = "send error"; - goto out; + if (status == APR_SUCCESS) { + have_written = 1; + } + else { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, + H2_ERR_INTERNAL_ERROR, "writing"); + break; } - have_written = 1; } if (have_read || have_written) { session->wait_us = 0; } else { - /* nothing for input and output to do. If we remain - * in this state, we go into a tight loop and suck up - * CPU cycles. - * Ideally, we'd like to do a blocking read, but that - * is not possible if we have scheduled tasks and wait - * for them to produce something. */ - if (h2_stream_set_is_empty(session->streams)) { - /* When we have no streams, no task event are possible, - * switch to blocking reads */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): BUSY -> IDLE", session->id); - session->state = H2_SESSION_ST_IDLE_READ; - } - else if (!h2_stream_set_has_unsubmitted(session->streams) - && !h2_stream_set_has_suspended(session->streams)) { - /* none of our streams is waiting for a response or - * new output data from task processing, - * switch to blocking reads. */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): BUSY -> IDLE", session->id); - session->state = H2_SESSION_ST_IDLE_READ; - } - else { - /* Unable to do blocking reads, as we wait on events from - * task processing in other threads. Do a busy wait with - * backoff timer. */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): BUSY -> WAIT", session->id); - session->state = H2_SESSION_ST_BUSY_WAIT; - } + dispatch_event(session, H2_SESSION_EV_NO_IO, 0, NULL); } break; - case H2_SESSION_ST_BUSY_WAIT: + case H2_SESSION_ST_WAIT: session->wait_us = H2MAX(session->wait_us, 10); if (APLOGctrace1(c)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, @@ -1898,79 +2077,67 @@ apr_status_t h2_session_process(h2_session *session, int async) (long)session->wait_us); } - h2_conn_io_flush(&session->io); ap_log_cerror( APLOG_MARK, APLOG_TRACE2, status, c, "h2_session(%ld): process -> trywait", session->id); status = h2_mplx_out_trywait(session->mplx, session->wait_us, session->iowait); if (status == APR_SUCCESS) { - /* got something, go busy again */ - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): WAIT -> BUSY", session->id); - session->state = H2_SESSION_ST_BUSY; + dispatch_event(session, H2_SESSION_EV_STREAM_READY, 0, NULL); } else if (status == APR_TIMEUP) { - if (nghttp2_session_want_read(session->ngh2)) { - ap_update_child_status(c->sbh, SERVER_BUSY_READ, NULL); - status = h2_session_read(session, 0, 1); - if (status == APR_SUCCESS) { - /* got something, go busy again */ - session->wait_us = 0; - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): WAIT -> BUSY", session->id); - session->state = H2_SESSION_ST_BUSY; - } - else if (status != APR_EAGAIN) { - reason = "busy read error"; - goto out; - } - } /* nothing, increase timer for graceful backup */ session->wait_us = H2MIN(session->wait_us*2, MAX_WAIT_MICROS); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): WAIT -> BUSY", session->id); - session->state = H2_SESSION_ST_BUSY; + dispatch_event(session, H2_SESSION_EV_WAIT_TIMEOUT, 0, NULL); } else { - reason = "busy wait error"; - goto out; + h2_session_shutdown(session, H2_ERR_INTERNAL_ERROR, "cond wait error"); } break; - case H2_SESSION_ST_CLOSING: - if (nghttp2_session_want_write(session->ngh2)) { - status = h2_session_send(session); - have_written = 1; - } - reason = "closing"; + case H2_SESSION_ST_DONE: + status = APR_EOF; goto out; - case H2_SESSION_ST_ABORTED: - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, - "h2_session(%ld): processing ABORTED", session->id); - return APR_ECONNABORTED; - default: ap_log_cerror(APLOG_MARK, APLOG_ERR, APR_EGENERAL, c, - "h2_session(%ld): state %d", session->id, session->state); - return APR_EGENERAL; + "h2_session(%ld): unknown state %d", session->id, session->state); + dispatch_event(session, H2_SESSION_EV_PROTO_ERROR, 0, NULL); + break; } - if (!nghttp2_session_want_read(session->ngh2) - && !nghttp2_session_want_write(session->ngh2)) { - session->state = H2_SESSION_ST_CLOSING; - } - if (have_written) { h2_conn_io_flush(&session->io); } + else if (!nghttp2_session_want_read(session->ngh2) + && !nghttp2_session_want_write(session->ngh2)) { + dispatch_event(session, H2_SESSION_EV_NGH2_DONE, 0, NULL); + } } + out: if (have_written) { h2_conn_io_flush(&session->io); } + ap_log_cerror( APLOG_MARK, APLOG_TRACE1, status, c, - "h2_session(%ld): process return, state %d, reason '%s'", - session->id, session->state, reason); + "h2_session(%ld): [%s] process returns", + session->id, state_name(session->state)); + + if ((session->state != H2_SESSION_ST_DONE) + && (APR_STATUS_IS_EOF(status) + || APR_STATUS_IS_ECONNRESET(status) + || APR_STATUS_IS_ECONNABORTED(status))) { + dispatch_event(session, H2_SESSION_EV_CONN_ERROR, 0, NULL); + } + + status = (session->state == H2_SESSION_ST_DONE)? APR_EOF : APR_SUCCESS; + if (session->state == H2_SESSION_ST_DONE) { + if (!session->eoc_written) { + session->eoc_written = 1; + h2_conn_io_write_eoc(&session->io, + h2_bucket_eoc_create(session->c->bucket_alloc, session)); + } + } + return status; } diff --git a/mod_http2/h2_session.h b/mod_http2/h2_session.h index b3b26b7b..17a4ec5a 100644 --- a/mod_http2/h2_session.h +++ b/mod_http2/h2_session.h @@ -45,6 +45,7 @@ struct h2_filter_cin; struct h2_mplx; struct h2_priority; struct h2_push; +struct h2_push_diary; struct h2_response; struct h2_session; struct h2_stream; @@ -55,13 +56,28 @@ struct nghttp2_session; typedef enum { H2_SESSION_ST_INIT, /* send initial SETTINGS, etc. */ - H2_SESSION_ST_IDLE_READ, /* nothing to write, expecting data inc */ + H2_SESSION_ST_DONE, /* finished, connection close */ + H2_SESSION_ST_IDLE, /* nothing to write, expecting data inc */ H2_SESSION_ST_BUSY, /* read/write without stop */ - H2_SESSION_ST_BUSY_WAIT, /* waiting for tasks reporting back */ - H2_SESSION_ST_CLOSING, /* shuting down */ - H2_SESSION_ST_ABORTED, /* client closed connection or sky fall */ + H2_SESSION_ST_WAIT, /* waiting for tasks reporting back */ + H2_SESSION_ST_LOCAL_SHUTDOWN, /* we announced GOAWAY */ + H2_SESSION_ST_REMOTE_SHUTDOWN, /* client announced GOAWAY */ } h2_session_state; +typedef enum { + H2_SESSION_EV_INIT, /* session was initialized */ + H2_SESSION_EV_LOCAL_GOAWAY, /* we send a GOAWAY */ + H2_SESSION_EV_REMOTE_GOAWAY, /* remote send us a GOAWAY */ + H2_SESSION_EV_CONN_ERROR, /* connection error */ + H2_SESSION_EV_PROTO_ERROR, /* protocol error */ + H2_SESSION_EV_CONN_TIMEOUT, /* connection timeout */ + H2_SESSION_EV_NO_IO, /* nothing has been read or written */ + H2_SESSION_EV_WAIT_TIMEOUT, /* timeout waiting for tasks */ + H2_SESSION_EV_STREAM_READY, /* stream signalled availability of headers/data */ + H2_SESSION_EV_DATA_READ, /* connection data has been read */ + H2_SESSION_EV_NGH2_DONE, /* nghttp2 wants neither read nor write anything */ +} h2_session_event_t; + typedef struct h2_session { long id; /* identifier of this session, unique * inside a httpd process */ @@ -72,23 +88,23 @@ typedef struct h2_session { const struct h2_config *config; /* Relevant config for this session */ h2_session_state state; /* state session is in */ - unsigned int aborted : 1; /* aborted processing, emergency exit */ unsigned int reprioritize : 1; /* scheduled streams priority changed */ - unsigned int client_goaway : 1; /* client sent us a GOAWAY */ - unsigned int server_goaway : 1; /* we sent a GOAWAY */ + unsigned int eoc_written : 1; /* h2 eoc bucket written */ apr_interval_time_t wait_us; /* timout during BUSY_WAIT state, micro secs */ - int unsent_submits; /* number of submitted, but not yet sent - responses. */ - int unsent_promises; /* number of submitted, but not yet sent - * push promised */ - - apr_size_t frames_received; /* number of http/2 frames received */ - apr_size_t frames_sent; /* number of http/2 frames sent */ + int unsent_submits; /* number of submitted, but not yet written responses. */ + int unsent_promises; /* number of submitted, but not yet written push promised */ + int requests_received; /* number of http/2 requests received */ - int responses_sent; /* number of http/2 responses submitted */ + int responses_submitted; /* number of http/2 responses submitted */ int streams_reset; /* number of http/2 streams reset by client */ - int streams_pushed; /* number of http/2 streams pushed */ + int pushes_promised; /* number of http/2 push promises submitted */ + int pushes_submitted; /* number of http/2 pushed responses submitted */ + int pushes_reset; /* number of http/2 pushed reset by client */ + + apr_size_t frames_received; /* number of http/2 frames received */ + apr_size_t frames_sent; /* number of http/2 frames sent */ + int max_stream_received; /* highest stream id created */ int max_stream_handled; /* highest stream id completed */ @@ -114,6 +130,8 @@ typedef struct h2_session { struct nghttp2_session *ngh2; /* the nghttp2 session (internal use) */ struct h2_workers *workers; /* for executing stream tasks */ + + struct h2_push_diary *push_diary; /* remember pushes, avoid duplicates */ } h2_session; diff --git a/mod_http2/h2_stream.c b/mod_http2/h2_stream.c index 1777c990..0a5af7eb 100644 --- a/mod_http2/h2_stream.c +++ b/mod_http2/h2_stream.c @@ -27,6 +27,7 @@ #include "h2_conn.h" #include "h2_config.h" #include "h2_h2.h" +#include "h2_filter.h" #include "h2_mplx.h" #include "h2_push.h" #include "h2_request.h" @@ -40,11 +41,6 @@ #include "h2_util.h" -#define H2_STREAM_OUT(lvl,s,msg) \ - do { \ - if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ - h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbout); \ - } while(0) #define H2_STREAM_IN(lvl,s,msg) \ do { \ if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \ @@ -146,6 +142,8 @@ static int output_open(h2_stream *stream) } } +static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response); + h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session) { h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream)); @@ -161,8 +159,6 @@ h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session) h2_stream *stream = h2_stream_create(id, pool, session); set_state(stream, H2_STREAM_ST_OPEN); stream->request = h2_request_create(id, pool, session->config); - stream->bbout = apr_brigade_create(stream->pool, - stream->session->c->bucket_alloc); ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, "h2_stream(%ld-%d): opened", session->id, stream->id); @@ -206,10 +202,17 @@ void h2_stream_rst(h2_stream *stream, int error_code) stream->session->id, stream->id, error_code); } +struct h2_response *h2_stream_get_response(h2_stream *stream) +{ + return stream->sos? stream->sos->response : NULL; +} + apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, apr_bucket_brigade *bb) { apr_status_t status = APR_SUCCESS; + h2_sos *sos; + if (!output_open(stream)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): output closed", @@ -217,21 +220,16 @@ apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response, return APR_ECONNRESET; } - stream->response = response; - if (bb && !APR_BRIGADE_EMPTY(bb)) { - apr_size_t move_all = INT_MAX; - /* we can move file handles from h2_mplx into this h2_stream as many - * as we want, since the lifetimes are the same and we are not freeing - * the ones in h2_mplx->io before this stream is done. */ - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_pre"); - status = h2_util_move(stream->bbout, bb, 16 * 1024, &move_all, - "h2_stream_set_response"); - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream set_response_post"); + sos = h2_sos_mplx_create(stream, response); + if (sos->response->sos_filter) { + sos = h2_filter_sos_create(sos->response->sos_filter, sos); } + stream->sos = sos; + status = stream->sos->buffer(stream->sos, bb); ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, "h2_stream(%ld-%d): set_response(%d)", - stream->session->id, stream->id, response->http_status); + stream->session->id, stream->id, stream->sos->response->http_status); return status; } @@ -424,156 +422,57 @@ apr_status_t h2_stream_write_data(h2_stream *stream, return status; } +void h2_stream_set_suspended(h2_stream *stream, int suspended) +{ + AP_DEBUG_ASSERT(stream); + stream->suspended = !!suspended; + ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, + "h2_stream(%ld-%d): suspended=%d", + stream->session->id, stream->id, stream->suspended); +} + +int h2_stream_is_suspended(h2_stream *stream) +{ + AP_DEBUG_ASSERT(stream); + return stream->suspended; +} + apr_status_t h2_stream_prep_read(h2_stream *stream, apr_off_t *plen, int *peos) { - apr_status_t status = APR_SUCCESS; - const char *src; - apr_table_t *trailers = NULL; - int test_read = (*plen == 0); - if (stream->rst_error) { return APR_ECONNRESET; } - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_pre"); - if (!APR_BRIGADE_EMPTY(stream->bbout)) { - src = "stream"; - status = h2_util_bb_avail(stream->bbout, plen, peos); - if (!test_read && status == APR_SUCCESS && !*peos && !*plen) { - apr_brigade_cleanup(stream->bbout); - return h2_stream_prep_read(stream, plen, peos); - } - trailers = stream->response? stream->response->trailers : NULL; + if (!stream->sos) { + return APR_EGENERAL; } - else { - src = "mplx"; - status = h2_mplx_out_readx(stream->session->mplx, stream->id, - NULL, NULL, plen, peos, &trailers); - if (trailers && stream->response) { - h2_response_set_trailers(stream->response, trailers); - } - } - - if (!test_read && status == APR_SUCCESS && !*peos && !*plen) { - status = APR_EAGAIN; - } - - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream prep_read_post"); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, - "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d, trailers=%s", - stream->session->id, stream->id, src, (long)*plen, *peos, - trailers? "yes" : "no"); - return status; + return stream->sos->prep_read(stream->sos, plen, peos); } apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, void *ctx, apr_off_t *plen, int *peos) { - apr_status_t status = APR_SUCCESS; - apr_table_t *trailers = NULL; - const char *src; - - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_pre"); if (stream->rst_error) { return APR_ECONNRESET; } - *peos = 0; - if (!APR_BRIGADE_EMPTY(stream->bbout)) { - apr_off_t origlen = *plen; - - src = "stream"; - status = h2_util_bb_readx(stream->bbout, cb, ctx, plen, peos); - if (status == APR_SUCCESS && !*peos && !*plen) { - apr_brigade_cleanup(stream->bbout); - *plen = origlen; - return h2_stream_readx(stream, cb, ctx, plen, peos); - } - } - else { - src = "mplx"; - status = h2_mplx_out_readx(stream->session->mplx, stream->id, - cb, ctx, plen, peos, &trailers); - } - - if (trailers && stream->response) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, - "h2_stream(%ld-%d): readx, saving trailers", - stream->session->id, stream->id); - h2_response_set_trailers(stream->response, trailers); - } - - if (status == APR_SUCCESS && !*peos && !*plen) { - status = APR_EAGAIN; + if (!stream->sos) { + return APR_EGENERAL; } - - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_post"); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, - "h2_stream(%ld-%d): readx %s, len=%ld eos=%d", - stream->session->id, stream->id, src, (long)*plen, *peos); - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream readx_post"); - - return status; + return stream->sos->readx(stream->sos, cb, ctx, plen, peos); } apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_off_t *plen, int *peos) { - apr_status_t status = APR_SUCCESS; - apr_table_t *trailers = NULL; - - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_pre"); if (stream->rst_error) { return APR_ECONNRESET; } - - if (APR_BRIGADE_EMPTY(stream->bbout)) { - apr_off_t tlen = *plen; - int eos; - status = h2_mplx_out_read_to(stream->session->mplx, stream->id, - stream->bbout, &tlen, &eos, &trailers); - } - - if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(stream->bbout)) { - status = h2_transfer_brigade(bb, stream->bbout, stream->pool, - plen, peos); + if (!stream->sos) { + return APR_EGENERAL; } - else { - *plen = 0; - *peos = 0; - } - - if (trailers && stream->response) { - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, - "h2_stream(%ld-%d): read_to, saving trailers", - stream->session->id, stream->id); - h2_response_set_trailers(stream->response, trailers); - } - - if (status == APR_SUCCESS && !*peos && !*plen) { - status = APR_EAGAIN; - } - H2_STREAM_OUT(APLOG_TRACE2, stream, "h2_stream read_to_post"); - ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c, - "h2_stream(%ld-%d): read_to, len=%ld eos=%d", - stream->session->id, stream->id, (long)*plen, *peos); - return status; -} - -void h2_stream_set_suspended(h2_stream *stream, int suspended) -{ - AP_DEBUG_ASSERT(stream); - stream->suspended = !!suspended; - ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c, - "h2_stream(%ld-%d): suspended=%d", - stream->session->id, stream->id, stream->suspended); -} - -int h2_stream_is_suspended(h2_stream *stream) -{ - AP_DEBUG_ASSERT(stream); - return stream->suspended; + return stream->sos->read_to(stream->sos, bb, plen, peos); } int h2_stream_input_is_open(h2_stream *stream) @@ -600,7 +499,8 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream) apr_array_header_t *pushes; int i; - pushes = h2_push_collect(stream->pool, stream->request, stream->response); + pushes = h2_push_collect_update(stream, stream->request, + h2_stream_get_response(stream)); if (pushes && !apr_is_empty_array(pushes)) { ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c, "h2_stream(%ld-%d): found %d push candidates", @@ -619,13 +519,15 @@ apr_status_t h2_stream_submit_pushes(h2_stream *stream) apr_table_t *h2_stream_get_trailers(h2_stream *stream) { - return stream->response? stream->response->trailers : NULL; + return stream->sos? stream->sos->get_trailers(stream->sos) : NULL; } const h2_priority *h2_stream_get_priority(h2_stream *stream) { - if (stream->initiated_on && stream->response) { - const char *ctype = apr_table_get(stream->response->headers, "content-type"); + h2_response *response = h2_stream_get_response(stream); + + if (stream->initiated_on && response) { + const char *ctype = apr_table_get(response->headers, "content-type"); if (ctype) { /* FIXME: Not good enough, config needs to come from request->server */ return h2_config_get_priority(stream->session->config, ctype); @@ -634,3 +536,196 @@ const h2_priority *h2_stream_get_priority(h2_stream *stream) return NULL; } +/******************************************************************************* + * h2_sos_mplx + ******************************************************************************/ + +typedef struct h2_sos_mplx { + h2_mplx *m; + apr_bucket_brigade *bb; + apr_table_t *trailers; +} h2_sos_mplx; + +#define H2_SOS_MPLX_OUT(lvl,msos,msg) \ + do { \ + if (APLOG_C_IS_LEVEL((msos)->m->c,lvl)) \ + h2_util_bb_log((msos)->m->c,(msos)->m->id,lvl,msg,(msos)->bb); \ + } while(0) + + +static apr_status_t h2_sos_mplx_read_to(h2_sos *sos, apr_bucket_brigade *bb, + apr_off_t *plen, int *peos) +{ + h2_sos_mplx *msos = sos->ctx; + apr_status_t status = APR_SUCCESS; + apr_table_t *trailers = NULL; + + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx read_to_pre"); + + if (APR_BRIGADE_EMPTY(msos->bb)) { + apr_off_t tlen = *plen; + int eos; + status = h2_mplx_out_read_to(msos->m, sos->stream->id, + msos->bb, &tlen, &eos, &trailers); + } + + if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(msos->bb)) { + status = h2_transfer_brigade(bb, msos->bb, sos->stream->pool, + plen, peos); + } + else { + *plen = 0; + *peos = 0; + } + + if (trailers) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c, + "h2_stream(%ld-%d): read_to, saving trailers", + msos->m->id, sos->stream->id); + msos->trailers = trailers; + } + + if (status == APR_SUCCESS && !*peos && !*plen) { + status = APR_EAGAIN; + } + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx read_to_post"); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c, + "h2_stream(%ld-%d): read_to, len=%ld eos=%d", + msos->m->id, sos->stream->id, (long)*plen, *peos); + return status; +} + +static apr_status_t h2_sos_mplx_prep_read(h2_sos *sos, apr_off_t *plen, int *peos) +{ + h2_sos_mplx *msos = sos->ctx; + apr_status_t status = APR_SUCCESS; + const char *src; + apr_table_t *trailers = NULL; + int test_read = (*plen == 0); + + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prep_read_pre"); + if (!APR_BRIGADE_EMPTY(msos->bb)) { + src = "stream"; + status = h2_util_bb_avail(msos->bb, plen, peos); + if (!test_read && status == APR_SUCCESS && !*peos && !*plen) { + apr_brigade_cleanup(msos->bb); + return h2_sos_mplx_prep_read(sos, plen, peos); + } + } + else { + src = "mplx"; + status = h2_mplx_out_readx(msos->m, sos->stream->id, + NULL, NULL, plen, peos, &trailers); + if (trailers) { + msos->trailers = trailers; + } + } + + if (!test_read && status == APR_SUCCESS && !*peos && !*plen) { + status = APR_EAGAIN; + } + + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prep_read_post"); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c, + "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d, trailers=%s", + msos->m->id, sos->stream->id, src, (long)*plen, *peos, + msos->trailers? "yes" : "no"); + return status; +} + +static apr_status_t h2_sos_mplx_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx, + apr_off_t *plen, int *peos) +{ + h2_sos_mplx *msos = sos->ctx; + apr_status_t status = APR_SUCCESS; + apr_table_t *trailers = NULL; + const char *src; + + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx readx_pre"); + *peos = 0; + if (!APR_BRIGADE_EMPTY(msos->bb)) { + apr_off_t origlen = *plen; + + src = "stream"; + status = h2_util_bb_readx(msos->bb, cb, ctx, plen, peos); + if (status == APR_SUCCESS && !*peos && !*plen) { + apr_brigade_cleanup(msos->bb); + *plen = origlen; + return h2_sos_mplx_readx(sos, cb, ctx, plen, peos); + } + } + else { + src = "mplx"; + status = h2_mplx_out_readx(msos->m, sos->stream->id, + cb, ctx, plen, peos, &trailers); + } + + if (trailers) { + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c, + "h2_stream(%ld-%d): readx, saving trailers", + msos->m->id, sos->stream->id); + msos->trailers = trailers; + } + + if (status == APR_SUCCESS && !*peos && !*plen) { + status = APR_EAGAIN; + } + + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_stream readx_post"); + ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c, + "h2_stream(%ld-%d): readx %s, len=%ld eos=%d", + msos->m->id, sos->stream->id, src, (long)*plen, *peos); + + return status; +} + +static apr_table_t *h2_sos_mplx_get_trailers(h2_sos *sos) +{ + h2_sos_mplx *msos = sos->ctx; + + return msos->trailers; +} + +static apr_status_t h2_sos_mplx_buffer(h2_sos *sos, apr_bucket_brigade *bb) +{ + h2_sos_mplx *msos = sos->ctx; + apr_status_t status = APR_SUCCESS; + + if (bb && !APR_BRIGADE_EMPTY(bb)) { + apr_size_t move_all = INT_MAX; + /* we can move file handles from h2_mplx into this h2_stream as many + * as we want, since the lifetimes are the same and we are not freeing + * the ones in h2_mplx->io before this stream is done. */ + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_pre"); + status = h2_util_move(msos->bb, bb, 16 * 1024, &move_all, + "h2_stream_set_response"); + H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_post"); + } + return status; +} + +static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response) +{ + h2_sos *sos; + h2_sos_mplx *msos; + + msos = apr_pcalloc(stream->pool, sizeof(*msos)); + msos->m = stream->session->mplx; + msos->bb = apr_brigade_create(stream->pool, msos->m->c->bucket_alloc); + + sos = apr_pcalloc(stream->pool, sizeof(*sos)); + sos->stream = stream; + sos->response = response; + + sos->ctx = msos; + sos->buffer = h2_sos_mplx_buffer; + sos->prep_read = h2_sos_mplx_prep_read; + sos->readx = h2_sos_mplx_readx; + sos->read_to = h2_sos_mplx_read_to; + sos->get_trailers = h2_sos_mplx_get_trailers; + + sos->response = response; + + return sos; +} + diff --git a/mod_http2/h2_stream.h b/mod_http2/h2_stream.h index bb884df5..fa219df2 100644 --- a/mod_http2/h2_stream.h +++ b/mod_http2/h2_stream.h @@ -45,7 +45,7 @@ struct h2_priority; struct h2_request; struct h2_response; struct h2_session; -struct h2_task; +struct h2_sos; typedef struct h2_stream h2_stream; @@ -57,7 +57,6 @@ struct h2_stream { apr_pool_t *pool; /* the memory pool for this stream */ struct h2_request *request; /* the request made in this stream */ - struct h2_response *response; /* the response, once ready */ int rst_error; /* stream error for RST_STREAM */ unsigned int aborted : 1; /* was aborted */ @@ -67,8 +66,8 @@ struct h2_stream { apr_off_t input_remaining; /* remaining bytes on input as advertised via content-length */ apr_bucket_brigade *bbin; /* input DATA */ - - apr_bucket_brigade *bbout; /* output DATA */ + + struct h2_sos *sos; /* stream output source, e.g. to read output from */ apr_off_t data_frames_sent; /* # of DATA frames sent out for this stream */ }; @@ -194,6 +193,8 @@ apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, */ int h2_stream_is_scheduled(h2_stream *stream); +struct h2_response *h2_stream_get_response(h2_stream *stream); + /** * Set the response for this stream. Invoked when all meta data for * the stream response has been collected. @@ -255,6 +256,16 @@ apr_status_t h2_stream_readx(h2_stream *stream, h2_io_data_cb *cb, apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, apr_off_t *plen, int *peos); +/** + * Get optional trailers for this stream, may be NULL. Meaningful + * results can only be expected when the end of the response body has + * been reached. + * + * @param stream to ask for trailers + * @return trailers for NULL + */ +apr_table_t *h2_stream_get_trailers(h2_stream *stream); + /** * Set the suspended state of the stream. * @param stream the stream to change state on @@ -291,16 +302,6 @@ int h2_stream_needs_submit(h2_stream *stream); */ apr_status_t h2_stream_submit_pushes(h2_stream *stream); -/** - * Get optional trailers for this stream, may be NULL. Meaningful - * results can only be expected when the end of the response body has - * been reached. - * - * @param stream to ask for trailers - * @return trailers for NULL - */ -apr_table_t *h2_stream_get_trailers(h2_stream *stream); - /** * Get priority information set for this stream. */ diff --git a/mod_http2/h2_util.c b/mod_http2/h2_util.c index 68d12324..5e4a7840 100644 --- a/mod_http2/h2_util.c +++ b/mod_http2/h2_util.c @@ -82,22 +82,33 @@ void h2_util_camel_case_header(char *s, size_t len) } } -static const int BASE64URL_TABLE[] = { - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, 62, -1, -1, -1, 63, 52, 53, 54, 55, 56, 57, - 58, 59, 60, 61, -1, -1, -1, -1, -1, -1, -1, 0, 1, 2, 3, 4, 5, 6, - 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, - 25, -1, -1, -1, -1, -1, -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, - 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, - -1, -1, -1, -1 +static const int BASE64URL_UINT6[] = { +/* 0 1 2 3 4 5 6 7 8 9 a b c d e f */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* 0 */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* 1 */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 62, -1, -1, /* 2 */ + 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, -1, -1, -1, -1, -1, -1, /* 3 */ + -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, /* 4 */ + 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, -1, -1, -1, -1, 63, /* 5 */ + -1, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, /* 6 */ + 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, -1, -1, -1, -1, -1, /* 7 */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* 8 */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* 9 */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* a */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* b */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* c */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* d */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, /* e */ + -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 /* f */ +}; +static const char BASE64URL_CHARS[] = { + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', /* 0 - 9 */ + 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', /* 10 - 19 */ + 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', /* 20 - 29 */ + 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', /* 30 - 39 */ + 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', /* 40 - 49 */ + 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', /* 50 - 59 */ + '8', '9', '-', '_', ' ', ' ', ' ', ' ', ' ', ' ', /* 60 - 69 */ }; apr_size_t h2_util_base64url_decode(const char **decoded, const char *encoded, @@ -109,7 +120,7 @@ apr_size_t h2_util_base64url_decode(const char **decoded, const char *encoded, int n; apr_size_t len, mlen, remain, i; - while (*p && BASE64URL_TABLE[ *p ] == -1) { + while (*p && BASE64URL_UINT6[ *p ] != -1) { ++p; } len = p - e; @@ -119,10 +130,10 @@ apr_size_t h2_util_base64url_decode(const char **decoded, const char *encoded, i = 0; d = (unsigned char*)*decoded; for (; i < mlen; i += 4) { - n = ((BASE64URL_TABLE[ e[i+0] ] << 18) + - (BASE64URL_TABLE[ e[i+1] ] << 12) + - (BASE64URL_TABLE[ e[i+2] ] << 6) + - BASE64URL_TABLE[ e[i+3] ]); + n = ((BASE64URL_UINT6[ e[i+0] ] << 18) + + (BASE64URL_UINT6[ e[i+1] ] << 12) + + (BASE64URL_UINT6[ e[i+2] ] << 6) + + (BASE64URL_UINT6[ e[i+3] ])); *d++ = n >> 16; *d++ = n >> 8 & 0xffu; *d++ = n & 0xffu; @@ -130,21 +141,43 @@ apr_size_t h2_util_base64url_decode(const char **decoded, const char *encoded, remain = len - mlen; switch (remain) { case 2: - n = ((BASE64URL_TABLE[ e[mlen+0] ] << 18) + - (BASE64URL_TABLE[ e[mlen+1] ] << 12)); + n = ((BASE64URL_UINT6[ e[mlen+0] ] << 18) + + (BASE64URL_UINT6[ e[mlen+1] ] << 12)); *d++ = n >> 16; break; case 3: - n = ((BASE64URL_TABLE[ e[mlen+0] ] << 18) + - (BASE64URL_TABLE[ e[mlen+1] ] << 12) + - (BASE64URL_TABLE[ e[mlen+2] ] << 6)); + n = ((BASE64URL_UINT6[ e[mlen+0] ] << 18) + + (BASE64URL_UINT6[ e[mlen+1] ] << 12) + + (BASE64URL_UINT6[ e[mlen+2] ] << 6)); *d++ = n >> 16; *d++ = n >> 8 & 0xffu; break; default: /* do nothing */ break; } - return len; + return mlen/4*3 + remain; +} + +const char *h2_util_base64url_encode(const char *data, + apr_size_t len, apr_pool_t *pool) +{ + apr_size_t mlen = ((len+2)/3)*3; + apr_size_t slen = (mlen/3)*4; + apr_size_t i; + const unsigned char *udata = (const unsigned char*)data; + char *enc, *p = apr_pcalloc(pool, slen+1); /* 0 terminated */ + + enc = p; + for (i = 0; i < mlen; i+= 3) { + *p++ = BASE64URL_CHARS[ (udata[i] >> 2) & 0x3fu ]; + *p++ = BASE64URL_CHARS[ (udata[i] << 4) + + ((i+1 < len)? (udata[i+1] >> 4) : 0) & 0x3fu ]; + *p++ = BASE64URL_CHARS[ (udata[i+1] << 2) + + ((i+2 < len)? (udata[i+2] >> 6) : 0) & 0x3fu ]; + *p++ = (i+2 < len)? BASE64URL_CHARS[ udata[i+2] & 0x3fu ] : '='; + } + + return enc; } int h2_util_contains_token(apr_pool_t *pool, const char *s, const char *token) @@ -506,7 +539,7 @@ int h2_util_bb_has_data(apr_bucket_brigade *bb) b != APR_BRIGADE_SENTINEL(bb); b = APR_BUCKET_NEXT(b)) { - if (!APR_BUCKET_IS_METADATA(b)) { + if (!AP_BUCKET_IS_EOR(b)) { return 1; } } diff --git a/mod_http2/h2_util.h b/mod_http2/h2_util.h index 10ad7d6b..1730e00c 100644 --- a/mod_http2/h2_util.h +++ b/mod_http2/h2_util.h @@ -49,6 +49,8 @@ const char *h2_util_first_token_match(apr_pool_t *pool, const char *s, apr_size_t h2_util_base64url_decode(const char **decoded, const char *encoded, apr_pool_t *pool); +const char *h2_util_base64url_encode(const char *data, + apr_size_t len, apr_pool_t *pool); #define H2_HD_MATCH_LIT(l, name, nlen) \ ((nlen == sizeof(l) - 1) && !apr_strnatcasecmp(l, name)) diff --git a/mod_http2/h2_version.h b/mod_http2/h2_version.h index 648dcec4..25a2b970 100644 --- a/mod_http2/h2_version.h +++ b/mod_http2/h2_version.h @@ -26,7 +26,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "1.1.0" +#define MOD_HTTP2_VERSION "1.2.0" /** * @macro @@ -34,7 +34,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. */ -#define MOD_HTTP2_VERSION_NUM 0x010100 +#define MOD_HTTP2_VERSION_NUM 0x010200 #endif /* mod_h2_h2_version_h */ diff --git a/mod_http2/mod_http2.c b/mod_http2/mod_http2.c index 53a460fe..a44661c9 100644 --- a/mod_http2/mod_http2.c +++ b/mod_http2/mod_http2.c @@ -18,6 +18,7 @@ #include #include +#include #include #include @@ -27,6 +28,7 @@ #include "h2_stream.h" #include "h2_alt_svc.h" #include "h2_conn.h" +#include "h2_filter.h" #include "h2_task.h" #include "h2_session.h" #include "h2_config.h" @@ -156,6 +158,9 @@ static void h2_hooks(apr_pool_t *pool) /* Setup subprocess env for certain variables */ ap_hook_fixups(h2_h2_fixups, NULL,NULL, APR_HOOK_MIDDLE); + + /* test http2 connection status handler */ + ap_hook_handler(h2_filter_h2_status_handler, NULL, NULL, APR_HOOK_MIDDLE); } static char *value_of_HTTP2(apr_pool_t *p, server_rec *s,