Skip to content

Commit

Permalink
fix(host): defer cancel context in loop block (#21955)
Browse files Browse the repository at this point in the history
* fix(host): defer cancel context in loop block

* fix(host): reconnect disk when loop device already mounted or mount point busy

* fix(region): pass server stop params to container stop task
  • Loading branch information
zexi authored Jan 13, 2025
1 parent 7b263dd commit 4f31176
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/compute/guestdrivers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (p *SPodDriver) OnGuestDeployTaskDataReceived(ctx context.Context, guest *m
}

func (p *SPodDriver) StartGuestStopTask(guest *models.SGuest, ctx context.Context, userCred mcclient.TokenCredential, params *jsonutils.JSONDict, parentTaskId string) error {
task, err := taskman.TaskManager.NewTask(ctx, "PodStopTask", guest, userCred, nil, parentTaskId, "", nil)
task, err := taskman.TaskManager.NewTask(ctx, "PodStopTask", guest, userCred, params, parentTaskId, "", nil)
if err != nil {
return errors.Wrap(err, "New PodStopTask")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/compute/models/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,12 @@ func (m *SContainerManager) StartBatchStartTask(ctx context.Context, userCred mc
return m.startBatchTask(ctx, userCred, "ContainerBatchStartTask", ctrs, nil, parentTaskId)
}

func (m *SContainerManager) StartBatchStopTask(ctx context.Context, userCred mcclient.TokenCredential, ctrs []SContainer, timeout int, parentTaskId string) error {
func (m *SContainerManager) StartBatchStopTask(ctx context.Context, userCred mcclient.TokenCredential, ctrs []SContainer, timeout int, force bool, parentTaskId string) error {
params := make([]api.ContainerStopInput, len(ctrs))
for i := range ctrs {
params[i] = api.ContainerStopInput{
Timeout: timeout,
Force: force,
}
}
taskParams := jsonutils.NewDict()
Expand Down
12 changes: 11 additions & 1 deletion pkg/compute/tasks/pod_stop_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func (t *PodStopTask) OnInit(ctx context.Context, obj db.IStandaloneModel, body
t.OnWaitContainerStopped(ctx, obj.(*models.SGuest), nil)
}

type ServerStopTaskParams struct {
IsForce bool `json:"is_force"`
Timeout int64 `json:"timeout"`
}

func (t *PodStopTask) OnWaitContainerStopped(ctx context.Context, pod *models.SGuest, _ jsonutils.JSONObject) {
pod.SetStatus(ctx, t.GetUserCred(), api.POD_STATUS_STOPPING_CONTAINER, "")
ctrs, err := models.GetContainerManager().GetContainersByPod(pod.GetId())
Expand All @@ -49,7 +54,12 @@ func (t *PodStopTask) OnWaitContainerStopped(ctx context.Context, pod *models.SG
t.OnContainerStopped(ctx, pod, nil)
} else {
t.SetStage("OnContainerStopped", nil)
if err := models.GetContainerManager().StartBatchStopTask(ctx, t.GetUserCred(), ctrs, 1, t.GetId()); err != nil {
input := new(ServerStopTaskParams)
t.GetParams().Unmarshal(input)
if input.Timeout == 0 {
input.Timeout = 1
}
if err := models.GetContainerManager().StartBatchStopTask(ctx, t.GetUserCred(), ctrs, int(input.Timeout), input.IsForce, t.GetId()); err != nil {
t.OnWaitContainerStoppedFailed(ctx, pod, jsonutils.NewString(err.Error()))
return
}
Expand Down
67 changes: 57 additions & 10 deletions pkg/hostman/container/volume_mount/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"path/filepath"
"strings"

"yunion.io/x/log"
"yunion.io/x/pkg/errors"
Expand Down Expand Up @@ -153,29 +154,75 @@ func (d disk) newPostOverlay() iDiskPostOverlay {
return newDiskPostOverlay(d)
}

func (d disk) Mount(pod volume_mount.IPodInfo, ctrId string, vm *hostapi.ContainerVolumeMount) error {
iDisk, gd, err := d.getPodDisk(pod, vm)
if err != nil {
return errors.Wrap(err, "get pod disk interface")
}
func (d disk) connectDisk(iDisk storageman.IDisk) (string, bool, error) {
drv, err := iDisk.GetContainerStorageDriver()
if err != nil {
return errors.Wrap(err, "get disk storage driver")
return "", false, errors.Wrap(err, "get disk storage driver")
}
devPath, isConnected, err := drv.CheckConnect(iDisk.GetPath())
if err != nil {
return errors.Wrapf(err, "CheckConnect %s", iDisk.GetPath())
return "", false, errors.Wrapf(err, "CheckConnect %s", iDisk.GetPath())
}
if !isConnected {
devPath, err = drv.ConnectDisk(iDisk.GetPath())
if err != nil {
return errors.Wrapf(err, "ConnectDisk %s", iDisk.GetPath())
return "", false, errors.Wrapf(err, "ConnectDisk %s", iDisk.GetPath())
}
}
mntPoint := pod.GetDiskMountPoint(iDisk)
if err := container_storage.Mount(devPath, mntPoint, gd.Fs); err != nil {
return devPath, isConnected, nil
}

func (d disk) mountDisk(devPath string, mntPoint string, fs string) error {
if err := container_storage.Mount(devPath, mntPoint, fs); err != nil {
return errors.Wrapf(err, "mount %s to %s", devPath, mntPoint)
}
return nil
}

func (d disk) connectDiskAndMount(drv container_storage.IContainerStorage, pod volume_mount.IPodInfo, iDisk storageman.IDisk, fs string) (string, error) {
devPath, isConnected, err := d.connectDisk(iDisk)
if err != nil {
return "", errors.Wrap(err, "connect disk")
}
mntPoint := pod.GetDiskMountPoint(iDisk)
mountErrs := []error{}
if err := d.mountDisk(devPath, mntPoint, fs); err != nil {
mountErrs = append(mountErrs, err)
if isConnected && strings.Contains(err.Error(), fmt.Sprintf("%s already mounted or mount point busy.", devPath)) {
// disconnect disk and mount agin
if err := drv.DisconnectDisk(iDisk.GetPath(), mntPoint); err != nil {
mountErrs = append(mountErrs, errors.Wrapf(err, "disconnect disk cause of mount point busy"))
return mntPoint, errors.NewAggregate(mountErrs)
}
devPath, _, err = d.connectDisk(iDisk)
if err != nil {
return mntPoint, errors.Wrap(err, "connect disk after disconnect")
}
if err := d.mountDisk(devPath, mntPoint, fs); err != nil {
mountErrs = append(mountErrs, errors.Wrapf(err, "mount disk after reconnect"))
return mntPoint, errors.NewAggregate(mountErrs)
}
return mntPoint, nil
}
return mntPoint, errors.Wrapf(err, "mount %s to %s", devPath, mntPoint)
}
return mntPoint, nil
}

func (d disk) Mount(pod volume_mount.IPodInfo, ctrId string, vm *hostapi.ContainerVolumeMount) error {
iDisk, gd, err := d.getPodDisk(pod, vm)
if err != nil {
return errors.Wrap(err, "get pod disk interface")
}
drv, err := iDisk.GetContainerStorageDriver()
if err != nil {
return errors.Wrap(err, "get disk storage driver")
}
mntPoint, err := d.connectDiskAndMount(drv, pod, iDisk, gd.Fs)
if err != nil {
return errors.Wrap(err, "connect disk and mount disk")
}

vmDisk := vm.Disk
if vmDisk.SubDirectory != "" {
subDir := filepath.Join(mntPoint, vmDisk.SubDirectory)
Expand Down
17 changes: 10 additions & 7 deletions pkg/util/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,11 @@ func (c crictl) RemovePod(ctx context.Context, podId string) error {
interval := 5 * time.Second
errs := []error{}
for tries := 0; tries < maxTries; tries++ {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := c.removePod(ctx, podId)
err := func() error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return c.removePod(ctx, podId)
}()
if err == nil {
return nil
}
Expand Down Expand Up @@ -361,10 +363,11 @@ func (c crictl) stopContainerWithRetry(ctx context.Context, ctrId string, timeou
interval := 5 * time.Second
errs := []error{}
for tries := 0; tries < maxTries; tries++ {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

err := c.stopContainer(ctx, ctrId, timeout)
err := func() error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
return c.stopContainer(ctx, ctrId, timeout)
}()
if err == nil {
return nil
}
Expand Down

0 comments on commit 4f31176

Please sign in to comment.