
Add flag to disable DAS chunked stores #2796

Merged 12 commits on Dec 30, 2024
4 changes: 3 additions & 1 deletion cmd/datool/datool.go
@@ -92,6 +92,7 @@ type ClientStoreConfig struct {
     SigningWallet         string `koanf:"signing-wallet"`
     SigningWalletPassword string `koanf:"signing-wallet-password"`
     MaxStoreChunkBodySize int    `koanf:"max-store-chunk-body-size"`
+    EnableChunkedStore    bool   `koanf:"enable-chunked-store"`
Member:

It is a little confusing having to do enable-chunked-store=false; how about changing the option to disable-chunked-store?

Contributor Author:

Earlier it was named disable-chunked-store, but following a discussion with gabriel about avoiding boolean flags with negative assertions, I changed it to enable-chunked-store.

Contributor:

This is a recurring theme, and, in general, I prefer flags/bools to be worded in a positive sense. I think @eljobe was the first one to bring this up.

 }
 
 func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
@@ -104,6 +105,7 @@ func parseClientStoreConfig(args []string) (*ClientStoreConfig, error) {
     f.String("signing-wallet-password", genericconf.PASSWORD_NOT_SET, "password to unlock the wallet, if not specified the user is prompted for the password")
     f.Duration("das-retention-period", 24*time.Hour, "The period which DASes are requested to retain the stored batches.")
     f.Int("max-store-chunk-body-size", 512*1024, "The maximum HTTP POST body size for a chunked store request")
+    f.Bool("enable-chunked-store", true, "enable data to be sent to DAS in chunks instead of all at once")
 
     k, err := confighelpers.BeginCommonParse(f, args)
     if err != nil {
@@ -152,7 +154,7 @@ func startClientStore(args []string) error {
         }
     }
 
-    client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize)
+    client, err := das.NewDASRPCClient(config.URL, signer, config.MaxStoreChunkBodySize, config.EnableChunkedStore)
     if err != nil {
         return err
     }
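To make the new knob concrete, here is a minimal sketch of constructing the client with chunking disabled, mirroring the startClientStore call above. The endpoint URL is hypothetical, and passing a nil signer relies on the nilSigner fallback visible in the constructor diff below; this is an illustration, not part of the PR.

```go
package main

import (
	"log"

	"github.com/offchainlabs/nitro/das"
)

func main() {
	// Hypothetical DAS endpoint. A nil signer falls back to nilSigner,
	// per the NewDASRPCClient constructor in this diff.
	client, err := das.NewDASRPCClient("http://localhost:9876", nil, 512*1024, false)
	if err != nil {
		log.Fatal(err)
	}
	// With enableChunkedStore=false, Store() always takes the legacy
	// single-request path rather than splitting the payload into chunks.
	_ = client
}
```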
3 changes: 3 additions & 0 deletions das/aggregator.go
@@ -41,12 +41,14 @@ type AggregatorConfig struct {
     AssumedHonest         int               `koanf:"assumed-honest"`
     Backends              BackendConfigList `koanf:"backends"`
     MaxStoreChunkBodySize int               `koanf:"max-store-chunk-body-size"`
+    EnableChunkedStore    bool              `koanf:"enable-chunked-store"`
 }
 
 var DefaultAggregatorConfig = AggregatorConfig{
     AssumedHonest:         0,
     Backends:              nil,
     MaxStoreChunkBodySize: 512 * 1024,
+    EnableChunkedStore:    true,
 }
 
 var parsedBackendsConf BackendConfigList
@@ -56,6 +58,7 @@ func AggregatorConfigAddOptions(prefix string, f *flag.FlagSet) {
     f.Int(prefix+".assumed-honest", DefaultAggregatorConfig.AssumedHonest, "Number of assumed honest backends (H). If there are N backends, K=N+1-H valid responses are required to consider an Store request to be successful.")
     f.Var(&parsedBackendsConf, prefix+".backends", "JSON RPC backend configuration. This can be specified on the command line as a JSON array, eg: [{\"url\": \"...\", \"pubkey\": \"...\"},...], or as a JSON array in the config file.")
     f.Int(prefix+".max-store-chunk-body-size", DefaultAggregatorConfig.MaxStoreChunkBodySize, "maximum HTTP POST body size to use for individual batch chunks, including JSON RPC overhead and an estimated overhead of 512B of headers")
+    f.Bool(prefix+".enable-chunked-store", DefaultAggregatorConfig.EnableChunkedStore, "enable data to be sent to DAS in chunks instead of all at once")
 }
 
 type Aggregator struct {
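For reference, a sketch of overriding the new default in code; all fields besides the two set here take their values from DefaultAggregatorConfig above, and the command-line spelling of the flag depends on whatever prefix the caller passes to AggregatorConfigAddOptions.

```go
package main

import (
	"fmt"

	"github.com/offchainlabs/nitro/das"
)

func main() {
	// Start from the package defaults declared above...
	cfg := das.DefaultAggregatorConfig
	cfg.AssumedHonest = 1
	// ...and opt out of chunked stores; every DASRPCClient built from this
	// config (see ParseServices further down) will use the legacy store path.
	cfg.EnableChunkedStore = false
	fmt.Printf("%+v\n", cfg)
}
```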
4 changes: 2 additions & 2 deletions das/aggregator_test.go
@@ -50,7 +50,7 @@ func TestDAS_BasicAggregationLocal(t *testing.T) {
         backends = append(backends, *details)
     }
 
-    aggregator, err := NewAggregator(ctx, DataAvailabilityConfig{RPCAggregator: AggregatorConfig{AssumedHonest: 1}, ParentChainNodeURL: "none"}, backends)
+    aggregator, err := NewAggregator(ctx, DataAvailabilityConfig{RPCAggregator: AggregatorConfig{AssumedHonest: 1, EnableChunkedStore: true}, ParentChainNodeURL: "none"}, backends)
     Require(t, err)
 
     rawMsg := []byte("It's time for you to see the fnords.")
@@ -207,7 +207,7 @@ func testConfigurableStorageFailures(t *testing.T, shouldFailAggregation bool) {
     aggregator, err := NewAggregator(
         ctx,
         DataAvailabilityConfig{
-            RPCAggregator:      AggregatorConfig{AssumedHonest: assumedHonest},
+            RPCAggregator:      AggregatorConfig{AssumedHonest: assumedHonest, EnableChunkedStore: true},
             ParentChainNodeURL: "none",
             RequestTimeout:     time.Millisecond * 2000,
         }, backends)
39 changes: 25 additions & 14 deletions das/dasRpcClient.go
@@ -35,10 +35,11 @@ var (
 )
 
 type DASRPCClient struct { // implements DataAvailabilityService
-    clnt      *rpc.Client
-    url       string
-    signer    signature.DataSignerFunc
-    chunkSize uint64
+    clnt               *rpc.Client
+    url                string
+    signer             signature.DataSignerFunc
+    chunkSize          uint64
+    enableChunkedStore bool
 }
 
 func nilSigner(_ []byte) ([]byte, error) {
@@ -47,7 +48,7 @@ func nilSigner(_ []byte) ([]byte, error) {
 
 const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"
 
-func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int) (*DASRPCClient, error) {
+func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int, enableChunkedStore bool) (*DASRPCClient, error) {
     clnt, err := rpc.Dial(target)
     if err != nil {
         return nil, err
@@ -56,18 +57,23 @@ func NewDASRPCClient(target string, signer signature.DataSignerFunc, maxStoreChunkBodySize int, enableChunkedStore bool) (*DASRPCClient, error) {
         signer = nilSigner
     }
 
+    client := &DASRPCClient{
+        clnt:               clnt,
+        url:                target,
+        signer:             signer,
+        enableChunkedStore: enableChunkedStore,
+    }
+
     // Byte arrays are encoded in base64
-    chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
-    if chunkSize <= 0 {
-        return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
+    if enableChunkedStore {
+        chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512 /* headers */) / 2
+        if chunkSize <= 0 {
+            return nil, fmt.Errorf("max-store-chunk-body-size %d doesn't leave enough room for chunk payload", maxStoreChunkBodySize)
+        }
+        client.chunkSize = uint64(chunkSize)
     }
 
-    return &DASRPCClient{
-        clnt:      clnt,
-        url:       target,
-        signer:    signer,
-        chunkSize: uint64(chunkSize),
-    }, nil
+    return client, nil
 }
 
 func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
@@ -83,6 +89,11 @@ func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64) (*daprovider.DataAvailabilityCertificate, error) {
         rpcClientStoreDurationHistogram.Update(time.Since(start).Nanoseconds())
     }()
 
+    if !c.enableChunkedStore {
+        log.Debug("Legacy store is being force-used by the DAS client", "url", c.url)
+        return c.legacyStore(ctx, message, timeout)
+    }
+
     // #nosec G115
     timestamp := uint64(start.Unix())
     nChunks := uint64(len(message)) / c.chunkSize
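Two details of this file are worth spelling out. First, when chunking is disabled, chunkSize is left at zero, so the early return in Store is what keeps the later nChunks division from ever dividing by zero. Second, the chunk-size formula now only runs when it matters; as a rough check of the arithmetic with the default 512 KiB body size (the printed value below is computed by the snippet, not quoted from the PR):

```go
package main

import "fmt"

// Copied from the diff above.
const sendChunkJSONBoilerplate = "{\"jsonrpc\":\"2.0\",\"id\":4294967295,\"method\":\"das_sendChunked\",\"params\":[\"\"]}"

func main() {
	maxStoreChunkBodySize := 512 * 1024
	// Same expression as NewDASRPCClient: strip the JSON-RPC envelope and an
	// estimated 512B of headers, then halve for the text encoding of the byte
	// array (the constructor's comment says base64; halving is a safe bound).
	chunkSize := (maxStoreChunkBodySize - len(sendChunkJSONBoilerplate) - 512) / 2
	fmt.Println(chunkSize) // 261851 bytes of raw payload per chunk
}
```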
2 changes: 1 addition & 1 deletion das/rpc_aggregator.go
@@ -110,7 +110,7 @@ func ParseServices(config AggregatorConfig, signer signature.DataSignerFunc) ([]
         }
         metricName := metricsutil.CanonicalizeMetricName(url.Hostname())
 
-        service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize)
+        service, err := NewDASRPCClient(b.URL, signer, config.MaxStoreChunkBodySize, config.EnableChunkedStore)
         if err != nil {
             return nil, err
         }
1 change: 1 addition & 0 deletions das/rpc_test.go
@@ -84,6 +84,7 @@ func testRpcImpl(t *testing.T, size, times int, concurrent bool) {
             AssumedHonest:         1,
             Backends:              beConfigs,
             MaxStoreChunkBodySize: (chunkSize * 2) + len(sendChunkJSONBoilerplate),
+            EnableChunkedStore:    true,
         },
         RequestTimeout: time.Minute,
     }
1 change: 1 addition & 0 deletions system_tests/das_test.go
@@ -90,6 +90,7 @@ func aggConfigForBackend(backendConfig das.BackendConfig) das.AggregatorConfig {
         AssumedHonest:         1,
         Backends:              das.BackendConfigList{backendConfig},
         MaxStoreChunkBodySize: 512 * 1024,
+        EnableChunkedStore:    true,
     }
 }