diff --git a/client/acquire_token.go b/client/acquire_token.go
index 9183658e7..6052ac34b 100644
--- a/client/acquire_token.go
+++ b/client/acquire_token.go
@@ -118,6 +118,17 @@ func (tg *tokenGenerator) SetToken(contents string) {
 	tg.Token.Store(&info)
 }
 
+// Copy the token generator, giving the copy its own singleflight group
+func (tg *tokenGenerator) Copy() *tokenGenerator {
+	return &tokenGenerator{
+		DirResp:       tg.DirResp,
+		Destination:   tg.Destination,
+		IsWrite:       tg.IsWrite,
+		EnableAcquire: tg.EnableAcquire,
+		Sync:          new(singleflight.Group),
+	}
+}
+
 // Determine the token name if it is embedded in the scheme, Condor-style
 func getTokenName(destination *url.URL) (scheme, tokenName string) {
 	schemePieces := strings.Split(destination.Scheme, "+")
diff --git a/client/fed_test.go b/client/fed_test.go
index 93e2176f4..c16feff70 100644
--- a/client/fed_test.go
+++ b/client/fed_test.go
@@ -962,3 +962,81 @@ func TestClientUnpack(t *testing.T) {
 	require.NoError(t, err)
 	assert.Equal(t, int64(11), fi.Size())
 }
+
+// A test that spins up a federation and exercises the prestage functionality
+func TestPrestage(t *testing.T) {
+	viper.Reset()
+	server_utils.ResetOriginExports()
+	fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)
+
+	te, err := client.NewTransferEngine(fed.Ctx)
+	require.NoError(t, err)
+
+	// Other set-up items:
+	// The cache will open the file to stat it, downloading the first block.
+	// Make sure we are greater than 64kb in size.
+	testFileContent := strings.Repeat("test file content", 10000)
+	// Create the temporary file to upload
+	tempFile, err := os.CreateTemp(t.TempDir(), "test")
+	assert.NoError(t, err, "Error creating temp file")
+	defer os.Remove(tempFile.Name())
+	_, err = tempFile.WriteString(testFileContent)
+	assert.NoError(t, err, "Error writing to temp file")
+	tempFile.Close()
+
+	tempToken, _ := getTempToken(t)
+	defer tempToken.Close()
+	defer os.Remove(tempToken.Name())
+	// Disable progress bars to not reuse the same mpb instance
+	viper.Set("Logging.DisableProgressBars", true)
+
+	oldPref, err := config.SetPreferredPrefix(config.PelicanPrefix)
+	assert.NoError(t, err)
+	defer func() {
+		_, err := config.SetPreferredPrefix(oldPref)
+		require.NoError(t, err)
+	}()
+
+	// Set path for object to upload/download
+	for _, export := range fed.Exports {
+		tempPath := tempFile.Name()
+		fileName := filepath.Base(tempPath)
+		uploadURL := fmt.Sprintf("pelican://%s:%s%s/prestage/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
+			export.FederationPrefix, fileName)
+
+		// Upload the file with COPY
+		transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name()))
+		assert.NoError(t, err)
+		assert.Equal(t, int64(len(testFileContent)), transferResultsUpload[0].TransferredBytes)
+
+		// Check the cache info twice; make sure the object is not yet cached.
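A note on the fresh `singleflight.Group` above: singleflight deduplicates concurrent calls that share a key, so a copied generator that kept the parent's group could coalesce token acquisitions that ought to be independent. A minimal, self-contained sketch of that dedup behavior (illustrative only; the key and token value are made up):

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

func main() {
	var group singleflight.Group
	var wg sync.WaitGroup
	calls := 0

	// Concurrent Do calls with the same key collapse into a single
	// execution of the function; the other callers share its result.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _, _ = group.Do("token-for-/my/prefix", func() (interface{}, error) {
				calls++ // safe: singleflight never runs the same key concurrently
				return "dummy-token", nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("underlying acquisitions:", calls) // typically 1 when the calls overlap
}
```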
+		tc, err := te.NewClient(client.WithTokenLocation(tempToken.Name()))
+		require.NoError(t, err)
+		innerFileUrl, err := url.Parse(uploadURL)
+		require.NoError(t, err)
+		age, size, err := tc.CacheInfo(fed.Ctx, innerFileUrl)
+		require.NoError(t, err)
+		assert.Equal(t, int64(len(testFileContent)), size)
+		assert.Equal(t, -1, age)
+
+		age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
+		require.NoError(t, err)
+		assert.Equal(t, int64(len(testFileContent)), size)
+		assert.Equal(t, -1, age)
+
+		// Prestage the object
+		tj, err := tc.NewPrestageJob(fed.Ctx, innerFileUrl)
+		require.NoError(t, err)
+		err = tc.Submit(tj)
+		require.NoError(t, err)
+		results, err := tc.Shutdown()
+		require.NoError(t, err)
+		assert.Equal(t, 1, len(results))
+
+		// Check if object is cached.
+		age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
+		require.NoError(t, err)
+		assert.Equal(t, int64(len(testFileContent)), size)
+		require.NotEqual(t, -1, age)
+	}
+}
diff --git a/client/handle_http.go b/client/handle_http.go
index 238f6d80b..9fc4325d9 100644
--- a/client/handle_http.go
+++ b/client/handle_http.go
@@ -103,6 +103,8 @@ type (
 		err error
 	}
 
+	transferType int
+
 	// Error type for when the transfer started to return data then completely stopped
 	StoppedTransferError struct {
 		BytesTransferred int64
@@ -213,7 +215,7 @@ type (
 		remoteURL  *url.URL
 		localPath  string
 		token      *tokenGenerator
-		upload     bool
+		xferType   transferType
 		packOption string
 		attempts   []transferAttemptDetails
 		project    string
@@ -237,7 +239,7 @@ type (
 		activeXfer  atomic.Int64
 		totalXfer   int
 		localPath   string
-		upload      bool
+		xferType    transferType
 		recursive   bool
 		skipAcquire bool
 		syncLevel   SyncLevel // Policy for handling synchronization when the destination exists
@@ -308,13 +310,14 @@ type (
 		setupResults sync.Once
 	}
 
-	TransferOption                   = option.Interface
-	identTransferOptionCaches        struct{}
-	identTransferOptionCallback      struct{}
-	identTransferOptionTokenLocation struct{}
-	identTransferOptionAcquireToken  struct{}
-	identTransferOptionToken         struct{}
-	identTransferOptionSynchronize   struct{}
+	TransferOption                    = option.Interface
+	identTransferOptionCaches         struct{}
+	identTransferOptionCallback       struct{}
+	identTransferOptionTokenLocation  struct{}
+	identTransferOptionAcquireToken   struct{}
+	identTransferOptionToken          struct{}
+	identTransferOptionSynchronize    struct{}
+	identTransferOptionCollectionsUrl struct{}
 
 	transferDetailsOptions struct {
 		NeedsToken bool
@@ -335,8 +338,24 @@ const (
 	SyncNone  = iota // When synchronizing, always re-transfer, regardless of existence at destination.
 	SyncExist        // Skip synchronization transfer if the destination exists
 	SyncSize         // Skip synchronization transfer if the destination exists and matches the current source size
+
+	transferTypeDownload transferType = iota // Transfer is downloading from the federation
+	transferTypeUpload                       // Transfer is uploading to the federation
+	transferTypePrestage                     // Transfer is staging at a federation cache
 )
 
+// Merge two contexts into one; the result is canceled when either parent
+// is canceled (the returned cancel function releases both)
+func mergeCancel(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
+	newCtx, cancel := context.WithCancel(ctx1)
+	stop := context.AfterFunc(ctx2, func() {
+		cancel()
+	})
+	return newCtx, func() {
+		stop()
+		cancel()
+	}
+}
+
 // The progress container object creates several
 // background goroutines. Instead of creating the object
 // globally, create it on first use.
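The hoisted mergeCancel helper (previously a closure inside NewTransferJob, now shared with CacheInfo and NewPrestageJob) derives a context that dies when either parent does. A small sketch of the semantics, runnable on Go 1.21+ (where context.AfterFunc was introduced):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Same shape as the patch's helper: follow ctx1 via WithCancel, and
// propagate ctx2's cancellation through an AfterFunc.
func mergeCancel(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
	newCtx, cancel := context.WithCancel(ctx1)
	stop := context.AfterFunc(ctx2, func() {
		cancel()
	})
	return newCtx, func() {
		stop()
		cancel()
	}
}

func main() {
	parent1 := context.Background()
	parent2, cancel2 := context.WithCancel(context.Background())

	merged, cancel := mergeCancel(parent1, parent2)
	defer cancel()

	cancel2() // canceling either parent cancels the merged context
	select {
	case <-merged.Done():
		fmt.Println("merged context canceled:", merged.Err())
	case <-time.After(time.Second):
		fmt.Println("unexpected: merged context still alive")
	}
}
```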
@@ -724,6 +743,11 @@ func WithCallback(callback TransferCallbackFunc) TransferOption {
 	return option.New(identTransferOptionCallback{}, callback)
 }
 
+// Override the collections URL to be used by the TransferClient
+func WithCollectionsUrl(url string) TransferOption {
+	return option.New(identTransferOptionCollectionsUrl{}, url)
+}
+
 // Create an option to override the cache list
 func WithCaches(caches ...*url.URL) TransferOption {
 	return option.New(identTransferOptionCaches{}, caches)
@@ -1140,11 +1164,14 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL
 		callback:       tc.callback,
 		skipAcquire:    tc.skipAcquire,
 		syncLevel:      tc.syncLevel,
-		upload:         upload,
+		xferType:       transferTypeDownload,
 		uuid:           id,
 		project:        project,
 		token:          newTokenGenerator(&copyUrl, nil, upload, !tc.skipAcquire),
 	}
+	if upload {
+		tj.xferType = transferTypeUpload
+	}
 	if tc.token != "" {
 		tj.token.SetToken(tc.token)
 	}
@@ -1152,17 +1179,6 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL
 		tj.token.SetTokenLocation(tc.tokenLocation)
 	}
 
-	mergeCancel := func(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
-		newCtx, cancel := context.WithCancel(ctx1)
-		stop := context.AfterFunc(ctx2, func() {
-			cancel()
-		})
-		return newCtx, func() {
-			stop()
-			cancel()
-		}
-	}
-
 	tj.ctx, tj.cancel = mergeCancel(ctx, tc.ctx)
 
 	for _, option := range options {
@@ -1224,6 +1240,98 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL
 	return
 }
 
+// Create a new prestage job for the client
+//
+// The returned object can be further customized as desired.
+// This function does not "submit" the job for execution.
+func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error) {
+
+	id, err := uuid.NewV7()
+	if err != nil {
+		return
+	}
+
+	// See if we have a projectName defined
+	project := searchJobAd(projectName)
+
+	pelicanURL, err := tc.engine.newPelicanURL(remoteUrl)
+	if err != nil {
+		err = errors.Wrap(err, "error generating metadata for specified url")
+		return
+	}
+
+	copyUrl := *remoteUrl // Make a copy of the input URL to avoid concurrency issues.
+	tj = &TransferJob{
+		prefObjServers: tc.prefObjServers,
+		remoteURL:      &copyUrl,
+		callback:       tc.callback,
+		skipAcquire:    tc.skipAcquire,
+		syncLevel:      tc.syncLevel,
+		xferType:       transferTypePrestage,
+		uuid:           id,
+		project:        project,
+		token:          newTokenGenerator(&copyUrl, nil, false, !tc.skipAcquire),
+	}
+	if tc.token != "" {
+		tj.token.SetToken(tc.token)
+	}
+	if tc.tokenLocation != "" {
+		tj.token.SetTokenLocation(tc.tokenLocation)
+	}
+
+	tj.ctx, tj.cancel = mergeCancel(ctx, tc.ctx)
+
+	for _, option := range options {
+		switch option.Ident() {
+		case identTransferOptionCaches{}:
+			tj.prefObjServers = option.Value().([]*url.URL)
+		case identTransferOptionCallback{}:
+			tj.callback = option.Value().(TransferCallbackFunc)
+		case identTransferOptionTokenLocation{}:
+			tj.token.SetTokenLocation(option.Value().(string))
+		case identTransferOptionAcquireToken{}:
+			tj.token.EnableAcquire = option.Value().(bool)
+		case identTransferOptionToken{}:
+			tj.token.SetToken(option.Value().(string))
+		case identTransferOptionSynchronize{}:
+			tj.syncLevel = option.Value().(SyncLevel)
+		}
+	}
+
+	tj.directorUrl = pelicanURL.directorUrl
+	dirResp, err := GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, "")
+	if err != nil {
+		log.Errorln(err)
+		err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String())
+		return
+	}
+	tj.dirResp = dirResp
+	tj.token.DirResp = &dirResp
+
+	log.Debugln("Dir resp:", dirResp.XPelNsHdr)
+	if dirResp.XPelNsHdr.RequireToken {
+		contents, err := tj.token.get()
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to get token for transfer")
+		} else if contents == "" {
+			return nil, errors.New("failed to get token for transfer: token is empty")
+		}
+
+		// The director response may change if it's given a token; let's repeat the query.
+		dirResp, err = GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, contents)
+		if err != nil {
+			log.Errorln(err)
+			err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String())
+			return nil, err
+		}
+		tj.dirResp = dirResp
+		tj.token.DirResp = &dirResp
+	}
+
+	log.Debugf("Created new prestage job, ID %s client %s, for URL %s", tj.uuid.String(), tc.id.String(), remoteUrl.String())
+	return
+}
+
 // Returns the status of the transfer job-to-file(s) lookup
 //
 // ok is true if the lookup has completed.
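A hypothetical end-to-end driver for the new job type may help; every URL and path below is a placeholder, and error handling is elided for brevity:

```go
package main

import (
	"context"
	"net/url"

	log "github.com/sirupsen/logrus"

	"github.com/pelicanplatform/pelican/client"
)

// Illustrative driver for the new prestage API; the federation URL and
// token location are placeholders, and errors are ignored for brevity.
func main() {
	ctx := context.Background()

	te, _ := client.NewTransferEngine(ctx)
	tc, _ := te.NewClient(client.WithTokenLocation("/path/to/token"))

	objectUrl, _ := url.Parse("pelican://federation.example.org/namespace/object.bin")
	tj, _ := tc.NewPrestageJob(ctx, objectUrl)
	_ = tc.Submit(tj)

	// Shutdown drains the client, returning per-file transfer results.
	results, _ := tc.Shutdown()
	for _, result := range results {
		log.Infoln("prestaged", result.TransferredBytes, "bytes")
	}
	if err := te.Shutdown(); err != nil {
		log.Errorln("shutting down transfer engine:", err)
	}
}
```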
- log.Debugln("Submiting transfer job", tj.uuid.String()) + log.Debugln("Submitting transfer job", tj.uuid.String()) select { case <-tc.ctx.Done(): return tc.ctx.Err() @@ -1247,6 +1355,87 @@ func (tc *TransferClient) Submit(tj *TransferJob) error { } } +// Check the transfer client +func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error) { + age = -1 + + pelicanURL, err := tc.engine.newPelicanURL(remoteUrl) + if err != nil { + err = errors.Wrap(err, "error generating metadata for specified URL") + return + } + + var prefObjServers []*url.URL + token := newTokenGenerator(remoteUrl, nil, false, true) + if tc.token != "" { + token.SetToken(tc.token) + } + if tc.tokenLocation != "" { + token.SetTokenLocation(tc.tokenLocation) + } + if tc.skipAcquire { + token.EnableAcquire = !tc.skipAcquire + } + for _, option := range options { + switch option.Ident() { + case identTransferOptionCaches{}: + prefObjServers = option.Value().([]*url.URL) + case identTransferOptionTokenLocation{}: + token.SetTokenLocation(option.Value().(string)) + case identTransferOptionAcquireToken{}: + token.EnableAcquire = option.Value().(bool) + case identTransferOptionToken{}: + token.SetToken(option.Value().(string)) + } + } + + ctx, cancel := mergeCancel(tc.ctx, ctx) + defer cancel() + + dirResp, err := GetDirectorInfoForPath(ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, "") + if err != nil { + log.Errorln(err) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) + return + } + token.DirResp = &dirResp + + if dirResp.XPelNsHdr.RequireToken { + var contents string + contents, err = token.get() + if err != nil || contents == "" { + err = errors.Wrap(err, "failed to get token for cache info query") + return + } + + // The director response may change if it's given a token; let's repeat the query. 
+ if contents != "" { + dirResp, err = GetDirectorInfoForPath(ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, contents) + if err != nil { + log.Errorln(err) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) + return + } + token.DirResp = &dirResp + } + } + + var sortedServers []*url.URL + sortedServers, err = generateSortedObjServers(dirResp, prefObjServers) + if err != nil { + log.Errorln("Failed to get namespace caches (treated as non-fatal):", err) + return + } + if len(sortedServers) == 0 { + err = errors.New("No available cache servers detected") + return + } + cacheUrl := *sortedServers[0] + cacheUrl.Path = remoteUrl.Path + + return objectCached(ctx, &cacheUrl, token, config.GetTransport()) +} + // Close the transfer client object // // Any subsequent job submissions will cause a panic @@ -1436,7 +1625,7 @@ func (te *TransferEngine) createTransferFiles(job *clientTransferJob) (err error remoteUrl := &url.URL{Path: job.job.remoteURL.Path, Scheme: job.job.remoteURL.Scheme} var transfers []transferAttemptDetails - if job.job.upload { // Uploads use the redirected endpoint directly + if job.job.xferType == transferTypeUpload { // Uploads use the redirected endpoint directly if len(job.job.dirResp.ObjectServers) == 0 { err = errors.New("No origins found for upload") return @@ -1473,11 +1662,22 @@ func (te *TransferEngine) createTransferFiles(job *clientTransferJob) (err error } if job.job.recursive { - if job.job.upload { + if job.job.xferType == transferTypeUpload { return te.walkDirUpload(job, transfers, te.files, job.job.localPath) } else { return te.walkDirDownload(job, transfers, te.files, remoteUrl) } + } else if job.job.xferType == transferTypePrestage { + // For prestage, from day one we handle internally whether it's recursive + // (as opposed to making the user specify explicitly) + var statInfo FileInfo + if statInfo, err = statHttp(remoteUrl, job.job.dirResp, job.job.token); err != nil { + err = errors.Wrap(err, "failed to stat object to prestage") + return + } + if statInfo.IsCollection { + return te.walkDirDownload(job, transfers, te.files, remoteUrl) + } } job.job.totalXfer += 1 @@ -1493,7 +1693,7 @@ func (te *TransferEngine) createTransferFiles(job *clientTransferJob) (err error remoteURL: remoteUrl, packOption: packOption, localPath: job.job.localPath, - upload: job.job.upload, + xferType: job.job.xferType, token: job.job.token, attempts: transfers, project: job.job.project, @@ -1545,7 +1745,7 @@ func runTransferWorker(ctx context.Context, workChan <-chan *clientTransferFile, } var err error var transferResults TransferResults - if file.file.upload { + if file.file.xferType == transferTypeUpload { transferResults, err = uploadObject(file.file) } else { transferResults, err = downloadObject(file.file) @@ -1604,56 +1804,12 @@ func sortAttempts(ctx context.Context, path string, attempts []transferAttemptDe return } - headClient := &http.Client{Transport: transport} - // Note we are not using a HEAD request here but a GET request for one byte; - // this is because the XRootD server currently (v5.6.9) only returns the Age - // header for GETs - headRequest, _ := http.NewRequestWithContext(ctx, http.MethodGet, tUrl.String(), nil) - headRequest.Header.Set("Range", "0-0") - if token != nil { - if tokenContents, err := token.get(); err == nil && tokenContents != "" { - headRequest.Header.Set("Authorization", "Bearer "+tokenContents) - } - } - var headResponse *http.Response - headResponse, err := 
-		if err != nil {
+		if age, size, err := objectCached(ctx, tUrl, token, transport); err != nil {
 			headChan <- checkResults{idx, 0, -1, err}
 			return
-		}
-		// Allow response body to fail to read; we are only interested in the headers
-		// of the response, not the contents.
-		if _, err := io.ReadAll(headResponse.Body); err != nil {
-			log.Warningln("Failure when reading the one-byte-response body:", err)
-		}
-		headResponse.Body.Close()
-		var age int = -1
-		var size int64 = 0
-		if headResponse.StatusCode <= 300 {
-			contentLengthStr := headResponse.Header.Get("Content-Length")
-			if contentLengthStr != "" {
-				size, err = strconv.ParseInt(contentLengthStr, 10, 64)
-				if err != nil {
-					err = errors.Wrap(err, "problem converting Content-Length in response to an int")
-					log.Errorln(err.Error())
-
-				}
-			}
-			ageStr := headResponse.Header.Get("Age")
-			if ageStr != "" {
-				if ageParsed, err := strconv.Atoi(ageStr); err != nil {
-					log.Warningf("Ignoring invalid age value (%s) due to parsing error: %s", headRequest.Header.Get("Age"), err.Error())
-				} else {
-					age = ageParsed
-				}
-			}
 		} else {
-			err = &HttpErrResp{
-				Code: headResponse.StatusCode,
-				Err:  fmt.Sprintf("GET \"%s\" resulted in status code %d", tUrl, headResponse.StatusCode),
-			}
+			headChan <- checkResults{idx, uint64(size), age, err}
 		}
-		headChan <- checkResults{idx, uint64(size), age, err}
 	}(idx, &tUrl)
 }
 // 1 -> success.
@@ -1725,11 +1881,16 @@
 // create the destination directory).
 func downloadObject(transfer *transferFile) (transferResults TransferResults, err error) {
 	log.Debugln("Downloading object from", transfer.remoteURL, "to", transfer.localPath)
-	// Remove the source from the file path
-	directory := path.Dir(transfer.localPath)
+	var downloaded int64
 
-	if err = os.MkdirAll(directory, 0700); err != nil {
-		return
+	localPath := transfer.localPath
+	if transfer.xferType == transferTypeDownload {
+		directory := path.Dir(localPath)
+		if err = os.MkdirAll(directory, 0700); err != nil {
+			return
+		}
+	} else {
+		localPath = "/dev/null"
 	}
 
 	size, attempts := sortAttempts(transfer.job.ctx, transfer.remoteURL.Path, transfer.attempts, transfer.token)
@@ -1764,7 +1925,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er
 			tokenContents, _ = transfer.token.get()
 		}
 		attemptDownloaded, timeToFirstByte, cacheAge, serverVersion, err := downloadHTTP(
-			ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, tokenContents, transfer.project,
+			ctx, transfer.engine, transfer.callback, transferEndpoint, localPath, size, tokenContents, transfer.project,
 		)
 		endTime := time.Now()
 		if cacheAge >= 0 {
@@ -1933,6 +2094,10 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall
 		if req, err = grab.NewRequestToWriter(unpacker, transferUrl.String()); err != nil {
 			return 0, 0, -1, "", errors.Wrap(err, "Failed to create new download request")
 		}
+	} else if dest == "/dev/null" {
+		if req, err = grab.NewRequestToWriter(io.Discard, transferUrl.String()); err != nil {
+			return 0, 0, -1, "", errors.Wrap(err, "Failed to create new prestage request")
+		}
 	} else if req, err = grab.NewRequest(dest, transferUrl.String()); err != nil {
 		return 0, 0, -1, "", errors.Wrap(err, "Failed to create new download request")
 	}
@@ -2472,6 +2637,26 @@ func createWebDavClient(collectionsUrl *url.URL, token *tokenGenerator, project
 	return
 }
 
+// Determine whether to skip a prestage based on whether an object is at a cache
+func skipPrestage(object string, job *TransferJob) bool {
+	var cache url.URL
+	if len(job.dirResp.ObjectServers) > 0 {
+		cache = *job.dirResp.ObjectServers[0]
+	} else if len(job.prefObjServers) > 0 {
+		cache = *job.prefObjServers[0]
+	} else {
+		log.Errorln("Cannot skip prestage if no cache is specified!")
+		return false
+	}
+
+	cache.Path = object
+	if age, _, err := objectCached(job.ctx, &cache, job.token, config.GetTransport()); err == nil {
+		return age >= 0
+	} else {
+		log.Warningln("Failed to check cache status of object", cache.String(), "so assuming it needs prestaging:", err)
+		return false
+	}
+}
+
 // Depending on the synchronization policy, decide if a object download should be skipped
 func skipDownload(syncLevel SyncLevel, remoteInfo fs.FileInfo, localPath string) bool {
 	if syncLevel == SyncNone {
@@ -2552,7 +2737,9 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer
 		if err != nil {
 			return err
 		}
-	} else if localPath := path.Join(job.job.localPath, localBase, info.Name()); skipDownload(job.job.syncLevel, info, localPath) {
+	} else if job.job.xferType == transferTypePrestage && skipPrestage(newPath, job.job) {
+		log.Infoln("Skipping prestage of object", newPath, "as it is already at the cache")
+	} else if localPath := path.Join(job.job.localPath, localBase, info.Name()); job.job.xferType == transferTypeDownload && skipDownload(job.job.syncLevel, info, localPath) {
 		log.Infoln("Skipping download of object", newPath, "as it already exists at", localPath)
 	} else {
 		job.job.activeXfer.Add(1)
@@ -2570,7 +2757,7 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer
 				remoteURL:  &url.URL{Path: newPath},
 				packOption: transfers[0].PackOption,
 				localPath:  localPath,
-				upload:     job.job.upload,
+				xferType:   job.job.xferType,
 				token:      job.job.token,
 				attempts:   transfers,
 			},
@@ -2620,7 +2807,7 @@ func (te *TransferEngine) walkDirUpload(job *clientTransferJob, transfers []tran
 				remoteURL:  &url.URL{Path: remotePath},
 				packOption: transfers[0].PackOption,
 				localPath:  newPath,
-				upload:     job.job.upload,
+				xferType:   job.job.xferType,
 				token:      job.job.token,
 				attempts:   transfers,
 			},
@@ -2636,6 +2823,10 @@ func (te *TransferEngine) walkDirUpload(job *clientTransferJob, transfers []tran
 func listHttp(remoteObjectUrl *url.URL, dirResp server_structs.DirectorResponse, token *tokenGenerator) (fileInfos []FileInfo, err error) {
 	// Get our collection listing host
 	collectionsUrl := dirResp.XPelNsHdr.CollectionsUrl
+	if collectionsUrl == nil {
+		err = errors.New("namespace does not provide a collections URL for listing")
+		return
+	}
 	log.Debugln("Collections URL: ", collectionsUrl.String())
 	project := searchJobAd(projectName)
@@ -2797,6 +2988,103 @@ func statHttp(dest *url.URL, dirResp server_structs.DirectorResponse, token *tok
 	return
 }
 
+// Check whether a given object URL is present at a cache
+//
+// Note that XRootD returns an `Age` header for GETs but reports the total
+// object size (via `Content-Length`) only for HEADs.  If the ranged GET
+// response carries a `Content-Range` header, we derive the size from it;
+// otherwise, we issue a second request, a HEAD, to learn the size.
+func objectCached(ctx context.Context, objectUrl *url.URL, token *tokenGenerator, transport http.RoundTripper) (age int, size int64, err error) {
+
+	age = -1
+
+	headClient := &http.Client{Transport: transport}
+	headRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, objectUrl.String(), nil)
+	if err != nil {
+		return
+	}
+	headRequest.Header.Set("Range", "bytes=0-0")
+	if token != nil {
+		if tokenContents, err := token.get(); err == nil && tokenContents != "" {
+			headRequest.Header.Set("Authorization", "Bearer "+tokenContents)
+		}
+	}
+	var headResponse *http.Response
+	headResponse, err = headClient.Do(headRequest)
+	if err != nil {
+		return
+	}
+	// Allow the response body to fail to read; we are only interested in the
+	// headers of the response, not its contents.
+	if _, err := io.Copy(io.Discard, headResponse.Body); err != nil {
+		log.Warningln("Failure when reading the one-byte-response body:", err)
+	}
+	headResponse.Body.Close()
+	gotContentRange := false
+	if headResponse.StatusCode <= 300 {
+		if contentRangeStr := headResponse.Header.Get("Content-Range"); contentRangeStr != "" {
+			if after, found := strings.CutPrefix(contentRangeStr, "bytes 0-0/"); found {
+				if afterParsed, err := strconv.Atoi(after); err == nil {
+					size = int64(afterParsed)
+					gotContentRange = true
+				} else {
+					log.Warningf("Ignoring invalid content range value (%s) due to parsing error: %s", after, err.Error())
+				}
+			} else {
+				log.Debugln("Unexpected value found in Content-Range header:", contentRangeStr)
+			}
+		}
+		ageStr := headResponse.Header.Get("Age")
+		if ageStr != "" {
+			if ageParsed, err := strconv.Atoi(ageStr); err != nil {
+				log.Warningf("Ignoring invalid age value (%s) due to parsing error: %s", ageStr, err.Error())
+			} else {
+				age = ageParsed
+			}
+		}
+	} else {
+		err = &HttpErrResp{
+			Code: headResponse.StatusCode,
+			Err:  fmt.Sprintf("GET \"%s\" resulted in status code %d", objectUrl, headResponse.StatusCode),
+		}
+	}
+	// Early return -- all the info we wanted was in the GET response.
+	if gotContentRange {
+		return
+	}
+
+	headRequest, err = http.NewRequestWithContext(ctx, http.MethodHead, objectUrl.String(), nil)
+	if err != nil {
+		return
+	}
+	if token != nil {
+		if tokenContents, err := token.get(); err == nil && tokenContents != "" {
+			headRequest.Header.Set("Authorization", "Bearer "+tokenContents)
+		}
+	}
+
+	headResponse, err = headClient.Do(headRequest)
+	if err != nil {
+		return
+	}
+	if headResponse.StatusCode <= 300 {
+		contentLengthStr := headResponse.Header.Get("Content-Length")
+		if contentLengthStr != "" {
+			size, err = strconv.ParseInt(contentLengthStr, 10, 64)
+			if err != nil {
+				err = errors.Wrap(err, "problem converting Content-Length in response to an int")
+				log.Errorln(err.Error())
+			}
+		}
+	} else {
+		err = &HttpErrResp{
+			Code: headResponse.StatusCode,
+			Err:  fmt.Sprintf("HEAD \"%s\" resulted in status code %d", objectUrl, headResponse.StatusCode),
+		}
+	}
+	return
+}
+
 // This function searches the condor job ad for a specific classad and returns the value of that classad
 func searchJobAd(classad classAd) string {
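The size-detection fast path above leans on the Content-Range header that a ranged GET returns, which carries the total size after the slash (e.g. `Content-Range: bytes 0-0/123456`). A standalone sketch of just that parsing step, mirroring the strings.CutPrefix logic in objectCached:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Parse the total object size out of a "Content-Range: bytes 0-0/N" header.
func sizeFromContentRange(contentRange string) (int64, bool) {
	after, found := strings.CutPrefix(contentRange, "bytes 0-0/")
	if !found {
		return 0, false // unexpected layout; the caller falls back to a HEAD request
	}
	size, err := strconv.ParseInt(after, 10, 64)
	if err != nil {
		return 0, false
	}
	return size, true
}

func main() {
	if size, ok := sizeFromContentRange("bytes 0-0/123456"); ok {
		fmt.Println("total object size:", size) // prints 123456
	}
}
```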
diff --git a/client/main.go b/client/main.go
index 71615756a..c3c8b920e 100644
--- a/client/main.go
+++ b/client/main.go
@@ -125,6 +125,49 @@ func DoStat(ctx context.Context, destination string, options ...TransferOption)
 	}
 }
 
+// Query a cache for information about a remote object, returning its age and size
+func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error) {
+
+	defer func() {
+		if r := recover(); r != nil {
+			log.Debugln("Panic captured while attempting to do cache info:", r)
+			log.Debugln("Panic caused by the following", string(debug.Stack()))
+			ret := fmt.Sprintf("Unrecoverable error (panic) while checking cache info: %v", r)
+			err = errors.New(ret)
+			return
+		}
+	}()
+
+	destUri, err := url.Parse(destination)
+	if err != nil {
+		log.Errorln("Failed to parse destination URL:", err)
+		return
+	}
+
+	// Check if we understand the found url scheme
+	err = schemeUnderstood(destUri.Scheme)
+	if err != nil {
+		return
+	}
+
+	te, err := NewTransferEngine(ctx)
+	if err != nil {
+		return
+	}
+
+	defer func() {
+		if err := te.Shutdown(); err != nil {
+			log.Errorln("Failure when shutting down transfer engine:", err)
+		}
+	}()
+
+	tc, err := te.NewClient(options...)
+	if err != nil {
+		return
+	}
+	return tc.CacheInfo(ctx, destUri)
+}
+
 func GetObjectServerHostnames(ctx context.Context, testFile string) (urls []string, err error) {
 	fedInfo, err := config.GetFederation(ctx)
 	if err != nil {
@@ -282,6 +325,7 @@ func DoList(ctx context.Context, remoteObject string, options ...TransferOption)
 
 	// Get our token if needed
 	token := newTokenGenerator(remoteObjectUrl, &dirResp, false, true)
+	collectionsOverride := ""
 	for _, option := range options {
 		switch option.Ident() {
 		case identTransferOptionTokenLocation{}:
@@ -290,6 +334,8 @@ func DoList(ctx context.Context, remoteObject string, options ...TransferOption)
 			token.EnableAcquire = option.Value().(bool)
 		case identTransferOptionToken{}:
 			token.SetToken(option.Value().(string))
+		case identTransferOptionCollectionsUrl{}:
+			collectionsOverride = option.Value().(string)
 		}
 	}
 
@@ -299,6 +345,13 @@ func DoList(ctx context.Context, remoteObject string, options ...TransferOption)
 			return nil, errors.Wrap(err, "failed to get token for transfer")
 		}
 	}
+	if collectionsOverride != "" {
+		collectionsOverrideUrl, err := url.Parse(collectionsOverride)
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to parse collections URL override")
+		}
+		dirResp.XPelNsHdr.CollectionsUrl = collectionsOverrideUrl
+	}
 
 	fileInfos, err = listHttp(remoteObjectUrl, dirResp, token)
 	if err != nil {
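A hypothetical use of the new WithCollectionsUrl option together with DoList; both URLs are placeholders, and the Name field access assumes the exported shape of the client package's FileInfo type:

```go
package main

import (
	"context"
	"fmt"

	"github.com/pelicanplatform/pelican/client"
)

// Illustrative only: list a remote prefix while overriding the collections
// URL that would normally come from the director response.
func main() {
	fileInfos, err := client.DoList(context.Background(),
		"pelican://federation.example.org/namespace/collection/",
		client.WithTokenLocation("/path/to/token"),
		client.WithCollectionsUrl("https://origin.example.org:8443"),
	)
	if err != nil {
		fmt.Println("listing failed:", err)
		return
	}
	for _, info := range fileInfos {
		fmt.Println(info.Name) // Name assumed to be an exported field of client.FileInfo
	}
}
```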
diff --git a/client/prestage.go b/client/prestage.go
new file mode 100644
index 000000000..39a5d07f3
--- /dev/null
+++ b/client/prestage.go
@@ -0,0 +1,140 @@
+/***************************************************************
+ *
+ * Copyright (C) 2024, Morgridge Institute for Research
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License.  You may
+ * obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ ***************************************************************/
+
+package client
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"runtime/debug"
+
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+
+	"github.com/pelicanplatform/pelican/utils"
+)
+
+// Single-shot call to prestage a single prefix
+func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error) {
+	// First, create a handler for any panics that occur
+	defer func() {
+		if r := recover(); r != nil {
+			log.Debugln("Panic captured while attempting to perform prestage:", r)
+			log.Debugln("Panic caused by the following", string(debug.Stack()))
+			ret := fmt.Sprintf("Unrecoverable error (panic) captured in prestage: %v", r)
+			err = errors.New(ret)
+		}
+	}()
+
+	// Parse the source with URL parse
+	remotePrefix, remotePrefixScheme := correctURLWithUnderscore(prefixUrl)
+	remotePrefixUrl, err := url.Parse(remotePrefix)
+	if err != nil {
+		log.Errorln("Failed to parse source URL:", err)
+		return nil, err
+	}
+
+	// Check if we have a query and that it is understood
+	err = utils.CheckValidQuery(remotePrefixUrl)
+	if err != nil {
+		return
+	}
+
+	remotePrefixUrl.Scheme = remotePrefixScheme
+
+	// This is for condor cases:
+	remotePrefixScheme, _ = getTokenName(remotePrefixUrl)
+
+	// Check if we understand the found url scheme
+	err = schemeUnderstood(remotePrefixScheme)
+	if err != nil {
+		return nil, err
+	}
+
+	success := false
+
+	te, err := NewTransferEngine(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	defer func() {
+		if err := te.Shutdown(); err != nil {
+			log.Errorln("Failure when shutting down transfer engine:", err)
+		}
+	}()
+	tc, err := te.NewClient(options...)
+	if err != nil {
+		return
+	}
+	tj, err := tc.NewPrestageJob(ctx, remotePrefixUrl)
+	if err != nil {
+		return
+	}
+	err = tc.Submit(tj)
+	if err != nil {
+		return
+	}
+
+	transferResults, err = tc.Shutdown()
+	if err == nil {
+		if tj.lookupErr == nil {
+			success = true
+		} else {
+			err = tj.lookupErr
+		}
+	}
+	for _, result := range transferResults {
+		if err == nil && result.Error != nil {
+			success = false
+			err = result.Error
+		}
+	}
+
+	if !success {
+		log.Errorln("Prestage failed:", err)
+		// If there's only a single transfer error, remove the wrapping to provide
+		// a simpler error message.  Results in:
+		//    failed download from local-cache: server returned 404 Not Found
+		// versus:
+		//    failed to download file: transfer error: failed download from local-cache: server returned 404 Not Found
+		var te *TransferErrors
+		if errors.As(err, &te) {
+			if len(te.Unwrap()) == 1 {
+				var tae *TransferAttemptError
+				if errors.As(te.Unwrap()[0], &tae) {
+					return nil, tae
+				}
+				return nil, errors.Wrap(err, "failed to prestage file")
+			}
+			return nil, te
+		}
+		return nil, errors.Wrap(err, "failed to prestage file")
+	}
+	return transferResults, err
+}
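Callers that just want fire-and-forget cache warming might use the single-shot entry point like so (the prefix and token location are placeholders):

```go
package main

import (
	"context"

	log "github.com/sirupsen/logrus"

	"github.com/pelicanplatform/pelican/client"
)

// Illustrative only: warm a cache with everything under a prefix.
func main() {
	results, err := client.DoPrestage(context.Background(),
		"pelican://federation.example.org/namespace/dataset/",
		client.WithTokenLocation("/path/to/token"),
	)
	if err != nil {
		log.Fatalln("prestage failed:", err)
	}
	log.Infoln("prestaged", len(results), "objects")
}
```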
diff --git a/client/resources/both-auth.yml b/client/resources/both-auth.yml
index c23eb843a..4a0edd67b 100644
--- a/client/resources/both-auth.yml
+++ b/client/resources/both-auth.yml
@@ -1,5 +1,8 @@
 # Origin export configuration to test full multi-export capabilities
 
+Cache:
+  EnablePrefetch: false
+
 Origin:
   # Things that configure the origin itself
   StorageType: "posix"
diff --git a/cmd/object_ls.go b/cmd/object_ls.go
index 003623403..566b17352 100644
--- a/cmd/object_ls.go
+++ b/cmd/object_ls.go
@@ -45,9 +45,10 @@ var (
 func init() {
 	flagSet := lsCmd.Flags()
 	flagSet.StringP("token", "t", "", "Token file to use for transfer")
+	flagSet.String("collections-url", "", "URL to use for collection listing, overriding the director's response")
 	flagSet.BoolP("long", "l", false, "Include extended information")
-	flagSet.BoolP("collectionOnly", "C", false, "List collections only")
-	flagSet.BoolP("objectonly", "O", false, "List objects only")
+	flagSet.BoolP("collection-only", "C", false, "List collections only")
+	flagSet.BoolP("object-only", "O", false, "List objects only")
 	flagSet.BoolP("json", "j", false, "Print results in JSON format")
 
 	objectCmd.AddCommand(lsCmd)
@@ -82,16 +83,17 @@ func listMain(cmd *cobra.Command, args []string) error {
 	log.Debugln("Location:", object)
 
 	long, _ := cmd.Flags().GetBool("long")
-	collectionOnly, _ := cmd.Flags().GetBool("collectionOnly")
-	objectOnly, _ := cmd.Flags().GetBool("objectonly")
+	collectionOnly, _ := cmd.Flags().GetBool("collection-only")
+	objectOnly, _ := cmd.Flags().GetBool("object-only")
 	asJSON, _ := cmd.Flags().GetBool("json")
+	collectionsUrl, _ := cmd.Flags().GetString("collections-url")
 
 	if collectionOnly && objectOnly {
-		// If a user specifies collectionOnly and objectOnly, this means basic functionality (list both objects and directories) so just remove the flags
-		return errors.New("cannot specify both collectionOnly (-C) and object only (-O) flags, as they are mutually exclusive")
+		// The collection-only and object-only filters are mutually exclusive; specifying both is an error
+		return errors.New("cannot specify both collection-only (-C) and object-only (-O) flags, as they are mutually exclusive")
 	}
 
-	fileInfos, err := client.DoList(ctx, object, client.WithTokenLocation(tokenLocation))
+	fileInfos, err := client.DoList(ctx, object, client.WithTokenLocation(tokenLocation), client.WithCollectionsUrl(collectionsUrl))
 
 	// Exit with failure
 	if err != nil {
diff --git a/cmd/object_prestage.go b/cmd/object_prestage.go
new file mode 100644
index 000000000..e5e016b75
--- /dev/null
+++ b/cmd/object_prestage.go
@@ -0,0 +1,138 @@
+/***************************************************************
+*
+* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you
+* may not use this file except in compliance with the License.  You may
+* obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*
+***************************************************************/
+
+package main
+
+import (
+	"net/url"
+	"os"
+
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/cobra"
+
+	"github.com/pelicanplatform/pelican/client"
+	"github.com/pelicanplatform/pelican/config"
+	"github.com/pelicanplatform/pelican/error_codes"
+	"github.com/pelicanplatform/pelican/param"
+	"github.com/pelicanplatform/pelican/utils"
+)
+
+var (
+	prestageCmd = &cobra.Command{
+		Use:    "prestage {prefix ...}",
+		Short:  "Prestages one or more prefixes at a Pelican cache",
+		Hidden: true, // Until we decide how safe this approach is, keep the command hidden.
+		Run:    prestageMain,
+	}
+)
+
+func init() {
+	flagSet := prestageCmd.Flags()
+	flagSet.StringP("cache", "c", "", "Cache to use")
+	flagSet.StringP("token", "t", "", "Token file to use for transfer")
+	objectCmd.AddCommand(prestageCmd)
+}
+
+func prestageMain(cmd *cobra.Command, args []string) {
+	ctx := cmd.Context()
+
+	err := config.InitClient()
+	if err != nil {
+		log.Errorln(err)
+
+		if client.IsRetryable(err) {
+			log.Errorln("Errors are retryable")
+			os.Exit(11)
+		} else {
+			os.Exit(1)
+		}
+	}
+
+	tokenLocation, _ := cmd.Flags().GetString("token")
+
+	pb := newProgressBar()
+	defer pb.shutdown()
+
+	if fileInfo, _ := os.Stdout.Stat(); (fileInfo.Mode()&os.ModeCharDevice) != 0 && param.Logging_LogLocation.GetString() == "" && !param.Logging_DisableProgressBars.GetBool() {
+		pb.launchDisplay(ctx)
+	}
+
+	if len(args) < 1 {
+		log.Errorln("Prefix(es) to prestage must be specified")
+		err = cmd.Help()
+		if err != nil {
+			log.Errorln("Failed to print out help:", err)
+		}
+		os.Exit(1)
+	}
+
+	log.Debugln("Prestage prefixes:", args)
+
+	// Check for a manually entered cache to use
+	var preferredCache string
+	if nearestCache, ok := os.LookupEnv("NEAREST_CACHE"); ok {
+		preferredCache = nearestCache
+	} else if cache, _ := cmd.Flags().GetString("cache"); cache != "" {
+		preferredCache = cache
+	}
+	var caches []*url.URL
+	caches, err = utils.GetPreferredCaches(preferredCache)
+	if err != nil {
+		log.Errorln(err)
+		os.Exit(1)
+	}
+
+	lastSrc := ""
+
+	for _, src := range args {
+		if !isPelicanUrl(src) {
+			log.Errorln("Provided URL is not a valid Pelican URL:", src)
+			os.Exit(1)
+		}
+		if _, err = client.DoPrestage(ctx, src,
+			client.WithCallback(pb.callback), client.WithTokenLocation(tokenLocation),
+			client.WithCaches(caches...)); err != nil {
+			lastSrc = src
+			break
+		}
+	}
+
+	// Exit with failure
+	if err != nil {
+		// Print the list of errors
+		errMsg := err.Error()
+		var pe error_codes.PelicanError
+		var te *client.TransferErrors
+		if errors.As(err, &te) {
+			errMsg = te.UserError()
+		}
+		if errors.Is(err, &pe) {
+			errMsg = pe.Error()
+			log.Errorln("Failure prestaging " + lastSrc + ": " + errMsg)
+			os.Exit(pe.ExitCode())
+		} else { // For now, keeping this else here to catch any errors that are not classified PelicanErrors
+			log.Errorln("Failure prestaging " + lastSrc + ": " + errMsg)
+			if client.ShouldRetry(err) {
+				log.Errorln("Errors are retryable")
+				os.Exit(11)
+			}
+			os.Exit(1)
+		}
+	}
+}
diff --git a/config/resources/defaults.yaml b/config/resources/defaults.yaml
index dad7b706e..38fb0f6f5 100644
--- a/config/resources/defaults.yaml
+++ b/config/resources/defaults.yaml
@@ -56,6 +56,7 @@ Director:
   EnableBroker: true
   EnableStat: true
 Cache:
+  EnablePrefetch: true
   Port: 8442
   SelfTest: true
   SelfTestInterval: 15s
diff --git a/docs/parameters.yaml b/docs/parameters.yaml
index 12bececec..9d33e8b07 100644
--- a/docs/parameters.yaml
+++ b/docs/parameters.yaml
@@ -1207,6 +1207,17 @@ type: stringSlice
 default: none
 components: ["cache"]
 ---
+name: Cache.EnablePrefetch
+description: |+
+  Control whether data prefetching is enabled in the cache.
+
+  This parameter is provided solely for testing purposes; disabling
+  prefetch is not advised in production.
+type: bool
+default: true
+hidden: true
+components: ["cache"]
+---
 name: Cache.SelfTest
 description: |+
   A bool indicating whether the cache should perform self health checks.
diff --git a/xrootd/resources/xrootd-cache.cfg b/xrootd/resources/xrootd-cache.cfg
index 1a2fa11cf..0d45b2054 100644
--- a/xrootd/resources/xrootd-cache.cfg
+++ b/xrootd/resources/xrootd-cache.cfg
@@ -53,7 +53,11 @@ pfc.trace info
 xrootd.tls all
 xrd.network nodnr
 pfc.blocksize 128k
+{{if .Cache.EnablePrefetch}}
 pfc.prefetch 20
+{{else}}
+pfc.prefetch 0
+{{- end}}
 pfc.writequeue 16 4
 pfc.ram 4g
 pfc.diskusage {{if .Cache.LowWatermark}}{{.Cache.LowWatermark}}{{else}}0.90{{end}} {{if .Cache.HighWaterMark}}{{.Cache.HighWaterMark}}{{else}}0.95{{end}} purgeinterval 300s
diff --git a/xrootd/xrootd_config.go b/xrootd/xrootd_config.go
index 7b8382a66..54e73fb24 100644
--- a/xrootd/xrootd_config.go
+++ b/xrootd/xrootd_config.go
@@ -107,6 +107,7 @@ type (
 	CacheConfig struct {
 		UseCmsd        bool
+		EnablePrefetch bool
 		EnableVoms     bool
 		CalculatedPort string
 		HighWaterMark  string
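For reference, the conditional added to the cache template renders to `pfc.prefetch 20` or `pfc.prefetch 0` depending on the new parameter; a minimal text/template reproduction (the inline struct is a stand-in for the real CacheConfig):

```go
package main

import (
	"os"
	"text/template"
)

// Minimal reproduction of the new template conditional; the anonymous
// struct here stands in for the real xrootd config type.
func main() {
	tmpl := template.Must(template.New("pfc").Parse(
		"{{if .Cache.EnablePrefetch}}\npfc.prefetch 20\n{{else}}\npfc.prefetch 0\n{{- end}}\n"))

	for _, enabled := range []bool{true, false} {
		data := struct {
			Cache struct{ EnablePrefetch bool }
		}{}
		data.Cache.EnablePrefetch = enabled
		_ = tmpl.Execute(os.Stdout, data) // prints "pfc.prefetch 20" then "pfc.prefetch 0"
	}
}
```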