Commit 474ac70

Added Health check for container

supermeng committed Apr 25, 2017
1 parent 2d39c40 commit 474ac70
Showing 16 changed files with 505 additions and 20 deletions.
43 changes: 43 additions & 0 deletions apiserver/ports.go
@@ -0,0 +1,43 @@
package apiserver

import (
"fmt"
"net/http"

"github.com/laincloud/deployd/engine"
"github.com/mijia/sweb/form"
"github.com/mijia/sweb/log"
"github.com/mijia/sweb/server"
"golang.org/x/net/context"
)

type RestfulPorts struct {
server.BaseResource
}

type Ports struct {
Ports []int
}

func (rn RestfulPorts) Get(ctx context.Context, r *http.Request) (int, interface{}) {
return http.StatusOK, engine.FetchAllPortsInfo()
}

func (rn RestfulPorts) Post(ctx context.Context, r *http.Request) (int, interface{}) {
options := []string{"validate"}
cmd := form.ParamStringOptions(r, "cmd", options, "noop")
switch cmd {
case "validate":
var ports Ports
if err := form.ParamBodyJson(r, &ports); err != nil {
log.Warnf("Failed to decode valiad ports, %s", err)
return http.StatusBadRequest, fmt.Sprintf("Invalid ports params format: %s", err)
}
occs := engine.OccupiedPorts(ports.Ports...)
if len(occs) == 0 {
return http.StatusOK, nil
}
return http.StatusBadRequest, fmt.Sprintf("Conflicted ports: %v", occs)
}
return http.StatusBadRequest, fmt.Sprintf("Unknown request!")
}
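
For reference, a minimal sketch of a client calling the new endpoint (the deployd address here is an assumption, and cmd is passed as a query parameter; the JSON body matches the Ports struct above):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Ports we want deployd to validate; the address 127.0.0.1:9003 is an assumption.
	payload, _ := json.Marshal(struct{ Ports []int }{Ports: []int{80, 443, 9000}})
	resp, err := http.Post("http://127.0.0.1:9003/api/ports?cmd=validate",
		"application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// 200 means no conflicts; 400 carries a decode error or a "Conflicting ports: ..." message.
	fmt.Println("status:", resp.StatusCode)
}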
1 change: 1 addition & 0 deletions apiserver/server.go
@@ -57,6 +57,7 @@ func (s *Server) ListenAndServe(addr string) error {
s.AddRestfulResource("/api/status", "RestfulStatus", RestfulStatus{})
s.AddRestfulResource("/api/constraints", "RestfulConstraints", RestfulConstraints{})
s.AddRestfulResource("/api/notifies", "RestfulNotifies", RestfulNotifies{})
s.AddRestfulResource("/api/ports", "RestfulPorts", RestfulPorts{})

s.Get("/debug/vars", "RuntimeStat", s.getRuntimeStat)
s.NotFound(func(ctx context.Context, w http.ResponseWriter, r *http.Request) context.Context {
30 changes: 30 additions & 0 deletions engine/engine.go
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/laincloud/deployd/cluster"
"github.com/laincloud/deployd/storage"
"github.com/laincloud/deployd/utils/util"
"github.com/mijia/adoc"
"github.com/mijia/sweb/log"
)
@@ -335,6 +336,7 @@ func (engine *OrcEngine) initPodGroupCtrl(spec PodGroupSpec, states []PodPrevSta
// This will be running inside the go routine
func (engine *OrcEngine) initOperationWorker() {
tick := time.Tick(time.Duration(RefreshInterval) * time.Second)
portsTick := time.Tick(5 * time.Minute)
for {
select {
case op := <-engine.opsChan:
@@ -370,6 +372,8 @@
}
}
engine.RUnlock()
case <-portsTick:
RefreshPorts(engine.pgCtrls)
case <-engine.stop:
if len(engine.opsChan) == 0 {
return
@@ -504,6 +508,32 @@ func (engine *OrcEngine) startClusterMonitor() {
}
engine.onClusterNodeLost(event.Node.Name, downCount)
}
} else if strings.HasPrefix(event.Status, "health_status") {
id := event.Id
if cont, err := engine.cluster.InspectContainer(id); err == nil {
status := HealthState(HealthStateNone)
switch event.Status {
case "health_status: starting":
status = HealthStateStarting
case "health_status: healthy":
status = HealthStateHealthy
case "health_status: unhealthy":
status = HealthStateUnHealthy
}
containerName := strings.TrimLeft(cont.Name, "/")
if podName, instance, err := util.ParseNameInstanceNo(containerName); err == nil {
if pgCtrl, ok := engine.pgCtrls[podName]; ok && len(pgCtrl.podCtrls) >= instance {
pgCtrl.podCtrls[instance-1].pod.Healthst = status
pgCtrl.opsChan <- pgOperSnapshotGroup{true}
pgCtrl.opsChan <- pgOperSaveStore{true}
}
} else {
log.Errorf("ParseNameInstanceNo error: %v", err)
}
} else {
log.Errorf("InspectContainer error: %v", err)
}
}
}
})
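The "health_status: ..." strings above are what Docker attaches to container health events. Pulled out as a standalone sketch, the mapping this handler applies is:

// healthStateFromEvent maps a Docker health event status string to the
// engine's HealthState; anything unrecognized falls back to HealthStateNone.
func healthStateFromEvent(status string) HealthState {
	switch status {
	case "health_status: starting":
		return HealthStateStarting
	case "health_status: healthy":
		return HealthStateHealthy
	case "health_status: unhealthy":
		return HealthStateUnHealthy
	}
	return HealthStateNone
}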
49 changes: 49 additions & 0 deletions engine/pod.go
@@ -17,6 +17,12 @@ import (
// if they are restarted before (now - RestartInfoClearInterval), clear the restart info
var RestartInfoClearInterval time.Duration

const (
DefaultHealthInterval = 10
DefaultHealthTimeout = 5
DefaultHealthRetries = 3
)

// podController is controlled by the podGroupController
type podController struct {
spec PodSpec
@@ -313,6 +319,18 @@ func (pc *podController) refreshContainer(kluster cluster.Cluster, index int) {
pc.pod.LastError = state.Error
}
}
if health := state.Health; health != nil {
switch health.Status {
case HealthState(HealthStateStarting).String():
pc.pod.Healthst = HealthStateStarting
case HealthState(HealthStateHealthy).String():
pc.pod.Healthst = HealthStateHealthy
default:
pc.pod.Healthst = HealthStateUnHealthy
}
} else {
pc.pod.Healthst = HealthStateNone
}
}
}

@@ -384,6 +402,37 @@ func (pc *podController) createContainerConfig(filters []string, index int) adoc
Entrypoint: spec.Entrypoint,
Labels: labelsMap,
}
if podSpec.HealthConfig.Cmd != "" {
if podSpec.HealthConfig.Cmd == "none" {
cc.Healthcheck = &adoc.HealthConfig{
Test: []string{"NONE"},
}
} else {
interval := DefaultHealthInterval
timeout := DefaultHealthTimeout
retries := DefaultHealthRetries

if podSpec.HealthConfig.Options.Interval > 0 {
interval = podSpec.HealthConfig.Options.Interval
}

if podSpec.HealthConfig.Options.Timeout > 0 {
timeout = podSpec.HealthConfig.Options.Timeout
}

if podSpec.HealthConfig.Options.Retries > 0 {
retries = podSpec.HealthConfig.Options.Retries
}

cc.Healthcheck = &adoc.HealthConfig{
Test: []string{"CMD-SHELL", "timeout 10s " + pc.spec.HealthConfig.Cmd + " || exit 1"},
Interval: time.Duration(interval) * time.Second,
Timeout: time.Duration(timeout) * time.Second,
Retries: retries,
}
}
}

if spec.Expose > 0 {
cc.ExposedPorts = map[string]struct{}{
fmt.Sprintf("%d/tcp", spec.Expose): struct{}{},
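To make the healthcheck wiring concrete: for a hypothetical spec with Cmd set to "curl -f http://localhost/health" and only Options.Interval overridden to 30, the block above produces a container config equivalent to the following (the other values come from the defaults declared earlier; the 10-second shell timeout wrapper is hardcoded as shown):

cc.Healthcheck = &adoc.HealthConfig{
	Test:     []string{"CMD-SHELL", "timeout 10s curl -f http://localhost/health || exit 1"},
	Interval: 30 * time.Second, // from Options.Interval
	Timeout:  5 * time.Second,  // DefaultHealthTimeout
	Retries:  3,                // DefaultHealthRetries
}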
31 changes: 25 additions & 6 deletions engine/podgroup.go
@@ -42,8 +42,6 @@ func (pgCtrl *podGroupController) Inspect() PodGroupWithSpec {
pgCtrl.RLock()
defer pgCtrl.RUnlock()
p := PodGroupWithSpec{pgCtrl.spec, pgCtrl.prevState, pgCtrl.group}
log.Infof("ppp:%v\n", p)
log.Warnf("p.LastError:%v\n", p.LastError)
return p
}

@@ -147,7 +145,6 @@ func (pgCtrl *podGroupController) RescheduleSpec(podSpec PodSpec) {

pgCtrl.group.LastError = ""
if ok := pgCtrl.updatePodPorts(podSpec); !ok {
log.Infof("updatePodPorts failed:%v", pgCtrl.group.LastError)
return
}
oldPodSpec := spec.Pod.Clone()
@@ -161,9 +158,8 @@ func (pgCtrl *podGroupController) RescheduleSpec(podSpec PodSpec) {
pgCtrl.opsChan <- pgOperSaveStore{true}
pgCtrl.opsChan <- pgOperSnapshotEagleView{spec.Name}
for i := 0; i < spec.NumInstances; i += 1 {
pgCtrl.waitLastPodStarted(i, podSpec)
pgCtrl.opsChan <- pgOperUpgradeInstance{i + 1, spec.Version, oldPodSpec, spec.Pod}
// wait a few seconds for the new instance to finish initializing before we update the next one
time.Sleep(time.Second * time.Duration(podSpec.GetSetupTime()))
}
pgCtrl.opsChan <- pgOperSnapshotGroup{true}
pgCtrl.opsChan <- pgOperSnapshotPrevState{}
@@ -181,6 +177,7 @@ func (pgCtrl *podGroupController) RescheduleDrift(fromNode, toNode string, insta
pgCtrl.opsChan <- pgOperLogOperation{fmt.Sprintf("Start to reschedule drift from %s", fromNode)}
if instanceNo == -1 {
for i := 0; i < spec.NumInstances; i += 1 {
pgCtrl.waitLastPodStarted(i, spec.Pod)
pgCtrl.opsChan <- pgOperDriftInstance{i + 1, fromNode, toNode, force}
}
} else {
@@ -370,13 +367,35 @@ func (pgCtrl *podGroupController) updatePodPorts(podSpec PodSpec) bool {
} else {
pgCtrl.group.State = RunStateFail
pgCtrl.group.LastError = fmt.Sprintf("Cannot start podgroup %v: ports %v are already in use", pgCtrl.spec.Name, existsPorts)
log.Warn("check ports failed")
return false
}
}
return true
}

func (pgCtrl *podGroupController) waitLastPodStarted(i int, podSpec PodSpec) {
retries := 5
if i == 0 {
return
}
// give the previous instance time to finish initializing before we update the next one
if pgCtrl.podCtrls[i-1].pod.Healthst == HealthStateNone {
// no health check configured, fall back to the fixed setup delay
time.Sleep(time.Second * time.Duration(podSpec.GetSetupTime()))
return
}
// poll until the previous pod leaves the starting state, at most `retries` times
for retryTimes := 0; retryTimes < retries; retryTimes++ {
if pgCtrl.podCtrls[i-1].pod.Healthst != HealthStateStarting {
break
}
time.Sleep(time.Second * 10)
}
}

func newPodGroupController(spec PodGroupSpec, states []PodPrevState, pg PodGroup) *podGroupController {
podCtrls := make([]*podController, spec.NumInstances)
for i := range podCtrls {
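A note on the timing waitLastPodStarted implies, using the constants in the code: when the previous instance has a health check, a rolling update waits for it to leave the starting state for at most 5 retries of 10 seconds each, i.e. 50 seconds, before moving on; when it has no health check, the wait is the pod's fixed setup time (podSpec.GetSetupTime() seconds).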
5 changes: 4 additions & 1 deletion engine/podgroup_ops.go
@@ -332,6 +332,7 @@ func (op pgOperSnapshotGroup) Do(pgCtrl *podGroupController, c cluster.Cluster,
}()

group.State = RunStateSuccess
group.Healthst = HealthStateHealthy
group.LastError = ""
group.Pods = make([]Pod, spec.NumInstances)
for i, podCtrl := range pgCtrl.podCtrls {
@@ -340,6 +341,9 @@
group.State = podCtrl.pod.State
group.LastError = podCtrl.pod.LastError
}
if podCtrl.pod.Healthst != HealthStateHealthy {
group.Healthst = podCtrl.pod.Healthst
}
}
if op.updateTime {
group.UpdatedAt = time.Now()
@@ -461,7 +465,6 @@ type pgOperLogOperation struct {
func (op pgOperLogOperation) Do(pgCtrl *podGroupController, c cluster.Cluster, store storage.Store, ev *RuntimeEagleView) bool {
pgCtrl.RLock()
defer pgCtrl.RUnlock()
log.Infof("%s %s", pgCtrl, op.msg)
return false
}

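The snapshot change above makes group health an aggregate of pod health. In isolation the rule looks like this (a sketch using the Pod fields from this diff): the group is healthy only if every pod is, and otherwise it reports the state of the last non-healthy pod.

func aggregateHealth(pods []Pod) HealthState {
	st := HealthState(HealthStateHealthy)
	for _, p := range pods {
		if p.Healthst != HealthStateHealthy {
			st = p.Healthst // the last non-healthy pod wins
		}
	}
	return st
}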
