From 7c6e4c597c01a925e83d717056568b77e0804a2a Mon Sep 17 00:00:00 2001 From: John Saigle <4022790+johnsaigle@users.noreply.github.com> Date: Wed, 3 Jul 2024 15:15:35 -0400 Subject: [PATCH] node: Fix issue where transfers that were loaded from the DB did not add a flow-cancel transfer on the TargetChain (#4002) node: Fix issue where transfers that were loaded from the DB did not add a flow-cancel transfer on the TargetChain Flow-canceling is done in the `ProcessMsgForTime` loop when a new message occurs. However, this was not done when a node restarted and reloaded transfers from the past 24 hours. As a result it was possible for the node to calculate a result that showed that the outgoing transfers for an emitter chain exceeded the daily limit. In effect this is true but only with the condition that there was incoming flow to allow this to happen. This appeared to violate an invariant and so the node did not start properly. Add unit tests when reloading flow cancel transactions from the database --- node/pkg/governor/governor_db.go | 31 +++++ node/pkg/governor/governor_test.go | 197 +++++++++++++++++++++++++++++ 2 files changed, 228 insertions(+) diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index 146f2c26f9..a71fb194fa 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -18,6 +18,9 @@ func (gov *ChainGovernor) loadFromDB() error { return gov.loadFromDBAlreadyLocked() } +// loadFromDBAlreadyLocked method loads transfers and pending data from the database and modifies the corresponding fields in the ChainGovernor. +// These fields are slices of transfers or pendingTransfers and will be sorted by their Timestamp property. +// Modifies the state of the database as a side-effect: 'transfers' that are older than 24 hours are deleted. func (gov *ChainGovernor) loadFromDBAlreadyLocked() error { xfers, pending, err := gov.db.GetChainGovernorData(gov.logger) if err != nil { @@ -154,10 +157,16 @@ func (gov *ChainGovernor) reloadPendingTransfer(pending *db.PendingTransfer) { zap.String("Hash", hash), ) + // Note: no flow cancel added here. We only want to add an inverse, flow-cancel transfer when the transfer is + // released from the pending queue, not when it's added. ce.pending = append(ce.pending, &pendingEntry{token: token, amount: payload.Amount, hash: hash, dbData: *pending}) gov.msgsSeen[hash] = transferEnqueued } +// reloadTransfer method processes a db.Transfer and validates that it should be loaded into `gov`. +// Modifies `gov` as a side-effect: when a valid transfer is loaded, the properties 'transfers' and 'msgsSeen' are +// updated with information about the loaded transfer. In the case where a flow-canceling asset's transfer is loaded, +// both chain entries (emitter and target) will be updated. func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { ce, exists := gov.chains[xfer.EmitterChain] if !exists { @@ -233,5 +242,27 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { return err } ce.transfers = append(ce.transfers, transfer) + + // Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding, + // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but + // that function does not capture flow-cancelling when the node is restarted. + tokenEntry := gov.tokens[tk] + if tokenEntry != nil { + // Mandatory check to ensure that the token should be able to reduce the Governor limit. + if tokenEntry.flowCancels { + if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { + if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { + return err + } + } else { + gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", + zap.String("msgID", xfer.MsgID), + zap.Stringer("token chain", xfer.OriginChain), + zap.Stringer("token address", xfer.OriginAddress), + zap.Stringer("target chain", xfer.TargetChain), + ) + } + } + } return nil } diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index 7cec609b19..964ec53345 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -2337,6 +2337,203 @@ func TestDontReloadDuplicates(t *testing.T) { assert.Equal(t, uint64(4436), valuePending) } +// With the addition of the flow-cancel feature, it's possible to in a way "exceed the daily limit" of outflow from a +// Governor as long as there is corresponding inflow of a flow-canceling asset to allow for additional outflow. +// When the node is restarted, it reloads all transfers and pending transfers. If the actual outflow is greater than +// the daily limit (due to flow cancel) ensure that the calculated limit on start-up is correct. +// This test ensures that governor usage limits are correctly calculated when reloading transfers from the database. +func TestReloadTransfersNearCapacity(t *testing.T) { + // Setup + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + + require.NoError(t, err) + assert.NotNil(t, gov) + + // Set-up time + gov.setDayLengthInMinutes(24 * 60) + transferTime := time.Now() + + // Solana USDC used as the flow cancelling asset. This ensures that the flow cancel mechanism works + // when the Origin chain of the asset does not match the emitter chain + // NOTE: Replace this Chain:Address pair if the Flow Cancel Token List is modified + var flowCancelTokenOriginAddress vaa.Address + flowCancelTokenOriginAddress, err = vaa.StringToAddress("c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d61") + require.NoError(t, err) + + var notFlowCancelTokenOriginAddress vaa.Address + notFlowCancelTokenOriginAddress, err = vaa.StringToAddress("77777af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f7777") + require.NoError(t, err) + + // Data for Ethereum + tokenBridgeAddrStrEthereum := "0x0290fb167208af455bb137780163b7b7a9a10c16" //nolint:gosec + tokenBridgeAddrEthereum, err := vaa.StringToAddress(tokenBridgeAddrStrEthereum) + require.NoError(t, err) + + // Data for Sui + tokenBridgeAddrStrSui := "0xc57508ee0d4595e5a8728974a4a93a787d38f339757230d441e895422c07aba9" //nolint:gosec + tokenBridgeAddrSui, err := vaa.StringToAddress(tokenBridgeAddrStrSui) + require.NoError(t, err) + + // Data for Solana. Only used to represent the flow cancel asset. + // "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb" + tokenBridgeAddrStrSolana := "0x0e0a589e6488147a94dcfa592b90fdd41152bb2ca77bf6016758a6f4df9d21b4" //nolint:gosec + + // Add chain entries to `gov` + err = gov.setChainForTesting(vaa.ChainIDEthereum, tokenBridgeAddrStrEthereum, 10000, 50000) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSui, tokenBridgeAddrStrSui, 10000, 0) + require.NoError(t, err) + err = gov.setChainForTesting(vaa.ChainIDSolana, tokenBridgeAddrStrSolana, 10000, 0) + require.NoError(t, err) + + // Add flow cancel asset and non-flow cancelable asset to the token entry for `gov` + err = gov.setTokenForTesting(vaa.ChainIDSolana, flowCancelTokenOriginAddress.String(), "USDC", 1.0, true) + require.NoError(t, err) + assert.NotNil(t, gov.tokens[tokenKey{chain: vaa.ChainIDSolana, addr: flowCancelTokenOriginAddress}]) + err = gov.setTokenForTesting(vaa.ChainIDEthereum, notFlowCancelTokenOriginAddress.String(), "NOTCANCELABLE", 1.0, false) + require.NoError(t, err) + + // This transfer should exhaust the dailyLimit for the emitter chain + xfer1 := &db.Transfer{ + Timestamp: transferTime.Add(-10), + Value: uint64(10000), + OriginChain: vaa.ChainIDSolana, + OriginAddress: flowCancelTokenOriginAddress, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + TargetAddress: tokenBridgeAddrSui, + TargetChain: vaa.ChainIDSui, + MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125", + Hash: "Hash1", + } + + // This incoming transfer should free up some of the space on the previous emitter chain + xfer2 := &db.Transfer{ + Timestamp: transferTime.Add(-9), + Value: uint64(2000), + OriginChain: vaa.ChainIDSolana, + OriginAddress: flowCancelTokenOriginAddress, + EmitterChain: vaa.ChainIDSui, + EmitterAddress: tokenBridgeAddrSui, + TargetAddress: tokenBridgeAddrEthereum, + TargetChain: vaa.ChainIDEthereum, + MsgID: "2/" + tokenBridgeAddrSui.String() + "/126", + Hash: "Hash2", + } + + // Send another transfer out from the original emitter chain so that we "exceed the daily limit" if flow + // cancel is not applied + xfer3 := &db.Transfer{ + Timestamp: transferTime.Add(-8), + Value: uint64(50), + OriginChain: vaa.ChainIDSolana, + OriginAddress: flowCancelTokenOriginAddress, + EmitterChain: vaa.ChainIDEthereum, + EmitterAddress: tokenBridgeAddrEthereum, + TargetAddress: tokenBridgeAddrSui, + TargetChain: vaa.ChainIDSui, + MsgID: "2/" + tokenBridgeAddrEthereum.String() + "/125", + Hash: "Hash3", + } + + // Simulate reloading from the database. + // NOTE: The actual execution path we want to test is the following and runs when the node is restarted: + // gov.Run () --> gov.loadFromDb() --> gov.loadFromDBAlreadyLocked() --> gov.reloadTransfer() + // We don't have access to Run() from the test suite and the other functions are mocked to return `nil`. + // Therefore, the remainder of this test proceeds by operating on a list of `transfersLoadedFromDb` which + // simulates loading transfers from the database. + // From here we proceed with the next function we can actually test: `reloadTransfer()`. + + // STEP 0: Initial state + assert.Equal(t, len(gov.msgsSeen), 0) + numTrans, netValueTransferred, numPending, valuePending := gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 0, numTrans) + assert.Equal(t, int64(0), netValueTransferred) + assert.Equal(t, 0, numPending) + assert.Equal(t, uint64(0), valuePending) + + chainEntryEth, exists := gov.chains[vaa.ChainIDEthereum] + require.True(t, exists) + chainEntrySui, exists := gov.chains[vaa.ChainIDSui] + require.True(t, exists) + + // STEP 1: Load first transfer + err = gov.reloadTransfer(xfer1) + require.NoError(t, err) + assert.Equal(t, len(gov.msgsSeen), 1) + numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 2, numTrans) // 1 plus transfer the inverse flow transfer on the TargetChain + assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers + + // Sum of absolute value of all transfers, including inverse flow cancel transfers: + // 2 * (10_000) = 20_000 + _, valueTransferred, _, _ := gov.getStatsForAllChains() + assert.Equal(t, uint64(20000), valueTransferred) + + governorUsageEth, err := gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(10000), governorUsageEth) + assert.Zero(t, governorUsageEth-chainEntryEth.dailyLimit) // Make sure we used the whole capacity + require.NoError(t, err) + governorUsageSui, err := gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, governorUsageSui) + require.NoError(t, err) + sumTransfersSui, _, err := gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(-10000), sumTransfersSui) + require.NoError(t, err) + + // STEP 2: Load second transfer + err = gov.reloadTransfer(xfer2) + require.NoError(t, err) + assert.Equal(t, len(gov.msgsSeen), 2) + numTrans, netValueTransferred, _, _ = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 4, numTrans) // 2 transfers and their inverse flow transfers on the TargetChain + assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers + + // Sum of absolute value of all transfers, including inverse flow cancel transfers: + // 2 * (10_000 + 2_000) = 24_000 + _, valueTransferred, _, _ = gov.getStatsForAllChains() + assert.Equal(t, uint64(24000), valueTransferred) + + governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(8000), governorUsageEth) + assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 2000) // Remaining capacity + require.NoError(t, err) + governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Zero(t, governorUsageSui) + require.NoError(t, err) + sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(-8000), sumTransfersSui) + require.NoError(t, err) + + // STEP 3: Load third transfer + err = gov.reloadTransfer(xfer3) + require.NoError(t, err) + // Sum of absolute value of all transfers, including inverse flow cancel transfers: + // 2 * (10_000 + 2_000 + 50) = 24_100 + _, valueTransferred, _, _ = gov.getStatsForAllChains() + assert.Equal(t, uint64(24100), valueTransferred) + + numTrans, netValueTransferred, numPending, valuePending = gov.getStatsForAllChainsCancelFlow() + assert.Equal(t, 6, numTrans) // 3 transfers and their inverse flow transfers on the TargetChain + assert.Equal(t, int64(0), netValueTransferred) // Value cancels out for all transfers + + governorUsageEth, err = gov.TrimAndSumValueForChain(chainEntryEth, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, uint64(8050), governorUsageEth) + assert.Equal(t, int(chainEntryEth.dailyLimit-governorUsageEth), 1950) // Remaining capacity + require.NoError(t, err) + governorUsageSui, err = gov.TrimAndSumValueForChain(chainEntrySui, time.Unix(int64(transferTime.Unix()-1000), 0)) + require.NoError(t, err) + assert.Zero(t, governorUsageSui) + sumTransfersSui, _, err = gov.TrimAndSumValue(chainEntrySui.transfers, time.Unix(int64(transferTime.Unix()-1000), 0)) + assert.Equal(t, int64(-8050), sumTransfersSui) + require.NoError(t, err) + + // Sanity check: make sure these are still empty/zero + assert.Equal(t, 0, numPending) + assert.Equal(t, uint64(0), valuePending) +} + func TestReobservationOfPublishedMsg(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx)