Skip to content

Commit

Permalink
Sending to many peers can be tolerant of failures (#160)
Browse files Browse the repository at this point in the history
* Tolerate send errors on roll call (unless consesus is required)

* Add test to sendoToMany
  • Loading branch information
Maelkum authored Aug 30, 2024
1 parent cec3542 commit b1a1ab4
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 93 deletions.
36 changes: 12 additions & 24 deletions consensus/pbft/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/hashicorp/go-multierror"

Expand Down Expand Up @@ -45,48 +44,37 @@ func (r *Replica) broadcast(msg interface{}) error {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.NetworkTimeout)
defer cancel()

var (
wg sync.WaitGroup
multierr *multierror.Error
lock sync.Mutex
)

var errGroup multierror.Group
for _, target := range r.peers {
target := target

// Skip self.
if target == r.id {
continue
}

wg.Add(1)

// Send concurrently to everyone.
go func(peer peer.ID) {
defer wg.Done()
errGroup.Go(func() error {

// NOTE: We could potentially retry sending if we fail once. On the other hand, somewhat unlikely they're
// back online split second later.

err := r.host.SendMessageOnProtocol(ctx, peer, payload, r.protocolID)
err := r.host.SendMessageOnProtocol(ctx, target, payload, r.protocolID)
if err != nil {

lock.Lock()
defer lock.Unlock()

multierr = multierror.Append(multierr, err)
return fmt.Errorf("peer send error (peer: %v): %w", target.String(), err)
}
}(target)
}

wg.Wait()
return nil
})
}

// If all went well, just return.
sendErr := multierr.ErrorOrNil()
if sendErr == nil {
sendErr := errGroup.Wait()
if sendErr.ErrorOrNil() == nil {
return nil
}

// Warn if we had more send errors than we bargained for.
errCount := uint(multierr.Len())
errCount := uint(sendErr.Len())
if errCount > r.f {
r.log.Warn().Uint("f", r.f).Uint("errors", errCount).Msg("broadcast error count higher than pBFT f value")
}
Expand Down
4 changes: 2 additions & 2 deletions node/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee
}

// Request execution from peers.
err := n.sendToMany(ctx, replicas, reqCluster)
err := n.sendToMany(ctx, replicas, reqCluster, true)
if err != nil {
return fmt.Errorf("could not send cluster formation request to peers: %w", err)
}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (n *Node) disbandCluster(requestID string, replicas []peer.ID) error {
ctx, cancel := context.WithTimeout(context.Background(), consensusClusterSendTimeout)
defer cancel()

err := n.sendToMany(ctx, replicas, msgDisband)
err := n.sendToMany(ctx, replicas, msgDisband, true)
if err != nil {
return fmt.Errorf("could not send cluster disband request (request: %s): %w", requestID, err)
}
Expand Down
6 changes: 5 additions & 1 deletion node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re
}
}

err = n.sendToMany(ctx, reportingPeers, reqExecute)
err = n.sendToMany(ctx,
reportingPeers,
reqExecute,
consensusRequired(consensusAlgo), // If we're using consensus, try to reach all peers.
)
if err != nil {
return codes.Error, nil, cluster, fmt.Errorf("could not send execution request to peers (function: %s, request: %s): %w", req.FunctionID, requestID, err)
}
Expand Down
44 changes: 36 additions & 8 deletions node/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"

"github.com/hashicorp/go-multierror"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"

Expand Down Expand Up @@ -63,24 +64,51 @@ func (n *Node) send(ctx context.Context, to peer.ID, msg blockless.Message) erro
return nil
}

// sendToMany serializes the message and sends it to a number of peers. It aborts on any error.
func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message) error {
// sendToMany serializes the message and sends it to a number of peers. `requireAll` dictates how we treat partial errors.
func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message, requireAll bool) error {

// Serialize the message.
payload, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("could not encode record: %w", err)
}

var errGroup multierror.Group
for i, peer := range peers {
// Send message.
err = n.host.SendMessage(ctx, peer, payload)
if err != nil {
return fmt.Errorf("could not send message to peer (id: %v, peer %d out of %d): %w", peer, i, len(peers), err)
}
i := i
peer := peer

errGroup.Go(func() error {
err := n.host.SendMessage(ctx, peer, payload)
if err != nil {
return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i+1, len(peers), peer.String(), err)
}

return nil
})
}

return nil
retErr := errGroup.Wait()
if retErr == nil || len(retErr.Errors) == 0 {
// If everything succeeded => ok.
return nil
}

switch len(retErr.Errors) {
case len(peers):
// If everything failed => error.
return fmt.Errorf("all sends failed: %w", retErr)

default:
// Some sends failed - do as requested by `requireAll`.
if requireAll {
return fmt.Errorf("some sends failed: %w", retErr)
}

n.log.Warn().Err(retErr).Msg("some sends failed, proceeding")

return nil
}
}

func (n *Node) publish(ctx context.Context, msg blockless.Message) error {
Expand Down
159 changes: 101 additions & 58 deletions node/message_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,102 +3,137 @@ package node
import (
"context"
"encoding/json"
"math/rand"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/blocklessnetwork/b7s/host"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/testing/mocks"
)

func TestNode_Messaging(t *testing.T) {
func TestNode_SendMessage(t *testing.T) {

const (
topic = DefaultTopic
)
client, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client)

rec := newDummyRecord()

var wg sync.WaitGroup
wg.Add(1)

client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) {
defer wg.Done()
defer stream.Close()

from := stream.Conn().RemotePeer()
require.Equal(t, node.host.ID(), from)

var received dummyRecord
getStreamPayload(t, stream, &received)

require.Equal(t, rec, received)
})

err = node.send(context.Background(), client.ID(), rec)
require.NoError(t, err)

wg.Wait()
}

func TestNode_Publish(t *testing.T) {

var (
rec = dummyRecord{
ID: mocks.GenericUUID.String(),
Value: 19846,
Description: "dummy-description",
}
rec = newDummyRecord()
ctx = context.Background()
topic = DefaultTopic
)

client, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

err = client.InitPubSub(ctx)
require.NoError(t, err)

node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client)

t.Run("sending single message", func(t *testing.T) {
t.Parallel()

var wg sync.WaitGroup
wg.Add(1)

client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) {
defer wg.Done()
defer stream.Close()
err = node.subscribeToTopics(ctx)
require.NoError(t, err)

from := stream.Conn().RemotePeer()
require.Equal(t, node.host.ID(), from)
// Establish a connection between peers.
clientInfo := hostGetAddrInfo(t, client)
err = node.host.Connect(ctx, *clientInfo)
require.NoError(t, err)

var received dummyRecord
getStreamPayload(t, stream, &received)
// Have both client and node subscribe to the same topic.
_, subscription, err := client.Subscribe(topic)
require.NoError(t, err)

require.Equal(t, rec, received)
})
time.Sleep(subscriptionDiseminationPause)

err := node.send(context.Background(), client.ID(), rec)
require.NoError(t, err)
err = node.publish(ctx, rec)
require.NoError(t, err)

wg.Wait()
})
t.Run("publishing to a topic", func(t *testing.T) {
t.Parallel()
deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout)
defer cancel()
msg, err := subscription.Next(deadlineCtx)
require.NoError(t, err)

ctx := context.Background()
from := msg.ReceivedFrom
require.Equal(t, node.host.ID(), from)
require.NotNil(t, msg.Topic)
require.Equal(t, topic, *msg.Topic)

err = client.InitPubSub(ctx)
require.NoError(t, err)
var received dummyRecord
err = json.Unmarshal(msg.Data, &received)
require.NoError(t, err)
require.Equal(t, rec, received)
}

// Establish a connection between peers.
clientInfo := hostGetAddrInfo(t, client)
err = node.host.Connect(ctx, *clientInfo)
require.NoError(t, err)
func TestNode_SendMessageToMany(t *testing.T) {

// Have both client and node subscribe to the same topic.
_, subscription, err := client.Subscribe(topic)
require.NoError(t, err)
client1, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

err = node.subscribeToTopics(ctx)
require.NoError(t, err)
client2, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

time.Sleep(subscriptionDiseminationPause)
node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client1)
hostAddNewPeer(t, node.host, client2)

err = node.publish(ctx, rec)
require.NoError(t, err)
client1.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {})
client2.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {})

deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout)
defer cancel()
msg, err := subscription.Next(deadlineCtx)
// NOTE: These subtests are sequential.
t.Run("nominal case - sending to two online peers is ok", func(t *testing.T) {
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true)
require.NoError(t, err)

from := msg.ReceivedFrom
require.Equal(t, node.host.ID(), from)
require.NotNil(t, msg.Topic)
require.Equal(t, topic, *msg.Topic)

var received dummyRecord
err = json.Unmarshal(msg.Data, &received)
})
t.Run("peer is down with requireAll is an error", func(t *testing.T) {
client1.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true)
require.Error(t, err)
})
t.Run("peer is down with partial delivery is ok", func(t *testing.T) {
client1.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false)
require.NoError(t, err)

require.Equal(t, rec, received)
})
t.Run("all sends failing produces an error", func(t *testing.T) {
client1.Close()
client2.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false)
require.Error(t, err)
})
}

Expand All @@ -111,3 +146,11 @@ type dummyRecord struct {
func (dummyRecord) Type() string {
return "MessageDummyRecord"
}

func newDummyRecord() dummyRecord {
return dummyRecord{
ID: mocks.GenericUUID.String(),
Value: rand.Uint64(),
Description: "dummy-description",
}
}

0 comments on commit b1a1ab4

Please sign in to comment.