From a8f41e4bba48a1068be02c14f6ca3d637a2ad98b Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Fri, 28 Jun 2024 11:43:00 -0400 Subject: [PATCH] Refactor data capture -- single patch --- .gitignore | 2 +- config/config_test.go | 8 +- data/capture.go | 462 ++++++++++++++++++ data/collector.go | 4 + data/collector_test.go | 8 + data/registry.go | 19 +- data/registry_test.go | 2 +- go.mod | 2 +- services/datamanager/builtin/builtin.go | 461 ++--------------- services/datamanager/builtin/builtin_test.go | 12 +- services/datamanager/builtin/capture_test.go | 22 +- services/datamanager/builtin/export_test.go | 3 - .../datamanager/builtin/file_deletion_test.go | 7 +- services/datamanager/builtin/sync_test.go | 8 +- 14 files changed, 550 insertions(+), 470 deletions(-) create mode 100644 data/capture.go diff --git a/.gitignore b/.gitignore index 2eebf96bf5d3..fa0bd2055537 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ coverage.xml profile*.pdf code-coverage-results.md metadata +data.ndjson *~ *# @@ -53,7 +54,6 @@ metadata # act .secrets -/data/ out/ .idea/* bin/ diff --git a/config/config_test.go b/config/config_test.go index 80d2c4ada6f8..3f705ece165f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -32,6 +32,7 @@ import ( "go.viam.com/rdk/logging" "go.viam.com/rdk/referenceframe" "go.viam.com/rdk/resource" + "go.viam.com/rdk/services/datamanager" "go.viam.com/rdk/services/shell" "go.viam.com/rdk/spatialmath" rutils "go.viam.com/rdk/utils" @@ -69,6 +70,8 @@ func TestConfigRobot(t *testing.T) { test.That(t, newBc, test.ShouldResemble, bc) } +// TestConfig3 depends on the `datamanager` package *not* being loaded. Its `init` function +// registers an associated API that alters `AssociatedResourceConfigs` results. func TestConfig3(t *testing.T) { logger := logging.NewTestLogger(t) @@ -107,6 +110,7 @@ func TestConfig3(t *testing.T) { MaxPowerPct: 0.5, TicksPerRotation: 10000, }) + test.That(t, cfg.Components[2].AssociatedResourceConfigs, test.ShouldHaveLength, 1) test.That(t, cfg.Components[2].AssociatedResourceConfigs[0], test.ShouldResemble, resource.AssociatedResourceConfig{ API: resource.APINamespaceRDK.WithServiceType("data_manager"), @@ -114,6 +118,7 @@ func TestConfig3(t *testing.T) { "hi": 1.1, "friend": 2.2, }, + ConvertedAttributes: &datamanager.AssociatedConfig{}, }) test.That(t, cfg.Components[3].ConvertedAttributes, test.ShouldResemble, &incremental.Config{ @@ -135,7 +140,8 @@ func TestConfig3(t *testing.T) { "hi": 3.3, "friend": 4.4, }, - RemoteName: "rem1", + ConvertedAttributes: &datamanager.AssociatedConfig{}, + RemoteName: "rem1", }) test.That(t, cfg.Remotes[0].AssociatedResourceConfigs[1], test.ShouldResemble, resource.AssociatedResourceConfig{ API: resource.APINamespaceRDK.WithServiceType("some_type"), diff --git a/data/capture.go b/data/capture.go new file mode 100644 index 000000000000..0a05623e44ec --- /dev/null +++ b/data/capture.go @@ -0,0 +1,462 @@ +package data + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/benbjohnson/clock" + "github.com/pkg/errors" + + "go.viam.com/rdk/logging" + "go.viam.com/rdk/protoutils" + "go.viam.com/rdk/resource" + "go.viam.com/rdk/services/datamanager" + "go.viam.com/rdk/services/datamanager/datacapture" + "go.viam.com/rdk/services/datamanager/datasync" + "go.viam.com/rdk/utils" +) + +// TODO: re-determine if queue size is optimal given we now support 10khz+ capture rates +// The Collector's queue should be big enough to ensure that .capture() is never blocked by the queue being +// written to disk. A default value of 250 was chosen because even with the fastest reasonable capture interval (1ms), +// this would leave 250ms for a (buffered) disk write before blocking, which seems sufficient for the size of +// writes this would be performing. +const defaultCaptureQueueSize = 250 + +// Default bufio.Writer buffer size in bytes. +const defaultCaptureBufferSize = 4096 + +// Threshold number of files to check if sync is backed up (defined as >1000 files). +var minNumFiles = 1000 + +// Default maximum size in bytes of a data capture file. +var defaultMaxCaptureSize = int64(256 * 1024) + +var viamCaptureDotDir = filepath.Join(os.Getenv("HOME"), ".viam", "capture") + +// Default time between checking and logging number of files in capture dir. +var captureDirSizeLogInterval = 1 * time.Minute + +// ErrCaptureDirectoryConfigurationDisabled happens when the viam-server is run with +// `-untrusted-env` and the capture directory is not `~/.viam`. +var ErrCaptureDirectoryConfigurationDisabled = errors.New("changing the capture directory is prohibited in this environment") + +func generateMetadataKey(component, method string) string { + return fmt.Sprintf("%s/%s", component, method) +} + +var metadataToAdditionalParamFields = map[string]string{ + generateMetadataKey("rdk:component:board", "Analogs"): "reader_name", + generateMetadataKey("rdk:component:board", "Gpios"): "pin_name", +} + +// CaptureManager manages polling resources for metrics and writing those metrics to files. There +// must be only one CaptureManager per DataManager. The lifecycle of a CaptureManager is: +// +// - NewCaptureManager +// - Reconfigure (any number of times) +// - Close (once). +type CaptureManager struct { + mu sync.Mutex + captureDir string + captureDisabled bool + collectors map[resourceMethodMetadata]*collectorAndConfig + maxCaptureFileSize int64 + componentMethodFrequencyHz map[resourceMethodMetadata]float32 + + captureDirPollingCancelFn context.CancelFunc + captureDirPollingBackgroundWorkers *sync.WaitGroup + + logger logging.Logger + clk clock.Clock +} + +// NewCaptureManager creates a new capture manager. +func NewCaptureManager(logger logging.Logger, clk clock.Clock) *CaptureManager { + return &CaptureManager{ + logger: logger, + captureDir: viamCaptureDotDir, + collectors: make(map[resourceMethodMetadata]*collectorAndConfig), + componentMethodFrequencyHz: make(map[resourceMethodMetadata]float32), + clk: clk, + } +} + +// Config is the capture manager config. +type Config struct { + CaptureDisabled bool + CaptureDir string + Tags []string + MaximumCaptureFileSizeBytes int64 +} + +// Reconfigure reconfigures the capture manager. +func (cm *CaptureManager) Reconfigure(ctx context.Context, deps resource.Dependencies, resConfig resource.Config, dataConfig Config) error { + captureConfigs, err := cm.updateDataCaptureConfigs(deps, resConfig, dataConfig.CaptureDir) + if err != nil { + return err + } + + if !utils.IsTrustedEnvironment(ctx) && dataConfig.CaptureDir != "" && dataConfig.CaptureDir != viamCaptureDotDir { + return ErrCaptureDirectoryConfigurationDisabled + } + + if dataConfig.CaptureDir != "" { + cm.captureDir = dataConfig.CaptureDir + } else { + cm.captureDir = viamCaptureDotDir + } + cm.captureDisabled = dataConfig.CaptureDisabled + // Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service. + if cm.captureDisabled { + cm.CloseCollectors() + cm.collectors = make(map[resourceMethodMetadata]*collectorAndConfig) + } + + // Initialize or add collectors based on changes to the component configurations. + newCollectors := make(map[resourceMethodMetadata]*collectorAndConfig) + if !cm.captureDisabled { + for res, resConfs := range captureConfigs { + for _, resConf := range resConfs { + if resConf.Method == "" { + continue + } + // Create component/method metadata + methodMetadata := MethodMetadata{ + API: resConf.Name.API, + MethodName: resConf.Method, + } + + componentMethodMetadata := resourceMethodMetadata{ + ResourceName: resConf.Name.ShortName(), + MethodMetadata: methodMetadata, + MethodParams: fmt.Sprintf("%v", resConf.AdditionalParams), + } + _, ok := cm.componentMethodFrequencyHz[componentMethodMetadata] + + // Only log capture frequency if the component frequency is new or the frequency has changed + // otherwise we'll be logging way too much + if !ok || (ok && resConf.CaptureFrequencyHz != cm.componentMethodFrequencyHz[componentMethodMetadata]) { + syncVal := "will" + if resConf.CaptureFrequencyHz == 0 { + syncVal += " not" + } + cm.logger.Infof( + "capture frequency for %s is set to %.2fHz and %s sync", componentMethodMetadata, resConf.CaptureFrequencyHz, syncVal, + ) + } + + // we need this map to keep track of if state has changed in the configs + // without it, we will be logging the same message over and over for no reason + cm.componentMethodFrequencyHz[componentMethodMetadata] = resConf.CaptureFrequencyHz + + maxCaptureFileSize := dataConfig.MaximumCaptureFileSizeBytes + if maxCaptureFileSize == 0 { + maxCaptureFileSize = defaultMaxCaptureSize + } + if !resConf.Disabled && (resConf.CaptureFrequencyHz > 0 || cm.maxCaptureFileSize != maxCaptureFileSize) { + // We only use service-level tags. + resConf.Tags = dataConfig.Tags + + maxFileSizeChanged := cm.maxCaptureFileSize != maxCaptureFileSize + cm.maxCaptureFileSize = maxCaptureFileSize + + newCollectorAndConfig, err := cm.initializeOrUpdateCollector(res, componentMethodMetadata, resConf, maxFileSizeChanged) + if err != nil { + cm.logger.CErrorw(ctx, "failed to initialize or update collector", "error", err) + } else { + newCollectors[componentMethodMetadata] = newCollectorAndConfig + } + } + } + } + } + + // If a component/method has been removed from the config, close the collector. + for md, collAndConfig := range cm.collectors { + if _, present := newCollectors[md]; !present { + collAndConfig.Collector.Close() + } + } + cm.collectors = newCollectors + + if cm.captureDirPollingCancelFn != nil { + cm.captureDirPollingCancelFn() + } + if cm.captureDirPollingBackgroundWorkers != nil { + cm.captureDirPollingBackgroundWorkers.Wait() + } + captureDirPollCtx, captureDirCancelFunc := context.WithCancel(context.Background()) + cm.captureDirPollingCancelFn = captureDirCancelFunc + cm.captureDirPollingBackgroundWorkers = &sync.WaitGroup{} + cm.captureDirPollingBackgroundWorkers.Add(1) + go cm.logCaptureDirSize(captureDirPollCtx, cm.captureDir, cm.captureDirPollingBackgroundWorkers, cm.logger) + + return nil +} + +// Close closes the capture manager. +func (cm *CaptureManager) Close() { + if cm.captureDirPollingCancelFn != nil { + cm.captureDirPollingCancelFn() + } + if cm.captureDirPollingBackgroundWorkers != nil { + cm.captureDirPollingBackgroundWorkers.Wait() + } + + cm.FlushCollectors() + cm.CloseCollectors() +} + +// CaptureDir returns the capture directory. +func (cm *CaptureManager) CaptureDir() string { + return cm.captureDir +} + +// Parameters stored for each collector. +type collectorAndConfig struct { + Resource resource.Resource + Collector Collector + Config datamanager.DataCaptureConfig +} + +// Identifier for a particular collector: component name, component model, component type, +// method parameters, and method name. +type resourceMethodMetadata struct { + ResourceName string + MethodParams string + MethodMetadata MethodMetadata +} + +func (r resourceMethodMetadata) String() string { + return fmt.Sprintf( + "[API: %s, Resource Name: %s, Method Name: %s, Method Params: %s]", + r.MethodMetadata.API, r.ResourceName, r.MethodMetadata.MethodName, r.MethodParams) +} + +// Initialize a collector for the component/method or update it if it has previously been created. +// Return the component/method metadata which is used as a key in the collectors map. +func (cm *CaptureManager) initializeOrUpdateCollector( + res resource.Resource, + md resourceMethodMetadata, + config datamanager.DataCaptureConfig, + maxFileSizeChanged bool, +) (*collectorAndConfig, error) { + // Build metadata. + captureMetadata, err := datacapture.BuildCaptureMetadata( + config.Name.API, + config.Name.ShortName(), + config.Method, + config.AdditionalParams, + config.Tags, + ) + if err != nil { + return nil, err + } + + // TODO(DATA-451): validate method params + + if storedCollectorAndConfig, ok := cm.collectors[md]; ok { + if storedCollectorAndConfig.Config.Equals(&config) && + res == storedCollectorAndConfig.Resource && + !maxFileSizeChanged { + // If the attributes have not changed, do nothing and leave the existing collector. + return cm.collectors[md], nil + } + // If the attributes have changed, close the existing collector. + storedCollectorAndConfig.Collector.Close() + } + + // Get collector constructor for the component API and method. + collectorConstructor := CollectorLookup(md.MethodMetadata) + if collectorConstructor == nil { + return nil, errors.Errorf("failed to find collector constructor for %s", md.MethodMetadata) + } + + // Parameters to initialize collector. + interval := getDurationFromHz(config.CaptureFrequencyHz) + // Set queue size to defaultCaptureQueueSize if it was not set in the config. + captureQueueSize := config.CaptureQueueSize + if captureQueueSize == 0 { + captureQueueSize = defaultCaptureQueueSize + } + + captureBufferSize := config.CaptureBufferSize + if captureBufferSize == 0 { + captureBufferSize = defaultCaptureBufferSize + } + additionalParamKey, ok := metadataToAdditionalParamFields[generateMetadataKey( + md.MethodMetadata.API.String(), + md.MethodMetadata.MethodName)] + if ok { + if _, ok := config.AdditionalParams[additionalParamKey]; !ok { + return nil, errors.Errorf("failed to validate additional_params for %s, must supply %s", + md.MethodMetadata.API.String(), additionalParamKey) + } + } + methodParams, err := protoutils.ConvertStringMapToAnyPBMap(config.AdditionalParams) + if err != nil { + return nil, err + } + + // Create a collector for this resource and method. + targetDir := datacapture.FilePathWithReplacedReservedChars( + filepath.Join(cm.captureDir, captureMetadata.GetComponentType(), + captureMetadata.GetComponentName(), captureMetadata.GetMethodName())) + if err := os.MkdirAll(targetDir, 0o700); err != nil { + return nil, err + } + params := CollectorParams{ + ComponentName: config.Name.ShortName(), + Interval: interval, + MethodParams: methodParams, + Target: datacapture.NewBuffer(targetDir, captureMetadata, cm.maxCaptureFileSize), + QueueSize: captureQueueSize, + BufferSize: captureBufferSize, + Logger: cm.logger, + Clock: cm.clk, + } + collector, err := collectorConstructor(res, params) + if err != nil { + return nil, err + } + collector.Collect() + + return &collectorAndConfig{res, collector, config}, nil +} + +// Build the component configs associated with the data manager service. +func (cm *CaptureManager) updateDataCaptureConfigs( + resources resource.Dependencies, + conf resource.Config, + captureDir string, +) (map[resource.Resource][]datamanager.DataCaptureConfig, error) { + resourceCaptureConfigMap := make(map[resource.Resource][]datamanager.DataCaptureConfig) + for name, assocCfg := range conf.AssociatedAttributes { + associatedConf, err := utils.AssertType[*datamanager.AssociatedConfig](assocCfg) + if err != nil { + return nil, err + } + + res, err := resources.Lookup(name) + if err != nil { + cm.logger.Debugw("failed to lookup resource", "error", err) + continue + } + + captureCopies := make([]datamanager.DataCaptureConfig, len(associatedConf.CaptureMethods)) + for _, method := range associatedConf.CaptureMethods { + method.CaptureDirectory = captureDir + captureCopies = append(captureCopies, method) + } + resourceCaptureConfigMap[res] = captureCopies + } + return resourceCaptureConfigMap, nil +} + +// CloseCollectors closes collectors. +func (cm *CaptureManager) CloseCollectors() { + var collectorsToClose []Collector + cm.mu.Lock() + for _, collectorAndConfig := range cm.collectors { + collectorsToClose = append(collectorsToClose, collectorAndConfig.Collector) + } + cm.collectors = make(map[resourceMethodMetadata]*collectorAndConfig) + cm.mu.Unlock() + + var wg sync.WaitGroup + for _, collector := range collectorsToClose { + wg.Add(1) + go func(toClose Collector) { + defer wg.Done() + toClose.Close() + }(collector) + } + wg.Wait() +} + +// FlushCollectors flushes collectors. +func (cm *CaptureManager) FlushCollectors() { + var collectorsToFlush []Collector + cm.mu.Lock() + for _, collectorAndConfig := range cm.collectors { + collectorsToFlush = append(collectorsToFlush, collectorAndConfig.Collector) + } + cm.mu.Unlock() + + var wg sync.WaitGroup + for _, collector := range collectorsToFlush { + wg.Add(1) + go func(toFlush Collector) { + defer wg.Done() + toFlush.Flush() + }(collector) + } + wg.Wait() +} + +// Get time.Duration from hz. +func getDurationFromHz(captureFrequencyHz float32) time.Duration { + if captureFrequencyHz == 0 { + return time.Duration(0) + } + return time.Duration(float32(time.Second) / captureFrequencyHz) +} + +func (cm *CaptureManager) logCaptureDirSize(ctx context.Context, captureDir string, wg *sync.WaitGroup, logger logging.Logger) { + t := cm.clk.Ticker(captureDirSizeLogInterval) + defer t.Stop() + defer wg.Done() + for { + if err := ctx.Err(); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Errorw("data manager context closed unexpectedly", "error", err) + } + return + } + select { + case <-ctx.Done(): + return + case <-t.C: + numFiles := countCaptureDirFiles(ctx, captureDir) + if numFiles > minNumFiles { + logger.Infof("Capture dir contains %d files", numFiles) + } + } + } +} + +func countCaptureDirFiles(ctx context.Context, captureDir string) int { + numFiles := 0 + //nolint:errcheck + _ = filepath.Walk(captureDir, func(path string, info os.FileInfo, err error) error { + if ctx.Err() != nil { + return filepath.SkipAll + } + //nolint:nilerr + if err != nil { + return nil + } + + // Do not count the files in the corrupted data directory. + if info.IsDir() && info.Name() == datasync.FailedDir { + return filepath.SkipDir + } + + if info.IsDir() { + return nil + } + // this is intentionally not doing as many checkas as getAllFilesToSync because + // this is intended for debugging and does not need to be 100% accurate. + isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt + if isCompletedCaptureFile { + numFiles++ + } + return nil + }) + return numFiles +} diff --git a/data/collector.go b/data/collector.go index 45bfe4157d98..930f9be255a9 100644 --- a/data/collector.go +++ b/data/collector.go @@ -139,6 +139,10 @@ func (c *collector) Collect() { defer c.logRoutine.Done() c.logCaptureErrs() }) + + // We must wait on `started` before returning. The sleep/ticker based captures rely on the clock + // advancing to do their first "tick". They must make an initial clock reading before unittests + // add an "interval". Lest the ticker never fires and a reading is never made. <-started } diff --git a/data/collector_test.go b/data/collector_test.go index a34a7cba4e4d..95dbe303c16f 100644 --- a/data/collector_test.go +++ b/data/collector_test.go @@ -373,3 +373,11 @@ func (b *signalingBuffer) Flush() error { func (b *signalingBuffer) Path() string { return b.bw.Path() } + +func TestGetDurationFromHz(t *testing.T) { + test.That(t, getDurationFromHz(0.1), test.ShouldEqual, time.Second*10) + test.That(t, getDurationFromHz(0.5), test.ShouldEqual, time.Second*2) + test.That(t, getDurationFromHz(1), test.ShouldEqual, time.Second) + test.That(t, getDurationFromHz(1000), test.ShouldEqual, time.Millisecond) + test.That(t, getDurationFromHz(0), test.ShouldEqual, 0) +} diff --git a/data/registry.go b/data/registry.go index 9769488c5e74..e9243dbe13ad 100644 --- a/data/registry.go +++ b/data/registry.go @@ -5,7 +5,6 @@ import ( "time" "github.com/benbjohnson/clock" - "github.com/mitchellh/copystructure" "github.com/pkg/errors" "google.golang.org/protobuf/types/known/anypb" @@ -53,6 +52,8 @@ func (m MethodMetadata) String() string { return fmt.Sprintf("Api: %v, Method Name: %s", m.API, m.MethodName) } +// collectorRegistry is accessed without locks. This is safe because all collectors are registered +// in package initialization functions. Those functions are executed in series. var collectorRegistry = map[MethodMetadata]CollectorConstructor{} // RegisterCollector registers a Collector to its corresponding MethodMetadata. @@ -67,18 +68,6 @@ func RegisterCollector(method MethodMetadata, c CollectorConstructor) { // CollectorLookup looks up a Collector by the given MethodMetadata. nil is returned if // there is None. -func CollectorLookup(method MethodMetadata) *CollectorConstructor { - if registration, ok := RegisteredCollectors()[method]; ok { - return ®istration - } - return nil -} - -// RegisteredCollectors returns a copy of the registry. -func RegisteredCollectors() map[MethodMetadata]CollectorConstructor { - copied, err := copystructure.Copy(collectorRegistry) - if err != nil { - panic(err) - } - return copied.(map[MethodMetadata]CollectorConstructor) +func CollectorLookup(method MethodMetadata) CollectorConstructor { + return collectorRegistry[method] } diff --git a/data/registry_test.go b/data/registry_test.go index 44cc2c89639a..f3f08a98c4b4 100644 --- a/data/registry_test.go +++ b/data/registry_test.go @@ -28,7 +28,7 @@ func TestRegister(t *testing.T) { // Return registered collector if one exists. RegisterCollector(md, dummyCollectorConstructor) - ret := *CollectorLookup(md) + ret := CollectorLookup(md) test.That(t, ret, test.ShouldEqual, dummyCollectorConstructor) // Return nothing if exact match has not been registered. diff --git a/go.mod b/go.mod index 363456c269f8..d57cce4d6677 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,6 @@ require ( github.com/lucasb-eyer/go-colorful v1.2.0 github.com/mattn/go-tflite v1.0.4 github.com/matttproud/golang_protobuf_extensions v1.0.4 - github.com/mitchellh/copystructure v1.2.0 github.com/mkch/gpio v0.0.0-20190919032813-8327cd97d95e github.com/montanaflynn/stats v0.7.0 github.com/muesli/clusters v0.0.0-20200529215643-2700303c1762 @@ -275,6 +274,7 @@ require ( github.com/mbilski/exhaustivestruct v1.2.0 // indirect github.com/mgechev/revive v1.3.2 // indirect github.com/miekg/dns v1.1.53 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index c7eb3fbbecd8..54e18db3a537 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -3,7 +3,6 @@ package builtin import ( "context" - "fmt" "net" "os" "path/filepath" @@ -22,7 +21,6 @@ import ( "go.viam.com/rdk/data" "go.viam.com/rdk/internal/cloud" "go.viam.com/rdk/logging" - "go.viam.com/rdk/protoutils" "go.viam.com/rdk/resource" "go.viam.com/rdk/services/datamanager" "go.viam.com/rdk/services/datamanager/datacapture" @@ -47,38 +45,17 @@ func init() { }) } -// TODO: re-determine if queue size is optimal given we now support 10khz+ capture rates -// The Collector's queue should be big enough to ensure that .capture() is never blocked by the queue being -// written to disk. A default value of 250 was chosen because even with the fastest reasonable capture interval (1ms), -// this would leave 250ms for a (buffered) disk write before blocking, which seems sufficient for the size of -// writes this would be performing. -const defaultCaptureQueueSize = 250 - -// Default bufio.Writer buffer size in bytes. -const defaultCaptureBufferSize = 4096 - // Default time to wait in milliseconds to check if a file has been modified. const defaultFileLastModifiedMillis = 10000.0 -// Default maximum size in bytes of a data capture file. -var defaultMaxCaptureSize = int64(256 * 1024) - // Default time between disk size checks. var filesystemPollInterval = 30 * time.Second -// Threshold number of files to check if sync is backed up (defined as >1000 files). -var minNumFiles = 1000 - -// Default time between checking and logging number of files in capture dir. -var captureDirSizeLogInterval = 1 * time.Minute - var ( clock = clk.New() deletionTicker = clk.New() ) -var errCaptureDirectoryConfigurationDisabled = errors.New("changing the capture directory is prohibited in this environment") - // Config describes how to configure the service. type Config struct { CaptureDir string `json:"capture_dir"` @@ -130,14 +107,11 @@ func readyToSync(ctx context.Context, s selectiveSyncer, logger logging.Logger) type builtIn struct { resource.Named logger logging.Logger - captureDir string - captureDisabled bool - collectorsMu sync.Mutex - collectors map[resourceMethodMetadata]*collectorAndConfig lock sync.Mutex backgroundWorkers sync.WaitGroup fileLastModifiedMillis int + // Dan: This now includes the capture dir. Change to syncPaths additionalSyncPaths []string tags []string syncDisabled bool @@ -150,22 +124,16 @@ type builtIn struct { cloudConnSvc cloud.ConnectionService cloudConn rpc.ClientConn syncTicker *clk.Ticker - maxCaptureFileSize int64 syncSensor selectiveSyncer selectiveSyncEnabled bool - componentMethodFrequencyHz map[resourceMethodMetadata]float32 - fileDeletionRoutineCancelFn context.CancelFunc fileDeletionBackgroundWorkers *sync.WaitGroup - captureDirPollingCancelFn context.CancelFunc - captureDirPollingBackgroundWorkers *sync.WaitGroup + captureManager *data.CaptureManager } -var viamCaptureDotDir = filepath.Join(os.Getenv("HOME"), ".viam", "capture") - // NewBuiltIn returns a new data manager service for the given robot. func NewBuiltIn( ctx context.Context, @@ -174,17 +142,15 @@ func NewBuiltIn( logger logging.Logger, ) (datamanager.Service, error) { svc := &builtIn{ - Named: conf.ResourceName().AsNamed(), - logger: logger, - captureDir: viamCaptureDotDir, - collectors: make(map[resourceMethodMetadata]*collectorAndConfig), - syncIntervalMins: 0, - additionalSyncPaths: []string{}, - tags: []string{}, - fileLastModifiedMillis: defaultFileLastModifiedMillis, - syncerConstructor: datasync.NewManager, - selectiveSyncEnabled: false, - componentMethodFrequencyHz: make(map[resourceMethodMetadata]float32), + Named: conf.ResourceName().AsNamed(), + logger: logger, + syncIntervalMins: 0, + additionalSyncPaths: []string{}, + tags: []string{}, + fileLastModifiedMillis: defaultFileLastModifiedMillis, + syncerConstructor: datasync.NewManager, + selectiveSyncEnabled: false, + captureManager: data.NewCaptureManager(logger.Sublogger("capture"), clock), } if err := svc.Reconfigure(ctx, deps, conf); err != nil { @@ -197,7 +163,6 @@ func NewBuiltIn( // Close releases all resources managed by data_manager. func (svc *builtIn) Close(_ context.Context) error { svc.lock.Lock() - svc.closeCollectors() svc.closeSyncer() if svc.syncRoutineCancelFn != nil { svc.syncRoutineCancelFn() @@ -205,191 +170,19 @@ func (svc *builtIn) Close(_ context.Context) error { if svc.fileDeletionRoutineCancelFn != nil { svc.fileDeletionRoutineCancelFn() } - if svc.captureDirPollingCancelFn != nil { - svc.captureDirPollingCancelFn() - } fileDeletionBackgroundWorkers := svc.fileDeletionBackgroundWorkers - capturePollingWorker := svc.captureDirPollingBackgroundWorkers svc.lock.Unlock() + svc.captureManager.Close() svc.backgroundWorkers.Wait() if fileDeletionBackgroundWorkers != nil { fileDeletionBackgroundWorkers.Wait() } - if capturePollingWorker != nil { - capturePollingWorker.Wait() - } return nil } -func (svc *builtIn) closeCollectors() { - var collectorsToClose []data.Collector - svc.collectorsMu.Lock() - for _, collectorAndConfig := range svc.collectors { - collectorsToClose = append(collectorsToClose, collectorAndConfig.Collector) - } - svc.collectors = make(map[resourceMethodMetadata]*collectorAndConfig) - svc.collectorsMu.Unlock() - - var wg sync.WaitGroup - for _, collector := range collectorsToClose { - wg.Add(1) - go func(toClose data.Collector) { - defer wg.Done() - toClose.Close() - }(collector) - } - wg.Wait() -} - -func (svc *builtIn) flushCollectors() { - var collectorsToFlush []data.Collector - svc.collectorsMu.Lock() - for _, collectorAndConfig := range svc.collectors { - collectorsToFlush = append(collectorsToFlush, collectorAndConfig.Collector) - } - svc.collectorsMu.Unlock() - - var wg sync.WaitGroup - for _, collector := range collectorsToFlush { - wg.Add(1) - go func(toFlush data.Collector) { - defer wg.Done() - toFlush.Flush() - }(collector) - } - wg.Wait() -} - -// Parameters stored for each collector. -type collectorAndConfig struct { - Resource resource.Resource - Collector data.Collector - Config datamanager.DataCaptureConfig -} - -// Identifier for a particular collector: component name, component model, component type, -// method parameters, and method name. -type resourceMethodMetadata struct { - ResourceName string - MethodParams string - MethodMetadata data.MethodMetadata -} - -func (r resourceMethodMetadata) String() string { - return fmt.Sprintf( - "[API: %s, Resource Name: %s, Method Name: %s, Method Params: %s]", - r.MethodMetadata.API, r.ResourceName, r.MethodMetadata.MethodName, r.MethodParams) -} - -// Get time.Duration from hz. -func getDurationFromHz(captureFrequencyHz float32) time.Duration { - if captureFrequencyHz == 0 { - return time.Duration(0) - } - return time.Duration(float32(time.Second) / captureFrequencyHz) -} - -var metadataToAdditionalParamFields = map[string]string{ - generateMetadataKey("rdk:component:board", "Analogs"): "reader_name", - generateMetadataKey("rdk:component:board", "Gpios"): "pin_name", -} - -// Initialize a collector for the component/method or update it if it has previously been created. -// Return the component/method metadata which is used as a key in the collectors map. -func (svc *builtIn) initializeOrUpdateCollector( - res resource.Resource, - md resourceMethodMetadata, - config datamanager.DataCaptureConfig, - maxFileSizeChanged bool, -) (*collectorAndConfig, error) { - // Build metadata. - captureMetadata, err := datacapture.BuildCaptureMetadata( - config.Name.API, - config.Name.ShortName(), - config.Method, - config.AdditionalParams, - config.Tags, - ) - if err != nil { - return nil, err - } - - // TODO(DATA-451): validate method params - - svc.collectorsMu.Lock() - defer svc.collectorsMu.Unlock() - if storedCollectorAndConfig, ok := svc.collectors[md]; ok { - if storedCollectorAndConfig.Config.Equals(&config) && - res == storedCollectorAndConfig.Resource && - !maxFileSizeChanged { - // If the attributes have not changed, do nothing and leave the existing collector. - return svc.collectors[md], nil - } - // If the attributes have changed, close the existing collector. - storedCollectorAndConfig.Collector.Close() - } - - // Get collector constructor for the component API and method. - collectorConstructor := data.CollectorLookup(md.MethodMetadata) - if collectorConstructor == nil { - return nil, errors.Errorf("failed to find collector constructor for %s", md.MethodMetadata) - } - - // Parameters to initialize collector. - interval := getDurationFromHz(config.CaptureFrequencyHz) - // Set queue size to defaultCaptureQueueSize if it was not set in the config. - captureQueueSize := config.CaptureQueueSize - if captureQueueSize == 0 { - captureQueueSize = defaultCaptureQueueSize - } - - captureBufferSize := config.CaptureBufferSize - if captureBufferSize == 0 { - captureBufferSize = defaultCaptureBufferSize - } - additionalParamKey, ok := metadataToAdditionalParamFields[generateMetadataKey( - md.MethodMetadata.API.String(), - md.MethodMetadata.MethodName)] - if ok { - if _, ok := config.AdditionalParams[additionalParamKey]; !ok { - return nil, errors.Errorf("failed to validate additional_params for %s, must supply %s", - md.MethodMetadata.API.String(), additionalParamKey) - } - } - methodParams, err := protoutils.ConvertStringMapToAnyPBMap(config.AdditionalParams) - if err != nil { - return nil, err - } - - // Create a collector for this resource and method. - targetDir := datacapture.FilePathWithReplacedReservedChars( - filepath.Join(svc.captureDir, captureMetadata.GetComponentType(), - captureMetadata.GetComponentName(), captureMetadata.GetMethodName())) - if err := os.MkdirAll(targetDir, 0o700); err != nil { - return nil, err - } - params := data.CollectorParams{ - ComponentName: config.Name.ShortName(), - Interval: interval, - MethodParams: methodParams, - Target: datacapture.NewBuffer(targetDir, captureMetadata, svc.maxCaptureFileSize), - QueueSize: captureQueueSize, - BufferSize: captureBufferSize, - Logger: svc.logger, - Clock: clock, - } - collector, err := (*collectorConstructor)(res, params) - if err != nil { - return nil, err - } - collector.Collect() - - return &collectorAndConfig{res, collector, config}, nil -} - func (svc *builtIn) closeSyncer() { if svc.syncer != nil { // If previously we were syncing, close the old syncer and cancel the old updateCollectors goroutine. @@ -417,8 +210,8 @@ func (svc *builtIn) initSyncer(ctx context.Context) error { } client := v1.NewDataSyncServiceClient(conn) - svc.filesToSync = make(chan string, svc.maxSyncThreads) - syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureDir, svc.maxSyncThreads, svc.filesToSync) + svc.filesToSync = make(chan string) + syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureManager.CaptureDir(), svc.maxSyncThreads, svc.filesToSync) if err != nil { return errors.Wrap(err, "failed to initialize new syncer") } @@ -466,6 +259,17 @@ func (svc *builtIn) Reconfigure( return err } + dataConfig := data.Config{ + CaptureDisabled: svcConfig.CaptureDisabled, + CaptureDir: svcConfig.CaptureDir, + Tags: svcConfig.Tags, + MaximumCaptureFileSizeBytes: svcConfig.MaximumCaptureFileSizeBytes, + } + if err = svc.captureManager.Reconfigure(ctx, deps, conf, dataConfig); err != nil { + svc.logger.Warnw("DataCapture reconfigure error", "err", err) + return err + } + // Syncer should be reinitialized if the max sync threads are updated in the config newMaxSyncThreadValue := datasync.MaxParallelSyncRoutines if svcConfig.MaximumNumSyncThreads != 0 { @@ -474,39 +278,6 @@ func (svc *builtIn) Reconfigure( reinitSyncer := cloudConnSvc != svc.cloudConnSvc || newMaxSyncThreadValue != svc.maxSyncThreads svc.cloudConnSvc = cloudConnSvc - captureConfigs, err := svc.updateDataCaptureConfigs(deps, conf, svcConfig.CaptureDir) - if err != nil { - return err - } - - if !utils.IsTrustedEnvironment(ctx) && svcConfig.CaptureDir != "" && svcConfig.CaptureDir != viamCaptureDotDir { - return errCaptureDirectoryConfigurationDisabled - } - - if svcConfig.CaptureDir != "" { - svc.captureDir = svcConfig.CaptureDir - } else { - svc.captureDir = viamCaptureDotDir - } - - if svc.captureDirPollingCancelFn != nil { - svc.captureDirPollingCancelFn() - } - if svc.captureDirPollingBackgroundWorkers != nil { - svc.captureDirPollingBackgroundWorkers.Wait() - } - captureDirPollCtx, captureDirCancelFunc := context.WithCancel(context.Background()) - svc.captureDirPollingCancelFn = captureDirCancelFunc - svc.captureDirPollingBackgroundWorkers = &sync.WaitGroup{} - svc.captureDirPollingBackgroundWorkers.Add(1) - go logCaptureDirSize(captureDirPollCtx, svc.captureDir, svc.captureDirPollingBackgroundWorkers, svc.logger) - - svc.captureDisabled = svcConfig.CaptureDisabled - // Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service. - if svc.captureDisabled { - svc.closeCollectors() - } - if svc.fileDeletionRoutineCancelFn != nil { svc.fileDeletionRoutineCancelFn() } @@ -518,78 +289,12 @@ func (svc *builtIn) Reconfigure( deleteEveryNthValue = svcConfig.DeleteEveryNthWhenDiskFull } - // Initialize or add collectors based on changes to the component configurations. - newCollectors := make(map[resourceMethodMetadata]*collectorAndConfig) - if !svc.captureDisabled { - for res, resConfs := range captureConfigs { - for _, resConf := range resConfs { - if resConf.Method == "" { - continue - } - // Create component/method metadata - methodMetadata := data.MethodMetadata{ - API: resConf.Name.API, - MethodName: resConf.Method, - } - - componentMethodMetadata := resourceMethodMetadata{ - ResourceName: resConf.Name.ShortName(), - MethodMetadata: methodMetadata, - MethodParams: fmt.Sprintf("%v", resConf.AdditionalParams), - } - _, ok := svc.componentMethodFrequencyHz[componentMethodMetadata] - - // Only log capture frequency if the component frequency is new or the frequency has changed - // otherwise we'll be logging way too much - if !ok || (ok && resConf.CaptureFrequencyHz != svc.componentMethodFrequencyHz[componentMethodMetadata]) { - syncVal := "will" - if resConf.CaptureFrequencyHz == 0 { - syncVal += " not" - } - svc.logger.Infof( - "capture frequency for %s is set to %.2fHz and %s sync", componentMethodMetadata, resConf.CaptureFrequencyHz, syncVal, - ) - } - - // we need this map to keep track of if state has changed in the configs - // without it, we will be logging the same message over and over for no reason - svc.componentMethodFrequencyHz[componentMethodMetadata] = resConf.CaptureFrequencyHz - - maxCaptureFileSize := svcConfig.MaximumCaptureFileSizeBytes - if maxCaptureFileSize == 0 { - maxCaptureFileSize = defaultMaxCaptureSize - } - if !resConf.Disabled && (resConf.CaptureFrequencyHz > 0 || svc.maxCaptureFileSize != maxCaptureFileSize) { - // We only use service-level tags. - resConf.Tags = svcConfig.Tags - - maxFileSizeChanged := svc.maxCaptureFileSize != maxCaptureFileSize - svc.maxCaptureFileSize = maxCaptureFileSize - - newCollectorAndConfig, err := svc.initializeOrUpdateCollector(res, componentMethodMetadata, resConf, maxFileSizeChanged) - if err != nil { - svc.logger.CErrorw(ctx, "failed to initialize or update collector", "error", err) - } else { - newCollectors[componentMethodMetadata] = newCollectorAndConfig - } - } - } - } - } else { + if svcConfig.CaptureDisabled { svc.fileDeletionRoutineCancelFn = nil svc.fileDeletionBackgroundWorkers = nil } - // If a component/method has been removed from the config, close the collector. - svc.collectorsMu.Lock() - for md, collAndConfig := range svc.collectors { - if _, present := newCollectors[md]; !present { - collAndConfig.Collector.Close() - } - } - svc.collectors = newCollectors - svc.collectorsMu.Unlock() - svc.additionalSyncPaths = svcConfig.AdditionalSyncPaths + svc.additionalSyncPaths = append([]string{svc.captureManager.CaptureDir()}, svcConfig.AdditionalSyncPaths...) fileLastModifiedMillis := svcConfig.FileLastModifiedMillis if fileLastModifiedMillis <= 0 { @@ -644,15 +349,16 @@ func (svc *builtIn) Reconfigure( svc.closeSyncer() } } + // if datacapture is enabled, kick off a go routine to check if disk space is filling due to // cached datacapture files - if !svc.captureDisabled { + if !svcConfig.CaptureDisabled { fileDeletionCtx, cancelFunc := context.WithCancel(context.Background()) svc.fileDeletionRoutineCancelFn = cancelFunc svc.fileDeletionBackgroundWorkers = &sync.WaitGroup{} svc.fileDeletionBackgroundWorkers.Add(1) go pollFilesystem(fileDeletionCtx, svc.fileDeletionBackgroundWorkers, - svc.captureDir, deleteEveryNthValue, svc.syncer, svc.logger) + svc.captureManager.CaptureDir(), deleteEveryNthValue, svc.syncer, svc.logger) } return nil @@ -734,25 +440,16 @@ func isOffline() bool { } func (svc *builtIn) sync(ctx context.Context) { - svc.flushCollectors() - // Lock while retrieving any values that could be changed during reconfiguration of the data - // manager. + svc.captureManager.FlushCollectors() + svc.lock.Lock() - captureDir := svc.captureDir - fileLastModifiedMillis := svc.fileLastModifiedMillis - additionalSyncPaths := svc.additionalSyncPaths - if svc.syncer == nil { - svc.lock.Unlock() - return - } syncer := svc.syncer + syncPaths := svc.additionalSyncPaths + fileLastModifiedMillis := svc.fileLastModifiedMillis svc.lock.Unlock() // Retrieve all files in capture dir and send them to the syncer - getAllFilesToSync(ctx, append([]string{captureDir}, additionalSyncPaths...), - fileLastModifiedMillis, - syncer, - ) + getAllFilesToSync(ctx, syncPaths, fileLastModifiedMillis, syncer) } //nolint:errcheck,nilerr @@ -794,39 +491,6 @@ func getAllFilesToSync(ctx context.Context, dirs []string, lastModifiedMillis in } } -// Build the component configs associated with the data manager service. -func (svc *builtIn) updateDataCaptureConfigs( - resources resource.Dependencies, - conf resource.Config, - captureDir string, -) (map[resource.Resource][]datamanager.DataCaptureConfig, error) { - resourceCaptureConfigMap := make(map[resource.Resource][]datamanager.DataCaptureConfig) - for name, assocCfg := range conf.AssociatedAttributes { - associatedConf, err := utils.AssertType[*datamanager.AssociatedConfig](assocCfg) - if err != nil { - return nil, err - } - - res, err := resources.Lookup(name) - if err != nil { - svc.logger.Debugw("failed to lookup resource", "error", err) - continue - } - - captureCopies := make([]datamanager.DataCaptureConfig, len(associatedConf.CaptureMethods)) - for _, method := range associatedConf.CaptureMethods { - method.CaptureDirectory = captureDir - captureCopies = append(captureCopies, method) - } - resourceCaptureConfigMap[res] = captureCopies - } - return resourceCaptureConfigMap, nil -} - -func generateMetadataKey(component, method string) string { - return fmt.Sprintf("%s/%s", component, method) -} - func pollFilesystem(ctx context.Context, wg *sync.WaitGroup, captureDir string, deleteEveryNth int, syncer datasync.Manager, logger logging.Logger, ) { @@ -866,58 +530,3 @@ func pollFilesystem(ctx context.Context, wg *sync.WaitGroup, captureDir string, } } } - -func logCaptureDirSize(ctx context.Context, captureDir string, wg *sync.WaitGroup, logger logging.Logger, -) { - t := clock.Ticker(captureDirSizeLogInterval) - defer t.Stop() - defer wg.Done() - for { - if err := ctx.Err(); err != nil { - if !errors.Is(err, context.Canceled) { - logger.Errorw("data manager context closed unexpectedly", "error", err) - } - return - } - select { - case <-ctx.Done(): - return - case <-t.C: - numFiles := countCaptureDirFiles(ctx, captureDir) - if numFiles > minNumFiles { - logger.Infof("Capture dir contains %d files", numFiles) - } - } - } -} - -func countCaptureDirFiles(ctx context.Context, captureDir string) int { - numFiles := 0 - //nolint:errcheck - _ = filepath.Walk(captureDir, func(path string, info os.FileInfo, err error) error { - if ctx.Err() != nil { - return filepath.SkipAll - } - //nolint:nilerr - if err != nil { - return nil - } - - // Do not count the files in the corrupted data directory. - if info.IsDir() && info.Name() == datasync.FailedDir { - return filepath.SkipDir - } - - if info.IsDir() { - return nil - } - // this is intentionally not doing as many checkas as getAllFilesToSync because - // this is intended for debugging and does not need to be 100% accurate. - isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt - if isCompletedCaptureFile { - numFiles++ - } - return nil - }) - return numFiles -} diff --git a/services/datamanager/builtin/builtin_test.go b/services/datamanager/builtin/builtin_test.go index da444ad38f39..e0037765414b 100644 --- a/services/datamanager/builtin/builtin_test.go +++ b/services/datamanager/builtin/builtin_test.go @@ -8,7 +8,6 @@ import ( "os" "path/filepath" "testing" - "time" "github.com/golang/geo/r3" "go.viam.com/test" @@ -16,6 +15,7 @@ import ( "go.viam.com/rdk/components/arm" "go.viam.com/rdk/components/camera" "go.viam.com/rdk/config" + "go.viam.com/rdk/data" "go.viam.com/rdk/gostream" "go.viam.com/rdk/internal/cloud" cloudinject "go.viam.com/rdk/internal/testutils/inject" @@ -126,14 +126,6 @@ func getServiceConfig(t *testing.T, cfg *config.Config) (*Config, map[resource.N return nil, nil, nil } -func TestGetDurationFromHz(t *testing.T) { - test.That(t, GetDurationFromHz(0.1), test.ShouldEqual, time.Second*10) - test.That(t, GetDurationFromHz(0.5), test.ShouldEqual, time.Second*2) - test.That(t, GetDurationFromHz(1), test.ShouldEqual, time.Second) - test.That(t, GetDurationFromHz(1000), test.ShouldEqual, time.Millisecond) - test.That(t, GetDurationFromHz(0), test.ShouldEqual, 0) -} - func TestEmptyConfig(t *testing.T) { // Data manager should not be enabled implicitly, an empty config will not result in a data manager being configured. initConfig, associations, deps := setupConfig(t, enabledTabularCollectorEmptyConfigPath) @@ -155,7 +147,7 @@ func TestUntrustedEnv(t *testing.T) { ConvertedAttributes: config, AssociatedAttributes: associations, }) - test.That(t, err, test.ShouldEqual, errCaptureDirectoryConfigurationDisabled) + test.That(t, err, test.ShouldEqual, data.ErrCaptureDirectoryConfigurationDisabled) } func getAllFileInfos(dir string) []os.FileInfo { diff --git a/services/datamanager/builtin/capture_test.go b/services/datamanager/builtin/capture_test.go index 28f5346672fc..306da2f0f208 100644 --- a/services/datamanager/builtin/capture_test.go +++ b/services/datamanager/builtin/capture_test.go @@ -102,6 +102,7 @@ func TestDataCaptureEnabled(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + logger := logging.NewTestLogger(t) // Set up capture directories. initCaptureDir := t.TempDir() updatedCaptureDir := t.TempDir() @@ -143,7 +144,7 @@ func TestDataCaptureEnabled(t *testing.T) { donePassingTime1 := passTime(passTimeCtx1, mockClock, captureInterval) if !tc.initialServiceDisableStatus && !tc.initialCollectorDisableStatus { - waitForCaptureFilesToExceedNFiles(initCaptureDir, 0) + waitForCaptureFilesToExceedNFiles(initCaptureDir, 0, logger) testFilesContainSensorData(t, initCaptureDir) } else { initialCaptureFiles := getAllFileInfos(initCaptureDir) @@ -177,7 +178,7 @@ func TestDataCaptureEnabled(t *testing.T) { donePassingTime2 := passTime(passTimeCtx2, mockClock, captureInterval) if !tc.newServiceDisableStatus && !tc.newCollectorDisableStatus { - waitForCaptureFilesToExceedNFiles(updatedCaptureDir, 0) + waitForCaptureFilesToExceedNFiles(updatedCaptureDir, 0, logger) testFilesContainSensorData(t, updatedCaptureDir) } else { updatedCaptureFiles := getAllFileInfos(updatedCaptureDir) @@ -195,6 +196,7 @@ func TestDataCaptureEnabled(t *testing.T) { } func TestSwitchResource(t *testing.T) { + logger := logging.NewTestLogger(t) captureDir := t.TempDir() mockClock := clk.NewMock() // Make mockClock the package level clock used by the dmsvc so that we can simulate time's passage @@ -221,7 +223,7 @@ func TestSwitchResource(t *testing.T) { passTimeCtx1, cancelPassTime1 := context.WithCancel(context.Background()) donePassingTime1 := passTime(passTimeCtx1, mockClock, captureInterval) - waitForCaptureFilesToExceedNFiles(captureDir, 0) + waitForCaptureFilesToExceedNFiles(captureDir, 0, logger) testFilesContainSensorData(t, captureDir) cancelPassTime1() @@ -252,7 +254,7 @@ func TestSwitchResource(t *testing.T) { donePassingTime2 := passTime(passTimeCtx2, mockClock, captureInterval) // Test that sensor data is captured from the new collector. - waitForCaptureFilesToExceedNFiles(captureDir, len(getAllFileInfos(captureDir))) + waitForCaptureFilesToExceedNFiles(captureDir, len(getAllFileInfos(captureDir)), logger) testFilesContainSensorData(t, captureDir) filePaths := getAllFilePaths(captureDir) @@ -345,7 +347,9 @@ func testFilesContainSensorData(t *testing.T, dir string) { // waitForCaptureFilesToExceedNFiles returns once `captureDir` contains more than `n` files of at // least `emptyFileBytesSize` bytes. This is not suitable for waiting for file deletion to happen. -func waitForCaptureFilesToExceedNFiles(captureDir string, n int) { +func waitForCaptureFilesToExceedNFiles(captureDir string, n int, logger logging.Logger) { + var diagnostics sync.Once + start := time.Now() for { files := getAllFileInfos(captureDir) nonEmptyFiles := 0 @@ -363,6 +367,14 @@ func waitForCaptureFilesToExceedNFiles(captureDir string, n int) { } time.Sleep(10 * time.Millisecond) + if time.Since(start) > 10*time.Second { + diagnostics.Do(func() { + logger.Infow("waitForCaptureFilesToEqualNFiles diagnostics after 10 seconds of waiting", "numFiles", len(files), "expectedFiles", n) + for idx, file := range files { + logger.Infow("File information", "idx", idx, "dir", captureDir, "name", file.Name(), "size", file.Size()) + } + }) + } } } diff --git a/services/datamanager/builtin/export_test.go b/services/datamanager/builtin/export_test.go index 670c08b59c68..935145f200a3 100644 --- a/services/datamanager/builtin/export_test.go +++ b/services/datamanager/builtin/export_test.go @@ -14,6 +14,3 @@ func (svc *builtIn) SetSyncerConstructor(fn datasync.ManagerConstructor) { func (svc *builtIn) SetFileLastModifiedMillis(s int) { svc.fileLastModifiedMillis = s } - -// Make getDurationFromHz global for tests. -var GetDurationFromHz = getDurationFromHz diff --git a/services/datamanager/builtin/file_deletion_test.go b/services/datamanager/builtin/file_deletion_test.go index 65188ddbcd84..76bf0d7b7d1e 100644 --- a/services/datamanager/builtin/file_deletion_test.go +++ b/services/datamanager/builtin/file_deletion_test.go @@ -226,11 +226,10 @@ func TestFilePolling(t *testing.T) { // run forward 10ms to capture 4 files then close the collectors, mockClock.Add(captureInterval) - flusher, ok := dmsvc.(*builtIn) - test.That(t, ok, test.ShouldBeTrue) + captureManager := dmsvc.(*builtIn).captureManager // flush and close collectors to ensure we have exactly 4 files - flusher.flushCollectors() - flusher.closeCollectors() + captureManager.FlushCollectors() + captureManager.CloseCollectors() // number of capture files is based on the number of unique // collectors in the robot config used in this test waitForCaptureFilesToEqualNFiles(tempDir, 4, logger) diff --git a/services/datamanager/builtin/sync_test.go b/services/datamanager/builtin/sync_test.go index 936e882b2da0..3bea165091d7 100644 --- a/services/datamanager/builtin/sync_test.go +++ b/services/datamanager/builtin/sync_test.go @@ -59,6 +59,7 @@ func TestSyncEnabled(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + logger := logging.NewTestLogger(t) // Set up server. mockClock := clk.NewMock() // Make mockClock the package level clock used by the dmsvc so that we can simulate time's passage @@ -89,7 +90,7 @@ func TestSyncEnabled(t *testing.T) { }) test.That(t, err, test.ShouldBeNil) mockClock.Add(captureInterval) - waitForCaptureFilesToExceedNFiles(tmpDir, 0) + waitForCaptureFilesToExceedNFiles(tmpDir, 0, logger) mockClock.Add(syncInterval) var sentReq bool wait := time.After(time.Second) @@ -124,7 +125,7 @@ func TestSyncEnabled(t *testing.T) { } var sentReqAfterUpdate bool mockClock.Add(captureInterval) - waitForCaptureFilesToExceedNFiles(tmpDir, 0) + waitForCaptureFilesToExceedNFiles(tmpDir, 0, logger) mockClock.Add(syncInterval) wait = time.After(time.Second) select { @@ -499,6 +500,7 @@ func TestStreamingDCUpload(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + logger := logging.NewTestLogger(t) // Set up server. mockClock := clk.NewMock() clock = mockClock @@ -528,7 +530,7 @@ func TestStreamingDCUpload(t *testing.T) { // Capture an image, then close. mockClock.Add(captureInterval) - waitForCaptureFilesToExceedNFiles(tmpDir, 0) + waitForCaptureFilesToExceedNFiles(tmpDir, 0, logger) err = dmsvc.Close(context.Background()) test.That(t, err, test.ShouldBeNil)