Skip to content

Commit

Permalink
fix fatal both in client and in unstructured
Browse files Browse the repository at this point in the history
  • Loading branch information
danielsinai committed Jun 27, 2024
1 parent 2b4c0bf commit 582bd10
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 46 deletions.
20 changes: 8 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import (
"k8s.io/klog/v2"
)

func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portClient *cli.PortClient) (*handlers.ControllersHandler, error) {
func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client) (*handlers.ControllersHandler, error) {
portClient, err := cli.New()
if err != nil {
return nil, fmt.Errorf("error building Port client: %v", err)
}
i, err := integration.GetIntegration(portClient, exporterConfig.StateKey)
if err != nil {
return nil, fmt.Errorf("error getting Port integration: %v", err)
Expand All @@ -25,9 +29,6 @@ func initiateHandler(exporterConfig *port.Config, k8sClient *k8s.Client, portCli

}

cli.WithDeleteDependents(i.Config.DeleteDependents)(portClient)
cli.WithCreateMissingRelatedEntities(i.Config.CreateMissingRelatedEntities)(portClient)

newHandler := handlers.NewControllersHandler(exporterConfig, i.Config, k8sClient, portClient)
newHandler.Handle()

Expand All @@ -50,12 +51,7 @@ func main() {
if err != nil {
klog.Fatalf("Error building K8s client: %s", err.Error())
}

portClient, err := cli.New(config.ApplicationConfig.PortBaseURL,
cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret),
cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/^0.3.4 (statekey/%s)", applicationConfig.StateKey)),
)

portClient, err := cli.New()
if err != nil {
klog.Fatalf("Error building Port client: %s", err.Error())
}
Expand All @@ -64,14 +60,14 @@ func main() {
klog.Fatalf("Error initializing Port integration: %s", err.Error())
}

eventListener, err := event_handler.CreateEventListener(applicationConfig.StateKey, applicationConfig.EventListenerType, portClient)
eventListener, err := event_handler.CreateEventListener(applicationConfig.StateKey, applicationConfig.EventListenerType)
if err != nil {
klog.Fatalf("Error creating event listener: %s", err.Error())
}

klog.Info("Starting controllers handler")
err = event_handler.Start(eventListener, func() (event_handler.IStoppableRsync, error) {
return initiateHandler(applicationConfig, k8sClient, portClient)
return initiateHandler(applicationConfig, k8sClient)
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/crd/crd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newFixture(t *testing.T, portClientId string, portClientSecret string, user
userAgent = "port-k8s-exporter/0.1"
}

portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", userAgent),
portClient, err := cli.New(cli.WithHeader("User-Agent", userAgent),
cli.WithClientID(portClientId), cli.WithClientSecret(portClientSecret))
deleteDefaultResources(portClient)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Fixture struct {

func NewFixture(t *testing.T) *Fixture {
stateKey := guuid.NewString()
portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)),
portClient, err := cli.New(cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)),
cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret))
if err != nil {
t.Errorf("Error building Port client: %s", err.Error())
Expand Down
8 changes: 7 additions & 1 deletion pkg/event_handler/event_listener_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package event_handler

import (
"fmt"

"github.com/port-labs/port-k8s-exporter/pkg/event_handler/consumer"
"github.com/port-labs/port-k8s-exporter/pkg/event_handler/polling"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"k8s.io/klog/v2"
)

func CreateEventListener(stateKey string, eventListenerType string, portClient *cli.PortClient) (IListener, error) {
func CreateEventListener(stateKey string, eventListenerType string) (IListener, error) {
portClient, err := cli.New()
if err != nil {
return nil, fmt.Errorf("error building Port client: %v", err)
}

klog.Infof("Received event listener type: %s", eventListenerType)
switch eventListenerType {
case "KAFKA":
Expand Down
8 changes: 5 additions & 3 deletions pkg/event_handler/polling/polling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package polling

import (
"fmt"

_ "github.com/port-labs/port-k8s-exporter/test_utils"

"testing"
"time"

guuid "github.com/google/uuid"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/integration"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

type Fixture struct {
Expand All @@ -31,7 +33,7 @@ func (m *MockTicker) GetC() <-chan time.Time {

func NewFixture(t *testing.T, c chan time.Time) *Fixture {
stateKey := guuid.NewString()
portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)),
portClient, err := cli.New(cli.WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/0.1 (statekey/%s)", stateKey)),
cli.WithClientID(config.ApplicationConfig.PortClientId), cli.WithClientSecret(config.ApplicationConfig.PortClientSecret))
if err != nil {
t.Errorf("Error building Port client: %s", err.Error())
Expand Down
12 changes: 12 additions & 0 deletions pkg/goutils/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,15 @@ func StructToMap(obj interface{}) (newMap map[string]interface{}, err error) {
err = json.Unmarshal(data, &newMap)
return
}

func DeepCopy(obj interface{}) interface{} {
if obj == nil {
return nil
}

var newObj interface{}
data, _ := json.Marshal(obj)
json.Unmarshal(data, &newObj)

return newObj
}
2 changes: 1 addition & 1 deletion pkg/handlers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewControllersHandler(exporterConfig *port.Config, portConfig *port.Integra
}

informer := informersFactory.ForResource(gvr)
controller := k8s.NewController(port.AggregatedResource{Kind: kind, KindConfigs: kindConfigs}, portClient, informer, portConfig)
controller := k8s.NewController(port.AggregatedResource{Kind: kind, KindConfigs: kindConfigs}, informer, portConfig)
controllers = append(controllers, controller)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/jq/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func runJQQuery(jqQuery string, obj interface{}) (interface{}, error) {
klog.Warningf("failed to compile jq query: %s", jqQuery)
return nil, err
}

mutex.Lock()
queryRes, ok := code.Run(obj).Next()
deepClone := goutils.DeepCopy(obj)
queryRes, ok := code.Run(deepClone).Next()
mutex.Unlock()

if !ok {
Expand Down
14 changes: 11 additions & 3 deletions pkg/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,21 @@ import (
"fmt"
"time"

"hash/fnv"
"strconv"

"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/jq"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
"github.com/port-labs/port-k8s-exporter/pkg/port/mapping"
"hash/fnv"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"strconv"

"encoding/json"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -49,7 +51,13 @@ type Controller struct {
workqueue workqueue.RateLimitingInterface
}

func NewController(resource port.AggregatedResource, portClient *cli.PortClient, informer informers.GenericInformer, integrationConfig *port.IntegrationAppConfig) *Controller {
func NewController(resource port.AggregatedResource, informer informers.GenericInformer, integrationConfig *port.IntegrationAppConfig) *Controller {
portClient, err := cli.New()
if err != nil {
klog.Fatalf("Error building Port client: %v", err)
}
cli.WithDeleteDependents(integrationConfig.DeleteDependents)(portClient)
cli.WithCreateMissingRelatedEntities(integrationConfig.CreateMissingRelatedEntities)(portClient)
controller := &Controller{
Resource: resource,
portClient: portClient,
Expand Down
33 changes: 13 additions & 20 deletions pkg/k8s/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package k8s
import (
"context"
"fmt"
"github.com/port-labs/port-k8s-exporter/pkg/jq"
"github.com/stretchr/testify/assert"
"reflect"
"strings"
"testing"
"time"

"github.com/port-labs/port-k8s-exporter/pkg/jq"
"github.com/stretchr/testify/assert"

"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/port"
"github.com/port-labs/port-k8s-exporter/pkg/port/cli"
_ "github.com/port-labs/port-k8s-exporter/test_utils"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -21,8 +21,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -70,15 +69,9 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture {
fixtureConfig.userAgent = "port-k8s-exporter/0.1"
}

portClient, err := cli.New(config.ApplicationConfig.PortBaseURL, cli.WithHeader("User-Agent", fixtureConfig.userAgent),
cli.WithClientID(fixtureConfig.portClientId), cli.WithClientSecret(fixtureConfig.portClientSecret))
if err != nil {
t.Errorf("Error building Port client: %s", err.Error())
}

return &fixture{
t: t,
controller: newController(fixtureConfig.resource, fixtureConfig.objects, portClient, kubeclient, interationConfig),
controller: newController(fixtureConfig.resource, fixtureConfig.objects, kubeclient, interationConfig),
}
}

Expand All @@ -101,16 +94,16 @@ func newDeployment() *appsv1.Deployment {
"app": "port-k8s-exporter",
}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Name: "port-k8s-exporter",
Namespace: "port-k8s-exporter",
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
Selector: &v1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Expand All @@ -132,19 +125,19 @@ func newDeploymentWithCustomLabels(generation int64,
labels map[string]string,
) *appsv1.Deployment {
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Name: "port-k8s-exporter",
Namespace: "port-k8s-exporter",
GenerateName: generateName,
Generation: generation,
CreationTimestamp: creationTimestamp,
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
Selector: &v1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
ObjectMeta: v1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Expand All @@ -168,13 +161,13 @@ func newUnstructured(obj interface{}) *unstructured.Unstructured {
return &unstructured.Unstructured{Object: res}
}

func newController(resource port.Resource, objects []runtime.Object, portClient *cli.PortClient, kubeclient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig) *Controller {
func newController(resource port.Resource, objects []runtime.Object, kubeclient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig) *Controller {
k8sI := dynamicinformer.NewDynamicSharedInformerFactory(kubeclient, noResyncPeriodFunc())
s := strings.SplitN(resource.Kind, "/", 3)
gvr := schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]}
informer := k8sI.ForResource(gvr)
kindConfig := port.KindConfig{Selector: resource.Selector, Port: resource.Port}
c := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, portClient, informer, integrationConfig)
c := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, informer, integrationConfig)

for _, d := range objects {
informer.Informer().GetIndexer().Add(d)
Expand Down
17 changes: 15 additions & 2 deletions pkg/port/cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/go-resty/resty/v2"
"github.com/port-labs/port-k8s-exporter/pkg/config"
"github.com/port-labs/port-k8s-exporter/pkg/port"
)

Expand All @@ -21,10 +22,16 @@ type (
}
)

func New(baseURL string, opts ...Option) (*PortClient, error) {
func New(opts ...Option) (*PortClient, error) {
applicationConfig, err := config.NewConfiguration()

if err != nil {
return nil, err
}

c := &PortClient{
Client: resty.New().
SetBaseURL(baseURL).
SetBaseURL(config.ApplicationConfig.PortBaseURL).
SetRetryCount(5).
SetRetryWaitTime(300).
// retry when create permission fails because scopes are created async-ly and sometimes (mainly in tests) the scope doesn't exist yet.
Expand All @@ -40,9 +47,15 @@ func New(baseURL string, opts ...Option) (*PortClient, error) {
return err != nil || b["ok"] != true
}),
}

WithClientID(config.ApplicationConfig.PortClientId)(c)
WithClientSecret(config.ApplicationConfig.PortClientSecret)(c)
WithHeader("User-Agent", fmt.Sprintf("port-k8s-exporter/^0.3.4 (statekey/%s)", applicationConfig.StateKey))(c)

for _, opt := range opts {
opt(c)
}

return c, nil
}

Expand Down

0 comments on commit 582bd10

Please sign in to comment.