diff --git a/aggregator/agglayer_client.go b/agglayer/client.go similarity index 83% rename from aggregator/agglayer_client.go rename to agglayer/client.go index 4726ccc1..18aa4bd9 100644 --- a/aggregator/agglayer_client.go +++ b/agglayer/client.go @@ -1,4 +1,4 @@ -package aggregator +package agglayer import ( "context" @@ -79,3 +79,17 @@ func (c *AggLayerClient) WaitTxToBeMined(hash common.Hash, ctx context.Context) } } } + +// SendCertificate sends a certificate to the AggLayer +func (c *AggLayerClient) SendCertificate(certificate *Certificate) error { + response, err := rpc.JSONRPCCall(c.url, "interop_sendCertificate", certificate) + if err != nil { + return err + } + + if response.Error != nil { + return fmt.Errorf("%v %v", response.Error.Code, response.Error.Message) + } + + return nil +} diff --git a/aggregator/agglayer_tx.go b/agglayer/tx.go similarity index 98% rename from aggregator/agglayer_tx.go rename to agglayer/tx.go index b0cd09c9..ca3befbb 100644 --- a/aggregator/agglayer_tx.go +++ b/agglayer/tx.go @@ -1,4 +1,4 @@ -package aggregator +package agglayer import ( "crypto/ecdsa" diff --git a/agglayer/types.go b/agglayer/types.go new file mode 100644 index 00000000..ab11864c --- /dev/null +++ b/agglayer/types.go @@ -0,0 +1,13 @@ +package agglayer + +import ( + "github.com/0xPolygon/cdk/localbridgesync" + "github.com/ethereum/go-ethereum/common" +) + +type Certificate struct { + OriginNetwork uint32 + PrevLocalExitRoot common.Hash + BridgeExits []*localbridgesync.Bridge + ImportedBridgeExits []*localbridgesync.Claim +} diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index 97f4c21a..489667bd 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -17,6 +17,7 @@ import ( "github.com/0xPolygon/cdk-rpc/rpc" cdkTypes "github.com/0xPolygon/cdk-rpc/types" + "github.com/0xPolygon/cdk/agglayer" ethmanTypes "github.com/0xPolygon/cdk/aggregator/ethmantypes" "github.com/0xPolygon/cdk/aggregator/prover" cdkcommon "github.com/0xPolygon/cdk/common" @@ -85,7 +86,7 @@ type Aggregator struct { exit context.CancelFunc sequencerPrivateKey *ecdsa.PrivateKey - aggLayerClient AgglayerClientInterface + aggLayerClient agglayer.AgglayerClientInterface } // New creates a new aggregator. @@ -148,12 +149,12 @@ func New( } var ( - aggLayerClient AgglayerClientInterface + aggLayerClient agglayer.AgglayerClientInterface sequencerPrivateKey *ecdsa.PrivateKey ) if cfg.SettlementBackend == AggLayer { - aggLayerClient = NewAggLayerClient(cfg.AggLayerURL) + aggLayerClient = agglayer.NewAggLayerClient(cfg.AggLayerURL) sequencerPrivateKey, err = newKeyFromKeystore(cfg.SequencerPrivateKey) if err != nil { @@ -655,10 +656,10 @@ func (a *Aggregator) settleWithAggLayer( inputs ethmanTypes.FinalProofInputs) bool { proofStrNo0x := strings.TrimPrefix(inputs.FinalProof.Proof, "0x") proofBytes := common.Hex2Bytes(proofStrNo0x) - tx := Tx{ + tx := agglayer.Tx{ LastVerifiedBatch: cdkTypes.ArgUint64(proof.BatchNumber - 1), NewVerifiedBatch: cdkTypes.ArgUint64(proof.BatchNumberFinal), - ZKP: ZKP{ + ZKP: agglayer.ZKP{ NewStateRoot: common.BytesToHash(inputs.NewStateRoot), NewLocalExitRoot: common.BytesToHash(inputs.NewLocalExitRoot), Proof: cdkTypes.ArgBytes(proofBytes), diff --git a/aggsender/aggsender.go b/aggsender/aggsender.go new file mode 100644 index 00000000..ad29bde1 --- /dev/null +++ b/aggsender/aggsender.go @@ -0,0 +1,183 @@ +package aggsender + +import ( + "context" + "encoding/json" + "math/big" + "path/filepath" + "time" + + "github.com/0xPolygon/cdk/agglayer" + cdkCommon "github.com/0xPolygon/cdk/common" + "github.com/0xPolygon/cdk/localbridgesync" + "github.com/0xPolygon/cdk/log" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/mdbx" +) + +const ( + aggSenderDBFolder = "aggsender" + sentCertificatesTable = "sent_certificates" +) + +var claimsTimeout = time.Second * 60 + +func tableCfgFunc(defaultBuckets kv.TableCfg) kv.TableCfg { + return kv.TableCfg{ + sentCertificatesTable: {}, + } +} + +// AggSender is a component that will send certificates to the aggLayer +type AggSender struct { + syncer *localbridgesync.LocalBridgeSync + db kv.RwDB + + client localbridgesync.EthClienter + aggLayerClient agglayer.AggLayerClient + + originNetwork uint32 + lastSentCertificateBlock *uint64 +} + +// New returns a new AggSender +func New( + originNetwork uint32, + reorgDetector localbridgesync.ReorgDetector, + aggLayerClient agglayer.AggLayerClient, + l2Client localbridgesync.EthClienter, + syncerCfg localbridgesync.LocalBridgeSyncerConfig) (*AggSender, error) { + syncer, err := localbridgesync.New(syncerCfg, reorgDetector, l2Client) + if err != nil { + return nil, err + } + + db, err := mdbx.NewMDBX(nil). + Path(filepath.Join(syncerCfg.DBPath, aggSenderDBFolder)). + WithTableCfg(tableCfgFunc). + Open() + if err != nil { + return nil, err + } + + return &AggSender{ + db: db, + syncer: syncer, + client: l2Client, + originNetwork: originNetwork, + aggLayerClient: aggLayerClient, + }, nil +} + +// Start starts the AggSender +func (a *AggSender) Start(ctx context.Context) { + go a.syncer.Sync(ctx) + go a.sendCertificates(ctx) +} + +// sendCertificates sends certificates to the aggLayer +func (a *AggSender) sendCertificates(ctx context.Context) { + ticker := time.NewTicker(claimsTimeout) + + for { + select { + case <-ticker.C: + lastSentCertificate, err := a.getLastSentCertificateBlock(ctx) + if err != nil { + log.Error("Error getting last sent certificate", "err", err) + continue + } + + lastFinalizedBlock, err := a.client.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) + if err != nil { + log.Error("Error getting block number", "err", err) + continue + } + + bridgeEvents, err := a.syncer.GetClaimsAndBridges(ctx, lastSentCertificate+1, lastFinalizedBlock.Nonce.Uint64()) + if err != nil { + log.Error("Error getting claims and bridges", "err", err) + continue + } + + certificate := a.buildCertificate(bridgeEvents) + + if err := a.aggLayerClient.SendCertificate(certificate); err != nil { + log.Error("Error sending certificate", "err", err) + continue + } + + if err := a.saveLastSentCertificate(ctx, lastFinalizedBlock.Nonce.Uint64(), certificate); err != nil { + log.Error("Error saving last sent certificate in db", "err", err) + continue + } + case <-ctx.Done(): + log.Info("AggSender stopped") + return + } + } +} + +// buildCertificate builds a certificate from the bridge events +func (a *AggSender) buildCertificate(bridgeEvents []localbridgesync.BridgeEvent) *agglayer.Certificate { + bridgeExits := make([]*localbridgesync.Bridge, 0, len(bridgeEvents)) + importedBridgeExits := make([]*localbridgesync.Claim, 0, len(bridgeEvents)) + + for _, bridgeEvent := range bridgeEvents { + bridgeExits = append(bridgeExits, bridgeEvent.Bridge) + importedBridgeExits = append(importedBridgeExits, bridgeEvent.Claim) + } + + return &agglayer.Certificate{ + OriginNetwork: a.originNetwork, + PrevLocalExitRoot: common.Hash{}, + BridgeExits: bridgeExits, + ImportedBridgeExits: importedBridgeExits, + } +} + +// saveLastSentCertificate saves the last sent certificate +func (a *AggSender) saveLastSentCertificate(ctx context.Context, blockNum uint64, certificate *agglayer.Certificate) error { + a.lastSentCertificateBlock = &blockNum // save it in memory as well + + return a.db.Update(ctx, func(tx kv.RwTx) error { + raw, err := json.Marshal(certificate) + if err != nil { + return err + } + + return tx.Put(sentCertificatesTable, cdkCommon.BlockNum2Bytes(blockNum), raw) + }) +} + +// getLastSentCertificateBlock returns the last sent certificate block +func (a *AggSender) getLastSentCertificateBlock(ctx context.Context) (uint64, error) { + if a.lastSentCertificateBlock != nil { + // if we have it in memory, return it + return *a.lastSentCertificateBlock, nil + } + + // if its zero, maybe we have it in the db, if this is a new start, and agg sender run before + err := a.db.View(ctx, func(tx kv.Tx) error { + cursor, err := tx.Cursor(sentCertificatesTable) + if err != nil { + return err + } + + k, _, err := cursor.Last() + if err != nil { + return err + } + + if k != nil { + lastSentCertificateBlock := cdkCommon.Bytes2BlockNum(k) + a.lastSentCertificateBlock = &lastSentCertificateBlock + } + + return nil + }) + + return *a.lastSentCertificateBlock, err +} diff --git a/config/config.go b/config/config.go index 02ede0fb..1e09e218 100644 --- a/config/config.go +++ b/config/config.go @@ -67,7 +67,6 @@ type Config struct { NetworkConfig NetworkConfig // Configuration of the sequence sender service SequenceSender sequencesender.Config - // Common Config that affects all the services Common common.Config } diff --git a/localbridgesync/localbridgesync.go b/localbridgesync/localbridgesync.go index 42ab67d1..48554a15 100644 --- a/localbridgesync/localbridgesync.go +++ b/localbridgesync/localbridgesync.go @@ -13,24 +13,28 @@ var ( maxRetryAttemptsAfterError = 5 ) +type LocalBridgeSyncerConfig struct { + DBPath string + Bridge common.Address + SyncBlockChunkSize uint64 + BlockFinalityType etherman.BlockNumberFinality +} + type LocalBridgeSync struct { *processor *driver } func New( - dbPath string, - bridge common.Address, - syncBlockChunkSize uint64, - blockFinalityType etherman.BlockNumberFinality, + cfg LocalBridgeSyncerConfig, rd ReorgDetector, l2Client EthClienter, ) (*LocalBridgeSync, error) { - p, err := newProcessor(dbPath) + p, err := newProcessor(cfg.DBPath) if err != nil { return nil, err } - dwn, err := newDownloader(bridge, l2Client, syncBlockChunkSize, blockFinalityType) + dwn, err := newDownloader(cfg.Bridge, l2Client, cfg.SyncBlockChunkSize, cfg.BlockFinalityType) if err != nil { return nil, err }