diff --git a/operator/pkg/config/multiclusterglobalhub_config.go b/operator/pkg/config/multiclusterglobalhub_config.go index 8745755b1..d73932455 100644 --- a/operator/pkg/config/multiclusterglobalhub_config.go +++ b/operator/pkg/config/multiclusterglobalhub_config.go @@ -358,7 +358,8 @@ func GetMulticlusterGlobalHub(ctx context.Context, c client.Client) (*v1alpha4.M return nil, err } if len(mghList.Items) != 1 { - return nil, fmt.Errorf("mgh should only have 1 instance, but got %v", len(mghList.Items)) + klog.Infof("mgh should have 1 instance, but got %v", len(mghList.Items)) + return nil, nil } return &mghList.Items[0], nil } diff --git a/operator/pkg/controllers/agent/addon_controller_manifests.go b/operator/pkg/controllers/agent/addon_controller_manifests.go index 7069ae4ed..2b909884a 100644 --- a/operator/pkg/controllers/agent/addon_controller_manifests.go +++ b/operator/pkg/controllers/agent/addon_controller_manifests.go @@ -146,7 +146,7 @@ func (a *HohAgentAddon) GetValues(cluster *clusterv1.ManagedCluster, installNamespace = operatorconstants.GHAgentInstallNamespace } mgh, err := config.GetMulticlusterGlobalHub(a.ctx, a.client) - if err != nil { + if err != nil || mgh == nil { log.Error(err, "failed to get MulticlusterGlobalHub") return nil, err } diff --git a/operator/pkg/controllers/agent/addon_installer.go b/operator/pkg/controllers/agent/addon_installer.go index 769e6a4bc..1cc86237a 100644 --- a/operator/pkg/controllers/agent/addon_installer.go +++ b/operator/pkg/controllers/agent/addon_installer.go @@ -36,7 +36,7 @@ type AddonInstaller struct { func (r *AddonInstaller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { mgh, err := config.GetMulticlusterGlobalHub(ctx, r.Client) - if err != nil { + if err != nil || mgh == nil { return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } if mgh.DeletionTimestamp != nil { diff --git a/operator/pkg/controllers/agent/addon_installer_test.go b/operator/pkg/controllers/agent/addon_installer_test.go index 3747227dd..fe6eaf2a1 100644 --- a/operator/pkg/controllers/agent/addon_installer_test.go +++ b/operator/pkg/controllers/agent/addon_installer_test.go @@ -95,7 +95,7 @@ func fakeHoHAddon(cluster, installNamespace, addonDeployMode string) *v1alpha1.M } func TestAddonInstaller(t *testing.T) { - namespace := "default" + namespace := "multicluster-global-hub" name := "test" config.SetMGHNamespacedName(types.NamespacedName{ Namespace: namespace, diff --git a/operator/pkg/controllers/crd/crd_controller.go b/operator/pkg/controllers/crd/crd_controller.go index 9b966d991..9eaceb796 100644 --- a/operator/pkg/controllers/crd/crd_controller.go +++ b/operator/pkg/controllers/crd/crd_controller.go @@ -80,7 +80,7 @@ func (r *CrdController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R defer r.mu.Unlock() // Check if mgh exist or deleting mgh, err := config.GetMulticlusterGlobalHub(ctx, r.GetClient()) - if err != nil { + if err != nil || mgh == nil { return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } if mgh.DeletionTimestamp != nil { diff --git a/operator/pkg/controllers/hubofhubs/controller.go b/operator/pkg/controllers/hubofhubs/controller.go index 58c14eda4..34b96b151 100644 --- a/operator/pkg/controllers/hubofhubs/controller.go +++ b/operator/pkg/controllers/hubofhubs/controller.go @@ -391,7 +391,10 @@ func watchMulticlusterGlobalHubPredict() predicate.TypedPredicate[*v1alpha4.Mult return true }, UpdateFunc: func(e event.TypedUpdateEvent[*v1alpha4.MulticlusterGlobalHub]) bool { - return e.ObjectOld.GetResourceVersion() != e.ObjectNew.GetResourceVersion() + if e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration() { + return true + } + return !reflect.DeepEqual(e.ObjectOld.GetAnnotations(), e.ObjectNew.GetAnnotations()) }, DeleteFunc: func(e event.TypedDeleteEvent[*v1alpha4.MulticlusterGlobalHub]) bool { return !e.DeleteStateUnknown @@ -604,16 +607,12 @@ func (r *GlobalHubReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( r.log.V(2).Info("reconciling mgh instance", "namespace", req.Namespace, "name", req.Name) mgh, err := config.GetMulticlusterGlobalHub(ctx, r.client) if err != nil { - if errors.IsNotFound(err) { - // Request object not found, could have been deleted after reconcile request. - // Owned objects are automatically garbage collected. For additional cleanup logic use finalizers. - // Return and don't requeue - r.log.Info("mgh instance not found", "namespace", req.Namespace, "name", req.Name) - return ctrl.Result{}, nil - } r.log.Error(err, "failed to get MulticlusterGlobalHub") return ctrl.Result{}, err } + if mgh == nil { + return ctrl.Result{}, nil + } err = config.SetMulticlusterGlobalHubConfig(ctx, mgh, r.client, r.imageClient) if err != nil { return ctrl.Result{}, err @@ -644,8 +643,12 @@ func (r *GlobalHubReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( config.SetImportClusterInHosted(mgh) // prune resources if deleting mgh or metrics is disabled - if err = r.pruneReconciler.Reconcile(ctx, mgh); err != nil { - return ctrl.Result{RequeueAfter: 2 * time.Second}, fmt.Errorf("failed to prune Global Hub resources %v", err) + needRequeue, err := r.pruneReconciler.Reconcile(ctx, mgh) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to prune Global Hub resources %v", err) + } + if needRequeue { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } if mgh.GetDeletionTimestamp() != nil { return ctrl.Result{}, nil @@ -666,8 +669,12 @@ func (r *GlobalHubReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // storage and transporter - if err = r.ReconcileMiddleware(ctx, mgh); err != nil { - return ctrl.Result{}, err + needRequeue, reconcileErr := r.ReconcileMiddleware(ctx, mgh) + if reconcileErr != nil { + return ctrl.Result{}, reconcileErr + } + if needRequeue { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } // reconcile metrics @@ -676,8 +683,12 @@ func (r *GlobalHubReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // reconcile manager - if err := r.managerReconciler.Reconcile(ctx, mgh); err != nil { - return ctrl.Result{}, err + needRequeue, reconcileErr = r.managerReconciler.Reconcile(ctx, mgh) + if reconcileErr != nil { + return ctrl.Result{}, reconcileErr + } + if needRequeue { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } if config.WithInventory(mgh) { @@ -713,15 +724,18 @@ func (r *GlobalHubReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // 2. then create the kafka and postgres resources at the same time // 3. wait for kafka and postgres ready func (r *GlobalHubReconciler) ReconcileMiddleware(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub, -) error { +) (bool, error) { if err := r.transportReconciler.Reconcile(ctx, mgh); err != nil { - return err + return true, err } - - if err := r.storageReconciler.Reconcile(ctx, mgh); err != nil { - return err + needRequeue, err := r.storageReconciler.Reconcile(ctx, mgh) + if err != nil { + return true, err } - return nil + if needRequeue { + return true, nil + } + return false, nil } var WatchedSecret = sets.NewString( diff --git a/operator/pkg/controllers/hubofhubs/controller_test.go b/operator/pkg/controllers/hubofhubs/controller_test.go index 26e928cce..1eb28ce09 100644 --- a/operator/pkg/controllers/hubofhubs/controller_test.go +++ b/operator/pkg/controllers/hubofhubs/controller_test.go @@ -5,7 +5,6 @@ package hubofhubs import ( "context" - "fmt" "os" "path/filepath" "testing" @@ -128,6 +127,7 @@ func TestController(t *testing.T) { NamespacedName: client.ObjectKeyFromObject(mgh), }) + // TODO: Will rewrite the test in status refactor pr Eventually(func() error { err := runtimeMgr.GetClient().Get(ctx, client.ObjectKeyFromObject(mgh), mgh) if err != nil { @@ -140,14 +140,6 @@ func TestController(t *testing.T) { Expect(cond.Status).To(Equal(metav1.ConditionTrue)) Expect(cond.Message).To(Equal("The data will be kept in the database for 24 months.")) } - if cond.Type == config.CONDITION_TYPE_GLOBALHUB_READY { - count++ - Expect(cond.Status).To(Equal(metav1.ConditionFalse)) - Expect(cond.Message).To(ContainSubstring("database not ready")) - } - } - if count != 2 { - return fmt.Errorf("expected to be 2, but got %v", count) } return nil }, 10*time.Second, 1*time.Second).Should(Succeed()) diff --git a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go index 0831702d9..32d5ea517 100644 --- a/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/manager/manager_reconciler.go @@ -13,6 +13,7 @@ import ( "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "github.com/stolostron/multicluster-global-hub/operator/api/operator/v1alpha4" @@ -51,11 +52,11 @@ func NewManagerReconciler(mgr ctrl.Manager, kubeClient kubernetes.Interface, con func (r *ManagerReconciler) Reconcile(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub, -) error { +) (bool, error) { // generate random session secret for oauth-proxy proxySessionSecret, err := config.GetOauthSessionSecret() if err != nil { - return fmt.Errorf("failed to get random session secret for oauth-proxy: %v", err) + return true, fmt.Errorf("failed to get random session secret for oauth-proxy: %v", err) } // create new HoHRenderer and HoHDeployer @@ -64,7 +65,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, // create discovery client dc, err := discovery.NewDiscoveryClientForConfig(r.Manager.GetConfig()) if err != nil { - return err + return true, err } // create restmapper for deployer to find GVR @@ -78,7 +79,7 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, // dataRetention should at least be 1 month, otherwise it will deleted the current month partitions and records months, err := commonutils.ParseRetentionMonth(mgh.Spec.DataLayer.Postgres.Retention) if err != nil { - return fmt.Errorf("failed to parse month retention: %v", err) + return true, fmt.Errorf("failed to parse month retention: %v", err) } if months < 1 { months = 1 @@ -91,28 +92,29 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, transportConn := config.GetTransporterConn() if transportConn == nil || transportConn.BootstrapServer == "" { - return fmt.Errorf("the transport connection(%s) must not be empty", transportConn) + klog.V(2).Infof("Wait kafka connection created") + return true, nil } storageConn := config.GetStorageConnection() if storageConn == nil || !config.GetDatabaseReady() { - return fmt.Errorf("the storage connection or database isn't ready") + return true, fmt.Errorf("the storage connection or database isn't ready") } if isMiddlewareUpdated(transportConn, storageConn) { err = commonutils.RestartPod(ctx, r.kubeClient, mgh.Namespace, constants.ManagerDeploymentName) if err != nil { - return fmt.Errorf("failed to restart manager pod: %w", err) + return true, fmt.Errorf("failed to restart manager pod: %w", err) } } electionConfig, err := config.GetElectionConfig() if err != nil { - return fmt.Errorf("failed to get the electionConfig %w", err) + return true, fmt.Errorf("failed to get the electionConfig %w", err) } kafkaConfigYaml, err := transportConn.YamlMarshal(true) if err != nil { - return fmt.Errorf("failed to marshall kafka connetion for config: %w", err) + return true, fmt.Errorf("failed to marshall kafka connetion for config: %w", err) } managerObjects, err := hohRenderer.Render("manifests", "", func(profile string) (interface{}, error) { @@ -149,12 +151,12 @@ func (r *ManagerReconciler) Reconcile(ctx context.Context, }, nil }) if err != nil { - return fmt.Errorf("failed to render manager objects: %v", err) + return true, fmt.Errorf("failed to render manager objects: %v", err) } if err = utils.ManipulateGlobalHubObjects(managerObjects, mgh, hohDeployer, mapper, r.GetScheme()); err != nil { - return fmt.Errorf("failed to create/update manager objects: %v", err) + return true, fmt.Errorf("failed to create/update manager objects: %v", err) } - return nil + return false, nil } func isMiddlewareUpdated(transportConn *transport.KafkaConnCredential, storageConn *config.PostgresConnection) bool { diff --git a/operator/pkg/controllers/hubofhubs/prune/prune_reconciler.go b/operator/pkg/controllers/hubofhubs/prune/prune_reconciler.go index f65b12a3a..29de6860c 100644 --- a/operator/pkg/controllers/hubofhubs/prune/prune_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/prune/prune_reconciler.go @@ -48,28 +48,32 @@ func NewPruneReconciler(c client.Client) *PruneReconciler { func (r *PruneReconciler) Reconcile(ctx context.Context, mgh *globalhubv1alpha4.MulticlusterGlobalHub, -) error { +) (bool, error) { // Deleting the multiclusterglobalhub instance if mgh.GetDeletionTimestamp() != nil && operatorutils.Contains(mgh.GetFinalizers(), constants.GlobalHubCleanupFinalizer) { if err := r.pruneWebhookResources(ctx); err != nil { - return err + return true, err + } + needRequeue, err := r.GlobalHubResources(ctx, mgh) + if err != nil { + return true, fmt.Errorf("failed to prune Global Hub resources %v", err) } - if err := r.GlobalHubResources(ctx, mgh); err != nil { - return fmt.Errorf("failed to prune Global Hub resources %v", err) + if needRequeue { + return true, nil } if err := r.MetricsResources(ctx); err != nil { - return err + return true, err } - return nil + return false, nil } // If webhook do not need to enable, should remove the related resources if config.IsACMResourceReady() && !config.GetImportClusterInHosted() { if err := r.pruneWebhookResources(ctx); err != nil { - return err + return true, err } if err := r.pruneHostedResources(ctx); err != nil { - return err + return true, err } } @@ -78,10 +82,10 @@ func (r *PruneReconciler) Reconcile(ctx context.Context, mgh.Spec.EnableMetrics = false klog.Info("Kafka and Postgres are provided by customer, disable metrics") if err := r.MetricsResources(ctx); err != nil { - return err + return true, err } } - return nil + return false, nil } func (r *PruneReconciler) pruneHostedResources(ctx context.Context) error { @@ -156,32 +160,36 @@ func (r *PruneReconciler) pruneACMResources(ctx context.Context) error { func (r *PruneReconciler) GlobalHubResources(ctx context.Context, mgh *globalhubv1alpha4.MulticlusterGlobalHub, -) error { +) (bool, error) { if config.IsACMResourceReady() { if err := r.pruneACMResources(ctx); err != nil { - return err + return true, err } } if !config.IsBYOKafka() { - if err := r.pruneStrimziResources(ctx); err != nil { - return err + needRequeue, err := r.pruneStrimziResources(ctx) + if err != nil { + return true, err + } + if needRequeue { + return true, nil } } mgh.SetFinalizers(operatorutils.Remove(mgh.GetFinalizers(), constants.GlobalHubCleanupFinalizer)) if err := operatorutils.UpdateObject(ctx, r.Client, mgh); err != nil { - return err + return true, err } // clean up namesapced resources, eg. mgh system namespace, etc if err := r.pruneNamespacedResources(ctx); err != nil { - return err + return true, err } // clean up the cluster resources, eg. clusterrole, clusterrolebinding, etc if err := r.pruneGlobalResources(ctx); err != nil { - return err + return true, err } if config.IsACMResourceReady() { @@ -189,12 +197,12 @@ func (r *PruneReconciler) GlobalHubResources(ctx context.Context, // the finalizer is added by the global hub manager. ideally, they should be pruned by manager // But currently, we do not have a channel from operator to let manager knows when to start pruning. if err := jobs.NewPruneFinalizer(ctx, r.Client).Run(); err != nil { - return err + return true, err } r.log.Info("removed finalizer from mgh, app, policy, placement and etc") } - return nil + return false, nil } func (r *PruneReconciler) MetricsResources(ctx context.Context) error { @@ -377,7 +385,7 @@ func (r *PruneReconciler) pruneManagedHubs(ctx context.Context) error { return nil } -func (r *PruneReconciler) pruneStrimziResources(ctx context.Context) error { +func (r *PruneReconciler) pruneStrimziResources(ctx context.Context) (bool, error) { klog.Infof("Remove strimzi resources") listOpts := []client.ListOption{ client.HasLabels{constants.GlobalHubOwnerLabelKey}, @@ -385,12 +393,12 @@ func (r *PruneReconciler) pruneStrimziResources(ctx context.Context) error { kafkaUserList := &kafkav1beta2.KafkaUserList{} klog.Infof("Delete kafkaUsers") if err := r.Client.List(ctx, kafkaUserList, listOpts...); err != nil { - return err + return true, err } for idx := range kafkaUserList.Items { klog.Infof("Delete kafka user %v", kafkaUserList.Items[idx].Name) if err := r.Client.Delete(ctx, &kafkaUserList.Items[idx]); err != nil && !errors.IsNotFound(err) { - return err + return true, err } } @@ -398,20 +406,21 @@ func (r *PruneReconciler) pruneStrimziResources(ctx context.Context) error { klog.Infof("Delete kafkaTopics") if err := r.Client.List(ctx, kafkaTopicList, listOpts...); err != nil { - return err + return true, err } for idx := range kafkaTopicList.Items { klog.Infof("Delete kafka topic %v", kafkaTopicList.Items[idx].Name) if err := r.Client.Delete(ctx, &kafkaTopicList.Items[idx]); err != nil && !errors.IsNotFound(err) { - return err + return true, err } } - + // Wait kafkatopic is removed if err := r.Client.List(ctx, kafkaTopicList, listOpts...); err != nil { - return err + return true, err } + if len(kafkaTopicList.Items) != 0 { - return fmt.Errorf("kafkatopics still exist, they should be removed. kafkaTopicList:%v", kafkaTopicList.Items) + return true, nil } kafka := &kafkav1beta2.Kafka{ @@ -423,7 +432,7 @@ func (r *PruneReconciler) pruneStrimziResources(ctx context.Context) error { klog.Infof("Delete kafka cluster %v", kafka.Name) if err := r.Client.Delete(ctx, kafka); err != nil && !errors.IsNotFound(err) { - return err + return true, err } klog.Infof("kafka cluster deleted") @@ -435,9 +444,9 @@ func (r *PruneReconciler) pruneStrimziResources(ctx context.Context) error { if err != nil { klog.Errorf("Failed to get strimzi subscription, err:%v", err) if errors.IsNotFound(err) { - return nil + return false, nil } - return err + return true, err } if kafkaSub.Status.InstalledCSV != "" { @@ -449,15 +458,15 @@ func (r *PruneReconciler) pruneStrimziResources(ctx context.Context) error { } klog.Infof("Delete kafka csv %v", kafkaCsv.Name) if err := r.Client.Delete(ctx, kafkaCsv); err != nil { - return err + return true, err } klog.Infof("kafka csv deleted") } if err := r.Client.Delete(ctx, kafkaSub); err != nil && !errors.IsNotFound(err) { - return err + return true, err } klog.Infof("kafka subscription deleted") - return nil + return false, nil } diff --git a/operator/pkg/controllers/hubofhubs/prune/prune_reconciler_test.go b/operator/pkg/controllers/hubofhubs/prune/prune_reconciler_test.go index d097a3318..1b3774b03 100644 --- a/operator/pkg/controllers/hubofhubs/prune/prune_reconciler_test.go +++ b/operator/pkg/controllers/hubofhubs/prune/prune_reconciler_test.go @@ -110,6 +110,7 @@ func TestMulticlusterGlobalHubReconcilerStrimziResources(t *testing.T) { name string initObjects []runtime.Object wantErr bool + wantRequeue bool }{ { name: "remove kafka resources", @@ -156,7 +157,8 @@ func TestMulticlusterGlobalHubReconcilerStrimziResources(t *testing.T) { }, }, }, - wantErr: true, + wantErr: false, + wantRequeue: true, }, { name: "remove subscription and csv", @@ -186,8 +188,12 @@ func TestMulticlusterGlobalHubReconcilerStrimziResources(t *testing.T) { subv1alpha1.AddToScheme(scheme.Scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(tt.initObjects...).Build() r := NewPruneReconciler(fakeClient) - if err := r.pruneStrimziResources(ctx); (err != nil) != tt.wantErr { - t.Errorf("MulticlusterGlobalHubReconciler.pruneStrimziResources() error = %v, wantErr %v", err, tt.wantErr) + needRequeue, err := r.pruneStrimziResources(ctx) + if (err != nil) != tt.wantErr { + t.Errorf("Case:%v, MulticlusterGlobalHubReconciler.pruneStrimziResources() error = %v, wantErr %v", tt.name, err, tt.wantErr) + } + if needRequeue != tt.wantRequeue { + t.Errorf("Case:%v, MulticlusterGlobalHubReconciler.pruneStrimziResources() needRequeue = %v, wantRequeue %v", tt.name, needRequeue, tt.wantRequeue) } }) } @@ -316,7 +322,7 @@ func TestWebhookResources(t *testing.T) { fakeClient := fake.NewClientBuilder().WithScheme(scheme.Scheme).WithRuntimeObjects(tt.initObjects...).Build() r := NewPruneReconciler(fakeClient) - if err := r.Reconcile(ctx, tt.mgh); err != nil { + if _, err := r.Reconcile(ctx, tt.mgh); err != nil { t.Errorf("MulticlusterGlobalHubReconciler.reconcile() error = %v", err) } listOpts := []client.ListOption{ diff --git a/operator/pkg/controllers/hubofhubs/storage/storage_reconciler.go b/operator/pkg/controllers/hubofhubs/storage/storage_reconciler.go index 3b5d8ff69..77b1a42b0 100644 --- a/operator/pkg/controllers/hubofhubs/storage/storage_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/storage/storage_reconciler.go @@ -13,6 +13,7 @@ import ( "github.com/go-logr/logr" "github.com/jackc/pgx/v4" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/klog" ctrl "sigs.k8s.io/controller-runtime" "github.com/stolostron/multicluster-global-hub/operator/api/operator/v1alpha4" @@ -52,19 +53,22 @@ func NewStorageReconciler(mgr ctrl.Manager, enableGlobalResource bool) *StorageR } } -func (r *StorageReconciler) Reconcile(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) error { +func (r *StorageReconciler) Reconcile(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) (bool, error) { storageConn, err := r.ReconcileStorage(ctx, mgh) if err != nil { - return fmt.Errorf("storage not ready, Error: %v", err) + return true, fmt.Errorf("storage not ready, Error: %v", err) } _ = config.SetStorageConnection(storageConn) - err = r.reconcileDatabase(ctx, mgh) + needRequeue, err := r.reconcileDatabase(ctx, mgh) if err != nil { - return fmt.Errorf("database not ready, Error: %v", err) + return true, fmt.Errorf("database not ready, Error: %v", err) + } + if needRequeue { + return true, nil } config.SetDatabaseReady(true) - return nil + return false, nil } func (r *StorageReconciler) ReconcileStorage(ctx context.Context, @@ -99,24 +103,25 @@ func (r *StorageReconciler) ReconcileStorage(ctx context.Context, return pgConnection, nil } -func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) error { +func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4.MulticlusterGlobalHub) (bool, error) { log := r.log.WithName("database") storageConn := config.GetStorageConnection() if storageConn == nil { - return fmt.Errorf("storage connection is nil") + return true, fmt.Errorf("storage connection is nil") } if config.ContainConditionStatus(mgh, config.CONDITION_TYPE_DATABASE_INIT, config.CONDITION_STATUS_TRUE) { log.V(7).Info("database has been initialized, checking the reconcile counter") // if the operator is restarted, reconcile the database again if r.databaseReconcileCount > 0 { - return nil + return false, nil } } conn, err := database.PostgresConnection(ctx, storageConn.SuperuserDatabaseURI, storageConn.CACert) if err != nil { - return fmt.Errorf("failed to connect to database: %v", err) + klog.Infof("wait database ready") + return true, nil } defer func() { if err := conn.Close(ctx); err != nil { @@ -128,7 +133,7 @@ func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4 backupEnabled, err := utils.IsBackupEnabled(ctx, r.GetClient()) if err != nil { log.Error(err, "failed to get backup status") - return err + return true, err } if backupEnabled || !r.upgrade { @@ -143,7 +148,7 @@ func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4 _, err = conn.Exec(ctx, lockSql) if err != nil { log.Error(err, "failed to parse database_uri_with_readonlyuser") - return err + return true, err } } @@ -154,12 +159,12 @@ func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4 readonlyUsername := objURI.User.Username() if err := applySQL(ctx, conn, databaseFS, "database", readonlyUsername); err != nil { - return err + return true, err } if r.enableGlobalResource { if err := applySQL(ctx, conn, databaseOldFS, "database.old", readonlyUsername); err != nil { - return err + return true, err } } @@ -167,7 +172,7 @@ func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4 err := applySQL(ctx, conn, upgradeFS, "upgrade", readonlyUsername) if err != nil { log.Error(err, "failed to exec the upgrade sql files") - return err + return true, err } r.upgrade = true } @@ -176,9 +181,9 @@ func (r *StorageReconciler) reconcileDatabase(ctx context.Context, mgh *v1alpha4 r.databaseReconcileCount++ err = config.SetConditionDatabaseInit(ctx, r.GetClient(), mgh, config.CONDITION_STATUS_TRUE) if err != nil { - return config.FailToSetConditionError(config.CONDITION_STATUS_TRUE, err) + return true, config.FailToSetConditionError(config.CONDITION_STATUS_TRUE, err) } - return nil + return false, nil } func applySQL(ctx context.Context, conn *pgx.Conn, databaseFS embed.FS, rootDir, username string) error { diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go index 6e773be4c..b31548182 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/byo_transporter.go @@ -49,9 +49,9 @@ func (s *BYOTransporter) EnsureTopic(clusterName string) (*transport.ClusterTopi }, nil } -func (s *BYOTransporter) EnsureKafka() error { +func (s *BYOTransporter) EnsureKafka() (bool, error) { // do nothing - return nil + return false, nil } func (s *BYOTransporter) Prune(clusterName string) error { diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go index c3e1e177e..91ed6a07a 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_kafka_controller.go @@ -39,15 +39,26 @@ func (r *KafkaController) Reconcile(ctx context.Context, request ctrl.Request) ( if err != nil { return ctrl.Result{}, err } + if mgh == nil { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } if mgh.DeletionTimestamp != nil { return ctrl.Result{}, nil } - if err := r.trans.EnsureKafka(); err != nil { + needRequeue, err := r.trans.EnsureKafka() + if err != nil { return ctrl.Result{}, err } + if needRequeue { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } - if err := r.trans.kafkaClusterReady(); err != nil { - return ctrl.Result{RequeueAfter: 5 * time.Second}, err + kafkaReady, reconcileErr := r.trans.kafkaClusterReady() + if reconcileErr != nil { + return ctrl.Result{}, reconcileErr + } + if !kafkaReady { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } // use the client ca to sign the csr for the managed hubs diff --git a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go index a76fd0508..baf71ab99 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go +++ b/operator/pkg/controllers/hubofhubs/transporter/protocol/strimzi_transporter.go @@ -178,14 +178,16 @@ func WithSubName(name string) KafkaOption { } // EnsureKafka the kafka subscription, cluster, metrics, global hub user and topic -func (k *strimziTransporter) EnsureKafka() error { +func (k *strimziTransporter) EnsureKafka() (bool, error) { k.log.V(2).Info("reconcile global hub kafka transport...") err := k.ensureSubscription(k.mgh) if err != nil { - return err + return true, err } + if !config.GetKafkaResourceReady() { - return fmt.Errorf("the kafka crds is not ready") + klog.Infof("Wait kafka crd ready") + return true, nil } _, enableKRaft := k.mgh.Annotations[operatorconstants.EnableKRaft] @@ -194,18 +196,18 @@ func (k *strimziTransporter) EnsureKafka() error { // kafka metrics, monitor, global hub kafkaTopic and kafkaUser err = k.renderKafkaResources(k.mgh, enableKRaft) if err != nil { - return err + return true, err } if !enableKRaft { // TODO: use manifest to create kafka cluster err, _ = k.CreateUpdateKafkaCluster(k.mgh) if err != nil { - return err + return true, err } } - return nil + return false, nil } // renderKafkaMetricsResources renders the kafka podmonitor and metrics, and kafkaUser and kafkaTopic for global hub @@ -538,7 +540,7 @@ func (k *strimziTransporter) newKafkaUser( } // waits for kafka cluster to be ready and returns nil if kafka cluster ready -func (k *strimziTransporter) kafkaClusterReady() error { +func (k *strimziTransporter) kafkaClusterReady() (bool, error) { kafkaCluster := &kafkav1beta2.Kafka{} err := k.manager.GetClient().Get(k.ctx, types.NamespacedName{ Name: k.kafkaClusterName, @@ -546,10 +548,13 @@ func (k *strimziTransporter) kafkaClusterReady() error { }, kafkaCluster) if err != nil { k.log.V(2).Info("fail to get the kafka cluster, waiting", "message", err.Error()) - return err + if errors.IsNotFound(err) { + return false, nil + } + return false, err } if kafkaCluster.Status == nil || kafkaCluster.Status.Conditions == nil { - return fmt.Errorf("kafka cluster status is not ready") + return false, nil } if kafkaCluster.Spec != nil && kafkaCluster.Spec.Kafka.Listeners != nil { @@ -565,12 +570,15 @@ func (k *strimziTransporter) kafkaClusterReady() error { } for _, condition := range kafkaCluster.Status.Conditions { - if *condition.Type == "Ready" && *condition.Status == "True" { - k.log.Info("kafka cluster is ready") - return nil + if *condition.Type == "Ready" { + if *condition.Status == "True" { + k.log.Info("kafka cluster is ready") + return true, nil + } } } - return fmt.Errorf("kafka cluster is not ready") + klog.Infof("Wait kafka cluster ready") + return false, nil } func (k *strimziTransporter) CreateUpdateKafkaCluster(mgh *operatorv1alpha4.MulticlusterGlobalHub) (error, bool) { diff --git a/operator/pkg/controllers/hubofhubs/transporter/transport_reconciler.go b/operator/pkg/controllers/hubofhubs/transporter/transport_reconciler.go index d4e01238f..ff03ebe12 100644 --- a/operator/pkg/controllers/hubofhubs/transporter/transport_reconciler.go +++ b/operator/pkg/controllers/hubofhubs/transporter/transport_reconciler.go @@ -38,7 +38,7 @@ func (r *TransportReconciler) Reconcile(ctx context.Context, mgh *v1alpha4.Multi protocol.WithContext(ctx), protocol.WithCommunity(operatorutils.IsCommunityMode()), ) - if err := r.transporter.EnsureKafka(); err != nil { + if _, err := r.transporter.EnsureKafka(); err != nil { return err } // update the transporter diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index a2e5def89..1126c5d2c 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -28,7 +28,7 @@ type Transporter interface { // CreateTopic creates/updates a kafka topic EnsureTopic(clusterName string) (*ClusterTopic, error) // EnsureKafka creates/updates a kafka - EnsureKafka() error + EnsureKafka() (bool, error) // Cleanup will delete the user or topic for the cluster Prune(clusterName string) error diff --git a/test/integration/operator/hubofhubs/manager_test.go b/test/integration/operator/hubofhubs/manager_test.go index 0c86fea00..93d9a46a4 100644 --- a/test/integration/operator/hubofhubs/manager_test.go +++ b/test/integration/operator/hubofhubs/manager_test.go @@ -69,8 +69,9 @@ var _ = Describe("manager", Ordered, func() { GlobalResourceEnabled: true, }) - err := reconciler.Reconcile(ctx, mgh) + needRequeue, err := reconciler.Reconcile(ctx, mgh) Expect(err).To(Succeed()) + Expect(needRequeue).To(BeFalse()) // deployment Eventually(func() error { diff --git a/test/integration/operator/hubofhubs/storage_test.go b/test/integration/operator/hubofhubs/storage_test.go index a176e8f4a..6cb72bd3c 100644 --- a/test/integration/operator/hubofhubs/storage_test.go +++ b/test/integration/operator/hubofhubs/storage_test.go @@ -72,7 +72,8 @@ var _ = Describe("storage", Ordered, func() { // the subscription Eventually(func() error { - return storageReconciler.Reconcile(ctx, mgh) + _, err = storageReconciler.Reconcile(ctx, mgh) + return err }, 10*time.Second, 100*time.Millisecond).ShouldNot(HaveOccurred()) err = runtimeClient.Get(ctx, client.ObjectKeyFromObject(mgh), mgh)