Skip to content

Commit

Permalink
Post rebase fixes for sync functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
jhiemstrawisc committed Sep 30, 2024
1 parent 82b159b commit 5ffc8d1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 50 deletions.
88 changes: 45 additions & 43 deletions client/fed_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ func TestRecursiveUploadsAndDownloadsWithQuery(t *testing.T) {
server_utils.ResetOriginExports()

fed := fed_test_utils.NewFedTest(t, mixedAuthOriginCfg)
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
require.NoError(t, err)

te, err := client.NewTransferEngine(fed.Ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -327,20 +329,19 @@ func TestRecursiveUploadsAndDownloadsWithQuery(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s?recursive", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, "osdf_osdf", dirName)
uploadUrl := fmt.Sprintf("pelican://%s%s/%s/%s?recursive", discoveryUrl.Host, export.FederationPrefix, "osdf_osdf", dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, false, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, false, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
var transferDetailsDownload []client.TransferResults
if export.Capabilities.PublicReads {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false)
} else {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
}
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
Expand All @@ -356,20 +357,19 @@ func TestRecursiveUploadsAndDownloadsWithQuery(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s?recursive=true", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, "osdf_osdf", dirName)
uploadUrl := fmt.Sprintf("pelican://%s%s/%s/%s?recursive=true", discoveryUrl.Host, export.FederationPrefix, "osdf_osdf", dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, false, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, false, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
var transferDetailsDownload []client.TransferResults
if export.Capabilities.PublicReads {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false)
} else {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
}
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
Expand All @@ -385,20 +385,19 @@ func TestRecursiveUploadsAndDownloadsWithQuery(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s?recursive=false", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, "osdf_osdf", dirName)
uploadUrl := fmt.Sprintf("pelican://%s%s/%s/%s?recursive=false", discoveryUrl.Host, export.FederationPrefix, "osdf_osdf", dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, false, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, false, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
var transferDetailsDownload []client.TransferResults
if export.Capabilities.PublicReads {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false)
} else {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
}
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
Expand All @@ -414,20 +413,19 @@ func TestRecursiveUploadsAndDownloadsWithQuery(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s?recursive&directread", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
export.FederationPrefix, "osdf_osdf", dirName)
uploadUrl := fmt.Sprintf("pelican://%s%s/%s/%s?recursive&directread", discoveryUrl.Host, export.FederationPrefix, "osdf_osdf", dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, false, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, false, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
var transferDetailsDownload []client.TransferResults
if export.Capabilities.PublicReads {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false)
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false)
} else {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), false, client.WithTokenLocation(tempToken.Name()))
}
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
Expand Down Expand Up @@ -467,6 +465,8 @@ func TestSyncUpload(t *testing.T) {
server_utils.ResetOriginExports()

fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
require.NoError(t, err)

// Create a token file
issuer, err := config.GetServerIssuerURL()
Expand Down Expand Up @@ -525,31 +525,31 @@ func TestSyncUpload(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)
uploadUrl := fmt.Sprintf("pelican://%s/first/namespace/sync_upload/%s", discoveryUrl.Host, dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
})

t.Run("testSyncUploadNone", func(t *testing.T) {
// Set path for object to upload/download
dirName := filepath.Base(tempDir)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_none/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)
uploadUrl := fmt.Sprintf("pelican://%s/first/namespace/sync_upload_none/%s", discoveryUrl.Host, dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Synchronize the uploaded files again.
transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)

// Should have already been uploaded once
Expand All @@ -559,11 +559,11 @@ func TestSyncUpload(t *testing.T) {
t.Run("testSyncUploadPartial", func(t *testing.T) {
// Set path for object to upload/download
dirName := filepath.Base(tempDir)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_partial/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)
uploadInnerURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_partial/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName, filepath.Base(innerTempDir))
uploadUrl := fmt.Sprintf("pelican://%s/first/namespace/sync_upload_partial/%s", discoveryUrl.Host, dirName)
uploadInnerUrl := fmt.Sprintf("pelican://%s/first/namespace/sync_upload_partial/%s/%s", discoveryUrl.Host, dirName, filepath.Base(innerTempDir))

// Upload some files with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, innerTempDir, uploadInnerURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsUpload, err := client.DoPut(fed.Ctx, innerTempDir, uploadInnerUrl, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
require.Len(t, transferDetailsUpload, 1)

Expand All @@ -573,13 +573,13 @@ func TestSyncUpload(t *testing.T) {
require.NoError(t, err)

// Upload again; this time there should be fewer uploads as the subdir was already moved.
transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
require.Len(t, transferDetailsUpload, 2)

// Download all the objects
downloadDir := t.TempDir()
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, downloadDir, true, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadUrl, downloadDir, true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)

Expand All @@ -596,6 +596,8 @@ func TestSyncDownload(t *testing.T) {
server_utils.ResetOriginExports()

fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
require.NoError(t, err)

// Create a token file
issuer, err := config.GetServerIssuerURL()
Expand Down Expand Up @@ -653,16 +655,16 @@ func TestSyncDownload(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)
uploadUrl := fmt.Sprintf("pelican://%s/first/namespace/sync_download/%s", discoveryUrl.Host, dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

t.Run("testSyncDownloadFull", func(t *testing.T) {
// Download the files we just uploaded
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
})
Expand All @@ -672,12 +674,12 @@ func TestSyncDownload(t *testing.T) {
dirName := t.TempDir()

// Synchronize the uploaded files to a local directory
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadUrl, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)

// Synchronize the files again; should result in no transfers
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
assert.NoError(t, err)
assert.Len(t, transferDetailsDownload, 0)
})
Expand All @@ -686,17 +688,17 @@ func TestSyncDownload(t *testing.T) {
// Set path for object to upload/download
downloadDir := t.TempDir()
dirName := filepath.Base(tempDir)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download_partial/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)
uploadInnerURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download_partial/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName, filepath.Base(innerTempDir))
uploadUrl = fmt.Sprintf("pelican://%s/first/namespace/sync_download_partial/%s", discoveryUrl.Host, dirName)
uploadInnerUrl := fmt.Sprintf("pelican://%s/first/namespace/sync_download_partial/%s/%s", discoveryUrl.Host, dirName, filepath.Base(innerTempDir))

// Upload the initial files
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the inner directory
innerDownloadDir := filepath.Join(downloadDir, dirName, filepath.Base(innerTempDir))
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadInnerURL, innerDownloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadInnerUrl, innerDownloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
require.Len(t, transferDetailsDownload, 1)

Expand All @@ -705,12 +707,12 @@ func TestSyncDownload(t *testing.T) {
newTestFileContent := "XXXX content is within another XXXX"
err = os.WriteFile(innerTempFile.Name(), []byte(newTestFileContent), os.FileMode(0755))
require.NoError(t, err)
transferDetailsUpload, err = client.DoPut(fed.Ctx, innerTempDir, uploadInnerURL, true, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err = client.DoPut(fed.Ctx, innerTempDir, uploadInnerUrl, true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
require.Len(t, transferDetailsUpload, 1)

// Download all the objects
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
assert.Len(t, transferDetailsDownload, 2)

Expand All @@ -727,7 +729,7 @@ func TestSyncDownload(t *testing.T) {
err = os.WriteFile(innerDownloadFile, []byte("XXXX"), os.FileMode(0755))
require.NoError(t, err)
log.Debugln("Re-downloading file direct from origin")
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL+"?directread", downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl+"?directread", downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
assert.Len(t, transferDetailsDownload, 1)
contentBytes, err = os.ReadFile(filepath.Join(innerDownloadDir, filepath.Base(innerTempFile.Name())))
Expand Down
Loading

0 comments on commit 5ffc8d1

Please sign in to comment.