fix(bug): sync from da and p2p when starting a node #763

Merged 32 commits on May 2, 2024
Changes from 16 commits
Commits
7b70862
finish impl but without fixing tests, and sync call is in wrong place
danwt May 1, 2024
8b0a032
move the call
danwt May 1, 2024
de0ea98
add a sanity check log
danwt May 1, 2024
67d30ac
do not sync celestia on boot
danwt May 1, 2024
1487d79
remove process next batch
danwt May 1, 2024
3db98d0
keep retrying
danwt May 1, 2024
66092f9
comment out celestia
danwt May 1, 2024
8725393
revert all temp changes
danwt May 1, 2024
bac7765
fixing da syncing
srene May 1, 2024
656b017
readd availability check
srene May 2, 2024
a8fdcd1
changing default parameters + logging
srene May 2, 2024
5208955
setting channel size to gossip cache value
srene May 2, 2024
e03ae2c
Merge 5208955ccc4613900609cbab080899612941e94e into 43e2d965f2b505751…
srene May 2, 2024
f5b7981
Update CHANGELOG.md [skip ci]
invalid-email-address May 2, 2024
f6adc27
using unbuffered subscriptions
srene May 2, 2024
4c4442d
removing unused func
srene May 2, 2024
78780ed
refac/simplify sync until target
danwt May 2, 2024
132fc6e
make sync until target more readable
danwt May 2, 2024
02a41ba
add comment for retriever
danwt May 2, 2024
b600148
remove state index
danwt May 2, 2024
b3ff35e
rebuild proto
danwt May 2, 2024
f2df961
docstrings
danwt May 2, 2024
b343287
docstrings
danwt May 2, 2024
53002b0
fix(celestia): use fixed delay in repeat attempts (#753)
danwt May 2, 2024
a25d74b
readding mutex
srene May 2, 2024
09e7199
Merge a25d74b2a46a53491047112c0f45ab3c66370f17 into 53002b0a070743811…
srene May 2, 2024
4cc17dd
Update CHANGELOG.md [skip ci]
invalid-email-address May 2, 2024
c7f6e7a
fix logging
srene May 2, 2024
b88bf6e
fix(doc): manager cache comment (#767)
danwt May 2, 2024
5cee763
add state index back to proto
danwt May 2, 2024
d101799
Merge 5cee7634fbd013ffcfc4ed8af0d3f8174f390b51 into b88bf6e72820c944b…
srene May 2, 2024
91bb364
Update CHANGELOG.md [skip ci]
invalid-email-address May 2, 2024
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,7 @@
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-05-01)
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-05-02)


Check failure on line 3 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple consecutive blank lines [Expected: 1; Actual: 2]
### Bug Fixes

Check failure on line 4 in CHANGELOG.md (GitHub Actions / markdownlint): Heading levels should only increment by one level at a time [Expected: h2; Actual: h3]

* **celestia test:** fix race in test ([#755](https://github.com/dymensionxyz/dymint/issues/755)) ([0b36781](https://github.com/dymensionxyz/dymint/commit/0b367818bf6aa8da4a4fd8e4e5c78223b60b44e0))
* **celestia:** impl retry on submit ([#748](https://github.com/dymensionxyz/dymint/issues/748)) ([61630eb](https://github.com/dymensionxyz/dymint/commit/61630eb458197abe2440a81426210000dff25d40))
@@ -12,17 +12,18 @@
* **p2p:** validate block before applying and not before caching in p2p gossiping ([#723](https://github.com/dymensionxyz/dymint/issues/723)) ([98371b5](https://github.com/dymensionxyz/dymint/commit/98371b5220613e70f3274fab5593e02ba532f7db))
* **produce loop:** handle unauthenticated error in settlement layer ([#726](https://github.com/dymensionxyz/dymint/issues/726)) ([33e78d1](https://github.com/dymensionxyz/dymint/commit/33e78d116b5f14b91b8b3bda2b6cbfee9040e2d3))
* **rpc:** nil panic in rpc/json/handler.go WriteError ([#750](https://github.com/dymensionxyz/dymint/issues/750)) ([e09709b](https://github.com/dymensionxyz/dymint/commit/e09709b428a33da002defb9f13178fa19b81a69b))
* **sync:** make sure we use a latest state index as a start point ([#760](https://github.com/dymensionxyz/dymint/issues/760)) ([43e2d96](https://github.com/dymensionxyz/dymint/commit/43e2d965f2b505751f8e5260549e909c976141ee))


Check failure on line 17 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple consecutive blank lines [Expected: 1; Actual: 2]

Check failure on line 18 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple consecutive blank lines [Expected: 1; Actual: 3]
# [1.1.0-rc02](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc01...v1.1.0-rc02) (2024-04-26)

Check failure on line 19 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple top-level headings in the same document [Context: "# [1.1.0-rc02](https://github...."]


Check failure on line 21 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple consecutive blank lines [Expected: 1; Actual: 2]

Check failure on line 22 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple consecutive blank lines [Expected: 1; Actual: 3]
# [1.1.0-rc01](https://github.com/dymensionxyz/dymint/compare/v1.0.1-alpha...v1.1.0-rc01) (2024-04-25)

Check failure on line 23 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple top-level headings in the same document [Context: "# [1.1.0-rc01](https://github...."]


Check failure on line 25 in CHANGELOG.md (GitHub Actions / markdownlint): Multiple consecutive blank lines [Expected: 1; Actual: 2]
### Bug Fixes

Check failure on line 26 in CHANGELOG.md (GitHub Actions / markdownlint): Heading levels should only increment by one level at a time [Expected: h2; Actual: h3]

* **block production:** apply block before gossiping ([#695](https://github.com/dymensionxyz/dymint/issues/695)) ([5c496b4](https://github.com/dymensionxyz/dymint/commit/5c496b453e98bbcc67feb6df3a2d4ad340586816))
* **block:** Only register `nodeHealthStatusHandler` for sequencer ([#683](https://github.com/dymensionxyz/dymint/issues/683)) ([da2ff94](https://github.com/dymensionxyz/dymint/commit/da2ff94bcdd064109da703fa885609846a94e180))
10 changes: 4 additions & 6 deletions block/manager.go
@@ -166,6 +166,9 @@
return err
}
}
if !isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err := m.syncBlockManager()
if err != nil {
@@ -178,7 +181,6 @@
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
}
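The reordering in these two hunks moves the gossip subscription from the non-aggregator branch after `syncBlockManager` to a point before it. One plausible reading is that blocks gossiped while the node is still syncing can then be cached and applied once sync finishes, instead of being missed. A minimal, sequential sketch of that idea follows. The names `node`, `onGossip`, `syncTo`, `applyCached`, and `run` are hypothetical and invented for illustration; they are not dymint APIs, and the real manager is concurrent.

```go
package main

import "fmt"

// node is a hypothetical stand-in for the block manager: it tracks the
// applied height and caches heights that arrive ahead of it.
type node struct {
	height uint64
	cache  map[uint64]bool
}

// onGossip caches a gossiped block height that cannot be applied yet.
func (n *node) onGossip(h uint64) {
	if h > n.height {
		n.cache[h] = true
	}
}

// syncTo simulates catching up from the settlement and DA layers.
func (n *node) syncTo(target uint64) {
	if n.height < target {
		n.height = target
	}
}

// applyCached applies consecutive cached heights that follow the current one.
func (n *node) applyCached() {
	for n.cache[n.height+1] {
		delete(n.cache, n.height+1)
		n.height++
	}
}

// run models the boot sequence. gossiped holds heights that arrive while
// sync is in progress; they are only observed if the subscription exists
// before sync starts.
func run(subscribeFirst bool, target uint64, gossiped []uint64) uint64 {
	n := &node{cache: map[uint64]bool{}}
	if subscribeFirst {
		for _, h := range gossiped {
			n.onGossip(h)
		}
	}
	n.syncTo(target)
	n.applyCached()
	return n.height
}

func main() {
	fmt.Println("subscribe before sync:", run(true, 10, []uint64{11, 12}))
	fmt.Println("subscribe after sync: ", run(false, 10, []uint64{11, 12}))
}
```

In this toy model the node that subscribes first ends at height 12, while the one that subscribes after sync stays at 10 until further blocks arrive.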
@@ -236,9 +238,7 @@
// TODO: move to gossip.go
// onNewGossipedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.retrieverMutex.Lock() // needed to protect blockCache access

m.logger.Debug("Received new block via gossip", "n cachedBlocks", len(m.blockCache))
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.blockCache))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
@@ -252,8 +252,6 @@
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
}

m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

if block.Header.Height == nextHeight {
err := m.attemptApplyCachedBlocks()
if err != nil {
69 changes: 27 additions & 42 deletions block/retriever.go
@@ -3,7 +3,6 @@ package block
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
@@ -44,33 +43,33 @@ func (m *Manager) syncUntilTarget(syncTarget uint64) error {
return nil
}

var stateIndex uint64
h := m.Store.Height()
err := retry.Do(
func() error {
res, err := m.SLClient.GetHeightState(h)
if err != nil {
m.logger.Debug("sl client get height state", "error", err)
return err
}
stateIndex = res.State.StateIndex
return nil
},
retry.Attempts(0),
retry.Delay(500*time.Millisecond),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
)
if err != nil {
return fmt.Errorf("get height state: %w", err)
}
m.updateStateIndex(stateIndex - 1)
m.logger.Debug("Sync until target: updated state index pre loop", "stateIndex", stateIndex, "height", h, "syncTarget", syncTarget)

for currentHeight < syncTarget {
currStateIdx := atomic.LoadUint64(&m.LastState.SLStateIndex) + 1
m.logger.Info("Syncing until target", "height", currentHeight, "state_index", currStateIdx, "syncTarget", syncTarget)
settlementBatch, err := m.SLClient.RetrieveBatch(currStateIdx)
var stateIndex uint64
h := m.Store.Height() + 1

m.logger.Debug("Sync until target: updated state index pre loop", "height", h, "syncTarget", syncTarget)

err := retry.Do(
func() error {
res, err := m.SLClient.GetHeightState(h)
if err != nil {
m.logger.Debug("sl client get height state", "error", err)
return err
}
stateIndex = res.State.StateIndex
return nil
},
retry.Attempts(0),
retry.Delay(500*time.Millisecond),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
)
if err != nil {
return fmt.Errorf("get height state: %w", err)
}

m.logger.Info("Retrieving batch", "state_index", stateIndex)
settlementBatch, err := m.SLClient.RetrieveBatch(stateIndex)
if err != nil {
return err
}
@@ -82,32 +81,18 @@ func (m *Manager) syncUntilTarget(syncTarget uint64) error {

currentHeight = m.Store.Height()

err = m.updateStateIndex(settlementBatch.StateIndex)
if err != nil {
return err
}
}
m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget)

// check for cached blocks
err = m.attemptApplyCachedBlocks()
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying previous cached blocks", "err", err)
}

return nil
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
atomic.StoreUint64(&m.LastState.SLStateIndex, stateIndex)
_, err := m.Store.UpdateState(m.LastState, nil)
if err != nil {
m.logger.Error("update state", "error", err)
return err
}
return nil
}

func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
m.logger.Debug("trying to retrieve batch from DA", "daHeight", daMetaData.Height)
batchResp := m.fetchBatch(daMetaData)
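The reworked `syncUntilTarget` no longer tracks a persisted state index. On each loop iteration it asks the settlement layer which state index covers the next height, retrying at a fixed delay, and then retrieves that batch. The shape of that loop can be sketched with the standard library alone. Everything below is an assumption made for illustration: the `settlement` interface, `fakeSL`, `retryFixed`, and the five-blocks-per-batch layout are hypothetical, and `retryFixed` only approximates the `retry.Do` options shown in the diff (zero attempts meaning retry until success, fixed delay).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// settlement is a hypothetical, minimal view of a settlement-layer client.
type settlement interface {
	StateIndexForHeight(h uint64) (uint64, error)
	BatchEndHeight(stateIndex uint64) (uint64, error)
}

// retryFixed calls fn until it succeeds, sleeping a fixed delay between
// attempts. attempts == 0 means retry until success.
func retryFixed(attempts int, delay time.Duration, fn func() error) error {
	for i := 0; ; i++ {
		err := fn()
		if err == nil {
			return nil
		}
		if attempts > 0 && i+1 >= attempts {
			return err
		}
		time.Sleep(delay)
	}
}

// syncUntil advances height batch by batch until it reaches target,
// looking up the state index for height+1 on every iteration.
func syncUntil(sl settlement, height, target uint64) (uint64, error) {
	for height < target {
		var idx uint64
		err := retryFixed(0, time.Millisecond, func() error {
			var err error
			idx, err = sl.StateIndexForHeight(height + 1)
			return err
		})
		if err != nil {
			return height, fmt.Errorf("get height state: %w", err)
		}
		end, err := sl.BatchEndHeight(idx)
		if err != nil {
			return height, err
		}
		if end <= height {
			return height, errors.New("batch did not advance height")
		}
		height = end
	}
	return height, nil
}

// fakeSL serves batches of five blocks each and fails its first lookup
// so that the retry path is exercised.
type fakeSL struct{ calls int }

func (f *fakeSL) StateIndexForHeight(h uint64) (uint64, error) {
	f.calls++
	if f.calls == 1 {
		return 0, errors.New("not ready")
	}
	return (h-1)/5 + 1, nil
}

func (f *fakeSL) BatchEndHeight(idx uint64) (uint64, error) { return idx * 5, nil }

func main() {
	h, err := syncUntil(&fakeSL{}, 0, 12)
	fmt.Println(h, err) // prints "15 <nil>"
}
```

Because whole batches are applied, the loop can overshoot the target (here it stops at 15 for a target of 12), which matches the `currentHeight < syncTarget` loop condition in the diff.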
5 changes: 5 additions & 0 deletions da/celestia/celestia.go
@@ -2,6 +2,7 @@ package celestia

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
@@ -319,6 +320,7 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet
ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
defer cancel()

c.logger.Debug("Celestia DA getting blob", "height", daMetaData.Height, "namespace", hex.EncodeToString(daMetaData.Namespace), "commitment", hex.EncodeToString(daMetaData.Commitment))
var batches []*types.Batch
blob, err := c.rpc.Get(ctx, daMetaData.Height, daMetaData.Namespace, daMetaData.Commitment)
if err != nil {
@@ -345,6 +347,9 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet
if err != nil {
c.logger.Error("unmarshal block", "daHeight", daMetaData.Height, "error", err)
}

c.logger.Debug("Celestia DA get blob successful", "DA height", daMetaData.Height, "lastBlockHeight", batch.EndHeight)

parsedBatch := new(types.Batch)
err = parsedBatch.FromProto(&batch)
if err != nil {
4 changes: 2 additions & 2 deletions da/celestia/config.go
@@ -12,7 +12,7 @@ import (
)

const (
defaultRpcRetryDelay = 30 * time.Second
defaultRpcRetryDelay = 1 * time.Second
defaultRpcCheckAttempts = 10
namespaceVersion = 0
defaultGasPrices = 0.1
@@ -42,7 +42,7 @@ type Config struct {
var CelestiaDefaultConfig = Config{
BaseURL: "http://127.0.0.1:26658",
AppNodeURL: "",
Timeout: 30 * time.Second,
Timeout: 5 * time.Second,
Fee: 0,
GasLimit: 20000000,
GasPrices: defaultGasPrices,
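The defaults changed in this file shorten both the RPC retry delay (30s to 1s) and the request timeout (30s to 5s). To get a rough feel for the effect of the delay change, the sketch below computes the total time spent sleeping between attempts. It assumes, hypothetically, that a caller makes a fixed number of attempts and sleeps the full delay between consecutive ones; the diff does not show how `defaultRpcRetryDelay` and `defaultRpcCheckAttempts` are actually combined, and `worstCaseSleep` is an illustrative helper, not a dymint function.

```go
package main

import (
	"fmt"
	"time"
)

// worstCaseSleep returns the total time spent sleeping when every attempt
// fails, assuming one fixed delay between consecutive attempts.
func worstCaseSleep(attempts int, delay time.Duration) time.Duration {
	if attempts <= 1 {
		return 0
	}
	return time.Duration(attempts-1) * delay
}

func main() {
	const attempts = 10 // mirrors defaultRpcCheckAttempts in the diff
	fmt.Println("30s delay:", worstCaseSleep(attempts, 30*time.Second))
	fmt.Println("1s delay: ", worstCaseSleep(attempts, 1*time.Second))
}
```

Under that assumption, ten attempts sleep for 4m30s in total with the old delay and 9s with the new one, not counting the time each request itself takes.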
3 changes: 1 addition & 2 deletions utils/event/funcs.go
@@ -20,9 +20,8 @@ func MustSubscribe(
eventQuery pubsub.Query,
callback func(event pubsub.Message),
logger types.Logger,
outCapacity ...int,
) {
subscription, err := pubsubServer.Subscribe(ctx, clientID, eventQuery, outCapacity...)
subscription, err := pubsubServer.SubscribeUnbuffered(ctx, clientID, eventQuery)
if err != nil {
logger.Error("subscribe to events")
panic(err)
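This hunk drops the `outCapacity` parameter and switches `MustSubscribe` to `SubscribeUnbuffered`. The difference between a bounded buffer and an unbuffered channel can be illustrated with plain Go channels. The sketch below is not the pubsub implementation: the non-blocking send used for the buffered case is an assumption about how a publisher might treat a full buffer, and `droppedWithBuffer` and `deliveredUnbuffered` are hypothetical helpers.

```go
package main

import "fmt"

// droppedWithBuffer publishes n messages with non-blocking sends into a
// channel of the given capacity while nobody is receiving, and returns
// how many messages did not fit.
func droppedWithBuffer(capacity, n int) int {
	out := make(chan int, capacity)
	dropped := 0
	for i := 0; i < n; i++ {
		select {
		case out <- i:
		default:
			dropped++
		}
	}
	return dropped
}

// deliveredUnbuffered publishes n messages with blocking sends on an
// unbuffered channel and returns how many the receiver observed.
func deliveredUnbuffered(n int) int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i // waits until the receiver is ready
		}
	}()
	delivered := 0
	for range out {
		delivered++
	}
	return delivered
}

func main() {
	fmt.Println("dropped with capacity 2:", droppedWithBuffer(2, 5))
	fmt.Println("delivered unbuffered:  ", deliveredUnbuffered(5))
}
```

The trade-off in this model is that the unbuffered publisher never loses a message but is paced by the slowest receiver, whereas the bounded buffer keeps the publisher moving at the cost of overflow when the receiver falls behind.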