Skip to content

Commit

Permalink
updating pubsub system
Browse files Browse the repository at this point in the history
Signed-off-by: Jaydip Gabani <[email protected]>
  • Loading branch information
JaydipGabani committed Jan 24, 2025
1 parent d53f33f commit 1f2905e
Show file tree
Hide file tree
Showing 29 changed files with 844 additions and 850 deletions.
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/drivers/k8scel"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/externaldata"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness/pruner"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
Expand Down Expand Up @@ -435,7 +435,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness.

mutationSystem := mutation.NewSystem(mutationOpts)
expansionSystem := expansion.NewSystem(mutationSystem)
pubsubSystem := pubsub.NewSystem()
exportSystem := export.NewSystem()

c := mgr.GetCache()
dc, ok := c.(watch.RemovableCache)
Expand Down Expand Up @@ -508,7 +508,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness.
MutationSystem: mutationSystem,
ExpansionSystem: expansionSystem,
ProviderCache: providerCache,
PubsubSystem: pubsubSystem,
ExportSystem: exportSystem,
}

if err := controller.AddToManager(mgr, &opts); err != nil {
Expand Down Expand Up @@ -538,7 +538,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness.
ProcessExcluder: processExcluder,
CacheLister: auditCache,
ExpansionSystem: expansionSystem,
PubSubSystem: pubsubSystem,
ExportSystem: exportSystem,
}
if err := audit.AddToManager(mgr, &auditDeps); err != nil {
setupLog.Error(err, "unable to register audit with the manager")
Expand Down
4 changes: 2 additions & 2 deletions pkg/audit/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

Expand All @@ -25,7 +25,7 @@ type Dependencies struct {
ProcessExcluder *process.Excluder
CacheLister *CacheLister
ExpansionSystem *expansion.System
PubSubSystem *pubsub.System
ExportSystem *export.System
}

// AddToManager adds audit manager to the Manager.
Expand Down
18 changes: 9 additions & 9 deletions pkg/audit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
"github.com/open-policy-agent/frameworks/constraint/pkg/client/reviews"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
pubsubController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub"
exportController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/logging"
mutationtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -91,7 +91,7 @@ type Manager struct {
auditCache *CacheLister

expansionSystem *expansion.System
pubsubSystem *pubsub.System
exportSystem *export.System
}

// StatusViolation represents each violation under status.
Expand All @@ -107,7 +107,7 @@ type StatusViolation struct {
}

// ConstraintMsg represents publish message for each constraint.
type PubsubMsg struct {
type ExportMsg struct {
ID string `json:"id,omitempty"`
Details interface{} `json:"details,omitempty"`
EventType string `json:"eventType,omitempty"`
Expand Down Expand Up @@ -269,7 +269,7 @@ func New(mgr manager.Manager, deps *Dependencies) (*Manager, error) {
gkNamespace: util.GetNamespace(),
auditCache: deps.CacheLister,
expansionSystem: deps.ExpansionSystem,
pubsubSystem: deps.PubSubSystem,
exportSystem: deps.ExportSystem,
}
return am, nil
}
Expand Down Expand Up @@ -902,10 +902,10 @@ func (am *Manager) addAuditResponsesToUpdateLists(
details := r.Metadata["details"]
labels := r.obj.GetLabels()
logViolation(am.log, constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels)
if *pubsubController.PubsubEnabled {
err := am.pubsubSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp))
if *exportController.ExportEnabled {
err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp))
if err != nil {
am.log.Error(err, "pubsub audit Publishing")
am.log.Error(err, "error exporting audit violation")
}
}
if *emitAuditEvents {
Expand Down Expand Up @@ -1162,7 +1162,7 @@ func violationMsg(constraint *unstructured.Unstructured, enforcementAction util.
userConstraintAnnotations := constraint.GetAnnotations()
delete(userConstraintAnnotations, "kubectl.kubernetes.io/last-applied-configuration")

return PubsubMsg{
return ExportMsg{
Message: message,
Details: details,
ID: timestamp,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/add_pubsub.go → pkg/controller/add_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ limitations under the License.
package controller

import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export"
)

func init() {
Injectors = append(Injectors, &pubsub.Adder{})
Injectors = append(Injectors, &export.Adder{})
}
12 changes: 6 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/fakes"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
Expand All @@ -56,8 +56,8 @@ type GetPodInjector interface {
InjectGetPod(func(context.Context) (*corev1.Pod, error))
}

type PubsubInjector interface {
InjectPubsubSystem(pubsubSystem *pubsub.System)
type ExportInjector interface {
InjectExportSystem(exportSystem *export.System)
}

type DataClientInjector interface {
Expand Down Expand Up @@ -101,7 +101,7 @@ type Dependencies struct {
MutationSystem *mutation.System
ExpansionSystem *expansion.System
ProviderCache *externaldata.ProviderCache
PubsubSystem *pubsub.System
ExportSystem *export.System
SyncEventsCh chan event.GenericEvent
CacheMgr *cm.CacheManager
}
Expand Down Expand Up @@ -212,8 +212,8 @@ func AddToManager(m manager.Manager, deps *Dependencies) error {
if a2, ok := a.(GetPodInjector); ok {
a2.InjectGetPod(deps.GetPod)
}
if a2, ok := a.(PubsubInjector); ok {
a2.InjectPubsubSystem(deps.PubsubSystem)
if a2, ok := a.(ExportInjector); ok {
a2.InjectExportSystem(deps.ExportSystem)
}
if a2, ok := a.(CacheManagerInjector); ok {
// this is used by the config controller to sync
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package pubsub
package export

import (
"context"
"encoding/json"
"flag"
"fmt"

"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/logging"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
corev1 "k8s.io/api/core/v1"
Expand All @@ -25,36 +25,36 @@ import (
)

var (
PubsubEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages")
ExportEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages")
log = logf.Log.WithName("controller").WithValues(logging.Process, "pubsub_controller")
)

type Adder struct {
PubsubSystem *pubsub.System
ExportSystem *export.System
}

func (a *Adder) Add(mgr manager.Manager) error {
if !*PubsubEnabled {
if !*ExportEnabled {
return nil
}
log.Info("Warning: Alpha flag enable-pub-sub is set to true. This flag may change in the future.")
r := newReconciler(mgr, a.PubsubSystem)
r := newReconciler(mgr, a.ExportSystem)
return add(mgr, r)
}

func (a *Adder) InjectTracker(_ *readiness.Tracker) {}

func (a *Adder) InjectPubsubSystem(pubsubSystem *pubsub.System) {
a.PubsubSystem = pubsubSystem
func (a *Adder) InjectExportSystem(exportSystem *export.System) {
a.ExportSystem = exportSystem
}

type Reconciler struct {
client.Client
scheme *runtime.Scheme
system *pubsub.System
system *export.System
}

func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler {
func newReconciler(mgr manager.Manager, system *export.System) *Reconciler {
return &Reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
Expand All @@ -63,7 +63,7 @@ func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler {
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New("pubsub-config-controller", mgr, controller.Options{Reconciler: r})
c, err := controller.New("export-config-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
Expand Down Expand Up @@ -111,22 +111,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
}

if len(cfg.Data) == 0 {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName))
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName))
}
if _, ok := cfg.Data["provider"]; !ok {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing provider field in configmap %s, unable to configure respective pubsub", request.NamespacedName))
if _, ok := cfg.Data["driver"]; !ok {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing driver field in configmap %s, unable to establish connection", request.NamespacedName))
}
var config interface{}
err = json.Unmarshal([]byte(cfg.Data["config"]), &config)
if err != nil {
return reconcile.Result{}, err
}

err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["provider"])
err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["driver"])
if err != nil {
return reconcile.Result{}, err
}

log.Info("Connection upsert successful", "name", request.Name, "provider", cfg.Data["provider"])
log.Info("Connection upsert successful", "name", request.Name, "driver", cfg.Data["driver"])
return reconcile.Result{}, nil
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package pubsub
package export

import (
"context"
"flag"
"fmt"
"testing"

"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestReconcile(t *testing.T) {
},
},
wantErr: true,
errorMsg: fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName),
errorMsg: fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName),
},
}
for _, tc := range testCases {
Expand Down
88 changes: 88 additions & 0 deletions pkg/export/dapr/dapr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package dapr

import (
"context"
"encoding/json"
"fmt"

daprClient "github.com/dapr/go-sdk/client"
)

type Connection struct {
// Name of the component object to use in Dapr
component string

client daprClient.Client
}

// Dapr represents driver to use Dapr.
type Dapr struct {
openConnections map[string]Connection
}

const (
Name = "dapr"
)

var Connections = &Dapr{
openConnections: make(map[string]Connection),
}

func (r *Dapr) Publish(_ context.Context, connectionName string, data interface{}, topic string) error {
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

conn, ok := r.openConnections[connectionName]
if !ok {
return fmt.Errorf("connection not found: %s for Dapr driver", connectionName)
}
err = conn.client.PublishEvent(context.Background(), conn.component, topic, jsonData)
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}

return nil
}

func (r *Dapr) CloseConnection(connectionName string) error {
delete(r.openConnections, connectionName)
return nil
}

func (r *Dapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error {
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
component, ok := cfg["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
}
conn := r.openConnections[connectionName]
conn.component = component
r.openConnections[connectionName] = conn
return nil
}

func (r *Dapr) CreateConnection(_ context.Context, connectionName string, config interface{}) error {
var conn Connection
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
conn.component, ok = cfg["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
}

tmp, err := daprClient.NewClient()
if err != nil {
return err
}

conn.client = tmp
r.openConnections[connectionName] = conn
return nil
}
Loading

0 comments on commit 1f2905e

Please sign in to comment.