diff --git a/internal/dcs/config.go b/internal/dcs/config.go index 5047e00a..74d15958 100644 --- a/internal/dcs/config.go +++ b/internal/dcs/config.go @@ -9,24 +9,35 @@ import ( // ZookeeperConfig contains Zookeeper connection info type ZookeeperConfig struct { - Hostname string `config:"hostname" yaml:"hostname"` - SessionTimeout time.Duration `config:"session_timeout" yaml:"session_timeout"` - Namespace string `config:"namespace,required"` - Hosts []string `config:"hosts,required"` - BackoffInterval time.Duration `config:"backoff_interval" yaml:"backoff_interval"` - BackoffRandFactor float64 `config:"backoff_rand_factor" yaml:"backoff_rand_factor"` - BackoffMultiplier float64 `config:"backoff_multiplier" yaml:"backoff_multiplier"` - BackoffMaxInterval time.Duration `config:"backoff_max_interval" yaml:"backoff_max_interval"` - BackoffMaxElapsedTime time.Duration `config:"backoff_max_elapsed_time" yaml:"backoff_max_elapsed_time"` - BackoffMaxRetries uint64 `config:"backoff_max_retries" yaml:"backoff_max_retries"` - Auth bool `config:"auth" yaml:"auth"` - Username string `config:"username" yaml:"username"` - Password string `config:"password" yaml:"password"` - UseSSL bool `config:"use_ssl" yaml:"use_ssl"` - KeyFile string `config:"keyfile" yaml:"keyfile"` - CertFile string `config:"certfile" yaml:"certfile"` - CACert string `config:"ca_cert" yaml:"ca_cert"` - VerifyCerts bool `config:"verify_certs" yaml:"verify_certs"` + Hostname string `config:"hostname" yaml:"hostname"` + SessionTimeout time.Duration `config:"session_timeout" yaml:"session_timeout"` + Namespace string `config:"namespace,required"` + Hosts []string `config:"hosts,required"` + BackoffInterval time.Duration `config:"backoff_interval" yaml:"backoff_interval"` + BackoffRandFactor float64 `config:"backoff_rand_factor" yaml:"backoff_rand_factor"` + BackoffMultiplier float64 `config:"backoff_multiplier" yaml:"backoff_multiplier"` + BackoffMaxInterval time.Duration `config:"backoff_max_interval" yaml:"backoff_max_interval"` + BackoffMaxElapsedTime time.Duration `config:"backoff_max_elapsed_time" yaml:"backoff_max_elapsed_time"` + BackoffMaxRetries uint64 `config:"backoff_max_retries" yaml:"backoff_max_retries"` + RandomHostProvider RandomHostProviderConfig `config:"random_host_provider" yaml:"random_host_provider"` + Auth bool `config:"auth" yaml:"auth"` + Username string `config:"username" yaml:"username"` + Password string `config:"password" yaml:"password"` + UseSSL bool `config:"use_ssl" yaml:"use_ssl"` + KeyFile string `config:"keyfile" yaml:"keyfile"` + CertFile string `config:"certfile" yaml:"certfile"` + CACert string `config:"ca_cert" yaml:"ca_cert"` + VerifyCerts bool `config:"verify_certs" yaml:"verify_certs"` +} + +type RandomHostProviderConfig struct { + LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"` +} + +func DefaultRandomHostProviderConfig() RandomHostProviderConfig { + return RandomHostProviderConfig{ + LookupTTL: 300 * time.Second, + } } // DefaultZookeeperConfig return default Zookeeper connection configuration @@ -44,6 +55,7 @@ func DefaultZookeeperConfig() (ZookeeperConfig, error) { BackoffMaxInterval: backoff.DefaultMaxInterval, BackoffMaxElapsedTime: backoff.DefaultMaxElapsedTime, BackoffMaxRetries: 10, + RandomHostProvider: DefaultRandomHostProviderConfig(), } return config, nil } diff --git a/internal/dcs/zk.go b/internal/dcs/zk.go index cd7a9a09..64373fac 100644 --- a/internal/dcs/zk.go +++ b/internal/dcs/zk.go @@ -70,6 +70,7 @@ func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) { var ec <-chan zk.Event var err error var operation func() error + hostProvider := NewRandomHostProvider(&config.RandomHostProvider, logger) if config.UseSSL { if config.CACert == "" || config.KeyFile == "" || config.CertFile == "" { return nil, fmt.Errorf("zookeeper ssl not configured, fill ca_cert/key_file/cert_file in config or disable use_ssl flag") @@ -85,12 +86,12 @@ func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) { } operation = func() error { - conn, ec, err = zk.Connect(config.Hosts, config.SessionTimeout, zk.WithLogger(zkLoggerProxy{logger}), zk.WithDialer(dialer)) + conn, ec, err = zk.Connect(config.Hosts, config.SessionTimeout, zk.WithLogger(zkLoggerProxy{logger}), zk.WithDialer(dialer), zk.WithHostProvider(hostProvider)) return err } } else { operation = func() error { - conn, ec, err = zk.Connect(config.Hosts, config.SessionTimeout, zk.WithLogger(zkLoggerProxy{logger})) + conn, ec, err = zk.Connect(config.Hosts, config.SessionTimeout, zk.WithLogger(zkLoggerProxy{logger}), zk.WithHostProvider(hostProvider)) return err } } diff --git a/internal/dcs/zk_host_provider.go b/internal/dcs/zk_host_provider.go new file mode 100644 index 00000000..dbfd6072 --- /dev/null +++ b/internal/dcs/zk_host_provider.go @@ -0,0 +1,123 @@ +package dcs + +import ( + "fmt" + "math/rand" + "net" + "sync" + "time" + + "github.com/yandex/mysync/internal/log" +) + +type RandomHostProvider struct { + lock sync.Mutex + servers []string + resolved []string + tried map[string]struct{} + logger *log.Logger + lastLookup time.Time + lookupTTL time.Duration +} + +func NewRandomHostProvider(config *RandomHostProviderConfig, logger *log.Logger) *RandomHostProvider { + return &RandomHostProvider{ + lookupTTL: config.LookupTTL, + logger: logger, + tried: make(map[string]struct{}), + } +} + +func (rhp *RandomHostProvider) Init(servers []string) error { + rhp.lock.Lock() + defer rhp.lock.Unlock() + + rhp.servers = servers + + err := rhp.resolveHosts() + + if err != nil { + return fmt.Errorf("failed to init zk host provider %v", err) + } + + return nil +} + +func (rhp *RandomHostProvider) resolveHosts() error { + resolved := []string{} + for _, server := range rhp.servers { + host, port, err := net.SplitHostPort(server) + if err != nil { + return err + } + addrs, err := net.LookupHost(host) + if err != nil { + rhp.logger.Errorf("unable to resolve %s: %v", host, err) + } + for _, addr := range addrs { + resolved = append(resolved, net.JoinHostPort(addr, port)) + } + } + + if len(resolved) == 0 { + return fmt.Errorf("no hosts resolved for %q", rhp.servers) + } + + rhp.lastLookup = time.Now() + rhp.resolved = resolved + + rand.Shuffle(len(rhp.resolved), func(i, j int) { rhp.resolved[i], rhp.resolved[j] = rhp.resolved[j], rhp.resolved[i] }) + + return nil +} + +func (rhp *RandomHostProvider) Len() int { + rhp.lock.Lock() + defer rhp.lock.Unlock() + return len(rhp.resolved) +} + +func (rhp *RandomHostProvider) Next() (server string, retryStart bool) { + rhp.lock.Lock() + defer rhp.lock.Unlock() + lastTime := time.Since(rhp.lastLookup) + needRetry := false + if lastTime > rhp.lookupTTL { + err := rhp.resolveHosts() + if err != nil { + rhp.logger.Errorf("resolve zk hosts failed: %v", err) + } + } + + notTried := []string{} + + for _, addr := range rhp.resolved { + if _, ok := rhp.tried[addr]; !ok { + notTried = append(notTried, addr) + } + } + + var selected string + + if len(notTried) == 0 { + needRetry = true + for k := range rhp.tried { + delete(rhp.tried, k) + } + selected = rhp.resolved[rand.Intn(len(rhp.resolved))] + } else { + selected = notTried[rand.Intn(len(notTried))] + } + + rhp.tried[selected] = struct{}{} + + return selected, needRetry +} + +func (rhp *RandomHostProvider) Connected() { + rhp.lock.Lock() + defer rhp.lock.Unlock() + for k := range rhp.tried { + delete(rhp.tried, k) + } +}