Skip to content

Commit

Permalink
chore(relayer): ensure increnting valsetid per shard (omni-network#1167)
Browse files Browse the repository at this point in the history
Ensure we submit incrementing valSetID per stream. This is enforced by
the portal contract, so relayer should not submit fuzzy attestations
from pervious validator sets. It should stick with finalized until the
new validator set attests to the fuzzy shards.

task: none
  • Loading branch information
corverroos authored Jun 3, 2024
1 parent 02feffa commit 58a1427
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 95 deletions.
4 changes: 2 additions & 2 deletions halo/attest/voter/voter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,11 @@ func (stubProvider) GetBlock(context.Context, xchain.ProviderRequest) (xchain.Bl
panic("unexpected")
}

func (stubProvider) GetSubmittedCursor(context.Context, xchain.StreamID) (xchain.StreamCursor, bool, error) {
func (stubProvider) GetSubmittedCursor(context.Context, xchain.StreamID) (xchain.SubmitCursor, bool, error) {
panic("unexpected")
}

func (stubProvider) GetEmittedCursor(context.Context, xchain.EmitRef, xchain.StreamID) (xchain.StreamCursor, bool, error) {
func (stubProvider) GetEmittedCursor(context.Context, xchain.EmitRef, xchain.StreamID) (xchain.EmitCursor, bool, error) {
panic("unexpected")
}

Expand Down
1 change: 1 addition & 0 deletions lib/xchain/merkle.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (t BlockTree) Proof(header BlockHeader, msgs []Msg) (merkle.MultiProof, err
indices = append(indices, headerIndex)

for _, msg := range msgs {
// TODO(corver): Filter msg shards by block confirmation level
msgLeaf, err := msgLeaf(msg)
if err != nil {
return merkle.MultiProof{}, err
Expand Down
4 changes: 2 additions & 2 deletions lib/xchain/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ type Provider interface {
// or false if not available, or an error.
// Calls the destination chain portal InXStreamOffset method.
// Note this is only supported for EVM chains, no the consensus chain.
GetSubmittedCursor(ctx context.Context, stream StreamID) (StreamCursor, bool, error)
GetSubmittedCursor(ctx context.Context, stream StreamID) (SubmitCursor, bool, error)

// GetEmittedCursor returns the emitted cursor for the provided stream,
// or false if not available, or an error.
// Calls the source chain portal OutXStreamOffset method.
//
// Note that the BlockOffset field is not populated for emit cursors, since it isn't stored on-chain
// but tracked off-chain.
GetEmittedCursor(ctx context.Context, ref EmitRef, stream StreamID) (StreamCursor, bool, error)
GetEmittedCursor(ctx context.Context, ref EmitRef, stream StreamID) (EmitCursor, bool, error)
}

type EmitRef struct {
Expand Down
61 changes: 32 additions & 29 deletions lib/xchain/provider/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,105 +24,108 @@ import (
// Note that the BlockOffset field is not populated for emit cursors, since it isn't stored on-chain
// but tracked off-chain.
func (p *Provider) GetEmittedCursor(ctx context.Context, ref xchain.EmitRef, stream xchain.StreamID,
) (xchain.StreamCursor, bool, error) {
) (xchain.EmitCursor, bool, error) {
if !ref.Valid() {
return xchain.StreamCursor{}, false, errors.New("invalid emit ref")
return xchain.EmitCursor{}, false, errors.New("invalid emit ref")
}

const unknownBlockOffset uint64 = 0
if stream.SourceChainID == p.cChainID {
block, err := getConsXBlock(ctx, ref, p.cProvider)
if err != nil {
return xchain.StreamCursor{}, false, err
return xchain.EmitCursor{}, false, err
}

return xchain.StreamCursor{
StreamID: stream,
MsgOffset: block.Msgs[0].StreamOffset, // Consensus xblocks only have a single xmsg.
BlockOffset: unknownBlockOffset,
return xchain.EmitCursor{
StreamID: stream,
MsgOffset: block.Msgs[0].StreamOffset, // Consensus xblocks only have a single xmsg.
}, true, nil
}

chain, rpcClient, err := p.getEVMChain(stream.SourceChainID)
if err != nil {
return xchain.StreamCursor{}, false, err
return xchain.EmitCursor{}, false, err
}

caller, err := bindings.NewOmniPortalCaller(chain.PortalAddress, rpcClient)
if err != nil {
return xchain.StreamCursor{}, false, errors.Wrap(err, "new caller")
return xchain.EmitCursor{}, false, errors.Wrap(err, "new caller")
}

opts := &bind.CallOpts{Context: ctx}
if ref.Height != nil {
opts.BlockNumber = big.NewInt(int64(*ref.Height))
} else if head, ok := headTypeFromConfLevel(*ref.ConfLevel); !ok {
return xchain.StreamCursor{}, false, errors.New("invalid conf level")
return xchain.EmitCursor{}, false, errors.New("invalid conf level")
} else {
// Populate an explicit block number if not querying latest head.
header, err := rpcClient.HeaderByType(ctx, head)
if err != nil {
return xchain.StreamCursor{}, false, err
return xchain.EmitCursor{}, false, err
}

opts.BlockNumber = header.Number
}

offset, err := caller.OutXMsgOffset(opts, stream.DestChainID, stream.ShardID)
if err != nil {
return xchain.StreamCursor{}, false, errors.Wrap(err, "call inXStreamOffset")
return xchain.EmitCursor{}, false, errors.Wrap(err, "call inXStreamOffset")
}

if offset == 0 {
return xchain.StreamCursor{}, false, nil
return xchain.EmitCursor{}, false, nil
}

return xchain.StreamCursor{
StreamID: stream,
MsgOffset: offset,
BlockOffset: unknownBlockOffset,
return xchain.EmitCursor{
StreamID: stream,
MsgOffset: offset,
}, true, nil
}

// GetSubmittedCursor returns the submitted cursor for the source chain on the destination chain,
// or false if not available, or an error. Calls the destination chain portal InXStreamOffset method.
func (p *Provider) GetSubmittedCursor(ctx context.Context, stream xchain.StreamID,
) (xchain.StreamCursor, bool, error) {
) (xchain.SubmitCursor, bool, error) {
chain, rpcClient, err := p.getEVMChain(stream.DestChainID)
if err != nil {
return xchain.StreamCursor{}, false, err
return xchain.SubmitCursor{}, false, err
}

caller, err := bindings.NewOmniPortalCaller(chain.PortalAddress, rpcClient)
if err != nil {
return xchain.StreamCursor{}, false, errors.Wrap(err, "new caller")
return xchain.SubmitCursor{}, false, errors.Wrap(err, "new caller")
}

height, err := rpcClient.BlockNumber(ctx)
if err != nil {
return xchain.StreamCursor{}, false, err
return xchain.SubmitCursor{}, false, err
}

callOpts := &bind.CallOpts{Context: ctx, BlockNumber: big.NewInt(int64(height))}

msgOffset, err := caller.InXMsgOffset(callOpts, stream.SourceChainID, stream.ShardID)
if err != nil {
return xchain.StreamCursor{}, false, errors.Wrap(err, "call inXStreamOffset")
return xchain.SubmitCursor{}, false, errors.Wrap(err, "call inXStreamOffset")
}

if msgOffset == 0 {
return xchain.StreamCursor{}, false, nil
return xchain.SubmitCursor{}, false, nil
}

blockOffset, err := caller.InXBlockOffset(callOpts, stream.SourceChainID, stream.ShardID)
if err != nil {
return xchain.StreamCursor{}, false, errors.Wrap(err, "call inXStreamBlockHeight")
return xchain.SubmitCursor{}, false, errors.Wrap(err, "call inXStreamBlockHeight")
}

return xchain.StreamCursor{
StreamID: stream,
MsgOffset: msgOffset,
BlockOffset: blockOffset,
valSetID, err := caller.InXStreamValidatorSetId(callOpts, stream.SourceChainID, stream.ShardID)
if err != nil {
return xchain.SubmitCursor{}, false, errors.Wrap(err, "call inXStreamValidatorSetId")
}

return xchain.SubmitCursor{
StreamID: stream,
MsgOffset: msgOffset,
BlockOffset: blockOffset,
ValidatorSetID: valSetID,
}, true, nil
}

Expand Down
8 changes: 4 additions & 4 deletions lib/xchain/provider/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,13 @@ func (m *Mock) GetBlock(_ context.Context, req xchain.ProviderRequest) (xchain.B
}

func (*Mock) GetSubmittedCursor(_ context.Context, stream xchain.StreamID,
) (xchain.StreamCursor, bool, error) {
return xchain.StreamCursor{StreamID: stream}, true, nil
) (xchain.SubmitCursor, bool, error) {
return xchain.SubmitCursor{StreamID: stream}, true, nil
}

func (*Mock) GetEmittedCursor(_ context.Context, _ xchain.EmitRef, stream xchain.StreamID,
) (xchain.StreamCursor, bool, error) {
return xchain.StreamCursor{StreamID: stream}, true, nil
) (xchain.EmitCursor, bool, error) {
return xchain.EmitCursor{StreamID: stream}, true, nil
}

func (m *Mock) parentBlockHash(chainVer xchain.ChainVersion, height uint64) common.Hash {
Expand Down
17 changes: 12 additions & 5 deletions lib/xchain/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,16 @@ type Submission struct {
DestChainID uint64 // Destination chain ID, for internal use only
}

// StreamCursor is a cursor that tracks the progress of a cross-chain stream on destination portal contracts.
type StreamCursor struct {
StreamID // Stream ID of the Stream this cursor belongs to
MsgOffset uint64 // Latest applied Msg offset of the Stream
BlockOffset uint64 // Latest applied cross chain block offset
// SubmitCursor is a cursor that tracks the progress of a cross-chain stream on destination portal contracts.
type SubmitCursor struct {
StreamID // Stream ID of the Stream this cursor belongs to
MsgOffset uint64 // Latest submitted Msg offset of the Stream
BlockOffset uint64 // Latest submitted cross chain block offset
ValidatorSetID uint64 // Validator set that submitted the message.
}

// EmitCursor is a cursor that tracks the progress of a cross-chain stream on source portal contracts.
type EmitCursor struct {
StreamID // Stream ID of the Stream this cursor belongs to
MsgOffset uint64 // Latest emitted Msg offset of the Stream
}
104 changes: 70 additions & 34 deletions relayer/app/cursors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const initialXBlockOffset = 1

// getSubmittedCursors returns the last submitted cursor for each source chain on the destination chain.
func getSubmittedCursors(ctx context.Context, network netconf.Network, dstChainID uint64, xClient xchain.Provider,
) ([]xchain.StreamCursor, error) {
var cursors []xchain.StreamCursor //nolint:prealloc // Not worth it.
) ([]xchain.SubmitCursor, error) {
var cursors []xchain.SubmitCursor //nolint:prealloc // Not worth it.
for _, stream := range network.StreamsTo(dstChainID) {
cursor, ok, err := xClient.GetSubmittedCursor(ctx, stream)
if err != nil {
Expand All @@ -36,17 +36,24 @@ func getSubmittedCursors(ctx context.Context, network netconf.Network, dstChainI
// filterMsgs filters messages based on offsets for a specific stream.
// It takes a slice of messages, offsets indexed by stream ID, and the target stream ID,
// and returns a filtered slice containing only messages with offsets greater than the specified offset.
func filterMsgs(ctx context.Context, streamID xchain.StreamID, msgs []xchain.Msg, msgFilter *msgOffsetFilter) ([]xchain.Msg, error) {
func filterMsgs(ctx context.Context, streamID xchain.StreamID, valSetID uint64, msgs []xchain.Msg, msgFilter *msgCursorFilter) ([]xchain.Msg, error) {
backoff := expbackoff.New(ctx)
res := make([]xchain.Msg, 0, len(msgs)) // Res might have over-capacity, but that's fine, we only filter on startup.
for i := 0; i < len(msgs); {
msg := msgs[i]

check, expected := msgFilter.Check(streamID, msg.StreamOffset)
check, cursor := msgFilter.Check(streamID, valSetID, msg.StreamOffset)
if check == checkProcess {
res = append(res, msg)
} else if check == checkOldVatSet {
log.Warn(ctx, "Skipping msg with old valSetID", nil,
"stream", streamID,
"offset", msg.StreamOffset,
"valset", valSetID,
"cursor_valset", cursor.ValsSetID,
)
}
if check != checkGap {
if check != checkGapOffset {
i++
continue // Continue to next message
}
Expand All @@ -56,15 +63,15 @@ func filterMsgs(ctx context.Context, streamID xchain.StreamID, msgs []xchain.Msg
return nil, errors.New("unexpected gap in finalized msg offsets [BUG]",
"stream", streamID,
"offset", msg.StreamOffset,
"expected", expected,
"cursor_offset", cursor.MsgOffset,
)
}

// Re-orgs of fuzzy conf levels are expected and can create gaps, block until ConfFinalized fills the gap.
log.Warn(ctx, "Gap in fuzzy msg offsets, waiting for ConfFinalized", nil,
"stream", streamID,
"offset", msg.StreamOffset,
"expected", expected,
"cursor_offset", cursor.MsgOffset,
)
backoff()
// Retry the same message again
Expand All @@ -76,7 +83,7 @@ func filterMsgs(ctx context.Context, streamID xchain.StreamID, msgs []xchain.Msg
// fromChainVersionOffsets calculates the starting block offsets for all chain versions (to the destination chain).
func fromChainVersionOffsets(
destChainID uint64, // Destination chain ID
cursors []xchain.StreamCursor, // All actual on-chain submit cursors
cursors []xchain.SubmitCursor, // All actual on-chain submit cursors
chainVers []xchain.ChainVersion, // All expected chain versions
state *State, // On-disk local state
) (map[xchain.ChainVersion]uint64, error) {
Expand Down Expand Up @@ -117,54 +124,83 @@ func fromChainVersionOffsets(
return res, nil
}

// msgOffsetFilter is a filter that keeps track of the last processed message offset for each stream.
// It is used to filter out messages that have already been processed.
// msgCursorFilter is a filter that keeps track of the last processed message cursor for each stream.
// It is used to filter out messages that have already been processed or that cannot be submitted otherwise.
//
// More specifically, it ensures that fuzzy msgs are submitted either from fuzzy or finalized attestations, whichever comes first.
type msgOffsetFilter struct {
// It also ensures that valSetID always increases.
type msgCursorFilter struct {
mu sync.Mutex
offsets map[xchain.StreamID]uint64
cursors map[xchain.StreamID]streamCursor
}

type streamCursor struct {
MsgOffset uint64
ValsSetID uint64
}

func newMsgOffsetFilter(cursors []xchain.StreamCursor) *msgOffsetFilter {
offsets := make(map[xchain.StreamID]uint64, len(cursors))
func newMsgOffsetFilter(cursors []xchain.SubmitCursor) (*msgCursorFilter, error) {
streamCursors := make(map[xchain.StreamID]streamCursor, len(cursors))
for _, cursor := range cursors {
offsets[cursor.StreamID] = cursor.MsgOffset
streamCursors[cursor.StreamID] = streamCursor{
MsgOffset: cursor.MsgOffset,
ValsSetID: cursor.ValidatorSetID,
}
}

return &msgOffsetFilter{
offsets: offsets,
if len(streamCursors) != len(cursors) {
return nil, errors.New("unexpected duplicate cursors [BUG]")
}

return &msgCursorFilter{
cursors: streamCursors,
}, nil
}

//go:generate stringer -type=checkResult -trimprefix=check

type checkResult int

const (
// checkProcess indicates that the message offset is sequential and should be processed.
checkProcess checkResult = iota
// checkGap indicates that the message offset is too far ahead and therefore contains a gap.
checkGap
// checkIgnore indicates that the message offset was already processed and should be ignored.
checkIgnore
// checkGapOffset indicates that the message offset is too far ahead and therefore contains a gap.
checkGapOffset
// checkIgnoreOffset indicates that the message offset was already processed and should be ignored.
checkIgnoreOffset
// checkOldVatSet indicates that the message has a lower validator set ID than the last processed message and must be ignored.
checkOldVatSet
)

// Check updates the stream state and returns checkProcess if the provided offset is sequential.
// Otherwise it does not update the state and returns checkGap if the next message is too far ahead,
// or checkIgnore if the next message was already processed.
// It also returns the expected offset for the next message.
func (f *msgOffsetFilter) Check(stream xchain.StreamID, msgOffset uint64) (checkResult, uint64) {
//
// Otherwise, it does not update the state.
// It returns checkGap if the next message is too far ahead,
// or checkIgnore if the next message was already processed,
// or checkOldVatSet if the next message has a lower validator set ID than the last processed message.
//
// It also returns the existing stream cursor.
func (f *msgCursorFilter) Check(stream xchain.StreamID, valSetID uint64, msgOffset uint64) (checkResult, streamCursor) {
f.mu.Lock()
defer f.mu.Unlock()

expect := f.offsets[stream] + 1
if msgOffset > expect {
return checkGap, expect
} else if msgOffset < expect {
return checkIgnore, expect
cursor := f.cursors[stream]

expectOffset := cursor.MsgOffset + 1
if msgOffset > expectOffset {
return checkGapOffset, cursor
} else if msgOffset < expectOffset {
return checkIgnoreOffset, cursor
}

// Update the offset
f.offsets[stream] = msgOffset
if valSetID < cursor.ValsSetID {
return checkOldVatSet, cursor
}

// Update the cursor
f.cursors[stream] = streamCursor{
MsgOffset: msgOffset,
ValsSetID: valSetID,
}

return checkProcess, expect
return checkProcess, cursor
}
Loading

0 comments on commit 58a1427

Please sign in to comment.