Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Prohibited Caches for Federation Prefixes #1769

Closed
1 change: 1 addition & 0 deletions config/resources/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Director:
AssumePresenceAtSingleOrigin: true
CachePresenceTTL: 1m
CachePresenceCapacity: 10000
ProhibitedCachesRefreshInterval: 1m
Cache:
Port: 8442
SelfTest: true
Expand Down
3 changes: 3 additions & 0 deletions director/advertise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func TestAdvertiseOSDF(t *testing.T) {
require.NotNil(t, foundServer.NamespaceAds)
assert.True(t, foundServer.NamespaceAds[0].FromTopology)

// Override the check for prohibited caches
prohibitedCachesLastSetTimestamp.Store(time.Now().Unix())

// Test a few values. If they're correct, it indicates the whole process likely succeeded
nsAd, oAds, cAds := getAdsForPath("/my/server/path/to/file")
assert.Equal(t, "/my/server", nsAd.Path)
Expand Down
47 changes: 46 additions & 1 deletion director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,51 @@ func matchesPrefix(reqPath string, namespaceAds []server_structs.NamespaceAdV2)
return best
}

// isProhibited reports whether the server described by ad must not be used to
// serve the namespace nsAd, according to the prohibitedCaches map that the
// director maintains in memory.
//
// Origins are never prohibited. For any other server type:
//   - if the prohibited caches data has never been set, the call polls for up
//     to 3 seconds waiting for it to appear;
//   - if the data is (still) unset or was last set 15 minutes ago or more, the
//     server is treated as prohibited;
//   - otherwise the server is prohibited when its hostname (compared
//     case-insensitively) is listed under a prefix that contains nsAd.Path.
//
// NOTE(review): the wait happens per call, so a caller that checks many caches
// while the data is unset may pay the 3-second timeout repeatedly.
func isProhibited(nsAd *server_structs.NamespaceAdV2, ad *server_structs.Advertisement) bool {
	const (
		initialDataTimeout = 3 * time.Second
		initialDataPoll    = 100 * time.Millisecond
		maxDataAge         = 15 * time.Minute
	)

	if ad.Type == server_structs.OriginType.String() {
		return false
	}

	if prohibitedCachesLastSetTimestamp.Load() == 0 {
		log.Warning("Prohibited caches data is not set, waiting for it to be set before continuing with cache matchmaking")
		start := time.Now()
		// Wait until last set timestamp is updated
		for prohibitedCachesLastSetTimestamp.Load() == 0 {
			if time.Since(start) >= initialDataTimeout {
				log.Error("Prohibited caches data was not set within the 3-second timeout")
				break
			}
			time.Sleep(initialDataPoll)
		}
	}

	// A timestamp that is still 0 maps to the Unix epoch, which is far older
	// than maxDataAge, so the "never set" case also ends up here.
	if time.Since(time.Unix(prohibitedCachesLastSetTimestamp.Load(), 0)) >= maxDataAge {
		log.Error("Prohibited caches data is outdated, caches will not be used.")
		return true
	}

	prohibitedCachesData := prohibitedCaches.Load()
	if prohibitedCachesData == nil {
		return false
	}

	// Hostname strips the port and, unlike splitting on ":", also handles
	// bracketed IPv6 literals such as "[::1]:8443".
	serverHostname := ad.ServerAd.URL.Hostname()

	for prefix, caches := range *prohibitedCachesData {
		if !isUnderProhibitedPrefix(nsAd.Path, prefix) {
			continue
		}
		for _, cacheHostname := range caches {
			if strings.EqualFold(serverHostname, cacheHostname) {
				return true
			}
		}
	}

	return false
}

// isUnderProhibitedPrefix reports whether path equals prefix or lies beneath it
// on a path-component boundary, so "/foo" covers "/foo" and "/foo/bar" but not
// "/foobar". A trailing slash on prefix is ignored; an empty or root prefix
// covers every path.
func isUnderProhibitedPrefix(path, prefix string) bool {
	prefix = strings.TrimSuffix(prefix, "/")
	if prefix == "" {
		return true
	}
	if !strings.HasPrefix(path, prefix) {
		return false
	}
	return len(path) == len(prefix) || path[len(prefix)] == '/'
}

func getAdsForPath(reqPath string) (originNamespace server_structs.NamespaceAdV2, originAds []server_structs.ServerAd, cacheAds []server_structs.ServerAd) {
skippedServers := []server_structs.ServerAd{}

Expand All @@ -326,7 +371,7 @@ func getAdsForPath(reqPath string) (originNamespace server_structs.NamespaceAdV2
log.Debugf("Skipping %s server %s as it's in the filtered server list with type %s", ad.Type, ad.Name, ft)
continue
}
if ns := matchesPrefix(reqPath, ad.NamespaceAds); ns != nil {
if ns := matchesPrefix(reqPath, ad.NamespaceAds); ns != nil && !isProhibited(ns, ad) {
if best == nil || len(ns.Path) > len(best.Path) {
best = ns
// If anything was previously set by a namespace that constituted a shorter
Expand Down
16 changes: 16 additions & 0 deletions director/cache_ads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ func TestGetAdsForPath(t *testing.T) {
recordAd(context.Background(), cacheAd1, &c1Slice)
recordAd(context.Background(), cacheAd2, &o1Slice)

// Override the check for prohibited caches
prohibitedCachesLastSetTimestamp.Store(time.Now().Unix())

// If /chtc is served both from topology and Pelican, the Topology server/namespace should be ignored
nsAd, oAds, cAds := getAdsForPath("/chtc")
assert.Equal(t, "/chtc", nsAd.Path)
Expand Down Expand Up @@ -210,6 +213,19 @@ func TestGetAdsForPath(t *testing.T) {
assert.Equal(t, 0, len(oAds))
assert.Equal(t, 0, len(cAds))

// Prohibited caches should not be matched
prohibitedCaches.Store(&map[string][]string{
"/chtc": {"cache1.wisc.edu"},
})

nsAd, oAds, cAds = getAdsForPath("/chtc/PUBLIC")
assert.Equal(t, 1, len(oAds))
assert.Equal(t, 0, len(cAds))
assert.True(t, hasServerAdWithName(oAds, "origin2"))
assert.False(t, hasServerAdWithName(cAds, "cache1"))

prohibitedCaches.Store(&map[string][]string{})

// Filtered server should not be included
filteredServersMutex.Lock()
tmp := filteredServers
Expand Down
147 changes: 147 additions & 0 deletions director/prohibited_caches.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package director

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
)

var (
	// prohibitedCaches holds the current mapping from federation prefix to the
	// cache hostnames to which data under that prefix should not propagate.
	// The whole map is swapped atomically; it is never mutated in place here.
	prohibitedCaches atomic.Pointer[map[string][]string]
	// prohibitedCachesLastSetTimestamp is the Unix time (in seconds) at which
	// prohibitedCaches was last explicitly set; 0 means it has never been set.
	prohibitedCachesLastSetTimestamp atomic.Int64
)

// init seeds the package state so that readers always observe a non-nil,
// empty map and a "never set" timestamp before the first fetch completes.
func init() {
	prohibitedCaches.Store(&map[string][]string{})

	// 0 marks the data as never having been set.
	prohibitedCachesLastSetTimestamp.Store(0)
}

// fetchProhibitedCaches issues a GET request to the registry's
// /api/v1.0/registry/namespaces/prohibitedCaches endpoint and returns the
// decoded JSON body: a map from federation prefix to a list of cache hostnames.
//
// The registry location is taken from the federation info returned by
// config.GetFederation. An error is returned when that lookup fails, the
// registry endpoint is empty or unparsable, the request fails or exceeds the
// request timeout, the response status is not 200 OK, or the body cannot be
// decoded. On error the returned map is nil.
func fetchProhibitedCaches(ctx context.Context) (map[string][]string, error) {
	// Upper bound for the whole exchange (connect, headers and body). The ctx
	// handed in by the periodic fetcher lives as long as the server, so without
	// this bound an unresponsive registry would block the caller indefinitely.
	const requestTimeout = 10 * time.Second

	fedInfo, err := config.GetFederation(ctx)
	if err != nil {
		return nil, fmt.Errorf("getting federation info: %w", err)
	}
	registryUrlStr := fedInfo.RegistryEndpoint
	if registryUrlStr == "" {
		return nil, fmt.Errorf("registry endpoint is not set in the federation info")
	}
	registryUrl, err := url.Parse(registryUrlStr)
	if err != nil {
		return nil, fmt.Errorf("parsing registry endpoint %q: %w", registryUrlStr, err)
	}
	reqUrl := registryUrl.JoinPath("/api/v1.0/registry/namespaces/prohibitedCaches")

	client := http.Client{Transport: config.GetTransport(), Timeout: requestTimeout}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqUrl.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("building prohibited caches request: %w", err)
	}

	// NOTE(review): this GET carries no body, so "Accept" would describe the
	// intent more accurately; the header is kept unchanged to avoid altering
	// the request seen by the registry.
	req.Header.Add("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("requesting prohibited caches from %s: %w", reqUrl.String(), err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("failed to fetch prohibited caches from the registry: unexpected status code %d", resp.StatusCode)
	}

	var result map[string][]string
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("decoding prohibited caches response: %w", err)
	}

	return result, nil
}

// LaunchPeriodicProhibitedCachesFetch performs one synchronous fetch of the
// prohibited caches data and then registers a goroutine on egrp that keeps
// refreshing the director's in-memory copy until ctx is cancelled.
//
// The refresh period comes from Director.ProhibitedCachesRefreshInterval; a
// value below 1ms is replaced by 1m (and written back to viper). The ticker
// drops to a 1-second period when the first fetch fails, or when a later fetch
// fails while the stored data is 15 minutes old or more; any successful fetch
// restores the configured period. The goroutine always returns nil.
func LaunchPeriodicProhibitedCachesFetch(ctx context.Context, egrp *errgroup.Group) {
	interval := param.Director_ProhibitedCachesRefreshInterval.GetDuration()
	if interval < 1*time.Millisecond {
		log.Warnf("Director.ProhibitedCachesRefreshInterval is set to: %v, which is too low. Falling back to default: 1m", interval)

		viper.Set("Director.ProhibitedCachesRefreshInterval", "1m")
		interval = 1 * time.Minute
	}

	// publish swaps in freshly fetched data and records when that happened.
	publish := func(fresh map[string][]string) {
		prohibitedCaches.Store(&fresh)
		prohibitedCachesLastSetTimestamp.Store(time.Now().Unix())
	}

	ticker := time.NewTicker(interval)

	// First fetch runs inline, before the goroutine is started.
	if initial, err := fetchProhibitedCaches(ctx); err == nil {
		publish(initial)
		log.Debug("Prohibited caches updated successfully on first attempt")
	} else {
		ticker.Reset(1 * time.Second) // retry quickly (every 1s) until data arrives
		log.Warningf("Error fetching prohibited caches on first attempt: %v", err)
		log.Debug("Switching to higher frequency (1s) for prohibited caches fetch")
	}

	egrp.Go(func() error {
		defer ticker.Stop()

		for {
			// Block until either the next tick or shutdown.
			select {
			case <-ctx.Done():
				log.Debug("Periodic fetch terminated")
				return nil
			case <-ticker.C:
			}

			fresh, err := fetchProhibitedCaches(ctx)
			if err == nil {
				ticker.Reset(interval) // back to the configured period
				publish(fresh)
				log.Debug("Prohibited caches updated successfully")
				continue
			}

			log.Warningf("Error fetching prohibited caches: %v", err)
			dataAge := time.Since(time.Unix(prohibitedCachesLastSetTimestamp.Load(), 0))
			if dataAge >= 15*time.Minute {
				log.Debug("Prohibited caches last updated over 15 minutes ago, switching to higher frequency")
				ticker.Reset(1 * time.Second) // retry quickly (every 1s)
			}
		}
	})
}
98 changes: 98 additions & 0 deletions director/prohibited_caches_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/***************************************************************
*
* Copyright (C) 2024, Pelican Project, Morgridge Institute for Research
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
***************************************************************/

package director

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/pelicanplatform/pelican/config"
)

// TestLaunchPeriodicProhibitedCachesFetch tests whether the prohibited caches data is periodically updated from the registry.
//
// A mock registry serves whatever map was most recently pushed onto
// mockDataChan. The test checks that the first payload is picked up, then
// pushes a second payload and checks that a later refresh replaces the first.
// Package-level state touched by the fetcher is restored when the test ends so
// other tests in the package are not affected.
func TestLaunchPeriodicProhibitedCachesFetch(t *testing.T) {
	config.ResetConfig()
	defer config.ResetConfig()

	mockDataChan := make(chan map[string][]string, 2)
	mockData := map[string][]string{
		"/foo/bar": {"hostname5", "hostname6"},
	}
	mockDataChan <- mockData

	var lastData map[string][]string

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		// Switch to a newly queued payload if there is one; otherwise keep
		// serving the previous payload.
		select {
		case currentData := <-mockDataChan:
			lastData = currentData
		default:
		}
		if lastData == nil {
			lastData = make(map[string][]string)
		}
		if err := json.NewEncoder(w).Encode(lastData); err != nil {
			log.Errorf("Failed to encode response: %v", err)
		}
	}))
	defer server.Close()

	// Set the registry URL to the mock server.
	viper.Set("Federation.Registryurl", server.URL)
	viper.Set("Director.ProhibitedCachesRefreshInterval", "200ms")

	prevCaches := prohibitedCaches.Load()
	prevTimestamp := prohibitedCachesLastSetTimestamp.Load()

	ctx, cancel := context.WithCancel(context.Background())
	egrp := &errgroup.Group{}
	t.Cleanup(func() {
		// Stop the fetcher first (also on early failure) so it cannot write
		// after the globals are restored. The error is checked in the test
		// body on the success path, so it is deliberately ignored here.
		cancel()
		_ = egrp.Wait()
		prohibitedCaches.Store(prevCaches)
		prohibitedCachesLastSetTimestamp.Store(prevTimestamp)
	})

	LaunchPeriodicProhibitedCachesFetch(ctx, egrp)

	// storedEquals builds a poll condition that holds once the in-memory map
	// matches expected.
	storedEquals := func(expected map[string][]string) func() bool {
		return func() bool {
			current := prohibitedCaches.Load()
			return current != nil && assert.ObjectsAreEqual(expected, *current)
		}
	}

	// Poll instead of sleeping a fixed time so the test does not depend on
	// exact tick timing.
	require.Eventually(t, storedEquals(mockData), 5*time.Second, 50*time.Millisecond,
		"prohibitedCaches does not match the expected value")

	mockData = map[string][]string{
		"/foo/bar": {"hostname3", "hostname4"},
	}
	mockDataChan <- mockData

	require.Eventually(t, storedEquals(mockData), 5*time.Second, 50*time.Millisecond,
		"prohibitedCaches does not match the expected value")

	cancel()

	require.NoError(t, egrp.Wait(), "Periodic fetch goroutine did not terminate properly")
}
8 changes: 8 additions & 0 deletions docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,14 @@ default: 10000
hidden: true
components: ["director"]
---
name: Director.ProhibitedCachesRefreshInterval
description: |+
Specifies the time interval after which the director queries the registry to refresh its in-memory data about prohibited caches.
Prohibited caches data is essentially a mapping of federation prefixes to a list of caches to which the data should not be propagated for the given prefix.
type: duration
default: 1m
components: ["director"]
---
############################
# Registry-level configs #
############################
Expand Down
2 changes: 2 additions & 0 deletions launchers/director_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group

director.LaunchServerIOQuery(ctx, egrp)

director.LaunchPeriodicProhibitedCachesFetch(ctx, egrp)

if config.GetPreferredPrefix() == config.OsdfPrefix {
metrics.SetComponentHealthStatus(metrics.DirectorRegistry_Topology, metrics.StatusWarning, "Start requesting from topology, status unknown")
log.Info("Generating/advertising server ads from OSG topology service...")
Expand Down
1 change: 1 addition & 0 deletions param/parameters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading