diff --git a/src/app/fdctl/config/default.toml b/src/app/fdctl/config/default.toml index ecebdedec0..d1e13bc16c 100644 --- a/src/app/fdctl/config/default.toml +++ b/src/app/fdctl/config/default.toml @@ -861,7 +861,7 @@ dynamic_port_range = "8900-9000" # underlying link bandwidth. Supporting more streams per # connection currently has a memory footprint cost on the order # of kilobytes per stream, per connection. - max_concurrent_streams_per_connection = 2048 + max_concurrent_streams_per_connection = 32 # QUIC uses a fixed-size pool of streams to use for all the # connections in the QUIC instance. When a new connection is @@ -872,7 +872,7 @@ dynamic_port_range = "8900-9000" # this value must be at least as high as the value of # `max_concurrent_connections` above multiplied by 16, or else # the validator will refuse to start. - stream_pool_cnt = 32768 + stream_pool_cnt = 65536 # Controls how much transactions coming in via TPU can be # reassembled at the same time. Reassembly is required for user @@ -919,7 +919,7 @@ dynamic_port_range = "8900-9000" # # An idle connection will be terminated if it remains idle # longer than this threshold. - idle_timeout_millis = 10000 + idle_timeout_millis = 2000 # QUIC retry is a feature to combat new connection request # spamming. See rfc9000 8.1.2 for more details. This flag diff --git a/src/waltz/quic/fd_quic.c b/src/waltz/quic/fd_quic.c index 0366ca2c45..c92f696d40 100644 --- a/src/waltz/quic/fd_quic.c +++ b/src/waltz/quic/fd_quic.c @@ -1015,8 +1015,9 @@ fd_quic_stream_send( fd_quic_stream_t * stream, /* stream_id & 2 == 0 is bidir stream_id & 1 == 0 is client */ if( FD_UNLIKELY( ( ( (uint)stream_id & 2u ) == 2u ) & - ( ( (uint)stream_id & 1u ) != (uint)conn->server ) ) ) + ( ( (uint)stream_id & 1u ) != (uint)conn->server ) ) ) { return FD_QUIC_SEND_ERR_INVAL_STREAM; + } if( FD_UNLIKELY( conn->state != FD_QUIC_CONN_STATE_ACTIVE ) ) { if( conn->state == FD_QUIC_CONN_STATE_HANDSHAKE || @@ -1400,19 +1401,6 @@ fd_quic_handle_v1_initial( fd_quic_t * quic, /* This is the initial packet before retry. */ if( initial->token_len == 0 ) { - /* FIXME we pass the "tp" here to allow the function to add the retry_source_connection_ip - * transport parameter to be added. - * This is incorrect, since: - * 1. "tp" is transient, and will likely be modified before it is encoded into a packet - * 2. RETRY is supposed to be stateless - * - * The fix is thus: - * 1. put the new source connection id (or some seed from which it's generated) - * into the token in the RETRY packet - * 2. upon receiving the token back in a new INITIAL packet, unpack the connection id - * from the token and store in the newly created connection id - * 3. when the "tp" is about to be encoded, add the "retry_source_connection_ip" - * from the appropriate member of "conn" */ if( FD_UNLIKELY( fd_quic_send_retry( quic, pkt, &orig_dst_conn_id, &new_conn_id, @@ -2135,8 +2123,10 @@ fd_quic_reschedule_conn( fd_quic_conn_t * conn, ulong service_interval = fd_quic_get_service_interval( quic ); + fd_rng_t * rng = state->_rng; + ulong jitter = fd_rng_uint( rng ) & 0x3ffUL; timeout = fd_ulong_min( timeout, now + service_interval ); - timeout = fd_ulong_max( timeout, now + 1UL ); + timeout = fd_ulong_max( timeout, now + jitter ); if( conn->in_schedule ) { if( timeout >= conn->sched_service_time ) return; @@ -2779,7 +2769,7 @@ fd_quic_aio_cb_receive( void * context, t1 = fd_quic_now( quic ); ulong delta = t1 - t0; if( delta > (ulong)500e3 ) { - FD_LOG_WARNING(( "CALLBACK - took %lu t0: %lu t1: %lu batch_cnt: %lu", delta, t0, t1, (ulong)batch_cnt )); + FD_LOG_WARNING(( "CALLBACK - took %ld t0: %lu t1: %lu batch_cnt: %lu", delta, t0, t1, (ulong)batch_cnt )); } ) @@ -3017,6 +3007,34 @@ fd_quic_tls_cb_handshake_complete( fd_quic_tls_hs_t * hs, /* Trigger allocation of streams to connections */ fd_quic_state_t * state = fd_quic_get_state( conn->quic ); + + /* allocate client initiated unidir streams */ + uint stream_type = FD_QUIC_TYPE_UNIDIR << 1u; + + if( conn->server ) { + fd_quic_stream_pool_t * stream_pool = state->stream_pool; + + /* assign streams to connection */ + while( conn->cur_stream_cnt[stream_type] < FD_QUIC_STREAM_MIN ) { + /* obtain stream from stream pool */ + fd_quic_stream_t * stream = fd_quic_stream_pool_alloc( stream_pool ); + if( FD_UNLIKELY( !stream ) ) { + /* should never happen */ + break; + } + + int rtn = fd_quic_assign_stream( conn, stream_type, stream ); + if( FD_UNLIKELY( rtn == FD_QUIC_FAILED ) ) { + /* should never happen */ + /* return stream */ + fd_quic_stream_pool_free( stream_pool, stream ); + break; + } + } + + fd_quic_conn_update_max_streams( conn, (uint)stream_type >> 1U ); + } + state->flags |= FD_QUIC_FLAGS_ASSIGN_STREAMS; return; @@ -5959,14 +5977,32 @@ fd_quic_stream_free( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_stream_t * stream->stream_id = ~0UL; - /* add to stream_pool */ + /* get state */ fd_quic_state_t * state = fd_quic_get_state( quic ); - fd_quic_stream_pool_free( state->stream_pool, stream ); /* decrement current stream count */ ulong type = stream_id & 3UL; conn->cur_stream_cnt[type]--; + /* here, if the connection will become below the minimum streams */ + /* we want to keep it allocated */ + /* else we want to return it to the pool */ + if( FD_UNLIKELY( conn->server + && conn->state == FD_QUIC_CONN_STATE_ACTIVE + && type == FD_QUIC_STREAM_TYPE_UNI_CLIENT + && conn->cur_stream_cnt[type] < FD_QUIC_STREAM_MIN ) ) { + int rtn = fd_quic_assign_stream( conn, type, stream ); + if( FD_UNLIKELY( rtn == FD_QUIC_FAILED ) ) { + FD_DEBUG( FD_LOG_WARNING(( "fd_quic_assign_stream failed" )); ) + + /* add to stream_pool */ + fd_quic_stream_pool_free( state->stream_pool, stream ); + } + } else { + /* add to stream_pool */ + fd_quic_stream_pool_free( state->stream_pool, stream ); + } + fd_quic_conn_update_max_streams( conn, (uint)type >> 1U ); fd_quic_stream_reclaim( conn, stream_id & 3 ); @@ -6687,6 +6723,8 @@ fd_quic_assign_streams( fd_quic_t * quic ) { if( FD_UNLIKELY( conn->state != FD_QUIC_CONN_STATE_ACTIVE && conn->state != FD_QUIC_CONN_STATE_HANDSHAKE_COMPLETE ) ) { + FD_DEBUG( FD_LOG_WARNING(( "ASSIGN - conn chosen has unusable state: %u", + (uint)conn->state )); ) fd_quic_stream_pool_free( stream_pool, stream ); break; } @@ -6716,17 +6754,23 @@ fd_quic_assign_stream( fd_quic_conn_t * conn, ulong stream_type, fd_quic_stream_ ulong next_stream_id = conn->next_stream_id[stream_type]; /* check if connection is allowed to be assigned the stream */ - if( FD_UNLIKELY( next_stream_id >= conn->tgt_sup_stream_id[stream_type] ) ) { + if( FD_UNLIKELY( conn->cur_stream_cnt[stream_type] >= FD_QUIC_STREAM_MIN && + next_stream_id >= conn->tgt_sup_stream_id[stream_type] ) ) { + FD_DEBUG( FD_LOG_NOTICE(( "ASSIGN FAILED" )); ) return FD_QUIC_FAILED; } uint dirtype = ( (uint)stream_type & 2U ) >> 1U; - if( FD_UNLIKELY( dirtype==FD_QUIC_TYPE_BIDIR ) ) return FD_QUIC_FAILED; + if( FD_UNLIKELY( dirtype==FD_QUIC_TYPE_BIDIR ) ) { + FD_DEBUG( FD_LOG_NOTICE(( "ASSIGN FAILED" )); ) + return FD_QUIC_FAILED; + } fd_quic_stream_map_t * entry = fd_quic_stream_map_insert( conn->stream_map, next_stream_id ); if( FD_UNLIKELY( !entry ) ) { /* stream_map should be large enough for maximum tgt_stream_id */ /* TODO add metric */ + FD_DEBUG( FD_LOG_NOTICE(( "ASSIGN FAILED" )); ) return FD_QUIC_FAILED; } diff --git a/src/waltz/quic/tests/Local.mk b/src/waltz/quic/tests/Local.mk index 77d509cc01..33f4477f5f 100644 --- a/src/waltz/quic/tests/Local.mk +++ b/src/waltz/quic/tests/Local.mk @@ -28,6 +28,7 @@ $(call run-unit-test,test_quic_crypto) $(call make-unit-test,test_quic_client_flood,test_quic_client_flood,fd_quic fd_tls fd_ballet fd_waltz fd_util) $(call make-unit-test,test_quic_server,test_quic_server, fd_quic fd_tls fd_ballet fd_waltz fd_util) $(call make-unit-test,test_quic_txns, test_quic_txns, fd_quic fd_tls fd_ballet fd_waltz fd_util) +$(call make-unit-test,test_quic_idle_conns, test_quic_idle_conns, fd_quic fd_tls fd_ballet fd_waltz fd_util) $(call make-unit-test,test_quic_frames,test_frames,fd_quic fd_util) $(call make-unit-test,test_quic_retry_unit,test_quic_retry_unit,fd_quic fd_ballet fd_waltz fd_util) diff --git a/src/waltz/quic/tests/test_quic_idle_conns.c b/src/waltz/quic/tests/test_quic_idle_conns.c new file mode 100644 index 0000000000..1f2f8004e3 --- /dev/null +++ b/src/waltz/quic/tests/test_quic_idle_conns.c @@ -0,0 +1,288 @@ +#include "../fd_quic.h" +#include "fd_quic_test_helpers.h" +#include "../../../ballet/base64/fd_base64.h" +#include "../../../util/net/fd_ip4.h" + +#include +#include +#include + + +struct conn_meta { + fd_quic_conn_t * conn; + uint conn_idx; + uint state; +}; +typedef struct conn_meta conn_meta_t; + +#define CONN_STATE_DEAD 0U +#define CONN_STATE_INIT 1U +#define CONN_STATE_ACTIVE 2U + +#define MAX_CONNS 65536 + +conn_meta_t g_conn_meta[MAX_CONNS]; + +int g_dead = MAX_CONNS; +int g_init = 0; +int g_active = 0; + +void +cb_conn_new( fd_quic_conn_t * conn, + void * quic_ctx ) { + (void)conn; + (void)quic_ctx; +} + +void +cb_conn_handshake_complete( fd_quic_conn_t * conn, + void * quic_ctx ) { + (void)conn; + (void)quic_ctx; + + conn_meta_t * conn_meta = &g_conn_meta[conn->conn_idx]; + + conn_meta->state = CONN_STATE_ACTIVE; + + g_init--; + g_active++; +} + +void +cb_conn_final( fd_quic_conn_t * conn, + void * quic_ctx ) { + (void)conn; + (void)quic_ctx; + + conn_meta_t * conn_meta = &g_conn_meta[conn->conn_idx]; + + conn_meta->conn = NULL; + + switch( conn_meta->state ) { + case CONN_STATE_DEAD: break; + case CONN_STATE_INIT: g_init--; g_dead++; break; + case CONN_STATE_ACTIVE: g_active--; g_dead++; break; + } + + conn_meta->state = CONN_STATE_DEAD; +} + +void +cb_stream_new( fd_quic_stream_t * stream, + void * quic_ctx ) { + (void)stream; + (void)quic_ctx; +} + +void +cb_stream_notify( fd_quic_stream_t * stream, + void * stream_ctx, + int notify_type ) { + (void)stream; + (void)stream_ctx; + (void)notify_type; +} + +void +cb_stream_receive( fd_quic_stream_t * stream, + void * stream_ctx, + uchar const * data, + ulong data_sz, + ulong offset, + int fin ) { + (void)stream; + (void)stream_ctx; + (void)data; + (void)data_sz; + (void)offset; + (void)fin; +} + +ulong +cb_now( void * context ) { + (void)context; + return (ulong)fd_log_wallclock(); +} + + +void +run_quic_client( fd_quic_t * quic, + fd_quic_udpsock_t * udpsock, + uint dst_ip, + ushort dst_port ) { + + quic->cb.conn_new = cb_conn_new; + quic->cb.conn_hs_complete = cb_conn_handshake_complete; + quic->cb.conn_final = cb_conn_final; + quic->cb.stream_new = cb_stream_new; + quic->cb.stream_notify = cb_stream_notify; + quic->cb.stream_receive = cb_stream_receive; + quic->cb.now = cb_now; + quic->cb.now_ctx = NULL; + + fd_quic_set_aio_net_tx( quic, udpsock->aio ); + FD_TEST( fd_quic_init( quic ) ); + + ulong out_time = (ulong)fd_log_wallclock() + (ulong)1e9; + + while( 1 ) { + fd_quic_service( quic ); + fd_quic_udpsock_service( udpsock ); + + if( g_dead > 0 ) { + /* start a new connection */ + fd_quic_conn_t * conn = fd_quic_connect( quic, dst_ip, dst_port, NULL ); + + if( conn ) { + g_conn_meta[conn->conn_idx].conn = conn; + g_conn_meta[conn->conn_idx].conn_idx = (uint)conn->conn_idx; + g_conn_meta[conn->conn_idx].state = CONN_STATE_INIT; + + g_dead--; + g_init++; + } + } + + /* TODO send pings */ + + /* output stats */ + ulong now = (ulong)fd_log_wallclock(); + if( now > out_time ) { + FD_LOG_NOTICE(( "connections: active: %lu initializing: %lu", (ulong)g_active, (ulong)g_init )); + out_time = now + (ulong)1e9; + } + } + + /* finalize quic */ + fd_quic_fini( quic ); +} + + +int +main( int argc, + char ** argv ) { + fd_boot( &argc, &argv ); + + char const * _src_ip = fd_env_strip_cmdline_cstr( &argc, + &argv, + "--src-ip", + NULL, + "127.0.0.1" ); + + char const * _dst_ip = fd_env_strip_cmdline_cstr( &argc, + &argv, + "--dst-ip", + NULL, + "127.0.0.1" ); + + ushort dst_port = fd_env_strip_cmdline_ushort( &argc, + &argv, + "--dst-port", + NULL, + 9007 ); + + /* number of connections to maintain */ + ulong num_conns = fd_env_strip_cmdline_ulong( &argc, + &argv, + "--num-conns", + NULL, + 256 ); + + ulong num_pages = fd_env_strip_cmdline_ulong( &argc, + &argv, + "--num-pages", + NULL, + 1 << 15 ); + + char const * _page_sz = fd_env_strip_cmdline_cstr( &argc, + &argv, + "--page-sz", + NULL, + "normal" ); + + ulong cpu_idx = fd_tile_cpu_id( fd_tile_idx() ); + if( cpu_idx>=fd_shmem_cpu_cnt() ) cpu_idx = 0UL; + ulong numa_idx = fd_env_strip_cmdline_ulong( &argc, + &argv, + "--numa-idx", + NULL, + fd_shmem_numa_idx(cpu_idx) ); + + ulong page_sz = fd_cstr_to_shmem_page_sz( _page_sz ); + if( FD_UNLIKELY( !page_sz ) ) FD_LOG_ERR(( "unsupported --page-sz" )); + + if( num_conns > MAX_CONNS ) { + FD_LOG_ERR(( "Argument --num-conns larger than maximum of %lu", (ulong)MAX_CONNS )); + } + + fd_wksp_t * wksp = fd_wksp_new_anonymous( page_sz, + num_pages, + numa_idx, + "wksp", + 0UL ); + FD_TEST( wksp ); + + fd_quic_limits_t quic_limits = { + .conn_cnt = num_conns, + .handshake_cnt = num_conns, + .conn_id_cnt = 16UL, + .conn_id_sparsity = 4.0, + .stream_cnt = { 0UL, // FD_QUIC_STREAM_TYPE_BIDI_CLIENT + 0UL, // FD_QUIC_STREAM_TYPE_BIDI_SERVER + 2UL, // FD_QUIC_STREAM_TYPE_UNI_CLIENT + 0UL }, // FD_QUIC_STREAM_TYPE_UNI_SERVER + .initial_stream_cnt = { 0UL, // FD_QUIC_STREAM_TYPE_BIDI_CLIENT + 0UL, // FD_QUIC_STREAM_TYPE_BIDI_SERVER + 2UL, // FD_QUIC_STREAM_TYPE_UNI_CLIENT + 0UL }, // FD_QUIC_STREAM_TYPE_UNI_SERVER + .stream_pool_cnt = num_conns * FD_QUIC_STREAM_MIN, + .stream_sparsity = 4.0, + .inflight_pkt_cnt = 64UL, + .tx_buf_sz = 0 + }; + ulong quic_footprint = fd_quic_footprint( &quic_limits ); + FD_TEST( quic_footprint ); + + void * mem = fd_wksp_alloc_laddr( wksp, fd_quic_align(), quic_footprint, 1UL ); + fd_quic_t * quic = fd_quic_new( mem, &quic_limits ); + FD_TEST( quic ); + + fd_rng_t _rng[1]; fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) ); + fd_tls_test_sign_ctx_t * sign_ctx = fd_wksp_alloc_laddr( wksp, alignof(fd_tls_test_sign_ctx_t), sizeof(fd_tls_test_sign_ctx_t), 1UL ); + *sign_ctx = fd_tls_test_sign_ctx( rng ); + fd_quic_config_test_signer( quic, sign_ctx ); + + fd_quic_udpsock_t _udpsock; + uint listen_ip; + if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( _src_ip, &listen_ip ) ) ) { + FD_LOG_NOTICE(( "invalid --src-ip" )); + return 1; + } + fd_quic_udpsock_t * udpsock = fd_quic_client_create_udpsock( &_udpsock, wksp, fd_quic_get_aio_net_rx( quic ), listen_ip ); + FD_TEST( udpsock == &_udpsock ); + + fd_quic_config_t * client_cfg = &quic->config; + client_cfg->role = FD_QUIC_ROLE_CLIENT; + FD_TEST( fd_quic_config_from_env( &argc, &argv, client_cfg ) ); + memcpy(client_cfg->link.dst_mac_addr, "\x52\xF1\x7E\xDA\x2C\xE0", 6UL); + client_cfg->net.ip_addr = udpsock->listen_ip; + client_cfg->net.ephem_udp_port.lo = (ushort)udpsock->listen_port; + client_cfg->net.ephem_udp_port.hi = (ushort)(udpsock->listen_port + 1); + client_cfg->initial_rx_max_stream_data = 1<<15; + client_cfg->idle_timeout = (ulong)10000e6; + + uint dst_ip = 0; + if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( _dst_ip, &dst_ip ) ) ) { + FD_LOG_NOTICE(( "invalid --dst-ip" )); + return 1; + } + run_quic_client( quic, udpsock, dst_ip, dst_port ); + + fd_wksp_free_laddr( fd_quic_delete( fd_quic_leave( quic ) ) ); + fd_quic_udpsock_destroy( udpsock ); + fd_wksp_delete_anonymous( wksp ); + + fd_halt(); + + return 0; +}