From 8f73f7d60cdcc2039f9219a5ecab6c0a7429c51c Mon Sep 17 00:00:00 2001 From: Maelkum Date: Thu, 7 Sep 2023 21:20:08 +0200 Subject: [PATCH 01/10] Integration tests for a PBFT cluster --- node/cluster.go | 2 +- node/cluster_pbft_integration_test.go | 255 ++++++++++++++++++++++++++ node/execute_integration_test.go | 2 +- node/node_integration_test.go | 22 ++- 4 files changed, 272 insertions(+), 9 deletions(-) create mode 100644 node/cluster_pbft_integration_test.go diff --git a/node/cluster.go b/node/cluster.go index 5fa0a609..6a747d62 100644 --- a/node/cluster.go +++ b/node/cluster.go @@ -174,7 +174,7 @@ func (n *Node) disbandCluster(requestID string, replicas []peer.ID) error { return fmt.Errorf("could not send cluster disband request (request: %s): %w", requestID, err) } - log.Info().Err(err).Strs("peers", blockless.PeerIDsToStr(replicas)).Msg("sent cluster disband request") + n.log.Info().Err(err).Str("request", requestID).Strs("peers", blockless.PeerIDsToStr(replicas)).Msg("sent cluster disband request") return nil } diff --git a/node/cluster_pbft_integration_test.go b/node/cluster_pbft_integration_test.go new file mode 100644 index 00000000..b96a3069 --- /dev/null +++ b/node/cluster_pbft_integration_test.go @@ -0,0 +1,255 @@ +//go:build integration +// +build integration + +package node_test + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + + "github.com/blocklessnetwork/b7s/consensus" + "github.com/blocklessnetwork/b7s/consensus/pbft" + "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/blocklessnetwork/b7s/models/codes" + "github.com/blocklessnetwork/b7s/models/response" +) + +func TestNode_PBFT_ExecuteComplete(t *testing.T) { + + const ( + testTimeLimit = 1 * time.Minute + + dirPattern = "b7s-node-pbft-execute-integration-test-" + + cid = "whatever-cid" + + // Paths where files will be hosted on the test server. + manifestEndpoint = "/hello-manifest.json" + archiveEndpoint = "/hello-deployment.tar.gz" + testFunctionToServe = "testdata/hello.tar.gz" + functionMethod = "hello.wasm" + + expectedExecutionResult = `This is the start of my program +The answer is 42 +This is the end of my program +` + ) + + cleanupDisabled := cleanupDisabled() + + var verifiedExecution bool + + t.Log("starting test") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set a hard limit for test duration. + // This looks a bit sketchy as tests can have the time limit + // set externally, but as there's a lot of moving pieces here, + // include it for better usability. + go func() { + <-time.After(testTimeLimit) + cancel() + t.Log("cancelling test") + }() + + // Phase 0: Create libp2p hosts, loggers, temporary directories and nodes. + nodeDir := fmt.Sprintf("%v-head-", dirPattern) + head := instantiateNode(t, nodeDir, blockless.HeadNode) + t.Logf("head node workspace: %s", head.dir) + + var workers []*nodeScaffolding + for i := 0; i < 4; i++ { + nodeDir := fmt.Sprintf("%v-worker-%v-", dirPattern, i) + + worker := instantiateNode(t, nodeDir, blockless.WorkerNode) + t.Logf("worker node #%v workspace: %s", i, worker.dir) + + workers = append(workers, worker) + } + + workerIDs := make([]peer.ID, 0, len(workers)) + for _, worker := range workers { + workerIDs = append(workerIDs, worker.host.ID()) + } + + // Cleanup everything after test is complete. + defer func() { + head.db.Close() + head.logFile.Close() + if !cleanupDisabled { + os.RemoveAll(head.dir) + } + + for _, worker := range workers { + worker.db.Close() + worker.logFile.Close() + if !cleanupDisabled { + os.RemoveAll(worker.dir) + } + } + }() + + var nodes []*nodeScaffolding + nodes = append(nodes, head) + nodes = append(nodes, workers...) + + t.Log("created nodes") + + // Phase 1: Setup connections. + + // Client that will issue and receive request. + client := createClient(t) + + // Add hosts to each others peer stores so that they know how to contact each other, and then establish connections. + for i := 0; i < len(nodes); i++ { + for j := 0; j < len(nodes); j++ { + if j == i { + continue + } + hostAddNewPeer(t, client.host, nodes[i].host) + hostAddNewPeer(t, nodes[i].host, nodes[j].host) + hostAddNewPeer(t, nodes[j].host, nodes[i].host) + + // Establish a connection so that hosts disseminate topic subscription info. + info := hostGetAddrInfo(t, nodes[j].host) + err := nodes[i].host.Connect(ctx, *info) + require.NoError(t, err) + } + } + + // Phase 2: Start nodes. + t.Log("starting nodes") + + // We start nodes in separate goroutines. + var nodesWG sync.WaitGroup + nodesWG.Add(len(nodes)) + + for _, node := range nodes { + go func(node *nodeScaffolding) { + defer nodesWG.Done() + + err := node.node.Run(ctx) + require.NoError(t, err) + + t.Log("node stopped") + }(node) + } + + // Add a delay for the hosts to subscribe to topics, + // diseminate subscription information etc. + time.Sleep(startupDelay) + + t.Log("starting function server") + + // Phase 3: Create the server hosting the manifest and the function. + + srv := createFunctionServer(t, manifestEndpoint, archiveEndpoint, testFunctionToServe, cid) + defer srv.Close() + + // Phase 4: Have the worker nodes install the function. + // That way, when he receives the execution request - he will have the function needed to execute it. + + t.Log("instructing worker node to install function") + + var installWG sync.WaitGroup + installWG.Add(len(workers)) + + // Setup verifier for the response we expect. + client.host.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) { + defer installWG.Done() + defer stream.Close() + + from := stream.Conn().RemotePeer() + require.Contains(t, workerIDs, from) + + var res response.InstallFunction + getStreamPayload(t, stream, &res) + + require.Equal(t, blockless.MessageInstallFunctionResponse, res.Type) + require.Equal(t, codes.Accepted, res.Code) + require.Equal(t, "installed", res.Message) + + t.Log("client received function install response") + }) + + manifestURL := fmt.Sprintf("%v%v", srv.URL, manifestEndpoint) + for _, worker := range workers { + err := client.sendInstallMessage(ctx, worker.host.ID(), manifestURL, cid) + require.NoError(t, err) + } + + // Wait for the installation request to be processed. + installWG.Wait() + + t.Log("worker node installed function") + + // Phase 5: Request execution from the head node. + + t.Log("sending execution request") + + // Setup verifier for the response we expect. + var executeWG sync.WaitGroup + + executeWG.Add(1) + client.host.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) { + defer executeWG.Done() + defer stream.Close() + + t.Log("client received execution response") + + var res response.Execute + getStreamPayload(t, stream, &res) + + payload, _ := json.Marshal(res) + fmt.Printf("%s\n", payload) + + require.Equal(t, blockless.MessageExecuteResponse, res.Type) + require.Equal(t, codes.OK, res.Code) + require.NotEmpty(t, res.RequestID) + + require.Len(t, res.Cluster.Peers, len(workers)) + + // Verify cluster nodes are the workers we created. + require.ElementsMatch(t, workerIDs, res.Cluster.Peers) + + require.GreaterOrEqual(t, uint(len(res.Results)), pbft.MinClusterResults(uint(len(workers)))) + + for peer, exres := range res.Results { + require.Contains(t, workerIDs, peer) + require.Equal(t, expectedExecutionResult, exres.Result.Stdout) + } + + t.Log("client verified execution response") + + verifiedExecution = true + }) + + err := client.sendExecutionMessage(ctx, head.host.ID(), cid, functionMethod, consensus.PBFT, len(workers)) + require.NoError(t, err) + + executeWG.Wait() + + t.Log("execution request processed") + + // Since we're done, we can cancel the context, leading to stopping of the nodes. + cancel() + + nodesWG.Wait() + + t.Log("nodes shutdown") + + t.Log("test complete") + + require.True(t, verifiedExecution) +} diff --git a/node/execute_integration_test.go b/node/execute_integration_test.go index 6ce1f255..17a4e351 100644 --- a/node/execute_integration_test.go +++ b/node/execute_integration_test.go @@ -189,7 +189,7 @@ This is the end of my program verifiedExecution = true }) - err = client.sendExecutionMessage(ctx, head.host.ID(), cid, functionMethod) + err = client.sendExecutionMessage(ctx, head.host.ID(), cid, functionMethod, 0, 1) require.NoError(t, err) executeWG.Wait() diff --git a/node/node_integration_test.go b/node/node_integration_test.go index cd68cc8e..f5f7780a 100644 --- a/node/node_integration_test.go +++ b/node/node_integration_test.go @@ -22,10 +22,12 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/require" + "github.com/blocklessnetwork/b7s/consensus" "github.com/blocklessnetwork/b7s/executor" "github.com/blocklessnetwork/b7s/fstore" "github.com/blocklessnetwork/b7s/host" "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/blocklessnetwork/b7s/models/execute" "github.com/blocklessnetwork/b7s/models/request" "github.com/blocklessnetwork/b7s/node" "github.com/blocklessnetwork/b7s/peerstore" @@ -51,11 +53,9 @@ type nodeScaffolding struct { node *node.Node } -func instantiateNode(t *testing.T, dirnamePattern string, role blockless.NodeRole) *nodeScaffolding { +func instantiateNode(t *testing.T, nodeDir string, role blockless.NodeRole) *nodeScaffolding { t.Helper() - nodeDir := fmt.Sprintf("%v-%v-", dirnamePattern, role.String()) - // Bootstrap node directory. dir, err := os.MkdirTemp("", nodeDir) require.NoError(t, err) @@ -165,12 +165,20 @@ func (c *client) sendInstallMessage(ctx context.Context, to peer.ID, manifestURL return nil } -func (c *client) sendExecutionMessage(ctx context.Context, to peer.ID, cid string, method string) error { +func (c *client) sendExecutionMessage(ctx context.Context, to peer.ID, cid string, method string, consensus consensus.Type, count int) error { req := request.Execute{ - Type: blockless.MessageExecute, - FunctionID: cid, - Method: method, + Type: blockless.MessageExecute, + Request: execute.Request{ + FunctionID: cid, + Method: method, + Config: execute.Config{ + NodeCount: count, + }, + }, + } + if consensus.Valid() { + req.Config.ConsensusAlgorithm = consensus.String() } payload, err := json.Marshal(req) From 936c8f3119eab8fe05caaacd67691d8500cbd574 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Mon, 11 Sep 2023 15:44:44 +0200 Subject: [PATCH 02/10] Add node shutdown --- cmd/node/main.go | 7 +++++++ node/cluster.go | 2 +- node/cluster_pbft_integration_test.go | 2 ++ node/consensus.go | 4 ++-- node/node.go | 24 ++++++++++++++++++++++++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/cmd/node/main.go b/cmd/node/main.go index 87cb08f7..b52b7c07 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -110,6 +110,7 @@ func run() int { log.Error().Err(err).Str("key", cfg.Host.PrivateKey).Msg("could not create host") return failure } + defer host.Close() log.Info(). Str("id", host.ID().String()). @@ -184,6 +185,12 @@ func run() int { log.Error().Err(err).Msg("could not create node") return failure } + defer func() { + err := node.Shutdown() + if err != nil { + log.Error().Err(err).Msg("Blockless node shutdown failed") + } + }() // Create the main context. ctx, cancel := context.WithCancel(context.Background()) diff --git a/node/cluster.go b/node/cluster.go index 6a747d62..a496bc6a 100644 --- a/node/cluster.go +++ b/node/cluster.go @@ -83,7 +83,7 @@ func (n *Node) processDisbandCluster(ctx context.Context, from peer.ID, payload n.log.Info().Str("peer", from.String()).Str("request", req.RequestID).Msg("received request to disband consensus cluster") - err = n.leaveCluster(req.RequestID) + err = n.leaveCluster(req.RequestID, consensusClusterDisbandTimeout) if err != nil { return fmt.Errorf("could not disband cluster (request: %s): %w", req.RequestID, err) } diff --git a/node/cluster_pbft_integration_test.go b/node/cluster_pbft_integration_test.go index b96a3069..64de3410 100644 --- a/node/cluster_pbft_integration_test.go +++ b/node/cluster_pbft_integration_test.go @@ -66,6 +66,7 @@ This is the end of my program // Phase 0: Create libp2p hosts, loggers, temporary directories and nodes. nodeDir := fmt.Sprintf("%v-head-", dirPattern) head := instantiateNode(t, nodeDir, blockless.HeadNode) + defer head.node.Shutdown() t.Logf("head node workspace: %s", head.dir) var workers []*nodeScaffolding @@ -73,6 +74,7 @@ This is the end of my program nodeDir := fmt.Sprintf("%v-worker-%v-", dirPattern, i) worker := instantiateNode(t, nodeDir, blockless.WorkerNode) + defer worker.node.Shutdown() t.Logf("worker node #%v workspace: %s", i, worker.dir) workers = append(workers, worker) diff --git a/node/consensus.go b/node/consensus.go index eced6f9f..77fc8a06 100644 --- a/node/consensus.go +++ b/node/consensus.go @@ -123,7 +123,7 @@ func (n *Node) createPBFTCluster(ctx context.Context, from peer.ID, fc request.F return nil } -func (n *Node) leaveCluster(requestID string) error { +func (n *Node) leaveCluster(requestID string, timeout time.Duration) error { // Shutdown can take a while so use short locking intervals. n.clusterLock.RLock() @@ -136,7 +136,7 @@ func (n *Node) leaveCluster(requestID string) error { n.log.Info().Str("consensus", cluster.Consensus().String()).Str("request", requestID).Msg("leaving consensus cluster") - ctx, cancel := context.WithTimeout(context.Background(), consensusClusterDisbandTimeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() // We know that the request is done executing when we have a result for it. diff --git a/node/node.go b/node/node.go index 369e5453..102fbcc6 100644 --- a/node/node.go +++ b/node/node.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/google/uuid" + "github.com/hashicorp/go-multierror" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" @@ -84,6 +85,29 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore return n, nil } +// Shutdown shuts down the node. +func (n *Node) Shutdown() error { + + n.host.RemoveStreamHandler(blockless.ProtocolID) + + // Only thing we do here is leave all consensus clusters we're part of. + n.clusterLock.Lock() + defer n.clusterLock.Unlock() + + var ( + multierr *multierror.Error + ) + + for request := range n.clusters { + err := n.leaveCluster(request, 0) + if err != nil { + multierr = multierror.Append(multierr, fmt.Errorf("could not shutdown cluster (id: %s): : %w", request, err)) + } + } + + return multierr.ErrorOrNil() +} + // ID returns the ID of this node. func (n *Node) ID() string { return n.host.ID().String() From f8e12a981eb15a7efd1e2b4db09ff9baeda382f0 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Mon, 11 Sep 2023 19:19:39 +0200 Subject: [PATCH 03/10] Revert previous change --- cmd/node/main.go | 6 ------ node/cluster_pbft_integration_test.go | 2 -- node/node.go | 24 ------------------------ 3 files changed, 32 deletions(-) diff --git a/cmd/node/main.go b/cmd/node/main.go index b52b7c07..ac05c70f 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -185,12 +185,6 @@ func run() int { log.Error().Err(err).Msg("could not create node") return failure } - defer func() { - err := node.Shutdown() - if err != nil { - log.Error().Err(err).Msg("Blockless node shutdown failed") - } - }() // Create the main context. ctx, cancel := context.WithCancel(context.Background()) diff --git a/node/cluster_pbft_integration_test.go b/node/cluster_pbft_integration_test.go index 64de3410..b96a3069 100644 --- a/node/cluster_pbft_integration_test.go +++ b/node/cluster_pbft_integration_test.go @@ -66,7 +66,6 @@ This is the end of my program // Phase 0: Create libp2p hosts, loggers, temporary directories and nodes. nodeDir := fmt.Sprintf("%v-head-", dirPattern) head := instantiateNode(t, nodeDir, blockless.HeadNode) - defer head.node.Shutdown() t.Logf("head node workspace: %s", head.dir) var workers []*nodeScaffolding @@ -74,7 +73,6 @@ This is the end of my program nodeDir := fmt.Sprintf("%v-worker-%v-", dirPattern, i) worker := instantiateNode(t, nodeDir, blockless.WorkerNode) - defer worker.node.Shutdown() t.Logf("worker node #%v workspace: %s", i, worker.dir) workers = append(workers, worker) diff --git a/node/node.go b/node/node.go index 102fbcc6..369e5453 100644 --- a/node/node.go +++ b/node/node.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/google/uuid" - "github.com/hashicorp/go-multierror" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" @@ -85,29 +84,6 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore return n, nil } -// Shutdown shuts down the node. -func (n *Node) Shutdown() error { - - n.host.RemoveStreamHandler(blockless.ProtocolID) - - // Only thing we do here is leave all consensus clusters we're part of. - n.clusterLock.Lock() - defer n.clusterLock.Unlock() - - var ( - multierr *multierror.Error - ) - - for request := range n.clusters { - err := n.leaveCluster(request, 0) - if err != nil { - multierr = multierror.Append(multierr, fmt.Errorf("could not shutdown cluster (id: %s): : %w", request, err)) - } - } - - return multierr.ErrorOrNil() -} - // ID returns the ID of this node. func (n *Node) ID() string { return n.host.ID().String() From 7b14112cdf61e03b80b2a3af86a5fe6f12810924 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Mon, 11 Sep 2023 23:59:31 +0200 Subject: [PATCH 04/10] Shutdown head node last --- node/cluster_pbft_integration_test.go | 42 ++++++++++++--------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/node/cluster_pbft_integration_test.go b/node/cluster_pbft_integration_test.go index b96a3069..c0d86c37 100644 --- a/node/cluster_pbft_integration_test.go +++ b/node/cluster_pbft_integration_test.go @@ -5,7 +5,6 @@ package node_test import ( "context" - "encoding/json" "fmt" "os" "sync" @@ -50,19 +49,6 @@ This is the end of my program t.Log("starting test") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Set a hard limit for test duration. - // This looks a bit sketchy as tests can have the time limit - // set externally, but as there's a lot of moving pieces here, - // include it for better usability. - go func() { - <-time.After(testTimeLimit) - cancel() - t.Log("cancelling test") - }() - // Phase 0: Create libp2p hosts, loggers, temporary directories and nodes. nodeDir := fmt.Sprintf("%v-head-", dirPattern) head := instantiateNode(t, nodeDir, blockless.HeadNode) @@ -85,12 +71,6 @@ This is the end of my program // Cleanup everything after test is complete. defer func() { - head.db.Close() - head.logFile.Close() - if !cleanupDisabled { - os.RemoveAll(head.dir) - } - for _, worker := range workers { worker.db.Close() worker.logFile.Close() @@ -98,6 +78,12 @@ This is the end of my program os.RemoveAll(worker.dir) } } + + head.db.Close() + head.logFile.Close() + if !cleanupDisabled { + os.RemoveAll(head.dir) + } }() var nodes []*nodeScaffolding @@ -106,6 +92,19 @@ This is the end of my program t.Log("created nodes") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set a hard limit for test duration. + // This looks a bit sketchy as tests can have the time limit + // set externally, but as there's a lot of moving pieces here, + // include it for better usability. + go func() { + <-time.After(testTimeLimit) + cancel() + t.Log("cancelling test") + }() + // Phase 1: Setup connections. // Client that will issue and receive request. @@ -211,9 +210,6 @@ This is the end of my program var res response.Execute getStreamPayload(t, stream, &res) - payload, _ := json.Marshal(res) - fmt.Printf("%s\n", payload) - require.Equal(t, blockless.MessageExecuteResponse, res.Type) require.Equal(t, codes.OK, res.Code) require.NotEmpty(t, res.RequestID) From 904785849555524eb39c45b563a6e1c4e6b8af8f Mon Sep 17 00:00:00 2001 From: Maelkum Date: Tue, 12 Sep 2023 00:26:14 +0200 Subject: [PATCH 05/10] Execute method acquires state lock --- consensus/pbft/execute.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/consensus/pbft/execute.go b/consensus/pbft/execute.go index f01b759c..ebc5245d 100644 --- a/consensus/pbft/execute.go +++ b/consensus/pbft/execute.go @@ -15,6 +15,10 @@ import ( // Execute fullfils the consensus interface by inserting the request into the pipeline. func (r *Replica) Execute(client peer.ID, requestID string, timestamp time.Time, req execute.Request) (codes.Code, execute.Result, error) { + // Modifying state, so acquire state lock now. + r.sl.Lock() + defer r.sl.Unlock() + request := Request{ ID: requestID, Timestamp: timestamp, From 8fa69978ccf6905f30811e5b99ca9b902e12a573 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Tue, 12 Sep 2023 05:37:53 +0200 Subject: [PATCH 06/10] Fix race conditions --- node/execute_integration_test.go | 4 ++-- node/handlers_internal_test.go | 2 ++ node/message_internal_test.go | 4 ++-- node/queue.go | 3 +-- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/node/execute_integration_test.go b/node/execute_integration_test.go index 17a4e351..d9b20ebb 100644 --- a/node/execute_integration_test.go +++ b/node/execute_integration_test.go @@ -104,7 +104,7 @@ This is the end of my program go func() { defer nodesWG.Done() - err = head.node.Run(ctx) + err := head.node.Run(ctx) require.NoError(t, err) t.Log("head node stopped") @@ -113,7 +113,7 @@ This is the end of my program go func() { defer nodesWG.Done() - err = worker.node.Run(ctx) + err := worker.node.Run(ctx) require.NoError(t, err) t.Log("worker node stopped") diff --git a/node/handlers_internal_test.go b/node/handlers_internal_test.go index 0508076b..4c5deb70 100644 --- a/node/handlers_internal_test.go +++ b/node/handlers_internal_test.go @@ -78,6 +78,8 @@ func TestNode_Handlers(t *testing.T) { requestID = "dummy-request-id-2" ) + node.rollCall.create(requestID) + // We only want responses with the code `Accepted`. res := response.RollCall{ Type: blockless.MessageRollCallResponse, diff --git a/node/message_internal_test.go b/node/message_internal_test.go index a5657c54..d7967993 100644 --- a/node/message_internal_test.go +++ b/node/message_internal_test.go @@ -54,7 +54,7 @@ func TestNode_Messaging(t *testing.T) { require.Equal(t, rec, received) }) - err = node.send(context.Background(), client.ID(), rec) + err := node.send(context.Background(), client.ID(), rec) require.NoError(t, err) wg.Wait() @@ -66,7 +66,7 @@ func TestNode_Messaging(t *testing.T) { // Establish a connection between peers. clientInfo := hostGetAddrInfo(t, client) - err = node.host.Connect(ctx, *clientInfo) + err := node.host.Connect(ctx, *clientInfo) require.NoError(t, err) // Have both client and node subscribe to the same topic. diff --git a/node/queue.go b/node/queue.go index acfd53d6..00587d3f 100644 --- a/node/queue.go +++ b/node/queue.go @@ -63,6 +63,7 @@ func (q *rollCallQueue) exists(reqID string) bool { // responses will return a channel that can be used to iterate through all of the responses. func (q *rollCallQueue) responses(reqID string) <-chan response.RollCall { q.Lock() + defer q.Unlock() _, ok := q.m[reqID] if !ok { @@ -70,8 +71,6 @@ func (q *rollCallQueue) responses(reqID string) <-chan response.RollCall { q.m[reqID] = make(chan response.RollCall, q.size) } - q.Unlock() - return q.m[reqID] } From 99973d5b38a4b31ce01168941d8269d72207f5a2 Mon Sep 17 00:00:00 2001 From: Maelkum Date: Tue, 12 Sep 2023 19:11:51 +0200 Subject: [PATCH 07/10] Fix imports and go mod tidy --- cmd/keyforge/key_management.go | 17 ++++++++--------- cmd/keyforge/sign_verify.go | 6 +++--- go.mod | 3 +-- go.sum | 2 -- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/cmd/keyforge/key_management.go b/cmd/keyforge/key_management.go index 39436900..106c5a64 100644 --- a/cmd/keyforge/key_management.go +++ b/cmd/keyforge/key_management.go @@ -2,13 +2,12 @@ package main import ( "encoding/base64" - "io/ioutil" "log" "os" "path/filepath" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" ) // LoadOrCreateKeys loads existing keys or creates new ones if not present @@ -23,7 +22,7 @@ func LoadOrCreateKeys(privKeyFile string, outputDir string) (crypto.PrivKey, cry return nil, nil, err } } else { - privBytes, err := ioutil.ReadFile(privKeyFile) + privBytes, err := os.ReadFile(privKeyFile) if err != nil { return nil, nil, err } @@ -51,33 +50,33 @@ func LoadOrCreateKeys(privKeyFile string, outputDir string) (crypto.PrivKey, cry } pubKeyFile := filepath.Join(outputDir, pubKeyName) - err = ioutil.WriteFile(pubKeyFile, pubPayload, pubKeyPermissions) + err = os.WriteFile(pubKeyFile, pubPayload, pubKeyPermissions) if err != nil { log.Fatalf("Could not write public key to file: %s", err) } pubKeyTextFile := filepath.Join(outputDir, pubKeyTxtName) pubKeyBase64 := base64.StdEncoding.EncodeToString(pubPayload) - err = ioutil.WriteFile(pubKeyTextFile, []byte(pubKeyBase64), pubKeyPermissions) + err = os.WriteFile(pubKeyTextFile, []byte(pubKeyBase64), pubKeyPermissions) if err != nil { log.Fatalf("Could not write public key text to file: %s", err) } identityFile := filepath.Join(outputDir, identityName) - err = ioutil.WriteFile(identityFile, []byte(identity.Pretty()), pubKeyPermissions) + err = os.WriteFile(identityFile, []byte(identity.Pretty()), pubKeyPermissions) if err != nil { log.Fatalf("Could not write identity to file: %s", err) } privKeyFile = filepath.Join(outputDir, privKeyName) - err = ioutil.WriteFile(privKeyFile, privPayload, privKeyPermissions) + err = os.WriteFile(privKeyFile, privPayload, privKeyPermissions) if err != nil { log.Fatalf("Could not write private key to file: %s", err) } // Write peer ID to file peerIDFile := filepath.Join(outputDir, peerIDFileName) - err = ioutil.WriteFile(peerIDFile, []byte(identity.Pretty()), pubKeyPermissions) + err = os.WriteFile(peerIDFile, []byte(identity.Pretty()), pubKeyPermissions) if err != nil { log.Fatalf("Could not write peer ID to file: %s", err) } diff --git a/cmd/keyforge/sign_verify.go b/cmd/keyforge/sign_verify.go index fe5929ae..45357260 100644 --- a/cmd/keyforge/sign_verify.go +++ b/cmd/keyforge/sign_verify.go @@ -7,8 +7,8 @@ import ( "log" "path/filepath" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" ) // HandleSignAndVerify performs the signing and verification based on the provided keys and flags @@ -117,6 +117,6 @@ func VerifyGivenSignatureWithPeerID(peerIDStr string, message string, encodedSig // This will give you a base64 string representation of the public key pubKeyBase64 := base64.StdEncoding.EncodeToString(pubKeyBytes) log.Printf("Extracted public key from PeerID (Base64): %s", pubKeyBase64) - + VerifyGivenSignature(pubKeyBase64, message, encodedSignature) } diff --git a/go.mod b/go.mod index f451b2f6..1a0f67c1 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,6 @@ require ( github.com/ipfs/boxo v0.12.0 // indirect github.com/labstack/gommon v0.4.0 // indirect github.com/libp2p/go-libp2p-consensus v0.0.1 // indirect - github.com/libp2p/go-libp2p-core v0.20.1 // indirect github.com/libp2p/go-libp2p-gostream v0.6.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/onsi/ginkgo/v2 v2.12.0 // indirect @@ -93,7 +92,7 @@ require ( github.com/google/gopacket v1.1.19 // indirect github.com/google/uuid v1.3.1 github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect diff --git a/go.sum b/go.sum index 4af3f66b..a591c3af 100644 --- a/go.sum +++ b/go.sum @@ -366,8 +366,6 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLE github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= github.com/libp2p/go-libp2p-consensus v0.0.1 h1:jcVbHRZLwTXU9iT/mPi+Lx4/OrIzq3bU1TbZNhYFCV8= github.com/libp2p/go-libp2p-consensus v0.0.1/go.mod h1:+9Wrfhc5QOqWB0gXI0m6ARlkHfdJpcFXmRU0WoHz4Mo= -github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw= -github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY= github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU= github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA= github.com/libp2p/go-libp2p-kad-dht v0.25.0 h1:T2SXQ/VlXTQVLChWY/+OyOsmGMRJvB5kiR+eJt7jtvI= From e594494198715a1dd25d5bc11340586536ed434e Mon Sep 17 00:00:00 2001 From: Maelkum Date: Tue, 12 Sep 2023 19:41:53 +0200 Subject: [PATCH 08/10] Fix typos --- cmd/bootstrap-limiter/main.go | 2 +- fstore/sync.go | 2 +- host/host.go | 2 +- models/execute/response.go | 2 +- node/execution_results.go | 2 +- node/node.go | 2 +- node/roll_call.go | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/bootstrap-limiter/main.go b/cmd/bootstrap-limiter/main.go index 44e0f320..3184882b 100644 --- a/cmd/bootstrap-limiter/main.go +++ b/cmd/bootstrap-limiter/main.go @@ -109,7 +109,7 @@ func run() int { return failure } - // Chown directory to be owned by the original user runing sudo. + // Chown directory to be owned by the original user running sudo. err = chownRecursive(target, int(id), -1) if err != nil { log.Printf("could not set owner for the cgroup: %s", err) diff --git a/fstore/sync.go b/fstore/sync.go index e2a46d5e..6e28b8e0 100644 --- a/fstore/sync.go +++ b/fstore/sync.go @@ -83,7 +83,7 @@ func (h *FStore) Sync(cid string) error { } // checkFunctionFiles checks if the files required by the function are found on local storage. -// It returns two booleans indicating presense of the archive file, the unpacked files, and a potential error. +// It returns two booleans indicating presence of the archive file, the unpacked files, and a potential error. func (h *FStore) checkFunctionFiles(fn functionRecord) (bool, bool, error) { // Check if the archive is found. diff --git a/host/host.go b/host/host.go index d395a4e2..f34682f7 100644 --- a/host/host.go +++ b/host/host.go @@ -37,7 +37,7 @@ func New(log zerolog.Logger, address string, port uint, options ...func(*Config) if cfg.Websocket { - // If the TCP and websocket port are explicitely chosen and set to the same value, one of the two listens will silently fail. + // If the TCP and websocket port are explicitly chosen and set to the same value, one of the two listens will silently fail. if port == cfg.WebsocketPort && cfg.WebsocketPort != 0 { return nil, fmt.Errorf("TCP and websocket ports cannot be the same (TCP: %v, Websocket: %v)", port, cfg.WebsocketPort) } diff --git a/models/execute/response.go b/models/execute/response.go index 3210729c..6feddf38 100644 --- a/models/execute/response.go +++ b/models/execute/response.go @@ -23,7 +23,7 @@ type Cluster struct { Peers []peer.ID `json:"peers,omitempty"` } -// RuntimeOutput describes the output produced by the Blockless Runtime during exection. +// RuntimeOutput describes the output produced by the Blockless Runtime during execution. type RuntimeOutput struct { Stdout string `json:"stdout"` Stderr string `json:"stderr"` diff --git a/node/execution_results.go b/node/execution_results.go index f6d5699d..6be6c7d6 100644 --- a/node/execution_results.go +++ b/node/execution_results.go @@ -84,7 +84,7 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, result.peers = append(result.peers, sender) if uint(len(result.peers)) >= count { - n.log.Info().Str("request", requestID).Int("peers", len(peers)).Uint("matching_results", count).Msg("have enough maching results") + n.log.Info().Str("request", requestID).Int("peers", len(peers)).Uint("matching_results", count).Msg("have enough matching results") exCancel() for _, peer := range result.peers { diff --git a/node/node.go b/node/node.go index 369e5453..0f18c86a 100644 --- a/node/node.go +++ b/node/node.go @@ -19,7 +19,7 @@ import ( // It listens for messages coming from the wire and processes them. Depending on the // node role, which is determined on construction, it may process messages in different ways. // For example, upon receiving a message requesting execution of a Blockless function, -// a Worker Node will use the `Execute` component to fullfill the execution request. +// a Worker Node will use the `Execute` component to fulfill the execution request. // On the other hand, a Head Node will issue a roll call and eventually // delegate the execution to the chosend Worker Node. type Node struct { diff --git a/node/roll_call.go b/node/roll_call.go index 3ab4f701..ecaaa96b 100644 --- a/node/roll_call.go +++ b/node/roll_call.go @@ -77,7 +77,7 @@ func (n *Node) processRollCall(ctx context.Context, from peer.ID, payload []byte log.Info().Str("origin", req.Origin.String()).Msg("reporting for roll call") - // Send postive response. + // Send positive response. res.Code = codes.Accepted err = n.send(ctx, req.Origin, res) if err != nil { From ce19b1ed7d6ebc3eb184eb6c1580cce219e9c09f Mon Sep 17 00:00:00 2001 From: Maelkum Date: Tue, 12 Sep 2023 19:46:01 +0200 Subject: [PATCH 09/10] Gofmt --- cmd/keyforge/main.go | 3 +- cmd/manager/bin.go | 132 ++++++++++++++++++++--------------------- cmd/manager/main.go | 21 +++---- cmd/manager/service.go | 2 +- 4 files changed, 77 insertions(+), 81 deletions(-) diff --git a/cmd/keyforge/main.go b/cmd/keyforge/main.go index f9df12b9..bc3fe475 100644 --- a/cmd/keyforge/main.go +++ b/cmd/keyforge/main.go @@ -16,7 +16,6 @@ const ( peerIDFileName = "peerid.txt" privKeyPermissions = 0600 pubKeyPermissions = 0644 - ) func main() { @@ -60,7 +59,7 @@ func main() { if flagPublicKey != "" && flagMessage != "" && flagSignature != "" { VerifyGivenSignature(flagPublicKey, flagMessage, flagSignature) } - + if flagPeerID != "" && flagMessage != "" && flagSignature != "" { VerifyGivenSignatureWithPeerID(flagPeerID, flagMessage, flagSignature) } diff --git a/cmd/manager/bin.go b/cmd/manager/bin.go index a540eeac..b2f2f58c 100644 --- a/cmd/manager/bin.go +++ b/cmd/manager/bin.go @@ -15,68 +15,68 @@ import ( ) func installBinary(url, folder string) { - usr, err := user.Current() - if err != nil { - log.Fatal(err) - } - - targetPath := filepath.Join(usr.HomeDir, folder) - os.MkdirAll(targetPath, os.ModePerm) - - resp, err := http.Get(url) - if err != nil { - log.Fatal(err) - } - defer resp.Body.Close() - - archiveData, err := io.ReadAll(resp.Body) - if err != nil { - log.Fatal(err) - } - - gzipReader, err := gzip.NewReader(bytes.NewReader(archiveData)) - if err != nil { - log.Fatal(err) - } - defer gzipReader.Close() - - tarReader := tar.NewReader(gzipReader) - - for { - header, err := tarReader.Next() - if err == io.EOF { - break - } - if err != nil { - log.Fatal(err) - } - - path := filepath.Join(targetPath, header.Name) - switch header.Typeflag { - case tar.TypeDir: - if err := os.MkdirAll(path, os.FileMode(header.Mode)); err != nil { - log.Fatal(err) - } - - case tar.TypeReg: - outFile, err := os.Create(path) - if err != nil { - log.Fatal(err) - } - - if _, err := io.Copy(outFile, tarReader); err != nil { - log.Fatal(err) - } - - outFile.Close() - - if err := os.Chmod(path, os.FileMode(header.Mode)); err != nil { - log.Fatal(err) - } - - log.Printf("File %s installed in %s", header.Name, targetPath) - } - } + usr, err := user.Current() + if err != nil { + log.Fatal(err) + } + + targetPath := filepath.Join(usr.HomeDir, folder) + os.MkdirAll(targetPath, os.ModePerm) + + resp, err := http.Get(url) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + archiveData, err := io.ReadAll(resp.Body) + if err != nil { + log.Fatal(err) + } + + gzipReader, err := gzip.NewReader(bytes.NewReader(archiveData)) + if err != nil { + log.Fatal(err) + } + defer gzipReader.Close() + + tarReader := tar.NewReader(gzipReader) + + for { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + log.Fatal(err) + } + + path := filepath.Join(targetPath, header.Name) + switch header.Typeflag { + case tar.TypeDir: + if err := os.MkdirAll(path, os.FileMode(header.Mode)); err != nil { + log.Fatal(err) + } + + case tar.TypeReg: + outFile, err := os.Create(path) + if err != nil { + log.Fatal(err) + } + + if _, err := io.Copy(outFile, tarReader); err != nil { + log.Fatal(err) + } + + outFile.Close() + + if err := os.Chmod(path, os.FileMode(header.Mode)); err != nil { + log.Fatal(err) + } + + log.Printf("File %s installed in %s", header.Name, targetPath) + } + } } func installB7s(baseURL, version string) { @@ -105,18 +105,18 @@ func installRuntime(baseURL, version string) { arch := runtime.GOARCH platform := runtime.GOOS - if(platform == "darwin") { + if platform == "darwin" { platform = "macos" } - if(arch == "amd64") { + if arch == "amd64" { arch = "x86_64" } - if(arch == "arm64") { + if arch == "arm64" { arch = "aarch64" } url := fmt.Sprintf("%s/%s/blockless-runtime.%s-latest.%s.tar.gz", baseURL, version, platform, arch) installBinary(url, ".b7s/runtime") -} \ No newline at end of file +} diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 2af9ffc5..cc3725cf 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -56,7 +56,6 @@ func main() { <-ctx.Done() } - func makeBasicHost(listenPort int, insecure bool, privKeyPath string) (host.Host, error) { priv, err := loadPrivateKey(privKeyPath) if err != nil { @@ -76,20 +75,18 @@ func makeBasicHost(listenPort int, insecure bool, privKeyPath string) (host.Host return libp2p.New(opts...) } - - func loadPrivateKey(filePath string) (crypto.PrivKey, error) { - keyBytes, err := ioutil.ReadFile(filePath) - if err != nil { - return nil, err - } + keyBytes, err := ioutil.ReadFile(filePath) + if err != nil { + return nil, err + } - priv, err := crypto.UnmarshalPrivateKey(keyBytes) - if err != nil { - return nil, err - } + priv, err := crypto.UnmarshalPrivateKey(keyBytes) + if err != nil { + return nil, err + } - return priv, nil + return priv, nil } func getHostAddress(ha host.Host) string { diff --git a/cmd/manager/service.go b/cmd/manager/service.go index 01d7b479..7857e2f9 100644 --- a/cmd/manager/service.go +++ b/cmd/manager/service.go @@ -178,4 +178,4 @@ func removeMacOSService() { func removeWindowsService() { log.Fatal("Removing a service on Windows is not supported in this code.") -} \ No newline at end of file +} From cf6334193d25047f1e72fb82c6306342efa32abd Mon Sep 17 00:00:00 2001 From: Maelkum Date: Wed, 13 Sep 2023 13:52:37 +0200 Subject: [PATCH 10/10] Add go reportcard, license and release version badges --- README.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4a4cdc33..2069bad5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,8 @@ -![Coverage](https://img.shields.io/badge/Coverage-64.5%25-yellow) +[![Coverage](https://img.shields.io/badge/Coverage-64.5%25-yellow)](https://img.shields.io/badge/Coverage-64.5%25-yellow) +[![Go Report Card](https://goreportcard.com/badge/github.com/blocklessnetwork/b7s)](https://goreportcard.com/report/github.com/blocklessnetwork/b7s) +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/blocklessnetwork/b7s/blob/main/LICENSE.md) +[![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/blocklessnetwork/b7s)](https://img.shields.io/github/v/release/blocklessnetwork/b7s) + # b7s daemon