From 741f897a42ad94578a9a4e986f4779f376aaf798 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Thu, 24 Oct 2024 10:20:27 +0100 Subject: [PATCH] Improvements to simulation following testing (#260) (#4023) * fix test * lint Co-authored-by: Christopher Martin --- cmd/simulator/cmd/root.go | 2 +- internal/scheduler/simulator/simulator.go | 5 +- .../simulator/sink/fair_share_writer.go | 64 ----------- .../simulator/sink/queue_stats_writer.go | 101 ++++++++++++++++++ internal/scheduler/simulator/sink/sink.go | 14 +-- 5 files changed, 113 insertions(+), 73 deletions(-) delete mode 100644 internal/scheduler/simulator/sink/fair_share_writer.go create mode 100644 internal/scheduler/simulator/sink/queue_stats_writer.go diff --git a/cmd/simulator/cmd/root.go b/cmd/simulator/cmd/root.go index 6f3fcc97375..0e5951d5e4a 100644 --- a/cmd/simulator/cmd/root.go +++ b/cmd/simulator/cmd/root.go @@ -84,7 +84,7 @@ func runSimulations(cmd *cobra.Command, args []string) error { } if pathExists(outputDirPath) && overwriteDirIfExists { - err := os.Remove(outputDirPath) + err := os.RemoveAll(outputDirPath) if err != nil { return err } diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 9b4da445c1f..f600329c187 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -210,7 +210,7 @@ func (s *Simulator) Run(ctx *armadacontext.Context) error { return err } } - if time.Now().Unix()-lastLogTime.Unix() >= 15 { + if time.Now().Unix()-lastLogTime.Unix() >= 5 { ctx.Infof("Simulator time %s", s.time) lastLogTime = s.time } @@ -518,6 +518,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { return err } } + sctx.UpdateFairShares() constraints := schedulerconstraints.NewSchedulingConstraints(pool, totalResources, s.schedulingConfig, nil) sch := scheduling.NewPreemptingQueueScheduler( sctx, @@ -544,7 +545,7 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { return err } - err = s.sink.OnCycleEnd(result) + err = s.sink.OnCycleEnd(s.time, result) if err != nil { return err } diff --git a/internal/scheduler/simulator/sink/fair_share_writer.go b/internal/scheduler/simulator/sink/fair_share_writer.go deleted file mode 100644 index 43ead572a66..00000000000 --- a/internal/scheduler/simulator/sink/fair_share_writer.go +++ /dev/null @@ -1,64 +0,0 @@ -package sink - -import ( - "os" - - parquetWriter "github.com/xitongsys/parquet-go/writer" - - "github.com/armadaproject/armada/internal/common/armadacontext" - "github.com/armadaproject/armada/internal/scheduler/scheduling" -) - -type FairShareRow struct { - Ts int64 `parquet:"name=ts, type=INT64"` - Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` - Pool string `parquet:"name=pool, type=BYTE_ARRAY, convertedtype=UTF8"` - FairShare float64 `parquet:"name=fair_share, type=DOUBLE"` - AdjustedFairShare float64 `parquet:"name=adjusted_fair_share, type=DOUBLE"` - ActualShare float64 `parquet:"name=actual_share, type=DOUBLE"` -} - -type FairShareWriter struct { - writer *parquetWriter.ParquetWriter -} - -func NewFairShareWriter(path string) (*FairShareWriter, error) { - fileWriter, err := os.Create(path + "/fair_share.parquet") - if err != nil { - return nil, err - } - pw, err := parquetWriter.NewParquetWriterFromWriter(fileWriter, new(FairShareRow), 1) - if err != nil { - return nil, err - } - return &FairShareWriter{ - writer: pw, - }, nil -} - -func (j *FairShareWriter) Update(result *scheduling.SchedulerResult) error { - for _, sctx := range result.SchedulingContexts { - for _, qctx := range sctx.QueueSchedulingContexts { - row := FairShareRow{ - Ts: 0, - Queue: qctx.Queue, - Pool: sctx.Pool, - FairShare: qctx.FairShare, - AdjustedFairShare: qctx.AdjustedFairShare, - ActualShare: sctx.FairnessCostProvider.UnweightedCostFromQueue(qctx), - } - err := j.writer.Write(row) - if err != nil { - return err - } - } - } - return nil -} - -func (j *FairShareWriter) Close(ctx *armadacontext.Context) { - err := j.writer.WriteStop() - if err != nil { - ctx.Warnf("Could not clearnly close fair share parquet file: %s", err) - } -} diff --git a/internal/scheduler/simulator/sink/queue_stats_writer.go b/internal/scheduler/simulator/sink/queue_stats_writer.go new file mode 100644 index 00000000000..07b575e6509 --- /dev/null +++ b/internal/scheduler/simulator/sink/queue_stats_writer.go @@ -0,0 +1,101 @@ +package sink + +import ( + "os" + "time" + + parquetWriter "github.com/xitongsys/parquet-go/writer" + + "github.com/armadaproject/armada/internal/common/armadacontext" + "github.com/armadaproject/armada/internal/scheduler/scheduling" + "github.com/armadaproject/armada/internal/scheduler/scheduling/context" +) + +type QueueStatsRow struct { + Ts int64 `parquet:"name=ts, type=INT64"` + Queue string `parquet:"name=queue, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"` + Pool string `parquet:"name=pool, type=BYTE_ARRAY, convertedtype=UTF8"` + FairShare float64 `parquet:"name=fair_share, type=DOUBLE"` + AdjustedFairShare float64 `parquet:"name=adjusted_fair_share, type=DOUBLE"` + ActualShare float64 `parquet:"name=actual_share, type=DOUBLE"` + CpuShare float64 `parquet:"name=cpu_share, type=DOUBLE"` + MemoryShare float64 `parquet:"name=memory_share, type=DOUBLE"` + GpuShare float64 `parquet:"name=gpu_share, type=DOUBLE"` + AllocatedCPU int `parquet:"name=allocated_cpu, type=INT64"` + AllocatedMemory int `parquet:"name=allocated_memory, type=INT64"` + AllocatedGPU int `parquet:"name=allocated_gpu, type=INT64"` + NumScheduled int `parquet:"name=num_scheduled, type=INT32"` + NumPreempted int `parquet:"name=num_preempted, type=INT32"` + NumEvicted int `parquet:"name=num_evicted, type=INT32"` +} + +type QueueStatsWriter struct { + writer *parquetWriter.ParquetWriter +} + +func NewQueueStatsWriter(path string) (*QueueStatsWriter, error) { + fileWriter, err := os.Create(path + "/queue_stats.parquet") + if err != nil { + return nil, err + } + pw, err := parquetWriter.NewParquetWriterFromWriter(fileWriter, new(QueueStatsRow), 1) + if err != nil { + return nil, err + } + return &QueueStatsWriter{ + writer: pw, + }, nil +} + +func (j *QueueStatsWriter) Update(time time.Time, result *scheduling.SchedulerResult) error { + // Work out number of preemptions per queue + preemptedJobsByQueue := map[string]int{} + for _, job := range result.PreemptedJobs { + preemptedJobsByQueue[job.Job.Queue()] = preemptedJobsByQueue[job.Job.Queue()] + 1 + } + + for _, sctx := range result.SchedulingContexts { + for _, qctx := range sctx.QueueSchedulingContexts { + row := QueueStatsRow{ + Ts: time.Unix(), + Queue: qctx.Queue, + Pool: sctx.Pool, + FairShare: qctx.FairShare, + AdjustedFairShare: qctx.AdjustedFairShare, + ActualShare: sctx.FairnessCostProvider.UnweightedCostFromQueue(qctx), + CpuShare: calculateResourceShare(sctx, qctx, "cpu"), + MemoryShare: calculateResourceShare(sctx, qctx, "memory"), + GpuShare: calculateResourceShare(sctx, qctx, "nvidia.com/gpu"), + AllocatedCPU: allocatedResources(qctx, "cpu"), + AllocatedMemory: allocatedResources(qctx, "memory") / (1024 * 1024), // in MB + AllocatedGPU: allocatedResources(qctx, "nvidia.com/gpu"), + NumScheduled: len(qctx.SuccessfulJobSchedulingContexts), + NumPreempted: preemptedJobsByQueue[qctx.Queue], + NumEvicted: len(qctx.EvictedJobsById), + } + err := j.writer.Write(row) + if err != nil { + return err + } + } + } + return nil +} + +func (j *QueueStatsWriter) Close(ctx *armadacontext.Context) { + err := j.writer.WriteStop() + if err != nil { + ctx.Warnf("Could not cleanly close queue_stats parquet file: %s", err) + } +} + +func calculateResourceShare(sctx *context.SchedulingContext, qctx *context.QueueSchedulingContext, resource string) float64 { + total := sctx.Allocated.Resources[resource] + allocated := qctx.Allocated.Resources[resource] + return allocated.AsApproximateFloat64() / total.AsApproximateFloat64() +} + +func allocatedResources(qctx *context.QueueSchedulingContext, resource string) int { + allocated := qctx.Allocated.Resources[resource] + return int(allocated.AsApproximateFloat64()) +} diff --git a/internal/scheduler/simulator/sink/sink.go b/internal/scheduler/simulator/sink/sink.go index 2a65cc26711..b2c599dc07d 100644 --- a/internal/scheduler/simulator/sink/sink.go +++ b/internal/scheduler/simulator/sink/sink.go @@ -1,6 +1,8 @@ package sink import ( + "time" + "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/scheduler/scheduling" "github.com/armadaproject/armada/internal/scheduler/simulator/model" @@ -8,13 +10,13 @@ import ( type Sink interface { OnNewStateTransitions(transitions []*model.StateTransition) error - OnCycleEnd(result *scheduling.SchedulerResult) error + OnCycleEnd(time time.Time, result *scheduling.SchedulerResult) error Close(ctx *armadacontext.Context) } type ParquetSink struct { jobWriter *JobWriter - fairShareWriter *FairShareWriter + fairShareWriter *QueueStatsWriter } func NewParquetSink(outputDir string) (*ParquetSink, error) { @@ -22,7 +24,7 @@ func NewParquetSink(outputDir string) (*ParquetSink, error) { if err != nil { return nil, err } - fairShareWriter, err := NewFairShareWriter(outputDir) + fairShareWriter, err := NewQueueStatsWriter(outputDir) if err != nil { return nil, err } @@ -42,8 +44,8 @@ func (s *ParquetSink) OnNewStateTransitions(transitions []*model.StateTransition return nil } -func (s *ParquetSink) OnCycleEnd(result *scheduling.SchedulerResult) error { - err := s.fairShareWriter.Update(result) +func (s *ParquetSink) OnCycleEnd(time time.Time, result *scheduling.SchedulerResult) error { + err := s.fairShareWriter.Update(time, result) if err != nil { return err } @@ -61,7 +63,7 @@ func (s NullSink) OnNewStateTransitions(_ []*model.StateTransition) error { return nil } -func (s NullSink) OnCycleEnd(_ *scheduling.SchedulerResult) error { +func (s NullSink) OnCycleEnd(_ time.Time, _ *scheduling.SchedulerResult) error { return nil }