Skip to content

Commit

Permalink
node: Fix issue where transfers that were loaded from the DB did not …
Browse files Browse the repository at this point in the history
…add a flow-cancel transfer on the TargetChain (#4002)

node: Fix issue where transfers that were loaded from the DB did not add
a flow-cancel transfer on the TargetChain

Flow-canceling is done in the `ProcessMsgForTime` loop when a new
message occurs. However, this was not done when a node restarted and
reloaded transfers from the past 24 hours. As a result it was possible
for the node to calculate a result that showed that the outgoing
transfers for an emitter chain exceeded the daily limit. In effect this
is true but only with the condition that there was incoming flow to
allow this to happen. This appeared to violate an invariant and so the
node did not start properly.

Add unit tests when reloading flow cancel transactions from the
database
  • Loading branch information
johnsaigle authored Jul 3, 2024
1 parent d146f82 commit 7c6e4c5
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 0 deletions.
31 changes: 31 additions & 0 deletions node/pkg/governor/governor_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ func (gov *ChainGovernor) loadFromDB() error {
return gov.loadFromDBAlreadyLocked()
}

// loadFromDBAlreadyLocked method loads transfers and pending data from the database and modifies the corresponding fields in the ChainGovernor.
// These fields are slices of transfers or pendingTransfers and will be sorted by their Timestamp property.
// Modifies the state of the database as a side-effect: 'transfers' that are older than 24 hours are deleted.
func (gov *ChainGovernor) loadFromDBAlreadyLocked() error {
xfers, pending, err := gov.db.GetChainGovernorData(gov.logger)
if err != nil {
Expand Down Expand Up @@ -154,10 +157,16 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) {
zap.String("Hash", hash),
)

// Note: no flow cancel added here. We only want to add an inverse, flow-cancel transfer when the transfer is
// released from the pending queue, not when it's added.
ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, hash: hash, dbData: *pending})
gov.msgsSeen[hash] = transferEnqueued
}

// reloadTransfer method processes a db.Transfer and validates that it should be loaded into `gov`.
// Modifies `gov` as a side-effect: when a valid transfer is loaded, the properties 'transfers' and 'msgsSeen' are
// updated with information about the loaded transfer. In the case where a flow-canceling asset's transfer is loaded,
// both chain entries (emitter and target) will be updated.
func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error {
ce, exists := gov.chains[xfer.EmitterChain]
if !exists {
Expand Down Expand Up @@ -233,5 +242,27 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error {
return err
}
ce.transfers = append(ce.transfers, transfer)

// Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding,
// inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but
// that function does not capture flow-cancelling when the node is restarted.
tokenEntry := gov.tokens[tk]
if tokenEntry != nil {
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok {
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil {
return err
}
} else {
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist",
zap.String("msgID", xfer.MsgID),
zap.Stringer("token chain", xfer.OriginChain),
zap.Stringer("token address", xfer.OriginAddress),
zap.Stringer("target chain", xfer.TargetChain),
)
}
}
}
return nil
}
197 changes: 197 additions & 0 deletions node/pkg/governor/governor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,6 +2337,203 @@ func TestDontReloadDuplicates(t *testing.T) {
assert.Equal(t, uint64(4436), valuePending)
}

// With the addition of the flow-cancel feature, it's possible to in a way "exceed the daily limit" of outflow from a
// Governor as long as there is corresponding inflow of a flow-canceling asset to allow for additional outflow.
// When the node is restarted, it reloads all transfers and pending transfers. If the actual outflow is greater than
// the daily limit (due to flow cancel) ensure that the calculated limit on start-up is correct.
// This test ensures that governor usage limits are correctly calculated when reloading transfers from the database.
func TestReloadTransfersNearCapacity(t *testing.T) {
// Setup
ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)

require.NoError(t, err)
assert.NotNil(t, gov)

// Set-up time
gov.setDayLengthInMinutes(24 * 60)
transferTime := time.Now()

// Solana USDC used as the flow cancelling asset. This ensures that the flow cancel mechanism works
// when the Origin chain of the asset does not match the emitter chain
// NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified
var flowCancelTokenOriginAddress vaa.Address
flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61")
require.NoError(t, err)

var notFlowCancelTokenOriginAddress vaa.Address
notFlowCancelTokenOriginAddress, err = vaa.StringToAddress("77777af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f7777")
require.NoError(t, err)

// Data for Ethereum
tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec
tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum)
require.NoError(t, err)

// Data for Sui
tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec
tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui)
require.NoError(t, err)

// Data for Solana. Only used to represent the flow cancel asset.
// "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb"
tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec

// Add chain entries to `gov`
err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, 10000, 50000)
require.NoError(t, err)
err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, 10000, 0)
require.NoError(t, err)
err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, 10000, 0)
require.NoError(t, err)

// Add flow cancel asset and non-flow cancelable asset to the token entry for `gov`
err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true)
require.NoError(t, err)
assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}])
err = gov.setTokenForTesting(vaa.ChainIDEthereum, notFlowCancelTokenOriginAddress.String(), "NOTCANCELABLE", 1.0, false)
require.NoError(t, err)

// This transfer should exhaust the dailyLimit for the emitter chain
xfer1 := &db.Transfer{
Timestamp: transferTime.Add(-10),
Value: uint64(10000),
OriginChain: vaa.ChainIDSolana,
OriginAddress: flowCancelTokenOriginAddress,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddrEthereum,
TargetAddress: tokenBridgeAddrSui,
TargetChain: vaa.ChainIDSui,
MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125",
Hash: "Hash1",
}

// This incoming transfer should free up some of the space on the previous emitter chain
xfer2 := &db.Transfer{
Timestamp: transferTime.Add(-9),
Value: uint64(2000),
OriginChain: vaa.ChainIDSolana,
OriginAddress: flowCancelTokenOriginAddress,
EmitterChain: vaa.ChainIDSui,
EmitterAddress: tokenBridgeAddrSui,
TargetAddress: tokenBridgeAddrEthereum,
TargetChain: vaa.ChainIDEthereum,
MsgID: "2/" + tokenBridgeAddrSui.String() + "/126",
Hash: "Hash2",
}

// Send another transfer out from the original emitter chain so that we "exceed the daily limit" if flow
// cancel is not applied
xfer3 := &db.Transfer{
Timestamp: transferTime.Add(-8),
Value: uint64(50),
OriginChain: vaa.ChainIDSolana,
OriginAddress: flowCancelTokenOriginAddress,
EmitterChain: vaa.ChainIDEthereum,
EmitterAddress: tokenBridgeAddrEthereum,
TargetAddress: tokenBridgeAddrSui,
TargetChain: vaa.ChainIDSui,
MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125",
Hash: "Hash3",
}

// Simulate reloading from the database.
// NOTE: The actual execution path we want to test is the following and runs when the node is restarted:
// gov.Run () --> gov.loadFromDb() --> gov.loadFromDBAlreadyLocked() --> gov.reloadTransfer()
// We don't have access to Run() from the test suite and the other functions are mocked to return `nil`.
// Therefore, the remainder of this test proceeds by operating on a list of `transfersLoadedFromDb` which
// simulates loading transfers from the database.
// From here we proceed with the next function we can actually test: `reloadTransfer()`.

// STEP 0: Initial state
assert.Equal(t, len(gov.msgsSeen), 0)
numTrans, netValueTransferred, numPending, valuePending := gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 0, numTrans)
assert.Equal(t, int64(0), netValueTransferred)
assert.Equal(t, 0, numPending)
assert.Equal(t, uint64(0), valuePending)

chainEntryEth, exists := gov.chains[vaa.ChainIDEthereum]
require.True(t, exists)
chainEntrySui, exists := gov.chains[vaa.ChainIDSui]
require.True(t, exists)

// STEP 1: Load first transfer
err = gov.reloadTransfer(xfer1)
require.NoError(t, err)
assert.Equal(t, len(gov.msgsSeen), 1)
numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 2, numTrans) // 1 plus transfer the inverse flow transfer on the TargetChain
assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers

// Sum of absolute value of all transfers, including inverse flow cancel transfers:
// 2 * (10_000) = 20_000
_, valueTransferred, _, _ := gov.getStatsForAllChains()
assert.Equal(t, uint64(20000), valueTransferred)

governorUsageEth, err := gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(10000), governorUsageEth)
assert.Zero(t, governorUsageEth-chainEntryEth.dailyLimit) // Make sure we used the whole capacity
require.NoError(t, err)
governorUsageSui, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Zero(t, governorUsageSui)
require.NoError(t, err)
sumTransfersSui, _, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(-10000), sumTransfersSui)
require.NoError(t, err)

// STEP 2: Load second transfer
err = gov.reloadTransfer(xfer2)
require.NoError(t, err)
assert.Equal(t, len(gov.msgsSeen), 2)
numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 4, numTrans) // 2 transfers and their inverse flow transfers on the TargetChain
assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers

// Sum of absolute value of all transfers, including inverse flow cancel transfers:
// 2 * (10_000 + 2_000) = 24_000
_, valueTransferred, _, _ = gov.getStatsForAllChains()
assert.Equal(t, uint64(24000), valueTransferred)

governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(8000), governorUsageEth)
assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 2000) // Remaining capacity
require.NoError(t, err)
governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Zero(t, governorUsageSui)
require.NoError(t, err)
sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(-8000), sumTransfersSui)
require.NoError(t, err)

// STEP 3: Load third transfer
err = gov.reloadTransfer(xfer3)
require.NoError(t, err)
// Sum of absolute value of all transfers, including inverse flow cancel transfers:
// 2 * (10_000 + 2_000 + 50) = 24_100
_, valueTransferred, _, _ = gov.getStatsForAllChains()
assert.Equal(t, uint64(24100), valueTransferred)

numTrans, netValueTransferred, numPending, valuePending = gov.getStatsForAllChainsCancelFlow()
assert.Equal(t, 6, numTrans) // 3 transfers and their inverse flow transfers on the TargetChain
assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers

governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, uint64(8050), governorUsageEth)
assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 1950) // Remaining capacity
require.NoError(t, err)
governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0))
require.NoError(t, err)
assert.Zero(t, governorUsageSui)
sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0))
assert.Equal(t, int64(-8050), sumTransfersSui)
require.NoError(t, err)

// Sanity check: make sure these are still empty/zero
assert.Equal(t, 0, numPending)
assert.Equal(t, uint64(0), valuePending)
}

func TestReobservationOfPublishedMsg(t *testing.T) {
ctx := context.Background()
gov, err := newChainGovernorForTest(ctx)
Expand Down

0 comments on commit 7c6e4c5

Please sign in to comment.