diff --git a/include/ap_mmn.h b/include/ap_mmn.h index 00475bf5dfe..85dd97d0990 100644 --- a/include/ap_mmn.h +++ b/include/ap_mmn.h @@ -603,6 +603,10 @@ * and AP_REQUEST_TRUSTED_CT BNOTE. * 20120211.133 (2.4.60-dev) Add ap_proxy_fixup_uds_filename() * 20120211.134 (2.4.60-dev) AP_SLASHES and AP_IS_SLASH + * 20120211.135 (2.4.59-dev) Add CONN_STATE_ASYNC_WAITIO, CONN_STATE_KEEPALIVE + * and CONN_STATE_PROCESSING + * 20120211.136 (2.4.59-dev) Add wait_io field to struct process_score + * 20120211.137 (2.4.59-dev) Add AP_MPMQ_CAN_WAITIO */ #define MODULE_MAGIC_COOKIE 0x41503234UL /* "AP24" */ @@ -610,7 +614,7 @@ #ifndef MODULE_MAGIC_NUMBER_MAJOR #define MODULE_MAGIC_NUMBER_MAJOR 20120211 #endif -#define MODULE_MAGIC_NUMBER_MINOR 134 /* 0...n */ +#define MODULE_MAGIC_NUMBER_MINOR 137 /* 0...n */ /** * Determine if the server's current MODULE_MAGIC_NUMBER is at least a diff --git a/include/ap_mpm.h b/include/ap_mpm.h index e3a58aa2a3c..158496fd7a8 100644 --- a/include/ap_mpm.h +++ b/include/ap_mpm.h @@ -178,6 +178,9 @@ AP_DECLARE(apr_status_t) ap_os_create_privileged_process( #define AP_MPMQ_GENERATION 15 /** MPM can drive serf internally */ #define AP_MPMQ_HAS_SERF 16 +/* 17-18 are trunk only */ +/** MPM supports CONN_STATE_ASYNC_WAITIO */ +#define AP_MPMQ_CAN_WAITIO 19 /** @} */ /** diff --git a/include/httpd.h b/include/httpd.h index 3aa05ba64ae..21a91f37241 100644 --- a/include/httpd.h +++ b/include/httpd.h @@ -453,13 +453,13 @@ AP_DECLARE(const char *) ap_get_server_built(void); /* non-HTTP status codes returned by hooks */ -#define OK 0 /**< Module has handled this stage. */ -#define DECLINED -1 /**< Module declines to handle */ -#define DONE -2 /**< Module has served the response completely - * - it's safe to die() with no more output - */ -#define SUSPENDED -3 /**< Module will handle the remainder of the request. - * The core will never invoke the request again, */ +#define OK 0 /**< Module has handled this stage. 
*/ +#define DECLINED -1 /**< Module declines to handle */ +#define DONE -2 /**< Module has served the response completely + * - it's safe to die() with no more output + */ +#define SUSPENDED -3 /**< Module will handle the remainder of the request. + * The core will never invoke the request again */ /** Returned by the bottom-most filter if no data was written. * @see ap_pass_brigade(). */ @@ -1256,16 +1256,25 @@ struct conn_rec { * only be set by the MPM. Use CONN_STATE_LINGER outside of the MPM. */ typedef enum { - CONN_STATE_CHECK_REQUEST_LINE_READABLE, - CONN_STATE_READ_REQUEST_LINE, - CONN_STATE_HANDLER, - CONN_STATE_WRITE_COMPLETION, - CONN_STATE_SUSPENDED, - CONN_STATE_LINGER, /* connection may be closed with lingering */ - CONN_STATE_LINGER_NORMAL, /* MPM has started lingering close with normal timeout */ - CONN_STATE_LINGER_SHORT, /* MPM has started lingering close with short timeout */ - - CONN_STATE_NUM /* Number of states (keep/kept last) */ + CONN_STATE_KEEPALIVE, /* Kept alive in the MPM (using KeepAliveTimeout) */ + CONN_STATE_PROCESSING, /* Processed by process_connection hooks */ + CONN_STATE_HANDLER, /* Processed by the modules handlers */ + CONN_STATE_WRITE_COMPLETION, /* Flushed by the MPM before entering CONN_STATE_KEEPALIVE */ + CONN_STATE_SUSPENDED, /* Suspended in the MPM until ap_run_resume_suspended() */ + CONN_STATE_LINGER, /* MPM flushes then closes the connection with lingering */ + CONN_STATE_LINGER_NORMAL, /* MPM has started lingering close with normal timeout */ + CONN_STATE_LINGER_SHORT, /* MPM has started lingering close with short timeout */ + + CONN_STATE_ASYNC_WAITIO, /* Returning this state to the MPM will make it wait for + * the connection to be readable or writable according to + * c->cs->sense (resp. 
CONN_SENSE_WANT_READ or _WRITE), + * using the configured Timeout */ + + CONN_STATE_NUM, /* Number of states (keep here before aliases) */ + + /* Aliases (legacy) */ + CONN_STATE_CHECK_REQUEST_LINE_READABLE = CONN_STATE_KEEPALIVE, + CONN_STATE_READ_REQUEST_LINE = CONN_STATE_PROCESSING, } conn_state_e; typedef enum { diff --git a/include/scoreboard.h b/include/scoreboard.h index 0142aa9204a..4af9132031a 100644 --- a/include/scoreboard.h +++ b/include/scoreboard.h @@ -144,13 +144,14 @@ struct process_score { * connections (for async MPMs) */ apr_uint32_t connections; /* total connections (for async MPMs) */ - apr_uint32_t write_completion; /* async connections doing write completion */ + apr_uint32_t write_completion; /* async connections in write completion */ apr_uint32_t lingering_close; /* async connections in lingering close */ apr_uint32_t keep_alive; /* async connections in keep alive */ apr_uint32_t suspended; /* connections suspended by some module */ int bucket; /* Listener bucket used by this child; this field is DEPRECATED * and no longer updated by the MPMs (i.e. always zero). 
*/ + apr_uint32_t wait_io; /* async connections waiting an IO in the MPM */ }; /* Scoreboard is now in 'local' memory, since it isn't updated once created, diff --git a/modules/generators/mod_status.c b/modules/generators/mod_status.c index 2cb38c747fb..c1c856d41dd 100644 --- a/modules/generators/mod_status.c +++ b/modules/generators/mod_status.c @@ -564,7 +564,7 @@ static int status_handler(request_rec *r) ap_rputs("", r); if (is_async) { - int write_completion = 0, lingering_close = 0, keep_alive = 0, + int wait_io = 0, write_completion = 0, lingering_close = 0, keep_alive = 0, connections = 0, stopping = 0, procs = 0; if (!short_report) ap_rputs("\n\n\n" @@ -572,15 +572,17 @@ static int status_handler(request_rec *r) "" "" "\n" - "" - "\n" + "" + "\n" "" "" - "\n", r); + "" + "\n", r); for (i = 0; i < server_limit; ++i) { ps_record = ap_get_scoreboard_process(i); if (ps_record->pid) { connections += ps_record->connections; + wait_io += ps_record->wait_io; write_completion += ps_record->write_completion; keep_alive += ps_record->keep_alive; lingering_close += ps_record->lingering_close; @@ -600,7 +602,7 @@ static int status_handler(request_rec *r) "" "" "" - "" + "" "\n", i, ps_record->pid, dying, old, @@ -609,6 +611,7 @@ static int status_handler(request_rec *r) thread_busy_buffer[i], thread_graceful_buffer[i], thread_idle_buffer[i], + ps_record->wait_io, ps_record->write_completion, ps_record->keep_alive, ps_record->lingering_close); @@ -620,23 +623,26 @@ static int status_handler(request_rec *r) "" "" "" - "" + "" "\n
PIDStoppingConnectionsThreadsAsync connections
ThreadsAsync connections
totalacceptingbusygracefulidlewritingkeep-aliveclosing
wait-iowritingkeep-aliveclosing
%s%s%u%s%u%u%u%u%u%u%u%u%u%u
%d%d%d %d%d%d%d%d%d%d%d%d%d
\n", procs, stopping, connections, busy, graceful, idle, - write_completion, keep_alive, lingering_close); + wait_io, write_completion, keep_alive, + lingering_close); } else { ap_rprintf(r, "Processes: %d\n" "Stopping: %d\n" "ConnsTotal: %d\n" + "ConnsAsyncWaitIO: %d\n" "ConnsAsyncWriting: %d\n" "ConnsAsyncKeepAlive: %d\n" "ConnsAsyncClosing: %d\n", procs, stopping, connections, - write_completion, keep_alive, lingering_close); + wait_io, write_completion, keep_alive, + lingering_close); } } diff --git a/modules/http/http_core.c b/modules/http/http_core.c index c6cb473dbc8..a86465085d2 100644 --- a/modules/http/http_core.c +++ b/modules/http/http_core.c @@ -138,9 +138,9 @@ static int ap_process_http_async_connection(conn_rec *c) conn_state_t *cs = c->cs; AP_DEBUG_ASSERT(cs != NULL); - AP_DEBUG_ASSERT(cs->state == CONN_STATE_READ_REQUEST_LINE); + AP_DEBUG_ASSERT(cs->state == CONN_STATE_PROCESSING); - if (cs->state == CONN_STATE_READ_REQUEST_LINE) { + if (cs->state == CONN_STATE_PROCESSING) { ap_update_child_status_from_conn(c->sbh, SERVER_BUSY_READ, c); if (ap_extended_status) { ap_set_conn_count(c->sbh, r, c->keepalives); diff --git a/modules/http2/h2_c1.c b/modules/http2/h2_c1.c index afb26fc0737..626e665b3fc 100644 --- a/modules/http2/h2_c1.c +++ b/modules/http2/h2_c1.c @@ -47,23 +47,25 @@ static struct h2_workers *workers; -static int async_mpm; +static int async_mpm, mpm_can_waitio; APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_in) *h2_c_logio_add_bytes_in; APR_OPTIONAL_FN_TYPE(ap_logio_add_bytes_out) *h2_c_logio_add_bytes_out; apr_status_t h2_c1_child_init(apr_pool_t *pool, server_rec *s) { - apr_status_t status = APR_SUCCESS; int minw, maxw; apr_time_t idle_limit; - status = ap_mpm_query(AP_MPMQ_IS_ASYNC, &async_mpm); - if (status != APR_SUCCESS) { + if (ap_mpm_query(AP_MPMQ_IS_ASYNC, &async_mpm)) { /* some MPMs do not implemnent this */ async_mpm = 0; - status = APR_SUCCESS; } +#ifdef AP_MPMQ_CAN_WAITIO + if (!async_mpm || ap_mpm_query(AP_MPMQ_CAN_WAITIO, 
&mpm_can_waitio)) { + mpm_can_waitio = 0; + } +#endif h2_config_init(pool); @@ -113,23 +115,22 @@ apr_status_t h2_c1_setup(conn_rec *c, request_rec *r, server_rec *s) return rv; } -apr_status_t h2_c1_run(conn_rec *c) +int h2_c1_run(conn_rec *c) { apr_status_t status; - int mpm_state = 0; + int mpm_state = 0, keepalive = 0; h2_conn_ctx_t *conn_ctx = h2_conn_ctx_get(c); ap_assert(conn_ctx); ap_assert(conn_ctx->session); + c->clogging_input_filters = 0; do { if (c->cs) { - c->cs->sense = CONN_SENSE_DEFAULT; c->cs->state = CONN_STATE_HANDLER; } - status = h2_session_process(conn_ctx->session, async_mpm); - - if (APR_STATUS_IS_EOF(status)) { + status = h2_session_process(conn_ctx->session, async_mpm, &keepalive); + if (status != APR_SUCCESS) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, H2_SSSN_LOG(APLOGNO(03045), conn_ctx->session, "process, closing conn")); @@ -152,24 +153,51 @@ apr_status_t h2_c1_run(conn_rec *c) case H2_SESSION_ST_IDLE: case H2_SESSION_ST_BUSY: case H2_SESSION_ST_WAIT: - c->cs->state = CONN_STATE_WRITE_COMPLETION; - if (c->cs && !conn_ctx->session->remote.emitted_count) { - /* let the MPM know that we are not done and want - * the Timeout behaviour instead of a KeepAliveTimeout + if (keepalive) { + /* Flush then keep-alive */ + c->cs->sense = CONN_SENSE_DEFAULT; + c->cs->state = CONN_STATE_WRITE_COMPLETION; + } + else { + /* Let the MPM know that we are not done and want to wait + * for read using Timeout instead of KeepAliveTimeout. * See PR 63534. */ c->cs->sense = CONN_SENSE_WANT_READ; +#ifdef AP_MPMQ_CAN_WAITIO + if (mpm_can_waitio) { + /* This tells the MPM to wait for the connection to be + * readable (CONN_SENSE_WANT_READ) within the configured + * Timeout and then come back to the process_connection() + * hooks again when ready. 
+ */ + c->cs->state = CONN_STATE_ASYNC_WAITIO; + } + else +#endif + { + /* This is a compat workaround to do the same using the + * CONN_STATE_WRITE_COMPLETION state but with both + * CONN_SENSE_WANT_READ to wait for readability rather + * than writing and c->clogging_input_filters to force + * reentering the process_connection() hooks from any + * state when ready. This somehow will use Timeout too. + */ + c->cs->state = CONN_STATE_WRITE_COMPLETION; + c->clogging_input_filters = 1; + } } break; + case H2_SESSION_ST_CLEANUP: case H2_SESSION_ST_DONE: default: c->cs->state = CONN_STATE_LINGER; - break; + break; } } - return APR_SUCCESS; + return OK; } apr_status_t h2_c1_pre_close(struct h2_conn_ctx_t *ctx, conn_rec *c) @@ -275,8 +303,7 @@ static int h2_c1_hook_process_connection(conn_rec* c) return !OK; } } - h2_c1_run(c); - return OK; + return h2_c1_run(c); declined: ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, c, "h2_h2, declined"); diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c index 2aeea42b5df..b5153082b56 100644 --- a/modules/http2/h2_mplx.c +++ b/modules/http2/h2_mplx.c @@ -397,6 +397,7 @@ apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) typedef struct { int stream_count; int stream_want_send; + int stream_send_win_exhausted; } stream_iter_aws_t; static int m_stream_want_send_data(void *ctx, void *stream) @@ -419,6 +420,29 @@ int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m) return x.stream_count && (x.stream_count == x.stream_want_send); } +static int m_stream_send_win_exh(void *ctx, void *s) +{ + h2_stream *stream = s; + int win; + stream_iter_aws_t *x = ctx; + ++x->stream_count; + win = nghttp2_session_get_stream_remote_window_size(stream->session->ngh2, stream->id); + if (win == 0) + ++x->stream_send_win_exhausted; + return 1; +} + +int h2_mplx_c1_all_streams_send_win_exhausted(h2_mplx *m) +{ + stream_iter_aws_t x; + x.stream_count = 0; + x.stream_send_win_exhausted = 0; + H2_MPLX_ENTER(m); + 
h2_ihash_iter(m->streams, m_stream_send_win_exh, &x); + H2_MPLX_LEAVE(m); + return x.stream_count && (x.stream_count == x.stream_send_win_exhausted); +} + static int m_report_stream_iter(void *ctx, void *val) { h2_mplx *m = ctx; h2_stream *stream = val; diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h index 860f9160397..12e36f766f4 100644 --- a/modules/http2/h2_mplx.h +++ b/modules/http2/h2_mplx.h @@ -196,6 +196,11 @@ apr_status_t h2_mplx_c1_streams_do(h2_mplx *m, h2_mplx_stream_cb *cb, void *ctx) */ int h2_mplx_c1_all_streams_want_send_data(h2_mplx *m); +/** + * Return != 0 iff all open streams have send window exhausted + */ +int h2_mplx_c1_all_streams_send_win_exhausted(h2_mplx *m); + /** * A stream has been RST_STREAM by the client. Abort * any processing going on and remove from processing diff --git a/modules/http2/h2_session.c b/modules/http2/h2_session.c index 5724fdadb01..ba248d0cc27 100644 --- a/modules/http2/h2_session.c +++ b/modules/http2/h2_session.c @@ -323,8 +323,8 @@ static int on_header_cb(nghttp2_session *ngh2, const nghttp2_frame *frame, (!stream->rtmp || stream->rtmp->http_status == H2_HTTP_STATUS_UNSET || /* We accept a certain amount of failures in order to reply - * with an informative HTTP error response like 413. But if the - * client is too wrong, we fail the request a RESET of the stream */ + * with an informative HTTP error response like 413. But of the + * client is too wrong, we RESET the stream */ stream->request_headers_failed > 100)) { return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; } @@ -1762,12 +1762,22 @@ static void unblock_c1_out(h2_session *session) { } } -apr_status_t h2_session_process(h2_session *session, int async) +static int h2_send_flow_blocked(h2_session *session) +{ + /* We are completely send blocked if either the connection window + * is 0 or all stream flow windows are 0. 
*/ + return ((nghttp2_session_get_remote_window_size(session->ngh2) <= 0) || + h2_mplx_c1_all_streams_send_win_exhausted(session->mplx)); +} + +apr_status_t h2_session_process(h2_session *session, int async, + int *pkeepalive) { apr_status_t status = APR_SUCCESS; conn_rec *c = session->c1; int rv, mpm_state, trace = APLOGctrace3(c); + *pkeepalive = 0; if (trace) { ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c, H2_SSSN_MSG(session, "process start, async=%d"), async); @@ -1922,6 +1932,14 @@ apr_status_t h2_session_process(h2_session *session, int async) break; case H2_SESSION_ST_WAIT: + /* In this state, we might have returned processing to the MPM + * before. On a connection socket event, we are invoked again and + * need to process any input before proceeding. */ + h2_c1_read(session); + if (session->state != H2_SESSION_ST_WAIT) { + break; + } + status = h2_c1_io_assure_flushed(&session->io); if (APR_SUCCESS != status) { h2_session_dispatch_event(session, H2_SESSION_EV_CONN_ERROR, status, NULL); @@ -1934,8 +1952,16 @@ apr_status_t h2_session_process(h2_session *session, int async) break; } } - /* No IO happening and input is exhausted. Make sure we have - * flushed any possibly pending output and then wait with + else if (async && h2_send_flow_blocked(session)) { + /* By returning to the MPM, we do not block a worker + * and async wait for the client send window updates. */ + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, status, c, + H2_SSSN_LOG(APLOGNO(10502), session, + "BLOCKED, return to mpm c1 monitoring")); + goto leaving; + } + + /* No IO happening and input is exhausted. 
Wait with * the c1 connection timeout for sth to happen in our c1/c2 sockets/pipes */ ap_log_cerror(APLOG_MARK, APLOG_TRACE2, status, c, H2_SSSN_MSG(session, "polling timeout=%d, open_streams=%d"), @@ -1976,9 +2002,13 @@ apr_status_t h2_session_process(h2_session *session, int async) } leaving: + /* entering KeepAlive timing when we have no more open streams AND + * we have processed at least one stream. */ + *pkeepalive = (session->open_streams == 0 && session->remote.emitted_count); if (trace) { - ap_log_cerror( APLOG_MARK, APLOG_TRACE3, status, c, - H2_SSSN_MSG(session, "process returns")); + ap_log_cerror(APLOG_MARK, APLOG_TRACE3, status, c, + H2_SSSN_MSG(session, "process returns, keepalive=%d"), + *pkeepalive); } h2_mplx_c1_going_keepalive(session->mplx); diff --git a/modules/http2/h2_session.h b/modules/http2/h2_session.h index 3328509de8a..2c8f334cce0 100644 --- a/modules/http2/h2_session.h +++ b/modules/http2/h2_session.h @@ -144,8 +144,11 @@ void h2_session_event(h2_session *session, h2_session_event_t ev, * error occurred. * * @param session the sessionm to process + * @param async if mpm is async + * @param pkeepalive on return, != 0 if connection to be put into keepalive + * behaviour and timouts */ -apr_status_t h2_session_process(h2_session *session, int async); +apr_status_t h2_session_process(h2_session *session, int async, int *pkeepalive); /** * Last chance to do anything before the connection is closed. diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h index 7e7da2106aa..bf222078e7e 100644 --- a/modules/http2/h2_version.h +++ b/modules/http2/h2_version.h @@ -27,7 +27,7 @@ * @macro * Version number of the http2 module as c string */ -#define MOD_HTTP2_VERSION "2.0.22" +#define MOD_HTTP2_VERSION "2.0.27" /** * @macro @@ -35,7 +35,7 @@ * release. This is a 24 bit number with 8 bits for major number, 8 bits * for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203. 
*/ -#define MOD_HTTP2_VERSION_NUM 0x020016 +#define MOD_HTTP2_VERSION_NUM 0x02001b #endif /* mod_h2_h2_version_h */ diff --git a/modules/lua/lua_request.c b/modules/lua/lua_request.c index bec85807545..cfb89b80cab 100644 --- a/modules/lua/lua_request.c +++ b/modules/lua/lua_request.c @@ -1264,6 +1264,10 @@ static int lua_ap_scoreboard_process(lua_State *L) lua_pushnumber(L, ps_record->suspended); lua_settable(L, -3); + lua_pushstring(L, "wait_io"); + lua_pushnumber(L, ps_record->wait_io); + lua_settable(L, -3); + lua_pushstring(L, "write_completion"); lua_pushnumber(L, ps_record->write_completion); lua_settable(L, -3); diff --git a/server/mpm/event/event.c b/server/mpm/event/event.c index 7e7a7e91baf..050d823809b 100644 --- a/server/mpm/event/event.c +++ b/server/mpm/event/event.c @@ -146,6 +146,8 @@ #define apr_time_from_msec(x) (x * 1000) #endif +#define CONN_STATE_IS_LINGERING_CLOSE(s) ((s) >= CONN_STATE_LINGER && \ + (s) <= CONN_STATE_LINGER_SHORT) #ifndef MAX_SECS_TO_LINGER #define MAX_SECS_TO_LINGER 30 #endif @@ -246,8 +248,11 @@ struct event_conn_state_t { conn_state_t pub; /** chaining in defer_linger_chain */ struct event_conn_state_t *chain; - /** Is lingering close from defer_lingering_close()? */ - int deferred_linger; + unsigned int + /** Is lingering close from defer_lingering_close()? */ + deferred_linger :1, + /** Has ap_start_lingering_close() been called? */ + linger_started :1; }; APR_RING_HEAD(timeout_head_t, event_conn_state_t); @@ -262,12 +267,14 @@ struct timeout_queue { /* * Several timeout queues that use different timeouts, so that we always can * simply append to the end. 
+ * waitio_q uses vhost's TimeOut * write_completion_q uses vhost's TimeOut * keepalive_q uses vhost's KeepAliveTimeOut * linger_q uses MAX_SECS_TO_LINGER * short_linger_q uses SECONDS_TO_LINGER */ -static struct timeout_queue *write_completion_q, +static struct timeout_queue *waitio_q, + *write_completion_q, *keepalive_q, *linger_q, *short_linger_q; @@ -413,7 +420,8 @@ static event_child_bucket *all_buckets, /* All listeners buckets */ *my_bucket; /* Current child bucket */ struct event_srv_cfg_s { - struct timeout_queue *wc_q, + struct timeout_queue *io_q, + *wc_q, *ka_q; }; @@ -689,6 +697,9 @@ static int event_query(int query_code, int *result, apr_status_t *rv) case AP_MPMQ_GENERATION: *result = retained->mpm->my_generation; break; + case AP_MPMQ_CAN_WAITIO: + *result = 1; + break; default: *rv = APR_ENOTIMPL; break; @@ -884,7 +895,7 @@ static void close_connection(event_conn_state_t *cs) */ static int shutdown_connection(event_conn_state_t *cs) { - if (cs->pub.state < CONN_STATE_LINGER) { + if (!CONN_STATE_IS_LINGERING_CLOSE(cs->pub.state)) { apr_table_setn(cs->c->notes, "short-lingering-close", "1"); defer_lingering_close(cs); } @@ -963,11 +974,18 @@ static int event_post_read_request(request_rec *r) /* Forward declare */ static void process_lingering_close(event_conn_state_t *cs); -static void update_reqevents_from_sense(event_conn_state_t *cs, int sense) +static void update_reqevents_from_sense(event_conn_state_t *cs, + int default_sense) { - if (sense < 0) { + int sense = default_sense; + + if (cs->pub.sense != CONN_SENSE_DEFAULT) { sense = cs->pub.sense; + + /* Reset to default for the next round */ + cs->pub.sense = CONN_SENSE_DEFAULT; } + if (sense == CONN_SENSE_WANT_READ) { cs->pfd.reqevents = APR_POLLIN | APR_POLLHUP; } @@ -979,9 +997,6 @@ static void update_reqevents_from_sense(event_conn_state_t *cs, int sense) * so it shouldn't hurt (ignored otherwise). 
*/ cs->pfd.reqevents |= APR_POLLERR; - - /* Reset to default for the next round */ - cs->pub.sense = CONN_SENSE_DEFAULT; } /* @@ -1020,7 +1035,6 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc &mpm_event_module); cs->pfd.desc_type = APR_POLL_SOCKET; cs->pfd.desc.s = sock; - update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); pt->type = PT_CSD; pt->baton = cs; cs->pfd.client_data = pt; @@ -1033,6 +1047,8 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc if (rc != OK && rc != DONE) { ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(00469) "process_socket: connection aborted"); + close_connection(cs); + return; } /** @@ -1041,17 +1057,15 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc * and there are measurable delays before the * socket is readable due to the first data packet arriving, * it might be better to create the cs on the listener thread - * with the state set to CONN_STATE_CHECK_REQUEST_LINE_READABLE + * with the state set to CONN_STATE_KEEPALIVE * * FreeBSD users will want to enable the HTTP accept filter * module in their kernel for the highest performance * When the accept filter is active, sockets are kept in the * kernel until a HTTP request is received. 
*/ - cs->pub.state = CONN_STATE_READ_REQUEST_LINE; - + cs->pub.state = CONN_STATE_PROCESSING; cs->pub.sense = CONN_SENSE_DEFAULT; - rc = OK; } else { c = cs->c; @@ -1062,83 +1076,124 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc c->id = conn_id; } - if (c->aborted) { - /* do lingering close below */ - cs->pub.state = CONN_STATE_LINGER; + if (CONN_STATE_IS_LINGERING_CLOSE(cs->pub.state)) { + goto lingering_close; } - else if (cs->pub.state >= CONN_STATE_LINGER) { - /* fall through */ - } - else { - if (cs->pub.state == CONN_STATE_READ_REQUEST_LINE - /* If we have an input filter which 'clogs' the input stream, - * like mod_ssl used to, lets just do the normal read from input - * filters, like the Worker MPM does. Filters that need to write - * where they would otherwise read, or read where they would - * otherwise write, should set the sense appropriately. - */ - || c->clogging_input_filters) { -read_request: - clogging = c->clogging_input_filters; - if (clogging) { - apr_atomic_inc32(&clogged_count); - } - rc = ap_run_process_connection(c); - if (clogging) { - apr_atomic_dec32(&clogged_count); - } - if (cs->pub.state > CONN_STATE_LINGER) { + + if (cs->pub.state == CONN_STATE_PROCESSING + /* If we have an input filter which 'clogs' the input stream, + * like mod_ssl used to, lets just do the normal read from input + * filters, like the Worker MPM does. Filters that need to write + * where they would otherwise read, or read where they would + * otherwise write, should set the sense appropriately. 
+ */ + || c->clogging_input_filters) { + process_connection: + cs->pub.state = CONN_STATE_PROCESSING; + + clogging = c->clogging_input_filters; + if (clogging) { + apr_atomic_inc32(&clogged_count); + } + rc = ap_run_process_connection(c); + if (clogging) { + apr_atomic_dec32(&clogged_count); + } + /* + * The process_connection hooks should set the appropriate connection + * state upon return, for event MPM to either: + * - CONN_STATE_LINGER: do lingering close; + * - CONN_STATE_WRITE_COMPLETION: flush pending outputs using Timeout + * and wait for next incoming data using KeepAliveTimeout, then come + * back to process_connection() hooks; + * - CONN_STATE_SUSPENDED: suspend the connection such that it now + * interacts with the MPM through suspend/resume_connection() hooks, + * and/or registered poll callbacks (PT_USER), and/or registered + * timed callbacks triggered by timer events; + * - CONN_STATE_ASYNC_WAITIO: wait for read/write-ability of the underlying + * socket using Timeout and come back to process_connection() hooks when + * ready; + * - CONN_STATE_KEEPALIVE: now handled by CONN_STATE_WRITE_COMPLETION + * to flush before waiting for next data (that might depend on it). + * If a process_connection hook returns an error or no hook sets the state + * to one of the above expected value, forcibly close the connection w/ + * CONN_STATE_LINGER. This covers the cases where no process_connection + * hook executes (DECLINED), or one returns OK w/o touching the state (i.e. + * CONN_STATE_PROCESSING remains after the call) which can happen with + * third-party modules not updated to work specifically with event MPM + * while this was expected to do lingering close unconditionally with + * worker or prefork MPMs for instance. 
+ */ + switch (rc) { + case DONE: + rc = OK; /* same as OK, fall through */ + case OK: + if (cs->pub.state == CONN_STATE_PROCESSING) { cs->pub.state = CONN_STATE_LINGER; } - if (rc == DONE) { - rc = OK; + else if (cs->pub.state == CONN_STATE_KEEPALIVE) { + cs->pub.state = CONN_STATE_WRITE_COMPLETION; } + break; + } + if (rc != OK || (cs->pub.state != CONN_STATE_LINGER + && cs->pub.state != CONN_STATE_ASYNC_WAITIO + && cs->pub.state != CONN_STATE_WRITE_COMPLETION + && cs->pub.state != CONN_STATE_SUSPENDED)) { + ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(10111) + "process_socket: connection processing returned %i " + "(%sstate %i): closing", + rc, rc ? "" : "unexpected ", (int)cs->pub.state); + cs->pub.state = CONN_STATE_LINGER; + } + else if (c->aborted) { + cs->pub.state = CONN_STATE_LINGER; + } + if (cs->pub.state == CONN_STATE_LINGER) { + goto lingering_close; } } - /* - * The process_connection hooks above should set the connection state - * appropriately upon return, for event MPM to either: - * - do lingering close (CONN_STATE_LINGER), - * - wait for readability of the next request with respect to the keepalive - * timeout (state CONN_STATE_CHECK_REQUEST_LINE_READABLE), - * - wait for read/write-ability of the underlying socket with respect to - * its timeout by setting c->clogging_input_filters to 1 and the sense - * to CONN_SENSE_WANT_READ/WRITE (state CONN_STATE_WRITE_COMPLETION), - * - keep flushing the output filters stack in nonblocking mode, and then - * if required wait for read/write-ability of the underlying socket with - * respect to its own timeout (state CONN_STATE_WRITE_COMPLETION); since - * completion at some point may require reads (e.g. 
SSL_ERROR_WANT_READ), - * an output filter can also set the sense to CONN_SENSE_WANT_READ at any - * time for event MPM to do the right thing, - * - suspend the connection (SUSPENDED) such that it now interacts with - * the MPM through suspend/resume_connection() hooks, and/or registered - * poll callbacks (PT_USER), and/or registered timed callbacks triggered - * by timer events. - * If a process_connection hook returns an error or no hook sets the state - * to one of the above expected value, we forcibly close the connection w/ - * CONN_STATE_LINGER. This covers the cases where no process_connection - * hook executes (DECLINED), or one returns OK w/o touching the state (i.e. - * CONN_STATE_READ_REQUEST_LINE remains after the call) which can happen - * with third-party modules not updated to work specifically with event MPM - * while this was expected to do lingering close unconditionally with - * worker or prefork MPMs for instance. - */ - if (rc != OK || (cs->pub.state >= CONN_STATE_NUM) - || (cs->pub.state < CONN_STATE_LINGER - && cs->pub.state != CONN_STATE_WRITE_COMPLETION - && cs->pub.state != CONN_STATE_CHECK_REQUEST_LINE_READABLE - && cs->pub.state != CONN_STATE_SUSPENDED)) { - ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, c, APLOGNO(10111) - "process_socket: connection processing %s: closing", - rc ? apr_psprintf(c->pool, "returned error %i", rc) - : apr_psprintf(c->pool, "unexpected state %i", - (int)cs->pub.state)); - cs->pub.state = CONN_STATE_LINGER; + + if (cs->pub.state == CONN_STATE_ASYNC_WAITIO) { + /* Set a read/write timeout for this connection, and let the + * event thread poll for read/writeability. + */ + cs->queue_timestamp = apr_time_now(); + notify_suspend(cs); + + ap_update_child_status(cs->sbh, SERVER_BUSY_READ, NULL); + + /* Modules might set c->cs->sense to CONN_SENSE_WANT_WRITE, + * the default is CONN_SENSE_WANT_READ still. 
+ */ + update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); + apr_thread_mutex_lock(timeout_mutex); + TO_QUEUE_APPEND(cs->sc->io_q, cs); + rv = apr_pollset_add(event_pollset, &cs->pfd); + if (rv != APR_SUCCESS && !APR_STATUS_IS_EEXIST(rv)) { + AP_DEBUG_ASSERT(0); + TO_QUEUE_REMOVE(cs->sc->io_q, cs); + apr_thread_mutex_unlock(timeout_mutex); + ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(10503) + "process_socket: apr_pollset_add failure in " + "CONN_STATE_ASYNC_WAITIO"); + close_connection(cs); + signal_threads(ST_GRACEFUL); + } + else { + apr_thread_mutex_unlock(timeout_mutex); + } + return; } if (cs->pub.state == CONN_STATE_WRITE_COMPLETION) { ap_filter_t *output_filter = c->output_filters; apr_status_t rv; + + /* Flush all pending outputs before going to CONN_STATE_KEEPALIVE or + * straight to CONN_STATE_PROCESSING if inputs are pending already. + */ + ap_update_child_status(cs->sbh, SERVER_BUSY_WRITE, NULL); while (output_filter->next != NULL) { output_filter = output_filter->next; @@ -1148,9 +1203,9 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc ap_log_cerror(APLOG_MARK, APLOG_DEBUG, rv, c, APLOGNO(00470) "network write failure in core output filter"); cs->pub.state = CONN_STATE_LINGER; + goto lingering_close; } - else if (c->data_in_output_filters || - cs->pub.sense == CONN_SENSE_WANT_READ) { + if (c->data_in_output_filters || cs->pub.sense == CONN_SENSE_WANT_READ) { /* Still in WRITE_COMPLETION_STATE: * Set a read/write timeout for this connection, and let the * event thread poll for read/writeability. @@ -1158,7 +1213,8 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc cs->queue_timestamp = apr_time_now(); notify_suspend(cs); - update_reqevents_from_sense(cs, -1); + /* Add work to pollset. 
*/ + update_reqevents_from_sense(cs, CONN_SENSE_WANT_WRITE); apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_APPEND(cs->sc->wc_q, cs); rv = apr_pollset_add(event_pollset, &cs->pfd); @@ -1167,8 +1223,8 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc TO_QUEUE_REMOVE(cs->sc->wc_q, cs); apr_thread_mutex_unlock(timeout_mutex); ap_log_error(APLOG_MARK, APLOG_ERR, rv, ap_server_conf, APLOGNO(03465) - "process_socket: apr_pollset_add failure for " - "write completion"); + "process_socket: apr_pollset_add failure in " + "CONN_STATE_WRITE_COMPLETION"); close_connection(cs); signal_threads(ST_GRACEFUL); } @@ -1177,22 +1233,23 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc } return; } - else if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted) { + if (c->keepalive != AP_CONN_KEEPALIVE || c->aborted) { cs->pub.state = CONN_STATE_LINGER; + goto lingering_close; } - else if (c->data_in_input_filters) { - cs->pub.state = CONN_STATE_READ_REQUEST_LINE; - goto read_request; - } - else if (!listener_may_exit) { - cs->pub.state = CONN_STATE_CHECK_REQUEST_LINE_READABLE; + if (c->data_in_input_filters) { + goto process_connection; } - else { + if (listener_may_exit) { cs->pub.state = CONN_STATE_LINGER; + goto lingering_close; } + + /* Fall through */ + cs->pub.state = CONN_STATE_KEEPALIVE; } - if (cs->pub.state == CONN_STATE_CHECK_REQUEST_LINE_READABLE) { + if (cs->pub.state == CONN_STATE_KEEPALIVE) { ap_update_child_status(cs->sbh, SERVER_BUSY_KEEPALIVE, NULL); /* It greatly simplifies the logic to use a single timeout value per q @@ -1207,6 +1264,7 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc notify_suspend(cs); /* Add work to pollset. 
*/ + cs->pub.sense = CONN_SENSE_DEFAULT; update_reqevents_from_sense(cs, CONN_SENSE_WANT_READ); apr_thread_mutex_lock(timeout_mutex); TO_QUEUE_APPEND(cs->sc->ka_q, cs); @@ -1233,11 +1291,9 @@ static void process_socket(apr_thread_t *thd, apr_pool_t * p, apr_socket_t * soc return; } + lingering_close: /* CONN_STATE_LINGER[_*] fall through process_lingering_close() */ - if (cs->pub.state >= CONN_STATE_LINGER) { - process_lingering_close(cs); - return; - } + process_lingering_close(cs); } /* conns_this_child has gone to zero or below. See if the admin coded @@ -1250,10 +1306,8 @@ static void check_infinite_requests(void) "Stopping process due to MaxConnectionsPerChild"); signal_threads(ST_GRACEFUL); } - else { - /* keep going */ - conns_this_child = APR_INT32_MAX; - } + /* keep going */ + conns_this_child = APR_INT32_MAX; } static int close_listeners(int *closed) @@ -1488,9 +1542,12 @@ static void process_lingering_close(event_conn_state_t *cs) ap_log_cerror(APLOG_MARK, APLOG_TRACE6, 0, cs->c, "lingering close from state %i", (int)cs->pub.state); - AP_DEBUG_ASSERT(cs->pub.state >= CONN_STATE_LINGER); + AP_DEBUG_ASSERT(CONN_STATE_IS_LINGERING_CLOSE(cs->pub.state)); + + if (!cs->linger_started) { + cs->pub.state = CONN_STATE_LINGER; + cs->linger_started = 1; - if (cs->pub.state == CONN_STATE_LINGER) { /* defer_lingering_close() may have bumped lingering_count already */ if (!cs->deferred_linger) { apr_atomic_inc32(&lingering_count); @@ -1502,12 +1559,11 @@ static void process_lingering_close(event_conn_state_t *cs) close_connection(cs); return; } - - cs->queue_timestamp = apr_time_now(); - /* Clear APR_INCOMPLETE_READ if it was ever set, we'll do the poll() - * at the listener only from now, if needed. - */ + + /* All nonblocking from now, no need for APR_INCOMPLETE_READ either */ + apr_socket_timeout_set(csd, 0); apr_socket_opt_set(csd, APR_INCOMPLETE_READ, 0); + /* * If some module requested a shortened waiting period, only wait for * 2s (SECONDS_TO_LINGER). 
This is useful for mitigating certain @@ -1519,10 +1575,19 @@ static void process_lingering_close(event_conn_state_t *cs) else { cs->pub.state = CONN_STATE_LINGER_NORMAL; } + cs->pub.sense = CONN_SENSE_DEFAULT; notify_suspend(cs); + + /* One timestamp/duration for the whole lingering close time. + * XXX: This makes the (short_)linger_q not sorted/ordered by expiring + * timeouts whenever multiple schedules are necessary (EAGAIN below), + * but we probably don't care since these connections do not count + * for connections_above_limit() and all of them will be killed when + * busy or gracefully stopping anyway. + */ + cs->queue_timestamp = apr_time_now(); } - apr_socket_timeout_set(csd, 0); do { nbytes = sizeof(dummybuf); rv = apr_socket_recv(csd, dummybuf, &nbytes); @@ -1699,24 +1764,20 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) if (APLOGtrace6(ap_server_conf)) { /* trace log status every second */ if (now - last_log > apr_time_from_sec(1)) { - last_log = now; - apr_thread_mutex_lock(timeout_mutex); ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, - "connections: %u (clogged: %u write-completion: %d " - "keep-alive: %d lingering: %d suspended: %u)", + "connections: %u (waitio:%u write-completion:%u " + "keep-alive:%u lingering:%u suspended:%u clogged:%u), " + "workers: %u/%u shutdown", apr_atomic_read32(&connection_count), - apr_atomic_read32(&clogged_count), + apr_atomic_read32(waitio_q->total), apr_atomic_read32(write_completion_q->total), apr_atomic_read32(keepalive_q->total), apr_atomic_read32(&lingering_count), - apr_atomic_read32(&suspended_count)); - if (dying) { - ap_log_error(APLOG_MARK, APLOG_TRACE6, 0, ap_server_conf, - "%u/%u workers shutdown", - apr_atomic_read32(&threads_shutdown), - threads_per_child); - } - apr_thread_mutex_unlock(timeout_mutex); + apr_atomic_read32(&suspended_count), + apr_atomic_read32(&clogged_count), + apr_atomic_read32(&threads_shutdown), + threads_per_child); + last_log = now; } 
} @@ -1824,8 +1885,14 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) blocking = 1; break; - case CONN_STATE_CHECK_REQUEST_LINE_READABLE: - cs->pub.state = CONN_STATE_READ_REQUEST_LINE; + case CONN_STATE_ASYNC_WAITIO: + cs->pub.state = CONN_STATE_PROCESSING; + remove_from_q = cs->sc->io_q; + blocking = 1; + break; + + case CONN_STATE_KEEPALIVE: + cs->pub.state = CONN_STATE_PROCESSING; remove_from_q = cs->sc->ka_q; break; @@ -1978,23 +2045,28 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) /* Steps below will recompute this. */ queues_next_expiry = 0; - /* Step 1: keepalive timeouts */ + /* Step 1: keepalive queue timeouts are closed */ if (workers_were_busy || dying) { process_keepalive_queue(0); /* kill'em all \m/ */ } else { process_keepalive_queue(now); } - /* Step 2: write completion timeouts */ - process_timeout_queue(write_completion_q, now, - defer_lingering_close); - /* Step 3: (normal) lingering close completion timeouts */ + + /* Step 2: waitio queue timeouts are flushed */ + process_timeout_queue(waitio_q, now, defer_lingering_close); + + /* Step 3: write completion queue timeouts are flushed */ + process_timeout_queue(write_completion_q, now, defer_lingering_close); + + /* Step 4: normal lingering close queue timeouts are closed */ if (dying && linger_q->timeout > short_linger_q->timeout) { /* Dying, force short timeout for normal lingering close */ linger_q->timeout = short_linger_q->timeout; } process_timeout_queue(linger_q, now, shutdown_connection); - /* Step 4: (short) lingering close completion timeouts */ + + /* Step 5: short lingering close queue timeouts are closed */ process_timeout_queue(short_linger_q, now, shutdown_connection); apr_thread_mutex_unlock(timeout_mutex); @@ -2003,11 +2075,12 @@ static void * APR_THREAD_FUNC listener_thread(apr_thread_t * thd, void *dummy) queues_next_expiry > now ? 
queues_next_expiry - now : -1); - ps->keep_alive = apr_atomic_read32(keepalive_q->total); + ps->wait_io = apr_atomic_read32(waitio_q->total); ps->write_completion = apr_atomic_read32(write_completion_q->total); - ps->connections = apr_atomic_read32(&connection_count); - ps->suspended = apr_atomic_read32(&suspended_count); + ps->keep_alive = apr_atomic_read32(keepalive_q->total); ps->lingering_close = apr_atomic_read32(&lingering_count); + ps->suspended = apr_atomic_read32(&suspended_count); + ps->connections = apr_atomic_read32(&connection_count); } else if ((workers_were_busy || dying) && apr_atomic_read32(keepalive_q->total)) { @@ -3403,7 +3476,7 @@ static void setup_slave_conn(conn_rec *c, void *csd) cs->bucket_alloc = c->bucket_alloc; cs->pfd = mcs->pfd; cs->pub = mcs->pub; - cs->pub.state = CONN_STATE_READ_REQUEST_LINE; + cs->pub.state = CONN_STATE_PROCESSING; cs->pub.sense = CONN_SENSE_DEFAULT; c->cs = &(cs->pub); @@ -3630,16 +3703,17 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, struct { struct timeout_queue *tail, *q; apr_hash_t *hash; - } wc, ka; + } io, wc, ka; /* Not needed in pre_config stage */ if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG) { return OK; } - wc.tail = ka.tail = NULL; + io.hash = apr_hash_make(ptemp); wc.hash = apr_hash_make(ptemp); ka.hash = apr_hash_make(ptemp); + io.tail = wc.tail = ka.tail = NULL; linger_q = TO_QUEUE_MAKE(pconf, apr_time_from_sec(MAX_SECS_TO_LINGER), NULL); @@ -3650,8 +3724,12 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, event_srv_cfg *sc = apr_pcalloc(pconf, sizeof *sc); ap_set_module_config(s->module_config, &mpm_event_module, sc); - if (!wc.tail) { + if (!io.tail) { /* The main server uses the global queues */ + io.q = TO_QUEUE_MAKE(pconf, s->timeout, NULL); + apr_hash_set(io.hash, &s->timeout, sizeof s->timeout, io.q); + io.tail = waitio_q = io.q; + wc.q = TO_QUEUE_MAKE(pconf, s->timeout, NULL); apr_hash_set(wc.hash, &s->timeout, sizeof 
s->timeout, wc.q); wc.tail = write_completion_q = wc.q; @@ -3664,6 +3742,13 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, else { /* The vhosts use any existing queue with the same timeout, * or their own queue(s) if there isn't */ + io.q = apr_hash_get(io.hash, &s->timeout, sizeof s->timeout); + if (!io.q) { + io.q = TO_QUEUE_MAKE(pconf, s->timeout, io.tail); + apr_hash_set(io.hash, &s->timeout, sizeof s->timeout, io.q); + io.tail = io.tail->next = io.q; + } + wc.q = apr_hash_get(wc.hash, &s->timeout, sizeof s->timeout); if (!wc.q) { wc.q = TO_QUEUE_MAKE(pconf, s->timeout, wc.tail); @@ -3680,6 +3765,7 @@ static int event_post_config(apr_pool_t *pconf, apr_pool_t *plog, ka.tail = ka.tail->next = ka.q; } } + sc->io_q = io.q; sc->wc_q = wc.q; sc->ka_q = ka.q; }