Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
separate onblockproposed handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey committed Jul 14, 2023
1 parent 3f90039 commit cda2aec
Showing 1 changed file with 79 additions and 47 deletions.
126 changes: 79 additions & 47 deletions prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Prover struct {
blockVerifiedSub event.Subscription
proverSlashedCh chan *bindings.TaikoL1ProverPoolSlashed
proverSlashedSub event.Subscription
proveNotify chan *big.Int
proveNotify chan struct{}

// Proof related
proofGenerationCh chan *proofProducer.ProofWithHeader
Expand Down Expand Up @@ -142,7 +142,7 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) {
p.blockProvenCh = make(chan *bindings.TaikoL1ClientBlockProven, chBufferSize)
p.proofGenerationCh = make(chan *proofProducer.ProofWithHeader, chBufferSize)
p.proverSlashedCh = make(chan *bindings.TaikoL1ProverPoolSlashed, chBufferSize)
p.proveNotify = make(chan *big.Int, 1)
p.proveNotify = make(chan struct{}, 1)
if err := p.initL1Current(cfg.StartingBlockID); err != nil {
return fmt.Errorf("initialize L1 current cursor error: %w", err)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (p *Prover) eventLoop() {
// if we are already proving.
reqProving := func() {
select {
case p.proveNotify <- nil:
case p.proveNotify <- struct{}{}:
default:
}
}
Expand Down Expand Up @@ -278,8 +278,8 @@ func (p *Prover) eventLoop() {
}
case proofWithHeader := <-p.proofGenerationCh:
p.submitProofOp(p.ctx, proofWithHeader)
case startHeight := <-p.proveNotify:
if err := p.proveOp(startHeight); err != nil {
case <-p.proveNotify:
if err := p.proveOp(); err != nil {
log.Error("Prove new blocks error", "error", err)
}
case <-p.blockProposedCh:
Expand Down Expand Up @@ -310,28 +310,17 @@ func (p *Prover) Close() {

// proveOp performs a proving operation, find current unproven blocks, then
// request generating proofs for them.
func (p *Prover) proveOp(startHeight *big.Int) error {
func (p *Prover) proveOp() error {
firstTry := true

var endHeight *big.Int = nil

if startHeight == nil {
startHeight = new(big.Int).SetUint64(p.l1Current)
} else {
// if startHeight is passed in, we really mean to say, we want to target a blockProposedEvent in
// that specific block.
endHeight = new(big.Int).Add(startHeight, big.NewInt(1))
}

for firstTry || p.reorgDetectedFlag {
p.reorgDetectedFlag = false
firstTry = false

iter, err := eventIterator.NewBlockProposedIterator(p.ctx, &eventIterator.BlockProposedIteratorConfig{
Client: p.rpc.L1,
TaikoL1: p.rpc.TaikoL1,
StartHeight: startHeight,
EndHeight: endHeight,
StartHeight: new(big.Int).SetUint64(p.l1Current),
OnBlockProposedEvent: p.onBlockProposed,
})
if err != nil {
Expand Down Expand Up @@ -683,21 +672,7 @@ func (p *Prover) onBlockProven(ctx context.Context, event *bindings.TaikoL1Clien
} else {
// generate oracle proof if oracle prover, proof is invalid
if p.cfg.OracleProver {
// call proveNotify and pass in the L1 start height
notify := func() {
select {
case p.proveNotify <- new(big.Int).SetUint64(event.Raw.BlockNumber):
default:
log.Info("unable to request oracle proof, proveNotify channel busy",
"blockID",
event.BlockId.Uint64(),
"l1Height",
event.Raw.BlockNumber,
)
}
}

notify()
return p.requestProofForBlockId(event.BlockId, new(big.Int).SetUint64(event.Raw.BlockNumber))
}
}

Expand Down Expand Up @@ -863,18 +838,6 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId
}

if time.Now().Unix() > int64(block.ProposedAt)+int64(block.ProofWindow) {
notify := func() {
select {
case p.proveNotify <- new(big.Int).SetUint64(l1Height):
default:
log.Info("unable to request proof, proveNotify channel busy",
"blockID",
blockId,
"l1Height",
l1Height,
)
}
}

// we should remove this block from being watched regardless of whether the block
// has a valid proof
Expand Down Expand Up @@ -906,7 +869,9 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId
l1Height,
)
// we can generate the proof, no proof came in by proof window expiring
notify()
if err := p.requestProofForBlockId(new(big.Int).SetUint64(blockId), new(big.Int).SetUint64(l1Height)); err != nil {
return err
}
} else {
// we need to check the block hash vs the proof's blockHash to see
// if the proof is valid or not
Expand All @@ -930,11 +895,78 @@ func (p *Prover) checkProofWindowExpired(ctx context.Context, l1Height, blockId
)
// we can generate the proof, the proof is incorrect since blockHash does not match
// the correct one but parentHash/gasUsed are correct.
notify()
if err := p.requestProofForBlockId(new(big.Int).SetUint64(blockId), new(big.Int).SetUint64(l1Height)); err != nil {
return err
}
}
}
}

// otherwise, keep it in the map and check again next iteration
return nil
}

// proveOp performs a proving operation, find current unproven blocks, then
// request generating proofs for them.
func (p *Prover) requestProofForBlockId(blockId *big.Int, l1Height *big.Int) error {
onBlockProposed := func(
ctx context.Context,
event *bindings.TaikoL1ClientBlockProposed,
end eventIterator.EndBlockProposedEventIterFunc,
) error {
// If there is newly generated proofs, we need to submit them as soon as possible.
if len(p.proofGenerationCh) > 0 {
end()
return nil
}

// only filter for exact blockID we want
if event.BlockId.Cmp(blockId) != 0 {
return nil
}

// Check whether the block has been verified.
isVerified, err := p.isBlockVerified(event.BlockId)
if err != nil {
return fmt.Errorf("failed to check if the current L2 block is verified: %w", err)
}

if isVerified {
log.Info("📋 Block has been verified", "blockID", event.BlockId)
return nil
}

ctx, cancelCtx := context.WithCancel(ctx)
p.currentBlocksBeingProvenMutex.Lock()
p.currentBlocksBeingProven[event.BlockId.Uint64()] = cancelFunc(func() {
defer cancelCtx()
if err := p.validProofSubmitter.CancelProof(ctx, event.BlockId); err != nil {
log.Error("failed to cancel proof", "error", err, "blockID", event.BlockId)
}
})
p.currentBlocksBeingProvenMutex.Unlock()

if err := p.validProofSubmitter.RequestProof(ctx, event); err != nil {
return err
}

return nil
}

iter, err := eventIterator.NewBlockProposedIterator(p.ctx, &eventIterator.BlockProposedIteratorConfig{
Client: p.rpc.L1,
TaikoL1: p.rpc.TaikoL1,
StartHeight: l1Height,
EndHeight: new(big.Int).Add(l1Height, big.NewInt(1)),
OnBlockProposedEvent: onBlockProposed,
})
if err != nil {
return err
}

if err := iter.Iter(); err != nil {
return err
}

return nil
}

0 comments on commit cda2aec

Please sign in to comment.