Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Okhlopkov <[email protected]>
  • Loading branch information
Pavel Okhlopkov committed Dec 13, 2024
1 parent bf2dcd1 commit d2c4cd4
Show file tree
Hide file tree
Showing 10 changed files with 380 additions and 1,020 deletions.
4 changes: 2 additions & 2 deletions pkg/module_manager/go_hook/go_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ type GoHook interface {
type HookConfigLoader interface {
LoadAndValidate() (*config.HookConfig, error)
LoadOnStartup() (*float64, error)
LoadBeforeAll() (*float64, error)
LoadAfterAll() (*float64, error)
LoadBeforeAll(kind string) (*float64, error)
LoadAfterAll(kind string) (*float64, error)
LoadAfterDeleteHelm() (*float64, error)
}

Expand Down
339 changes: 2 additions & 337 deletions pkg/module_manager/models/hooks/global_hook_config.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,14 @@
package hooks

import (
"errors"
"fmt"

"github.com/davecgh/go-spew/spew"
sdkhook "github.com/deckhouse/module-sdk/pkg/hook"
"github.com/go-openapi/spec"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"

. "github.com/flant/addon-operator/pkg/hook/types"
gohook "github.com/flant/addon-operator/pkg/module_manager/go_hook"
"github.com/flant/shell-operator/pkg/hook/config"
. "github.com/flant/shell-operator/pkg/hook/types"
kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager"
eventtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types"
schdulertypes "github.com/flant/shell-operator/pkg/schedule_manager/types"
)

const (
defaultHookGroupName = "main"
defaultHookQueueName = "main"
)

// GlobalHookConfig is a structure with versioned hook configuration
Expand Down Expand Up @@ -84,66 +71,6 @@ func getGlobalHookConfigSchema(version string) *spec.Schema {
return config.GetSchema(globalHookVersion)
}

// LoadAndValidateShellConfig loads shell hook config from bytes and validate it. Returns multierror.
func (c *GlobalHookConfig) LoadAndValidateShellConfig(data []byte) error {
vu := config.NewDefaultVersionedUntyped()
err := vu.Load(data)
if err != nil {
return err
}

err = config.ValidateConfig(vu.Obj, getGlobalHookConfigSchema(vu.Version), "")
if err != nil {
return err
}

c.Version = vu.Version

err = c.HookConfig.ConvertAndCheck(data)
if err != nil {
return err
}

err = c.ConvertAndCheck(data)
if err != nil {
return err
}

return nil
}

func (c *GlobalHookConfig) LoadAndValidateBatchConfig(hcfg *sdkhook.HookConfig) error {
c.Version = hcfg.ConfigVersion

hcv1 := remapHookConfigV1FromHookConfig(hcfg)

err := hcv1.ConvertAndCheck(&c.HookConfig)
if err != nil {
return fmt.Errorf("convert and check from hook config v1: %w", err)
}

if hcfg.OnStartup != nil {
c.OnStartup = &OnStartupConfig{}
c.OnStartup.AllowFailure = false
c.OnStartup.BindingName = string(OnStartup)
c.OnStartup.Order = float64(*hcfg.OnStartup)
}

if hcfg.OnBeforeHelm != nil {
c.BeforeAll = &BeforeAllConfig{}
c.BeforeAll.BindingName = string(BeforeAll)
c.BeforeAll.Order = float64(*hcfg.OnBeforeHelm)
}

if hcfg.OnAfterHelm != nil {
c.AfterAll = &AfterAllConfig{}
c.AfterAll.BindingName = string(AfterAll)
c.AfterAll.Order = float64(*hcfg.OnAfterHelm)
}

return nil
}

func (c *GlobalHookConfig) LoadAndValidateConfig(configLoader gohook.HookConfigLoader) error {
hookConfig, err := configLoader.LoadAndValidate()
if err != nil {
Expand All @@ -164,7 +91,7 @@ func (c *GlobalHookConfig) LoadAndValidateConfig(configLoader gohook.HookConfigL
c.OnStartup.Order = *onStartup
}

beforeAll, err := configLoader.LoadBeforeAll()
beforeAll, err := configLoader.LoadBeforeAll(string(BeforeAll))
if err != nil {
return fmt.Errorf("load before all: %w", err)
}
Expand All @@ -175,7 +102,7 @@ func (c *GlobalHookConfig) LoadAndValidateConfig(configLoader gohook.HookConfigL
c.BeforeAll.Order = *beforeAll
}

afterAll, err := configLoader.LoadAfterAll()
afterAll, err := configLoader.LoadAfterAll(string(AfterAll))
if err != nil {
return fmt.Errorf("load after all: %w", err)
}
Expand All @@ -189,113 +116,6 @@ func (c *GlobalHookConfig) LoadAndValidateConfig(configLoader gohook.HookConfigL
return nil
}

func (c *GlobalHookConfig) LoadAndValidateGoConfig(input *gohook.HookConfig) error {
hookConfig, err := newHookConfigFromGoConfig(input)
if err != nil {
return err
}

c.HookConfig = hookConfig

if input.OnBeforeAll != nil {
c.BeforeAll = &BeforeAllConfig{}
c.BeforeAll.BindingName = string(BeforeAll)
c.BeforeAll.Order = input.OnBeforeAll.Order
}

if input.OnAfterAll != nil {
c.AfterAll = &AfterAllConfig{}
c.AfterAll.BindingName = string(AfterAll)
c.AfterAll.Order = input.OnAfterAll.Order
}

return nil
}

func (c *GlobalHookConfig) ConvertAndCheck(data []byte) error {
switch c.Version {
case "v0":
configV0 := &GlobalHookConfigV0{}
err := yaml.Unmarshal(data, configV0)
if err != nil {
return fmt.Errorf("unmarshal GlobalHookConfig version 0: %s", err)
}
c.GlobalV0 = configV0
err = c.ConvertAndCheckV0()
if err != nil {
return err
}
case "v1":
configV1 := &GlobalHookConfigV0{}
err := yaml.Unmarshal(data, configV1)
if err != nil {
return fmt.Errorf("unmarshal GlobalHookConfig v1: %s", err)
}
c.GlobalV1 = configV1
err = c.ConvertAndCheckV1()
if err != nil {
return err
}
default:
// NOTE: this should not happen
return fmt.Errorf("version '%s' is unsupported", c.Version)
}

return nil
}

func (c *GlobalHookConfig) ConvertAndCheckV0() (err error) {
c.BeforeAll, err = c.ConvertBeforeAll(c.GlobalV0.BeforeAll)
if err != nil {
return err
}

c.AfterAll, err = c.ConvertAfterAll(c.GlobalV0.AfterAll)
if err != nil {
return err
}

return nil
}

func (c *GlobalHookConfig) ConvertAndCheckV1() (err error) {
c.BeforeAll, err = c.ConvertBeforeAll(c.GlobalV1.BeforeAll)
if err != nil {
return err
}

c.AfterAll, err = c.ConvertAfterAll(c.GlobalV1.AfterAll)
if err != nil {
return err
}

return nil
}

func (c *GlobalHookConfig) ConvertBeforeAll(value interface{}) (*BeforeAllConfig, error) {
floatValue, err := config.ConvertFloatForBinding(value, "beforeAll")
if err != nil || floatValue == nil {
return nil, err
}

res := &BeforeAllConfig{}
res.BindingName = string(BeforeAll)
res.Order = *floatValue
return res, nil
}

func (c *GlobalHookConfig) ConvertAfterAll(value interface{}) (*AfterAllConfig, error) {
floatValue, err := config.ConvertFloatForBinding(value, "afterAll")
if err != nil || floatValue == nil {
return nil, err
}

res := &AfterAllConfig{}
res.BindingName = string(AfterAll)
res.Order = *floatValue
return res, nil
}

func (c *GlobalHookConfig) Bindings() []BindingType {
res := make([]BindingType, 0)

Expand Down Expand Up @@ -338,158 +158,3 @@ func (c *GlobalHookConfig) BindingsCount() int {
}
return res
}

func newHookConfigFromGoConfig(input *gohook.HookConfig) (config.HookConfig, error) {
c := config.HookConfig{
Version: "v1",
Schedules: []ScheduleConfig{},
OnKubernetesEvents: []OnKubernetesEventConfig{},
}

if input.Settings != nil {
c.Settings = &Settings{
ExecutionMinInterval: input.Settings.ExecutionMinInterval,
ExecutionBurst: input.Settings.ExecutionBurst,
}
}

if input.OnStartup != nil {
c.OnStartup = &OnStartupConfig{}
c.OnStartup.BindingName = string(OnStartup)
c.OnStartup.Order = input.OnStartup.Order
}

/*** A HUGE copy paste from shell-operator’s hook_config.ConvertAndCheckV1 ***/
// WARNING no checks and defaults!
for i, kubeCfg := range input.Kubernetes {
// err := c.CheckOnKubernetesEventV1(kubeCfg, fmt.Sprintf("kubernetes[%d]", i))
// if err != nil {
// return fmt.Errorf("invalid kubernetes config [%d]: %v", i, err)
//}

monitor := &kubeeventsmanager.MonitorConfig{}
monitor.Metadata.DebugName = config.MonitorDebugName(kubeCfg.Name, i)
monitor.Metadata.MonitorId = config.MonitorConfigID()
monitor.Metadata.LogLabels = map[string]string{}
monitor.Metadata.MetricLabels = map[string]string{}
// monitor.WithMode(kubeCfg.Mode)
monitor.ApiVersion = kubeCfg.ApiVersion
monitor.Kind = kubeCfg.Kind
monitor.WithNameSelector(kubeCfg.NameSelector)
monitor.WithFieldSelector(kubeCfg.FieldSelector)
monitor.WithNamespaceSelector(kubeCfg.NamespaceSelector)
monitor.WithLabelSelector(kubeCfg.LabelSelector)
if kubeCfg.FilterFunc == nil {
return config.HookConfig{}, errors.New(`"FilterFunc" in KubernetesConfig cannot be nil`)
}
filterFunc := kubeCfg.FilterFunc
monitor.FilterFunc = func(obj *unstructured.Unstructured) (interface{}, error) {
return filterFunc(obj)
}
if gohook.BoolDeref(kubeCfg.ExecuteHookOnEvents, true) {
monitor.WithEventTypes(nil)
} else {
monitor.WithEventTypes([]eventtypes.WatchEventType{})
}

kubeConfig := OnKubernetesEventConfig{}
kubeConfig.Monitor = monitor
kubeConfig.AllowFailure = input.AllowFailure
if kubeCfg.Name == "" {
return c, spew.Errorf(`"name" is a required field in binding: %v`, kubeCfg)
}
kubeConfig.BindingName = kubeCfg.Name
if input.Queue == "" {
kubeConfig.Queue = defaultHookQueueName
} else {
kubeConfig.Queue = input.Queue
}
kubeConfig.Group = defaultHookGroupName

kubeConfig.ExecuteHookOnSynchronization = gohook.BoolDeref(kubeCfg.ExecuteHookOnSynchronization, true)
kubeConfig.WaitForSynchronization = gohook.BoolDeref(kubeCfg.WaitForSynchronization, true)

kubeConfig.KeepFullObjectsInMemory = false
kubeConfig.Monitor.KeepFullObjectsInMemory = false

c.OnKubernetesEvents = append(c.OnKubernetesEvents, kubeConfig)
}

// for i, kubeCfg := range c.V1.OnKubernetesEvent {
// if len(kubeCfg.IncludeSnapshotsFrom) > 0 {
// err := c.CheckIncludeSnapshots(kubeCfg.IncludeSnapshotsFrom...)
// if err != nil {
// return fmt.Errorf("invalid kubernetes config [%d]: includeSnapshots %v", i, err)
// }
// }
//}

// schedule bindings with includeSnapshotsFrom
// are depend on kubernetes bindings.
c.Schedules = []ScheduleConfig{}
for _, inSch := range input.Schedule {
// err := c.CheckScheduleV1(rawSchedule)
// if err != nil {
// return fmt.Errorf("invalid schedule config [%d]: %v", i, err)
//}

res := ScheduleConfig{}

if inSch.Name == "" {
return c, spew.Errorf(`"name" is a required field in binding: %v`, inSch)
}
res.BindingName = inSch.Name

res.AllowFailure = input.AllowFailure
res.ScheduleEntry = schdulertypes.ScheduleEntry{
Crontab: inSch.Crontab,
Id: config.ScheduleID(),
}

if input.Queue == "" {
res.Queue = "main"
} else {
res.Queue = input.Queue
}
res.Group = "main"

// schedule, err := c.ConvertScheduleV1(rawSchedule)
// if err != nil {
// return err
//}
c.Schedules = append(c.Schedules, res)
}

// Update IncludeSnapshotsFrom for every binding with a group.
// Merge binding's IncludeSnapshotsFrom with snapshots list calculated for group.
groupSnapshots := make(map[string][]string)
for _, kubeCfg := range c.OnKubernetesEvents {
if kubeCfg.Group == "" {
continue
}
if _, ok := groupSnapshots[kubeCfg.Group]; !ok {
groupSnapshots[kubeCfg.Group] = make([]string, 0)
}
groupSnapshots[kubeCfg.Group] = append(groupSnapshots[kubeCfg.Group], kubeCfg.BindingName)
}
newKubeEvents := make([]OnKubernetesEventConfig, 0)
for _, cfg := range c.OnKubernetesEvents {
if snapshots, ok := groupSnapshots[cfg.Group]; ok {
cfg.IncludeSnapshotsFrom = config.MergeArrays(cfg.IncludeSnapshotsFrom, snapshots)
}
newKubeEvents = append(newKubeEvents, cfg)
}
c.OnKubernetesEvents = newKubeEvents
newSchedules := make([]ScheduleConfig, 0)
for _, cfg := range c.Schedules {
if snapshots, ok := groupSnapshots[cfg.Group]; ok {
cfg.IncludeSnapshotsFrom = config.MergeArrays(cfg.IncludeSnapshotsFrom, snapshots)
}
newSchedules = append(newSchedules, cfg)
}
c.Schedules = newSchedules

/*** END Copy Paste ***/

return c, nil
}
Loading

0 comments on commit d2c4cd4

Please sign in to comment.