From ba39b7768862f753856fc5a28df7fb27af68dea4 Mon Sep 17 00:00:00 2001 From: vimystic <122659254+vimystic@users.noreply.github.com> Date: Tue, 2 Apr 2024 14:40:04 -0600 Subject: [PATCH] Update: use incluster client to get last line of node container logs --- api/v1/self_healing_types.go | 5 ++- controllers/selfhealing_controller.go | 32 ++++++++------ internal/fullnode/stuck_detection.go | 62 ++++++++++++++++++++++++++- 3 files changed, 83 insertions(+), 16 deletions(-) diff --git a/api/v1/self_healing_types.go b/api/v1/self_healing_types.go index 31421a3e..cd6c34e2 100644 --- a/api/v1/self_healing_types.go +++ b/api/v1/self_healing_types.go @@ -26,7 +26,10 @@ type SelfHealSpec struct { // // +optional HeightDriftMitigation *HeightDriftMitigationSpec `json:"heightDriftMitigation"` - StuckPodMitigation *StuckPodMitigationSpec `json:"stuckPodMitigation"` + // Take action when a pod is stuck. + // + // +optional + StuckPodMitigation *StuckPodMitigationSpec `json:"stuckPodMitigation"` } type PVCAutoScaleSpec struct { diff --git a/controllers/selfhealing_controller.go b/controllers/selfhealing_controller.go index 532d8bde..e4ef0fd1 100644 --- a/controllers/selfhealing_controller.go +++ b/controllers/selfhealing_controller.go @@ -28,6 +28,7 @@ import ( "github.com/strangelove-ventures/cosmos-operator/internal/fullnode" "github.com/strangelove-ventures/cosmos-operator/internal/healthcheck" "github.com/strangelove-ventures/cosmos-operator/internal/kube" + v1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -122,17 +123,7 @@ func (r *SelfHealingReconciler) mitigateHeightDrift(ctx context.Context, reporte } pods := r.driftDetector.LaggingPods(ctx, crd) - var deleted int - for _, pod := range pods { - // CosmosFullNodeController will detect missing pod and re-create it. - if err := r.Delete(ctx, pod); kube.IgnoreNotFound(err) != nil { - reporter.Error(err, "Failed to delete pod", "pod", pod.Name) - reporter.RecordError("HeightDriftMitigationDeletePod", err) - continue - } - reporter.Info("Deleted pod for meeting height drift threshold", "pod", pod.Name) - deleted++ - } + deleted := r.DeletePods(pods, "HeightDriftMitigationDeletePod", reporter, ctx) if deleted > 0 { msg := fmt.Sprintf("Height lagged behind by %d or more blocks; deleted pod(s)", crd.Spec.SelfHeal.HeightDriftMitigation.Threshold) reporter.RecordInfo("HeightDriftMitigation", msg) @@ -145,10 +136,25 @@ func (r *SelfHealingReconciler) mitigateStuckPods(ctx context.Context, reporter } pods := r.stuckDetector.StuckPods(ctx, crd) + deleted := r.DeletePods(pods, "StuckPodMitigationDeletePod", reporter, ctx) + if deleted > 0 { + msg := fmt.Sprintf("Stuck for %d seconds; deleted pod(s)", crd.Spec.SelfHeal.StuckPodMitigation.Threshold) + reporter.RecordInfo("StuckPodMitigation", msg) + } +} - if pods != nil { - fmt.Println(pods) +func (r *SelfHealingReconciler) DeletePods(pods []*v1.Pod, reason string, reporter kube.Reporter, ctx context.Context) int { + var deleted int + for _, pod := range pods { + if err := r.Delete(ctx, pod); kube.IgnoreNotFound(err) != nil { + reporter.Error(err, "Failed to delete pod", "pod", pod.Name) + reporter.RecordError(reason, err) + continue + } + reporter.Info("Deleted pod for ", reason, " pod:", pod.Name) + deleted++ } + return deleted } // SetupWithManager sets up the controller with the Manager. diff --git a/internal/fullnode/stuck_detection.go b/internal/fullnode/stuck_detection.go index 00da3d8f..78dd0de7 100644 --- a/internal/fullnode/stuck_detection.go +++ b/internal/fullnode/stuck_detection.go @@ -2,11 +2,18 @@ package fullnode import ( "context" + "fmt" + "io/ioutil" + "strings" "time" cosmosv1 "github.com/strangelove-ventures/cosmos-operator/api/v1" + "github.com/strangelove-ventures/cosmos-operator/internal/kube" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" ) type StuckPodDetection struct { @@ -15,8 +22,59 @@ type StuckPodDetection struct { computeRollout func(maxUnavail *intstr.IntOrString, desired, ready int) int } +func NewStuckDetection(collector StatusCollector) DriftDetection { + return DriftDetection{ + available: kube.AvailablePods, + collector: collector, + computeRollout: kube.ComputeRollout, + } +} + // StuckPods returns pods that are stuck on a block height due to a cometbft issue that manifests on sentries using horcrux. func (d StuckPodDetection) StuckPods(ctx context.Context, crd *cosmosv1.CosmosFullNode) []*corev1.Pod { - //TODO - return nil + + pods := d.collector.Collect(ctx, client.ObjectKeyFromObject(crd)).Synced().Pods() + + fmt.Println(pods[0]) + + config, err := rest.InClusterConfig() + if err != nil { + panic(err.Error()) + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + panic(err.Error()) + } + + getPodLogsLastLine(clientset, pods[0]) + + //MORE TODO HERE + + return []*corev1.Pod{} +} + +func getPodLogsLastLine(clientset *kubernetes.Clientset, pod *corev1.Pod) { + podLogOpts := corev1.PodLogOptions{} + logRequest := clientset.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOpts) + + logStream, err := logRequest.Stream(context.Background()) + if err != nil { + fmt.Printf("Error getting logs for pod %s: %v\n", pod.Name, err) + return + } + defer logStream.Close() + + logBytes, err := ioutil.ReadAll(logStream) + if err != nil { + fmt.Printf("Error reading logs for pod %s: %v\n", pod.Name, err) + return + } + + logLines := strings.Split(strings.TrimRight(string(logBytes), "\n"), "\n") + if len(logLines) > 0 { + fmt.Println("Last line of logs for pod", pod.Name+":", logLines[len(logLines)-1]) + } else { + fmt.Println("No logs found for pod", pod.Name) + } }