Skip to content

Commit

Permalink
[concurrent] Fix Ids between concurrent submissions
Browse files Browse the repository at this point in the history
  • Loading branch information
gregnazario committed Jun 25, 2024
1 parent 887ca49 commit 609ae27
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
6 changes: 3 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func Test_AccountResources(t *testing.T) {
func Test_Concurrent_Submission(t *testing.T) {
const numTxns = uint64(10)

client, err := NewClient(DevnetConfig)
client, err := NewClient(LocalnetConfig)
assert.NoError(t, err)

account1, err := NewEd25519Account()
Expand All @@ -381,7 +381,7 @@ func Test_Concurrent_Submission(t *testing.T) {
assert.NoError(t, err)

// start submission goroutine
payloads := make(chan TransactionSubmissionPayload)
payloads := make(chan TransactionBuildPayload)
results := make(chan TransactionSubmissionResponse)
go client.nodeClient.BuildSignAndSubmitTransactions(account1, payloads, results)

Expand All @@ -390,7 +390,7 @@ func Test_Concurrent_Submission(t *testing.T) {

// Generate transactions
for i := uint64(0); i < numTxns; i++ {
payloads <- TransactionSubmissionPayload{
payloads <- TransactionBuildPayload{
Id: i,
Type: TransactionSubmissionTypeSingle, // TODO: not needed?
Inner: TransactionPayload{Payload: &EntryFunction{
Expand Down
38 changes: 25 additions & 13 deletions transactionSubmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (
TransactionSubmissionTypeMultiAgent TransactionSubmissionType = iota
)

type TransactionSubmissionPayload struct {
type TransactionBuildPayload struct {
Id uint64
Type TransactionSubmissionType
Inner TransactionPayload // The actual transaction payload
Expand All @@ -28,6 +28,10 @@ type TransactionBuildResponse struct {
Response RawTransactionImpl
Err error
}
type TransactionSubmissionRequest struct {
Id uint64
SignedTxn SignedTransaction
}

type TransactionSubmissionResponse struct {
Id uint64
Expand Down Expand Up @@ -59,7 +63,7 @@ func (snt *SequenceNumberTracker) Update(new uint64) uint64 {
// BuildTransactions start a goroutine to process [TransactionPayload] and spit out [RawTransactionImpl].
//
// TODO: add optional arguments for configuring transactions as a whole?
func (rc *NodeClient) BuildTransactions(sender AccountAddress, payloads chan TransactionSubmissionPayload, responses chan TransactionBuildResponse, setSequenceNumber chan uint64) {
func (rc *NodeClient) BuildTransactions(sender AccountAddress, payloads chan TransactionBuildPayload, responses chan TransactionBuildResponse, setSequenceNumber chan uint64) {
// Initialize state
account, err := rc.Account(sender)
if err != nil {
Expand Down Expand Up @@ -116,20 +120,25 @@ func (rc *NodeClient) BuildTransactions(sender AccountAddress, payloads chan Tra
}

// SubmitTransactions starts up a worker for sending signed transactions to on-chain
func (rc *NodeClient) SubmitTransactions(signedTxns chan *SignedTransaction, responses chan TransactionSubmissionResponse) {
func (rc *NodeClient) SubmitTransactions(requests chan TransactionSubmissionRequest, responses chan TransactionSubmissionResponse) {
for {
select {
case signedTxn, ok := <-signedTxns:
case request, ok := <-requests:
if !ok {
close(responses)
return
}

response, err := rc.SubmitTransaction(signedTxn)
err := request.SignedTxn.Verify()
if err != nil {
panic("transaction verification failed")
}

response, err := rc.SubmitTransaction(&request.SignedTxn)
if err != nil {
responses <- TransactionSubmissionResponse{Err: err}
responses <- TransactionSubmissionResponse{Id: request.Id, Err: err}
} else {
responses <- TransactionSubmissionResponse{Response: response}
responses <- TransactionSubmissionResponse{Id: request.Id, Response: response}
}
}
}
Expand All @@ -138,7 +147,7 @@ func (rc *NodeClient) SubmitTransactions(signedTxns chan *SignedTransaction, res
// BuildSignAndSubmitTransactions starts up a goroutine to process transactions for a single [TransactionSender]
func (rc *NodeClient) BuildSignAndSubmitTransactions(
sender TransactionSigner,
payloads chan TransactionSubmissionPayload,
payloads chan TransactionBuildPayload,
responses chan TransactionSubmissionResponse,
) {
singleSigner := func(rawTxn RawTransactionImpl) (*SignedTransaction, error) {
Expand Down Expand Up @@ -178,7 +187,7 @@ func (rc *NodeClient) BuildSignAndSubmitTransactions(
// sender := NewEd25519Account()
// feePayer := NewEd25519Account()
//
// payloads := make(chan TransactionSubmissionPayload)
// payloads := make(chan TransactionBuildPayload)
// responses := make(chan TransactionSubmissionResponse)
//
// signingFunc := func(rawTxn RawTransactionImpl) (*SignedTransaction, error) {
Expand Down Expand Up @@ -214,7 +223,7 @@ func (rc *NodeClient) BuildSignAndSubmitTransactions(
// }
func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
sender AccountAddress,
payloads chan TransactionSubmissionPayload,
payloads chan TransactionBuildPayload,
responses chan TransactionSubmissionResponse,
sign func(rawTxn RawTransactionImpl) (*SignedTransaction, error),
) {
Expand All @@ -225,8 +234,8 @@ func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
setSequenceNumber := make(chan uint64)
go rc.BuildTransactions(sender, payloads, buildResponses, setSequenceNumber)

signedTxns := make(chan *SignedTransaction, 20)
go rc.SubmitTransactions(signedTxns, responses)
submissionRequests := make(chan TransactionSubmissionRequest, 20)
go rc.SubmitTransactions(submissionRequests, responses)

for {
select {
Expand All @@ -242,7 +251,10 @@ func (rc *NodeClient) BuildSignAndSubmitTransactionsWithSignFunction(
if err != nil {
responses <- TransactionSubmissionResponse{Id: buildResponse.Id, Err: err}
} else {
signedTxns <- signedTxn
submissionRequests <- TransactionSubmissionRequest{
Id: buildResponse.Id,
SignedTxn: *signedTxn,
}
}
}()
}
Expand Down

0 comments on commit 609ae27

Please sign in to comment.