Skip to content

Commit 6056d6a

Browse files
authored
Add Kafka broker attributes extraction (#1927)
1 parent 33372c7 commit 6056d6a

File tree

22 files changed

+1059
-239
lines changed

22 files changed

+1059
-239
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ require (
174174
github.com/prometheus/prometheus v0.300.1
175175
github.com/shirou/gopsutil v3.21.11+incompatible
176176
github.com/shirou/gopsutil/v3 v3.24.5
177+
github.com/shirou/gopsutil/v4 v4.25.3
177178
github.com/stretchr/testify v1.10.0
178179
github.com/xeipuuv/gojsonschema v1.2.0
179180
go.opentelemetry.io/collector/client v1.30.0
@@ -497,7 +498,6 @@ require (
497498
github.com/rs/cors v1.11.1 // indirect
498499
github.com/safchain/ethtool v0.0.0-20210803160452-9aa261dae9b1 // indirect
499500
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.32 // indirect
500-
github.com/shirou/gopsutil/v4 v4.25.3 // indirect
501501
github.com/shurcooL/httpfs v0.0.0-20230704072500-f1e31cf0ba5c // indirect
502502
github.com/sirupsen/logrus v1.9.3 // indirect
503503
github.com/sleepinggenius2/gosmi v0.4.4 // indirect

internal/detector/java/extract/name.go

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package extract
55

66
import (
77
"archive/zip"
8-
"bufio"
98
"context"
109
"fmt"
1110
"io"
@@ -14,6 +13,7 @@ import (
1413
"strings"
1514

1615
"github.com/aws/amazon-cloudwatch-agent/internal/detector"
16+
"github.com/aws/amazon-cloudwatch-agent/internal/detector/util"
1717
"github.com/aws/amazon-cloudwatch-agent/internal/util/collections"
1818
)
1919

@@ -23,6 +23,8 @@ const (
2323

2424
// metaManifestFile is the path to the Manifest in the Java archive.
2525
metaManifestFile = "META-INF/MANIFEST.MF"
26+
// metaManifestSeparator is the separator of the key/value pairs in the file.
27+
metaManifestSeparator = ':'
2628
// metaManifestApplicationName is a non-standard, but explicit field that should be used if present.
2729
metaManifestApplicationName = "Application-Name"
2830
// metaManifestImplementationTitle is a standard field that is conventionally used to name the application.
@@ -188,7 +190,7 @@ func (e *archiveManifestNameExtractor) Extract(ctx context.Context, process dete
188190
return "", detector.ErrIncompatibleExtractor
189191
}
190192
fallback := strings.TrimSuffix(filepath.Base(arg), filepath.Ext(arg))
191-
path, err := absPath(ctx, process, arg)
193+
path, err := util.AbsPath(ctx, process, arg)
192194
e.logger.Debug("Trying to extract name from Java Archive", "pid", process.PID(), "path", path)
193195
if err != nil {
194196
return fallback, nil
@@ -225,40 +227,22 @@ func (e *archiveManifestNameExtractor) readManifest(jarPath string) (string, err
225227
return "", err
226228
}
227229
defer rc.Close()
228-
return e.parseManifest(rc), nil
230+
return parseManifest(rc, e.fieldPriority, e.fieldLookup), nil
229231
}
230232

231-
func (e *archiveManifestNameExtractor) parseManifest(r io.Reader) string {
232-
manifest := make(map[string]string, len(e.fieldPriority))
233-
scanner := bufio.NewScanner(r)
234-
for scanner.Scan() {
235-
line := scanner.Text()
236-
if parts := strings.SplitN(line, ":", 2); len(parts) == 2 {
237-
field := strings.TrimSpace(parts[0])
238-
if e.fieldLookup.Contains(field) {
239-
manifest[field] = strings.TrimSpace(parts[1])
240-
// exit early if the highest priority field is found
241-
if field == e.fieldPriority[0] && manifest[field] != "" {
242-
return manifest[field]
243-
}
244-
}
233+
func parseManifest(r io.Reader, fieldPriority []string, fieldLookup collections.Set[string]) string {
234+
manifest := make(map[string]string, len(fieldPriority))
235+
_ = util.ReadProperties(r, metaManifestSeparator, func(key, value string) bool {
236+
if !fieldLookup.Contains(key) || value == "" {
237+
return true
245238
}
246-
}
247-
for _, field := range e.fieldPriority {
239+
manifest[key] = value
240+
return key != fieldPriority[0]
241+
})
242+
for _, field := range fieldPriority {
248243
if name := manifest[field]; name != "" {
249244
return name
250245
}
251246
}
252247
return ""
253248
}
254-
255-
func absPath(ctx context.Context, process detector.Process, path string) (string, error) {
256-
if filepath.IsAbs(path) {
257-
return path, nil
258-
}
259-
cwd, err := process.CwdWithContext(ctx)
260-
if err != nil {
261-
return "", err
262-
}
263-
return filepath.Join(cwd, path), nil
264-
}

internal/detector/java/java.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
"github.com/aws/amazon-cloudwatch-agent/internal/detector"
1212
"github.com/aws/amazon-cloudwatch-agent/internal/detector/java/extract"
13-
"github.com/aws/amazon-cloudwatch-agent/internal/detector/kafka"
13+
"github.com/aws/amazon-cloudwatch-agent/internal/detector/kafkabroker"
1414
"github.com/aws/amazon-cloudwatch-agent/internal/detector/tomcat"
1515
"github.com/aws/amazon-cloudwatch-agent/internal/detector/util"
1616
)
@@ -35,7 +35,7 @@ func NewDetector(logger *slog.Logger, nameFilter detector.NameFilter) detector.P
3535
logger: logger,
3636
subDetectors: []detector.ProcessDetector{
3737
tomcat.NewDetector(logger),
38-
kafka.NewDetector(logger),
38+
kafkabroker.NewDetector(logger),
3939
},
4040
nameExtractor: extract.NewNameExtractor(logger, nameFilter),
4141
portExtractor: extract.NewPortExtractor(),

internal/detector/kafka/README.md

Lines changed: 0 additions & 22 deletions
This file was deleted.

internal/detector/kafka/kafka.go

Lines changed: 0 additions & 86 deletions
This file was deleted.

internal/detector/kafka/kafka_test.go

Lines changed: 0 additions & 86 deletions
This file was deleted.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Kafka Broker Process Detector
2+
3+
Detects and classifies Kafka Broker processes running on the system.
4+
5+
> [!WARNING]
6+
> This is meant to be used as a sub-detector of the [Java Process Detector](../java) and will result in incomplete metadata
7+
> results if used on its own.
8+
9+
## Overview
10+
11+
The Kafka detector identifies Kafka broker instances by searching for the Kafka main class (`kafka.Kafka`) in command-line arguments.
12+
It extracts the broker configuration and identifying information from multiple sources.
13+
14+
### Extracted Attributes
15+
16+
The detector extracts the following attributes when available:
17+
- `broker.id` - Unique identifier for the Kafka broker (normalized from `node.id` in KRaft mode)
18+
- `cluster.id` - Unique identifier for the Kafka cluster
19+
20+
### Configuration Sources (in priority order)
21+
22+
1. Meta properties file in Kafka log directories
23+
2. Command line `--override` arguments
24+
3. Server properties file
25+
26+
### Sample Metadata Result
27+
```json
28+
{
29+
"categories": ["KAFKA/BROKER"],
30+
"name": "Kafka Broker",
31+
"attributes": {
32+
"broker.id": "0",
33+
"cluster.id": "WQSzAfd_RvO0TocjqhQoaA"
34+
}
35+
}
36+
```

0 commit comments

Comments
 (0)