From 8138c2878fdb060f8fead212d93e5b091ce4763a Mon Sep 17 00:00:00 2001 From: joey Date: Tue, 6 Aug 2024 14:49:03 +0800 Subject: [PATCH] feat: collector `dropper` processor support raw data --- cmd/monitor/collector/bootstrap-agent.yaml | 9 +-------- cmd/monitor/collector/bootstrap.yaml | 4 ++++ .../monitor/oap/collector/core/model/odata/raw.go | 14 ++++++++++++++ .../plugins/processors/dropper/provider.go | 11 ++++++++++- .../plugins/receivers/collector/provider.go | 15 +++++++++++++-- 5 files changed, 42 insertions(+), 11 deletions(-) diff --git a/cmd/monitor/collector/bootstrap-agent.yaml b/cmd/monitor/collector/bootstrap-agent.yaml index 4bf357f43528..9861835e175e 100644 --- a/cmd/monitor/collector/bootstrap-agent.yaml +++ b/cmd/monitor/collector/bootstrap-agent.yaml @@ -54,19 +54,12 @@ erda.oap.collector.core: erda.oap.collector.receiver.prometheus-remote-write@default: -grpc-client@erda.core.token: - addr: "${ERDA_SERVER_GRPC_ADDR:erda-server:8096}" -erda.core.token-client: - -erda.oap.collector.authentication: - sync_interval: ${COLLECTOR_AUTHENTICATION_AK_SYNC_INTERVAL:2m} - erda.oap.collector.receiver.collector: auth: username: "${COLLECTOR_AUTH_USERNAME:collector}" password: "${COLLECTOR_AUTH_PASSWORD:G$9767bP32drYFPWrK4XMLRMTatiM6cU}" force: ${COLLECTOR_AUTH_FORCE:false} - skip: ${COLLECTOR_AUTH_SKIP:false} + skip: ${COLLECTOR_AUTH_SKIP:true} erda.oap.collector.receiver.prometheus-remote-write@external_metrics: remote_write_url: "${EXTERNAL_METRIC_REMOTE_WRITE_URL:/api/v1/external-prometheus-remote-write}" diff --git a/cmd/monitor/collector/bootstrap.yaml b/cmd/monitor/collector/bootstrap.yaml index 3568da5a6534..c488a01a870b 100644 --- a/cmd/monitor/collector/bootstrap.yaml +++ b/cmd/monitor/collector/bootstrap.yaml @@ -18,6 +18,8 @@ erda.oap.collector.core: pipelines: raws: - receivers: [ "erda.oap.collector.receiver.collector" ] + processors: + - "erda.oap.collector.processor.dropper@application-agent" exporters: [ "erda.oap.collector.exporter.kafka@collector" ] - receivers: @@ -486,6 +488,8 @@ erda.oap.collector.processor.aggregator@mem-percent: target_key: "mem_usage_percent" # ************* processors ************* +erda.oap.collector.processor.dropper@application-agent: + metric_prefix: ${APPLICATION_AGENT_DROP_PREFIX} # ************* processors ************* diff --git a/internal/tools/monitor/oap/collector/core/model/odata/raw.go b/internal/tools/monitor/oap/collector/core/model/odata/raw.go index 94367b75d6d6..5075192e11dd 100644 --- a/internal/tools/monitor/oap/collector/core/model/odata/raw.go +++ b/internal/tools/monitor/oap/collector/core/model/odata/raw.go @@ -14,6 +14,8 @@ package odata +import "encoding/json" + var _ ObservableData = &Raw{} // bytes representation of ObservableData for performance @@ -33,3 +35,15 @@ func NewRaw(data []byte) *Raw { func (r *Raw) GetTags() map[string]string { return map[string]string{} } + +type nameField struct { + Name string `json:"name"` +} + +func (r *Raw) GetName() string { + var name nameField + if err := json.Unmarshal(r.Data, &name); err != nil { + return "" + } + return name.Name +} diff --git a/internal/tools/monitor/oap/collector/plugins/processors/dropper/provider.go b/internal/tools/monitor/oap/collector/plugins/processors/dropper/provider.go index cf2ba8bc6030..f058387210e0 100644 --- a/internal/tools/monitor/oap/collector/plugins/processors/dropper/provider.go +++ b/internal/tools/monitor/oap/collector/plugins/processors/dropper/provider.go @@ -60,7 +60,16 @@ func (p *provider) ProcessLog(item *log.Log) (*log.Log, error) { return item, ni func (p *provider) ProcessSpan(item *trace.Span) (*trace.Span, error) { return item, nil } -func (p *provider) ProcessRaw(item *odata.Raw) (*odata.Raw, error) { return item, nil } +func (p *provider) ProcessRaw(item *odata.Raw) (*odata.Raw, error) { + if len(p.Cfg.MetricPrefix) == 0 { + return item, nil + } + name := item.GetName() + if len(name) > 0 && strings.HasPrefix(name, p.Cfg.MetricPrefix) { + return nil, nil + } + return item, nil +} func (p *provider) ProcessProfile(*profile.ProfileIngest) (*profile.Output, error) { return &profile.Output{}, nil diff --git a/internal/tools/monitor/oap/collector/plugins/receivers/collector/provider.go b/internal/tools/monitor/oap/collector/plugins/receivers/collector/provider.go index a44c46e120d1..ea6bde1181f1 100644 --- a/internal/tools/monitor/oap/collector/plugins/receivers/collector/provider.go +++ b/internal/tools/monitor/oap/collector/plugins/receivers/collector/provider.go @@ -41,13 +41,19 @@ var _ model.Receiver = (*provider)(nil) type provider struct { Cfg *config Log logs.Logger - Router httpserver.Router `autowired:"http-router"` - Validator authentication.Validator `autowired:"erda.oap.collector.authentication.Validator"` + Router httpserver.Router `autowired:"http-router"` + Validator authentication.Validator auth *Authenticator consumer model.ObservableDataConsumerFunc } +type skipValidator struct{} + +func (skipValidator) Validate(scope string, scopeId string, token string) bool { + return true +} + func (p *provider) ComponentClose() error { return nil } @@ -62,6 +68,11 @@ func (p *provider) RegisterConsumer(consumer model.ObservableDataConsumerFunc) { // Run this is optional func (p *provider) Init(ctx servicehub.Context) error { + if p.Cfg.Auth.Skip { + p.Validator = skipValidator{} + } else { + p.Validator = ctx.Service("erda.oap.collector.authentication.Validator").(authentication.Validator) + } p.auth = NewAuthenticator( WithLogger(p.Log), WithValidator(p.Validator),