diff --git a/application/application.go b/application/application.go index 41a0696..250849a 100644 --- a/application/application.go +++ b/application/application.go @@ -1,6 +1,7 @@ package application import ( + "fmt" "keess/kube_syncer" "github.com/spf13/viper" @@ -11,7 +12,7 @@ import ( func New() *cli.App { app := cli.NewApp() app.Name = "Keess" - app.Version = "v0.1.10" + app.Version = "v0.1.12" app.Usage = "Keep stuff synchronized." app.Description = "Keep secrets and configmaps synchronized." app.Suggest = true @@ -82,6 +83,8 @@ func run(c *cli.Context) error { developmentMode = viper.GetBool("DEVELOPMENT_MODE") } + fmt.Printf("Starting %s %s\n", c.App.Name, c.App.Version) + var syncer kube_syncer.Syncer err := syncer.Start(kubeConfigPath, developmentMode, sourceContext, destinationContexts) diff --git a/chart/Chart.yaml b/chart/Chart.yaml index 56a4e5c..11bb45e 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.10 +version: 0.1.12 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "v0.1.10" +appVersion: "v0.1.12" diff --git a/chart/values.yaml b/chart/values.yaml index 81368b2..efff8d1 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -6,7 +6,7 @@ image: repository: image-registry.powerapp.cloud/keess/keess pullPolicy: IfNotPresent # Overrides the image tag whose default is the chart appVersion. - tag: "main-e25d1f9a8370a1cbef0b5f06020bc2478d761934-13" + tag: "PR-20-65562ff5dc8b83fcc62d98a05c721ee9250ebc5f-1" imagePullSecrets: [] nameOverride: "" diff --git a/kube_syncer/abstractions/abstractions.go b/kube_syncer/abstractions/abstractions.go index 1f89ece..e9705ac 100644 --- a/kube_syncer/abstractions/abstractions.go +++ b/kube_syncer/abstractions/abstractions.go @@ -2,6 +2,7 @@ package abstractions import ( "strings" + "time" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -38,6 +39,9 @@ const All string = "all" // Constant with the annotation created by the kubectl apply command const KubectlApplyAnnotation string = "kubectl.kubernetes.io/last-applied-configuration" +// The timeout for watching. +var WatchTimeOut int64 = int64(time.Duration(60 * 60 * 24 * 365 * 10)) + // Logger object. var Logger *zap.SugaredLogger diff --git a/kube_syncer/abstractions/kubernetes_entity.go b/kube_syncer/abstractions/kubernetes_entity.go index 5284930..17a3593 100644 --- a/kube_syncer/abstractions/kubernetes_entity.go +++ b/kube_syncer/abstractions/kubernetes_entity.go @@ -40,9 +40,9 @@ func (e *KubernetesEntity) Create() error { client := e.Client.CoreV1().ConfigMaps(e.DestinationNamespace) sourceEntity := e.Entity.(*corev1.ConfigMap) - entity := getNewConfigMap(sourceEntity, e.DestinationNamespace, e.SourceContext) + entity := getNewConfigMap(*sourceEntity, e.DestinationNamespace, e.SourceContext) - _, error := client.Create(context.TODO(), entity, v1.CreateOptions{}) + _, error := client.Create(context.TODO(), &entity, v1.CreateOptions{}) if error == nil { Logger.Infof("The configMap '%s' was added in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) @@ -51,7 +51,7 @@ func (e *KubernetesEntity) Create() error { Logger.Error(error) } else { // If alredy exists it need to be updated. - _, error := client.Update(context.TODO(), entity, v1.UpdateOptions{}) + _, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{}) if error == nil { Logger.Infof("The configMap '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) } else { @@ -67,9 +67,9 @@ func (e *KubernetesEntity) Create() error { client := e.Client.CoreV1().Secrets(e.DestinationNamespace) sourceEntity := e.Entity.(*corev1.Secret) - entity := getNewSecret(sourceEntity, e.DestinationNamespace, e.SourceContext) + entity := getNewSecret(*sourceEntity, e.DestinationNamespace, e.SourceContext) - _, error := client.Create(context.TODO(), entity, v1.CreateOptions{}) + _, error := client.Create(context.TODO(), &entity, v1.CreateOptions{}) if error == nil { Logger.Infof("The secret '%s' was added in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) @@ -78,7 +78,7 @@ func (e *KubernetesEntity) Create() error { Logger.Error(error) } else { // If alredy exists it need to be updated. - _, error := client.Update(context.TODO(), entity, v1.UpdateOptions{}) + _, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{}) if error == nil { Logger.Infof("The secret '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) } else { @@ -99,9 +99,9 @@ func (e *KubernetesEntity) Update() error { client := e.Client.CoreV1().ConfigMaps(e.DestinationNamespace) sourceEntity := e.Entity.(*corev1.ConfigMap) - entity := getNewConfigMap(sourceEntity, e.DestinationNamespace, e.SourceContext) + entity := getNewConfigMap(*sourceEntity, e.DestinationNamespace, e.SourceContext) - _, error := client.Update(context.TODO(), entity, v1.UpdateOptions{}) + _, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{}) if error == nil { Logger.Infof("The configmap '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) @@ -110,7 +110,7 @@ func (e *KubernetesEntity) Update() error { Logger.Error(error) } else { // If not exists it need to be created. - _, error := client.Create(context.TODO(), entity, v1.CreateOptions{}) + _, error := client.Create(context.TODO(), &entity, v1.CreateOptions{}) if error == nil { Logger.Infof("The configmap '%s' was created in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) } else { @@ -126,9 +126,9 @@ func (e *KubernetesEntity) Update() error { client := e.Client.CoreV1().Secrets(e.DestinationNamespace) sourceEntity := e.Entity.(*corev1.Secret) - entity := getNewSecret(sourceEntity, e.DestinationNamespace, e.SourceContext) + entity := getNewSecret(*sourceEntity, e.DestinationNamespace, e.SourceContext) - _, error := client.Update(context.TODO(), entity, v1.UpdateOptions{}) + _, error := client.Update(context.TODO(), &entity, v1.UpdateOptions{}) if error == nil { Logger.Infof("The secret '%s' was updated in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) @@ -137,7 +137,7 @@ func (e *KubernetesEntity) Update() error { Logger.Error(error) } else { // If not exists it need to be created. - _, error := client.Create(context.TODO(), entity, v1.CreateOptions{}) + _, error := client.Create(context.TODO(), &entity, v1.CreateOptions{}) if error == nil { Logger.Infof("The secret '%s' was created in the namespace '%s' on context '%s'.", entity.Name, entity.Namespace, e.DestinationContext) } else { @@ -157,7 +157,7 @@ func (e *KubernetesEntity) Delete() error { client := e.Client.CoreV1().ConfigMaps(e.DestinationNamespace) sourceEntity := e.Entity.(*corev1.ConfigMap) - entity := getNewConfigMap(sourceEntity, e.DestinationNamespace, e.SourceContext) + entity := getNewConfigMap(*sourceEntity, e.DestinationNamespace, e.SourceContext) error := client.Delete(context.TODO(), entity.Name, v1.DeleteOptions{}) @@ -178,7 +178,7 @@ func (e *KubernetesEntity) Delete() error { client := e.Client.CoreV1().Secrets(e.DestinationNamespace) sourceEntity := e.Entity.(*corev1.Secret) - entity := getNewSecret(sourceEntity, e.DestinationNamespace, e.SourceContext) + entity := getNewSecret(*sourceEntity, e.DestinationNamespace, e.SourceContext) error := client.Delete(context.TODO(), entity.Name, v1.DeleteOptions{}) @@ -198,7 +198,7 @@ func (e *KubernetesEntity) Delete() error { return errors.New("unsuported type") } -func getNewConfigMap(sourceConfigMap *corev1.ConfigMap, namespace, sourceContext string) *corev1.ConfigMap { +func getNewConfigMap(sourceConfigMap corev1.ConfigMap, namespace, sourceContext string) corev1.ConfigMap { destinationConfigMap := sourceConfigMap.DeepCopy() destinationConfigMap.UID = "" @@ -211,10 +211,10 @@ func getNewConfigMap(sourceConfigMap *corev1.ConfigMap, namespace, sourceContext destinationConfigMap.Namespace = namespace destinationConfigMap.ResourceVersion = "" - return destinationConfigMap + return *destinationConfigMap } -func getNewSecret(sourceSecret *corev1.Secret, namespace, sourceContext string) *corev1.Secret { +func getNewSecret(sourceSecret corev1.Secret, namespace, sourceContext string) corev1.Secret { destinationSecret := sourceSecret.DeepCopy() destinationSecret.UID = "" @@ -227,5 +227,5 @@ func getNewSecret(sourceSecret *corev1.Secret, namespace, sourceContext string) destinationSecret.Namespace = namespace destinationSecret.ResourceVersion = "" - return destinationSecret + return *destinationSecret } diff --git a/kube_syncer/configmap_watcher.go b/kube_syncer/configmap_watcher.go index b74ccce..8d32e83 100644 --- a/kube_syncer/configmap_watcher.go +++ b/kube_syncer/configmap_watcher.go @@ -24,10 +24,15 @@ func (w ConfigMapWatcher) Watch() <-chan abstractions.ISynchronizable { configMapsChan := make(chan abstractions.ISynchronizable) go func() { - watcher, _ := w.kubeClient.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{ - LabelSelector: abstractions.LabelSelector, + watcher, err := w.kubeClient.CoreV1().ConfigMaps(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{ + LabelSelector: abstractions.LabelSelector, + TimeoutSeconds: &abstractions.WatchTimeOut, }) + if err != nil { + panic(err) + } + w.logger.Info("Watching configMaps events.") for event := range watcher.ResultChan() { diff --git a/kube_syncer/namespace_watcher.go b/kube_syncer/namespace_watcher.go index 0e0faca..3f66965 100644 --- a/kube_syncer/namespace_watcher.go +++ b/kube_syncer/namespace_watcher.go @@ -23,7 +23,13 @@ func (w NamespaceWatcher) Watch() <-chan abstractions.ISynchronizable { namespacesChan := make(chan abstractions.ISynchronizable) go func() { - watcher, _ := w.kubeClient.CoreV1().Namespaces().Watch(context.Background(), metav1.ListOptions{}) + watcher, err := w.kubeClient.CoreV1().Namespaces().Watch(context.TODO(), metav1.ListOptions{ + TimeoutSeconds: &abstractions.WatchTimeOut, + }) + + if err != nil { + panic(err) + } w.logger.Info("Watching namespaces events.") diff --git a/kube_syncer/secret_watcher.go b/kube_syncer/secret_watcher.go index 9a0bc24..122439e 100644 --- a/kube_syncer/secret_watcher.go +++ b/kube_syncer/secret_watcher.go @@ -24,10 +24,15 @@ func (w SecretWatcher) Watch() <-chan abstractions.ISynchronizable { secretsChan := make(chan abstractions.ISynchronizable) go func() { - watcher, _ := w.kubeClient.CoreV1().Secrets(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{ - LabelSelector: abstractions.LabelSelector, + watcher, err := w.kubeClient.CoreV1().Secrets(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{ + LabelSelector: abstractions.LabelSelector, + TimeoutSeconds: &abstractions.WatchTimeOut, }) + if err != nil { + panic(err) + } + w.logger.Info("Watching secrets events.") for event := range watcher.ResultChan() { diff --git a/kube_syncer/syncer.go b/kube_syncer/syncer.go index 2efaec4..bf47bc0 100644 --- a/kube_syncer/syncer.go +++ b/kube_syncer/syncer.go @@ -109,6 +109,7 @@ func (s *Syncer) Start(kubeConfigPath string, developmentMode bool, sourceContex s.logger.Info("Config loaded from kube config.") } + client.RESTClient().Get().Timeout(time.Duration(abstractions.WatchTimeOut)) s.kubeClients = map[string]*kubernetes.Clientset{} s.kubeClients[s.sourceContext] = client @@ -124,6 +125,7 @@ func (s *Syncer) Start(kubeConfigPath string, developmentMode bool, sourceContex s.logger.Error(err) } + client.RESTClient().Get().Timeout(time.Duration(abstractions.WatchTimeOut)) s.kubeClients[context] = client } @@ -202,6 +204,11 @@ func (s *Syncer) Run() error { for currentContext, kubeClient := range s.kubeClients { + // Don't look to another clusters on backward synchronization. + if currentContext != s.sourceContext { + continue + } + // Now list all ConfigMaps that are managed by Keess. managedConfigMapList, err := kubeClient.CoreV1().ConfigMaps(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{ LabelSelector: abstractions.ManagedLabelSelector, @@ -222,6 +229,11 @@ func (s *Syncer) Run() error { continue } + // Only do back synchronization between namespaces of the same cluster. + if sourceContext != currentContext { + continue + } + sourceKubeClient := s.kubeClients[sourceContext] sourceConfigMap, err := sourceKubeClient.CoreV1().ConfigMaps(sourceNamespace).Get(context.TODO(), configMap.Name, metav1.GetOptions{}) @@ -275,6 +287,11 @@ func (s *Syncer) Run() error { continue } + // Only do back synchronization between namespaces of the same cluster. + if sourceContext != currentContext { + continue + } + sourceKubeClient := s.kubeClients[sourceContext] sourceSecret, err := sourceKubeClient.CoreV1().Secrets(sourceNamespace).Get(context.TODO(), secret.Name, metav1.GetOptions{})