Skip to content

Commit

Permalink
Merge pull request kubernetes#19538 from mesosphere/jdef_mesos_026_co…
Browse files Browse the repository at this point in the history
…mpat

MESOS: compatibility w/ mesos v0.26
  • Loading branch information
mikedanese committed Jan 14, 2016
2 parents 87bce64 + ad1803a commit 1f0b10b
Showing 1 changed file with 75 additions and 27 deletions.
102 changes: 75 additions & 27 deletions pkg/cloudprovider/providers/mesos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mesos
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
Expand All @@ -37,7 +38,7 @@ import (

const defaultClusterName = "mesos"

var noLeadingMasterError = fmt.Errorf("there is no current leading master available to query")
var noLeadingMasterError = errors.New("there is no current leading master available to query")

type mesosClient struct {
masterLock sync.RWMutex
Expand Down Expand Up @@ -136,17 +137,11 @@ func createMesosClient(
client.state.refill = client.pollMasterForState
first := true
if err := md.Detect(detector.OnMasterChanged(func(info *mesos.MasterInfo) {
client.masterLock.Lock()
defer client.masterLock.Unlock()
if info == nil {
client.master = ""
} else if host := info.GetHostname(); host != "" {
client.master = host
} else {
client.master = unpackIPv4(info.GetIp())
}
if len(client.master) > 0 {
client.master = fmt.Sprintf("%s:%d", client.master, info.GetPort())
host, port := extractMasterAddress(info)
if len(host) > 0 {
client.masterLock.Lock()
defer client.masterLock.Unlock()
client.master = fmt.Sprintf("%s:%d", host, port)
if first {
first = false
close(initialMaster)
Expand All @@ -160,6 +155,28 @@ func createMesosClient(
return client, nil
}

func extractMasterAddress(info *mesos.MasterInfo) (host string, port int) {
if info != nil {
host = info.GetAddress().GetHostname()
if host == "" {
host = info.GetAddress().GetIp()
}

if host != "" {
// use port from Address
port = int(info.GetAddress().GetPort())
} else {
// deprecated: get host and port directly from MasterInfo (and not Address)
host = info.GetHostname()
if host == "" {
host = unpackIPv4(info.GetIp())
}
port = int(info.GetPort())
}
}
return
}

func unpackIPv4(ip uint32) string {
octets := make([]byte, 4, 4)
binary.BigEndian.PutUint32(octets, ip)
Expand Down Expand Up @@ -198,29 +215,60 @@ func (c *mesosClient) pollMasterForState(ctx context.Context) (*mesosState, erro

//TODO(jdef) should not assume master uses http (what about https?)

uri := fmt.Sprintf("http://%s/state.json", master)
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
return nil, err
}
var state *mesosState
err = c.httpDo(ctx, req, func(res *http.Response, err error) error {
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return fmt.Errorf("HTTP request failed with code %d: %v", res.StatusCode, res.Status)
}
successHandler := func(res *http.Response) error {
blob, err1 := ioutil.ReadAll(res.Body)
if err1 != nil {
return err1
}
log.V(3).Infof("Got mesos state, content length %v", len(blob))
state, err1 = parseMesosState(blob)
return err1
})
return state, err
}
// thinking here is that we may get some other status codes from mesos at some point:
// - authentication
// - redirection (possibly from http to https)
// ...
for _, tt := range []struct {
uri string
handlers map[int]func(*http.Response) error
}{
{
uri: fmt.Sprintf("http://%s/state", master),
handlers: map[int]func(*http.Response) error{
200: successHandler,
},
},
{
uri: fmt.Sprintf("http://%s/state.json", master),
handlers: map[int]func(*http.Response) error{
200: successHandler,
},
},
} {
req, err := http.NewRequest("GET", tt.uri, nil)
if err != nil {
return nil, err
}
err = c.httpDo(ctx, req, func(res *http.Response, err error) error {
if err != nil {
return err
}
defer res.Body.Close()
if handler, ok := tt.handlers[res.StatusCode]; ok {
err1 := handler(res)
if err1 != nil {
return err1
}
}
// no handler for this error code, proceed to the next connection type
return nil
})
if state != nil || err != nil {
return state, err
}
}
return nil, errors.New("failed to sync with Mesos master")
}

func parseMesosState(blob []byte) (*mesosState, error) {
Expand Down

0 comments on commit 1f0b10b

Please sign in to comment.