Skip to content

Commit

Permalink
Refactor (#54)
Browse files Browse the repository at this point in the history
* Add executor component

* Add node type + refactor models + refactor executables

* Creating a "host" module to contain all of the libp2p host functionality (#27)

* Add store package (#28)

* Add peerstore (#30)

* Add API (#33)

* Add function handler (#31)

* Remove obsolete files (#37)

* Remove src as it contains obsoleted (or refactored and moved) files

* Go mod tidy

* Add Readme files for executables (#39)

* Add Readme files for executables

* Remove replace directive from go.mod

* Add tests for `store` and `peerstore` packages (#38)

* Add tests for "store" package

* Rename test

* Move pebble DB setup to an external package

* Add tests for peerstore

* Add tests with a baseline cache to cover all storage failure scenarios

* Adding tests for the `executor` (#40)

* Using config for executor

* Use afero to abstract FS operations

* Add test for paths created by the executor (manifest, entry etc)

* Add tests for executor creation

* Add tests for configuration

* Add more tests for executor - cmd creation and manifest writing

* Update executor creation

* Split imports

* Add tests for function handler package (#41)

* Add tests for downloading JSON and parsing/merging deployment info

* Add tests for downloader

* Add tests for archive unpacking code

* Adding external test for function handler - Get()

* Update function handler usage

* Add tests for the `node` package (#44)

* Add mock Executor + add tests for node config

* Add mocks for more component, test node creation and handling unsupported functions

* Add tests for handlers

- health
- roll call response
- install function response

* Add a WaitMap for better handling of roll-call/execute responses

* Add a WaitFor to enable waiting for a limited time period

* Use time.After instead of a ticker

* Refactor roll call response handling and add tests for notifiee

* Add test for sending message

* Add test for publishing to a topic

* Add a method for subscribing - internalizes topic handle handling

* Add tests for worker execution

* Add tests for function install

* Handle roll call request

* Adding test for issuing roll call

* Health interval is configurable + tests for health ping

* Create a const for loopback address

* Add test for head node execute - roll call timeout scenario

* Roll call timeout is configurable

* Simplify handling of configurable options by having cfg as part of `Node` instead of individual fields

* Simplify peer address handling code

* Add head execute test case + fix handling of worker execution flow

* Keep a single package for execution result caching

* Decrease timeout

* Each component with a logger sets its own "component" tag (#47)

* Host discovery phase - skip connected peers (#46)

* Add integration test for node - function install and execution request (#45)

* Add integration test for node (function install + execute)

* Move function server to the helper package

* Log files created in appropriate directories

* Blockless runtime path set as environment variable and proper closing of log files

* Add integration flags

* Process requests in parallel (#48)

* Concurrency level for node is controlled via CLI flag (#49)

* update makefile (#50)

* add usage flags to readme

* [executor] Collect CPU usage info and add integration tests (#53)

* Add a config option for setting executable name

* Collect resource usage info - CPU time in nanoseconds and max memory on *ux

* Fix type assertion

* Add a small md5sum test program

* Add integration test for executor

* update docker files (#55)

---------

Co-authored-by: Maelkum <[email protected]>
  • Loading branch information
dmikey and Maelkum authored Mar 22, 2023
1 parent 19bc443 commit f2ce23b
Show file tree
Hide file tree
Showing 186 changed files with 7,742 additions and 3,966 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
cmd/node/node
cmd/keygen/keygen

dist/
runtime/
*.env
Expand Down
14 changes: 10 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.PHONY: all
all: clean build
all: clean build-node build-keygen

.PHONY: test
test:
Expand All @@ -8,10 +8,16 @@ test:
go test ./src/...
@echo "\n✅ Done.\n"

.PHONY: build
build:
.PHONY: build-node
build-node:
@echo "\n🛠 Building node...\n"
cd src && go build -o ../dist/b7s
cd cmd/node && go build -o ../../dist/b7s
@echo "\n✅ Done.\n"

.PHONY: build-keygen
build-keygen:
@echo "\n🛠 Building node...\n"
cd cmd/keygen && go build -o ../../dist/b7s-keygen
@echo "\n✅ Done.\n"

.PHONY: clean
Expand Down
28 changes: 15 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
![Coverage](https://img.shields.io/badge/Coverage-48.7%25-yellow)


# b7s daemon

b7s is a peer-to-peer networking daemon for the blockless network. It is supported on Windows, Linux, and MacOS platforms for both x64 and arm64 architectures.
Expand All @@ -19,18 +18,21 @@ sudo sh -c "wget https://raw.githubusercontent.com/blocklessnetwork/b7s/main/dow

You can also use Docker to install b7s. See the [Docker documentation](/docker/README.md) for more information.

Usage
b7s can be run with a number of commands and flags:

Commands:

- `help`: display the help menu
- `keygen`: generate identity keys for the node
Flags:

- `config`: path to the configuration file
- `out`: style of logging used in the daemon (rich, text, or json)
For example:
## Usage

| Flag | Short Form | Default Value | Description |
| ----------- | ---------- | ----------------------- | --------------------------------------------------------------------------------------------- |
| log-level | -l | "info" | Specifies the level of logging to use. |
| db | -d | "db" | Specifies the path to the database used for persisting node data. |
| role | -r | "worker" | Specifies the role this node will have in the Blockless protocol (head or worker). |
| address | -a | "0.0.0.0" | Specifies the address that the libp2p host will use. |
| port | -p | 0 | Specifies the port that the libp2p host will use. |
| private-key | N/A | N/A | Specifies the private key that the libp2p host will use. |
| concurrency | -c | node.DefaultConcurrency | Specifies the maximum number of requests the node will process in parallel. |
| rest-api | N/A | N/A | Specifies the address where the head node REST API will listen on. |
| boot-nodes | N/A | N/A | Specifies a list of addresses that this node will connect to on startup, in multiaddr format. |
| workspace | N/A | "./workspace" | Specifies the directory that the node can use for file storage. |
| runtime | N/A | N/A | Specifies the runtime address used by the worker node. |

## Dependencies

Expand Down
22 changes: 22 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package api

import (
"github.com/rs/zerolog"
)

// API provides REST API functionality for the Blockless head node.
type API struct {
log zerolog.Logger
node Node
}

// New creates a new instance of a Blockless head node REST API. Access to node data is provided by the provided `node`.
func New(log zerolog.Logger, node Node) *API {

api := API{
log: log.With().Str("component", "api").Logger(),
node: node,
}

return &api
}
36 changes: 36 additions & 0 deletions api/execute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package api

import (
"fmt"
"net/http"

"github.com/labstack/echo/v4"

"github.com/blocklessnetworking/b7s/models/api/request"
"github.com/blocklessnetworking/b7s/models/execute"
)

// Execute implements the REST API endpoint for function execution.
func (a *API) Execute(ctx echo.Context) error {

// Unpack the API request.
var req request.Execute
err := ctx.Bind(&req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
}

// TODO: Check - We perhaps want to return the request ID and not wait for the execution, right?
// It's probable that it will time out anyway, right?

// Get the execution result.
result, err := a.node.ExecuteFunction(ctx.Request().Context(), execute.Request(req))
// Determine status code.
code := http.StatusOK
if err != nil {
code = http.StatusInternalServerError
}

// Send the response.
return ctx.JSON(code, result)
}
66 changes: 66 additions & 0 deletions api/install.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package api

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/labstack/echo/v4"

"github.com/blocklessnetworking/b7s/models/api/request"
)

const (
functionInstallTimeout = 10 * time.Second
)

func (a *API) Install(ctx echo.Context) error {

// Unpack the API request.
var req request.InstallFunction
err := ctx.Bind(&req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Errorf("could not unpack request: %w", err))
}

if req.URI == "" && req.CID == "" {
return echo.NewHTTPError(http.StatusBadRequest, errors.New("URI or CID are required"))
}

// Add a deadline to the context.
reqCtx, cancel := context.WithTimeout(ctx.Request().Context(), functionInstallTimeout)
defer cancel()

// Start function install in a separate goroutine and signal when it's done.
fnErr := make(chan error)
go func() {
err = a.node.FunctionInstall(reqCtx, req.URI, req.CID)
fnErr <- err
}()

// Wait until either function install finishes, or request times out.
select {

// Context timed out.
case <-reqCtx.Done():

status := http.StatusRequestTimeout
if !errors.Is(reqCtx.Err(), context.DeadlineExceeded) {
status = http.StatusInternalServerError
}
return ctx.NoContent(status)

// Work done.
case err = <-fnErr:
break
}

// Check if function install succeeded and handle error or return response.
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("function installation failed: %w", err))
}

return ctx.NoContent(http.StatusOK)
}
13 changes: 13 additions & 0 deletions api/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package api

import (
"context"

"github.com/blocklessnetworking/b7s/models/execute"
)

type Node interface {
ExecuteFunction(context.Context, execute.Request) (execute.Result, error)
ExecutionResult(id string) (execute.Result, bool)
FunctionInstall(ctx context.Context, uri string, cid string) error
}
27 changes: 27 additions & 0 deletions api/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package api

import (
"errors"
"net/http"

"github.com/labstack/echo/v4"
)

// ExecutionResult implements the REST API endpoint for retrieving the result of a function execution.
func (a *API) ExecutionResult(ctx echo.Context) error {

// Get the request ID.
requestID := ctx.Param("id")
if requestID == "" {
return echo.NewHTTPError(http.StatusBadRequest, errors.New("missing request ID"))
}

// Lookup execution result.
result, ok := a.node.ExecutionResult(requestID)
if !ok {
return ctx.NoContent(http.StatusNotFound)
}

// Send the response back.
return ctx.JSON(http.StatusOK, result)
}
23 changes: 23 additions & 0 deletions cmd/keygen/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# KeyGen

## Description

The `keygen` utility can be used to create keys that will determine Blockless b7s Node identity.

## Usage

```console
Usage of keygen:
-o, --output string directory where keys should be stored (default ".")
```

## Examples

Create keys in the `keys` directory of the users `home` directory:

```console
$ ./keygen --output ~/keys
generated private key: /home/user/keys/priv.bin
generated public key: /home/user/keys/pub.bin
generated identity file: /home/user/keys/identity
```
89 changes: 89 additions & 0 deletions cmd/keygen/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package main

import (
"fmt"
"log"
"os"
"path/filepath"

"github.com/spf13/pflag"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)

const (
// Names used for created files.
privKeyName = "priv.bin"
pubKeyName = "pub.bin"
identityName = "identity"
)

const (
// Permissions used for created files.
privKeyPermissions = 0600
pubKeyPermissions = 0644
)

func main() {

var (
flagOutputDir string
)

pflag.StringVarP(&flagOutputDir, "output", "o", ".", "directory where keys should be stored")

pflag.Parse()

// Create output directory, if it doesn't exist.
err := os.MkdirAll(flagOutputDir, os.ModePerm)
if err != nil {
log.Fatalf("could not create output directory: %s", err)
}

// Generate key pair.
priv, pub, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
if err != nil {
log.Fatalf("could not generate key pair: %s", err)
}

// Encode keys and extract peer ID from key.
privPayload, err := crypto.MarshalPrivateKey(priv)
if err != nil {
log.Fatalf("could not marshal private key: %s", err)
}

pubPayload, err := crypto.MarshalPublicKey(pub)
if err != nil {
log.Fatalf("could not marshal public key: %s", err)
}

identity, err := peer.IDFromPublicKey(pub)
if err != nil {
log.Fatalf("failed to get peer identity from public key: %s", err)
}

// Write keys and identity to files.

pubKeyFile := filepath.Join(flagOutputDir, pubKeyName)
err = os.WriteFile(pubKeyFile, pubPayload, pubKeyPermissions)
if err != nil {
log.Fatalf("could not write private key to file: %s", err)
}

idFile := filepath.Join(flagOutputDir, identityName)
err = os.WriteFile(idFile, []byte(identity), pubKeyPermissions)
if err != nil {
log.Fatalf("could not write private key to file: %s", err)
}

privKeyFile := filepath.Join(flagOutputDir, privKeyName)
err = os.WriteFile(privKeyFile, privPayload, privKeyPermissions)
if err != nil {
log.Fatalf("could not write private key to file: %s", err)
}

fmt.Printf("generated private key: %s\n", privKeyFile)
fmt.Printf("generated public key: %s\n", pubKeyFile)
fmt.Printf("generated identity file: %s\n", idFile)
}
Loading

0 comments on commit f2ce23b

Please sign in to comment.