Skip to content

Commit

Permalink
Use treap for pending_slots in disco/store
Browse files Browse the repository at this point in the history
  • Loading branch information
yhzhangjump committed Aug 1, 2024
1 parent c3cd3cd commit 85127f3
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 90 deletions.
16 changes: 8 additions & 8 deletions src/app/fdctl/run/tiles/fd_store_int.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ after_frag( void * _ctx,
for( ulong i = 0; i < ctx->s34_buffer->shred_cnt; i++ ) {
fd_shred_t * shred = &ctx->s34_buffer->pkts[i].shred;

if( FD_UNLIKELY( (long)(ctx->store->pending_slots->end - shred->slot) > (long)FD_PENDING_MAX ) ) {
FD_LOG_WARNING(("received shred %lu that would overrun pending queue. skipping.", shred->slot));
continue;
}
/* if( FD_UNLIKELY( (long)(ctx->store->pending_slots->end - shred->slot) > (long)FD_PENDING_MAX ) ) { */
/* FD_LOG_WARNING(("received shred %lu that would overrun pending queue. skipping.", shred->slot)); */
/* continue; */
/* } */

if( FD_UNLIKELY( (long)(ctx->store->curr_turbine_slot - shred->slot) > 100 ) ) {
FD_LOG_WARNING(("received shred with slot %lu that would overrun pending queue. skipping.", shred->slot));
Expand Down Expand Up @@ -433,10 +433,10 @@ after_credit( void * _ctx,

ctx->store->now = fd_log_wallclock();

if( FD_UNLIKELY( ctx->sim &&
ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) {
FD_LOG_ERR( ( "Sim is complete." ) );
}
/* if( FD_UNLIKELY( ctx->sim && */
/* ctx->store->pending_slots->start == ctx->store->pending_slots->end ) ) { */
/* FD_LOG_ERR( ( "Sim is complete." ) ); */
/* } */

for( ulong i = fd_pending_slots_iter_init( ctx->store->pending_slots );
(i = fd_pending_slots_iter_next( ctx->store->pending_slots, ctx->store->now, i )) != ULONG_MAX; ) {
Expand Down
167 changes: 93 additions & 74 deletions src/disco/store/fd_pending_slots.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "fd_pending_slots.h"
#include <limits.h>
#include <sys/random.h>

void *
fd_pending_slots_new( void * mem, ulong lo_wmark ) {

fd_pending_slots_new( void * mem,
ulong lo_wmark FD_PARAM_UNUSED) {
if( FD_UNLIKELY( !mem ) ) {
FD_LOG_WARNING( ( "NULL mem" ) );
return NULL;
Expand All @@ -13,27 +15,23 @@ fd_pending_slots_new( void * mem, ulong lo_wmark ) {
return NULL;
}

ulong footprint = fd_pending_slots_footprint();

fd_memset( mem, 0, footprint );
ulong laddr = (ulong)mem;
fd_pending_slots_t * pending_slots = (void *)laddr;
pending_slots->lo_wmark = lo_wmark;
pending_slots->start = 0;
pending_slots->end = 0;
pending_slots->lock = 0;
fd_pending_slots_t * pending_slots = mem;
fd_memset( mem, 0, fd_pending_slots_footprint() );
ulong laddr = (ulong)(mem) + sizeof( fd_pending_slots_t );

laddr += sizeof( fd_pending_slots_t );
pending_slots->pending = (void *)laddr;
laddr = fd_ulong_align_up( laddr, fd_pending_slots_treap_align() );
pending_slots->treap = fd_pending_slots_treap_join( fd_pending_slots_treap_new( (void *)(laddr), MAX_PENDING_SLOTS ) );
laddr += fd_pending_slots_treap_footprint( MAX_PENDING_SLOTS );

laddr += sizeof(long) * FD_PENDING_MAX;

FD_TEST( laddr == (ulong)mem + footprint );
laddr = fd_ulong_align_up( laddr, fd_pending_slots_pool_align() );
pending_slots->pool = fd_pending_slots_pool_join( fd_pending_slots_pool_new( (void*)(laddr), MAX_PENDING_SLOTS ) );
laddr += fd_pending_slots_pool_footprint( MAX_PENDING_SLOTS );

laddr = fd_ulong_align_up( laddr, fd_pending_slots_align() );
FD_TEST( laddr == (ulong)mem + fd_pending_slots_footprint() );
return mem;
}

/* TODO only safe for local joins */
fd_pending_slots_t *
fd_pending_slots_join( void * pending_slots ) {
if( FD_UNLIKELY( !pending_slots ) ) {
Expand All @@ -46,7 +44,20 @@ fd_pending_slots_join( void * pending_slots ) {
return NULL;
}

fd_pending_slots_t * pending_slots_ = (fd_pending_slots_t *)pending_slots;
ulong laddr = (ulong)pending_slots;
fd_pending_slots_t * pending_slots_ = pending_slots;
laddr += sizeof( fd_pending_slots_t );

laddr = fd_ulong_align_up( laddr, fd_pending_slots_treap_align() );
pending_slots_->treap = fd_pending_slots_treap_join( (void *)laddr );
laddr += fd_pending_slots_treap_footprint( MAX_PENDING_SLOTS );

laddr = fd_ulong_align_up( laddr, fd_pending_slots_pool_align() );
pending_slots_->pool = fd_pending_slots_pool_join( (void *)laddr );
laddr += fd_pending_slots_pool_footprint( MAX_PENDING_SLOTS );

laddr = fd_ulong_align_up( laddr, fd_pending_slots_align() );
FD_TEST( laddr == (ulong)pending_slots + fd_pending_slots_footprint() );

return pending_slots_;
}
Expand All @@ -58,6 +69,9 @@ fd_pending_slots_leave( fd_pending_slots_t const * pending_slots ) {
return NULL;
}

FD_TEST( fd_pending_slots_treap_leave( pending_slots->treap ) == pending_slots->treap );
FD_TEST( fd_pending_slots_pool_leave( pending_slots->pool ) == pending_slots->pool );

return (void *)pending_slots;
}

Expand All @@ -73,6 +87,9 @@ fd_pending_slots_delete( void * pending_slots ) {
return NULL;
}

fd_pending_slots_t * _pending_slots = pending_slots;
fd_pending_slots_treap_delete( fd_pending_slots_treap_leave( _pending_slots->treap ) );
fd_pending_slots_pool_delete( fd_pending_slots_pool_leave( _pending_slots->pool ) );
return pending_slots;
}

Expand All @@ -98,88 +115,90 @@ fd_pending_slots_unlock( fd_pending_slots_t * pending_slots ) {

ulong
fd_pending_slots_iter_init( fd_pending_slots_t * pending_slots ) {
return pending_slots->start;
fd_pending_slots_lock( pending_slots );
fd_pending_slots_treap_fwd_iter_t first = fd_pending_slots_treap_fwd_iter_init( pending_slots->treap, pending_slots->pool );
ulong slot = fd_pending_slots_treap_fwd_iter_done( first )?
ULONG_MAX : fd_pending_slots_treap_fwd_iter_ele( first, pending_slots->pool )->ele.slot;
fd_pending_slots_unlock( pending_slots );

return slot;
}

ulong
fd_pending_slots_iter_next( fd_pending_slots_t * pending_slots,
long now,
ulong i ) {
fd_pending_slots_lock( pending_slots );
ulong end = pending_slots->end;
for( i = fd_ulong_max(i, pending_slots->start); 1; ++i ) {
if( i >= end ) {
/* End sentinel */
i = ULONG_MAX;
break;
}
long * ele = &pending_slots->pending[ i & FD_PENDING_MASK ];
if( i <= pending_slots->lo_wmark || *ele == 0 ) {
/* Empty or useless slot */
if( pending_slots->start == i )
pending_slots->start = i+1U; /* Pop it */
} else if( *ele <= now ) {

fd_pending_slots_treap_fwd_iter_t first, next;
first = fd_pending_slots_treap_fwd_iter_init( pending_slots->treap, pending_slots->pool );
for( fd_pending_slots_treap_fwd_iter_t iter = first; !fd_pending_slots_treap_fwd_iter_done( iter ); iter = next ) {
fd_pending_slots_treap_ele_t * treap_ele = fd_pending_slots_treap_fwd_iter_ele( iter, pending_slots->pool );
fd_pending_slots_ele_t * ele = &treap_ele->ele;

if (ele->slot >= i && ele->time <= now) {
/* Do this slot */
long when = *ele;
*ele = 0;
if( pending_slots->start == i )
pending_slots->start = i+1U; /* Pop it */
ulong slot = ele->slot;
FD_LOG_DEBUG(( "preparing slot %lu when=%ld now=%ld latency=%ld",
i, when, now, now - when ));
break;
slot, ele->time, now, now - ele->time ));

/* Remove this slot from treap */
fd_pending_slots_treap_ele_remove( pending_slots->treap, treap_ele, pending_slots->pool );
fd_pending_slots_unlock( pending_slots );
return slot;
}

next = fd_pending_slots_treap_fwd_iter_next( iter, pending_slots->pool );
}

/* No good slot found, return sentinel */
fd_pending_slots_unlock( pending_slots );
return i;
return ULONG_MAX;
}

void
fd_pending_slots_add( fd_pending_slots_t * pending_slots,
ulong slot,
long when ) {
long when) {
fd_pending_slots_lock( pending_slots );

long * pending = pending_slots->pending;
if( pending_slots->start == pending_slots->end ) {
/* Queue is empty */
pending_slots->start = slot;
pending_slots->end = slot+1U;
pending[slot & FD_PENDING_MASK] = when;

} else if ( slot < pending_slots->start ) {
/* Grow down */
if( (long)(pending_slots->end - slot) > (long)FD_PENDING_MAX )
FD_LOG_ERR(( "pending queue overrun: start=%lu, end=%lu, new slot=%lu", pending_slots->start, pending_slots->end, slot ));
pending[slot & FD_PENDING_MASK] = when;
for( ulong i = slot+1; i < pending_slots->start; i++ ) {
/* Zero fill */
pending[i & FD_PENDING_MASK] = 0;
}
pending_slots->start = slot;

} else if ( slot >= pending_slots->end ) {
/* Grow up */
if( (long)(slot - pending_slots->start) > (long)FD_PENDING_MAX )
FD_LOG_ERR(( "pending queue overrun: start=%lu, end=%lu, new slot=%lu", pending_slots->start, pending_slots->end, slot ));
pending[slot & FD_PENDING_MASK] = when;
for( ulong i = pending_slots->end; i < slot; i++ ) {
/* Zero fill */
pending[i & FD_PENDING_MASK] = 0;
}
pending_slots->end = slot+1U;
fd_pending_slots_ele_t query = { .slot = slot, .time = when };
/* Given TREAP_CMP defined above, the query should only query the .slot field */
fd_pending_slots_treap_ele_t *ele = fd_pending_slots_treap_ele_query( pending_slots->treap, query, pending_slots->pool );

if ( ele ) {
if ( ele->ele.time > when ) ele->ele.time = when;
} else {
/* Update in place */
long * p = &pending[slot & FD_PENDING_MASK];
if( 0 == *p || *p > when )
*p = when;
fd_pending_slots_treap_ele_t * pool = pending_slots->pool;
ulong idx = fd_pending_slots_pool_idx_acquire( pool );
pool[ idx ].ele.slot = slot;
pool[ idx ].ele.time = when;
FD_TEST( sizeof(ushort) == getrandom( &pool[idx].prio, sizeof(ushort), 0 ));
fd_pending_slots_treap_idx_insert( pending_slots->treap, idx, pending_slots->pool );
}

fd_pending_slots_unlock( pending_slots );
}

void
fd_pending_slots_set_lo_wmark( fd_pending_slots_t * pending_slots,
ulong slot ) {
pending_slots->lo_wmark = slot;
ulong lo_wmark ) {
fd_pending_slots_lock( pending_slots );

fd_pending_slots_treap_fwd_iter_t first, next;
first = fd_pending_slots_treap_fwd_iter_init( pending_slots->treap, pending_slots->pool );
if ( !fd_pending_slots_treap_fwd_iter_done( first ) ) {
next = fd_pending_slots_treap_fwd_iter_next( first, pending_slots->pool );
}
while ( !fd_pending_slots_treap_fwd_iter_done( first ) ) {
fd_pending_slots_treap_ele_t * ele = fd_pending_slots_treap_fwd_iter_ele( first, pending_slots->pool );
if ( ele->ele.slot > lo_wmark ) break;
fd_pending_slots_treap_ele_remove( pending_slots->treap, ele, pending_slots->pool );
first = next;
if ( !fd_pending_slots_treap_fwd_iter_done( first ) ) {
next = fd_pending_slots_treap_fwd_iter_next( first, pending_slots->pool );
}
}

fd_pending_slots_unlock( pending_slots );
}
55 changes: 47 additions & 8 deletions src/disco/store/fd_pending_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,48 @@
#define HEADER_fd_src_flamenco_runtime_fd_pending_slots_h

#include "../../util/fd_util.h"
#include "../../util/bits/fd_bits.h"

#define MAX_PENDING_SLOTS USHORT_MAX

#define FD_PENDING_MAX ( 1U << 14U ) /* 16 kb */
#define FD_PENDING_MASK ( FD_PENDING_MAX - 1U )
struct fd_pending_slots_ele {
ulong slot;
long time;
};
typedef struct fd_pending_slots_ele fd_pending_slots_ele_t;

struct fd_pending_slots_treap_ele {
fd_pending_slots_ele_t ele;

/* The treap fields */
ushort parent;
ushort left;
ushort right;
ushort prio;
ushort next;
ushort prev;
};
typedef struct fd_pending_slots_treap_ele fd_pending_slots_treap_ele_t;

#define POOL_NAME fd_pending_slots_pool
#define POOL_T fd_pending_slots_treap_ele_t
#define POOL_IDX_T ushort
#include "../../util/tmpl/fd_pool.c"

#define TREAP_NAME fd_pending_slots_treap
#define TREAP_IDX_T ushort
#define TREAP_QUERY_T fd_pending_slots_ele_t
#define TREAP_T fd_pending_slots_treap_ele_t
#define TREAP_CMP(q,e) ((int)((long)q.slot - (long)e->ele.slot))
#define TREAP_LT(e0,e1) ((e0)->ele.slot < (e1)->ele.slot)
#define TREAP_OPTIMIZE_ITERATION 1
#define TREAP_IMPL_STYLE 0
#include "../../util/tmpl/fd_treap.c"

struct fd_pending_slots {
ulong start;
ulong end;
ulong lo_wmark;
ulong lock;
long * pending; /* pending slots to try to prepare */
ulong lock;
fd_pending_slots_treap_t * treap;
fd_pending_slots_treap_ele_t * pool;
};
typedef struct fd_pending_slots fd_pending_slots_t;

Expand All @@ -25,7 +56,15 @@ fd_pending_slots_align( void ) {

FD_FN_CONST static inline ulong
fd_pending_slots_footprint( void ) {
return sizeof( fd_pending_slots_t ) + (sizeof(long) * FD_PENDING_MAX);
return FD_LAYOUT_FINI(
FD_LAYOUT_APPEND(
FD_LAYOUT_APPEND(
FD_LAYOUT_APPEND(
FD_LAYOUT_INIT,
alignof( fd_pending_slots_t ), sizeof( fd_pending_slots_t ) ),
fd_pending_slots_treap_align(), fd_pending_slots_treap_footprint( MAX_PENDING_SLOTS ) ),
fd_pending_slots_pool_align(), fd_pending_slots_pool_footprint( MAX_PENDING_SLOTS ) ),
fd_pending_slots_align() );
}

void *
Expand Down

0 comments on commit 85127f3

Please sign in to comment.