Skip to content

Commit

Permalink
fix: concurrency issue
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Sep 20, 2024
1 parent bd66dde commit 941532e
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ create function "{{.Bucket}}".set_volumes()
as
$$
begin
--todo: debugging
perform pg_advisory_xact_lock(new.accounts_seq);

new.post_commit_volumes = coalesce((
select (
(post_commit_volumes).inputs + case when new.is_source then 0 else new.amount end,
Expand Down
2 changes: 1 addition & 1 deletion components/ledger/internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *Store) SelectAggregatedBalances(date *time.Time, useInsertionDate bool,
}
selectMoves = s.db.NewSelect().
TableExpr("(?) moves", s.SelectDistinctMovesBySeq(date)).
Column("asset", "account_address").
Column("asset", "account_address", "account_address_array").
ColumnExpr("post_commit_volumes as volumes")
} else {
if !s.ledger.HasFeature(ledger.FeaturePostCommitEffectiveVolumes, "SYNC") {
Expand Down
7 changes: 4 additions & 3 deletions components/ledger/internal/storage/ledger/moves.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package ledger

import (
"context"
"math/big"
"slices"

"github.com/formancehq/ledger/internal/opentelemetry/tracer"
"github.com/formancehq/stack/libs/go-libs/bun/bunpaginate"
"github.com/formancehq/stack/libs/go-libs/collectionutils"
"github.com/formancehq/stack/libs/go-libs/platform/postgres"
"github.com/formancehq/stack/libs/go-libs/time"
"github.com/uptrace/bun"
"math/big"
"slices"
)

type Move struct {
Expand Down Expand Up @@ -77,6 +76,7 @@ func (s *Store) SelectDistinctMovesBySeq(date *time.Time) *bun.SelectQuery {
TableExpr("(?) moves", s.SortMovesBySeq(date)).
DistinctOn("accounts_seq, account_address, asset").
Column("accounts_seq", "account_address", "asset").
ColumnExpr("first_value(account_address_array) over (partition by (accounts_seq, account_address, asset) order by seq desc) as account_address_array").
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_seq, account_address, asset) order by seq desc) as post_commit_volumes").
Where("ledger = ?", s.ledger.Name)

Expand Down Expand Up @@ -105,6 +105,7 @@ func (s *Store) SelectDistinctMovesByEffectiveDate(date *time.Time) *bun.SelectQ
}

func (s *Store) insertMoves(ctx context.Context, moves ...Move) error {

_, err := tracer.TraceWithLatency(ctx, "InsertMoves", tracer.NoResult(func(ctx context.Context) error {
_, err := s.db.NewInsert().
With("_rows", s.db.NewValues(&moves)).
Expand Down
12 changes: 8 additions & 4 deletions components/ledger/internal/storage/ledger/moves_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,20 @@ func TestPostCommitVolumesComputation(t *testing.T) {
ctx := logging.TestingContext()

wp := pond.New(10, 10)
for i := 0; i < 10; i++ {
for i := 0; i < 1000; i++ {
wp.Submit(func() {
for {
sqlTx, err := store.GetDB().BeginTx(ctx, &sql.TxOptions{})
require.NoError(t, err)
store := store.WithDB(sqlTx)
storeCP := store.WithDB(sqlTx)

src := fmt.Sprintf("accounts:%d", rand.Intn(1000000))
dst := fmt.Sprintf("accounts:%d", rand.Intn(1000000))

tx := ledger.NewTransaction().WithPostings(
ledger.NewPosting(fmt.Sprintf("accounts:%d", rand.Intn(10)), fmt.Sprintf("accounts:%d", rand.Intn(5)), "USD", big.NewInt(1)),
ledger.NewPosting(src, dst, "USD", big.NewInt(1)),
)
err = store.CommitTransaction(ctx, &tx)
err = storeCP.CommitTransaction(ctx, &tx)
if errors.Is(err, postgres.ErrDeadlockDetected) {
require.NoError(t, sqlTx.Rollback())
continue
Expand All @@ -143,6 +146,7 @@ func TestPostCommitVolumesComputation(t *testing.T) {

utils.DumpTables(t, ctx, store.GetDB(),
`select * from `+store.GetTableName("moves")+` order by account_address, asset, seq asc limit 500`,
`select * from `+store.GetTableName("transactions"),
)

aggregatedBalances, err := store.GetAggregatedBalances(ctx, ledgercontroller.NewGetAggregatedBalancesQuery(ledgercontroller.PITFilter{}, nil, true))
Expand Down
13 changes: 11 additions & 2 deletions components/ledger/internal/storage/ledger/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,16 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
DestinationsArray: Map(destinations, convertAddrToIndexedJSONB),
}

err := s.insertTransaction(ctx, mappedTx)
sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string {
return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", from)
})

_, err := s.db.NewRaw(strings.Join(sqlQueries, ";")).Exec(ctx)
if err != nil {
return postgres.ResolveError(err)
}

err = s.insertTransaction(ctx, mappedTx)
if err != nil {
return errors.Wrap(err, "failed to insert transaction")
}
Expand Down Expand Up @@ -410,7 +419,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
}...)
}

for _, address := range moves.InvolvedAccounts() {
for _, address := range tx.InvolvedAccounts() {
_, err := s.upsertAccount(ctx, ledger.Account{
Address: address,
FirstUsage: *mappedTx.Timestamp,
Expand Down
14 changes: 12 additions & 2 deletions components/ledger/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,23 @@ func (tx Transaction) InvolvedAccountAndAssets() map[string][]string {

for account, assets := range ret {
sort.Strings(assets)
slices.Compact(assets)
ret[account] = assets
ret[account] = slices.Compact(assets)
}

return ret
}

func (tx Transaction) InvolvedAccounts() []string {
ret := make([]string, 0)
for _, posting := range tx.Postings {
ret = append(ret, posting.Source, posting.Destination)
}

sort.Strings(ret)

return slices.Compact(ret)
}

func NewTransaction() Transaction {
return Transaction{
TransactionData: NewTransactionData(),
Expand Down
41 changes: 19 additions & 22 deletions components/ledger/test/e2e/stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package test_suite

import (
"fmt"
"github.com/davecgh/go-spew/spew"
"github.com/alitto/pond"
ledger "github.com/formancehq/ledger/internal"
. "github.com/formancehq/ledger/pkg/testserver"
"github.com/formancehq/stack/ledger/client/models/components"
Expand All @@ -14,8 +14,6 @@ import (
. "github.com/onsi/gomega"
"math/big"
"math/rand"
"sync"
"time"
)

var _ = Context("Ledger stress tests", func() {
Expand All @@ -33,15 +31,16 @@ var _ = Context("Ledger stress tests", func() {
})

const (
countLedgers = 10
countTransactions = 50
countAccounts = 10
countLedgers = 30
countBuckets = 3
countTransactions = 500
countAccounts = 20
)

When(fmt.Sprintf("creating %d ledgers dispatched on %d buckets", countLedgers, countLedgers/10), func() {
BeforeEach(func() {
for i := range countLedgers {
bucketName := fmt.Sprintf("bucket%d", i/10)
bucketName := fmt.Sprintf("bucket%d", i/countBuckets)
ledgerName := fmt.Sprintf("ledger%d", i)
err := CreateLedger(ctx, testServer.GetValue(), operations.V2CreateLedgerRequest{
Ledger: ledgerName,
Expand All @@ -55,12 +54,10 @@ var _ = Context("Ledger stress tests", func() {
})
When(fmt.Sprintf("creating %d transactions across the same account pool", countTransactions), func() {
BeforeEach(func() {
wg := sync.WaitGroup{}
wg.Add(countTransactions)
wp := pond.New(80, 80)
for range countTransactions {
go func() {
wp.Submit(func() {
defer GinkgoRecover()
defer wg.Done()

_, err := CreateTransaction(ctx, testServer.GetValue(), operations.V2CreateTransactionRequest{
Ledger: fmt.Sprintf("ledger%d", rand.Intn(countLedgers)),
Expand All @@ -75,13 +72,15 @@ var _ = Context("Ledger stress tests", func() {
Force: pointer.For(true),
})
Expect(err).ShouldNot(HaveOccurred())
})
go func() {

}()
}
wg.Wait()
wp.StopAndWait()
})
When("getting aggregated volumes with no parameters", func() {
FIt("should be zero", func() {
<-time.After(2 * time.Second)
It("should be zero", func() {
for i := range countLedgers {
ledger := fmt.Sprintf("ledger%d", i)
By("checking ledger "+ledger, func() {
Expand All @@ -90,19 +89,17 @@ var _ = Context("Ledger stress tests", func() {
UseInsertionDate: pointer.For(true),
})
Expect(err).To(BeNil())
Expect(aggregatedBalances).To(HaveLen(1))
spew.Dump(aggregatedBalances)

if aggregatedBalances["USD"].Cmp(big.NewInt(0)) != 0 {
if len(aggregatedBalances) == 0 { // it's random, a ledger could not have been targeted
// just in case, check if the ledger has transactions
txs, err := ListTransactions(ctx, testServer.GetValue(), operations.V2ListTransactionsRequest{
Ledger: ledger,
Pit: pointer.For(time.Now().Add(time.Hour * 24)),
})
Expect(err).To(BeNil())
spew.Dump(txs)
Expect(txs.Data).To(HaveLen(0))
} else {
Expect(aggregatedBalances).To(HaveLen(1))
Expect(aggregatedBalances["USD"]).To(Equal(big.NewInt(0)))
}

Expect(aggregatedBalances["USD"]).To(Equal(big.NewInt(0)))
})
}
})
Expand Down

0 comments on commit 941532e

Please sign in to comment.