NETOBSERV-1931: introduce flowcollector validation webhook
Start by validating agent features
jotak committed Oct 16, 2024
1 parent 737a699 commit 0a5e298
Showing 34 changed files with 837 additions and 193 deletions.
1 change: 1 addition & 0 deletions PROJECT
@@ -20,6 +20,7 @@ resources:
version: v1beta2
webhooks:
conversion: true
validation: true
webhookVersion: v1
- controller: true
group: core
72 changes: 70 additions & 2 deletions apis/flowcollector/v1beta2/flowcollector_webhook.go
@@ -16,12 +16,30 @@ limitations under the License.

package v1beta2

import ctrl "sigs.k8s.io/controller-runtime"
import (
"context"
"fmt"
"slices"

"k8s.io/apimachinery/pkg/api/errors"
runtime "k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/netobserv/network-observability-operator/pkg/cluster"
)

// +kubebuilder:webhook:verbs=create;update,path=/validate-flows-netobserv-io-v1beta2-flowcollector,mutating=false,failurePolicy=fail,sideEffects=None,groups=flows.netobserv.io,resources=flowcollectors,versions=v1beta2,name=flowcollectorconversionwebhook.netobserv.io,admissionReviewVersions=v1
var (
log = logf.Log.WithName("flowcollector-resource")
CurrentClusterInfo *cluster.Info
)

// +kubebuilder:webhook:verbs=create;update,path=/validate-netobserv-io-v1beta2-flowcollector,mutating=false,failurePolicy=fail,groups=netobserv.io,resources=flowcollectors,versions=v1beta2,name=flowcollectorconversionwebhook.netobserv.io,sideEffects=None,admissionReviewVersions=v1
func (r *FlowCollector) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(r).
WithValidator(r).
Complete()
}

@@ -30,3 +48,53 @@ func (r *FlowCollector) SetupWebhookWithManager(mgr ctrl.Manager) error {
// https://book.kubebuilder.io/multiversion-tutorial/conversion-concepts.html
func (*FlowCollector) Hub() {}
func (*FlowCollectorList) Hub() {}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *FlowCollector) ValidateCreate(ctx context.Context, newObj runtime.Object) (admission.Warnings, error) {
log.Info("validate create", "name", r.Name)
fc, ok := newObj.(*FlowCollector)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected a FlowCollector but got a %T", newObj))
}
return r.validate(ctx, fc)
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *FlowCollector) ValidateUpdate(ctx context.Context, _, newObj runtime.Object) (admission.Warnings, error) {
log.Info("validate update", "name", r.Name)
fc, ok := newObj.(*FlowCollector)
if !ok {
return nil, errors.NewBadRequest(fmt.Sprintf("expected a FlowCollector but got a %T", newObj))
}
return r.validate(ctx, fc)
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *FlowCollector) ValidateDelete(_ context.Context, _ runtime.Object) (admission.Warnings, error) {
log.Info("validate delete", "name", r.Name)
return nil, nil
}

func (r *FlowCollector) validate(_ context.Context, fc *FlowCollector) (admission.Warnings, error) {
var warnings admission.Warnings
if slices.Contains(fc.Spec.Agent.EBPF.Features, NetworkEvents) {
// Make sure required version of ocp is installed
if CurrentClusterInfo != nil && CurrentClusterInfo.IsOpenShift() {
b, err := CurrentClusterInfo.OpenShiftVersionIsAtLeast("4.18.0")
if err != nil {
warnings = append(warnings, fmt.Sprintf("Could not detect OpenShift cluster version: %s", err.Error()))
} else if !b {
warnings = append(warnings, fmt.Sprintf("The NetworkEvents feature requires OpenShift 4.18 or above (version detected: %s)", CurrentClusterInfo.GetOpenShiftVersion()))
}
} else {
warnings = append(warnings, "The NetworkEvents feature is only supported with OpenShift")
}
if !fc.Spec.Agent.EBPF.Privileged {
warnings = append(warnings, "The NetworkEvents feature requires eBPF Agent to run in privileged mode")
}
}
if slices.Contains(fc.Spec.Agent.EBPF.Features, PacketDrop) && !fc.Spec.Agent.EBPF.Privileged {
warnings = append(warnings, "The PacketDrop feature requires eBPF Agent to run in privileged mode")
}
return warnings, nil
}
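
A minimal sketch (not part of this commit) of how the new validator can be exercised in a unit test; the FlowCollector/EBPF field names mirror the v1beta2 API shown above, while the exact struct nesting is an assumption:

package v1beta2

import (
	"context"
	"testing"
)

func TestValidateWarnsOnUnprivilegedPacketDrop(t *testing.T) {
	// PacketDrop requested without privileged mode: validate() above should
	// return a single warning and no error.
	fc := &FlowCollector{
		Spec: FlowCollectorSpec{
			Agent: FlowCollectorAgent{
				EBPF: FlowCollectorEBPF{
					Privileged: false,
					Features:   []AgentFeature{PacketDrop},
				},
			},
		},
	}
	warnings, err := fc.ValidateCreate(context.Background(), fc)
	if err != nil {
		t.Fatalf("expected admission to succeed with warnings only, got: %v", err)
	}
	if len(warnings) != 1 {
		t.Errorf("expected 1 warning, got %d: %v", len(warnings), warnings)
	}
}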
2 changes: 1 addition & 1 deletion apis/flowcollector/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -1373,7 +1373,7 @@ spec:
generateName: flowcollectorconversionwebhook.netobserv.io
rules:
- apiGroups:
- netobserv.io
- flows.netobserv.io
apiVersions:
- v1beta2
operations:
Expand All @@ -1384,7 +1384,7 @@ spec:
sideEffects: None
targetPort: 9443
type: ValidatingAdmissionWebhook
webhookPath: /validate-netobserv-io-v1beta2-flowcollector
webhookPath: /validate-flows-netobserv-io-v1beta2-flowcollector
- admissionReviewVersions:
- v1
containerPort: 443
4 changes: 2 additions & 2 deletions config/webhook/manifests.yaml
@@ -12,12 +12,12 @@ webhooks:
service:
name: webhook-service
namespace: system
path: /validate-netobserv-io-v1beta2-flowcollector
path: /validate-flows-netobserv-io-v1beta2-flowcollector
failurePolicy: Fail
name: flowcollectorconversionwebhook.netobserv.io
rules:
- apiGroups:
- netobserv.io
- flows.netobserv.io
apiVersions:
- v1beta2
operations:
2 changes: 1 addition & 1 deletion controllers/consoleplugin/consoleplugin_objects.go
@@ -380,7 +380,7 @@ func (b *builder) getPromConfig(ctx context.Context) cfg.PrometheusConfig {
config.Timeout = api.Duration{Duration: b.desired.Prometheus.Querier.Timeout.Duration}
}
if b.desired.Prometheus.Querier.Mode == "" || b.desired.Prometheus.Querier.Mode == flowslatest.PromModeAuto {
if b.info.UseOpenShiftSCC /* aka IsOpenShift */ {
if b.info.ClusterInfo.IsOpenShift() {
config.URL = "https://thanos-querier.openshift-monitoring.svc:9091/" // requires cluster-monitoringv-view cluster role
config.DevURL = "https://thanos-querier.openshift-monitoring.svc:9092/" // restricted to a particular namespace
config.ForwardUserToken = true
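
The webhook above and the reconciler changes below consistently replace the former UseOpenShiftSCC flag and AvailableAPIs helper with a single ClusterInfo handle from pkg/cluster. That package is not rendered in this view, so the following is only an inferred sketch of the surface these call sites rely on; the field names, map keys, and semver-based version check are assumptions:

package cluster

import (
	"fmt"

	"golang.org/x/mod/semver"
)

// Info is reconstructed here from its call sites in this commit, not from the real code.
type Info struct {
	openShiftVersion string          // empty when the cluster is not OpenShift (assumption)
	availableAPIs    map[string]bool // detected API kinds (assumption)
}

func (i *Info) IsOpenShift() bool           { return i.openShiftVersion != "" }
func (i *Info) GetOpenShiftVersion() string { return i.openShiftVersion }

// OpenShiftVersionIsAtLeast reports whether the detected version is >= min,
// returning an error when no version could be detected.
func (i *Info) OpenShiftVersionIsAtLeast(min string) (bool, error) {
	if i.openShiftVersion == "" {
		return false, fmt.Errorf("OpenShift version not detected")
	}
	return semver.Compare("v"+i.openShiftVersion, "v"+min) >= 0, nil
}

// API-availability helpers replacing the former AvailableAPIs checks.
func (i *Info) HasConsolePlugin() bool { return i.availableAPIs["ConsolePlugin"] }
func (i *Info) HasSvcMonitor() bool    { return i.availableAPIs["ServiceMonitor"] }
func (i *Info) HasPromRule() bool      { return i.availableAPIs["PrometheusRule"] }
func (i *Info) HasCNO() bool           { return i.availableAPIs["Network"] }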
10 changes: 5 additions & 5 deletions controllers/consoleplugin/consoleplugin_reconciler.go
@@ -45,7 +45,7 @@ func NewReconciler(cmn *reconcilers.Instance) CPReconciler {
serviceAccount: cmn.Managed.NewServiceAccount(constants.PluginName),
configMap: cmn.Managed.NewConfigMap(configMapName),
}
if cmn.AvailableAPIs.HasSvcMonitor() {
if cmn.ClusterInfo.HasSvcMonitor() {
rec.serviceMonitor = cmn.Managed.NewServiceMonitor(constants.PluginName)
}
return rec
@@ -67,21 +67,21 @@ func (r *CPReconciler) Reconcile(ctx context.Context, desired *flowslatest.FlowC
return err
}

if r.AvailableAPIs.HasConsolePlugin() {
if r.ClusterInfo.HasConsolePlugin() {
if err = r.checkAutoPatch(ctx, desired); err != nil {
return err
}
}

if helper.UseConsolePlugin(&desired.Spec) && (r.AvailableAPIs.HasConsolePlugin() || helper.UseTestConsolePlugin(&desired.Spec)) {
if helper.UseConsolePlugin(&desired.Spec) && (r.ClusterInfo.HasConsolePlugin() || helper.UseTestConsolePlugin(&desired.Spec)) {
// Create object builder
builder := newBuilder(r.Instance, &desired.Spec)

if err := r.reconcilePermissions(ctx, &builder); err != nil {
return err
}

if r.AvailableAPIs.HasConsolePlugin() {
if r.ClusterInfo.HasConsolePlugin() {
if err = r.reconcilePlugin(ctx, &builder, &desired.Spec); err != nil {
return err
}
@@ -225,7 +225,7 @@ func (r *CPReconciler) reconcileServices(ctx context.Context, builder *builder)
if err := r.ReconcileService(ctx, r.metricsService, builder.metricsService(), &report); err != nil {
return err
}
if r.AvailableAPIs.HasSvcMonitor() {
if r.ClusterInfo.HasSvcMonitor() {
serviceMonitor := builder.serviceMonitor()
if err := reconcilers.GenericReconcile(ctx, r.Managed, &r.Client, r.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
3 changes: 2 additions & 1 deletion controllers/consoleplugin/consoleplugin_test.go
@@ -20,6 +20,7 @@ import (
config "github.com/netobserv/network-observability-operator/controllers/consoleplugin/config"
"github.com/netobserv/network-observability-operator/controllers/constants"
"github.com/netobserv/network-observability-operator/controllers/reconcilers"
"github.com/netobserv/network-observability-operator/pkg/cluster"
"github.com/netobserv/network-observability-operator/pkg/helper"
"github.com/netobserv/network-observability-operator/pkg/manager/status"
)
@@ -110,7 +111,7 @@ func getAutoScalerSpecs() (ascv2.HorizontalPodAutoscaler, flowslatest.FlowCollec
}

func getBuilder(spec *flowslatest.FlowCollectorSpec, lk *helper.LokiConfig) builder {
info := reconcilers.Common{Namespace: testNamespace, Loki: lk}
info := reconcilers.Common{Namespace: testNamespace, Loki: lk, ClusterInfo: &cluster.Info{}}
b := newBuilder(info.NewInstance(testImage, status.Instance{}), spec)
_, _, _ = b.configMap(context.Background()) // build configmap to update builder's volumes
return b
4 changes: 2 additions & 2 deletions controllers/ebpf/agent_controller.go
@@ -132,10 +132,10 @@ func NewAgentController(common *reconcilers.Instance) *AgentController {
permissions: permissions.NewReconciler(common),
promSvc: common.Managed.NewService(constants.EBPFAgentMetricsSvcName),
}
if common.AvailableAPIs.HasSvcMonitor() {
if common.ClusterInfo.HasSvcMonitor() {
agent.serviceMonitor = common.Managed.NewServiceMonitor(constants.EBPFAgentMetricsSvcMonitoringName)
}
if common.AvailableAPIs.HasPromRule() {
if common.ClusterInfo.HasPromRule() {
agent.prometheusRule = common.Managed.NewPrometheusRule(constants.EBPFAgentPromoAlertRule)
}
return &agent
8 changes: 4 additions & 4 deletions controllers/ebpf/agent_metrics.go
@@ -21,10 +21,10 @@ func (c *AgentController) reconcileMetricsService(ctx context.Context, target *f

if !helper.IsEBPFMetricsEnabled(target) {
c.Managed.TryDelete(ctx, c.promSvc)
if c.AvailableAPIs.HasSvcMonitor() {
if c.ClusterInfo.HasSvcMonitor() {
c.Managed.TryDelete(ctx, c.serviceMonitor)
}
if c.AvailableAPIs.HasPromRule() {
if c.ClusterInfo.HasPromRule() {
c.Managed.TryDelete(ctx, c.prometheusRule)
}
return nil
@@ -33,15 +33,15 @@ func (c *AgentController) reconcileMetricsService(ctx context.Context, target *f
if err := c.ReconcileService(ctx, c.promSvc, c.promService(target), &report); err != nil {
return err
}
if c.AvailableAPIs.HasSvcMonitor() {
if c.ClusterInfo.HasSvcMonitor() {
serviceMonitor := c.promServiceMonitoring(target)
if err := reconcilers.GenericReconcile(ctx, c.Managed, &c.Client, c.serviceMonitor,
serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
}
}

if c.AvailableAPIs.HasPromRule() {
if c.ClusterInfo.HasPromRule() {
promRules := c.agentPrometheusRule(target)
if err := reconcilers.GenericReconcile(ctx, c.Managed, &c.Client, c.prometheusRule, promRules, &report, helper.PrometheusRuleChanged); err != nil {
return err
2 changes: 1 addition & 1 deletion controllers/ebpf/internal/permissions/permissions.go
@@ -128,7 +128,7 @@ func (c *Reconciler) reconcileServiceAccount(ctx context.Context) error {
func (c *Reconciler) reconcileVendorPermissions(
ctx context.Context, desired *flowslatest.FlowCollectorEBPF,
) error {
if c.UseOpenShiftSCC {
if c.ClusterInfo.IsOpenShift() {
return c.reconcileOpenshiftPermissions(ctx, desired)
}
return nil
9 changes: 4 additions & 5 deletions controllers/flowcollector_controller.go
@@ -56,15 +56,15 @@ func Start(ctx context.Context, mgr *manager.Manager) error {
Owns(&corev1.Service{}).
Owns(&corev1.ServiceAccount{})

if mgr.IsOpenShift() {
if mgr.ClusterInfo.IsOpenShift() {
builder.Owns(&securityv1.SecurityContextConstraints{})
}
if mgr.HasConsolePlugin() {
if mgr.ClusterInfo.HasConsolePlugin() {
builder.Owns(&osv1.ConsolePlugin{})
} else {
log.Info("Console not detected: the console plugin is not available")
}
if !mgr.HasCNO() {
if !mgr.ClusterInfo.HasCNO() {
log.Info("CNO not detected: using ovnKubernetes config and reconciler")
}

@@ -181,8 +181,7 @@ func (r *FlowCollectorReconciler) newCommonInfo(clh *helper.Client, ns, prevNs s
Client: *clh,
Namespace: ns,
PreviousNamespace: prevNs,
UseOpenShiftSCC: r.mgr.IsOpenShift(),
AvailableAPIs: &r.mgr.AvailableAPIs,
ClusterInfo: r.mgr.ClusterInfo,
Watcher: r.watcher,
Loki: loki,
IsDownstream: r.mgr.Config.DownstreamDeployment,
9 changes: 4 additions & 5 deletions controllers/flp/flp_controller.go
@@ -126,7 +126,7 @@ func (r *Reconciler) reconcile(ctx context.Context, clh *helper.Client, fc *flow
r.watcher.Reset(ns)

// obtain default cluster ID - api is specific to openshift
if r.mgr.IsOpenShift() && r.clusterID == "" {
if r.mgr.ClusterInfo.IsOpenShift() && r.clusterID == "" {
cversion := &configv1.ClusterVersion{}
key := client.ObjectKey{Name: "version"}
if err := r.Client.Get(ctx, key, cversion); err != nil {
@@ -138,7 +138,7 @@

// Auto-detect subnets
var subnetLabels []flowslatest.SubnetLabel
if r.mgr.IsOpenShift() && helper.AutoDetectOpenShiftNetworks(&fc.Spec.Processor) {
if r.mgr.ClusterInfo.IsOpenShift() && helper.AutoDetectOpenShiftNetworks(&fc.Spec.Processor) {
var err error
subnetLabels, err = r.getOpenShiftSubnets(ctx)
if err != nil {
@@ -190,8 +190,7 @@ func (r *Reconciler) newCommonInfo(clh *helper.Client, ns, prevNs string, loki *
Client: *clh,
Namespace: ns,
PreviousNamespace: prevNs,
UseOpenShiftSCC: r.mgr.IsOpenShift(),
AvailableAPIs: &r.mgr.AvailableAPIs,
ClusterInfo: r.mgr.ClusterInfo,
Watcher: r.watcher,
Loki: loki,
ClusterID: r.clusterID,
@@ -278,7 +277,7 @@ func (r *Reconciler) getOpenShiftSubnets(ctx context.Context) ([]flowslatest.Sub
var subnets []flowslatest.SubnetLabel

// Pods and Services subnets are found in CNO config
if r.mgr.HasCNO() {
if r.mgr.ClusterInfo.HasCNO() {
network := &configv1.Network{}
err := r.Get(ctx, types.NamespacedName{Name: "cluster"}, network)
if err != nil {
2 changes: 1 addition & 1 deletion controllers/flp/flp_monolith_objects.go
@@ -23,7 +23,7 @@ func newMonolithBuilder(info *reconcilers.Instance, desired *flowslatest.FlowCol
}

func (b *monolithBuilder) daemonSet(annotations map[string]string) *appsv1.DaemonSet {
pod := b.generic.podTemplate(true /*listens*/, !b.generic.info.UseOpenShiftSCC, annotations)
pod := b.generic.podTemplate(true /*listens*/, !b.generic.info.ClusterInfo.IsOpenShift(), annotations)
return &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: b.generic.name(),
10 changes: 5 additions & 5 deletions controllers/flp/flp_monolith_reconciler.go
@@ -43,10 +43,10 @@ func newMonolithReconciler(cmn *reconcilers.Instance) *monolithReconciler {
roleBindingIn: cmn.Managed.NewCRB(RoleBindingMonoName(ConfKafkaIngester)),
roleBindingTr: cmn.Managed.NewCRB(RoleBindingMonoName(ConfKafkaTransformer)),
}
if cmn.AvailableAPIs.HasSvcMonitor() {
if cmn.ClusterInfo.HasSvcMonitor() {
rec.serviceMonitor = cmn.Managed.NewServiceMonitor(serviceMonitorName(ConfMonolith))
}
if cmn.AvailableAPIs.HasPromRule() {
if cmn.ClusterInfo.HasPromRule() {
rec.prometheusRule = cmn.Managed.NewPrometheusRule(prometheusRuleName(ConfMonolith))
}
return &rec
@@ -160,13 +160,13 @@ func (r *monolithReconciler) reconcilePrometheusService(ctx context.Context, bui
if err := r.ReconcileService(ctx, r.promService, builder.promService(), &report); err != nil {
return err
}
if r.AvailableAPIs.HasSvcMonitor() {
if r.ClusterInfo.HasSvcMonitor() {
serviceMonitor := builder.generic.serviceMonitor()
if err := reconcilers.GenericReconcile(ctx, r.Managed, &r.Client, r.serviceMonitor, serviceMonitor, &report, helper.ServiceMonitorChanged); err != nil {
return err
}
}
if r.AvailableAPIs.HasPromRule() {
if r.ClusterInfo.HasPromRule() {
promRules := builder.generic.prometheusRule()
if err := reconcilers.GenericReconcile(ctx, r.Managed, &r.Client, r.prometheusRule, promRules, &report, helper.PrometheusRuleChanged); err != nil {
return err
@@ -194,7 +194,7 @@ func (r *monolithReconciler) reconcilePermissions(ctx context.Context, builder *
return r.CreateOwned(ctx, builder.serviceAccount())
} // We only configure name, update is not needed for now

cr := buildClusterRoleIngester(r.UseOpenShiftSCC)
cr := buildClusterRoleIngester(r.ClusterInfo.IsOpenShift())
if err := r.ReconcileClusterRole(ctx, cr); err != nil {
return err
}
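
Not shown in the rendered portion of the diff: the manager still has to hand the detected cluster info to the validator and register the webhook. Purely as an illustration of typical controller-runtime wiring (the helper name and placement are hypothetical; only CurrentClusterInfo and SetupWebhookWithManager come from the commit):

package main // hypothetical placement; the operator's real wiring lives elsewhere

import (
	ctrl "sigs.k8s.io/controller-runtime"

	flowsv1beta2 "github.com/netobserv/network-observability-operator/apis/flowcollector/v1beta2"
	"github.com/netobserv/network-observability-operator/pkg/cluster"
)

// setupWebhooks feeds cluster facts (OpenShift detection, version) to the validator
// before any admission request arrives, then registers the FlowCollector webhook.
func setupWebhooks(mgr ctrl.Manager, info *cluster.Info) error {
	flowsv1beta2.CurrentClusterInfo = info
	return (&flowsv1beta2.FlowCollector{}).SetupWebhookWithManager(mgr)
}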
