fix: hash consistency over high concurrency
gfyrag committed Sep 20, 2024
1 parent de57530 commit 6015146
Showing 7 changed files with 124 additions and 45 deletions.
1 change: 1 addition & 0 deletions components/ledger/go.mod
@@ -81,6 +81,7 @@ require (
github.com/eapache/queue v1.1.0 // indirect
github.com/ericlagergren/decimal v0.0.0-20221120152707-495c53812d05 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/formancehq/formance-sdk-go/v2 v2.3.1 // indirect
github.com/go-chi/chi v4.0.2+incompatible // indirect
github.com/go-chi/render v1.0.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
12 changes: 12 additions & 0 deletions components/ledger/internal/log.go
@@ -89,6 +89,8 @@ func (l Log) ChainLog(previous *Log) Log {
ret.ComputeHash(previous)
if previous != nil {
ret.ID = previous.ID + 1
} else {
ret.ID = 1
}
return ret
}
@@ -267,3 +269,13 @@ func HydrateLog(_type LogType, data []byte) (any, error) {
}

type Accounts map[string]Account

/**
test:
226a7450424330506a507341663049534d49433375506c557a37645a7a445666346e64715370617078556c773d220a7b2274797065223a224e45575f5452414e53414354494f4e222c2264617461223a7b227472616e73616374696f6e223a7b22706f7374696e6773223a6e756c6c2c226d65746164617461223a7b7d2c2274696d657374616d70223a22303030312d30312d30315430303a30303a30305a222c22696e7365727465644174223a22303030312d30312d30315430303a30303a30305a222c226964223a302c227265766572746564223a66616c73657d2c226163636f756e744d65746164617461223a7b7d7d2c2264617465223a22323032342d30392d32305431303a33373a31322e3933303931365a222c226964656d706f74656e63794b6579223a22222c226964223a302c2268617368223a6e756c6c7d0a
Db logs:
\x228ed3c10b43e33ec01fd0848c202dee3e5533edd6730d57f89dda92a5aa71525c227b2274797065223a224e45575f5452414e53414354494f4e222c2264617461223a7b227472616e73616374696f6e223a7b22706f7374696e6773223a6e756c6c2c226d65746164617461223a7b7d2c2274696d657374616d70223a22303030312d30312d30315430303a30303a30305a222c22696e7365727465644174223a22303030312d30312d30315430303a30303a30305a222c226964223a302c227265766572746564223a66616c73657d2c226163636f756e744d65746164617461223a7b7d7d2c2264617465223a22323032342d30392d32305431303a33373a31322e3933303931365a222c226964656d706f74656e63794b6579223a22222c226964223a302c2268617368223a6e756c6c7d0a
*/
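
Both payloads in the comment above are hex-encoded. A throwaway helper (hypothetical, not part of this commit) can decode one for comparison:

package main

import (
	"encoding/hex"
	"fmt"
	"os"
	"strings"
)

// Decode one of the hex payloads above (the db variant is prefixed with \x) and
// print the raw bytes that were hashed: a quoted previous hash followed by the
// compacted JSON representation of the log. Throwaway helper, not part of the commit.
func main() {
	raw, err := hex.DecodeString(strings.TrimPrefix(os.Args[1], `\x`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s", raw)
}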
@@ -183,27 +183,31 @@ declare
previousHash bytea;
marshalledAsJSON varchar;
begin
-- we lock the logs table so that the last log does not change until the transaction commits
execute pg_advisory_xact_lock(hashtext(new.ledger));

select hash into previousHash
from "{{.Bucket}}".logs
where ledger = '{{.Name}}'
where ledger = new.ledger
order by seq desc
limit 1;

-- select only the fields participating in the hash on the backend and format the json representation the same way
select public.json_compact(json_build_object(
'type', new.type,
'data', new.data,
'date', to_json(new.date::timestamp)#>>'{}' || 'Z',
'idempotencyKey', coalesce(new.idempotency_key, ''),
'id', 0,
'hash', null
)) into marshalledAsJSON;

new.hash = case when previousHash is null then marshalledAsJSON::bytea else previousHash || marshalledAsJSON::bytea end;
new.hash = (select public.digest(marshalledAsJSON || E'\n', 'sha256'::text));
'type', new.type,
'data', new.data,
'date', to_json(new.date::timestamp)#>>'{}' || 'Z',
'idempotencyKey', coalesce(new.idempotency_key, ''),
'id', 0,
'hash', null
)) into marshalledAsJSON;

new.hash = (
select public.digest(
case
when previousHash is null
then marshalledAsJSON::bytea
else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || marshalledAsJSON::bytea
end || E'\n', 'sha256'::text
)
);

return new;
end;
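
For readers of the trigger above: the previous hash, when present, is prepended as a quoted base64 string plus a newline, and the whole payload is newline-terminated before sha256, which is what lets the Go core reproduce the same digest. A minimal standalone sketch of that scheme (illustration only, not the project's code; the JSON literals are made up):

package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// chainHash reproduces the payload layout of the trigger above:
//   first log:       sha256(marshalledJSON + "\n")
//   subsequent logs: sha256("\"" + base64(previousHash) + "\"\n" + marshalledJSON + "\n")
func chainHash(previousHash, marshalledJSON []byte) []byte {
	var payload []byte
	if previousHash != nil {
		payload = append(payload, '"')
		payload = append(payload, base64.StdEncoding.EncodeToString(previousHash)...)
		payload = append(payload, '"', '\n')
	}
	payload = append(payload, marshalledJSON...)
	payload = append(payload, '\n')
	sum := sha256.Sum256(payload)
	return sum[:]
}

func main() {
	first := chainHash(nil, []byte(`{"type":"NEW_TRANSACTION","data":{}}`))
	second := chainHash(first, []byte(`{"type":"NEW_TRANSACTION","data":{}}`))
	fmt.Printf("%x\n%x\n", first, second)
}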
8 changes: 8 additions & 0 deletions components/ledger/internal/storage/ledger/logs.go
@@ -64,6 +64,14 @@ func (j RawMessage) Value() (driver.Value, error) {

func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error {

// we lock the logs table so that the last log does not change until the transaction commits
if s.ledger.HasFeature(ledger.FeatureHashLogs, "SYNC") {
_, err := s.db.NewRaw(`select pg_advisory_xact_lock(hashtext(?))`, s.ledger.Name).Exec(ctx)
if err != nil {
return postgres.ResolveError(err)
}
}

_, err := tracer.TraceWithLatency(ctx, "InsertLog", tracer.NoResult(func(ctx context.Context) error {
data, err := json.Marshal(log.Data)
if err != nil {
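Why the lock above fixes the race — a sketch with plain database/sql, not the store's actual API: every writer on the same ledger takes the same transaction-scoped advisory lock, so reading the previous hash and inserting the new log cannot interleave between two concurrent transactions.

package locking

import (
	"context"
	"database/sql"
)

// insertSerialized sketches the effect of the advisory lock taken in InsertLog:
// all writers for a ledger acquire the same transaction-scoped key, and the lock
// is released automatically at COMMIT or ROLLBACK.
func insertSerialized(ctx context.Context, db *sql.DB, ledgerName string, insert func(*sql.Tx) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }()

	// hashtext maps the ledger name to a stable integer lock key, as in the store above.
	if _, err := tx.ExecContext(ctx, `select pg_advisory_xact_lock(hashtext($1))`, ledgerName); err != nil {
		return err
	}
	if err := insert(tx); err != nil {
		return err
	}
	return tx.Commit()
}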
114 changes: 83 additions & 31 deletions components/ledger/internal/storage/ledger/logs_test.go
@@ -4,6 +4,8 @@ package ledger

import (
"context"
"database/sql"
"github.com/alitto/pond"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/pkg/errors"
"math/big"
@@ -26,41 +28,91 @@ func TestInsertLog(t *testing.T) {
store := newLedgerStore(t)
ctx := logging.TestingContext()

// insert a first tx (we don't have any previous hash to use at this moment)
logTx := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
logTxCopy := logTx
t.Run("check hash against core", func(t *testing.T) {
// insert a first tx (we don't have any previous hash to use at this moment)
log1 := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
log1Copy := log1

err := store.InsertLog(ctx, &logTx)
require.NoError(t, err)
err := store.InsertLog(ctx, &log1)
require.NoError(t, err)

require.NotZero(t, logTx.ID)
require.NotZero(t, logTx.Hash)
require.Equal(t, 1, log1.ID)
require.NotZero(t, log1.Hash)

// ensure that the database hashing is the same as the go hashing
chainedLogFromCore := logTxCopy.ChainLog(nil)
require.Equal(t, chainedLogFromCore.Hash, logTx.Hash)
// ensure that the database hashing is the same as the go hashing
chainedLog1 := log1Copy.ChainLog(nil)
require.Equal(t, chainedLog1.Hash, log1.Hash)

// insert a new log to test the hash when a previous hash exists
// adding an idempotency key to check for conflicts
logTx = ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}).
WithIdempotencyKey("foo")
logTxCopy = logTx
err = store.InsertLog(ctx, &logTx)
require.NoError(t, err)
require.NotZero(t, logTx.ID)
require.NotZero(t, logTx.Hash)

// ensure that the database hashing is the same as the go hashing
chainedLogFromCore = logTxCopy.ChainLog(nil)
require.Equal(t, chainedLogFromCore.Hash, logTx.Hash)

// create a new log with the same IK as the previous one
// it should fail
logTx = ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}).
WithIdempotencyKey("foo")
err = store.InsertLog(ctx, &logTx)
require.Error(t, err)
require.True(t, errors.Is(err, ledgercontroller.ErrDuplicateIK{}))
// insert a new log to test the hash when a previous hash exists
log2 := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
log2Copy := log2
err = store.InsertLog(ctx, &log2)
require.NoError(t, err)
require.Equal(t, 2, log2.ID)
require.NotZero(t, log2.Hash)

// ensure that the database hashing is the same as the go hashing
chainedLog2 := log2Copy.ChainLog(&log1)
require.Equal(t, chainedLog2.Hash, log2.Hash)
})

t.Run("duplicate IK", func(t *testing.T) {
// insert a first tx (we don't have any previous hash to use at this moment)
logTx := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}).
WithIdempotencyKey("foo")

err := store.InsertLog(ctx, &logTx)
require.NoError(t, err)

require.NotZero(t, logTx.ID)
require.NotZero(t, logTx.Hash)

// create a new log with the same IK as the previous one
// it should fail
logTx = ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{}).
WithIdempotencyKey("foo")
err = store.InsertLog(ctx, &logTx)
require.Error(t, err)
require.True(t, errors.Is(err, ledgercontroller.ErrDuplicateIK{}))
})

t.Run("hash consistency over high concurrency", func(t *testing.T) {
wp := pond.New(10, 10)
const countLogs = 100
for range countLogs {
wp.Submit(func() {
tx, err := store.db.BeginTx(ctx, &sql.TxOptions{})
require.NoError(t, err)
defer func() {
_ = tx.Rollback()
}()
store := store.WithDB(tx)

logTx := ledger.NewTransactionLog(ledger.NewTransaction(), map[string]metadata.Metadata{})
err = store.InsertLog(ctx, &logTx)
require.NoError(t, err)
require.NoError(t, tx.Commit())
})
}
wp.StopAndWait()

logs, err := store.GetLogs(ctx, ledgercontroller.NewGetLogsQuery(ledgercontroller.PaginatedQueryOptions[any]{
PageSize: countLogs,
}).WithOrder(bunpaginate.OrderAsc))
require.NoError(t, err)

var previous *ledger.Log
for _, log := range logs.Data {
expectedHash := log.Hash
expectedID := log.ID
log.Hash = nil
log.ID = 0
chainedLog := log.ChainLog(previous)
require.Equal(t, expectedHash, chainedLog.Hash, "checking log hash %d", expectedID)
previous = &chainedLog
}
})
}

func TestReadLogWithIdempotencyKey(t *testing.T) {
1 change: 1 addition & 0 deletions components/ledger/internal/storage/ledger/transactions.go
@@ -375,6 +375,7 @@ func (s *Store) CommitTransaction(ctx context.Context, tx *ledger.Transaction) e
DestinationsArray: Map(destinations, convertAddrToIndexedJSONB),
}

// todo: can cause conflict with accounts on other ledgers, we need to use a stronger key than the account address
sqlQueries := Map(tx.InvolvedAccounts(), func(from string) string {
return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s'))", from)
})
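
A possible direction for the todo above (a hypothetical sketch, not part of this commit): derive the lock key from the ledger name and the account address together, for example:

package main

import "fmt"

// accountLockQuery illustrates one way to build a per-ledger, per-account advisory
// lock statement so two ledgers containing the same account address do not contend
// on the same key. Hypothetical only; the committed code keys on the address alone.
func accountLockQuery(ledgerName, address string) string {
	return fmt.Sprintf("select pg_advisory_xact_lock(hashtext('%s:%s'))", ledgerName, address)
}

func main() {
	fmt.Println(accountLockQuery("ledger1", "orders:main"))
}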
1 change: 1 addition & 0 deletions libs/go-libs/testing/platform/pgtesting/postgres.go
@@ -228,6 +228,7 @@ func CreatePostgresServer(t T, pool *docker.Pool, opts ...Option) *Server {
"-c", "enable_partitionwise_aggregate=on",
"-c", "shared_preload_libraries=auto_explain,pg_stat_statements",
"-c", "log_lock_waits=on",
"-c", "log_min_messages=info",
},
},
CheckFn: func(ctx context.Context, resource *dockertest.Resource) error {
