Skip to content

Commit

Permalink
Incapsulate flows in components
Browse files Browse the repository at this point in the history
  • Loading branch information
l0kix2 committed Mar 21, 2024
1 parent 2e64177 commit 4f14eaa
Show file tree
Hide file tree
Showing 32 changed files with 1,574 additions and 236 deletions.
48 changes: 32 additions & 16 deletions controllers/component_manager.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"time"
Expand Down Expand Up @@ -40,18 +42,23 @@ func NewComponentManager(
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

d := components.NewDiscovery(cfgen, ytsaurus)
m := components.NewMaster(cfgen, ytsaurus)
//m := components.NewMaster(cfgen, ytsaurus)
var hps []components.Component
for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
//hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, m, hpSpec))
hps = append(hps, components.NewHTTPProxy(cfgen, ytsaurus, hpSpec))
}
yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0])
//yc := components.NewYtsaurusClient(cfgen, ytsaurus, hps[0])
yc := components.NewYtsaurusClient(cfgen, ytsaurus)

m := components.NewMaster(cfgen, ytsaurus, yc)

var dnds []components.Component
nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain)
if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 {
for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes {
dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, m, dndSpec))
//dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, m, dndSpec))
dnds = append(dnds, components.NewDataNode(nodeCfgGen, ytsaurus, dndSpec))
}
}

Expand All @@ -64,30 +71,34 @@ func NewComponentManager(
allComponents = append(allComponents, hps...)

if resource.Spec.UI != nil {
ui := components.NewUI(cfgen, ytsaurus, m)
//ui := components.NewUI(cfgen, ytsaurus, m)
ui := components.NewUI(cfgen, ytsaurus)
allComponents = append(allComponents, ui)
}

if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 {
var rps []components.Component
for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies {
rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, m, rpSpec))
//rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, m, rpSpec))
rps = append(rps, components.NewRPCProxy(cfgen, ytsaurus, rpSpec))
}
allComponents = append(allComponents, rps...)
}

if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 {
var tps []components.Component
for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies {
tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, m, tpSpec))
//tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, m, tpSpec))
tps = append(tps, components.NewTCPProxy(cfgen, ytsaurus, tpSpec))
}
allComponents = append(allComponents, tps...)
}

var ends []components.Component
if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, m, endSpec))
//ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, m, endSpec))
ends = append(ends, components.NewExecNode(nodeCfgGen, ytsaurus, endSpec))
}
}
allComponents = append(allComponents, ends...)
Expand All @@ -101,33 +112,38 @@ func NewComponentManager(
allComponents = append(allComponents, tnds...)

if resource.Spec.Schedulers != nil {
s = components.NewScheduler(cfgen, ytsaurus, m, ends, tnds)
//s = components.NewScheduler(cfgen, ytsaurus, m, ends, tnds)
s = components.NewScheduler(cfgen, ytsaurus, len(tnds))
allComponents = append(allComponents, s)
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus, m)
//ca := components.NewControllerAgent(cfgen, ytsaurus, m)
ca := components.NewControllerAgent(cfgen, ytsaurus)
allComponents = append(allComponents, ca)
}

var q components.Component
if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
q = components.NewQueryTracker(cfgen, ytsaurus, yc, tnds)
q = components.NewQueryTracker(cfgen, ytsaurus, yc, len(tnds))
allComponents = append(allComponents, q)
}

if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
qa := components.NewQueueAgent(cfgen, ytsaurus, yc, m, tnds)
//qa := components.NewQueueAgent(cfgen, ytsaurus, yc, m, tnds)
qa := components.NewQueueAgent(cfgen, ytsaurus, yc, len(tnds))
allComponents = append(allComponents, qa)
}

if resource.Spec.YQLAgents != nil {
yqla := components.NewYQLAgent(cfgen, ytsaurus, m)
//yqla := components.NewYQLAgent(cfgen, ytsaurus, m)
yqla := components.NewYQLAgent(cfgen, ytsaurus)
allComponents = append(allComponents, yqla)
}

if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil {
strawberry := components.NewStrawberryController(cfgen, ytsaurus, m, s, dnds)
//strawberry := components.NewStrawberryController(cfgen, ytsaurus, m, s, dnds)
strawberry := components.NewStrawberryController(cfgen, ytsaurus)
allComponents = append(allComponents, strawberry)
}

Expand All @@ -153,7 +169,7 @@ func NewComponentManager(
return nil, err
}

componentStatus := c.Status(ctx)
componentStatus := c.StatusOld(ctx)
c.SetReadyCondition(componentStatus)
syncStatus := componentStatus.SyncStatus

Expand Down Expand Up @@ -205,7 +221,7 @@ func (cm *ComponentManager) Sync(ctx context.Context) (ctrl.Result, error) {

hasPending := false
for _, c := range cm.allComponents {
status := c.Status(ctx)
status := c.StatusOld(ctx)

if status.SyncStatus == components.SyncStatusPending ||
status.SyncStatus == components.SyncStatusUpdating {
Expand Down
150 changes: 150 additions & 0 deletions controllers/component_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package controllers

import (
"context"

apiProxy "github.com/ytsaurus/yt-k8s-operator/pkg/apiproxy"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
"github.com/ytsaurus/yt-k8s-operator/pkg/ytconfig"
)

type component interface {
//Fetch(ctx context.Context) error
Sync(ctx context.Context) error
Status(ctx context.Context) (components.ComponentStatus, error)
GetName() string
GetType() consts.ComponentType
}

type componentRegistry struct {
comps map[string]component
byType map[consts.ComponentType][]component
}

func (cr *componentRegistry) add(comp component) {
cr.comps[comp.GetName()] = comp
compsOfSameType := cr.byType[comp.GetType()]
compsOfSameType = append(compsOfSameType, comp)

Check failure on line 28 in controllers/component_registry.go

View workflow job for this annotation

GitHub Actions / Run checks / Run checks

ineffectual assignment to compsOfSameType (ineffassign)
}

func (cr *componentRegistry) list() []component {
var result []component
for _, comp := range cr.comps {
result = append(result, comp)
}
return result
}
func (cr *componentRegistry) listByType(types ...consts.ComponentType) []component {
var result []component
for _, compType := range types {
result = append(result, cr.byType[compType]...)
}
return result
}

func buildComponentRegistry(
ytsaurus *apiProxy.Ytsaurus,
) *componentRegistry {
registry := &componentRegistry{
comps: make(map[string]component),
}

resource := ytsaurus.GetResource()
clusterDomain := getClusterDomain(ytsaurus.APIProxy().Client())
cfgen := ytconfig.NewGenerator(resource, clusterDomain)

yc := components.NewYtsaurusClient(cfgen, ytsaurus)
registry.add(yc)

d := components.NewDiscovery(cfgen, ytsaurus)
registry.add(d)

m := components.NewMaster(cfgen, ytsaurus, yc)
registry.add(m)

for _, hpSpec := range ytsaurus.GetResource().Spec.HTTPProxies {
hp := components.NewHTTPProxy(cfgen, ytsaurus, hpSpec)
registry.add(hp)
}

nodeCfgGen := ytconfig.NewLocalNodeGenerator(ytsaurus.GetResource(), clusterDomain)
if resource.Spec.DataNodes != nil && len(resource.Spec.DataNodes) > 0 {
for _, dndSpec := range ytsaurus.GetResource().Spec.DataNodes {
dnd := components.NewDataNode(nodeCfgGen, ytsaurus, dndSpec)
registry.add(dnd)
}
}

if resource.Spec.UI != nil {
ui := components.NewUI(cfgen, ytsaurus)
registry.add(ui)
}

if resource.Spec.RPCProxies != nil && len(resource.Spec.RPCProxies) > 0 {
for _, rpSpec := range ytsaurus.GetResource().Spec.RPCProxies {
rp := components.NewRPCProxy(cfgen, ytsaurus, rpSpec)
registry.add(rp)
}
}

if resource.Spec.TCPProxies != nil && len(resource.Spec.TCPProxies) > 0 {
for _, tpSpec := range ytsaurus.GetResource().Spec.TCPProxies {
tp := components.NewTCPProxy(cfgen, ytsaurus, tpSpec)
registry.add(tp)
}
}

if resource.Spec.ExecNodes != nil && len(resource.Spec.ExecNodes) > 0 {
for _, endSpec := range ytsaurus.GetResource().Spec.ExecNodes {
end := components.NewExecNode(nodeCfgGen, ytsaurus, endSpec)
registry.add(end)
}
}

tndCount := 0
if resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
for idx, tndSpec := range ytsaurus.GetResource().Spec.TabletNodes {
tnd := components.NewTabletNode(nodeCfgGen, ytsaurus, yc, tndSpec, idx == 0)
registry.add(tnd)
tndCount++
}
}
if resource.Spec.Schedulers != nil {
s := components.NewScheduler(cfgen, ytsaurus, tndCount)
registry.add(s)
}

if resource.Spec.ControllerAgents != nil {
ca := components.NewControllerAgent(cfgen, ytsaurus)
registry.add(ca)
}

var q component
if resource.Spec.QueryTrackers != nil && resource.Spec.Schedulers != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
q = components.NewQueryTracker(cfgen, ytsaurus, yc, tndCount)
registry.add(q)
}

if resource.Spec.QueueAgents != nil && resource.Spec.TabletNodes != nil && len(resource.Spec.TabletNodes) > 0 {
qa := components.NewQueueAgent(cfgen, ytsaurus, yc, tndCount)
registry.add(qa)
}

if resource.Spec.YQLAgents != nil {
yqla := components.NewYQLAgent(cfgen, ytsaurus)
registry.add(yqla)
}

if (resource.Spec.DeprecatedChytController != nil || resource.Spec.StrawberryController != nil) && resource.Spec.Schedulers != nil {
strawberry := components.NewStrawberryController(cfgen, ytsaurus)
registry.add(strawberry)
}

if resource.Spec.MasterCaches != nil {
mc := components.NewMasterCache(cfgen, ytsaurus)
registry.add(mc)
}

return registry
}
65 changes: 65 additions & 0 deletions controllers/helpers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package controllers

import (
"context"
"fmt"
"net"
"os"
"strings"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

ytv1 "github.com/ytsaurus/yt-k8s-operator/api/v1"
"github.com/ytsaurus/yt-k8s-operator/pkg/components"
"github.com/ytsaurus/yt-k8s-operator/pkg/consts"
)

const (
Expand All @@ -30,3 +37,61 @@ func getClusterDomain(client client.Client) string {

return clusterDomain
}

func logComponentStatuses(
ctx context.Context,
registry *componentRegistry,
statuses map[string]components.ComponentStatus,
componentsOrder [][]consts.ComponentType,
resource *ytv1.Ytsaurus,
) {
logger := log.FromContext(ctx)

var readyComponents []string
var notReadyComponents []string

for batchIndex, typesInBatch := range componentsOrder {
compsInBatch := registry.listByType(typesInBatch...)
for _, comp := range compsInBatch {
name := comp.GetName()
status := statuses[name]

if status.SyncStatus == components.SyncStatusReady {
readyComponents = append(readyComponents, name)
} else {
notReadyComponents = append(notReadyComponents, name)
}

logger.V(1).Info(
fmt.Sprintf(
"%d.%s %s: %s",
batchIndex,
statusToSymbol(status.SyncStatus),
name,
status.Message,
),
)
}
}

// NB: This log is mentioned at https://ytsaurus.tech/docs/ru/admin-guide/install-ytsaurus
logger.Info("Ytsaurus sync status",
"notReadyComponents", notReadyComponents,
"readyComponents", readyComponents,
"updateState", resource.Status.UpdateStatus.State,
"clusterState", resource.Status.State)

}

func statusToSymbol(st components.SyncStatus) string {
switch st {
case components.SyncStatusReady:
return "[v]"
case components.SyncStatusBlocked:
return "[x]"
case components.SyncStatusUpdating:
return "[.]"
default:
return "[ ]"
}
}
4 changes: 3 additions & 1 deletion controllers/sync.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

// TODO: file will be deleted after this refactoring. No need to review changes.

import (
"context"
"time"
Expand Down Expand Up @@ -243,7 +245,7 @@ func getComponentNames(components []components.Component) []string {
return names
}

func (r *YtsaurusReconciler) Sync(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
func (r *YtsaurusReconciler) SyncOld(ctx context.Context, resource *ytv1.Ytsaurus) (ctrl.Result, error) {
logger := log.FromContext(ctx)

if !resource.Spec.IsManaged {
Expand Down
Loading

0 comments on commit 4f14eaa

Please sign in to comment.