Skip to content

Commit

Permalink
support for Timestamp in file outputter path (#38029) (#38538)
Browse files Browse the repository at this point in the history
* support for TIME_NOW var

* fix import ordering

* add documentation

* add unit tests

* fix linting error

* path is a date time formatter

* fmt

* remove unneeded

* update docs

* add UTC

* improve message

* change link

* create new Unpacker

* fmt

* description of the use case

* unit tests

(cherry picked from commit e067577)

Co-authored-by: Yoel Spotts <[email protected]>
Co-authored-by: Craig MacKenzie <[email protected]>
  • Loading branch information
3 people authored Mar 22, 2024
1 parent 2b50d68 commit 26aad5d
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 16 deletions.
28 changes: 20 additions & 8 deletions libbeat/outputs/fileout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
)

type fileOutConfig struct {
Path string `config:"path"`
Filename string `config:"filename"`
RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"`
NumberOfFiles uint `config:"number_of_files"`
Codec codec.Config `config:"codec"`
Permissions uint32 `config:"permissions"`
RotateOnStartup bool `config:"rotate_on_startup"`
Queue config.Namespace `config:"queue"`
Path *PathFormatString `config:"path"`
Filename string `config:"filename"`
RotateEveryKb uint `config:"rotate_every_kb" validate:"min=1"`
NumberOfFiles uint `config:"number_of_files"`
Codec codec.Config `config:"codec"`
Permissions uint32 `config:"permissions"`
RotateOnStartup bool `config:"rotate_on_startup"`
Queue config.Namespace `config:"queue"`
}

func defaultConfig() fileOutConfig {
Expand All @@ -45,6 +45,18 @@ func defaultConfig() fileOutConfig {
}
}

func readConfig(cfg *config.C) (*fileOutConfig, error) {
foConfig := defaultConfig()
if err := cfg.Unpack(&foConfig); err != nil {
return nil, err
}

// disable bulk support in publisher pipeline
_ = cfg.SetInt("bulk_max_size", -1, -1)

return &foConfig, nil
}

func (c *fileOutConfig) Validate() error {
if c.NumberOfFiles < 2 || c.NumberOfFiles > file.MaxBackupsLimit {
return fmt.Errorf("the number_of_files to keep should be between 2 and %v",
Expand Down
100 changes: 100 additions & 0 deletions libbeat/outputs/fileout/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package fileout

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

func TestConfig(t *testing.T) {
for name, test := range map[string]struct {
config *config.C
useWindowsPath bool
assertion func(t *testing.T, config *fileOutConfig, err error)
}{
"default config": {
config: config.MustNewConfigFrom([]byte(`{ }`)),
assertion: func(t *testing.T, actual *fileOutConfig, err error) {
expectedConfig := &fileOutConfig{
NumberOfFiles: 7,
RotateEveryKb: 10 * 1024,
Permissions: 0600,
RotateOnStartup: true,
}

assert.Equal(t, expectedConfig, actual)
assert.Nil(t, err)
},
},
"config given with posix path": {
config: config.MustNewConfigFrom(mapstr.M{
"number_of_files": 10,
"rotate_every_kb": 5 * 1024,
"path": "/tmp/packetbeat/%{+yyyy-MM-dd-mm-ss-SSSSSS}",
"filename": "pb",
}),
assertion: func(t *testing.T, actual *fileOutConfig, err error) {
assert.Equal(t, uint(10), actual.NumberOfFiles)
assert.Equal(t, uint(5*1024), actual.RotateEveryKb)
assert.Equal(t, true, actual.RotateOnStartup)
assert.Equal(t, uint32(0600), actual.Permissions)
assert.Equal(t, "pb", actual.Filename)

path, runErr := actual.Path.Run(time.Date(2024, 1, 2, 3, 4, 5, 67890, time.UTC))
assert.Nil(t, runErr)

assert.Equal(t, "/tmp/packetbeat/2024-01-02-04-05-000067", path)
assert.Nil(t, err)
},
},
"config given with windows path": {
useWindowsPath: true,
config: config.MustNewConfigFrom(mapstr.M{
"number_of_files": 10,
"rotate_every_kb": 5 * 1024,
"path": "c:\\tmp\\packetbeat\\%{+yyyy-MM-dd-mm-ss-SSSSSS}",
"filename": "pb",
}),
assertion: func(t *testing.T, actual *fileOutConfig, err error) {
assert.Equal(t, uint(10), actual.NumberOfFiles)
assert.Equal(t, uint(5*1024), actual.RotateEveryKb)
assert.Equal(t, true, actual.RotateOnStartup)
assert.Equal(t, uint32(0600), actual.Permissions)
assert.Equal(t, "pb", actual.Filename)

path, runErr := actual.Path.Run(time.Date(2024, 1, 2, 3, 4, 5, 67890, time.UTC))
assert.Nil(t, runErr)

assert.Equal(t, "c:\\tmp\\packetbeat\\2024-01-02-04-05-000067", path)
assert.Nil(t, err)
},
},
} {
t.Run(name, func(t *testing.T) {
isWindowsPath = test.useWindowsPath
cfg, err := readConfig(test.config)
test.assertion(t, cfg, err)
})
}
}
8 changes: 8 additions & 0 deletions libbeat/outputs/fileout/docs/fileout.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ The default value is `true`.
The path to the directory where the generated files will be saved. This option is
mandatory.

The path may include the timestamp when the file output is initialized using the `+FORMAT` syntax where `FORMAT` is a
valid https://github.com/elastic/beats/blob/{doc-branch}/libbeat/common/dtfmt/doc.go[time format],
and enclosed with expansion braces: `%{+FORMAT}`. For example:

```
path: 'fileoutput-%{+yyyy.MM.dd}'
```

===== `filename`

The name of the generated files. The default is set to the Beat name. For example, the files
Expand Down
17 changes: 9 additions & 8 deletions libbeat/outputs/fileout/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,17 @@ func makeFileout(
observer outputs.Observer,
cfg *c.C,
) (outputs.Group, error) {
foConfig := defaultConfig()
if err := cfg.Unpack(&foConfig); err != nil {
foConfig, err := readConfig(cfg)
if err != nil {
return outputs.Fail(err)
}

// disable bulk support in publisher pipeline
_ = cfg.SetInt("bulk_max_size", -1, -1)

fo := &fileOutput{
log: logp.NewLogger("file"),
beat: beat,
observer: observer,
}
if err := fo.init(beat, foConfig); err != nil {
if err = fo.init(beat, *foConfig); err != nil {
return outputs.Fail(err)
}

Expand All @@ -74,10 +71,14 @@ func makeFileout(

func (out *fileOutput) init(beat beat.Info, c fileOutConfig) error {
var path string
configPath, runErr := c.Path.Run(time.Now().UTC())
if runErr != nil {
return runErr
}
if c.Filename != "" {
path = filepath.Join(c.Path, c.Filename)
path = filepath.Join(configPath, c.Filename)
} else {
path = filepath.Join(c.Path, out.beat.Beat)
path = filepath.Join(configPath, out.beat.Beat)
}

out.filePath = path
Expand Down
66 changes: 66 additions & 0 deletions libbeat/outputs/fileout/pathformatstring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package fileout

import (
"os"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/common/fmtstr"

"github.com/elastic/beats/v7/libbeat/beat"
)

var isWindowsPath = os.PathSeparator == '\\'

// PathFormatString is a wrapper around EventFormatString for the
// handling paths with a format expression that has access to the timestamp format.
// It has special handling for paths, specifically for windows path separator
// which would be interpreted as an escape character. This formatter double escapes
// the path separator so it is properly interpreted by the fmtstr processor
type PathFormatString struct {
efs *fmtstr.EventFormatString
}

// Run executes the format string returning a new expanded string or an error
// if execution or event field expansion fails.
func (fs *PathFormatString) Run(timestamp time.Time) (string, error) {
placeholderEvent := &beat.Event{
Timestamp: timestamp,
}
return fs.efs.Run(placeholderEvent)
}

// Unpack tries to initialize the PathFormatString from provided value
// (which must be a string). Unpack method satisfies go-ucfg.Unpacker interface
// required by config.C, in order to use PathFormatString with
// `common.(*Config).Unpack()`.
func (fs *PathFormatString) Unpack(v interface{}) error {
path, ok := v.(string)
if !ok {
return nil
}

if isWindowsPath {
path = strings.ReplaceAll(path, "\\", "\\\\")
}

fs.efs = &fmtstr.EventFormatString{}
return fs.efs.Unpack(path)
}
87 changes: 87 additions & 0 deletions libbeat/outputs/fileout/pathformatstring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package fileout

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestPathFormatString(t *testing.T) {
tests := []struct {
title string
useWindowsPath bool
format string
timestamp time.Time
expected string
}{
{
"empty string",
false,
"",
time.Time{},
"",
},
{
"no fields configured",
false,
"format string",
time.Time{},
"format string",
},
{
"test timestamp formatter",
false,
"timestamp: %{+YYYY.MM.dd}",
time.Date(2015, 5, 1, 20, 12, 34, 0, time.UTC),
"timestamp: 2015.05.01",
},
{
"test timestamp formatter with posix path",
false,
"/tmp/%{+YYYY.MM.dd}",
time.Date(2015, 5, 1, 20, 12, 34, 0, time.UTC),
"/tmp/2015.05.01",
},
{
"test timestamp formatter with windows path",
true,
"C:\\tmp\\%{+YYYY.MM.dd}",
time.Date(2015, 5, 1, 20, 12, 34, 0, time.UTC),
"C:\\tmp\\2015.05.01",
},
}

for i, test := range tests {
t.Logf("test(%v): %v", i, test.title)
isWindowsPath = test.useWindowsPath
pfs := &PathFormatString{}
err := pfs.Unpack(test.format)
if err != nil {
t.Error(err)
continue
}

actual, err := pfs.Run(test.timestamp)

assert.NoError(t, err)
assert.Equal(t, test.expected, actual)
}
}

0 comments on commit 26aad5d

Please sign in to comment.