diff --git a/README.md b/README.md index 4a4cdc33..2069bad5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,8 @@ -![Coverage](https://img.shields.io/badge/Coverage-64.5%25-yellow) +[![Coverage](https://img.shields.io/badge/Coverage-64.5%25-yellow)](https://img.shields.io/badge/Coverage-64.5%25-yellow) +[![Go Report Card](https://goreportcard.com/badge/github.com/blocklessnetwork/b7s)](https://goreportcard.com/report/github.com/blocklessnetwork/b7s) +[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/blocklessnetwork/b7s/blob/main/LICENSE.md) +[![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/blocklessnetwork/b7s)](https://img.shields.io/github/v/release/blocklessnetwork/b7s) + # b7s daemon diff --git a/cmd/bootstrap-limiter/main.go b/cmd/bootstrap-limiter/main.go index 44e0f320..3184882b 100644 --- a/cmd/bootstrap-limiter/main.go +++ b/cmd/bootstrap-limiter/main.go @@ -109,7 +109,7 @@ func run() int { return failure } - // Chown directory to be owned by the original user runing sudo. + // Chown directory to be owned by the original user running sudo. err = chownRecursive(target, int(id), -1) if err != nil { log.Printf("could not set owner for the cgroup: %s", err) diff --git a/cmd/keyforge/key_management.go b/cmd/keyforge/key_management.go index 39436900..106c5a64 100644 --- a/cmd/keyforge/key_management.go +++ b/cmd/keyforge/key_management.go @@ -2,13 +2,12 @@ package main import ( "encoding/base64" - "io/ioutil" "log" "os" "path/filepath" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" ) // LoadOrCreateKeys loads existing keys or creates new ones if not present @@ -23,7 +22,7 @@ func LoadOrCreateKeys(privKeyFile string, outputDir string) (crypto.PrivKey, cry return nil, nil, err } } else { - privBytes, err := ioutil.ReadFile(privKeyFile) + privBytes, err := os.ReadFile(privKeyFile) if err != nil { return nil, nil, err } @@ -51,33 +50,33 @@ func LoadOrCreateKeys(privKeyFile string, outputDir string) (crypto.PrivKey, cry } pubKeyFile := filepath.Join(outputDir, pubKeyName) - err = ioutil.WriteFile(pubKeyFile, pubPayload, pubKeyPermissions) + err = os.WriteFile(pubKeyFile, pubPayload, pubKeyPermissions) if err != nil { log.Fatalf("Could not write public key to file: %s", err) } pubKeyTextFile := filepath.Join(outputDir, pubKeyTxtName) pubKeyBase64 := base64.StdEncoding.EncodeToString(pubPayload) - err = ioutil.WriteFile(pubKeyTextFile, []byte(pubKeyBase64), pubKeyPermissions) + err = os.WriteFile(pubKeyTextFile, []byte(pubKeyBase64), pubKeyPermissions) if err != nil { log.Fatalf("Could not write public key text to file: %s", err) } identityFile := filepath.Join(outputDir, identityName) - err = ioutil.WriteFile(identityFile, []byte(identity.Pretty()), pubKeyPermissions) + err = os.WriteFile(identityFile, []byte(identity.Pretty()), pubKeyPermissions) if err != nil { log.Fatalf("Could not write identity to file: %s", err) } privKeyFile = filepath.Join(outputDir, privKeyName) - err = ioutil.WriteFile(privKeyFile, privPayload, privKeyPermissions) + err = os.WriteFile(privKeyFile, privPayload, privKeyPermissions) if err != nil { log.Fatalf("Could not write private key to file: %s", err) } // Write peer ID to file peerIDFile := filepath.Join(outputDir, peerIDFileName) - err = ioutil.WriteFile(peerIDFile, []byte(identity.Pretty()), pubKeyPermissions) + err = os.WriteFile(peerIDFile, []byte(identity.Pretty()), pubKeyPermissions) if err != nil { log.Fatalf("Could not write peer ID to file: %s", err) } diff --git a/cmd/keyforge/main.go b/cmd/keyforge/main.go index f9df12b9..bc3fe475 100644 --- a/cmd/keyforge/main.go +++ b/cmd/keyforge/main.go @@ -16,7 +16,6 @@ const ( peerIDFileName = "peerid.txt" privKeyPermissions = 0600 pubKeyPermissions = 0644 - ) func main() { @@ -60,7 +59,7 @@ func main() { if flagPublicKey != "" && flagMessage != "" && flagSignature != "" { VerifyGivenSignature(flagPublicKey, flagMessage, flagSignature) } - + if flagPeerID != "" && flagMessage != "" && flagSignature != "" { VerifyGivenSignatureWithPeerID(flagPeerID, flagMessage, flagSignature) } diff --git a/cmd/keyforge/sign_verify.go b/cmd/keyforge/sign_verify.go index fe5929ae..45357260 100644 --- a/cmd/keyforge/sign_verify.go +++ b/cmd/keyforge/sign_verify.go @@ -7,8 +7,8 @@ import ( "log" "path/filepath" - "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" ) // HandleSignAndVerify performs the signing and verification based on the provided keys and flags @@ -117,6 +117,6 @@ func VerifyGivenSignatureWithPeerID(peerIDStr string, message string, encodedSig // This will give you a base64 string representation of the public key pubKeyBase64 := base64.StdEncoding.EncodeToString(pubKeyBytes) log.Printf("Extracted public key from PeerID (Base64): %s", pubKeyBase64) - + VerifyGivenSignature(pubKeyBase64, message, encodedSignature) } diff --git a/cmd/manager/bin.go b/cmd/manager/bin.go index a540eeac..b2f2f58c 100644 --- a/cmd/manager/bin.go +++ b/cmd/manager/bin.go @@ -15,68 +15,68 @@ import ( ) func installBinary(url, folder string) { - usr, err := user.Current() - if err != nil { - log.Fatal(err) - } - - targetPath := filepath.Join(usr.HomeDir, folder) - os.MkdirAll(targetPath, os.ModePerm) - - resp, err := http.Get(url) - if err != nil { - log.Fatal(err) - } - defer resp.Body.Close() - - archiveData, err := io.ReadAll(resp.Body) - if err != nil { - log.Fatal(err) - } - - gzipReader, err := gzip.NewReader(bytes.NewReader(archiveData)) - if err != nil { - log.Fatal(err) - } - defer gzipReader.Close() - - tarReader := tar.NewReader(gzipReader) - - for { - header, err := tarReader.Next() - if err == io.EOF { - break - } - if err != nil { - log.Fatal(err) - } - - path := filepath.Join(targetPath, header.Name) - switch header.Typeflag { - case tar.TypeDir: - if err := os.MkdirAll(path, os.FileMode(header.Mode)); err != nil { - log.Fatal(err) - } - - case tar.TypeReg: - outFile, err := os.Create(path) - if err != nil { - log.Fatal(err) - } - - if _, err := io.Copy(outFile, tarReader); err != nil { - log.Fatal(err) - } - - outFile.Close() - - if err := os.Chmod(path, os.FileMode(header.Mode)); err != nil { - log.Fatal(err) - } - - log.Printf("File %s installed in %s", header.Name, targetPath) - } - } + usr, err := user.Current() + if err != nil { + log.Fatal(err) + } + + targetPath := filepath.Join(usr.HomeDir, folder) + os.MkdirAll(targetPath, os.ModePerm) + + resp, err := http.Get(url) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + archiveData, err := io.ReadAll(resp.Body) + if err != nil { + log.Fatal(err) + } + + gzipReader, err := gzip.NewReader(bytes.NewReader(archiveData)) + if err != nil { + log.Fatal(err) + } + defer gzipReader.Close() + + tarReader := tar.NewReader(gzipReader) + + for { + header, err := tarReader.Next() + if err == io.EOF { + break + } + if err != nil { + log.Fatal(err) + } + + path := filepath.Join(targetPath, header.Name) + switch header.Typeflag { + case tar.TypeDir: + if err := os.MkdirAll(path, os.FileMode(header.Mode)); err != nil { + log.Fatal(err) + } + + case tar.TypeReg: + outFile, err := os.Create(path) + if err != nil { + log.Fatal(err) + } + + if _, err := io.Copy(outFile, tarReader); err != nil { + log.Fatal(err) + } + + outFile.Close() + + if err := os.Chmod(path, os.FileMode(header.Mode)); err != nil { + log.Fatal(err) + } + + log.Printf("File %s installed in %s", header.Name, targetPath) + } + } } func installB7s(baseURL, version string) { @@ -105,18 +105,18 @@ func installRuntime(baseURL, version string) { arch := runtime.GOARCH platform := runtime.GOOS - if(platform == "darwin") { + if platform == "darwin" { platform = "macos" } - if(arch == "amd64") { + if arch == "amd64" { arch = "x86_64" } - if(arch == "arm64") { + if arch == "arm64" { arch = "aarch64" } url := fmt.Sprintf("%s/%s/blockless-runtime.%s-latest.%s.tar.gz", baseURL, version, platform, arch) installBinary(url, ".b7s/runtime") -} \ No newline at end of file +} diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 2af9ffc5..cc3725cf 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -56,7 +56,6 @@ func main() { <-ctx.Done() } - func makeBasicHost(listenPort int, insecure bool, privKeyPath string) (host.Host, error) { priv, err := loadPrivateKey(privKeyPath) if err != nil { @@ -76,20 +75,18 @@ func makeBasicHost(listenPort int, insecure bool, privKeyPath string) (host.Host return libp2p.New(opts...) } - - func loadPrivateKey(filePath string) (crypto.PrivKey, error) { - keyBytes, err := ioutil.ReadFile(filePath) - if err != nil { - return nil, err - } + keyBytes, err := ioutil.ReadFile(filePath) + if err != nil { + return nil, err + } - priv, err := crypto.UnmarshalPrivateKey(keyBytes) - if err != nil { - return nil, err - } + priv, err := crypto.UnmarshalPrivateKey(keyBytes) + if err != nil { + return nil, err + } - return priv, nil + return priv, nil } func getHostAddress(ha host.Host) string { diff --git a/cmd/manager/service.go b/cmd/manager/service.go index 01d7b479..7857e2f9 100644 --- a/cmd/manager/service.go +++ b/cmd/manager/service.go @@ -178,4 +178,4 @@ func removeMacOSService() { func removeWindowsService() { log.Fatal("Removing a service on Windows is not supported in this code.") -} \ No newline at end of file +} diff --git a/cmd/node/main.go b/cmd/node/main.go index 87cb08f7..ac05c70f 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -110,6 +110,7 @@ func run() int { log.Error().Err(err).Str("key", cfg.Host.PrivateKey).Msg("could not create host") return failure } + defer host.Close() log.Info(). Str("id", host.ID().String()). diff --git a/consensus/pbft/execute.go b/consensus/pbft/execute.go index f01b759c..ebc5245d 100644 --- a/consensus/pbft/execute.go +++ b/consensus/pbft/execute.go @@ -15,6 +15,10 @@ import ( // Execute fullfils the consensus interface by inserting the request into the pipeline. func (r *Replica) Execute(client peer.ID, requestID string, timestamp time.Time, req execute.Request) (codes.Code, execute.Result, error) { + // Modifying state, so acquire state lock now. + r.sl.Lock() + defer r.sl.Unlock() + request := Request{ ID: requestID, Timestamp: timestamp, diff --git a/fstore/sync.go b/fstore/sync.go index e2a46d5e..6e28b8e0 100644 --- a/fstore/sync.go +++ b/fstore/sync.go @@ -83,7 +83,7 @@ func (h *FStore) Sync(cid string) error { } // checkFunctionFiles checks if the files required by the function are found on local storage. -// It returns two booleans indicating presense of the archive file, the unpacked files, and a potential error. +// It returns two booleans indicating presence of the archive file, the unpacked files, and a potential error. func (h *FStore) checkFunctionFiles(fn functionRecord) (bool, bool, error) { // Check if the archive is found. diff --git a/go.mod b/go.mod index f451b2f6..1a0f67c1 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,6 @@ require ( github.com/ipfs/boxo v0.12.0 // indirect github.com/labstack/gommon v0.4.0 // indirect github.com/libp2p/go-libp2p-consensus v0.0.1 // indirect - github.com/libp2p/go-libp2p-core v0.20.1 // indirect github.com/libp2p/go-libp2p-gostream v0.6.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/onsi/ginkgo/v2 v2.12.0 // indirect @@ -93,7 +92,7 @@ require ( github.com/google/gopacket v1.1.19 // indirect github.com/google/uuid v1.3.1 github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect diff --git a/go.sum b/go.sum index 4af3f66b..a591c3af 100644 --- a/go.sum +++ b/go.sum @@ -366,8 +366,6 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLE github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= github.com/libp2p/go-libp2p-consensus v0.0.1 h1:jcVbHRZLwTXU9iT/mPi+Lx4/OrIzq3bU1TbZNhYFCV8= github.com/libp2p/go-libp2p-consensus v0.0.1/go.mod h1:+9Wrfhc5QOqWB0gXI0m6ARlkHfdJpcFXmRU0WoHz4Mo= -github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw= -github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY= github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU= github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA= github.com/libp2p/go-libp2p-kad-dht v0.25.0 h1:T2SXQ/VlXTQVLChWY/+OyOsmGMRJvB5kiR+eJt7jtvI= diff --git a/host/host.go b/host/host.go index d395a4e2..f34682f7 100644 --- a/host/host.go +++ b/host/host.go @@ -37,7 +37,7 @@ func New(log zerolog.Logger, address string, port uint, options ...func(*Config) if cfg.Websocket { - // If the TCP and websocket port are explicitely chosen and set to the same value, one of the two listens will silently fail. + // If the TCP and websocket port are explicitly chosen and set to the same value, one of the two listens will silently fail. if port == cfg.WebsocketPort && cfg.WebsocketPort != 0 { return nil, fmt.Errorf("TCP and websocket ports cannot be the same (TCP: %v, Websocket: %v)", port, cfg.WebsocketPort) } diff --git a/models/execute/response.go b/models/execute/response.go index 3210729c..6feddf38 100644 --- a/models/execute/response.go +++ b/models/execute/response.go @@ -23,7 +23,7 @@ type Cluster struct { Peers []peer.ID `json:"peers,omitempty"` } -// RuntimeOutput describes the output produced by the Blockless Runtime during exection. +// RuntimeOutput describes the output produced by the Blockless Runtime during execution. type RuntimeOutput struct { Stdout string `json:"stdout"` Stderr string `json:"stderr"` diff --git a/node/cluster.go b/node/cluster.go index 5fa0a609..a496bc6a 100644 --- a/node/cluster.go +++ b/node/cluster.go @@ -83,7 +83,7 @@ func (n *Node) processDisbandCluster(ctx context.Context, from peer.ID, payload n.log.Info().Str("peer", from.String()).Str("request", req.RequestID).Msg("received request to disband consensus cluster") - err = n.leaveCluster(req.RequestID) + err = n.leaveCluster(req.RequestID, consensusClusterDisbandTimeout) if err != nil { return fmt.Errorf("could not disband cluster (request: %s): %w", req.RequestID, err) } @@ -174,7 +174,7 @@ func (n *Node) disbandCluster(requestID string, replicas []peer.ID) error { return fmt.Errorf("could not send cluster disband request (request: %s): %w", requestID, err) } - log.Info().Err(err).Strs("peers", blockless.PeerIDsToStr(replicas)).Msg("sent cluster disband request") + n.log.Info().Err(err).Str("request", requestID).Strs("peers", blockless.PeerIDsToStr(replicas)).Msg("sent cluster disband request") return nil } diff --git a/node/cluster_pbft_integration_test.go b/node/cluster_pbft_integration_test.go new file mode 100644 index 00000000..c0d86c37 --- /dev/null +++ b/node/cluster_pbft_integration_test.go @@ -0,0 +1,251 @@ +//go:build integration +// +build integration + +package node_test + +import ( + "context" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + + "github.com/blocklessnetwork/b7s/consensus" + "github.com/blocklessnetwork/b7s/consensus/pbft" + "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/blocklessnetwork/b7s/models/codes" + "github.com/blocklessnetwork/b7s/models/response" +) + +func TestNode_PBFT_ExecuteComplete(t *testing.T) { + + const ( + testTimeLimit = 1 * time.Minute + + dirPattern = "b7s-node-pbft-execute-integration-test-" + + cid = "whatever-cid" + + // Paths where files will be hosted on the test server. + manifestEndpoint = "/hello-manifest.json" + archiveEndpoint = "/hello-deployment.tar.gz" + testFunctionToServe = "testdata/hello.tar.gz" + functionMethod = "hello.wasm" + + expectedExecutionResult = `This is the start of my program +The answer is 42 +This is the end of my program +` + ) + + cleanupDisabled := cleanupDisabled() + + var verifiedExecution bool + + t.Log("starting test") + + // Phase 0: Create libp2p hosts, loggers, temporary directories and nodes. + nodeDir := fmt.Sprintf("%v-head-", dirPattern) + head := instantiateNode(t, nodeDir, blockless.HeadNode) + t.Logf("head node workspace: %s", head.dir) + + var workers []*nodeScaffolding + for i := 0; i < 4; i++ { + nodeDir := fmt.Sprintf("%v-worker-%v-", dirPattern, i) + + worker := instantiateNode(t, nodeDir, blockless.WorkerNode) + t.Logf("worker node #%v workspace: %s", i, worker.dir) + + workers = append(workers, worker) + } + + workerIDs := make([]peer.ID, 0, len(workers)) + for _, worker := range workers { + workerIDs = append(workerIDs, worker.host.ID()) + } + + // Cleanup everything after test is complete. + defer func() { + for _, worker := range workers { + worker.db.Close() + worker.logFile.Close() + if !cleanupDisabled { + os.RemoveAll(worker.dir) + } + } + + head.db.Close() + head.logFile.Close() + if !cleanupDisabled { + os.RemoveAll(head.dir) + } + }() + + var nodes []*nodeScaffolding + nodes = append(nodes, head) + nodes = append(nodes, workers...) + + t.Log("created nodes") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Set a hard limit for test duration. + // This looks a bit sketchy as tests can have the time limit + // set externally, but as there's a lot of moving pieces here, + // include it for better usability. + go func() { + <-time.After(testTimeLimit) + cancel() + t.Log("cancelling test") + }() + + // Phase 1: Setup connections. + + // Client that will issue and receive request. + client := createClient(t) + + // Add hosts to each others peer stores so that they know how to contact each other, and then establish connections. + for i := 0; i < len(nodes); i++ { + for j := 0; j < len(nodes); j++ { + if j == i { + continue + } + hostAddNewPeer(t, client.host, nodes[i].host) + hostAddNewPeer(t, nodes[i].host, nodes[j].host) + hostAddNewPeer(t, nodes[j].host, nodes[i].host) + + // Establish a connection so that hosts disseminate topic subscription info. + info := hostGetAddrInfo(t, nodes[j].host) + err := nodes[i].host.Connect(ctx, *info) + require.NoError(t, err) + } + } + + // Phase 2: Start nodes. + t.Log("starting nodes") + + // We start nodes in separate goroutines. + var nodesWG sync.WaitGroup + nodesWG.Add(len(nodes)) + + for _, node := range nodes { + go func(node *nodeScaffolding) { + defer nodesWG.Done() + + err := node.node.Run(ctx) + require.NoError(t, err) + + t.Log("node stopped") + }(node) + } + + // Add a delay for the hosts to subscribe to topics, + // diseminate subscription information etc. + time.Sleep(startupDelay) + + t.Log("starting function server") + + // Phase 3: Create the server hosting the manifest and the function. + + srv := createFunctionServer(t, manifestEndpoint, archiveEndpoint, testFunctionToServe, cid) + defer srv.Close() + + // Phase 4: Have the worker nodes install the function. + // That way, when he receives the execution request - he will have the function needed to execute it. + + t.Log("instructing worker node to install function") + + var installWG sync.WaitGroup + installWG.Add(len(workers)) + + // Setup verifier for the response we expect. + client.host.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) { + defer installWG.Done() + defer stream.Close() + + from := stream.Conn().RemotePeer() + require.Contains(t, workerIDs, from) + + var res response.InstallFunction + getStreamPayload(t, stream, &res) + + require.Equal(t, blockless.MessageInstallFunctionResponse, res.Type) + require.Equal(t, codes.Accepted, res.Code) + require.Equal(t, "installed", res.Message) + + t.Log("client received function install response") + }) + + manifestURL := fmt.Sprintf("%v%v", srv.URL, manifestEndpoint) + for _, worker := range workers { + err := client.sendInstallMessage(ctx, worker.host.ID(), manifestURL, cid) + require.NoError(t, err) + } + + // Wait for the installation request to be processed. + installWG.Wait() + + t.Log("worker node installed function") + + // Phase 5: Request execution from the head node. + + t.Log("sending execution request") + + // Setup verifier for the response we expect. + var executeWG sync.WaitGroup + + executeWG.Add(1) + client.host.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) { + defer executeWG.Done() + defer stream.Close() + + t.Log("client received execution response") + + var res response.Execute + getStreamPayload(t, stream, &res) + + require.Equal(t, blockless.MessageExecuteResponse, res.Type) + require.Equal(t, codes.OK, res.Code) + require.NotEmpty(t, res.RequestID) + + require.Len(t, res.Cluster.Peers, len(workers)) + + // Verify cluster nodes are the workers we created. + require.ElementsMatch(t, workerIDs, res.Cluster.Peers) + + require.GreaterOrEqual(t, uint(len(res.Results)), pbft.MinClusterResults(uint(len(workers)))) + + for peer, exres := range res.Results { + require.Contains(t, workerIDs, peer) + require.Equal(t, expectedExecutionResult, exres.Result.Stdout) + } + + t.Log("client verified execution response") + + verifiedExecution = true + }) + + err := client.sendExecutionMessage(ctx, head.host.ID(), cid, functionMethod, consensus.PBFT, len(workers)) + require.NoError(t, err) + + executeWG.Wait() + + t.Log("execution request processed") + + // Since we're done, we can cancel the context, leading to stopping of the nodes. + cancel() + + nodesWG.Wait() + + t.Log("nodes shutdown") + + t.Log("test complete") + + require.True(t, verifiedExecution) +} diff --git a/node/consensus.go b/node/consensus.go index eced6f9f..77fc8a06 100644 --- a/node/consensus.go +++ b/node/consensus.go @@ -123,7 +123,7 @@ func (n *Node) createPBFTCluster(ctx context.Context, from peer.ID, fc request.F return nil } -func (n *Node) leaveCluster(requestID string) error { +func (n *Node) leaveCluster(requestID string, timeout time.Duration) error { // Shutdown can take a while so use short locking intervals. n.clusterLock.RLock() @@ -136,7 +136,7 @@ func (n *Node) leaveCluster(requestID string) error { n.log.Info().Str("consensus", cluster.Consensus().String()).Str("request", requestID).Msg("leaving consensus cluster") - ctx, cancel := context.WithTimeout(context.Background(), consensusClusterDisbandTimeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() // We know that the request is done executing when we have a result for it. diff --git a/node/execute_integration_test.go b/node/execute_integration_test.go index 6ce1f255..d9b20ebb 100644 --- a/node/execute_integration_test.go +++ b/node/execute_integration_test.go @@ -104,7 +104,7 @@ This is the end of my program go func() { defer nodesWG.Done() - err = head.node.Run(ctx) + err := head.node.Run(ctx) require.NoError(t, err) t.Log("head node stopped") @@ -113,7 +113,7 @@ This is the end of my program go func() { defer nodesWG.Done() - err = worker.node.Run(ctx) + err := worker.node.Run(ctx) require.NoError(t, err) t.Log("worker node stopped") @@ -189,7 +189,7 @@ This is the end of my program verifiedExecution = true }) - err = client.sendExecutionMessage(ctx, head.host.ID(), cid, functionMethod) + err = client.sendExecutionMessage(ctx, head.host.ID(), cid, functionMethod, 0, 1) require.NoError(t, err) executeWG.Wait() diff --git a/node/execution_results.go b/node/execution_results.go index f6d5699d..6be6c7d6 100644 --- a/node/execution_results.go +++ b/node/execution_results.go @@ -84,7 +84,7 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, result.peers = append(result.peers, sender) if uint(len(result.peers)) >= count { - n.log.Info().Str("request", requestID).Int("peers", len(peers)).Uint("matching_results", count).Msg("have enough maching results") + n.log.Info().Str("request", requestID).Int("peers", len(peers)).Uint("matching_results", count).Msg("have enough matching results") exCancel() for _, peer := range result.peers { diff --git a/node/handlers_internal_test.go b/node/handlers_internal_test.go index 0508076b..4c5deb70 100644 --- a/node/handlers_internal_test.go +++ b/node/handlers_internal_test.go @@ -78,6 +78,8 @@ func TestNode_Handlers(t *testing.T) { requestID = "dummy-request-id-2" ) + node.rollCall.create(requestID) + // We only want responses with the code `Accepted`. res := response.RollCall{ Type: blockless.MessageRollCallResponse, diff --git a/node/message_internal_test.go b/node/message_internal_test.go index a5657c54..d7967993 100644 --- a/node/message_internal_test.go +++ b/node/message_internal_test.go @@ -54,7 +54,7 @@ func TestNode_Messaging(t *testing.T) { require.Equal(t, rec, received) }) - err = node.send(context.Background(), client.ID(), rec) + err := node.send(context.Background(), client.ID(), rec) require.NoError(t, err) wg.Wait() @@ -66,7 +66,7 @@ func TestNode_Messaging(t *testing.T) { // Establish a connection between peers. clientInfo := hostGetAddrInfo(t, client) - err = node.host.Connect(ctx, *clientInfo) + err := node.host.Connect(ctx, *clientInfo) require.NoError(t, err) // Have both client and node subscribe to the same topic. diff --git a/node/node.go b/node/node.go index 369e5453..0f18c86a 100644 --- a/node/node.go +++ b/node/node.go @@ -19,7 +19,7 @@ import ( // It listens for messages coming from the wire and processes them. Depending on the // node role, which is determined on construction, it may process messages in different ways. // For example, upon receiving a message requesting execution of a Blockless function, -// a Worker Node will use the `Execute` component to fullfill the execution request. +// a Worker Node will use the `Execute` component to fulfill the execution request. // On the other hand, a Head Node will issue a roll call and eventually // delegate the execution to the chosend Worker Node. type Node struct { diff --git a/node/node_integration_test.go b/node/node_integration_test.go index 24041f27..f5f7780a 100644 --- a/node/node_integration_test.go +++ b/node/node_integration_test.go @@ -22,6 +22,7 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/require" + "github.com/blocklessnetwork/b7s/consensus" "github.com/blocklessnetwork/b7s/executor" "github.com/blocklessnetwork/b7s/fstore" "github.com/blocklessnetwork/b7s/host" @@ -52,11 +53,9 @@ type nodeScaffolding struct { node *node.Node } -func instantiateNode(t *testing.T, dirnamePattern string, role blockless.NodeRole) *nodeScaffolding { +func instantiateNode(t *testing.T, nodeDir string, role blockless.NodeRole) *nodeScaffolding { t.Helper() - nodeDir := fmt.Sprintf("%v-%v-", dirnamePattern, role.String()) - // Bootstrap node directory. dir, err := os.MkdirTemp("", nodeDir) require.NoError(t, err) @@ -166,15 +165,21 @@ func (c *client) sendInstallMessage(ctx context.Context, to peer.ID, manifestURL return nil } -func (c *client) sendExecutionMessage(ctx context.Context, to peer.ID, cid string, method string) error { +func (c *client) sendExecutionMessage(ctx context.Context, to peer.ID, cid string, method string, consensus consensus.Type, count int) error { req := request.Execute{ Type: blockless.MessageExecute, Request: execute.Request{ FunctionID: cid, Method: method, + Config: execute.Config{ + NodeCount: count, + }, }, } + if consensus.Valid() { + req.Config.ConsensusAlgorithm = consensus.String() + } payload, err := json.Marshal(req) if err != nil { diff --git a/node/queue.go b/node/queue.go index acfd53d6..00587d3f 100644 --- a/node/queue.go +++ b/node/queue.go @@ -63,6 +63,7 @@ func (q *rollCallQueue) exists(reqID string) bool { // responses will return a channel that can be used to iterate through all of the responses. func (q *rollCallQueue) responses(reqID string) <-chan response.RollCall { q.Lock() + defer q.Unlock() _, ok := q.m[reqID] if !ok { @@ -70,8 +71,6 @@ func (q *rollCallQueue) responses(reqID string) <-chan response.RollCall { q.m[reqID] = make(chan response.RollCall, q.size) } - q.Unlock() - return q.m[reqID] } diff --git a/node/roll_call.go b/node/roll_call.go index 3ab4f701..ecaaa96b 100644 --- a/node/roll_call.go +++ b/node/roll_call.go @@ -77,7 +77,7 @@ func (n *Node) processRollCall(ctx context.Context, from peer.ID, payload []byte log.Info().Str("origin", req.Origin.String()).Msg("reporting for roll call") - // Send postive response. + // Send positive response. res.Code = codes.Accepted err = n.send(ctx, req.Origin, res) if err != nil {