Skip to content

Commit

Permalink
Merge branch 'node-split-poc' into node-split/node-service
Browse files Browse the repository at this point in the history
  • Loading branch information
acud authored Oct 30, 2024
2 parents 98cac64 + 55e311b commit 4d6b25a
Show file tree
Hide file tree
Showing 30 changed files with 112 additions and 132 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ See [RELEASE](./RELEASE.md) for workflow instructions.
* [#6417](https://github.com/spacemeshos/go-spacemesh/pull/6417) Fix initial post being deleted when the node is
restarted or times out before the first ATX is published.

* [#6422](https://github.com/spacemeshos/go-spacemesh/pull/6422) Further improved performance of the proposal building
process to avoid late proposals.

## v1.7.6

### Upgrade information
Expand Down
2 changes: 1 addition & 1 deletion activation/handler_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (h *HandlerV1) storeAtx(
proof *mwire.MalfeasanceProof
malicious bool
)
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
var err error
malicious, err = identities.IsMalicious(tx, atx.SmesherID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions activation/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ func (h *HandlerV2) checkPrevAtx(ctx context.Context, tx sql.Transaction, atx *a

// Store an ATX in the DB.
func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx *activationTx) error {
if err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if len(watx.marriages) != 0 {
newMarriageID, err := marriage.NewID(tx)
if err != nil {
Expand Down Expand Up @@ -927,7 +927,7 @@ func (h *HandlerV2) storeAtx(ctx context.Context, atx *types.ActivationTx, watx
atxs.AtxAdded(h.cdb, atx)

malicious := false
err := h.cdb.WithTx(ctx, func(tx sql.Transaction) error {
err := h.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// malfeasance check happens after storing the ATX because storing updates the marriage set
// that is needed for the malfeasance proof
// TODO(mafa): don't store own ATX if it would mark the node as malicious
Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/transaction_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestTransactionService_StreamResults(t *testing.T) {
gen := fixture.NewTransactionResultGenerator().
WithAddresses(2)
txs := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txs {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion api/grpcserver/v2alpha1/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTransactionService_List(t *testing.T) {

gen := fixture.NewTransactionResultGenerator().WithAddresses(2)
txsList := make([]types.TransactionWithResult, 100)
require.NoError(t, db.WithTx(ctx, func(dtx sql.Transaction) error {
require.NoError(t, db.WithTxImmediate(ctx, func(dtx sql.Transaction) error {
for i := range txsList {
tx := gen.Next()

Expand Down
2 changes: 1 addition & 1 deletion blocks/certifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (c *Certifier) save(
if len(valid)+len(invalid) == 0 {
return certificates.Add(c.db, lid, cert)
}
return c.db.WithTx(ctx, func(dbtx sql.Transaction) error {
return c.db.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := certificates.Add(dbtx, lid, cert); err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion blocks/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func createTestGenerator(tb testing.TB) *testGenerator {
tg.mockPatrol,
WithGeneratorLogger(lg),
WithHareOutputChan(ch),
WithConfig(testConfig()))
WithConfig(testConfig()),
)
return tg
}

Expand Down
4 changes: 2 additions & 2 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func Recover(
}
defer localDB.Close()
logger.Info("clearing atx and malfeasance sync metadata from local database")
if err := localDB.WithTx(ctx, func(tx sql.Transaction) error {
if err := localDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
if err := atxsync.Clear(tx); err != nil {
return err
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func RecoverFromLocalFile(
zap.Int("num accounts", len(data.accounts)),
zap.Int("num atxs", len(data.atxs)),
)
if err = newDB.WithTx(ctx, func(tx sql.Transaction) error {
if err = newDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
for _, acct := range data.accounts {
if err = accounts.Update(tx, acct); err != nil {
return fmt.Errorf("restore account snapshot: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/merge-nodes/internal/merge_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func MergeDBs(ctx context.Context, dbLog *zap.Logger, from, to string) error {
}

dbLog.Info("merging databases", zap.String("from", from), zap.String("to", to))
err = dstDB.WithTx(ctx, func(tx sql.Transaction) error {
err = dstDB.WithTxImmediate(ctx, func(tx sql.Transaction) error {
enc := func(stmt *sql.Statement) {
stmt.BindText(1, filepath.Join(from, localDbFile))
}
Expand Down
5 changes: 5 additions & 0 deletions common/types/poet.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type PoetServer struct {
Pubkey Base64Enc `mapstructure:"pubkey" json:"pubkey"`
}

func ByteToPoetProofRef(b []byte) (ref PoetProofRef) {
copy(ref[:], b)
return ref
}

type PoetProofRef Hash32

func (r *PoetProofRef) String() string {
Expand Down
6 changes: 2 additions & 4 deletions datastore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,11 @@ func (bs *BlobStore) Has(hint Hint, key []byte) (bool, error) {
case TXDB:
return transactions.Has(bs.DB, types.TransactionID(types.BytesToHash(key)))
case POETDB:
var ref types.PoetProofRef
copy(ref[:], key)
return poets.Has(bs.DB, ref)
return poets.Has(bs.DB, types.ByteToPoetProofRef(key))
case Malfeasance:
return identities.IsMalicious(bs.DB, types.BytesToNodeID(key))
case ActiveSet:
return activesets.Has(bs.DB, key)
return activesets.Has(bs.DB, types.BytesToHash(key))
}
return false, fmt.Errorf("blob store not found %s", hint)
}
2 changes: 1 addition & 1 deletion malfeasance/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *Handler) validateAndSave(ctx context.Context, p *wire.MalfeasanceProof)
return types.EmptyNodeID, errors.Join(err, pubsub.ErrValidationReject)
}
proofBytes := codec.MustEncode(p)
if err := h.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := h.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
malicious, err := identities.IsMalicious(dbtx, nodeID)
if err != nil {
return fmt.Errorf("check known malicious: %w", err)
Expand Down
12 changes: 3 additions & 9 deletions mesh/ballotwriter/ballotwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
var writerDelay = 100 * time.Millisecond

type BallotWriter struct {
db db
db sql.StateDatabase
logger *zap.Logger

atxMu sync.Mutex
Expand All @@ -30,7 +30,7 @@ type BallotWriter struct {
ballotBatchResult *batchResult
}

func New(db db, logger *zap.Logger) *BallotWriter {
func New(db sql.StateDatabase, logger *zap.Logger) *BallotWriter {
// create a stopped ticker that can be started later
timer := time.NewTicker(writerDelay)
timer.Stop()
Expand Down Expand Up @@ -78,7 +78,7 @@ func (w *BallotWriter) Start(ctx context.Context) {
// we use a context.Background() because: on shutdown the canceling of the
// context may exit the transaction halfway and leave the db in some state where it
// causes crawshaw to panic on a "not all connections returned to pool".
if err := w.db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := w.db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
for _, ballot := range batch {
if !ballot.IsMalicious() {
layerBallotStart := time.Now()
Expand Down Expand Up @@ -163,9 +163,3 @@ type batchResult struct {
doneC chan struct{}
err error
}

type db interface {
sql.Executor

WithTx(context.Context, func(sql.Transaction) error) error
}
6 changes: 3 additions & 3 deletions mesh/ballotwriter/ballotwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
if err := writeFn(a[i], tx); err != nil {
b.Fatal(err)
}
Expand All @@ -138,7 +138,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/1000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 1000); i < (j*1000)+1000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand All @@ -156,7 +156,7 @@ func BenchmarkWriteCoalescing(b *testing.B) {
db := newDiskSqlite(b)
b.ResetTimer()
for j := 0; j < b.N/5000; j++ {
if err := db.WithTx(context.Background(), func(tx sql.Transaction) error {
if err := db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
var err error
for i := (j * 5000); i < (j*5000)+5000; i++ {
if err = writeFn(a[i], tx); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewMesh(
}

genesis := types.GetEffectiveGenesis()
if err = db.WithTx(context.Background(), func(dbtx sql.Transaction) error {
if err = db.WithTxImmediate(context.Background(), func(dbtx sql.Transaction) error {
if err = layers.SetProcessed(dbtx, genesis); err != nil {
return fmt.Errorf("mesh init: %w", err)
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error
return fmt.Errorf("execute block %v/%v: %w", layer.Layer, target, err)
}
}
if err := msh.cdb.WithTx(ctx, func(dbtx sql.Transaction) error {
if err := msh.cdb.WithTxImmediate(ctx, func(dbtx sql.Transaction) error {
if err := layers.SetApplied(dbtx, layer.Layer, target); err != nil {
return fmt.Errorf("set applied for %v/%v: %w", layer.Layer, target, err)
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func (msh *Mesh) saveHareOutput(ctx context.Context, lid types.LayerID, bid type
certs []certificates.CertValidity
err error
)
if err = msh.cdb.WithTx(ctx, func(tx sql.Transaction) error {
if err = msh.cdb.WithTxImmediate(ctx, func(tx sql.Transaction) error {
// check if a certificate has been generated or sync'ed.
// - node generated the certificate when it collected enough certify messages
// - hare outputs are processed in layer order. i.e. when hare fails for a previous layer N,
Expand Down
12 changes: 5 additions & 7 deletions miner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ type latencyTracker struct {
start time.Time
end time.Time

data time.Duration
tortoise time.Duration
hash time.Duration
activeSet time.Duration
txs time.Duration
publish time.Duration
data time.Duration
tortoise time.Duration
hash time.Duration
txs time.Duration
publish time.Duration
}

func (lt *latencyTracker) total() time.Duration {
Expand All @@ -35,7 +34,6 @@ func (lt *latencyTracker) total() time.Duration {

func (lt *latencyTracker) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddDuration("data", lt.data)
encoder.AddDuration("active set", lt.activeSet)
encoder.AddDuration("tortoise", lt.tortoise)
encoder.AddDuration("hash", lt.hash)
encoder.AddDuration("txs", lt.txs)
Expand Down
51 changes: 26 additions & 25 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type ProposalBuilder struct {
logger *zap.Logger
cfg config

db sql.Executor
db sql.StateDatabase
localdb sql.Executor
atxsdata atxsData
clock layerClock
Expand Down Expand Up @@ -204,7 +204,7 @@ func WithLayerSize(size uint32) Opt {
}
}

// WithWorkersLimit configures paralelization factor for builder operation when working with
// WithWorkersLimit configures parallelization factor for builder operation when working with
// more than one signer.
func WithWorkersLimit(limit int) Opt {
return func(pb *ProposalBuilder) {
Expand Down Expand Up @@ -270,7 +270,7 @@ func WithActivesetPreparation(prep ActiveSetPreparation) Opt {
// New creates a struct of block builder type.
func New(
clock layerClock,
db sql.Executor,
db sql.StateDatabase,
localdb sql.Executor,
atxsdata atxsData,
publisher pubsub.Publisher,
Expand Down Expand Up @@ -449,7 +449,7 @@ func (pb *ProposalBuilder) UpdateActiveSet(target types.EpochID, set []types.ATX
pb.activeGen.updateFallback(target, set)
}

func (pb *ProposalBuilder) initSharedData(current types.LayerID) error {
func (pb *ProposalBuilder) initSharedData(ctx context.Context, current types.LayerID) error {
if pb.shared.epoch != current.GetEpoch() {
pb.shared = sharedSession{epoch: current.GetEpoch()}
}
Expand All @@ -476,7 +476,27 @@ func (pb *ProposalBuilder) initSharedData(current types.LayerID) error {
pb.shared.active.id = id
pb.shared.active.set = set
pb.shared.active.weight = weight
return nil

// Ideally we only persist the active set when we are actually eligible with at least one identity in at least one
// layer, but since at the moment we use a bootstrapped activeset, `activesets.Has` will always return
// true anyways.
//
// Additionally all activesets that are older than 2 epochs are deleted at the beginning of an epoch anyway, but
// maybe we should revisit this when activesets are no longer bootstrapped.
return pb.db.WithTx(ctx, func(tx sql.Transaction) error {
yes, err := activesets.Has(tx, pb.shared.active.id)
if err != nil {
return err
}
if yes {
return nil
}

return activesets.Add(tx, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
})
}

func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID) error {
Expand Down Expand Up @@ -548,7 +568,7 @@ func (pb *ProposalBuilder) initSignerData(ss *signerSession, lid types.LayerID)

func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
buildStartTime := time.Now()
if err := pb.initSharedData(lid); err != nil {
if err := pb.initSharedData(ctx, lid); err != nil {
return err
}

Expand Down Expand Up @@ -578,17 +598,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
return meshHash
})

persistActiveSetOnce := sync.OnceValue(func() error {
err := activesets.Add(pb.db, pb.shared.active.id, &types.EpochActiveSet{
Epoch: pb.shared.epoch,
Set: pb.shared.active.set,
})
if err != nil && !errors.Is(err, sql.ErrObjectExists) {
return err
}
return nil
})

// Two stage pipeline, with the stages running in parallel.
// 1. Initializes signers. Runs limited number of goroutines because the initialization is CPU and DB bound.
// 2. Collects eligible signers' sessions from the stage 1 and creates and publishes proposals.
Expand Down Expand Up @@ -662,14 +671,6 @@ func (pb *ProposalBuilder) build(ctx context.Context, lid types.LayerID) error {
ss.latency.hash = time.Since(start)

eg2.Go(func() error {
// needs to be saved before publishing, as we will query it in handler
if ss.session.ref == types.EmptyBallotID {
start := time.Now()
if err := persistActiveSetOnce(); err != nil {
return err
}
ss.latency.activeSet = time.Since(start)
}
proofs := ss.session.eligibilities.proofs[lid]

start = time.Now()
Expand Down
17 changes: 0 additions & 17 deletions miner/proposal_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package miner
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math/rand"
Expand Down Expand Up @@ -1272,19 +1271,3 @@ func BenchmarkDoubleCache(b *testing.B) {

require.Equal(b, types.EmptyATXID, found)
}

func BenchmarkDB(b *testing.B) {
db, err := statesql.Open("file:state.sql")
require.NoError(b, err)
defer db.Close()

bytes, err := hex.DecodeString("00003ce28800fadd692c522f7b1db219f675b49108aec7f818e2c4fd935573f6")
require.NoError(b, err)
nodeID := types.BytesToNodeID(bytes)
var found types.ATXID
b.ResetTimer()
for i := 0; i < b.N; i++ {
found, _ = atxs.GetByEpochAndNodeID(db, 30, nodeID)
}
require.NotEqual(b, types.EmptyATXID, found)
}
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,7 @@ func (app *App) initServices(ctx context.Context) error {
app.localDB,
atxService,
atxPublisher,

app.validator,
nipostBuilder,
app.clock,
Expand Down
Loading

0 comments on commit 4d6b25a

Please sign in to comment.