Skip to content

Commit

Permalink
Add support for prestaging data to a cache
Browse files Browse the repository at this point in the history
The new `pelican object prestage` command (and the corresponding
public APIs) allow a user to trigger the prestaging of data to a
specific cache.

The prestage itself is not intelligent: it checks to see if a file
is already present and, if not, it triggers a full download.  Future
work may extend this on the Pelican-side so there's a control
interface exposed that triggers prestaging without data movement all
the way to the client.

Includes unit tests and some modest refactoring.  Given the current
LIGO origins don't have the collections URL set (and I wanted to
test it out), this also provides the ability to override the
collections URL on the CLI.
  • Loading branch information
bbockelm committed Sep 29, 2024
1 parent d8ec81f commit c9efbab
Show file tree
Hide file tree
Showing 12 changed files with 815 additions and 85 deletions.
11 changes: 11 additions & 0 deletions client/acquire_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ func (tg *tokenGenerator) SetToken(contents string) {
tg.Token.Store(&info)
}

// Copy the contents
func (tg *tokenGenerator) Copy() *tokenGenerator {
return &tokenGenerator{
DirResp: tg.DirResp,
Destination: tg.Destination,
IsWrite: tg.IsWrite,
EnableAcquire: tg.EnableAcquire,
Sync: new(singleflight.Group),
}
}

// Determine the token name if it is embedded in the scheme, Condor-style
func getTokenName(destination *url.URL) (scheme, tokenName string) {
schemePieces := strings.Split(destination.Scheme, "+")
Expand Down
78 changes: 78 additions & 0 deletions client/fed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,81 @@ func TestClientUnpack(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, int64(11), fi.Size())
}

// A test that spins up a federation, and tests object get and put
func TestPrestage(t *testing.T) {
viper.Reset()
server_utils.ResetOriginExports()
fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)

te, err := client.NewTransferEngine(fed.Ctx)
require.NoError(t, err)

// Other set-up items:
// The cache will open the file to stat it, downloading the first block.
// Make sure we are greater than 64kb in size.
testFileContent := strings.Repeat("test file content", 10000)
// Create the temporary file to upload
tempFile, err := os.CreateTemp(t.TempDir(), "test")
assert.NoError(t, err, "Error creating temp file")
defer os.Remove(tempFile.Name())
_, err = tempFile.WriteString(testFileContent)
assert.NoError(t, err, "Error writing to temp file")
tempFile.Close()

tempToken, _ := getTempToken(t)
defer tempToken.Close()
defer os.Remove(tempToken.Name())
// Disable progress bars to not reuse the same mpb instance
viper.Set("Logging.DisableProgressBars", true)

oldPref, err := config.SetPreferredPrefix(config.PelicanPrefix)
assert.NoError(t, err)
defer func() {
_, err := config.SetPreferredPrefix(oldPref)
require.NoError(t, err)
}()

// Set path for object to upload/download
for _, export := range fed.Exports {
tempPath := tempFile.Name()
fileName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/prestage/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, fileName)

// Upload the file with COPY
transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name()))
assert.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), transferResultsUpload[0].TransferredBytes)

// Check the cache info twice, make sure it's not cached.
tc, err := te.NewClient(client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
innerFileUrl, err := url.Parse(uploadURL)
require.NoError(t, err)
age, size, err := tc.CacheInfo(fed.Ctx, innerFileUrl)
require.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), size)
assert.Equal(t, -1, age)

age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
require.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), size)
assert.Equal(t, -1, age)

// Prestage the object
tj, err := tc.NewPrestageJob(fed.Ctx, innerFileUrl)
require.NoError(t, err)
err = tc.Submit(tj)
require.NoError(t, err)
results, err := tc.Shutdown()
require.NoError(t, err)
assert.Equal(t, 1, len(results))

// Check if object is cached.
age, size, err = tc.CacheInfo(fed.Ctx, innerFileUrl)
require.NoError(t, err)
assert.Equal(t, int64(len(testFileContent)), size)
require.NotEqual(t, -1, age)
}
}
Loading

0 comments on commit c9efbab

Please sign in to comment.