diff --git a/cmd/fed_serve_cache_test.go b/cmd/fed_serve_cache_test.go index abe45ed7b..22aa46d10 100644 --- a/cmd/fed_serve_cache_test.go +++ b/cmd/fed_serve_cache_test.go @@ -101,6 +101,8 @@ func TestFedServeCache(t *testing.T) { viper.Set("Origin.EnableCmsd", false) viper.Set("Origin.EnableMacaroons", false) viper.Set("Origin.EnableVoms", false) + viper.Set("Server.WebPort", 0) + viper.Set("Origin.Port", 0) viper.Set("TLSSkipVerify", true) viper.Set("Server.EnableUI", false) viper.Set("Registry.DbLocation", filepath.Join(t.TempDir(), "ns-registry.sqlite")) diff --git a/config/resources/defaults.yaml b/config/resources/defaults.yaml index dad7b706e..62853239e 100644 --- a/config/resources/defaults.yaml +++ b/config/resources/defaults.yaml @@ -49,12 +49,16 @@ Director: CacheSortMethod: "distance" MinStatResponse: 1 MaxStatResponse: 1 - StatTimeout: 1000ms + StatTimeout: 2000ms StatConcurrencyLimit: 1000 AdvertisementTTL: 15m OriginCacheHealthTestInterval: 15s EnableBroker: true - EnableStat: true + CheckOriginPresence: true + CheckCachePresence: true + AssumePresenceAtSingleOrigin: true + CachePresenceTTL: 1m + CachePresenceCapacity: 10000 Cache: Port: 8442 SelfTest: true diff --git a/director/cache_ads.go b/director/cache_ads.go index 7aeaf5f0d..9b19cc686 100644 --- a/director/cache_ads.go +++ b/director/cache_ads.go @@ -35,6 +35,7 @@ import ( "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" + "github.com/pelicanplatform/pelican/utils" ) type filterType string @@ -143,12 +144,17 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[] if concLimit == 0 { concLimit = -1 } - statErrGrp := errgroup.Group{} + statErrGrp := utils.Group{} statErrGrp.SetLimit(concLimit) newUtil := serverStatUtil{ Errgroup: &statErrGrp, Cancel: cancel, Context: baseCtx, + ResultCache: ttlcache.New[string, *objectMetadata]( + ttlcache.WithTTL[string, *objectMetadata](param.Director_CachePresenceTTL.GetDuration()), + ttlcache.WithDisableTouchOnHit[string, *objectMetadata](), + ttlcache.WithCapacity[string, *objectMetadata](uint64(param.Director_CachePresenceCapacity.GetInt())), + ), } statUtils[ad.URL.String()] = newUtil } diff --git a/director/director.go b/director/director.go index 2699b0fe2..ddf39e967 100644 --- a/director/director.go +++ b/director/director.go @@ -31,6 +31,7 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/hashicorp/go-version" + "github.com/jellydator/ttlcache/v3" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -67,9 +68,10 @@ type ( } // Utility struct to keep track of the `stat` call the director made to the origin/cache servers serverStatUtil struct { - Context context.Context - Cancel context.CancelFunc - Errgroup *errgroup.Group + Context context.Context + Cancel context.CancelFunc + Errgroup *utils.Group + ResultCache *ttlcache.Cache[string, *objectMetadata] } // Context key for the project name @@ -382,7 +384,7 @@ func redirectToCache(ginCtx *gin.Context) { reqParams := getRequestParameters(ginCtx.Request) - disableStat := !param.Director_EnableStat.GetBool() + disableStat := !param.Director_CheckCachePresence.GetBool() // Skip the stat check for object availability // If either disableStat or skipstat is set, then skip the stat query @@ -603,7 +605,7 @@ func redirectToOrigin(ginCtx *gin.Context) { reqParams := getRequestParameters(ginCtx.Request) // Skip the stat check for object availability if either disableStat or skipstat is set - skipStat := reqParams.Has(pelican_url.QuerySkipStat) || !param.Director_EnableStat.GetBool() + skipStat := reqParams.Has(pelican_url.QuerySkipStat) || !param.Director_CheckOriginPresence.GetBool() // Include caches in the response if Director.CachesPullFromCaches is enabled // AND prefercached query parameter is set @@ -622,7 +624,7 @@ func redirectToOrigin(ginCtx *gin.Context) { } // If the namespace requires a token yet there's no token available, skip the stat. - if !namespaceAd.Caps.PublicReads && reqParams.Get("authz") == "" { + if (!namespaceAd.Caps.PublicReads && reqParams.Get("authz") == "") || (param.Director_AssumePresenceAtSingleOrigin.GetBool() && len(originAds) == 1) { skipStat = true } diff --git a/director/stat.go b/director/stat.go index e2b735d84..5c249d4f6 100644 --- a/director/stat.go +++ b/director/stat.go @@ -28,6 +28,7 @@ import ( "strconv" "time" + "github.com/jellydator/ttlcache/v3" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -263,6 +264,21 @@ func WithToken(tk string) queryOption { } } +func getStatUtils(ads []server_structs.ServerAd) map[string]*serverStatUtil { + statUtilsMutex.RLock() + defer statUtilsMutex.RUnlock() + + result := make(map[string]*serverStatUtil, len(ads)) + for _, ad := range ads { + url := ad.URL.String() + statUtil, ok := statUtils[url] + if ok { + result[url] = &statUtil + } + } + return result +} + // Implementation of querying origins/cache servers for their availability of an object. // It blocks until max successful requests has been received, all potential origins/caches responded (or timeout), or cancelContext was closed. // @@ -310,9 +326,11 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st return } timeout := param.Director_StatTimeout.GetDuration() - positiveReqChan := make(chan *objectMetadata) - negativeReqChan := make(chan error) - deniedReqChan := make(chan *headReqForbiddenErr) // Requests with 403 response + // Note there is a small buffer in each channel; in the case of a cache hit, we write + // to the channel from within this goroutine. + positiveReqChan := make(chan *objectMetadata, 5) + negativeReqChan := make(chan error, 5) + deniedReqChan := make(chan *headReqForbiddenErr, 5) // Requests with 403 response // Cancel the rest of the requests when requests received >= max required maxCancelCtx, maxCancel := context.WithCancel(ctx) numTotalReq := 0 @@ -327,12 +345,9 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st return } - // Use RLock to allolw multiple queries - statUtilsMutex.RLock() - defer statUtilsMutex.RUnlock() - + utils := getStatUtils(ads) for _, adExt := range ads { - statUtil, ok := statUtils[adExt.URL.String()] + statUtil, ok := utils[adExt.URL.String()] if !ok { numTotalReq += 1 log.Debugf("Server %q is missing data for stat call, skip querying...", adExt.Name) @@ -345,15 +360,24 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st } // Use an anonymous func to pass variable safely to the goroutine func(serverAd server_structs.ServerAd) { - statUtil.Errgroup.Go(func() error { - baseUrl := serverAd.URL - - // For the topology server, if the server does not support public read, - // or the token is provided, or the object is protected, then it's safe to assume this request goes to authenticated endpoint - // For Pelican server, we don't populate authURL and only use server URL as the base URL - if serverAd.FromTopology && (!serverAd.Caps.PublicReads || cfg.protected || cfg.token != "") && serverAd.AuthURL.String() != "" { - baseUrl = serverAd.AuthURL - } + + baseUrl := serverAd.URL + // For the topology server, if the server does not support public read, + // or the token is provided, or the object is protected, then it's safe to assume this request goes to authenticated endpoint + // For Pelican server, we don't populate authURL and only use server URL as the base URL + if serverAd.FromTopology && (!serverAd.Caps.PublicReads || cfg.protected || cfg.token != "") && serverAd.AuthURL.String() != "" { + baseUrl = serverAd.AuthURL + } + + totalLabels := prometheus.Labels{ + "server_name": serverAd.Name, + "server_url": baseUrl.String(), + "server_type": string(serverAd.Type), + "cached_result": "false", + "result": "", + } + + queryFunc := func() (metadata *objectMetadata, err error) { activeLabels := prometheus.Labels{ "server_name": serverAd.Name, @@ -363,22 +387,30 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st metrics.PelicanDirectorStatActive.With(activeLabels).Inc() defer metrics.PelicanDirectorStatActive.With(activeLabels).Dec() - metadata, err := stat.ReqHandler(maxCancelCtx, objectName, baseUrl, true, cfg.token, timeout) + metadata, err = stat.ReqHandler(maxCancelCtx, objectName, baseUrl, true, cfg.token, timeout) + var reqNotFound *headReqNotFoundErr cancelErr := &headReqCancelledErr{} - if err != nil && !errors.As(err, &cancelErr) { // Skip additional requests if the previous one is cancelled + if err != nil && !errors.As(err, &cancelErr) && !errors.As(err, &reqNotFound) { // If the request returns 403 or 500, it could be because we request a digest and xrootd // does not have this turned on, or had trouble calculating the checksum // Retry without digest metadata, err = stat.ReqHandler(maxCancelCtx, objectName, baseUrl, false, cfg.token, timeout) } - totalLabels := prometheus.Labels{ - "server_name": serverAd.Name, - "server_url": baseUrl.String(), - "server_type": string(serverAd.Type), - "result": "", + // If get a 404, record it in the cache. + if errors.As(err, &reqNotFound) { + statUtil.ResultCache.Set(objectName, nil, ttlcache.DefaultTTL) + } else if err == nil { + statUtil.ResultCache.Set(objectName, metadata, ttlcache.DefaultTTL) } + + return + } + + lookupFunc := func() error { + + metadata, err := queryFunc() if err != nil { switch e := err.(type) { case *headReqTimeoutErr: @@ -418,7 +450,30 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st positiveReqChan <- metadata } return nil - }) + } + + if item := statUtil.ResultCache.Get(objectName); item != nil { + // If we get a cache hit -- but the cache item is going to expire in the next 10 seconds, + // then we assume this is a "hot" object and we'll benefit from the preemptively refreshing + // the ttlcache. If we can, asynchronously query the service. + if time.Until(item.ExpiresAt()) < 10*time.Second { + statUtil.Errgroup.TryGo(func() (err error) { _, err = queryFunc(); return }) + } + totalLabels["cached_result"] = "true" + if metadata := item.Value(); metadata != nil { + totalLabels["result"] = string(metrics.StatSucceeded) + metrics.PelicanDirectorStatTotal.With(totalLabels).Inc() + positiveReqChan <- metadata + } else { + log.Debugf("Object %s not found at %s server %s: (cached result)", objectName, serverAd.Type, baseUrl.String()) + negativeReqChan <- &headReqNotFoundErr{} + totalLabels["result"] = string(metrics.StatNotFound) + metrics.PelicanDirectorStatTotal.With(totalLabels).Inc() + } + metrics.PelicanDirectorStatTotal.With(totalLabels).Inc() + } else { + statUtil.Errgroup.TryGoUntil(ctx, lookupFunc) + } }(adExt) } @@ -454,9 +509,9 @@ func (stat *ObjectStat) queryServersForObject(ctx context.Context, objectName st qResult.Status = queryFailed qResult.ErrorType = queryInsufficientResErr qResult.Msg = fmt.Sprintf("Number of success response: %d is less than MinStatResponse (%d) required.", len(successResult), minReq) - serverIssuers := []string{} - for _, dErr := range deniedResult { - serverIssuers = append(serverIssuers, dErr.IssuerUrl) + serverIssuers := make([]string, len(deniedResult)) + for idx, dErr := range deniedResult { + serverIssuers[idx] = dErr.IssuerUrl } qResult.DeniedServers = serverIssuers return diff --git a/director/stat_test.go b/director/stat_test.go index 7f74755c5..49ea263ef 100644 --- a/director/stat_test.go +++ b/director/stat_test.go @@ -30,16 +30,41 @@ import ( "time" "github.com/jellydator/ttlcache/v3" + log "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/pelicanplatform/pelican/config" "github.com/pelicanplatform/pelican/server_structs" "github.com/pelicanplatform/pelican/server_utils" + "github.com/pelicanplatform/pelican/utils" ) +func cleanupMock() { + statUtilsMutex.Lock() + defer statUtilsMutex.Unlock() + serverAds.DeleteAll() + for sa := range statUtils { + delete(statUtils, sa) + } +} + +func initMockStatUtils() { + statUtilsMutex.Lock() + defer statUtilsMutex.Unlock() + + for _, key := range serverAds.Keys() { + ctx, cancel := context.WithCancel(context.Background()) + statUtils[key] = serverStatUtil{ + Context: ctx, + Cancel: cancel, + Errgroup: &utils.Group{}, + ResultCache: ttlcache.New[string, *objectMetadata](), + } + } +} + func TestQueryServersForObject(t *testing.T) { server_utils.ResetTestState() viper.Set("Director.MinStatResponse", 1) @@ -190,29 +215,6 @@ func TestQueryServersForObject(t *testing.T) { ) } - cleanupMock := func() { - statUtilsMutex.Lock() - defer statUtilsMutex.Unlock() - serverAds.DeleteAll() - for sa := range statUtils { - delete(statUtils, sa) - } - } - - initMockStatUtils := func() { - statUtilsMutex.Lock() - defer statUtilsMutex.Unlock() - - for _, key := range serverAds.Keys() { - ctx, cancel := context.WithCancel(context.Background()) - statUtils[key] = serverStatUtil{ - Context: ctx, - Cancel: cancel, - Errgroup: &errgroup.Group{}, - } - } - } - t.Cleanup(func() { cleanupMock() // Restore the old serverAds at the end of this test func @@ -388,9 +390,10 @@ func TestQueryServersForObject(t *testing.T) { statUtilsMutex.Lock() statUtils[mockCacheServer[0].URL.String()] = serverStatUtil{ - Context: ctx, - Cancel: cancel, - Errgroup: &errgroup.Group{}, + Context: ctx, + Cancel: cancel, + Errgroup: &utils.Group{}, + ResultCache: ttlcache.New[string, *objectMetadata](), } statUtilsMutex.Unlock() @@ -422,9 +425,10 @@ func TestQueryServersForObject(t *testing.T) { statUtilsMutex.Lock() statUtils[mockOrigin[0].URL.String()] = serverStatUtil{ - Context: ctx, - Cancel: cancel, - Errgroup: &errgroup.Group{}, + Context: ctx, + Cancel: cancel, + Errgroup: &utils.Group{}, + ResultCache: ttlcache.New[string, *objectMetadata](), } statUtilsMutex.Unlock() @@ -698,6 +702,94 @@ func TestQueryServersForObject(t *testing.T) { }) } +func TestCache(t *testing.T) { + viper.Reset() + viper.Set("Logging.Level", "Debug") + viper.Set("ConfigDir", t.TempDir()) + config.InitConfig() + reqCounter := 0 + + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + reqCounter += 1 + log.Debugln("Mock server handling request for ", req.URL.String()) + if req.Method == "HEAD" && req.URL.String() == "/foo/test.txt" { + rw.Header().Set("Content-Length", "1") + rw.WriteHeader(http.StatusOK) + return + } else if req.Method == "HEAD" { + rw.WriteHeader(http.StatusNotFound) + } else { + rw.WriteHeader(http.StatusBadRequest) + } + })) + defer server.Close() + + viper.Set("Server.ExternalWebUrl", server.URL) + viper.Set("IssuerUrl", server.URL) + realServerUrl, err := url.Parse(server.URL) + require.NoError(t, err) + + serverAds = ttlcache.New(ttlcache.WithTTL[string, *server_structs.Advertisement](15 * time.Minute)) + + mockCacheAd := server_structs.ServerAd{ + Name: "cache", + Type: server_structs.CacheType.String(), + URL: *realServerUrl, + Caps: server_structs.Capabilities{PublicReads: true}, + } + mockNsAd := server_structs.NamespaceAdV2{Path: "/foo"} + serverAds.Set( + mockCacheAd.URL.String(), + &server_structs.Advertisement{ + ServerAd: mockCacheAd, + NamespaceAds: []server_structs.NamespaceAdV2{mockNsAd}, + }, + ttlcache.DefaultTTL, + ) + initMockStatUtils() + t.Cleanup(cleanupMock) + + t.Run("repeated-cache-access-found", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stat := NewObjectStat() + + startCtr := reqCounter + qResult := stat.queryServersForObject(ctx, "/foo/test.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, querySuccessful, qResult.Status) + require.Len(t, qResult.Objects, 1) + assert.Equal(t, 1, qResult.Objects[0].ContentLength) + require.Equal(t, startCtr+1, reqCounter) + + qResult = stat.queryServersForObject(ctx, "/foo/test.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, querySuccessful, qResult.Status) + require.Len(t, qResult.Objects, 1) + assert.Equal(t, 1, qResult.Objects[0].ContentLength) + require.Equal(t, startCtr+1, reqCounter) + }) + + t.Run("repeated-cache-access-not-found", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stat := NewObjectStat() + + startCtr := reqCounter + qResult := stat.queryServersForObject(ctx, "/foo/notfound.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, queryFailed, qResult.Status) + assert.Len(t, qResult.Objects, 0) + assert.Equal(t, queryInsufficientResErr, qResult.ErrorType) + require.Equal(t, startCtr+1, reqCounter) + + qResult = stat.queryServersForObject(ctx, "/foo/notfound.txt", server_structs.CacheType, 1, 1) + assert.Equal(t, queryFailed, qResult.Status) + assert.Len(t, qResult.Objects, 0) + assert.Equal(t, queryInsufficientResErr, qResult.ErrorType) + require.Equal(t, startCtr+1, reqCounter) + }) +} + func TestSendHeadReq(t *testing.T) { server_utils.ResetTestState() diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 518c8299f..6911c82c2 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1284,7 +1284,8 @@ description: |+ - "random": Sorts caches randomly. - "adaptive": Sorts caches according to stochastically-generated weights that consider a combination of factors, including a cache's distance from the client, its IO load, and whether the cache already has the requested object. - See details at https://github.com/PelicanPlatform/pelican/discussions/1198 + See details at https://github.com/PelicanPlatform/pelican/discussions/1198. Note that if `Director.CheckCachePresence` + is set to false, then the adaptive algorithm cannot use the cache locality information. type: string default: distance components: ["director"] @@ -1334,7 +1335,53 @@ components: ["director"] --- name: Director.EnableStat description: |+ - Disable the `stat` query to the origin servers for object requests. + Before redirecting a cache (or, for direct reads or writes, a client) to an origin, + query the origin to see if the object is present. + + Enabling this option generates slightly more load on the origin; however, it provides + improved error messages and allows a namespace to effectively be split across multiple + origins. +type: bool +default: true +deprecated: true +hidden: true +replacedby: Director.CheckOriginPresence +components: ["director"] +--- +name: Director.CheckOriginPresence +description: |+ + Before redirecting a cache (or, for direct reads or writes, a client) to an origin, + query the origin to see if the object is present. + + Enabling this option generates slightly more load on the origin; however, it provides + improved error messages and allows a namespace to effectively be split across multiple + origins. +type: bool +default: true +components: ["director"] +--- +name: Director.AssumePresenceAtSingleOrigin +description: |+ + If `Director.CheckOriginPresence` is enabled, the director will check for object + presence at the origin before redirecting a client. + + If this option is enabled and there's only one possible origin for the object, + then the check will be skipped. + + Enabling this option will reduce load on single-origin namespaces but clients will + get less informative error messages. +type: bool +default: true +hidden: true +components: ["director"] +--- +name: Director.CheckCachePresence +description: |+ + Before redirecting a client to a cache, query the cache to see if the object is present + at the cache. + + Enabling this option improves the cache selection algorithm, allowing the director + to prefer caches nearby the client with the object over caches without the object. type: bool default: true components: ["director"] @@ -1343,7 +1390,7 @@ name: Director.StatTimeout description: |+ The timeout for a single `stat` request. type: duration -default: 200ms +default: 2000ms components: ["director"] --- name: Director.StatConcurrencyLimit @@ -1429,6 +1476,29 @@ default: none components: ["director"] hidden: true --- +name: Director.CachePresenceTTL +description: |+ + If `Director.CheckCachePresence` is enabled, the director will check with remote cache + to see if the object is present before redirecting a client. + + This parameter controls how long the director will cache the result of the lookup. Longer values + will reduce the load generated on the caches but may reduce the accuracy of the result (as the + contents of the cache will change over time). +type: duration +default: 1m +components: ["director"] +--- +name: Director.CachePresenceCapacity +description: |+ + If `Director.CheckCachePresence` is enabled, the director will check with remote cache + to see if the object is present before redirecting a client. + + This parameter controls how many responses, per-service, will be cached. +type: int +default: 10000 +hidden: true +components: ["director"] +--- ############################ # Registry-level configs # ############################ diff --git a/local_cache/cache_test.go b/local_cache/cache_test.go index 8401a4bd9..f57582ce4 100644 --- a/local_cache/cache_test.go +++ b/local_cache/cache_test.go @@ -417,7 +417,7 @@ func TestOriginUnresponsive(t *testing.T) { tmpDir := t.TempDir() server_utils.ResetTestState() - viper.Set("Transport.ResponseHeaderTimeout", "3s") + viper.Set("Transport.ResponseHeaderTimeout", "5s") viper.Set("Logging.Level", "debug") ft := fed_test_utils.NewFedTest(t, pubOriginCfg) diff --git a/metrics/director.go b/metrics/director.go index ec436b6e7..a1e9ca2df 100644 --- a/metrics/director.go +++ b/metrics/director.go @@ -80,7 +80,7 @@ var ( PelicanDirectorStatTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "pelican_director_stat_total", Help: "The total stat queries the director issues. The status can be Succeeded, Cancelled, Timeout, Forbidden, or UnknownErr", - }, []string{"server_name", "server_url", "server_type", "result"}) // result: see enums for DirectorStatResult + }, []string{"server_name", "server_url", "server_type", "result", "cached_result"}) // result: see enums for DirectorStatResult PelicanDirectorServerCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "pelican_director_server_count", diff --git a/param/parameters.go b/param/parameters.go index ed60e757b..1af215d7a 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -52,6 +52,7 @@ type ObjectParam struct { func GetDeprecated() map[string][]string { return map[string][]string{ "Cache.DataLocation": {"Cache.LocalRoot"}, + "Director.EnableStat": {"Director.CheckOriginPresence"}, "DisableHttpProxy": {"Client.DisableHttpProxy"}, "DisableProxyFallback": {"Client.DisableProxyFallback"}, "MinimumDownloadSpeed": {"Client.MinimumDownloadSpeed"}, @@ -300,6 +301,7 @@ var ( Client_MaximumDownloadSpeed = IntParam{"Client.MaximumDownloadSpeed"} Client_MinimumDownloadSpeed = IntParam{"Client.MinimumDownloadSpeed"} Client_WorkerCount = IntParam{"Client.WorkerCount"} + Director_CachePresenceCapacity = IntParam{"Director.CachePresenceCapacity"} Director_MaxStatResponse = IntParam{"Director.MaxStatResponse"} Director_MinStatResponse = IntParam{"Director.MinStatResponse"} Director_StatConcurrencyLimit = IntParam{"Director.StatConcurrencyLimit"} @@ -329,7 +331,10 @@ var ( Client_DisableHttpProxy = BoolParam{"Client.DisableHttpProxy"} Client_DisableProxyFallback = BoolParam{"Client.DisableProxyFallback"} Debug = BoolParam{"Debug"} + Director_AssumePresenceAtSingleOrigin = BoolParam{"Director.AssumePresenceAtSingleOrigin"} Director_CachesPullFromCaches = BoolParam{"Director.CachesPullFromCaches"} + Director_CheckCachePresence = BoolParam{"Director.CheckCachePresence"} + Director_CheckOriginPresence = BoolParam{"Director.CheckOriginPresence"} Director_EnableBroker = BoolParam{"Director.EnableBroker"} Director_EnableOIDC = BoolParam{"Director.EnableOIDC"} Director_EnableStat = BoolParam{"Director.EnableStat"} @@ -376,6 +381,7 @@ var ( Client_SlowTransferWindow = DurationParam{"Client.SlowTransferWindow"} Client_StoppedTransferTimeout = DurationParam{"Client.StoppedTransferTimeout"} Director_AdvertisementTTL = DurationParam{"Director.AdvertisementTTL"} + Director_CachePresenceTTL = DurationParam{"Director.CachePresenceTTL"} Director_OriginCacheHealthTestInterval = DurationParam{"Director.OriginCacheHealthTestInterval"} Director_StatTimeout = DurationParam{"Director.StatTimeout"} Federation_TopologyReloadInterval = DurationParam{"Federation.TopologyReloadInterval"} diff --git a/param/parameters_struct.go b/param/parameters_struct.go index 32cc73682..318566bf8 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -60,9 +60,14 @@ type Config struct { Debug bool `mapstructure:"debug"` Director struct { AdvertisementTTL time.Duration `mapstructure:"advertisementttl"` + AssumePresenceAtSingleOrigin bool `mapstructure:"assumepresenceatsingleorigin"` + CachePresenceCapacity int `mapstructure:"cachepresencecapacity"` + CachePresenceTTL time.Duration `mapstructure:"cachepresencettl"` CacheResponseHostnames []string `mapstructure:"cacheresponsehostnames"` CacheSortMethod string `mapstructure:"cachesortmethod"` CachesPullFromCaches bool `mapstructure:"cachespullfromcaches"` + CheckCachePresence bool `mapstructure:"checkcachepresence"` + CheckOriginPresence bool `mapstructure:"checkoriginpresence"` DefaultResponse string `mapstructure:"defaultresponse"` EnableBroker bool `mapstructure:"enablebroker"` EnableOIDC bool `mapstructure:"enableoidc"` @@ -356,9 +361,14 @@ type configWithType struct { Debug struct { Type string; Value bool } Director struct { AdvertisementTTL struct { Type string; Value time.Duration } + AssumePresenceAtSingleOrigin struct { Type string; Value bool } + CachePresenceCapacity struct { Type string; Value int } + CachePresenceTTL struct { Type string; Value time.Duration } CacheResponseHostnames struct { Type string; Value []string } CacheSortMethod struct { Type string; Value string } CachesPullFromCaches struct { Type string; Value bool } + CheckCachePresence struct { Type string; Value bool } + CheckOriginPresence struct { Type string; Value bool } DefaultResponse struct { Type string; Value string } EnableBroker struct { Type string; Value bool } EnableOIDC struct { Type string; Value bool } diff --git a/utils/errgroup.go b/utils/errgroup.go new file mode 100644 index 000000000..6f518be75 --- /dev/null +++ b/utils/errgroup.go @@ -0,0 +1,176 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in https://cs.opensource.google/go/x/sync/+/refs/tags/v0.8.0:LICENSE. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +// +// [errgroup.Group] is related to [sync.WaitGroup] but adds handling of tasks +// returning errors. +// +// This is a fork of the upstream golang code at: +// https://cs.opensource.google/go/x/sync/+/refs/tags/v0.8.0:errgroup/errgroup.go +// +// It adds the ability to block in TryGo until a context has expired. + +package utils + +import ( + "context" + "fmt" + "sync" +) + +type token struct{} + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid, has no limit on the number of active goroutines, +// and does not cancel on error. +type Group struct { + cancel func(error) + + wg sync.WaitGroup + + sem chan token + + errOnce sync.Once + err error +} + +func (g *Group) done() { + if g.sem != nil { + <-g.sem + } + g.wg.Done() +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancelCause(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel(g.err) + } + return g.err +} + +// Go calls the given function in a new goroutine. +// It blocks until the new goroutine can be added without the number of +// active goroutines in the group exceeding the configured limit. +// +// The first call to return a non-nil error cancels the group's context, if the +// group was created by calling WithContext. The error will be returned by Wait. +func (g *Group) Go(f func() error) { + if g.sem != nil { + g.sem <- token{} + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() +} + +// TryGo calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// It will block until the provided context is cancelled. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGo(f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + default: + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// TryGoUntil calls the given function in a new goroutine only if the number of +// active goroutines in the group is currently below the configured limit. +// +// If the group is at the configured limit, the call will block until the +// provided context is cancelled. +// +// The return value reports whether the goroutine was started. +func (g *Group) TryGoUntil(ctx context.Context, f func() error) bool { + if g.sem != nil { + select { + case g.sem <- token{}: + // Note: this allows barging iff channels in general allow barging. + case <-ctx.Done(): + return false + } + } + + g.wg.Add(1) + go func() { + defer g.done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel(g.err) + } + }) + } + }() + return true +} + +// SetLimit limits the number of active goroutines in this group to at most n. +// A negative value indicates no limit. +// +// Any subsequent call to the Go method will block until it can add an active +// goroutine without exceeding the configured limit. +// +// The limit must not be modified while any goroutines in the group are active. +func (g *Group) SetLimit(n int) { + if n < 0 { + g.sem = nil + return + } + if len(g.sem) != 0 { + panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem))) + } + g.sem = make(chan token, n) +}