Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Policy Assistant] Add support for k8s native workload traffic #227

Merged
merged 1 commit into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions cmd/policy-assistant/pkg/kube/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -112,6 +113,43 @@ func (k *Kubernetes) CreateNetworkPolicy(policy *networkingv1.NetworkPolicy) (*n
return createdPolicy, errors.Wrapf(err, "unable to create network policy %s/%s", policy.Namespace, policy.Name)
}

func (k *Kubernetes) GetDeploymentsInNamespace(namespace string) ([]appsv1.Deployment, error) {
deploymentList, err := k.ClientSet.AppsV1().Deployments(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get deployments in namespace %s", namespace)
}
return deploymentList.Items, nil
}

func (k *Kubernetes) GetDaemonSetsInNamespace(namespace string) ([]appsv1.DaemonSet, error) {
daemonSetList, err := k.ClientSet.AppsV1().DaemonSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get daemonSets in namespace %s", namespace)
}
return daemonSetList.Items, nil
}

func (k *Kubernetes) GetStatefulSetsInNamespace(namespace string) ([]appsv1.StatefulSet, error) {
statefulSetList, err := k.ClientSet.AppsV1().StatefulSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get StatefulSets in namespace %s", namespace)
}
return statefulSetList.Items, nil
}

func (k *Kubernetes) GetReplicaSetsInNamespace(namespace string) ([]appsv1.ReplicaSet, error) {
replicaSetList, err := k.ClientSet.AppsV1().ReplicaSets(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "unable to get ReplicaSets in namespace %s", namespace)
}
return replicaSetList.Items, nil
}

func (k *Kubernetes) GetReplicaSet(namespace string, name string) (*appsv1.ReplicaSet, error) {
replicaSet, err := k.ClientSet.AppsV1().ReplicaSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
return replicaSet, errors.Wrapf(err, "unable to get replicaSet %s/%s", namespace, name)
}

func (k *Kubernetes) GetService(namespace string, name string) (*v1.Service, error) {
service, err := k.ClientSet.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
return service, errors.Wrapf(err, "unable to get service %s/%s", namespace, name)
Expand Down
268 changes: 265 additions & 3 deletions cmd/policy-assistant/pkg/matcher/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"strings"

"github.com/mattfenwick/collections/pkg/slice"
"github.com/mattfenwick/cyclonus/pkg/kube"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to import policy-assistant's kube package instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is related to this issue #170

"github.com/mattfenwick/cyclonus/pkg/utils"
"github.com/olekukonko/tablewriter"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
v1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -57,7 +60,8 @@ func labelsToString(labels map[string]string) string {

type TrafficPeer struct {
gabrielggg marked this conversation as resolved.
Show resolved Hide resolved
Internal *InternalPeer
IP string
// IP external to cluster
IP string
}

func (p *TrafficPeer) Namespace() string {
Expand All @@ -71,10 +75,268 @@ func (p *TrafficPeer) IsExternal() bool {
return p.Internal == nil
}

func (p *TrafficPeer) Translate() TrafficPeer {
//Translates kubernetes workload types to TrafficPeers.
var podsNetworking []*PodNetworking
var podLabels map[string]string
var namespaceLabels map[string]string
var workloadOwner string
var workloadKind string
var internalPeer InternalPeer
workloadOwnerExists := false
workloadMetadata := strings.Split(strings.ToLower(p.Internal.Workload), "/")
if len(workloadMetadata) != 3 || (workloadMetadata[0] == "" || workloadMetadata[1] == "" || workloadMetadata[2] == "") || (workloadMetadata[1] != "daemonset" && workloadMetadata[1] != "statefulset" && workloadMetadata[1] != "replicaset" && workloadMetadata[1] != "deployment" && workloadMetadata[1] != "pod") {
logrus.Fatalf("Bad Workload structure: Types supported are pod, replicaset, deployment, daemonset, statefulset, and 3 fields are required with this structure, <namespace>/<workloadType>/<workloadName>")
}
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
ns, err := kubeClient.GetNamespace(workloadMetadata[0])
utils.DoOrDie(err)
kubePods, err := kube.GetPodsInNamespaces(kubeClient, []string{workloadMetadata[0]})
gabrielggg marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logrus.Fatalf("unable to read pods from kube, ns '%s': %+v", workloadMetadata[0], err)
}
for _, pod := range kubePods {
if workloadMetadata[1] == "deployment" && pod.OwnerReferences != nil && pod.OwnerReferences[0].Kind == "ReplicaSet" {
kubeReplicaSets, err := kubeClient.GetReplicaSet(workloadMetadata[0], pod.OwnerReferences[0].Name)
if err != nil {
logrus.Fatalf("unable to read Replicaset from kube, rs '%s': %+v", pod.OwnerReferences[0].Name, err)
}
if kubeReplicaSets.OwnerReferences != nil {
workloadOwner = kubeReplicaSets.OwnerReferences[0].Name
workloadKind = "deployment"
}

} else if (workloadMetadata[1] == "daemonset" || workloadMetadata[1] == "statefulset" || workloadMetadata[1] == "replicaset") && pod.OwnerReferences != nil {
gabrielggg marked this conversation as resolved.
Show resolved Hide resolved
workloadOwner = pod.OwnerReferences[0].Name
workloadKind = pod.OwnerReferences[0].Kind
} else if workloadMetadata[1] == "pod" {
workloadOwner = pod.Name
workloadKind = "pod"
}
if strings.ToLower(workloadOwner) == workloadMetadata[2] && strings.ToLower(workloadKind) == workloadMetadata[1] {
podLabels = pod.Labels
namespaceLabels = ns.Labels
podNetworking := PodNetworking{
IP: pod.Status.PodIP,
}
podsNetworking = append(podsNetworking, &podNetworking)
workloadOwnerExists = true

}
}

if !workloadOwnerExists {
logrus.Infof("workload not found on the cluster")
internalPeer = InternalPeer{
Workload: "",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the churn. Could we actually return an error here instead? I think that might be a more natural way to convey failure to find the specified workload. If we do this, we might as well return errors at these two locations too:

  1. ns, err := kubeClient.GetNamespace(workloadMetadata[0])
  2. kubePods, err := kube.GetPodsInNamespaces(kubeClient, []string{workloadMetadata[0]})

Copy link
Contributor Author

@gabrielggg gabrielggg Jun 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @huntergregory , thanks for the review, the problem here is that doing a fatal exits the program when a workload is scaled down to 0. for example if you have a deployment with 0 replicas it crashes there when you call the function to map all the deployments on the cluster to trafficpeers. Have you thought about that scenario.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion is nitpicky, and I'm noticing it requires a good amount of change, so honestly feel free to leave everything as is 🙂. I was more so suggesting that we could change the function signature to:

func (p *TrafficPeer) Translate() (*TrafficPeer, error) {

and instead of using logrus.Fatal(), we could start returning errors to be handled as needed:

return nil, fmt.Errorf("failed to get workload: %w", err)

But this might be a lot of work for little reward: if you replace logrus.Fatal() in this function, you might as well replace it in all functions. So feel free to leave as is 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example if you have a deployment with 0 replicas

Good call out. Do you mind calling this out somehow in the code too?

}
} else {
internalPeer = InternalPeer{
Workload: p.Internal.Workload,
PodLabels: podLabels,
NamespaceLabels: namespaceLabels,
Namespace: workloadMetadata[0],
Pods: podsNetworking,
}
}

TranslatedPeer := TrafficPeer{
Internal: &internalPeer,
}
return TranslatedPeer
}

func DeploymentsToTrafficPeers() []TrafficPeer {
//Translates all pods associated with deployments to TrafficPeers.
var deploymentPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeDeployments, err := kubeClient.GetDeploymentsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read deployments from kube, ns '%s': %+v", namespace.Name, err)
}
for _, deployment := range kubeDeployments {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/deployment/" + deployment.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
deploymentPeers = append(deploymentPeers, tmpPeerTranslated)
}

}

}

return deploymentPeers
}

func DaemonSetsToTrafficPeers() []TrafficPeer {
//Translates all pods associated with daemonSets to TrafficPeers.
var daemonSetPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeDaemonSets, err := kubeClient.GetDaemonSetsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read daemonSets from kube, ns '%s': %+v", namespace.Name, err)
}
for _, daemonSet := range kubeDaemonSets {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/daemonset/" + daemonSet.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
daemonSetPeers = append(daemonSetPeers, tmpPeerTranslated)
}
}

}

return daemonSetPeers
}

func StatefulSetsToTrafficPeers() []TrafficPeer {
//Translates all pods associated with statefulSets to TrafficPeers.
var statefulSetPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeStatefulSets, err := kubeClient.GetStatefulSetsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read statefulSets from kube, ns '%s': %+v", namespace.Name, err)
}
for _, statefulSet := range kubeStatefulSets {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/statefulset/" + statefulSet.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
statefulSetPeers = append(statefulSetPeers, tmpPeerTranslated)
}
}

}

return statefulSetPeers
}

func ReplicaSetsToTrafficPeers() []TrafficPeer {
gabrielggg marked this conversation as resolved.
Show resolved Hide resolved
//Translates all pods associated with replicaSets that are not associated with deployments to TrafficPeers.
var replicaSetPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubeReplicaSets, err := kubeClient.GetReplicaSetsInNamespace(namespace.Name)
if err != nil {
logrus.Fatalf("unable to read replicaSets from kube, ns '%s': %+v", namespace.Name, err)
}

for _, replicaSet := range kubeReplicaSets {
if replicaSet.OwnerReferences != nil {
continue
} else {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/replicaset/" + replicaSet.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
replicaSetPeers = append(replicaSetPeers, tmpPeerTranslated)
}

}
}

}

return replicaSetPeers
}

func PodsToTrafficPeers() []TrafficPeer {
//Translates all pods that are not associated with other workload types (deployment, replicaSet, daemonSet, statefulSet.) to TrafficPeers.
var podPeers []TrafficPeer
kubeClient, err := kube.NewKubernetesForContext("")
utils.DoOrDie(err)
kubeNamespaces, err := kubeClient.GetAllNamespaces()
if err != nil {
logrus.Fatalf("unable to read namespaces from kube: %+v", err)
}

for _, namespace := range kubeNamespaces.Items {
kubePods, err := kube.GetPodsInNamespaces(kubeClient, []string{namespace.Name})
if err != nil {
logrus.Fatalf("unable to read pods from kube, ns '%s': %+v", namespace.Name, err)
}
for _, pod := range kubePods {
if pod.OwnerReferences != nil {
continue
} else {
tmpInternalPeer := InternalPeer{
Workload: namespace.Name + "/pod/" + pod.Name,
}
tmpPeer := TrafficPeer{
Internal: &tmpInternalPeer,
}
tmpPeerTranslated := tmpPeer.Translate()
if tmpPeerTranslated.Internal.Workload != "" {
podPeers = append(podPeers, tmpPeerTranslated)
}
}
}

}

return podPeers
}

// Internal to cluster
type InternalPeer struct {
// optional: if set, will override remaining values with information from cluster
Workload string
PodLabels map[string]string
NamespaceLabels map[string]string
Namespace string
NodeLabels map[string]string
Node string
// optional
Pods []*PodNetworking
}

type PodNetworking struct {
IP string
// don't worry about populating below fields right now
IsHostNetworking bool
NodeLabels []string
}
Loading