Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] SendAndSubscribeTransactionStatuses endpoint implementation for Access Streaming API #5310

Merged
Show file tree
Hide file tree
Changes from 54 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
b9c1537
Implemented transaction status streaming
Guitarheroua Jan 15, 2024
56b9f5b
Fixed backend and tests
Guitarheroua Jan 29, 2024
117b881
Fixed tests
Guitarheroua Jan 29, 2024
62c09c0
Fixed comments
Guitarheroua Jan 29, 2024
08e3de0
Merge remote-tracking branch 'UlyanaAndrukhiv/UlyanaAndrukhiv/subscri…
Guitarheroua Jan 29, 2024
87f04d5
Fixed issue with multiple subscribtion.
Guitarheroua Jan 31, 2024
b9bf00c
Merge remote-tracking branch 'UlyanaAndrukhiv/UlyanaAndrukhiv/subscri…
Guitarheroua Jan 31, 2024
2594bf5
Merge remote-tracking branch 'UlyanaAndrukhiv/UlyanaAndrukhiv/subscri…
Guitarheroua Feb 12, 2024
fd4bf03
Merge remote-tracking branch 'UlyanaAndrukhiv/UlyanaAndrukhiv/subscri…
Guitarheroua Feb 16, 2024
51ef06a
Merge remote-tracking branch 'UlyanaAndrukhiv/UlyanaAndrukhiv/subscri…
Guitarheroua Feb 16, 2024
5ffeae7
Make changes for Chain State tracker
Guitarheroua Feb 19, 2024
59817d4
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
Guitarheroua Feb 19, 2024
9cc36df
Linted
Guitarheroua Feb 19, 2024
c402ec2
Merge branch 'guitarheroua/send-and-subscribe-transaction_statuses' o…
Guitarheroua Feb 19, 2024
78e2ad1
Merge branch 'master' of github.com:Guitarheroua/flow-go into guitarh…
Guitarheroua Feb 27, 2024
a77f107
Added godoc
Guitarheroua Feb 28, 2024
48db475
Merge remote-tracking branch 'UlyanaAndrukhiv/UlyanaAndrukhiv/subscri…
Guitarheroua Feb 28, 2024
8defed0
Added comments
Guitarheroua Feb 28, 2024
087ba90
Fixed conflicts
Guitarheroua Feb 28, 2024
ea30dd4
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
franklywatson Feb 28, 2024
6bbba07
Fixed subscribtion and tests.
Guitarheroua Feb 29, 2024
1579d57
Merge branch 'guitarheroua/send-and-subscribe-transaction_statuses' o…
Guitarheroua Feb 29, 2024
d625f4a
Merge branch 'master' of github.com:Guitarheroua/flow-go into guitarh…
Guitarheroua Feb 29, 2024
67b9ff2
Fixed issue with finalized tx status
Guitarheroua Mar 1, 2024
38b9779
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
franklywatson Mar 1, 2024
41c0446
Fixed remarks
Guitarheroua Mar 1, 2024
bbffca9
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
franklywatson Mar 5, 2024
d6ac926
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
Guitarheroua Mar 12, 2024
557029b
Fixed remarks
Guitarheroua Mar 12, 2024
96d0fe8
Renamed SubscribeTransactionStatuses
Guitarheroua Mar 12, 2024
8a1081d
Fixed exe results
Guitarheroua Mar 12, 2024
890ae5b
fixed remarks
Guitarheroua Mar 13, 2024
eb16e71
Merge remote-tracking branch 'UlyanaAndrukhiv/UlyanaAndrukhiv/subscri…
Guitarheroua Mar 13, 2024
c74a3b6
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
peterargue Mar 13, 2024
27fadb9
Updated flow-emulator
Guitarheroua Mar 14, 2024
33a9f8e
Apply suggestions from code review
Guitarheroua Mar 14, 2024
44ea43f
Revert unnecessary changes
Guitarheroua Mar 14, 2024
61ad359
Merge branch 'guitarheroua/send-and-subscribe-transaction_statuses' o…
Guitarheroua Mar 14, 2024
86b74e4
Fixed remarks
Guitarheroua Mar 14, 2024
3f231cb
Fixed test
Guitarheroua Mar 14, 2024
37014a7
Fixed remarks
Guitarheroua Mar 15, 2024
638d451
Apply suggestions from code review
Guitarheroua Mar 15, 2024
cd6b28c
Fixed issue with leak on sub cancel
Guitarheroua Mar 15, 2024
87aab74
Add integration test
Guitarheroua Mar 15, 2024
78a4097
Fixed integration test
Guitarheroua Mar 15, 2024
d600685
Linted
Guitarheroua Mar 15, 2024
85be1bd
Fixed logs
Guitarheroua Mar 15, 2024
d80b1b5
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
Guitarheroua Mar 15, 2024
3679c76
Fixed incorrect tx subscribtion behaviour
Guitarheroua Mar 18, 2024
16c9318
Fixed remarks
Guitarheroua Mar 19, 2024
a321f35
Removed comments
Guitarheroua Mar 19, 2024
bcca0aa
Fixed integration test
Guitarheroua Mar 19, 2024
9d6484c
Removed sentinel error for result not available
Guitarheroua Mar 19, 2024
471fa6c
Added graceful close. Fixed integration test
Guitarheroua Mar 19, 2024
81465f2
Change to ctx without cancel
Guitarheroua Mar 21, 2024
a2eb6f7
Merge branch 'master' into guitarheroua/send-and-subscribe-transactio…
Guitarheroua Mar 21, 2024
6c092e1
Update access/handler.go
Guitarheroua Mar 21, 2024
c48a862
Updated flow-emulator version
Guitarheroua Mar 21, 2024
49a6c68
Updated flow-emulator version
Guitarheroua Mar 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions access/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ type API interface {
//
// If invalid parameters will be supplied SubscribeBlockDigestsFromLatest will return a failed subscription.
SubscribeBlockDigestsFromLatest(ctx context.Context, blockStatus flow.BlockStatus) subscription.Subscription
// SubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the
// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction
// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down.
SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription
}

// TODO: Combine this with flow.TransactionResult?
Expand Down
40 changes: 37 additions & 3 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,9 +1086,43 @@ func (h *Handler) getSubscriptionDataFromStartBlockID(msgBlockId []byte, msgBloc
return startBlockID, blockStatus, nil
}

func (h *Handler) SendAndSubscribeTransactionStatuses(_ *access.SendAndSubscribeTransactionStatusesRequest, _ access.AccessAPI_SendAndSubscribeTransactionStatusesServer) error {
// not implemented
return nil
// SendAndSubscribeTransactionStatuses streams transaction statuses starting from the reference block saved in the
// transaction itself until the block containing the transaction becomes sealed or expired. When the transaction
// status becomes TransactionStatusSealed or TransactionStatusExpired, the subscription will automatically shut down.
func (h *Handler) SendAndSubscribeTransactionStatuses(
request *access.SendAndSubscribeTransactionStatusesRequest,
stream access.AccessAPI_SendAndSubscribeTransactionStatusesServer,
) error {
subCtx, cancel := context.WithCancel(stream.Context())
defer cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we're not using cancel anymore, can you remove it

Suggested change
subCtx, cancel := context.WithCancel(stream.Context())
defer cancel()
ctx := stream.Context()


// check if the maximum number of streams is reached
if h.StreamCount.Load() >= h.MaxStreams {
return status.Errorf(codes.ResourceExhausted, "maximum number of streams reached")
}
h.StreamCount.Add(1)
defer h.StreamCount.Add(-1)

tx, err := convert.MessageToTransaction(request.GetTransaction(), h.chain)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

err = h.api.SendTransaction(subCtx, &tx)
if err != nil {
return err
}

sub := h.api.SubscribeTransactionStatuses(subCtx, &tx)

Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
return subscription.HandleSubscription(sub, func(txSubInfo *convert.TransactionSubscribeInfo) error {
err = stream.Send(convert.TransactionSubscribeInfoToMessage(txSubInfo))
if err != nil {
return rpc.ConvertError(err, "could not send response", codes.Internal)
}

return nil
})
}

func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) {
Expand Down
16 changes: 16 additions & 0 deletions access/mock/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 37 additions & 24 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Backend struct {
backendExecutionResults
backendNetwork
backendSubscribeBlocks
backendSubscribeTransactions

state protocol.State
chainID flow.ChainID
Expand Down Expand Up @@ -169,6 +170,15 @@ func New(params Params) (*Backend, error) {
// initialize node version info
nodeInfo := getNodeVersionInfo(params.State.Params())

transactionsLocalDataProvider := &TransactionsLocalDataProvider{
state: params.State,
collections: params.Collections,
blocks: params.Blocks,
eventsIndex: params.EventsIndex,
txResultsIndex: params.TxResultsIndex,
systemTxID: systemTxID,
}

b := &Backend{
state: params.State,
BlockTracker: params.BlockTracker,
Expand All @@ -186,30 +196,23 @@ func New(params Params) (*Backend, error) {
scriptExecMode: params.ScriptExecutionMode,
},
backendTransactions: backendTransactions{
TransactionsLocalDataProvider: TransactionsLocalDataProvider{
state: params.State,
collections: params.Collections,
blocks: params.Blocks,
eventsIndex: params.EventsIndex,
txResultsIndex: params.TxResultsIndex,
systemTxID: systemTxID,
},
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
chainID: params.ChainID,
transactions: params.Transactions,
executionReceipts: params.ExecutionReceipts,
transactionValidator: configureTransactionValidator(params.State, params.ChainID),
transactionMetrics: params.AccessMetrics,
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
txResultQueryMode: params.TxResultQueryMode,
systemTx: systemTx,
systemTxID: systemTxID,
TransactionsLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
chainID: params.ChainID,
transactions: params.Transactions,
executionReceipts: params.ExecutionReceipts,
transactionValidator: configureTransactionValidator(params.State, params.ChainID),
transactionMetrics: params.AccessMetrics,
retry: retry,
connFactory: params.ConnFactory,
previousAccessNodes: params.HistoricalAccessNodes,
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
txResultQueryMode: params.TxResultQueryMode,
systemTx: systemTx,
systemTxID: systemTxID,
},
backendEvents: backendEvents{
log: params.Log,
Expand Down Expand Up @@ -261,6 +264,16 @@ func New(params Params) (*Backend, error) {
sendBufferSize: params.SubscriptionParams.SendBufferSize,
blockTracker: params.BlockTracker,
},
backendSubscribeTransactions: backendSubscribeTransactions{
txLocalDataProvider: transactionsLocalDataProvider,
log: params.Log,
executionResults: params.ExecutionResults,
broadcaster: params.SubscriptionParams.Broadcaster,
sendTimeout: params.SubscriptionParams.SendTimeout,
responseLimit: params.SubscriptionParams.ResponseLimit,
sendBufferSize: params.SubscriptionParams.SendBufferSize,
blockTracker: params.BlockTracker,
},
collections: params.Collections,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
Expand Down
195 changes: 195 additions & 0 deletions engine/access/rpc/backend/backend_stream_transactions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package backend

import (
"context"
"errors"
"fmt"
"time"

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/state"

"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/module/counters"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

// backendSubscribeTransactions handles transaction subscriptions.
type backendSubscribeTransactions struct {
txLocalDataProvider *TransactionsLocalDataProvider
executionResults storage.ExecutionResults
log zerolog.Logger
broadcaster *engine.Broadcaster
sendTimeout time.Duration
responseLimit float64
sendBufferSize int

blockTracker subscription.BlockTracker
}

// TransactionSubscriptionMetadata holds data representing the status state for each transaction subscription.
type TransactionSubscriptionMetadata struct {
txID flow.Identifier
txReferenceBlockID flow.Identifier
messageIndex counters.StrictMonotonousCounter
blockWithTx *flow.Header
blockID flow.Identifier
txExecuted bool
lastTxStatus flow.TransactionStatus
}

// SubscribeTransactionStatuses subscribes to transaction status changes starting from the transaction reference block ID.
// If invalid tx parameters will be supplied SubscribeTransactionStatuses will return a failed subscription.
func (b *backendSubscribeTransactions) SubscribeTransactionStatuses(ctx context.Context, tx *flow.TransactionBody) subscription.Subscription {
nextHeight, err := b.blockTracker.GetStartHeightFromBlockID(tx.ReferenceBlockID)
if err != nil {
return subscription.NewFailedSubscription(err, "could not get start height")
}

txInfo := TransactionSubscriptionMetadata{
txID: tx.ID(),
txReferenceBlockID: tx.ReferenceBlockID,
messageIndex: counters.NewMonotonousCounter(0),
blockWithTx: nil,
blockID: flow.ZeroID,
lastTxStatus: flow.TransactionStatusUnknown,
}

sub := subscription.NewHeightBasedSubscription(
b.sendBufferSize,
nextHeight,
b.getTransactionStatusResponse(&txInfo),
)

go subscription.NewStreamer(b.log, b.broadcaster, b.sendTimeout, b.responseLimit, sub).Stream(ctx)

return sub
}

// getTransactionStatusResponse returns a callback function that produces transaction status
// subscription responses based on new blocks.
func (b *backendSubscribeTransactions) getTransactionStatusResponse(txInfo *TransactionSubscriptionMetadata) func(context.Context, uint64) (interface{}, error) {
return func(ctx context.Context, height uint64) (interface{}, error) {
highestHeight, err := b.blockTracker.GetHighestHeight(flow.BlockStatusFinalized)
if err != nil {
return nil, fmt.Errorf("could not get highest height for block %d: %w", height, err)
}

// Fail early if no block finalized notification has been received for the given height.
// Note: It's possible that the block is locally finalized before the notification is
// received. This ensures a consistent view is available to all streams.
if height > highestHeight {
return nil, fmt.Errorf("block %d is not available yet: %w", height, subscription.ErrBlockNotReady)
}

if txInfo.lastTxStatus == flow.TransactionStatusSealed || txInfo.lastTxStatus == flow.TransactionStatusExpired {
return nil, fmt.Errorf("transaction final status %s was already reported: %w", txInfo.lastTxStatus.String(), subscription.ErrEndOfData)
}

if txInfo.blockWithTx == nil {
// Check if block contains transaction.
txInfo.blockWithTx, txInfo.blockID, err = b.searchForTransactionBlock(height, txInfo)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, fmt.Errorf("could not find block %d in storage: %w", height, subscription.ErrBlockNotReady)
}

if !errors.Is(err, ErrTransactionNotInBlock) {
return nil, status.Errorf(codes.Internal, "could not get block %d: %v", height, err)
}
}
}

// Find the transaction status.
var txStatus flow.TransactionStatus
if txInfo.blockWithTx == nil {
txStatus, err = b.txLocalDataProvider.DeriveUnknownTransactionStatus(txInfo.txReferenceBlockID)
} else {
if !txInfo.txExecuted {
// Check if transaction was executed.
txInfo.txExecuted, err = b.searchForExecutionResult(txInfo.blockID)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get execution result for block %s: %v", txInfo.blockID, err)
}
}

txStatus, err = b.txLocalDataProvider.DeriveTransactionStatus(txInfo.blockID, txInfo.blockWithTx.Height, txInfo.txExecuted)
}
if err != nil {
if !errors.Is(err, state.ErrUnknownSnapshotReference) {
irrecoverable.Throw(ctx, err)
}
return nil, rpc.ConvertStorageError(err)
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
}

// The same transaction status should not be reported, so return here with no response
if txInfo.lastTxStatus == txStatus {
return nil, nil
}
txInfo.lastTxStatus = txStatus

messageIndex := txInfo.messageIndex.Value()
if ok := txInfo.messageIndex.Set(messageIndex + 1); !ok {
return nil, status.Errorf(codes.Internal, "the message index has already been incremented to %d", txInfo.messageIndex.Value())
}

return &convert.TransactionSubscribeInfo{
ID: txInfo.txID,
Status: txInfo.lastTxStatus,
MessageIndex: messageIndex,
}, nil
}
}

// searchForTransactionBlock searches for the block containing the specified transaction.
// It retrieves the block at the given height and checks if the transaction is included in that block.
// Expected errors:
// - subscription.ErrBlockNotReady when unable to retrieve the block or collection ID
// - codes.Internal when other errors occur during block or collection lookup
func (b *backendSubscribeTransactions) searchForTransactionBlock(
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
height uint64,
txInfo *TransactionSubscriptionMetadata,
) (*flow.Header, flow.Identifier, error) {
block, err := b.txLocalDataProvider.blocks.ByHeight(height)
if err != nil {
return nil, flow.ZeroID, fmt.Errorf("error looking up block: %w", err)
}

collectionID, err := b.txLocalDataProvider.LookupCollectionIDInBlock(block, txInfo.txID)
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, flow.ZeroID, fmt.Errorf("error looking up transaction in block: %w", err)
}

if collectionID != flow.ZeroID {
return block.Header, block.ID(), nil
}

return nil, flow.ZeroID, nil
}

// searchForExecutionResult searches for the execution result of a block. It retrieves the execution result for the specified block ID.
// Expected errors:
// - codes.Internal if an internal error occurs while retrieving execution result.
func (b *backendSubscribeTransactions) searchForExecutionResult(
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
blockID flow.Identifier,
) (bool, error) {
_, err := b.executionResults.ByBlockID(blockID)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return false, nil
}
return false, fmt.Errorf("failed to get execution result for block %s: %w", blockID, err)
}

return true, nil
}
Loading
Loading