From 4f14eaa3892400d20adac7bc636d3a1587a5f3ec Mon Sep 17 00:00:00 2001 From: Kirill Sibirev Date: Thu, 21 Mar 2024 16:52:02 +0100 Subject: [PATCH] Incapsulate flows in components --- controllers/component_manager.go | 48 +++-- controllers/component_registry.go | 150 ++++++++++++++ controllers/helpers.go | 65 +++++++ controllers/sync.go | 4 +- controllers/ytsaurus_flow.go | 128 ++++++++++++ controllers/ytsaurus_sync.go | 60 ++++++ pkg/components/component.go | 109 +++++++++-- pkg/components/conditions.go | 249 ++++++++++++++++++++++++ pkg/components/config_helper.go | 6 +- pkg/components/controller_agent.go | 24 ++- pkg/components/data_node.go | 22 ++- pkg/components/discovery.go | 93 ++++++++- pkg/components/discovery_local_test.go | 89 +++++++++ pkg/components/exec_node.go | 18 +- pkg/components/httpproxy.go | 28 +-- pkg/components/master.go | 193 +++++++++++++++++- pkg/components/master_caches.go | 6 +- pkg/components/master_local_test.go | 106 ++++++++++ pkg/components/microservice.go | 8 +- pkg/components/query_tracker.go | 39 ++-- pkg/components/queue_agent.go | 56 +++--- pkg/components/rpcproxy.go | 24 ++- pkg/components/scheduler.go | 70 ++++--- pkg/components/server.go | 26 +++ pkg/components/strawberry_controller.go | 53 ++--- pkg/components/suite_test.go | 10 +- pkg/components/tablet_node.go | 12 +- pkg/components/tablet_node_test.go | 24 +-- pkg/components/tcpproxy.go | 22 ++- pkg/components/ui.go | 20 +- pkg/components/yql_agent.go | 24 ++- pkg/components/ytsaurus_client.go | 24 ++- 32 files changed, 1574 insertions(+), 236 deletions(-) create mode 100644 controllers/component_registry.go create mode 100644 controllers/ytsaurus_flow.go create mode 100644 controllers/ytsaurus_sync.go create mode 100644 pkg/components/conditions.go create mode 100644 pkg/components/discovery_local_test.go create mode 100644 pkg/components/master_local_test.go diff --git a/controllers/component_manager.go b/controllers/component_manager.go index 8eb6e28d..91b027da 100644 
--- a/controllers/component_manager.go +++ b/controllers/component_manager.go @@ -1,5 +1,7 @@ package controllers +// TODO: file will be deleted after this refactoring. No need to review changes. + import ( "context" "time" @@ -40,18 +42,23 @@ func NewComponentManager( cfgen := ytconfig.NewGenerator(resource, clusterDomain) d := components.NewDiscovery(cfgen, ytsaurus) - m := components.NewMaster(cfgen, ytsaurus) + //m := components.NewMaster(cfgen, ytsaurus) var hps []components.Component for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies { - hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec)) + //hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec)) + hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, hpSpec)) } - yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0]) + //yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0]) + yc := components.NewYtsaurusClient(cfgen, ytsaurus) + + m := components.NewMaster(cfgen, ytsaurus, yc) var dnds []components.Component nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain) if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 { for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes { - dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, m, dndSpec)) + //dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, m, dndSpec)) + dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, dndSpec)) } } @@ -64,14 +71,16 @@ func NewComponentManager( allComponents = append(allComponents, hps...) 
if resource.Spec.UI != nil { - ui := components.NewUI(cfgen, ytsaurus, m) + //ui := components.NewUI(cfgen, ytsaurus, m) + ui := components.NewUI(cfgen, ytsaurus) allComponents = append(allComponents, ui) } if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 { var rps []components.Component for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies { - rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, m, rpSpec)) + //rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, m, rpSpec)) + rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, rpSpec)) } allComponents = append(allComponents, rps...) } @@ -79,7 +88,8 @@ func NewComponentManager( if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 { var tps []components.Component for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies { - tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, m, tpSpec)) + //tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, m, tpSpec)) + tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, tpSpec)) } allComponents = append(allComponents, tps...) } @@ -87,7 +97,8 @@ func NewComponentManager( var ends []components.Component if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 { for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes { - ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, m, endSpec)) + //ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, m, endSpec)) + ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, endSpec)) } } allComponents = append(allComponents, ends...) @@ -101,33 +112,38 @@ func NewComponentManager( allComponents = append(allComponents, tnds...) 
if resource.Spec.Schedulers != nil { - s = components.NewScheduler(cfgen, ytsaurus, m, ends, tnds) + //s = components.NewScheduler(cfgen, ytsaurus, m, ends, tnds) + s = components.NewScheduler(cfgen, ytsaurus, len(tnds)) allComponents = append(allComponents, s) } if resource.Spec.ControllerAgents != nil { - ca := components.NewControllerAgent(cfgen, ytsaurus, m) + //ca := components.NewControllerAgent(cfgen, ytsaurus, m) + ca := components.NewControllerAgent(cfgen, ytsaurus) allComponents = append(allComponents, ca) } var q components.Component if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { - q = components.NewQueryTracker(cfgen, ytsaurus, yc, tnds) + q = components.NewQueryTracker(cfgen, ytsaurus, yc, len(tnds)) allComponents = append(allComponents, q) } if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { - qa := components.NewQueueAgent(cfgen, ytsaurus, yc, m, tnds) + //qa := components.NewQueueAgent(cfgen, ytsaurus, yc, m, tnds) + qa := components.NewQueueAgent(cfgen, ytsaurus, yc, len(tnds)) allComponents = append(allComponents, qa) } if resource.Spec.YQLAgents != nil { - yqla := components.NewYQLAgent(cfgen, ytsaurus, m) + //yqla := components.NewYQLAgent(cfgen, ytsaurus, m) + yqla := components.NewYQLAgent(cfgen, ytsaurus) allComponents = append(allComponents, yqla) } if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil { - strawberry := components.NewStrawberryController(cfgen, ytsaurus, m, s, dnds) + //strawberry := components.NewStrawberryController(cfgen, ytsaurus, m, s, dnds) + strawberry := components.NewStrawberryController(cfgen, ytsaurus) allComponents = append(allComponents, strawberry) } @@ -153,7 +169,7 @@ func NewComponentManager( return nil, err } - componentStatus := c.Status(ctx) + componentStatus := 
c.StatusOld(ctx) c.SetReadyCondition(componentStatus) syncStatus := componentStatus.SyncStatus @@ -205,7 +221,7 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) { hasPending := false for _, c := range cm.allComponents { - status := c.Status(ctx) + status := c.StatusOld(ctx) if status.SyncStatus == components.SyncStatusPending || status.SyncStatus == components.SyncStatusUpdating { diff --git a/controllers/component_registry.go b/controllers/component_registry.go new file mode 100644 index 00000000..64f1c326 --- /dev/null +++ b/controllers/component_registry.go @@ -0,0 +1,154 @@ +package controllers + +import ( + "context" + + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" + "github.com/ytsaurus/yt-k8s-operator/pkg/consts" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +type component interface { + //Fetch(ctx context.Context) error + Sync(ctx context.Context) error + Status(ctx context.Context) (components.ComponentStatus, error) + GetName() string + GetType() consts.ComponentType +} + +type componentRegistry struct { + comps map[string]component + byType map[consts.ComponentType][]component +} + +// add registers comp under its name and also indexes it by its component type. +func (cr *componentRegistry) add(comp component) { + cr.comps[comp.GetName()] = comp + // buildComponentRegistry allocates only comps, so byType can be nil here; writing to a nil map panics. + if cr.byType == nil { + cr.byType = make(map[consts.ComponentType][]component) + } + cr.byType[comp.GetType()] = append(cr.byType[comp.GetType()], comp) +} + +func (cr *componentRegistry) list() []component { + var result []component + for _, comp := range cr.comps { + result = append(result, comp) + } + return result +} +func (cr *componentRegistry) listByType(types ...consts.ComponentType) []component { + var result []component + for _, compType := range types { + result = append(result, cr.byType[compType]...) 
+ } + return result +} + +func buildComponentRegistry( + ytsaurus *apiProxy.Ytsaurus, +) *componentRegistry { + registry := &componentRegistry{ + comps: make(map[string]component), + } + + resource := ytsaurus.GetResource() + clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client()) + cfgen := ytconfig.NewGenerator(resource, clusterDomain) + + yc := components.NewYtsaurusClient(cfgen, ytsaurus) + registry.add(yc) + + d := components.NewDiscovery(cfgen, ytsaurus) + registry.add(d) + + m := components.NewMaster(cfgen, ytsaurus, yc) + registry.add(m) + + for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies { + hp := components.NewHTTPProxy(cfgen, ytsaurus, hpSpec) + registry.add(hp) + } + + nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain) + if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 { + for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes { + dnd := components.NewDataNode(nodeCfgGen, ytsaurus, dndSpec) + registry.add(dnd) + } + } + + if resource.Spec.UI != nil { + ui := components.NewUI(cfgen, ytsaurus) + registry.add(ui) + } + + if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 { + for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies { + rp := components.NewRPCProxy(cfgen, ytsaurus, rpSpec) + registry.add(rp) + } + } + + if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 { + for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies { + tp := components.NewTCPProxy(cfgen, ytsaurus, tpSpec) + registry.add(tp) + } + } + + if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 { + for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes { + end := components.NewExecNode(nodeCfgGen, ytsaurus, endSpec) + registry.add(end) + } + } + + tndCount := 0 + if resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { + for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes { + tnd := 
components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0) + registry.add(tnd) + tndCount++ + } + } + if resource.Spec.Schedulers != nil { + s := components.NewScheduler(cfgen, ytsaurus, tndCount) + registry.add(s) + } + + if resource.Spec.ControllerAgents != nil { + ca := components.NewControllerAgent(cfgen, ytsaurus) + registry.add(ca) + } + + var q component + if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { + q = components.NewQueryTracker(cfgen, ytsaurus, yc, tndCount) + registry.add(q) + } + + if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 { + qa := components.NewQueueAgent(cfgen, ytsaurus, yc, tndCount) + registry.add(qa) + } + + if resource.Spec.YQLAgents != nil { + yqla := components.NewYQLAgent(cfgen, ytsaurus) + registry.add(yqla) + } + + if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil { + strawberry := components.NewStrawberryController(cfgen, ytsaurus) + registry.add(strawberry) + } + + if resource.Spec.MasterCaches != nil { + mc := components.NewMasterCache(cfgen, ytsaurus) + registry.add(mc) + } + + return registry +} diff --git a/controllers/helpers.go b/controllers/helpers.go index 0603c1f7..e6157385 100644 --- a/controllers/helpers.go +++ b/controllers/helpers.go @@ -1,11 +1,18 @@ package controllers import ( + "context" + "fmt" "net" "os" "strings" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" + "github.com/ytsaurus/yt-k8s-operator/pkg/consts" ) const ( @@ -30,3 +37,61 @@ func getClusterDomain(client client.Client) string { return clusterDomain } + +func logComponentStatuses( + ctx context.Context, + registry *componentRegistry, + statuses 
map[string]components.ComponentStatus, + componentsOrder [][]consts.ComponentType, + resource *ytv1.Ytsaurus, +) { + logger := log.FromContext(ctx) + + var readyComponents []string + var notReadyComponents []string + + for batchIndex, typesInBatch := range componentsOrder { + compsInBatch := registry.listByType(typesInBatch...) + for _, comp := range compsInBatch { + name := comp.GetName() + status := statuses[name] + + if status.SyncStatus == components.SyncStatusReady { + readyComponents = append(readyComponents, name) + } else { + notReadyComponents = append(notReadyComponents, name) + } + + logger.V(1).Info( + fmt.Sprintf( + "%d.%s %s: %s", + batchIndex, + statusToSymbol(status.SyncStatus), + name, + status.Message, + ), + ) + } + } + + // NB: This log is mentioned at https://ytsaurus.tech/docs/ru/admin-guide/install-ytsaurus + logger.Info("Ytsaurus sync status", + "notReadyComponents", notReadyComponents, + "readyComponents", readyComponents, + "updateState", resource.Status.UpdateStatus.State, + "clusterState", resource.Status.State) + +} + +func statusToSymbol(st components.SyncStatus) string { + switch st { + case components.SyncStatusReady: + return "[v]" + case components.SyncStatusBlocked: + return "[x]" + case components.SyncStatusUpdating: + return "[.]" + default: + return "[ ]" + } +} diff --git a/controllers/sync.go b/controllers/sync.go index 7717303b..367f0046 100644 --- a/controllers/sync.go +++ b/controllers/sync.go @@ -1,5 +1,7 @@ package controllers +// TODO: file will be deleted after this refactoring. No need to review changes. 
+ import ( "context" "time" @@ -243,7 +245,7 @@ func getComponentNames(components []components.Component) []string { return names } -func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) { +func (r *YtsaurusReconciler) SyncOld(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) { logger := log.FromContext(ctx) if !resource.Spec.IsManaged { diff --git a/controllers/ytsaurus_flow.go b/controllers/ytsaurus_flow.go new file mode 100644 index 00000000..086a2447 --- /dev/null +++ b/controllers/ytsaurus_flow.go @@ -0,0 +1,128 @@ +package controllers + +import ( + "context" + "errors" + "fmt" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" + "github.com/ytsaurus/yt-k8s-operator/pkg/consts" +) + +func getStatuses( + ctx context.Context, + registry *componentRegistry, +) (map[string]components.ComponentStatus, error) { + statuses := make(map[string]components.ComponentStatus) + for _, c := range registry.list() { + componentStatus, err := c.Status(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get component %s status: %w", c.GetName(), err) + } + statuses[c.GetName()] = componentStatus + } + + return statuses, nil +} + +// componentsOrder is an order in which components will be built. +// The main rules are: +// - if component A needs component B for building (running jobs, using yt client, etc), it +// should be placed in some of the sections after component B section. +var componentsOrder = [][]consts.ComponentType{ + { + // Discovery doesn't depend on anyone. + consts.DiscoveryType, + // Proxy are placed first since it is needed for ytsaurus client to work. + consts.HttpProxyType, + }, + { + consts.YtsaurusClientType, + }, + { + // Master uses ytsaurus client when being updated, so if both are down, + // ytsaurus client (and proxies) should be built first. 
+ consts.MasterType, + }, + { + consts.UIType, + consts.RpcProxyType, + consts.TcpProxyType, + consts.DataNodeType, + consts.MasterCacheType, + }, + { + consts.TabletNodeType, + consts.ExecNodeType, + }, + { + consts.SchedulerType, + consts.ControllerAgentType, + consts.QueryTrackerType, + consts.QueueAgentType, + consts.YqlAgentType, + }, + { + consts.StrawberryControllerType, + }, +} + +func syncComponents( + ctx context.Context, + registry *componentRegistry, + resource *ytv1.Ytsaurus, +) (components.ComponentStatus, error) { + statuses, err := getStatuses(ctx, registry) + if err != nil { + return components.ComponentStatus{}, err + } + logComponentStatuses(ctx, registry, statuses, componentsOrder, resource) + + var batchToSync []component + for _, typesInBatch := range componentsOrder { + compsInBatch := registry.listByType(typesInBatch...) + for _, comp := range compsInBatch { + status := statuses[comp.GetName()] + if status.SyncStatus != components.SyncStatusReady && batchToSync == nil { + batchToSync = compsInBatch + } + } + } + + if batchToSync == nil { + // YTsaurus is running and happy. + return components.ComponentStatus{SyncStatus: components.SyncStatusReady}, nil + } + + // Run sync for non-ready components in the batch. + batchNotReadyStatuses := make(map[string]components.ComponentStatus) + var errList []error + for _, comp := range batchToSync { + status := statuses[comp.GetName()] + if status.SyncStatus == components.SyncStatusReady { + continue + } + batchNotReadyStatuses[comp.GetName()] = status + if err = comp.Sync(ctx); err != nil { + errList = append(errList, fmt.Errorf("failed to sync %s: %w", comp.GetName(), err)) + } + } + + if len(errList) != 0 { + return components.ComponentStatus{}, errors.Join(errList...) + } + + // Choosing the most important status for the batch to report up. 
+ batchStatus := components.ComponentStatus{ + SyncStatus: components.SyncStatusUpdating, + Message: "", + } + for compName, st := range batchNotReadyStatuses { + if st.SyncStatus == components.SyncStatusBlocked { + batchStatus.SyncStatus = components.SyncStatusBlocked + } + batchStatus.Message += fmt.Sprintf("; %s=%s (%s)", compName, st.SyncStatus, st.Message) + } + return batchStatus, nil +} diff --git a/controllers/ytsaurus_sync.go b/controllers/ytsaurus_sync.go new file mode 100644 index 00000000..b31650b6 --- /dev/null +++ b/controllers/ytsaurus_sync.go @@ -0,0 +1,60 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/components" +) + +var ( + requeueNot = ctrl.Result{Requeue: false} + requeueASAP = ctrl.Result{Requeue: true} + requeueSoon = ctrl.Result{RequeueAfter: 1 * time.Second} + requeueLater = ctrl.Result{RequeueAfter: 1 * time.Minute} +) + +func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + if !resource.Spec.IsManaged { + logger.Info("Ytsaurus cluster is not managed by controller, do nothing") + return ctrl.Result{RequeueAfter: time.Minute}, nil + } + + ytsaurus := apiProxy.NewYtsaurus(resource, r.Client, r.Recorder, r.Scheme) + compRegistry := buildComponentRegistry(ytsaurus) + st, err := syncComponents(ctx, compRegistry, ytsaurus.GetResource()) + if err != nil { + return requeueASAP, fmt.Errorf("failed to sync components: %w", err) + } + + var requeue ctrl.Result + var clusterState ytv1.ClusterState + + switch st.SyncStatus { + case components.SyncStatusReady: + logger.Info("YTsaurus running and happy") + requeue = requeueNot + clusterState = ytv1.ClusterStateRunning + case components.SyncStatusBlocked: + 
logger.Info("Components update is blocked. Human is needed.", "message", st.Message) + requeue = requeueLater + clusterState = ytv1.ClusterStateCancelUpdate + default: + requeue = requeueSoon + clusterState = ytv1.ClusterStateUpdating + } + + err = ytsaurus.SaveClusterState(ctx, clusterState) + if err != nil { + return requeueASAP, fmt.Errorf("failed to save cluster state to %s: %w", clusterState, err) + } + return requeue, nil +} diff --git a/pkg/components/component.go b/pkg/components/component.go index b5cb751d..c4ed39a3 100644 --- a/pkg/components/component.go +++ b/pkg/components/component.go @@ -6,6 +6,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/yt-k8s-operator/pkg/labeller" ) @@ -13,12 +14,14 @@ import ( type SyncStatus string const ( - SyncStatusBlocked SyncStatus = "Blocked" SyncStatusNeedFullUpdate SyncStatus = "NeedFullUpdate" SyncStatusNeedLocalUpdate SyncStatus = "NeedLocalUpdate" SyncStatusPending SyncStatus = "Pending" - SyncStatusReady SyncStatus = "Ready" - SyncStatusUpdating SyncStatus = "Updating" + + SyncStatusNeedSync SyncStatus = "NeedSync" + SyncStatusUpdating SyncStatus = "Updating" + SyncStatusReady SyncStatus = "Ready" + SyncStatusBlocked SyncStatus = "Blocked" ) func IsRunningStatus(status SyncStatus) bool { @@ -42,10 +45,23 @@ func SimpleStatus(status SyncStatus) ComponentStatus { return ComponentStatus{status, string(status)} } +func NeedSyncStatus(message string) ComponentStatus { + return ComponentStatus{SyncStatus: SyncStatusNeedSync, Message: message} +} + +func UpdatingStatus(message string) ComponentStatus { + return ComponentStatus{SyncStatus: SyncStatusUpdating, Message: message} +} + +func ReadyStatus() ComponentStatus { + return ComponentStatus{SyncStatus: SyncStatusReady} +} + type Component interface { Fetch(ctx context.Context) error Sync(ctx context.Context) error - Status(ctx context.Context) 
ComponentStatus + Status(ctx context.Context) (ComponentStatus, error) + StatusOld(ctx context.Context) ComponentStatus GetName() string SetReadyCondition(status ComponentStatus) @@ -53,6 +69,32 @@ type Component interface { IsUpdatable() bool } +type conditionManagerIface interface { + SetTrue(context.Context, ConditionName) error + SetTrueMsg(context.Context, ConditionName, string) error + SetFalse(context.Context, ConditionName) error + SetFalseMsg(context.Context, ConditionName, string) error + Set(context.Context, ConditionName, bool) error + SetMsg(context.Context, ConditionName, bool, string) error + SetCond(context.Context, Condition) error + SetCondMany(context.Context, ...Condition) error + SetCondMsg(context.Context, Condition, string) error + IsTrue(ConditionName) bool + IsFalse(ConditionName) bool + Is(condition Condition) bool + IsSatisfied(condition Condition) bool + IsNotSatisfied(condition Condition) bool + All(conds ...Condition) bool + Any(conds ...Condition) bool +} + +type stateManagerInterface interface { + SetTabletCellBundles(context.Context, []ytv1.TabletCellBundleInfo) error + GetTabletCellBundles() []ytv1.TabletCellBundleInfo + SetMasterMonitoringPaths(context.Context, []string) error + GetMasterMonitoringPaths() []string +} + // Following structs are used as a base for implementing YTsaurus components objects. // baseComponent is a base struct intendend for use in the simplest components and remote components // (the ones that don't have access to the ytsaurus resource). @@ -73,6 +115,11 @@ func (c *baseComponent) GetName() string { type localComponent struct { baseComponent ytsaurus *apiproxy.Ytsaurus + + // currently we have it in the component, but in the future we may + // want to receive it from the outside of the component. 
+ condManager conditionManagerIface + stateManager stateManagerInterface } // localServerComponent is a base structs for components which have access to ytsaurus resource, @@ -89,6 +136,8 @@ func newLocalComponent( return localComponent{ baseComponent: baseComponent{labeller: labeller}, ytsaurus: ytsaurus, + condManager: newConditionManagerFromYtsaurus(ytsaurus), + stateManager: newStateManagerFromYtsaurus(ytsaurus), } } @@ -111,13 +160,8 @@ func newLocalServerComponent( server server, ) localServerComponent { return localServerComponent{ - localComponent: localComponent{ - baseComponent: baseComponent{ - labeller: labeller, - }, - ytsaurus: ytsaurus, - }, - server: server, + localComponent: newLocalComponent(labeller, ytsaurus), + server: server, } } @@ -129,3 +173,46 @@ func LocalServerNeedSync(srv server, ytsaurus *apiproxy.Ytsaurus) bool { return (srv.configNeedsReload() && ytsaurus.IsUpdating()) || srv.needBuild() } + +// tmp + +func (c *localServerComponent) runUntilNoErr( + ctx context.Context, + run func(ctx context.Context) error, + onSuccess Condition, +) error { + if err := run(ctx); err != nil { + return fmt.Errorf("failed to run %s for cond %s: %w", c.GetName(), onSuccess, err) + } + if err := c.condManager.SetCondMsg(ctx, onSuccess, "run once finished successfully"); err != nil { + return fmt.Errorf("failed to set condition %s: %w", onSuccess, err) + } + return nil +} + +func (c *localServerComponent) runUntilOk( + ctx context.Context, + run func(ctx context.Context) (bool, error), + onSuccess Condition, +) error { + return c.runUntilOkWithCleanup(ctx, run, nil, onSuccess) +} + +func (c *localServerComponent) runUntilOkWithCleanup(ctx context.Context, run func(ctx context.Context) (bool, error), cleanup func(ctx context.Context) error, onSuccess Condition) error { + done, err := run(ctx) + if err != nil { + return fmt.Errorf("failed to run %s for cond %s: %w", c.GetName(), onSuccess, err) + } + if !done { + return nil + } + if cleanup != nil { + if 
err = cleanup(ctx); err != nil { + return fmt.Errorf("cleanup failed: %w", err) + } + } + if err = c.condManager.SetCond(ctx, onSuccess); err != nil { + return fmt.Errorf("failed to set conditions %s: %w", onSuccess, err) + } + return nil +} diff --git a/pkg/components/conditions.go b/pkg/components/conditions.go new file mode 100644 index 00000000..3f0ab4aa --- /dev/null +++ b/pkg/components/conditions.go @@ -0,0 +1,249 @@ +package components + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" +) + +type ConditionName string + +type Condition struct { + Name ConditionName + Val bool +} + +func (c Condition) String() string { + if c.Val { + return string(c.Name) + } + return fmt.Sprintf("!%s", c.Name) +} + +func not(condDep Condition) Condition { + return Condition{ + Name: condDep.Name, + Val: !condDep.Val, + } +} +func isTrue(cond ConditionName) Condition { + return Condition{Name: cond, Val: true} +} + +// buildFinished means that component was fully built initially. 
+func buildStarted(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sBuildStarted", compName))) +} +func buildFinished(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sBuildFinished", compName))) +} +func initializationStarted(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sInitializationStarted", compName))) +} +func initializationFinished(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sInitializationFinished", compName))) +} +func updateRequired(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sUpdateRequired", compName))) +} +func podsRemoved(compName string) Condition { + return isTrue(ConditionName(fmt.Sprintf("%sPodsRemoved", compName))) +} + +type baseStateManager struct { + client client.Client + ytsaurus *ytv1.Ytsaurus +} + +type conditionManager struct { + baseStateManager +} +type stateManager struct { + baseStateManager +} + +func newConditionManagerFromYtsaurus(ytsaurus *apiproxy.Ytsaurus) *conditionManager { + return newConditionManager( + ytsaurus.APIProxy().Client(), + ytsaurus.GetResource(), + ) +} + +func newConditionManager(client client.Client, ytsaurus *ytv1.Ytsaurus) *conditionManager { + return &conditionManager{ + baseStateManager{ + client: client, + ytsaurus: ytsaurus, + }, + } +} + +func newStateManagerFromYtsaurus(ytsaurus *apiproxy.Ytsaurus) *stateManager { + return newStateManager( + ytsaurus.APIProxy().Client(), + ytsaurus.GetResource(), + ) +} + +func newStateManager(client client.Client, ytsaurus *ytv1.Ytsaurus) *stateManager { + return &stateManager{ + baseStateManager{ + client: client, + ytsaurus: ytsaurus, + }, + } +} + +func (m *baseStateManager) updateStatusRetryOnConflict(ctx context.Context, change func(ytsaurusResource *ytv1.Ytsaurus)) error { + tryUpdate := func(ytsaurus *ytv1.Ytsaurus) error { + change(ytsaurus) + // You have to return err itself here (not wrapped inside another error) + // so that 
RetryOnConflict can identify it correctly. + err := m.client.Status().Update(ctx, ytsaurus) + if err == nil { + m.ytsaurus = ytsaurus + } + return err + } + + err := tryUpdate(m.ytsaurus) + if err == nil || !errors.IsConflict(err) { + return err + } + + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Fetch the resource here; you need to refetch it on every try, since + // if you got a conflict on the last update attempt then you need to get + // the current version before making your own changes. + ytsaurus := ytv1.Ytsaurus{} + name := types.NamespacedName{ + Namespace: m.ytsaurus.Namespace, + Name: m.ytsaurus.Name, + } + if err = m.client.Get(ctx, name, &ytsaurus); err != nil { + return err + } + + return tryUpdate(&ytsaurus) + }) +} + +func (cm *conditionManager) SetTrue(ctx context.Context, condName ConditionName) error { + return cm.SetTrueMsg(ctx, condName, "") +} +func (cm *conditionManager) SetTrueMsg(ctx context.Context, condName ConditionName, msg string) error { + return cm.SetMsg(ctx, condName, true, msg) +} +func (cm *conditionManager) SetFalse(ctx context.Context, condName ConditionName) error { + return cm.SetFalseMsg(ctx, condName, "") +} +func (cm *conditionManager) SetFalseMsg(ctx context.Context, condName ConditionName, msg string) error { + return cm.SetMsg(ctx, condName, false, msg) +} +func (cm *conditionManager) Set(ctx context.Context, condName ConditionName, val bool) error { + return cm.SetMsg(ctx, condName, val, "") +} +func (cm *conditionManager) SetCond(ctx context.Context, cond Condition) error { + return cm.SetMsg(ctx, cond.Name, cond.Val, "") +} +func (cm *conditionManager) SetCondMany(ctx context.Context, conds ...Condition) error { + var metaconds []metav1.Condition + for _, cond := range conds { + metaconds = append(metaconds, cm.buildCond(cond.Name, cond.Val, "")) + } + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + for _, metacond := range metaconds { + 
meta.SetStatusCondition(&ytsaurus.Status.Conditions, metacond) + } + }) +} +func (cm *conditionManager) SetCondMsg(ctx context.Context, cond Condition, msg string) error { + return cm.SetMsg(ctx, cond.Name, cond.Val, msg) +} +func (cm *conditionManager) buildCond(condName ConditionName, val bool, msg string) metav1.Condition { + return metav1.Condition{ + Type: string(condName), + Status: map[bool]metav1.ConditionStatus{ + true: metav1.ConditionTrue, + false: metav1.ConditionFalse, + }[val], + // DO we need better reason? + Reason: string(condName), + Message: msg, + } +} +func (cm *conditionManager) SetMsg(ctx context.Context, condName ConditionName, val bool, msg string) error { + metacond := cm.buildCond(condName, val, msg) + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + meta.SetStatusCondition(&ytsaurus.Status.Conditions, metacond) + }) +} +func (cm *conditionManager) IsTrue(condName ConditionName) bool { + return meta.IsStatusConditionTrue(cm.ytsaurus.Status.Conditions, string(condName)) +} +func (cm *conditionManager) IsFalse(condName ConditionName) bool { + return !cm.IsTrue(condName) +} +func (cm *conditionManager) Is(cond Condition) bool { + return cm.IsSatisfied(cond) +} +func (cm *conditionManager) All(conds ...Condition) bool { + for _, cond := range conds { + if cm.IsNotSatisfied(cond) { + return false + } + } + return true +} +func (cm *conditionManager) Any(conds ...Condition) bool { + for _, cond := range conds { + if cm.IsSatisfied(cond) { + return true + } + } + return false +} +func (cm *conditionManager) IsSatisfied(cond Condition) bool { + return cm.IsTrue(cond.Name) == cond.Val +} +func (cm *conditionManager) IsNotSatisfied(cond Condition) bool { + return !cm.IsSatisfied(cond) +} +func (cm *conditionManager) Get(condName ConditionName) bool { + if cm.IsTrue(condName) { + return true + } else { + return false + } +} + +func (cm *stateManager) SetClusterState(ctx context.Context, clusterState ytv1.ClusterState) 
error { + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + ytsaurus.Status.State = clusterState + }) +} +func (cm *stateManager) SetTabletCellBundles(ctx context.Context, cells []ytv1.TabletCellBundleInfo) error { + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + ytsaurus.Status.UpdateStatus.TabletCellBundles = cells + }) +} +func (cm *stateManager) SetMasterMonitoringPaths(ctx context.Context, paths []string) error { + return cm.updateStatusRetryOnConflict(ctx, func(ytsaurus *ytv1.Ytsaurus) { + ytsaurus.Status.UpdateStatus.MasterMonitoringPaths = paths + }) +} +func (cm *stateManager) GetTabletCellBundles() []ytv1.TabletCellBundleInfo { + return cm.ytsaurus.Status.UpdateStatus.TabletCellBundles +} +func (cm *stateManager) GetMasterMonitoringPaths() []string { + return cm.ytsaurus.Status.UpdateStatus.MasterMonitoringPaths +} diff --git a/pkg/components/config_helper.go b/pkg/components/config_helper.go index e2adef03..c262c4d4 100644 --- a/pkg/components/config_helper.go +++ b/pkg/components/config_helper.go @@ -7,12 +7,13 @@ import ( "reflect" "github.com/google/go-cmp/cmp" + "go.ytsaurus.tech/yt/go/yson" + corev1 "k8s.io/api/core/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/yt-k8s-operator/pkg/labeller" "github.com/ytsaurus/yt-k8s-operator/pkg/resources" "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" - "go.ytsaurus.tech/yt/go/yson" - corev1 "k8s.io/api/core/v1" ) const ( @@ -192,6 +193,7 @@ func (h *ConfigHelper) Build() *corev1.ConfigMap { for fileName := range h.generators { data, err := h.getConfig(fileName) if err != nil { + // TODO: fix suppression of the error, it will fail with NPE in places of call return nil } diff --git a/pkg/components/controller_agent.go b/pkg/components/controller_agent.go index a012a9d5..c8708121 100644 --- a/pkg/components/controller_agent.go +++ b/pkg/components/controller_agent.go @@ -15,11 +15,15 @@ import ( type ControllerAgent struct 
{ localServerComponent - cfgen *ytconfig.Generator - master Component + cfgen *ytconfig.Generator + //master Component } -func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *ControllerAgent { +func NewControllerAgent( + cfgen *ytconfig.Generator, + ytsaurus *apiproxy.Ytsaurus, + // master Component, +) *ControllerAgent { resource := ytsaurus.GetResource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, @@ -46,7 +50,7 @@ func NewControllerAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, return &ControllerAgent{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: master, + //master: master, } } @@ -73,9 +77,9 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu } } - if !IsRunningStatus(ca.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, ca.master.GetName()), err - } + //if !IsRunningStatus(ca.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, ca.master.GetName()), err + //} if ca.NeedSync() { if !dry { @@ -91,7 +95,11 @@ func (ca *ControllerAgent) doSync(ctx context.Context, dry bool) (ComponentStatu return SimpleStatus(SyncStatusReady), err } -func (ca *ControllerAgent) Status(ctx context.Context) ComponentStatus { +func (ca *ControllerAgent) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (ca *ControllerAgent) StatusOld(ctx context.Context) ComponentStatus { status, err := ca.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/data_node.go b/pkg/components/data_node.go index 17ad3224..c5586c72 100644 --- a/pkg/components/data_node.go +++ b/pkg/components/data_node.go @@ -15,14 +15,14 @@ import ( type DataNode struct { localServerComponent - cfgen *ytconfig.NodeGenerator - master Component + cfgen *ytconfig.NodeGenerator + //master Component } func NewDataNode( cfgen *ytconfig.NodeGenerator, ytsaurus 
*apiproxy.Ytsaurus, - master Component, + //master Component, spec ytv1.DataNodesSpec, ) *DataNode { resource := ytsaurus.GetResource() @@ -53,7 +53,7 @@ func NewDataNode( return &DataNode{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: master, + //master: master, } } @@ -79,10 +79,10 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error return *status, err } } - - if !IsRunningStatus(n.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err - } + // + //if !IsRunningStatus(n.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err + //} if n.NeedSync() { if !dry { @@ -98,7 +98,11 @@ func (n *DataNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error return SimpleStatus(SyncStatusReady), err } -func (n *DataNode) Status(ctx context.Context) ComponentStatus { +func (n *DataNode) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (n *DataNode) StatusOld(ctx context.Context) ComponentStatus { status, err := n.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/discovery.go b/pkg/components/discovery.go index f2e0b2c6..3f0f90ed 100644 --- a/pkg/components/discovery.go +++ b/pkg/components/discovery.go @@ -2,6 +2,7 @@ package components import ( "context" + "fmt" "go.ytsaurus.tech/library/go/ptr" @@ -87,16 +88,96 @@ func (d *Discovery) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } -func (d *Discovery) Status(ctx context.Context) ComponentStatus { - status, err := d.doSync(ctx, true) +func (d *Discovery) Status(ctx context.Context) (ComponentStatus, error) { + if err := d.Fetch(ctx); err != nil { + return ComponentStatus{}, fmt.Errorf("failed to fetch component %s: %w", d.GetName(), err) + } + + if d.condManager.Is(not(buildFinished(d.GetName()))) { + return 
NeedSyncStatus("initial build not yet have finished"), nil + } + + needUpdate, err := d.server.hasDiff(ctx) if err != nil { - panic(err) + return ComponentStatus{}, err } - return status + if needUpdate || d.condManager.Is(updateRequired(d.GetName())) { + return NeedSyncStatus("component needs update"), nil + } + + return ReadyStatus(), nil +} + +func (d *Discovery) StatusOld(ctx context.Context) ComponentStatus { + st, err := d.Status(ctx) + if err != nil { + panic(err) + } + return st } func (d *Discovery) Sync(ctx context.Context) error { - _, err := d.doSync(ctx, false) - return err + srv := d.server.(*serverImpl) + + // Initial component creation. + builtStartedCond := buildStarted(d.GetName()) + if d.condManager.Is(not(builtStartedCond)) { + return d.runUntilNoErr(ctx, d.server.Sync, builtStartedCond) + } + + builtCond := buildFinished(d.GetName()) + if d.condManager.Is(not(builtCond)) { + return d.runUntilOk(ctx, func(ctx context.Context) (bool, error) { + diff, err := d.server.hasDiff(ctx) + return !diff, err + }, builtCond) + } + + // Update in case of a diff. 
+ needUpdate, err := srv.hasDiff(ctx) + if err != nil { + return err + } + updateRequiredCond := updateRequired(d.GetName()) + if needUpdate { + if err = d.condManager.SetCond(ctx, updateRequiredCond); err != nil { + return err + } + } + if d.condManager.Is(updateRequiredCond) { + return d.runUntilOkWithCleanup(ctx, d.handleUpdate, d.handlePostUpdate, not(updateRequiredCond)) + } + + return nil +} + +func (d *Discovery) handleUpdate(ctx context.Context) (bool, error) { + podsWereRemoved := podsRemoved(d.GetName()) + if d.condManager.Is(not(podsWereRemoved)) { + return false, d.runUntilNoErr(ctx, d.server.removePods, podsWereRemoved) + } + return true, nil +} + +func (d *Discovery) handlePostUpdate(ctx context.Context) error { + for _, cond := range d.getConditionsSetByUpdate() { + if err := d.condManager.SetCond(ctx, not(cond)); err != nil { + return err + } + } + return nil +} + +func (d *Discovery) getConditionsSetByUpdate() []Condition { + var result []Condition + conds := []Condition{ + podsRemoved(d.GetName()), + } + for _, cond := range conds { + if d.condManager.IsSatisfied(cond) { + result = append(result, cond) + } + } + return result } diff --git a/pkg/components/discovery_local_test.go b/pkg/components/discovery_local_test.go new file mode 100644 index 00000000..517bf5d7 --- /dev/null +++ b/pkg/components/discovery_local_test.go @@ -0,0 +1,89 @@ +package components + +import ( + "context" + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/record" + ptr "k8s.io/utils/pointer" + ctrlrt "sigs.k8s.io/controller-runtime" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/testutil" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +const ( + ytsaurusName = "testsaurus" +) + 
+func TestDiscoveryFlow(t *testing.T) { + ctx := context.Background() + namespace := "discovery" + + h := testutil.NewTestHelper(t, namespace, filepath.Join("..", "..", "config", "crd", "bases")) + h.Start(func(mgr ctrlrt.Manager) error { return nil }) + defer h.Stop() + + ytsaurusResource := testutil.BuildMinimalYtsaurus(namespace, ytsaurusName) + // Deploying the ytsaurus spec is required so that valid owner references can be set for child resources. + testutil.DeployObject(h, &ytsaurusResource) + + scheme := runtime.NewScheme() + utilruntime.Must(ytv1.AddToScheme(scheme)) + fakeRecorder := record.NewFakeRecorder(100) + + ytsaurus := apiProxy.NewYtsaurus(&ytsaurusResource, h.GetK8sClient(), fakeRecorder, scheme) + domain := "testdomain" + cfgen := ytconfig.NewGenerator(&ytsaurusResource, domain) + + // initial creation + discovery := NewDiscovery(cfgen, ytsaurus) + testutil.Eventually(h, "ds became ready", func() bool { + st, err := discovery.Status(ctx) + require.NoError(t, err) + if st.SyncStatus == SyncStatusReady { + return true + } + require.NoError(t, discovery.Sync(ctx)) + return false + }) + + cmData := testutil.FetchConfigMapData(h, "yt-discovery-config", "ytserver-discovery.yson") + require.Contains(t, cmData, "ms-0.masters."+namespace+".svc."+domain+":9010") + + testutil.FetchEventually( + h, + "ds", + &appsv1.StatefulSet{}, + ) + + // update + // + update #2 to be sure that the first update doesn't end in a wrong state + for i := 1; i <= 2; i++ { + t.Logf("Update ds #%d", i) + newImage := ptr.String(fmt.Sprintf("new-image-%d", i)) + ytsaurusResource.Spec.Discovery.Image = newImage + + discovery = NewDiscovery(cfgen, ytsaurus) + testutil.Eventually(h, "ds became ready", func() bool { + st, err := discovery.Status(ctx) + require.NoError(t, err) + if st.SyncStatus == SyncStatusReady { + return true + } + require.NoError(t, discovery.Sync(ctx)) + return false + }) + sts := &appsv1.StatefulSet{} + testutil.GetObject(h, "ds", sts) + require.Equal(t, *newImage, 
sts.Spec.Template.Spec.Containers[0].Image) + } +} diff --git a/pkg/components/exec_node.go b/pkg/components/exec_node.go index ba26de5d..ba5c0c12 100644 --- a/pkg/components/exec_node.go +++ b/pkg/components/exec_node.go @@ -15,13 +15,13 @@ import ( type ExecNode struct { baseExecNode localComponent - master Component + //master Component } func NewExecNode( cfgen *ytconfig.NodeGenerator, ytsaurus *apiproxy.Ytsaurus, - master Component, + //master Component, spec ytv1.ExecNodesSpec, ) *ExecNode { resource := ytsaurus.GetResource() @@ -57,7 +57,7 @@ func NewExecNode( sidecars: spec.Sidecars, privileged: spec.Privileged, }, - master: master, + //master: master, } } @@ -80,9 +80,9 @@ func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error } } - if !IsRunningStatus(n.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err - } + //if !IsRunningStatus(n.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, n.master.GetName()), err + //} if LocalServerNeedSync(n.server, n.ytsaurus) { return n.doSyncBase(ctx, dry) @@ -95,7 +95,11 @@ func (n *ExecNode) doSync(ctx context.Context, dry bool) (ComponentStatus, error return SimpleStatus(SyncStatusReady), err } -func (n *ExecNode) Status(ctx context.Context) ComponentStatus { +func (n *ExecNode) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (n *ExecNode) StatusOld(ctx context.Context) ComponentStatus { status, err := n.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/httpproxy.go b/pkg/components/httpproxy.go index e9831f41..509f2aa2 100644 --- a/pkg/components/httpproxy.go +++ b/pkg/components/httpproxy.go @@ -19,8 +19,8 @@ type HttpProxy struct { localServerComponent cfgen *ytconfig.Generator - serviceType corev1.ServiceType - master Component + serviceType corev1.ServiceType + //master Component balancingService *resources.HTTPService role string @@ -30,7 
+30,7 @@ type HttpProxy struct { func NewHTTPProxy( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, - masterReconciler Component, + //masterReconciler Component, spec ytv1.HTTPProxiesSpec) *HttpProxy { resource := ytsaurus.GetResource() @@ -78,11 +78,11 @@ func NewHTTPProxy( return &HttpProxy{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: masterReconciler, - serviceType: spec.ServiceType, - role: spec.Role, - httpsSecret: httpsSecret, - balancingService: balancingService, + //master: masterReconciler, + serviceType: spec.ServiceType, + role: spec.Role, + httpsSecret: httpsSecret, + balancingService: balancingService, } } @@ -112,9 +112,9 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err } } - if !IsRunningStatus(hp.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, hp.master.GetName()), err - } + //if !IsRunningStatus(hp.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, hp.master.GetName()), err + //} if hp.NeedSync() { if !dry { @@ -144,7 +144,11 @@ func (hp *HttpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, err return SimpleStatus(SyncStatusReady), err } -func (hp *HttpProxy) Status(ctx context.Context) ComponentStatus { +func (hp *HttpProxy) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (hp *HttpProxy) StatusOld(ctx context.Context) ComponentStatus { status, err := hp.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/master.go b/pkg/components/master.go index 775a956f..3eca0c44 100644 --- a/pkg/components/master.go +++ b/pkg/components/master.go @@ -26,16 +26,37 @@ const ( defaultHostAddressLabel = "kubernetes.io/hostname" ) +var ( + masterUpdatePossibleCond = isTrue("MasterUpdatePossible") + masterSafeModeEnabledCond = isTrue("MasterSafeModeEnabled") + masterSnapshotsBuildStartedCond = isTrue("MasterSnapshotsBuildStarted") + 
masterSnapshotsBuildFinishedCond = isTrue("MasterSnapshotsBuildFinished") + masterExitReadOnlyPrepared = isTrue("MasterExitReadOnlyPrepared") + masterExitReadOnlyFinished = isTrue("MasterExitReadOnlyFinished") + masterSafeModeDisabledCond = isTrue("MasterSafeModeDisabled") +) + +type ytsaurusClientForMaster interface { + HandlePossibilityCheck(context.Context) (bool, string, error) + EnableSafeMode(context.Context) error + DisableSafeMode(context.Context) error + GetMasterMonitoringPaths(context.Context) ([]string, error) + StartBuildMasterSnapshots(context.Context, []string) error + AreMasterSnapshotsBuilt(context.Context, []string) (bool, error) +} + type Master struct { localServerComponent cfgen *ytconfig.Generator + ytClient ytsaurusClientForMaster + initJob *InitJob exitReadOnlyJob *InitJob adminCredentials corev1.Secret } -func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master { +func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, ytClient ytsaurusClientForMaster) *Master { resource := ytsaurus.GetResource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, @@ -84,6 +105,7 @@ func NewMaster(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *Master { return &Master{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, + ytClient: ytClient, initJob: initJob, exitReadOnlyJob: exitReadOnlyJob, } @@ -318,7 +340,28 @@ func (m *Master) doSync(ctx context.Context, dry bool) (ComponentStatus, error) return m.initJob.Sync(ctx, dry) } -func (m *Master) Status(ctx context.Context) ComponentStatus { +func (m *Master) Status(ctx context.Context) (ComponentStatus, error) { + if err := m.Fetch(ctx); err != nil { + return ComponentStatus{}, fmt.Errorf("failed to fetch component %s: %w", m.GetName(), err) + } + + if m.condManager.Is(not(buildFinished(m.GetName()))) { + return NeedSyncStatus("initial build not yet have finished"), nil + } + + needUpdate, err := m.server.hasDiff(ctx) + if err != 
nil { + return ComponentStatus{}, err + } + + if needUpdate || m.condManager.Is(updateRequired(m.GetName())) { + return NeedSyncStatus("component needs update"), nil + } + + return ReadyStatus(), nil +} + +func (m *Master) StatusOld(ctx context.Context) ComponentStatus { status, err := m.doSync(ctx, true) if err != nil { panic(err) @@ -328,8 +371,142 @@ func (m *Master) Status(ctx context.Context) ComponentStatus { } func (m *Master) Sync(ctx context.Context) error { - _, err := m.doSync(ctx, false) - return err + // 1. Initial component creation. + builtStartedCond := buildStarted(m.GetName()) + if m.condManager.Is(not(builtStartedCond)) { + return m.runUntilNoErr(ctx, m.server.Sync, builtStartedCond) + } + + builtCond := buildFinished(m.GetName()) + if m.condManager.Is(not(builtCond)) { + return m.runUntilOk(ctx, func(ctx context.Context) (bool, error) { + diff, err := m.server.hasDiff(ctx) + return !diff, err + }, builtCond) + } + + // 2. Initialization once in a lifetime of the main component + initCond := initializationFinished(m.GetName()) + if m.condManager.Is(not(initCond)) { + return m.runUntilOk(ctx, m.handleInitialization, initCond) + } + + // 3. Update in case of a diff until full component update is completed. 
+ needUpdate, err := m.server.hasDiff(ctx) + if err != nil { + return err + } + updateRequiredCond := updateRequired(m.GetName()) + if needUpdate { + if err = m.condManager.SetCond(ctx, updateRequiredCond); err != nil { + return err + } + } + if m.condManager.Is(updateRequiredCond) { + if m.condManager.Is(not(masterUpdatePossibleCond)) { + return m.runUntilOk(ctx, func(ctx context.Context) (bool, error) { + // TODO: put message in the condition reason + ok, _, err := m.ytClient.HandlePossibilityCheck(ctx) + if err != nil { + return false, err + } + return ok, nil + }, masterUpdatePossibleCond) + } + return m.runUntilOkWithCleanup( + ctx, + m.handleUpdate, + m.cleanupAfterUpdate, + not(updateRequiredCond), + ) + } + + return nil +} + +func (m *Master) handleInitialization(ctx context.Context) (bool, error) { + m.initJob.SetInitScript(m.createInitScript()) + st, err := m.initJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err +} + +func (m *Master) handleUpdate(ctx context.Context) (bool, error) { + if m.condManager.Is(not(masterSafeModeEnabledCond)) { + return false, m.runUntilNoErr(ctx, func(ctx context.Context) error { + return m.ytClient.EnableSafeMode(ctx) + }, masterSafeModeEnabledCond) + } + if m.condManager.Is(not(masterSnapshotsBuildStartedCond)) { + return false, m.runUntilNoErr(ctx, func(ctx context.Context) error { + monitoringPaths, err := m.ytClient.GetMasterMonitoringPaths(ctx) + if err != nil { + return err + } + if err = m.storeMasterMonitoringPaths(ctx, monitoringPaths); err != nil { + return err + } + + return m.ytClient.StartBuildMasterSnapshots(ctx, monitoringPaths) + }, masterSnapshotsBuildStartedCond) + } + if m.condManager.Is(not(masterSnapshotsBuildFinishedCond)) { + return false, m.runUntilOk(ctx, func(ctx context.Context) (bool, error) { + paths := m.getStoredMasterMonitoringPaths() + return m.ytClient.AreMasterSnapshotsBuilt(ctx, paths) + }, masterSnapshotsBuildFinishedCond) + } + podsWereRemoved := podsRemoved(m.GetName()) 
+ if m.condManager.Is(not(podsWereRemoved)) { + return false, m.runUntilNoErr(ctx, m.server.removePods, podsWereRemoved) + } + if m.condManager.Is(not(masterExitReadOnlyPrepared)) { + return false, m.runUntilNoErr(ctx, func(ctx context.Context) error { + return m.exitReadOnlyJob.prepareRestart(ctx, false) + }, masterExitReadOnlyPrepared) + } + if m.condManager.Is(not(masterExitReadOnlyFinished)) { + return false, m.runUntilOk(ctx, func(ctx context.Context) (done bool, err error) { + m.exitReadOnlyJob.SetInitScript(m.createInitScript()) + st, err := m.exitReadOnlyJob.Sync(ctx, false) + return st.SyncStatus == SyncStatusReady, err + }, masterExitReadOnlyFinished) + } + if m.condManager.Is(not(masterSafeModeDisabledCond)) { + return false, m.runUntilNoErr(ctx, func(ctx context.Context) error { + return m.ytClient.DisableSafeMode(ctx) + }, masterSafeModeDisabledCond) + } + return true, nil +} + +func (m *Master) cleanupAfterUpdate(ctx context.Context) error { + for _, cond := range m.getConditionsSetByUpdate() { + if err := m.condManager.SetCond(ctx, not(cond)); err != nil { + return err + } + } + return nil +} + +func (m *Master) getConditionsSetByUpdate() []Condition { + var result []Condition + conds := []Condition{ + masterUpdatePossibleCond, + masterSafeModeEnabledCond, + masterSnapshotsBuildStartedCond, + masterSnapshotsBuildFinishedCond, + podsRemoved(m.GetName()), + masterExitReadOnlyPrepared, + masterExitReadOnlyFinished, + masterSafeModeDisabledCond, + updateRequired(m.GetName()), + } + for _, cond := range conds { + if m.condManager.IsSatisfied(cond) { + result = append(result, cond) + } + } + return result } func (m *Master) doServerSync(ctx context.Context) error { @@ -391,3 +568,11 @@ func (m *Master) setMasterReadOnlyExitPrepared(ctx context.Context, status metav Message: "Masters are ready to exit read-only state", }) } + +func (m *Master) storeMasterMonitoringPaths(ctx context.Context, paths []string) error { + return 
m.stateManager.SetMasterMonitoringPaths(ctx, paths) +} + +func (m *Master) getStoredMasterMonitoringPaths() []string { + return m.stateManager.GetMasterMonitoringPaths() +} diff --git a/pkg/components/master_caches.go b/pkg/components/master_caches.go index 5865fb21..3319fc9a 100644 --- a/pkg/components/master_caches.go +++ b/pkg/components/master_caches.go @@ -86,7 +86,11 @@ func (mc *MasterCache) doSync(ctx context.Context, dry bool) (ComponentStatus, e return SimpleStatus(SyncStatusReady), err } -func (mc *MasterCache) Status(ctx context.Context) ComponentStatus { +func (mc *MasterCache) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (mc *MasterCache) StatusOld(ctx context.Context) ComponentStatus { status, err := mc.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/master_local_test.go b/pkg/components/master_local_test.go new file mode 100644 index 00000000..caa739ca --- /dev/null +++ b/pkg/components/master_local_test.go @@ -0,0 +1,106 @@ +package components + +import ( + "context" + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/record" + ptr "k8s.io/utils/pointer" + ctrlrt "sigs.k8s.io/controller-runtime" + + ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" + "github.com/ytsaurus/yt-k8s-operator/pkg/testutil" + "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" +) + +type fakeYtsaurusForMaster struct { +} + +func (y *fakeYtsaurusForMaster) HandlePossibilityCheck(context.Context) (bool, string, error) { + return true, "", nil +} +func (y *fakeYtsaurusForMaster) EnableSafeMode(context.Context) error { + return nil +} +func (y *fakeYtsaurusForMaster) DisableSafeMode(context.Context) error { + return nil +} +func (y *fakeYtsaurusForMaster) 
GetMasterMonitoringPaths(context.Context) ([]string, error) { + return []string{"path1", "path2"}, nil +} +func (y *fakeYtsaurusForMaster) StartBuildMasterSnapshots(context.Context, []string) error { + return nil +} +func (y *fakeYtsaurusForMaster) AreMasterSnapshotsBuilt(context.Context, []string) (bool, error) { + return true, nil +} + +func TestMasterFlow(t *testing.T) { + ctx := context.Background() + namespace := "master" + domain := "testdomain" + + h := testutil.NewTestHelper(t, namespace, filepath.Join("..", "..", "config", "crd", "bases")) + h.Start(func(mgr ctrlrt.Manager) error { return nil }) + defer h.Stop() + + ytsaurusResource := testutil.BuildMinimalYtsaurus(namespace, ytsaurusName) + // Deploy of ytsaurus spec is required, so it could set valid owner references for child resources. + testutil.DeployObject(h, &ytsaurusResource) + + scheme := runtime.NewScheme() + utilruntime.Must(ytv1.AddToScheme(scheme)) + fakeRecorder := record.NewFakeRecorder(100) + + ytsaurus := apiProxy.NewYtsaurus(&ytsaurusResource, h.GetK8sClient(), fakeRecorder, scheme) + cfgen := ytconfig.NewGenerator(&ytsaurusResource, domain) + + // initial creation + master := NewMaster(cfgen, ytsaurus, &fakeYtsaurusForMaster{}) + testutil.Eventually(h, "ms became ready", func() bool { + st, err := master.Status(ctx) + require.NoError(t, err) + if st.SyncStatus == SyncStatusReady { + return true + } + require.NoError(t, master.Sync(ctx)) + return false + }) + + cmData := testutil.FetchConfigMapData(h, "yt-master-config", "ytserver-master.yson") + require.Contains(t, cmData, "ms-0.masters."+namespace+".svc."+domain+":9010") + + testutil.FetchEventually( + h, + "ms", + &appsv1.StatefulSet{}, + ) + + // update + update #2 to be sure that first update doesn't end with wrong state + for i := 1; i <= 2; i++ { + t.Logf("Update ms #%d", i) + newImage := ptr.String(fmt.Sprintf("new-image-%d", i)) + ytsaurusResource.Spec.PrimaryMasters.Image = newImage + + master = NewMaster(cfgen, ytsaurus, 
&fakeYtsaurusForMaster{}) + testutil.Eventually(h, "ms became ready", func() bool { + st, err := master.Status(ctx) + require.NoError(t, err) + if st.SyncStatus == SyncStatusReady { + return true + } + require.NoError(t, master.Sync(ctx)) + return false + }) + sts := &appsv1.StatefulSet{} + testutil.GetObject(h, "ms", sts) + require.Equal(t, *newImage, sts.Spec.Template.Spec.Containers[0].Image) + } +} diff --git a/pkg/components/microservice.go b/pkg/components/microservice.go index 02312c4c..0be129b7 100644 --- a/pkg/components/microservice.go +++ b/pkg/components/microservice.go @@ -3,15 +3,17 @@ package components import ( "context" - v1 "github.com/ytsaurus/yt-k8s-operator/api/v1" ptr "k8s.io/utils/pointer" + v1 "github.com/ytsaurus/yt-k8s-operator/api/v1" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy" "github.com/ytsaurus/yt-k8s-operator/pkg/labeller" "github.com/ytsaurus/yt-k8s-operator/pkg/resources" "github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" ) // microservice manages common resources of YTsaurus service component diff --git a/pkg/components/query_tracker.go b/pkg/components/query_tracker.go index 299b3a85..61806b90 100644 --- a/pkg/components/query_tracker.go +++ b/pkg/components/query_tracker.go @@ -25,17 +25,19 @@ type QueryTracker struct { cfgen *ytconfig.Generator ytsaurusClient internalYtsaurusClient - tabletNodes []Component - initCondition string - initQTState *InitJob - secret *resources.StringSecret + //tabletNodes []Component + tabletNodesCount int + initCondition string + initQTState *InitJob + secret *resources.StringSecret } func NewQueryTracker( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient, - tabletNodes []Component, + tabletNodesCount int, + // tabletNodes []Component, ) *QueryTracker { resource := ytsaurus.GetResource() l := labeller.Labeller{ @@ -69,7 +71,7 @@ func 
NewQueryTracker( return &QueryTracker{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - tabletNodes: tabletNodes, + tabletNodesCount: tabletNodesCount, initCondition: "queryTrackerInitCompleted", ytsaurusClient: yc, initQTState: NewInitJob( @@ -154,21 +156,22 @@ func (qt *QueryTracker) doSync(ctx context.Context, dry bool) (ComponentStatus, } // Wait for tablet nodes to proceed with query tracker state init. - if qt.tabletNodes == nil || len(qt.tabletNodes) == 0 { + // TODO: this should be done in validation hook. + if qt.tabletNodesCount == 0 { return WaitingStatus(SyncStatusBlocked, "tablet nodes"), fmt.Errorf("cannot initialize query tracker without tablet nodes") } - for _, tnd := range qt.tabletNodes { - if !IsRunningStatus(tnd.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, "tablet nodes"), err - } - } + //for _, tnd := range qt.tabletNodes { + // if !IsRunningStatus(tnd.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, "tablet nodes"), err + // } + //} var ytClient yt.Client if qt.ytsaurus.GetClusterState() != ytv1.ClusterStateUpdating { - if qt.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady { - return WaitingStatus(SyncStatusBlocked, qt.ytsaurusClient.GetName()), err - } + //if qt.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady { + // return WaitingStatus(SyncStatusBlocked, qt.ytsaurusClient.GetName()), err + //} if !dry { ytClient = qt.ytsaurusClient.GetYtClient() @@ -344,7 +347,11 @@ func (qt *QueryTracker) init(ctx context.Context, ytClient yt.Client) (err error return } -func (qt *QueryTracker) Status(ctx context.Context) ComponentStatus { +func (qt *QueryTracker) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (qt *QueryTracker) StatusOld(ctx context.Context) ComponentStatus { status, err := qt.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/queue_agent.go b/pkg/components/queue_agent.go 
index ec976747..e1f7836b 100644 --- a/pkg/components/queue_agent.go +++ b/pkg/components/queue_agent.go @@ -25,19 +25,21 @@ type QueueAgent struct { cfgen *ytconfig.Generator ytsaurusClient internalYtsaurusClient - master Component - tabletNodes []Component - initCondition string - initQAState *InitJob - secret *resources.StringSecret + //master Component + tabletNodesCount int + //tabletNodes []Component + initCondition string + initQAState *InitJob + secret *resources.StringSecret } func NewQueueAgent( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, yc internalYtsaurusClient, - master Component, - tabletNodes []Component, + //master Component, + tabletNodesCount int, + // tabletNodes []Component, ) *QueueAgent { resource := ytsaurus.GetResource() l := labeller.Labeller{ @@ -71,10 +73,11 @@ func NewQueueAgent( return &QueueAgent{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: master, - tabletNodes: tabletNodes, - initCondition: "queueAgentInitCompleted", - ytsaurusClient: yc, + //master: master, + tabletNodesCount: tabletNodesCount, + //tabletNodes: tabletNodes, + initCondition: "queueAgentInitCompleted", + ytsaurusClient: yc, initQAState: NewInitJob( &l, ytsaurus.APIProxy(), @@ -118,19 +121,20 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er } } - if !IsRunningStatus(qa.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, qa.master.GetName()), err - } + //if !IsRunningStatus(qa.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, qa.master.GetName()), err + //} // It makes no sense to start queue agents without tablet nodes. 
- if qa.tabletNodes == nil || len(qa.tabletNodes) == 0 { + // TODO: this should be in the validation hook + if qa.tabletNodesCount == 0 { return WaitingStatus(SyncStatusBlocked, "tablet nodes"), fmt.Errorf("cannot initialize queue agent without tablet nodes") } - for _, tnd := range qa.tabletNodes { - if !IsRunningStatus(tnd.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err - } - } + //for _, tnd := range qa.tabletNodes { + // if !IsRunningStatus(tnd.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err + // } + //} if qa.secret.NeedSync(consts.TokenSecretKey, "") { if !dry { @@ -157,9 +161,9 @@ func (qa *QueueAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er var ytClient yt.Client if qa.ytsaurus.GetClusterState() != ytv1.ClusterStateUpdating { - if qa.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady { - return WaitingStatus(SyncStatusBlocked, qa.ytsaurusClient.GetName()), err - } + //if qa.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady { + // return WaitingStatus(SyncStatusBlocked, qa.ytsaurusClient.GetName()), err + //} if !dry { ytClient = qa.ytsaurusClient.GetYtClient() @@ -298,7 +302,11 @@ func (qa *QueueAgent) prepareInitQueueAgentState() { container.EnvFrom = []corev1.EnvFromSource{qa.secret.GetEnvSource()} } -func (qa *QueueAgent) Status(ctx context.Context) ComponentStatus { +func (qa *QueueAgent) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (qa *QueueAgent) StatusOld(ctx context.Context) ComponentStatus { status, err := qa.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/rpcproxy.go b/pkg/components/rpcproxy.go index ca81a698..27946743 100644 --- a/pkg/components/rpcproxy.go +++ b/pkg/components/rpcproxy.go @@ -18,7 +18,7 @@ type RpcProxy struct { localServerComponent cfgen *ytconfig.Generator - master Component + //master Component serviceType *v1.ServiceType 
balancingService *resources.RPCService @@ -28,7 +28,7 @@ type RpcProxy struct { func NewRPCProxy( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, - masterReconciler Component, + //masterReconciler Component, spec ytv1.RPCProxiesSpec) *RpcProxy { resource := ytsaurus.GetResource() l := labeller.Labeller{ @@ -76,10 +76,10 @@ func NewRPCProxy( return &RpcProxy{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: masterReconciler, - serviceType: spec.ServiceType, - balancingService: balancingService, - tlsSecret: tlsSecret, + //master: masterReconciler, + serviceType: spec.ServiceType, + balancingService: balancingService, + tlsSecret: tlsSecret, } } @@ -112,9 +112,9 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } } - if !IsRunningStatus(rp.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, rp.master.GetName()), err - } + //if !IsRunningStatus(rp.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, rp.master.GetName()), err + //} if rp.NeedSync() { if !dry { @@ -144,7 +144,11 @@ func (rp *RpcProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } -func (rp *RpcProxy) Status(ctx context.Context) ComponentStatus { +func (rp *RpcProxy) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (rp *RpcProxy) StatusOld(ctx context.Context) ComponentStatus { status, err := rp.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/scheduler.go b/pkg/components/scheduler.go index 6834324e..52d67156 100644 --- a/pkg/components/scheduler.go +++ b/pkg/components/scheduler.go @@ -21,20 +21,23 @@ import ( type Scheduler struct { localServerComponent - cfgen *ytconfig.Generator - master Component - execNodes []Component - tabletNodes []Component - initUser *InitJob - initOpArchive *InitJob - secret *resources.StringSecret + cfgen 
*ytconfig.Generator + //master Component + //execNodes []Component + tabletNodesCount int + initUser *InitJob + initOpArchive *InitJob + secret *resources.StringSecret } func NewScheduler( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, - master Component, - execNodes, tabletNodes []Component) *Scheduler { + tabletNodesCount int, + // master Component, + // execNodes, + // tabletNodes []Component, +) *Scheduler { resource := ytsaurus.GetResource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, @@ -64,9 +67,10 @@ func NewScheduler( return &Scheduler{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: master, - execNodes: execNodes, - tabletNodes: tabletNodes, + tabletNodesCount: tabletNodesCount, + //master: master, + //execNodes: execNodes, + //tabletNodes: tabletNodes, initUser: NewInitJob( &l, ytsaurus.APIProxy(), @@ -107,7 +111,11 @@ func (s *Scheduler) Fetch(ctx context.Context) error { ) } -func (s *Scheduler) Status(ctx context.Context) ComponentStatus { +func (s *Scheduler) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (s *Scheduler) StatusOld(ctx context.Context) ComponentStatus { status, err := s.doSync(ctx, true) if err != nil { panic(err) @@ -150,18 +158,18 @@ func (s *Scheduler) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } } - if !IsRunningStatus(s.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, s.master.GetName()), err - } + //if !IsRunningStatus(s.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, s.master.GetName()), err + //} - if s.execNodes == nil || len(s.execNodes) > 0 { - for _, end := range s.execNodes { - if !IsRunningStatus(end.Status(ctx).SyncStatus) { - // It makes no sense to start scheduler without exec nodes. 
- return WaitingStatus(SyncStatusBlocked, end.GetName()), err - } - } - } + //if s.execNodes == nil || len(s.execNodes) > 0 { + // for _, end := range s.execNodes { + // if !IsRunningStatus(end.Status(ctx).SyncStatus) { + // // It makes no sense to start scheduler without exec nodes. + // return WaitingStatus(SyncStatusBlocked, end.GetName()), err + // } + // } + //} if s.secret.NeedSync(consts.TokenSecretKey, "") { if !dry { @@ -203,12 +211,12 @@ func (s *Scheduler) initOpAchieve(ctx context.Context, dry bool) (ComponentStatu return status, err } - for _, tnd := range s.tabletNodes { - if !IsRunningStatus(tnd.Status(ctx).SyncStatus) { - // Wait for tablet nodes to proceed with operations archive init. - return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err - } - } + //for _, tnd := range s.tabletNodes { + // if !IsRunningStatus(tnd.Status(ctx).SyncStatus) { + // // Wait for tablet nodes to proceed with operations archive init. + // return WaitingStatus(SyncStatusBlocked, tnd.GetName()), err + // } + //} if !dry { s.prepareInitOperationArchive() @@ -245,7 +253,7 @@ func (s *Scheduler) updateOpArchive(ctx context.Context, dry bool) (*ComponentSt } func (s *Scheduler) needOpArchiveInit() bool { - return s.tabletNodes != nil && len(s.tabletNodes) > 0 + return s.tabletNodesCount > 0 } func (s *Scheduler) setConditionNotNecessaryToUpdateOpArchive(ctx context.Context) { diff --git a/pkg/components/server.go b/pkg/components/server.go index e3eaf212..e1ac0140 100644 --- a/pkg/components/server.go +++ b/pkg/components/server.go @@ -35,6 +35,7 @@ type server interface { needSync() bool buildStatefulSet() *appsv1.StatefulSet rebuildStatefulSet() *appsv1.StatefulSet + hasDiff(ctx context.Context) (bool, error) } type serverImpl struct { @@ -221,6 +222,31 @@ func (s *serverImpl) needUpdate() bool { return needReload } +// hasDiff reports whether the actual state differs from the desired state.
+// Currently, it is only triggered if +// - any of the sub-resources are missing or the pods are not ready, +// - the pods' image does not correspond to the spec, +// - the config needs a reload. +// +// In the future we want it to check more. +func (s *serverImpl) hasDiff(ctx context.Context) (bool, error) { + if !s.exists() { + return true, nil + } + if !s.arePodsReady(ctx) { + return true, nil + } + if !s.podsImageCorrespondsToSpec() { + return true, nil + } + + needReload, err := s.configHelper.NeedReload() + if err != nil { + return false, err + } + return needReload, nil +} + func (s *serverImpl) arePodsReady(ctx context.Context) bool { return s.statefulSet.ArePodsReady(ctx, s.instanceSpec.MinReadyInstanceCount) } diff --git a/pkg/components/strawberry_controller.go b/pkg/components/strawberry_controller.go index 7d6761d4..361454d9 100644 --- a/pkg/components/strawberry_controller.go +++ b/pkg/components/strawberry_controller.go @@ -24,9 +24,9 @@ type StrawberryController struct { initChytClusterJob *InitJob secret *resources.StringSecret - master Component - scheduler Component - dataNodes []Component + //master Component + //scheduler Component + //dataNodes []Component name string } @@ -44,9 +44,10 @@ const ChytInitClusterJobConfigFileName = "chyt-init-cluster.yson" func NewStrawberryController( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, - master Component, - scheduler Component, - dataNodes []Component) *StrawberryController { + // master Component, + // scheduler Component, + // dataNodes []Component +) *StrawberryController { resource := ytsaurus.GetResource() image := resource.Spec.CoreImage @@ -113,10 +114,10 @@ func NewStrawberryController( l.GetSecretName(), &l, ytsaurus.APIProxy()), - name: name, - master: master, - scheduler: scheduler, - dataNodes: dataNodes, + name: name, + //master: master, + //scheduler: scheduler, + //dataNodes: dataNodes, } } @@ -238,19 +239,19 @@ func (c *StrawberryController) doSync(ctx context.Context, dry bool) (ComponentS } } - if
!IsRunningStatus(c.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, c.master.GetName()), err - } - - if !IsRunningStatus(c.scheduler.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, c.scheduler.GetName()), err - } - - for _, dataNode := range c.dataNodes { - if !IsRunningStatus(dataNode.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, dataNode.GetName()), err - } - } + //if !IsRunningStatus(c.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, c.master.GetName()), err + //} + // + //if !IsRunningStatus(c.scheduler.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, c.scheduler.GetName()), err + //} + // + //for _, dataNode := range c.dataNodes { + // if !IsRunningStatus(dataNode.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, dataNode.GetName()), err + // } + //} if c.secret.NeedSync(consts.TokenSecretKey, "") { if !dry { @@ -289,7 +290,11 @@ func (c *StrawberryController) doSync(ctx context.Context, dry bool) (ComponentS return SimpleStatus(SyncStatusReady), err } -func (c *StrawberryController) Status(ctx context.Context) ComponentStatus { +func (c *StrawberryController) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (c *StrawberryController) StatusOld(ctx context.Context) ComponentStatus { status, err := c.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/suite_test.go b/pkg/components/suite_test.go index 1e2f4c97..a081eb40 100644 --- a/pkg/components/suite_test.go +++ b/pkg/components/suite_test.go @@ -51,10 +51,14 @@ func (fc *FakeComponent) Sync(ctx context.Context) error { return nil } -func (fc *FakeComponent) Status(ctx context.Context) ComponentStatus { +func (fc *FakeComponent) StatusOld(ctx context.Context) ComponentStatus { return fc.status } +func (fc *FakeComponent) Status(ctx context.Context) (ComponentStatus, error) { + return fc.status, nil +} + 
func (fc *FakeComponent) IsUpdating() bool { return false } @@ -109,6 +113,10 @@ func (fs *FakeServer) Sync(ctx context.Context) error { return nil } +func (fs *FakeServer) hasDiff(ctx context.Context) (bool, error) { + return false, nil +} + func (fs *FakeServer) buildStatefulSet() *appsv1.StatefulSet { return nil } diff --git a/pkg/components/tablet_node.go b/pkg/components/tablet_node.go index c576934c..476b2522 100644 --- a/pkg/components/tablet_node.go +++ b/pkg/components/tablet_node.go @@ -110,9 +110,9 @@ func (tn *TabletNode) doSync(ctx context.Context, dry bool) (ComponentStatus, er return SimpleStatus(SyncStatusReady), err } - if tn.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady { - return WaitingStatus(SyncStatusBlocked, tn.ytsaurusClient.GetName()), err - } + //if tn.ytsaurusClient.Status(ctx).SyncStatus != SyncStatusReady { + // return WaitingStatus(SyncStatusBlocked, tn.ytsaurusClient.GetName()), err + //} ytClient := tn.ytsaurusClient.GetYtClient() @@ -213,7 +213,11 @@ func (tn *TabletNode) getBundleBootstrap(bundle string) *ytv1.BundleBootstrapSpe return nil } -func (tn *TabletNode) Status(ctx context.Context) ComponentStatus { +func (tn *TabletNode) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (tn *TabletNode) StatusOld(ctx context.Context) ComponentStatus { status, err := tn.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/tablet_node_test.go b/pkg/components/tablet_node_test.go index 8ff08c1a..b8cfb7bc 100644 --- a/pkg/components/tablet_node_test.go +++ b/pkg/components/tablet_node_test.go @@ -173,11 +173,11 @@ var _ = Describe("Tablet node test", func() { tabletNode := NewTabletNode(cfgen, ytsaurus, ytsaurusClient, ytsaurusSpec.Spec.TabletNodes[0], true) tabletNode.server = NewFakeServer() - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusBlocked)) + 
Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusBlocked)) ytsaurusClient.SetStatus(SimpleStatus(SyncStatusReady)) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) }) It("Tablet node Sync; pods are not ready", func() { @@ -190,11 +190,11 @@ var _ = Describe("Tablet node test", func() { fakeServer.podsReady = false tabletNode.server = fakeServer - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusBlocked)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusBlocked)) fakeServer.podsReady = true - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) }) It("Tablet node Sync; yt errors", func() { @@ -328,32 +328,32 @@ var _ = Describe("Tablet node test", func() { // Failed to check if there is //sys/tablet_cell_bundles/sys. err := tabletNode.Sync(context.Background()) Expect(err).Should(Equal(existsNetError)) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) // Failed to create `sys` bundle. err = tabletNode.Sync(context.Background()) Expect(err).Should(Equal(createBundleNetError)) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) // Failed to get @tablet_cell_count of the `sys` bundle. 
err = tabletNode.Sync(context.Background()) Expect(err).Should(Equal(getNetError)) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) // Failed to create tablet_cell in the `sys` bundle. err = tabletNode.Sync(context.Background()) Expect(err).Should(Equal(createCellNetError)) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) // Failed to get @tablet_cell_count of the `default` bundle. err = tabletNode.Sync(context.Background()) Expect(err).Should(Equal(getNetError)) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusPending)) // Then everything was successfully. err = tabletNode.Sync(context.Background()) Expect(err).Should(Succeed()) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusReady)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusReady)) }) It("Tablet node Sync; success", func() { @@ -408,7 +408,7 @@ var _ = Describe("Tablet node test", func() { err := tabletNode.Sync(context.Background()) Expect(err).Should(Succeed()) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusReady)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusReady)) }) It("Tablet node Sync; no initialization", func() { @@ -422,7 +422,7 @@ var _ = Describe("Tablet node test", func() { err := tabletNode.Sync(context.Background()) Expect(err).Should(Succeed()) - Expect(tabletNode.Status(context.Background()).SyncStatus).Should(Equal(SyncStatusReady)) + Expect(tabletNode.StatusOld(context.Background()).SyncStatus).Should(Equal(SyncStatusReady)) }) 
}) }) diff --git a/pkg/components/tcpproxy.go b/pkg/components/tcpproxy.go index 09030df2..8542aa0d 100644 --- a/pkg/components/tcpproxy.go +++ b/pkg/components/tcpproxy.go @@ -18,7 +18,7 @@ type TcpProxy struct { localServerComponent cfgen *ytconfig.Generator - master Component + //master Component serviceType *v1.ServiceType balancingService *resources.TCPService @@ -27,7 +27,7 @@ type TcpProxy struct { func NewTCPProxy( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, - masterReconciler Component, + //masterReconciler Component, spec ytv1.TCPProxiesSpec) *TcpProxy { resource := ytsaurus.GetResource() l := labeller.Labeller{ @@ -68,9 +68,9 @@ func NewTCPProxy( return &TcpProxy{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: masterReconciler, - serviceType: spec.ServiceType, - balancingService: balancingService, + //master: masterReconciler, + serviceType: spec.ServiceType, + balancingService: balancingService, } } @@ -103,9 +103,9 @@ func (tp *TcpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro } } - if !IsRunningStatus(tp.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, tp.master.GetName()), err - } + //if !IsRunningStatus(tp.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, tp.master.GetName()), err + //} if tp.NeedSync() { if !dry { @@ -129,7 +129,11 @@ func (tp *TcpProxy) doSync(ctx context.Context, dry bool) (ComponentStatus, erro return SimpleStatus(SyncStatusReady), err } -func (tp *TcpProxy) Status(ctx context.Context) ComponentStatus { +func (tp *TcpProxy) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (tp *TcpProxy) StatusOld(ctx context.Context) ComponentStatus { status, err := tp.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/ui.go b/pkg/components/ui.go index e38ce34b..0815e36d 100644 --- a/pkg/components/ui.go +++ b/pkg/components/ui.go @@ -22,14 +22,14 
@@ type UI struct { cfgen *ytconfig.Generator microservice microservice initJob *InitJob - master Component - secret *resources.StringSecret + //master Component + secret *resources.StringSecret } const UIClustersConfigFileName = "clusters-config.json" const UICustomConfigFileName = "common.js" -func NewUI(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *UI { +func NewUI(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus) *UI { resource := ytsaurus.GetResource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, @@ -81,7 +81,7 @@ func NewUI(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Compon l.GetSecretName(), &l, ytsaurus.APIProxy()), - master: master, + //master: master, } } @@ -254,9 +254,9 @@ func (u *UI) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { } } - if !IsRunningStatus(u.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, u.master.GetName()), err - } + //if !IsRunningStatus(u.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, u.master.GetName()), err + //} if u.secret.NeedSync(consts.TokenSecretKey, "") { if !dry { @@ -293,7 +293,11 @@ func (u *UI) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { return SimpleStatus(SyncStatusReady), err } -func (u *UI) Status(ctx context.Context) ComponentStatus { +func (u *UI) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (u *UI) StatusOld(ctx context.Context) ComponentStatus { status, err := u.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/yql_agent.go b/pkg/components/yql_agent.go index ff3909a1..1f1ceb7c 100644 --- a/pkg/components/yql_agent.go +++ b/pkg/components/yql_agent.go @@ -20,13 +20,17 @@ import ( type YqlAgent struct { localServerComponent - cfgen *ytconfig.Generator - master Component + cfgen *ytconfig.Generator + //master Component initEnvironment *InitJob secret *resources.StringSecret } 
-func NewYQLAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master Component) *YqlAgent { +func NewYQLAgent( + cfgen *ytconfig.Generator, + ytsaurus *apiproxy.Ytsaurus, + // master Component, +) *YqlAgent { resource := ytsaurus.GetResource() l := labeller.Labeller{ ObjectMeta: &resource.ObjectMeta, @@ -56,7 +60,7 @@ func NewYQLAgent(cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, master return &YqlAgent{ localServerComponent: newLocalServerComponent(&l, ytsaurus, srv), cfgen: cfgen, - master: master, + //master: master, initEnvironment: NewInitJob( &l, ytsaurus.APIProxy(), @@ -133,9 +137,9 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er } } - if !IsRunningStatus(yqla.master.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, yqla.master.GetName()), err - } + //if !IsRunningStatus(yqla.master.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, yqla.master.GetName()), err + //} if yqla.secret.NeedSync(consts.TokenSecretKey, "") { if !dry { @@ -176,7 +180,11 @@ func (yqla *YqlAgent) doSync(ctx context.Context, dry bool) (ComponentStatus, er return yqla.initEnvironment.Sync(ctx, dry) } -func (yqla *YqlAgent) Status(ctx context.Context) ComponentStatus { +func (yqla *YqlAgent) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (yqla *YqlAgent) StatusOld(ctx context.Context) ComponentStatus { status, err := yqla.doSync(ctx, true) if err != nil { panic(err) diff --git a/pkg/components/ytsaurus_client.go b/pkg/components/ytsaurus_client.go index 6666fc20..9553a09b 100644 --- a/pkg/components/ytsaurus_client.go +++ b/pkg/components/ytsaurus_client.go @@ -28,8 +28,8 @@ type internalYtsaurusClient interface { type YtsaurusClient struct { localComponent - cfgen *ytconfig.Generator - httpProxy Component + cfgen *ytconfig.Generator + //httpProxy HttpProxy initUserJob *InitJob @@ -40,7 +40,7 @@ type YtsaurusClient struct { func 
NewYtsaurusClient( cfgen *ytconfig.Generator, ytsaurus *apiproxy.Ytsaurus, - httpProxy Component, + // httpProxy HttpProxy, ) *YtsaurusClient { resource := ytsaurus.GetResource() l := labeller.Labeller{ @@ -54,7 +54,7 @@ func NewYtsaurusClient( return &YtsaurusClient{ localComponent: newLocalComponent(&l, ytsaurus), cfgen: cfgen, - httpProxy: httpProxy, + //httpProxy: httpProxy, initUserJob: NewInitJob( &l, ytsaurus.APIProxy(), @@ -81,7 +81,7 @@ func (yc *YtsaurusClient) Fetch(ctx context.Context) error { return resources.Fetch(ctx, yc.secret, yc.initUserJob, - yc.httpProxy, + //yc.httpProxy, ) } @@ -338,9 +338,9 @@ func (yc *YtsaurusClient) handleUpdatingState(ctx context.Context) (ComponentSta func (yc *YtsaurusClient) doSync(ctx context.Context, dry bool) (ComponentStatus, error) { var err error - if !IsRunningStatus(yc.httpProxy.Status(ctx).SyncStatus) { - return WaitingStatus(SyncStatusBlocked, yc.httpProxy.GetName()), err - } + //if !IsRunningStatus(yc.httpProxy.Status(ctx).SyncStatus) { + // return WaitingStatus(SyncStatusBlocked, yc.httpProxy.GetName()), err + //} if yc.secret.NeedSync(consts.TokenSecretKey, "") { if !dry { @@ -395,7 +395,11 @@ func (yc *YtsaurusClient) doSync(ctx context.Context, dry bool) (ComponentStatus return SimpleStatus(SyncStatusReady), err } -func (yc *YtsaurusClient) Status(ctx context.Context) ComponentStatus { +func (yc *YtsaurusClient) Status(ctx context.Context) (ComponentStatus, error) { + return ComponentStatus{}, nil +} + +func (yc *YtsaurusClient) StatusOld(ctx context.Context) ComponentStatus { status, err := yc.doSync(ctx, true) if err != nil { panic(err) @@ -495,6 +499,8 @@ func (yc *YtsaurusClient) HandlePossibilityCheck(ctx context.Context) (ok bool, return true, "Update is possible", nil } +// TODO (l0kix2): move code to the relevant components. + // Safe mode actions. func (yc *YtsaurusClient) EnableSafeMode(ctx context.Context) error {