Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check eth events indexed in range #12728

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ The Lotus v1.31.0 release introduces the new `ChainIndexer` subsystem, enhancing
- Make the ordering of event output for `eth_` APIs and `GetActorEventsRaw` consistent, sorting ascending on: epoch, message index, event index and original event entry order. ([filecoin-project/lotus#12623](https://github.com/filecoin-project/lotus/pull/12623))
- Return a consistent error when encountering null rounds in ETH RPC method calls. ([filecoin-project/lotus#12655](https://github.com/filecoin-project/lotus/pull/12655))
- Correct erroneous sector QAP-calculation upon sector extension in lotus-miner cli. ([filecoin-project/lotus#12720](https://github.com/filecoin-project/lotus/pull/12720))
- Return error if logs or events within range are not indexed. ([filecoin-project/lotus#12728](https://github.com/filecoin-project/lotus/pull/12728))


## 📝 Changelog

Expand Down
1 change: 0 additions & 1 deletion chain/index/ddls.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func preparedStatementMapping(ps *preparedStatements) map[**sql.Stmt]string {
&ps.getMsgCidFromEthHashStmt: "SELECT message_cid FROM eth_tx_hash WHERE tx_hash = ? LIMIT 1",
&ps.insertEthTxHashStmt: "INSERT INTO eth_tx_hash (tx_hash, message_cid) VALUES (?, ?) ON CONFLICT (tx_hash) DO UPDATE SET inserted_at = CURRENT_TIMESTAMP",
&ps.insertTipsetMessageStmt: "INSERT INTO tipset_message (tipset_key_cid, height, reverted, message_cid, message_index) VALUES (?, ?, ?, ?, ?) ON CONFLICT (tipset_key_cid, message_cid) DO UPDATE SET reverted = 0",
&ps.hasTipsetStmt: "SELECT EXISTS(SELECT 1 FROM tipset_message WHERE tipset_key_cid = ?)",
&ps.updateTipsetToNonRevertedStmt: "UPDATE tipset_message SET reverted = 0 WHERE tipset_key_cid = ?",
&ps.updateTipsetToRevertedStmt: "UPDATE tipset_message SET reverted = 1 WHERE tipset_key_cid = ?",
&ps.removeTipsetsBeforeHeightStmt: "DELETE FROM tipset_message WHERE height < ?",
Expand Down
128 changes: 90 additions & 38 deletions chain/index/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"

"github.com/filecoin-project/lotus/chain/types"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
)

var ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
var (
ErrMaxResultsReached = fmt.Errorf("filter matches too many events, try a more restricted filter")
ErrRangeInFuture = fmt.Errorf("range end is in the future")
)

const maxLookBackForWait = 120 // one hour of tipsets

Expand Down Expand Up @@ -236,48 +238,98 @@ func loadExecutedMessages(ctx context.Context, cs ChainStore, recomputeTipSetSta
return ems, nil
}

// checkTipsetIndexedStatus verifies if a specific tipset is indexed based on the EventFilter.
// It returns nil if the tipset is indexed, ErrNotFound if it's not indexed or not specified,
func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, f *EventFilter) error {
var tipsetKeyCid []byte
var err error

// Determine the tipset to check based on the filter
switch {
case f.TipsetCid != cid.Undef:
tipsetKeyCid = f.TipsetCid.Bytes()
case f.MinHeight >= 0 && f.MinHeight == f.MaxHeight:
tipsetKeyCid, err = si.getTipsetKeyCidByHeight(ctx, f.MinHeight)
if err != nil {
if err == ErrNotFound {
// this means that this is a null round and there exist no events for this epoch
return nil
}
Comment on lines -246 to -255
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you've lost all of this logic in the process of refactoring, so we have a couple of problems now:

  1. We're not testing the case where the filter specifies a specific TipsetCid - which we can do directly without needing to look at the range (case 1 above)
  2. We're not taking the shortcut path for the case where the filter specifies a min==max, that one is easy because it's just one tipset to test (case 2 above)
  3. Your new code just jumps straight in and assumes that f.MinHeight and f.MaxHeight are useful, but that may not be the case. They're allowed to have -1 values, which signifies that neither have been specified (we do that in
    func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRange abi.ChainEpoch) (minHeight abi.ChainEpoch, maxHeight abi.ChainEpoch, err error) {
    ). Which is why the >= 0 check existed in the original case.

So I think you'll need to restore some form of the original switch to accommodate those two cases above and then for the third case where f.MinHeight >= 0 && f.MaxHeight >= 0, and maybe && f.MinHeight < f.MaxHeight just for good measure, just to be defensive about possible inputs.

// checkRangeIndexedStatus verifies if a range of heights is indexed.
// It checks for the existence of non-null rounds at the range boundaries.
func (si *SqliteIndexer) checkRangeIndexedStatus(ctx context.Context, f *EventFilter) error {
minHeight := f.MinHeight
maxHeight := f.MaxHeight

return xerrors.Errorf("failed to get tipset key cid by height: %w", err)
// Find the first non-null round in the range
startCid, err := si.findFirstNonNullRound(ctx, &minHeight, maxHeight)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of the pointer to update it here is clever, but a little too non-obvious from a maintenance perspective, there's no obvious notice here it gets altered. I think you'd be better off returning a new minHeight along with the cid. You'll have to var startCid, endCid cid.Cid and not use := for these calls though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[]byte sorry, not cid.Cid

which is another point - not being a CID, let's be careful with naming of these things, startKeyBytes perhaps?

if err != nil {
return xerrors.Errorf("failed to find first non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if startCid == nil {
return nil
}

// Find the last non-null round in the range
endCid, err := si.findLastNonNullRound(ctx, &maxHeight, minHeight)
if err != nil {
if errors.Is(err, ErrRangeInFuture) {
return xerrors.Errorf("range end is in the future: %w", err)
}
default:
// This function distinguishes between two scenarios:
// 1. Missing events: The requested tipset is not present in the Index (an error condition).
// 2. Valid case: The tipset exists but contains no events (a normal situation).
// Currently, this distinction is only made for the common use case where a user requests events for a single tipset.
// TODO: Implement this functionality for a range of tipsets. This is expensive and not a common use case so it's deferred for now.
return xerrors.Errorf("failed to find last non-null round: %w", err)
}

// If all rounds are null, consider the range valid
if endCid == nil {
Comment on lines +267 to +268
Copy link
Member

@rvagg rvagg Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oo, interesting case, you should never get here, right, since we do the walk for startCid first? If you can't come up with a plausible way to get here then I think we should error here and say something about this being "unexpected" (we want someone to report this error if they ever stumble upon it)

return nil
}

// If we couldn't determine a specific tipset, return ErrNotFound
if tipsetKeyCid == nil {
return ErrNotFound
// Check indexing for start and end tipsets
if err := si.checkTipsetByKeyCid(ctx, startCid, minHeight); err != nil {
return err
rvagg marked this conversation as resolved.
Show resolved Hide resolved
}

// Check if the determined tipset is indexed
if exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid); err != nil {
return xerrors.Errorf("failed to check if tipset is indexed: %w", err)
} else if exists {
return nil // Tipset is indexed
if err := si.checkTipsetByKeyCid(ctx, endCid, maxHeight); err != nil {
return err
}

return nil
}

// checkTipsetByKeyCid checks if a tipset identified by its key CID is indexed.
func (si *SqliteIndexer) checkTipsetByKeyCid(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
if err != nil {
return xerrors.Errorf("failed to check if tipset at height %d is indexed: %w", height, err)
}

if exists {
return nil // null round
}

return ErrNotFound // tipset is not indexed
}
Comment on lines +285 to +296
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure the height is necessary here, but the main thing I want to do is clarify what this is for. How about we rename this one checkTipsetIndexedStatus since you've dropped it form its parent

Suggested change
func (si *SqliteIndexer) checkTipsetByKeyCid(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
if err != nil {
return xerrors.Errorf("failed to check if tipset at height %d is indexed: %w", height, err)
}
if exists {
return nil // null round
}
return ErrNotFound // tipset is not indexed
}
func (si *SqliteIndexer) checkTipsetIndexedStatus(ctx context.Context, tipsetKeyCid []byte, height abi.ChainEpoch) error {
exists, err := si.isTipsetIndexed(ctx, tipsetKeyCid)
if err != nil {
return xerrors.Errorf("failed to check if tipset at height %d is indexed: %w", height, err)
} else if exists {
return nil // has been indexed
}
return ErrNotFound
}


// findFirstNonNullRound finds the first non-null round starting from minHeight up to maxHeight
func (si *SqliteIndexer) findFirstNonNullRound(ctx context.Context, minHeight *abi.ChainEpoch, maxHeight abi.ChainEpoch) ([]byte, error) {
for height := *minHeight; height <= maxHeight; height++ {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*minHeight = height // Update the minHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
Comment on lines +301 to +308
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's switch this for readability, since programmers are used to seeing errors handled first (but not always, just the majority of the time), we introduce a cognitive hiccup when we switch things around; but if we keep it close to the expected flow then it's easier to skim

Suggested change
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*minHeight = height // Update the minHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err != nil {
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
// else null round, keep searching
continue
}
*minHeight = height // Update the minHeight to the found height
return cid, nil

}

return nil, nil
}

// findLastNonNullRound finds the last non-null round starting from maxHeight down to minHeight
func (si *SqliteIndexer) findLastNonNullRound(ctx context.Context, maxHeight *abi.ChainEpoch, minHeight abi.ChainEpoch) ([]byte, error) {
head := si.cs.GetHeaviestTipSet()
if head == nil || *maxHeight > head.Height() {
return nil, ErrRangeInFuture
}

for height := *maxHeight; height >= minHeight; height-- {
cid, err := si.getTipsetKeyCidByHeight(ctx, height)
if err == nil {
*maxHeight = height // Update the maxHeight to the found height
return cid, nil
}
if !errors.Is(err, ErrNotFound) {
return nil, xerrors.Errorf("failed to get tipset key cid for height %d: %w", height, err)
}
}

return ErrNotFound // Tipset is not indexed
return nil, nil
}

// getTipsetKeyCidByHeight retrieves the tipset key CID for a given height.
Expand Down Expand Up @@ -460,7 +512,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
// if the height is old enough, we'll assume the index is caught up to it and not bother
// waiting for it to be indexed
if height <= maxLookBackHeight {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkRangeIndexedStatus(ctx, f)
}
}

Expand All @@ -474,7 +526,7 @@ func (si *SqliteIndexer) GetEventsForFilter(ctx context.Context, f *EventFilter)
}

if len(ces) == 0 {
return nil, si.checkTipsetIndexedStatus(ctx, f)
return nil, si.checkRangeIndexedStatus(ctx, f)
}
}

Expand Down
Loading