Skip to content

Commit

Permalink
unified configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Dec 13, 2023
1 parent f1ef258 commit 7aae882
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 122 deletions.
39 changes: 9 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,14 @@ import (
"k8s.io/klog/v2"
)

var (
configFilePath string
resyncInterval uint
eventListenerType string
stateKey string
deleteDependents bool
createMissingRelatedEntities bool
portBaseURL string
portClientId string
portClientSecret string
)

func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) {
apiConfig, err := integration.GetIntegrationConfig(portClient, stateKey)
apiConfig, err := integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey)
if err != nil {
klog.Fatalf("Error getting K8s integration config: %s", err.Error())
}

cli.WithDeleteDependents(deleteDependents)(portClient)
cli.WithCreateMissingRelatedEntities(createMissingRelatedEntities)(portClient)
cli.WithDeleteDependents(apiConfig.DeleteDependents)(portClient)
cli.WithCreateMissingRelatedEntities(apiConfig.CreateMissingRelatedEntities)(portClient)

newHandler := handlers.NewControllersHandler(exporterConfig, apiConfig, k8sClient, portClient)
newHandler.Handle()
Expand All @@ -56,21 +44,21 @@ func main() {
klog.Fatalf("Error building K8s client: %s", err.Error())
}

portClient, err := cli.New(portBaseURL,
cli.WithClientID(portClientId), cli.WithClientSecret(portClientSecret),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)),
portClient, err := cli.New(config.ApplicationConfig.PortBaseURL,
cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", config.ApplicationConfig.StateKey)),
)

if err != nil {
klog.Fatalf("Error building Port client: %s", err.Error())
}

exporterConfig, err := config.GetConfigFile(configFilePath, resyncInterval, stateKey, eventListenerType)
exporterConfig, err := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType)
if err != nil {
klog.Fatalf("Error building Port K8s Exporter config: %s", err.Error())
}

_, err = integration.GetIntegrationConfig(portClient, stateKey)
_, err = integration.GetIntegrationConfig(portClient, config.ApplicationConfig.StateKey)
if err != nil {
if exporterConfig == nil {
klog.Fatalf("The integration does not exist and no config file was provided")
Expand All @@ -83,7 +71,7 @@ func main() {

klog.Info("Starting controllers handler")
handler, _ := initiateHandler(exporterConfig, k8sClient, portClient)
eventListener := event_listener.NewEventListener(stateKey, eventListenerType, handler, portClient)
eventListener := event_listener.NewEventListener(config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType, handler, portClient)
err = eventListener.Start(func(handler *handlers.ControllersHandler) (*handlers.ControllersHandler, error) {
handler.Stop()
return initiateHandler(exporterConfig, k8sClient, portClient)
Expand All @@ -94,14 +82,5 @@ func main() {
}

func init() {
configFilePath = config.NewString("config", "", "Path to Port K8s Exporter config file. Required.")
stateKey = config.NewString("state-key", "", "Port K8s Exporter state key id. Required.")

resyncInterval = config.NewUInt("resync-interval", 0, "The re-sync interval in minutes. Optional.")
portBaseURL = config.NewString("port-base-url", "https://api.getport.io", "Port base URL. Optional.")

portClientId = config.NewString("port-client-id", "", "Port client id. Required.")
portClientSecret = config.NewString("port-client-secret", "", "Port client secret. Required.")

eventListenerType = config.NewString("event-listener-type", "POLLING", "Event listener type. Optional.")
}
82 changes: 14 additions & 68 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,72 +1,18 @@
package config

import (
"flag"
"github.com/port-labs/port-k8s-exporter/pkg/goutils"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
"os"
"slices"
"strings"
)

var keys []string

func prepareEnvKey(key string) string {
newKey := strings.ToUpper(strings.ReplaceAll(key, "-", "_"))

if slices.Contains(keys, newKey) {
klog.Fatalf("Application Error : Found duplicate config key: %s", newKey)
}

keys = append(keys, newKey)
return newKey
var KafkaConfig = &KafkaConfiguration{
Brokers: NewString("event-listener-brokers", "localhost:9092", "Kafka brokers"),
SecurityProtocol: NewString("event-listener-security-protocol", "plaintext", "Kafka security protocol"),
AuthenticationMechanism: NewString("event-listener-authentication-mechanism", "none", "Kafka authentication mechanism"),
}

func NewString(key string, defaultValue string, description string) string {
var value string
flag.StringVar(&value, key, "", description)
if value == "" {
value = goutils.GetStringEnvOrDefault(prepareEnvKey(key), defaultValue)
}

return value
}

func NewUInt(key string, defaultValue uint, description string) uint {
var value uint64
flag.Uint64Var(&value, key, 0, description)
if value == 0 {
value = goutils.GetUintEnvOrDefault(prepareEnvKey(key), uint64(defaultValue))
}

return uint(value)
}

type FileNotFoundError struct {
s string
}

func (e *FileNotFoundError) Error() string {
return e.s
}

func GetConfigFile(filepath string, resyncInterval uint, stateKey string, eventListenerType string) (*port.Config, error) {
c := &port.Config{
ResyncInterval: resyncInterval,
StateKey: stateKey,
EventListenerType: eventListenerType,
}
config, err := os.ReadFile(filepath)
if err != nil {
return c, &FileNotFoundError{err.Error()}
}

err = yaml.Unmarshal(config, c)
if err != nil {
return nil, err
}

return c, nil
var PollingListenerRate = NewUInt("event-listener-polling-rate", 60, "Polling rate for the polling event listener")

var ApplicationConfig = &ApplicationConfiguration{
ConfigFilePath: NewString("config", "", "Path to Port K8s Exporter config file. Required."),
StateKey: NewString("state-key", "", "Port K8s Exporter state key id. Required."),
ResyncInterval: NewUInt("resync-interval", 0, "The re-sync interval in minutes. Optional."),
PortBaseURL: NewString("port-base-url", "https://api.getport.io", "Port base URL. Optional."),
PortClientId: NewString("port-client-id", "", "Port client id. Required."),
PortClientSecret: NewString("port-client-secret", "", "Port client secret. Required."),
EventListenerType: NewString("event-listener-type", "POLLING", "Event listener type. Optional."),
}
21 changes: 21 additions & 0 deletions pkg/config/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package config

type KafkaConfiguration struct {
Brokers string
SecurityProtocol string
GroupID string
AuthenticationMechanism string
Username string
Password string
KafkaSecurityEnabled bool
}

type ApplicationConfiguration struct {
ConfigFilePath string
StateKey string
ResyncInterval uint
PortBaseURL string
PortClientId string
PortClientSecret string
EventListenerType string
}
72 changes: 72 additions & 0 deletions pkg/config/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package config

import (
"flag"
"github.com/port-labs/port-k8s-exporter/pkg/goutils"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
"os"
"slices"

Check failure on line 10 in pkg/config/utils.go

View workflow job for this annotation

GitHub Actions / build

package slices is not in GOROOT (/opt/hostedtoolcache/go/1.19.13/x64/src/slices)
"strings"
)

var keys []string

func prepareEnvKey(key string) string {
newKey := strings.ToUpper(strings.ReplaceAll(key, "-", "_"))

if slices.Contains(keys, newKey) {
klog.Fatalf("Application Error : Found duplicate config key: %s", newKey)
}

keys = append(keys, newKey)
return newKey
}

func NewString(key string, defaultValue string, description string) string {
var value string
flag.StringVar(&value, key, "", description)
if value == "" {
value = goutils.GetStringEnvOrDefault(prepareEnvKey(key), defaultValue)
}

return value
}

func NewUInt(key string, defaultValue uint, description string) uint {
var value uint64
flag.Uint64Var(&value, key, 0, description)
if value == 0 {
value = goutils.GetUintEnvOrDefault(prepareEnvKey(key), uint64(defaultValue))
}

return uint(value)
}

type FileNotFoundError struct {
s string
}

func (e *FileNotFoundError) Error() string {
return e.s
}

func GetConfigFile(filepath string, resyncInterval uint, stateKey string, eventListenerType string) (*port.Config, error) {
c := &port.Config{
ResyncInterval: resyncInterval,
StateKey: stateKey,
EventListenerType: eventListenerType,
}
config, err := os.ReadFile(filepath)
if err != nil {
return c, &FileNotFoundError{err.Error()}
}

err = yaml.Unmarshal(config, c)
if err != nil {
return nil, err
}

return c, nil
}
13 changes: 2 additions & 11 deletions pkg/event_listener/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consumer

import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"k8s.io/klog/v2"
"os"
"os/signal"
Expand All @@ -12,19 +13,9 @@ type Consumer struct {
client *kafka.Consumer
}

type KafkaConfiguration struct {
Brokers string
SecurityProtocol string
GroupID string
AuthenticationMechanism string
Username string
Password string
KafkaSecurityEnabled bool
}

type JsonHandler func(value []byte)

func NewConsumer(config *KafkaConfiguration) (*Consumer, error) {
func NewConsumer(config *config.KafkaConfiguration) (*Consumer, error) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": config.Brokers,
"group.id": config.GroupID,
Expand Down
19 changes: 6 additions & 13 deletions pkg/event_listener/event_listener_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ type EventListener struct {
portClient *cli.PortClient
}

var kafkaConfig = &consumer.KafkaConfiguration{
Brokers: config.NewString("event-listener-brokers", "localhost:9092", "Kafka brokers"),
SecurityProtocol: config.NewString("event-listener-security-protocol", "plaintext", "Kafka security protocol"),
AuthenticationMechanism: config.NewString("event-listener-authentication-mechanism", "none", "Kafka authentication mechanism"),
}
var pollingListenerRate = config.NewUInt("event-listener-polling-rate", 60, "Polling rate for the polling event listener")

func shouldResync(stateKey string, message *IncomingMessage) bool {
return message.Diff != nil &&
message.Diff.After != nil &&
Expand Down Expand Up @@ -69,10 +62,10 @@ func startKafkaEventListener(l *EventListener, resync func()) error {
return err
}

c := &consumer.KafkaConfiguration{
Brokers: kafkaConfig.Brokers,
SecurityProtocol: kafkaConfig.SecurityProtocol,
AuthenticationMechanism: kafkaConfig.AuthenticationMechanism,
c := &config.KafkaConfiguration{
Brokers: config.KafkaConfig.Brokers,
SecurityProtocol: config.KafkaConfig.SecurityProtocol,
AuthenticationMechanism: config.KafkaConfig.AuthenticationMechanism,
Username: credentials.Username,
Password: credentials.Password,
GroupID: orgId + ".k8s." + l.stateKey,
Expand Down Expand Up @@ -102,8 +95,8 @@ func startKafkaEventListener(l *EventListener, resync func()) error {

func startPollingEventListener(l *EventListener, resync func()) {
klog.Infof("Starting polling event listener")
klog.Infof("Polling rate set to %d seconds", pollingListenerRate)
pollingHandler := polling.NewPollingHandler(pollingListenerRate, l.stateKey, l.portClient)
klog.Infof("Polling rate set to %d seconds", config.PollingListenerRate)
pollingHandler := polling.NewPollingHandler(config.PollingListenerRate, l.stateKey, l.portClient)
pollingHandler.Run(resync)
}

Expand Down

0 comments on commit 7aae882

Please sign in to comment.