From 72ae1900215ab6dc9792fae6c7ac216eb55fa67e Mon Sep 17 00:00:00 2001 From: Liam Heeger Date: Fri, 11 Aug 2023 13:10:49 -0500 Subject: [PATCH 01/23] build: use gcc-12.2.0 in activate script --- activate | 2 +- activate-gcc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/activate b/activate index d527581944..16f94a010f 100644 --- a/activate +++ b/activate @@ -6,7 +6,7 @@ if ! command -v module >/dev/null 2>&1; then fi module purge -module load gcc-9.3.0 +module load gcc-12.2.0 module load Python-3.9 module load openssl-1.1.1s+quic1 module list diff --git a/activate-gcc b/activate-gcc index 9376a0f6fa..5be26b910a 100755 --- a/activate-gcc +++ b/activate-gcc @@ -5,9 +5,9 @@ if [ $# -gt 0 ] && [ "$1" = "install" ]; then # Install Packages needed for dev hosts sudo yum install \ - jump_module_gcc-9.3.0 + jump_module_gcc-12.2.0 fi module purge -module load gcc-9.3.0 +module load gcc-12.2.0 module list From 7ee1e318babbb75cb0930ebc31b0c64a2c6c4589 Mon Sep 17 00:00:00 2001 From: cavemanloverboy <93507302+cavemanloverboy@users.noreply.github.com> Date: Fri, 11 Aug 2023 20:53:02 -0700 Subject: [PATCH 02/23] fix typo --- src/tango/quic/fd_quic.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tango/quic/fd_quic.c b/src/tango/quic/fd_quic.c index 2ff8e3950c..8325203680 100644 --- a/src/tango/quic/fd_quic.c +++ b/src/tango/quic/fd_quic.c @@ -522,7 +522,7 @@ fd_quic_enc_level_to_pn_space( uint enc_level ) { return el2pn_map[ enc_level ]; } -/* This code is directly from rpc9000 A.3 */ +/* This code is directly from rfc9000 A.3 */ static void fd_quic_reconstruct_pkt_num( ulong * pkt_number, ulong pkt_number_sz, From 06b491777d5cf58ea5acea0cd32b1a6fdb7976a6 Mon Sep 17 00:00:00 2001 From: Logan Lamb Date: Tue, 1 Aug 2023 19:54:59 +0000 Subject: [PATCH 03/23] CI test to confirm fd_frank_quic is working --- .github/workflows/make_test.yml | 18 ++ src/ballet/base64/Local.mk | 3 + src/ballet/base64/fd_base64.c | 84 ++++++++ src/ballet/base64/fd_base64.h | 15 ++ src/ballet/base64/test_base64.c | 49 +++++ src/tango/quic/tests/Local.mk | 1 + src/tango/quic/tests/fd_quic_test_helpers.c | 72 +++++++ src/tango/quic/tests/fd_quic_test_helpers.h | 6 + src/tango/quic/tests/quic_txn.bin | Bin 0 -> 368 bytes src/tango/quic/tests/test_quic_txn.c | 214 ++++++++++++++++++++ src/test/frank-single-transaction.sh | 56 +++++ 11 files changed, 518 insertions(+) create mode 100644 src/ballet/base64/Local.mk create mode 100644 src/ballet/base64/fd_base64.c create mode 100644 src/ballet/base64/fd_base64.h create mode 100644 src/ballet/base64/test_base64.c create mode 100644 src/tango/quic/tests/quic_txn.bin create mode 100644 src/tango/quic/tests/test_quic_txn.c create mode 100755 src/test/frank-single-transaction.sh diff --git a/.github/workflows/make_test.yml b/.github/workflows/make_test.yml index f447aecf79..749c218756 100644 --- a/.github/workflows/make_test.yml +++ b/.github/workflows/make_test.yml @@ -69,3 +69,21 @@ jobs: - name: Run unit tests run: make -k -j2 --output-sync=target run-unit-test + + frank-single-transaction: + runs-on: [self-hosted, Linux, X64] + strategy: + matrix: + compiler: [gcc] + defaults: + run: + shell: bash + steps: + - uses: actions/checkout@v3 + with: + submodules: recursive + + - uses: dtolnay/rust-toolchain@1.69.0 + + - name: Run + run: ./src/test/frank-single-transaction.sh diff --git a/src/ballet/base64/Local.mk b/src/ballet/base64/Local.mk new file mode 100644 index 0000000000..b01c8db269 --- /dev/null +++ b/src/ballet/base64/Local.mk @@ -0,0 +1,3 @@ +$(call add-hdrs,fd_base64.h) +$(call add-objs,fd_base64,fd_ballet) +$(call make-unit-test,test_base64,test_base64,fd_ballet fd_util) diff --git a/src/ballet/base64/fd_base64.c b/src/ballet/base64/fd_base64.c new file mode 100644 index 0000000000..ce68816ea6 --- /dev/null +++ b/src/ballet/base64/fd_base64.c @@ -0,0 +1,84 @@ +#include "fd_base64.h" + +/* Function to get the index of a character in the Base64 alphabet */ +static inline int +base64_decode_char( char c ) { + if( c >= 'A' && c <= 'Z' ) return c - 'A'; + if( c >= 'a' && c <= 'z' ) return c - 'a' + 26; + if( c >= '0' && c <= '9' ) return c - '0' + 52; + if( c == '+' ) return 62; + if( c == '/' ) return 63; + return -1; // Invalid character +} + +/* Function to decode a base64 encoded string into an unsigned char array + The function returns the length of the decoded array */ +int +fd_base64_decode( const char * encoded, + uchar * decoded ) { + int len = 0; + int bits_collected = 0; + uint accumulator = 0; + + while ( *encoded ) { + char c = *encoded++; + int value = base64_decode_char(c); + + if( value >= 0 ) { + accumulator = ( accumulator << 6 ) | ( uint ) value; + bits_collected += 6; + + if( bits_collected >= 8 ) { + bits_collected -= 8; + decoded[ len++ ] = ( uchar )( accumulator >> bits_collected ); + accumulator &= ( 1U << bits_collected ) - 1; + } + } else if( c == '=' ) { + /* Padding character, ignore and break the loop */ + break; + } else { + /* Fail with invalid characters (e.g., whitespace, padding) */ + return -1; + } + } + + return len; +} + +ulong +fd_base64_encode( const uchar * data, + int data_len, + char * encoded ) { + static const char base64_alphabet[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + uint encoded_len = 0; + uint accumulator = 0; + int bits_collected = 0; + + while( data_len-- ) { + accumulator = ( accumulator << 8 ) | *data++; + bits_collected += 8; + + while( bits_collected >= 6 ) { + encoded[ encoded_len++ ] = base64_alphabet[ ( accumulator >> ( bits_collected - 6) ) & 0x3F ]; + bits_collected -= 6; + } + } + + if( bits_collected > 0 ) { + // If there are remaining bits, pad the last Base64 character with zeroes + accumulator <<= 6 - bits_collected; + encoded[ encoded_len++ ] = base64_alphabet[accumulator & 0x3F ]; + } + + // Add padding characters if necessary + while( encoded_len % 4 != 0 ) { + encoded[ encoded_len++ ] = '='; + } + + // Null-terminate the encoded string + encoded[ encoded_len ] = '\0'; + + return encoded_len; +} diff --git a/src/ballet/base64/fd_base64.h b/src/ballet/base64/fd_base64.h new file mode 100644 index 0000000000..7b14809ee1 --- /dev/null +++ b/src/ballet/base64/fd_base64.h @@ -0,0 +1,15 @@ +#ifndef HEADER_fd_src_ballet_base64_fd_base64_h +#define HEADER_fd_src_ballet_base64_fd_base64_h + +/* fd_base64.h provides methods for converting between binary and base64. */ +#include "../fd_ballet_base.h" +int +fd_base64_decode( const char * encoded, + uchar * decoded ); + +ulong +fd_base64_encode( const uchar * data, + int data_len, + char * encoded ); + +#endif /* HEADER_fd_src_ballet_base64_fd_base64_h */ diff --git a/src/ballet/base64/test_base64.c b/src/ballet/base64/test_base64.c new file mode 100644 index 0000000000..abbbfedfe6 --- /dev/null +++ b/src/ballet/base64/test_base64.c @@ -0,0 +1,49 @@ +#include "fd_base64.h" +#include +#include + +void +decode_test( void ) { + const char * expected_string = "Hello World!"; + const char * encoded_string = "SGVsbG8gV29ybGQh"; + uchar decoded[ 100 ]; /* Assuming the decoded data won't exceed 100 bytes */ + + int decoded_length = fd_base64_decode( encoded_string, decoded ); + + FD_TEST( (uint)decoded_length == strlen( expected_string ) ); + FD_TEST( memcmp( decoded, expected_string, strlen(expected_string) ) == 0 ); +} + +void +decode_test_equals( void ) { + const char * encoded_string = "AZCML352XGjOwgIwMGsRf8oa2IoWzSvgWlJwcAEtLtwk3/h2VIe7n+YbPrAwpbIiK3KOM/G4XiNAKyhHbn2VBQ0BAAEGUn3G2+sjJ+xarkiI77ZYW6CEGHzEjzovKWoUG3/TSKAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACdPJdmqIA5PfVdI4dCMAMKH7z7U0fpkodPhLfE54yrfhMa9sJylZdraDb38lv6aISwi7GkOXsRZ8PQnKkkbFarB08LBYexRVX/v3EVfNeQk9z+WMTKqR0tc/WtXBjQP+v5pDHP1tYUMPTA8WZARvn8XTCjSs+9iPlPPYBWQrEspMZwcluFwA/afVpAczCo7+IJMw5/a0W/kR2EsJRNuF3IBBQUCAwAEAYgEJgW58JTT/VUQEAERERERARABAREBEBERERERABEBEQAQABARAAAREQEREBAQEBERERABERABEBAREBEQEQARAAERABARAQEQABABABAQAQABABEBAREQAAARAAAREREREAERARAAAREQEBAAEAAAEAAQEBEAEREAAAEQEBERAQAAAQEAABABABAQAAEBAAAQEBEAERAQEAAQAREBEQAREREAARAAAREAEAAAAREBEQEQEAAAEQEBEREREBABAQEQAQEQAAEQAREAERAAEAEQARABEBAAAQAQEAABEQEBEAAAEAABEBAAEAEAAQAQEBEQAQABABAREREBAQEAABEBARAAEAABEBAQEREAEBAREQEAAAEBEAEREREQARERAQEQAAEBEQEQARABEBAQAAAAEBEBAQEREQEQEQEAEQAQEBERARAQAQEBABAAAAEAERABAAAAAAEBEAEAAREREBEQEQARAREBEBEREAABABEAEBEBERAQABAAEQEAEAAQABEBEAAAABABAAABEAAAEQAQAQEBAQARAAEREAAAAQAQAREAAQAQAAEQEAEQEAAAARABEAEREBEBAQAQAQEREQEAAAEBAAAQEQABEAABEAEBEBAQABEREAAAABEBAAAQEQEQAAARERAQABERERABEAARABABEBAQAQEAEAARARERABABAREQEBAAAA=="; + uchar decoded[ 1500 ]; /* Assuming the decoded data won't exceed 1500 bytes */ + + int decoded_length = fd_base64_decode( encoded_string, decoded ); + + FD_TEST( decoded_length != -1 ); +} + +void encode_test( void ) { + uchar binary_data[] = {72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33}; /* "Hello World!" */ + const char * expected_string = "SGVsbG8gV29ybGQh"; + + char encoded[100]; /* Assuming the encoded data won't exceed 100 characters */ + + ulong encoded_length = fd_base64_encode(binary_data, sizeof(binary_data), encoded); + printf("%lu %ld\n", sizeof(binary_data), strlen((char*)binary_data)); + + FD_TEST( (uint)encoded_length == strlen( expected_string ) ); + FD_TEST( memcmp( encoded, expected_string, strlen(encoded) ) == 0 ); +} + +int +main( int argc, + char ** argv ) { + fd_boot( &argc, &argv ); + decode_test(); + decode_test_equals(); + encode_test(); + fd_halt(); + return 0; +} diff --git a/src/tango/quic/tests/Local.mk b/src/tango/quic/tests/Local.mk index 5cf968ccfa..263f9cc7fa 100644 --- a/src/tango/quic/tests/Local.mk +++ b/src/tango/quic/tests/Local.mk @@ -7,6 +7,7 @@ $(call make-unit-test,test_quic_streams,test_quic_streams,fd_aio fd_ballet fd_ta $(call make-unit-test,test_quic_conn,test_quic_conn,fd_aio fd_quic fd_ballet fd_tango fd_util) $(call make-unit-test,test_quic_server,test_quic_server,fd_aio fd_ballet fd_quic fd_tango fd_util) $(call make-unit-test,test_quic_client_flood,test_quic_client_flood,fd_aio fd_quic fd_ballet fd_tango fd_util) +$(call make-unit-test,test_quic_txn,test_quic_txn,fd_aio fd_quic fd_ballet fd_tango fd_util) $(call make-unit-test,test_quic_bw,test_quic_bw,fd_aio fd_quic fd_ballet fd_tango fd_util) $(call make-unit-test,test_quic_handshake,test_handshake,fd_aio fd_ballet fd_quic fd_util) $(call make-unit-test,test_quic_crypto,test_crypto,fd_quic fd_ballet fd_util) diff --git a/src/tango/quic/tests/fd_quic_test_helpers.c b/src/tango/quic/tests/fd_quic_test_helpers.c index c6be7b4c2c..a213b3b599 100644 --- a/src/tango/quic/tests/fd_quic_test_helpers.c +++ b/src/tango/quic/tests/fd_quic_test_helpers.c @@ -226,6 +226,77 @@ fd_quic_test_keylog( fd_quic_virtual_pair_t const * pair, fd_pcapng_fwrite_tls_key_log( (uchar const *)line, (uint)strlen( line ), pair->quic_a2b.pcapng ); } +fd_quic_udpsock_t * +fd_quic_client_create_udpsock(fd_quic_udpsock_t * udpsock, + fd_wksp_t * wksp, + fd_aio_t const * rx_aio, + uint listen_ip) { + ulong mtu = 2048UL; + ulong rx_depth = 1024UL; + ulong tx_depth = 1024UL; + + int sock_fd = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP ); + if( FD_UNLIKELY( sock_fd<0 ) ) { + FD_LOG_WARNING(( "socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) failed (%d-%s)", + errno, strerror( errno ) )); + return NULL; + } + + struct sockaddr_in listen_addr = { + .sin_family = AF_INET, + .sin_addr = { .s_addr = listen_ip }, + .sin_port = 0, + }; + if( FD_UNLIKELY( 0!=bind( sock_fd, (struct sockaddr const *)fd_type_pun_const( &listen_addr ), sizeof(struct sockaddr_in) ) ) ) { + FD_LOG_WARNING(( "bind(sock_fd) failed (%d-%s)", + errno, strerror( errno ) )); + close( sock_fd ); + return NULL; + } + + void * sock_mem = fd_wksp_alloc_laddr( wksp, fd_udpsock_align(), + fd_udpsock_footprint( mtu, rx_depth, tx_depth ), + 1UL ); + if( FD_UNLIKELY( !sock_mem ) ) { + FD_LOG_WARNING(( "fd_wksp_alloc_laddr() failed" )); + close( sock_fd ); + return NULL; + } + + fd_udpsock_t * sock = fd_udpsock_join( fd_udpsock_new( sock_mem, mtu, rx_depth, tx_depth ), sock_fd ); + if( FD_UNLIKELY( !sock ) ) { + FD_LOG_WARNING(( "fd_udpsock_join() failed" )); + close( sock_fd ); + fd_wksp_free_laddr( sock_mem ); + return NULL; + } + + udpsock->type = FD_QUIC_UDPSOCK_TYPE_UDPSOCK; + udpsock->wksp = wksp; + udpsock->udpsock.sock = sock; + udpsock->udpsock.sock_fd = sock_fd; + udpsock->aio = fd_udpsock_get_tx( sock ); + udpsock->listen_ip = fd_udpsock_get_ip4_address( sock ); + udpsock->listen_port = (ushort)fd_udpsock_get_listen_port( sock ); + fd_udpsock_set_rx( sock, rx_aio ); + + FD_LOG_NOTICE(( "UDP socket listening on " FD_IP4_ADDR_FMT ":%u", + FD_IP4_ADDR_FMT_ARGS( udpsock->listen_ip ), udpsock->listen_port )); + return udpsock; +} + +// TODO: LML complete this thought? +fd_quic_udpsock_t * +create_udp_socket(fd_quic_udpsock_t * udpsock) { + if( FD_UNLIKELY( !fd_cstr_to_ip4_addr("0.0.0.0", &udpsock->listen_ip ) ) ) { + goto error_1; + } + udpsock->listen_port = 0; // TODO: check this where is it set in flood? + error_1: + FD_LOG_NOTICE(( "invalid --listen-ip" )); + return NULL; +} + fd_quic_udpsock_t * fd_quic_udpsock_create( void * _sock, int * pargc, @@ -255,6 +326,7 @@ fd_quic_udpsock_create( void * _sock, quic_sock->listen_port = listen_port; int is_xsk = (!!xdp_app_name); + FD_LOG_NOTICE(( "is_xsk %d", is_xsk )); if( is_xsk ) { FD_TEST( _src_mac ); if( FD_UNLIKELY( !fd_cstr_to_mac_addr( _src_mac, quic_sock->self_mac ) ) ) FD_LOG_ERR(( "invalid --src-mac" )); diff --git a/src/tango/quic/tests/fd_quic_test_helpers.h b/src/tango/quic/tests/fd_quic_test_helpers.h index 8915cd6615..6c642f1ff2 100644 --- a/src/tango/quic/tests/fd_quic_test_helpers.h +++ b/src/tango/quic/tests/fd_quic_test_helpers.h @@ -107,6 +107,12 @@ typedef struct fd_quic_udpsock fd_quic_udpsock_t; FD_PROTOTYPES_BEGIN +fd_quic_udpsock_t * +fd_quic_client_create_udpsock(fd_quic_udpsock_t * udpsock, + fd_wksp_t * wksp, + fd_aio_t const * rx_aio, + uint listen_ip); + fd_quic_udpsock_t * fd_quic_udpsock_create( void * _sock, int * argc, diff --git a/src/tango/quic/tests/quic_txn.bin b/src/tango/quic/tests/quic_txn.bin new file mode 100644 index 0000000000000000000000000000000000000000..f05412cf87c1ecac3394126bd3488d9250965cfc GIT binary patch literal 368 zcmZST=smI~N%HgppQVe6_#H}WPL-`W#VFqT@vP;^MZXWFD*ijy%yfj;>&mQ9+y1_e zjos~u2N(OA|4u#JIy=x}eJMlW9RXXL28RRo3%}=;MIKma;(z^ST(xNb3?;|l&I=4X z*BH)`S=(bfXYWZ*nak&IS1#H;>E3o;%ak=N+FQ(LZ)Ij;U}6)$I@983iJ#q`Qj^u& z>&4e_9=+!3|6KgxJg0;v2Ig6}xGvx6+%&y(QA5|6_iIjScrsndOU!Y4@wGAQ*k_}+ k%B>6#zzC&rQB1fbpyJE~tx_max_data )); +} + +void +cb_conn_handshake_complete( fd_quic_conn_t * conn, + void * quic_ctx ) { + (void)conn; + (void)quic_ctx; + FD_LOG_NOTICE(( "cb_conn_handshake_complete %lu", conn->tx_max_data )); + g_handshake_complete = 1; +} + +void +cb_conn_final( fd_quic_conn_t * conn, + void * quic_ctx ) { + (void)conn; + (void)quic_ctx; + FD_LOG_NOTICE(( "cb_conn_final" )); + g_conn_final = 1; +} + +void +cb_stream_new( fd_quic_stream_t * stream, + void * quic_ctx, + int stream_type ) { + (void)stream; + (void)quic_ctx; + (void)stream_type; + FD_LOG_NOTICE(( "cb_stream_new" )); +} + +void +cb_stream_notify( fd_quic_stream_t * stream, + void * stream_ctx, + int notify_type ) { + (void)stream; + (void)stream_ctx; + g_stream_notify = 1; + FD_LOG_NOTICE(( "cb_stream_notify %d", notify_type )); +} + +void +cb_stream_receive( fd_quic_stream_t * stream, + void * stream_ctx, + uchar const * data, + ulong data_sz, + ulong offset, + int fin ) { + (void)stream; + (void)stream_ctx; + (void)data; + (void)data_sz; + (void)offset; + (void)fin; +} + +ulong +cb_now( void * context ) { + (void)context; + return (ulong)fd_log_wallclock(); +} + +int +run_quic_client( fd_quic_t * quic, + fd_quic_udpsock_t * udpsock, + fd_aio_pkt_info_t * pkt ) { + uint dst_ip; + if( FD_UNLIKELY( !fd_cstr_to_ip4_addr( "198.18.0.1", &dst_ip ) ) ) FD_LOG_ERR(( "invalid --dst-ip" )); + ushort dst_port = 9001; + + + #define MSG_SZ_MIN (1UL) + #define MSG_SZ_MAX (1232UL-64UL-32UL) + #define MSG_SIZE_RANGE (MSG_SZ_MAX - MSG_SZ_MIN + 1UL) + + quic->cb.conn_new = cb_conn_new; + quic->cb.conn_hs_complete = cb_conn_handshake_complete; + quic->cb.conn_final = cb_conn_final; + quic->cb.stream_new = cb_stream_new; + quic->cb.stream_notify = cb_stream_notify; + quic->cb.stream_receive = cb_stream_receive; + quic->cb.now = cb_now; + quic->cb.now_ctx = NULL; + + fd_quic_set_aio_net_tx( quic, udpsock->aio ); + FD_TEST( fd_quic_init( quic ) ); + + fd_quic_conn_t * conn = fd_quic_connect( quic, dst_ip, dst_port, NULL ); + while ( FD_UNLIKELY( !( g_handshake_complete || g_conn_final ) ) ) { + fd_quic_service( quic ); + fd_quic_udpsock_service( udpsock ); + } + FD_TEST( conn ); + FD_TEST( conn->state == FD_QUIC_CONN_STATE_ACTIVE ); + + fd_quic_stream_t * stream = fd_quic_conn_new_stream( conn, FD_QUIC_TYPE_UNIDIR ); + FD_TEST( stream ); + int rc = 0; + if( stream ) { + rc = fd_quic_stream_send( stream, pkt, 1, 1 ); + FD_LOG_NOTICE(( "rc %d", rc )); + } + while ( FD_UNLIKELY( !( g_stream_notify || g_conn_final ) ) ) { + fd_quic_service( quic ); + fd_quic_udpsock_service( udpsock ); + } + + if( conn ) { + fd_quic_conn_close( conn, 0 ); + } + fd_quic_fini( quic ); + + return rc; +} + +int +main( int argc, + char ** argv ) { + fd_boot( &argc, &argv ); + const char * payload = fd_env_strip_cmdline_cstr( &argc, &argv, "--payload-base64-encoded", NULL, NULL ); + + fd_aio_pkt_info_t pkt; + uchar buf[1300]; + if( !payload ) { + pkt.buf = ( void * )transaction; + pkt.buf_sz = ( ushort )transaction_sz; + } else { + int buf_sz = fd_base64_decode( payload, buf ); + if ( buf_sz == -1 ) { + FD_LOG_NOTICE(( "bad input %s", payload )); + return -1; + } + FD_LOG_NOTICE(( "transaction size %d!", buf_sz )); + pkt.buf = (void *)buf; + pkt.buf_sz = ( ushort ) buf_sz; + } + + fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz("gigantic"), + 1UL, + fd_shmem_cpu_idx( 0 ), + "wksp", + 0UL ); + FD_TEST( wksp ); + + fd_quic_limits_t quic_limits = { + .conn_cnt = 1024UL, + .handshake_cnt = 256UL, + .conn_id_cnt = 16UL, + .conn_id_sparsity = 4.0, + .stream_cnt = { 0UL, // FD_QUIC_STREAM_TYPE_BIDI_CLIENT + 0UL, // FD_QUIC_STREAM_TYPE_BIDI_SERVER + 2UL, // FD_QUIC_STREAM_TYPE_UNI_CLIENT + 0UL }, // FD_QUIC_STREAM_TYPE_UNI_SERVER + .stream_sparsity = 4.0, + .inflight_pkt_cnt = 64UL, + .tx_buf_sz = 1UL<<15UL + }; + ulong quic_footprint = fd_quic_footprint( &quic_limits ); + FD_TEST( quic_footprint ); + + void * mem = fd_wksp_alloc_laddr( wksp, fd_quic_align(), quic_footprint, 1UL ); + fd_quic_t * quic = fd_quic_new( mem, &quic_limits ); + FD_TEST( quic ); + + fd_quic_udpsock_t _udpsock; + uint listen_ip; + if( FD_UNLIKELY( !fd_cstr_to_ip4_addr("0.0.0.0", &listen_ip ) ) ) { + FD_LOG_NOTICE(( "invalid listen-ip" )); + return 1; + } + fd_quic_udpsock_t * udpsock = fd_quic_client_create_udpsock( &_udpsock, wksp, fd_quic_get_aio_net_rx( quic ), listen_ip ); + FD_TEST( udpsock == &_udpsock ); + + fd_quic_config_t * client_cfg = &quic->config; + client_cfg->role = FD_QUIC_ROLE_CLIENT; + memcpy( client_cfg->alpns, "\xasolana-tpu", 11UL ); + client_cfg->alpns_sz = 11U; + FD_TEST( fd_quic_config_from_env( &argc, &argv, client_cfg ) ); + memcpy(client_cfg->link.dst_mac_addr, "\x52\xF1\x7E\xDA\x2C\xE0", 6UL); + client_cfg->net.ip_addr = udpsock->listen_ip; + client_cfg->net.ephem_udp_port.lo = (ushort)udpsock->listen_port; + client_cfg->net.ephem_udp_port.hi = (ushort)(udpsock->listen_port + 1); + client_cfg->initial_rx_max_stream_data = 1<<15; + + int num_sent = run_quic_client( quic, udpsock, &pkt ); + + fd_wksp_free_laddr( fd_quic_delete( fd_quic_leave( quic ) ) ); + fd_quic_udpsock_destroy( udpsock ); + fd_wksp_delete_anonymous( wksp ); + + fd_halt(); + + switch( num_sent ) { + case 1: return 0; /* If no packets were successfully transmitted return one. */ + case 0: return 1; /* If the single packet was transmitted successfully return zero. */ + default: return -num_sent; + } +} diff --git a/src/test/frank-single-transaction.sh b/src/test/frank-single-transaction.sh new file mode 100755 index 0000000000..ea2a8c5845 --- /dev/null +++ b/src/test/frank-single-transaction.sh @@ -0,0 +1,56 @@ +#!/bin/bash + +# bash strict mode +set -euo pipefail +IFS=$'\n\t' +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd "${SCRIPT_DIR}/../../" + +# create test configuration for fddev +TMPDIR=$(mktemp -d) +cat > ${TMPDIR}/config.toml < Date: Mon, 14 Aug 2023 20:05:58 +0000 Subject: [PATCH 04/23] add mcache tx seq to monitor --- src/app/fdctl/monitor/helper.c | 6 +++--- src/app/fdctl/monitor/monitor.c | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/app/fdctl/monitor/helper.c b/src/app/fdctl/monitor/helper.c index ce0014505a..1aa833b0d0 100644 --- a/src/app/fdctl/monitor/helper.c +++ b/src/app/fdctl/monitor/helper.c @@ -112,9 +112,9 @@ printf_seq( char ** buf, : (delta>0L) ? TEXT_GREEN /* new sequence numbers published */ : (seq_now) ? TEXT_RED /* sequence number went backward */ : TEXT_BLUE; /* sequence number reset */ - if( delta> 99999L ) PRINT( "%16lx(%s>+99999" TEXT_NORMAL ")", seq_now, color ); - else if( delta<-99999L ) PRINT( "%16lx(%s<-99999" TEXT_NORMAL ")", seq_now, color ); - else PRINT( "%16lx(%s %+6li" TEXT_NORMAL ")", seq_now, color, delta ); + if( delta> 99999L ) PRINT( "%10lu(%s>+99999" TEXT_NORMAL ")", seq_now, color ); + else if( delta<-99999L ) PRINT( "%10lu(%s<-99999" TEXT_NORMAL ")", seq_now, color ); + else PRINT( "%10lu(%s %+6li" TEXT_NORMAL ")", seq_now, color, delta ); } void diff --git a/src/app/fdctl/monitor/monitor.c b/src/app/fdctl/monitor/monitor.c index c0e7f5ec1d..71e5f4b2b1 100644 --- a/src/app/fdctl/monitor/monitor.c +++ b/src/app/fdctl/monitor/monitor.c @@ -334,8 +334,8 @@ run_monitor( config_t * const config, PRINT( TEXT_NEWLINE ); } PRINT( TEXT_NEWLINE ); - PRINT( " link | tot TPS | tot bps | uniq TPS | uniq bps | ha tr%% | uniq bw%% | filt tr%% | filt bw%% | ovrnp cnt | ovrnr cnt | slow cnt" TEXT_NEWLINE ); - PRINT( "----------------+----------+----------+----------+----------+----------+----------+----------+----------+---------------------+---------------------+---------------------" TEXT_NEWLINE ); + PRINT( " link | tot TPS | tot bps | uniq TPS | uniq bps | ha tr%% | uniq bw%% | filt tr%% | filt bw%% | ovrnp cnt | ovrnr cnt | slow cnt | tx seq" TEXT_NEWLINE ); + PRINT( "----------------+----------+----------+----------+----------+----------+----------+----------+----------+---------------------+---------------------+---------------------+-------------------" TEXT_NEWLINE ); long dt = now-then; for( ulong link_idx=0UL; link_idxfseq_diag_ovrnp_cnt, prv->fseq_diag_ovrnp_cnt ); PRINT( " | " ); printf_err_cnt( &buf, &buf_sz, cur->fseq_diag_ovrnr_cnt, prv->fseq_diag_ovrnr_cnt ); PRINT( " | " ); printf_err_cnt( &buf, &buf_sz, cur->fseq_diag_slow_cnt, prv->fseq_diag_slow_cnt ); + PRINT( " | " ); printf_seq( &buf, &buf_sz, cur->mcache_seq, prv->mcache_seq ); PRINT( TEXT_NEWLINE ); } From 604915e465fef4824fccf4aecd8bb9ee306be55c Mon Sep 17 00:00:00 2001 From: Marcus <117692400+marcus-jump@users.noreply.github.com> Date: Mon, 14 Aug 2023 19:29:29 +0000 Subject: [PATCH 05/23] fix fuzz artifact publication --- .../fuzz_artifacts.yml | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) rename .github/{workflows-disabled => workflows}/fuzz_artifacts.yml (78%) diff --git a/.github/workflows-disabled/fuzz_artifacts.yml b/.github/workflows/fuzz_artifacts.yml similarity index 78% rename from .github/workflows-disabled/fuzz_artifacts.yml rename to .github/workflows/fuzz_artifacts.yml index 4818070a83..23b2c090a6 100644 --- a/.github/workflows-disabled/fuzz_artifacts.yml +++ b/.github/workflows/fuzz_artifacts.yml @@ -2,8 +2,6 @@ name: Make Fuzz Artifacts on: pull_request: push: - branches: - - main workflow_dispatch: merge_group: jobs: @@ -21,16 +19,16 @@ jobs: - uses: dtolnay/rust-toolchain@1.69.0 - - name: Set extras - run: | - echo "EXTRAS=static" >> "$GITHUB_ENV" + - uses: firedancer-io/alpine-builder@main + name: Build with ASAN on Alpine + env: + MACHINE: linux_clang_x86_64_fuzz_asan + with: + command: make --output-sync=target -j fuzz-test - - name: Prepare asan + - name: List Artifacts run: | - echo "MACHINE=linux_clang_x86_64_fuzz_asan" >> "$GITHUB_ENV" - - - uses: firedancer-io/alpine-builder@main - name: Build with asan on Alpine + ls build/linux/clang/x86_64_fuzz_asan/fuzz-test - uses: firedancer-io/clusterfuzz-action@main if: ${{ github.ref == 'refs/heads/main' }} From 80d479ba225649aa153de196395b06b707402010 Mon Sep 17 00:00:00 2001 From: Marcus <117692400+marcus-jump@users.noreply.github.com> Date: Tue, 15 Aug 2023 18:34:18 +0000 Subject: [PATCH 06/23] build fuzz artifacts on 20.04 --- .github/workflows/fuzz_artifacts.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/fuzz_artifacts.yml b/.github/workflows/fuzz_artifacts.yml index 23b2c090a6..68a2047733 100644 --- a/.github/workflows/fuzz_artifacts.yml +++ b/.github/workflows/fuzz_artifacts.yml @@ -19,12 +19,12 @@ jobs: - uses: dtolnay/rust-toolchain@1.69.0 - - uses: firedancer-io/alpine-builder@main - name: Build with ASAN on Alpine + - uses: firedancer-io/fuzzbot-builder@main + name: Build with ASAN on Ubuntu 20.04 env: MACHINE: linux_clang_x86_64_fuzz_asan with: - command: make --output-sync=target -j fuzz-test + command: ./deps.sh install && make --output-sync=target -j fuzz-test - name: List Artifacts run: | From 48cd86b83bcdb46863150687be078d4565ae4f4c Mon Sep 17 00:00:00 2001 From: Michael McGee Date: Tue, 15 Aug 2023 21:49:24 +0000 Subject: [PATCH 07/23] xdp: add breadcrumbs for logging --- src/app/fdctl/Local.mk | 3 ++- src/tango/xdp/fd_xdp_license.h | 1 + src/tango/xdp/fd_xdp_redirect_prog.c | 29 +++++++++++++++++++++++++++- src/tango/xdp/fd_xdp_redirect_user.c | 7 +++---- 4 files changed, 34 insertions(+), 6 deletions(-) create mode 100644 src/tango/xdp/fd_xdp_license.h diff --git a/src/app/fdctl/Local.mk b/src/app/fdctl/Local.mk index a1e6f3feb5..b65c330d80 100644 --- a/src/app/fdctl/Local.mk +++ b/src/app/fdctl/Local.mk @@ -5,7 +5,8 @@ ifdef FD_HAS_DOUBLE $(call make-lib,fd_fdctl) $(call add-objs,main config security utility run keygen monitor/monitor monitor/helper configure/configure configure/large_pages configure/sysctl configure/shmem configure/xdp configure/xdp_leftover configure/ethtool configure/workspace_leftover configure/workspace,fd_fdctl) $(call make-bin,fdctl,main1,fd_fdctl fd_frank fd_disco fd_ballet fd_tango fd_util fd_quic solana_validator_fd) -$(OBJDIR)/bin/fdctl: src/app/fdctl/config/default.toml +$(OBJDIR)/obj/app/fdctl/configure/xdp.o: src/tango/xdp/fd_xdp_redirect_prog.o +$(OBJDIR)/obj/app/fdctl/config.o: src/app/fdctl/config/default.toml endif endif endif diff --git a/src/tango/xdp/fd_xdp_license.h b/src/tango/xdp/fd_xdp_license.h new file mode 100644 index 0000000000..62e51d36a3 --- /dev/null +++ b/src/tango/xdp/fd_xdp_license.h @@ -0,0 +1 @@ +#define FD_LICENSE "Apache-2.0" diff --git a/src/tango/xdp/fd_xdp_redirect_prog.c b/src/tango/xdp/fd_xdp_redirect_prog.c index fbe5cb8b76..28912e8606 100644 --- a/src/tango/xdp/fd_xdp_redirect_prog.c +++ b/src/tango/xdp/fd_xdp_redirect_prog.c @@ -19,6 +19,7 @@ #include "../ebpf/fd_ebpf_base.h" #include "fd_xdp_redirect_prog.h" +#include "fd_xdp_license.h" #include @@ -30,9 +31,10 @@ /* Metadata ***********************************************************/ -char __license[] __attribute__(( section("license") )) = "Apache-2.0"; +char __license[] __attribute__(( section("license") )) = FD_LICENSE; /* eBPF syscalls ******************************************************/ +/* https://github.com/torvalds/linux/blob/91aa6c412d7f85e48aead7b00a7d9e91f5cf5863/include/uapi/linux/bpf.h#L5577 */ static void * (* bpf_map_lookup_elem)( void * map, @@ -45,6 +47,31 @@ static long ulong flags ) = (void *)51U; +#ifdef FD_XDP_LOGGING + +/* To do logging, you should enable this import and then call it like + + char fmt[] = "hello %d"; + bpf_trace_printk( fmt, sizeof(fmt), 5 ); + + Note that BPF logging only supports certain format specifiers and + can take at most three arguments. It is extremely fickle and the + program may just start randomly failing to load with a verifier + error. Move things around and try again. + + Firedancer is not using a GPL compatible license, and BPF printk + is a GPL part of BPF. This means that we cannot use it, or ship + this code in the binary. The kernel checks this with the "license" + section exported above, which you should not change. +*/ +static long +(*bpf_trace_printk)( const char * fmt, + uint fmt_size, + ... ) + = (void *) 6; + +#endif + /* eBPF maps **********************************************************/ /* eBPF maps allows sharing information between the Linux userspace and diff --git a/src/tango/xdp/fd_xdp_redirect_user.c b/src/tango/xdp/fd_xdp_redirect_user.c index d0753ad720..824c300989 100644 --- a/src/tango/xdp/fd_xdp_redirect_user.c +++ b/src/tango/xdp/fd_xdp_redirect_user.c @@ -5,6 +5,7 @@ #define _DEFAULT_SOURCE #include "fd_xdp_redirect_user.h" #include "fd_xdp_redirect_prog.h" +#include "fd_xdp_license.h" #include "../../ballet/ebpf/fd_ebpf.h" #include "../../util/fd_util.h" @@ -328,7 +329,7 @@ fd_xdp_hook_iface( char const * app_name, .prog_type = BPF_PROG_TYPE_XDP, .insn_cnt = (uint) ( res->bpf_sz / 8UL ), .insns = (ulong)( res->bpf ), - .license = (ulong)"Apache-2.0", + .license = (ulong)FD_LICENSE, /* Verifier logs */ .log_level = 6, .log_size = EBPF_KERN_LOG_BUFSZ, @@ -338,9 +339,7 @@ fd_xdp_hook_iface( char const * app_name, if( FD_UNLIKELY( prog_fd<0 ) ) { FD_LOG_WARNING(( "bpf(BPF_PROG_LOAD, insns=%p, insn_cnt=%lu) failed (%d-%s)", (void *)res->bpf, res->bpf_sz / 8UL, errno, strerror( errno ) )); - if( errno==EACCES ) { - FD_LOG_NOTICE(( "eBPF verifier log:\n%s", ebpf_kern_log )); - } + FD_LOG_NOTICE(( "eBPF verifier log:\n%s", ebpf_kern_log )); return -1; } From af4e30058d5229c64ef3e7a04ae54a5af0ad9f39 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Wed, 16 Aug 2023 17:47:32 +0000 Subject: [PATCH 08/23] Fix fuzz_pcap --- src/util/net/fuzz_pcap.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/util/net/fuzz_pcap.c b/src/util/net/fuzz_pcap.c index 5dd434e4fd..270c574822 100644 --- a/src/util/net/fuzz_pcap.c +++ b/src/util/net/fuzz_pcap.c @@ -26,7 +26,9 @@ int LLVMFuzzerTestOneInput( uchar const * data, ulong size ) { - FILE * file = fmemopen( (void *)data, size, "r+b" ); + if( FD_UNLIKELY( size==0UL ) ) return 0; + + FILE * file = fmemopen( (void *)data, size, "rb" ); FD_TEST( file ); /* Open "pcap". */ From 42b5c26afaf081fba427d5643c21dd28a45bd7d5 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Tue, 23 May 2023 14:24:43 -0500 Subject: [PATCH 09/23] retransmission fixed. New fibres implementation. test_quic_drops.c test --- src/tango/aio/fd_aio_pcapng.c | 7 +- src/tango/quic/fd_quic.c | 255 +++++++-- src/tango/quic/fd_quic_conn.h | 8 +- src/tango/quic/fd_quic_proto.h | 2 + src/tango/quic/fd_quic_proto_structs.h | 16 + src/tango/quic/templ/fd_quic_encoders.h | 5 +- .../quic/templ/fd_quic_encoders_footprint.h | 2 +- src/tango/quic/templ/fd_quic_parse_frame.h | 2 +- src/tango/quic/templ/fd_quic_union.h | 2 + src/tango/quic/tests/Local.mk | 1 + src/tango/quic/tests/test_quic_drops.c | 501 ++++++++++++++++++ src/util/fibre/Local.mk | 5 + src/util/fibre/fd_fibre.c | 295 +++++++++++ src/util/fibre/fd_fibre.h | 152 ++++++ src/util/fibre/test_fibre.c | 159 ++++++ 15 files changed, 1354 insertions(+), 58 deletions(-) create mode 100644 src/tango/quic/fd_quic_proto_structs.h create mode 100644 src/tango/quic/tests/test_quic_drops.c create mode 100644 src/util/fibre/Local.mk create mode 100644 src/util/fibre/fd_fibre.c create mode 100644 src/util/fibre/fd_fibre.h create mode 100644 src/util/fibre/test_fibre.c diff --git a/src/tango/aio/fd_aio_pcapng.c b/src/tango/aio/fd_aio_pcapng.c index af29b24574..ff8b7a7f42 100644 --- a/src/tango/aio/fd_aio_pcapng.c +++ b/src/tango/aio/fd_aio_pcapng.c @@ -23,7 +23,12 @@ fd_aio_pcapng_send( void * ctx, } } - return fd_aio_send( mitm->dst, batch, batch_cnt, opt_batch_idx, flush ); + /* pcaping doesn't require any additional destination */ + if( mitm->dst ) { + return fd_aio_send( mitm->dst, batch, batch_cnt, opt_batch_idx, flush ); + } + + return FD_AIO_SUCCESS; } FD_FN_CONST fd_aio_t const * diff --git a/src/tango/quic/fd_quic.c b/src/tango/quic/fd_quic.c index 8325203680..935b70d378 100644 --- a/src/tango/quic/fd_quic.c +++ b/src/tango/quic/fd_quic.c @@ -709,12 +709,12 @@ struct fd_quic_frame_context { /* handle single v1 frames */ /* returns bytes consumed */ ulong -fd_quic_handle_v1_frame( fd_quic_t * quic, - fd_quic_conn_t * conn, - fd_quic_pkt_t * pkt, - uchar const * buf, - ulong buf_sz, - void * scratch ) { +fd_quic_handle_v1_frame( fd_quic_t * quic, + fd_quic_conn_t * conn, + fd_quic_pkt_t * pkt, + uchar const * buf, + ulong buf_sz, + fd_quic_frame_u * frame_union ) { fd_quic_frame_context_t frame_context[1] = {{ quic, conn, pkt }}; uchar const * p = buf; @@ -1646,7 +1646,7 @@ fd_quic_handle_v1_initial( fd_quic_t * quic, uchar const * frame_ptr = crypt_scratch + payload_off; ulong frame_sz = body_sz - pkt_number_sz - FD_QUIC_CRYPTO_TAG_SZ; /* total size of all frames in packet */ while( frame_sz > 0 ) { - rc = fd_quic_handle_v1_frame( quic, conn, pkt, frame_ptr, frame_sz, conn->frame_scratch ); + rc = fd_quic_handle_v1_frame( quic, conn, pkt, frame_ptr, frame_sz, &conn->frame_union ); if( rc == FD_QUIC_PARSE_FAIL ) { return FD_QUIC_PARSE_FAIL; } @@ -1840,7 +1840,7 @@ fd_quic_handle_v1_handshake( uchar const * frame_ptr = crypt_scratch + payload_off; ulong frame_sz = body_sz - pkt_number_sz - FD_QUIC_CRYPTO_TAG_SZ; /* total size of all frames in packet */ while( frame_sz > 0 ) { - rc = fd_quic_handle_v1_frame( quic, conn, pkt, frame_ptr, frame_sz, conn->frame_scratch ); + rc = fd_quic_handle_v1_frame( quic, conn, pkt, frame_ptr, frame_sz, &conn->frame_union ); if( rc == FD_QUIC_PARSE_FAIL ) { return FD_QUIC_PARSE_FAIL; } @@ -2176,7 +2176,7 @@ fd_quic_handle_v1_one_rtt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_ uchar const * frame_ptr = crypt_scratch + payload_off; ulong frame_sz = cur_sz - pn_offset - pkt_number_sz - FD_QUIC_CRYPTO_TAG_SZ; /* total size of all frames in packet */ while( frame_sz > 0 ) { - rc = fd_quic_handle_v1_frame( quic, conn, pkt, frame_ptr, frame_sz, conn->frame_scratch ); + rc = fd_quic_handle_v1_frame( quic, conn, pkt, frame_ptr, frame_sz, &conn->frame_union ); if( rc == FD_QUIC_PARSE_FAIL ) { return FD_QUIC_PARSE_FAIL; } @@ -2278,12 +2278,15 @@ fd_quic_reschedule_conn( fd_quic_conn_t * conn, quickly than remove, insert */ } + conn->next_service_time = timeout; fd_quic_schedule_conn( conn ); return; } - conn->next_service_time = timeout; + if( timeout < conn->next_service_time ) { + conn->next_service_time = timeout; + } } @@ -2453,6 +2456,20 @@ fd_quic_ack_pkt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_t * pkt ) ack->next = NULL; (*acks_tx_end)->next = ack; *acks_tx_end = ack; + + /* if mandatory, check for prior acks with lower priority */ + if( ack_mandatory ) { + fd_quic_ack_t * cur_ack = *acks_tx; + while( cur_ack ) { + cur_ack->flags |= FD_QUIC_ACK_FLAGS_MANDATORY; + cur_ack->tx_time = fd_ulong_min( cur_ack->tx_time, ack_time ); + + /* pretend we haven't sent */ + cur_ack->tx_pkt_number = FD_QUIC_PKT_NUM_UNUSED; + + cur_ack = cur_ack->next; + } + } } fd_quic_reschedule_conn( conn, ack_time ); @@ -3089,7 +3106,9 @@ fd_quic_service( fd_quic_t * quic ) { conn = event->conn; ulong service_time = event->timeout; - if( now < service_time ) break; + if( now < service_time ) { + break; + } /* set an initial next_service_time */ conn->next_service_time = now + quic->config.service_interval; @@ -3312,7 +3331,8 @@ fd_quic_pkt_hdr_populate( fd_quic_pkt_hdr_t * pkt_hdr, uint enc_level, ulong pkt_number, fd_quic_conn_t * conn, - uchar key_phase ) { + uchar key_phase, + uint initial ) { pkt_hdr->enc_level = enc_level; /* current peer endpoint */ @@ -3501,6 +3521,12 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { fd_quic_pkt_meta_t * pkt_meta = NULL; + if( conn->tx_ptr != conn->tx_buf ) { + fd_quic_tx_buffered( quic, conn, 0 ); + fd_quic_reschedule_conn( conn, 0 ); + return; + } + /* temporary usage data is populated, then encoded into a buffer so only one member in use */ @@ -3658,8 +3684,12 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { each quic packet gets encrypted into tx_buf, and the space in crypt_scratch is reused */ + /* are we the client initial packet? */ + ulong hs_data_offset = conn->hs_sent_bytes[enc_level]; + initial_pkt = (uint)( hs_data_offset == 0 ) & (uint)( !conn->server ) & (uint)( enc_level == fd_quic_enc_level_initial_id ); + /* populate the quic packet header */ - fd_quic_pkt_hdr_populate( &pkt_hdr, enc_level, pkt_number, conn, (uchar)key_phase_tx ); + fd_quic_pkt_hdr_populate( &pkt_hdr, enc_level, pkt_number, conn, (uchar)key_phase_tx, initial_pkt ); ulong initial_hdr_sz = fd_quic_pkt_hdr_footprint( &pkt_hdr, enc_level ); @@ -3835,8 +3865,9 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { /* offset within stream */ ulong offset = conn->hs_sent_bytes[enc_level]; - /* are we the client initial packet? */ - initial_pkt = (uint)( offset == 0 ) & (uint)( !conn->server ) & (uint)( enc_level == fd_quic_enc_level_initial_id ); + /* track pkt_meta values */ + ulong offset_lo = offset; + ulong offset_hi = offset; data_sz = 0; (void)data; @@ -3894,11 +3925,8 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { payload_ptr += frame_sz; tot_frame_sz += frame_sz; - /* update packet meta */ - pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_HS_DATA; - pkt_meta->range.offset_lo = frame.crypto.offset; - pkt_meta->range.offset_hi = frame.crypto.offset + data_sz; - pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt ); + /* update pkt_meta values */ + offset_hi += cur_data_sz; /* move to next hs_data */ offset += cur_data_sz; @@ -3911,6 +3939,15 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { hs_data = fd_quic_tls_get_next_hs_data( conn->tls_hs, hs_data ); } } + + /* update packet meta */ + if( offset_hi > offset_lo ) { + pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_HS_DATA; + pkt_meta->range.offset_lo = offset_lo; + pkt_meta->range.offset_hi = offset_hi; + pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt ); + } + } /* are we at application level of encryption? */ @@ -4375,6 +4412,13 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { continue; } + /* drop packet */ + /* this is a workaround for leaving a short=header-packet in the buffer + for the next tx_conn call. Next time around the tx_conn call will + not be aware that the buffer cannot be added to */ + conn->tx_ptr = conn->tx_buf; + conn->tx_sz = sizeof( conn->tx_buf ); + break; } @@ -4502,6 +4546,16 @@ fd_quic_conn_free( fd_quic_t * quic, } } + /* remove from orig_dst_conn_id */ + { + fd_quic_conn_entry_t * entry = fd_quic_conn_map_query( state->conn_map, &conn->orig_dst_conn_id ); + if( entry ) { + entry->conn = NULL; + + fd_quic_conn_map_remove( state->conn_map, entry ); + } + } + /* find conn in events, then remove */ /* FIXME O(n) scales badly with number of conns (#266) */ ulong event_idx = 0; @@ -4565,6 +4619,26 @@ fd_quic_conn_free( fd_quic_t * quic, } quic->metrics.conn_active_cnt--; + + /* clear keys */ + for( ulong j = 0U; j < 4U; ++j ) { + for( ulong k = 0U; k < 2U; ++k ) { + fd_memset( conn->keys[j][k].pkt_key, 0x42, sizeof( conn->keys[0][0].pkt_key ) ); + fd_memset( conn->keys[j][k].hp_key, 0x43, sizeof( conn->keys[0][0].hp_key ) ); + fd_memset( conn->keys[j][k].iv, 0x45, sizeof( conn->keys[0][0].iv ) ); + conn->keys[j][k].pkt_key_sz = 0; + conn->keys[j][k].hp_key_sz = 0; + conn->keys[j][k].iv_sz = 0; + } + } + for( ulong k = 0U; k < 2U; ++k ) { + fd_memset( conn->new_keys[k].pkt_key, 0x42, sizeof( conn->new_keys[0].pkt_key ) ); + fd_memset( conn->new_keys[k].hp_key, 0x43, sizeof( conn->new_keys[0].hp_key ) ); + fd_memset( conn->new_keys[k].iv, 0x45, sizeof( conn->new_keys[0].iv ) ); + conn->new_keys[k].pkt_key_sz = 0; + conn->new_keys[k].hp_key_sz = 0; + conn->new_keys[k].iv_sz = 0; + } } fd_quic_conn_id_t @@ -4627,6 +4701,11 @@ fd_quic_connect( fd_quic_t * quic, conn->host.udp_port = src_port; + /* save original destination connection id */ + + fd_memcpy( conn->orig_dst_conn_id.conn_id, &peer_conn_id.conn_id, peer_conn_id.sz ); + conn->orig_dst_conn_id.sz = peer_conn_id.sz; + /* Prepare QUIC-TLS transport params object (sent as a TLS extension). Take template from state and mutate certain params in-place. @@ -5044,8 +5123,9 @@ fd_quic_pkt_meta_retry( fd_quic_t * quic, while(1) { /* find earliest sent pkt_meta over all of the enc_levels */ - uint enc_level = ~0u; - ulong expiry = ~0u; + uint enc_level = ~0u; + uint peer_enc_level = conn->peer_enc_level; + ulong expiry = ~0ul; for( uint j = 0u; j < 4u; ++j ) { /* TODO this only checks the head of each enc_level assuming that pkt_meta is in time order. It IS @@ -5065,6 +5145,7 @@ fd_quic_pkt_meta_retry( fd_quic_t * quic, enc_level = j; expiry = pkt_meta->expiry; } + if( enc_level < peer_enc_level ) break; pkt_meta = pkt_meta->next; } if( enc_level != ~0u ) break; @@ -5083,12 +5164,25 @@ fd_quic_pkt_meta_retry( fd_quic_t * quic, } } - FD_LOG_DEBUG(( "retrying tx" )); - fd_quic_pkt_meta_list_t * sent = &pool->sent[enc_level]; fd_quic_pkt_meta_t * pkt_meta = sent->head; fd_quic_pkt_meta_t * prior = NULL; /* prior is always null, since we always look at head */ + /* already moved to another enc_level */ + if( enc_level < peer_enc_level ) { + /* free pkt_meta */ + + /* remove from list */ + fd_quic_pkt_meta_remove( sent, prior, pkt_meta ); + + /* put pkt_meta back in free list */ + fd_quic_pkt_meta_deallocate( pool, pkt_meta ); + + cnt_freed++; + + continue; + } + uint pn_space = fd_quic_enc_level_to_pn_space( enc_level ); ulong pkt_number = pkt_meta->pkt_number; @@ -5286,9 +5380,8 @@ fd_quic_reclaim_pkt_meta( fd_quic_conn_t * conn, } if( flags & FD_QUIC_PKT_META_FLAGS_HS_DATA ) { - /* hs_data is being acked - TODO we should free data here - it eventually gets "freed", so this isn't critical */ + /* actually, it is assumed the offsets are from the beginning of this + data... so we can't free any here */ } if( flags & FD_QUIC_PKT_META_FLAGS_HS_DONE ) { @@ -5652,6 +5745,8 @@ fd_quic_frame_handle_reset_stream_frame( (void)data; (void)p; (void)p_sz; + /* ack-eliciting */ + /* TODO implement */ return FD_QUIC_PARSE_FAIL; } @@ -5665,6 +5760,8 @@ fd_quic_frame_handle_stop_sending_frame( (void)data; (void)p; (void)p_sz; + /* ack-eliciting */ + /* TODO implement */ return FD_QUIC_PARSE_FAIL; } @@ -5678,6 +5775,7 @@ fd_quic_frame_handle_new_token_frame( (void)data; (void)p; (void)p_sz; + /* ack-eliciting */ return FD_QUIC_PARSE_FAIL; } @@ -5969,6 +6067,9 @@ fd_quic_frame_handle_max_data_frame( fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + ulong tx_max_data = context.conn->tx_max_data; ulong new_max_data = data->max_data; @@ -5989,6 +6090,9 @@ fd_quic_frame_handle_max_stream_data( (void)p_sz; fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + ulong stream_id = data->stream_id; /* find stream */ @@ -6018,6 +6122,9 @@ fd_quic_frame_handle_max_streams_frame( fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + /* stream type */ ulong type = (ulong)context.conn->server | (ulong)( data->stream_type << 1u ); @@ -6030,14 +6137,19 @@ fd_quic_frame_handle_max_streams_frame( static ulong fd_quic_frame_handle_data_blocked_frame( - void * context, - fd_quic_data_blocked_frame_t * data, - uchar const * p, - ulong p_sz) { - (void)context; + void * vp_context, + fd_quic_data_blocked_frame_t * data, + uchar const * p, + ulong p_sz ) { (void)data; (void)p; (void)p_sz; + + fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + /* Since we do not do runtime allocations, we will not attempt to find more memory in the case of DATA_BLOCKED We return 0 (bytes consumed), since this frame does not @@ -6047,14 +6159,19 @@ fd_quic_frame_handle_data_blocked_frame( static ulong fd_quic_frame_handle_stream_data_blocked_frame( - void * context, - fd_quic_stream_data_blocked_frame_t * data, - uchar const * p, - ulong p_sz) { - (void)context; + void * vp_context, + fd_quic_stream_data_blocked_frame_t * data, + uchar const * p, + ulong p_sz ) { (void)data; (void)p; (void)p_sz; + + fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + /* Since we do not do runtime allocations, we will not attempt to find more memory in the case of STREAM_DATA_BLOCKED We return 0 (bytes consumed), since this frame does not @@ -6064,14 +6181,19 @@ fd_quic_frame_handle_stream_data_blocked_frame( static ulong fd_quic_frame_handle_streams_blocked_frame( - void * context, + void * vp_context, fd_quic_streams_blocked_frame_t * data, - uchar const * p, - ulong p_sz) { - (void)context; + uchar const * p, + ulong p_sz ) { (void)data; (void)p; (void)p_sz; + + fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + /* TODO STREAMS_BLOCKED should be sent by client when it wants to use a new stream, but is unable to due to the max_streams value @@ -6082,32 +6204,43 @@ fd_quic_frame_handle_streams_blocked_frame( static ulong fd_quic_frame_handle_new_conn_id_frame( - void * context, + void * vp_context, fd_quic_new_conn_id_frame_t * data, - uchar const * p, - ulong p_sz) { - (void)context; + uchar const * p, + ulong p_sz ) { (void)data; (void)p; (void)p_sz; - FD_DEBUG( FD_LOG_DEBUG(( "new_conn_id requested" )) ); + fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + + + DEBUG( FD_LOG_DEBUG(( "new_conn_id requested" )); ) return 0; } static ulong fd_quic_frame_handle_retire_conn_id_frame( - void * context, - fd_quic_retire_conn_id_frame_t * data, - uchar const * p, - ulong p_sz) { - (void)context; + void * vp_context, + fd_quic_retire_conn_id_frame_t * data, + uchar const * p, + ulong p_sz ) { + (void)vp_context; (void)data; (void)p; (void)p_sz; FD_DEBUG( printf( "%s:%d retire_conn_id requested\n", __func__, (int)(__LINE__) ); fflush( stdout ); ) + + // fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; + + // /* ack-eliciting */ + // context.pkt->ack_flag |= ACK_FLAG_RQD; + return FD_QUIC_PARSE_FAIL; } @@ -6206,15 +6339,31 @@ fd_quic_frame_handle_handshake_done_frame( fd_quic_frame_context_t context = *(fd_quic_frame_context_t*)vp_context; fd_quic_conn_t * conn = context.conn; + /* ack-eliciting */ + context.pkt->ack_flag |= ACK_FLAG_RQD; + /* servers must treat receipt of HANDSHAKE_DONE as a protocol violation */ if( FD_UNLIKELY( conn->server ) ) { fd_quic_conn_error( conn, FD_QUIC_CONN_REASON_PROTOCOL_VIOLATION ); return FD_QUIC_PARSE_FAIL; } - /* either we treat this as a fatal error, or just warn - if we don't tear down the connection we must move to ACTIVE */ if( FD_UNLIKELY( conn->state != FD_QUIC_CONN_STATE_HANDSHAKE_COMPLETE ) ) { + switch( conn->state ) { + case FD_QUIC_CONN_STATE_PEER_CLOSE: + case FD_QUIC_CONN_STATE_ABORT: + case FD_QUIC_CONN_STATE_CLOSE_PENDING: + case FD_QUIC_CONN_STATE_DEAD: + /* connection closing... nothing to do */ + return 0; + + case FD_QUIC_CONN_STATE_ACTIVE: + /* already active - probably received out of order */ + return 0; + } + + /* either we treat this as a fatal error, or just warn + if we don't tear down the connection we must move to ACTIVE */ FD_LOG_WARNING(( "%s : handshake done frame received, but not in handshake complete state", __func__ )); } diff --git a/src/tango/quic/fd_quic_conn.h b/src/tango/quic/fd_quic_conn.h index 84a5328ddd..0d8b2cbbcb 100644 --- a/src/tango/quic/fd_quic_conn.h +++ b/src/tango/quic/fd_quic_conn.h @@ -7,6 +7,7 @@ #include "crypto/fd_quic_crypto_suites.h" #include "templ/fd_quic_transport_params.h" #include "fd_quic_pkt_meta.h" +#include "templ/fd_quic_union.h" #define FD_QUIC_CONN_STATE_INVALID 0 /* dead object / freed */ #define FD_QUIC_CONN_STATE_HANDSHAKE 1 /* currently doing handshaking with peer */ @@ -87,6 +88,11 @@ struct fd_quic_conn { /* we can have multiple connection ids */ fd_quic_conn_id_t our_conn_id[ FD_QUIC_MAX_CONN_ID_PER_CONN ]; + /* Save original destination connection id + This will be used when we receive a retransmitted initial packet + Also used when retransmitting the first initial packet */ + fd_quic_conn_id_t orig_dst_conn_id; + /* Host network endpoint - for server, just a copy of config->net - for client, an allocated ephemeral UDP port */ @@ -159,7 +165,7 @@ struct fd_quic_conn { uchar crypt_scratch[2048]; /* some scratch space for frame encoding/decoding */ - uchar frame_scratch[2048]; + fd_quic_frame_u frame_union; /* buffer to send next */ /* rename tx_buf, since it's easy to confuse with stream->tx_buf */ diff --git a/src/tango/quic/fd_quic_proto.h b/src/tango/quic/fd_quic_proto.h index b6f6fe5590..9cd628897b 100644 --- a/src/tango/quic/fd_quic_proto.h +++ b/src/tango/quic/fd_quic_proto.h @@ -1,6 +1,8 @@ #ifndef HEADER_fd_src_tango_quic_fd_quic_proto_h #define HEADER_fd_src_tango_quic_fd_quic_proto_h +#include "fd_quic_proto_structs.h" + #include "fd_quic_common.h" #include "fd_quic_types.h" diff --git a/src/tango/quic/fd_quic_proto_structs.h b/src/tango/quic/fd_quic_proto_structs.h new file mode 100644 index 0000000000..2a4295bae0 --- /dev/null +++ b/src/tango/quic/fd_quic_proto_structs.h @@ -0,0 +1,16 @@ +#ifndef HEADER_fd_src_tango_quic_fd_quic_proto_structs_h +#define HEADER_fd_src_tango_quic_fd_quic_proto_structs_h + +#include "fd_quic_common.h" +#include "fd_quic_types.h" + +#include "templ/fd_quic_defs.h" +#include "templ/fd_quic_templ.h" +#include "templ/fd_quic_frames_templ.h" +#include "templ/fd_quic_ipv4.h" +#include "templ/fd_quic_udp.h" +#include "templ/fd_quic_eth.h" +#include "templ/fd_quic_undefs.h" + +#endif /* HEADER_fd_src_tango_quic_fd_quic_proto_structs_h */ + diff --git a/src/tango/quic/templ/fd_quic_encoders.h b/src/tango/quic/templ/fd_quic_encoders.h index 95de87c991..70461b32ba 100644 --- a/src/tango/quic/templ/fd_quic_encoders.h +++ b/src/tango/quic/templ/fd_quic_encoders.h @@ -73,7 +73,10 @@ #define FD_TEMPL_MBR_ELEM_PKTNUM(NAME,TYPE) \ buf += (cur_bit != 0); \ cur_bit = 0; \ - if( buf >= buf_end ) return FD_QUIC_PARSE_FAIL; \ + if( (long)( frame->NAME##_bits + cur_bit ) \ + > (long)( ( buf_end - buf ) * 8 ) ) { \ + return FD_QUIC_PARSE_FAIL; \ + } \ frame->NAME##_pnoff = (unsigned)( buf - orig_buf ); \ if( fd_quic_encode_bits( buf, cur_bit, frame->NAME, \ frame->NAME##_bits ) ) { \ diff --git a/src/tango/quic/templ/fd_quic_encoders_footprint.h b/src/tango/quic/templ/fd_quic_encoders_footprint.h index 9fe21c9ae3..87a2ec078d 100644 --- a/src/tango/quic/templ/fd_quic_encoders_footprint.h +++ b/src/tango/quic/templ/fd_quic_encoders_footprint.h @@ -32,7 +32,7 @@ #define FD_TEMPL_MBR_ELEM_PKTNUM(NAME,TYPE) \ buf += (cur_bit != 0); \ cur_bit = 0; \ - buf += (ulong)frame->NAME##_bits >> (ulong)3u; + buf += (ulong)(frame->NAME##_bits+7u) >> (ulong)3u; /* determines the encoding footprint of the VARINT */ diff --git a/src/tango/quic/templ/fd_quic_parse_frame.h b/src/tango/quic/templ/fd_quic_parse_frame.h index f90116bfe5..f3cc2eb0c8 100644 --- a/src/tango/quic/templ/fd_quic_parse_frame.h +++ b/src/tango/quic/templ/fd_quic_parse_frame.h @@ -34,7 +34,7 @@ #define FD_TEMPL_DEF_STRUCT_BEGIN(NAME) \ do { \ - fd_quic_##NAME##_t * data = (fd_quic_##NAME##_t*)scratch; + fd_quic_##NAME##_t * data = &frame_union->NAME; #define FD_TEMPL_MBR_FRAME_TYPE(NAME,ID_LO,ID_HI) \ id_lo = ID_LO; \ diff --git a/src/tango/quic/templ/fd_quic_union.h b/src/tango/quic/templ/fd_quic_union.h index 36a5887ea4..112a1cafb7 100644 --- a/src/tango/quic/templ/fd_quic_union.h +++ b/src/tango/quic/templ/fd_quic_union.h @@ -3,6 +3,8 @@ #include "../../../util/fd_util_base.h" +#include "../fd_quic_proto_structs.h" + /* define a union of all the frame structures */ union fd_quic_frame_union { #include "fd_quic_union_decl.h" diff --git a/src/tango/quic/tests/Local.mk b/src/tango/quic/tests/Local.mk index 263f9cc7fa..158a897109 100644 --- a/src/tango/quic/tests/Local.mk +++ b/src/tango/quic/tests/Local.mk @@ -5,6 +5,7 @@ $(call add-objs,fd_quic_stream_spam fd_quic_test_helpers,fd_quic) $(call make-unit-test,test_quic_hs,test_quic_hs,fd_aio fd_quic fd_ballet fd_tango fd_util) $(call make-unit-test,test_quic_streams,test_quic_streams,fd_aio fd_ballet fd_tango fd_quic fd_util) $(call make-unit-test,test_quic_conn,test_quic_conn,fd_aio fd_quic fd_ballet fd_tango fd_util) +$(call make-unit-test,test_quic_drops,test_quic_drops,fd_aio fd_quic fd_ballet fd_tango fd_util fd_fibre) $(call make-unit-test,test_quic_server,test_quic_server,fd_aio fd_ballet fd_quic fd_tango fd_util) $(call make-unit-test,test_quic_client_flood,test_quic_client_flood,fd_aio fd_quic fd_ballet fd_tango fd_util) $(call make-unit-test,test_quic_txn,test_quic_txn,fd_aio fd_quic fd_ballet fd_tango fd_util) diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c new file mode 100644 index 0000000000..db85aea46e --- /dev/null +++ b/src/tango/quic/tests/test_quic_drops.c @@ -0,0 +1,501 @@ +#include "../fd_quic.h" +#include "fd_quic_test_helpers.h" +#include "../../../util/rng/fd_rng.h" +#include "../../../util/net/fd_pcapng.h" + +#include "../../../util/fibre/fd_fibre.h" + +#include + +/* number of streams to send/receive */ +#define NUM_STREAMS 100 + +/* done flags */ + +int client_done = 0; +int server_done = 0; + +/* received count */ +ulong rcvd = 0; +ulong tot_rcvd = 0; + +/* some randomness stuff */ + +typedef float rng_t; + +rng_t rnd() { + static uint seed = 0; + + ulong l = fd_rng_private_expand( seed++ ); + return (rng_t)l * (rng_t)0x1p-64; +} + +fd_fibre_t * client_fibre = NULL; +fd_fibre_t * server_fibre = NULL; + +/* man-in-the-middle for testing drops */ + +struct mitm_ctx { + fd_aio_t local; + fd_aio_t const * dst; + fd_aio_t const * pcap; + rng_t thresh; + int server; +}; +typedef struct mitm_ctx mitm_ctx_t; + +static int +mitm_tx( void * ctx, + fd_aio_pkt_info_t const * batch, + ulong batch_cnt, + ulong * opt_batch_idx, + int flush ) { + mitm_ctx_t * mitm_ctx = (mitm_ctx_t*)ctx; + + /* each time data transfers, the schedule might change + so wake the other fibre */ + if( client_fibre && mitm_ctx->server ) fd_fibre_wake( client_fibre ); + if( server_fibre && !mitm_ctx->server ) fd_fibre_wake( server_fibre ); + + /* write to pcap even if dropping */ + if( mitm_ctx->pcap ) { + fd_aio_send( mitm_ctx->pcap, batch, batch_cnt, opt_batch_idx, 1 ); + } + + /* generate a random number and compare with threshold, and either pass thru or drop */ + + if( rnd() < mitm_ctx->thresh ) { + /* dropping behaves as-if the send was successful */ + printf( "dropped! server=%d\n", mitm_ctx->server ); + return FD_AIO_SUCCESS; + } else { + printf( "passed thru! server=%d\n", mitm_ctx->server ); + return fd_aio_send( mitm_ctx->dst, batch, batch_cnt, opt_batch_idx, flush ); + } +} + +static void +mitm_link( fd_quic_t * quic_a, fd_quic_t * quic_b, mitm_ctx_t * mitm, fd_aio_t const * pcap ) { + fd_aio_t const * rx_b = fd_quic_get_aio_net_rx( quic_b ); + + /* create a new aio for mitm */ + + FD_TEST( fd_aio_join( fd_aio_new( &mitm->local, mitm, mitm_tx ) ) ); + + mitm->dst = rx_b; + mitm->pcap = pcap; + + fd_quic_set_aio_net_tx( quic_a, &mitm->local ); +} + +static void +mitm_set_thresh( mitm_ctx_t * mitm_ctx, rng_t thresh ) { + mitm_ctx->thresh = thresh; +} + +static void +mitm_set_server( mitm_ctx_t * mitm_ctx, int server ) { + mitm_ctx->server = server; +} + + +fd_aio_pcapng_t pcap_client_to_server; +fd_aio_pcapng_t pcap_server_to_client; + +static void +my_tls_keylog( void * quic_ctx, + char const * line ) { + (void)quic_ctx; + fd_pcapng_fwrite_tls_key_log( (uchar const *)line, (uint)strlen( line ), pcap_server_to_client.pcapng ); +} + + +int state = 0; +int server_complete = 0; +int client_complete = 0; + +extern uchar pkt_full[]; +extern ulong pkt_full_sz; + +uchar fail = 0; + +void +my_stream_notify_cb( fd_quic_stream_t * stream, void * ctx, int type ) { + (void)stream; + (void)ctx; + (void)type; +} + +void +my_stream_receive_cb( fd_quic_stream_t * stream, + void * ctx, + uchar const * data, + ulong data_sz, + ulong offset, + int fin ) { + (void)ctx; + (void)stream; + (void)fin; + + FD_LOG_NOTICE(( "received data from peer. stream_id: %lu size: %lu offset: %lu\n", + (ulong)stream->stream_id, data_sz, offset )); + FD_LOG_HEXDUMP_DEBUG(( "received data", data, data_sz )); + + FD_LOG_DEBUG(( "recv ok" )); + + rcvd++; + tot_rcvd++; + + if( tot_rcvd == NUM_STREAMS ) client_done = 1; +} + + +struct my_context { + int server; +}; +typedef struct my_context my_context_t; + +void +my_cb_conn_final( fd_quic_conn_t * conn, + void * context ) { + (void)context; + + fd_quic_conn_t ** ppconn = (fd_quic_conn_t**)fd_quic_conn_get_context( conn ); + if( ppconn ) { + FD_LOG_NOTICE(( "my_cb_conn_final %p SUCCESS", (void*)*ppconn )); + *ppconn = NULL; + }} + +void +my_connection_new( fd_quic_conn_t * conn, + void * vp_context ) { + (void)vp_context; + + FD_LOG_NOTICE(( "server handshake complete" )); + + server_complete = 1; + + (void)conn; +} + +void +my_handshake_complete( fd_quic_conn_t * conn, + void * vp_context ) { + (void)vp_context; + + FD_LOG_NOTICE(( "client handshake complete" )); + + client_complete = 1; + + (void)conn; +} + +/* global "clock" */ +ulong now = (ulong)1e18; + +ulong test_clock( void * ctx ) { + (void)ctx; + return now; +} + +long +test_fibre_clock(void) { + return (long)now; +} + + +struct client_args { + fd_quic_t * quic; + fd_quic_t * server_quic; +}; +typedef struct client_args client_args_t; + +void +client_fibre_fn( void * vp_arg ) { + client_args_t * args = (client_args_t*)vp_arg; + + fd_quic_t * quic = args->quic; + fd_quic_t * server_quic = args->server_quic; + + fd_quic_conn_t * conn = NULL; + fd_quic_stream_t * stream = NULL; + + uchar buf[] = "Hello World!"; + fd_aio_pkt_info_t batch[1] = {{ .buf = buf, .buf_sz = sizeof( buf ) }}; + + ulong period_ns = (ulong)1e6; + ulong next_send = now + period_ns; + ulong sent = 0; + + while( !client_done ) { + ulong next_wakeup = fd_quic_get_next_wakeup( quic ); + + /* wake up at either next service or next send, whichever is sooner */ + fd_fibre_wait_until( (long)fd_ulong_min( next_wakeup, next_send ) ); + + fd_quic_service( quic ); + + if( !conn ) { + rcvd = sent = 0; + + conn = fd_quic_connect( quic, + server_quic->config.net.ip_addr, + server_quic->config.net.listen_udp_port, + server_quic->config.sni ); + + if( !conn ) { + FD_LOG_WARNING(( "Client unable to obtain a connection" )); + continue; + } + + fd_quic_conn_set_context( conn, &conn ); + + /* wait for connection handshake */ + while( conn && conn->state != FD_QUIC_CONN_STATE_ACTIVE ) { + /* service client */ + fd_quic_service( quic ); + + /* allow server to process */ + fd_fibre_wait_until( (long)fd_quic_get_next_wakeup( quic ) ); + } + + continue; + } + + if( !stream ) { + if( rcvd != sent ) { + fd_quic_service( quic ); + fd_fibre_wait_until( (long)fd_quic_get_next_wakeup( quic ) ); + + continue; + } + + stream = fd_quic_conn_new_stream( conn, FD_QUIC_TYPE_UNIDIR ); + + if( !stream ) { + if( conn->state == FD_QUIC_CONN_STATE_ACTIVE ) { + FD_LOG_WARNING(( "Client unable to obtain a stream" )); + } + continue; + } + } + + /* set next send time */ + next_send = now + period_ns; + + /* have a stream, so send */ + int rc = fd_quic_stream_send( stream, batch, 1 /* batch_sz */, 1 /* fin */ ); + + if( rc == 1 ) { + /* successful - stream will begin closing */ + + if( ++sent % 15 == 0 ) { + fd_quic_conn_close( conn, 0 ); + sent = 0; + + /* wait for connection to be reaped + (it's set to NULL in final callback */ + while( conn ) { + fd_quic_service( quic ); + + /* allow server to process */ + fd_fibre_wait_until( (long)fd_quic_get_next_wakeup( quic ) ); + } + + stream = NULL; + + continue; + } + + /* ensure new stream used for next send */ + stream = fd_quic_conn_new_stream( conn, FD_QUIC_TYPE_UNIDIR ); + + /* TODO close logic */ + + } else { + FD_LOG_WARNING(( "send failed" )); + } + } + + if( conn ) { + fd_quic_conn_close( conn, 0 ); + + /* keep servicing until connection closed */ + while( conn ) { + fd_quic_service( quic ); + fd_fibre_yield(); + } + } + + /* tell the server to shutdown */ + server_done = 1; +} + + +struct server_args { + fd_quic_t * quic; +}; +typedef struct server_args server_args_t; + + +void +server_fibre_fn( void * vp_arg ) { + server_args_t * args = (server_args_t*)vp_arg; + + fd_quic_t * quic = args->quic; + + /* wake up at least every 1ms */ + ulong period_ns = (ulong)1e6; + while( !server_done ) { + fd_quic_service( quic ); + + ulong next_wakeup = fd_quic_get_next_wakeup( quic ); + ulong next_period = now + period_ns; + + fd_fibre_wait_until( (long)fd_ulong_min( next_wakeup, next_period ) ); + } +} + + +int +main( int argc, char ** argv ) { + + fd_boot ( &argc, &argv ); + fd_quic_test_boot( &argc, &argv ); + + ulong cpu_idx = fd_tile_cpu_id( fd_tile_idx() ); + if( cpu_idx>fd_shmem_cpu_cnt() ) cpu_idx = 0UL; + + char const * _page_sz = fd_env_strip_cmdline_cstr ( &argc, &argv, "--page-sz", NULL, "gigantic" ); + ulong page_cnt = fd_env_strip_cmdline_ulong( &argc, &argv, "--page-cnt", NULL, 2UL ); + ulong numa_idx = fd_env_strip_cmdline_ulong( &argc, &argv, "--numa-idx", NULL, fd_shmem_numa_idx( cpu_idx ) ); + + ulong page_sz = fd_cstr_to_shmem_page_sz( _page_sz ); + if( FD_UNLIKELY( !page_sz ) ) FD_LOG_ERR(( "unsupported --page-sz" )); + + FD_LOG_NOTICE(( "Creating workspace (--page-cnt %lu, --page-sz %s, --numa-idx %lu)", page_cnt, _page_sz, numa_idx )); + fd_wksp_t * wksp = fd_wksp_new_anonymous( page_sz, page_cnt, fd_shmem_cpu_idx( numa_idx ), "wksp", 0UL ); + FD_TEST( wksp ); + + fd_quic_limits_t const quic_limits = { + .conn_cnt = 10, + .conn_id_cnt = 10, + .conn_id_sparsity = 4.0, + .handshake_cnt = 10, + .stream_cnt = { 0, 0, 10, 0 }, + .inflight_pkt_cnt = 1024, + .tx_buf_sz = 1<<14, + .rx_buf_sz = 1<<14 + }; + + ulong quic_footprint = fd_quic_footprint( &quic_limits ); + FD_TEST( quic_footprint ); + FD_LOG_NOTICE(( "QUIC footprint: %lu bytes", quic_footprint )); + + FD_LOG_NOTICE(( "Creating server QUIC" )); + fd_quic_t * server_quic = fd_quic_new_anonymous( wksp, &quic_limits, FD_QUIC_ROLE_SERVER ); + FD_TEST( server_quic ); + + FD_LOG_NOTICE(( "Creating client QUIC" )); + fd_quic_t * client_quic = fd_quic_new_anonymous( wksp, &quic_limits, FD_QUIC_ROLE_CLIENT ); + FD_TEST( client_quic ); + + fd_quic_config_t * client_config = &client_quic->config; + client_config->idle_timeout = 5e9; + + client_quic->cb.conn_hs_complete = my_handshake_complete; + client_quic->cb.stream_receive = my_stream_receive_cb; + client_quic->cb.stream_notify = my_stream_notify_cb; + client_quic->cb.conn_final = my_cb_conn_final; + + client_quic->cb.now = test_clock; + client_quic->cb.now_ctx = NULL; + + fd_quic_config_t * server_config = &server_quic->config; + server_config->idle_timeout = 5e9; + + server_quic->cb.conn_new = my_connection_new; + server_quic->cb.stream_receive = my_stream_receive_cb; + server_quic->cb.stream_notify = my_stream_notify_cb; + server_quic->cb.conn_final = my_cb_conn_final; + server_quic->cb.tls_keylog = my_tls_keylog; + + server_quic->cb.now = test_clock; + server_quic->cb.now_ctx = NULL; + + /* pcap */ + FILE * pcap_file = fopen( "test_quic_drops.pcapng", "wb" ); + FD_TEST( pcap_file ); + printf( "pcap_file: %p\n", (void*)pcap_file ); fflush( stdout ); + + FD_TEST( 1UL == fd_aio_pcapng_start( pcap_file ) ); + fflush( pcap_file ); + + FD_TEST( fd_aio_pcapng_join( &pcap_client_to_server, NULL, pcap_file ) ); + FD_TEST( fd_aio_pcapng_join( &pcap_server_to_client, NULL, pcap_file ) ); + + FD_LOG_NOTICE(( "Attaching AIOs" )); + mitm_ctx_t mitm_client_to_server; + mitm_ctx_t mitm_server_to_client; + + mitm_link( client_quic, server_quic, &mitm_client_to_server, fd_aio_pcapng_get_aio( &pcap_client_to_server ) ); + mitm_link( server_quic, client_quic, &mitm_server_to_client, fd_aio_pcapng_get_aio( &pcap_server_to_client ) ); + + mitm_set_thresh( &mitm_client_to_server, 0.30f ); + mitm_set_thresh( &mitm_server_to_client, 0.30f ); + + mitm_set_server( &mitm_client_to_server, 0 ); + mitm_set_server( &mitm_server_to_client, 1 ); + + FD_LOG_NOTICE(( "Initializing QUICs" )); + FD_TEST( fd_quic_init( client_quic ) ); + FD_TEST( fd_quic_init( server_quic ) ); + + /* initialize fibres */ + void * this_fibre_mem = fd_wksp_alloc_laddr( wksp, fd_fibre_init_align(), fd_fibre_init_footprint( ), 1UL ); + fd_fibre_t * this_fibre = fd_fibre_init( this_fibre_mem ); (void)this_fibre; + + /* set fibre scheduler clock */ + fd_fibre_set_clock( test_fibre_clock ); + + /* create fibres for client and server */ + ulong stack_sz = 1<<20; + void * client_mem = fd_wksp_alloc_laddr( wksp, fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ), 1UL ); + client_args_t client_args[1] = {{ .quic = client_quic, .server_quic = server_quic }}; + client_fibre = fd_fibre_start( client_mem, stack_sz, client_fibre_fn, client_args ); + FD_TEST( client_fibre ); + + void * server_mem = fd_wksp_alloc_laddr( wksp, fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ), 1UL ); + server_args_t server_args[1] = {{ .quic = server_quic }}; + server_fibre = fd_fibre_start( server_mem, stack_sz, server_fibre_fn, server_args ); + FD_TEST( server_fibre ); + + /* schedule the fibres + they will execute during the call to fibre_schedule_run */ + fd_fibre_schedule( client_fibre ); + fd_fibre_schedule( server_fibre ); + + /* run the fibres until done */ + while(1) { + long timeout = fd_fibre_schedule_run(); + if( timeout < 0 ) break; + + now = (ulong)timeout; + } + + FD_TEST( fd_aio_pcapng_leave( &pcap_client_to_server ) ); + FD_TEST( fd_aio_pcapng_leave( &pcap_server_to_client ) ); + + FD_LOG_NOTICE(( "Cleaning up" )); + //fd_quic_virtual_pair_fini( &vp ); + // TODO clean up mitm_ctx and aio + fd_wksp_free_laddr( fd_quic_delete( fd_quic_leave( server_quic ) ) ); + fd_wksp_free_laddr( fd_quic_delete( fd_quic_leave( client_quic ) ) ); + fd_wksp_delete_anonymous( wksp ); + + FD_LOG_NOTICE(( "pass" )); + fd_quic_test_halt(); + fd_halt(); + return 0; +} + diff --git a/src/util/fibre/Local.mk b/src/util/fibre/Local.mk new file mode 100644 index 0000000000..da842c14d7 --- /dev/null +++ b/src/util/fibre/Local.mk @@ -0,0 +1,5 @@ +$(call make-lib,fd_fibre) +$(call add-objs,fd_fibre,fd_fibre) + +$(call make-unit-test,test_fibre,test_fibre,fd_fibre fd_util) + diff --git a/src/util/fibre/fd_fibre.c b/src/util/fibre/fd_fibre.c new file mode 100644 index 0000000000..1ad9586b9a --- /dev/null +++ b/src/util/fibre/fd_fibre.c @@ -0,0 +1,295 @@ +#include "fd_fibre.h" + +#include +#include +#include +#include + +fd_fibre_t * fd_fibre_current = NULL; + + +/* top level function + simply calls the user function then sets the done flag */ +void +fd_fibre_run_fn( void * vp ) { + fd_fibre_t * fibre = (fd_fibre_t*)vp; + + /* call user function */ + fibre->fn( fibre->arg ); + + /* set done flag */ + fibre->done = 1; +} + +/* footprint and alignment required for fd_fibre_init */ +ulong +fd_fibre_init_footprint( void ) { + /* size should be a multiple of the alignment */ + return fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN ); +} + +ulong +fd_fibre_init_align( void ) { + return FD_FIBRE_ALIGN; +} + + +/* initialize main fibre */ +fd_fibre_t * +fd_fibre_init( void * mem ) { + fd_fibre_t * fibre = (fd_fibre_t*)mem; + + memset( fibre, 0, sizeof( *fibre ) ); + + fibre->stack = NULL; + fibre->stack_sz = 0; + + ucontext_t * ctx = &fibre->ctx; + + if( getcontext( ctx ) == -1 ) { + fprintf( stderr, "getcontext failed with %d %s\n", errno, strerror( errno ) ); + fflush( stderr ); + fd_fibre_abort(); + } + + fd_fibre_current = fibre; + + return fibre; +} + + +/* footprint and alignment required for fd_fibre_start */ +ulong +fd_fibre_start_footprint( ulong stack_size ) { + return fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN ) + + fd_ulong_align_up( stack_size, FD_FIBRE_ALIGN ); +} + +ulong fd_fibre_start_align( void ) { + return FD_FIBRE_ALIGN; +} + + +/* start a fibre */ + +/* this uses get/setcontext to start a new fibre + the current fibre will continue running, and the new one will be + inactive, and ready to switch to + this is cooperative threading + this fibre may be started on another thread */ +fd_fibre_t * +fd_fibre_start( void * mem, ulong stack_sz, fd_fibre_fn_t fn, void * arg ) { + if( fd_fibre_current == NULL ) { + fprintf( stderr, "fd_fibre_init must be called before fd_fibre_start\n" ); + fflush( stderr ); + fd_fibre_abort(); + } + + ulong l_mem = (ulong)mem; + + void * stack = (void*)( l_mem + + fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN ) ); + + fd_fibre_t * fibre = (fd_fibre_t*)mem; + + memset( fibre, 0, sizeof( *fibre ) ); + + /* set the current value of stack and stack_sz */ + fibre->stack_sz = stack_sz; + fibre->stack = stack; + + fibre->fn = fn; + fibre->arg = arg; + + /* start with the current fibre */ + memcpy( &fibre->ctx, &fd_fibre_current->ctx, sizeof( fibre->ctx ) ); + + /* set the successor context, for use in the event the fibre terminates */ + fibre->ctx.uc_link = &fd_fibre_current->ctx; + + /* set the stack for the new fibre */ + fibre->ctx.uc_stack.ss_sp = stack; + fibre->ctx.uc_stack.ss_size = stack_sz; + + /* make a new context */ + makecontext( &fibre->ctx, (void(*)())fd_fibre_run_fn, 1, fibre ); + + return fibre; +} + +/* free a fibre + + this frees up the resources of a fibre */ +void +fd_fibre_free( fd_fibre_t * fibre ) { + free( fibre->stack ); + free( fibre ); +} + +/* switch execution to a fibre + + switches execution to "swap_to" + "swap_to" must have been created with either fd_fibre_init, or fd_fibre_start */ +void +fd_fibre_swap( fd_fibre_t * swap_to ) { + if( swap_to == fd_fibre_current ) { + return; + } + + if( swap_to->done ) return; + + /* set the context to return to as the current context */ + swap_to->ctx.uc_link = &fd_fibre_current->ctx; + + /* store current fibre for popping */ + fd_fibre_t * fibre_pop = fd_fibre_current; + + /* set fd_fibre_current for next execution context */ + fd_fibre_current = swap_to; + + /* switch to new fibre */ + if( swapcontext( &fibre_pop->ctx, &swap_to->ctx ) == -1 ) { + fprintf( stderr, "swapcontext failed with %d %s\n", errno, strerror( errno ) ); + fflush( stdout ); + fd_fibre_abort(); + } + + /* return value of fibre to its previous value */ + fd_fibre_current = fibre_pop; +} + + +/* set a clock for scheduler */ +long (*fd_fibre_clock)(void); + +/* fibre for scheduler */ +fd_fibre_t * fd_fibre_scheduler = NULL; + +void +fd_fibre_set_clock( long (*clock)(void) ) { + fd_fibre_clock = clock; +} + +/* yield current fibre + allows another fibre to run */ +void +fd_fibre_yield( void ) { + /* same as yield */ + fd_fibre_wait(0); +} + +/* stops running currently executing fibre for a period */ +void +fd_fibre_wait( long wait_ns ) { + /* cannot wait if no scheduler */ + if( fd_fibre_scheduler == NULL ) return; + + /* calc wake time */ + long wake = fd_fibre_clock() + ( wait_ns < 1 ? 1 : wait_ns ); + + fd_fibre_current->sched_time = wake; + + fd_fibre_schedule( fd_fibre_current ); + + /* switch to the fibre scheduler */ + fd_fibre_swap( fd_fibre_scheduler ); +} + +/* stops running currently executing fibre until a particular + time */ +void +fd_fibre_wait_until( long resume_time_ns ) { + long now = fd_fibre_clock(); + if( resume_time_ns <= now ) { + /* ensure that another fibre gets a chance at some point */ + resume_time_ns = now + 1; + } + + /* cannot wait if no scheduler */ + if( fd_fibre_scheduler == NULL ) return; + + fd_fibre_current->sched_time = resume_time_ns; + + fd_fibre_schedule( fd_fibre_current ); + + /* switch to the fibre scheduler */ + fd_fibre_swap( fd_fibre_scheduler ); +} + +/* wakes another fibre */ +void +fd_fibre_wake( fd_fibre_t * fibre ) { + if( fd_fibre_current == fibre ) return; + + fibre->sched_time = fd_fibre_clock(); + fd_fibre_schedule( fibre ); +} + +/* sentinel for run queue */ +fd_fibre_t fd_fibre_schedule_queue[1] = {{ .sentinel = 1, .next = fd_fibre_schedule_queue }}; + +/* add a fibre to the schedule */ +void +fd_fibre_schedule( fd_fibre_t * fibre ) { + if( fd_fibre_clock == NULL ) fd_fibre_abort(); + + fd_fibre_t * cur_fibre = fd_fibre_schedule_queue; + + /* remove from schedule */ + while(1) { + if( cur_fibre->next == fibre ) { + cur_fibre->next = fibre->next; + } + + cur_fibre = cur_fibre->next; + if( cur_fibre->sentinel ) break; + } + + /* add into schedule at appropriate place for wake time */ + fd_fibre_t * prior = fd_fibre_schedule_queue; + long wake = fibre->sched_time; + + cur_fibre = prior->next; + while( !cur_fibre->sentinel && wake > cur_fibre->sched_time ) { + prior = cur_fibre; + cur_fibre = cur_fibre->next; + } + + /* insert into schedule */ + fibre->next = cur_fibre; + prior->next = fibre; +} + +/* run the current schedule + + returns the time of the next ready fibre + returns -1 if there are no fibres in the schedule */ +long +fd_fibre_schedule_run() { + /* set the currently running fibre as the scheduler */ + fd_fibre_scheduler = fd_fibre_current; + + while(1) { + fd_fibre_t * cur_fibre = fd_fibre_schedule_queue->next; + if( cur_fibre->sentinel ) return -1; + + long now = fd_fibre_clock(); + if( cur_fibre->sched_time > now ) { + /* nothing more to do yet */ + return cur_fibre->sched_time; + } + + /* remove from schedule */ + fd_fibre_schedule_queue->next = cur_fibre->next; + + /* if fibre done, skip execution */ + if( !cur_fibre->done ) { + fd_fibre_swap( cur_fibre ); + } + } + + return -1; +} + + + diff --git a/src/util/fibre/fd_fibre.h b/src/util/fibre/fd_fibre.h new file mode 100644 index 0000000000..72a210fa8f --- /dev/null +++ b/src/util/fibre/fd_fibre.h @@ -0,0 +1,152 @@ +#ifndef HEADER_fd_src_util_fibre_fd_fibre_h +#define HEADER_fd_src_util_fibre_fd_fibre_h + +#include + +#include "../fd_util.h" + +#define FD_FIBRE_ALIGN 128UL + +/* definition of the function to be called when starting a new fibre */ +typedef void (*fd_fibre_fn_t)( void * ); + +struct fd_fibre { + ucontext_t ctx; + void * stack; + size_t stack_sz; + fd_fibre_fn_t fn; + void * arg; + int done; + + /* schedule parameters */ + long sched_time; + struct fd_fibre * next; + int sentinel; +}; +typedef struct fd_fibre fd_fibre_t; + + +/* TODO make thread local */ +extern fd_fibre_t * fd_fibre_current; + + +FD_PROTOTYPES_BEGIN + + +/* footprint and alignment required for fd_fibre_init */ +ulong fd_fibre_init_footprint( void ); +ulong fd_fibre_init_align( void ); + + +/* initialize main fibre + + should be called before making any other fibre calls + + creates a new fibre from the current thread, and returns it + caller should keep the fibre for later freeing + + probably shouldn't run this twice on the same thread + + mem is the memory allocated for this object. Use fd_fibre_init{_align,_footprint} to + obtain the appropriate size and alignment requirements */ + +fd_fibre_t * +fd_fibre_init( void * ); + + +/* footprint and alignment required for fd_fibre_start */ +ulong fd_fibre_start_footprint( ulong stack_size ); +ulong fd_fibre_start_align( void ); + + +/* Start a fibre + + This uses get/setcontext to create a new fibre + + fd_fibre_init must be called once before calling this + + The current fibre will continue running, and the other will be + inactive, and ready to switch to + + This fibre may be started on this or another thread + + mem is the memory used for the fibre. Use fd_fibre_start{_align,_footprint} + to determine the size and alignment required for the memory + + stack_sz is the size of the stack required + + fn is the function entry point to call in the new fibre + arg is the value to pass to function fn */ +fd_fibre_t * +fd_fibre_start( void * mem, ulong stack_sz, fd_fibre_fn_t fn, void * arg ); + + +/* Free a fibre + + This frees up the resources of a fibre + + Only call on a fibre that is not currently running */ +void +fd_fibre_free( fd_fibre_t * fibre ); + + +/* switch execution to a fibre + + Switches execution to "swap_to" + The global variable `fd_fibre_current` is updated with the state + of the currently running fibre before switching */ +void +fd_fibre_swap( fd_fibre_t * swap_to ); + + +/* fd_fibre_abort is called when a fatal error occurrs */ +#ifndef fd_fibre_abort +# define fd_fibre_abort(...) abort( __VA_ARGS__ ) +#endif + + +/* set a clock for scheduler */ +void +fd_fibre_set_clock( long (*clock)(void) ); + + +/* yield current fibre + allows other fibres to execute */ +void +fd_fibre_yield( void ); + + +/* stops running currently executing fibre for a period of time */ +void +fd_fibre_wait( long wait_ns ); + + +/* stops running currently executing fibre until a particular + time */ +void +fd_fibre_wait_until( long resume_time_ns ); + + +/* wakes another fibre */ +void +fd_fibre_wake( fd_fibre_t * fibre ); + + +/* add a fibre to the schedule */ +void +fd_fibre_schedule( fd_fibre_t * fibre ); + + +/* run the current schedule + + returns + the time of the next ready fibre + -1 if there are no fibres in the schedule */ +long +fd_fibre_schedule_run( void ); + +FD_PROTOTYPES_END + + +#endif /* HEADER_fd_src_util_fibre_fd_fibre_h */ + diff --git a/src/util/fibre/test_fibre.c b/src/util/fibre/test_fibre.c new file mode 100644 index 0000000000..ee71143da5 --- /dev/null +++ b/src/util/fibre/test_fibre.c @@ -0,0 +1,159 @@ +#include +#include + +#include "fd_fibre.h" + + +void +fn1( void * vp ) { + (void)vp; + printf( "running fn1\n" ); fflush( stdout ); +} + + +void +fn2( void * vp ) { + (void)vp; + printf( "running fn2\n" ); fflush( stdout ); +} + + +void +fn3( void * vp ) { + (void)vp; + printf( "running fn3\n" ); fflush( stdout ); +} + + +/* tests of fd_fibre_wait and fd_fibre_wait_until */ + +/* need a synthetic clock */ +long now = 0; + +long +my_clock(void) { + return now; +} + + +// done flag for tests +int done = 0; + +void +test1( void * vp ) { + /* argument to test1 */ + long * arg = (long*)vp; + + /* fetch argument, which is period */ + long period = arg[0]; + + while( !done ) { + printf( "test1 arg(%ld) now: %ld\n", period, now ); fflush( stdout ); + + fd_fibre_wait( period ); + } +} + +void +test2( void * vp ) { + /* arguments */ + long * arg = (long*)vp; + + /* argument is "done" time */ + long done_time = arg[0]; + + /* this test simply waits until a particular time + and then sets a flag */ + + printf( "test2: waiting\n" ); fflush( stdout ); + fd_fibre_wait_until( done_time ); + + printf( "test2: finished waiting\n" ); fflush( stdout ); + done = 1; +} + + +int +main( int argc, char ** argv ) { + (void)argc; + (void)argv; + + // initialize fibres + void * main_fibre_mem = aligned_alloc( fd_fibre_init_align(), fd_fibre_init_footprint() ); + fd_fibre_t * main_fibre = fd_fibre_init( main_fibre_mem ); + + // create 3 fibres for functions fn1, fn2 and fn3 + ulong stack_sz = 1<<20; + void * fibre_1_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + void * fibre_2_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + void * fibre_3_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + + fd_fibre_t * fibre_1 = fd_fibre_start( fibre_1_mem, stack_sz, fn1, NULL ); + fd_fibre_t * fibre_2 = fd_fibre_start( fibre_2_mem, stack_sz, fn2, NULL ); + fd_fibre_t * fibre_3 = fd_fibre_start( fibre_3_mem, stack_sz, fn3, NULL ); + + // start each fibre, and allow to complete + fd_fibre_swap( fibre_1 ); + fd_fibre_swap( fibre_2 ); + fd_fibre_swap( fibre_3 ); + + fd_fibre_free( fibre_3 ); + fd_fibre_free( fibre_2 ); + fd_fibre_free( fibre_1 ); + + free( fibre_1_mem ); + free( fibre_2_mem ); + free( fibre_3_mem ); + + // now run test of wait and wait_until + + // needs a clock + fd_fibre_set_clock( my_clock ); + + // prepare some fibres + long t0_period = (long)1e9; + void * t0_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * t0 = fd_fibre_start( t0_mem, stack_sz, test1, &t0_period ); + + long t1_period = (long)3e9; + void * t1_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * t1 = fd_fibre_start( t1_mem, stack_sz, test1, &t1_period ); + + long t2_period = (long)5e9; + void * t2_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * t2 = fd_fibre_start( t2_mem, stack_sz, test1, &t2_period ); + + long t3_done_time = (long)60e9; + void * t3_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * t3 = fd_fibre_start( t3_mem, stack_sz, test2, &t3_done_time ); + + // add to schedule + fd_fibre_schedule( t0 ); + fd_fibre_schedule( t1 ); + fd_fibre_schedule( t2 ); + fd_fibre_schedule( t3 ); + + // run schedule until done + while( 1 ) { + long timeout = fd_fibre_schedule_run(); + if( timeout == -1 ) break; + + now = timeout; + } + + fd_fibre_free( t0 ); + fd_fibre_free( t1 ); + fd_fibre_free( t2 ); + fd_fibre_free( t3 ); + + free( t0_mem ); + free( t1_mem ); + free( t2_mem ); + free( t3_mem ); + + fd_fibre_free( main_fibre ); + free( main_fibre_mem ); + + return 0; +} + From 6e09c54af17576515e512483ba79561488039166 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Fri, 26 May 2023 13:28:23 -0500 Subject: [PATCH 10/23] added pipe to fibre for quic testing --- src/util/fibre/fd_fibre.c | 160 +++++++++++++++++++++++++++++++++++- src/util/fibre/fd_fibre.h | 69 ++++++++++++++++ src/util/fibre/test_fibre.c | 125 +++++++++++++++++++++++++++- 3 files changed, 349 insertions(+), 5 deletions(-) diff --git a/src/util/fibre/fd_fibre.c b/src/util/fibre/fd_fibre.c index 1ad9586b9a..be0270aab6 100644 --- a/src/util/fibre/fd_fibre.c +++ b/src/util/fibre/fd_fibre.c @@ -122,8 +122,8 @@ fd_fibre_start( void * mem, ulong stack_sz, fd_fibre_fn_t fn, void * arg ) { this frees up the resources of a fibre */ void fd_fibre_free( fd_fibre_t * fibre ) { - free( fibre->stack ); - free( fibre ); + /* nothing to do, as caller owns memory */ + (void)fibre; } /* switch execution to a fibre @@ -291,5 +291,161 @@ fd_fibre_schedule_run() { return -1; } +ulong +fd_fibre_pipe_align( void ) { + return alignof( fd_fibre_pipe_t ); +} + +ulong +fd_fibre_pipe_footprint( ulong entries ) { + return sizeof( fd_fibre_pipe_t ) + entries * sizeof( ulong ); +} + +fd_fibre_pipe_t * +fd_fibre_pipe_new( void * mem, ulong entries ) { + fd_fibre_pipe_t * pipe = (fd_fibre_pipe_t*)mem; + + ulong * entries_array = (ulong*)&pipe[1]; + + pipe->cap = entries; + pipe->head = 0UL; + pipe->tail = 0UL; + pipe->reader = NULL; + pipe->writer = NULL; + pipe->entries = entries_array; + + return pipe; +} + +int +fd_fibre_pipe_write( fd_fibre_pipe_t * pipe, ulong value, long timeout ) { + fd_fibre_t * prev_writer = pipe->writer; + + ulong used = 0; + ulong free = 0; + + long timeout_ts = fd_fibre_clock() + timeout; + + /* loop until either there is space for a new value to be + written, or until we time out */ + while(1) { + used = pipe->head - pipe->tail; + free = pipe->cap - used; + + /* if we have free space, break out of loop */ + if( free ) break; + + /* we have no free space within which to write, so wait */ + + /* update the writer to ourself */ + pipe->writer = fd_fibre_current; + + /* did we time out? */ + if( fd_fibre_clock() >= timeout_ts ) { + /* restore writer before returning */ + pipe->writer = prev_writer; + + /* return timeout */ + return 1; + } + + /* wait */ + + /* set current fibre as the writer */ + pipe->writer = fd_fibre_current; + + /* set wakeup time */ + fd_fibre_current->sched_time = timeout_ts; + fd_fibre_schedule( fd_fibre_current ); + + /* switch to the scheduler */ + fd_fibre_swap( fd_fibre_scheduler ); + } + + /* we have free space, so store the value */ + pipe->entries[pipe->head % pipe->cap] = value; + + /* increment the head */ + pipe->head++; + + /* wake up one waiting reader, if any */ + if( pipe->reader ) { + /* ensure we are scheduled */ + fd_fibre_current->sched_time = fd_fibre_clock();; + fd_fibre_schedule( fd_fibre_current ); + fd_fibre_swap( pipe->reader ); + } + + /* restore writer */ + pipe->writer = prev_writer; + + /* return successful write */ + return 0; +} + + +int +fd_fibre_pipe_read( fd_fibre_pipe_t * pipe, ulong *value, long timeout ) { + fd_fibre_t * prev_reader = pipe->reader; + + ulong used = 0; + + long timeout_ts = fd_fibre_clock() + timeout; + + /* loop until we have a value to be read, or until we time out */ + while(1) { + used = pipe->head - pipe->tail; + + /* is data available? */ + if( used ) break; + + /* no data available, so wait */ + + /* update the reader */ + pipe->reader = fd_fibre_current; + + /* did we time out? */ + if( fd_fibre_clock() >= timeout_ts ) { + /* restore the reader before returning */ + pipe->reader = prev_reader; + + /* return timeout */ + return 1; + } + + /* wait */ + + /* set current fibre as the reader */ + pipe->reader = fd_fibre_current; + + /* set wakeup time */ + fd_fibre_current->sched_time = timeout_ts; + fd_fibre_schedule( fd_fibre_current ); + + /* switch to the scheduler */ + fd_fibre_swap( fd_fibre_scheduler ); + } + + /* we have data to provide, so retrive it */ + *value = pipe->entries[pipe->tail % pipe->cap]; + + /* increment the tail */ + pipe->tail++; + + /* wake up one waiting writer, if any */ + if( pipe->writer ) { + /* ensure we are scheduled */ + fd_fibre_current->sched_time = fd_fibre_clock();; + fd_fibre_schedule( fd_fibre_current ); + + fd_fibre_swap( pipe->writer ); + } + + /* restore reader */ + pipe->reader = prev_reader; + + /* return success */ + return 0; +} diff --git a/src/util/fibre/fd_fibre.h b/src/util/fibre/fd_fibre.h index 72a210fa8f..9af7d5caef 100644 --- a/src/util/fibre/fd_fibre.h +++ b/src/util/fibre/fd_fibre.h @@ -26,6 +26,19 @@ struct fd_fibre { typedef struct fd_fibre fd_fibre_t; +struct fd_fibre_pipe { + ulong cap; /* capacity */ + ulong head; /* head index */ + ulong tail; /* tail index */ + + fd_fibre_t * writer; /* fibre that's currently waiting for a write, if any */ + fd_fibre_t * reader; /* fibre that's currently waiting for a read, if any */ + + ulong * entries; +}; +typedef struct fd_fibre_pipe fd_fibre_pipe_t; + + /* TODO make thread local */ extern fd_fibre_t * fd_fibre_current; @@ -145,6 +158,62 @@ fd_fibre_schedule( fd_fibre_t * fibre ); long fd_fibre_schedule_run( void ); + +/* fibre data structures */ + +/* pipe + + send data from one fibre to another + wakes receiving fibre on write */ + +/* pipe footprint and alignment */ + +ulong +fd_fibre_pipe_align( void ); + +ulong +fd_fibre_pipe_footprint( ulong entries ); + + +/* create a new pipe */ + +fd_fibre_pipe_t * +fd_fibre_pipe_new( void * mem, ulong entries ); + + +/* write a value into the pipe + + can block if there isn't any free space + timeout allows the blocking to terminate after a period of time + + pipe the pipe to write to + value the value to write + timeout the amount of time to wait for the write to complete + + returns 0 successful + 1 there was no space for the write operation */ + +int +fd_fibre_pipe_write( fd_fibre_pipe_t * pipe, ulong value, long timeout ); + + +/* read a value from the pipe + + read can block if there isn't any data in the pipe + + timeout allows the read to terminate without a result after + a period of time + + pipe the pipe to write to + value a pointer to the ulong to receive the value + timeout number of nanoseconds to wait for a value + + returns 0 successfully read a value from the pipe + 1 timed out without receiving data */ +int +fd_fibre_pipe_read( fd_fibre_pipe_t * pipe, ulong *value, long timeout ); + + FD_PROTOTYPES_END diff --git a/src/util/fibre/test_fibre.c b/src/util/fibre/test_fibre.c index ee71143da5..3da7b40ff0 100644 --- a/src/util/fibre/test_fibre.c +++ b/src/util/fibre/test_fibre.c @@ -72,6 +72,119 @@ test2( void * vp ) { done = 1; } +void +test_pipe_producer( void * vp ) { + /* arguments */ + fd_fibre_pipe_t * pipe = (fd_fibre_pipe_t*)vp; + + printf( "pipe test producer starting\n" ); fflush( stdout ); + + /* transmit at a rate of one message per millisecond + for one second */ + long run_period = (long)1e6; + long run_duration = (long)1e9; + long send_time = now + run_period; + long run_end = now + run_duration; + + ulong msg = 0; + while( now < run_end ) { + /* wait until sent time */ + fd_fibre_wait_until( send_time ); + + /* send msg to consumer */ + printf( "writing msg: %lu\n", msg ); fflush( stdout ); + int rtn = fd_fibre_pipe_write( pipe, msg, 0 ); + if( rtn ) { + printf( "write failed on msg: %lu\n", msg ); + exit(1); + } + + /* choose another send time */ + send_time += run_period; + + /* increment message */ + msg++; + } + + printf( "producer finished\n" ); +} + + +void test_pipe_consumer( void * vp ) { + /* arguments */ + fd_fibre_pipe_t * pipe = (fd_fibre_pipe_t*)vp; + + printf( "pipe test consumer starting\n" ); fflush( stdout ); + + long run_period = (long)1e6; + long run_duration = (long)1e9; + long run_end = now + run_duration; + + /* wait to receive from pipe, and report each message received */ + while( now < run_end ) { + /* receive message from producer + wait for up to the period */ + ulong msg = 0; + int rtn = fd_fibre_pipe_read( pipe, &msg, run_period ); + + if( rtn ) { + printf( "read failed\n" ); + exit(1); + } + + printf( "msg %lu received at %ld\n", msg, now ); + } + + printf( "consumer finished\n" ); +} + + +void +run_pipe_test( void ) { + printf( "pipe test starting\n" ); + + /* set now to zero for pretty output */ + now = 0; + + /* create a pipe for communicating between fibres */ + ulong stack_sz = 1<<20; + ulong pipe_entries = 16; + void * pipe_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + + fd_fibre_pipe_t * pipe = fd_fibre_pipe_new( pipe_mem, pipe_entries ); + + /* create a fibre each for producer and consumer */ + void * fibre_1_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + void * fibre_2_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + + fd_fibre_t * fibre_1 = fd_fibre_start( fibre_1_mem, stack_sz, test_pipe_producer, pipe ); + fd_fibre_t * fibre_2 = fd_fibre_start( fibre_2_mem, stack_sz, test_pipe_consumer, pipe ); + + /* schedule the fibres */ + fd_fibre_schedule( fibre_1 ); + fd_fibre_schedule( fibre_2 ); + + /* run schedule until done */ + while( 1 ) { + long timeout = fd_fibre_schedule_run(); + if( timeout == -1 ) { + /* -1 indicates no fibres scheduled */ + break; + } + + /* advance time to the next scheduled event */ + now = timeout; + } + + fd_fibre_free( fibre_2 ); + fd_fibre_free( fibre_1 ); + + free( fibre_1_mem ); + free( fibre_2_mem ); + + printf( "pipe test complete\n" ); +} + int main( int argc, char ** argv ) { @@ -79,11 +192,12 @@ main( int argc, char ** argv ) { (void)argv; // initialize fibres - void * main_fibre_mem = aligned_alloc( fd_fibre_init_align(), fd_fibre_init_footprint() ); - fd_fibre_t * main_fibre = fd_fibre_init( main_fibre_mem ); + void * main_fibre_mem = aligned_alloc( fd_fibre_init_align(), fd_fibre_init_footprint() ); + fd_fibre_t * main_fibre = fd_fibre_init( main_fibre_mem ); // create 3 fibres for functions fn1, fn2 and fn3 ulong stack_sz = 1<<20; + void * fibre_1_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); void * fibre_2_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); void * fibre_3_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); @@ -136,7 +250,10 @@ main( int argc, char ** argv ) { // run schedule until done while( 1 ) { long timeout = fd_fibre_schedule_run(); - if( timeout == -1 ) break; + if( timeout == -1 ) { + /* -1 indicates no fibres scheduled */ + break; + } now = timeout; } @@ -151,6 +268,8 @@ main( int argc, char ** argv ) { free( t2_mem ); free( t3_mem ); + run_pipe_test(); + fd_fibre_free( main_fibre ); free( main_fibre_mem ); From 7332d8fac8a41f07954ff62a3be17a466aab6dfa Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Fri, 2 Jun 2023 14:00:38 -0500 Subject: [PATCH 11/23] fix merge --- src/tango/quic/fd_quic_proto_structs.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/tango/quic/fd_quic_proto_structs.h b/src/tango/quic/fd_quic_proto_structs.h index 2a4295bae0..b2373a242f 100644 --- a/src/tango/quic/fd_quic_proto_structs.h +++ b/src/tango/quic/fd_quic_proto_structs.h @@ -7,9 +7,6 @@ #include "templ/fd_quic_defs.h" #include "templ/fd_quic_templ.h" #include "templ/fd_quic_frames_templ.h" -#include "templ/fd_quic_ipv4.h" -#include "templ/fd_quic_udp.h" -#include "templ/fd_quic_eth.h" #include "templ/fd_quic_undefs.h" #endif /* HEADER_fd_src_tango_quic_fd_quic_proto_structs_h */ From 24156a78bcc599e660b202f0f888e28bf5895e99 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Fri, 2 Jun 2023 14:07:32 -0500 Subject: [PATCH 12/23] fix merge --- src/tango/quic/fd_quic_proto.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/tango/quic/fd_quic_proto.h b/src/tango/quic/fd_quic_proto.h index 9cd628897b..333f5a7c27 100644 --- a/src/tango/quic/fd_quic_proto.h +++ b/src/tango/quic/fd_quic_proto.h @@ -6,11 +6,6 @@ #include "fd_quic_common.h" #include "fd_quic_types.h" -#include "templ/fd_quic_defs.h" -#include "templ/fd_quic_templ.h" -#include "templ/fd_quic_frames_templ.h" -#include "templ/fd_quic_undefs.h" - #include "templ/fd_quic_parsers_decl.h" #include "templ/fd_quic_templ.h" #include "templ/fd_quic_frames_templ.h" From fceb5e88bb2311a07b914d746474a573be793ead Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Fri, 2 Jun 2023 17:24:30 -0500 Subject: [PATCH 13/23] quic - more tests --- src/tango/quic/tests/test_quic_drops.c | 54 ++++++ src/util/fibre/test_fibre.c | 219 +++++++++++++++++++++++++ 2 files changed, 273 insertions(+) diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c index db85aea46e..07671598a2 100644 --- a/src/tango/quic/tests/test_quic_drops.c +++ b/src/tango/quic/tests/test_quic_drops.c @@ -30,9 +30,63 @@ rng_t rnd() { return (rng_t)l * (rng_t)0x1p-64; } +/* fibres for client and server */ + fd_fibre_t * client_fibre = NULL; fd_fibre_t * server_fibre = NULL; +/* "net" fibre for dropping and pcapping */ + +fd_fibre_t * net_fibre = NULL; + +struct net_fibre_args { + fd_fibre_pipe_t * input; + fd_fibre_pipe_t * release; + float thresh; +}; +typedef struct net_fibre_args net_fibre_args_t; + +void +net_fibre_main( void * vp_args ) { + /* get args */ + net_fibre_args_t * args = (net_fibre_args_t*)vp_args; + + /* input pipe, and destination aios */ + fd_fibre_pipe_t * input = args->input; + fd_fibre_pipe_t * release = args->release; + + float thresh = args->thresh; + + int rtn = 0; + ulong idx = 0; + + while( !(client_done && server_done) ) { + /* wait for data on pipe */ + rtn = fd_fibre_pipe_read( input, &idx, (long)60e9 ); + if( !rtn ) { + printf( "net_fibre_main: no data in 60s\n" ); + exit(1); + } + + /* we have a message */ + + /* drop? */ + if( rnd() < thresh ) { + /* free slot */ + rtn = fd_fibre_pipe_write( release, idx, (long)1e6 ); + if( rtn ) { + printf( "net_fibre_main: timeout trying to return idx\n" ); + exit(1); + } + + continue; + } + + /* could insert into a reorder buffer here */ + } +} + + /* man-in-the-middle for testing drops */ struct mitm_ctx { diff --git a/src/util/fibre/test_fibre.c b/src/util/fibre/test_fibre.c index 3da7b40ff0..3cf2650344 100644 --- a/src/util/fibre/test_fibre.c +++ b/src/util/fibre/test_fibre.c @@ -186,6 +186,223 @@ run_pipe_test( void ) { } +struct pipe_producer_args { + fd_fibre_pipe_t * output; + long expire; + long period; +}; +typedef struct pipe_producer_args pipe_producer_args_t; + + +void +pipe_producer_main( void * vp_args ) { + /* obtain args */ + pipe_producer_args_t * args = (pipe_producer_args_t*)vp_args; + + /* send periodically - every 1ms (synthetic clock) */ + fd_fibre_pipe_t * output = args->output; + long expire = args->expire; + long period = args->period; + + /* first send time */ + long send_time = now + period; + + /* producer runs until time limit exceeded */ + long expire_time = now + expire; + + /* msg is just a counter */ + ulong msg = 1; + + /* for return values */ + int rtn; + + while( now < expire_time ) { + /* wait until next "send" */ + fd_fibre_wait_until( send_time ); + + /* set timeout to be the same as period */ + long timeout = period; + + /* log write call */ + printf( "pipe_producer_main: writing %lu\n", msg ); fflush( stdout ); + + /* try sending */ + rtn = fd_fibre_pipe_write( output, msg, timeout ); + + if( rtn ) { + printf( "fd_fibre_pipe_write failed\n" ); + exit(1); + } + + /* update send time for next iteration */ + send_time += period; + + /* increment message */ + msg++; + } + + printf( "pipe_producer_main: finished\n" ); + +} + + +struct pipe_filter_args { + fd_fibre_pipe_t * input; + fd_fibre_pipe_t * out1; + fd_fibre_pipe_t * out2; + long period; +}; +typedef struct pipe_filter_args pipe_filter_args_t; + + +void +pipe_filter_main( void * vp_args ) { + pipe_filter_args_t * args = (pipe_filter_args_t*)vp_args; + + /* receive messages on one pipe, distribute them to two pipes + alternately */ + + fd_fibre_pipe_t * input = args->input; + fd_fibre_pipe_t * out1 = args->out1; + fd_fibre_pipe_t * out2 = args->out2; + long period = args->period; + long timeout = period; + + /* loop until read fails */ + while(1) { + ulong msg = 0; + int rtn = fd_fibre_pipe_read( input, &msg, timeout ); + if( rtn ) break; + + /* we have a message - choose the out pipe(s) */ + if( msg % 2 == 0 ) { + rtn = fd_fibre_pipe_write( out1, msg, timeout ); + if( rtn ) { + printf( "pipe_filter_main: write failed\n" ); + exit(1); + } + } + + if( msg % 3 == 0 ) { + rtn = fd_fibre_pipe_write( out2, msg, timeout ); + if( rtn ) { + printf( "pipe_filter_main: write failed\n" ); + exit(1); + } + } + } + + printf( "pipe_filter_main complete\n" ); +} + + +struct pipe_consumer_args { + char const * name; + fd_fibre_pipe_t * input; + long expire; +}; +typedef struct pipe_consumer_args pipe_consumer_args_t; + + +void +pipe_consumer_main( void * vp_args ) { + /* obtain args */ + + pipe_consumer_args_t *args = (pipe_consumer_args_t*)vp_args; + + fd_fibre_pipe_t * input = args->input; + long expire = args->expire; + + long expire_time = now + expire; + + /* loop until read fails */ + while( expire_time > now ) { + ulong msg = 0; + int rtn = fd_fibre_pipe_read( input, &msg, expire_time - now ); + if( rtn ) break; + + /* we have a message - output it */ + printf( "pipe_consumer_main: %s received msg %lu\n", args->name, msg ); + } + + printf( "pipe_consumer_main: finished\n" ); +} + + +void +run_test_pipe_filter( void ) { + ulong pipe_entries = 16; + ulong stack_sz = 1<<20; + + /* create three pipes */ + void * pipe_1_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_pipe_t * pipe_1 = fd_fibre_pipe_new( pipe_1_mem, pipe_entries ); + + void * pipe_2_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_pipe_t * pipe_2 = fd_fibre_pipe_new( pipe_2_mem, pipe_entries ); + + void * pipe_3_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_pipe_t * pipe_3 = fd_fibre_pipe_new( pipe_3_mem, pipe_entries ); + + /* period set to 1 ms */ + long period = (long)1e6; + long expire = (long)2e7; + + /* start 1 producer, 1 filter and 2 consumer fibres */ + pipe_producer_args_t producer_args = { .output = pipe_1, .expire = expire, period }; + pipe_filter_args_t filter_args = { .input = pipe_1, .out1 = pipe_2, .out2 = pipe_3, .period = period }; + pipe_consumer_args_t consumer_main_1_args = { .name = "main_1", .input = pipe_2, .expire = expire }; + pipe_consumer_args_t consumer_main_2_args = { .name = "main_2", .input = pipe_3, .expire = expire }; + + /* create fibre for pipe_producer_main */ + void * producer_fibre_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * producer_fibre = fd_fibre_start( producer_fibre_mem, stack_sz, pipe_producer_main, &producer_args ); + + /* create fibre for pipe_filter_main */ + void * filter_fibre_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * filter_fibre = fd_fibre_start( filter_fibre_mem, stack_sz, pipe_filter_main, &filter_args ); + + /* create fibre for pipe_consumer_1_main */ + void * consumer_1_fibre_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * consumer_1_fibre = fd_fibre_start( consumer_1_fibre_mem, stack_sz, pipe_consumer_main, &consumer_main_1_args ); + + /* create fibre for pipe_consumer_2_main */ + void * consumer_2_fibre_mem = aligned_alloc( fd_fibre_start_align(), fd_fibre_start_footprint( stack_sz ) ); + fd_fibre_t * consumer_2_fibre = fd_fibre_start( consumer_2_fibre_mem, stack_sz, pipe_consumer_main, &consumer_main_2_args ); + + /* add to schedule */ + fd_fibre_schedule( producer_fibre ); + fd_fibre_schedule( filter_fibre ); + fd_fibre_schedule( consumer_1_fibre ); + fd_fibre_schedule( consumer_2_fibre ); + + + /* run schedule until done */ + while( 1 ) { + long timeout = fd_fibre_schedule_run(); + if( timeout == -1 ) { + /* -1 indicates no fibres scheduled */ + break; + } + + /* advance time to next event */ + now = timeout; + } + + /* free fibres */ + fd_fibre_free( producer_fibre ); + fd_fibre_free( filter_fibre ); + fd_fibre_free( consumer_1_fibre ); + fd_fibre_free( consumer_2_fibre ); + + /* free fibre mem */ + free( producer_fibre_mem ); + free( filter_fibre_mem ); + free( consumer_1_fibre_mem ); + free( consumer_2_fibre_mem ); +} + + int main( int argc, char ** argv ) { (void)argc; @@ -270,6 +487,8 @@ main( int argc, char ** argv ) { run_pipe_test(); + run_test_pipe_filter(); + fd_fibre_free( main_fibre ); free( main_fibre_mem ); From 743c7aa69008be36c341cc90a39be19cd1ff53a9 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Tue, 27 Jun 2023 13:31:10 -0500 Subject: [PATCH 14/23] Fix merge --- src/tango/quic/tests/test_quic_drops.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c index 07671598a2..a0a20d8b8b 100644 --- a/src/tango/quic/tests/test_quic_drops.c +++ b/src/tango/quic/tests/test_quic_drops.c @@ -438,8 +438,7 @@ main( int argc, char ** argv ) { .handshake_cnt = 10, .stream_cnt = { 0, 0, 10, 0 }, .inflight_pkt_cnt = 1024, - .tx_buf_sz = 1<<14, - .rx_buf_sz = 1<<14 + .tx_buf_sz = 1<<14 }; ulong quic_footprint = fd_quic_footprint( &quic_limits ); From 22b24d993e0ccfee26022479856165100909545c Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Tue, 27 Jun 2023 13:32:48 -0500 Subject: [PATCH 15/23] Remove printf from test output --- src/tango/quic/tests/test_quic_drops.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c index a0a20d8b8b..a79c3eb58f 100644 --- a/src/tango/quic/tests/test_quic_drops.c +++ b/src/tango/quic/tests/test_quic_drops.c @@ -120,10 +120,8 @@ mitm_tx( void * ctx, if( rnd() < mitm_ctx->thresh ) { /* dropping behaves as-if the send was successful */ - printf( "dropped! server=%d\n", mitm_ctx->server ); return FD_AIO_SUCCESS; } else { - printf( "passed thru! server=%d\n", mitm_ctx->server ); return fd_aio_send( mitm_ctx->dst, batch, batch_cnt, opt_batch_idx, flush ); } } From d88fc99736b5e6325c128ebbfa082ec277c00dfc Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Tue, 1 Aug 2023 13:35:32 -0500 Subject: [PATCH 16/23] formatting --- src/tango/quic/tests/test_quic_drops.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c index a79c3eb58f..74d29a4f38 100644 --- a/src/tango/quic/tests/test_quic_drops.c +++ b/src/tango/quic/tests/test_quic_drops.c @@ -290,7 +290,7 @@ client_fibre_fn( void * vp_arg ) { if( !conn ) { rcvd = sent = 0; - conn = fd_quic_connect( quic, + conn = fd_quic_connect( quic, server_quic->config.net.ip_addr, server_quic->config.net.listen_udp_port, server_quic->config.sni ); @@ -549,4 +549,3 @@ main( int argc, char ** argv ) { fd_halt(); return 0; } - From d01e7875ba2ee137aa0dcf699b79d74f01521576 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Tue, 1 Aug 2023 13:36:40 -0500 Subject: [PATCH 17/23] formatting --- src/util/fibre/fd_fibre.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/util/fibre/fd_fibre.c b/src/util/fibre/fd_fibre.c index be0270aab6..3e3aeb53c6 100644 --- a/src/util/fibre/fd_fibre.c +++ b/src/util/fibre/fd_fibre.c @@ -87,7 +87,7 @@ fd_fibre_start( void * mem, ulong stack_sz, fd_fibre_fn_t fn, void * arg ) { ulong l_mem = (ulong)mem; - void * stack = (void*)( l_mem + + void * stack = (void*)( l_mem + fd_ulong_align_up( sizeof( fd_fibre_t ), FD_FIBRE_ALIGN ) ); fd_fibre_t * fibre = (fd_fibre_t*)mem; @@ -396,7 +396,7 @@ fd_fibre_pipe_read( fd_fibre_pipe_t * pipe, ulong *value, long timeout ) { /* loop until we have a value to be read, or until we time out */ while(1) { used = pipe->head - pipe->tail; - + /* is data available? */ if( used ) break; @@ -448,4 +448,3 @@ fd_fibre_pipe_read( fd_fibre_pipe_t * pipe, ulong *value, long timeout ) { /* return success */ return 0; } - From 5c013b7eee9956376991b9437fc615eaf526d869 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Wed, 2 Aug 2023 10:13:34 -0500 Subject: [PATCH 18/23] formatting --- src/util/fibre/fd_fibre.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/util/fibre/fd_fibre.h b/src/util/fibre/fd_fibre.h index 9af7d5caef..f7d65f509c 100644 --- a/src/util/fibre/fd_fibre.h +++ b/src/util/fibre/fd_fibre.h @@ -207,7 +207,7 @@ fd_fibre_pipe_write( fd_fibre_pipe_t * pipe, ulong value, long timeout ); pipe the pipe to write to value a pointer to the ulong to receive the value timeout number of nanoseconds to wait for a value - + returns 0 successfully read a value from the pipe 1 timed out without receiving data */ int @@ -218,4 +218,3 @@ FD_PROTOTYPES_END #endif /* HEADER_fd_src_util_fibre_fd_fibre_h */ - From 3d9bc3fdedb3a17918ec3d9bc0c2d4e469622b1e Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Thu, 3 Aug 2023 14:09:11 -0500 Subject: [PATCH 19/23] Missing code, merge issues --- src/tango/quic/fd_quic.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/tango/quic/fd_quic.c b/src/tango/quic/fd_quic.c index 935b70d378..a3b3a71d83 100644 --- a/src/tango/quic/fd_quic.c +++ b/src/tango/quic/fd_quic.c @@ -2327,8 +2327,6 @@ fd_quic_ack_pkt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_t * pkt ) ack_time = now + quic->config.service_interval; /* randomize */ } - //DEBUG( return; ) - /* algo: if there exists a last unsent ack, and the last ack refers to the prior packet simply extend it @@ -3386,10 +3384,17 @@ fd_quic_pkt_hdr_populate( fd_quic_pkt_hdr_t * pkt_hdr, pkt_hdr->quic_pkt.handshake.version = conn->version; /* destination */ - fd_memcpy( pkt_hdr->quic_pkt.handshake.dst_conn_id, - peer_conn_id->conn_id, - peer_conn_id->sz ); - pkt_hdr->quic_pkt.handshake.dst_conn_id_len = peer_conn_id->sz; + if( initial ) { + fd_memcpy( pkt_hdr->quic_pkt.initial.dst_conn_id, + conn->orig_dst_conn_id.conn_id, + conn->orig_dst_conn_id.sz ); + pkt_hdr->quic_pkt.initial.dst_conn_id_len = conn->orig_dst_conn_id.sz; + } else { + fd_memcpy( pkt_hdr->quic_pkt.initial.dst_conn_id, + peer_conn_id->conn_id, + peer_conn_id->sz ); + pkt_hdr->quic_pkt.initial.dst_conn_id_len = peer_conn_id->sz; + } /* source */ fd_memcpy( pkt_hdr->quic_pkt.handshake.src_conn_id, @@ -6218,7 +6223,7 @@ fd_quic_frame_handle_new_conn_id_frame( context.pkt->ack_flag |= ACK_FLAG_RQD; - DEBUG( FD_LOG_DEBUG(( "new_conn_id requested" )); ) + FD_DEBUG( FD_LOG_DEBUG(( "new_conn_id requested" )); ) return 0; } From 1835dbb07c7b281ee084d2b527e158e593078772 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Thu, 3 Aug 2023 14:09:45 -0500 Subject: [PATCH 20/23] Fix test_quic_retry_unit.c --- src/tango/quic/tests/test_quic_retry_unit.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/tango/quic/tests/test_quic_retry_unit.c b/src/tango/quic/tests/test_quic_retry_unit.c index 82b485f3a7..0587eef6a3 100644 --- a/src/tango/quic/tests/test_quic_retry_unit.c +++ b/src/tango/quic/tests/test_quic_retry_unit.c @@ -4,11 +4,6 @@ #include "../fd_quic_common.h" #include "../fd_quic_types.h" -#include "../templ/fd_quic_defs.h" -#include "../templ/fd_quic_frames_templ.h" -#include "../templ/fd_quic_templ.h" -#include "../templ/fd_quic_undefs.h" - #include "../templ/fd_quic_encoders_decl.h" #include "../templ/fd_quic_frames_templ.h" #include "../templ/fd_quic_templ.h" From 1f051d4ed14166ac530909fbf69cdcc8101a1b8f Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Thu, 3 Aug 2023 14:49:16 -0500 Subject: [PATCH 21/23] compilation issues under clang --- src/tango/quic/tests/test_quic_drops.c | 2 +- src/util/fibre/fd_fibre.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c index 74d29a4f38..596ed8c847 100644 --- a/src/tango/quic/tests/test_quic_drops.c +++ b/src/tango/quic/tests/test_quic_drops.c @@ -23,7 +23,7 @@ ulong tot_rcvd = 0; typedef float rng_t; -rng_t rnd() { +rng_t rnd( void ) { static uint seed = 0; ulong l = fd_rng_private_expand( seed++ ); diff --git a/src/util/fibre/fd_fibre.c b/src/util/fibre/fd_fibre.c index 3e3aeb53c6..ac111bd082 100644 --- a/src/util/fibre/fd_fibre.c +++ b/src/util/fibre/fd_fibre.c @@ -112,7 +112,7 @@ fd_fibre_start( void * mem, ulong stack_sz, fd_fibre_fn_t fn, void * arg ) { fibre->ctx.uc_stack.ss_size = stack_sz; /* make a new context */ - makecontext( &fibre->ctx, (void(*)())fd_fibre_run_fn, 1, fibre ); + makecontext( &fibre->ctx, (void(*)(void))fd_fibre_run_fn, 1, fibre ); return fibre; } @@ -265,7 +265,7 @@ fd_fibre_schedule( fd_fibre_t * fibre ) { returns the time of the next ready fibre returns -1 if there are no fibres in the schedule */ long -fd_fibre_schedule_run() { +fd_fibre_schedule_run( void ) { /* set the currently running fibre as the scheduler */ fd_fibre_scheduler = fd_fibre_current; From cdf280ee5f85c843bb850f54aa3a2c4831ad9999 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Wed, 9 Aug 2023 16:30:35 -0500 Subject: [PATCH 22/23] Fix regression in test_quic_drops --- src/tango/quic/tests/test_quic_drops.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c index 596ed8c847..f1e2fb0556 100644 --- a/src/tango/quic/tests/test_quic_drops.c +++ b/src/tango/quic/tests/test_quic_drops.c @@ -462,6 +462,8 @@ main( int argc, char ** argv ) { client_quic->cb.now = test_clock; client_quic->cb.now_ctx = NULL; + client_quic->config.initial_rx_max_stream_data = 1<<15; + fd_quic_config_t * server_config = &server_quic->config; server_config->idle_timeout = 5e9; @@ -474,6 +476,8 @@ main( int argc, char ** argv ) { server_quic->cb.now = test_clock; server_quic->cb.now_ctx = NULL; + server_quic->config.initial_rx_max_stream_data = 1<<15; + /* pcap */ FILE * pcap_file = fopen( "test_quic_drops.pcapng", "wb" ); FD_TEST( pcap_file ); From e8bc56f063e26f186ab3a7409ad1ef327d14f6a6 Mon Sep 17 00:00:00 2001 From: Nick Bridge Date: Tue, 8 Aug 2023 12:14:14 -0500 Subject: [PATCH 23/23] make fd_quic_handle_v1_one_rtt return tot_sz instead of 0 --- src/tango/quic/fd_quic.c | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/tango/quic/fd_quic.c b/src/tango/quic/fd_quic.c index a3b3a71d83..4a50d0a446 100644 --- a/src/tango/quic/fd_quic.c +++ b/src/tango/quic/fd_quic.c @@ -1982,7 +1982,7 @@ fd_quic_handle_v1_one_rtt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_ ulong rc = fd_quic_decode_one_rtt( one_rtt, cur_ptr, cur_sz ); if( rc == FD_QUIC_PARSE_FAIL ) { FD_DEBUG( FD_LOG_DEBUG(( "fd_quic_decode_one_rtt failed" )) ); - return 0; + return FD_QUIC_PARSE_FAIL; } /* generate one_rtt secrets, keys etc */ @@ -2193,7 +2193,7 @@ fd_quic_handle_v1_one_rtt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_ uint pn_space = fd_quic_enc_level_to_pn_space( enc_level ); conn->exp_pkt_number[pn_space] = pkt_number + 1u; - return 0; + return tot_sz; } @@ -4782,11 +4782,6 @@ fd_quic_connect( fd_quic_t * quic, goto fail_tls_hs; } - FD_DEBUG( - fd_quic_tls_hs_data_t const * hs_data = fd_quic_tls_get_hs_data( tls_hs, 0 ); - printf( "hs_data @ enc_level 0: %p\n", (void*)hs_data ); - ); - conn->tls_hs = tls_hs; fd_quic_crypto_suite_t *suite =