diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 533d5371a05..9bf7213c82f 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -72,6 +72,7 @@ var ddls = []string{ `INSERT OR IGNORE INTO _meta (version) VALUES (2)`, `INSERT OR IGNORE INTO _meta (version) VALUES (3)`, `INSERT OR IGNORE INTO _meta (version) VALUES (4)`, + `INSERT OR IGNORE INTO _meta (version) VALUES (5)`, } var ( @@ -79,7 +80,7 @@ var ( ) const ( - schemaVersion = 4 + schemaVersion = 5 eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` @@ -356,6 +357,28 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error { } } + if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (4)"); err != nil { + return xerrors.Errorf("increment _meta version: %w", err) + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + + log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) + return nil +} + +func (ei *EventIndex) migrateToVersion5(ctx context.Context) error { + now := time.Now() + + tx, err := ei.db.BeginTx(ctx, nil) + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + stmtEventIndexUpdate, err := tx.PrepareContext(ctx, "UPDATE event SET event_index = (SELECT COUNT(*) FROM event e2 WHERE e2.tipset_key_cid = event.tipset_key_cid AND e2.id <= event.id) - 1") if err != nil { return xerrors.Errorf("prepare stmtEventIndexUpdate: %w", err) @@ -366,7 +389,8 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error { return xerrors.Errorf("update event index: %w", err) } - if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (4)"); err != nil { + _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (5)") + if err != nil { return xerrors.Errorf("increment _meta version: %w", err) } @@ -377,7 +401,7 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error { ei.vacuumDBAndCheckpointWAL(ctx) - log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) + log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) return nil } @@ -462,6 +486,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor version = 4 } + if version == 4 { + log.Infof("Upgrading event index from version 4 to version 5") + err = eventIndex.migrateToVersion5(ctx) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not migrate event index schema from version 4 to version 5: %w", err) + } + version = 5 + } + if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)