Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Sending to many peers can be tolerant of failures #160

Merged
merged 2 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 12 additions & 24 deletions consensus/pbft/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/hashicorp/go-multierror"

Expand Down Expand Up @@ -45,48 +44,37 @@ func (r *Replica) broadcast(msg interface{}) error {
ctx, cancel := context.WithTimeout(context.Background(), r.cfg.NetworkTimeout)
defer cancel()

var (
wg sync.WaitGroup
multierr *multierror.Error
lock sync.Mutex
)

var errGroup multierror.Group
for _, target := range r.peers {
target := target

// Skip self.
if target == r.id {
continue
}

wg.Add(1)

// Send concurrently to everyone.
go func(peer peer.ID) {
defer wg.Done()
errGroup.Go(func() error {

// NOTE: We could potentially retry sending if we fail once. On the other hand, somewhat unlikely they're
// back online split second later.

err := r.host.SendMessageOnProtocol(ctx, peer, payload, r.protocolID)
err := r.host.SendMessageOnProtocol(ctx, target, payload, r.protocolID)
if err != nil {

lock.Lock()
defer lock.Unlock()

multierr = multierror.Append(multierr, err)
return fmt.Errorf("peer send error (peer: %v): %w", target.String(), err)
}
}(target)
}

wg.Wait()
return nil
})
}

// If all went well, just return.
sendErr := multierr.ErrorOrNil()
if sendErr == nil {
sendErr := errGroup.Wait()
if sendErr.ErrorOrNil() == nil {
return nil
}

// Warn if we had more send errors than we bargained for.
errCount := uint(multierr.Len())
errCount := uint(sendErr.Len())
if errCount > r.f {
r.log.Warn().Uint("f", r.f).Uint("errors", errCount).Msg("broadcast error count higher than pBFT f value")
}
Expand Down
4 changes: 2 additions & 2 deletions node/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (n *Node) formCluster(ctx context.Context, requestID string, replicas []pee
}

// Request execution from peers.
err := n.sendToMany(ctx, replicas, reqCluster)
err := n.sendToMany(ctx, replicas, reqCluster, true)
if err != nil {
return fmt.Errorf("could not send cluster formation request to peers: %w", err)
}
Expand Down Expand Up @@ -142,7 +142,7 @@ func (n *Node) disbandCluster(requestID string, replicas []peer.ID) error {
ctx, cancel := context.WithTimeout(context.Background(), consensusClusterSendTimeout)
defer cancel()

err := n.sendToMany(ctx, replicas, msgDisband)
err := n.sendToMany(ctx, replicas, msgDisband, true)
if err != nil {
return fmt.Errorf("could not send cluster disband request (request: %s): %w", requestID, err)
}
Expand Down
6 changes: 5 additions & 1 deletion node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re
}
}

err = n.sendToMany(ctx, reportingPeers, reqExecute)
err = n.sendToMany(ctx,
reportingPeers,
reqExecute,
consensusRequired(consensusAlgo), // If we're using consensus, try to reach all peers.
)
if err != nil {
return codes.Error, nil, cluster, fmt.Errorf("could not send execution request to peers (function: %s, request: %s): %w", req.FunctionID, requestID, err)
}
Expand Down
44 changes: 36 additions & 8 deletions node/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"

"github.com/hashicorp/go-multierror"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"

Expand Down Expand Up @@ -63,24 +64,51 @@ func (n *Node) send(ctx context.Context, to peer.ID, msg blockless.Message) erro
return nil
}

// sendToMany serializes the message and sends it to a number of peers. It aborts on any error.
func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message) error {
// sendToMany serializes the message and sends it to a number of peers. `requireAll` dictates how we treat partial errors.
func (n *Node) sendToMany(ctx context.Context, peers []peer.ID, msg blockless.Message, requireAll bool) error {

// Serialize the message.
payload, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("could not encode record: %w", err)
}

var errGroup multierror.Group
for i, peer := range peers {
// Send message.
err = n.host.SendMessage(ctx, peer, payload)
if err != nil {
return fmt.Errorf("could not send message to peer (id: %v, peer %d out of %d): %w", peer, i, len(peers), err)
}
i := i
peer := peer

errGroup.Go(func() error {
err := n.host.SendMessage(ctx, peer, payload)
if err != nil {
return fmt.Errorf("peer %v/%v send error (peer: %v): %w", i+1, len(peers), peer.String(), err)
}

return nil
})
}

return nil
retErr := errGroup.Wait()
if retErr == nil || len(retErr.Errors) == 0 {
// If everything succeeded => ok.
return nil
}

switch len(retErr.Errors) {
case len(peers):
// If everything failed => error.
return fmt.Errorf("all sends failed: %w", retErr)

default:
// Some sends failed - do as requested by `requireAll`.
if requireAll {
return fmt.Errorf("some sends failed: %w", retErr)
}

n.log.Warn().Err(retErr).Msg("some sends failed, proceeding")

return nil
}
}

func (n *Node) publish(ctx context.Context, msg blockless.Message) error {
Expand Down
159 changes: 101 additions & 58 deletions node/message_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,102 +3,137 @@ package node
import (
"context"
"encoding/json"
"math/rand"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/blocklessnetwork/b7s/host"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/testing/mocks"
)

func TestNode_Messaging(t *testing.T) {
func TestNode_SendMessage(t *testing.T) {

const (
topic = DefaultTopic
)
client, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client)

rec := newDummyRecord()

var wg sync.WaitGroup
wg.Add(1)

client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) {
defer wg.Done()
defer stream.Close()

from := stream.Conn().RemotePeer()
require.Equal(t, node.host.ID(), from)

var received dummyRecord
getStreamPayload(t, stream, &received)

require.Equal(t, rec, received)
})

err = node.send(context.Background(), client.ID(), rec)
require.NoError(t, err)

wg.Wait()
}

func TestNode_Publish(t *testing.T) {

var (
rec = dummyRecord{
ID: mocks.GenericUUID.String(),
Value: 19846,
Description: "dummy-description",
}
rec = newDummyRecord()
ctx = context.Background()
topic = DefaultTopic
)

client, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

err = client.InitPubSub(ctx)
require.NoError(t, err)

node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client)

t.Run("sending single message", func(t *testing.T) {
t.Parallel()

var wg sync.WaitGroup
wg.Add(1)

client.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) {
defer wg.Done()
defer stream.Close()
err = node.subscribeToTopics(ctx)
require.NoError(t, err)

from := stream.Conn().RemotePeer()
require.Equal(t, node.host.ID(), from)
// Establish a connection between peers.
clientInfo := hostGetAddrInfo(t, client)
err = node.host.Connect(ctx, *clientInfo)
require.NoError(t, err)

var received dummyRecord
getStreamPayload(t, stream, &received)
// Have both client and node subscribe to the same topic.
_, subscription, err := client.Subscribe(topic)
require.NoError(t, err)

require.Equal(t, rec, received)
})
time.Sleep(subscriptionDiseminationPause)

err := node.send(context.Background(), client.ID(), rec)
require.NoError(t, err)
err = node.publish(ctx, rec)
require.NoError(t, err)

wg.Wait()
})
t.Run("publishing to a topic", func(t *testing.T) {
t.Parallel()
deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout)
defer cancel()
msg, err := subscription.Next(deadlineCtx)
require.NoError(t, err)

ctx := context.Background()
from := msg.ReceivedFrom
require.Equal(t, node.host.ID(), from)
require.NotNil(t, msg.Topic)
require.Equal(t, topic, *msg.Topic)

err = client.InitPubSub(ctx)
require.NoError(t, err)
var received dummyRecord
err = json.Unmarshal(msg.Data, &received)
require.NoError(t, err)
require.Equal(t, rec, received)
}

// Establish a connection between peers.
clientInfo := hostGetAddrInfo(t, client)
err = node.host.Connect(ctx, *clientInfo)
require.NoError(t, err)
func TestNode_SendMessageToMany(t *testing.T) {

// Have both client and node subscribe to the same topic.
_, subscription, err := client.Subscribe(topic)
require.NoError(t, err)
client1, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

err = node.subscribeToTopics(ctx)
require.NoError(t, err)
client2, err := host.New(mocks.NoopLogger, loopback, 0)
require.NoError(t, err)

time.Sleep(subscriptionDiseminationPause)
node := createNode(t, blockless.HeadNode)
hostAddNewPeer(t, node.host, client1)
hostAddNewPeer(t, node.host, client2)

err = node.publish(ctx, rec)
require.NoError(t, err)
client1.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {})
client2.SetStreamHandler(blockless.ProtocolID, func(network.Stream) {})

deadlineCtx, cancel := context.WithTimeout(ctx, publishTimeout)
defer cancel()
msg, err := subscription.Next(deadlineCtx)
// NOTE: These subtests are sequential.
t.Run("nominal case - sending to two online peers is ok", func(t *testing.T) {
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true)
require.NoError(t, err)

from := msg.ReceivedFrom
require.Equal(t, node.host.ID(), from)
require.NotNil(t, msg.Topic)
require.Equal(t, topic, *msg.Topic)

var received dummyRecord
err = json.Unmarshal(msg.Data, &received)
})
t.Run("peer is down with requireAll is an error", func(t *testing.T) {
client1.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), true)
require.Error(t, err)
})
t.Run("peer is down with partial delivery is ok", func(t *testing.T) {
client1.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false)
require.NoError(t, err)

require.Equal(t, rec, received)
})
t.Run("all sends failing produces an error", func(t *testing.T) {
client1.Close()
client2.Close()
err = node.sendToMany(context.Background(), []peer.ID{client1.ID(), client2.ID()}, newDummyRecord(), false)
require.Error(t, err)
})
}

Expand All @@ -111,3 +146,11 @@ type dummyRecord struct {
func (dummyRecord) Type() string {
return "MessageDummyRecord"
}

func newDummyRecord() dummyRecord {
return dummyRecord{
ID: mocks.GenericUUID.String(),
Value: rand.Uint64(),
Description: "dummy-description",
}
}
Loading