Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: step 3-02-CreateTransaction #12181

Merged
merged 15 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/big"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -45,8 +46,10 @@ type inMemoryStore[

keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ]
persistentTxStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
maxUnstarted uint64

addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
addressStatesLock sync.RWMutex
addressStates map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
}

// NewInMemoryStore returns a new inMemoryStore
Expand All @@ -73,9 +76,9 @@ func NewInMemoryStore[
addressStates: map[ADDR]*addressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{},
}

maxUnstarted := config.MaxQueued()
if maxUnstarted <= 0 {
maxUnstarted = 10000
ms.maxUnstarted = config.MaxQueued()
if ms.maxUnstarted <= 0 {
ms.maxUnstarted = 10000
}
addresses, err := keyStore.EnabledAddressesForChain(ctx, chainID)
if err != nil {
Expand All @@ -86,7 +89,7 @@ func NewInMemoryStore[
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
ms.addressStates[fromAddr] = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, maxUnstarted, txs)
ms.addressStates[fromAddr] = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](lggr, chainID, fromAddr, ms.maxUnstarted, txs)
}

return &ms, nil
Expand All @@ -101,7 +104,31 @@ func (ms *inMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Creat
txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
error,
) {
return txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}, nil
tx := txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if ms.chainID.String() != chainID.String() {
panic(fmt.Sprintf(ErrInvalidChainID.Error()+": %s", chainID.String()))
}

ms.addressStatesLock.RLock()
defer ms.addressStatesLock.RUnlock()
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
as, ok := ms.addressStates[txRequest.FromAddress]
if !ok {
as = newAddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](ms.lggr, chainID, txRequest.FromAddress, ms.maxUnstarted, nil)
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
}

// Persist Transaction to persistent storage
tx, err := ms.persistentTxStore.CreateTransaction(ctx, txRequest, chainID)
if err != nil {
return tx, fmt.Errorf("create_transaction: %w", err)
}

// Update in memory store
// Add the request to the Unstarted channel to be processed by the Broadcaster
if err := as.addTxToUnstartedQueue(&tx); err != nil {
return *ms.deepCopyTx(tx), fmt.Errorf("create_transaction: %w", err)
poopoothegorilla marked this conversation as resolved.
Show resolved Hide resolved
}

return *ms.deepCopyTx(tx), nil
}

// FindTxWithIdempotencyKey returns a transaction with the given idempotency key
Expand Down
76 changes: 76 additions & 0 deletions core/chains/evm/txmgr/evm_inmemory_store_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,90 @@
package txmgr_test

import (
"context"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
commontxmgr "github.com/smartcontractkit/chainlink/v2/common/txmgr"
evmassets "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
evmgas "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
)

func TestInMemoryStore_CreateTransaction(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
_, dbcfg, evmcfg := evmtxmgr.MakeTestConfigs(t)
persistentStore := cltest.NewTestTxStore(t, db, dbcfg)
kst := cltest.NewKeyStore(t, db, dbcfg)
_, fromAddress := cltest.MustInsertRandomKey(t, kst.Eth())

ethClient := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.TestSugared(t)
chainID := ethClient.ConfiguredChainID()
ctx := context.Background()

inMemoryStore, err := commontxmgr.NewInMemoryStore[
*big.Int,
common.Address, common.Hash, common.Hash,
*evmtypes.Receipt,
evmtypes.Nonce,
evmgas.EvmFee,
](ctx, lggr, chainID, kst.Eth(), persistentStore, evmcfg.Transactions())
require.NoError(t, err)

toAddress := testutils.NewAddress()
gasLimit := uint32(1000)
payload := []byte{1, 2, 3}

t.Run("with queue under capacity inserts eth_tx", func(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
actTx, err := inMemoryStore.CreateTransaction(testutils.Context(t), evmtxmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
EncodedPayload: payload,
FeeLimit: uint64(gasLimit),
Meta: nil,
Strategy: strategy,
}, chainID)
require.NoError(t, err)

// check that the transaction was inserted into the persistent store
cltest.AssertCount(t, db, "evm.txes", 1)

var dbEthTx evmtxmgr.DbEthTx
require.NoError(t, db.Get(&dbEthTx, `SELECT * FROM evm.txes ORDER BY id ASC LIMIT 1`))

assert.Equal(t, commontxmgr.TxUnstarted, dbEthTx.State)
assert.Equal(t, gasLimit, dbEthTx.GasLimit)
assert.Equal(t, fromAddress, dbEthTx.FromAddress)
assert.Equal(t, toAddress, dbEthTx.ToAddress)
assert.Equal(t, payload, dbEthTx.EncodedPayload)
assert.Equal(t, evmassets.NewEthValue(0), dbEthTx.Value)
assert.Equal(t, subject, dbEthTx.Subject.UUID)

var expTx evmtxmgr.Tx
dbEthTx.ToTx(&expTx)

// check that the in-memory store has the same transaction data as the persistent store
assertTxEqual(t, expTx, actTx)
})
}

// assertTxEqual asserts that two transactions are equal
func assertTxEqual(t *testing.T, exp, act evmtxmgr.Tx) {
assert.Equal(t, exp.ID, act.ID)
Expand Down
Loading