Skip to content

Commit

Permalink
Transfer - Make high frequency changes atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
yahavi committed Dec 6, 2023
1 parent 4b2b4a4 commit 87ca001
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 30 deletions.
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/fileserror.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (e *errorsRetryPhase) handleErrorsFile(errFilePath string, pcWrapper *produ
// Since we're about to handle the transfer retry of the failed files,
// we should now decrement the failures counter view.
e.progressBar.changeNumberOfFailuresBy(-1 * len(failedFiles.Errors))
err = e.stateManager.ChangeTransferFailureCountBy(uint(len(failedFiles.Errors)), false)
err = e.stateManager.ChangeTransferFailureCountBy(uint64(len(failedFiles.Errors)), false)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/state/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type TransferRunStatus struct {
WorkingThreads int `json:"working_threads,omitempty"`
VisitedFolders uint64 `json:"visited_folders,omitempty"`
DelayedFiles uint64 `json:"delayed_files,omitempty"`
TransferFailures uint `json:"transfer_failures,omitempty"`
TransferFailures uint64 `json:"transfer_failures,omitempty"`
TimeEstimationManager `json:"time_estimation,omitempty"`
StaleChunks []StaleChunks `json:"stale_chunks,omitempty"`
}
Expand Down
43 changes: 18 additions & 25 deletions artifactory/commands/transferfiles/state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,53 +136,54 @@ func (ts *TransferStateManager) SetRepoFullTransferCompleted() error {
// Increasing Transferred Diff files (modified files) and SizeByBytes value in suitable repository progress state
func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase1(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
err := ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase1Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase1Info.TransferredUnits += chunkTotalFiles
AtomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredSizeBytes, chunkTotalSizeInBytes, true)
AtomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredUnits, chunkTotalFiles, true)
return nil
})
if err != nil {
return err
}
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.OverallTransfer.TransferredSizeBytes += chunkTotalSizeInBytes
transferRunStatus.OverallTransfer.TransferredUnits += chunkTotalFiles
AtomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredSizeBytes, chunkTotalSizeInBytes, true)
AtomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredUnits, chunkTotalFiles, true)

if transferRunStatus.BuildInfoRepo {
transferRunStatus.OverallBiFiles.TransferredUnits += chunkTotalFiles
AtomicallyAddInt64(&transferRunStatus.OverallBiFiles.TransferredUnits, chunkTotalFiles, true)
}
return nil
})
}

func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase2(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase2Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase2Info.TransferredUnits += chunkTotalFiles
AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredSizeBytes, chunkTotalSizeInBytes, true)
AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredUnits, chunkTotalFiles, true)
return nil
})
}

func (ts *TransferStateManager) IncTotalSizeAndFilesPhase2(filesNumber, totalSize int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase2Info.TotalSizeBytes += totalSize
state.CurrentRepo.Phase2Info.TotalUnits += filesNumber
AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalSizeBytes, totalSize, true)
AtomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalUnits, filesNumber, true)
return nil
})
}

// Set relevant information of files and storage we need to transfer in phase3
func (ts *TransferStateManager) SetTotalSizeAndFilesPhase3(filesNumber, totalSize int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase3Info.TotalSizeBytes = totalSize
state.CurrentRepo.Phase3Info.TotalUnits = filesNumber
AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalSizeBytes, totalSize, true)
AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalUnits, filesNumber, true)
return nil
})
}

// Increase transferred storage and files in phase 3
func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase3(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase3Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase3Info.TransferredUnits += chunkTotalFiles
AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredSizeBytes, chunkTotalSizeInBytes, true)
AtomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredUnits, chunkTotalFiles, true)
return nil
})
}
Expand Down Expand Up @@ -288,29 +289,21 @@ func (ts *TransferStateManager) GetDiffHandlingRange() (start, end time.Time, er

func (ts *TransferStateManager) IncVisitedFolders() error {
return ts.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.VisitedFolders++
AtomicallyAddUint64(&transferRunStatus.VisitedFolders, 1, true)
return nil
})
}

func (ts *TransferStateManager) ChangeDelayedFilesCountBy(count uint64, increase bool) error {
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
if increase {
transferRunStatus.DelayedFiles += count
} else {
transferRunStatus.DelayedFiles -= count
}
AtomicallyAddUint64(&transferRunStatus.DelayedFiles, count, increase)
return nil
})
}

func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint, increase bool) error {
func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint64, increase bool) error {
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
if increase {
transferRunStatus.TransferFailures += count
} else {
transferRunStatus.TransferFailures -= count
}
AtomicallyAddUint64(&transferRunStatus.TransferFailures, count, increase)
return nil
})
}
Expand Down
53 changes: 51 additions & 2 deletions artifactory/commands/transferfiles/state/statemanager_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package state

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -181,11 +182,11 @@ func TestChangeTransferFailureCountBy(t *testing.T) {
// Increase failures count
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(2, true))
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(4, true))
assert.Equal(t, uint(6), stateManager.TransferFailures)
assert.Equal(t, uint64(6), stateManager.TransferFailures)

// Decrease failures count
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(3, false))
assert.Equal(t, uint(3), stateManager.TransferFailures)
assert.Equal(t, uint64(3), stateManager.TransferFailures)
}

func assertReposTransferredSize(t *testing.T, stateManager *TransferStateManager, expectedSize int64, repoKeys ...string) {
Expand Down Expand Up @@ -310,3 +311,51 @@ func TestGetRunningTimeString(t *testing.T) {
})
}
}

func TestStateConcurrency(t *testing.T) {
stateManager, cleanUp := InitStateTest(t)
defer cleanUp()

// Concurrently increment variables in the state
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase1(1, 1))
assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase2(1, 1))
assert.NoError(t, stateManager.IncTransferredSizeAndFilesPhase3(1, 1))
assert.NoError(t, stateManager.IncVisitedFolders())
assert.NoError(t, stateManager.ChangeDelayedFilesCountBy(1, true))
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(1, true))
}()
}
wg.Wait()

// Assert 1000 in all values
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase1Info.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase1Info.TransferredUnits))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase2Info.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase2Info.TransferredUnits))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase3Info.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.CurrentRepo.Phase3Info.TransferredUnits))
assert.Equal(t, 1000, int(stateManager.OverallTransfer.TransferredSizeBytes))
assert.Equal(t, 1000, int(stateManager.VisitedFolders))
assert.Equal(t, 1000, int(stateManager.DelayedFiles))
assert.Equal(t, 1000, int(stateManager.TransferFailures))

// Concurrently decrement delayed artifacts and transfer failures
for i := 0; i < 500; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, stateManager.ChangeDelayedFilesCountBy(1, false))
assert.NoError(t, stateManager.ChangeTransferFailureCountBy(1, false))
}()
}
wg.Wait()

// Assert 500 in delayed artifacts and transfer failures
assert.Equal(t, 500, int(stateManager.DelayedFiles))
assert.Equal(t, 500, int(stateManager.TransferFailures))
}
17 changes: 17 additions & 0 deletions artifactory/commands/transferfiles/state/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/jfrog/build-info-go/utils"
Expand Down Expand Up @@ -121,3 +122,19 @@ func GetOldTransferDirectoryStructureError() error {
}
return errorutils.CheckErrorf(oldTransferDirectoryStructureErrorFormat, transferDir)
}

func AtomicallyAddInt64(addr *int64, delta int64, increase bool) {
if increase {
atomic.AddInt64(addr, delta)
} else {
atomic.AddInt64(addr, -delta)
}
}

func AtomicallyAddUint64(addr *uint64, delta uint64, increase bool) {
if increase {
atomic.AddUint64(addr, delta)
} else {
atomic.AddUint64(addr, ^uint64(delta-1))

Check failure on line 138 in artifactory/commands/transferfiles/state/utils.go

View workflow job for this annotation

GitHub Actions / Static-Check

unnecessary conversion (unconvert)
}
}
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (tdc *TransferFilesCommand) initStateManager(allSourceLocalRepos, sourceBui
if e != nil {
return e
}
tdc.stateManager.TransferFailures = uint(numberInitialErrors)
tdc.stateManager.TransferFailures = uint64(numberInitialErrors)

numberInitialDelays, e := getDelayedFilesCount(allSourceLocalRepos)
if e != nil {
Expand Down

0 comments on commit 87ca001

Please sign in to comment.