Skip to content

Commit

Permalink
[concurrent] Fix Ids between concurrent submissions
Browse files Browse the repository at this point in the history
  • Loading branch information
gregnazario committed Jun 25, 2024
1 parent 40f6378 commit 22389f6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
7 changes: 3 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,8 @@ func concurrentTxnWaiter(
func Test_Concurrent_Submission(t *testing.T) {
const numTxns = uint64(100)
const numWaiters = 4
netConfig := LocalnetConfig

client, err := NewClient(netConfig)
client, err := createTestClient()
assert.NoError(t, err)

account1, err := NewEd25519Account()
Expand All @@ -416,7 +415,7 @@ func Test_Concurrent_Submission(t *testing.T) {
assert.NoError(t, err)

// start submission goroutine
payloads := make(chan TransactionSubmissionPayload, 50)
payloads := make(chan TransactionBuildPayload, 50)
results := make(chan TransactionSubmissionResponse, 50)
go client.nodeClient.BuildSignAndSubmitTransactions(account1, payloads, results, ExpirationSeconds(20))

Expand All @@ -425,7 +424,7 @@ func Test_Concurrent_Submission(t *testing.T) {

// Generate transactions
for i := uint64(0); i < numTxns; i++ {
payloads <- TransactionSubmissionPayload{
payloads <- TransactionBuildPayload{
Id: i,
Type: TransactionSubmissionTypeSingle, // TODO: not needed?
Inner: TransactionPayload{Payload: &EntryFunction{
Expand Down
35 changes: 21 additions & 14 deletions transactionSubmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
TransactionSubmissionTypeMultiAgent TransactionSubmissionType = iota
)

type TransactionSubmissionPayload struct {
type TransactionBuildPayload struct {
Id uint64
Type TransactionSubmissionType
Inner TransactionPayload // The actual transaction payload
Expand All @@ -30,6 +30,10 @@ type TransactionBuildResponse struct {
Response RawTransactionImpl
Err error
}
type TransactionSubmissionRequest struct {
Id uint64
SignedTxn *SignedTransaction
}

type TransactionSubmissionResponse struct {
Id uint64
Expand Down Expand Up @@ -57,7 +61,7 @@ func (snt *SequenceNumberTracker) Update(next uint64) uint64 {
}

// BuildTransactions start a goroutine to process [TransactionPayload] and spit out [RawTransactionImpl].
func (rc *NodeClient) BuildTransactions(sender AccountAddress, payloads chan TransactionSubmissionPayload, responses chan TransactionBuildResponse, setSequenceNumber chan uint64, options ...any) {
func (rc *NodeClient) BuildTransactions(sender AccountAddress, payloads chan TransactionBuildPayload, responses chan TransactionBuildResponse, setSequenceNumber chan uint64, options ...any) {
// Initialize state
account, err := rc.Account(sender)
if err != nil {
Expand Down Expand Up @@ -116,14 +120,14 @@ func (rc *NodeClient) BuildTransactions(sender AccountAddress, payloads chan Tra

// SubmitTransactions consumes signed transactions, submits to aptos-node, yields responses.
// closes output chan `responses` when input chan `signedTxns` is closed.
func (rc *NodeClient) SubmitTransactions(signedTxns chan *SignedTransaction, responses chan TransactionSubmissionResponse) {
func (rc *NodeClient) SubmitTransactions(requests chan TransactionSubmissionRequest, responses chan TransactionSubmissionResponse) {
defer close(responses)
for signedTxn := range signedTxns {
response, err := rc.SubmitTransaction(signedTxn)
for request := range requests {
response, err := rc.SubmitTransaction(request.SignedTxn)
if err != nil {
responses <- TransactionSubmissionResponse{Err: err}
responses <- TransactionSubmissionResponse{Id: request.Id, Err: err}
} else {
responses <- TransactionSubmissionResponse{Response: response}
responses <- TransactionSubmissionResponse{Id: request.Id, Response: response}
}
}
}
Expand All @@ -132,7 +136,7 @@ func (rc *NodeClient) SubmitTransactions(signedTxns chan *SignedTransaction, res
// Closes output chan `responses` on completion of input chan `payloads`.
func (rc *NodeClient) BuildSignAndSubmitTransactions(
sender TransactionSigner,
payloads chan TransactionSubmissionPayload,
payloads chan TransactionBuildPayload,
responses chan TransactionSubmissionResponse,
buildOptions ...any,
) {
Expand Down Expand Up @@ -176,7 +180,7 @@ func (rc *NodeClient) BuildSignAndSubmitTransactions(
// sender := NewEd25519Account()
// feePayer := NewEd25519Account()
//
// payloads := make(chan TransactionSubmissionPayload)
// payloads := make(chan TransactionBuildPayload)
// responses := make(chan TransactionSubmissionResponse)
//
// signingFunc := func(rawTxn RawTransactionImpl) (*SignedTransaction, error) {
Expand Down Expand Up @@ -212,7 +216,7 @@ func (rc *NodeClient) BuildSignAndSubmitTransactions(
// }
func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
sender AccountAddress,
payloads chan TransactionSubmissionPayload,
payloads chan TransactionBuildPayload,
responses chan TransactionSubmissionResponse,
sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
buildOptions ...any,
Expand All @@ -224,8 +228,8 @@ func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
setSequenceNumber := make(chan uint64)
go rc.BuildTransactions(sender, payloads, buildResponses, setSequenceNumber, buildOptions...)

signedTxns := make(chan *SignedTransaction, 20)
go rc.SubmitTransactions(signedTxns, responses)
submissionRequests := make(chan TransactionSubmissionRequest, 20)
go rc.SubmitTransactions(submissionRequests, responses)

var wg sync.WaitGroup

Expand All @@ -241,12 +245,15 @@ func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
if err != nil {
responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: err}
} else {
signedTxns <- signedTxn
submissionRequests <- TransactionSubmissionRequest{
Id: buildResponse.Id,
SignedTxn: signedTxn,
}
}
}()
}
}

wg.Wait()
close(signedTxns)
close(submissionRequests)
}

0 comments on commit 22389f6

Please sign in to comment.