From 85127f3f1929ee2a6afdd5ee6ddb992596d00c47 Mon Sep 17 00:00:00 2001 From: Yunhao Zhang Date: Wed, 31 Jul 2024 21:24:17 +0000 Subject: [PATCH] Use treap for pending_slots in disco/store --- src/app/fdctl/run/tiles/fd_store_int.c | 16 +-- src/disco/store/fd_pending_slots.c | 167 ++++++++++++++----------- src/disco/store/fd_pending_slots.h | 55 ++++++-- 3 files changed, 148 insertions(+), 90 deletions(-) diff --git a/src/app/fdctl/run/tiles/fd_store_int.c b/src/app/fdctl/run/tiles/fd_store_int.c index c5968dd969..cd7110128a 100644 --- a/src/app/fdctl/run/tiles/fd_store_int.c +++ b/src/app/fdctl/run/tiles/fd_store_int.c @@ -209,10 +209,10 @@ after_frag( void * _ctx, for( ulong i = 0; i < ctx->s34_buffer->shred_cnt; i++ ) { fd_shred_t * shred = &ctx->s34_buffer->pkts[i].shred; - if( FD_UNLIKELY( (long)(ctx->store->pending_slots->end - shred->slot) > (long)FD_PENDING_MAX ) ) { - FD_LOG_WARNING(("received shred %lu that would overrun pending queue. skipping.", shred->slot)); - continue; - } + /* if( FD_UNLIKELY( (long)(ctx->store->pending_slots->end - shred->slot) > (long)FD_PENDING_MAX ) ) { */ + /* FD_LOG_WARNING(("received shred %lu that would overrun pending queue. skipping.", shred->slot)); */ + /* continue; */ + /* } */ if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > 100 ) ) { FD_LOG_WARNING(("received shred with slot %lu that would overrun pending queue. skipping.", shred->slot)); @@ -433,10 +433,10 @@ after_credit( void * _ctx, ctx->store->now = fd_log_wallclock(); - if( FD_UNLIKELY( ctx->sim && - ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) { - FD_LOG_ERR( ( "Sim is complete." ) ); - } + /* if( FD_UNLIKELY( ctx->sim && */ + /* ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) { */ + /* FD_LOG_ERR( ( "Sim is complete." ) ); */ + /* } */ for( ulong i = fd_pending_slots_iter_init( ctx->store->pending_slots ); (i = fd_pending_slots_iter_next( ctx->store->pending_slots, ctx->store->now, i )) != ULONG_MAX; ) { diff --git a/src/disco/store/fd_pending_slots.c b/src/disco/store/fd_pending_slots.c index 699112b21c..f2f7410bc0 100644 --- a/src/disco/store/fd_pending_slots.c +++ b/src/disco/store/fd_pending_slots.c @@ -1,8 +1,10 @@ #include "fd_pending_slots.h" +#include +#include void * -fd_pending_slots_new( void * mem, ulong lo_wmark ) { - +fd_pending_slots_new( void * mem, + ulong lo_wmark FD_PARAM_UNUSED) { if( FD_UNLIKELY( !mem ) ) { FD_LOG_WARNING( ( "NULL mem" ) ); return NULL; @@ -13,27 +15,23 @@ fd_pending_slots_new( void * mem, ulong lo_wmark ) { return NULL; } - ulong footprint = fd_pending_slots_footprint(); - - fd_memset( mem, 0, footprint ); - ulong laddr = (ulong)mem; - fd_pending_slots_t * pending_slots = (void *)laddr; - pending_slots->lo_wmark = lo_wmark; - pending_slots->start = 0; - pending_slots->end = 0; - pending_slots->lock = 0; + fd_pending_slots_t * pending_slots = mem; + fd_memset( mem, 0, fd_pending_slots_footprint() ); + ulong laddr = (ulong)(mem) + sizeof( fd_pending_slots_t ); - laddr += sizeof( fd_pending_slots_t ); - pending_slots->pending = (void *)laddr; + laddr = fd_ulong_align_up( laddr, fd_pending_slots_treap_align() ); + pending_slots->treap = fd_pending_slots_treap_join( fd_pending_slots_treap_new( (void *)(laddr), MAX_PENDING_SLOTS ) ); + laddr += fd_pending_slots_treap_footprint( MAX_PENDING_SLOTS ); - laddr += sizeof(long) * FD_PENDING_MAX; - - FD_TEST( laddr == (ulong)mem + footprint ); + laddr = fd_ulong_align_up( laddr, fd_pending_slots_pool_align() ); + pending_slots->pool = fd_pending_slots_pool_join( fd_pending_slots_pool_new( (void*)(laddr), MAX_PENDING_SLOTS ) ); + laddr += fd_pending_slots_pool_footprint( MAX_PENDING_SLOTS ); + laddr = fd_ulong_align_up( laddr, fd_pending_slots_align() ); + FD_TEST( laddr == (ulong)mem + fd_pending_slots_footprint() ); return mem; } -/* TODO only safe for local joins */ fd_pending_slots_t * fd_pending_slots_join( void * pending_slots ) { if( FD_UNLIKELY( !pending_slots ) ) { @@ -46,7 +44,20 @@ fd_pending_slots_join( void * pending_slots ) { return NULL; } - fd_pending_slots_t * pending_slots_ = (fd_pending_slots_t *)pending_slots; + ulong laddr = (ulong)pending_slots; + fd_pending_slots_t * pending_slots_ = pending_slots; + laddr += sizeof( fd_pending_slots_t ); + + laddr = fd_ulong_align_up( laddr, fd_pending_slots_treap_align() ); + pending_slots_->treap = fd_pending_slots_treap_join( (void *)laddr ); + laddr += fd_pending_slots_treap_footprint( MAX_PENDING_SLOTS ); + + laddr = fd_ulong_align_up( laddr, fd_pending_slots_pool_align() ); + pending_slots_->pool = fd_pending_slots_pool_join( (void *)laddr ); + laddr += fd_pending_slots_pool_footprint( MAX_PENDING_SLOTS ); + + laddr = fd_ulong_align_up( laddr, fd_pending_slots_align() ); + FD_TEST( laddr == (ulong)pending_slots + fd_pending_slots_footprint() ); return pending_slots_; } @@ -58,6 +69,9 @@ fd_pending_slots_leave( fd_pending_slots_t const * pending_slots ) { return NULL; } + FD_TEST( fd_pending_slots_treap_leave( pending_slots->treap ) == pending_slots->treap ); + FD_TEST( fd_pending_slots_pool_leave( pending_slots->pool ) == pending_slots->pool ); + return (void *)pending_slots; } @@ -73,6 +87,9 @@ fd_pending_slots_delete( void * pending_slots ) { return NULL; } + fd_pending_slots_t * _pending_slots = pending_slots; + fd_pending_slots_treap_delete( fd_pending_slots_treap_leave( _pending_slots->treap ) ); + fd_pending_slots_pool_delete( fd_pending_slots_pool_leave( _pending_slots->pool ) ); return pending_slots; } @@ -98,7 +115,13 @@ fd_pending_slots_unlock( fd_pending_slots_t * pending_slots ) { ulong fd_pending_slots_iter_init( fd_pending_slots_t * pending_slots ) { - return pending_slots->start; + fd_pending_slots_lock( pending_slots ); + fd_pending_slots_treap_fwd_iter_t first = fd_pending_slots_treap_fwd_iter_init( pending_slots->treap, pending_slots->pool ); + ulong slot = fd_pending_slots_treap_fwd_iter_done( first )? + ULONG_MAX : fd_pending_slots_treap_fwd_iter_ele( first, pending_slots->pool )->ele.slot; + fd_pending_slots_unlock( pending_slots ); + + return slot; } ulong @@ -106,73 +129,52 @@ fd_pending_slots_iter_next( fd_pending_slots_t * pending_slots, long now, ulong i ) { fd_pending_slots_lock( pending_slots ); - ulong end = pending_slots->end; - for( i = fd_ulong_max(i, pending_slots->start); 1; ++i ) { - if( i >= end ) { - /* End sentinel */ - i = ULONG_MAX; - break; - } - long * ele = &pending_slots->pending[ i & FD_PENDING_MASK ]; - if( i <= pending_slots->lo_wmark || *ele == 0 ) { - /* Empty or useless slot */ - if( pending_slots->start == i ) - pending_slots->start = i+1U; /* Pop it */ - } else if( *ele <= now ) { + + fd_pending_slots_treap_fwd_iter_t first, next; + first = fd_pending_slots_treap_fwd_iter_init( pending_slots->treap, pending_slots->pool ); + for( fd_pending_slots_treap_fwd_iter_t iter = first; !fd_pending_slots_treap_fwd_iter_done( iter ); iter = next ) { + fd_pending_slots_treap_ele_t * treap_ele = fd_pending_slots_treap_fwd_iter_ele( iter, pending_slots->pool ); + fd_pending_slots_ele_t * ele = &treap_ele->ele; + + if (ele->slot >= i && ele->time <= now) { /* Do this slot */ - long when = *ele; - *ele = 0; - if( pending_slots->start == i ) - pending_slots->start = i+1U; /* Pop it */ + ulong slot = ele->slot; FD_LOG_DEBUG(( "preparing slot %lu when=%ld now=%ld latency=%ld", - i, when, now, now - when )); - break; + slot, ele->time, now, now - ele->time )); + + /* Remove this slot from treap */ + fd_pending_slots_treap_ele_remove( pending_slots->treap, treap_ele, pending_slots->pool ); + fd_pending_slots_unlock( pending_slots ); + return slot; } + + next = fd_pending_slots_treap_fwd_iter_next( iter, pending_slots->pool ); } + + /* No good slot found, return sentinel */ fd_pending_slots_unlock( pending_slots ); - return i; + return ULONG_MAX; } void fd_pending_slots_add( fd_pending_slots_t * pending_slots, ulong slot, - long when ) { + long when) { fd_pending_slots_lock( pending_slots ); - long * pending = pending_slots->pending; - if( pending_slots->start == pending_slots->end ) { - /* Queue is empty */ - pending_slots->start = slot; - pending_slots->end = slot+1U; - pending[slot & FD_PENDING_MASK] = when; - - } else if ( slot < pending_slots->start ) { - /* Grow down */ - if( (long)(pending_slots->end - slot) > (long)FD_PENDING_MAX ) - FD_LOG_ERR(( "pending queue overrun: start=%lu, end=%lu, new slot=%lu", pending_slots->start, pending_slots->end, slot )); - pending[slot & FD_PENDING_MASK] = when; - for( ulong i = slot+1; i < pending_slots->start; i++ ) { - /* Zero fill */ - pending[i & FD_PENDING_MASK] = 0; - } - pending_slots->start = slot; - - } else if ( slot >= pending_slots->end ) { - /* Grow up */ - if( (long)(slot - pending_slots->start) > (long)FD_PENDING_MAX ) - FD_LOG_ERR(( "pending queue overrun: start=%lu, end=%lu, new slot=%lu", pending_slots->start, pending_slots->end, slot )); - pending[slot & FD_PENDING_MASK] = when; - for( ulong i = pending_slots->end; i < slot; i++ ) { - /* Zero fill */ - pending[i & FD_PENDING_MASK] = 0; - } - pending_slots->end = slot+1U; + fd_pending_slots_ele_t query = { .slot = slot, .time = when }; + /* Given TREAP_CMP defined above, the query should only query the .slot field */ + fd_pending_slots_treap_ele_t *ele = fd_pending_slots_treap_ele_query( pending_slots->treap, query, pending_slots->pool ); + if ( ele ) { + if ( ele->ele.time > when ) ele->ele.time = when; } else { - /* Update in place */ - long * p = &pending[slot & FD_PENDING_MASK]; - if( 0 == *p || *p > when ) - *p = when; + fd_pending_slots_treap_ele_t * pool = pending_slots->pool; + ulong idx = fd_pending_slots_pool_idx_acquire( pool ); + pool[ idx ].ele.slot = slot; + pool[ idx ].ele.time = when; + FD_TEST( sizeof(ushort) == getrandom( &pool[idx].prio, sizeof(ushort), 0 )); + fd_pending_slots_treap_idx_insert( pending_slots->treap, idx, pending_slots->pool ); } fd_pending_slots_unlock( pending_slots ); @@ -180,6 +182,23 @@ fd_pending_slots_add( fd_pending_slots_t * pending_slots, void fd_pending_slots_set_lo_wmark( fd_pending_slots_t * pending_slots, - ulong slot ) { - pending_slots->lo_wmark = slot; + ulong lo_wmark ) { + fd_pending_slots_lock( pending_slots ); + + fd_pending_slots_treap_fwd_iter_t first, next; + first = fd_pending_slots_treap_fwd_iter_init( pending_slots->treap, pending_slots->pool ); + if ( !fd_pending_slots_treap_fwd_iter_done( first ) ) { + next = fd_pending_slots_treap_fwd_iter_next( first, pending_slots->pool ); + } + while ( !fd_pending_slots_treap_fwd_iter_done( first ) ) { + fd_pending_slots_treap_ele_t * ele = fd_pending_slots_treap_fwd_iter_ele( first, pending_slots->pool ); + if ( ele->ele.slot > lo_wmark ) break; + fd_pending_slots_treap_ele_remove( pending_slots->treap, ele, pending_slots->pool ); + first = next; + if ( !fd_pending_slots_treap_fwd_iter_done( first ) ) { + next = fd_pending_slots_treap_fwd_iter_next( first, pending_slots->pool ); + } + } + + fd_pending_slots_unlock( pending_slots ); } diff --git a/src/disco/store/fd_pending_slots.h b/src/disco/store/fd_pending_slots.h index 3a1e55692a..da6016fb7a 100644 --- a/src/disco/store/fd_pending_slots.h +++ b/src/disco/store/fd_pending_slots.h @@ -2,17 +2,48 @@ #define HEADER_fd_src_flamenco_runtime_fd_pending_slots_h #include "../../util/fd_util.h" +#include "../../util/bits/fd_bits.h" +#define MAX_PENDING_SLOTS USHORT_MAX -#define FD_PENDING_MAX ( 1U << 14U ) /* 16 kb */ -#define FD_PENDING_MASK ( FD_PENDING_MAX - 1U ) +struct fd_pending_slots_ele { + ulong slot; + long time; +}; +typedef struct fd_pending_slots_ele fd_pending_slots_ele_t; + +struct fd_pending_slots_treap_ele { + fd_pending_slots_ele_t ele; + + /* The treap fields */ + ushort parent; + ushort left; + ushort right; + ushort prio; + ushort next; + ushort prev; +}; +typedef struct fd_pending_slots_treap_ele fd_pending_slots_treap_ele_t; + +#define POOL_NAME fd_pending_slots_pool +#define POOL_T fd_pending_slots_treap_ele_t +#define POOL_IDX_T ushort +#include "../../util/tmpl/fd_pool.c" + +#define TREAP_NAME fd_pending_slots_treap +#define TREAP_IDX_T ushort +#define TREAP_QUERY_T fd_pending_slots_ele_t +#define TREAP_T fd_pending_slots_treap_ele_t +#define TREAP_CMP(q,e) ((int)((long)q.slot - (long)e->ele.slot)) +#define TREAP_LT(e0,e1) ((e0)->ele.slot < (e1)->ele.slot) +#define TREAP_OPTIMIZE_ITERATION 1 +#define TREAP_IMPL_STYLE 0 +#include "../../util/tmpl/fd_treap.c" struct fd_pending_slots { - ulong start; - ulong end; - ulong lo_wmark; - ulong lock; - long * pending; /* pending slots to try to prepare */ + ulong lock; + fd_pending_slots_treap_t * treap; + fd_pending_slots_treap_ele_t * pool; }; typedef struct fd_pending_slots fd_pending_slots_t; @@ -25,7 +56,15 @@ fd_pending_slots_align( void ) { FD_FN_CONST static inline ulong fd_pending_slots_footprint( void ) { - return sizeof( fd_pending_slots_t ) + (sizeof(long) * FD_PENDING_MAX); + return FD_LAYOUT_FINI( + FD_LAYOUT_APPEND( + FD_LAYOUT_APPEND( + FD_LAYOUT_APPEND( + FD_LAYOUT_INIT, + alignof( fd_pending_slots_t ), sizeof( fd_pending_slots_t ) ), + fd_pending_slots_treap_align(), fd_pending_slots_treap_footprint( MAX_PENDING_SLOTS ) ), + fd_pending_slots_pool_align(), fd_pending_slots_pool_footprint( MAX_PENDING_SLOTS ) ), + fd_pending_slots_align() ); } void *