Skip to content

Commit

Permalink
Use the pelican_url package throughout Pelican
Browse files Browse the repository at this point in the history
With the scaffolding in the pelican_url package, this commit hooks up the
rest of Pelican to the machinery.
  • Loading branch information
jhiemstrawisc committed Sep 17, 2024
1 parent c629a59 commit c8d012d
Show file tree
Hide file tree
Showing 40 changed files with 645 additions and 1,786 deletions.
2 changes: 1 addition & 1 deletion broker/token_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func getRegistryIssValue(prefix string) (iss string, err error) {
if err != nil {
return
}
namespaceUrlStr := fedInfo.NamespaceRegistrationEndpoint
namespaceUrlStr := fedInfo.RegistryEndpoint
if namespaceUrlStr == "" {
err = errors.New("namespace URL is not set")
return
Expand Down
15 changes: 7 additions & 8 deletions client/acquire_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"fmt"
"io/fs"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strconv"
Expand All @@ -41,6 +39,7 @@ import (

"github.com/pelicanplatform/pelican/config"
oauth2 "github.com/pelicanplatform/pelican/oauth2"
"github.com/pelicanplatform/pelican/pelican_url"
"github.com/pelicanplatform/pelican/server_structs"
)

Expand All @@ -60,7 +59,7 @@ type (
// of the process.
tokenGenerator struct {
DirResp *server_structs.DirectorResponse
Destination *url.URL
Destination *pelican_url.PelicanURL
TokenLocation string
TokenName string
IsWrite bool
Expand All @@ -79,7 +78,7 @@ type (
}
)

func newTokenGenerator(dest *url.URL, dirResp *server_structs.DirectorResponse, isWrite bool, enableAcquire bool) *tokenGenerator {
func newTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.DirectorResponse, isWrite bool, enableAcquire bool) *tokenGenerator {
return &tokenGenerator{
DirResp: dirResp,
Destination: dest,
Expand Down Expand Up @@ -549,7 +548,7 @@ func registerClient(dirResp server_structs.DirectorResponse) (*config.PrefixEntr

// Given a URL and a director Response, attempt to acquire a valid
// token for that URL.
func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse, opts config.TokenGenerationOpts) (string, error) {
func AcquireToken(dest string, dirResp server_structs.DirectorResponse, opts config.TokenGenerationOpts) (string, error) {

log.Debugln("Acquiring a token from configuration and OAuth2")

Expand Down Expand Up @@ -621,7 +620,7 @@ func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse,
var acceptableToken *config.TokenEntry = nil
acceptableUnexpiredToken := ""
for idx, token := range prefixEntry.Tokens {
if !tokenIsAcceptable(token.AccessToken, destination.Path, dirResp, opts) {
if !tokenIsAcceptable(token.AccessToken, dest, dirResp, opts) {
continue
}
if acceptableToken == nil {
Expand Down Expand Up @@ -676,7 +675,7 @@ func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse,
}
}

token, err := oauth2.AcquireToken(issuer, prefixEntry, dirResp, destination.Path, opts)
token, err := oauth2.AcquireToken(issuer, prefixEntry, dirResp, dest, opts)
if errors.Is(err, oauth2.ErrUnknownClient) {
// We use anonymously-registered clients; OA4MP can periodically garbage collect these to prevent DoS
// In this case, we register a new client and try to acquire again.
Expand All @@ -690,7 +689,7 @@ func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse,
log.Warningln("Failed to save new token to configuration file:", err)
}

if token, err = oauth2.AcquireToken(issuer, prefixEntry, dirResp, destination.Path, opts); err != nil {
if token, err = oauth2.AcquireToken(issuer, prefixEntry, dirResp, dest, opts); err != nil {
return "", err
}
} else if err != nil {
Expand Down
30 changes: 18 additions & 12 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,22 @@ import (
log "github.com/sirupsen/logrus"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/pelican_url"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/utils"
)

// Make a request to the director for a given verb/resource; return the
// HTTP response object only if a 307 is returned.
func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (resp *http.Response, err error) {
resourceUrl := directorUrl + sourcePath
func queryDirector(ctx context.Context, verb string, pUrl *pelican_url.PelicanURL) (resp *http.Response, err error) {
resourceUrl, err := url.Parse(pUrl.FedInfo.DirectorEndpoint)
if err != nil {
log.Errorln("Failed to parse the director URL:", err)
return nil, err
}
resourceUrl.Path = pUrl.Path
resourceUrl.RawQuery = pUrl.RawQuery

// Here we use http.Transport to prevent the client from following the director's
// redirect. We use the Location url elsewhere (plus we still need to do the token
// dance!)
Expand All @@ -52,7 +60,7 @@ func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (r
},
}

req, err := http.NewRequestWithContext(ctx, verb, resourceUrl, nil)
req, err := http.NewRequestWithContext(ctx, verb, resourceUrl.String(), nil)
if err != nil {
log.Errorln("Failed to create an HTTP request:", err)
return nil, err
Expand Down Expand Up @@ -161,28 +169,26 @@ func parseServersFromDirectorResponse(resp *http.Response) (servers []*url.URL,
}

// Retrieve federation namespace information for a given URL.
func GetDirectorInfoForPath(ctx context.Context, resourcePath, directorUrl string, isPut bool, query string) (parsedResponse server_structs.DirectorResponse, err error) {
if directorUrl == "" {
func GetDirectorInfoForPath(ctx context.Context, pUrl *pelican_url.PelicanURL, isPut bool) (parsedResponse server_structs.DirectorResponse, err error) {
if pUrl.FedInfo.DirectorEndpoint == "" {
return server_structs.DirectorResponse{},
errors.Errorf("unable to retrieve information from a Director for object %s because no director URL was provided", resourcePath)
errors.Errorf("unable to retrieve information from a Director for object %s because none was found in pelican URL metadata.", pUrl.Path)
}

log.Debugln("Will query director at", directorUrl, "for object", resourcePath)
log.Debugln("Will query director at", pUrl.FedInfo.DirectorEndpoint, "for object", pUrl.Path)
verb := "GET"
if isPut {
verb = "PUT"
}
if query != "" {
resourcePath += "?" + query
}

var dirResp *http.Response
dirResp, err = queryDirector(ctx, verb, resourcePath, directorUrl)
dirResp, err = queryDirector(ctx, verb, pUrl)
if err != nil {
if isPut && dirResp != nil && dirResp.StatusCode == 405 {
err = errors.New("error 405: No writeable origins were found")
return
} else {
err = errors.Wrapf(err, "error while querying the director at %s", directorUrl)
err = errors.Wrapf(err, "error while querying the director at %s", pUrl.FedInfo.DirectorEndpoint)
return
}
}
Expand Down
24 changes: 21 additions & 3 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ package client
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

"github.com/pelicanplatform/pelican/pelican_url"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/utils"
)
Expand Down Expand Up @@ -102,8 +104,14 @@ func TestQueryDirector(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()

pUrl := pelican_url.PelicanURL{
FedInfo: pelican_url.FederationDiscovery{
DirectorEndpoint: server.URL,
},
Path: "/foo/bar",
}
// Call QueryDirector with the test server URL and a source path
actualResp, err := queryDirector(context.Background(), "GET", "/foo/bar", server.URL)
actualResp, err := queryDirector(context.Background(), "GET", &pUrl)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -163,7 +171,7 @@ func TestGetDirectorInfoForPath(t *testing.T) {
directorUrl: "",
isPut: false,
query: "",
expectedError: "unable to retrieve information from a Director for object /test because no director URL was provided",
expectedError: "unable to retrieve information from a Director for object /test because none was found in pelican URL metadata.",
},
{
name: "Successful GET request",
Expand Down Expand Up @@ -194,7 +202,17 @@ func TestGetDirectorInfoForPath(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
_, err := GetDirectorInfoForPath(ctx, tt.resourcePath, tt.directorUrl, tt.isPut, tt.query)
urlStr := fmt.Sprintf("pelican://foo%s", tt.resourcePath)
if tt.query != "" {
urlStr += "?" + tt.query
}

pUrl, err := pelican_url.Parse(urlStr, nil, nil)
assert.NoError(t, err)

pUrl.FedInfo.DirectorEndpoint = tt.directorUrl

_, err = GetDirectorInfoForPath(ctx, pUrl, tt.isPut)
if tt.expectedError != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedError)
Expand Down
4 changes: 2 additions & 2 deletions client/errorAccum.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

grab "github.com/opensaucerer/grab/v3"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/pelican_url"
)

type (
Expand Down Expand Up @@ -143,7 +143,7 @@ func IsRetryable(err error) bool {
if errors.Is(err, grab.ErrBadLength) {
return false
}
if errors.Is(err, config.MetadataTimeoutErr) {
if errors.Is(err, pelican_url.MetadataTimeoutErr) {
return true
}
// There's little a user can do about a TCP connection reset besides retry; if it
Expand Down
43 changes: 23 additions & 20 deletions client/fed_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package client_test
import (
"fmt"
"io/fs"
"net/url"
"os"
"path/filepath"
"strconv"
Expand All @@ -38,6 +39,7 @@ import (
config "github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/fed_test_utils"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/pelican_url"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/token"
"github.com/pelicanplatform/pelican/token_scopes"
Expand All @@ -49,6 +51,8 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
server_utils.ResetOriginExports()

fed := fed_test_utils.NewFedTest(t, mixedAuthOriginCfg)
discoveryUrl, err := url.Parse(param.Federation_DiscoveryUrl.GetString())
require.NoError(t, err)

te, err := client.NewTransferEngine(fed.Ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -124,20 +128,20 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
uploadUrl := fmt.Sprintf("pelican://%s%s/%s/%s", discoveryUrl.Host,
export.FederationPrefix, "osdf_osdf", dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
var transferDetailsDownload []client.TransferResults
if export.Capabilities.PublicReads {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true)
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), true)
} else {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()))
}
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
Expand All @@ -151,32 +155,31 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
require.NoError(t, err)
}()
assert.NoError(t, err)

oldHost, err := pelican_url.SetOsdfDiscoveryHost(discoveryUrl.String())
require.NoError(t, err)
defer func() {
_, _ = pelican_url.SetOsdfDiscoveryHost(oldHost)
}()

for _, export := range fed.Exports {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("osdf:///%s/%s/%s", export.FederationPrefix, "osdf_osdf", dirName)
hostname := fmt.Sprintf("%v:%v", param.Server_WebHost.GetString(), param.Server_WebPort.GetInt())

// Set our metadata values in config since that is what this url scheme - prefix combo does in handle_http
metadata, err := config.DiscoverUrlFederation(fed.Ctx, "https://"+hostname)
assert.NoError(t, err)
viper.Set("Federation.DirectorUrl", metadata.DirectorEndpoint)
viper.Set("Federation.RegistryUrl", metadata.NamespaceRegistrationEndpoint)
viper.Set("Federation.DiscoveryUrl", hostname)
uploadUrl := fmt.Sprintf("osdf://%s/%s/%s", export.FederationPrefix, "osdf_osdf", dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
tmpDir := t.TempDir()
var transferDetailsDownload []client.TransferResults
if export.Capabilities.PublicReads {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, tmpDir, true)
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, tmpDir, true)
} else {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, tmpDir, true, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, tmpDir, true, client.WithTokenLocation(tempToken.Name()))
}
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
Expand All @@ -195,19 +198,19 @@ func TestRecursiveUploadsAndDownloads(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s%s/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()),
uploadUrl := fmt.Sprintf("pelican://%s%s/%s/%s", discoveryUrl.Host,
export.FederationPrefix, "osdf_osdf", dirName)
// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()))
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadUrl, true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
var transferDetailsDownload []client.TransferResults
if export.Capabilities.PublicReads {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true)
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), true)
} else {
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()))
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadUrl, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()))
}
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
Expand Down
Loading

0 comments on commit c8d012d

Please sign in to comment.