Skip to content

Commit

Permalink
Use attributes on roll call/execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum committed Oct 4, 2023
1 parent 4ec007c commit afc761f
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cmd/node/node
cmd/keygen/keygen
cmd/keyforge/keyforge
cmd/bootstrap-limiter/bootstrap-limiter

dist/
Expand Down
16 changes: 16 additions & 0 deletions models/execute/request.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package execute

import "github.com/libp2p/go-libp2p/core/peer"

// Request describes an execution request.
type Request struct {
FunctionID string `json:"function_id"`
Expand All @@ -25,6 +27,8 @@ type Config struct {
Permissions []string `json:"permissions,omitempty"`
ResultAggregation ResultAggregation `json:"result_aggregation,omitempty"`

Attributes *Attributes `json:"attributes,omitempty"`

// NodeCount specifies how many nodes should execute this request.
NodeCount int `json:"number_of_nodes,omitempty"`
// Consensus algorithm to use. Raft and PBFT are supported at this moment.
Expand All @@ -45,3 +49,15 @@ type ResultAggregation struct {
Type string `json:"type,omitempty"`
Parameters []Parameter `json:"parameters,omitempty"`
}

type Attributes struct {
// Values specify which attributes the node in question should have.
// At the moment we support strict equality only, so no `if RAM >= 16GB` types of conditions.
Values []Parameter `json:"attributes,omitempty"`

// Should we accept nodes whose attributes are not attested?
AttestationRequired bool `json:"attestation_required,omitempty"`

// Explicitly request specific attestors.
Attestors []peer.ID `json:"attestors,omitempty"`
}
14 changes: 8 additions & 6 deletions models/request/roll_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import (
"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/models/execute"
)

// RollCall describes the `MessageRollCall` message payload.
type RollCall struct {
From peer.ID `json:"from,omitempty"`
Type string `json:"type,omitempty"`
Origin peer.ID `json:"origin,omitempty"` // Origin is the peer that initiated the roll call.
FunctionID string `json:"function_id,omitempty"`
RequestID string `json:"request_id,omitempty"`
Consensus consensus.Type `json:"consensus"`
From peer.ID `json:"from,omitempty"`
Type string `json:"type,omitempty"`
Origin peer.ID `json:"origin,omitempty"` // Origin is the peer that initiated the roll call.
FunctionID string `json:"function_id,omitempty"`
RequestID string `json:"request_id,omitempty"`
Consensus consensus.Type `json:"consensus"`
Attributes *execute.Attributes `json:"attributes,omitempty"`
}
50 changes: 50 additions & 0 deletions node/attributes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package node

import (
"errors"
"fmt"
"net/http"

Expand All @@ -10,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s-attributes/attributes"
"github.com/blocklessnetwork/b7s/models/execute"
)

const (
Expand Down Expand Up @@ -53,3 +55,51 @@ func getAttributesIPNSName(key crypto.PubKey) (string, error) {
func ipnsGatewayURL(name string) string {
return fmt.Sprintf("https://%s.ipns.cf-ipfs.com/%s", name, defaultAttributesFilename)
}

func haveAttributes(have attributes.Attestation, want execute.Attributes) error {

if want.AttestationRequired && len(have.Attestors) == 0 {
return errors.New("attestors required but none found")
}

// If the client wants specific attestors, check if they're present.
if len(want.Attestors) > 0 {

attestors := make(map[peer.ID]struct{}, len(have.Attestors))
for _, attestor := range have.Attestors {
attestors[attestor.Signer] = struct{}{}
}

for _, wa := range want.Attestors {
_, ok := attestors[wa]
if !ok {
return fmt.Errorf("attestor %s explicitly requested but not found", wa.String())
}
}
}

// It doesn't make a lot of sense to require attestors without wanting specific attributes,
// but if that's the case, and there's no attributes wanted, we're done now.
if len(want.Values) == 0 {
return nil
}

attrs := make(map[string]string, len(have.Attributes))
for _, attr := range have.Attributes {
attrs[attr.Name] = attr.Value
}

for _, wantAttr := range want.Values {

value, ok := attrs[wantAttr.Name]
if !ok {
return fmt.Errorf("attribute wanted but not found (attr: %v, value: %v)", wantAttr.Name, wantAttr.Value)
}

if value != wantAttr.Value {
return fmt.Errorf("attribute wanted but value doesn't match (attr: %v, want: %v, have: %v)", wantAttr.Name, wantAttr.Value, value)
}
}

return nil
}
2 changes: 1 addition & 1 deletion node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re
log.Info().Msg("processing execution request")

// Phase 1. - Issue roll call to nodes.
reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo)
reportingPeers, err := n.executeRollCall(ctx, requestID, req.FunctionID, nodeCount, consensusAlgo, req.Config.Attributes)
if err != nil {
code := codes.Error
if errors.Is(err, blockless.ErrRollCallTimeout) {
Expand Down
22 changes: 19 additions & 3 deletions node/roll_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
"github.com/blocklessnetwork/b7s/models/request"
"github.com/blocklessnetwork/b7s/models/response"
)
Expand Down Expand Up @@ -39,6 +40,20 @@ func (n *Node) processRollCall(ctx context.Context, from peer.ID, payload []byte
return nil
}

if req.Attributes != nil {

if n.attributes == nil {
log.Info().Msg("skipping attributed execution requested")
return nil
}

err := haveAttributes(*n.attributes, *req.Attributes)
if err != nil {
log.Info().Err(err).Msg("skipping attributed execution request - we do not match requested attributes")
return nil
}
}

// Base response to return.
res := response.RollCall{
Type: blockless.MessageRollCallResponse,
Expand Down Expand Up @@ -87,7 +102,7 @@ func (n *Node) processRollCall(ctx context.Context, from peer.ID, payload []byte
return nil
}

func (n *Node) executeRollCall(ctx context.Context, requestID string, functionID string, nodeCount int, consensus consensus.Type) ([]peer.ID, error) {
func (n *Node) executeRollCall(ctx context.Context, requestID string, functionID string, nodeCount int, consensus consensus.Type, attributes *execute.Attributes) ([]peer.ID, error) {

// Create a logger with relevant context.
log := n.log.With().Str("request", requestID).Str("function", functionID).Int("node_count", nodeCount).Logger()
Expand All @@ -97,7 +112,7 @@ func (n *Node) executeRollCall(ctx context.Context, requestID string, functionID
n.rollCall.create(requestID)
defer n.rollCall.remove(requestID)

err := n.publishRollCall(ctx, requestID, functionID, consensus)
err := n.publishRollCall(ctx, requestID, functionID, consensus, attributes)
if err != nil {
return nil, fmt.Errorf("could not publish roll call: %w", err)
}
Expand Down Expand Up @@ -150,7 +165,7 @@ rollCallResponseLoop:

// publishRollCall will create a roll call request for executing the given function.
// On successful issuance of the roll call request, we return the ID of the issued request.
func (n *Node) publishRollCall(ctx context.Context, requestID string, functionID string, consensus consensus.Type) error {
func (n *Node) publishRollCall(ctx context.Context, requestID string, functionID string, consensus consensus.Type, attributes *execute.Attributes) error {

// Create a roll call request.
rollCall := request.RollCall{
Expand All @@ -159,6 +174,7 @@ func (n *Node) publishRollCall(ctx context.Context, requestID string, functionID
FunctionID: functionID,
RequestID: requestID,
Consensus: consensus,
Attributes: attributes,
}

// Publish the mssage.
Expand Down

0 comments on commit afc761f

Please sign in to comment.