From 1788e0dd1d8a0d5b08a46475d70d88820ca2a98b Mon Sep 17 00:00:00 2001
From: Charles Li
Date: Mon, 3 Jul 2023 01:33:24 +0000
Subject: [PATCH] feat(quic): qos incl. stake weighted flow control + conn eviction

---
 src/ballet/x509/fd_x509.c                   |   7 +-
 src/tango/quic/Local.mk                     |   2 +-
 src/tango/quic/fd_quic.c                    |  10 +-
 src/tango/quic/fd_quic.h                    |  36 +++
 src/tango/quic/fd_quic_qos.c                | 239 ++++++++++++++++++++
 src/tango/quic/fd_quic_qos.h                | 149 ++++++++++++
 src/tango/quic/tests/Local.mk               |   2 +
 src/tango/quic/tests/test_quic_qos_server.c | 200 ++++++++++++++++
 src/tango/quic/tests/test_quic_qos_unit.c   |  73 ++++++
 src/tango/stake/Local.mk                    |   4 +
 src/tango/stake/fd_stake.c                  |  41 ++++
 src/tango/stake/fd_stake.h                  |  52 +++++
 src/tango/xdp/fd_xdp_ctl.c                  |   2 +-
 13 files changed, 811 insertions(+), 6 deletions(-)
 create mode 100644 src/tango/quic/fd_quic_qos.c
 create mode 100644 src/tango/quic/fd_quic_qos.h
 create mode 100644 src/tango/quic/tests/test_quic_qos_server.c
 create mode 100644 src/tango/quic/tests/test_quic_qos_unit.c
 create mode 100644 src/tango/stake/Local.mk
 create mode 100644 src/tango/stake/fd_stake.c
 create mode 100644 src/tango/stake/fd_stake.h

diff --git a/src/ballet/x509/fd_x509.c b/src/ballet/x509/fd_x509.c
index 3bb8bb6251..2550b14888 100644
--- a/src/ballet/x509/fd_x509.c
+++ b/src/ballet/x509/fd_x509.c
@@ -50,14 +50,17 @@ fd_x509_gen_solana_cert( EVP_PKEY * pkey ) {
 
   /* Generate serial number */
   long serial;
   if( FD_UNLIKELY( 1!=RAND_bytes( (uchar *)&serial, sizeof(long) ) ) ) {
     FD_LOG_WARNING(( "RAND_bytes() failed" ));
     goto cleanup1;
   }
   ASN1_INTEGER_set( X509_get_serialNumber(x), serial );
 
   /* Set public key (the only important part) */
-  X509_set_pubkey( x, pkey );
+  if( FD_UNLIKELY( 1!=X509_set_pubkey( x, pkey ) ) ) { /* same 1-on-success test as RAND_bytes above */
+    FD_LOG_WARNING(( "X509_set_pubkey() failed" ));
+    goto cleanup1;
+  }
 
   /* Set very long expiration date */
   long not_before = 0L; /* Jan 1 00:00:00 1975 GMT */
diff --git 
a/src/tango/quic/Local.mk b/src/tango/quic/Local.mk
index 14d3eecd8e..f24116fa43 100644
--- a/src/tango/quic/Local.mk
+++ b/src/tango/quic/Local.mk
@@ -2,6 +2,6 @@ ifdef FD_HAS_OPENSSL
 $(call make-lib,fd_quic)
 $(call add-objs,fd_quic fd_quic_conn fd_quic_conn_id fd_quic_conn_map fd_quic_proto \
   fd_quic_stream tls/fd_quic_tls crypto/fd_quic_crypto_suites templ/fd_quic_transport_params \
-  templ/fd_quic_parse_util fd_quic_pkt_meta,fd_quic)
+  templ/fd_quic_parse_util fd_quic_pkt_meta fd_quic_qos,fd_quic)
 $(call make-bin,fd_quic_ctl,fd_quic_ctl,fd_quic fd_ballet fd_util)
 endif
diff --git a/src/tango/quic/fd_quic.c b/src/tango/quic/fd_quic.c
index e220ab8c41..08d3bcb084 100644
--- a/src/tango/quic/fd_quic.c
+++ b/src/tango/quic/fd_quic.c
@@ -1020,6 +1020,18 @@ fd_quic_stream_fin( fd_quic_stream_t * stream ) {
   /* TODO update metrics */
 }
 
+/* fd_quic_conn_set_max_streams records max_streams as the stream limit
+   of conn for streams of direction dirtype (only the low bit of dirtype
+   is used).  The slot written, ((dirtype&1)<<1) | !conn->server, is the
+   same one fd_quic_conn_tx reads (2u | !conn->server) when it fills in
+   a MAX_STREAMS frame, so a limit set here is the one advertised. */
+
+void
+fd_quic_conn_set_max_streams( fd_quic_conn_t * conn, int dirtype, ulong max_streams ) {
+  ulong type = ( (ulong)( dirtype & 1 ) << 1 ) | (ulong)!conn->server;
+  conn->max_streams[type] = max_streams;
+}
+
 void
 fd_quic_conn_set_rx_max_data( fd_quic_conn_t * conn, ulong rx_max_data ) {
   conn->rx_max_data = rx_max_data;
diff --git 
a/src/tango/quic/fd_quic.h b/src/tango/quic/fd_quic.h
index 1ffcb30580..827704148f 100644
--- a/src/tango/quic/fd_quic.h
+++ b/src/tango/quic/fd_quic.h
@@ -358,6 +358,12 @@ typedef void
 (* fd_quic_cb_tls_keylog_t)( void * quic_ctx,
                              char const * line );
 
+/* fd_quic_cb_conn_evict_t is called when there are no available
+   connections to determine which to evict. */
+
+typedef void
+(* fd_quic_cb_conn_evict_t)( void * quic_ctx );
+
 /* fd_quic_callbacks_t defines the set of user-provided callbacks that
    are invoked by the QUIC library. Resets on leave. */
 
@@ -662,6 +668,37 @@ fd_quic_stream_fin( fd_quic_stream_t * stream );
 //void
 //fd_quic_stream_close( fd_quic_stream_t * stream, int direction_flags );
 
+/* Flow Control API ***************************************************/
+
+/* fd_quic_conn_set_rx_max_data sets the maximum amount of data that can be sent
+   by the peer on a connection. This update will propagate to the peer via a
+   MAX_DATA frame.
+
+   A violation of this flow control param will result in connection termination
+   with FLOW_CONTROL_ERROR, per RFC 9000. */
+FD_QUIC_API void
+fd_quic_conn_set_rx_max_data( fd_quic_conn_t * conn, ulong rx_max_data );
+
+/* fd_quic_conn_set_max_streams sets the stream limit stored on conn for
+   streams of direction dirtype (e.g. FD_QUIC_TYPE_UNIDIR). Only the low
+   bit of dirtype is used. */
+FD_QUIC_API void
+fd_quic_conn_set_max_streams( fd_quic_conn_t * conn, int dirtype, ulong max_streams );
+
+/* fd_quic_stream_set_rx_max_stream_data sets the maximum amount of data that
+   can be sent by the peer on a stream. This update will propagate to the peer
+   via a MAX_STREAM_DATA frame.
+
+   A violation of this flow control param will result in connection termination
+   with FLOW_CONTROL_ERROR, per RFC 9000.
+
+   Note that updating this param will not affect the `max_data` param (above).
+   The effective limit will be the smaller of the two (see the stream loop in
+   `fd_quic.c`). Therefore, a user should consider both params when configuring
+   flow control. */
+FD_QUIC_API void
+fd_quic_stream_set_rx_max_stream_data( fd_quic_stream_t * stream, ulong rx_max_stream_data );
+
 FD_PROTOTYPES_END
 
 uint fd_quic_tx_buffered_raw(fd_quic_t *quic,
@@ -681,6 +718,7 @@ uint fd_quic_tx_buffered_raw(fd_quic_t *quic,
 /* Convenience exports for consumers of API */
 #include "fd_quic_conn.h"
 #include "fd_quic_stream.h"
+#include "fd_quic_qos.h"
 
 /* FD_DEBUG_MODE: set to enable debug-only code
    TODO move to util? */
diff --git a/src/tango/quic/fd_quic_qos.c b/src/tango/quic/fd_quic_qos.c
new file mode 100644
index 0000000000..9522341005
--- /dev/null
+++ b/src/tango/quic/fd_quic_qos.c
@@ -0,0 +1,355 @@
+#include "../../util/rng/fd_rng.h"
+#include "tls/fd_quic_tls.h"
+#include "fd_quic_qos.h"
+
+/* fd_quic_qos_pubkey_from_cert copies the raw public key of cert into
+   pubkey.  Returns 0 on success and -1 if either argument is NULL, the
+   cert carries no public key, or the raw key is not exactly
+   FD_TXN_PUBKEY_SZ bytes long. */
+
+int
+fd_quic_qos_pubkey_from_cert( fd_stake_pubkey_t * pubkey, X509 * cert ) {
+  if( FD_UNLIKELY( !pubkey || !cert ) ) return -1;
+
+  EVP_PKEY * pkey = X509_get_pubkey( cert ); /* new reference, freed below */
+  if( FD_UNLIKELY( !pkey ) ) return -1;
+
+  size_t len = FD_TXN_PUBKEY_SZ; /* in: buffer capacity, out: bytes written */
+  int    ok  = EVP_PKEY_get_raw_public_key( pkey, pubkey->pubkey, &len );
+  EVP_PKEY_free( pkey );
+
+  return ( ok==1 && len==FD_TXN_PUBKEY_SZ ) ? 0 : -1;
+}
+
+/* fd_quic_qos_lg_slot_cnt returns the smallest lg in [1,63] such that
+   2^lg >= 2*conn_cnt, i.e. a map with 2^lg slots is at most half full
+   when it holds conn_cnt keys. */
+
+static int
+fd_quic_qos_lg_slot_cnt( ulong conn_cnt ) {
+  int lg = 1;
+  while( lg<63 && ( 1UL<<( lg-1 ) )<conn_cnt ) lg++;
+  return lg;
+}
+
+/* fd_quic_qos_stake_scale returns range*stake/total_stake, rounded down
+   and clamped to [0,range], without overflowing a ulong.  total_stake
+   must be non-zero. */
+
+static ulong
+fd_quic_qos_stake_scale( ulong stake, ulong total_stake, ulong range ) {
+  if( !range || !stake      ) return 0UL;
+  if( stake>=total_stake    ) return range;
+  if( stake<=( ~0UL )/range ) return stake*range/total_stake; /* product fits */
+  /* stake*range would overflow: divide first, at slightly coarser resolution */
+  ulong unit = fd_ulong_max( total_stake/range, 1UL );
+  return fd_ulong_min( stake/unit, range );
+}
+
+/* fd_quic_qos_conn_staked_node looks up the staked node keyed by the
+   public key of conn's peer certificate.  Returns NULL if qos has no
+   stake table, the peer presented no usable certificate, or the key is
+   not in the staked node map. */
+
+static fd_stake_staked_node_t *
+fd_quic_qos_conn_staked_node( fd_quic_qos_t * qos, fd_quic_conn_t * conn ) {
+  if( FD_UNLIKELY( !qos->stake || !qos->stake->staked_nodes ) ) return NULL;
+  if( FD_UNLIKELY( !conn->tls_hs ) ) return NULL;
+
+  /* SSL_get_peer_certificate returns a reference that we must release */
+  X509 * cert = SSL_get_peer_certificate( conn->tls_hs->ssl );
+  if( FD_UNLIKELY( !cert ) ) return NULL;
+
+  fd_stake_pubkey_t pubkey;
+  int rc = fd_quic_qos_pubkey_from_cert( &pubkey, cert );
+  X509_free( cert );
+  if( FD_UNLIKELY( rc ) ) return NULL;
+
+  return fd_stake_staked_node_query( qos->stake->staked_nodes, pubkey, NULL );
+}
+
+/* fd_quic_qos_unpriv_conn_ele_insert links ele into a list directly
+   after prev and returns ele.  prev must already be linked; the
+   sentinel of an empty list points at itself and so qualifies. */
+
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_ele_insert( fd_quic_qos_unpriv_conn_ele_t * prev,
+                                    fd_quic_qos_unpriv_conn_ele_t * ele ) {
+  ele->next        = prev->next;
+  ele->prev        = prev;
+  prev->next->prev = ele;
+  prev->next       = ele;
+  return ele;
+}
+
+/* fd_quic_qos_unpriv_conn_ele_remove unlinks ele from its list and
+   returns it.  Returns NULL without touching anything if ele is NULL,
+   is not linked, or is linked only to itself (the sentinel of an empty
+   list).  ele->conn is left as is so that callers can still tell which
+   connection the element described. */
+
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_ele_remove( fd_quic_qos_unpriv_conn_ele_t * ele ) {
+  if( FD_UNLIKELY( !ele || !ele->prev || !ele->next ) ) return NULL;
+  if( FD_UNLIKELY( ele->next==ele ) ) return NULL; /* sentinel of an empty list */
+  ele->prev->next = ele->next;
+  ele->next->prev = ele->prev;
+  ele->prev       = NULL;
+  ele->next       = NULL;
+  return ele;
+}
+
+ulong
+fd_quic_qos_unpriv_conn_list_align( void ) {
+  return FD_QUIC_QOS_UNPRIV_CONN_LIST_ALIGN;
+}
+
+/* A list region holds the list header, directly followed by the
+   sentinel and room for up to max elements. */
+
+ulong
+fd_quic_qos_unpriv_conn_list_footprint( ulong max ) {
+  return sizeof( fd_quic_qos_unpriv_conn_list_t ) +
+         sizeof( fd_quic_qos_unpriv_conn_ele_t ) * ( max + 1UL );
+}
+
+/* fd_quic_qos_unpriv_conn_list_new formats mem (sized by the footprint
+   above) as an empty list: the sentinel is placed directly behind the
+   list header and linked to itself.  Always returns 0. */
+
+ulong
+fd_quic_qos_unpriv_conn_list_new( void * mem ) {
+  fd_quic_qos_unpriv_conn_list_t * list     = (fd_quic_qos_unpriv_conn_list_t *)mem;
+  fd_quic_qos_unpriv_conn_ele_t *  sentinel = (fd_quic_qos_unpriv_conn_ele_t *)( list+1 );
+  sentinel->conn = NULL;
+  sentinel->next = sentinel;
+  sentinel->prev = sentinel;
+  list->sentinel = sentinel;
+  return 0UL;
+}
+
+/* Appends curr at the back (most recently used end) of list */
+
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_list_push_back( fd_quic_qos_unpriv_conn_list_t * list,
+                                        fd_quic_qos_unpriv_conn_ele_t *  curr ) {
+  return fd_quic_qos_unpriv_conn_ele_insert( list->sentinel->prev, curr );
+}
+
+/* Unlinks and returns the front element of list, NULL if list is empty */
+
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_list_pop_front( fd_quic_qos_unpriv_conn_list_t * list ) {
+  if( FD_UNLIKELY( list->sentinel->next==list->sentinel ) ) return NULL; /* list is empty */
+  return fd_quic_qos_unpriv_conn_ele_remove( list->sentinel->next );
+}
+
+/* Unprivileged connection LRU algorithm:
+
+   1. The LRU contains a map of conn_id -> list element and a doubly
+      linked list of conns ordered from LRU (front) to MRU (back).
+   2. When a conn comes in, we look its conn_id up in the map.
+   3. If it is already in the LRU, we move it to the back of the list.
+   4. If it is not in the LRU, we check if the free list is empty.
+   5. If so, we evict the 10% least recently used conns (at least one).
+      Eviction here only drops the conn from the list and the map.
+   6. We then pop an element off the free list, push it onto the used
+      list, and insert it into the map.
+
+   Returns lru, or NULL if no element could be obtained for conn. */
+
+fd_quic_qos_unpriv_conn_lru_t *
+fd_quic_qos_unpriv_conn_upsert( fd_quic_qos_unpriv_conn_lru_t * lru, fd_quic_conn_t * conn ) {
+  fd_quic_qos_unpriv_conn_map_t * slot =
+      fd_quic_qos_unpriv_conn_map_query( lru->map, conn->local_conn_id, NULL );
+
+  /* more likely to be handling existing conns than new conns */
+  if( FD_LIKELY( slot ) ) {
+    fd_quic_qos_unpriv_conn_ele_t * ele = fd_quic_qos_unpriv_conn_ele_remove( slot->ele );
+    if( FD_LIKELY( ele ) ) fd_quic_qos_unpriv_conn_list_push_back( lru->used_list, ele );
+    return lru;
+  }
+
+  /* new conn: make room if every element is in use */
+  if( FD_UNLIKELY( lru->free_list->sentinel->next==lru->free_list->sentinel ) ) {
+    ulong evict_cnt = fd_ulong_max( lru->max/10UL, 1UL );
+    for( ulong i=0UL; i<evict_cnt; i++ ) {
+      fd_quic_qos_unpriv_conn_ele_t * victim =
+          fd_quic_qos_unpriv_conn_list_pop_front( lru->used_list );
+      if( FD_UNLIKELY( !victim ) ) break; /* used list drained */
+
+      fd_quic_qos_unpriv_conn_map_t * victim_slot =
+          fd_quic_qos_unpriv_conn_map_query( lru->map, victim->conn->local_conn_id, NULL );
+      /* if the ele is in the LRU list but not the map this is a programming error */
+      if( FD_UNLIKELY( !victim_slot ) ) {
+        FD_LOG_ERR(( "LRU list and map are out of sync. conn_id: %lu",
+                     victim->conn->local_conn_id ));
+      }
+      fd_quic_qos_unpriv_conn_map_remove( lru->map, victim_slot );
+
+      victim->conn = NULL;
+      fd_quic_qos_unpriv_conn_list_push_back( lru->free_list, victim );
+    }
+  }
+
+  fd_quic_qos_unpriv_conn_ele_t * ele = fd_quic_qos_unpriv_conn_list_pop_front( lru->free_list );
+  if( FD_UNLIKELY( !ele ) ) return NULL; /* nothing to evict, e.g. max is 0 */
+
+  slot = fd_quic_qos_unpriv_conn_map_insert( lru->map, conn->local_conn_id );
+  if( FD_UNLIKELY( !slot ) ) { /* map refused the key: give the element back */
+    fd_quic_qos_unpriv_conn_list_push_back( lru->free_list, ele );
+    return NULL;
+  }
+
+  ele->conn = conn;
+  slot->ele = ele; /* lets step 3 find the list element from the conn_id */
+  fd_quic_qos_unpriv_conn_list_push_back( lru->used_list, ele );
+  return lru;
+}
+
+ulong
+fd_quic_qos_unpriv_conn_lru_align( void ) {
+  return FD_QUIC_QOS_UNPRIV_CONN_LRU_ALIGN;
+}
+
+/* An LRU region holds the LRU header followed by the used and free
+   lists, each with room for max elements.
+   NOTE(review): the conn_id map is not part of this layout; confirm
+   where it is meant to be allocated. */
+
+ulong
+fd_quic_qos_unpriv_conn_lru_footprint( ulong max ) {
+  ulong l = FD_LAYOUT_INIT;
+  l = FD_LAYOUT_APPEND( l, fd_quic_qos_unpriv_conn_lru_align(),
+                        sizeof( fd_quic_qos_unpriv_conn_lru_t ) );
+  l = FD_LAYOUT_APPEND( l, fd_quic_qos_unpriv_conn_list_align(),
+                        fd_quic_qos_unpriv_conn_list_footprint( max ) ); /* used list */
+  l = FD_LAYOUT_APPEND( l, fd_quic_qos_unpriv_conn_list_align(),
+                        fd_quic_qos_unpriv_conn_list_footprint( max ) ); /* free list */
+  return FD_LAYOUT_FINI( l, fd_quic_qos_unpriv_conn_lru_align() );
+}
+
+ulong
+fd_quic_qos_align( void ) {
+  return FD_QUIC_QOS_ALIGN;
+}
+
+/* fd_quic_qos_footprint returns the size of a region holding the qos
+   header, the privileged conn map, the unprivileged LRU and its conn_id
+   map, with both maps sized from the conn counts in limits. */
+
+ulong
+fd_quic_qos_footprint( fd_quic_qos_limits_t * limits ) {
+  ulong l = FD_LAYOUT_INIT;
+  l = FD_LAYOUT_APPEND( l, fd_quic_qos_align(), sizeof( fd_quic_qos_t ) );
+  l = FD_LAYOUT_APPEND( l, fd_quic_qos_priv_conn_align(),
+                        fd_quic_qos_priv_conn_footprint( fd_quic_qos_lg_slot_cnt( limits->priv_conns ) ) );
+  l = FD_LAYOUT_APPEND( l, fd_quic_qos_unpriv_conn_lru_align(),
+                        fd_quic_qos_unpriv_conn_lru_footprint( limits->unpriv_conns ) );
+  l = FD_LAYOUT_APPEND( l, fd_quic_qos_unpriv_conn_map_align(),
+                        fd_quic_qos_unpriv_conn_map_footprint( fd_quic_qos_lg_slot_cnt( limits->unpriv_conns ) ) );
+  return FD_LAYOUT_FINI( l, fd_quic_qos_align() );
+}
+
+/* fd_quic_qos_new formats the qos header at mem: every pointer member
+   is cleared (stake table, conn maps and rng are not created here) and
+   the stream limits are set to the FD_QUIC_QOS_DEFAULT_* values.
+   Returns mem, or NULL if mem is NULL. */
+
+void *
+fd_quic_qos_new( void * mem ) {
+  if( FD_UNLIKELY( !mem ) ) return NULL;
+  FD_SCRATCH_ALLOC_INIT( l, mem );
+  fd_quic_qos_t * qos = FD_SCRATCH_ALLOC_APPEND( l, fd_quic_qos_align(), sizeof( fd_quic_qos_t ) );
+  memset( qos, 0, sizeof( fd_quic_qos_t ) );
+  qos->limits.min_streams   = FD_QUIC_QOS_DEFAULT_MIN_STREAMS;
+  qos->limits.max_streams   = FD_QUIC_QOS_DEFAULT_MAX_STREAMS;
+  qos->limits.total_streams = FD_QUIC_QOS_DEFAULT_TOTAL_STREAMS;
+  return mem;
+}
+
+/* fd_quic_qos_join returns the qos header inside a region formatted by
+   fd_quic_qos_new, or NULL if mem is NULL. */
+
+fd_quic_qos_t *
+fd_quic_qos_join( void * mem ) {
+  if( FD_UNLIKELY( !mem ) ) return NULL;
+  FD_SCRATCH_ALLOC_INIT( l, mem );
+  fd_quic_qos_t * qos = FD_SCRATCH_ALLOC_APPEND( l, fd_quic_qos_align(), sizeof( fd_quic_qos_t ) );
+  return qos;
+}
+
+/* fd_quic_qos_conn_new assigns conn its unidirectional stream limit.
+   The limit grows linearly with the peer's share of total stake, from
+   limits.min_streams (unstaked) to limits.max_streams (all stake).  If
+   the peer's certificate key is in the staked node map, that node is
+   also stored as the conn context so fd_quic_qos_conn_evict can read
+   the stake back later. */
+
+void
+fd_quic_qos_conn_new( fd_quic_qos_t * qos, fd_quic_conn_t * conn ) {
+  ulong stake = 0UL;
+  fd_stake_staked_node_t * node = fd_quic_qos_conn_staked_node( qos, conn );
+  if( FD_UNLIKELY( node ) ) { /* most incoming traffic likely unstaked */
+    fd_quic_conn_set_context( conn, node );
+    stake = node->stake;
+  }
+
+  ulong total_stake = qos->stake ? qos->stake->total_stake : 0UL;
+  total_stake = fd_ulong_max( total_stake, 1UL ); /* avoid division by zero */
+
+  fd_quic_qos_limits_t limits = qos->limits;
+  ulong range = limits.max_streams>limits.min_streams ? limits.max_streams-limits.min_streams : 0UL;
+  ulong max_streams = limits.min_streams + fd_quic_qos_stake_scale( stake, total_stake, range );
+  /* clamp */
+  max_streams = fd_ulong_min( fd_ulong_max( max_streams, limits.min_streams ), limits.max_streams );
+
+  fd_quic_conn_set_max_streams( conn, FD_QUIC_TYPE_UNIDIR, max_streams );
+  FD_LOG_NOTICE(( "server: new connection with max streams %lu", max_streams ));
+}
+
+/* fd_quic_qos_conn_evict finds room for conn.  A staked conn samples
+   2*lg(slot_cnt) random slots of the privileged conn map and replaces
+   the sampled conn with the smallest stake, provided that stake is
+   below conn's own.  Otherwise (unstaked, or no lower-staked conn was
+   sampled) conn is upserted into the unprivileged LRU.  A displaced
+   privileged conn is only removed from the map here. */
+
+void
+fd_quic_qos_conn_evict( fd_quic_qos_t * qos, fd_quic_conn_t * conn ) {
+  fd_stake_staked_node_t * node = fd_quic_qos_conn_staked_node( qos, conn );
+
+  if( FD_LIKELY( node && qos->priv_conn_map && qos->rng ) ) { /* optimize this eviction code path */
+    /* Randomly sample 2 * lg(n) entries in the staked map and evict the
+       smallest.  Multiply by 2 because the map is sparse. */
+    fd_quic_qos_priv_conn_t * map     = qos->priv_conn_map;
+    fd_quic_qos_priv_conn_t * arg_min = NULL;
+    ulong                     min     = node->stake;
+    int                       lg_n    = fd_quic_qos_priv_conn_lg_slot_cnt( map );
+    ulong                     n       = fd_quic_qos_priv_conn_slot_cnt( map );
+    for( int i=0; i<2*lg_n; i++ ) {
+      /* point into the map itself: arg_min must outlive this iteration */
+      fd_quic_qos_priv_conn_t * slot = map + ( fd_rng_ulong( qos->rng ) % n );
+      if( fd_quic_qos_priv_conn_key_inval( slot->key ) ) continue; /* empty slot */
+      if( FD_UNLIKELY( !slot->conn ) ) continue;
+      fd_stake_staked_node_t * other =
+          (fd_stake_staked_node_t *)fd_quic_conn_get_context( slot->conn );
+      if( FD_UNLIKELY( !other ) ) continue; /* no stake recorded for that conn */
+      if( other->stake<min ) {
+        min     = other->stake;
+        arg_min = slot;
+      }
+    }
+    /* Unclear how likely this is... but probably won't evict anything */
+    if( FD_UNLIKELY( arg_min ) ) {
+      fd_quic_qos_priv_conn_remove( map, arg_min );
+      fd_quic_qos_priv_conn_t * slot = fd_quic_qos_priv_conn_insert( map, conn->local_conn_id );
+      if( FD_LIKELY( slot ) ) slot->conn = conn;
+      return;
+    }
+  }
+
+  /* Otherwise upsert it into the LRU cache (which will update and evict
+     as necessary) */
+  if( FD_LIKELY( qos->unpriv_conn_lru ) ) fd_quic_qos_unpriv_conn_upsert( qos->unpriv_conn_lru, conn );
+}
diff --git a/src/tango/quic/fd_quic_qos.h b/src/tango/quic/fd_quic_qos.h
new file mode 100644
index 0000000000..7fb95cea5a
--- /dev/null
+++ b/src/tango/quic/fd_quic_qos.h
@@ -0,0 +1,149 @@
+#ifndef HEADER_fd_src_tango_quic_fd_quic_qos_h
+#define HEADER_fd_src_tango_quic_fd_quic_qos_h
+
+#include "../../ballet/txn/fd_txn.h"
+#include "../../util/fd_util.h"
+#include "../../util/fd_util_base.h"
+#include "../stake/fd_stake.h"
+#include "fd_quic_common.h"
+#include "fd_quic_conn.h"
+#include "../../util/rng/fd_rng.h"
+#include <openssl/x509.h>
+
+#define FD_QUIC_QOS_UNPRIV_CONN_LIST_ALIGN ( 128UL )
+#define FD_QUIC_QOS_UNPRIV_CONN_LRU_ALIGN ( 128UL )
+#define FD_QUIC_QOS_ALIGN ( 128UL )
+
+/* Default limits */
+#define FD_QUIC_QOS_DEFAULT_MIN_STREAMS ( 1UL << 7 )
+#define FD_QUIC_QOS_DEFAULT_MAX_STREAMS ( 1UL << 11 )
+#define 
FD_QUIC_QOS_DEFAULT_TOTAL_STREAMS ( 1UL << 16 )
+#define FD_QUIC_QOS_DEFAULT_PRIV_CONNS ( 1UL << 16 )
+#define FD_QUIC_QOS_DEFAULT_UNPRIV_CONNS ( 1UL << 16 )
+
+/* Configurable limits */
+struct fd_quic_qos_limits {
+  /* clang-format off */
+  ulong min_streams;   /* the min # of concurrent streams that can be allotted to a single conn */
+  ulong max_streams;   /* the max # of concurrent streams that can be allotted to a single conn */
+  ulong total_streams; /* the total # of streams that can be allotted across all conns */
+  ulong priv_conns;    /* the max # of "privileged" conns. stake-based eviction. */
+  ulong unpriv_conns;  /* the max # of "unprivileged" conns. LRU-based eviction. */
+  /* clang-format on */
+};
+typedef struct fd_quic_qos_limits fd_quic_qos_limits_t;
+
+/* staked connections randomized eviction queue */
+struct fd_quic_qos_priv_conn {
+  ulong            key; /* conn_id */
+  uint             hash;
+  fd_quic_conn_t * conn;
+};
+typedef struct fd_quic_qos_priv_conn fd_quic_qos_priv_conn_t;
+#define MAP_NAME fd_quic_qos_priv_conn
+#define MAP_T fd_quic_qos_priv_conn_t
+#include "../../util/tmpl/fd_map_dynamic.c"
+
+typedef struct fd_quic_qos_unpriv_conn_ele fd_quic_qos_unpriv_conn_ele_t; /* forward decl */
+struct fd_quic_qos_unpriv_conn_ele {
+  fd_quic_conn_t *                conn;
+  fd_quic_qos_unpriv_conn_ele_t * prev;
+  fd_quic_qos_unpriv_conn_ele_t * next;
+};
+
+struct fd_quic_qos_unpriv_conn_list {
+  fd_quic_qos_unpriv_conn_ele_t * sentinel;
+};
+typedef struct fd_quic_qos_unpriv_conn_list fd_quic_qos_unpriv_conn_list_t;
+
+/* unstaked connections for O(1) lookup into LRU list */
+struct fd_quic_qos_unpriv_conn_map {
+  ulong                           key; /* conn_id */
+  uint                            hash;
+  fd_quic_qos_unpriv_conn_ele_t * ele;
+};
+typedef struct fd_quic_qos_unpriv_conn_map fd_quic_qos_unpriv_conn_map_t;
+
+#define MAP_NAME fd_quic_qos_unpriv_conn_map
+#define MAP_T fd_quic_qos_unpriv_conn_map_t
+#include "../../util/tmpl/fd_map_dynamic.c"
+
+struct fd_quic_qos_unpriv_conn_lru {
+  fd_quic_qos_unpriv_conn_list_t * used_list;
+  fd_quic_qos_unpriv_conn_list_t * free_list;
+  fd_quic_qos_unpriv_conn_map_t *  map;
+  ulong                            max;
+};
+typedef struct fd_quic_qos_unpriv_conn_lru fd_quic_qos_unpriv_conn_lru_t;
+
+struct fd_quic_qos {
+  fd_quic_qos_limits_t            limits;
+  fd_stake_t *                    stake;
+  fd_quic_qos_priv_conn_t *       priv_conn_map;
+  fd_quic_qos_unpriv_conn_lru_t * unpriv_conn_lru;
+  fd_rng_t *                      rng;
+};
+typedef struct fd_quic_qos fd_quic_qos_t;
+
+FD_PROTOTYPES_BEGIN
+
+/* Extract the ed25519 public key from an X509 cert */
+int
+fd_quic_qos_pubkey_from_cert( fd_stake_pubkey_t * pubkey, X509 * cert );
+
+/* An element can insert itself, given a pointer to the previous element to insert after. */
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_ele_insert( fd_quic_qos_unpriv_conn_ele_t * prev,
+                                    fd_quic_qos_unpriv_conn_ele_t * ele );
+
+/* An element can remove itself. */
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_ele_remove( fd_quic_qos_unpriv_conn_ele_t * ele );
+
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_list_push_back( fd_quic_qos_unpriv_conn_list_t * list,
+                                        fd_quic_qos_unpriv_conn_ele_t *  curr );
+
+fd_quic_qos_unpriv_conn_ele_t *
+fd_quic_qos_unpriv_conn_list_pop_front( fd_quic_qos_unpriv_conn_list_t * list );
+
+ulong
+fd_quic_qos_unpriv_conn_lru_align( void );
+
+ulong
+fd_quic_qos_unpriv_conn_lru_footprint( ulong max );
+
+void *
+fd_quic_qos_unpriv_conn_lru_new( void * mem ); /* declared only; no definition in fd_quic_qos.c yet */
+
+fd_quic_qos_unpriv_conn_lru_t *
+fd_quic_qos_unpriv_conn_lru_join( void * mem ); /* declared only; no definition in fd_quic_qos.c yet */
+
+/* Upserts to an LRU cache with eviction if the cache is full.
+   New connections get cached in the LRU map for O(1) subsequent update / remove. */
+fd_quic_qos_unpriv_conn_lru_t *
+fd_quic_qos_unpriv_conn_upsert( fd_quic_qos_unpriv_conn_lru_t * lru, fd_quic_conn_t * conn );
+
+ulong
+fd_quic_qos_align( void );
+
+ulong
+fd_quic_qos_footprint( fd_quic_qos_limits_t * limits );
+
+void *
+fd_quic_qos_new( void * mem );
+
+fd_quic_qos_t *
+fd_quic_qos_join( void * mem );
+
+/* Determine how many streams to allocate to this conn */
+void
+fd_quic_qos_conn_new( fd_quic_qos_t * qos, fd_quic_conn_t * conn );
+
+/* Determine which conn to evict */
+void
+fd_quic_qos_conn_evict( fd_quic_qos_t * qos, fd_quic_conn_t * conn );
+
+FD_PROTOTYPES_END
+
+#endif /* HEADER_fd_src_tango_quic_fd_quic_qos_h */
diff --git a/src/tango/quic/tests/Local.mk b/src/tango/quic/tests/Local.mk
index 5cf968ccfa..9cfaa6e3b4 100644
--- a/src/tango/quic/tests/Local.mk
+++ b/src/tango/quic/tests/Local.mk
@@ -6,6 +6,7 @@ $(call make-unit-test,test_quic_hs,test_quic_hs,fd_aio fd_quic fd_ballet fd_tang
 $(call make-unit-test,test_quic_streams,test_quic_streams,fd_aio fd_ballet fd_tango fd_quic fd_util)
 $(call make-unit-test,test_quic_conn,test_quic_conn,fd_aio fd_quic fd_ballet fd_tango fd_util)
 $(call make-unit-test,test_quic_server,test_quic_server,fd_aio fd_ballet fd_quic fd_tango fd_util)
+$(call make-unit-test,test_quic_qos_server,test_quic_qos_server,fd_aio fd_ballet fd_quic fd_tango fd_util)
 $(call make-unit-test,test_quic_client_flood,test_quic_client_flood,fd_aio fd_quic fd_ballet fd_tango fd_util)
 $(call make-unit-test,test_quic_bw,test_quic_bw,fd_aio fd_quic fd_ballet fd_tango fd_util)
 $(call make-unit-test,test_quic_handshake,test_handshake,fd_aio fd_ballet fd_quic fd_util)
@@ -19,6 +20,7 @@ $(call make-unit-test,test_quic_tls_both,test_tls_quic_both,fd_quic fd_ballet fd
 # $(call make-unit-test,test_quic_flow_control,test_quic_flow_control,fd_aio fd_quic fd_ballet fd_tango fd_util)
 $(call make-unit-test,test_quic_retry_unit,test_quic_retry_unit,fd_aio fd_quic fd_ballet fd_tango fd_util)
 $(call 
make-unit-test,test_quic_retry_integration,test_quic_retry_integration,fd_aio fd_quic fd_ballet fd_tango fd_util)
+$(call make-unit-test,test_quic_qos_unit,test_quic_qos_unit,fd_aio fd_quic fd_ballet fd_tango fd_util)
 
 $(call run-unit-test,test_quic_hs)
 $(call run-unit-test,test_quic_streams)
diff --git a/src/tango/quic/tests/test_quic_qos_server.c b/src/tango/quic/tests/test_quic_qos_server.c
new file mode 100644
index 0000000000..d6bcfc8ed3
--- /dev/null
+++ b/src/tango/quic/tests/test_quic_qos_server.c
@@ -0,0 +1,219 @@
+#include <stdio.h>
+
+#include <openssl/bio.h>
+#include <openssl/pem.h>
+#include <openssl/x509.h>
+
+#include "../../../util/fd_util_base.h"
+#include "../../../util/net/fd_eth.h"
+#include "../../../util/net/fd_ip4.h"
+#include "../../../util/rng/fd_rng.h"
+
+#include "../fd_quic.h"
+#include "../tls/fd_quic_tls.h"
+#include "fd_quic_test_helpers.h"
+
+#include "../../xdp/fd_xdp_redirect_user.h"
+#include "../../xdp/fd_xsk.h"
+#include "../../xdp/fd_xsk_aio.h"
+
+#include "../../../ballet/ed25519/fd_ed25519_openssl.h"
+#include "../../../ballet/x509/fd_x509.h"
+
+#include "../../stake/fd_stake.h"
+
+#include "../fd_quic_qos.h"
+
+int server_complete = 0;
+
+/* conn_new hands a new connection to the QoS layer; ctx is the fd_quic_qos_t */
+void
+conn_new( fd_quic_conn_t * conn, void * ctx ) {
+  fd_quic_qos_conn_new( (fd_quic_qos_t *)ctx, conn );
+}
+
+/* conn_evict asks the QoS layer to find room for conn; ctx is the fd_quic_qos_t */
+void
+conn_evict( fd_quic_conn_t * conn, void * ctx ) {
+  fd_quic_qos_conn_evict( (fd_quic_qos_t *)ctx, conn );
+}
+
+/* save_x509_cert_to_pem_file writes cert to pem_file in PEM format.
+   Returns 0 on success and 1 on failure. */
+int
+save_x509_cert_to_pem_file( X509 * cert, const char * pem_file ) {
+  FILE * pem_fp = fopen( pem_file, "wb" );
+  if( FD_UNLIKELY( !pem_fp ) ) return 1;
+  int write_ok = PEM_write_X509( pem_fp, cert );
+  int close_rc = fclose( pem_fp ); /* close even if the write failed */
+  return ( !write_ok || close_rc ) ? 1 : 0;
+}
+
+/* get_public_key_from_pem returns the public key of the PEM certificate
+   in filename, or NULL on failure.  The caller frees the result with
+   EVP_PKEY_free. */
+EVP_PKEY *
+get_public_key_from_pem( const char * filename ) {
+  BIO * bio = BIO_new_file( filename, "r" );
+  if( !bio ) return NULL;
+
+  X509 * x509 = PEM_read_bio_X509( bio, NULL, NULL, NULL );
+  BIO_free( bio );
+  if( !x509 ) return NULL;
+
+  EVP_PKEY * pkey = X509_get_pubkey( x509 );
+  X509_free( x509 ); /* pkey holds its own reference */
+  return pkey;
+}
+
+int
+main( int argc, char ** argv ) {
+  fd_boot( &argc, &argv );
+
+  ulong cpu_idx = fd_tile_cpu_id( fd_tile_idx() );
+  if( cpu_idx >= fd_shmem_cpu_cnt() ) cpu_idx = 0UL;
+
+  /* clang-format off */
+  char const * _page_sz = fd_env_strip_cmdline_cstr ( &argc, &argv, "--page-sz",  NULL, "gigantic"                   );
+  ulong        page_cnt = fd_env_strip_cmdline_ulong( &argc, &argv, "--page-cnt", NULL, 1UL                          );
+  ulong        numa_idx = fd_env_strip_cmdline_ulong( &argc, &argv, "--numa-idx", NULL, fd_shmem_numa_idx( cpu_idx ) );
+  /* clang-format on */
+
+  ulong page_sz = fd_cstr_to_shmem_page_sz( _page_sz );
+  if( FD_UNLIKELY( !page_sz ) ) FD_LOG_ERR(( "unsupported --page-sz" ));
+
+  fd_quic_limits_t quic_limits = { 0 };
+  fd_quic_limits_from_env( &argc, &argv, &quic_limits );
+
+  FD_LOG_NOTICE(( "Creating workspace with --page-cnt %lu --page-sz %s pages on --numa-idx %lu",
+                  page_cnt, _page_sz, numa_idx ));
+  fd_wksp_t * wksp =
+      fd_wksp_new_anonymous( page_sz, page_cnt, fd_shmem_cpu_idx( numa_idx ), "wksp", 0UL );
+  FD_TEST( wksp );
+
+  FD_LOG_NOTICE(( "Creating server QUIC" ));
+  fd_quic_t * quic = fd_quic_new_anonymous( wksp, &quic_limits, FD_QUIC_ROLE_SERVER );
+  FD_TEST( quic );
+  quic->cb.conn_new = conn_new;
+
+  fd_quic_udpsock_t _udpsock[1];
+  fd_quic_udpsock_t * udpsock =
+      fd_quic_udpsock_create( _udpsock, &argc, &argv, wksp, fd_quic_get_aio_net_rx( quic ) );
+  FD_TEST( udpsock );
+
+  /* Transport params:
+       original_destination_connection_id (0x00)         :   len(0)
+       max_idle_timeout (0x01)                           : * 60000
+       stateless_reset_token (0x02)                      :   len(0)
+       max_udp_payload_size (0x03)                       :   0
+       initial_max_data (0x04)                           : * 1048576
+       initial_max_stream_data_bidi_local (0x05)         : * 1048576
+       initial_max_stream_data_bidi_remote (0x06)        : * 1048576
+       initial_max_stream_data_uni (0x07)                : * 1048576
+       initial_max_streams_bidi (0x08)                   : * 128
+       initial_max_streams_uni (0x09)                    : * 128
+       ack_delay_exponent (0x0a)                         : * 3
+       max_ack_delay (0x0b)                              : * 25
+       disable_active_migration (0x0c)                   :   0
+       preferred_address (0x0d)                          :   len(0)
+       active_connection_id_limit (0x0e)                 : * 8
+       initial_source_connection_id (0x0f)               : * len(8) ec 73 1b 41 a0 d5 c6 fe
+       retry_source_connection_id (0x10)                 :   len(0) */
+
+  fd_quic_config_t * quic_config = &quic->config;
+  FD_TEST( quic_config );
+
+  quic_config->role = FD_QUIC_ROLE_SERVER;
+  FD_TEST( fd_quic_config_from_env( &argc, &argv, quic_config ) );
+
+  memcpy( quic_config->link.src_mac_addr, udpsock->self_mac, 6UL );
+  quic_config->net.ip_addr         = udpsock->listen_ip;
+  quic_config->net.listen_udp_port = udpsock->listen_port;
+
+  uchar pkey[32] = {
+    137, 115, 254, 55, 116, 55, 118, 19,  151, 66,  229, 24, 188, 62,  99,  209,
+    162, 16,  6,   7,  24,  81, 152, 128, 139, 234, 170, 93, 88,  204, 245, 205,
+  };
+  quic->cert_key_object = fd_ed25519_pkey_from_private( pkey );
+  FD_TEST( quic->cert_key_object );
+  quic->cert_object = fd_x509_gen_solana_cert( quic->cert_key_object );
+  FD_TEST( quic->cert_object );
+
+  /* Write the cert out first so the read-back below sees this run's cert */
+  if( FD_UNLIKELY( save_x509_cert_to_pem_file( quic->cert_object, "cert.pem" ) ) ) {
+    FD_LOG_ERR(( "failed to save solana pubkey into cert.pem" ));
+  }
+
+  EVP_PKEY * pk = get_public_key_from_pem( "cert.pem" );
+  FD_TEST( pk );
+  uchar  pk_buf[32];
+  size_t len = sizeof( pk_buf ); /* in: capacity, out: bytes written */
+  FD_TEST( 1==EVP_PKEY_get_raw_public_key( pk, pk_buf, &len ) );
+  EVP_PKEY_free( pk );
+  FD_LOG_NOTICE(( "len %lu", len ));
+  FD_LOG_HEXDUMP_NOTICE(( "pk_buf", pk_buf, len ));
+
+  fd_quic_set_aio_net_tx( quic, udpsock->aio );
+
+  if( FD_UNLIKELY( argc > 1 ) ) FD_LOG_ERR(( "unrecognized argument: %s", argv[1] ));
+
+  FD_LOG_NOTICE(( "Initializing QUIC" ));
+  FD_TEST( fd_quic_init( quic ) );
+
+  fd_quic_qos_limits_t qos_limits = { .min_streams   = FD_QUIC_QOS_DEFAULT_MIN_STREAMS,
+                                      .max_streams   = FD_QUIC_QOS_DEFAULT_MAX_STREAMS,
+                                      .total_streams = FD_QUIC_QOS_DEFAULT_TOTAL_STREAMS,
+                                      .priv_conns    = 0,
+                                      .unpriv_conns  = 0 };
+  void * mem =
+      fd_wksp_alloc_laddr( wksp, fd_quic_qos_align(), fd_quic_qos_footprint( &qos_limits ), 42UL );
+  FD_TEST( mem );
+  fd_quic_qos_t * qos = fd_quic_qos_join( fd_quic_qos_new( mem ) );
+  FD_TEST( qos );
+  qos->limits = qos_limits;
+
+  /* fd_quic_qos_new/join do not create the stake table; attach one */
+  void * stake_mem = fd_wksp_alloc_laddr( wksp, fd_stake_align(), fd_stake_footprint(), 42UL );
+  FD_TEST( stake_mem );
+  qos->stake = fd_stake_join( fd_stake_new( stake_mem ) );
+  FD_TEST( qos->stake );
+
+  fd_stake_pubkey_t pubkey = {
+    .pubkey = {44, 174, 25,  39, 43, 255, 200, 81, 55, 73, 10,  113, 174, 91, 223, 80,
+               50, 51,  102, 25, 63, 110, 36,  28, 51, 11, 174, 179, 110, 8,  25,  152}
+  };
+  fd_stake_staked_node_t * staked_node =
+      fd_stake_staked_node_insert( qos->stake->staked_nodes, pubkey );
+  FD_TEST( staked_node );
+  staked_node->stake = 1;
+
+  fd_stake_pubkey_t pubkey2 = {
+    .pubkey = {250, 56, 248, 84,  190, 46,  154, 76,  15, 72, 181, 205, 32, 96, 128, 213,
+               158, 33, 81,  193, 63,  154, 93,  254, 15, 81, 32,  175, 54, 60, 179, 224}
+  };
+  fd_stake_staked_node_t * staked_node2 =
+      fd_stake_staked_node_insert( qos->stake->staked_nodes, pubkey2 );
+  FD_TEST( staked_node2 );
+  staked_node2->stake = 2;
+  qos->stake->total_stake = 3UL; /* 1 + 2, the two stakes inserted above */
+
+  // conn_evict()
+
+  /* do general processing */
+  // while ( 1 ) {
+  //   fd_quic_service( quic );
+  //   fd_quic_udpsock_service( udpsock );
+  // }
+
+  FD_TEST( fd_quic_fini( quic ) );
+
+  fd_wksp_free_laddr( stake_mem );
+  fd_wksp_free_laddr( mem );
+  fd_wksp_free_laddr( fd_quic_delete( fd_quic_leave( quic ) ) );
+  fd_quic_udpsock_destroy( udpsock );
+  fd_wksp_delete_anonymous( wksp );
+
+  FD_LOG_NOTICE(( "pass" ));
+  fd_halt();
+  return 0;
+}
diff --git a/src/tango/quic/tests/test_quic_qos_unit.c b/src/tango/quic/tests/test_quic_qos_unit.c
new file mode 100644
index 0000000000..aa8dbd4a57
--- /dev/null
+++ b/src/tango/quic/tests/test_quic_qos_unit.c
@@ -0,0 +1,86 @@
+#include "../../../util/fd_util.h"
+#include "../../stake/fd_stake.h"
+#include "../fd_quic_qos.h"
+
+#define NUM_PUBKEYS 3
+
+fd_stake_pubkey_t pubkeys[NUM_PUBKEYS] = {
+  { .pubkey = { 44, 174, 25,  39, 43, 255, 200, 81, 55, 73, 10,  113, 174, 91, 223, 80,
+                50, 51,  102, 25, 63, 110, 36,  28, 51, 11, 174, 179, 110, 8,  25,  152 } },
+
+  { .pubkey = { 250, 56, 248, 84,  190, 46,  154, 76,  15, 72, 181, 205, 32, 96, 128, 213,
+                158, 33, 81,  193, 63,  154, 93,  254, 15, 81, 32,  175, 54, 60, 179, 224 } },
+
+  { .pubkey = { 225, 102, 95, 246, 174, 91, 1,  240, 118, 174, 119, 113, 150, 146, 149, 29,
+                253, 10,  69, 168, 188, 51, 31, 11,  67,  18,  201, 181, 189, 178, 159, 178 } } };
+
+/* test_fd_qos_conn_lru inserts NUM_PUBKEYS staked nodes into the stake
+   table attached to a qos object and checks each can be queried back
+   with the stake it was given. */
+void
+test_fd_qos_conn_lru( void ) {
+  fd_wksp_t * wksp = fd_wksp_new_anonymous(
+      FD_SHMEM_GIGANTIC_PAGE_SZ, 1, fd_shmem_cpu_idx( fd_shmem_numa_idx( 0 ) ), "wksp", 0UL );
+  FD_TEST( wksp );
+
+  fd_quic_qos_limits_t qos_limits = {
+    .min_streams   = 1,
+    .max_streams   = 1,
+    .total_streams = 1,
+    .priv_conns    = 2,
+    .unpriv_conns  = 2,
+  };
+  FD_TEST( ( 1 << FD_STAKE_LG_MAX_STAKED_NODES ) >= NUM_PUBKEYS );
+
+  void * mem =
+      fd_wksp_alloc_laddr( wksp, fd_quic_qos_align(), fd_quic_qos_footprint( &qos_limits ), 42UL );
+  FD_TEST( mem );
+  fd_quic_qos_t * qos = fd_quic_qos_join( fd_quic_qos_new( mem ) );
+  FD_TEST( qos );
+
+  /* fd_quic_qos_new/join do not create the stake table; attach one */
+  void * stake_mem = fd_wksp_alloc_laddr( wksp, fd_stake_align(), fd_stake_footprint(), 42UL );
+  FD_TEST( stake_mem );
+  qos->stake = fd_stake_join( fd_stake_new( stake_mem ) );
+  FD_TEST( qos->stake );
+  fd_stake_staked_node_t * staked_nodes = qos->stake->staked_nodes;
+  FD_TEST( staked_nodes );
+
+  for( ulong i = 0; i < NUM_PUBKEYS; i++ ) {
+    fd_stake_staked_node_t * staked_node = fd_stake_staked_node_insert( staked_nodes, pubkeys[i] );
+    FD_TEST( staked_node );
+    staked_node->stake = i;
+  }
+
+  /* dump every occupied slot */
+  for( ulong i = 0; i < fd_stake_staked_node_slot_cnt( staked_nodes ); i++ ) {
+    fd_stake_staked_node_t * staked_node = &staked_nodes[i];
+    if( !fd_stake_staked_node_key_inval( staked_node->key ) ) {
+      FD_LOG_HEXDUMP_NOTICE(( "pubkey", &staked_node->key, sizeof( staked_node->key ) ));
+    }
+  }
+
+  for( ulong i = 0; i < NUM_PUBKEYS; i++ ) {
+    fd_stake_staked_node_t * staked_node =
+        fd_stake_staked_node_query( staked_nodes, pubkeys[i], NULL );
+    FD_TEST( staked_node );
+    FD_TEST( staked_node->stake == i );
+  }
+
+  fd_wksp_free_laddr( stake_mem );
+  fd_wksp_free_laddr( mem );
+  fd_wksp_delete_anonymous( wksp );
+}
+
+int
+main( int argc, char ** argv ) {
+  fd_boot( &argc, &argv );
+
+  if( FD_UNLIKELY( argc > 1 ) ) FD_LOG_ERR(( "unrecognized argument: %s", argv[1] ));
+
+  test_fd_qos_conn_lru();
+
+  FD_LOG_NOTICE(( "pass" ));
+  fd_halt();
+  return 0;
+}
diff --git a/src/tango/stake/Local.mk b/src/tango/stake/Local.mk
new file mode 100644
index 0000000000..ac311477ba
--- /dev/null
+++ b/src/tango/stake/Local.mk
@@ -0,0 +1,2 @@
+$(call add-hdrs,fd_stake.h)
+$(call add-objs,fd_stake,fd_tango)
diff --git a/src/tango/stake/fd_stake.c b/src/tango/stake/fd_stake.c
new file mode 100644
index 0000000000..29d3513008
--- /dev/null
+++ b/src/tango/stake/fd_stake.c
@@ -0,0 +1,49 @@
+#include "fd_stake.h"
+
+ulong
+fd_stake_align( void ) {
+  return FD_STAKE_ALIGN;
+}
+
+/* A stake region holds the fd_stake_t header followed by a staked node
+   map with 2^FD_STAKE_LG_MAX_STAKED_NODES slots. */
+ulong
+fd_stake_footprint( void ) {
+  ulong l = FD_LAYOUT_INIT;
+  l = FD_LAYOUT_APPEND( l, fd_stake_align(), sizeof( fd_stake_t ) );
+  l = FD_LAYOUT_APPEND( l, fd_stake_staked_node_align(),
+                        fd_stake_staked_node_footprint( FD_STAKE_LG_MAX_STAKED_NODES ) );
+  return FD_LAYOUT_FINI( l, fd_stake_align() );
+}
+
+/* fd_stake_new formats mem (sized by fd_stake_footprint) as an empty
+   stake table: the staked node map is formatted, and version and
+   total_stake start at 0.  staked_nodes is left NULL until
+   fd_stake_join.  Returns mem, or NULL if mem is NULL. */
+void *
+fd_stake_new( void * mem ) {
+  if( FD_UNLIKELY( !mem ) ) return NULL;
+  FD_SCRATCH_ALLOC_INIT( l, mem );
+  fd_stake_t * stake = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_align(), sizeof( fd_stake_t ) );
+  void * staked_node_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_staked_node_align(),
+      fd_stake_staked_node_footprint( FD_STAKE_LG_MAX_STAKED_NODES ) );
+  fd_stake_staked_node_new( staked_node_mem, FD_STAKE_LG_MAX_STAKED_NODES );
+  stake->version      = 0UL;
+  stake->staked_nodes = NULL; /* set by fd_stake_join */
+  stake->total_stake  = 0UL;
+  return mem;
+}
+
+/* fd_stake_join returns the fd_stake_t header of a region formatted by
+   fd_stake_new, with staked_nodes pointing at the joined staked node
+   map.  Returns NULL if mem is NULL. */
+fd_stake_t *
+fd_stake_join( void * mem ) {
+  if( FD_UNLIKELY( !mem ) ) return NULL;
+  FD_SCRATCH_ALLOC_INIT( l, mem );
+  fd_stake_t * stake = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_align(), sizeof( fd_stake_t ) );
+  void * staked_node_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_staked_node_align(),
+      fd_stake_staked_node_footprint( FD_STAKE_LG_MAX_STAKED_NODES ) );
+  stake->staked_nodes = fd_stake_staked_node_join( staked_node_mem );
+  return stake;
+}
diff --git a/src/tango/stake/fd_stake.h b/src/tango/stake/fd_stake.h
new file mode 100644
index 0000000000..177af43c6c
--- /dev/null
+++ b/src/tango/stake/fd_stake.h
@@ -0,0 +1,72 @@
+#ifndef HEADER_fd_src_tango_stake_fd_stake_h
+#define HEADER_fd_src_tango_stake_fd_stake_h
+
+#include <string.h>
+
+#include "../../ballet/txn/fd_txn.h"
+
+#define FD_STAKE_ALIGN ( 128UL )
+#define FD_STAKE_LG_MAX_STAKED_NODES ( 16 )
+
+struct fd_stake_pubkey {
+  uchar pubkey[FD_TXN_PUBKEY_SZ];
+};
+typedef struct fd_stake_pubkey fd_stake_pubkey_t;
+
+/* All-zero key; marks an unused slot of the staked node map */
+static const fd_stake_pubkey_t pubkey_null = { 0 };
+
+/* fd_stake_pubkey_hash hashes a key by its first sizeof(uint) bytes.
+   memcpy is used instead of casting the byte array to uint *, which
+   may be misaligned and violates strict aliasing. */
+static inline uint
+fd_stake_pubkey_hash( fd_stake_pubkey_t const * key ) {
+  uint hash;
+  memcpy( &hash, key->pubkey, sizeof( uint ) );
+  return hash;
+}
+
+/* Staked node map */
+struct fd_stake_staked_node {
+  fd_stake_pubkey_t key;
+  uint              hash;
+  ulong             stake;
+};
+typedef struct fd_stake_staked_node fd_stake_staked_node_t;
+
+#define MAP_NAME fd_stake_staked_node
+#define MAP_T fd_stake_staked_node_t
+#define MAP_KEY_T fd_stake_pubkey_t
+#define MAP_KEY_NULL pubkey_null
+#define MAP_KEY_INVAL( k ) ( !memcmp( &( k ), &pubkey_null, sizeof( fd_stake_pubkey_t ) ) )
+#define MAP_KEY_EQUAL( k0, k1 ) ( !memcmp( ( k0 ).pubkey, ( k1 ).pubkey, FD_TXN_PUBKEY_SZ ) )
+#define MAP_KEY_EQUAL_IS_SLOW 1
+#define MAP_KEY_HASH( key ) fd_stake_pubkey_hash( &( key ) )
+#include "../../util/tmpl/fd_map_dynamic.c"
+
+struct fd_stake {
+  ulong                    version; /* MVCC version # */
+  fd_stake_staked_node_t * staked_nodes;
+  ulong                    total_stake;
+};
+typedef struct fd_stake fd_stake_t;
+
+FD_PROTOTYPES_BEGIN
+
+ulong
+fd_stake_align( void );
+
+ulong
+fd_stake_footprint( void );
+
+/* fd_stake_new formats mem as an empty stake table and returns mem */
+void *
+fd_stake_new( void * mem );
+
+/* fd_stake_join joins the stake table at mem and binds its staked node map */
+fd_stake_t *
+fd_stake_join( void * mem );
+
+FD_PROTOTYPES_END
+
+#endif /* HEADER_fd_src_tango_stake_fd_stake_h */
diff --git a/src/tango/xdp/fd_xdp_ctl.c b/src/tango/xdp/fd_xdp_ctl.c
index 95c0d3c4ce..92c75ac056 100644
--- a/src/tango/xdp/fd_xdp_ctl.c
+++ b/src/tango/xdp/fd_xdp_ctl.c
@@ -240,7 +240,7 @@ main( int argc,
       if( FD_UNLIKELY( !shxsk ) ) FD_LOG_ERR(( "%i: %s: fd_wksp_map(%s) failed\n\tDo %s help for help", cnt, cmd, _shxsk, bin ));
 
       if( FD_UNLIKELY( !fd_xsk_bind( shxsk, app_name, ifname, ifqueue ) ) )
-        FD_LOG_ERR(( "%i: %s: fd_xsk_unbind(%s,%s,%u) failed\n\tDo %s help for help",
+        FD_LOG_ERR(( "%i: %s: fd_xsk_bind(%s,%s,%u) failed\n\tDo %s help for help",
                      cnt, cmd, _shxsk, ifname, ifqueue, bin ));
 
       fd_wksp_unmap( shxsk );