From 1f2652ca8c0725ef599ed023cf61926ed636569b Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 15 Nov 2023 09:58:28 -0500 Subject: [PATCH 01/13] add trusted seq url to tracker and resolve data with seq prior to members --- cmd/main.go | 3 +- etherman/etherman.go | 5 ++ sequencer/call.go | 26 +++++++ .../sequencer.go => sequencer/tracker.go | 71 ++++++++++++++++++- sequencer/types.go | 1 + services/datacom/datacom.go | 6 +- synchronizer/batches.go | 70 +++++++++++++++--- synchronizer/rpc.go | 28 -------- test/e2e/sequencer_tracker_test.go | 4 +- 9 files changed, 170 insertions(+), 44 deletions(-) create mode 100644 sequencer/call.go rename synchronizer/sequencer.go => sequencer/tracker.go (61%) create mode 100644 sequencer/types.go delete mode 100644 synchronizer/rpc.go diff --git a/cmd/main.go b/cmd/main.go index 04948f39..4b4b42ff 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,7 @@ import ( "github.com/0xPolygon/cdk-data-availability/etherman" "github.com/0xPolygon/cdk-data-availability/log" "github.com/0xPolygon/cdk-data-availability/rpc" + "github.com/0xPolygon/cdk-data-availability/sequencer" "github.com/0xPolygon/cdk-data-availability/services/datacom" "github.com/0xPolygon/cdk-data-availability/services/sync" "github.com/0xPolygon/cdk-data-availability/synchronizer" @@ -92,7 +93,7 @@ func start(cliCtx *cli.Context) error { var cancelFuncs []context.CancelFunc - sequencerTracker, err := synchronizer.NewSequencerTracker(c.L1, etherman) + sequencerTracker, err := sequencer.NewSequencerTracker(c.L1, etherman) if err != nil { log.Fatal(err) } diff --git a/etherman/etherman.go b/etherman/etherman.go index d3fdf257..ef484162 100644 --- a/etherman/etherman.go +++ b/etherman/etherman.go @@ -74,6 +74,11 @@ func (e *Etherman) TrustedSequencer() (common.Address, error) { return e.CDKValidium.TrustedSequencer(&bind.CallOpts{Pending: false}) } +// TrustedSequencerURL gets trusted sequencer's RPC url +func (e *Etherman) TrustedSequencerURL() (string, error) { + return e.CDKValidium.TrustedSequencerURL(&bind.CallOpts{Pending: false}) +} + // DataCommitteeMember represents a member of the Data Committee type DataCommitteeMember struct { Addr common.Address diff --git a/sequencer/call.go b/sequencer/call.go new file mode 100644 index 00000000..f1772fc4 --- /dev/null +++ b/sequencer/call.go @@ -0,0 +1,26 @@ +package sequencer + +import ( + "encoding/json" + "fmt" + + "github.com/0xPolygon/cdk-data-availability/rpc" + "github.com/0xPolygon/cdk-data-availability/types" +) + +// GetData returns batch data from the trusted sequencer +func GetData(url string, batchNum uint64) (*types.Batch, error) { + response, err := rpc.JSONRPCCall(url, "zkevm_getBatchByNumber", batchNum, true) + if err != nil { + return nil, err + } + if response.Error != nil { + return nil, fmt.Errorf("%d - %s", response.Error.Code, response.Error.Message) + } + var result types.Batch + err = json.Unmarshal(response.Result, &result) + if err != nil { + return nil, err + } + return &result, nil +} diff --git a/synchronizer/sequencer.go b/sequencer/tracker.go similarity index 61% rename from synchronizer/sequencer.go rename to sequencer/tracker.go index 4cf02e11..dcc1bd37 100644 --- a/synchronizer/sequencer.go +++ b/sequencer/tracker.go @@ -1,4 +1,4 @@ -package synchronizer +package sequencer import ( "context" @@ -21,23 +21,28 @@ type SequencerTracker struct { timeout time.Duration retry time.Duration addr common.Address + url string lock sync.Mutex } // NewSequencerTracker creates a new SequencerTracker func NewSequencerTracker(cfg config.L1Config, ethClient *etherman.Etherman) (*SequencerTracker, error) { log.Info("starting sequencer address tracker") - // current address of the sequencer addr, err := ethClient.TrustedSequencer() if err != nil { return nil, err } + url, err := ethClient.TrustedSequencerURL() + if err != nil { + return nil, err + } w := &SequencerTracker{ client: ethClient, stop: make(chan struct{}), timeout: cfg.Timeout.Duration, retry: cfg.RetryPeriod.Duration, addr: addr, + url: url, } return w, nil } @@ -55,8 +60,26 @@ func (st *SequencerTracker) setAddr(addr common.Address) { st.addr = addr } +// GetUrl returns the last known URL of the Sequencer +func (st *SequencerTracker) GetUrl() string { + st.lock.Lock() + defer st.lock.Unlock() + return st.url +} + +func (st *SequencerTracker) setUrl(url string) { + st.lock.Lock() + defer st.lock.Unlock() + st.url = url +} + // Start starts the SequencerTracker func (st *SequencerTracker) Start() { + go st.trackAddrChanges() + go st.trackUrlChanges() +} + +func (st *SequencerTracker) trackAddrChanges() { events := make(chan *cdkvalidium.CdkvalidiumSetTrustedSequencer) defer close(events) for { @@ -100,6 +123,50 @@ func (st *SequencerTracker) Start() { } } +func (st *SequencerTracker) trackUrlChanges() { + events := make(chan *cdkvalidium.CdkvalidiumSetTrustedSequencerURL) + defer close(events) + for { + var ( + sub event.Subscription + err error + ) + + ctx, cancel := context.WithTimeout(context.Background(), st.timeout) + opts := &bind.WatchOpts{Context: ctx} + sub, err = st.client.CDKValidium.WatchSetTrustedSequencerURL(opts, events) + + // if no subscription, retry until established + for err != nil { + <-time.After(st.retry) + sub, err = st.client.CDKValidium.WatchSetTrustedSequencerURL(opts, events) + if err != nil { + log.Errorf("error subscribing to trusted sequencer event, retrying: %v", err) + } + } + + // wait on events, timeouts, and signals to stop + select { + case e := <-events: + log.Infof("new trusted sequencer url: %v", e.NewTrustedSequencerURL) + st.setUrl(e.NewTrustedSequencerURL) + case err := <-sub.Err(): + log.Warnf("subscription error, resubscribing: %v", err) + case <-ctx.Done(): + // Deadline exceeded is expected since we use finite context timeout + if ctx.Err() != nil && ctx.Err() != context.DeadlineExceeded { + log.Warnf("re-establishing subscription: %v", ctx.Err()) + } + case <-st.stop: + if sub != nil { + sub.Unsubscribe() + } + cancel() + return + } + } +} + // Stop stops the SequencerTracker func (st *SequencerTracker) Stop() { close(st.stop) diff --git a/sequencer/types.go b/sequencer/types.go new file mode 100644 index 00000000..5f9da75d --- /dev/null +++ b/sequencer/types.go @@ -0,0 +1 @@ +package sequencer diff --git a/services/datacom/datacom.go b/services/datacom/datacom.go index 5b61a035..b41a28a4 100644 --- a/services/datacom/datacom.go +++ b/services/datacom/datacom.go @@ -5,7 +5,7 @@ import ( "crypto/ecdsa" "github.com/0xPolygon/cdk-data-availability/rpc" - "github.com/0xPolygon/cdk-data-availability/synchronizer" + "github.com/0xPolygon/cdk-data-availability/sequencer" "github.com/0xPolygon/cdk-data-availability/types" "github.com/jackc/pgx/v4" ) @@ -18,12 +18,12 @@ type DataComEndpoints struct { db DBInterface txMan rpc.DBTxManager privateKey *ecdsa.PrivateKey - sequencerTracker *synchronizer.SequencerTracker + sequencerTracker *sequencer.SequencerTracker } // NewDataComEndpoints returns DataComEndpoints func NewDataComEndpoints( - db DBInterface, privateKey *ecdsa.PrivateKey, sequencerTracker *synchronizer.SequencerTracker, + db DBInterface, privateKey *ecdsa.PrivateKey, sequencerTracker *sequencer.SequencerTracker, ) *DataComEndpoints { return &DataComEndpoints{ db: db, diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 8d1eb280..4d6f81ec 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -6,15 +6,18 @@ import ( "sync" "time" + "github.com/0xPolygon/cdk-data-availability/client" "github.com/0xPolygon/cdk-data-availability/config" "github.com/0xPolygon/cdk-data-availability/db" "github.com/0xPolygon/cdk-data-availability/etherman" "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkvalidium" "github.com/0xPolygon/cdk-data-availability/log" "github.com/0xPolygon/cdk-data-availability/rpc" + "github.com/0xPolygon/cdk-data-availability/sequencer" "github.com/0xPolygon/cdk-data-availability/types" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" ) const defaultBlockBatchSize = 32 @@ -24,6 +27,7 @@ type BatchSynchronizer struct { client *etherman.Etherman stop chan struct{} retry time.Duration + rpcTimeout time.Duration blockBatchSize uint self common.Address db *db.DB @@ -31,6 +35,7 @@ type BatchSynchronizer struct { lock sync.Mutex reorgs <-chan BlockReorg events chan *cdkvalidium.CdkvalidiumSequenceBatches + sequencer *sequencer.SequencerTracker } // NewBatchSynchronizer creates the BatchSynchronizer @@ -49,6 +54,7 @@ func NewBatchSynchronizer( client: ethClient, stop: make(chan struct{}), retry: cfg.RetryPeriod.Duration, + rpcTimeout: cfg.Timeout.Duration, blockBatchSize: cfg.BlockBatchSize, self: self, db: db, @@ -187,7 +193,7 @@ func (bs *BatchSynchronizer) consumeEvents() { } func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceBatches) error { - ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) + ctx, cancel := context.WithTimeout(context.Background(), bs.rpcTimeout) defer cancel() tx, _, err := bs.client.GetTx(ctx, event.Raw.TxHash) @@ -214,28 +220,36 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB var data []types.OffChainData for _, key := range missing { log.Infof("resolving missing key %v", key.Hex()) - var value types.OffChainData - value, err = bs.resolve(key) + var value *types.OffChainData + value, err = bs.resolve(event.NumBatch, key) if err != nil { return err } - data = append(data, value) + data = append(data, *value) } // Finally, store the data return store(bs.db, data) } -func (bs *BatchSynchronizer) resolve(key common.Hash) (types.OffChainData, error) { +func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.OffChainData, error) { log.Debugf("resolving missing data for key %v", key.Hex()) + + data := bs.trySequencer(batchNum, key) + if data != nil { + return data, nil + } + + // sequencer failed to produce data, try the other nodes if len(bs.committee) == 0 { // committee is resolved again once all members are evicted. They can be evicted // for not having data, or their config being malformed err := bs.resolveCommittee() if err != nil { - return types.OffChainData{}, err + return nil, err } } + // pull out the members, iterating will change the map on error members := make([]etherman.DataCommitteeMember, len(bs.committee)) for _, member := range bs.committee { @@ -249,13 +263,53 @@ func (bs *BatchSynchronizer) resolve(key common.Hash) (types.OffChainData, error continue // malformed committee, skip what is known to be wrong } log.Infof("trying DAC %s: %s", member.Addr.Hex(), member.URL) - value, err := resolveWithMember(key, member) + value, err := bs.resolveWithMember(key, member) if err != nil { log.Warnf("error resolving, continuing: %v", err) delete(bs.committee, member.Addr) continue // did not have data or errored out } + return value, nil } - return types.OffChainData{}, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for key %v", key) + return nil, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for key %v", key) +} + +// trySequencer returns L2Data from the trusted sequencer, but does not return errors, only logs warnings if not found. +func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *types.OffChainData { + data, err := sequencer.GetData(bs.sequencer.GetUrl(), batchNum) + if err != nil { + log.Warnf("failed to get data from sequencer: %v", err) + return nil + } + expectKey := crypto.Keccak256Hash(data.L2Data) + if key != expectKey { + log.Warnf("sequencer gave wrong data for key: %s", key.Hex()) + return nil + } + return &types.OffChainData{ + Key: key, + Value: data.L2Data, + } +} + +func (bs *BatchSynchronizer) resolveWithMember(key common.Hash, member etherman.DataCommitteeMember) (*types.OffChainData, error) { + cm := client.New(member.URL) + ctx, cancel := context.WithTimeout(context.Background(), bs.rpcTimeout) + defer cancel() + + log.Debugf("trying member %v at %v for key %v", member.Addr.Hex(), member.URL, key.Hex()) + + bytes, err := cm.GetOffChainData(ctx, key) + if err != nil { + return nil, err + } + expectKey := crypto.Keccak256Hash(bytes) + if key != expectKey { + return nil, err + } + return &types.OffChainData{ + Key: key, + Value: bytes, + }, nil } diff --git a/synchronizer/rpc.go b/synchronizer/rpc.go deleted file mode 100644 index 02aaf90f..00000000 --- a/synchronizer/rpc.go +++ /dev/null @@ -1,28 +0,0 @@ -package synchronizer - -import ( - "context" - "time" - - "github.com/0xPolygon/cdk-data-availability/client" - "github.com/0xPolygon/cdk-data-availability/etherman" - "github.com/0xPolygon/cdk-data-availability/log" - "github.com/0xPolygon/cdk-data-availability/types" - "github.com/ethereum/go-ethereum/common" -) - -const rpcTimeout = 3 * time.Second - -func resolveWithMember(key common.Hash, member etherman.DataCommitteeMember) (types.OffChainData, error) { - cm := client.New(member.URL) - ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout) - defer cancel() - - log.Debugf("trying member %v at %v for key %v", member.Addr.Hex(), member.URL, key.Hex()) - - bytes, err := cm.GetOffChainData(ctx, key) - return types.OffChainData{ - Key: key, - Value: bytes, - }, err -} diff --git a/test/e2e/sequencer_tracker_test.go b/test/e2e/sequencer_tracker_test.go index 9b62d8af..8a69a300 100644 --- a/test/e2e/sequencer_tracker_test.go +++ b/test/e2e/sequencer_tracker_test.go @@ -6,7 +6,7 @@ import ( "github.com/0xPolygon/cdk-data-availability/config" "github.com/0xPolygon/cdk-data-availability/etherman" - "github.com/0xPolygon/cdk-data-availability/synchronizer" + "github.com/0xPolygon/cdk-data-availability/sequencer" "github.com/0xPolygon/cdk-data-availability/test/operations" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" @@ -26,7 +26,7 @@ func TestSequencerAddrExists(t *testing.T) { etherman, err := etherman.New(cfg.L1) require.NoError(t, err) - tracker, err := synchronizer.NewSequencerTracker(cfg.L1, etherman) + tracker, err := sequencer.NewSequencerTracker(cfg.L1, etherman) require.NoError(t, err) addr := tracker.GetAddr() From c8531b4717b5cada91f7578e522e25f51044703c Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 15 Nov 2023 11:36:31 -0500 Subject: [PATCH 02/13] pass tracker to synchronizer --- cmd/main.go | 2 +- sequencer/tracker.go | 2 ++ synchronizer/batches.go | 3 +++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cmd/main.go b/cmd/main.go index 4b4b42ff..0ccb2b33 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -112,7 +112,7 @@ func start(cliCtx *cli.Context) error { cancelFuncs = append(cancelFuncs, detector.Stop) - batchSynchronizer, err := synchronizer.NewBatchSynchronizer(c.L1, selfAddr, storage, detector.Subscribe(), etherman) + batchSynchronizer, err := synchronizer.NewBatchSynchronizer(c.L1, selfAddr, storage, detector.Subscribe(), etherman, sequencerTracker) if err != nil { log.Fatal(err) } diff --git a/sequencer/tracker.go b/sequencer/tracker.go index dcc1bd37..3f6f39ac 100644 --- a/sequencer/tracker.go +++ b/sequencer/tracker.go @@ -32,10 +32,12 @@ func NewSequencerTracker(cfg config.L1Config, ethClient *etherman.Etherman) (*Se if err != nil { return nil, err } + log.Infof("current sequencer addr: %s", addr.Hex()) url, err := ethClient.TrustedSequencerURL() if err != nil { return nil, err } + log.Infof("current sequencer url: %s", url) w := &SequencerTracker{ client: ethClient, stop: make(chan struct{}), diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 4d6f81ec..79df1c9d 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -45,6 +45,7 @@ func NewBatchSynchronizer( db *db.DB, reorgs <-chan BlockReorg, ethClient *etherman.Etherman, + sequencer *sequencer.SequencerTracker, ) (*BatchSynchronizer, error) { if cfg.BlockBatchSize == 0 { log.Infof("block batch size is not set, setting to default %d", defaultBlockBatchSize) @@ -60,6 +61,7 @@ func NewBatchSynchronizer( db: db, reorgs: reorgs, events: make(chan *cdkvalidium.CdkvalidiumSequenceBatches), + sequencer: sequencer, } return synchronizer, synchronizer.resolveCommittee() } @@ -277,6 +279,7 @@ func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.O // trySequencer returns L2Data from the trusted sequencer, but does not return errors, only logs warnings if not found. func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *types.OffChainData { + log.Debugf("resolving batch %d, key %s, with sequencer at %s", batchNum, key.Hex(), bs.sequencer.GetUrl()) data, err := sequencer.GetData(bs.sequencer.GetUrl(), batchNum) if err != nil { log.Warnf("failed to get data from sequencer: %v", err) From adb6daa5cde41142f40961a9dbf6ebb42b78da8c Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 15 Nov 2023 13:17:46 -0500 Subject: [PATCH 03/13] set the trusted seq url in test --- test/e2e/datacommittee_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/test/e2e/datacommittee_test.go b/test/e2e/datacommittee_test.go index 502c8caf..16753275 100644 --- a/test/e2e/datacommittee_test.go +++ b/test/e2e/datacommittee_test.go @@ -71,6 +71,16 @@ func TestDataCommittee(t *testing.T) { require.NoError(t, err) clientL1, err := ethclient.Dial(operations.DefaultL1NetworkURL) require.NoError(t, err) + + // The default sequencer URL is incorrect, set it to match the docker container + validiumContract, err := cdkvalidium.NewCdkvalidium( + common.HexToAddress(operations.DefaultL1CDKValidiumSmartContract), + clientL1, + ) + require.NoError(t, err) + _, err = validiumContract.SetTrustedSequencerURL(authL1, "http://zkevm-node:8123") + require.NoError(t, err) + dacSC, err := cdkdatacommittee.NewCdkdatacommittee( common.HexToAddress(operations.DefaultL1DataCommitteeContract), clientL1, From 34a6cbac7f189f8d305da006cb284692b94d6dd8 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Wed, 15 Nov 2023 13:47:13 -0500 Subject: [PATCH 04/13] update tracker test with url change --- test/e2e/sequencer_tracker_test.go | 34 +++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/test/e2e/sequencer_tracker_test.go b/test/e2e/sequencer_tracker_test.go index 8a69a300..b04f7017 100644 --- a/test/e2e/sequencer_tracker_test.go +++ b/test/e2e/sequencer_tracker_test.go @@ -1,14 +1,18 @@ package e2e import ( + "fmt" + "math/rand" "testing" "time" "github.com/0xPolygon/cdk-data-availability/config" "github.com/0xPolygon/cdk-data-availability/etherman" + "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkvalidium" "github.com/0xPolygon/cdk-data-availability/sequencer" "github.com/0xPolygon/cdk-data-availability/test/operations" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" ) @@ -29,12 +33,32 @@ func TestSequencerAddrExists(t *testing.T) { tracker, err := sequencer.NewSequencerTracker(cfg.L1, etherman) require.NoError(t, err) + go tracker.Start() + defer tracker.Stop() + addr := tracker.GetAddr() require.Equal(t, common.HexToAddress("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), addr) - go tracker.Start() - select { - case <-time.After(1 * time.Second): - tracker.Stop() - } + url := tracker.GetUrl() + require.Equal(t, "http://cdk-validium-json-rpc:8123", url) // the default + + clientL1, err := ethclient.Dial(operations.DefaultL1NetworkURL) + require.NoError(t, err) + validiumContract, err := cdkvalidium.NewCdkvalidium( + common.HexToAddress(operations.DefaultL1CDKValidiumSmartContract), + clientL1, + ) + require.NoError(t, err) + + authL1, err := operations.GetAuth(operations.DefaultSequencerPrivateKey, operations.DefaultL1ChainID) + require.NoError(t, err) + + newUrl := fmt.Sprintf("http://something-else:%d", rand.Intn(10000)) + _, err = validiumContract.SetTrustedSequencerURL(authL1, newUrl) + require.NoError(t, err) + + // give the tracker a sec to get the event + <-time.After(2500 * time.Millisecond) + + require.Equal(t, newUrl, tracker.GetUrl()) } From 7d1cf965f73d7ca0717f7a633051c6efb436a4c9 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Thu, 16 Nov 2023 15:20:07 -0500 Subject: [PATCH 05/13] logging cleanup --- etherman/etherman.go | 2 +- synchronizer/batches.go | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/etherman/etherman.go b/etherman/etherman.go index ef484162..00281f20 100644 --- a/etherman/etherman.go +++ b/etherman/etherman.go @@ -162,5 +162,5 @@ func ParseEvent(event *cdkvalidium.CdkvalidiumSequenceBatches, txData []byte) (u for _, batch := range batches { keys = append(keys, batch.TransactionsHash) } - return event.Raw.BlockNumber, keys, nil + return event.NumBatch, keys, nil } diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 79df1c9d..ef06b2a2 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -3,6 +3,7 @@ package synchronizer import ( "context" "math/rand" + "strings" "sync" "time" @@ -203,25 +204,28 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB return err } txData := tx.Data() - _, keys, err := etherman.ParseEvent(event, txData) + bn, keys, err := etherman.ParseEvent(event, txData) if err != nil { return err } // collect keys that need to be resolved var missing []common.Hash + var missingHex []string for _, key := range keys { if !exists(bs.db, key) { // this could be a single query that takes the whole list and returns missing ones missing = append(missing, key) + missingHex = append(missingHex, key.Hex()) } } if len(missing) == 0 { return nil } + log.Debugf("missing: NumBatch: %d, Txs: %s", event.NumBatch, bn, strings.Join(missingHex, ",")) + var data []types.OffChainData for _, key := range missing { - log.Infof("resolving missing key %v", key.Hex()) var value *types.OffChainData value, err = bs.resolve(event.NumBatch, key) if err != nil { @@ -235,8 +239,6 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB } func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.OffChainData, error) { - log.Debugf("resolving missing data for key %v", key.Hex()) - data := bs.trySequencer(batchNum, key) if data != nil { return data, nil @@ -264,7 +266,6 @@ func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.O delete(bs.committee, member.Addr) continue // malformed committee, skip what is known to be wrong } - log.Infof("trying DAC %s: %s", member.Addr.Hex(), member.URL) value, err := bs.resolveWithMember(key, member) if err != nil { log.Warnf("error resolving, continuing: %v", err) From 045bed583d7467143ac82cd9ca233c20998ca92e Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Fri, 17 Nov 2023 08:45:42 -0500 Subject: [PATCH 06/13] complete the rpc struct for seq call --- rpc/types.go | 30 +++++++++++++++++ sequencer/call.go | 73 +++++++++++++++++++++++++++++++++++++++-- synchronizer/batches.go | 7 ++-- 3 files changed, 105 insertions(+), 5 deletions(-) diff --git a/rpc/types.go b/rpc/types.go index 5b4ee2dc..72429adb 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -191,3 +191,33 @@ func HexEncodeBig(bigint *big.Int) string { return fmt.Sprintf("%#x", bigint) } + +// ArgBig helps to marshal big number values provided in the RPC requests +type ArgBig big.Int + +// UnmarshalText unmarshals an instance of ArgBig into an array of bytes +func (a *ArgBig) UnmarshalText(input []byte) error { + buf, err := decodeToHex(input) + if err != nil { + return err + } + + b := new(big.Int) + b.SetBytes(buf) + *a = ArgBig(*b) + + return nil +} + +// MarshalText marshals an array of bytes into an instance of ArgBig +func (a ArgBig) MarshalText() ([]byte, error) { + b := (*big.Int)(&a) + + return []byte("0x" + b.Text(hexBase)), nil +} + +// Hex returns a hexadecimal representation +func (b ArgBig) Hex() string { + bb, _ := b.MarshalText() + return string(bb) +} diff --git a/sequencer/call.go b/sequencer/call.go index f1772fc4..5864318b 100644 --- a/sequencer/call.go +++ b/sequencer/call.go @@ -5,11 +5,12 @@ import ( "fmt" "github.com/0xPolygon/cdk-data-availability/rpc" - "github.com/0xPolygon/cdk-data-availability/types" + "github.com/ethereum/go-ethereum/common" + etypes "github.com/ethereum/go-ethereum/core/types" ) // GetData returns batch data from the trusted sequencer -func GetData(url string, batchNum uint64) (*types.Batch, error) { +func GetData(url string, batchNum uint64) (*SequenceBatch, error) { response, err := rpc.JSONRPCCall(url, "zkevm_getBatchByNumber", batchNum, true) if err != nil { return nil, err @@ -17,10 +18,76 @@ func GetData(url string, batchNum uint64) (*types.Batch, error) { if response.Error != nil { return nil, fmt.Errorf("%d - %s", response.Error.Code, response.Error.Message) } - var result types.Batch + var result SequenceBatch err = json.Unmarshal(response.Result, &result) if err != nil { return nil, err } return &result, nil } + +// SequenceBatch is the batch by number structure returned by trusted sequencer +type SequenceBatch struct { + Number rpc.ArgUint64 `json:"number"` + ForcedBatchNumber *rpc.ArgUint64 `json:"forcedBatchNumber,omitempty"` + Coinbase common.Address `json:"coinbase"` + StateRoot common.Hash `json:"stateRoot"` + GlobalExitRoot common.Hash `json:"globalExitRoot"` + MainnetExitRoot common.Hash `json:"mainnetExitRoot"` + RollupExitRoot common.Hash `json:"rollupExitRoot"` + LocalExitRoot common.Hash `json:"localExitRoot"` + AccInputHash common.Hash `json:"accInputHash"` + Timestamp rpc.ArgUint64 `json:"timestamp"` + SendSequencesTxHash *common.Hash `json:"sendSequencesTxHash"` + VerifyBatchTxHash *common.Hash `json:"verifyBatchTxHash"` + Closed bool `json:"closed"` + //Blocks []BlockOrHash `json:"blocks"` + Transactions []TransactionOrHash `json:"transactions"` + BatchL2Data rpc.ArgBytes `json:"batchL2Data"` +} + +// TransactionOrHash for union type of transaction and types.Hash +type TransactionOrHash struct { + Hash *common.Hash + Tx *Transaction +} + +// Transaction structure +type Transaction struct { + Nonce rpc.ArgUint64 `json:"nonce"` + GasPrice rpc.ArgBig `json:"gasPrice"` + Gas rpc.ArgUint64 `json:"gas"` + To *common.Address `json:"to"` + Value rpc.ArgBig `json:"value"` + Input rpc.ArgBytes `json:"input"` + V rpc.ArgBig `json:"v"` + R rpc.ArgBig `json:"r"` + S rpc.ArgBig `json:"s"` + Hash common.Hash `json:"hash"` + From common.Address `json:"from"` + BlockHash *common.Hash `json:"blockHash"` + BlockNumber *rpc.ArgUint64 `json:"blockNumber"` + TxIndex *rpc.ArgUint64 `json:"transactionIndex"` + ChainID rpc.ArgBig `json:"chainId"` + Type rpc.ArgUint64 `json:"type"` + Receipt *Receipt `json:"receipt,omitempty"` +} + +// Receipt structure +type Receipt struct { + Root common.Hash `json:"root"` + CumulativeGasUsed rpc.ArgUint64 `json:"cumulativeGasUsed"` + LogsBloom etypes.Bloom `json:"logsBloom"` + Logs []*etypes.Log `json:"logs"` + Status rpc.ArgUint64 `json:"status"` + TxHash common.Hash `json:"transactionHash"` + TxIndex rpc.ArgUint64 `json:"transactionIndex"` + BlockHash common.Hash `json:"blockHash"` + BlockNumber rpc.ArgUint64 `json:"blockNumber"` + GasUsed rpc.ArgUint64 `json:"gasUsed"` + FromAddr common.Address `json:"from"` + ToAddr *common.Address `json:"to"` + ContractAddress *common.Address `json:"contractAddress"` + Type rpc.ArgUint64 `json:"type"` + EffectiveGasPrice *rpc.ArgBig `json:"effectiveGasPrice,omitempty"` +} diff --git a/synchronizer/batches.go b/synchronizer/batches.go index ef06b2a2..eb24c6c8 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -286,14 +286,17 @@ func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *typ log.Warnf("failed to get data from sequencer: %v", err) return nil } - expectKey := crypto.Keccak256Hash(data.L2Data) + + log.Infof(">>>> seq batch has %d txs", len(data.Transactions)) + + expectKey := crypto.Keccak256Hash(data.BatchL2Data) if key != expectKey { log.Warnf("sequencer gave wrong data for key: %s", key.Hex()) return nil } return &types.OffChainData{ Key: key, - Value: data.L2Data, + Value: data.BatchL2Data, } } From 454fb06a969907b66b2f7049ee6cebd044208f8e Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Fri, 17 Nov 2023 09:43:53 -0500 Subject: [PATCH 07/13] revert batch call struct --- sequencer/call.go | 73 ++--------------------------------------- synchronizer/batches.go | 4 +-- types/batch.go | 16 ++++++--- 3 files changed, 16 insertions(+), 77 deletions(-) diff --git a/sequencer/call.go b/sequencer/call.go index 5864318b..f1772fc4 100644 --- a/sequencer/call.go +++ b/sequencer/call.go @@ -5,12 +5,11 @@ import ( "fmt" "github.com/0xPolygon/cdk-data-availability/rpc" - "github.com/ethereum/go-ethereum/common" - etypes "github.com/ethereum/go-ethereum/core/types" + "github.com/0xPolygon/cdk-data-availability/types" ) // GetData returns batch data from the trusted sequencer -func GetData(url string, batchNum uint64) (*SequenceBatch, error) { +func GetData(url string, batchNum uint64) (*types.Batch, error) { response, err := rpc.JSONRPCCall(url, "zkevm_getBatchByNumber", batchNum, true) if err != nil { return nil, err @@ -18,76 +17,10 @@ func GetData(url string, batchNum uint64) (*SequenceBatch, error) { if response.Error != nil { return nil, fmt.Errorf("%d - %s", response.Error.Code, response.Error.Message) } - var result SequenceBatch + var result types.Batch err = json.Unmarshal(response.Result, &result) if err != nil { return nil, err } return &result, nil } - -// SequenceBatch is the batch by number structure returned by trusted sequencer -type SequenceBatch struct { - Number rpc.ArgUint64 `json:"number"` - ForcedBatchNumber *rpc.ArgUint64 `json:"forcedBatchNumber,omitempty"` - Coinbase common.Address `json:"coinbase"` - StateRoot common.Hash `json:"stateRoot"` - GlobalExitRoot common.Hash `json:"globalExitRoot"` - MainnetExitRoot common.Hash `json:"mainnetExitRoot"` - RollupExitRoot common.Hash `json:"rollupExitRoot"` - LocalExitRoot common.Hash `json:"localExitRoot"` - AccInputHash common.Hash `json:"accInputHash"` - Timestamp rpc.ArgUint64 `json:"timestamp"` - SendSequencesTxHash *common.Hash `json:"sendSequencesTxHash"` - VerifyBatchTxHash *common.Hash `json:"verifyBatchTxHash"` - Closed bool `json:"closed"` - //Blocks []BlockOrHash `json:"blocks"` - Transactions []TransactionOrHash `json:"transactions"` - BatchL2Data rpc.ArgBytes `json:"batchL2Data"` -} - -// TransactionOrHash for union type of transaction and types.Hash -type TransactionOrHash struct { - Hash *common.Hash - Tx *Transaction -} - -// Transaction structure -type Transaction struct { - Nonce rpc.ArgUint64 `json:"nonce"` - GasPrice rpc.ArgBig `json:"gasPrice"` - Gas rpc.ArgUint64 `json:"gas"` - To *common.Address `json:"to"` - Value rpc.ArgBig `json:"value"` - Input rpc.ArgBytes `json:"input"` - V rpc.ArgBig `json:"v"` - R rpc.ArgBig `json:"r"` - S rpc.ArgBig `json:"s"` - Hash common.Hash `json:"hash"` - From common.Address `json:"from"` - BlockHash *common.Hash `json:"blockHash"` - BlockNumber *rpc.ArgUint64 `json:"blockNumber"` - TxIndex *rpc.ArgUint64 `json:"transactionIndex"` - ChainID rpc.ArgBig `json:"chainId"` - Type rpc.ArgUint64 `json:"type"` - Receipt *Receipt `json:"receipt,omitempty"` -} - -// Receipt structure -type Receipt struct { - Root common.Hash `json:"root"` - CumulativeGasUsed rpc.ArgUint64 `json:"cumulativeGasUsed"` - LogsBloom etypes.Bloom `json:"logsBloom"` - Logs []*etypes.Log `json:"logs"` - Status rpc.ArgUint64 `json:"status"` - TxHash common.Hash `json:"transactionHash"` - TxIndex rpc.ArgUint64 `json:"transactionIndex"` - BlockHash common.Hash `json:"blockHash"` - BlockNumber rpc.ArgUint64 `json:"blockNumber"` - GasUsed rpc.ArgUint64 `json:"gasUsed"` - FromAddr common.Address `json:"from"` - ToAddr *common.Address `json:"to"` - ContractAddress *common.Address `json:"contractAddress"` - Type rpc.ArgUint64 `json:"type"` - EffectiveGasPrice *rpc.ArgBig `json:"effectiveGasPrice,omitempty"` -} diff --git a/synchronizer/batches.go b/synchronizer/batches.go index eb24c6c8..609f2940 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -289,14 +289,14 @@ func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *typ log.Infof(">>>> seq batch has %d txs", len(data.Transactions)) - expectKey := crypto.Keccak256Hash(data.BatchL2Data) + expectKey := crypto.Keccak256Hash(data.L2Data) if key != expectKey { log.Warnf("sequencer gave wrong data for key: %s", key.Hex()) return nil } return &types.OffChainData{ Key: key, - Value: data.BatchL2Data, + Value: data.L2Data, } } diff --git a/types/batch.go b/types/batch.go index 317ff516..fddef5f0 100644 --- a/types/batch.go +++ b/types/batch.go @@ -10,11 +10,17 @@ import ( // Batch represents a batch used for synchronization type Batch struct { - Number rpc.ArgUint64 `json:"number"` - GlobalExitRoot common.Hash `json:"globalExitRoot"` - Timestamp rpc.ArgUint64 `json:"timestamp"` - Coinbase common.Address `json:"coinbase"` - L2Data rpc.ArgBytes `json:"batchL2Data"` + Number rpc.ArgUint64 `json:"number"` + GlobalExitRoot common.Hash `json:"globalExitRoot"` + Timestamp rpc.ArgUint64 `json:"timestamp"` + Coinbase common.Address `json:"coinbase"` + L2Data rpc.ArgBytes `json:"batchL2Data"` + Transactions []TransactionOrHash `json:"transactions"` +} + +// TransactionOrHash for union type of transaction and types.Hash +type TransactionOrHash struct { + Hash *common.Hash } // HashToSign returns a hash that uniquely identifies the batch From f04b47f9e4e550a2ae2db855b2d9b02330be30fc Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Fri, 17 Nov 2023 10:25:38 -0500 Subject: [PATCH 08/13] minor reorg --- etherman/etherman.go | 34 ---------------------------- sequencer/call.go | 16 ++++++++++--- synchronizer/batches.go | 10 ++++----- synchronizer/util.go | 41 ++++++++++++++++++++++++++++++++++ test/e2e/datacommittee_test.go | 4 ++-- 5 files changed, 60 insertions(+), 45 deletions(-) create mode 100644 synchronizer/util.go diff --git a/etherman/etherman.go b/etherman/etherman.go index 00281f20..917a669f 100644 --- a/etherman/etherman.go +++ b/etherman/etherman.go @@ -2,17 +2,14 @@ package etherman import ( "context" - "encoding/json" "fmt" "math/big" - "strings" "github.com/0xPolygon/cdk-data-availability/config" "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkdatacommittee" "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkvalidium" "github.com/0xPolygon/cdk-data-availability/log" "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -133,34 +130,3 @@ func (e *Etherman) GetCurrentDataCommitteeMembers() ([]DataCommitteeMember, erro } return members, nil } - -// ParseEvent unpacks the keys in a SequenceBatches event -func ParseEvent(event *cdkvalidium.CdkvalidiumSequenceBatches, txData []byte) (uint64, []common.Hash, error) { - a, err := abi.JSON(strings.NewReader(cdkvalidium.CdkvalidiumABI)) - if err != nil { - return 0, nil, err - } - method, err := a.MethodById(txData[:4]) - if err != nil { - return 0, nil, err - } - data, err := method.Inputs.Unpack(txData[4:]) - if err != nil { - return 0, nil, err - } - var batches []cdkvalidium.CDKValidiumBatchData - bytes, err := json.Marshal(data[0]) - if err != nil { - return 0, nil, err - } - err = json.Unmarshal(bytes, &batches) - if err != nil { - return 0, nil, err - } - - var keys []common.Hash - for _, batch := range batches { - keys = append(keys, batch.TransactionsHash) - } - return event.NumBatch, keys, nil -} diff --git a/sequencer/call.go b/sequencer/call.go index f1772fc4..5a266204 100644 --- a/sequencer/call.go +++ b/sequencer/call.go @@ -5,11 +5,11 @@ import ( "fmt" "github.com/0xPolygon/cdk-data-availability/rpc" - "github.com/0xPolygon/cdk-data-availability/types" + "github.com/ethereum/go-ethereum/common" ) // GetData returns batch data from the trusted sequencer -func GetData(url string, batchNum uint64) (*types.Batch, error) { +func GetData(url string, batchNum uint64) (*SeqBatch, error) { response, err := rpc.JSONRPCCall(url, "zkevm_getBatchByNumber", batchNum, true) if err != nil { return nil, err @@ -17,10 +17,20 @@ func GetData(url string, batchNum uint64) (*types.Batch, error) { if response.Error != nil { return nil, fmt.Errorf("%d - %s", response.Error.Code, response.Error.Message) } - var result types.Batch + var result SeqBatch err = json.Unmarshal(response.Result, &result) if err != nil { return nil, err } return &result, nil } + +// SeqBatch structure +type SeqBatch struct { + Number rpc.ArgUint64 `json:"number"` + AccInputHash common.Hash `json:"accInputHash"` + SendSequencesTxHash *common.Hash `json:"sendSequencesTxHash"` + VerifyBatchTxHash *common.Hash `json:"verifyBatchTxHash"` + Closed bool `json:"closed"` + BatchL2Data rpc.ArgBytes `json:"batchL2Data"` +} diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 609f2940..80d37c2a 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -204,7 +204,7 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB return err } txData := tx.Data() - bn, keys, err := etherman.ParseEvent(event, txData) + keys, err := UnpackTxData(txData) if err != nil { return err } @@ -222,7 +222,7 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB return nil } - log.Debugf("missing: NumBatch: %d, Txs: %s", event.NumBatch, bn, strings.Join(missingHex, ",")) + log.Debugf("missing: NumBatch: %d, Txs: %s", event.NumBatch, strings.Join(missingHex, ",")) var data []types.OffChainData for _, key := range missing { @@ -287,16 +287,14 @@ func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *typ return nil } - log.Infof(">>>> seq batch has %d txs", len(data.Transactions)) - - expectKey := crypto.Keccak256Hash(data.L2Data) + expectKey := crypto.Keccak256Hash(data.BatchL2Data) if key != expectKey { log.Warnf("sequencer gave wrong data for key: %s", key.Hex()) return nil } return &types.OffChainData{ Key: key, - Value: data.L2Data, + Value: data.BatchL2Data, } } diff --git a/synchronizer/util.go b/synchronizer/util.go new file mode 100644 index 00000000..e7bc9ec1 --- /dev/null +++ b/synchronizer/util.go @@ -0,0 +1,41 @@ +package synchronizer + +import ( + "encoding/json" + "strings" + + "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkvalidium" + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" +) + +// UnpackTxData unpacks the keys in a SequenceBatches event +func UnpackTxData(txData []byte) ([]common.Hash, error) { + a, err := abi.JSON(strings.NewReader(cdkvalidium.CdkvalidiumABI)) + if err != nil { + return nil, err + } + method, err := a.MethodById(txData[:4]) + if err != nil { + return nil, err + } + data, err := method.Inputs.Unpack(txData[4:]) + if err != nil { + return nil, err + } + var batches []cdkvalidium.CDKValidiumBatchData + bytes, err := json.Marshal(data[0]) + if err != nil { + return nil, err + } + err = json.Unmarshal(bytes, &batches) + if err != nil { + return nil, err + } + + var keys []common.Hash + for _, batch := range batches { + keys = append(keys, batch.TransactionsHash) + } + return keys, nil +} diff --git a/test/e2e/datacommittee_test.go b/test/e2e/datacommittee_test.go index 16753275..ddc9d385 100644 --- a/test/e2e/datacommittee_test.go +++ b/test/e2e/datacommittee_test.go @@ -17,11 +17,11 @@ import ( "github.com/0xPolygon/cdk-data-availability/config" cTypes "github.com/0xPolygon/cdk-data-availability/config/types" "github.com/0xPolygon/cdk-data-availability/db" - "github.com/0xPolygon/cdk-data-availability/etherman" "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkdatacommittee" "github.com/0xPolygon/cdk-data-availability/etherman/smartcontracts/cdkvalidium" "github.com/0xPolygon/cdk-data-availability/log" "github.com/0xPolygon/cdk-data-availability/rpc" + "github.com/0xPolygon/cdk-data-availability/synchronizer" "github.com/0xPolygon/cdk-data-availability/test/operations" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -216,7 +216,7 @@ func getSequenceBatchesKeys(clientL1 *ethclient.Client, event *cdkvalidium.Cdkva return nil, err } txData := tx.Data() - _, keys, err := etherman.ParseEvent(event, txData) + keys, err := synchronizer.UnpackTxData(txData) return keys, err } From 7c3d4f9d2701e67a805a2c5085759f5d2f1ac222 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Mon, 20 Nov 2023 10:55:36 -0500 Subject: [PATCH 09/13] speculative fix for missing batch num --- sequencer/call.go | 9 +++---- synchronizer/batches.go | 60 ++++++++++++++++++++++++++++++++--------- 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/sequencer/call.go b/sequencer/call.go index 5a266204..9416a0c3 100644 --- a/sequencer/call.go +++ b/sequencer/call.go @@ -27,10 +27,7 @@ func GetData(url string, batchNum uint64) (*SeqBatch, error) { // SeqBatch structure type SeqBatch struct { - Number rpc.ArgUint64 `json:"number"` - AccInputHash common.Hash `json:"accInputHash"` - SendSequencesTxHash *common.Hash `json:"sendSequencesTxHash"` - VerifyBatchTxHash *common.Hash `json:"verifyBatchTxHash"` - Closed bool `json:"closed"` - BatchL2Data rpc.ArgBytes `json:"batchL2Data"` + Number rpc.ArgUint64 `json:"number"` + AccInputHash common.Hash `json:"accInputHash"` + BatchL2Data rpc.ArgBytes `json:"batchL2Data"` } diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 80d37c2a..c0acdab4 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -3,7 +3,6 @@ package synchronizer import ( "context" "math/rand" - "strings" "sync" "time" @@ -195,6 +194,11 @@ func (bs *BatchSynchronizer) consumeEvents() { } } +type batchKey struct { + batch uint64 + hash common.Hash +} + func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceBatches) error { ctx, cancel := context.WithTimeout(context.Background(), bs.rpcTimeout) defer cancel() @@ -209,25 +213,34 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB return err } + var batchKeys []batchKey + for i := event.NumBatch - uint64(len(keys)); i < event.NumBatch; i++ { + bk := batchKey{ + batch: i + 1, + hash: keys[i], + } + batchKeys = append(batchKeys, bk) + } + // collect keys that need to be resolved - var missing []common.Hash - var missingHex []string - for _, key := range keys { - if !exists(bs.db, key) { // this could be a single query that takes the whole list and returns missing ones + var missing []batchKey + for _, key := range batchKeys { + if !exists(bs.db, key.hash) { missing = append(missing, key) - missingHex = append(missingHex, key.Hex()) } } if len(missing) == 0 { return nil } - log.Debugf("missing: NumBatch: %d, Txs: %s", event.NumBatch, strings.Join(missingHex, ",")) + for _, key := range missing { + log.Debugf("missing event batchNum: %d, calc batch: %d, hash: %s", event.NumBatch, key.batch, key.hash.Hex()) + } var data []types.OffChainData for _, key := range missing { var value *types.OffChainData - value, err = bs.resolve(event.NumBatch, key) + value, err = bs.resolve(key.batch, key.hash) if err != nil { return err } @@ -278,23 +291,46 @@ func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.O return nil, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for key %v", key) } +func testBatch1(url string, key common.Hash) { + log.Infof("trying batch 1 for testings") + test, err := sequencer.GetData(url, 1) + if err != nil { + log.Errorf("testing error: %v", err) + } + if test != nil { + expectKey := crypto.Keccak256Hash(test.BatchL2Data) + if key != expectKey { + log.Warnf("testing batch %d: sequencer gave wrong data for key: %s", 1, key.Hex()) + } else { + log.Infof("testing batch 1: key that was in batch 2 event found in batch 1: %s", key.Hex()) + } + } else { + log.Warnf("testing did not retrieve batch 1") + } +} + // trySequencer returns L2Data from the trusted sequencer, but does not return errors, only logs warnings if not found. func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *types.OffChainData { log.Debugf("resolving batch %d, key %s, with sequencer at %s", batchNum, key.Hex(), bs.sequencer.GetUrl()) - data, err := sequencer.GetData(bs.sequencer.GetUrl(), batchNum) + seqBatch, err := sequencer.GetData(bs.sequencer.GetUrl(), batchNum) if err != nil { log.Warnf("failed to get data from sequencer: %v", err) return nil } - expectKey := crypto.Keccak256Hash(data.BatchL2Data) + if batchNum == 2 && key.Hex() == "0x5931a88630bc594b49cc3ecb6782714cdba77e1f2033343a79411f7baa818eb5" { + testBatch1(bs.sequencer.GetUrl(), key) + } + + expectKey := crypto.Keccak256Hash(seqBatch.BatchL2Data) if key != expectKey { - log.Warnf("sequencer gave wrong data for key: %s", key.Hex()) + log.Warnf("batch %d: sequencer gave wrong data for key: %s", batchNum, key.Hex()) return nil } + log.Infof("batch %d: got data from sequencer for key: %s", batchNum, key.Hex()) return &types.OffChainData{ Key: key, - Value: data.BatchL2Data, + Value: seqBatch.BatchL2Data, } } From 7573a9d9d65ed78984ac73e744e21ea4545de969 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Mon, 20 Nov 2023 10:57:12 -0500 Subject: [PATCH 10/13] remove test code --- synchronizer/batches.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synchronizer/batches.go b/synchronizer/batches.go index c0acdab4..8a061531 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -291,6 +291,7 @@ func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.O return nil, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for key %v", key) } +/* func testBatch1(url string, key common.Hash) { log.Infof("trying batch 1 for testings") test, err := sequencer.GetData(url, 1) @@ -308,6 +309,7 @@ func testBatch1(url string, key common.Hash) { log.Warnf("testing did not retrieve batch 1") } } +*/ // trySequencer returns L2Data from the trusted sequencer, but does not return errors, only logs warnings if not found. func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *types.OffChainData { @@ -318,9 +320,9 @@ func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *typ return nil } - if batchNum == 2 && key.Hex() == "0x5931a88630bc594b49cc3ecb6782714cdba77e1f2033343a79411f7baa818eb5" { - testBatch1(bs.sequencer.GetUrl(), key) - } + //if batchNum == 2 && key.Hex() == "0x5931a88630bc594b49cc3ecb6782714cdba77e1f2033343a79411f7baa818eb5" { + // testBatch1(bs.sequencer.GetUrl(), key) + //} expectKey := crypto.Keccak256Hash(seqBatch.BatchL2Data) if key != expectKey { From bd5fe6081f753812b0ce576c60153b5930b14cd0 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Mon, 20 Nov 2023 11:30:22 -0500 Subject: [PATCH 11/13] fix batch num offset --- synchronizer/batches.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 8a061531..18a5d30e 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -214,12 +214,12 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB } var batchKeys []batchKey - for i := event.NumBatch - uint64(len(keys)); i < event.NumBatch; i++ { - bk := batchKey{ - batch: i + 1, + offset := event.NumBatch - uint64(len(keys)) + uint64(1) + for i := 0; i < len(keys); i++ { + batchKeys = append(batchKeys, batchKey{ + batch: offset + uint64(i), hash: keys[i], - } - batchKeys = append(batchKeys, bk) + }) } // collect keys that need to be resolved From 7c4be04ffe8e659a6e7f5664f212b91c2e3f9943 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Mon, 20 Nov 2023 12:09:25 -0500 Subject: [PATCH 12/13] dual var for --- synchronizer/batches.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 18a5d30e..e5b98015 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -214,11 +214,10 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB } var batchKeys []batchKey - offset := event.NumBatch - uint64(len(keys)) + uint64(1) - for i := 0; i < len(keys); i++ { + for i, j := 0, len(keys)-1; i < len(keys); i, j = i+1, j-1 { batchKeys = append(batchKeys, batchKey{ - batch: offset + uint64(i), - hash: keys[i], + batch: event.NumBatch - uint64(i), + hash: keys[j], }) } From 9951f817729af258e49cf403d4566230ef6e2907 Mon Sep 17 00:00:00 2001 From: Christopher Campbell Date: Tue, 21 Nov 2023 09:57:51 -0500 Subject: [PATCH 13/13] minor code cleanup --- synchronizer/batches.go | 77 +++++++++++++---------------------------- 1 file changed, 24 insertions(+), 53 deletions(-) diff --git a/synchronizer/batches.go b/synchronizer/batches.go index e5b98015..f77bb5fc 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -22,7 +22,7 @@ import ( const defaultBlockBatchSize = 32 -// BatchSynchronizer watches for batch events, checks if they are "locally" stored, then retrieves and stores missing data +// BatchSynchronizer watches for number events, checks if they are "locally" stored, then retrieves and stores missing data type BatchSynchronizer struct { client *etherman.Etherman stop chan struct{} @@ -48,7 +48,7 @@ func NewBatchSynchronizer( sequencer *sequencer.SequencerTracker, ) (*BatchSynchronizer, error) { if cfg.BlockBatchSize == 0 { - log.Infof("block batch size is not set, setting to default %d", defaultBlockBatchSize) + log.Infof("block number size is not set, setting to default %d", defaultBlockBatchSize) cfg.BlockBatchSize = defaultBlockBatchSize } synchronizer := &BatchSynchronizer{ @@ -86,7 +86,7 @@ func (bs *BatchSynchronizer) resolveCommittee() error { // Start starts the synchronizer func (bs *BatchSynchronizer) Start() { - log.Infof("starting batch synchronizer, DAC addr: %v", bs.self) + log.Infof("starting number synchronizer, DAC addr: %v", bs.self) go bs.consumeEvents() go bs.produceEvents() go bs.handleReorgs() @@ -194,9 +194,10 @@ func (bs *BatchSynchronizer) consumeEvents() { } } +// batchKey is the pairing of batch number and data hash of a batch type batchKey struct { - batch uint64 - hash common.Hash + number uint64 + hash common.Hash } func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceBatches) error { @@ -212,16 +213,16 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB if err != nil { return err } - + // The event has the _last_ batch number & list of hashes. Each hash is + // in order, so the batch number can be computed from position in array var batchKeys []batchKey for i, j := 0, len(keys)-1; i < len(keys); i, j = i+1, j-1 { batchKeys = append(batchKeys, batchKey{ - batch: event.NumBatch - uint64(i), - hash: keys[j], + number: event.NumBatch - uint64(i), + hash: keys[j], }) } - - // collect keys that need to be resolved + // Pick out any batches that are missing from storage var missing []batchKey for _, key := range batchKeys { if !exists(bs.db, key.hash) { @@ -231,32 +232,28 @@ func (bs *BatchSynchronizer) handleEvent(event *cdkvalidium.CdkvalidiumSequenceB if len(missing) == 0 { return nil } - - for _, key := range missing { - log.Debugf("missing event batchNum: %d, calc batch: %d, hash: %s", event.NumBatch, key.batch, key.hash.Hex()) - } - + // Resolve the missing data var data []types.OffChainData for _, key := range missing { var value *types.OffChainData - value, err = bs.resolve(key.batch, key.hash) + value, err = bs.resolve(key) if err != nil { return err } data = append(data, *value) } - // Finally, store the data return store(bs.db, data) } -func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.OffChainData, error) { - data := bs.trySequencer(batchNum, key) +func (bs *BatchSynchronizer) resolve(batch batchKey) (*types.OffChainData, error) { + // First try to get the data from the trusted sequencer + data := bs.trySequencer(batch) if data != nil { return data, nil } - // sequencer failed to produce data, try the other nodes + // If the sequencer failed to produce data, try the other nodes if len(bs.committee) == 0 { // committee is resolved again once all members are evicted. They can be evicted // for not having data, or their config being malformed @@ -278,7 +275,7 @@ func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.O delete(bs.committee, member.Addr) continue // malformed committee, skip what is known to be wrong } - value, err := bs.resolveWithMember(key, member) + value, err := bs.resolveWithMember(batch.hash, member) if err != nil { log.Warnf("error resolving, continuing: %v", err) delete(bs.committee, member.Addr) @@ -287,50 +284,24 @@ func (bs *BatchSynchronizer) resolve(batchNum uint64, key common.Hash) (*types.O return value, nil } - return nil, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for key %v", key) -} - -/* -func testBatch1(url string, key common.Hash) { - log.Infof("trying batch 1 for testings") - test, err := sequencer.GetData(url, 1) - if err != nil { - log.Errorf("testing error: %v", err) - } - if test != nil { - expectKey := crypto.Keccak256Hash(test.BatchL2Data) - if key != expectKey { - log.Warnf("testing batch %d: sequencer gave wrong data for key: %s", 1, key.Hex()) - } else { - log.Infof("testing batch 1: key that was in batch 2 event found in batch 1: %s", key.Hex()) - } - } else { - log.Warnf("testing did not retrieve batch 1") - } + return nil, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for number %d, key %v", batch.number, batch.hash.Hex()) } -*/ // trySequencer returns L2Data from the trusted sequencer, but does not return errors, only logs warnings if not found. -func (bs *BatchSynchronizer) trySequencer(batchNum uint64, key common.Hash) *types.OffChainData { - log.Debugf("resolving batch %d, key %s, with sequencer at %s", batchNum, key.Hex(), bs.sequencer.GetUrl()) - seqBatch, err := sequencer.GetData(bs.sequencer.GetUrl(), batchNum) +func (bs *BatchSynchronizer) trySequencer(batch batchKey) *types.OffChainData { + seqBatch, err := sequencer.GetData(bs.sequencer.GetUrl(), batch.number) if err != nil { log.Warnf("failed to get data from sequencer: %v", err) return nil } - //if batchNum == 2 && key.Hex() == "0x5931a88630bc594b49cc3ecb6782714cdba77e1f2033343a79411f7baa818eb5" { - // testBatch1(bs.sequencer.GetUrl(), key) - //} - expectKey := crypto.Keccak256Hash(seqBatch.BatchL2Data) - if key != expectKey { - log.Warnf("batch %d: sequencer gave wrong data for key: %s", batchNum, key.Hex()) + if batch.hash != expectKey { + log.Warnf("number %d: sequencer gave wrong data for key: %s", batch.number, batch.hash.Hex()) return nil } - log.Infof("batch %d: got data from sequencer for key: %s", batchNum, key.Hex()) return &types.OffChainData{ - Key: key, + Key: batch.hash, Value: seqBatch.BatchL2Data, } }