From a54e3c1a80f3fed445200f1379c0c6cc43b22331 Mon Sep 17 00:00:00 2001 From: secwall Date: Fri, 23 Aug 2024 15:54:04 +0200 Subject: [PATCH] Resolve zk hosts in background --- Makefile | 2 +- internal/app/app.go | 12 +- internal/app/cli.go | 56 +++++----- internal/dcs/config.go | 10 +- internal/dcs/zk.go | 5 +- internal/dcs/zk_host_provider.go | 181 ++++++++++++++++++------------- internal/log/log.go | 10 +- 7 files changed, 155 insertions(+), 121 deletions(-) diff --git a/Makefile b/Makefile index c3035248..3de76f18 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ format: goimports -w `find . -name '*.go'` lint: - docker run --rm -v ${CURDIR}:/app -w /app golangci/golangci-lint:v1.58 golangci-lint run -v + docker run --rm -v ${CURDIR}:/app -w /app golangci/golangci-lint:v1.60 golangci-lint run -v unittests: go test ./cmd/... ./internal/... diff --git a/internal/app/app.go b/internal/app/app.go index 8bf4dbdb..c6c3d639 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -107,7 +107,7 @@ func (app *App) baseContext() context.Context { func (app *App) connectDCS() error { var err error // TODO: support other DCS systems - app.dcs, err = dcs.NewZookeeper(&app.config.Zookeeper, app.logger) + app.dcs, err = dcs.NewZookeeper(app.baseContext(), &app.config.Zookeeper, app.logger) if err != nil { return fmt.Errorf("failed to connect to zkDCS: %s", err.Error()) } @@ -616,7 +616,7 @@ func (app *App) stateManager() appState { // activeNodes are master + alive running replicas activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return stateManager } app.logger.Infof("active: %v", activeNodes) @@ -681,7 +681,7 @@ func (app *App) stateManager() appState { } return stateManager } else if err != dcs.ErrNotFound { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return stateManager } @@ -2303,7 +2303,7 @@ func (app *App) Run() int { err := app.lockFile() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.unlockFile() @@ -2313,14 +2313,14 @@ func (app *App) Run() int { err = app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() diff --git a/internal/app/cli.go b/internal/app/cli.go index 6756f73f..c5389269 100644 --- a/internal/app/cli.go +++ b/internal/app/cli.go @@ -20,7 +20,7 @@ import ( func (app *App) CliInfo(short bool) int { err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } app.dcs.Initialize() @@ -28,12 +28,12 @@ func (app *App) CliInfo(short bool) int { err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() if err := app.cluster.UpdateHostsInfo(); err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -57,7 +57,7 @@ func (app *App) CliInfo(short bool) int { activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } sort.Strings(activeNodes) @@ -123,7 +123,7 @@ func (app *App) CliInfo(short bool) int { } else { tree, err = app.dcs.GetTree("") if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } } @@ -140,20 +140,20 @@ func (app *App) CliInfo(short bool) int { func (app *App) CliState(short bool) int { err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() app.dcs.Initialize() err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() if err := app.cluster.UpdateHostsInfo(); err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -191,20 +191,20 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration } err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() app.dcs.Initialize() err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() if err := app.cluster.UpdateHostsInfo(); err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -223,7 +223,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration } activeNodes, err := app.GetActiveNodes() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -277,12 +277,12 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration // to avoid switching from one to another, use switch to behavior positions, err := app.getNodePositions(candidates) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } toHost, err = getMostDesirableNode(app.logger, positions, app.switchHelper.GetPriorityChoiceMaxLag()) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } } @@ -295,7 +295,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration return 2 } if err != dcs.ErrNotFound { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 2 } @@ -311,7 +311,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration return 2 } if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } // wait for switchover to complete @@ -353,7 +353,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { ctx := app.baseContext() err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() @@ -365,7 +365,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { } err = app.dcs.Create(pathMaintenance, maintenance) if err != nil && err != dcs.ErrExists { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } // wait for mysync to pause @@ -379,7 +379,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int { case <-ticker.C: err = app.dcs.Get(pathMaintenance, maintenance) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) } if maintenance.MySyncPaused { break Out @@ -404,7 +404,7 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { ctx := app.baseContext() err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.dcs.Close() @@ -439,7 +439,7 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int { break Out } if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) } case <-waitCtx.Done(): break Out @@ -495,7 +495,7 @@ func (app *App) CliAbort() int { return 0 } if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -514,7 +514,7 @@ func (app *App) CliAbort() int { err = app.dcs.Delete(pathCurrentSwitch) if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -526,7 +526,7 @@ func (app *App) CliAbort() int { func (app *App) CliHostList() int { err := app.connectDCS() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } app.dcs.Initialize() @@ -534,7 +534,7 @@ func (app *App) CliHostList() int { err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() @@ -584,7 +584,7 @@ func (app *App) CliHostAdd(host string, streamFrom *string, priority *int64, dry err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() @@ -601,7 +601,7 @@ func (app *App) CliHostAdd(host string, streamFrom *string, priority *int64, dry err = app.cluster.UpdateHostsInfo() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } @@ -667,7 +667,7 @@ func (app *App) CliHostRemove(host string) int { err = app.newDBCluster() if err != nil { - app.logger.Errorf(err.Error()) + app.logger.Error(err.Error()) return 1 } defer app.cluster.Close() diff --git a/internal/dcs/config.go b/internal/dcs/config.go index 7884ea4d..6eff2e66 100644 --- a/internal/dcs/config.go +++ b/internal/dcs/config.go @@ -31,14 +31,16 @@ type ZookeeperConfig struct { } type RandomHostProviderConfig struct { - LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"` - LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"` + LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"` + LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"` + LookupTickInterval time.Duration `config:"lookup_tick_interval" yaml:"lookup_tick_interval"` } func DefaultRandomHostProviderConfig() RandomHostProviderConfig { return RandomHostProviderConfig{ - LookupTimeout: 3 * time.Second, - LookupTTL: 300 * time.Second, + LookupTimeout: 3 * time.Second, + LookupTTL: 300 * time.Second, + LookupTickInterval: 60 * time.Second, } } diff --git a/internal/dcs/zk.go b/internal/dcs/zk.go index 64373fac..c56e8ccd 100644 --- a/internal/dcs/zk.go +++ b/internal/dcs/zk.go @@ -1,6 +1,7 @@ package dcs import ( + "context" "encoding/json" "fmt" "net" @@ -52,7 +53,7 @@ func retry(config *ZookeeperConfig, operation func() error) error { } // NewZookeeper returns Zookeeper based DCS storage -func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) { +func NewZookeeper(ctx context.Context, config *ZookeeperConfig, logger *log.Logger) (DCS, error) { if len(config.Hosts) == 0 { return nil, fmt.Errorf("zookeeper not configured, fill zookeeper/hosts in config") } @@ -70,7 +71,7 @@ func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) { var ec <-chan zk.Event var err error var operation func() error - hostProvider := NewRandomHostProvider(&config.RandomHostProvider, logger) + hostProvider := NewRandomHostProvider(ctx, &config.RandomHostProvider, logger) if config.UseSSL { if config.CACert == "" || config.KeyFile == "" || config.CertFile == "" { return nil, fmt.Errorf("zookeeper ssl not configured, fill ca_cert/key_file/cert_file in config or disable use_ssl flag") diff --git a/internal/dcs/zk_host_provider.go b/internal/dcs/zk_host_provider.go index 5ead36fc..9a50a475 100644 --- a/internal/dcs/zk_host_provider.go +++ b/internal/dcs/zk_host_provider.go @@ -11,119 +11,150 @@ import ( "github.com/yandex/mysync/internal/log" ) +type zkhost struct { + resolved []string + lastLookup time.Time +} + type RandomHostProvider struct { - lock sync.Mutex - servers []string - resolved []string - tried map[string]struct{} - logger *log.Logger - lastLookup time.Time - lookupTTL time.Duration - lookupTimeout time.Duration - resolver *net.Resolver + ctx context.Context + hosts sync.Map + hostsKeys []string + tried map[string]struct{} + logger *log.Logger + lookupTTL time.Duration + lookupTimeout time.Duration + lookupTickInterval time.Duration + resolver *net.Resolver } -func NewRandomHostProvider(config *RandomHostProviderConfig, logger *log.Logger) *RandomHostProvider { +func NewRandomHostProvider(ctx context.Context, config *RandomHostProviderConfig, logger *log.Logger) *RandomHostProvider { return &RandomHostProvider{ - lookupTTL: config.LookupTTL, - lookupTimeout: config.LookupTimeout, - logger: logger, - tried: make(map[string]struct{}), - resolver: &net.Resolver{}, + ctx: ctx, + lookupTTL: config.LookupTTL, + lookupTimeout: config.LookupTimeout, + lookupTickInterval: config.LookupTickInterval, + logger: logger, + tried: make(map[string]struct{}), + hosts: sync.Map{}, + resolver: &net.Resolver{}, } } func (rhp *RandomHostProvider) Init(servers []string) error { - rhp.lock.Lock() - defer rhp.lock.Unlock() + numResolved := 0 - rhp.servers = servers - - err := rhp.resolveHosts() + for _, host := range servers { + resolved, err := rhp.resolveHost(host) + if err != nil { + rhp.logger.Errorf("host definition %s is invalid %v", host, err) + continue + } + numResolved += len(resolved) + rhp.hosts.Store(host, zkhost{ + resolved: resolved, + lastLookup: time.Now(), + }) + rhp.hostsKeys = append(rhp.hostsKeys, host) + } - if err != nil { - return fmt.Errorf("failed to init zk host provider %v", err) + if numResolved == 0 { + return fmt.Errorf("unable to resolve any host from %v", servers) } + go rhp.resolveHosts() + return nil } -func (rhp *RandomHostProvider) resolveHosts() error { - resolved := []string{} - for _, server := range rhp.servers { - host, port, err := net.SplitHostPort(server) - if err != nil { - return err - } - ctx, cancel := context.WithTimeout(context.Background(), rhp.lookupTimeout) - defer cancel() - addrs, err := rhp.resolver.LookupHost(ctx, host) - if err != nil { - rhp.logger.Errorf("unable to resolve %s: %v", host, err) - } - for _, addr := range addrs { - resolved = append(resolved, net.JoinHostPort(addr, port)) +func (rhp *RandomHostProvider) resolveHosts() { + ticker := time.NewTicker(rhp.lookupTickInterval) + for { + select { + case <-ticker.C: + for _, pair := range rhp.hostsKeys { + host, _ := rhp.hosts.Load(pair) + zhost := host.(zkhost) + + if len(zhost.resolved) == 0 || time.Since(zhost.lastLookup) > rhp.lookupTTL { + resolved, err := rhp.resolveHost(pair) + if err != nil || len(resolved) == 0 { + rhp.logger.Errorf("background resolve for %s failed: %v", pair, err) + continue + } + rhp.hosts.Store(pair, zkhost{ + resolved: resolved, + lastLookup: time.Now(), + }) + } + } + case <-rhp.ctx.Done(): + return } } +} - if len(resolved) == 0 { - return fmt.Errorf("no hosts resolved for %q", rhp.servers) +func (rhp *RandomHostProvider) resolveHost(pair string) ([]string, error) { + var res []string + host, port, err := net.SplitHostPort(pair) + if err != nil { + return res, err + } + ctx, cancel := context.WithTimeout(rhp.ctx, rhp.lookupTimeout) + defer cancel() + addrs, err := rhp.resolver.LookupHost(ctx, host) + if err != nil { + rhp.logger.Errorf("unable to resolve %s: %v", host, err) + } + for _, addr := range addrs { + res = append(res, net.JoinHostPort(addr, port)) } - rhp.lastLookup = time.Now() - rhp.resolved = resolved - - rand.Shuffle(len(rhp.resolved), func(i, j int) { rhp.resolved[i], rhp.resolved[j] = rhp.resolved[j], rhp.resolved[i] }) - - return nil + return res, nil } func (rhp *RandomHostProvider) Len() int { - rhp.lock.Lock() - defer rhp.lock.Unlock() - return len(rhp.resolved) + return len(rhp.hostsKeys) } func (rhp *RandomHostProvider) Next() (server string, retryStart bool) { - rhp.lock.Lock() - defer rhp.lock.Unlock() - lastTime := time.Since(rhp.lastLookup) needRetry := false - if lastTime > rhp.lookupTTL { - err := rhp.resolveHosts() - if err != nil { - rhp.logger.Errorf("resolve zk hosts failed: %v", err) - } - } - notTried := []string{} + var ret string - for _, addr := range rhp.resolved { - if _, ok := rhp.tried[addr]; !ok { - notTried = append(notTried, addr) + for len(ret) == 0 { + notTried := []string{} + + for _, host := range rhp.hostsKeys { + if _, ok := rhp.tried[host]; !ok { + notTried = append(notTried, host) + } } - } - var selected string + var selected string + if len(notTried) == 0 { + needRetry = true + for k := range rhp.tried { + delete(rhp.tried, k) + } + selected = rhp.hostsKeys[rand.Intn(len(rhp.hostsKeys))] + } else { + selected = notTried[rand.Intn(len(notTried))] + } + rhp.tried[selected] = struct{}{} + + host, _ := rhp.hosts.Load(selected) + zhost := host.(zkhost) - if len(notTried) == 0 { - needRetry = true - for k := range rhp.tried { - delete(rhp.tried, k) + if len(zhost.resolved) > 0 { + ret = zhost.resolved[rand.Intn(len(zhost.resolved))] } - selected = rhp.resolved[rand.Intn(len(rhp.resolved))] - } else { - selected = notTried[rand.Intn(len(notTried))] } - rhp.tried[selected] = struct{}{} - - return selected, needRetry + return ret, needRetry } func (rhp *RandomHostProvider) Connected() { - rhp.lock.Lock() - defer rhp.lock.Unlock() for k := range rhp.tried { delete(rhp.tried, k) } diff --git a/internal/log/log.go b/internal/log/log.go index 72ff9f0e..7630efd4 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -125,23 +125,23 @@ func (l *Logger) printf(lvl Level, msg string, args ...interface{}) { } func (l *Logger) Debug(msg string) { - l.Debugf(msg) + l.Debugf("%s", msg) } func (l *Logger) Info(msg string) { - l.Infof(msg) + l.Infof("%s", msg) } func (l *Logger) Warn(msg string) { - l.Warnf(msg) + l.Warnf("%s", msg) } func (l *Logger) Error(msg string) { - l.Errorf(msg) + l.Errorf("%s", msg) } func (l *Logger) Fatal(msg string) { - l.Fatalf(msg) + l.Fatalf("%s", msg) } func (l *Logger) Debugf(msg string, args ...interface{}) {