Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tpu): stake weighted QUIC QoS and flow control #512

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1788e0d
feat(quic): qos incl. stake weighted flow control + conn eviction
lidatong Jul 3, 2023
05ab26b
wip
lidatong Jul 25, 2023
5641b89
Merge branch 'main' into chali/feat/quic-tpu-stake-weighted-flow-control
lidatong Jul 25, 2023
5e239cb
stream pool implementation. fd_quic_conn_set_max_stream flow control
nbridge-jump Jul 26, 2023
b6cadaa
Removing whitespace
nbridge-jump Jul 27, 2023
b3ffdf1
feat(tango): LRU cache
lidatong Jul 30, 2023
c1b5d5e
more wip
lidatong Jul 31, 2023
edbc050
feat(tango): staked nodes shmem and ffi
lidatong Jul 31, 2023
df400fd
Merge branch 'chali/feat/fd-stake' into chali/feat/quic-tpu-stake-wei…
lidatong Jul 31, 2023
896fd6b
Merge branch 'feat/tango/lru' into chali/feat/quic-tpu-stake-weighted…
lidatong Jul 31, 2023
191d15a
refactor(qos): remove deprecated LRU
lidatong Jul 31, 2023
9215c31
Merge branch 'nbridge/stream-pool' into chali/feat/quic-tpu-stake-wei…
lidatong Jul 31, 2023
5a8d15d
wip
lidatong Aug 1, 2023
a92c016
Merge branch 'main' into chali/feat/quic-tpu-stake-weighted-flow-control
lidatong Aug 1, 2023
67cc27a
Merge branch 'main' into chali/feat/quic-tpu-stake-weighted-flow-control
lidatong Aug 2, 2023
d1bd1a0
fix: various memory issues
lidatong Aug 6, 2023
2acf7b7
test: add mvcc tests
lidatong Aug 6, 2023
5935cc3
fix: memory layout issues with fd stake
lidatong Aug 6, 2023
81eb44a
Merge branch 'chali/feat/fd-stake' into chali/feat/quic-tpu-stake-wei…
lidatong Aug 6, 2023
67051e8
feat: openssl peer verify
lidatong Aug 10, 2023
a68b568
feat: tpu (quic tile) modifications for qos
lidatong Aug 10, 2023
2274b0d
feat: qos connection counter and quic eviction / closing
lidatong Aug 10, 2023
3e1a955
test: qos
lidatong Aug 10, 2023
6fbd67e
feat: socket support
lidatong Aug 10, 2023
fb4265b
refactor: remove unused code
lidatong Aug 10, 2023
3be8d23
Merge branch 'chali/feat/fd-stake' into chali/feat/quic-tpu-stake-wei…
lidatong Aug 10, 2023
f2bcf7b
refactor: fd_quic_qos include
lidatong Aug 10, 2023
5ddeb2a
update test_mvcc.c to pass
llamb-jump Aug 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ffi/rust/firedancer-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn main() {
.clang_arg(format!("-I{prefix}/"))
.header(&format!("wrapper_{lib}.h"))
.blocklist_type("schar|uchar|ushort|uint|ulong")
.blocklist_item("SORT_QUICK_ORDER_STYLE|SORT_MERGE_THRESH|SORT_QUICK_THRESH|SORT_QUICK_ORDER_STYLE|SORT_QUICK_SWAP_MINIMIZE");
.blocklist_item("SORT_QUICK_ORDER_STYLE|SORT_MERGE_THRESH|SORT_QUICK_THRESH|SORT_QUICK_ORDER_STYLE|SORT_QUICK_SWAP_MINIMIZE|MAP_MEMOIZE|MAP_QUERY_OPT|MAP_KEY_EQUAL_IS_SLOW");

// Well this is a complete mess. We want to only include, say, functions
// declared in the `ballet` directory in the ballet bindgen output. If
Expand Down
2 changes: 2 additions & 0 deletions ffi/rust/firedancer-sys/src/tango/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod fseq;
mod mcache;
mod tcache;
mod xdp;
mod stake;

pub use cnc::*;
pub use dcache::*;
Expand All @@ -13,3 +14,4 @@ pub use fseq::*;
pub use mcache::*;
pub use tcache::*;
pub use xdp::*;
pub use stake::*;
13 changes: 13 additions & 0 deletions ffi/rust/firedancer-sys/src/tango/stake.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pub use crate::generated::{
fd_stake_t,
fd_stake_align,
fd_stake_footprint,
fd_stake_join,
fd_stake_new,
fd_stake_version,
fd_stake_version_laddr,
fd_stake_write,
fd_stake_read,
fd_stake_dump,
FD_STAKE_ALIGN
};
3 changes: 2 additions & 1 deletion src/app/fdctl/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ typedef struct {
ushort listen_port;
char xdp_mode[ 8 ];

uint max_concurrent_connections;
ulong max_concurrent_connections;
uint max_concurrent_connection_ids_per_connection;
uint max_concurrent_streams_per_connection;
uint max_concurrent_handshakes;
uint max_inflight_quic_packets;
uint tx_buf_size;
int stake_lg_slot_cnt;
uint xdp_rx_queue_size;
uint xdp_tx_queue_size;
uint xdp_aio_depth;
Expand Down
9 changes: 8 additions & 1 deletion src/app/fdctl/config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ dynamic_port_range = "8000-10000"

# Maximum number of simultaneous QUIC connections which can be open. New
# connections which would exceed this limit will not be accepted.
#
# Must be >=2 and a power of 2.
max_concurrent_connections = 32

# While in TCP a connection is identified by (Source IP, Source Port,
Expand All @@ -393,7 +395,9 @@ dynamic_port_range = "8000-10000"
#
# Currently this option does nothing, as we do not support creating
# additional connection IDs.
max_concurrent_connection_ids_per_connection = 16
#
# Should be in sync with `fd_quic_conn.h`.
max_concurrent_connection_ids_per_connection = 4

# QUIC allows for multiple streams to be multiplexed over a single
# connection. This option sets the maximum number of simultaneous
Expand Down Expand Up @@ -438,6 +442,9 @@ dynamic_port_range = "8000-10000"
# should be unused.
tx_buf_size = 4096

# Lg number of stake nodes to track
stake_lg_slot_cnt = 10

# Firedancer uses XDP for fast packet processing. XDP supports two
# modes, XDP_SKB and XDP_DRV. XDP_DRV is preferred as it is faster, but
# is not supported by all drivers.
Expand Down
102 changes: 83 additions & 19 deletions src/app/fdctl/configure/workspace.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@

#include "../../../tango/fd_tango.h"
#include "../../../tango/quic/fd_quic.h"
#include "../../../tango/quic/fd_quic_qos.h"
#include "../../../tango/xdp/fd_xsk_aio.h"
#include "../../../tango/udpsock/fd_udpsock.h"

#include <sys/stat.h>
#include <linux/capability.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

#define NAME "workspace"
#define FD_HAS_XDP 0

static void
init_perm( security_t * security,
Expand Down Expand Up @@ -81,6 +90,20 @@ static void quic( void * pod, char * fmt, fd_quic_limits_t * limits, ... ) {
fd_quic_new ( shmem, limits ) );
}

static void quic_qos( void * pod, char * fmt, fd_quic_qos_limits_t * limits, ... ) {
INSERTER( limits,
fd_quic_qos_align ( ),
fd_quic_qos_footprint( limits ),
fd_quic_qos_new ( shmem, limits ) );
}

static void stake( void * pod, char * fmt, int lg_slot_cnt, ... ) {
INSERTER( lg_slot_cnt,
fd_stake_align ( ),
fd_stake_footprint( lg_slot_cnt ),
fd_stake_new ( shmem, lg_slot_cnt ) );
}

static void xsk( void * pod, char * fmt, ulong frame_sz, ulong rx_depth, ulong tx_depth, ... ) {
INSERTER( tx_depth,
fd_xsk_align ( ),
Expand All @@ -95,6 +118,13 @@ static void xsk_aio( void * pod, char * fmt, ulong tx_depth, ulong batch_count,
fd_xsk_aio_new ( shmem, tx_depth, batch_count ) );
}

static void udpsock( void * pod, char * fmt, ulong frame_sz, ulong rx_depth, ulong tx_depth, ... ) {
INSERTER( tx_depth,
fd_udpsock_align ( ),
fd_udpsock_footprint( frame_sz, rx_depth, tx_depth ),
fd_udpsock_new ( shmem, frame_sz, rx_depth, tx_depth ) );
}

static void alloc( void * pod, char * fmt, ulong align, ulong sz, ... ) {
INSERTER( sz, align, sz, 1 );
}
Expand Down Expand Up @@ -196,19 +226,6 @@ init( config_t * const config ) {
if( FD_LIKELY( uid == 0 && seteuid( config->uid ) ) )
FD_LOG_ERR(( "seteuid() failed (%i-%s)", errno, strerror( errno ) ));

fd_quic_limits_t limits = {
.conn_cnt = config->tiles.quic.max_concurrent_connections,
.handshake_cnt = config->tiles.quic.max_concurrent_handshakes,
.conn_id_cnt = config->tiles.quic.max_concurrent_connection_ids_per_connection,
.conn_id_sparsity = 0.0,
.inflight_pkt_cnt = config->tiles.quic.max_inflight_quic_packets,
.tx_buf_sz = config->tiles.quic.tx_buf_size,
.stream_cnt[ FD_QUIC_STREAM_TYPE_BIDI_CLIENT ] = 0,
.stream_cnt[ FD_QUIC_STREAM_TYPE_BIDI_SERVER ] = 0,
.stream_cnt[ FD_QUIC_STREAM_TYPE_UNI_CLIENT ] = config->tiles.quic.max_concurrent_streams_per_connection,
.stream_cnt[ FD_QUIC_STREAM_TYPE_UNI_SERVER ] = 0,
};

for( ulong j=0; j<config->shmem.workspaces_cnt; j++ ) {
workspace_config_t * wksp1 = &config->shmem.workspaces[ j ];
WKSP_BEGIN( config, wksp1, 0 );
Expand Down Expand Up @@ -242,17 +259,64 @@ init( config_t * const config ) {
fseq ( pod, "fseq%lu", i );
}
break;
case wksp_quic:
cnc ( pod, "cnc" );
quic ( pod, "quic", &limits );
xsk ( pod, "xsk", 2048, config->tiles.quic.xdp_rx_queue_size, config->tiles.quic.xdp_tx_queue_size );
xsk_aio( pod, "xsk_aio", config->tiles.quic.xdp_tx_queue_size, config->tiles.quic.xdp_aio_depth );

case wksp_quic:;
fd_quic_limits_t quic_limits = {
.conn_cnt = config->tiles.quic.max_concurrent_connections,
.handshake_cnt = config->tiles.quic.max_concurrent_handshakes,
.conn_id_cnt = config->tiles.quic.max_concurrent_connection_ids_per_connection,
.conn_id_sparsity = 0.0,
.inflight_pkt_cnt = config->tiles.quic.max_inflight_quic_packets,
.tx_buf_sz = config->tiles.quic.tx_buf_size,
.stream_cnt[ FD_QUIC_STREAM_TYPE_BIDI_CLIENT ] = 0,
.stream_cnt[ FD_QUIC_STREAM_TYPE_BIDI_SERVER ] = 0,
.stream_cnt[ FD_QUIC_STREAM_TYPE_UNI_CLIENT ] = config->tiles.quic.max_concurrent_streams_per_connection,
.stream_cnt[ FD_QUIC_STREAM_TYPE_UNI_SERVER ] = 0,
};

int lg_max_conns = fd_ulong_find_msb( config->tiles.quic.max_concurrent_connections );
if ( lg_max_conns < 1 ) FD_LOG_ERR( ( "max_concurrent_connections must be at least 2." ) );
fd_quic_qos_limits_t quic_qos_limits = {
.min_streams = FD_QUIC_QOS_DEFAULT_MIN_STREAMS,
.max_streams = FD_QUIC_QOS_DEFAULT_MAX_STREAMS,
.total_streams = FD_QUIC_QOS_DEFAULT_TOTAL_STREAMS,
.pq_lg_slot_cnt = lg_max_conns - 1,
.lru_depth = config->tiles.quic.max_concurrent_connections >> 1,
};

cnc ( pod, "cnc" );
quic ( pod, "quic", &quic_limits );
quic_qos( pod, "quic_qos", &quic_qos_limits );
stake ( pod, "stake", 10 ); // FIXME slot cnt is not getting parsed

#if FD_HAS_XDP
(void)udpsock;
xsk ( pod, "xsk", 2048, config->tiles.quic.xdp_rx_queue_size, config->tiles.quic.xdp_tx_queue_size );
xsk_aio ( pod, "xsk_aio", config->tiles.quic.xdp_tx_queue_size, config->tiles.quic.xdp_aio_depth );
char const * quic_xsk_gaddr = fd_pod_query_cstr( pod, "xsk", NULL );
void * shmem = fd_wksp_map ( quic_xsk_gaddr );
if( FD_UNLIKELY( !fd_xsk_bind( shmem, config->name, config->tiles.quic.interface, (uint)wksp1->kind_idx ) ) )
FD_LOG_ERR(( "failed to bind xsk for quic tile %lu", wksp1->kind_idx ));
fd_wksp_unmap( shmem );
#else
(void)xsk;
(void)xsk_aio;
int sock_fd = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
if( FD_UNLIKELY( sock_fd<0 ) ) {
FD_LOG_ERR( (
"socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) failed (%d-%s)", errno, strerror( errno ) ) );
}
struct sockaddr_in listen_addr = {
.sin_family = AF_INET,
.sin_addr = { .s_addr = FD_IP4_ADDR(127, 0, 0, 1) },
.sin_port = (ushort)fd_ushort_bswap( 8004 ),
};
if( FD_UNLIKELY( 0!=bind( sock_fd, (struct sockaddr const *)fd_type_pun_const( &listen_addr ), sizeof(struct sockaddr_in) ) ) ) {
close( sock_fd );
FD_LOG_ERR( ( "bind(sock_fd) failed (%d-%s)", errno, strerror( errno ) ) );
}
udpsock ( pod, "udpsock", 2048, config->tiles.quic.xdp_rx_queue_size, config->tiles.quic.xdp_tx_queue_size );
#endif


uint1 ( pod, "ip_addr", config->tiles.quic.ip_addr );
ushort1( pod, "listen_port", config->tiles.quic.listen_port, 0 );
Expand Down
10 changes: 6 additions & 4 deletions src/app/frank/fd_frank.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "../../disco/fd_disco.h"
#include "../../ballet/fd_ballet.h" /* FIXME: CONSIDER HAVING THIS IN DISCO_BASE */
#include "../../tango/xdp/fd_xsk.h"
#include "../../tango/udpsock/fd_udpsock.h"

/* FD_FRANK_CNC_DIAG_* are FD_CNC_DIAG_* style diagnostics and thus the
same considerations apply. Further they are harmonized with the
Expand All @@ -29,10 +30,11 @@ typedef struct {
char * tile_name;
ulong tile_idx;
ulong idx;
uchar const * tile_pod;
uchar const * in_pod;
uchar const * out_pod;
fd_xsk_t * xsk;
uchar const * tile_pod;
uchar const * in_pod;
uchar const * out_pod;
fd_xsk_t * xsk;
fd_udpsock_t * udpsock;
} fd_frank_args_t;

typedef struct {
Expand Down
15 changes: 14 additions & 1 deletion src/app/frank/fd_frank_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ run( fd_frank_args_t * args ) {
fd_quic_t * quic = fd_quic_join( fd_wksp_pod_map( args->tile_pod, "quic" ) );
if( FD_UNLIKELY( !quic ) ) FD_LOG_ERR(( "fd_quic_join failed" ));

FD_LOG_INFO(( "loading quic" ));
fd_quic_qos_t * quic_qos = fd_quic_qos_join( fd_wksp_pod_map( args->tile_pod, "quic_qos" ) );
if( FD_UNLIKELY( !quic_qos ) ) FD_LOG_ERR(( "fd_quic_qos_join failed" ));

FD_LOG_INFO(( "loading stake" ));
fd_stake_t * stake = fd_stake_join( fd_wksp_pod_map( args->tile_pod, "stake" ) );
if( FD_UNLIKELY( !stake ) ) FD_LOG_ERR(( "fd_stake_join failed" ));

FD_LOG_INFO(( "loading xsk_aio" ));
fd_xsk_aio_t * xsk_aio = fd_xsk_aio_join( fd_wksp_pod_map( args->tile_pod, "xsk_aio" ), args->xsk );
if( FD_UNLIKELY( !xsk_aio ) ) FD_LOG_ERR(( "fd_xsk_aio_join failed" ));
Expand Down Expand Up @@ -122,7 +130,12 @@ run( fd_frank_args_t * args ) {
/* Start serving */

FD_LOG_INFO(( "%s(%lu) run", args->tile_name, args->tile_idx ));
int err = fd_quic_tile( cnc, quic, xsk_aio, mcache, dcache, lazy, rng, scratch );
#if FD_HAS_XDP
#define SOCK args->xsk
#else
#define SOCK args->udpsock
#endif
int err = fd_quic_tile( cnc, quic, quic_qos, stake, SOCK, mcache, dcache, lazy, rng, scratch );
if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "fd_quic_tile failed (%i)", err ));
}

Expand Down
7 changes: 5 additions & 2 deletions src/ballet/x509/fd_x509.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,17 @@ fd_x509_gen_solana_cert( EVP_PKEY * pkey ) {

/* Generate serial number */
long serial;
if( FD_UNLIKELY( 1!=RAND_bytes( (uchar *)&serial, sizeof(long) ) ) ) {
if ( FD_UNLIKELY( 1 != RAND_bytes( (uchar *)&serial, sizeof( long ) ) ) ) {
FD_LOG_WARNING(( "RAND_bytes() failed" ));
goto cleanup1;
}
ASN1_INTEGER_set( X509_get_serialNumber(x), serial );

/* Set public key (the only important part) */
X509_set_pubkey( x, pkey );
if ( FD_UNLIKELY( 1 != X509_set_pubkey( x, pkey ) ) ) {
FD_LOG_WARNING(( "X509_set_pubkey() failed" ));
goto cleanup1;
};

/* Set very long expiration date */
long not_before = 0L; /* Jan 1 00:00:00 1975 GMT */
Expand Down
25 changes: 17 additions & 8 deletions src/disco/quic/fd_quic.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@

#include "../fd_disco_base.h"
#include "../../tango/quic/fd_quic.h"
#include "../../tango/quic/fd_quic_qos.h"
#include "../../tango/quic/tls/fd_quic_tls.h"
#include "../../tango/xdp/fd_xdp.h"
#include "../../tango/udpsock/fd_udpsock.h"
#include "../../ballet/txn/fd_txn.h"

#if FD_HAS_HOSTED
Expand Down Expand Up @@ -118,14 +121,20 @@ FD_FN_CONST ulong
fd_quic_tile_scratch_footprint( ulong depth );

int
fd_quic_tile( fd_cnc_t * cnc, /* Local join to the tile's command-and-control */
fd_quic_t * quic, /* QUIC without active join */
fd_xsk_aio_t * xsk_aio, /* Local join to QUIC XSK aio */
fd_frag_meta_t * mcache, /* Local join to the tile's txn output mcache */
uchar * dcache, /* Local join to the tile's txn output dcache */
long lazy, /* Laziness, <=0 means use a reasonable default */
fd_rng_t * rng, /* Local join to the rng this tile should use */
void * scratch ); /* Tile scratch memory */
fd_quic_tile( fd_cnc_t * cnc, /* Local join to the tile's command-and-control */
fd_quic_t * quic, /* QUIC without active join */
fd_quic_qos_t * quic_qos, /* Local join to QoS */
fd_stake_t * stake,
#if FD_HAS_XDP
fd_xsk_aio_t * xsk_aio, /* Local join to QUIC XSK aio */
#else
fd_udpsock_t * udpsock, /* Local join to QUIC udp sock */
#endif
fd_frag_meta_t * mcache, /* Local join to the tile's txn output mcache */
uchar * dcache, /* Local join to the tile's txn output dcache */
long lazy, /* Laziness, <=0 means use a reasonable default */
fd_rng_t * rng, /* Local join to the rng this tile should use */
void * scratch ); /* Tile scratch memory */

FD_PROTOTYPES_END

Expand Down
Loading
Loading