Skip to content

Commit

Permalink
faet: aggsender
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Jul 26, 2024
1 parent 651f461 commit 28037c8
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 14 deletions.
16 changes: 15 additions & 1 deletion aggregator/agglayer_client.go → agglayer/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package aggregator
package agglayer

import (
"context"
Expand Down Expand Up @@ -79,3 +79,17 @@ func (c *AggLayerClient) WaitTxToBeMined(hash common.Hash, ctx context.Context)
}
}
}

// SendCertificate sends a certificate to the AggLayer
func (c *AggLayerClient) SendCertificate(certificate *Certificate) error {
response, err := rpc.JSONRPCCall(c.url, "interop_sendCertificate", certificate)
if err != nil {
return err
}

if response.Error != nil {
return fmt.Errorf("%v %v", response.Error.Code, response.Error.Message)
}

return nil
}
2 changes: 1 addition & 1 deletion aggregator/agglayer_tx.go → agglayer/tx.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package aggregator
package agglayer

import (
"crypto/ecdsa"
Expand Down
13 changes: 13 additions & 0 deletions agglayer/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package agglayer

import (
"github.com/0xPolygon/cdk/localbridgesync"
"github.com/ethereum/go-ethereum/common"
)

type Certificate struct {
OriginNetwork uint32
PrevLocalExitRoot common.Hash
BridgeExits []*localbridgesync.Bridge
ImportedBridgeExits []*localbridgesync.Claim
}
11 changes: 6 additions & 5 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/0xPolygon/cdk-rpc/rpc"
cdkTypes "github.com/0xPolygon/cdk-rpc/types"
"github.com/0xPolygon/cdk/agglayer"
ethmanTypes "github.com/0xPolygon/cdk/aggregator/ethmantypes"
"github.com/0xPolygon/cdk/aggregator/prover"
cdkcommon "github.com/0xPolygon/cdk/common"
Expand Down Expand Up @@ -85,7 +86,7 @@ type Aggregator struct {
exit context.CancelFunc

sequencerPrivateKey *ecdsa.PrivateKey
aggLayerClient AgglayerClientInterface
aggLayerClient agglayer.AgglayerClientInterface
}

// New creates a new aggregator.
Expand Down Expand Up @@ -148,12 +149,12 @@ func New(
}

var (
aggLayerClient AgglayerClientInterface
aggLayerClient agglayer.AgglayerClientInterface
sequencerPrivateKey *ecdsa.PrivateKey
)

if cfg.SettlementBackend == AggLayer {
aggLayerClient = NewAggLayerClient(cfg.AggLayerURL)
aggLayerClient = agglayer.NewAggLayerClient(cfg.AggLayerURL)

sequencerPrivateKey, err = newKeyFromKeystore(cfg.SequencerPrivateKey)
if err != nil {
Expand Down Expand Up @@ -655,10 +656,10 @@ func (a *Aggregator) settleWithAggLayer(
inputs ethmanTypes.FinalProofInputs) bool {
proofStrNo0x := strings.TrimPrefix(inputs.FinalProof.Proof, "0x")
proofBytes := common.Hex2Bytes(proofStrNo0x)
tx := Tx{
tx := agglayer.Tx{
LastVerifiedBatch: cdkTypes.ArgUint64(proof.BatchNumber - 1),
NewVerifiedBatch: cdkTypes.ArgUint64(proof.BatchNumberFinal),
ZKP: ZKP{
ZKP: agglayer.ZKP{
NewStateRoot: common.BytesToHash(inputs.NewStateRoot),
NewLocalExitRoot: common.BytesToHash(inputs.NewLocalExitRoot),
Proof: cdkTypes.ArgBytes(proofBytes),
Expand Down
183 changes: 183 additions & 0 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package aggsender

import (
"context"
"encoding/json"
"math/big"
"path/filepath"
"time"

"github.com/0xPolygon/cdk/agglayer"
cdkCommon "github.com/0xPolygon/cdk/common"
"github.com/0xPolygon/cdk/localbridgesync"
"github.com/0xPolygon/cdk/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
)

const (
aggSenderDBFolder = "aggsender"
sentCertificatesTable = "sent_certificates"
)

var claimsTimeout = time.Second * 60

func tableCfgFunc(defaultBuckets kv.TableCfg) kv.TableCfg {
return kv.TableCfg{
sentCertificatesTable: {},
}
}

// AggSender is a component that will send certificates to the aggLayer
type AggSender struct {
syncer *localbridgesync.LocalBridgeSync
db kv.RwDB

client localbridgesync.EthClienter
aggLayerClient agglayer.AggLayerClient

originNetwork uint32
lastSentCertificateBlock *uint64
}

// New returns a new AggSender
func New(
originNetwork uint32,
reorgDetector localbridgesync.ReorgDetector,
aggLayerClient agglayer.AggLayerClient,
l2Client localbridgesync.EthClienter,
syncerCfg localbridgesync.LocalBridgeSyncerConfig) (*AggSender, error) {
syncer, err := localbridgesync.New(syncerCfg, reorgDetector, l2Client)
if err != nil {
return nil, err
}

db, err := mdbx.NewMDBX(nil).
Path(filepath.Join(syncerCfg.DBPath, aggSenderDBFolder)).
WithTableCfg(tableCfgFunc).
Open()
if err != nil {
return nil, err
}

return &AggSender{
db: db,
syncer: syncer,
client: l2Client,
originNetwork: originNetwork,
aggLayerClient: aggLayerClient,
}, nil
}

// Start starts the AggSender
func (a *AggSender) Start(ctx context.Context) {
go a.syncer.Sync(ctx)
go a.sendCertificates(ctx)
}

// sendCertificates sends certificates to the aggLayer
func (a *AggSender) sendCertificates(ctx context.Context) {
ticker := time.NewTicker(claimsTimeout)

for {
select {
case <-ticker.C:
lastSentCertificate, err := a.getLastSentCertificateBlock(ctx)
if err != nil {
log.Error("Error getting last sent certificate", "err", err)
continue
}

lastFinalizedBlock, err := a.client.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber)))
if err != nil {
log.Error("Error getting block number", "err", err)
continue
}

bridgeEvents, err := a.syncer.GetClaimsAndBridges(ctx, lastSentCertificate+1, lastFinalizedBlock.Nonce.Uint64())
if err != nil {
log.Error("Error getting claims and bridges", "err", err)
continue
}

certificate := a.buildCertificate(bridgeEvents)

if err := a.aggLayerClient.SendCertificate(certificate); err != nil {
log.Error("Error sending certificate", "err", err)
continue
}

if err := a.saveLastSentCertificate(ctx, lastFinalizedBlock.Nonce.Uint64(), certificate); err != nil {
log.Error("Error saving last sent certificate in db", "err", err)
continue
}
case <-ctx.Done():
log.Info("AggSender stopped")
return
}
}
}

// buildCertificate builds a certificate from the bridge events
func (a *AggSender) buildCertificate(bridgeEvents []localbridgesync.BridgeEvent) *agglayer.Certificate {
bridgeExits := make([]*localbridgesync.Bridge, 0, len(bridgeEvents))
importedBridgeExits := make([]*localbridgesync.Claim, 0, len(bridgeEvents))

for _, bridgeEvent := range bridgeEvents {
bridgeExits = append(bridgeExits, bridgeEvent.Bridge)
importedBridgeExits = append(importedBridgeExits, bridgeEvent.Claim)
}

return &agglayer.Certificate{
OriginNetwork: a.originNetwork,
PrevLocalExitRoot: common.Hash{},
BridgeExits: bridgeExits,
ImportedBridgeExits: importedBridgeExits,
}
}

// saveLastSentCertificate saves the last sent certificate
func (a *AggSender) saveLastSentCertificate(ctx context.Context, blockNum uint64, certificate *agglayer.Certificate) error {
a.lastSentCertificateBlock = &blockNum // save it in memory as well

return a.db.Update(ctx, func(tx kv.RwTx) error {
raw, err := json.Marshal(certificate)
if err != nil {
return err
}

return tx.Put(sentCertificatesTable, cdkCommon.BlockNum2Bytes(blockNum), raw)
})
}

// getLastSentCertificateBlock returns the last sent certificate block
func (a *AggSender) getLastSentCertificateBlock(ctx context.Context) (uint64, error) {
if a.lastSentCertificateBlock != nil {
// if we have it in memory, return it
return *a.lastSentCertificateBlock, nil
}

// if its zero, maybe we have it in the db, if this is a new start, and agg sender run before
err := a.db.View(ctx, func(tx kv.Tx) error {
cursor, err := tx.Cursor(sentCertificatesTable)
if err != nil {
return err
}

k, _, err := cursor.Last()
if err != nil {
return err
}

if k != nil {
lastSentCertificateBlock := cdkCommon.Bytes2BlockNum(k)
a.lastSentCertificateBlock = &lastSentCertificateBlock
}

return nil
})

return *a.lastSentCertificateBlock, err
}
1 change: 0 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type Config struct {
NetworkConfig NetworkConfig
// Configuration of the sequence sender service
SequenceSender sequencesender.Config

// Common Config that affects all the services
Common common.Config
}
Expand Down
16 changes: 10 additions & 6 deletions localbridgesync/localbridgesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,28 @@ var (
maxRetryAttemptsAfterError = 5
)

type LocalBridgeSyncerConfig struct {
DBPath string
Bridge common.Address
SyncBlockChunkSize uint64
BlockFinalityType etherman.BlockNumberFinality
}

type LocalBridgeSync struct {
*processor
*driver
}

func New(
dbPath string,
bridge common.Address,
syncBlockChunkSize uint64,
blockFinalityType etherman.BlockNumberFinality,
cfg LocalBridgeSyncerConfig,
rd ReorgDetector,
l2Client EthClienter,
) (*LocalBridgeSync, error) {
p, err := newProcessor(dbPath)
p, err := newProcessor(cfg.DBPath)
if err != nil {
return nil, err
}
dwn, err := newDownloader(bridge, l2Client, syncBlockChunkSize, blockFinalityType)
dwn, err := newDownloader(cfg.Bridge, l2Client, cfg.SyncBlockChunkSize, cfg.BlockFinalityType)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 28037c8

Please sign in to comment.