Skip to content

Commit

Permalink
Merge branch 'master' into update-gethpin-v1.14.4
Browse files Browse the repository at this point in the history
  • Loading branch information
amsanghi authored Dec 10, 2024
2 parents 03dd706 + 955f058 commit 418a1f8
Show file tree
Hide file tree
Showing 7 changed files with 286 additions and 21 deletions.
2 changes: 1 addition & 1 deletion arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
var resString string
resString, msgReadErr = client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
if msgReadErr != nil && c.sequencer.Synced() {
log.Warn("coordinator failed reading message", "pos", msgToRead, "err", msgReadErr)
break
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/conf/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type InitConfig struct {
ImportWasm bool `koanf:"import-wasm"`
AccountsPerSync uint `koanf:"accounts-per-sync"`
ImportFile string `koanf:"import-file"`
GenesisJsonFile string `koanf:"genesis-json-file"`
ThenQuit bool `koanf:"then-quit"`
Prune string `koanf:"prune"`
PruneBloomSize uint64 `koanf:"prune-bloom-size"`
Expand Down Expand Up @@ -54,6 +55,7 @@ var InitConfigDefault = InitConfig{
Empty: false,
ImportWasm: false,
ImportFile: "",
GenesisJsonFile: "",
AccountsPerSync: 100000,
ThenQuit: false,
Prune: "",
Expand Down Expand Up @@ -83,6 +85,7 @@ func InitConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".import-wasm", InitConfigDefault.ImportWasm, "if set, import the wasm directory when downloading a database (contains executable code - only use with highly trusted source)")
f.Bool(prefix+".then-quit", InitConfigDefault.ThenQuit, "quit after init is done")
f.String(prefix+".import-file", InitConfigDefault.ImportFile, "path for json data to import")
f.String(prefix+".genesis-json-file", InitConfigDefault.GenesisJsonFile, "path for genesis json file")
f.Uint(prefix+".accounts-per-sync", InitConfigDefault.AccountsPerSync, "during init - sync database every X accounts. Lower value for low-memory systems. 0 disables.")
f.String(prefix+".prune", InitConfigDefault.Prune, "pruning for a given use: \"full\" for full nodes serving RPC requests, or \"validator\" for validators")
f.Uint64(prefix+".prune-bloom-size", InitConfigDefault.PruneBloomSize, "the amount of memory in megabytes to use for the pruning bloom filter (higher values prune better)")
Expand Down
38 changes: 35 additions & 3 deletions cmd/nitro/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,36 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo

var chainConfig *params.ChainConfig

if config.Init.GenesisJsonFile != "" {
if initDataReader != nil {
return chainDb, nil, errors.New("multiple init methods supplied")
}
genesisJson, err := os.ReadFile(config.Init.GenesisJsonFile)
if err != nil {
return chainDb, nil, err
}
var gen core.Genesis
if err := json.Unmarshal(genesisJson, &gen); err != nil {
return chainDb, nil, err
}
var accounts []statetransfer.AccountInitializationInfo
for address, account := range gen.Alloc {
accounts = append(accounts, statetransfer.AccountInitializationInfo{
Addr: address,
EthBalance: account.Balance,
Nonce: account.Nonce,
ContractInfo: &statetransfer.AccountInitContractInfo{
Code: account.Code,
ContractStorage: account.Storage,
},
})
}
initDataReader = statetransfer.NewMemoryInitDataReader(&statetransfer.ArbosInitializationInfo{
Accounts: accounts,
})
chainConfig = gen.Config
}

var l2BlockChain *core.BlockChain
txIndexWg := sync.WaitGroup{}
if initDataReader == nil {
Expand All @@ -714,9 +744,11 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo
if err != nil {
return chainDb, nil, err
}
chainConfig, err = chaininfo.GetChainConfig(new(big.Int).SetUint64(config.Chain.ID), config.Chain.Name, genesisBlockNr, config.Chain.InfoFiles, config.Chain.InfoJson)
if err != nil {
return chainDb, nil, err
if chainConfig == nil {
chainConfig, err = chaininfo.GetChainConfig(new(big.Int).SetUint64(config.Chain.ID), config.Chain.Name, genesisBlockNr, config.Chain.InfoFiles, config.Chain.InfoJson)
if err != nil {
return chainDb, nil, err
}
}
if config.Init.DevInit && config.Init.DevMaxCodeSize != 0 {
chainConfig.ArbitrumChainParams.MaxCodeSize = config.Init.DevMaxCodeSize
Expand Down
6 changes: 2 additions & 4 deletions das/reader_aggregator_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package das

import (
"errors"
"maps"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -33,10 +34,7 @@ func (s *abstractAggregatorStrategy) update(readers []daprovider.DASReader, stat
s.readers = make([]daprovider.DASReader, len(readers))
copy(s.readers, readers)

s.stats = make(map[daprovider.DASReader]readerStats)
for k, v := range stats {
s.stats[k] = v
}
s.stats = maps.Clone(stats)
}

// Exponentially growing Explore Exploit Strategy
Expand Down
225 changes: 221 additions & 4 deletions util/redisutil/redisutil.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,231 @@
package redisutil

import "github.com/redis/go-redis/v9"
import (
"fmt"
"net"
"net/url"
"sort"
"strconv"
"strings"
"time"

func RedisClientFromURL(url string) (redis.UniversalClient, error) {
if url == "" {
"github.com/redis/go-redis/v9"
)

// RedisClientFromURL creates a new Redis client based on the provided URL.
// The URL scheme can be either `redis` or `redis+sentinel`.
func RedisClientFromURL(redisUrl string) (redis.UniversalClient, error) {
if redisUrl == "" {
return nil, nil
}
redisOptions, err := redis.ParseURL(url)
u, err := url.Parse(redisUrl)
if err != nil {
return nil, err
}
if u.Scheme == "redis+sentinel" {
redisOptions, err := parseFailoverRedisUrl(redisUrl)
if err != nil {
return nil, err
}
return redis.NewFailoverClient(redisOptions), nil
}
redisOptions, err := redis.ParseURL(redisUrl)
if err != nil {
return nil, err
}
return redis.NewClient(redisOptions), nil
}

// Designed using https://github.com/redis/go-redis/blob/a8590e987945b7ba050569cc3b94b8ece49e99e3/options.go#L283 as reference
// Example Usage :
//
// redis+sentinel://<user>:<password>@<host1>:<port1>,<host2>:<port2>,<host3>:<port3>/<master_name/><db_number>?dial_timeout=3&db=1&read_timeout=6s&max_retries=2
func parseFailoverRedisUrl(redisUrl string) (*redis.FailoverOptions, error) {
u, err := url.Parse(redisUrl)
if err != nil {
return nil, err
}
o := &redis.FailoverOptions{}
o.SentinelUsername, o.SentinelPassword = getUserPassword(u)
o.SentinelAddrs = getAddressesWithDefaults(u)
f := strings.FieldsFunc(u.Path, func(r rune) bool {
return r == '/'
})
switch len(f) {
case 0:
return nil, fmt.Errorf("redis: master name is required")
case 1:
o.DB = 0
o.MasterName = f[0]
case 2:
o.MasterName = f[0]
var err error
if o.DB, err = strconv.Atoi(f[1]); err != nil {
return nil, fmt.Errorf("redis: invalid database number: %q", f[0])
}
default:
return nil, fmt.Errorf("redis: invalid URL path: %s", u.Path)
}

return setupConnParams(u, o)
}

func getUserPassword(u *url.URL) (string, string) {
var user, password string
if u.User != nil {
user = u.User.Username()
if p, ok := u.User.Password(); ok {
password = p
}
}
return user, password
}

func getAddressesWithDefaults(u *url.URL) []string {
urlHosts := strings.Split(u.Host, ",")
var addresses []string
for _, urlHost := range urlHosts {
host, port, err := net.SplitHostPort(urlHost)
if err != nil {
host = u.Host
}
if host == "" {
host = "localhost"
}
if port == "" {
port = "6379"
}
addresses = append(addresses, net.JoinHostPort(host, port))
}
return addresses
}

type queryOptions struct {
q url.Values
err error
}

func (o *queryOptions) has(name string) bool {
return len(o.q[name]) > 0
}

func (o *queryOptions) string(name string) string {
vs := o.q[name]
if len(vs) == 0 {
return ""
}
delete(o.q, name) // enable detection of unknown parameters
return vs[len(vs)-1]
}

func (o *queryOptions) int(name string) int {
s := o.string(name)
if s == "" {
return 0
}
i, err := strconv.Atoi(s)
if err == nil {
return i
}
if o.err == nil {
o.err = fmt.Errorf("redis: invalid %s number: %w", name, err)
}
return 0
}

func (o *queryOptions) duration(name string) time.Duration {
s := o.string(name)
if s == "" {
return 0
}
// try plain number first
if i, err := strconv.Atoi(s); err == nil {
if i <= 0 {
// disable timeouts
return -1
}
return time.Duration(i) * time.Second
}
dur, err := time.ParseDuration(s)
if err == nil {
return dur
}
if o.err == nil {
o.err = fmt.Errorf("redis: invalid %s duration: %w", name, err)
}
return 0
}

func (o *queryOptions) bool(name string) bool {
switch s := o.string(name); s {
case "true", "1":
return true
case "false", "0", "":
return false
default:
if o.err == nil {
o.err = fmt.Errorf("redis: invalid %s boolean: expected true/false/1/0 or an empty string, got %q", name, s)
}
return false
}
}

func (o *queryOptions) remaining() []string {
if len(o.q) == 0 {
return nil
}
keys := make([]string, 0, len(o.q))
for k := range o.q {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}

func setupConnParams(u *url.URL, o *redis.FailoverOptions) (*redis.FailoverOptions, error) {
q := queryOptions{q: u.Query()}

// compat: a future major release may use q.int("db")
if tmp := q.string("db"); tmp != "" {
db, err := strconv.Atoi(tmp)
if err != nil {
return nil, fmt.Errorf("redis: invalid database number: %w", err)
}
o.DB = db
}

o.Protocol = q.int("protocol")
o.ClientName = q.string("client_name")
o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff")
o.MaxRetryBackoff = q.duration("max_retry_backoff")
o.DialTimeout = q.duration("dial_timeout")
o.ReadTimeout = q.duration("read_timeout")
o.WriteTimeout = q.duration("write_timeout")
o.PoolFIFO = q.bool("pool_fifo")
o.PoolSize = q.int("pool_size")
o.PoolTimeout = q.duration("pool_timeout")
o.MinIdleConns = q.int("min_idle_conns")
o.MaxIdleConns = q.int("max_idle_conns")
o.MaxActiveConns = q.int("max_active_conns")
if q.has("conn_max_idle_time") {
o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
} else {
o.ConnMaxIdleTime = q.duration("idle_timeout")
}
if q.has("conn_max_lifetime") {
o.ConnMaxLifetime = q.duration("conn_max_lifetime")
} else {
o.ConnMaxLifetime = q.duration("max_conn_age")
}
if q.err != nil {
return nil, q.err
}

// any parameters left?
if r := q.remaining(); len(r) > 0 {
return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
}

return o, nil
}
16 changes: 7 additions & 9 deletions util/stopwaiter/stopwaiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,12 @@ func (s *StopWaiterSafe) Start(ctx context.Context, parent any) error {
}

func (s *StopWaiterSafe) StopOnly() {
_ = s.stopOnly()
}

// returns true if stop function was called
func (s *StopWaiterSafe) stopOnly() bool {
stopWasCalled := false
s.mutex.Lock()
defer s.mutex.Unlock()
if s.started && !s.stopped {
s.stopFunc()
stopWasCalled = true
}
s.stopped = true
return stopWasCalled
}

// StopAndWait may be called multiple times, even before start.
Expand All @@ -126,9 +118,15 @@ func getAllStackTraces() string {
}

func (s *StopWaiterSafe) stopAndWaitImpl(warningTimeout time.Duration) error {
if !s.stopOnly() {
s.StopOnly()
if !s.Started() {
// No need to wait, because nothing can be started if it's already stopped.
return nil
}
// Even if StopOnly has been previously called, make sure we wait for everything to shut down.
// Otherwise, a StopOnly call followed by StopAndWait might return early without waiting.
// At this point started must be true (because it was true above and cannot go back to false),
// so GetWaitChannel won't return an error.
waitChan, err := s.GetWaitChannel()
if err != nil {
return err
Expand Down
Loading

0 comments on commit 418a1f8

Please sign in to comment.