This repository has been archived by the owner on Jan 19, 2023. It is now read-only.
generated from vshn/go-bootstrap
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaccumulate.go
147 lines (126 loc) · 4.86 KB
/
accumulate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/cloudscale-ch/cloudscale-go-sdk/v2"
cloudscalev1 "github.com/vshn/provider-cloudscale/apis/cloudscale/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
const (
organizationLabel = "appuio.io/organization"
namespaceLabel = "crossplane.io/claim-namespace"
)
// AccumulateKey represents one data point ("fact") in the billing database.
// The actual value for the data point is not present in this type, as this type is just a map key, and the corresponding value is stored as a map value.
type AccumulateKey struct {
Query string
Zone string
Tenant string
Namespace string
Start time.Time
}
// String returns the full "source" string as used by the appuio-cloud-reporting
func (this AccumulateKey) String() string {
return this.Query + ":" + this.Zone + ":" + this.Tenant + ":" + this.Namespace
}
/*
accumulateBucketMetrics gets all the bucket metrics from cloudscale and puts them into a map. The map key is the "AccumulateKey",
and the value is the raw value of the data returned by cloudscale (e.g. bytes, requests). In order to construct the
correct AccumulateKey, this function needs to fetch the ObjectUsers's tags, because that's where the zone, tenant and
namespace are stored.
This method is "accumulating" data because it collects data from possibly multiple ObjectsUsers under the same
AccumulateKey. This is because the billing system can't handle multiple ObjectsUsers per namespace.
*/
func accumulateBucketMetrics(ctx context.Context, date time.Time, cloudscaleClient *cloudscale.Client, k8sclient client.Client) (map[AccumulateKey]uint64, error) {
bucketMetricsRequest := cloudscale.BucketMetricsRequest{Start: date, End: date}
bucketMetrics, err := cloudscaleClient.Metrics.GetBucketMetrics(ctx, &bucketMetricsRequest)
if err != nil {
return nil, err
}
nsTenants, err := fetchNamespaces(ctx, k8sclient)
if err != nil {
return nil, err
}
buckets, err := fetchBuckets(ctx, k8sclient)
if err != nil {
return nil, err
}
accumulated := make(map[AccumulateKey]uint64)
for _, bucketMetricsData := range bucketMetrics.Data {
name := bucketMetricsData.Subject.BucketName
ns, ok := buckets[name]
if !ok {
fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket, bucket resource %q not found\n", name)
continue
}
tenant, ok := nsTenants[ns]
if !ok {
fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket, namespace %q not found in map\n", ns)
continue
}
err = accumulateBucketMetricsForObjectsUser(accumulated, bucketMetricsData, tenant, ns)
if err != nil {
fmt.Fprintf(os.Stderr, "WARNING: Cannot sync bucket %s: %v\n", name, err)
continue
}
}
return accumulated, nil
}
func fetchBuckets(ctx context.Context, k8sclient client.Client) (map[string]string, error) {
buckets := &cloudscalev1.BucketList{}
if err := k8sclient.List(ctx, buckets, client.HasLabels{namespaceLabel}); err != nil {
return nil, fmt.Errorf("bucket list: %w", err)
}
bucketNS := map[string]string{}
for _, b := range buckets.Items {
bucketNS[b.GetBucketName()] = b.Labels[namespaceLabel]
}
return bucketNS, nil
}
func fetchNamespaces(ctx context.Context, k8sclient client.Client) (map[string]string, error) {
namespaces := &corev1.NamespaceList{}
if err := k8sclient.List(ctx, namespaces, client.HasLabels{organizationLabel}); err != nil {
return nil, fmt.Errorf("namespace list: %w", err)
}
nsTenants := map[string]string{}
for _, ns := range namespaces.Items {
nsTenants[ns.Name] = ns.Labels[organizationLabel]
}
return nsTenants, nil
}
func accumulateBucketMetricsForObjectsUser(accumulated map[AccumulateKey]uint64, bucketMetricsData cloudscale.BucketMetricsData, tenant, namespace string) error {
if len(bucketMetricsData.TimeSeries) != 1 {
return fmt.Errorf("there must be exactly one metrics data point, found %d", len(bucketMetricsData.TimeSeries))
}
// For now all the buckets have the same zone. This may change in the future if Cloudscale decides to have different
// prices for different locations.
zone := sourceZones[0]
sourceStorage := AccumulateKey{
Query: sourceQueryStorage,
Zone: zone,
Tenant: tenant,
Namespace: namespace,
Start: bucketMetricsData.TimeSeries[0].Start,
}
sourceTrafficOut := AccumulateKey{
Query: sourceQueryTrafficOut,
Zone: zone,
Tenant: tenant,
Namespace: namespace,
Start: bucketMetricsData.TimeSeries[0].Start,
}
sourceRequests := AccumulateKey{
Query: sourceQueryRequests,
Zone: zone,
Tenant: tenant,
Namespace: namespace,
Start: bucketMetricsData.TimeSeries[0].Start,
}
accumulated[sourceStorage] += uint64(bucketMetricsData.TimeSeries[0].Usage.StorageBytes)
accumulated[sourceTrafficOut] += uint64(bucketMetricsData.TimeSeries[0].Usage.SentBytes)
accumulated[sourceRequests] += uint64(bucketMetricsData.TimeSeries[0].Usage.Requests)
return nil
}