From fc11fcb44b942dfbd76ee1f4888efd60b4079477 Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sat, 15 Jun 2024 13:52:28 +1200 Subject: [PATCH 1/8] executor support for response signing using pluggable interface --- executor/config.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/executor/config.go b/executor/config.go index 16e723f2..28ce0b6c 100644 --- a/executor/config.go +++ b/executor/config.go @@ -4,26 +4,32 @@ import ( "github.com/spf13/afero" "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/blocklessnetwork/b7s/models/execute" ) +type ExecutionSigner interface { + Sign(execute.Request, execute.RuntimeOutput) ([]byte, error) +} + // defaultConfig used to create Executor. var defaultConfig = Config{ - WorkDir: "workspace", - RuntimeDir: "", - ExecutableName: blockless.RuntimeCLI(), - FS: afero.NewOsFs(), - Limiter: &noopLimiter{}, + WorkDir: "workspace", + RuntimeDir: "", + ExecutableName: blockless.RuntimeCLI(), + FS: afero.NewOsFs(), + Limiter: &noopLimiter{}, DriversRootPath: "", } // Config represents the Executor configuration. type Config struct { - WorkDir string // directory where files needed for the execution are stored - RuntimeDir string // directory where the executable can be found - ExecutableName string // name for the executable - DriversRootPath string // where are cgi drivers stored - FS afero.Fs // FS accessor - Limiter Limiter // Resource limiter for executed processes + WorkDir string // directory where files needed for the execution are stored + RuntimeDir string // directory where the executable can be found + ExecutableName string // name for the executable + DriversRootPath string // where are cgi drivers stored + FS afero.Fs // FS accessor + Limiter Limiter // Resource limiter for executed processes + Signer ExecutionSigner // Signer for the executor } type Option func(*Config) @@ -62,3 +68,10 @@ func WithLimiter(limiter Limiter) Option { cfg.Limiter = limiter } } + +// WithSigner sets the signer for the executor. +func WithSigner(signer ExecutionSigner) Option { + return func(cfg *Config) { + cfg.Signer = signer + } +} From b78b0c7b709e49d777bb3063f3fc154a2766dcde Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sat, 15 Jun 2024 13:53:30 +1200 Subject: [PATCH 2/8] execution function can sign request and responses if signer exists --- executor/execute_function.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/executor/execute_function.go b/executor/execute_function.go index ae47e72e..6242dd03 100644 --- a/executor/execute_function.go +++ b/executor/execute_function.go @@ -11,16 +11,15 @@ import ( func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execute.Result, error) { // Execute the function. - out, usage, err := e.executeFunction(requestID, req) + out, usage, signature, err := e.executeFunction(requestID, req) if err != nil { - res := execute.Result{ Code: codes.Error, RequestID: requestID, Result: out, Usage: usage, + Signature: signature, } - return res, fmt.Errorf("function execution failed: %w", err) } @@ -29,6 +28,7 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu RequestID: requestID, Result: out, Usage: usage, + Signature: signature, } return res, nil @@ -36,7 +36,7 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu // executeFunction handles the actual execution of the Blockless function. It returns the // execution information like standard output, standard error, exit code and resource usage. -func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, error) { +func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, []byte, error) { log := e.log.With().Str("request", requestID).Str("function", req.FunctionID).Logger() @@ -47,7 +47,7 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu err := e.cfg.FS.MkdirAll(paths.workdir, defaultPermissions) if err != nil { - return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) + return execute.RuntimeOutput{}, execute.Usage{}, []byte{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) } // Remove all temporary files after we're done. defer func() { @@ -66,10 +66,18 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu out, usage, err := e.executeCommand(cmd) if err != nil { - return out, execute.Usage{}, fmt.Errorf("command execution failed: %w", err) + return out, execute.Usage{}, []byte{}, fmt.Errorf("command execution failed: %w", err) } log.Info().Msg("command executed successfully") - return out, usage, nil + var signature []byte + if e.cfg.Signer != nil { + signature, err = e.cfg.Signer.Sign(req, out) + if err != nil { + return out, usage, []byte{}, fmt.Errorf("could not sign output: %w", err) + } + } + + return out, usage, signature, nil } From 7b8350d7521b75eb8be32ba74e3d33248f4fb4e6 Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sat, 15 Jun 2024 13:54:13 +1200 Subject: [PATCH 3/8] pbft exection response support for arbitrary signatures --- consensus/pbft/execute.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consensus/pbft/execute.go b/consensus/pbft/execute.go index 11977099..b0a76665 100644 --- a/consensus/pbft/execute.go +++ b/consensus/pbft/execute.go @@ -1,6 +1,7 @@ package pbft import ( + "encoding/hex" "fmt" "time" @@ -98,6 +99,7 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error { RequestTimestamp: request.Timestamp, Replica: r.id, }, + Signature: hex.EncodeToString(res.Signature), } // Save this executions in case it's requested again. From 63364676b31fd36f94823816c7f17f41cabe78b2 Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sat, 15 Jun 2024 13:54:59 +1200 Subject: [PATCH 4/8] response execution support for signature --- models/execute/response.go | 1 + node/worker_execute.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/models/execute/response.go b/models/execute/response.go index 6feddf38..080a7bad 100644 --- a/models/execute/response.go +++ b/models/execute/response.go @@ -15,6 +15,7 @@ type Result struct { Result RuntimeOutput `json:"result"` RequestID string `json:"request_id"` Usage Usage `json:"usage,omitempty"` + Signature []byte `json:"signature,omitempty"` } // Cluster represents the set of peers that executed the request. diff --git a/node/worker_execute.go b/node/worker_execute.go index 343b4d90..773ab403 100644 --- a/node/worker_execute.go +++ b/node/worker_execute.go @@ -2,6 +2,7 @@ package node import ( "context" + "encoding/hex" "fmt" "time" @@ -47,6 +48,7 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque Results: execute.ResultMap{ n.host.ID(): result, }, + Signature: hex.EncodeToString(result.Signature), } // Send the response, whatever it may be (success or failure). From f72b962792c54b5f729c81daf919b4def40d7e66 Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sat, 15 Jun 2024 13:55:38 +1200 Subject: [PATCH 5/8] head node propagation can now also include signatures --- node/aggregate/aggregate.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/node/aggregate/aggregate.go b/node/aggregate/aggregate.go index 20cce244..0de8925c 100644 --- a/node/aggregate/aggregate.go +++ b/node/aggregate/aggregate.go @@ -17,11 +17,14 @@ type Result struct { Peers []peer.ID `json:"peers,omitempty"` // How frequent was this result, in percentages. Frequency float64 `json:"frequency,omitempty"` + // Signature of this result + Signature []byte `json:"signature,omitempty"` } type resultStats struct { - seen uint - peers []peer.ID + seen uint + peers []peer.ID + signature []byte } func Aggregate(results execute.ResultMap) Results { @@ -40,13 +43,15 @@ func Aggregate(results execute.ResultMap) Results { stat, ok := stats[output] if !ok { stats[output] = resultStats{ - seen: 0, - peers: make([]peer.ID, 0), + seen: 0, + peers: make([]peer.ID, 0), + signature: res.Signature, } } stat.seen++ stat.peers = append(stat.peers, executingPeer) + stat.signature = res.Signature stats[output] = stat } @@ -59,6 +64,7 @@ func Aggregate(results execute.ResultMap) Results { Result: res, Peers: stat.peers, Frequency: 100 * float64(stat.seen) / float64(total), + Signature: stat.signature, } aggregated = append(aggregated, aggr) From be3bf70491ea7072f1fed830c78943fc2ff9f08b Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sun, 23 Jun 2024 15:55:43 +1200 Subject: [PATCH 6/8] go fmt ./... --- models/execute/runtime.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/models/execute/runtime.go b/models/execute/runtime.go index 1b817e98..bc16e7b6 100644 --- a/models/execute/runtime.go +++ b/models/execute/runtime.go @@ -6,12 +6,12 @@ const ( // RuntimeConfig represents the CLI flags supported by the runtime type BLSRuntimeConfig struct { - Entry string `json:"entry,omitempty"` - ExecutionTime uint64 `json:"run_time,omitempty"` - DebugInfo bool `json:"debug_info,omitempty"` - Fuel uint64 `json:"limited_fuel,omitempty"` - Memory uint64 `json:"limited_memory,omitempty"` - Logger string `json:"runtime_logger,omitempty"` + Entry string `json:"entry,omitempty"` + ExecutionTime uint64 `json:"run_time,omitempty"` + DebugInfo bool `json:"debug_info,omitempty"` + Fuel uint64 `json:"limited_fuel,omitempty"` + Memory uint64 `json:"limited_memory,omitempty"` + Logger string `json:"runtime_logger,omitempty"` DriversRootPath string `json:"drivers_root_path,omitempty"` // Fields not allowed to be set in the request. Input string `json:"-"` @@ -29,5 +29,5 @@ const ( BLSRuntimeFlagLogger = "runtime-logger" BLSRuntimeFlagPermission = "permission" BLSRuntimeFlagEnv = "env" - BLSRuntimeFlagDrivers = "drivers-root-path" + BLSRuntimeFlagDrivers = "drivers-root-path" ) From 5ae2c67c0db3ecda2556cb29e8a539c84b63f757 Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sun, 23 Jun 2024 15:56:11 +1200 Subject: [PATCH 7/8] metadata provider support for worker executor nodes --- executor/config.go | 12 ++++++++++++ executor/execute_function.go | 23 +++++++++++++++++------ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/executor/config.go b/executor/config.go index 28ce0b6c..81f41b79 100644 --- a/executor/config.go +++ b/executor/config.go @@ -11,6 +11,10 @@ type ExecutionSigner interface { Sign(execute.Request, execute.RuntimeOutput) ([]byte, error) } +type MetaProvider interface { + WithMetadata(execute.Request, execute.RuntimeOutput) (interface{}, error) +} + // defaultConfig used to create Executor. var defaultConfig = Config{ WorkDir: "workspace", @@ -30,6 +34,7 @@ type Config struct { FS afero.Fs // FS accessor Limiter Limiter // Resource limiter for executed processes Signer ExecutionSigner // Signer for the executor + MetaProvider MetaProvider // Metadata provider for the executor } type Option func(*Config) @@ -75,3 +80,10 @@ func WithSigner(signer ExecutionSigner) Option { cfg.Signer = signer } } + +// WithMetaProvider sets the metadata provider for the executor. +func WithMetaProvider(meta MetaProvider) Option { + return func(cfg *Config) { + cfg.MetaProvider = meta + } +} diff --git a/executor/execute_function.go b/executor/execute_function.go index 6242dd03..c9cb4494 100644 --- a/executor/execute_function.go +++ b/executor/execute_function.go @@ -11,7 +11,7 @@ import ( func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execute.Result, error) { // Execute the function. - out, usage, signature, err := e.executeFunction(requestID, req) + out, usage, signature, meta, err := e.executeFunction(requestID, req) if err != nil { res := execute.Result{ Code: codes.Error, @@ -29,6 +29,7 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu Result: out, Usage: usage, Signature: signature, + Metadata: meta, } return res, nil @@ -36,7 +37,7 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu // executeFunction handles the actual execution of the Blockless function. It returns the // execution information like standard output, standard error, exit code and resource usage. -func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, []byte, error) { +func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, []byte, interface{}, error) { log := e.log.With().Str("request", requestID).Str("function", req.FunctionID).Logger() @@ -47,7 +48,7 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu err := e.cfg.FS.MkdirAll(paths.workdir, defaultPermissions) if err != nil { - return execute.RuntimeOutput{}, execute.Usage{}, []byte{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) + return execute.RuntimeOutput{}, execute.Usage{}, []byte{}, nil, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) } // Remove all temporary files after we're done. defer func() { @@ -66,7 +67,7 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu out, usage, err := e.executeCommand(cmd) if err != nil { - return out, execute.Usage{}, []byte{}, fmt.Errorf("command execution failed: %w", err) + return out, execute.Usage{}, []byte{}, nil, fmt.Errorf("command execution failed: %w", err) } log.Info().Msg("command executed successfully") @@ -75,9 +76,19 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu if e.cfg.Signer != nil { signature, err = e.cfg.Signer.Sign(req, out) if err != nil { - return out, usage, []byte{}, fmt.Errorf("could not sign output: %w", err) + return out, usage, []byte{}, nil, fmt.Errorf("failed to sign output: %w", err) } + log.Debug().Msg("output signed") } - return out, usage, signature, nil + var metadata interface{} + if e.cfg.MetaProvider != nil { + metadata, err = e.cfg.MetaProvider.WithMetadata(req, out) + if err != nil { + return out, usage, []byte{}, nil, fmt.Errorf("failed to inject metadata: %w", err) + } + log.Debug().Msg("metadata injected") + } + + return out, usage, signature, metadata, nil } From 2757edac702a7dfccd9c42d62a80afb903c268e1 Mon Sep 17 00:00:00 2001 From: zees-dev Date: Sun, 23 Jun 2024 15:56:38 +1200 Subject: [PATCH 8/8] worker result integrate metadata aggregation --- models/execute/response.go | 1 + node/aggregate/aggregate.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/models/execute/response.go b/models/execute/response.go index 080a7bad..d87c960b 100644 --- a/models/execute/response.go +++ b/models/execute/response.go @@ -16,6 +16,7 @@ type Result struct { RequestID string `json:"request_id"` Usage Usage `json:"usage,omitempty"` Signature []byte `json:"signature,omitempty"` + Metadata interface{} `json:"metadata,omitempty"` } // Cluster represents the set of peers that executed the request. diff --git a/node/aggregate/aggregate.go b/node/aggregate/aggregate.go index 0de8925c..e1bdc592 100644 --- a/node/aggregate/aggregate.go +++ b/node/aggregate/aggregate.go @@ -19,12 +19,15 @@ type Result struct { Frequency float64 `json:"frequency,omitempty"` // Signature of this result Signature []byte `json:"signature,omitempty"` + // Metadata is used to store additional information about the result. + Metadata interface{} `json:"metadata,omitempty"` } type resultStats struct { seen uint peers []peer.ID signature []byte + metadata interface{} } func Aggregate(results execute.ResultMap) Results { @@ -46,12 +49,14 @@ func Aggregate(results execute.ResultMap) Results { seen: 0, peers: make([]peer.ID, 0), signature: res.Signature, + metadata: res.Metadata, } } stat.seen++ stat.peers = append(stat.peers, executingPeer) stat.signature = res.Signature + stat.metadata = res.Metadata stats[output] = stat } @@ -65,6 +70,7 @@ func Aggregate(results execute.ResultMap) Results { Peers: stat.peers, Frequency: 100 * float64(stat.seen) / float64(total), Signature: stat.signature, + Metadata: stat.metadata, } aggregated = append(aggregated, aggr)