Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Improve performance of proposal builder by not DB inserting active set if not necessary #6422

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (h *HandlerV1) storeAtx(
proof *mwire.MalfeasanceProof
malicious bool
)
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
var err error
malicious, err = identities.IsMalicious(tx, atx.SmesherID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a

// Store an ATX in the DB.
func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx *activationTx) error {
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if len(watx.marriages) != 0 {
newMarriageID, err := marriage.NewID(tx)
if err != nil {
Expand Down Expand Up @@ -927,7 +927,7 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
atxs.AtxAdded(h.cdb, atx)

malicious := false
err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// malfeasance check happens after storing the ATX because storing updates the marriage set
// that is needed for the malfeasance proof
// TODO(mafa): don't store own ATX if it would mark the node as malicious
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/transaction_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestTransactionService_StreamResults(t *testing.T) {
gen := fixture.NewTransactionResultGenerator().
WithAddresses(2)
txs := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txs {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/v2alpha1/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTransactionService_List(t *testing.T) {

gen := fixture.NewTransactionResultGenerator().WithAddresses(2)
txsList := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txsList {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion blocks/certifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (c *Certifier) save(
if len(valid)+len(invalid) == 0 {
return certificates.Add(c.db, lid, cert)
}
return c.db.WithTx(ctx, func(dbtx sql.Transaction) error {
return c.db.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := certificates.Add(dbtx, lid, cert); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func Recover(
}
defer localDB.Close()
logger.Info("clearing atx and malfeasance sync metadata from local database")
if err := localDB.WithTx(ctx, func(tx sql.Transaction) error {
if err := localDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if err := atxsync.Clear(tx); err != nil {
return err
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func RecoverFromLocalFile(
zap.Int("num accounts", len(data.accounts)),
zap.Int("num atxs", len(data.atxs)),
)
if err = newDB.WithTx(ctx, func(tx sql.Transaction) error {
if err = newDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
for _, acct := range data.accounts {
if err = accounts.Update(tx, acct); err != nil {
return fmt.Errorf("restore account snapshot: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/merge-nodes/internal/merge_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func MergeDBs(ctx context.Context, dbLog *zap.Logger, from, to string) error {
}

dbLog.Info("merging databases", zap.String("from", from), zap.String("to", to))
err = dstDB.WithTx(ctx, func(tx sql.Transaction) error {
err = dstDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
enc := func(stmt *sql.Statement) {
stmt.BindText(1, filepath.Join(from, localDbFile))
}
Expand Down
5 changes: 5 additions & 0 deletions common/types/poet.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type PoetServer struct {
Pubkey Base64Enc `mapstructure:"pubkey" json:"pubkey"`
}

func ByteToPoetProofRef(b []byte) (ref PoetProofRef) {
copy(ref[:], b)
return ref
}

type PoetProofRef Hash32

func (r *PoetProofRef) String() string {
Expand Down
6 changes: 2 additions & 4 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,11 @@ func (bs *BlobStore) Has(hint Hint, key []byte) (bool, error) {
case TXDB:
return transactions.Has(bs.DB, types.TransactionID(types.BytesToHash(key)))
case POETDB:
var ref types.PoetProofRef
copy(ref[:], key)
return poets.Has(bs.DB, ref)
return poets.Has(bs.DB, types.ByteToPoetProofRef(key))
case Malfeasance:
return identities.IsMalicious(bs.DB, types.BytesToNodeID(key))
case ActiveSet:
return activesets.Has(bs.DB, key)
return activesets.Has(bs.DB, types.BytesToHash(key))
}
return false, fmt.Errorf("blob store not found %s", hint)
}
2 changes: 1 addition & 1 deletion malfeasance/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *Handler) validateAndSave(ctx context.Context, p *wire.MalfeasanceProof)
return types.EmptyNodeID, errors.Join(err, pubsub.ErrValidationReject)
}
proofBytes := codec.MustEncode(p)
if err := h.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
malicious, err := identities.IsMalicious(dbtx, nodeID)
if err != nil {
return fmt.Errorf("check known malicious: %w", err)
Expand Down
12 changes: 3 additions & 9 deletions mesh/ballotwriter/ballotwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var writerDelay = 100 * time.Millisecond

type BallotWriter struct {
db db
db sql.StateDatabase
logger *zap.Logger

atxMu sync.Mutex
Expand All @@ -30,7 +30,7 @@ type BallotWriter struct {
ballotBatchResult *batchResult
}

func New(db db, logger *zap.Logger) *BallotWriter {
func New(db sql.StateDatabase, logger *zap.Logger) *BallotWriter {
// create a stopped ticker that can be started later
timer := time.NewTicker(writerDelay)
timer.Stop()
Expand Down Expand Up @@ -78,7 +78,7 @@ func (w *BallotWriter) Start(ctx context.Context) {
// we use a context.Background() because: on shutdown the canceling of the
// context may exit the transaction halfway and leave the db in some state where it
// causes crawshaw to panic on a "not all connections returned to pool".
if err := w.db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := w.db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
for _, ballot := range batch {
if !ballot.IsMalicious() {
layerBallotStart := time.Now()
Expand Down Expand Up @@ -163,9 +163,3 @@ type batchResult struct {
doneC chan struct{}
err error
}

type db interface {
sql.Executor

WithTx(context.Context, func(sql.Transaction) error) error
}
6 changes: 3 additions & 3 deletions mesh/ballotwriter/ballotwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
if err := writeFn(a[i], tx); err != nil {
b.Fatal(err)
}
Expand All @@ -138,7 +138,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/1000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 1000); i < (j*1000)+1000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand All @@ -156,7 +156,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/5000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 5000); i < (j*5000)+5000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewMesh(
}

genesis := types.GetEffectiveGenesis()
if err = db.WithTx(context.Background(), func(dbtx sql.Transaction) error {
if err = db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error {
if err = layers.SetProcessed(dbtx, genesis); err != nil {
return fmt.Errorf("mesh init: %w", err)
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error
return fmt.Errorf("execute block %v/%v: %w", layer.Layer, target, err)
}
}
if err := msh.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := msh.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := layers.SetApplied(dbtx, layer.Layer, target); err != nil {
return fmt.Errorf("set applied for %v/%v: %w", layer.Layer, target, err)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func (msh *Mesh) saveHareOutput(ctx context.Context, lid types.LayerID, bid type
certs []certificates.CertValidity
err error
)
if err = msh.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err = msh.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// check if a certificate has been generated or sync'ed.
// - node generated the certificate when it collected enough certify messages
// - hare outputs are processed in layer order. i.e. when hare fails for a previous layer N,
Expand Down
51 changes: 26 additions & 25 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
logger *zap.Logger
cfg config

db sql.Executor
db sql.StateDatabase
localdb sql.Executor
atxsdata atxsData
clock layerClock
Expand Down Expand Up @@ -204,7 +204,7 @@
}
}

// WithWorkersLimit configures paralelization factor for builder operation when working with
// WithWorkersLimit configures parallelization factor for builder operation when working with
// more than one signer.
func WithWorkersLimit(limit int) Opt {
return func(pb *ProposalBuilder) {
Expand Down Expand Up @@ -270,7 +270,7 @@
// New creates a struct of block builder type.
func New(
clock layerClock,
db sql.Executor,
db sql.StateDatabase,
localdb sql.Executor,
atxsdata atxsData,
publisher pubsub.Publisher,
Expand Down Expand Up @@ -449,7 +449,7 @@
pb.activeGen.updateFallback(target, set)
}

func (pb *ProposalBuilder) initSharedData(current types.LayerID) error {
func (pb *ProposalBuilder) initSharedData(ctx context.Context, current types.LayerID) error {
if pb.shared.epoch != current.GetEpoch() {
pb.shared = sharedSession{epoch: current.GetEpoch()}
}
Expand All @@ -476,7 +476,27 @@
pb.shared.active.id = id
pb.shared.active.set = set
pb.shared.active.weight = weight
return nil

// Ideally we only persist the active set when we are actually eligible with at least one identity in at least one
// layer, but since at the moment we use a bootstrapped activeset, `activesets.Has` will always return
// true anyways.
//
// Additionally all activesets that are older than 2 epochs are deleted at the beginning of an epoch anyway, but
// maybe we should revisit this when activesets are no longer bootstrapped.
return pb.db.WithTx(ctx, func(tx sql.Transaction) error {
yes, err := activesets.Has(tx, pb.shared.active.id)
if err != nil {
return err
}

Check warning on line 490 in miner/proposal_builder.go

View check run for this annotation

Codecov / codecov/patch

miner/proposal_builder.go#L489-L490

Added lines #L489 - L490 were not covered by tests
if yes {
return nil
}

return activesets.Add(tx, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
})
}

func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID) error {
Expand Down Expand Up @@ -548,7 +568,7 @@

func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
buildStartTime := time.Now()
if err := pb.initSharedData(lid); err != nil {
if err := pb.initSharedData(ctx, lid); err != nil {
return err
}

Expand Down Expand Up @@ -578,17 +598,6 @@
return meshHash
})

persistActiveSetOnce := sync.OnceValue(func() error {
err := activesets.Add(pb.db, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return err
}
return nil
})

// Two stage pipeline, with the stages running in parallel.
// 1. Initializes signers. Runs limited number of goroutines because the initialization is CPU and DB bound.
// 2. Collects eligible signers' sessions from the stage 1 and creates and publishes proposals.
Expand Down Expand Up @@ -662,14 +671,6 @@
ss.latency.hash = time.Since(start)

eg2.Go(func() error {
// needs to be saved before publishing, as we will query it in handler
if ss.session.ref == types.EmptyBallotID {
start := time.Now()
if err := persistActiveSetOnce(); err != nil {
return err
}
ss.latency.activeSet = time.Since(start)
}
proofs := ss.session.eligibilities.proofs[lid]

start = time.Now()
Expand Down
17 changes: 0 additions & 17 deletions miner/proposal_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package miner
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -1272,19 +1271,3 @@ func BenchmarkDoubleCache(b *testing.B) {

require.Equal(b, types.EmptyATXID, found)
}

func BenchmarkDB(b *testing.B) {
db, err := statesql.Open("file:state.sql")
require.NoError(b, err)
defer db.Close()

bytes, err := hex.DecodeString("00003ce28800fadd692c522f7b1db219f675b49108aec7f818e2c4fd935573f6")
require.NoError(b, err)
nodeID := types.BytesToNodeID(bytes)
var found types.ATXID
b.ResetTimer()
for i := 0; i < b.N; i++ {
found, _ = atxs.GetByEpochAndNodeID(db, 30, nodeID)
}
require.NotEqual(b, types.EmptyATXID, found)
}
13 changes: 5 additions & 8 deletions sql/activesets/activesets.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func Add(db sql.Executor, id types.Hash32, set *types.EpochActiveSet) error {
(id, epoch, active_set)
values (?1, ?2, ?3);`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, id[:])
stmt.BindBytes(1, id.Bytes())
stmt.BindInt64(2, int64(set.Epoch))
stmt.BindBytes(3, codec.MustEncode(set))
}, nil)
Expand Down Expand Up @@ -100,9 +100,7 @@ func getBlob(ctx context.Context, db sql.Executor, id []byte) ([]byte, error) {

func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error {
_, err := db.Exec("delete from activesets where epoch < ?1;",
func(stmt *sql.Statement) {
stmt.BindInt64(1, int64(epoch))
},
func(stmt *sql.Statement) { stmt.BindInt64(1, int64(epoch)) },
nil,
)
if err != nil {
Expand All @@ -111,10 +109,9 @@ func DeleteBeforeEpoch(db sql.Executor, epoch types.EpochID) error {
return nil
}

func Has(db sql.Executor, id []byte) (bool, error) {
rows, err := db.Exec(
"select 1 from activesets where id = ?1;",
func(stmt *sql.Statement) { stmt.BindBytes(1, id) },
func Has(db sql.Executor, id types.Hash32) (bool, error) {
rows, err := db.Exec("select 1 from activesets where id = ?1;",
func(stmt *sql.Statement) { stmt.BindBytes(1, id.Bytes()) },
nil,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions sql/atxs/atxs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func TestGetIDsByEpochCached(t *testing.T) {
require.Equal(t, 11, db.QueryCount())
}

require.NoError(t, db.WithTx(context.Background(), func(tx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx5, types.AtxBlob{})
return nil
}))
Expand All @@ -445,7 +445,7 @@ func TestGetIDsByEpochCached(t *testing.T) {
require.ElementsMatch(t, []types.ATXID{atx4.ID(), atx5.ID()}, ids3)
require.Equal(t, 13, db.QueryCount()) // not incremented after Add

require.Error(t, db.WithTx(context.Background(), func(tx sql.Transaction) error {
require.Error(t, db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
atxs.Add(tx, atx6, types.AtxBlob{})
return errors.New("fail") // rollback
}))
Expand Down
Loading
Loading