Skip to content
This repository was archived by the owner on Nov 29, 2024. It is now read-only.

Commit

Permalink
Merge pull request #2 from kurtosis-tech/laurent/multiple-namespaces
Browse files Browse the repository at this point in the history
feat: Multiple namespaces support
  • Loading branch information
laurentluce authored Oct 3, 2024
2 parents 556a03a + d90b42d commit c0da31c
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 89 deletions.
95 changes: 30 additions & 65 deletions kardinal/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,51 @@ func Reconcile(ctx context.Context, cl client.Client) error {
logrus.Info("Reconciling")

// Get k8s resources
namespaceStr := "baseline"
logrus.Infof("Get cluster resources for namespace %s", namespaceStr)
namespace, err := resources.GetNamespaceResources(ctx, namespaceStr, cl)
logrus.Infof("Get cluster resources")
clusterResources, err := resources.NewResourcesFromClient(ctx, cl)
if err != nil {
return stacktrace.Propagate(err, "An error occurred retrieving the list of resources for namespace %s", namespaceStr)
return stacktrace.Propagate(err, "An error occurred retrieving the list of resources")
}
// Generate base cluster topology
logrus.Info("Generate base cluster topology")
baseClusterTopology, err := topology.NewClusterTopologyFromResources(namespace.Services, namespace.Deployments, namespaceStr, namespaceStr)
version := "baseline"
baseClusterTopology, err := topology.NewClusterTopologyFromResources(clusterResources, version)
if err != nil {
return stacktrace.Propagate(err, "An error occurred generating the base cluster topology")
}

// Update base cluster topology with flows
patches := []*topology.ServicePatch{}
for _, flow := range namespace.Flows {
logrus.Infof("Processing flow %s", flow.Name)
service, err := baseClusterTopology.GetService(flow.Spec.Service)
if err != nil {
return stacktrace.Propagate(err, "An error occurred retrieving base cluster topology service %s", flow.Spec.Service)
}
deployment := resources.GetDeploymentFromName(service.ServiceID, namespace.Deployments)
deployment.Spec.Template.Spec.Containers[0].Image = flow.Spec.Image
patch := &topology.ServicePatch{
Service: flow.Spec.Service,
DeploymentSpec: &deployment.Spec,
}
patches = append(patches, patch)

flowPatch := &topology.FlowPatch{
FlowId: flow.GetObjectMeta().GetName(),
ServicePatches: patches,
}
err = baseClusterTopology.UpdateWithFlow(flowPatch)
if err != nil {
return stacktrace.Propagate(err, "An error occurred updating the base cluster topology with flow %s", flowPatch.FlowId)
}
}

// Reconcile
baseClusterTopologyResources, _ := baseClusterTopology.GetResources()
baseClusterTopologyNamespace := baseClusterTopologyResources[namespaceStr]

// Create missing resources
if baseClusterTopologyNamespace != nil {
for _, service := range baseClusterTopologyNamespace.Services {
if namespace.GetService(service.Name) == nil {
logrus.Infof("Creating service %s", service.Name)
_ = cl.Create(ctx, service)
for _, namespace := range clusterResources.Namespaces {
for _, flow := range namespace.Flows {
logrus.Infof("Processing flow %s", flow.Name)
service, err := baseClusterTopology.GetService(flow.Spec.Service, namespace.Name)
if err != nil {
return stacktrace.Propagate(err, "An error occurred retrieving base cluster topology service %s in namespace %s", flow.Spec.Service, namespace.Name)
}
}
for _, deployment := range baseClusterTopologyNamespace.Deployments {
if namespace.GetDeployment(deployment.Name) == nil {
logrus.Infof("Creating deployment %s", deployment.Name)
_ = cl.Create(ctx, deployment)
deployment := resources.GetDeploymentFromName(service.ServiceID, namespace.Deployments)
deployment.Spec.Template.Spec.Containers[0].Image = flow.Spec.Image
patch := &topology.ServicePatch{
Namespace: namespace.Name,
Service: flow.Spec.Service,
DeploymentSpec: &deployment.Spec,
}
}
}

// Delete missing resources
for _, service := range namespace.Services {
serviceAnnotations := service.Annotations
isManaged, found := serviceAnnotations["kardinal.dev/managed"]
if found && isManaged == "true" {
if baseClusterTopologyNamespace == nil || baseClusterTopologyNamespace.GetService(service.Name) == nil {
logrus.Infof("Deleting service %s", service.Name)
_ = cl.Delete(ctx, service)
patches := []*topology.ServicePatch{patch}
flowPatch := &topology.FlowPatch{
FlowId: flow.GetObjectMeta().GetName(),
ServicePatches: patches,
}
}
}
for _, deployment := range namespace.Deployments {
deploymentAnnotations := deployment.Annotations
isManaged, found := deploymentAnnotations["kardinal.dev/managed"]
if found && isManaged == "true" {
if baseClusterTopologyNamespace == nil || baseClusterTopologyNamespace.GetDeployment(deployment.Name) == nil {
logrus.Infof("Deleting deployment %s", deployment.Name)
_ = cl.Delete(ctx, deployment)
err = baseClusterTopology.UpdateWithFlow(flowPatch)
if err != nil {
return stacktrace.Propagate(err, "An error occurred updating the base cluster topology with flow %s", flowPatch.FlowId)
}
}
}

// Reconcile
err = baseClusterTopology.ApplyResources(ctx, clusterResources, cl)
if err != nil {
return stacktrace.Propagate(err, "An error occurred applying the resources")
}

return nil
}
38 changes: 37 additions & 1 deletion kardinal/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,32 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

func GetNamespaceResources(ctx context.Context, namespace string, cl client.Client) (*Namespace, error) {
type Resources struct {
Namespaces []*Namespace
}

func NewResourcesFromClient(ctx context.Context, cl client.Client) (*Resources, error) {

namespaces := []*Namespace{}
coreV1Namespaces := &corev1.NamespaceList{}
err := cl.List(ctx, coreV1Namespaces)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred retrieving the list of namespaces")
}

for _, coreV1Namespace := range coreV1Namespaces.Items {
namespace, err := getNamespaceResources(ctx, coreV1Namespace.Name, cl)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occurred retrieving the list of namespaces")
}

namespaces = append(namespaces, namespace)
}

return &Resources{Namespaces: namespaces}, nil
}

func getNamespaceResources(ctx context.Context, namespace string, cl client.Client) (*Namespace, error) {

services := &corev1.ServiceList{}
err := cl.List(ctx, services, client.InNamespace(namespace))
Expand All @@ -33,6 +58,7 @@ func GetNamespaceResources(ctx context.Context, namespace string, cl client.Clie
}

return &Namespace{
Name: namespace,
Services: lo.Map(services.Items, func(service corev1.Service, _ int) *corev1.Service { return &service }),
Deployments: lo.Map(deployments.Items, func(deployment appsv1.Deployment, _ int) *appsv1.Deployment { return &deployment }),
Flows: lo.Map(flows.Items, func(flow kardinalcorev1.Flow, _ int) *kardinalcorev1.Flow { return &flow }),
Expand All @@ -49,6 +75,16 @@ func GetDeploymentFromName(name string, deployments []*appsv1.Deployment) *appsv
return nil
}

func (resources *Resources) GetNamespaceByName(namespace string) *Namespace {
for _, resourcesNamespace := range resources.Namespaces {
if resourcesNamespace.Name == namespace {
return resourcesNamespace
}
}

return nil
}

// Use in priority the label app value
func getObjectName(obj *metav1.ObjectMeta) string {
labelApp, ok := obj.GetLabels()["app"]
Expand Down
107 changes: 84 additions & 23 deletions kardinal/topology/topology.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
package topology

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"strings"

"github.com/kurtosis-tech/stacktrace"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
net "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"kardinal.dev/kardinal-operator/kardinal/resources"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ClusterTopology struct {
FlowID string `json:"flowID"`
Ingress *Ingress `json:"ingress"`
Services []*Service `json:"services"`
ServiceDependencies []*ServiceDependency `json:"serviceDependencies"`
Namespace string `json:"namespace"`
}

func (clusterTopology *ClusterTopology) GetService(serviceName string) (*Service, error) {
func (clusterTopology *ClusterTopology) GetService(serviceName string, namespace string) (*Service, error) {
for _, service := range clusterTopology.Services {
if service.ServiceID == serviceName {
if service.Namespace == namespace && service.ServiceID == serviceName {
return service, nil
}
}
Expand All @@ -37,10 +40,9 @@ func (clusterTopology *ClusterTopology) UpdateWithFlow(flowPatch *FlowPatch) err
flowID := flowPatch.FlowId

for _, servicePatch := range flowPatch.ServicePatches {
serviceID := servicePatch.Service
targetService, err := clusterTopology.GetService(serviceID)
targetService, err := clusterTopology.GetService(servicePatch.Service, servicePatch.Namespace)
if err != nil {
return err
return stacktrace.Propagate(err, "An error occurred retrieving the service %s in namespace %s", servicePatch.Service, servicePatch.Namespace)
}
modifiedTargetService := DeepCopyService(targetService)
modifiedTargetService.DeploymentSpec = servicePatch.DeploymentSpec
Expand All @@ -52,7 +54,7 @@ func (clusterTopology *ClusterTopology) UpdateWithFlow(flowPatch *FlowPatch) err
return nil
}

func (clusterTopology *ClusterTopology) GetResources() (map[string]*resources.Namespace, error) {
func (clusterTopology *ClusterTopology) GetResources() (*resources.Resources, error) {
namespaces := map[string]*resources.Namespace{}
for _, service := range clusterTopology.Services {
if service.IsManaged {
Expand All @@ -70,7 +72,58 @@ func (clusterTopology *ClusterTopology) GetResources() (map[string]*resources.Na
}
}

return namespaces, nil
clusterTopologyResources := &resources.Resources{
Namespaces: lo.Values(namespaces),
}
return clusterTopologyResources, nil
}

func (clusterTopology *ClusterTopology) ApplyResources(ctx context.Context, clusterResources *resources.Resources, cl client.Client) error {
clusterTopologyResources, err := clusterTopology.GetResources()
if err != nil {
return stacktrace.Propagate(err, "An error occurred retrieving the list of resources")
}

for _, namespace := range clusterResources.Namespaces {
clusterTopologyNamespace := clusterTopologyResources.GetNamespaceByName(namespace.Name)
if clusterTopologyNamespace != nil {
for _, service := range clusterTopologyNamespace.Services {
if namespace.GetService(service.Name) == nil {
logrus.Infof("Creating service %s", service.Name)
_ = cl.Create(ctx, service)
}
}
for _, deployment := range clusterTopologyNamespace.Deployments {
if namespace.GetDeployment(deployment.Name) == nil {
logrus.Infof("Creating deployment %s", deployment.Name)
_ = cl.Create(ctx, deployment)
}
}
}

for _, service := range namespace.Services {
serviceAnnotations := service.Annotations
isManaged, found := serviceAnnotations["kardinal.dev/managed"]
if found && isManaged == "true" {
if clusterTopologyNamespace == nil || clusterTopologyNamespace.GetService(service.Name) == nil {
logrus.Infof("Deleting service %s", service.Name)
_ = cl.Delete(ctx, service)
}
}
}
for _, deployment := range namespace.Deployments {
deploymentAnnotations := deployment.Annotations
isManaged, found := deploymentAnnotations["kardinal.dev/managed"]
if found && isManaged == "true" {
if clusterTopologyNamespace == nil || clusterTopologyNamespace.GetDeployment(deployment.Name) == nil {
logrus.Infof("Deleting deployment %s", deployment.Name)
_ = cl.Delete(ctx, deployment)
}
}
}
}

return nil
}

type Service struct {
Expand Down Expand Up @@ -171,11 +224,13 @@ func NewServiceFromServiceAndDeployment(coreV1Service *corev1.Service, deploymen
namespace := coreV1Service.Namespace

clusterTopologyService := &Service{
ServiceID: coreV1Service.Name,
Namespace: namespace,
Version: version,
ServiceSpec: &coreV1Service.Spec,
DeploymentSpec: &deployment.Spec,
ServiceID: coreV1Service.Name,
Namespace: namespace,
Version: version,
ServiceSpec: &coreV1Service.Spec,
}
if deployment != nil {
clusterTopologyService.DeploymentSpec = &deployment.Spec
}
isStateful, ok := serviceAnnotations["kardinal.dev.service/stateful"]
if ok && isStateful == "true" {
Expand Down Expand Up @@ -244,30 +299,36 @@ type FlowPatch struct {
}

type ServicePatch struct {
Namespace string
Service string
DeploymentSpec *appsv1.DeploymentSpec
}

func NewClusterTopologyFromResources(
services []*corev1.Service,
deployments []*appsv1.Deployment,
namespace string,
clusterResources *resources.Resources,
version string,
) (*ClusterTopology, error) {
clusterTopologyServices, clusterTopologyServiceDependencies, err := processServices(services, deployments, version)
if err != nil {
return nil, stacktrace.NewError("an error occurred processing the service configs")
clusterTopologyServices := []*Service{}
clusterTopologyServiceDependencies := []*ServiceDependency{}

for _, resourceNamespace := range clusterResources.Namespaces {
services, serviceDependencies, err := processServices(resourceNamespace.Services, resourceNamespace.Deployments, version)
if err != nil {
return nil, stacktrace.NewError("an error occurred processing the service configs")
}
clusterTopologyServices = append(clusterTopologyServices, services...)
clusterTopologyServiceDependencies = append(clusterTopologyServiceDependencies, serviceDependencies...)
}

// some validations
if len(clusterTopologyServices) == 0 {
return nil, stacktrace.NewError("At least one service is required in addition to the ingress service(s)")
}

clusterTopology := ClusterTopology{}
clusterTopology.Namespace = namespace
clusterTopology.Services = clusterTopologyServices
clusterTopology.ServiceDependencies = clusterTopologyServiceDependencies
clusterTopology := ClusterTopology{
Services: clusterTopologyServices,
ServiceDependencies: clusterTopologyServiceDependencies,
}

return &clusterTopology, nil
}
Expand Down

0 comments on commit c0da31c

Please sign in to comment.