Added unit and integration test for node config propagation (#279)
* Added unit and integration test for node config propagation

* Bug fixes for propagation logic
berkayoz authored Apr 2, 2024
1 parent 2ef77f6 commit c768ab2
Showing 20 changed files with 473 additions and 111 deletions.
1 change: 0 additions & 1 deletion src/k8s/go.mod
@@ -6,7 +6,6 @@ require (
github.com/canonical/go-dqlite v1.21.0
github.com/canonical/lxd v0.0.0-20231002162033-38796399c135
github.com/canonical/microcluster v0.0.0-20240122235408-1525f8ea8d7a
github.com/mitchellh/mapstructure v1.5.0
github.com/moby/sys/mountinfo v0.7.1
github.com/onsi/gomega v1.30.0
github.com/pelletier/go-toml v1.9.5
2 changes: 0 additions & 2 deletions src/k8s/go.sum
@@ -464,8 +464,6 @@ github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0Qu
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
2 changes: 1 addition & 1 deletion src/k8s/pkg/component/dns.go
@@ -152,7 +152,7 @@ func ReconcileDNSComponent(ctx context.Context, s snap.Snap, alreadyEnabled *boo
if err != nil {
return "", "", fmt.Errorf("failed to disable dns: %w", err)
}
return "", "", nil
return clusterConfig.Kubelet.ClusterDNS, clusterConfig.Kubelet.ClusterDomain, nil
}
return "", "", nil
}
23 changes: 19 additions & 4 deletions src/k8s/pkg/component/gateway.go
@@ -6,6 +6,7 @@ import (

"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/snap"
"github.com/canonical/k8s/pkg/utils/control"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/k8s/pkg/utils/vals"
)
@@ -43,12 +44,26 @@ func UpdateGatewayComponent(ctx context.Context, s snap.Snap, isRefresh bool) er
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
// There is a race condition where the cilium resources can change
// while we try to restart them, which fails with:
// the object has been modified; please apply your changes to the latest version and try again
attempts := 3
if err := control.RetryFor(attempts, func() error {
if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment after %d attempts: %w", attempts, err)
}

if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)
if err := control.RetryFor(attempts, func() error {
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("failed to restart cilium daemonset after %d attempts: %w", attempts, err)
}

return nil
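The retry wrapping above relies on a `control.RetryFor` helper from `pkg/utils/control` that is not shown in this diff. As a rough sketch only (the real helper may take a context or back off differently), it boils down to something like:

```go
package control

import (
	"fmt"
	"time"
)

// RetryFor calls f up to attempts times and returns nil on the first success.
// The fixed sleep between tries is an assumption for illustration; the actual
// helper in pkg/utils/control may use different timing or accept a context.
func RetryFor(attempts int, f func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = f(); err == nil {
			return nil
		}
		time.Sleep(500 * time.Millisecond)
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}
```

The same wrapper is applied to the ingress and load balancer updates below, so a transient "object has been modified" conflict from the API server no longer fails the whole component update.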
21 changes: 17 additions & 4 deletions src/k8s/pkg/component/ingress.go
@@ -6,6 +6,7 @@ import (

"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/snap"
"github.com/canonical/k8s/pkg/utils/control"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/k8s/pkg/utils/vals"
)
@@ -35,11 +36,23 @@ func UpdateIngressComponent(ctx context.Context, s snap.Snap, isRefresh bool, de
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
attempts := 3
if err := control.RetryFor(attempts, func() error {
if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment after %d attempts: %w", attempts, err)
}
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)

if err := control.RetryFor(attempts, func() error {
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("failed to restart cilium daemonset after %d attempts: %w", attempts, err)
}

return nil
20 changes: 16 additions & 4 deletions src/k8s/pkg/component/loadbalancer.go
@@ -109,11 +109,23 @@ func UpdateLoadBalancerComponent(ctx context.Context, s snap.Snap, isRefresh boo
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
attempts := 3
if err := control.RetryFor(attempts, func() error {
if err := client.RestartDeployment(ctx, "cilium-operator", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("failed to restart cilium-operator deployment after %d attempts: %w", attempts, err)
}
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)

if err := control.RetryFor(attempts, func() error {
if err := client.RestartDaemonset(ctx, "cilium", "kube-system"); err != nil {
return fmt.Errorf("failed to restart cilium daemonset: %w", err)
}
return nil
}); err != nil {
return fmt.Errorf("failed to restart cilium daemonset after %d attempts: %w", attempts, err)
}

return nil
7 changes: 2 additions & 5 deletions src/k8s/pkg/k8sd/api/cluster_bootstrap.go
@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"

apiv1 "github.com/canonical/k8s/api/v1"
"github.com/canonical/k8s/pkg/utils"
@@ -39,11 +38,9 @@ func postClusterBootstrap(m *microcluster.MicroCluster, s *state.State, r *http.
return response.BadRequest(fmt.Errorf("cluster is already bootstrapped"))
}

// Set timeout
timeout := utils.TimeoutFromCtx(s.Context, 30*time.Second)

// Bootstrap the cluster
if err := m.NewCluster(hostname, req.Address, config, timeout); err != nil {
// A timeout of 0 leaves the timeout handling to the context (m.ctx)
if err := m.NewCluster(hostname, req.Address, config, 0); err != nil {
// TODO move node cleanup here
return response.BadRequest(fmt.Errorf("failed to bootstrap new cluster: %w", err))
}
29 changes: 13 additions & 16 deletions src/k8s/pkg/k8sd/api/cluster_config.go
@@ -9,7 +9,6 @@ import (
"net/http"

api "github.com/canonical/k8s/api/v1"
"github.com/mitchellh/mapstructure"

"github.com/canonical/k8s/pkg/component"
"github.com/canonical/k8s/pkg/k8sd/database"
@@ -152,8 +151,9 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response {
}
}

var dnsIP = newConfig.Kubelet.ClusterDNS
if req.Config.DNS != nil {
dnsIP, _, err := component.ReconcileDNSComponent(r.Context(), snap, oldConfig.DNS.Enabled, req.Config.DNS.Enabled, newConfig)
dnsIP, _, err = component.ReconcileDNSComponent(r.Context(), snap, oldConfig.DNS.Enabled, req.Config.DNS.Enabled, newConfig)
if err != nil {
return response.InternalError(fmt.Errorf("failed to reconcile dns: %w", err))
}
@@ -170,23 +170,20 @@ func putClusterConfig(s *state.State, r *http.Request) response.Response {
}); err != nil {
return response.InternalError(fmt.Errorf("database transaction to update cluster configuration failed: %w", err))
}
}

var data map[string]string
if err := mapstructure.Decode(types.NodeConfig{
ClusterDNS: dnsIP,
ClusterDomain: newConfig.Kubelet.ClusterDomain,
}, &data); err != nil {
return response.InternalError(fmt.Errorf("failed to encode node config: %w", err))
}
cmData := types.MapFromNodeConfig(types.NodeConfig{
ClusterDNS: &dnsIP,
ClusterDomain: &newConfig.Kubelet.ClusterDomain,
})

client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err))
}
client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return response.InternalError(fmt.Errorf("failed to create kubernetes client: %w", err))
}

if _, err := client.UpdateConfigMap(r.Context(), "kube-system", "k8sd-config", data); err != nil {
return response.InternalError(fmt.Errorf("failed to update node config: %w", err))
}
if _, err := client.UpdateConfigMap(r.Context(), "kube-system", "k8sd-config", cmData); err != nil {
return response.InternalError(fmt.Errorf("failed to update node config: %w", err))
}

if req.Config.LocalStorage != nil {
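The change from `:=` to `=` inside the `if req.Config.DNS != nil` block, together with the new `var dnsIP = newConfig.Kubelet.ClusterDNS` above it, is the core propagation fix: with `:=`, Go declared a new `dnsIP` scoped to the block, so the value written into the `k8sd-config` ConfigMap further down never changed. A minimal standalone illustration of the pitfall (hypothetical stub, not project code):

```go
package main

import "fmt"

// reconcile stands in for component.ReconcileDNSComponent in this illustration.
func reconcile() (string, string, error) {
	return "10.152.183.10", "cluster.local", nil
}

func main() {
	var err error
	dnsIP := ""

	// Buggy form: ':=' declares a new dnsIP that shadows the outer variable.
	if true {
		dnsIP, _, err := reconcile()
		_, _ = dnsIP, err // the inner values vanish when the block ends
	}
	fmt.Println(dnsIP) // prints "": the outer dnsIP was never updated

	// Fixed form: '=' assigns to the outer dnsIP, so later code sees the value.
	if true {
		dnsIP, _, err = reconcile()
	}
	fmt.Println(dnsIP, err) // prints "10.152.183.10 <nil>"
}
```

The same shadowing fix appears again in hooks_bootstrap.go below.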
2 changes: 1 addition & 1 deletion src/k8s/pkg/k8sd/api/cluster_join.go
@@ -25,7 +25,7 @@ func postClusterJoin(m *microcluster.MicroCluster, s *state.State, r *http.Reque
return response.BadRequest(fmt.Errorf("invalid hostname %q: %w", req.Name, err))
}

timeout := utils.TimeoutFromCtx(s.Context, 30*time.Second)
timeout := utils.TimeoutFromCtx(r.Context(), 30*time.Second)

internalToken := types.InternalWorkerNodeToken{}
// Check if token is worker token
29 changes: 13 additions & 16 deletions src/k8s/pkg/k8sd/app/hooks_bootstrap.go
@@ -24,7 +24,6 @@ import (
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/canonical/k8s/pkg/utils/vals"
"github.com/canonical/microcluster/state"
"github.com/mitchellh/mapstructure"
)

// onBootstrap is called after we bootstrap the first cluster node.
@@ -290,8 +289,9 @@ func onBootstrapControlPlane(s *state.State, initConfig map[string]string) error
}
}

var dnsIP = cfg.Kubelet.ClusterDNS
if cfg.DNS.Enabled != nil {
dnsIP, _, err := component.ReconcileDNSComponent(s.Context, snap, vals.Pointer(false), cfg.DNS.Enabled, cfg)
dnsIP, _, err = component.ReconcileDNSComponent(s.Context, snap, vals.Pointer(false), cfg.DNS.Enabled, cfg)
if err != nil {
return fmt.Errorf("failed to reconcile dns: %w", err)
}
@@ -307,23 +307,20 @@ func onBootstrapControlPlane(s *state.State, initConfig map[string]string) error
}); err != nil {
return fmt.Errorf("database transaction to update cluster configuration failed: %w", err)
}
}

var data map[string]string
if err := mapstructure.Decode(types.NodeConfig{
ClusterDNS: dnsIP,
ClusterDomain: cfg.Kubelet.ClusterDomain,
}, &data); err != nil {
return fmt.Errorf("failed to encode node config: %w", err)
}
cmData := types.MapFromNodeConfig(types.NodeConfig{
ClusterDNS: &dnsIP,
ClusterDomain: &cfg.Kubelet.ClusterDomain,
})

client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}
client, err := k8s.NewClient(snap.KubernetesRESTClientGetter(""))
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}

if _, err := client.UpdateConfigMap(s.Context, "kube-system", "k8sd-config", data); err != nil {
return fmt.Errorf("failed to update node configs: %w", err)
}
if _, err := client.UpdateConfigMap(s.Context, "kube-system", "k8sd-config", cmData); err != nil {
return fmt.Errorf("failed to update node configs: %w", err)
}

if cfg.LocalStorage.Enabled != nil {
15 changes: 14 additions & 1 deletion src/k8s/pkg/k8sd/app/hooks_start.go
@@ -2,6 +2,7 @@ package app

import (
"context"
"time"

"github.com/canonical/k8s/pkg/k8sd/controllers"
"github.com/canonical/k8s/pkg/snap"
@@ -13,7 +14,19 @@ func onStart(s *state.State) error {
snap := snap.SnapFromContext(s.Context)

configController := controllers.NewNodeConfigurationController(snap, func(ctx context.Context) *k8s.Client {
return k8s.RetryNewClient(ctx, snap.KubernetesNodeRESTClientGetter("kube-system"))
for {
select {
case <-ctx.Done():
return nil
case <-time.After(3 * time.Second):
}

client, err := k8s.NewClient(snap.KubernetesNodeRESTClientGetter("kube-system"))
if err != nil {
continue
}
return client
}
})
go configController.Run(s.Context)

23 changes: 13 additions & 10 deletions src/k8s/pkg/k8sd/controllers/node_configuration.go
@@ -10,7 +10,6 @@ import (
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
"github.com/canonical/k8s/pkg/utils/k8s"
"github.com/mitchellh/mapstructure"
v1 "k8s.io/api/core/v1"
)

@@ -33,7 +32,6 @@ func (c *NodeConfigurationController) Run(ctx context.Context) {
case <-ctx.Done():
return
case <-time.After(3 * time.Second):
default:
}

if err := client.WatchConfigMap(ctx, "kube-system", "k8sd-config", func(configMap *v1.ConfigMap) error { return c.reconcile(ctx, configMap) }); err != nil {
@@ -45,22 +43,27 @@ func (c *NodeConfigurationController) Run(ctx context.Context) {
}

func (c *NodeConfigurationController) reconcile(ctx context.Context, configMap *v1.ConfigMap) error {
var nodeConfig types.NodeConfig
if err := mapstructure.Decode(configMap.Data, &nodeConfig); err != nil {
return fmt.Errorf("failed to decode node config: %w", err)
}
nodeConfig := types.NodeConfigFromMap(configMap.Data)

kubeletUpdateMap := make(map[string]string)
var kubeletDeleteList []string

if nodeConfig.ClusterDNS != "" {
kubeletUpdateMap["--cluster-dns"] = nodeConfig.ClusterDNS
if nodeConfig.ClusterDNS != nil && *nodeConfig.ClusterDNS != "" {
kubeletUpdateMap["--cluster-dns"] = *nodeConfig.ClusterDNS
} else {
kubeletDeleteList = append(kubeletDeleteList, "--cluster-dns")
}

if nodeConfig.ClusterDomain != "" {
kubeletUpdateMap["--cluster-domain"] = nodeConfig.ClusterDomain
if nodeConfig.ClusterDomain != nil && *nodeConfig.ClusterDomain != "" {
kubeletUpdateMap["--cluster-domain"] = *nodeConfig.ClusterDomain
} else {
kubeletUpdateMap["--cluster-domain"] = "cluster.local"
}

if nodeConfig.CloudProvider != nil && *nodeConfig.CloudProvider != "" {
kubeletUpdateMap["--cloud-provider"] = *nodeConfig.CloudProvider
} else {
kubeletDeleteList = append(kubeletDeleteList, "--cloud-provider")
}

mustRestartKubelet, err := snaputil.UpdateServiceArguments(c.snap, "kubelet", kubeletUpdateMap, kubeletDeleteList)
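The controller now consumes `types.NodeConfigFromMap` and pointer-valued `NodeConfig` fields (mirrored by `types.MapFromNodeConfig` in the API and bootstrap paths above), which lets it distinguish a key that is absent from the ConfigMap from one that is present but empty. The helpers themselves are outside this diff; a sketch of the shape they imply, where the ConfigMap key names are assumptions rather than the real ones:

```go
package types

// NodeConfig mirrors the kube-system/k8sd-config ConfigMap. Pointer fields let
// callers tell "not set" apart from "set to the empty string".
type NodeConfig struct {
	ClusterDNS    *string
	ClusterDomain *string
	CloudProvider *string
}

// MapFromNodeConfig flattens a NodeConfig into ConfigMap data, skipping nil fields.
func MapFromNodeConfig(c NodeConfig) map[string]string {
	m := map[string]string{}
	if c.ClusterDNS != nil {
		m["cluster-dns"] = *c.ClusterDNS
	}
	if c.ClusterDomain != nil {
		m["cluster-domain"] = *c.ClusterDomain
	}
	if c.CloudProvider != nil {
		m["cloud-provider"] = *c.CloudProvider
	}
	return m
}

// NodeConfigFromMap is the inverse, reading only the keys that are present.
func NodeConfigFromMap(m map[string]string) NodeConfig {
	var c NodeConfig
	if v, ok := m["cluster-dns"]; ok {
		c.ClusterDNS = &v
	}
	if v, ok := m["cluster-domain"]; ok {
		c.ClusterDomain = &v
	}
	if v, ok := m["cloud-provider"]; ok {
		c.CloudProvider = &v
	}
	return c
}
```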
(The remaining changed files in this commit are not shown in this view.)
