From f0696c2f4f6d14be04e75cf07141da7c14efff74 Mon Sep 17 00:00:00 2001 From: Frederico Bittencourt Date: Wed, 26 Jul 2023 14:34:48 +0200 Subject: [PATCH] ROX-17157: Fix reconciliation for offline mode for Kubernetes Listener (#7032) --- .../eventpipeline/component/component.go | 8 ++ .../component/mocks/component.go | 64 ++++++++++ sensor/kubernetes/eventpipeline/pipeline.go | 2 +- .../kubernetes/eventpipeline/pipeline_impl.go | 27 +++- .../kubernetes/eventpipeline/pipeline_test.go | 115 ++++++++++++++++-- sensor/kubernetes/listener/listener.go | 2 +- sensor/kubernetes/listener/listener_impl.go | 21 ++++ .../listener/resource_event_handler.go | 65 +++++----- .../listener/resource_event_handler_impl.go | 6 +- .../resource_event_handler_impl_test.go | 48 ++++++++ .../k8sreconciliation/reconcile_test.go | 40 +++++- .../yaml/netpol-block-egress.yaml | 11 ++ .../k8sreconciliation/yaml/nginx.yaml | 3 +- .../k8sreconciliation/yaml/nginx2.yaml | 9 +- .../k8sreconciliation/yaml/nginx3.yaml | 22 ++++ sensor/tests/helper/helper.go | 56 ++++++++- 16 files changed, 444 insertions(+), 55 deletions(-) create mode 100644 sensor/tests/connection/k8sreconciliation/yaml/netpol-block-egress.yaml create mode 100644 sensor/tests/connection/k8sreconciliation/yaml/nginx3.yaml diff --git a/sensor/kubernetes/eventpipeline/component/component.go b/sensor/kubernetes/eventpipeline/component/component.go index 5b60df5468234..c363623b61528 100644 --- a/sensor/kubernetes/eventpipeline/component/component.go +++ b/sensor/kubernetes/eventpipeline/component/component.go @@ -1,6 +1,8 @@ package component import ( + "context" + "github.com/stackrox/rox/sensor/common/message" ) @@ -26,3 +28,9 @@ type OutputQueue interface { Send(event *ResourceEvent) ResponsesC() <-chan *message.ExpiringMessage } + +// ContextListener is a component that listens but has a context in the messages +type ContextListener interface { + PipelineComponent + StartWithContext(context.Context) error +} diff --git 
a/sensor/kubernetes/eventpipeline/component/mocks/component.go b/sensor/kubernetes/eventpipeline/component/mocks/component.go index b89024f02aa57..ee90b62542cc6 100644 --- a/sensor/kubernetes/eventpipeline/component/mocks/component.go +++ b/sensor/kubernetes/eventpipeline/component/mocks/component.go @@ -5,6 +5,7 @@ package mocks import ( + context "context" reflect "reflect" message "github.com/stackrox/rox/sensor/common/message" @@ -196,3 +197,66 @@ func (mr *MockOutputQueueMockRecorder) Stop(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockOutputQueue)(nil).Stop), arg0) } + +// MockContextListener is a mock of ContextListener interface. +type MockContextListener struct { + ctrl *gomock.Controller + recorder *MockContextListenerMockRecorder +} + +// MockContextListenerMockRecorder is the mock recorder for MockContextListener. +type MockContextListenerMockRecorder struct { + mock *MockContextListener +} + +// NewMockContextListener creates a new mock instance. +func NewMockContextListener(ctrl *gomock.Controller) *MockContextListener { + mock := &MockContextListener{ctrl: ctrl} + mock.recorder = &MockContextListenerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockContextListener) EXPECT() *MockContextListenerMockRecorder { + return m.recorder +} + +// Start mocks base method. +func (m *MockContextListener) Start() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Start") + ret0, _ := ret[0].(error) + return ret0 +} + +// Start indicates an expected call of Start. +func (mr *MockContextListenerMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockContextListener)(nil).Start)) +} + +// StartWithContext mocks base method. 
+func (m *MockContextListener) StartWithContext(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartWithContext", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// StartWithContext indicates an expected call of StartWithContext. +func (mr *MockContextListenerMockRecorder) StartWithContext(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartWithContext", reflect.TypeOf((*MockContextListener)(nil).StartWithContext), arg0) +} + +// Stop mocks base method. +func (m *MockContextListener) Stop(arg0 error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop", arg0) +} + +// Stop indicates an expected call of Stop. +func (mr *MockContextListenerMockRecorder) Stop(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockContextListener)(nil).Stop), arg0) +} diff --git a/sensor/kubernetes/eventpipeline/pipeline.go b/sensor/kubernetes/eventpipeline/pipeline.go index 492e300319f5c..b2e3fe072f729 100644 --- a/sensor/kubernetes/eventpipeline/pipeline.go +++ b/sensor/kubernetes/eventpipeline/pipeline.go @@ -24,7 +24,7 @@ import ( func New(client client.Interface, configHandler config.Handler, detector detector.Detector, reprocessor reprocessor.Handler, nodeName string, resyncPeriod time.Duration, traceWriter io.Writer, storeProvider *resources.InMemoryStoreProvider, queueSize int) common.SensorComponent { outputQueue := output.New(detector, queueSize) var depResolver component.Resolver - var resourceListener component.PipelineComponent + var resourceListener component.ContextListener if env.ResyncDisabled.BooleanSetting() { depResolver = resolver.New(outputQueue, storeProvider, queueSize) resourceListener = listener.New(client, configHandler, nodeName, resyncPeriod, traceWriter, depResolver, storeProvider) diff --git a/sensor/kubernetes/eventpipeline/pipeline_impl.go 
b/sensor/kubernetes/eventpipeline/pipeline_impl.go index 3f68d37b99f4e..4e5599afd6477 100644 --- a/sensor/kubernetes/eventpipeline/pipeline_impl.go +++ b/sensor/kubernetes/eventpipeline/pipeline_impl.go @@ -1,6 +1,7 @@ package eventpipeline import ( + "context" "sync/atomic" "github.com/pkg/errors" @@ -10,6 +11,7 @@ import ( "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/env" "github.com/stackrox/rox/pkg/logging" + "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/sensor/common" "github.com/stackrox/rox/sensor/common/detector" "github.com/stackrox/rox/sensor/common/message" @@ -25,7 +27,7 @@ var ( type eventPipeline struct { output component.OutputQueue resolver component.Resolver - listener component.PipelineComponent + listener component.ContextListener detector detector.Detector reprocessor reprocessor.Handler @@ -33,6 +35,10 @@ type eventPipeline struct { eventsC chan *message.ExpiringMessage stopSig concurrency.Signal + + contextMtx sync.Mutex + context context.Context + cancelContext context.CancelFunc } // Capabilities implements common.SensorComponent @@ -64,6 +70,20 @@ func (p *eventPipeline) ResponsesC() <-chan *message.ExpiringMessage { return p.eventsC } +func (p *eventPipeline) stopCurrentContext() { + p.contextMtx.Lock() + defer p.contextMtx.Unlock() + if p.cancelContext != nil { + p.cancelContext() + } +} + +func (p *eventPipeline) createNewContext() { + p.contextMtx.Lock() + defer p.contextMtx.Unlock() + p.context, p.cancelContext = context.WithCancel(context.Background()) +} + // Start implements common.SensorComponent func (p *eventPipeline) Start() error { // The order is important here, we need to start the components @@ -102,13 +122,16 @@ func (p *eventPipeline) Notify(event common.SensorComponentEvent) { // Start listening to events if not yet listening if p.offlineMode.CompareAndSwap(true, false) { log.Infof("Connection established: Starting Kubernetes listener") - if err := p.listener.Start(); err != nil { + 
// TODO(ROX-18613): use contextProvider to provide context for listener + p.createNewContext() + if err := p.listener.StartWithContext(p.context); err != nil { log.Fatalf("Failed to start listener component. Sensor cannot run without listening to Kubernetes events: %s", err) } } case common.SensorComponentEventOfflineMode: // Stop listening to events if p.offlineMode.CompareAndSwap(false, true) { + p.stopCurrentContext() p.listener.Stop(errors.New("gRPC connection stopped")) } } diff --git a/sensor/kubernetes/eventpipeline/pipeline_test.go b/sensor/kubernetes/eventpipeline/pipeline_test.go index 282f483f3845e..bca0ff7802046 100644 --- a/sensor/kubernetes/eventpipeline/pipeline_test.go +++ b/sensor/kubernetes/eventpipeline/pipeline_test.go @@ -1,12 +1,14 @@ package eventpipeline import ( + "sync/atomic" "testing" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/sync" + "github.com/stackrox/rox/sensor/common" mockDetector "github.com/stackrox/rox/sensor/common/detector/mocks" "github.com/stackrox/rox/sensor/common/message" mockReprocessor "github.com/stackrox/rox/sensor/common/reprocessor/mocks" @@ -24,7 +26,11 @@ type eventPipelineSuite struct { resolver *mockComponent.MockResolver detector *mockDetector.MockDetector reprocessor *mockReprocessor.MockHandler + listener *mockComponent.MockContextListener + outputQueue *mockComponent.MockOutputQueue pipeline *eventPipeline + + outputC chan *message.ExpiringMessage } var _ suite.SetupTestSuite = &eventPipelineSuite{} @@ -38,28 +44,123 @@ func (s *eventPipelineSuite) TearDownTest() { s.T().Cleanup(s.mockCtrl.Finish) } -type mockListener struct{} - -func (m *mockListener) Start() error { return nil } -func (m *mockListener) Stop(_ error) {} - func (s *eventPipelineSuite) SetupTest() { s.mockCtrl = gomock.NewController(s.T()) s.resolver = mockComponent.NewMockResolver(s.mockCtrl) s.detector = 
mockDetector.NewMockDetector(s.mockCtrl) s.reprocessor = mockReprocessor.NewMockHandler(s.mockCtrl) + s.listener = mockComponent.NewMockContextListener(s.mockCtrl) + s.outputQueue = mockComponent.NewMockOutputQueue(s.mockCtrl) + + offlineMode := atomic.Bool{} + offlineMode.Store(true) + s.pipeline = &eventPipeline{ eventsC: make(chan *message.ExpiringMessage), stopSig: concurrency.NewSignal(), - output: mockComponent.NewMockOutputQueue(s.mockCtrl), + output: s.outputQueue, resolver: s.resolver, detector: s.detector, reprocessor: s.reprocessor, - listener: &mockListener{}, + listener: s.listener, + offlineMode: &offlineMode, } } +func (s *eventPipelineSuite) write() { + s.outputC <- message.NewExpiring(s.pipeline.context, nil) +} + +func (s *eventPipelineSuite) online() { + s.pipeline.Notify(common.SensorComponentEventCentralReachable) +} + +func (s *eventPipelineSuite) offline() { + s.pipeline.Notify(common.SensorComponentEventOfflineMode) +} + +func (s *eventPipelineSuite) readSuccess() { + msg, more := <-s.pipeline.ResponsesC() + s.Assert().True(more, "channel should be open") + s.Assert().False(msg.IsExpired(), "message should not be expired") +} + +func (s *eventPipelineSuite) readExpired() { + msg, more := <-s.pipeline.ResponsesC() + s.Assert().True(more, "channel should be open") + s.Assert().True(msg.IsExpired(), "message should be expired") +} + +func (s *eventPipelineSuite) Test_OfflineModeCases() { + s.T().Setenv("ROX_RESYNC_DISABLED", "true") + + outputC := make(chan *message.ExpiringMessage, 10) + s.outputQueue.EXPECT().ResponsesC(). 
+ AnyTimes().Return(outputC) + s.outputC = outputC + + s.outputQueue.EXPECT().Start().Times(1) + s.resolver.EXPECT().Start().Times(1) + s.listener.EXPECT().StartWithContext(gomock.Any()).AnyTimes() + s.listener.EXPECT().Stop(gomock.Any()).AnyTimes() + + s.Require().NoError(s.pipeline.Start()) + s.pipeline.Notify(common.SensorComponentEventCentralReachable) + + testCases := map[string][]func(){ + "Base case: Start, WA, WB, RA, RB, Disconnect": {s.online, s.write, s.write, s.readSuccess, s.readSuccess, s.offline}, + "Case: Start, WA, WB, Disconnect, RA, RB, Reconnect": {s.write, s.write, s.offline, s.readExpired, s.readExpired, s.online}, + "Case: Start, WA, WB, Disconnect, Reconnect, RA, RB": {s.write, s.write, s.offline, s.online, s.readExpired, s.readExpired}, + "Case: Start, WA, Disconnect, WB, Reconnect, RA, RB": {s.write, s.offline, s.write, s.online, s.readExpired, s.readExpired}, + "Case: Start, WA, Disconnect, Reconnect, WB, RA, RB": {s.write, s.offline, s.online, s.write, s.readExpired, s.readSuccess}, + "Case: Start, Disconnect, WA, Reconnect, WB, RA, RB": {s.offline, s.write, s.online, s.write, s.readExpired, s.readSuccess}, + "Case: Start, Disconnect, Reconnect, WA, WB, RA, RB": {s.offline, s.online, s.write, s.write, s.readSuccess, s.readSuccess}, + } + + for caseName, orderedFunctions := range testCases { + s.Run(caseName, func() { + for _, fn := range orderedFunctions { + fn() + } + }) + } +} + +func (s *eventPipelineSuite) Test_OfflineMode() { + s.T().Setenv("ROX_RESYNC_DISABLED", "true") + + outputC := make(chan *message.ExpiringMessage, 10) + s.outputQueue.EXPECT().ResponsesC(). + AnyTimes().Return(outputC) + + s.outputQueue.EXPECT().Start().Times(1) + s.resolver.EXPECT().Start().Times(1) + + // Expect listener to be reset (i.e. 
started twice and stopped once) + s.listener.EXPECT().StartWithContext(gomock.Any()).Times(2) + s.listener.EXPECT().Stop(gomock.Any()).Times(1) + + s.Require().NoError(s.pipeline.Start()) + s.pipeline.Notify(common.SensorComponentEventCentralReachable) + + outputC <- message.NewExpiring(s.pipeline.context, nil) + outputC <- message.NewExpiring(s.pipeline.context, nil) + + // Read message A + msgA, more := <-s.pipeline.ResponsesC() + s.Require().True(more, "should have more messages in ResponsesC") + s.Assert().False(msgA.IsExpired(), "context should not be expired") + + s.pipeline.Notify(common.SensorComponentEventOfflineMode) + s.pipeline.Notify(common.SensorComponentEventCentralReachable) + + // Read message B + msgB, more := <-s.pipeline.ResponsesC() + s.Require().True(more, "should have more messages in ResponsesC") + s.Assert().True(msgB.IsExpired(), "context should be expired") +} + func (s *eventPipelineSuite) Test_ReprocessDeployments() { s.T().Setenv("ROX_RESYNC_DISABLED", "true") messageReceived := sync.WaitGroup{} diff --git a/sensor/kubernetes/listener/listener.go b/sensor/kubernetes/listener/listener.go index 71771df31aeab..8f4e49c19dbcc 100644 --- a/sensor/kubernetes/listener/listener.go +++ b/sensor/kubernetes/listener/listener.go @@ -20,7 +20,7 @@ var ( ) // New returns a new kubernetes listener. 
-func New(client client.Interface, configHandler config.Handler, nodeName string, resyncPeriod time.Duration, traceWriter io.Writer, queue component.Resolver, storeProvider *resources.InMemoryStoreProvider) component.PipelineComponent { +func New(client client.Interface, configHandler config.Handler, nodeName string, resyncPeriod time.Duration, traceWriter io.Writer, queue component.Resolver, storeProvider *resources.InMemoryStoreProvider) component.ContextListener { k := &listenerImpl{ client: client, stopSig: concurrency.NewSignal(), diff --git a/sensor/kubernetes/listener/listener_impl.go b/sensor/kubernetes/listener/listener_impl.go index 2898b5ea980fe..c445edf9ee9a0 100644 --- a/sensor/kubernetes/listener/listener_impl.go +++ b/sensor/kubernetes/listener/listener_impl.go @@ -1,10 +1,14 @@ package listener import ( + "context" + "errors" "io" "time" + "github.com/stackrox/rox/pkg/buildinfo" "github.com/stackrox/rox/pkg/concurrency" + "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/sensor/common/awscredentials" "github.com/stackrox/rox/sensor/common/config" "github.com/stackrox/rox/sensor/kubernetes/client" @@ -29,9 +33,25 @@ type listenerImpl struct { traceWriter io.Writer outputQueue component.Resolver storeProvider *resources.InMemoryStoreProvider + context context.Context + contextMtx sync.Mutex +} + +func (k *listenerImpl) StartWithContext(ctx context.Context) error { + k.contextMtx.Lock() + defer k.contextMtx.Unlock() + k.context = ctx + return k.Start() } func (k *listenerImpl) Start() error { + if k.context == nil { + if !buildinfo.ReleaseBuild { + panic("Something went very wrong: starting Kubernetes Listener with nil context") + } + return errors.New("cannot start listener without a context") + } + // This happens if the listener is restarting. Then the signal will already have been triggered // when starting a new run of the listener. 
if k.stopSig.IsDone() { @@ -54,6 +74,7 @@ func (k *listenerImpl) Stop(_ error) { k.credentialsManager.Stop() } k.stopSig.Signal() + k.storeProvider.CleanupStores() } func clusterOperatorCRDExists(client client.Interface) (bool, error) { diff --git a/sensor/kubernetes/listener/resource_event_handler.go b/sensor/kubernetes/listener/resource_event_handler.go index f98d246aaa447..831b229ae2a0a 100644 --- a/sensor/kubernetes/listener/resource_event_handler.go +++ b/sensor/kubernetes/listener/resource_event_handler.go @@ -1,6 +1,8 @@ package listener import ( + "context" + osAppsExtVersions "github.com/openshift/client-go/apps/informers/externalversions" osConfigExtVersions "github.com/openshift/client-go/config/informers/externalversions" osRouteExtVersions "github.com/openshift/client-go/route/informers/externalversions" @@ -49,6 +51,9 @@ func managedFieldsTransformer(obj interface{}) (interface{}, error) { } func (k *listenerImpl) handleAllEvents() { + k.contextMtx.Lock() + defer k.contextMtx.Unlock() + // TODO(ROX-14194): remove resyncingSif once all resources are adapted var resyncingSif informers.SharedInformerFactory if env.ResyncDisabled.BooleanSetting() { @@ -125,13 +130,13 @@ func (k *listenerImpl) handleAllEvents() { stopSignal := &k.stopSig // Informers that need to be synced initially - handle(namespaceInformer, dispatchers.ForNamespaces(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) - handle(secretInformer, dispatchers.ForSecrets(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) - handle(saInformer, dispatchers.ForServiceAccounts(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, namespaceInformer, dispatchers.ForNamespaces(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, secretInformer, dispatchers.ForSecrets(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, 
&eventLock) + handle(k.context, saInformer, dispatchers.ForServiceAccounts(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) // Roles need to be synced before role bindings because role bindings have a reference - handle(roleInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) - handle(clusterRoleInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, roleInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, clusterRoleInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) var osConfigFactory osConfigExtVersions.SharedInformerFactory if k.client.OpenshiftConfig() != nil { @@ -145,17 +150,17 @@ func (k *listenerImpl) handleAllEvents() { } // For openshift clusters only if osConfigFactory != nil { - handle(osConfigFactory.Config().V1().ClusterOperators().Informer(), dispatchers.ForClusterOperators(), + handle(k.context, osConfigFactory.Config().V1().ClusterOperators().Informer(), dispatchers.ForClusterOperators(), k.outputQueue, nil, noDependencyWaitGroup, stopSignal, &eventLock) } if crdSharedInformerFactory != nil { log.Info("syncing compliance operator resources") // Handle results, rules, and scan setting bindings first - handle(complianceResultInformer, dispatchers.ForComplianceOperatorResults(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) - handle(complianceRuleInformer, dispatchers.ForComplianceOperatorRules(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) - handle(complianceScanSettingBindingsInformer, dispatchers.ForComplianceOperatorScanSettingBindings(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) - handle(complianceScanInformer, 
dispatchers.ForComplianceOperatorScans(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, complianceResultInformer, dispatchers.ForComplianceOperatorResults(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, complianceRuleInformer, dispatchers.ForComplianceOperatorRules(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, complianceScanSettingBindingsInformer, dispatchers.ForComplianceOperatorScanSettingBindings(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) + handle(k.context, complianceScanInformer, dispatchers.ForComplianceOperatorScans(), k.outputQueue, &syncingResources, noDependencyWaitGroup, stopSignal, &eventLock) } if !startAndWait(stopSignal, noDependencyWaitGroup, sif, resyncingSif, osConfigFactory, crdSharedInformerFactory) { @@ -169,8 +174,8 @@ func (k *listenerImpl) handleAllEvents() { roleBindingInformer := resyncingSif.Rbac().V1().RoleBindings().Informer() clusterRoleBindingInformer := resyncingSif.Rbac().V1().ClusterRoleBindings().Informer() - handle(roleBindingInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, prePodWaitGroup, stopSignal, &eventLock) - handle(clusterRoleBindingInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, prePodWaitGroup, stopSignal, &eventLock) + handle(k.context, roleBindingInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, prePodWaitGroup, stopSignal, &eventLock) + handle(k.context, clusterRoleBindingInformer, dispatchers.ForRBAC(), k.outputQueue, &syncingResources, prePodWaitGroup, stopSignal, &eventLock) if !startAndWait(stopSignal, prePodWaitGroup, resyncingSif) { return @@ -180,7 +185,7 @@ func (k *listenerImpl) handleAllEvents() { // Wait for the pod informer to sync before processing other types. 
// This is required because the PodLister is used to populate the image ids of deployments. - // However, do not ACTUALLY handle pod events yet -- those need to wait for deployments to be + // However, do not ACTUALLY handle pod events yet -- those need to wait for deployments to be // synced, since we need to enrich pods with the deployment ids, and for that we need the entire // hierarchy to be populated. if !cache.WaitForCacheSync(stopSignal.Done(), podInformer.Informer().HasSynced) { return } @@ -191,25 +196,25 @@ preTopLevelDeploymentWaitGroup := &concurrency.WaitGroup{} // Non-deployment types. - handle(sif.Networking().V1().NetworkPolicies().Informer(), dispatchers.ForNetworkPolicies(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) - handle(sif.Core().V1().Nodes().Informer(), dispatchers.ForNodes(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) - handle(sif.Core().V1().Services().Informer(), dispatchers.ForServices(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, sif.Networking().V1().NetworkPolicies().Informer(), dispatchers.ForNetworkPolicies(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, sif.Core().V1().Nodes().Informer(), dispatchers.ForNodes(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, sif.Core().V1().Services().Informer(), dispatchers.ForServices(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) if osRouteFactory != nil { - handle(osRouteFactory.Route().V1().Routes().Informer(),
dispatchers.ForOpenshiftRoutes(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) } // Deployment subtypes (this ensures that the hierarchy maps are generated correctly) - handle(resyncingSif.Batch().V1().Jobs().Informer(), dispatchers.ForJobs(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) - handle(resyncingSif.Apps().V1().ReplicaSets().Informer(), dispatchers.ForDeployments(kubernetesPkg.ReplicaSet), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) - handle(resyncingSif.Core().V1().ReplicationControllers().Informer(), dispatchers.ForDeployments(kubernetesPkg.ReplicationController), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, resyncingSif.Batch().V1().Jobs().Informer(), dispatchers.ForJobs(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, resyncingSif.Apps().V1().ReplicaSets().Informer(), dispatchers.ForDeployments(kubernetesPkg.ReplicaSet), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, resyncingSif.Core().V1().ReplicationControllers().Informer(), dispatchers.ForDeployments(kubernetesPkg.ReplicationController), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) // Compliance operator profiles are handled AFTER results, rules, and scan setting bindings have been synced if complianceProfileInformer != nil { - handle(complianceProfileInformer, dispatchers.ForComplianceOperatorProfiles(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, complianceProfileInformer, dispatchers.ForComplianceOperatorProfiles(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) } if complianceTailoredProfileInformer != nil { - 
handle(complianceTailoredProfileInformer, dispatchers.ForComplianceOperatorTailoredProfiles(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) + handle(k.context, complianceTailoredProfileInformer, dispatchers.ForComplianceOperatorTailoredProfiles(), k.outputQueue, &syncingResources, preTopLevelDeploymentWaitGroup, stopSignal, &eventLock) } if !startAndWait(stopSignal, preTopLevelDeploymentWaitGroup, sif, resyncingSif, crdSharedInformerFactory, osRouteFactory) { @@ -221,19 +226,19 @@ func (k *listenerImpl) handleAllEvents() { wg := &concurrency.WaitGroup{} // Deployment types. - handle(resyncingSif.Apps().V1().DaemonSets().Informer(), dispatchers.ForDeployments(kubernetesPkg.DaemonSet), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) - handle(resyncingSif.Apps().V1().Deployments().Informer(), dispatchers.ForDeployments(kubernetesPkg.Deployment), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) - handle(resyncingSif.Apps().V1().StatefulSets().Informer(), dispatchers.ForDeployments(kubernetesPkg.StatefulSet), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) + handle(k.context, resyncingSif.Apps().V1().DaemonSets().Informer(), dispatchers.ForDeployments(kubernetesPkg.DaemonSet), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) + handle(k.context, resyncingSif.Apps().V1().Deployments().Informer(), dispatchers.ForDeployments(kubernetesPkg.Deployment), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) + handle(k.context, resyncingSif.Apps().V1().StatefulSets().Informer(), dispatchers.ForDeployments(kubernetesPkg.StatefulSet), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) if ok, err := sensorUtils.HasAPI(k.client.Kubernetes(), "batch/v1", kubernetesPkg.CronJob); err != nil { log.Errorf("error determining API version to use for CronJobs: %v", err) } else if ok { - handle(resyncingSif.Batch().V1().CronJobs().Informer(), 
dispatchers.ForDeployments(kubernetesPkg.CronJob), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) + handle(k.context, resyncingSif.Batch().V1().CronJobs().Informer(), dispatchers.ForDeployments(kubernetesPkg.CronJob), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) } else { - handle(resyncingSif.Batch().V1beta1().CronJobs().Informer(), dispatchers.ForDeployments(kubernetesPkg.CronJob), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) + handle(k.context, resyncingSif.Batch().V1beta1().CronJobs().Informer(), dispatchers.ForDeployments(kubernetesPkg.CronJob), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) } if osAppsFactory != nil { - handle(osAppsFactory.Apps().V1().DeploymentConfigs().Informer(), dispatchers.ForDeployments(kubernetesPkg.DeploymentConfig), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) + handle(k.context, osAppsFactory.Apps().V1().DeploymentConfigs().Informer(), dispatchers.ForDeployments(kubernetesPkg.DeploymentConfig), k.outputQueue, &syncingResources, wg, stopSignal, &eventLock) } // SharedInformerFactories can have Start called multiple times which will start the rest of the handlers @@ -245,7 +250,7 @@ func (k *listenerImpl) handleAllEvents() { // Finally, run the pod informer, and process pod events. podWaitGroup := &concurrency.WaitGroup{} - handle(podInformer.Informer(), dispatchers.ForDeployments(kubernetesPkg.Pod), k.outputQueue, &syncingResources, podWaitGroup, stopSignal, &eventLock) + handle(k.context, podInformer.Informer(), dispatchers.ForDeployments(kubernetesPkg.Pod), k.outputQueue, &syncingResources, podWaitGroup, stopSignal, &eventLock) if !startAndWait(stopSignal, podWaitGroup, resyncingSif) { return } @@ -269,6 +274,7 @@ func (k *listenerImpl) handleAllEvents() { // Helper function that creates and adds a handler to an informer. 
// //////////////////////////////////////////////////////////////// func handle( + ctx context.Context, informer cache.SharedIndexInformer, dispatcher resources.Dispatcher, resolver component.Resolver, @@ -278,6 +284,7 @@ func handle( eventLock *sync.Mutex, ) { handlerImpl := &resourceEventHandlerImpl{ + context: ctx, eventLock: eventLock, dispatcher: dispatcher, resolver: resolver, diff --git a/sensor/kubernetes/listener/resource_event_handler_impl.go b/sensor/kubernetes/listener/resource_event_handler_impl.go index 4572f2b96406a..d7fba13bf93af 100644 --- a/sensor/kubernetes/listener/resource_event_handler_impl.go +++ b/sensor/kubernetes/listener/resource_event_handler_impl.go @@ -1,6 +1,8 @@ package listener import ( + "context" + "github.com/pkg/errors" "github.com/stackrox/rox/generated/internalapi/central" "github.com/stackrox/rox/pkg/concurrency" @@ -17,6 +19,7 @@ import ( // resourceEventHandlerImpl processes OnAdd, OnUpdate, and OnDelete events, and joins the results to an output // channel type resourceEventHandlerImpl struct { + context context.Context eventLock *sync.Mutex dispatcher resources.Dispatcher @@ -106,8 +109,7 @@ func (h *resourceEventHandlerImpl) sendResourceEvent(obj, oldObj interface{}, ac } message := h.dispatcher.ProcessEvent(obj, oldObj, action) - // TODO(ROX-17157) Add context here - message.Context = nil + message.Context = h.context h.resolver.Send(message) } diff --git a/sensor/kubernetes/listener/resource_event_handler_impl_test.go b/sensor/kubernetes/listener/resource_event_handler_impl_test.go index 1486d95a5ef66..1eed00d6f82c3 100644 --- a/sensor/kubernetes/listener/resource_event_handler_impl_test.go +++ b/sensor/kubernetes/listener/resource_event_handler_impl_test.go @@ -1,6 +1,8 @@ package listener import ( + "context" + "fmt" "testing" "github.com/stackrox/rox/generated/internalapi/central" @@ -15,6 +17,12 @@ import ( "k8s.io/apimachinery/pkg/types" ) +type contextKey int + +const ( + ctxKeyTest contextKey = iota +) + func 
TestResourceEventHandlerImpl(t *testing.T) { suite.Run(t, new(ResourceEventHandlerImplTestSuite)) } @@ -73,10 +81,15 @@ func (suite *ResourceEventHandlerImplTestSuite) assertFinished(handler *resource } func (suite *ResourceEventHandlerImplTestSuite) newHandlerImpl() *resourceEventHandlerImpl { + return suite.newHandlerImplWithContext(context.Background()) +} + +func (suite *ResourceEventHandlerImplTestSuite) newHandlerImplWithContext(ctx context.Context) *resourceEventHandlerImpl { var treatCreatesAsUpdates concurrency.Flag treatCreatesAsUpdates.Set(true) var eventLock sync.Mutex return &resourceEventHandlerImpl{ + context: ctx, eventLock: &eventLock, dispatcher: suite.dispatcher, resolver: suite.resolver, @@ -103,6 +116,17 @@ func (suite *ResourceEventHandlerImplTestSuite) TestIDsAddedToSyncSet() { suite.Empty(handler.missingInitialIDs) } +func (suite *ResourceEventHandlerImplTestSuite) TestContextIsPassed() { + ctx := context.WithValue(context.Background(), ctxKeyTest, "abc") + handler := suite.newHandlerImplWithContext(ctx) + + obj := randomID() + suite.dispatcher.EXPECT().ProcessEvent(obj, nil, central.ResourceAction_SYNC_RESOURCE). 
+ Return(&component.ResourceEvent{}) + suite.resolver.EXPECT().Send(matchContextValue("abc")) + handler.OnAdd(obj) +} + func (suite *ResourceEventHandlerImplTestSuite) TestIDsAddedToMissingSet() { handler := suite.newHandlerImpl() @@ -201,3 +225,27 @@ func (suite *ResourceEventHandlerImplTestSuite) TestEmptySeenAndEmptyPopulate() handlerTwo.PopulateInitialObjects([]interface{}{}) suite.assertFinished(handlerTwo) } + +func matchContextValue(value string) gomock.Matcher { + return &contextMatcher{value} +} + +type contextMatcher struct { + value string +} + +// Matches implements gomock.Matcher +func (m *contextMatcher) Matches(x interface{}) bool { + event, ok := x.(*component.ResourceEvent) + if !ok { + return false + } + return event.Context.Value(ctxKeyTest) == m.value +} + +// String implements gomock.Matcher +func (m *contextMatcher) String() string { + return fmt.Sprintf("received context should have value %s", m.value) +} + +var _ gomock.Matcher = &contextMatcher{} diff --git a/sensor/tests/connection/k8sreconciliation/reconcile_test.go b/sensor/tests/connection/k8sreconciliation/reconcile_test.go index edac82c855e0c..c0c9aeaa9b810 100644 --- a/sensor/tests/connection/k8sreconciliation/reconcile_test.go +++ b/sensor/tests/connection/k8sreconciliation/reconcile_test.go @@ -16,6 +16,9 @@ import ( var ( NginxDeployment1 = helper.K8sResourceInfo{Kind: "Deployment", YamlFile: "nginx.yaml", Name: "nginx-deployment"} NginxDeployment2 = helper.K8sResourceInfo{Kind: "Deployment", YamlFile: "nginx2.yaml", Name: "nginx-deployment-2"} + NginxDeployment3 = helper.K8sResourceInfo{Kind: "Deployment", YamlFile: "nginx3.yaml", Name: "nginx-deployment-3"} + + NetpolBlockEgress = helper.K8sResourceInfo{Kind: "NetworkPolicy", YamlFile: "netpol-block-egress.yaml", Name: "block-all-egress"} ) func Test_SensorReconcilesKubernetesEvents(t *testing.T) { @@ -31,27 +34,58 @@ func Test_SensorReconcilesKubernetesEvents(t *testing.T) { c, err := helper.NewContext(t) require.NoError(t, 
err) + // Timeline of the events in this test: + // 1) Create deployment Nginx1 + // 2) Create deployment Nginx2 + // 3) Create NetworkPolicy block-all-egress + // 4) gRPC Connection interrupted + // 5) Delete deployment Nginx2 + // 6) Create deployment Nginx3 + // 7) gRPC Connection re-established + // 8) Sensor transmits current state with SYNC event type: + // Deployment Nginx1 + // Deployment Nginx3 + // NetworkPolicy block-all-egress + // + // Using a NetworkPolicy here will make sure that no deployments that were removed while the connection + // was down will be reprocessed and sent when the NetworkPolicy event gets resynced. c.RunTest(helper.WithTestCase(func(t *testing.T, testContext *helper.TestContext, _ map[string]k8s.Object) { ctx := context.Background() testContext.WaitForSyncEvent(2 * time.Minute) _, err = c.ApplyResourceAndWaitNoObject(ctx, helper.DefaultNamespace, NginxDeployment1, nil) + require.NoError(t, err) + deleteDeployment2, err := c.ApplyResourceAndWaitNoObject(ctx, helper.DefaultNamespace, NginxDeployment2, nil) + require.NoError(t, err) + + _, err = c.ApplyResourceAndWaitNoObject(ctx, helper.DefaultNamespace, NetpolBlockEgress, nil) + require.NoError(t, err) testContext.StopCentralGRPC() obj := &appsV1.Deployment{} - _, err = c.ApplyResource(ctx, helper.DefaultNamespace, &NginxDeployment2, obj, nil) + _, err = c.ApplyResource(ctx, helper.DefaultNamespace, &NginxDeployment3, obj, nil) require.NoError(t, err) + require.NoError(t, deleteDeployment2()) + testContext.StartFakeGRPC() archived := testContext.ArchivedMessages() require.Len(t, archived, 1) deploymentMessageInArchive(t, archived[0], helper.DefaultNamespace, NginxDeployment1.Name) + // Wait for sync event to be sent, then expect the following state to be transmitted: + // SYNC nginx-deployment-1 + // SYNC nginx-deployment-3 + // No event for nginx-deployment-2 (was deleted while connection was down) + // This reconciliation state will make Central delete Nginx2, keep Nginx1 and 
create Nginx3 testContext.WaitForSyncEvent(2 * time.Minute) - testContext.DeploymentActionReceived(NginxDeployment1.Name, central.ResourceAction_SYNC_RESOURCE) - testContext.DeploymentActionReceived(NginxDeployment2.Name, central.ResourceAction_SYNC_RESOURCE) + testContext.FirstDeploymentReceivedWithAction(NginxDeployment1.Name, central.ResourceAction_SYNC_RESOURCE) + testContext.FirstDeploymentReceivedWithAction(NginxDeployment3.Name, central.ResourceAction_SYNC_RESOURCE) + + // This assertion will fail if events are not properly cleared from the internal queues and in-memory stores + testContext.DeploymentNotReceived(NginxDeployment2.Name) })) } diff --git a/sensor/tests/connection/k8sreconciliation/yaml/netpol-block-egress.yaml b/sensor/tests/connection/k8sreconciliation/yaml/netpol-block-egress.yaml new file mode 100644 index 0000000000000..849a70622a637 --- /dev/null +++ b/sensor/tests/connection/k8sreconciliation/yaml/netpol-block-egress.yaml @@ -0,0 +1,11 @@ +kind: NetworkPolicy +apiVersion: networking.k8s.io/v1 +metadata: + name: block-all-egress +spec: + podSelector: + matchLabels: + app: nginx-2 + policyTypes: + - Egress + diff --git a/sensor/tests/connection/k8sreconciliation/yaml/nginx.yaml b/sensor/tests/connection/k8sreconciliation/yaml/nginx.yaml index 685c17aa68e1d..597ddab02b3b6 100644 --- a/sensor/tests/connection/k8sreconciliation/yaml/nginx.yaml +++ b/sensor/tests/connection/k8sreconciliation/yaml/nginx.yaml @@ -5,7 +5,7 @@ metadata: labels: app: nginx spec: - replicas: 3 + replicas: 1 selector: matchLabels: app: nginx @@ -19,3 +19,4 @@ spec: image: nginx:1.14.2 ports: - containerPort: 80 + diff --git a/sensor/tests/connection/k8sreconciliation/yaml/nginx2.yaml b/sensor/tests/connection/k8sreconciliation/yaml/nginx2.yaml index 1945de4195fbb..1cc86c13e2385 100644 --- a/sensor/tests/connection/k8sreconciliation/yaml/nginx2.yaml +++ b/sensor/tests/connection/k8sreconciliation/yaml/nginx2.yaml @@ -3,19 +3,20 @@ kind: Deployment metadata: name: 
nginx-deployment-2 labels: - app: nginx + app: nginx-2 spec: - replicas: 3 + replicas: 1 selector: matchLabels: - app: nginx + app: nginx-2 template: metadata: labels: - app: nginx + app: nginx-2 spec: containers: - name: nginx image: nginx:1.14.2 ports: - containerPort: 80 + diff --git a/sensor/tests/connection/k8sreconciliation/yaml/nginx3.yaml b/sensor/tests/connection/k8sreconciliation/yaml/nginx3.yaml new file mode 100644 index 0000000000000..a29391ea87cef --- /dev/null +++ b/sensor/tests/connection/k8sreconciliation/yaml/nginx3.yaml @@ -0,0 +1,22 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nginx-deployment-3 + labels: + app: nginx-3 +spec: + replicas: 1 + selector: + matchLabels: + app: nginx-3 + template: + metadata: + labels: + app: nginx-3 + spec: + containers: + - name: nginx + image: nginx:1.14.2 + ports: + - containerPort: 80 + diff --git a/sensor/tests/helper/helper.go b/sensor/tests/helper/helper.go index cabf94418eb32..c9933cbdcaca4 100644 --- a/sensor/tests/helper/helper.go +++ b/sensor/tests/helper/helper.go @@ -24,6 +24,7 @@ import ( "github.com/stackrox/rox/sensor/kubernetes/client" "github.com/stackrox/rox/sensor/kubernetes/sensor" "github.com/stackrox/rox/sensor/testutils" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" @@ -477,6 +478,33 @@ func (c *TestContext) WaitForDeploymentEventWithTimeout(name string, timeout tim } +// FirstDeploymentStateMatchesWithTimeout checks that the first deployment received with name matches the assertion function. 
+func (c *TestContext) FirstDeploymentStateMatchesWithTimeout(name string, assertion AssertFunc, message string, timeout time.Duration) { + timer := time.NewTimer(timeout) + ticker := time.NewTicker(defaultTicker) + for { + select { + case <-timer.C: + c.t.Errorf("timeout reached waiting for state: (%s): no deployment found", message) + return + case <-ticker.C: + messages := c.GetFakeCentral().GetAllMessages() + deploymentMessage := GetFirstMessageWithDeploymentName(messages, DefaultNamespace, name) + deployment := deploymentMessage.GetEvent().GetDeployment() + action := deploymentMessage.GetEvent().GetAction() + if deployment != nil { + // Always return when deployment is found. As soon as deployment != nil it should be the deployment + // that matches assertion. If it isn't, the test should fail immediately. There's no point in waiting. + err := assertion(deployment, action) + if err != nil { + c.t.Errorf("first deployment found didn't meet expected state: %s", err) + } + return + } + } + } +} + // LastDeploymentState checks the deployment state similarly to `LastDeploymentStateWithTimeout` with a default 3 seconds timeout. func (c *TestContext) LastDeploymentState(name string, assertion AssertFunc, message string) { c.LastDeploymentStateWithTimeout(name, assertion, message, defaultWaitTimeout) @@ -509,17 +537,24 @@ func (c *TestContext) LastDeploymentStateWithTimeout(name string, assertion Asse // DeploymentCreateReceived checks if a deployment object was received with CREATE action. func (c *TestContext) DeploymentCreateReceived(name string) { - c.DeploymentActionReceived(name, central.ResourceAction_CREATE_RESOURCE) + c.FirstDeploymentReceivedWithAction(name, central.ResourceAction_CREATE_RESOURCE) } -// DeploymentActionReceived checks if a deployment object was received with specific action type. 
-func (c *TestContext) DeploymentActionReceived(name string, expectedAction central.ResourceAction) { - c.LastDeploymentState(name, func(_ *storage.Deployment, action central.ResourceAction) error { +// FirstDeploymentReceivedWithAction checks if a deployment object was received with a specific action type. +func (c *TestContext) FirstDeploymentReceivedWithAction(name string, expectedAction central.ResourceAction) { + c.FirstDeploymentStateMatchesWithTimeout(name, func(_ *storage.Deployment, action central.ResourceAction) error { if action != expectedAction { return errors.Errorf("event action is %s, but expected %s", action, expectedAction) } return nil - }, fmt.Sprintf("Deployment %s should be received with action %s", name, expectedAction)) + }, fmt.Sprintf("Deployment %s should be received with action %s", name, expectedAction), defaultWaitTimeout) +} + +// DeploymentNotReceived checks that no deployment event with the given name was received by fake central. +func (c *TestContext) DeploymentNotReceived(name string) { + messages := c.GetFakeCentral().GetAllMessages() + lastDeploymentUpdate := GetLastMessageWithDeploymentName(messages, DefaultNamespace, name) + assert.Nilf(c.t, lastDeploymentUpdate, "should not have found deployment with name %s: %+v", name, lastDeploymentUpdate) } // GetLastMessageMatching finds last element in slice matching `matchFn`. 
@@ -797,6 +832,17 @@ func (c *TestContext) waitForResource(timeout time.Duration, fn condition) error } } +// GetFirstMessageWithDeploymentName finds the first sensor message by namespace and deployment name +func GetFirstMessageWithDeploymentName(messages []*central.MsgFromSensor, ns, name string) *central.MsgFromSensor { + for _, msg := range messages { + deployment := msg.GetEvent().GetDeployment() + if deployment.GetName() == name && deployment.GetNamespace() == ns { + return msg + } + } + return nil +} + // GetLastMessageWithDeploymentName find most recent sensor messages by namespace and deployment name func GetLastMessageWithDeploymentName(messages []*central.MsgFromSensor, ns, name string) *central.MsgFromSensor { var lastMessage *central.MsgFromSensor