Skip to content

Commit

Permalink
feat(da): SubmitBatchV2 method
Browse files Browse the repository at this point in the history
  • Loading branch information
keruch committed Jul 1, 2024
1 parent bb07915 commit c1f6b88
Show file tree
Hide file tree
Showing 15 changed files with 859 additions and 151 deletions.
29 changes: 29 additions & 0 deletions da/da.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/celestiaorg/celestia-openrpc/types/blob"
"github.com/cometbft/cometbft/crypto/merkle"
cdctypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/store"
Expand Down Expand Up @@ -179,6 +180,13 @@ type ResultSubmitBatch struct {
SubmitMetaData *DASubmitMetaData
}

// ResultSubmitBatchV2 contains information returned from DA layer after block submission.
type ResultSubmitBatchV2 struct {
BaseResult
// DAPath instructs how to retrieve the submitted batch from the DA layer.
DAPath Path
}

// ResultCheckBatch contains information about block availability, returned from DA layer client.
type ResultCheckBatch struct {
BaseResult
Expand All @@ -196,6 +204,22 @@ type ResultRetrieveBatch struct {
CheckMetaData *DACheckMetaData
}

// ResultRetrieveBatchV2 contains a batch of blocks returned from the DA layer client.
type ResultRetrieveBatchV2 struct {
BaseResult
// Batches is the full block retrieved from the DA layer.
// If BaseResult.Code is not StatusSuccess, this field is nil.
Batches []*types.Batch
}

// Path TODO: move to the Dymension proto file
type Path struct {
// DAType identifies the DA type being used by the sequencer to post the blob.
DaType string
// Commitment is a generic commitment interpreted by the DA Layer.
Commitment *cdctypes.Any
}

// DataAvailabilityLayerClient defines generic interface for DA layer block submission.
// It also contains life-cycle methods.
type DataAvailabilityLayerClient interface {
Expand All @@ -213,6 +237,9 @@ type DataAvailabilityLayerClient interface {
// triggers a state transition in the DA layer.
SubmitBatch(batch *types.Batch) ResultSubmitBatch

// SubmitBatchV2 is a method that supports MsgUpdateStateV2.
SubmitBatchV2(*types.Batch) ResultSubmitBatchV2

GetClientType() Client

// CheckBatchAvailability checks the availability of the blob submitted getting proofs and validating them
Expand All @@ -226,6 +253,8 @@ type DataAvailabilityLayerClient interface {
type BatchRetriever interface {
// RetrieveBatches returns blocks at given data layer height from data availability layer.
RetrieveBatches(daMetaData *DASubmitMetaData) ResultRetrieveBatch
// RetrieveBatchesV2 is a method that supports MsgUpdateStateV2.
RetrieveBatchesV2(ResultSubmitBatchV2) ResultRetrieveBatchV2
// CheckBatchAvailability checks the availability of the blob received getting proofs and validating them
CheckBatchAvailability(daMetaData *DASubmitMetaData) ResultCheckBatch
}
20 changes: 20 additions & 0 deletions da/interchain/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/dymensionxyz/cosmosclient/cosmosclient"
"github.com/ignite/cli/ignite/pkg/cosmosaccount"
"github.com/tendermint/tendermint/libs/bytes"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"

interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da"
)
Expand Down Expand Up @@ -43,3 +46,20 @@ func (c *daClient) Params(ctx context.Context) (interchainda.Params, error) {
}
return resp.GetParams(), nil
}

func (c *daClient) Tx(ctx context.Context, txHash []byte) (*ctypes.ResultTx, error) {
return c.RPC.Tx(ctx, txHash, false)
}

func (c *daClient) ABCIQueryWithProof(
ctx context.Context,
path string,
data bytes.HexBytes,
height int64,
) (*ctypes.ResultABCIQuery, error) {
opts := rpcclient.ABCIQueryOptions{
Height: height,
Prove: true,
}
return c.RPC.ABCIQueryWithOptions(ctx, path, data, opts)
}
30 changes: 28 additions & 2 deletions da/interchain/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package interchain

import (
"time"

interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da"
)

type DAConfig struct {
ClientID string `mapstructure:"client_id"` // This is the IBC client ID on Dymension hub for the DA chain
ChainID string `mapstructure:"chain_id"` // The chain ID of the DA chain
ClientID string `mapstructure:"client_id"` // IBC client ID between the Hub and DA layer
ChainID string `mapstructure:"chain_id"` // Chain ID of the DA layer

KeyringBackend string `mapstructure:"keyring_backend"`
KeyringHomeDir string `mapstructure:"keyring_home_dir"`
AddressPrefix string `mapstructure:"address_prefix"`
Expand All @@ -16,4 +19,27 @@ type DAConfig struct {
GasPrices string `mapstructure:"gas_prices"`
GasFees string `mapstructure:"gas_fees"`
DAParams interchainda.Params `mapstructure:"da_params"`

RetryMinDelay time.Duration `mapstructure:"retry_min_delay"`
RetryMaxDelay time.Duration `mapstructure:"retry_min_delay"`
RetryAttempts uint `mapstructure:"retry_attempts"`
}

func DefaultDAConfig() DAConfig {
return DAConfig{
ClientID: "",
ChainID: "",
KeyringBackend: "",
KeyringHomeDir: "",
AddressPrefix: "",
AccountName: "",
NodeAddress: "",
GasLimit: 0,
GasPrices: "",
GasFees: "",
DAParams: interchainda.Params{},
RetryMinDelay: 100 * time.Millisecond,
RetryMaxDelay: 2 * time.Second,
RetryAttempts: 10,
}
}
124 changes: 18 additions & 106 deletions da/interchain/interchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,31 @@ import (
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dymensionxyz/cosmosclient/cosmosclient"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/pubsub"
ctypes "github.com/tendermint/tendermint/rpc/core/types"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/settlement/dymension"
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/types"
interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da"
)

var (
_ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
_ da.BatchRetriever = &DataAvailabilityLayerClient{}
_ da.DataAvailabilityLayerClient = &DALayerClient{}
_ da.BatchRetriever = &DALayerClient{}
)

type DAClient interface {
Context() sdkclient.Context
BroadcastTx(accountName string, msgs ...sdk.Msg) (cosmosclient.Response, error)
Params(ctx context.Context) (interchainda.Params, error)
Tx(ctx context.Context, txHash []byte) (*ctypes.ResultTx, error)
ABCIQueryWithProof(ctx context.Context, path string, data bytes.HexBytes, height int64) (*ctypes.ResultABCIQuery, error)
}

// DataAvailabilityLayerClient is a client for DA-provider blockchains supporting the interchain-da module.
type DataAvailabilityLayerClient struct {
// DALayerClient is a client for DA-provider blockchains supporting the interchain-da module.
type DALayerClient struct {
logger types.Logger
ctx context.Context
cancel context.CancelFunc
Expand All @@ -46,7 +49,7 @@ type DataAvailabilityLayerClient struct {
}

// Init is called once. It reads the DA client configuration and initializes resources for the interchain DA provider.
func (c *DataAvailabilityLayerClient) Init(rawConfig []byte, server *pubsub.Server, _ store.KV, logger types.Logger, options ...da.Option) error {
func (c *DALayerClient) Init(rawConfig []byte, _ *pubsub.Server, _ store.KV, logger types.Logger, options ...da.Option) error {
ctx := context.Background()

// Read DA layer config
Expand Down Expand Up @@ -84,7 +87,6 @@ func (c *DataAvailabilityLayerClient) Init(rawConfig []byte, server *pubsub.Serv
c.cancel = cancel
c.cdc = cdc
c.synced = make(chan struct{})
c.pubsubServer = server
c.daClient = client
c.daConfig = config

Expand All @@ -96,120 +98,30 @@ func (c *DataAvailabilityLayerClient) Init(rawConfig []byte, server *pubsub.Serv
return nil
}

// Start is called once, after Init. It starts the operation of DataAvailabilityLayerClient, and Dymint will start submitting batches to the provider.
// Start is called once, after Init. It starts the operation of DALayerClient, and Dymint will start submitting batches to the provider.
// It fetches the latest interchain module parameters and sets up a subscription to receive updates when the provider updates these parameters.
// This ensures that the client is always up-to-date.
func (c *DataAvailabilityLayerClient) Start() error {
// Get the connectionID from the dymension hub for the da chain
c.daConfig.ClientID = dymension.(c.chainConfig.ChainID)

// Setup a subscription to event EventUpdateParams
c.grpc.Subscribe(func() {
// This event is thrown at the end of the block when the module params are updated
if block.event == EventUpdateParams {
// when the chain params are updated, update the client config to reflect the same
da.chainConfig.chainParams = block.event.new_params
}
})
func (c *DALayerClient) Start() error {
// TODO: Setup a subscription to event EventUpdateParams
return nil
}

// Stop is called once, when DataAvailabilityLayerClient is no longer needed.
func (c *DataAvailabilityLayerClient) Stop() error {
// Stop is called once, when DALayerClient is no longer needed.
func (c *DALayerClient) Stop() error {
c.pubsubServer.Stop()
c.cancel()
return nil
}

// Synced returns channel for on sync event
func (c *DataAvailabilityLayerClient) Synced() <-chan struct{} {
func (c *DALayerClient) Synced() <-chan struct{} {
return c.synced
}

func (c *DataAvailabilityLayerClient) GetClientType() da.Client {
func (c *DALayerClient) GetClientType() da.Client {
return da.Interchain
}

func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch {
result, err := c.submitBatch(batch)
if err != nil {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
Error: err,
},
SubmitMetaData: nil,
}
}
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "Submission successful",
},
SubmitMetaData: &da.DASubmitMetaData{
Height: height,
Namespace: c.config.NamespaceID.Bytes(),
Client: da.Celestia,
Commitment: commitment,
Index: 0,
Length: 0,
Root: nil,
},
}
}

type submitBatchResult struct {
BlobID uint64
BlobHash string
}

func (c *DataAvailabilityLayerClient) submitBatch(batch *types.Batch) (submitBatchResult, error) {
blob, err := batch.MarshalBinary()
if err != nil {
return submitBatchResult{}, fmt.Errorf("can't marshal batch: %w", err)
}

if len(blob) > int(c.daConfig.DAParams.MaxBlobSize) {
return submitBatchResult{}, fmt.Errorf("blob size %d exceeds the maximum allowed size %d", len(blob), c.daConfig.DAParams.MaxBlobSize)
}

feesToPay := sdk.NewCoin(c.daConfig.DAParams.CostPerByte.Denom, c.daConfig.DAParams.CostPerByte.Amount.MulRaw(int64(len(blob))))

msg := interchainda.MsgSubmitBlob{
Creator: c.daConfig.AccountName,
Blob: blob,
Fees: feesToPay,
}

txResp, err := c.daClient.BroadcastTx(c.daConfig.AccountName, &msg)
if err != nil {
return submitBatchResult{}, fmt.Errorf("can't broadcast MsgSubmitBlob to the DA layer: %w", err)
}
if txResp.Code != 0 {
return submitBatchResult{}, fmt.Errorf("MsgSubmitBlob broadcast tx status code is not 0: code %d", txResp.Code)
}

var resp interchainda.MsgSubmitBlobResponse
err = txResp.Decode(&resp)
if err != nil {
return submitBatchResult{}, fmt.Errorf("can't decode MsgSubmitBlob response: %w", err)
}

// trigger ibc stateupdate - optional (?)
// other ibc interactions would trigger this anyway. But until then, inclusion cannot be verified.
// better to trigger a stateupdate now imo
dymension.tx.ibc.client.updatestate(c.daConfig.clientID) // could import the go relayer and execute their funcs

return submitBatchResult{
BlobID: resp.BlobId,
BlobHash: resp.BlobHash,
}, nil
}

func (c *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASubmitMetaData) da.ResultCheckBatch {
panic("implement me")
}

func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMetaData) da.ResultRetrieveBatch {
func (c *DALayerClient) CheckBatchAvailability(daMetaData *da.DASubmitMetaData) da.ResultCheckBatch {
panic("implement me")
}
62 changes: 62 additions & 0 deletions da/interchain/ioutils/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ioutils

import (
"bytes"
"compress/gzip"
"io"
)

// Note: []byte can never be const as they are inherently mutable
var (
// magic bytes to identify gzip.
// See https://www.ietf.org/rfc/rfc1952.txt
// and https://github.com/golang/go/blob/master/src/net/http/sniff.go#L186
gzipIdent = []byte("\x1F\x8B\x08")
)

// IsGzip returns checks if the file contents are gzip compressed
func IsGzip(input []byte) bool {
return len(input) >= 3 && bytes.Equal(gzipIdent, input[0:3])
}

// Gzip compresses the input ([]byte)
func Gzip(input []byte) ([]byte, error) {
// Create gzip writer
var b bytes.Buffer
w := gzip.NewWriter(&b)

_, err := w.Write(input)
if err != nil {
return nil, err
}

// You must close this first to flush the bytes to the buffer
err = w.Close()
if err != nil {
return nil, err
}

return b.Bytes(), nil
}

// Gunzip decompresses the input ([]byte)
func Gunzip(input []byte) ([]byte, error) {
// Create gzip reader
b := bytes.NewReader(input)
r, err := gzip.NewReader(b)
if err != nil {
return nil, err
}

output, err := io.ReadAll(r)
if err != nil {
return nil, err
}

err = r.Close()
if err != nil {
return nil, err
}

return output, nil
}
Loading

0 comments on commit c1f6b88

Please sign in to comment.