diff --git a/cmd/main.go b/cmd/main.go index bfc6032b..0923e214 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -151,7 +151,7 @@ func start(cliCtx *cli.Context) error { []rpc.Service{ { Name: status.APISTATUS, - Service: status.NewEndpoints(storage), + Service: status.NewEndpoints(storage, batchSynchronizer), }, { Name: sync.APISYNC, diff --git a/db/db.go b/db/db.go index d1353691..09a42a86 100644 --- a/db/db.go +++ b/db/db.go @@ -28,8 +28,8 @@ type DB interface { GetOffChainData(ctx context.Context, key common.Hash) (*types.OffChainData, error) ListOffChainData(ctx context.Context, keys []common.Hash) ([]types.OffChainData, error) StoreOffChainData(ctx context.Context, od []types.OffChainData) error - CountOffchainData(ctx context.Context) (uint64, error) + DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) } // DB is the database layer of the data node @@ -289,3 +289,30 @@ func (db *pgDB) CountOffchainData(ctx context.Context) (uint64, error) { return count, nil } + +// DetectOffchainDataGaps returns the number of gaps in the offchain_data table +func (db *pgDB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { + const detectBatchNumGapQuery = "SELECT * FROM vw_batch_num_gaps;" + + rows, err := db.pg.QueryxContext(ctx, detectBatchNumGapQuery) + if err != nil { + return nil, err + } + + defer rows.Close() + + gaps := make(map[uint64]uint64) + for rows.Next() { + data := struct { + CurrentBatchNum uint64 `db:"current_batch_num"` + NextBatchNum uint64 `db:"next_batch_num"` + }{} + if err = rows.StructScan(&data); err != nil { + return nil, err + } + + gaps[data.CurrentBatchNum] = data.NextBatchNum + } + + return gaps, nil +} diff --git a/db/migrations/0005.sql b/db/migrations/0005.sql index 6436ee02..ffd6e886 100644 --- a/db/migrations/0005.sql +++ b/db/migrations/0005.sql @@ -1,9 +1,32 @@ -- +migrate Down +DROP VIEW IF EXISTS vw_batch_num_gaps; ALTER TABLE data_node.offchain_data DROP COLUMN IF EXISTS batch_num; -- +migrate Up ALTER TABLE data_node.offchain_data ADD COLUMN IF NOT EXISTS batch_num BIGINT NOT NULL DEFAULT 0; +-- Ensure batch_num is indexed for optimal performance +CREATE INDEX idx_batch_num ON data_node.offchain_data(batch_num); + +-- Create a view to detect gaps in batch_num +CREATE VIEW vw_batch_num_gaps AS +WITH numbered_batches AS ( + SELECT + batch_num, + ROW_NUMBER() OVER (ORDER BY batch_num) AS row_number + FROM data_node.offchain_data +) +SELECT + nb1.batch_num AS current_batch_num, + nb2.batch_num AS next_batch_num +FROM + numbered_batches nb1 + LEFT JOIN numbered_batches nb2 ON nb1.row_number = nb2.row_number - 1 +WHERE + nb1.batch_num IS NOT NULL + AND nb2.batch_num IS NOT NULL + AND nb1.batch_num + 1 <> nb2.batch_num; + -- It triggers resync with an updated logic of all batches UPDATE data_node.sync_tasks SET block = 0 WHERE task = 'L1'; \ No newline at end of file diff --git a/mocks/db.generated.go b/mocks/db.generated.go index b01653c7..14751da0 100644 --- a/mocks/db.generated.go +++ b/mocks/db.generated.go @@ -128,6 +128,64 @@ func (_c *DB_DeleteUnresolvedBatchKeys_Call) RunAndReturn(run func(context.Conte return _c } +// DetectOffchainDataGaps provides a mock function with given fields: ctx +func (_m *DB) DetectOffchainDataGaps(ctx context.Context) (map[uint64]uint64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for DetectOffchainDataGaps") + } + + var r0 map[uint64]uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[uint64]uint64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[uint64]uint64); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[uint64]uint64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DB_DetectOffchainDataGaps_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetectOffchainDataGaps' +type DB_DetectOffchainDataGaps_Call struct { + *mock.Call +} + +// DetectOffchainDataGaps is a helper method to define mock.On call +// - ctx context.Context +func (_e *DB_Expecter) DetectOffchainDataGaps(ctx interface{}) *DB_DetectOffchainDataGaps_Call { + return &DB_DetectOffchainDataGaps_Call{Call: _e.mock.On("DetectOffchainDataGaps", ctx)} +} + +func (_c *DB_DetectOffchainDataGaps_Call) Run(run func(ctx context.Context)) *DB_DetectOffchainDataGaps_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *DB_DetectOffchainDataGaps_Call) Return(_a0 map[uint64]uint64, _a1 error) *DB_DetectOffchainDataGaps_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DB_DetectOffchainDataGaps_Call) RunAndReturn(run func(context.Context) (map[uint64]uint64, error)) *DB_DetectOffchainDataGaps_Call { + _c.Call.Return(run) + return _c +} + // GetLastProcessedBlock provides a mock function with given fields: ctx, task func (_m *DB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) { ret := _m.Called(ctx, task) diff --git a/services/status/status.go b/services/status/status.go index 664e5c7c..6b3d2a2a 100644 --- a/services/status/status.go +++ b/services/status/status.go @@ -15,17 +15,25 @@ import ( // APISTATUS is the namespace of the status service const APISTATUS = "status" +// GapsDetector is an interface for detecting gaps in the offchain data +type GapsDetector interface { + // Gaps returns a map of gaps in the offchain data + Gaps() map[uint64]uint64 +} + // Endpoints contains implementations for the "status" RPC endpoints type Endpoints struct { - db db.DB - startTime time.Time + db db.DB + startTime time.Time + gapsDetector GapsDetector } // NewEndpoints returns Endpoints -func NewEndpoints(db db.DB) *Endpoints { +func NewEndpoints(db db.DB, gapsDetector GapsDetector) *Endpoints { return &Endpoints{ - db: db, - startTime: time.Now(), + db: db, + startTime: time.Now(), + gapsDetector: gapsDetector, } } @@ -45,9 +53,10 @@ func (s *Endpoints) GetStatus() (interface{}, rpc.Error) { } return types.DACStatus{ - Version: dataavailability.Version, - Uptime: uptime, - KeyCount: rowCount, - BackfillProgress: backfillProgress, + Version: dataavailability.Version, + Uptime: uptime, + KeyCount: rowCount, + BackfillProgress: backfillProgress, + OffchainDataGapsExist: len(s.gapsDetector.Gaps()) > 0, }, nil } diff --git a/synchronizer/batches.go b/synchronizer/batches.go index c461c373..39227f9a 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -1,6 +1,7 @@ package synchronizer import ( + "bytes" "context" "fmt" "math/rand" @@ -31,18 +32,20 @@ type SequencerTracker interface { // BatchSynchronizer watches for number events, checks if they are "locally" stored, then retrieves and stores missing data type BatchSynchronizer struct { - client etherman.Etherman - stop chan struct{} - retry time.Duration - rpcTimeout time.Duration - blockBatchSize uint - self common.Address - db db.DB - committee *CommitteeMapSafe - syncLock sync.Mutex - reorgs <-chan BlockReorg - sequencer SequencerTracker - rpcClientFactory client.Factory + client etherman.Etherman + stop chan struct{} + retry time.Duration + rpcTimeout time.Duration + blockBatchSize uint + self common.Address + db db.DB + committee *CommitteeMapSafe + syncLock sync.Mutex + reorgs <-chan BlockReorg + sequencer SequencerTracker + rpcClientFactory client.Factory + offchainDataGaps map[uint64]uint64 + offchainDataGapsLock sync.Mutex } // NewBatchSynchronizer creates the BatchSynchronizer @@ -70,6 +73,7 @@ func NewBatchSynchronizer( reorgs: reorgs, sequencer: sequencer, rpcClientFactory: rpcClientFactory, + offchainDataGaps: make(map[uint64]uint64), } return synchronizer, synchronizer.resolveCommittee() } @@ -98,6 +102,7 @@ func (bs *BatchSynchronizer) Start(ctx context.Context) { go bs.processUnresolvedBatches(ctx) go bs.produceEvents(ctx) go bs.handleReorgs(ctx) + go bs.startOffchainDataGapsDetection(ctx) } // Stop stops the synchronizer @@ -105,6 +110,17 @@ func (bs *BatchSynchronizer) Stop() { close(bs.stop) } +// Gaps returns the offchain data gaps +func (bs *BatchSynchronizer) Gaps() map[uint64]uint64 { + bs.offchainDataGapsLock.Lock() + gaps := make(map[uint64]uint64, len(bs.offchainDataGaps)) + for key, value := range bs.offchainDataGaps { + gaps[key] = value + } + bs.offchainDataGapsLock.Unlock() + return gaps +} + func (bs *BatchSynchronizer) handleReorgs(ctx context.Context) { log.Info("starting reorgs handler") for { @@ -433,3 +449,52 @@ func (bs *BatchSynchronizer) resolveWithMember( BatchNum: batch.Number, }, nil } + +func (bs *BatchSynchronizer) startOffchainDataGapsDetection(ctx context.Context) { + log.Info("starting handling unresolved batches") + for { + delay := time.NewTimer(time.Minute) + select { + case <-delay.C: + if err := bs.detectOffchainDataGaps(ctx); err != nil { + log.Error(err) + } + case <-bs.stop: + return + } + } +} + +// detectOffchainDataGaps detects offchain data gaps and reports them in logs and the service state. +func (bs *BatchSynchronizer) detectOffchainDataGaps(ctx context.Context) error { + // Detect offchain data gaps + gaps, err := detectOffchainDataGaps(ctx, bs.db) + if err != nil { + return fmt.Errorf("failed to detect offchain data gaps: %v", err) + } + + // No gaps found, all good + if len(gaps) == 0 { + return nil + } + + // Log the detected gaps + gapsRaw := new(bytes.Buffer) + for key, value := range gaps { + if _, err = fmt.Fprintf(gapsRaw, "%d=>%d\n", key, value); err != nil { + log.Errorf("failed to write offchain data gaps: %v", err) + } + } + + log.Warnf("detected offchain data gaps (current batch number => expected batch number): %s", gapsRaw.String()) + + // Store the detected gaps in the service state + bs.offchainDataGapsLock.Lock() + bs.offchainDataGaps = make(map[uint64]uint64, len(gaps)) + for key, value := range gaps { + bs.offchainDataGaps[key] = value + } + bs.offchainDataGapsLock.Unlock() + + return nil +} diff --git a/synchronizer/store.go b/synchronizer/store.go index b0be7cfa..3ba53734 100644 --- a/synchronizer/store.go +++ b/synchronizer/store.go @@ -43,13 +43,6 @@ func setStartBlock(parentCtx context.Context, db dbTypes.DB, block uint64, syncT return db.StoreLastProcessedBlock(ctx, block, string(syncTask)) } -func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) { - ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) - defer cancel() - - return db.ListOffChainData(ctx, keys) -} - func storeUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys []types.BatchKey) error { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() @@ -71,9 +64,23 @@ func deleteUnresolvedBatchKeys(parentCtx context.Context, db dbTypes.DB, keys [] return db.DeleteUnresolvedBatchKeys(ctx, keys) } +func listOffchainData(parentCtx context.Context, db dbTypes.DB, keys []common.Hash) ([]types.OffChainData, error) { + ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) + defer cancel() + + return db.ListOffChainData(ctx, keys) +} + func storeOffchainData(parentCtx context.Context, db dbTypes.DB, data []types.OffChainData) error { ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) defer cancel() return db.StoreOffChainData(ctx, data) } + +func detectOffchainDataGaps(parentCtx context.Context, db dbTypes.DB) (map[uint64]uint64, error) { + ctx, cancel := context.WithTimeout(parentCtx, dbTimeout) + defer cancel() + + return db.DetectOffchainDataGaps(ctx) +} diff --git a/types/types.go b/types/types.go index 0aa5df98..40829a5a 100644 --- a/types/types.go +++ b/types/types.go @@ -17,10 +17,11 @@ const ( // DACStatus contains DAC status info type DACStatus struct { - Uptime string `json:"uptime"` - Version string `json:"version"` - KeyCount uint64 `json:"key_count"` - BackfillProgress uint64 `json:"backfill_progress"` + Uptime string `json:"uptime"` + Version string `json:"version"` + KeyCount uint64 `json:"key_count"` + BackfillProgress uint64 `json:"backfill_progress"` + OffchainDataGapsExist bool `json:"offchain_data_gaps_exist"` } // BatchKey is the pairing of batch number and data hash of a batch