From d0e4f66547a6f6263862b1f8d3fa40fcec8aeda1 Mon Sep 17 00:00:00 2001 From: yair Date: Tue, 19 Dec 2023 01:56:24 +0200 Subject: [PATCH] dep injection --- main.go | 39 ++++++++++--------- .../consumer/consumer.go | 0 .../consumer/consumer_test.go | 0 .../consumer/event_listener.go | 6 +-- pkg/event_handler/event_handler.go | 33 ++++++++++++++++ pkg/event_handler/handler_test.go | 13 +++++++ .../polling/event_listener.go | 4 +- .../polling/polling.go | 0 .../polling/polling_test.go | 4 +- pkg/event_listener/event_listener_handler.go | 34 ---------------- pkg/handlers/controllers.go | 6 +-- pkg/port/integration/integration.go | 8 ++-- 12 files changed, 79 insertions(+), 68 deletions(-) rename pkg/{event_listener => event_handler}/consumer/consumer.go (100%) rename pkg/{event_listener => event_handler}/consumer/consumer_test.go (100%) rename pkg/{event_listener => event_handler}/consumer/event_listener.go (91%) create mode 100644 pkg/event_handler/event_handler.go create mode 100644 pkg/event_handler/handler_test.go rename pkg/{event_listener => event_handler}/polling/event_listener.go (83%) rename pkg/{event_listener => event_handler}/polling/polling.go (100%) rename pkg/{event_listener => event_handler}/polling/polling_test.go (95%) delete mode 100644 pkg/event_listener/event_listener_handler.go diff --git a/main.go b/main.go index 99698a0..f3020a4 100644 --- a/main.go +++ b/main.go @@ -4,9 +4,9 @@ import ( "flag" "fmt" "github.com/port-labs/port-k8s-exporter/pkg/config" - "github.com/port-labs/port-k8s-exporter/pkg/event_listener" - "github.com/port-labs/port-k8s-exporter/pkg/event_listener/consumer" - "github.com/port-labs/port-k8s-exporter/pkg/event_listener/polling" + "github.com/port-labs/port-k8s-exporter/pkg/event_handler" + "github.com/port-labs/port-k8s-exporter/pkg/event_handler/consumer" + "github.com/port-labs/port-k8s-exporter/pkg/event_handler/polling" "github.com/port-labs/port-k8s-exporter/pkg/handlers" "github.com/port-labs/port-k8s-exporter/pkg/k8s" "github.com/port-labs/port-k8s-exporter/pkg/port" @@ -16,7 +16,7 @@ import ( ) func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) { - apiConfig, err := integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey) + apiConfig, err := integration.GetIntegrationConfig(portClient, exporterConfig.StateKey) if err != nil { klog.Fatalf("Error getting K8s integration config: %s", err.Error()) } @@ -30,24 +30,29 @@ func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portCli return newHandler, nil } -func CreateEventListener(portClient *cli.PortClient) (event_listener.IEventListener, error) { - klog.Infof("Received event listener type: %s", config.ApplicationConfig.EventListenerType) - switch config.ApplicationConfig.EventListenerType { +func createEventListener(stateKey string, eventListenerType string, portClient *cli.PortClient) (event_handler.IListener, error) { + klog.Infof("Received event listener type: %s", eventListenerType) + switch eventListenerType { case "KAFKA": - return consumer.NewEventListener(portClient) + return consumer.NewEventListener(stateKey, portClient) case "POLLING": - return polling.NewEventListener(portClient), nil + return polling.NewEventListener(stateKey, portClient), nil default: - return nil, fmt.Errorf("unknown event listener type: %s", config.ApplicationConfig.EventListenerType) + return nil, fmt.Errorf("unknown event listener type: %s", eventListenerType) } } +func initIntegrationConfig() { + +} + func main() { klog.InitFlags(nil) k8sConfig := k8s.NewKubeConfig() + exporterConfig, _ := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType) clientConfig, err := k8sConfig.ClientConfig() if err != nil { klog.Fatalf("Error getting K8s client config: %s", err.Error()) @@ -60,35 +65,31 @@ func main() { portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret), - cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", config.ApplicationConfig.StateKey)), + cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", exporterConfig.StateKey)), ) if err != nil { klog.Fatalf("Error building Port client: %s", err.Error()) } - exporterConfig, _ := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType) - - _, err = integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey) + _, err = integration.GetIntegrationConfig(portClient, exporterConfig.StateKey) if err != nil { if exporterConfig == nil { klog.Fatalf("The integration does not exist and no config file was provided") } - err = integration.NewIntegration(portClient, exporterConfig, exporterConfig.Resources) + err = integration.NewIntegration(portClient, exporterConfig.StateKey, exporterConfig.EventListenerType, exporterConfig.Resources) if err != nil { klog.Fatalf("Error creating K8s integration: %s", err.Error()) } } - eventListener, err := CreateEventListener(portClient) + eventListener, err := createEventListener(exporterConfig.StateKey, exporterConfig.EventListenerType, portClient) if err != nil { klog.Fatalf("Error creating event listener: %s", err.Error()) } klog.Info("Starting controllers handler") - handler, _ := initiateHandler(exporterConfig, k8sClient, portClient) - err = event_listener.StartEventHandler(eventListener, handler, func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) { - handler.Stop() + err = event_handler.StartEventHandler(eventListener, func() (event_handler.IStoppableRsync, error) { return initiateHandler(exporterConfig, k8sClient, portClient) }) diff --git a/pkg/event_listener/consumer/consumer.go b/pkg/event_handler/consumer/consumer.go similarity index 100% rename from pkg/event_listener/consumer/consumer.go rename to pkg/event_handler/consumer/consumer.go diff --git a/pkg/event_listener/consumer/consumer_test.go b/pkg/event_handler/consumer/consumer_test.go similarity index 100% rename from pkg/event_listener/consumer/consumer_test.go rename to pkg/event_handler/consumer/consumer_test.go diff --git a/pkg/event_listener/consumer/event_listener.go b/pkg/event_handler/consumer/event_listener.go similarity index 91% rename from pkg/event_listener/consumer/event_listener.go rename to pkg/event_handler/consumer/event_listener.go index 26e8cbe..49ec773 100644 --- a/pkg/event_listener/consumer/event_listener.go +++ b/pkg/event_handler/consumer/event_listener.go @@ -26,7 +26,7 @@ type IncomingMessage struct { } `json:"diff"` } -func NewEventListener(portClient *cli.PortClient) (*EventListener, error) { +func NewEventListener(stateKey string, portClient *cli.PortClient) (*EventListener, error) { klog.Infof("Getting Consumer Information") credentials, err := kafka_credentials.GetKafkaCredentials(portClient) if err != nil { @@ -43,7 +43,7 @@ func NewEventListener(portClient *cli.PortClient) (*EventListener, error) { AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism, Username: credentials.Username, Password: credentials.Password, - GroupID: orgId + ".k8s." + config.ApplicationConfig.StateKey, + GroupID: orgId + ".k8s." + stateKey, } topic := orgId + ".change.log" @@ -53,7 +53,7 @@ func NewEventListener(portClient *cli.PortClient) (*EventListener, error) { } return &EventListener{ - stateKey: config.ApplicationConfig.StateKey, + stateKey: stateKey, portClient: portClient, topic: topic, consumer: instance, diff --git a/pkg/event_handler/event_handler.go b/pkg/event_handler/event_handler.go new file mode 100644 index 0000000..b2a247a --- /dev/null +++ b/pkg/event_handler/event_handler.go @@ -0,0 +1,33 @@ +package event_handler + +import ( + "fmt" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/klog/v2" +) + +type IListener interface { + Run(resync func()) error +} + +type IStoppableRsync interface { + Stop() +} + +func StartEventHandler(eventListener IListener, controllerHandlerFactory func() (IStoppableRsync, error)) error { + controllerHandler, err := controllerHandlerFactory() + if err != nil { + return err + } + + return eventListener.Run(func() { + klog.Infof("Resync request received. Recreating controllers for the new port configuration") + controllerHandler.Stop() + newController, resyncErr := controllerHandlerFactory() + controllerHandler = newController + + if resyncErr != nil { + utilruntime.HandleError(fmt.Errorf("error resyncing: %s", resyncErr.Error())) + } + }) +} diff --git a/pkg/event_handler/handler_test.go b/pkg/event_handler/handler_test.go new file mode 100644 index 0000000..f027d9b --- /dev/null +++ b/pkg/event_handler/handler_test.go @@ -0,0 +1,13 @@ +package event_handler + +import ( + "testing" +) + +type fixture struct { + t *testing.T +} + +func TestStartKafkaEventListener(t *testing.T) { + +} diff --git a/pkg/event_listener/polling/event_listener.go b/pkg/event_handler/polling/event_listener.go similarity index 83% rename from pkg/event_listener/polling/event_listener.go rename to pkg/event_handler/polling/event_listener.go index 546f052..8856ca8 100644 --- a/pkg/event_listener/polling/event_listener.go +++ b/pkg/event_handler/polling/event_listener.go @@ -11,9 +11,9 @@ type EventListener struct { portClient *cli.PortClient } -func NewEventListener(portClient *cli.PortClient) *EventListener { +func NewEventListener(stateKey string, portClient *cli.PortClient) *EventListener { return &EventListener{ - stateKey: config.ApplicationConfig.StateKey, + stateKey: stateKey, portClient: portClient, } } diff --git a/pkg/event_listener/polling/polling.go b/pkg/event_handler/polling/polling.go similarity index 100% rename from pkg/event_listener/polling/polling.go rename to pkg/event_handler/polling/polling.go diff --git a/pkg/event_listener/polling/polling_test.go b/pkg/event_handler/polling/polling_test.go similarity index 95% rename from pkg/event_listener/polling/polling_test.go rename to pkg/event_handler/polling/polling_test.go index 6476c3b..759e84d 100644 --- a/pkg/event_listener/polling/polling_test.go +++ b/pkg/event_handler/polling/polling_test.go @@ -39,9 +39,7 @@ func NewFixture(t *testing.T, c chan time.Time) *Fixture { } _ = integration.DeleteIntegration(portClient, stateKey) - err = integration.NewIntegration(portClient, &port.Config{ - StateKey: stateKey, - }, []port.Resource{}) + err = integration.NewIntegration(portClient, stateKey, "", []port.Resource{}) if err != nil { t.Errorf("Error creating Port integration: %s", err.Error()) } diff --git a/pkg/event_listener/event_listener_handler.go b/pkg/event_listener/event_listener_handler.go deleted file mode 100644 index 68192f7..0000000 --- a/pkg/event_listener/event_listener_handler.go +++ /dev/null @@ -1,34 +0,0 @@ -package event_listener - -import ( - "fmt" - "github.com/port-labs/port-k8s-exporter/pkg/handlers" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/klog/v2" -) - -type IEventListener interface { - Run(resync func()) error -} - -type Handler struct { - eventListener IEventListener - controllerHandler *handlers.ControllersHandler -} - -func StartEventHandler(eventListener IEventListener, controllerHandler *handlers.ControllersHandler, resync func(*handlers.ControllersHandler) (*handlers.ControllersHandler, error)) error { - err := eventListener.Run(func() { - klog.Infof("Resync request received. Recreating controllers for the new port configuration") - newController, resyncErr := resync(controllerHandler) - controllerHandler = newController - - if resyncErr != nil { - utilruntime.HandleError(fmt.Errorf("error resyncing: %s", resyncErr.Error())) - } - }) - if err != nil { - return err - } - - return nil -} diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index c665615..385aff3 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -16,7 +16,7 @@ import ( type ControllersHandler struct { controllers []*k8s.Controller informersFactory dynamicinformer.DynamicSharedInformerFactory - exporterConfig *port.Config + stateKey string portClient *cli.PortClient stopCh chan struct{} } @@ -57,7 +57,7 @@ func NewControllersHandler(exporterConfig *port.Config, portConfig *port.AppConf controllersHandler := &ControllersHandler{ controllers: controllers, informersFactory: informersFactory, - exporterConfig: exporterConfig, + stateKey: exporterConfig.StateKey, portClient: portClient, stopCh: signal.SetupSignalHandler(), } @@ -106,7 +106,7 @@ func (c *ControllersHandler) RunDeleteStaleEntities() { klog.Errorf("error authenticating with Port: %v", err) } - err = c.portClient.DeleteStaleEntities(context.Background(), c.exporterConfig.StateKey, goutils.MergeMaps(currentEntitiesSet...)) + err = c.portClient.DeleteStaleEntities(context.Background(), c.stateKey, goutils.MergeMaps(currentEntitiesSet...)) if err != nil { klog.Errorf("error deleting stale entities: %s", err.Error()) } diff --git a/pkg/port/integration/integration.go b/pkg/port/integration/integration.go index 06241e6..1e92d18 100644 --- a/pkg/port/integration/integration.go +++ b/pkg/port/integration/integration.go @@ -7,13 +7,13 @@ import ( "github.com/port-labs/port-k8s-exporter/pkg/port/cli" ) -func NewIntegration(portClient *cli.PortClient, exporterConfig *port.Config, resources []port.Resource) error { +func NewIntegration(portClient *cli.PortClient, stateKey string, eventListenerType string, resources []port.Resource) error { integration := &port.Integration{ - Title: exporterConfig.StateKey, + Title: stateKey, InstallationAppType: "K8S EXPORTER", - InstallationId: exporterConfig.StateKey, + InstallationId: stateKey, EventListener: port.EventListenerSettings{ - Type: exporterConfig.EventListenerType, + Type: eventListenerType, }, Config: &port.AppConfig{ Resources: resources,