Skip to content

Commit

Permalink
ROX-17157: Fix reconciliation for offline mode for Kubernetes Listener (
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrb committed Jul 26, 2023
1 parent 26c4c2a commit f0696c2
Show file tree
Hide file tree
Showing 16 changed files with 444 additions and 55 deletions.
8 changes: 8 additions & 0 deletions sensor/kubernetes/eventpipeline/component/component.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package component

import (
"context"

"github.com/stackrox/rox/sensor/common/message"
)

Expand All @@ -26,3 +28,9 @@ type OutputQueue interface {
Send(event *ResourceEvent)
ResponsesC() <-chan *message.ExpiringMessage
}

// ContextListener is a PipelineComponent that can additionally be started
// with a caller-supplied context.
type ContextListener interface {
// Embedded so that a ContextListener can be used wherever a PipelineComponent is expected.
PipelineComponent
// StartWithContext starts the component with the given context and reports
// a non-nil error when starting fails.
StartWithContext(context.Context) error
}
64 changes: 64 additions & 0 deletions sensor/kubernetes/eventpipeline/component/mocks/component.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sensor/kubernetes/eventpipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func New(client client.Interface, configHandler config.Handler, detector detector.Detector, reprocessor reprocessor.Handler, nodeName string, resyncPeriod time.Duration, traceWriter io.Writer, storeProvider *resources.InMemoryStoreProvider, queueSize int) common.SensorComponent {
outputQueue := output.New(detector, queueSize)
var depResolver component.Resolver
var resourceListener component.PipelineComponent
var resourceListener component.ContextListener
if env.ResyncDisabled.BooleanSetting() {
depResolver = resolver.New(outputQueue, storeProvider, queueSize)
resourceListener = listener.New(client, configHandler, nodeName, resyncPeriod, traceWriter, depResolver, storeProvider)
Expand Down
27 changes: 25 additions & 2 deletions sensor/kubernetes/eventpipeline/pipeline_impl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eventpipeline

import (
"context"
"sync/atomic"

"github.com/pkg/errors"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/env"
"github.com/stackrox/rox/pkg/logging"
"github.com/stackrox/rox/pkg/sync"
"github.com/stackrox/rox/sensor/common"
"github.com/stackrox/rox/sensor/common/detector"
"github.com/stackrox/rox/sensor/common/message"
Expand All @@ -25,14 +27,18 @@ var (
// eventPipeline wires together the output queue, resolver, listener, detector
// and reprocessor, and restarts the listener when connectivity changes.
type eventPipeline struct {
output component.OutputQueue
resolver component.Resolver
// NOTE(review): the two listener declarations below appear to be the removed
// and the added line of the rendered diff — confirm against the real file.
listener component.PipelineComponent
listener component.ContextListener
detector detector.Detector
reprocessor reprocessor.Handler

// offlineMode is flipped with CompareAndSwap in Notify: the listener is
// started on a true->false transition and stopped on a false->true one.
offlineMode *atomic.Bool

// eventsC is the channel handed out by ResponsesC.
eventsC chan *message.ExpiringMessage
stopSig concurrency.Signal

// contextMtx is held by stopCurrentContext and createNewContext while they
// touch context and cancelContext.
contextMtx sync.Mutex
// context is replaced by createNewContext and passed to the listener on start.
context context.Context
// cancelContext cancels context; it is nil until createNewContext has run.
cancelContext context.CancelFunc
}

// Capabilities implements common.SensorComponent
Expand Down Expand Up @@ -64,6 +70,20 @@ func (p *eventPipeline) ResponsesC() <-chan *message.ExpiringMessage {
return p.eventsC
}

// stopCurrentContext cancels the context currently held by the pipeline.
// It is a no-op when no context has been created yet.
func (p *eventPipeline) stopCurrentContext() {
	p.contextMtx.Lock()
	defer p.contextMtx.Unlock()

	cancel := p.cancelContext
	if cancel == nil {
		// Nothing was ever started, so there is nothing to cancel.
		return
	}
	cancel()
}

// createNewContext replaces the pipeline's context and cancel function with a
// fresh cancellable context derived from context.Background().
func (p *eventPipeline) createNewContext() {
	ctx, cancel := context.WithCancel(context.Background())

	p.contextMtx.Lock()
	defer p.contextMtx.Unlock()
	p.context = ctx
	p.cancelContext = cancel
}

// Start implements common.SensorComponent
func (p *eventPipeline) Start() error {
// The order is important here, we need to start the components
Expand Down Expand Up @@ -102,13 +122,16 @@ func (p *eventPipeline) Notify(event common.SensorComponentEvent) {
// Start listening to events if not yet listening
if p.offlineMode.CompareAndSwap(true, false) {
log.Infof("Connection established: Starting Kubernetes listener")
if err := p.listener.Start(); err != nil {
// TODO(ROX-18613): use contextProvider to provide context for listener
p.createNewContext()
if err := p.listener.StartWithContext(p.context); err != nil {
log.Fatalf("Failed to start listener component. Sensor cannot run without listening to Kubernetes events: %s", err)
}
}
case common.SensorComponentEventOfflineMode:
// Stop listening to events
if p.offlineMode.CompareAndSwap(false, true) {
p.stopCurrentContext()
p.listener.Stop(errors.New("gRPC connection stopped"))
}
}
Expand Down
115 changes: 108 additions & 7 deletions sensor/kubernetes/eventpipeline/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package eventpipeline

import (
"sync/atomic"
"testing"

"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/sync"
"github.com/stackrox/rox/sensor/common"
mockDetector "github.com/stackrox/rox/sensor/common/detector/mocks"
"github.com/stackrox/rox/sensor/common/message"
mockReprocessor "github.com/stackrox/rox/sensor/common/reprocessor/mocks"
Expand All @@ -24,7 +26,11 @@ type eventPipelineSuite struct {
resolver *mockComponent.MockResolver
detector *mockDetector.MockDetector
reprocessor *mockReprocessor.MockHandler
listener *mockComponent.MockContextListener
outputQueue *mockComponent.MockOutputQueue
pipeline *eventPipeline

outputC chan *message.ExpiringMessage
}

var _ suite.SetupTestSuite = &eventPipelineSuite{}
Expand All @@ -38,28 +44,123 @@ func (s *eventPipelineSuite) TearDownTest() {
s.T().Cleanup(s.mockCtrl.Finish)
}

// mockListener is a listener stub whose methods do nothing.
type mockListener struct{}

// Start always reports success.
func (*mockListener) Start() error {
	return nil
}

// Stop discards the supplied error and does nothing.
func (*mockListener) Stop(error) {}

// SetupTest creates a fresh mock controller and mocks and builds the pipeline
// under test from them.
func (s *eventPipelineSuite) SetupTest() {
s.mockCtrl = gomock.NewController(s.T())

s.resolver = mockComponent.NewMockResolver(s.mockCtrl)
s.detector = mockDetector.NewMockDetector(s.mockCtrl)
s.reprocessor = mockReprocessor.NewMockHandler(s.mockCtrl)
s.listener = mockComponent.NewMockContextListener(s.mockCtrl)
s.outputQueue = mockComponent.NewMockOutputQueue(s.mockCtrl)

// The pipeline starts in offline mode, so the first CentralReachable
// notification is the one that starts the listener.
offlineMode := atomic.Bool{}
offlineMode.Store(true)

// NOTE(review): the duplicated output/listener entries below appear to be the
// removed and the added lines of the rendered diff — confirm against the real file.
s.pipeline = &eventPipeline{
eventsC: make(chan *message.ExpiringMessage),
stopSig: concurrency.NewSignal(),
output: mockComponent.NewMockOutputQueue(s.mockCtrl),
output: s.outputQueue,
resolver: s.resolver,
detector: s.detector,
reprocessor: s.reprocessor,
listener: &mockListener{},
listener: s.listener,
offlineMode: &offlineMode,
}
}

// write pushes an empty message bound to the pipeline's current context onto
// the mocked output channel.
func (s *eventPipelineSuite) write() {
	msg := message.NewExpiring(s.pipeline.context, nil)
	s.outputC <- msg
}

// online notifies the pipeline that Central is reachable.
func (s *eventPipelineSuite) online() {
s.pipeline.Notify(common.SensorComponentEventCentralReachable)
}

// offline notifies the pipeline that it should switch to offline mode.
func (s *eventPipelineSuite) offline() {
s.pipeline.Notify(common.SensorComponentEventOfflineMode)
}

// readSuccess receives one message from the pipeline and asserts that the
// channel is still open and the message has not expired.
func (s *eventPipelineSuite) readSuccess() {
	responses := s.pipeline.ResponsesC()
	received, open := <-responses

	check := s.Assert()
	check.True(open, "channel should be open")
	check.False(received.IsExpired(), "message should not be expired")
}

// readExpired receives one message from the pipeline and asserts that the
// channel is still open and the message has expired.
func (s *eventPipelineSuite) readExpired() {
	responses := s.pipeline.ResponsesC()
	received, open := <-responses

	check := s.Assert()
	check.True(open, "channel should be open")
	check.True(received.IsExpired(), "message should be expired")
}

// Test_OfflineModeCases drives the pipeline through several interleavings of
// writes (W), reads (R), disconnects and reconnects, and checks for each read
// whether the message is expired.
//
// All cases share one pipeline, so each case starts in whatever state the
// previous case left behind. The cases are therefore kept in a slice rather
// than a map: Go randomizes map iteration order, which would make the sequence
// of state transitions differ between runs and failures hard to reproduce.
func (s *eventPipelineSuite) Test_OfflineModeCases() {
	s.T().Setenv("ROX_RESYNC_DISABLED", "true")

	outputC := make(chan *message.ExpiringMessage, 10)
	s.outputQueue.EXPECT().ResponsesC().
		AnyTimes().Return(outputC)
	s.outputC = outputC

	s.outputQueue.EXPECT().Start().Times(1)
	s.resolver.EXPECT().Start().Times(1)
	// The listener is restarted a varying number of times across the cases.
	s.listener.EXPECT().StartWithContext(gomock.Any()).AnyTimes()
	s.listener.EXPECT().Stop(gomock.Any()).AnyTimes()

	s.Require().NoError(s.pipeline.Start())
	s.pipeline.Notify(common.SensorComponentEventCentralReachable)

	testCases := []struct {
		name  string
		steps []func()
	}{
		{
			name:  "Base case: Start, WA, WB, RA, RB, Disconnect",
			steps: []func(){s.online, s.write, s.write, s.readSuccess, s.readSuccess, s.offline},
		},
		{
			name:  "Case: Start, WA, WB, Disconnect, RA, RB, Reconnect",
			steps: []func(){s.write, s.write, s.offline, s.readExpired, s.readExpired, s.online},
		},
		{
			name:  "Case: Start, WA, WB, Disconnect, Reconnect, RA, RB",
			steps: []func(){s.write, s.write, s.offline, s.online, s.readExpired, s.readExpired},
		},
		{
			name:  "Case: Start, WA, Disconnect, WB, Reconnect, RA, RB",
			steps: []func(){s.write, s.offline, s.write, s.online, s.readExpired, s.readExpired},
		},
		{
			name:  "Case: Start, WA, Disconnect, Reconnect, WB, RA, RB",
			steps: []func(){s.write, s.offline, s.online, s.write, s.readExpired, s.readSuccess},
		},
		{
			name:  "Case: Start, Disconnect, WA, Reconnect, WB, RA, RB",
			steps: []func(){s.offline, s.write, s.online, s.write, s.readExpired, s.readSuccess},
		},
		{
			name:  "Case: Start, Disconnect, Reconnect, WA, WB, RA, RB",
			steps: []func(){s.offline, s.online, s.write, s.write, s.readSuccess, s.readSuccess},
		},
	}

	for _, tc := range testCases {
		// s.Run executes the closure synchronously, so capturing tc is safe.
		s.Run(tc.name, func() {
			for _, step := range tc.steps {
				step()
			}
		})
	}
}

// Test_OfflineMode checks that a message queued before a disconnect is still
// valid when read before the disconnect, and expired when read after the
// pipeline went offline and came back online.
func (s *eventPipelineSuite) Test_OfflineMode() {
	s.T().Setenv("ROX_RESYNC_DISABLED", "true")

	queued := make(chan *message.ExpiringMessage, 10)
	s.outputQueue.EXPECT().ResponsesC().AnyTimes().Return(queued)
	s.outputQueue.EXPECT().Start().Times(1)
	s.resolver.EXPECT().Start().Times(1)

	// One disconnect/reconnect cycle: the listener starts twice and stops once.
	s.listener.EXPECT().StartWithContext(gomock.Any()).Times(2)
	s.listener.EXPECT().Stop(gomock.Any()).Times(1)

	s.Require().NoError(s.pipeline.Start())
	s.pipeline.Notify(common.SensorComponentEventCentralReachable)

	// Queue messages A and B while the first context is still live.
	for i := 0; i < 2; i++ {
		queued <- message.NewExpiring(s.pipeline.context, nil)
	}

	// next receives one forwarded message and requires the channel to be open.
	next := func() *message.ExpiringMessage {
		msg, more := <-s.pipeline.ResponsesC()
		s.Require().True(more, "should have more messages in ResponsesC")
		return msg
	}

	// Message A is read before the disconnect.
	first := next()
	s.Assert().False(first.IsExpired(), "context should not be expired")

	s.pipeline.Notify(common.SensorComponentEventOfflineMode)
	s.pipeline.Notify(common.SensorComponentEventCentralReachable)

	// Message B is read after the context it was created with was cancelled.
	second := next()
	s.Assert().True(second.IsExpired(), "context should be expired")
}

func (s *eventPipelineSuite) Test_ReprocessDeployments() {
s.T().Setenv("ROX_RESYNC_DISABLED", "true")
messageReceived := sync.WaitGroup{}
Expand Down
2 changes: 1 addition & 1 deletion sensor/kubernetes/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
)

// New returns a new kubernetes listener.
func New(client client.Interface, configHandler config.Handler, nodeName string, resyncPeriod time.Duration, traceWriter io.Writer, queue component.Resolver, storeProvider *resources.InMemoryStoreProvider) component.PipelineComponent {
func New(client client.Interface, configHandler config.Handler, nodeName string, resyncPeriod time.Duration, traceWriter io.Writer, queue component.Resolver, storeProvider *resources.InMemoryStoreProvider) component.ContextListener {
k := &listenerImpl{
client: client,
stopSig: concurrency.NewSignal(),
Expand Down
21 changes: 21 additions & 0 deletions sensor/kubernetes/listener/listener_impl.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package listener

import (
"context"
"errors"
"io"
"time"

"github.com/stackrox/rox/pkg/buildinfo"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/sync"
"github.com/stackrox/rox/sensor/common/awscredentials"
"github.com/stackrox/rox/sensor/common/config"
"github.com/stackrox/rox/sensor/kubernetes/client"
Expand All @@ -29,9 +33,25 @@ type listenerImpl struct {
traceWriter io.Writer
outputQueue component.Resolver
storeProvider *resources.InMemoryStoreProvider
context context.Context
contextMtx sync.Mutex
}

// StartWithContext stores ctx on the listener and then starts it, returning
// whatever Start returns.
func (k *listenerImpl) StartWithContext(ctx context.Context) error {
// contextMtx stays locked until Start has returned.
k.contextMtx.Lock()
defer k.contextMtx.Unlock()
// Start refuses to run with a nil context, so the context is set first.
k.context = ctx
return k.Start()
}

func (k *listenerImpl) Start() error {
if k.context == nil {
if !buildinfo.ReleaseBuild {
panic("Something went very wrong: starting Kubernetes Listener with nil context")
}
return errors.New("cannot start listener without a context")
}

// This happens if the listener is restarting. Then the signal will already have been triggered
// when starting a new run of the listener.
if k.stopSig.IsDone() {
Expand All @@ -54,6 +74,7 @@ func (k *listenerImpl) Stop(_ error) {
k.credentialsManager.Stop()
}
k.stopSig.Signal()
k.storeProvider.CleanupStores()
}

func clusterOperatorCRDExists(client client.Interface) (bool, error) {
Expand Down
Loading

0 comments on commit f0696c2

Please sign in to comment.