Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
akaladarshi committed Dec 8, 2024
1 parent 17ab3d3 commit 37c3f85
Showing 1 changed file with 79 additions and 62 deletions.
141 changes: 79 additions & 62 deletions chain/index/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"

"github.com/filecoin-project/lotus/chain/types"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
)

var ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
var (
ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
ErrRangeInFuture = fmt.Errorf("range end is in the future")
)

const maxLookBackForWait = 120 // one hour of tipsets

Expand Down Expand Up @@ -236,83 +238,98 @@ func loadExecutedMessages(ctx context.Context, cs ChainStore, recomputeTipSetSta
return ems, nil
}

// checkTipsetIndexedStatus verifies if a specific tipset is indexed based on the EventFilter.
// It returns nil if the tipset is indexed, ErrNotFound if it's not indexed or not specified,
func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, f *EventFilter) error {
var tipsetKeyCid []byte
var err error

// Determine the tipset to check based on the filter
switch {
case f.TipsetCid != cid.Undef:
tipsetKeyCid = f.TipsetCid.Bytes()
case f.MinHeight >= 0 && f.MinHeight == f.MaxHeight:
tipsetKeyCid, err = si.getTipsetKeyCidByHeight(ctx, f.MinHeight)
if err != nil {
if err == ErrNotFound {
// this means that this is a null round and there exist no events for this epoch
return nil
}
// checkRangeIndexedStatus verifies if a range of heights is indexed.
// It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, f *EventFilter) error {
minHeight := f.MinHeight
maxHeight := f.MaxHeight

return xerrors.Errorf("failed to get tipset key cid by height: %w", err)
// Find the first non-null round in the range
startCid, err := si.findFirstNonNullRound(ctx, &minHeight, maxHeight)
if err != nil {
return xerrors.Errorf("failed to find first non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if startCid == nil {
return nil
}

// Find the last non-null round in the range
endCid, err := si.findLastNonNullRound(ctx, &maxHeight, minHeight)
if err != nil {
if errors.Is(err, ErrRangeInFuture) {
return xerrors.Errorf("range end is in the future: %w", err)
}
case f.MinHeight >= 0 && f.MaxHeight >= 0 && f.MinHeight != f.MaxHeight:
return si.checkRangeIndexedStatus(ctx, f)
default:
// This function distinguishes between two scenarios:
// 1. Missing events: The requested tipset is not present in the Index (an error condition).
// 2. Valid case: The tipset exists but contains no events (a normal situation).
// Currently, this distinction is only made for the common use case where a user requests events for a single tipset.
// TODO: Implement this functionality for a range of tipsets. This is expensive and not a common use case so it's deferred for now.
return xerrors.Errorf("failed to find last non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if endCid == nil {
return nil
}

// If we couldn't determine a specific tipset, return ErrNotFound
if tipsetKeyCid == nil {
return ErrNotFound
// Check indexing for start and end tipsets
if err := si.checkTipsetByKeyCid(ctx, startCid, minHeight); err != nil {
return err
}

// Check if the determined tipset is indexed
if exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid); err != nil {
return xerrors.Errorf("failed to check if tipset is indexed: %w", err)
} else if exists {
return nil // Tipset is indexed
if err := si.checkTipsetByKeyCid(ctx, endCid, maxHeight); err != nil {
return err
}

return ErrNotFound // Tipset is not indexed
return nil
}

func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, f *EventFilter) error {
startCid, err := si.getTipsetKeyCidByHeight(ctx, f.MinHeight)
// checkTipsetByKeyCid checks if a tipset identified by its key CID is indexed.
func (si *SqliteIndexer) checkTipsetByKeyCid(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
if err != nil {
if errors.Is(err, ErrNotFound) {
// Null round for the start of the range is acceptable
return nil
}
return xerrors.Errorf("failed to get tipset key cid for start height: %w", err)
return xerrors.Errorf("failed to check if tipset at height %d is indexed: %w", height, err)
}

endCid, err := si.getTipsetKeyCidByHeight(ctx, f.MaxHeight)
if err != nil {
if errors.Is(err, ErrNotFound) {
// Check if the range ends in the future
head := si.cs.GetHeaviestTipSet()
if head == nil || f.MaxHeight > head.Height() {
return xerrors.Errorf("range end is in the future: %w", err)
}
return ErrNotFound // End is unindexed
if exists {
return nil // null round
}

return ErrNotFound // tipset is not indexed
}

// findFirstNonNullRound finds the first non-null round starting from minHeight up to maxHeight
func (si *SqliteIndexer) findFirstNonNullRound(ctx context.Context, minHeight *abi.ChainEpoch, maxHeight abi.ChainEpoch) ([]byte, error) {
for height := *minHeight; height <= maxHeight; height++ {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*minHeight = height // Update the minHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
return xerrors.Errorf("failed to get tipset key cid for end height: %w", err)
}

if exists, err := si.isTipsetIndexed(ctx, startCid); err != nil || !exists {
return xerrors.Errorf("start tipset not indexed: %w", err)
return nil, nil
}

// findLastNonNullRound finds the last non-null round starting from maxHeight down to minHeight
func (si *SqliteIndexer) findLastNonNullRound(ctx context.Context, maxHeight *abi.ChainEpoch, minHeight abi.ChainEpoch) ([]byte, error) {
head := si.cs.GetHeaviestTipSet()
if head == nil || *maxHeight > head.Height() {
return nil, ErrRangeInFuture
}
if exists, err := si.isTipsetIndexed(ctx, endCid); err != nil || !exists {
return xerrors.Errorf("end tipset not indexed: %w", err)

for height := *maxHeight; height >= minHeight; height-- {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*maxHeight = height // Update the maxHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
}

return nil
return nil, nil
}

// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height.
Expand Down Expand Up @@ -495,7 +512,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
// if the height is old enough, we'll assume the index is caught up to it and not bother
// waiting for it to be indexed
if height <= maxLookBackHeight {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkRangeIndexedStatus(ctx, f)
}
}

Expand All @@ -509,7 +526,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
}

if len(ces) == 0 {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkRangeIndexedStatus(ctx, f)
}
}

Expand Down

0 comments on commit 37c3f85

Please sign in to comment.