diff --git a/components/ledger/internal/storage/bucket/migrations/11-stateless.sql b/components/ledger/internal/storage/bucket/migrations/11-stateless.sql index 1c27ca772c..30bfea1a39 100644 --- a/components/ledger/internal/storage/bucket/migrations/11-stateless.sql +++ b/components/ledger/internal/storage/bucket/migrations/11-stateless.sql @@ -103,9 +103,6 @@ create function "{{.Bucket}}".set_volumes() as $$ begin - --todo: debugging - perform pg_advisory_xact_lock(new.accounts_seq); - new.post_commit_volumes = coalesce(( select ( (post_commit_volumes).inputs + case when new.is_source then 0 else new.amount end, diff --git a/components/ledger/internal/storage/ledger/balances.go b/components/ledger/internal/storage/ledger/balances.go index c1b0539f21..89c675533c 100644 --- a/components/ledger/internal/storage/ledger/balances.go +++ b/components/ledger/internal/storage/ledger/balances.go @@ -62,7 +62,7 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool, } selectMoves = s.db.NewSelect(). TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)). - Column("asset", "account_address"). + Column("asset", "account_address", "account_address_array"). ColumnExpr("post_commit_volumes as volumes") } else { if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") { diff --git a/components/ledger/internal/storage/ledger/moves.go b/components/ledger/internal/storage/ledger/moves.go index f54ff3db5c..3d5208fd17 100644 --- a/components/ledger/internal/storage/ledger/moves.go +++ b/components/ledger/internal/storage/ledger/moves.go @@ -2,15 +2,14 @@ package ledger import ( "context" - "math/big" - "slices" - "github.com/formancehq/ledger/internal/opentelemetry/tracer" "github.com/formancehq/stack/libs/go-libs/bun/bunpaginate" "github.com/formancehq/stack/libs/go-libs/collectionutils" "github.com/formancehq/stack/libs/go-libs/platform/postgres" "github.com/formancehq/stack/libs/go-libs/time" "github.com/uptrace/bun" + "math/big" + "slices" ) type Move struct { @@ -77,6 +76,7 @@ func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery { TableExpr("(?) moves", s.SortMovesBySeq(date)). DistinctOn("accounts_seq, account_address, asset"). Column("accounts_seq", "account_address", "asset"). + ColumnExpr("first_value(account_address_array) over (partition by (accounts_seq, account_address, asset) order by seq desc) as account_address_array"). ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_seq, account_address, asset) order by seq desc) as post_commit_volumes"). Where("ledger = ?", s.ledger.Name) @@ -105,6 +105,7 @@ func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQ } func (s *Store) insertMoves(ctx context.Context, moves ...Move) error { + _, err := tracer.TraceWithLatency(ctx, "InsertMoves", tracer.NoResult(func(ctx context.Context) error { _, err := s.db.NewInsert(). With("_rows", s.db.NewValues(&moves)). diff --git a/components/ledger/internal/storage/ledger/moves_test.go b/components/ledger/internal/storage/ledger/moves_test.go index 2a6b5986d8..51f6b7b2f4 100644 --- a/components/ledger/internal/storage/ledger/moves_test.go +++ b/components/ledger/internal/storage/ledger/moves_test.go @@ -118,17 +118,20 @@ func TestPostCommitVolumesComputation(t *testing.T) { ctx := logging.TestingContext() wp := pond.New(10, 10) - for i := 0; i < 10; i++ { + for i := 0; i < 1000; i++ { wp.Submit(func() { for { sqlTx, err := store.GetDB().BeginTx(ctx, &sql.TxOptions{}) require.NoError(t, err) - store := store.WithDB(sqlTx) + storeCP := store.WithDB(sqlTx) + + src := fmt.Sprintf("accounts:%d", rand.Intn(1000000)) + dst := fmt.Sprintf("accounts:%d", rand.Intn(1000000)) tx := ledger.NewTransaction().WithPostings( - ledger.NewPosting(fmt.Sprintf("accounts:%d", rand.Intn(10)), fmt.Sprintf("accounts:%d", rand.Intn(5)), "USD", big.NewInt(1)), + ledger.NewPosting(src, dst, "USD", big.NewInt(1)), ) - err = store.CommitTransaction(ctx, &tx) + err = storeCP.CommitTransaction(ctx, &tx) if errors.Is(err, postgres.ErrDeadlockDetected) { require.NoError(t, sqlTx.Rollback()) continue @@ -143,6 +146,7 @@ func TestPostCommitVolumesComputation(t *testing.T) { utils.DumpTables(t, ctx, store.GetDB(), `select * from `+store.GetTableName("moves")+` order by account_address, asset, seq asc limit 500`, + `select * from `+store.GetTableName("transactions"), ) aggregatedBalances, err := store.GetAggregatedBalances(ctx, ledgercontroller.NewGetAggregatedBalancesQuery(ledgercontroller.PITFilter{}, nil, true)) diff --git a/components/ledger/internal/storage/ledger/transactions.go b/components/ledger/internal/storage/ledger/transactions.go index de1b8e657a..347e6afe4e 100644 --- a/components/ledger/internal/storage/ledger/transactions.go +++ b/components/ledger/internal/storage/ledger/transactions.go @@ -375,7 +375,16 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e DestinationsArray: Map(destinations, convertAddrToIndexedJSONB), } - err := s.insertTransaction(ctx, mappedTx) + sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string { + return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", from) + }) + + _, err := s.db.NewRaw(strings.Join(sqlQueries, ";")).Exec(ctx) + if err != nil { + return postgres.ResolveError(err) + } + + err = s.insertTransaction(ctx, mappedTx) if err != nil { return errors.Wrap(err, "failed to insert transaction") } @@ -410,7 +419,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e }...) } - for _, address := range moves.InvolvedAccounts() { + for _, address := range tx.InvolvedAccounts() { _, err := s.upsertAccount(ctx, ledger.Account{ Address: address, FirstUsage: *mappedTx.Timestamp, diff --git a/components/ledger/internal/transaction.go b/components/ledger/internal/transaction.go index 314f439f52..5f2e9dd72a 100644 --- a/components/ledger/internal/transaction.go +++ b/components/ledger/internal/transaction.go @@ -115,13 +115,23 @@ func (tx Transaction) InvolvedAccountAndAssets() map[string][]string { for account, assets := range ret { sort.Strings(assets) - slices.Compact(assets) - ret[account] = assets + ret[account] = slices.Compact(assets) } return ret } +func (tx Transaction) InvolvedAccounts() []string { + ret := make([]string, 0) + for _, posting := range tx.Postings { + ret = append(ret, posting.Source, posting.Destination) + } + + sort.Strings(ret) + + return slices.Compact(ret) +} + func NewTransaction() Transaction { return Transaction{ TransactionData: NewTransactionData(), diff --git a/components/ledger/test/e2e/stress_test.go b/components/ledger/test/e2e/stress_test.go index 63f6d5cf65..6373c24ee3 100644 --- a/components/ledger/test/e2e/stress_test.go +++ b/components/ledger/test/e2e/stress_test.go @@ -2,7 +2,7 @@ package test_suite import ( "fmt" - "github.com/davecgh/go-spew/spew" + "github.com/alitto/pond" ledger "github.com/formancehq/ledger/internal" . "github.com/formancehq/ledger/pkg/testserver" "github.com/formancehq/stack/ledger/client/models/components" @@ -14,8 +14,6 @@ import ( . "github.com/onsi/gomega" "math/big" "math/rand" - "sync" - "time" ) var _ = Context("Ledger stress tests", func() { @@ -33,15 +31,16 @@ var _ = Context("Ledger stress tests", func() { }) const ( - countLedgers = 10 - countTransactions = 50 - countAccounts = 10 + countLedgers = 30 + countBuckets = 3 + countTransactions = 500 + countAccounts = 20 ) When(fmt.Sprintf("creating %d ledgers dispatched on %d buckets", countLedgers, countLedgers/10), func() { BeforeEach(func() { for i := range countLedgers { - bucketName := fmt.Sprintf("bucket%d", i/10) + bucketName := fmt.Sprintf("bucket%d", i/countBuckets) ledgerName := fmt.Sprintf("ledger%d", i) err := CreateLedger(ctx, testServer.GetValue(), operations.V2CreateLedgerRequest{ Ledger: ledgerName, @@ -55,12 +54,10 @@ var _ = Context("Ledger stress tests", func() { }) When(fmt.Sprintf("creating %d transactions across the same account pool", countTransactions), func() { BeforeEach(func() { - wg := sync.WaitGroup{} - wg.Add(countTransactions) + wp := pond.New(80, 80) for range countTransactions { - go func() { + wp.Submit(func() { defer GinkgoRecover() - defer wg.Done() _, err := CreateTransaction(ctx, testServer.GetValue(), operations.V2CreateTransactionRequest{ Ledger: fmt.Sprintf("ledger%d", rand.Intn(countLedgers)), @@ -75,13 +72,15 @@ var _ = Context("Ledger stress tests", func() { Force: pointer.For(true), }) Expect(err).ShouldNot(HaveOccurred()) + }) + go func() { + }() } - wg.Wait() + wp.StopAndWait() }) When("getting aggregated volumes with no parameters", func() { - FIt("should be zero", func() { - <-time.After(2 * time.Second) + It("should be zero", func() { for i := range countLedgers { ledger := fmt.Sprintf("ledger%d", i) By("checking ledger "+ledger, func() { @@ -90,19 +89,17 @@ var _ = Context("Ledger stress tests", func() { UseInsertionDate: pointer.For(true), }) Expect(err).To(BeNil()) - Expect(aggregatedBalances).To(HaveLen(1)) - spew.Dump(aggregatedBalances) - - if aggregatedBalances["USD"].Cmp(big.NewInt(0)) != 0 { + if len(aggregatedBalances) == 0 { // it's random, a ledger could not have been targeted + // just in case, check if the ledger has transactions txs, err := ListTransactions(ctx, testServer.GetValue(), operations.V2ListTransactionsRequest{ Ledger: ledger, - Pit: pointer.For(time.Now().Add(time.Hour * 24)), }) Expect(err).To(BeNil()) - spew.Dump(txs) + Expect(txs.Data).To(HaveLen(0)) + } else { + Expect(aggregatedBalances).To(HaveLen(1)) + Expect(aggregatedBalances["USD"]).To(Equal(big.NewInt(0))) } - - Expect(aggregatedBalances["USD"]).To(Equal(big.NewInt(0))) }) } })