Skip to content

Commit

Permalink
Fix up models
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum committed Jul 1, 2024
1 parent 80723c9 commit 838776f
Show file tree
Hide file tree
Showing 18 changed files with 52 additions and 69 deletions.
4 changes: 2 additions & 2 deletions api/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestAPI_Execute(t *testing.T) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
mocks.GenericPeerID: execute.NodeResult{Result: executionResult},
}

cluster := execute.Cluster{
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestAPI_Execute_HandlesErrors(t *testing.T) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
mocks.GenericPeerID: execute.NodeResult{Result: executionResult},
}

return expectedCode, "", res, execute.Cluster{}, mocks.GenericError
Expand Down
3 changes: 1 addition & 2 deletions api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import (

"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
"github.com/blocklessnetwork/b7s/models/response"
)

type Node interface {
ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (code codes.Code, requestID string, results response.ExecutionResultMap, peers execute.Cluster, err error)
ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (code codes.Code, requestID string, results execute.ResultMap, peers execute.Cluster, err error)
ExecutionResult(id string) (execute.Result, bool)
PublishFunctionInstall(ctx context.Context, uri string, cid string, subgroup string) error
}
4 changes: 2 additions & 2 deletions consensus/pbft/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error {
msg := response.Execute{
Code: res.Code,
RequestID: request.ID,
Results: response.ExecutionResultMap{
r.id: response.ExecutionResult{
Results: execute.ResultMap{
r.id: execute.NodeResult{
Result: res,
Metadata: metadata,
},
Expand Down
6 changes: 2 additions & 4 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ import (
"github.com/blocklessnetwork/b7s/models/execute"
)

type Metadata map[string]any

type Provider interface {
Metadata(execute.Request, execute.RuntimeOutput) (Metadata, error)
Metadata(execute.Request, execute.RuntimeOutput) (any, error)
}

type noopProvider struct{}

func (p noopProvider) Metadata(execute.Request, execute.RuntimeOutput) (Metadata, error) {
func (p noopProvider) Metadata(execute.Request, execute.RuntimeOutput) (any, error) {
return map[string]any{}, nil
}

Expand Down
10 changes: 8 additions & 2 deletions models/execute/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"github.com/blocklessnetwork/b7s/models/codes"
)

// NodeResult is an annotated execution result.
type NodeResult struct {
Result
Metadata any `json:"metadata,omitempty"`
}

// Result describes an execution result.
type Result struct {
Code codes.Code `json:"code"`
Expand Down Expand Up @@ -39,7 +45,7 @@ type Usage struct {
}

// ResultMap contains execution results from multiple peers.
type ResultMap map[peer.ID]Result
type ResultMap map[peer.ID]NodeResult

// MarshalJSON provides means to correctly handle JSON serialization/deserialization.
// See:
Expand All @@ -48,7 +54,7 @@ type ResultMap map[peer.ID]Result
// https://github.com/libp2p/go-libp2p-resource-manager/pull/67#issuecomment-1176820561
func (m ResultMap) MarshalJSON() ([]byte, error) {

em := make(map[string]Result, len(m))
em := make(map[string]NodeResult, len(m))
for p, v := range m {
em[p.String()] = v
}
Expand Down
26 changes: 4 additions & 22 deletions models/response/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
Expand All @@ -20,10 +19,10 @@ var _ (json.Marshaler) = (*Execute)(nil)

// Execute describes the response to the `MessageExecute` message.
type Execute struct {
RequestID string `json:"request_id,omitempty"`
Code codes.Code `json:"code,omitempty"`
Results ExecutionResultMap `json:"results,omitempty"`
Cluster execute.Cluster `json:"cluster,omitempty"`
RequestID string `json:"request_id,omitempty"`
Code codes.Code `json:"code,omitempty"`
Results execute.ResultMap `json:"results,omitempty"`
Cluster execute.Cluster `json:"cluster,omitempty"`

PBFT PBFTResultInfo `json:"pbft,omitempty"`
// Signed digest of the response.
Expand All @@ -33,23 +32,6 @@ type Execute struct {
Message string `json:"message,omitempty"`
}

type ExecutionResultMap map[peer.ID]ExecutionResult

func (m ExecutionResultMap) MarshalJSON() ([]byte, error) {

em := make(map[string]ExecutionResult, len(m))
for p, v := range m {
em[p.String()] = v
}

return json.Marshal(em)
}

type ExecutionResult struct {
execute.Result
Metadata metadata.Metadata `json:"metadata,omitempty"`
}

func (Execute) Type() string { return blockless.MessageExecuteResponse }

func (e Execute) MarshalJSON() ([]byte, error) {
Expand Down
2 changes: 1 addition & 1 deletion models/response/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestExecute_Signing(t *testing.T) {
RequestID: mocks.GenericUUID.String(),
Code: codes.OK,
Results: execute.ResultMap{
mocks.GenericPeerID: mocks.GenericExecutionResult,
mocks.GenericPeerID: execute.NodeResult{Result: mocks.GenericExecutionResult},
},
Cluster: execute.Cluster{
Peers: mocks.GenericPeerIDs[:4],
Expand Down
8 changes: 3 additions & 5 deletions node/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import (

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/execute"
"github.com/blocklessnetwork/b7s/models/response"
)

func Aggregate(results response.ExecutionResultMap) Results {
func Aggregate(results execute.ResultMap) Results {

total := len(results)
if total == 0 {
Expand All @@ -20,7 +18,7 @@ func Aggregate(results response.ExecutionResultMap) Results {
type resultStats struct {
seen uint
peers []peer.ID
metadata map[peer.ID]metadata.Metadata
metadata map[peer.ID]any
}

stats := make(map[execute.RuntimeOutput]resultStats)
Expand All @@ -34,7 +32,7 @@ func Aggregate(results response.ExecutionResultMap) Results {
stat = resultStats{
seen: 0,
peers: make([]peer.ID, 0),
metadata: make(map[peer.ID]metadata.Metadata),
metadata: make(map[peer.ID]any),
}
}

Expand Down
5 changes: 2 additions & 3 deletions node/aggregate/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/execute"
)

Expand All @@ -22,11 +21,11 @@ type Result struct {
Frequency float64 `json:"frequency,omitempty"`
}

type NodeMetadata map[peer.ID]metadata.Metadata
type NodeMetadata map[peer.ID]any

func (m NodeMetadata) MarshalJSON() ([]byte, error) {

em := make(map[string]metadata.Metadata, len(m))
em := make(map[string]any, len(m))
for p, v := range m {
em[p.String()] = v
}
Expand Down
2 changes: 1 addition & 1 deletion node/cluster_pbft_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ This is the end of my program

for peer, exres := range res.Results {
require.Contains(t, workerIDs, peer)
require.Equal(t, expectedExecutionResult, exres.Result.Stdout)
require.Equal(t, expectedExecutionResult, exres.Result.Result.Stdout)
}

t.Log("client verified execution response")
Expand Down
4 changes: 2 additions & 2 deletions node/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (n *Node) createRaftCluster(ctx context.Context, from peer.ID, fc request.F
msg := response.Execute{
Code: res.Code,
RequestID: req.RequestID,
Results: response.ExecutionResultMap{
n.host.ID(): response.ExecutionResult{
Results: execute.ResultMap{
n.host.ID(): execute.NodeResult{
Result: res,
Metadata: metadata,
},
Expand Down
2 changes: 1 addition & 1 deletion node/execute_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ This is the end of my program

require.Equal(t, codes.OK, res.Code)
require.NotEmpty(t, res.RequestID)
require.Equal(t, expectedExecutionResult, res.Results[worker.host.ID()].Result.Stdout)
require.Equal(t, expectedExecutionResult, res.Results[worker.host.ID()].Result.Result.Stdout)

t.Log("client verified execution response")

Expand Down
13 changes: 7 additions & 6 deletions node/execute_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/blocklessnetwork/b7s/host"
Expand Down Expand Up @@ -88,7 +87,7 @@ func TestNode_WorkerExecute(t *testing.T) {
require.Equal(t, outRequestID, received.RequestID)
require.Equal(t, expected.Code, received.Code)

require.Equal(t, expected.Result, received.Results[node.host.ID()].Result)
require.Equal(t, expected.Result, received.Results[node.host.ID()].Result.Result)
})

err = node.processExecute(context.Background(), receiver.ID(), executionRequest)
Expand Down Expand Up @@ -146,7 +145,7 @@ func TestNode_WorkerExecute(t *testing.T) {

require.Equal(t, received.RequestID, requestID)
require.Equal(t, faultyExecutionResult.Code, received.Code)
require.Equal(t, faultyExecutionResult.Result, received.Results[node.host.ID()].Result)
require.Equal(t, faultyExecutionResult.Result, received.Results[node.host.ID()].Result.Result)
})

err = node.processExecute(context.Background(), receiver.ID(), executionRequest)
Expand Down Expand Up @@ -332,10 +331,12 @@ func TestNode_HeadExecute(t *testing.T) {
res := response.Execute{
Code: codes.OK,
RequestID: requestID,
Results: map[peer.ID]execute.Result{
Results: execute.ResultMap{
mockWorker.Host.ID(): {
Code: codes.OK,
Result: executionResult,
Result: execute.Result{
Code: codes.OK,
Result: executionResult,
},
},
},
}
Expand Down
17 changes: 8 additions & 9 deletions node/execution_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,29 @@ import (
"github.com/rs/zerolog/log"

"github.com/blocklessnetwork/b7s/consensus/pbft"
"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/execute"
"github.com/blocklessnetwork/b7s/models/response"
)

// gatherExecutionResultsPBFT collects execution results from a PBFT cluster. This means f+1 identical results.
func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, peers []peer.ID) response.ExecutionResultMap {
func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string, peers []peer.ID) execute.ResultMap {

exctx, exCancel := context.WithTimeout(ctx, n.cfg.ExecutionTimeout)
defer exCancel()

type aggregatedResult struct {
result execute.Result
peers []peer.ID
metadata map[peer.ID]metadata.Metadata
metadata map[peer.ID]any
}

var (
count = pbft.MinClusterResults(uint(len(peers)))
lock sync.Mutex
wg sync.WaitGroup

results = make(map[string]aggregatedResult)
out response.ExecutionResultMap = make(map[peer.ID]response.ExecutionResult)
results = make(map[string]aggregatedResult)
out execute.ResultMap = make(map[peer.ID]execute.NodeResult)
)

wg.Add(len(peers))
Expand Down Expand Up @@ -80,7 +79,7 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string,
peers: []peer.ID{
sender,
},
metadata: map[peer.ID]metadata.Metadata{
metadata: map[peer.ID]any{
sender: exres.Metadata,
},
}
Expand All @@ -96,7 +95,7 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string,
exCancel()

for _, peer := range result.peers {
out[peer] = response.ExecutionResult{
out[peer] = execute.NodeResult{
Result: result.result,
Metadata: result.metadata[peer],
}
Expand All @@ -111,14 +110,14 @@ func (n *Node) gatherExecutionResultsPBFT(ctx context.Context, requestID string,
}

// gatherExecutionResults collects execution results from direct executions or raft clusters.
func (n *Node) gatherExecutionResults(ctx context.Context, requestID string, peers []peer.ID) response.ExecutionResultMap {
func (n *Node) gatherExecutionResults(ctx context.Context, requestID string, peers []peer.ID) execute.ResultMap {

// We're willing to wait for a limited amount of time.
exctx, exCancel := context.WithTimeout(ctx, n.cfg.ExecutionTimeout)
defer exCancel()

var (
results response.ExecutionResultMap = make(map[peer.ID]response.ExecutionResult)
results execute.ResultMap = make(map[peer.ID]execute.NodeResult)
reslock sync.Mutex
wg sync.WaitGroup
)
Expand Down
4 changes: 2 additions & 2 deletions node/head_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (n *Node) headProcessExecute(ctx context.Context, from peer.ID, req request

// headExecute is called on the head node. The head node will publish a roll call and delegate an execution request to chosen nodes.
// The returned map contains execution results, mapped to the peer IDs of peers who reported them.
func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request, subgroup string) (codes.Code, response.ExecutionResultMap, execute.Cluster, error) {
func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Request, subgroup string) (codes.Code, execute.ResultMap, execute.Cluster, error) {

nodeCount := -1
if req.Config.NodeCount >= 1 {
Expand Down Expand Up @@ -136,7 +136,7 @@ func (n *Node) headExecute(ctx context.Context, requestID string, req execute.Re

log.Debug().Msg("waiting for execution responses")

var results response.ExecutionResultMap
var results execute.ResultMap
if consensusAlgo == consensus.PBFT {
results = n.gatherExecutionResultsPBFT(ctx, requestID, reportingPeers)

Expand Down
3 changes: 1 addition & 2 deletions node/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"github.com/blocklessnetwork/b7s/models/codes"
"github.com/blocklessnetwork/b7s/models/execute"
"github.com/blocklessnetwork/b7s/models/request"
"github.com/blocklessnetwork/b7s/models/response"
)

// ExecuteFunction can be used to start function execution. At the moment this is used by the API server to start execution on the head node.
func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, response.ExecutionResultMap, execute.Cluster, error) {
func (n *Node) ExecuteFunction(ctx context.Context, req execute.Request, subgroup string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

if !n.isHead() {
return codes.NotAvailable, "", nil, execute.Cluster{}, fmt.Errorf("action not supported on this node type")
Expand Down
4 changes: 2 additions & 2 deletions node/worker_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque
res := response.Execute{
Code: code,
RequestID: requestID,
Results: response.ExecutionResultMap{
n.host.ID(): response.ExecutionResult{
Results: execute.ResultMap{
n.host.ID(): execute.NodeResult{
Result: result,
Metadata: metadata,
},
Expand Down
4 changes: 3 additions & 1 deletion testing/mocks/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ func BaselineNode(t *testing.T) *Node {
ExecuteFunctionFunc: func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

results := execute.ResultMap{
GenericPeerID: GenericExecutionResult,
GenericPeerID: execute.NodeResult{
Result: GenericExecutionResult,
},
}

// TODO: Add a generic cluster info
Expand Down

0 comments on commit 838776f

Please sign in to comment.