From 87ca0018c621cf4b553db5d5a92582a4bc9a49ee Mon Sep 17 00:00:00 2001 From: yahavi Date: Wed, 6 Dec 2023 14:03:25 +0200 Subject: [PATCH] Transfer - Make high frequency changes atomic --- .../commands/transferfiles/fileserror.go | 2 +- .../commands/transferfiles/state/runstatus.go | 2 +- .../transferfiles/state/statemanager.go | 43 +++++++-------- .../transferfiles/state/statemanager_test.go | 53 ++++++++++++++++++- .../commands/transferfiles/state/utils.go | 17 ++++++ .../commands/transferfiles/transfer.go | 2 +- 6 files changed, 89 insertions(+), 30 deletions(-) diff --git a/artifactory/commands/transferfiles/fileserror.go b/artifactory/commands/transferfiles/fileserror.go index 47f0b867a..faa1aad52 100644 --- a/artifactory/commands/transferfiles/fileserror.go +++ b/artifactory/commands/transferfiles/fileserror.go @@ -77,7 +77,7 @@ func (e *errorsRetryPhase) handleErrorsFile(errFilePath string, pcWrapper *produ // Since we're about to handle the transfer retry of the failed files, // we should now decrement the failures counter view. e.progressBar.changeNumberOfFailuresBy(-1 * len(failedFiles.Errors)) - err = e.stateManager.ChangeTransferFailureCountBy(uint(len(failedFiles.Errors)), false) + err = e.stateManager.ChangeTransferFailureCountBy(uint64(len(failedFiles.Errors)), false) if err != nil { return err } diff --git a/artifactory/commands/transferfiles/state/runstatus.go b/artifactory/commands/transferfiles/state/runstatus.go index 5454aa8bb..2c6924f6c 100644 --- a/artifactory/commands/transferfiles/state/runstatus.go +++ b/artifactory/commands/transferfiles/state/runstatus.go @@ -40,7 +40,7 @@ type TransferRunStatus struct { WorkingThreads int `json:"working_threads,omitempty"` VisitedFolders uint64 `json:"visited_folders,omitempty"` DelayedFiles uint64 `json:"delayed_files,omitempty"` - TransferFailures uint `json:"transfer_failures,omitempty"` + TransferFailures uint64 `json:"transfer_failures,omitempty"` TimeEstimationManager `json:"time_estimation,omitempty"` StaleChunks []StaleChunks `json:"stale_chunks,omitempty"` } diff --git a/artifactory/commands/transferfiles/state/statemanager.go b/artifactory/commands/transferfiles/state/statemanager.go index 273c9d28b..6c234ac55 100644 --- a/artifactory/commands/transferfiles/state/statemanager.go +++ b/artifactory/commands/transferfiles/state/statemanager.go @@ -136,18 +136,19 @@ func (ts *TransferStateManager) SetRepoFullTransferCompleted() error { // Increasing Transferred Diff files (modified files) and SizeByBytes value in suitable repository progress state func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase1(chunkTotalFiles, chunkTotalSizeInBytes int64) error { err := ts.TransferState.Action(func(state *TransferState) error { - state.CurrentRepo.Phase1Info.TransferredSizeBytes += chunkTotalSizeInBytes - state.CurrentRepo.Phase1Info.TransferredUnits += chunkTotalFiles + AtomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredSizeBytes, chunkTotalSizeInBytes, true) + AtomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredUnits, chunkTotalFiles, true) return nil }) if err != nil { return err } return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error { - transferRunStatus.OverallTransfer.TransferredSizeBytes += chunkTotalSizeInBytes - transferRunStatus.OverallTransfer.TransferredUnits += chunkTotalFiles + AtomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredSizeBytes, chunkTotalSizeInBytes, true) + AtomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredUnits, chunkTotalFiles, true) + if transferRunStatus.BuildInfoRepo { - transferRunStatus.OverallBiFiles.TransferredUnits += chunkTotalFiles + AtomicallyAddInt64(&transferRunStatus.OverallBiFiles.TransferredUnits, chunkTotalFiles, true) } return nil }) @@ -155,16 +156,16 @@ func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase1(chunkTotalFiles func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase2(chunkTotalFiles, chunkTotalSizeInBytes int64) error { return ts.TransferState.Action(func(state *TransferState) error { - state.CurrentRepo.Phase2Info.TransferredSizeBytes += chunkTotalSizeInBytes - state.CurrentRepo.Phase2Info.TransferredUnits += chunkTotalFiles + AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredSizeBytes, chunkTotalSizeInBytes, true) + AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredUnits, chunkTotalFiles, true) return nil }) } func (ts *TransferStateManager) IncTotalSizeAndFilesPhase2(filesNumber, totalSize int64) error { return ts.TransferState.Action(func(state *TransferState) error { - state.CurrentRepo.Phase2Info.TotalSizeBytes += totalSize - state.CurrentRepo.Phase2Info.TotalUnits += filesNumber + AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalSizeBytes, totalSize, true) + AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalUnits, filesNumber, true) return nil }) } @@ -172,8 +173,8 @@ func (ts *TransferStateManager) IncTotalSizeAndFilesPhase2(filesNumber, totalSiz // Set relevant information of files and storage we need to transfer in phase3 func (ts *TransferStateManager) SetTotalSizeAndFilesPhase3(filesNumber, totalSize int64) error { return ts.TransferState.Action(func(state *TransferState) error { - state.CurrentRepo.Phase3Info.TotalSizeBytes = totalSize - state.CurrentRepo.Phase3Info.TotalUnits = filesNumber + AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalSizeBytes, totalSize, true) + AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalUnits, filesNumber, true) return nil }) } @@ -181,8 +182,8 @@ func (ts *TransferStateManager) SetTotalSizeAndFilesPhase3(filesNumber, totalSiz // Increase transferred storage and files in phase 3 func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase3(chunkTotalFiles, chunkTotalSizeInBytes int64) error { return ts.TransferState.Action(func(state *TransferState) error { - state.CurrentRepo.Phase3Info.TransferredSizeBytes += chunkTotalSizeInBytes - state.CurrentRepo.Phase3Info.TransferredUnits += chunkTotalFiles + AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredSizeBytes, chunkTotalSizeInBytes, true) + AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredUnits, chunkTotalFiles, true) return nil }) } @@ -288,29 +289,21 @@ func (ts *TransferStateManager) GetDiffHandlingRange() (start, end time.Time, er func (ts *TransferStateManager) IncVisitedFolders() error { return ts.action(func(transferRunStatus *TransferRunStatus) error { - transferRunStatus.VisitedFolders++ + AtomicallyAddUint64(&transferRunStatus.VisitedFolders, 1, true) return nil }) } func (ts *TransferStateManager) ChangeDelayedFilesCountBy(count uint64, increase bool) error { return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error { - if increase { - transferRunStatus.DelayedFiles += count - } else { - transferRunStatus.DelayedFiles -= count - } + AtomicallyAddUint64(&transferRunStatus.DelayedFiles, count, increase) return nil }) } -func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint, increase bool) error { +func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint64, increase bool) error { return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error { - if increase { - transferRunStatus.TransferFailures += count - } else { - transferRunStatus.TransferFailures -= count - } + AtomicallyAddUint64(&transferRunStatus.TransferFailures, count, increase) return nil }) } diff --git a/artifactory/commands/transferfiles/state/statemanager_test.go b/artifactory/commands/transferfiles/state/statemanager_test.go index 62db05c9e..2db3f9760 100644 --- a/artifactory/commands/transferfiles/state/statemanager_test.go +++ b/artifactory/commands/transferfiles/state/statemanager_test.go @@ -1,6 +1,7 @@ package state import ( + "sync" "testing" "time" @@ -181,11 +182,11 @@ func TestChangeTransferFailureCountBy(t *testing.T) { // Increase failures count assert.NoError(t, stateManager.ChangeTransferFailureCountBy(2, true)) assert.NoError(t, stateManager.ChangeTransferFailureCountBy(4, true)) - assert.Equal(t, uint(6), stateManager.TransferFailures) + assert.Equal(t, uint64(6), stateManager.TransferFailures) // Decrease failures count assert.NoError(t, stateManager.ChangeTransferFailureCountBy(3, false)) - assert.Equal(t, uint(3), stateManager.TransferFailures) + assert.Equal(t, uint64(3), stateManager.TransferFailures) } func assertReposTransferredSize(t *testing.T, stateManager *TransferStateManager, expectedSize int64, repoKeys ...string) { @@ -310,3 +311,51 @@ func TestGetRunningTimeString(t *testing.T) { }) } } + +func TestStateConcurrency(t *testing.T) { + stateManager, cleanUp := InitStateTest(t) + defer cleanUp() + + // Concurrently increment variables in the state + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase1(1, 1)) + assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase2(1, 1)) + assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase3(1, 1)) + assert.NoError(t, stateManager.IncVisitedFolders()) + assert.NoError(t, stateManager.ChangeDelayedFilesCountBy(1, true)) + assert.NoError(t, stateManager.ChangeTransferFailureCountBy(1, true)) + }() + } + wg.Wait() + + // Assert 1000 in all values + assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase1Info.TransferredSizeBytes)) + assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase1Info.TransferredUnits)) + assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase2Info.TransferredSizeBytes)) + assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase2Info.TransferredUnits)) + assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase3Info.TransferredSizeBytes)) + assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase3Info.TransferredUnits)) + assert.Equal(t, 1000, int(stateManager.OverallTransfer.TransferredSizeBytes)) + assert.Equal(t, 1000, int(stateManager.VisitedFolders)) + assert.Equal(t, 1000, int(stateManager.DelayedFiles)) + assert.Equal(t, 1000, int(stateManager.TransferFailures)) + + // Concurrently decrement delayed artifacts and transfer failures + for i := 0; i < 500; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.NoError(t, stateManager.ChangeDelayedFilesCountBy(1, false)) + assert.NoError(t, stateManager.ChangeTransferFailureCountBy(1, false)) + }() + } + wg.Wait() + + // Assert 500 in delayed artifacts and transfer failures + assert.Equal(t, 500, int(stateManager.DelayedFiles)) + assert.Equal(t, 500, int(stateManager.TransferFailures)) +} diff --git a/artifactory/commands/transferfiles/state/utils.go b/artifactory/commands/transferfiles/state/utils.go index 6f87a5711..c80de5a0a 100644 --- a/artifactory/commands/transferfiles/state/utils.go +++ b/artifactory/commands/transferfiles/state/utils.go @@ -5,6 +5,7 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "time" "github.com/jfrog/build-info-go/utils" @@ -121,3 +122,19 @@ func GetOldTransferDirectoryStructureError() error { } return errorutils.CheckErrorf(oldTransferDirectoryStructureErrorFormat, transferDir) } + +func AtomicallyAddInt64(addr *int64, delta int64, increase bool) { + if increase { + atomic.AddInt64(addr, delta) + } else { + atomic.AddInt64(addr, -delta) + } +} + +func AtomicallyAddUint64(addr *uint64, delta uint64, increase bool) { + if increase { + atomic.AddUint64(addr, delta) + } else { + atomic.AddUint64(addr, ^uint64(delta-1)) + } +} diff --git a/artifactory/commands/transferfiles/transfer.go b/artifactory/commands/transferfiles/transfer.go index 03ecd3848..0eb9a4331 100644 --- a/artifactory/commands/transferfiles/transfer.go +++ b/artifactory/commands/transferfiles/transfer.go @@ -236,7 +236,7 @@ func (tdc *TransferFilesCommand) initStateManager(allSourceLocalRepos, sourceBui if e != nil { return e } - tdc.stateManager.TransferFailures = uint(numberInitialErrors) + tdc.stateManager.TransferFailures = uint64(numberInitialErrors) numberInitialDelays, e := getDelayedFilesCount(allSourceLocalRepos) if e != nil {