-
Notifications
You must be signed in to change notification settings - Fork 22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Worker executor response signature support #154
Changes from all commits
fc11fcb
b78b0c7
7b8350d
6336467
f72b962
be3bf70
5ae2c67
2757eda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,26 +4,37 @@ import ( | |
"github.com/spf13/afero" | ||
|
||
"github.com/blocklessnetwork/b7s/models/blockless" | ||
"github.com/blocklessnetwork/b7s/models/execute" | ||
) | ||
|
||
type ExecutionSigner interface { | ||
Sign(execute.Request, execute.RuntimeOutput) ([]byte, error) | ||
} | ||
|
||
type MetaProvider interface { | ||
WithMetadata(execute.Request, execute.RuntimeOutput) (interface{}, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick - I prefer using |
||
} | ||
|
||
// defaultConfig used to create Executor. | ||
var defaultConfig = Config{ | ||
WorkDir: "workspace", | ||
RuntimeDir: "", | ||
ExecutableName: blockless.RuntimeCLI(), | ||
FS: afero.NewOsFs(), | ||
Limiter: &noopLimiter{}, | ||
WorkDir: "workspace", | ||
RuntimeDir: "", | ||
ExecutableName: blockless.RuntimeCLI(), | ||
FS: afero.NewOsFs(), | ||
Limiter: &noopLimiter{}, | ||
DriversRootPath: "", | ||
} | ||
|
||
// Config represents the Executor configuration. | ||
type Config struct { | ||
WorkDir string // directory where files needed for the execution are stored | ||
RuntimeDir string // directory where the executable can be found | ||
ExecutableName string // name for the executable | ||
DriversRootPath string // where are cgi drivers stored | ||
FS afero.Fs // FS accessor | ||
Limiter Limiter // Resource limiter for executed processes | ||
WorkDir string // directory where files needed for the execution are stored | ||
RuntimeDir string // directory where the executable can be found | ||
ExecutableName string // name for the executable | ||
DriversRootPath string // where are cgi drivers stored | ||
FS afero.Fs // FS accessor | ||
Limiter Limiter // Resource limiter for executed processes | ||
Signer ExecutionSigner // Signer for the executor | ||
MetaProvider MetaProvider // Metadata provider for the executor | ||
} | ||
|
||
type Option func(*Config) | ||
|
@@ -62,3 +73,17 @@ func WithLimiter(limiter Limiter) Option { | |
cfg.Limiter = limiter | ||
} | ||
} | ||
|
||
// WithSigner sets the signer for the executor. | ||
func WithSigner(signer ExecutionSigner) Option { | ||
return func(cfg *Config) { | ||
cfg.Signer = signer | ||
} | ||
} | ||
|
||
// WithMetaProvider sets the metadata provider for the executor. | ||
func WithMetaProvider(meta MetaProvider) Option { | ||
return func(cfg *Config) { | ||
cfg.MetaProvider = meta | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,16 +11,15 @@ import ( | |
func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execute.Result, error) { | ||
|
||
// Execute the function. | ||
out, usage, err := e.executeFunction(requestID, req) | ||
out, usage, signature, meta, err := e.executeFunction(requestID, req) | ||
if err != nil { | ||
|
||
res := execute.Result{ | ||
Code: codes.Error, | ||
RequestID: requestID, | ||
Result: out, | ||
Usage: usage, | ||
Signature: signature, | ||
} | ||
|
||
return res, fmt.Errorf("function execution failed: %w", err) | ||
} | ||
|
||
|
@@ -29,14 +28,16 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu | |
RequestID: requestID, | ||
Result: out, | ||
Usage: usage, | ||
Signature: signature, | ||
Metadata: meta, | ||
} | ||
|
||
return res, nil | ||
} | ||
|
||
// executeFunction handles the actual execution of the Blockless function. It returns the | ||
// execution information like standard output, standard error, exit code and resource usage. | ||
func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, error) { | ||
func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, []byte, interface{}, error) { | ||
|
||
log := e.log.With().Str("request", requestID).Str("function", req.FunctionID).Logger() | ||
|
||
|
@@ -47,7 +48,7 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu | |
|
||
err := e.cfg.FS.MkdirAll(paths.workdir, defaultPermissions) | ||
if err != nil { | ||
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) | ||
return execute.RuntimeOutput{}, execute.Usage{}, []byte{}, nil, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) | ||
} | ||
// Remove all temporary files after we're done. | ||
defer func() { | ||
|
@@ -66,10 +67,28 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu | |
|
||
out, usage, err := e.executeCommand(cmd) | ||
if err != nil { | ||
return out, execute.Usage{}, fmt.Errorf("command execution failed: %w", err) | ||
return out, execute.Usage{}, []byte{}, nil, fmt.Errorf("command execution failed: %w", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And other places too - why not return |
||
} | ||
|
||
log.Info().Msg("command executed successfully") | ||
|
||
return out, usage, nil | ||
var signature []byte | ||
if e.cfg.Signer != nil { | ||
signature, err = e.cfg.Signer.Sign(req, out) | ||
if err != nil { | ||
return out, usage, []byte{}, nil, fmt.Errorf("failed to sign output: %w", err) | ||
} | ||
log.Debug().Msg("output signed") | ||
} | ||
|
||
var metadata interface{} | ||
if e.cfg.MetaProvider != nil { | ||
metadata, err = e.cfg.MetaProvider.WithMetadata(req, out) | ||
if err != nil { | ||
return out, usage, []byte{}, nil, fmt.Errorf("failed to inject metadata: %w", err) | ||
} | ||
log.Debug().Msg("metadata injected") | ||
} | ||
|
||
return out, usage, signature, metadata, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,11 +17,17 @@ type Result struct { | |
Peers []peer.ID `json:"peers,omitempty"` | ||
// How frequent was this result, in percentages. | ||
Frequency float64 `json:"frequency,omitempty"` | ||
// Signature of this result | ||
Signature []byte `json:"signature,omitempty"` | ||
// Metadata is used to store additional information about the result. | ||
Metadata interface{} `json:"metadata,omitempty"` | ||
} | ||
|
||
type resultStats struct { | ||
seen uint | ||
peers []peer.ID | ||
seen uint | ||
peers []peer.ID | ||
signature []byte | ||
metadata interface{} | ||
} | ||
|
||
func Aggregate(results execute.ResultMap) Results { | ||
|
@@ -40,13 +46,17 @@ func Aggregate(results execute.ResultMap) Results { | |
stat, ok := stats[output] | ||
if !ok { | ||
stats[output] = resultStats{ | ||
seen: 0, | ||
peers: make([]peer.ID, 0), | ||
seen: 0, | ||
peers: make([]peer.ID, 0), | ||
signature: res.Signature, | ||
metadata: res.Metadata, | ||
} | ||
} | ||
|
||
stat.seen++ | ||
stat.peers = append(stat.peers, executingPeer) | ||
stat.signature = res.Signature | ||
stat.metadata = res.Metadata | ||
Comment on lines
48
to
+59
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this will work as intended.. Is this signature like a hash that can be recalculated by anyone or is it a signature that can be tied to the signer (can be verified by a public key)? I assume it's the latter. Let's say we have 5 execution responses from 5 worker nodes. We will have e.g. the same execution result (stdout of the blockless runtime) but 5 different signatures, and I believe 5 different metadata objects. (I'm not sure if metadata is unique or not, but since it's configurable, it can be anything.) When aggregating, we iterate through the results, and for the seen results, we increment the number of times it was seen, add the worker ID to the list of peers that have this exact result, and set the signature and metadata to that of the result we just processed. Meaning if we iterate through execution results of worker1 through worker5, we will save the signature of worker5 (if we process that one last, which also isn't guaranteed in a map). In each iteration we override the previous signature/metadata. I think we will need to have a map that will map worker IDs to signatures and metadata. For example something like: {
"result": "whatever",
"peers": [
"worker1",
"worker2",
"worker3",
"worker4",
"worker5"
],
"frequency": 100,
"signatures": {
"worker1": {
"scheme": "abc", // without specifying the scheme we don't know how to verify the signature, no?
"sig": "0x1234567890abcdef1234567890abcdef"
},
// other workers
},
"metadata": {
"worker1": {
// metadata object
},
// other workers
}
} |
||
|
||
stats[output] = stat | ||
} | ||
|
@@ -59,6 +69,8 @@ func Aggregate(results execute.ResultMap) Results { | |
Result: res, | ||
Peers: stat.peers, | ||
Frequency: 100 * float64(stat.seen) / float64(total), | ||
Signature: stat.signature, | ||
Metadata: stat.metadata, | ||
} | ||
|
||
aggregated = append(aggregated, aggr) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PBFT already has and mandates signatures - note the
msg.Sign()
invocation below that will overwrite this value.