Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port 5363 UI for the kubernetes exporter moving the config into port #19

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7e0188b
Added support for control the app config from the UI using Polling an…
yairsimantov20 Dec 11, 2023
e7079db
fixed miss type
yairsimantov20 Dec 11, 2023
311502d
Renaming
yairsimantov20 Dec 11, 2023
d84ad22
updated typing
yairsimantov20 Dec 11, 2023
a32b98b
at the start of polling getting the current state
yairsimantov20 Dec 11, 2023
a04dfd1
fixed schema & logs
yairsimantov20 Dec 11, 2023
25254fb
revert useragent addition
yairsimantov20 Dec 12, 2023
1eef152
fixes
yairsimantov20 Dec 13, 2023
97edf37
cr requests
yairsimantov20 Dec 13, 2023
56bad41
better config management
yairsimantov20 Dec 13, 2023
eb24061
removed bool config managment
yairsimantov20 Dec 13, 2023
296e170
better error for config file
yairsimantov20 Dec 13, 2023
2c21603
moved dependent and create missing to the app config
yairsimantov20 Dec 13, 2023
62a731d
starting kafka config once
yairsimantov20 Dec 13, 2023
f1ef258
starting kafka config once
yairsimantov20 Dec 13, 2023
7aae882
unified configuration
yairsimantov20 Dec 13, 2023
5be6bdb
fixed configuration flag issue
yairsimantov20 Dec 13, 2023
a6ced85
not failing for non-existing configuration on startup
yairsimantov20 Dec 14, 2023
1db8ded
change imports
yairsimantov20 Dec 14, 2023
7498948
changing offset reset to latest
yairsimantov20 Dec 14, 2023
f4e8dd5
starting resync in goroutine
yairsimantov20 Dec 14, 2023
6326254
revert moving the handle
yairsimantov20 Dec 17, 2023
bbb39ee
revert moving the handle
yairsimantov20 Dec 17, 2023
a1e4031
iconsume and consumer tests
yairsimantov20 Dec 18, 2023
3e3c91c
iconsume and consumer tests
yairsimantov20 Dec 18, 2023
80421b1
dependency injection and tests
yairsimantov20 Dec 18, 2023
d0e4f66
dep injection
yairsimantov20 Dec 18, 2023
a522369
event handler tests
yairsimantov20 Dec 20, 2023
1838961
file renaming
yairsimantov20 Dec 20, 2023
9879466
removed function
yairsimantov20 Dec 20, 2023
2cf5c27
cr
yairsimantov20 Dec 20, 2023
d4816cd
cr
yairsimantov20 Dec 20, 2023
ae6ccad
closing after one message consumer
yairsimantov20 Dec 20, 2023
06451f1
closing after one message consumer
yairsimantov20 Dec 20, 2023
86ec4e7
cr
yairsimantov20 Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ module github.com/port-labs/port-k8s-exporter
go 1.19

require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/go-resty/resty/v2 v2.7.0
github.com/google/uuid v1.3.0
github.com/itchyny/gojq v0.12.9
github.com/stretchr/testify v1.8.2
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.25.2
k8s.io/apimachinery v0.25.2
k8s.io/client-go v0.25.2
k8s.io/klog/v2 v2.80.1
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
)

require (
Expand All @@ -36,19 +40,19 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
Expand Down
109 changes: 99 additions & 10 deletions go.sum

Large diffs are not rendered by default.

91 changes: 55 additions & 36 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,52 @@ package main
import (
"flag"
"fmt"
"os"

"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/event_handler"
"github.com/port-labs/port-k8s-exporter/pkg/event_handler/consumer"
"github.com/port-labs/port-k8s-exporter/pkg/event_handler/polling"
"github.com/port-labs/port-k8s-exporter/pkg/handlers"
"github.com/port-labs/port-k8s-exporter/pkg/k8s"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"
"github.com/port-labs/port-k8s-exporter/pkg/signal"
"k8s.io/klog/v2"
)

var (
configFilePath string
resyncInterval uint
stateKey string
deleteDependents bool
createMissingRelatedEntities bool
portBaseURL string
portClientId string
portClientSecret string
)
func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) {
apiConfig, err := integration.GetIntegrationConfig(portClient, exporterConfig.StateKey)
if err != nil {
klog.Fatalf("Error getting K8s integration config: %s", err.Error())
}

func main() {
klog.InitFlags(nil)
flag.Parse()
cli.WithDeleteDependents(apiConfig.DeleteDependents)(portClient)
cli.WithCreateMissingRelatedEntities(apiConfig.CreateMissingRelatedEntities)(portClient)

stopCh := signal.SetupSignalHandler()
newHandler := handlers.NewControllersHandler(exporterConfig, apiConfig, k8sClient, portClient)
newHandler.Handle()

exporterConfig, err := config.New(configFilePath, resyncInterval, stateKey)
if err != nil {
klog.Fatalf("Error building Port K8s Exporter config: %s", err.Error())
return newHandler, nil
}

func createEventListener(stateKey string, eventListenerType string, portClient *cli.PortClient) (event_handler.IListener, error) {
klog.Infof("Received event listener type: %s", eventListenerType)
switch eventListenerType {
case "KAFKA":
return consumer.NewEventListener(stateKey, portClient)
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
case "POLLING":
return polling.NewEventListener(stateKey, portClient), nil
default:
return nil, fmt.Errorf("unknown event listener type: %s", eventListenerType)
}

}

func main() {
klog.InitFlags(nil)

k8sConfig := k8s.NewKubeConfig()

exporterConfig, _ := config.GetConfigFile(config.ApplicationConfig.ConfigFilePath, config.ApplicationConfig.ResyncInterval, config.ApplicationConfig.StateKey, config.ApplicationConfig.EventListenerType)
clientConfig, err := k8sConfig.ClientConfig()
if err != nil {
klog.Fatalf("Error getting K8s client config: %s", err.Error())
Expand All @@ -48,34 +59,42 @@ func main() {
klog.Fatalf("Error building K8s client: %s", err.Error())
}

portClient, err := cli.New(portBaseURL,
portClient, err := cli.New(config.ApplicationConfig.PortBaseURL,
cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", exporterConfig.StateKey)),
cli.WithClientID(portClientId), cli.WithClientSecret(portClientSecret),
cli.WithDeleteDependents(deleteDependents), cli.WithCreateMissingRelatedEntities(createMissingRelatedEntities),
)

if err != nil {
klog.Fatalf("Error building Port client: %s", err.Error())
}

err = integration.NewIntegration(portClient, stateKey)
_, err = integration.GetIntegrationConfig(portClient, exporterConfig.StateKey)
if err != nil {
klog.Fatalf("Error creating K8s integration: %s", err.Error())
if exporterConfig == nil {
klog.Fatalf("The integration does not exist and no config file was provided")
}
err = integration.NewIntegration(portClient, exporterConfig.StateKey, exporterConfig.EventListenerType, exporterConfig.Resources)
if err != nil {
klog.Fatalf("Error creating K8s integration: %s", err.Error())
}
}

eventListener, err := createEventListener(exporterConfig.StateKey, exporterConfig.EventListenerType, portClient)
if err != nil {
klog.Fatalf("Error creating event listener: %s", err.Error())
}

klog.Info("Starting controllers handler")
controllersHandler := handlers.NewControllersHandler(exporterConfig, k8sClient, portClient)
controllersHandler.Handle(stopCh)
klog.Info("Started controllers handler")
err = event_handler.Start(eventListener, func() (event_handler.IStoppableRsync, error) {
return initiateHandler(exporterConfig, k8sClient, portClient)
})

if err != nil {
klog.Fatalf("Error starting event listener: %s", err.Error())
yairsimantov20 marked this conversation as resolved.
Show resolved Hide resolved
}
}

func init() {
flag.StringVar(&configFilePath, "config", "", "Path to Port K8s Exporter config file. Required.")
flag.StringVar(&stateKey, "state-key", "", "Port K8s Exporter state key id. Required.")
flag.BoolVar(&deleteDependents, "delete-dependents", false, "Flag to enable deletion of dependent Port Entities. Optional.")
flag.BoolVar(&createMissingRelatedEntities, "create-missing-related-entities", false, "Flag to enable creation of missing related Port entities. Optional.")
flag.UintVar(&resyncInterval, "resync-interval", 0, "The re-sync interval in minutes. Optional.")
flag.StringVar(&portBaseURL, "port-base-url", "https://api.getport.io", "Port base URL. Optional.")
portClientId = os.Getenv("PORT_CLIENT_ID")
portClientSecret = os.Getenv("PORT_CLIENT_SECRET")
config.Init()
flag.Parse()
}
79 changes: 22 additions & 57 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,25 @@
package config

import (
"github.com/port-labs/port-k8s-exporter/pkg/port"
"os"

"gopkg.in/yaml.v2"
)

type Entity struct {
Mappings []port.EntityMapping
}

type Port struct {
Entity Entity
}

type Selector struct {
Query string
}

type Resource struct {
Kind string
Selector Selector
Port Port
}

type Config struct {
Resources []Resource
ResyncInterval uint
StateKey string
}

type KindConfig struct {
Selector Selector
Port Port
}

type AggregatedResource struct {
Kind string
KindConfigs []KindConfig
}

func New(filepath string, resyncInterval uint, stateKey string) (*Config, error) {
c := &Config{
ResyncInterval: resyncInterval,
StateKey: stateKey,
}
config, err := os.ReadFile(filepath)
if err != nil {
return nil, err
}

err = yaml.Unmarshal(config, c)
if err != nil {
return nil, err
}

return c, nil
var KafkaConfig = &KafkaConfiguration{}
var PollingListenerRate uint

var ApplicationConfig = &ApplicationConfiguration{}

func Init() {
// Kafka listener Configuration
NewString(&KafkaConfig.Brokers, "event-listener-brokers", "localhost:9092", "Kafka event listener brokers")
NewString(&KafkaConfig.SecurityProtocol, "event-listener-security-protocol", "plaintext", "Kafka event listener security protocol")
NewString(&KafkaConfig.AuthenticationMechanism, "event-listener-authentication-mechanism", "none", "Kafka event listener authentication mechanism")

// Polling listener Configuration
NewUInt(&PollingListenerRate, "event-listener-polling-rate", 60, "Polling event listener polling rate")

// Application Configuration
NewString(&ApplicationConfig.ConfigFilePath, "config", "config.yaml", "Path to Port K8s Exporter config file. Required.")
NewString(&ApplicationConfig.StateKey, "state-key", "my-k8s-exporter", "Port K8s Exporter state key id. Required.")
NewUInt(&ApplicationConfig.ResyncInterval, "resync-interval", 0, "The re-sync interval in minutes. Optional.")
NewString(&ApplicationConfig.PortBaseURL, "port-base-url", "https://api.getport.io", "Port base URL. Optional.")
NewString(&ApplicationConfig.PortClientId, "port-client-id", "", "Port client id. Required.")
NewString(&ApplicationConfig.PortClientSecret, "port-client-secret", "", "Port client secret. Required.")
NewString(&ApplicationConfig.EventListenerType, "event-listener-type", "POLLING", "Event listener type, can be either POLLING or KAFKA. Optional.")
}
21 changes: 21 additions & 0 deletions pkg/config/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package config

type KafkaConfiguration struct {
Brokers string
SecurityProtocol string
GroupID string
AuthenticationMechanism string
Username string
Password string
KafkaSecurityEnabled bool
}

type ApplicationConfiguration struct {
ConfigFilePath string
StateKey string
ResyncInterval uint
PortBaseURL string
PortClientId string
PortClientSecret string
EventListenerType string
}
62 changes: 62 additions & 0 deletions pkg/config/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package config

import (
"flag"
"github.com/port-labs/port-k8s-exporter/pkg/goutils"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
"k8s.io/utils/strings/slices"
"os"
"strings"
)

var keys []string

func prepareEnvKey(key string) string {
newKey := strings.ToUpper(strings.ReplaceAll(key, "-", "_"))

if slices.Contains(keys, newKey) {
klog.Fatalf("Application Error : Found duplicate config key: %s", newKey)
}

keys = append(keys, newKey)
return newKey
}

func NewString(v *string, key string, defaultValue string, description string) {
value := goutils.GetStringEnvOrDefault(prepareEnvKey(key), defaultValue)
flag.StringVar(v, key, value, description)
}

func NewUInt(v *uint, key string, defaultValue uint, description string) {
value := uint(goutils.GetUintEnvOrDefault(prepareEnvKey(key), uint64(defaultValue)))
flag.UintVar(v, key, value, description)
}

type FileNotFoundError struct {
s string
}

func (e *FileNotFoundError) Error() string {
return e.s
}

func GetConfigFile(filepath string, resyncInterval uint, stateKey string, eventListenerType string) (*port.Config, error) {
c := &port.Config{
ResyncInterval: resyncInterval,
StateKey: stateKey,
EventListenerType: eventListenerType,
}
config, err := os.ReadFile(filepath)
if err != nil {
return c, &FileNotFoundError{err.Error()}
}

err = yaml.Unmarshal(config, c)
if err != nil {
return nil, err
}

return c, nil
}
Loading