diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go index a82b365240e..b5ed2c249d8 100644 --- a/engine/access/rpc/backend/backend_stream_transactions.go +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -36,7 +36,7 @@ type backendSubscribeTransactions struct { type TransactionSubscriptionMetadata struct { *access.TransactionResult txReferenceBlockID flow.Identifier - blockWithTx *flow.Header + blockWithTx *flow.Block txExecuted bool eventEncodingVersion entities.EventEncodingVersion } @@ -126,7 +126,7 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran // When a block with the transaction is available, it is possible to receive a new transaction status while // searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction // statuses are the same, the current transaction status should be retrieved. - txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.txExecuted) + txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.BlockHeight, txInfo.txExecuted) } if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { @@ -229,7 +229,7 @@ func (b *backendSubscribeTransactions) checkBlockReady(height uint64) error { func (b *backendSubscribeTransactions) searchForTransactionBlockInfo( height uint64, txInfo *TransactionSubscriptionMetadata, -) (*flow.Header, flow.Identifier, uint64, flow.Identifier, error) { +) (*flow.Block, flow.Identifier, uint64, flow.Identifier, error) { block, err := b.txLocalDataProvider.blocks.ByHeight(height) if err != nil { return nil, flow.ZeroID, 0, flow.ZeroID, fmt.Errorf("error looking up block: %w", err) @@ -241,43 +241,40 @@ func (b *backendSubscribeTransactions) searchForTransactionBlockInfo( } if collectionID != flow.ZeroID { - return block.Header, block.ID(), height, collectionID, nil + return block, block.ID(), height, collectionID, nil } return nil, flow.ZeroID, 0, flow.ZeroID, nil } -// searchForTransactionResult searches for the transaction result of a block. It retrieves the execution result for the specified block ID. -// Expected errors: -// - codes.Internal if an internal error occurs while retrieving execution result. +// searchForTransactionResult searches for the transaction result of a block. It retrieves the transaction result from +// storage and, in case of failure, attempts to fetch the transaction result directly from the execution node. +// This is necessary to ensure data availability despite sync storage latency. +// +// No errors expected during normal operations. func (b *backendSubscribeTransactions) searchForTransactionResult( ctx context.Context, txInfo *TransactionSubscriptionMetadata, ) (*access.TransactionResult, error) { - _, err := b.executionResults.ByBlockID(txInfo.BlockID) + txResult, err := b.backendTransactions.GetTransactionResultFromStorage(ctx, txInfo.blockWithTx, txInfo.TransactionID, txInfo.eventEncodingVersion) if err != nil { - if errors.Is(err, storage.ErrNotFound) { - return nil, nil - } - return nil, fmt.Errorf("failed to get execution result for block %s: %w", txInfo.BlockID, err) - } - - txResult, err := b.backendTransactions.GetTransactionResult( - ctx, - txInfo.TransactionID, - txInfo.BlockID, - txInfo.CollectionID, - txInfo.eventEncodingVersion, - ) + // If any error occurs with local storage - request transaction result from EN + txResult, err = b.backendTransactions.GetTransactionResultFromExecutionNode( + ctx, + txInfo.blockWithTx, + txInfo.TransactionID, + txInfo.eventEncodingVersion, + ) - if err != nil { - // if either the storage or execution node reported no results or there were not enough execution results - if status.Code(err) == codes.NotFound { - // No result yet, indicate that it has not been executed - return nil, nil + if err != nil { + // if either the execution node reported no results + if status.Code(err) == codes.NotFound { + // No result yet, indicate that it has not been executed + return nil, nil + } + // Other Error trying to retrieve the result, return with err + return nil, err } - // Other Error trying to retrieve the result, return with err - return nil, err } return txResult, nil diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go index 24cdf601f17..bb170fff06b 100644 --- a/engine/access/rpc/backend/backend_stream_transactions_test.go +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -7,6 +7,9 @@ import ( "testing" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/dgraph-io/badger/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -22,6 +25,7 @@ import ( connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" "github.com/onflow/flow-go/engine/access/subscription" subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock" + commonrpc "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" @@ -30,6 +34,7 @@ import ( syncmock "github.com/onflow/flow-go/module/state_synchronization/mock" protocolint "github.com/onflow/flow-go/state/protocol" protocol "github.com/onflow/flow-go/state/protocol/mock" + "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/unittest" @@ -98,9 +103,6 @@ func (s *TransactionStatusSuite) SetupTest() { s.tempSnapshot = &protocol.Snapshot{} s.db, s.dbDir = unittest.TempBadgerDB(s.T()) - params := protocol.NewParams(s.T()) - s.state.On("Params").Return(params) - s.blocks = storagemock.NewBlocks(s.T()) s.headers = storagemock.NewHeaders(s.T()) s.transactions = storagemock.NewTransactions(s.T()) @@ -121,11 +123,24 @@ func (s *TransactionStatusSuite) SetupTest() { s.blockTracker = subscriptionmock.NewBlockTracker(s.T()) s.resultsMap = map[flow.Identifier]*flow.ExecutionResult{} + s.execClient.On("GetTransactionResult", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "not found")).Maybe() + s.connectionFactory.On("GetExecutionAPIClient", mock.Anything).Return(s.execClient, &mocks.MockCloser{}, nil).Maybe() + // generate blockCount consecutive blocks with associated seal, result and execution data s.rootBlock = unittest.BlockFixture() rootResult := unittest.ExecutionResultFixture(unittest.WithBlock(&s.rootBlock)) s.resultsMap[s.rootBlock.ID()] = rootResult + params := protocol.NewParams(s.T()) + params.On("FinalizedRoot").Return(s.rootBlock.Header).Maybe() + s.state.On("Params").Return(params).Maybe() + + var receipts flow.ExecutionReceiptList + executionNodes := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) + receipts = unittest.ReceiptsForBlockFixture(&s.rootBlock, executionNodes.NodeIDs()) + s.receipts.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(receipts, nil).Maybe() + s.finalSnapshot.On("Identities", mock.Anything).Return(executionNodes, nil).Maybe() + var err error s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter( bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight), @@ -219,6 +234,14 @@ func (s *TransactionStatusSuite) backendParams() Params { TxResultQueryMode: IndexQueryModeLocalOnly, EventsIndex: index.NewEventsIndex(s.indexReporter, s.events), LastFullBlockHeight: s.lastFullBlockHeight, + ExecNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider( + s.log, + s.state, + s.receipts, + nil, + nil, + ), + ConnFactory: s.connectionFactory, } } @@ -246,36 +269,20 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { finalizedHeader := s.finalizedBlock.Header return finalizedHeader.Height, nil }, nil) - s.blocks.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) (*flow.Block, error) { - for _, block := range s.blockMap { - if block.ID() == blockID { - return block, nil - } - } - return nil, nil - }, nil) s.sealedSnapshot.On("Head").Return(func() *flow.Header { return s.sealedBlock.Header }, nil) s.state.On("Sealed").Return(s.sealedSnapshot, nil) - s.results.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(mocks.StorageMapGetter(s.resultsMap)) // Generate sent transaction with ref block of the current finalized block transaction := unittest.TransactionFixture() transaction.SetReferenceBlockID(s.finalizedBlock.ID()) - s.transactions.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(&transaction.TransactionBody, nil) col := flow.CollectionFromTransactions([]*flow.Transaction{&transaction}) guarantee := col.Guarantee() light := col.Light() txId := transaction.ID() - txResult := flow.LightTransactionResult{ - TransactionID: txId, - Failed: false, - ComputationUsed: 0, - } - eventsForTx := unittest.EventsFixture(1, flow.EventAccountCreated) eventMessages := make([]*entities.Event, 1) for j, event := range eventsForTx { @@ -288,11 +295,21 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { mock.AnythingOfType("flow.Identifier"), ).Return(eventsForTx, nil) + hasTransactionResultInStorage := false s.transactionResults.On( "ByBlockIDTransactionID", mock.AnythingOfType("flow.Identifier"), mock.AnythingOfType("flow.Identifier"), - ).Return(&txResult, nil) + ).Return(func(blockID flow.Identifier, transactionID flow.Identifier) (*flow.LightTransactionResult, error) { + if hasTransactionResultInStorage { + return &flow.LightTransactionResult{ + TransactionID: txId, + Failed: false, + ComputationUsed: 0, + }, nil + } + return nil, storage.ErrNotFound + }).Twice() // Create a special common function to read subscription messages from the channel and check converting it to transaction info // and check results for correctness @@ -328,6 +345,8 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { // 3. Add one more finalized block on top of the transaction block and add execution results to storage finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock)) s.resultsMap[s.finalizedBlock.ID()] = finalizedResult + // init transaction result for storage + hasTransactionResultInStorage = true s.addNewFinalizedBlock(s.finalizedBlock.Header, true) checkNewSubscriptionMessage(sub, flow.TransactionStatusExecuted) diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index 5ba1cecb038..6dc0892a0ea 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -651,14 +651,14 @@ func (b *backendTransactions) lookupTransactionResult( var err error switch b.txResultQueryMode { case IndexQueryModeExecutionNodesOnly: - txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) + txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) case IndexQueryModeLocalOnly: txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion) case IndexQueryModeFailover: txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion) if err != nil { // If any error occurs with local storage - request transaction result from EN - txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) + txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion) } default: return nil, status.Errorf(codes.Internal, "unknown transaction result query mode: %v", b.txResultQueryMode) @@ -742,7 +742,13 @@ func (b *backendTransactions) registerTransactionForRetry(tx *flow.TransactionBo b.retry.RegisterTransaction(referenceBlock.Height, tx) } -func (b *backendTransactions) getTransactionResultFromExecutionNode( +// GetTransactionResultFromExecutionNode retrieves the result of a specified transaction +// from the execution nodes for a given block. +// +// Error returns: +// - `codes.NotFound` if no execution receipt were found. +// - Returns internal errors if event conversion or status derivation fails. +func (b *backendTransactions) GetTransactionResultFromExecutionNode( ctx context.Context, block *flow.Block, transactionID flow.Identifier,