Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: events: Index events by emitter actor ID #11723

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 64 additions & 67 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,49 +27,48 @@ func isIndexedValue(b uint8) bool {
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
}

type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)

type EventFilter interface {
Filter

TakeCollectedEvents(context.Context) []*CollectedEvent
CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error
CollectEvents(context.Context, *TipSetEvents, bool) error
}

type eventFilter struct {
id types.FilterID
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
tipsetCid cid.Cid
addresses []address.Address // list of actor addresses that are extpected to emit the event
actorResolver ActorResolver
id types.FilterID
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
tipsetCid cid.Cid
addresses []address.Address // list of actor addresses that are expected to emit the event

keysWithCodec map[string][]types.ActorEventBlock // map of key names to a list of alternate values that may match
maxResults int // maximum number of results to collect, 0 is unlimited

mu sync.Mutex
collected []*CollectedEvent
lastTaken time.Time
ch chan<- interface{}
ch chan<- any
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}

var _ Filter = (*eventFilter)(nil)

type CollectedEvent struct {
Entries []types.EventEntry
EmitterAddr address.Address // address of emitter
EventIdx int // index of the event within the list of emitted events
Reverted bool
Height abi.ChainEpoch
TipSetKey types.TipSetKey // tipset that contained the message
MsgIdx int // index of the message in the tipset
MsgCid cid.Cid // cid of message that produced event
Entries []types.EventEntry
Emitter abi.ActorID // address of emitter
EventIdx int // index of the event within the list of emitted events
Reverted bool
Height abi.ChainEpoch
TipSetKey types.TipSetKey // tipset that contained the message
MsgIdx int // index of the message in the tipset
MsgCid cid.Cid // cid of message that produced event
}

func (f *eventFilter) ID() types.FilterID {
return f.id
}

func (f *eventFilter) SetSubChannel(ch chan<- interface{}) {
func (f *eventFilter) SetSubChannel(ch chan<- any) {
f.mu.Lock()
defer f.mu.Unlock()
f.ch = ch
Expand All @@ -82,49 +81,42 @@ func (f *eventFilter) ClearSubChannel() {
f.ch = nil
}

func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver AddressResolver) error {
func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool) error {
if !f.matchTipset(te) {
return nil
}

// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)

// Resolve filter addresses to actor IDs at the given event TipSet.
emitters, hasAddrs := f.emitters(ctx, te.rctTs)
ems, err := te.messages(ctx)
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
// lookup address corresponding to the actor id
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
if hasAddrs {
// The filter has at least one address. Therefore, perform a match against
// the emitters even when it may be empty.
// An empty emitters map means the event filter applies to events from actors
// that may have not been deployed yet, e.g. smart contracts.
if _, match := emitters[ev.Emitter]; !match {
continue
}
addressLookups[ev.Emitter] = addr
}

if !f.matchAddress(addr) {
continue
}
if !f.matchKeys(ev.Entries) {
continue
}

// event matches filter, so record it
cev := &CollectedEvent{
Entries: ev.Entries,
EmitterAddr: addr,
EventIdx: evIdx,
Reverted: revert,
Height: te.msgTs.Height(),
TipSetKey: te.msgTs.Key(),
MsgCid: em.Message().Cid(),
MsgIdx: msgIdx,
Entries: ev.Entries,
Emitter: ev.Emitter,
EventIdx: evIdx,
Reverted: revert,
Height: te.msgTs.Height(),
TipSetKey: te.msgTs.Key(),
MsgCid: em.Message().Cid(),
MsgIdx: msgIdx,
}

f.mu.Lock()
Expand Down Expand Up @@ -153,7 +145,7 @@ func (f *eventFilter) setCollectedEvents(ces []*CollectedEvent) {
f.mu.Unlock()
}

func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
func (f *eventFilter) TakeCollectedEvents(context.Context) []*CollectedEvent {
f.mu.Lock()
collected := f.collected
f.collected = nil
Expand Down Expand Up @@ -188,19 +180,24 @@ func (f *eventFilter) matchTipset(te *TipSetEvents) bool {
return true
}

func (f *eventFilter) matchAddress(o address.Address) bool {
// emitters gets the emitter actor IDs that correspond to this filter`s addresses at the given TipSet.
// The returned bool value indicates whether this eventFilter has any addresses or not.
// This is to cover a case where the filter should match emitter actor IDs that may have not been deployed yet,
// i.e. there is at least one address that could not be resolved into its corresponding actor ID.
func (f *eventFilter) emitters(ctx context.Context, ts *types.TipSet) (map[abi.ActorID]struct{}, bool) {
if len(f.addresses) == 0 {
return true
return nil, false
}

// Assume short lists of addresses
// TODO: binary search for longer lists or restrict list length
for _, a := range f.addresses {
if a == o {
return true
emitters := make(map[abi.ActorID]struct{})
for _, addr := range f.addresses {
emitter, err := f.actorResolver(ctx, addr, ts)
if err != nil {
// Cannot match against addr; skip.
continue
}
emitters[emitter] = struct{}{}
}
return false
return emitters, true
}

func (f *eventFilter) matchKeys(ees []types.EventEntry) bool {
Expand Down Expand Up @@ -301,11 +298,11 @@ func (e *executedMessage) Events() []*types.Event {

type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
ActorResolver ActorResolver
MaxFilterResults int
EventIndex *EventIndex

mu sync.Mutex // guards mutations to filters
mu sync.Mutex // guards mutations to filters and currentHeight
filters map[types.FilterID]EventFilter
currentHeight abi.ChainEpoch
}
Expand All @@ -326,14 +323,14 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
if err := f.CollectEvents(ctx, tse, false); err != nil {
return err
}
}
Expand All @@ -357,14 +354,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
if err := f.CollectEvents(ctx, tse, true); err != nil {
return err
}
}
Expand All @@ -373,7 +370,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}

func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address,
keysWithCodec map[string][]types.ActorEventBlock, excludeReverted bool) (EventFilter, error) {
keysWithCodec map[string][]types.ActorEventBlock, _ bool) (EventFilter, error) {
m.mu.Lock()
if m.currentHeight == 0 {
// sync in progress, we haven't had an Apply
Expand All @@ -382,7 +379,9 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
currentHeight := m.currentHeight
m.mu.Unlock()

if m.EventIndex == nil && minHeight != -1 && minHeight < currentHeight {
requiresHistoricEvents := minHeight != -1 && minHeight < currentHeight

if m.EventIndex == nil && requiresHistoricEvents {
return nil, xerrors.Errorf("historic event index disabled")
}

Expand All @@ -399,12 +398,12 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
addresses: addresses,
keysWithCodec: keysWithCodec,
maxResults: m.MaxFilterResults,
actorResolver: m.ActorResolver,
}

if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {
return nil, err
if m.EventIndex != nil && requiresHistoricEvents {
if err := m.EventIndex.prefillFilter(ctx, f); err != nil {
return nil, xerrors.Errorf("pre-fill historic events: %w", err)
}
}

Expand All @@ -418,7 +417,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
return f, nil
}

func (m *EventFilterManager) Remove(ctx context.Context, id types.FilterID) error {
func (m *EventFilterManager) Remove(_ context.Context, id types.FilterID) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, found := m.filters[id]; !found {
Expand Down Expand Up @@ -487,8 +486,6 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc
if err != nil {
return nil, xerrors.Errorf("read events: %w", err)
}

}

return ems, nil
}
Loading
Loading