Skip to content

Commit

Permalink
Install function on roll call if it wasn't installed already (#78)
Browse files Browse the repository at this point in the history
* Rename function handler/interface to fstore in node code

* Add method to check if a function is installed

* Separate installing and getting a function

* Update tests

* Clarify naming - REST API endpoint publishes function install - doesnt actually install it

* Add method for function install

* Function install done on roll call

* Improve log messages

* Update tests

* Remove "store" as a dependency for the node
  • Loading branch information
Maelkum authored Apr 12, 2023
1 parent 6e00e82 commit 2062645
Show file tree
Hide file tree
Showing 29 changed files with 518 additions and 425 deletions.
2 changes: 1 addition & 1 deletion api/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (a *API) Install(ctx echo.Context) error {
// Start function install in a separate goroutine and signal when it's done.
fnErr := make(chan error)
go func() {
err = a.node.FunctionInstall(reqCtx, req.URI, req.CID)
err = a.node.PublishFunctionInstall(reqCtx, req.URI, req.CID)
fnErr <- err
}()

Expand Down
8 changes: 4 additions & 4 deletions api/install_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {
)

node := mocks.BaselineNode(t)
node.FunctionInstallFunc = func(context.Context, string, string) error {
node.PublishFunctionInstallFunc = func(context.Context, string, string) error {
time.Sleep(installDuration)
return nil
}
Expand All @@ -91,17 +91,17 @@ func TestAPI_FunctionInstall_HandlesErrors(t *testing.T) {

var res = response.InstallFunction{}
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &res))

num, err := strconv.Atoi(res.Code)
require.NoError(t, err)

require.Equal(t, http.StatusRequestTimeout, num)
})
t.Run("node fails to install function", func(t *testing.T) {
t.Parallel()

node := mocks.BaselineNode(t)
node.FunctionInstallFunc = func(context.Context, string, string) error {
node.PublishFunctionInstallFunc = func(context.Context, string, string) error {
return mocks.GenericError
}

Expand Down
2 changes: 1 addition & 1 deletion api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import (
type Node interface {
ExecuteFunction(context.Context, execute.Request) (execute.Result, error)
ExecutionResult(id string) (execute.Result, bool)
FunctionInstall(ctx context.Context, uri string, cid string) error
PublishFunctionInstall(ctx context.Context, uri string, cid string) error
}
2 changes: 1 addition & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func run() int {
fstore := fstore.New(log, functionStore, cfg.Workspace)

// Instantiate node.
node, err := node.New(log, host, functionStore, peerstore, fstore, opts...)
node, err := node.New(log, host, peerstore, fstore, opts...)
if err != nil {
log.Error().Err(err).Msg("could not create node")
return failure
Expand Down
2 changes: 1 addition & 1 deletion fstore/fstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func New(log zerolog.Logger, store Store, workdir string) *FStore {
downloader.UserAgent = defaultUserAgent

h := FStore{
log: log.With().Str("component", "function_store").Logger(),
log: log.With().Str("component", "fstore").Logger(),
store: store,
http: &cli,
downloader: downloader,
Expand Down
81 changes: 5 additions & 76 deletions fstore/get.go
Original file line number Diff line number Diff line change
@@ -1,89 +1,18 @@
package fstore

import (
"errors"
"fmt"
"path/filepath"

"github.com/blocklessnetworking/b7s/models/blockless"
)

// Get retrieves the function manifest from the given address. `useCached` indicates whether,
// if the function is found in the store/db, it should be used, or if we should re-download it.
func (h *FStore) Get(address string, cid string, useCached bool) (*blockless.FunctionManifest, error) {
// Get retrieves a function manifest for the given function from storage.
func (h *FStore) Get(cid string) (*blockless.FunctionManifest, error) {

h.log.Debug().
Str("cid", cid).
Str("address", address).
Bool("use_cached", useCached).
Msg("getting manifest")

cachedFn, err := h.getFunction(cid)
// Return cached version if so requested.
if err == nil && useCached {

h.log.Debug().
Str("cid", cid).
Str("address", address).
Msg("function manifest was already cached, done")

return &cachedFn.Manifest, nil
}
if err != nil && !errors.Is(err, blockless.ErrNotFound) {
return nil, fmt.Errorf("could not get function from store: %w", err)
}

// Being here means that we either did not find the manifest, or we don't
// want to use the cached one.

// Retrieve function manifest from the given address.
var manifest blockless.FunctionManifest
err = h.getJSON(address, &manifest)
fn, err := h.getFunction(cid)
if err != nil {
return nil, fmt.Errorf("could not retrieve manifest: %w", err)
}

// If the runtime URL is specified, use it to fill in the deployment info.
if manifest.Runtime.URL != "" {
err = updateDeploymentInfo(&manifest, address)
if err != nil {
return nil, fmt.Errorf("could not update deployment info: %w", err)
}
}

// Download the function identified by the manifest.
functionPath, err := h.download(manifest)
if err != nil {
return nil, fmt.Errorf("could not download function: %w", err)
}

out := filepath.Join(h.workdir, cid)

// Unpack the .tar.gz archive.
// TODO: Would be good to know the content of the .tar.gz archive.
// We're unpacking the archive here and storing the path to the .tar.gz in the DB.
err = h.unpackArchive(functionPath, out)
if err != nil {
return nil, fmt.Errorf("could not unpack gzip archive (file: %s): %w", functionPath, err)
}

manifest.Deployment.File = functionPath

// Store the function record.
fn := functionRecord{
CID: cid,
URL: address,
Manifest: manifest,
Archive: functionPath,
Files: out,
}
err = h.saveFunction(fn)
if err != nil {
h.log.Error().
Err(err).
Str("cid", cid).
Msg("could not save function record")
return nil, fmt.Errorf("could not get function from store: %w", err)
}

return &manifest, nil
return &fn.Manifest, nil
}
158 changes: 1 addition & 157 deletions fstore/get_test.go
Original file line number Diff line number Diff line change
@@ -1,87 +1,21 @@
package fstore_test

import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"

"github.com/stretchr/testify/require"

"github.com/blocklessnetworking/b7s/fstore"
"github.com/blocklessnetworking/b7s/models/blockless"
"github.com/blocklessnetworking/b7s/store"
"github.com/blocklessnetworking/b7s/testing/helpers"
"github.com/blocklessnetworking/b7s/testing/mocks"
)

func TestFunction_Get(t *testing.T) {

const (
manifestURL = "manifest.json"
functionURL = "function.tar.gz"
testFile = "testdata/testFunction.tar.gz"

testCID = "dummy-cid"
)

workdir, err := os.MkdirTemp("", "b7s-function-get-")
require.NoError(t, err)

defer os.RemoveAll(workdir)

functionPayload, err := os.ReadFile(testFile)
require.NoError(t, err)

hash := sha256.Sum256(functionPayload)

// We'll create two servers, so we can link one to the other.
msrv, fsrv := createServers(t, manifestURL, functionURL, functionPayload)

store := store.New(helpers.InMemoryDB(t))
fh := fstore.New(mocks.NoopLogger, store, workdir)

address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL)
manifest, err := fh.Get(address, testCID, false)
require.NoError(t, err)

// Verify downloaded file.
archive := manifest.Deployment.File
require.FileExists(t, archive)

ok := verifyFileHash(t, archive, hash)
require.Truef(t, ok, "file hash does not match")

// Shutdown both servers and retry getting the manifest - verify that the cached manifest will be returned.
fsrv.Close()
msrv.Close()

_, err = fh.Get(address, testCID, true)
require.NoError(t, err)
}

func TestFunction_GetHandlesErrors(t *testing.T) {

const (
manifestURL = "manifest.json"
functionURL = "function.tar.gz"
testFile = "testdata/testFunction.tar.gz"

testCID = "dummy-cid"
)

functionPayload, err := os.ReadFile(testFile)
require.NoError(t, err)

// We'll create two servers, so we can link one to the other.
msrv, fsrv := createServers(t, manifestURL, functionURL, functionPayload)
// NOTE: Server shutdown handled in test cases below.

t.Run("handles failure to read manifest from store", func(t *testing.T) {

workdir, err := os.MkdirTemp("", "b7s-function-get-")
Expand All @@ -96,97 +30,7 @@ func TestFunction_GetHandlesErrors(t *testing.T) {

fh := fstore.New(mocks.NoopLogger, store, workdir)

address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL)
_, err = fh.Get(address, testCID, false)
require.Error(t, err)
})
t.Run("handles failure to download function", func(t *testing.T) {

// Shutdown function server.
fsrv.Close()

workdir, err := os.MkdirTemp("", "b7s-function-get-")
require.NoError(t, err)

defer os.RemoveAll(workdir)

store := store.New(helpers.InMemoryDB(t))
fh := fstore.New(mocks.NoopLogger, store, workdir)

address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL)
_, err = fh.Get(address, testCID, false)
_, err = fh.Get(testCID)
require.Error(t, err)
})
t.Run("handles failure to fetch manifest", func(t *testing.T) {

// Shutdown manifest server.
msrv.Close()

workdir, err := os.MkdirTemp("", "b7s-function-get-")
require.NoError(t, err)

defer os.RemoveAll(workdir)

store := store.New(helpers.InMemoryDB(t))
fh := fstore.New(mocks.NoopLogger, store, workdir)

address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL)
_, err = fh.Get(address, testCID, false)
require.Error(t, err)
})
}

func createServers(t *testing.T, manifestURL string, functionURL string, functionPayload []byte) (manifestSrv *httptest.Server, functionSrv *httptest.Server) {
t.Helper()

// Create function server.
fsrv := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

path := strings.TrimPrefix(req.URL.Path, "/")
if path != functionURL {
w.WriteHeader(http.StatusNotFound)
return
}

w.Write(functionPayload)
}))

// Setup manifest that points to the function server.
functionAddress := fmt.Sprintf("%s/%s", fsrv.URL, functionURL)
hash := sha256.Sum256(functionPayload)
sourceManifest := blockless.FunctionManifest{
Deployment: blockless.Deployment{
URI: functionAddress,
Checksum: fmt.Sprintf("%x", hash),
},
}

// Create manifest server.
msrv := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

path := strings.TrimPrefix(req.URL.Path, "/")
if path != manifestURL {
w.WriteHeader(http.StatusNotFound)
return
}

payload, err := json.Marshal(sourceManifest)
require.NoError(t, err)
w.Write(payload)
}))

return msrv, fsrv
}

func verifyFileHash(t *testing.T, filename string, checksum [32]byte) bool {
t.Helper()

data, err := os.ReadFile(filename)
require.NoError(t, err)

h := sha256.Sum256(data)

return bytes.Equal(checksum[:], h[:])
}
Loading

0 comments on commit 2062645

Please sign in to comment.