diff --git a/block/manager.go b/block/manager.go index 0d3eba315..bd7e8cb13 100644 --- a/block/manager.go +++ b/block/manager.go @@ -350,7 +350,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) { // for aggregator: get notification that batch has been accepted so can send next batch. func (m *Manager) SyncTargetLoop(ctx context.Context) { m.logger.Info("Started sync target loop") - subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted) + subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted) if err != nil { m.logger.Error("failed to subscribe to state update events") panic(err) @@ -368,7 +368,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) { return case event := <-subscription.Out(): m.logger.Info("Received state update event", "eventData", event.Data()) - eventData := event.Data().(*settlement.EventDataNewSettlementBatchAccepted) + eventData := event.Data().(*settlement.EventDataNewBatchAccepted) m.updateSyncParams(ctx, eventData.EndHeight) // In case we are the aggregator and we've got an update, then we can stop blocking from // the next batches to be published. For non-aggregators this is not needed. @@ -385,6 +385,7 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) { // updateSyncParams updates the sync target and state index if necessary func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) { + rollappHubHeightGauge.Set(float64(endHeight)) m.logger.Info("Received new syncTarget", "syncTarget", endHeight) atomic.StoreUint64(&m.syncTarget, endHeight) atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano()) @@ -707,6 +708,7 @@ func (m *Manager) produceBlock(ctx context.Context, allowEmpty bool) error { } m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs)) + rollappHeightGauge.Set(float64(newHeight)) //TODO: move to separate function lastSubmissionTime := atomic.LoadInt64(&m.lastSubmissionTime) diff --git a/block/metrics.go b/block/metrics.go new file mode 100644 index 000000000..430a428c2 --- /dev/null +++ b/block/metrics.go @@ -0,0 +1,15 @@ +package block + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var rollappHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "rollapp_height", + Help: "The height of the local rollapp", +}) +var rollappHubHeightGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "rollapp_hub_height", + Help: "The latest height of the Rollapp that has been synced to the hub.", +}) diff --git a/config/config.go b/config/config.go index a6d0155f2..a03ce15ca 100644 --- a/config/config.go +++ b/config/config.go @@ -27,10 +27,11 @@ type NodeConfig struct { // parameters below are dymint specific and read from config Aggregator bool `mapstructure:"aggregator"` BlockManagerConfig `mapstructure:",squash"` - DALayer string `mapstructure:"da_layer"` - DAConfig string `mapstructure:"da_config"` - SettlementLayer string `mapstructure:"settlement_layer"` - SettlementConfig settlement.Config `mapstructure:",squash"` + DALayer string `mapstructure:"da_layer"` + DAConfig string `mapstructure:"da_config"` + SettlementLayer string `mapstructure:"settlement_layer"` + SettlementConfig settlement.Config `mapstructure:",squash"` + Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"` } // BlockManagerConfig consists of all parameters required by BlockManagerConfig @@ -131,3 +132,14 @@ func (c NodeConfig) Validate() error { return nil } + +// InstrumentationConfig defines the configuration for metrics reporting. +type InstrumentationConfig struct { + // When true, Prometheus metrics are served under /metrics on + // PrometheusListenAddr. + // Check out the documentation for the list of available metrics. + Prometheus bool `mapstructure:"prometheus"` + + // Address to listen for Prometheus collector(s) connections. + PrometheusListenAddr string `mapstructure:"prometheus_listen_addr"` +} diff --git a/config/defaults.go b/config/defaults.go index d29d2b8d0..918f615f2 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -36,6 +36,10 @@ func DefaultConfig(home, chainId string) *NodeConfig { BlockBatchMaxSizeBytes: 1500000}, DALayer: "mock", SettlementLayer: "mock", + Instrumentation: &InstrumentationConfig{ + Prometheus: false, + PrometheusListenAddr: ":2112", + }, } if home == "" { diff --git a/config/toml.go b/config/toml.go index 8f4f55038..632f6fcc5 100644 --- a/config/toml.go +++ b/config/toml.go @@ -77,7 +77,7 @@ block_batch_size = {{ .BlockManagerConfig.BlockBatchSize }} batch_submit_max_time = "{{ .BlockManagerConfig.BatchSubmitMaxTime }}" ### da config ### -da_layer = "{{ .DALayer }}" # mock, celestia +da_layer = "{{ .DALayer }}" # mock, celestia, avail namespace_id = "{{ .BlockManagerConfig.NamespaceID }}" da_config = "{{ .DAConfig }}" @@ -85,9 +85,9 @@ da_config = "{{ .DAConfig }}" block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }} #celestia config example: -# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"fee\":20000, \"gas_limit\": 20000000, \"namespace_id\":\"000000000000ffff\"}" +# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_limit\": 20000000, \"namespace_id\":\"000000000000ffff\"}" # Avail config example: -# da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}", +# da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}" ### settlement config ### settlement_layer = "{{ .SettlementLayer }}" # mock, dymension @@ -103,4 +103,19 @@ gas_fees = "{{ .SettlementConfig.GasFees }}" keyring_backend = "{{ .SettlementConfig.KeyringBackend }}" keyring_home_dir = "{{ .SettlementConfig.KeyringHomeDir }}" dym_account_name = "{{ .SettlementConfig.DymAccountName }}" + +####################################################### +### Instrumentation Configuration Options ### +####################################################### +[instrumentation] + +# When true, Prometheus metrics are served under /metrics on +# PrometheusListenAddr. +# Check out the documentation for the list of available metrics. +prometheus = {{ .Instrumentation.Prometheus }} + +# Address to listen for Prometheus collector(s) connections +prometheus_listen_addr = "{{ .Instrumentation.PrometheusListenAddr }}" + + ` diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index 71a189e6b..ec011a0cf 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -13,7 +13,7 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/client" httprpcclient "github.com/tendermint/tendermint/rpc/client/http" - "github.com/celestiaorg/go-cnc" + cnc "github.com/celestiaorg/go-cnc" "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/log" "github.com/dymensionxyz/dymint/store" @@ -22,9 +22,9 @@ import ( ) type CNCClientI interface { - SubmitPFD(ctx context.Context, namespaceID [8]byte, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error) - NamespacedShares(ctx context.Context, namespaceID [8]byte, height uint64) ([][]byte, error) - NamespacedData(ctx context.Context, namespaceID [8]byte, height uint64) ([][]byte, error) + SubmitPFB(ctx context.Context, namespaceID cnc.Namespace, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error) + NamespacedShares(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error) + NamespacedData(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error) } // DataAvailabilityLayerClient use celestia-node public API. @@ -95,6 +95,14 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S return err } + if c.config.GasPrices != 0 && c.config.Fee != 0 { + return errors.New("can't set both gas prices and fee") + } + + if c.config.Fee == 0 && c.config.GasPrices == 0 { + return errors.New("fee or gas prices must be set") + } + c.pubsubServer = pubsubServer // Set defaults c.txPollingRetryDelay = defaultTxPollingRetryDelay @@ -159,62 +167,61 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS c.logger.Debug("Context cancelled") return da.ResultSubmitBatch{} default: - txResponse, err := c.client.SubmitPFD(c.ctx, c.config.NamespaceID, blob, c.config.Fee, c.config.GasLimit) - if txResponse != nil { - if txResponse.Code != 0 { - c.logger.Debug("Failed to submit DA batch. Emitting health event and trying again", "txResponse", txResponse, "error", err) - // Publish an health event. Only if we failed to emit the event we return an error. - res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New(txResponse.RawLog)) - if err != nil { - return res - } - } else if err != nil { - // Here we assume that if txResponse is not nil and also error is not nil it means that the transaction - // was submitted (not necessarily accepted) and we still didn't get a clear status regarding it (e.g timeout). - // hence trying to poll for it. - c.logger.Debug("Failed to receive DA batch inclusion result. Waiting for inclusion", "txResponse", txResponse, "error", err) - inclusionHeight, err := c.waitForTXInclusion(txResponse.TxHash) - if err == nil { - res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil) - if err != nil { - return res - } else { - return da.ResultSubmitBatch{ - BaseResult: da.BaseResult{ - Code: da.StatusSuccess, - Message: "tx hash: " + txResponse.TxHash, - DAHeight: inclusionHeight, - }, - } - } - } else { - c.logger.Debug("Failed to receive DA batch inclusion result. Emitting health event and trying again", "error", err) - res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err) - if err != nil { - return res - } - } - - } else { - c.logger.Debug("Successfully submitted DA batch", "txResponse", txResponse) - res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil) - if err != nil { - return res - } - return da.ResultSubmitBatch{ - BaseResult: da.BaseResult{ - Code: da.StatusSuccess, - Message: "tx hash: " + txResponse.TxHash, - DAHeight: uint64(txResponse.Height), - }, - } + estimatedGas := EstimateGas(len(blob)) + gasWanted := uint64(float64(estimatedGas) * gasAdjustment) + fees := c.calculateFees(gasWanted) + + //SubmitPFB sets an error if the txResponse has error, so we check check the txResponse for error + txResponse, err := c.client.SubmitPFB(c.ctx, c.config.NamespaceID, blob, int64(fees), gasWanted) + if txResponse == nil { + c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "error", err) + res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err) + if err != nil { + return res } - } else { - res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New("DA txResponse is nil")) + time.Sleep(c.submitRetryDelay) + continue + } + + if txResponse.Code != 0 { + c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "txResponse", txResponse.RawLog, "code", txResponse.Code) + res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New(txResponse.RawLog)) if err != nil { return res } time.Sleep(c.submitRetryDelay) + continue + } + + // Here we assume that if txResponse is not nil and also error is not nil it means that the transaction + // was submitted (not necessarily accepted) and we still didn't get a clear status regarding it (e.g timeout). + // hence trying to poll for it. + daHeight := uint64(txResponse.Height) + if daHeight == 0 { + c.logger.Debug("Failed to receive DA batch inclusion result. Waiting for inclusion", "txHash", txResponse.TxHash) + daHeight, err = c.waitForTXInclusion(txResponse.TxHash) + if err != nil { + c.logger.Error("Failed to receive DA batch inclusion result. Emitting health event and trying again", "error", err) + res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err) + if err != nil { + return res + } + time.Sleep(c.submitRetryDelay) + continue + } + } + + c.logger.Info("Successfully submitted DA batch", "txHash", txResponse.TxHash, "daHeight", txResponse.Height, "gasWanted", txResponse.GasWanted, "gasUsed", txResponse.GasUsed) + res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil) + if err != nil { + return res + } + return da.ResultSubmitBatch{ + BaseResult: da.BaseResult{ + Code: da.StatusSuccess, + Message: "tx hash: " + txResponse.TxHash, + DAHeight: daHeight, + }, } } } diff --git a/da/celestia/celestia_test.go b/da/celestia/celestia_test.go index 849272abc..790564b1c 100644 --- a/da/celestia/celestia_test.go +++ b/da/celestia/celestia_test.go @@ -24,14 +24,14 @@ import ( ) const ( - submitPFDFuncName = "SubmitPFD" + submitPFBFuncName = "SubmitPFB" TxFuncName = "Tx" ) func TestSubmitBatch(t *testing.T) { assert := assert.New(t) require := require.New(t) - configBytes, err := json.Marshal(celestia.Config{}) + configBytes, err := json.Marshal(celestia.CelestiaDefaultConfig) require.NoError(err) batch := &types.Batch{ StartHeight: 0, @@ -39,59 +39,59 @@ func TestSubmitBatch(t *testing.T) { } cases := []struct { name string - submitPFDReturn []interface{} + submitPFBReturn []interface{} sumbitPFDRun func(args mock.Arguments) TxFnReturn []interface{} TxFnRun func(args mock.Arguments) isSubmitBatchAsync bool - expectedSubmitPFDMinCalls int + expectedSubmitPFBMinCalls int expectedInclusionHeight int expectedHealthEvent *da.EventDataDAHealthStatus }{ { - name: "TestSubmitPFDResponseNil", - submitPFDReturn: []interface{}{nil, nil}, + name: "TestSubmitPFBResponseNil", + submitPFBReturn: []interface{}{nil, nil}, sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, isSubmitBatchAsync: true, - expectedSubmitPFDMinCalls: 2, + expectedSubmitPFBMinCalls: 2, expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: false}, }, { - name: "TestSubmitPFDResponseCodeSuccess", - submitPFDReturn: []interface{}{&cnc.TxResponse{Code: 0, Height: int64(143)}, nil}, + name: "TestSubmitPFBResponseCodeSuccess", + submitPFBReturn: []interface{}{&cnc.TxResponse{Code: 0, Height: int64(143)}, nil}, sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, isSubmitBatchAsync: false, - expectedSubmitPFDMinCalls: 1, + expectedSubmitPFBMinCalls: 1, expectedInclusionHeight: 143, expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: true}, }, { - name: "TestSubmitPFDResponseCodeFailure", - submitPFDReturn: []interface{}{&cnc.TxResponse{Code: 1}, nil}, + name: "TestSubmitPFBResponseCodeFailure", + submitPFBReturn: []interface{}{&cnc.TxResponse{Code: 1}, nil}, sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, isSubmitBatchAsync: true, - expectedSubmitPFDMinCalls: 2, + expectedSubmitPFBMinCalls: 2, expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: false}, }, { - name: "TestSubmitPFDDelayedInclusion", - submitPFDReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")}, + name: "TestSubmitPFBDelayedInclusion", + submitPFBReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")}, sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, TxFnReturn: []interface{}{&coretypes.ResultTx{Hash: bytes.HexBytes("1234"), Height: int64(145)}, nil}, TxFnRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, isSubmitBatchAsync: false, - expectedSubmitPFDMinCalls: 1, + expectedSubmitPFBMinCalls: 1, expectedInclusionHeight: 145, expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: true}, }, { - name: "TestSubmitPFDDelayedInclusionTxNotFound", - submitPFDReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")}, + name: "TestSubmitPFBDelayedInclusionTxNotFound", + submitPFBReturn: []interface{}{&cnc.TxResponse{TxHash: "1234"}, errors.New("timeout")}, sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, TxFnReturn: []interface{}{nil, errors.New("notFound")}, TxFnRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, isSubmitBatchAsync: true, - expectedSubmitPFDMinCalls: 2, + expectedSubmitPFBMinCalls: 2, expectedHealthEvent: &da.EventDataDAHealthStatus{Healthy: false}, }, } @@ -119,7 +119,7 @@ func TestSubmitBatch(t *testing.T) { err = dalc.Start() require.NoError(err) // Set the mock functions - mockCNCClient.On(submitPFDFuncName, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.submitPFDReturn...).Run(tc.sumbitPFDRun) + mockCNCClient.On(submitPFBFuncName, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.submitPFBReturn...).Run(tc.sumbitPFDRun) rpcmockClient.On(TxFuncName, mock.Anything, mock.Anything, mock.Anything).Return(tc.TxFnReturn...).Run(tc.TxFnRun) if tc.isSubmitBatchAsync { go dalc.SubmitBatch(batch) @@ -146,8 +146,8 @@ func TestSubmitBatch(t *testing.T) { err = dalc.Stop() require.NoError(err) // Wait for the goroutines to finish before accessing the mock calls - time.Sleep(1 * time.Second) + time.Sleep(3 * time.Second) t.Log("Verifying mock calls") - assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFDFuncName), tc.expectedSubmitPFDMinCalls) + assert.GreaterOrEqual(testutil.CountMockCalls(mockCNCClient.Calls, submitPFBFuncName), tc.expectedSubmitPFBMinCalls) } } diff --git a/da/celestia/config.go b/da/celestia/config.go index a3be4ad0a..2e7180e79 100644 --- a/da/celestia/config.go +++ b/da/celestia/config.go @@ -3,12 +3,17 @@ package celestia import ( "encoding/hex" "time" + + cnc "github.com/celestiaorg/go-cnc" ) const ( defaultTxPollingRetryDelay = 20 * time.Second defaultSubmitRetryDelay = 10 * time.Second defaultTxPollingAttempts = 5 + namespaceVersion = 0 + defaultGasPrices = 0.1 + gasAdjustment = 1.3 ) // Config stores Celestia DALC configuration parameters. @@ -17,19 +22,21 @@ type Config struct { AppNodeURL string `json:"app_node_url"` Timeout time.Duration `json:"timeout"` Fee int64 `json:"fee"` + GasPrices float64 `json:"gas_prices"` GasLimit uint64 `json:"gas_limit"` NamespaceIDStr string `json:"namespace_id"` - NamespaceID [8]byte `json:"-"` + NamespaceID cnc.Namespace `json:"-"` } var CelestiaDefaultConfig = Config{ BaseURL: "http://127.0.0.1:26659", AppNodeURL: "", Timeout: 30 * time.Second, - Fee: 20000, + Fee: 0, GasLimit: 20000000, + GasPrices: defaultGasPrices, NamespaceIDStr: "000000000000ffff", - NamespaceID: [8]byte{0, 0, 0, 0, 0, 0, 255, 255}, + NamespaceID: cnc.Namespace{Version: namespaceVersion, ID: []byte{0, 0, 0, 0, 0, 0, 255, 255}}, } func (c *Config) InitNamespaceID() error { @@ -38,6 +45,14 @@ func (c *Config) InitNamespaceID() error { if err != nil { return err } - copy(c.NamespaceID[:], namespaceBytes) + // TODO(omritoptix): a hack. need to enforce in the config + if len(namespaceBytes) != cnc.NamespaceIDSize { + // pad namespaceBytes with 0s + namespaceBytes = append(make([]byte, cnc.NamespaceIDSize-len(namespaceBytes)), namespaceBytes...) + } + c.NamespaceID, err = cnc.New(namespaceVersion, namespaceBytes) + if err != nil { + return err + } return nil } diff --git a/da/celestia/fees.go b/da/celestia/fees.go new file mode 100644 index 000000000..8af0c9f09 --- /dev/null +++ b/da/celestia/fees.go @@ -0,0 +1,22 @@ +package celestia + +const ( + perByteGasTolerance = 2 + pfbGasFixedCost = 80000 + defaultGasPerBlobByte = 8 +) + +func (c *DataAvailabilityLayerClient) calculateFees(gas uint64) int64 { + fees := c.config.Fee + if fees == 0 { + fees = int64(c.config.GasPrices * float64(gas)) + } + + return fees +} + +// EstimateGas estimates the gas required to pay for a set of blobs in a PFB. +func EstimateGas(blobSizes int) uint64 { + variableGasAmount := (defaultGasPerBlobByte + perByteGasTolerance) * blobSizes + return uint64(variableGasAmount + pfbGasFixedCost) +} diff --git a/da/celestia/mock/server.go b/da/celestia/mock/server.go index 5215d2ed2..986f470f7 100644 --- a/da/celestia/mock/server.go +++ b/da/celestia/mock/server.go @@ -66,7 +66,7 @@ func (s *Server) Stop() { func (s *Server) getHandler() http.Handler { mux := mux2.NewRouter() - mux.HandleFunc("/submit_pfd", s.submit).Methods(http.MethodPost) + mux.HandleFunc("/submit_pfb", s.submit).Methods(http.MethodPost) mux.HandleFunc("/namespaced_shares/{namespace}/height/{height}", s.shares).Methods(http.MethodGet) mux.HandleFunc("/namespaced_data/{namespace}/height/{height}", s.data).Methods(http.MethodGet) @@ -74,7 +74,7 @@ func (s *Server) getHandler() http.Handler { } func (s *Server) submit(w http.ResponseWriter, r *http.Request) { - req := cnc.SubmitPFDRequest{} + req := cnc.SubmitPFBRequest{} err := json.NewDecoder(r.Body).Decode(&req) if err != nil { s.writeError(w, err) diff --git a/da/da_test.go b/da/da_test.go index 3cf52d0b8..c1210cfcd 100644 --- a/da/da_test.go +++ b/da/da_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/celestiaorg/go-cnc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/pubsub" @@ -98,7 +99,8 @@ func doTestDALC(t *testing.T, dalc da.DataAvailabilityLayerClient) { BaseURL: "http://localhost:26658", Timeout: 30 * time.Second, GasLimit: 3000000, - NamespaceID: [8]byte{0, 1, 2, 3, 4, 5, 6, 7}, + Fee: 200000000, + NamespaceID: cnc.Namespace{Version: 0, ID: []byte{0, 0, 0, 0, 0, 0, 255, 255}}, } conf, _ = json.Marshal(config) } @@ -218,7 +220,8 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) { BaseURL: "http://localhost:26658", Timeout: 30 * time.Second, GasLimit: 3000000, - NamespaceID: [8]byte{0, 1, 2, 3, 4, 5, 6, 7}, + Fee: 2000000, + NamespaceID: cnc.Namespace{Version: 0, ID: []byte{0, 0, 0, 0, 0, 0, 255, 255}}, } conf, _ = json.Marshal(config) } diff --git a/go.mod b/go.mod index 75108a415..0821ebd51 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,12 @@ go 1.19 require ( code.cloudfoundry.org/go-diodes v0.0.0-20220725190411-383eb6634c40 + cosmossdk.io/errors v1.0.0-beta.7 github.com/avast/retry-go v3.0.0+incompatible - github.com/celestiaorg/go-cnc v0.2.0 + github.com/celestiaorg/go-cnc v0.4.2 github.com/centrifuge/go-substrate-rpc-client/v4 v4.0.12 github.com/dgraph-io/badger/v3 v3.2103.3 - github.com/dymensionxyz/cosmosclient v0.3.0-beta.0.20230621132116-77eb2ae5ab92 + github.com/dymensionxyz/cosmosclient v0.4.0-beta github.com/dymensionxyz/dymension v0.2.0-beta.0.20230607115558-745644a96ea6 github.com/go-kit/kit v0.12.0 github.com/gofrs/uuid v4.3.0+incompatible @@ -38,7 +39,6 @@ require ( ) require ( - cosmossdk.io/errors v1.0.0-beta.7 // indirect cosmossdk.io/math v1.0.0-rc.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/StackExchange/wmi v1.2.1 // indirect @@ -242,7 +242,6 @@ require ( ) replace ( - github.com/celestiaorg/go-cnc => github.com/dymensionxyz/go-cnc v0.2.2 github.com/centrifuge/go-substrate-rpc-client/v4 => github.com/availproject/go-substrate-rpc-client/v4 v4.0.12-avail-1.4.0-rc1-5e286e3 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.2-alpha.regen.4 github.com/gorilla/rpc => github.com/dymensionxyz/rpc v1.3.1 diff --git a/go.sum b/go.sum index a2244beed..7620c7b62 100644 --- a/go.sum +++ b/go.sum @@ -110,6 +110,8 @@ github.com/btcsuite/btcd/btcutil v1.1.2 h1:XLMbX8JQEiwMcYft2EGi8zPUkoa0abKIU6/BJ github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce h1:YtWJF7RHm2pYCvA5t0RPmAaLUhREsKuKd+SLhxFbFeQ= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= +github.com/celestiaorg/go-cnc v0.4.2 h1:7ixf3tevMB7Lvz2mbyRG0ZOK+8qoPm7wNhdgpi8VreU= +github.com/celestiaorg/go-cnc v0.4.2/go.mod h1:zYzvHudSd1iNPuHBMyvZ1YvWou5aT9JXgtch9Tkaf70= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= @@ -206,12 +208,10 @@ github.com/dustin/go-humanize v1.0.1-0.20200219035652-afde56e7acac h1:opbrjaN/L8 github.com/dustin/go-humanize v1.0.1-0.20200219035652-afde56e7acac/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM= github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= -github.com/dymensionxyz/cosmosclient v0.3.0-beta.0.20230621132116-77eb2ae5ab92 h1:OIKQAadI4pa2LTi888nzEWrGAUW225ysETiEHIr+oLY= -github.com/dymensionxyz/cosmosclient v0.3.0-beta.0.20230621132116-77eb2ae5ab92/go.mod h1:3y64ecWDzhnd0sSYZfaL4QpwgK0b0j6LFLVRGdygg+o= +github.com/dymensionxyz/cosmosclient v0.4.0-beta h1:IWyEEdvJ60n/v/DGNelD0n23lS6ED/5rsQJBClgIfhA= +github.com/dymensionxyz/cosmosclient v0.4.0-beta/go.mod h1:3y64ecWDzhnd0sSYZfaL4QpwgK0b0j6LFLVRGdygg+o= github.com/dymensionxyz/dymension v0.2.0-beta.0.20230607115558-745644a96ea6 h1:dnriGXmMdYEiF/8lMrj+PDlN1vyQc6zgs/ZHL67eoyI= github.com/dymensionxyz/dymension v0.2.0-beta.0.20230607115558-745644a96ea6/go.mod h1:rDkVuF+DxBDi5tTgVHFk1D2xpqf8bOccs6aB1wTOvP0= -github.com/dymensionxyz/go-cnc v0.2.2 h1:C7WUFJ+PkkB62HPegBJJL+YlQExqvYudTTRdNiCNIDk= -github.com/dymensionxyz/go-cnc v0.2.2/go.mod h1:CZBVUhQnJnAVcfQnnEAqREF+PNWr97m/BhJ5fp1K44Q= github.com/dymensionxyz/rpc v1.3.1 h1:7EXWIobaBes5zldRvTIg7TmNsEKjicrWA/OjCc0NaGs= github.com/dymensionxyz/rpc v1.3.1/go.mod h1:f+WpX8ysy8wt95iGc6auYlHcnHj2bUkhiRVkkKNys8c= github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= diff --git a/mocks/da/celestia/cnc_client.go b/mocks/da/celestia/cnc_client.go index 6beade153..613a7feaf 100644 --- a/mocks/da/celestia/cnc_client.go +++ b/mocks/da/celestia/cnc_client.go @@ -16,11 +16,11 @@ type CNCClientI struct { } // NamespacedData provides a mock function with given fields: ctx, namespaceID, height -func (_m *CNCClientI) NamespacedData(ctx context.Context, namespaceID [8]byte, height uint64) ([][]byte, error) { +func (_m *CNCClientI) NamespacedData(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error) { ret := _m.Called(ctx, namespaceID, height) var r0 [][]byte - if rf, ok := ret.Get(0).(func(context.Context, [8]byte, uint64) [][]byte); ok { + if rf, ok := ret.Get(0).(func(context.Context, cnc.Namespace, uint64) [][]byte); ok { r0 = rf(ctx, namespaceID, height) } else { if ret.Get(0) != nil { @@ -29,7 +29,7 @@ func (_m *CNCClientI) NamespacedData(ctx context.Context, namespaceID [8]byte, h } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, [8]byte, uint64) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, cnc.Namespace, uint64) error); ok { r1 = rf(ctx, namespaceID, height) } else { r1 = ret.Error(1) @@ -39,11 +39,11 @@ func (_m *CNCClientI) NamespacedData(ctx context.Context, namespaceID [8]byte, h } // NamespacedShares provides a mock function with given fields: ctx, namespaceID, height -func (_m *CNCClientI) NamespacedShares(ctx context.Context, namespaceID [8]byte, height uint64) ([][]byte, error) { +func (_m *CNCClientI) NamespacedShares(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error) { ret := _m.Called(ctx, namespaceID, height) var r0 [][]byte - if rf, ok := ret.Get(0).(func(context.Context, [8]byte, uint64) [][]byte); ok { + if rf, ok := ret.Get(0).(func(context.Context, cnc.Namespace, uint64) [][]byte); ok { r0 = rf(ctx, namespaceID, height) } else { if ret.Get(0) != nil { @@ -52,7 +52,7 @@ func (_m *CNCClientI) NamespacedShares(ctx context.Context, namespaceID [8]byte, } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, [8]byte, uint64) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, cnc.Namespace, uint64) error); ok { r1 = rf(ctx, namespaceID, height) } else { r1 = ret.Error(1) @@ -61,12 +61,12 @@ func (_m *CNCClientI) NamespacedShares(ctx context.Context, namespaceID [8]byte, return r0, r1 } -// SubmitPFD provides a mock function with given fields: ctx, namespaceID, blob, fee, gasLimit -func (_m *CNCClientI) SubmitPFD(ctx context.Context, namespaceID [8]byte, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error) { +// SubmitPFB provides a mock function with given fields: ctx, namespaceID, blob, fee, gasLimit +func (_m *CNCClientI) SubmitPFB(ctx context.Context, namespaceID cnc.Namespace, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error) { ret := _m.Called(ctx, namespaceID, blob, fee, gasLimit) var r0 *cnc.TxResponse - if rf, ok := ret.Get(0).(func(context.Context, [8]byte, []byte, int64, uint64) *cnc.TxResponse); ok { + if rf, ok := ret.Get(0).(func(context.Context, cnc.Namespace, []byte, int64, uint64) *cnc.TxResponse); ok { r0 = rf(ctx, namespaceID, blob, fee, gasLimit) } else { if ret.Get(0) != nil { @@ -75,7 +75,7 @@ func (_m *CNCClientI) SubmitPFD(ctx context.Context, namespaceID [8]byte, blob [ } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, [8]byte, []byte, int64, uint64) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, cnc.Namespace, []byte, int64, uint64) error); ok { r1 = rf(ctx, namespaceID, blob, fee, gasLimit) } else { r1 = ret.Error(1) diff --git a/node/node.go b/node/node.go index d225c047e..3168465ca 100644 --- a/node/node.go +++ b/node/node.go @@ -5,7 +5,11 @@ import ( "encoding/base64" "encoding/json" "fmt" + "net/http" "sync" + "time" + + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/libp2p/go-libp2p/core/crypto" @@ -263,6 +267,12 @@ func (n *Node) OnStart() error { if err != nil { return fmt.Errorf("error while starting settlement layer client: %w", err) } + go func() { + if err := n.startPrometheusServer(); err != nil { + panic(err) + } + }() + n.baseLayersHealthStatus = BaseLayersHealthStatus{ settlementHealthy: true, daHealthy: true, @@ -400,5 +410,22 @@ func (n *Node) healthStatusHandler(err error) { if err = n.pubsubServer.PublishWithEvents(n.ctx, healthStatusEvent, map[string][]string{events.EventNodeTypeKey: {events.EventHealthStatus}}); err != nil { panic(err) } + } } + +func (n *Node) startPrometheusServer() error { + if n.conf.Instrumentation != nil && n.conf.Instrumentation.Prometheus { + http.Handle("/metrics", promhttp.Handler()) + srv := &http.Server{ + Addr: n.conf.Instrumentation.PrometheusListenAddr, + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + Handler: http.DefaultServeMux, + } + if err := srv.ListenAndServe(); err != nil { + return err + } + } + return nil +} diff --git a/node/node_test.go b/node/node_test.go index acae0dd75..df42c3946 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -117,7 +117,7 @@ func TestHealthStatusEventHandler(t *testing.T) { err = node.Start() assert.NoError(err) // wait for node to start - time.Sleep(100 * time.Millisecond) + time.Sleep(1 * time.Second) slUnealthyError := errors.New("settlement layer is unhealthy") daUnealthyError := errors.New("da layer is unhealthy") @@ -171,8 +171,10 @@ func TestHealthStatusEventHandler(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { done := make(chan bool, 1) + ready := make(chan bool, 1) go func() { HealthSubscription, err := node.pubsubServer.Subscribe(node.ctx, c.name, events.EventQueryHealthStatus) + ready <- true assert.NoError(err) select { case event := <-HealthSubscription.Out(): @@ -184,7 +186,7 @@ func TestHealthStatusEventHandler(t *testing.T) { assert.Equal(c.expectedError, healthStatusEvent.Error) done <- true break - case <-time.After(500 * time.Millisecond): + case <-time.After(1 * time.Second): if c.expectHealthStatusEventEmitted { t.Error("expected health status event but didn't get one") } @@ -192,6 +194,7 @@ func TestHealthStatusEventHandler(t *testing.T) { break } }() + <-ready // Emit an event. node.pubsubServer.PublishWithEvents(context.Background(), c.baseLayerHealthStatusEventData, c.baseLayerHealthStatusEvent) <-done diff --git a/settlement/base.go b/settlement/base.go index ae6b3a832..98354e472 100644 --- a/settlement/base.go +++ b/settlement/base.go @@ -8,6 +8,7 @@ import ( "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/log" "github.com/dymensionxyz/dymint/types" + "github.com/dymensionxyz/dymint/utils" "github.com/tendermint/tendermint/libs/pubsub" ) @@ -148,7 +149,7 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) { func (b *BaseLayerClient) validateBatch(batch *types.Batch) error { if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 { - return fmt.Errorf("batch start height must be last height. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)+1) + return fmt.Errorf("batch start height != latest height + 1. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight)) } if batch.EndHeight < batch.StartHeight { return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight) @@ -170,10 +171,18 @@ func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) { b.logger.Debug("received state update event", "eventData", event.Data()) eventData := event.Data().(*EventDataNewSettlementBatchAccepted) atomic.StoreUint64(&b.latestHeight, eventData.EndHeight) + // Emit new batch event + newBatchEventData := &EventDataNewBatchAccepted{ + EndHeight: eventData.EndHeight, + StateIndex: eventData.StateIndex, + } + utils.SubmitEventOrPanic(b.ctx, b.pubsub, newBatchEventData, + map[string][]string{EventTypeKey: {EventNewBatchAccepted}}) case <-subscription.Cancelled(): b.logger.Info("subscription canceled") return case <-b.ctx.Done(): + b.logger.Info("Context done. Exiting state update handler") return } } diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index e530b842a..ffe10f135 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -8,6 +8,7 @@ import ( "cosmossdk.io/errors" "github.com/avast/retry-go" + "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" cdctypes "github.com/cosmos/cosmos-sdk/codec/types" cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" @@ -336,7 +337,7 @@ func (d *HubClient) submitBatch(msgUpdateState *rollapptypes.MsgUpdateState) err err := retry.Do(func() error { txResp, err := d.client.BroadcastTx(d.config.DymAccountName, msgUpdateState) if err != nil || txResp.Code != 0 { - d.logger.Error("Error sending batch to settlement layer", "resp", txResp, "error", err) + d.logger.Error("Error sending batch to settlement layer", "resp", txResp.RawLog, "error", err) return err } return nil @@ -423,6 +424,7 @@ func getCosmosClientOptions(config *settlement.Config) []cosmosclient.Option { } options := []cosmosclient.Option{ cosmosclient.WithAddressPrefix(addressPrefix), + cosmosclient.WithBroadcastMode(flags.BroadcastSync), cosmosclient.WithNodeAddress(config.NodeAddress), cosmosclient.WithFees(config.GasFees), cosmosclient.WithGasLimit(config.GasLimit), diff --git a/settlement/events.go b/settlement/events.go index 36de7123e..1993a671b 100644 --- a/settlement/events.go +++ b/settlement/events.go @@ -16,19 +16,25 @@ const ( // Define the event types const ( - EventNewSettlementBatchAccepted = "NewSettlementBatchAccepted" - EventSequencersListUpdated = "SequencersListUpdated" - EventSettlementHealthStatus = "SettlementHealthStatus" + // This event should be emitted internally in order to communicate between the settlement layer and the hub client + EventNewSettlementBatchAccepted = "EventNewSettlementBatchAccepted" + // This event should be emitted externally when a batch is accepted + EventNewBatchAccepted = "EventNewBatchAccepted" + EventSequencersListUpdated = "SequencersListUpdated" + EventSettlementHealthStatus = "SettlementHealthStatus" ) -// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted -type EventDataNewSettlementBatchAccepted struct { +// EventDataNewBatchAccepted defines the structure of the event data for the EventNewBatchAccepted +type EventDataNewBatchAccepted struct { // EndHeight is the height of the last accepted batch EndHeight uint64 // StateIndex is the rollapp-specific index the batch was saved in the SL StateIndex uint64 } +// EventDataNewSettlementBatchAccepted defines the structure of the event data for the EventNewSettlementBatchAccepted +type EventDataNewSettlementBatchAccepted EventDataNewBatchAccepted + // EventDataSequencersListUpdated defines the structure of the event data for the EventSequencersListUpdated type EventDataSequencersListUpdated struct { // Sequencers is the list of sequencers @@ -46,6 +52,7 @@ type EventDataSettlementHealthStatus struct { // Define queries var ( EventQueryNewSettlementBatchAccepted = QueryForEvent(EventNewSettlementBatchAccepted) + EventQueryNewBatchAccepted = QueryForEvent(EventNewBatchAccepted) EventQuerySettlementHealthStatus = QueryForEvent(EventSettlementHealthStatus) ) diff --git a/settlement/settlement_test.go b/settlement/settlement_test.go index 56b7874ce..542515165 100644 --- a/settlement/settlement_test.go +++ b/settlement/settlement_test.go @@ -170,6 +170,8 @@ func TestGetSequencers(t *testing.T) { assert.Equal(t, settlementClient.GetProposer().PublicKey, proposer.PublicKey) err := settlementClient.Stop() + // Wait until the settlement layer stops + <-time.After(1 * time.Second) assert.NoError(t, err) // Validate the amount of inactive sequencers