Skip to content

Commit

Permalink
fdctl: separate workspace for transaction pipeline
Browse files Browse the repository at this point in the history
A shared workspace is added for the QUIC, verify, dedup, and pack tiles
so that they can share dcache entries and not copy transactions back and
forth.
  • Loading branch information
mmcgee-jump committed Aug 21, 2023
1 parent 8abb52f commit 7529db1
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 67 deletions.
55 changes: 35 additions & 20 deletions src/app/fdctl/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ find_wksp( config_t * const config,

/* partial frank_bank definition since the tile doesn't really exist */
static fd_frank_task_t frank_bank = {
.in_wksp = "pack_bank",
.out_wksp = "bank_shred",
.in_wksp = "pack_bank",
.out_wksp = "bank_shred",
.extra_wksp = NULL,
};

ulong
Expand All @@ -36,21 +37,29 @@ memlock_max_bytes( config_t * const config ) {
for( ulong j=0; j<config->shmem.workspaces_cnt; j++ ) {
workspace_config_t * wksp = &config->shmem.workspaces[ j ];

#define TILE_MAX( tile ) do { \
ulong in_bytes = 0, out_bytes = 0; \
if( FD_LIKELY( tile.in_wksp ) ) { \
workspace_config_t * in_wksp = find_wksp( config, tile.in_wksp ); \
in_bytes = in_wksp->num_pages * in_wksp->page_size; \
} \
if( FD_LIKELY( tile.out_wksp ) ) { \
workspace_config_t * out_wksp = find_wksp( config, tile.out_wksp ); \
out_bytes = out_wksp->num_pages * out_wksp->page_size; \
} \
memlock_max_bytes = fd_ulong_max( memlock_max_bytes, \
wksp->page_size * wksp->num_pages + in_bytes + out_bytes ); \
#define TILE_MAX( tile ) do { \
ulong in_bytes = 0, out_bytes = 0, extra_bytes = 0; \
if( FD_LIKELY( tile.in_wksp ) ) { \
workspace_config_t * in_wksp = find_wksp( config, tile.in_wksp ); \
in_bytes = in_wksp->num_pages * in_wksp->page_size; \
} \
if( FD_LIKELY( tile.out_wksp ) ) { \
workspace_config_t * out_wksp = find_wksp( config, tile.out_wksp ); \
out_bytes = out_wksp->num_pages * out_wksp->page_size; \
} \
if( FD_LIKELY( tile.extra_wksp ) ) { \
workspace_config_t * extra_wksp = find_wksp( config, tile.extra_wksp ); \
extra_bytes = extra_wksp->num_pages * extra_wksp->page_size; \
} \
memlock_max_bytes = fd_ulong_max( memlock_max_bytes, \
wksp->page_size * wksp->num_pages + \
in_bytes + \
out_bytes + \
extra_bytes ); \
} while(0)

switch ( wksp->kind ) {
case wksp_tpu_txn_data:
case wksp_quic_verify:
case wksp_verify_dedup:
case wksp_dedup_pack:
Expand Down Expand Up @@ -461,21 +470,27 @@ static void
init_workspaces( config_t * config ) {
ulong idx = 0;

config->shmem.workspaces[ idx ].kind = wksp_quic_verify;
config->shmem.workspaces[ idx ].name = "quic_verify";
config->shmem.workspaces[ idx ].kind = wksp_tpu_txn_data;
config->shmem.workspaces[ idx ].name = "tpu_txn_data";
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_GIGANTIC_PAGE_SZ;
config->shmem.workspaces[ idx ].num_pages = 1;
idx++;

config->shmem.workspaces[ idx ].kind = wksp_quic_verify;
config->shmem.workspaces[ idx ].name = "quic_verify";
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_HUGE_PAGE_SZ;
config->shmem.workspaces[ idx ].num_pages = 2;
idx++;

config->shmem.workspaces[ idx ].kind = wksp_verify_dedup;
config->shmem.workspaces[ idx ].name = "verify_dedup";
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_GIGANTIC_PAGE_SZ;
config->shmem.workspaces[ idx ].num_pages = 1;
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_HUGE_PAGE_SZ;
config->shmem.workspaces[ idx ].num_pages = 2;
idx++;

config->shmem.workspaces[ idx ].kind = wksp_dedup_pack;
config->shmem.workspaces[ idx ].name = "dedup_pack";
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_GIGANTIC_PAGE_SZ;
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_HUGE_PAGE_SZ;
config->shmem.workspaces[ idx ].num_pages = 1;
idx++;

Expand Down Expand Up @@ -517,7 +532,7 @@ init_workspaces( config_t * config ) {

config->shmem.workspaces[ idx ].kind = wksp_pack;
config->shmem.workspaces[ idx ].name = "pack";
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_HUGE_PAGE_SZ;
config->shmem.workspaces[ idx ].page_size = FD_SHMEM_GIGANTIC_PAGE_SZ;
config->shmem.workspaces[ idx ].num_pages = 1;
idx++;

Expand Down
1 change: 1 addition & 0 deletions src/app/fdctl/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

typedef struct {
enum {
wksp_tpu_txn_data,
wksp_quic_verify,
wksp_verify_dedup,
wksp_dedup_pack,
Expand Down
6 changes: 3 additions & 3 deletions src/app/fdctl/config/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ dynamic_port_range = "8000-10000"
# If nonzero, enable JSON RPC on this port, and use the next port for the
# RPC websocket. If zero, disable JSON RPC. This option is passed to the
# Solana Labs client with the `--rpc-port` argument.
port = 0
port = 8899

# If true, all RPC operations are enabled on this validator, including
# non-default RPC methods for querying chain state and transaction history.
# This option is passed to the Solana Labs client with the `--full-rpc-api`
# argument.
full_api = false
full_api = true

# If the RPC is private, the valdiator's open RPC port is not published in
# the `solana gossip` command for use by others. This option is passed to
Expand All @@ -129,7 +129,7 @@ dynamic_port_range = "8000-10000"
# `getConfirmedBlock` API. This will cause an increase in disk usage and
# IOPS. This option is passed to the Solana Labs client with the
# `--enable-rpc-transaction-history` argument.
transaction_history = false
transaction_history = true

# If enabled, include CPI inner instructions, logs, and return data in the
# historical transaction info stored. This option is passed to the Solana
Expand Down
9 changes: 7 additions & 2 deletions src/app/fdctl/configure/workspace.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,18 +216,21 @@ init( config_t * const config ) {
WKSP_BEGIN( config, wksp1, 0 );

switch( wksp1->kind ) {
case wksp_tpu_txn_data:
for( ulong i=0; i<config->layout.verify_tile_count; i++ ) {
dcache( pod, "dcache%lu", config->tiles.verify.mtu, config->tiles.verify.receive_buffer_size, config->tiles.verify.receive_buffer_size * 32, i );
}
break;
case wksp_quic_verify:
for( ulong i=0; i<config->layout.verify_tile_count; i++ ) {
mcache( pod, "mcache%lu", config->tiles.verify.receive_buffer_size, i );
dcache( pod, "dcache%lu", config->tiles.verify.mtu, config->tiles.verify.receive_buffer_size, config->tiles.verify.receive_buffer_size * 32, i );
fseq ( pod, "fseq%lu", i );
}
break;
case wksp_verify_dedup:
ulong1( pod, "cnt", config->layout.verify_tile_count );
for( ulong i=0; i<config->layout.verify_tile_count; i++ ) {
mcache( pod, "mcache%lu", config->tiles.verify.receive_buffer_size, i );
dcache( pod, "dcache%lu", config->tiles.verify.mtu, config->tiles.verify.receive_buffer_size, 0, i );
fseq ( pod, "fseq%lu", i );
}
break;
Expand All @@ -241,6 +244,8 @@ init( config_t * const config ) {
mcache( pod, "mcache%lu", config->tiles.bank.receive_buffer_size, i );
dcache( pod, "dcache%lu", USHORT_MAX, config->layout.bank_tile_count * (ulong)config->tiles.bank.receive_buffer_size, 0, i );
fseq ( pod, "fseq%lu", i );
mcache( pod, "mcache-back%lu", config->tiles.bank.receive_buffer_size, i );
fseq ( pod, "fseq-back%lu", i );
}
break;
case wksp_bank_shred:
Expand Down
13 changes: 12 additions & 1 deletion src/app/fdctl/monitor/monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ run_monitor( config_t * const config,
config->layout.verify_tile_count + // quic <-> verify
config->layout.verify_tile_count + // verify <-> dedup
1 + // dedup <-> pack
config->layout.bank_tile_count; // pack <-> bank
config->layout.bank_tile_count + // pack <-> bank
config->layout.bank_tile_count; // bank <-> pack

tile_t * tiles = fd_alloca( alignof(tile_t *), sizeof(tile_t)*tile_cnt );
link_t * links = fd_alloca( alignof(link_t *), sizeof(link_t)*link_cnt );
Expand All @@ -220,6 +221,8 @@ run_monitor( config_t * const config,

char buf[ 64 ];
switch( wksp->kind ) {
case wksp_tpu_txn_data:
break;
case wksp_quic_verify:
for( ulong i=0; i<config->layout.verify_tile_count; i++ ) {
links[ link_idx ].src_name = "quic";
Expand Down Expand Up @@ -260,6 +263,14 @@ run_monitor( config_t * const config,
links[ link_idx ].fseq = fd_fseq_join( fd_wksp_pod_map( pods[ j ], snprintf1( buf, 64, "fseq%lu", i ) ) );
if( FD_UNLIKELY( !links[ link_idx ].fseq ) ) FD_LOG_ERR(( "fd_fseq_join failed" ));
link_idx++;

links[ link_idx ].src_name = "bank";
links[ link_idx ].dst_name = "pack";
links[ link_idx ].mcache = fd_mcache_join( fd_wksp_pod_map( pods[ j ], snprintf1( buf, 64, "mcache-back%lu", i ) ) );
if( FD_UNLIKELY( !links[ link_idx ].mcache ) ) FD_LOG_ERR(( "fd_mcache_join failed" ));
links[ link_idx ].fseq = fd_fseq_join( fd_wksp_pod_map( pods[ j ], snprintf1( buf, 64, "fseq-back%lu", i ) ) );
if( FD_UNLIKELY( !links[ link_idx ].fseq ) ) FD_LOG_ERR(( "fd_fseq_join failed" ));
link_idx++;
}
break;
case wksp_bank_shred:
Expand Down
3 changes: 3 additions & 0 deletions src/app/fdctl/run.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ tile_main( void * _args ) {
.tile_name = args->tile->name,
.in_pod = NULL,
.out_pod = NULL,
.extra_pod = NULL,
.tick_per_ns = args->tick_per_ns,
};

Expand All @@ -119,6 +120,8 @@ tile_main( void * _args ) {
frank_args.in_pod = workspace_pod_join( args->app_name, args->tile->in_wksp, 0 );
if( FD_LIKELY( args->tile->out_wksp ) )
frank_args.out_pod = workspace_pod_join( args->app_name, args->tile->out_wksp, 0 );
if( FD_LIKELY( args->tile->extra_wksp ) )
frank_args.extra_pod = workspace_pod_join( args->app_name, args->tile->extra_wksp, 0 );

if( FD_UNLIKELY( args->tile->init ) ) args->tile->init( &frank_args );

Expand Down
3 changes: 2 additions & 1 deletion src/app/fddev/configure/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ init( config_t * const config ) {
ADD( "--bootstrap-validator", config->consensus.identity_path );
ADD1( vote );
ADD1( stake );
ADD( "--bootstrap-stake-authorized-pubkey", config->consensus.identity_path );

ADD( "--ledger", config->ledger.path );
ADD( "--faucet-pubkey", faucet );
Expand Down Expand Up @@ -153,7 +154,7 @@ check( config_t * const config ) {

configure_stage_t cluster = {
.name = NAME,
.always_recreate = 0,
.always_recreate = 1,
.enabled = enabled,
.init_perm = NULL,
.fini_perm = NULL,
Expand Down
9 changes: 9 additions & 0 deletions src/app/fddev/dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ dev_cmd_fn( args_t * args,
validator will get stuck forever. */
config->consensus.wait_for_vote_to_start_leader = 0;

config->consensus.genesis_fetch = 0;
config->consensus.snapshot_fetch = 0;

if( FD_LIKELY( !strcmp( config->consensus.vote_account_path, "" ) ) )
snprintf1( config->consensus.vote_account_path,
sizeof( config->consensus.vote_account_path ),
"%s/vote-account.json",
config->scratch_directory );

if( FD_UNLIKELY( config->development.netns.enabled ) ) {
/* if we entered a network namespace during configuration, leave it
so that `run_firedancer` starts from a clean namespace */
Expand Down
2 changes: 2 additions & 0 deletions src/app/frank/fd_frank.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ typedef struct {
uchar const * tile_pod;
uchar const * in_pod;
uchar const * out_pod;
uchar const * extra_pod;
fd_xsk_t * xsk;
double tick_per_ns;
} fd_frank_args_t;
Expand All @@ -43,6 +44,7 @@ typedef struct {
char * name;
char * in_wksp;
char * out_wksp;
char * extra_wksp;
ushort allow_syscalls_sz;
long * allow_syscalls;
ulong (*allow_fds)( fd_frank_args_t * args, ulong out_fds_sz, int * out_fds );
Expand Down
1 change: 1 addition & 0 deletions src/app/frank/fd_frank_dedup.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ fd_frank_task_t frank_dedup = {
.name = "dedup",
.in_wksp = "verify_dedup",
.out_wksp = "dedup_pack",
.extra_wksp = NULL,
.allow_syscalls_sz = sizeof(allow_syscalls)/sizeof(allow_syscalls[ 0 ]),
.allow_syscalls = allow_syscalls,
.allow_fds = allow_fds,
Expand Down
20 changes: 10 additions & 10 deletions src/app/frank/fd_frank_pack.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ run( fd_frank_args_t * args ) {

FD_LOG_INFO(( "joining dcache%lu", args->tile_idx ));
/* Note (chunks are referenced relative to the containing workspace
currently and there is just one workspace). (FIXME: VALIDATE
COMMON WORKSPACE FOR THESE) */
fd_wksp_t * wksp = fd_wksp_containing( mcache );
currently and there is just one workspace). */
uchar * dcache = fd_dcache_join( fd_wksp_pod_map( args->extra_pod, "dcache0" ) );
fd_wksp_t * wksp = fd_wksp_containing( dcache );
if( FD_UNLIKELY( !wksp ) ) FD_LOG_ERR(( "fd_wksp_containing failed" ));

FD_LOG_INFO(( "joining fseq" ));
Expand Down Expand Up @@ -140,12 +140,12 @@ run( fd_frank_args_t * args ) {
out_state out[ FD_FRANK_PACK_MAX_OUT ];

/* FIXME: Plumb this through properly: */
ulong bank_cnt = fd_pod_cnt( args->out_pod ) / 3UL - 1UL; /* Skip bank 0 */
ulong bank_cnt = fd_pod_query_ulong( args->out_pod, "num_tiles", 0UL );
if( FD_UNLIKELY( !bank_cnt ) ) FD_LOG_ERR(( "pack.num_tiles unset or set to zero" ));
if( FD_UNLIKELY( bank_cnt>FD_FRANK_PACK_MAX_OUT ) ) FD_LOG_ERR(( "pack tile connects to too many banking tiles" ));

/* Skip bank 0 */
for( ulong i=0UL; i<bank_cnt; i++ ) join_out( out+i, args->out_pod, i+1UL );

for( ulong i=0UL; i<bank_cnt; i++ ) join_out( out+i, args->out_pod, i );
fd_wksp_t * out_wksp = fd_wksp_containing( args->out_pod );

ulong max_txn_per_microblock = MAX_MICROBLOCK_SZ/sizeof(fd_txn_p_t);

Expand All @@ -169,8 +169,7 @@ run( fd_frank_args_t * args ) {
fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, seed, 0UL ) );
if( FD_UNLIKELY( !rng ) ) FD_LOG_ERR(( "fd_rng_join failed" ));


void * pack_laddr = fd_wksp_alloc_laddr( wksp, fd_pack_align(), pack_footprint, FD_PACK_TAG );
void * pack_laddr = fd_wksp_alloc_laddr( fd_wksp_containing( args->tile_pod ), fd_pack_align(), pack_footprint, FD_PACK_TAG );
if( FD_UNLIKELY( !pack_laddr ) ) FD_LOG_ERR(( "allocating memory for pack object failed" ));


Expand Down Expand Up @@ -254,7 +253,7 @@ run( fd_frank_args_t * args ) {
for( ulong i=0UL; i<bank_cnt; i++ ) {
out_state * o = out+i;
if( FD_LIKELY( o->out_cr_avail>0UL ) ) { /* optimize for the case we send a microblock */
void * microblock_dst = fd_chunk_to_laddr( wksp, o->out_chunk );
void * microblock_dst = fd_chunk_to_laddr( out_wksp, o->out_chunk );
ulong schedule_cnt = fd_pack_schedule_next_microblock( pack, cus_per_microblock, vote_fraction, microblock_dst );
if( FD_LIKELY( schedule_cnt ) ) {
ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
Expand Down Expand Up @@ -362,6 +361,7 @@ fd_frank_task_t frank_pack = {
.name = "pack",
.in_wksp = "dedup_pack",
.out_wksp = "pack_bank",
.extra_wksp = "tpu_txn_data",
.allow_syscalls_sz = sizeof(allow_syscalls)/sizeof(allow_syscalls[ 0 ]),
.allow_syscalls = allow_syscalls,
.allow_fds = allow_fds,
Expand Down
3 changes: 2 additions & 1 deletion src/app/frank/fd_frank_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ run( fd_frank_args_t * args ) {

FD_LOG_INFO(( "joining dcache" ));
snprintf( path, sizeof(path), "dcache%lu", args->tile_idx );
uchar * dcache = fd_dcache_join( fd_wksp_pod_map( args->out_pod, path ) );
uchar * dcache = fd_dcache_join( fd_wksp_pod_map( args->extra_pod, path ) );
if( FD_UNLIKELY( !dcache ) ) FD_LOG_ERR(( "fd_dcache_join failed" ));

FD_LOG_INFO(( "loading quic" ));
Expand Down Expand Up @@ -154,6 +154,7 @@ fd_frank_task_t frank_quic = {
.name = "quic",
.in_wksp = NULL,
.out_wksp = "quic_verify",
.extra_wksp = "tpu_txn_data",
.allow_syscalls_sz = sizeof(allow_syscalls)/sizeof(allow_syscalls[ 0 ]),
.allow_syscalls = allow_syscalls,
.allow_fds = allow_fds,
Expand Down
Loading

0 comments on commit 7529db1

Please sign in to comment.