Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

Commit

Permalink
feat(prover): move sub event logic into event function. (#513)
Browse files Browse the repository at this point in the history
Co-authored-by: David <[email protected]>
  • Loading branch information
mask-pp and davidtaikocha committed Jan 21, 2024
1 parent 5f9d843 commit d7aad5a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 48 deletions.
Empty file added internal/docker/nodes/.env
Empty file.
68 changes: 25 additions & 43 deletions prover/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/leveldb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/urfave/cli/v2"

Expand Down Expand Up @@ -70,17 +69,8 @@ type Prover struct {
proofSubmitters []proofSubmitter.Submitter
proofContester proofSubmitter.Contester

// Subscriptions
blockProposedCh chan *bindings.TaikoL1ClientBlockProposed
blockProposedSub event.Subscription
transitionProvedCh chan *bindings.TaikoL1ClientTransitionProved
transitionProvedSub event.Subscription
transitionContestedCh chan *bindings.TaikoL1ClientTransitionContested
transitionContestedSub event.Subscription
blockVerifiedCh chan *bindings.TaikoL1ClientBlockVerified
blockVerifiedSub event.Subscription
proofWindowExpiredCh chan *bindings.TaikoL1ClientBlockProposed
proveNotify chan struct{}
proofWindowExpiredCh chan *bindings.TaikoL1ClientBlockProposed
proveNotify chan struct{}

// Proof related
proofGenerationCh chan *proofProducer.ProofWithHeader
Expand Down Expand Up @@ -144,10 +134,6 @@ func InitFromConfig(ctx context.Context, p *Prover, cfg *Config) (err error) {
p.proverAddress = crypto.PubkeyToAddress(p.cfg.L1ProverPrivKey.PublicKey)

chBufferSize := p.protocolConfigs.BlockMaxProposals
p.blockProposedCh = make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
p.blockVerifiedCh = make(chan *bindings.TaikoL1ClientBlockVerified, chBufferSize)
p.transitionProvedCh = make(chan *bindings.TaikoL1ClientTransitionProved, chBufferSize)
p.transitionContestedCh = make(chan *bindings.TaikoL1ClientTransitionContested, chBufferSize)
p.proofGenerationCh = make(chan *proofProducer.ProofWithHeader, chBufferSize)
p.proofWindowExpiredCh = make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
p.proveNotify = make(chan struct{}, 1)
Expand Down Expand Up @@ -407,9 +393,6 @@ func (p *Prover) Start() error {
}
}

p.wg.Add(1)
p.initSubscription()

go func() {
if err := p.srv.Start(fmt.Sprintf(":%v", p.cfg.HTTPServerPort)); !errors.Is(err, http.ErrServerClosed) {
log.Crit("Failed to start http server", "error", err)
Expand All @@ -421,6 +404,7 @@ func (p *Prover) Start() error {
go p.heartbeatInterval(p.ctx)
}

p.wg.Add(1)
go p.eventLoop()

return nil
Expand All @@ -440,15 +424,31 @@ func (p *Prover) eventLoop() {
default:
}
}
// Call reqProving() right away to catch up with the latest state.
reqProving()

// If there is too many (TaikoData.Config.blockMaxProposals) pending blocks in TaikoL1 contract, there will be no new
// BlockProposed temporarily, so except the BlockProposed subscription, we need another trigger to start
// fetching the proposed blocks.
forceProvingTicker := time.NewTicker(15 * time.Second)
defer forceProvingTicker.Stop()

// Call reqProving() right away to catch up with the latest state.
reqProving()
chBufferSize := p.protocolConfigs.BlockMaxProposals
blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, chBufferSize)
blockVerifiedCh := make(chan *bindings.TaikoL1ClientBlockVerified, chBufferSize)
transitionProvedCh := make(chan *bindings.TaikoL1ClientTransitionProved, chBufferSize)
transitionContestedCh := make(chan *bindings.TaikoL1ClientTransitionContested, chBufferSize)
// Subscriptions
blockProposedSub := rpc.SubscribeBlockProposed(p.rpc.TaikoL1, blockProposedCh)
blockVerifiedSub := rpc.SubscribeBlockVerified(p.rpc.TaikoL1, blockVerifiedCh)
transitionProvedSub := rpc.SubscribeTransitionProved(p.rpc.TaikoL1, transitionProvedCh)
transitionContestedSub := rpc.SubscribeTransitionContested(p.rpc.TaikoL1, transitionContestedCh)
defer func() {
blockProposedSub.Unsubscribe()
blockVerifiedSub.Unsubscribe()
transitionProvedSub.Unsubscribe()
transitionContestedSub.Unsubscribe()
}()

for {
select {
Expand All @@ -460,23 +460,23 @@ func (p *Prover) eventLoop() {
if err := p.proveOp(); err != nil {
log.Error("Prove new blocks error", "error", err)
}
case e := <-p.blockVerifiedCh:
case e := <-blockVerifiedCh:
if err := p.onBlockVerified(p.ctx, e); err != nil {
log.Error("Handle BlockVerified event error", "error", err)
}
case e := <-p.transitionProvedCh:
case e := <-transitionProvedCh:
if err := p.onTransitionProved(p.ctx, e); err != nil {
log.Error("Handle TransitionProved event error", "error", err)
}
case e := <-p.transitionContestedCh:
case e := <-transitionContestedCh:
if err := p.onTransitionContested(p.ctx, e); err != nil {
log.Error("Handle TransitionContested event error", "error", err)
}
case e := <-p.proofWindowExpiredCh:
if err := p.onProvingWindowExpired(p.ctx, e); err != nil {
log.Error("Handle provingWindow expired event error", "error", err)
}
case <-p.blockProposedCh:
case <-blockProposedCh:
reqProving()
case <-forceProvingTicker.C:
reqProving()
Expand All @@ -486,8 +486,6 @@ func (p *Prover) eventLoop() {

// Close closes the prover instance.
func (p *Prover) Close(ctx context.Context) {
p.closeSubscription()

if p.guardianProverSender != nil {
if err := p.guardianProverSender.Close(); err != nil {
log.Error("failed to close database connection", "error", err)
Expand Down Expand Up @@ -1073,22 +1071,6 @@ func (p *Prover) isBlockVerified(id *big.Int) (bool, error) {
return id.Uint64() <= stateVars.B.LastVerifiedBlockId, nil
}

// initSubscription initializes all subscriptions in current prover instance.
func (p *Prover) initSubscription() {
p.blockProposedSub = rpc.SubscribeBlockProposed(p.rpc.TaikoL1, p.blockProposedCh)
p.blockVerifiedSub = rpc.SubscribeBlockVerified(p.rpc.TaikoL1, p.blockVerifiedCh)
p.transitionProvedSub = rpc.SubscribeTransitionProved(p.rpc.TaikoL1, p.transitionProvedCh)
p.transitionContestedSub = rpc.SubscribeTransitionContested(p.rpc.TaikoL1, p.transitionContestedCh)
}

// closeSubscription closes all subscriptions.
func (p *Prover) closeSubscription() {
p.blockVerifiedSub.Unsubscribe()
p.blockProposedSub.Unsubscribe()
p.transitionProvedSub.Unsubscribe()
p.transitionContestedSub.Unsubscribe()
}

// isValidProof checks if the given proof is a valid one, comparing to current L2 node canonical chain.
func (p *Prover) isValidProof(
ctx context.Context,
Expand Down
5 changes: 0 additions & 5 deletions prover/prover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,6 @@ func (s *ProverTestSuite) TestProveOp() {
s.Equal(header.ParentHash, common.BytesToHash(event.Tran.ParentHash[:]))
}

func (s *ProverTestSuite) TestStartSubscription() {
s.NotPanics(s.p.initSubscription)
s.NotPanics(s.p.closeSubscription)
}

func (s *ProverTestSuite) TestSetApprovalAmount() {
opts, err := bind.NewKeyedTransactorWithChainID(s.p.proverPrivateKey, s.p.rpc.L1ChainID)
s.Nil(err)
Expand Down

0 comments on commit d7aad5a

Please sign in to comment.