From b9b57496dd7564bbbfb8618b9a5f0a67d4be3106 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Toni=20Ram=C3=ADrez?= <58293609+ToniRamirezM@users.noreply.github.com> Date: Mon, 19 Aug 2024 14:28:16 +0200 Subject: [PATCH] feat: Update seq sender and aggregator (#38) * feat: bring latest changes --- aggregator/aggregator.go | 203 ++++++--- aggregator/config.go | 9 + aggregator/db/migrations/0002.sql | 8 + aggregator/db/migrations/003.sql | 7 + aggregator/interfaces.go | 4 +- aggregator/prover/prover.go | 33 +- config/default.go | 3 + etherman/types.go | 19 +- go.mod | 4 +- go.sum | 8 +- sequencesender/config.go | 3 + sequencesender/seqsendertypes/types.go | 3 +- sequencesender/sequencesender.go | 415 ++++++++++++------ sequencesender/sequencesender_test.go | 8 + sequencesender/txbuilder/banana_types.go | 13 +- sequencesender/txbuilder/banana_validium.go | 3 +- sequencesender/txbuilder/banana_zkevm.go | 3 +- sequencesender/txbuilder/elderberry_base.go | 3 + sequencesender/txbuilder/elderberry_types.go | 14 +- .../txbuilder/validium_cond_num_batches.go | 2 +- state/helper.go | 42 +- state/interfaces.go | 4 +- state/pgstatestorage/batch.go | 28 +- state/types.go | 7 + test/config/test.config.toml | 2 + test/config/test.kurtosis_template.toml | 2 + 26 files changed, 605 insertions(+), 245 deletions(-) create mode 100644 aggregator/db/migrations/0002.sql create mode 100644 aggregator/db/migrations/003.sql diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 85a9518c..dce5ed1b 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -44,6 +44,10 @@ const ( mockedLocalExitRoot = "0x17c04c3760510b48c6012742c540a81aba4bca2f78b9d14bfd2f123e2e53ea3e" ) +var ( + errBusy = errors.New("witness server is busy") +) + type finalProofMsg struct { proverName string proverID string @@ -79,6 +83,8 @@ type Aggregator struct { finalProof chan finalProofMsg verifyingProof bool + witnessRetrievalChan chan state.DBBatch + srv *grpc.Server ctx context.Context exit context.CancelFunc @@ -175,8 +181,11 @@ func New( currentBatchStreamData: []byte{}, aggLayerClient: aggLayerClient, sequencerPrivateKey: sequencerPrivateKey, + witnessRetrievalChan: make(chan state.DBBatch), } + log.Infof("MaxWitnessRetrievalWorkers set to %d", cfg.MaxWitnessRetrievalWorkers) + // Set function to handle the batches from the data stream a.streamClient.SetProcessEntryFunc(a.handleReceivedDataStream) a.l1Syncr.SetCallbackOnReorgDone(a.handleReorg) @@ -184,6 +193,38 @@ func New( return a, nil } +func (a *Aggregator) retrieveWitness() { + var success bool + for { + dbBatch := <-a.witnessRetrievalChan + inner: + for !success { + var err error + // Get Witness + dbBatch.Witness, err = getWitness(dbBatch.Batch.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness) + if err != nil { + if err == errBusy { + log.Debugf("Witness server is busy, retrying get witness for batch %d in %v", dbBatch.Batch.BatchNumber, a.cfg.RetryTime.Duration) + } else { + log.Errorf("Failed to get witness for batch %d, err: %v", dbBatch.Batch.BatchNumber, err) + } + time.Sleep(a.cfg.RetryTime.Duration) + continue inner + } + + err = a.state.AddBatch(a.ctx, &dbBatch, nil) + if err != nil { + log.Errorf("Error adding batch: %v", err) + time.Sleep(a.cfg.RetryTime.Duration) + continue inner + } + success = true + } + + success = false + } +} + func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { log.Warnf("Reorg detected, reorgData: %+v", reorgData) @@ -209,7 +250,6 @@ func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) { } func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, client *datastreamer.StreamClient, server *datastreamer.StreamServer) error { - ctx := context.Background() forcedBlockhashL1 := common.Hash{} if !a.halted.Load() { @@ -250,7 +290,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli var batchl2Data []byte // Get batchl2Data from L1 - virtualBatch, err := a.l1Syncr.GetVirtualBatchByBatchNumber(ctx, a.currentStreamBatch.BatchNumber) + virtualBatch, err := a.l1Syncr.GetVirtualBatchByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) if err != nil && !errors.Is(err, entities.ErrNotFound) { log.Errorf("Error getting virtual batch: %v", err) return err @@ -259,7 +299,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli for errors.Is(err, entities.ErrNotFound) { log.Debug("Waiting for virtual batch to be available") time.Sleep(a.cfg.RetryTime.Duration) - virtualBatch, err = a.l1Syncr.GetVirtualBatchByBatchNumber(ctx, a.currentStreamBatch.BatchNumber) + virtualBatch, err = a.l1Syncr.GetVirtualBatchByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) if err != nil && !errors.Is(err, entities.ErrNotFound) { log.Errorf("Error getting virtual batch: %v", err) @@ -296,7 +336,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli } // Get L1InfoRoot - sequence, err := a.l1Syncr.GetSequenceByBatchNumber(ctx, a.currentStreamBatch.BatchNumber) + sequence, err := a.l1Syncr.GetSequenceByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) if err != nil { log.Errorf("Error getting sequence: %v", err) return err @@ -305,7 +345,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli for sequence == nil { log.Debug("Waiting for sequence to be available") time.Sleep(a.cfg.RetryTime.Duration) - sequence, err = a.l1Syncr.GetSequenceByBatchNumber(ctx, a.currentStreamBatch.BatchNumber) + sequence, err = a.l1Syncr.GetSequenceByBatchNumber(a.ctx, a.currentStreamBatch.BatchNumber) if err != nil { log.Errorf("Error getting sequence: %v", err) return err @@ -316,7 +356,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli a.currentStreamBatch.Timestamp = sequence.Timestamp // Calculate Acc Input Hash - oldBatch, _, err := a.state.GetBatch(ctx, a.currentStreamBatch.BatchNumber-1, nil) + oldDBBatch, err := a.state.GetBatch(a.ctx, a.currentStreamBatch.BatchNumber-1, nil) if err != nil { log.Errorf("Error getting batch %d: %v", a.currentStreamBatch.BatchNumber-1, err) return err @@ -324,7 +364,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli // Injected Batch if a.currentStreamBatch.BatchNumber == 1 { - l1Block, err := a.l1Syncr.GetL1BlockByNumber(ctx, virtualBatch.BlockNumber) + l1Block, err := a.l1Syncr.GetL1BlockByNumber(a.ctx, virtualBatch.BlockNumber) if err != nil { log.Errorf("Error getting L1 block: %v", err) return err @@ -334,7 +374,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli a.currentStreamBatch.L1InfoRoot = a.currentStreamBatch.GlobalExitRoot } - accInputHash, err := cdkcommon.CalculateAccInputHash(oldBatch.AccInputHash, a.currentStreamBatch.BatchL2Data, a.currentStreamBatch.L1InfoRoot, uint64(a.currentStreamBatch.Timestamp.Unix()), a.currentStreamBatch.Coinbase, forcedBlockhashL1) + accInputHash, err := cdkcommon.CalculateAccInputHash(oldDBBatch.Batch.AccInputHash, a.currentStreamBatch.BatchL2Data, a.currentStreamBatch.L1InfoRoot, uint64(a.currentStreamBatch.Timestamp.Unix()), a.currentStreamBatch.Coinbase, forcedBlockhashL1) if err != nil { log.Errorf("Error calculating acc input hash: %v", err) return err @@ -342,11 +382,36 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli a.currentStreamBatch.AccInputHash = accInputHash - err = a.state.AddBatch(ctx, &a.currentStreamBatch, a.currentBatchStreamData, nil) + dbBatch := state.DBBatch{ + Batch: a.currentStreamBatch, + Datastream: a.currentBatchStreamData, + Witness: nil, + } + + // Check if the batch is already in the DB to keep its witness + wDBBatch, err := a.state.GetBatch(a.ctx, a.currentStreamBatch.BatchNumber, nil) + if err != nil { + if !errors.Is(err, state.ErrNotFound) { + log.Errorf("Error getting batch %d: %v", a.currentStreamBatch.BatchNumber, err) + return err + } + } + + if wDBBatch != nil && wDBBatch.Witness != nil && len(wDBBatch.Witness) > 0 { + dbBatch.Witness = wDBBatch.Witness + } + + // Store batch in the DB + err = a.state.AddBatch(a.ctx, &dbBatch, nil) if err != nil { log.Errorf("Error adding batch: %v", err) return err } + + // Retrieve the witness + if dbBatch.Witness == nil || len(dbBatch.Witness) == 0 { + a.witnessRetrievalChan <- dbBatch + } } // Reset current batch data @@ -463,8 +528,8 @@ func (a *Aggregator) Start(ctx context.Context) error { log.Infof("Starting AccInputHash:%v", accInputHash.String()) // Store Acc Input Hash of the latest verified batch - dummyBatch := state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash} - err = a.state.AddBatch(ctx, &dummyBatch, []byte{0}, nil) + dummyDBBatch := state.DBBatch{Batch: state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash}, Datastream: []byte{0}, Witness: []byte{0}} + err = a.state.AddBatch(ctx, &dummyDBBatch, nil) if err != nil { return err } @@ -483,6 +548,11 @@ func (a *Aggregator) Start(ctx context.Context) error { } }() + // Witness retrieval workers + for i := 0; i < a.cfg.MaxWitnessRetrievalWorkers; i++ { + go a.retrieveWitness() + } + // Start stream client err = a.streamClient.Start() if err != nil { @@ -618,7 +688,7 @@ func (a *Aggregator) sendFinalProof() { a.startProofVerification() - finalBatch, _, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil) + finalDBBatch, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil) if err != nil { log.Errorf("Failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err) a.endProofVerification() @@ -627,8 +697,8 @@ func (a *Aggregator) sendFinalProof() { inputs := ethmanTypes.FinalProofInputs{ FinalProof: msg.finalProof, - NewLocalExitRoot: finalBatch.LocalExitRoot.Bytes(), - NewStateRoot: finalBatch.StateRoot.Bytes(), + NewLocalExitRoot: finalDBBatch.Batch.LocalExitRoot.Bytes(), + NewStateRoot: finalDBBatch.Batch.StateRoot.Bytes(), } switch a.cfg.SettlementBackend { @@ -764,14 +834,31 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface if string(finalProof.Public.NewStateRoot) == mockedStateRoot && string(finalProof.Public.NewLocalExitRoot) == mockedLocalExitRoot { // This local exit root and state root come from the mock // prover, use the one captured by the executor instead - finalBatch, _, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil) + finalDBBatch, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil) if err != nil { return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal) } log.Warnf("NewLocalExitRoot and NewStateRoot look like a mock values, using values from executor instead: LER: %v, SR: %v", - finalBatch.LocalExitRoot.TerminalString(), finalBatch.StateRoot.TerminalString()) - finalProof.Public.NewStateRoot = finalBatch.StateRoot.Bytes() - finalProof.Public.NewLocalExitRoot = finalBatch.LocalExitRoot.Bytes() + finalDBBatch.Batch.LocalExitRoot.TerminalString(), finalDBBatch.Batch.StateRoot.TerminalString()) + finalProof.Public.NewStateRoot = finalDBBatch.Batch.StateRoot.Bytes() + finalProof.Public.NewLocalExitRoot = finalDBBatch.Batch.LocalExitRoot.Bytes() + } + + // Sanity Check: state root from the proof must match the one from the final batch + if a.cfg.FinalProofSanityCheckEnabled { + finalDBBatch, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil) + if err != nil { + return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal) + } + + if common.BytesToHash(finalProof.Public.NewStateRoot).String() != finalDBBatch.Batch.StateRoot.String() { + for { + log.Errorf("State root from the final proof does not match the expected for batch %d: Proof = [%s] Expected = [%s]", proof.BatchNumberFinal, string(finalProof.Public.NewStateRoot), finalDBBatch.Batch.StateRoot.String()) + time.Sleep(a.cfg.RetryTime.Duration) + } + } else { + log.Infof("State root sanity check from the final proof for batch %d passed", proof.BatchNumberFinal) + } } return finalProof, nil @@ -1165,7 +1252,7 @@ func (a *Aggregator) getVerifiedBatchAccInputHash(ctx context.Context, batchNumb return &accInputHash, nil } -func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverInterface) (*state.Batch, *state.Proof, error) { +func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverInterface) (*state.Batch, []byte, *state.Proof, error) { proverID := prover.ID() proverName := prover.Name() @@ -1181,7 +1268,7 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn // Get last virtual batch number from L1 lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum() if err != nil { - return nil, nil, err + return nil, nil, nil, err } proofExists := true @@ -1193,20 +1280,20 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn proofExists, err = a.state.CheckProofExistsForBatch(ctx, batchNumberToVerify, nil) if err != nil { log.Infof("Error checking proof exists for batch %d", batchNumberToVerify) - return nil, nil, err + return nil, nil, nil, err } } // Check if the batch has been sequenced sequence, err := a.l1Syncr.GetSequenceByBatchNumber(ctx, batchNumberToVerify) if err != nil && !errors.Is(err, entities.ErrNotFound) { - return nil, nil, err + return nil, nil, nil, err } // Not found, so it it not possible to verify the batch yet if sequence == nil || errors.Is(err, entities.ErrNotFound) { log.Infof("No sequence found for batch %d", batchNumberToVerify) - return nil, nil, state.ErrNotFound + return nil, nil, nil, state.ErrNotFound } stateSequence := state.Sequence{ @@ -1214,20 +1301,30 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn ToBatchNumber: sequence.ToBatchNumber, } - err = a.state.AddSequence(ctx, stateSequence, nil) + // Check if the batch is already in the DB + dbBatch, err := a.state.GetBatch(ctx, batchNumberToVerify, nil) if err != nil { - log.Infof("Error storing sequence for batch %d", batchNumberToVerify) - return nil, nil, err + if errors.Is(err, state.ErrNotFound) { + log.Infof("Batch (%d) is not yet in DB", batchNumberToVerify) + } + return nil, nil, nil, err } - batch, _, err := a.state.GetBatch(ctx, batchNumberToVerify, nil) + // Check if the witness is already in the DB + if len(dbBatch.Witness) == 0 { + log.Infof("Witness for batch %d is not yet in DB", batchNumberToVerify) + return nil, nil, nil, state.ErrNotFound + } + + err = a.state.AddSequence(ctx, stateSequence, nil) if err != nil { - return batch, nil, err + log.Infof("Error storing sequence for batch %d", batchNumberToVerify) + return nil, nil, nil, err } // All the data required to generate a proof is ready - log.Infof("Found virtual batch %d pending to generate proof", batch.BatchNumber) - log = log.WithFields("batch", batch.BatchNumber) + log.Infof("Found virtual batch %d pending to generate proof", dbBatch.Batch.BatchNumber) + log = log.WithFields("batch", dbBatch.Batch.BatchNumber) log.Info("Checking profitability to aggregate batch") @@ -1235,18 +1332,18 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn isProfitable, err := a.profitabilityChecker.IsProfitable(ctx, big.NewInt(0)) if err != nil { log.Errorf("Failed to check aggregator profitability, err: %v", err) - return nil, nil, err + return nil, nil, nil, err } if !isProfitable { log.Infof("Batch is not profitable, pol collateral %d", big.NewInt(0)) - return nil, nil, err + return nil, nil, nil, err } now := time.Now().Round(time.Microsecond) proof := &state.Proof{ - BatchNumber: batch.BatchNumber, - BatchNumberFinal: batch.BatchNumber, + BatchNumber: dbBatch.Batch.BatchNumber, + BatchNumberFinal: dbBatch.Batch.BatchNumber, Prover: &proverName, ProverID: &proverID, GeneratingSince: &now, @@ -1256,10 +1353,10 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn err = a.state.AddGeneratedProof(ctx, proof, nil) if err != nil { log.Errorf("Failed to add batch proof, err: %v", err) - return nil, nil, err + return nil, nil, nil, err } - return batch, proof, nil + return &dbBatch.Batch, dbBatch.Witness, proof, nil } func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInterface) (bool, error) { @@ -1270,8 +1367,8 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt ) log.Debug("tryGenerateBatchProof start") - batchToProve, proof, err0 := a.getAndLockBatchToProve(ctx, prover) - if errors.Is(err0, state.ErrNotFound) { + batchToProve, witness, proof, err0 := a.getAndLockBatchToProve(ctx, prover) + if errors.Is(err0, state.ErrNotFound) || errors.Is(err0, entities.ErrNotFound) { // nothing to proof, swallow the error log.Debug("Nothing to generate proof") return false, nil @@ -1299,7 +1396,7 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt }() log.Infof("Sending zki + batch to the prover, batchNumber [%d]", batchToProve.BatchNumber) - inputProver, err := a.buildInputProver(ctx, batchToProve) + inputProver, err := a.buildInputProver(ctx, batchToProve, witness) if err != nil { err = fmt.Errorf("failed to build input prover, %w", err) log.Error(FirstToUpper(err.Error())) @@ -1330,9 +1427,13 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt log.Info("Batch proof generated") // Sanity Check: state root from the proof must match the one from the batch - if stateRoot != batchToProve.StateRoot { - log.Fatalf("State root from the proof does not match the expected for batch %d: Proof = [%s] Expected = [%s]", - batchToProve.BatchNumber, stateRoot.String(), batchToProve.StateRoot.String()) + if a.cfg.BatchProofSanityCheckEnabled && (stateRoot != common.Hash{}) && (stateRoot != batchToProve.StateRoot) { + for { + log.Errorf("State root from the proof does not match the expected for batch %d: Proof = [%s] Expected = [%s]", batchToProve.BatchNumber, stateRoot.String(), batchToProve.StateRoot.String()) + time.Sleep(a.cfg.RetryTime.Duration) + } + } else { + log.Infof("State root sanity check for batch %d passed", batchToProve.BatchNumber) } proof.Proof = resGetProof @@ -1392,7 +1493,7 @@ func (a *Aggregator) resetVerifyProofTime() { a.timeSendFinalProof = time.Now().Add(a.cfg.VerifyProofInterval.Duration) } -func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state.Batch) (*prover.StatelessInputProver, error) { +func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state.Batch, witness []byte) (*prover.StatelessInputProver, error) { isForcedBatch := false batchRawData := &state.BatchRawV2{} var err error @@ -1487,15 +1588,8 @@ func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state. }*/ } - // Get Witness - witness, err := getWitness(batchToVerify.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness) - if err != nil { - log.Errorf("Failed to get witness, err: %v", err) - return nil, err - } - // Get Old Acc Input Hash - oldBatch, _, err := a.state.GetBatch(ctx, batchToVerify.BatchNumber-1, nil) + oldDBBatch, err := a.state.GetBatch(ctx, batchToVerify.BatchNumber-1, nil) if err != nil { return nil, err } @@ -1503,7 +1597,7 @@ func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state. inputProver := &prover.StatelessInputProver{ PublicInputs: &prover.StatelessPublicInputs{ Witness: witness, - OldAccInputHash: oldBatch.AccInputHash.Bytes(), + OldAccInputHash: oldDBBatch.Batch.AccInputHash.Bytes(), OldBatchNum: batchToVerify.BatchNumber - 1, ChainId: batchToVerify.ChainID, ForkId: batchToVerify.ForkID, @@ -1531,6 +1625,8 @@ func getWitness(batchNumber uint64, URL string, fullWitness bool) ([]byte, error witnessType = "full" } + log.Infof("Requesting witness for batch %d of type %s", batchNumber, witnessType) + response, err = rpc.JSONRPCCall(URL, "zkevm_getBatchWitness", batchNumber, witnessType) if err != nil { return nil, err @@ -1538,6 +1634,9 @@ func getWitness(batchNumber uint64, URL string, fullWitness bool) ([]byte, error // Check if the response is an error if response.Error != nil { + if response.Error.Message == "busy" { + return nil, errBusy + } return nil, fmt.Errorf("error from witness for batch %d: %v", batchNumber, response.Error) } diff --git a/aggregator/config.go b/aggregator/config.go index 3d53e0f4..23335800 100644 --- a/aggregator/config.go +++ b/aggregator/config.go @@ -77,6 +77,12 @@ type Config struct { // IntervalAfterWhichBatchConsolidateAnyway this is interval for the main sequencer, that will check if there is no transactions IntervalAfterWhichBatchConsolidateAnyway types.Duration `mapstructure:"IntervalAfterWhichBatchConsolidateAnyway"` + // BatchProofSanityCheckEnabled is a flag to enable the sanity check of the batch proof + BatchProofSanityCheckEnabled bool `mapstructure:"BatchProofSanityCheckEnabled"` + + // FinalProofSanityCheckEnabled is a flag to enable the sanity check of the final proof + FinalProofSanityCheckEnabled bool `mapstructure:"FinalProofSanityCheckEnabled"` + // ChainID is the L2 ChainID provided by the Network Config ChainID uint64 @@ -142,6 +148,9 @@ type Config struct { // AggLayerURL url of the agglayer service AggLayerURL string `mapstructure:"AggLayerURL"` + + // MaxWitnessRetrievalWorkers is the maximum number of workers that will be used to retrieve the witness + MaxWitnessRetrievalWorkers int `mapstructure:"MaxWitnessRetrievalWorkers"` } // StreamClientCfg contains the data streamer's configuration properties diff --git a/aggregator/db/migrations/0002.sql b/aggregator/db/migrations/0002.sql new file mode 100644 index 00000000..e2290e13 --- /dev/null +++ b/aggregator/db/migrations/0002.sql @@ -0,0 +1,8 @@ +-- +migrate Up +DELETE FROM aggregator.batch; +ALTER TABLE aggregator.batch + ADD COLUMN IF NOT EXISTS witness varchar NOT NULL; + +-- +migrate Down +ALTER TABLE aggregator.batch + DROP COLUMN IF EXISTS witness; diff --git a/aggregator/db/migrations/003.sql b/aggregator/db/migrations/003.sql new file mode 100644 index 00000000..5351f8e7 --- /dev/null +++ b/aggregator/db/migrations/003.sql @@ -0,0 +1,7 @@ +-- +migrate Up +ALTER TABLE aggregator.batch + ALTER COLUMN witness DROP NOT NULL; + +-- +migrate Down +ALTER TABLE aggregator.batch + ALTER COLUMN witness SET NOT NULL; diff --git a/aggregator/interfaces.go b/aggregator/interfaces.go index edb40459..6b5ba63a 100644 --- a/aggregator/interfaces.go +++ b/aggregator/interfaces.go @@ -55,8 +55,8 @@ type stateInterface interface { CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error) CheckProofExistsForBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error) AddSequence(ctx context.Context, sequence state.Sequence, dbTx pgx.Tx) error - AddBatch(ctx context.Context, batch *state.Batch, datastream []byte, dbTx pgx.Tx) error - GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, []byte, error) + AddBatch(ctx context.Context, dbBatch *state.DBBatch, dbTx pgx.Tx) error + GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.DBBatch, error) DeleteBatchesOlderThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error DeleteBatchesNewerThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error } diff --git a/aggregator/prover/prover.go b/aggregator/prover/prover.go index 26e25504..fd9b1e7d 100644 --- a/aggregator/prover/prover.go +++ b/aggregator/prover/prover.go @@ -8,6 +8,7 @@ import ( "math/big" "net" "strconv" + "strings" "time" "github.com/0xPolygon/cdk/config/types" @@ -255,12 +256,19 @@ func (p *Prover) WaitRecursiveProof(ctx context.Context, proofID string) (string if err != nil { return "", common.Hash{}, err } - stateRoot, err := GetStateRootFromProof(res.Proof.(*GetProofResponse_RecursiveProof).RecursiveProof) - if err != nil { - return "", common.Hash{}, err - } + resProof := res.Proof.(*GetProofResponse_RecursiveProof) - return resProof.RecursiveProof, stateRoot, nil + + sr, err := GetStateRootFromProof(resProof.RecursiveProof) + if err != nil && sr != (common.Hash{}) { + log.Errorf("Error getting state root from proof: %v", err) + } + + if sr == (common.Hash{}) { + log.Info("Recursive proof does not contain state root. Possibly mock prover is in use.") + } + + return resProof.RecursiveProof, sr, nil } // WaitFinalProof waits for the final proof to be generated by the prover and @@ -342,10 +350,18 @@ func (p *Prover) call(req *AggregatorMessage) (*ProverMessage, error) { // GetStateRootFromProof returns the state root from the proof. func GetStateRootFromProof(proof string) (common.Hash, error) { + // Log received proof + log.Debugf("Received proof to get SR from: %s", proof) + type Publics struct { Publics []string `mapstructure:"publics"` } + // Check if the proof contains the SR + if !strings.Contains(proof, "publics") { + return common.Hash{}, nil + } + var publics Publics err := json.Unmarshal([]byte(proof), &publics) if err != nil { @@ -353,11 +369,8 @@ func GetStateRootFromProof(proof string) (common.Hash, error) { return common.Hash{}, err } - var ( - v [8]uint64 - j = 0 - ) - + var v [8]uint64 + var j = 0 for i := stateRootStartIndex; i < stateRootFinalIndex; i++ { u64, err := strconv.ParseInt(publics.Publics[i], 10, 64) if err != nil { diff --git a/config/default.go b/config/default.go index 67a81b65..58404105 100644 --- a/config/default.go +++ b/config/default.go @@ -57,6 +57,8 @@ ProofStatePollingInterval = "5s" SenderAddress = "" CleanupLockedProofsInterval = "2m" GeneratingProofCleanupThreshold = "10m" +BatchProofSanityCheckEnabled = true +FinalProofSanityCheckEnabled = true ForkId = 9 GasOffset = 0 WitnessURL = "localhost:8123" @@ -65,6 +67,7 @@ UseFullWitness = false SettlementBackend = "l1" AggLayerTxTimeout = "5m" AggLayerURL = "" +MaxWitnessRetrievalWorkers = 2 SequencerPrivateKey = {} [Aggregator.DB] Name = "aggregator_db" diff --git a/etherman/types.go b/etherman/types.go index b41ff93d..578eea59 100644 --- a/etherman/types.go +++ b/etherman/types.go @@ -162,13 +162,14 @@ type Batch struct { } type SequenceBanana struct { - Batches []Batch - OldAccInputHash common.Hash - AccInputHash common.Hash - L1InfoRoot common.Hash - MaxSequenceTimestamp uint64 - IndexL1InfoRoot uint32 - L2Coinbase common.Address + Batches []Batch + OldAccInputHash common.Hash + AccInputHash common.Hash + L1InfoRoot common.Hash + MaxSequenceTimestamp uint64 + IndexL1InfoRoot uint32 + L2Coinbase common.Address + LastVirtualBatchNumber uint64 } func NewSequenceBanana(batches []Batch, l2Coinbase common.Address) *SequenceBanana { @@ -198,3 +199,7 @@ func NewSequenceBanana(batches []Batch, l2Coinbase common.Address) *SequenceBana func (s *SequenceBanana) Len() int { return len(s.Batches) } + +func (s *SequenceBanana) SetLastVirtualBatchNumber(batchNumber uint64) { + s.LastVirtualBatchNumber = batchNumber +} diff --git a/go.mod b/go.mod index b0137850..30a2a942 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/0xPolygon/cdk-contracts-tooling v0.0.0-20240726125827-301fa4c59245 github.com/0xPolygon/cdk-data-availability v0.0.8 github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d - github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4 - github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.9 + github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 + github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.4 github.com/ethereum/go-ethereum v1.14.5 github.com/hermeznetwork/tracerr v0.3.2 diff --git a/go.sum b/go.sum index 09150414..d2d50209 100644 --- a/go.sum +++ b/go.sum @@ -4,10 +4,10 @@ github.com/0xPolygon/cdk-data-availability v0.0.8 h1:bMmOYZ7Ei683y80ric3KzMPXtRG github.com/0xPolygon/cdk-data-availability v0.0.8/go.mod h1:3XkZ0zn0GsvAT01MPQMmukF534CVSFmtrcoK3F/BK6Q= github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d h1:sxh6hZ2jF/sxxj2jd5o1vuNNCZjYmn4aRG9SRlVaEFs= github.com/0xPolygon/cdk-rpc v0.0.0-20240419104226-c0a62ba0f49d/go.mod h1:2scWqMMufrQXu7TikDgQ3BsyaKoX8qP26D6E262vSOg= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4 h1:+4K+xSzv0ImbK30B/T9FauNTrTFUmWcNKYhIgwsE4C4= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE= -github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.9 h1:vrAezzwTNke6NroDAltGh1k2AJ6ibmZPBsG0bCltbRc= -github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.9/go.mod h1:pRqfLQVM3nbzdhy3buqjAgcVyNDKAXOHqTSgkwiKpic= +github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3 h1:zJ06KCGLMDOap4slop/QmiMUO+VPsKSS3+944SY06ww= +github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3/go.mod h1:bv7DjATsczN2WvFt26jv34TWv6rfvYM1SqegrgrFwfI= +github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234 h1:QElCysO7f2xaknY/RDjxcs7IVmcgORfsCX2g+YD0Ko4= +github.com/0xPolygonHermez/zkevm-ethtx-manager v0.1.10-0.20240716105056-c051c96d0234/go.mod h1:zBZWxwOHKlw+ghd9roQLgIkDZWA7e7qO3EsfQQT/+oQ= github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.3-0.20240712085301-0310358abb59 h1:Qwh92vFEXnpmDggQaZA3648viEQfLdMnAw/WFSY+2i8= github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.3-0.20240712085301-0310358abb59/go.mod h1:/LHf8jPQeBYKABM1xUmN1dKaFVIJc9jMQDSGBDJ7CS0= github.com/0xPolygonHermez/zkevm-synchronizer-l1 v0.6.3 h1:C+jNYr/CDMMn8wn3HqZqLTPU0luNYIB35pnxVf9O8TM= diff --git a/sequencesender/config.go b/sequencesender/config.go index 9b80e404..9521a0c2 100644 --- a/sequencesender/config.go +++ b/sequencesender/config.go @@ -64,6 +64,9 @@ type Config struct { // MaxBatchesForL1 is the maximum amount of batches to be sequenced in a single L1 tx MaxBatchesForL1 uint64 `mapstructure:"MaxBatchesForL1"` + + // SanityCheckRPCURL is the URL of the RPC server to perform sanity check regarding the number of blocks in a batch + SanityCheckRPCURL string `mapstructure:"SanityCheckRPCURL"` } // StreamClientCfg contains the data streamer's configuration properties diff --git a/sequencesender/seqsendertypes/types.go b/sequencesender/seqsendertypes/types.go index 5d998b1c..21b5834e 100644 --- a/sequencesender/seqsendertypes/types.go +++ b/sequencesender/seqsendertypes/types.go @@ -34,11 +34,12 @@ type Sequence interface { FirstBatch() Batch LastBatch() Batch Len() int - L2Coinbase() common.Address + LastVirtualBatchNumber() uint64 String() string // WRITE + SetLastVirtualBatchNumber(batchNumber uint64) //SetL1InfoRoot(hash common.Hash) //SetOldAccInputHash(hash common.Hash) //SetAccInputHash(hash common.Hash) diff --git a/sequencesender/sequencesender.go b/sequencesender/sequencesender.go index 31ddb75c..2e116a1b 100644 --- a/sequencesender/sequencesender.go +++ b/sequencesender/sequencesender.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/0xPolygon/cdk-rpc/rpc" "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/log" "github.com/0xPolygon/cdk/sequencesender/seqsendertypes" @@ -27,33 +28,36 @@ import ( // SequenceSender represents a sequence sender type SequenceSender struct { - cfg Config - ethTxManager *ethtxmanager.Client - etherman *etherman.Client - currentNonce uint64 - latestVirtualBatch uint64 // Latest virtualized batch obtained from L1 - latestVirtualTime time.Time // Latest virtual batch timestamp - latestSentToL1Batch uint64 // Latest batch sent to L1 - wipBatch uint64 // Work in progress batch - sequenceList []uint64 // Sequence of batch number to be send to L1 - sequenceData map[uint64]*sequenceData // All the batch data indexed by batch number - mutexSequence sync.Mutex // Mutex to access sequenceData and sequenceList - ethTransactions map[common.Hash]*ethTxData // All the eth tx sent to L1 indexed by hash - ethTxData map[common.Hash][]byte // Tx data send to or received from L1 - mutexEthTx sync.Mutex // Mutex to access ethTransactions - sequencesTxFile *os.File // Persistence of sent transactions - validStream bool // Not valid while receiving data before the desired batch - fromStreamBatch uint64 // Initial batch to connect to the streaming - latestStreamBatch uint64 // Latest batch received by the streaming - seqSendingStopped bool // If there is a critical error - streamClient *datastreamer.StreamClient - TxBuilder txbuilder.TxBuilder + cfg Config + ethTxManager *ethtxmanager.Client + etherman *etherman.Client + currentNonce uint64 + latestVirtualBatch uint64 // Latest virtualized batch obtained from L1 + latestVirtualTime time.Time // Latest virtual batch timestamp + latestSentToL1Batch uint64 // Latest batch sent to L1 + wipBatch uint64 // Work in progress batch + sequenceList []uint64 // Sequence of batch number to be send to L1 + sequenceData map[uint64]*sequenceData // All the batch data indexed by batch number + mutexSequence sync.Mutex // Mutex to access sequenceData and sequenceList + ethTransactions map[common.Hash]*ethTxData // All the eth tx sent to L1 indexed by hash + ethTxData map[common.Hash][]byte // Tx data send to or received from L1 + mutexEthTx sync.Mutex // Mutex to access ethTransactions + sequencesTxFile *os.File // Persistence of sent transactions + validStream bool // Not valid while receiving data before the desired batch + fromStreamBatch uint64 // Initial batch to connect to the streaming + latestStreamBatch uint64 // Latest batch received by the streaming + seqSendingStopped bool // If there is a critical error + prevStreamEntry *datastreamer.FileEntry + streamClient *datastreamer.StreamClient + TxBuilder txbuilder.TxBuilder + latestVirtualBatchLock sync.Mutex } type sequenceData struct { batchClosed bool batch seqsendertypes.Batch batchRaw *state.BatchRawV2 + batchType datastream.BatchType } type ethTxData struct { @@ -68,6 +72,7 @@ type ethTxData struct { To common.Address `json:"to"` StateHistory []string `json:"stateHistory"` Txs map[common.Hash]ethTxAdditionalData `json:"txs"` + Gas uint64 `json:"gas"` } type ethTxAdditionalData struct { @@ -94,7 +99,7 @@ func New(cfg Config, etherman *etherman.Client, txBuilder txbuilder.TxBuilder) ( // Restore pending sent sequences err := s.loadSentSequencesTransactions() if err != nil { - log.Fatalf("[SeqSender] error restoring sent sequences from file", err) + log.Fatalf("error restoring sent sequences from file", err) return nil, err } @@ -107,16 +112,16 @@ func New(cfg Config, etherman *etherman.Client, txBuilder txbuilder.TxBuilder) ( s.ethTxManager, err = ethtxmanager.New(cfg.EthTxManager) if err != nil { - log.Fatalf("[SeqSender] error creating ethtxmanager client: %v", err) + log.Fatalf("error creating ethtxmanager client: %v", err) return nil, err } // Create datastream client s.streamClient, err = datastreamer.NewClient(s.cfg.StreamClient.Server, 1) if err != nil { - log.Fatalf("[SeqSender] failed to create stream client, error: %v", err) + log.Fatalf("failed to create stream client, error: %v", err) } else { - log.Infof("[SeqSender] new stream client") + log.Infof("new stream client") } // Set func to handle the streaming s.streamClient.SetProcessEntryFunc(s.handleReceivedDataStream) @@ -133,27 +138,27 @@ func (s *SequenceSender) Start(ctx context.Context) { var err error s.currentNonce, err = s.etherman.CurrentNonce(ctx, s.cfg.L2Coinbase) if err != nil { - log.Fatalf("[SeqSender] failed to get current nonce from %v, error: %v", s.cfg.L2Coinbase, err) + log.Fatalf("failed to get current nonce from %v, error: %v", s.cfg.L2Coinbase, err) } else { - log.Infof("[SeqSender] current nonce for %v is %d", s.cfg.L2Coinbase, s.currentNonce) + log.Infof("current nonce for %v is %d", s.cfg.L2Coinbase, s.currentNonce) } // Get latest virtual state batch from L1 err = s.updateLatestVirtualBatch() if err != nil { - log.Fatalf("[SeqSender] error getting latest sequenced batch, error: %v", err) + log.Fatalf("error getting latest sequenced batch, error: %v", err) } // Sync all monitored sent L1 tx err = s.syncAllEthTxResults(ctx) if err != nil { - log.Fatalf("[SeqSender] failed to sync monitored tx results, error: %v", err) + log.Fatalf("failed to sync monitored tx results, error: %v", err) } // Start datastream client err = s.streamClient.Start() if err != nil { - log.Fatalf("[SeqSender] failed to start stream client, error: %v", err) + log.Fatalf("failed to start stream client, error: %v", err) } // Set starting point of the streaming @@ -166,10 +171,10 @@ func (s *SequenceSender) Start(ctx context.Context) { marshalledBookmark, err := proto.Marshal(bookmark) if err != nil { - log.Fatalf("[SeqSender] failed to marshal bookmark, error: %v", err) + log.Fatalf("failed to marshal bookmark, error: %v", err) } - log.Infof("[SeqSender] stream client from bookmark %v", bookmark) + log.Infof("stream client from bookmark %v", bookmark) // Current batch to sequence s.wipBatch = s.latestVirtualBatch + 1 @@ -181,7 +186,7 @@ func (s *SequenceSender) Start(ctx context.Context) { // Start receiving the streaming err = s.streamClient.ExecCommandStartBookmark(marshalledBookmark) if err != nil { - log.Fatalf("[SeqSender] failed to connect to the streaming: %v", err) + log.Fatalf("failed to connect to the streaming: %v", err) } } @@ -226,7 +231,7 @@ func (s *SequenceSender) purgeSequences() { } delete(s.sequenceData, toPurge[i]) } - log.Infof("[SeqSender] batches purged count: %d, fromBatch: %d, toBatch: %d", len(toPurge), firstPurged, lastPurged) + log.Infof("batches purged count: %d, fromBatch: %d, toBatch: %d", len(toPurge), firstPurged, lastPurged) } s.mutexSequence.Unlock() } @@ -254,9 +259,9 @@ func (s *SequenceSender) purgeEthTx(ctx context.Context) { if data.OnMonitor { err := s.ethTxManager.Remove(ctx, hash) if err != nil { - log.Warnf("[SeqSender] error removing monitor tx %v from ethtxmanager: %v", hash, err) + log.Warnf("error removing monitor tx %v from ethtxmanager: %v", hash, err) } else { - log.Infof("[SeqSender] removed monitor tx %v from ethtxmanager", hash) + log.Infof("removed monitor tx %v from ethtxmanager", hash) } } } @@ -275,7 +280,7 @@ func (s *SequenceSender) purgeEthTx(ctx context.Context) { delete(s.ethTransactions, toPurge[i]) delete(s.ethTxData, toPurge[i]) } - log.Infof("[SeqSender] txs purged count: %d, fromNonce: %d, toNonce: %d", len(toPurge), firstPurged, lastPurged) + log.Infof("txs purged count: %d, fromNonce: %d, toNonce: %d", len(toPurge), firstPurged, lastPurged) } s.mutexEthTx.Unlock() } @@ -306,10 +311,10 @@ func (s *SequenceSender) syncEthTxResults(ctx context.Context) (uint64, error) { // Save updated sequences transactions err := s.saveSentSequencesTransactions(ctx) if err != nil { - log.Errorf("[SeqSender] error saving tx sequence, error: %v", err) + log.Errorf("error saving tx sequence, error: %v", err) } - log.Infof("[SeqSender] %d tx results synchronized (%d in pending state)", txSync, txPending) + log.Infof("%d tx results synchronized (%d in pending state)", txSync, txPending) return txPending, nil } @@ -318,7 +323,7 @@ func (s *SequenceSender) syncAllEthTxResults(ctx context.Context) error { // Get all results results, err := s.ethTxManager.ResultsByStatus(ctx, nil) if err != nil { - log.Warnf("[SeqSender] error getting results for all tx: %v", err) + log.Warnf("error getting results for all tx: %v", err) return err } @@ -328,7 +333,7 @@ func (s *SequenceSender) syncAllEthTxResults(ctx context.Context) error { for _, result := range results { txSequence, exists := s.ethTransactions[result.ID] if !exists { - log.Infof("[SeqSender] transaction %v missing in memory structure. Adding it", result.ID) + log.Infof("transaction %v missing in memory structure. Adding it", result.ID) // No info: from/to batch and the sent timestamp s.ethTransactions[result.ID] = ðTxData{ SentL1Timestamp: time.Time{}, @@ -346,10 +351,10 @@ func (s *SequenceSender) syncAllEthTxResults(ctx context.Context) error { // Save updated sequences transactions err = s.saveSentSequencesTransactions(ctx) if err != nil { - log.Errorf("[SeqSender] error saving tx sequence, error: %v", err) + log.Errorf("error saving tx sequence, error: %v", err) } - log.Infof("[SeqSender] %d tx results synchronized", numResults) + log.Infof("%d tx results synchronized", numResults) return nil } @@ -376,7 +381,7 @@ func (s *SequenceSender) copyTxData(txHash common.Hash, txData []byte, txsResult // updateEthTxResult handles updating transaction state func (s *SequenceSender) updateEthTxResult(txData *ethTxData, txResult ethtxmanager.MonitoredTxResult) { if txData.Status != txResult.Status.String() { - log.Infof("[SeqSender] update transaction %v to state %s", txResult.ID, txResult.Status.String()) + log.Infof("update transaction %v to state %s", txResult.ID, txResult.Status.String()) txData.StatusTimestamp = time.Now() stTrans := txData.StatusTimestamp.Format("2006-01-02T15:04:05.000-07:00") + ", " + txData.Status + ", " + txResult.Status.String() txData.Status = txResult.Status.String() @@ -385,7 +390,7 @@ func (s *SequenceSender) updateEthTxResult(txData *ethTxData, txResult ethtxmana // Manage according to the state statusConsolidated := txData.Status == ethtxmanager.MonitoredTxStatusSafe.String() || txData.Status == ethtxmanager.MonitoredTxStatusFinalized.String() if txData.Status == ethtxmanager.MonitoredTxStatusFailed.String() { - s.logFatalf("[SeqSender] transaction %v result failed!") + s.logFatalf("transaction %v result failed!") } else if statusConsolidated && txData.ToBatch >= s.latestVirtualBatch { s.latestVirtualTime = txData.StatusTimestamp } @@ -406,21 +411,21 @@ func (s *SequenceSender) updateEthTxResult(txData *ethTxData, txResult ethtxmana func (s *SequenceSender) getResultAndUpdateEthTx(ctx context.Context, txHash common.Hash) error { txData, exists := s.ethTransactions[txHash] if !exists { - log.Errorf("[SeqSender] transaction %v not found in memory", txHash) + log.Errorf("transaction %v not found in memory", txHash) return errors.New("transaction not found in memory structure") } txResult, err := s.ethTxManager.Result(ctx, txHash) if err == ethtxmanager.ErrNotFound { - log.Infof("[SeqSender] transaction %v does not exist in ethtxmanager. Marking it", txHash) + log.Infof("transaction %v does not exist in ethtxmanager. Marking it", txHash) txData.OnMonitor = false // Resend tx - errSend := s.sendTx(ctx, true, &txHash, nil, 0, 0, nil) + errSend := s.sendTx(ctx, true, &txHash, nil, 0, 0, nil, txData.Gas) if errSend == nil { txData.OnMonitor = false } } else if err != nil { - log.Errorf("[SeqSender] error getting result for tx %v: %v", txHash, err) + log.Errorf("error getting result for tx %v: %v", txHash, err) return err } else { s.updateEthTxResult(txData, txResult) @@ -432,14 +437,14 @@ func (s *SequenceSender) getResultAndUpdateEthTx(ctx context.Context, txHash com // tryToSendSequence checks if there is a sequence and it's worth it to send to L1 func (s *SequenceSender) tryToSendSequence(ctx context.Context) { // Update latest virtual batch - log.Infof("[SeqSender] updating virtual batch") + log.Infof("updating virtual batch") err := s.updateLatestVirtualBatch() if err != nil { return } // Update state of transactions - log.Infof("[SeqSender] updating tx results") + log.Infof("updating tx results") countPending, err := s.syncEthTxResults(ctx) if err != nil { return @@ -447,22 +452,22 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) { // Check if the sequence sending is stopped if s.seqSendingStopped { - log.Warnf("[SeqSender] sending is stopped!") + log.Warnf("sending is stopped!") return } // Check if reached the maximum number of pending transactions if countPending >= s.cfg.MaxPendingTx { - log.Infof("[SeqSender] max number of pending txs (%d) reached. Waiting for some to be completed", countPending) + log.Infof("max number of pending txs (%d) reached. Waiting for some to be completed", countPending) return } // Check if should send sequence to L1 - log.Infof("[SeqSender] getting sequences to send") + log.Infof("getting sequences to send") sequence, err := s.getSequencesToSend(ctx) if err != nil || sequence == nil || sequence.Len() == 0 { if err != nil { - log.Errorf("[SeqSender] error getting sequences: %v", err) + log.Errorf("error getting sequences: %v", err) } return } @@ -472,7 +477,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) { lastSequence := sequence.LastBatch() lastL2BlockTimestamp := lastSequence.LastL2BLockTimestamp() - log.Infof("[SeqSender] sending sequences to L1. From batch %d to batch %d", firstSequence.BatchNumber(), lastSequence.BatchNumber()) + log.Infof("sending sequences to L1. From batch %d to batch %d", firstSequence.BatchNumber(), lastSequence.BatchNumber()) log.Infof(sequence.String()) // Wait until last L1 block timestamp is L1BlockTimestampMargin seconds above the timestamp of the last L2 block in the sequence @@ -481,18 +486,18 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) { // Get header of the last L1 block lastL1BlockHeader, err := s.etherman.GetLatestBlockHeader(ctx) if err != nil { - log.Errorf("[SeqSender] failed to get last L1 block timestamp, err: %v", err) + log.Errorf("failed to get last L1 block timestamp, err: %v", err) return } elapsed, waitTime := s.marginTimeElapsed(lastL2BlockTimestamp, lastL1BlockHeader.Time, timeMargin) if !elapsed { - log.Infof("[SeqSender] waiting at least %d seconds to send sequences, time difference between last L1 block %d (ts: %d) and last L2 block %d (ts: %d) in the sequence is lower than %d seconds", + log.Infof("waiting at least %d seconds to send sequences, time difference between last L1 block %d (ts: %d) and last L2 block %d (ts: %d) in the sequence is lower than %d seconds", waitTime, lastL1BlockHeader.Number, lastL1BlockHeader.Time, lastSequence.BatchNumber(), lastL2BlockTimestamp, timeMargin) time.Sleep(time.Duration(waitTime) * time.Second) } else { - log.Infof("[SeqSender] continuing, time difference between last L1 block %d (ts: %d) and last L2 block %d (ts: %d) in the sequence is greater than %d seconds", + log.Infof("continuing, time difference between last L1 block %d (ts: %d) and last L2 block %d (ts: %d) in the sequence is greater than %d seconds", lastL1BlockHeader.Number, lastL1BlockHeader.Time, lastSequence.BatchNumber, lastL2BlockTimestamp, timeMargin) break } @@ -506,7 +511,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) { // Wait if the time difference is less than L1BlockTimestampMargin if !elapsed { - log.Infof("[SeqSender] waiting at least %d seconds to send sequences, time difference between now (ts: %d) and last L2 block %d (ts: %d) in the sequence is lower than %d seconds", + log.Infof("waiting at least %d seconds to send sequences, time difference between now (ts: %d) and last L2 block %d (ts: %d) in the sequence is lower than %d seconds", waitTime, currentTime, lastSequence.BatchNumber, lastL2BlockTimestamp, timeMargin) time.Sleep(time.Duration(waitTime) * time.Second) } else { @@ -517,17 +522,37 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) { } // Send sequences to L1 - log.Infof("[SeqSender] sending sequences to L1. From batch %d to batch %d", firstSequence.BatchNumber(), lastSequence.BatchNumber()) + log.Infof("sending sequences to L1. From batch %d to batch %d", firstSequence.BatchNumber(), lastSequence.BatchNumber()) log.Infof(sequence.String()) tx, err := s.TxBuilder.BuildSequenceBatchesTx(ctx, sequence) if err != nil { - log.Errorf("[SeqSender] error building sequenceBatches tx: %v", err) + log.Errorf("error building sequenceBatches tx: %v", err) + return + } + + // Get latest virtual state batch from L1 + err = s.updateLatestVirtualBatch() + if err != nil { + log.Fatalf("error getting latest sequenced batch, error: %v", err) + } + + sequence.SetLastVirtualBatchNumber(s.latestVirtualBatch) + + txToEstimateGas, err := s.TxBuilder.BuildSequenceBatchesTx(ctx, sequence) + if err != nil { + log.Errorf("error building sequenceBatches tx to estimate gas: %v", err) + return + } + + gas, err := s.etherman.EstimateGas(ctx, s.cfg.SenderAddress, tx.To(), nil, txToEstimateGas.Data()) + if err != nil { + log.Errorf("error estimating gas: ", err) return } // Add sequence tx - err = s.sendTx(ctx, false, nil, tx.To(), firstSequence.BatchNumber(), lastSequence.BatchNumber(), tx.Data()) + err = s.sendTx(ctx, false, nil, tx.To(), firstSequence.BatchNumber(), lastSequence.BatchNumber(), tx.Data(), gas) if err != nil { return } @@ -537,7 +562,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) { } // sendTx adds transaction to the ethTxManager to send it to L1 -func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *common.Hash, to *common.Address, fromBatch uint64, toBatch uint64, data []byte) error { +func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *common.Hash, to *common.Address, fromBatch uint64, toBatch uint64, data []byte, gas uint64) error { // Params if new tx to send or resend a previous tx var paramTo *common.Address var paramNonce *uint64 @@ -554,7 +579,7 @@ func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *com valueToBatch = toBatch } else { if txOldHash == nil { - log.Errorf("[SeqSender] trying to resend a tx with nil hash") + log.Errorf("trying to resend a tx with nil hash") return errors.New("resend tx with nil hash monitor id") } paramTo = &s.ethTransactions[*txOldHash].To @@ -568,9 +593,9 @@ func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *com } // Add sequence tx - txHash, err := s.ethTxManager.Add(ctx, paramTo, paramNonce, big.NewInt(0), paramData, s.cfg.GasOffset, nil) + txHash, err := s.ethTxManager.AddWithGas(ctx, paramTo, paramNonce, big.NewInt(0), paramData, s.cfg.GasOffset, nil, gas) if err != nil { - log.Errorf("[SeqSender] error adding sequence to ethtxmanager: %v", err) + log.Errorf("error adding sequence to ethtxmanager: %v", err) return err } if !resend { @@ -586,6 +611,7 @@ func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *com ToBatch: valueToBatch, OnMonitor: true, To: valueToAddress, + Gas: gas, } // Add tx to internal structure @@ -604,7 +630,7 @@ func (s *SequenceSender) sendTx(ctx context.Context, resend bool, txOldHash *com // Save sent sequences err = s.saveSentSequencesTransactions(ctx) if err != nil { - log.Errorf("[SeqSender] error saving tx sequence sent, error: %v", err) + log.Errorf("error saving tx sequence sent, error: %v", err) } return nil } @@ -639,7 +665,7 @@ func (s *SequenceSender) getSequencesToSend(ctx context.Context) (seqsendertypes // If the coinbase changes, the sequence ends here if len(sequenceBatches) > 0 && batch.LastCoinbase() != prevCoinbase { - log.Infof("[SeqSender] batch with different coinbase (batch %v, sequence %v), sequence will be sent to this point", prevCoinbase, batch.LastCoinbase) + log.Infof("batch with different coinbase (batch %v, sequence %v), sequence will be sent to this point", prevCoinbase, batch.LastCoinbase) return s.TxBuilder.NewSequence(sequenceBatches, s.cfg.L2Coinbase) } prevCoinbase = batch.LastCoinbase() @@ -657,23 +683,23 @@ func (s *SequenceSender) getSequencesToSend(ctx context.Context) (seqsendertypes // Check if the current batch is the last before a change to a new forkid, in this case we need to close and send the sequence to L1 if (s.cfg.ForkUpgradeBatchNumber != 0) && (batchNumber == (s.cfg.ForkUpgradeBatchNumber)) { - log.Infof("[SeqSender] sequence should be sent to L1, as we have reached the batch %d from which a new forkid is applied (upgrade)", s.cfg.ForkUpgradeBatchNumber) + log.Infof("sequence should be sent to L1, as we have reached the batch %d from which a new forkid is applied (upgrade)", s.cfg.ForkUpgradeBatchNumber) return s.TxBuilder.NewSequence(sequenceBatches, s.cfg.L2Coinbase) } } // Reached the latest batch. Decide if it's worth to send the sequence, or wait for new batches if len(sequenceBatches) == 0 { - log.Infof("[SeqSender] no batches to be sequenced") + log.Infof("no batches to be sequenced") return nil, nil } if s.latestVirtualTime.Before(time.Now().Add(-s.cfg.LastBatchVirtualizationTimeMaxWaitPeriod.Duration)) { - log.Infof("[SeqSender] sequence should be sent, too much time without sending anything to L1") + log.Infof("sequence should be sent, too much time without sending anything to L1") return s.TxBuilder.NewSequence(sequenceBatches, s.cfg.L2Coinbase) } - log.Infof("[SeqSender] not enough time has passed since last batch was virtualized and the sequence could be bigger") + log.Infof("not enough time has passed since last batch was virtualized and the sequence could be bigger") return nil, nil } @@ -681,17 +707,17 @@ func (s *SequenceSender) getSequencesToSend(ctx context.Context) (seqsendertypes func (s *SequenceSender) loadSentSequencesTransactions() error { // Check if file exists if _, err := os.Stat(s.cfg.SequencesTxFileName); os.IsNotExist(err) { - log.Infof("[SeqSender] file not found %s: %v", s.cfg.SequencesTxFileName, err) + log.Infof("file not found %s: %v", s.cfg.SequencesTxFileName, err) return nil } else if err != nil { - log.Errorf("[SeqSender] error opening file %s: %v", s.cfg.SequencesTxFileName, err) + log.Errorf("error opening file %s: %v", s.cfg.SequencesTxFileName, err) return err } // Read file data, err := os.ReadFile(s.cfg.SequencesTxFileName) if err != nil { - log.Errorf("[SeqSender] error reading file %s: %v", s.cfg.SequencesTxFileName, err) + log.Errorf("error reading file %s: %v", s.cfg.SequencesTxFileName, err) return err } @@ -700,7 +726,7 @@ func (s *SequenceSender) loadSentSequencesTransactions() error { err = json.Unmarshal(data, &s.ethTransactions) s.mutexEthTx.Unlock() if err != nil { - log.Errorf("[SeqSender] error decoding data from %s: %v", s.cfg.SequencesTxFileName, err) + log.Errorf("error decoding data from %s: %v", s.cfg.SequencesTxFileName, err) return err } @@ -718,7 +744,7 @@ func (s *SequenceSender) saveSentSequencesTransactions(ctx context.Context) erro fileName := s.cfg.SequencesTxFileName[0:strings.IndexRune(s.cfg.SequencesTxFileName, '.')] + ".tmp" s.sequencesTxFile, err = os.Create(fileName) if err != nil { - log.Errorf("[SeqSender] error creating file %s: %v", fileName, err) + log.Errorf("error creating file %s: %v", fileName, err) return err } defer s.sequencesTxFile.Close() @@ -730,43 +756,90 @@ func (s *SequenceSender) saveSentSequencesTransactions(ctx context.Context) erro err = encoder.Encode(s.ethTransactions) s.mutexEthTx.Unlock() if err != nil { - log.Errorf("[SeqSender] error writing file %s: %v", fileName, err) + log.Errorf("error writing file %s: %v", fileName, err) return err } // Rename the new file err = os.Rename(fileName, s.cfg.SequencesTxFileName) if err != nil { - log.Errorf("[SeqSender] error renaming file %s to %s: %v", fileName, s.cfg.SequencesTxFileName, err) + log.Errorf("error renaming file %s to %s: %v", fileName, s.cfg.SequencesTxFileName, err) return err } return nil } +func (s *SequenceSender) entryTypeToString(entryType datastream.EntryType) string { + switch entryType { + case datastream.EntryType_ENTRY_TYPE_BATCH_START: + return "BatchStart" + case datastream.EntryType_ENTRY_TYPE_L2_BLOCK: + return "L2Block" + case datastream.EntryType_ENTRY_TYPE_TRANSACTION: + return "Transaction" + case datastream.EntryType_ENTRY_TYPE_BATCH_END: + return "BatchEnd" + default: + return fmt.Sprintf("%d", entryType) + } +} + // handleReceivedDataStream manages the events received by the streaming -func (s *SequenceSender) handleReceivedDataStream(e *datastreamer.FileEntry, c *datastreamer.StreamClient, ss *datastreamer.StreamServer) error { - dsType := datastream.EntryType(e.Type) +func (s *SequenceSender) handleReceivedDataStream(entry *datastreamer.FileEntry, client *datastreamer.StreamClient, server *datastreamer.StreamServer) error { + dsType := datastream.EntryType(entry.Type) + + var prevEntryType datastream.EntryType + if s.prevStreamEntry != nil { + prevEntryType = datastream.EntryType(s.prevStreamEntry.Type) + } switch dsType { case datastream.EntryType_ENTRY_TYPE_L2_BLOCK: // Handle stream entry: L2Block l2Block := &datastream.L2Block{} - err := proto.Unmarshal(e.Data, l2Block) + + err := proto.Unmarshal(entry.Data, l2Block) if err != nil { - log.Errorf("[SeqSender] error unmarshalling L2Block: %v", err) + log.Errorf("error unmarshalling L2Block: %v", err) return err } + log.Infof("received L2Block entry, l2Block.Number: %d, l2Block.BatchNumber: %d, entry.Number: %d", l2Block.Number, l2Block.BatchNumber, entry.Number) + + // Sanity checks + if s.prevStreamEntry != nil && !(prevEntryType == datastream.EntryType_ENTRY_TYPE_BATCH_START || prevEntryType == datastream.EntryType_ENTRY_TYPE_L2_BLOCK || prevEntryType == datastream.EntryType_ENTRY_TYPE_TRANSACTION) { + log.Fatalf("unexpected L2Block entry received, entry.Number: %d, l2Block.Number: %d, prevEntry: %s, prevEntry.Number: %d", + entry.Number, l2Block.Number, s.entryTypeToString(prevEntryType), s.prevStreamEntry.Number) + } else if prevEntryType == datastream.EntryType_ENTRY_TYPE_L2_BLOCK { + prevL2Block := &datastream.L2Block{} + + err := proto.Unmarshal(s.prevStreamEntry.Data, prevL2Block) + if err != nil { + log.Errorf("error unmarshalling prevL2Block: %v", err) + return err + } + if l2Block.Number != prevL2Block.Number+1 { + log.Fatalf("unexpected L2Block number %d received, it should be %d, entry.Number: %d, prevEntry.Number: %d", + l2Block.Number, prevL2Block.Number+1, entry.Number, s.prevStreamEntry.Number) + } + } + // Already virtualized if l2Block.BatchNumber <= s.fromStreamBatch { if l2Block.BatchNumber != s.latestStreamBatch { - log.Infof("[SeqSender] skipped! batch already virtualized, number %d", l2Block.BatchNumber) + log.Infof("skipped! batch already virtualized, number %d", l2Block.BatchNumber) } } else if !s.validStream && l2Block.BatchNumber == s.fromStreamBatch+1 { // Initial case after startup s.addNewSequenceBatch(l2Block) s.validStream = true + } else { + // Handle whether it's only a new block or also a new batch + if l2Block.BatchNumber > s.wipBatch { + // Create new sequential batch + s.addNewSequenceBatch(l2Block) + } } // Latest stream batch @@ -775,23 +848,11 @@ func (s *SequenceSender) handleReceivedDataStream(e *datastreamer.FileEntry, c * return nil } - // Handle whether it's only a new block or also a new batch - if l2Block.BatchNumber > s.wipBatch { - // New batch in the sequence - // Close current batch - err := s.closeSequenceBatch() - if err != nil { - log.Fatalf("[SeqSender] error closing wip batch") - return err - } - - // Create new sequential batch - s.addNewSequenceBatch(l2Block) - } - // Add L2 block s.addNewBatchL2Block(l2Block) + s.prevStreamEntry = entry + case datastream.EntryType_ENTRY_TYPE_TRANSACTION: // Handle stream entry: Transaction if !s.validStream { @@ -799,15 +860,25 @@ func (s *SequenceSender) handleReceivedDataStream(e *datastreamer.FileEntry, c * } l2Tx := &datastream.Transaction{} - err := proto.Unmarshal(e.Data, l2Tx) + err := proto.Unmarshal(entry.Data, l2Tx) if err != nil { - log.Errorf("[SeqSender] error unmarshalling Transaction: %v", err) + log.Errorf("error unmarshalling Transaction: %v", err) return err } + log.Debugf("received Transaction entry, tx.L2BlockNumber: %d, tx.Index: %d, entry.Number: %d", l2Tx.L2BlockNumber, l2Tx.Index, entry.Number) + + // Sanity checks + if !(prevEntryType == datastream.EntryType_ENTRY_TYPE_L2_BLOCK || prevEntryType == datastream.EntryType_ENTRY_TYPE_TRANSACTION) { + log.Fatalf("unexpected Transaction entry received, entry.Number: %d, transaction.L2BlockNumber: %d, transaction.Index: %d, prevEntry: %s, prevEntry.Number: %d", + entry.Number, l2Tx.L2BlockNumber, l2Tx.Index, s.entryTypeToString(prevEntryType), s.prevStreamEntry.Number) + } + // Add tx data s.addNewBlockTx(l2Tx) + s.prevStreamEntry = entry + case datastream.EntryType_ENTRY_TYPE_BATCH_START: // Handle stream entry: BatchStart if !s.validStream { @@ -815,15 +886,27 @@ func (s *SequenceSender) handleReceivedDataStream(e *datastreamer.FileEntry, c * } batch := &datastream.BatchStart{} - err := proto.Unmarshal(e.Data, batch) + err := proto.Unmarshal(entry.Data, batch) if err != nil { - log.Errorf("[SeqSender] error unmarshalling BatchStart: %v", err) + log.Errorf("error unmarshalling BatchStart: %v", err) return err } + log.Infof("received BatchStart entry, batchStart.Number: %d, entry.Number: %d", batch.Number, entry.Number) + + // Sanity checks + if !(prevEntryType == datastream.EntryType_ENTRY_TYPE_BATCH_END) { + log.Fatalf("unexpected BatchStart entry received, entry.Number: %d, batchStart.Number: %d, prevEntry.Type: %s, prevEntry.Number: %d", + entry.Number, batch.Number, s.entryTypeToString(prevEntryType), s.prevStreamEntry.Number) + } else if batch.Number != s.wipBatch+1 { + log.Fatalf("unexpected BatchStart.Number %d received, if should be wipBatch %d+1, entry.Number: %d", s.wipBatch, batch.Number, entry.Number) + } + // Add batch start data s.addInfoSequenceBatchStart(batch) + s.prevStreamEntry = entry + case datastream.EntryType_ENTRY_TYPE_BATCH_END: // Handle stream entry: BatchEnd if !s.validStream { @@ -831,14 +914,31 @@ func (s *SequenceSender) handleReceivedDataStream(e *datastreamer.FileEntry, c * } batch := &datastream.BatchEnd{} - err := proto.Unmarshal(e.Data, batch) + err := proto.Unmarshal(entry.Data, batch) if err != nil { - log.Errorf("[SeqSender] error unmarshalling BatchEnd: %v", err) + log.Errorf("error unmarshalling BatchEnd: %v", err) return err } + log.Infof("received BatchEnd entry, batchEnd.Number: %d, entry.Number: %d", batch.Number, entry.Number) + + // Sanity checks + if !(prevEntryType == datastream.EntryType_ENTRY_TYPE_L2_BLOCK || prevEntryType == datastream.EntryType_ENTRY_TYPE_TRANSACTION) { + log.Fatalf("unexpected BatchEnd entry received, entry.Number: %d, batchEnd.Number: %d, prevEntry.Type: %s, prevEntry.Number: %d", + entry.Number, batch.Number, s.entryTypeToString(prevEntryType), s.prevStreamEntry.Number) + } + // Add batch end data s.addInfoSequenceBatchEnd(batch) + + // Close current wip batch + err = s.closeSequenceBatch() + if err != nil { + log.Fatalf("error closing wip batch") + return err + } + + s.prevStreamEntry = entry } return nil @@ -847,32 +947,87 @@ func (s *SequenceSender) handleReceivedDataStream(e *datastreamer.FileEntry, c * // closeSequenceBatch closes the current batch func (s *SequenceSender) closeSequenceBatch() error { s.mutexSequence.Lock() - log.Infof("[SeqSender] closing batch %d", s.wipBatch) + defer s.mutexSequence.Unlock() + + log.Infof("closing batch %d", s.wipBatch) data := s.sequenceData[s.wipBatch] if data != nil { data.batchClosed = true - l2Data, err := state.EncodeBatchV2(data.batchRaw) + + batchL2Data, err := state.EncodeBatchV2(data.batchRaw) if err != nil { - log.Errorf("[SeqSender] error closing and encoding the batch %d: %v", s.wipBatch, err) + log.Errorf("error closing and encoding the batch %d: %v", s.wipBatch, err) return err } - data.batch.SetL2Data(l2Data) + + data.batch.SetL2Data(batchL2Data) + } else { + log.Fatalf("wipBatch %d not found in sequenceData slice", s.wipBatch) + } + + // Sanity Check + if s.cfg.SanityCheckRPCURL != "" { + rpcNumberOfBlocks, batchL2Data, err := s.getBatchFromRPC(s.wipBatch) + if err != nil { + log.Fatalf("error getting batch number from RPC while trying to perform sanity check: %v", err) + } else { + dsNumberOfBlocks := len(s.sequenceData[s.wipBatch].batchRaw.Blocks) + if rpcNumberOfBlocks != dsNumberOfBlocks { + log.Fatalf("number of blocks in batch %d (%d) does not match the number of blocks in the batch from the RPC (%d)", s.wipBatch, dsNumberOfBlocks, rpcNumberOfBlocks) + } + + if data.batchType == datastream.BatchType_BATCH_TYPE_REGULAR && common.Bytes2Hex(data.batch.L2Data()) != batchL2Data { + log.Infof("datastream batchL2Data: %s", common.Bytes2Hex(data.batch.L2Data())) + log.Infof("RPC batchL2Data: %s", batchL2Data) + log.Fatalf("batchL2Data in batch %d does not match batchL2Data from the RPC (%d)", s.wipBatch) + } + + log.Infof("sanity check of batch %d against RPC successful", s.wipBatch) + } + } else { + log.Warnf("config param SanityCheckRPCURL not set, sanity check with RPC can't be done") } - s.mutexSequence.Unlock() return nil } +func (s *SequenceSender) getBatchFromRPC(batchNumber uint64) (int, string, error) { + type zkEVMBatch struct { + Blocks []string `mapstructure:"blocks"` + BatchL2Data string `mapstructure:"batchL2Data"` + } + + zkEVMBatchData := zkEVMBatch{} + + response, err := rpc.JSONRPCCall(s.cfg.SanityCheckRPCURL, "zkevm_getBatchByNumber", batchNumber) + if err != nil { + return 0, "", err + } + + // Check if the response is an error + if response.Error != nil { + return 0, "", fmt.Errorf("error in the response calling zkevm_getBatchByNumber: %v", response.Error) + } + + // Get the batch number from the response hex string + err = json.Unmarshal(response.Result, &zkEVMBatchData) + if err != nil { + return 0, "", fmt.Errorf("error unmarshalling the batch number from the response calling zkevm_getBatchByNumber: %v", err) + } + + return len(zkEVMBatchData.Blocks), zkEVMBatchData.BatchL2Data, nil +} + // addNewSequenceBatch adds a new batch to the sequence func (s *SequenceSender) addNewSequenceBatch(l2Block *datastream.L2Block) { s.mutexSequence.Lock() - log.Infof("[SeqSender] ...new batch, number %d", l2Block.BatchNumber) + log.Infof("...new batch, number %d", l2Block.BatchNumber) if l2Block.BatchNumber > s.wipBatch+1 { - s.logFatalf("[SeqSender] new batch number (%d) is not consecutive to the current one (%d)", l2Block.BatchNumber, s.wipBatch) + s.logFatalf("new batch number (%d) is not consecutive to the current one (%d)", l2Block.BatchNumber, s.wipBatch) } else if l2Block.BatchNumber < s.wipBatch { - s.logFatalf("[SeqSender] new batch number (%d) is lower than the current one (%d)", l2Block.BatchNumber, s.wipBatch) + s.logFatalf("new batch number (%d) is lower than the current one (%d)", l2Block.BatchNumber, s.wipBatch) } batch := s.TxBuilder.NewBatchFromL2Block(l2Block) @@ -897,15 +1052,16 @@ func (s *SequenceSender) addNewSequenceBatch(l2Block *datastream.L2Block) { // addInfoSequenceBatchStart adds info from the batch start func (s *SequenceSender) addInfoSequenceBatchStart(batch *datastream.BatchStart) { s.mutexSequence.Lock() - log.Infof("[SeqSender] batch %d (%s) Start: type %d forkId %d chainId %d", batch.Number, datastream.BatchType_name[int32(batch.Type)], batch.Type, batch.ForkId, batch.ChainId) + log.Infof("batch %d (%s) Start: type %d forkId %d chainId %d", batch.Number, datastream.BatchType_name[int32(batch.Type)], batch.Type, batch.ForkId, batch.ChainId) // Current batch data := s.sequenceData[s.wipBatch] if data != nil { wipBatch := data.batch if wipBatch.BatchNumber()+1 != batch.Number { - s.logFatalf("[SeqSender] batch start number (%d) does not match the current consecutive one (%d)", batch.Number, wipBatch.BatchNumber) + s.logFatalf("batch start number (%d) does not match the current consecutive one (%d)", batch.Number, wipBatch.BatchNumber) } + data.batchType = batch.Type } s.mutexSequence.Unlock() @@ -922,7 +1078,7 @@ func (s *SequenceSender) addInfoSequenceBatchEnd(batch *datastream.BatchEnd) { if wipBatch.BatchNumber() == batch.Number { // wipBatch.StateRoot = common.BytesToHash(batch) TODO: check if this is needed } else { - s.logFatalf("[SeqSender] batch end number (%d) does not match the current one (%d)", batch.Number, wipBatch.BatchNumber) + s.logFatalf("batch end number (%d) does not match the current one (%d)", batch.Number, wipBatch.BatchNumber) } } @@ -932,7 +1088,7 @@ func (s *SequenceSender) addInfoSequenceBatchEnd(batch *datastream.BatchEnd) { // addNewBatchL2Block adds a new L2 block to the work in progress batch func (s *SequenceSender) addNewBatchL2Block(l2Block *datastream.L2Block) { s.mutexSequence.Lock() - log.Infof("[SeqSender] .....new L2 block, number %d (batch %d)", l2Block.Number, l2Block.BatchNumber) + log.Infof(".....new L2 block, number %d (batch %d)", l2Block.Number, l2Block.BatchNumber) // Current batch data := s.sequenceData[s.wipBatch] @@ -941,7 +1097,7 @@ func (s *SequenceSender) addNewBatchL2Block(l2Block *datastream.L2Block) { data.batch.SetLastL2BLockTimestamp(l2Block.Timestamp) // Sanity check: should be the same coinbase within the batch if common.BytesToAddress(l2Block.Coinbase) != data.batch.LastCoinbase() { - s.logFatalf("[SeqSender] coinbase changed within the batch! (Previous %v, Current %v)", data.batch.LastCoinbase, common.BytesToAddress(l2Block.Coinbase)) + s.logFatalf("coinbase changed within the batch! (Previous %v, Current %v)", data.batch.LastCoinbase, common.BytesToAddress(l2Block.Coinbase)) } data.batch.SetLastCoinbase(common.BytesToAddress(l2Block.Coinbase)) data.batch.SetL1InfoTreeIndex(l2Block.L1InfotreeIndex) @@ -953,7 +1109,7 @@ func (s *SequenceSender) addNewBatchL2Block(l2Block *datastream.L2Block) { // Get current L2 block _, blockRaw := s.getWipL2Block() if blockRaw == nil { - log.Debugf("[SeqSender] wip block %d not found!") + log.Debugf("wip block %d not found!") return } @@ -968,7 +1124,7 @@ func (s *SequenceSender) addNewBatchL2Block(l2Block *datastream.L2Block) { // addNewBlockTx adds a new Tx to the current L2 block func (s *SequenceSender) addNewBlockTx(l2Tx *datastream.Transaction) { s.mutexSequence.Lock() - log.Debugf("[SeqSender] ........new tx, length %d EGP %d SR %x..", len(l2Tx.Encoded), l2Tx.EffectiveGasPricePercentage, l2Tx.ImStateRoot[:8]) + log.Debugf("........new tx, length %d EGP %d SR %x..", len(l2Tx.Encoded), l2Tx.EffectiveGasPricePercentage, l2Tx.ImStateRoot[:8]) // Current L2 block _, blockRaw := s.getWipL2Block() @@ -976,7 +1132,7 @@ func (s *SequenceSender) addNewBlockTx(l2Tx *datastream.Transaction) { // New Tx raw tx, err := state.DecodeTx(common.Bytes2Hex(l2Tx.Encoded)) if err != nil { - log.Fatalf("[SeqSender] error decoding tx!") + log.Fatalf("error decoding tx: %v", err) return } @@ -1010,15 +1166,18 @@ func (s *SequenceSender) getWipL2Block() (uint64, *state.L2BlockRaw) { // updateLatestVirtualBatch queries the value in L1 and updates the latest virtual batch field func (s *SequenceSender) updateLatestVirtualBatch() error { + s.latestVirtualBatchLock.Lock() + defer s.latestVirtualBatchLock.Unlock() + // Get latest virtual state batch from L1 var err error s.latestVirtualBatch, err = s.etherman.GetLatestBatchNumber() if err != nil { - log.Errorf("[SeqSender] error getting latest virtual batch, error: %v", err) + log.Errorf("error getting latest virtual batch, error: %v", err) return errors.New("fail to get latest virtual batch") } else { - log.Infof("[SeqSender] latest virtual batch is %d", s.latestVirtualBatch) + log.Infof("latest virtual batch is %d", s.latestVirtualBatch) } return nil } @@ -1053,7 +1212,7 @@ func (s *SequenceSender) marginTimeElapsed(l2BlockTimestamp uint64, currentTime func (s *SequenceSender) logFatalf(template string, args ...interface{}) { s.seqSendingStopped = true log.Errorf(template, args...) - log.Errorf("[SeqSender] sequence sending stopped.") + log.Errorf("sequence sending stopped.") for { time.Sleep(1 * time.Second) } @@ -1067,7 +1226,7 @@ func printBatch(raw *state.BatchRawV2, showBlock bool, showTx bool) { totalL2Txs += len(raw.Blocks[k].Transactions) } - log.Debugf("[SeqSender] // #blocks: %d, #L2txs: %d", len(raw.Blocks), totalL2Txs) + log.Debugf("// #blocks: %d, #L2txs: %d", len(raw.Blocks), totalL2Txs) // Blocks info if showBlock { @@ -1081,17 +1240,17 @@ func printBatch(raw *state.BatchRawV2, showBlock bool, showTx bool) { lastBlock = &raw.Blocks[numBlocks-1] } if firstBlock != nil { - log.Debugf("[SeqSender] // block first (indL1info: %d, delta-timestamp: %d, #L2txs: %d)", firstBlock.IndexL1InfoTree, firstBlock.DeltaTimestamp, len(firstBlock.Transactions)) + log.Debugf("// block first (indL1info: %d, delta-timestamp: %d, #L2txs: %d)", firstBlock.IndexL1InfoTree, firstBlock.DeltaTimestamp, len(firstBlock.Transactions)) // Tx info if showTx { for iTx, tx := range firstBlock.Transactions { v, r, s := tx.Tx.RawSignatureValues() - log.Debugf("[SeqSender] // tx(%d) effPct: %d, encoded: %t, v: %v, r: %v, s: %v", iTx, tx.EfficiencyPercentage, tx.TxAlreadyEncoded, v, r, s) + log.Debugf("// tx(%d) effPct: %d, encoded: %t, v: %v, r: %v, s: %v", iTx, tx.EfficiencyPercentage, tx.TxAlreadyEncoded, v, r, s) } } } if lastBlock != nil { - log.Debugf("[SeqSender] // block last (indL1info: %d, delta-timestamp: %d, #L2txs: %d)", lastBlock.DeltaTimestamp, lastBlock.DeltaTimestamp, len(lastBlock.Transactions)) + log.Debugf("// block last (indL1info: %d, delta-timestamp: %d, #L2txs: %d)", lastBlock.DeltaTimestamp, lastBlock.DeltaTimestamp, len(lastBlock.Transactions)) } } } diff --git a/sequencesender/sequencesender_test.go b/sequencesender/sequencesender_test.go index ca0d33e7..c16fda42 100644 --- a/sequencesender/sequencesender_test.go +++ b/sequencesender/sequencesender_test.go @@ -12,6 +12,7 @@ import ( const ( txStreamEncoded1 = "f86508843b9aca0082520894617b3a3528f9cdd6630fd3301b9c8911f7bf063d0a808207f5a0579b72a1c1ffdd845fba45317540982109298e2ec8d67ddf2cdaf22e80903677a01831e9a01291c7ea246742a5b5a543ca6938bfc3f6958c22be06fad99274e4ac" txStreamEncoded2 = "f86509843b9aca0082520894617b3a3528f9cdd6630fd3301b9c8911f7bf063d0a808207f5a0908a522075e09485166ffa7630cd2b7013897fa1f1238013677d6f0a86efb3d2a0068b12435fcdc8ee254f3b1df8c5b29ed691eeee6065704f061130935976ca99" + txStreamEncoded3 = "b8b402f8b101268505d21dba0085076c363d8982dc60941929761e87667283f087ea9ab8370c174681b4e980b844095ea7b300000000000000000000000080a64c6d7f12c47b7c66c5b4e20e72bc1fcd5d9effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffc001a0dd4db494969139a120e8721842455ec13f82757a4fc49b66d447c7d32d095a1da06ef54068a9aa67ecc4f52d885299a04feb6f3531cdfc771f1412cd3331d1ba4c" ) func TestStreamTx(t *testing.T) { @@ -19,6 +20,8 @@ func TestStreamTx(t *testing.T) { require.NoError(t, err) tx2, err := state.DecodeTx(txStreamEncoded2) require.NoError(t, err) + tx3, err := state.DecodeTx(txStreamEncoded3) + require.NoError(t, err) txTest := state.L2TxRaw{ EfficiencyPercentage: 129, @@ -48,6 +51,11 @@ func TestStreamTx(t *testing.T) { TxAlreadyEncoded: false, Tx: tx2, }, + { + EfficiencyPercentage: 97, + TxAlreadyEncoded: false, + Tx: tx3, + }, }, }, }, diff --git a/sequencesender/txbuilder/banana_types.go b/sequencesender/txbuilder/banana_types.go index f44a753f..ffa5d804 100644 --- a/sequencesender/txbuilder/banana_types.go +++ b/sequencesender/txbuilder/banana_types.go @@ -100,19 +100,20 @@ func (b *BananaBatch) BatchNumber() uint64 { func (b BananaBatch) DeepCopy() seqsendertypes.Batch { return &BananaBatch{b.Batch} - } func (b *BananaBatch) SetL2Data(data []byte) { b.Batch.L2Data = data - } + func (b *BananaBatch) SetLastCoinbase(address common.Address) { b.Batch.LastCoinbase = address } + func (b *BananaBatch) SetLastL2BLockTimestamp(ts uint64) { b.Batch.LastL2BLockTimestamp = ts } + func (b *BananaBatch) SetL1InfoTreeIndex(index uint32) { b.Batch.L1InfoTreeIndex = index } @@ -130,3 +131,11 @@ func (b *BananaBatch) String() string { b.LastCoinbase().String(), b.ForcedBatchTimestamp(), b.ForcedGlobalExitRoot().String(), b.ForcedBlockHashL1().String(), b.L2Data(), b.LastL2BLockTimestamp(), b.BatchNumber(), b.GlobalExitRoot().String(), b.L1InfoTreeIndex(), ) } + +func (b *BananaSequence) LastVirtualBatchNumber() uint64 { + return b.SequenceBanana.LastVirtualBatchNumber +} + +func (b *BananaSequence) SetLastVirtualBatchNumber(batchNumber uint64) { + b.SequenceBanana.LastVirtualBatchNumber = batchNumber +} diff --git a/sequencesender/txbuilder/banana_validium.go b/sequencesender/txbuilder/banana_validium.go index 2472f62a..c7309dfc 100644 --- a/sequencesender/txbuilder/banana_validium.go +++ b/sequencesender/txbuilder/banana_validium.go @@ -75,7 +75,7 @@ func (t *TxBuilderBananaValidium) BuildSequenceBatchesTx(ctx context.Context, se // Build sequence data tx, err := t.internalBuildSequenceBatchesTx(ethseq, dataAvailabilityMessage) if err != nil { - log.Errorf("[SeqSender] error estimating new sequenceBatches to add to ethtxmanager: ", err) + log.Errorf("error estimating new sequenceBatches to add to ethtxmanager: ", err) return nil, err } return tx, nil @@ -116,7 +116,6 @@ func (t *TxBuilderBananaValidium) sequenceBatchesValidium(opts bind.TransactOpts log.Debugf("Batches to send: %+v", batches) log.Debug("l2CoinBase: ", sequence.L2Coinbase) log.Debug("Sequencer address: ", opts.From) - } return tx, err diff --git a/sequencesender/txbuilder/banana_zkevm.go b/sequencesender/txbuilder/banana_zkevm.go index ada3d900..3a95cdff 100644 --- a/sequencesender/txbuilder/banana_zkevm.go +++ b/sequencesender/txbuilder/banana_zkevm.go @@ -64,7 +64,7 @@ func (t *TxBuilderBananaZKEVM) BuildSequenceBatchesTx(ctx context.Context, seque // Build sequence data tx, err := t.sequenceBatchesRollup(newopts, ethseq) if err != nil { - log.Errorf("[SeqSender] error estimating new sequenceBatches to add to ethtxmanager: ", err) + log.Errorf("error estimating new sequenceBatches to add to ethtxmanager: ", err) return nil, err } return tx, nil @@ -91,7 +91,6 @@ func (t *TxBuilderBananaZKEVM) sequenceBatchesRollup(opts bind.TransactOpts, seq log.Debugf("Batches to send: %+v", batches) log.Debug("l2CoinBase: ", sequence.L2Coinbase) log.Debug("Sequencer address: ", opts.From) - } return tx, err diff --git a/sequencesender/txbuilder/elderberry_base.go b/sequencesender/txbuilder/elderberry_base.go index 64971e9c..23081b58 100644 --- a/sequencesender/txbuilder/elderberry_base.go +++ b/sequencesender/txbuilder/elderberry_base.go @@ -49,5 +49,8 @@ func getLastSequencedBatchNumber(sequences seqsendertypes.Sequence) uint64 { if sequences.FirstBatch().BatchNumber() == 0 { panic("First batch number is 0, that is not allowed!") } + if sequences.LastVirtualBatchNumber() != 0 { + return sequences.LastVirtualBatchNumber() + } return sequences.FirstBatch().BatchNumber() - 1 } diff --git a/sequencesender/txbuilder/elderberry_types.go b/sequencesender/txbuilder/elderberry_types.go index bbc7d56a..72809957 100644 --- a/sequencesender/txbuilder/elderberry_types.go +++ b/sequencesender/txbuilder/elderberry_types.go @@ -9,8 +9,9 @@ import ( ) type ElderberrySequence struct { - l2Coinbase common.Address - batches []seqsendertypes.Batch + l2Coinbase common.Address + batches []seqsendertypes.Batch + lastVirtualBatchNumber uint64 } func NewElderberrySequence(batches []seqsendertypes.Batch, l2Coinbase common.Address) *ElderberrySequence { @@ -53,6 +54,7 @@ func (b *ElderberrySequence) Len() int { func (b *ElderberrySequence) L2Coinbase() common.Address { return b.l2Coinbase } + func (b *ElderberrySequence) String() string { res := fmt.Sprintf("Seq/Elderberry: L2Coinbase: %s, Batches: %d", b.l2Coinbase.String(), len(b.batches)) for i, batch := range b.Batches() { @@ -60,3 +62,11 @@ func (b *ElderberrySequence) String() string { } return res } + +func (b *ElderberrySequence) SetLastVirtualBatchNumber(batchNumber uint64) { + b.lastVirtualBatchNumber = batchNumber +} + +func (b *ElderberrySequence) LastVirtualBatchNumber() uint64 { + return b.lastVirtualBatchNumber +} diff --git a/sequencesender/txbuilder/validium_cond_num_batches.go b/sequencesender/txbuilder/validium_cond_num_batches.go index fd1ad22a..93755c9b 100644 --- a/sequencesender/txbuilder/validium_cond_num_batches.go +++ b/sequencesender/txbuilder/validium_cond_num_batches.go @@ -23,7 +23,7 @@ func NewConditionalNewSequenceNumBatches(maxBatchesForL1 uint64) *ConditionalNew func (c *ConditionalNewSequenceNumBatches) NewSequenceIfWorthToSend(ctx context.Context, txBuilder TxBuilder, sequenceBatches []seqsendertypes.Batch, l2Coinbase common.Address) (seqsendertypes.Sequence, error) { if c.maxBatchesForL1 != MaxBatchesForL1Disabled && len(sequenceBatches) >= int(c.maxBatchesForL1) { log.Infof( - "[SeqSender] sequence should be sent to L1, because MaxBatchesForL1 (%d) has been reached", + "sequence should be sent to L1, because MaxBatchesForL1 (%d) has been reached", c.maxBatchesForL1, ) return txBuilder.NewSequence(sequenceBatches, l2Coinbase) diff --git a/state/helper.go b/state/helper.go index efb1b1df..c717fb56 100644 --- a/state/helper.go +++ b/state/helper.go @@ -1,6 +1,7 @@ package state import ( + "bytes" "fmt" "math/big" "strconv" @@ -35,19 +36,24 @@ func prepareRLPTxData(tx *types.Transaction) ([]byte, error) { v, r, s := tx.RawSignatureValues() sign := 1 - (v.Uint64() & 1) - nonce, gasPrice, gas, to, value, data, chainID := tx.Nonce(), tx.GasPrice(), tx.Gas(), tx.To(), tx.Value(), tx.Data(), tx.ChainId() - rlpFieldsToEncode := []interface{}{ - nonce, - gasPrice, - gas, - to, - value, - data, + tx.Nonce(), + } + + if tx.Type() == types.DynamicFeeTxType { + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.GasTipCap()) + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.GasFeeCap()) + } else { + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.GasPrice()) } - if !IsPreEIP155Tx(tx) { - rlpFieldsToEncode = append(rlpFieldsToEncode, chainID) + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.Gas()) + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.To()) + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.Value()) + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.Data()) + + if !isPreEIP155Tx(tx) { + rlpFieldsToEncode = append(rlpFieldsToEncode, tx.ChainId()) rlpFieldsToEncode = append(rlpFieldsToEncode, uint(0)) rlpFieldsToEncode = append(rlpFieldsToEncode, uint(0)) } @@ -61,11 +67,8 @@ func prepareRLPTxData(tx *types.Transaction) ([]byte, error) { newRPadded := fmt.Sprintf("%064s", r.Text(hex.Base)) newSPadded := fmt.Sprintf("%064s", s.Text(hex.Base)) newVPadded := fmt.Sprintf("%02s", newV.Text(hex.Base)) - txData, err := hex.DecodeString(hex.EncodeToString(txCodedRlp) + newRPadded + newSPadded + newVPadded) - if err != nil { - return nil, err - } - return txData, nil + + return hex.DecodeString(hex.EncodeToString(txCodedRlp) + newRPadded + newSPadded + newVPadded) } // DecodeTxs extracts Transactions for its encoded form @@ -173,16 +176,19 @@ func DecodeTx(encodedTx string) (*types.Transaction, error) { return nil, err } + reader := bytes.NewReader(b) + stream := rlp.NewStream(reader, 0) + tx := new(types.Transaction) - if err := tx.UnmarshalBinary(b); err != nil { + if err := tx.DecodeRLP(stream); err != nil { return nil, err } return tx, nil } -// IsPreEIP155Tx checks if the tx is a tx that has a chainID as zero and +// isPreEIP155Tx checks if the tx is a tx that has a chainID as zero and // V field is either 27 or 28 -func IsPreEIP155Tx(tx *types.Transaction) bool { +func isPreEIP155Tx(tx *types.Transaction) bool { v, _, _ := tx.RawSignatureValues() return tx.ChainId().Uint64() == 0 && (v.Uint64() == 27 || v.Uint64() == 28) } diff --git a/state/interfaces.go b/state/interfaces.go index a0c38337..ce825685 100644 --- a/state/interfaces.go +++ b/state/interfaces.go @@ -23,8 +23,8 @@ type storage interface { CleanupGeneratedProofs(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error) CheckProofExistsForBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error) - AddBatch(ctx context.Context, batch *Batch, datastream []byte, dbTx pgx.Tx) error - GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*Batch, []byte, error) + AddBatch(ctx context.Context, dbBatch *DBBatch, dbTx pgx.Tx) error + GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*DBBatch, error) DeleteBatchesOlderThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error DeleteBatchesNewerThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error } diff --git a/state/pgstatestorage/batch.go b/state/pgstatestorage/batch.go index ab8414d1..db818df5 100644 --- a/state/pgstatestorage/batch.go +++ b/state/pgstatestorage/batch.go @@ -2,6 +2,7 @@ package pgstatestorage import ( "context" + "errors" "github.com/0xPolygon/cdk/state" "github.com/ethereum/go-ethereum/common" @@ -9,24 +10,31 @@ import ( ) // AddBatch stores a batch -func (p *PostgresStorage) AddBatch(ctx context.Context, batch *state.Batch, datastream []byte, dbTx pgx.Tx) error { - const addInputHashSQL = "INSERT INTO aggregator.batch (batch_num, batch, datastream) VALUES ($1, $2, $3) ON CONFLICT (batch_num) DO UPDATE SET batch = $2, datastream = $3" +func (p *PostgresStorage) AddBatch(ctx context.Context, dbBatch *state.DBBatch, dbTx pgx.Tx) error { + const addInputHashSQL = "INSERT INTO aggregator.batch (batch_num, batch, datastream, witness) VALUES ($1, $2, $3, $4) ON CONFLICT (batch_num) DO UPDATE SET batch = $2, datastream = $3, witness = $4" e := p.getExecQuerier(dbTx) - _, err := e.Exec(ctx, addInputHashSQL, batch.BatchNumber, &batch, common.Bytes2Hex(datastream)) + _, err := e.Exec(ctx, addInputHashSQL, dbBatch.Batch.BatchNumber, &dbBatch.Batch, common.Bytes2Hex(dbBatch.Datastream), common.Bytes2Hex(dbBatch.Witness)) return err } // GetBatch gets a batch by a given batch number -func (p *PostgresStorage) GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, []byte, error) { - const getInputHashSQL = "SELECT batch, datastream FROM aggregator.batch WHERE batch_num = $1" +func (p *PostgresStorage) GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.DBBatch, error) { + const getInputHashSQL = "SELECT batch, datastream, witness FROM aggregator.batch WHERE batch_num = $1" e := p.getExecQuerier(dbTx) - var batch *state.Batch + var batch state.Batch var streamStr string - err := e.QueryRow(ctx, getInputHashSQL, batchNumber).Scan(&batch, &streamStr) - if err != nil { - return nil, nil, err + var witnessStr string + err := e.QueryRow(ctx, getInputHashSQL, batchNumber).Scan(&batch, &streamStr, &witnessStr) + if errors.Is(err, pgx.ErrNoRows) { + return nil, state.ErrNotFound + } else if err != nil { + return nil, err } - return batch, common.Hex2Bytes(streamStr), nil + return &state.DBBatch{ + Batch: batch, + Datastream: common.Hex2Bytes(streamStr), + Witness: common.Hex2Bytes(witnessStr), + }, nil } // DeleteBatchesOlderThanBatchNumber deletes batches previous to the given batch number diff --git a/state/types.go b/state/types.go index 40aaa88b..d5a8d155 100644 --- a/state/types.go +++ b/state/types.go @@ -47,3 +47,10 @@ type Sequence struct { FromBatchNumber uint64 ToBatchNumber uint64 } + +// DBBatch struct is a wrapper for the state.Batch and its metadata +type DBBatch struct { + Batch Batch + Datastream []byte + Witness []byte +} diff --git a/test/config/test.config.toml b/test/config/test.config.toml index 9cc1ef0b..7acbd9c6 100644 --- a/test/config/test.config.toml +++ b/test/config/test.config.toml @@ -13,6 +13,7 @@ SequencesTxFileName = "sequencesender.json" GasOffset = 80000 WaitPeriodPurgeTxFile = "60m" MaxPendingTx = 1 +SanityCheckRPCURL = "http://127.0.0.1:8123" [SequenceSender.StreamClient] Server = "127.0.0.1:6900" [SequenceSender.EthTxManager] @@ -47,6 +48,7 @@ ProofStatePollingInterval = "5s" SenderAddress = "0x3f2963d678442c4af27a797453b64ef6ce9443e9" CleanupLockedProofsInterval = "2m" GeneratingProofCleanupThreshold = "10m" +BatchProofSanityCheckEnabled = true ForkId = 9 GasOffset = 0 WitnessURL = "http://zkevm-erigon-seq:8123" diff --git a/test/config/test.kurtosis_template.toml b/test/config/test.kurtosis_template.toml index 061a4838..3ca151ce 100644 --- a/test/config/test.kurtosis_template.toml +++ b/test/config/test.kurtosis_template.toml @@ -27,6 +27,7 @@ SequencesTxFileName = "sequencesender.json" GasOffset = 80000 WaitPeriodPurgeTxFile = "15m" MaxPendingTx = 1 +SanityCheckRPCURL = "http://127.0.0.1:8123" [SequenceSender.StreamClient] Server = "127.0.0.1:${zkevm_data_streamer_port}" [SequenceSender.EthTxManager] @@ -59,6 +60,7 @@ ProofStatePollingInterval = "5s" SenderAddress = "" CleanupLockedProofsInterval = "2m" GeneratingProofCleanupThreshold = "10m" +BatchProofSanityCheckEnabled = true ForkId = 9 GasOffset = 0 WitnessURL = "localhost:8123"