From 17d06622c4a55d00647ee3100745c8075cde1e81 Mon Sep 17 00:00:00 2001 From: Ivan Kolodiazhnyi Date: Mon, 7 Aug 2023 12:43:02 +0300 Subject: [PATCH] Implement migration controller This commit introduces drain state migration from the node annotation to the SriovNetworkNodeState object. --- controllers/migration_controller.go | 36 +++++++++++++++++++---------- main.go | 23 +++++++++++++++--- pkg/daemon/daemon.go | 11 ++++++++- pkg/daemon/writer.go | 3 ++- 4 files changed, 56 insertions(+), 17 deletions(-) diff --git a/controllers/migration_controller.go b/controllers/migration_controller.go index 498cf06288..332b3a1c47 100644 --- a/controllers/migration_controller.go +++ b/controllers/migration_controller.go @@ -2,10 +2,10 @@ package controllers import ( "context" - "fmt" - v1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" - constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" @@ -13,14 +13,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" + snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" + constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" ) type MigrationReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + ClientSet snclientset.Interface } -//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch +//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;create;update;patch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer 
to the desired state. @@ -35,6 +40,9 @@ func (mr *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) err := mr.Get(ctx, types.NamespacedName{ Name: req.Name}, node) if err != nil { + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } reqLogger.Error(err, "Error occurred on GET SriovOperatorConfig request from API server.") return reconcile.Result{}, err } @@ -47,15 +55,19 @@ func (mr *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request) reqLogger.Error(err, "Error occurred on GET SriovNetworkNodeState request from API server.") return reconcile.Result{}, err } - patch := []byte(fmt.Sprintf(`{"status":{"drainStatus":"%s"}}`, anno)) - err = mr.Client.Patch(context.TODO(), nodeState, client.RawPatch(types.StrategicMergePatchType, patch)) - if err != nil { - reqLogger.Error(err, "Error occurred on SriovNetworkNodeState update.") - return reconcile.Result{}, err - } + nodeState.Status.DrainStatus = anno + mr.ClientSet.SriovnetworkV1().SriovNetworkNodeStates(namespace).UpdateStatus(context.TODO(), nodeState, metav1.UpdateOptions{}) + delete(node.Annotations, constants.NodeDrainAnnotation) + mr.Update(ctx, node) + //patch := []byte(fmt.Sprintf(`{"status":{"drainStatus":"%s"}}`, anno)) + //err = mr.Client.Patch(context.TODO(), nodeState, client.RawPatch(types.StrategicMergePatchType, patch)) + //if err != nil { + // reqLogger.Error(err, "Error occurred on SriovNetworkNodeState update.") + // return reconcile.Result{}, err + //} } - + return reconcile.Result{}, nil } diff --git a/main.go b/main.go index 39d9109c07..b9e398d366 100644 --- a/main.go +++ b/main.go @@ -20,12 +20,15 @@ import ( "context" "flag" "fmt" - "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" "k8s.io/kubectl/pkg/drain" "os" "time" + "github.com/golang/glog" netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1" openshiftconfigv1 
"github.com/openshift/api/config/v1" mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1" @@ -49,6 +52,7 @@ import ( sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1" "github.com/k8snetworkplumbingwg/sriov-network-operator/controllers" + snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/leaderelection" "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils" @@ -136,6 +140,18 @@ func main() { os.Exit(1) } + var config *rest.Config + kubeconfig := os.Getenv("KUBECONFIG") + if kubeconfig != "" { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) + } else { + // creates the in-cluster config + config, err = rest.InClusterConfig() + } + + setupLog.Info("###config: ", "config", config) + snclient := snclientset.NewForConfigOrDie(config) + if err = (&controllers.SriovNetworkReconciler{ Client: mgrGlobal.GetClient(), Scheme: mgrGlobal.GetScheme(), @@ -199,8 +215,9 @@ func main() { os.Exit(1) } if err = (&controllers.MigrationReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ClientSet: snclient, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "MigrationReconciler") os.Exit(1) diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 93f60af6fa..f4250a927e 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -53,6 +53,7 @@ const ( ) type Message struct { + drainStatus string syncStatus string lastSyncError string } @@ -300,6 +301,7 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error { glog.Warningf("Got an error: %v", err) if more { dn.refreshCh <- Message{ + drainStatus: dn.nodeState.Status.DrainStatus, syncStatus: 
syncStatusFailed, lastSyncError: err.Error(), } @@ -356,8 +358,9 @@ func (dn *Daemon) processNextWorkItem() bool { err := dn.nodeStateSyncHandler() if err != nil { - // Ereport error message, and put the item back to work queue for retry. + // Report error message, and put the item back to work queue for retry. dn.refreshCh <- Message{ + drainStatus: dn.nodeState.Status.DrainStatus, syncStatus: syncStatusFailed, lastSyncError: err.Error(), } @@ -454,6 +457,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { // add the error but don't requeue dn.refreshCh <- Message{ + drainStatus: dn.nodeState.Status.DrainStatus, syncStatus: syncStatusFailed, lastSyncError: sriovResult.LastSyncError, } @@ -466,6 +470,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { if latestState.Status.LastSyncError != "" || latestState.Status.SyncStatus != syncStatusSucceeded { dn.refreshCh <- Message{ + drainStatus: dn.nodeState.Status.DrainStatus, syncStatus: syncStatusSucceeded, lastSyncError: "", } @@ -480,6 +485,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { glog.V(0).Infof("nodeStateSyncHandler(): Name: %s, Interface policy spec not yet set by controller", latestState.Name) if latestState.Status.SyncStatus != "Succeeded" { dn.refreshCh <- Message{ + drainStatus: latestState.Status.DrainStatus, syncStatus: "Succeeded", lastSyncError: "", } @@ -490,6 +496,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { } dn.refreshCh <- Message{ + drainStatus: latestState.Status.DrainStatus, syncStatus: syncStatusInProgress, lastSyncError: "", } @@ -648,11 +655,13 @@ func (dn *Daemon) nodeStateSyncHandler() error { dn.nodeState = latestState.DeepCopy() if dn.useSystemdService { dn.refreshCh <- Message{ + drainStatus: dn.nodeState.Status.DrainStatus, syncStatus: sriovResult.SyncStatus, lastSyncError: sriovResult.LastSyncError, } } else { dn.refreshCh <- Message{ + drainStatus: dn.nodeState.Status.DrainStatus, syncStatus: syncStatusSucceeded, lastSyncError: "", } diff --git 
a/pkg/daemon/writer.go b/pkg/daemon/writer.go index 2a6fc6384b..0a83f2da81 100644 --- a/pkg/daemon/writer.go +++ b/pkg/daemon/writer.go @@ -161,13 +161,14 @@ func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) { nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) { nodeState.Status.Interfaces = w.status.Interfaces + nodeState.Status.DrainStatus = msg.drainStatus if msg.lastSyncError != "" || msg.syncStatus == syncStatusSucceeded { // clear lastSyncError when sync Succeeded nodeState.Status.LastSyncError = msg.lastSyncError } nodeState.Status.SyncStatus = msg.syncStatus - glog.V(0).Infof("setNodeStateStatus(): syncStatus: %s, lastSyncError: %s", nodeState.Status.SyncStatus, nodeState.Status.LastSyncError) + glog.V(0).Infof("setNodeStateStatus(): drainStatus: %s, syncStatus: %s, lastSyncError: %s", nodeState.Status.DrainStatus, nodeState.Status.SyncStatus, nodeState.Status.LastSyncError) }) if err != nil { return nil, err