Skip to content

Commit

Permalink
Merge pull request ethereum-optimism#7649 from epociask/indexer.bridg…
Browse files Browse the repository at this point in the history
…e_offset_fix

Offset Bug Fix for OP Indexer
  • Loading branch information
hamdiallam authored Oct 13, 2023
2 parents a037ebf + b0b37b2 commit e758f0e
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 19 deletions.
10 changes: 7 additions & 3 deletions indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,15 @@ The indexer service is responsible for polling and processing real-time batches
* Process and persist new bridge events
* Synchronize L1 proven/finalized withdrawals with their L2 initialization counterparts

#### API

### L1 Polling
L1 blocks are only indexed if they contain L1 system contract events. This is done to reduce the amount of unnecessary data that is indexed. Because of this, the `l1_block_headers` table will not contain every L1 block header.

#### API
The indexer service runs a lightweight health server adjacently to the main service. The health server exposes a single endpoint `/healthz` that can be used to check the health of the indexer service. The health assessment doesn't check dependency health (ie. database) but rather checks the health of the indexer service itself.

### Database
The indexer service currently supports a Postgres database for storing L1/L2 OP Stack chain data. The most up-to-date database schemas can be found in the `./migrations` directory.

## Metrics
The indexer services exposes a set of Prometheus metrics that can be used to monitor the health of the service. The metrics are exposed via the `/metrics` endpoint on the health server.
## Metrics
The indexer services exposes a set of Prometheus metrics that can be used to monitor the health of the service. The metrics are exposed via the `/metrics` endpoint on the health server.
2 changes: 1 addition & 1 deletion indexer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,6 @@ func LoadConfig(log log.Logger, path string) (Config, error) {
cfg.Chain.L2HeaderBufferSize = defaultHeaderBufferSize
}

log.Info("loaded config")
log.Info("loaded config", "config", cfg.Chain)
return cfg, nil
}
24 changes: 16 additions & 8 deletions indexer/database/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math/big"

"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"

Expand Down Expand Up @@ -170,14 +169,11 @@ func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64
// We use timestamps since that translates to both L1 & L2
var fromTimestamp, toTimestamp uint64

if fromL1Height == nil {
fromL1Height = bigint.Zero
}

// Lower Bound (the default `fromTimestamp = 0` suffices genesis representation)
if fromL1Height.BitLen() > 0 {
var header L1BlockHeader
// Lower Bound (the default `fromTimestamp = l1_starting_height` (default=0) suffices genesis representation)
var header L1BlockHeader
if fromL1Height != nil {
result := db.gorm.Where("number = ?", fromL1Height).Take(&header)
// TODO - Embed logging to db
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
Expand All @@ -186,6 +182,16 @@ func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64
}

fromTimestamp = header.Timestamp
} else {
result := db.gorm.Order("number desc").Take(&header)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}

fromL1Height = header.Number
}

// Upper Bound (lowest timestamp indexed between L1/L2 bounded by `maxL1Range`)
Expand All @@ -196,6 +202,7 @@ func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64
l1QueryFilter = fmt.Sprintf("%s AND number <= %d", l1QueryFilter, maxHeight)
}

// Fetch most recent header from l1_block_headers table
var l1Header L1BlockHeader
result := db.gorm.Where(l1QueryFilter).Order("timestamp DESC").Take(&l1Header)
if result.Error != nil {
Expand All @@ -207,6 +214,7 @@ func (db *blocksDB) LatestObservedEpoch(fromL1Height *big.Int, maxL1Range uint64

toTimestamp = l1Header.Timestamp

// Fetch most recent header from l2_block_headers table
var l2Header L2BlockHeader
result = db.gorm.Where("timestamp <= ?", toTimestamp).Order("timestamp DESC").Take(&l2Header)
if result.Error != nil {
Expand Down
5 changes: 3 additions & 2 deletions indexer/node/header_traversal.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (f *HeaderTraversal) LastHeader() *types.Header {
return f.lastHeader
}

// NextFinalizedHeaders retrives the next set of headers that have been
// NextFinalizedHeaders retrieves the next set of headers that have been
// marked as finalized by the connected client, bounded by the supplied size
func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header, error) {
latestBlockHeader, err := f.ethClient.BlockHeaderByNumber(nil)
Expand All @@ -49,7 +49,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,

if f.lastHeader != nil {
cmp := f.lastHeader.Number.Cmp(endHeight)
if cmp == 0 {
if cmp == 0 { // We're synced to head and there are no new headers
return nil, nil
} else if cmp > 0 {
return nil, ErrHeaderTraversalAheadOfProvider
Expand All @@ -61,6 +61,7 @@ func (f *HeaderTraversal) NextFinalizedHeaders(maxSize uint64) ([]types.Header,
nextHeight = new(big.Int).Add(f.lastHeader.Number, bigint.One)
}

// endHeight = (nextHeight - endHeight) <= maxSize
endHeight = bigint.Clamp(nextHeight, endHeight, maxSize)
headers, err := f.ethClient.BlockHeadersByRange(nextHeight, endHeight)
if err != nil {
Expand Down
15 changes: 11 additions & 4 deletions indexer/processors/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (b *BridgeProcessor) Start(ctx context.Context) error {

// Runs the processing loop. In order to ensure all seen bridge finalization events
// can be correlated with bridge initiated events, we establish a shared marker between
// L1 and L2 when processing events. The lastest shared indexed time (epochs) between
// L1 and L2 when processing events. The latest shared indexed time (epochs) between
// L1 and L2 serves as this shared marker.
func (b *BridgeProcessor) run() error {
// In the event where we have a large number of un-observed epochs, we cap the search
Expand Down Expand Up @@ -127,11 +127,11 @@ func (b *BridgeProcessor) run() error {
}
if b.LatestL1Header != nil && latestEpoch.L1BlockHeader.Number.Cmp(b.LatestL1Header.Number) <= 0 {
b.log.Error("non-increasing l1 block height observed", "latest_bridge_l1_block_number", b.LatestL1Header.Number, "latest_epoch_l1_block_number", latestEpoch.L1BlockHeader.Number)
return errors.New("non-increasing l1 block heght observed")
return errors.New("non-increasing l1 block height observed")
}
if b.LatestL2Header != nil && latestEpoch.L2BlockHeader.Number.Cmp(b.LatestL2Header.Number) <= 0 {
b.log.Error("non-increasing l2 block height observed", "latest_bridge_l2_block_number", b.LatestL2Header.Number, "latest_epoch_l2_block_number", latestEpoch.L2BlockHeader.Number)
return errors.New("non-increasing l2 block heght observed")
return errors.New("non-increasing l2 block height observed")
}

toL1Height, toL2Height := latestEpoch.L1BlockHeader.Number, latestEpoch.L2BlockHeader.Number
Expand Down Expand Up @@ -169,17 +169,21 @@ func (b *BridgeProcessor) run() error {

// First, find all possible initiated bridge events
if err := bridge.LegacyL1ProcessInitiatedBridgeEvents(l1BridgeLog, tx, b.metrics, b.chainConfig.L1Contracts, legacyFromL1Height, legacyToL1Height); err != nil {
batchLog.Error("failed to index legacy l1 initiated bridge events", "err", err)
return err
}
if err := bridge.LegacyL2ProcessInitiatedBridgeEvents(l2BridgeLog, tx, b.metrics, b.chainConfig.L2Contracts, legacyFromL2Height, legacyToL2Height); err != nil {
batchLog.Error("failed to index legacy l2 initiated bridge events", "err", err)
return err
}

// Now that all initiated events have been indexed, it is ensured that all finalization can find their counterpart.
if err := bridge.LegacyL1ProcessFinalizedBridgeEvents(l1BridgeLog, tx, b.metrics, b.l1Etl.EthClient, b.chainConfig.L1Contracts, legacyFromL1Height, legacyToL1Height); err != nil {
batchLog.Error("failed to index legacy l1 finalized bridge events", "err", err)
return err
}
if err := bridge.LegacyL2ProcessFinalizedBridgeEvents(l2BridgeLog, tx, b.metrics, b.chainConfig.L2Contracts, legacyFromL2Height, legacyToL2Height); err != nil {
batchLog.Error("failed to index legacy l2l finalized bridge events", "err", err)
return err
}

Expand All @@ -201,24 +205,27 @@ func (b *BridgeProcessor) run() error {

// First, find all possible initiated bridge events
if err := bridge.L1ProcessInitiatedBridgeEvents(l1BridgeLog, tx, b.metrics, b.chainConfig.L1Contracts, fromL1Height, toL1Height); err != nil {
batchLog.Error("failed to index l1 initiated bridge events", "err", err)
return err
}
if err := bridge.L2ProcessInitiatedBridgeEvents(l2BridgeLog, tx, b.metrics, b.chainConfig.L2Contracts, fromL2Height, toL2Height); err != nil {
batchLog.Error("failed to index l2 initiated bridge events", "err", err)
return err
}

// Now all finalization events can find their counterpart.
if err := bridge.L1ProcessFinalizedBridgeEvents(l1BridgeLog, tx, b.metrics, b.chainConfig.L1Contracts, fromL1Height, toL1Height); err != nil {
batchLog.Error("failed to index l1 finalized bridge events", "err", err)
return err
}
if err := bridge.L2ProcessFinalizedBridgeEvents(l2BridgeLog, tx, b.metrics, b.chainConfig.L2Contracts, fromL2Height, toL2Height); err != nil {
batchLog.Error("failed to index l2 finalized bridge events", "err", err)
return err
}

// a-ok
return nil
}); err != nil {
batchLog.Error("failed to index bridge events", "err", err)
return err
}

Expand Down
8 changes: 7 additions & 1 deletion indexer/processors/contracts/optimism_portal.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"math/big"

"github.com/ethereum-optimism/optimism/indexer/bigint"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-bindings/bindings"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand Down Expand Up @@ -64,14 +65,19 @@ func OptimismPortalTransactionDepositEvents(contractAddress common.Address, db *
return nil, err
}

mint := depositTx.Mint
if mint == nil {
mint = bigint.Zero
}

optimismPortalTxDeposits[i] = OptimismPortalTransactionDepositEvent{
Event: &transactionDepositEvents[i].ContractEvent,
DepositTx: depositTx,
GasLimit: new(big.Int).SetUint64(depositTx.Gas),
Tx: database.Transaction{
FromAddress: txDeposit.From,
ToAddress: txDeposit.To,
Amount: depositTx.Mint,
Amount: mint,
Data: depositTx.Data,
Timestamp: transactionDepositEvents[i].Timestamp,
},
Expand Down

0 comments on commit e758f0e

Please sign in to comment.