Skip to content

Commit

Permalink
Merge pull request #1504 from orbs-network/bugfix/multicast-failure
Browse files Browse the repository at this point in the history
Bugfix/multicast failure
  • Loading branch information
electricmonk authored Dec 17, 2019
2 parents 9a43136 + d18edde commit 16582f6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
49 changes: 47 additions & 2 deletions services/gossip/adapter/tcp/direct_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func TestDirectTransport_HandlesStartupWithEmptyPeerList(t *testing.T) {

func TestDirectTransport_SupportsTopologyChangeInRuntime(t *testing.T) {
with.Concurrency(t, func(ctx context.Context, harness *with.ConcurrencyHarness) {
harness.AllowErrorsMatching("failed sending gossip message") // because the test will send to node3 which is not in topology

node1 := aNode(ctx, harness.Logger)
node2 := aNode(ctx, harness.Logger)
node3 := aNode(ctx, harness.Logger)
Expand Down Expand Up @@ -86,12 +88,16 @@ func TestDirectTransport_SupportsTopologyChangeInRuntime(t *testing.T) {

node1.requireSendsSuccessfullyTo(t, ctx, node4)
node1.requireSendsSuccessfullyTo(t, ctx, node2)
require.Error(t, node2.transport.Send(ctx, &adapter.TransportData{
node2.listener.ExpectNotReceive()

node2.transport.Send(ctx, &adapter.TransportData{
SenderNodeAddress: node2.address,
RecipientMode: gossipmessages.RECIPIENT_LIST_MODE_LIST,
RecipientNodeAddresses: []primitives.NodeAddress{node3.address},
Payloads: aMessage(),
}), "node 2 was able to send a message to node 3 which is no longer a part of its topology")
})

require.NoError(t, test.ConsistentlyVerify(test.EVENTUALLY_ADAPTER_TIMEOUT, node1.listener, node2.listener, node3.listener), "node 2 was able to send a message to node 3 which is no longer a part of its topology")
})
}

Expand Down Expand Up @@ -136,6 +142,45 @@ func TestDirectTransport_SupportsBroadcastTransmissions(t *testing.T) {
})
}

func TestDirectTransport_FailsGracefullyIfMulticastFailedToSendToASingleRecipient(t *testing.T) {
with.Concurrency(t, func(ctx context.Context, harness *with.ConcurrencyHarness) {
harness.AllowErrorsMatching("failed sending gossip message") // because the test will send to an arbitrary recipient which is not in topology

node1 := aNode(ctx, harness.Logger)
node2 := aNode(ctx, harness.Logger)
superviseAll(harness, node1, node2)
defer shutdownAll(ctx, node1, node2)

waitForAllNodesToSatisfy(t, "server did not start", func(node *nodeHarness) bool { return node.transport.IsServerListening() }, node1, node2)

firstTopology := aTopologyContaining(node1, node2)
node1.transport.UpdateTopology(ctx, firstTopology)
node2.transport.UpdateTopology(ctx, firstTopology)

waitForAllNodesToSatisfy(t,
"expected all nodes to have peers added",
func(node *nodeHarness) bool { return len(node.transport.outgoingConnections.activeConnections) > 0 },
node1, node2)

waitForAllNodesToSatisfy(t,
"expected all outgoing queues to become enabled after topology change",
func(node *nodeHarness) bool { return node.transport.allOutgoingQueuesEnabled() },
node1, node2)

payloads := aMessage()

node2.listener.ExpectReceive(payloads)
require.NoError(t, node1.transport.Send(ctx, &adapter.TransportData{
SenderNodeAddress: node1.address,
RecipientMode: gossipmessages.RECIPIENT_LIST_MODE_LIST,
RecipientNodeAddresses: []primitives.NodeAddress{{0x1}, node2.address},
Payloads: payloads,
}))

require.NoError(t, test.EventuallyVerify(test.EVENTUALLY_ADAPTER_TIMEOUT, node2.listener), "message was not sent to target node")
})
}

type nodeHarness struct {
transport *DirectTransport
address primitives.NodeAddress
Expand Down
3 changes: 2 additions & 1 deletion services/gossip/adapter/tcp/outgoing_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (c *outgoingConnections) send(ctx context.Context, data *adapter.TransportD
client.addDataToOutgoingPeerQueue(ctx, data)
c.metrics.messageSize.Record(int64(data.TotalSize()))
} else {
return errors.Errorf("unknown recipient public key: %s", recipientPublicKey.String())
err := errors.Errorf("unknown recipient public key: %s", recipientPublicKey.String())
c.logger.Error("failed sending gossip message", log.Error(err), log.Stringable("recipient-public-key", recipientPublicKey))
}
}
return nil
Expand Down

0 comments on commit 16582f6

Please sign in to comment.