diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index 51b8a47d1a33..b281c3c8906e 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -233,6 +233,7 @@ connectors: - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector v0.111.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector v0.111.0 - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/slowsqlconnector v0.111.0 + - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector v0.111.0 providers: - gomod: go.opentelemetry.io/collector/confmap/provider/envprovider v1.17.0 @@ -502,4 +503,5 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil => ../../internal/grpcutil - github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudmonitoringreceiver => ../../receiver/googlecloudmonitoringreceiver - github.com/open-telemetry/opentelemetry-collector-contrib/connector/slowsqlconnector => ../../connector/slowsqlconnector + - github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector => ../../connector/tracedurationconnector diff --git a/connector/slowsqlconnector/go.mod b/connector/slowsqlconnector/go.mod index 45e41896652c..f9753db01790 100644 --- a/connector/slowsqlconnector/go.mod +++ b/connector/slowsqlconnector/go.mod @@ -32,6 +32,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect diff --git a/connector/slowsqlconnector/go.sum b/connector/slowsqlconnector/go.sum index 350e548e2cee..fb6c31e91089 100644 --- a/connector/slowsqlconnector/go.sum +++ b/connector/slowsqlconnector/go.sum @@ -25,6 +25,7 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= diff --git a/connector/tracedurationconnector/Makefile b/connector/tracedurationconnector/Makefile new file mode 100644 index 000000000000..c1496226e590 --- /dev/null +++ b/connector/tracedurationconnector/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common \ No newline at end of file diff --git a/connector/tracedurationconnector/README.md b/connector/tracedurationconnector/README.md new file mode 100644 index 000000000000..7d52eb2af0b9 --- /dev/null +++ b/connector/tracedurationconnector/README.md @@ -0,0 +1,88 @@ +# Group by Trace connector + +| Status | | +| ------------- |-----------| +| Distributions | [contrib] | +| Warnings | [Statefulness](#warnings) | +| Issues | [![Open 
issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aconnector%2Ftraceduration%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aconnector%2Ftraceduration) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aconnector%2Ftraceduration%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aconnector%2Ftraceduration) |
+| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@JaredTan95](https://www.github.com/JaredTan95) |
+
+[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha
+[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
+
+## Supported Pipeline Types
+
+| [Exporter Pipeline Type] | [Receiver Pipeline Type] | [Stability Level] |
+| ------------------------ | ------------------------ | ----------------- |
+| traces | metrics | [alpha] |
+| traces | logs | [alpha] |
+
+[Exporter Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#exporter-pipeline-type
+[Receiver Pipeline Type]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md#receiver-pipeline-type
+[Stability Level]: https://github.com/open-telemetry/opentelemetry-collector#stability-levels
+
+
+This connector collects all the spans from the same trace, waiting a
+pre-determined amount of time before releasing the trace to the next consumer.
+The expectation is that, generally, traces will be complete after the given time.
+
+This connector should be used whenever a downstream component requires grouped traces to make decisions,
+such as a tail-based sampler or a per-trace metrics processor. Note that [`tailsamplingprocessor`](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/tailsamplingprocessor)
+also implements a similar mechanism and can be used independently.
+
+The batch processor shouldn't be used before this connector, as this connector will
+probably undo part (or much) of the work that the batch processor performs. It's
+fine to have the batch processor run right after this connector, and every entry in the
+batch will be a complete trace.
+
+Please refer to [config.go](./config.go) for the config spec.
+
+Examples:
+
+```yaml
+connectors:
+  traceduration:
+  traceduration/2:
+    wait_duration: 10s
+    num_traces: 1000
+    num_workers: 2
+```
+
+## Configuration
+
+Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the connector.
+
+The `num_traces` (default=1,000,000) property sets the maximum number of traces to keep in the internal storage. A higher `num_traces` might incur higher memory usage.
+
+The `wait_duration` (default=1s) property sets how long traces are kept in the internal storage. Once a trace has been kept for this duration, it is released to the next consumer and removed from the internal storage. Spans arriving for a trace that has already been released start a new trace and are kept for the entire duration again.

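+
+Because this component is a connector rather than a processor, it is declared under `connectors:` (as in the example above, using the `traceduration` type name registered by the factory) and then wired between a traces pipeline and a logs pipeline. A minimal wiring sketch, assuming an OTLP receiver and the `debug` exporter are defined elsewhere in the configuration:
+
+```yaml
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      exporters: [traceduration]
+    logs:
+      receivers: [traceduration]
+      exporters: [debug]
+```
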
+
+The `num_workers` (default=1) property controls how many concurrent workers the connector will use to process traces. If you are looking to optimize this value,
+then using `GOMAXPROCS` could be considered as a starting point.
+
+## Metrics
+
+The following metrics are recorded by this connector:
+
+* `otelcol_processor_groupbytrace_conf_num_traces` represents the maximum number of traces that can be kept by the internal storage. This value comes from the connector's configuration and will never change over the lifecycle of the connector.
+* `otelcol_processor_groupbytrace_event_latency_bucket`, with the following `event` tag values:
+  * `onTraceReceived` represents the number of trace fragments the connector has received from the previous components
+  * `onTraceExpired` represents the number of traces that finished waiting in memory for spans to arrive
+  * `onTraceReleased` represents the number of traces that have been marked as released to the next component
+  * `onTraceRemoved` represents the number of traces that have been marked for removal from the internal storage
+* `otelcol_processor_groupbytrace_num_events_in_queue` representing the state of the internal queue. Ideally, this number would be close to zero, but it might have temporary spikes if the storage is slow.
+* `otelcol_processor_groupbytrace_num_traces_in_memory` representing the state of the internal trace storage, waiting for spans to arrive. It's common to have items in memory all the time if the connector has a continuous flow of data. The longer the `wait_duration`, the higher the number of traces in memory should be, given enough traffic.
+* `otelcol_processor_groupbytrace_spans_released` and `otelcol_processor_groupbytrace_traces_released` represent the number of spans and traces effectively released to the next component.
+* `otelcol_processor_groupbytrace_traces_evicted` represents the number of traces that have been evicted from the internal storage due to capacity problems. Ideally, this should be zero, or very close to zero, at all times. If you keep getting items evicted, increase `num_traces`.
+* `otelcol_processor_groupbytrace_incomplete_releases` represents the traces that have been marked as expired, but had previously been removed. This might be the case when a span from a trace has been received in a batch while the trace existed in the in-memory storage, but the trace has since been released/removed before the span could be added to it. This should always be very close to 0, and a high value might indicate a software bug.
+
+A healthy system would have the same value for the metric `otelcol_processor_groupbytrace_traces_released` and for three events under `otelcol_processor_groupbytrace_event_latency_bucket`: `onTraceExpired`, `onTraceRemoved` and `onTraceReleased`.
+
+The metric `otelcol_processor_groupbytrace_event_latency_bucket` is a histogram and shows how long each event took to be processed, in milliseconds. In most cases an event should take less than 5ms to be processed, but occasionally an event could take 10ms. Higher latencies are possible, but it should never really reach the last bucket, representing 1s. Event handlers taking more than 1s are abandoned automatically, and if you have multiple items in this bucket, it might indicate a bug in the software.

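+
+While the metrics above describe the connector's internal machinery, the data it emits downstream is one log record per released trace, carrying the root span's trace ID, span ID and start timestamp plus a small set of attributes. An illustrative sketch of those attributes (keys taken from `connector_logs.go`; the values here are hypothetical):
+
+```yaml
+attributes:
+  span.name: "GET /api/orders"   # name of the trace's root span
+  service.name: "order-service"  # service that produced the root span
+  duration: 1250000000           # trace duration, intended to be in nanoseconds
+```
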
+
+Most metrics are updated when the events occur, except for the following ones, which are updated periodically:
+* `otelcol_processor_groupbytrace_num_events_in_queue`
+* `otelcol_processor_groupbytrace_num_traces_in_memory`
+
+## Warnings
+
+- [Statefulness](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#statefulness): This connector works best when all spans for a trace are sent to the same collector instance.
diff --git a/connector/tracedurationconnector/config.go b/connector/tracedurationconnector/config.go
new file mode 100644
index 000000000000..f0146c2e2ed6
--- /dev/null
+++ b/connector/tracedurationconnector/config.go
@@ -0,0 +1,44 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package tracedurationconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector"
+
+import (
+	"time"
+)
+
+// Config is the configuration for the connector.
+type Config struct {
+
+	// NumTraces is the max number of traces to keep in memory waiting for the duration.
+	// Default: 1_000_000.
+	NumTraces int `mapstructure:"num_traces"`
+
+	// NumWorkers is the number of workers processing the event queue.
+	// Default: 1.
+	NumWorkers int `mapstructure:"num_workers"`
+
+	// WaitDuration tells the connector to wait for the specified duration for the trace to be complete.
+	// Default: 1s.
+	WaitDuration time.Duration `mapstructure:"wait_duration"`
+
+	// DiscardOrphans instructs the connector to discard traces without the root span.
+	// This typically indicates that the trace is incomplete.
+	// Default: false.
+	// Not yet implemented, and an error will be returned when this option is used.
+	DiscardOrphans bool `mapstructure:"discard_orphans"`
+
+	// StoreOnDisk tells the connector to keep only the trace ID in memory, serializing the trace spans to disk.
+	// Useful when the duration to wait for traces to complete is high.
+	// Default: false.
+	// Not yet implemented, and an error will be returned when this option is used.
+	StoreOnDisk bool `mapstructure:"store_on_disk"`
+
+	Dimensions []Dimension `mapstructure:"dimensions"`
+}
+
+// Dimension defines the dimension name and an optional default value to use if the dimension is missing from the span and resource attributes.

+type Dimension struct { + Name string `mapstructure:"name"` + Default *string `mapstructure:"default"` +} diff --git a/connector/tracedurationconnector/connector_logs.go b/connector/tracedurationconnector/connector_logs.go new file mode 100644 index 000000000000..ebf446f73b91 --- /dev/null +++ b/connector/tracedurationconnector/connector_logs.go @@ -0,0 +1,358 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/collector/connector" + conventions "go.opentelemetry.io/collector/semconv/v1.27.0" + "go.uber.org/multierr" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil" +) + +const ( + serviceNameKey = conventions.AttributeServiceName + // TODO(marctc): formalize these constants in the OpenTelemetry specification. + spanNameKey = "span.name" // OpenTelemetry non-standard constan + durationNameKey = "duration" +) + +type logsConnector struct { + config Config + logger *zap.Logger + + // Additional dimensions to add to logs. + dimensions []pdatautil.Dimension + logsConsumer consumer.Logs + component.StartFunc + + component.ShutdownFunc + + // TODO: + telemetryBuilder *metadata.TelemetryBuilder + // the event machine handling all operations for this processor + eventMachine *eventMachine + + // the trace storage + st storage +} + +func newLogsConnector(set connector.Settings, config component.Config) *logsConnector { + cfg := config.(*Config) + + telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + if err != nil { + return nil + } + + // the event machine will buffer up to N concurrent events before blocking + eventMachine := newEventMachine(set.Logger, 10000, cfg.NumWorkers, cfg.NumTraces, telemetryBuilder) + + lc := &logsConnector{ + logger: set.Logger, + config: *cfg, + telemetryBuilder: telemetryBuilder, + eventMachine: eventMachine, + } + + // register the callbacks + eventMachine.onTraceReceived = lc.onTraceReceived + eventMachine.onTraceExpired = lc.onTraceExpired + eventMachine.onTraceReleased = lc.onTraceReleased + eventMachine.onTraceRemoved = lc.onTraceRemoved + + return lc +} + +// Start is invoked during service startup. +func (c *logsConnector) Start(_ context.Context, _ component.Host) error { + // start these metrics, as it might take a while for them to receive their first event + c.telemetryBuilder.ProcessorGroupbytraceTracesEvicted.Add(context.Background(), 0) + c.telemetryBuilder.ProcessorGroupbytraceIncompleteReleases.Add(context.Background(), 0) + c.telemetryBuilder.ProcessorGroupbytraceConfNumTraces.Record(context.Background(), int64(c.config.NumTraces)) + c.eventMachine.startInBackground() + return c.st.start() +} + +// Shutdown is invoked during service shutdown. +func (c *logsConnector) Shutdown(_ context.Context) error { + c.eventMachine.shutdown() + return c.st.shutdown() +} + +// Capabilities implements the consumer interface. 
+func (c *logsConnector) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func (c *logsConnector) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + var errs error + for _, singleTrace := range batchpersignal.SplitTraces(td) { + errs = multierr.Append(errs, c.eventMachine.consume(singleTrace)) + } + return errs +} + +// resourceSpans as a single trace's spans +func (c *logsConnector) exportTracesAsLogs(ctx context.Context, resourceSpans []ptrace.ResourceSpans) error { + ld := plog.NewLogs() + sl := c.newScopeLogs(ld) + var minSpanStartTime, maxSpanEndTime int64 + var serviceName, spanName string + var traceId pcommon.TraceID + var spanId pcommon.SpanID + var stmp pcommon.Timestamp + for i := 0; i < len(resourceSpans); i++ { + rspans := resourceSpans[i] + resourceAttr := rspans.Resource().Attributes() + serviceAttr, ok := resourceAttr.Get(conventions.AttributeServiceName) + if !ok { + continue + } + serviceName = serviceAttr.Str() + ilsSlice := rspans.ScopeSpans() + for j := 0; j < ilsSlice.Len(); j++ { + ils := ilsSlice.At(j) + //ils.Scope().CopyTo(sl.Scope()) + spans := ils.Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + starTime := int64(span.StartTimestamp().AsTime().Nanosecond()) + endTime := int64(span.EndTimestamp().AsTime().Nanosecond()) + + // parentId is empty represent its root span. + // 1. root span duration represent trace duration in most common case. + // 2. Asynchronous scenarios need to calculate duration of the minimum start and maximum end times of the entire span + if span.ParentSpanID().IsEmpty() { + traceId = span.TraceID() + spanId = span.SpanID() + spanName = span.Name() + stmp = span.StartTimestamp() + } else { + if minSpanStartTime == 0 || starTime < minSpanStartTime { + minSpanStartTime = starTime + } + + if maxSpanEndTime == 0 || endTime > maxSpanEndTime { + maxSpanEndTime = endTime + } + + } + } + } + } + c.toLogRecord(sl, serviceName, spanName, traceId, spanId, stmp, maxSpanEndTime-minSpanStartTime) + + return c.exportLogs(ctx, ld) +} + +func (c *logsConnector) exportLogs(ctx context.Context, ld plog.Logs) error { + if err := c.logsConsumer.ConsumeLogs(ctx, ld); err != nil { + c.logger.Error("failed to convert exceptions to logs", zap.Error(err)) + return err + } + return nil +} + +func (c *logsConnector) newScopeLogs(ld plog.Logs) plog.ScopeLogs { + rl := ld.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + return sl +} + +func (c *logsConnector) attrToLogRecord(sl plog.ScopeLogs, serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) plog.LogRecord { + logRecord := sl.LogRecords().AppendEmpty() + logRecord.SetSeverityNumber(plog.SeverityNumberInfo) + logRecord.SetSpanID(span.SpanID()) + logRecord.SetTraceID(span.TraceID()) + spanAttrs := span.Attributes() + + // Copy span attributes to the log record. + // spanAttrs.CopyTo(logRecord.Attributes()) + + // Add common attributes to the log record. + logRecord.Attributes().PutStr(spanNameKey, span.Name()) + logRecord.Attributes().PutStr(serviceNameKey, serviceName) + + // Add configured dimension attributes to the log record. 
+ for _, d := range c.dimensions { + if v, ok := pdatautil.GetDimensionValue(d, spanAttrs, resourceAttrs); ok { + logRecord.Attributes().PutStr(d.Name, v.Str()) + } + } + return logRecord +} + +func (c *logsConnector) toLogRecord(sl plog.ScopeLogs, serviceName, spanName string, traceId pcommon.TraceID, spanId pcommon.SpanID, stmp pcommon.Timestamp, traceDuration int64) plog.LogRecord { + logRecord := sl.LogRecords().AppendEmpty() + logRecord.SetSeverityNumber(plog.SeverityNumberInfo) + logRecord.SetSpanID(spanId) + logRecord.SetTraceID(traceId) + logRecord.SetTimestamp(stmp) + + // Add common attributes to the log record. + logRecord.Attributes().PutStr(spanNameKey, spanName) + logRecord.Attributes().PutStr(serviceNameKey, serviceName) + // Unix Nano + logRecord.Attributes().PutInt(durationNameKey, traceDuration) + return logRecord +} + +func (c *logsConnector) onTraceReceived(trace tracesWithID, worker *eventMachineWorker) error { + traceID := trace.id + if worker.buffer.contains(traceID) { + c.logger.Debug("trace is already in memory storage") + + // it exists in memory already, just append the spans to the trace in the storage + if err := c.addSpans(traceID, trace.td); err != nil { + return fmt.Errorf("couldn't add spans to existing trace: %w", err) + } + + // we are done with this trace, move on + return nil + } + + // at this point, we determined that we haven't seen the trace yet, so, record the + // traceID in the map and the spans to the storage + + // place the trace ID in the buffer, and check if an item had to be evicted + evicted := worker.buffer.put(traceID) + if !evicted.IsEmpty() { + // delete from the storage + worker.fire(event{ + typ: traceRemoved, + payload: evicted, + }) + + rs, err := c.st.get(evicted) + if err != nil || rs == nil { + c.logger.Error("failed to retrieve trace from storage", zap.Error(err), zap.Stringer("traceID", evicted)) + } + + err = c.exportTracesAsLogs(context.Background(), rs) + if err != nil { + c.logger.Error("failed to export traces", zap.Error(err), zap.Stringer("traceID", evicted)) + } + + c.telemetryBuilder.ProcessorGroupbytraceTracesEvicted.Add(context.Background(), 1) + + c.logger.Info("trace evicted: in order to avoid this in the future, adjust the wait duration and/or number of traces to keep in memory", + zap.Stringer("traceID", evicted)) + } + + // we have the traceID in the memory, place the spans in the storage too + if err := c.addSpans(traceID, trace.td); err != nil { + return fmt.Errorf("couldn't add spans to existing trace: %w", err) + } + + c.logger.Debug("scheduled to release trace", zap.Duration("duration", c.config.WaitDuration)) + + time.AfterFunc(c.config.WaitDuration, func() { + // if the event machine has stopped, it will just discard the event + worker.fire(event{ + typ: traceExpired, + payload: traceID, + }) + }) + return nil +} + +func (c *logsConnector) onTraceExpired(traceID pcommon.TraceID, worker *eventMachineWorker) error { + c.logger.Debug("processing expired", zap.Stringer("traceID", traceID)) + + if !worker.buffer.contains(traceID) { + // we likely received multiple batches with spans for the same trace + // and released this trace already + c.logger.Debug("skipping the processing of expired trace", zap.Stringer("traceID", traceID)) + c.telemetryBuilder.ProcessorGroupbytraceIncompleteReleases.Add(context.Background(), 1) + return nil + } + + // delete from the map and erase its memory entry + worker.buffer.delete(traceID) + + // this might block, but we don't need to wait + c.logger.Debug("marking the trace as 
released", zap.Stringer("traceID", traceID)) + go func() { + _ = c.markAsReleased(traceID, worker.fire) + }() + + return nil +} + +func (c *logsConnector) markAsReleased(traceID pcommon.TraceID, fire func(...event)) error { + // #get is a potentially blocking operation + trace, err := c.st.get(traceID) + if err != nil { + return fmt.Errorf("couldn't retrieve trace %q from the storage: %w", traceID, err) + } + + if trace == nil { + return fmt.Errorf("the trace %q couldn't be found at the storage", traceID) + } + + // signal that the trace is ready to be released + c.logger.Debug("trace marked as released", zap.Stringer("traceID", traceID)) + + // atomically fire the two events, so that a concurrent shutdown won't leave + // an orphaned trace in the storage + fire(event{ + typ: traceReleased, + payload: trace, + }, event{ + typ: traceRemoved, + payload: traceID, + }) + return nil +} + +func (c *logsConnector) onTraceReleased(rss []ptrace.ResourceSpans) error { + trace := ptrace.NewTraces() + for _, rs := range rss { + trs := trace.ResourceSpans().AppendEmpty() + rs.CopyTo(trs) + } + + c.telemetryBuilder.ProcessorGroupbytraceSpansReleased.Add(context.Background(), int64(trace.SpanCount())) + c.telemetryBuilder.ProcessorGroupbytraceTracesReleased.Add(context.Background(), 1) + + err := c.exportTracesAsLogs(context.Background(), rss) + if err != nil { + c.logger.Error("failed to export traces", zap.Error(err)) + } + + return nil +} + +func (c *logsConnector) onTraceRemoved(traceID pcommon.TraceID) error { + trace, err := c.st.delete(traceID) + if err != nil { + return fmt.Errorf("couldn't delete trace %q from the storage: %w", traceID, err) + } + + if trace == nil { + return fmt.Errorf("trace %q not found at the storage", traceID) + } + + return nil +} + +func (c *logsConnector) addSpans(traceID pcommon.TraceID, trace ptrace.Traces) error { + c.logger.Debug("creating trace at the storage", zap.Stringer("traceID", traceID)) + return c.st.createOrAppend(traceID, trace) +} diff --git a/connector/tracedurationconnector/doc.go b/connector/tracedurationconnector/doc.go new file mode 100644 index 000000000000..f4f8c8b65aab --- /dev/null +++ b/connector/tracedurationconnector/doc.go @@ -0,0 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:generate mdatagen metadata.yaml + +package tracedurationconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector" diff --git a/connector/tracedurationconnector/documentation.md b/connector/tracedurationconnector/documentation.md new file mode 100644 index 000000000000..fd95976fa935 --- /dev/null +++ b/connector/tracedurationconnector/documentation.md @@ -0,0 +1,71 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# traceduration + +## Internal Telemetry + +The following telemetry is emitted by this component. 
+ +### otelcol_processor_groupbytrace_conf_num_traces + +Maximum number of traces to hold in the internal storage + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### otelcol_processor_groupbytrace_event_latency + +How long the queue events are taking to be processed + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| ms | Histogram | Int | + +### otelcol_processor_groupbytrace_incomplete_releases + +Releases that are suspected to have been incomplete + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| | Sum | Int | true | + +### otelcol_processor_groupbytrace_num_events_in_queue + +Number of events currently in the queue + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### otelcol_processor_groupbytrace_num_traces_in_memory + +Number of traces currently in the in-memory storage + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Int | + +### otelcol_processor_groupbytrace_spans_released + +Spans released to the next consumer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### otelcol_processor_groupbytrace_traces_evicted + +Traces evicted from the internal buffer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | + +### otelcol_processor_groupbytrace_traces_released + +Traces released to the next consumer + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | diff --git a/connector/tracedurationconnector/event.go b/connector/tracedurationconnector/event.go new file mode 100644 index 000000000000..88fa62f17fe4 --- /dev/null +++ b/connector/tracedurationconnector/event.go @@ -0,0 +1,371 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector" + +import ( + "context" + "errors" + "fmt" + "hash/maphash" + "sync" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector/internal/metadata" +) + +const ( + // traces received from the previous processors + traceReceived eventType = iota + + // traceID to be released + traceExpired + + // released traces + traceReleased + + // traceID to be removed + traceRemoved +) + +var ( + errNoTraceID = errors.New("trace doesn't have traceID") + + seed = maphash.MakeSeed() + + hashPool = sync.Pool{ + New: func() any { + var hash maphash.Hash + hash.SetSeed(seed) + return &hash + }, + } +) + +type eventType int +type event struct { + typ eventType + payload any +} + +type tracesWithID struct { + id pcommon.TraceID + td ptrace.Traces +} + +// eventMachine is a machine that accepts events in a typically non-blocking manner, +// processing the events serially per worker scope, to ensure that data at the consumer is consistent. +// Just like the machine itself is non-blocking, consumers are expected to also not block +// on the callbacks, otherwise, events might pile up. 
When enough events are piled up, firing an +// event will block until enough capacity is available to accept the events. +type eventMachine struct { + workers []*eventMachineWorker + close chan struct{} + metricsCollectionInterval time.Duration + shutdownTimeout time.Duration + + logger *zap.Logger + telemetry *metadata.TelemetryBuilder + onTraceReceived func(td tracesWithID, worker *eventMachineWorker) error + onTraceExpired func(traceID pcommon.TraceID, worker *eventMachineWorker) error + onTraceReleased func(rss []ptrace.ResourceSpans) error + onTraceRemoved func(traceID pcommon.TraceID) error + + onError func(event) + + // shutdown sync + shutdownLock *sync.RWMutex + closed bool +} + +func newEventMachine(logger *zap.Logger, bufferSize int, numWorkers int, numTraces int, telemetry *metadata.TelemetryBuilder) *eventMachine { + em := &eventMachine{ + logger: logger, + telemetry: telemetry, + workers: make([]*eventMachineWorker, numWorkers), + close: make(chan struct{}), + shutdownLock: &sync.RWMutex{}, + metricsCollectionInterval: time.Second, + shutdownTimeout: 10 * time.Second, + } + for i := range em.workers { + em.workers[i] = &eventMachineWorker{ + machine: em, + buffer: newRingBuffer(numTraces / numWorkers), + events: make(chan event, bufferSize/numWorkers), + } + } + return em +} + +func (em *eventMachine) startInBackground() { + em.startWorkers() + go em.periodicMetrics() +} + +func (em *eventMachine) numEvents() int { + var result int + for _, worker := range em.workers { + result += len(worker.events) + } + return result +} + +func (em *eventMachine) periodicMetrics() { + numEvents := em.numEvents() + em.logger.Debug("recording current state of the queue", zap.Int("num-events", numEvents)) + em.telemetry.ProcessorGroupbytraceNumEventsInQueue.Record(context.Background(), int64(numEvents)) + + em.shutdownLock.RLock() + closed := em.closed + em.shutdownLock.RUnlock() + if closed { + return + } + + time.AfterFunc(em.metricsCollectionInterval, func() { + em.periodicMetrics() + }) +} + +func (em *eventMachine) startWorkers() { + for _, worker := range em.workers { + go worker.start() + } +} + +func (em *eventMachine) handleEvent(e event, w *eventMachineWorker) { + switch e.typ { + case traceReceived: + if em.onTraceReceived == nil { + em.logger.Debug("onTraceReceived not set, skipping event") + em.callOnError(e) + return + } + payload, ok := e.payload.(tracesWithID) + if !ok { + // the payload had an unexpected type! + em.callOnError(e) + return + } + + em.handleEventWithObservability("onTraceReceived", func() error { + return em.onTraceReceived(payload, w) + }) + case traceExpired: + if em.onTraceExpired == nil { + em.logger.Debug("onTraceExpired not set, skipping event") + em.callOnError(e) + return + } + payload, ok := e.payload.(pcommon.TraceID) + if !ok { + // the payload had an unexpected type! + em.callOnError(e) + return + } + + em.handleEventWithObservability("onTraceExpired", func() error { + return em.onTraceExpired(payload, w) + }) + case traceReleased: + if em.onTraceReleased == nil { + em.logger.Debug("onTraceReleased not set, skipping event") + em.callOnError(e) + return + } + payload, ok := e.payload.([]ptrace.ResourceSpans) + if !ok { + // the payload had an unexpected type! 
+ em.callOnError(e) + return + } + + em.handleEventWithObservability("onTraceReleased", func() error { + return em.onTraceReleased(payload) + }) + case traceRemoved: + if em.onTraceRemoved == nil { + em.logger.Debug("onTraceRemoved not set, skipping event") + em.callOnError(e) + return + } + payload, ok := e.payload.(pcommon.TraceID) + if !ok { + // the payload had an unexpected type! + em.callOnError(e) + return + } + + em.handleEventWithObservability("onTraceRemoved", func() error { + return em.onTraceRemoved(payload) + }) + default: + em.logger.Info("unknown event type", zap.Any("event", e.typ)) + em.callOnError(e) + return + } +} + +// consume takes a single trace and routes it to one of the workers. +func (em *eventMachine) consume(td ptrace.Traces) error { + traceID, err := getTraceID(td) + if err != nil { + return fmt.Errorf("eventmachine consume failed: %w", err) + } + + var bucket uint64 + if len(em.workers) != 1 { + bucket = workerIndexForTraceID(traceID, len(em.workers)) + } + + em.logger.Debug("scheduled trace to worker", zap.Uint64("id", bucket)) + + em.workers[bucket].fire(event{ + typ: traceReceived, + payload: tracesWithID{id: traceID, td: td}, + }) + return nil +} + +func workerIndexForTraceID(traceID pcommon.TraceID, numWorkers int) uint64 { + hash := hashPool.Get().(*maphash.Hash) + defer func() { + hash.Reset() + hashPool.Put(hash) + }() + + _, _ = hash.Write(traceID[:]) + return hash.Sum64() % uint64(numWorkers) +} + +func (em *eventMachine) shutdown() { + em.logger.Info("shutting down the event manager", zap.Int("pending-events", em.numEvents())) + em.shutdownLock.Lock() + em.closed = true + em.shutdownLock.Unlock() + + done := make(chan struct{}) + + // we never return an error here + ok, _ := doWithTimeout(em.shutdownTimeout, func() error { + for { + if em.numEvents() == 0 { + return nil + } + time.Sleep(100 * time.Millisecond) + + // Do not leak goroutine + select { + case <-done: + return nil + default: + } + } + }) + close(done) + + if !ok { + em.logger.Info("forcing the shutdown of the event manager", zap.Int("pending-events", em.numEvents())) + } + close(em.close) +} + +func (em *eventMachine) callOnError(e event) { + if em.onError != nil { + em.onError(e) + } +} + +// handleEventWithObservability uses the given function to process and event, +// recording the event's latency and timing out if it doesn't finish within a reasonable duration +func (em *eventMachine) handleEventWithObservability(event string, do func() error) { + start := time.Now() + succeeded, err := doWithTimeout(time.Second, do) + duration := time.Since(start) + em.telemetry.ProcessorGroupbytraceEventLatency.Record(context.Background(), duration.Milliseconds(), metric.WithAttributeSet(attribute.NewSet(attribute.String("event", event)))) + + if err != nil { + em.logger.Error("failed to process event", zap.Error(err), zap.String("event", event)) + } + if succeeded { + em.logger.Debug("event finished", zap.String("event", event)) + } else { + em.logger.Debug("event aborted", zap.String("event", event)) + } +} + +type eventMachineWorker struct { + machine *eventMachine + + // the ring buffer holds the IDs for all the in-flight traces + buffer *ringBuffer + + events chan event +} + +func (w *eventMachineWorker) start() { + for { + select { + case e := <-w.events: + w.machine.handleEvent(e, w) + case <-w.machine.close: + return + } + } +} + +func (w *eventMachineWorker) fire(events ...event) { + w.machine.shutdownLock.RLock() + defer w.machine.shutdownLock.RUnlock() + + // we are not accepting new 
events + if w.machine.closed { + return + } + + for _, e := range events { + w.events <- e + } +} + +// doWithTimeout wraps a function in a timeout, returning whether it succeeded before timing out. +// If the function returns an error within the timeout, it's considered as succeeded and the error will be returned back to the caller. +func doWithTimeout(timeout time.Duration, do func() error) (bool, error) { + done := make(chan error, 1) + go func() { + done <- do() + }() + + select { + case <-time.After(timeout): + return false, nil + case err := <-done: + return true, err + } +} + +func getTraceID(td ptrace.Traces) (pcommon.TraceID, error) { + rss := td.ResourceSpans() + if rss.Len() == 0 { + return pcommon.NewTraceIDEmpty(), errNoTraceID + } + + ilss := rss.At(0).ScopeSpans() + if ilss.Len() == 0 { + return pcommon.NewTraceIDEmpty(), errNoTraceID + } + + spans := ilss.At(0).Spans() + if spans.Len() == 0 { + return pcommon.NewTraceIDEmpty(), errNoTraceID + } + + return spans.At(0).TraceID(), nil +} diff --git a/connector/tracedurationconnector/event_test.go b/connector/tracedurationconnector/event_test.go new file mode 100644 index 000000000000..7d611ab9611e --- /dev/null +++ b/connector/tracedurationconnector/event_test.go @@ -0,0 +1,545 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector + +import ( + "context" + "errors" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector/internal/metadata" +) + +func TestEventCallback(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + + for _, tt := range []struct { + casename string + typ eventType + payload any + registerCallback func(em *eventMachine, wg *sync.WaitGroup) + }{ + { + casename: "onTraceReceived", + typ: traceReceived, + payload: tracesWithID{id: pcommon.NewTraceIDEmpty(), td: ptrace.NewTraces()}, + registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { + em.onTraceReceived = func(_ tracesWithID, _ *eventMachineWorker) error { + wg.Done() + return nil + } + }, + }, + { + casename: "onTraceExpired", + typ: traceExpired, + payload: pcommon.TraceID([16]byte{1, 2, 3, 4}), + registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { + em.onTraceExpired = func(expired pcommon.TraceID, _ *eventMachineWorker) error { + wg.Done() + assert.Equal(t, pcommon.TraceID([16]byte{1, 2, 3, 4}), expired) + return nil + } + }, + }, + { + casename: "onTraceReleased", + typ: traceReleased, + payload: []ptrace.ResourceSpans{}, + registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { + em.onTraceReleased = func(_ []ptrace.ResourceSpans) error { + wg.Done() + return nil + } + }, + }, + { + casename: "onTraceRemoved", + typ: traceRemoved, + payload: pcommon.TraceID([16]byte{1, 2, 3, 4}), + registerCallback: func(em *eventMachine, wg *sync.WaitGroup) { + em.onTraceRemoved = func(expired pcommon.TraceID) error { + wg.Done() + assert.Equal(t, pcommon.TraceID([16]byte{1, 2, 3, 4}), expired) + return nil + } + }, + }, + } { + t.Run(tt.casename, func(t *testing.T) { + // prepare + logger, err := 
zap.NewDevelopment() + require.NoError(t, err) + + wg := &sync.WaitGroup{} + em := newEventMachine(logger, 50, 1, 1_000, tel) + tt.registerCallback(em, wg) + + em.startInBackground() + defer em.shutdown() + + // test + wg.Add(1) + em.workers[0].fire(event{ + typ: tt.typ, + payload: tt.payload, + }) + + // verify + wg.Wait() + }) + } +} + +func TestEventCallbackNotSet(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + for _, tt := range []struct { + casename string + typ eventType + }{ + { + casename: "onTraceReceived", + typ: traceReceived, + }, + { + casename: "onTraceExpired", + typ: traceExpired, + }, + { + casename: "onTraceReleased", + typ: traceReleased, + }, + { + casename: "onTraceRemoved", + typ: traceRemoved, + }, + } { + t.Run(tt.casename, func(t *testing.T) { + // prepare + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + wg := &sync.WaitGroup{} + em := newEventMachine(logger, 50, 1, 1_000, tel) + em.onError = func(_ event) { + wg.Done() + } + em.startInBackground() + defer em.shutdown() + + // test + wg.Add(1) + em.workers[0].fire(event{ + typ: tt.typ, + }) + + // verify + wg.Wait() + }) + } +} + +func TestEventInvalidPayload(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + for _, tt := range []struct { + casename string + typ eventType + registerCallback func(*eventMachine, *sync.WaitGroup) + }{ + { + casename: "onTraceReceived", + typ: traceReceived, + registerCallback: func(em *eventMachine, _ *sync.WaitGroup) { + em.onTraceReceived = func(_ tracesWithID, _ *eventMachineWorker) error { + return nil + } + }, + }, + { + casename: "onTraceExpired", + typ: traceExpired, + registerCallback: func(em *eventMachine, _ *sync.WaitGroup) { + em.onTraceExpired = func(_ pcommon.TraceID, _ *eventMachineWorker) error { + return nil + } + }, + }, + { + casename: "onTraceReleased", + typ: traceReleased, + registerCallback: func(em *eventMachine, _ *sync.WaitGroup) { + em.onTraceReleased = func(_ []ptrace.ResourceSpans) error { + return nil + } + }, + }, + { + casename: "onTraceRemoved", + typ: traceRemoved, + registerCallback: func(em *eventMachine, _ *sync.WaitGroup) { + em.onTraceRemoved = func(_ pcommon.TraceID) error { + return nil + } + }, + }, + } { + t.Run(tt.casename, func(t *testing.T) { + // prepare + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + wg := &sync.WaitGroup{} + em := newEventMachine(logger, 50, 1, 1_000, tel) + em.onError = func(_ event) { + wg.Done() + } + tt.registerCallback(em, wg) + em.startInBackground() + defer em.shutdown() + + // test + wg.Add(1) + em.workers[0].fire(event{ + typ: tt.typ, + }) + + // verify + wg.Wait() + }) + } +} + +func TestEventUnknownType(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + // prepare + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + wg := &sync.WaitGroup{} + em := newEventMachine(logger, 50, 1, 1_000, tel) + em.onError = func(_ event) { + wg.Done() + } + em.startInBackground() + defer em.shutdown() + + // test + wg.Add(1) + em.workers[0].fire(event{ + typ: eventType(1234), + }) + + // verify + wg.Wait() +} + +func TestEventTracePerWorker(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + for _, tt := range []struct { + casename string + traceID [16]byte + errString string + }{ + { + 
casename: "invalid traceID", + errString: "eventmachine consume failed:", + }, + + { + casename: "traceID 1", + traceID: [16]byte{1}, + }, + + { + casename: "traceID 2", + traceID: [16]byte{2}, + }, + + { + casename: "traceID 3", + traceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + }, + } { + t.Run(tt.casename, func(t *testing.T) { + em := newEventMachine(zap.NewNop(), 200, 100, 1_000, tel) + + var wg sync.WaitGroup + var workerForTrace *eventMachineWorker + em.onTraceReceived = func(_ tracesWithID, w *eventMachineWorker) error { + workerForTrace = w + w.fire(event{ + typ: traceExpired, + payload: pcommon.TraceID([16]byte{1}), + }) + return nil + } + em.onTraceExpired = func(_ pcommon.TraceID, w *eventMachineWorker) error { + assert.Equal(t, workerForTrace, w) + wg.Done() + return nil + } + em.startInBackground() + defer em.shutdown() + + td := ptrace.NewTraces() + ils := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty() + if tt.traceID != [16]byte{} { + span := ils.Spans().AppendEmpty() + span.SetTraceID(tt.traceID) + } + + // test + wg.Add(1) + err := em.consume(td) + + // verify + if tt.errString == "" { + require.NoError(t, err) + } else { + wg.Done() + require.Truef(t, strings.HasPrefix(err.Error(), tt.errString), "error should have prefix %q", tt.errString) + } + + wg.Wait() + }) + } +} + +func TestEventConsumeConsistency(t *testing.T) { + for _, tt := range []struct { + casename string + traceID [16]byte + }{ + { + casename: "trace 1", + traceID: [16]byte{1, 2, 3, 4}, + }, + + { + casename: "trace 2", + traceID: [16]byte{2, 3, 4, 5}, + }, + } { + t.Run(tt.casename, func(t *testing.T) { + realTraceID := workerIndexForTraceID(pcommon.TraceID(tt.traceID), 100) + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 30; j++ { + assert.Equal(t, realTraceID, workerIndexForTraceID(pcommon.TraceID(tt.traceID), 100)) + } + }() + } + wg.Wait() + }) + } +} + +func TestEventShutdown(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + // prepare + wg := sync.WaitGroup{} + wg.Add(1) + + traceReceivedFired := &atomic.Int64{} + traceExpiredFired := &atomic.Int64{} + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel) + em.onTraceReceived = func(tracesWithID, *eventMachineWorker) error { + traceReceivedFired.Store(1) + return nil + } + em.onTraceExpired = func(pcommon.TraceID, *eventMachineWorker) error { + traceExpiredFired.Store(1) + return nil + } + em.onTraceRemoved = func(pcommon.TraceID) error { + wg.Wait() + return nil + } + em.startInBackground() + + // test + em.workers[0].fire(event{ + typ: traceReceived, + payload: tracesWithID{id: pcommon.NewTraceIDEmpty(), td: ptrace.NewTraces()}, + }) + em.workers[0].fire(event{ + typ: traceRemoved, + payload: pcommon.TraceID([16]byte{1, 2, 3, 4}), + }) + em.workers[0].fire(event{ + typ: traceRemoved, + payload: pcommon.TraceID([16]byte{1, 2, 3, 4}), + }) + + time.Sleep(10 * time.Millisecond) // give it a bit of time to process the items + assert.Equal(t, 1, em.numEvents()) // we should have one pending event in the queue, the second traceRemoved event + + shutdownWg := sync.WaitGroup{} + shutdownWg.Add(1) + go func() { + em.shutdown() + shutdownWg.Done() + }() + + wg.Done() // the pending event should be processed + time.Sleep(100 * time.Millisecond) // give it a bit of time to process the items + + assert.Equal(t, 0, em.numEvents()) + + // new events should *not* be processed + 
em.workers[0].fire(event{ + typ: traceExpired, + payload: pcommon.TraceID([16]byte{1, 2, 3, 4}), + }) + + // verify + assert.Equal(t, int64(1), traceReceivedFired.Load()) + + // If the code is wrong, there's a chance that the test will still pass + // in case the event is processed after the assertion. + // for this reason, we add a small delay here + time.Sleep(10 * time.Millisecond) + assert.Equal(t, int64(0), traceExpiredFired.Load()) + + // wait until the shutdown has returned + shutdownWg.Wait() +} + +func TestPeriodicMetrics(t *testing.T) { + // prepare + s := setupTestTelemetry() + telemetryBuilder, err := metadata.NewTelemetryBuilder(s.NewSettings().TelemetrySettings) + require.NoError(t, err) + + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, telemetryBuilder) + em.metricsCollectionInterval = time.Millisecond + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + expected := 2 + calls := 0 + for range em.workers[0].events { + // we expect two events, after which we just exit the loop + // if we return from here, we'd still have one item in the queue that is not going to be consumed + wg.Wait() + calls++ + + if calls == expected { + return + } + } + }() + + // sanity check + assertGaugeNotCreated(t, "otelcol_processor_groupbytrace_num_events_in_queue", s) + + // test + em.workers[0].fire(event{typ: traceReceived}) + em.workers[0].fire(event{typ: traceReceived}) // the first is consumed right away, the second is in the queue + go em.periodicMetrics() + + // ensure our gauge is showing 1 item in the queue + assert.Eventually(t, func() bool { + return getGaugeValue(t, "otelcol_processor_groupbytrace_num_events_in_queue", s) == 1 + }, 1*time.Second, 10*time.Millisecond) + + wg.Done() // release all events + + // ensure our gauge is now showing no items in the queue + assert.Eventually(t, func() bool { + return getGaugeValue(t, "otelcol_processor_groupbytrace_num_events_in_queue", s) == 0 + }, 1*time.Second, 10*time.Millisecond) + + // signal and wait for the recursive call to finish + em.shutdownLock.Lock() + em.closed = true + em.shutdownLock.Unlock() + time.Sleep(5 * time.Millisecond) +} + +func TestForceShutdown(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + // prepare + em := newEventMachine(zap.NewNop(), 50, 1, 1_000, tel) + em.shutdownTimeout = 20 * time.Millisecond + + // test + em.workers[0].fire(event{typ: traceExpired}) + + start := time.Now() + em.shutdown() // should take about 20ms to return + duration := time.Since(start) + + // verify + assert.Greater(t, duration, 20*time.Millisecond) + + // wait for shutdown goroutine to end + time.Sleep(100 * time.Millisecond) +} + +func TestDoWithTimeout_NoTimeout(t *testing.T) { + // prepare + wantErr := errors.New("my error") + // test + succeed, err := doWithTimeout(20*time.Millisecond, func() error { + return wantErr + }) + assert.True(t, succeed) + assert.Equal(t, wantErr, err) +} + +func TestDoWithTimeout_TimeoutTrigger(t *testing.T) { + // prepare + start := time.Now() + + // test + succeed, err := doWithTimeout(20*time.Millisecond, func() error { + time.Sleep(1 * time.Second) + return nil + }) + assert.False(t, succeed) + assert.NoError(t, err) + + // verify + assert.WithinDuration(t, start, time.Now(), 100*time.Millisecond) +} + +func getGaugeValue(t *testing.T, name string, tt componentTestTelemetry) int64 { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + m := tt.getMetric(name, md).Data + g 
:= m.(metricdata.Gauge[int64]) + assert.Len(t, g.DataPoints, 1, "expected exactly one data point") + return g.DataPoints[0].Value +} + +func assertGaugeNotCreated(t *testing.T, name string, tt componentTestTelemetry) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + got := tt.getMetric(name, md) + assert.Equal(t, metricdata.Metrics{}, got, "gauge exists already but shouldn't") +} diff --git a/connector/tracedurationconnector/factory.go b/connector/tracedurationconnector/factory.go new file mode 100644 index 000000000000..dee0a1b3d16c --- /dev/null +++ b/connector/tracedurationconnector/factory.go @@ -0,0 +1,98 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector" + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/collector/connector" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector/internal/metadata" +) + +const ( + defaultWaitDuration = time.Second + defaultNumTraces = 1_000_000 + defaultNumWorkers = 1 + defaultDiscardOrphans = false + defaultStoreOnDisk = false +) + +var ( + errDiskStorageNotSupported = fmt.Errorf("option 'disk storage' not supported in this release") + errDiscardOrphansNotSupported = fmt.Errorf("option 'discard orphans' not supported in this release") +) + +// NewFactory returns a new factory for the Filter processor. +func NewFactory() connector.Factory { + + return connector.NewFactory( + metadata.Type, + createDefaultConfig, + connector.WithTracesToLogs(createTracesToLogsConnector, metadata.TracesToLogsStability), + //processor.WithTraces(createTracesProcessor, metadata.TracesStability) + ) +} + +// createDefaultConfig creates the default configuration for the processor. +func createDefaultConfig() component.Config { + return &Config{ + NumTraces: defaultNumTraces, + NumWorkers: defaultNumWorkers, + WaitDuration: defaultWaitDuration, + + // not supported for now + DiscardOrphans: defaultDiscardOrphans, + StoreOnDisk: defaultStoreOnDisk, + } +} + +// createTracesProcessor creates a trace processor based on this config. 
+//func createTracesProcessor( +// _ context.Context, +// params processor.Settings, +// cfg component.Config, +// nextConsumer consumer.Traces) (processor.Traces, error) { +// +// oCfg := cfg.(*Config) +// +// var st storage +// if oCfg.StoreOnDisk { +// return nil, errDiskStorageNotSupported +// } +// if oCfg.DiscardOrphans { +// return nil, errDiscardOrphansNotSupported +// } +// +// processor := newGroupByTraceProcessor(params, nextConsumer, *oCfg) +// // the only supported storage for now +// st = newMemoryStorage(processor.telemetryBuilder) +// processor.st = st +// return processor, nil +//} + +func createTracesToLogsConnector(_ context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Logs) (connector.Traces, error) { + oCfg := cfg.(*Config) + + var st storage + if oCfg.StoreOnDisk { + return nil, errDiskStorageNotSupported + } + if oCfg.DiscardOrphans { + return nil, errDiscardOrphansNotSupported + } + + lc := newLogsConnector(params, cfg) + // the only supported storage for now + st = newMemoryStorage(lc.telemetryBuilder) + lc.st = st + + lc.logsConsumer = nextConsumer + return lc, nil +} diff --git a/connector/tracedurationconnector/generated_component_telemetry_test.go b/connector/tracedurationconnector/generated_component_telemetry_test.go new file mode 100644 index 000000000000..33d835bae37f --- /dev/null +++ b/connector/tracedurationconnector/generated_component_telemetry_test.go @@ -0,0 +1,81 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package tracedurationconnector + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/connectortest" +) + +type componentTestTelemetry struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider +} + +func (tt *componentTestTelemetry) NewSettings() connector.Settings { + settings := connectortest.NewNopSettings() + settings.MeterProvider = tt.meterProvider + settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { + return tt.meterProvider + } + settings.ID = component.NewID(component.MustNewType("traceduration")) + + return settings +} + +func setupTestTelemetry() componentTestTelemetry { + reader := sdkmetric.NewManualReader() + return componentTestTelemetry{ + reader: reader, + meterProvider: sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)), + } +} + +func (tt *componentTestTelemetry) assertMetrics(t *testing.T, expected []metricdata.Metrics) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := tt.getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), tt.len(md)) +} + +func (tt *componentTestTelemetry) getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func (tt *componentTestTelemetry) 
len(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} + +func (tt *componentTestTelemetry) Shutdown(ctx context.Context) error { + return tt.meterProvider.Shutdown(ctx) +} diff --git a/connector/tracedurationconnector/generated_package_test.go b/connector/tracedurationconnector/generated_package_test.go new file mode 100644 index 000000000000..640e33ccc37a --- /dev/null +++ b/connector/tracedurationconnector/generated_package_test.go @@ -0,0 +1,13 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package tracedurationconnector + +import ( + "os" + "testing" +) + +func TestMain(m *testing.M) { + // skipping goleak test as per metadata.yml configuration + os.Exit(m.Run()) +} diff --git a/connector/tracedurationconnector/go.mod b/connector/tracedurationconnector/go.mod new file mode 100644 index 000000000000..b565990ff9ee --- /dev/null +++ b/connector/tracedurationconnector/go.mod @@ -0,0 +1,63 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector + +go 1.22.0 + +require ( + github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.110.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.109.0 + github.com/stretchr/testify v1.9.0 + go.opentelemetry.io/collector/component v0.110.0 + go.opentelemetry.io/collector/config/configtelemetry v0.110.0 + go.opentelemetry.io/collector/connector v0.110.0 + go.opentelemetry.io/collector/consumer v0.110.0 + go.opentelemetry.io/collector/pdata v1.17.0 + go.opentelemetry.io/collector/processor v0.109.0 + go.opentelemetry.io/collector/semconv v0.110.0 + go.opentelemetry.io/otel v1.30.0 + go.opentelemetry.io/otel/metric v1.30.0 + go.opentelemetry.io/otel/sdk/metric v1.30.0 + go.opentelemetry.io/otel/trace v1.30.0 + go.uber.org/multierr v1.11.0 + go.uber.org/zap v1.27.0 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.110.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector v0.110.0 // indirect + go.opentelemetry.io/collector/component/componentprofiles v0.110.0 // indirect + go.opentelemetry.io/collector/component/componentstatus v0.110.0 // indirect + go.opentelemetry.io/collector/connector/connectorprofiles v0.110.0 // indirect + go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect + go.opentelemetry.io/collector/consumer/consumertest v0.110.0 // indirect + go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect + go.opentelemetry.io/collector/pdata/testdata v0.110.0 // indirect + go.opentelemetry.io/collector/pipeline v0.110.0 // indirect + go.opentelemetry.io/collector/processor/processorprofiles v0.109.0 // indirect + go.opentelemetry.io/otel/sdk v1.30.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + 
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect + google.golang.org/grpc v1.67.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal + +retract ( + v0.76.2 + v0.76.1 + v0.65.0 +) diff --git a/connector/tracedurationconnector/go.sum b/connector/tracedurationconnector/go.sum new file mode 100644 index 000000000000..e90f2985731e --- /dev/null +++ b/connector/tracedurationconnector/go.sum @@ -0,0 +1,142 @@ +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.110.0 h1:cSo0xbjIcKPihtbPCrE/9Z2m3I/SdI6Ue+pTLZhR4A4= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.110.0/go.mod 
h1:4ZyvPIvgNqK12WrFB4OU0mAX1K6DpiP099tjfmLERjI= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.110.0 h1:/kMxDplBmo99UQwkpnCkHO3mwBD6/YcX/ldwlZSURFw= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.110.0/go.mod h1:nn/ktwLdZ6O9mtnRuak8NQEfGRow3VI3l+YqAroJX7g= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.110.0 h1:KYBzbgQyCz4i5zjzs0iBOFuNh2vagaw2seqvZ7Lftxk= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.110.0/go.mod h1:zLtbGLswPAKzTHM4C1Pcxb+PgNHgo6aGVZQtQaeBtec= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/collector v0.110.0 h1:v1muSqb80nTvzQa/4nR83JqvKISNRQn1zvi1EyhLTuk= +go.opentelemetry.io/collector v0.110.0/go.mod h1:N3tbcxHKHxBZcnsgM8l33/44rsZOABff5Hpo0aWg1cU= +go.opentelemetry.io/collector/component v0.110.0 h1:z7uSY/1dcK+vTY2z3v0XxeCoi2wqgHTow/ds3Gozuz4= +go.opentelemetry.io/collector/component v0.110.0/go.mod h1:W99gZdfGtQ5Zg6Bhrwrcl/uZcCG+2qBnZ1z2JO5WCW0= +go.opentelemetry.io/collector/component/componentprofiles v0.110.0 h1:YH43aYKPYfnC0TqgI+WlbHsJkTvQPO3ImJybK3oyxQ8= +go.opentelemetry.io/collector/component/componentprofiles v0.110.0/go.mod h1:ZDVFaOhCt6ce2u/HHwqxoV5f+8P2dh0Xut8laYRu4+o= +go.opentelemetry.io/collector/component/componentstatus v0.110.0 h1:j+Fg88CgxmiCKL6cBCKRLHTduYbNE1OK1OYaXxGjuk4= +go.opentelemetry.io/collector/component/componentstatus v0.110.0/go.mod h1:UI3vOkwnI6JkRYxuWCrrlJe45ywTdlew426r4Wy4gH4= +go.opentelemetry.io/collector/config/configtelemetry v0.110.0 h1:V8Y/Xv7TJpnNGHLrheRKrMydcKBxWYAZ+dj71Kllyos= +go.opentelemetry.io/collector/config/configtelemetry v0.110.0/go.mod h1:R0MBUxjSMVMIhljuDHWIygzzJWQyZHXXWIgQNxcFwhc= +go.opentelemetry.io/collector/connector v0.110.0 h1:gUUjgLzeipFAGAwDtKAqMtMRM4dULHiehiH+dq/0UkQ= +go.opentelemetry.io/collector/connector v0.110.0/go.mod h1:k4OjXus3SntyImXcEZzw2+6DS2MZAyBEfp/j2tbSNHU= +go.opentelemetry.io/collector/connector/connectorprofiles v0.110.0 h1:EHdVhXPRZ1IFwVfo5a9IiOs3RJ6wtbTL4W5f1p3Q3Vo= +go.opentelemetry.io/collector/connector/connectorprofiles v0.110.0/go.mod h1:VG1L8zbGZfWwJTlN4KYUcFMWVHXys86NfcshY+uVuLk= +go.opentelemetry.io/collector/consumer v0.110.0 h1:CnB83KSFQxhFAbNJwTM0blahg16xa6CyUnIIA5qPMbA= +go.opentelemetry.io/collector/consumer v0.110.0/go.mod h1:WlzhfwDfwKkEa5XFdN5x9+jjp9ZF5EUSmtOgVe69zm0= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 h1:KlEGGPFmQN7CFbj8pkiD9J6L820kJgC0zYKqeSFGLEo= +go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0/go.mod h1:Br4qElhLqAYDMddroNox3CpNv+NxgPgNfGhxFXXxYIw= +go.opentelemetry.io/collector/consumer/consumertest v0.110.0 h1:/rOL4sJr4eSzOp5z6+R7MmuJ5UD6PFOs7S2FA7puE1g= 
+go.opentelemetry.io/collector/consumer/consumertest v0.110.0/go.mod h1:sKL3UwB6nyH/33JE173o755ekYPwlS/8fs8opTSgPiY= +go.opentelemetry.io/collector/internal/globalsignal v0.110.0 h1:S6bfFEiek8vJeXAbciWS7W8UR6ZrVJB3ftNyFTMHQaY= +go.opentelemetry.io/collector/internal/globalsignal v0.110.0/go.mod h1:GqMXodPWOxK5uqpX8MaMXC2389y2XJTa5nPwf8FYDK8= +go.opentelemetry.io/collector/pdata v1.16.0 h1:g02K8jlRnmQ7TQDuXpdgVL6vIxIVqr5Gbb1qIR27rto= +go.opentelemetry.io/collector/pdata v1.16.0/go.mod h1:YZZJIt2ehxosYf/Y1pbvexjNWsIGNNrzzlCTO9jC1F4= +go.opentelemetry.io/collector/pdata v1.17.0/go.mod h1:yZaQ9KZAm/qie96LTygRKxOXMq0/54h8OW7330ycuvQ= +go.opentelemetry.io/collector/pdata/pprofile v0.110.0 h1:DknuOGOdjYIzVnromhVcW5rWyyjPahf65UAfgXz1xfo= +go.opentelemetry.io/collector/pdata/pprofile v0.110.0/go.mod h1:B3GxiEt40ixeGq2r30AOB3dgKdC281rLw39IqgSTwSM= +go.opentelemetry.io/collector/pdata/testdata v0.110.0 h1:XUXuaKVpD5G7VvTlBCBo/7CDpHvbeWRLXN4zjol94kg= +go.opentelemetry.io/collector/pdata/testdata v0.110.0/go.mod h1:lvpGoQcVDbRjuH3caNIkQ+pkU/+MLKVV4MdNFcp5mxU= +go.opentelemetry.io/collector/pipeline v0.110.0 h1:nArQj8lt2R6ajbbmZ0f7JqkzAzvIOSwxsxDEv9HGKHw= +go.opentelemetry.io/collector/pipeline v0.110.0/go.mod h1:qWk90kohDYBgI/1Kw4DQaQU82+y9GJY8MDse7H2JTWg= +go.opentelemetry.io/collector/processor v0.109.0 h1:Pgo9hib4ae1FSA47RB7TUUS26nConIlXcltzbxrjFg8= +go.opentelemetry.io/collector/processor v0.109.0/go.mod h1:Td43GwGMRCXin5JM/zAzMtLieobHTVVrD4Y7jSvsMtg= +go.opentelemetry.io/collector/processor/processorprofiles v0.109.0 h1:+w0vqF30eOskfpcIuZLAJb1dCWcayBlGWoQCOUWKzf4= +go.opentelemetry.io/collector/processor/processorprofiles v0.109.0/go.mod h1:k7pJ76mOeU1Fx1hoVEJExMK9mhMre8xdSS3+cOKvdM4= +go.opentelemetry.io/collector/semconv v0.110.0 h1:KHQnOHe3gUz0zsxe8ph9kN5OTypCFD4V+06AiBTfeNk= +go.opentelemetry.io/collector/semconv v0.110.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= +go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= +go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= +go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE= +go.opentelemetry.io/otel/sdk v1.30.0/go.mod h1:p14X4Ok8S+sygzblytT1nqG98QG2KYKv++HE0LY/mhg= +go.opentelemetry.io/otel/sdk/metric v1.30.0 h1:QJLT8Pe11jyHBHfSAgYH7kEmT24eX792jZO1bo4BXkM= +go.opentelemetry.io/otel/sdk/metric v1.30.0/go.mod h1:waS6P3YqFNzeP01kuo/MBBYqaoBJl7efRQHOaydhy1Y= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto 
v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd h1:6TEm2ZxXoQmFWFlt1vNxvVOa1Q0dXFQD1m/rYjXmS0E= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.2 h1:3QdXkuq3Bkh7w+ywLdLvM56cmGvQHUMZpiCzt6Rqaoo= +google.golang.org/grpc v1.66.2/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= 
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/connector/tracedurationconnector/internal/metadata/generated_status.go b/connector/tracedurationconnector/internal/metadata/generated_status.go new file mode 100644 index 000000000000..82b36966a405 --- /dev/null +++ b/connector/tracedurationconnector/internal/metadata/generated_status.go @@ -0,0 +1,17 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/component" +) + +var ( + Type = component.MustNewType("traceduration") + ScopeName = "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector" +) + +const ( + TracesToMetricsStability = component.StabilityLevelAlpha + TracesToLogsStability = component.StabilityLevelAlpha +) diff --git a/connector/tracedurationconnector/internal/metadata/generated_telemetry.go b/connector/tracedurationconnector/internal/metadata/generated_telemetry.go new file mode 100644 index 000000000000..8f01b72f40c6 --- /dev/null +++ b/connector/tracedurationconnector/internal/metadata/generated_telemetry.go @@ -0,0 +1,104 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" +) + +// Deprecated: [v0.108.0] use LeveledMeter instead. +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector") +} + +func LeveledMeter(settings component.TelemetrySettings, level configtelemetry.Level) metric.Meter { + return settings.LeveledMeterProvider(level).Meter("github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + ProcessorGroupbytraceConfNumTraces metric.Int64Gauge + ProcessorGroupbytraceEventLatency metric.Int64Histogram + ProcessorGroupbytraceIncompleteReleases metric.Int64Counter + ProcessorGroupbytraceNumEventsInQueue metric.Int64Gauge + ProcessorGroupbytraceNumTracesInMemory metric.Int64Gauge + ProcessorGroupbytraceSpansReleased metric.Int64Counter + ProcessorGroupbytraceTracesEvicted metric.Int64Counter + ProcessorGroupbytraceTracesReleased metric.Int64Counter + meters map[configtelemetry.Level]metric.Meter +} + +// telemetryBuilderOption applies changes to default builder. 
+type telemetryBuilderOption func(*TelemetryBuilder) + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{meters: map[configtelemetry.Level]metric.Meter{}} + for _, op := range options { + op(&builder) + } + builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) + var err, errs error + builder.ProcessorGroupbytraceConfNumTraces, err = builder.meters[configtelemetry.LevelBasic].Int64Gauge( + "otelcol_processor_groupbytrace_conf_num_traces", + metric.WithDescription("Maximum number of traces to hold in the internal storage"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceEventLatency, err = builder.meters[configtelemetry.LevelBasic].Int64Histogram( + "otelcol_processor_groupbytrace_event_latency", + metric.WithDescription("How long the queue events are taking to be processed"), + metric.WithUnit("ms"), metric.WithExplicitBucketBoundaries([]float64{5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000}...), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceIncompleteReleases, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_groupbytrace_incomplete_releases", + metric.WithDescription("Releases that are suspected to have been incomplete"), + metric.WithUnit(""), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceNumEventsInQueue, err = builder.meters[configtelemetry.LevelBasic].Int64Gauge( + "otelcol_processor_groupbytrace_num_events_in_queue", + metric.WithDescription("Number of events currently in the queue"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceNumTracesInMemory, err = builder.meters[configtelemetry.LevelBasic].Int64Gauge( + "otelcol_processor_groupbytrace_num_traces_in_memory", + metric.WithDescription("Number of traces currently in the in-memory storage"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceSpansReleased, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_groupbytrace_spans_released", + metric.WithDescription("Spans released to the next consumer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceTracesEvicted, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_groupbytrace_traces_evicted", + metric.WithDescription("Traces evicted from the internal buffer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.ProcessorGroupbytraceTracesReleased, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_groupbytrace_traces_released", + metric.WithDescription("Traces released to the next consumer"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/connector/tracedurationconnector/internal/metadata/generated_telemetry_test.go b/connector/tracedurationconnector/internal/metadata/generated_telemetry_test.go new file mode 100644 index 000000000000..d3761f6878e4 --- /dev/null +++ b/connector/tracedurationconnector/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,83 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { + return mockMeterProvider{} + }, + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := component.TelemetrySettings{ + LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { + return mockMeterProvider{} + }, + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + applied := false + _, err := NewTelemetryBuilder(set, func(b *TelemetryBuilder) { + applied = true + }) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/connector/tracedurationconnector/metadata.yaml b/connector/tracedurationconnector/metadata.yaml new file mode 100644 index 000000000000..cb68b3b85e8e --- /dev/null +++ b/connector/tracedurationconnector/metadata.yaml @@ -0,0 +1,71 @@ +type: traceduration + +status: + class: connector + stability: + alpha: [traces_to_metrics, traces_to_logs] + distributions: [contrib] + warnings: [Statefulness] + codeowners: + active: [JaredTan95] +tests: + config: + goleak: + skip: true + +telemetry: + metrics: + connector_groupbytrace_conf_num_traces: + enabled: true + description: Maximum number of traces to hold in the internal storage + unit: "1" + gauge: + value_type: int + connector_groupbytrace_num_events_in_queue: + enabled: true + description: Number of events currently in the queue + unit: "1" + gauge: + value_type: int + connector_groupbytrace_num_traces_in_memory: + enabled: true + description: Number of traces currently in the in-memory storage + unit: "1" + gauge: + value_type: int + connector_groupbytrace_traces_evicted: + enabled: true + description: Traces evicted from the internal buffer + unit: "1" + sum: + value_type: int + monotonic: true + connector_groupbytrace_spans_released: + enabled: true + description: Spans released to the next consumer + unit: "1" + sum: + value_type: int + monotonic: true + 
connector_groupbytrace_traces_released: + enabled: true + description: Traces released to the next consumer + unit: "1" + sum: + value_type: int + monotonic: true + connector_groupbytrace_incomplete_releases: + enabled: true + description: Releases that are suspected to have been incomplete + sum: + value_type: int + monotonic: true + connector_groupbytrace_event_latency: + enabled: true + description: How long the queue events are taking to be processed + unit: ms + histogram: + value_type: int + bucket_boundaries: [5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000] + + diff --git a/connector/tracedurationconnector/ring_buffer.go b/connector/tracedurationconnector/ring_buffer.go new file mode 100644 index 000000000000..94e0a1f823e5 --- /dev/null +++ b/connector/tracedurationconnector/ring_buffer.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector" + +import "go.opentelemetry.io/collector/pdata/pcommon" + +// ringBuffer keeps an in-memory bounded buffer with the in-flight trace IDs +type ringBuffer struct { + index int + size int + ids []pcommon.TraceID + idToIndex map[pcommon.TraceID]int // key is traceID, value is the index on the 'ids' slice +} + +func newRingBuffer(size int) *ringBuffer { + return &ringBuffer{ + index: -1, // the first span to be received will be placed at position '0' + size: size, + ids: make([]pcommon.TraceID, size), + idToIndex: make(map[pcommon.TraceID]int), + } +} + +func (r *ringBuffer) put(traceID pcommon.TraceID) pcommon.TraceID { + // calculates the item in the ring that we'll store the trace + r.index = (r.index + 1) % r.size + + // see if the ring has an item already + evicted := r.ids[r.index] + + if !evicted.IsEmpty() { + // clear space for the new item + r.delete(evicted) + } + + // place the traceID in memory + r.ids[r.index] = traceID + r.idToIndex[traceID] = r.index + + return evicted +} + +func (r *ringBuffer) contains(traceID pcommon.TraceID) bool { + _, found := r.idToIndex[traceID] + return found +} + +func (r *ringBuffer) delete(traceID pcommon.TraceID) bool { + index, found := r.idToIndex[traceID] + if !found { + return false + } + + delete(r.idToIndex, traceID) + r.ids[index] = pcommon.NewTraceIDEmpty() + return true +} diff --git a/connector/tracedurationconnector/ring_buffer_test.go b/connector/tracedurationconnector/ring_buffer_test.go new file mode 100644 index 000000000000..2562184bd33b --- /dev/null +++ b/connector/tracedurationconnector/ring_buffer_test.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestRingBufferCapacity(t *testing.T) { + // prepare + buffer := newRingBuffer(5) + + // test + traceIDs := []pcommon.TraceID{ + pcommon.TraceID([16]byte{1, 2, 3, 4}), + pcommon.TraceID([16]byte{2, 3, 4, 5}), + pcommon.TraceID([16]byte{3, 4, 5, 6}), + pcommon.TraceID([16]byte{4, 5, 6, 7}), + pcommon.TraceID([16]byte{5, 6, 7, 8}), + pcommon.TraceID([16]byte{6, 7, 8, 9}), + } + for _, traceID := range traceIDs { + buffer.put(traceID) + } + + // verify + for i := 5; i > 0; i-- { // last 5 traces + traceID := traceIDs[i] + assert.True(t, buffer.contains(traceID)) + } + + // the first trace should have been evicted + assert.False(t, buffer.contains(traceIDs[0])) 
+} + +func TestDeleteFromBuffer(t *testing.T) { + // prepare + buffer := newRingBuffer(2) + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) + buffer.put(traceID) + + // test + deleted := buffer.delete(traceID) + + // verify + assert.True(t, deleted) + assert.False(t, buffer.contains(traceID)) +} + +func TestDeleteNonExistingFromBuffer(t *testing.T) { + // prepare + buffer := newRingBuffer(2) + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) + + // test + deleted := buffer.delete(traceID) + + // verify + assert.False(t, deleted) + assert.False(t, buffer.contains(traceID)) +} diff --git a/connector/tracedurationconnector/storage.go b/connector/tracedurationconnector/storage.go new file mode 100644 index 000000000000..18b1295b1ef9 --- /dev/null +++ b/connector/tracedurationconnector/storage.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// storage is an abstraction for the span storage used by the groupbytrace processor. +// Implementations should be safe for concurrent use. +type storage interface { + // createOrAppend will check whether the given trace ID is already in the storage and + // will either append the given spans to the existing record, or create a new trace with + // the given spans from trace + createOrAppend(pcommon.TraceID, ptrace.Traces) error + + // get will retrieve the trace based on the given trace ID, returning nil in case a trace + // cannot be found + get(pcommon.TraceID) ([]ptrace.ResourceSpans, error) + + // delete will remove the trace based on the given trace ID, returning the trace that was removed, + // or nil in case a trace cannot be found + delete(pcommon.TraceID) ([]ptrace.ResourceSpans, error) + + // start gives the storage the opportunity to initialize any resources or procedures + start() error + + // shutdown signals the storage that the processor is shutting down + shutdown() error +} diff --git a/connector/tracedurationconnector/storage_memory.go b/connector/tracedurationconnector/storage_memory.go new file mode 100644 index 000000000000..688b25b92ac1 --- /dev/null +++ b/connector/tracedurationconnector/storage_memory.go @@ -0,0 +1,112 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector" + +import ( + "context" + "sync" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector/internal/metadata" +) + +type memoryStorage struct { + sync.RWMutex + content map[pcommon.TraceID][]ptrace.ResourceSpans + telemetry *metadata.TelemetryBuilder + stopped bool + stoppedLock sync.RWMutex + metricsCollectionInterval time.Duration +} + +var _ storage = (*memoryStorage)(nil) + +func newMemoryStorage(telemetry *metadata.TelemetryBuilder) *memoryStorage { + return &memoryStorage{ + content: make(map[pcommon.TraceID][]ptrace.ResourceSpans), + metricsCollectionInterval: time.Second, + telemetry: telemetry, + } +} + +func (st *memoryStorage) createOrAppend(traceID pcommon.TraceID, td ptrace.Traces) error { + st.Lock() + defer st.Unlock() + + // getting zero value is 
fine + content := st.content[traceID] + + newRss := ptrace.NewResourceSpansSlice() + td.ResourceSpans().CopyTo(newRss) + for i := 0; i < newRss.Len(); i++ { + content = append(content, newRss.At(i)) + } + st.content[traceID] = content + + return nil +} +func (st *memoryStorage) get(traceID pcommon.TraceID) ([]ptrace.ResourceSpans, error) { + st.RLock() + rss, ok := st.content[traceID] + st.RUnlock() + if !ok { + return nil, nil + } + + result := make([]ptrace.ResourceSpans, len(rss)) + for i, rs := range rss { + newRS := ptrace.NewResourceSpans() + rs.CopyTo(newRS) + result[i] = newRS + } + + return result, nil +} + +// delete will return a reference to a ResourceSpans. Changes to the returned object may not be applied +// to the version in the storage. +func (st *memoryStorage) delete(traceID pcommon.TraceID) ([]ptrace.ResourceSpans, error) { + st.Lock() + defer st.Unlock() + + defer delete(st.content, traceID) + return st.content[traceID], nil +} + +func (st *memoryStorage) start() error { + go st.periodicMetrics() + return nil +} + +func (st *memoryStorage) shutdown() error { + st.stoppedLock.Lock() + defer st.stoppedLock.Unlock() + st.stopped = true + return nil +} + +func (st *memoryStorage) periodicMetrics() { + numTraces := st.count() + st.telemetry.ProcessorGroupbytraceNumTracesInMemory.Record(context.Background(), int64(numTraces)) + + st.stoppedLock.RLock() + stopped := st.stopped + st.stoppedLock.RUnlock() + if stopped { + return + } + + time.AfterFunc(st.metricsCollectionInterval, func() { + st.periodicMetrics() + }) +} + +func (st *memoryStorage) count() int { + st.RLock() + defer st.RUnlock() + return len(st.content) +} diff --git a/connector/tracedurationconnector/storage_memory_test.go b/connector/tracedurationconnector/storage_memory_test.go new file mode 100644 index 000000000000..ada51be68222 --- /dev/null +++ b/connector/tracedurationconnector/storage_memory_test.go @@ -0,0 +1,158 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracedurationconnector + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/tracedurationconnector/internal/metadata" +) + +func TestMemoryCreateAndGetTrace(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) + st := newMemoryStorage(tel) + + traceIDs := []pcommon.TraceID{ + pcommon.TraceID([16]byte{1, 2, 3, 4}), + pcommon.TraceID([16]byte{2, 3, 4, 5}), + } + + baseTrace := ptrace.NewTraces() + rss := baseTrace.ResourceSpans() + rs := rss.AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + span := ils.Spans().AppendEmpty() + + // test + for _, traceID := range traceIDs { + span.SetTraceID(traceID) + assert.NoError(t, st.createOrAppend(traceID, baseTrace)) + } + + // verify + assert.Equal(t, 2, st.count()) + for _, traceID := range traceIDs { + expected := []ptrace.ResourceSpans{baseTrace.ResourceSpans().At(0)} + expected[0].ScopeSpans().At(0).Spans().At(0).SetTraceID(traceID) + + retrieved, err := st.get(traceID) + assert.NoError(t, st.createOrAppend(traceID, baseTrace)) + + require.NoError(t, err) + assert.Equal(t, expected, retrieved) + } +} + +func TestMemoryDeleteTrace(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := 
metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) + + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) + + trace := ptrace.NewTraces() + rss := trace.ResourceSpans() + rs := rss.AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + span := ils.Spans().AppendEmpty() + span.SetTraceID(traceID) + + assert.NoError(t, st.createOrAppend(traceID, trace)) + + // test + deleted, err := st.delete(traceID) + + // verify + require.NoError(t, err) + assert.Equal(t, []ptrace.ResourceSpans{trace.ResourceSpans().At(0)}, deleted) + + retrieved, err := st.get(traceID) + require.NoError(t, err) + assert.Nil(t, retrieved) +} + +func TestMemoryAppendSpans(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) + + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) + + trace := ptrace.NewTraces() + rss := trace.ResourceSpans() + rs := rss.AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + span := ils.Spans().AppendEmpty() + span.SetTraceID(traceID) + span.SetSpanID([8]byte{1, 2, 3, 4}) + + assert.NoError(t, st.createOrAppend(traceID, trace)) + + secondTrace := ptrace.NewTraces() + secondRss := secondTrace.ResourceSpans() + secondRs := secondRss.AppendEmpty() + secondIls := secondRs.ScopeSpans().AppendEmpty() + secondSpan := secondIls.Spans().AppendEmpty() + secondSpan.SetName("second-name") + secondSpan.SetTraceID(traceID) + secondSpan.SetSpanID([8]byte{5, 6, 7, 8}) + + expected := []ptrace.ResourceSpans{ + ptrace.NewResourceSpans(), + ptrace.NewResourceSpans(), + } + ils.CopyTo(expected[0].ScopeSpans().AppendEmpty()) + secondIls.CopyTo(expected[1].ScopeSpans().AppendEmpty()) + + // test + err := st.createOrAppend(traceID, secondTrace) + require.NoError(t, err) + + // override something in the second span, to make sure we are storing a copy + secondSpan.SetName("changed-second-name") + + // verify + retrieved, err := st.get(traceID) + require.NoError(t, err) + require.Len(t, retrieved, 2) + assert.Equal(t, "second-name", retrieved[1].ScopeSpans().At(0).Spans().At(0).Name()) + + // now that we checked that the secondSpan change here didn't have an effect, revert + // so that we can compare the that everything else has the same value + secondSpan.SetName("second-name") + assert.Equal(t, expected, retrieved) +} + +func TestMemoryTraceIsBeingCloned(t *testing.T) { + set := processortest.NewNopSettings() + tel, _ := metadata.NewTelemetryBuilder(set.TelemetrySettings) // prepare + st := newMemoryStorage(tel) + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4}) + + trace := ptrace.NewTraces() + rss := trace.ResourceSpans() + rs := rss.AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + span := ils.Spans().AppendEmpty() + span.SetTraceID(traceID) + span.SetSpanID([8]byte{1, 2, 3, 4}) + span.SetName("should-not-be-changed") + + // test + err := st.createOrAppend(traceID, trace) + require.NoError(t, err) + span.SetName("changed-trace") + + // verify + retrieved, err := st.get(traceID) + require.NoError(t, err) + assert.Equal(t, "should-not-be-changed", retrieved[0].ScopeSpans().At(0).Spans().At(0).Name()) +} diff --git a/connector/tracedurationconnector/testdata/config.yaml b/connector/tracedurationconnector/testdata/config.yaml new file mode 100644 index 000000000000..b6580cb89f3a --- /dev/null +++ b/connector/tracedurationconnector/testdata/config.yaml @@ -0,0 +1,82 @@ +connectors: + traceduration/custom: + +receivers: + otlp: + protocols: + grpc: + endpoint: 
0.0.0.0:4317 + +exporters: + prometheus: + endpoint: "0.0.0.0:8889" + const_labels: + label1: value1 + + debug: + debug/detailed: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 200 + + otlp: + endpoint: jaeger-all-in-one:4317 + tls: + insecure: true + + elasticsearch/log: + tls: + insecure: true + insecure_skip_verify: true + #endpoints: [https://172.16.222.21:30990] + endpoints: [http://localhost:9200] + logs_index: trace_duration + logstash_format: + enabled: true + timeout: 2m + user: elastic + password: z45va4uK7EIlX962ox23i9wv + discover: + on_start: true + flush: + bytes: 10485760 + retry: + max_requests: 5 + retry_on_status: + - 429 + - 500 + sending_queue: + enabled: true + +processors: + batch: + filter/ottl: + error_mode: ignore + traces: + span: + - 'attributes["container.name"] == "app_container_1"' + - 'resource.attributes["host.name"] == "localhost"' + - 'name == "app_3"' + +extensions: + health_check: + pprof: + endpoint: :1888 + zpages: + endpoint: :55679 + +service: + extensions: [pprof, zpages, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch,filter/ottl] + exporters: [debug, traceduration/custom] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [debug, prometheus] + logs: + receivers: [traceduration/custom] + processors: [batch] + exporters: [elasticsearch/log]
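
A minimal end-to-end wiring for the new connector, distilled from the `testdata/config.yaml` above, might look like the sketch below. This is only an illustration, not part of the change itself: it assumes the component registers under the `traceduration` type declared in `metadata.yaml` and is included in a build via the `builder-config.yaml` entry added by this PR; the OTLP receiver settings and the `debug` exporter are taken from the test configuration and stand in for whatever receivers/exporters a real deployment would use.

```yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317

connectors:
  # Groups spans by trace ID in in-memory storage (see storage_memory.go /
  # ring_buffer.go) before forwarding them to the receiving pipeline.
  traceduration:

exporters:
  debug:

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [traceduration]
    logs:
      receivers: [traceduration]
      exporters: [debug]
```

As in the test configuration, the connector acts as an exporter in a traces pipeline and as a receiver in a logs pipeline; a traces-to-metrics pairing is also declared at alpha stability in `metadata.yaml` and `generated_status.go` and would be wired the same way with a metrics pipeline on the receiving side.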