Skip to content

Commit

Permalink
feat: improve kube api usage and reduce unnecessary reconciliations
Browse files Browse the repository at this point in the history
Signed-off-by: Soumya Ghosh Dastidar <[email protected]>
  • Loading branch information
gdsoumya committed Oct 20, 2024
1 parent cf2a376 commit d41e1a6
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 109 deletions.
15 changes: 12 additions & 3 deletions cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package main

import (
"net/http"
"time"

"k8s.io/apimachinery/pkg/util/runtime"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/util/workqueue"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
Expand All @@ -39,15 +42,21 @@ func init() {
}

func main() {
clients := clients.ClientSets{}
stop := make(chan struct{})
defer close(stop)
defer runtime.HandleCrash()

wq := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
defer wq.ShutDown()

// Get the kubeConfig and generate the ClientSets
if err := clients.GenerateClientSetFromKubeConfig(); err != nil {
clientset, err := clients.NewClientSet(stop, 5*time.Minute, wq)
if err != nil {
log.Fatalf("Unable to Get the kubeconfig, err: %v", err)
}

// Trigger the chaos metrics collection
go controller.Exporter(clients)
go controller.Exporter(clientset, wq)

//This section will start the HTTP server and expose metrics on the /metrics endpoint.
http.Handle("/metrics", promhttp.Handler())
Expand Down
27 changes: 13 additions & 14 deletions controller/collect-data.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"context"
"math"
"strconv"
"strings"
Expand All @@ -12,15 +11,15 @@ import (
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
clientTypes "k8s.io/apimachinery/pkg/types"
)

//go:generate mockgen -destination=mocks/mock_collect-data.go -package=mocks github.com/litmuschaos/chaos-exporter/controller ResultCollector

// ResultCollector is the interface for both functions, GetResultList and GetExperimentMetricsFromResult
type ResultCollector interface {
GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error)
GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error)
GetExperimentMetricsFromResult(chaosResult *litmuschaosv1alpha1.ChaosResult, clients clients.ClientSets) (bool, error)
SetResultDetails()
GetResultDetails() ChaosResultDetails
Expand All @@ -30,28 +29,28 @@ type ResultDetails struct {
}

// GetResultList returns the result list corresponding to the monitoring-enabled chaosengine
func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) (litmuschaosv1alpha1.ChaosResultList, error) {
func (r *ResultDetails) GetResultList(clients clients.ClientSets, chaosNamespace string, monitoringEnabled *MonitoringEnabled) ([]*v1alpha1.ChaosResult, error) {

chaosResultList, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosResults(chaosNamespace).List(context.Background(), metav1.ListOptions{})
chaosResultList, err := clients.ResultInformer.ChaosResults(chaosNamespace).List(labels.Everything())
if err != nil {
return litmuschaosv1alpha1.ChaosResultList{}, err
return nil, err
}
// waiting until any chaosresult found
if len(chaosResultList.Items) == 0 {
if len(chaosResultList) == 0 {
if monitoringEnabled.IsChaosResultsAvailable {
monitoringEnabled.IsChaosResultsAvailable = false
log.Warnf("No chaosresult found!")
log.Info("[Wait]: Waiting for the chaosresult ... ")
}
return litmuschaosv1alpha1.ChaosResultList{}, nil
return nil, nil
}

if !monitoringEnabled.IsChaosResultsAvailable {
log.Info("[Wait]: Cheers! Wait is over, found desired chaosresult")
monitoringEnabled.IsChaosResultsAvailable = true
}

return *chaosResultList, nil
return chaosResultList, nil
}

// GetExperimentMetricsFromResult derives all the metrics data from the chaosresult and sets it into the resultDetails struct
Expand All @@ -61,7 +60,7 @@ func (r *ResultDetails) GetExperimentMetricsFromResult(chaosResult *litmuschaosv
if err != nil {
return false, err
}
engine, err := clients.LitmusClient.LitmuschaosV1alpha1().ChaosEngines(chaosResult.Namespace).Get(context.Background(), chaosResult.Spec.EngineName, metav1.GetOptions{})
engine, err := clients.EngineInformer.ChaosEngines(chaosResult.Namespace).Get(chaosResult.Spec.EngineName)
if err != nil {
// k8serrors.IsNotFound(err) checks whether the k8s resource was found;
// this result is skipped if the k8s resource is not found.
Expand Down Expand Up @@ -267,14 +266,14 @@ func getProbeSuccessPercentage(chaosResult *litmuschaosv1alpha1.ChaosResult) (fl
// getEventsForSpecificInvolvedResource derives all the events corresponding to the specific resource
func getEventsForSpecificInvolvedResource(clients clients.ClientSets, resourceUID clientTypes.UID, chaosNamespace string) (corev1.EventList, error) {
finalEventList := corev1.EventList{}
eventsList, err := clients.KubeClient.CoreV1().Events(chaosNamespace).List(context.Background(), metav1.ListOptions{})
eventsList, err := clients.EventsInformer.Events(chaosNamespace).List(labels.Everything())
if err != nil {
return corev1.EventList{}, err
}

for _, event := range eventsList.Items {
if event.InvolvedObject.UID == resourceUID {
finalEventList.Items = append(finalEventList.Items, event)
for _, event := range eventsList {
if event != nil && event.InvolvedObject.UID == resourceUID {
finalEventList.Items = append(finalEventList.Items, *event)
}
}
return finalEventList, nil
Expand Down
7 changes: 6 additions & 1 deletion controller/collect-data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package controller_test

import (
"context"
"testing"

"github.com/litmuschaos/chaos-exporter/controller"
"github.com/litmuschaos/chaos-exporter/pkg/clients"
"github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
Expand All @@ -12,7 +14,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"testing"
"k8s.io/client-go/util/workqueue"
)

func TestGetResultList(t *testing.T) {
Expand Down Expand Up @@ -183,5 +185,8 @@ func CreateFakeClient(t *testing.T) clients.ClientSets {
cs := clients.ClientSets{}
cs.KubeClient = fake.NewSimpleClientset([]runtime.Object{}...)
cs.LitmusClient = litmusFakeClientSet.NewSimpleClientset([]runtime.Object{}...)
stopCh := make(chan struct{})
err := cs.SetupInformers(stopCh, cs.KubeClient, cs.LitmusClient, 0, workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()))
require.NoError(t, err)
return cs
}
20 changes: 13 additions & 7 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ limitations under the License.
package controller

import (
"time"

"github.com/litmuschaos/chaos-exporter/pkg/clients"
"github.com/litmuschaos/chaos-exporter/pkg/log"
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/util/workqueue"
)

// Exporter continuously collects the chaos metrics for a given chaosengine
func Exporter(clients clients.ClientSets) {
func Exporter(clientSet clients.ClientSets, wq workqueue.RateLimitingInterface) {
log.Info("Started creating Metrics")
// Register the fixed (count) chaos metrics
log.Info("Registering Fixed Metrics")
Expand All @@ -35,7 +34,7 @@ func Exporter(clients clients.ClientSets) {
ResultCollector: &ResultDetails{},
}
//gaugeMetrics := GaugeMetrics{}
overallChaosResults := litmuschaosv1alpha1.ChaosResultList{}
overallChaosResults := []*litmuschaosv1alpha1.ChaosResult{}

r.GaugeMetrics.InitializeGaugeMetrics().
RegisterFixedMetrics()
Expand All @@ -45,11 +44,18 @@ func Exporter(clients clients.ClientSets) {
IsChaosEnginesAvailable: true,
}

for {
if err := r.GetLitmusChaosMetrics(clients, &overallChaosResults, &monitoringEnabled); err != nil {
// refresh metrics whenever there's a change in chaosengine or chaosresult
// or every informer resync duration, whichever is earlier
for _, done := wq.Get(); !done; _, done = wq.Get() {
needRequeue, err := r.GetLitmusChaosMetrics(clientSet, overallChaosResults, &monitoringEnabled)
if err != nil {
log.Errorf("err: %v", err)
}
time.Sleep(1000 * time.Millisecond)
wq.Done(clients.ProcessKey)
// requeue the key after the requested delay, if any
if needRequeue != nil {
wq.AddAfter(clients.ProcessKey, *needRequeue)
}
}
}

Expand Down
33 changes: 18 additions & 15 deletions controller/handle-result-deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"fmt"
"os"
"strconv"
"time"

litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
)

// unsetDeletedChaosResults unsets the metrics corresponding to deleted chaosresults
func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults *litmuschaosv1alpha1.ChaosResultList) {
for _, oldResult := range oldChaosResults.Items {
func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newChaosResults []*litmuschaosv1alpha1.ChaosResult) {
for _, oldResult := range oldChaosResults {
found := false
for _, newResult := range newChaosResults.Items {
for _, newResult := range newChaosResults {
if oldResult.UID == newResult.UID {
found = true
break
Expand All @@ -22,7 +23,7 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC
if !found {
for _, value := range resultStore[string(oldResult.UID)] {

probeSuccesPercentage, _ := getProbeSuccessPercentage(&oldResult)
probeSuccesPercentage, _ := getProbeSuccessPercentage(oldResult)
resultDetails := initialiseResult().
setName(oldResult.Name).
setNamespace(oldResult.Namespace).
Expand All @@ -46,10 +47,13 @@ func (gaugeMetrics *GaugeMetrics) unsetDeletedChaosResults(oldChaosResults, newC

// unsetOutdatedMetrics unsets the metrics when the chaosresult verdict changes.
// If the same chaosresult is continuously repeated for the scrape interval or longer, it sets the metrics value to 0.
func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) float64 {
func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResultDetails) (float64, *time.Duration) {
scrapeTime, _ := strconv.Atoi(getEnv("TSDB_SCRAPE_INTERVAL", "10"))
result, ok := matchVerdict[string(resultDetails.UID)]
reset := false
var needRequeue *time.Duration

scrapeDuration := time.Duration(scrapeTime) * time.Second

switch ok {
case true:
Expand All @@ -59,18 +63,17 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult
gaugeMetrics.ResultVerdict.DeleteLabelValues(resultDetails.Namespace, resultDetails.Name, resultDetails.ChaosEngineName,
resultDetails.ChaosEngineContext, result.Verdict, fmt.Sprintf("%f", result.ProbeSuccessPercentage), resultDetails.AppLabel,
resultDetails.AppNs, resultDetails.AppKind, resultDetails.WorkflowName, result.FaultName)
result.Count = 1
result.Timer = time.Now()
needRequeue = &scrapeDuration
default:
// if the elapsed time has reached the scrape interval, reset the value to 0
if result.Count >= scrapeTime {
if time.Since(result.Timer) >= scrapeDuration {
reset = true
} else {
result.Count++
}
}
default:
result = initialiseResultData().
setCount(1).
setTimer(time.Now()).
setVerdictReset(false)
}

Expand All @@ -80,9 +83,9 @@ func (gaugeMetrics *GaugeMetrics) unsetOutdatedMetrics(resultDetails ChaosResult
setVerdictReset(reset)

if reset {
return float64(0)
return float64(0), needRequeue
}
return float64(1)
return float64(1), needRequeue
}

// getEnv derives the ENV and sets the default value if the env contains an empty value
Expand All @@ -105,7 +108,7 @@ func (resultDetails *ChaosResultDetails) setResultData() {
setAppLabel(resultDetails.AppLabel).
setVerdict(resultDetails.Verdict).
setFaultName(resultDetails.FaultName).
setCount(0).
setTimer(time.Now()).
setVerdictReset(false).
setProbeSuccesPercentage(resultDetails.ProbeSuccessPercentage)

Expand Down Expand Up @@ -164,8 +167,8 @@ func (resultData *ResultData) setFaultName(fault string) *ResultData {
}

// setTimer sets the timer inside resultData struct
func (resultData *ResultData) setCount(count int) *ResultData {
resultData.Count = count
func (resultData *ResultData) setTimer(timer time.Time) *ResultData {
resultData.Timer = timer
return resultData
}

Expand Down
27 changes: 12 additions & 15 deletions controller/handle-result-deletion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package controller

import (
"errors"
"testing"

"github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
)

func Test_unsetDeletedChaosResults(t *testing.T) {
Expand All @@ -15,8 +16,8 @@ func Test_unsetDeletedChaosResults(t *testing.T) {
execFunc func(details *ChaosResultDetails)
isErr bool
resultDetails *ChaosResultDetails
oldChaosResult *v1alpha1.ChaosResultList
newChaosResult *v1alpha1.ChaosResultList
oldChaosResult []*v1alpha1.ChaosResult
newChaosResult []*v1alpha1.ChaosResult
}{
{
name: "success: deleted chaosResult",
Expand All @@ -26,21 +27,17 @@ func Test_unsetDeletedChaosResults(t *testing.T) {
resultDetails: &ChaosResultDetails{
UID: "FAKE-UID-OLD",
},
oldChaosResult: &v1alpha1.ChaosResultList{
Items: []v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-OLD",
},
oldChaosResult: []*v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-OLD",
},
},
},
newChaosResult: &v1alpha1.ChaosResultList{
Items: []v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-NEW",
},
newChaosResult: []*v1alpha1.ChaosResult{
{
ObjectMeta: metav1.ObjectMeta{
UID: "FAKE-UID-NEW",
},
},
},
Expand Down
Loading

0 comments on commit d41e1a6

Please sign in to comment.