Skip to content

Commit

Permalink
check for events at min_height as well
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Jun 20, 2024
1 parent f7b8d39 commit 306aded
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
16 changes: 11 additions & 5 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ const (
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, ?) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?`
getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen`
isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?`

createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
Expand Down Expand Up @@ -136,6 +137,7 @@ type EventIndex struct {

stmtIsTipsetProcessed *sql.Stmt
stmtGetMaxHeightInIndex *sql.Stmt
stmtIsHeightProcessed *sql.Stmt

mu sync.Mutex
subIdCounter uint64
Expand Down Expand Up @@ -201,6 +203,11 @@ func (ei *EventIndex) initStatements() (err error) {
return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err)
}

ei.stmtIsHeightProcessed, err = ei.db.Prepare(isHeightProcessed)
if err != nil {
return xerrors.Errorf("prepare isHeightProcessed: %w", err)
}

return nil
}

Expand Down Expand Up @@ -688,11 +695,10 @@ func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
}

func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) {
mh, err := ei.GetMaxHeightInIndex(ctx)
if err != nil {
return false, err
}
return height <= mh, nil
row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height)
var exists bool
err := row.Scan(&exists)
return exists, err
}

func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {
Expand Down
4 changes: 4 additions & 0 deletions chain/events/filter/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func TestEventIndexPrefillFilter(t *testing.T) {
require.NoError(t, err)
require.False(t, b)

b, err = ei.IsHeightProcessed(context.Background(), 13000)
require.NoError(t, err)
require.False(t, b)

tsKey := events14000.msgTs.Key()
tsKeyCid, err := tsKey.Cid()
require.NoError(t, err, "tipset key cid")
Expand Down
8 changes: 8 additions & 0 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,14 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
return nil, err
}

// should also have the minHeight in the filter indexed
b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight))
if err != nil {
return nil, xerrors.Errorf("failed to check if event index has events for the minheight: %w", err)
}
if !b {
return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight)
}
} else {
ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid)
if err != nil {
Expand Down

0 comments on commit 306aded

Please sign in to comment.