DATA-2703 Have datasync better cope with large numbers of files to sync (viamrobotics#4070)

Co-authored-by: Dan Gottlieb <[email protected]>

In the event of network outages (among other circumstances), data capture will continue to create files, but data sync will be unable to make progress on them. The memory utilization of the existing behavior was ~O(number of files to sync).

With this patch the expected memory utilization is O(max sync threads config value). Notably, this bound does not grow while the network is unavailable.
vijayvuyyuru committed Jun 13, 2024
1 parent 7d0ed8e commit 2d0eccc
Showing 5 changed files with 227 additions and 143 deletions.
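
The central change, visible in builtin.go below, is that the capture-directory walk no longer accumulates every file path into a slice before handing it to the syncer; instead it streams paths into a bounded channel (filesToSync) whose capacity is the configured max sync threads, so the walk blocks whenever the sync workers fall behind. A minimal sketch of that backpressure pattern, using hypothetical names (walkAndSend, worker) rather than the actual RDK code:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// walkAndSend stands in for the directory walk: it pushes paths into a
// bounded channel and blocks whenever the consumers are saturated, so
// memory stays proportional to the channel capacity, not the file count.
func walkAndSend(ctx context.Context, paths []string, out chan<- string) {
	defer close(out)
	for _, p := range paths {
		select {
		case <-ctx.Done():
			return
		case out <- p: // blocks when the buffer (maxSyncThreads) is full
		}
	}
}

// worker stands in for a sync goroutine that drains the channel.
func worker(id int, in <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for path := range in {
		time.Sleep(10 * time.Millisecond) // simulate an upload
		fmt.Printf("worker %d synced %s\n", id, path)
	}
}

func main() {
	const maxSyncThreads = 2
	files := []string{"a.capture", "b.capture", "c.capture", "d.capture"}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	filesToSync := make(chan string, maxSyncThreads)
	var wg sync.WaitGroup
	for i := 0; i < maxSyncThreads; i++ {
		wg.Add(1)
		go worker(i, filesToSync, &wg)
	}
	walkAndSend(ctx, files, filesToSync)
	wg.Wait()
}

Because the producer parks at the channel send instead of appending to a slice, an extended network outage stalls the walk rather than growing memory with the number of pending files.
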
196 changes: 141 additions & 55 deletions services/datamanager/builtin/builtin.go
@@ -63,8 +63,11 @@ var defaultMaxCaptureSize = int64(256 * 1024)
// Default time between disk size checks.
var filesystemPollInterval = 30 * time.Second

// Threshold number of files to check if sync is backed up (defined as >5000 files).
var minNumFiles = 5000
// Threshold number of files to check if sync is backed up (defined as >1000 files).
var minNumFiles = 1000

// Default time between checking and logging number of files in capture dir.
var captureDirSizeLogInterval = 1 * time.Minute

var (
clock = clk.New()
@@ -139,6 +142,7 @@ type builtIn struct {
syncRoutineCancelFn context.CancelFunc
syncer datasync.Manager
syncerConstructor datasync.ManagerConstructor
filesToSync chan string
maxSyncThreads int
cloudConnSvc cloud.ConnectionService
cloudConn rpc.ClientConn
@@ -152,6 +156,9 @@ type builtIn struct {

fileDeletionRoutineCancelFn context.CancelFunc
fileDeletionBackgroundWorkers *sync.WaitGroup

captureDirPollingCancelFn context.CancelFunc
captureDirPollingBackgroundWorkers *sync.WaitGroup
}

var viamCaptureDotDir = filepath.Join(os.Getenv("HOME"), ".viam", "capture")
Expand Down Expand Up @@ -195,14 +202,21 @@ func (svc *builtIn) Close(_ context.Context) error {
if svc.fileDeletionRoutineCancelFn != nil {
svc.fileDeletionRoutineCancelFn()
}
if svc.captureDirPollingCancelFn != nil {
svc.captureDirPollingCancelFn()
}

fileDeletionBackgroundWorkers := svc.fileDeletionBackgroundWorkers
capturePollingWorker := svc.captureDirPollingBackgroundWorkers
svc.lock.Unlock()
svc.backgroundWorkers.Wait()

if fileDeletionBackgroundWorkers != nil {
fileDeletionBackgroundWorkers.Wait()
}
if capturePollingWorker != nil {
capturePollingWorker.Wait()
}

return nil
}
@@ -377,6 +391,7 @@ func (svc *builtIn) closeSyncer() {
if svc.syncer != nil {
// If previously we were syncing, close the old syncer and cancel the old updateCollectors goroutine.
svc.syncer.Close()
close(svc.filesToSync)
svc.syncer = nil
}
if svc.cloudConn != nil {
@@ -399,7 +414,8 @@ func (svc *builtIn) initSyncer(ctx context.Context) error {
}

client := v1.NewDataSyncServiceClient(conn)
syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureDir, svc.maxSyncThreads)
svc.filesToSync = make(chan string, svc.maxSyncThreads)
syncer, err := svc.syncerConstructor(identity, client, svc.logger, svc.captureDir, svc.maxSyncThreads, svc.filesToSync)
if err != nil {
return errors.Wrap(err, "failed to initialize new syncer")
}
@@ -425,7 +441,7 @@ func (svc *builtIn) Sync(ctx context.Context, _ map[string]interface{}) error {
}

svc.lock.Unlock()
svc.sync()
svc.sync(ctx)
return nil
}

@@ -469,6 +485,19 @@ func (svc *builtIn) Reconfigure(
} else {
svc.captureDir = viamCaptureDotDir
}

if svc.captureDirPollingCancelFn != nil {
svc.captureDirPollingCancelFn()
}
if svc.captureDirPollingBackgroundWorkers != nil {
svc.captureDirPollingBackgroundWorkers.Wait()
}
captureDirPollCtx, captureDirCancelFunc := context.WithCancel(context.Background())
svc.captureDirPollingCancelFn = captureDirCancelFunc
svc.captureDirPollingBackgroundWorkers = &sync.WaitGroup{}
svc.captureDirPollingBackgroundWorkers.Add(1)
go logCaptureDirSize(captureDirPollCtx, svc.captureDir, svc.captureDirPollingBackgroundWorkers, svc.logger)

svc.captureDisabled = svcConfig.CaptureDisabled
// Service is disabled, so close all collectors and clear the map so we can instantiate new ones if we enable this service.
if svc.captureDisabled {
@@ -684,7 +713,7 @@ func (svc *builtIn) uploadData(cancelCtx context.Context, intervalMins float64)
svc.lock.Unlock()

if !isOffline() && shouldSync {
svc.sync()
svc.sync(cancelCtx)
}
} else {
svc.lock.Unlock()
@@ -701,64 +730,66 @@ func isOffline() bool {
return err != nil
}

func (svc *builtIn) sync() {
func (svc *builtIn) sync(ctx context.Context) {
svc.flushCollectors()

// Lock while retrieving any values that could be changed during reconfiguration of the data
// manager.
svc.lock.Lock()
if svc.syncer != nil {
toSync := getAllFilesToSync(svc.captureDir, svc.fileLastModifiedMillis)
for _, ap := range svc.additionalSyncPaths {
toSync = append(toSync, getAllFilesToSync(ap, svc.fileLastModifiedMillis)...)
}
syncer := svc.syncer
stopAfter := time.Now().Add(time.Duration(svc.syncIntervalMins * float64(time.Minute)))
svc.lock.Unlock()

// Only log if there are a large number of files to sync
if len(toSync) > minNumFiles {
svc.logger.Infof("Starting sync of %d files", len(toSync))
}
for _, p := range toSync {
syncer.SyncFile(p, stopAfter)
}
} else {
captureDir := svc.captureDir
fileLastModifiedMillis := svc.fileLastModifiedMillis
additionalSyncPaths := svc.additionalSyncPaths
if svc.syncer == nil {
svc.lock.Unlock()
return
}
syncer := svc.syncer
svc.lock.Unlock()

// Retrieve all files in capture dir and send them to the syncer
getAllFilesToSync(ctx, append([]string{captureDir}, additionalSyncPaths...),
fileLastModifiedMillis,
syncer,
)
}

//nolint:errcheck,nilerr
func getAllFilesToSync(dir string, lastModifiedMillis int) []string {
var filePaths []string
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return nil
}
// Do not sync the files in the corrupted data directory.
if info.IsDir() && info.Name() == datasync.FailedDir {
return filepath.SkipDir
}
if info.IsDir() {
func getAllFilesToSync(ctx context.Context, dirs []string, lastModifiedMillis int, syncer datasync.Manager) {
for _, dir := range dirs {
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if ctx.Err() != nil {
return filepath.SkipAll
}
if err != nil {
return nil
}

// Do not sync the files in the corrupted data directory.
if info.IsDir() && info.Name() == datasync.FailedDir {
return filepath.SkipDir
}

if info.IsDir() {
return nil
}
// If a file was modified within the past lastModifiedMillis, do not sync it (data
// may still be being written).
timeSinceMod := clock.Since(info.ModTime())
// When using a mock clock in tests, this can be negative since the file system will still use the system clock.
// Take max(timeSinceMod, 0) to account for this.
if timeSinceMod < 0 {
timeSinceMod = 0
}
isStuckInProgressCaptureFile := filepath.Ext(path) == datacapture.InProgressFileExt &&
timeSinceMod >= defaultFileLastModifiedMillis*time.Millisecond
isNonCaptureFileThatIsNotBeingWrittenTo := filepath.Ext(path) != datacapture.InProgressFileExt &&
timeSinceMod >= time.Duration(lastModifiedMillis)*time.Millisecond
isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
if isCompletedCaptureFile || isStuckInProgressCaptureFile || isNonCaptureFileThatIsNotBeingWrittenTo {
syncer.SendFileToSync(path)
}
return nil
}
// If a file was modified within the past lastModifiedMillis, do not sync it (data
// may still be being written).
timeSinceMod := clock.Since(info.ModTime())
// When using a mock clock in tests, this can be negative since the file system will still use the system clock.
// Take max(timeSinceMod, 0) to account for this.
if timeSinceMod < 0 {
timeSinceMod = 0
}
isStuckInProgressCaptureFile := filepath.Ext(path) == datacapture.InProgressFileExt &&
timeSinceMod >= defaultFileLastModifiedMillis*time.Millisecond
isNonCaptureFileThatIsNotBeingWrittenTo := filepath.Ext(path) != datacapture.InProgressFileExt &&
timeSinceMod >= time.Duration(lastModifiedMillis)*time.Millisecond
isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
if isCompletedCaptureFile || isStuckInProgressCaptureFile || isNonCaptureFileThatIsNotBeingWrittenTo {
filePaths = append(filePaths, path)
}
return nil
})
return filePaths
})
}
}

// Build the component configs associated with the data manager service.
@@ -833,3 +864,58 @@ func pollFilesystem(ctx context.Context, wg *sync.WaitGroup, captureDir string,
}
}
}

func logCaptureDirSize(ctx context.Context, captureDir string, wg *sync.WaitGroup, logger logging.Logger,
) {
t := clock.Ticker(captureDirSizeLogInterval)
defer t.Stop()
defer wg.Done()
for {
if err := ctx.Err(); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorw("data manager context closed unexpectedly", "error", err)
}
return
}
select {
case <-ctx.Done():
return
case <-t.C:
numFiles := countCaptureDirFiles(ctx, captureDir)
if numFiles > minNumFiles {
logger.Infof("Capture dir contains %d files", numFiles)
}
}
}
}

func countCaptureDirFiles(ctx context.Context, captureDir string) int {
numFiles := 0
//nolint:errcheck
_ = filepath.Walk(captureDir, func(path string, info os.FileInfo, err error) error {
if ctx.Err() != nil {
return filepath.SkipAll
}
//nolint:nilerr
if err != nil {
return nil
}

// Do not count the files in the corrupted data directory.
if info.IsDir() && info.Name() == datasync.FailedDir {
return filepath.SkipDir
}

if info.IsDir() {
return nil
}
// This is intentionally not doing as many checks as getAllFilesToSync because
// this is intended for debugging and does not need to be 100% accurate.
isCompletedCaptureFile := filepath.Ext(path) == datacapture.FileExt
if isCompletedCaptureFile {
numFiles++
}
return nil
})
return numFiles
}
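
The walk above hands each qualifying path to syncer.SendFileToSync; the datasync manager that consumes the shared channel lives in the fifth changed file, which is not rendered in this view. Below is a plausible sketch of that consumer side, under the assumption that the manager runs a fixed pool of workers reading from filesToSync. threadedManager, newThreadedManager, and uploadFile are illustrative names, not the actual RDK implementation, and the real shutdown ordering (closeSyncer closes the manager before closing the channel) differs from this simplified version.

package main

import (
	"fmt"
	"sync"
)

// threadedManager is a hypothetical consumer of the shared filesToSync
// channel: a fixed pool of workers, so at most maxSyncThreads uploads are
// in flight at once and no unbounded queue of paths exists in memory.
type threadedManager struct {
	workers sync.WaitGroup
}

func newThreadedManager(maxSyncThreads int, filesToSync <-chan string) *threadedManager {
	m := &threadedManager{}
	for i := 0; i < maxSyncThreads; i++ {
		m.workers.Add(1)
		go func(id int) {
			defer m.workers.Done()
			// Drain the channel until the producer closes it.
			for path := range filesToSync {
				uploadFile(id, path)
			}
		}(i)
	}
	return m
}

// Close blocks until the channel has been closed and every worker has
// finished its current upload.
func (m *threadedManager) Close() {
	m.workers.Wait()
}

// uploadFile is a placeholder for the real gRPC upload.
func uploadFile(worker int, path string) {
	fmt.Printf("worker %d uploading %s\n", worker, path)
}

func main() {
	filesToSync := make(chan string, 2)
	m := newThreadedManager(2, filesToSync)
	filesToSync <- "x.capture"
	filesToSync <- "y.capture"
	close(filesToSync)
	m.Close()
}
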
4 changes: 3 additions & 1 deletion services/datamanager/builtin/file_deletion_test.go
@@ -147,7 +147,9 @@ func TestFileDeletion(t *testing.T) {

var syncer datasync.Manager
if tc.syncEnabled {
s, err := datasync.NewManager("rick astley", mockClient, logger, tempCaptureDir, datasync.MaxParallelSyncRoutines)
filesToSync := make(chan string)
defer close(filesToSync)
s, err := datasync.NewManager("rick astley", mockClient, logger, tempCaptureDir, datasync.MaxParallelSyncRoutines, filesToSync)
test.That(t, err, test.ShouldBeNil)
syncer = s
defer syncer.Close()
14 changes: 10 additions & 4 deletions services/datamanager/builtin/sync_test.go
@@ -474,6 +474,14 @@ func TestArbitraryFileUpload(t *testing.T) {
}

func TestStreamingDCUpload(t *testing.T) {
// Set max unary file size to 1 byte, so it uses the streaming rpc. Reset the original
// value such that other tests take the correct code paths.
origSize := datasync.MaxUnaryFileSize
datasync.MaxUnaryFileSize = 1
defer func() {
datasync.MaxUnaryFileSize = origSize
}()

tests := []struct {
name string
serviceFail bool
@@ -537,8 +545,6 @@
streamingDCUploads: make(chan *mockStreamingDCClient, 10),
fail: &f,
}
// Set max unary file size to 1 byte, so it uses the streaming rpc.
datasync.MaxUnaryFileSize = 1
newDMSvc.SetSyncerConstructor(getTestSyncerConstructorMock(mockClient))
cfg.CaptureDisabled = true
cfg.ScheduledSyncDisabled = true
@@ -919,9 +925,9 @@ func (m *mockStreamingDCClient) CloseSend() error {

func getTestSyncerConstructorMock(client mockDataSyncServiceClient) datasync.ManagerConstructor {
return func(identity string, _ v1.DataSyncServiceClient, logger logging.Logger,
viamCaptureDotDir string, maxSyncThreads int,
viamCaptureDotDir string, maxSyncThreads int, filesToSync chan string,
) (datasync.Manager, error) {
return datasync.NewManager(identity, client, logger, viamCaptureDotDir, maxSyncThreads)
return datasync.NewManager(identity, client, logger, viamCaptureDotDir, maxSyncThreads, make(chan string))
}
}

6 changes: 3 additions & 3 deletions services/datamanager/datasync/noop_sync.go
@@ -1,7 +1,5 @@
package datasync

import "time"

type noopManager struct{}

var _ Manager = (*noopManager)(nil)
@@ -11,7 +9,7 @@ func NewNoopManager() Manager {
return &noopManager{}
}

func (m *noopManager) SyncFile(path string, stopAfter time.Time) {}
func (m *noopManager) SyncFile(path string) {}

func (m *noopManager) SetArbitraryFileTags(tags []string) {}

@@ -21,4 +19,6 @@ func (m *noopManager) MarkInProgress(path string) bool {
return true
}

func (m *noopManager) SendFileToSync(path string) {}

func (m *noopManager) UnmarkInProgress(path string) {}
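
For reference, the noopManager methods above together with the call sites in builtin.go imply that the datasync.Manager interface now looks roughly like the following; this is a reconstruction from what is visible in this diff rather than a copy of the real interface definition.

package datasync

// Manager, as reconstructed from the noopManager methods and the builtin.go
// call sites shown in this diff; comments and ordering are illustrative.
type Manager interface {
	// SyncFile schedules a single file for upload; the stopAfter deadline
	// parameter from the previous signature has been dropped.
	SyncFile(path string)
	// SendFileToSync enqueues a path found by the capture-dir walk,
	// presumably by writing it into the bounded filesToSync channel, which
	// is what gives the walk its backpressure.
	SendFileToSync(path string)
	SetArbitraryFileTags(tags []string)
	MarkInProgress(path string) bool
	UnmarkInProgress(path string)
	Close()
}
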