From 020ea2bf8877b484f022f46b117b3a4e423ec49e Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 6 Sep 2024 16:34:06 -0700 Subject: [PATCH 1/9] Do not perform stat without authorization If the namespace requires an authorization token, the director should not perform a `stat` for optimizing resource selection; the HEAD requests are expected to fail in that case. If a token is required for the namespace, the client will re-query the director once it's acquired an appropriate token. --- client/director.go | 10 +++++++--- client/director_test.go | 4 ++-- client/handle_http.go | 7 ++++++- client/main.go | 6 +++--- client/sharing_url.go | 2 +- cmd/config_mgr.go | 2 +- director/director.go | 10 ++++++++++ 7 files changed, 30 insertions(+), 11 deletions(-) diff --git a/client/director.go b/client/director.go index 2ad92faea..89dbd4110 100644 --- a/client/director.go +++ b/client/director.go @@ -38,7 +38,7 @@ import ( // Make a request to the director for a given verb/resource; return the // HTTP response object only if a 307 is returned. -func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (resp *http.Response, err error) { +func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string, token string) (resp *http.Response, err error) { resourceUrl := directorUrl + sourcePath // Here we use http.Transport to prevent the client from following the director's // redirect. We use the Location url elsewhere (plus we still need to do the token @@ -63,6 +63,10 @@ func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (r // cannot. req.Header.Set("User-Agent", getUserAgent("")) + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + // Perform the HTTP request resp, err = client.Do(req) @@ -161,7 +165,7 @@ func parseServersFromDirectorResponse(resp *http.Response) (servers []*url.URL, } // Retrieve federation namespace information for a given URL. -func GetDirectorInfoForPath(ctx context.Context, resourcePath, directorUrl string, isPut bool, query string) (parsedResponse server_structs.DirectorResponse, err error) { +func GetDirectorInfoForPath(ctx context.Context, resourcePath, directorUrl string, isPut bool, query string, token string) (parsedResponse server_structs.DirectorResponse, err error) { if directorUrl == "" { return server_structs.DirectorResponse{}, errors.Errorf("unable to retrieve information from a Director for object %s because no director URL was provided", resourcePath) @@ -176,7 +180,7 @@ func GetDirectorInfoForPath(ctx context.Context, resourcePath, directorUrl strin resourcePath += "?" 
+ query } var dirResp *http.Response - dirResp, err = queryDirector(ctx, verb, resourcePath, directorUrl) + dirResp, err = queryDirector(ctx, verb, resourcePath, directorUrl, token) if err != nil { if isPut && dirResp != nil && dirResp.StatusCode == 405 { err = errors.New("error 405: No writeable origins were found") diff --git a/client/director_test.go b/client/director_test.go index e9e0f0e5d..f610351f0 100644 --- a/client/director_test.go +++ b/client/director_test.go @@ -103,7 +103,7 @@ func TestQueryDirector(t *testing.T) { defer server.Close() // Call QueryDirector with the test server URL and a source path - actualResp, err := queryDirector(context.Background(), "GET", "/foo/bar", server.URL) + actualResp, err := queryDirector(context.Background(), "GET", "/foo/bar", server.URL, "") if err != nil { t.Fatal(err) } @@ -194,7 +194,7 @@ func TestGetDirectorInfoForPath(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.Background() - _, err := GetDirectorInfoForPath(ctx, tt.resourcePath, tt.directorUrl, tt.isPut, tt.query) + _, err := GetDirectorInfoForPath(ctx, tt.resourcePath, tt.directorUrl, tt.isPut, tt.query, "") if tt.expectedError != "" { assert.Error(t, err) assert.Contains(t, err.Error(), tt.expectedError) diff --git a/client/handle_http.go b/client/handle_http.go index e33871488..f1ff98d18 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -1144,7 +1144,7 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL } tj.directorUrl = pelicanURL.directorUrl - dirResp, err := GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload, remoteUrl.RawQuery) + dirResp, err := GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload, remoteUrl.RawQuery, "") if err != nil { log.Errorln(err) err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) @@ -1158,6 +1158,11 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL if err != nil || contents == "" { return nil, errors.Wrap(err, "failed to get token for transfer") } + + // The director response may change if it's given a token; let's repeat the query. 
+ if tj.token != "" { + dirResp, err = GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload, remoteUrl.RawQuery, tj.token) + } } // If we are a recursive download and using the director, we want to attempt to get directory listings from diff --git a/client/main.go b/client/main.go index 26f556aa2..71615756a 100644 --- a/client/main.go +++ b/client/main.go @@ -94,7 +94,7 @@ func DoStat(ctx context.Context, destination string, options ...TransferOption) return nil, errors.Wrap(err, "Failed to generate pelicanURL object") } - dirResp, err := GetDirectorInfoForPath(ctx, destUri.Path, pelicanURL.directorUrl, false, "") + dirResp, err := GetDirectorInfoForPath(ctx, destUri.Path, pelicanURL.directorUrl, false, "", "") if err != nil { return nil, err } @@ -131,7 +131,7 @@ func GetObjectServerHostnames(ctx context.Context, testFile string) (urls []stri return } - parsedDirResp, err := GetDirectorInfoForPath(ctx, testFile, fedInfo.DirectorEndpoint, false, "") + parsedDirResp, err := GetDirectorInfoForPath(ctx, testFile, fedInfo.DirectorEndpoint, false, "", "") if err != nil { return } @@ -275,7 +275,7 @@ func DoList(ctx context.Context, remoteObject string, options ...TransferOption) return nil, errors.Wrap(err, "failed to generate pelicanURL object") } - dirResp, err := GetDirectorInfoForPath(ctx, remoteObjectUrl.Path, pelicanURL.directorUrl, false, "") + dirResp, err := GetDirectorInfoForPath(ctx, remoteObjectUrl.Path, pelicanURL.directorUrl, false, "", "") if err != nil { return nil, err } diff --git a/client/sharing_url.go b/client/sharing_url.go index 5149e3205..9f97c3de8 100644 --- a/client/sharing_url.go +++ b/client/sharing_url.go @@ -89,7 +89,7 @@ func CreateSharingUrl(ctx context.Context, objectUrl *url.URL, isWrite bool) (st objectUrl.Path = "/" + strings.TrimPrefix(objectUrl.Path, "/") log.Debugln("Will query director for path", objectUrl.Path) - dirResp, err := queryDirector(ctx, "GET", objectUrl.Path, directorUrl) + dirResp, err := queryDirector(ctx, "GET", objectUrl.Path, directorUrl, "") if err != nil { log.Errorln("Error while querying the Director:", err) return "", errors.Wrapf(err, "Error while querying the director at %s", directorUrl) diff --git a/cmd/config_mgr.go b/cmd/config_mgr.go index cfb4abee1..a67800c71 100644 --- a/cmd/config_mgr.go +++ b/cmd/config_mgr.go @@ -154,7 +154,7 @@ func addTokenSubcommands(tokenCmd *cobra.Command) { os.Exit(1) } - dirResp, err := client.GetDirectorInfoForPath(cmd.Context(), dest.Path, fedInfo.DirectorEndpoint, isWrite, "") + dirResp, err := client.GetDirectorInfoForPath(cmd.Context(), dest.Path, fedInfo.DirectorEndpoint, isWrite, "", "") if err != nil { fmt.Fprintln(os.Stderr, "Failed to get director info for path:", err) os.Exit(1) diff --git a/director/director.go b/director/director.go index 2ec3a29a2..ea43d24f9 100644 --- a/director/director.go +++ b/director/director.go @@ -373,6 +373,11 @@ func redirectToCache(ginCtx *gin.Context) { log.Errorf("Failed to get depth attribute for the redirecting request to %q, with best match namespace prefix %q", reqPath, namespaceAd.Path) } + // If the namespace requires a token yet there's no token available, skip the stat. 
+ if !namespaceAd.Caps.PublicReads && reqParams.Get("authz") == "" { + skipStat = true + } + // Stat origins and caches for object availability // For origins, we only want ones with the object // For caches, we still list all in the response but turn down priorities for ones that don't have the object @@ -574,6 +579,11 @@ func redirectToOrigin(ginCtx *gin.Context) { return } + // If the namespace requires a token yet there's no token available, skip the stat. + if !namespaceAd.Caps.PublicReads && reqParams.Get("authz") == "" { + skipStat = true + } + var q *ObjectStat availableAds := []server_structs.ServerAd{} From c006738bceec077af0b4dadcba3cc447d32557f5 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 6 Sep 2024 16:37:42 -0700 Subject: [PATCH 2/9] Do not hang when an empty directory is transferred The transfer job was not being correctly marked as complete if an empty directory was specified (as the directory was never marked as "done" when there were zero total files found). --- client/handle_http.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/client/handle_http.go b/client/handle_http.go index f1ff98d18..1acaa8235 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -1008,9 +1008,9 @@ func (te *TransferEngine) runMux() error { // Notification that a job has been processed into files (or failed) job := recv.Interface().(*clientTransferJob) job.job.lookupDone.Store(true) - // If no transfers were created and we have an error, the job is no + // If no transfers were created or we have an error, the job is no // longer active - if job.job.lookupErr != nil && job.job.totalXfer == 0 { + if job.job.lookupErr != nil || job.job.totalXfer == 0 { // Remove this job from the list of active jobs for the client. activeJobs[job.uuid] = slices.DeleteFunc(activeJobs[job.uuid], func(oldJob *TransferJob) bool { return oldJob.uuid == job.job.uuid @@ -2470,6 +2470,7 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer attempts: transfers, }, }: + job.job.totalXfer += 1 } } } @@ -2519,6 +2520,7 @@ func (te *TransferEngine) walkDirUpload(job *clientTransferJob, transfers []tran attempts: transfers, }, }: + job.job.totalXfer += 1 } } } From 02acb1881f6053d75cac7080b67fbc50336d3977 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 6 Sep 2024 16:42:17 -0700 Subject: [PATCH 3/9] Add synchronization functionality to the client The new "sync" feature allows the client to skip data movement if the destination object or file already exists (optionally checking for matching size between source and destination). 
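For illustration, a caller opts into the behavior through the new transfer
option (a sketch mirroring the tests added below; the federation URL and
local paths are placeholders):

    // Recursive download that skips any object whose local destination
    // already exists with a matching size (SyncSize).
    results, err := client.DoGet(ctx,
        "pelican://federation.example.org/namespace/dir", "/tmp/dest",
        true, // recursive
        client.WithSynchronize(client.SyncSize))

SyncExist skips on existence alone; SyncNone, the zero value and default,
always re-transfers.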
--- client/fed_linux_test.go | 275 +++++++++++++++++++++++++++++++++++++++ client/handle_http.go | 89 ++++++++++++- 2 files changed, 357 insertions(+), 7 deletions(-) diff --git a/client/fed_linux_test.go b/client/fed_linux_test.go index 3129605bd..3e16274a0 100644 --- a/client/fed_linux_test.go +++ b/client/fed_linux_test.go @@ -457,3 +457,278 @@ func TestFailureOnOriginDisablingListings(t *testing.T) { assert.Error(t, err) assert.Contains(t, err.Error(), "no collections URL found in director response") } + +func TestSyncUpload(t *testing.T) { + // Create instance of test federation + viper.Reset() + server_utils.ResetOriginExports() + + fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg) + + // Create a token file + issuer, err := config.GetServerIssuerURL() + require.NoError(t, err) + + tokenConfig := token.NewWLCGToken() + tokenConfig.Lifetime = time.Minute + tokenConfig.Issuer = issuer + tokenConfig.Subject = "origin" + tokenConfig.AddAudienceAny() + tokenConfig.AddResourceScopes(token_scopes.NewResourceScope(token_scopes.Storage_Read, "/"), + token_scopes.NewResourceScope(token_scopes.Storage_Modify, "/")) + token, err := tokenConfig.CreateToken() + assert.NoError(t, err) + tempToken, err := os.CreateTemp(t.TempDir(), "token") + assert.NoError(t, err, "Error creating temp token file") + defer os.Remove(tempToken.Name()) + _, err = tempToken.WriteString(token) + assert.NoError(t, err, "Error writing to temp token file") + tempToken.Close() + + // Disable progress bars to not reuse the same mpb instance + viper.Set("Logging.DisableProgressBars", true) + + // Make our test directories and files + tempDir := t.TempDir() + innerTempDir, err := os.MkdirTemp(tempDir, "InnerUploadDir") + assert.NoError(t, err) + permissions := os.FileMode(0755) + err = os.Chmod(tempDir, permissions) + require.NoError(t, err) + err = os.Chmod(innerTempDir, permissions) + require.NoError(t, err) + + testFileContent1 := "test file content" + testFileContent2 := "more test file content!" + innerTestFileContent := "this content is within another dir!" 
+ tempFile1, err := os.CreateTemp(tempDir, "test1") + require.NoError(t, err, "Error creating temp1 file") + tempFile2, err := os.CreateTemp(tempDir, "test1") + require.NoError(t, err, "Error creating temp2 file") + innerTempFile, err := os.CreateTemp(innerTempDir, "testInner") + require.NoError(t, err, "Error creating inner test file") + + _, err = tempFile1.WriteString(testFileContent1) + require.NoError(t, err, "Error writing to temp1 file") + tempFile1.Close() + _, err = tempFile2.WriteString(testFileContent2) + require.NoError(t, err, "Error writing to temp2 file") + tempFile2.Close() + _, err = innerTempFile.WriteString(innerTestFileContent) + require.NoError(t, err, "Error writing to inner test file") + innerTempFile.Close() + + t.Run("testSyncUploadFull", func(t *testing.T) { + // Set path for object to upload/download + tempPath := tempDir + dirName := filepath.Base(tempPath) + uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName) + + // Upload the file with PUT + transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsUpload) + + // Download the files we just uploaded + transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name())) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsDownload) + }) + + t.Run("testSyncUploadNone", func(t *testing.T) { + // Set path for object to upload/download + dirName := filepath.Base(tempDir) + uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_none/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName) + + // Upload the file with PUT + transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsUpload) + + // Synchronize the uploaded files again. 
+ transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + + // Should have already been uploaded once + assert.Len(t, transferDetailsUpload, 0) + }) + + t.Run("testSyncUploadPartial", func(t *testing.T) { + // Set path for object to upload/download + dirName := filepath.Base(tempDir) + uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_partial/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName) + uploadInnerURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_partial/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName, filepath.Base(innerTempDir)) + + // Upload some files with PUT + transferDetailsUpload, err := client.DoPut(fed.Ctx, innerTempDir, uploadInnerURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + require.Len(t, transferDetailsUpload, 1) + + // Change the contents of the already-uploaded file; changes shouldn't be detected as the size stays the same + newTestFileContent := "XXXX content is within another XXXX" + err = os.WriteFile(innerTempFile.Name(), []byte(newTestFileContent), os.FileMode(0755)) + require.NoError(t, err) + + // Upload again; this time there should be fewer uploads as the subdir was already moved. + transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + require.Len(t, transferDetailsUpload, 2) + + // Download all the objects + downloadDir := t.TempDir() + transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, downloadDir, true, client.WithTokenLocation(tempToken.Name())) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsDownload) + + // Verify we received the original contents, not any modified contents + contentBytes, err := os.ReadFile(filepath.Join(downloadDir, dirName, filepath.Base(innerTempDir), filepath.Base(innerTempFile.Name()))) + require.NoError(t, err) + assert.Equal(t, innerTestFileContent, string(contentBytes)) + }) +} + +func TestSyncDownload(t *testing.T) { + // Create instance of test federation + viper.Reset() + server_utils.ResetOriginExports() + + fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg) + + // Create a token file + issuer, err := config.GetServerIssuerURL() + require.NoError(t, err) + + tokenConfig := token.NewWLCGToken() + tokenConfig.Lifetime = time.Minute + tokenConfig.Issuer = issuer + tokenConfig.Subject = "origin" + tokenConfig.AddAudienceAny() + tokenConfig.AddResourceScopes(token_scopes.NewResourceScope(token_scopes.Storage_Read, "/"), + token_scopes.NewResourceScope(token_scopes.Storage_Modify, "/")) + token, err := tokenConfig.CreateToken() + assert.NoError(t, err) + tempToken, err := os.CreateTemp(t.TempDir(), "token") + assert.NoError(t, err, "Error creating temp token file") + defer os.Remove(tempToken.Name()) + _, err = tempToken.WriteString(token) + assert.NoError(t, err, "Error writing to temp token file") + tempToken.Close() + + // Disable progress bars to not reuse the same mpb instance + viper.Set("Logging.DisableProgressBars", true) + + // Make our test directories and files + tempDir := t.TempDir() + innerTempDir, err := os.MkdirTemp(tempDir, "InnerUploadDir") + assert.NoError(t, err) + permissions := os.FileMode(0755) + err 
= os.Chmod(tempDir, permissions) + require.NoError(t, err) + err = os.Chmod(innerTempDir, permissions) + require.NoError(t, err) + + testFileContent1 := "test file content" + testFileContent2 := "more test file content!" + innerTestFileContent := "this content is within another dir!" + tempFile1, err := os.CreateTemp(tempDir, "test1") + require.NoError(t, err, "Error creating temp1 file") + tempFile2, err := os.CreateTemp(tempDir, "test1") + require.NoError(t, err, "Error creating temp2 file") + innerTempFile, err := os.CreateTemp(innerTempDir, "testInner") + require.NoError(t, err, "Error creating inner test file") + + _, err = tempFile1.WriteString(testFileContent1) + require.NoError(t, err, "Error writing to temp1 file") + tempFile1.Close() + _, err = tempFile2.WriteString(testFileContent2) + require.NoError(t, err, "Error writing to temp2 file") + tempFile2.Close() + _, err = innerTempFile.WriteString(innerTestFileContent) + require.NoError(t, err, "Error writing to inner test file") + innerTempFile.Close() + + // Set path for object to upload/download + tempPath := tempDir + dirName := filepath.Base(tempPath) + uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName) + + // Upload the file with PUT + transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsUpload) + + t.Run("testSyncDownloadFull", func(t *testing.T) { + // Download the files we just uploaded + transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsDownload) + }) + + t.Run("testSyncDownloadNone", func(t *testing.T) { + // Set path for object to upload/download + dirName := t.TempDir() + + // Synchronize the uploaded files to a local directory + transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsDownload) + + // Synchronize the files again; should result in no transfers + transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + assert.NoError(t, err) + assert.Len(t, transferDetailsDownload, 0) + }) + + t.Run("testSyncDownloadPartial", func(t *testing.T) { + // Set path for object to upload/download + downloadDir := t.TempDir() + dirName := filepath.Base(tempDir) + uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download_partial/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName) + uploadInnerURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download_partial/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName, filepath.Base(innerTempDir)) + + // Upload the initial files + transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize)) + require.NoError(t, err) + verifySuccessfulTransfer(t, transferDetailsUpload) + + // Download the inner directory + innerDownloadDir 
:= filepath.Join(downloadDir, dirName, filepath.Base(innerTempDir))
+		transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadInnerURL, innerDownloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
+		require.NoError(t, err)
+		require.Len(t, transferDetailsDownload, 1)
+
+		// Change the contents of one already-uploaded file and re-upload it.
+		// Filesize is the same so a re-download should be skipped.
+		newTestFileContent := "XXXX content is within another XXXX"
+		err = os.WriteFile(innerTempFile.Name(), []byte(newTestFileContent), os.FileMode(0755))
+		require.NoError(t, err)
+		transferDetailsUpload, err = client.DoPut(fed.Ctx, innerTempDir, uploadInnerURL, true, client.WithTokenLocation(tempToken.Name()))
+		require.NoError(t, err)
+		require.Len(t, transferDetailsUpload, 1)
+
+		// Download all the objects
+		transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
+		require.NoError(t, err)
+		assert.Len(t, transferDetailsDownload, 2)
+
+		// Verify we received the original contents, not any modified contents
+		contentBytes, err := os.ReadFile(filepath.Join(innerDownloadDir, filepath.Base(innerTempFile.Name())))
+		require.NoError(t, err)
+		assert.Equal(t, innerTestFileContent, string(contentBytes))
+
+		// Change the local size, then re-sync
+		innerDownloadFile := filepath.Join(innerDownloadDir, filepath.Base(innerTempFile.Name()))
+		log.Debugln("Overwriting old version of file", innerDownloadFile)
+		err = os.Remove(innerDownloadFile)
+		require.NoError(t, err)
+		err = os.WriteFile(innerDownloadFile, []byte("XXXX"), os.FileMode(0755))
+		require.NoError(t, err)
+		log.Debugln("Re-downloading file direct from origin")
+		transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL+"?directread", downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
+		require.NoError(t, err)
+		assert.Len(t, transferDetailsDownload, 1)
+		contentBytes, err = os.ReadFile(filepath.Join(innerDownloadDir, filepath.Base(innerTempFile.Name())))
+		require.NoError(t, err)
+		assert.Equal(t, newTestFileContent, string(contentBytes))
+	})
+}
diff --git a/client/handle_http.go b/client/handle_http.go
index 1acaa8235..cefafa6e7 100644
--- a/client/handle_http.go
+++ b/client/handle_http.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"io/fs"
 	"net"
 	"net/http"
 	"net/http/httputil"
@@ -234,6 +235,8 @@ type (
 		localPath   string
 		upload      bool
 		recursive   bool
+		skipAcquire bool
+		syncLevel   SyncLevel // Policy for handling synchronization when the destination exists
 		prefObjServers []*url.URL // holds any client-requested caches/origins
 		dirResp     server_structs.DirectorResponse
 		directorUrl string
@@ -254,6 +257,9 @@ type (
 		file *transferFile
 	}
 
+	// Different types of synchronization for recursive transfers
+	SyncLevel int
+
 	// An object able to process transfer jobs.
 	TransferEngine struct {
 		ctx context.Context // The context provided upon creation of the engine.
@@ -286,9 +292,10 @@ type (
 		cancel        context.CancelFunc
 		callback      TransferCallbackFunc
 		engine        *TransferEngine
-		skipAcquire   bool   // Enable/disable the token acquisition logic. Defaults to acquiring a token
-		tokenLocation string // Location of a token file to use for transfers
-		token         string // Token that should be used for transfers
+		skipAcquire   bool      // Enable/disable the token acquisition logic. Defaults to acquiring a token
+		syncLevel     SyncLevel // Policy for the client to synchronize data
+		tokenLocation string    // Location of a token file to use for transfers
+		token         string    // Token that should be used for transfers
 		work   chan *TransferJob
 		closed bool
 		prefObjServers []*url.URL // holds any client-requested caches/origins
@@ -303,6 +310,7 @@
 	identTransferOptionTokenLocation struct{}
 	identTransferOptionAcquireToken  struct{}
 	identTransferOptionToken         struct{}
+	identTransferOptionSynchronize   struct{}
 
 	transferDetailsOptions struct {
 		NeedsToken bool
@@ -319,6 +327,10 @@ const (
 	projectName classAd = "ProjectName"
 	jobId       classAd = "GlobalJobId"
+
+	SyncNone  = iota // When synchronizing, always re-transfer, regardless of existence at destination.
+	SyncExist        // Skip synchronization transfer if the destination exists
+	SyncSize         // Skip synchronization transfer if the destination exists and matches the current source size
 )
 
 // The progress container object creates several
@@ -737,6 +749,14 @@
 func WithAcquireToken(enable bool) TransferOption {
 	return option.New(identTransferOptionAcquireToken{}, enable)
 }
 
+// Create an option to specify the object synchronization level
+//
+// The synchronization level specifies what to do if the destination
+// object already exists.
+func WithSynchronize(level SyncLevel) TransferOption {
+	return option.New(identTransferOptionSynchronize{}, level)
+}
+
 // Create a new client to work with an engine
 func (te *TransferEngine) NewClient(options ...TransferOption) (client *TransferClient, err error) {
 	log.Debugln("Making new clients")
@@ -765,6 +785,8 @@
 			client.skipAcquire = !option.Value().(bool)
 		case identTransferOptionToken{}:
 			client.token = option.Value().(string)
+		case identTransferOptionSynchronize{}:
+			client.syncLevel = option.Value().(SyncLevel)
 		}
 	}
 	func() {
@@ -1103,6 +1125,9 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL
 		localPath: localPath,
 		remoteURL: &copyUrl,
 		callback:  tc.callback,
+		skipAcquire:   tc.skipAcquire,
+		syncLevel:     tc.syncLevel,
+		tokenLocation: tc.tokenLocation,
 		upload:  upload,
 		uuid:    id,
 		project: project,
@@ -1140,6 +1165,8 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL
 			tj.token.EnableAcquire = option.Value().(bool)
 		case identTransferOptionToken{}:
 			tj.token.SetToken(option.Value().(string))
+		case identTransferOptionSynchronize{}:
+			tj.syncLevel = option.Value().(SyncLevel)
 		}
 	}
 
@@ -2415,6 +2442,52 @@ func createWebDavClient(collectionsUrl *url.URL, token *tokenGenerator, project
 	return
 }
 
+// Depending on the synchronization policy, decide if an object download should be skipped
+func skipDownload(syncLevel SyncLevel, remoteInfo fs.FileInfo, localPath string) bool {
+	if syncLevel == SyncNone {
+		return false
+	}
+	localInfo, err := os.Stat(localPath)
+	if err != nil {
+		return false
+	}
+	switch syncLevel {
+	case SyncExist:
+		return true
+	case SyncSize:
+		return localInfo.Size() == remoteInfo.Size()
+	}
+	return false
+}
+
+// Depending on the synchronization policy, decide if the upload should be skipped
+func skipUpload(job *TransferJob, localPath, remotePath string) bool {
+	if job.syncLevel == SyncNone {
+		return false
+	}
+
+	localInfo, err := os.Stat(localPath)
+	if err != nil {
+		return false
+	}
+
+	remoteUrl := url.URL{
+		Path: remotePath,
+	}
+	remoteInfo, err := statHttp(&remoteUrl, job.dirResp, job.token)
+	if err != nil {
+		return false
+	}
+
+	switch job.syncLevel
{ + case SyncExist: + return true + case SyncSize: + return localInfo.Size() == remoteInfo.Size + } + return false +} + // Walk a remote collection in a WebDAV server, emitting the files discovered func (te *TransferEngine) walkDirDownload(job *clientTransferJob, transfers []transferAttemptDetails, files chan *clientTransferFile, url *url.URL) error { // Create the client to walk the filesystem @@ -2449,6 +2522,8 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer if err != nil { return err } + } else if localPath := path.Join(job.job.localPath, localBase, info.Name()); skipDownload(job.job.syncLevel, info, localPath) { + log.Infoln("Skipping download of object", newPath, "as it already exists at", localPath) } else { job.job.activeXfer.Add(1) select { @@ -2464,7 +2539,7 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer engine: te, remoteURL: &url.URL{Path: newPath}, packOption: transfers[0].PackOption, - localPath: path.Join(job.job.localPath, localBase, info.Name()), + localPath: localPath, upload: job.job.upload, token: job.job.token, attempts: transfers, @@ -2477,7 +2552,7 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer return nil } -// Walk a local directory structure, writing all discovered files to the files channel +// Helper function for walkDirUpload; not to be called directly func (te *TransferEngine) walkDirUpload(job *clientTransferJob, transfers []transferAttemptDetails, files chan *clientTransferFile, localPath string) error { if job.job.ctx.Err() != nil { return job.job.ctx.Err() @@ -2497,9 +2572,9 @@ func (te *TransferEngine) walkDirUpload(job *clientTransferJob, transfers []tran if err != nil { return err } + } else if remotePath := path.Join(job.job.remoteURL.Path, strings.TrimPrefix(newPath, job.job.localPath)); skipUpload(job.job, newPath, remotePath) { + log.Infoln("Skipping upload of object", remotePath, "as it already exists at the destination") } else if info.Type().IsRegular() { - // It is a normal file; strip off the path prefix and append the destination prefix - remotePath := path.Join(job.job.remoteURL.Path, strings.TrimPrefix(newPath, job.job.localPath)) job.job.activeXfer.Add(1) select { case <-job.job.ctx.Done(): From 87e53fafd8fbfccdce3a83cf1ad9dd0faefa424a Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 7 Sep 2024 11:49:52 -0500 Subject: [PATCH 4/9] Add `sync` sub-command to the object interface --- cmd/object_sync.go | 215 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 cmd/object_sync.go diff --git a/cmd/object_sync.go b/cmd/object_sync.go new file mode 100644 index 000000000..8b3587807 --- /dev/null +++ b/cmd/object_sync.go @@ -0,0 +1,215 @@ +/*************************************************************** +* +* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research +* +* Licensed under the Apache License, Version 2.0 (the "License"); you +* may not use this file except in compliance with the License. You may +* obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* +***************************************************************/ + +package main + +import ( + "net/url" + "os" + "strings" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/pelicanplatform/pelican/client" + "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/error_codes" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/utils" +) + +var ( + syncCmd = &cobra.Command{ + Use: "sync {source ...} {destination}", + Short: "Sync a directory to or from a Pelican federation", + Run: syncMain, + } +) + +func init() { + flagSet := syncCmd.Flags() + flagSet.StringP("cache", "c", "", "Cache to use") + flagSet.StringP("token", "t", "", "Token file to use for transfer") + objectCmd.AddCommand(syncCmd) +} + +func getLastScheme(scheme string) string { + idx := strings.LastIndex(scheme, "+") + if idx == -1 { + return scheme + } + return scheme[idx+1:] +} + +// Returns true if the input is a url-like object that +// pelican can consume. +// +// Schemes we understand are "osdf", "pelican", +// "foo+osdf", or "foo+pelican" where "foo" is some arbitrary +// prefix not containing a "/" +func isPelicanUrl(input string) bool { + prefix, _, found := strings.Cut(input, "://") + if !found { + return false + } + if strings.Contains(prefix, "/") { + return false + } + scheme := getLastScheme(prefix) + if scheme != "pelican" && scheme != "osdf" { + return false + } + if _, err := url.Parse(input); err != nil { + return false + } + return true +} + +func syncMain(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + + err := config.InitClient() + if err != nil { + log.Errorln(err) + + if client.IsRetryable(err) { + log.Errorln("Errors are retryable") + os.Exit(11) + } else { + os.Exit(1) + } + } + + tokenLocation, _ := cmd.Flags().GetString("token") + + pb := newProgressBar() + defer pb.shutdown() + + // Check if the program was executed from a terminal + // https://rosettacode.org/wiki/Check_output_device_is_a_terminal#Go + if fileInfo, _ := os.Stdout.Stat(); (fileInfo.Mode()&os.ModeCharDevice) != 0 && param.Logging_LogLocation.GetString() == "" && !param.Logging_DisableProgressBars.GetBool() { + pb.launchDisplay(ctx) + } + + if len(args) < 2 { + log.Errorln("No source or destination to sync") + err = cmd.Help() + if err != nil { + log.Errorln("Failed to print out help:", err) + } + os.Exit(1) + } + sources := args[:len(args)-1] + dest := args[len(args)-1] + doDownload := false + if isPelicanUrl(dest) { + for _, src := range sources { + if isPelicanUrl(src) { + log.Errorf("URL (%s) cannot be a source when synchronizing to a federation URL", src) + os.Exit(1) + } + } + log.Debugln("Synchronizing to a Pelican data federation") + } else { + if !isPelicanUrl(sources[0]) { + log.Errorln("Either the first or last argument must be a pelican:// or osdf://-style URL specifying a remote destination") + os.Exit(1) + } + for _, src := range sources { + if !isPelicanUrl(src) { + log.Errorln("When synchronizing to a local directory, all sources must be pelican URLs:", src) + os.Exit(1) + } + } + log.Debugln("Synchronizing from a Pelican data federation") + doDownload = true + } + + log.Debugln("Sources:", sources) + log.Debugln("Destination:", dest) + + // Check for manually entered cache to use + var preferredCache string + if nearestCache, ok := os.LookupEnv("NEAREST_CACHE"); ok { + preferredCache = nearestCache + } else if cache, _ := cmd.Flags().GetString("cache"); cache != "" { + preferredCache = 
cache
+	}
+	var caches []*url.URL
+	caches, err = utils.GetPreferredCaches(preferredCache)
+	if err != nil {
+		log.Errorln(err)
+		os.Exit(1)
+	}
+
+	if doDownload && len(sources) > 1 {
+		if destStat, err := os.Stat(dest); err != nil {
+			log.Errorln("Destination does not exist")
+			os.Exit(1)
+		} else if !destStat.IsDir() {
+			log.Errorln("Destination is not a directory")
+			os.Exit(1)
+		}
+	}
+
+	lastSrc := ""
+
+	if doDownload {
+		for _, src := range sources {
+			if _, err = client.DoGet(ctx, src, dest, true,
+				client.WithCallback(pb.callback), client.WithTokenLocation(tokenLocation),
+				client.WithCaches(caches...), client.WithSynchronize(client.SyncExist)); err != nil {
+				lastSrc = src
+				break
+			}
+		}
+	} else {
+		for _, src := range sources {
+			if _, err = client.DoPut(ctx, src, dest, true,
+				client.WithCallback(pb.callback), client.WithTokenLocation(tokenLocation),
+				client.WithCaches(caches...), client.WithSynchronize(client.SyncExist)); err != nil {
+				lastSrc = src
+				break
+			}
+		}
+	}
+
+	// Exit with failure
+	if err != nil {
+		// Print the list of errors
+		errMsg := err.Error()
+		var pe error_codes.PelicanError
+		var te *client.TransferErrors
+		if errors.As(err, &te) {
+			errMsg = te.UserError()
+		}
+		if errors.Is(err, &pe) {
+			errMsg = pe.Error()
+			log.Errorln("Failure getting " + lastSrc + ": " + errMsg)
+			os.Exit(pe.ExitCode())
+		} else { // For now, keeping this else here to catch any errors that are not classified PelicanErrors
+			log.Errorln("Failure getting " + lastSrc + ": " + errMsg)
+			if client.ShouldRetry(err) {
+				log.Errorln("Errors are retryable")
+				os.Exit(11)
+			}
+			os.Exit(1)
+		}
+	}
+}

From 125fee72e7852e17ca80c6c1903afa3e222bbb42 Mon Sep 17 00:00:00 2001
From: Brian Bockelman
Date: Sat, 7 Sep 2024 11:35:25 -0500
Subject: [PATCH 5/9] Fix race condition on recursive transfers

If a recursive transfer has more than one resulting object movement --
and the transfers all finish before the directory walk has completed --
then the overall transfer job was never marked as completed.

This adds a check for job completion after the directory walk finishes.

It also refactors the active transfer cleanup logic into a standalone
function to avoid a large copy/paste of code.
---
 client/handle_http.go | 63 ++++++++++++++++++++++++-------------------
 1 file changed, 36 insertions(+), 27 deletions(-)

diff --git a/client/handle_http.go b/client/handle_http.go
index cefafa6e7..01b233562 100644
--- a/client/handle_http.go
+++ b/client/handle_http.go
@@ -830,6 +830,37 @@ func (te *TransferEngine) Close() {
 	}
 }
 
+// If we've detected a job is done, clean up the active job state map
+func (te *TransferEngine) finishJob(activeJobs *map[uuid.UUID][]*TransferJob, job *TransferJob, id uuid.UUID) {
+	if len((*activeJobs)[id]) == 1 {
+		log.Debugln("Job", job.ID(), "is done for client", id.String(), "which has no active jobs remaining")
+		// Delete the job from the list of active jobs
+		delete(*activeJobs, id)
+		func() {
+			te.clientLock.Lock()
+			defer te.clientLock.Unlock()
+			// If the client is closed and there are no remaining
+			// jobs for that client, we can close the results channel
+			// for the client -- a clean shutdown of the client.
+			if te.workMap[id] == nil {
+				close(te.resultsMap[id])
+				log.Debugln("Client", id.String(), "has no more work and is finished shutting down")
+			}
+		}()
+	} else {
+		// Scan through the list of active jobs, removing the recently
+		// completed one and saving the updated list.
+ newJobList := make([]*TransferJob, 0, len((*activeJobs)[id])) + for _, oldJob := range (*activeJobs)[id] { + if oldJob.uuid != job.uuid { + newJobList = append(newJobList, oldJob) + } + } + (*activeJobs)[id] = newJobList + log.Debugln("Job", job.ID(), "is done for client", id.String(), " which has", len(newJobList), "jobs remaining") + } +} + // Launches a helper goroutine that ensures completed // transfer results are routed back to their requesting // channels @@ -960,33 +991,7 @@ func (te *TransferEngine) runMux() error { // Test to see if the transfer job is done (true if job-to-file translation // has completed and there are no remaining active transfers) if job.lookupDone.Load() && job.activeXfer.Load() == 0 { - if len(activeJobs[id]) == 1 { - log.Debugln("Job", job.ID(), "is done for client", id.String(), " which has no active jobs remaining") - // Delete the job from the list of active jobs - delete(activeJobs, id) - func() { - te.clientLock.Lock() - defer te.clientLock.Unlock() - // If the client is closed and there are no remaining - // jobs for that client, we can close the results channel - // for the client -- a clean shutdown of the client. - if te.workMap[id] == nil { - close(te.resultsMap[id]) - log.Debugln("Client", id.String(), "has no more work and is finished shutting down") - } - }() - } else { - // Scan through the list of active jobs, removing the recently - // completed one and saving the updated list. - newJobList := make([]*TransferJob, 0, len(activeJobs[id])) - for _, oldJob := range activeJobs[id] { - if oldJob.uuid != job.uuid { - newJobList = append(newJobList, oldJob) - } - } - activeJobs[id] = newJobList - log.Debugln("Job", job.ID(), "is done for client", id.String(), " which has ", len(newJobList), "jobs remaining") - } + te.finishJob(&activeJobs, job, id) } if len(tmpResults[id]) == 1 { // The last result back to this client has been sent; delete the @@ -1048,6 +1053,10 @@ func (te *TransferEngine) runMux() error { } }() } + } else if job.job.activeXfer.Load() == 0 { + // Transfer jobs were created but they all completed before the recursive directory + // walk finished. + te.finishJob(&activeJobs, job.job, job.uuid) } } else if chosen == len(workMap)+len(resultsMap)+5 { // Notification that the engine should shut down From af71062de667d1dc4543c9349d4d09790ed01de3 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 7 Sep 2024 11:42:26 -0500 Subject: [PATCH 6/9] Add structured logging for downloads This adds structured logging fields to the download function, allowing us to see which URLs and which transfer jobs are associated with a given log line. With the structured information, one can correlate all the messages from a given transfer or job with a simple 'grep'. 
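Roughly, the pattern is (a sketch of the approach taken in this patch;
logFields is the unexported key type it introduces):

    // Attach the fields where the transfer attempt is set up...
    fields := log.Fields{
        "url": transferEndpoint.Url.String(),
        "job": transfer.job.ID(),
    }
    ctx := context.WithValue(transfer.ctx, logFields("fields"), fields)

    // ...and recover them inside downloadHTTP, tolerating their absence.
    fields, ok := ctx.Value(logFields("fields")).(log.Fields)
    if !ok {
        fields = log.Fields{}
    }
    log.WithFields(fields).Debugln("Starting the HTTP transfer...")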
--- client/handle_http.go | 63 ++++++++++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/client/handle_http.go b/client/handle_http.go index 01b233562..339cb2f7e 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -88,10 +88,14 @@ var ( }, ) + stoppedTransferDebugLine sync.Once + PelicanError error_codes.PelicanError ) type ( + logFields string + classAd string cacheItem struct { @@ -640,7 +644,7 @@ func (te *TransferEngine) newPelicanURL(remoteUrl *url.URL) (pelicanURL pelicanU } } else if scheme == "" { // If we don't have a url scheme, then our metadata information should be in the config - log.Debugln("No url scheme detected, getting metadata information from configuration") + log.Debugln("No url scheme detected for", remoteUrl.String(), ", getting metadata information from configuration") if fedInfo, err := config.GetFederation(te.ctx); err == nil { pelicanURL.directorUrl = fedInfo.DirectorEndpoint } else { @@ -1743,13 +1747,18 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er transferEndpointUrl := *transferEndpoint.Url transferEndpointUrl.Path = transfer.remoteURL.Path transferEndpoint.Url = &transferEndpointUrl + fields := log.Fields{ + "url": transferEndpoint.Url.String(), + "job": transfer.job.ID(), + } + ctx := context.WithValue(transfer.ctx, logFields("fields"), fields) transferStartTime = time.Now() // Update start time for this attempt tokenContents := "" if transfer.token != nil { tokenContents, _ = transfer.token.get() } attemptDownloaded, timeToFirstByte, cacheAge, serverVersion, err := downloadHTTP( - transfer.ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, tokenContents, transfer.project, + ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, tokenContents, transfer.project, ) endTime := time.Now() if cacheAge >= 0 { @@ -1763,7 +1772,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er downloaded += attemptDownloaded if err != nil { - log.Debugln("Failed to download from", transferEndpoint.Url, ":", err) + log.WithFields(fields).Debugln("Failed to download from", transferEndpoint.Url, ":", err) var ope *net.OpError var cse *ConnectionSetupError proxyStr, _ := os.LookupEnv("http_proxy") @@ -1802,7 +1811,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er transferResults.Attempts = append(transferResults.Attempts, attempt) if err == nil { // Success - log.Debugln("Downloaded bytes:", downloaded) + log.WithFields(fields).Debugln("Downloaded bytes:", downloaded) success = true break } @@ -1833,9 +1842,13 @@ func parseTransferStatus(status string) (int, string) { // // Returns the downloaded size, time to 1st byte downloaded, serverVersion and an error if there is one func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCallbackFunc, transfer transferAttemptDetails, dest string, totalSize int64, token string, project string) (downloaded int64, timeToFirstByte time.Duration, cacheAge time.Duration, serverVersion string, err error) { + fields, ok := ctx.Value(logFields("fields")).(log.Fields) + if !ok { + fields = log.Fields{} + } defer func() { if r := recover(); r != nil { - log.Errorln("Panic occurred in downloadHTTP:", r) + log.WithFields(fields).Errorln("Panic occurred in downloadHTTP:", r) ret := fmt.Sprintf("Unrecoverable error (panic) occurred in downloadHTTP: %v", r) err = errors.New(ret) } @@ -1895,8 +1908,8 @@ func 
downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall ctx, cancel := context.WithCancel(ctx) defer cancel() - log.Debugln("Attempting to download from:", transferUrl.Host) - log.Debugln("Transfer URL String:", transferUrl.String()) + log.WithFields(fields).Debugln("Attempting to download from:", transferUrl.Host) + log.WithFields(fields).Debugln("Transfer URL String:", transferUrl.String()) var req *grab.Request var unpacker *autoUnpacker if transfer.PackOption != "" { @@ -1946,7 +1959,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall downloadLimit := param.Client_MinimumDownloadSpeed.GetInt() // Start the transfer - log.Debugln("Starting the HTTP transfer...") + log.WithFields(fields).Debugln("Starting the HTTP transfer...") downloadStart := time.Now() resp := client.Do(req) // Check the error real quick @@ -1957,7 +1970,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall if errors.Is(err, grab.ErrBadLength) { err = fmt.Errorf("local copy of file is larger than remote copy %w", grab.ErrBadLength) } else if errors.As(err, &sce) { - log.Debugln("Creating a client status code error") + log.WithFields(fields).Debugln("Creating a client status code error") sce2 := StatusCodeError(sce) err = &sce2 } else if errors.As(err, &cam) && cam == syscall.ENOMEM { @@ -1966,7 +1979,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall } else { err = &ConnectionSetupError{Err: err} } - log.Errorln("Failed to download:", err) + log.WithFields(fields).Errorln("Failed to download:", err) return } } @@ -1976,13 +1989,13 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall if ageSec, err := strconv.Atoi(ageStr); err == nil { cacheAge = time.Duration(ageSec) * time.Second } else { - log.Debugf("Server at %s gave unparseable Age header (%s) in response: %s", transfer.Url.Host, ageStr, err.Error()) + log.WithFields(fields).Debugf("Server at %s gave unparseable Age header (%s) in response: %s", transfer.Url.Host, ageStr, err.Error()) } } if cacheAge == 0 { - log.Debugln("Server at", transfer.Url.Host, "had a cache miss") + log.WithFields(fields).Debugln("Server at", transfer.Url.Host, "had a cache miss") } else if cacheAge > 0 { - log.Debugln("Server at", transfer.Url.Host, "had a cache hit with data age", cacheAge.String()) + log.WithFields(fields).Debugln("Server at", transfer.Url.Host, "had a cache hit with data age", cacheAge.String()) } // Size of the download @@ -1997,7 +2010,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall var headResponse *http.Response headResponse, err = headClient.Do(headRequest) if err != nil { - log.Errorln("Could not successfully get response for HEAD request") + log.WithFields(fields).Errorln("Could not successfully get response for HEAD request") err = errors.Wrap(err, "Could not determine the size of the remote object") return } @@ -2006,7 +2019,7 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall if contentLengthStr != "" { totalSize, err = strconv.ParseInt(contentLengthStr, 10, 64) if err != nil { - log.Errorln("problem converting content-length to an int:", err) + log.WithFields(fields).Errorln("problem converting content-length to an int:", err) totalSize = 0 } } @@ -2018,8 +2031,10 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall stoppedTransferTimeout := compatToDuration(param.Client_StoppedTransferTimeout.GetDuration(), 
"Client.StoppedTranferTimeout") slowTransferRampupTime := compatToDuration(param.Client_SlowTransferRampupTime.GetDuration(), "Client.SlowTransferRampupTime") slowTransferWindow := compatToDuration(param.Client_SlowTransferWindow.GetDuration(), "Client.SlowTransferWindow") - log.Debugf("Stopped transfer timeout is %s; slow transfer ramp-up is %s; slow transfer look-back window is %s", - stoppedTransferTimeout.String(), slowTransferRampupTime.String(), slowTransferWindow.String()) + stoppedTransferDebugLine.Do(func() { + log.WithFields(fields).Debugf("Stopped transfer timeout is %s; slow transfer ramp-up is %s; slow transfer look-back window is %s", + stoppedTransferTimeout.String(), slowTransferRampupTime.String(), slowTransferWindow.String()) + }) startBelowLimit := time.Time{} var noProgressStartTime time.Time var lastBytesComplete int64 @@ -2056,7 +2071,7 @@ Loop: StoppedTime: time.Since(noProgressStartTime), CacheHit: cacheAge > 0, } - log.Errorln(err.Error()) + log.WithFields(fields).Errorln(err.Error()) return } } else { @@ -2081,7 +2096,7 @@ Loop: warning := []byte("Warning! Downloading too slow...\n") status, err := getProgressContainer().Write(warning) if err != nil { - log.Errorln("Problem displaying slow message", err, status) + log.WithFields(fields).Errorln("Problem displaying slow message", err, status) continue } startBelowLimit = time.Now() @@ -2093,7 +2108,7 @@ Loop: // The download is below the threshold for more than `SlowTransferWindow` seconds, cancel the download cancel() - log.Errorf("Cancelled: Download speed of %s/s is below the limit of %s/s", ByteCountSI(int64(resp.BytesPerSecond())), ByteCountSI(int64(downloadLimit))) + log.WithFields(fields).Errorf("Cancelled: Download speed of %s/s is below the limit of %s/s", ByteCountSI(int64(resp.BytesPerSecond())), ByteCountSI(int64(downloadLimit))) err = &SlowTransferError{ BytesTransferred: resp.BytesComplete(), @@ -2124,7 +2139,7 @@ Loop: err = &ConnectionSetupError{URL: resp.Request.URL().String()} return } - log.Debugln("Got error from HTTP download", err) + log.WithFields(fields).Debugln("Got error from HTTP download", err) return } else { // Check the trailers for any error information @@ -2132,7 +2147,7 @@ Loop: if errorStatus := trailer.Get("X-Transfer-Status"); errorStatus != "" { statusCode, statusText := parseTransferStatus(errorStatus) if statusCode != 200 { - log.Debugln("Got error from file transfer") + log.WithFields(fields).Debugln("Got error from file transfer") err = errors.New("transfer error: " + statusText) return } @@ -2141,7 +2156,7 @@ Loop: // Valid responses include 200 and 206. The latter occurs if the download was resumed after a // prior attempt. 
if resp.HTTPResponse.StatusCode != 200 && resp.HTTPResponse.StatusCode != 206 { - log.Debugln("Got failure status code:", resp.HTTPResponse.StatusCode) + log.WithFields(fields).Debugln("Got failure status code:", resp.HTTPResponse.StatusCode) return 0, 0, -1, serverVersion, &HttpErrResp{resp.HTTPResponse.StatusCode, fmt.Sprintf("Request failed (HTTP status %d): %s", resp.HTTPResponse.StatusCode, resp.Err().Error())} } @@ -2153,7 +2168,7 @@ Loop: } } - log.Debugln("HTTP Transfer was successful") + log.WithFields(fields).Debugln("HTTP Transfer was successful") return } From d7c53da027639e564bf2d7379f390e74a4e2ebd8 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sat, 7 Sep 2024 11:46:51 -0500 Subject: [PATCH 7/9] Do not consider a zero-sized object a failure This code previously considered a zero-sized object a stat failure as the webdav API doesn't distinguish between "server didn't send the information" and "zero sized object". Between the two, let's assume a competent server and not consider the zero-sized object a failure. --- client/handle_http.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/client/handle_http.go b/client/handle_http.go index 339cb2f7e..6481a417e 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -2764,15 +2764,6 @@ func statHttp(dest *url.URL, dirResp server_structs.DirectorResponse, token *tok return } - if info.Size == 0 { - if info.IsCollection { - resultsChan <- statResults{info, nil} - } - err = errors.New("Stat response did not include a size") - resultsChan <- statResults{FileInfo{}, err} - return - } - resultsChan <- statResults{FileInfo{ Name: endpoint.Path, Size: info.Size, From 56368db3f2a8600f0f6c5988bdd93979a6d225c7 Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Tue, 17 Sep 2024 14:44:02 +0000 Subject: [PATCH 8/9] Fixed merge/linter errors from rebase --- client/handle_http.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client/handle_http.go b/client/handle_http.go index 6481a417e..6713034ee 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -1140,7 +1140,6 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL callback: tc.callback, skipAcquire: tc.skipAcquire, syncLevel: tc.syncLevel, - tokenLocation: tc.tokenLocation, upload: upload, uuid: id, project: project, @@ -1200,8 +1199,8 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL } // The director response may change if it's given a token; let's repeat the query. - if tj.token != "" { - dirResp, err = GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload, remoteUrl.RawQuery, tj.token) + if contents != "" { + dirResp, err = GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload, remoteUrl.RawQuery, contents) } } From c94fc33aed5deff1b3f247a58da033625f10e630 Mon Sep 17 00:00:00 2001 From: Emma Turetsky Date: Tue, 17 Sep 2024 15:25:39 +0000 Subject: [PATCH 9/9] Added error check and transfer job reassignment after repeat of dir query --- client/handle_http.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client/handle_http.go b/client/handle_http.go index 6713034ee..238f6d80b 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -1201,6 +1201,13 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL // The director response may change if it's given a token; let's repeat the query. 
if contents != "" { dirResp, err = GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, upload, remoteUrl.RawQuery, contents) + if err != nil { + log.Errorln(err) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) + return nil, err + } + tj.dirResp = dirResp + tj.token.DirResp = &dirResp } }
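
For reference, the `object sync` subcommand added in patch 4 is expected to
be invoked along these lines (the token path, federation URL, and local
directories are illustrative):

    # Push a local directory into a federation namespace; objects already
    # present at the destination are skipped (SyncExist).
    pelican object sync -t /path/to/token ./local-dir pelican://federation.example.org/namespace/dest

    # Pull one or more remote prefixes down to a local directory.
    pelican object sync osdf://namespace/data ./local-copy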