Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: events index: record processed epochs and tipsets for events and eth_get_log blocks till requested tipset has been indexed #12080

Merged
merged 20 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 111 additions & 6 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ var ddls = []string{
value BLOB NOT NULL
)`,

createTableEventsSeen,

createIndexEventEntryIndexedKey,
createIndexEventEntryCodecValue,
createIndexEventEntryEventId,
createIndexEventsSeenHeight,
createIndexEventsSeenTipsetKey,

// metadata containing version of schema
`CREATE TABLE IF NOT EXISTS _meta (
Expand All @@ -75,20 +79,22 @@ var ddls = []string{
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (5)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (6)`,
}

var (
log = logging.Logger("filter")
)

const (
schemaVersion = 5
schemaVersion = 6

eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEventsSeen = `INSERT OR IGNORE INTO events_seen(height, tipset_key, tipset_key_cid) VALUES(?, ?, ?)`
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved

createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
Expand All @@ -98,6 +104,16 @@ const (
createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);`
createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);`
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`

createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key BLOB NOT NULL,
tipset_key_cid BLOB NOT NULL
)`

createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);`
createIndexEventsSeenTipsetKey = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key ON events_seen (tipset_key);`
)

type EventIndex struct {
Expand All @@ -108,6 +124,7 @@ type EventIndex struct {
stmtInsertEntry *sql.Stmt
stmtRevertEventsInTipset *sql.Stmt
stmtRestoreEvent *sql.Stmt
stmtInsertEventsSeen *sql.Stmt
}

func (ei *EventIndex) initStatements() (err error) {
Expand Down Expand Up @@ -136,6 +153,11 @@ func (ei *EventIndex) initStatements() (err error) {
return xerrors.Errorf("prepare stmtRestoreEvent: %w", err)
}

ei.stmtInsertEventsSeen, err = ei.db.Prepare(insertEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtInsertEventsSeen: %w", err)
}

return nil
}

Expand Down Expand Up @@ -407,6 +429,63 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error {
return nil
}

func (ei *EventIndex) migrateToVersion6(ctx context.Context) error {
now := time.Now()

tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()

stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen)
if err != nil {
return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err)
}
_, err = stmtCreateTableEventsSeen.ExecContext(ctx)
if err != nil {
return xerrors.Errorf("create table events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, createIndexEventsSeenHeight)
if err != nil {
return xerrors.Errorf("create index events_seen_height: %w", err)
}
_, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKey)
if err != nil {
return xerrors.Errorf("create index events_seen_tipset_key: %w", err)
}

// INSERT an entry in the events_seen table for all epochs we do have events for in our DB
_, err = tx.ExecContext(ctx, `
INSERT INTO events_seen (height, tipset_key, tipset_key_cid)
SELECT e.height, e.tipset_key, e.tipset_key_cid
FROM event e
WHERE NOT EXISTS (
SELECT 1 FROM events_seen es
WHERE es.height = e.height AND es.tipset_key = e.tipset_key AND es.tipset_key_cid = e.tipset_key_cid
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
)
`)
if err != nil {
return xerrors.Errorf("insert events into events_seen: %w", err)
}

_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)")
if err != nil {
return xerrors.Errorf("increment _meta version: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}

ei.vacuumDBAndCheckpointWAL(ctx)
rvagg marked this conversation as resolved.
Show resolved Hide resolved

log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now))
return nil
}

func (ei *EventIndex) vacuumDBAndCheckpointWAL(ctx context.Context) {
// During the large migrations, we have likely increased the WAL size a lot, so lets do some
// simple DB administration to free up space (VACUUM followed by truncating the WAL file)
Expand Down Expand Up @@ -498,6 +577,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
version = 5
}

if version == 5 {
log.Infof("Upgrading event index from version 5 to version 6")
err = eventIndex.migrateToVersion6(ctx)
if err != nil {
_ = db.Close()
return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err)
}
version = 6
}

if version != schemaVersion {
_ = db.Close()
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
Expand All @@ -520,6 +609,13 @@ func (ei *EventIndex) Close() error {
return ei.db.Close()
}

func (ei *EventIndex) isTipsetProcessed(ctx context.Context, tipsetKey []byte) (bool, error) {
row := ei.db.QueryRowContext(ctx, "SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key = ?", tipsetKey)
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
var exists bool
err := row.Scan(&exists)
return exists, err
}

func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
tx, err := ei.db.Begin()
if err != nil {
Expand Down Expand Up @@ -550,6 +646,10 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}
tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved

eventCount := 0
// iterate over all executed messages in this tipset and insert them into the database if they
Expand All @@ -567,11 +667,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
addressLookups[ev.Emitter] = addr
}

tsKeyCid, err := te.msgTs.Key().Cid()
rvagg marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}

// check if this event already exists in the database
var entryID sql.NullInt64
err = tx.Stmt(ei.stmtEventExists).QueryRow(
Expand Down Expand Up @@ -651,6 +746,16 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}
}

// add an entry to the event_seen table for this tipset
_, err = tx.Stmt(ei.stmtInsertEventsSeen).Exec(
te.msgTs.Height(),
te.msgTs.Key().Bytes(),
tsKeyCid.Bytes(),
)
if err != nil {
return xerrors.Errorf("exec insert events seen: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
Expand Down
9 changes: 9 additions & 0 deletions chain/events/filter/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ func TestEventIndexPrefillFilter(t *testing.T) {
require.NoError(t, err, "collect events")
}

tsKey := events14000.msgTs.Key()
seen, err := ei.isTipsetProcessed(context.Background(), tsKey.Bytes())
require.NoError(t, err)
require.True(t, seen, "tipset key should be seen")

seen, err = ei.isTipsetProcessed(context.Background(), []byte{1})
require.NoError(t, err)
require.False(t, seen, "tipset key should not be seen")

testCases := []struct {
name string
filter *eventFilter
Expand Down