
Commit

Merge branch 'dev' into add-exclude-flag-to-publish
eyalbe4 authored Jun 28, 2023
2 parents 1e65e42 + 114cca0 commit 5d04ce3
Showing 31 changed files with 1,211 additions and 344 deletions.
@@ -337,7 +337,12 @@ func (w *SplitContentWriter) closeCurrentFile() error {
return err
}
if w.writer.GetFilePath() != "" {
fullPath := filepath.Join(w.dirPath, fmt.Sprintf("%s-%d.json", w.filePrefix, w.fileIndex))
fullPath, err := getUniqueErrorOrDelayFilePath(w.dirPath, func() string {
return w.filePrefix
})
if err != nil {
return err
}
log.Debug(fmt.Sprintf("Saving split content JSON file to: %s.", fullPath))
if err := fileutils.MoveFile(w.writer.GetFilePath(), fullPath); err != nil {
return fmt.Errorf("saving file failed! failed moving %s to %s: %w", w.writer.GetFilePath(), fullPath, err)
32 changes: 16 additions & 16 deletions artifactory/commands/transferfiles/errorshandler.go
@@ -11,7 +11,6 @@ import (
"github.com/jfrog/jfrog-client-go/utils/io/fileutils"
"github.com/jfrog/jfrog-client-go/utils/log"
"os"
"path/filepath"
"time"
)

@@ -42,9 +41,7 @@ type TransferErrorsMng struct {
type errorWriter struct {
writer *content.ContentWriter
errorCount int
// In case we have multiple errors files - we index them
fileIndex int
filePath string
filePath string
}

type errorWriterMng struct {
@@ -116,7 +113,7 @@ func (mng *TransferErrorsMng) start() (err error) {
if err != nil {
return err
}
writerRetry, retryFilePath, err := mng.newContentWriter(retryablePath, 0)
writerRetry, retryFilePath, err := mng.newUniqueContentWriter(retryablePath)
if err != nil {
return err
}
@@ -126,14 +123,14 @@ func (mng *TransferErrorsMng) start() (err error) {
err = e
}
}()
writerMng.retryable = errorWriter{writer: writerRetry, fileIndex: 0, filePath: retryFilePath}
writerMng.retryable = errorWriter{writer: writerRetry, filePath: retryFilePath}
// Init the content writer which is responsible for writing 'skipped errors' into files.
// In the next run we won't retry and upload those files.
skippedPath, err := getJfrogTransferRepoSkippedDir(mng.repoKey)
if err != nil {
return err
}
writerSkip, skipFilePath, err := mng.newContentWriter(skippedPath, 0)
writerSkip, skipFilePath, err := mng.newUniqueContentWriter(skippedPath)
if err != nil {
return err
}
@@ -143,7 +140,7 @@ func (mng *TransferErrorsMng) start() (err error) {
err = e
}
}()
writerMng.skipped = errorWriter{writer: writerSkip, fileIndex: 0, filePath: skipFilePath}
writerMng.skipped = errorWriter{writer: writerSkip, filePath: skipFilePath}
mng.errorWriterMng = writerMng

// Read errors from channel and write them to files.
@@ -156,17 +153,22 @@ func (mng *TransferErrorsMng) start() (err error) {
return
}

func (mng *TransferErrorsMng) newContentWriter(dirPath string, index int) (*content.ContentWriter, string, error) {
func (mng *TransferErrorsMng) newUniqueContentWriter(dirPath string) (*content.ContentWriter, string, error) {
writer, err := content.NewContentWriter("errors", true, false)
if err != nil {
return nil, "", err
}
errorsFilePath := filepath.Join(dirPath, getErrorsFileName(mng.repoKey, mng.phaseId, mng.phaseStartTime, index))
errorsFilePath, err := getUniqueErrorOrDelayFilePath(dirPath, func() string {
return getErrorsFileNamePrefix(mng.repoKey, mng.phaseId, mng.phaseStartTime)
})
if err != nil {
return nil, "", err
}
return writer, errorsFilePath, nil
}

func getErrorsFileName(repoKey string, phaseId int, phaseStartTime string, index int) string {
return fmt.Sprintf("%s-%d-%s-%d.json", repoKey, phaseId, phaseStartTime, index)
func getErrorsFileNamePrefix(repoKey string, phaseId int, phaseStartTime string) string {
return fmt.Sprintf("%s-%d-%s", repoKey, phaseId, phaseStartTime)
}
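getErrorsFileName previously embedded the file index itself; the new getErrorsFileNamePrefix only builds the "<repoKey>-<phaseId>-<phaseStartTime>" prefix, and the unique-path helper is responsible for the numeric suffix. Below is a small sketch of the resulting names, assuming the final name format is "%s-%d.json" as the updated test in errorshandler_test.go uses; the sample repo key and timestamp are made up.

```go
package main

import "fmt"

// Same shape as the prefix builder added in this diff.
func getErrorsFileNamePrefix(repoKey string, phaseId int, phaseStartTime string) string {
	return fmt.Sprintf("%s-%d-%s", repoKey, phaseId, phaseStartTime)
}

func main() {
	// Illustrative inputs only - a real phase start time is an epoch-milliseconds string.
	prefix := getErrorsFileNamePrefix("generic-local", 1, "1687949400000")
	for counter := 0; counter < 3; counter++ {
		// Assumed final name format, matching the "%s-%d.json" pattern used in the test.
		fmt.Printf("%s-%d.json\n", prefix, counter)
	}
	// Prints:
	// generic-local-1-1687949400000-0.json
	// generic-local-1-1687949400000-1.json
	// generic-local-1-1687949400000-2.json
}
```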

func (mng *TransferErrorsMng) writeErrorContent(e ExtendedFileUploadStatusResponse) error {
@@ -197,12 +199,11 @@ func (mng *TransferErrorsMng) writeSkippedErrorContent(e ExtendedFileUploadStatu
return err
}
// Initialize variables for new errors file
mng.errorWriterMng.skipped.fileIndex++
dirPath, err := getJfrogTransferRepoSkippedDir(mng.repoKey)
if err != nil {
return err
}
mng.errorWriterMng.skipped.writer, mng.errorWriterMng.skipped.filePath, err = mng.newContentWriter(dirPath, mng.errorWriterMng.skipped.fileIndex)
mng.errorWriterMng.skipped.writer, mng.errorWriterMng.skipped.filePath, err = mng.newUniqueContentWriter(dirPath)
if err != nil {
return err
}
@@ -222,12 +223,11 @@ func (mng *TransferErrorsMng) writeRetryableErrorContent(e ExtendedFileUploadSta
return err
}
// Initialize variables for new errors file
mng.errorWriterMng.retryable.fileIndex++
dirPath, err := getJfrogTransferRepoRetryableDir(mng.repoKey)
if err != nil {
return err
}
mng.errorWriterMng.retryable.writer, mng.errorWriterMng.retryable.filePath, err = mng.newContentWriter(dirPath, mng.errorWriterMng.retryable.fileIndex)
mng.errorWriterMng.retryable.writer, mng.errorWriterMng.retryable.filePath, err = mng.newUniqueContentWriter(dirPath)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/errorshandler_test.go
@@ -164,6 +164,6 @@ func writeEmptyErrorsFile(t *testing.T, repoKey string, retryable bool, phase, c
assert.NoError(t, err)
assert.NoError(t, fileutils.CreateDirIfNotExist(errorsDirPath))

fileName := getErrorsFileName(repoKey, phase, state.ConvertTimeToEpochMilliseconds(time.Now()), counter)
fileName := fmt.Sprintf("%s-%d.json", getErrorsFileNamePrefix(repoKey, phase, state.ConvertTimeToEpochMilliseconds(time.Now())), counter)
assert.NoError(t, os.WriteFile(filepath.Join(errorsDirPath, fileName), nil, 0644))
}
34 changes: 34 additions & 0 deletions artifactory/commands/transferfiles/filediff_test.go
@@ -0,0 +1,34 @@
package transferfiles

import (
"testing"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
servicesUtils "github.com/jfrog/jfrog-client-go/artifactory/services/utils"
"github.com/stretchr/testify/assert"
)

var convertResultsToFileRepresentationTestCases = []struct {
input servicesUtils.ResultItem
expectedOutput api.FileRepresentation
}{
{
servicesUtils.ResultItem{Repo: repo1Key, Path: "path-in-repo", Name: "file-name", Type: "file", Size: 100},
api.FileRepresentation{Repo: repo1Key, Path: "path-in-repo", Name: "file-name", Size: 100},
},
{
servicesUtils.ResultItem{Repo: repo1Key, Path: "path-in-repo", Name: "folder-name", Type: "folder"},
api.FileRepresentation{Repo: repo1Key, Path: "path-in-repo/folder-name"},
},
{
servicesUtils.ResultItem{Repo: repo1Key, Path: ".", Name: "folder-name", Type: "folder"},
api.FileRepresentation{Repo: repo1Key, Path: "folder-name"},
},
}

func TestConvertResultsToFileRepresentation(t *testing.T) {
for _, testCase := range convertResultsToFileRepresentationTestCases {
files := convertResultsToFileRepresentation([]servicesUtils.ResultItem{testCase.input})
assert.Equal(t, []api.FileRepresentation{testCase.expectedOutput}, files)
}
}
47 changes: 34 additions & 13 deletions artifactory/commands/transferfiles/filesdiff.go
@@ -2,6 +2,7 @@ package transferfiles

import (
"fmt"
"path"
"time"

"github.com/jfrog/gofrog/parallel"
@@ -113,7 +114,7 @@ func (f *filesDiffPhase) handleTimeFrameFilesDiff(pcWrapper *producerConsumerWra

paginationI := 0
for {
result, err := f.getTimeFrameFilesDiff(fromTimestamp, toTimestamp, paginationI)
result, lastPage, err := f.getTimeFrameFilesDiff(fromTimestamp, toTimestamp, paginationI)
if err != nil {
return err
}
@@ -145,7 +146,7 @@ func (f *filesDiffPhase) handleTimeFrameFilesDiff(pcWrapper *producerConsumerWra
return err
}

if len(result) < AqlPaginationLimit {
if lastPage {
break
}
paginationI++
@@ -163,12 +164,26 @@ func (f *filesDiffPhase) handleTimeFrameFilesDiff(pcWrapper *producerConsumerWra

func convertResultsToFileRepresentation(results []servicesUtils.ResultItem) (files []api.FileRepresentation) {
for _, result := range results {
files = append(files, api.FileRepresentation{
Repo: result.Repo,
Path: result.Path,
Name: result.Name,
Size: result.Size,
})
switch result.Type {
case "folder":
var pathInRepo string
if result.Path == "." {
pathInRepo = result.Name
} else {
pathInRepo = path.Join(result.Path, result.Name)
}
files = append(files, api.FileRepresentation{
Repo: result.Repo,
Path: pathInRepo,
})
default:
files = append(files, api.FileRepresentation{
Repo: result.Repo,
Path: result.Path,
Name: result.Name,
Size: result.Size,
})
}
}
return
}
@@ -177,7 +192,11 @@ func convertResultsToFileRepresentation(results []servicesUtils.ResultItem) (fil
// fromTimestamp - Time in RFC3339 represents the start time
// toTimestamp - Time in RFC3339 represents the end time
// paginationOffset - Requested page
func (f *filesDiffPhase) getTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (result []servicesUtils.ResultItem, err error) {
// Return values:
// result - The list of changed files and folders between the input timestamps
// lastPage - True if this is the last AQL page and no further AQL requests are needed
// err - The error, if any occurred
func (f *filesDiffPhase) getTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (result []servicesUtils.ResultItem, lastPage bool, err error) {
var timeFrameFilesDiff *servicesUtils.AqlSearchResult
if f.packageType == docker {
// Handle Docker repositories.
@@ -187,9 +206,11 @@ func (f *filesDiffPhase) getTimeFrameFilesDiff(fromTimestamp, toTimestamp string
timeFrameFilesDiff, err = f.getNonDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp, paginationOffset)
}
if err != nil {
return []servicesUtils.ResultItem{}, err
return []servicesUtils.ResultItem{}, true, err
}
return f.locallyGeneratedFilter.FilterLocallyGenerated(timeFrameFilesDiff.Results)
lastPage = len(timeFrameFilesDiff.Results) < AqlPaginationLimit
result, err = f.locallyGeneratedFilter.FilterLocallyGenerated(timeFrameFilesDiff.Results)
return
}

func (f *filesDiffPhase) getNonDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp string, paginationOffset int) (aqlResult *servicesUtils.AqlSearchResult, err error) {
@@ -242,7 +263,7 @@ func (f *filesDiffPhase) getDockerTimeFrameFilesDiff(fromTimestamp, toTimestamp

func generateDiffAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int) string {
query := fmt.Sprintf(`items.find({"$and":[{"modified":{"$gte":"%s"}},{"modified":{"$lt":"%s"}},{"repo":"%s","type":"any"}]})`, fromTimestamp, toTimestamp, repoKey)
query += `.include("repo","path","name","modified","size")`
query += `.include("repo","path","name","type","modified","size")`
query += fmt.Sprintf(`.sort({"$asc":["modified"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
return query
}
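The only change to generateDiffAqlQuery is the extra "type" field in the include list, so folders can be distinguished from files when converting the results. For reference, here is a standalone sketch that prints the query this function would produce for sample inputs; the AqlPaginationLimit value below is an assumption, as the real constant is defined elsewhere in the package.

```go
package main

import "fmt"

const AqlPaginationLimit = 10000 // assumed value for illustration

func generateDiffAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int) string {
	query := fmt.Sprintf(`items.find({"$and":[{"modified":{"$gte":"%s"}},{"modified":{"$lt":"%s"}},{"repo":"%s","type":"any"}]})`, fromTimestamp, toTimestamp, repoKey)
	query += `.include("repo","path","name","type","modified","size")`
	query += fmt.Sprintf(`.sort({"$asc":["modified"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
	return query
}

func main() {
	// Sample timestamps in RFC3339, as the function's documentation describes.
	fmt.Println(generateDiffAqlQuery("generic-local", "2023-06-28T10:00:00Z", "2023-06-28T10:15:00Z", 0))
}
```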
@@ -265,7 +286,7 @@ func generateGetDirContentAqlQuery(repoKey string, paths []string) string {
func generateDockerManifestAqlQuery(repoKey, fromTimestamp, toTimestamp string, paginationOffset int) string {
query := `items.find({"$and":`
query += fmt.Sprintf(`[{"repo":"%s"},{"modified":{"$gte":"%s"}},{"modified":{"$lt":"%s"}},{"$or":[{"name":"manifest.json"},{"name":"list.manifest.json"}]}`, repoKey, fromTimestamp, toTimestamp)
query += `]}).include("repo","path","name","modified")`
query += `]}).include("repo","path","name","type","modified")`
query += fmt.Sprintf(`.sort({"$asc":["modified"]}).offset(%d).limit(%d)`, paginationOffset*AqlPaginationLimit, AqlPaginationLimit)
return query
}
4 changes: 3 additions & 1 deletion artifactory/commands/transferfiles/fulltransfer.go
@@ -182,7 +182,9 @@ func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, p
}

// Add the folder as a candidate to transfer. The reason is that we'd like to transfer only folders with properties or empty folders.
curUploadChunk.AppendUploadCandidateIfNeeded(api.FileRepresentation{Repo: m.repoKey, Path: params.relativePath, NonEmptyDir: len(result) > 0}, m.buildInfoRepo)
if params.relativePath != "." {
curUploadChunk.AppendUploadCandidateIfNeeded(api.FileRepresentation{Repo: m.repoKey, Path: params.relativePath, NonEmptyDir: len(result) > 0}, m.buildInfoRepo)
}

// Empty folder
if paginationI == 0 && len(result) == 0 {
16 changes: 11 additions & 5 deletions artifactory/commands/transferfiles/manager.go
@@ -259,19 +259,25 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
if phaseBase != nil {
timeEstMng = &phaseBase.stateManager.TimeEstimationManager
}
for {
for i := 0; ; i++ {
if ShouldStop(phaseBase, nil, errorsChannelMng) {
return
}
time.Sleep(waitTimeBetweenChunkStatusSeconds * time.Second)

// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
// Run once every 3 minutes
if i%60 == 0 {
// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
}
}

// Each uploading thread receive a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
// Each uploading thread receives a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
fillChunkDataBatch(&chunksLifeCycleManager, uploadChunkChan)
if err := chunksLifeCycleManager.StoreStaleChunks(phaseBase.stateManager); err != nil {
log.Error("Couldn't store the stale chunks:", err.Error())
}
// When totalChunks size is zero, it means that all the tokens are uploaded,
// we received 'DONE' for all of them, and we notified the source that they can be deleted from the memory.
// If during the polling some chunks data were lost due to network issues, either on the client or on the source,
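The polling loop now counts iterations so that persisting the working-threads count runs only on every 60th pass, while chunk-status polling and the new stale-chunk bookkeeping still run on every pass. Below is a minimal sketch of that throttling pattern, assuming a 3-second wait between polls (so 60 iterations is roughly 3 minutes); the constant name and loop bound here are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

const waitBetweenPolls = 3 * time.Second // assumed interval, analogous to waitTimeBetweenChunkStatusSeconds

func main() {
	for i := 0; i < 5; i++ { // the real loop runs until the transfer stops
		time.Sleep(waitBetweenPolls)

		// Expensive bookkeeping only on every 60th poll (about once every 3 minutes at 3s per poll).
		if i%60 == 0 {
			fmt.Println("persisting working-threads count, iteration", i)
		}

		// Cheap per-iteration work runs every time.
		fmt.Println("polling chunk statuses, iteration", i)
	}
}
```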
13 changes: 13 additions & 0 deletions artifactory/commands/transferfiles/state/runstatus.go
@@ -38,6 +38,19 @@ type TransferRunStatus struct {
WorkingThreads int `json:"working_threads,omitempty"`
TransferFailures uint `json:"transfer_failures,omitempty"`
TimeEstimationManager `json:"time_estimation,omitempty"`
StaleChunks []StaleChunks `json:"stale_chunks,omitempty"`
}

// This structure contains a collection of chunks that have been undergoing processing for over 30 minutes
type StaleChunks struct {
NodeID string `json:"node_id,omitempty"`
Chunks []StaleChunk `json:"stale_node_chunks,omitempty"`
}

type StaleChunk struct {
ChunkID string `json:"chunk_id,omitempty"`
Files []string `json:"files,omitempty"`
Sent int64 `json:"sent,omitempty"`
}
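The new StaleChunks/StaleChunk types are persisted as part of TransferRunStatus, so they surface in the run-status JSON under "stale_chunks". A small sketch of how they marshal, with the struct definitions copied from this diff and purely illustrative values:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Definitions copied from the diff above.
type StaleChunks struct {
	NodeID string       `json:"node_id,omitempty"`
	Chunks []StaleChunk `json:"stale_node_chunks,omitempty"`
}

type StaleChunk struct {
	ChunkID string   `json:"chunk_id,omitempty"`
	Files   []string `json:"files,omitempty"`
	Sent    int64    `json:"sent,omitempty"`
}

func main() {
	// Illustrative values only.
	stale := []StaleChunks{{
		NodeID: "artifactory-node-1",
		Chunks: []StaleChunk{{
			ChunkID: "chunk-abc123",
			Files:   []string{"my-repo/path/in/repo/file.bin"},
			Sent:    1687949400,
		}},
	}}
	out, err := json.MarshalIndent(stale, "", "  ")
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(string(out))
}
```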

func (ts *TransferRunStatus) action(action ActionOnStatusFunc) error {
16 changes: 15 additions & 1 deletion artifactory/commands/transferfiles/state/statemanager.go
@@ -302,6 +302,20 @@ func (ts *TransferStateManager) GetWorkingThreads() (workingThreads int, err err
})
}

func (ts *TransferStateManager) SetStaleChunks(staleChunks []StaleChunks) error {
return ts.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.StaleChunks = staleChunks
return nil
})
}

func (ts *TransferStateManager) GetStaleChunks() (staleChunks []StaleChunks, err error) {
return staleChunks, ts.action(func(transferRunStatus *TransferRunStatus) error {
staleChunks = transferRunStatus.StaleChunks
return nil
})
}

func (ts *TransferStateManager) SaveStateAndSnapshots() error {
ts.TransferState.lastSaveTimestamp = time.Now()
if err := ts.persistTransferState(false); err != nil {
@@ -361,7 +375,7 @@ func GetRunningTime() (runningTime string, isRunning bool, err error) {
return
}
runningSecs := int64(time.Since(time.Unix(0, startTimestamp)).Seconds())
return secondsToLiteralTime(runningSecs, ""), true, nil
return SecondsToLiteralTime(runningSecs, ""), true, nil
}

func UpdateChunkInState(stateManager *TransferStateManager, chunk *api.ChunkStatus) (err error) {
3 changes: 2 additions & 1 deletion artifactory/commands/transferfiles/state/timeestimation.go
@@ -2,6 +2,7 @@ package state

import (
"fmt"

"github.com/jfrog/jfrog-cli-core/v2/artifactory/commands/transferfiles/api"
"github.com/jfrog/jfrog-cli-core/v2/artifactory/utils"

@@ -185,7 +186,7 @@ func (tem *TimeEstimationManager) GetEstimatedRemainingTimeString() string {
return err.Error()
}

return secondsToLiteralTime(remainingTimeSec, "About ")
return SecondsToLiteralTime(remainingTimeSec, "About ")
}

func (tem *TimeEstimationManager) isTimeEstimationAvailable() bool {