Skip to content

Commit 9e92874

Browse files
authored
Add Kafka client detector (#1932)
1 parent d70a8bd commit 9e92874

File tree

18 files changed

+334
-24
lines changed

18 files changed

+334
-24
lines changed

cmd/workload-discovery/discovery.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"encoding/json"
99
"errors"
10+
"flag"
1011
"log/slog"
1112
"math"
1213
"os"
@@ -218,9 +219,13 @@ func convertToPid32(pid int) (int32, error) {
218219
}
219220

220221
func main() {
222+
var debug bool
223+
flag.BoolVar(&debug, "debug", false, "enable debug logging")
224+
flag.Parse()
225+
221226
cfg := Config{
222227
Concurrency: runtime.NumCPU(),
223-
LogLevel: slog.LevelDebug,
228+
LogLevel: slog.LevelInfo,
224229
Timeout: 500 * time.Millisecond,
225230
FilterConfig: filter.Config{
226231
Process: filter.ProcessConfig{
@@ -229,6 +234,9 @@ func main() {
229234
},
230235
},
231236
}
237+
if debug {
238+
cfg.LogLevel = slog.LevelDebug
239+
}
232240

233241
logger := buildLogger(cfg.LogLevel)
234242

extension/agenthealth/metadata/metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
ValueJVM = "jvm"
1818
ValueTomcat = "tomcat"
1919
ValueKafkaBroker = "kafka_broker"
20+
ValueKafkaConsumer = "kafka_consumer"
21+
ValueKafkaProducer = "kafka_producer"
2022
ValueNVIDIA = "nvidia_gpu"
2123

2224
shortKeyObservabilitySolutions = "obs"
@@ -29,6 +31,8 @@ var (
2931
Build(KeyObservabilitySolutions, ValueJVM),
3032
Build(KeyObservabilitySolutions, ValueTomcat),
3133
Build(KeyObservabilitySolutions, ValueKafkaBroker),
34+
Build(KeyObservabilitySolutions, ValueKafkaConsumer),
35+
Build(KeyObservabilitySolutions, ValueKafkaProducer),
3236
Build(KeyObservabilitySolutions, ValueNVIDIA),
3337
)
3438
shortKeyMapping = map[string]string{
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package common
5+
6+
// Archive file extensions, exported so that other detector packages can share them.
const (
7+
// ExtJAR is the file extension of a Java archive (JAR).
ExtJAR = ".jar"
8+
// ExtWAR is the file extension of a web application archive (WAR).
ExtWAR = ".war"
9+
)

internal/detector/detector.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ package detector
66
import (
77
"context"
88
"errors"
9+
10+
"github.com/shirou/gopsutil/v4/process"
911
)
1012

1113
var (
@@ -27,6 +29,8 @@ type DeviceDetector interface {
2729
Detect() (*Metadata, error)
2830
}
2931

32+
// OpenFilesStat is an alias for gopsutil's process.OpenFilesStat. It lets implementations of Process refer to the
// type through this package.
type OpenFilesStat = process.OpenFilesStat
33+
3034
// Process defines an interface for interacting with system processes.
3135
type Process interface {
3236
// PID returns the process ID.
@@ -42,4 +46,6 @@ type Process interface {
4246
EnvironWithContext(ctx context.Context) ([]string, error)
4347
// CreateTimeWithContext returns the creation time of the process in milliseconds.
4448
CreateTimeWithContext(ctx context.Context) (int64, error)
49+
// OpenFilesWithContext returns the open file descriptors of the process.
50+
OpenFilesWithContext(context.Context) ([]OpenFilesStat, error)
4551
}

internal/detector/detectortest/mock.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ func (m *MockProcess) CreateTimeWithContext(ctx context.Context) (int64, error)
5252
return int64(args.Int(0)), args.Error(1)
5353
}
5454

55+
// OpenFilesWithContext is the mock counterpart of Process.OpenFilesWithContext. It forwards ctx to m.Called and
// returns the first configured return value as the open files and the second as the error.
func (m *MockProcess) OpenFilesWithContext(ctx context.Context) ([]detector.OpenFilesStat, error) {
56+
args := m.Called(ctx)
57+
// A nil first return value would make the unchecked type assertion below panic, so return a nil slice instead.
if args.Get(0) == nil {
58+
return nil, args.Error(1)
59+
}
60+
return args.Get(0).([]detector.OpenFilesStat), args.Error(1)
61+
}
62+
5563
type MockProcessDetector struct {
5664
mock.Mock
5765
}

internal/detector/java/extract/name.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@ import (
1313
"strings"
1414

1515
"github.com/aws/amazon-cloudwatch-agent/internal/detector"
16+
"github.com/aws/amazon-cloudwatch-agent/internal/detector/common"
1617
"github.com/aws/amazon-cloudwatch-agent/internal/detector/util"
1718
"github.com/aws/amazon-cloudwatch-agent/internal/util/collections"
1819
)
1920

2021
const (
21-
extJAR = ".jar"
22-
extWAR = ".war"
23-
2422
// metaManifestFile is the path to the Manifest in the Java archive.
2523
metaManifestFile = "META-INF/MANIFEST.MF"
2624
// metaManifestSeparator is the separator of the key/value pairs in the file.
@@ -186,7 +184,7 @@ func newArchiveManifestNameExtractor(logger *slog.Logger) argNameExtractor {
186184
// Extract opens the archive and reads the manifest file. Tries to extract the name from values of specific keys in the
187185
// manifest. Prioritizes keys in the order defined in the extractor.
188186
func (e *archiveManifestNameExtractor) Extract(ctx context.Context, process detector.Process, arg string) (string, error) {
189-
if !strings.HasSuffix(arg, extJAR) && !strings.HasSuffix(arg, extWAR) {
187+
if !strings.HasSuffix(arg, common.ExtJAR) && !strings.HasSuffix(arg, common.ExtWAR) {
190188
return "", detector.ErrIncompatibleExtractor
191189
}
192190
fallback := strings.TrimSuffix(filepath.Base(arg), filepath.Ext(arg))
@@ -232,11 +230,12 @@ func (e *archiveManifestNameExtractor) readManifest(jarPath string) (string, err
232230

233231
func parseManifest(r io.Reader, fieldPriority []string, fieldLookup collections.Set[string]) string {
234232
manifest := make(map[string]string, len(fieldPriority))
235-
_ = util.ReadProperties(r, metaManifestSeparator, func(key, value string) bool {
233+
_ = util.ScanProperties(r, metaManifestSeparator, func(key, value string) bool {
236234
if !fieldLookup.Contains(key) || value == "" {
237235
return true
238236
}
239237
manifest[key] = value
238+
// stop scanning if the highest priority field is found.
240239
return key != fieldPriority[0]
241240
})
242241
for _, field := range fieldPriority {

internal/detector/java/java.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/aws/amazon-cloudwatch-agent/internal/detector"
1212
"github.com/aws/amazon-cloudwatch-agent/internal/detector/java/extract"
1313
"github.com/aws/amazon-cloudwatch-agent/internal/detector/kafkabroker"
14+
"github.com/aws/amazon-cloudwatch-agent/internal/detector/kafkaclient"
1415
"github.com/aws/amazon-cloudwatch-agent/internal/detector/tomcat"
1516
"github.com/aws/amazon-cloudwatch-agent/internal/detector/util"
1617
)
@@ -36,6 +37,7 @@ func NewDetector(logger *slog.Logger, nameFilter detector.NameFilter) detector.P
3637
subDetectors: []detector.ProcessDetector{
3738
tomcat.NewDetector(logger),
3839
kafkabroker.NewDetector(logger),
40+
kafkaclient.NewDetector(logger),
3941
},
4042
nameExtractor: extract.NewNameExtractor(logger, nameFilter),
4143
portExtractor: extract.NewPortExtractor(),

internal/detector/kafkabroker/extract/attributes.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package extract
66
import (
77
"context"
88
"errors"
9+
"fmt"
910
"log/slog"
1011
"os"
1112
"path/filepath"
@@ -127,6 +128,7 @@ func (e *attributesExtractor) extractAttributesFromMetaProperties(ctx context.Co
127128
case propertyClusterID:
128129
attributes[key] = value
129130
}
131+
// stop scanning the properties once both attributes have been set
130132
return len(attributes) < 2
131133
})
132134
if err != nil {
@@ -170,7 +172,7 @@ func (e *attributesExtractor) parseArgs(args []string) (*brokerInfo, error) {
170172
}
171173

172174
if info == nil {
173-
return nil, detector.ErrIncompatibleExtractor
175+
return nil, fmt.Errorf("%w: Class (%s) not found in command-line", detector.ErrIncompatibleExtractor, brokerClassName)
174176
}
175177
return info, nil
176178
}
@@ -215,7 +217,7 @@ func (e *attributesExtractor) parsePropertiesFile(path string, fn func(key, valu
215217
}
216218
defer file.Close()
217219

218-
err = util.ReadProperties(file, propertiesSeparator, fn)
220+
err = util.ScanProperties(file, propertiesSeparator, fn)
219221
if err != nil && !errors.Is(err, util.ErrLineLimitExceeded) {
220222
return err
221223
}

internal/detector/kafkabroker/kafkabroker.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,10 @@ func NewDetector(logger *slog.Logger) detector.ProcessDetector {
3535
func (d *kafkaBrokerDetector) Detect(ctx context.Context, process detector.Process) (*detector.Metadata, error) {
3636
attributes, err := d.attributesExtractor.Extract(ctx, process)
3737
if err != nil {
38-
d.logger.Debug("Kafka Broker not detected for process", "pid", process.PID(), "error", err)
38+
d.logger.Debug("Kafka broker not detected for process", "pid", process.PID(), "error", err)
3939
return nil, detector.ErrIncompatibleDetector
4040
}
4141

42-
d.logger.Debug("Detected Kafka Broker", "pid", process.PID())
43-
4442
md := &detector.Metadata{
4543
Categories: []detector.Category{detector.CategoryKafkaBroker},
4644
Name: brokerMetadataName,
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Kafka Client Process Detector
2+
3+
Detects and classifies Kafka client processes running on the system.
4+
5+
> [!NOTE]
6+
> This detector only supports detection of processes that use the Kafka clients library.
7+
8+
> [!WARNING]
9+
> This is meant to be used as a sub-detector of the [Java Process Detector](../java) and will produce incomplete
10+
> metadata if used on its own.
11+
12+
## Overview
13+
14+
The Kafka client detector identifies Java applications that use the Kafka client library (`kafka-clients`) by searching in:
15+
1. Command line classpath (`-cp`/`-classpath`) arguments
16+
2. Open file descriptors for `kafka-clients.jar`
17+
18+
### Sample Metadata Result
19+
```json
20+
{
21+
"categories": ["KAFKA/CLIENT"]
22+
}
23+
```

0 commit comments

Comments
 (0)