Skip to content

Commit

Permalink
Implement migration controller
Browse files Browse the repository at this point in the history
This commit introduced drain state migrtation from node annotation
to SriovNetworkNodeState object.
  • Loading branch information
e0ne committed Aug 22, 2023
1 parent 807c34d commit 17d0662
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 17 deletions.
36 changes: 24 additions & 12 deletions controllers/migration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,30 @@ package controllers

import (
"context"
"fmt"
v1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
)

type MigrationReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
ClientSet snclientset.Interface
}

//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch;create;update;patch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
Expand All @@ -35,6 +40,9 @@ func (mr *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err := mr.Get(ctx, types.NamespacedName{
Name: req.Name}, node)
if err != nil {
if apierrors.IsNotFound(err) {
return reconcile.Result{}, nil
}
reqLogger.Error(err, "Error occurred on GET SriovOperatorConfig request from API server.")
return reconcile.Result{}, err
}
Expand All @@ -47,15 +55,19 @@ func (mr *MigrationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
reqLogger.Error(err, "Error occurred on GET SriovNetworkNodeState request from API server.")
return reconcile.Result{}, err
}
patch := []byte(fmt.Sprintf(`{"status":{"drainStatus":"%s"}}`, anno))
err = mr.Client.Patch(context.TODO(), nodeState, client.RawPatch(types.StrategicMergePatchType, patch))
if err != nil {
reqLogger.Error(err, "Error occurred on SriovNetworkNodeState update.")
return reconcile.Result{}, err
}
nodeState.Status.DrainStatus = anno
mr.ClientSet.SriovnetworkV1().SriovNetworkNodeStates(namespace).UpdateStatus(context.TODO(), nodeState, metav1.UpdateOptions{})
delete(node.Annotations, constants.NodeDrainAnnotation)
mr.Update(ctx, node)
//patch := []byte(fmt.Sprintf(`{"status":{"drainStatus":"%s"}}`, anno))
//err = mr.Client.Patch(context.TODO(), nodeState, client.RawPatch(types.StrategicMergePatchType, patch))
//if err != nil {
// reqLogger.Error(err, "Error occurred on SriovNetworkNodeState update.")
// return reconcile.Result{}, err
//}

Check failure on line 68 in controllers/migration_controller.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

unnecessary trailing newline (whitespace)
}

return reconcile.Result{}, nil
}

Expand Down
23 changes: 20 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"context"
"flag"
"fmt"
"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubectl/pkg/drain"
"os"
"time"

"github.com/golang/glog"
netattdefv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
openshiftconfigv1 "github.com/openshift/api/config/v1"
mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
Expand All @@ -49,6 +52,7 @@ import (

sriovnetworkv1 "github.com/k8snetworkplumbingwg/sriov-network-operator/api/v1"
"github.com/k8snetworkplumbingwg/sriov-network-operator/controllers"
snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned"
constants "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/consts"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/leaderelection"
"github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/utils"
Expand Down Expand Up @@ -136,6 +140,18 @@ func main() {
os.Exit(1)
}

var config *rest.Config
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)

Check failure on line 146 in main.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

ineffectual assignment to err (ineffassign)

Check warning

Code scanning / CodeQL

Useless assignment to local variable Warning

This definition of err is never used.
} else {
// creates the in-cluster config
config, err = rest.InClusterConfig()

Check failure on line 149 in main.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

ineffectual assignment to err (ineffassign)

Check warning

Code scanning / CodeQL

Useless assignment to local variable Warning

This definition of err is never used.
}

setupLog.Info("###config: ", "config", config)
snclient := snclientset.NewForConfigOrDie(config)

if err = (&controllers.SriovNetworkReconciler{
Client: mgrGlobal.GetClient(),
Scheme: mgrGlobal.GetScheme(),
Expand Down Expand Up @@ -199,8 +215,9 @@ func main() {
os.Exit(1)
}
if err = (&controllers.MigrationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
ClientSet: snclient,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MigrationReconciler")
os.Exit(1)
Expand Down
11 changes: 10 additions & 1 deletion pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
)

type Message struct {
drainStatus string
syncStatus string
lastSyncError string
}
Expand Down Expand Up @@ -300,6 +301,7 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
glog.Warningf("Got an error: %v", err)
if more {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusFailed,
lastSyncError: err.Error(),
}
Expand Down Expand Up @@ -356,8 +358,9 @@ func (dn *Daemon) processNextWorkItem() bool {

err := dn.nodeStateSyncHandler()
if err != nil {
// Ereport error message, and put the item back to work queue for retry.
// Report error message, and put the item back to work queue for retry.
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusFailed,
lastSyncError: err.Error(),
}
Expand Down Expand Up @@ -454,6 +457,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {

// add the error but don't requeue
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusFailed,
lastSyncError: sriovResult.LastSyncError,
}
Expand All @@ -466,6 +470,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
if latestState.Status.LastSyncError != "" ||
latestState.Status.SyncStatus != syncStatusSucceeded {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusSucceeded,
lastSyncError: "",
}
Expand All @@ -480,6 +485,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
glog.V(0).Infof("nodeStateSyncHandler(): Name: %s, Interface policy spec not yet set by controller", latestState.Name)
if latestState.Status.SyncStatus != "Succeeded" {
dn.refreshCh <- Message{
drainStatus: latestState.Status.DrainStatus,
syncStatus: "Succeeded",
lastSyncError: "",
}
Expand All @@ -490,6 +496,7 @@ func (dn *Daemon) nodeStateSyncHandler() error {
}

dn.refreshCh <- Message{
drainStatus: latestState.Status.DrainStatus,
syncStatus: syncStatusInProgress,
lastSyncError: "",
}
Expand Down Expand Up @@ -648,11 +655,13 @@ func (dn *Daemon) nodeStateSyncHandler() error {
dn.nodeState = latestState.DeepCopy()
if dn.useSystemdService {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: sriovResult.SyncStatus,
lastSyncError: sriovResult.LastSyncError,
}
} else {
dn.refreshCh <- Message{
drainStatus: dn.nodeState.Status.DrainStatus,
syncStatus: syncStatusSucceeded,
lastSyncError: "",
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/daemon/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ func (w *NodeStateStatusWriter) updateNodeStateStatusRetry(f func(*sriovnetworkv
func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1.SriovNetworkNodeState, error) {
nodeState, err := w.updateNodeStateStatusRetry(func(nodeState *sriovnetworkv1.SriovNetworkNodeState) {
nodeState.Status.Interfaces = w.status.Interfaces
nodeState.Status.DrainStatus = msg.drainStatus
if msg.lastSyncError != "" || msg.syncStatus == syncStatusSucceeded {
// clear lastSyncError when sync Succeeded
nodeState.Status.LastSyncError = msg.lastSyncError
}
nodeState.Status.SyncStatus = msg.syncStatus

glog.V(0).Infof("setNodeStateStatus(): syncStatus: %s, lastSyncError: %s", nodeState.Status.SyncStatus, nodeState.Status.LastSyncError)
glog.V(0).Infof("setNodeStateStatus(): drainStatus: %s, syncStatus: %s, lastSyncError: %s", nodeState.Status.DrainStatus, nodeState.Status.SyncStatus, nodeState.Status.LastSyncError)
})
if err != nil {
return nil, err
Expand Down

0 comments on commit 17d0662

Please sign in to comment.