Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Improvements for SubscribeTransactionStatuses statuses handling #6736

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
39 changes: 18 additions & 21 deletions engine/access/rpc/backend/backend_stream_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type backendSubscribeTransactions struct {
type TransactionSubscriptionMetadata struct {
*access.TransactionResult
txReferenceBlockID flow.Identifier
blockWithTx *flow.Header
blockWithTx *flow.Block
txExecuted bool
eventEncodingVersion entities.EventEncodingVersion
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *Tran
// When a block with the transaction is available, it is possible to receive a new transaction status while
// searching for the transaction result. Otherwise, it remains unchanged. So, if the old and new transaction
// statuses are the same, the current transaction status should be retrieved.
txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockWithTx.Height, txInfo.txExecuted)
txInfo.Status, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.BlockHeight, txInfo.txExecuted)
}
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
Expand Down Expand Up @@ -229,7 +229,7 @@ func (b *backendSubscribeTransactions) checkBlockReady(height uint64) error {
func (b *backendSubscribeTransactions) searchForTransactionBlockInfo(
height uint64,
txInfo *TransactionSubscriptionMetadata,
) (*flow.Header, flow.Identifier, uint64, flow.Identifier, error) {
) (*flow.Block, flow.Identifier, uint64, flow.Identifier, error) {
block, err := b.txLocalDataProvider.blocks.ByHeight(height)
if err != nil {
return nil, flow.ZeroID, 0, flow.ZeroID, fmt.Errorf("error looking up block: %w", err)
Expand All @@ -241,37 +241,34 @@ func (b *backendSubscribeTransactions) searchForTransactionBlockInfo(
}

if collectionID != flow.ZeroID {
return block.Header, block.ID(), height, collectionID, nil
return block, block.ID(), height, collectionID, nil
}

return nil, flow.ZeroID, 0, flow.ZeroID, nil
}

// searchForTransactionResult searches for the transaction result of a block. It retrieves the execution result for the specified block ID.
// Expected errors:
// - codes.Internal if an internal error occurs while retrieving execution result.
// searchForTransactionResult searches for the transaction result of a block. It retrieves the transaction result from
// storage and, in case of failure, attempts to fetch the transaction result directly from the execution node.
// This is necessary to ensure data availability despite sync storage latency.
//
// No errors expected during normal operations.
func (b *backendSubscribeTransactions) searchForTransactionResult(
ctx context.Context,
txInfo *TransactionSubscriptionMetadata,
) (*access.TransactionResult, error) {
_, err := b.executionResults.ByBlockID(txInfo.BlockID)
txResult, err := b.backendTransactions.GetTransactionResultFromStorage(ctx, txInfo.blockWithTx, txInfo.TransactionID, txInfo.eventEncodingVersion)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if local indexing/execution sync is not enabled?

if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: return the successful response here

if errors.Is(err, storage.ErrNotFound) {
return nil, nil
}
return nil, fmt.Errorf("failed to get execution result for block %s: %w", txInfo.BlockID, err)
// If any error occurs with local storage - request transaction result from EN
txResult, err = b.backendTransactions.GetTransactionResultFromExecutionNode(
ctx,
txInfo.blockWithTx,
txInfo.TransactionID,
txInfo.eventEncodingVersion,
)
}

txResult, err := b.backendTransactions.GetTransactionResult(
ctx,
txInfo.TransactionID,
txInfo.BlockID,
txInfo.CollectionID,
txInfo.eventEncodingVersion,
)

if err != nil {
// if either the storage or execution node reported no results or there were not enough execution results
// if either the storage or execution node reported no results
if status.Code(err) == codes.NotFound {
// No result yet, indicate that it has not been executed
return nil, nil
Expand Down
59 changes: 39 additions & 20 deletions engine/access/rpc/backend/backend_stream_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"testing"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/dgraph-io/badger/v2"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
Expand All @@ -22,6 +25,7 @@ import (
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/access/subscription"
subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock"
commonrpc "github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
Expand All @@ -30,6 +34,7 @@ import (
syncmock "github.com/onflow/flow-go/module/state_synchronization/mock"
protocolint "github.com/onflow/flow-go/state/protocol"
protocol "github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
storagemock "github.com/onflow/flow-go/storage/mock"
"github.com/onflow/flow-go/utils/unittest"
Expand Down Expand Up @@ -98,9 +103,6 @@ func (s *TransactionStatusSuite) SetupTest() {
s.tempSnapshot = &protocol.Snapshot{}
s.db, s.dbDir = unittest.TempBadgerDB(s.T())

params := protocol.NewParams(s.T())
s.state.On("Params").Return(params)

s.blocks = storagemock.NewBlocks(s.T())
s.headers = storagemock.NewHeaders(s.T())
s.transactions = storagemock.NewTransactions(s.T())
Expand All @@ -121,11 +123,24 @@ func (s *TransactionStatusSuite) SetupTest() {
s.blockTracker = subscriptionmock.NewBlockTracker(s.T())
s.resultsMap = map[flow.Identifier]*flow.ExecutionResult{}

s.execClient.On("GetTransactionResult", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "not found")).Maybe()
s.connectionFactory.On("GetExecutionAPIClient", mock.Anything).Return(s.execClient, &mocks.MockCloser{}, nil).Maybe()

// generate blockCount consecutive blocks with associated seal, result and execution data
s.rootBlock = unittest.BlockFixture()
rootResult := unittest.ExecutionResultFixture(unittest.WithBlock(&s.rootBlock))
s.resultsMap[s.rootBlock.ID()] = rootResult

params := protocol.NewParams(s.T())
params.On("FinalizedRoot").Return(s.rootBlock.Header).Maybe()
s.state.On("Params").Return(params).Maybe()

var receipts flow.ExecutionReceiptList
executionNodes := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
receipts = unittest.ReceiptsForBlockFixture(&s.rootBlock, executionNodes.NodeIDs())
s.receipts.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(receipts, nil).Maybe()
s.finalSnapshot.On("Identities", mock.Anything).Return(executionNodes, nil).Maybe()

var err error
s.lastFullBlockHeight, err = counters.NewPersistentStrictMonotonicCounter(
bstorage.NewConsumerProgress(s.db, module.ConsumeProgressLastFullBlockHeight),
Expand Down Expand Up @@ -219,6 +234,14 @@ func (s *TransactionStatusSuite) backendParams() Params {
TxResultQueryMode: IndexQueryModeLocalOnly,
EventsIndex: index.NewEventsIndex(s.indexReporter, s.events),
LastFullBlockHeight: s.lastFullBlockHeight,
ExecNodeIdentitiesProvider: commonrpc.NewExecutionNodeIdentitiesProvider(
s.log,
s.state,
s.receipts,
nil,
nil,
),
ConnFactory: s.connectionFactory,
}
}

Expand Down Expand Up @@ -246,36 +269,20 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() {
finalizedHeader := s.finalizedBlock.Header
return finalizedHeader.Height, nil
}, nil)
s.blocks.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) (*flow.Block, error) {
for _, block := range s.blockMap {
if block.ID() == blockID {
return block, nil
}
}

return nil, nil
}, nil)
s.sealedSnapshot.On("Head").Return(func() *flow.Header {
return s.sealedBlock.Header
}, nil)
s.state.On("Sealed").Return(s.sealedSnapshot, nil)
s.results.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(mocks.StorageMapGetter(s.resultsMap))

// Generate sent transaction with ref block of the current finalized block
transaction := unittest.TransactionFixture()
transaction.SetReferenceBlockID(s.finalizedBlock.ID())
s.transactions.On("ByID", mock.AnythingOfType("flow.Identifier")).Return(&transaction.TransactionBody, nil)

col := flow.CollectionFromTransactions([]*flow.Transaction{&transaction})
guarantee := col.Guarantee()
light := col.Light()
txId := transaction.ID()
txResult := flow.LightTransactionResult{
TransactionID: txId,
Failed: false,
ComputationUsed: 0,
}

eventsForTx := unittest.EventsFixture(1, flow.EventAccountCreated)
eventMessages := make([]*entities.Event, 1)
for j, event := range eventsForTx {
Expand All @@ -288,11 +295,21 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() {
mock.AnythingOfType("flow.Identifier"),
).Return(eventsForTx, nil)

hasTransactionResultInStorage := false
s.transactionResults.On(
"ByBlockIDTransactionID",
mock.AnythingOfType("flow.Identifier"),
mock.AnythingOfType("flow.Identifier"),
).Return(&txResult, nil)
).Return(func(blockID flow.Identifier, transactionID flow.Identifier) (*flow.LightTransactionResult, error) {
if hasTransactionResultInStorage {
return &flow.LightTransactionResult{
TransactionID: txId,
Failed: false,
ComputationUsed: 0,
}, nil
}
return nil, storage.ErrNotFound
}).Twice()

// Create a special common function to read subscription messages from the channel and check converting it to transaction info
// and check results for correctness
Expand Down Expand Up @@ -328,6 +345,8 @@ func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() {
// 3. Add one more finalized block on top of the transaction block and add execution results to storage
finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock))
s.resultsMap[s.finalizedBlock.ID()] = finalizedResult
// init transaction result for storage
hasTransactionResultInStorage = true
s.addNewFinalizedBlock(s.finalizedBlock.Header, true)
checkNewSubscriptionMessage(sub, flow.TransactionStatusExecuted)

Expand Down
12 changes: 9 additions & 3 deletions engine/access/rpc/backend/backend_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,14 +651,14 @@ func (b *backendTransactions) lookupTransactionResult(
var err error
switch b.txResultQueryMode {
case IndexQueryModeExecutionNodesOnly:
txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
case IndexQueryModeLocalOnly:
txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion)
case IndexQueryModeFailover:
txResult, err = b.GetTransactionResultFromStorage(ctx, block, txID, requiredEventEncodingVersion)
if err != nil {
// If any error occurs with local storage - request transaction result from EN
txResult, err = b.getTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
txResult, err = b.GetTransactionResultFromExecutionNode(ctx, block, txID, requiredEventEncodingVersion)
}
default:
return nil, status.Errorf(codes.Internal, "unknown transaction result query mode: %v", b.txResultQueryMode)
Expand Down Expand Up @@ -742,7 +742,13 @@ func (b *backendTransactions) registerTransactionForRetry(tx *flow.TransactionBo
b.retry.RegisterTransaction(referenceBlock.Height, tx)
}

func (b *backendTransactions) getTransactionResultFromExecutionNode(
// GetTransactionResultFromExecutionNode retrieves the result of a specified transaction
// from the execution nodes for a given block.
//
// Error returns:
// - `codes.NotFound` if no execution receipt were found.
// - Returns internal errors if event conversion or status derivation fails.
func (b *backendTransactions) GetTransactionResultFromExecutionNode(
ctx context.Context,
block *flow.Block,
transactionID flow.Identifier,
Expand Down
Loading