
Commit

Finished delete node gracefully
supermeng committed Dec 28, 2017
1 parent c1f82c6 commit f8b69e8
Showing 6 changed files with 100 additions and 19 deletions.
14 changes: 14 additions & 0 deletions apiserver/node.go
@@ -52,3 +52,17 @@ func (rn RestfulNodes) Patch(ctx context.Context, r *http.Request) (int, interfa
return http.StatusBadRequest, fmt.Sprintf("Unknown command %s", cmd)
}
}

func (rn RestfulNodes) Delete(ctx context.Context, r *http.Request) (int, interface{}) {
node := form.ParamString(r, "node", "")

if node == "" {
return http.StatusBadRequest, "from node name required"
}
engine := getEngine(ctx)
engine.RemoveNode(node)
return http.StatusAccepted, map[string]interface{}{
"message": "containers in node will be drift",
"node": node,
}
}
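
The new Delete handler reads the node name from the request and asks the engine to drain that node. Below is a hedged sketch of how a client might call it; the deployd address, the route under which RestfulNodes is mounted, and the query-parameter style are assumptions (route registration is not part of this diff), and "worker-1" is a hypothetical node name.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Assumed endpoint; the actual mount point of RestfulNodes is not shown in this commit.
	endpoint := "http://deployd.example:9003/api/nodes"
	q := url.Values{"node": {"worker-1"}} // hypothetical node name

	req, err := http.NewRequest(http.MethodDelete, endpoint+"?"+q.Encode(), nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// On success the handler returns 202 Accepted with
	// {"message": "containers on node will be drifted", "node": "worker-1"}.
	fmt.Println(resp.StatusCode, string(body))
}
```
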
31 changes: 19 additions & 12 deletions engine/eagleview.go
@@ -47,8 +47,11 @@ func (ev *RuntimeEagleView) Refresh(c cluster.Cluster) error {
}()

labelFilter := []string{"com.docker.swarm.id"}
filters := map[string][]string{
"label": labelFilter,
}
podGroups := make(map[string][]RuntimeEaglePod)
err := ev.refreshCallback(c, labelFilter, func(pod RuntimeEaglePod) {
err := ev.refreshCallback(c, filters, func(pod RuntimeEaglePod) {
name := pod.Name
podGroups[name] = append(podGroups[name], pod)
totalContainers += 1
@@ -70,10 +73,10 @@ func (ev *RuntimeEagleView) RefreshPodGroup(c cluster.Cluster, pgName string) ([
pgName, totalContainers, time.Now().Sub(start))
}()

filters := []string{
labelFilters := []string{
fmt.Sprintf("%s.pg_name=%s", kLainLabelPrefix, pgName),
}
if pods, err := ev.refreshByFilters(c, filters); err == nil {
if pods, err := ev.filterByLabels(c, labelFilters); err == nil {
ev.Lock()
ev.podGroups[pgName] = pods
ev.Unlock()
@@ -94,27 +97,31 @@ func (ev *RuntimeEagleView) RefreshPodsByNamespace(c cluster.Cluster, namespace
namespace, totalContainers, time.Now().Sub(start))
}()

filters := []string{
labelFilters := []string{
fmt.Sprintf("%s.pg_namespace=%s", kLainLabelPrefix, namespace),
"com.docker.swarm.id",
}
pods, err := ev.refreshByFilters(c, filters)
pods, err := ev.filterByLabels(c, labelFilters)
totalContainers = len(pods)
return pods, err
}

func (ev *RuntimeEagleView) refreshByFilters(c cluster.Cluster, labelFilters []string) ([]RuntimeEaglePod, error) {
labelFilters = append(labelFilters, "com.docker.swarm.id")
func (ev *RuntimeEagleView) filterByLabels(c cluster.Cluster, labelFilters []string) ([]RuntimeEaglePod, error) {
filters := map[string][]string{
"label": labelFilters,
}
return ev.refreshByFilters(c, filters)
}

func (ev *RuntimeEagleView) refreshByFilters(c cluster.Cluster, filters map[string][]string) ([]RuntimeEaglePod, error) {
pods := make([]RuntimeEaglePod, 0, 10)
err := ev.refreshCallback(c, labelFilters, func(pod RuntimeEaglePod) {
err := ev.refreshCallback(c, filters, func(pod RuntimeEaglePod) {
pods = append(pods, pod)
})
return pods, err
}

func (ev *RuntimeEagleView) refreshCallback(c cluster.Cluster, labelFilter []string, callback func(RuntimeEaglePod)) error {
filters := map[string][]string{
"label": labelFilter,
}
func (ev *RuntimeEagleView) refreshCallback(c cluster.Cluster, filters map[string][]string, callback func(RuntimeEaglePod)) error {
filterJson, err := json.Marshal(filters)
if err != nil {
log.Warnf("<RuntimeEagleView> Failed to encode the filter json, %s", err)
58 changes: 58 additions & 0 deletions engine/node.go
@@ -0,0 +1,58 @@
package engine

import (
"time"

"github.com/laincloud/deployd/cluster"
"github.com/mijia/sweb/log"
)

// Removing a node should follow the steps below:
// 1. put the target node into maintenance with a constraint
// 2. fetch all containers on the target node
// 3. drift all containers off the target node asynchronously
//    (this can leave the cluster temporarily corrupted when an instance is
//    scheduled (generally on shrink) and drifted concurrently, but the eagle
//    view will correct it)
// 4. stop all lain process services on the node (generally via lainctl)
// 5. remove the maintenance constraint (generally via lainctl, or as part of the add-node phase)
func (engine *OrcEngine) RemoveNode(node string) error {
// step 1
constraint := ConstraintSpec{"node", false, node, true}
cstController.SetConstraint(constraint, engine.store)
// step 2
pods, err := engine.eagleView.refreshPodsByNode(engine.cluster, []string{node})
if err != nil {
log.Warn("refreshPodsByNode err:%v", err)
return err
}
log.Infof("pods %v will be drift", pods)
// step 3
for _, pod := range pods {
engine.DriftNode(node, "", pod.Name, pod.InstanceNo, true)
}
return nil
}

// refreshPodsByNode fetches all containers on the target nodes
func (ev *RuntimeEagleView) refreshPodsByNode(c cluster.Cluster, nodes []string) ([]RuntimeEaglePod, error) {
totalContainers := 0
start := time.Now()
defer func() {
log.Infof("<RuntimeEagleView> pods by node %v refreshed, #containers=%d, duration=%s",
nodes, totalContainers, time.Now().Sub(start))
}()
nodeFilters := append([]string(nil), nodes...) // copy so the caller's slice is not aliased
filters := make(map[string][]string)
labelFilters := []string{
"com.docker.swarm.id",
}
filters["node"] = nodeFilters
filters["label"] = labelFilters
log.Infof("filters: %v", filters)
pods, err := ev.refreshByFilters(c, filters)
totalContainers = len(pods)
return pods, err
}
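
For reference, a minimal sketch of the filter document refreshPodsByNode ends up building and that refreshCallback JSON-encodes: the "node" filter narrows the swarm container list to the target host, and the "com.docker.swarm.id" label filter keeps only swarm-managed containers. The node name is hypothetical, and how the cluster client consumes this JSON beyond the Marshal call shown in the diff is assumed.

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	filters := map[string][]string{
		"node":  {"worker-1"},            // hypothetical node being drained
		"label": {"com.docker.swarm.id"}, // swarm-managed containers only
	}
	b, err := json.Marshal(filters)
	if err != nil {
		panic(err)
	}
	// Prints: {"label":["com.docker.swarm.id"],"node":["worker-1"]}
	fmt.Println(string(b))
}
```
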
10 changes: 7 additions & 3 deletions engine/podgroup.go
@@ -359,11 +359,15 @@ func (pgCtrl *podGroupController) cleanCorruptedContainers() {
id: rtPod.Container.Id,
})
}
log.Infof("pods::%v", pods)
corrupted := false
uselessContainers := make([]*container, 0)
for _, containers := range pods {
if len(containers) > 1 {
for instance, containers := range pods {
if instance > pgCtrl.spec.NumInstances {
corrupted = true
for _, container := range containers {
uselessContainers = append(uselessContainers, container)
}
} else if len(containers) > 1 {
By(ByVersionAndDriftCounter).Sort(containers)
corrupted = true
for _, container := range containers[1:] {
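
The loop above now keys pods by instance number, so containers whose instance index exceeds spec.NumInstances (leftovers from a shrink that raced with a drift) are treated as useless, in addition to the existing duplicate handling. A simplified, self-contained sketch of that pruning rule, using a stand-in container type and a plain version sort in place of By(ByVersionAndDriftCounter).Sort:

```go
package main

import (
	"fmt"
	"sort"
)

type container struct {
	id      string
	version int
}

// uselessContainers mimics the selection rule: instances beyond numInstances are
// discarded entirely; valid instances keep their best container and drop the rest.
func uselessContainers(pods map[int][]container, numInstances int) []container {
	var useless []container
	for instance, containers := range pods {
		if instance > numInstances {
			useless = append(useless, containers...)
		} else if len(containers) > 1 {
			// Stand-in ordering: keep the highest version first.
			sort.Slice(containers, func(i, j int) bool {
				return containers[i].version > containers[j].version
			})
			useless = append(useless, containers[1:]...)
		}
	}
	return useless
}

func main() {
	pods := map[int][]container{
		1: {{id: "a", version: 2}, {id: "b", version: 1}}, // duplicate containers for instance 1
		3: {{id: "c", version: 1}},                        // instance 3 exceeds NumInstances
	}
	// With NumInstances = 2, "b" (duplicate) and "c" (out-of-range instance) are useless.
	fmt.Println(uselessContainers(pods, 2))
}
```
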
4 changes: 1 addition & 3 deletions engine/podgroup_ops.go
@@ -353,7 +353,7 @@ type pgOperDeployInstance struct {
* Deploy happens when deploying, scheduling, or foundMissing
* 1. check if the current pod is deployed in the cluster
* 2. if deployed, check whether it is corrupted; if so, try to recover it
* 3. if not deployed, just deploy it with driftcount+1
* 3. if not deployed, just deploy it
*
*/
func (op pgOperDeployInstance) Do(pgCtrl *podGroupController, c cluster.Cluster, store storage.Store, ev *RuntimeEagleView) bool {
@@ -473,8 +473,6 @@ func (op pgOperDriftInstance) Do(pgCtrl *podGroupController, c cluster.Cluster,
oldSpec, oldPod := podCtrl.spec.Clone(), podCtrl.pod
oldNodeName := oldPod.NodeName()

pgCtrl.waitLastPodHealth(op.instanceNo - 1)

isDrifted = podCtrl.Drift(c, op.fromNode, op.toNode, op.force)
runtime = podCtrl.pod.ImRuntime
if isDrifted {
2 changes: 1 addition & 1 deletion storage/etcd/store.go
@@ -122,7 +122,7 @@ func (store *EtcdStore) SetWithTTL(key string, v interface{}, ttlSec int, force

func (store *EtcdStore) Remove(key string) error {
_, err := store.keysApi.Delete(store.ctx, key, nil)
if err != nil {
if err == nil {
store.Lock()
delete(store.keyHashes, key)
store.Unlock()
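
The one-character fix above makes Remove drop the cached key hash only when the etcd delete succeeds (previously the cache entry was dropped only on failure). A minimal, self-contained illustration of that pattern with a stand-in store type; the key path is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type fakeStore struct {
	sync.Mutex
	keyHashes map[string]string
	offline   bool
}

func (s *fakeStore) remoteDelete(key string) error {
	if s.offline {
		return errors.New("etcd unreachable")
	}
	return nil
}

func (s *fakeStore) Remove(key string) error {
	err := s.remoteDelete(key)
	if err == nil { // invalidate the local cache only after the remote delete succeeds
		s.Lock()
		delete(s.keyHashes, key)
		s.Unlock()
	}
	return err
}

func main() {
	s := &fakeStore{keyHashes: map[string]string{"/lain/deployd/foo": "hash"}}

	s.offline = true
	fmt.Println(s.Remove("/lain/deployd/foo"), len(s.keyHashes)) // error, cache entry kept

	s.offline = false
	fmt.Println(s.Remove("/lain/deployd/foo"), len(s.keyHashes)) // <nil>, cache entry removed
}
```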
