From 4e58ea9a6383594e7a961cd342672ebc76225791 Mon Sep 17 00:00:00 2001
From: Xiao Liu <46879761+liangzai006@users.noreply.github.com>
Date: Mon, 23 Oct 2023 15:28:15 +0800
Subject: [PATCH] add support for running inspection rules on specified nodes

---
 apis/kubeeye/v1alpha2/inspectplan_types.go    |  17 +-
 apis/kubeeye/v1alpha2/inspectrule_types.go    |   3 +-
 apis/kubeeye/v1alpha2/inspecttask_types.go    |  14 +-
 .../kubeeye/v1alpha2/zz_generated.deepcopy.go |  34 +-
 .../kubeeye.kubesphere.io_inspectplans.yaml   |  13 +-
 .../kubeeye.kubesphere.io_inspecttasks.yaml   |  11 +-
 deploy/kubeeye_message_secret.yaml            |   0
 deploy/kubeeye_v1alpha2_inspectplan.yaml      |   6 +-
 deploy/rule/kubeeye_v1alpha2_systemd.yaml     |   5 +-
 pkg/conf/conf.go                              |  30 +-
 pkg/controllers/inspectplan_controller.go     |  34 +-
 pkg/controllers/inspectresult_controller.go   |  24 +-
 pkg/controllers/inspectrules_controller.go    |   2 +-
 pkg/controllers/inspecttask_controller.go     |  77 ++--
 pkg/inspect/command_inspect.go                |  20 +-
 pkg/inspect/file_change_Inspect.go            |  10 +-
 pkg/inspect/file_filter_inspect.go            |   7 +-
 pkg/inspect/nodeinfo_inspect.go               |  27 +-
 pkg/inspect/prometheus_Inspect.go             |  12 +-
 pkg/inspect/sysctl_inspect.go                 |  25 +-
 pkg/inspect/systemd_inspect.go                |  24 +-
 pkg/message/alarm_message.go                  |   7 +-
 pkg/message/conf/message_conf.go              |  16 -
 pkg/message/email_message.go                  |  45 +++
 pkg/message/message.go                        |   2 +-
 pkg/rules/rules.go                            | 375 +++++++++++++-----
 pkg/server/query/query.go                     |   2 +-
 pkg/template/job_template.go                  |   8 +
 pkg/utils/utils.go                            |  22 +-
 29 files changed, 591 insertions(+), 281 deletions(-)
 create mode 100644 deploy/kubeeye_message_secret.yaml
 delete mode 100644 pkg/message/conf/message_conf.go
 create mode 100644 pkg/message/email_message.go

diff --git a/apis/kubeeye/v1alpha2/inspectplan_types.go b/apis/kubeeye/v1alpha2/inspectplan_types.go
index f30b9e20..16d0865d 100644
--- a/apis/kubeeye/v1alpha2/inspectplan_types.go
+++ b/apis/kubeeye/v1alpha2/inspectplan_types.go
@@ -28,15 +28,14 @@ type InspectPlanSpec struct {
 	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 	// Important: Run "make" to regenerate code after modifying this file
 
-	Schedule    *string      `json:"schedule,omitempty"`
-	Suspend     bool         `json:"suspend,omitempty"`
-	Timeout     string       `json:"timeout,omitempty"`
-	RuleGroup   string       `json:"ruleGroup,omitempty"`
-	RuleNames   []string     `json:"ruleNames,omitempty"`
-	MaxTasks    int          `json:"maxTasks,omitempty"`
-	ClusterName []Cluster    `json:"clusterName,omitempty"`
-	KubeConfig  string       `json:"kubeConfig,omitempty"`
-	Once        *metav1.Time `json:"one,omitempty"`
+	Schedule    *string            `json:"schedule,omitempty"`
+	Suspend     bool               `json:"suspend,omitempty"`
+	Timeout     string             `json:"timeout,omitempty"`
+	RuleNames   []InspectRuleNames `json:"ruleNames,omitempty"`
+	MaxTasks    int                `json:"maxTasks,omitempty"`
+	ClusterName []Cluster          `json:"clusterName,omitempty"`
+	KubeConfig  string             `json:"kubeConfig,omitempty"`
+	Once        *metav1.Time       `json:"one,omitempty"`
 }
 
 type TaskNames struct {
diff --git a/apis/kubeeye/v1alpha2/inspectrule_types.go b/apis/kubeeye/v1alpha2/inspectrule_types.go
index 7d1d62a4..677b4abd 100644
--- a/apis/kubeeye/v1alpha2/inspectrule_types.go
+++ b/apis/kubeeye/v1alpha2/inspectrule_types.go
@@ -79,8 +79,7 @@ type OpaRule struct {
 }
 type PrometheusRule struct {
 	RuleItemBases `json:",inline"`
-	Endpoint      string  `json:"endpoint,omitempty"`
-	SpecialRule   *string `json:"specialRule,omitempty"`
+	Endpoint      string `json:"endpoint,omitempty"`
 }
 
 type FileChangeRule struct {
diff --git a/apis/kubeeye/v1alpha2/inspecttask_types.go b/apis/kubeeye/v1alpha2/inspecttask_types.go
index 26efbfc7..94d9e024 100644
--- a/apis/kubeeye/v1alpha2/inspecttask_types.go
+++ b/apis/kubeeye/v1alpha2/inspecttask_types.go
@@ -28,10 +28,10 @@ type InspectTaskSpec struct {
 	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
 	// Important: Run "make" to regenerate code after modifying this file
 
-	ClusterName   []Cluster `json:"clusterName,omitempty"`
-	RuleNames     []string  `json:"ruleNames,omitempty"`
-	Timeout       string    `json:"timeout,omitempty"`
-	InspectPolicy Policy    `json:"inspectPolicy,omitempty"`
+	ClusterName   []Cluster          `json:"clusterName,omitempty"`
+	RuleNames     []InspectRuleNames `json:"ruleNames,omitempty"`
+	Timeout       string             `json:"timeout,omitempty"`
+	InspectPolicy Policy             `json:"inspectPolicy,omitempty"`
 }
 
 // InspectTaskStatus defines the observed state of InspectTask
@@ -48,6 +48,12 @@ type InspectTaskStatus struct {
 	InspectRuleType []string `json:"inspectRuleType,omitempty" yaml:"inspectRuleType"`
 }
 
+type InspectRuleNames struct {
+	Name         string            `json:"name,omitempty"`
+	NodeName     string            `json:"nodeName,omitempty"`
+	NodeSelector map[string]string `json:"nodeSelector,omitempty"`
+}
+
 type JobPhase struct {
 	JobName string `json:"jobName,omitempty"`
 	Phase   Phase  `json:"phase,omitempty"`
 }
diff --git a/apis/kubeeye/v1alpha2/zz_generated.deepcopy.go b/apis/kubeeye/v1alpha2/zz_generated.deepcopy.go
index 5a52dbbb..97615eaa 100644
--- a/apis/kubeeye/v1alpha2/zz_generated.deepcopy.go
+++ b/apis/kubeeye/v1alpha2/zz_generated.deepcopy.go
@@ -285,8 +285,10 @@ func (in *InspectPlanSpec) DeepCopyInto(out *InspectPlanSpec) {
 	}
 	if in.RuleNames != nil {
 		in, out := &in.RuleNames, &out.RuleNames
-		*out = make([]string, len(*in))
-		copy(*out, *in)
+		*out = make([]InspectRuleNames, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
 	}
 	if in.ClusterName != nil {
 		in, out := &in.ClusterName, &out.ClusterName
@@ -565,6 +567,28 @@ func (in *InspectRuleList) DeepCopyObject() runtime.Object {
 	return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *InspectRuleNames) DeepCopyInto(out *InspectRuleNames) {
+	*out = *in
+	if in.NodeSelector != nil {
+		in, out := &in.NodeSelector, &out.NodeSelector
+		*out = make(map[string]string, len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InspectRuleNames.
+func (in *InspectRuleNames) DeepCopy() *InspectRuleNames {
+	if in == nil {
+		return nil
+	}
+	out := new(InspectRuleNames)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *InspectRuleSpec) DeepCopyInto(out *InspectRuleSpec) {
 	*out = *in
@@ -748,8 +772,10 @@ func (in *InspectTaskSpec) DeepCopyInto(out *InspectTaskSpec) {
 	}
 	if in.RuleNames != nil {
 		in, out := &in.RuleNames, &out.RuleNames
-		*out = make([]string, len(*in))
-		copy(*out, *in)
+		*out = make([]InspectRuleNames, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
 	}
 }
diff --git a/config/crd/bases/kubeeye.kubesphere.io_inspectplans.yaml b/config/crd/bases/kubeeye.kubesphere.io_inspectplans.yaml
index d8a82692..f30c671f 100644
--- a/config/crd/bases/kubeeye.kubesphere.io_inspectplans.yaml
+++ b/config/crd/bases/kubeeye.kubesphere.io_inspectplans.yaml
@@ -51,11 +51,18 @@ spec:
             one:
               format: date-time
               type: string
-            ruleGroup:
-              type: string
             ruleNames:
               items:
-                type: string
+                properties:
+                  name:
+                    type: string
+                  nodeName:
+                    type: string
+                  nodeSelector:
+                    additionalProperties:
+                      type: string
+                    type: object
+                type: object
               type: array
             schedule:
               type: string
diff --git a/config/crd/bases/kubeeye.kubesphere.io_inspecttasks.yaml b/config/crd/bases/kubeeye.kubesphere.io_inspecttasks.yaml
index b61d2343..8988ade7 100644
--- a/config/crd/bases/kubeeye.kubesphere.io_inspecttasks.yaml
+++ b/config/crd/bases/kubeeye.kubesphere.io_inspecttasks.yaml
@@ -48,7 +48,16 @@ spec:
               type: string
             ruleNames:
               items:
-                type: string
+                properties:
+                  name:
+                    type: string
+                  nodeName:
+                    type: string
+                  nodeSelector:
+                    additionalProperties:
+                      type: string
+                    type: object
+                type: object
               type: array
             timeout:
               type: string
diff --git a/deploy/kubeeye_message_secret.yaml b/deploy/kubeeye_message_secret.yaml
new file mode 100644
index 00000000..e69de29b
diff --git a/deploy/kubeeye_v1alpha2_inspectplan.yaml b/deploy/kubeeye_v1alpha2_inspectplan.yaml
index b424c509..59f28cb1 100644
--- a/deploy/kubeeye_v1alpha2_inspectplan.yaml
+++ b/deploy/kubeeye_v1alpha2_inspectplan.yaml
@@ -5,4 +5,8 @@ metadata:
 spec:
   maxTasks: 10
   ruleNames:
-    - inspect-rule-namespace
\ No newline at end of file
+    - name: inspect-rule-namespace
+      nodeSelector:
+        node-role.kubernetes.io/master: ""
+    - name: inspect-rule-systemd
+      nodeName: node1
\ No newline at end of file
diff --git a/deploy/rule/kubeeye_v1alpha2_systemd.yaml b/deploy/rule/kubeeye_v1alpha2_systemd.yaml
index ae9cb12d..240855ab 100644
--- a/deploy/rule/kubeeye_v1alpha2_systemd.yaml
+++ b/deploy/rule/kubeeye_v1alpha2_systemd.yaml
@@ -16,5 +16,8 @@ spec:
       rule: docker == "active"
     - name: etcd
       rule: etcd == "active"
+      nodeSelector:
+        node-role.kubernetes.io/master: ""
     - name: kubelet
-      rule: kubelet == "active"
\ No newline at end of file
+      rule: kubelet == "active"
+      nodeName: master
\ No newline at end of file
diff --git a/pkg/conf/conf.go b/pkg/conf/conf.go
index b57e5939..ec0d9fed 100644
--- a/pkg/conf/conf.go
+++ b/pkg/conf/conf.go
@@ -2,6 +2,7 @@ package conf
 
 import (
 	corev1 "k8s.io/api/core/v1"
+	"time"
 )
 
 const (
@@ -40,12 +41,28 @@ type MessageType string
 
 const (
 	AlarmMessage MessageType = "alarm"
+	EmailMessage MessageType = "email"
+)
+
+type Mode string
+
+const (
+	CompleteMode Mode = "complete"
+	AbnormalMode Mode = "abnormal"
 )
 
 type MessageConfig struct {
 	Enable bool        `json:"enable,omitempty"`
 	Type   MessageType `json:"type,omitempty"`
-	Url    string      `json:"url,omitempty"`
+	Mode   Mode        `json:"mode,omitempty"`
+	Email  EmailConfig `json:"email,omitempty"`
+}
+type EmailConfig struct {
+	Address   string   `json:"address,omitempty"`
+	Port      int32    `json:"port,omitempty"`
+	From      string   `json:"from,omitempty"`
+	To        []string `json:"to,omitempty"`
+	SecretKey string   `json:"secretKey,omitempty"`
 }
 
 type JobConfig struct {
@@ -79,3 +96,14 @@ func (j *JobConfig) DeepCopy() *JobConfig {
 	*j2 = *j
 	return j2
 }
+
+type MessageEvent struct {
+	Content   []byte
+	Target    string
+	Sender    string
+	Timestamp time.Time
+}
+
+type EventHandler interface {
+	HandleMessageEvent(event *MessageEvent)
+}
diff --git a/pkg/controllers/inspectplan_controller.go b/pkg/controllers/inspectplan_controller.go
index 652d0ac5..9c5f7367 100644
--- a/pkg/controllers/inspectplan_controller.go
+++ b/pkg/controllers/inspectplan_controller.go
@@ -198,39 +198,39 @@ func nextScheduledTimeDuration(sched cron.Schedule, now *metav1.Time) *time.Dura
 	return &nextTime
 }
 
-func (r *InspectPlanReconciler) createInspectTask(inspectPlan *kubeeyev1alpha2.InspectPlan, ctx context.Context) (string, error) {
+func (r *InspectPlanReconciler) createInspectTask(plan *kubeeyev1alpha2.InspectPlan, ctx context.Context) (string, error) {
 	ownerController := true
-	r.removeTask(ctx, inspectPlan)
+	r.removeTask(ctx, plan)
 	inspectTask := kubeeyev1alpha2.InspectTask{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:   fmt.Sprintf("%s-%s", inspectPlan.Name, time.Now().Format("20060102-15-04")),
-			Labels: map[string]string{constant.LabelPlanName: inspectPlan.Name},
+			Name:   fmt.Sprintf("%s-%s", plan.Name, time.Now().Format("20060102-15-04")),
+			Labels: map[string]string{constant.LabelPlanName: plan.Name},
 			Annotations: map[string]string{constant.AnnotationInspectType: func() string {
-				if inspectPlan.Spec.Schedule == nil {
+				if plan.Spec.Schedule == nil {
 					return string(kubeeyev1alpha2.InspectTypeInstant)
 				}
 				return string(kubeeyev1alpha2.InspectTypeTiming)
 			}()},
 			OwnerReferences: []metav1.OwnerReference{{
-				APIVersion:         inspectPlan.APIVersion,
-				Kind:               inspectPlan.Kind,
-				Name:               inspectPlan.Name,
-				UID:                inspectPlan.UID,
+				APIVersion:         plan.APIVersion,
+				Kind:               plan.Kind,
+				Name:               plan.Name,
+				UID:                plan.UID,
 				Controller:         &ownerController,
 				BlockOwnerDeletion: &ownerController,
 			}},
 		},
 		Spec: kubeeyev1alpha2.InspectTaskSpec{
-			RuleNames:   inspectPlan.Spec.RuleNames,
-			ClusterName: inspectPlan.Spec.ClusterName,
+			RuleNames:   plan.Spec.RuleNames,
+			ClusterName: plan.Spec.ClusterName,
 			Timeout: func() string {
-				if inspectPlan.Spec.Timeout == "" {
+				if plan.Spec.Timeout == "" {
 					return "10m"
 				}
-				return inspectPlan.Spec.Timeout
+				return plan.Spec.Timeout
 			}(),
 			InspectPolicy: func() kubeeyev1alpha2.Policy {
-				if inspectPlan.Spec.Once == nil && inspectPlan.Spec.Schedule != nil {
+				if plan.Spec.Once == nil && plan.Spec.Schedule != nil {
 					return kubeeyev1alpha2.CyclePolicy
 				}
 				return kubeeyev1alpha2.SinglePolicy
@@ -320,7 +320,7 @@ func ConvertTaskStatus(tasks []kubeeyev1alpha2.InspectTask) (taskStatus []kubeey
 
 func (r *InspectPlanReconciler) updateAddRuleReferNum(ctx context.Context, plan *kubeeyev1alpha2.InspectPlan) {
 	for _, v := range plan.Spec.RuleNames {
-		rule, err := r.K8sClient.VersionClientSet.KubeeyeV1alpha2().InspectRules().Get(ctx, v, metav1.GetOptions{})
+		rule, err := r.K8sClient.VersionClientSet.KubeeyeV1alpha2().InspectRules().Get(ctx, v.Name, metav1.GetOptions{})
 		if err != nil {
 			klog.Error(err, "Failed to get inspectRules")
 			continue
 		}
@@ -344,7 +344,7 @@ func (r *InspectPlanReconciler) updateAddRuleReferNum(ctx context.Context, plan
 			klog.Error(err, "Failed to update inspectRules")
 			continue
 		}
-		plan.Labels = utils.MergeMap(plan.Labels, map[string]string{fmt.Sprintf("%s/%s", "kubeeye.kubesphere.io", v): v})
+		plan.Labels = utils.MergeMap(plan.Labels, map[string]string{fmt.Sprintf("%s/%s", "kubeeye.kubesphere.io", v.Name): v.Name})
 
 	}
 
@@ -353,7 +353,7 @@
func (r *InspectPlanReconciler) updateAddRuleReferNum(ctx context.Context, plan func (r *InspectPlanReconciler) updateSubRuleReferNum(ctx context.Context, plan *kubeeyev1alpha2.InspectPlan) { for _, v := range plan.Spec.RuleNames { - rule, err := r.K8sClient.VersionClientSet.KubeeyeV1alpha2().InspectRules().Get(ctx, v, metav1.GetOptions{}) + rule, err := r.K8sClient.VersionClientSet.KubeeyeV1alpha2().InspectRules().Get(ctx, v.Name, metav1.GetOptions{}) if err != nil { klog.Error(err, "Failed to get inspectRules") continue diff --git a/pkg/controllers/inspectresult_controller.go b/pkg/controllers/inspectresult_controller.go index 72b7ae2c..c73c4059 100644 --- a/pkg/controllers/inspectresult_controller.go +++ b/pkg/controllers/inspectresult_controller.go @@ -21,10 +21,10 @@ import ( "context" "encoding/json" kubeeyev1alpha2 "github.com/kubesphere/kubeeye/apis/kubeeye/v1alpha2" + "github.com/kubesphere/kubeeye/pkg/conf" "github.com/kubesphere/kubeeye/pkg/constant" "github.com/kubesphere/kubeeye/pkg/kube" "github.com/kubesphere/kubeeye/pkg/message" - "github.com/kubesphere/kubeeye/pkg/message/conf" "github.com/kubesphere/kubeeye/pkg/output" "github.com/kubesphere/kubeeye/pkg/template" "github.com/kubesphere/kubeeye/pkg/utils" @@ -188,7 +188,7 @@ func (r *InspectResultReconciler) CountLevelNum(resultName string) (map[kubeeyev } func totalResultLevel(data interface{}, mapLevel map[kubeeyev1alpha2.Level]*int) { - maps, err := utils.StructToMap(data) + maps, err := utils.ArrayStructToArrayMap(data) if err != nil { return } @@ -242,19 +242,17 @@ func (r *InspectResultReconciler) SendMessage(ctx context.Context, name string) klog.Error("render html template error", err) return } - - if kc.Message.Url == "" { - klog.Error("message request url is empty") + var messageHandler conf.EventHandler + switch kc.Message.Type { + case conf.EmailMessage: + messageHandler = message.NewEmailMessageOptions(&kc.Message.Email, r.Client) + default: + klog.Error("unable identify send message type") return } - messageHandler := &message.AlarmMessageHandler{ - RequestUrl: kc.Message.Url, - } - event := &conf.MessageEvent{ - Content: data.String(), - } - dispatcher := message.RegisterHandler(messageHandler) - dispatcher.DispatchMessageEvent(event) + dispatcher.DispatchMessageEvent(&conf.MessageEvent{ + Content: data.Bytes(), + }) } diff --git a/pkg/controllers/inspectrules_controller.go b/pkg/controllers/inspectrules_controller.go index e930c083..2a92081b 100644 --- a/pkg/controllers/inspectrules_controller.go +++ b/pkg/controllers/inspectrules_controller.go @@ -147,7 +147,7 @@ func (r *InspectRulesReconciler) SetupWithManager(mgr ctrl.Manager) error { func ComputeLevel(data interface{}, mapLevel map[kubeeyev1alpha2.Level]*int) { - maps, err := utils.StructToMap(data) + maps, err := utils.ArrayStructToArrayMap(data) if err != nil { return } diff --git a/pkg/controllers/inspecttask_controller.go b/pkg/controllers/inspecttask_controller.go index 1cb3d344..a794207b 100644 --- a/pkg/controllers/inspecttask_controller.go +++ b/pkg/controllers/inspecttask_controller.go @@ -197,38 +197,20 @@ func (r *InspectTaskReconciler) Reconcile(ctx context.Context, req ctrl.Request) } -func createInspectRule(ctx context.Context, clients *kube.KubernetesClient, ruleGroup []kubeeyev1alpha2.JobRule, task *kubeeyev1alpha2.InspectTask) ([]kubeeyev1alpha2.JobRule, error) { - r := sortRuleOpaToAtLast(ruleGroup) - marshal, err := json.Marshal(r) - if err != nil { - return nil, err - } - - _, err = 
clients.ClientSet.CoreV1().ConfigMaps(constant.DefaultNamespace).Get(ctx, task.Name, metav1.GetOptions{}) - if err == nil { - _ = clients.ClientSet.CoreV1().ConfigMaps(constant.DefaultNamespace).Delete(ctx, task.Name, metav1.DeleteOptions{}) - } - // create temp inspect rule - configMapTemplate := template.BinaryConfigMapTemplate(task.Name, constant.DefaultNamespace, marshal, true, map[string]string{constant.LabelInspectRuleGroup: "inspect-rule-temp"}) - _, err = clients.ClientSet.CoreV1().ConfigMaps(constant.DefaultNamespace).Create(ctx, configMapTemplate, metav1.CreateOptions{}) - if err != nil { - return nil, err - } - return r, nil -} - func (r *InspectTaskReconciler) CreateInspect(ctx context.Context, cluster kubeeyev1alpha2.Cluster, task *kubeeyev1alpha2.InspectTask, ruleLists []kubeeyev1alpha2.InspectRule, clients *kube.KubernetesClient, kubeEyeConfig conf.KubeEyeConfig) error { + e := rules.NewExecuteRuleOptions(clients, task) - inspectRule, inspectRuleNum, err := rules.ParseRules(ctx, clients, task.Name, ruleLists) + mergeRule, err := e.MergeRule(ruleLists) if err != nil { return err } - rule, err := createInspectRule(ctx, clients, inspectRule, task) + + createInspectRule, err := e.CreateInspectRule(ctx, e.GenerateJob(ctx, mergeRule)) if err != nil { return err } jobConfig := kubeEyeConfig.GetClusterJobConfig(cluster.Name) - JobPhase, err := r.createJobsInspect(ctx, task, clients, jobConfig, rule) + JobPhase, err := r.createJobsInspect(ctx, task, clients, jobConfig, createInspectRule) if err != nil { return err } @@ -236,32 +218,25 @@ func (r *InspectTaskReconciler) CreateInspect(ctx context.Context, cluster kubee task.Status.EndTimestamp = &metav1.Time{Time: time.Now()} task.Status.Duration = task.Status.EndTimestamp.Sub(task.Status.StartTimestamp.Time).String() task.Status.InspectRuleType = func() (data []string) { - for k, v := range inspectRuleNum { + for k, v := range e.GetRuleTotal() { if v > 0 { data = append(data, k) } } return data }() - err = r.getInspectResultData(ctx, clients, task, cluster, inspectRuleNum) + err = r.getInspectResultData(ctx, clients, task, cluster, e.GetRuleTotal()) if err != nil { return err } - return nil -} - -func sortRuleOpaToAtLast(rule []kubeeyev1alpha2.JobRule) []kubeeyev1alpha2.JobRule { - finds, b, OpaRule := utils.ArrayFinds(rule, func(i kubeeyev1alpha2.JobRule) bool { - return i.RuleType == constant.Opa - }) - if b { - rule = append(rule[:finds], rule[finds+1:]...) 
- rule = append(rule, OpaRule) + err = r.cleanConfig(ctx, clients, task) + if err != nil { + return err } - - return rule + return nil } + func GetStatus(task *kubeeyev1alpha2.InspectTask) kubeeyev1alpha2.Phase { if task.Status.JobPhase == nil { return kubeeyev1alpha2.PhaseFailed @@ -305,18 +280,18 @@ func (r *InspectTaskReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (r *InspectTaskReconciler) createJobsInspect(ctx context.Context, inspectTask *kubeeyev1alpha2.InspectTask, clusterClient *kube.KubernetesClient, config *conf.JobConfig, inspectRule []kubeeyev1alpha2.JobRule) ([]kubeeyev1alpha2.JobPhase, error) { +func (r *InspectTaskReconciler) createJobsInspect(ctx context.Context, task *kubeeyev1alpha2.InspectTask, kubeClient *kube.KubernetesClient, config *conf.JobConfig, jobRules []kubeeyev1alpha2.JobRule) ([]kubeeyev1alpha2.JobPhase, error) { var jobNames []kubeeyev1alpha2.JobPhase - nodes := kube.GetNodes(ctx, clusterClient.ClientSet) - concurrency := 5 - runNumber := math.Round(float64(len(nodes)) + float64(len(inspectRule))*0.1) - if runNumber > 5 { + nodes := kube.GetNodes(ctx, kubeClient.ClientSet) + concurrency := 4 + runNumber := math.Round(float64(len(nodes)) + float64(len(jobRules))*0.1) + if runNumber > 4 { concurrency = int(runNumber) } var wg sync.WaitGroup var mutex sync.Mutex semaphore := make(chan struct{}, concurrency) - for _, rule := range inspectRule { + for _, rule := range jobRules { wg.Add(1) semaphore <- struct{}{} go func(v kubeeyev1alpha2.JobRule) { @@ -324,11 +299,11 @@ func (r *InspectTaskReconciler) createJobsInspect(ctx context.Context, inspectTa wg.Done() <-semaphore }() - if isTimeout(inspectTask.CreationTimestamp, inspectTask.Spec.Timeout) { + if isTimeout(task.CreationTimestamp, task.Spec.Timeout) { jobNames = append(jobNames, kubeeyev1alpha2.JobPhase{JobName: v.JobName, Phase: kubeeyev1alpha2.PhaseFailed}) return } - if err := isExistsJob(ctx, clusterClient, v.JobName); err != nil { + if err := isExistsJob(ctx, kubeClient, v.JobName); err != nil { mutex.Lock() jobNames = append(jobNames, kubeeyev1alpha2.JobPhase{JobName: v.JobName, Phase: kubeeyev1alpha2.PhaseSucceeded}) mutex.Unlock() @@ -337,13 +312,13 @@ func (r *InspectTaskReconciler) createJobsInspect(ctx context.Context, inspectTa inspectInterface, status := inspect.RuleOperatorMap[v.RuleType] if status { klog.Infof("Job %s created", v.JobName) - jobTask, err := inspectInterface.CreateJobTask(ctx, clusterClient, &v, inspectTask, config) + jobTask, err := inspectInterface.CreateJobTask(ctx, kubeClient, &v, task, config) if err != nil { klog.Errorf("create job error. 
error:%s", err) jobNames = append(jobNames, kubeeyev1alpha2.JobPhase{JobName: v.JobName, Phase: kubeeyev1alpha2.PhaseFailed}) return } - resultJob := r.waitForJobCompletionGetResult(ctx, clusterClient, v.JobName, jobTask, inspectTask.Spec.Timeout) + resultJob := r.waitForJobCompletionGetResult(ctx, kubeClient, v.JobName, jobTask, task.Spec.Timeout) mutex.Lock() jobNames = append(jobNames, *resultJob) mutex.Unlock() @@ -355,10 +330,6 @@ func (r *InspectTaskReconciler) createJobsInspect(ctx context.Context, inspectTa } wg.Wait() - err := r.cleanConfig(ctx, clusterClient, inspectTask) - if err != nil { - return nil, err - } return jobNames, nil } @@ -588,8 +559,8 @@ func (r *InspectTaskReconciler) updatePlanStatus(ctx context.Context, phase kube } func (r *InspectTaskReconciler) getRules(ctx context.Context, task *kubeeyev1alpha2.InspectTask) (rules []kubeeyev1alpha2.InspectRule) { - for _, name := range task.Spec.RuleNames { - rule, err := r.K8sClients.VersionClientSet.KubeeyeV1alpha2().InspectRules().Get(ctx, name, metav1.GetOptions{}) + for _, v := range task.Spec.RuleNames { + rule, err := r.K8sClients.VersionClientSet.KubeeyeV1alpha2().InspectRules().Get(ctx, v.Name, metav1.GetOptions{}) if err != nil { klog.Error(err, "get rule error") continue diff --git a/pkg/inspect/command_inspect.go b/pkg/inspect/command_inspect.go index 929a7107..4ee69c39 100644 --- a/pkg/inspect/command_inspect.go +++ b/pkg/inspect/command_inspect.go @@ -68,25 +68,29 @@ func (o *commandInspect) RunInspect(ctx context.Context, rules []kubeeyev1alpha2 ctl := kubeeyev1alpha2.CommandResultItem{ Name: r.Name, Command: r.Command, - Level: r.Level, } command := exec.Command("sh", "-c", r.Command) outputResult, err := command.Output() if err != nil { fmt.Println(err) ctl.Value = fmt.Sprintf("command execute failed, %s", err) + ctl.Level = r.Level + ctl.Assert = true continue } - if _, err = visitor.CheckRule(*r.Rule); err != nil { - ctl.Value = fmt.Sprintf("rule condition is not correct, %s", err) + + err, res := visitor.EventRuleEvaluate(map[string]interface{}{"result": string(outputResult)}, *r.Rule) + if err != nil { + ctl.Value = fmt.Sprintf("rule evaluate failed err:%s", err) + ctl.Level = r.Level + ctl.Assert = true } else { - err, res := visitor.EventRuleEvaluate(map[string]interface{}{"result": string(outputResult)}, *r.Rule) - if err != nil { - ctl.Value = fmt.Sprintf("rule evaluate failed err:%s", err) - } else { - ctl.Assert = res + if res { + ctl.Level = r.Level } + ctl.Assert = res } + commandResult = append(commandResult, ctl) } } diff --git a/pkg/inspect/file_change_Inspect.go b/pkg/inspect/file_change_Inspect.go index be5110d9..9f830cef 100644 --- a/pkg/inspect/file_change_Inspect.go +++ b/pkg/inspect/file_change_Inspect.go @@ -75,13 +75,13 @@ func (o *fileChangeInspect) RunInspect(ctx context.Context, rules []kubeeyev1alp resultItem := kubeeyev1alpha2.FileChangeResultItem{ FileName: file.Name, Path: file.Path, - Level: file.Level, } baseFile, fileErr := os.ReadFile(path.Join(constant.RootPathPrefix, file.Path)) if fileErr != nil { klog.Errorf("Failed to open base file path:%s,error:%s", baseFile, fileErr) resultItem.Issues = []string{fmt.Sprintf("%s:The file does not exist", file.Name)} + resultItem.Level = file.Level fileResults = append(fileResults, resultItem) continue } @@ -94,8 +94,9 @@ func (o *fileChangeInspect) RunInspect(ctx context.Context, rules []kubeeyev1alp _, createErr := clients.ClientSet.CoreV1().ConfigMaps(constant.DefaultNamespace).Create(ctx, mapTemplate, metav1.CreateOptions{}) if 
createErr != nil { resultItem.Issues = []string{fmt.Sprintf("%s:create configMap failed", file.Name)} + resultItem.Level = file.Level + fileResults = append(fileResults, resultItem) } - fileResults = append(fileResults, resultItem) continue } } @@ -109,6 +110,9 @@ func (o *fileChangeInspect) RunInspect(ctx context.Context, rules []kubeeyev1alp diffResult[i] = strings.ReplaceAll(diffResult[i], "\x1b[0m", "") } resultItem.Issues = diffResult + if len(resultItem.Issues) > 0 { + resultItem.Level = file.Level + } fileResults = append(fileResults, resultItem) } @@ -132,7 +136,7 @@ func (o *fileChangeInspect) GetResult(runNodeName string, resultCm *corev1.Confi for i := range fileChangeResult { fileChangeResult[i].NodeName = runNodeName } - resultCr.Spec.FileChangeResult = fileChangeResult + resultCr.Spec.FileChangeResult = append(resultCr.Spec.FileChangeResult, fileChangeResult...) return resultCr, nil } diff --git a/pkg/inspect/file_filter_inspect.go b/pkg/inspect/file_filter_inspect.go index 6701ff54..758667e6 100644 --- a/pkg/inspect/file_filter_inspect.go +++ b/pkg/inspect/file_filter_inspect.go @@ -73,11 +73,11 @@ func (o *fileFilterInspect) RunInspect(ctx context.Context, rules []kubeeyev1alp filterR := kubeeyev1alpha2.FileChangeResultItem{ FileName: rule.Name, Path: rule.Path, - Level: rule.Level, } if err != nil { klog.Errorf(" Failed to open file . err:%s", err) filterR.Issues = append(filterR.Issues, fmt.Sprintf("Failed to open file for %s.", rule.Name)) + filterR.Level = rule.Level filterResult = append(filterResult, filterR) continue } @@ -93,6 +93,9 @@ func (o *fileFilterInspect) RunInspect(ctx context.Context, rules []kubeeyev1alp filterR.Issues = append(filterR.Issues, reader.Text()) } } + if len(filterR.Issues) > 0 { + filterR.Level = rule.Level + } filterResult = append(filterResult, filterR) } } @@ -117,7 +120,7 @@ func (o *fileFilterInspect) GetResult(runNodeName string, resultCm *corev1.Confi for i := range fileFilterResult { fileFilterResult[i].NodeName = runNodeName } - resultCr.Spec.FileFilterResult = fileFilterResult + resultCr.Spec.FileFilterResult = append(resultCr.Spec.FileFilterResult, fileFilterResult...) 
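+	// at this point resultCr aggregates the file-filter results of every node inspected so far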
return resultCr, nil } diff --git a/pkg/inspect/nodeinfo_inspect.go b/pkg/inspect/nodeinfo_inspect.go index 3fd3e6ed..93211d86 100644 --- a/pkg/inspect/nodeinfo_inspect.go +++ b/pkg/inspect/nodeinfo_inspect.go @@ -17,6 +17,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" + "path" "math" "strings" @@ -107,16 +108,30 @@ func (o *nodeInfoInspect) RunInspect(ctx context.Context, rules []kubeeyev1alpha err, ok = visitor.EventRuleEvaluate(map[string]interface{}{constant.Filesystem: storage}, *info.Rule) if err != nil { resultItem.Value = err.Error() + resultItem.Assert = true + resultItem.Level = info.Level + } else { + if ok { + resultItem.Level = info.Level + } + resultItem.Assert = ok } - resultItem.Assert = ok + resultItem.FileSystem.Type = constant.Filesystem nodeInfoResult = append(nodeInfoResult, resultItem) resultItem.Value = fmt.Sprintf("%.0f%%", inode) err, ok = visitor.EventRuleEvaluate(map[string]interface{}{constant.Inode: inode}, *info.Rule) if err != nil { resultItem.Value = err.Error() + resultItem.Assert = true + resultItem.Level = info.Level + } else { + if ok { + resultItem.Level = info.Level + } + resultItem.Assert = ok } - resultItem.Assert = ok + resultItem.FileSystem.Type = constant.Inode nodeInfoResult = append(nodeInfoResult, resultItem) } @@ -133,7 +148,11 @@ func (o *nodeInfoInspect) RunInspect(ctx context.Context, rules []kubeeyev1alpha resultItem.Value = err.Error() } } + if ok { + resultItem.Level = info.Level + } resultItem.Assert = ok + nodeInfoResult = append(nodeInfoResult, resultItem) } } @@ -158,7 +177,7 @@ func (o *nodeInfoInspect) GetResult(runNodeName string, resultCm *corev1.ConfigM for i := range nodeInfoResult { nodeInfoResult[i].NodeName = runNodeName } - resultCr.Spec.NodeInfo = nodeInfoResult + resultCr.Spec.NodeInfo = append(resultCr.Spec.NodeInfo, nodeInfoResult...) return resultCr, nil } @@ -206,7 +225,7 @@ func GetLoadAvg(fs procfs.FS) (float64, float64, float64) { } func GetFileSystem(p string) (float64, float64) { u := new(unix.Statfs_t) - err := unix.Statfs(constant.RootPathPrefix, u) + err := unix.Statfs(path.Join(constant.RootPathPrefix, p), u) if err != nil { klog.Error("failed to get filesystem info") return 0, 0 diff --git a/pkg/inspect/prometheus_Inspect.go b/pkg/inspect/prometheus_Inspect.go index f182e6cb..db669eab 100644 --- a/pkg/inspect/prometheus_Inspect.go +++ b/pkg/inspect/prometheus_Inspect.go @@ -84,14 +84,12 @@ func (o *prometheusInspect) RunInspect(ctx context.Context, rules []kubeeyev1alp continue } for _, result := range queryResults { - pR := kubeeyev1alpha2.PrometheusResult{ + + proRuleResult = append(proRuleResult, kubeeyev1alpha2.PrometheusResult{ Result: toString(result), - } - if proRule.SpecialRule == nil { - pR.Assert = true - pR.Level = proRule.Level - } - proRuleResult = append(proRuleResult, pR) + Assert: true, + Level: proRule.Level, + }) } diff --git a/pkg/inspect/sysctl_inspect.go b/pkg/inspect/sysctl_inspect.go index 0dd30c98..fe33c9c8 100644 --- a/pkg/inspect/sysctl_inspect.go +++ b/pkg/inspect/sysctl_inspect.go @@ -77,31 +77,28 @@ func (o *sysctlInspect) RunInspect(ctx context.Context, rules []kubeeyev1alpha2. 
 		ctlRule, err := fs.SysctlStrings(sysRule.Name)
 		klog.Infof("name:%s,value:%s", sysRule.Name, ctlRule)
 		ctl := kubeeyev1alpha2.NodeMetricsResultItem{
-			Name:  sysRule.Name,
-			Level: sysRule.Level,
+			Name: sysRule.Name,
 		}
 		if err != nil {
 			errVal := fmt.Sprintf("name:%s to does not exist", sysRule.Name)
 			ctl.Value = &errVal
+			ctl.Level = sysRule.Level
 			ctl.Assert = true
 		} else {
 			val := parseSysctlVal(ctlRule)
 			ctl.Value = &val
 			if sysRule.Rule != nil {
-				if _, err := visitor.CheckRule(*sysRule.Rule); err != nil {
-					checkErr := fmt.Sprintf("rule condition is not correct, %s", err.Error())
-					ctl.Value = &checkErr
+				err, res := visitor.EventRuleEvaluate(map[string]interface{}{sysRule.Name: val}, *sysRule.Rule)
+				if err != nil {
+					evalErr := fmt.Sprintf("rule evaluation failed, err:%s", err)
 					ctl.Assert = true
+					ctl.Value = &evalErr
+					ctl.Level = sysRule.Level
 				} else {
-					err, res := visitor.EventRuleEvaluate(map[string]interface{}{sysRule.Name: val}, *sysRule.Rule)
-					if err != nil {
-						evalErr := fmt.Sprintf("event rule evaluate to failed err:%s", err)
-						ctl.Assert = true
-						ctl.Value = &evalErr
-					} else {
-						ctl.Assert = !res
+					if !res {
+						ctl.Level = sysRule.Level
 					}
-
+					ctl.Assert = !res
 				}
 			}
@@ -132,7 +129,7 @@ func (o *sysctlInspect) GetResult(runNodeName string, resultCm *corev1.ConfigMap
 		SysctlResult[i].NodeName = runNodeName
 	}
 
-	resultCr.Spec.SysctlResult = SysctlResult
+	resultCr.Spec.SysctlResult = append(resultCr.Spec.SysctlResult, SysctlResult...)
 	return resultCr, nil
 }
diff --git a/pkg/inspect/systemd_inspect.go b/pkg/inspect/systemd_inspect.go
index d478114c..02a94460 100644
--- a/pkg/inspect/systemd_inspect.go
+++ b/pkg/inspect/systemd_inspect.go
@@ -77,26 +77,23 @@ func (o *systemdInspect) RunInspect(ctx context.Context, rules []kubeeyev1alpha2
 	}
 	for _, r := range systemd {
 		ctl := kubeeyev1alpha2.NodeMetricsResultItem{
-			Name:  r.Name,
-			Level: r.Level,
+			Name: r.Name,
 		}
 		for _, status := range unitsContext {
 			if status.Name == fmt.Sprintf("%s.service", r.Name) {
 				ctl.Value = &status.ActiveState
-				if r.Rule != nil {
-					if _, err := visitor.CheckRule(*r.Rule); err != nil {
-						sprintf := fmt.Sprintf("rule condition is not correct, %s", err.Error())
-						klog.Error(sprintf)
+				err, res := visitor.EventRuleEvaluate(map[string]interface{}{r.Name: status.ActiveState}, *r.Rule)
+				if err != nil {
+					sprintf := fmt.Sprintf("rule evaluation failed, err:%s", err.Error())
 					ctl.Value = &sprintf
+					ctl.Assert = true
+					ctl.Level = r.Level
 				} else {
-					err, res := visitor.EventRuleEvaluate(map[string]interface{}{r.Name: status.ActiveState}, *r.Rule)
-					if err != nil {
-						sprintf := fmt.Sprintf("err:%s", err.Error())
-						ctl.Value = &sprintf
-					} else {
-						ctl.Assert = !res
+					if !res {
+						ctl.Level = r.Level
 					}
+					ctl.Assert = !res
 				}
 			}
 			break
@@ -105,6 +102,7 @@
 		if ctl.Value == nil {
 			errVal := fmt.Sprintf("name:%s to does not exist", r.Name)
 			ctl.Assert = true
+			ctl.Level = r.Level
 			ctl.Value = &errVal
 		}
 		nodeResult = append(nodeResult, ctl)
@@ -131,7 +129,7 @@ func (o *systemdInspect) GetResult(runNodeName string, resultCm *corev1.ConfigMa
 	for i := range systemdResult {
 		systemdResult[i].NodeName = runNodeName
 	}
-	resultCr.Spec.SystemdResult = systemdResult
+	resultCr.Spec.SystemdResult = append(resultCr.Spec.SystemdResult, systemdResult...)
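+	// per-node systemd results accumulate in the same result CR instead of overwriting it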
 	return resultCr, nil
 }
diff --git a/pkg/message/alarm_message.go b/pkg/message/alarm_message.go
index cebca16d..a6f44dad 100644
--- a/pkg/message/alarm_message.go
+++ b/pkg/message/alarm_message.go
@@ -1,12 +1,13 @@
 package message
 
 import (
+	"bytes"
 	"fmt"
-	"github.com/kubesphere/kubeeye/pkg/message/conf"
+	"github.com/kubesphere/kubeeye/pkg/conf"
+	"io"
 	"k8s.io/klog/v2"
 	"net/http"
-	"strings"
 )
 
 type AlarmMessageHandler struct {
@@ -19,7 +20,7 @@
 func (h *AlarmMessageHandler) HandleMessageEvent(event *conf.MessageEvent) {
 	// e.g. send the message to the target
 	fmt.Printf("Message sent to %s by %s: %s\n", event.Target, event.Sender, event.Content)
-	resp, err := http.Post(h.RequestUrl, "application/json", strings.NewReader(event.Content))
+	resp, err := http.Post(h.RequestUrl, "application/json", bytes.NewReader(event.Content))
 	if err != nil {
 		klog.Error(err)
 		return
diff --git a/pkg/message/conf/message_conf.go b/pkg/message/conf/message_conf.go
deleted file mode 100644
index d71a0bed..00000000
--- a/pkg/message/conf/message_conf.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package conf
-
-import (
-	"time"
-)
-
-type MessageEvent struct {
-	Content   string
-	Target    string
-	Sender    string
-	Timestamp time.Time
-}
-
-type EventHandler interface {
-	HandleMessageEvent(event *MessageEvent)
-}
diff --git a/pkg/message/email_message.go b/pkg/message/email_message.go
new file mode 100644
index 00000000..99e77b9f
--- /dev/null
+++ b/pkg/message/email_message.go
@@ -0,0 +1,50 @@
+package message
+
+import (
+	"context"
+	"fmt"
+	"github.com/kubesphere/kubeeye/pkg/conf"
+	"github.com/kubesphere/kubeeye/pkg/constant"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+	"net/smtp"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+type EmailMessageHandler struct {
+	// attributes the handler needs can be added here
+	*conf.EmailConfig
+	client.Client
+}
+
+func NewEmailMessageOptions(event *conf.EmailConfig, c client.Client) *EmailMessageHandler {
+	return &EmailMessageHandler{
+		EmailConfig: event,
+		Client:      c,
+	}
+}
+
+func (e *EmailMessageHandler) HandleMessageEvent(event *conf.MessageEvent) {
+	// perform the message send, e.g. deliver the rendered report to the recipients
+	if e == nil || e.EmailConfig == nil {
+		return
+	}
+	// SMTP credentials are read from the referenced secret
+	var secret corev1.Secret
+	err := e.Client.Get(context.TODO(), types.NamespacedName{
+		Namespace: constant.DefaultNamespace,
+		Name:      e.SecretKey,
+	}, &secret)
+	if err != nil {
+		klog.Error("get email secret failed, err: ", err)
+		return
+	}
+	u := string(secret.Data["username"])
+	p := string(secret.Data["password"])
+	auth := smtp.PlainAuth("", u, p, e.Address)
+	err = smtp.SendMail(fmt.Sprintf("%s:%d", e.Address, e.Port), auth, e.From, e.To, event.Content)
+	if err != nil {
+		klog.Error("send email failed, err: ", err)
+	}
+}
diff --git a/pkg/message/message.go b/pkg/message/message.go
index 0eddcd43..b8338472 100644
--- a/pkg/message/message.go
+++ b/pkg/message/message.go
@@ -1,6 +1,6 @@
 package message
 
-import "github.com/kubesphere/kubeeye/pkg/message/conf"
+import "github.com/kubesphere/kubeeye/pkg/conf"
 
 type EventDispatcher struct {
 	handlers conf.EventHandler
diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go
index bdd90873..873fdbe1 100644
--- a/pkg/rules/rules.go
+++ b/pkg/rules/rules.go
@@ -8,6 +8,7 @@ import (
 	"github.com/kubesphere/kubeeye/clients/clientset/versioned"
 	"github.com/kubesphere/kubeeye/pkg/constant"
 	"github.com/kubesphere/kubeeye/pkg/kube"
+	"github.com/kubesphere/kubeeye/pkg/template"
 	"github.com/kubesphere/kubeeye/pkg/utils"
 	corev1 "k8s.io/api/core/v1"
 	kubeErr "k8s.io/apimachinery/pkg/api/errors"
@@ -32,83 +33,83 @@ func GetRules(ctx context.Context, task types.NamespacedName, client versioned.I
 	return nil
 }
 
-func MergeRule(rules ...kubeeyev1alpha2.InspectRule)
(*kubeeyev1alpha2.InspectRuleSpec, error) { +func MergeRule(task *kubeeyev1alpha2.InspectTask, rules ...kubeeyev1alpha2.InspectRule) (*kubeeyev1alpha2.InspectRuleSpec, error) { ruleSpec := &kubeeyev1alpha2.InspectRuleSpec{} - for _, rule := range rules { - if rule.Spec.Opas != nil { - opas, err := RuleArrayDeduplication[kubeeyev1alpha2.OpaRule](append(ruleSpec.Opas, rule.Spec.Opas...)) - if err != nil { - return nil, err - } - ruleSpec.Opas = opas - } - if rule.Spec.Prometheus != nil { - for _, pro := range rule.Spec.Prometheus { - if "" != rule.Spec.PrometheusEndpoint && len(rule.Spec.PrometheusEndpoint) > 0 { - pro.Endpoint = rule.Spec.PrometheusEndpoint - } - _, b, _ := utils.ArrayFinds(ruleSpec.Prometheus, func(m kubeeyev1alpha2.PrometheusRule) bool { - return m.Name == pro.Name - }) - if !b { - ruleSpec.Prometheus = append(ruleSpec.Prometheus, pro) - } - } - } - if rule.Spec.FileChange != nil && len(rule.Spec.FileChange) > 0 { - fileChange, err := RuleArrayDeduplication[kubeeyev1alpha2.FileChangeRule](append(ruleSpec.FileChange, rule.Spec.FileChange...)) - if err != nil { - return nil, err - } - ruleSpec.FileChange = fileChange - } - if rule.Spec.Sysctl != nil { - sysctl, err := RuleArrayDeduplication[kubeeyev1alpha2.SysRule](append(ruleSpec.Sysctl, rule.Spec.Sysctl...)) - if err != nil { - return nil, err - } - ruleSpec.Sysctl = sysctl - } - if rule.Spec.NodeInfo != nil { - - nodeInfo, err := RuleArrayDeduplication[kubeeyev1alpha2.NodeInfo](append(ruleSpec.NodeInfo, rule.Spec.NodeInfo...)) - if err != nil { - return nil, err - } - ruleSpec.NodeInfo = nodeInfo - } - if rule.Spec.Systemd != nil { - - systemd, err := RuleArrayDeduplication[kubeeyev1alpha2.SysRule](append(ruleSpec.Systemd, rule.Spec.Systemd...)) - if err != nil { - return nil, err - } - ruleSpec.Systemd = systemd - } - if rule.Spec.FileFilter != nil { - fileFilter, err := RuleArrayDeduplication[kubeeyev1alpha2.FileFilterRule](append(ruleSpec.FileFilter, rule.Spec.FileFilter...)) - if err != nil { - return nil, err - } - ruleSpec.FileFilter = fileFilter - } - if rule.Spec.CustomCommand != nil { - command, err := RuleArrayDeduplication[kubeeyev1alpha2.CustomCommandRule](append(ruleSpec.CustomCommand, rule.Spec.CustomCommand...)) - if err != nil { - return nil, err - } - ruleSpec.CustomCommand = command - } - - ruleSpec.Component = rule.Spec.Component - } + //for _, rule := range rules { + // if rule.Spec.Opas != nil { + // opas, err := RuleArrayDeduplication[kubeeyev1alpha2.OpaRule](append(ruleSpec.Opas, rule.Spec.Opas...)) + // if err != nil { + // return nil, err + // } + // ruleSpec.Opas = opas + // } + // if rule.Spec.Prometheus != nil { + // for _, pro := range rule.Spec.Prometheus { + // if "" != rule.Spec.PrometheusEndpoint && len(rule.Spec.PrometheusEndpoint) > 0 { + // pro.Endpoint = rule.Spec.PrometheusEndpoint + // } + // _, b, _ := utils.ArrayFinds(ruleSpec.Prometheus, func(m kubeeyev1alpha2.PrometheusRule) bool { + // return m.Name == pro.Name + // }) + // if !b { + // ruleSpec.Prometheus = append(ruleSpec.Prometheus, pro) + // } + // } + // } + // if rule.Spec.FileChange != nil && len(rule.Spec.FileChange) > 0 { + // fileChange, err := RuleArrayDeduplication[kubeeyev1alpha2.FileChangeRule](append(ruleSpec.FileChange, rule.Spec.FileChange...)) + // if err != nil { + // return nil, err + // } + // ruleSpec.FileChange = fileChange + // } + // if rule.Spec.Sysctl != nil { + // sysctl, err := RuleArrayDeduplication[kubeeyev1alpha2.SysRule](append(ruleSpec.Sysctl, rule.Spec.Sysctl...)) + // if err != nil { + // 
return nil, err + // } + // ruleSpec.Sysctl = sysctl + // } + // if rule.Spec.NodeInfo != nil { + // + // nodeInfo, err := RuleArrayDeduplication[kubeeyev1alpha2.NodeInfo](append(ruleSpec.NodeInfo, rule.Spec.NodeInfo...)) + // if err != nil { + // return nil, err + // } + // ruleSpec.NodeInfo = nodeInfo + // } + // if rule.Spec.Systemd != nil { + // + // systemd, err := RuleArrayDeduplication[kubeeyev1alpha2.SysRule](append(ruleSpec.Systemd, rule.Spec.Systemd...)) + // if err != nil { + // return nil, err + // } + // ruleSpec.Systemd = systemd + // } + // if rule.Spec.FileFilter != nil { + // fileFilter, err := RuleArrayDeduplication[kubeeyev1alpha2.FileFilterRule](append(ruleSpec.FileFilter, rule.Spec.FileFilter...)) + // if err != nil { + // return nil, err + // } + // ruleSpec.FileFilter = fileFilter + // } + // if rule.Spec.CustomCommand != nil { + // command, err := RuleArrayDeduplication[kubeeyev1alpha2.CustomCommandRule](append(ruleSpec.CustomCommand, rule.Spec.CustomCommand...)) + // if err != nil { + // return nil, err + // } + // ruleSpec.CustomCommand = command + // } + // + // ruleSpec.Component = rule.Spec.Component + //} return ruleSpec, nil } -func RuleArrayDeduplication[T any](obj interface{}) ([]T, error) { - maps, err := utils.StructToMap(obj) +func RuleArrayDeduplication[T any](obj interface{}) []T { + maps, err := utils.ArrayStructToArrayMap(obj) if err != nil { - return nil, err + return nil } var newMaps []map[string]interface{} for _, m := range maps { @@ -119,13 +120,13 @@ func RuleArrayDeduplication[T any](obj interface{}) ([]T, error) { newMaps = append(newMaps, m) } } - toStruct := utils.MapToStruct[T](newMaps...) - return toStruct, nil + return utils.MapToStruct[T](newMaps...) + } func Allocation(rule interface{}, taskName string, ruleType string) (*kubeeyev1alpha2.JobRule, error) { - toMap, err := utils.StructToMap(rule) + toMap, err := utils.ArrayStructToArrayMap(rule) if err != nil { klog.Errorf("Failed to convert rule to map. err:%s", err) return nil, err @@ -148,7 +149,7 @@ func Allocation(rule interface{}, taskName string, ruleType string) (*kubeeyev1a func AllocationRule(rule interface{}, taskName string, allNode []corev1.Node, ctlOrTem string) ([]kubeeyev1alpha2.JobRule, error) { - toMap, err := utils.StructToMap(rule) + toMap, err := utils.ArrayStructToArrayMap(rule) if err != nil { klog.Errorf("Failed to convert rule to map. err:%s", err) return nil, err @@ -165,12 +166,12 @@ func AllocationRule(rule interface{}, taskName string, allNode []corev1.Node, ct JobName: fmt.Sprintf("%s-%s-%s", taskName, ctlOrTem, rand.String(5)), RuleType: ctlOrTem, } - fileChange, err := json.Marshal(v) + jobRule.RunRule, err = json.Marshal(v) if err != nil { klog.Errorf("Failed to marshal fileChange rule. err:%s", err) return nil, err } - jobRule.RunRule = fileChange + jobRules = append(jobRules, jobRule) } @@ -184,12 +185,12 @@ func AllocationRule(rule interface{}, taskName string, allNode []corev1.Node, ct for i := range filterData { filterData[i]["nodeName"] = &item.Name } - sysMarshal, err := json.Marshal(filterData) + jobRule.RunRule, err = json.Marshal(filterData) if err != nil { klog.Errorf("Failed to marshal fileChange rule. 
err:%s", err) return nil, err } - jobRule.RunRule = sysMarshal + jobRules = append(jobRules, jobRule) } } @@ -200,79 +201,80 @@ func AllocationRule(rule interface{}, taskName string, allNode []corev1.Node, ct func mergeNodeRule(rule []map[string]interface{}) map[string][]map[string]interface{} { var mergeMap = make(map[string][]map[string]interface{}) for _, m := range rule { - for k, v := range m { - if k == "nodeName" { - mergeMap[v.(string)] = append(mergeMap[v.(string)], m) - } else if k == "nodeSelector" { - formatLabels := labels.FormatLabels(v.(map[string]string)) - mergeMap[formatLabels] = append(mergeMap[formatLabels], m) - } + nnv, nnvExist := m["nodeName"] + nsv, nsvExist := m["nodeSelector"] + if nnvExist { + mergeMap[nnv.(string)] = append(mergeMap[nnv.(string)], m) + } else if nsvExist { + convertMap := utils.MapValConvert[string](nsv.(map[string]interface{})) + formatLabels := labels.FormatLabels(convertMap) + mergeMap[formatLabels] = append(mergeMap[formatLabels], m) } } return mergeMap } -func ParseRules(ctx context.Context, clients *kube.KubernetesClient, taskName string, ruleGroup []kubeeyev1alpha2.InspectRule) ([]kubeeyev1alpha2.JobRule, map[string]int, error) { +func ParseRules(ctx context.Context, clients *kube.KubernetesClient, task *kubeeyev1alpha2.InspectTask, ruleGroup []kubeeyev1alpha2.InspectRule) ([]kubeeyev1alpha2.JobRule, map[string]int, error) { nodes := kube.GetNodes(ctx, clients.ClientSet) - ruleSpec, err := MergeRule(ruleGroup...) + ruleSpec, err := MergeRule(task, ruleGroup...) if err != nil { return nil, nil, err } var inspectRuleTotal = make(map[string]int) var executeRule []kubeeyev1alpha2.JobRule - component, err := Allocation(ruleSpec.Component, taskName, constant.Component) + component, err := Allocation(ruleSpec.Component, task.Name, constant.Component) if err == nil { executeRule = append(executeRule, *component) inspectRuleTotal[constant.Component] = TotalServiceNum(ctx, clients, ruleSpec.Component) } - opa, err := Allocation(ruleSpec.Opas, taskName, constant.Opa) + opa, err := Allocation(ruleSpec.Opas, task.Name, constant.Opa) if err == nil { executeRule = append(executeRule, *opa) inspectRuleTotal[constant.Opa] = len(ruleSpec.Opas) } - prometheus, err := Allocation(ruleSpec.Prometheus, taskName, constant.Prometheus) + prometheus, err := Allocation(ruleSpec.Prometheus, task.Name, constant.Prometheus) if err == nil { executeRule = append(executeRule, *prometheus) inspectRuleTotal[constant.Prometheus] = len(ruleSpec.Prometheus) } if len(nodes) > 0 { - change, err := AllocationRule(ruleSpec.FileChange, taskName, nodes, constant.FileChange) + change, err := AllocationRule(ruleSpec.FileChange, task.Name, nodes, constant.FileChange) if err != nil { return nil, nil, err } executeRule = append(executeRule, change...) inspectRuleTotal[constant.FileChange] = len(ruleSpec.FileChange) - sysctl, err := AllocationRule(ruleSpec.Sysctl, taskName, nodes, constant.Sysctl) + sysctl, err := AllocationRule(ruleSpec.Sysctl, task.Name, nodes, constant.Sysctl) if err != nil { return nil, nil, err } executeRule = append(executeRule, sysctl...) inspectRuleTotal[constant.Sysctl] = len(ruleSpec.Sysctl) - nodeInfo, err := AllocationRule(ruleSpec.NodeInfo, taskName, nodes, constant.NodeInfo) + nodeInfo, err := AllocationRule(ruleSpec.NodeInfo, task.Name, nodes, constant.NodeInfo) if err != nil { return nil, nil, err } executeRule = append(executeRule, nodeInfo...) 
inspectRuleTotal[constant.NodeInfo] = len(ruleSpec.NodeInfo) - systemd, err := AllocationRule(ruleSpec.Systemd, taskName, nodes, constant.Systemd) + systemd, err := AllocationRule(ruleSpec.Systemd, task.Name, nodes, constant.Systemd) if err != nil { return nil, nil, err } executeRule = append(executeRule, systemd...) inspectRuleTotal[constant.Systemd] = len(ruleSpec.Systemd) - fileFilter, err := AllocationRule(ruleSpec.FileFilter, taskName, nodes, constant.FileFilter) + fileFilter, err := AllocationRule(ruleSpec.FileFilter, task.Name, nodes, constant.FileFilter) if err != nil { return nil, nil, err } executeRule = append(executeRule, fileFilter...) inspectRuleTotal[constant.FileFilter] = len(ruleSpec.FileFilter) - customCommand, err := AllocationRule(ruleSpec.CustomCommand, taskName, nodes, constant.CustomCommand) + customCommand, err := AllocationRule(ruleSpec.CustomCommand, task.Name, nodes, constant.CustomCommand) if err != nil { return nil, nil, err } @@ -306,3 +308,180 @@ func TotalServiceNum(ctx context.Context, clients *kube.KubernetesClient, compon return componentRuleNumber } + +type ExecuteRule struct { + KubeClient *kube.KubernetesClient + Task *kubeeyev1alpha2.InspectTask + clusterInspectRuleMap map[string]string + clusterInspectRuleNames []string + ruleTotal map[string]int +} + +func NewExecuteRuleOptions(clients *kube.KubernetesClient, Task *kubeeyev1alpha2.InspectTask) *ExecuteRule { + clusterInspectRuleNames := []string{constant.Opa, constant.Prometheus, constant.Component} + clusterInspectRuleMap := map[string]string{ + "opas": constant.Opa, + "prometheus": constant.Prometheus, + "component": constant.Component, + "fileChange": constant.FileChange, + "sysctl": constant.Sysctl, + "systemd": constant.Systemd, + "fileFilter": constant.FileFilter, + "customCommand": constant.CustomCommand, + "nodeInfo": constant.NodeInfo, + } + return &ExecuteRule{ + KubeClient: clients, + Task: Task, + clusterInspectRuleNames: clusterInspectRuleNames, + clusterInspectRuleMap: clusterInspectRuleMap, + } +} + +func (e *ExecuteRule) SetRuleSchedule(rules []kubeeyev1alpha2.InspectRule) (newRules []kubeeyev1alpha2.InspectRule) { + for _, r := range e.Task.Spec.RuleNames { + _, isExist, rule := utils.ArrayFinds(rules, func(m kubeeyev1alpha2.InspectRule) bool { + return r.Name == m.Name + }) + if isExist { + if !utils.IsEmptyString(r.NodeName) || r.NodeSelector != nil { + toMap := utils.StructToMap(rule.Spec) + if toMap != nil { + for _, v := range toMap { + switch val := v.(type) { + case []interface{}: + for i := range val { + m := val[i].(map[string]interface{}) + _, nnExist := m["nodeName"] + _, nsExist := m["nodeSelector"] + if !nnExist && !nsExist { + m["nodeName"] = r.NodeName + m["nodeSelector"] = r.NodeSelector + } + } + } + } + rule.Spec = utils.MapToStruct[kubeeyev1alpha2.InspectRuleSpec](toMap)[0] + } + + } + newRules = append(newRules, rule) + } + } + return newRules +} + +func (e *ExecuteRule) SetPrometheusEndpoint(allRule []kubeeyev1alpha2.InspectRule) []kubeeyev1alpha2.InspectRule { + for i := range allRule { + if !utils.IsEmptyString(allRule[i].Spec.PrometheusEndpoint) && allRule[i].Spec.Prometheus != nil { + for p := range allRule[i].Spec.Prometheus { + if utils.IsEmptyString(allRule[i].Spec.Prometheus[p].Endpoint) { + allRule[i].Spec.Prometheus[p].Endpoint = allRule[i].Spec.PrometheusEndpoint + } + } + } + } + return allRule +} + +func (e *ExecuteRule) MergeRule(allRule []kubeeyev1alpha2.InspectRule) (kubeeyev1alpha2.InspectRuleSpec, error) { + var newRuleSpec 
kubeeyev1alpha2.InspectRuleSpec + var newSpec = make(map[string][]interface{}) + ruleTotal := map[string]int{constant.Component: 0} + for _, rule := range e.SetPrometheusEndpoint(e.SetRuleSchedule(allRule)) { + if rule.Spec.Component != nil && newRuleSpec.Component == nil { + newRuleSpec.Component = rule.Spec.Component + } + toMap := utils.StructToMap(rule.Spec) + for k, v := range toMap { + switch val := v.(type) { + case []interface{}: + newSpec[k] = RuleArrayDeduplication[interface{}](append(newSpec[k], val...)) + ruleTotal[e.clusterInspectRuleMap[k]] = len(newSpec[k]) + } + } + } + ruleTotal[constant.Component] = TotalServiceNum(context.TODO(), e.KubeClient, newRuleSpec.Component) + + marshal, err := json.Marshal(newSpec) + if err != nil { + return newRuleSpec, err + } + err = json.Unmarshal(marshal, &newRuleSpec) + if err != nil { + return newRuleSpec, err + } + e.ruleTotal = ruleTotal + return newRuleSpec, nil +} + +func (e *ExecuteRule) GenerateJob(ctx context.Context, rulesSpec kubeeyev1alpha2.InspectRuleSpec) (jobs []kubeeyev1alpha2.JobRule) { + + toMap := utils.StructToMap(rulesSpec) + nodes := kube.GetNodes(ctx, e.KubeClient.ClientSet) + for key, v := range toMap { + mapV, mapExist := e.clusterInspectRuleMap[key] + if mapExist { + _, exist := utils.ArrayFind(mapV, e.clusterInspectRuleNames) + if exist { + allocation, err := Allocation(v, e.Task.Name, mapV) + if err == nil { + jobs = append(jobs, *allocation) + } + } else { + allocationRule, err := AllocationRule(v, e.Task.Name, nodes, mapV) + if err == nil { + jobs = append(jobs, allocationRule...) + } + } + } + } + _, exist, _ := utils.ArrayFinds(jobs, func(m kubeeyev1alpha2.JobRule) bool { + return m.RuleType == constant.Component + }) + if !exist { + component, err := Allocation(nil, e.Task.Name, constant.Component) + if err == nil { + jobs = append(jobs, *component) + } + } + + return jobs +} + +func (e *ExecuteRule) CreateInspectRule(ctx context.Context, ruleGroup []kubeeyev1alpha2.JobRule) ([]kubeeyev1alpha2.JobRule, error) { + r := sortRuleOpaToAtLast(ruleGroup) + marshal, err := json.Marshal(r) + if err != nil { + return nil, err + } + + _, err = e.KubeClient.ClientSet.CoreV1().ConfigMaps(constant.DefaultNamespace).Get(ctx, e.Task.Name, metav1.GetOptions{}) + if err == nil { + _ = e.KubeClient.ClientSet.CoreV1().ConfigMaps(constant.DefaultNamespace).Delete(ctx, e.Task.Name, metav1.DeleteOptions{}) + } + // create temp inspect rule + configMapTemplate := template.BinaryConfigMapTemplate(e.Task.Name, constant.DefaultNamespace, marshal, true, map[string]string{constant.LabelInspectRuleGroup: "inspect-rule-temp"}) + _, err = e.KubeClient.ClientSet.CoreV1().ConfigMaps(constant.DefaultNamespace).Create(ctx, configMapTemplate, metav1.CreateOptions{}) + if err != nil { + return nil, err + } + return r, nil +} + +func sortRuleOpaToAtLast(rule []kubeeyev1alpha2.JobRule) []kubeeyev1alpha2.JobRule { + + finds, b, OpaRule := utils.ArrayFinds(rule, func(i kubeeyev1alpha2.JobRule) bool { + return i.RuleType == constant.Opa + }) + if b { + rule = append(rule[:finds], rule[finds+1:]...) 
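+		// re-append the OPA rule at the tail so its job is scheduled after all the others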
+ rule = append(rule, OpaRule) + } + + return rule +} + +func (e *ExecuteRule) GetRuleTotal() map[string]int { + return e.ruleTotal +} diff --git a/pkg/server/query/query.go b/pkg/server/query/query.go index 93761ce2..17166db3 100644 --- a/pkg/server/query/query.go +++ b/pkg/server/query/query.go @@ -132,7 +132,7 @@ func ParsePagination(values url.Values) *Pagination { } func (q *Query) GetPageData(data interface{}, c compare, f filterC) Result { - toMap, err := utils.StructToMap(data) + toMap, err := utils.ArrayStructToArrayMap(data) if err != nil { return Result{ TotalItems: 0, diff --git a/pkg/template/job_template.go b/pkg/template/job_template.go index 6fc16847..9bb102b3 100644 --- a/pkg/template/job_template.go +++ b/pkg/template/job_template.go @@ -73,6 +73,14 @@ func InspectJobsTemplate(jobConfig *conf.JobConfig, jobName string, inspectTask NodeName: nodeName, NodeSelector: nodeSelector, RestartPolicy: corev1.RestartPolicyNever, + Tolerations: []corev1.Toleration{ + { + Key: "", + Operator: corev1.TolerationOpExists, + Value: "", + Effect: "", + }, + }, Volumes: []corev1.Volume{{ Name: "root", VolumeSource: corev1.VolumeSource{ diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 84b3c603..63d1f079 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -76,7 +76,7 @@ func IsEmptyString(s string) bool { return len(strings.TrimSpace(s)) == 0 } -func StructToMap(obj interface{}) ([]map[string]interface{}, error) { +func ArrayStructToArrayMap(obj interface{}) ([]map[string]interface{}, error) { marshal, err := json.Marshal(obj) if err != nil { return nil, err @@ -89,6 +89,19 @@ func StructToMap(obj interface{}) ([]map[string]interface{}, error) { return result, nil } +func StructToMap(obj interface{}) map[string]interface{} { + marshal, err := json.Marshal(obj) + if err != nil { + return nil + } + var result map[string]interface{} + err = json.Unmarshal(marshal, &result) + if err != nil { + return nil + } + return result +} + func MapToStruct[T any](maps ...map[string]interface{}) []T { var result []T marshal, err := json.Marshal(maps) @@ -121,3 +134,10 @@ func MergeMap[T any](maps ...map[string]T) map[string]T { } return result } +func MapValConvert[T any](mapV1 map[string]interface{}) map[string]T { + var newMap = make(map[string]T) + for k, v := range mapV1 { + newMap[k] = v.(T) + } + return newMap +}