diff --git a/arbnode/seq_coordinator.go b/arbnode/seq_coordinator.go index 5987801d5f..fc2f3c9cf6 100644 --- a/arbnode/seq_coordinator.go +++ b/arbnode/seq_coordinator.go @@ -662,7 +662,7 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration { for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount { var resString string resString, msgReadErr = client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result() - if msgReadErr != nil { + if msgReadErr != nil && c.sequencer.Synced() { log.Warn("coordinator failed reading message", "pos", msgToRead, "err", msgReadErr) break } diff --git a/cmd/conf/init.go b/cmd/conf/init.go index cd2b6c8805..74bd89fd16 100644 --- a/cmd/conf/init.go +++ b/cmd/conf/init.go @@ -27,6 +27,7 @@ type InitConfig struct { ImportWasm bool `koanf:"import-wasm"` AccountsPerSync uint `koanf:"accounts-per-sync"` ImportFile string `koanf:"import-file"` + GenesisJsonFile string `koanf:"genesis-json-file"` ThenQuit bool `koanf:"then-quit"` Prune string `koanf:"prune"` PruneBloomSize uint64 `koanf:"prune-bloom-size"` @@ -54,6 +55,7 @@ var InitConfigDefault = InitConfig{ Empty: false, ImportWasm: false, ImportFile: "", + GenesisJsonFile: "", AccountsPerSync: 100000, ThenQuit: false, Prune: "", @@ -83,6 +85,7 @@ func InitConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Bool(prefix+".import-wasm", InitConfigDefault.ImportWasm, "if set, import the wasm directory when downloading a database (contains executable code - only use with highly trusted source)") f.Bool(prefix+".then-quit", InitConfigDefault.ThenQuit, "quit after init is done") f.String(prefix+".import-file", InitConfigDefault.ImportFile, "path for json data to import") + f.String(prefix+".genesis-json-file", InitConfigDefault.GenesisJsonFile, "path for genesis json file") f.Uint(prefix+".accounts-per-sync", InitConfigDefault.AccountsPerSync, "during init - sync database every X accounts. Lower value for low-memory systems. 0 disables.") f.String(prefix+".prune", InitConfigDefault.Prune, "pruning for a given use: \"full\" for full nodes serving RPC requests, or \"validator\" for validators") f.Uint64(prefix+".prune-bloom-size", InitConfigDefault.PruneBloomSize, "the amount of memory in megabytes to use for the pruning bloom filter (higher values prune better)") diff --git a/cmd/nitro/init.go b/cmd/nitro/init.go index eb6d7df6fc..acad672bb0 100644 --- a/cmd/nitro/init.go +++ b/cmd/nitro/init.go @@ -689,6 +689,36 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo var chainConfig *params.ChainConfig + if config.Init.GenesisJsonFile != "" { + if initDataReader != nil { + return chainDb, nil, errors.New("multiple init methods supplied") + } + genesisJson, err := os.ReadFile(config.Init.GenesisJsonFile) + if err != nil { + return chainDb, nil, err + } + var gen core.Genesis + if err := json.Unmarshal(genesisJson, &gen); err != nil { + return chainDb, nil, err + } + var accounts []statetransfer.AccountInitializationInfo + for address, account := range gen.Alloc { + accounts = append(accounts, statetransfer.AccountInitializationInfo{ + Addr: address, + EthBalance: account.Balance, + Nonce: account.Nonce, + ContractInfo: &statetransfer.AccountInitContractInfo{ + Code: account.Code, + ContractStorage: account.Storage, + }, + }) + } + initDataReader = statetransfer.NewMemoryInitDataReader(&statetransfer.ArbosInitializationInfo{ + Accounts: accounts, + }) + chainConfig = gen.Config + } + var l2BlockChain *core.BlockChain txIndexWg := sync.WaitGroup{} if initDataReader == nil { @@ -714,9 +744,11 @@ func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeCo if err != nil { return chainDb, nil, err } - chainConfig, err = chaininfo.GetChainConfig(new(big.Int).SetUint64(config.Chain.ID), config.Chain.Name, genesisBlockNr, config.Chain.InfoFiles, config.Chain.InfoJson) - if err != nil { - return chainDb, nil, err + if chainConfig == nil { + chainConfig, err = chaininfo.GetChainConfig(new(big.Int).SetUint64(config.Chain.ID), config.Chain.Name, genesisBlockNr, config.Chain.InfoFiles, config.Chain.InfoJson) + if err != nil { + return chainDb, nil, err + } } if config.Init.DevInit && config.Init.DevMaxCodeSize != 0 { chainConfig.ArbitrumChainParams.MaxCodeSize = config.Init.DevMaxCodeSize diff --git a/das/reader_aggregator_strategies.go b/das/reader_aggregator_strategies.go index 8e10d52c16..e072fdd85c 100644 --- a/das/reader_aggregator_strategies.go +++ b/das/reader_aggregator_strategies.go @@ -5,6 +5,7 @@ package das import ( "errors" + "maps" "math/rand" "sort" "sync" @@ -33,10 +34,7 @@ func (s *abstractAggregatorStrategy) update(readers []daprovider.DASReader, stat s.readers = make([]daprovider.DASReader, len(readers)) copy(s.readers, readers) - s.stats = make(map[daprovider.DASReader]readerStats) - for k, v := range stats { - s.stats[k] = v - } + s.stats = maps.Clone(stats) } // Exponentially growing Explore Exploit Strategy diff --git a/util/redisutil/redisutil.go b/util/redisutil/redisutil.go index 01ba836d5b..fafb816b8a 100644 --- a/util/redisutil/redisutil.go +++ b/util/redisutil/redisutil.go @@ -1,14 +1,231 @@ package redisutil -import "github.com/redis/go-redis/v9" +import ( + "fmt" + "net" + "net/url" + "sort" + "strconv" + "strings" + "time" -func RedisClientFromURL(url string) (redis.UniversalClient, error) { - if url == "" { + "github.com/redis/go-redis/v9" +) + +// RedisClientFromURL creates a new Redis client based on the provided URL. +// The URL scheme can be either `redis` or `redis+sentinel`. +func RedisClientFromURL(redisUrl string) (redis.UniversalClient, error) { + if redisUrl == "" { return nil, nil } - redisOptions, err := redis.ParseURL(url) + u, err := url.Parse(redisUrl) + if err != nil { + return nil, err + } + if u.Scheme == "redis+sentinel" { + redisOptions, err := parseFailoverRedisUrl(redisUrl) + if err != nil { + return nil, err + } + return redis.NewFailoverClient(redisOptions), nil + } + redisOptions, err := redis.ParseURL(redisUrl) if err != nil { return nil, err } return redis.NewClient(redisOptions), nil } + +// Designed using https://github.com/redis/go-redis/blob/a8590e987945b7ba050569cc3b94b8ece49e99e3/options.go#L283 as reference +// Example Usage : +// +// redis+sentinel://:@:,:,:/?dial_timeout=3&db=1&read_timeout=6s&max_retries=2 +func parseFailoverRedisUrl(redisUrl string) (*redis.FailoverOptions, error) { + u, err := url.Parse(redisUrl) + if err != nil { + return nil, err + } + o := &redis.FailoverOptions{} + o.SentinelUsername, o.SentinelPassword = getUserPassword(u) + o.SentinelAddrs = getAddressesWithDefaults(u) + f := strings.FieldsFunc(u.Path, func(r rune) bool { + return r == '/' + }) + switch len(f) { + case 0: + return nil, fmt.Errorf("redis: master name is required") + case 1: + o.DB = 0 + o.MasterName = f[0] + case 2: + o.MasterName = f[0] + var err error + if o.DB, err = strconv.Atoi(f[1]); err != nil { + return nil, fmt.Errorf("redis: invalid database number: %q", f[0]) + } + default: + return nil, fmt.Errorf("redis: invalid URL path: %s", u.Path) + } + + return setupConnParams(u, o) +} + +func getUserPassword(u *url.URL) (string, string) { + var user, password string + if u.User != nil { + user = u.User.Username() + if p, ok := u.User.Password(); ok { + password = p + } + } + return user, password +} + +func getAddressesWithDefaults(u *url.URL) []string { + urlHosts := strings.Split(u.Host, ",") + var addresses []string + for _, urlHost := range urlHosts { + host, port, err := net.SplitHostPort(urlHost) + if err != nil { + host = u.Host + } + if host == "" { + host = "localhost" + } + if port == "" { + port = "6379" + } + addresses = append(addresses, net.JoinHostPort(host, port)) + } + return addresses +} + +type queryOptions struct { + q url.Values + err error +} + +func (o *queryOptions) has(name string) bool { + return len(o.q[name]) > 0 +} + +func (o *queryOptions) string(name string) string { + vs := o.q[name] + if len(vs) == 0 { + return "" + } + delete(o.q, name) // enable detection of unknown parameters + return vs[len(vs)-1] +} + +func (o *queryOptions) int(name string) int { + s := o.string(name) + if s == "" { + return 0 + } + i, err := strconv.Atoi(s) + if err == nil { + return i + } + if o.err == nil { + o.err = fmt.Errorf("redis: invalid %s number: %w", name, err) + } + return 0 +} + +func (o *queryOptions) duration(name string) time.Duration { + s := o.string(name) + if s == "" { + return 0 + } + // try plain number first + if i, err := strconv.Atoi(s); err == nil { + if i <= 0 { + // disable timeouts + return -1 + } + return time.Duration(i) * time.Second + } + dur, err := time.ParseDuration(s) + if err == nil { + return dur + } + if o.err == nil { + o.err = fmt.Errorf("redis: invalid %s duration: %w", name, err) + } + return 0 +} + +func (o *queryOptions) bool(name string) bool { + switch s := o.string(name); s { + case "true", "1": + return true + case "false", "0", "": + return false + default: + if o.err == nil { + o.err = fmt.Errorf("redis: invalid %s boolean: expected true/false/1/0 or an empty string, got %q", name, s) + } + return false + } +} + +func (o *queryOptions) remaining() []string { + if len(o.q) == 0 { + return nil + } + keys := make([]string, 0, len(o.q)) + for k := range o.q { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +func setupConnParams(u *url.URL, o *redis.FailoverOptions) (*redis.FailoverOptions, error) { + q := queryOptions{q: u.Query()} + + // compat: a future major release may use q.int("db") + if tmp := q.string("db"); tmp != "" { + db, err := strconv.Atoi(tmp) + if err != nil { + return nil, fmt.Errorf("redis: invalid database number: %w", err) + } + o.DB = db + } + + o.Protocol = q.int("protocol") + o.ClientName = q.string("client_name") + o.MaxRetries = q.int("max_retries") + o.MinRetryBackoff = q.duration("min_retry_backoff") + o.MaxRetryBackoff = q.duration("max_retry_backoff") + o.DialTimeout = q.duration("dial_timeout") + o.ReadTimeout = q.duration("read_timeout") + o.WriteTimeout = q.duration("write_timeout") + o.PoolFIFO = q.bool("pool_fifo") + o.PoolSize = q.int("pool_size") + o.PoolTimeout = q.duration("pool_timeout") + o.MinIdleConns = q.int("min_idle_conns") + o.MaxIdleConns = q.int("max_idle_conns") + o.MaxActiveConns = q.int("max_active_conns") + if q.has("conn_max_idle_time") { + o.ConnMaxIdleTime = q.duration("conn_max_idle_time") + } else { + o.ConnMaxIdleTime = q.duration("idle_timeout") + } + if q.has("conn_max_lifetime") { + o.ConnMaxLifetime = q.duration("conn_max_lifetime") + } else { + o.ConnMaxLifetime = q.duration("max_conn_age") + } + if q.err != nil { + return nil, q.err + } + + // any parameters left? + if r := q.remaining(); len(r) > 0 { + return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", ")) + } + + return o, nil +} diff --git a/util/stopwaiter/stopwaiter.go b/util/stopwaiter/stopwaiter.go index 993768dd85..c242ac26ab 100644 --- a/util/stopwaiter/stopwaiter.go +++ b/util/stopwaiter/stopwaiter.go @@ -96,20 +96,12 @@ func (s *StopWaiterSafe) Start(ctx context.Context, parent any) error { } func (s *StopWaiterSafe) StopOnly() { - _ = s.stopOnly() -} - -// returns true if stop function was called -func (s *StopWaiterSafe) stopOnly() bool { - stopWasCalled := false s.mutex.Lock() defer s.mutex.Unlock() if s.started && !s.stopped { s.stopFunc() - stopWasCalled = true } s.stopped = true - return stopWasCalled } // StopAndWait may be called multiple times, even before start. @@ -126,9 +118,15 @@ func getAllStackTraces() string { } func (s *StopWaiterSafe) stopAndWaitImpl(warningTimeout time.Duration) error { - if !s.stopOnly() { + s.StopOnly() + if !s.Started() { + // No need to wait, because nothing can be started if it's already stopped. return nil } + // Even if StopOnly has been previously called, make sure we wait for everything to shut down. + // Otherwise, a StopOnly call followed by StopAndWait might return early without waiting. + // At this point started must be true (because it was true above and cannot go back to false), + // so GetWaitChannel won't return an error. waitChan, err := s.GetWaitChannel() if err != nil { return err diff --git a/util/stopwaiter/stopwaiter_test.go b/util/stopwaiter/stopwaiter_test.go index c561e1f43b..68e49ac2be 100644 --- a/util/stopwaiter/stopwaiter_test.go +++ b/util/stopwaiter/stopwaiter_test.go @@ -5,6 +5,7 @@ package stopwaiter import ( "context" + "sync/atomic" "testing" "time" @@ -73,3 +74,19 @@ func TestStopWaiterStopAndWaitMultipleTimes(t *testing.T) { sw.StopAndWait() sw.StopAndWait() } + +func TestStopWaiterStopOnlyThenStopAndWait(t *testing.T) { + t.Parallel() + sw := StopWaiter{} + sw.Start(context.Background(), &TestStruct{}) + var threadStopping atomic.Bool + sw.LaunchThread(func(context.Context) { + time.Sleep(time.Second) + threadStopping.Store(true) + }) + sw.StopOnly() + sw.StopAndWait() + if !threadStopping.Load() { + t.Error("StopAndWait returned before background thread stopped") + } +}