Skip to content

Commit

Permalink
feat: store processed tipset after backfilling events
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Aug 2, 2024
1 parent 10f6976 commit affbb12
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (

const (
// same as in chain/events/index.go
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
)

func withCategory(cat string, cmd *cli.Command) *cli.Command {
Expand Down Expand Up @@ -176,6 +177,11 @@ var backfillEventsCmd = &cli.Command{
return err
}

stmtUpsertEventSeen, err := db.Prepare(upsertEventsSeen)
if err != nil {
return err
}

processHeight := func(ctx context.Context, cnt int, msgs []lapi.Message, receipts []*types.MessageReceipt) error {
var tx *sql.Tx
for {
Expand All @@ -196,6 +202,11 @@ var backfillEventsCmd = &cli.Command{
var eventsAffected int64
var entriesAffected int64

tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
}

// loop over each message receipt and backfill the events
for idx, receipt := range receipts {
msg := msgs[idx]
Expand Down Expand Up @@ -225,11 +236,6 @@ var backfillEventsCmd = &cli.Command{
addressLookups[event.Emitter] = addr
}

tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
}

// select the highest event id that exists in database, or null if none exists
var entryID sql.NullInt64
err = tx.Stmt(stmtEventExists).QueryRow(
Expand Down Expand Up @@ -299,6 +305,16 @@ var backfillEventsCmd = &cli.Command{
}
}

// this statement will mark the tipset as processed and will insert a new row if it doesn't exist
// or update the reverted field to false if it does
_, err = tx.Stmt(stmtUpsertEventSeen).Exec(
currTs.Height(),
tsKeyCid.Bytes(),
)
if err != nil {
return xerrors.Errorf("exec upsert events seen: %w", err)
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
Expand Down

0 comments on commit affbb12

Please sign in to comment.