diff --git a/driver/state/state.go b/driver/state/state.go index 2f2e645f2..b8f0e3dde 100644 --- a/driver/state/state.go +++ b/driver/state/state.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "math/big" + "sync" "sync/atomic" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -20,69 +21,48 @@ import ( // State contains all states which will be used by driver. type State struct { - // Subscriptions, will automatically resubscribe on errors - l1HeadSub event.Subscription // L1 new heads - l2HeadSub event.Subscription // L2 new heads - l2TransitionProvedSub event.Subscription // TaikoL1.TransitionProved events - l2BlockVerifiedSub event.Subscription // TaikoL1.BlockVerified events - l2BlockProposedSub event.Subscription // TaikoL1.BlockProposed events - - l1HeadCh chan *types.Header - l2HeadCh chan *types.Header - blockProposedCh chan *bindings.TaikoL1ClientBlockProposed - transitionProvedCh chan *bindings.TaikoL1ClientTransitionProved - blockVerifiedCh chan *bindings.TaikoL1ClientBlockVerified - // Feeds l1HeadsFeed event.Feed // L1 new heads notification feed - l1Head *atomic.Value // Latest known L1 head - l2Head *atomic.Value // Current L2 execution engine's local chain head - l2HeadBlockID *atomic.Value // Latest known L2 block ID - l2VerifiedHead *atomic.Value // Latest known L2 verified head - l1Current *atomic.Value // Current L1 block sync cursor + l1Head *atomic.Value // Latest known L1 head + l2Head *atomic.Value // Current L2 execution engine's local chain head + l2HeadBlockID *atomic.Value // Latest known L2 block ID + l1Current *atomic.Value // Current L1 block sync cursor // Constants - GenesisL1Height *big.Int - BlockDeadendHash common.Hash + GenesisL1Height *big.Int // RPC clients rpc *rpc.Client + + stopCh chan struct{} + wg sync.WaitGroup } // New creates a new driver state instance. func New(ctx context.Context, rpc *rpc.Client) (*State, error) { s := &State{ - rpc: rpc, - l1Head: new(atomic.Value), - l2Head: new(atomic.Value), - l2HeadBlockID: new(atomic.Value), - l2VerifiedHead: new(atomic.Value), - l1Current: new(atomic.Value), - l1HeadCh: make(chan *types.Header, 10), - l2HeadCh: make(chan *types.Header, 10), - blockProposedCh: make(chan *bindings.TaikoL1ClientBlockProposed, 10), - transitionProvedCh: make(chan *bindings.TaikoL1ClientTransitionProved, 10), - blockVerifiedCh: make(chan *bindings.TaikoL1ClientBlockVerified, 10), - BlockDeadendHash: common.BigToHash(common.Big1), + rpc: rpc, + l1Head: new(atomic.Value), + l2Head: new(atomic.Value), + l2HeadBlockID: new(atomic.Value), + l1Current: new(atomic.Value), + stopCh: make(chan struct{}), } if err := s.init(ctx); err != nil { return nil, err } - s.startSubscriptions(ctx) + go s.eventLoop(ctx) return s, nil } // Close closes all inner subscriptions. func (s *State) Close() { - s.l1HeadSub.Unsubscribe() - s.l2HeadSub.Unsubscribe() - s.l2BlockVerifiedSub.Unsubscribe() - s.l2BlockProposedSub.Unsubscribe() - s.l2TransitionProvedSub.Unsubscribe() + close(s.stopCh) + s.wg.Wait() } // init fetches the latest status and initializes the state instance. @@ -123,47 +103,63 @@ func (s *State) init(ctx context.Context) error { return nil } -// startSubscriptions initializes all subscriptions in the given state instance. -func (s *State) startSubscriptions(ctx context.Context) { - s.l1HeadSub = rpc.SubscribeChainHead(s.rpc.L1, s.l1HeadCh) - s.l2HeadSub = rpc.SubscribeChainHead(s.rpc.L2, s.l2HeadCh) - s.l2BlockVerifiedSub = rpc.SubscribeBlockVerified(s.rpc.TaikoL1, s.blockVerifiedCh) - s.l2BlockProposedSub = rpc.SubscribeBlockProposed(s.rpc.TaikoL1, s.blockProposedCh) - s.l2TransitionProvedSub = rpc.SubscribeTransitionProved(s.rpc.TaikoL1, s.transitionProvedCh) - - go func() { - for { - select { - case <-ctx.Done(): - return - case e := <-s.blockProposedCh: - s.setHeadBlockID(e.BlockId) - case e := <-s.transitionProvedCh: - log.Info( - "✅ Transition proven", - "blockID", e.BlockId, - "parentHash", common.Hash(e.Tran.ParentHash), - "hash", common.Hash(e.Tran.BlockHash), - "stateRoot", common.Hash(e.Tran.StateRoot), - "prover", e.Prover, - ) - case e := <-s.blockVerifiedCh: - log.Info( - "📈 Block verified", - "blockID", e.BlockId, - "hash", common.Hash(e.BlockHash), - "stateRoot", common.Hash(e.StateRoot), - "assignedProver", e.AssignedProver, - "prover", e.Prover, - ) - case newHead := <-s.l1HeadCh: - s.setL1Head(newHead) - s.l1HeadsFeed.Send(newHead) - case newHead := <-s.l2HeadCh: - s.setL2Head(newHead) - } - } +// eventLoop initializes and starts all subscriptions and callbacks in the given state instance. +func (s *State) eventLoop(ctx context.Context) { + s.wg.Add(1) + defer s.wg.Done() + + l1HeadCh := make(chan *types.Header, 10) + l2HeadCh := make(chan *types.Header, 10) + blockProposedCh := make(chan *bindings.TaikoL1ClientBlockProposed, 10) + transitionProvedCh := make(chan *bindings.TaikoL1ClientTransitionProved, 10) + blockVerifiedCh := make(chan *bindings.TaikoL1ClientBlockVerified, 10) + + l1HeadSub := rpc.SubscribeChainHead(s.rpc.L1, l1HeadCh) + l2HeadSub := rpc.SubscribeChainHead(s.rpc.L2, l2HeadCh) + l2BlockVerifiedSub := rpc.SubscribeBlockVerified(s.rpc.TaikoL1, blockVerifiedCh) + l2BlockProposedSub := rpc.SubscribeBlockProposed(s.rpc.TaikoL1, blockProposedCh) + l2TransitionProvedSub := rpc.SubscribeTransitionProved(s.rpc.TaikoL1, transitionProvedCh) + defer func() { + l1HeadSub.Unsubscribe() + l2HeadSub.Unsubscribe() + l2BlockVerifiedSub.Unsubscribe() + l2BlockProposedSub.Unsubscribe() + l2TransitionProvedSub.Unsubscribe() }() + + for { + select { + case <-ctx.Done(): + return + case <-s.stopCh: + return + case e := <-blockProposedCh: + s.setHeadBlockID(e.BlockId) + case e := <-transitionProvedCh: + log.Info( + "✅ Transition proven", + "blockID", e.BlockId, + "parentHash", common.Hash(e.Tran.ParentHash), + "hash", common.Hash(e.Tran.BlockHash), + "stateRoot", common.Hash(e.Tran.StateRoot), + "prover", e.Prover, + ) + case e := <-blockVerifiedCh: + log.Info( + "📈 Block verified", + "blockID", e.BlockId, + "hash", common.Hash(e.BlockHash), + "stateRoot", common.Hash(e.StateRoot), + "assignedProver", e.AssignedProver, + "prover", e.Prover, + ) + case newHead := <-l1HeadCh: + s.setL1Head(newHead) + s.l1HeadsFeed.Send(newHead) + case newHead := <-l2HeadCh: + s.setL2Head(newHead) + } + } } // setL1Head sets the L1 head concurrent safely. diff --git a/pkg/rpc/subscription.go b/pkg/rpc/subscription.go index 25eafd0b2..a6dddbb6a 100644 --- a/pkg/rpc/subscription.go +++ b/pkg/rpc/subscription.go @@ -120,12 +120,10 @@ func SubscribeChainHead( // waitSubErr keeps waiting until the given subscription failed. func waitSubErr(ctx context.Context, sub event.Subscription) (event.Subscription, error) { - for { - select { - case err := <-sub.Err(): - return sub, err - case <-ctx.Done(): - return sub, nil - } + select { + case err := <-sub.Err(): + return sub, err + case <-ctx.Done(): + return sub, nil } } diff --git a/pkg/rpc/subscription_test.go b/pkg/rpc/subscription_test.go index 67e4131a1..7a980cd73 100644 --- a/pkg/rpc/subscription_test.go +++ b/pkg/rpc/subscription_test.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/stretchr/testify/require" - "github.com/taikoxyz/taiko-client/bindings" )