Skip to content

Commit

Permalink
test for leader uptime
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-deboer committed Mar 22, 2017
1 parent bbd3aaa commit 25f0784
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 8 deletions.
14 changes: 14 additions & 0 deletions pkg/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"time"

"bytes"

Expand Down Expand Up @@ -104,6 +105,12 @@ func main() {
looking for existing members`,
EnvVar: "ETCDCD_IGNORE_NAMING_MISMATCH",
},
cli.StringFlag{
Name: "minimum-uptime-seconds-to-join",
Value: "5m",
Usage: `The minimum amount of seconds after which it is viable to join an existing cluster in which
the current node is an expected member`,
},
}
app.Action = func(c *cli.Context) {

Expand Down Expand Up @@ -140,6 +147,11 @@ func parseArgs(c *cli.Context) *discovery.Discovery {
if len(masterFilter) == 0 {
log.Fatalf("'%s' is required", "master-names")
}
minUptimeString := c.String("minimum-uptime-to-join")
minJoinUptime, err := time.ParseDuration(c.String("minimum-uptime-to-join"))
if err != nil {
log.Fatalf("Invalid duration '%s': %v", minUptimeString, err)
}

return &discovery.Discovery{
Platform: platform,
Expand All @@ -152,5 +164,7 @@ func parseArgs(c *cli.Context) *discovery.Discovery {
IgnoreNamingMismatch: c.Bool("ignore-naming-mismatch"),
MasterFilter: masterFilter,
MaxTries: 5,
MinimumUptimeToJoin: minJoinUptime,
}

}
67 changes: 59 additions & 8 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"sort"
"strings"
"time"

"net/http"

log "github.com/Sirupsen/logrus"
etcd "github.com/coreos/etcd/client"
"github.com/matt-deboer/etcdcd/pkg/platform"
Expand All @@ -28,6 +31,7 @@ type Discovery struct {
MasterFilter string
DryRun bool
IgnoreNamingMismatch bool
MinimumUptimeToJoin time.Duration
}

func findMemberByName(members []etcd.Member, name string) *etcd.Member {
Expand Down Expand Up @@ -94,7 +98,7 @@ func (d *Discovery) DiscoverEnvironment() (map[string]string, error) {
sort.Slice(expectedMembers, func(i, j int) bool { return expectedMembers[i].Name < expectedMembers[j].Name })

localMaster := findMemberByName(expectedMembers, p.LocalInstanceName())
membersAPI, currentMembers, err := d.resolveMembersAndAPI(expectedMembers, localMaster)
membersAPI, currentMembers, uptime, err := d.resolveMembersAndAPI(expectedMembers, localMaster)

environment := map[string]string{}
environment["ETCD_NAME"] = p.LocalInstanceName()
Expand All @@ -105,7 +109,7 @@ func (d *Discovery) DiscoverEnvironment() (map[string]string, error) {
log.Debugf("Local master: %#v", *localMaster)
}
// this instance is an expected master
if len(currentMembers) > 0 && !containsMember(currentMembers, *localMaster) {
if len(currentMembers) > 0 && !containsMember(currentMembers, *localMaster) && uptime >= d.MinimumUptimeToJoin {
// there is an existing cluster
if err = d.assertSaneClusterState(expectedMembers, currentMembers); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -230,7 +234,7 @@ func (d *Discovery) joinExistingCluster(membersAPI etcd.MembersAPI,
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Retryable error attempting to add local master %#v: %v", localMember, err)
}
membersAPI, _, err = d.resolveMembersAndAPI(expectedMembers, localMember)
membersAPI, _, _, err = d.resolveMembersAndAPI(expectedMembers, localMember)
if err != nil {
log.Errorf("%s ERROR: %v", msg, err)
return err
Expand All @@ -240,11 +244,10 @@ func (d *Discovery) joinExistingCluster(membersAPI etcd.MembersAPI,
return nil
}

func (d *Discovery) resolveMembersAndAPI(expectedMembers []etcd.Member, localMember *etcd.Member) (etcd.MembersAPI, []etcd.Member, error) {
func (d *Discovery) resolveMembersAndAPI(expectedMembers []etcd.Member,
localMember *etcd.Member) (membersAPI etcd.MembersAPI, currentMembers []etcd.Member, uptime time.Duration, err error) {

ctx := context.Background()
var currentMembers []etcd.Member
var membersAPI etcd.MembersAPI
var lastErr error
for tries := 0; tries <= d.MaxTries; tries++ {
for _, member := range expectedMembers {
Expand All @@ -267,6 +270,20 @@ func (d *Discovery) resolveMembersAndAPI(expectedMembers []etcd.Member, localMem
}

membersAPI = etcd.NewMembersAPI(etcdClient)
leader, err := membersAPI.Leader(ctx)
if err != nil {
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Error getting leader %s %v, %v", member.Name, member.ClientURLs, err)
}
lastErr = err
continue
} else if leader == nil {
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Error getting leader %s %v, %v", member.Name, member.ClientURLs, err)
}
lastErr = errors.New("Failed to resolve cluster leader")
}

currentMembers, err = membersAPI.List(ctx)
if err != nil {
if log.GetLevel() >= log.DebugLevel {
Expand All @@ -275,6 +292,16 @@ func (d *Discovery) resolveMembersAndAPI(expectedMembers []etcd.Member, localMem
lastErr = err
continue
}

uptime, err = getUptime(etcdClient.Endpoints()[0])
if err != nil {
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Error listing leader uptime %s %v, %v", member.Name, member.ClientURLs, err)
}
lastErr = err
continue
}

// sanity-check the returned members; it may be partial in case of a yet-forming cluster
hasInvalidMembers := false
for _, m := range currentMembers {
Expand All @@ -290,7 +317,7 @@ func (d *Discovery) resolveMembersAndAPI(expectedMembers []etcd.Member, localMem
if log.GetLevel() >= log.DebugLevel {
log.Debugf("Actual cluster members: %#v", currentMembers)
}
return membersAPI, currentMembers, nil
return membersAPI, currentMembers, uptime, nil
}
}
}
Expand All @@ -305,5 +332,29 @@ func (d *Discovery) resolveMembersAndAPI(expectedMembers []etcd.Member, localMem
break
}
}
return nil, nil, lastErr
return nil, nil, time.Millisecond, lastErr
}

func getUptime(endpoint string) (time.Duration, error) {
resp, err := http.DefaultClient.Get(endpoint + "/v2/stats/self")
if err != nil {
return time.Millisecond, err
}

defer resp.Body.Close()
contents, err := ioutil.ReadAll(resp.Body)
if err != nil {
return time.Millisecond, err
}
stats := make(map[string]interface{})
err = json.Unmarshal(contents, &stats)
if err != nil {
return time.Millisecond, err
}
if leaderInfo, ok := stats["leaderInfo"]; ok {
if uptimeString, ok := leaderInfo.(map[string]interface{})["uptime"]; ok {
return time.ParseDuration(uptimeString.(string))
}
}
return time.Millisecond, fmt.Errorf("Missing leader uptime info for endpiont %s", endpoint)
}

0 comments on commit 25f0784

Please sign in to comment.