diff --git a/chain/ethhashlookup/eth_transaction_hash_lookup.go b/chain/ethhashlookup/eth_transaction_hash_lookup.go index d936809128b..a60dc5519dd 100644 --- a/chain/ethhashlookup/eth_transaction_hash_lookup.go +++ b/chain/ethhashlookup/eth_transaction_hash_lookup.go @@ -1,6 +1,7 @@ package ethhashlookup import ( + "context" "database/sql" "errors" "strconv" @@ -10,20 +11,12 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/types/ethtypes" + "github.com/filecoin-project/lotus/lib/sqlite" ) -var ErrNotFound = errors.New("not found") +const DefaultDbFilename = "txhash.db" -var pragmas = []string{ - "PRAGMA synchronous = normal", - "PRAGMA temp_store = memory", - "PRAGMA mmap_size = 30000000000", - "PRAGMA page_size = 32768", - "PRAGMA auto_vacuum = NONE", - "PRAGMA automatic_index = OFF", - "PRAGMA journal_mode = WAL", - "PRAGMA read_uncommitted = ON", -} +var ErrNotFound = errors.New("not found") var ddls = []string{ `CREATE TABLE IF NOT EXISTS eth_tx_hashes ( @@ -33,18 +26,8 @@ var ddls = []string{ )`, `CREATE INDEX IF NOT EXISTS insertion_time_index ON eth_tx_hashes (insertion_time)`, - - // metadata containing version of schema - `CREATE TABLE IF NOT EXISTS _meta ( - version UINT64 NOT NULL UNIQUE - )`, - - // version 1. - `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, } -const schemaVersion = 1 - const ( insertTxHash = `INSERT INTO eth_tx_hashes (hash, cid) @@ -103,50 +86,17 @@ func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) { return res.RowsAffected() } -func NewTransactionHashLookup(path string) (*EthTxHashLookup, error) { - db, err := sql.Open("sqlite3", path+"?mode=rwc") +func NewTransactionHashLookup(ctx context.Context, path string) (*EthTxHashLookup, error) { + db, _, err := sqlite.Open(path) if err != nil { - return nil, xerrors.Errorf("open sqlite3 database: %w", err) + return nil, xerrors.Errorf("failed to setup eth transaction hash lookup db: %w", err) } - for _, pragma := range pragmas { - if _, err := db.Exec(pragma); err != nil { - _ = db.Close() - return nil, xerrors.Errorf("exec pragma %q: %w", pragma, err) - } - } - - q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") - if err == sql.ErrNoRows || !q.Next() { - // empty database, create the schema - for _, ddl := range ddls { - if _, err := db.Exec(ddl); err != nil { - _ = db.Close() - return nil, xerrors.Errorf("exec ddl %q: %w", ddl, err) - } - } - } else if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("looking for _meta table: %w", err) - } else { - // Ensure we don't open a database from a different schema version - - row := db.QueryRow("SELECT max(version) FROM _meta") - var version int - err := row.Scan(&version) - if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("invalid database version: no version found") - } - if version != schemaVersion { - _ = db.Close() - return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) - } + if err := sqlite.InitDb(ctx, "eth transaction hash lookup", db, ddls, []sqlite.MigrationFunc{}); err != nil { + return nil, xerrors.Errorf("failed to init eth transaction hash lookup db: %w", err) } - return &EthTxHashLookup{ - db: db, - }, nil + return &EthTxHashLookup{db: db}, nil } func (ei *EthTxHashLookup) Close() error { diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index fed6c42eb31..7783dc60e41 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -8,7 +8,6 @@ import ( "sort" "strings" "sync" - "time" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -20,19 +19,10 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sqlite" ) -var pragmas = []string{ - "PRAGMA synchronous = normal", - "PRAGMA temp_store = memory", - "PRAGMA mmap_size = 30000000000", - "PRAGMA page_size = 32768", - "PRAGMA auto_vacuum = NONE", - "PRAGMA automatic_index = OFF", - "PRAGMA journal_mode = WAL", - "PRAGMA wal_autocheckpoint = 256", // checkpoint @ 256 pages - "PRAGMA journal_size_limit = 0", // always reset journal and wal files -} +const DefaultDbFilename = "events.db" // Any changes to this schema should be matched for the `lotus-shed indexes backfill-events` command @@ -70,18 +60,6 @@ var ddls = []string{ createIndexEventEntryEventId, createIndexEventsSeenHeight, createIndexEventsSeenTipsetKeyCid, - - // metadata containing version of schema - `CREATE TABLE IF NOT EXISTS _meta ( - version UINT64 NOT NULL UNIQUE - )`, - - `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, - `INSERT OR IGNORE INTO _meta (version) VALUES (2)`, - `INSERT OR IGNORE INTO _meta (version) VALUES (3)`, - `INSERT OR IGNORE INTO _meta (version) VALUES (4)`, - `INSERT OR IGNORE INTO _meta (version) VALUES (5)`, - `INSERT OR IGNORE INTO _meta (version) VALUES (6)`, } var ( @@ -89,8 +67,6 @@ var ( ) const ( - schemaVersion = 6 - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` @@ -212,436 +188,27 @@ func (ei *EventIndex) initStatements() (err error) { return nil } -func (ei *EventIndex) migrateToVersion2(ctx context.Context, chainStore *store.ChainStore) error { - now := time.Now() - - tx, err := ei.db.BeginTx(ctx, nil) - if err != nil { - return xerrors.Errorf("begin transaction: %w", err) - } - // rollback the transaction (a no-op if the transaction was already committed) - defer func() { _ = tx.Rollback() }() - - // create some temporary indices to help speed up the migration - _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)") - if err != nil { - return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err) - } - _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)") - if err != nil { - return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err) - } - - stmtDeleteOffChainEvent, err := tx.PrepareContext(ctx, "DELETE FROM event WHERE tipset_key_cid!=? and height=?") - if err != nil { - return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err) - } - - stmtSelectEvent, err := tx.PrepareContext(ctx, "SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1") - if err != nil { - return xerrors.Errorf("prepare stmtSelectEvent: %w", err) - } - - stmtDeleteEvent, err := tx.PrepareContext(ctx, "DELETE FROM event WHERE tipset_key_cid=? AND id= minHeight.Int64 { - if currTs.Height()%1000 == 0 { - log.Infof("Migrating height %d (remaining %d)", currTs.Height(), int64(currTs.Height())-minHeight.Int64) - } - - tsKey := currTs.Parents() - currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey) - if err != nil { - return xerrors.Errorf("get tipset from key: %w", err) - } - log.Debugf("Migrating height %d", currTs.Height()) - - tsKeyCid, err := currTs.Key().Cid() - if err != nil { - return fmt.Errorf("tipset key cid: %w", err) - } - - // delete all events that are not in the canonical chain - _, err = stmtDeleteOffChainEvent.Exec(tsKeyCid.Bytes(), currTs.Height()) - if err != nil { - return xerrors.Errorf("delete off chain event: %w", err) - } - - // find the first eventId from the last time the tipset was applied - var eventId sql.NullInt64 - err = stmtSelectEvent.QueryRow(tsKeyCid.Bytes()).Scan(&eventId) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - continue - } - return xerrors.Errorf("select event: %w", err) - } - - // this tipset might not have any events which is ok - if !eventId.Valid { - continue - } - log.Debugf("Deleting all events with id < %d at height %d", eventId.Int64, currTs.Height()) - - res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64) - if err != nil { - return xerrors.Errorf("delete event: %w", err) - } - - nrRowsAffected, err := res.RowsAffected() - if err != nil { - return xerrors.Errorf("rows affected: %w", err) - } - log.Debugf("deleted %d events from tipset %s", nrRowsAffected, tsKeyCid.String()) - } - - // delete all entries that have an event_id that doesn't exist (since we don't have a foreign - // key constraint that gives us cascading deletes) - res, err := tx.ExecContext(ctx, "DELETE FROM event_entry WHERE event_id NOT IN (SELECT id FROM event)") - if err != nil { - return xerrors.Errorf("delete event_entry: %w", err) - } - - nrRowsAffected, err := res.RowsAffected() - if err != nil { - return xerrors.Errorf("rows affected: %w", err) - } - log.Infof("Cleaned up %d entries that had deleted events\n", nrRowsAffected) - - // drop the temporary indices after the migration - _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_tipset_key_cid") - if err != nil { - return xerrors.Errorf("drop index tmp_tipset_key_cid: %w", err) - } - _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_height_tipset_key_cid") - if err != nil { - return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err) - } - - // original v2 migration introduced an index: - // CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key) - // which has subsequently been removed in v4, so it's omitted here - - // increment the schema version to 2 in _meta table. - _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (2)") - if err != nil { - return xerrors.Errorf("increment _meta version: %w", err) - } - - err = tx.Commit() - if err != nil { - return xerrors.Errorf("commit transaction: %w", err) - } - - log.Infof("Successfully migrated event index from version 1 to version 2 in %s", time.Since(now)) - - return nil -} - -// migrateToVersion3 migrates the schema from version 2 to version 3 by creating two indices: -// 1) an index on the event.emitter_addr column, and 2) an index on the event_entry.key column. -func (ei *EventIndex) migrateToVersion3(ctx context.Context) error { - now := time.Now() - - tx, err := ei.db.BeginTx(ctx, nil) - if err != nil { - return xerrors.Errorf("begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - // create index on event.emitter_addr. - _, err = tx.ExecContext(ctx, createIndexEventEmitterAddr) - if err != nil { - return xerrors.Errorf("create index event_emitter_addr: %w", err) - } - - // original v3 migration introduced an index: - // CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key) - // which has subsequently been removed in v4, so it's omitted here - - // increment the schema version to 3 in _meta table. - _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (3)") - if err != nil { - return xerrors.Errorf("increment _meta version: %w", err) - } - - err = tx.Commit() - if err != nil { - return xerrors.Errorf("commit transaction: %w", err) - } - log.Infof("Successfully migrated event index from version 2 to version 3 in %s", time.Since(now)) - return nil -} - -// migrateToVersion4 migrates the schema from version 3 to version 4 by adjusting indexes to match -// the query patterns of the event filter. -// -// First it drops indexes introduced in previous migrations: -// 1. the index on the event.height and event.tipset_key columns -// 2. the index on the event_entry.key column -// -// And then creating the following indices: -// 1. an index on the event.tipset_key_cid column -// 2. an index on the event.height column -// 3. an index on the event.reverted column -// 4. an index on the event_entry.indexed and event_entry.key columns -// 5. an index on the event_entry.codec and event_entry.value columns -// 6. an index on the event_entry.event_id column -func (ei *EventIndex) migrateToVersion4(ctx context.Context) error { - now := time.Now() - - tx, err := ei.db.BeginTx(ctx, nil) - if err != nil { - return xerrors.Errorf("begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - for _, create := range []struct { - desc string - query string - }{ - {"drop index height_tipset_key", "DROP INDEX IF EXISTS height_tipset_key;"}, - {"drop index event_entry_key_index", "DROP INDEX IF EXISTS event_entry_key_index;"}, - {"create index event_tipset_key_cid", createIndexEventTipsetKeyCid}, - {"create index event_height", createIndexEventHeight}, - {"create index event_reverted", createIndexEventReverted}, - {"create index event_entry_indexed_key", createIndexEventEntryIndexedKey}, - {"create index event_entry_codec_value", createIndexEventEntryCodecValue}, - {"create index event_entry_event_id", createIndexEventEntryEventId}, - } { - _, err = tx.ExecContext(ctx, create.query) - if err != nil { - return xerrors.Errorf("%s: %w", create.desc, err) - } - } - - if _, err = tx.Exec("INSERT OR IGNORE INTO _meta (version) VALUES (4)"); err != nil { - return xerrors.Errorf("increment _meta version: %w", err) - } - - err = tx.Commit() - if err != nil { - return xerrors.Errorf("commit transaction: %w", err) - } - - log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now)) - return nil -} - -func (ei *EventIndex) migrateToVersion5(ctx context.Context) error { - now := time.Now() - - tx, err := ei.db.BeginTx(ctx, nil) - if err != nil { - return xerrors.Errorf("begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - stmtEventIndexUpdate, err := tx.PrepareContext(ctx, "UPDATE event SET event_index = (SELECT COUNT(*) FROM event e2 WHERE e2.tipset_key_cid = event.tipset_key_cid AND e2.id <= event.id) - 1") - if err != nil { - return xerrors.Errorf("prepare stmtEventIndexUpdate: %w", err) - } - - _, err = stmtEventIndexUpdate.ExecContext(ctx) - if err != nil { - return xerrors.Errorf("update event index: %w", err) - } - - _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (5)") - if err != nil { - return xerrors.Errorf("increment _meta version: %w", err) - } - - err = tx.Commit() - if err != nil { - return xerrors.Errorf("commit transaction: %w", err) - } - - log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) - return nil -} - -func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { - now := time.Now() - - tx, err := ei.db.BeginTx(ctx, nil) - if err != nil { - return xerrors.Errorf("begin transaction: %w", err) - } - defer func() { _ = tx.Rollback() }() - - stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen) - if err != nil { - return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err) - } - _, err = stmtCreateTableEventsSeen.ExecContext(ctx) - if err != nil { - return xerrors.Errorf("create table events_seen: %w", err) - } - - _, err = tx.ExecContext(ctx, createIndexEventsSeenHeight) - if err != nil { - return xerrors.Errorf("create index events_seen_height: %w", err) - } - _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid) - if err != nil { - return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err) - } - - // INSERT an entry in the events_seen table for all epochs we do have events for in our DB - _, err = tx.ExecContext(ctx, ` - INSERT OR IGNORE INTO events_seen (height, tipset_key_cid, reverted) - SELECT DISTINCT height, tipset_key_cid, reverted FROM event -`) - if err != nil { - return xerrors.Errorf("insert events into events_seen: %w", err) - } - - _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)") - if err != nil { - return xerrors.Errorf("increment _meta version: %w", err) - } - - err = tx.Commit() - if err != nil { - return xerrors.Errorf("commit transaction: %w", err) - } - - ei.vacuumDBAndCheckpointWAL(ctx) - - log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now)) - return nil -} - -func (ei *EventIndex) vacuumDBAndCheckpointWAL(ctx context.Context) { - // During the large migrations, we have likely increased the WAL size a lot, so lets do some - // simple DB administration to free up space (VACUUM followed by truncating the WAL file) - // as this would be a good time to do it when no other writes are happening. - log.Infof("Performing DB vacuum and wal checkpointing to free up space after the migration") - _, err := ei.db.ExecContext(ctx, "VACUUM") - if err != nil { - log.Warnf("error vacuuming database: %s", err) - } - _, err = ei.db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)") - if err != nil { - log.Warnf("error checkpointing wal: %s", err) - } -} - func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStore) (*EventIndex, error) { - db, err := sql.Open("sqlite3", path+"?mode=rwc") + db, _, err := sqlite.Open(path) if err != nil { - return nil, xerrors.Errorf("open sqlite3 database: %w", err) + return nil, xerrors.Errorf("failed to setup event index db: %w", err) } - for _, pragma := range pragmas { - if _, err := db.Exec(pragma); err != nil { - _ = db.Close() - return nil, xerrors.Errorf("exec pragma %q: %w", pragma, err) - } + err = sqlite.InitDb(ctx, "event index", db, ddls, []sqlite.MigrationFunc{ + migrationVersion2(db, chainStore), + migrationVersion3, + migrationVersion4, + migrationVersion5, + migrationVersion6, + }) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("failed to setup event index db: %w", err) } eventIndex := EventIndex{db: db} - q, err := db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") - if q != nil { - defer func() { _ = q.Close() }() - } - if errors.Is(err, sql.ErrNoRows) || !q.Next() { - // empty database, create the schema - for _, ddl := range ddls { - if _, err := db.Exec(ddl); err != nil { - _ = db.Close() - return nil, xerrors.Errorf("exec ddl %q: %w", ddl, err) - } - } - } else if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("looking for _meta table: %w", err) - } else { - // check the schema version to see if we need to upgrade the database schema - var version int - err := db.QueryRow("SELECT max(version) FROM _meta").Scan(&version) - if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("invalid database version: no version found") - } - - if version == 1 { - log.Infof("Upgrading event index from version 1 to version 2") - err = eventIndex.migrateToVersion2(ctx, chainStore) - if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("could not migrate event index schema from version 1 to version 2: %w", err) - } - version = 2 - } - - if version == 2 { - log.Infof("Upgrading event index from version 2 to version 3") - err = eventIndex.migrateToVersion3(ctx) - if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("could not migrate event index schema from version 2 to version 3: %w", err) - } - version = 3 - } - - if version == 3 { - log.Infof("Upgrading event index from version 3 to version 4") - err = eventIndex.migrateToVersion4(ctx) - if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("could not migrate event index schema from version 3 to version 4: %w", err) - } - version = 4 - } - - if version == 4 { - log.Infof("Upgrading event index from version 4 to version 5") - err = eventIndex.migrateToVersion5(ctx) - if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("could not migrate event index schema from version 4 to version 5: %w", err) - } - version = 5 - } - - if version == 5 { - log.Infof("Upgrading event index from version 5 to version 6") - err = eventIndex.migrateToVersion6(ctx) - if err != nil { - _ = db.Close() - return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err) - } - version = 6 - } - - if version != schemaVersion { - _ = db.Close() - return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) - } - } - - err = eventIndex.initStatements() - if err != nil { + if err = eventIndex.initStatements(); err != nil { _ = db.Close() return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err) } diff --git a/chain/events/filter/index_migrations.go b/chain/events/filter/index_migrations.go new file mode 100644 index 00000000000..fe8a371a513 --- /dev/null +++ b/chain/events/filter/index_migrations.go @@ -0,0 +1,238 @@ +package filter + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/lib/sqlite" +) + +func migrationVersion2(db *sql.DB, chainStore *store.ChainStore) sqlite.MigrationFunc { + return func(ctx context.Context, tx *sql.Tx) error { + // create some temporary indices to help speed up the migration + _, err := tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_height_tipset_key_cid ON event (height,tipset_key_cid)") + if err != nil { + return xerrors.Errorf("create index tmp_height_tipset_key_cid: %w", err) + } + _, err = tx.ExecContext(ctx, "CREATE INDEX IF NOT EXISTS tmp_tipset_key_cid ON event (tipset_key_cid)") + if err != nil { + return xerrors.Errorf("create index tmp_tipset_key_cid: %w", err) + } + + stmtDeleteOffChainEvent, err := tx.PrepareContext(ctx, "DELETE FROM event WHERE tipset_key_cid!=? and height=?") + if err != nil { + return xerrors.Errorf("prepare stmtDeleteOffChainEvent: %w", err) + } + + stmtSelectEvent, err := tx.PrepareContext(ctx, "SELECT id FROM event WHERE tipset_key_cid=? ORDER BY message_index ASC, event_index ASC, id DESC LIMIT 1") + if err != nil { + return xerrors.Errorf("prepare stmtSelectEvent: %w", err) + } + + stmtDeleteEvent, err := tx.PrepareContext(ctx, "DELETE FROM event WHERE tipset_key_cid=? AND id= minHeight.Int64 { + if currTs.Height()%1000 == 0 { + log.Infof("Migrating height %d (remaining %d)", currTs.Height(), int64(currTs.Height())-minHeight.Int64) + } + + tsKey := currTs.Parents() + currTs, err = chainStore.GetTipSetFromKey(ctx, tsKey) + if err != nil { + return xerrors.Errorf("get tipset from key: %w", err) + } + log.Debugf("Migrating height %d", currTs.Height()) + + tsKeyCid, err := currTs.Key().Cid() + if err != nil { + return fmt.Errorf("tipset key cid: %w", err) + } + + // delete all events that are not in the canonical chain + _, err = stmtDeleteOffChainEvent.Exec(tsKeyCid.Bytes(), currTs.Height()) + if err != nil { + return xerrors.Errorf("delete off chain event: %w", err) + } + + // find the first eventId from the last time the tipset was applied + var eventId sql.NullInt64 + err = stmtSelectEvent.QueryRow(tsKeyCid.Bytes()).Scan(&eventId) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + continue + } + return xerrors.Errorf("select event: %w", err) + } + + // this tipset might not have any events which is ok + if !eventId.Valid { + continue + } + log.Debugf("Deleting all events with id < %d at height %d", eventId.Int64, currTs.Height()) + + res, err := stmtDeleteEvent.Exec(tsKeyCid.Bytes(), eventId.Int64) + if err != nil { + return xerrors.Errorf("delete event: %w", err) + } + + nrRowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("rows affected: %w", err) + } + log.Debugf("deleted %d events from tipset %s", nrRowsAffected, tsKeyCid.String()) + } + + // delete all entries that have an event_id that doesn't exist (since we don't have a foreign + // key constraint that gives us cascading deletes) + res, err := tx.ExecContext(ctx, "DELETE FROM event_entry WHERE event_id NOT IN (SELECT id FROM event)") + if err != nil { + return xerrors.Errorf("delete event_entry: %w", err) + } + + nrRowsAffected, err := res.RowsAffected() + if err != nil { + return xerrors.Errorf("rows affected: %w", err) + } + log.Infof("Cleaned up %d entries that had deleted events\n", nrRowsAffected) + + // drop the temporary indices after the migration + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_tipset_key_cid") + if err != nil { + return xerrors.Errorf("drop index tmp_tipset_key_cid: %w", err) + } + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS tmp_height_tipset_key_cid") + if err != nil { + return xerrors.Errorf("drop index tmp_height_tipset_key_cid: %w", err) + } + + // original v2 migration introduced an index: + // CREATE INDEX IF NOT EXISTS height_tipset_key ON event (height,tipset_key) + // which has subsequently been removed in v4, so it's omitted here + + return nil + } +} + +// migrationVersion3 migrates the schema from version 2 to version 3 by creating two indices: +// 1) an index on the event.emitter_addr column, and 2) an index on the event_entry.key column. +func migrationVersion3(ctx context.Context, tx *sql.Tx) error { + // create index on event.emitter_addr. + _, err := tx.ExecContext(ctx, createIndexEventEmitterAddr) + if err != nil { + return xerrors.Errorf("create index event_emitter_addr: %w", err) + } + + // original v3 migration introduced an index: + // CREATE INDEX IF NOT EXISTS event_entry_key_index ON event_entry (key) + // which has subsequently been removed in v4, so it's omitted here + + return nil +} + +// migrationVersion4 migrates the schema from version 3 to version 4 by adjusting indexes to match +// the query patterns of the event filter. +// +// First it drops indexes introduced in previous migrations: +// 1. the index on the event.height and event.tipset_key columns +// 2. the index on the event_entry.key column +// +// And then creating the following indices: +// 1. an index on the event.tipset_key_cid column +// 2. an index on the event.height column +// 3. an index on the event.reverted column +// 4. an index on the event_entry.indexed and event_entry.key columns +// 5. an index on the event_entry.codec and event_entry.value columns +// 6. an index on the event_entry.event_id column +func migrationVersion4(ctx context.Context, tx *sql.Tx) error { + for _, create := range []struct { + desc string + query string + }{ + {"drop index height_tipset_key", "DROP INDEX IF EXISTS height_tipset_key;"}, + {"drop index event_entry_key_index", "DROP INDEX IF EXISTS event_entry_key_index;"}, + {"create index event_tipset_key_cid", createIndexEventTipsetKeyCid}, + {"create index event_height", createIndexEventHeight}, + {"create index event_reverted", createIndexEventReverted}, + {"create index event_entry_indexed_key", createIndexEventEntryIndexedKey}, + {"create index event_entry_codec_value", createIndexEventEntryCodecValue}, + {"create index event_entry_event_id", createIndexEventEntryEventId}, + } { + if _, err := tx.ExecContext(ctx, create.query); err != nil { + return xerrors.Errorf("%s: %w", create.desc, err) + } + } + + return nil +} + +// migrationVersion5 migrates the schema from version 4 to version 5 by updating the event_index +// to be 0-indexed within a tipset. +func migrationVersion5(ctx context.Context, tx *sql.Tx) error { + stmtEventIndexUpdate, err := tx.PrepareContext(ctx, "UPDATE event SET event_index = (SELECT COUNT(*) FROM event e2 WHERE e2.tipset_key_cid = event.tipset_key_cid AND e2.id <= event.id) - 1") + if err != nil { + return xerrors.Errorf("prepare stmtEventIndexUpdate: %w", err) + } + + _, err = stmtEventIndexUpdate.ExecContext(ctx) + if err != nil { + return xerrors.Errorf("update event index: %w", err) + } + + return nil +} + +// migrationVersion6 migrates the schema from version 5 to version 6 by creating a new table +// events_seen that tracks the tipsets that have been seen by the event filter and populating it +// with the tipsets that have events in the event table. +func migrationVersion6(ctx context.Context, tx *sql.Tx) error { + stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen) + if err != nil { + return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err) + } + _, err = stmtCreateTableEventsSeen.ExecContext(ctx) + if err != nil { + return xerrors.Errorf("create table events_seen: %w", err) + } + + _, err = tx.ExecContext(ctx, createIndexEventsSeenHeight) + if err != nil { + return xerrors.Errorf("create index events_seen_height: %w", err) + } + _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid) + if err != nil { + return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err) + } + + // INSERT an entry in the events_seen table for all epochs we do have events for in our DB + _, err = tx.ExecContext(ctx, ` + INSERT OR IGNORE INTO events_seen (height, tipset_key_cid, reverted) + SELECT DISTINCT height, tipset_key_cid, reverted FROM event +`) + if err != nil { + return xerrors.Errorf("insert events into events_seen: %w", err) + } + + return nil +} diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index e9e81ae2cf5..db1a4b29166 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -3,10 +3,7 @@ package index import ( "context" "database/sql" - "errors" - "io/fs" "os" - "path" "sync" "time" @@ -19,34 +16,20 @@ import ( "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/sqlite" ) +const DefaultDbFilename = "msgindex.db" + var log = logging.Logger("msgindex") -var dbName = "msgindex.db" -var dbDefs = []string{ +var ddls = []string{ `CREATE TABLE IF NOT EXISTS messages ( cid VARCHAR(80) PRIMARY KEY ON CONFLICT REPLACE, tipset_cid VARCHAR(80) NOT NULL, epoch INTEGER NOT NULL )`, - `CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid) - `, - `CREATE TABLE IF NOT EXISTS _meta ( - version UINT64 NOT NULL UNIQUE - )`, - `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, -} - -var dbPragmas = []string{ - "PRAGMA synchronous = normal", - "PRAGMA temp_store = memory", - "PRAGMA mmap_size = 30000000000", - "PRAGMA page_size = 32768", - "PRAGMA auto_vacuum = NONE", - "PRAGMA automatic_index = OFF", - "PRAGMA journal_mode = WAL", - "PRAGMA read_uncommitted = ON", + `CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)`, } const ( @@ -105,39 +88,15 @@ type headChange struct { app []*types.TipSet } -func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex, error) { - var ( - dbPath string - exists bool - err error - ) - - err = os.MkdirAll(basePath, 0755) - if err != nil { - return nil, xerrors.Errorf("error creating msgindex base directory: %w", err) - } - - dbPath = path.Join(basePath, dbName) - _, err = os.Stat(dbPath) - switch { - case err == nil: - exists = true - - case errors.Is(err, fs.ErrNotExist): - - case err != nil: - return nil, xerrors.Errorf("error stating msgindex database: %w", err) - } - - db, err := sql.Open("sqlite3", dbPath) +func NewMsgIndex(lctx context.Context, path string, cs ChainStore) (MsgIndex, error) { + db, exists, err := sqlite.Open(path) if err != nil { - // TODO [nice to have]: automatically delete corrupt databases - // but for now we can just error and let the operator delete. - return nil, xerrors.Errorf("error opening msgindex database: %w", err) + return nil, xerrors.Errorf("failed to setup message index db: %w", err) } - if err := prepareDB(db); err != nil { - return nil, xerrors.Errorf("error creating msgindex database: %w", err) + if err = sqlite.InitDb(lctx, "message index", db, ddls, []sqlite.MigrationFunc{}); err != nil { + _ = db.Close() + return nil, xerrors.Errorf("failed to init message index db: %w", err) } // TODO we may consider populating the index when first creating the db @@ -179,24 +138,17 @@ func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex return msgIndex, nil } -func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) error { - err := os.MkdirAll(basePath, 0755) - if err != nil { - return xerrors.Errorf("error creating msgindex base directory: %w", err) - } - - dbPath := path.Join(basePath, dbName) - +func PopulateAfterSnapshot(lctx context.Context, path string, cs ChainStore) error { // if a database already exists, we try to delete it and create a new one - if _, err := os.Stat(dbPath); err == nil { - if err = os.Remove(dbPath); err != nil { - return xerrors.Errorf("msgindex already exists at %s and can't be deleted", dbPath) + if _, err := os.Stat(path); err == nil { + if err = os.Remove(path); err != nil { + return xerrors.Errorf("msgindex already exists at %s and can't be deleted", path) } } - db, err := sql.Open("sqlite3", dbPath) + db, _, err := sqlite.Open(path) if err != nil { - return xerrors.Errorf("error opening msgindex database: %w", err) + return xerrors.Errorf("failed to setup message index db: %w", err) } defer func() { if err := db.Close(); err != nil { @@ -204,7 +156,7 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) } }() - if err := prepareDB(db); err != nil { + if err := sqlite.InitDb(lctx, "message index", db, ddls, []sqlite.MigrationFunc{}); err != nil { return xerrors.Errorf("error creating msgindex database: %w", err) } @@ -266,23 +218,6 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) return nil } -// init utilities -func prepareDB(db *sql.DB) error { - for _, stmt := range dbDefs { - if _, err := db.Exec(stmt); err != nil { - return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err) - } - } - - for _, stmt := range dbPragmas { - if _, err := db.Exec(stmt); err != nil { - return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err) - } - } - - return nil -} - func reconcileIndex(db *sql.DB, cs ChainStore) error { // Invariant: after reconciliation, every tipset in the index is in the current chain; ie either // the chain head or reachable by walking the chain. diff --git a/chain/index/msgindex_test.go b/chain/index/msgindex_test.go index bf4bc6190e8..2cf707b0fed 100644 --- a/chain/index/msgindex_test.go +++ b/chain/index/msgindex_test.go @@ -30,7 +30,7 @@ func TestBasicMsgIndex(t *testing.T) { tmp := t.TempDir() t.Cleanup(func() { _ = os.RemoveAll(tmp) }) - msgIndex, err := NewMsgIndex(context.Background(), tmp, cs) + msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs) require.NoError(t, err) defer msgIndex.Close() //nolint @@ -58,7 +58,7 @@ func TestReorgMsgIndex(t *testing.T) { tmp := t.TempDir() t.Cleanup(func() { _ = os.RemoveAll(tmp) }) - msgIndex, err := NewMsgIndex(context.Background(), tmp, cs) + msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs) require.NoError(t, err) defer msgIndex.Close() //nolint @@ -103,7 +103,7 @@ func TestReconcileMsgIndex(t *testing.T) { tmp := t.TempDir() t.Cleanup(func() { _ = os.RemoveAll(tmp) }) - msgIndex, err := NewMsgIndex(context.Background(), tmp, cs) + msgIndex, err := NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs) require.NoError(t, err) for i := 0; i < 10; i++ { @@ -130,7 +130,7 @@ func TestReconcileMsgIndex(t *testing.T) { require.NoError(t, err) // reopen to reconcile - msgIndex, err = NewMsgIndex(context.Background(), tmp, cs) + msgIndex, err = NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs) require.NoError(t, err) defer msgIndex.Close() //nolint diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index ba2936e2a04..9910ffe48f9 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -11,7 +11,6 @@ import ( "fmt" "io" "os" - "path" "path/filepath" "runtime/pprof" "strings" @@ -640,7 +639,11 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) } if cfg.Index.EnableMsgIndex { log.Info("populating message index...") - if err := index.PopulateAfterSnapshot(ctx, path.Join(lr.Path(), "sqlite"), cst); err != nil { + basePath, err := lr.SqlitePath() + if err != nil { + return err + } + if err := index.PopulateAfterSnapshot(ctx, filepath.Join(basePath, index.DefaultDbFilename), cst); err != nil { return err } log.Info("populating message index done") diff --git a/itests/msgindex_test.go b/itests/msgindex_test.go index 807ab3c03f0..d9ed752797e 100644 --- a/itests/msgindex_test.go +++ b/itests/msgindex_test.go @@ -52,7 +52,7 @@ func testMsgIndex( makeMsgIndex := func(cs *store.ChainStore) (index.MsgIndex, error) { var err error tmp := t.TempDir() - msgIndex, err := index.NewMsgIndex(context.Background(), tmp, cs) + msgIndex, err := index.NewMsgIndex(context.Background(), tmp+"/msgindex.db", cs) if err == nil { mx.Lock() tmpDirs = append(tmpDirs, tmp) diff --git a/lib/sqlite/sqlite.go b/lib/sqlite/sqlite.go new file mode 100644 index 00000000000..a638043b6cb --- /dev/null +++ b/lib/sqlite/sqlite.go @@ -0,0 +1,165 @@ +package sqlite + +import ( + "context" + "database/sql" + "errors" + "io/fs" + "os" + "path/filepath" + "strconv" + "time" + + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" +) + +var log = logging.Logger("sqlite") + +type MigrationFunc func(ctx context.Context, tx *sql.Tx) error + +var pragmas = []string{ + "PRAGMA synchronous = normal", + "PRAGMA temp_store = memory", + "PRAGMA mmap_size = 30000000000", + "PRAGMA page_size = 32768", + "PRAGMA auto_vacuum = NONE", + "PRAGMA automatic_index = OFF", + "PRAGMA journal_mode = WAL", + "PRAGMA wal_autocheckpoint = 256", // checkpoint @ 256 pages + "PRAGMA journal_size_limit = 0", // always reset journal and wal files +} + +const metaTableDdl = `CREATE TABLE IF NOT EXISTS _meta ( + version UINT64 NOT NULL UNIQUE +)` + +// metaDdl returns the DDL statements required to create the _meta table and add the required +// up to the given version. +func metaDdl(version uint64) []string { + var ddls []string + for i := 1; i <= int(version); i++ { + ddls = append(ddls, `INSERT OR IGNORE INTO _meta (version) VALUES (`+strconv.Itoa(i)+`)`) + } + return append([]string{metaTableDdl}, ddls...) +} + +// Open opens a database at the given path. If the database does not exist, it will be created. +func Open(path string) (*sql.DB, bool, error) { + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return nil, false, xerrors.Errorf("error creating database base directory [@ %s]: %w", path, err) + } + + _, err := os.Stat(path) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, false, xerrors.Errorf("error checking file status for database [@ %s]: %w", path, err) + } + exists := err == nil + + db, err := sql.Open("sqlite3", path+"?mode=rwc") + if err != nil { + return nil, false, xerrors.Errorf("error opening database [@ %s]: %w", path, err) + } + + for _, pragma := range pragmas { + if _, err := db.Exec(pragma); err != nil { + _ = db.Close() + return nil, false, xerrors.Errorf("error setting database pragma %q: %w", pragma, err) + } + } + + return db, exists, nil +} + +// InitDb initializes the database by checking whether it needs to be created or upgraded. +// The ddls are the DDL statements to create the tables in the database and their initial required +// content. The schemaVersion will be set inside the databse if it is newly created. Otherwise, the +// version is read from the databse and returned. This value should be checked against the expected +// version to determine if the database needs to be upgraded. +func InitDb( + ctx context.Context, + name string, + db *sql.DB, + ddls []string, + versionMigrations []MigrationFunc, +) error { + + schemaVersion := len(versionMigrations) + 1 + + q, err := db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") + if q != nil { + defer func() { _ = q.Close() }() + } + + if errors.Is(err, sql.ErrNoRows) || !q.Next() { + // empty database, create the schema including the _meta table and its versions + ddls := append(metaDdl(uint64(schemaVersion)), ddls...) + for _, ddl := range ddls { + if _, err := db.Exec(ddl); err != nil { + return xerrors.Errorf("failed to %s database execute ddl %q: %w", name, ddl, err) + } + } + return nil + } + + if err != nil { + return xerrors.Errorf("error looking for %s database _meta table: %w", name, err) + } + + if err := q.Close(); err != nil { + return xerrors.Errorf("error closing %s database _meta table query: %w", name, err) + } + + // check the schema version to see if we need to upgrade the database schema + var foundVersion int + if err = db.QueryRow("SELECT max(version) FROM _meta").Scan(&foundVersion); err != nil { + return xerrors.Errorf("invalid %s database version: no version found", name) + } + + if foundVersion > schemaVersion { + return xerrors.Errorf("invalid %s database version: version %d is greater than the number of migrations %d", name, foundVersion, len(versionMigrations)) + } + + runVacuum := foundVersion != schemaVersion + + for i := foundVersion + 1; i <= schemaVersion; i++ { + now := time.Now() + + log.Infof("Migrating %s database to version %d...", name, i) + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return xerrors.Errorf("failed to start %s database transaction: %w", name, err) + } + defer func() { _ = tx.Rollback() }() + // versions start at 1, but the migrations are 0-indexed where the first migration would take us to version 2 + if err := versionMigrations[i-2](ctx, tx); err != nil { + return xerrors.Errorf("failed to migrate %s database to version %d: %w", name, i, err) + } + if _, err := tx.ExecContext(ctx, `INSERT OR IGNORE INTO _meta (version) VALUES (?)`, i); err != nil { + return xerrors.Errorf("failed to update %s database _meta table: %w", name, err) + } + if err := tx.Commit(); err != nil { + return xerrors.Errorf("failed to commit %s database v%d migration transaction: %w", name, i, err) + } + + log.Infof("Successfully migrated %s database from version %d to %d in %s", name, i-1, i, time.Since(now)) + } + + if runVacuum { + // During the large migrations, we have likely increased the WAL size a lot, so lets do some + // simple DB administration to free up space (VACUUM followed by truncating the WAL file) + // as this would be a good time to do it when no other writes are happening. + log.Infof("Performing %s database vacuum and wal checkpointing to free up space after the migration", name) + _, err := db.ExecContext(ctx, "VACUUM") + if err != nil { + log.Warnf("error vacuuming %s database: %s", name, err) + } + _, err = db.ExecContext(ctx, "PRAGMA wal_checkpoint(TRUNCATE)") + if err != nil { + log.Warnf("error checkpointing %s database wal: %s", name, err) + } + } + + return nil +} diff --git a/lib/sqlite/sqlite_test.go b/lib/sqlite/sqlite_test.go new file mode 100644 index 00000000000..bda6432f5e6 --- /dev/null +++ b/lib/sqlite/sqlite_test.go @@ -0,0 +1,242 @@ +package sqlite_test + +import ( + "context" + "database/sql" + "path/filepath" + "strings" + "testing" + + _ "github.com/mattn/go-sqlite3" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/lib/sqlite" +) + +func TestSqlite(t *testing.T) { + req := require.New(t) + + ddl := []string{ + `CREATE TABLE IF NOT EXISTS blip ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + blip_name TEXT NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS bloop ( + blip_id INTEGER NOT NULL, + bloop_name TEXT NOT NULL, + FOREIGN KEY (blip_id) REFERENCES blip(id) + )`, + `CREATE INDEX IF NOT EXISTS blip_name_index ON blip (blip_name)`, + } + + tmpDir := t.TempDir() + dbPath := filepath.Join(tmpDir, "/test.db") + + db, exists, err := sqlite.Open(dbPath) + req.NoError(err) + req.False(exists) + req.NotNil(db) + + err = sqlite.InitDb(context.Background(), "testdb", db, ddl, nil) + req.NoError(err) + + // insert some data + + r, err := db.Exec("INSERT INTO blip (blip_name) VALUES ('blip1')") + req.NoError(err) + id, err := r.LastInsertId() + req.NoError(err) + req.Equal(int64(1), id) + _, err = db.Exec("INSERT INTO bloop (blip_id, bloop_name) VALUES (?, 'bloop1')", id) + req.NoError(err) + r, err = db.Exec("INSERT INTO blip (blip_name) VALUES ('blip2')") + req.NoError(err) + id, err = r.LastInsertId() + req.NoError(err) + req.Equal(int64(2), id) + _, err = db.Exec("INSERT INTO bloop (blip_id, bloop_name) VALUES (?, 'bloop2')", id) + req.NoError(err) + + // check that the db contains what we think it should + + expectedIndexes := []string{"blip_name_index"} + + expectedData := []tabledata{ + { + name: "_meta", + cols: []string{"version"}, + data: [][]interface{}{ + {int64(1)}, + }, + }, + { + name: "blip", + cols: []string{"id", "blip_name"}, + data: [][]interface{}{ + {int64(1), "blip1"}, + {int64(2), "blip2"}, + }, + }, + { + name: "bloop", + cols: []string{"blip_id", "bloop_name"}, + data: [][]interface{}{ + {int64(1), "bloop1"}, + {int64(2), "bloop2"}, + }, + }, + } + + actualIndexes, actualData := dumpTables(t, db) + req.Equal(expectedIndexes, actualIndexes) + req.Equal(expectedData, actualData) + + req.NoError(db.Close()) + + // open again, check contents is the same + + db, exists, err = sqlite.Open(dbPath) + req.NoError(err) + req.True(exists) + req.NotNil(db) + + err = sqlite.InitDb(context.Background(), "testdb", db, ddl, nil) + req.NoError(err) + + // database should contain the same things + + actualIndexes, actualData = dumpTables(t, db) + req.Equal(expectedIndexes, actualIndexes) + req.Equal(expectedData, actualData) + + req.NoError(db.Close()) + + // open again, with a migration + + db, exists, err = sqlite.Open(dbPath) + req.NoError(err) + req.True(exists) + req.NotNil(db) + + migration1 := func(ctx context.Context, tx *sql.Tx) error { + _, err := tx.Exec("ALTER TABLE blip ADD COLUMN blip_extra TEXT NOT NULL DEFAULT '!'") + return err + } + + err = sqlite.InitDb(context.Background(), "testdb", db, ddl, []sqlite.MigrationFunc{migration1}) + req.NoError(err) + + // also add something new + r, err = db.Exec("INSERT INTO blip (blip_name, blip_extra) VALUES ('blip1', '!!!')") + req.NoError(err) + id, err = r.LastInsertId() + req.NoError(err) + _, err = db.Exec("INSERT INTO bloop (blip_id, bloop_name) VALUES (?, 'bloop3')", id) + req.NoError(err) + + // database should contain new stuff + + expectedData[0].data = append(expectedData[0].data, []interface{}{int64(2)}) // _meta schema version 2 + expectedData[1] = tabledata{ + name: "blip", + cols: []string{"id", "blip_name", "blip_extra"}, + data: [][]interface{}{ + {int64(1), "blip1", "!"}, + {int64(2), "blip2", "!"}, + {int64(3), "blip1", "!!!"}, + }, + } + expectedData[2].data = append(expectedData[2].data, []interface{}{int64(3), "bloop3"}) + + actualIndexes, actualData = dumpTables(t, db) + req.Equal(expectedIndexes, actualIndexes) + req.Equal(expectedData, actualData) + + req.NoError(db.Close()) + + // open again, with another migration + + db, exists, err = sqlite.Open(dbPath) + req.NoError(err) + req.True(exists) + req.NotNil(db) + + migration2 := func(ctx context.Context, tx *sql.Tx) error { + // add an index + _, err := tx.Exec("CREATE INDEX IF NOT EXISTS blip_extra_index ON blip (blip_extra)") + return err + } + + err = sqlite.InitDb(context.Background(), "testdb", db, ddl, []sqlite.MigrationFunc{migration1, migration2}) + req.NoError(err) + + // database should contain new stuff + + expectedData[0].data = append(expectedData[0].data, []interface{}{int64(3)}) // _meta schema version 3 + expectedIndexes = append(expectedIndexes, "blip_extra_index") + + actualIndexes, actualData = dumpTables(t, db) + req.Equal(expectedIndexes, actualIndexes) + req.Equal(expectedData, actualData) + + req.NoError(db.Close()) +} + +func dumpTables(t *testing.T, db *sql.DB) ([]string, []tabledata) { + req := require.New(t) + + var indexes []string + rows, err := db.Query("SELECT name FROM sqlite_master WHERE type='index'") + req.NoError(err) + for rows.Next() { + var name string + err = rows.Scan(&name) + req.NoError(err) + if !strings.Contains(name, "sqlite_autoindex") { + indexes = append(indexes, name) + } + } + + var data []tabledata + rows, err = db.Query("SELECT name, sql FROM sqlite_master WHERE type = 'table'") + req.NoError(err) + for rows.Next() { + var name, sql string + err = rows.Scan(&name, &sql) + req.NoError(err) + if strings.HasPrefix(name, "sqlite") { + continue + } + sqla := strings.Split(sql, "\n") + cols := []string{} + for _, s := range sqla { + // alter table does funky things to the sql, hence the "," ReplaceAll: + s = strings.Split(strings.TrimSpace(strings.ReplaceAll(s, ",", "")), " ")[0] + switch s { + case "CREATE", "FOREIGN", "", ")": + default: + cols = append(cols, s) + } + } + data = append(data, tabledata{name: name, cols: cols}) + rows2, err := db.Query("SELECT * FROM " + name) + req.NoError(err) + for rows2.Next() { + vals := make([]interface{}, len(cols)) + vals2 := make([]interface{}, len(cols)) + for i := range vals { + vals[i] = &vals2[i] + } + err = rows2.Scan(vals...) + req.NoError(err) + data[len(data)-1].data = append(data[len(data)-1].data, vals2) + } + } + return indexes, data +} + +type tabledata struct { + name string + cols []string + data [][]interface{} +} diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 34080dbae23..22c5c5bc587 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -108,7 +108,7 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L if err != nil { return nil, xerrors.Errorf("failed to resolve event index database path: %w", err) } - dbPath = filepath.Join(sqlitePath, "events.db") + dbPath = filepath.Join(sqlitePath, filter.DefaultDbFilename) } else { dbPath = cfg.DatabasePath } diff --git a/node/modules/ethmodule.go b/node/modules/ethmodule.go index b36416e4e56..1360daf1a89 100644 --- a/node/modules/ethmodule.go +++ b/node/modules/ethmodule.go @@ -23,18 +23,20 @@ import ( func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI) (*full.EthModule, error) { return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI) (*full.EthModule, error) { + ctx := helpers.LifecycleCtx(mctx, lc) + sqlitePath, err := r.SqlitePath() if err != nil { return nil, err } - dbPath := filepath.Join(sqlitePath, "txhash.db") + dbPath := filepath.Join(sqlitePath, ethhashlookup.DefaultDbFilename) // Check if the db exists, if not, we'll back-fill some entries _, err = os.Stat(dbPath) dbAlreadyExists := err == nil - transactionHashLookup, err := ethhashlookup.NewTransactionHashLookup(dbPath) + transactionHashLookup, err := ethhashlookup.NewTransactionHashLookup(ctx, dbPath) if err != nil { return nil, err } @@ -68,7 +70,6 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep log.Infof("Prefilling GetTipsetByHeight done in %s", time.Since(start)) }() - ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ OnStart: func(context.Context) error { ev, err := events.NewEvents(ctx, &evapi) diff --git a/node/modules/msgindex.go b/node/modules/msgindex.go index 72e9840ba33..423be65d1b7 100644 --- a/node/modules/msgindex.go +++ b/node/modules/msgindex.go @@ -2,6 +2,7 @@ package modules import ( "context" + "path/filepath" "go.uber.org/fx" @@ -17,7 +18,7 @@ func MsgIndex(lc fx.Lifecycle, mctx helpers.MetricsCtx, cs *store.ChainStore, r return nil, err } - msgIndex, err := index.NewMsgIndex(helpers.LifecycleCtx(mctx, lc), basePath, cs) + msgIndex, err := index.NewMsgIndex(helpers.LifecycleCtx(mctx, lc), filepath.Join(basePath, index.DefaultDbFilename), cs) if err != nil { return nil, err }