Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(upstream): upstream fixes for v1.2.0 #1017

Merged
merged 11 commits into from
Aug 13, 2024
Merged
92 changes: 27 additions & 65 deletions CHANGELOG.md

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
if err != nil {
return fmt.Errorf("update state: %w", err)
}

// Prune old heights, if requested by ABCI app.
if 0 < retainHeight {
err = m.pruneBlocks(uint64(retainHeight))
err = m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
Expand Down Expand Up @@ -122,23 +121,23 @@ func (m *Manager) attemptApplyCachedBlocks() error {
for {
expectedHeight := m.State.NextHeight()

cachedBlock, blockExists := m.blockCache.GetBlockFromCache(expectedHeight)
cachedBlock, blockExists := m.blockCache.Get(expectedHeight)
if !blockExists {
break
}
if err := m.validateBlock(cachedBlock.Block, cachedBlock.Commit); err != nil {
m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
m.blockCache.Delete(cachedBlock.Block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: types.GossipedBlock})
err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
m.logger.Info("Block applied", "height", expectedHeight)

m.blockCache.DeleteBlockFromCache(cachedBlock.Block.Header.Height)
m.blockCache.Delete(cachedBlock.Block.Header.Height)
}

return nil
Expand Down
12 changes: 6 additions & 6 deletions block/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ type Cache struct {
cache map[uint64]types.CachedBlock
}

func (m *Cache) AddBlockToCache(h uint64, b *types.Block, c *types.Commit) {
m.cache[h] = types.CachedBlock{Block: b, Commit: c}
// Add stores the block, its commit, and the source it arrived from under
// height h, then refreshes the cache-size gauge metric.
func (m *Cache) Add(h uint64, b *types.Block, c *types.Commit, source types.BlockSource) {
	entry := types.CachedBlock{
		Block:  b,
		Commit: c,
		Source: source,
	}
	m.cache[h] = entry
	types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) DeleteBlockFromCache(h uint64) {
// Delete removes the cached entry for height h (a no-op if absent) and
// refreshes the cache-size gauge metric.
func (m *Cache) Delete(h uint64) {
	delete(m.cache, h)
	types.BlockCacheSizeGauge.Set(float64(m.Size()))
}

func (m *Cache) GetBlockFromCache(h uint64) (types.CachedBlock, bool) {
// Get returns the cached block/commit pair stored at height h, together with
// a flag reporting whether an entry was present.
func (m *Cache) Get(h uint64) (types.CachedBlock, bool) {
	entry, ok := m.cache[h]
	return entry, ok
}

func (m *Cache) HasBlockInCache(h uint64) bool {
_, found := m.GetBlockFromCache(h)
// Has reports whether an entry is cached for height h.
func (m *Cache) Has(h uint64) bool {
	_, ok := m.cache[h]
	return ok
}

Expand Down
2 changes: 1 addition & 1 deletion block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestApplyBlock(t *testing.T) {
Validators: tmtypes.NewValidatorSet(nil),
}
state.InitialHeight = 1
state.LastBlockHeight.Store(0)
state.SetHeight(0)
maxBytes := uint64(100)
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
Expand Down
105 changes: 59 additions & 46 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
"sync"
"sync/atomic"

"code.cloudfoundry.org/go-diodes"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/store"
uerrors "github.com/dymensionxyz/dymint/utils/errors"
uevent "github.com/dymensionxyz/dymint/utils/event"

"github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -55,7 +55,7 @@
*/
// The last height which was submitted to both sublayers, that we know of. When we produce new batches, we will
// start at this height + 1.
// It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it wont'
// It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it won't
// prune anything that might be submitted in the future. Therefore, it must be atomic.
LastSubmittedHeight atomic.Uint64

Expand All @@ -65,12 +65,15 @@
// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
// and incoming DA blocks, respectively.
retrieverMu sync.Mutex
Retriever da.BatchRetriever
// get the next target height to sync local state to
targetSyncHeight diodes.Diode
// Protect against syncing twice from DA in case new batch is posted but it did not finish to sync yet.
syncFromDaMu sync.Mutex
Retriever da.BatchRetriever
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache *Cache

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64
}

// NewManager creates new block Manager.
Expand Down Expand Up @@ -98,18 +101,17 @@
}

m := &Manager{
Pubsub: pubsub,
p2pClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
logger: logger,
Pubsub: pubsub,
p2pClient: p2pClient,
LocalKey: localKey,
Conf: conf,
Genesis: genesis,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
logger: logger,
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
Expand Down Expand Up @@ -144,45 +146,45 @@
}
}

if !isSequencer {
// Fullnode loop can start before syncing from DA
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}
if isSequencer {

eg, ctx := errgroup.WithContext(ctx)
eg, ctx := errgroup.WithContext(ctx)

if isSequencer {
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
nBytes := m.GetUnsubmittedBytes()
bytesProducedC := make(chan int)
go func() {
bytesProducedC <- nBytes
}()
eg.Go(func() error {
err = m.syncFromSettlement()
if err != nil {
return fmt.Errorf("sync block manager from settlement: %w", err)
}
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
eg.Go(func() error {
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
bytesProducedC <- nBytes
return m.ProduceBlockLoop(ctx, bytesProducedC)
})
go func() {
_ = eg.Wait() // errors are already logged
m.logger.Info("Block manager err group finished.")
}()
Comment on lines +168 to +171

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism

} else {
eg.Go(func() error {
return m.RetrieveLoop(ctx)
})
eg.Go(func() error {
return m.SyncToTargetHeightLoop(ctx)
})
}
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
go func() {
err := m.syncFromSettlement()
if err != nil {
m.logger.Error("sync block manager from settlement", "err", err)
}
// DA Sync. Subscribe to SL next batch events
go uevent.MustSubscribe(ctx, m.Pubsub, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted, m.onNewStateUpdate, m.logger)
}()

go func() {
err := eg.Wait()
m.logger.Info("Block manager err group finished.", "err", err)
}()
// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
}

return nil
}
Expand All @@ -205,25 +207,36 @@
return m.LastSubmittedHeight.Load() + 1
}

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager() error {
// syncFromSettlement enforces the node to be synced on initial run from SL and DA.
func (m *Manager) syncFromSettlement() error {
	res, err := m.SLClient.GetLatestBatch()
	if errors.Is(err, gerrc.ErrNotFound) {
		// The SL hasn't got any batches for this chain yet.
		m.logger.Info("No batches for chain found in SL.")
		// Treat everything below the genesis height as already submitted.
		m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
		return nil
	}

	if err != nil {
		// TODO: separate between fresh rollapp and non-registered rollapp
		return err
	}
	// Record the SL's latest batch end as the last height known to be submitted.
	m.LastSubmittedHeight.Store(res.EndHeight)
	err = m.syncToTargetHeight(res.EndHeight)
	// NOTE(review): the target height is published before the sync error is
	// checked, so TargetHeight reflects the SL view even when syncing fails —
	// presumably intentional; confirm against SyncToTargetHeightLoop users.
	m.UpdateTargetHeight(res.EndHeight)
	if err != nil {
		return err
	}

	m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
	return nil
}

// UpdateTargetHeight atomically raises TargetHeight to h when h exceeds the
// current value; smaller or equal values leave it unchanged. The
// compare-and-swap retry loop keeps the monotonic update safe when several
// goroutines (p2p and DA observers) report heights concurrently.
func (m *Manager) UpdateTargetHeight(h uint64) {
	for {
		cur := m.TargetHeight.Load()
		next := cur
		if h > next {
			next = h
		}
		if m.TargetHeight.CompareAndSwap(cur, next) {
			return
		}
	}
}
49 changes: 42 additions & 7 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ package block_test
import (
"context"
"crypto/rand"
"sync/atomic"
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/libp2p/go-libp2p/core/crypto"

"github.com/dymensionxyz/dymint/block"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/testutil"
"github.com/dymensionxyz/dymint/types"
"github.com/libp2p/go-libp2p/core/crypto"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
Expand Down Expand Up @@ -53,10 +56,11 @@ func TestInitialState(t *testing.T) {
// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, privKey, "TestChain", pubsubServer, logger)
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BlockSyncRequestIntervalTime: 30 * time.Second,
}, privKey, "TestChain", emptyStore, pubsubServer, datastore.NewMapDatastore(), logger)
assert.NoError(err)
assert.NotNil(p2pClient)

Expand Down Expand Up @@ -87,7 +91,7 @@ func TestInitialState(t *testing.T) {
store: fullStore,
genesis: genesis,
expectedInitialHeight: sampleState.InitialHeight,
expectedLastBlockHeight: sampleState.LastBlockHeight.Load(),
expectedLastBlockHeight: sampleState.Height(),
expectedChainID: sampleState.ChainID,
},
}
Expand All @@ -101,7 +105,7 @@ func TestInitialState(t *testing.T) {
assert.NotNil(agg)
assert.Equal(c.expectedChainID, agg.State.ChainID)
assert.Equal(c.expectedInitialHeight, agg.State.InitialHeight)
assert.Equal(c.expectedLastBlockHeight, agg.State.LastBlockHeight.Load())
assert.Equal(c.expectedLastBlockHeight, agg.State.Height())
})
}
}
Expand Down Expand Up @@ -439,3 +443,34 @@ func TestDAFetch(t *testing.T) {
})
}
}

// TestManager_updateTargetHeight checks that UpdateTargetHeight is monotonic:
// it raises the target height for larger inputs and ignores smaller ones.
func TestManager_updateTargetHeight(t *testing.T) {
	cases := []struct {
		name    string
		initial uint64
		input   uint64
		want    uint64
	}{
		{name: "no update target height", initial: 100, input: 99, want: 100},
		{name: "update target height", initial: 100, input: 101, want: 101},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			m := &block.Manager{
				TargetHeight: atomic.Uint64{},
			}
			m.TargetHeight.Store(tc.initial)
			m.UpdateTargetHeight(tc.input)
			assert.Equal(t, tc.want, m.TargetHeight.Load())
		})
	}
}
Loading
Loading