diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index fc5b168f030..c5782724652 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -20,7 +20,7 @@ import ( ) // maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request -const maxExecutionNodesCnt = 3 +const maxExecutionNodesCnt = 2 // Backends implements the Access API. // @@ -198,7 +198,7 @@ func convertStorageError(err error) error { } // executionNodesForBlockID returns upto maxExecutionNodesCnt number of randomly chosen execution node identities -// which have executed the given block ID. If no such execution node is found, then an error is returned. +// which have executed the given block ID. If no such execution node is found, an empty list is returned. func executionNodesForBlockID( blockID flow.Identifier, executionReceipts storage.ExecutionReceipts, @@ -217,7 +217,7 @@ func executionNodesForBlockID( } if len(executorIDs) == 0 { - return nil, fmt.Errorf("no execution node found for block ID %v: %w", blockID, err) + return flow.IdentityList{}, nil } // find the node identities of these execution nodes diff --git a/engine/access/rpc/backend/backend_accounts.go b/engine/access/rpc/backend/backend_accounts.go index 4f11e72c1d6..e7df97eb3fd 100644 --- a/engine/access/rpc/backend/backend_accounts.go +++ b/engine/access/rpc/backend/backend_accounts.go @@ -81,29 +81,29 @@ func (b *backendAccounts) getAccountAtBlockID( BlockId: blockID[:], } - var exeRes *execproto.GetAccountAtBlockIDResponse - var err error + execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get account from the execution node: %v", err) + } - if b.staticExecutionRPC != nil { + var exeRes *execproto.GetAccountAtBlockIDResponse + if len(execNodes) == 0 { + if b.staticExecutionRPC == nil { + return nil, status.Errorf(codes.Internal, "failed to get account from the execution node") + } exeRes, err = b.staticExecutionRPC.GetAccountAtBlockID(ctx, &exeReq) if err != nil { convertedErr := getAccountError(err) return nil, convertedErr } - } else { - - execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to get account from the execution node: %v", err) - } + } else { exeRes, err = b.getAccountFromAnyExeNode(ctx, execNodes, exeReq) if err != nil { return nil, err } } - account, err := convert.MessageToAccount(exeRes.GetAccount()) if err != nil { return nil, status.Errorf(codes.Internal, "failed to convert account message: %v", err) diff --git a/engine/access/rpc/backend/backend_events.go b/engine/access/rpc/backend/backend_events.go index 19c2a89c9ea..d3827d9128d 100644 --- a/engine/access/rpc/backend/backend_events.go +++ b/engine/access/rpc/backend/backend_events.go @@ -103,32 +103,34 @@ func (b *backendEvents) getBlockEventsFromExecutionNode( BlockIds: convert.IdentifiersToMessages(blockIDs), } + // choose the last block ID to find the list of execution nodes + lastBlockID := blockIDs[len(blockIDs)-1] + + execNodes, err := executionNodesForBlockID(lastBlockID, b.executionReceipts, b.state) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err) + } + var resp *execproto.GetEventsForBlockIDsResponse - var err error - if b.staticExecutionRPC != nil { + if len(execNodes) == 0 { + if b.staticExecutionRPC == nil { + return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node") + } + // call the execution node gRPC resp, err = b.staticExecutionRPC.GetEventsForBlockIDs(ctx, &req) if err != nil { return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err) } } else { - - // choose the last block ID to find the list of execution nodes - lastBlockID := blockIDs[len(blockIDs)-1] - - execNodes, err := executionNodesForBlockID(lastBlockID, b.executionReceipts, b.state) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err) - } - var successfulNode *flow.Identity resp, successfulNode, err = b.getEventsFromAnyExeNode(ctx, execNodes, req) if err != nil { - return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution node: %v", err) + return nil, status.Errorf(codes.Internal, "failed to retrieve events from execution nodes %s: %v", execNodes, err) } b.log.Trace(). Str("execution_id", successfulNode.String()). - Str("block_id", lastBlockID.String()). + Str("last_block_id", lastBlockID.String()). Msg("successfully got events") } diff --git a/engine/access/rpc/backend/backend_scripts.go b/engine/access/rpc/backend/backend_scripts.go index 37365855208..19c381b9568 100644 --- a/engine/access/rpc/backend/backend_scripts.go +++ b/engine/access/rpc/backend/backend_scripts.go @@ -86,21 +86,27 @@ func (b *backendScripts) executeScriptOnExecutionNode( Arguments: arguments, } - // if an executionRPC is provided, send the script to that execution node - if b.staticExecutionRPC != nil { + // find few execution nodes which have executed the block earlier and provided an execution receipt for it + execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node: %v", err) + } + + // if no execution nodes were found, fall back to the static execution node if provided + if len(execNodes) == 0 { + if b.staticExecutionRPC == nil { + return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node") + } + // if an executionRPC is provided, send the script to that execution node execResp, err := b.staticExecutionRPC.ExecuteScriptAtBlockID(ctx, &execReq) if err != nil { return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node: %v", err) } return execResp.GetValue(), nil - } - // find few execution nodes which have executed the block earlier and provided an execution receipt for it - execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state) - if err != nil { - return nil, status.Errorf(codes.Internal, "failed to execute the script on the execution node: %v", err) } + // try each of the execution nodes found var errors *multierror.Error // try to execute the script on one of the execution nodes for _, execNode := range execNodes { diff --git a/engine/access/rpc/backend/backend_test.go b/engine/access/rpc/backend/backend_test.go index 1fc28806ee5..151979b6651 100644 --- a/engine/access/rpc/backend/backend_test.go +++ b/engine/access/rpc/backend/backend_test.go @@ -521,6 +521,12 @@ func (suite *Suite) TestGetEventsForBlockIDs() { suite.Run("with fixed execution node", func() { + // create receipt mocks that always returns empty + receipts := new(storagemock.ExecutionReceipts) + receipts. + On("ByBlockIDAllExecutionReceipts", mock.Anything). + Return([]flow.ExecutionReceipt{}, nil).Once() + // create the handler backend := New( suite.state, @@ -528,7 +534,7 @@ func (suite *Suite) TestGetEventsForBlockIDs() { nil, nil, suite.blocks, nil, nil, nil, - suite.receipts, + receipts, suite.chainID, metrics.NewNoopCollector(), nil, @@ -598,6 +604,12 @@ func (suite *Suite) TestGetEventsForHeightRange() { return headers } + // use the static execution node + // TODO: test the dynamic execution node selection logic + suite.receipts. + On("ByBlockIDAllExecutionReceipts", mock.Anything). + Return([]flow.ExecutionReceipt{}, nil) + setupExecClient := func() []flow.BlockEvents { blockIDs := make([]flow.Identifier, len(blockHeaders)) for i, header := range blockHeaders { @@ -816,6 +828,10 @@ func (suite *Suite) TestGetAccountAtBlockHeight() { Return(&h, nil). Once() + suite.receipts. + On("ByBlockIDAllExecutionReceipts", mock.Anything). + Return([]flow.ExecutionReceipt{}, nil).Once() + // create the expected execution API request blockID := h.ID() exeReq := &execproto.GetAccountAtBlockIDRequest{ diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index f9de076c081..3061bd634d5 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -388,9 +388,17 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( TransactionId: transactionID, } + execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state) + if err != nil { + return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from any execution node: %v", err) + } + var resp *execproto.GetTransactionResultResponse - var err error - if b.executionRPC != nil { + if len(execNodes) == 0 { + if b.executionRPC == nil { + return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from execution node") + } + // call the execution node gRPC resp, err = b.executionRPC.GetTransactionResult(ctx, &req) if err != nil { @@ -399,11 +407,8 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( } return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from execution node: %v", err) } + } else { - execNodes, err := executionNodesForBlockID(blockID, b.executionReceipts, b.state) - if err != nil { - return nil, 0, "", status.Errorf(codes.Internal, "failed to retrieve result from any execution node: %v", err) - } resp, err = b.getTransactionResultFromAnyExeNode(ctx, execNodes, req) if err != nil { if status.Code(err) == codes.NotFound { diff --git a/engine/access/rpc/backend/retry_test.go b/engine/access/rpc/backend/retry_test.go index 0dcda2469fc..da0ff540d5f 100644 --- a/engine/access/rpc/backend/retry_test.go +++ b/engine/access/rpc/backend/retry_test.go @@ -88,6 +88,7 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() { // block storage returns the corresponding block suite.blocks.On("ByCollectionID", collection.ID()).Return(&block, nil) + txID := transactionBody.ID() blockID := block.ID() exeEventReq := execution.GetTransactionResultRequest{ @@ -98,6 +99,10 @@ func (suite *Suite) TestSuccessfulTransactionsDontRetry() { Events: nil, } + suite.receipts. + On("ByBlockIDAllExecutionReceipts", mock.Anything). + Return([]flow.ExecutionReceipt{}, nil) + // Setup Handler + Retry backend := New(suite.state, suite.execClient, suite.colClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, suite.receipts, suite.chainID, metrics.NewNoopCollector(), nil,