Skip to content

Commit

Permalink
Resampler implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
adityauj committed Aug 21, 2024
1 parent 362adab commit b186dca
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 56 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@
# Project specific ignores
/var
/configs


migrateTimestamps.pl
test_ccms_api.sh
220 changes: 180 additions & 40 deletions config.json
Original file line number Diff line number Diff line change
@@ -1,45 +1,185 @@
{
"metrics": {
"flops_any": {
"frequency": 15,
"aggregation": "sum"
"metrics": {
"debug_metric": {
"frequency": 60,
"aggregation": "avg"
},
"clock": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_idle": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_iowait": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_irq": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_system": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_user": {
"frequency": 60,
"aggregation": "avg"
},
"nv_mem_util": {
"frequency": 60,
"aggregation": "avg"
},
"nv_temp": {
"frequency": 60,
"aggregation": "avg"
},
"nv_sm_clock": {
"frequency": 60,
"aggregation": "avg"
},
"acc_utilization": {
"frequency": 60,
"aggregation": "avg"
},
"acc_mem_used": {
"frequency": 60,
"aggregation": "sum"
},
"acc_power": {
"frequency": 60,
"aggregation": "sum"
},
"flops_any": {
"frequency": 60,
"aggregation": "sum"
},
"flops_dp": {
"frequency": 60,
"aggregation": "sum"
},
"flops_sp": {
"frequency": 60,
"aggregation": "sum"
},
"ib_recv": {
"frequency": 60,
"aggregation": "sum"
},
"ib_xmit": {
"frequency": 60,
"aggregation": "sum"
},
"ib_recv_pkts": {
"frequency": 60,
"aggregation": "sum"
},
"ib_xmit_pkts": {
"frequency": 60,
"aggregation": "sum"
},
"cpu_power": {
"frequency": 60,
"aggregation": "sum"
},
"core_power": {
"frequency": 60,
"aggregation": "sum"
},
"mem_power": {
"frequency": 60,
"aggregation": "sum"
},
"ipc": {
"frequency": 60,
"aggregation": "avg"
},
"cpu_load": {
"frequency": 60,
"aggregation": null
},
"lustre_close": {
"frequency": 60,
"aggregation": null
},
"lustre_open": {
"frequency": 60,
"aggregation": null
},
"lustre_statfs": {
"frequency": 60,
"aggregation": null
},
"lustre_read_bytes": {
"frequency": 60,
"aggregation": null
},
"lustre_write_bytes": {
"frequency": 60,
"aggregation": null
},
"net_bw": {
"frequency": 60,
"aggregation": null
},
"file_bw": {
"frequency": 60,
"aggregation": null
},
"mem_bw": {
"frequency": 60,
"aggregation": "sum"
},
"mem_cached": {
"frequency": 60,
"aggregation": null
},
"mem_used": {
"frequency": 60,
"aggregation": null
},
"net_bytes_in": {
"frequency": 60,
"aggregation": null
},
"net_bytes_out": {
"frequency": 60,
"aggregation": null
},
"nfs4_read": {
"frequency": 60,
"aggregation": null
},
"nfs4_total": {
"frequency": 60,
"aggregation": null
},
"nfs4_write": {
"frequency": 60,
"aggregation": null
},
"vectorization_ratio": {
"frequency": 60,
"aggregation": "avg"
}
},
"flops_dp": {
"frequency": 15,
"aggregation": "sum"
"checkpoints": {
"interval": "12h",
"directory": "./var/checkpoints",
"restore": "48h"
},
"flops_sp": {
"frequency": 15,
"aggregation": "sum"
"archive": {
"interval": "168h",
"directory": "./var/archive"
},
"mem_bw": {
"frequency": 15,
"aggregation": "sum"
"http-api": {
"address": "localhost:8081",
"https-cert-file": null,
"https-key-file": null
},
"load_one": {
"frequency": 15,
"aggregation": null
},
"load_five": {
"frequency": 15,
"aggregation": null
}
},
"checkpoints": {
"interval": "12h",
"directory": "./var/checkpoints",
"restore": "48h"
},
"archive": {
"interval": "168h",
"directory": "./var/archive"
},
"http-api": {
"address": "127.0.0.1:8081",
"https-cert-file": null,
"https-key-file": null
},
"retention-in-memory": "48h",
"nats": null,
"jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0="
}
"retention-in-memory": "48h",
"nats": null,
"jwt-public-key": "kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0="
}
20 changes: 11 additions & 9 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ type ErrorResponse struct {
}

type ApiMetricData struct {
Error *string `json:"error,omitempty"`
Data util.FloatArray `json:"data,omitempty"`
From int64 `json:"from"`
To int64 `json:"to"`
Avg util.Float `json:"avg"`
Min util.Float `json:"min"`
Max util.Float `json:"max"`
Error *string `json:"error,omitempty"`
Data util.FloatArray `json:"data,omitempty"`
From int64 `json:"from"`
To int64 `json:"to"`
Resolution int64 `json:"resolution"`
Avg util.Float `json:"avg"`
Min util.Float `json:"min"`
Max util.Float `json:"max"`
}

func handleError(err error, statusCode int, rw http.ResponseWriter) {
Expand Down Expand Up @@ -234,6 +235,7 @@ type ApiQuery struct {
SubType *string `json:"subtype,omitempty"`
Metric string `json:"metric"`
Hostname string `json:"host"`
Resolution int64 `json:"resolution"`
TypeIds []string `json:"type-ids,omitempty"`
SubTypeIds []string `json:"subtype-ids,omitempty"`
ScaleFactor util.Float `json:"scale-by,omitempty"`
Expand Down Expand Up @@ -336,8 +338,8 @@ func handleQuery(rw http.ResponseWriter, r *http.Request) {
res := make([]ApiMetricData, 0, len(sels))
for _, sel := range sels {
data := ApiMetricData{}
data.Data, data.From, data.To, err = ms.Read(sel, query.Metric, req.From, req.To)
// log.Printf("data: %#v, %#v, %#v, %#v", data.Data, data.From, data.To, err)
data.Data, data.From, data.To, data.Resolution, err = ms.Read(sel, query.Metric, req.From, req.To, query.Resolution)

if err != nil {
msg := err.Error()
data.Error = &msg
Expand Down
4 changes: 4 additions & 0 deletions internal/memorystore/level.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (l *Level) sizeInBytes() int64 {
}
}

for _, child := range l.children {
size += child.sizeInBytes()
}

return size
}

Expand Down
22 changes: 15 additions & 7 deletions internal/memorystore/memorystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ClusterCockpit/cc-metric-store/internal/config"
"github.com/ClusterCockpit/cc-metric-store/internal/util"
"github.com/ClusterCockpit/cc-metric-store/pkg/resampler"
)

var (
Expand Down Expand Up @@ -178,17 +179,18 @@ func (m *MemoryStore) WriteToLevel(l *Level, selector []string, ts int64, metric
// If the level does not hold the metric itself, the data will be aggregated recursively from the children.
// The second and third return value are the actual from/to for the data. Those can be different from
// the range asked for if no data was available.
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64) ([]util.Float, int64, int64, error) {
func (m *MemoryStore) Read(selector util.Selector, metric string, from, to, resolution int64) ([]util.Float, int64, int64, int64, error) {
if from > to {
return nil, 0, 0, errors.New("invalid time range")
return nil, 0, 0, 0, errors.New("invalid time range")
}

minfo, ok := m.Metrics[metric]
if !ok {
return nil, 0, 0, errors.New("unkown metric: " + metric)
return nil, 0, 0, 0, errors.New("unkown metric: " + metric)
}

n, data := 0, make([]util.Float, (to-from)/minfo.Frequency+1)

err := m.root.findBuffers(selector, minfo.Offset, func(b *buffer) error {
cdata, cfrom, cto, err := b.read(from, to, data)
if err != nil {
Expand Down Expand Up @@ -221,21 +223,27 @@ func (m *MemoryStore) Read(selector util.Selector, metric string, from, to int64
})

if err != nil {
return nil, 0, 0, err
return nil, 0, 0, 0, err
} else if n == 0 {
return nil, 0, 0, errors.New("metric or host not found")
return nil, 0, 0, 0, errors.New("metric or host not found")
} else if n > 1 {
if minfo.Aggregation == config.AvgAggregation {
normalize := 1. / util.Float(n)
for i := 0; i < len(data); i++ {
data[i] *= normalize
}
} else if minfo.Aggregation != config.SumAggregation {
return nil, 0, 0, errors.New("invalid aggregation")
return nil, 0, 0, 0, errors.New("invalid aggregation")
}
}

return data, from, to, nil
data, resolution, err = resampler.LargestTriangleThreeBucket(data, minfo.Frequency, resolution)

if err != nil {
return nil, 0, 0, 0, err
}

return data, from, to, resolution, nil
}

// Release all buffers for the selected level and all its children that contain only
Expand Down
Loading

0 comments on commit b186dca

Please sign in to comment.