Skip to content

Commit

Permalink
Reporting offchain data gap detection
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman committed Jul 4, 2024
1 parent 48558e6 commit fb5889b
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func start(cliCtx *cli.Context) error {
[]rpc.Service{
{
Name: status.APISTATUS,
Service: status.NewEndpoints(storage),
Service: status.NewEndpoints(storage, batchSynchronizer),
},
{
Name: sync.APISYNC,
Expand Down
29 changes: 28 additions & 1 deletion db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type DB interface {
GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error)
ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error)
StoreOffChainData(ctx context.Context, od []types.OffChainData) error

CountOffchainData(ctx context.Context) (uint64, error)
DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error)
}

// DB is the database layer of the data node
Expand Down Expand Up @@ -289,3 +289,30 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) {

return count, nil
}

// DetectOffchainDataGaps returns the number of gaps in the offchain_data table
func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) {
const detectBatchNumGapQuery = "SELECT * FROM vw_batch_num_gaps;"

rows, err := db.pg.QueryxContext(ctx, detectBatchNumGapQuery)
if err != nil {
return nil, err
}

defer rows.Close()

gaps := make(map[uint64]uint64)
for rows.Next() {
data := struct {
CurrentBatchNum uint64 `db:"current_batch_num"`
NextBatchNum uint64 `db:"next_batch_num"`
}{}
if err = rows.StructScan(&data); err != nil {
return nil, err
}

gaps[data.CurrentBatchNum] = data.NextBatchNum
}

return gaps, nil
}
23 changes: 23 additions & 0 deletions db/migrations/0005.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
-- +migrate Down
DROP VIEW IF EXISTS vw_batch_num_gaps;
ALTER TABLE data_node.offchain_data DROP COLUMN IF EXISTS batch_num;

-- +migrate Up
ALTER TABLE data_node.offchain_data
ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0;

-- Ensure batch_num is indexed for optimal performance
CREATE INDEX idx_batch_num ON data_node.offchain_data(batch_num);

-- Create a view to detect gaps in batch_num
CREATE VIEW vw_batch_num_gaps AS
WITH numbered_batches AS (
SELECT
batch_num,
ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number
FROM data_node.offchain_data
)
SELECT
nb1.batch_num AS current_batch_num,
nb2.batch_num AS next_batch_num
FROM
numbered_batches nb1
LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1
WHERE
nb1.batch_num IS NOT NULL
AND nb2.batch_num IS NOT NULL
AND nb1.batch_num + 1 <> nb2.batch_num;

-- It triggers resync with an updated logic of all batches
UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1';
58 changes: 58 additions & 0 deletions mocks/db.generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 18 additions & 9 deletions services/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,25 @@ import (
// APISTATUS is the namespace of the status service
const APISTATUS = "status"

// GapsDetector is an interface for detecting gaps in the offchain data
type GapsDetector interface {
// Gaps returns a map of gaps in the offchain data
Gaps() map[uint64]uint64
}

// Endpoints contains implementations for the "status" RPC endpoints
type Endpoints struct {
db db.DB
startTime time.Time
db db.DB
startTime time.Time
gapsDetector GapsDetector
}

// NewEndpoints returns Endpoints
func NewEndpoints(db db.DB) *Endpoints {
func NewEndpoints(db db.DB, gapsDetector GapsDetector) *Endpoints {
return &Endpoints{
db: db,
startTime: time.Now(),
db: db,
startTime: time.Now(),
gapsDetector: gapsDetector,
}
}

Expand All @@ -45,9 +53,10 @@ func (s *Endpoints) GetStatus() (interface{}, rpc.Error) {
}

return types.DACStatus{
Version: dataavailability.Version,
Uptime: uptime,
KeyCount: rowCount,
BackfillProgress: backfillProgress,
Version: dataavailability.Version,
Uptime: uptime,
KeyCount: rowCount,
BackfillProgress: backfillProgress,
OffchainDataGapsExist: len(s.gapsDetector.Gaps()) > 0,
}, nil
}
89 changes: 77 additions & 12 deletions synchronizer/batches.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package synchronizer

import (
"bytes"
"context"
"fmt"
"math/rand"
Expand Down Expand Up @@ -31,18 +32,20 @@ type SequencerTracker interface {

// BatchSynchronizer watches for number events, checks if they are "locally" stored, then retrieves and stores missing data
type BatchSynchronizer struct {
client etherman.Etherman
stop chan struct{}
retry time.Duration
rpcTimeout time.Duration
blockBatchSize uint
self common.Address
db db.DB
committee *CommitteeMapSafe
syncLock sync.Mutex
reorgs <-chan BlockReorg
sequencer SequencerTracker
rpcClientFactory client.Factory
client etherman.Etherman
stop chan struct{}
retry time.Duration
rpcTimeout time.Duration
blockBatchSize uint
self common.Address
db db.DB
committee *CommitteeMapSafe
syncLock sync.Mutex
reorgs <-chan BlockReorg
sequencer SequencerTracker
rpcClientFactory client.Factory
offchainDataGaps map[uint64]uint64
offchainDataGapsLock sync.Mutex
}

// NewBatchSynchronizer creates the BatchSynchronizer
Expand Down Expand Up @@ -70,6 +73,7 @@ func NewBatchSynchronizer(
reorgs: reorgs,
sequencer: sequencer,
rpcClientFactory: rpcClientFactory,
offchainDataGaps: make(map[uint64]uint64),
}
return synchronizer, synchronizer.resolveCommittee()
}
Expand Down Expand Up @@ -98,13 +102,25 @@ func (bs *BatchSynchronizer) Start(ctx context.Context) {
go bs.processUnresolvedBatches(ctx)
go bs.produceEvents(ctx)
go bs.handleReorgs(ctx)
go bs.startOffchainDataGapsDetection(ctx)
}

// Stop stops the synchronizer
func (bs *BatchSynchronizer) Stop() {
close(bs.stop)
}

// Gaps returns the offchain data gaps
func (bs *BatchSynchronizer) Gaps() map[uint64]uint64 {
bs.offchainDataGapsLock.Lock()
gaps := make(map[uint64]uint64, len(bs.offchainDataGaps))
for key, value := range bs.offchainDataGaps {
gaps[key] = value
}
bs.offchainDataGapsLock.Unlock()
return gaps
}

func (bs *BatchSynchronizer) handleReorgs(ctx context.Context) {
log.Info("starting reorgs handler")
for {
Expand Down Expand Up @@ -433,3 +449,52 @@ func (bs *BatchSynchronizer) resolveWithMember(
BatchNum: batch.Number,
}, nil
}

func (bs *BatchSynchronizer) startOffchainDataGapsDetection(ctx context.Context) {
log.Info("starting handling unresolved batches")
for {
delay := time.NewTimer(time.Minute)
select {
case <-delay.C:
if err := bs.detectOffchainDataGaps(ctx); err != nil {
log.Error(err)
}
case <-bs.stop:
return
}
}
}

// detectOffchainDataGaps detects offchain data gaps and reports them in logs and the service state.
func (bs *BatchSynchronizer) detectOffchainDataGaps(ctx context.Context) error {
// Detect offchain data gaps
gaps, err := detectOffchainDataGaps(ctx, bs.db)
if err != nil {
return fmt.Errorf("failed to detect offchain data gaps: %v", err)
}

// No gaps found, all good
if len(gaps) == 0 {
return nil
}

// Log the detected gaps
gapsRaw := new(bytes.Buffer)
for key, value := range gaps {
if _, err = fmt.Fprintf(gapsRaw, "%d=>%d\n", key, value); err != nil {
log.Errorf("failed to write offchain data gaps: %v", err)
}
}

log.Warnf("detected offchain data gaps (current batch number => expected batch number): %s", gapsRaw.String())

// Store the detected gaps in the service state
bs.offchainDataGapsLock.Lock()
bs.offchainDataGaps = make(map[uint64]uint64, len(gaps))
for key, value := range gaps {
bs.offchainDataGaps[key] = value
}
bs.offchainDataGapsLock.Unlock()

return nil
}
21 changes: 14 additions & 7 deletions synchronizer/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ func setStartBlock(parentCtx context.Context, db dbTypes.DB, block uint64, syncT
return db.StoreLastProcessedBlock(ctx, block, string(syncTask))
}

func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) {
ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
defer cancel()

return db.ListOffChainData(ctx, keys)
}

func storeUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error {
ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
defer cancel()
Expand All @@ -71,9 +64,23 @@ func deleteUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []
return db.DeleteUnresolvedBatchKeys(ctx, keys)
}

func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) {
ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
defer cancel()

return db.ListOffChainData(ctx, keys)
}

func storeOffchainData(parentCtx context.Context, db dbTypes.DB, data []types.OffChainData) error {
ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
defer cancel()

return db.StoreOffChainData(ctx, data)
}

func detectOffchainDataGaps(parentCtx context.Context, db dbTypes.DB) (map[uint64]uint64, error) {
ctx, cancel := context.WithTimeout(parentCtx, dbTimeout)
defer cancel()

return db.DetectOffchainDataGaps(ctx)
}
9 changes: 5 additions & 4 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ const (

// DACStatus contains DAC status info
type DACStatus struct {
Uptime string `json:"uptime"`
Version string `json:"version"`
KeyCount uint64 `json:"key_count"`
BackfillProgress uint64 `json:"backfill_progress"`
Uptime string `json:"uptime"`
Version string `json:"version"`
KeyCount uint64 `json:"key_count"`
BackfillProgress uint64 `json:"backfill_progress"`
OffchainDataGapsExist bool `json:"offchain_data_gaps_exist"`
}

// BatchKey is the pairing of batch number and data hash of a batch
Expand Down

0 comments on commit fb5889b

Please sign in to comment.