From 20626451376a27af2d1473d9b0bc5bbc209f4de0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20=C4=8Cekrli=C4=87?= Date: Wed, 12 Apr 2023 23:50:17 +0200 Subject: [PATCH] Install function on roll call if it wasn't installed already (#78) * Rename function handler/interface to fstore in node code * Add method to check if a function is installed * Separate installing and getting a function * Update tests * Clarify naming - REST API endpoint publishes function install - doesnt actually install it * Add method for function install * Function install done on roll call * Improve log messages * Update tests * Remove "store" as a dependency for the node --- api/install.go | 2 +- api/install_test.go | 8 +- api/node.go | 2 +- cmd/node/main.go | 2 +- fstore/fstore.go | 2 +- fstore/get.go | 81 +-------------- fstore/get_test.go | 158 +--------------------------- fstore/install.go | 99 ++++++++++++++++++ fstore/install_test.go | 175 ++++++++++++++++++++++++++++++++ host/dht.go | 3 - node/execute.go | 21 +--- node/execute_internal_test.go | 16 +-- node/function.go | 16 ++- node/function_manifest.go | 21 ---- node/handlers.go | 41 -------- node/handlers_internal_test.go | 9 +- node/health_internal_test.go | 3 +- node/install.go | 78 ++++++++++++++ node/node.go | 8 +- node/node_integration_test.go | 2 +- node/node_internal_test.go | 12 +-- node/notifiee_internal_test.go | 3 +- node/rest.go | 6 +- node/roll_call.go | 67 ++++++------ node/roll_call_internal_test.go | 66 ++++++++++-- node/sync.go | 4 +- testing/mocks/fstore.go | 24 ++++- testing/mocks/generic.go | 2 +- testing/mocks/node.go | 12 +-- 29 files changed, 518 insertions(+), 425 deletions(-) create mode 100644 fstore/install.go create mode 100644 fstore/install_test.go delete mode 100644 node/function_manifest.go create mode 100644 node/install.go diff --git a/api/install.go b/api/install.go index f7d42f38..e9736fad 100644 --- a/api/install.go +++ b/api/install.go @@ -37,7 +37,7 @@ func (a *API) Install(ctx echo.Context) error { // Start function install in a separate goroutine and signal when it's done. fnErr := make(chan error) go func() { - err = a.node.FunctionInstall(reqCtx, req.URI, req.CID) + err = a.node.PublishFunctionInstall(reqCtx, req.URI, req.CID) fnErr <- err }() diff --git a/api/install_test.go b/api/install_test.go index 96a41435..c4b772b9 100644 --- a/api/install_test.go +++ b/api/install_test.go @@ -69,7 +69,7 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) { ) node := mocks.BaselineNode(t) - node.FunctionInstallFunc = func(context.Context, string, string) error { + node.PublishFunctionInstallFunc = func(context.Context, string, string) error { time.Sleep(installDuration) return nil } @@ -91,17 +91,17 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) { var res = response.InstallFunction{} require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &res)) - + num, err := strconv.Atoi(res.Code) require.NoError(t, err) - + require.Equal(t, http.StatusRequestTimeout, num) }) t.Run("node fails to install function", func(t *testing.T) { t.Parallel() node := mocks.BaselineNode(t) - node.FunctionInstallFunc = func(context.Context, string, string) error { + node.PublishFunctionInstallFunc = func(context.Context, string, string) error { return mocks.GenericError } diff --git a/api/node.go b/api/node.go index 7fc657b2..1dff36bc 100644 --- a/api/node.go +++ b/api/node.go @@ -9,5 +9,5 @@ import ( type Node interface { ExecuteFunction(context.Context, execute.Request) (execute.Result, error) ExecutionResult(id string) (execute.Result, bool) - FunctionInstall(ctx context.Context, uri string, cid string) error + PublishFunctionInstall(ctx context.Context, uri string, cid string) error } diff --git a/cmd/node/main.go b/cmd/node/main.go index 128cc568..73614b07 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -171,7 +171,7 @@ func run() int { fstore := fstore.New(log, functionStore, cfg.Workspace) // Instantiate node. - node, err := node.New(log, host, functionStore, peerstore, fstore, opts...) + node, err := node.New(log, host, peerstore, fstore, opts...) if err != nil { log.Error().Err(err).Msg("could not create node") return failure diff --git a/fstore/fstore.go b/fstore/fstore.go index f82f9f30..a8d95c25 100644 --- a/fstore/fstore.go +++ b/fstore/fstore.go @@ -31,7 +31,7 @@ func New(log zerolog.Logger, store Store, workdir string) *FStore { downloader.UserAgent = defaultUserAgent h := FStore{ - log: log.With().Str("component", "function_store").Logger(), + log: log.With().Str("component", "fstore").Logger(), store: store, http: &cli, downloader: downloader, diff --git a/fstore/get.go b/fstore/get.go index 9383d869..002df8f1 100644 --- a/fstore/get.go +++ b/fstore/get.go @@ -1,89 +1,18 @@ package fstore import ( - "errors" "fmt" - "path/filepath" "github.com/blocklessnetworking/b7s/models/blockless" ) -// Get retrieves the function manifest from the given address. `useCached` indicates whether, -// if the function is found in the store/db, it should be used, or if we should re-download it. -func (h *FStore) Get(address string, cid string, useCached bool) (*blockless.FunctionManifest, error) { +// Get retrieves a function manifest for the given function from storage. +func (h *FStore) Get(cid string) (*blockless.FunctionManifest, error) { - h.log.Debug(). - Str("cid", cid). - Str("address", address). - Bool("use_cached", useCached). - Msg("getting manifest") - - cachedFn, err := h.getFunction(cid) - // Return cached version if so requested. - if err == nil && useCached { - - h.log.Debug(). - Str("cid", cid). - Str("address", address). - Msg("function manifest was already cached, done") - - return &cachedFn.Manifest, nil - } - if err != nil && !errors.Is(err, blockless.ErrNotFound) { - return nil, fmt.Errorf("could not get function from store: %w", err) - } - - // Being here means that we either did not find the manifest, or we don't - // want to use the cached one. - - // Retrieve function manifest from the given address. - var manifest blockless.FunctionManifest - err = h.getJSON(address, &manifest) + fn, err := h.getFunction(cid) if err != nil { - return nil, fmt.Errorf("could not retrieve manifest: %w", err) - } - - // If the runtime URL is specified, use it to fill in the deployment info. - if manifest.Runtime.URL != "" { - err = updateDeploymentInfo(&manifest, address) - if err != nil { - return nil, fmt.Errorf("could not update deployment info: %w", err) - } - } - - // Download the function identified by the manifest. - functionPath, err := h.download(manifest) - if err != nil { - return nil, fmt.Errorf("could not download function: %w", err) - } - - out := filepath.Join(h.workdir, cid) - - // Unpack the .tar.gz archive. - // TODO: Would be good to know the content of the .tar.gz archive. - // We're unpacking the archive here and storing the path to the .tar.gz in the DB. - err = h.unpackArchive(functionPath, out) - if err != nil { - return nil, fmt.Errorf("could not unpack gzip archive (file: %s): %w", functionPath, err) - } - - manifest.Deployment.File = functionPath - - // Store the function record. - fn := functionRecord{ - CID: cid, - URL: address, - Manifest: manifest, - Archive: functionPath, - Files: out, - } - err = h.saveFunction(fn) - if err != nil { - h.log.Error(). - Err(err). - Str("cid", cid). - Msg("could not save function record") + return nil, fmt.Errorf("could not get function from store: %w", err) } - return &manifest, nil + return &fn.Manifest, nil } diff --git a/fstore/get_test.go b/fstore/get_test.go index 6d9b8355..c4025e87 100644 --- a/fstore/get_test.go +++ b/fstore/get_test.go @@ -1,87 +1,21 @@ package fstore_test import ( - "bytes" - "crypto/sha256" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" "os" - "strings" "testing" "github.com/stretchr/testify/require" "github.com/blocklessnetworking/b7s/fstore" - "github.com/blocklessnetworking/b7s/models/blockless" - "github.com/blocklessnetworking/b7s/store" - "github.com/blocklessnetworking/b7s/testing/helpers" "github.com/blocklessnetworking/b7s/testing/mocks" ) -func TestFunction_Get(t *testing.T) { - - const ( - manifestURL = "manifest.json" - functionURL = "function.tar.gz" - testFile = "testdata/testFunction.tar.gz" - - testCID = "dummy-cid" - ) - - workdir, err := os.MkdirTemp("", "b7s-function-get-") - require.NoError(t, err) - - defer os.RemoveAll(workdir) - - functionPayload, err := os.ReadFile(testFile) - require.NoError(t, err) - - hash := sha256.Sum256(functionPayload) - - // We'll create two servers, so we can link one to the other. - msrv, fsrv := createServers(t, manifestURL, functionURL, functionPayload) - - store := store.New(helpers.InMemoryDB(t)) - fh := fstore.New(mocks.NoopLogger, store, workdir) - - address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) - manifest, err := fh.Get(address, testCID, false) - require.NoError(t, err) - - // Verify downloaded file. - archive := manifest.Deployment.File - require.FileExists(t, archive) - - ok := verifyFileHash(t, archive, hash) - require.Truef(t, ok, "file hash does not match") - - // Shutdown both servers and retry getting the manifest - verify that the cached manifest will be returned. - fsrv.Close() - msrv.Close() - - _, err = fh.Get(address, testCID, true) - require.NoError(t, err) -} - func TestFunction_GetHandlesErrors(t *testing.T) { const ( - manifestURL = "manifest.json" - functionURL = "function.tar.gz" - testFile = "testdata/testFunction.tar.gz" - testCID = "dummy-cid" ) - functionPayload, err := os.ReadFile(testFile) - require.NoError(t, err) - - // We'll create two servers, so we can link one to the other. - msrv, fsrv := createServers(t, manifestURL, functionURL, functionPayload) - // NOTE: Server shutdown handled in test cases below. - t.Run("handles failure to read manifest from store", func(t *testing.T) { workdir, err := os.MkdirTemp("", "b7s-function-get-") @@ -96,97 +30,7 @@ func TestFunction_GetHandlesErrors(t *testing.T) { fh := fstore.New(mocks.NoopLogger, store, workdir) - address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) - _, err = fh.Get(address, testCID, false) - require.Error(t, err) - }) - t.Run("handles failure to download function", func(t *testing.T) { - - // Shutdown function server. - fsrv.Close() - - workdir, err := os.MkdirTemp("", "b7s-function-get-") - require.NoError(t, err) - - defer os.RemoveAll(workdir) - - store := store.New(helpers.InMemoryDB(t)) - fh := fstore.New(mocks.NoopLogger, store, workdir) - - address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) - _, err = fh.Get(address, testCID, false) + _, err = fh.Get(testCID) require.Error(t, err) }) - t.Run("handles failure to fetch manifest", func(t *testing.T) { - - // Shutdown manifest server. - msrv.Close() - - workdir, err := os.MkdirTemp("", "b7s-function-get-") - require.NoError(t, err) - - defer os.RemoveAll(workdir) - - store := store.New(helpers.InMemoryDB(t)) - fh := fstore.New(mocks.NoopLogger, store, workdir) - - address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) - _, err = fh.Get(address, testCID, false) - require.Error(t, err) - }) -} - -func createServers(t *testing.T, manifestURL string, functionURL string, functionPayload []byte) (manifestSrv *httptest.Server, functionSrv *httptest.Server) { - t.Helper() - - // Create function server. - fsrv := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - - path := strings.TrimPrefix(req.URL.Path, "/") - if path != functionURL { - w.WriteHeader(http.StatusNotFound) - return - } - - w.Write(functionPayload) - })) - - // Setup manifest that points to the function server. - functionAddress := fmt.Sprintf("%s/%s", fsrv.URL, functionURL) - hash := sha256.Sum256(functionPayload) - sourceManifest := blockless.FunctionManifest{ - Deployment: blockless.Deployment{ - URI: functionAddress, - Checksum: fmt.Sprintf("%x", hash), - }, - } - - // Create manifest server. - msrv := httptest.NewServer( - http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - - path := strings.TrimPrefix(req.URL.Path, "/") - if path != manifestURL { - w.WriteHeader(http.StatusNotFound) - return - } - - payload, err := json.Marshal(sourceManifest) - require.NoError(t, err) - w.Write(payload) - })) - - return msrv, fsrv -} - -func verifyFileHash(t *testing.T, filename string, checksum [32]byte) bool { - t.Helper() - - data, err := os.ReadFile(filename) - require.NoError(t, err) - - h := sha256.Sum256(data) - - return bytes.Equal(checksum[:], h[:]) } diff --git a/fstore/install.go b/fstore/install.go new file mode 100644 index 00000000..1b404fee --- /dev/null +++ b/fstore/install.go @@ -0,0 +1,99 @@ +package fstore + +import ( + "errors" + "fmt" + "path/filepath" + + "github.com/blocklessnetworking/b7s/models/blockless" +) + +// Install will download and install function identified by the manifest/CID. +func (h *FStore) Install(address string, cid string) error { + + h.log.Debug(). + Str("cid", cid). + Str("address", address). + Msg("installing function") + + // Retrieve function manifest from the given address. + var manifest blockless.FunctionManifest + err := h.getJSON(address, &manifest) + if err != nil { + return fmt.Errorf("could not retrieve manifest: %w", err) + } + + // If the runtime URL is specified, use it to fill in the deployment info. + if manifest.Runtime.URL != "" { + err = updateDeploymentInfo(&manifest, address) + if err != nil { + return fmt.Errorf("could not update deployment info: %w", err) + } + } + + // Download the function identified by the manifest. + functionPath, err := h.download(manifest) + if err != nil { + return fmt.Errorf("could not download function: %w", err) + } + + out := filepath.Join(h.workdir, cid) + + // Unpack the .tar.gz archive. + // TODO: Would be good to know the content of the .tar.gz archive. + // We're unpacking the archive here and storing the path to the .tar.gz in the DB. + err = h.unpackArchive(functionPath, out) + if err != nil { + return fmt.Errorf("could not unpack gzip archive (file: %s): %w", functionPath, err) + } + + manifest.Deployment.File = functionPath + + // Store the function record. + fn := functionRecord{ + CID: cid, + URL: address, + Manifest: manifest, + Archive: functionPath, + Files: out, + } + err = h.saveFunction(fn) + if err != nil { + h.log.Error(). + Err(err). + Str("cid", cid). + Msg("could not save function record") + } + + h.log.Debug(). + Str("cid", cid). + Str("address", address). + Msg("installed function") + + return nil +} + +// Installed checks if the function with the given CID is installed. +func (h *FStore) Installed(cid string) (bool, error) { + + fn, err := h.getFunction(cid) + if err != nil && errors.Is(err, blockless.ErrNotFound) { + return false, nil + } + if err != nil { + return false, fmt.Errorf("could not get function from store: %w", err) + } + + haveArchive, haveFiles, err := h.checkFunctionFiles(*fn) + if err != nil { + return false, fmt.Errorf("could not verify function cache: %w", err) + } + + // If we don't have all files found, treat it as not installed. + if !haveArchive || !haveFiles { + return false, nil + } + + // We have the function in the database and all files - we're good. + return true, nil +} diff --git a/fstore/install_test.go b/fstore/install_test.go new file mode 100644 index 00000000..3cf4285b --- /dev/null +++ b/fstore/install_test.go @@ -0,0 +1,175 @@ +package fstore_test + +import ( + "bytes" + "crypto/sha256" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/blocklessnetworking/b7s/fstore" + "github.com/blocklessnetworking/b7s/models/blockless" + "github.com/blocklessnetworking/b7s/store" + "github.com/blocklessnetworking/b7s/testing/helpers" + "github.com/blocklessnetworking/b7s/testing/mocks" +) + +func TestFunction_Install(t *testing.T) { + + const ( + manifestURL = "manifest.json" + functionURL = "function.tar.gz" + testFile = "testdata/testFunction.tar.gz" + + testCID = "dummy-cid" + ) + + workdir, err := os.MkdirTemp("", "b7s-function-get-") + require.NoError(t, err) + + defer os.RemoveAll(workdir) + + functionPayload, err := os.ReadFile(testFile) + require.NoError(t, err) + + hash := sha256.Sum256(functionPayload) + + // We'll create two servers, so we can link one to the other. + msrv, fsrv := createServers(t, manifestURL, functionURL, functionPayload) + defer fsrv.Close() + defer msrv.Close() + + store := store.New(helpers.InMemoryDB(t)) + fh := fstore.New(mocks.NoopLogger, store, workdir) + + _, err = fh.Get(testCID) + require.ErrorIs(t, err, blockless.ErrNotFound) + + address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) + err = fh.Install(address, testCID) + require.NoError(t, err) + + manifest, err := fh.Get(testCID) + require.NoError(t, err) + + // Verify downloaded file. + archive := manifest.Deployment.File + require.FileExists(t, archive) + + ok := verifyFileHash(t, archive, hash) + require.Truef(t, ok, "file hash does not match") +} + +func TestFunction_InstallHandlesErrors(t *testing.T) { + + const ( + manifestURL = "manifest.json" + functionURL = "function.tar.gz" + testFile = "testdata/testFunction.tar.gz" + + testCID = "dummy-cid" + ) + + functionPayload, err := os.ReadFile(testFile) + require.NoError(t, err) + + // We'll create two servers, so we can link one to the other. + msrv, fsrv := createServers(t, manifestURL, functionURL, functionPayload) + // NOTE: Server shutdown handled in test cases below. + + t.Run("handles failure to download function", func(t *testing.T) { + + // Shutdown function server. + fsrv.Close() + + workdir, err := os.MkdirTemp("", "b7s-function-get-") + require.NoError(t, err) + + defer os.RemoveAll(workdir) + + store := store.New(helpers.InMemoryDB(t)) + fh := fstore.New(mocks.NoopLogger, store, workdir) + + address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) + err = fh.Install(address, testCID) + require.Error(t, err) + }) + t.Run("handles failure to fetch manifest", func(t *testing.T) { + + // Shutdown manifest server. + msrv.Close() + + workdir, err := os.MkdirTemp("", "b7s-function-get-") + require.NoError(t, err) + + defer os.RemoveAll(workdir) + + store := store.New(helpers.InMemoryDB(t)) + fh := fstore.New(mocks.NoopLogger, store, workdir) + + address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) + err = fh.Install(address, testCID) + require.Error(t, err) + }) +} + +func createServers(t *testing.T, manifestURL string, functionURL string, functionPayload []byte) (manifestSrv *httptest.Server, functionSrv *httptest.Server) { + t.Helper() + + // Create function server. + fsrv := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + + path := strings.TrimPrefix(req.URL.Path, "/") + if path != functionURL { + w.WriteHeader(http.StatusNotFound) + return + } + + w.Write(functionPayload) + })) + + // Setup manifest that points to the function server. + functionAddress := fmt.Sprintf("%s/%s", fsrv.URL, functionURL) + hash := sha256.Sum256(functionPayload) + sourceManifest := blockless.FunctionManifest{ + Deployment: blockless.Deployment{ + URI: functionAddress, + Checksum: fmt.Sprintf("%x", hash), + }, + } + + // Create manifest server. + msrv := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + + path := strings.TrimPrefix(req.URL.Path, "/") + if path != manifestURL { + w.WriteHeader(http.StatusNotFound) + return + } + + payload, err := json.Marshal(sourceManifest) + require.NoError(t, err) + w.Write(payload) + })) + + return msrv, fsrv +} + +func verifyFileHash(t *testing.T, filename string, checksum [32]byte) bool { + t.Helper() + + data, err := os.ReadFile(filename) + require.NoError(t, err) + + h := sha256.Sum256(data) + + return bytes.Equal(checksum[:], h[:]) +} diff --git a/host/dht.go b/host/dht.go index a2e679ba..46763e9d 100644 --- a/host/dht.go +++ b/host/dht.go @@ -43,9 +43,6 @@ findPeers: // Skip peers we're already connected to. connections := h.Network().ConnsToPeer(peer.ID) if len(connections) > 0 { - h.log.Debug(). - Str("peer", peer.String()). - Msg("skipping connected peer") continue } diff --git a/node/execute.go b/node/execute.go index e0b39b9a..f08aa2cb 100644 --- a/node/execute.go +++ b/node/execute.go @@ -3,7 +3,6 @@ package node import ( "context" "encoding/json" - "errors" "fmt" "github.com/libp2p/go-libp2p/core/peer" @@ -90,7 +89,7 @@ func (n *Node) processExecute(ctx context.Context, from peer.ID, payload []byte) func (n *Node) workerExecute(ctx context.Context, from peer.ID, requestID string, req execute.Request) (execute.Result, error) { // Check if we have function in store. - functionInstalled, err := n.isFunctionInstalled(req.FunctionID) + functionInstalled, err := n.fstore.Installed(req.FunctionID) if err != nil { res := execute.Result{ Code: response.CodeError, @@ -193,7 +192,7 @@ rollCallResponseLoop: Str("peer", reportingPeer.String()). Str("function_id", req.FunctionID). Str("request_id", requestID). - Msg("peer reported for roll call") + Msg("peer reported for roll call - requesting execution") // Request execution from the peer who reported back first. reqExecute := request.Execute{ @@ -263,19 +262,3 @@ func (n *Node) processExecuteResponse(ctx context.Context, from peer.ID, payload return nil } - -// isFuncitonInstalled looks up the function in the store by using the functionID/CID as key. -func (n *Node) isFunctionInstalled(functionID string) (bool, error) { - - _, err := n.function.Get("", functionID, true) - if err != nil { - - if errors.Is(err, blockless.ErrNotFound) { - return false, nil - } - - return false, fmt.Errorf("could not lookup function in store: %w", err) - } - - return true, nil -} diff --git a/node/execute_internal_test.go b/node/execute_internal_test.go index 4f19e4c6..22c50b09 100644 --- a/node/execute_internal_test.go +++ b/node/execute_internal_test.go @@ -156,12 +156,12 @@ func TestNode_WorkerExecute(t *testing.T) { node := createNode(t, blockless.WorkerNode) - // Error retrieving function. + // Error retrieving function manifest. fstore := mocks.BaselineFunctionHandler(t) - fstore.GetFunc = func(string, string, bool) (*blockless.FunctionManifest, error) { - return nil, mocks.GenericError + fstore.InstalledFunc = func(string) (bool, error) { + return false, mocks.GenericError } - node.function = fstore + node.fstore = fstore // Create a host that will serve as a receiver of the execution response. receiver, err := host.New(mocks.NoopLogger, loopback, 0) @@ -193,10 +193,10 @@ func TestNode_WorkerExecute(t *testing.T) { wg.Wait() // Function is not installed. - fstore.GetFunc = func(string, string, bool) (*blockless.FunctionManifest, error) { - return nil, blockless.ErrNotFound + fstore.InstalledFunc = func(string) (bool, error) { + return false, nil } - node.function = fstore + node.fstore = fstore wg.Add(1) @@ -212,7 +212,7 @@ func TestNode_WorkerExecute(t *testing.T) { require.Equal(t, blockless.MessageExecuteResponse, received.Type) - require.Equal(t, received.Code, response.CodeNotFound) + require.Equal(t, response.CodeNotFound, received.Code) }) err = node.processExecute(context.Background(), receiver.ID(), payload) diff --git a/node/function.go b/node/function.go index 3818c91d..7e7446cc 100644 --- a/node/function.go +++ b/node/function.go @@ -1,18 +1,16 @@ package node -import ( - "github.com/blocklessnetworking/b7s/models/blockless" -) +// FStore provides retrieval of function manifest. +type FStore interface { + // Install will install a function based on the address and CID. + Install(address string, cid string) error -// FunctionStore provides retrieval of function manifest. -type FunctionStore interface { - // Get retrieves a function manifest based on the address or CID. `useCached` boolean - // determines if function manifest should be refetched or previously cached data can be returned. - Get(address string, cid string, useCached bool) (*blockless.FunctionManifest, error) + // Installed returns info if the function is installed or not. + Installed(cid string) (bool, error) // InstalledFunction returns the list of CIDs of installed functions. InstalledFunctions() []string - // Sync will recheck if function installation is found in local storage, and redownload it if it isn't + // Sync will recheck if function installation is found in local storage, and redownload it if it isn't. Sync(cid string) error } diff --git a/node/function_manifest.go b/node/function_manifest.go deleted file mode 100644 index 98c5b91b..00000000 --- a/node/function_manifest.go +++ /dev/null @@ -1,21 +0,0 @@ -package node - -import ( - "fmt" - - "github.com/blocklessnetworking/b7s/models/blockless" -) - -// getFunctionManifest retrieves the function manifest for the function with the given ID. -func (n *Node) getFunctionManifest(id string) (*blockless.FunctionManifest, error) { - - // Try to get function manifest from the store. - var manifest blockless.FunctionManifest - err := n.store.GetRecord(id, &manifest) - if err != nil { - // TODO: Check - error not found. - return nil, fmt.Errorf("could not retrieve function manifest: %w", err) - } - - return &manifest, nil -} diff --git a/node/handlers.go b/node/handlers.go index 44ad5c54..f0c0a60c 100644 --- a/node/handlers.go +++ b/node/handlers.go @@ -7,8 +7,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" - "github.com/blocklessnetworking/b7s/models/blockless" - "github.com/blocklessnetworking/b7s/models/request" "github.com/blocklessnetworking/b7s/models/response" ) @@ -39,45 +37,6 @@ func (n *Node) processRollCallResponse(ctx context.Context, from peer.ID, payloa return nil } -func (n *Node) processInstallFunction(ctx context.Context, from peer.ID, payload []byte) error { - - // Only workers should respond to function install requests. - if n.cfg.Role != blockless.WorkerNode { - n.log.Debug(). - Msg("received function install request, ignoring") - return nil - } - - // Unpack the request. - var req request.InstallFunction - err := json.Unmarshal(payload, &req) - if err != nil { - return fmt.Errorf("could not unpack request: %w", err) - } - req.From = from - - // Get the function manifest. - _, err = n.function.Get(req.ManifestURL, req.CID, true) - if err != nil { - return fmt.Errorf("could not retrieve function (manifest_url: %s, cid: %s): %w", req.ManifestURL, req.CID, err) - } - - // Create the response. - res := response.InstallFunction{ - Type: blockless.MessageInstallFunctionResponse, - Code: response.CodeAccepted, - Message: "installed", - } - - // Reply to the caller. - err = n.send(ctx, from, res) - if err != nil { - return fmt.Errorf("could not send the response (peer: %s): %w", from, err) - } - - return nil -} - func (n *Node) processInstallFunctionResponse(ctx context.Context, from peer.ID, payload []byte) error { n.log.Debug().Msg("function install response received") return nil diff --git a/node/handlers_internal_test.go b/node/handlers_internal_test.go index 5fb8e0c5..d9e01418 100644 --- a/node/handlers_internal_test.go +++ b/node/handlers_internal_test.go @@ -150,10 +150,13 @@ func TestNode_InstallFunction(t *testing.T) { hostAddNewPeer(t, node.host, receiver) fstore := mocks.BaselineFunctionHandler(t) - fstore.GetFunc = func(string, string, bool) (*blockless.FunctionManifest, error) { - return nil, mocks.GenericError + fstore.InstalledFunc = func(string) (bool, error) { + return false, nil } - node.function = fstore + fstore.InstallFunc = func(string, string) error { + return mocks.GenericError + } + node.fstore = fstore // NOTE: In reality, this is more "documenting" current behavior. // In reality it sounds more correct that we *should* get a response back. diff --git a/node/health_internal_test.go b/node/health_internal_test.go index 5b0a7c33..aba47ed8 100644 --- a/node/health_internal_test.go +++ b/node/health_internal_test.go @@ -26,7 +26,6 @@ func TestNode_Health(t *testing.T) { var ( logger = mocks.NoopLogger - store = mocks.BaselineStore(t) peerstore = mocks.BaselinePeerStore(t) functionHandler = mocks.BaselineFunctionHandler(t) ) @@ -37,7 +36,7 @@ func TestNode_Health(t *testing.T) { nhost, err := host.New(logger, loopback, 0) require.NoError(t, err) - node, err := New(logger, nhost, store, peerstore, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopic(topic)) + node, err := New(logger, nhost, peerstore, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopic(topic)) require.NoError(t, err) // Create a host that will listen on the the topic to verify health pings diff --git a/node/install.go b/node/install.go new file mode 100644 index 00000000..1aab83c6 --- /dev/null +++ b/node/install.go @@ -0,0 +1,78 @@ +package node + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/blocklessnetworking/b7s/models/blockless" + "github.com/blocklessnetworking/b7s/models/request" + "github.com/blocklessnetworking/b7s/models/response" +) + +func (n *Node) processInstallFunction(ctx context.Context, from peer.ID, payload []byte) error { + + // Only workers should respond to function install requests. + if n.cfg.Role != blockless.WorkerNode { + n.log.Debug(). + Msg("received function install request, ignoring") + return nil + } + + // Unpack the request. + var req request.InstallFunction + err := json.Unmarshal(payload, &req) + if err != nil { + return fmt.Errorf("could not unpack request: %w", err) + } + req.From = from + + // Install function. + err = n.installFunction(req.CID, req.ManifestURL) + if err != nil { + return fmt.Errorf("could not install function: %w", err) + } + + // Create the response. + res := response.InstallFunction{ + Type: blockless.MessageInstallFunctionResponse, + Code: response.CodeAccepted, + Message: "installed", + } + + // Reply to the caller. + err = n.send(ctx, from, res) + if err != nil { + return fmt.Errorf("could not send the response (peer: %s): %w", from, err) + } + + return nil +} + +// installFunction will check if the function is installed first, and install it if not. +func (n *Node) installFunction(cid string, manifestURL string) error { + + // Check if the function is installed. + installed, err := n.fstore.Installed(cid) + if err != nil { + return fmt.Errorf("could not check if function is installed: %w", err) + } + + if installed { + return nil + } + + // If the function was not installed already, install it now. + err = n.fstore.Install(manifestURL, cid) + if err != nil { + return fmt.Errorf("could not install function: %w", err) + } + + return nil +} + +func manifestURLFromCID(cid string) string { + return fmt.Sprintf("https://%s.ipfs.w3s.link/manifest.json", cid) +} diff --git a/node/node.go b/node/node.go index f5cb7e68..f7f4944a 100644 --- a/node/node.go +++ b/node/node.go @@ -28,9 +28,8 @@ type Node struct { log zerolog.Logger host *host.Host - store Store executor Executor - function FunctionStore + fstore FStore topic *pubsub.Topic sema chan struct{} @@ -41,7 +40,7 @@ type Node struct { } // New creates a new Node. -func New(log zerolog.Logger, host *host.Host, store Store, peerStore PeerStore, function FunctionStore, options ...Option) (*Node, error) { +func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore, options ...Option) (*Node, error) { // Initialize config. cfg := DefaultConfig @@ -63,8 +62,7 @@ func New(log zerolog.Logger, host *host.Host, store Store, peerStore PeerStore, log: log.With().Str("component", "node").Logger(), host: host, - store: store, - function: function, + fstore: fstore, executor: cfg.Execute, wg: &sync.WaitGroup{}, diff --git a/node/node_integration_test.go b/node/node_integration_test.go index f8414d9f..4dd3c9fd 100644 --- a/node/node_integration_test.go +++ b/node/node_integration_test.go @@ -119,7 +119,7 @@ func createNode(t *testing.T, dir string, logger zerolog.Logger, host *host.Host opts = append(opts, node.WithExecutor(executor)) } - node, err := node.New(logger, host, store, peerstore, fstore, opts...) + node, err := node.New(logger, host, peerstore, fstore, opts...) require.NoError(t, err) return db, node diff --git a/node/node_internal_test.go b/node/node_internal_test.go index 109fbb04..a3896957 100644 --- a/node/node_internal_test.go +++ b/node/node_internal_test.go @@ -38,7 +38,6 @@ func TestNode_New(t *testing.T) { var ( logger = mocks.NoopLogger - store = mocks.BaselineStore(t) peerstore = mocks.BaselinePeerStore(t) functionHandler = mocks.BaselineFunctionHandler(t) executor = mocks.BaselineExecutor(t) @@ -50,23 +49,23 @@ func TestNode_New(t *testing.T) { t.Run("create a head node", func(t *testing.T) { t.Parallel() - node, err := New(logger, host, store, peerstore, functionHandler, WithRole(blockless.HeadNode)) + node, err := New(logger, host, peerstore, functionHandler, WithRole(blockless.HeadNode)) require.NoError(t, err) require.NotNil(t, node) // Creating a head node with executor fails. - _, err = New(logger, host, store, peerstore, functionHandler, WithRole(blockless.HeadNode), WithExecutor(executor)) + _, err = New(logger, host, peerstore, functionHandler, WithRole(blockless.HeadNode), WithExecutor(executor)) require.Error(t, err) }) t.Run("create a worker node", func(t *testing.T) { t.Parallel() - node, err := New(logger, host, store, peerstore, functionHandler, WithRole(blockless.WorkerNode), WithExecutor(executor)) + node, err := New(logger, host, peerstore, functionHandler, WithRole(blockless.WorkerNode), WithExecutor(executor)) require.NoError(t, err) require.NotNil(t, node) // Creating a worker node without executor fails. - _, err = New(logger, host, store, peerstore, functionHandler, WithRole(blockless.WorkerNode)) + _, err = New(logger, host, peerstore, functionHandler, WithRole(blockless.WorkerNode)) require.Error(t, err) }) } @@ -95,7 +94,6 @@ func createNode(t *testing.T, role blockless.NodeRole) *Node { var ( logger = mocks.NoopLogger - store = mocks.BaselineStore(t) peerstore = mocks.BaselinePeerStore(t) functionHandler = mocks.BaselineFunctionHandler(t) ) @@ -112,7 +110,7 @@ func createNode(t *testing.T, role blockless.NodeRole) *Node { opts = append(opts, WithExecutor(executor)) } - node, err := New(logger, host, store, peerstore, functionHandler, opts...) + node, err := New(logger, host, peerstore, functionHandler, opts...) require.NoError(t, err) return node diff --git a/node/notifiee_internal_test.go b/node/notifiee_internal_test.go index b5e97629..5d3b8036 100644 --- a/node/notifiee_internal_test.go +++ b/node/notifiee_internal_test.go @@ -17,7 +17,6 @@ func TestNode_Notifiee(t *testing.T) { var ( logger = mocks.NoopLogger - store = mocks.BaselineStore(t) functionHandler = mocks.BaselineFunctionHandler(t) ) @@ -40,7 +39,7 @@ func TestNode_Notifiee(t *testing.T) { return nil } - node, err := New(logger, server, store, peerstore, functionHandler, WithRole(blockless.HeadNode)) + node, err := New(logger, server, peerstore, functionHandler, WithRole(blockless.HeadNode)) require.NoError(t, err) client, err := host.New(mocks.NoopLogger, loopback, 0) diff --git a/node/rest.go b/node/rest.go index 5072ba60..49858ef9 100644 --- a/node/rest.go +++ b/node/rest.go @@ -39,8 +39,8 @@ func (n *Node) ExecutionResult(id string) (execute.Result, bool) { return res.(execute.Result), ok } -// FunctionInstall initiates function install process. -func (n *Node) FunctionInstall(ctx context.Context, uri string, cid string) error { +// PublishFunctionInstall publishes a function install message. +func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string) error { var req request.InstallFunction if uri != "" { @@ -89,7 +89,7 @@ func createInstallMessageFromCID(cid string) request.InstallFunction { req := request.InstallFunction{ Type: blockless.MessageInstallFunction, - ManifestURL: fmt.Sprintf("https://%s.ipfs.w3s.link/manifest.json", cid), + ManifestURL: manifestURLFromCID(cid), CID: cid, } diff --git a/node/roll_call.go b/node/roll_call.go index 6954c78f..fcfcb07b 100644 --- a/node/roll_call.go +++ b/node/roll_call.go @@ -28,59 +28,50 @@ func (n *Node) processRollCall(ctx context.Context, from peer.ID, payload []byte } req.From = from - // Check if we have this manifest. - functionInstalled, err := n.isFunctionInstalled(req.FunctionID) - if err != nil { - // We could not lookup the manifest. - res := response.RollCall{ - Type: blockless.MessageRollCallResponse, - FunctionID: req.FunctionID, - RequestID: req.RequestID, - Code: response.CodeError, - } + n.log.Debug().Str("cid", req.FunctionID).Str("request_id", req.RequestID). + Msg("received roll call request") + + // Base response to return. + res := response.RollCall{ + Type: blockless.MessageRollCallResponse, + FunctionID: req.FunctionID, + RequestID: req.RequestID, + Code: response.CodeError, // CodeError by default, changed if everything goes well. + } + // Check if we have this function installed. + installed, err := n.fstore.Installed(req.FunctionID) + if err != nil { sendErr := n.send(ctx, req.From, res) // Log send error but choose to return the original error. - n.log.Error(). - Err(sendErr). - Str("to", req.From.String()). + n.log.Error().Err(sendErr).Str("to", req.From.String()). Msg("could not send response") return fmt.Errorf("could not check if function is installed: %w", err) } - // We don't have this function. - if !functionInstalled { + // We don't have this function - install it now. + if !installed { - res := response.RollCall{ - Type: blockless.MessageRollCallResponse, - FunctionID: req.FunctionID, - RequestID: req.RequestID, - Code: response.CodeNotFound, - } + n.log.Info().Str("cid", req.FunctionID). + Msg("roll call but function not installed, installing now") - err = n.send(ctx, req.From, res) + err = n.installFunction(req.FunctionID, manifestURLFromCID(req.FunctionID)) if err != nil { - return fmt.Errorf("could not send response: %w", err) - } - - // TODO: In the original code we create a function install call here. - // However, we do it with the CID only, but the function install code - // requires manifestURL + CID. So at the moment this code path is not - // present here. + sendErr := n.send(ctx, req.From, res) + // Log send error but choose to return the original error. + n.log.Error().Err(sendErr).Str("to", req.From.String()). + Msg("could not send response") - return nil + return fmt.Errorf("could not install function: %w", err) + } } - // Create response. - res := response.RollCall{ - Type: blockless.MessageRollCallResponse, - FunctionID: req.FunctionID, - RequestID: req.RequestID, - Code: response.CodeAccepted, - } + n.log.Info().Str("cid", req.FunctionID).Str("request_id", req.RequestID). + Msg("reporting for roll call") - // Send message. + // Send postive response. + res.Code = response.CodeAccepted err = n.send(ctx, req.From, res) if err != nil { return fmt.Errorf("could not send response: %w", err) diff --git a/node/roll_call_internal_test.go b/node/roll_call_internal_test.go index 350ad3c0..9e23910c 100644 --- a/node/roll_call_internal_test.go +++ b/node/roll_call_internal_test.go @@ -84,10 +84,10 @@ func TestNode_RollCall(t *testing.T) { // Function store fails to check function presence. fstore := mocks.BaselineFunctionHandler(t) - fstore.GetFunc = func(string, string, bool) (*blockless.FunctionManifest, error) { - return nil, mocks.GenericError + fstore.InstalledFunc = func(string) (bool, error) { + return false, mocks.GenericError } - node.function = fstore + node.fstore = fstore var wg sync.WaitGroup wg.Add(1) @@ -114,7 +114,7 @@ func TestNode_RollCall(t *testing.T) { wg.Wait() }) - t.Run("worker node handles function not installed", func(t *testing.T) { + t.Run("worker node installs function on roll call", func(t *testing.T) { t.Parallel() node := createNode(t, blockless.WorkerNode) @@ -124,12 +124,15 @@ func TestNode_RollCall(t *testing.T) { hostAddNewPeer(t, node.host, receiver) - // Function store works okay but function is not found. + // Function store has no function but is able to install it. fstore := mocks.BaselineFunctionHandler(t) - fstore.GetFunc = func(string, string, bool) (*blockless.FunctionManifest, error) { - return nil, blockless.ErrNotFound + fstore.InstalledFunc = func(string) (bool, error) { + return false, nil } - node.function = fstore + fstore.InstallFunc = func(string, string) error { + return nil + } + node.fstore = fstore var wg sync.WaitGroup wg.Add(1) @@ -148,7 +151,7 @@ func TestNode_RollCall(t *testing.T) { require.Equal(t, rollCallReq.FunctionID, received.FunctionID) require.Equal(t, rollCallReq.RequestID, received.RequestID) - require.Equal(t, response.CodeNotFound, received.Code) + require.Equal(t, response.CodeAccepted, received.Code) }) err = node.processRollCall(context.Background(), receiver.ID(), payload) @@ -156,6 +159,51 @@ func TestNode_RollCall(t *testing.T) { wg.Wait() }) + t.Run("worker node handles function failure to install function on roll call", func(t *testing.T) { + t.Parallel() + + node := createNode(t, blockless.WorkerNode) + + receiver, err := host.New(mocks.NoopLogger, loopback, 0) + require.NoError(t, err) + + hostAddNewPeer(t, node.host, receiver) + + // Function store has no function but is not able to install it. + fstore := mocks.BaselineFunctionHandler(t) + fstore.InstalledFunc = func(string) (bool, error) { + return false, nil + } + fstore.InstallFunc = func(string, string) error { + return mocks.GenericError + } + node.fstore = fstore + + var wg sync.WaitGroup + wg.Add(1) + + receiver.SetStreamHandler(blockless.ProtocolID, func(stream network.Stream) { + defer wg.Done() + defer stream.Close() + + var received response.RollCall + getStreamPayload(t, stream, &received) + + from := stream.Conn().RemotePeer() + require.Equal(t, node.host.ID(), from) + + require.Equal(t, blockless.MessageRollCallResponse, received.Type) + + require.Equal(t, rollCallReq.FunctionID, received.FunctionID) + require.Equal(t, rollCallReq.RequestID, received.RequestID) + require.Equal(t, response.CodeError, received.Code) + }) + + err = node.processRollCall(context.Background(), receiver.ID(), payload) + require.Error(t, err) + + wg.Wait() + }) t.Run("node issues roll call ok", func(t *testing.T) { t.Parallel() diff --git a/node/sync.go b/node/sync.go index 98ffffe6..6e49bccc 100644 --- a/node/sync.go +++ b/node/sync.go @@ -9,11 +9,11 @@ import ( // but were previously installed. We do NOT abort on failure. func (n *Node) syncFunctions() { - cids := n.function.InstalledFunctions() + cids := n.fstore.InstalledFunctions() for _, cid := range cids { - err := n.function.Sync(cid) + err := n.fstore.Sync(cid) if err != nil { n.log.Error().Err(err).Str("cid", cid).Msg("function sync error") continue diff --git a/testing/mocks/fstore.go b/testing/mocks/fstore.go index cc9309ee..d9711d03 100644 --- a/testing/mocks/fstore.go +++ b/testing/mocks/fstore.go @@ -7,7 +7,9 @@ import ( ) type FStore struct { - GetFunc func(string, string, bool) (*blockless.FunctionManifest, error) + InstallFunc func(string, string) error + GetFunc func(string) (*blockless.FunctionManifest, error) + InstalledFunc func(string) (bool, error) InstalledFunctionsFunc func() []string SyncFunc func(string) error } @@ -16,9 +18,15 @@ func BaselineFunctionHandler(t *testing.T) *FStore { t.Helper() fh := FStore{ - GetFunc: func(string, string, bool) (*blockless.FunctionManifest, error) { + GetFunc: func(string) (*blockless.FunctionManifest, error) { return &GenericManifest, nil }, + InstallFunc: func(string, string) error { + return nil + }, + InstalledFunc: func(string) (bool, error) { + return true, nil + }, InstalledFunctionsFunc: func() []string { return nil }, @@ -30,8 +38,16 @@ func BaselineFunctionHandler(t *testing.T) *FStore { return &fh } -func (f *FStore) Get(address string, cid string, useCached bool) (*blockless.FunctionManifest, error) { - return f.GetFunc(address, cid, useCached) +func (f *FStore) Install(address string, cid string) error { + return f.InstallFunc(address, cid) +} + +func (f *FStore) Get(cid string) (*blockless.FunctionManifest, error) { + return f.GetFunc(cid) +} + +func (f *FStore) Installed(cid string) (bool, error) { + return f.InstalledFunc(cid) } func (f *FStore) InstalledFunctions() []string { diff --git a/testing/mocks/generic.go b/testing/mocks/generic.go index d2962048..30f90187 100644 --- a/testing/mocks/generic.go +++ b/testing/mocks/generic.go @@ -17,7 +17,7 @@ import ( var ( NoopLogger = zerolog.New(io.Discard) - GenericError = errors.New("dummy test") + GenericError = errors.New("dummy error") GenericPeerID = peer.ID([]byte{0x0, 0x24, 0x8, 0x1, 0x12, 0x20, 0x56, 0x77, 0x86, 0x82, 0x76, 0xa, 0xc5, 0x9, 0x63, 0xde, 0xe4, 0x31, 0xfc, 0x44, 0x75, 0xdd, 0x5a, 0x27, 0xee, 0x6b, 0x94, 0x13, 0xed, 0xe2, 0xa3, 0x6d, 0x8a, 0x1d, 0x57, 0xb6, 0xb8, 0x91}) diff --git a/testing/mocks/node.go b/testing/mocks/node.go index d7d1d1f7..f9067b19 100644 --- a/testing/mocks/node.go +++ b/testing/mocks/node.go @@ -9,9 +9,9 @@ import ( // Node implements the `Node` interface expected by the API. type Node struct { - ExecuteFunctionFunc func(context.Context, execute.Request) (execute.Result, error) - ExecutionResultFunc func(id string) (execute.Result, bool) - FunctionInstallFunc func(ctx context.Context, uri string, cid string) error + ExecuteFunctionFunc func(context.Context, execute.Request) (execute.Result, error) + ExecutionResultFunc func(id string) (execute.Result, bool) + PublishFunctionInstallFunc func(ctx context.Context, uri string, cid string) error } func BaselineNode(t *testing.T) *Node { @@ -24,7 +24,7 @@ func BaselineNode(t *testing.T) *Node { ExecutionResultFunc: func(id string) (execute.Result, bool) { return GenericExecutionResult, true }, - FunctionInstallFunc: func(ctx context.Context, uri string, cid string) error { + PublishFunctionInstallFunc: func(ctx context.Context, uri string, cid string) error { return nil }, } @@ -40,6 +40,6 @@ func (n *Node) ExecutionResult(id string) (execute.Result, bool) { return n.ExecutionResultFunc(id) } -func (n *Node) FunctionInstall(ctx context.Context, uri string, cid string) error { - return n.FunctionInstallFunc(ctx, uri, cid) +func (n *Node) PublishFunctionInstall(ctx context.Context, uri string, cid string) error { + return n.PublishFunctionInstallFunc(ctx, uri, cid) }