Skip to content

Commit

Permalink
UpdateSelector: implementation (#239)
Browse files Browse the repository at this point in the history
* Add UpdateStrategy field

* Strategy = selector + flow

* Unspecified + flow comment

* regenerate

* Selectors impl

* Unspecified
  • Loading branch information
l0kix2 authored Apr 23, 2024
1 parent 277e13c commit 567bc65
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 27 deletions.
287 changes: 265 additions & 22 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"fmt"
"time"

"github.com/ytsaurus/yt-k8s-operator/pkg/components"
Expand All @@ -14,7 +15,7 @@ import (
apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
)

func (r *YtsaurusReconciler) handleUpdatingStateFullMode(
func (r *YtsaurusReconciler) handleEverything(
ctx context.Context,
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
Expand Down Expand Up @@ -160,7 +161,7 @@ func (r *YtsaurusReconciler) handleUpdatingStateFullMode(
return nil, nil
}

func (r *YtsaurusReconciler) handleUpdatingStateLocalMode(
func (r *YtsaurusReconciler) handleStateless(
ctx context.Context,
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
Expand Down Expand Up @@ -232,6 +233,159 @@ func (r *YtsaurusReconciler) handleUpdatingStateLocalMode(
return nil, nil
}

func (r *YtsaurusReconciler) handleMasterOnly(
ctx context.Context,
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
ytsaurus.LogUpdate(ctx, "Checking the possibility of updating")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStatePossibilityCheck)
return &ctrl.Result{Requeue: true}, err

case ytv1.UpdateStatePossibilityCheck:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionHasPossibility) {
ytsaurus.LogUpdate(ctx, "Waiting for safe mode enabled")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSafeModeEnabled)
return &ctrl.Result{Requeue: true}, err
} else if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionNoPossibility) {
ytsaurus.LogUpdate(ctx, "Update is impossible, need to apply previous images")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateImpossibleToStart)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForSafeModeEnabled:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionSafeModeEnabled) {
ytsaurus.LogUpdate(ctx, "Waiting for tablet cells saving")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSnapshots)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForSnapshots:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionSnaphotsSaved) {
ytsaurus.LogUpdate(ctx, "Waiting for pods removal")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForPodsRemoval)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForPodsRemoval:
if componentManager.arePodsRemoved() {
ytsaurus.LogUpdate(ctx, "Waiting for pods creation")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForPodsCreation)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForPodsCreation:
if componentManager.allReadyOrUpdating() {
ytsaurus.LogUpdate(ctx, "All components were recreated")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForMasterExitReadOnly)
return &ctrl.Result{RequeueAfter: time.Second * 7}, err
}

case ytv1.UpdateStateWaitingForMasterExitReadOnly:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionMasterExitedReadOnly) {
ytsaurus.LogUpdate(ctx, "Masters exited read-only state")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForSafeModeDisabled)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForSafeModeDisabled:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionSafeModeDisabled) {
ytsaurus.LogUpdate(ctx, "Finishing")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateUpdateFinishing)
return &ctrl.Result{Requeue: true}, err
}
}
return nil, nil
}

func (r *YtsaurusReconciler) handleTabletNodesOnly(
ctx context.Context,
ytsaurus *apiProxy.Ytsaurus,
componentManager *ComponentManager,
) (*ctrl.Result, error) {
resource := ytsaurus.GetResource()

switch resource.Status.UpdateStatus.State {
case ytv1.UpdateStateNone:
ytsaurus.LogUpdate(ctx, "Checking the possibility of updating")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStatePossibilityCheck)
return &ctrl.Result{Requeue: true}, err

case ytv1.UpdateStatePossibilityCheck:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionHasPossibility) {
ytsaurus.LogUpdate(ctx, "Waiting for safe mode enabled")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForTabletCellsSaving)
return &ctrl.Result{Requeue: true}, err
} else if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionNoPossibility) {
ytsaurus.LogUpdate(ctx, "Update is impossible, need to apply previous images")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateImpossibleToStart)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateImpossibleToStart:
if !componentManager.needSync() || !ytsaurus.GetResource().Spec.EnableFullUpdate {
ytsaurus.LogUpdate(ctx, "Spec changed back or full update isn't enabled, update is canceling")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateCancelUpdate)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForTabletCellsSaving:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionTabletCellsSaved) {
ytsaurus.LogUpdate(ctx, "Waiting for tablet cells removing to start")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForTabletCellsRemovingStart)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForTabletCellsRemovingStart:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionTabletCellsRemovingStarted) {
ytsaurus.LogUpdate(ctx, "Waiting for tablet cells removing to finish")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForTabletCellsRemoved)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForTabletCellsRemoved:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionTabletCellsRemoved) {
ytsaurus.LogUpdate(ctx, "Waiting for snapshots")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForPodsRemoval)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForPodsRemoval:
if componentManager.arePodsRemoved() {
ytsaurus.LogUpdate(ctx, "Waiting for pods creation")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForPodsCreation)
return &ctrl.Result{Requeue: true}, err
}

case ytv1.UpdateStateWaitingForPodsCreation:
if componentManager.allReadyOrUpdating() {
ytsaurus.LogUpdate(ctx, "All components were recreated")
err := ytsaurus.SaveUpdateState(ctx, ytv1.UpdateStateWaitingForTabletCellsRecovery)
return &ctrl.Result{RequeueAfter: time.Second * 7}, err
}

case ytv1.UpdateStateWaitingForTabletCellsRecovery:
if ytsaurus.IsUpdateStatusConditionTrue(consts.ConditionTabletCellsRecovered) {
ytsaurus.LogUpdate(ctx, "Finishing")
err := ytsaurus.SaveClusterState(ctx, ytv1.ClusterStateUpdateFinishing)
return &ctrl.Result{Requeue: true}, err
}
}

return nil, nil
}

func getComponentNames(components []components.Component) []string {
if components == nil {
return nil
Expand All @@ -243,35 +397,112 @@ func getComponentNames(components []components.Component) []string {
return names
}

// chooseUpdateStrategy considers spec decides if operator should proceed with update or block.
// Block is indicated with non-empty blockMsg.
// Component names which are chosen for update are return in names slice.
// Nil names slice means "full update".
func chooseUpdateStrategy(spec ytv1.YtsaurusSpec, needUpdate []components.Component) (names []string, blockMsg string) {
type updateMeta struct {
flow ytv1.UpdateFlow
// componentNames is a list of component names that will be updated. It is built according to the update selector.
componentNames []string
}

// chooseUpdateFlow considers spec and decides if operator should proceed with update or block.
// Block case is indicated with non-empty blockMsg.
// If update is not blocked, updateMeta containing a chosen flow and the component names to update returned.
func chooseUpdateFlow(spec ytv1.YtsaurusSpec, needUpdate []components.Component) (meta updateMeta, blockMsg string) {
isFullUpdateEnabled := spec.EnableFullUpdate
configuredSelector := spec.UpdateSelector

masterNeedsUpdate := false
tabletNodesNeedUpdate := false
execNodesNeedUpdate := false
statelessNeedUpdate := false
var masterNames []string
var tabletNodeNames []string
var execNodeNames []string
var statelessNames []string
for _, comp := range needUpdate {
if comp.GetType() == consts.MasterType {
masterNeedsUpdate = true
masterNames = append(masterNames, comp.GetName())
continue
}
if comp.GetType() == consts.TabletNodeType {
tabletNodesNeedUpdate = true
tabletNodeNames = append(tabletNodeNames, comp.GetName())
continue
}
if comp.GetType() == consts.ExecNodeType {
execNodesNeedUpdate = true
execNodeNames = append(execNodeNames, comp.GetName())
}
statelessNames = append(statelessNames, comp.GetName())
statelessNeedUpdate = true
}
statefulNeedUpdate := masterNeedsUpdate || tabletNodesNeedUpdate

if statefulNeedUpdate {
if isFullUpdateEnabled {
return nil, ""
} else {
return nil, "Full update is not allowed by enableFullUpdate field, ignoring it"
allNamesNeedingUpdate := getComponentNames(needUpdate)

// Fallback to EnableFullUpdate field.
if configuredSelector == ytv1.UpdateSelectorUnspecified {
if statefulNeedUpdate {
if isFullUpdateEnabled {
return updateMeta{flow: ytv1.UpdateFlowFull, componentNames: nil}, ""
} else {
return updateMeta{flow: "", componentNames: nil}, "Full update is not allowed by enableFullUpdate field, ignoring it"
}
}
return updateMeta{flow: ytv1.UpdateFlowStateless, componentNames: allNamesNeedingUpdate}, ""
}

switch configuredSelector {
case ytv1.UpdateSelectorNothing:
return updateMeta{}, "All updates are blocked by updateSelector field."
case ytv1.UpdateSelectorEverything:
if statefulNeedUpdate {
return updateMeta{
flow: ytv1.UpdateFlowFull,
componentNames: nil,
}, ""
} else {
return updateMeta{
flow: ytv1.UpdateFlowStateless,
componentNames: allNamesNeedingUpdate,
}, ""
}
case ytv1.UpdateSelectorMasterOnly:
if !masterNeedsUpdate {
return updateMeta{}, "Only Master update is allowed by updateSelector, but it doesn't need update"
}
return updateMeta{
flow: ytv1.UpdateFlowMaster,
componentNames: masterNames,
}, ""
case ytv1.UpdateSelectorTabletNodesOnly:
if !tabletNodesNeedUpdate {
return updateMeta{}, "Only Tablet nodes update is allowed by updateSelector, but they don't need update"
}
return updateMeta{
flow: ytv1.UpdateFlowTabletNodes,
componentNames: tabletNodeNames,
}, ""
case ytv1.UpdateSelectorExecNodesOnly:
if !execNodesNeedUpdate {
return updateMeta{}, "Only Exec nodes update is allowed by updateSelector, but they don't need update"
}
return updateMeta{
flow: ytv1.UpdateFlowStateless,
componentNames: execNodeNames,
}, ""
case ytv1.UpdateSelectorStatelessOnly:
if !statelessNeedUpdate {
return updateMeta{}, "Only stateless components update is allowed by updateSelector, but they don't need update"
}
return updateMeta{
flow: ytv1.UpdateFlowStateless,
componentNames: statelessNames,
}, ""
default:
// TODO: just validate it in hook
return updateMeta{}, fmt.Sprintf("Unexpected update selector %s", configuredSelector)
}
return getComponentNames(needUpdate), ""
}

func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
Expand Down Expand Up @@ -314,23 +545,35 @@ func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus)
return ctrl.Result{Requeue: true}, err

case componentManager.needUpdate() != nil:
componentNames, blockMsg := chooseUpdateStrategy(ytsaurus.GetResource().Spec, componentManager.needUpdate())
meta, blockMsg := chooseUpdateFlow(ytsaurus.GetResource().Spec, componentManager.needUpdate())
if blockMsg != "" {
logger.Info(blockMsg)
return ctrl.Result{}, nil
return ctrl.Result{Requeue: true}, nil
}
logger.Info("Ytsaurus needs components update", "components", componentNames)
err := ytsaurus.SaveUpdatingClusterState(ctx, componentNames)
return ctrl.Result{Requeue: true}, err
logger.Info("Ytsaurus needs components update",
"components", meta.componentNames,
"flow", meta.flow,
)
err = ytsaurus.SaveUpdatingClusterState(ctx, meta.flow, meta.componentNames)
if err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{Requeue: true}, nil
}

case ytv1.ClusterStateUpdating:
var result *ctrl.Result
var err error
if ytsaurus.GetLocalUpdatingComponents() != nil {
result, err = r.handleUpdatingStateLocalMode(ctx, ytsaurus, componentManager)
} else {
result, err = r.handleUpdatingStateFullMode(ctx, ytsaurus, componentManager)

switch ytsaurus.GetUpdateFlow() {
case ytv1.UpdateFlowFull:
result, err = r.handleEverything(ctx, ytsaurus, componentManager)
case ytv1.UpdateFlowStateless:
result, err = r.handleStateless(ctx, ytsaurus, componentManager)
case ytv1.UpdateFlowMaster:
result, err = r.handleMasterOnly(ctx, ytsaurus, componentManager)
case ytv1.UpdateFlowTabletNodes:
result, err = r.handleTabletNodesOnly(ctx, ytsaurus, componentManager)
}

if result != nil {
Expand Down
8 changes: 7 additions & 1 deletion pkg/apiproxy/ytsaurus.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (c *Ytsaurus) GetLocalUpdatingComponents() []string {
return c.ytsaurus.Status.UpdateStatus.Components
}

func (c *Ytsaurus) GetUpdateFlow() ytv1.UpdateFlow {
return c.ytsaurus.Status.UpdateStatus.Flow
}

func (c *Ytsaurus) IsUpdateStatusConditionTrue(condition string) bool {
return meta.IsStatusConditionTrue(c.ytsaurus.Status.UpdateStatus.Conditions, condition)
}
Expand All @@ -73,6 +77,7 @@ func (c *Ytsaurus) ClearUpdateStatus(ctx context.Context) error {
c.ytsaurus.Status.UpdateStatus.TabletCellBundles = make([]ytv1.TabletCellBundleInfo, 0)
c.ytsaurus.Status.UpdateStatus.MasterMonitoringPaths = make([]string, 0)
c.ytsaurus.Status.UpdateStatus.Components = nil
c.ytsaurus.Status.UpdateStatus.Flow = ytv1.UpdateFlowNone
return c.apiProxy.UpdateStatus(ctx)
}

Expand All @@ -82,9 +87,10 @@ func (c *Ytsaurus) LogUpdate(ctx context.Context, message string) {
logger.Info(fmt.Sprintf("Ytsaurus update: %s", message))
}

func (c *Ytsaurus) SaveUpdatingClusterState(ctx context.Context, components []string) error {
func (c *Ytsaurus) SaveUpdatingClusterState(ctx context.Context, flow ytv1.UpdateFlow, components []string) error {
logger := log.FromContext(ctx)
c.ytsaurus.Status.State = ytv1.ClusterStateUpdating
c.ytsaurus.Status.UpdateStatus.Flow = flow
c.ytsaurus.Status.UpdateStatus.Components = components

if err := c.apiProxy.UpdateStatus(ctx); err != nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/components/ytsaurus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,6 @@ func (yc *YtsaurusClient) GetYtClient() yt.Client {
}

func (yc *YtsaurusClient) HandlePossibilityCheck(ctx context.Context) (ok bool, msg string, err error) {
if !yc.ytsaurus.GetResource().Spec.EnableFullUpdate {
return false, "Full update is not enabled", nil
}

// Check tablet cell bundles.
notGoodBundles, err := GetNotGoodTabletCellBundles(ctx, yc.ytClient)
if err != nil {
Expand Down

0 comments on commit 567bc65

Please sign in to comment.