From 98d559c3d44ec6ebfdc03fe26400b6d1182617c0 Mon Sep 17 00:00:00 2001
From: Brian Bockelman
Date: Mon, 26 Aug 2024 10:55:40 -0500
Subject: [PATCH 1/6] Add timed-wait functionality to the errgroup

This is a fork of the upstream errgroup code with an additional method
that allows a timed wait. Those using the `TryGoUntil` method will block
until they can execute in the errgroup _or_ until the provided context
is canceled. This allows callers to have a third option between "never
block" and "block indefinitely".
---
 utils/errgroup.go | 176 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 176 insertions(+)
 create mode 100644 utils/errgroup.go

diff --git a/utils/errgroup.go b/utils/errgroup.go
new file mode 100644
index 000000000..6f518be75
--- /dev/null
+++ b/utils/errgroup.go
@@ -0,0 +1,176 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in https://cs.opensource.google/go/x/sync/+/refs/tags/v0.8.0:LICENSE.
+
+// Package errgroup provides synchronization, error propagation, and Context
+// cancelation for groups of goroutines working on subtasks of a common task.
+//
+// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks
+// returning errors.
+//
+// This is a fork of the upstream golang code at:
+// https://cs.opensource.google/go/x/sync/+/refs/tags/v0.8.0:errgroup/errgroup.go
+//
+// It adds the ability to block in TryGo until a context has expired.
+
+package utils
+
+import (
+	"context"
+	"fmt"
+	"sync"
+)
+
+type token struct{}
+
+// A Group is a collection of goroutines working on subtasks that are part of
+// the same overall task.
+//
+// A zero Group is valid, has no limit on the number of active goroutines,
+// and does not cancel on error.
+type Group struct {
+	cancel func(error)
+
+	wg sync.WaitGroup
+
+	sem chan token
+
+	errOnce sync.Once
+	err     error
+}
+
+func (g *Group) done() {
+	if g.sem != nil {
+		<-g.sem
+	}
+	g.wg.Done()
+}
+
+// WithContext returns a new Group and an associated Context derived from ctx.
+//
+// The derived Context is canceled the first time a function passed to Go
+// returns a non-nil error or the first time Wait returns, whichever occurs
+// first.
+func WithContext(ctx context.Context) (*Group, context.Context) {
+	ctx, cancel := context.WithCancelCause(ctx)
+	return &Group{cancel: cancel}, ctx
+}
+
+// Wait blocks until all function calls from the Go method have returned, then
+// returns the first non-nil error (if any) from them.
+func (g *Group) Wait() error {
+	g.wg.Wait()
+	if g.cancel != nil {
+		g.cancel(g.err)
+	}
+	return g.err
+}
+
+// Go calls the given function in a new goroutine.
+// It blocks until the new goroutine can be added without the number of
+// active goroutines in the group exceeding the configured limit.
+//
+// The first call to return a non-nil error cancels the group's context, if the
+// group was created by calling WithContext. The error will be returned by Wait.
+func (g *Group) Go(f func() error) {
+	if g.sem != nil {
+		g.sem <- token{}
+	}
+
+	g.wg.Add(1)
+	go func() {
+		defer g.done()
+
+		if err := f(); err != nil {
+			g.errOnce.Do(func() {
+				g.err = err
+				if g.cancel != nil {
+					g.cancel(g.err)
+				}
+			})
+		}
+	}()
+}
+
+// TryGo calls the given function in a new goroutine only if the number of
+// active goroutines in the group is currently below the configured limit.
+//
+// Unlike TryGoUntil, it never blocks waiting for capacity.
+// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// TryGoUntil calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// If the group is at the configured limit, the call will block until the +// provided context is cancelled. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGoUntil(ctx context.Context, f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + case <-ctx.Done(): + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +} From 3fd75dd4092105cc05a2489d356ed70291871a90 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Mon, 26 Aug 2024 09:53:45 -0500 Subject: [PATCH 2/6] Cache the object existence responses from the service This PR allows the object existence response to be cached at the director, preventing the remote service from getting repeated queries for the same object. It also splits the parameters enabling the director cache so we can control separately the origin and cache behavior. Finally, if there's a single origin, the director can assume it can elide the existence check: assume that an object always exists at the one origin that can provide it. 
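For illustration, the cached lookup roughly follows the pattern sketched below. This is a sketch only: `presence` and the stubbed-out HEAD request are stand-ins invented for the example, while the real code in `director/stat.go` caches `objectMetadata` values and also refreshes entries that are about to expire.

    package main

    import (
        "fmt"
        "time"

        "github.com/jellydator/ttlcache/v3"
    )

    // presence stands in for the director's objectMetadata type; caching a
    // nil value under a key records a negative ("object not found") answer.
    type presence struct{ contentLength int }

    func main() {
        cache := ttlcache.New[string, *presence](
            ttlcache.WithTTL[string, *presence](time.Minute),
            ttlcache.WithDisableTouchOnHit[string, *presence](),
            ttlcache.WithCapacity[string, *presence](10000),
        )

        lookup := func(object string) *presence {
            if item := cache.Get(object); item != nil {
                // Cache hit: answer without contacting the remote service.
                return item.Value()
            }
            // Cache miss: issue the HEAD request (stubbed out here) and
            // remember the answer for the configured TTL.
            result := &presence{contentLength: 1}
            cache.Set(object, result, ttlcache.DefaultTTL)
            return result
        }

        fmt.Println(lookup("/foo/test.txt")) // miss: queries the service
        fmt.Println(lookup("/foo/test.txt")) // hit: served from the cache
    }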
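The per-server queries themselves are scheduled through the errgroup fork introduced in the previous patch. Below is a self-contained sketch of that submission pattern; the task body, timings, and limit are made up for the example, with the limit merely analogous to `Director.StatConcurrencyLimit` rather than read from it.

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/pelicanplatform/pelican/utils"
    )

    func main() {
        // Stop submitting new queries once the request-scoped context expires.
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        grp := utils.Group{}
        grp.SetLimit(2)

        for i := 0; i < 5; i++ {
            i := i
            // TryGoUntil waits for a free slot but returns false if ctx is
            // canceled first -- the middle ground between Go (block forever)
            // and TryGo (never block).
            if !grp.TryGoUntil(ctx, func() error {
                time.Sleep(300 * time.Millisecond) // stand-in for a HEAD request
                fmt.Println("query", i, "finished")
                return nil
            }) {
                fmt.Println("context expired before query", i, "could start")
                break
            }
        }
        _ = grp.Wait()
    }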
--- config/resources/defaults.yaml | 6 +- director/cache_ads.go | 8 +- director/director.go | 14 +-- director/stat.go | 111 ++++++++++++++++++------ director/stat_test.go | 152 ++++++++++++++++++++++++++------- docs/parameters.yaml | 71 ++++++++++++++- metrics/director.go | 2 +- param/parameters.go | 6 ++ param/parameters_struct.go | 10 +++ 9 files changed, 312 insertions(+), 68 deletions(-) diff --git a/config/resources/defaults.yaml b/config/resources/defaults.yaml index dad7b706e..956eefe9e 100644 --- a/config/resources/defaults.yaml +++ b/config/resources/defaults.yaml @@ -54,7 +54,11 @@ Director: AdvertisementTTL: 15m OriginCacheHealthTestInterval: 15s EnableBroker: true - EnableStat: true + CheckOriginPresence: true + CheckCachePresence: true + AssumePresenceAtSingleOrigin: true + CachePresenceTTL: 1m + CachePresenceCapacity: 10000 Cache: Port: 8442 SelfTest: true diff --git a/director/cache_ads.go b/director/cache_ads.go index 063159dfe..6a459606f 100644 --- a/director/cache_ads.go +++ b/director/cache_ads.go @@ -35,6 +35,7 @@ import ( "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" + "github.com/pelicanplatform/pelican/utils" ) type filterType string @@ -143,12 +144,17 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[] if concLimit == 0 { concLimit = -1 } - statErrGrp := errgroup.Group{} + statErrGrp := utils.Group{} statErrGrp.SetLimit(concLimit) newUtil := serverStatUtil{ Errgroup: &statErrGrp, Cancel: cancel, Context: baseCtx, + ResultCache: ttlcache.New[string, *objectMetadata]( + ttlcache.WithTTL[string, *objectMetadata](param.Director_CachePresenceTTL.GetDuration()), + ttlcache.WithDisableTouchOnHit[string, *objectMetadata](), + ttlcache.WithCapacity[string, *objectMetadata](uint64(param.Director_CachePresenceCapacity.GetInt())), + ), } statUtils[ad.URL.String()] = newUtil } diff --git a/director/director.go b/director/director.go index 9d2527b80..1c243af4d 100644 --- a/director/director.go +++ b/director/director.go @@ -31,6 +31,7 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/hashicorp/go-version" + "github.com/jellydator/ttlcache/v3" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -67,9 +68,10 @@ type ( } // Utility struct to keep track of the `stat` call the director made to the origin/cache servers serverStatUtil struct { - Context context.Context - Cancel context.CancelFunc - Errgroup *errgroup.Group + Context context.Context + Cancel context.CancelFunc + Errgroup *utils.Group + ResultCache *ttlcache.Cache[string, *objectMetadata] } ) @@ -362,7 +364,7 @@ func redirectToCache(ginCtx *gin.Context) { reqParams := getRequestParameters(ginCtx.Request) - disableStat := !param.Director_EnableStat.GetBool() + disableStat := !param.Director_CheckCachePresence.GetBool() // Skip the stat check for object availability // If either disableStat or skipstat is set, then skip the stat query @@ -580,7 +582,7 @@ func redirectToOrigin(ginCtx *gin.Context) { reqParams := getRequestParameters(ginCtx.Request) // Skip the stat check for object availability if either disableStat or skipstat is set - skipStat := reqParams.Has(pelican_url.QuerySkipStat) || !param.Director_EnableStat.GetBool() + skipStat := reqParams.Has(pelican_url.QuerySkipStat) || !param.Director_CheckOriginPresence.GetBool() // Include caches in the response if Director.CachesPullFromCaches is enabled // AND prefercached query parameter is set @@ 
-599,7 +601,7 @@ func redirectToOrigin(ginCtx *gin.Context) { } // If the namespace requires a token yet there's no token available, skip the stat. - if !namespaceAd.Caps.PublicReads && reqParams.Get("authz") == "" { + if (!namespaceAd.Caps.PublicReads && reqParams.Get("authz") == "") || (param.Director_AssumePresenceAtSingleOrigin.GetBool() && len(originAds) == 1) { skipStat = true } diff --git a/director/stat.go b/director/stat.go index e2b735d84..d430cad23 100644 --- a/director/stat.go +++ b/director/stat.go @@ -28,6 +28,7 @@ import ( "strconv" "time" + "github.com/jellydator/ttlcache/v3" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -263,6 +264,21 @@ func WithToken(tk string) queryOption { } } +func getStatUtils(ads []server_structs.ServerAd) map[string]*serverStatUtil { + statUtilsMutex.RLock() + defer statUtilsMutex.RUnlock() + + result := make(map[string]*serverStatUtil, len(ads)) + for _, ad := range ads { + url := ad.URL.String() + statUtil, ok := statUtils[url] + if ok { + result[url] = &statUtil + } + } + return result +} + // Implementation of querying origins/cache servers for their availability of an object. // It blocks until max successful requests has been received, all potential origins/caches responded (or timeout), or cancelContext was closed. // @@ -310,9 +326,11 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st return } timeout := param.Director_StatTimeout.GetDuration() - positiveReqChan := make(chan *objectMetadata) - negativeReqChan := make(chan error) - deniedReqChan := make(chan *headReqForbiddenErr) // Requests with 403 response + // Note there is a small buffer in each channel; in the case of a cache hit, we write + // to the channel from within this goroutine. 
+ positiveReqChan := make(chan *objectMetadata, 1) + negativeReqChan := make(chan error, 1) + deniedReqChan := make(chan *headReqForbiddenErr, 1) // Requests with 403 response // Cancel the rest of the requests when requests received >= max required maxCancelCtx, maxCancel := context.WithCancel(ctx) numTotalReq := 0 @@ -327,12 +345,9 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st return } - // Use RLock to allolw multiple queries - statUtilsMutex.RLock() - defer statUtilsMutex.RUnlock() - + utils := getStatUtils(ads) for _, adExt := range ads { - statUtil, ok := statUtils[adExt.URL.String()] + statUtil, ok := utils[adExt.URL.String()] if !ok { numTotalReq += 1 log.Debugf("Server %q is missing data for stat call, skip querying...", adExt.Name) @@ -345,15 +360,24 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st } // Use an anonymous func to pass variable safely to the goroutine func(serverAd server_structs.ServerAd) { - statUtil.Errgroup.Go(func() error { - baseUrl := serverAd.URL - - // For the topology server, if the server does not support public read, - // or the token is provided, or the object is protected, then it's safe to assume this request goes to authenticated endpoint - // For Pelican server, we don't populate authURL and only use server URL as the base URL - if serverAd.FromTopology && (!serverAd.Caps.PublicReads || cfg.protected || cfg.token != "") && serverAd.AuthURL.String() != "" { - baseUrl = serverAd.AuthURL - } + + baseUrl := serverAd.URL + // For the topology server, if the server does not support public read, + // or the token is provided, or the object is protected, then it's safe to assume this request goes to authenticated endpoint + // For Pelican server, we don't populate authURL and only use server URL as the base URL + if serverAd.FromTopology && (!serverAd.Caps.PublicReads || cfg.protected || cfg.token != "") && serverAd.AuthURL.String() != "" { + baseUrl = serverAd.AuthURL + } + + totalLabels := prometheus.Labels{ + "server_name": serverAd.Name, + "server_url": baseUrl.String(), + "server_type": string(serverAd.Type), + "cached_result": "false", + "result": "", + } + + queryFunc := func() (metadata *objectMetadata, err error) { activeLabels := prometheus.Labels{ "server_name": serverAd.Name, @@ -363,22 +387,30 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st metrics.PelicanDirectorStatActive.With(activeLabels).Inc() defer metrics.PelicanDirectorStatActive.With(activeLabels).Dec() - metadata, err := stat.ReqHandler(maxCancelCtx, objectName, baseUrl, true, cfg.token, timeout) + metadata, err = stat.ReqHandler(maxCancelCtx, objectName, baseUrl, true, cfg.token, timeout) + var reqNotFound *headReqNotFoundErr cancelErr := &headReqCancelledErr{} - if err != nil && !errors.As(err, &cancelErr) { // Skip additional requests if the previous one is cancelled + if err != nil && !errors.As(err, &cancelErr) && !errors.As(err, &reqNotFound) { // If the request returns 403 or 500, it could be because we request a digest and xrootd // does not have this turned on, or had trouble calculating the checksum // Retry without digest metadata, err = stat.ReqHandler(maxCancelCtx, objectName, baseUrl, false, cfg.token, timeout) } - totalLabels := prometheus.Labels{ - "server_name": serverAd.Name, - "server_url": baseUrl.String(), - "server_type": string(serverAd.Type), - "result": "", + // If get a 404, record it in the cache. 
+ if errors.As(err, &reqNotFound) { + statUtil.ResultCache.Set(objectName, nil, ttlcache.DefaultTTL) + } else if err == nil { + statUtil.ResultCache.Set(objectName, metadata, ttlcache.DefaultTTL) } + + return + } + + lookupFunc := func() error { + + metadata, err := queryFunc() if err != nil { switch e := err.(type) { case *headReqTimeoutErr: @@ -418,7 +450,30 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st positiveReqChan <- metadata } return nil - }) + } + + if item := statUtil.ResultCache.Get(objectName); item != nil { + // If we get a cache hit -- but the cache item is going to expire in the next 10 seconds, + // then we assume this is a "hot" object and we'll benefit from the preemptively refreshing + // the ttlcache. If we can, asynchronously query the service. + if time.Until(item.ExpiresAt()) < 10*time.Second { + statUtil.Errgroup.TryGo(func() (err error) { _, err = queryFunc(); return }) + } + totalLabels["cached_result"] = "true" + if metadata := item.Value(); metadata != nil { + totalLabels["result"] = string(metrics.StatSucceeded) + metrics.PelicanDirectorStatTotal.With(totalLabels).Inc() + positiveReqChan <- metadata + } else { + log.Debugf("Object %s not found at %s server %s: (cached result)", objectName, serverAd.Type, baseUrl.String()) + negativeReqChan <- &headReqNotFoundErr{} + totalLabels["result"] = string(metrics.StatNotFound) + metrics.PelicanDirectorStatTotal.With(totalLabels).Inc() + } + metrics.PelicanDirectorStatTotal.With(totalLabels).Inc() + } else { + statUtil.Errgroup.TryGoUntil(ctx, lookupFunc) + } }(adExt) } @@ -454,9 +509,9 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st qResult.Status = queryFailed qResult.ErrorType = queryInsufficientResErr qResult.Msg = fmt.Sprintf("Number of success response: %d is less than MinStatResponse (%d) required.", len(successResult), minReq) - serverIssuers := []string{} - for _, dErr := range deniedResult { - serverIssuers = append(serverIssuers, dErr.IssuerUrl) + serverIssuers := make([]string, len(deniedResult)) + for idx, dErr := range deniedResult { + serverIssuers[idx] = dErr.IssuerUrl } qResult.DeniedServers = serverIssuers return diff --git a/director/stat_test.go b/director/stat_test.go index 7f74755c5..49ea263ef 100644 --- a/director/stat_test.go +++ b/director/stat_test.go @@ -30,16 +30,41 @@ import ( "time" "github.com/jellydator/ttlcache/v3" + log "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/pelicanplatform/pelican/config" "github.com/pelicanplatform/pelican/server_structs" "github.com/pelicanplatform/pelican/server_utils" + "github.com/pelicanplatform/pelican/utils" ) +func cleanupMock() { + statUtilsMutex.Lock() + defer statUtilsMutex.Unlock() + serverAds.DeleteAll() + for sa := range statUtils { + delete(statUtils, sa) + } +} + +func initMockStatUtils() { + statUtilsMutex.Lock() + defer statUtilsMutex.Unlock() + + for _, key := range serverAds.Keys() { + ctx, cancel := context.WithCancel(context.Background()) + statUtils[key] = serverStatUtil{ + Context: ctx, + Cancel: cancel, + Errgroup: &utils.Group{}, + ResultCache: ttlcache.New[string, *objectMetadata](), + } + } +} + func TestQueryServersForObject(t *testing.T) { server_utils.ResetTestState() viper.Set("Director.MinStatResponse", 1) @@ -190,29 +215,6 @@ func TestQueryServersForObject(t *testing.T) { ) } - cleanupMock := func() { - statUtilsMutex.Lock() - defer 
statUtilsMutex.Unlock() - serverAds.DeleteAll() - for sa := range statUtils { - delete(statUtils, sa) - } - } - - initMockStatUtils := func() { - statUtilsMutex.Lock() - defer statUtilsMutex.Unlock() - - for _, key := range serverAds.Keys() { - ctx, cancel := context.WithCancel(context.Background()) - statUtils[key] = serverStatUtil{ - Context: ctx, - Cancel: cancel, - Errgroup: &errgroup.Group{}, - } - } - } - t.Cleanup(func() { cleanupMock() // Restore the old serverAds at the end of this test func @@ -388,9 +390,10 @@ func TestQueryServersForObject(t *testing.T) { statUtilsMutex.Lock() statUtils[mockCacheServer[0].URL.String()] = serverStatUtil{ - Context: ctx, - Cancel: cancel, - Errgroup: &errgroup.Group{}, + Context: ctx, + Cancel: cancel, + Errgroup: &utils.Group{}, + ResultCache: ttlcache.New[string, *objectMetadata](), } statUtilsMutex.Unlock() @@ -422,9 +425,10 @@ func TestQueryServersForObject(t *testing.T) { statUtilsMutex.Lock() statUtils[mockOrigin[0].URL.String()] = serverStatUtil{ - Context: ctx, - Cancel: cancel, - Errgroup: &errgroup.Group{}, + Context: ctx, + Cancel: cancel, + Errgroup: &utils.Group{}, + ResultCache: ttlcache.New[string, *objectMetadata](), } statUtilsMutex.Unlock() @@ -698,6 +702,94 @@ func TestQueryServersForObject(t *testing.T) { }) } +func TestCache(t *testing.T) { + viper.Reset() + viper.Set("Logging.Level", "Debug") + viper.Set("ConfigDir", t.TempDir()) + config.InitConfig() + reqCounter := 0 + + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + reqCounter += 1 + log.Debugln("Mock server handling request for ", req.URL.String()) + if req.Method == "HEAD" && req.URL.String() == "/foo/test.txt" { + rw.Header().Set("Content-Length", "1") + rw.WriteHeader(http.StatusOK) + return + } else if req.Method == "HEAD" { + rw.WriteHeader(http.StatusNotFound) + } else { + rw.WriteHeader(http.StatusBadRequest) + } + })) + defer server.Close() + + viper.Set("Server.ExternalWebUrl", server.URL) + viper.Set("IssuerUrl", server.URL) + realServerUrl, err := url.Parse(server.URL) + require.NoError(t, err) + + serverAds = ttlcache.New(ttlcache.WithTTL[string, *server_structs.Advertisement](15 * time.Minute)) + + mockCacheAd := server_structs.ServerAd{ + Name: "cache", + Type: server_structs.CacheType.String(), + URL: *realServerUrl, + Caps: server_structs.Capabilities{PublicReads: true}, + } + mockNsAd := server_structs.NamespaceAdV2{Path: "/foo"} + serverAds.Set( + mockCacheAd.URL.String(), + &server_structs.Advertisement{ + ServerAd: mockCacheAd, + NamespaceAds: []server_structs.NamespaceAdV2{mockNsAd}, + }, + ttlcache.DefaultTTL, + ) + initMockStatUtils() + t.Cleanup(cleanupMock) + + t.Run("repeated-cache-access-found", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stat := NewObjectStat() + + startCtr := reqCounter + qResult := stat.queryServersForObject(ctx, "/foo/test.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, querySuccessful, qResult.Status) + require.Len(t, qResult.Objects, 1) + assert.Equal(t, 1, qResult.Objects[0].ContentLength) + require.Equal(t, startCtr+1, reqCounter) + + qResult = stat.queryServersForObject(ctx, "/foo/test.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, querySuccessful, qResult.Status) + require.Len(t, qResult.Objects, 1) + assert.Equal(t, 1, qResult.Objects[0].ContentLength) + require.Equal(t, startCtr+1, reqCounter) + }) + + t.Run("repeated-cache-access-not-found", func(t *testing.T) { + ctx, cancel := 
context.WithCancel(context.Background()) + defer cancel() + + stat := NewObjectStat() + + startCtr := reqCounter + qResult := stat.queryServersForObject(ctx, "/foo/notfound.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, queryFailed, qResult.Status) + assert.Len(t, qResult.Objects, 0) + assert.Equal(t, queryInsufficientResErr, qResult.ErrorType) + require.Equal(t, startCtr+1, reqCounter) + + qResult = stat.queryServersForObject(ctx, "/foo/notfound.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, queryFailed, qResult.Status) + assert.Len(t, qResult.Objects, 0) + assert.Equal(t, queryInsufficientResErr, qResult.ErrorType) + require.Equal(t, startCtr+1, reqCounter) + }) +} + func TestSendHeadReq(t *testing.T) { server_utils.ResetTestState() diff --git a/docs/parameters.yaml b/docs/parameters.yaml index e630efc7e..ccb7aef64 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1334,7 +1334,53 @@ components: ["director"] --- name: Director.EnableStat description: |+ - Disable the `stat` query to the origin servers for object requests. + Before redirecting a cache (or, for direct reads or writes, a client) to an origin, + query the origin to see if the object is present. + + Enabling this option generates slightly more load on the origin; however, it provides + improved error messages and allows a namespace to effectively be split across multiple + origins. +type: bool +default: true +deprecated: true +hidden: true +replacedby: Director.CheckOriginPresence +components: ["director"] +--- +name: Director.CheckOriginPresence +description: |+ + Before redirecting a cache (or, for direct reads or writes, a client) to an origin, + query the origin to see if the object is present. + + Enabling this option generates slightly more load on the origin; however, it provides + improved error messages and allows a namespace to effectively be split across multiple + origins. +type: bool +default: true +components: ["director"] +--- +name: Director.AssumePresenceAtSingleOrigin +description: |+ + If `Director.CheckOriginPresence` is enabled, the director will check for object + presence at the origin before redirecting a client. + + If this option is enabled and there's only one possible origin for the object, + then the check will be skipped. + + Enabling this option will reduce load on single-origin namespaces but clients will + get less informative error messages. +type: bool +default: true +hidden: true +components: ["director"] +--- +name: Director.CheckCachePresence +description: |+ + Before redirecting a client to a cache, query the origin to see if the object is present + at the cache. + + Enabling this option improves the cache selection algorithm, allowing the director + to prefer caches nearby the client with the object over caches without the object. type: bool default: true components: ["director"] @@ -1429,6 +1475,29 @@ default: none components: ["director"] hidden: true --- +name: Director.CachePresenceTTL +description: |+ + If `Director.CheckCachePresence` is enabled, the director will check with remote cache + to see if the object is present before redirecting a client. + + This parameter controls how long the director will cache the result of the lookup. Longer values + will reduce the load generated on the caches but may reduce the accuracy of the result (as the + contents of the cache will change over time). 
+type: duration +default: 1m +components: ["director"] +--- +name: Director.CachePresenceCapacity +description: |+ + If `Director.CheckCachePresence` is enabled, the director will check with remote cache + to see if the object is present before redirecting a client. + + This parameter controls how many responses, per-service, will be cached. +type: int +default: 10000 +hidden: true +components: ["director"] +--- ############################ # Registry-level configs # ############################ diff --git a/metrics/director.go b/metrics/director.go index 6a87d9990..82b5b2d15 100644 --- a/metrics/director.go +++ b/metrics/director.go @@ -80,7 +80,7 @@ var ( PelicanDirectorStatTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pelican_director_stat_total", Help: "The total stat queries the director issues. The status can be Succeeded, Cancelled, Timeout, Forbidden, or UnknownErr", - }, []string{"server_name", "server_url", "server_type", "result"}) // result: see enums for DirectorStatResult + }, []string{"server_name", "server_url", "server_type", "result", "cached_result"}) // result: see enums for DirectorStatResult PelicanDirectorServerCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pelican_director_server_count", diff --git a/param/parameters.go b/param/parameters.go index ed60e757b..1af215d7a 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -52,6 +52,7 @@ type ObjectParam struct { func GetDeprecated() map[string][]string { return map[string][]string{ "Cache.DataLocation": {"Cache.LocalRoot"}, + "Director.EnableStat": {"Director.CheckOriginPresence"}, "DisableHttpProxy": {"Client.DisableHttpProxy"}, "DisableProxyFallback": {"Client.DisableProxyFallback"}, "MinimumDownloadSpeed": {"Client.MinimumDownloadSpeed"}, @@ -300,6 +301,7 @@ var ( Client_MaximumDownloadSpeed = IntParam{"Client.MaximumDownloadSpeed"} Client_MinimumDownloadSpeed = IntParam{"Client.MinimumDownloadSpeed"} Client_WorkerCount = IntParam{"Client.WorkerCount"} + Director_CachePresenceCapacity = IntParam{"Director.CachePresenceCapacity"} Director_MaxStatResponse = IntParam{"Director.MaxStatResponse"} Director_MinStatResponse = IntParam{"Director.MinStatResponse"} Director_StatConcurrencyLimit = IntParam{"Director.StatConcurrencyLimit"} @@ -329,7 +331,10 @@ var ( Client_DisableHttpProxy = BoolParam{"Client.DisableHttpProxy"} Client_DisableProxyFallback = BoolParam{"Client.DisableProxyFallback"} Debug = BoolParam{"Debug"} + Director_AssumePresenceAtSingleOrigin = BoolParam{"Director.AssumePresenceAtSingleOrigin"} Director_CachesPullFromCaches = BoolParam{"Director.CachesPullFromCaches"} + Director_CheckCachePresence = BoolParam{"Director.CheckCachePresence"} + Director_CheckOriginPresence = BoolParam{"Director.CheckOriginPresence"} Director_EnableBroker = BoolParam{"Director.EnableBroker"} Director_EnableOIDC = BoolParam{"Director.EnableOIDC"} Director_EnableStat = BoolParam{"Director.EnableStat"} @@ -376,6 +381,7 @@ var ( Client_SlowTransferWindow = DurationParam{"Client.SlowTransferWindow"} Client_StoppedTransferTimeout = DurationParam{"Client.StoppedTransferTimeout"} Director_AdvertisementTTL = DurationParam{"Director.AdvertisementTTL"} + Director_CachePresenceTTL = DurationParam{"Director.CachePresenceTTL"} Director_OriginCacheHealthTestInterval = DurationParam{"Director.OriginCacheHealthTestInterval"} Director_StatTimeout = DurationParam{"Director.StatTimeout"} Federation_TopologyReloadInterval = DurationParam{"Federation.TopologyReloadInterval"} diff --git a/param/parameters_struct.go 
b/param/parameters_struct.go index 32cc73682..318566bf8 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -60,9 +60,14 @@ type Config struct { Debug bool `mapstructure:"debug"` Director struct { AdvertisementTTL time.Duration `mapstructure:"advertisementttl"` + AssumePresenceAtSingleOrigin bool `mapstructure:"assumepresenceatsingleorigin"` + CachePresenceCapacity int `mapstructure:"cachepresencecapacity"` + CachePresenceTTL time.Duration `mapstructure:"cachepresencettl"` CacheResponseHostnames []string `mapstructure:"cacheresponsehostnames"` CacheSortMethod string `mapstructure:"cachesortmethod"` CachesPullFromCaches bool `mapstructure:"cachespullfromcaches"` + CheckCachePresence bool `mapstructure:"checkcachepresence"` + CheckOriginPresence bool `mapstructure:"checkoriginpresence"` DefaultResponse string `mapstructure:"defaultresponse"` EnableBroker bool `mapstructure:"enablebroker"` EnableOIDC bool `mapstructure:"enableoidc"` @@ -356,9 +361,14 @@ type configWithType struct { Debug struct { Type string; Value bool } Director struct { AdvertisementTTL struct { Type string; Value time.Duration } + AssumePresenceAtSingleOrigin struct { Type string; Value bool } + CachePresenceCapacity struct { Type string; Value int } + CachePresenceTTL struct { Type string; Value time.Duration } CacheResponseHostnames struct { Type string; Value []string } CacheSortMethod struct { Type string; Value string } CachesPullFromCaches struct { Type string; Value bool } + CheckCachePresence struct { Type string; Value bool } + CheckOriginPresence struct { Type string; Value bool } DefaultResponse struct { Type string; Value string } EnableBroker struct { Type string; Value bool } EnableOIDC struct { Type string; Value bool } From f1160f24646e7f25710b85dfdb5810b79bbf83e3 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Thu, 19 Sep 2024 09:03:39 -0500 Subject: [PATCH 3/6] Minor tweaks of the parameter documentation --- docs/parameters.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/parameters.yaml b/docs/parameters.yaml index ccb7aef64..6183e9176 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1284,7 +1284,8 @@ description: |+ - "random": Sorts caches randomly. - "adaptive": Sorts caches according to stochastically-generated weights that consider a combination of factors, including a cache's distance from the client, its IO load, and whether the cache already has the requested object. - See details at https://github.com/PelicanPlatform/pelican/discussions/1198 + See details at https://github.com/PelicanPlatform/pelican/discussions/1198. Note that if `Director.CheckCachePresence` + is set to false, then the adaptive algorithm cannot use the cache locality information. type: string default: distance components: ["director"] @@ -1376,7 +1377,7 @@ components: ["director"] --- name: Director.CheckCachePresence description: |+ - Before redirecting a client to a cache, query the origin to see if the object is present + Before redirecting a client to a cache, query the cache to see if the object is present at the cache. 
Enabling this option improves the cache selection algorithm, allowing the director From eb4448a05e2e47cc11862523f00c38f1ee8eec87 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 20 Sep 2024 07:47:06 -0500 Subject: [PATCH 4/6] Assume caches can serve public reads With this change, the caches registered in topology will be queried by the director for object locality; without it, those caches are ignored, greatly decreasing the total number of public caches that are used. --- config/resources/defaults.yaml | 2 +- docs/parameters.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/resources/defaults.yaml b/config/resources/defaults.yaml index 956eefe9e..62853239e 100644 --- a/config/resources/defaults.yaml +++ b/config/resources/defaults.yaml @@ -49,7 +49,7 @@ Director: CacheSortMethod: "distance" MinStatResponse: 1 MaxStatResponse: 1 - StatTimeout: 1000ms + StatTimeout: 2000ms StatConcurrencyLimit: 1000 AdvertisementTTL: 15m OriginCacheHealthTestInterval: 15s diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 6183e9176..e5ceddc9e 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1390,7 +1390,7 @@ name: Director.StatTimeout description: |+ The timeout for a single `stat` request. type: duration -default: 200ms +default: 2000ms components: ["director"] --- name: Director.StatConcurrencyLimit From b761a7ec789f40f0def35b5868b76197304c81f2 Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Mon, 21 Oct 2024 17:06:31 +0000 Subject: [PATCH 5/6] Fix test that was not using random ports --- cmd/fed_serve_cache_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/fed_serve_cache_test.go b/cmd/fed_serve_cache_test.go index abe45ed7b..22aa46d10 100644 --- a/cmd/fed_serve_cache_test.go +++ b/cmd/fed_serve_cache_test.go @@ -101,6 +101,8 @@ func TestFedServeCache(t *testing.T) { viper.Set("Origin.EnableCmsd", false) viper.Set("Origin.EnableMacaroons", false) viper.Set("Origin.EnableVoms", false) + viper.Set("Server.WebPort", 0) + viper.Set("Origin.Port", 0) viper.Set("TLSSkipVerify", true) viper.Set("Server.EnableUI", false) viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite")) From 707d5d00fa390a71a174e51f7e4d9804e6b613bb Mon Sep 17 00:00:00 2001 From: Justin Hiemstra Date: Tue, 22 Oct 2024 15:58:04 +0000 Subject: [PATCH 6/6] Increase stat channel buffer sizes and bump test timeout value --- director/stat.go | 6 +++--- local_cache/cache_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/director/stat.go b/director/stat.go index d430cad23..5c249d4f6 100644 --- a/director/stat.go +++ b/director/stat.go @@ -328,9 +328,9 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st timeout := param.Director_StatTimeout.GetDuration() // Note there is a small buffer in each channel; in the case of a cache hit, we write // to the channel from within this goroutine. 
- positiveReqChan := make(chan *objectMetadata, 1) - negativeReqChan := make(chan error, 1) - deniedReqChan := make(chan *headReqForbiddenErr, 1) // Requests with 403 response + positiveReqChan := make(chan *objectMetadata, 5) + negativeReqChan := make(chan error, 5) + deniedReqChan := make(chan *headReqForbiddenErr, 5) // Requests with 403 response // Cancel the rest of the requests when requests received >= max required maxCancelCtx, maxCancel := context.WithCancel(ctx) numTotalReq := 0 diff --git a/local_cache/cache_test.go b/local_cache/cache_test.go index 8401a4bd9..f57582ce4 100644 --- a/local_cache/cache_test.go +++ b/local_cache/cache_test.go @@ -417,7 +417,7 @@ func TestOriginUnresponsive(t *testing.T) { tmpDir := t.TempDir() server_utils.ResetTestState() - viper.Set("Transport.ResponseHeaderTimeout", "3s") + viper.Set("Transport.ResponseHeaderTimeout", "5s") viper.Set("Logging.Level", "debug") ft := fed_test_utils.NewFedTest(t, pubOriginCfg)
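For context on the larger buffers above: on a cache hit, the stat code writes its answer to the result channel from the submitting goroutine itself, before the collection loop starts draining, so an unbuffered channel would deadlock at that point and a one-element buffer absorbs only a single inline write. The snippet below is a tiny stand-alone illustration of that failure mode, not Pelican code.

    package main

    import "fmt"

    func main() {
        // With an unbuffered channel the send below would deadlock: the same
        // goroutine that sends is the one that later receives.
        //   results := make(chan string)
        results := make(chan string, 1) // a small buffer lets the inline send proceed

        // Analogue of the director's cache-hit path: the answer is written
        // before any receiver is running.
        results <- "cached metadata"

        fmt.Println("got:", <-results)
    }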