Skip to content

Commit

Permalink
Merge pull request #31 from 0xPolygon/backfill-updates
Browse files Browse the repository at this point in the history
Backfill fixes
  • Loading branch information
christophercampbell authored Nov 27, 2023
2 parents fc46f8d + 9951f81 commit e69a225
Show file tree
Hide file tree
Showing 13 changed files with 329 additions and 104 deletions.
5 changes: 3 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/0xPolygon/cdk-data-availability/etherman"
"github.com/0xPolygon/cdk-data-availability/log"
"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/0xPolygon/cdk-data-availability/sequencer"
"github.com/0xPolygon/cdk-data-availability/services/datacom"
"github.com/0xPolygon/cdk-data-availability/services/sync"
"github.com/0xPolygon/cdk-data-availability/synchronizer"
Expand Down Expand Up @@ -92,7 +93,7 @@ func start(cliCtx *cli.Context) error {

var cancelFuncs []context.CancelFunc

sequencerTracker, err := synchronizer.NewSequencerTracker(c.L1, etherman)
sequencerTracker, err := sequencer.NewSequencerTracker(c.L1, etherman)
if err != nil {
log.Fatal(err)
}
Expand All @@ -111,7 +112,7 @@ func start(cliCtx *cli.Context) error {

cancelFuncs = append(cancelFuncs, detector.Stop)

batchSynchronizer, err := synchronizer.NewBatchSynchronizer(c.L1, selfAddr, storage, detector.Subscribe(), etherman)
batchSynchronizer, err := synchronizer.NewBatchSynchronizer(c.L1, selfAddr, storage, detector.Subscribe(), etherman, sequencerTracker)
if err != nil {
log.Fatal(err)
}
Expand Down
39 changes: 5 additions & 34 deletions etherman/etherman.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ package etherman

import (
"context"
"encoding/json"
"fmt"
"math/big"
"strings"

"github.com/0xPolygon/cdk-data-availability/config"
"github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkdatacommittee"
"github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkvalidium"
"github.com/0xPolygon/cdk-data-availability/log"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -74,6 +71,11 @@ func (e *Etherman) TrustedSequencer() (common.Address, error) {
return e.CDKValidium.TrustedSequencer(&bind.CallOpts{Pending: false})
}

// TrustedSequencerURL gets trusted sequencer's RPC url
func (e *Etherman) TrustedSequencerURL() (string, error) {
return e.CDKValidium.TrustedSequencerURL(&bind.CallOpts{Pending: false})
}

// DataCommitteeMember represents a member of the Data Committee
type DataCommitteeMember struct {
Addr common.Address
Expand Down Expand Up @@ -128,34 +130,3 @@ func (e *Etherman) GetCurrentDataCommitteeMembers() ([]DataCommitteeMember, erro
}
return members, nil
}

// ParseEvent unpacks the keys in a SequenceBatches event
func ParseEvent(event *cdkvalidium.CdkvalidiumSequenceBatches, txData []byte) (uint64, []common.Hash, error) {
a, err := abi.JSON(strings.NewReader(cdkvalidium.CdkvalidiumABI))
if err != nil {
return 0, nil, err
}
method, err := a.MethodById(txData[:4])
if err != nil {
return 0, nil, err
}
data, err := method.Inputs.Unpack(txData[4:])
if err != nil {
return 0, nil, err
}
var batches []cdkvalidium.CDKValidiumBatchData
bytes, err := json.Marshal(data[0])
if err != nil {
return 0, nil, err
}
err = json.Unmarshal(bytes, &batches)
if err != nil {
return 0, nil, err
}

var keys []common.Hash
for _, batch := range batches {
keys = append(keys, batch.TransactionsHash)
}
return event.Raw.BlockNumber, keys, nil
}
30 changes: 30 additions & 0 deletions rpc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,33 @@ func HexEncodeBig(bigint *big.Int) string {

return fmt.Sprintf("%#x", bigint)
}

// ArgBig helps to marshal big number values provided in the RPC requests
type ArgBig big.Int

// UnmarshalText unmarshals an instance of ArgBig into an array of bytes
func (a *ArgBig) UnmarshalText(input []byte) error {
buf, err := decodeToHex(input)
if err != nil {
return err
}

b := new(big.Int)
b.SetBytes(buf)
*a = ArgBig(*b)

return nil
}

// MarshalText marshals an array of bytes into an instance of ArgBig
func (a ArgBig) MarshalText() ([]byte, error) {
b := (*big.Int)(&a)

return []byte("0x" + b.Text(hexBase)), nil
}

// Hex returns a hexadecimal representation
func (b ArgBig) Hex() string {
bb, _ := b.MarshalText()
return string(bb)
}
33 changes: 33 additions & 0 deletions sequencer/call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sequencer

import (
"encoding/json"
"fmt"

"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/ethereum/go-ethereum/common"
)

// GetData returns batch data from the trusted sequencer
func GetData(url string, batchNum uint64) (*SeqBatch, error) {
response, err := rpc.JSONRPCCall(url, "zkevm_getBatchByNumber", batchNum, true)
if err != nil {
return nil, err
}
if response.Error != nil {
return nil, fmt.Errorf("%d - %s", response.Error.Code, response.Error.Message)
}
var result SeqBatch
err = json.Unmarshal(response.Result, &result)
if err != nil {
return nil, err
}
return &result, nil
}

// SeqBatch structure
type SeqBatch struct {
Number rpc.ArgUint64 `json:"number"`
AccInputHash common.Hash `json:"accInputHash"`
BatchL2Data rpc.ArgBytes `json:"batchL2Data"`
}
73 changes: 71 additions & 2 deletions synchronizer/sequencer.go → sequencer/tracker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package synchronizer
package sequencer

import (
"context"
Expand All @@ -21,23 +21,30 @@ type SequencerTracker struct {
timeout time.Duration
retry time.Duration
addr common.Address
url string
lock sync.Mutex
}

// NewSequencerTracker creates a new SequencerTracker
func NewSequencerTracker(cfg config.L1Config, ethClient *etherman.Etherman) (*SequencerTracker, error) {
log.Info("starting sequencer address tracker")
// current address of the sequencer
addr, err := ethClient.TrustedSequencer()
if err != nil {
return nil, err
}
log.Infof("current sequencer addr: %s", addr.Hex())
url, err := ethClient.TrustedSequencerURL()
if err != nil {
return nil, err
}
log.Infof("current sequencer url: %s", url)
w := &SequencerTracker{
client: ethClient,
stop: make(chan struct{}),
timeout: cfg.Timeout.Duration,
retry: cfg.RetryPeriod.Duration,
addr: addr,
url: url,
}
return w, nil
}
Expand All @@ -55,8 +62,26 @@ func (st *SequencerTracker) setAddr(addr common.Address) {
st.addr = addr
}

// GetUrl returns the last known URL of the Sequencer
func (st *SequencerTracker) GetUrl() string {
st.lock.Lock()
defer st.lock.Unlock()
return st.url
}

func (st *SequencerTracker) setUrl(url string) {
st.lock.Lock()
defer st.lock.Unlock()
st.url = url
}

// Start starts the SequencerTracker
func (st *SequencerTracker) Start() {
go st.trackAddrChanges()
go st.trackUrlChanges()
}

func (st *SequencerTracker) trackAddrChanges() {
events := make(chan *cdkvalidium.CdkvalidiumSetTrustedSequencer)
defer close(events)
for {
Expand Down Expand Up @@ -100,6 +125,50 @@ func (st *SequencerTracker) Start() {
}
}

func (st *SequencerTracker) trackUrlChanges() {
events := make(chan *cdkvalidium.CdkvalidiumSetTrustedSequencerURL)
defer close(events)
for {
var (
sub event.Subscription
err error
)

ctx, cancel := context.WithTimeout(context.Background(), st.timeout)
opts := &bind.WatchOpts{Context: ctx}
sub, err = st.client.CDKValidium.WatchSetTrustedSequencerURL(opts, events)

// if no subscription, retry until established
for err != nil {
<-time.After(st.retry)
sub, err = st.client.CDKValidium.WatchSetTrustedSequencerURL(opts, events)
if err != nil {
log.Errorf("error subscribing to trusted sequencer event, retrying: %v", err)
}
}

// wait on events, timeouts, and signals to stop
select {
case e := <-events:
log.Infof("new trusted sequencer url: %v", e.NewTrustedSequencerURL)
st.setUrl(e.NewTrustedSequencerURL)
case err := <-sub.Err():
log.Warnf("subscription error, resubscribing: %v", err)
case <-ctx.Done():
// Deadline exceeded is expected since we use finite context timeout
if ctx.Err() != nil && ctx.Err() != context.DeadlineExceeded {
log.Warnf("re-establishing subscription: %v", ctx.Err())
}
case <-st.stop:
if sub != nil {
sub.Unsubscribe()
}
cancel()
return
}
}
}

// Stop stops the SequencerTracker
func (st *SequencerTracker) Stop() {
close(st.stop)
Expand Down
1 change: 1 addition & 0 deletions sequencer/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package sequencer
6 changes: 3 additions & 3 deletions services/datacom/datacom.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"crypto/ecdsa"

"github.com/0xPolygon/cdk-data-availability/rpc"
"github.com/0xPolygon/cdk-data-availability/synchronizer"
"github.com/0xPolygon/cdk-data-availability/sequencer"
"github.com/0xPolygon/cdk-data-availability/types"
"github.com/jackc/pgx/v4"
)
Expand All @@ -18,12 +18,12 @@ type DataComEndpoints struct {
db DBInterface
txMan rpc.DBTxManager
privateKey *ecdsa.PrivateKey
sequencerTracker *synchronizer.SequencerTracker
sequencerTracker *sequencer.SequencerTracker
}

// NewDataComEndpoints returns DataComEndpoints
func NewDataComEndpoints(
db DBInterface, privateKey *ecdsa.PrivateKey, sequencerTracker *synchronizer.SequencerTracker,
db DBInterface, privateKey *ecdsa.PrivateKey, sequencerTracker *sequencer.SequencerTracker,
) *DataComEndpoints {
return &DataComEndpoints{
db: db,
Expand Down
Loading

0 comments on commit e69a225

Please sign in to comment.