Skip to content

Commit

Permalink
dep injection
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Dec 18, 2023
1 parent 80421b1 commit d0e4f66
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 68 deletions.
39 changes: 20 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"flag"
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/consumer"
"github.com/port-labs/port-k8s-exporter/pkg/event_listener/polling"
"github.com/port-labs/port-k8s-exporter/pkg/event_handler"
"github.com/port-labs/port-k8s-exporter/pkg/event_handler/consumer"
"github.com/port-labs/port-k8s-exporter/pkg/event_handler/polling"
"github.com/port-labs/port-k8s-exporter/pkg/handlers"
"github.com/port-labs/port-k8s-exporter/pkg/k8s"
"github.com/port-labs/port-k8s-exporter/pkg/port"
Expand All @@ -16,7 +16,7 @@ import (
)

func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) {
apiConfig, err := integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey)
apiConfig, err := integration.GetIntegrationConfig(portClient, exporterConfig.StateKey)
if err != nil {
klog.Fatalf("Error getting K8s integration config: %s", err.Error())
}
Expand All @@ -30,24 +30,29 @@ func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portCli
return newHandler, nil
}

func CreateEventListener(portClient *cli.PortClient) (event_listener.IEventListener, error) {
klog.Infof("Received event listener type: %s", config.ApplicationConfig.EventListenerType)
switch config.ApplicationConfig.EventListenerType {
func createEventListener(stateKey string, eventListenerType string, portClient *cli.PortClient) (event_handler.IListener, error) {
klog.Infof("Received event listener type: %s", eventListenerType)
switch eventListenerType {
case "KAFKA":
return consumer.NewEventListener(portClient)
return consumer.NewEventListener(stateKey, portClient)
case "POLLING":
return polling.NewEventListener(portClient), nil
return polling.NewEventListener(stateKey, portClient), nil
default:
return nil, fmt.Errorf("unknown event listener type: %s", config.ApplicationConfig.EventListenerType)
return nil, fmt.Errorf("unknown event listener type: %s", eventListenerType)
}

}

func initIntegrationConfig() {

}

func main() {
klog.InitFlags(nil)

k8sConfig := k8s.NewKubeConfig()

exporterConfig, _ := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType)
clientConfig, err := k8sConfig.ClientConfig()
if err != nil {
klog.Fatalf("Error getting K8s client config: %s", err.Error())
Expand All @@ -60,35 +65,31 @@ func main() {

portClient, err := cli.New(config.ApplicationConfig.PortBaseURL,
cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", config.ApplicationConfig.StateKey)),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", exporterConfig.StateKey)),
)

if err != nil {
klog.Fatalf("Error building Port client: %s", err.Error())
}

exporterConfig, _ := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType)

_, err = integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey)
_, err = integration.GetIntegrationConfig(portClient, exporterConfig.StateKey)
if err != nil {
if exporterConfig == nil {
klog.Fatalf("The integration does not exist and no config file was provided")
}
err = integration.NewIntegration(portClient, exporterConfig, exporterConfig.Resources)
err = integration.NewIntegration(portClient, exporterConfig.StateKey, exporterConfig.EventListenerType, exporterConfig.Resources)
if err != nil {
klog.Fatalf("Error creating K8s integration: %s", err.Error())
}
}

eventListener, err := CreateEventListener(portClient)
eventListener, err := createEventListener(exporterConfig.StateKey, exporterConfig.EventListenerType, portClient)
if err != nil {
klog.Fatalf("Error creating event listener: %s", err.Error())
}

klog.Info("Starting controllers handler")
handler, _ := initiateHandler(exporterConfig, k8sClient, portClient)
err = event_listener.StartEventHandler(eventListener, handler, func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) {
handler.Stop()
err = event_handler.StartEventHandler(eventListener, func() (event_handler.IStoppableRsync, error) {
return initiateHandler(exporterConfig, k8sClient, portClient)
})

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type IncomingMessage struct {
} `json:"diff"`
}

func NewEventListener(portClient *cli.PortClient) (*EventListener, error) {
func NewEventListener(stateKey string, portClient *cli.PortClient) (*EventListener, error) {
klog.Infof("Getting Consumer Information")
credentials, err := kafka_credentials.GetKafkaCredentials(portClient)
if err != nil {
Expand All @@ -43,7 +43,7 @@ func NewEventListener(portClient *cli.PortClient) (*EventListener, error) {
AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism,
Username: credentials.Username,
Password: credentials.Password,
GroupID: orgId + ".k8s." + config.ApplicationConfig.StateKey,
GroupID: orgId + ".k8s." + stateKey,
}

topic := orgId + ".change.log"
Expand All @@ -53,7 +53,7 @@ func NewEventListener(portClient *cli.PortClient) (*EventListener, error) {
}

return &EventListener{
stateKey: config.ApplicationConfig.StateKey,
stateKey: stateKey,
portClient: portClient,
topic: topic,
consumer: instance,
Expand Down
33 changes: 33 additions & 0 deletions pkg/event_handler/event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package event_handler

import (
"fmt"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)

type IListener interface {
Run(resync func()) error
}

type IStoppableRsync interface {
Stop()
}

func StartEventHandler(eventListener IListener, controllerHandlerFactory func() (IStoppableRsync, error)) error {
controllerHandler, err := controllerHandlerFactory()
if err != nil {
return err
}

return eventListener.Run(func() {
klog.Infof("Resync request received. Recreating controllers for the new port configuration")
controllerHandler.Stop()
newController, resyncErr := controllerHandlerFactory()
controllerHandler = newController

if resyncErr != nil {
utilruntime.HandleError(fmt.Errorf("error resyncing: %s", resyncErr.Error()))
}
})
}
13 changes: 13 additions & 0 deletions pkg/event_handler/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package event_handler

import (
"testing"
)

type fixture struct {
t *testing.T
}

func TestStartKafkaEventListener(t *testing.T) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type EventListener struct {
portClient *cli.PortClient
}

func NewEventListener(portClient *cli.PortClient) *EventListener {
func NewEventListener(stateKey string, portClient *cli.PortClient) *EventListener {
return &EventListener{
stateKey: config.ApplicationConfig.StateKey,
stateKey: stateKey,
portClient: portClient,
}
}
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ func NewFixture(t *testing.T, c chan time.Time) *Fixture {
}

_ = integration.DeleteIntegration(portClient, stateKey)
err = integration.NewIntegration(portClient, &port.Config{
StateKey: stateKey,
}, []port.Resource{})
err = integration.NewIntegration(portClient, stateKey, "", []port.Resource{})
if err != nil {
t.Errorf("Error creating Port integration: %s", err.Error())
}
Expand Down
34 changes: 0 additions & 34 deletions pkg/event_listener/event_listener_handler.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/handlers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type ControllersHandler struct {
controllers []*k8s.Controller
informersFactory dynamicinformer.DynamicSharedInformerFactory
exporterConfig *port.Config
stateKey string
portClient *cli.PortClient
stopCh chan struct{}
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func NewControllersHandler(exporterConfig *port.Config, portConfig *port.AppConf
controllersHandler := &ControllersHandler{
controllers: controllers,
informersFactory: informersFactory,
exporterConfig: exporterConfig,
stateKey: exporterConfig.StateKey,
portClient: portClient,
stopCh: signal.SetupSignalHandler(),
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *ControllersHandler) RunDeleteStaleEntities() {
klog.Errorf("error authenticating with Port: %v", err)
}

err = c.portClient.DeleteStaleEntities(context.Background(), c.exporterConfig.StateKey, goutils.MergeMaps(currentEntitiesSet...))
err = c.portClient.DeleteStaleEntities(context.Background(), c.stateKey, goutils.MergeMaps(currentEntitiesSet...))
if err != nil {
klog.Errorf("error deleting stale entities: %s", err.Error())
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/port/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
)

func NewIntegration(portClient *cli.PortClient, exporterConfig *port.Config, resources []port.Resource) error {
func NewIntegration(portClient *cli.PortClient, stateKey string, eventListenerType string, resources []port.Resource) error {
integration := &port.Integration{
Title: exporterConfig.StateKey,
Title: stateKey,
InstallationAppType: "K8S EXPORTER",
InstallationId: exporterConfig.StateKey,
InstallationId: stateKey,
EventListener: port.EventListenerSettings{
Type: exporterConfig.EventListenerType,
Type: eventListenerType,
},
Config: &port.AppConfig{
Resources: resources,
Expand Down

0 comments on commit d0e4f66

Please sign in to comment.