Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#933 from haoming29/pause-redirect
Browse files Browse the repository at this point in the history
Allow admin to configure filtering servers for redirecting traffic at the director
  • Loading branch information
jhiemstrawisc authored Mar 25, 2024
2 parents f8ac1da + c68cabb commit adf0924
Show file tree
Hide file tree
Showing 13 changed files with 555 additions and 42 deletions.
19 changes: 17 additions & 2 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,19 @@ import (
log "github.com/sirupsen/logrus"
)

type filterType string

const (
permFiltered filterType = "permFiltered" // Read from Director.FilteredServers
tempFiltered filterType = "tempFiltered" // Filtered by web UI
tempAllowed filterType = "tempAllowed" // Read from Director.FilteredServers but mutated by web UI
)

var (
serverAds = ttlcache.New[common.ServerAd, []common.NamespaceAdV2](ttlcache.WithTTL[common.ServerAd, []common.NamespaceAdV2](15 * time.Minute))
serverAdMutex = sync.RWMutex{}
serverAds = ttlcache.New(ttlcache.WithTTL[common.ServerAd, []common.NamespaceAdV2](15 * time.Minute))
serverAdMutex = sync.RWMutex{}
filteredServers = map[string]filterType{}
filteredServersMutex = sync.RWMutex{}
)

func recordAd(ad common.ServerAd, namespaceAds *[]common.NamespaceAdV2) {
Expand Down Expand Up @@ -117,6 +127,7 @@ func matchesPrefix(reqPath string, namespaceAds []common.NamespaceAdV2) *common.
return best
}

// Get the longest matches namespace prefix with corresponding serverAds and namespace Ads given a path to an object
func getAdsForPath(reqPath string) (originNamespace common.NamespaceAdV2, originAds []common.ServerAd, cacheAds []common.ServerAd) {
serverAdMutex.RLock()
defer serverAdMutex.RUnlock()
Expand All @@ -135,6 +146,10 @@ func getAdsForPath(reqPath string) (originNamespace common.NamespaceAdV2, origin
continue
}
serverAd := item.Key()
if filtered, ft := checkFilter(serverAd.Name); filtered {
log.Debugf("Skipping %s server %s as it's in the filtered server list with type %s", serverAd.Type, serverAd.Name, ft)
continue
}
if serverAd.Type == common.OriginType {
if ns := matchesPrefix(reqPath, item.Value()); ns != nil {
if best == nil || len(ns.Path) > len(best.Path) {
Expand Down
19 changes: 19 additions & 0 deletions director/cache_ads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,25 @@ func TestGetAdsForPath(t *testing.T) {
assert.Equal(t, nsAd.Path, "")
assert.Equal(t, len(oAds), 0)
assert.Equal(t, len(cAds), 0)

// Filtered server should not be included
filteredServersMutex.Lock()
tmp := filteredServers
filteredServers = map[string]filterType{"origin1": permFiltered, "cache1": tempFiltered}
filteredServersMutex.Unlock()
defer func() {
filteredServersMutex.Lock()
filteredServers = tmp
filteredServersMutex.Unlock()
}()

nsAd, oAds, cAds = getAdsForPath("/chtc")
assert.Equal(t, nsAd.Path, "/chtc")
assert.Equal(t, len(oAds), 0)
assert.Equal(t, len(cAds), 1)
assert.False(t, hasServerAdWithName(oAds, "origin1"))
assert.False(t, hasServerAdWithName(cAds, "cache1"))
assert.True(t, hasServerAdWithName(cAds, "cache2"))
}

func TestConfigCacheEviction(t *testing.T) {
Expand Down
99 changes: 83 additions & 16 deletions director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,19 @@ type (
}

listServerResponse struct {
Name string `json:"name"`
AuthURL string `json:"authUrl"`
URL string `json:"url"` // This is server's XRootD URL for file transfer
WebURL string `json:"webUrl"` // This is server's Web interface and API
Type common.ServerType `json:"type"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Status HealthTestStatus `json:"status"`
Name string `json:"name"`
AuthURL string `json:"authUrl"`
BrokerURL string `json:"brokerUrl"`
URL string `json:"url"` // This is server's XRootD URL for file transfer
WebURL string `json:"webUrl"` // This is server's Web interface and API
Type common.ServerType `json:"type"`
Latitude float64 `json:"latitude"`
Longitude float64 `json:"longitude"`
Writes bool `json:"enableWrite"`
DirectReads bool `json:"enableFallbackRead"`
Filtered bool `json:"filtered"`
FilteredType filterType `json:"filteredType"`
Status HealthTestStatus `json:"status"`
}

statResponse struct {
Expand Down Expand Up @@ -92,15 +97,21 @@ func listServers(ctx *gin.Context) {
if ok {
healthStatus = healthUtil.Status
}
filtered, ft := checkFilter(server.Name)
res := listServerResponse{
Name: server.Name,
AuthURL: server.AuthURL.String(),
URL: server.URL.String(),
WebURL: server.WebURL.String(),
Type: server.Type,
Latitude: server.Latitude,
Longitude: server.Longitude,
Status: healthStatus,
Name: server.Name,
BrokerURL: server.BrokerURL.String(),
AuthURL: server.AuthURL.String(),
URL: server.URL.String(),
WebURL: server.WebURL.String(),
Type: server.Type,
Latitude: server.Latitude,
Longitude: server.Longitude,
Writes: server.Writes,
DirectReads: server.DirectReads,
Filtered: filtered,
FilteredType: ft,
Status: healthStatus,
}
resList = append(resList, res)
}
Expand Down Expand Up @@ -148,11 +159,67 @@ func queryOrigins(ctx *gin.Context) {
ctx.JSON(http.StatusOK, res)
}

// A gin route handler that given a server hostname through path variable `name`,
// checks and adds the server to a list of servers to be bypassed when the director redirects
// object requests from the client
func handleFilterServer(ctx *gin.Context) {
sn := strings.TrimPrefix(ctx.Param("name"), "/")
if sn == "" {
ctx.JSON(http.StatusBadRequest, gin.H{"error": "name is a required path parameter"})
return
}
filtered, filterType := checkFilter(sn)
if filtered {
ctx.JSON(http.StatusBadRequest, gin.H{"error": "Can't filter a server that already has been fitlered with type " + filterType})
return
}
filteredServersMutex.Lock()
defer filteredServersMutex.Unlock()

// If we previously temporarily allowed a server, we switch to permFiltered (reset)
if filterType == tempAllowed {
filteredServers[sn] = permFiltered
} else {
filteredServers[sn] = tempFiltered
}
ctx.JSON(http.StatusOK, gin.H{"message": "success"})
}

// A gin route handler that given a server hostname through path variable `name`,
// checks and removes the server from a list of servers to be bypassed when the director redirects
// object requests from the client
func handleAllowServer(ctx *gin.Context) {
sn := strings.TrimPrefix(ctx.Param("name"), "/")
if sn == "" {
ctx.JSON(http.StatusBadRequest, gin.H{"error": "name is a required path parameter"})
return
}
filtered, ft := checkFilter(sn)
if !filtered {
ctx.JSON(http.StatusBadRequest, gin.H{"error": "Can't allow a server that is not being filtered. " + ft})
return
}

filteredServersMutex.Lock()
defer filteredServersMutex.Unlock()

if ft == tempFiltered {
// For temporarily filtered server, allowing them by removing the server from the map
delete(filteredServers, sn)
} else if ft == permFiltered {
// For servers to filter from the config, temporarily allow the server
filteredServers[sn] = tempAllowed
}
ctx.JSON(http.StatusOK, gin.H{"message": "success"})
}

func RegisterDirectorWebAPI(router *gin.RouterGroup) {
directorWebAPI := router.Group("/api/v1.0/director_ui")
// Follow RESTful schema
{
directorWebAPI.GET("/servers", listServers)
directorWebAPI.PATCH("/servers/filter/*name", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleFilterServer)
directorWebAPI.PATCH("/servers/allow/*name", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleAllowServer)
directorWebAPI.GET("/servers/origins/stat/*path", web_ui.AuthHandler, queryOrigins)
directorWebAPI.HEAD("/servers/origins/stat/*path", web_ui.AuthHandler, queryOrigins)
}
Expand Down
41 changes: 41 additions & 0 deletions director/director_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/jellydator/ttlcache/v3"
"github.com/pelicanplatform/pelican/common"
"github.com/pelicanplatform/pelican/param"

log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -60,6 +61,32 @@ func listServerAds(serverTypes []common.ServerType) []common.ServerAd {
return ads
}

// Check if a server is filtered from "production" servers by
// checking if a serverName is in the filteredServers map
func checkFilter(serverName string) (bool, filterType) {
filteredServersMutex.RLock()
defer filteredServersMutex.RUnlock()

status, exists := filteredServers[serverName]
// No filter entry
if !exists {
return false, ""
} else {
// Has filter entry
switch status {
case permFiltered:
return true, permFiltered
case tempFiltered:
return true, tempFiltered
case tempAllowed:
return false, tempAllowed
default:
log.Error("Unknown filterType: ", status)
return false, ""
}
}
}

// Configure TTL caches to enable cache eviction and other additional cache events handling logic
//
// The `ctx` is the context for listening to server shutdown event in order to cleanup internal cache eviction
Expand Down Expand Up @@ -118,3 +145,17 @@ func ConfigTTLCache(ctx context.Context, egrp *errgroup.Group) {
return nil
})
}

// Populate internal filteredServers map by Director.FilteredServers
func ConfigFilterdServers() {
filteredServersMutex.Lock()
defer filteredServersMutex.Unlock()

if !param.Director_FilteredServers.IsSet() {
return
}

for _, sn := range param.Director_FilteredServers.GetStringSlice() {
filteredServers[sn] = permFiltered
}
}
63 changes: 63 additions & 0 deletions director/director_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,66 @@ func TestListServerAds(t *testing.T) {
assert.True(t, adsCache[0] == mockCacheServerAd)
})
}

func TestCheckFilter(t *testing.T) {
testCases := []struct {
name string
mapItems map[string]filterType
serverToTest string
filtered bool
ft filterType
}{
{
name: "empty-list-return-false",
serverToTest: "mock",
filtered: false,
},
{
name: "dne-return-false",
serverToTest: "mock",
mapItems: map[string]filterType{"no-your-server": permFiltered},
filtered: false,
},
{
name: "perm-return-true",
serverToTest: "mock",
mapItems: map[string]filterType{"mock": permFiltered, "no-your-server": tempFiltered},
filtered: true,
ft: permFiltered,
},
{
name: "temp-filter-return-true",
serverToTest: "mock",
mapItems: map[string]filterType{"mock": tempFiltered, "no-your-server": permFiltered},
filtered: true,
ft: tempFiltered,
},
{
name: "temp-allow-return-false",
serverToTest: "mock",
mapItems: map[string]filterType{"mock": tempAllowed, "no-your-server": permFiltered},
filtered: false,
ft: tempAllowed,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
filteredServersMutex.Lock()
tmpMap := filteredServers
filteredServers = tc.mapItems
filteredServersMutex.Unlock()

defer func() {
filteredServersMutex.Lock()
filteredServers = tmpMap
filteredServersMutex.Unlock()
}()

getFilter, getType := checkFilter(tc.serverToTest)
assert.Equal(t, tc.filtered, getFilter)
assert.Equal(t, tc.ft, getType)

})
}
}
Loading

0 comments on commit adf0924

Please sign in to comment.