Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove use of global fed metadata within client #878

Merged
273 changes: 119 additions & 154 deletions client/fed_linux_test.go

Large diffs are not rendered by default.

373 changes: 183 additions & 190 deletions client/fed_test.go

Large diffs are not rendered by default.

238 changes: 172 additions & 66 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ import (

"github.com/VividCortex/ewma"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/lestrrat-go/option"
"github.com/opensaucerer/grab/v3"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/studio-b12/gowebdav"
"github.com/vbauerster/mpb/v8"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
"golang.org/x/time/rate"

"github.com/pelicanplatform/pelican/config"
Expand All @@ -59,9 +60,39 @@ import (
var (
	progressCtrOnce sync.Once
	progressCtr     *mpb.Progress

	// How long a federation lookup result is remembered. Successes use
	// ttlcache.DefaultTTL (the TTL configured on the cache itself), failures
	// are kept for a shorter period so that discovery is retried sooner.
	successTTL = ttlcache.DefaultTTL
	failureTTL = 5 * time.Minute

	// loader populates the federation metadata cache on a miss. The cache key
	// is handed to config.DiscoverUrlFederation; the outcome of that call
	// (metadata or error) is stored under the same key and returned.
	loader = ttlcache.LoaderFunc[string, cacheItem](
		func(cache *ttlcache.Cache[string, cacheItem], discoveryUrl string) *ttlcache.Item[string, cacheItem] {
			// Note: the timeout is set mostly for the benefit of unit tests
			ctx, cancel := context.WithTimeout(context.Background(), param.Transport_ResponseHeaderTimeout.GetDuration())
			defer cancel()

			// Assume success, then downgrade to the failure entry/TTL on error.
			entry, ttl := cacheItem{}, successTTL
			if fedInfo, err := config.DiscoverUrlFederation(ctx, discoveryUrl); err != nil {
				entry.err, ttl = err, failureTTL
			} else {
				entry.url = pelicanUrl{directorUrl: fedInfo.DirectorEndpoint}
			}
			return cache.Set(discoveryUrl, entry, ttl)
		},
	)
)

type (
// cacheItem is the value type of the federation metadata cache. The cache
// loader stores either the discovered metadata (url) or the error returned
// by discovery (err), so failed lookups are cached as well.
cacheItem struct {
// Metadata recorded when discovery succeeded
url pelicanUrl
// Error recorded when discovery failed; nil on success
err error
}

// Error type for when the transfer started to return data then completely stopped
StoppedTransferError struct {
Err string
Expand Down Expand Up @@ -182,24 +213,25 @@ type (

// An object able to process transfer jobs.
TransferEngine struct {
ctx context.Context // The context provided upon creation of the engine.
cancel context.CancelFunc
egrp *errgroup.Group // The errgroup for the worker goroutines
work chan *clientTransferJob
files chan *clientTransferFile
results chan *clientTransferResults
jobLookupDone chan *clientTransferJob // Indicates the job lookup handler is done with the job
workersActive int
resultsMap map[uuid.UUID]chan *TransferResults
workMap map[uuid.UUID]chan *TransferJob
notifyChan chan bool
closeChan chan bool
closeDoneChan chan bool
ewmaTick *time.Ticker
ewma ewma.MovingAverage
ewmaVal atomic.Int64
ewmaCtr atomic.Int64
clientLock sync.RWMutex
ctx context.Context // The context provided upon creation of the engine.
cancel context.CancelFunc
egrp *errgroup.Group // The errgroup for the worker goroutines
work chan *clientTransferJob
files chan *clientTransferFile
results chan *clientTransferResults
jobLookupDone chan *clientTransferJob // Indicates the job lookup handler is done with the job
workersActive int
resultsMap map[uuid.UUID]chan *TransferResults
workMap map[uuid.UUID]chan *TransferJob
notifyChan chan bool
closeChan chan bool
closeDoneChan chan bool
ewmaTick *time.Ticker
ewma ewma.MovingAverage
ewmaVal atomic.Int64
ewmaCtr atomic.Int64
clientLock sync.RWMutex
pelicanURLCache *ttlcache.Cache[string, cacheItem]
}

TransferCallbackFunc = func(path string, downloaded int64, totalSize int64, completed bool)
Expand All @@ -210,6 +242,7 @@ type (
ctx context.Context
cancel context.CancelFunc
callback TransferCallbackFunc
engine *TransferEngine
skipAcquire bool // Enable/disable the token acquisition logic. Defaults to acquiring a token
tokenLocation string // Location of a token file to use for transfers
token string // Token that should be used for transfers
Expand All @@ -232,6 +265,10 @@ type (
NeedsToken bool
PackOption string
}

// pelicanUrl holds the federation metadata resolved for a remote URL.
pelicanUrl struct {
// Director endpoint of the federation; filled in either from federation
// discovery or from the Federation.DirectorUrl configuration value
directorUrl string
}
)

const (
Expand Down Expand Up @@ -333,6 +370,86 @@ func (tr TransferResults) ID() string {
return tr.jobId.String()
}

// newPelicanURL resolves the federation metadata (currently the director URL)
// that applies to remoteUrl, based on its scheme and host:
//
//   - osdf:// or stash:// with a host: the host is folded into the path.
//     Note that remoteUrl is modified in place in this case.
//   - pelican://<host>/...: metadata is looked up in the engine's URL cache
//     under https://<host>; the cache loader performs discovery on a miss.
//   - osdf://: with the "OSDF" preferred prefix the configured director URL is
//     used; with the "PELICAN" preferred prefix the metadata is looked up in
//     the cache under "osg-htc.org".
//   - pelican:// without a host: the configured Federation discovery URL is
//     looked up in the cache.
//   - no scheme: the configured director URL is used.
//
// Any other combination yields an empty pelicanUrl and a nil error. On error
// the returned pelicanUrl is always the zero value.
func (te *TransferEngine) newPelicanURL(remoteUrl *url.URL) (pelicanURL pelicanUrl, err error) {
	scheme := remoteUrl.Scheme
	if remoteUrl.Host != "" {
		switch scheme {
		case "osdf", "stash":
			// in the osdf/stash case, fix url's that have a hostname
			joinedPath, joinErr := url.JoinPath(remoteUrl.Host, remoteUrl.Path)
			if joinErr != nil {
				// Bail out before touching remoteUrl so a failed join does not clobber its path
				log.Errorln("Failed to join remote destination url path:", joinErr)
				return pelicanUrl{}, joinErr
			}
			// Prefix with a / just in case
			remoteUrl.Path = path.Join("/", joinedPath)
		case "pelican":
			// If we have a host and url is pelican, we need to extract federation data from the host
			log.Debugln("Detected pelican:// url, getting federation metadata from specified host", remoteUrl.Host)
			federationUrl := url.URL{Scheme: "https", Host: remoteUrl.Host}

			// Check if cache has key of federationURL, if not, loader will add it:
			if pelicanURL, err = te.lookupPelicanURL(federationUrl.String()); err != nil {
				return pelicanUrl{}, err
			}
		}
	}

	switch {
	case scheme == "osdf":
		// With an osdf:// url scheme, we assume the user will be using the OSDF so load in our osdf metadata for our url
		switch config.GetPreferredPrefix() {
		case "OSDF":
			// If we are using an osdf/stash binary, we discovered the federation already --> load into local url metadata
			log.Debugln("In OSDF mode with osdf:// url; populating metadata with OSDF defaults")
			if param.Federation_DirectorUrl.GetString() == "" || param.Federation_DiscoveryUrl.GetString() == "" || param.Federation_RegistryUrl.GetString() == "" {
				return pelicanUrl{}, fmt.Errorf("OSDF default metadata is not populated in config")
			}
			pelicanURL.directorUrl = param.Federation_DirectorUrl.GetString()
		case "PELICAN":
			// We hit this case when we are using a pelican binary but an osdf:// url, therefore we need to discover the osdf federation
			log.Debugln("In Pelican mode with an osdf:// url, populating metadata with OSDF defaults")
			// Check if cache has key of federationURL, if not, loader will add it:
			if pelicanURL, err = te.lookupPelicanURL("osg-htc.org"); err != nil {
				return pelicanUrl{}, err
			}
		}
	case scheme == "pelican" && remoteUrl.Host == "":
		// We hit this case when we do not have a hostname with a pelican:// url
		discoveryUrl := param.Federation_DiscoveryUrl.GetString()
		if discoveryUrl == "" {
			return pelicanUrl{}, fmt.Errorf("Pelican url scheme without discovery-url detected, please provide a federation discovery-url " +
				"(e.g. pelican://<federation-url-in-hostname.org></namespace></path/to/file>) within the hostname or with the -f flag")
		}
		// Check if cache has key of federationURL, if not, loader will add it:
		if pelicanURL, err = te.lookupPelicanURL(discoveryUrl); err != nil {
			return pelicanUrl{}, err
		}
	case scheme == "":
		// If we don't have a url scheme, then our metadata information should be in the config
		log.Debugln("No url scheme detected, getting metadata information from configuration")
		pelicanURL.directorUrl = param.Federation_DirectorUrl.GetString()

		// If the values do not exist, exit with failure
		if pelicanURL.directorUrl == "" {
			return pelicanUrl{}, fmt.Errorf("Missing metadata information in config, ensure Federation DirectorUrl, RegistryUrl, and DiscoverUrl are all set")
		}
	}
	return pelicanURL, nil
}

// lookupPelicanURL returns the federation metadata cached under key in the
// engine's URL cache. It reports an error when the cache yields no item at
// all, and it propagates a discovery error that was cached for the key.
func (te *TransferEngine) lookupPelicanURL(key string) (pelicanUrl, error) {
	item := te.pelicanURLCache.Get(key)
	if item == nil {
		return pelicanUrl{}, fmt.Errorf("Issue getting metadata information from cache")
	}
	entry := item.Value()
	if entry.err != nil {
		return pelicanUrl{}, entry.err
	}
	return entry.url, nil
}

// Returns a new transfer engine object whose lifetime is tied
// to the provided context. Will launch worker goroutines to
// handle the underlying transfers
Expand All @@ -342,21 +459,31 @@ func NewTransferEngine(ctx context.Context) *TransferEngine {
work := make(chan *clientTransferJob)
files := make(chan *clientTransferFile)
results := make(chan *clientTransferResults, 5)
suppressedLoader := ttlcache.NewSuppressedLoader(loader, new(singleflight.Group))
pelicanURLCache := ttlcache.New(
ttlcache.WithTTL[string, cacheItem](30*time.Minute),
ttlcache.WithLoader(suppressedLoader),
)

// Start our cache for url metadata
go pelicanURLCache.Start()

te := &TransferEngine{
ctx: ctx,
cancel: cancel,
egrp: egrp,
work: work,
files: files,
results: results,
resultsMap: make(map[uuid.UUID]chan *TransferResults),
workMap: make(map[uuid.UUID]chan *TransferJob),
jobLookupDone: make(chan *clientTransferJob),
notifyChan: make(chan bool),
closeChan: make(chan bool),
closeDoneChan: make(chan bool),
ewmaTick: time.NewTicker(ewmaInterval),
ewma: ewma.NewMovingAverage(),
ctx: ctx,
cancel: cancel,
egrp: egrp,
work: work,
files: files,
results: results,
resultsMap: make(map[uuid.UUID]chan *TransferResults),
workMap: make(map[uuid.UUID]chan *TransferJob),
jobLookupDone: make(chan *clientTransferJob),
notifyChan: make(chan bool),
closeChan: make(chan bool),
closeDoneChan: make(chan bool),
ewmaTick: time.NewTicker(ewmaInterval),
ewma: ewma.NewMovingAverage(),
pelicanURLCache: pelicanURLCache,
}
workerCount := param.Client_WorkerCount.GetInt()
if workerCount <= 0 {
Expand Down Expand Up @@ -421,6 +548,7 @@ func (te *TransferEngine) NewClient(options ...TransferOption) (client *Transfer
return
}
client = &TransferClient{
engine: te,
id: id,
results: make(chan *TransferResults),
work: make(chan *TransferJob),
Expand Down Expand Up @@ -463,6 +591,7 @@ func (te *TransferEngine) Shutdown() error {
te.Close()
<-te.closeDoneChan
te.ewmaTick.Stop()
te.pelicanURLCache.Stop()
te.cancel()

err := te.egrp.Wait()
Expand Down Expand Up @@ -742,6 +871,12 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
return
}

pelicanURL, err := tc.engine.newPelicanURL(remoteUrl)
if err != nil {
err = errors.Wrap(err, "error generating metadata for specified url")
return
}

copyUrl := *remoteUrl // Make a copy of the input URL to avoid concurrent issues.
tj = &TransferJob{
caches: tc.caches,
Expand Down Expand Up @@ -773,40 +908,11 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
}
}

if remoteUrl.Scheme == "pelican" && remoteUrl.Host != "" {
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrlCopy := *remoteUrl
fedUrlCopy.Scheme = "https"
fedUrlCopy.Path = ""
fedUrlCopy.RawFragment = ""
fedUrlCopy.RawQuery = ""
viper.Set("Federation.DiscoveryUrl", fedUrlCopy.String())
if err = config.DiscoverFederation(); err != nil {
return
}
} else if remoteUrl.Scheme == "osdf" {
if remoteUrl.Host != "" {
remoteUrl.Path = path.Clean(path.Join("/", remoteUrl.Host, remoteUrl.Path))
}
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrl := &url.URL{}
fedUrl.Scheme = "https"
fedUrl.Host = "osg-htc.org"
viper.Set("Federation.DiscoveryUrl", fedUrl.String())
if err = config.DiscoverFederation(); err != nil {
return
}
}

tj.useDirector = param.Federation_DirectorUrl.GetString() != ""
ns, err := getNamespaceInfo(remoteUrl.Path, param.Federation_DirectorUrl.GetString(), upload)
tj.useDirector = pelicanURL.directorUrl != ""
ns, err := getNamespaceInfo(remoteUrl.Path, pelicanURL.directorUrl, upload)
if err != nil {
log.Errorln(err)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String())
}
tj.namespace = ns

Expand Down Expand Up @@ -1290,7 +1396,7 @@ func sortAttempts(ctx context.Context, path string, attempts []transferAttemptDe
}

func downloadObject(transfer *transferFile) (transferResults TransferResults, err error) {
log.Debugln("Downloading file from", transfer.remoteURL, "to", transfer.localPath)
log.Debugln("Downloading object from", transfer.remoteURL, "to", transfer.localPath)
// Remove the source from the file path
directory := path.Dir(transfer.localPath)
var downloaded int64
Expand Down
Loading
Loading