Skip to content

Commit

Permalink
event index should be unique for tipsets
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed May 1, 2024
1 parent aa76a45 commit 569e42b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
10 changes: 7 additions & 3 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var _ Filter = (*eventFilter)(nil)
type CollectedEvent struct {
Entries []types.EventEntry
EmitterAddr address.Address // address of emitter
EventIdx int // index of the event within the list of emitted events
EventIdx int // index of the event within the list of emitted events in a given tipset
Reverted bool
Height abi.ChainEpoch
TipSetKey types.TipSetKey // tipset that contained the message
Expand Down Expand Up @@ -94,8 +94,11 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}

eventCount := 0

for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
for _, ev := range em.Events() {
// lookup address corresponding to the actor id
addr, found := addressLookups[ev.Emitter]
if !found {
Expand All @@ -119,7 +122,7 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
cev := &CollectedEvent{
Entries: ev.Entries,
EmitterAddr: addr,
EventIdx: evIdx,
EventIdx: eventCount,
Reverted: revert,
Height: te.msgTs.Height(),
TipSetKey: te.msgTs.Key(),
Expand All @@ -141,6 +144,7 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}
f.collected = append(f.collected, cev)
f.mu.Unlock()
eventCount++
}
}

Expand Down
20 changes: 16 additions & 4 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,16 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error {
}
}

stmtEventIndexUpdate, err := tx.PrepareContext(ctx, "UPDATE event SET event_index = (SELECT COUNT(*) FROM event e2 WHERE e2.tipset_key_cid = event.tipset_key_cid AND e2.id <= event.id) - 1")
if err != nil {
return xerrors.Errorf("prepare stmtEventIndexUpdate: %w", err)
}

_, err = stmtEventIndexUpdate.ExecContext(ctx)
if err != nil {
return xerrors.Errorf("update event index: %w", err)
}

err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit transaction: %w", err)
Expand Down Expand Up @@ -501,10 +511,11 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("load executed messages: %w", err)
}

eventCount := 0
// iterate over all executed messages in this tipset and insert them into the database if they
// don't exist, otherwise mark them as not reverted
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
for _, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
Expand All @@ -528,7 +539,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
eventCount, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
).Scan(&entryID)
Expand All @@ -543,7 +554,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
eventCount, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
false, // reverted
Expand Down Expand Up @@ -578,7 +589,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
eventCount, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
)
Expand All @@ -596,6 +607,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
log.Warnf("restored %d events but expected only one to exist", rowsAffected)
}
}
eventCount++
}
}

Expand Down

0 comments on commit 569e42b

Please sign in to comment.