Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
nicksanford committed Sep 20, 2024
1 parent f1e006a commit 231af33
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 20 deletions.
16 changes: 8 additions & 8 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func New(
connToConnectivityStateEnabled func(conn rpc.ClientConn) datasync.ConnectivityState,
logger logging.Logger,
) (datamanager.Service, error) {
logger.Debug("New START")
defer logger.Debug("New END")
logger.Info("New START")
defer logger.Info("New END")
capture := capture.New(
clk,
logger.Sublogger("capture"),
Expand Down Expand Up @@ -130,8 +130,8 @@ func New(

// Close releases all resources managed by data_manager.
func (b *builtIn) Close(_ context.Context) error {
b.logger.Debug("Close START")
defer b.logger.Debug("Close END")
b.logger.Info("Close START")
defer b.logger.Info("Close END")
b.mu.Lock()
defer b.mu.Unlock()
b.diskSummaryLogger.close()
Expand All @@ -147,8 +147,8 @@ func (b *builtIn) Close(_ context.Context) error {
// If automated sync is also enabled, calling Sync will upload the files,
// regardless of whether or not is the scheduled time.
func (b *builtIn) Sync(ctx context.Context, extra map[string]interface{}) error {
b.logger.Debug("Sync START")
defer b.logger.Debug("Sync END")
b.logger.Info("Sync START")
defer b.logger.Info("Sync END")
b.mu.Lock()
defer b.mu.Unlock()
return b.sync.Sync(ctx, extra)
Expand All @@ -171,8 +171,8 @@ func (b *builtIn) Sync(ctx context.Context, extra map[string]interface{}) error
// If an error occurs after the first Reconfigure call, data capture & data sync will continue to function using the old config
// until a successful Reconfigure call is made or Close is called.
func (b *builtIn) Reconfigure(ctx context.Context, deps resource.Dependencies, conf resource.Config) error {
b.logger.Debug("Reconfigure START")
defer b.logger.Debug("Reconfigure END")
b.logger.Info("Reconfigure START")
defer b.logger.Info("Reconfigure END")
c, err := resource.NativeConfig[*Config](conf)
if err != nil {
// If this error occurs it is due to the builtin.Config not being a native config which is a
Expand Down
26 changes: 25 additions & 1 deletion services/datamanager/builtin/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package builtin

import (
"errors"
"runtime"

"go.viam.com/rdk/components/sensor"
"go.viam.com/rdk/internal/cloud"
"go.viam.com/rdk/services/datamanager/builtin/capture"
datasync "go.viam.com/rdk/services/datamanager/builtin/sync"
"go.viam.com/rdk/utils"
)

// Sync defaults.
Expand All @@ -18,6 +20,10 @@ const (
// which is evaluated if the file deletion threshold has been reached. If `captureFileIndex % N == 0`
// return true then the file will be deleted to free up space.
defaultDeleteEveryNth = 5
// defaultSyncIntervalMins is the sync interval that will be set if the config's sync_interval_mins is zero (including when it is unset)
defaultSyncIntervalMins = 0.1
// syncIntervalMinsEpsilon is the value below which SyncIntervalMins is considered zero
syncIntervalMinsEpsilon = 0.00001
)

// Capture Defaults
Expand Down Expand Up @@ -46,6 +52,18 @@ type Config struct {

// Validate returns components which will be depended upon weakly due to the above matcher.
func (c *Config) Validate(path string) ([]string, error) {
if c.SyncIntervalMins < 0 {
return nil, errors.New("sync_interval_mins can't be negative")
}
if c.FileLastModifiedMillis < 0 {
return nil, errors.New("file_last_modified_millis can't be negative")
}
if c.MaximumCaptureFileSizeBytes < 0 {
return nil, errors.New("maximum_capture_file_size_bytes can't be negative")
}
if c.DeleteEveryNthWhenDiskFull < 0 {
return nil, errors.New("delete_every_nth_when_disk_full can't be negative")
}
return []string{cloud.InternalServiceName.String()}, nil
}

Expand Down Expand Up @@ -88,6 +106,12 @@ func (c *Config) syncConfig(syncSensor sensor.Sensor, syncSensorEnabled bool) da
fileLastModifiedMillis = defaultFileLastModifiedMillis
}
c.FileLastModifiedMillis = fileLastModifiedMillis

syncIntervalMins := c.SyncIntervalMins
if utils.Float64AlmostEqual(c.SyncIntervalMins, 0, syncIntervalMinsEpsilon) {
syncIntervalMins = defaultSyncIntervalMins
}

return datasync.Config{
AdditionalSyncPaths: c.AdditionalSyncPaths,
Tags: c.Tags,
Expand All @@ -98,7 +122,7 @@ func (c *Config) syncConfig(syncSensor sensor.Sensor, syncSensorEnabled bool) da
MaximumNumSyncThreads: c.MaximumNumSyncThreads,
ScheduledSyncDisabled: c.ScheduledSyncDisabled,
SelectiveSyncerName: c.SelectiveSyncerName,
SyncIntervalMins: c.SyncIntervalMins,
SyncIntervalMins: syncIntervalMins,
SelectiveSyncSensor: syncSensor,
SelectiveSyncSensorEnabled: syncSensorEnabled,
}
Expand Down
12 changes: 12 additions & 0 deletions services/datamanager/builtin/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ func TestConfig(t *testing.T) {
DeleteEveryNthWhenDiskFull: 5,
FileLastModifiedMillis: 10000,
MaximumNumSyncThreads: runtime.NumCPU() / 2,
SyncIntervalMins: 0.1,
})
})

t.Run("returns a sync config with defaults when called on a config with SyncIntervalMins which is practically 0", func(t *testing.T) {
c := &Config{SyncIntervalMins: 0.000000000000000001}
test.That(t, c.syncConfig(nil, false), test.ShouldResemble, sync.Config{
CaptureDir: viamCaptureDotDir,
DeleteEveryNthWhenDiskFull: 5,
FileLastModifiedMillis: 10000,
MaximumNumSyncThreads: runtime.NumCPU() / 2,
SyncIntervalMins: 0.1,
})
})
t.Run("returns a sync config with overridden defaults when called on a full config", func(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions services/datamanager/builtin/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"go.viam.com/rdk/components/sensor"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/utils"
)

// Config is the sync config from builtin.
Expand Down Expand Up @@ -89,7 +88,7 @@ type Config struct {
}

func (c Config) schedulerEnabled() bool {
configDisabled := c.ScheduledSyncDisabled || utils.Float64AlmostEqual(c.SyncIntervalMins, 0.0, 0.00001)
configDisabled := c.ScheduledSyncDisabled
selectiveSyncerInvalid := c.SelectiveSyncSensorEnabled && c.SelectiveSyncSensor == nil
return !configDisabled && !selectiveSyncerInvalid
}
Expand Down Expand Up @@ -135,7 +134,7 @@ func (c *Config) logDiff(o Config, logger logging.Logger) {
}

if c.FileLastModifiedMillis != o.FileLastModifiedMillis {
logger.Infof("file_last_modified_millis: old: %d, new: %Bd", c.FileLastModifiedMillis, o.FileLastModifiedMillis)
logger.Infof("file_last_modified_millis: old: %d, new: %d", c.FileLastModifiedMillis, o.FileLastModifiedMillis)
}

if c.MaximumNumSyncThreads != o.MaximumNumSyncThreads {
Expand Down Expand Up @@ -167,8 +166,9 @@ func (c *Config) logDiff(o Config, logger logging.Logger) {
if c.SelectiveSyncSensor != nil {
oldName = c.SelectiveSyncSensor.Name().String()
}

newName := ""
if c.SelectiveSyncSensor != nil {
if o.SelectiveSyncSensor != nil {
newName = o.SelectiveSyncSensor.Name().String()
}
logger.Infof("SelectiveSyncSensor: old: %s, new: %s", oldName, newName)
Expand Down
9 changes: 2 additions & 7 deletions services/datamanager/builtin/sync/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,15 @@ func TestConfig(t *testing.T) {
})

t.Run("schedulerEnabled()", func(t *testing.T) {
t.Run("false by default", func(t *testing.T) {
test.That(t, Config{}.schedulerEnabled(), test.ShouldBeFalse)
t.Run("true by default", func(t *testing.T) {
test.That(t, Config{}.schedulerEnabled(), test.ShouldBeTrue)
})

t.Run("false if ScheduledSyncDisabled", func(t *testing.T) {
test.That(t, Config{ScheduledSyncDisabled: true}.schedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{ScheduledSyncDisabled: true, SyncIntervalMins: 1.0}.schedulerEnabled(), test.ShouldBeFalse)
})

t.Run("false if SyncIntervalMins is almost 0", func(t *testing.T) {
test.That(t, Config{SyncIntervalMins: 0.0000001}.schedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{SelectiveSyncSensorEnabled: true, SelectiveSyncSensor: &inject.Sensor{}}.schedulerEnabled(), test.ShouldBeFalse)
})

t.Run("false if SelectiveSyncSensorEnabled is true and SelectiveSyncSensor is nil", func(t *testing.T) {
test.That(t, Config{SelectiveSyncSensorEnabled: true}.schedulerEnabled(), test.ShouldBeFalse)
test.That(t, Config{SelectiveSyncSensorEnabled: true, SyncIntervalMins: 1.0}.schedulerEnabled(), test.ShouldBeFalse)
Expand Down

0 comments on commit 231af33

Please sign in to comment.