diff --git a/.chloggen/file-exporter-group-by-attr.yaml b/.chloggen/file-exporter-group-by-attr.yaml new file mode 100644 index 000000000000..bf6447ca412c --- /dev/null +++ b/.chloggen/file-exporter-group-by-attr.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: fileexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Added the option to write telemetry data into multiple files, where the file path is based on a resource attribute. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24654] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/exporter/fileexporter/README.md b/exporter/fileexporter/README.md index b83ef3cc2743..ce011b725c67 100644 --- a/exporter/fileexporter/README.md +++ b/exporter/fileexporter/README.md @@ -25,9 +25,9 @@ Exporter supports the following features: + Support for compressing the telemetry data before exporting. ++ Support for writing into multiple files, where the file path is determined by a resource attribute. Please note that there is no guarantee that exact field names will remain stable. -This intended for primarily for debugging Collector without setting up backends. The official [opentelemetry-collector-contrib container](https://hub.docker.com/r/otel/opentelemetry-collector-contrib/tags#!) does not have a writable filesystem by default since it's built on the `scratch` layer. As such, you will need to create a writable directory for the path, potentially by mounting writable volumes or creating a custom image. @@ -52,6 +52,11 @@ The following settings are optional: - `flush_interval`[default: 1s]: `time.Duration` interval between flushes. See [time.ParseDuration](https://pkg.go.dev/time#ParseDuration) for valid formats. NOTE: a value without unit is in nanoseconds and `flush_interval` is ignored and writes are not buffered if `rotation` is set. +- `group_by` enables writing to separate files based on a resource attribute. + - enabled: [default: false] enables group_by. When group_by is enabled, rotation setting is ignored. + - resource_attribute: [default: fileexporter.path_segment]: specifies the name of the resource attribute that contains the path segment of the file to write to. The final path will be the `path` config value, with the `*` replaced with the value of this resource attribute. + - max_open_files: [default: 100]: specifies the maximum number of open file descriptors for the output files. + ## File Rotation Telemetry data is exported to a single file by default. `fileexporter` only enables file rotation when the user specifies `rotation:` in the config. However, if specified, related default settings would apply. @@ -79,6 +84,15 @@ When `format` is json and `compression` is none , telemetry data is written to f Otherwise, when using `proto` format or any kind of encoding, each encoded object is preceded by 4 bytes (an unsigned 32 bit integer) which represent the number of bytes contained in the encoded object.When we need read the messages back in, we read the size, then read the bytes into a separate buffer, then parse from that buffer. +## Group by attribute + +By specifying `group_by.resource_attribute` in the config, the exporter will determine a filepath for each telemetry record, by substituting the value of the resource attribute into the `path` configuration value. + +The final path is guaranteed to start with the prefix part of the `path` config value (the part before the `*` character). For example if `path` is "/data/*.json", and the resource attribute value is "../etc/my_config", then the final path will be sanitized to "/data/etc/my_config.json". + +The final path can contain path separators (`/`). The exporter will create missing directories recursively (similarly to `mkdir -p`). + +Grouping by attribute currently only supports a **single** **resource** attribute. If you would like to use multiple attributes, please use [Transform processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor) create a routing key. If you would like to use a non-resource level (eg: Log/Metric/DataPoint) attribute, please use [Group by Attributes processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/groupbyattrsprocessor) first. ## Example: diff --git a/exporter/fileexporter/config.go b/exporter/fileexporter/config.go index b3485c23d90f..8508b6b01d9e 100644 --- a/exporter/fileexporter/config.go +++ b/exporter/fileexporter/config.go @@ -6,6 +6,7 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collecto import ( "errors" "fmt" + "strings" "time" "go.opentelemetry.io/collector/component" @@ -23,13 +24,14 @@ type Config struct { // Path of the file to write to. Path is relative to current directory. Path string `mapstructure:"path"` - // Mode defines whether the exporter should append to the file + // Mode defines whether the exporter should append to the file. // Options: // - false[default]: truncates the file // - true: appends to the file. Append bool `mapstructure:"append"` - // Rotation defines an option about rotation of telemetry files + // Rotation defines an option about rotation of telemetry files. Ignored + // when GroupByAttribute is used. Rotation *Rotation `mapstructure:"rotation"` // FormatType define the data format of encoded telemetry data @@ -45,6 +47,9 @@ type Config struct { // FlushInterval is the duration between flushes. // See time.ParseDuration for valid values. FlushInterval time.Duration `mapstructure:"flush_interval"` + + // GroupBy enables writing to separate files based on a resource attribute. + GroupBy *GroupBy `mapstructure:"group_by"` } // Rotation an option to rolling log files @@ -70,6 +75,21 @@ type Rotation struct { LocalTime bool `mapstructure:"localtime"` } +type GroupBy struct { + // Enables group_by. When group_by is enabled, rotation setting is ignored. Default is false. + Enabled bool `mapstructure:"enabled"` + + // ResourceAttribute specifies the name of the resource attribute that + // contains the path segment of the file to write to. The final path will be + // the Path config value, with the * replaced with the value of this resource + // attribute. Default is "fileexporter.path_segment". + ResourceAttribute string `mapstructure:"resource_attribute"` + + // MaxOpenFiles specifies the maximum number of open file descriptors for the output files. + // The default is 100. + MaxOpenFiles int `mapstructure:"max_open_files"` +} + var _ component.Config = (*Config)(nil) // Validate checks if the exporter configuration is valid @@ -92,6 +112,22 @@ func (cfg *Config) Validate() error { if cfg.FlushInterval < 0 { return errors.New("flush_interval must be larger than zero") } + + if cfg.GroupBy != nil && cfg.GroupBy.Enabled { + pathParts := strings.Split(cfg.Path, "*") + if len(pathParts) != 2 { + return errors.New("path must contain exatcly one * when group_by is enabled") + } + + if len(pathParts[0]) == 0 { + return errors.New("path must not start with * when group_by is enabled") + } + + if cfg.GroupBy.ResourceAttribute == "" { + return errors.New("resource_attribute must not be empty when group_by is enabled") + } + } + return nil } diff --git a/exporter/fileexporter/config_test.go b/exporter/fileexporter/config_test.go index c2bfdb17822b..2e588047406e 100644 --- a/exporter/fileexporter/config_test.go +++ b/exporter/fileexporter/config_test.go @@ -39,6 +39,10 @@ func TestLoadConfig(t *testing.T) { }, FormatType: formatTypeJSON, FlushInterval: time.Second, + GroupBy: &GroupBy{ + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, }, }, { @@ -54,6 +58,10 @@ func TestLoadConfig(t *testing.T) { FormatType: formatTypeProto, Compression: compressionZSTD, FlushInterval: time.Second, + GroupBy: &GroupBy{ + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, }, }, { @@ -65,6 +73,10 @@ func TestLoadConfig(t *testing.T) { MaxBackups: defaultMaxBackups, }, FlushInterval: time.Second, + GroupBy: &GroupBy{ + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, }, }, { @@ -77,6 +89,10 @@ func TestLoadConfig(t *testing.T) { }, FormatType: formatTypeJSON, FlushInterval: time.Second, + GroupBy: &GroupBy{ + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, }, }, { @@ -93,6 +109,10 @@ func TestLoadConfig(t *testing.T) { Path: "./flushed", FlushInterval: 5, FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, }, }, { @@ -101,6 +121,10 @@ func TestLoadConfig(t *testing.T) { Path: "./flushed", FlushInterval: 5 * time.Second, FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, }, }, { @@ -109,6 +133,10 @@ func TestLoadConfig(t *testing.T) { Path: "./flushed", FlushInterval: 500 * time.Millisecond, FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, }, }, { @@ -119,6 +147,44 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, ""), errorMessage: "path must be non-empty", }, + { + id: component.NewIDWithName(metadata.Type, "group_by"), + expected: &Config{ + Path: "./group_by/*.json", + FlushInterval: time.Second, + FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + Enabled: true, + MaxOpenFiles: 10, + ResourceAttribute: "dummy", + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "group_by_defaults"), + expected: &Config{ + Path: "./group_by/*.json", + FlushInterval: time.Second, + FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + Enabled: true, + MaxOpenFiles: defaultMaxOpenFiles, + ResourceAttribute: defaultResourceAttribute, + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "group_by_invalid_path"), + errorMessage: "path must contain exatcly one * when group_by is enabled", + }, + { + id: component.NewIDWithName(metadata.Type, "group_by_invalid_path2"), + errorMessage: "path must not start with * when group_by is enabled", + }, + { + id: component.NewIDWithName(metadata.Type, "group_by_empty_resource_attribute"), + errorMessage: "resource_attribute must not be empty when group_by is enabled", + }, } for _, tt := range tests { diff --git a/exporter/fileexporter/factory.go b/exporter/fileexporter/factory.go index 5f7a1aaa727e..6c05220697bf 100644 --- a/exporter/fileexporter/factory.go +++ b/exporter/fileexporter/factory.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" "gopkg.in/natefinch/lumberjack.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter/internal/metadata" @@ -32,6 +33,10 @@ const ( // the type of compression codec compressionZSTD = "zstd" + + defaultMaxOpenFiles = 100 + + defaultResourceAttribute = "fileexporter.path_segment" ) type FileExporter interface { @@ -55,6 +60,10 @@ func createDefaultConfig() component.Config { return &Config{ FormatType: formatTypeJSON, Rotation: &Rotation{MaxBackups: defaultMaxBackups}, + GroupBy: &GroupBy{ + ResourceAttribute: defaultResourceAttribute, + MaxOpenFiles: defaultMaxOpenFiles, + }, } } @@ -63,7 +72,7 @@ func createTracesExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Traces, error) { - fe := getOrCreateFileExporter(cfg) + fe := getOrCreateFileExporter(cfg, set.Logger) return exporterhelper.NewTracesExporter( ctx, set, @@ -80,7 +89,7 @@ func createMetricsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Metrics, error) { - fe := getOrCreateFileExporter(cfg) + fe := getOrCreateFileExporter(cfg, set.Logger) return exporterhelper.NewMetricsExporter( ctx, set, @@ -97,7 +106,7 @@ func createLogsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Logs, error) { - fe := getOrCreateFileExporter(cfg) + fe := getOrCreateFileExporter(cfg, set.Logger) return exporterhelper.NewLogsExporter( ctx, set, @@ -113,20 +122,28 @@ func createLogsExporter( // or returns the already cached one. Caching is required because the factory is asked trace and // metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver() // but they must not create separate objects, they must use one Exporter object per configuration. -func getOrCreateFileExporter(cfg component.Config) FileExporter { +func getOrCreateFileExporter(cfg component.Config, logger *zap.Logger) FileExporter { conf := cfg.(*Config) fe := exporters.GetOrAdd(cfg, func() component.Component { - return newFileExporter(conf) + return newFileExporter(conf, logger) }) c := fe.Unwrap() return c.(FileExporter) } -func newFileExporter(conf *Config) FileExporter { - return &fileExporter{ - conf: conf, +func newFileExporter(conf *Config, logger *zap.Logger) FileExporter { + if conf.GroupBy == nil || !conf.GroupBy.Enabled { + return &fileExporter{ + conf: conf, + } + } + + return &groupingFileExporter{ + conf: conf, + logger: logger, } + } func newFileWriter(path string, shouldAppend bool, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) { diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index d37478c540d8..e3b6a2f6b095 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -45,14 +45,7 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error { // Start starts the flush timer if set. func (e *fileExporter) Start(_ context.Context, _ component.Host) error { - e.marshaller = &marshaller{ - formatType: e.conf.FormatType, - tracesMarshaler: tracesMarshalers[e.conf.FormatType], - metricsMarshaler: metricsMarshalers[e.conf.FormatType], - logsMarshaler: logsMarshalers[e.conf.FormatType], - compression: e.conf.Compression, - compressor: buildCompressor(e.conf.Compression), - } + e.marshaller = newMarshaller(e.conf) export := buildExportFunc(e.conf) var err error diff --git a/exporter/fileexporter/file_exporter_test.go b/exporter/fileexporter/file_exporter_test.go index e1d6d5206b44..cefc5747f775 100644 --- a/exporter/fileexporter/file_exporter_test.go +++ b/exporter/fileexporter/file_exporter_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" "gopkg.in/natefinch/lumberjack.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" @@ -127,7 +128,7 @@ func TestFileTracesExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { conf := tt.args.conf - feI := newFileExporter(conf) + feI := newFileExporter(conf, zap.NewNop()) require.IsType(t, &fileExporter{}, feI) fe := feI.(*fileExporter) @@ -633,7 +634,7 @@ func TestFlushing(t *testing.T) { // Wrap the buffer with the buffered writer closer that implements flush() method. bwc := newBufferedWriteCloser(buf) // Create a file exporter with flushing enabled. - feI := newFileExporter(cfg) + feI := newFileExporter(cfg, zap.NewNop()) assert.IsType(t, &fileExporter{}, feI) fe := feI.(*fileExporter) @@ -688,7 +689,7 @@ func TestAppend(t *testing.T) { // Wrap the buffer with the buffered writer closer that implements flush() method. bwc := newBufferedWriteCloser(buf) // Create a file exporter with flushing enabled. - feI := newFileExporter(cfg) + feI := newFileExporter(cfg, zap.NewNop()) assert.IsType(t, &fileExporter{}, feI) fe := feI.(*fileExporter) diff --git a/exporter/fileexporter/go.mod b/exporter/fileexporter/go.mod index 64d3b68e2113..04ff79bdeb90 100644 --- a/exporter/fileexporter/go.mod +++ b/exporter/fileexporter/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileex go 1.21 require ( + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/klauspost/compress v1.17.7 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.96.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.96.0 @@ -15,6 +16,7 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) @@ -51,7 +53,6 @@ require ( go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/exporter/fileexporter/go.sum b/exporter/fileexporter/go.sum index 7bfd9330a646..dad2fce4cc43 100644 --- a/exporter/fileexporter/go.sum +++ b/exporter/fileexporter/go.sum @@ -23,6 +23,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/exporter/fileexporter/grouping_file_exporter.go b/exporter/fileexporter/grouping_file_exporter.go new file mode 100644 index 000000000000..10a7385b05fb --- /dev/null +++ b/exporter/fileexporter/grouping_file_exporter.go @@ -0,0 +1,285 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" + +import ( + "context" + "errors" + "fmt" + "os" + "path" + "strings" + "sync" + + "github.com/hashicorp/golang-lru/v2/simplelru" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +type groupingFileExporter struct { + conf *Config + logger *zap.Logger + marshaller *marshaller + pathPrefix string + pathSuffix string + attribute string + maxOpenFiles int + newFileWriter func(path string) (*fileWriter, error) + + mutex sync.Mutex + writers *simplelru.LRU[string, *fileWriter] +} + +func (e *groupingFileExporter) consumeTraces(ctx context.Context, td ptrace.Traces) error { + if td.ResourceSpans().Len() == 0 { + return nil + } + + groups := make(map[string][]ptrace.ResourceSpans) + + for i := 0; i < td.ResourceSpans().Len(); i++ { + rSpans := td.ResourceSpans().At(i) + group(e, groups, rSpans.Resource(), rSpans) + } + + var errs error + for pathSegment, rSpansSlice := range groups { + traces := ptrace.NewTraces() + for _, rSpans := range rSpansSlice { + rSpans.CopyTo(traces.ResourceSpans().AppendEmpty()) + } + + buf, err := e.marshaller.marshalTraces(traces) + if err != nil { + errs = errors.Join(errs, err) + continue + } + + err = e.write(ctx, pathSegment, buf) + if err != nil { + errs = errors.Join(errs, err) + } + } + + if errs != nil { + return consumererror.NewPermanent(errs) + } + + return nil +} + +func (e *groupingFileExporter) consumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if md.ResourceMetrics().Len() == 0 { + return nil + } + + groups := make(map[string][]pmetric.ResourceMetrics) + + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rMetrics := md.ResourceMetrics().At(i) + group(e, groups, rMetrics.Resource(), rMetrics) + } + + var errs error + for pathSegment, rMetricsSlice := range groups { + metrics := pmetric.NewMetrics() + for _, rMetrics := range rMetricsSlice { + rMetrics.CopyTo(metrics.ResourceMetrics().AppendEmpty()) + } + + buf, err := e.marshaller.marshalMetrics(metrics) + if err != nil { + errs = errors.Join(errs, err) + continue + } + + err = e.write(ctx, pathSegment, buf) + if err != nil { + errs = errors.Join(errs, err) + } + } + + if errs != nil { + return consumererror.NewPermanent(errs) + } + + return nil +} + +func (e *groupingFileExporter) consumeLogs(ctx context.Context, ld plog.Logs) error { + if ld.ResourceLogs().Len() == 0 { + return nil + } + + groups := make(map[string][]plog.ResourceLogs) + + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rLogs := ld.ResourceLogs().At(i) + group(e, groups, rLogs.Resource(), rLogs) + } + + var errs error + for pathSegment, rLogsSlice := range groups { + logs := plog.NewLogs() + for _, rlogs := range rLogsSlice { + rlogs.CopyTo(logs.ResourceLogs().AppendEmpty()) + } + + buf, err := e.marshaller.marshalLogs(logs) + if err != nil { + errs = errors.Join(errs, err) + continue + } + + err = e.write(ctx, pathSegment, buf) + if err != nil { + errs = errors.Join(errs, err) + } + } + + if errs != nil { + return consumererror.NewPermanent(errs) + } + + return nil +} + +func (e *groupingFileExporter) write(_ context.Context, pathSegment string, buf []byte) error { + writer, err := e.getWriter(pathSegment) + if err != nil { + return err + } + + err = writer.export(buf) + if err != nil { + return err + } + + return nil +} + +func (e *groupingFileExporter) getWriter(pathSegment string) (*fileWriter, error) { + fullPath := e.fullPath(pathSegment) + + e.mutex.Lock() + defer e.mutex.Unlock() + + writer, ok := e.writers.Get(fullPath) + if ok { + return writer, nil + } + + err := os.MkdirAll(path.Dir(fullPath), 0755) + if err != nil { + return nil, err + } + + writer, err = e.newFileWriter(fullPath) + if err != nil { + return nil, err + } + + e.writers.Add(fullPath, writer) + + writer.start() + + return writer, nil +} + +func cleanPathPrefix(pathPrefix string) string { + cleaned := path.Clean(pathPrefix) + if strings.HasSuffix(pathPrefix, "/") && !strings.HasSuffix(cleaned, "/") { + return cleaned + "/" + } + + return cleaned +} + +func (e *groupingFileExporter) fullPath(pathSegment string) string { + if strings.HasPrefix(pathSegment, "./") { + pathSegment = pathSegment[1:] + } else if strings.HasPrefix(pathSegment, "../") { + pathSegment = pathSegment[2:] + } + + fullPath := path.Clean(e.pathPrefix + pathSegment + e.pathSuffix) + if strings.HasPrefix(fullPath, e.pathPrefix) { + return fullPath + } + + // avoid path traversal vulnerability + return path.Join(e.pathPrefix, path.Join("/", pathSegment+e.pathSuffix)) +} + +func (e *groupingFileExporter) onEvict(_ string, writer *fileWriter) { + err := writer.shutdown() + if err != nil { + e.logger.Warn("Failed to close file", zap.Error(err), zap.String("path", writer.path)) + } +} + +func group[T any](e *groupingFileExporter, groups map[string][]T, resource pcommon.Resource, resourceEntries T) { + var pathSegment string + v, ok := resource.Attributes().Get(e.attribute) + if ok { + if v.Type() == pcommon.ValueTypeStr { + pathSegment = v.Str() + } else { + ok = false + } + } + + if !ok { + e.logger.Debug(fmt.Sprintf("Resource does not contain %s attribute, dropping it", e.attribute)) + return + } + + groups[pathSegment] = append(groups[pathSegment], resourceEntries) +} + +// Start initializes and starts the exporter. +func (e *groupingFileExporter) Start(context.Context, component.Host) error { + e.marshaller = newMarshaller(e.conf) + export := buildExportFunc(e.conf) + + pathParts := strings.Split(e.conf.Path, "*") + + e.pathPrefix = cleanPathPrefix(pathParts[0]) + e.attribute = e.conf.GroupBy.ResourceAttribute + e.pathSuffix = pathParts[1] + e.maxOpenFiles = e.conf.GroupBy.MaxOpenFiles + e.newFileWriter = func(path string) (*fileWriter, error) { + return newFileWriter(path, e.conf.Append, nil, e.conf.FlushInterval, export) + } + + writers, err := simplelru.NewLRU(e.conf.GroupBy.MaxOpenFiles, e.onEvict) + if err != nil { + return err + } + + e.writers = writers + + return nil +} + +// Shutdown stops the exporter and is invoked during shutdown. +// It stops flushes and closes all underlying writers. +func (e *groupingFileExporter) Shutdown(context.Context) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + if e.writers == nil { + return nil + } + + e.writers.Purge() + e.writers = nil + + return nil +} diff --git a/exporter/fileexporter/grouping_file_exporter_test.go b/exporter/fileexporter/grouping_file_exporter_test.go new file mode 100644 index 000000000000..53f1b06a7382 --- /dev/null +++ b/exporter/fileexporter/grouping_file_exporter_test.go @@ -0,0 +1,499 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package fileexporter + +import ( + "bufio" + "bytes" + "context" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" +) + +type testMarshaller struct { + content []byte +} + +func (m *testMarshaller) MarshalTraces(ptrace.Traces) ([]byte, error) { + return m.content, nil +} + +func (m *testMarshaller) MarshalLogs(plog.Logs) ([]byte, error) { + return m.content, nil +} + +func (m *testMarshaller) MarshalMetrics(pmetric.Metrics) ([]byte, error) { + return m.content, nil +} + +type groupingExporterTestCase struct { + name string + conf *Config + traceUnmarshaler ptrace.Unmarshaler + logUnmarshaler plog.Unmarshaler + metricUnmarshaler pmetric.Unmarshaler +} + +func groupingExporterTestCases() []groupingExporterTestCase { + return []groupingExporterTestCase{ + { + name: "json: default configuration", + conf: &Config{ + FormatType: formatTypeJSON, + Rotation: &Rotation{MaxBackups: defaultMaxBackups}, + GroupBy: &GroupBy{ + Enabled: true, + // defaults: + ResourceAttribute: defaultResourceAttribute, + MaxOpenFiles: defaultMaxOpenFiles, + }, + }, + traceUnmarshaler: &ptrace.JSONUnmarshaler{}, + logUnmarshaler: &plog.JSONUnmarshaler{}, + metricUnmarshaler: &pmetric.JSONUnmarshaler{}, + }, + { + name: "json: compression configuration", + conf: &Config{ + FormatType: formatTypeJSON, + Compression: compressionZSTD, + Rotation: &Rotation{MaxBackups: defaultMaxBackups}, + GroupBy: &GroupBy{ + Enabled: true, + // defaults: + ResourceAttribute: defaultResourceAttribute, + MaxOpenFiles: defaultMaxOpenFiles, + }, + }, + traceUnmarshaler: &ptrace.JSONUnmarshaler{}, + logUnmarshaler: &plog.JSONUnmarshaler{}, + metricUnmarshaler: &pmetric.JSONUnmarshaler{}, + }, + { + name: "Proto: default configuration", + conf: &Config{ + FormatType: formatTypeProto, + GroupBy: &GroupBy{ + Enabled: true, + // defaults: + ResourceAttribute: defaultResourceAttribute, + MaxOpenFiles: defaultMaxOpenFiles, + }, + }, + traceUnmarshaler: &ptrace.ProtoUnmarshaler{}, + logUnmarshaler: &plog.ProtoUnmarshaler{}, + metricUnmarshaler: &pmetric.ProtoUnmarshaler{}, + }, + { + name: "Proto: compression configuration", + conf: &Config{ + FormatType: formatTypeProto, + Compression: compressionZSTD, + Rotation: &Rotation{MaxBackups: defaultMaxBackups}, + GroupBy: &GroupBy{ + Enabled: true, + // defaults: + ResourceAttribute: defaultResourceAttribute, + MaxOpenFiles: defaultMaxOpenFiles, + }, + }, + traceUnmarshaler: &ptrace.ProtoUnmarshaler{}, + logUnmarshaler: &plog.ProtoUnmarshaler{}, + metricUnmarshaler: &pmetric.ProtoUnmarshaler{}, + }, + { + name: "json: max_open_files=1", + conf: &Config{ + FormatType: formatTypeJSON, + Rotation: &Rotation{MaxBackups: defaultMaxBackups}, + GroupBy: &GroupBy{ + Enabled: true, + MaxOpenFiles: 1, + // defaults: + ResourceAttribute: defaultResourceAttribute, + }, + }, + traceUnmarshaler: &ptrace.JSONUnmarshaler{}, + logUnmarshaler: &plog.JSONUnmarshaler{}, + metricUnmarshaler: &pmetric.JSONUnmarshaler{}, + }, + } +} + +func TestGroupingFileTracesExporter(t *testing.T) { + for _, tt := range groupingExporterTestCases() { + t.Run(tt.name, func(t *testing.T) { + conf := tt.conf + tmpDir := t.TempDir() + conf.Path = tmpDir + "/*.log" + zapCore, logs := observer.New(zap.DebugLevel) + feI := newFileExporter(conf, zap.New(zapCore)) + require.IsType(t, &groupingFileExporter{}, feI) + gfe := feI.(*groupingFileExporter) + + testSpans := func() ptrace.Traces { + td := testdata.GenerateTracesTwoSpansSameResourceOneDifferent() + testdata.GenerateTracesOneSpan().ResourceSpans().At(0).CopyTo(td.ResourceSpans().AppendEmpty()) + td.ResourceSpans().At(0).Resource().Attributes().PutStr("fileexporter.path_segment", "one") + td.ResourceSpans().At(1).Resource().Attributes().PutStr("fileexporter.path_segment", ".././two/two") + return td + } + td := testSpans() + + assert.NoError(t, gfe.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, gfe.consumeTraces(context.Background(), td)) + assert.LessOrEqual(t, gfe.writers.Len(), conf.GroupBy.MaxOpenFiles) + + assert.NoError(t, gfe.Shutdown(context.Background())) + + // make sure the exporter did not modify any data + assert.Equal(t, testSpans(), td) + + debugLogs := logs.FilterLevelExact(zap.DebugLevel) + assert.Equal(t, 1, debugLogs.Len()) + assert.Equal(t, 0, logs.Len()-debugLogs.Len()) + + pathResourceSpans := map[string][]ptrace.ResourceSpans{ + tmpDir + "/one.log": {td.ResourceSpans().At(0)}, + tmpDir + "/two/two.log": {td.ResourceSpans().At(1)}, + } + + for path, wantResourceSpans := range pathResourceSpans { + fi, err := os.Open(path) + if len(wantResourceSpans) == 0 { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + br := bufio.NewReader(fi) + for { + buf, isEnd, err := func() ([]byte, bool, error) { + if gfe.marshaller.formatType == formatTypeJSON && gfe.marshaller.compression == "" { + return readJSONMessage(br) + } + return readMessageFromStream(br) + }() + assert.NoError(t, err) + if isEnd { + break + } + decoder := buildUnCompressor(gfe.marshaller.compression) + buf, err = decoder(buf) + assert.NoError(t, err) + got, err := tt.traceUnmarshaler.UnmarshalTraces(buf) + assert.NoError(t, err) + + gotResourceSpans := make([]ptrace.ResourceSpans, 0) + for i := 0; i < got.ResourceSpans().Len(); i++ { + gotResourceSpans = append(gotResourceSpans, got.ResourceSpans().At(i)) + } + + assert.EqualValues(t, wantResourceSpans, gotResourceSpans) + } + fi.Close() + } + }) + } +} + +func TestGroupingFileLogsExporter(t *testing.T) { + for _, tt := range groupingExporterTestCases() { + t.Run(tt.name, func(t *testing.T) { + conf := tt.conf + tmpDir := t.TempDir() + conf.Path = tmpDir + "/*.log" + zapCore, logs := observer.New(zap.DebugLevel) + feI := newFileExporter(conf, zap.New(zapCore)) + require.IsType(t, &groupingFileExporter{}, feI) + gfe := feI.(*groupingFileExporter) + + testLogs := func() plog.Logs { + td := testdata.GenerateLogsTwoLogRecordsSameResource() + testdata.GenerateLogsOneLogRecord().ResourceLogs().At(0).CopyTo(td.ResourceLogs().AppendEmpty()) + testdata.GenerateLogsOneLogRecord().ResourceLogs().At(0).CopyTo(td.ResourceLogs().AppendEmpty()) + td.ResourceLogs().At(0).Resource().Attributes().PutStr("fileexporter.path_segment", "one") + td.ResourceLogs().At(1).Resource().Attributes().PutStr("fileexporter.path_segment", ".././two/two") + return td + } + td := testLogs() + + assert.NoError(t, gfe.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, gfe.consumeLogs(context.Background(), td)) + assert.LessOrEqual(t, gfe.writers.Len(), conf.GroupBy.MaxOpenFiles) + + assert.NoError(t, gfe.Shutdown(context.Background())) + + // make sure the exporter did not modify any data + assert.Equal(t, testLogs(), td) + + debugLogs := logs.FilterLevelExact(zap.DebugLevel) + assert.Equal(t, 1, debugLogs.Len()) + assert.Equal(t, 0, logs.Len()-debugLogs.Len()) + + pathResourceLogs := map[string][]plog.ResourceLogs{ + tmpDir + "/one.log": {td.ResourceLogs().At(0)}, + tmpDir + "/two/two.log": {td.ResourceLogs().At(1)}, + } + + for path, wantResourceLogs := range pathResourceLogs { + fi, err := os.Open(path) + if len(wantResourceLogs) == 0 { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + br := bufio.NewReader(fi) + for { + buf, isEnd, err := func() ([]byte, bool, error) { + if gfe.marshaller.formatType == formatTypeJSON && gfe.marshaller.compression == "" { + return readJSONMessage(br) + } + return readMessageFromStream(br) + }() + assert.NoError(t, err) + if isEnd { + break + } + decoder := buildUnCompressor(gfe.marshaller.compression) + buf, err = decoder(buf) + assert.NoError(t, err) + got, err := tt.logUnmarshaler.UnmarshalLogs(buf) + assert.NoError(t, err) + + gotResourceLogs := make([]plog.ResourceLogs, 0) + for i := 0; i < got.ResourceLogs().Len(); i++ { + gotResourceLogs = append(gotResourceLogs, got.ResourceLogs().At(i)) + } + + assert.EqualValues(t, wantResourceLogs, gotResourceLogs) + } + fi.Close() + } + }) + } +} + +func TestGroupingFileMetricsExporter(t *testing.T) { + for _, tt := range groupingExporterTestCases() { + t.Run(tt.name, func(t *testing.T) { + conf := tt.conf + tmpDir := t.TempDir() + conf.Path = tmpDir + "/*.log" + + zapCore, logs := observer.New(zap.DebugLevel) + feI := newFileExporter(conf, zap.New(zapCore)) + require.IsType(t, &groupingFileExporter{}, feI) + gfe := feI.(*groupingFileExporter) + + testMetrics := func() pmetric.Metrics { + td := testdata.GenerateMetricsTwoMetrics() + testdata.GenerateMetricsOneCounterOneSummaryMetrics().ResourceMetrics().At(0).CopyTo(td.ResourceMetrics().AppendEmpty()) + testdata.GenerateMetricsOneMetricNoAttributes().ResourceMetrics().At(0).CopyTo(td.ResourceMetrics().AppendEmpty()) + td.ResourceMetrics().At(0).Resource().Attributes().PutStr("fileexporter.path_segment", "one") + td.ResourceMetrics().At(1).Resource().Attributes().PutStr("fileexporter.path_segment", ".././two/two") + return td + } + td := testMetrics() + + assert.NoError(t, gfe.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, gfe.consumeMetrics(context.Background(), td)) + assert.LessOrEqual(t, gfe.writers.Len(), conf.GroupBy.MaxOpenFiles) + + assert.NoError(t, gfe.Shutdown(context.Background())) + + // make sure the exporter did not modify any data + assert.Equal(t, testMetrics(), td) + + debugLogs := logs.FilterLevelExact(zap.DebugLevel) + assert.Equal(t, 1, debugLogs.Len()) + assert.Equal(t, 0, logs.Len()-debugLogs.Len()) + + pathResourceMetrics := map[string][]pmetric.ResourceMetrics{ + tmpDir + "/one.log": {td.ResourceMetrics().At(0)}, + tmpDir + "/two/two.log": {td.ResourceMetrics().At(1)}, + } + + for path, wantResourceMetrics := range pathResourceMetrics { + fi, err := os.Open(path) + if len(wantResourceMetrics) == 0 { + assert.Error(t, err) + continue + } + assert.NoError(t, err) + br := bufio.NewReader(fi) + for { + buf, isEnd, err := func() ([]byte, bool, error) { + if gfe.marshaller.formatType == formatTypeJSON && gfe.marshaller.compression == "" { + return readJSONMessage(br) + } + return readMessageFromStream(br) + }() + assert.NoError(t, err) + if isEnd { + break + } + decoder := buildUnCompressor(gfe.marshaller.compression) + buf, err = decoder(buf) + assert.NoError(t, err) + got, err := tt.metricUnmarshaler.UnmarshalMetrics(buf) + assert.NoError(t, err) + + gotResourceMetrics := make([]pmetric.ResourceMetrics, 0) + for i := 0; i < got.ResourceMetrics().Len(); i++ { + gotResourceMetrics = append(gotResourceMetrics, got.ResourceMetrics().At(i)) + } + + assert.EqualValues(t, wantResourceMetrics, gotResourceMetrics) + } + fi.Close() + } + }) + } +} + +func TestFullPath(t *testing.T) { + tests := []struct { + prefix string + pathSegment string + suffix string + want string + }{ + // good actor + {prefix: "/", pathSegment: "filename", suffix: ".json", want: "/filename.json"}, + {prefix: "/", pathSegment: "/dir/filename", suffix: ".json", want: "/dir/filename.json"}, + {prefix: "/dir", pathSegment: "dirsuffix/filename", suffix: ".json", want: "/dirdirsuffix/filename.json"}, + {prefix: "/dir", pathSegment: "/subdir/filename", suffix: ".json", want: "/dir/subdir/filename.json"}, + {prefix: "/dir", pathSegment: "./filename", suffix: ".json", want: "/dir/filename.json"}, + {prefix: "/dir", pathSegment: "/subdir/", suffix: "filename.json", want: "/dir/subdir/filename.json"}, + {prefix: "/dir/", pathSegment: "subdir", suffix: "/filename.json", want: "/dir/subdir/filename.json"}, + {prefix: "/dir", pathSegment: "", suffix: "filename.json", want: "/dirfilename.json"}, + {prefix: "/dir/", pathSegment: "", suffix: "filename.json", want: "/dir/filename.json"}, + {prefix: "/dir/", pathSegment: "subdir/strangebutok/../", suffix: "filename.json", want: "/dir/subdir/filename.json"}, + {prefix: "/dir", pathSegment: "dirsuffix/strangebutok/../", suffix: "filename.json", want: "/dirdirsuffix/filename.json"}, + + // bad actor + {prefix: "/dir", pathSegment: "../etc/attack", suffix: ".json", want: "/dir/etc/attack.json"}, + {prefix: "/dir", pathSegment: "../etc/attack", suffix: "/filename.json", want: "/dir/etc/attack/filename.json"}, + {prefix: "/dir", pathSegment: "dirsuffix/../etc/attack", suffix: ".json", want: "/dir/etc/attack.json"}, + {prefix: "/dir", pathSegment: "dirsuffix/../../etc/attack", suffix: ".json", want: "/dir/etc/attack.json"}, + {prefix: "/dir", pathSegment: "dirsuffix/../../etc/attack", suffix: ".json", want: "/dir/etc/attack.json"}, + } + + for _, tc := range tests { + t.Run(fmt.Sprintf("%s + %s + %s", tc.prefix, tc.pathSegment, tc.suffix), func(t *testing.T) { + e := &groupingFileExporter{ + pathPrefix: cleanPathPrefix(tc.prefix), + pathSuffix: tc.suffix, + } + + assert.Equal(t, tc.want, e.fullPath(tc.pathSegment)) + }) + } +} + +func BenchmarkExporters(b *testing.B) { + tests := []struct { + name string + conf *Config + }{ + { + name: "default", + conf: &Config{ + Path: tempFileName(b), + FormatType: formatTypeJSON, + }, + }, + { + name: "grouping, 100 writers", + conf: &Config{ + Path: b.TempDir() + "/*", + FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + Enabled: true, + MaxOpenFiles: 100, + }, + }, + }, + { + name: "grouping, 99 writers", + conf: &Config{ + Path: b.TempDir() + "/*", + FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + Enabled: true, + MaxOpenFiles: 99, + }, + }, + }, + { + name: "grouping, 1 writer", + conf: &Config{ + Path: b.TempDir() + "/*", + FormatType: formatTypeJSON, + GroupBy: &GroupBy{ + Enabled: true, + MaxOpenFiles: 1, + }, + }, + }, + } + + var traces []ptrace.Traces + var logs []plog.Logs + for i := 0; i < 100; i++ { + td := testdata.GenerateTracesTwoSpansSameResource() + td.ResourceSpans().At(0).Resource().Attributes().PutStr("fileexporter.path_segment", fmt.Sprintf("file%d", i)) + traces = append(traces, td) + + ld := testdata.GenerateLogsTwoLogRecordsSameResource() + ld.ResourceLogs().At(0).Resource().Attributes().PutStr("fileexporter.path_segment", fmt.Sprintf("file%d", i)) + logs = append(logs, ld) + } + for _, tc := range tests { + fe := newFileExporter(tc.conf, zap.NewNop()) + + // remove marshaling time from the benchmark + tm := &testMarshaller{content: bytes.Repeat([]byte{'a'}, 512)} + marshaller := &marshaller{ + tracesMarshaler: tm, + metricsMarshaler: tm, + logsMarshaler: tm, + compression: "", + compressor: noneCompress, + formatType: "test", + } + switch fExp := fe.(type) { + case *fileExporter: + fExp.marshaller = marshaller + case *groupingFileExporter: + fExp.marshaller = marshaller + } + + require.NoError(b, fe.Start(context.Background(), componenttest.NewNopHost())) + + b.Run(tc.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + ctx := context.Background() + for i := 0; i < b.N; i++ { + require.NoError(b, fe.consumeTraces(ctx, traces[i%len(traces)])) + require.NoError(b, fe.consumeLogs(ctx, logs[i%len(logs)])) + } + }) + + assert.NoError(b, fe.Shutdown(context.Background())) + } +} diff --git a/exporter/fileexporter/marshaller.go b/exporter/fileexporter/marshaller.go index 1eabb80369e4..d45302d86aea 100644 --- a/exporter/fileexporter/marshaller.go +++ b/exporter/fileexporter/marshaller.go @@ -34,6 +34,17 @@ type marshaller struct { formatType string } +func newMarshaller(conf *Config) *marshaller { + return &marshaller{ + formatType: conf.FormatType, + tracesMarshaler: tracesMarshalers[conf.FormatType], + metricsMarshaler: metricsMarshalers[conf.FormatType], + logsMarshaler: logsMarshalers[conf.FormatType], + compression: conf.Compression, + compressor: buildCompressor(conf.Compression), + } +} + func (m *marshaller) marshalTraces(td ptrace.Traces) ([]byte, error) { buf, err := m.tracesMarshaler.MarshalTraces(td) if err != nil { diff --git a/exporter/fileexporter/testdata/config.yaml b/exporter/fileexporter/testdata/config.yaml index cac55b48ae83..fd7b6445ad4f 100644 --- a/exporter/fileexporter/testdata/config.yaml +++ b/exporter/fileexporter/testdata/config.yaml @@ -55,3 +55,31 @@ file/flush_interval_500ms: file/flush_interval_negative_value: path: ./flushed flush_interval: "-1s" + +file/group_by: + path: ./group_by/*.json + group_by: + enabled: true + resource_attribute: dummy + max_open_files: 10 + +file/group_by_defaults: + path: ./group_by/*.json + group_by: + enabled: true + +file/group_by_invalid_path: + path: ./group_by_no_star + group_by: + enabled: true + +file/group_by_invalid_path2: + path: '*/./group_by' + group_by: + enabled: true + +file/group_by_empty_resource_attribute: + path: ./group_by/*.json + group_by: + enabled: true + resource_attribute: ""