Skip to content

Commit

Permalink
RSDK-8819: Implement FTDC file rotation. (viamrobotics#4510)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgottlieb authored Nov 6, 2024
1 parent deec85b commit 13092f8
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 21 deletions.
113 changes: 92 additions & 21 deletions ftdc/ftdc.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package ftdc

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path"
"slices"
"sync"
"time"
Expand Down Expand Up @@ -104,14 +104,16 @@ type FTDC struct {
outputWorkerDone chan struct{}

// Fields used to manage where serialized FTDC bytes are written.
//
// When debug is true, the `outputWriter` will "tee" data to both the `currOutputFile` and
// `inmemBuffer`. Otherwise `outputWriter` will just refer to the `currOutputFile`.
debug bool
outputWriter io.Writer
currOutputFile *os.File
// inmemBuffer will remain nil when `debug` is false.
inmemBuffer *bytes.Buffer
outputWriter io.Writer
// bytesWrittenCounter will count bytes that are written to the `outputWriter`. We use an
// `io.Writer` implementer for this, as opposed to just counting by hand, primarily as a
// convenience for working with `json.NewEncoder(writer).Encode(...)`. This counter is folded
// into the above `outputWriter`.
bytesWrittenCounter countingWriter
currOutputFile *os.File
maxFileSizeBytes int64
// ftdcDir controls where FTDC data files will be written.
ftdcDir string

logger logging.Logger
}
Expand All @@ -129,6 +131,7 @@ func NewWithWriter(writer io.Writer, logger logging.Logger) *FTDC {
outputWorkerDone: make(chan struct{}),
logger: logger,
outputWriter: writer,
maxFileSizeBytes: 1_000_000,
}
}

Expand Down Expand Up @@ -175,7 +178,8 @@ func (ftdc *FTDC) Remove(name string) {
"name", name, "generationId", ftdc.inputGenerationID)
}

// Start spins off the background goroutine for collecting/writing FTDC data.
// Start spins off the background goroutine for collecting + writing FTDC data. It's normal for tests
// to _not_ call `Start`. Tests can simulate the same functionality by calling `constructDatum` and `writeDatum`.
func (ftdc *FTDC) Start() {
ftdc.readStatsWorker = utils.NewStoppableWorkerWithTicker(time.Second, ftdc.statsReader)
utils.PanicCapturingGo(ftdc.statsWriter)
Expand Down Expand Up @@ -225,6 +229,7 @@ func (ftdc *FTDC) statsWriter() {
return
}

// FSync the ftdc data once every 30 iterations (roughly every 30 seconds).
datumsWritten++
if datumsWritten%30 == 0 && ftdc.currOutputFile != nil {
utils.UncheckedError(ftdc.currOutputFile.Sync())
Expand All @@ -233,7 +238,9 @@ func (ftdc *FTDC) statsWriter() {
}

// StopAndJoin stops the background worker started by `Start`. It is only legal to call this after
// `Start` returns.
// `Start` returns. It's normal for tests to _not_ call `StopAndJoin`. Tests that have spun up the
// `statsWriter` by hand, without the `statsReader` can `close(ftdc.datumCh)` followed by
// `<-ftdc.outputWorkerDone` to stop+wait for the `statsWriter`.
func (ftdc *FTDC) StopAndJoin(ctx context.Context) {
ftdc.readStatsWorker.Stop()
close(ftdc.datumCh)
Expand Down Expand Up @@ -354,25 +361,89 @@ func (ftdc *FTDC) writeDatum(datum datum) error {
}

// getWriter returns an io.Writer xor error for writing schema/data information. `getWriter` is only
// expected to be called by `newDatum`.
// expected to be called by `writeDatum`.
func (ftdc *FTDC) getWriter() (io.Writer, error) {
if ftdc.outputWriter != nil {
// If we have an `outputWriter` without a `currOutputFile`, it means ftdc was constructed with
// an explicit writer. We will use the passed in writer for all operations. No file will ever be
// created.
if ftdc.outputWriter != nil && ftdc.currOutputFile == nil {
return ftdc.outputWriter, nil
}

// Note to readers, until this function starts mutating `outputWriter` and `currOutputFile`, you
// can safely assume:
//
// `outputWriter == nil if and only if currOutputFile == nil`.
//
// In case that helps reading the following logic.

// If we have an active outputWriter and we have not exceeded our FTDC file rotation quota, we
// can just return.
if ftdc.outputWriter != nil && ftdc.bytesWrittenCounter.count < ftdc.maxFileSizeBytes {
return ftdc.outputWriter, nil
}

// If we're in the logic branch where we have exceeded our FTDC file rotation quota, we first
// close the `currOutputFile`.
if ftdc.currOutputFile != nil {
// Dan: An error closing a file (any resource for that matter) is not an error. I will die
// on that hill.
utils.UncheckedError(ftdc.currOutputFile.Close())
}

var err error
ftdc.currOutputFile, err = os.Create("./viam-server.ftdc")
if err != nil {
// It's unclear in what circumstance we'd expect creating a new file to fail. Try 5 times for no
// good reason before giving up entirely and shutting down FTDC.
for numTries := 0; numTries < 5; numTries++ {
now := time.Now().UTC()
// lint wants 0o600 file permissions. We don't expect the unix user someone is ssh'ed in as
// to be on the same unix user as is running the viam-server process. Thus the file needs to
// be accessible by anyone.
//
//nolint:gosec
ftdc.currOutputFile, err = os.OpenFile(path.Join(ftdc.ftdcDir,
// Filename example: viam-server-2024-10-04T18-42-02.ftdc
fmt.Sprintf("viam-server-%d-%02d-%02dT%02d-%02d-%02dZ.ftdc",
now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second())),
// Create a new file in read+write mode. `O_EXCL` is used to guarantee a new file is
// created. If the filename already exists, that flag changes the `os.OpenFile` behavior
// to return an error.
os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
if err == nil {
break
}
ftdc.logger.Warnw("FTDC failed to open file", "err", err)

// If the error is some unexpected filename collision, wait a second to change the filename.
time.Sleep(time.Second)
}
if err != nil {
return nil, err
}

if ftdc.debug {
ftdc.inmemBuffer = bytes.NewBuffer(nil)
ftdc.outputWriter = io.MultiWriter(ftdc.currOutputFile, ftdc.inmemBuffer)
} else {
ftdc.outputWriter = ftdc.currOutputFile
}
// New file, reset the bytes written counter.
ftdc.bytesWrittenCounter.count = 0

// When we create a new file, we must rewrite the schema. If we do not, a file may be useless
// without its "ancestors".
//
// As a hack, we decrement the `outputGenerationID` to force a new schema to be written.
ftdc.outputGenerationID--

// Assign the `outputWriter`. The `outputWriter` is an abstraction for where FTDC formatted
// bytes go. Testing often prefers to just write bytes into memory (and consequently construct
// an FTDC with `NewWithWriter`). While in production we obviously want to persist bytes on
// disk.
ftdc.outputWriter = io.MultiWriter(&ftdc.bytesWrittenCounter, ftdc.currOutputFile)

return ftdc.outputWriter, nil
}

// countingWriter is an `io.Writer` implementer that stores nothing. It only keeps a running total
// of how many bytes it has been asked to write.
type countingWriter struct {
	// count is the total number of bytes passed to `Write` so far. Callers may reset it by
	// assigning to it directly.
	count int64
}

// Write adds the length of `p` to the running total. The data itself is discarded. It always
// reports the full length as written and never returns an error.
func (c *countingWriter) Write(p []byte) (int, error) {
	written := len(p)
	c.count += int64(written)
	return written, nil
}
115 changes: 115 additions & 0 deletions ftdc/ftdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package ftdc
import (
"bytes"
"errors"
"io/fs"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -285,3 +289,114 @@ func TestStatsWriterContinuesOnSchemaError(t *testing.T) {
// Wait for the `statsWriter` goroutine to exit.
<-ftdc.outputWorkerDone
}

// TestCountingBytes drives FTDC file rotation by hand (without `Start`): it writes 1000 datums with
// a tiny max file size, forces a new file every time the byte counter crosses that size, then
// parses every produced file and checks that each datum shows up exactly once with the expected
// values.
func TestCountingBytes(t *testing.T) {
	logger := logging.NewTestLogger(t)

	// `NewWithWriter` must not be used here. An FTDC with a forced writer never creates files and
	// therefore never rotates them.
	ftdc := New(logger.Sublogger("ftdc"))
	// Rotate after 1,000 bytes. With a changing `foo` object that is roughly every 60 datums.
	ftdc.maxFileSizeBytes = 1000

	// Keep every file this test creates under one fresh directory that is removed on exit.
	ftdcFileDir, err := os.MkdirTemp("./", "countingBytesTest")
	test.That(t, err, test.ShouldBeNil)
	defer os.RemoveAll(ftdcFileDir)
	ftdc.ftdcDir = ftdcFileDir

	fooStats := &foo{}
	ftdc.Add("foo", fooStats)

	timesRolledOver := 0
	for i := 0; i < 1000; i++ {
		fooStats.x = i
		fooStats.y = 2 * i

		datum := ftdc.constructDatum()
		datum.Time = int64(i)
		err := ftdc.writeDatum(datum)
		test.That(t, err, test.ShouldBeNil)

		// Nothing more to do until this write pushed the byte count to the configured maximum.
		if ftdc.bytesWrittenCounter.count < ftdc.maxFileSizeBytes {
			continue
		}

		// The next `getWriter` call is going to create a new file. File names are derived from
		// "now" with seconds resolution, and this loop is fast, so names would collide. Point
		// FTDC at a brand new subdirectory to sidestep that.
		ftdc.ftdcDir, err = os.MkdirTemp(ftdcFileDir, "subdirectory")
		test.That(t, err, test.ShouldBeNil)

		// An explicit `getWriter` call must create the new file and reset the counter.
		_, err = ftdc.getWriter()
		test.That(t, err, test.ShouldBeNil)
		test.That(t, ftdc.bytesWrittenCounter.count, test.ShouldBeLessThan, 1000)
		timesRolledOver++
	}
	logger.Info("Rolled over:", timesRolledOver)

	// Without at least one rollover the test would "pass" without touching the code under test.
	test.That(t, timesRolledOver, test.ShouldBeGreaterThan, 0)

	// Walk every output file. Most assertions are local to a single file, but two are aggregated
	// over all files:
	// - The number of FTDC files matches the number of rollovers (plus the initial file).
	// - Every "time" in [0, 1000) that a datum was constructed for is found.
	numFTDCFiles := 0
	timeSeen := make(map[int64]struct{})
	visit := func(walkPath string, info fs.FileInfo, walkErr error) error {
		logger.Info("Path:", walkPath)
		if !strings.HasSuffix(walkPath, ".ftdc") {
			return nil
		}

		if walkErr != nil {
			logger.Info("Unexpected walk error. Continuing under the assumption any actual* problem will",
				"be caught by the assertions. WalkErr:", walkErr)
			return nil
		}

		// Found an FTDC file. Tally it for the final file-count assertion.
		numFTDCFiles++

		// Parse this file on its own and validate what it contains.
		ftdcFile, err := os.Open(walkPath)
		test.That(t, err, test.ShouldBeNil)
		defer ftdcFile.Close()

		// Parsing is chatty at debug level, and that output only matters when parsing fails.
		// Quiet the logger for the duration of the parse.
		logger.SetLevel(logging.INFO)
		datums, err := ParseWithLogger(ftdcFile, logger)
		logger.SetLevel(logging.DEBUG)
		test.That(t, err, test.ShouldBeNil)

		for _, flatDatum := range datums {
			// The "time" of every datum must fall in [0, 1000).
			test.That(t, flatDatum.Time, test.ShouldBeGreaterThanOrEqualTo, 0)
			test.That(t, flatDatum.Time, test.ShouldBeLessThan, 1000)

			// Each "time" may only be observed once across all files.
			test.That(t, timeSeen, test.ShouldNotContainKey, flatDatum.Time)
			timeSeen[flatDatum.Time] = struct{}{}

			// Each datum carries two metrics. As per construction:
			// - `foo.X` must be equal to the "time" and
			// - `foo.Y` must be `2*time`.
			fooMetrics := flatDatum.asDatum().Data["foo"].(map[string]float32)
			test.That(t, fooMetrics["X"], test.ShouldEqual, flatDatum.Time)
			test.That(t, fooMetrics["Y"], test.ShouldEqual, 2*flatDatum.Time)
		}

		return nil
	}
	filepath.Walk(ftdcFileDir, visit)
	test.That(t, len(timeSeen), test.ShouldEqual, 1000)

	// Each rollover produced one file, and the very first `writeDatum` call produced one more.
	// Hence the subtraction of `1`.
	test.That(t, timesRolledOver, test.ShouldEqual, numFTDCFiles-1)
}

0 comments on commit 13092f8

Please sign in to comment.