From 2d0eccc46c288b763e1f6c810cfe929de11f2a02 Mon Sep 17 00:00:00 2001
From: Vijay Vuyyuru
Date: Thu, 13 Jun 2024 14:59:05 -0400
Subject: [PATCH] DATA-2703 Have datasync better cope with large numbers of files to sync (#4070)

Co-authored-by: Dan Gottlieb

In the event of network outages (among other circumstances), data capture
will continue to create files, but data sync will be unable to make progress
on them. The memory utilization of the existing behavior was ~O(number of
files to sync). With this patch the expected memory utilization is O(max sync
threads config value). Notably, this footprint does not grow while the
network is unavailable.
---
 services/datamanager/builtin/builtin.go       | 196 +++++++++++++-----
 .../datamanager/builtin/file_deletion_test.go |   4 +-
 services/datamanager/builtin/sync_test.go     |  14 +-
 services/datamanager/datasync/noop_sync.go    |   6 +-
 services/datamanager/datasync/sync.go         | 150 +++++++-------
 5 files changed, 227 insertions(+), 143 deletions(-)

diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go
index 87a04003702..e846a11347b 100644
--- a/services/datamanager/builtin/builtin.go
+++ b/services/datamanager/builtin/builtin.go
@@ -63,8 +63,11 @@ var defaultMaxCaptureSize = int64(256 * 1024)
 // Default time between disk size checks.
 var filesystemPollInterval = 30 * time.Second
 
-// Threshold number of files to check if sync is backed up (defined as >5000 files).
-var minNumFiles = 5000
+// Threshold number of files to check if sync is backed up (defined as >1000 files).
+var minNumFiles = 1000
+
+// Default time between checking and logging number of files in capture dir.
+var captureDirSizeLogInterval = 1 * time.Minute
 
 var (
     clock = clk.New()
@@ -139,6 +142,7 @@ type builtIn struct {
     syncRoutineCancelFn   context.CancelFunc
     syncer                datasync.Manager
     syncerConstructor     datasync.ManagerConstructor
+    filesToSync           chan string
     maxSyncThreads        int
     cloudConnSvc          cloud.ConnectionService
     cloudConn             rpc.ClientConn
@@ -152,6 +156,9 @@ type builtIn struct {
 
     fileDeletionRoutineCancelFn   context.CancelFunc
     fileDeletionBackgroundWorkers *sync.WaitGroup
+
+    captureDirPollingCancelFn          context.CancelFunc
+    captureDirPollingBackgroundWorkers *sync.WaitGroup
 }
 
 var viamCaptureDotDir = filepath.Join(os.Getenv("HOME"), ".viam", "capture")
@@ -195,14 +202,21 @@ func (svc *builtIn) Close(_ context.Context) error {
     if svc.fileDeletionRoutineCancelFn != nil {
         svc.fileDeletionRoutineCancelFn()
     }
+    if svc.captureDirPollingCancelFn != nil {
+        svc.captureDirPollingCancelFn()
+    }
 
     fileDeletionBackgroundWorkers := svc.fileDeletionBackgroundWorkers
+    capturePollingWorker := svc.captureDirPollingBackgroundWorkers
     svc.lock.Unlock()
     svc.backgroundWorkers.Wait()
 
     if fileDeletionBackgroundWorkers != nil {
         fileDeletionBackgroundWorkers.Wait()
     }
+    if capturePollingWorker != nil {
+        capturePollingWorker.Wait()
+    }
     return nil
 }
 
@@ -377,6 +391,7 @@ func (svc *builtIn) closeSyncer() {
     if svc.syncer != nil {
         // If previously we were syncing, close the old syncer and cancel the old updateCollectors goroutine.
         svc.syncer.Close()
+        close(svc.filesToSync)
         svc.syncer = nil
     }
     if svc.cloudConn != nil {
@@ -399,7 +414,8 @@ func (svc *builtIn) initSyncer(ctx context.Context) error {
     }
 
     client := v1.NewDataSyncServiceClient(conn)
-    syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureDir, svc.maxSyncThreads)
+    svc.filesToSync = make(chan string, svc.maxSyncThreads)
+    syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureDir, svc.maxSyncThreads, svc.filesToSync)
     if err != nil {
         return errors.Wrap(err, "failed to initialize new syncer")
     }
@@ -425,7 +441,7 @@ func (svc *builtIn) Sync(ctx context.Context, _ map[string]interface{}) error {
     }
 
     svc.lock.Unlock()
-    svc.sync()
+    svc.sync(ctx)
     return nil
 }
 
@@ -469,6 +485,19 @@ func (svc *builtIn) Reconfigure(
     } else {
         svc.captureDir = viamCaptureDotDir
     }
+
+    if svc.captureDirPollingCancelFn != nil {
+        svc.captureDirPollingCancelFn()
+    }
+    if svc.captureDirPollingBackgroundWorkers != nil {
+        svc.captureDirPollingBackgroundWorkers.Wait()
+    }
+    captureDirPollCtx, captureDirCancelFunc := context.WithCancel(context.Background())
+    svc.captureDirPollingCancelFn = captureDirCancelFunc
+    svc.captureDirPollingBackgroundWorkers = &sync.WaitGroup{}
+    svc.captureDirPollingBackgroundWorkers.Add(1)
+    go logCaptureDirSize(captureDirPollCtx, svc.captureDir, svc.captureDirPollingBackgroundWorkers, svc.logger)
+
     svc.captureDisabled = svcConfig.CaptureDisabled
     // Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service.
     if svc.captureDisabled {
@@ -684,7 +713,7 @@ func (svc *builtIn) uploadData(cancelCtx context.Context, intervalMins float64)
             svc.lock.Unlock()
 
             if !isOffline() && shouldSync {
-                svc.sync()
+                svc.sync(cancelCtx)
             }
         } else {
             svc.lock.Unlock()
@@ -701,64 +730,66 @@ func isOffline() bool {
     return err != nil
 }
 
-func (svc *builtIn) sync() {
+func (svc *builtIn) sync(ctx context.Context) {
     svc.flushCollectors()
-
+    // Lock while retrieving any values that could be changed during reconfiguration of the data
+    // manager.
     svc.lock.Lock()
-    if svc.syncer != nil {
-        toSync := getAllFilesToSync(svc.captureDir, svc.fileLastModifiedMillis)
-        for _, ap := range svc.additionalSyncPaths {
-            toSync = append(toSync, getAllFilesToSync(ap, svc.fileLastModifiedMillis)...)
-        }
-        syncer := svc.syncer
-        stopAfter := time.Now().Add(time.Duration(svc.syncIntervalMins * float64(time.Minute)))
-        svc.lock.Unlock()
-
-        // Only log if there are a large number of files to sync
-        if len(toSync) > minNumFiles {
-            svc.logger.Infof("Starting sync of %d files", len(toSync))
-        }
-        for _, p := range toSync {
-            syncer.SyncFile(p, stopAfter)
-        }
-    } else {
+    captureDir := svc.captureDir
+    fileLastModifiedMillis := svc.fileLastModifiedMillis
+    additionalSyncPaths := svc.additionalSyncPaths
+    if svc.syncer == nil {
         svc.lock.Unlock()
+        return
     }
+    syncer := svc.syncer
+    svc.lock.Unlock()
+
+    // Retrieve all files in capture dir and send them to the syncer
+    getAllFilesToSync(ctx, append([]string{captureDir}, additionalSyncPaths...),
+        fileLastModifiedMillis,
+        syncer,
+    )
 }
 
 //nolint:errcheck,nilerr
-func getAllFilesToSync(dir string, lastModifiedMillis int) []string {
-    var filePaths []string
-    _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
-        if err != nil {
-            return nil
-        }
-        // Do not sync the files in the corrupted data directory.
-        if info.IsDir() && info.Name() == datasync.FailedDir {
-            return filepath.SkipDir
-        }
-        if info.IsDir() {
+func getAllFilesToSync(ctx context.Context, dirs []string, lastModifiedMillis int, syncer datasync.Manager) {
+    for _, dir := range dirs {
+        _ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+            if ctx.Err() != nil {
+                return filepath.SkipAll
+            }
+            if err != nil {
+                return nil
+            }
+
+            // Do not sync the files in the corrupted data directory.
+            if info.IsDir() && info.Name() == datasync.FailedDir {
+                return filepath.SkipDir
+            }
+
+            if info.IsDir() {
+                return nil
+            }
+            // If a file was modified within the past lastModifiedMillis, do not sync it (data
+            // may still be being written).
+            timeSinceMod := clock.Since(info.ModTime())
+            // When using a mock clock in tests, this can be negative since the file system will still use the system clock.
+            // Take max(timeSinceMod, 0) to account for this.
+            if timeSinceMod < 0 {
+                timeSinceMod = 0
+            }
+            isStuckInProgressCaptureFile := filepath.Ext(path) == datacapture.InProgressFileExt &&
+                timeSinceMod >= defaultFileLastModifiedMillis*time.Millisecond
+            isNonCaptureFileThatIsNotBeingWrittenTo := filepath.Ext(path) != datacapture.InProgressFileExt &&
+                timeSinceMod >= time.Duration(lastModifiedMillis)*time.Millisecond
+            isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
+            if isCompletedCaptureFile || isStuckInProgressCaptureFile || isNonCaptureFileThatIsNotBeingWrittenTo {
+                syncer.SendFileToSync(path)
+            }
             return nil
-        }
-        // If a file was modified within the past lastModifiedMillis, do not sync it (data
-        // may still be being written).
-        timeSinceMod := clock.Since(info.ModTime())
-        // When using a mock clock in tests, this can be negative since the file system will still use the system clock.
-        // Take max(timeSinceMod, 0) to account for this.
-        if timeSinceMod < 0 {
-            timeSinceMod = 0
-        }
-        isStuckInProgressCaptureFile := filepath.Ext(path) == datacapture.InProgressFileExt &&
-            timeSinceMod >= defaultFileLastModifiedMillis*time.Millisecond
-        isNonCaptureFileThatIsNotBeingWrittenTo := filepath.Ext(path) != datacapture.InProgressFileExt &&
-            timeSinceMod >= time.Duration(lastModifiedMillis)*time.Millisecond
-        isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
-        if isCompletedCaptureFile || isStuckInProgressCaptureFile || isNonCaptureFileThatIsNotBeingWrittenTo {
-            filePaths = append(filePaths, path)
-        }
-        return nil
-    })
-    return filePaths
+        })
+    }
 }
 
 // Build the component configs associated with the data manager service.
@@ -833,3 +864,58 @@ func pollFilesystem(ctx context.Context, wg *sync.WaitGroup, captureDir string,
         }
     }
 }
+
+func logCaptureDirSize(ctx context.Context, captureDir string, wg *sync.WaitGroup, logger logging.Logger,
+) {
+    t := clock.Ticker(captureDirSizeLogInterval)
+    defer t.Stop()
+    defer wg.Done()
+    for {
+        if err := ctx.Err(); err != nil {
+            if !errors.Is(err, context.Canceled) {
+                logger.Errorw("data manager context closed unexpectedly", "error", err)
+            }
+            return
+        }
+        select {
+        case <-ctx.Done():
+            return
+        case <-t.C:
+            numFiles := countCaptureDirFiles(ctx, captureDir)
+            if numFiles > minNumFiles {
+                logger.Infof("Capture dir contains %d files", numFiles)
+            }
+        }
+    }
+}
+
+func countCaptureDirFiles(ctx context.Context, captureDir string) int {
+    numFiles := 0
+    //nolint:errcheck
+    _ = filepath.Walk(captureDir, func(path string, info os.FileInfo, err error) error {
+        if ctx.Err() != nil {
+            return filepath.SkipAll
+        }
+        //nolint:nilerr
+        if err != nil {
+            return nil
+        }
+
+        // Do not count the files in the corrupted data directory.
+        if info.IsDir() && info.Name() == datasync.FailedDir {
+            return filepath.SkipDir
+        }
+
+        if info.IsDir() {
+            return nil
+        }
+        // This intentionally does not do as many checks as getAllFilesToSync because
+        // it is intended for debugging and does not need to be 100% accurate.
+        isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
+        if isCompletedCaptureFile {
+            numFiles++
+        }
+        return nil
+    })
+    return numFiles
+}
diff --git a/services/datamanager/builtin/file_deletion_test.go b/services/datamanager/builtin/file_deletion_test.go
index f6127b65701..65188ddbcd8 100644
--- a/services/datamanager/builtin/file_deletion_test.go
+++ b/services/datamanager/builtin/file_deletion_test.go
@@ -147,7 +147,9 @@ func TestFileDeletion(t *testing.T) {
 
             var syncer datasync.Manager
             if tc.syncEnabled {
-                s, err := datasync.NewManager("rick astley", mockClient, logger, tempCaptureDir, datasync.MaxParallelSyncRoutines)
+                filesToSync := make(chan string)
+                defer close(filesToSync)
+                s, err := datasync.NewManager("rick astley", mockClient, logger, tempCaptureDir, datasync.MaxParallelSyncRoutines, filesToSync)
                 test.That(t, err, test.ShouldBeNil)
                 syncer = s
                 defer syncer.Close()
diff --git a/services/datamanager/builtin/sync_test.go b/services/datamanager/builtin/sync_test.go
index 22b737868ab..936e882b2da 100644
--- a/services/datamanager/builtin/sync_test.go
+++ b/services/datamanager/builtin/sync_test.go
@@ -474,6 +474,14 @@ func TestArbitraryFileUpload(t *testing.T) {
 }
 
 func TestStreamingDCUpload(t *testing.T) {
+    // Set max unary file size to 1 byte, so it uses the streaming rpc. Reset the original
+    // value such that other tests take the correct code paths.
+    origSize := datasync.MaxUnaryFileSize
+    datasync.MaxUnaryFileSize = 1
+    defer func() {
+        datasync.MaxUnaryFileSize = origSize
+    }()
+
     tests := []struct {
         name        string
         serviceFail bool
@@ -537,8 +545,6 @@ func TestStreamingDCUpload(t *testing.T) {
                 streamingDCUploads: make(chan *mockStreamingDCClient, 10),
                 fail:               &f,
             }
-            // Set max unary file size to 1 byte, so it uses the streaming rpc.
-            datasync.MaxUnaryFileSize = 1
             newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
             cfg.CaptureDisabled = true
             cfg.ScheduledSyncDisabled = true
@@ -919,9 +925,9 @@ func (m *mockStreamingDCClient) CloseSend() error {
 
 func getTestSyncerConstructorMock(client mockDataSyncServiceClient) datasync.ManagerConstructor {
     return func(identity string, _ v1.DataSyncServiceClient, logger logging.Logger,
-        viamCaptureDotDir string, maxSyncThreads int,
+        viamCaptureDotDir string, maxSyncThreads int, filesToSync chan string,
     ) (datasync.Manager, error) {
-        return datasync.NewManager(identity, client, logger, viamCaptureDotDir, maxSyncThreads)
+        return datasync.NewManager(identity, client, logger, viamCaptureDotDir, maxSyncThreads, make(chan string))
     }
 }
 
diff --git a/services/datamanager/datasync/noop_sync.go b/services/datamanager/datasync/noop_sync.go
index 75ccc30aae5..f3291ac0f5a 100644
--- a/services/datamanager/datasync/noop_sync.go
+++ b/services/datamanager/datasync/noop_sync.go
@@ -1,7 +1,5 @@
 package datasync
 
-import "time"
-
 type noopManager struct{}
 
 var _ Manager = (*noopManager)(nil)
@@ -11,7 +9,7 @@ func NewNoopManager() Manager {
     return &noopManager{}
 }
 
-func (m *noopManager) SyncFile(path string, stopAfter time.Time) {}
+func (m *noopManager) SyncFile(path string) {}
 
 func (m *noopManager) SetArbitraryFileTags(tags []string) {}
 
@@ -21,4 +19,6 @@ func (m *noopManager) MarkInProgress(path string) bool {
     return true
 }
 
+func (m *noopManager) SendFileToSync(path string) {}
+
 func (m *noopManager) UnmarkInProgress(path string) {}
diff --git a/services/datamanager/datasync/sync.go b/services/datamanager/datasync/sync.go
index 34d2b09bdbf..86a24232376 100644
--- a/services/datamanager/datasync/sync.go
+++ b/services/datamanager/datasync/sync.go
@@ -41,7 +41,8 @@ const MaxParallelSyncRoutines = 1000
 
 // Manager is responsible for enqueuing files in captureDir and uploading them to the cloud.
 type Manager interface {
-    SyncFile(path string, stopAfter time.Time)
+    SendFileToSync(path string)
+    SyncFile(path string)
     SetArbitraryFileTags(tags []string)
     Close()
     MarkInProgress(path string) bool
@@ -65,32 +66,32 @@ type syncer struct {
     closed             atomic.Bool
     logRoutine         sync.WaitGroup
 
-    syncRoutineTracker chan struct{}
+    filesToSync chan string
 
     captureDir string
 }
 
 // ManagerConstructor is a function for building a Manager.
 type ManagerConstructor func(identity string, client v1.DataSyncServiceClient, logger logging.Logger,
-    captureDir string, maxSyncThreadsConfig int) (Manager, error)
+    captureDir string, maxSyncThreadsConfig int, filesToSync chan string) (Manager, error)
 
 // NewManager returns a new syncer.
 func NewManager(identity string, client v1.DataSyncServiceClient, logger logging.Logger,
-    captureDir string, maxSyncThreads int,
+    captureDir string, maxSyncThreads int, filesToSync chan string,
 ) (Manager, error) {
     cancelCtx, cancelFunc := context.WithCancel(context.Background())
     logger.Debugf("Making new syncer with %d max threads", maxSyncThreads)
     ret := syncer{
-        partID:             identity,
-        client:             client,
-        logger:             logger,
-        cancelCtx:          cancelCtx,
-        cancelFunc:         cancelFunc,
-        arbitraryFileTags:  []string{},
-        inProgress:         make(map[string]bool),
-        syncErrs:           make(chan error, 10),
-        syncRoutineTracker: make(chan struct{}, maxSyncThreads),
-        captureDir:         captureDir,
+        partID:            identity,
+        client:            client,
+        logger:            logger,
+        cancelCtx:         cancelCtx,
+        cancelFunc:        cancelFunc,
+        arbitraryFileTags: []string{},
+        inProgress:        make(map[string]bool),
+        syncErrs:          make(chan error, 10),
+        filesToSync:       filesToSync,
+        captureDir:        captureDir,
     }
     ret.logRoutine.Add(1)
     goutils.PanicCapturingGo(func() {
@@ -98,6 +99,27 @@ func NewManager(identity string, client v1.DataSyncServiceClient, logger logging
         ret.logSyncErrs()
     })
 
+    for i := 0; i < maxSyncThreads; i++ {
+        ret.backgroundWorkers.Add(1)
+        go func() {
+            defer ret.backgroundWorkers.Done()
+            for {
+                if cancelCtx.Err() != nil {
+                    return
+                }
+                select {
+                case <-cancelCtx.Done():
+                    return
+                case path, ok := <-ret.filesToSync:
+                    if !ok {
+                        return
+                    }
+                    ret.SyncFile(path)
+                }
+            }
+        }()
+    }
+
     return &ret, nil
 }
 
@@ -114,79 +136,47 @@ func (s *syncer) SetArbitraryFileTags(tags []string) {
     s.arbitraryFileTags = tags
 }
 
-func (s *syncer) SyncFile(path string, stopAfter time.Time) {
+func (s *syncer) SendFileToSync(path string) {
+    select {
+    case s.filesToSync <- path:
+        return
+    case <-s.cancelCtx.Done():
+        return
+    }
+}
+
+func (s *syncer) SyncFile(path string) {
     // If the file is already being synced, do not kick off a new goroutine.
     // The goroutine will again check and return early if sync is already in progress.
-    s.progressLock.Lock()
-    if s.inProgress[path] {
-        s.progressLock.Unlock()
+    if !s.MarkInProgress(path) {
         return
     }
-    s.progressLock.Unlock()
-
-    for {
-        if s.cancelCtx.Err() != nil {
-            return
-        }
-
-        if time.Now().After(stopAfter) {
-            return
+    defer s.UnmarkInProgress(path)
+    //nolint:gosec
+    f, err := os.Open(path)
+    if err != nil {
+        // Don't log if the file does not exist, because that means it was successfully synced and deleted
+        // in between paths being built and this executing.
+        if !errors.Is(err, os.ErrNotExist) {
+            s.logger.Errorw("error opening file", "error", err)
         }
+        return
+    }
 
-        select {
-        case <-s.cancelCtx.Done():
-            return
-        // Kick off a sync goroutine if under the limit of goroutines.
-        case s.syncRoutineTracker <- struct{}{}:
-            s.backgroundWorkers.Add(1)
-
-            goutils.PanicCapturingGo(func() {
-                defer s.backgroundWorkers.Done()
-                // At the end, decrement the number of sync routines.
-                defer func() {
-                    <-s.syncRoutineTracker
-                }()
-                select {
-                case <-s.cancelCtx.Done():
-                    return
-                default:
-                    if !s.MarkInProgress(path) {
-                        return
-                    }
-                    defer s.UnmarkInProgress(path)
-                    //nolint:gosec
-                    f, err := os.Open(path)
-                    if err != nil {
-                        // Don't log if the file does not exist, because that means it was successfully synced and deleted
-                        // in between paths being built and this executing.
-                        if !errors.Is(err, os.ErrNotExist) {
-                            s.logger.Errorw("error opening file", "error", err)
-                        }
-                        return
-                    }
-
-                    if datacapture.IsDataCaptureFile(f) {
-                        captureFile, err := datacapture.ReadFile(f)
-                        if err != nil {
-                            if err = f.Close(); err != nil {
-                                s.syncErrs <- errors.Wrap(err, "error closing data capture file")
-                            }
-                            if err := moveFailedData(f.Name(), s.captureDir); err != nil {
-                                s.syncErrs <- errors.Wrap(err, fmt.Sprintf("error moving corrupted data %s", f.Name()))
-                            }
-                            return
-                        }
-                        s.syncDataCaptureFile(captureFile)
-                    } else {
-                        s.syncArbitraryFile(f)
-                    }
-                }
-            })
-
+    if datacapture.IsDataCaptureFile(f) {
+        captureFile, err := datacapture.ReadFile(f)
+        if err != nil {
+            if err = f.Close(); err != nil {
+                s.syncErrs <- errors.Wrap(err, "error closing data capture file")
+            }
+            if err := moveFailedData(f.Name(), s.captureDir); err != nil {
+                s.syncErrs <- errors.Wrap(err, fmt.Sprintf("error moving corrupted data %s", f.Name()))
+            }
             return
-        default:
-            // Avoid blocking main thread if currently at goroutine capacity.
         }
+        s.syncDataCaptureFile(captureFile)
+    } else {
+        s.syncArbitraryFile(f)
     }
 }
 
@@ -255,7 +245,7 @@ func (s *syncer) MarkInProgress(path string) bool {
     s.progressLock.Lock()
     defer s.progressLock.Unlock()
     if s.inProgress[path] {
-        s.logger.Warnw("File already in progress, trying to mark it again", "file", path)
+        s.logger.Debugw("File already in progress, trying to mark it again", "file", path)
         return false
     }
     s.inProgress[path] = true
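
The shape of the change above is a bounded producer/consumer hand-off: the directory walk produces file paths into a channel whose capacity matches the configured max sync threads, and a fixed pool of worker goroutines consumes from it, so only O(maxSyncThreads) paths are ever held in memory at once. The standalone Go sketch below illustrates that pattern under stated assumptions; syncOneFile and the ./capture directory are hypothetical placeholders for this illustration, not the rdk APIs used in the patch.

package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// syncOneFile is a stand-in for the real per-file upload; hypothetical, not rdk code.
func syncOneFile(path string) {
	fmt.Println("syncing", path)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	maxSyncThreads := 4
	// Channel capacity bounds how many discovered paths are buffered at once, so
	// memory stays O(maxSyncThreads) even if the walk finds millions of files.
	filesToSync := make(chan string, maxSyncThreads)

	// Consumers: a fixed pool of workers, one goroutine per configured sync thread.
	var workers sync.WaitGroup
	for i := 0; i < maxSyncThreads; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case path, ok := <-filesToSync:
					if !ok {
						return // channel closed: no more work
					}
					syncOneFile(path)
				}
			}
		}()
	}

	// Producer: walk the capture directory and push paths into the bounded channel.
	// The send blocks while all workers are busy, which applies backpressure to the walk.
	captureDir := "./capture" // hypothetical directory for this sketch
	_ = filepath.Walk(captureDir, func(path string, info os.FileInfo, err error) error {
		if err != nil || info.IsDir() {
			return nil
		}
		select {
		case filesToSync <- path:
		case <-ctx.Done():
			return filepath.SkipAll
		}
		return nil
	})

	close(filesToSync) // lets the workers drain the remaining paths and exit
	workers.Wait()
}

Because the send into filesToSync blocks once every worker is busy, paths are discovered only as fast as they can be uploaded, which is why the memory footprint stays flat even when the network is down and uploads stall.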