Commit

Merge branch 'master' into addflag-disable-daschunkedstore
ganeshvanahalli authored Dec 10, 2024
2 parents 296314b + 3e244d6 commit 82451dc
Showing 7 changed files with 272 additions and 22 deletions.
2 changes: 1 addition & 1 deletion arbnode/seq_coordinator.go
@@ -662,7 +662,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
 	for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
 		var resString string
 		resString, msgReadErr = client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
-		if msgReadErr != nil {
+		if msgReadErr != nil && c.sequencer.Synced() {
 			log.Warn("coordinator failed reading message", "pos", msgToRead, "err", msgReadErr)
 			break
 		}
3 changes: 3 additions & 0 deletions cmd/conf/init.go
@@ -27,6 +27,7 @@ type InitConfig struct {
 	ImportWasm      bool   `koanf:"import-wasm"`
 	AccountsPerSync uint   `koanf:"accounts-per-sync"`
 	ImportFile      string `koanf:"import-file"`
+	GenesisJsonFile string `koanf:"genesis-json-file"`
 	ThenQuit        bool   `koanf:"then-quit"`
 	Prune           string `koanf:"prune"`
 	PruneBloomSize  uint64 `koanf:"prune-bloom-size"`
@@ -54,6 +55,7 @@ var InitConfigDefault = InitConfig{
 	Empty:           false,
 	ImportWasm:      false,
 	ImportFile:      "",
+	GenesisJsonFile: "",
 	AccountsPerSync: 100000,
 	ThenQuit:        false,
 	Prune:           "",
@@ -83,6 +85,7 @@ func InitConfigAddOptions(prefix string, f *pflag.FlagSet) {
 	f.Bool(prefix+".import-wasm", InitConfigDefault.ImportWasm, "if set, import the wasm directory when downloading a database (contains executable code - only use with highly trusted source)")
 	f.Bool(prefix+".then-quit", InitConfigDefault.ThenQuit, "quit after init is done")
 	f.String(prefix+".import-file", InitConfigDefault.ImportFile, "path for json data to import")
+	f.String(prefix+".genesis-json-file", InitConfigDefault.GenesisJsonFile, "path for genesis json file")
 	f.Uint(prefix+".accounts-per-sync", InitConfigDefault.AccountsPerSync, "during init - sync database every X accounts. Lower value for low-memory systems. 0 disables.")
 	f.String(prefix+".prune", InitConfigDefault.Prune, "pruning for a given use: \"full\" for full nodes serving RPC requests, or \"validator\" for validators")
 	f.Uint64(prefix+".prune-bloom-size", InitConfigDefault.PruneBloomSize, "the amount of memory in megabytes to use for the pruning bloom filter (higher values prune better)")
38 changes: 35 additions & 3 deletions cmd/nitro/init.go
@@ -689,6 +689,36 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 
 	var chainConfig *params.ChainConfig
 
+	if config.Init.GenesisJsonFile != "" {
+		if initDataReader != nil {
+			return chainDb, nil, errors.New("multiple init methods supplied")
+		}
+		genesisJson, err := os.ReadFile(config.Init.GenesisJsonFile)
+		if err != nil {
+			return chainDb, nil, err
+		}
+		var gen core.Genesis
+		if err := json.Unmarshal(genesisJson, &gen); err != nil {
+			return chainDb, nil, err
+		}
+		var accounts []statetransfer.AccountInitializationInfo
+		for address, account := range gen.Alloc {
+			accounts = append(accounts, statetransfer.AccountInitializationInfo{
+				Addr:       address,
+				EthBalance: account.Balance,
+				Nonce:      account.Nonce,
+				ContractInfo: &statetransfer.AccountInitContractInfo{
+					Code:            account.Code,
+					ContractStorage: account.Storage,
+				},
+			})
+		}
+		initDataReader = statetransfer.NewMemoryInitDataReader(&statetransfer.ArbosInitializationInfo{
+			Accounts: accounts,
+		})
+		chainConfig = gen.Config
+	}
+
 	var l2BlockChain *core.BlockChain
 	txIndexWg := sync.WaitGroup{}
 	if initDataReader == nil {
@@ -714,9 +744,11 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
 		if err != nil {
 			return chainDb, nil, err
 		}
-		chainConfig, err = chaininfo.GetChainConfig(new(big.Int).SetUint64(config.Chain.ID), config.Chain.Name, genesisBlockNr, config.Chain.InfoFiles, config.Chain.InfoJson)
-		if err != nil {
-			return chainDb, nil, err
+		if chainConfig == nil {
+			chainConfig, err = chaininfo.GetChainConfig(new(big.Int).SetUint64(config.Chain.ID), config.Chain.Name, genesisBlockNr, config.Chain.InfoFiles, config.Chain.InfoJson)
+			if err != nil {
+				return chainDb, nil, err
+			}
 		}
 		if config.Init.DevInit && config.Init.DevMaxCodeSize != 0 {
 			chainConfig.ArbitrumChainParams.MaxCodeSize = config.Init.DevMaxCodeSize
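For reference, a minimal sketch (not part of this commit) of the kind of file the new option consumes. The init path above only reads the genesis `config` and `alloc` sections via go-ethereum's core.Genesis, converting each allocated account's balance, nonce, code, and storage into statetransfer records. Assuming the init options are registered under the usual `init` prefix, the file would be passed as `--init.genesis-json-file=<path>`. The chain id, address, and balance below are placeholders.

package main

import (
	"encoding/json"
	"fmt"

	"github.com/ethereum/go-ethereum/core"
)

func main() {
	// Hypothetical genesis file contents. gasLimit and difficulty are required
	// by core.Genesis's JSON unmarshaling even though the init path above does
	// not use them.
	genesisJson := []byte(`{
		"config": { "chainId": 412346 },
		"gasLimit": "0x1c9c380",
		"difficulty": "0x0",
		"alloc": {
			"0x1111111111111111111111111111111111111111": {
				"balance": "1000000000000000000",
				"nonce": "0x0"
			}
		}
	}`)
	var gen core.Genesis
	if err := json.Unmarshal(genesisJson, &gen); err != nil {
		panic(err)
	}
	fmt.Println("chain id:", gen.Config.ChainID, "allocated accounts:", len(gen.Alloc))
}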
18 changes: 9 additions & 9 deletions das/aggregator.go
@@ -257,7 +257,7 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
 		var sigs []blsSignatures.Signature
 		var aggSignersMask uint64
 		var successfullyStoredCount int
-		var returned bool
+		var returned int // 0-no status, 1-succeeded, 2-failed
 		for i := 0; i < len(a.services); i++ {
 			select {
 			case <-ctx.Done():
@@ -279,26 +279,26 @@ func (a *Aggregator) Store(ctx context.Context, message []byte, timeout uint64)
 			// certDetailsChan, so the Store function can return, but also continue
 			// running until all responses are received (or the context is canceled)
 			// in order to produce accurate logs/metrics.
-			if !returned {
+			if returned == 0 {
 				if successfullyStoredCount >= a.requiredServicesForStore {
 					cd := certDetails{}
 					cd.pubKeys = append(cd.pubKeys, pubKeys...)
 					cd.sigs = append(cd.sigs, sigs...)
 					cd.aggSignersMask = aggSignersMask
 					certDetailsChan <- cd
-					returned = true
-					if a.maxAllowedServiceStoreFailures > 0 && // Ignore the case where AssumedHonest = 1, probably a testnet
-						int(storeFailures.Load())+1 > a.maxAllowedServiceStoreFailures {
-						log.Error("das.Aggregator: storing the batch data succeeded to enough DAS commitee members to generate the Data Availability Cert, but if one more had failed then the cert would not have been able to be generated. Look for preceding logs with \"Error from backend\"")
-					}
+					returned = 1
 				} else if int(storeFailures.Load()) > a.maxAllowedServiceStoreFailures {
 					cd := certDetails{}
 					cd.err = fmt.Errorf("aggregator failed to store message to at least %d out of %d DASes (assuming %d are honest). %w", a.requiredServicesForStore, len(a.services), a.config.AssumedHonest, daprovider.ErrBatchToDasFailed)
 					certDetailsChan <- cd
-					returned = true
+					returned = 2
 				}
 			}
-
 		}
+		if returned == 1 &&
+			a.maxAllowedServiceStoreFailures > 0 && // Ignore the case where AssumedHonest = 1, probably a testnet
+			int(storeFailures.Load())+1 > a.maxAllowedServiceStoreFailures {
+			log.Error("das.Aggregator: storing the batch data succeeded to enough DAS commitee members to generate the Data Availability Cert, but if one more had failed then the cert would not have been able to be generated. Look for preceding logs with \"Error from backend\"")
+		}
 	}()
 
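The effect of the change above is that the "one more failure would have prevented the cert" warning is now evaluated once, after every backend has responded, instead of at the moment the certificate details are sent. A minimal, generic sketch of that pattern follows (not nitro's actual implementation; names and thresholds are illustrative).

package main

import (
	"fmt"
	"time"
)

// fanOutStore mirrors the shape of Aggregator.Store's response loop: report the
// outcome as soon as it is known, but keep draining responses so the final
// failure count is accurate before deciding whether to log the near-miss warning.
func fanOutStore(responses <-chan error, total, quorum, maxFailures int) <-chan error {
	result := make(chan error, 1)
	go func() {
		const (
			pending = iota
			succeeded
			failed
		)
		status := pending
		successes, failures := 0, 0
		for i := 0; i < total; i++ {
			if err := <-responses; err != nil {
				failures++
			} else {
				successes++
			}
			if status == pending {
				if successes >= quorum {
					result <- nil
					status = succeeded
				} else if failures > maxFailures {
					result <- fmt.Errorf("only %d of %d stores succeeded", successes, total)
					status = failed
				}
			}
		}
		// Only here is the total failure count final, so the warning is decided
		// after the loop rather than when the result was sent.
		if status == succeeded && failures+1 > maxFailures {
			fmt.Println("warning: one more backend failure would have prevented the cert")
		}
	}()
	return result
}

func main() {
	responses := make(chan error, 3)
	responses <- nil
	responses <- fmt.Errorf("backend down")
	responses <- nil
	if err := <-fanOutStore(responses, 3, 2, 1); err != nil {
		fmt.Println("store failed:", err)
	} else {
		fmt.Println("store succeeded")
	}
	time.Sleep(50 * time.Millisecond) // demo only: let the warning goroutine finish
}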
6 changes: 2 additions & 4 deletions das/reader_aggregator_strategies.go
@@ -5,6 +5,7 @@ package das
 
 import (
 	"errors"
+	"maps"
 	"math/rand"
 	"sort"
 	"sync"
@@ -33,10 +34,7 @@ func (s *abstractAggregatorStrategy) update(readers []daprovider.DASReader, stat
 	s.readers = make([]daprovider.DASReader, len(readers))
 	copy(s.readers, readers)
 
-	s.stats = make(map[daprovider.DASReader]readerStats)
-	for k, v := range stats {
-		s.stats[k] = v
-	}
+	s.stats = maps.Clone(stats)
 }
 
 // Exponentially growing Explore Exploit Strategy
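The manual copy loop above is replaced with maps.Clone from the Go 1.21 standard library. A small standalone illustration (not part of the commit) of the semantics relied on: the clone is a shallow, independent copy, and cloning a nil map yields nil rather than the empty non-nil map the old make-and-copy code produced.

package main

import (
	"fmt"
	"maps"
)

func main() {
	src := map[string]int{"a": 1, "b": 2}
	dst := maps.Clone(src)
	dst["a"] = 99
	fmt.Println(src["a"], dst["a"]) // 1 99: the clone is independent at the top level

	var nilMap map[string]int
	fmt.Println(maps.Clone(nilMap) == nil) // true: unlike make()+copy, a nil input stays nil
}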
2 changes: 1 addition & 1 deletion go-ethereum
225 changes: 221 additions & 4 deletions util/redisutil/redisutil.go
@@ -1,14 +1,231 @@
 package redisutil
 
-import "github.com/redis/go-redis/v9"
+import (
+	"fmt"
+	"net"
+	"net/url"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
 
-func RedisClientFromURL(url string) (redis.UniversalClient, error) {
-	if url == "" {
+	"github.com/redis/go-redis/v9"
+)
+
+// RedisClientFromURL creates a new Redis client based on the provided URL.
+// The URL scheme can be either `redis` or `redis+sentinel`.
+func RedisClientFromURL(redisUrl string) (redis.UniversalClient, error) {
+	if redisUrl == "" {
 		return nil, nil
 	}
-	redisOptions, err := redis.ParseURL(url)
+	u, err := url.Parse(redisUrl)
 	if err != nil {
 		return nil, err
 	}
+	if u.Scheme == "redis+sentinel" {
+		redisOptions, err := parseFailoverRedisUrl(redisUrl)
+		if err != nil {
+			return nil, err
+		}
+		return redis.NewFailoverClient(redisOptions), nil
+	}
+	redisOptions, err := redis.ParseURL(redisUrl)
+	if err != nil {
+		return nil, err
+	}
 	return redis.NewClient(redisOptions), nil
 }
 
+// Designed using https://github.com/redis/go-redis/blob/a8590e987945b7ba050569cc3b94b8ece49e99e3/options.go#L283 as reference
+// Example Usage :
+//
+//	redis+sentinel://<user>:<password>@<host1>:<port1>,<host2>:<port2>,<host3>:<port3>/<master_name/><db_number>?dial_timeout=3&db=1&read_timeout=6s&max_retries=2
+func parseFailoverRedisUrl(redisUrl string) (*redis.FailoverOptions, error) {
+	u, err := url.Parse(redisUrl)
+	if err != nil {
+		return nil, err
+	}
+	o := &redis.FailoverOptions{}
+	o.SentinelUsername, o.SentinelPassword = getUserPassword(u)
+	o.SentinelAddrs = getAddressesWithDefaults(u)
+	f := strings.FieldsFunc(u.Path, func(r rune) bool {
+		return r == '/'
+	})
+	switch len(f) {
+	case 0:
+		return nil, fmt.Errorf("redis: master name is required")
+	case 1:
+		o.DB = 0
+		o.MasterName = f[0]
+	case 2:
+		o.MasterName = f[0]
+		var err error
+		if o.DB, err = strconv.Atoi(f[1]); err != nil {
+			return nil, fmt.Errorf("redis: invalid database number: %q", f[0])
+		}
+	default:
+		return nil, fmt.Errorf("redis: invalid URL path: %s", u.Path)
+	}
+
+	return setupConnParams(u, o)
+}
+
+func getUserPassword(u *url.URL) (string, string) {
+	var user, password string
+	if u.User != nil {
+		user = u.User.Username()
+		if p, ok := u.User.Password(); ok {
+			password = p
+		}
+	}
+	return user, password
+}
+
+func getAddressesWithDefaults(u *url.URL) []string {
+	urlHosts := strings.Split(u.Host, ",")
+	var addresses []string
+	for _, urlHost := range urlHosts {
+		host, port, err := net.SplitHostPort(urlHost)
+		if err != nil {
+			host = u.Host
+		}
+		if host == "" {
+			host = "localhost"
+		}
+		if port == "" {
+			port = "6379"
+		}
+		addresses = append(addresses, net.JoinHostPort(host, port))
+	}
+	return addresses
+}
+
+type queryOptions struct {
+	q   url.Values
+	err error
+}
+
+func (o *queryOptions) has(name string) bool {
+	return len(o.q[name]) > 0
+}
+
+func (o *queryOptions) string(name string) string {
+	vs := o.q[name]
+	if len(vs) == 0 {
+		return ""
+	}
+	delete(o.q, name) // enable detection of unknown parameters
+	return vs[len(vs)-1]
+}
+
+func (o *queryOptions) int(name string) int {
+	s := o.string(name)
+	if s == "" {
+		return 0
+	}
+	i, err := strconv.Atoi(s)
+	if err == nil {
+		return i
+	}
+	if o.err == nil {
+		o.err = fmt.Errorf("redis: invalid %s number: %w", name, err)
+	}
+	return 0
+}
+
+func (o *queryOptions) duration(name string) time.Duration {
+	s := o.string(name)
+	if s == "" {
+		return 0
+	}
+	// try plain number first
+	if i, err := strconv.Atoi(s); err == nil {
+		if i <= 0 {
+			// disable timeouts
+			return -1
+		}
+		return time.Duration(i) * time.Second
+	}
+	dur, err := time.ParseDuration(s)
+	if err == nil {
+		return dur
+	}
+	if o.err == nil {
+		o.err = fmt.Errorf("redis: invalid %s duration: %w", name, err)
+	}
+	return 0
+}
+
+func (o *queryOptions) bool(name string) bool {
+	switch s := o.string(name); s {
+	case "true", "1":
+		return true
+	case "false", "0", "":
+		return false
+	default:
+		if o.err == nil {
+			o.err = fmt.Errorf("redis: invalid %s boolean: expected true/false/1/0 or an empty string, got %q", name, s)
+		}
+		return false
+	}
+}
+
+func (o *queryOptions) remaining() []string {
+	if len(o.q) == 0 {
+		return nil
+	}
+	keys := make([]string, 0, len(o.q))
+	for k := range o.q {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	return keys
+}
+
+func setupConnParams(u *url.URL, o *redis.FailoverOptions) (*redis.FailoverOptions, error) {
+	q := queryOptions{q: u.Query()}
+
+	// compat: a future major release may use q.int("db")
+	if tmp := q.string("db"); tmp != "" {
+		db, err := strconv.Atoi(tmp)
+		if err != nil {
+			return nil, fmt.Errorf("redis: invalid database number: %w", err)
+		}
+		o.DB = db
+	}
+
+	o.Protocol = q.int("protocol")
+	o.ClientName = q.string("client_name")
+	o.MaxRetries = q.int("max_retries")
+	o.MinRetryBackoff = q.duration("min_retry_backoff")
+	o.MaxRetryBackoff = q.duration("max_retry_backoff")
+	o.DialTimeout = q.duration("dial_timeout")
+	o.ReadTimeout = q.duration("read_timeout")
+	o.WriteTimeout = q.duration("write_timeout")
+	o.PoolFIFO = q.bool("pool_fifo")
+	o.PoolSize = q.int("pool_size")
+	o.PoolTimeout = q.duration("pool_timeout")
+	o.MinIdleConns = q.int("min_idle_conns")
+	o.MaxIdleConns = q.int("max_idle_conns")
+	o.MaxActiveConns = q.int("max_active_conns")
+	if q.has("conn_max_idle_time") {
+		o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
+	} else {
+		o.ConnMaxIdleTime = q.duration("idle_timeout")
+	}
+	if q.has("conn_max_lifetime") {
+		o.ConnMaxLifetime = q.duration("conn_max_lifetime")
+	} else {
+		o.ConnMaxLifetime = q.duration("max_conn_age")
+	}
+	if q.err != nil {
+		return nil, q.err
+	}
+
+	// any parameters left?
+	if r := q.remaining(); len(r) > 0 {
+		return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
+	}
+
+	return o, nil
+}
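A short usage sketch for the new sentinel support (not part of the commit; host names, master name, and credentials are placeholders, and the import path assumes nitro's usual module layout):

package main

import (
	"context"
	"log"

	"github.com/offchainlabs/nitro/util/redisutil"
)

func main() {
	// Follows the URL shape documented in parseFailoverRedisUrl:
	// sentinel addresses, then /<master_name>/<db_number>, then query options.
	url := "redis+sentinel://user:secret@sentinel-1:26379,sentinel-2:26379,sentinel-3:26379/mymaster/0?dial_timeout=3s&max_retries=2"
	client, err := redisutil.RedisClientFromURL(url)
	if err != nil {
		log.Fatal(err)
	}
	if client == nil { // an empty URL yields (nil, nil)
		return
	}
	defer client.Close()
	if err := client.Ping(context.Background()).Err(); err != nil {
		log.Fatal(err)
	}
	log.Println("connected via sentinel")
}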
