Skip to content

Commit

Permalink
feat(quic): qos incl. stake weighted flow control + conn eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
lidatong committed Jul 20, 2023
1 parent e8d71db commit 1788e0d
Show file tree
Hide file tree
Showing 13 changed files with 811 additions and 6 deletions.
7 changes: 5 additions & 2 deletions src/ballet/x509/fd_x509.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,17 @@ fd_x509_gen_solana_cert( EVP_PKEY * pkey ) {

/* Generate serial number */
long serial;
if( FD_UNLIKELY( 1!=RAND_bytes( (uchar *)&serial, sizeof(long) ) ) ) {
if ( FD_UNLIKELY( 1 != RAND_bytes( (uchar *)&serial, sizeof( long ) ) ) ) {
FD_LOG_WARNING(( "RAND_bytes() failed" ));
goto cleanup1;
}
ASN1_INTEGER_set( X509_get_serialNumber(x), serial );

/* Set public key (the only important part) */
X509_set_pubkey( x, pkey );
if ( FD_UNLIKELY( 1 != X509_set_pubkey( x, pkey ) ) ) {
FD_LOG_WARNING(( "X509_set_pubkey() failed" ));
goto cleanup1;
};

/* Set very long expiration date */
long not_before = 0L; /* Jan 1 00:00:00 1975 GMT */
Expand Down
2 changes: 1 addition & 1 deletion src/tango/quic/Local.mk
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ ifdef FD_HAS_OPENSSL
$(call make-lib,fd_quic)
$(call add-objs,fd_quic fd_quic_conn fd_quic_conn_id fd_quic_conn_map fd_quic_proto \
fd_quic_stream tls/fd_quic_tls crypto/fd_quic_crypto_suites templ/fd_quic_transport_params \
templ/fd_quic_parse_util fd_quic_pkt_meta,fd_quic)
templ/fd_quic_parse_util fd_quic_pkt_meta fd_quic_qos,fd_quic)
$(call make-bin,fd_quic_ctl,fd_quic_ctl,fd_quic fd_ballet fd_util)
endif
10 changes: 8 additions & 2 deletions src/tango/quic/fd_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,12 @@ fd_quic_stream_fin( fd_quic_stream_t * stream ) {
/* TODO update metrics */
}

void
fd_quic_conn_set_max_streams( fd_quic_conn_t * conn, int dirtype, ulong max_streams ) {
  /* Derive the stream-type index: bit 1 is the direction (dirtype
     masked to 0 or 1), bit 0 is whether this endpoint is the server. */
  ulong stream_type = (ulong)( ( ( dirtype & 1 ) << 1 ) + conn->server );
  conn->max_streams[stream_type] = max_streams;
}

void
fd_quic_conn_set_rx_max_data( fd_quic_conn_t * conn, ulong rx_max_data ) {
conn->rx_max_data = rx_max_data;
Expand Down Expand Up @@ -3932,6 +3938,7 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) {
ulong stream_type_idx = 2u | !conn->server;
frame.max_streams.stream_type = 1;
frame.max_streams.max_streams = conn->max_streams[stream_type_idx];
frame.max_streams.max_streams = conn->max_streams[stream_type_idx];

/* attempt to write into buffer */
frame_sz = fd_quic_encode_max_streams_frame( payload_ptr,
Expand Down Expand Up @@ -4642,8 +4649,7 @@ fd_quic_connect( fd_quic_t * quic,
transport_params_raw,
FD_QUIC_TRANSPORT_PARAMS_RAW_SZ,
tp );
if( FD_UNLIKELY( tp_rc == FD_QUIC_ENCODE_FAIL ) ) {
/* FIXME log error in counters */
if( FD_UNLIKELY( tp_rc == FD_QUIC_ENCODE_FAIL ) ) { /* FIXME log error in counters */
FD_DEBUG( FD_LOG_DEBUG(( "fd_quic_encode_transport_params failed" )) );
goto fail_conn;
}
Expand Down
36 changes: 36 additions & 0 deletions src/tango/quic/fd_quic.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,13 @@ typedef void
(* fd_quic_cb_tls_keylog_t)( void * quic_ctx,
char const * line );

/* fd_quic_cb_conn_evict_t is invoked when no free connection slots
   remain, to select an existing connection to evict. */

typedef void
(* fd_quic_cb_conn_evict_t)( void * quic_ctx );


/* fd_quic_callbacks_t defines the set of user-provided callbacks that
are invoked by the QUIC library. Resets on leave. */

Expand Down Expand Up @@ -662,6 +669,34 @@ fd_quic_stream_fin( fd_quic_stream_t * stream );
//void
//fd_quic_stream_close( fd_quic_stream_t * stream, int direction_flags );

/* Flow Control API ***************************************************/

/* fd_quic_conn_set_rx_max_data sets the maximum amount of data that can be sent
by the peer on a connection. This update will propagate to the peer via a
MAX_DATA frame.
A violation of this flow control param will result in connection termination
with FLOW_CONTROL_ERROR, per RFC 9000. */
FD_QUIC_API void
fd_quic_conn_set_rx_max_data( fd_quic_conn_t * conn, ulong rx_max_data );

FD_QUIC_API void
fd_quic_conn_set_max_streams( fd_quic_conn_t * conn, int dirtype, ulong max_streams );

/* fd_quic_stream_set_rx_max_stream_data sets the maximum amount of data that
can be sent by the peer on a stream. This update will propagate to the peer
via a MAX_STREAM_DATA frame.
A violation of this flow control param will result in connection termination
with FLOW_CONTROL_ERROR, per RFC 9000.
Note that updating this param will not affect the `max_data` param (above).
The effective limit will be the smaller of the two (see the stream loop in
`fd_quic.c`). Therefore, a user should consider both params when configuring
flow control. */
FD_QUIC_API void
fd_quic_stream_set_rx_max_stream_data( fd_quic_stream_t * stream, ulong rx_max_stream_data );

FD_PROTOTYPES_END

uint fd_quic_tx_buffered_raw(fd_quic_t *quic,
Expand All @@ -681,6 +716,7 @@ uint fd_quic_tx_buffered_raw(fd_quic_t *quic,
/* Convenience exports for consumers of API */
#include "fd_quic_conn.h"
#include "fd_quic_stream.h"
#include "fd_quic_qos.h"

/* FD_DEBUG_MODE: set to enable debug-only code
TODO move to util? */
Expand Down
239 changes: 239 additions & 0 deletions src/tango/quic/fd_quic_qos.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include "../../util/rng/fd_rng.h"
#include "tls/fd_quic_tls.h"
#include "fd_quic_qos.h"

/* fd_stake_pubkey_from_cert extracts the raw public key from an X.509
   certificate into pubkey->pubkey (FD_TXN_PUBKEY_SZ bytes).
   Returns 0 on success, -1 on failure. */
int
fd_stake_pubkey_from_cert( fd_stake_pubkey_t * pubkey, X509 * cert ) {
  EVP_PKEY * pubkey_openssl = X509_get_pubkey( cert );
  size_t     len            = FD_TXN_PUBKEY_SZ;
  FD_TEST( pubkey_openssl != NULL );
  /* BUG FIX: the extraction result was previously ignored, leaving
     pubkey->pubkey uninitialized on failure.  EVP_PKEY_get_raw_public_key
     returns 1 on success, 0 on failure (e.g. a non-raw key type). */
  if ( FD_UNLIKELY( 1 != EVP_PKEY_get_raw_public_key( pubkey_openssl, pubkey->pubkey, &len ) ) ) {
    EVP_PKEY_free( pubkey_openssl );
    FD_LOG_WARNING( ( "EVP_PKEY_get_raw_public_key() failed" ) );
    return -1;
  }
  EVP_PKEY_free( pubkey_openssl );
  return 0;
}

/* Splice ele into the doubly-linked list immediately after prev.
   Returns ele. */
fd_quic_qos_unpriv_conn_ele_t *
fd_quic_qos_unpriv_conn_ele_insert( fd_quic_qos_unpriv_conn_ele_t * prev,
                                    fd_quic_qos_unpriv_conn_ele_t * ele ) {
  fd_quic_qos_unpriv_conn_ele_t * next = prev->next;
  ele->prev  = prev;
  ele->next  = next;
  next->prev = ele;
  prev->next = ele;
  return ele;
}

/* Unlink ele from its doubly-linked list, clearing its links and conn.
   Returns ele, or NULL if ele is the list sentinel (which is never
   removed). */
fd_quic_qos_unpriv_conn_ele_t *
fd_quic_qos_unpriv_conn_ele_remove( fd_quic_qos_unpriv_conn_ele_t * ele ) {
  if ( FD_UNLIKELY( ele->next == ele ) ) return NULL; /* this is the sentinel */
  fd_quic_qos_unpriv_conn_ele_t * p = ele->prev;
  fd_quic_qos_unpriv_conn_ele_t * n = ele->next;
  p->next   = n;
  n->prev   = p;
  ele->prev = NULL;
  ele->next = NULL;
  ele->conn = NULL;
  return ele;
}

/* fd_quic_qos_unpriv_conn_list_align returns the required byte
   alignment of an unpriv conn list memory region. */
ulong
fd_quic_qos_unpriv_conn_list_align( void ) {
  return FD_QUIC_QOS_UNPRIV_CONN_LIST_ALIGN;
}

/* fd_quic_qos_unpriv_conn_list_footprint returns the byte footprint of
   an unpriv conn list sized for max elements: the list header followed
   by max element slots. */
ulong
fd_quic_qos_unpriv_conn_list_footprint( ulong max ) {
  ulong header_sz = sizeof( fd_quic_qos_unpriv_conn_list_t );
  ulong eles_sz   = max * sizeof( fd_quic_qos_unpriv_conn_ele_t );
  return header_sz + eles_sz;
}

/* fd_quic_qos_unpriv_conn_list_new formats mem as an unpriv conn list
   and initializes its sentinel to an empty circular list (the
   sentinel's next and prev point at the sentinel itself).  Always
   returns 0.
   NOTE(review): `list->sentinel` is used both as a pointer value and as
   an lvalue target, which implies `sentinel` is an array-of-1 struct
   member (fd-style) -- confirm against fd_quic_qos.h.  Also note the
   max element slots counted by _footprint are not laid out here;
   presumably the caller seeds the free list separately -- confirm. */
ulong
fd_quic_qos_unpriv_conn_list_new( void * mem ) {
  FD_SCRATCH_ALLOC_INIT( l, mem );
  fd_quic_qos_unpriv_conn_list_t * list = FD_SCRATCH_ALLOC_APPEND(
      l, fd_quic_qos_unpriv_conn_list_align(), sizeof( fd_quic_qos_unpriv_conn_list_t ) );
  FD_SCRATCH_ALLOC_FINI( l, fd_quic_qos_unpriv_conn_list_align() );
  /* empty list: sentinel links to itself in both directions */
  fd_quic_qos_unpriv_conn_ele_t sentinel = {
      .conn = NULL, .next = list->sentinel, .prev = list->sentinel };
  *list->sentinel = sentinel;
  return 0;
}

/* Append curr to the tail of list (making it the most-recently-used
   element).  Returns curr. */
fd_quic_qos_unpriv_conn_ele_t *
fd_quic_qos_unpriv_conn_list_push_back( fd_quic_qos_unpriv_conn_list_t * list,
                                        fd_quic_qos_unpriv_conn_ele_t *  curr ) {
  /* The current tail is the element just before the sentinel. */
  return fd_quic_qos_unpriv_conn_ele_insert( list->sentinel->prev, curr );
}

/* Detach and return the head element of list, or NULL when the list is
   empty. */
fd_quic_qos_unpriv_conn_ele_t *
fd_quic_qos_unpriv_conn_list_pop_front( fd_quic_qos_unpriv_conn_list_t * list ) {
  fd_quic_qos_unpriv_conn_ele_t * head = list->sentinel->next;
  if ( FD_UNLIKELY( head == list->sentinel ) ) return NULL; /* list is empty */
  return fd_quic_qos_unpriv_conn_ele_remove( head );
}

/* Unprivileged connection LRU algorithm:
   1. The LRU contains a map of conn_id -> conn (ele) and a doubly-linked list
      of conns.
   2. When a new conn comes in, we check if it is already in the LRU by looking
      it up in the map.
   3. If it is already in the LRU, we move it to the back of the list.
   4. If it is not in the LRU, we check if the LRU is full.
   5. If the LRU is full, we evict 10% of the conns in the LRU (at least one).
   6. We then pop the next conn off the free list, push it onto the used list,
      and insert it into the map. */
fd_quic_qos_unpriv_conn_lru_t *
fd_quic_qos_unpriv_conn_upsert( fd_quic_qos_unpriv_conn_lru_t * lru, fd_quic_conn_t * conn ) {
  fd_quic_qos_unpriv_conn_map_t * existing =
      fd_quic_qos_unpriv_conn_map_query( lru->map, conn->local_conn_id, NULL );
  if ( FD_LIKELY( existing ) ) { /* more likely to be handling existing conns than new conns */
    /* relink the existing conn's element at the tail: it is now the MRU */
    fd_quic_qos_unpriv_conn_list_push_back( lru->used_list,
                                            fd_quic_qos_unpriv_conn_ele_remove( existing->ele ) );
  } else { /* new conn */
    /* if the free list is empty, the LRU is full: evict ~10% of conns */
    if ( FD_UNLIKELY( lru->free_list->sentinel->next == lru->free_list->sentinel ) ) {
      ulong n = lru->max / 10;
      /* BUG FIX: when max < 10 the old code evicted nothing and then
         dereferenced the NULL returned by pop_front below. Always evict
         at least one. */
      if ( FD_UNLIKELY( !n ) ) n = 1;
      for ( ulong i = 0; i < n; i++ ) {
        fd_quic_qos_unpriv_conn_ele_t * victim = lru->used_list->sentinel->next;
        if ( FD_UNLIKELY( victim == lru->used_list->sentinel ) ) break; /* used list exhausted */
        /* BUG FIX: read the conn id *before* removing the element --
           ele_remove NULLs ->conn, and the old code dereferenced it
           afterwards. */
        ulong victim_id = victim->conn->local_conn_id;
        fd_quic_qos_unpriv_conn_map_t * map_slot =
            fd_quic_qos_unpriv_conn_map_query( lru->map, victim_id, NULL );
        /* if the ele is in the LRU list but not the map this is a programming
           error (BUG FIX: query with a NULL sentinel returns NULL on miss;
           the old code dereferenced map_slot->key) */
        if ( FD_UNLIKELY( !map_slot ) ) {
          FD_LOG_ERR( ( "LRU list and map are out of sync. conn_id: %lu", victim_id ) );
        }
        /* remove from the lookup cache, then return the ele to the free list */
        fd_quic_qos_unpriv_conn_map_remove( lru->map, map_slot );
        fd_quic_qos_unpriv_conn_ele_remove( victim );
        fd_quic_qos_unpriv_conn_list_push_back( lru->free_list, victim );
      }
    }
    fd_quic_qos_unpriv_conn_ele_t * ele = fd_quic_qos_unpriv_conn_list_pop_front( lru->free_list );
    ele->conn = conn;
    fd_quic_qos_unpriv_conn_list_push_back( lru->used_list, ele );
    /* BUG FIX: record the backing list element on the map entry -- the
       hit path above reads existing->ele, which the old code never set. */
    fd_quic_qos_unpriv_conn_map_insert( lru->map, conn->local_conn_id )->ele = ele;
  }
  return lru;
}

/* fd_quic_qos_unpriv_conn_lru_align returns the required byte
   alignment of an unpriv conn LRU memory region. */
ulong
fd_quic_qos_unpriv_conn_lru_align( void ) {
  return FD_QUIC_QOS_UNPRIV_CONN_LRU_ALIGN;
}

/* fd_quic_qos_unpriv_conn_lru_footprint returns the byte footprint of
   an unpriv conn LRU sized for max elements: the LRU header followed by
   the used list and the free list (each sized for max elements).
   NOTE(review): the conn_id map's footprint is not counted here --
   confirm whether it lives in a separate region. */
ulong
fd_quic_qos_unpriv_conn_lru_footprint( ulong max ) {
  ulong l;
  l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND(
      l, fd_quic_qos_unpriv_conn_lru_align(), sizeof( fd_quic_qos_unpriv_conn_lru_t ) );
  l = FD_LAYOUT_APPEND(
      l, fd_quic_qos_unpriv_conn_list_align(), fd_quic_qos_unpriv_conn_list_footprint( max ) );
  l = FD_LAYOUT_APPEND(
      l, fd_quic_qos_unpriv_conn_list_align(), fd_quic_qos_unpriv_conn_list_footprint( max ) );

  /* BUG FIX: previously returned FD_QUIC_QOS_UNPRIV_CONN_LRU_ALIGN,
     discarding the computed layout and reporting the alignment as the
     footprint. */
  return FD_LAYOUT_FINI( l, fd_quic_qos_unpriv_conn_lru_align() );
}

/* fd_quic_qos_align returns the required byte alignment of an
   fd_quic_qos_t memory region. */
ulong
fd_quic_qos_align( void ) {
  return FD_QUIC_QOS_ALIGN;
}

/* fd_quic_qos_footprint returns the byte footprint required for an
   fd_quic_qos_t configured with the given limits. */
ulong
fd_quic_qos_footprint( fd_quic_qos_limits_t * limits ) {
  ulong l;
  l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, fd_quic_qos_align(), sizeof( fd_quic_qos_t ) );
  l = FD_LAYOUT_APPEND( l,
                        fd_quic_qos_priv_conn_align(),
                        fd_quic_qos_priv_conn_footprint( (int)limits->priv_conns ) );
  /* NOTE(review): this second region is sized with the *priv* conn
     align/footprint helpers but the *unpriv* limit -- looks like a
     copy-paste; confirm whether fd_quic_qos_unpriv_conn_lru_footprint
     was intended here. */
  l = FD_LAYOUT_APPEND( l,
                        fd_quic_qos_priv_conn_align(),
                        fd_quic_qos_priv_conn_footprint( (int)limits->unpriv_conns ) );
  return FD_LAYOUT_FINI( l, fd_quic_qos_align() );
}

/* fd_quic_qos_new formats a memory region for use as an fd_quic_qos_t
   and returns mem.  NOTE(review): no fields are initialized here --
   presumably deferred to join/config; confirm. */
void *
fd_quic_qos_new( void * mem ) {
  FD_SCRATCH_ALLOC_INIT( l, mem );
  fd_quic_qos_t * qos = FD_SCRATCH_ALLOC_APPEND( l, fd_quic_qos_align(), sizeof( fd_quic_qos_t ) );
  (void)qos; /* region reserved but not yet initialized */
  return mem;
}

/* fd_quic_qos_join joins a memory region previously formatted by
   fd_quic_qos_new and returns a pointer to the fd_quic_qos_t within it.
   NOTE(review): sub-structure joins (e.g. the staked node map) are
   still commented out below -- members other than the header are not
   wired up by this join yet. */
fd_quic_qos_t *
fd_quic_qos_join( void * mem ) {
  FD_SCRATCH_ALLOC_INIT( l, mem );
  fd_quic_qos_t * qos = FD_SCRATCH_ALLOC_APPEND( l, fd_quic_qos_align(), sizeof( fd_quic_qos_t ) );
  // qos->staked_node_map = fd_quic_qos_staked_node_join( FD_SCRATCH_ALLOC_APPEND(
  //     l, fd_quic_qos_staked_node_align(), fd_quic_qos_staked_node_footprint( 2 ) ) );
  (void)qos;
  return qos;
}

/* fd_quic_qos_conn_new assigns flow-control limits to a newly
   established connection, scaling the unidirectional stream budget by
   the peer's share of total stake (stake-weighted QoS).  Unstaked peers
   receive limits.min_streams. */
void
fd_quic_qos_conn_new( fd_quic_qos_t * qos, fd_quic_conn_t * conn ) {
  fd_stake_pubkey_t pubkey;
  /* BUG FIX: SSL_get_peer_certificate returns a new reference; free it
     after extracting the pubkey to avoid leaking the X509. */
  X509 * cert = SSL_get_peer_certificate( conn->tls_hs->ssl );
  fd_stake_pubkey_from_cert( &pubkey, cert );
  X509_free( cert );
  fd_stake_staked_node_t * node =
      fd_stake_staked_node_query( qos->stake->staked_nodes, pubkey, NULL );

  ulong stake = 0;
  /* BUG FIX: the condition was `!node`, which dereferenced NULL for
     every unstaked peer.  A staked hit is the unlikely case because
     most incoming traffic is likely unstaked. */
  if ( FD_UNLIKELY( node ) ) {
    fd_quic_conn_set_context( conn, node );
    stake = node->stake;
  }

  ulong total_stake = fd_ulong_max( qos->stake->total_stake, 1UL ); /* avoid division by zero */
  fd_quic_qos_limits_t limits = qos->limits;
  /* Linearly interpolate between min and max streams by stake share.
     BUG FIX: the old formula multiplied a percentage by the full range,
     so any nonzero stake clamped to max_streams.
     NOTE(review): stake * range may overflow ulong for very large
     stakes -- confirm stake magnitudes or widen if needed. */
  ulong max_streams =
      limits.min_streams + ( stake * ( limits.max_streams - limits.min_streams ) ) / total_stake;
  /* clamp (guards rounding and stake > total_stake inconsistencies) */
  max_streams = fd_ulong_min( fd_ulong_max( max_streams, limits.min_streams ), limits.max_streams );
  fd_quic_conn_set_max_streams( conn, FD_QUIC_TYPE_UNIDIR, max_streams );
  FD_LOG_NOTICE( ( "server: new connection with max streams %lu", max_streams ) );
}

/* fd_quic_qos_conn_evict admits conn by evicting a lower-priority
   connection.  For staked peers it randomly samples the privileged conn
   map for a lower-staked victim; otherwise (or if no victim is found)
   the conn is placed in the unprivileged LRU, which evicts as
   necessary. */
void
fd_quic_qos_conn_evict( fd_quic_qos_t * qos, fd_quic_conn_t * conn ) {
  fd_stake_pubkey_t pubkey;
  /* BUG FIX: SSL_get_peer_certificate returns a new reference; free it
     to avoid leaking the X509. */
  X509 * cert = SSL_get_peer_certificate( conn->tls_hs->ssl );
  fd_stake_pubkey_from_cert( &pubkey, cert );
  X509_free( cert );
  fd_stake_staked_node_t * node =
      fd_stake_staked_node_query( qos->stake->staked_nodes, pubkey, NULL );

  if ( FD_LIKELY( node ) ) { /* optimize this eviction code path */
    /* Randomly sample 2 * lg(n) entries in the staked map and evict the
       smallest. Multiply by 2 because the map is sparse. */
    fd_quic_qos_priv_conn_t * arg_min = NULL;
    ulong                     min     = node->stake;
    int   lg_n = fd_quic_qos_priv_conn_lg_slot_cnt( qos->priv_conn_map );
    ulong n    = fd_quic_qos_priv_conn_slot_cnt( qos->priv_conn_map );
    for ( int i = 0; i < 2 * lg_n; i++ ) {
      ulong slot_idx = fd_rng_ulong( qos->rng ) % n;
      /* BUG FIX: point into the map itself -- the old code took the
         address of a reused stack copy and later passed it to
         map_remove, which requires a pointer to the actual slot. */
      fd_quic_qos_priv_conn_t * slot = &qos->priv_conn_map[slot_idx];
      /* Likely to find something given the map is full (linear-probing) */
      if ( FD_LIKELY( !fd_quic_qos_priv_conn_key_inval( slot->key ) ) ) {
        /* NOTE(review): assumes every priv conn's context was set to its
           staked node on admission -- confirm fd_quic_qos_conn_new is
           the only admission path. */
        ulong stake = ( (fd_stake_staked_node_t *)fd_quic_conn_get_context( slot->conn ) )->stake;
        /* BUG FIX: track the running minimum -- the old code never
           updated min, so it kept the *last* below-threshold sample
           rather than the smallest. */
        if ( stake < min ) {
          min     = stake;
          arg_min = slot;
        }
      }
    }
    /* Unclear how likely this is... but probably won't evict anything */
    if ( FD_UNLIKELY( arg_min ) ) {
      fd_quic_qos_priv_conn_remove( qos->priv_conn_map, arg_min );
      fd_quic_qos_priv_conn_insert( qos->priv_conn_map, conn->local_conn_id )->conn = conn;
      return;
    }
  }
  /* Otherwise upsert it into the LRU cache (which will update and evict
     as necessary) */
  fd_quic_qos_unpriv_conn_upsert( qos->unpriv_conn_lru, conn );
}
Loading

0 comments on commit 1788e0d

Please sign in to comment.