Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker executor response signature support #154

Closed
wants to merge 8 commits into from
2 changes: 2 additions & 0 deletions consensus/pbft/execute.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pbft

import (
"encoding/hex"
"fmt"
"time"

Expand Down Expand Up @@ -98,6 +99,7 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error {
RequestTimestamp: request.Timestamp,
Replica: r.id,
},
Signature: hex.EncodeToString(res.Signature),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PBFT already has and mandates signatures - note the msg.Sign() invocation below that will overwrite this value.

}

// Save this executions in case it's requested again.
Expand Down
35 changes: 24 additions & 11 deletions executor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,32 @@ import (
"github.com/spf13/afero"

"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/execute"
)

type ExecutionSigner interface {
Sign(execute.Request, execute.RuntimeOutput) ([]byte, error)
}

// defaultConfig used to create Executor.
var defaultConfig = Config{
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
DriversRootPath: "",
}

// Config represents the Executor configuration.
type Config struct {
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
Signer ExecutionSigner // Signer for the executor
}

type Option func(*Config)
Expand Down Expand Up @@ -62,3 +68,10 @@ func WithLimiter(limiter Limiter) Option {
cfg.Limiter = limiter
}
}

// WithSigner sets the signer for the executor.
func WithSigner(signer ExecutionSigner) Option {
return func(cfg *Config) {
cfg.Signer = signer
}
}
22 changes: 15 additions & 7 deletions executor/execute_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ import (
func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execute.Result, error) {

// Execute the function.
out, usage, err := e.executeFunction(requestID, req)
out, usage, signature, err := e.executeFunction(requestID, req)
if err != nil {

res := execute.Result{
Code: codes.Error,
RequestID: requestID,
Result: out,
Usage: usage,
Signature: signature,
}

return res, fmt.Errorf("function execution failed: %w", err)
}

Expand All @@ -29,14 +28,15 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu
RequestID: requestID,
Result: out,
Usage: usage,
Signature: signature,
}

return res, nil
}

// executeFunction handles the actual execution of the Blockless function. It returns the
// execution information like standard output, standard error, exit code and resource usage.
func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, error) {
func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, []byte, error) {

log := e.log.With().Str("request", requestID).Str("function", req.FunctionID).Logger()

Expand All @@ -47,7 +47,7 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu

err := e.cfg.FS.MkdirAll(paths.workdir, defaultPermissions)
if err != nil {
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err)
return execute.RuntimeOutput{}, execute.Usage{}, []byte{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err)
}
// Remove all temporary files after we're done.
defer func() {
Expand All @@ -66,10 +66,18 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu

out, usage, err := e.executeCommand(cmd)
if err != nil {
return out, execute.Usage{}, fmt.Errorf("command execution failed: %w", err)
return out, execute.Usage{}, []byte{}, fmt.Errorf("command execution failed: %w", err)
}

log.Info().Msg("command executed successfully")

return out, usage, nil
var signature []byte
if e.cfg.Signer != nil {
signature, err = e.cfg.Signer.Sign(req, out)
if err != nil {
return out, usage, []byte{}, fmt.Errorf("could not sign output: %w", err)
}
}

return out, usage, signature, nil
}
1 change: 1 addition & 0 deletions models/execute/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Result struct {
Result RuntimeOutput `json:"result"`
RequestID string `json:"request_id"`
Usage Usage `json:"usage,omitempty"`
Signature []byte `json:"signature,omitempty"`
}

// Cluster represents the set of peers that executed the request.
Expand Down
14 changes: 10 additions & 4 deletions node/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ type Result struct {
Peers []peer.ID `json:"peers,omitempty"`
// How frequent was this result, in percentages.
Frequency float64 `json:"frequency,omitempty"`
// Signature of this result
Signature []byte `json:"signature,omitempty"`
}

type resultStats struct {
seen uint
peers []peer.ID
seen uint
peers []peer.ID
signature []byte
}

func Aggregate(results execute.ResultMap) Results {
Expand All @@ -40,13 +43,15 @@ func Aggregate(results execute.ResultMap) Results {
stat, ok := stats[output]
if !ok {
stats[output] = resultStats{
seen: 0,
peers: make([]peer.ID, 0),
seen: 0,
peers: make([]peer.ID, 0),
signature: res.Signature,
}
}

stat.seen++
stat.peers = append(stat.peers, executingPeer)
stat.signature = res.Signature

stats[output] = stat
}
Expand All @@ -59,6 +64,7 @@ func Aggregate(results execute.ResultMap) Results {
Result: res,
Peers: stat.peers,
Frequency: 100 * float64(stat.seen) / float64(total),
Signature: stat.signature,
}

aggregated = append(aggregated, aggr)
Expand Down
2 changes: 2 additions & 0 deletions node/worker_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"encoding/hex"
"fmt"
"time"

Expand Down Expand Up @@ -47,6 +48,7 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque
Results: execute.ResultMap{
n.host.ID(): result,
},
Signature: hex.EncodeToString(result.Signature),
}

// Send the response, whatever it may be (success or failure).
Expand Down
Loading