Skip to content

Commit

Permalink
Bring the latests changes from zkevm-aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
vcastellm committed Jul 31, 2024
1 parent 8d06c3f commit d9e1a17
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 67 deletions.
87 changes: 40 additions & 47 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package aggregator

import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/json"
Expand Down Expand Up @@ -285,7 +284,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
}

// Compare BatchL2Data from L1 and DataStream
if common.Bytes2Hex(batchl2Data) != common.Bytes2Hex(virtualBatch.BatchL2Data) {
if common.Bytes2Hex(batchl2Data) != common.Bytes2Hex(virtualBatch.BatchL2Data) && a.currentStreamBatch.Type != datastream.BatchType_BATCH_TYPE_INJECTED {
log.Warnf("BatchL2Data from L1 and data stream are different for batch %d", a.currentStreamBatch.BatchNumber)

if a.currentStreamBatch.Type == datastream.BatchType_BATCH_TYPE_INVALID {
Expand All @@ -296,7 +295,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
log.Warnf("L1 BatchL2Data:%v", common.Bytes2Hex(virtualBatch.BatchL2Data))
}

// Get L1InfoRoot
// Ger L1InfoRoot
sequence, err := a.l1Syncr.GetSequenceByBatchNumber(ctx, a.currentStreamBatch.BatchNumber)
if err != nil {
log.Errorf("Error getting sequence: %v", err)
Expand All @@ -317,7 +316,7 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli
a.currentStreamBatch.Timestamp = sequence.Timestamp

// Calculate Acc Input Hash
oldBatch, _, err := a.state.GetBatch(ctx, a.currentStreamBatch.BatchNumber-1, nil)
oldBatch, _, _, err := a.state.GetBatch(ctx, a.currentStreamBatch.BatchNumber-1, nil)
if err != nil {
log.Errorf("Error getting batch %d: %v", a.currentStreamBatch.BatchNumber-1, err)
return err
Expand All @@ -343,7 +342,14 @@ func (a *Aggregator) handleReceivedDataStream(entry *datastreamer.FileEntry, cli

a.currentStreamBatch.AccInputHash = accInputHash

err = a.state.AddBatch(ctx, &a.currentStreamBatch, a.currentBatchStreamData, nil)
// Get Witness
witness, err := getWitness(a.currentStreamBatch.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness)
if err != nil {
log.Errorf("Failed to get witness for batch %d, err: %v", a.currentStreamBatch.BatchNumber, err)
return err
}

err = a.state.AddBatch(ctx, &a.currentStreamBatch, a.currentBatchStreamData, witness, nil)
if err != nil {
log.Errorf("Error adding batch: %v", err)
return err
Expand Down Expand Up @@ -465,7 +471,7 @@ func (a *Aggregator) Start(ctx context.Context) error {

// Store Acc Input Hash of the latest verified batch
dummyBatch := state.Batch{BatchNumber: lastVerifiedBatchNumber, AccInputHash: *accInputHash}
err = a.state.AddBatch(ctx, &dummyBatch, []byte{0}, nil)
err = a.state.AddBatch(ctx, &dummyBatch, []byte{0}, []byte{0}, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -619,7 +625,7 @@ func (a *Aggregator) sendFinalProof() {

a.startProofVerification()

finalBatch, _, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil)
finalBatch, _, _, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil)
if err != nil {
log.Errorf("Failed to retrieve batch with number [%d]: %v", proof.BatchNumberFinal, err)
a.endProofVerification()
Expand Down Expand Up @@ -765,7 +771,7 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface
if string(finalProof.Public.NewStateRoot) == mockedStateRoot && string(finalProof.Public.NewLocalExitRoot) == mockedLocalExitRoot {
// This local exit root and state root come from the mock
// prover, use the one captured by the executor instead
finalBatch, _, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil)
finalBatch, _, _, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil)
if err != nil {
return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
}
Expand All @@ -775,19 +781,6 @@ func (a *Aggregator) buildFinalProof(ctx context.Context, prover proverInterface
finalProof.Public.NewLocalExitRoot = finalBatch.LocalExitRoot.Bytes()
}

// Sanity Check: state root from the proof must match the one from the final batch
finalBatch, _, err := a.state.GetBatch(ctx, proof.BatchNumberFinal, nil)
if err != nil {
return nil, fmt.Errorf("failed to retrieve batch with number [%d]", proof.BatchNumberFinal)
}

if !bytes.Equal(finalProof.Public.NewStateRoot, finalBatch.StateRoot.Bytes()) {
for {
log.Errorf("State root from the proof [%#x] does not match the one from the batch [%#x]. HALTED", finalProof.Public.NewStateRoot, finalBatch.StateRoot.Bytes())
time.Sleep(a.cfg.RetryTime.Duration)
}
}

return finalProof, nil
}

Expand Down Expand Up @@ -1088,7 +1081,7 @@ func (a *Aggregator) tryAggregateProofs(ctx context.Context, prover proverInterf
log.Infof("Proof ID for aggregated proof: %v", *proof.ProofID)
log = log.WithFields("proofId", *proof.ProofID)

recursiveProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
recursiveProof, _, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
err = fmt.Errorf("failed to get aggregated proof from prover, %w", err)
log.Error(FirstToUpper(err.Error()))
Expand Down Expand Up @@ -1179,7 +1172,7 @@ func (a *Aggregator) getVerifiedBatchAccInputHash(ctx context.Context, batchNumb
return &accInputHash, nil
}

func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverInterface) (*state.Batch, *state.Proof, error) {
func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverInterface) (*state.Batch, []byte, *state.Proof, error) {
proverID := prover.ID()
proverName := prover.Name()

Expand All @@ -1195,7 +1188,7 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
// Get last virtual batch number from L1
lastVerifiedBatchNumber, err := a.etherman.GetLatestVerifiedBatchNum()
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

proofExists := true
Expand All @@ -1207,20 +1200,20 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
proofExists, err = a.state.CheckProofExistsForBatch(ctx, batchNumberToVerify, nil)
if err != nil {
log.Infof("Error checking proof exists for batch %d", batchNumberToVerify)
return nil, nil, err
return nil, nil, nil, err
}
}

// Check if the batch has been sequenced
sequence, err := a.l1Syncr.GetSequenceByBatchNumber(ctx, batchNumberToVerify)
if err != nil && !errors.Is(err, entities.ErrNotFound) {
return nil, nil, err
return nil, nil, nil, err
}

// Not found, so it it not possible to verify the batch yet
if sequence == nil || errors.Is(err, entities.ErrNotFound) {
log.Infof("No sequence found for batch %d", batchNumberToVerify)
return nil, nil, state.ErrNotFound
return nil, nil, nil, state.ErrNotFound
}

stateSequence := state.Sequence{
Expand All @@ -1231,12 +1224,12 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
err = a.state.AddSequence(ctx, stateSequence, nil)
if err != nil {
log.Infof("Error storing sequence for batch %d", batchNumberToVerify)
return nil, nil, err
return nil, nil, nil, err
}

batch, _, err := a.state.GetBatch(ctx, batchNumberToVerify, nil)
batch, _, witness, err := a.state.GetBatch(ctx, batchNumberToVerify, nil)
if err != nil {
return batch, nil, err
return batch, witness, nil, err
}

// All the data required to generate a proof is ready
Expand All @@ -1249,12 +1242,12 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
isProfitable, err := a.profitabilityChecker.IsProfitable(ctx, big.NewInt(0))
if err != nil {
log.Errorf("Failed to check aggregator profitability, err: %v", err)
return nil, nil, err
return nil, nil, nil, err
}

if !isProfitable {
log.Infof("Batch is not profitable, pol collateral %d", big.NewInt(0))
return nil, nil, err
return nil, nil, nil, err
}

now := time.Now().Round(time.Microsecond)
Expand All @@ -1270,10 +1263,10 @@ func (a *Aggregator) getAndLockBatchToProve(ctx context.Context, prover proverIn
err = a.state.AddGeneratedProof(ctx, proof, nil)
if err != nil {
log.Errorf("Failed to add batch proof, err: %v", err)
return nil, nil, err
return nil, nil, nil, err
}

return batch, proof, nil
return batch, witness, proof, nil
}

func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInterface) (bool, error) {
Expand All @@ -1284,7 +1277,7 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
)
log.Debug("tryGenerateBatchProof start")

batchToProve, proof, err0 := a.getAndLockBatchToProve(ctx, prover)
batchToProve, witness, proof, err0 := a.getAndLockBatchToProve(ctx, prover)
if errors.Is(err0, state.ErrNotFound) {
// nothing to proof, swallow the error
log.Debug("Nothing to generate proof")
Expand Down Expand Up @@ -1313,7 +1306,7 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt
}()

log.Infof("Sending zki + batch to the prover, batchNumber [%d]", batchToProve.BatchNumber)
inputProver, err := a.buildInputProver(ctx, batchToProve)
inputProver, err := a.buildInputProver(ctx, batchToProve, witness)
if err != nil {
err = fmt.Errorf("failed to build input prover, %w", err)
log.Error(FirstToUpper(err.Error()))
Expand All @@ -1334,7 +1327,7 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt

log = log.WithFields("proofId", *proof.ProofID)

resGetProof, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
resGetProof, stateRoot, err := prover.WaitRecursiveProof(ctx, *proof.ProofID)
if err != nil {
err = fmt.Errorf("failed to get proof from prover, %w", err)
log.Error(FirstToUpper(err.Error()))
Expand All @@ -1343,6 +1336,11 @@ func (a *Aggregator) tryGenerateBatchProof(ctx context.Context, prover proverInt

log.Info("Batch proof generated")

// Sanity Check: state root from the proof must match the one from the batch
if a.cfg.BatchProofSanityCheckEnabled && (stateRoot != common.Hash{}) && (stateRoot != batchToProve.StateRoot) {
log.Fatalf("State root from the proof does not match the expected for batch %d: Proof = [%s] Expected = [%s]", batchToProve.BatchNumber, stateRoot.String(), batchToProve.StateRoot.String())
}

proof.Proof = resGetProof

// NOTE(pg): the defer func is useless from now on, use a different variable
Expand Down Expand Up @@ -1400,7 +1398,7 @@ func (a *Aggregator) resetVerifyProofTime() {
a.timeSendFinalProof = time.Now().Add(a.cfg.VerifyProofInterval.Duration)
}

func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state.Batch) (*prover.StatelessInputProver, error) {
func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state.Batch, witness []byte) (*prover.StatelessInputProver, error) {
isForcedBatch := false
batchRawData := &state.BatchRawV2{}
var err error
Expand Down Expand Up @@ -1446,7 +1444,6 @@ func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state.
l1InfoTreeLeaf := leaves[l2blockRaw.IndexL1InfoTree]

// Calculate smt proof
log.Infof("Calling tree.ComputeMerkleProof")
smtProof, calculatedL1InfoRoot, err := tree.ComputeMerkleProof(l2blockRaw.IndexL1InfoTree, aLeaves)
if err != nil {
log.Errorf("Error computing merkle proof: %v", err)
Expand Down Expand Up @@ -1495,15 +1492,8 @@ func (a *Aggregator) buildInputProver(ctx context.Context, batchToVerify *state.
}*/
}

// Get Witness
witness, err := getWitness(batchToVerify.BatchNumber, a.cfg.WitnessURL, a.cfg.UseFullWitness)
if err != nil {
log.Errorf("Failed to get witness, err: %v", err)
return nil, err
}

// Get Old Acc Input Hash
oldBatch, _, err := a.state.GetBatch(ctx, batchToVerify.BatchNumber-1, nil)
oldBatch, _, _, err := a.state.GetBatch(ctx, batchToVerify.BatchNumber-1, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1539,6 +1529,7 @@ func getWitness(batchNumber uint64, URL string, fullWitness bool) ([]byte, error
witnessType = "full"
}

start := time.Now()
response, err = rpc.JSONRPCCall(URL, "zkevm_getBatchWitness", batchNumber, witnessType)
if err != nil {
return nil, err
Expand All @@ -1549,6 +1540,8 @@ func getWitness(batchNumber uint64, URL string, fullWitness bool) ([]byte, error
return nil, fmt.Errorf("error from witness for batch %d: %v", batchNumber, response.Error)
}

log.Infof("Witness for batch %d received in %v", batchNumber, time.Since(start))

err = json.Unmarshal(response.Result, &witness)
if err != nil {
return nil, err
Expand Down
9 changes: 5 additions & 4 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/0xPolygon/cdk/aggregator/db"
"github.com/0xPolygon/cdk/config/types"
"github.com/0xPolygon/cdk/encoding"
"github.com/0xPolygon/cdk/log"
"github.com/0xPolygonHermez/zkevm-ethtx-manager/ethtxmanager"
syncronizerConfig "github.com/0xPolygonHermez/zkevm-synchronizer-l1/config"
Expand All @@ -24,9 +25,6 @@ const (

// L1 settlement backend
L1 SettlementBackend = "l1"

// TenToThePowerOf18 represents 1000000000000000000
TenToThePowerOf18 = 1000000000000000000
)

// TokenAmountWithDecimals is a wrapper type that parses token amount with decimals to big int
Expand All @@ -40,7 +38,7 @@ func (t *TokenAmountWithDecimals) UnmarshalText(data []byte) error {
if !ok {
return fmt.Errorf("failed to unmarshal string to float")
}
coin := new(big.Float).SetInt(big.NewInt(TenToThePowerOf18))
coin := new(big.Float).SetInt(big.NewInt(encoding.TenToThePowerOf18))
bigval := new(big.Float).Mul(amount, coin)
result := new(big.Int)
bigval.Int(result)
Expand Down Expand Up @@ -77,6 +75,9 @@ type Config struct {
// IntervalAfterWhichBatchConsolidateAnyway this is interval for the main sequencer, that will check if there is no transactions
IntervalAfterWhichBatchConsolidateAnyway types.Duration `mapstructure:"IntervalAfterWhichBatchConsolidateAnyway"`

// BatchProofSanityCheckEnabled is a flag to enable the sanity check of the batch proof
BatchProofSanityCheckEnabled bool `mapstructure:"BatchProofSanityCheckEnabled"`

// ChainID is the L2 ChainID provided by the Network Config
ChainID uint64

Expand Down
8 changes: 8 additions & 0 deletions aggregator/db/migrations/0002.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- +migrate Up
DELETE FROM aggregator.batch;
ALTER TABLE aggregator.batch
ADD COLUMN IF NOT EXISTS witness varchar NOT NULL;

-- +migrate Down
ALTER TABLE aggregator.batch
DROP COLUMN IF NOT EXISTS witness;
6 changes: 3 additions & 3 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type proverInterface interface {
BatchProof(input *prover.StatelessInputProver) (*string, error)
AggregatedProof(inputProof1, inputProof2 string) (*string, error)
FinalProof(inputProof string, aggregatorAddr string) (*string, error)
WaitRecursiveProof(ctx context.Context, proofID string) (string, error)
WaitRecursiveProof(ctx context.Context, proofID string) (string, common.Hash, error)
WaitFinalProof(ctx context.Context, proofID string) (*prover.FinalProof, error)
}

Expand Down Expand Up @@ -55,8 +55,8 @@ type stateInterface interface {
CleanupLockedProofs(ctx context.Context, duration string, dbTx pgx.Tx) (int64, error)
CheckProofExistsForBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (bool, error)
AddSequence(ctx context.Context, sequence state.Sequence, dbTx pgx.Tx) error
AddBatch(ctx context.Context, batch *state.Batch, datastream []byte, dbTx pgx.Tx) error
GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, []byte, error)
AddBatch(ctx context.Context, batch *state.Batch, datastream []byte, witness []byte, dbTx pgx.Tx) error
GetBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, []byte, []byte, error)
DeleteBatchesOlderThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
DeleteBatchesNewerThanBatchNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
}
Loading

0 comments on commit d9e1a17

Please sign in to comment.