Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add machine name to grpc #172

Merged
merged 5 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 2 additions & 29 deletions controllers/selfnoderemediation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controllers
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -87,9 +86,6 @@ var (
Value: "nodeshutdown",
Effect: v1.TaintEffectNoExecute,
}

lastSeenSnrNamespace string
isSnrMatchMachineName bool
)

type processingChangeReason string
Expand Down Expand Up @@ -119,21 +115,6 @@ func (e *UnreconcilableError) Error() string {
return e.msg
}

// GetLastSeenSnrNamespace returns the namespace of the last reconciled SNR.
// The package-level value is written from Reconcile, so access is serialized
// through the reconciler's mutex.
func (r *SelfNodeRemediationReconciler) GetLastSeenSnrNamespace() string {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	return lastSeenSnrNamespace
}

// IsSnrMatchMachineName reports whether the last reconciled SNR was pointing
// at an unhealthy machine rather than a node. The package-level flag is
// written from the reconcile path, so access is serialized through the
// reconciler's mutex.
func (r *SelfNodeRemediationReconciler) IsSnrMatchMachineName() bool {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	return isSnrMatchMachineName
}

// SelfNodeRemediationReconciler reconciles a SelfNodeRemediation object
type SelfNodeRemediationReconciler struct {
client.Client
Expand All @@ -144,7 +125,6 @@ type SelfNodeRemediationReconciler struct {
Recorder record.EventRecorder
Rebooter reboot.Rebooter
MyNodeName string
mutex sync.Mutex
//we need to restore the node only after the cluster realized it can reschedule the affected workloads
//as of writing these lines, kubernetes will check for pods with non-existent node once in 20s, and allows
//40s of grace period for the node to reappear before it deletes the pods.
Expand Down Expand Up @@ -227,10 +207,6 @@ func (r *SelfNodeRemediationReconciler) Reconcile(ctx context.Context, req ctrl.
}
}

r.mutex.Lock()
lastSeenSnrNamespace = req.Namespace
r.mutex.Unlock()

result := ctrl.Result{}
var err error

Expand Down Expand Up @@ -654,12 +630,9 @@ func (r *SelfNodeRemediationReconciler) getNodeFromSnr(snr *v1alpha1.SelfNodeRem
//by a node based controller (e.g. NHC).
//In case snr is created with a machine owner reference, if NHC isn't its owner it means
//it was created by a machine based controller (e.g. MHC).
if !r.isOwnedByNHC(snr) {
if !IsOwnedByNHC(snr) {
for _, ownerRef := range snr.OwnerReferences {
if ownerRef.Kind == "Machine" {
r.mutex.Lock()
isSnrMatchMachineName = true
r.mutex.Unlock()
return r.getNodeFromMachine(ownerRef, snr.Namespace)
}
}
Expand Down Expand Up @@ -939,7 +912,7 @@ func (r *SelfNodeRemediationReconciler) getRuntimeStrategy(strategy v1alpha1.Rem

}

func (r *SelfNodeRemediationReconciler) isOwnedByNHC(snr *v1alpha1.SelfNodeRemediation) bool {
func IsOwnedByNHC(snr *v1alpha1.SelfNodeRemediation) bool {
for _, ownerRef := range snr.OwnerReferences {
if ownerRef.Kind == "NodeHealthCheck" {
return true
Expand Down
37 changes: 32 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
Expand All @@ -29,14 +30,18 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -60,11 +65,11 @@ import (
)

const (
nodeNameEnvVar = "MY_NODE_NAME"

WebhookCertDir = "/apiserver.local.config/certificates"
WebhookCertName = "apiserver.crt"
WebhookKeyName = "apiserver.key"
nodeNameEnvVar = "MY_NODE_NAME"
machineAnnotation = "machine.openshift.io/machine"
WebhookCertDir = "/apiserver.local.config/certificates"
WebhookCertName = "apiserver.crt"
WebhookKeyName = "apiserver.key"
)

var (
Expand Down Expand Up @@ -312,9 +317,16 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
// init certificate reader
certReader := certificates.NewSecretCertStorage(mgr.GetClient(), ctrl.Log.WithName("SecretCertStorage"), ns)

var machineName string
if machineName, err = getMachineName(mgr.GetAPIReader(), myNodeName); err != nil {
setupLog.Error(err, "error when trying to fetch machine name")
os.Exit(1)
}

apiConnectivityCheckConfig := &apicheck.ApiConnectivityCheckConfig{
Log: ctrl.Log.WithName("api-check"),
MyNodeName: myNodeName,
MyMachineName: machineName,
CheckInterval: apiCheckInterval,
MaxErrorsThreshold: maxErrorThreshold,
Peers: myPeers,
Expand Down Expand Up @@ -371,6 +383,21 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
}
}

// getMachineName resolves the machine name backing the given node by reading
// the node's machine annotation. It returns an empty string with a nil error
// when the node carries no machine annotation (i.e. the node is not backed by
// a machine), and a non-nil error when the node cannot be fetched or the
// annotation value cannot be parsed.
func getMachineName(reader client.Reader, myNodeName string) (string, error) {
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: myNodeName}}
	if err := reader.Get(context.Background(), client.ObjectKeyFromObject(node), node); err != nil {
		return "", err
	}
	namespacedMachine, exists := node.GetAnnotations()[machineAnnotation]
	if !exists {
		// No machine annotation: nothing to resolve, not an error.
		return "", nil
	}
	// Annotation value has the form "<namespace>/<machineName>".
	_, machineName, err := cache.SplitMetaNamespaceKey(namespacedMachine)
	return machineName, err
}

func configureWebhookServer(mgr ctrl.Manager, enableHTTP2 bool) {

server := mgr.GetWebhookServer()
Expand Down
4 changes: 3 additions & 1 deletion pkg/apicheck/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ApiConnectivityCheck struct {
type ApiConnectivityCheckConfig struct {
Log logr.Logger
MyNodeName string
MyMachineName string
CheckInterval time.Duration
MaxErrorsThreshold int
Peers *peers.Peers
Expand Down Expand Up @@ -275,7 +276,8 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result
defer cancel()

resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{
NodeName: c.config.MyNodeName,
NodeName: c.config.MyNodeName,
MachineName: c.config.MyMachineName,
})
if err != nil {
logger.Error(err, "failed to read health response from peer")
Expand Down
7 changes: 3 additions & 4 deletions pkg/peerhealth/client_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,14 @@ var _ = Describe("Checking health using grpc client and server", func() {
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
Namespace: "default",
OwnerReferences: []metav1.OwnerReference{
{Name: "Dummy", Kind: "NodeHealthCheck", APIVersion: "Dummy", UID: "Dummy"},
},
},
}
err := k8sClient.Create(context.Background(), snr)
Expect(err).ToNot(HaveOccurred())

// wait until reconciled
Eventually(func() bool {
return snrReconciler.GetLastSeenSnrNamespace() != ""
}, 5*time.Second, 250*time.Millisecond).Should(BeTrue(), "SNR not reconciled")
})

It("should return unhealthy", func() {
Expand Down
40 changes: 25 additions & 15 deletions pkg/peerhealth/peerhealth.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/peerhealth/peerhealth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ service PeerHealth {

// HealthRequest identifies the peer whose health is being queried.
message HealthRequest {
  // Name of the node whose health should be checked.
  string nodeName = 1;
  // Name of the machine backing the node; may be empty when the node
  // has no machine annotation.
  string machineName = 2;
}

message HealthResponse {
Expand Down
64 changes: 15 additions & 49 deletions pkg/peerhealth/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

selfNodeRemediationApis "github.com/medik8s/self-node-remediation/api"
"github.com/medik8s/self-node-remediation/api/v1alpha1"
Expand All @@ -26,7 +25,6 @@ import (

const (
connectionTimeout = 5 * time.Second
machineAnnotation = "machine.openshift.io/machine" //todo this is openshift specific
//IMPORTANT! this MUST be less than PeerRequestTimeout in apicheck
//The difference between them should allow some time for sending the request over the network
//todo enforce this
Expand Down Expand Up @@ -121,56 +119,24 @@ func (s Server) IsHealthy(ctx context.Context, request *HealthRequest) (*HealthR
return nil, fmt.Errorf("empty node name in HealthRequest")
}

s.log.Info("checking health for", "node", nodeName)

namespace := s.snr.GetLastSeenSnrNamespace()
isMachine := s.snr.IsSnrMatchMachineName()

// when namespace is empty, there wasn't a SNR yet, which also means that the node must be healthy
if namespace == "" {
// we didn't see a SNR yet, so the node is healthy
// but we need to check for API error, so let's get node
if _, err := s.getNode(ctx, nodeName); err != nil {
// TODO do we need to deal with isNotFound, and if so, how?
s.log.Info("no SNR seen yet, and API server issue, returning API error", "api error", err)
return toResponse(selfNodeRemediationApis.ApiError)
}
s.log.Info("no SNR seen yet, node is healthy")
return toResponse(selfNodeRemediationApis.Healthy)
}

if isMachine {
return toResponse(s.isHealthyMachine(ctx, nodeName, namespace))
} else {
return toResponse(s.isHealthyNode(ctx, nodeName, namespace))
}
}

func (s Server) isHealthyNode(ctx context.Context, nodeName string, namespace string) selfNodeRemediationApis.HealthCheckResponseCode {
return s.isHealthyBySnr(ctx, nodeName, namespace)
}

func (s Server) isHealthyMachine(ctx context.Context, nodeName string, namespace string) selfNodeRemediationApis.HealthCheckResponseCode {
node, err := s.getNode(ctx, nodeName)
if err != nil {
return selfNodeRemediationApis.ApiError
}

ann := node.GetAnnotations()
namespacedMachine, exists := ann[machineAnnotation]

if !exists {
s.log.Info("node doesn't have machine annotation")
return selfNodeRemediationApis.Unhealthy //todo is this the correct response?
//fetch all snrs from all ns
snrs := &v1alpha1.SelfNodeRemediationList{}
if err := s.snr.List(ctx, snrs); err != nil {
s.log.Error(err, "failed to fetch snrs")
return nil, err
mshitrit marked this conversation as resolved.
Show resolved Hide resolved
}
_, machineName, err := cache.SplitMetaNamespaceKey(namespacedMachine)

if err != nil {
s.log.Error(err, "failed to parse machine annotation on the node")
return selfNodeRemediationApis.Unhealthy //todo is this the correct response?
//return healthy only if all of snrs are considered healthy for that node
for _, snr := range snrs.Items {
if controllers.IsOwnedByNHC(&snr) {
mshitrit marked this conversation as resolved.
Show resolved Hide resolved
if healthCode := s.isHealthyBySnr(ctx, request.NodeName, snr.Namespace); healthCode != selfNodeRemediationApis.Healthy {
mshitrit marked this conversation as resolved.
Show resolved Hide resolved
return toResponse(healthCode)
}
} else if healthCode := s.isHealthyBySnr(ctx, request.MachineName, snr.Namespace); healthCode != selfNodeRemediationApis.Healthy {
return toResponse(healthCode)
}
}

return s.isHealthyBySnr(ctx, machineName, namespace)
return toResponse(selfNodeRemediationApis.Healthy)
}

func (s Server) isHealthyBySnr(ctx context.Context, snrName string, snrNamespace string) selfNodeRemediationApis.HealthCheckResponseCode {
Expand Down