Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#415 from joereuss12/recursive-uplo…
Browse files Browse the repository at this point in the history
…ads-branch

Implement recursive uploads
  • Loading branch information
turetske authored Dec 5, 2023
2 parents 138180b + bf4ba6d commit cafecbb
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 9 deletions.
83 changes: 78 additions & 5 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func download_http(sourceUrl *url.URL, destination string, payload *payloadStruc

if recursive {
var err error
files, err = walkDavDir(sourceUrl, namespace, token)
files, err = walkDavDir(sourceUrl, namespace, token, "", false)
if err != nil {
log.Errorln("Error from walkDavDir", err)
return 0, err
Expand Down Expand Up @@ -744,6 +744,37 @@ func (pr *ProgressReader) Size() int64 {
return pr.sizer.Size()
}

// Recursively uploads a directory with all files and nested dirs, keeping file structure on server side
func UploadDirectory(src string, dest *url.URL, token string, namespace namespaces.Namespace) (int64, error) {
var files []string
var amountDownloaded int64
srcUrl := url.URL{Path: src}
// Get the list of files as well as make any directories on the server end
files, err := walkDavDir(&srcUrl, namespace, token, dest.Path, true)
if err != nil {
return 0, err
}

// Upload all of our files within the proper directories
for _, file := range files {
tempDest := url.URL{}
tempDest.Path, err = url.JoinPath(dest.Path, file)
if err != nil {
return 0, err
}
downloaded, err := UploadFile(file, &tempDest, token, namespace)
if err != nil {
return 0, err
}
amountDownloaded += downloaded
}
// Close progress bar container
if ObjectClientOptions.ProgressBars {
progressContainer.Wait()
}
return amountDownloaded, err
}

// UploadFile Uploads a file using HTTP
func UploadFile(src string, origDest *url.URL, token string, namespace namespaces.Namespace) (int64, error) {

Expand Down Expand Up @@ -846,7 +877,12 @@ func UploadFile(src string, origDest *url.URL, token string, namespace namespace
} else {
progressBar.Abort(true)
}
progressContainer.Wait()
// If it is recursive, we need to reuse the mpb instance. Closed later
if ObjectClientOptions.Recursive {
progressBar.Wait()
} else { // If not recursive, go ahead and close it
progressContainer.Wait()
}
}()
}
tickerDuration := 500 * time.Millisecond
Expand Down Expand Up @@ -936,7 +972,7 @@ func doPut(request *http.Request, responseChan chan<- *http.Response, errorChan

}

func walkDavDir(url *url.URL, namespace namespaces.Namespace, token string) ([]string, error) {
func walkDavDir(url *url.URL, namespace namespaces.Namespace, token string, destPath string, upload bool) ([]string, error) {

// Create the client to walk the filesystem
rootUrl := *url
Expand All @@ -961,13 +997,50 @@ func walkDavDir(url *url.URL, namespace namespaces.Namespace, token string) ([]s
// XRootD does not like keep alives and kills things, so turn them off.
transport := config.GetTransport()
c.SetTransport(transport)

files, err := walkDir(url.Path, c)
var files []string
var err error
if upload {
files, err = walkDirUpload(url.Path, c, destPath)
} else {
files, err = walkDir(url.Path, c)
}
log.Debugln("Found files:", files)
return files, err

}

// For uploads, we want to make directories on the server end
func walkDirUpload(path string, client *gowebdav.Client, destPath string) ([]string, error) {
// List of files to return
var files []string
// Whenever this function is called, we should create a new dir on the server side for uploads
err := client.Mkdir(destPath+path, 0755)
if err != nil {
return nil, err
}

// Get our list of files
infos, err := os.ReadDir(path)
if err != nil {
return nil, err
}
for _, info := range infos {
newPath := path + "/" + info.Name()
if info.IsDir() {
// Recursively call this function to create any nested dir's as well as list their files
returnedFiles, err := walkDirUpload(newPath, client, destPath)
if err != nil {
return nil, err
}
files = append(files, returnedFiles...)
} else {
// It is a normal file
files = append(files, newPath)
}
}
return files, err
}

func walkDir(path string, client *gowebdav.Client) ([]string, error) {
var files []string
log.Debugln("Reading directory: ", path)
Expand Down
11 changes: 7 additions & 4 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,17 @@ func getTokenName(destination *url.URL) (scheme, tokenName string) {
}

// Do writeback to stash using SciTokens
func doWriteBack(source string, destination *url.URL, namespace namespaces.Namespace) (int64, error) {
func doWriteBack(source string, destination *url.URL, namespace namespaces.Namespace, recursive bool) (int64, error) {

scitoken_contents, err := getToken(destination, namespace, true, "")
if err != nil {
return 0, err
}
return UploadFile(source, destination, scitoken_contents, namespace)

if recursive {
return UploadDirectory(source, destination, scitoken_contents, namespace)
} else {
return UploadFile(source, destination, scitoken_contents, namespace)
}
}

// getToken returns the token to use for the given destination
Expand Down Expand Up @@ -503,7 +506,7 @@ func DoStashCPSingle(sourceFile string, destination string, methods []string, re
AddError(err)
return 0, err
}
_, err = doWriteBack(source_url.Path, dest_url, ns)
_, err = doWriteBack(source_url.Path, dest_url, ns, recursive)
AddError(err)
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions cmd/object_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func copyMain(cmd *cobra.Command, args []string) {
for _, src := range source {
var tmpDownloaded int64
isRecursive, _ := cmd.Flags().GetBool("recursive")
client.ObjectClientOptions.Recursive = isRecursive
tmpDownloaded, result = client.DoStashCPSingle(src, dest, splitMethods, isRecursive)
downloaded += tmpDownloaded
if result != nil {
Expand Down

0 comments on commit cafecbb

Please sign in to comment.