Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port 5363 UI for the kubernetes exporter moving the config into port #19

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7e0188b
Added support for control the app config from the UI using Polling an…
yairsimantov20 Dec 11, 2023
e7079db
fixed miss type
yairsimantov20 Dec 11, 2023
311502d
Renaming
yairsimantov20 Dec 11, 2023
d84ad22
updated typing
yairsimantov20 Dec 11, 2023
a32b98b
at the start of polling getting the current state
yairsimantov20 Dec 11, 2023
a04dfd1
fixed schema & logs
yairsimantov20 Dec 11, 2023
25254fb
revert useragent addition
yairsimantov20 Dec 12, 2023
1eef152
fixes
yairsimantov20 Dec 13, 2023
97edf37
cr requests
yairsimantov20 Dec 13, 2023
56bad41
better config management
yairsimantov20 Dec 13, 2023
eb24061
removed bool config managment
yairsimantov20 Dec 13, 2023
296e170
better error for config file
yairsimantov20 Dec 13, 2023
2c21603
moved dependent and create missing to the app config
yairsimantov20 Dec 13, 2023
62a731d
starting kafka config once
yairsimantov20 Dec 13, 2023
f1ef258
starting kafka config once
yairsimantov20 Dec 13, 2023
7aae882
unified configuration
yairsimantov20 Dec 13, 2023
5be6bdb
fixed configuration flag issue
yairsimantov20 Dec 13, 2023
a6ced85
not failing for non-existing configuration on startup
yairsimantov20 Dec 14, 2023
1db8ded
change imports
yairsimantov20 Dec 14, 2023
7498948
changing offset reset to latest
yairsimantov20 Dec 14, 2023
f4e8dd5
starting resync in goroutine
yairsimantov20 Dec 14, 2023
6326254
revert moving the handle
yairsimantov20 Dec 17, 2023
bbb39ee
revert moving the handle
yairsimantov20 Dec 17, 2023
a1e4031
iconsume and consumer tests
yairsimantov20 Dec 18, 2023
3e3c91c
iconsume and consumer tests
yairsimantov20 Dec 18, 2023
80421b1
dependency injection and tests
yairsimantov20 Dec 18, 2023
d0e4f66
dep injection
yairsimantov20 Dec 18, 2023
a522369
event handler tests
yairsimantov20 Dec 20, 2023
1838961
file renaming
yairsimantov20 Dec 20, 2023
9879466
removed function
yairsimantov20 Dec 20, 2023
2cf5c27
cr
yairsimantov20 Dec 20, 2023
d4816cd
cr
yairsimantov20 Dec 20, 2023
ae6ccad
closing after one message consumer
yairsimantov20 Dec 20, 2023
06451f1
closing after one message consumer
yairsimantov20 Dec 20, 2023
86ec4e7
cr
yairsimantov20 Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func main() {
}

klog.Info("Starting controllers handler")
err = event_handler.StartEventHandler(eventListener, func() (event_handler.IStoppableRsync, error) {
err = event_handler.Start(eventListener, func() (event_handler.IStoppableRsync, error) {
return initiateHandler(exporterConfig, k8sClient, portClient)
})

Expand Down
6 changes: 6 additions & 0 deletions pkg/event_handler/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Fixture struct {

type MockConsumer struct {
pollData kafka.Event
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
close func()
}

func (m *MockConsumer) SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error) {
Expand All @@ -36,6 +37,10 @@ func (m *MockConsumer) SubscribeTopics(topics []string, rebalanceCb kafka.Rebala
}

func (m *MockConsumer) Poll(timeoutMs int) (event kafka.Event) {
// The consumer will poll this in while true loop so we need to close it inorder not to spam the logs
defer func() {
m.close()
}()
return m.pollData
}

Expand All @@ -50,6 +55,7 @@ func (m *MockConsumer) Close() (err error) {
func NewFixture(t *testing.T) *Fixture {
mock := &MockConsumer{}
consumer, err := NewConsumer(&config.KafkaConfiguration{}, mock)
mock.close = consumer.Close
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/event_handler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ type IStoppableRsync interface {
Stop()
}

func StartEventHandler(eventListener IListener, controllerHandlerFactory func() (IStoppableRsync, error)) error {
controllerHandler, err := controllerHandlerFactory()
func Start(eventListener IListener, initControllerHandler func() (IStoppableRsync, error)) error {
controllerHandler, err := initControllerHandler()
if err != nil {
return err
}

return eventListener.Run(func() {
klog.Infof("Resync request received. Recreating controllers for the new port configuration")
controllerHandler.Stop()
newController, resyncErr := controllerHandlerFactory()
newController, resyncErr := initControllerHandler()
controllerHandler = newController

if resyncErr != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/event_handler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ func (e *EventListenerMock) Run(resync func()) error {
}

func TestStartKafkaEventListener(t *testing.T) {
// Test should get a new controller handler on each call to the passed function and stop the previous one
// The flow for this test will be: create controller handler -> resync and stop the controller handler & create a
// new controller handler X 2 which will result the last controller handler to not be stopped
eventListenerMock := &EventListenerMock{}
firstResponse := &ControllerHandlerMock{}
secondResponse := &ControllerHandlerMock{}
Expand All @@ -37,7 +40,7 @@ func TestStartKafkaEventListener(t *testing.T) {
thirdResponse,
}

err := StartEventHandler(eventListenerMock, func() (IStoppableRsync, error) {
err := Start(eventListenerMock, func() (IStoppableRsync, error) {
r := responses[0]
responses = responses[1:]

Expand Down
5 changes: 3 additions & 2 deletions pkg/event_handler/polling/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ import (
type EventListener struct {
stateKey string
portClient *cli.PortClient
handler *Handler
}

func NewEventListener(stateKey string, portClient *cli.PortClient) *EventListener {
return &EventListener{
stateKey: stateKey,
portClient: portClient,
handler: NewPollingHandler(config.PollingListenerRate, stateKey, portClient, nil),
}
}

func (l *EventListener) Run(resync func()) error {
klog.Infof("Starting polling event listener")
klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate)
pollingHandler := NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient, nil)
pollingHandler.Run(resync)
l.handler.Run(resync)

return nil
}
8 changes: 4 additions & 4 deletions pkg/event_handler/polling/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ func (t *Ticker) GetC() <-chan time.Time {
return t.ticker.C
}

type HandlerSettings struct {
type Handler struct {
ticker ITicker
stateKey string
portClient *cli.PortClient
pollingRate uint
}

func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortClient, tickerOverride ITicker) *HandlerSettings {
func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortClient, tickerOverride ITicker) *Handler {
ticker := tickerOverride
if ticker == nil {
ticker = NewTicker(time.Second * time.Duration(pollingRate))
}
rv := &HandlerSettings{
rv := &Handler{
ticker: ticker,
stateKey: stateKey,
portClient: portClient,
Expand All @@ -54,7 +54,7 @@ func NewPollingHandler(pollingRate uint, stateKey string, portClient *cli.PortCl
return rv
}

func (h *HandlerSettings) Run(resync func()) {
func (h *Handler) Run(resync func()) {
klog.Infof("Starting polling handler")
currentState, err := integration.GetIntegration(h.portClient, h.stateKey)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/port/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func NewIntegration(portClient *cli.PortClient, stateKey string, eventListenerTy
return nil
}

// ToDo: remove this function
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
func GetIntegrationConfig(portClient *cli.PortClient, stateKey string) (*port.AppConfig, error) {
_, err := portClient.Authenticate(context.Background(), portClient.ClientID, portClient.ClientSecret)
if err != nil {
Expand Down