Skip to content

Commit

Permalink
Add multiple update selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
wilwell committed Dec 5, 2024
1 parent 6748cc7 commit fae9001
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 122 deletions.
42 changes: 19 additions & 23 deletions api/v1/ytsaurus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
)

// EmbeddedPersistentVolumeClaim is an embedded version of k8s.io/api/core/v1.PersistentVolumeClaim.
Expand Down Expand Up @@ -645,11 +647,14 @@ type YtsaurusSpec struct {
//+optional
EnableFullUpdate bool `json:"enableFullUpdate"`
//+optional
//+kubebuilder:validation:Enum={"","Nothing","MasterOnly","DataNodesOnly","TabletNodesOnly","ExecNodesOnly","StatelessOnly","Everything"}
// UpdateSelector is an experimental field. Behaviour may change.
// If UpdateSelector is not empty EnableFullUpdate is ignored.
//+optional
// Deprecated: UpdateSelector is an experimental field. Behaviour may change.
UpdateSelector UpdateSelector `json:"updateSelector"`

//+optional
// Controls the components that should be updated during the update process.
UpdateSelectors []ComponentUpdateSelector `json:"updateSelectors,omitempty"`

NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`

Expand Down Expand Up @@ -725,26 +730,17 @@ type TabletCellBundleInfo struct {

type UpdateSelector string

const (
// UpdateSelectorUnspecified means that selector is disabled and would be ignored completely.
UpdateSelectorUnspecified UpdateSelector = ""
// UpdateSelectorNothing means that no component could be updated.
UpdateSelectorNothing UpdateSelector = "Nothing"
// UpdateSelectorMasterOnly means that only master could be updated.
UpdateSelectorMasterOnly UpdateSelector = "MasterOnly"
// UpdateSelectorTabletNodesOnly means that only data nodes could be updated
UpdateSelectorDataNodesOnly UpdateSelector = "DataNodesOnly"
// UpdateSelectorTabletNodesOnly means that only tablet nodes could be updated
UpdateSelectorTabletNodesOnly UpdateSelector = "TabletNodesOnly"
// UpdateSelectorExecNodesOnly means that only tablet nodes could be updated
UpdateSelectorExecNodesOnly UpdateSelector = "ExecNodesOnly"
// UpdateSelectorStatelessOnly means that only stateless components (everything but master, data nodes, and tablet nodes)
// could be updated.
UpdateSelectorStatelessOnly UpdateSelector = "StatelessOnly"
// UpdateSelectorEverything means that all components could be updated.
// With this setting and if master or tablet nodes need update all the components would be updated.
UpdateSelectorEverything UpdateSelector = "Everything"
)
type ComponentUpdateSelector struct {
//+optional
Component consts.ComponentType `json:"componentKind,omitempty"`
//+optional
ComponentGroup consts.ComponentGroup `json:"componentGroup,omitempty"`
//+kubebuilder:default:=false
//+optional
Update bool `json:"update"`

// TODO(#325): Add name field for specific sts and rolling options
}

type UpdateFlow string

Expand Down
20 changes: 20 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 16 additions & 11 deletions config/crd/bases/cluster.ytsaurus.tech_ytsaurus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34657,18 +34657,23 @@ spec:
uiImage:
type: string
updateSelector:
description: UpdateSelector is an experimental field. Behaviour may
change.
enum:
- ""
- Nothing
- MasterOnly
- DataNodesOnly
- TabletNodesOnly
- ExecNodesOnly
- StatelessOnly
- Everything
description: 'Deprecated: UpdateSelector is an experimental field.
Behaviour may change.'
type: string
updateSelectors:
description: Controls the components that should be updated during
the update process.
items:
properties:
componentGroup:
type: string
componentKind:
type: string
update:
default: false
type: boolean
type: object
type: array
useIpv4:
default: false
type: boolean
Expand Down
143 changes: 76 additions & 67 deletions controllers/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,60 +393,81 @@ type updateMeta struct {
componentNames []string
}

func canUpdateComponent(selector ytv1.UpdateSelector, component consts.ComponentType) bool {
switch selector {
case ytv1.UpdateSelectorNothing:
return false
case ytv1.UpdateSelectorMasterOnly:
return component == consts.MasterType
case ytv1.UpdateSelectorDataNodesOnly:
return component == consts.DataNodeType
case ytv1.UpdateSelectorTabletNodesOnly:
return component == consts.TabletNodeType
case ytv1.UpdateSelectorExecNodesOnly:
return component == consts.ExecNodeType
case ytv1.UpdateSelectorStatelessOnly:
switch component {
case consts.MasterType:
return false
case consts.DataNodeType:
return false
case consts.TabletNodeType:
return false
}
return true
case ytv1.UpdateSelectorEverything:
return true
default:
return false
func getFlowFromComponent(component consts.ComponentType) ytv1.UpdateFlow {
if component == consts.MasterType {
return ytv1.UpdateFlowMaster
}
if component == consts.TabletNodeType {
return ytv1.UpdateFlowTabletNodes
}
if component == consts.DataNodeType || component == consts.ExecNodeType {
return ytv1.UpdateFlowFull
}
return ytv1.UpdateFlowStateless
}

func canUpdateComponent(selectors []ytv1.ComponentUpdateSelector, component consts.ComponentType) (bool, error) {
for _, selector := range selectors {
if selector.Component != "" {
if selector.Component == component {
return true, nil
}
} else {
switch selector.ComponentGroup {
case consts.ComponentGroupEverything:
return true, nil
case consts.ComponentGroupStateful:
if component == consts.DataNodeType || component == consts.TabletNodeType {
return true, nil
}
case consts.ComponentGroupStateless:
if component != consts.DataNodeType && component != consts.TabletNodeType && component != consts.MasterType {
return true, nil
}
default:
return false, fmt.Errorf("unexpected component group %s", selector.ComponentGroup)
}
}
}
return false, nil
}

// chooseUpdateFlow considers spec and decides if operator should proceed with update or block.
// Block case is indicated with non-empty blockMsg.
// If update is not blocked, updateMeta containing a chosen flow and the component names to update returned.
func chooseUpdateFlow(spec ytv1.YtsaurusSpec, needUpdate []components.Component) (meta updateMeta, blockMsg string) {
configuredSelector := spec.UpdateSelector
if configuredSelector == ytv1.UpdateSelectorUnspecified {
if spec.EnableFullUpdate {
configuredSelector = ytv1.UpdateSelectorEverything
} else {
configuredSelector = ytv1.UpdateSelectorStatelessOnly
}
func chooseUpdateFlow(ctx context.Context, spec ytv1.YtsaurusSpec, needUpdate []components.Component) (meta updateMeta, blockMsg string) {
logger := log.FromContext(ctx)
configuredSelectors := spec.UpdateSelectors
if len(configuredSelectors) == 0 && spec.EnableFullUpdate {
configuredSelectors = []ytv1.ComponentUpdateSelector{{ComponentGroup: consts.ComponentGroupEverything, Update: true}}
}
for _, selector := range configuredSelectors {
logger.Info("NEW LOG: Configured selector", "component", selector.Component, "group", selector.ComponentGroup)
}
needFullUpdate := false

var canUpdate []string
var cannotUpdate []string
needFullUpdate := false
flows := make(map[ytv1.UpdateFlow]struct{})

for _, comp := range needUpdate {
component := comp.GetType()
if canUpdateComponent(configuredSelector, component) {
canUpdate = append(canUpdate, string(component))
upd, err := canUpdateComponent(configuredSelectors, component)
if err != nil {
return updateMeta{}, err.Error()
}
if upd {
canUpdate = append(canUpdate, comp.GetName())
flows[getFlowFromComponent(component)] = struct{}{}
logger.Info("NEW LOG: Can update", "component", component, "flow", getFlowFromComponent(component), "name", comp.GetName(), "type", comp.GetType())
} else {
cannotUpdate = append(cannotUpdate, string(component))
logger.Info("NEW LOG: Can't update", "component", component)
}
if !canUpdateComponent(ytv1.UpdateSelectorStatelessOnly, component) && component != consts.DataNodeType {

statelessCheck, err := canUpdateComponent([]ytv1.ComponentUpdateSelector{{ComponentGroup: consts.ComponentGroupStateless}}, component)

Check failure on line 468 in controllers/sync.go

View workflow job for this annotation

GitHub Actions / Run checks

SA4006: this value of `err` is never used (staticcheck)
if !statelessCheck && component != consts.DataNodeType {
logger.Info("NEW LOG: Need full update", "component", component)
needFullUpdate = true
}
}
Expand All @@ -458,37 +479,25 @@ func chooseUpdateFlow(spec ytv1.YtsaurusSpec, needUpdate []components.Component)
return updateMeta{}, "All components are uptodate"
}

switch configuredSelector {
case ytv1.UpdateSelectorEverything:
if spec.EnableFullUpdate {
if needFullUpdate {
return updateMeta{
flow: ytv1.UpdateFlowFull,
componentNames: nil,
}, ""
return updateMeta{flow: ytv1.UpdateFlowFull, componentNames: canUpdate}, ""
} else {
return updateMeta{
flow: ytv1.UpdateFlowStateless,
componentNames: canUpdate,
}, ""
}
case ytv1.UpdateSelectorMasterOnly:
return updateMeta{
flow: ytv1.UpdateFlowMaster,
componentNames: canUpdate,
}, ""
case ytv1.UpdateSelectorTabletNodesOnly:
return updateMeta{
flow: ytv1.UpdateFlowTabletNodes,
componentNames: canUpdate,
}, ""
case ytv1.UpdateSelectorDataNodesOnly, ytv1.UpdateSelectorExecNodesOnly, ytv1.UpdateSelectorStatelessOnly:
return updateMeta{
flow: ytv1.UpdateFlowStateless,
componentNames: canUpdate,
}, ""
default:
return updateMeta{}, fmt.Sprintf("Unexpected update selector %s", configuredSelector)
return updateMeta{flow: ytv1.UpdateFlowStateless, componentNames: canUpdate}, ""
}
}

if len(flows) == 0 {
return updateMeta{}, fmt.Sprintf("Unexpected state: no flows for components {%s}", strings.Join(canUpdate, ", "))
}

if len(flows) == 1 {
for flow := range flows {
return updateMeta{flow: flow, componentNames: canUpdate}, ""
}
}
// If more than one flow is possible, we choose to follow full update flow.
return updateMeta{flow: ytv1.UpdateFlowFull, componentNames: canUpdate}, ""
}

func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
Expand Down Expand Up @@ -543,7 +552,7 @@ func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus)
needUpdateNames = append(needUpdateNames, c.GetName())
}
logger = logger.WithValues("componentsForUpdateAll", needUpdateNames)
meta, blockMsg := chooseUpdateFlow(ytsaurus.GetResource().Spec, needUpdate)
meta, blockMsg := chooseUpdateFlow(ctx, ytsaurus.GetResource().Spec, needUpdate)
if blockMsg != "" {
logger.Info(blockMsg)
return ctrl.Result{Requeue: true}, nil
Expand Down
2 changes: 2 additions & 0 deletions controllers/ytsaurus_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

ytv1 "github.com/ytsaurus/ytsaurus-k8s-operator/api/v1"
"github.com/ytsaurus/ytsaurus-k8s-operator/controllers"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/consts"
"github.com/ytsaurus/ytsaurus-k8s-operator/pkg/testutil"
)

Expand Down Expand Up @@ -129,6 +130,7 @@ func TestYtsaurusUpdateStatelessComponent(t *testing.T) {
ytsaurusResource.Spec.Discovery.Image = &imageUpdated
t.Log("[ Updating discovery with disabled full update ]")
ytsaurusResource.Spec.EnableFullUpdate = false
ytsaurusResource.Spec.UpdateSelectors = []ytv1.ComponentUpdateSelector{{ComponentGroup: consts.ComponentGroupStateless}}
testutil.UpdateObject(h, &ytv1.Ytsaurus{}, &ytsaurusResource)

waitClusterState(h, ytv1.ClusterStateRunning)
Expand Down
21 changes: 20 additions & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,24 @@ _Appears in:_
| `imagePullSecrets` _[LocalObjectReference](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#localobjectreference-v1-core) array_ | | | |


#### ComponentUpdateSelector







_Appears in:_
- [YtsaurusSpec](#ytsaurusspec)

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `componentKind` _[ComponentType](#componenttype)_ | | | |
| `componentGroup` _[ComponentGroup](#componentgroup)_ | | | |
| `update` _boolean_ | | false | |


#### ControllerAgentsSpec


Expand Down Expand Up @@ -2204,7 +2222,8 @@ _Appears in:_
| `oauthService` _[OauthServiceSpec](#oauthservicespec)_ | | | |
| `isManaged` _boolean_ | | true | |
| `enableFullUpdate` _boolean_ | | true | |
| `updateSelector` _[UpdateSelector](#updateselector)_ | UpdateSelector is an experimental field. Behaviour may change.<br />If UpdateSelector is not empty EnableFullUpdate is ignored. | | Enum: [ Nothing MasterOnly DataNodesOnly TabletNodesOnly ExecNodesOnly StatelessOnly Everything] <br /> |
| `updateSelector` _[UpdateSelector](#updateselector)_ | Deprecated: UpdateSelector is an experimental field. Behaviour may change. | | |
| `updateSelectors` _[ComponentUpdateSelector](#componentupdateselector) array_ | Controls the components that should be updated during the update process. | | |
| `nodeSelector` _object (keys:string, values:string)_ | | | |
| `tolerations` _[Toleration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#toleration-v1-core) array_ | | | |
| `bootstrap` _[BootstrapSpec](#bootstrapspec)_ | | | |
Expand Down
11 changes: 10 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (

"go.uber.org/zap/zapcore"

"github.com/ytsaurus/ytsaurus-k8s-operator/controllers"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/ytsaurus/ytsaurus-k8s-operator/controllers"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
Expand Down Expand Up @@ -67,6 +68,14 @@ func main() {
opts := zap.Options{
Development: true,
TimeEncoder: zapcore.ISO8601TimeEncoder,
//DestWriter: func() io.Writer {
// file, err := os.OpenFile("log.out", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
// if err != nil {
// setupLog.Error(err, "unable to open log file")
// os.Exit(1)
// }
// return file
//}(),

Check failure on line 78 in main.go

View workflow job for this annotation

GitHub Actions / Run checks

commentFormatting: put a space between `//` and comment text (gocritic)
}
opts.BindFlags(flag.CommandLine)
flag.Parse()
Expand Down
Loading

0 comments on commit fae9001

Please sign in to comment.