Skip to content

Commit

Permalink
Fix/namespace sync (#20)
Browse files Browse the repository at this point in the history
This PR fixes the issue that occurred when synchronizing between
namespaces, where the source namespace was being saved incorrectly.
  • Loading branch information
mvleandro authored Aug 16, 2023
1 parent 2e728ec commit 3e65d3e
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 27 deletions.
5 changes: 4 additions & 1 deletion application/application.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package application

import (
"fmt"
"keess/kube_syncer"

"github.com/spf13/viper"
Expand All @@ -11,7 +12,7 @@ import (
func New() *cli.App {
app := cli.NewApp()
app.Name = "Keess"
app.Version = "v0.1.10"
app.Version = "v0.1.12"
app.Usage = "Keep stuff synchronized."
app.Description = "Keep secrets and configmaps synchronized."
app.Suggest = true
Expand Down Expand Up @@ -82,6 +83,8 @@ func run(c *cli.Context) error {
developmentMode = viper.GetBool("DEVELOPMENT_MODE")
}

fmt.Printf("Starting %s %s\n", c.App.Name, c.App.Version)

var syncer kube_syncer.Syncer
err := syncer.Start(kubeConfigPath, developmentMode, sourceContext, destinationContexts)

Expand Down
4 changes: 2 additions & 2 deletions chart/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.10
version: 0.1.12

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "v0.1.10"
appVersion: "v0.1.12"
2 changes: 1 addition & 1 deletion chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ image:
repository: image-registry.powerapp.cloud/keess/keess
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "main-e25d1f9a8370a1cbef0b5f06020bc2478d761934-13"
tag: "PR-20-65562ff5dc8b83fcc62d98a05c721ee9250ebc5f-1"

imagePullSecrets: []
nameOverride: ""
Expand Down
4 changes: 4 additions & 0 deletions kube_syncer/abstractions/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package abstractions

import (
"strings"
"time"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -38,6 +39,9 @@ const All string = "all"
// Constant with the annotation created by the kubectl apply command
const KubectlApplyAnnotation string = "kubectl.kubernetes.io/last-applied-configuration"

// The timeout for watching.
var WatchTimeOut int64 = int64(time.Duration(60 * 60 * 24 * 365 * 10))

// Logger object.
var Logger *zap.SugaredLogger

Expand Down
36 changes: 18 additions & 18 deletions kube_syncer/abstractions/kubernetes_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func (e *KubernetesEntity) Create() error {
client := e.Client.CoreV1().ConfigMaps(e.DestinationNamespace)

sourceEntity := e.Entity.(*corev1.ConfigMap)
entity := getNewConfigMap(sourceEntity, e.DestinationNamespace, e.SourceContext)
entity := getNewConfigMap(*sourceEntity, e.DestinationNamespace, e.SourceContext)

_, error := client.Create(context.TODO(), entity, v1.CreateOptions{})
_, error := client.Create(context.TODO(), &entity, v1.CreateOptions{})

if error == nil {
Logger.Infof("The configMap '%s' was added in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
Expand All @@ -51,7 +51,7 @@ func (e *KubernetesEntity) Create() error {
Logger.Error(error)
} else {
// If alredy exists it need to be updated.
_, error := client.Update(context.TODO(), entity, v1.UpdateOptions{})
_, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{})
if error == nil {
Logger.Infof("The configMap '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
} else {
Expand All @@ -67,9 +67,9 @@ func (e *KubernetesEntity) Create() error {
client := e.Client.CoreV1().Secrets(e.DestinationNamespace)

sourceEntity := e.Entity.(*corev1.Secret)
entity := getNewSecret(sourceEntity, e.DestinationNamespace, e.SourceContext)
entity := getNewSecret(*sourceEntity, e.DestinationNamespace, e.SourceContext)

_, error := client.Create(context.TODO(), entity, v1.CreateOptions{})
_, error := client.Create(context.TODO(), &entity, v1.CreateOptions{})

if error == nil {
Logger.Infof("The secret '%s' was added in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
Expand All @@ -78,7 +78,7 @@ func (e *KubernetesEntity) Create() error {
Logger.Error(error)
} else {
// If alredy exists it need to be updated.
_, error := client.Update(context.TODO(), entity, v1.UpdateOptions{})
_, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{})
if error == nil {
Logger.Infof("The secret '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
} else {
Expand All @@ -99,9 +99,9 @@ func (e *KubernetesEntity) Update() error {
client := e.Client.CoreV1().ConfigMaps(e.DestinationNamespace)

sourceEntity := e.Entity.(*corev1.ConfigMap)
entity := getNewConfigMap(sourceEntity, e.DestinationNamespace, e.SourceContext)
entity := getNewConfigMap(*sourceEntity, e.DestinationNamespace, e.SourceContext)

_, error := client.Update(context.TODO(), entity, v1.UpdateOptions{})
_, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{})

if error == nil {
Logger.Infof("The configmap '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
Expand All @@ -110,7 +110,7 @@ func (e *KubernetesEntity) Update() error {
Logger.Error(error)
} else {
// If not exists it need to be created.
_, error := client.Create(context.TODO(), entity, v1.CreateOptions{})
_, error := client.Create(context.TODO(), &entity, v1.CreateOptions{})
if error == nil {
Logger.Infof("The configmap '%s' was created in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
} else {
Expand All @@ -126,9 +126,9 @@ func (e *KubernetesEntity) Update() error {
client := e.Client.CoreV1().Secrets(e.DestinationNamespace)

sourceEntity := e.Entity.(*corev1.Secret)
entity := getNewSecret(sourceEntity, e.DestinationNamespace, e.SourceContext)
entity := getNewSecret(*sourceEntity, e.DestinationNamespace, e.SourceContext)

_, error := client.Update(context.TODO(), entity, v1.UpdateOptions{})
_, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{})

if error == nil {
Logger.Infof("The secret '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
Expand All @@ -137,7 +137,7 @@ func (e *KubernetesEntity) Update() error {
Logger.Error(error)
} else {
// If not exists it need to be created.
_, error := client.Create(context.TODO(), entity, v1.CreateOptions{})
_, error := client.Create(context.TODO(), &entity, v1.CreateOptions{})
if error == nil {
Logger.Infof("The secret '%s' was created in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext)
} else {
Expand All @@ -157,7 +157,7 @@ func (e *KubernetesEntity) Delete() error {
client := e.Client.CoreV1().ConfigMaps(e.DestinationNamespace)

sourceEntity := e.Entity.(*corev1.ConfigMap)
entity := getNewConfigMap(sourceEntity, e.DestinationNamespace, e.SourceContext)
entity := getNewConfigMap(*sourceEntity, e.DestinationNamespace, e.SourceContext)

error := client.Delete(context.TODO(), entity.Name, v1.DeleteOptions{})

Expand All @@ -178,7 +178,7 @@ func (e *KubernetesEntity) Delete() error {
client := e.Client.CoreV1().Secrets(e.DestinationNamespace)

sourceEntity := e.Entity.(*corev1.Secret)
entity := getNewSecret(sourceEntity, e.DestinationNamespace, e.SourceContext)
entity := getNewSecret(*sourceEntity, e.DestinationNamespace, e.SourceContext)

error := client.Delete(context.TODO(), entity.Name, v1.DeleteOptions{})

Expand All @@ -198,7 +198,7 @@ func (e *KubernetesEntity) Delete() error {
return errors.New("unsuported type")
}

func getNewConfigMap(sourceConfigMap *corev1.ConfigMap, namespace, sourceContext string) *corev1.ConfigMap {
func getNewConfigMap(sourceConfigMap corev1.ConfigMap, namespace, sourceContext string) corev1.ConfigMap {
destinationConfigMap := sourceConfigMap.DeepCopy()

destinationConfigMap.UID = ""
Expand All @@ -211,10 +211,10 @@ func getNewConfigMap(sourceConfigMap *corev1.ConfigMap, namespace, sourceContext
destinationConfigMap.Namespace = namespace
destinationConfigMap.ResourceVersion = ""

return destinationConfigMap
return *destinationConfigMap
}

func getNewSecret(sourceSecret *corev1.Secret, namespace, sourceContext string) *corev1.Secret {
func getNewSecret(sourceSecret corev1.Secret, namespace, sourceContext string) corev1.Secret {
destinationSecret := sourceSecret.DeepCopy()

destinationSecret.UID = ""
Expand All @@ -227,5 +227,5 @@ func getNewSecret(sourceSecret *corev1.Secret, namespace, sourceContext string)
destinationSecret.Namespace = namespace
destinationSecret.ResourceVersion = ""

return destinationSecret
return *destinationSecret
}
9 changes: 7 additions & 2 deletions kube_syncer/configmap_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ func (w ConfigMapWatcher) Watch() <-chan abstractions.ISynchronizable {
configMapsChan := make(chan abstractions.ISynchronizable)

go func() {
watcher, _ := w.kubeClient.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{
LabelSelector: abstractions.LabelSelector,
watcher, err := w.kubeClient.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{
LabelSelector: abstractions.LabelSelector,
TimeoutSeconds: &abstractions.WatchTimeOut,
})

if err != nil {
panic(err)
}

w.logger.Info("Watching configMaps events.")

for event := range watcher.ResultChan() {
Expand Down
8 changes: 7 additions & 1 deletion kube_syncer/namespace_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ func (w NamespaceWatcher) Watch() <-chan abstractions.ISynchronizable {
namespacesChan := make(chan abstractions.ISynchronizable)

go func() {
watcher, _ := w.kubeClient.CoreV1().Namespaces().Watch(context.Background(), metav1.ListOptions{})
watcher, err := w.kubeClient.CoreV1().Namespaces().Watch(context.TODO(), metav1.ListOptions{
TimeoutSeconds: &abstractions.WatchTimeOut,
})

if err != nil {
panic(err)
}

w.logger.Info("Watching namespaces events.")

Expand Down
9 changes: 7 additions & 2 deletions kube_syncer/secret_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ func (w SecretWatcher) Watch() <-chan abstractions.ISynchronizable {
secretsChan := make(chan abstractions.ISynchronizable)

go func() {
watcher, _ := w.kubeClient.CoreV1().Secrets(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{
LabelSelector: abstractions.LabelSelector,
watcher, err := w.kubeClient.CoreV1().Secrets(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{
LabelSelector: abstractions.LabelSelector,
TimeoutSeconds: &abstractions.WatchTimeOut,
})

if err != nil {
panic(err)
}

w.logger.Info("Watching secrets events.")

for event := range watcher.ResultChan() {
Expand Down
17 changes: 17 additions & 0 deletions kube_syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (s *Syncer) Start(kubeConfigPath string, developmentMode bool, sourceContex
s.logger.Info("Config loaded from kube config.")
}

client.RESTClient().Get().Timeout(time.Duration(abstractions.WatchTimeOut))
s.kubeClients = map[string]*kubernetes.Clientset{}
s.kubeClients[s.sourceContext] = client

Expand All @@ -124,6 +125,7 @@ func (s *Syncer) Start(kubeConfigPath string, developmentMode bool, sourceContex
s.logger.Error(err)
}

client.RESTClient().Get().Timeout(time.Duration(abstractions.WatchTimeOut))
s.kubeClients[context] = client
}

Expand Down Expand Up @@ -202,6 +204,11 @@ func (s *Syncer) Run() error {

for currentContext, kubeClient := range s.kubeClients {

// Don't look to another clusters on backward synchronization.
if currentContext != s.sourceContext {
continue
}

// Now list all ConfigMaps that are managed by Keess.
managedConfigMapList, err := kubeClient.CoreV1().ConfigMaps(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
LabelSelector: abstractions.ManagedLabelSelector,
Expand All @@ -222,6 +229,11 @@ func (s *Syncer) Run() error {
continue
}

// Only do back synchronization between namespaces of the same cluster.
if sourceContext != currentContext {
continue
}

sourceKubeClient := s.kubeClients[sourceContext]
sourceConfigMap, err := sourceKubeClient.CoreV1().ConfigMaps(sourceNamespace).Get(context.TODO(), configMap.Name, metav1.GetOptions{})

Expand Down Expand Up @@ -275,6 +287,11 @@ func (s *Syncer) Run() error {
continue
}

// Only do back synchronization between namespaces of the same cluster.
if sourceContext != currentContext {
continue
}

sourceKubeClient := s.kubeClients[sourceContext]
sourceSecret, err := sourceKubeClient.CoreV1().Secrets(sourceNamespace).Get(context.TODO(), secret.Name, metav1.GetOptions{})

Expand Down

0 comments on commit 3e65d3e

Please sign in to comment.