diff --git a/processor/resourcedetectionprocessor/config.go b/processor/resourcedetectionprocessor/config.go index 8dd1c1f10c8b..9e0b8e9f4e34 100644 --- a/processor/resourcedetectionprocessor/config.go +++ b/processor/resourcedetectionprocessor/config.go @@ -4,8 +4,12 @@ package resourcedetectionprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor" import ( + "time" + "go.opentelemetry.io/collector/config/confighttp" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ec2" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ecs" @@ -41,6 +45,9 @@ type Config struct { // If a supplied attribute is not a valid attribute of a supplied detector it will be ignored. // Deprecated: Please use detector's resource_attributes config instead Attributes []string `mapstructure:"attributes"` + + // interval of detect action + DetectInterval time.Duration `mapstructure:"detect_interval"` } // DetectorConfig contains user-specified configurations unique to all individual detectors @@ -86,6 +93,9 @@ type DetectorConfig struct { // K8SNode contains user-specified configurations for the K8SNode detector K8SNodeConfig k8snode.Config `mapstructure:"k8snode"` + + // Http contains user-specified configurations for the Http detector + HttpConfig http.Config `mapstructure:"http"` } func detectorCreateDefaultConfig() DetectorConfig { @@ -104,6 +114,7 @@ func detectorCreateDefaultConfig() DetectorConfig { SystemConfig: system.CreateDefaultConfig(), OpenShiftConfig: openshift.CreateDefaultConfig(), K8SNodeConfig: k8snode.CreateDefaultConfig(), + HttpConfig: http.CreateDefaultConfig(), } } @@ -137,6 +148,8 @@ func (d *DetectorConfig) GetConfigFromType(detectorType internal.DetectorType) i return d.OpenShiftConfig case k8snode.TypeStr: return d.K8SNodeConfig + case http.TypeStr: + return d.HttpConfig default: return nil } diff --git a/processor/resourcedetectionprocessor/factory.go b/processor/resourcedetectionprocessor/factory.go index f08d4e1b560f..7b268be0f4dc 100644 --- a/processor/resourcedetectionprocessor/factory.go +++ b/processor/resourcedetectionprocessor/factory.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" @@ -63,6 +65,7 @@ func NewFactory() processor.Factory { system.TypeStr: system.NewDetector, openshift.TypeStr: openshift.NewDetector, k8snode.TypeStr: k8snode.NewDetector, + http.TypeStr: http.NewDetector, }) f := &factory{ @@ -90,6 +93,7 @@ func createDefaultConfig() component.Config { Override: true, Attributes: nil, DetectorConfig: detectorCreateDefaultConfig(), + DetectInterval: time.Minute * 10, // TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved, // Set the default value of 'hostname_source' here instead of 'system' detector } @@ -182,6 +186,7 @@ func (f *factory) getResourceDetectionProcessor( override: oCfg.Override, httpClientSettings: oCfg.ClientConfig, telemetrySettings: params.TelemetrySettings, + detectInterval: oCfg.DetectInterval, }, nil } diff --git a/processor/resourcedetectionprocessor/internal/http/config.go b/processor/resourcedetectionprocessor/internal/http/config.go new file mode 100644 index 000000000000..1e11a58cb794 --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/http/config.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http" +import ( + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configopaque" +) + +type Config struct { + // APIKey is the authentication token + APIKey configopaque.String `mapstructure:"api_key"` + + // API URL to use + APIURL string `mapstructure:"api_url"` + + confighttp.ClientConfig `mapstructure:",squash"` +} + +func CreateDefaultConfig() Config { + return Config{ + APIURL: "http://localhost:8080", + } +} diff --git a/processor/resourcedetectionprocessor/internal/http/http.go b/processor/resourcedetectionprocessor/internal/http/http.go new file mode 100644 index 000000000000..dd7191c8a9fd --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/http/http.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package http // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/http" +import ( + "context" + "fmt" + "io" + "net/http" + "time" + + jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configopaque" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/processor" + conventions "go.opentelemetry.io/collector/semconv/v1.22.0" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" +) + +const ( + // TypeStr is type of detector. + TypeStr = "http" +) + +type resourceAttribute struct { + Key string + Value string +} + +var _ internal.Detector = (*detector)(nil) + +type detector struct { + logger *zap.Logger + // + set component.TelemetrySettings + interval time.Duration + client *http.Client + apiURL string + apiKey configopaque.String + requestIntervalTicker *time.Ticker +} + +// NewDetector returns a detector which can detect resource attributes on Heroku +func NewDetector(set processor.Settings, dcfg internal.DetectorConfig) (internal.Detector, error) { + cfg := dcfg.(Config) + + if cfg.APIURL == "" { + return nil, fmt.Errorf("apiUrl could not be empty") + } + + return &detector{ + apiKey: cfg.APIKey, + apiURL: cfg.APIURL, + logger: set.Logger, + // TODO: interval request + interval: time.Second * 5, + }, nil +} + +// Detect detects http response metadata and returns a resource with the available ones +func (d detector) Detect(ctx context.Context) (resource pcommon.Resource, schemaURL string, err error) { + res := pcommon.NewResource() + + detectedResources := d.requestResourceAttributes() + + for _, resAttr := range detectedResources { + res.Attributes().PutStr(resAttr.Key, resAttr.Value) + } + + return res, conventions.SchemaURL, nil +} + +func (d detector) requestResourceAttributes() []resourceAttribute { + var resources []resourceAttribute + resp, err := http.Get(d.apiURL) + if err != nil { + d.logger.Warn("Failed to fetch resource", zap.Error(err)) + return resources + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + d.logger.Warn("Failed to fetch resource", zap.Error(err)) + return resources + } + + err = jsoniter.Unmarshal(body, &resources) + if err != nil { + d.logger.Warn("Failed to fetch resource", zap.Error(err)) + return resources + } + + return resources +} diff --git a/processor/resourcedetectionprocessor/internal/http/http_test.go b/processor/resourcedetectionprocessor/internal/http/http_test.go new file mode 100644 index 000000000000..0b9040329d77 --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/http/http_test.go @@ -0,0 +1,40 @@ +package http + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/processor/processortest" + conventions "go.opentelemetry.io/collector/semconv/v1.22.0" +) + +type mockMetadata struct { + mock.Mock +} + +func TestDetect(t *testing.T) { + handler := http.NotFound + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + handler(w, r) + })) + defer ts.Close() + handler = func(w http.ResponseWriter, r *http.Request) { + outPut := `[{"key":"attributes_1","value":"foo"},{"key":"attributes_2","value":"bar"}]` + _, _ = w.Write([]byte(outPut)) + } + defaultCfg := CreateDefaultConfig() + defaultCfg.APIURL = ts.URL + + d, err := NewDetector(processortest.NewNopSettings(), defaultCfg) + require.NoError(t, err) + res, schemaURL, err := d.Detect(context.Background()) + require.NoError(t, err) + require.Equal(t, 2, res.Attributes().Len()) + require.NotNil(t, res) + assert.Equal(t, conventions.SchemaURL, schemaURL) +} diff --git a/processor/resourcedetectionprocessor/internal/http/metadata.yaml b/processor/resourcedetectionprocessor/internal/http/metadata.yaml new file mode 100644 index 000000000000..a665714874a8 --- /dev/null +++ b/processor/resourcedetectionprocessor/internal/http/metadata.yaml @@ -0,0 +1,8 @@ +type: resourcedetectionprocessor/http + +parent: resourcedetection + +status: + class: pkg + codeowners: + active: [JaredTan95] \ No newline at end of file diff --git a/processor/resourcedetectionprocessor/internal/resourcedetection.go b/processor/resourcedetectionprocessor/internal/resourcedetection.go index 21b5c7cfb392..751b1587309f 100644 --- a/processor/resourcedetectionprocessor/internal/resourcedetection.go +++ b/processor/resourcedetectionprocessor/internal/resourcedetection.go @@ -106,12 +106,9 @@ func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesTo } func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resource pcommon.Resource, schemaURL string, err error) { - p.once.Do(func() { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, client.Timeout) - defer cancel() - p.detectResource(ctx) - }) + ctx, cancel := context.WithTimeout(ctx, client.Timeout) + defer cancel() + p.detectResource(ctx) return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err } diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor.go b/processor/resourcedetectionprocessor/resourcedetection_processor.go index 44f3331e6473..a6c6f2ca34e0 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor.go @@ -5,6 +5,10 @@ package resourcedetectionprocessor // import "github.com/open-telemetry/opentele import ( "context" + "net/http" + "time" + + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" @@ -23,15 +27,28 @@ type resourceDetectionProcessor struct { override bool httpClientSettings confighttp.ClientConfig telemetrySettings component.TelemetrySettings + detectInterval time.Duration } // Start is invoked during service startup. func (rdp *resourceDetectionProcessor) Start(ctx context.Context, host component.Host) error { client, _ := rdp.httpClientSettings.ToClient(ctx, host, rdp.telemetrySettings) ctx = internal.ContextWithClient(ctx, client) + go rdp.tickerDetect(ctx, client) + return nil +} + +func (rdp *resourceDetectionProcessor) tickerDetect(ctx context.Context, client *http.Client) { + intervalTicker := time.NewTicker(rdp.detectInterval) + defer intervalTicker.Stop() + var err error - rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client) - return err + for range intervalTicker.C { + rdp.resource, rdp.schemaURL, err = rdp.provider.Get(ctx, client) + if err != nil { + rdp.telemetrySettings.Logger.Error("failed to retrieve resource from provider: %v", zap.Error(err)) + } + } } // processTraces implements the ProcessTracesFunc type. diff --git a/processor/resourcedetectionprocessor/testdata/otel-col-config.yaml b/processor/resourcedetectionprocessor/testdata/otel-col-config.yaml new file mode 100644 index 000000000000..5aad8a993c64 --- /dev/null +++ b/processor/resourcedetectionprocessor/testdata/otel-col-config.yaml @@ -0,0 +1,62 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +exporters: + debug: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 200 + otlp/jaeger: + endpoint: localhost:14317 + tls: + insecure: true + retry_on_failure: + enabled: true + max_elapsed_time: 500s + sending_queue: + enabled: true + +processors: + batch: + resourcedetection/sgm: + detectors: [env,http] + timeout: 2s + override: false + detect_interval: 10s + http: + api_url: https://apifoxmock.com/m1/3552517-2307922-default/mock/auto-tagging + resourcedetection/system: + detectors: [env, system] + timeout: 2s + override: false + system: + resource_attributes: + host.name: + enabled: true + host.id: + enabled: true + os.type: + enabled: true + attributes: ["a", "b"] + +extensions: + health_check: + pprof: + endpoint: :1888 + zpages: + endpoint: :55679 + +service: + extensions: [pprof, zpages, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch,resourcedetection/sgm] + exporters: [debug,otlp/jaeger] + metrics: + receivers: [otlp] + processors: [batch,resourcedetection/sgm] + exporters: [debug]