Skip to content

Commit

Permalink
extended MiniblockMetada structe
Browse files Browse the repository at this point in the history
  • Loading branch information
miiu96 committed Dec 20, 2024
1 parent 54bce4b commit c054556
Show file tree
Hide file tree
Showing 5 changed files with 599 additions and 51 deletions.
65 changes: 56 additions & 9 deletions dblookupext/historyRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ type historyRepository struct {
}

type notarizedNotification struct {
metaNonce uint64
metaHash []byte
metaNonce uint64
metaHash []byte
isPartialExecuted bool
firstProcessed int32
lastProcessed int32
}

// NewHistoryRepository will create a new instance of HistoryRepository
Expand Down Expand Up @@ -379,9 +382,19 @@ func (hr *historyRepository) onNotarizedMiniblock(metaBlockNonce uint64, metaBlo
metaHash: metaBlockHash,
})
} else if isNotarizedAtDestination {
var firstProcessed, lastProcessed int32
isPartialExecuted := miniblockHeader.GetConstructionState() == int32(block.PartialExecuted)
if isPartialExecuted {
firstProcessed = miniblockHeader.GetIndexOfFirstTxProcessed()
lastProcessed = miniblockHeader.GetIndexOfLastTxProcessed()
}

hr.pendingNotarizedAtDestinationNotifications.Set(string(miniblockHash), &notarizedNotification{
metaNonce: metaBlockNonce,
metaHash: metaBlockHash,
metaNonce: metaBlockNonce,
metaHash: metaBlockHash,
isPartialExecuted: isPartialExecuted,
firstProcessed: firstProcessed,
lastProcessed: lastProcessed,
})
} else {
log.Error("onNotarizedMiniblock(): unexpected condition, notification not understood")
Expand Down Expand Up @@ -411,10 +424,7 @@ func (hr *historyRepository) consumePendingNotificationsWithLock() {
metadata.NotarizedAtSourceInMetaHash = notification.metaHash
})

hr.consumePendingNotificationsNoLock(hr.pendingNotarizedAtDestinationNotifications, func(metadata *MiniblockMetadata, notification *notarizedNotification) {
metadata.NotarizedAtDestinationInMetaNonce = notification.metaNonce
metadata.NotarizedAtDestinationInMetaHash = notification.metaHash
})
hr.consumePendingNotificationsNoLock(hr.pendingNotarizedAtDestinationNotifications, updateMetadataPendingNotarizationOnDestination)

hr.consumePendingNotificationsNoLock(hr.pendingNotarizedAtBothNotifications, func(metadata *MiniblockMetadata, notification *notarizedNotification) {
metadata.NotarizedAtSourceInMetaNonce = notification.metaNonce
Expand All @@ -430,6 +440,44 @@ func (hr *historyRepository) consumePendingNotificationsWithLock() {
)
}

func updateMetadataPendingNotarizationOnDestination(metadata *MiniblockMetadata, notification *notarizedNotification) {
if !notification.isPartialExecuted {
metadata.NotarizedAtDestinationInMetaNonce = notification.metaNonce
metadata.NotarizedAtDestinationInMetaHash = notification.metaHash

return
}

if len(metadata.PartialExecutionInfo) == 0 {
metadata.PartialExecutionInfo = []*PartialExecutionInfo{
{
NotarizedAtDestinationInMetaHash: notification.metaHash,
NotarizedAtDestinationMetaNonce: notification.metaNonce,
LastProcessedTxIndex: notification.lastProcessed,
},
}

return
}

// check for rollbacks
for len(metadata.PartialExecutionInfo) > 0 {
lastElement := metadata.PartialExecutionInfo[len(metadata.PartialExecutionInfo)-1]
if notification.firstProcessed <= lastElement.LastProcessedTxIndex {
// it was a rollback we should remove the last element from array
metadata.PartialExecutionInfo = metadata.PartialExecutionInfo[:len(metadata.PartialExecutionInfo)-1]
} else {
break
}
}

metadata.PartialExecutionInfo = append(metadata.PartialExecutionInfo, &PartialExecutionInfo{
NotarizedAtDestinationInMetaHash: notification.metaHash,
NotarizedAtDestinationMetaNonce: notification.metaNonce,
LastProcessedTxIndex: notification.lastProcessed,
})
}

func (hr *historyRepository) consumePendingNotificationsNoLock(pendingMap *container.MutexMap, patchMetadataFunc func(*MiniblockMetadata, *notarizedNotification)) {
for _, key := range pendingMap.Keys() {
notification, ok := pendingMap.Get(key)
Expand All @@ -442,7 +490,6 @@ func (hr *historyRepository) consumePendingNotificationsNoLock(pendingMap *conta
log.Error("consumePendingNotificationsNoLock(): bad key", "key", key)
continue
}

notificationTyped, ok := notification.(*notarizedNotification)
if !ok {
log.Error("consumePendingNotificationsNoLock(): bad value", "value", fmt.Sprintf("%T", notification))
Expand Down
114 changes: 113 additions & 1 deletion dblookupext/historyRepository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dblookupext

import (
"errors"
"github.com/multiversx/mx-chain-core-go/marshal"
"sync"
"testing"

Expand Down Expand Up @@ -259,9 +260,18 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
ReceiverShardID: 13,
TxHashes: [][]byte{[]byte("txC")},
}
partialExecutedMiniblock := &block.MiniBlock{
SenderShardID: 15,
ReceiverShardID: 13,
TxHashes: [][]byte{[]byte("txD"), []byte("txE")},
}

miniblockHashA, _ := repo.computeMiniblockHash(miniblockA)
miniblockHashB, _ := repo.computeMiniblockHash(miniblockB)
miniblockHashC, _ := repo.computeMiniblockHash(miniblockC)
partialExecutedMbHash, _ := repo.computeMiniblockHash(partialExecutedMiniblock)

marshalizer := &marshal.GogoProtoMarshalizer{}

// Let's have a block committed
_ = repo.RecordBlock([]byte("fooblock"),
Expand All @@ -271,6 +281,7 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
miniblockA,
miniblockB,
miniblockC,
partialExecutedMiniblock,
},
}, nil, nil, nil, nil,
)
Expand Down Expand Up @@ -308,7 +319,18 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
ReceiverShardID: 14,
Hash: miniblockHashB,
},
}},
},
},
{
ShardID: 15,
ShardMiniBlockHeaders: []block.MiniBlockHeader{
{
SenderShardID: 15,
ReceiverShardID: 13,
Hash: partialExecutedMbHash,
},
},
},
},
}

Expand All @@ -330,6 +352,20 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
require.Equal(t, 0, int(metadata.NotarizedAtSourceInMetaNonce))
require.Equal(t, 0, int(metadata.NotarizedAtDestinationInMetaNonce))

metadata, err = repo.getMiniblockMetadataByMiniblockHash(partialExecutedMbHash)
require.Nil(t, err)
require.Equal(t, 4000, int(metadata.NotarizedAtSourceInMetaNonce))
require.Equal(t, 0, int(metadata.NotarizedAtDestinationInMetaNonce))

mbhr := &block.MiniBlockHeaderReserved{
ExecutionType: block.ProcessingType(0),
State: block.PartialExecuted,
IndexOfFirstTxProcessed: 0,
IndexOfLastTxProcessed: 0,
}
mbhrBytes, err := marshalizer.Marshal(mbhr)
require.Nil(t, err)

// Let's receive a metablock that notarized two shard blocks, with miniblocks B (at destination) and C (at source)
metablock = &block.MetaBlock{
Nonce: 4001,
Expand All @@ -354,6 +390,17 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
},
},
},
{
ShardID: 13,
ShardMiniBlockHeaders: []block.MiniBlockHeader{
{
SenderShardID: 15,
ReceiverShardID: 13,
Hash: partialExecutedMbHash,
Reserved: mbhrBytes,
},
},
},
},
}

Expand All @@ -375,6 +422,21 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
require.Equal(t, 4001, int(metadata.NotarizedAtSourceInMetaNonce))
require.Equal(t, 0, int(metadata.NotarizedAtDestinationInMetaNonce))

metadata, err = repo.getMiniblockMetadataByMiniblockHash(partialExecutedMbHash)
require.Nil(t, err)
require.Equal(t, 4000, int(metadata.NotarizedAtSourceInMetaNonce))
require.Equal(t, 0, int(metadata.NotarizedAtDestinationInMetaNonce))
require.Equal(t, 4001, int(metadata.PartialExecutionInfo[0].NotarizedAtDestinationMetaNonce))

mbhr = &block.MiniBlockHeaderReserved{
ExecutionType: block.ProcessingType(0),
State: block.PartialExecuted,
IndexOfFirstTxProcessed: 1,
IndexOfLastTxProcessed: 1,
}
mbhrBytes, err = marshalizer.Marshal(mbhr)
require.Nil(t, err)

// Let's receive a metablock that notarized one shard block, with miniblock C (at destination)
metablock = &block.MetaBlock{
Nonce: 4002,
Expand All @@ -387,6 +449,12 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
ReceiverShardID: 13,
Hash: miniblockHashC,
},
{
SenderShardID: 15,
ReceiverShardID: 13,
Hash: partialExecutedMbHash,
Reserved: mbhrBytes,
},
},
},
},
Expand All @@ -409,6 +477,50 @@ func TestHistoryRepository_OnNotarizedBlocks(t *testing.T) {
require.Nil(t, err)
require.Equal(t, 4001, int(metadata.NotarizedAtSourceInMetaNonce))
require.Equal(t, 4002, int(metadata.NotarizedAtDestinationInMetaNonce))

metadata, err = repo.getMiniblockMetadataByMiniblockHash(partialExecutedMbHash)
require.Nil(t, err)
require.Equal(t, 4000, int(metadata.NotarizedAtSourceInMetaNonce))
require.Equal(t, 0, int(metadata.NotarizedAtDestinationInMetaNonce))
require.Equal(t, 4001, int(metadata.PartialExecutionInfo[0].NotarizedAtDestinationMetaNonce))
require.Equal(t, 4002, int(metadata.PartialExecutionInfo[1].NotarizedAtDestinationMetaNonce))

// simulate a rollback and the partial executed miniblock will have a different order

mbhr = &block.MiniBlockHeaderReserved{
ExecutionType: block.ProcessingType(0),
State: block.PartialExecuted,
IndexOfFirstTxProcessed: 0,
IndexOfLastTxProcessed: 1,
}
mbhrBytes, err = marshalizer.Marshal(mbhr)
require.Nil(t, err)

metablock = &block.MetaBlock{
Nonce: 4002,
ShardInfo: []block.ShardData{
{
ShardID: 13,
ShardMiniBlockHeaders: []block.MiniBlockHeader{
{
SenderShardID: 15,
ReceiverShardID: 13,
Hash: partialExecutedMbHash,
Reserved: mbhrBytes,
},
},
},
},
}

repo.OnNotarizedBlocks(core.MetachainShardId, []data.HeaderHandler{metablock}, [][]byte{[]byte("metablockZ")})

metadata, err = repo.getMiniblockMetadataByMiniblockHash(partialExecutedMbHash)
require.Nil(t, err)
require.Equal(t, 4000, int(metadata.NotarizedAtSourceInMetaNonce))
require.Equal(t, 0, int(metadata.NotarizedAtDestinationInMetaNonce))
require.Len(t, metadata.PartialExecutionInfo, 1)
require.Equal(t, 4002, int(metadata.PartialExecutionInfo[0].NotarizedAtDestinationMetaNonce))
}

func TestHistoryRepository_OnNotarizedBlocksAtSourceBeforeCommittingAtDestination(t *testing.T) {
Expand Down
Loading

0 comments on commit c054556

Please sign in to comment.