diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go
index 87a04003702..e846a11347b 100644
--- a/services/datamanager/builtin/builtin.go
+++ b/services/datamanager/builtin/builtin.go
@@ -63,8 +63,11 @@ var defaultMaxCaptureSize = int64(256 * 1024)
 // Default time between disk size checks.
 var filesystemPollInterval = 30 * time.Second

-// Threshold number of files to check if sync is backed up (defined as >5000 files).
-var minNumFiles = 5000
+// Threshold number of files to check if sync is backed up (defined as >1000 files).
+var minNumFiles = 1000
+
+// Default time between checking and logging number of files in capture dir.
+var captureDirSizeLogInterval = 1 * time.Minute

 var (
 	clock = clk.New()
@@ -139,6 +142,7 @@ type builtIn struct {
 	syncRoutineCancelFn context.CancelFunc
 	syncer              datasync.Manager
 	syncerConstructor   datasync.ManagerConstructor
+	filesToSync         chan string
 	maxSyncThreads      int
 	cloudConnSvc        cloud.ConnectionService
 	cloudConn           rpc.ClientConn
@@ -152,6 +156,9 @@ type builtIn struct {
 	fileDeletionRoutineCancelFn   context.CancelFunc
 	fileDeletionBackgroundWorkers *sync.WaitGroup
+
+	captureDirPollingCancelFn          context.CancelFunc
+	captureDirPollingBackgroundWorkers *sync.WaitGroup
 }

 var viamCaptureDotDir = filepath.Join(os.Getenv("HOME"), ".viam", "capture")
@@ -195,14 +202,21 @@ func (svc *builtIn) Close(_ context.Context) error {
 	if svc.fileDeletionRoutineCancelFn != nil {
 		svc.fileDeletionRoutineCancelFn()
 	}
+	if svc.captureDirPollingCancelFn != nil {
+		svc.captureDirPollingCancelFn()
+	}
 	fileDeletionBackgroundWorkers := svc.fileDeletionBackgroundWorkers
+	capturePollingWorker := svc.captureDirPollingBackgroundWorkers
 	svc.lock.Unlock()
 	svc.backgroundWorkers.Wait()

 	if fileDeletionBackgroundWorkers != nil {
 		fileDeletionBackgroundWorkers.Wait()
 	}
+	if capturePollingWorker != nil {
+		capturePollingWorker.Wait()
+	}
 	return nil
 }
@@ -377,6 +391,7 @@ func (svc *builtIn) closeSyncer() {
 	if svc.syncer != nil {
 		// If previously we were syncing, close the old syncer and cancel the old updateCollectors goroutine.
 		svc.syncer.Close()
+		close(svc.filesToSync)
 		svc.syncer = nil
 	}
 	if svc.cloudConn != nil {
@@ -399,7 +414,8 @@ func (svc *builtIn) initSyncer(ctx context.Context) error {
 	}

 	client := v1.NewDataSyncServiceClient(conn)
-	syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureDir, svc.maxSyncThreads)
+	svc.filesToSync = make(chan string, svc.maxSyncThreads)
+	syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureDir, svc.maxSyncThreads, svc.filesToSync)
 	if err != nil {
 		return errors.Wrap(err, "failed to initialize new syncer")
 	}
@@ -425,7 +441,7 @@ func (svc *builtIn) Sync(ctx context.Context, _ map[string]interface{}) error {
 	}
 	svc.lock.Unlock()

-	svc.sync()
+	svc.sync(ctx)
 	return nil
 }
@@ -469,6 +485,19 @@ func (svc *builtIn) Reconfigure(
 	} else {
 		svc.captureDir = viamCaptureDotDir
 	}
+
+	if svc.captureDirPollingCancelFn != nil {
+		svc.captureDirPollingCancelFn()
+	}
+	if svc.captureDirPollingBackgroundWorkers != nil {
+		svc.captureDirPollingBackgroundWorkers.Wait()
+	}
+	captureDirPollCtx, captureDirCancelFunc := context.WithCancel(context.Background())
+	svc.captureDirPollingCancelFn = captureDirCancelFunc
+	svc.captureDirPollingBackgroundWorkers = &sync.WaitGroup{}
+	svc.captureDirPollingBackgroundWorkers.Add(1)
+	go logCaptureDirSize(captureDirPollCtx, svc.captureDir, svc.captureDirPollingBackgroundWorkers, svc.logger)
+
 	svc.captureDisabled = svcConfig.CaptureDisabled
 	// Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service.
 	if svc.captureDisabled {
@@ -684,7 +713,7 @@ func (svc *builtIn) uploadData(cancelCtx context.Context, intervalMins float64)
 		svc.lock.Unlock()

 		if !isOffline() && shouldSync {
-			svc.sync()
+			svc.sync(cancelCtx)
 		}
 	} else {
 		svc.lock.Unlock()
@@ -701,64 +730,66 @@ func isOffline() bool {
 	return err != nil
 }

-func (svc *builtIn) sync() {
+func (svc *builtIn) sync(ctx context.Context) {
 	svc.flushCollectors()
-
+	// Lock while retrieving any values that could be changed during reconfiguration of the data
+	// manager.
 	svc.lock.Lock()
-	if svc.syncer != nil {
-		toSync := getAllFilesToSync(svc.captureDir, svc.fileLastModifiedMillis)
-		for _, ap := range svc.additionalSyncPaths {
-			toSync = append(toSync, getAllFilesToSync(ap, svc.fileLastModifiedMillis)...)
-		}
-		syncer := svc.syncer
-		stopAfter := time.Now().Add(time.Duration(svc.syncIntervalMins * float64(time.Minute)))
-		svc.lock.Unlock()
-
-		// Only log if there are a large number of files to sync
-		if len(toSync) > minNumFiles {
-			svc.logger.Infof("Starting sync of %d files", len(toSync))
-		}
-		for _, p := range toSync {
-			syncer.SyncFile(p, stopAfter)
-		}
-	} else {
+	captureDir := svc.captureDir
+	fileLastModifiedMillis := svc.fileLastModifiedMillis
+	additionalSyncPaths := svc.additionalSyncPaths
+	if svc.syncer == nil {
 		svc.lock.Unlock()
+		return
 	}
+	syncer := svc.syncer
+	svc.lock.Unlock()
+
+	// Retrieve all files in capture dir and send them to the syncer
+	getAllFilesToSync(ctx, append([]string{captureDir}, additionalSyncPaths...),
+		fileLastModifiedMillis,
+		syncer,
+	)
 }

 //nolint:errcheck,nilerr
-func getAllFilesToSync(dir string, lastModifiedMillis int) []string {
-	var filePaths []string
-	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			return nil
-		}
-		// Do not sync the files in the corrupted data directory.
-		if info.IsDir() && info.Name() == datasync.FailedDir {
-			return filepath.SkipDir
-		}
-		if info.IsDir() {
+func getAllFilesToSync(ctx context.Context, dirs []string, lastModifiedMillis int, syncer datasync.Manager) {
+	for _, dir := range dirs {
+		_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+			if ctx.Err() != nil {
+				return filepath.SkipAll
+			}
+			if err != nil {
+				return nil
+			}
+
+			// Do not sync the files in the corrupted data directory.
+			if info.IsDir() && info.Name() == datasync.FailedDir {
+				return filepath.SkipDir
+			}
+
+			if info.IsDir() {
+				return nil
+			}
+			// If a file was modified within the past lastModifiedMillis, do not sync it (data
+			// may still be being written).
+			timeSinceMod := clock.Since(info.ModTime())
+			// When using a mock clock in tests, this can be negative since the file system will still use the system clock.
+			// Take max(timeSinceMod, 0) to account for this.
+			if timeSinceMod < 0 {
+				timeSinceMod = 0
+			}
+			isStuckInProgressCaptureFile := filepath.Ext(path) == datacapture.InProgressFileExt &&
+				timeSinceMod >= defaultFileLastModifiedMillis*time.Millisecond
+			isNonCaptureFileThatIsNotBeingWrittenTo := filepath.Ext(path) != datacapture.InProgressFileExt &&
+				timeSinceMod >= time.Duration(lastModifiedMillis)*time.Millisecond
+			isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
+			if isCompletedCaptureFile || isStuckInProgressCaptureFile || isNonCaptureFileThatIsNotBeingWrittenTo {
+				syncer.SendFileToSync(path)
+			}
 			return nil
-		}
-		// If a file was modified within the past lastModifiedMillis, do not sync it (data
-		// may still be being written).
-		timeSinceMod := clock.Since(info.ModTime())
-		// When using a mock clock in tests, this can be negative since the file system will still use the system clock.
-		// Take max(timeSinceMod, 0) to account for this.
-		if timeSinceMod < 0 {
-			timeSinceMod = 0
-		}
-		isStuckInProgressCaptureFile := filepath.Ext(path) == datacapture.InProgressFileExt &&
-			timeSinceMod >= defaultFileLastModifiedMillis*time.Millisecond
-		isNonCaptureFileThatIsNotBeingWrittenTo := filepath.Ext(path) != datacapture.InProgressFileExt &&
-			timeSinceMod >= time.Duration(lastModifiedMillis)*time.Millisecond
-		isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
-		if isCompletedCaptureFile || isStuckInProgressCaptureFile || isNonCaptureFileThatIsNotBeingWrittenTo {
-			filePaths = append(filePaths, path)
-		}
-		return nil
-	})
-	return filePaths
+		})
+	}
 }

 // Build the component configs associated with the data manager service.
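The rewritten getAllFilesToSync above is the producer half of this change: instead of materializing the full slice of paths and looping over SyncFile with a deadline, the walker now streams each eligible path into the bounded filesToSync channel that a fixed pool of worker goroutines drains (see the NewManager hunk further down). Below is a minimal, self-contained sketch of that producer/consumer shape; the names walkAndSend and maxWorkers and the println stand-in are illustrative, not identifiers from this PR.

package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// walkAndSend streams every regular file under dir into out, stopping early
// if ctx is cancelled. filepath.SkipAll (Go 1.20+) aborts the whole walk.
func walkAndSend(ctx context.Context, dir string, out chan<- string) {
	//nolint:errcheck
	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if ctx.Err() != nil {
			return filepath.SkipAll
		}
		if err != nil || info.IsDir() {
			return nil // skip unreadable entries and directories
		}
		select {
		case out <- path: // blocks when all workers are busy: natural back-pressure
		case <-ctx.Done():
			return filepath.SkipAll
		}
		return nil
	})
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const maxWorkers = 4
	filesToSync := make(chan string, maxWorkers) // bounded, like make(chan string, svc.maxSyncThreads)

	var workers sync.WaitGroup
	for i := 0; i < maxWorkers; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for path := range filesToSync { // exits once the channel is closed
				fmt.Println("syncing", path) // stand-in for syncer.SyncFile(path)
			}
		}()
	}

	walkAndSend(ctx, os.TempDir(), filesToSync)
	close(filesToSync) // no more work: lets the workers drain and exit
	workers.Wait()
}

Because the channel is bounded (the PR sizes it to maxSyncThreads), a slow upload back-pressures the walk instead of letting an unbounded path list accumulate in memory.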
@@ -833,3 +864,58 @@ func pollFilesystem(ctx context.Context, wg *sync.WaitGroup, captureDir string,
 		}
 	}
 }
+
+func logCaptureDirSize(ctx context.Context, captureDir string, wg *sync.WaitGroup, logger logging.Logger,
+) {
+	t := clock.Ticker(captureDirSizeLogInterval)
+	defer t.Stop()
+	defer wg.Done()
+	for {
+		if err := ctx.Err(); err != nil {
+			if !errors.Is(err, context.Canceled) {
+				logger.Errorw("data manager context closed unexpectedly", "error", err)
+			}
+			return
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-t.C:
+			numFiles := countCaptureDirFiles(ctx, captureDir)
+			if numFiles > minNumFiles {
+				logger.Infof("Capture dir contains %d files", numFiles)
+			}
+		}
+	}
+}
+
+func countCaptureDirFiles(ctx context.Context, captureDir string) int {
+	numFiles := 0
+	//nolint:errcheck
+	_ = filepath.Walk(captureDir, func(path string, info os.FileInfo, err error) error {
+		if ctx.Err() != nil {
+			return filepath.SkipAll
+		}
+		//nolint:nilerr
+		if err != nil {
+			return nil
+		}
+
+		// Do not count the files in the corrupted data directory.
+		if info.IsDir() && info.Name() == datasync.FailedDir {
+			return filepath.SkipDir
+		}
+
+		if info.IsDir() {
+			return nil
+		}
+		// This is intentionally not doing as many checks as getAllFilesToSync because
+		// this is intended for debugging and does not need to be 100% accurate.
+		isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
+		if isCompletedCaptureFile {
+			numFiles++
+		}
+		return nil
+	})
+	return numFiles
+}
diff --git a/services/datamanager/builtin/file_deletion_test.go b/services/datamanager/builtin/file_deletion_test.go
index f6127b65701..65188ddbcd8 100644
--- a/services/datamanager/builtin/file_deletion_test.go
+++ b/services/datamanager/builtin/file_deletion_test.go
@@ -147,7 +147,9 @@ func TestFileDeletion(t *testing.T) {
 			var syncer datasync.Manager
 			if tc.syncEnabled {
-				s, err := datasync.NewManager("rick astley", mockClient, logger, tempCaptureDir, datasync.MaxParallelSyncRoutines)
+				filesToSync := make(chan string)
+				defer close(filesToSync)
+				s, err := datasync.NewManager("rick astley", mockClient, logger, tempCaptureDir, datasync.MaxParallelSyncRoutines, filesToSync)
 				test.That(t, err, test.ShouldBeNil)
 				syncer = s
 				defer syncer.Close()
diff --git a/services/datamanager/builtin/sync_test.go b/services/datamanager/builtin/sync_test.go
index 22b737868ab..936e882b2da 100644
--- a/services/datamanager/builtin/sync_test.go
+++ b/services/datamanager/builtin/sync_test.go
@@ -474,6 +474,14 @@ func TestArbitraryFileUpload(t *testing.T) {
 }

 func TestStreamingDCUpload(t *testing.T) {
+	// Set max unary file size to 1 byte, so it uses the streaming rpc. Reset the original
+	// value such that other tests take the correct code paths.
+	origSize := datasync.MaxUnaryFileSize
+	datasync.MaxUnaryFileSize = 1
+	defer func() {
+		datasync.MaxUnaryFileSize = origSize
+	}()
+
 	tests := []struct {
 		name        string
 		serviceFail bool
@@ -537,8 +545,6 @@
 			streamingDCUploads: make(chan *mockStreamingDCClient, 10),
 			fail:               &f,
 		}
-		// Set max unary file size to 1 byte, so it uses the streaming rpc.
-		datasync.MaxUnaryFileSize = 1
 		newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
 		cfg.CaptureDisabled = true
 		cfg.ScheduledSyncDisabled = true
@@ -919,9 +925,9 @@ func (m *mockStreamingDCClient) CloseSend() error {

 func getTestSyncerConstructorMock(client mockDataSyncServiceClient) datasync.ManagerConstructor {
 	return func(identity string, _ v1.DataSyncServiceClient, logger logging.Logger,
-		viamCaptureDotDir string, maxSyncThreads int,
+		viamCaptureDotDir string, maxSyncThreads int, filesToSync chan string,
 	) (datasync.Manager, error) {
-		return datasync.NewManager(identity, client, logger, viamCaptureDotDir, maxSyncThreads)
+		return datasync.NewManager(identity, client, logger, viamCaptureDotDir, maxSyncThreads, make(chan string))
 	}
 }
diff --git a/services/datamanager/datasync/noop_sync.go b/services/datamanager/datasync/noop_sync.go
index 75ccc30aae5..f3291ac0f5a 100644
--- a/services/datamanager/datasync/noop_sync.go
+++ b/services/datamanager/datasync/noop_sync.go
@@ -1,7 +1,5 @@
 package datasync

-import "time"
-
 type noopManager struct{}

 var _ Manager = (*noopManager)(nil)
@@ -11,7 +9,7 @@ func NewNoopManager() Manager {
 	return &noopManager{}
 }

-func (m *noopManager) SyncFile(path string, stopAfter time.Time) {}
+func (m *noopManager) SyncFile(path string) {}

 func (m *noopManager) SetArbitraryFileTags(tags []string) {}

@@ -21,4 +19,6 @@ func (m *noopManager) MarkInProgress(path string) bool {
 	return true
 }

+func (m *noopManager) SendFileToSync(path string) {}
+
 func (m *noopManager) UnmarkInProgress(path string) {}
diff --git a/services/datamanager/datasync/sync.go b/services/datamanager/datasync/sync.go
index 34d2b09bdbf..86a24232376 100644
--- a/services/datamanager/datasync/sync.go
+++ b/services/datamanager/datasync/sync.go
@@ -41,7 +41,8 @@ const MaxParallelSyncRoutines = 1000

 // Manager is responsible for enqueuing files in captureDir and uploading them to the cloud.
 type Manager interface {
-	SyncFile(path string, stopAfter time.Time)
+	SendFileToSync(path string)
+	SyncFile(path string)
 	SetArbitraryFileTags(tags []string)
 	Close()
 	MarkInProgress(path string) bool
@@ -65,32 +66,32 @@ type syncer struct {
 	closed     atomic.Bool
 	logRoutine sync.WaitGroup

-	syncRoutineTracker chan struct{}
+	filesToSync chan string

 	captureDir string
 }

 // ManagerConstructor is a function for building a Manager.
 type ManagerConstructor func(identity string, client v1.DataSyncServiceClient, logger logging.Logger,
-	captureDir string, maxSyncThreadsConfig int) (Manager, error)
+	captureDir string, maxSyncThreadsConfig int, filesToSync chan string) (Manager, error)

 // NewManager returns a new syncer.
 func NewManager(identity string, client v1.DataSyncServiceClient, logger logging.Logger,
-	captureDir string, maxSyncThreads int,
+	captureDir string, maxSyncThreads int, filesToSync chan string,
 ) (Manager, error) {
 	cancelCtx, cancelFunc := context.WithCancel(context.Background())
 	logger.Debugf("Making new syncer with %d max threads", maxSyncThreads)
 	ret := syncer{
-		partID:             identity,
-		client:             client,
-		logger:             logger,
-		cancelCtx:          cancelCtx,
-		cancelFunc:         cancelFunc,
-		arbitraryFileTags:  []string{},
-		inProgress:         make(map[string]bool),
-		syncErrs:           make(chan error, 10),
-		syncRoutineTracker: make(chan struct{}, maxSyncThreads),
-		captureDir:         captureDir,
+		partID:            identity,
+		client:            client,
+		logger:            logger,
+		cancelCtx:         cancelCtx,
+		cancelFunc:        cancelFunc,
+		arbitraryFileTags: []string{},
+		inProgress:        make(map[string]bool),
+		syncErrs:          make(chan error, 10),
+		filesToSync:       filesToSync,
+		captureDir:        captureDir,
 	}
 	ret.logRoutine.Add(1)
 	goutils.PanicCapturingGo(func() {
@@ -98,6 +99,27 @@ func NewManager(identity string, client v1.DataSyncServiceClient, logger logging
 		ret.logSyncErrs()
 	})

+	for i := 0; i < maxSyncThreads; i++ {
+		ret.backgroundWorkers.Add(1)
+		go func() {
+			defer ret.backgroundWorkers.Done()
+			for {
+				if cancelCtx.Err() != nil {
+					return
+				}
+				select {
+				case <-cancelCtx.Done():
+					return
+				case path, ok := <-ret.filesToSync:
+					if !ok {
+						return
+					}
+					ret.SyncFile(path)
+				}
+			}
+		}()
+	}
+
 	return &ret, nil
 }
@@ -114,79 +136,47 @@ func (s *syncer) SetArbitraryFileTags(tags []string) {
 	s.arbitraryFileTags = tags
 }

-func (s *syncer) SyncFile(path string, stopAfter time.Time) {
+func (s *syncer) SendFileToSync(path string) {
+	select {
+	case s.filesToSync <- path:
+		return
+	case <-s.cancelCtx.Done():
+		return
+	}
+}
+
+func (s *syncer) SyncFile(path string) {
 	// If the file is already being synced, do not kick off a new goroutine.
 	// The goroutine will again check and return early if sync is already in progress.
-	s.progressLock.Lock()
-	if s.inProgress[path] {
-		s.progressLock.Unlock()
+	if !s.MarkInProgress(path) {
 		return
 	}
-	s.progressLock.Unlock()
-
-	for {
-		if s.cancelCtx.Err() != nil {
-			return
-		}
-
-		if time.Now().After(stopAfter) {
-			return
+	defer s.UnmarkInProgress(path)
+	//nolint:gosec
+	f, err := os.Open(path)
+	if err != nil {
+		// Don't log if the file does not exist, because that means it was successfully synced and deleted
+		// in between paths being built and this executing.
-					if !errors.Is(err, os.ErrNotExist) {
-						s.logger.Errorw("error opening file", "error", err)
-					}
-					return
-				}
-
-				if datacapture.IsDataCaptureFile(f) {
-					captureFile, err := datacapture.ReadFile(f)
-					if err != nil {
-						if err = f.Close(); err != nil {
-							s.syncErrs <- errors.Wrap(err, "error closing data capture file")
-						}
-						if err := moveFailedData(f.Name(), s.captureDir); err != nil {
-							s.syncErrs <- errors.Wrap(err, fmt.Sprintf("error moving corrupted data %s", f.Name()))
-						}
-						return
-					}
-					s.syncDataCaptureFile(captureFile)
-				} else {
-					s.syncArbitraryFile(f)
-				}
-			}
-		})
-
+		if !errors.Is(err, os.ErrNotExist) {
+			s.logger.Errorw("error opening file", "error", err)
+		}
+		return
+	}
+
+	if datacapture.IsDataCaptureFile(f) {
+		captureFile, err := datacapture.ReadFile(f)
+		if err != nil {
+			if err = f.Close(); err != nil {
+				s.syncErrs <- errors.Wrap(err, "error closing data capture file")
+			}
+			if err := moveFailedData(f.Name(), s.captureDir); err != nil {
+				s.syncErrs <- errors.Wrap(err, fmt.Sprintf("error moving corrupted data %s", f.Name()))
+			}
 			return
-		default:
-			// Avoid blocking main thread if currently at goroutine capacity.
 		}
+		s.syncDataCaptureFile(captureFile)
+	} else {
+		s.syncArbitraryFile(f)
 	}
 }
@@ -255,7 +245,7 @@ func (s *syncer) MarkInProgress(path string) bool {
 	s.progressLock.Lock()
 	defer s.progressLock.Unlock()
 	if s.inProgress[path] {
-		s.logger.Warnw("File already in progress, trying to mark it again", "file", path)
+		s.logger.Debugw("File already in progress, trying to mark it again", "file", path)
 		return false
 	}
 	s.inProgress[path] = true
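The last hunk above touches the dedup guard that the new worker pool leans on: every SyncFile call first claims the path via MarkInProgress and releases it with a deferred UnmarkInProgress, so two workers that receive the same path can never upload it twice. Below is a minimal sketch of that guard under assumed names (inProgressTracker and its methods are illustrative, not the PR's types):

package main

import (
	"fmt"
	"sync"
)

// inProgressTracker mirrors the shape of the syncer's progress guard:
// a mutex-protected set of paths currently being uploaded.
type inProgressTracker struct {
	mu         sync.Mutex
	inProgress map[string]bool
}

// markInProgress reports whether the caller won the right to sync path.
func (t *inProgressTracker) markInProgress(path string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.inProgress[path] {
		return false // another worker already owns this file
	}
	t.inProgress[path] = true
	return true
}

func (t *inProgressTracker) unmarkInProgress(path string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.inProgress, path)
}

// syncFile is a stand-in for the real upload; the defer guarantees the
// entry is cleared on every exit path, success or failure.
func (t *inProgressTracker) syncFile(path string) {
	if !t.markInProgress(path) {
		return
	}
	defer t.unmarkInProgress(path)
	fmt.Println("syncing", path)
}

func main() {
	t := &inProgressTracker{inProgress: make(map[string]bool)}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three workers race on the same path; only one syncs it
		wg.Add(1)
		go func() { defer wg.Done(); t.syncFile("/tmp/part1.capture") }()
	}
	wg.Wait()
}

Demoting the "already in progress" log from Warnw to Debugw fits the same picture: with many workers draining a shared channel, a duplicate claim is an expected fast-path return rather than a warning-worthy event.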