Clean up network policies watches #2888

Merged · 2 commits · Dec 15, 2023
Changes from 1 commit
6 changes: 0 additions & 6 deletions pkg/controller/logstorage/elastic/elastic_controller.go
@@ -38,10 +38,8 @@ import (
relasticsearch "github.com/tigera/operator/pkg/render/common/elasticsearch"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
rsecret "github.com/tigera/operator/pkg/render/common/secret"
"github.com/tigera/operator/pkg/render/kubecontrollers"
"github.com/tigera/operator/pkg/render/logstorage/esgateway"
"github.com/tigera/operator/pkg/render/logstorage/esmetrics"
"github.com/tigera/operator/pkg/render/logstorage/linseed"
"github.com/tigera/operator/pkg/render/monitor"
"github.com/tigera/operator/pkg/tls/certificatemanagement"
apps "k8s.io/api/apps/v1"
@@ -151,10 +149,6 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
{Name: render.ElasticsearchInternalPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: networkpolicy.TigeraComponentDefaultDenyPolicyName, Namespace: render.KibanaNamespace},
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
{Name: esmetrics.ElasticsearchMetricsPolicyName, Namespace: render.ElasticsearchNamespace},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
{Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace},
})

// Watch for changes in storage classes, as new storage classes may be made available for LogStorage.
60 changes: 48 additions & 12 deletions pkg/controller/logstorage/esmetrics/esmetrics_controller.go
@@ -17,6 +17,13 @@ package esmetrics
import (
"context"
"fmt"
"time"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"github.com/tigera/operator/pkg/render/logstorage/linseed"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

operatorv1 "github.com/tigera/operator/api/v1"
"github.com/tigera/operator/pkg/common"
@@ -49,13 +56,14 @@ const (
)

type ESMetricsSubController struct {
client client.Client
scheme *runtime.Scheme
status status.StatusManager
provider operatorv1.Provider
clusterDomain string
usePSP bool
multiTenant bool
client client.Client
scheme *runtime.Scheme
status status.StatusManager
provider operatorv1.Provider
clusterDomain string
usePSP bool
multiTenant bool
tierWatchReady *utils.ReadyFlag
}

func Add(mgr manager.Manager, opts options.AddOptions) error {
@@ -70,11 +78,12 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
}

r := &ESMetricsSubController{
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
status: status.New(mgr.GetClient(), tigeraStatusName, opts.KubernetesVersion),
clusterDomain: opts.ClusterDomain,
provider: opts.DetectedProvider,
client: mgr.GetClient(),
scheme: mgr.GetScheme(),
status: status.New(mgr.GetClient(), tigeraStatusName, opts.KubernetesVersion),
clusterDomain: opts.ClusterDomain,
provider: opts.DetectedProvider,
tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)

@@ -106,6 +115,16 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
}
}

k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("log-storage-esmetrics-controller failed to establish a connection to k8s: %w", err)
}

go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace},
})

return nil
}

@@ -144,6 +163,23 @@ func (r *ESMetricsSubController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil
}

// Validate that the tier watch is ready before querying the tier to ensure we utilize the cache.
if !r.tierWatchReady.IsReady() {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for Tier watch to be established", nil, reqLogger)
return reconcile.Result{RequeueAfter: utils.StandardRetry}, nil
}

// Ensure the allow-tigera tier exists, before rendering any network policies within it.
if err := r.client.Get(ctx, client.ObjectKey{Name: networkpolicy.TigeraComponentTierName}, &v3.Tier{}); err != nil {
if errors.IsNotFound(err) {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for allow-tigera tier to be created", err, reqLogger)
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
} else {
r.status.SetDegraded(operatorv1.ResourceReadError, "Error querying allow-tigera tier", err, reqLogger)
return reconcile.Result{}, err
}
}

esMetricsSecret, err := utils.GetSecret(context.Background(), r.client, esmetrics.ElasticsearchMetricsSecret, common.OperatorNamespace())
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Failed to retrieve Elasticsearch metrics user secret.", err, reqLogger)
@@ -17,6 +17,8 @@ package esmetrics
import (
"context"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
@@ -50,6 +52,7 @@ func NewESMetricsControllerWithShims(
provider operatorv1.Provider,
clusterDomain string,
multiTenant bool,
readyFlag *utils.ReadyFlag,
) (*ESMetricsSubController, error) {

opts := options.AddOptions{
@@ -60,11 +63,12 @@ }
}

r := &ESMetricsSubController{
client: cli,
scheme: scheme,
status: status,
clusterDomain: opts.ClusterDomain,
multiTenant: opts.MultiTenant,
client: cli,
scheme: scheme,
status: status,
clusterDomain: opts.ClusterDomain,
multiTenant: opts.MultiTenant,
tierWatchReady: readyFlag,
}
r.status.Run(opts.ShutdownContext)
return r, nil
@@ -77,6 +81,7 @@ var _ = Describe("LogStorage Linseed controller", func() {
scheme *runtime.Scheme
ctx context.Context
r *ESMetricsSubController
readyFlag *utils.ReadyFlag
)
BeforeEach(func() {
scheme = runtime.NewScheme()
@@ -98,8 +103,15 @@
mockStatus.On("ReadyToMonitor")
mockStatus.On("ClearDegraded")

readyFlag = &utils.ReadyFlag{}
readyFlag.MarkAsReady()

// Create the allow-tigera Tier, since the controller blocks on its existence.
tier := &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}}
Expect(cli.Create(ctx, tier)).ShouldNot(HaveOccurred())

var err error
r, err = NewESMetricsControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false)
r, err = NewESMetricsControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false, readyFlag)
Expect(err).ShouldNot(HaveOccurred())
})

@@ -17,6 +17,13 @@ package kubecontrollers
import (
"context"
"fmt"
"time"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"
"github.com/tigera/operator/pkg/render/common/networkpolicy"
"github.com/tigera/operator/pkg/render/logstorage/esgateway"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

operatorv1 "github.com/tigera/operator/api/v1"

@@ -54,6 +61,7 @@ type ESKubeControllersController struct {
clusterDomain string
usePSP bool
elasticExternal bool
tierWatchReady *utils.ReadyFlag
}

func Add(mgr manager.Manager, opts options.AddOptions) error {
@@ -72,6 +80,7 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
clusterDomain: opts.ClusterDomain,
status: status.New(mgr.GetClient(), "log-storage-kubecontrollers", opts.KubernetesVersion),
elasticExternal: opts.ElasticExternal,
tierWatchReady: &utils.ReadyFlag{},
}
r.status.Run(opts.ShutdownContext)

@@ -118,6 +127,9 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
if err := utils.AddServiceWatch(c, render.ElasticsearchServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddServiceWatch(c, esgateway.ServiceName, render.ElasticsearchNamespace); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
if err := utils.AddConfigMapWatch(c, certificatemanagement.TrustedCertConfigMapName, common.CalicoNamespace, &handler.EnqueueRequestForObject{}); err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to watch the Service resource: %w", err)
}
@@ -129,6 +141,18 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {
return fmt.Errorf("log-storage-kubecontrollers failed to create periodic reconcile watch: %w", err)
}

k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("log-storage-kubecontrollers failed to establish a connection to k8s: %w", err)
}

// Start goroutines to establish watches against projectcalico.org/v3 resources.
go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: esgateway.PolicyName, Namespace: render.ElasticsearchNamespace},
{Name: kubecontrollers.EsKubeControllerNetworkPolicyName, Namespace: common.CalicoNamespace},
})

return nil
}

@@ -170,6 +194,23 @@ func (r *ESKubeControllersController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, err
}

// Validate that the tier watch is ready before querying the tier to ensure we utilize the cache.
if !r.tierWatchReady.IsReady() {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for Tier watch to be established", nil, reqLogger)
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}

// Ensure the allow-tigera tier exists, before rendering any network policies within it.
if err := r.client.Get(ctx, client.ObjectKey{Name: networkpolicy.TigeraComponentTierName}, &v3.Tier{}); err != nil {
if errors.IsNotFound(err) {
r.status.SetDegraded(operatorv1.ResourceNotReady, "Waiting for allow-tigera tier to be created", err, reqLogger)
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
} else {
r.status.SetDegraded(operatorv1.ResourceReadError, "Error querying allow-tigera tier", err, reqLogger)
return reconcile.Result{}, err
}
}

managementClusterConnection, err := utils.GetManagementClusterConnection(ctx, r.client)
if err != nil {
r.status.SetDegraded(operatorv1.ResourceReadError, "Error reading ManagementClusterConnection", err, reqLogger)
@@ -18,6 +18,8 @@ import (
"context"
"fmt"

v3 "github.com/tigera/api/pkg/apis/projectcalico/v3"

controllerruntime "sigs.k8s.io/controller-runtime"

. "github.com/onsi/ginkgo"
@@ -66,6 +68,7 @@ func NewControllerWithShims(
provider operatorv1.Provider,
clusterDomain string,
multiTenant bool,
tierWatchReady *utils.ReadyFlag,
) (*ESKubeControllersController, error) {
opts := options.AddOptions{
DetectedProvider: provider,
@@ -75,10 +78,11 @@ }
}

r := &ESKubeControllersController{
client: cli,
scheme: scheme,
status: status,
clusterDomain: opts.ClusterDomain,
client: cli,
scheme: scheme,
status: status,
clusterDomain: opts.ClusterDomain,
tierWatchReady: tierWatchReady,
}
r.status.Run(opts.ShutdownContext)
return r, nil
@@ -176,8 +180,12 @@ var _ = Describe("LogStorage ES kube-controllers controller", func() {
Expect(cli.Create(ctx, bundle.ConfigMap(common.CalicoNamespace))).ShouldNot(HaveOccurred())

// Create the reconciler for the tests.
r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false)
r, err = NewControllerWithShims(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain, false, readyFlag)
Expect(err).ShouldNot(HaveOccurred())

// Create the allow-tigera Tier, since the controller blocks on its existence.
tier := &v3.Tier{ObjectMeta: metav1.ObjectMeta{Name: "allow-tigera"}}
Expect(cli.Create(ctx, tier)).ShouldNot(HaveOccurred())
})

It("should wait for the cluster CA to be provisioned", func() {
5 changes: 4 additions & 1 deletion pkg/controller/logstorage/linseed/linseed_controller.go
@@ -173,10 +173,13 @@ func Add(mgr manager.Manager, opts options.AddOptions) error {

k8sClient, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return fmt.Errorf("log-storage-elastic-controller failed to establish a connection to k8s: %w", err)
return fmt.Errorf("log-storage-linseed-controller failed to establish a connection to k8s: %w", err)
}

go utils.WaitToAddTierWatch(networkpolicy.TigeraComponentTierName, c, k8sClient, log, r.tierWatchReady)
go utils.WaitToAddNetworkPolicyWatches(c, k8sClient, log, []types.NamespacedName{
{Name: linseed.PolicyName, Namespace: render.ElasticsearchNamespace},
@caseydavenport (Member) commented on Dec 15, 2023:
There are a few types of watches:

  • Watches that are always in the Install namespace (helper.InstallNamespace())
  • Watches that are always in the truth namespace (helper.TruthNamespace())
  • Watches that are in a specific namespace for single-tenant clusters and in all namespaces for multi-tenant clusters (like this one)

We actually define the helper above such that for multi-tenant clusters, helper.InstallNamespace returns "", so I think you can just use that here.

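A minimal sketch of the shape this suggestion implies. It is hypothetical: the namespaceHelper interface and policyWatches function below are illustrative stand-ins, not code from this PR; only linseed.PolicyName and types.NamespacedName come from the diff above.

package linseedwatch

import (
	"github.com/tigera/operator/pkg/render/logstorage/linseed"
	"k8s.io/apimachinery/pkg/types"
)

// namespaceHelper stands in for the helper described in the comment:
// InstallNamespace returns the fixed install namespace on single-tenant
// clusters and "" (meaning all namespaces) on multi-tenant clusters.
type namespaceHelper interface {
	InstallNamespace() string
	TruthNamespace() string
}

// policyWatches builds a single watch list that works in both modes: with
// an empty namespace, the watch matches the policy name in every namespace.
func policyWatches(helper namespaceHelper) []types.NamespacedName {
	return []types.NamespacedName{
		{Name: linseed.PolicyName, Namespace: helper.InstallNamespace()},
	}
}

With that shape, the single-tenant call site stays as written, and multi-tenant clusters get the all-namespaces watch without a separate code path.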
})
go utils.WaitToAddResourceWatch(c, k8sClient, log, r.dpiAPIReady, []client.Object{&v3.DeepPacketInspection{TypeMeta: metav1.TypeMeta{Kind: v3.KindDeepPacketInspection}}})

// Perform periodic reconciliation. This acts as a backstop to catch reconcile issues,
Expand Down