Skip to content

Commit

Permalink
first trying the Execution node for which there is an ER and falling … (
Browse files Browse the repository at this point in the history
#318)

* first trying the Execution node for which there is an ER and falling back to static EN otherwise

* fixing condition

* fixing unit test
  • Loading branch information
vishalchangrani authored Jan 23, 2021
1 parent 8d1a665 commit ae8f357
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 40 deletions.
6 changes: 3 additions & 3 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

// maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request
const maxExecutionNodesCnt = 3
const maxExecutionNodesCnt = 2

// Backends implements the Access API.
//
Expand Down Expand Up @@ -198,7 +198,7 @@ func convertStorageError(err error) error {
}

// executionNodesForBlockID returns upto maxExecutionNodesCnt number of randomly chosen execution node identities
// which have executed the given block ID. If no such execution node is found, then an error is returned.
// which have executed the given block ID. If no such execution node is found, an empty list is returned.
func executionNodesForBlockID(
blockID flow.Identifier,
executionReceipts storage.ExecutionReceipts,
Expand All @@ -217,7 +217,7 @@ func executionNodesForBlockID(
}

if len(executorIDs) == 0 {
return nil, fmt.Errorf("no execution node found for block ID %v: %w", blockID, err)
return flow.IdentityList{}, nil
}

// find the node identities of these execution nodes
Expand Down
20 changes: 10 additions & 10 deletions engine/access/rpc/backend/backend_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,29 @@ func (b *backendAccounts) getAccountAtBlockID(
BlockId: blockID[:],
}

var exeRes *execproto.GetAccountAtBlockIDResponse
var err error
execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get account from the execution node: %v", err)
}

if b.staticExecutionRPC != nil {
var exeRes *execproto.GetAccountAtBlockIDResponse
if len(execNodes) == 0 {
if b.staticExecutionRPC == nil {
return nil, status.Errorf(codes.Internal, "failed to get account from the execution node")
}

exeRes, err = b.staticExecutionRPC.GetAccountAtBlockID(ctx, &exeReq)
if err != nil {
convertedErr := getAccountError(err)
return nil, convertedErr
}
} else {

execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get account from the execution node: %v", err)
}

} else {
exeRes, err = b.getAccountFromAnyExeNode(ctx, execNodes, exeReq)
if err != nil {
return nil, err
}
}

account, err := convert.MessageToAccount(exeRes.GetAccount())
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to convert account message: %v", err)
Expand Down
28 changes: 15 additions & 13 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,34 @@ func (b *backendEvents) getBlockEventsFromExecutionNode(
BlockIds: convert.IdentifiersToMessages(blockIDs),
}

// choose the last block ID to find the list of execution nodes
lastBlockID := blockIDs[len(blockIDs)-1]

execNodes, err := executionNodesForBlockID(lastBlockID, b.executionReceipts, b.state)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err)
}

var resp *execproto.GetEventsForBlockIDsResponse
var err error
if b.staticExecutionRPC != nil {
if len(execNodes) == 0 {
if b.staticExecutionRPC == nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node")
}

// call the execution node gRPC
resp, err = b.staticExecutionRPC.GetEventsForBlockIDs(ctx, &req)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err)
}
} else {

// choose the last block ID to find the list of execution nodes
lastBlockID := blockIDs[len(blockIDs)-1]

execNodes, err := executionNodesForBlockID(lastBlockID, b.executionReceipts, b.state)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err)
}

var successfulNode *flow.Identity
resp, successfulNode, err = b.getEventsFromAnyExeNode(ctx, execNodes, req)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err)
return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution nodes %s: %v", execNodes, err)
}
b.log.Trace().
Str("execution_id", successfulNode.String()).
Str("block_id", lastBlockID.String()).
Str("last_block_id", lastBlockID.String()).
Msg("successfully got events")
}

Expand Down
20 changes: 13 additions & 7 deletions engine/access/rpc/backend/backend_scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,27 @@ func (b *backendScripts) executeScriptOnExecutionNode(
Arguments: arguments,
}

// if an executionRPC is provided, send the script to that execution node
if b.staticExecutionRPC != nil {
// find few execution nodes which have executed the block earlier and provided an execution receipt for it
execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node: %v", err)
}

// if no execution nodes were found, fall back to the static execution node if provided
if len(execNodes) == 0 {
if b.staticExecutionRPC == nil {
return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node")
}
// if an executionRPC is provided, send the script to that execution node
execResp, err := b.staticExecutionRPC.ExecuteScriptAtBlockID(ctx, &execReq)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node: %v", err)
}
return execResp.GetValue(), nil
}

// find few execution nodes which have executed the block earlier and provided an execution receipt for it
execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node: %v", err)
}

// try each of the execution nodes found
var errors *multierror.Error
// try to execute the script on one of the execution nodes
for _, execNode := range execNodes {
Expand Down
18 changes: 17 additions & 1 deletion engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,20 @@ func (suite *Suite) TestGetEventsForBlockIDs() {

suite.Run("with fixed execution node", func() {

// create receipt mocks that always returns empty
receipts := new(storagemock.ExecutionReceipts)
receipts.
On("ByBlockIDAllExecutionReceipts", mock.Anything).
Return([]flow.ExecutionReceipt{}, nil).Once()

// create the handler
backend := New(
suite.state,
suite.execClient, // pass the default client
nil, nil,
suite.blocks,
nil, nil, nil,
suite.receipts,
receipts,
suite.chainID,
metrics.NewNoopCollector(),
nil,
Expand Down Expand Up @@ -598,6 +604,12 @@ func (suite *Suite) TestGetEventsForHeightRange() {
return headers
}

// use the static execution node
// TODO: test the dynamic execution node selection logic
suite.receipts.
On("ByBlockIDAllExecutionReceipts", mock.Anything).
Return([]flow.ExecutionReceipt{}, nil)

setupExecClient := func() []flow.BlockEvents {
blockIDs := make([]flow.Identifier, len(blockHeaders))
for i, header := range blockHeaders {
Expand Down Expand Up @@ -816,6 +828,10 @@ func (suite *Suite) TestGetAccountAtBlockHeight() {
Return(&h, nil).
Once()

suite.receipts.
On("ByBlockIDAllExecutionReceipts", mock.Anything).
Return([]flow.ExecutionReceipt{}, nil).Once()

// create the expected execution API request
blockID := h.ID()
exeReq := &execproto.GetAccountAtBlockIDRequest{
Expand Down
17 changes: 11 additions & 6 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,17 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode(
TransactionId: transactionID,
}

execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state)
if err != nil {
return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from any execution node: %v", err)
}

var resp *execproto.GetTransactionResultResponse
var err error
if b.executionRPC != nil {
if len(execNodes) == 0 {
if b.executionRPC == nil {
return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from execution node")
}

// call the execution node gRPC
resp, err = b.executionRPC.GetTransactionResult(ctx, &req)
if err != nil {
Expand All @@ -399,11 +407,8 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode(
}
return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from execution node: %v", err)
}

} else {
execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state)
if err != nil {
return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from any execution node: %v", err)
}
resp, err = b.getTransactionResultFromAnyExeNode(ctx, execNodes, req)
if err != nil {
if status.Code(err) == codes.NotFound {
Expand Down
5 changes: 5 additions & 0 deletions engine/access/rpc/backend/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() {
// block storage returns the corresponding block
suite.blocks.On("ByCollectionID", collection.ID()).Return(&block, nil)


txID := transactionBody.ID()
blockID := block.ID()
exeEventReq := execution.GetTransactionResultRequest{
Expand All @@ -98,6 +99,10 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() {
Events: nil,
}

suite.receipts.
On("ByBlockIDAllExecutionReceipts", mock.Anything).
Return([]flow.ExecutionReceipt{}, nil)

// Setup Handler + Retry
backend := New(suite.state, suite.execClient, suite.colClient, nil, suite.blocks, suite.headers,
suite.collections, suite.transactions, suite.receipts, suite.chainID, metrics.NewNoopCollector(), nil,
Expand Down

0 comments on commit ae8f357

Please sign in to comment.