From c9efbab120c1d0c0362ec5ae0e0bd41c6f8e6463 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Sun, 29 Sep 2024 12:17:01 +0200 Subject: [PATCH] Add support for prestaging data to a cache The new `pelican object prestage` command (and the corresponding public APIs) allow a user to trigger the prestaging of data to a specific cache. The prestage itself is not intelligent: it checks to see if a file is already present and, if not, it triggers a full download. Future work may extend this on the Pelican-side so there's a control interface exposed that triggers prestaging without data movement all the way to the client. Includes unit tests and some modest refactoring. Given the current LIGO origins don't have the collections URL set (and I wanted to test it out), this also provides the ability to override the collections URL on the CLI. --- client/acquire_token.go | 11 + client/fed_test.go | 78 ++++++ client/handle_http.go | 448 ++++++++++++++++++++++++------ client/main.go | 53 ++++ client/prestage.go | 140 ++++++++++ client/resources/both-auth.yml | 3 + cmd/object_ls.go | 12 +- cmd/object_prestage.go | 138 +++++++++ config/resources/defaults.yaml | 1 + docs/parameters.yaml | 11 + xrootd/resources/xrootd-cache.cfg | 4 + xrootd/xrootd_config.go | 1 + 12 files changed, 815 insertions(+), 85 deletions(-) create mode 100644 client/prestage.go create mode 100644 cmd/object_prestage.go diff --git a/client/acquire_token.go b/client/acquire_token.go index 9183658e7..6052ac34b 100644 --- a/client/acquire_token.go +++ b/client/acquire_token.go @@ -118,6 +118,17 @@ func (tg *tokenGenerator) SetToken(contents string) { tg.Token.Store(&info) } +// Copy the contents +func (tg *tokenGenerator) Copy() *tokenGenerator { + return &tokenGenerator{ + DirResp: tg.DirResp, + Destination: tg.Destination, + IsWrite: tg.IsWrite, + EnableAcquire: tg.EnableAcquire, + Sync: new(singleflight.Group), + } +} + // Determine the token name if it is embedded in the scheme, Condor-style func getTokenName(destination *url.URL) (scheme, tokenName string) { schemePieces := strings.Split(destination.Scheme, "+") diff --git a/client/fed_test.go b/client/fed_test.go index 93e2176f4..c16feff70 100644 --- a/client/fed_test.go +++ b/client/fed_test.go @@ -962,3 +962,81 @@ func TestClientUnpack(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(11), fi.Size()) } + +// A test that spins up a federation, and tests object get and put +func TestPrestage(t *testing.T) { + viper.Reset() + server_utils.ResetOriginExports() + fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg) + + te, err := client.NewTransferEngine(fed.Ctx) + require.NoError(t, err) + + // Other set-up items: + // The cache will open the file to stat it, downloading the first block. + // Make sure we are greater than 64kb in size. + testFileContent := strings.Repeat("test file content", 10000) + // Create the temporary file to upload + tempFile, err := os.CreateTemp(t.TempDir(), "test") + assert.NoError(t, err, "Error creating temp file") + defer os.Remove(tempFile.Name()) + _, err = tempFile.WriteString(testFileContent) + assert.NoError(t, err, "Error writing to temp file") + tempFile.Close() + + tempToken, _ := getTempToken(t) + defer tempToken.Close() + defer os.Remove(tempToken.Name()) + // Disable progress bars to not reuse the same mpb instance + viper.Set("Logging.DisableProgressBars", true) + + oldPref, err := config.SetPreferredPrefix(config.PelicanPrefix) + assert.NoError(t, err) + defer func() { + _, err := config.SetPreferredPrefix(oldPref) + require.NoError(t, err) + }() + + // Set path for object to upload/download + for _, export := range fed.Exports { + tempPath := tempFile.Name() + fileName := filepath.Base(tempPath) + uploadURL := fmt.Sprintf("pelican://%s:%s%s/prestage/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), + export.FederationPrefix, fileName) + + // Upload the file with COPY + transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) + assert.NoError(t, err) + assert.Equal(t, int64(len(testFileContent)), transferResultsUpload[0].TransferredBytes) + + // Check the cache info twice, make sure it's not cached. + tc, err := te.NewClient(client.WithTokenLocation(tempToken.Name())) + require.NoError(t, err) + innerFileUrl, err := url.Parse(uploadURL) + require.NoError(t, err) + age, size, err := tc.CacheInfo(fed.Ctx, innerFileUrl) + require.NoError(t, err) + assert.Equal(t, int64(len(testFileContent)), size) + assert.Equal(t, -1, age) + + age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl) + require.NoError(t, err) + assert.Equal(t, int64(len(testFileContent)), size) + assert.Equal(t, -1, age) + + // Prestage the object + tj, err := tc.NewPrestageJob(fed.Ctx, innerFileUrl) + require.NoError(t, err) + err = tc.Submit(tj) + require.NoError(t, err) + results, err := tc.Shutdown() + require.NoError(t, err) + assert.Equal(t, 1, len(results)) + + // Check if object is cached. + age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl) + require.NoError(t, err) + assert.Equal(t, int64(len(testFileContent)), size) + require.NotEqual(t, -1, age) + } +} diff --git a/client/handle_http.go b/client/handle_http.go index 238f6d80b..9fc4325d9 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -103,6 +103,8 @@ type ( err error } + transferType int + // Error type for when the transfer started to return data then completely stopped StoppedTransferError struct { BytesTransferred int64 @@ -213,7 +215,7 @@ type ( remoteURL *url.URL localPath string token *tokenGenerator - upload bool + xferType transferType packOption string attempts []transferAttemptDetails project string @@ -237,7 +239,7 @@ type ( activeXfer atomic.Int64 totalXfer int localPath string - upload bool + xferType transferType recursive bool skipAcquire bool syncLevel SyncLevel // Policy for handling synchronization when the destination exists @@ -308,13 +310,14 @@ type ( setupResults sync.Once } - TransferOption = option.Interface - identTransferOptionCaches struct{} - identTransferOptionCallback struct{} - identTransferOptionTokenLocation struct{} - identTransferOptionAcquireToken struct{} - identTransferOptionToken struct{} - identTransferOptionSynchronize struct{} + TransferOption = option.Interface + identTransferOptionCaches struct{} + identTransferOptionCallback struct{} + identTransferOptionTokenLocation struct{} + identTransferOptionAcquireToken struct{} + identTransferOptionToken struct{} + identTransferOptionSynchronize struct{} + identTransferOptionCollectionsUrl struct{} transferDetailsOptions struct { NeedsToken bool @@ -335,8 +338,24 @@ const ( SyncNone = iota // When synchronizing, always re-transfer, regardless of existence at destination. SyncExist // Skip synchronization transfer if the destination exists SyncSize // Skip synchronization transfer if the destination exists and matches the current source size + + transferTypeDownload transferType = iota // Transfer is downloading from the federation + transferTypeUpload // Transfer is uploading to the federation + transferTypePrestage // Transfer is staging at a federation cache ) +// Function for merging two contexts together into one (returning a cancel) +func mergeCancel(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) { + newCtx, cancel := context.WithCancel(ctx1) + stop := context.AfterFunc(ctx2, func() { + cancel() + }) + return newCtx, func() { + stop() + cancel() + } +} + // The progress container object creates several // background goroutines. Instead of creating the object // globally, create it on first use. This avoids having @@ -724,6 +743,11 @@ func WithCallback(callback TransferCallbackFunc) TransferOption { return option.New(identTransferOptionCallback{}, callback) } +// Override collections URL to be used by the TransferClient +func WithCollectionsUrl(url string) TransferOption { + return option.New(identTransferOptionCollectionsUrl{}, url) +} + // Create an option to override the cache list func WithCaches(caches ...*url.URL) TransferOption { return option.New(identTransferOptionCaches{}, caches) @@ -1140,11 +1164,14 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL callback: tc.callback, skipAcquire: tc.skipAcquire, syncLevel: tc.syncLevel, - upload: upload, + xferType: transferTypeDownload, uuid: id, project: project, token: newTokenGenerator(©Url, nil, upload, !tc.skipAcquire), } + if upload { + tj.xferType = transferTypeUpload + } if tc.token != "" { tj.token.SetToken(tc.token) } @@ -1152,17 +1179,6 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL tj.token.SetTokenLocation(tc.tokenLocation) } - mergeCancel := func(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) { - newCtx, cancel := context.WithCancel(ctx1) - stop := context.AfterFunc(ctx2, func() { - cancel() - }) - return newCtx, func() { - stop() - cancel() - } - } - tj.ctx, tj.cancel = mergeCancel(ctx, tc.ctx) for _, option := range options { @@ -1224,6 +1240,98 @@ func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL return } +// Create a new prestage job for the client +// +// The returned object can be further customized as desired. +// This function does not "submit" the job for execution. +func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error) { + + id, err := uuid.NewV7() + if err != nil { + return + } + + // See if we have a projectName defined + project := searchJobAd(projectName) + + pelicanURL, err := tc.engine.newPelicanURL(remoteUrl) + if err != nil { + err = errors.Wrap(err, "error generating metadata for specified url") + return + } + + copyUrl := *remoteUrl // Make a copy of the input URL to avoid concurrent issues. + tj = &TransferJob{ + prefObjServers: tc.prefObjServers, + remoteURL: ©Url, + callback: tc.callback, + skipAcquire: tc.skipAcquire, + syncLevel: tc.syncLevel, + xferType: transferTypePrestage, + uuid: id, + project: project, + token: newTokenGenerator(©Url, nil, false, !tc.skipAcquire), + } + if tc.token != "" { + tj.token.SetToken(tc.token) + } + if tc.tokenLocation != "" { + tj.token.SetTokenLocation(tc.tokenLocation) + } + + tj.ctx, tj.cancel = mergeCancel(ctx, tc.ctx) + + for _, option := range options { + switch option.Ident() { + case identTransferOptionCaches{}: + tj.prefObjServers = option.Value().([]*url.URL) + case identTransferOptionCallback{}: + tj.callback = option.Value().(TransferCallbackFunc) + case identTransferOptionTokenLocation{}: + tj.token.SetTokenLocation(option.Value().(string)) + case identTransferOptionAcquireToken{}: + tj.token.EnableAcquire = option.Value().(bool) + case identTransferOptionToken{}: + tj.token.SetToken(option.Value().(string)) + case identTransferOptionSynchronize{}: + tj.syncLevel = option.Value().(SyncLevel) + } + } + + tj.directorUrl = pelicanURL.directorUrl + dirResp, err := GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, "") + if err != nil { + log.Errorln(err) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) + return + } + tj.dirResp = dirResp + tj.token.DirResp = &dirResp + + log.Debugln("Dir resp:", dirResp.XPelNsHdr) + if dirResp.XPelNsHdr.RequireToken { + contents, err := tj.token.get() + if err != nil || contents == "" { + return nil, errors.Wrap(err, "failed to get token for transfer") + } + + // The director response may change if it's given a token; let's repeat the query. + if contents != "" { + dirResp, err = GetDirectorInfoForPath(tj.ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, contents) + if err != nil { + log.Errorln(err) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) + return nil, err + } + tj.dirResp = dirResp + tj.token.DirResp = &dirResp + } + } + + log.Debugf("Created new prestage job, ID %s client %s, for URL %s", tj.uuid.String(), tc.id.String(), remoteUrl.String()) + return +} + // Returns the status of the transfer job-to-file(s) lookup // // ok is true if the lookup has completed. @@ -1238,7 +1346,7 @@ func (tj *TransferJob) GetLookupStatus() (ok bool, err error) { // Submit the transfer job to the client for processing func (tc *TransferClient) Submit(tj *TransferJob) error { // Ensure that a tj.Wait() immediately after Submit will always block. - log.Debugln("Submiting transfer job", tj.uuid.String()) + log.Debugln("Submitting transfer job", tj.uuid.String()) select { case <-tc.ctx.Done(): return tc.ctx.Err() @@ -1247,6 +1355,87 @@ func (tc *TransferClient) Submit(tj *TransferJob) error { } } +// Check the transfer client +func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error) { + age = -1 + + pelicanURL, err := tc.engine.newPelicanURL(remoteUrl) + if err != nil { + err = errors.Wrap(err, "error generating metadata for specified URL") + return + } + + var prefObjServers []*url.URL + token := newTokenGenerator(remoteUrl, nil, false, true) + if tc.token != "" { + token.SetToken(tc.token) + } + if tc.tokenLocation != "" { + token.SetTokenLocation(tc.tokenLocation) + } + if tc.skipAcquire { + token.EnableAcquire = !tc.skipAcquire + } + for _, option := range options { + switch option.Ident() { + case identTransferOptionCaches{}: + prefObjServers = option.Value().([]*url.URL) + case identTransferOptionTokenLocation{}: + token.SetTokenLocation(option.Value().(string)) + case identTransferOptionAcquireToken{}: + token.EnableAcquire = option.Value().(bool) + case identTransferOptionToken{}: + token.SetToken(option.Value().(string)) + } + } + + ctx, cancel := mergeCancel(tc.ctx, ctx) + defer cancel() + + dirResp, err := GetDirectorInfoForPath(ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, "") + if err != nil { + log.Errorln(err) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) + return + } + token.DirResp = &dirResp + + if dirResp.XPelNsHdr.RequireToken { + var contents string + contents, err = token.get() + if err != nil || contents == "" { + err = errors.Wrap(err, "failed to get token for cache info query") + return + } + + // The director response may change if it's given a token; let's repeat the query. + if contents != "" { + dirResp, err = GetDirectorInfoForPath(ctx, remoteUrl.Path, pelicanURL.directorUrl, false, remoteUrl.RawQuery, contents) + if err != nil { + log.Errorln(err) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String()) + return + } + token.DirResp = &dirResp + } + } + + var sortedServers []*url.URL + sortedServers, err = generateSortedObjServers(dirResp, prefObjServers) + if err != nil { + log.Errorln("Failed to get namespace caches (treated as non-fatal):", err) + return + } + if len(sortedServers) == 0 { + err = errors.New("No available cache servers detected") + return + } + cacheUrl := *sortedServers[0] + cacheUrl.Path = remoteUrl.Path + + return objectCached(ctx, &cacheUrl, token, config.GetTransport()) +} + // Close the transfer client object // // Any subsequent job submissions will cause a panic @@ -1436,7 +1625,7 @@ func (te *TransferEngine) createTransferFiles(job *clientTransferJob) (err error remoteUrl := &url.URL{Path: job.job.remoteURL.Path, Scheme: job.job.remoteURL.Scheme} var transfers []transferAttemptDetails - if job.job.upload { // Uploads use the redirected endpoint directly + if job.job.xferType == transferTypeUpload { // Uploads use the redirected endpoint directly if len(job.job.dirResp.ObjectServers) == 0 { err = errors.New("No origins found for upload") return @@ -1473,11 +1662,22 @@ func (te *TransferEngine) createTransferFiles(job *clientTransferJob) (err error } if job.job.recursive { - if job.job.upload { + if job.job.xferType == transferTypeUpload { return te.walkDirUpload(job, transfers, te.files, job.job.localPath) } else { return te.walkDirDownload(job, transfers, te.files, remoteUrl) } + } else if job.job.xferType == transferTypePrestage { + // For prestage, from day one we handle internally whether it's recursive + // (as opposed to making the user specify explicitly) + var statInfo FileInfo + if statInfo, err = statHttp(remoteUrl, job.job.dirResp, job.job.token); err != nil { + err = errors.Wrap(err, "failed to stat object to prestage") + return + } + if statInfo.IsCollection { + return te.walkDirDownload(job, transfers, te.files, remoteUrl) + } } job.job.totalXfer += 1 @@ -1493,7 +1693,7 @@ func (te *TransferEngine) createTransferFiles(job *clientTransferJob) (err error remoteURL: remoteUrl, packOption: packOption, localPath: job.job.localPath, - upload: job.job.upload, + xferType: job.job.xferType, token: job.job.token, attempts: transfers, project: job.job.project, @@ -1545,7 +1745,7 @@ func runTransferWorker(ctx context.Context, workChan <-chan *clientTransferFile, } var err error var transferResults TransferResults - if file.file.upload { + if file.file.xferType == transferTypeUpload { transferResults, err = uploadObject(file.file) } else { transferResults, err = downloadObject(file.file) @@ -1604,56 +1804,12 @@ func sortAttempts(ctx context.Context, path string, attempts []transferAttemptDe return } - headClient := &http.Client{Transport: transport} - // Note we are not using a HEAD request here but a GET request for one byte; - // this is because the XRootD server currently (v5.6.9) only returns the Age - // header for GETs - headRequest, _ := http.NewRequestWithContext(ctx, http.MethodGet, tUrl.String(), nil) - headRequest.Header.Set("Range", "0-0") - if token != nil { - if tokenContents, err := token.get(); err == nil && tokenContents != "" { - headRequest.Header.Set("Authorization", "Bearer "+tokenContents) - } - } - var headResponse *http.Response - headResponse, err := headClient.Do(headRequest) - if err != nil { + if age, size, err := objectCached(ctx, tUrl, token, transport); err != nil { headChan <- checkResults{idx, 0, -1, err} return - } - // Allow response body to fail to read; we are only interested in the headers - // of the response, not the contents. - if _, err := io.ReadAll(headResponse.Body); err != nil { - log.Warningln("Failure when reading the one-byte-response body:", err) - } - headResponse.Body.Close() - var age int = -1 - var size int64 = 0 - if headResponse.StatusCode <= 300 { - contentLengthStr := headResponse.Header.Get("Content-Length") - if contentLengthStr != "" { - size, err = strconv.ParseInt(contentLengthStr, 10, 64) - if err != nil { - err = errors.Wrap(err, "problem converting Content-Length in response to an int") - log.Errorln(err.Error()) - - } - } - ageStr := headResponse.Header.Get("Age") - if ageStr != "" { - if ageParsed, err := strconv.Atoi(ageStr); err != nil { - log.Warningf("Ignoring invalid age value (%s) due to parsing error: %s", headRequest.Header.Get("Age"), err.Error()) - } else { - age = ageParsed - } - } } else { - err = &HttpErrResp{ - Code: headResponse.StatusCode, - Err: fmt.Sprintf("GET \"%s\" resulted in status code %d", tUrl, headResponse.StatusCode), - } + headChan <- checkResults{idx, uint64(size), age, err} } - headChan <- checkResults{idx, uint64(size), age, err} }(idx, &tUrl) } // 1 -> success. @@ -1725,11 +1881,16 @@ func sortAttempts(ctx context.Context, path string, attempts []transferAttemptDe // create the destination directory). func downloadObject(transfer *transferFile) (transferResults TransferResults, err error) { log.Debugln("Downloading object from", transfer.remoteURL, "to", transfer.localPath) - // Remove the source from the file path - directory := path.Dir(transfer.localPath) + var downloaded int64 - if err = os.MkdirAll(directory, 0700); err != nil { - return + localPath := transfer.localPath + if transfer.xferType == transferTypeDownload { + directory := path.Dir(localPath) + if err = os.MkdirAll(directory, 0700); err != nil { + return + } + } else { + localPath = "/dev/null" } size, attempts := sortAttempts(transfer.job.ctx, transfer.remoteURL.Path, transfer.attempts, transfer.token) @@ -1764,7 +1925,7 @@ func downloadObject(transfer *transferFile) (transferResults TransferResults, er tokenContents, _ = transfer.token.get() } attemptDownloaded, timeToFirstByte, cacheAge, serverVersion, err := downloadHTTP( - ctx, transfer.engine, transfer.callback, transferEndpoint, transfer.localPath, size, tokenContents, transfer.project, + ctx, transfer.engine, transfer.callback, transferEndpoint, localPath, size, tokenContents, transfer.project, ) endTime := time.Now() if cacheAge >= 0 { @@ -1933,6 +2094,10 @@ func downloadHTTP(ctx context.Context, te *TransferEngine, callback TransferCall if req, err = grab.NewRequestToWriter(unpacker, transferUrl.String()); err != nil { return 0, 0, -1, "", errors.Wrap(err, "Failed to create new download request") } + } else if dest == "/dev/null" { + if req, err = grab.NewRequestToWriter(io.Discard, transferUrl.String()); err != nil { + return 0, 0, -1, "", errors.Wrap(err, "Failed to create new prestage request") + } } else if req, err = grab.NewRequest(dest, transferUrl.String()); err != nil { return 0, 0, -1, "", errors.Wrap(err, "Failed to create new download request") } @@ -2472,6 +2637,26 @@ func createWebDavClient(collectionsUrl *url.URL, token *tokenGenerator, project return } +// Determine whether to skip a prestage based on whether an object is at a cache +func skipPrestage(object string, job *TransferJob) bool { + var cache url.URL + if len(job.dirResp.ObjectServers) > 0 { + cache = *job.dirResp.ObjectServers[0] + } else if len(job.prefObjServers) > 0 { + cache = *job.prefObjServers[0] + } else { + log.Errorln("Cannot skip prestage if no cache is specified!") + } + + cache.Path = object + if age, _, err := objectCached(job.ctx, &cache, job.token, config.GetTransport()); err == nil { + return age >= 0 + } else { + log.Warningln("Failed to check cache status of object", cache.String(), "so assuming it needs prestaging:", err) + return false + } +} + // Depending on the synchronization policy, decide if a object download should be skipped func skipDownload(syncLevel SyncLevel, remoteInfo fs.FileInfo, localPath string) bool { if syncLevel == SyncNone { @@ -2552,7 +2737,9 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer if err != nil { return err } - } else if localPath := path.Join(job.job.localPath, localBase, info.Name()); skipDownload(job.job.syncLevel, info, localPath) { + } else if job.job.xferType == transferTypePrestage && skipPrestage(newPath, job.job) { + log.Infoln("Skipping prestage of object", newPath, "as it already is at the cache") + } else if localPath := path.Join(job.job.localPath, localBase, info.Name()); job.job.xferType == transferTypeDownload && skipDownload(job.job.syncLevel, info, localPath) { log.Infoln("Skipping download of object", newPath, "as it already exists at", localPath) } else { job.job.activeXfer.Add(1) @@ -2570,7 +2757,7 @@ func (te *TransferEngine) walkDirDownloadHelper(job *clientTransferJob, transfer remoteURL: &url.URL{Path: newPath}, packOption: transfers[0].PackOption, localPath: localPath, - upload: job.job.upload, + xferType: job.job.xferType, token: job.job.token, attempts: transfers, }, @@ -2620,7 +2807,7 @@ func (te *TransferEngine) walkDirUpload(job *clientTransferJob, transfers []tran remoteURL: &url.URL{Path: remotePath}, packOption: transfers[0].PackOption, localPath: newPath, - upload: job.job.upload, + xferType: job.job.xferType, token: job.job.token, attempts: transfers, }, @@ -2636,6 +2823,10 @@ func (te *TransferEngine) walkDirUpload(job *clientTransferJob, transfers []tran func listHttp(remoteObjectUrl *url.URL, dirResp server_structs.DirectorResponse, token *tokenGenerator) (fileInfos []FileInfo, err error) { // Get our collection listing host collectionsUrl := dirResp.XPelNsHdr.CollectionsUrl + if collectionsUrl == nil { + err = errors.New("namespace does not provide a collections URL for listing") + return + } log.Debugln("Collections URL: ", collectionsUrl.String()) project := searchJobAd(projectName) @@ -2797,6 +2988,103 @@ func statHttp(dest *url.URL, dirResp server_structs.DirectorResponse, token *tok return } +// Check if a given URL is present at the first cache in the director response +// +// Note that xrootd returns an `Age` header for GETs but only a `Content-Length` +// header for HEADs. If `Content-Range` is found, we will use that header; if not, +// we will issue two commands. +func objectCached(ctx context.Context, objectUrl *url.URL, token *tokenGenerator, transport http.RoundTripper) (age int, size int64, err error) { + + age = -1 + + headClient := &http.Client{Transport: transport} + headRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, objectUrl.String(), nil) + if err != nil { + return + } + headRequest.Header.Set("Range", "0-0") + if token != nil { + if tokenContents, err := token.get(); err == nil && tokenContents != "" { + headRequest.Header.Set("Authorization", "Bearer "+tokenContents) + } + } + var headResponse *http.Response + headResponse, err = headClient.Do(headRequest) + if err != nil { + return + } + // Allow response body to fail to read; we are only interested in the headers + // of the response, not the contents. + if _, err := io.Copy(io.Discard, headResponse.Body); err != nil { + log.Warningln("Failure when reading the one-byte-response body:", err) + } + headResponse.Body.Close() + gotContentRange := false + if headResponse.StatusCode <= 300 { + if contentRangeStr := headResponse.Header.Get("Content-Range"); contentRangeStr != "" { + if after, found := strings.CutPrefix(contentRangeStr, "bytes 0-0/"); found { + if afterParsed, err := strconv.Atoi(after); err == nil { + size = int64(afterParsed) + gotContentRange = true + } else { + log.Warningf("Ignoring invalid content range value (%s) due to parsing error: %s", after, err.Error()) + } + } else { + log.Debugln("Unexpected value found in Content-Range header:", contentRangeStr) + } + } + ageStr := headResponse.Header.Get("Age") + if ageStr != "" { + if ageParsed, err := strconv.Atoi(ageStr); err != nil { + log.Warningf("Ignoring invalid age value (%s) due to parsing error: %s", headRequest.Header.Get("Age"), err.Error()) + } else { + age = ageParsed + } + } + } else { + err = &HttpErrResp{ + Code: headResponse.StatusCode, + Err: fmt.Sprintf("GET \"%s\" resulted in status code %d", objectUrl, headResponse.StatusCode), + } + } + // Early return -- all the info we wanted was in the GET response. + if gotContentRange { + return + } + + headRequest, err = http.NewRequestWithContext(ctx, http.MethodHead, objectUrl.String(), nil) + if err != nil { + return + } + if token != nil { + if tokenContents, err := token.get(); err == nil && tokenContents != "" { + headRequest.Header.Set("Authorization", "Bearer "+tokenContents) + } + } + + headResponse, err = headClient.Do(headRequest) + if err != nil { + return + } + if headResponse.StatusCode <= 300 { + contentLengthStr := headResponse.Header.Get("Content-Length") + if contentLengthStr != "" { + size, err = strconv.ParseInt(contentLengthStr, 10, 64) + if err != nil { + err = errors.Wrap(err, "problem converting Content-Length in response to an int") + log.Errorln(err.Error()) + + } + } + } else { + err = &HttpErrResp{ + Code: headResponse.StatusCode, + Err: fmt.Sprintf("HEAD \"%s\" resulted in status code %d", objectUrl, headResponse.StatusCode), + } + } + return +} + // This function searches the condor job ad for a specific classad and returns the value of that classad func searchJobAd(classad classAd) string { diff --git a/client/main.go b/client/main.go index 71615756a..c3c8b920e 100644 --- a/client/main.go +++ b/client/main.go @@ -125,6 +125,49 @@ func DoStat(ctx context.Context, destination string, options ...TransferOption) } } +// Check the cache information of a remote cache +func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error) { + + defer func() { + if r := recover(); r != nil { + log.Debugln("Panic captured while attempting to do cache info:", r) + log.Debugln("Panic caused by the following", string(debug.Stack())) + ret := fmt.Sprintf("Unrecoverable error (panic) while check file size: %v", r) + err = errors.New(ret) + return + } + }() + + destUri, err := url.Parse(destination) + if err != nil { + log.Errorln("Failed to parse destination URL") + return + } + + // Check if we understand the found url scheme + err = schemeUnderstood(destUri.Scheme) + if err != nil { + return + } + + te, err := NewTransferEngine(ctx) + if err != nil { + return + } + + defer func() { + if err := te.Shutdown(); err != nil { + log.Errorln("Failure when shutting down transfer engine:", err) + } + }() + + tc, err := te.NewClient(options...) + if err != nil { + return + } + return tc.CacheInfo(ctx, destUri) +} + func GetObjectServerHostnames(ctx context.Context, testFile string) (urls []string, err error) { fedInfo, err := config.GetFederation(ctx) if err != nil { @@ -282,6 +325,7 @@ func DoList(ctx context.Context, remoteObject string, options ...TransferOption) // Get our token if needed token := newTokenGenerator(remoteObjectUrl, &dirResp, false, true) + collectionsOverride := "" for _, option := range options { switch option.Ident() { case identTransferOptionTokenLocation{}: @@ -290,6 +334,8 @@ func DoList(ctx context.Context, remoteObject string, options ...TransferOption) token.EnableAcquire = option.Value().(bool) case identTransferOptionToken{}: token.SetToken(option.Value().(string)) + case identTransferOptionCollectionsUrl{}: + collectionsOverride = option.Value().(string) } } @@ -299,6 +345,13 @@ func DoList(ctx context.Context, remoteObject string, options ...TransferOption) return nil, errors.Wrap(err, "failed to get token for transfer") } } + if collectionsOverride != "" { + collectionsOverrideUrl, err := url.Parse(collectionsOverride) + if err != nil { + return nil, errors.Wrap(err, "unable to parse collections URL override") + } + dirResp.XPelNsHdr.CollectionsUrl = collectionsOverrideUrl + } fileInfos, err = listHttp(remoteObjectUrl, dirResp, token) if err != nil { diff --git a/client/prestage.go b/client/prestage.go new file mode 100644 index 000000000..39a5d07f3 --- /dev/null +++ b/client/prestage.go @@ -0,0 +1,140 @@ +/*************************************************************** + * + * Copyright (C) 2024, Morgridge Institute for Research + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ***************************************************************/ + +package client + +import ( + "context" + "fmt" + "net/url" + "runtime/debug" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/pelicanplatform/pelican/utils" +) + +// Single-shot call to prestage a single prefix +func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error) { + // First, create a handler for any panics that occur + defer func() { + if r := recover(); r != nil { + log.Debugln("Panic captured while attempting to perform prestage:", r) + log.Debugln("Panic caused by the following", string(debug.Stack())) + ret := fmt.Sprintf("Unrecoverable error (panic) captured in prestage: %v", r) + err = errors.New(ret) + } + }() + + // Parse the source with URL parse + remotePrefix, remotePrefixScheme := correctURLWithUnderscore(prefixUrl) + remotePrefixUrl, err := url.Parse(remotePrefix) + if err != nil { + log.Errorln("Failed to parse source URL:", err) + return nil, err + } + + // Check if we have a query and that it is understood + err = utils.CheckValidQuery(remotePrefixUrl) + if err != nil { + return + } + + remotePrefixUrl.Scheme = remotePrefixScheme + + // This is for condor cases: + remotePrefixScheme, _ = getTokenName(remotePrefixUrl) + + // Check if we understand the found url scheme + err = schemeUnderstood(remotePrefixScheme) + if err != nil { + return nil, err + } + + success := false + + te, err := NewTransferEngine(ctx) + if err != nil { + return nil, err + } + + defer func() { + if err := te.Shutdown(); err != nil { + log.Errorln("Failure when shutting down transfer engine:", err) + } + }() + tc, err := te.NewClient(options...) + if err != nil { + return + } + tj, err := tc.NewPrestageJob(context.Background(), remotePrefixUrl) + if err != nil { + return + } + err = tc.Submit(tj) + if err != nil { + return + } + + transferResults, err = tc.Shutdown() + if err == nil { + if tj.lookupErr == nil { + success = true + } else { + err = tj.lookupErr + } + } + var downloaded int64 = 0 + for _, result := range transferResults { + downloaded += result.TransferredBytes + if err == nil && result.Error != nil { + success = false + err = result.Error + } + } + + if success { + // Get the final size of the download file + } else { + log.Error("Prestage failed:", err) + } + + if !success { + // If there's only a single transfer error, remove the wrapping to provide + // a simpler error message. Results in: + // failed download from local-cache: server returned 404 Not Found + // versus: + // failed to download file: transfer error: failed download from local-cache: server returned 404 Not Found + var te *TransferErrors + if errors.As(err, &te) { + if len(te.Unwrap()) == 1 { + var tae *TransferAttemptError + if errors.As(te.Unwrap()[0], &tae) { + return nil, tae + } else { + return nil, errors.Wrap(err, "failed to prestage file") + } + } + return nil, te + } + return nil, errors.Wrap(err, "failed to prestage file") + } else { + return transferResults, err + } +} diff --git a/client/resources/both-auth.yml b/client/resources/both-auth.yml index c23eb843a..4a0edd67b 100644 --- a/client/resources/both-auth.yml +++ b/client/resources/both-auth.yml @@ -1,5 +1,8 @@ # Origin export configuration to test full multi-export capabilities +Cache: + EnablePrefetch: false + Origin: # Things that configure the origin itself StorageType: "posix" diff --git a/cmd/object_ls.go b/cmd/object_ls.go index 003623403..566b17352 100644 --- a/cmd/object_ls.go +++ b/cmd/object_ls.go @@ -45,9 +45,10 @@ var ( func init() { flagSet := lsCmd.Flags() flagSet.StringP("token", "t", "", "Token file to use for transfer") + flagSet.StringP("collections-url", "", "", "URL to use for collection listing, overriding the director's response") flagSet.BoolP("long", "l", false, "Include extended information") - flagSet.BoolP("collectionOnly", "C", false, "List collections only") - flagSet.BoolP("objectonly", "O", false, "List objects only") + flagSet.BoolP("collection-only", "C", false, "List collections only") + flagSet.BoolP("object-only", "O", false, "List objects only") flagSet.BoolP("json", "j", false, "Print results in JSON format") objectCmd.AddCommand(lsCmd) @@ -82,16 +83,17 @@ func listMain(cmd *cobra.Command, args []string) error { log.Debugln("Location:", object) long, _ := cmd.Flags().GetBool("long") - collectionOnly, _ := cmd.Flags().GetBool("collectionOnly") - objectOnly, _ := cmd.Flags().GetBool("objectonly") + collectionOnly, _ := cmd.Flags().GetBool("collection-only") + objectOnly, _ := cmd.Flags().GetBool("object-only") asJSON, _ := cmd.Flags().GetBool("json") + collectionsUrl, _ := cmd.Flags().GetString("collections-url") if collectionOnly && objectOnly { // If a user specifies collectionOnly and objectOnly, this means basic functionality (list both objects and directories) so just remove the flags return errors.New("cannot specify both collectionOnly (-C) and object only (-O) flags, as they are mutually exclusive") } - fileInfos, err := client.DoList(ctx, object, client.WithTokenLocation(tokenLocation)) + fileInfos, err := client.DoList(ctx, object, client.WithTokenLocation(tokenLocation), client.WithCollectionsUrl(collectionsUrl)) // Exit with failure if err != nil { diff --git a/cmd/object_prestage.go b/cmd/object_prestage.go new file mode 100644 index 000000000..e5e016b75 --- /dev/null +++ b/cmd/object_prestage.go @@ -0,0 +1,138 @@ +/*************************************************************** +* +* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research +* +* Licensed under the Apache License, Version 2.0 (the "License"); you +* may not use this file except in compliance with the License. You may +* obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +***************************************************************/ + +package main + +import ( + "net/url" + "os" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/pelicanplatform/pelican/client" + "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/error_codes" + "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/utils" +) + +var ( + prestageCmd = &cobra.Command{ + Use: "prestage {source ...} {destination}", + Short: "Prestages a prefix to a Pelican cache", + Hidden: true, // Until we decide how safe this approach is, keep the command hidden. + Run: prestageMain, + } +) + +func init() { + flagSet := prestageCmd.Flags() + flagSet.StringP("cache", "c", "", "Cache to use") + flagSet.StringP("token", "t", "", "Token file to use for transfer") + objectCmd.AddCommand(prestageCmd) +} + +func prestageMain(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + + err := config.InitClient() + if err != nil { + log.Errorln(err) + + if client.IsRetryable(err) { + log.Errorln("Errors are retryable") + os.Exit(11) + } else { + os.Exit(1) + } + } + + tokenLocation, _ := cmd.Flags().GetString("token") + + pb := newProgressBar() + defer pb.shutdown() + + if fileInfo, _ := os.Stdout.Stat(); (fileInfo.Mode()&os.ModeCharDevice) != 0 && param.Logging_LogLocation.GetString() == "" && !param.Logging_DisableProgressBars.GetBool() { + pb.launchDisplay(ctx) + } + + if len(args) < 1 { + log.Errorln("Prefix(es) to prestage must be specified") + err = cmd.Help() + if err != nil { + log.Errorln("Failed to print out help:", err) + } + os.Exit(1) + } + + log.Debugln("Prestage prefixes:", args) + + // Check for manually entered cache to use + var preferredCache string + if nearestCache, ok := os.LookupEnv("NEAREST_CACHE"); ok { + preferredCache = nearestCache + } else if cache, _ := cmd.Flags().GetString("cache"); cache != "" { + preferredCache = cache + } + var caches []*url.URL + caches, err = utils.GetPreferredCaches(preferredCache) + if err != nil { + log.Errorln(err) + os.Exit(1) + } + + lastSrc := "" + + for _, src := range args { + if !isPelicanUrl(src) { + log.Errorln("Provided URL is not a valid Pelican URL:", src) + os.Exit(1) + } + if _, err = client.DoPrestage(ctx, src, + client.WithCallback(pb.callback), client.WithTokenLocation(tokenLocation), + client.WithCaches(caches...)); err != nil { + lastSrc = src + break + } + } + + // Exit with failure + if err != nil { + // Print the list of errors + errMsg := err.Error() + var pe error_codes.PelicanError + var te *client.TransferErrors + if errors.As(err, &te) { + errMsg = te.UserError() + } + if errors.Is(err, &pe) { + errMsg = pe.Error() + log.Errorln("Failure prestaging " + lastSrc + ": " + errMsg) + os.Exit(pe.ExitCode()) + } else { // For now, keeping this else here to catch any errors that are not classified PelicanErrors + log.Errorln("Failure prestaging " + lastSrc + ": " + errMsg) + if client.ShouldRetry(err) { + log.Errorln("Errors are retryable") + os.Exit(11) + } + os.Exit(1) + } + } +} diff --git a/config/resources/defaults.yaml b/config/resources/defaults.yaml index dad7b706e..38fb0f6f5 100644 --- a/config/resources/defaults.yaml +++ b/config/resources/defaults.yaml @@ -56,6 +56,7 @@ Director: EnableBroker: true EnableStat: true Cache: + EnablePrefetch: true Port: 8442 SelfTest: true SelfTestInterval: 15s diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 12bececec..9d33e8b07 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1207,6 +1207,17 @@ type: stringSlice default: none components: ["cache"] --- +name: Cache.EnablePrefetch +description: |+ + Control whether data prefeteching is enabled in the cache. + + This is provided solely for testing purposes and is not advised to be disabled + in production +type: bool +default: true +hidden: true +components: ["cache"] +--- name: Cache.SelfTest description: |+ A bool indicating whether the cache should perform self health checks. diff --git a/xrootd/resources/xrootd-cache.cfg b/xrootd/resources/xrootd-cache.cfg index 1a2fa11cf..0d45b2054 100644 --- a/xrootd/resources/xrootd-cache.cfg +++ b/xrootd/resources/xrootd-cache.cfg @@ -53,7 +53,11 @@ pfc.trace info xrootd.tls all xrd.network nodnr pfc.blocksize 128k +{{if .Cache.EnablePrefetch}} pfc.prefetch 20 +{{else}} +pfc.prefetch 0 +{{- end}} pfc.writequeue 16 4 pfc.ram 4g pfc.diskusage {{if .Cache.LowWatermark}}{{.Cache.LowWatermark}}{{else}}0.90{{end}} {{if .Cache.HighWaterMark}}{{.Cache.HighWaterMark}}{{else}}0.95{{end}} purgeinterval 300s diff --git a/xrootd/xrootd_config.go b/xrootd/xrootd_config.go index 7b8382a66..54e73fb24 100644 --- a/xrootd/xrootd_config.go +++ b/xrootd/xrootd_config.go @@ -107,6 +107,7 @@ type ( CacheConfig struct { UseCmsd bool + EnablePrefetch bool EnableVoms bool CalculatedPort string HighWaterMark string