Skip to content

Commit

Permalink
Fix stream handling to ensure all connections always have at least a …
Browse files Browse the repository at this point in the history
…minimum number of streams
  • Loading branch information
nbridge-jump authored and mmcgee-jump committed Oct 10, 2024
1 parent f07011c commit 3c0ff66
Show file tree
Hide file tree
Showing 4 changed files with 356 additions and 23 deletions.
6 changes: 3 additions & 3 deletions src/app/fdctl/config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ dynamic_port_range = "8900-9000"
# underlying link bandwidth. Supporting more streams per
# connection currently has a memory footprint cost on the order
# of kilobytes per stream, per connection.
max_concurrent_streams_per_connection = 2048
max_concurrent_streams_per_connection = 32

# QUIC uses a fixed-size pool of streams to use for all the
# connections in the QUIC instance. When a new connection is
Expand All @@ -872,7 +872,7 @@ dynamic_port_range = "8900-9000"
# this value must be at least as high as the value of
# `max_concurrent_connections` above multiplied by 16, or else
# the validator will refuse to start.
stream_pool_cnt = 32768
stream_pool_cnt = 65536

# Controls how much transactions coming in via TPU can be
# reassembled at the same time. Reassembly is required for user
Expand Down Expand Up @@ -919,7 +919,7 @@ dynamic_port_range = "8900-9000"
#
# An idle connection will be terminated if it remains idle
# longer than this threshold.
idle_timeout_millis = 10000
idle_timeout_millis = 2000

# QUIC retry is a feature to combat new connection request
# spamming. See rfc9000 8.1.2 for more details. This flag
Expand Down
84 changes: 64 additions & 20 deletions src/waltz/quic/fd_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,9 @@ fd_quic_stream_send( fd_quic_stream_t * stream,
/* stream_id & 2 == 0 is bidir
stream_id & 1 == 0 is client */
if( FD_UNLIKELY( ( ( (uint)stream_id & 2u ) == 2u ) &
( ( (uint)stream_id & 1u ) != (uint)conn->server ) ) )
( ( (uint)stream_id & 1u ) != (uint)conn->server ) ) ) {
return FD_QUIC_SEND_ERR_INVAL_STREAM;
}

if( FD_UNLIKELY( conn->state != FD_QUIC_CONN_STATE_ACTIVE ) ) {
if( conn->state == FD_QUIC_CONN_STATE_HANDSHAKE ||
Expand Down Expand Up @@ -1400,19 +1401,6 @@ fd_quic_handle_v1_initial( fd_quic_t * quic,

/* This is the initial packet before retry. */
if( initial->token_len == 0 ) {
/* FIXME we pass the "tp" here to allow the function to add the retry_source_connection_ip
* transport parameter to be added.
* This is incorrect, since:
* 1. "tp" is transient, and will likely be modified before it is encoded into a packet
* 2. RETRY is supposed to be stateless
*
* The fix is thus:
* 1. put the new source connection id (or some seed from which it's generated)
* into the token in the RETRY packet
* 2. upon receiving the token back in a new INITIAL packet, unpack the connection id
* from the token and store in the newly created connection id
* 3. when the "tp" is about to be encoded, add the "retry_source_connection_ip"
* from the appropriate member of "conn" */
if( FD_UNLIKELY( fd_quic_send_retry(
quic, pkt,
&orig_dst_conn_id, &new_conn_id,
Expand Down Expand Up @@ -2135,8 +2123,10 @@ fd_quic_reschedule_conn( fd_quic_conn_t * conn,

ulong service_interval = fd_quic_get_service_interval( quic );

fd_rng_t * rng = state->_rng;
ulong jitter = fd_rng_uint( rng ) & 0x3ffUL;
timeout = fd_ulong_min( timeout, now + service_interval );
timeout = fd_ulong_max( timeout, now + 1UL );
timeout = fd_ulong_max( timeout, now + jitter );

if( conn->in_schedule ) {
if( timeout >= conn->sched_service_time ) return;
Expand Down Expand Up @@ -2779,7 +2769,7 @@ fd_quic_aio_cb_receive( void * context,
t1 = fd_quic_now( quic );
ulong delta = t1 - t0;
if( delta > (ulong)500e3 ) {
FD_LOG_WARNING(( "CALLBACK - took %lu t0: %lu t1: %lu batch_cnt: %lu", delta, t0, t1, (ulong)batch_cnt ));
FD_LOG_WARNING(( "CALLBACK - took %ld t0: %lu t1: %lu batch_cnt: %lu", delta, t0, t1, (ulong)batch_cnt ));
}
)

Expand Down Expand Up @@ -3017,6 +3007,34 @@ fd_quic_tls_cb_handshake_complete( fd_quic_tls_hs_t * hs,

/* Trigger allocation of streams to connections */
fd_quic_state_t * state = fd_quic_get_state( conn->quic );

/* allocate client initiated unidir streams */
uint stream_type = FD_QUIC_TYPE_UNIDIR << 1u;

if( conn->server ) {
fd_quic_stream_pool_t * stream_pool = state->stream_pool;

/* assign streams to connection */
while( conn->cur_stream_cnt[stream_type] < FD_QUIC_STREAM_MIN ) {
/* obtain stream from stream pool */
fd_quic_stream_t * stream = fd_quic_stream_pool_alloc( stream_pool );
if( FD_UNLIKELY( !stream ) ) {
/* should never happen */
break;
}

int rtn = fd_quic_assign_stream( conn, stream_type, stream );
if( FD_UNLIKELY( rtn == FD_QUIC_FAILED ) ) {
/* should never happen */
/* return stream */
fd_quic_stream_pool_free( stream_pool, stream );
break;
}
}

fd_quic_conn_update_max_streams( conn, (uint)stream_type >> 1U );
}

state->flags |= FD_QUIC_FLAGS_ASSIGN_STREAMS;
return;

Expand Down Expand Up @@ -5959,14 +5977,32 @@ fd_quic_stream_free( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_stream_t *

stream->stream_id = ~0UL;

/* add to stream_pool */
/* get state */
fd_quic_state_t * state = fd_quic_get_state( quic );
fd_quic_stream_pool_free( state->stream_pool, stream );

/* decrement current stream count */
ulong type = stream_id & 3UL;
conn->cur_stream_cnt[type]--;

/* here, if the connection will become below the minimum streams */
/* we want to keep it allocated */
/* else we want to return it to the pool */
if( FD_UNLIKELY( conn->server
&& conn->state == FD_QUIC_CONN_STATE_ACTIVE
&& type == FD_QUIC_STREAM_TYPE_UNI_CLIENT
&& conn->cur_stream_cnt[type] < FD_QUIC_STREAM_MIN ) ) {
int rtn = fd_quic_assign_stream( conn, type, stream );
if( FD_UNLIKELY( rtn == FD_QUIC_FAILED ) ) {
FD_DEBUG( FD_LOG_WARNING(( "fd_quic_assign_stream failed" )); )

/* add to stream_pool */
fd_quic_stream_pool_free( state->stream_pool, stream );
}
} else {
/* add to stream_pool */
fd_quic_stream_pool_free( state->stream_pool, stream );
}

fd_quic_conn_update_max_streams( conn, (uint)type >> 1U );

fd_quic_stream_reclaim( conn, stream_id & 3 );
Expand Down Expand Up @@ -6687,6 +6723,8 @@ fd_quic_assign_streams( fd_quic_t * quic ) {

if( FD_UNLIKELY( conn->state != FD_QUIC_CONN_STATE_ACTIVE &&
conn->state != FD_QUIC_CONN_STATE_HANDSHAKE_COMPLETE ) ) {
FD_DEBUG( FD_LOG_WARNING(( "ASSIGN - conn chosen has unusable state: %u",
(uint)conn->state )); )
fd_quic_stream_pool_free( stream_pool, stream );
break;
}
Expand Down Expand Up @@ -6716,17 +6754,23 @@ fd_quic_assign_stream( fd_quic_conn_t * conn, ulong stream_type, fd_quic_stream_
ulong next_stream_id = conn->next_stream_id[stream_type];

/* check if connection is allowed to be assigned the stream */
if( FD_UNLIKELY( next_stream_id >= conn->tgt_sup_stream_id[stream_type] ) ) {
if( FD_UNLIKELY( conn->cur_stream_cnt[stream_type] >= FD_QUIC_STREAM_MIN &&
next_stream_id >= conn->tgt_sup_stream_id[stream_type] ) ) {
FD_DEBUG( FD_LOG_NOTICE(( "ASSIGN FAILED" )); )
return FD_QUIC_FAILED;
}

uint dirtype = ( (uint)stream_type & 2U ) >> 1U;
if( FD_UNLIKELY( dirtype==FD_QUIC_TYPE_BIDIR ) ) return FD_QUIC_FAILED;
if( FD_UNLIKELY( dirtype==FD_QUIC_TYPE_BIDIR ) ) {
FD_DEBUG( FD_LOG_NOTICE(( "ASSIGN FAILED" )); )
return FD_QUIC_FAILED;
}

fd_quic_stream_map_t * entry = fd_quic_stream_map_insert( conn->stream_map, next_stream_id );
if( FD_UNLIKELY( !entry ) ) {
/* stream_map should be large enough for maximum tgt_stream_id */
/* TODO add metric */
FD_DEBUG( FD_LOG_NOTICE(( "ASSIGN FAILED" )); )
return FD_QUIC_FAILED;
}

Expand Down
1 change: 1 addition & 0 deletions src/waltz/quic/tests/Local.mk
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ $(call run-unit-test,test_quic_crypto)
$(call make-unit-test,test_quic_client_flood,test_quic_client_flood,fd_quic fd_tls fd_ballet fd_waltz fd_util)
$(call make-unit-test,test_quic_server,test_quic_server, fd_quic fd_tls fd_ballet fd_waltz fd_util)
$(call make-unit-test,test_quic_txns, test_quic_txns, fd_quic fd_tls fd_ballet fd_waltz fd_util)
$(call make-unit-test,test_quic_idle_conns, test_quic_idle_conns, fd_quic fd_tls fd_ballet fd_waltz fd_util)

$(call make-unit-test,test_quic_frames,test_frames,fd_quic fd_util)
$(call make-unit-test,test_quic_retry_unit,test_quic_retry_unit,fd_quic fd_ballet fd_waltz fd_util)
Expand Down
Loading

0 comments on commit 3c0ff66

Please sign in to comment.