Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add machine name to grpc #172

Merged
merged 5 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ bundle-build: bundle bundle-update ## Build the bundle image.
bundle-push: ## Push the bundle image.
docker push $(BUNDLE_IMG)

protoc-gen-peerhealth: ## Generate the Go and gRPC code for pkg/peerhealth/peerhealth.proto using protoc
mshitrit marked this conversation as resolved.
Show resolved Hide resolved
${PROTOC} --go_out=$(shell pwd) \
--go-grpc_out=$(shell pwd) \
--proto_path=$(shell pwd)/pkg/peerhealth \
$(shell pwd)/pkg/peerhealth/peerhealth.proto


.PHONY: protoc
PROTOC = $(shell pwd)/bin/proto/bin/protoc
protoc: protoc-gen-go protoc-gen-go-grpc ## Download protoc (protocol buffers tool needed for gRPC)
Expand Down
14 changes: 1 addition & 13 deletions controllers/selfnoderemediation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ var (
Effect: v1.TaintEffectNoExecute,
}

lastSeenSnrNamespace string
isSnrMatchMachineName bool
lastSeenSnrNamespace string
)

type processingChangeReason string
Expand Down Expand Up @@ -126,14 +125,6 @@ func (r *SelfNodeRemediationReconciler) GetLastSeenSnrNamespace() string {
return lastSeenSnrNamespace
}

// IsSnrMatchMachineName reports the current value of the package-level
// isSnrMatchMachineName flag, i.e. whether the last reconciled SNR pointed at
// an unhealthy machine rather than a node. The flag is read while holding
// r.mutex.
func (r *SelfNodeRemediationReconciler) IsSnrMatchMachineName() bool {
	r.mutex.Lock()
	matchesMachine := isSnrMatchMachineName
	r.mutex.Unlock()

	return matchesMachine
}

// SelfNodeRemediationReconciler reconciles a SelfNodeRemediation object
type SelfNodeRemediationReconciler struct {
client.Client
Expand Down Expand Up @@ -657,9 +648,6 @@ func (r *SelfNodeRemediationReconciler) getNodeFromSnr(snr *v1alpha1.SelfNodeRem
if !r.isOwnedByNHC(snr) {
for _, ownerRef := range snr.OwnerReferences {
if ownerRef.Kind == "Machine" {
r.mutex.Lock()
isSnrMatchMachineName = true
r.mutex.Unlock()
return r.getNodeFromMachine(ownerRef, snr.Namespace)
}
}
Expand Down
37 changes: 32 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
Expand All @@ -29,14 +30,18 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/tools/cache"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -60,11 +65,11 @@ import (
)

const (
nodeNameEnvVar = "MY_NODE_NAME"

WebhookCertDir = "/apiserver.local.config/certificates"
WebhookCertName = "apiserver.crt"
WebhookKeyName = "apiserver.key"
nodeNameEnvVar = "MY_NODE_NAME"
machineAnnotation = "machine.openshift.io/machine"
WebhookCertDir = "/apiserver.local.config/certificates"
WebhookCertName = "apiserver.crt"
WebhookKeyName = "apiserver.key"
)

var (
Expand Down Expand Up @@ -312,9 +317,16 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
// init certificate reader
certReader := certificates.NewSecretCertStorage(mgr.GetClient(), ctrl.Log.WithName("SecretCertStorage"), ns)

var machineName string
if machineName, err = getMachineName(mgr.GetAPIReader(), myNodeName); err != nil {
setupLog.Error(err, "error when trying to fetch machine name")
os.Exit(1)
}

apiConnectivityCheckConfig := &apicheck.ApiConnectivityCheckConfig{
Log: ctrl.Log.WithName("api-check"),
MyNodeName: myNodeName,
MyMachineName: machineName,
CheckInterval: apiCheckInterval,
MaxErrorsThreshold: maxErrorThreshold,
Peers: myPeers,
Expand Down Expand Up @@ -371,6 +383,21 @@ func initSelfNodeRemediationAgent(mgr manager.Manager) {
}
}

func getMachineName(reader client.Reader, myNodeName string) (string, error) {
var machineName string
var err error
node := &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: myNodeName}}
if err = reader.Get(context.Background(), client.ObjectKeyFromObject(node), node); err == nil {
mshitrit marked this conversation as resolved.
Show resolved Hide resolved
if namespacedMachine, exists := node.GetAnnotations()[machineAnnotation]; exists {
if _, machineName, err = cache.SplitMetaNamespaceKey(namespacedMachine); err != nil {
return machineName, err
}
}

}
return machineName, err
}

func configureWebhookServer(mgr ctrl.Manager, enableHTTP2 bool) {

server := mgr.GetWebhookServer()
Expand Down
4 changes: 3 additions & 1 deletion pkg/apicheck/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ApiConnectivityCheck struct {
type ApiConnectivityCheckConfig struct {
Log logr.Logger
MyNodeName string
MyMachineName string
CheckInterval time.Duration
MaxErrorsThreshold int
Peers *peers.Peers
Expand Down Expand Up @@ -275,7 +276,8 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result
defer cancel()

resp, err := phClient.IsHealthy(ctx, &peerhealth.HealthRequest{
NodeName: c.config.MyNodeName,
NodeName: c.config.MyNodeName,
MachineName: c.config.MyMachineName,
})
if err != nil {
logger.Error(err, "failed to read health response from peer")
Expand Down
40 changes: 25 additions & 15 deletions pkg/peerhealth/peerhealth.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/peerhealth/peerhealth.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ service PeerHealth {

// HealthRequest is the request message of the PeerHealth IsHealthy call.
message HealthRequest {
// nodeName is the node name of the requester (the API connectivity check
// fills it from its own MyNodeName).
string nodeName = 1;
// machineName is the machine name of the requester (filled from
// MyMachineName). When it is non-empty, the server looks up the SNR by this
// machine name instead of by nodeName.
string machineName = 2;
}

message HealthResponse {
Expand Down
35 changes: 3 additions & 32 deletions pkg/peerhealth/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

selfNodeRemediationApis "github.com/medik8s/self-node-remediation/api"
"github.com/medik8s/self-node-remediation/api/v1alpha1"
Expand All @@ -26,7 +25,6 @@ import (

const (
connectionTimeout = 5 * time.Second
machineAnnotation = "machine.openshift.io/machine" //todo this is openshift specific
//IMPORTANT! this MUST be less than PeerRequestTimeout in apicheck
//The difference between them should allow some time for sending the request over the network
//todo enforce this
Expand Down Expand Up @@ -124,7 +122,7 @@ func (s Server) IsHealthy(ctx context.Context, request *HealthRequest) (*HealthR
s.log.Info("checking health for", "node", nodeName)

namespace := s.snr.GetLastSeenSnrNamespace()
isMachine := s.snr.IsSnrMatchMachineName()
isMachine := len(request.GetMachineName()) > 0

// when namespace is empty, there wasn't a SNR yet, which also means that the node must be healthy
if namespace == "" {
Expand All @@ -140,39 +138,12 @@ func (s Server) IsHealthy(ctx context.Context, request *HealthRequest) (*HealthR
}

if isMachine {
return toResponse(s.isHealthyMachine(ctx, nodeName, namespace))
return toResponse(s.isHealthyBySnr(ctx, request.MachineName, namespace))
mshitrit marked this conversation as resolved.
Show resolved Hide resolved
} else {
return toResponse(s.isHealthyNode(ctx, nodeName, namespace))
return toResponse(s.isHealthyBySnr(ctx, nodeName, namespace))
}
}

// isHealthyNode returns the health response code for a node by delegating to
// isHealthyBySnr, using nodeName as the SNR name and namespace as the SNR
// namespace.
func (s Server) isHealthyNode(ctx context.Context, nodeName, namespace string) selfNodeRemediationApis.HealthCheckResponseCode {
	responseCode := s.isHealthyBySnr(ctx, nodeName, namespace)
	return responseCode
}

// isHealthyMachine returns the health response code for a node whose SNR is
// named after its machine. It fetches the node, reads the machineAnnotation
// annotation, extracts the machine name from it and delegates to
// isHealthyBySnr with that name.
//
// It returns ApiError when the node cannot be fetched, and Unhealthy when the
// annotation is missing or cannot be parsed.
func (s Server) isHealthyMachine(ctx context.Context, nodeName, namespace string) selfNodeRemediationApis.HealthCheckResponseCode {
	node, getErr := s.getNode(ctx, nodeName)
	if getErr != nil {
		return selfNodeRemediationApis.ApiError
	}

	machineKey, found := node.GetAnnotations()[machineAnnotation]
	if !found {
		s.log.Info("node doesn't have machine annotation")
		return selfNodeRemediationApis.Unhealthy //todo is this the correct response?
	}

	_, machineName, splitErr := cache.SplitMetaNamespaceKey(machineKey)
	if splitErr != nil {
		s.log.Error(splitErr, "failed to parse machine annotation on the node")
		return selfNodeRemediationApis.Unhealthy //todo is this the correct response?
	}

	return s.isHealthyBySnr(ctx, machineName, namespace)
}

func (s Server) isHealthyBySnr(ctx context.Context, snrName string, snrNamespace string) selfNodeRemediationApis.HealthCheckResponseCode {
apiCtx, cancelFunc := context.WithTimeout(ctx, apiServerTimeout)
defer cancelFunc()
Expand Down