Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactoring to remove pubsub flags to improve experience #3339

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,6 @@ ifeq ($(ENABLE_PUBSUB),true)
--set disabledBuiltins={http.send} \
--set logMutations=true \
--set audit.enablePubsub=${ENABLE_PUBSUB} \
--set audit.connection=${AUDIT_CONNECTION} \
--set audit.channel=${AUDIT_CHANNEL} \
--set-string auditPodAnnotations.dapr\\.io/enabled=true \
--set-string auditPodAnnotations.dapr\\.io/app-id=audit \
--set-string auditPodAnnotations.dapr\\.io/metrics-port=9999 \
Expand Down
2 changes: 0 additions & 2 deletions cmd/build/helmify/replacements.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ var replacements = map[string]string{

"- HELMSUBST_PUBSUB_ARGS": `{{ if .Values.audit.enablePubsub}}
- --enable-pub-sub={{ .Values.audit.enablePubsub }}
- --audit-connection={{ .Values.audit.connection }}
- --audit-channel={{ .Values.audit.channel }}
{{- end }}`,

"HELMSUBST_MUTATING_WEBHOOK_FAILURE_POLICY": `{{ .Values.mutatingWebhookFailurePolicy }}`,
Expand Down
2 changes: 0 additions & 2 deletions cmd/build/helmify/static/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ controllerManager:
# - ipBlock:
# cidr: 0.0.0.0/0
audit:
enablePubsub: false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you removed the wrong value here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yip! I will update that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea

connection: audit-connection
channel: audit-channel
hostNetwork: false
dnsPolicy: ClusterFirst
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ spec:
- --operation=status
{{ if .Values.audit.enablePubsub}}
- --enable-pub-sub={{ .Values.audit.enablePubsub }}
- --audit-connection={{ .Values.audit.connection }}
- --audit-channel={{ .Values.audit.channel }}
{{- end }}
{{ if not .Values.disableMutation}}- --operation=mutation-status{{- end }}
- --logtostderr
Expand Down
2 changes: 0 additions & 2 deletions manifest_staging/charts/gatekeeper/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ controllerManager:
# - ipBlock:
# cidr: 0.0.0.0/0
audit:
enablePubsub: false
connection: audit-connection
channel: audit-channel
hostNetwork: false
dnsPolicy: ClusterFirst
Expand Down
4 changes: 1 addition & 3 deletions pkg/audit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ var (
auditEventsInvolvedNamespace = flag.Bool("audit-events-involved-namespace", false, "emit audit events for each violation in the involved objects namespace, the default (false) generates events in the namespace Gatekeeper is installed in. Audit events from cluster-scoped resources will still follow the default behavior")
auditMatchKindOnly = flag.Bool("audit-match-kind-only", false, "only use kinds specified in all constraints for auditing cluster resources. if kind is not specified in any of the constraints, it will audit all resources (same as setting this flag to false)")
apiCacheDir = flag.String("api-cache-dir", defaultAPICacheDir, "The directory where audit from api server cache are stored, defaults to /tmp/audit")
auditConnection = flag.String("audit-connection", defaultConnection, "Connection name for publishing audit violation messages. Defaults to audit-connection")
auditChannel = flag.String("audit-channel", defaultChannel, "Channel name for publishing audit violation messages. Defaults to audit-channel")
emptyAuditResults = newLimitQueue(0)
logStatsAudit = flag.Bool("log-stats-audit", false, "(alpha) log stats metrics for the audit run")
)
Expand Down Expand Up @@ -901,7 +899,7 @@ func (am *Manager) addAuditResponsesToUpdateLists(
labels := r.obj.GetLabels()
logViolation(am.log, constraint, ea, gvk, namespace, name, msg, details, labels)
if *pubsubController.PubsubEnabled {
err := am.pubsubSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, gvk, namespace, name, msg, details, labels, timestamp))
err := am.pubsubSystem.Publish(context.Background(), violationMsg(constraint, ea, gvk, namespace, name, msg, details, labels, timestamp))
if err != nil {
am.log.Error(err, "pubsub audit Publishing")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pubsub/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// PubSub is the interface that wraps pubsub methods.
type Connection interface {
// Publish single message over a specific topic/channel
Publish(ctx context.Context, data interface{}, topic string) error
Publish(ctx context.Context, data interface{}) error

// Close connections
CloseConnection() error
Expand Down
20 changes: 18 additions & 2 deletions pkg/pubsub/dapr/dapr.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
type ClientConfig struct {
// Name of the component to be used for pub sub messaging
Component string `json:"component"`

// Topic where the messages would be published for the connection
Topic string `json:"topic"`
}

// Dapr represents driver for interacting with pub sub using dapr.
Expand All @@ -21,19 +24,22 @@ type Dapr struct {

// Name of the pubsub component
pubSubComponent string

// Topic where the messages would be published for the connection
topic string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What part of the connection/channel config should be specify-able by downstream users (e.g. constraint authors) and what should be owned by the author of the constraint objects?

We should think about the personas who would be interacting with these knobs. Are the people setting up the infra always the same people writing the policy?

}

const (
Name = "dapr"
)

func (r *Dapr) Publish(_ context.Context, data interface{}, topic string) error {
func (r *Dapr) Publish(_ context.Context, data interface{}) error {
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

err = r.client.PublishEvent(context.Background(), r.pubSubComponent, topic, jsonData)
err = r.client.PublishEvent(context.Background(), r.pubSubComponent, r.topic, jsonData)
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}
Expand All @@ -56,6 +62,11 @@ func (r *Dapr) UpdateConnection(_ context.Context, config interface{}) error {
return fmt.Errorf("failed to get value of component")
}
r.pubSubComponent = cfg.Component
cfg.Topic, ok = m["topic"].(string)
if !ok {
return fmt.Errorf("failed to get value of topic")
}
r.topic = cfg.Topic
return nil
}

Expand All @@ -70,6 +81,10 @@ func NewConnection(_ context.Context, config interface{}) (connection.Connection
if !ok {
return nil, fmt.Errorf("failed to get value of component")
}
cfg.Topic, ok = m["topic"].(string)
if !ok {
return nil, fmt.Errorf("failed to get value of topic")
}

tmp, err := daprClient.NewClient()
if err != nil {
Expand All @@ -79,5 +94,6 @@ func NewConnection(_ context.Context, config interface{}) (connection.Connection
return &Dapr{
client: tmp,
pubSubComponent: cfg.Component,
topic: cfg.Topic,
}, nil
}
25 changes: 6 additions & 19 deletions pkg/pubsub/dapr/dapr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ func TestDapr_Publish(t *testing.T) {
ctx := context.Background()

type args struct {
ctx context.Context
data interface{}
topic string
ctx context.Context
data interface{}
}

tests := []struct {
Expand All @@ -72,35 +71,22 @@ func TestDapr_Publish(t *testing.T) {
data: map[string]interface{}{
"test": "test",
},
topic: "test",
},
wantErr: false,
},
{
name: "test publish without data",
args: args{
ctx: ctx,
data: nil,
topic: "test",
ctx: ctx,
data: nil,
},
wantErr: false,
},
{
name: "test publish without topic",
args: args{
ctx: ctx,
data: map[string]interface{}{
"test": "test",
},
topic: "",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := testClient
if err := r.Publish(tt.args.ctx, tt.args.data, tt.args.topic); (err != nil) != tt.wantErr {
if err := r.Publish(tt.args.ctx, tt.args.data); (err != nil) != tt.wantErr {
t.Errorf("Dapr.Publish() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand All @@ -117,6 +103,7 @@ func TestDapr_UpdateConnection(t *testing.T) {
name: "test update connection",
config: map[string]interface{}{
"component": "foo",
"topic": "bar",
},
wantErr: false,
},
Expand Down
11 changes: 10 additions & 1 deletion pkg/pubsub/dapr/fake_dapr_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func FakeConnection() (connection.Connection, func()) {
return &Dapr{
client: c,
pubSubComponent: "test",
topic: "test",
}, f
}

Expand All @@ -338,11 +339,14 @@ type FakeDapr struct {
// Name of the pubsub component
pubSubComponent string

// Name of the topic
topic string

// closing function
f func()
}

func (r *FakeDapr) Publish(_ context.Context, _ interface{}, _ string) error {
func (r *FakeDapr) Publish(_ context.Context, _ interface{}) error {
return nil
}

Expand Down Expand Up @@ -376,12 +380,17 @@ func FakeNewConnection(ctx context.Context, config interface{}) (connection.Conn
if !ok {
return nil, fmt.Errorf("failed to get value of component")
}
cfg.Topic, ok = m["topic"].(string)
if !ok {
return nil, fmt.Errorf("failed to get value of topic")
}

c, f := getTestClient(ctx)

return &FakeDapr{
client: c,
pubSubComponent: cfg.Component,
f: f,
topic: cfg.Topic,
}, nil
}
18 changes: 11 additions & 7 deletions pkg/pubsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"errors"
"fmt"
"sync"

Expand All @@ -19,16 +20,19 @@ func NewSystem() *System {
return &System{}
}

func (s *System) Publish(_ context.Context, connection string, topic string, msg interface{}) error {
func (s *System) Publish(ctx context.Context, msg interface{}) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we still looking at supporting multiple queues simultaneously? Possibly specifying which connection/topic gets pushed to as part of a constraint's enforcement action?

Example: send violation X to security queue, violation Y to ops queue?

If so, Publish() will need to support something more fine-grained than "broadcast everywhere"

s.mux.RLock()
defer s.mux.RUnlock()
if len(s.connections) > 0 {
if c, ok := s.connections[connection]; ok {
return c.Publish(context.Background(), msg, topic)
}
return fmt.Errorf("connection is not initialized, name: %s ", connection)
var errs error

if len(s.connections) == 0 {
return fmt.Errorf("no connections are established")
}

for _, c := range s.connections {
errs = errors.Join(errs, c.Publish(ctx, msg))
}
return fmt.Errorf("No connections are established")
return errs
}

func (s *System) UpsertConnection(ctx context.Context, config interface{}, name string, provider string) error {
Expand Down
15 changes: 5 additions & 10 deletions pkg/pubsub/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestMain(m *testing.M) {
cfg := map[string]interface{}{
dapr.Name: map[string]interface{}{
"component": "pubsub",
"topic": "audit",
},
}
for name, fakeConn := range tmp {
Expand Down Expand Up @@ -90,6 +91,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
ctx: context.Background(),
config: map[string]interface{}{
"component": "pubsub",
"topic": "test",
},
name: "dapr",
provider: "dapr",
Expand All @@ -111,6 +113,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
ctx: context.Background(),
config: map[string]interface{}{
"component": "pubsub",
"topic": "test",
},
name: "audit",
provider: "test",
Expand All @@ -133,6 +136,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
ctx: context.Background(),
config: map[string]interface{}{
"component": "test",
"topic": "audit",
},
name: "audit",
provider: "dapr",
Expand Down Expand Up @@ -222,15 +226,6 @@ func TestSystem_Publish(t *testing.T) {
args: args{ctx: context.Background(), connection: "audit", topic: "test", msg: nil},
wantErr: true,
},
{
name: "Publishing to a connection that does not exist",
fields: fields{
connections: map[string]connection.Connection{"audit": &dapr.Dapr{}},
providers: map[string]string{"audit": "dapr"},
},
args: args{ctx: context.Background(), connection: "test", topic: "test", msg: nil},
wantErr: true,
},
{
name: "Publishing to a connection that does exist",
fields: fields{
Expand All @@ -248,7 +243,7 @@ func TestSystem_Publish(t *testing.T) {
connections: tt.fields.connections,
providers: tt.fields.providers,
}
if err := s.Publish(tt.args.ctx, tt.args.connection, tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr {
if err := s.Publish(tt.args.ctx, tt.args.msg); (err != nil) != tt.wantErr {
t.Errorf("System.Publish() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
3 changes: 2 additions & 1 deletion test/pubsub/publish-components.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ data:
provider: "dapr"
config: |
{
"component": "pubsub"
"component": "pubsub",
"topic": "audit-channel"
}
17 changes: 9 additions & 8 deletions website/docs/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Install prerequisites such as a pubsub tool, a message broker etc.

### Setting up audit with pubsub enabled

In the audit deployment, set the `--enable-pub-sub` flag to `true` to publish audit violations. Additionally, use `--audit-connection` (defaults to `audit-connection`) and `--audit-channel`(defaults to `audit-channel`) flags to allow audit to publish violations using desired connection onto desired channel. `--audit-connection` must be set to the name of the connection config, and `--audit-channel` must be set to name of the channel where violations should get published.
In the audit deployment, set the `--enable-pub-sub` flag to `true` to publish audit violations.

A ConfigMap that contains `provider` and `config` fields in `data` is required to establish connection for sending violations over the channel. Following is an example ConfigMap to establish a connection that uses Dapr to publish messages:

Expand All @@ -33,7 +33,8 @@ data:
provider: "dapr"
config: |
{
"component": "pubsub"
"component": "pubsub",
"topic": "audit-channel"
}
```

Expand Down Expand Up @@ -125,6 +126,9 @@ Dapr: https://dapr.io/
- name: go-sub
image: fake-subscriber:latest
imagePullPolicy: Never
env:
- name: AUDIT_CHANNEL
Copy link
Member

@ritazh ritazh Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is consuming this env var? it looks like it is no longer being used and got removed here: https://github.com/open-policy-agent/gatekeeper/pull/3339/files#diff-76ed074a9305c04054cdebb9e9aad2d818052b07091de1f20cad0bbac34ffb52L228

value: "audit-channel"
```

> [!IMPORTANT]
Expand Down Expand Up @@ -156,15 +160,13 @@ Dapr: https://dapr.io/
EOF
```

2. To upgrade or install Gatekeeper with `--enable-pub-sub` set to `true`, `--audit-connection` set to `audit-connection`, `--audit-channel` set to `audit-channel` on audit pod.
2. To upgrade or install Gatekeeper with `--enable-pub-sub` set to `true` on audit pod.

```shell
# auditPodAnnotations is used to add annotations required by Dapr to inject sidecar to audit pod
echo 'auditPodAnnotations: {dapr.io/enabled: "true", dapr.io/app-id: "audit", dapr.io/metrics-port: "9999", dapr.io/sidecar-seccomp-profile-type: "RuntimeDefault"}' > /tmp/annotations.yaml
helm upgrade --install gatekeeper gatekeeper/gatekeeper --namespace gatekeeper-system \
--set audit.enablePubsub=true \
--set audit.connection=audit-connection \
--set audit.channel=audit-channel \
--values /tmp/annotations.yaml
```

Expand All @@ -183,13 +185,12 @@ Dapr: https://dapr.io/
provider: "dapr"
config: |
{
"component": "pubsub"
"component": "pubsub",
"topic": "audit-channel"
}
EOF
```

**Note:** Name of the connection configMap must match the value of `--audit-connection` for it to be used by audit to publish violation. At the moment, only one connection config can exists for audit.

4. Create the constraint templates and constraints, and make sure audit ran by checking constraints. If constraint status is updated with information such as `auditTimeStamp` or `totalViolations`, then audit has ran at least once. Additionally, populated `TOTAL-VIOLATIONS` field for all constraints while listing constraints also indicates that audit has ran at least once.

```log
Expand Down
Loading