Skip to content

Commit

Permalink
Add machine name to grpc request in order to remove workaround of usi…
Browse files Browse the repository at this point in the history
…ng a global variable that checks whether snr is created for a machine.

Signed-off-by: Michael Shitrit <[email protected]>
  • Loading branch information
mshitrit committed Dec 25, 2023
1 parent 7a1b271 commit 07f7eb7
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 66 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ bundle-build: bundle bundle-update ## Build the bundle image.
bundle-push: ## Push the bundle image.
docker push $(BUNDLE_IMG)

# Regenerates the Go message and gRPC stubs for the peer health service.
# NOTE(review): uses the protoc binary at $(PROTOC); presumably `make protoc` must have
# been run first so that the binary exists — confirm.
.PHONY: protoc-gen-peerhealth
protoc-gen-peerhealth: ## Generate Go and gRPC code from pkg/peerhealth/peerhealth.proto
	$(PROTOC) --go_out=$(shell pwd) \
		--go-grpc_out=$(shell pwd) \
		--proto_path=$(shell pwd)/pkg/peerhealth \
		$(shell pwd)/pkg/peerhealth/peerhealth.proto


.PHONY: protoc
PROTOC = $(shell pwd)/bin/proto/bin/protoc
protoc: protoc-gen-go protoc-gen-go-grpc ## Download protoc (protocol buffers tool needed for gRPC)
Expand Down
14 changes: 1 addition & 13 deletions controllers/selfnoderemediation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ var (
Effect: v1.TaintEffectNoExecute,
}

lastSeenSnrNamespace string
isSnrMatchMachineName bool
lastSeenSnrNamespace string
)

type processingChangeReason string
Expand Down Expand Up @@ -126,14 +125,6 @@ func (r *SelfNodeRemediationReconciler) GetLastSeenSnrNamespace() string {
return lastSeenSnrNamespace
}

// IsSnrMatchMachineName reports whether getNodeFromSnr has encountered an SNR that is
// not owned by NHC and has an owner reference of kind "Machine", i.e. the remediation
// was created for a machine rather than directly for a node.
// It reads the package-level isSnrMatchMachineName flag while holding r.mutex.
// NOTE(review): the visible code only ever sets the flag to true — confirm whether it
// is reset anywhere else.
func (r *SelfNodeRemediationReconciler) IsSnrMatchMachineName() bool {
r.mutex.Lock()
defer r.mutex.Unlock()
return isSnrMatchMachineName
}

// SelfNodeRemediationReconciler reconciles a SelfNodeRemediation object
type SelfNodeRemediationReconciler struct {
client.Client
Expand Down Expand Up @@ -657,9 +648,6 @@ func (r *SelfNodeRemediationReconciler) getNodeFromSnr(snr *v1alpha1.SelfNodeRem
if !r.isOwnedByNHC(snr) {
for _, ownerRef := range snr.OwnerReferences {
if ownerRef.Kind == "Machine" {
r.mutex.Lock()
isSnrMatchMachineName = true
r.mutex.Unlock()
return r.getNodeFromMachine(ownerRef, snr.Namespace)
}
}
Expand Down
37 changes: 32 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
Expand All @@ -29,14 +30,18 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -60,11 +65,11 @@ import (
)

const (
nodeNameEnvVar = "MY_NODE_NAME"

WebhookCertDir = "/apiserver.local.config/certificates"
WebhookCertName = "apiserver.crt"
WebhookKeyName = "apiserver.key"
nodeNameEnvVar = "MY_NODE_NAME"
machineAnnotation = "machine.openshift.io/machine"
WebhookCertDir = "/apiserver.local.config/certificates"
WebhookCertName = "apiserver.crt"
WebhookKeyName = "apiserver.key"
)

var (
Expand Down Expand Up @@ -312,9 +317,16 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
// init certificate reader
certReader := certificates.NewSecretCertStorage(mgr.GetClient(), ctrl.Log.WithName("SecretCertStorage"), ns)

var machineName string
if machineName, err = getMachineName(mgr.GetAPIReader(), myNodeName); err != nil {
setupLog.Error(err, "error when trying to fetch machine name")
os.Exit(1)
}

apiConnectivityCheckConfig := &apicheck.ApiConnectivityCheckConfig{
Log: ctrl.Log.WithName("api-check"),
MyNodeName: myNodeName,
MyMachineName: machineName,
CheckInterval: apiCheckInterval,
MaxErrorsThreshold: maxErrorThreshold,
Peers: myPeers,
Expand Down Expand Up @@ -371,6 +383,21 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
}
}

// getMachineName looks up the node called myNodeName through reader and returns the
// name of the machine recorded in its machineAnnotation annotation.
//
// The annotation value is parsed with cache.SplitMetaNamespaceKey and only the name
// part is returned. When the node carries no such annotation the result is an empty
// name and a nil error. A failure to fetch the node, or to parse the annotation
// value, is returned as the error.
func getMachineName(reader client.Reader, myNodeName string) (string, error) {
	node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: myNodeName}}
	if getErr := reader.Get(context.Background(), client.ObjectKeyFromObject(node), node); getErr != nil {
		return "", getErr
	}

	machineKey, annotated := node.GetAnnotations()[machineAnnotation]
	if !annotated {
		// No machine annotation on the node: not an error, just no machine name.
		return "", nil
	}

	_, name, splitErr := cache.SplitMetaNamespaceKey(machineKey)
	return name, splitErr
}

func configureWebhookServer(mgr ctrl.Manager, enableHTTP2 bool) {

server := mgr.GetWebhookServer()
Expand Down
4 changes: 3 additions & 1 deletion pkg/apicheck/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ApiConnectivityCheck struct {
type ApiConnectivityCheckConfig struct {
Log logr.Logger
MyNodeName string
MyMachineName string
CheckInterval time.Duration
MaxErrorsThreshold int
Peers *peers.Peers
Expand Down Expand Up @@ -275,7 +276,8 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result
defer cancel()

resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{
NodeName: c.config.MyNodeName,
NodeName: c.config.MyNodeName,
MachineName: c.config.MyMachineName,
})
if err != nil {
logger.Error(err, "failed to read health response from peer")
Expand Down
40 changes: 25 additions & 15 deletions pkg/peerhealth/peerhealth.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/peerhealth/peerhealth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ service PeerHealth {

// HealthRequest is sent by a node to a peer to ask whether the sender is considered healthy.
message HealthRequest {
// nodeName is the name of the node the request is about (the sender's own node name).
string nodeName = 1;
// machineName is the name of the machine associated with that node. It may be empty;
// the server treats a non-empty value as "look up the SNR by machine name".
string machineName = 2;
}

message HealthResponse {
Expand Down
35 changes: 3 additions & 32 deletions pkg/peerhealth/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

selfNodeRemediationApis "github.com/medik8s/self-node-remediation/api"
"github.com/medik8s/self-node-remediation/api/v1alpha1"
Expand All @@ -26,7 +25,6 @@ import (

const (
connectionTimeout = 5 * time.Second
machineAnnotation = "machine.openshift.io/machine" //todo this is openshift specific
//IMPORTANT! this MUST be less than PeerRequestTimeout in apicheck
//The difference between them should allow some time for sending the request over the network
//todo enforce this
Expand Down Expand Up @@ -124,7 +122,7 @@ func (s Server) IsHealthy(ctx context.Context, request *HealthRequest) (*HealthR
s.log.Info("checking health for", "node", nodeName)

namespace := s.snr.GetLastSeenSnrNamespace()
isMachine := s.snr.IsSnrMatchMachineName()
isMachine := len(request.GetMachineName()) > 0

// when namespace is empty, there wasn't a SNR yet, which also means that the node must be healthy
if namespace == "" {
Expand All @@ -140,39 +138,12 @@ func (s Server) IsHealthy(ctx context.Context, request *HealthRequest) (*HealthR
}

if isMachine {
return toResponse(s.isHealthyMachine(ctx, nodeName, namespace))
return toResponse(s.isHealthyBySnr(ctx, request.MachineName, namespace))
} else {
return toResponse(s.isHealthyNode(ctx, nodeName, namespace))
return toResponse(s.isHealthyBySnr(ctx, nodeName, namespace))
}
}

// isHealthyNode returns the health response code for a node by delegating to
// isHealthyBySnr, using nodeName as the SNR name and namespace as the SNR namespace.
func (s Server) isHealthyNode(ctx context.Context, nodeName string, namespace string) selfNodeRemediationApis.HealthCheckResponseCode {
return s.isHealthyBySnr(ctx, nodeName, namespace)
}

// isHealthyMachine returns the health response code for a node whose SNR is named
// after its machine rather than after the node itself.
//
// It fetches the node, reads the machineAnnotation annotation, extracts the machine
// name from its value and delegates to isHealthyBySnr with that name.
// Returns ApiError when the node cannot be fetched, and Unhealthy when the annotation
// is missing or cannot be parsed.
func (s Server) isHealthyMachine(ctx context.Context, nodeName string, namespace string) selfNodeRemediationApis.HealthCheckResponseCode {
	node, getErr := s.getNode(ctx, nodeName)
	if getErr != nil {
		return selfNodeRemediationApis.ApiError
	}

	machineKey, annotated := node.GetAnnotations()[machineAnnotation]
	if !annotated {
		s.log.Info("node doesn't have machine annotation")
		return selfNodeRemediationApis.Unhealthy //todo is this the correct response?
	}

	// The annotation value is a namespace/name key; only the name identifies the SNR.
	_, snrName, splitErr := cache.SplitMetaNamespaceKey(machineKey)
	if splitErr != nil {
		s.log.Error(splitErr, "failed to parse machine annotation on the node")
		return selfNodeRemediationApis.Unhealthy //todo is this the correct response?
	}

	return s.isHealthyBySnr(ctx, snrName, namespace)
}

func (s Server) isHealthyBySnr(ctx context.Context, snrName string, snrNamespace string) selfNodeRemediationApis.HealthCheckResponseCode {
apiCtx, cancelFunc := context.WithTimeout(ctx, apiServerTimeout)
defer cancelFunc()
Expand Down

0 comments on commit 07f7eb7

Please sign in to comment.