From 16c583f3e27c3e1716907ebec90dcdecb11bae55 Mon Sep 17 00:00:00 2001 From: Joe Reuss Date: Fri, 1 Mar 2024 10:11:02 -0600 Subject: [PATCH] Remove use of global fed metadata within client This is v2 of PR #798 since the merge conflicts were becoming very tedious. Therefore, I took the changes from that PR and converted it over to a new branch. You can get the changes split up a little better from that PR as well but I will do my best to list them here: Changes include: - Added a ttl cache for storing federation url metadata so we do not have to look up each time - Improved unit tests for client commands, Note: for pelican prefix and osdf:// url scheme, it's hard to test since we use osg-htc.org for metadata lookup. Therefore it is hard to work with fed in a box so I just check for the proper metadata lookup - Added function called discoverUrlFederation which discovers a federation from a url and does not populate global metadata config values - Added a function called schemeUnderstood to take some repeated code within client into one function to check for understood url schemes - Added unit tests for discoverUrlFederation and schemeUnderstood --- client/fed_test.go | 489 ++++++++++++++------------------- client/handle_http.go | 160 ++++++++--- client/handle_http_test.go | 121 ++++++++ client/main.go | 162 ++++------- client/main_test.go | 33 +++ cmd/object_copy.go | 4 + cmd/object_get.go | 4 + cmd/object_put.go | 4 + cmd/plugin.go | 2 +- config/config.go | 138 ++++++---- config/config_test.go | 32 +++ github_scripts/get_put_test.sh | 5 +- local_cache/local_cache.go | 2 +- 13 files changed, 659 insertions(+), 497 deletions(-) diff --git a/client/fed_test.go b/client/fed_test.go index 3f87ae178..2052541f4 100644 --- a/client/fed_test.go +++ b/client/fed_test.go @@ -23,10 +23,8 @@ package client_test import ( "context" _ "embed" - "encoding/json" "fmt" - "io" - "net/http" + "net/url" "os" "path" "path/filepath" @@ -34,7 +32,6 @@ import ( "testing" "time" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/stretchr/testify/assert" @@ -44,9 +41,7 @@ import ( "github.com/pelicanplatform/pelican/common" "github.com/pelicanplatform/pelican/config" "github.com/pelicanplatform/pelican/fed_test_utils" - "github.com/pelicanplatform/pelican/launchers" "github.com/pelicanplatform/pelican/param" - "github.com/pelicanplatform/pelican/server_utils" "github.com/pelicanplatform/pelican/test_utils" "github.com/pelicanplatform/pelican/token" "github.com/pelicanplatform/pelican/token_scopes" @@ -63,168 +58,6 @@ var ( mixedAuthOriginCfg string ) -func generateFileTestScitoken() (string, error) { - // Issuer is whichever server that initiates the test, so it's the server itself - issuerUrl, err := config.GetServerIssuerURL() - if err != nil { - return "", err - } - if issuerUrl == "" { // if empty, then error - return "", errors.New("Failed to create token: Invalid iss, Server_ExternalWebUrl is empty") - } - - fTestTokenCfg := token.NewWLCGToken() - fTestTokenCfg.Lifetime = time.Minute - fTestTokenCfg.Issuer = issuerUrl - fTestTokenCfg.Subject = "origin" - fTestTokenCfg.AddAudiences(config.GetServerAudience()) - fTestTokenCfg.AddResourceScopes(token_scopes.NewResourceScope(token_scopes.Storage_Read, "/"), - token_scopes.NewResourceScope(token_scopes.Storage_Modify, "/")) - - // CreateToken also handles validation for us - tok, err := fTestTokenCfg.CreateToken() - if err != nil { - return "", errors.Wrap(err, "failed to create file test token:") - } - - return tok, nil -} - -func TestFullUpload(t *testing.T) { - // Setup our test federation - ctx, cancel, egrp := test_utils.TestContext(context.Background(), t) - defer func() { require.NoError(t, egrp.Wait()) }() - defer cancel() - - viper.Reset() - common.ResetOriginExports() - defer viper.Reset() - defer common.ResetOriginExports() - - modules := config.ServerType(0) - modules.Set(config.OriginType) - modules.Set(config.DirectorType) - modules.Set(config.RegistryType) - - // Create our own temp directory (for some reason t.TempDir() does not play well with xrootd) - tmpPathPattern := "XRootD-Test_Origin*" - tmpPath, err := os.MkdirTemp("", tmpPathPattern) - require.NoError(t, err) - - // Need to set permissions or the xrootd process we spawn won't be able to write PID/UID files - permissions := os.FileMode(0755) - err = os.Chmod(tmpPath, permissions) - require.NoError(t, err) - - viper.Set("ConfigDir", tmpPath) - - // Increase the log level; otherwise, its difficult to debug failures - viper.Set("Logging.Level", "Debug") - config.InitConfig() - - originDir, err := os.MkdirTemp("", "Origin") - assert.NoError(t, err) - - // Change the permissions of the temporary directory - permissions = os.FileMode(0777) - err = os.Chmod(originDir, permissions) - require.NoError(t, err) - - viper.Set("Origin.FederationPrefix", "/test") - viper.Set("Origin.StoragePrefix", originDir) - viper.Set("Origin.StorageType", "posix") - // Disable functionality we're not using (and is difficult to make work on Mac) - viper.Set("Origin.EnableCmsd", false) - viper.Set("Origin.EnableMacaroons", false) - viper.Set("Origin.EnableVoms", false) - viper.Set("Origin.EnableWrites", true) - viper.Set("TLSSkipVerify", true) - viper.Set("Server.EnableUI", false) - viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite")) - viper.Set("Origin.RunLocation", tmpPath) - viper.Set("Registry.RequireOriginApproval", false) - viper.Set("Registry.RequireCacheApproval", false) - viper.Set("Logging.Origin.Scitokens", "debug") - viper.Set("Origin.Port", 0) - viper.Set("Server.WebPort", 0) - - err = config.InitServer(ctx, modules) - require.NoError(t, err) - - fedCancel, err := launchers.LaunchModules(ctx, modules) - defer fedCancel() - if err != nil { - log.Errorln("Failure in fedServeInternal:", err) - require.NoError(t, err) - } - - desiredURL := param.Server_ExternalWebUrl.GetString() + "/api/v1.0/health" - err = server_utils.WaitUntilWorking(ctx, "GET", desiredURL, "director", 200) - require.NoError(t, err) - - httpc := http.Client{ - Transport: config.GetTransport(), - } - resp, err := httpc.Get(desiredURL) - require.NoError(t, err) - - assert.Equal(t, resp.StatusCode, http.StatusOK) - - responseBody, err := io.ReadAll(resp.Body) - require.NoError(t, err) - expectedResponse := struct { - Msg string `json:"message"` - }{} - err = json.Unmarshal(responseBody, &expectedResponse) - require.NoError(t, err) - - assert.NotEmpty(t, expectedResponse.Msg) - - t.Run("testFullUpload", func(t *testing.T) { - testFileContent := "test file content" - - // Create the temporary file to upload - tempFile, err := os.CreateTemp(t.TempDir(), "test") - assert.NoError(t, err, "Error creating temp file") - defer os.Remove(tempFile.Name()) - _, err = tempFile.WriteString(testFileContent) - assert.NoError(t, err, "Error writing to temp file") - tempFile.Close() - - // Create a token file - token, err := generateFileTestScitoken() - assert.NoError(t, err) - tempToken, err := os.CreateTemp(t.TempDir(), "token") - assert.NoError(t, err, "Error creating temp token file") - defer os.Remove(tempToken.Name()) - _, err = tempToken.WriteString(token) - assert.NoError(t, err, "Error writing to temp token file") - tempToken.Close() - - // Upload the file - tempPath := tempFile.Name() - fileName := filepath.Base(tempPath) - uploadURL := "stash:///test/" + fileName - - transferResults, err := client.DoCopy(ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) - assert.NoError(t, err, "Error uploading file") - assert.Equal(t, int64(len(testFileContent)), transferResults[0].TransferredBytes, "Uploaded file size does not match") - - // Upload an osdf file - uploadURL = "pelican:///test/stuff/blah.txt" - assert.NoError(t, err, "Error parsing upload URL") - transferResults, err = client.DoCopy(ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) - assert.NoError(t, err, "Error uploading file") - assert.Equal(t, int64(len(testFileContent)), transferResults[0].TransferredBytes, "Uploaded file size does not match") - }) - t.Cleanup(func() { - os.RemoveAll(tmpPath) - os.RemoveAll(originDir) - }) - - viper.Reset() -} - // A test that spins up a federation, and tests object get and put func TestGetAndPutAuth(t *testing.T) { viper.Reset() @@ -273,7 +106,7 @@ func TestGetAndPutAuth(t *testing.T) { // This tests object get/put with a pelican:// url t.Run("testPelicanObjectPutAndGetWithPelicanUrl", func(t *testing.T) { - config.SetPreferredPrefix("pelican") + config.SetPreferredPrefix("PELICAN") // Set path for object to upload/download for _, export := range *fed.Exports { tempPath := tempFile.Name() @@ -298,33 +131,27 @@ func TestGetAndPutAuth(t *testing.T) { // This tests pelican object get/put with an osdf url t.Run("testPelicanObjectPutAndGetWithOSDFUrl", func(t *testing.T) { - config.SetPreferredPrefix("pelican") - for _, export := range *fed.Exports { - // Set path for object to upload/download - tempPath := tempFile.Name() - fileName := filepath.Base(tempPath) - // Minimal fix of test as it is soon to be replaced - uploadURL := fmt.Sprintf("pelican://%s/%s", export.FederationPrefix, fileName) + config.SetPreferredPrefix("PELICAN") + // Set path for object to upload/download + tempPath := tempFile.Name() + fileName := filepath.Base(tempPath) + uploadStr := "osdf:///test/" + fileName + uploadURL, err := url.Parse(uploadStr) + assert.NoError(t, err) - // Upload the file with PUT - transferResultsUpload, err := client.DoPut(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) - assert.NoError(t, err) - if err == nil { - assert.Equal(t, transferResultsUpload[0].TransferredBytes, int64(17)) - } + // For OSDF url's, we don't want to rely on osdf metadata to be running since we manually discover "osg-htc.org" therefore, just ensure we get correct metadata for the url: + pelicanURL, err := client.NewPelicanURL(uploadURL, "osdf") + assert.NoError(t, err) - // Download that same file with GET - transferResultsDownload, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name())) - assert.NoError(t, err) - if err == nil { - assert.Equal(t, transferResultsDownload[0].TransferredBytes, transferResultsUpload[0].TransferredBytes) - } - } + // Check valid metadata: + assert.Equal(t, "https://osdf-director.osg-htc.org", pelicanURL.DirectorUrl) + assert.Equal(t, "https://osdf-registry.osg-htc.org", pelicanURL.RegistryUrl) + assert.Equal(t, "osg-htc.org", pelicanURL.DiscoveryUrl) }) // This tests object get/put with a pelican:// url t.Run("testOsdfObjectPutAndGetWithPelicanUrl", func(t *testing.T) { - config.SetPreferredPrefix("osdf") + config.SetPreferredPrefix("OSDF") for _, export := range *fed.Exports { // Set path for object to upload/download tempPath := tempFile.Name() @@ -349,13 +176,13 @@ func TestGetAndPutAuth(t *testing.T) { // This tests pelican object get/put with an osdf url t.Run("testOsdfObjectPutAndGetWithOSDFUrl", func(t *testing.T) { - config.SetPreferredPrefix("osdf") + config.SetPreferredPrefix("OSDF") for _, export := range *fed.Exports { // Set path for object to upload/download tempPath := tempFile.Name() fileName := filepath.Base(tempPath) // Minimal fix of test as it is soon to be replaced - uploadURL := fmt.Sprintf("pelican://%s/%s", export.FederationPrefix, fileName) + uploadURL := fmt.Sprintf("osdf://%s/%s", export.FederationPrefix, fileName) // Upload the file with PUT transferResultsUpload, err := client.DoPut(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) @@ -374,6 +201,151 @@ func TestGetAndPutAuth(t *testing.T) { }) } +// A test that spins up a federation, and tests object get and put +func TestCopyAuth(t *testing.T) { + viper.Reset() + fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg) + + // Other set-up items: + testFileContent := "test file content" + // Create the temporary file to upload + tempFile, err := os.CreateTemp(t.TempDir(), "test") + assert.NoError(t, err, "Error creating temp file") + defer os.Remove(tempFile.Name()) + _, err = tempFile.WriteString(testFileContent) + assert.NoError(t, err, "Error writing to temp file") + tempFile.Close() + + issuer, err := config.GetServerIssuerURL() + require.NoError(t, err) + audience := config.GetServerAudience() + + // Create a token file + tokenConfig := token.NewWLCGToken() + tokenConfig.Lifetime = time.Minute + tokenConfig.Issuer = issuer + tokenConfig.Subject = "origin" + tokenConfig.AddAudiences(audience) + + scopes := []token_scopes.TokenScope{} + readScope, err := token_scopes.Storage_Read.Path("/") + assert.NoError(t, err) + scopes = append(scopes, readScope) + modScope, err := token_scopes.Storage_Modify.Path("/") + assert.NoError(t, err) + scopes = append(scopes, modScope) + tokenConfig.AddScopes(scopes...) + token, err := tokenConfig.CreateToken() + assert.NoError(t, err) + tempToken, err := os.CreateTemp(t.TempDir(), "token") + assert.NoError(t, err, "Error creating temp token file") + defer os.Remove(tempToken.Name()) + _, err = tempToken.WriteString(token) + assert.NoError(t, err, "Error writing to temp token file") + tempToken.Close() + // Disable progress bars to not reuse the same mpb instance + viper.Set("Logging.DisableProgressBars", true) + + // This tests object get/put with a pelican:// url + t.Run("testPelicanObjectCopyWithPelicanUrl", func(t *testing.T) { + config.SetPreferredPrefix("PELICAN") + // Set path for object to upload/download + tempPath := tempFile.Name() + fileName := filepath.Base(tempPath) + hostname := fmt.Sprintf("%v:%v", param.Server_WebHost.GetString(), param.Server_WebPort.GetInt()) + uploadURL := "pelican://" + hostname + "/test/" + fileName + + // Upload the file with PUT + transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) + assert.NoError(t, err) + if err == nil { + assert.Equal(t, transferResultsUpload[0].TransferredBytes, int64(17)) + } + + // Download that same file with GET + transferResultsDownload, err := client.DoCopy(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name())) + assert.NoError(t, err) + if err == nil { + assert.Equal(t, transferResultsDownload[0].TransferredBytes, transferResultsUpload[0].TransferredBytes) + } + }) + + // This tests pelican object get/put with an osdf url + t.Run("testPelicanObjectCopyWithOSDFUrl", func(t *testing.T) { + config.SetPreferredPrefix("PELICAN") + // Set path for object to upload/download + tempPath := tempFile.Name() + fileName := filepath.Base(tempPath) + uploadStr := "osdf:///test/" + fileName + uploadURL, err := url.Parse(uploadStr) + assert.NoError(t, err) + + // For OSDF url's, we don't want to rely on osdf metadata to be running since we manually discover "osg-htc.org" therefore, just ensure we get correct metadata for the url: + pelicanURL, err := client.NewPelicanURL(uploadURL, "osdf") + assert.NoError(t, err) + + // Check valid metadata: + assert.Equal(t, "https://osdf-director.osg-htc.org", pelicanURL.DirectorUrl) + assert.Equal(t, "https://osdf-registry.osg-htc.org", pelicanURL.RegistryUrl) + assert.Equal(t, "osg-htc.org", pelicanURL.DiscoveryUrl) + }) + + // This tests object get/put with a pelican:// url + t.Run("testOsdfObjectCopyWithPelicanUrl", func(t *testing.T) { + config.SetPreferredPrefix("OSDF") + // Set path for object to upload/download + tempPath := tempFile.Name() + fileName := filepath.Base(tempPath) + hostname := fmt.Sprintf("%v:%v", param.Server_WebHost.GetString(), param.Server_WebPort.GetInt()) + uploadURL := "pelican://" + hostname + "/test/" + fileName + + // Upload the file with PUT + transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) + assert.NoError(t, err) + if err == nil { + assert.Equal(t, transferResultsUpload[0].TransferredBytes, int64(17)) + } + + // Download that same file with GET + transferResultsDownload, err := client.DoCopy(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name())) + assert.NoError(t, err) + if err == nil { + assert.Equal(t, transferResultsDownload[0].TransferredBytes, transferResultsUpload[0].TransferredBytes) + } + }) + + // This tests pelican object get/put with an osdf url + t.Run("testOsdfObjectCopyWithOSDFUrl", func(t *testing.T) { + config.SetPreferredPrefix("OSDF") + // Set path for object to upload/download + tempPath := tempFile.Name() + fileName := filepath.Base(tempPath) + hostname := fmt.Sprintf("%v:%v", param.Server_WebHost.GetString(), param.Server_WebPort.GetInt()) + uploadURL := "osdf:///test/" + fileName + + // Set our metadata values in config since that is what this url scheme - prefix combo does in handle_http + metadata, err := config.DiscoverUrlFederation("https://" + hostname) + assert.NoError(t, err) + viper.Set("Federation.DirectorUrl", metadata.DirectorEndpoint) + viper.Set("Federation.RegistryUrl", metadata.NamespaceRegistrationEndpoint) + viper.Set("Federation.DiscoveryUrl", hostname) + + // Upload the file with PUT + transferResultsUpload, err := client.DoCopy(fed.Ctx, tempFile.Name(), uploadURL, false, client.WithTokenLocation(tempToken.Name())) + assert.NoError(t, err) + if err == nil { + assert.Equal(t, transferResultsUpload[0].TransferredBytes, int64(17)) + } + + // Download that same file with GET + transferResultsDownload, err := client.DoCopy(fed.Ctx, uploadURL, t.TempDir(), false, client.WithTokenLocation(tempToken.Name())) + assert.NoError(t, err) + if err == nil { + assert.Equal(t, transferResultsDownload[0].TransferredBytes, transferResultsUpload[0].TransferredBytes) + } + }) +} + // A test that spins up the federation, where the origin is in EnablePublicReads mode. Then GET a file from the origin without a token func TestGetPublicRead(t *testing.T) { ctx, _, _ := test_utils.TestContext(context.Background(), t) @@ -412,7 +384,6 @@ func TestGetPublicRead(t *testing.T) { func TestRecursiveUploadsAndDownloads(t *testing.T) { // Create instance of test federation - ctx, _, _ := test_utils.TestContext(context.Background(), t) viper.Reset() common.ResetOriginExports() @@ -467,97 +438,38 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) { tempFile2.Close() t.Run("testPelicanRecursiveGetAndPutOsdfURL", func(t *testing.T) { - config.SetPreferredPrefix("pelican") - for _, export := range *fed.Exports { - // Set path for object to upload/download - tempPath := tempDir - dirName := filepath.Base(tempPath) - // Note: minimally fixing this test as it is soon to be replaced - uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), - export.FederationPrefix, "pel_osdf", dirName) - - ////////////////////////////////////////////////////////// - - // Upload the file with PUT - transferDetailsUpload, err := client.DoPut(ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name())) - require.NoError(t, err) - if err == nil && len(transferDetailsUpload) == 2 { - countBytes17 := 0 - countBytes23 := 0 - // Verify we got the correct files back (have to do this since files upload in different orders at times) - for _, transfer := range transferDetailsUpload { - transferredBytes := transfer.TransferredBytes - switch transferredBytes { - case int64(17): - countBytes17++ - continue - case int64(23): - countBytes23++ - continue - default: - // We got a byte amount we are not expecting - t.Fatal("did not upload proper amount of bytes") - } - } - if countBytes17 != 1 || countBytes23 != 1 { - // We would hit this case if 1 counter got hit twice for some reason - t.Fatal("One of the files was not uploaded correctly") - } - } else if len(transferDetailsUpload) != 2 { - t.Fatalf("Amount of transfers results returned for upload was not correct. Transfer details returned: %d", len(transferDetailsUpload)) - } + config.SetPreferredPrefix("PELICAN") + // Set path for object to upload/download + tempPath := tempDir + dirName := filepath.Base(tempPath) + uploadStr := "osdf:///test/" + dirName + uploadURL, err := url.Parse(uploadStr) + assert.NoError(t, err) - // Download the files we just uploaded - var transferDetailsDownload []client.TransferResults - if export.Capabilities.PublicReads { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, t.TempDir(), true) - } else { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name())) - } - assert.NoError(t, err) - if err == nil && len(transferDetailsUpload) == 2 { - countBytesUploadIdx0 := 0 - countBytesUploadIdx1 := 0 - // Verify we got the correct files back (have to do this since files upload in different orders at times) - // In this case, we want to match them to the sizes of the uploaded files - for _, transfer := range transferDetailsUpload { - transferredBytes := transfer.TransferredBytes - switch transferredBytes { - case transferDetailsUpload[0].TransferredBytes: - countBytesUploadIdx0++ - continue - case transferDetailsUpload[1].TransferredBytes: - countBytesUploadIdx1++ - continue - default: - // We got a byte amount we are not expecting - t.Fatal("did not download proper amount of bytes") - } - } - if countBytesUploadIdx0 != 1 || countBytesUploadIdx1 != 1 { - // We would hit this case if 1 counter got hit twice for some reason - t.Fatal("One of the files was not downloaded correctly") - } else if len(transferDetailsDownload) != 2 { - t.Fatalf("Amount of transfers results returned for download was not correct. Transfer details returned: %d", len(transferDetailsDownload)) - } - } + // For OSDF url's, we don't want to rely on osdf metadata to be running therefore, just ensure we get correct metadata for the url: + pelicanURL, err := client.NewPelicanURL(uploadURL, "osdf") + assert.NoError(t, err) - } + // Check valid metadata: + assert.Equal(t, "https://osdf-director.osg-htc.org", pelicanURL.DirectorUrl) + assert.Equal(t, "https://osdf-registry.osg-htc.org", pelicanURL.RegistryUrl) + assert.Equal(t, "osg-htc.org", pelicanURL.DiscoveryUrl) }) t.Run("testPelicanRecursiveGetAndPutPelicanURL", func(t *testing.T) { - config.SetPreferredPrefix("pelican") + config.SetPreferredPrefix("PELICAN") for _, export := range *fed.Exports { // Set path for object to upload/download tempPath := tempDir dirName := filepath.Base(tempPath) - uploadURL := fmt.Sprintf("pelican://%s/%s/%s", export.FederationPrefix, "pel_pel", dirName) + uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), + export.FederationPrefix, "osdf_osdf", dirName) - ////////////////////////////////////////////////////////// + log.Errorf("\n\n\n%v\n\n\n", uploadURL) // Upload the file with PUT - transferDetailsUpload, err := client.DoPut(ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name())) + transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name())) assert.NoError(t, err) if err == nil && len(transferDetailsUpload) == 2 { countBytes17 := 0 @@ -588,9 +500,9 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) { // Download the files we just uploaded var transferDetailsDownload []client.TransferResults if export.Capabilities.PublicReads { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, t.TempDir(), true) + transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true) } else { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name())) + transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name())) } assert.NoError(t, err) if err == nil && len(transferDetailsUpload) == 2 { @@ -623,19 +535,26 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) { }) t.Run("testOsdfRecursiveGetAndPutOsdfURL", func(t *testing.T) { - config.SetPreferredPrefix("osdf") + config.SetPreferredPrefix("OSDF") for _, export := range *fed.Exports { // Set path for object to upload/download tempPath := tempDir dirName := filepath.Base(tempPath) // Note: minimally fixing this test as it is soon to be replaced - uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), + uploadURL := fmt.Sprintf("osdf:///%s/%s/%s", export.FederationPrefix, "osdf_osdf", dirName) - ////////////////////////////////////////////////////////// + hostname := fmt.Sprintf("%v:%v", param.Server_WebHost.GetString(), param.Server_WebPort.GetInt()) + + // Set our metadata values in config since that is what this url scheme - prefix combo does in handle_http + metadata, err := config.DiscoverUrlFederation("https://" + hostname) + assert.NoError(t, err) + viper.Set("Federation.DirectorUrl", metadata.DirectorEndpoint) + viper.Set("Federation.RegistryUrl", metadata.NamespaceRegistrationEndpoint) + viper.Set("Federation.DiscoveryUrl", hostname) // Upload the file with PUT - transferDetailsUpload, err := client.DoPut(ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name())) + transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name())) assert.NoError(t, err) if err == nil && len(transferDetailsUpload) == 2 { countBytes17 := 0 @@ -667,9 +586,9 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) { tmpDir := t.TempDir() var transferDetailsDownload []client.TransferResults if export.Capabilities.PublicReads { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, tmpDir, true) + transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, tmpDir, true) } else { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, tmpDir, true, client.WithTokenLocation(tempToken.Name())) + transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, tmpDir, true, client.WithTokenLocation(tempToken.Name())) } assert.NoError(t, err) if err == nil && len(transferDetailsDownload) == 2 { @@ -708,17 +627,17 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) { }) t.Run("testOsdfRecursiveGetAndPutPelicanURL", func(t *testing.T) { - config.SetPreferredPrefix("osdf") + config.SetPreferredPrefix("OSDF") for _, export := range *fed.Exports { // Set path for object to upload/download tempPath := tempDir dirName := filepath.Base(tempPath) - uploadURL := fmt.Sprintf("pelican://%s/%s/%s", export.FederationPrefix, "osdf_pel", dirName) + uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), + export.FederationPrefix, "osdf_osdf", dirName) - ////////////////////////////////////////////////////////// // Upload the file with PUT - transferDetailsUpload, err := client.DoPut(ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name())) + transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name())) assert.NoError(t, err) if err == nil && len(transferDetailsUpload) == 2 { countBytes17 := 0 @@ -749,9 +668,9 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) { // Download the files we just uploaded var transferDetailsDownload []client.TransferResults if export.Capabilities.PublicReads { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, t.TempDir(), true) + transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true) } else { - transferDetailsDownload, err = client.DoGet(ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name())) + transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name())) } assert.NoError(t, err) if err == nil && len(transferDetailsUpload) == 2 { diff --git a/client/handle_http.go b/client/handle_http.go index fc49b4be0..7965b0a23 100644 --- a/client/handle_http.go +++ b/client/handle_http.go @@ -41,11 +41,11 @@ import ( "github.com/VividCortex/ewma" "github.com/google/uuid" + "github.com/jellydator/ttlcache/v3" "github.com/lestrrat-go/option" "github.com/opensaucerer/grab/v3" "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/spf13/viper" "github.com/studio-b12/gowebdav" "github.com/vbauerster/mpb/v8" "golang.org/x/sync/errgroup" @@ -59,6 +59,27 @@ import ( var ( progressCtrOnce sync.Once progressCtr *mpb.Progress + + PelicanURLCache = ttlcache.New[string, PelicanURL]( + ttlcache.WithTTL[string, PelicanURL](30*time.Minute), + ttlcache.WithLoader[string, PelicanURL](loader), + ) + + loader = ttlcache.LoaderFunc[string, PelicanURL]( + func(c *ttlcache.Cache[string, PelicanURL], key string) *ttlcache.Item[string, PelicanURL] { + urlFederation, err := config.DiscoverUrlFederation(key) + if err != nil { + return nil + } + // Set our local url metadata + item := c.Set(key, PelicanURL{ + DirectorUrl: urlFederation.DirectorEndpoint, + DiscoveryUrl: key, + RegistryUrl: urlFederation.NamespaceRegistrationEndpoint, + }, ttlcache.DefaultTTL) + return item + }, + ) ) type ( @@ -233,6 +254,13 @@ type ( NeedsToken bool PackOption string } + + PelicanURL struct { + ObjectUrl *url.URL + DiscoveryUrl string + DirectorUrl string + RegistryUrl string + } ) const ( @@ -334,6 +362,86 @@ func (tr TransferResults) ID() string { return tr.jobId.String() } +func NewPelicanURL(remoteUrl *url.URL, scheme string) (pelicanURL PelicanURL, err error) { + if remoteUrl.Host != "" { + if scheme == "osdf" || scheme == "stash" { + // in the osdf/stash case, fix url's that have a hostname + remoteUrl.Path, err = url.JoinPath(remoteUrl.Host, remoteUrl.Path) + if err != nil { + log.Errorln("Failed to join remote destination url path:", err) + return PelicanURL{}, err + } + } else if scheme == "pelican" { + // If we have a host and url is pelican, we need to extract federation data from the host + log.Debugln("Detected pelican:// url, getting federation metadata from specified host") + federationUrl := &url.URL{} + // federationUrl, _ := url.Parse(remoteUrl.String()) + federationUrl.Scheme = "https" + federationUrl.Path = "" + federationUrl.Host = remoteUrl.Host + + // Check if cache has key of federationURL, if not, loader will add it: + pelicanUrlItem := PelicanURLCache.Get(federationUrl.String()) + if pelicanUrlItem != nil { + pelicanURL = pelicanUrlItem.Value() + } else { + return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache. Provided url: %s", federationUrl.String()) + } + } + } + + // With an osdf:// url scheme, we assume the user will be using the OSDF so load in our osdf metadata for our url + if scheme == "osdf" { + // If we are using an osdf/stash binary, we discovered the federation already --> load into local url metadata + if config.GetPreferredPrefix() == "OSDF" { + log.Debugln("Detected an osdf binary with an osdf:// url, populating metadata with osdf defaults") + if param.Federation_DirectorUrl.GetString() == "" || param.Federation_DiscoveryUrl.GetString() == "" || param.Federation_RegistryUrl.GetString() == "" { + return PelicanURL{}, fmt.Errorf("osdf default metadata is not populated in config") + } else { + pelicanURL.DirectorUrl = param.Federation_DirectorUrl.GetString() + pelicanURL.DiscoveryUrl = param.Federation_DiscoveryUrl.GetString() + pelicanURL.RegistryUrl = param.Federation_RegistryUrl.GetString() + } + } else if config.GetPreferredPrefix() == "PELICAN" { + // We hit this case when we are using a pelican binary but an osdf:// url, therefore we need to disover the osdf federation + log.Debugln("Detected an pelican binary with an osdf:// url, populating metadata with osdf defaults") + // Check if cache has key of federationURL, if not, loader will add it: + pelicanUrlItem := PelicanURLCache.Get("osg-htc.org") + if pelicanUrlItem != nil { + pelicanURL = pelicanUrlItem.Value() + } else { + return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache") + } + } + } else if scheme == "pelican" && remoteUrl.Host == "" { + // We hit this case when we do not have a hostname with a pelican:// url + if param.Federation_DiscoveryUrl.GetString() == "" { + return PelicanURL{}, fmt.Errorf("Pelican url scheme without discovery-url detected, please provide a federation discovery-url within the hostname or with the -f flag") + } else { + // Check if cache has key of federationURL, if not, loader will add it: + pelicanUrlItem := PelicanURLCache.Get(param.Federation_DiscoveryUrl.GetString()) + if pelicanUrlItem != nil { + pelicanURL = pelicanUrlItem.Value() + } else { + return PelicanURL{}, fmt.Errorf("Issue getting metadata information from cache") + } + } + } else if scheme == "" { + // If we don't have a url scheme, then our metadata information should be in the config + log.Debugln("No url scheme detected, getting metadata information from configuration") + pelicanURL.DirectorUrl = param.Federation_DirectorUrl.GetString() + pelicanURL.DiscoveryUrl = param.Federation_DiscoveryUrl.GetString() + pelicanURL.RegistryUrl = param.Federation_RegistryUrl.GetString() + + // If the values do not exist, exit with failure + if pelicanURL.DirectorUrl == "" || pelicanURL.DiscoveryUrl == "" || pelicanURL.RegistryUrl == "" { + return PelicanURL{}, fmt.Errorf("Missing metadata information in config, ensure Federation DirectorUrl, RegistryUrl, and DiscoverUrl are all set") + } + } + pelicanURL.ObjectUrl = remoteUrl + return pelicanURL, nil +} + // Returns a new transfer engine object whose lifetime is tied // to the provided context. Will launcher worker goroutines to // handle the underlying transfers @@ -736,14 +844,25 @@ func (te *TransferEngine) runJobHandler() error { // // The returned object can be further customized as desired. // This function does not "submit" the job for execution. -func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error) { +func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, scheme string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error) { id, err := uuid.NewV7() if err != nil { return } - copyUrl := *remoteUrl // Make a copy of the input URL to avoid concurrent issues. + pelicanURL, err := NewPelicanURL(remoteUrl, scheme) + if err != nil { + err = errors.Wrap(err, "error generating metadata for specified url") + return + } + + if pelicanURL.ObjectUrl == nil { + err = errors.Wrap(err, "No object url found") + return + } + + copyUrl := *pelicanURL.ObjectUrl // Make a copy of the input URL to avoid concurrent issues. tj = &TransferJob{ caches: tc.caches, recursive: recursive, @@ -773,40 +892,11 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u } } - if remoteUrl.Scheme == "pelican" && remoteUrl.Host != "" { - fd := config.GetFederation() - defer config.SetFederation(fd) - config.SetFederation(config.FederationDiscovery{}) - fedUrlCopy := *remoteUrl - fedUrlCopy.Scheme = "https" - fedUrlCopy.Path = "" - fedUrlCopy.RawFragment = "" - fedUrlCopy.RawQuery = "" - viper.Set("Federation.DiscoveryUrl", fedUrlCopy.String()) - if err = config.DiscoverFederation(); err != nil { - return - } - } else if remoteUrl.Scheme == "osdf" { - if remoteUrl.Host != "" { - remoteUrl.Path = path.Clean(path.Join("/", remoteUrl.Host, remoteUrl.Path)) - } - fd := config.GetFederation() - defer config.SetFederation(fd) - config.SetFederation(config.FederationDiscovery{}) - fedUrl := &url.URL{} - fedUrl.Scheme = "https" - fedUrl.Host = "osg-htc.org" - viper.Set("Federation.DiscoveryUrl", fedUrl.String()) - if err = config.DiscoverFederation(); err != nil { - return - } - } - - tj.useDirector = param.Federation_DirectorUrl.GetString() != "" - ns, err := getNamespaceInfo(remoteUrl.Path, param.Federation_DirectorUrl.GetString(), upload) + tj.useDirector = pelicanURL.DirectorUrl != "" + ns, err := getNamespaceInfo(pelicanURL.ObjectUrl.Path, pelicanURL.DirectorUrl, upload) if err != nil { log.Errorln(err) - err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl) + err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", pelicanURL.ObjectUrl) } tj.namespace = ns diff --git a/client/handle_http_test.go b/client/handle_http_test.go index ba283ac6e..168316cb3 100644 --- a/client/handle_http_test.go +++ b/client/handle_http_test.go @@ -23,6 +23,7 @@ package client import ( "bytes" "context" + "encoding/json" "net" "net/http" "net/http/httptest" @@ -450,3 +451,123 @@ func TestSortAttempts(t *testing.T) { assert.Equal(t, svr2.URL, results[0].Url.String()) assert.Equal(t, svr3.URL, results[1].Url.String()) } + +func TestNewPelicanURL(t *testing.T) { + t.Run("TestOsdfOrStashSchemeWithOSDFPrefixNoError", func(t *testing.T) { + viper.Reset() + config.SetPreferredPrefix("OSDF") + remoteObject := "osdf:///something/somewhere/thatdoesnotexist.txt" + remoteObjectURL, err := url.Parse(remoteObject) + assert.NoError(t, err) + + // Instead of relying on osdf, let's just set our global metadata (osdf prefix does this for us) + viper.Set("Federation.DirectorUrl", "someDirectorUrl") + viper.Set("Federation.DiscoveryUrl", "someDiscoveryUrl") + viper.Set("Federation.RegistryUrl", "someRegistryUrl") + + pelicanURL, err := NewPelicanURL(remoteObjectURL, "osdf") + assert.NoError(t, err) + + // Check pelicanURL properly filled out + assert.Equal(t, "someDirectorUrl", pelicanURL.DirectorUrl) + assert.Equal(t, "someDiscoveryUrl", pelicanURL.DiscoveryUrl) + assert.Equal(t, "someRegistryUrl", pelicanURL.RegistryUrl) + assert.Equal(t, remoteObjectURL, pelicanURL.ObjectUrl) + viper.Reset() + }) + + t.Run("TestOsdfOrStashSchemeWithOSDFPrefixWithError", func(t *testing.T) { + viper.Reset() + config.SetPreferredPrefix("OSDF") + remoteObject := "osdf:///something/somewhere/thatdoesnotexist.txt" + remoteObjectURL, err := url.Parse(remoteObject) + assert.NoError(t, err) + + // Instead of relying on osdf, let's just set our global metadata but don't set one piece + viper.Set("Federation.DirectorUrl", "someDirectorUrl") + viper.Set("Federation.DiscoveryUrl", "someDiscoveryUrl") + + _, err = NewPelicanURL(remoteObjectURL, "osdf") + // Make sure we get an error + assert.Error(t, err) + viper.Reset() + }) + + t.Run("TestOsdfOrStashSchemeWithPelicanPrefixNoError", func(t *testing.T) { + viper.Reset() + config.SetPreferredPrefix("PELICAN") + remoteObject := "osdf:///something/somewhere/thatdoesnotexist.txt" + remoteObjectURL, err := url.Parse(remoteObject) + assert.NoError(t, err) + + pelicanURL, err := NewPelicanURL(remoteObjectURL, "osdf") + assert.NoError(t, err) + + // Check pelicanURL properly filled out + assert.Equal(t, "https://osdf-director.osg-htc.org", pelicanURL.DirectorUrl) + assert.Equal(t, "osg-htc.org", pelicanURL.DiscoveryUrl) + assert.Equal(t, "https://osdf-registry.osg-htc.org", pelicanURL.RegistryUrl) + assert.Equal(t, remoteObjectURL, pelicanURL.ObjectUrl) + viper.Reset() + // Note: can't really test this for an error since that would require osg-htc.org to be down + }) + + t.Run("TestPelicanSchemeNoError", func(t *testing.T) { + viper.Reset() + viper.Set("TLSSkipVerify", true) + err := config.InitClient() + assert.NoError(t, err) + // Create a server that gives us a mock response + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // make our response: + response := config.FederationDiscovery{ + DirectorEndpoint: "director", + NamespaceRegistrationEndpoint: "registry", + JwksUri: "jwks", + BrokerEndpoint: "broker", + } + + responseJSON, err := json.Marshal(response) + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + _, err = w.Write(responseJSON) + assert.NoError(t, err) + })) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + assert.NoError(t, err) + + remoteObject := "pelican://" + serverURL.Host + "/something/somewhere/thatdoesnotexist.txt" + remoteObjectURL, err := url.Parse(remoteObject) + assert.NoError(t, err) + + pelicanURL, err := NewPelicanURL(remoteObjectURL, "pelican") + assert.NoError(t, err) + + // Check pelicanURL properly filled out + assert.Equal(t, "director", pelicanURL.DirectorUrl) + assert.Equal(t, server.URL, pelicanURL.DiscoveryUrl) + assert.Equal(t, "registry", pelicanURL.RegistryUrl) + assert.Equal(t, remoteObjectURL, pelicanURL.ObjectUrl) + // Check to make sure it was populated in our cache + assert.True(t, PelicanURLCache.Has(pelicanURL.DiscoveryUrl)) + viper.Reset() + }) + + t.Run("TestPelicanSchemeWithError", func(t *testing.T) { + viper.Reset() + + remoteObject := "pelican://some-host/something/somewhere/thatdoesnotexist.txt" + remoteObjectURL, err := url.Parse(remoteObject) + assert.NoError(t, err) + + _, err = NewPelicanURL(remoteObjectURL, "pelican") + assert.Error(t, err) + viper.Reset() + }) +} diff --git a/client/main.go b/client/main.go index fb18fdc3e..1c9e885dc 100644 --- a/client/main.go +++ b/client/main.go @@ -430,6 +430,7 @@ func getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns name } return } else { + log.Debugln("Director URL not found, searching in topology") ns, err = namespaces.MatchNamespace(resourcePath) if err != nil { return @@ -438,6 +439,17 @@ func getNamespaceInfo(resourcePath, OSDFDirectorUrl string, isPut bool) (ns name } } +func schemeUnderstood(scheme string) error { + understoodSchemes := []string{"file", "osdf", "pelican", "stash", ""} + + _, foundDest := find(understoodSchemes, scheme) + if !foundDest { + return fmt.Errorf("Do not understand the destination scheme: %s. Permitted values are %s", + scheme, strings.Join(understoodSchemes, ", ")) + } + return nil +} + /* Start of transfer for pelican object put, gets information from the target destination before doing our HTTP PUT request @@ -463,37 +475,14 @@ func DoPut(ctx context.Context, localObject string, remoteDestination string, re return nil, err } remoteDestUrl.Scheme = remoteDestScheme - fd := config.GetFederation() - defer config.SetFederation(fd) - if remoteDestUrl.Host != "" { - if remoteDestUrl.Scheme == "osdf" || remoteDestUrl.Scheme == "stash" { - remoteDestUrl.Path, err = url.JoinPath(remoteDestUrl.Host, remoteDestUrl.Path) - if err != nil { - log.Errorln("Failed to join remote destination url path:", err) - return nil, err - } - } else if remoteDestUrl.Scheme == "pelican" { - - config.SetFederation(config.FederationDiscovery{}) - federationUrl, _ := url.Parse(remoteDestUrl.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() - if err != nil { - return nil, err - } - } - } remoteDestScheme, _ = getTokenName(remoteDestUrl) - understoodSchemes := []string{"file", "osdf", "pelican", ""} - _, foundDest := find(understoodSchemes, remoteDestScheme) - if !foundDest { - return nil, fmt.Errorf("Do not understand the destination scheme: %s. Permitted values are %s", - remoteDestUrl.Scheme, strings.Join(understoodSchemes, ", ")) + // Check if we understand the found url scheme + err = schemeUnderstood(remoteDestScheme) + if err != nil { + return nil, err } te := NewTransferEngine(ctx) @@ -506,7 +495,7 @@ func DoPut(ctx context.Context, localObject string, remoteDestination string, re if err != nil { return } - tj, err := client.NewTransferJob(remoteDestUrl, localObject, true, recursive) + tj, err := client.NewTransferJob(remoteDestUrl, localObject, remoteDestScheme, true, recursive) if err != nil { return } @@ -551,39 +540,14 @@ func DoGet(ctx context.Context, remoteObject string, localDestination string, re return nil, err } remoteObjectUrl.Scheme = remoteObjectScheme - fd := config.GetFederation() - defer config.SetFederation(fd) - - // If there is a host specified, prepend it to the path in the osdf case - if remoteObjectUrl.Host != "" { - if remoteObjectUrl.Scheme == "osdf" { - remoteObjectUrl.Path, err = url.JoinPath(remoteObjectUrl.Host, remoteObjectUrl.Path) - if err != nil { - log.Errorln("Failed to join source url path:", err) - return nil, err - } - } else if remoteObjectUrl.Scheme == "pelican" { - - config.SetFederation(config.FederationDiscovery{}) - federationUrl, _ := url.Parse(remoteObjectUrl.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() - if err != nil { - return nil, err - } - } - } + // This is for condor cases: remoteObjectScheme, _ = getTokenName(remoteObjectUrl) - understoodSchemes := []string{"file", "osdf", "pelican", ""} - - _, foundSource := find(understoodSchemes, remoteObjectScheme) - if !foundSource { - return nil, fmt.Errorf("Do not understand the source scheme: %s. Permitted values are %s", - remoteObjectUrl.Scheme, strings.Join(understoodSchemes, ", ")) + // Check if we understand the found url scheme + err = schemeUnderstood(remoteObjectScheme) + if err != nil { + return nil, err } if remoteObjectScheme == "osdf" || remoteObjectScheme == "pelican" { @@ -630,7 +594,7 @@ func DoGet(ctx context.Context, remoteObject string, localDestination string, re if err != nil { return } - tj, err := tc.NewTransferJob(remoteObjectUrl, localDestination, false, recursive) + tj, err := tc.NewTransferJob(remoteObjectUrl, localDestination, remoteObjectScheme, false, recursive) if err != nil { return } @@ -703,64 +667,16 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv sourceURL.Scheme = source_scheme destination, dest_scheme := correctURLWithUnderscore(destination) - dest_url, err := url.Parse(destination) + destURL, err := url.Parse(destination) if err != nil { log.Errorln("Failed to parse destination URL:", err) return nil, err } - dest_url.Scheme = dest_scheme - fd := config.GetFederation() - defer config.SetFederation(fd) - - // If there is a host specified, prepend it to the path in the osdf case - if sourceURL.Host != "" { - if sourceURL.Scheme == "osdf" || sourceURL.Scheme == "stash" { - sourceURL.Path = "/" + path.Join(sourceURL.Host, sourceURL.Path) - } else if sourceURL.Scheme == "pelican" { - config.SetFederation(config.FederationDiscovery{}) - federationUrl, _ := url.Parse(sourceURL.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() - if err != nil { - return nil, err - } - } - } - - if dest_url.Host != "" { - if dest_url.Scheme == "osdf" || dest_url.Scheme == "stash" { - dest_url.Path = "/" + path.Join(dest_url.Host, dest_url.Path) - } else if dest_url.Scheme == "pelican" { - config.SetFederation(config.FederationDiscovery{}) - federationUrl, _ := url.Parse(dest_url.String()) - federationUrl.Scheme = "https" - federationUrl.Path = "" - viper.Set("Federation.DiscoveryUrl", federationUrl.String()) - err = config.DiscoverFederation() - if err != nil { - return nil, err - } - } - } + destURL.Scheme = dest_scheme + // Check for scheme here for when using condor sourceScheme, _ := getTokenName(sourceURL) - destScheme, _ := getTokenName(dest_url) - - understoodSchemes := []string{"stash", "file", "osdf", "pelican", ""} - - _, foundSource := find(understoodSchemes, sourceScheme) - if !foundSource { - log.Errorln("Do not understand source scheme:", sourceURL.Scheme) - return nil, errors.New("Do not understand source scheme") - } - - _, foundDest := find(understoodSchemes, destScheme) - if !foundDest { - log.Errorln("Do not understand destination scheme:", sourceURL.Scheme) - return nil, errors.New("Do not understand destination scheme") - } + destScheme, _ := getTokenName(destURL) payload := payloadStruct{} parseJobAd(&payload) @@ -769,14 +685,27 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv var localPath string var remoteURL *url.URL + var remoteScheme string if isPut { - log.Debugln("Detected object write to remote federation object", dest_url.Path) + // Verify valid scheme + err = schemeUnderstood(destScheme) + if err != nil { + return nil, err + } + + log.Debugln("Detected object write to remote federation object", destURL.Path) localPath = sourceFile - remoteURL = dest_url + remoteURL = destURL + remoteScheme = destScheme } else { + // Verify valid scheme + err = schemeUnderstood(sourceScheme) + if err != nil { + return nil, err + } - if dest_url.Scheme == "file" { - destination = dest_url.Path + if destURL.Scheme == "file" { + destination = destURL.Path } if sourceScheme == "stash" || sourceScheme == "osdf" || sourceScheme == "pelican" { @@ -801,6 +730,7 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv } localPath = destination remoteURL = sourceURL + remoteScheme = sourceScheme } payload.version = config.GetVersion() @@ -827,7 +757,7 @@ func DoCopy(ctx context.Context, sourceFile string, destination string, recursiv if err != nil { return } - tj, err := tc.NewTransferJob(remoteURL, localPath, isPut, recursive) + tj, err := tc.NewTransferJob(remoteURL, localPath, remoteScheme, isPut, recursive) if err != nil { return } diff --git a/client/main_test.go b/client/main_test.go index 435a22e9b..e80779349 100644 --- a/client/main_test.go +++ b/client/main_test.go @@ -330,3 +330,36 @@ func TestParseNoJobAd(t *testing.T) { payload := payloadStruct{} parseJobAd(&payload) } + +func TestSchemeUnderstood(t *testing.T) { + t.Run("TestProperSchemeOsdf", func(t *testing.T) { + scheme := "osdf" + err := schemeUnderstood(scheme) + assert.NoError(t, err) + }) + t.Run("TestProperSchemeStash", func(t *testing.T) { + scheme := "stash" + err := schemeUnderstood(scheme) + assert.NoError(t, err) + }) + t.Run("TestProperSchemePelican", func(t *testing.T) { + scheme := "pelican" + err := schemeUnderstood(scheme) + assert.NoError(t, err) + }) + t.Run("TestProperSchemeFile", func(t *testing.T) { + scheme := "file" + err := schemeUnderstood(scheme) + assert.NoError(t, err) + }) + t.Run("TestProperSchemeEmpty", func(t *testing.T) { + scheme := "" + err := schemeUnderstood(scheme) + assert.NoError(t, err) + }) + t.Run("TestImproperScheme", func(t *testing.T) { + scheme := "ThisSchemeDoesNotExistAndHopefullyNeverWill" + err := schemeUnderstood(scheme) + assert.Error(t, err) + }) +} diff --git a/cmd/object_copy.go b/cmd/object_copy.go index 85ded6cb6..fb829b25d 100644 --- a/cmd/object_copy.go +++ b/cmd/object_copy.go @@ -200,6 +200,10 @@ func copyMain(cmd *cobra.Command, args []string) { } } + // Start our cache for url metadata + go client.PelicanURLCache.Start() + defer client.PelicanURLCache.Stop() + var result error lastSrc := "" for _, src := range source { diff --git a/cmd/object_get.go b/cmd/object_get.go index 8301bca26..3e26efc26 100644 --- a/cmd/object_get.go +++ b/cmd/object_get.go @@ -115,6 +115,10 @@ func getMain(cmd *cobra.Command, args []string) { } } + // Start our cache for url metadata + go client.PelicanURLCache.Start() + defer client.PelicanURLCache.Stop() + var result error lastSrc := "" for _, src := range source { diff --git a/cmd/object_put.go b/cmd/object_put.go index 31f332e69..218d2953a 100644 --- a/cmd/object_put.go +++ b/cmd/object_put.go @@ -85,6 +85,10 @@ func putMain(cmd *cobra.Command, args []string) { log.Debugln("Sources:", source) log.Debugln("Destination:", dest) + // Start our cache for url metadata + go client.PelicanURLCache.Start() + defer client.PelicanURLCache.Stop() + var result error lastSrc := "" for _, src := range source { diff --git a/cmd/plugin.go b/cmd/plugin.go index 104c1053c..ad552cd82 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -431,7 +431,7 @@ func runPluginWorker(ctx context.Context, upload bool, workChan <-chan PluginTra var tj *client.TransferJob urlCopy := *transfer.url - tj, err = tc.NewTransferJob(&urlCopy, transfer.localFile, upload, false, client.WithAcquireToken(false), client.WithCaches(caches...)) + tj, err = tc.NewTransferJob(&urlCopy, transfer.localFile, urlCopy.Scheme, upload, false, client.WithAcquireToken(false), client.WithCaches(caches...)) jobMap[tj.ID()] = transfer if err != nil { return errors.Wrap(err, "Failed to create new transfer job") diff --git a/config/config.go b/config/config.go index 9a8bf6e45..d5460c297 100644 --- a/config/config.go +++ b/config/config.go @@ -364,6 +364,76 @@ func GetAllPrefixes() []string { return prefixes } +// This function is for discovering federations as specified by a url during a pelican:// transfer. +// this does not populate global fields and is more temporary per url +func DiscoverUrlFederation(federationDiscoveryUrl string) (metadata FederationDiscovery, err error) { + log.Debugln("Performing federation service discovery for specified url against endpoint", federationDiscoveryUrl) + federationUrl, err := url.Parse(federationDiscoveryUrl) + if err != nil { + return FederationDiscovery{}, errors.Wrapf(err, "Invalid federation value %s:", federationDiscoveryUrl) + } + federationUrl.Scheme = "https" + if len(federationUrl.Path) > 0 && len(federationUrl.Host) == 0 { + federationUrl.Host = federationUrl.Path + federationUrl.Path = "" + } + + discoveryUrl, err := url.Parse(federationUrl.String()) + if err != nil { + return FederationDiscovery{}, errors.Wrap(err, "unable to parse federation discovery URL") + } + discoveryUrl.Path, err = url.JoinPath(federationUrl.Path, ".well-known/pelican-configuration") + if err != nil { + return FederationDiscovery{}, errors.Wrap(err, "Unable to parse federation url because of invalid path") + } + + httpClient := http.Client{ + Transport: GetTransport(), + Timeout: time.Second * 5, + } + req, err := http.NewRequest(http.MethodGet, discoveryUrl.String(), nil) + if err != nil { + return FederationDiscovery{}, errors.Wrapf(err, "Failure when doing federation metadata request creation for %s", discoveryUrl) + } + req.Header.Set("User-Agent", "pelican/"+version) + + result, err := httpClient.Do(req) + if err != nil { + return FederationDiscovery{}, errors.Wrapf(err, "Failure when doing federation metadata lookup to %s", discoveryUrl) + } + + if result.Body != nil { + defer result.Body.Close() + } + + body, err := io.ReadAll(result.Body) + if err != nil { + return FederationDiscovery{}, errors.Wrapf(err, "Failure when doing federation metadata read to %s", discoveryUrl) + } + + if result.StatusCode != http.StatusOK { + truncatedMessage := string(body) + if len(body) > 1000 { + truncatedMessage = string(body[:1000]) + truncatedMessage += " [... remainder truncated ...]" + } + return FederationDiscovery{}, errors.Errorf("Federation metadata discovery failed with HTTP status %d. Error message: %s", result.StatusCode, truncatedMessage) + } + + metadata = FederationDiscovery{} + err = json.Unmarshal(body, &metadata) + if err != nil { + return FederationDiscovery{}, errors.Wrapf(err, "Failure when parsing federation metadata at %s", discoveryUrl) + } + + log.Debugln("Federation service discovery resulted in director URL", metadata.DirectorEndpoint) + log.Debugln("Federation service discovery resulted in registry URL", metadata.NamespaceRegistrationEndpoint) + log.Debugln("Federation service discovery resulted in JWKS URL", metadata.JwksUri) + log.Debugln("Federation service discovery resulted in broker URL", metadata.BrokerEndpoint) + + return metadata, nil +} + func DiscoverFederation() error { federationStr := param.Federation_DiscoveryUrl.GetString() externalUrlStr := param.Server_ExternalWebUrl.GetString() @@ -396,93 +466,47 @@ func DiscoverFederation() error { curRegistryURL := param.Federation_RegistryUrl.GetString() curFederationJwkURL := param.Federation_JwkUrl.GetString() curBrokerURL := param.Federation_BrokerUrl.GetString() - if len(curDirectorURL) != 0 && len(curRegistryURL) != 0 && len(curFederationJwkURL) != 0 { + if curDirectorURL != "" && curRegistryURL != "" && curFederationJwkURL != "" && curBrokerURL != "" { return nil } - log.Debugln("Performing federation service discovery against endpoint", federationStr) federationUrl, err := url.Parse(federationStr) if err != nil { return errors.Wrapf(err, "Invalid federation value %s:", federationStr) } + if federationUrl.Path != "" && federationUrl.Host != "" { // If the host is nothing, then the url is fine, but if we have a host and a path then there is a problem return errors.New("Invalid federation discovery url is set. No path allowed for federation discovery url. Provided url: " + federationStr) } + federationUrl.Scheme = "https" if len(federationUrl.Path) > 0 && len(federationUrl.Host) == 0 { federationUrl.Host = federationUrl.Path federationUrl.Path = "" + return errors.Wrap(err, "Error discovering the federation with given discovery url") } - discoveryUrl, err := url.Parse(federationUrl.String()) - if err != nil { - return errors.Wrap(err, "unable to parse federation discovery URL") - } - discoveryUrl.Path, err = url.JoinPath(federationUrl.Path, ".well-known/pelican-configuration") - if err != nil { - return errors.Wrap(err, "Unable to parse federation url because of invalid path") - } - - httpClient := http.Client{ - Transport: GetTransport(), - Timeout: time.Second * 5, - } - req, err := http.NewRequest(http.MethodGet, discoveryUrl.String(), nil) - if err != nil { - return errors.Wrapf(err, "Failure when doing federation metadata request creation for %s", discoveryUrl) - } - req.Header.Set("User-Agent", "pelican/"+version) - - result, err := httpClient.Do(req) - if err != nil { - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - return MetadataTimeoutErr.Wrap(err) - } else { - return NewMetadataError(err, "Error occured when querying for metadata") - } - } - - if result.Body != nil { - defer result.Body.Close() - } - - body, err := io.ReadAll(result.Body) + metadata, err := DiscoverUrlFederation(federationStr) if err != nil { - return errors.Wrapf(err, "Failure when doing federation metadata read to %s", discoveryUrl) - } - - if result.StatusCode != http.StatusOK { - truncatedMessage := string(body) - if len(body) > 1000 { - truncatedMessage = string(body[:1000]) - truncatedMessage += " [... remainder truncated ...]" - } - return errors.Errorf("Federation metadata discovery failed with HTTP status %d. Error message: %s", result.StatusCode, truncatedMessage) + return errors.Wrapf(err, "Invalid federation value %s:", federationStr) } - metadata := FederationDiscovery{} - err = json.Unmarshal(body, &metadata) - if err != nil { - return errors.Wrapf(err, "Failure when parsing federation metadata at %s", discoveryUrl) - } + // Set our globals if curDirectorURL == "" { - log.Debugln("Federation service discovery resulted in director URL", metadata.DirectorEndpoint) + log.Debugln("Setting global director url to", metadata.DirectorEndpoint) viper.Set("Federation.DirectorUrl", metadata.DirectorEndpoint) } if curRegistryURL == "" { - log.Debugln("Federation service discovery resulted in registry URL", - metadata.NamespaceRegistrationEndpoint) + log.Debugln("Setting global registry url to", metadata.NamespaceRegistrationEndpoint) viper.Set("Federation.RegistryUrl", metadata.NamespaceRegistrationEndpoint) } if curFederationJwkURL == "" { - log.Debugln("Federation service discovery resulted in JWKS URL", - metadata.JwksUri) + log.Debugln("Setting global jwks url to", metadata.JwksUri) viper.Set("Federation.JwkUrl", metadata.JwksUri) } if curBrokerURL == "" && metadata.BrokerEndpoint != "" { - log.Debugln("Federation service discovery resulted in broker URL", metadata.BrokerEndpoint) + log.Debugln("Setting global broker url to", metadata.BrokerEndpoint) viper.Set("Federation.BrokerUrl", metadata.BrokerEndpoint) } diff --git a/config/config_test.go b/config/config_test.go index 29da625e8..c035441b7 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -342,3 +342,35 @@ func TestDiscoverFederation(t *testing.T) { viper.Reset() }) } + +func TestDiscoverUrlFederation(t *testing.T) { + // Server to be a "mock" federation + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // make our response: + response := FederationDiscovery{ + DirectorEndpoint: "director", + NamespaceRegistrationEndpoint: "registry", + JwksUri: "jwks", + BrokerEndpoint: "broker", + } + + responseJSON, err := json.Marshal(response) + if err != nil { + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + _, err = w.Write(responseJSON) + assert.NoError(t, err) + })) + defer server.Close() + metadata, err := DiscoverUrlFederation(server.URL) + assert.NoError(t, err) + + // Assert that the metadata matches expectations + assert.Equal(t, "director", metadata.DirectorEndpoint, "Unexpected DirectorEndpoint") + assert.Equal(t, "registry", metadata.NamespaceRegistrationEndpoint, "Unexpected NamespaceRegistrationEndpoint") + assert.Equal(t, "jwks", metadata.JwksUri, "Unexpected JwksUri") + assert.Equal(t, "broker", metadata.BrokerEndpoint, "Unexpected BrokerEndpoint") +} \ No newline at end of file diff --git a/github_scripts/get_put_test.sh b/github_scripts/get_put_test.sh index acb21af4f..06c37796e 100755 --- a/github_scripts/get_put_test.sh +++ b/github_scripts/get_put_test.sh @@ -28,6 +28,7 @@ chmod 777 get_put_tmp/origin # Setup env variables needed export PELICAN_FEDERATION_DIRECTORURL="https://$HOSTNAME:8444" +export PELICAN_DIRECTOR_DEFAULTRESPONSE="origins" export PELICAN_FEDERATION_REGISTRYURL="https://$HOSTNAME:8444" export PELICAN_TLSSKIPVERIFY=true export PELICAN_ORIGIN_ENABLEDIRECTREADS=true @@ -123,7 +124,7 @@ do done # Run pelican object put -./pelican object put ./get_put_tmp/input.txt pelican:///test/input.txt -d -t get_put_tmp/test-token.jwt -l get_put_tmp/putOutput.txt +./pelican object put input.txt pelican://$HOSTNAME:8444/test/input.txt -d -t token -l putOutput.txt # Check output of command if grep -q "Dumping response: HTTP/1.1 200 OK" get_put_tmp/putOutput.txt; then @@ -134,7 +135,7 @@ else exit 1 fi -./pelican object get pelican:///test/input.txt get_put_tmp/output.txt -d -t get_put_tmp/test-token.jwt -l get_put_tmp/getOutput.txt +./pelican object get pelican://$HOSTNAME:8444/test/input.txt output.txt -d -t token -l getOutput.txt # Check output of command if grep -q "HTTP Transfer was successful" get_put_tmp/getOutput.txt; then diff --git a/local_cache/local_cache.go b/local_cache/local_cache.go index a765a0a6d..1566aad15 100644 --- a/local_cache/local_cache.go +++ b/local_cache/local_cache.go @@ -506,7 +506,7 @@ func (sc *LocalCache) runMux() error { sourceURL := *sc.directorURL sourceURL.Path = path.Join(sourceURL.Path, path.Clean(req.request.path)) - tj, err := sc.tc.NewTransferJob(&sourceURL, localPath, false, false, client.WithToken(req.request.token)) + tj, err := sc.tc.NewTransferJob(&sourceURL, localPath, sourceURL.Scheme, false, false, client.WithToken(req.request.token)) if err != nil { ds := &downloadStatus{} ds.err.Store(&err)