From 56356fa4b1b5ae1fb6e00db497f5f9249c35566f Mon Sep 17 00:00:00 2001 From: Alina Militaru <41362174+asincu@users.noreply.github.com> Date: Wed, 20 Sep 2023 12:10:19 -0700 Subject: [PATCH 1/2] Clean up network policies watches --- .../logstorage/elastic/elastic_controller.go | 6 -- .../esmetrics/esmetrics_controller.go | 60 +++++++++++++++---- .../esmetrics/esmetrics_controller_test.go | 24 ++++++-- .../kubecontrollers/es_kube_controllers.go | 41 +++++++++++++ .../es_kube_controllers_test.go | 18 ++++-- .../logstorage/linseed/linseed_controller.go | 5 +- 6 files changed, 124 insertions(+), 30 deletions(-) diff --git a/pkg/controller/logstorage/elastic/elastic_controller.go b/pkg/controller/logstorage/elastic/elastic_controller.go index 7dd4708964..8a40207238 100644 --- a/pkg/controller/logstorage/elastic/elastic_controller.go +++ b/pkg/controller/logstorage/elastic/elastic_controller.go @@ -38,10 +38,8 @@ import ( relasticsearch "github.com/tigera/operator/pkg/render/common/elasticsearch" "github.com/tigera/operator/pkg/render/common/networkpolicy" rsecret "github.com/tigera/operator/pkg/render/common/secret" - "github.com/tigera/operator/pkg/render/kubecontrollers" "github.com/tigera/operator/pkg/render/logstorage/esgateway" "github.com/tigera/operator/pkg/render/logstorage/esmetrics" - "github.com/tigera/operator/pkg/render/logstorage/linseed" "github.com/tigera/operator/pkg/render/monitor" "github.com/tigera/operator/pkg/tls/certificatemanagement" apps "k8s.io/api/apps/v1" @@ -151,10 +149,6 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { {Name: render.ElasticsearchInternalPolicyName, Namespace: render.ElasticsearchNamespace}, {Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.ElasticsearchNamespace}, {Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.KibanaNamespace}, - {Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace}, - {Name: esmetrics.ElasticsearchMetricsPolicyName, Namespace: render.ElasticsearchNamespace}, - {Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace}, - {Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace}, }) // Watch for changes in storage classes, as new storage classes may be made available for LogStorage. diff --git a/pkg/controller/logstorage/esmetrics/esmetrics_controller.go b/pkg/controller/logstorage/esmetrics/esmetrics_controller.go index aba153e2c9..038fb2001a 100644 --- a/pkg/controller/logstorage/esmetrics/esmetrics_controller.go +++ b/pkg/controller/logstorage/esmetrics/esmetrics_controller.go @@ -17,6 +17,13 @@ package esmetrics import ( "context" "fmt" + "time" + + v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" + "github.com/tigera/operator/pkg/render/common/networkpolicy" + "github.com/tigera/operator/pkg/render/logstorage/linseed" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" operatorv1 "github.com/tigera/operator/api/v1" "github.com/tigera/operator/pkg/common" @@ -49,13 +56,14 @@ const ( ) type ESMetricsSubController struct { - client client.Client - scheme *runtime.Scheme - status status.StatusManager - provider operatorv1.Provider - clusterDomain string - usePSP bool - multiTenant bool + client client.Client + scheme *runtime.Scheme + status status.StatusManager + provider operatorv1.Provider + clusterDomain string + usePSP bool + multiTenant bool + tierWatchReady *utils.ReadyFlag } func Add(mgr manager.Manager, opts options.AddOptions) error { @@ -70,11 +78,12 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { } r := &ESMetricsSubController{ - client: mgr.GetClient(), - scheme: mgr.GetScheme(), - status: status.New(mgr.GetClient(), tigeraStatusName, opts.KubernetesVersion), - clusterDomain: opts.ClusterDomain, - provider: opts.DetectedProvider, + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + status: status.New(mgr.GetClient(), tigeraStatusName, opts.KubernetesVersion), + clusterDomain: opts.ClusterDomain, + provider: opts.DetectedProvider, + tierWatchReady: &utils.ReadyFlag{}, } r.status.Run(opts.ShutdownContext) @@ -106,6 +115,16 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { } } + k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig()) + if err != nil { + return fmt.Errorf("log-storage-esmetrics-controller failed to establish a connection to k8s: %w", err) + } + + go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady) + go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{ + {Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace}, + }) + return nil } @@ -144,6 +163,23 @@ func (r *ESMetricsSubController) Reconcile(ctx context.Context, request reconcil return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil } + // Validate that the tier watch is ready before querying the tier to ensure we utilize the cache. + if !r.tierWatchReady.IsReady() { + r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for Tier watch to be established", nil, reqLogger) + return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil + } + + // Ensure the allow-tigera tier exists, before rendering any network policies within it. + if err := r.client.Get(ctx, client.ObjectKey{Name: networkpolicy.TigeraComponentTierName}, &v3.Tier{}); err != nil { + if errors.IsNotFound(err) { + r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for allow-tigera tier to be created", err, reqLogger) + return reconcile.Result{RequeueAfter: 10 * time.Second}, nil + } else { + r.status.SetDegraded(operatorv1.ResourceReadError, "Error querying allow-tigera tier", err, reqLogger) + return reconcile.Result{}, err + } + } + esMetricsSecret, err := utils.GetSecret(context.Background(), r.client, esmetrics.ElasticsearchMetricsSecret, common.OperatorNamespace()) if err != nil { r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to retrieve Elasticsearch metrics user secret.", err, reqLogger) diff --git a/pkg/controller/logstorage/esmetrics/esmetrics_controller_test.go b/pkg/controller/logstorage/esmetrics/esmetrics_controller_test.go index d9f12030ad..f04b0cd287 100644 --- a/pkg/controller/logstorage/esmetrics/esmetrics_controller_test.go +++ b/pkg/controller/logstorage/esmetrics/esmetrics_controller_test.go @@ -17,6 +17,8 @@ package esmetrics import ( "context" + v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" @@ -50,6 +52,7 @@ func NewESMetricsControllerWithShims( provider operatorv1.Provider, clusterDomain string, multiTenant bool, + readyFlag *utils.ReadyFlag, ) (*ESMetricsSubController, error) { opts := options.AddOptions{ @@ -60,11 +63,12 @@ func NewESMetricsControllerWithShims( } r := &ESMetricsSubController{ - client: cli, - scheme: scheme, - status: status, - clusterDomain: opts.ClusterDomain, - multiTenant: opts.MultiTenant, + client: cli, + scheme: scheme, + status: status, + clusterDomain: opts.ClusterDomain, + multiTenant: opts.MultiTenant, + tierWatchReady: readyFlag, } r.status.Run(opts.ShutdownContext) return r, nil @@ -77,6 +81,7 @@ var _ = Describe("LogStorage Linseed controller", func() { scheme *runtime.Scheme ctx context.Context r *ESMetricsSubController + readyFlag *utils.ReadyFlag ) BeforeEach(func() { scheme = runtime.NewScheme() @@ -98,8 +103,15 @@ var _ = Describe("LogStorage Linseed controller", func() { mockStatus.On("ReadyToMonitor") mockStatus.On("ClearDegraded") + readyFlag = &utils.ReadyFlag{} + readyFlag.MarkAsReady() + + // Create the allow-tigera Tier, since the controller blocks on its existence. + tier := &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}} + Expect(cli.Create(ctx, tier)).ShouldNot(HaveOccurred()) + var err error - r, err = NewESMetricsControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false) + r, err = NewESMetricsControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false, readyFlag) Expect(err).ShouldNot(HaveOccurred()) }) diff --git a/pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go b/pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go index 4f7a74cb62..6a98cc42d9 100644 --- a/pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go +++ b/pkg/controller/logstorage/kubecontrollers/es_kube_controllers.go @@ -17,6 +17,13 @@ package kubecontrollers import ( "context" "fmt" + "time" + + v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" + "github.com/tigera/operator/pkg/render/common/networkpolicy" + "github.com/tigera/operator/pkg/render/logstorage/esgateway" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" operatorv1 "github.com/tigera/operator/api/v1" @@ -54,6 +61,7 @@ type ESKubeControllersController struct { clusterDomain string usePSP bool elasticExternal bool + tierWatchReady *utils.ReadyFlag } func Add(mgr manager.Manager, opts options.AddOptions) error { @@ -72,6 +80,7 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { clusterDomain: opts.ClusterDomain, status: status.New(mgr.GetClient(), "log-storage-kubecontrollers", opts.KubernetesVersion), elasticExternal: opts.ElasticExternal, + tierWatchReady: &utils.ReadyFlag{}, } r.status.Run(opts.ShutdownContext) @@ -118,6 +127,9 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil { return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err) } + if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil { + return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err) + } if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, common.CalicoNamespace, &handler.EnqueueRequestForObject{}); err != nil { return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err) } @@ -129,6 +141,18 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { return fmt.Errorf("log-storage-kubecontrollers failed to create periodic reconcile watch: %w", err) } + k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig()) + if err != nil { + return fmt.Errorf("log-storage-kubecontrollers failed to establish a connection to k8s: %w", err) + } + + // Start goroutines to establish watches against projectcalico.org/v3 resources. + go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady) + go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{ + {Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace}, + {Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace}, + }) + return nil } @@ -170,6 +194,23 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request rec return reconcile.Result{}, err } + // Validate that the tier watch is ready before querying the tier to ensure we utilize the cache. + if !r.tierWatchReady.IsReady() { + r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for Tier watch to be established", nil, reqLogger) + return reconcile.Result{RequeueAfter: 10 * time.Second}, nil + } + + // Ensure the allow-tigera tier exists, before rendering any network policies within it. + if err := r.client.Get(ctx, client.ObjectKey{Name: networkpolicy.TigeraComponentTierName}, &v3.Tier{}); err != nil { + if errors.IsNotFound(err) { + r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for allow-tigera tier to be created", err, reqLogger) + return reconcile.Result{RequeueAfter: 10 * time.Second}, nil + } else { + r.status.SetDegraded(operatorv1.ResourceReadError, "Error querying allow-tigera tier", err, reqLogger) + return reconcile.Result{}, err + } + } + managementClusterConnection, err := utils.GetManagementClusterConnection(ctx, r.client) if err != nil { r.status.SetDegraded(operatorv1.ResourceReadError, "Error reading ManagementClusterConnection", err, reqLogger) diff --git a/pkg/controller/logstorage/kubecontrollers/es_kube_controllers_test.go b/pkg/controller/logstorage/kubecontrollers/es_kube_controllers_test.go index b5f4684ce5..f973a003e8 100644 --- a/pkg/controller/logstorage/kubecontrollers/es_kube_controllers_test.go +++ b/pkg/controller/logstorage/kubecontrollers/es_kube_controllers_test.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" + controllerruntime "sigs.k8s.io/controller-runtime" . "github.com/onsi/ginkgo" @@ -66,6 +68,7 @@ func NewControllerWithShims( provider operatorv1.Provider, clusterDomain string, multiTenant bool, + tierWatchReady *utils.ReadyFlag, ) (*ESKubeControllersController, error) { opts := options.AddOptions{ DetectedProvider: provider, @@ -75,10 +78,11 @@ func NewControllerWithShims( } r := &ESKubeControllersController{ - client: cli, - scheme: scheme, - status: status, - clusterDomain: opts.ClusterDomain, + client: cli, + scheme: scheme, + status: status, + clusterDomain: opts.ClusterDomain, + tierWatchReady: tierWatchReady, } r.status.Run(opts.ShutdownContext) return r, nil @@ -176,8 +180,12 @@ var _ = Describe("LogStorage ES kube-controllers controller", func() { Expect(cli.Create(ctx, bundle.ConfigMap(common.CalicoNamespace))).ShouldNot(HaveOccurred()) // Create the reconciler for the tests. - r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false) + r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false, readyFlag) Expect(err).ShouldNot(HaveOccurred()) + + // Create the allow-tigera Tier, since the controller blocks on its existence. + tier := &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}} + Expect(cli.Create(ctx, tier)).ShouldNot(HaveOccurred()) }) It("should wait for the cluster CA to be provisioned", func() { diff --git a/pkg/controller/logstorage/linseed/linseed_controller.go b/pkg/controller/logstorage/linseed/linseed_controller.go index 0c9f381ea3..6def6a69c2 100644 --- a/pkg/controller/logstorage/linseed/linseed_controller.go +++ b/pkg/controller/logstorage/linseed/linseed_controller.go @@ -173,10 +173,13 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig()) if err != nil { - return fmt.Errorf("log-storage-elastic-controller failed to establish a connection to k8s: %w", err) + return fmt.Errorf("log-storage-linseed-controller failed to establish a connection to k8s: %w", err) } go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady) + go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{ + {Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace}, + }) go utils.WaitToAddResourceWatch(c, k8sClient, log, r.dpiAPIReady, []client.Object{&v3.DeepPacketInspection{TypeMeta: metav1.TypeMeta{Kind: v3.KindDeepPacketInspection}}}) // Perform periodic reconciliation. This acts as a backstop to catch reconcile issues, From 28b7d751bcebf3a414c07d523999b1b012d69024 Mon Sep 17 00:00:00 2001 From: Alina Militaru <41362174+asincu@users.noreply.github.com> Date: Fri, 15 Dec 2023 13:51:17 -0800 Subject: [PATCH 2/2] [CODE REVIEW] Watch linseed policy for multi-tenant --- pkg/controller/logstorage/esmetrics/esmetrics_controller.go | 3 +-- pkg/controller/logstorage/linseed/linseed_controller.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/controller/logstorage/esmetrics/esmetrics_controller.go b/pkg/controller/logstorage/esmetrics/esmetrics_controller.go index 038fb2001a..cc4115f786 100644 --- a/pkg/controller/logstorage/esmetrics/esmetrics_controller.go +++ b/pkg/controller/logstorage/esmetrics/esmetrics_controller.go @@ -21,7 +21,6 @@ import ( v3 "github.com/tigera/api/pkg/apis/projectcalico/v3" "github.com/tigera/operator/pkg/render/common/networkpolicy" - "github.com/tigera/operator/pkg/render/logstorage/linseed" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" @@ -122,7 +121,7 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady) go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{ - {Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace}, + {Name: esmetrics.ElasticsearchMetricsPolicyName, Namespace: render.ElasticsearchNamespace}, }) return nil diff --git a/pkg/controller/logstorage/linseed/linseed_controller.go b/pkg/controller/logstorage/linseed/linseed_controller.go index 6def6a69c2..b614b980aa 100644 --- a/pkg/controller/logstorage/linseed/linseed_controller.go +++ b/pkg/controller/logstorage/linseed/linseed_controller.go @@ -178,7 +178,7 @@ func Add(mgr manager.Manager, opts options.AddOptions) error { go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady) go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{ - {Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace}, + {Name: linseed.PolicyName, Namespace: helper.InstallNamespace()}, }) go utils.WaitToAddResourceWatch(c, k8sClient, log, r.dpiAPIReady, []client.Object{&v3.DeepPacketInspection{TypeMeta: metav1.TypeMeta{Kind: v3.KindDeepPacketInspection}}})