Skip to content

Commit

Permalink
Merge pull request #878 from joereuss12/cleanup-metadata-client-v2-br…
Browse files Browse the repository at this point in the history
…anch

Remove use of global fed metadata within client
  • Loading branch information
joereuss12 authored Apr 4, 2024
2 parents 9b88257 + c71fe73 commit 332d180
Show file tree
Hide file tree
Showing 22 changed files with 1,107 additions and 658 deletions.
273 changes: 119 additions & 154 deletions client/fed_linux_test.go

Large diffs are not rendered by default.

373 changes: 183 additions & 190 deletions client/fed_test.go

Large diffs are not rendered by default.

238 changes: 172 additions & 66 deletions client/handle_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ import (

"github.com/VividCortex/ewma"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/lestrrat-go/option"
"github.com/opensaucerer/grab/v3"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/studio-b12/gowebdav"
"github.com/vbauerster/mpb/v8"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
"golang.org/x/time/rate"

"github.com/pelicanplatform/pelican/config"
Expand All @@ -59,9 +60,39 @@ import (
var (
progressCtrOnce sync.Once
progressCtr *mpb.Progress

successTTL = ttlcache.DefaultTTL
failureTTL = 5 * time.Minute

loader = ttlcache.LoaderFunc[string, cacheItem](
func(c *ttlcache.Cache[string, cacheItem], key string) *ttlcache.Item[string, cacheItem] {
ctx := context.Background()
// Note: setting this timeout mostly for unit tests
ctx, cancel := context.WithTimeout(ctx, param.Transport_ResponseHeaderTimeout.GetDuration())
defer cancel()
urlFederation, err := config.DiscoverUrlFederation(ctx, key)
if err != nil {
// Set a shorter TTL for failures
item := c.Set(key, cacheItem{err: err}, failureTTL)
return item
}
// Set a longer TTL for successes
item := c.Set(key, cacheItem{
url: pelicanUrl{
directorUrl: urlFederation.DirectorEndpoint,
},
}, successTTL)
return item
},
)
)

type (
cacheItem struct {
url pelicanUrl
err error
}

// Error type for when the transfer started to return data then completely stopped
StoppedTransferError struct {
Err string
Expand Down Expand Up @@ -182,24 +213,25 @@ type (

// An object able to process transfer jobs.
TransferEngine struct {
ctx context.Context // The context provided upon creation of the engine.
cancel context.CancelFunc
egrp *errgroup.Group // The errgroup for the worker goroutines
work chan *clientTransferJob
files chan *clientTransferFile
results chan *clientTransferResults
jobLookupDone chan *clientTransferJob // Indicates the job lookup handler is done with the job
workersActive int
resultsMap map[uuid.UUID]chan *TransferResults
workMap map[uuid.UUID]chan *TransferJob
notifyChan chan bool
closeChan chan bool
closeDoneChan chan bool
ewmaTick *time.Ticker
ewma ewma.MovingAverage
ewmaVal atomic.Int64
ewmaCtr atomic.Int64
clientLock sync.RWMutex
ctx context.Context // The context provided upon creation of the engine.
cancel context.CancelFunc
egrp *errgroup.Group // The errgroup for the worker goroutines
work chan *clientTransferJob
files chan *clientTransferFile
results chan *clientTransferResults
jobLookupDone chan *clientTransferJob // Indicates the job lookup handler is done with the job
workersActive int
resultsMap map[uuid.UUID]chan *TransferResults
workMap map[uuid.UUID]chan *TransferJob
notifyChan chan bool
closeChan chan bool
closeDoneChan chan bool
ewmaTick *time.Ticker
ewma ewma.MovingAverage
ewmaVal atomic.Int64
ewmaCtr atomic.Int64
clientLock sync.RWMutex
pelicanURLCache *ttlcache.Cache[string, cacheItem]
}

TransferCallbackFunc = func(path string, downloaded int64, totalSize int64, completed bool)
Expand All @@ -210,6 +242,7 @@ type (
ctx context.Context
cancel context.CancelFunc
callback TransferCallbackFunc
engine *TransferEngine
skipAcquire bool // Enable/disable the token acquisition logic. Defaults to acquiring a token
tokenLocation string // Location of a token file to use for transfers
token string // Token that should be used for transfers
Expand All @@ -232,6 +265,10 @@ type (
NeedsToken bool
PackOption string
}

pelicanUrl struct {
directorUrl string
}
)

const (
Expand Down Expand Up @@ -333,6 +370,86 @@ func (tr TransferResults) ID() string {
return tr.jobId.String()
}

func (te *TransferEngine) newPelicanURL(remoteUrl *url.URL) (pelicanURL pelicanUrl, err error) {
scheme := remoteUrl.Scheme
if remoteUrl.Host != "" {
if scheme == "osdf" || scheme == "stash" {
// in the osdf/stash case, fix url's that have a hostname
joinedPath, err := url.JoinPath(remoteUrl.Host, remoteUrl.Path)
// Prefix with a / just in case
remoteUrl.Path = path.Join("/", joinedPath)
if err != nil {
log.Errorln("Failed to join remote destination url path:", err)
return pelicanUrl{}, err
}
} else if scheme == "pelican" {
// If we have a host and url is pelican, we need to extract federation data from the host
log.Debugln("Detected pelican:// url, getting federation metadata from specified host", remoteUrl.Host)
federationUrl := &url.URL{}
// federationUrl, _ := url.Parse(remoteUrl.String())
federationUrl.Scheme = "https"
federationUrl.Path = ""
federationUrl.Host = remoteUrl.Host

// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := te.pelicanURLCache.Get(federationUrl.String())
if pelicanUrlItem.Value().err != nil {
return pelicanUrl{}, pelicanUrlItem.Value().err
} else {
pelicanURL = pelicanUrlItem.Value().url
}
}
}

// With an osdf:// url scheme, we assume the user will be using the OSDF so load in our osdf metadata for our url
if scheme == "osdf" {
// If we are using an osdf/stash binary, we discovered the federation already --> load into local url metadata
if config.GetPreferredPrefix() == "OSDF" {
log.Debugln("In OSDF mode with osdf:// url; populating metadata with OSDF defaults")
if param.Federation_DirectorUrl.GetString() == "" || param.Federation_DiscoveryUrl.GetString() == "" || param.Federation_RegistryUrl.GetString() == "" {
return pelicanUrl{}, fmt.Errorf("OSDF default metadata is not populated in config")
} else {
pelicanURL.directorUrl = param.Federation_DirectorUrl.GetString()
}
} else if config.GetPreferredPrefix() == "PELICAN" {
// We hit this case when we are using a pelican binary but an osdf:// url, therefore we need to disover the osdf federation
log.Debugln("In Pelican mode with an osdf:// url, populating metadata with OSDF defaults")
// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := te.pelicanURLCache.Get("osg-htc.org")
if pelicanUrlItem.Value().err != nil {
err = pelicanUrlItem.Value().err
return
} else {
pelicanURL = pelicanUrlItem.Value().url
}
}
} else if scheme == "pelican" && remoteUrl.Host == "" {
// We hit this case when we do not have a hostname with a pelican:// url
if param.Federation_DiscoveryUrl.GetString() == "" {
return pelicanUrl{}, fmt.Errorf("Pelican url scheme without discovery-url detected, please provide a federation discovery-url " +
"(e.g. pelican://<federation-url-in-hostname.org></namespace></path/to/file>) within the hostname or with the -f flag")
} else {
// Check if cache has key of federationURL, if not, loader will add it:
pelicanUrlItem := te.pelicanURLCache.Get(param.Federation_DiscoveryUrl.GetString())
if pelicanUrlItem != nil {
pelicanURL = pelicanUrlItem.Value().url
} else {
return pelicanUrl{}, fmt.Errorf("Issue getting metadata information from cache")
}
}
} else if scheme == "" {
// If we don't have a url scheme, then our metadata information should be in the config
log.Debugln("No url scheme detected, getting metadata information from configuration")
pelicanURL.directorUrl = param.Federation_DirectorUrl.GetString()

// If the values do not exist, exit with failure
if pelicanURL.directorUrl == "" {
return pelicanUrl{}, fmt.Errorf("Missing metadata information in config, ensure Federation DirectorUrl, RegistryUrl, and DiscoverUrl are all set")
}
}
return
}

// Returns a new transfer engine object whose lifetime is tied
// to the provided context. Will launcher worker goroutines to
// handle the underlying transfers
Expand All @@ -342,21 +459,31 @@ func NewTransferEngine(ctx context.Context) *TransferEngine {
work := make(chan *clientTransferJob)
files := make(chan *clientTransferFile)
results := make(chan *clientTransferResults, 5)
suppressedLoader := ttlcache.NewSuppressedLoader(loader, new(singleflight.Group))
pelicanURLCache := ttlcache.New(
ttlcache.WithTTL[string, cacheItem](30*time.Minute),
ttlcache.WithLoader(suppressedLoader),
)

// Start our cache for url metadata
go pelicanURLCache.Start()

te := &TransferEngine{
ctx: ctx,
cancel: cancel,
egrp: egrp,
work: work,
files: files,
results: results,
resultsMap: make(map[uuid.UUID]chan *TransferResults),
workMap: make(map[uuid.UUID]chan *TransferJob),
jobLookupDone: make(chan *clientTransferJob),
notifyChan: make(chan bool),
closeChan: make(chan bool),
closeDoneChan: make(chan bool),
ewmaTick: time.NewTicker(ewmaInterval),
ewma: ewma.NewMovingAverage(),
ctx: ctx,
cancel: cancel,
egrp: egrp,
work: work,
files: files,
results: results,
resultsMap: make(map[uuid.UUID]chan *TransferResults),
workMap: make(map[uuid.UUID]chan *TransferJob),
jobLookupDone: make(chan *clientTransferJob),
notifyChan: make(chan bool),
closeChan: make(chan bool),
closeDoneChan: make(chan bool),
ewmaTick: time.NewTicker(ewmaInterval),
ewma: ewma.NewMovingAverage(),
pelicanURLCache: pelicanURLCache,
}
workerCount := param.Client_WorkerCount.GetInt()
if workerCount <= 0 {
Expand Down Expand Up @@ -421,6 +548,7 @@ func (te *TransferEngine) NewClient(options ...TransferOption) (client *Transfer
return
}
client = &TransferClient{
engine: te,
id: id,
results: make(chan *TransferResults),
work: make(chan *TransferJob),
Expand Down Expand Up @@ -463,6 +591,7 @@ func (te *TransferEngine) Shutdown() error {
te.Close()
<-te.closeDoneChan
te.ewmaTick.Stop()
te.pelicanURLCache.Stop()
te.cancel()

err := te.egrp.Wait()
Expand Down Expand Up @@ -742,6 +871,12 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
return
}

pelicanURL, err := tc.engine.newPelicanURL(remoteUrl)
if err != nil {
err = errors.Wrap(err, "error generating metadata for specified url")
return
}

copyUrl := *remoteUrl // Make a copy of the input URL to avoid concurrent issues.
tj = &TransferJob{
caches: tc.caches,
Expand Down Expand Up @@ -773,40 +908,11 @@ func (tc *TransferClient) NewTransferJob(remoteUrl *url.URL, localPath string, u
}
}

if remoteUrl.Scheme == "pelican" && remoteUrl.Host != "" {
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrlCopy := *remoteUrl
fedUrlCopy.Scheme = "https"
fedUrlCopy.Path = ""
fedUrlCopy.RawFragment = ""
fedUrlCopy.RawQuery = ""
viper.Set("Federation.DiscoveryUrl", fedUrlCopy.String())
if err = config.DiscoverFederation(); err != nil {
return
}
} else if remoteUrl.Scheme == "osdf" {
if remoteUrl.Host != "" {
remoteUrl.Path = path.Clean(path.Join("/", remoteUrl.Host, remoteUrl.Path))
}
fd := config.GetFederation()
defer config.SetFederation(fd)
config.SetFederation(config.FederationDiscovery{})
fedUrl := &url.URL{}
fedUrl.Scheme = "https"
fedUrl.Host = "osg-htc.org"
viper.Set("Federation.DiscoveryUrl", fedUrl.String())
if err = config.DiscoverFederation(); err != nil {
return
}
}

tj.useDirector = param.Federation_DirectorUrl.GetString() != ""
ns, err := getNamespaceInfo(remoteUrl.Path, param.Federation_DirectorUrl.GetString(), upload)
tj.useDirector = pelicanURL.directorUrl != ""
ns, err := getNamespaceInfo(remoteUrl.Path, pelicanURL.directorUrl, upload)
if err != nil {
log.Errorln(err)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl)
err = errors.Wrapf(err, "failed to get namespace information for remote URL %s", remoteUrl.String())
}
tj.namespace = ns

Expand Down Expand Up @@ -1290,7 +1396,7 @@ func sortAttempts(ctx context.Context, path string, attempts []transferAttemptDe
}

func downloadObject(transfer *transferFile) (transferResults TransferResults, err error) {
log.Debugln("Downloading file from", transfer.remoteURL, "to", transfer.localPath)
log.Debugln("Downloading object from", transfer.remoteURL, "to", transfer.localPath)
// Remove the source from the file path
directory := path.Dir(transfer.localPath)
var downloaded int64
Expand Down
Loading

0 comments on commit 332d180

Please sign in to comment.