Added scheduler related metrics (Azure#674)
michaelawyu authored Feb 21, 2024
1 parent d23d9d3 commit 6be95ff
Showing 4 changed files with 88 additions and 3 deletions.
cmd/hubagent/workload/setup.go: 6 changes (5 additions, 1 deletion)
@@ -52,6 +52,8 @@ const (

    resourceChangeControllerName = "resource-change-controller"
    mcPlacementControllerName = "memberCluster-placement-controller"

    schedulerQueueName = "scheduler-queue"
)

var (
@@ -225,7 +227,9 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager,
klog.Info("Setting up scheduler")
defaultProfile := profile.NewDefaultProfile()
defaultFramework := framework.NewFramework(defaultProfile, mgr)
defaultSchedulingQueue := queue.NewSimpleClusterResourcePlacementSchedulingQueue()
defaultSchedulingQueue := queue.NewSimpleClusterResourcePlacementSchedulingQueue(
queue.WithName(schedulerQueueName),
)
defaultScheduler := scheduler.NewScheduler("DefaultScheduler", defaultFramework, defaultSchedulingQueue, mgr)
klog.Info("Starting the scheduler")
// Scheduler must run in a separate goroutine as Run() is a blocking call.
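
Note: queue.WithName is a functional option passed into the queue constructor. For readers unfamiliar with that pattern, the sketch below shows how such an option is typically wired up; the type and function names are hypothetical illustrations, not the actual internals of the fleet queue package.

package main

import "fmt"

// options is a hypothetical settings struct; the real queue package keeps its
// own internal equivalent that an option like WithName would modify.
type options struct {
    name string
}

// Option mutates the settings before the queue is constructed.
type Option func(*options)

// WithName overrides the default queue name, mirroring the call added in setup.go.
func WithName(name string) Option {
    return func(o *options) { o.name = name }
}

// newQueue applies defaults first, then each caller-supplied option in order.
func newQueue(opts ...Option) *options {
    o := &options{name: "default-queue"}
    for _, opt := range opts {
        opt(o)
    }
    return o
}

func main() {
    q := newQueue(WithName("scheduler-queue"))
    fmt.Println(q.name) // prints: scheduler-queue
}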
pkg/metrics/metrics.go: 19 changes (19 additions, 0 deletions)
@@ -50,3 +50,22 @@ var (
        }).Inc()
    }
)

// The scheduler related metrics.
var (
    // SchedulingCycleDurationMilliseconds is a Fleet scheduler metric that tracks how long it
    // takes to complete a scheduling loop run.
    SchedulingCycleDurationMilliseconds = prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "scheduling_cycle_duration_milliseconds",
            Help:    "The duration of a scheduling cycle run in milliseconds",
            Buckets: []float64{
                10, 50, 100, 500, 1000, 5000, 10000, 50000,
            },
        },
        []string{
            "is_failed",
            "needs_requeue",
        },
    )
)
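
The collector above is a prometheus.HistogramVec from client_golang, partitioned by the is_failed and needs_requeue labels; it only surfaces data once it is registered with a registry and observations are recorded against it. The sketch below walks that flow end to end with a locally created registry. How fleet actually registers this collector is not shown in this diff, so treat the registration step as an assumption made for illustration.

package main

import (
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // A histogram mirroring the one added in pkg/metrics/metrics.go.
    cycleDuration := prometheus.NewHistogramVec(
        prometheus.HistogramOpts{
            Name:    "scheduling_cycle_duration_milliseconds",
            Help:    "The duration of a scheduling cycle run in milliseconds",
            Buckets: []float64{10, 50, 100, 500, 1000, 5000, 10000, 50000},
        },
        []string{"is_failed", "needs_requeue"},
    )

    // Registration shown here is for illustration; fleet's own registration
    // point is not part of this diff.
    reg := prometheus.NewRegistry()
    reg.MustRegister(cycleDuration)

    // Record one observation, as the scheduler does at the end of a cycle.
    start := time.Now().Add(-25 * time.Millisecond) // pretend the cycle took ~25ms
    cycleDuration.
        WithLabelValues("false", "false").
        Observe(float64(time.Since(start).Milliseconds()))

    // One time series now carries a data point.
    fmt.Println(testutil.CollectAndCount(cycleDuration)) // prints: 1
}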
pkg/scheduler/scheduler.go: 15 changes (13 additions, 2 deletions)
@@ -23,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/metrics"
"go.goms.io/fleet/pkg/scheduler/framework"
"go.goms.io/fleet/pkg/scheduler/queue"
"go.goms.io/fleet/pkg/utils/controller"
@@ -80,8 +81,6 @@ func NewScheduler(
}

// ScheduleOnce performs scheduling for one single item pulled from the work queue.
//
// TO-DO (chenyu1): add scheduler related metrics.
func (s *Scheduler) scheduleOnce(ctx context.Context) {
    // Retrieve the next item (name of a CRP) from the work queue.
    //
@@ -177,18 +176,21 @@ func (s *Scheduler) scheduleOnce(ctx context.Context) {
    //
    // Note that the scheduler will enter this cycle as long as the CRP is active and an active
    // policy snapshot has been produced.
    cycleStartTime := time.Now()
    res, err := s.framework.RunSchedulingCycleFor(ctx, crp.Name, latestPolicySnapshot)
    if err != nil {
        klog.ErrorS(err, "Failed to run scheduling cycle", "clusterResourcePlacement", crpRef)
        // Requeue for later processing.
        s.queue.AddRateLimited(crpName)
        observeSchedulingCycleMetrics(cycleStartTime, true, false)
        return
    }

    // Requeue if the scheduling cycle suggests so.
    if res.Requeue {
        if res.RequeueAfter > 0 {
            s.queue.AddAfter(crpName, res.RequeueAfter)
            observeSchedulingCycleMetrics(cycleStartTime, false, true)
            return
        }
        // Untrack the key from the rate limiter.
@@ -202,7 +204,9 @@ func (s *Scheduler) scheduleOnce(ctx context.Context) {
        // finish the scheduling in multiple cycles); in such cases, rate limiter should not add
        // any delay to the requeues.
        s.queue.Add(crpName)
        observeSchedulingCycleMetrics(cycleStartTime, false, true)
    }
    observeSchedulingCycleMetrics(cycleStartTime, false, false)
}

// Run starts the scheduler.
@@ -342,3 +346,10 @@ func (s *Scheduler) addSchedulerCleanUpFinalizer(ctx context.Context, crp *fleet

    return nil
}

// observeSchedulingCycleMetrics adds a data point to the scheduling cycle duration metric.
func observeSchedulingCycleMetrics(startTime time.Time, isFailed, needsRequeue bool) {
    metrics.SchedulingCycleDurationMilliseconds.
        WithLabelValues(fmt.Sprintf("%t", isFailed), fmt.Sprintf("%t", needsRequeue)).
        Observe(float64(time.Since(startTime).Milliseconds()))
}
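
A note on the helper: the call sites in scheduleOnce record is_failed="true" when the scheduling cycle returns an error, needs_requeue="true" when the CRP is put back on the work queue, and "false"/"false" on a clean completion; fmt.Sprintf("%t", ...) renders the booleans as the literal label values "true" and "false". Also, time.Duration.Milliseconds() truncates, so a sub-millisecond cycle is observed as 0, which is why the test below expects a histogram sum of 0. A minimal standalone sketch of the same capture-start-time-and-measure pattern, using hypothetical names rather than fleet code:

package main

import (
    "fmt"
    "time"
)

// timeIt mirrors the pattern used by observeSchedulingCycleMetrics: capture a
// start time, run the work, then report the elapsed wall-clock time in whole
// milliseconds. (Hypothetical helper for illustration only.)
func timeIt(op func()) float64 {
    start := time.Now()
    op()
    // Milliseconds() truncates, so sub-millisecond work reports 0.
    return float64(time.Since(start).Milliseconds())
}

func main() {
    elapsed := timeIt(func() { time.Sleep(20 * time.Millisecond) })
    fmt.Printf("the operation took ~%vms\n", elapsed)
}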
pkg/scheduler/scheduler_test.go: 51 changes (51 additions, 0 deletions)
@@ -10,16 +10,20 @@ import (
"log"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/client_golang/prometheus/testutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
"go.goms.io/fleet/pkg/metrics"
)

const (
Expand Down Expand Up @@ -272,3 +276,50 @@ func TestAddSchedulerCleanUpFinalizer(t *testing.T) {
t.Errorf("updated CRP diff (-got, +want): %s", diff)
}
}

func TestObserveSchedulingCycleMetrics(t *testing.T) {
    metricMetadata := `
# HELP scheduling_cycle_duration_milliseconds The duration of a scheduling cycle run in milliseconds
# TYPE scheduling_cycle_duration_milliseconds histogram
`

    testCases := []struct {
        name            string
        cycleStartTime  time.Time
        wantMetricCount int
        wantHistogram   string
    }{
        {
            name:            "should observe a data point",
            cycleStartTime:  time.Now(),
            wantMetricCount: 1,
            wantHistogram: `
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="10"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="50"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="100"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="500"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="1000"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="5000"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="10000"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="50000"} 1
scheduling_cycle_duration_milliseconds_bucket{is_failed="false",needs_requeue="false",le="+Inf"} 1
scheduling_cycle_duration_milliseconds_sum{is_failed="false",needs_requeue="false"} 0
scheduling_cycle_duration_milliseconds_count{is_failed="false",needs_requeue="false"} 1
`,
        },
    }

    for _, tc := range testCases {
        t.Run(tc.name, func(t *testing.T) {
            observeSchedulingCycleMetrics(tc.cycleStartTime, false, false)

            if c := testutil.CollectAndCount(metrics.SchedulingCycleDurationMilliseconds); c != tc.wantMetricCount {
                t.Fatalf("metric counts, got %d, want %d", c, tc.wantMetricCount)
            }

            if err := testutil.CollectAndCompare(metrics.SchedulingCycleDurationMilliseconds, strings.NewReader(metricMetadata+tc.wantHistogram)); err != nil {
                t.Errorf("%s", err)
            }
        })
    }
}
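
One caveat when extending this test table: SchedulingCycleDurationMilliseconds is package-level state, so data points accumulate across test cases and across any other test in the package that records this metric, which would throw off the CollectAndCompare expectations. The sketch below clears the vector before a case runs; it is a hypothetical helper, not something this commit includes, and it relies on the Reset method that client_golang's HistogramVec inherits from MetricVec.

package scheduler

import (
    "testing"

    "go.goms.io/fleet/pkg/metrics"
)

// resetSchedulingCycleMetric clears previously recorded data points so that a
// test case only sees its own observations. (Hypothetical helper; call it at
// the start of each case, e.g. inside t.Run.)
func resetSchedulingCycleMetric(t *testing.T) {
    t.Helper()
    metrics.SchedulingCycleDurationMilliseconds.Reset()
}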
