Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into release/v0.5.x
Browse files Browse the repository at this point in the history
  • Loading branch information
omritoptix committed Jul 26, 2023
2 parents 7244e68 + e5a48aa commit 110dc3a
Show file tree
Hide file tree
Showing 20 changed files with 265 additions and 121 deletions.
6 changes: 4 additions & 2 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
// for aggregator: get notification that batch has been accepted so can send next batch.
func (m *Manager) SyncTargetLoop(ctx context.Context) {
m.logger.Info("Started sync target loop")
subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted)
if err != nil {
m.logger.Error("failed to subscribe to state update events")
panic(err)
Expand All @@ -368,7 +368,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
return
case event := <-subscription.Out():
m.logger.Info("Received state update event", "eventData", event.Data())
eventData := event.Data().(*settlement.EventDataNewSettlementBatchAccepted)
eventData := event.Data().(*settlement.EventDataNewBatchAccepted)
m.updateSyncParams(ctx, eventData.EndHeight)
// In case we are the aggregator and we've got an update, then we can stop blocking from
// the next batches to be published. For non-aggregators this is not needed.
Expand All @@ -385,6 +385,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {

// updateSyncParams updates the sync target and state index if necessary
func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
rollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
atomic.StoreUint64(&m.syncTarget, endHeight)
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
Expand Down Expand Up @@ -707,6 +708,7 @@ func (m *Manager) produceBlock(ctx context.Context, allowEmpty bool) error {
}

m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs))
rollappHeightGauge.Set(float64(newHeight))

//TODO: move to separate function
lastSubmissionTime := atomic.LoadInt64(&m.lastSubmissionTime)
Expand Down
15 changes: 15 additions & 0 deletions block/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package block

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var rollappHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "rollapp_height",
Help: "The height of the local rollapp",
})
var rollappHubHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{
Name: "rollapp_hub_height",
Help: "The latest height of the Rollapp that has been synced to the hub.",
})
20 changes: 16 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ type NodeConfig struct {
// parameters below are dymint specific and read from config
Aggregator bool `mapstructure:"aggregator"`
BlockManagerConfig `mapstructure:",squash"`
DALayer string `mapstructure:"da_layer"`
DAConfig string `mapstructure:"da_config"`
SettlementLayer string `mapstructure:"settlement_layer"`
SettlementConfig settlement.Config `mapstructure:",squash"`
DALayer string `mapstructure:"da_layer"`
DAConfig string `mapstructure:"da_config"`
SettlementLayer string `mapstructure:"settlement_layer"`
SettlementConfig settlement.Config `mapstructure:",squash"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
}

// BlockManagerConfig consists of all parameters required by BlockManagerConfig
Expand Down Expand Up @@ -131,3 +132,14 @@ func (c NodeConfig) Validate() error {

return nil
}

// InstrumentationConfig defines the configuration for metrics reporting.
type InstrumentationConfig struct {
// When true, Prometheus metrics are served under /metrics on
// PrometheusListenAddr.
// Check out the documentation for the list of available metrics.
Prometheus bool `mapstructure:"prometheus"`

// Address to listen for Prometheus collector(s) connections.
PrometheusListenAddr string `mapstructure:"prometheus_listen_addr"`
}
4 changes: 4 additions & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func DefaultConfig(home, chainId string) *NodeConfig {
BlockBatchMaxSizeBytes: 1500000},
DALayer: "mock",
SettlementLayer: "mock",
Instrumentation: &InstrumentationConfig{
Prometheus: false,
PrometheusListenAddr: ":2112",
},
}

if home == "" {
Expand Down
21 changes: 18 additions & 3 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ block_batch_size = {{ .BlockManagerConfig.BlockBatchSize }}
batch_submit_max_time = "{{ .BlockManagerConfig.BatchSubmitMaxTime }}"
### da config ###
da_layer = "{{ .DALayer }}" # mock, celestia
da_layer = "{{ .DALayer }}" # mock, celestia, avail
namespace_id = "{{ .BlockManagerConfig.NamespaceID }}"
da_config = "{{ .DAConfig }}"
# max size of batch in bytes that can be accepted by DA
block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }}
#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"fee\":20000, \"gas_limit\": 20000000, \"namespace_id\":\"000000000000ffff\"}"
# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_limit\": 20000000, \"namespace_id\":\"000000000000ffff\"}"
# Avail config example:
# da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}",
# da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}"
### settlement config ###
settlement_layer = "{{ .SettlementLayer }}" # mock, dymension
Expand All @@ -103,4 +103,19 @@ gas_fees = "{{ .SettlementConfig.GasFees }}"
keyring_backend = "{{ .SettlementConfig.KeyringBackend }}"
keyring_home_dir = "{{ .SettlementConfig.KeyringHomeDir }}"
dym_account_name = "{{ .SettlementConfig.DymAccountName }}"
#######################################################
### Instrumentation Configuration Options ###
#######################################################
[instrumentation]
# When true, Prometheus metrics are served under /metrics on
# PrometheusListenAddr.
# Check out the documentation for the list of available metrics.
prometheus = {{ .Instrumentation.Prometheus }}
# Address to listen for Prometheus collector(s) connections
prometheus_listen_addr = "{{ .Instrumentation.PrometheusListenAddr }}"
`
117 changes: 62 additions & 55 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
rpcclient "github.com/tendermint/tendermint/rpc/client"
httprpcclient "github.com/tendermint/tendermint/rpc/client/http"

"github.com/celestiaorg/go-cnc"
cnc "github.com/celestiaorg/go-cnc"
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/store"
Expand All @@ -22,9 +22,9 @@ import (
)

type CNCClientI interface {
SubmitPFD(ctx context.Context, namespaceID [8]byte, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error)
NamespacedShares(ctx context.Context, namespaceID [8]byte, height uint64) ([][]byte, error)
NamespacedData(ctx context.Context, namespaceID [8]byte, height uint64) ([][]byte, error)
SubmitPFB(ctx context.Context, namespaceID cnc.Namespace, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error)
NamespacedShares(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error)
NamespacedData(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error)
}

// DataAvailabilityLayerClient use celestia-node public API.
Expand Down Expand Up @@ -95,6 +95,14 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
return err
}

if c.config.GasPrices != 0 && c.config.Fee != 0 {
return errors.New("can't set both gas prices and fee")
}

if c.config.Fee == 0 && c.config.GasPrices == 0 {
return errors.New("fee or gas prices must be set")
}

c.pubsubServer = pubsubServer
// Set defaults
c.txPollingRetryDelay = defaultTxPollingRetryDelay
Expand Down Expand Up @@ -159,62 +167,61 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
c.logger.Debug("Context cancelled")
return da.ResultSubmitBatch{}
default:
txResponse, err := c.client.SubmitPFD(c.ctx, c.config.NamespaceID, blob, c.config.Fee, c.config.GasLimit)
if txResponse != nil {
if txResponse.Code != 0 {
c.logger.Debug("Failed to submit DA batch. Emitting health event and trying again", "txResponse", txResponse, "error", err)
// Publish an health event. Only if we failed to emit the event we return an error.
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New(txResponse.RawLog))
if err != nil {
return res
}
} else if err != nil {
// Here we assume that if txResponse is not nil and also error is not nil it means that the transaction
// was submitted (not necessarily accepted) and we still didn't get a clear status regarding it (e.g timeout).
// hence trying to poll for it.
c.logger.Debug("Failed to receive DA batch inclusion result. Waiting for inclusion", "txResponse", txResponse, "error", err)
inclusionHeight, err := c.waitForTXInclusion(txResponse.TxHash)
if err == nil {
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil)
if err != nil {
return res
} else {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "tx hash: " + txResponse.TxHash,
DAHeight: inclusionHeight,
},
}
}
} else {
c.logger.Debug("Failed to receive DA batch inclusion result. Emitting health event and trying again", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
return res
}
}

} else {
c.logger.Debug("Successfully submitted DA batch", "txResponse", txResponse)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil)
if err != nil {
return res
}
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "tx hash: " + txResponse.TxHash,
DAHeight: uint64(txResponse.Height),
},
}
estimatedGas := EstimateGas(len(blob))
gasWanted := uint64(float64(estimatedGas) * gasAdjustment)
fees := c.calculateFees(gasWanted)

//SubmitPFB sets an error if the txResponse has error, so we check check the txResponse for error
txResponse, err := c.client.SubmitPFB(c.ctx, c.config.NamespaceID, blob, int64(fees), gasWanted)
if txResponse == nil {
c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
return res
}
} else {
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New("DA txResponse is nil"))
time.Sleep(c.submitRetryDelay)
continue
}

if txResponse.Code != 0 {
c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "txResponse", txResponse.RawLog, "code", txResponse.Code)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New(txResponse.RawLog))
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)
continue
}

// Here we assume that if txResponse is not nil and also error is not nil it means that the transaction
// was submitted (not necessarily accepted) and we still didn't get a clear status regarding it (e.g timeout).
// hence trying to poll for it.
daHeight := uint64(txResponse.Height)
if daHeight == 0 {
c.logger.Debug("Failed to receive DA batch inclusion result. Waiting for inclusion", "txHash", txResponse.TxHash)
daHeight, err = c.waitForTXInclusion(txResponse.TxHash)
if err != nil {
c.logger.Error("Failed to receive DA batch inclusion result. Emitting health event and trying again", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)
continue
}
}

c.logger.Info("Successfully submitted DA batch", "txHash", txResponse.TxHash, "daHeight", txResponse.Height, "gasWanted", txResponse.GasWanted, "gasUsed", txResponse.GasUsed)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil)
if err != nil {
return res
}
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "tx hash: " + txResponse.TxHash,
DAHeight: daHeight,
},
}
}
}
Expand Down
44 changes: 22 additions & 22 deletions da/celestia/celestia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,74 +24,74 @@ import (
)

const (
submitPFDFuncName = "SubmitPFD"
submitPFBFuncName = "SubmitPFB"
TxFuncName = "Tx"
)

func TestSubmitBatch(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
configBytes, err := json.Marshal(celestia.Config{})
configBytes, err := json.Marshal(celestia.CelestiaDefaultConfig)
require.NoError(err)
batch := &types.Batch{
StartHeight: 0,
EndHeight: 1,
}
cases := []struct {
name string
submitPFDReturn []interface{}
submitPFBReturn []interface{}
sumbitPFDRun func(args mock.Arguments)
TxFnReturn []interface{}
TxFnRun func(args mock.Arguments)
isSubmitBatchAsync bool
expectedSubmitPFDMinCalls int
expectedSubmitPFBMinCalls int
expectedInclusionHeight int
expectedHealthEvent *da.EventDataDAHealthStatus
}{
{
name: "TestSubmitPFDResponseNil",
submitPFDReturn: []interface{}{nil, nil},
name: "TestSubmitPFBResponseNil",
submitPFBReturn: []interface{}{nil, nil},
sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) },
isSubmitBatchAsync: true,
expectedSubmitPFDMinCalls: 2,
expectedSubmitPFBMinCalls: 2,
expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: false},
},
{
name: "TestSubmitPFDResponseCodeSuccess",
submitPFDReturn: []interface{}{&cnc.TxResponse{Code: 0, Height: int64(143)}, nil},
name: "TestSubmitPFBResponseCodeSuccess",
submitPFBReturn: []interface{}{&cnc.TxResponse{Code: 0, Height: int64(143)}, nil},
sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) },
isSubmitBatchAsync: false,
expectedSubmitPFDMinCalls: 1,
expectedSubmitPFBMinCalls: 1,
expectedInclusionHeight: 143,
expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: true},
},
{
name: "TestSubmitPFDResponseCodeFailure",
submitPFDReturn: []interface{}{&cnc.TxResponse{Code: 1}, nil},
name: "TestSubmitPFBResponseCodeFailure",
submitPFBReturn: []interface{}{&cnc.TxResponse{Code: 1}, nil},
sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) },
isSubmitBatchAsync: true,
expectedSubmitPFDMinCalls: 2,
expectedSubmitPFBMinCalls: 2,
expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: false},
},
{
name: "TestSubmitPFDDelayedInclusion",
submitPFDReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")},
name: "TestSubmitPFBDelayedInclusion",
submitPFBReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")},
sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) },
TxFnReturn: []interface{}{&coretypes.ResultTx{Hash: bytes.HexBytes("1234"), Height: int64(145)}, nil},
TxFnRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) },
isSubmitBatchAsync: false,
expectedSubmitPFDMinCalls: 1,
expectedSubmitPFBMinCalls: 1,
expectedInclusionHeight: 145,
expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: true},
},
{
name: "TestSubmitPFDDelayedInclusionTxNotFound",
submitPFDReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")},
name: "TestSubmitPFBDelayedInclusionTxNotFound",
submitPFBReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")},
sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) },
TxFnReturn: []interface{}{nil, errors.New("notFound")},
TxFnRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) },
isSubmitBatchAsync: true,
expectedSubmitPFDMinCalls: 2,
expectedSubmitPFBMinCalls: 2,
expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: false},
},
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestSubmitBatch(t *testing.T) {
err = dalc.Start()
require.NoError(err)
// Set the mock functions
mockCNCClient.On(submitPFDFuncName, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.submitPFDReturn...).Run(tc.sumbitPFDRun)
mockCNCClient.On(submitPFBFuncName, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.submitPFBReturn...).Run(tc.sumbitPFDRun)
rpcmockClient.On(TxFuncName, mock.Anything, mock.Anything, mock.Anything).Return(tc.TxFnReturn...).Run(tc.TxFnRun)
if tc.isSubmitBatchAsync {
go dalc.SubmitBatch(batch)
Expand All @@ -146,8 +146,8 @@ func TestSubmitBatch(t *testing.T) {
err = dalc.Stop()
require.NoError(err)
// Wait for the goroutines to finish before accessing the mock calls
time.Sleep(1 * time.Second)
time.Sleep(3 * time.Second)
t.Log("Verifying mock calls")
assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFDFuncName), tc.expectedSubmitPFDMinCalls)
assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFBFuncName), tc.expectedSubmitPFBMinCalls)
}
}
Loading

0 comments on commit 110dc3a

Please sign in to comment.