From cda2aeccc6cee686ea596c3c42106247447ce0b5 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Fri, 14 Jul 2023 01:05:30 -0700 Subject: [PATCH] separate onblockproposed handlers --- prover/prover.go | 126 +++++++++++++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 47 deletions(-) diff --git a/prover/prover.go b/prover/prover.go index 4ef681071..9128ba523 100644 --- a/prover/prover.go +++ b/prover/prover.go @@ -65,7 +65,7 @@ type Prover struct { blockVerifiedSub event.Subscription proverSlashedCh chan *bindings.TaikoL1ProverPoolSlashed proverSlashedSub event.Subscription - proveNotify chan *big.Int + proveNotify chan struct{} // Proof related proofGenerationCh chan *proofProducer.ProofWithHeader @@ -142,7 +142,7 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) { p.blockProvenCh = make(chan *bindings.TaikoL1ClientBlockProven, chBufferSize) p.proofGenerationCh = make(chan *proofProducer.ProofWithHeader, chBufferSize) p.proverSlashedCh = make(chan *bindings.TaikoL1ProverPoolSlashed, chBufferSize) - p.proveNotify = make(chan *big.Int, 1) + p.proveNotify = make(chan struct{}, 1) if err := p.initL1Current(cfg.StartingBlockID); err != nil { return fmt.Errorf("initialize L1 current cursor error: %w", err) } @@ -235,7 +235,7 @@ func (p *Prover) eventLoop() { // if we are already proving. reqProving := func() { select { - case p.proveNotify <- nil: + case p.proveNotify <- struct{}{}: default: } } @@ -278,8 +278,8 @@ func (p *Prover) eventLoop() { } case proofWithHeader := <-p.proofGenerationCh: p.submitProofOp(p.ctx, proofWithHeader) - case startHeight := <-p.proveNotify: - if err := p.proveOp(startHeight); err != nil { + case <-p.proveNotify: + if err := p.proveOp(); err != nil { log.Error("Prove new blocks error", "error", err) } case <-p.blockProposedCh: @@ -310,19 +310,9 @@ func (p *Prover) Close() { // proveOp performs a proving operation, find current unproven blocks, then // request generating proofs for them. -func (p *Prover) proveOp(startHeight *big.Int) error { +func (p *Prover) proveOp() error { firstTry := true - var endHeight *big.Int = nil - - if startHeight == nil { - startHeight = new(big.Int).SetUint64(p.l1Current) - } else { - // if startHeight is passed in, we really mean to say, we want to target a blockProposedEvent in - // that specific block. - endHeight = new(big.Int).Add(startHeight, big.NewInt(1)) - } - for firstTry || p.reorgDetectedFlag { p.reorgDetectedFlag = false firstTry = false @@ -330,8 +320,7 @@ func (p *Prover) proveOp(startHeight *big.Int) error { iter, err := eventIterator.NewBlockProposedIterator(p.ctx, &eventIterator.BlockProposedIteratorConfig{ Client: p.rpc.L1, TaikoL1: p.rpc.TaikoL1, - StartHeight: startHeight, - EndHeight: endHeight, + StartHeight: new(big.Int).SetUint64(p.l1Current), OnBlockProposedEvent: p.onBlockProposed, }) if err != nil { @@ -683,21 +672,7 @@ func (p *Prover) onBlockProven(ctx context.Context, event *bindings.TaikoL1Clien } else { // generate oracle proof if oracle prover, proof is invalid if p.cfg.OracleProver { - // call proveNotify and pass in the L1 start height - notify := func() { - select { - case p.proveNotify <- new(big.Int).SetUint64(event.Raw.BlockNumber): - default: - log.Info("unable to request oracle proof, proveNotify channel busy", - "blockID", - event.BlockId.Uint64(), - "l1Height", - event.Raw.BlockNumber, - ) - } - } - - notify() + return p.requestProofForBlockId(event.BlockId, new(big.Int).SetUint64(event.Raw.BlockNumber)) } } @@ -863,18 +838,6 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId } if time.Now().Unix() > int64(block.ProposedAt)+int64(block.ProofWindow) { - notify := func() { - select { - case p.proveNotify <- new(big.Int).SetUint64(l1Height): - default: - log.Info("unable to request proof, proveNotify channel busy", - "blockID", - blockId, - "l1Height", - l1Height, - ) - } - } // we should remove this block from being watched regardless of whether the block // has a valid proof @@ -906,7 +869,9 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId l1Height, ) // we can generate the proof, no proof came in by proof window expiring - notify() + if err := p.requestProofForBlockId(new(big.Int).SetUint64(blockId), new(big.Int).SetUint64(l1Height)); err != nil { + return err + } } else { // we need to check the block hash vs the proof's blockHash to see // if the proof is valid or not @@ -930,7 +895,9 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId ) // we can generate the proof, the proof is incorrect since blockHash does not match // the correct one but parentHash/gasUsed are correct. - notify() + if err := p.requestProofForBlockId(new(big.Int).SetUint64(blockId), new(big.Int).SetUint64(l1Height)); err != nil { + return err + } } } } @@ -938,3 +905,68 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId // otherwise, keep it in the map and check again next iteration return nil } + +// proveOp performs a proving operation, find current unproven blocks, then +// request generating proofs for them. +func (p *Prover) requestProofForBlockId(blockId *big.Int, l1Height *big.Int) error { + onBlockProposed := func( + ctx context.Context, + event *bindings.TaikoL1ClientBlockProposed, + end eventIterator.EndBlockProposedEventIterFunc, + ) error { + // If there is newly generated proofs, we need to submit them as soon as possible. + if len(p.proofGenerationCh) > 0 { + end() + return nil + } + + // only filter for exact blockID we want + if event.BlockId.Cmp(blockId) != 0 { + return nil + } + + // Check whether the block has been verified. + isVerified, err := p.isBlockVerified(event.BlockId) + if err != nil { + return fmt.Errorf("failed to check if the current L2 block is verified: %w", err) + } + + if isVerified { + log.Info("📋 Block has been verified", "blockID", event.BlockId) + return nil + } + + ctx, cancelCtx := context.WithCancel(ctx) + p.currentBlocksBeingProvenMutex.Lock() + p.currentBlocksBeingProven[event.BlockId.Uint64()] = cancelFunc(func() { + defer cancelCtx() + if err := p.validProofSubmitter.CancelProof(ctx, event.BlockId); err != nil { + log.Error("failed to cancel proof", "error", err, "blockID", event.BlockId) + } + }) + p.currentBlocksBeingProvenMutex.Unlock() + + if err := p.validProofSubmitter.RequestProof(ctx, event); err != nil { + return err + } + + return nil + } + + iter, err := eventIterator.NewBlockProposedIterator(p.ctx, &eventIterator.BlockProposedIteratorConfig{ + Client: p.rpc.L1, + TaikoL1: p.rpc.TaikoL1, + StartHeight: l1Height, + EndHeight: new(big.Int).Add(l1Height, big.NewInt(1)), + OnBlockProposedEvent: onBlockProposed, + }) + if err != nil { + return err + } + + if err := iter.Iter(); err != nil { + return err + } + + return nil +}