Skip to content

Commit

Permalink
Add the ability to listen for discovered chains (#808)
Browse files Browse the repository at this point in the history
Expand chain exchange to accept a listener which is notified whenever a
new chain is discovered. This mechanism is intended to be integrated
into F3 host pubsub, whereupon receiving a partial message the host
looks up its chain. When known, the chain is returned immediately.
Otherwise, the host would buffer the partial message and await
notification of its discovering from chain exchange.

Part of #792
  • Loading branch information
masih authored Jan 6, 2025
1 parent 135bfe6 commit fbd674a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 9 deletions.
4 changes: 4 additions & 0 deletions chainexchange/chainexchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ type ChainExchange interface {
RemoveChainsByInstance(context.Context, uint64) error
}

type Listener interface {
NotifyChainDiscovered(ctx context.Context, key Key, instance uint64, chain gpbft.ECChain)
}

func (k Key) IsZero() bool { return len(k) == 0 }
11 changes: 11 additions & 0 deletions chainexchange/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type options struct {
maxInstanceLookahead uint64
maxDiscoveredChainsPerInstance int
maxWantedChainsPerInstance int
listener Listener
}

func newOptions(o ...Option) (*options, error) {
Expand Down Expand Up @@ -132,3 +133,13 @@ func WithMaxWantedChainsPerInstance(max int) Option {
return nil
}
}

func WithListener(listener Listener) Option {
return func(o *options) error {
if listener == nil {
return errors.New("listener cannot be nil")
}
o.listener = listener
return nil
}
}
44 changes: 36 additions & 8 deletions chainexchange/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (p *PubSubChainExchange) Key(chain gpbft.ECChain) Key {
return rootDigest[:]
}

func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uint64, key Key) (gpbft.ECChain, bool) {
func (p *PubSubChainExchange) GetChainByInstance(ctx context.Context, instance uint64, key Key) (gpbft.ECChain, bool) {

// We do not have to take instance as input, and instead we can just search
// through all the instance as they are not expected to be more than 10. The
Expand All @@ -121,26 +121,34 @@ func (p *PubSubChainExchange) GetChainByInstance(_ context.Context, instance uin
return nil, false
}

p.mu.Lock()
defer p.mu.Unlock()

cacheKey := string(key)

// Check wanted keys first.
p.mu.Lock()
wanted := p.getChainsWantedAt(instance)
p.mu.Unlock()
if portion, found := wanted.Get(cacheKey); found && !portion.IsPlaceholder() {
// Found and is not a placeholder.
return portion.chain, true
}

// Check if the chain for the key is discovered.
p.mu.Lock()
discovered := p.getChainsDiscoveredAt(instance)
if portion, found := discovered.Get(cacheKey); found {
// Add it to the wanted cache and remove it from the discovered cache.
wanted.Add(cacheKey, portion)
discovered.Remove(cacheKey)
p.mu.Unlock()

chain := portion.chain
if p.listener != nil {
p.listener.NotifyChainDiscovered(ctx, key, instance, chain)
}
// TODO: Do we want to pull all the suffixes of the chain into wanted cache?
return portion.chain, true
return chain, true
}
p.mu.Unlock()

// Otherwise, add a placeholder for the wanted key as a way to prioritise its
// retention via LRU recent-ness.
wanted.ContainsOrAdd(cacheKey, chainPortionPlaceHolder)
Expand Down Expand Up @@ -250,10 +258,15 @@ func (p *PubSubChainExchange) Broadcast(ctx context.Context, msg Message) error
return nil
}

type discovery struct {
key Key
instance uint64
chain gpbft.ECChain
}

func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Message) {
var notifications []discovery
p.mu.Lock()
defer p.mu.Unlock()

wanted := p.getChainsWantedAt(cmsg.Instance)
for offset := len(cmsg.Chain); offset >= 0 && ctx.Err() == nil; offset-- {
// TODO: Expose internals of merkle.go so that keys can be generated
Expand All @@ -265,11 +278,26 @@ func (p *PubSubChainExchange) cacheAsWantedChain(ctx context.Context, cmsg Messa
wanted.Add(cacheKey, &chainPortion{
chain: prefix,
})
if p.listener != nil {
notifications = append(notifications, discovery{
key: key,
instance: cmsg.Instance,
chain: prefix,
})
}
}
// Continue with the remaining prefix keys as we do not know if any of them have
// been evicted from the cache or not. This should be cheap enough considering the
// added complexity of tracking evictions relative to chain prefixes.
}
p.mu.Unlock()

// Notify the listener outside the lock.
if p.listener != nil {
for _, notification := range notifications {
p.listener.NotifyChainDiscovered(ctx, notification.key, notification.instance, notification.chain)
}
}
}

func (p *PubSubChainExchange) RemoveChainsByInstance(_ context.Context, instance uint64) error {
Expand Down
28 changes: 27 additions & 1 deletion chainexchange/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
const topicName = "fish"
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
var testInstant gpbft.Instant
var testListener listener
host, err := libp2p.New()
require.NoError(t, err)
t.Cleanup(func() {
Expand All @@ -33,6 +34,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
chainexchange.WithPubSub(ps),
chainexchange.WithTopicName(topicName),
chainexchange.WithTopicScoreParams(nil),
chainexchange.WithListener(&testListener),
)
require.NoError(t, err)
require.NotNil(t, subject)
Expand All @@ -50,6 +52,7 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
chain, found := subject.GetChainByInstance(ctx, instance, key)
require.False(t, found)
require.Nil(t, chain)
require.Empty(t, testListener.notifications)

require.NoError(t, subject.Broadcast(ctx, chainexchange.Message{
Instance: instance,
Expand All @@ -66,11 +69,34 @@ func TestPubSubChainExchange_Broadcast(t *testing.T) {
require.True(t, found)
require.Equal(t, baseChain, chain)

// Assert that we have received 2 notifications, because ecChain has 2 tipsets.
// First should be the ecChain, second should be the baseChain.
require.Len(t, testListener.notifications, 2)
require.Equal(t, instance, testListener.notifications[1].instance)
require.Equal(t, baseKey, testListener.notifications[1].key)
require.Equal(t, baseChain, testListener.notifications[1].chain)
require.Equal(t, instance, testListener.notifications[0].instance)
require.Equal(t, key, testListener.notifications[0].key)
require.Equal(t, ecChain, testListener.notifications[0].chain)

require.NoError(t, subject.Shutdown(ctx))
}

type notification struct {
key chainexchange.Key
instance uint64
chain gpbft.ECChain
}
type listener struct {
notifications []notification
}

func (l *listener) NotifyChainDiscovered(_ context.Context, key chainexchange.Key, instance uint64, chain gpbft.ECChain) {
l.notifications = append(l.notifications, notification{key: key, instance: instance, chain: chain})
}

// TODO: Add more tests, specifically:
// - valodation
// - validation
// - discovery through other chainexchange instance
// - cache eviction/fixed memory footprint.
// - fulfilment of chain from discovery to wanted in any order.
Expand Down

0 comments on commit fbd674a

Please sign in to comment.