Skip to content

Commit

Permalink
Resolve zk hosts in background
Browse files (browse the repository at this point in the history)
  • Loading branch information
secwall committed Aug 23, 2024
1 parent 3f4b680 commit a54e3c1
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 121 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ format:
goimports -w `find . -name '*.go'`

lint:
docker run --rm -v ${CURDIR}:/app -w /app golangci/golangci-lint:v1.58 golangci-lint run -v
docker run --rm -v ${CURDIR}:/app -w /app golangci/golangci-lint:v1.60 golangci-lint run -v

unittests:
go test ./cmd/... ./internal/...
Expand Down
12 changes: 6 additions & 6 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (app *App) baseContext() context.Context {
func (app *App) connectDCS() error {
var err error
// TODO: support other DCS systems
app.dcs, err = dcs.NewZookeeper(&app.config.Zookeeper, app.logger)
app.dcs, err = dcs.NewZookeeper(app.baseContext(), &app.config.Zookeeper, app.logger)
if err != nil {
return fmt.Errorf("failed to connect to zkDCS: %s", err.Error())
}
Expand Down Expand Up @@ -616,7 +616,7 @@ func (app *App) stateManager() appState {
// activeNodes are master + alive running replicas
activeNodes, err := app.GetActiveNodes()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return stateManager
}
app.logger.Infof("active: %v", activeNodes)
Expand Down Expand Up @@ -681,7 +681,7 @@ func (app *App) stateManager() appState {
}
return stateManager
} else if err != dcs.ErrNotFound {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return stateManager
}

Expand Down Expand Up @@ -2303,7 +2303,7 @@ func (app *App) Run() int {

err := app.lockFile()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.unlockFile()
Expand All @@ -2313,14 +2313,14 @@ func (app *App) Run() int {

err = app.connectDCS()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.dcs.Close()

err = app.newDBCluster()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.cluster.Close()
Expand Down
56 changes: 28 additions & 28 deletions internal/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@ import (
func (app *App) CliInfo(short bool) int {
err := app.connectDCS()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
app.dcs.Initialize()
defer app.dcs.Close()

err = app.newDBCluster()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.cluster.Close()
if err := app.cluster.UpdateHostsInfo(); err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}

Expand All @@ -57,7 +57,7 @@ func (app *App) CliInfo(short bool) int {

activeNodes, err := app.GetActiveNodes()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
sort.Strings(activeNodes)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (app *App) CliInfo(short bool) int {
} else {
tree, err = app.dcs.GetTree("")
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
}
Expand All @@ -140,20 +140,20 @@ func (app *App) CliInfo(short bool) int {
func (app *App) CliState(short bool) int {
err := app.connectDCS()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.dcs.Close()
app.dcs.Initialize()
err = app.newDBCluster()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.cluster.Close()

if err := app.cluster.UpdateHostsInfo(); err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}

Expand Down Expand Up @@ -191,20 +191,20 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
}
err := app.connectDCS()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.dcs.Close()
app.dcs.Initialize()
err = app.newDBCluster()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.cluster.Close()

if err := app.cluster.UpdateHostsInfo(); err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}

Expand All @@ -223,7 +223,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
}
activeNodes, err := app.GetActiveNodes()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}

Expand Down Expand Up @@ -277,12 +277,12 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
// to avoid switching from one to another, use switch to behavior
positions, err := app.getNodePositions(candidates)
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
toHost, err = getMostDesirableNode(app.logger, positions, app.switchHelper.GetPriorityChoiceMaxLag())
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
}
Expand All @@ -295,7 +295,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
return 2
}
if err != dcs.ErrNotFound {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 2
}

Expand All @@ -311,7 +311,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
return 2
}
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
// wait for switchover to complete
Expand Down Expand Up @@ -353,7 +353,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int {
ctx := app.baseContext()
err := app.connectDCS()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.dcs.Close()
Expand All @@ -365,7 +365,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int {
}
err = app.dcs.Create(pathMaintenance, maintenance)
if err != nil && err != dcs.ErrExists {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
// wait for mysync to pause
Expand All @@ -379,7 +379,7 @@ func (app *App) CliEnableMaintenance(waitTimeout time.Duration) int {
case <-ticker.C:
err = app.dcs.Get(pathMaintenance, maintenance)
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
}
if maintenance.MySyncPaused {
break Out
Expand All @@ -404,7 +404,7 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int {
ctx := app.baseContext()
err := app.connectDCS()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.dcs.Close()
Expand Down Expand Up @@ -439,7 +439,7 @@ func (app *App) CliDisableMaintenance(waitTimeout time.Duration) int {
break Out
}
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
}
case <-waitCtx.Done():
break Out
Expand Down Expand Up @@ -495,7 +495,7 @@ func (app *App) CliAbort() int {
return 0
}
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}

Expand All @@ -514,7 +514,7 @@ func (app *App) CliAbort() int {

err = app.dcs.Delete(pathCurrentSwitch)
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}

Expand All @@ -526,15 +526,15 @@ func (app *App) CliAbort() int {
func (app *App) CliHostList() int {
err := app.connectDCS()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
app.dcs.Initialize()
defer app.dcs.Close()

err = app.newDBCluster()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.cluster.Close()
Expand Down Expand Up @@ -584,7 +584,7 @@ func (app *App) CliHostAdd(host string, streamFrom *string, priority *int64, dry

err = app.newDBCluster()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.cluster.Close()
Expand All @@ -601,7 +601,7 @@ func (app *App) CliHostAdd(host string, streamFrom *string, priority *int64, dry

err = app.cluster.UpdateHostsInfo()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}

Expand Down Expand Up @@ -667,7 +667,7 @@ func (app *App) CliHostRemove(host string) int {

err = app.newDBCluster()
if err != nil {
app.logger.Errorf(err.Error())
app.logger.Error(err.Error())
return 1
}
defer app.cluster.Close()
Expand Down
10 changes: 6 additions & 4 deletions internal/dcs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type ZookeeperConfig struct {
}

type RandomHostProviderConfig struct {
LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"`
LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"`
LookupTimeout time.Duration `config:"lookup_timeout" yaml:"lookup_timeout"`
LookupTTL time.Duration `config:"lookup_ttl" yaml:"lookup_ttl"`
LookupTickInterval time.Duration `config:"lookup_tick_interval" yaml:"lookup_tick_interval"`
}

func DefaultRandomHostProviderConfig() RandomHostProviderConfig {
return RandomHostProviderConfig{
LookupTimeout: 3 * time.Second,
LookupTTL: 300 * time.Second,
LookupTimeout: 3 * time.Second,
LookupTTL: 300 * time.Second,
LookupTickInterval: 60 * time.Second,
}
}

Expand Down
5 changes: 3 additions & 2 deletions internal/dcs/zk.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dcs

import (
"context"
"encoding/json"
"fmt"
"net"
Expand Down Expand Up @@ -52,7 +53,7 @@ func retry(config *ZookeeperConfig, operation func() error) error {
}

// NewZookeeper returns Zookeeper based DCS storage
func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) {
func NewZookeeper(ctx context.Context, config *ZookeeperConfig, logger *log.Logger) (DCS, error) {
if len(config.Hosts) == 0 {
return nil, fmt.Errorf("zookeeper not configured, fill zookeeper/hosts in config")
}
Expand All @@ -70,7 +71,7 @@ func NewZookeeper(config *ZookeeperConfig, logger *log.Logger) (DCS, error) {
var ec <-chan zk.Event
var err error
var operation func() error
hostProvider := NewRandomHostProvider(&config.RandomHostProvider, logger)
hostProvider := NewRandomHostProvider(ctx, &config.RandomHostProvider, logger)
if config.UseSSL {
if config.CACert == "" || config.KeyFile == "" || config.CertFile == "" {
return nil, fmt.Errorf("zookeeper ssl not configured, fill ca_cert/key_file/cert_file in config or disable use_ssl flag")
Expand Down
Loading

0 comments on commit a54e3c1

Please sign in to comment.