Skip to content

Commit

Permalink
Fix proxies validation code
Browse files Browse the repository at this point in the history
- Fixes proxies validation code, minor refactoring and better logging.
  • Loading branch information
spkesan authored Jan 27, 2021
1 parent 733550f commit 2b62b82
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions aerospike-utility/aku-adm.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ func createAerospikeClient(host string, username string, password string) *aeros
func performOperation(op string, host string) {
switch op {
case "post-start":
if securityEnabled != "true" {
zap.S().Info("Security disabled. No post-start operation.")
break
}
client := createAerospikeClient(host, adminUsername, adminPassword)
performPostStartOp(client)
client.Close()
Expand Down Expand Up @@ -130,7 +134,7 @@ func performPreStopOpCommunity(client *aerospike.Client) {
}

if out[command] == "ERROR::unstable-cluster" {
zap.S().Errorf("Unstable cluster. Retrying.")
zap.S().Warn("Unstable cluster. Retrying.")
retry = true
break
}
Expand Down Expand Up @@ -222,7 +226,7 @@ func performPreStopOp(client *aerospike.Client) {
}

if out[command] == "ERROR::unstable-cluster" {
zap.S().Errorf("Unstable cluster. Retrying.")
zap.S().Warn("Unstable cluster. Retrying.")
retry = true
break
}
Expand Down Expand Up @@ -281,7 +285,7 @@ func performPreStopOp(client *aerospike.Client) {
metrics := parseStats(stats[fmt.Sprintf("namespace/%s", selectedNamespace)], ";")

if metrics["pending_quiesce"] != "true" {
zap.S().Errorf("Waiting for pending_quiesce to be true. Retrying.")
zap.S().Warn("Waiting for pending_quiesce to be true. Retrying.")
time.Sleep(1 * time.Second)
continue
}
Expand Down Expand Up @@ -374,7 +378,7 @@ func performPreStopOp(client *aerospike.Client) {
}

if pendingQuiesceTrueInAllNodes {
zap.S().Error("pending_quiesce is true on all nodes. Already in pre-stop, stopping pod.")
zap.S().Error("pending_quiesce is true on all nodes. already in pre-stop, stopping pod.")
// Quiesce was probably ignored. Abort.
cmd := "cluster-stable:ignore-migrations=no;"
for {
Expand All @@ -390,7 +394,7 @@ func performPreStopOp(client *aerospike.Client) {
}

if res[cmd] == "ERROR::unstable-cluster" {
zap.S().Errorf("Unstable cluster. Retrying.")
zap.S().Warn("Unstable cluster. Retrying.")
shouldRetry = true
break
}
Expand Down Expand Up @@ -445,7 +449,7 @@ func performPreStopOp(client *aerospike.Client) {
metrics := parseStats(stats[fmt.Sprintf("namespace/%s", selectedNamespace)], ";")

if metrics["effective_is_quiesced"] != "true" {
zap.S().Errorf("Waiting for effective_is_quiesced to be true. Retrying.")
zap.S().Warn("Waiting for effective_is_quiesced to be true. Retrying.")
retry = true
break
}
Expand Down Expand Up @@ -514,6 +518,7 @@ func performPreStopOp(client *aerospike.Client) {
zap.S().Debugf("Total Ops/sec: %f.", opsPerSec)

if opsPerSec != 0.0 {
zap.S().Warn("Current throughput non zero. Retrying.")
retry = true
break
}
Expand All @@ -527,7 +532,7 @@ func performPreStopOp(client *aerospike.Client) {
continue
}

zap.S().Info("No throughput observed on this node.")
zap.S().Info("No active transactions observed on this node.")
break
}

Expand All @@ -548,9 +553,10 @@ func performPreStopOp(client *aerospike.Client) {
break
}

if len(out["namespaces"]) > 0 {
namespacesList := strings.Split(out["namespaces"], ";")
if len(namespacesList) > 0 {
total := 0
for _, ns := range out["namespaces"] {
for _, ns := range namespacesList {
stats, err := node.RequestInfo(infoPolicy, fmt.Sprintf("namespace/%s", ns))
if err != nil {
zap.S().Errorf("Error while fetching namespace statistics for %s: %v. Retrying.", ns, err)
Expand All @@ -577,14 +583,15 @@ func performPreStopOp(client *aerospike.Client) {

i, _ = strconv.Atoi(metrics["batch_sub_proxy_timeout"])
total += i
}

if totalProxies != total {
zap.S().Debugf("Previous total proxies: %d, Current total proxies: %d.", totalProxies, total)
zap.S().Error("Proxies diff non zero. Retrying.")
totalProxies = total
retry = true
break
}
zap.S().Debugf("Previous total proxies: %d, Current total proxies: %d.", totalProxies, total)

if totalProxies != total {
zap.S().Warn("Proxies diff non zero. Retrying.")
totalProxies = total
retry = true
break
}
}

Expand All @@ -593,11 +600,11 @@ func performPreStopOp(client *aerospike.Client) {
}

if retry {
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
continue
}

zap.S().Info("No proxies observed on this node.")
zap.S().Info("No ongoing proxies observed on this node.")
break
}

Expand Down Expand Up @@ -634,11 +641,6 @@ func getLatencyCommand(ver string) (string, error) {
// creates a sys-admin user which can be used for pre stop operation.
// (only when security is enabled)
func performPostStartOp(client *aerospike.Client) {
if securityEnabled == "false" {
zap.S().Info("Security disabled. No post-start operation.")
return
}

zap.S().Info("Starting post-start operation.")

adminPolicy := aerospike.NewAdminPolicy()
Expand Down

0 comments on commit 2b62b82

Please sign in to comment.