Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeboosted information to broadcast feed #2695

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
72de9d2
Add timeboosted field to broadcast feed
ganeshvanahalli Sep 24, 2024
97f7c18
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
ganeshvanahalli Oct 2, 2024
1781ae3
change timeboosted byte array calculation
ganeshvanahalli Oct 2, 2024
a713900
address PR comments
ganeshvanahalli Oct 2, 2024
811fd19
define IsTxTimeboosted on the BlockMetadata type
ganeshvanahalli Oct 2, 2024
b67b08b
only write blockMetadata to db when its non-nil, prevents erasing of …
ganeshvanahalli Oct 11, 2024
b8890a1
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 11, 2024
572d4b2
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 11, 2024
5fda4c4
use arbostypes.BlockMetadata return type instead of []byte for BlockM…
ganeshvanahalli Oct 14, 2024
a5b3eea
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 14, 2024
13a9e0e
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 14, 2024
d3cd8f5
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 15, 2024
a19acb6
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 15, 2024
0aa68ef
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 16, 2024
bf8adc5
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Oct 18, 2024
86ee92b
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
ganeshvanahalli Oct 24, 2024
41d7436
merge express-lane-timeboost and resolve conflicts
ganeshvanahalli Nov 12, 2024
bc93214
address PR comments
ganeshvanahalli Nov 21, 2024
06b94b0
merge upstream and resolve conflicts
ganeshvanahalli Nov 21, 2024
e9fc4a7
merge upstream and resolve conflicts
ganeshvanahalli Dec 26, 2024
d87c63c
fix lint error
ganeshvanahalli Dec 26, 2024
910a375
address PR comments
ganeshvanahalli Dec 26, 2024
54be46a
Merge branch 'express-lane-timeboost' into add-timeboosted-broadcastf…
Tristan-Wilson Dec 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,12 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
blockHash = &msgResult.BlockHash
}

feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, blockHash)
blockMetadata, err := t.txStreamer.BlockMetadataAtCount(seqNum + 1)
if err != nil {
log.Warn("Error getting blockMetadata byte array from tx streamer", "err", err)
}

feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, blockHash, blockMetadata)
if err != nil {
return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err)
}
Expand Down
4 changes: 2 additions & 2 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,8 +1047,8 @@ func (n *Node) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex,
return n.InboxReader.GetFinalizedMsgCount(ctx)
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult)
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata arbostypes.BlockMetadata) error {
return n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult, blockMetadata)
}

func (n *Node) ExpectChosenSequencer() error {
Expand Down
1 change: 1 addition & 0 deletions arbnode/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package arbnode
var (
messagePrefix []byte = []byte("m") // maps a message sequence number to a message
blockHashInputFeedPrefix []byte = []byte("b") // maps a message sequence number to a block hash received through the input feed
blockMetadataInputFeedPrefix []byte = []byte("t") // maps a message sequence number to a blockMetaData byte array received through the input feed
messageResultPrefix []byte = []byte("r") // maps a message sequence number to a message result
legacyDelayedMessagePrefix []byte = []byte("d") // maps a delayed sequence number to an accumulator and a message as serialized on L1
rlpDelayedMessagePrefix []byte = []byte("e") // maps a delayed sequence number to an accumulator and an RLP encoded message
Expand Down
106 changes: 77 additions & 29 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type TransactionStreamer struct {

nextAllowedFeedReorgLog time.Time

broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockHash
broadcasterQueuedMessages []arbostypes.MessageWithMetadataAndBlockInfo
broadcasterQueuedMessagesPos atomic.Uint64
broadcasterQueuedMessagesActiveReorg bool

Expand Down Expand Up @@ -263,7 +263,7 @@ func deleteFromRange(ctx context.Context, db ethdb.Database, prefix []byte, star

// The insertion mutex must be held. This acquires the reorg mutex.
// Note: oldMessages will be empty if reorgHook is nil
func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash) error {
func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo) error {
if count == 0 {
return errors.New("cannot reorg out init message")
}
Expand Down Expand Up @@ -358,9 +358,9 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
return err
}

messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messagesResults))
messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockInfo, 0, len(messagesResults))
for i := 0; i < len(messagesResults); i++ {
messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockHash{
messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: newMessages[i].MessageWithMeta,
BlockHash: &messagesResults[i].BlockHash,
})
Expand All @@ -382,6 +382,10 @@ func (s *TransactionStreamer) reorg(batch ethdb.Batch, count arbutil.MessageInde
if err != nil {
return err
}
err = deleteStartingAt(s.db, batch, blockMetadataInputFeedPrefix, uint64ToKey(uint64(count)))
if err != nil {
return err
}
err = deleteStartingAt(s.db, batch, messagePrefix, uint64ToKey(uint64(count)))
if err != nil {
return err
Expand Down Expand Up @@ -448,7 +452,7 @@ func (s *TransactionStreamer) GetMessage(seqNum arbutil.MessageIndex) (*arbostyp
return &message, nil
}

func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil.MessageIndex) (*arbostypes.MessageWithMetadataAndBlockHash, error) {
func (s *TransactionStreamer) getMessageWithMetadataAndBlockInfo(seqNum arbutil.MessageIndex) (*arbostypes.MessageWithMetadataAndBlockInfo, error) {
msg, err := s.GetMessage(seqNum)
if err != nil {
return nil, err
Expand All @@ -471,11 +475,21 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockHash(seqNum arbutil.
return nil, err
}

msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
key = dbKey(blockMetadataInputFeedPrefix, uint64(seqNum))
blockMetadata, err := s.db.Get(key)
if err != nil {
if !dbutil.IsErrNotFound(err) {
return nil, err
}
blockMetadata = nil
}
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved

msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: *msg,
BlockHash: blockHash,
BlockMetadata: blockMetadata,
}
return &msgWithBlockHash, nil
return &msgWithBlockInfo, nil
}

// Note: if changed to acquire the mutex, some internal users may need to be updated to a non-locking version.
Expand Down Expand Up @@ -531,7 +545,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
return nil
}
broadcastStartPos := feedMessages[0].SequenceNumber
var messages []arbostypes.MessageWithMetadataAndBlockHash
var messages []arbostypes.MessageWithMetadataAndBlockInfo
broadcastAfterPos := broadcastStartPos
for _, feedMessage := range feedMessages {
if broadcastAfterPos != feedMessage.SequenceNumber {
Expand All @@ -540,11 +554,12 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*m.BroadcastFe
if feedMessage.Message.Message == nil || feedMessage.Message.Message.Header == nil {
return fmt.Errorf("invalid feed message at sequence number %v", feedMessage.SequenceNumber)
}
msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: feedMessage.Message,
BlockHash: feedMessage.BlockHash,
BlockMetadata: feedMessage.BlockMetadata,
}
messages = append(messages, msgWithBlockHash)
messages = append(messages, msgWithBlockInfo)
broadcastAfterPos++
}

Expand Down Expand Up @@ -665,9 +680,9 @@ func endBatch(batch ethdb.Batch) error {
}

func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadata, batch ethdb.Batch) error {
messagesWithBlockHash := make([]arbostypes.MessageWithMetadataAndBlockHash, 0, len(messages))
messagesWithBlockInfo := make([]arbostypes.MessageWithMetadataAndBlockInfo, 0, len(messages))
for _, message := range messages {
messagesWithBlockHash = append(messagesWithBlockHash, arbostypes.MessageWithMetadataAndBlockHash{
messagesWithBlockInfo = append(messagesWithBlockInfo, arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: message,
})
}
Expand All @@ -676,7 +691,7 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
// Trim confirmed messages from l1pricedataCache
s.exec.MarkFeedStart(pos + arbutil.MessageIndex(len(messages)))
s.reorgMutex.RLock()
dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockHash, nil)
dups, _, _, err := s.countDuplicateMessages(pos, messagesWithBlockInfo, nil)
s.reorgMutex.RUnlock()
if err != nil {
return err
Expand All @@ -693,7 +708,7 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(pos arbutil.MessageIndex, m
s.insertionMutex.Lock()
defer s.insertionMutex.Unlock()

return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messagesWithBlockHash, batch)
return s.addMessagesAndEndBatchImpl(pos, messagesAreConfirmed, messagesWithBlockInfo, batch)
}

func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (uint64, error) {
Expand All @@ -714,7 +729,7 @@ func (s *TransactionStreamer) getPrevPrevDelayedRead(pos arbutil.MessageIndex) (

func (s *TransactionStreamer) countDuplicateMessages(
pos arbutil.MessageIndex,
messages []arbostypes.MessageWithMetadataAndBlockHash,
messages []arbostypes.MessageWithMetadataAndBlockInfo,
batch *ethdb.Batch,
) (uint64, bool, *arbostypes.MessageWithMetadata, error) {
var curMsg uint64
Expand Down Expand Up @@ -808,7 +823,7 @@ func (s *TransactionStreamer) logReorg(pos arbutil.MessageIndex, dbMsg *arbostyp

}

func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
func (s *TransactionStreamer) addMessagesAndEndBatchImpl(messageStartPos arbutil.MessageIndex, messagesAreConfirmed bool, messages []arbostypes.MessageWithMetadataAndBlockInfo, batch ethdb.Batch) error {
var confirmedReorg bool
var oldMsg *arbostypes.MessageWithMetadata
var lastDelayedRead uint64
Expand Down Expand Up @@ -952,6 +967,7 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
pos arbutil.MessageIndex,
msgWithMeta arbostypes.MessageWithMetadata,
msgResult execution.MessageResult,
blockMetadata arbostypes.BlockMetadata,
) error {
if err := s.ExpectChosenSequencer(); err != nil {
return err
Expand All @@ -976,15 +992,16 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
}
}

msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: msgWithMeta,
BlockHash: &msgResult.BlockHash,
BlockMetadata: blockMetadata,
}

if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, nil); err != nil {
if err := s.writeMessages(pos, []arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, nil); err != nil {
return err
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, pos)

return nil
}
Expand All @@ -1005,7 +1022,7 @@ func (s *TransactionStreamer) PopulateFeedBacklog() error {
return s.inboxReader.tracker.PopulateFeedBacklog(s.broadcastServer)
}

func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbostypes.MessageWithMetadataAndBlockInfo, batch ethdb.Batch) error {
// write message with metadata
key := dbKey(messagePrefix, uint64(pos))
msgBytes, err := rlp.EncodeToBytes(msg.MessageWithMeta)
Expand All @@ -1025,11 +1042,22 @@ func (s *TransactionStreamer) writeMessage(pos arbutil.MessageIndex, msg arbosty
if err != nil {
return err
}
return batch.Put(key, msgBytes)
if err := batch.Put(key, msgBytes); err != nil {
return err
}

if msg.BlockMetadata != nil {
// Only store non-nil BlockMetadata to db. In case of a reorg, we dont have to explicitly
// clear out BlockMetadata of the reorged message, since those messages will be handled by s.reorg()
// This also allows update of BatchGasCost in message without mistakenly erasing BlockMetadata
key = dbKey(blockMetadataInputFeedPrefix, uint64(pos))
return batch.Put(key, msg.BlockMetadata)
}
return nil
}

func (s *TransactionStreamer) broadcastMessages(
msgs []arbostypes.MessageWithMetadataAndBlockHash,
msgs []arbostypes.MessageWithMetadataAndBlockInfo,
pos arbutil.MessageIndex,
) {
if s.broadcastServer == nil {
Expand All @@ -1042,7 +1070,7 @@ func (s *TransactionStreamer) broadcastMessages(

// The mutex must be held, and pos must be the latest message count.
// `batch` may be nil, which initializes a new batch. The batch is closed out in this function.
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockHash, batch ethdb.Batch) error {
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockInfo, batch ethdb.Batch) error {
if batch == nil {
batch = s.db.NewBatch()
}
Expand Down Expand Up @@ -1071,6 +1099,23 @@ func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages [
return nil
}

func (s *TransactionStreamer) BlockMetadataAtCount(count arbutil.MessageIndex) (arbostypes.BlockMetadata, error) {
if count == 0 {
return nil, nil
}
pos := count - 1

key := dbKey(blockMetadataInputFeedPrefix, uint64(pos))
blockMetadata, err := s.db.Get(key)
if err != nil {
if dbutil.IsErrNotFound(err) {
return nil, nil
}
return nil, err
}
ganeshvanahalli marked this conversation as resolved.
Show resolved Hide resolved
return blockMetadata, nil
}

func (s *TransactionStreamer) ResultAtCount(count arbutil.MessageIndex) (*execution.MessageResult, error) {
if count == 0 {
return &execution.MessageResult{}, nil
Expand Down Expand Up @@ -1163,7 +1208,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
if pos >= msgCount {
return false
}
msgAndBlockHash, err := s.getMessageWithMetadataAndBlockHash(pos)
msgAndBlockInfo, err := s.getMessageWithMetadataAndBlockInfo(pos)
if err != nil {
log.Error("feedOneMsg failed to readMessage", "err", err, "pos", pos)
return false
Expand All @@ -1177,7 +1222,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
}
msgForPrefetch = msg
}
msgResult, err := s.exec.DigestMessage(pos, &msgAndBlockHash.MessageWithMeta, msgForPrefetch)
msgResult, err := s.exec.DigestMessage(pos, &msgAndBlockInfo.MessageWithMeta, msgForPrefetch)
if err != nil {
logger := log.Warn
if prevMessageCount < msgCount {
Expand All @@ -1187,7 +1232,8 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
return false
}

s.checkResult(msgResult, msgAndBlockHash.BlockHash)
// we just log the error but not update the value in db itself with msgResult.BlockHash? and instead forward the new block hash
s.checkResult(msgResult, msgAndBlockInfo.BlockHash)

batch := s.db.NewBatch()
err = s.storeResult(pos, *msgResult, batch)
Expand All @@ -1201,11 +1247,13 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
return false
}

msgWithBlockHash := arbostypes.MessageWithMetadataAndBlockHash{
MessageWithMeta: msgAndBlockHash.MessageWithMeta,
msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: msgAndBlockInfo.MessageWithMeta,
BlockHash: &msgResult.BlockHash,
// maybe if blockhash is differing we clear out previous timeboosted and not send timeboosted info to broadcasting?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. If blockHash doesn't agree clear BlockMetadata.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. I think the best place to implement this is in Bulk syncing of missing BlockMetadata PR, since it tracks missing blockMetadata and in there we can clear out blockMetadata and mark it as missing in DB so that it will be refetched later on

BlockMetadata: msgAndBlockInfo.BlockMetadata,
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockHash{msgWithBlockHash}, pos)
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, pos)

return pos+1 < msgCount
}
Expand Down
21 changes: 20 additions & 1 deletion arbos/arbostypes/messagewithmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package arbostypes
import (
"context"
"encoding/binary"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -19,9 +20,12 @@ type MessageWithMetadata struct {
DelayedMessagesRead uint64 `json:"delayedMessagesRead"`
}

type MessageWithMetadataAndBlockHash struct {
type BlockMetadata []byte

type MessageWithMetadataAndBlockInfo struct {
MessageWithMeta MessageWithMetadata
BlockHash *common.Hash
BlockMetadata BlockMetadata
}

var EmptyTestMessageWithMetadata = MessageWithMetadata{
Expand All @@ -33,6 +37,21 @@ var TestMessageWithMetadataAndRequestId = MessageWithMetadata{
Message: &TestIncomingMessageWithRequestId,
}

// IsTxTimeboosted given a tx's index in the block returns whether the tx was timeboosted or not
func (b BlockMetadata) IsTxTimeboosted(txIndex int) (bool, error) {
if len(b) == 0 {
return false, errors.New("blockMetadata is not set")
}
if txIndex < 0 {
return false, fmt.Errorf("invalid transaction index- %d, should be positive", txIndex)
}
maxTxCount := (len(b) - 1) * 8
if txIndex >= maxTxCount {
return false, nil
}
return b[1+(txIndex/8)]&(1<<(txIndex%8)) != 0, nil
}

func (m *MessageWithMetadata) Hash(sequenceNumber arbutil.MessageIndex, chainId uint64) (common.Hash, error) {
serializedExtraData := make([]byte, 24)
binary.BigEndian.PutUint64(serializedExtraData[:8], uint64(sequenceNumber))
Expand Down
Loading
Loading