diff --git a/access/api.go b/access/api.go index a31f8ca0841..3201796c6ed 100644 --- a/access/api.go +++ b/access/api.go @@ -194,6 +194,10 @@ type API interface { // // If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription. SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription + // SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the + // transaction itself until the block containing the transaction becomes sealed or expired. When the transaction + // status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down. + SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription } // TODO: Combine this with flow.TransactionResult? diff --git a/access/handler.go b/access/handler.go index e7ed10b744e..71e48511aca 100644 --- a/access/handler.go +++ b/access/handler.go @@ -1086,9 +1086,41 @@ func (h *Handler) getSubscriptionDataFromStartBlockID(msgBlockId []byte, msgBloc return startBlockID, blockStatus, nil } -func (h *Handler) SendAndSubscribeTransactionStatuses(_ *access.SendAndSubscribeTransactionStatusesRequest, _ access.AccessAPI_SendAndSubscribeTransactionStatusesServer) error { - // not implemented - return nil +// SendAndSubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the +// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction +// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down. +func (h *Handler) SendAndSubscribeTransactionStatuses( + request *access.SendAndSubscribeTransactionStatusesRequest, + stream access.AccessAPI_SendAndSubscribeTransactionStatusesServer, +) error { + ctx := stream.Context() + + // check if the maximum number of streams is reached + if h.StreamCount.Load() >= h.MaxStreams { + return status.Errorf(codes.ResourceExhausted, "maximum number of streams reached") + } + h.StreamCount.Add(1) + defer h.StreamCount.Add(-1) + + tx, err := convert.MessageToTransaction(request.GetTransaction(), h.chain) + if err != nil { + return status.Error(codes.InvalidArgument, err.Error()) + } + + err = h.api.SendTransaction(ctx, &tx) + if err != nil { + return err + } + + sub := h.api.SubscribeTransactionStatuses(ctx, &tx) + return subscription.HandleSubscription(sub, func(txSubInfo *convert.TransactionSubscribeInfo) error { + err = stream.Send(convert.TransactionSubscribeInfoToMessage(txSubInfo)) + if err != nil { + return rpc.ConvertError(err, "could not send response", codes.Internal) + } + + return nil + }) } func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) { diff --git a/access/mock/api.go b/access/mock/api.go index 972143fee9f..b27e8a03580 100644 --- a/access/mock/api.go +++ b/access/mock/api.go @@ -977,6 +977,22 @@ func (_m *API) SubscribeBlocksFromStartHeight(ctx context.Context, startHeight u return r0 } +// SubscribeTransactionStatuses provides a mock function with given fields: ctx, tx +func (_m *API) SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription { + ret := _m.Called(ctx, tx) + + var r0 subscription.Subscription + if rf, ok := ret.Get(0).(func(context.Context, *flow.TransactionBody) subscription.Subscription); ok { + r0 = rf(ctx, tx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(subscription.Subscription) + } + } + + return r0 +} + type mockConstructorTestingTNewAPI interface { mock.TestingT Cleanup(func()) diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 10eeb1a90a6..209fcff8dac 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -73,6 +73,7 @@ type Backend struct { backendExecutionResults backendNetwork backendSubscribeBlocks + backendSubscribeTransactions state protocol.State chainID flow.ChainID @@ -169,6 +170,15 @@ func New(params Params) (*Backend, error) { // initialize node version info nodeInfo := getNodeVersionInfo(params.State.Params()) + transactionsLocalDataProvider := &TransactionsLocalDataProvider{ + state: params.State, + collections: params.Collections, + blocks: params.Blocks, + eventsIndex: params.EventsIndex, + txResultsIndex: params.TxResultsIndex, + systemTxID: systemTxID, + } + b := &Backend{ state: params.State, BlockTracker: params.BlockTracker, @@ -186,30 +196,23 @@ func New(params Params) (*Backend, error) { scriptExecMode: params.ScriptExecutionMode, }, backendTransactions: backendTransactions{ - TransactionsLocalDataProvider: TransactionsLocalDataProvider{ - state: params.State, - collections: params.Collections, - blocks: params.Blocks, - eventsIndex: params.EventsIndex, - txResultsIndex: params.TxResultsIndex, - systemTxID: systemTxID, - }, - log: params.Log, - staticCollectionRPC: params.CollectionRPC, - chainID: params.ChainID, - transactions: params.Transactions, - executionReceipts: params.ExecutionReceipts, - transactionValidator: configureTransactionValidator(params.State, params.ChainID), - transactionMetrics: params.AccessMetrics, - retry: retry, - connFactory: params.ConnFactory, - previousAccessNodes: params.HistoricalAccessNodes, - nodeCommunicator: params.Communicator, - txResultCache: txResCache, - txErrorMessagesCache: txErrorMessagesCache, - txResultQueryMode: params.TxResultQueryMode, - systemTx: systemTx, - systemTxID: systemTxID, + TransactionsLocalDataProvider: transactionsLocalDataProvider, + log: params.Log, + staticCollectionRPC: params.CollectionRPC, + chainID: params.ChainID, + transactions: params.Transactions, + executionReceipts: params.ExecutionReceipts, + transactionValidator: configureTransactionValidator(params.State, params.ChainID), + transactionMetrics: params.AccessMetrics, + retry: retry, + connFactory: params.ConnFactory, + previousAccessNodes: params.HistoricalAccessNodes, + nodeCommunicator: params.Communicator, + txResultCache: txResCache, + txErrorMessagesCache: txErrorMessagesCache, + txResultQueryMode: params.TxResultQueryMode, + systemTx: systemTx, + systemTxID: systemTxID, }, backendEvents: backendEvents{ log: params.Log, @@ -261,6 +264,16 @@ func New(params Params) (*Backend, error) { sendBufferSize: params.SubscriptionParams.SendBufferSize, blockTracker: params.BlockTracker, }, + backendSubscribeTransactions: backendSubscribeTransactions{ + txLocalDataProvider: transactionsLocalDataProvider, + log: params.Log, + executionResults: params.ExecutionResults, + broadcaster: params.SubscriptionParams.Broadcaster, + sendTimeout: params.SubscriptionParams.SendTimeout, + responseLimit: params.SubscriptionParams.ResponseLimit, + sendBufferSize: params.SubscriptionParams.SendBufferSize, + blockTracker: params.BlockTracker, + }, collections: params.Collections, executionReceipts: params.ExecutionReceipts, connFactory: params.ConnFactory, diff --git a/engine/access/rpc/backend/backend_stream_transactions.go b/engine/access/rpc/backend/backend_stream_transactions.go new file mode 100644 index 00000000000..c18ce5905f8 --- /dev/null +++ b/engine/access/rpc/backend/backend_stream_transactions.go @@ -0,0 +1,195 @@ +package backend + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/state" + + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/module/counters" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/flow-go/engine/common/rpc" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/storage" +) + +// backendSubscribeTransactions handles transaction subscriptions. +type backendSubscribeTransactions struct { + txLocalDataProvider *TransactionsLocalDataProvider + executionResults storage.ExecutionResults + log zerolog.Logger + broadcaster *engine.Broadcaster + sendTimeout time.Duration + responseLimit float64 + sendBufferSize int + + blockTracker subscription.BlockTracker +} + +// TransactionSubscriptionMetadata holds data representing the status state for each transaction subscription. +type TransactionSubscriptionMetadata struct { + txID flow.Identifier + txReferenceBlockID flow.Identifier + messageIndex counters.StrictMonotonousCounter + blockWithTx *flow.Header + blockID flow.Identifier + txExecuted bool + lastTxStatus flow.TransactionStatus +} + +// SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID. +// If invalid tx parameters will be supplied SubscribeTransactionStatuses will return a failed subscription. +func (b *backendSubscribeTransactions) SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription { + nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(tx.ReferenceBlockID) + if err != nil { + return subscription.NewFailedSubscription(err, "could not get start height") + } + + txInfo := TransactionSubscriptionMetadata{ + txID: tx.ID(), + txReferenceBlockID: tx.ReferenceBlockID, + messageIndex: counters.NewMonotonousCounter(0), + blockWithTx: nil, + blockID: flow.ZeroID, + lastTxStatus: flow.TransactionStatusUnknown, + } + + sub := subscription.NewHeightBasedSubscription( + b.sendBufferSize, + nextHeight, + b.getTransactionStatusResponse(&txInfo), + ) + + go subscription.NewStreamer(b.log, b.broadcaster, b.sendTimeout, b.responseLimit, sub).Stream(ctx) + + return sub +} + +// getTransactionStatusResponse returns a callback function that produces transaction status +// subscription responses based on new blocks. +func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *TransactionSubscriptionMetadata) func(context.Context, uint64) (interface{}, error) { + return func(ctx context.Context, height uint64) (interface{}, error) { + highestHeight, err := b.blockTracker.GetHighestHeight(flow.BlockStatusFinalized) + if err != nil { + return nil, fmt.Errorf("could not get highest height for block %d: %w", height, err) + } + + // Fail early if no block finalized notification has been received for the given height. + // Note: It's possible that the block is locally finalized before the notification is + // received. This ensures a consistent view is available to all streams. + if height > highestHeight { + return nil, fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady) + } + + if txInfo.lastTxStatus == flow.TransactionStatusSealed || txInfo.lastTxStatus == flow.TransactionStatusExpired { + return nil, fmt.Errorf("transaction final status %s was already reported: %w", txInfo.lastTxStatus.String(), subscription.ErrEndOfData) + } + + if txInfo.blockWithTx == nil { + // Check if block contains transaction. + txInfo.blockWithTx, txInfo.blockID, err = b.searchForTransactionBlock(height, txInfo) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return nil, fmt.Errorf("could not find block %d in storage: %w", height, subscription.ErrBlockNotReady) + } + + if !errors.Is(err, ErrTransactionNotInBlock) { + return nil, status.Errorf(codes.Internal, "could not get block %d: %v", height, err) + } + } + } + + // Find the transaction status. + var txStatus flow.TransactionStatus + if txInfo.blockWithTx == nil { + txStatus, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID) + } else { + if !txInfo.txExecuted { + // Check if transaction was executed. + txInfo.txExecuted, err = b.searchForExecutionResult(txInfo.blockID) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to get execution result for block %s: %v", txInfo.blockID, err) + } + } + + txStatus, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockID, txInfo.blockWithTx.Height, txInfo.txExecuted) + } + if err != nil { + if !errors.Is(err, state.ErrUnknownSnapshotReference) { + irrecoverable.Throw(ctx, err) + } + return nil, rpc.ConvertStorageError(err) + } + + // The same transaction status should not be reported, so return here with no response + if txInfo.lastTxStatus == txStatus { + return nil, nil + } + txInfo.lastTxStatus = txStatus + + messageIndex := txInfo.messageIndex.Value() + if ok := txInfo.messageIndex.Set(messageIndex + 1); !ok { + return nil, status.Errorf(codes.Internal, "the message index has already been incremented to %d", txInfo.messageIndex.Value()) + } + + return &convert.TransactionSubscribeInfo{ + ID: txInfo.txID, + Status: txInfo.lastTxStatus, + MessageIndex: messageIndex, + }, nil + } +} + +// searchForTransactionBlock searches for the block containing the specified transaction. +// It retrieves the block at the given height and checks if the transaction is included in that block. +// Expected errors: +// - subscription.ErrBlockNotReady when unable to retrieve the block or collection ID +// - codes.Internal when other errors occur during block or collection lookup +func (b *backendSubscribeTransactions) searchForTransactionBlock( + height uint64, + txInfo *TransactionSubscriptionMetadata, +) (*flow.Header, flow.Identifier, error) { + block, err := b.txLocalDataProvider.blocks.ByHeight(height) + if err != nil { + return nil, flow.ZeroID, fmt.Errorf("error looking up block: %w", err) + } + + collectionID, err := b.txLocalDataProvider.LookupCollectionIDInBlock(block, txInfo.txID) + if err != nil { + return nil, flow.ZeroID, fmt.Errorf("error looking up transaction in block: %w", err) + } + + if collectionID != flow.ZeroID { + return block.Header, block.ID(), nil + } + + return nil, flow.ZeroID, nil +} + +// searchForExecutionResult searches for the execution result of a block. It retrieves the execution result for the specified block ID. +// Expected errors: +// - codes.Internal if an internal error occurs while retrieving execution result. +func (b *backendSubscribeTransactions) searchForExecutionResult( + blockID flow.Identifier, +) (bool, error) { + _, err := b.executionResults.ByBlockID(blockID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + return false, nil + } + return false, fmt.Errorf("failed to get execution result for block %s: %w", blockID, err) + } + + return true, nil +} diff --git a/engine/access/rpc/backend/backend_stream_transactions_test.go b/engine/access/rpc/backend/backend_stream_transactions_test.go new file mode 100644 index 00000000000..54097bc0428 --- /dev/null +++ b/engine/access/rpc/backend/backend_stream_transactions_test.go @@ -0,0 +1,366 @@ +package backend + +import ( + "context" + "fmt" + "testing" + "time" + + protocolint "github.com/onflow/flow-go/state/protocol" + + "github.com/onflow/flow-go/engine/access/index" + + "github.com/onflow/flow-go/utils/unittest/mocks" + + syncmock "github.com/onflow/flow-go/module/state_synchronization/mock" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/engine" + access "github.com/onflow/flow-go/engine/access/mock" + backendmock "github.com/onflow/flow-go/engine/access/rpc/backend/mock" + connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock" + "github.com/onflow/flow-go/engine/access/subscription" + subscriptionmock "github.com/onflow/flow-go/engine/access/subscription/mock" + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/counters" + "github.com/onflow/flow-go/module/metrics" + protocol "github.com/onflow/flow-go/state/protocol/mock" + storagemock "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/utils/unittest" +) + +type TransactionStatusSuite struct { + suite.Suite + + state *protocol.State + sealedSnapshot *protocol.Snapshot + finalSnapshot *protocol.Snapshot + tempSnapshot *protocol.Snapshot + log zerolog.Logger + + blocks *storagemock.Blocks + headers *storagemock.Headers + collections *storagemock.Collections + transactions *storagemock.Transactions + receipts *storagemock.ExecutionReceipts + results *storagemock.ExecutionResults + transactionResults *storagemock.LightTransactionResults + events *storagemock.Events + seals *storagemock.Seals + + colClient *access.AccessAPIClient + execClient *access.ExecutionAPIClient + historicalAccessClient *access.AccessAPIClient + archiveClient *access.AccessAPIClient + + connectionFactory *connectionmock.ConnectionFactory + communicator *backendmock.Communicator + blockTracker *subscriptionmock.BlockTracker + reporter *syncmock.IndexReporter + + chainID flow.ChainID + + broadcaster *engine.Broadcaster + rootBlock flow.Block + sealedBlock *flow.Block + finalizedBlock *flow.Block + + blockMap map[uint64]*flow.Block + resultsMap map[flow.Identifier]*flow.ExecutionResult + + backend *Backend +} + +func TestTransactionStatusSuite(t *testing.T) { + suite.Run(t, new(TransactionStatusSuite)) +} + +// SetupTest initializes the test suite with required dependencies. +func (s *TransactionStatusSuite) SetupTest() { + s.log = zerolog.New(zerolog.NewConsoleWriter()) + s.state = protocol.NewState(s.T()) + s.sealedSnapshot = protocol.NewSnapshot(s.T()) + s.finalSnapshot = protocol.NewSnapshot(s.T()) + s.tempSnapshot = &protocol.Snapshot{} + + header := unittest.BlockHeaderFixture() + + params := protocol.NewParams(s.T()) + params.On("SporkID").Return(unittest.IdentifierFixture(), nil) + params.On("ProtocolVersion").Return(uint(unittest.Uint64InRange(10, 30)), nil) + params.On("SporkRootBlockHeight").Return(header.Height, nil) + params.On("SealedRoot").Return(header, nil) + s.state.On("Params").Return(params) + + s.blocks = storagemock.NewBlocks(s.T()) + s.headers = storagemock.NewHeaders(s.T()) + s.transactions = storagemock.NewTransactions(s.T()) + s.collections = storagemock.NewCollections(s.T()) + s.receipts = storagemock.NewExecutionReceipts(s.T()) + s.results = storagemock.NewExecutionResults(s.T()) + s.seals = storagemock.NewSeals(s.T()) + s.colClient = access.NewAccessAPIClient(s.T()) + s.archiveClient = access.NewAccessAPIClient(s.T()) + s.execClient = access.NewExecutionAPIClient(s.T()) + s.transactionResults = storagemock.NewLightTransactionResults(s.T()) + s.events = storagemock.NewEvents(s.T()) + s.chainID = flow.Testnet + s.historicalAccessClient = access.NewAccessAPIClient(s.T()) + s.connectionFactory = connectionmock.NewConnectionFactory(s.T()) + s.communicator = backendmock.NewCommunicator(s.T()) + s.broadcaster = engine.NewBroadcaster() + s.blockTracker = subscriptionmock.NewBlockTracker(s.T()) + s.resultsMap = map[flow.Identifier]*flow.ExecutionResult{} + + // generate blockCount consecutive blocks with associated seal, result and execution data + s.rootBlock = unittest.BlockFixture() + rootResult := unittest.ExecutionResultFixture(unittest.WithBlock(&s.rootBlock)) + s.resultsMap[s.rootBlock.ID()] = rootResult + + s.sealedBlock = &s.rootBlock + s.finalizedBlock = unittest.BlockWithParentFixture(s.sealedBlock.Header) + finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock)) + s.resultsMap[s.finalizedBlock.ID()] = finalizedResult + s.blockMap = map[uint64]*flow.Block{ + s.sealedBlock.Header.Height: s.sealedBlock, + s.finalizedBlock.Header.Height: s.finalizedBlock, + } + + s.reporter = syncmock.NewIndexReporter(s.T()) + + s.blocks.On("ByHeight", mock.AnythingOfType("uint64")).Return(mocks.StorageMapGetter(s.blockMap)) + + s.state.On("Final").Return(s.finalSnapshot, nil) + s.state.On("AtBlockID", mock.AnythingOfType("flow.Identifier")).Return(func(blockID flow.Identifier) protocolint.Snapshot { + s.tempSnapshot.On("Head").Unset() + s.tempSnapshot.On("Head").Return(func() *flow.Header { + for _, block := range s.blockMap { + if block.ID() == blockID { + return block.Header + } + } + + return nil + }, nil) + + return s.tempSnapshot + }, nil) + + s.finalSnapshot.On("Head").Return(func() *flow.Header { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader + }, nil) + + s.blockTracker.On("GetStartHeightFromBlockID", mock.Anything).Return(func(_ flow.Identifier) (uint64, error) { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader.Height, nil + }, nil) + s.blockTracker.On("GetHighestHeight", flow.BlockStatusFinalized).Return(func(_ flow.BlockStatus) (uint64, error) { + finalizedHeader := s.finalizedBlock.Header + return finalizedHeader.Height, nil + }, nil) + + backendParams := s.backendParams() + err := backendParams.TxResultsIndex.Initialize(s.reporter) + require.NoError(s.T(), err) + + s.backend, err = New(backendParams) + require.NoError(s.T(), err) + +} + +// backendParams returns the Params configuration for the backend. +func (s *TransactionStatusSuite) backendParams() Params { + return Params{ + State: s.state, + Blocks: s.blocks, + Headers: s.headers, + Collections: s.collections, + Transactions: s.transactions, + ExecutionReceipts: s.receipts, + ExecutionResults: s.results, + ChainID: s.chainID, + CollectionRPC: s.colClient, + MaxHeightRange: DefaultMaxHeightRange, + SnapshotHistoryLimit: DefaultSnapshotHistoryLimit, + Communicator: NewNodeCommunicator(false), + AccessMetrics: metrics.NewNoopCollector(), + Log: s.log, + TxErrorMessagesCacheSize: 1000, + BlockTracker: s.blockTracker, + SubscriptionParams: SubscriptionParams{ + SendTimeout: subscription.DefaultSendTimeout, + SendBufferSize: subscription.DefaultSendBufferSize, + ResponseLimit: subscription.DefaultResponseLimit, + Broadcaster: s.broadcaster, + }, + TxResultsIndex: index.NewTransactionResultsIndex(s.transactionResults), + EventsIndex: index.NewEventsIndex(s.events), + } +} + +func (s *TransactionStatusSuite) addNewFinalizedBlock(parent *flow.Header, notify bool, options ...func(*flow.Block)) { + s.finalizedBlock = unittest.BlockWithParentFixture(parent) + for _, option := range options { + option(s.finalizedBlock) + } + + s.blockMap[s.finalizedBlock.Header.Height] = s.finalizedBlock + + if notify { + s.broadcaster.Publish() + } +} + +// TestSubscribeTransactionStatusHappyCase tests the functionality of the SubscribeTransactionStatuses method in the Backend. +// It covers the emulation of transaction stages from pending to sealed, and receiving status updates. +func (s *TransactionStatusSuite) TestSubscribeTransactionStatusHappyCase() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.sealedSnapshot.On("Head").Return(func() *flow.Header { + return s.sealedBlock.Header + }, nil) + s.state.On("Sealed").Return(s.sealedSnapshot, nil) + s.results.On("ByBlockID", mock.AnythingOfType("flow.Identifier")).Return(mocks.StorageMapGetter(s.resultsMap)) + + // Generate sent transaction with ref block of the current finalized block + transaction := unittest.TransactionFixture() + transaction.SetReferenceBlockID(s.finalizedBlock.ID()) + col := flow.CollectionFromTransactions([]*flow.Transaction{&transaction}) + guarantee := col.Guarantee() + light := col.Light() + txId := transaction.ID() + + expectedMsgIndexCounter := counters.NewMonotonousCounter(0) + + // Create a special common function to read subscription messages from the channel and check converting it to transaction info + // and check results for correctness + checkNewSubscriptionMessage := func(sub subscription.Subscription, expectedTxStatus flow.TransactionStatus) { + unittest.RequireReturnsBefore(s.T(), func() { + v, ok := <-sub.Channel() + require.True(s.T(), ok, + "channel closed while waiting for transaction info:\n\t- txID %x\n\t- blockID: %x \n\t- err: %v", + txId, s.finalizedBlock.ID(), sub.Err()) + + txInfo, ok := v.(*convert.TransactionSubscribeInfo) + require.True(s.T(), ok, "unexpected response type: %T", v) + + assert.Equal(s.T(), txId, txInfo.ID) + assert.Equal(s.T(), expectedTxStatus, txInfo.Status) + + expectedMsgIndex := expectedMsgIndexCounter.Value() + assert.Equal(s.T(), expectedMsgIndex, txInfo.MessageIndex) + wasSet := expectedMsgIndexCounter.Set(expectedMsgIndex + 1) + require.True(s.T(), wasSet) + }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) + } + + // 1. Subscribe to transaction status and receive the first message with pending status + sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody) + checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) + + // 2. Make transaction reference block sealed, and add a new finalized block that includes the transaction + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true, func(block *flow.Block) { + block.SetPayload(unittest.PayloadFixture(unittest.WithGuarantees(&guarantee))) + s.collections.On("LightByID", mock.AnythingOfType("flow.Identifier")).Return(&light, nil).Maybe() + }) + checkNewSubscriptionMessage(sub, flow.TransactionStatusFinalized) + + // 3. Add one more finalized block on top of the transaction block and add execution results to storage + finalizedResult := unittest.ExecutionResultFixture(unittest.WithBlock(s.finalizedBlock)) + s.resultsMap[s.finalizedBlock.ID()] = finalizedResult + + s.addNewFinalizedBlock(s.finalizedBlock.Header, true) + checkNewSubscriptionMessage(sub, flow.TransactionStatusExecuted) + + // 4. Make the transaction block sealed, and add a new finalized block + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + checkNewSubscriptionMessage(sub, flow.TransactionStatusSealed) + + //// 5. Stop subscription + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + + // Ensure subscription shuts down gracefully + unittest.RequireReturnsBefore(s.T(), func() { + v, ok := <-sub.Channel() + assert.Nil(s.T(), v) + assert.False(s.T(), ok) + assert.NoError(s.T(), sub.Err()) + }, 100*time.Millisecond, "timed out waiting for subscription to shutdown") +} + +// TestSubscribeTransactionStatusExpired tests the functionality of the SubscribeTransactionStatuses method in the Backend +// when transaction become expired +func (s *TransactionStatusSuite) TestSubscribeTransactionStatusExpired() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.blocks.On("GetLastFullBlockHeight").Return(func() (uint64, error) { + return s.sealedBlock.Header.Height, nil + }, nil) + + // Generate sent transaction with ref block of the current finalized block + transaction := unittest.TransactionFixture() + transaction.SetReferenceBlockID(s.finalizedBlock.ID()) + txId := transaction.ID() + + expectedMsgIndexCounter := counters.NewMonotonousCounter(0) + + // Create a special common function to read subscription messages from the channel and check converting it to transaction info + // and check results for correctness + checkNewSubscriptionMessage := func(sub subscription.Subscription, expectedTxStatus flow.TransactionStatus) { + unittest.RequireReturnsBefore(s.T(), func() { + v, ok := <-sub.Channel() + require.True(s.T(), ok, + "channel closed while waiting for transaction info:\n\t- txID %x\n\t- blockID: %x \n\t- err: %v", + txId, s.finalizedBlock.ID(), sub.Err()) + + txInfo, ok := v.(*convert.TransactionSubscribeInfo) + require.True(s.T(), ok, "unexpected response type: %T", v) + + assert.Equal(s.T(), txId, txInfo.ID) + assert.Equal(s.T(), expectedTxStatus, txInfo.Status) + + expectedMsgIndex := expectedMsgIndexCounter.Value() + assert.Equal(s.T(), expectedMsgIndex, txInfo.MessageIndex) + wasSet := expectedMsgIndexCounter.Set(expectedMsgIndex + 1) + require.True(s.T(), wasSet) + }, time.Second, fmt.Sprintf("timed out waiting for transaction info:\n\t- txID: %x\n\t- blockID: %x", txId, s.finalizedBlock.ID())) + } + + // Subscribe to transaction status and receive the first message with pending status + sub := s.backend.SubscribeTransactionStatuses(ctx, &transaction.TransactionBody) + checkNewSubscriptionMessage(sub, flow.TransactionStatusPending) + + // Generate 600 blocks without transaction included and check, that transaction still pending + startHeight := s.finalizedBlock.Header.Height + 1 + lastHeight := startHeight + flow.DefaultTransactionExpiry + + for i := startHeight; i <= lastHeight; i++ { + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, false) + } + + // Generate final blocks and check transaction expired + s.sealedBlock = s.finalizedBlock + s.addNewFinalizedBlock(s.sealedBlock.Header, true) + checkNewSubscriptionMessage(sub, flow.TransactionStatusExpired) + + // Ensure subscription shuts down gracefully + unittest.RequireReturnsBefore(s.T(), func() { + v, ok := <-sub.Channel() + assert.Nil(s.T(), v) + assert.False(s.T(), ok) + assert.NoError(s.T(), sub.Err()) + }, 100*time.Millisecond, "timed out waiting for subscription to shutdown") +} diff --git a/engine/access/rpc/backend/backend_transactions.go b/engine/access/rpc/backend/backend_transactions.go index 2e25888734f..831a5efa404 100644 --- a/engine/access/rpc/backend/backend_transactions.go +++ b/engine/access/rpc/backend/backend_transactions.go @@ -26,7 +26,7 @@ import ( ) type backendTransactions struct { - TransactionsLocalDataProvider + *TransactionsLocalDataProvider staticCollectionRPC accessproto.AccessAPIClient // rpc client tied to a fixed collection node transactions storage.Transactions executionReceipts storage.ExecutionReceipts @@ -285,7 +285,7 @@ func (b *backendTransactions) GetTransactionResult( } // an additional check to ensure the correctness of the collection ID. - expectedCollectionID, err := b.lookupCollectionIDInBlock(block, txID) + expectedCollectionID, err := b.LookupCollectionIDInBlock(block, txID) if err != nil { // if the collection has not been indexed yet, the lookup will return a not found error. // if the request included a blockID or collectionID in its the search criteria, not found @@ -311,9 +311,9 @@ func (b *backendTransactions) GetTransactionResult( var txStatus flow.TransactionStatus // Derive the status of the transaction. if block == nil { - txStatus, err = b.deriveUnknownTransactionStatus(tx.ReferenceBlockID) + txStatus, err = b.DeriveUnknownTransactionStatus(tx.ReferenceBlockID) } else { - txStatus, err = b.deriveTransactionStatus(blockID, blockHeight, false) + txStatus, err = b.DeriveTransactionStatus(blockID, blockHeight, false) } if err != nil { @@ -439,7 +439,7 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromExecutionNode( txResult := resp.TransactionResults[i] // tx body is irrelevant to status if it's in an executed block - txStatus, err := b.deriveTransactionStatus(blockID, block.Header.Height, true) + txStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -488,7 +488,7 @@ func (b *backendTransactions) getTransactionResultsByBlockIDFromExecutionNode( } systemTxResult := resp.TransactionResults[len(resp.TransactionResults)-1] - systemTxStatus, err := b.deriveTransactionStatus(blockID, block.Header.Height, true) + systemTxStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -573,7 +573,7 @@ func (b *backendTransactions) getTransactionResultByIndexFromExecutionNode( } // tx body is irrelevant to status if it's in an executed block - txStatus, err := b.deriveTransactionStatus(blockID, block.Header.Height, true) + txStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -626,7 +626,7 @@ func (b *backendTransactions) GetSystemTransactionResult(ctx context.Context, bl } systemTxResult := resp.TransactionResults[len(resp.TransactionResults)-1] - systemTxStatus, err := b.deriveTransactionStatus(blockID, block.Header.Height, true) + systemTxStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { return nil, rpc.ConvertStorageError(err) } @@ -793,7 +793,7 @@ func (b *backendTransactions) getTransactionResultFromExecutionNode( } // tx body is irrelevant to status if it's in an executed block - txStatus, err := b.deriveTransactionStatus(blockID, block.Header.Height, true) + txStatus, err := b.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) diff --git a/engine/access/rpc/backend/retry.go b/engine/access/rpc/backend/retry.go index 01cf00ea423..27697319a14 100644 --- a/engine/access/rpc/backend/retry.go +++ b/engine/access/rpc/backend/retry.go @@ -123,9 +123,9 @@ func (r *Retry) retryTxsAtHeight(heightToRetry uint64) error { // find the transaction status var status flow.TransactionStatus if block == nil { - status, err = r.backend.deriveUnknownTransactionStatus(tx.ReferenceBlockID) + status, err = r.backend.DeriveUnknownTransactionStatus(tx.ReferenceBlockID) } else { - status, err = r.backend.deriveTransactionStatus(block.ID(), block.Header.Height, false) + status, err = r.backend.DeriveTransactionStatus(block.ID(), block.Header.Height, false) } if err != nil { diff --git a/engine/access/rpc/backend/transactions_local_data_provider.go b/engine/access/rpc/backend/transactions_local_data_provider.go index bca27cfa400..652b5da87e3 100644 --- a/engine/access/rpc/backend/transactions_local_data_provider.go +++ b/engine/access/rpc/backend/transactions_local_data_provider.go @@ -21,6 +21,9 @@ import ( "github.com/onflow/flow-go/storage" ) +// ErrTransactionNotInBlock represents an error indicating that the transaction is not found in the block. +var ErrTransactionNotInBlock = errors.New("transaction not in block") + // TransactionErrorMessage declares the lookup transaction error methods by different input parameters. type TransactionErrorMessage interface { // LookupErrorMessageByTransactionID is a function type for getting transaction error message by block ID and transaction ID. @@ -91,7 +94,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultFromStorage( txStatusCode = 1 // statusCode of 1 indicates an error and 0 indicates no error, the same as on EN } - txStatus, err := t.deriveTransactionStatus(blockID, block.Header.Height, true) + txStatus, err := t.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -179,7 +182,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultsByBlockIDFromStorag txStatusCode = 1 } - txStatus, err := t.deriveTransactionStatus(blockID, block.Header.Height, true) + txStatus, err := t.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -260,7 +263,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultByIndexFromStorage( txStatusCode = 1 // statusCode of 1 indicates an error and 0 indicates no error, the same as on EN } - txStatus, err := t.deriveTransactionStatus(blockID, block.Header.Height, true) + txStatus, err := t.DeriveTransactionStatus(blockID, block.Header.Height, true) if err != nil { if !errors.Is(err, state.ErrUnknownSnapshotReference) { irrecoverable.Throw(ctx, err) @@ -285,7 +288,7 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultByIndexFromStorage( } } - collectionID, err := t.lookupCollectionIDInBlock(block, txResult.TransactionID) + collectionID, err := t.LookupCollectionIDInBlock(block, txResult.TransactionID) if err != nil { return nil, err } @@ -302,9 +305,9 @@ func (t *TransactionsLocalDataProvider) GetTransactionResultByIndexFromStorage( }, nil } -// deriveUnknownTransactionStatus is used to determine the status of transaction +// DeriveUnknownTransactionStatus is used to determine the status of transaction // that are not in a block yet based on the provided reference block ID. -func (t *TransactionsLocalDataProvider) deriveUnknownTransactionStatus(refBlockID flow.Identifier) (flow.TransactionStatus, error) { +func (t *TransactionsLocalDataProvider) DeriveUnknownTransactionStatus(refBlockID flow.Identifier) (flow.TransactionStatus, error) { referenceBlock, err := t.state.AtBlockID(refBlockID).Head() if err != nil { return flow.TransactionStatusUnknown, err @@ -347,9 +350,9 @@ func (t *TransactionsLocalDataProvider) deriveUnknownTransactionStatus(refBlockI return flow.TransactionStatusPending, nil } -// deriveTransactionStatus is used to determine the status of a transaction based on the provided block ID, block height, and execution status. +// DeriveTransactionStatus is used to determine the status of a transaction based on the provided block ID, block height, and execution status. // No errors expected during normal operations. -func (t *TransactionsLocalDataProvider) deriveTransactionStatus(blockID flow.Identifier, blockHeight uint64, executed bool) (flow.TransactionStatus, error) { +func (t *TransactionsLocalDataProvider) DeriveTransactionStatus(blockID flow.Identifier, blockHeight uint64, executed bool) (flow.TransactionStatus, error) { if !executed { // If we've gotten here, but the block has not yet been executed, report it as only been finalized return flow.TransactionStatusFinalized, nil @@ -381,16 +384,16 @@ func isExpired(refHeight, compareToHeight uint64) bool { return compareToHeight-refHeight > flow.DefaultTransactionExpiry } -// lookupCollectionIDInBlock returns the collection ID based on the transaction ID. The lookup is performed in block +// LookupCollectionIDInBlock returns the collection ID based on the transaction ID. The lookup is performed in block // collections. -func (t *TransactionsLocalDataProvider) lookupCollectionIDInBlock( +func (t *TransactionsLocalDataProvider) LookupCollectionIDInBlock( block *flow.Block, txID flow.Identifier, ) (flow.Identifier, error) { for _, guarantee := range block.Payload.Guarantees { collection, err := t.collections.LightByID(guarantee.ID()) if err != nil { - return flow.ZeroID, err + return flow.ZeroID, fmt.Errorf("failed to get collection %s in indexed block: %w", guarantee.ID(), err) } for _, collectionTxID := range collection.Transactions { @@ -399,7 +402,7 @@ func (t *TransactionsLocalDataProvider) lookupCollectionIDInBlock( } } } - return flow.ZeroID, status.Error(codes.NotFound, "transaction not found in block") + return flow.ZeroID, ErrTransactionNotInBlock } // buildTxIDToCollectionIDMapping returns a map of transaction ID to collection ID based on the provided block. diff --git a/engine/access/subscription/streamer.go b/engine/access/subscription/streamer.go index 84fca744fc4..11531387200 100644 --- a/engine/access/subscription/streamer.go +++ b/engine/access/subscription/streamer.go @@ -14,6 +14,12 @@ import ( "github.com/onflow/flow-go/storage" ) +// ErrBlockNotReady represents an error indicating that a block is not yet available or ready. +var ErrBlockNotReady = errors.New("block not ready") + +// ErrEndOfData represents an error indicating that no more data available for streaming. +var ErrEndOfData = errors.New("end of data") + // Streamer represents a streaming subscription that delivers data to clients. type Streamer struct { log zerolog.Logger @@ -71,6 +77,12 @@ func (s *Streamer) Stream(ctx context.Context) { err := s.sendAllAvailable(ctx) if err != nil { + //TODO: The functionality to graceful shutdown on demand should be improved with https://github.com/onflow/flow-go/issues/5561 + if errors.Is(err, ErrEndOfData) { + s.sub.Close() + return + } + s.log.Err(err).Msg("error sending response") s.sub.Fail(err) return @@ -88,8 +100,15 @@ func (s *Streamer) sendAllAvailable(ctx context.Context) error { response, err := s.sub.Next(ctx) + if response == nil && err == nil { + continue + } + if err != nil { - if errors.Is(err, storage.ErrNotFound) || errors.Is(err, storage.ErrHeightNotIndexed) || execution_data.IsBlobNotFoundError(err) { + if errors.Is(err, storage.ErrNotFound) || + errors.Is(err, storage.ErrHeightNotIndexed) || + execution_data.IsBlobNotFoundError(err) || + errors.Is(err, ErrBlockNotReady) { // no more available return nil } diff --git a/engine/common/rpc/convert/transactions.go b/engine/common/rpc/convert/transactions.go index 221f41b0936..6b92f419fdd 100644 --- a/engine/common/rpc/convert/transactions.go +++ b/engine/common/rpc/convert/transactions.go @@ -1,11 +1,29 @@ package convert import ( + "github.com/onflow/flow/protobuf/go/flow/access" "github.com/onflow/flow/protobuf/go/flow/entities" "github.com/onflow/flow-go/model/flow" ) +// TransactionSubscribeInfo represents information about a subscribed transaction. +// It contains the ID of the transaction, its status, and the index of the associated message. +type TransactionSubscribeInfo struct { + ID flow.Identifier + Status flow.TransactionStatus + MessageIndex uint64 +} + +// TransactionSubscribeInfoToMessage converts a TransactionSubscribeInfo struct to a protobuf message +func TransactionSubscribeInfoToMessage(data *TransactionSubscribeInfo) *access.SendAndSubscribeTransactionStatusesResponse { + return &access.SendAndSubscribeTransactionStatusesResponse{ + Id: data.ID[:], + Status: entities.TransactionStatus(data.Status), + MessageIndex: data.MessageIndex, + } +} + // TransactionToMessage converts a flow.TransactionBody to a protobuf message func TransactionToMessage(tb flow.TransactionBody) *entities.Transaction { proposalKeyMessage := &entities.Transaction_ProposalKey{ diff --git a/insecure/go.mod b/insecure/go.mod index b7542dcf446..820b6af4403 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -208,7 +208,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1 // indirect github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1 // indirect github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13 // indirect - github.com/onflow/flow-go-sdk v0.44.0 // indirect + github.com/onflow/flow-go-sdk v0.46.0 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.1.0 // indirect github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240305102946-3efec6679252 // indirect github.com/onflow/sdks v0.5.0 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index d6c0d7ce470..6be929b3961 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -1327,8 +1327,8 @@ github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1/go.mod h1:c09d6sN github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13 h1:B4ll7e3j+MqTJv2122Enq3RtDNzmIGRu9xjV7fo7un0= github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13/go.mod h1:kTMFIySzEJJeupk+7EmXs0EJ6CBWY/MV9fv9iYQk+RU= github.com/onflow/flow-go-sdk v0.24.0/go.mod h1:IoptMLPyFXWvyd9yYA6/4EmSeeozl6nJoIv4FaEMg74= -github.com/onflow/flow-go-sdk v0.44.0 h1:gVRLcZ6LUNs/5mzHDx0mp4mEnBAWD62O51P4/nYm4rE= -github.com/onflow/flow-go-sdk v0.44.0/go.mod h1:mm1Fi2hiMrexNMwRzTrAN2zwTvlP8iQ5CF2JSAgJR8U= +github.com/onflow/flow-go-sdk v0.46.0 h1:mrIQziCDe6Oi5HH/aPFvYluh1XUwO6lYpoXLWrBZc2s= +github.com/onflow/flow-go-sdk v0.46.0/go.mod h1:azVWF0yHI8wT1erF0vuYGqQZybl6Frbc+0Zu3rIPeHc= github.com/onflow/flow-go/crypto v0.21.3/go.mod h1:vI6V4CY3R6c4JKBxdcRiR/AnjBfL8OSD97bJc60cLuQ= github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= diff --git a/integration/go.mod b/integration/go.mod index 439d4aaceed..d5ef290726b 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -25,8 +25,8 @@ require ( github.com/onflow/crypto v0.25.0 github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1 github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1 - github.com/onflow/flow-emulator v0.58.1-0.20240313174529-1d05170401b6 - github.com/onflow/flow-go v0.33.2-0.20240306234901-64ab8d27ea30 + github.com/onflow/flow-emulator v0.61.2-0.20240321141132-30ce90652b49 + github.com/onflow/flow-go v0.33.2-0.20240313182108-0bb799709506 github.com/onflow/flow-go-sdk v0.46.0 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 github.com/onflow/flow/protobuf/go/flow v0.3.7-0.20240305102946-3efec6679252 diff --git a/integration/go.sum b/integration/go.sum index 1433f1af10b..5bb55d735bb 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -1414,8 +1414,8 @@ github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1 h1:xF5wHug6H8vKfz github.com/onflow/flow-core-contracts/lib/go/contracts v0.15.1/go.mod h1:WHp24VkUQfcfZi0XjI1uRVRt5alM5SHVkwOil1U2Tpc= github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1 h1:EjWjbyVEA+bMxXbM44dE6MsYeqOu5a9q/EwSWa4ma2M= github.com/onflow/flow-core-contracts/lib/go/templates v0.15.1/go.mod h1:c09d6sNyF/j5/pAynK7sNPb1XKqJqk1rxZPEqEL+dUo= -github.com/onflow/flow-emulator v0.58.1-0.20240313174529-1d05170401b6 h1:07q2fysEezb3QGpRATEQZPG8Gz6exxlIe7l/yLagrn4= -github.com/onflow/flow-emulator v0.58.1-0.20240313174529-1d05170401b6/go.mod h1:kQcINHh4JJs3LG5OsMJ9gSfTkEMZnH41WwnWpz6NG4E= +github.com/onflow/flow-emulator v0.61.2-0.20240321141132-30ce90652b49 h1:ng8b/dO4GoALEYI7Tdxwi/Fw4UqdytisX44hDpq4BVA= +github.com/onflow/flow-emulator v0.61.2-0.20240321141132-30ce90652b49/go.mod h1:/BER6dzeorFJP8/y6J1680kIxR+u/EFr8ZxJO7tBkwk= github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13 h1:B4ll7e3j+MqTJv2122Enq3RtDNzmIGRu9xjV7fo7un0= github.com/onflow/flow-ft/lib/go/contracts v0.7.1-0.20230711213910-baad011d2b13/go.mod h1:kTMFIySzEJJeupk+7EmXs0EJ6CBWY/MV9fv9iYQk+RU= github.com/onflow/flow-go-sdk v0.24.0/go.mod h1:IoptMLPyFXWvyd9yYA6/4EmSeeozl6nJoIv4FaEMg74= diff --git a/integration/tests/access/cohort1/access_api_test.go b/integration/tests/access/cohort1/access_api_test.go index e423ad3678c..1cbf5b191c4 100644 --- a/integration/tests/access/cohort1/access_api_test.go +++ b/integration/tests/access/cohort1/access_api_test.go @@ -2,9 +2,18 @@ package cohort1 import ( "context" + "io" "testing" "time" + "github.com/onflow/flow-go-sdk/templates" + "github.com/onflow/flow-go-sdk/test" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/entities" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "github.com/onflow/flow-go/integration/tests/mvp" "github.com/rs/zerolog" @@ -191,6 +200,116 @@ func (s *AccessAPISuite) TestMVPScriptExecutionLocalStorage() { mvp.RunMVPTest(s.T(), s.ctx, s.net, s.accessNode2) } +// TestSendAndSubscribeTransactionStatuses tests the functionality of sending and subscribing to transaction statuses. +// +// This test verifies that a transaction can be created, signed, sent to the access API, and then the status of the transaction +// can be subscribed to. It performs the following steps: +// 1. Establishes a connection to the access API. +// 2. Creates a new account key and prepares a transaction for account creation. +// 3. Signs the transaction. +// 4. Sends and subscribes to the transaction status using the access API. +// 5. Verifies the received transaction statuses, ensuring they are received in order and the final status is "SEALED". +func (s *AccessAPISuite) TestSendAndSubscribeTransactionStatuses() { + accessNodeContainer := s.net.ContainerByName(testnet.PrimaryAN) + + // Establish a gRPC connection to the access API + conn, err := grpc.Dial(accessNodeContainer.Addr(testnet.GRPCPort), grpc.WithTransportCredentials(insecure.NewCredentials())) + s.Require().NoError(err) + s.Require().NotNil(conn) + + // Create a client for the access API + accessClient := accessproto.NewAccessAPIClient(conn) + serviceClient, err := accessNodeContainer.TestnetClient() + s.Require().NoError(err) + s.Require().NotNil(serviceClient) + + // Get the latest block ID + latestBlockID, err := serviceClient.GetLatestBlockID(s.ctx) + s.Require().NoError(err) + + // Generate a new account transaction + accountKey := test.AccountKeyGenerator().New() + payer := serviceClient.SDKServiceAddress() + + tx, err := templates.CreateAccount([]*sdk.AccountKey{accountKey}, nil, payer) + s.Require().NoError(err) + tx.SetComputeLimit(1000). + SetReferenceBlockID(sdk.HexToID(latestBlockID.String())). + SetProposalKey(payer, 0, serviceClient.GetSeqNumber()). + SetPayer(payer) + + tx, err = serviceClient.SignTransaction(tx) + s.Require().NoError(err) + + // Convert the transaction to a message format expected by the access API + authorizers := make([][]byte, len(tx.Authorizers)) + for i, auth := range tx.Authorizers { + authorizers[i] = auth.Bytes() + } + + convertToMessageSig := func(sigs []sdk.TransactionSignature) []*entities.Transaction_Signature { + msgSigs := make([]*entities.Transaction_Signature, len(sigs)) + for i, sig := range sigs { + msgSigs[i] = &entities.Transaction_Signature{ + Address: sig.Address.Bytes(), + KeyId: uint32(sig.KeyIndex), + Signature: sig.Signature, + } + } + + return msgSigs + } + + transactionMsg := &entities.Transaction{ + Script: tx.Script, + Arguments: tx.Arguments, + ReferenceBlockId: tx.ReferenceBlockID.Bytes(), + GasLimit: tx.GasLimit, + ProposalKey: &entities.Transaction_ProposalKey{ + Address: tx.ProposalKey.Address.Bytes(), + KeyId: uint32(tx.ProposalKey.KeyIndex), + SequenceNumber: tx.ProposalKey.SequenceNumber, + }, + Payer: tx.Payer.Bytes(), + Authorizers: authorizers, + PayloadSignatures: convertToMessageSig(tx.PayloadSignatures), + EnvelopeSignatures: convertToMessageSig(tx.EnvelopeSignatures), + } + + // Send and subscribe to the transaction status using the access API + subClient, err := accessClient.SendAndSubscribeTransactionStatuses(s.ctx, &accessproto.SendAndSubscribeTransactionStatusesRequest{ + Transaction: transactionMsg, + }) + s.Require().NoError(err) + + expectedCounter := uint64(0) + var finalTxStatus entities.TransactionStatus + var txID sdk.Identifier + + for { + resp, err := subClient.Recv() + if err != nil { + if err == io.EOF { + break + } + + s.Require().NoError(err) + } + + if txID == sdk.EmptyID { + txID = sdk.Identifier(resp.GetId()) + } + + s.Assert().Equal(expectedCounter, resp.GetMessageIndex()) + s.Assert().Equal(txID, sdk.Identifier(resp.GetId())) + + expectedCounter++ + finalTxStatus = resp.Status + } + + s.Assert().Equal(entities.TransactionStatus_SEALED, finalTxStatus) +} + func (s *AccessAPISuite) testGetAccount(client *client.Client) { header, err := client.GetLatestBlockHeader(s.ctx, true) s.Require().NoError(err)