-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
process.go
194 lines (186 loc) · 10.2 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package flinkmetricsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver"
import (
"strings"
"go.opentelemetry.io/collector/pdata/pcommon"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/flinkmetricsreceiver/internal/models"
)
// processJobmanagerMetrics records one data point per recognised jobmanager
// metric, stamped with now, and then emits everything recorded under a
// resource carrying the jobmanager's host name and the jobmanager resource type.
//
// A nil jobmanagerMetrics is a no-op: nothing is recorded and nothing is
// emitted. Metric IDs without a matching case are skipped.
func (s *flinkmetricsScraper) processJobmanagerMetrics(now pcommon.Timestamp, jobmanagerMetrics *models.JobmanagerMetrics) {
	if jobmanagerMetrics == nil {
		return
	}

	// The results of the Record* calls are discarded on purpose, so a metric
	// that cannot be recorded never interrupts processing of the others.
	for _, m := range jobmanagerMetrics.Metrics {
		switch m.ID {
		// CPU, threads and class loading.
		case "Status.JVM.CPU.Load":
			_ = s.mb.RecordFlinkJvmCPULoadDataPoint(now, m.Value)
		case "Status.JVM.CPU.Time":
			_ = s.mb.RecordFlinkJvmCPUTimeDataPoint(now, m.Value)
		case "Status.JVM.Threads.Count":
			_ = s.mb.RecordFlinkJvmThreadsCountDataPoint(now, m.Value)
		case "Status.JVM.ClassLoader.ClassesLoaded":
			_ = s.mb.RecordFlinkJvmClassLoaderClassesLoadedDataPoint(now, m.Value)

		// Garbage collection (PS collectors).
		case "Status.JVM.GarbageCollector.PS_MarkSweep.Count":
			_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, m.Value, metadata.AttributeGarbageCollectorNamePSMarkSweep)
		case "Status.JVM.GarbageCollector.PS_MarkSweep.Time":
			_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, m.Value, metadata.AttributeGarbageCollectorNamePSMarkSweep)
		case "Status.JVM.GarbageCollector.PS_Scavenge.Count":
			_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, m.Value, metadata.AttributeGarbageCollectorNamePSScavenge)
		case "Status.JVM.GarbageCollector.PS_Scavenge.Time":
			_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, m.Value, metadata.AttributeGarbageCollectorNamePSScavenge)

		// Flink managed memory.
		case "Status.Flink.Memory.Managed.Used":
			_ = s.mb.RecordFlinkMemoryManagedUsedDataPoint(now, m.Value)
		case "Status.Flink.Memory.Managed.Total":
			_ = s.mb.RecordFlinkMemoryManagedTotalDataPoint(now, m.Value)

		// JVM heap.
		case "Status.JVM.Memory.Heap.Committed":
			_ = s.mb.RecordFlinkJvmMemoryHeapCommittedDataPoint(now, m.Value)
		case "Status.JVM.Memory.Heap.Max":
			_ = s.mb.RecordFlinkJvmMemoryHeapMaxDataPoint(now, m.Value)
		case "Status.JVM.Memory.Heap.Used":
			_ = s.mb.RecordFlinkJvmMemoryHeapUsedDataPoint(now, m.Value)

		// JVM non-heap.
		case "Status.JVM.Memory.NonHeap.Committed":
			_ = s.mb.RecordFlinkJvmMemoryNonheapCommittedDataPoint(now, m.Value)
		case "Status.JVM.Memory.NonHeap.Max":
			_ = s.mb.RecordFlinkJvmMemoryNonheapMaxDataPoint(now, m.Value)
		case "Status.JVM.Memory.NonHeap.Used":
			_ = s.mb.RecordFlinkJvmMemoryNonheapUsedDataPoint(now, m.Value)

		// JVM metaspace.
		case "Status.JVM.Memory.Metaspace.Committed":
			_ = s.mb.RecordFlinkJvmMemoryMetaspaceCommittedDataPoint(now, m.Value)
		case "Status.JVM.Memory.Metaspace.Max":
			_ = s.mb.RecordFlinkJvmMemoryMetaspaceMaxDataPoint(now, m.Value)
		case "Status.JVM.Memory.Metaspace.Used":
			_ = s.mb.RecordFlinkJvmMemoryMetaspaceUsedDataPoint(now, m.Value)

		// JVM direct buffers.
		case "Status.JVM.Memory.Direct.MemoryUsed":
			_ = s.mb.RecordFlinkJvmMemoryDirectUsedDataPoint(now, m.Value)
		case "Status.JVM.Memory.Direct.TotalCapacity":
			_ = s.mb.RecordFlinkJvmMemoryDirectTotalCapacityDataPoint(now, m.Value)

		// JVM mapped buffers.
		case "Status.JVM.Memory.Mapped.MemoryUsed":
			_ = s.mb.RecordFlinkJvmMemoryMappedUsedDataPoint(now, m.Value)
		case "Status.JVM.Memory.Mapped.TotalCapacity":
			_ = s.mb.RecordFlinkJvmMemoryMappedTotalCapacityDataPoint(now, m.Value)
		}
	}

	// Attach everything recorded above to the jobmanager resource.
	builder := s.mb.NewResourceBuilder()
	builder.SetHostName(jobmanagerMetrics.Host)
	builder.SetFlinkResourceTypeJobmanager()
	res := builder.Emit()
	s.mb.EmitForResource(metadata.WithResource(res))
}
// processTaskmanagerMetrics records one data point per recognised metric of
// every taskmanager in taskmanagerMetricInstances, stamped with now. After each
// taskmanager it emits the recorded points under a resource carrying that
// taskmanager's host name, taskmanager ID and the taskmanager resource type.
//
// Nil entries in the slice are skipped, mirroring the nil guard in
// processJobmanagerMetrics, so a missing instance cannot cause a nil-pointer
// panic. Metric IDs without a matching case are skipped.
func (s *flinkmetricsScraper) processTaskmanagerMetrics(now pcommon.Timestamp, taskmanagerMetricInstances []*models.TaskmanagerMetrics) {
	for _, taskmanagerMetrics := range taskmanagerMetricInstances {
		if taskmanagerMetrics == nil {
			continue
		}

		// The results of the Record* calls are discarded on purpose, so a
		// metric that cannot be recorded never interrupts the others.
		for _, metric := range taskmanagerMetrics.Metrics {
			switch metric.ID {
			// CPU, threads and class loading.
			case "Status.JVM.CPU.Load":
				_ = s.mb.RecordFlinkJvmCPULoadDataPoint(now, metric.Value)
			case "Status.JVM.CPU.Time":
				_ = s.mb.RecordFlinkJvmCPUTimeDataPoint(now, metric.Value)
			case "Status.JVM.Threads.Count":
				_ = s.mb.RecordFlinkJvmThreadsCountDataPoint(now, metric.Value)
			case "Status.JVM.ClassLoader.ClassesLoaded":
				_ = s.mb.RecordFlinkJvmClassLoaderClassesLoadedDataPoint(now, metric.Value)

			// Garbage collection (G1 collectors).
			case "Status.JVM.GarbageCollector.G1_Young_Generation.Count":
				_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1YoungGeneration)
			case "Status.JVM.GarbageCollector.G1_Young_Generation.Time":
				_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1YoungGeneration)
			case "Status.JVM.GarbageCollector.G1_Old_Generation.Count":
				_ = s.mb.RecordFlinkJvmGcCollectionsCountDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1OldGeneration)
			case "Status.JVM.GarbageCollector.G1_Old_Generation.Time":
				_ = s.mb.RecordFlinkJvmGcCollectionsTimeDataPoint(now, metric.Value, metadata.AttributeGarbageCollectorNameG1OldGeneration)

			// Flink managed memory.
			case "Status.Flink.Memory.Managed.Used":
				_ = s.mb.RecordFlinkMemoryManagedUsedDataPoint(now, metric.Value)
			case "Status.Flink.Memory.Managed.Total":
				_ = s.mb.RecordFlinkMemoryManagedTotalDataPoint(now, metric.Value)

			// JVM heap.
			case "Status.JVM.Memory.Heap.Committed":
				_ = s.mb.RecordFlinkJvmMemoryHeapCommittedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Heap.Max":
				_ = s.mb.RecordFlinkJvmMemoryHeapMaxDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Heap.Used":
				_ = s.mb.RecordFlinkJvmMemoryHeapUsedDataPoint(now, metric.Value)

			// JVM non-heap.
			case "Status.JVM.Memory.NonHeap.Committed":
				_ = s.mb.RecordFlinkJvmMemoryNonheapCommittedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.NonHeap.Max":
				_ = s.mb.RecordFlinkJvmMemoryNonheapMaxDataPoint(now, metric.Value)
			case "Status.JVM.Memory.NonHeap.Used":
				_ = s.mb.RecordFlinkJvmMemoryNonheapUsedDataPoint(now, metric.Value)

			// JVM metaspace.
			case "Status.JVM.Memory.Metaspace.Committed":
				_ = s.mb.RecordFlinkJvmMemoryMetaspaceCommittedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Metaspace.Max":
				_ = s.mb.RecordFlinkJvmMemoryMetaspaceMaxDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Metaspace.Used":
				_ = s.mb.RecordFlinkJvmMemoryMetaspaceUsedDataPoint(now, metric.Value)

			// JVM direct buffers.
			case "Status.JVM.Memory.Direct.MemoryUsed":
				_ = s.mb.RecordFlinkJvmMemoryDirectUsedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Direct.TotalCapacity":
				_ = s.mb.RecordFlinkJvmMemoryDirectTotalCapacityDataPoint(now, metric.Value)

			// JVM mapped buffers.
			case "Status.JVM.Memory.Mapped.MemoryUsed":
				_ = s.mb.RecordFlinkJvmMemoryMappedUsedDataPoint(now, metric.Value)
			case "Status.JVM.Memory.Mapped.TotalCapacity":
				_ = s.mb.RecordFlinkJvmMemoryMappedTotalCapacityDataPoint(now, metric.Value)
			}
		}

		// Emit once per taskmanager so its points land on its own resource.
		rb := s.mb.NewResourceBuilder()
		rb.SetHostName(taskmanagerMetrics.Host)
		rb.SetFlinkTaskmanagerID(taskmanagerMetrics.TaskmanagerID)
		rb.SetFlinkResourceTypeTaskmanager()
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
// processJobsMetrics records one data point per recognised metric of every job
// in jobsMetricsInstances, stamped with now. After each job it emits the
// recorded points under a resource carrying that job's host name and job name.
//
// Nil entries in the slice are skipped, mirroring the nil guard in
// processJobmanagerMetrics, so a missing instance cannot cause a nil-pointer
// panic. Metric IDs without a matching case are skipped.
func (s *flinkmetricsScraper) processJobsMetrics(now pcommon.Timestamp, jobsMetricsInstances []*models.JobMetrics) {
	for _, jobsMetrics := range jobsMetricsInstances {
		if jobsMetrics == nil {
			continue
		}

		// The results of the Record* calls are discarded on purpose, so a
		// metric that cannot be recorded never interrupts the others.
		for _, metric := range jobsMetrics.Metrics {
			switch metric.ID {
			case "numRestarts":
				_ = s.mb.RecordFlinkJobRestartCountDataPoint(now, metric.Value)
			case "lastCheckpointSize":
				_ = s.mb.RecordFlinkJobLastCheckpointSizeDataPoint(now, metric.Value)
			case "lastCheckpointDuration":
				_ = s.mb.RecordFlinkJobLastCheckpointTimeDataPoint(now, metric.Value)
			case "numberOfInProgressCheckpoints":
				_ = s.mb.RecordFlinkJobCheckpointInProgressDataPoint(now, metric.Value)
			// Completed and failed checkpoints share one metric and are
			// distinguished by the checkpoint attribute.
			case "numberOfCompletedCheckpoints":
				_ = s.mb.RecordFlinkJobCheckpointCountDataPoint(now, metric.Value, metadata.AttributeCheckpointCompleted)
			case "numberOfFailedCheckpoints":
				_ = s.mb.RecordFlinkJobCheckpointCountDataPoint(now, metric.Value, metadata.AttributeCheckpointFailed)
			}
		}

		// Emit once per job so its points land on its own resource.
		rb := s.mb.NewResourceBuilder()
		rb.SetHostName(jobsMetrics.Host)
		rb.SetFlinkJobName(jobsMetrics.JobName)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}
// processSubtaskMetrics records task- and operator-level data points for every
// subtask in subtaskMetricsInstances, stamped with now. After each subtask it
// emits the recorded points under a resource carrying the subtask's host name,
// taskmanager ID, job name, task name and subtask index.
//
// Task metrics are matched by exact ID. IDs of the form
// "<operator>.<metric name>" are treated as operator metrics, and the part
// before the metric-name suffix is passed on as the operator name. Any other
// ID is skipped.
//
// Nil entries in the slice are skipped, mirroring the nil guard in
// processJobmanagerMetrics, so a missing instance cannot cause a nil-pointer
// panic.
func (s *flinkmetricsScraper) processSubtaskMetrics(now pcommon.Timestamp, subtaskMetricsInstances []*models.SubtaskMetrics) {
	// Operator metric names, including the dot that separates them from the
	// operator name.
	const (
		recordsInSuffix       = ".numRecordsIn"
		recordsOutSuffix      = ".numRecordsOut"
		lateDroppedSuffix     = ".numLateRecordsDropped"
		outputWatermarkSuffix = ".currentOutputWatermark"
	)

	for _, subtaskMetrics := range subtaskMetricsInstances {
		if subtaskMetrics == nil {
			continue
		}

		// The results of the Record* calls are discarded on purpose, so a
		// metric that cannot be recorded never interrupts the others.
		for _, metric := range subtaskMetrics.Metrics {
			id := metric.ID
			switch {
			// record task metrics
			case id == "numRecordsIn":
				_ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordIn)
			case id == "numRecordsOut":
				_ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordOut)
			case id == "numLateRecordsDropped":
				_ = s.mb.RecordFlinkTaskRecordCountDataPoint(now, metric.Value, metadata.AttributeRecordDropped)

			// record operator metrics
			//
			// Match on the suffix rather than a substring. A substring match
			// would record IDs such as "<operator>.numRecordsInPerSecond" as
			// record counts and would truncate operator names that themselves
			// contain the metric name.
			case strings.HasSuffix(id, recordsInSuffix):
				_ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, strings.TrimSuffix(id, recordsInSuffix), metadata.AttributeRecordIn)
			case strings.HasSuffix(id, recordsOutSuffix):
				_ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, strings.TrimSuffix(id, recordsOutSuffix), metadata.AttributeRecordOut)
			case strings.HasSuffix(id, lateDroppedSuffix):
				_ = s.mb.RecordFlinkOperatorRecordCountDataPoint(now, metric.Value, strings.TrimSuffix(id, lateDroppedSuffix), metadata.AttributeRecordDropped)
			case strings.HasSuffix(id, outputWatermarkSuffix):
				_ = s.mb.RecordFlinkOperatorWatermarkOutputDataPoint(now, metric.Value, strings.TrimSuffix(id, outputWatermarkSuffix))
			}
		}

		// Emit once per subtask so its points land on its own resource.
		rb := s.mb.NewResourceBuilder()
		rb.SetHostName(subtaskMetrics.Host)
		rb.SetFlinkTaskmanagerID(subtaskMetrics.TaskmanagerID)
		rb.SetFlinkJobName(subtaskMetrics.JobName)
		rb.SetFlinkTaskName(subtaskMetrics.TaskName)
		rb.SetFlinkSubtaskIndex(subtaskMetrics.SubtaskIndex)
		s.mb.EmitForResource(metadata.WithResource(rb.Emit()))
	}
}