From d6e75475d9878a42cb7b89f9b4f685859c5aeda9 Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Mon, 11 Mar 2024 18:22:34 +0000 Subject: [PATCH 1/5] Add parameter to pause redirect traffic --- docs/parameters.yaml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 2de05c237..fa58d5cec 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -822,6 +822,15 @@ type: bool default: true components: ["director"] --- +name: Director.FilteredServers +description: >- + A list of server names to not to redirect client requests to. This is mutual exclusive with + pausing redirect to a server using the web UI. If this is a non-empty list, the web UI + can't pause or resume redirect to a server. +type: stringSlice +default: none +components: ["director"] +--- ############################ # Registry-level configs # ############################ From 99ab296acd55a2ca4e85bb21d3e1ca0bf66aed1e Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Mon, 11 Mar 2024 20:09:58 +0000 Subject: [PATCH 2/5] Kill warnings --- registry/registry.go | 4 ++-- registry/registry_ui.go | 9 +++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/registry/registry.go b/registry/registry.go index 20d03436c..ff2b50982 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -190,7 +190,7 @@ func verifySignature(payload []byte, signature []byte, publicKey *ecdsa.PublicKe } // Generate server nonce for key-sign challenge -func keySignChallengeInit(ctx *gin.Context, data *registrationData) (map[string]interface{}, error) { +func keySignChallengeInit(data *registrationData) (map[string]interface{}, error) { serverNonce, err := generateNonce() if err != nil { return nil, errors.Wrap(err, "Failed to generate nonce for key-sign challenge") @@ -359,7 +359,7 @@ func keySignChallenge(ctx *gin.Context, data *registrationData) (bool, map[strin return created, res, nil } } else if data.ClientNonce != "" { - res, err := keySignChallengeInit(ctx, data) + res, err := keySignChallengeInit(data) if err != nil { return false, nil, err } else { diff --git a/registry/registry_ui.go b/registry/registry_ui.go index 221db2464..5333219d4 100644 --- a/registry/registry_ui.go +++ b/registry/registry_ui.go @@ -484,12 +484,9 @@ func createUpdateNamespace(ctx *gin.Context, isUpdate bool) { return } - if validCF, err := validateCustomFields(ns.CustomFields, true); !validCF { - if err != nil { - ctx.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Error validating custom fields: %v", err)}) - return - } - ctx.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid custom field: %s", err.Error())}) + validCF, err := validateCustomFields(ns.CustomFields, true) + if !validCF && err != nil { + ctx.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid custom fields: %s", err.Error())}) return } From 73d39cc270c7b7d3b5f249de36f1d4eafa4c9c48 Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Mon, 11 Mar 2024 20:55:15 +0000 Subject: [PATCH 3/5] Add API for filter servers to be redirected to --- director/cache_ads.go | 19 ++- director/cache_ads_test.go | 19 +++ director/director.go | 93 +++++++++++--- director/director_api.go | 41 +++++++ director/director_api_test.go | 63 ++++++++++ director/director_test.go | 224 +++++++++++++++++++++++++++++++--- docs/parameters.yaml | 4 +- launchers/director_serve.go | 2 + param/parameters.go | 1 + param/parameters_struct.go | 2 + 10 files changed, 431 insertions(+), 37 deletions(-) diff --git a/director/cache_ads.go b/director/cache_ads.go index a70d7790b..0ae80c633 100644 --- a/director/cache_ads.go +++ b/director/cache_ads.go @@ -34,9 +34,19 @@ import ( log "github.com/sirupsen/logrus" ) +type filterType string + +const ( + permFiltered filterType = "permFiltered" // Read from Director.FilteredServers + tempFiltered filterType = "tempFiltered" // Filtered by web UI + tempAllowed filterType = "tempAllowed" // Read from Director.FilteredServers but mutated by web UI +) + var ( - serverAds = ttlcache.New[common.ServerAd, []common.NamespaceAdV2](ttlcache.WithTTL[common.ServerAd, []common.NamespaceAdV2](15 * time.Minute)) - serverAdMutex = sync.RWMutex{} + serverAds = ttlcache.New(ttlcache.WithTTL[common.ServerAd, []common.NamespaceAdV2](15 * time.Minute)) + serverAdMutex = sync.RWMutex{} + filteredServers = map[string]filterType{} + filteredServersMutex = sync.RWMutex{} ) func recordAd(ad common.ServerAd, namespaceAds *[]common.NamespaceAdV2) { @@ -117,6 +127,7 @@ func matchesPrefix(reqPath string, namespaceAds []common.NamespaceAdV2) *common. return best } +// Get the longest matches namespace prefix with corresponding serverAds and namespace Ads given a path to an object func getAdsForPath(reqPath string) (originNamespace common.NamespaceAdV2, originAds []common.ServerAd, cacheAds []common.ServerAd) { serverAdMutex.RLock() defer serverAdMutex.RUnlock() @@ -135,6 +146,10 @@ func getAdsForPath(reqPath string) (originNamespace common.NamespaceAdV2, origin continue } serverAd := item.Key() + if filtered, ft := checkFilter(serverAd.Name); filtered { + log.Debugf("Skipping %s server %s as it's in the filtered server list with type %s", serverAd.Type, serverAd.Name, ft) + continue + } if serverAd.Type == common.OriginType { if ns := matchesPrefix(reqPath, item.Value()); ns != nil { if best == nil || len(ns.Path) > len(best.Path) { diff --git a/director/cache_ads_test.go b/director/cache_ads_test.go index 43ffa128a..12041fdd1 100644 --- a/director/cache_ads_test.go +++ b/director/cache_ads_test.go @@ -193,6 +193,25 @@ func TestGetAdsForPath(t *testing.T) { assert.Equal(t, nsAd.Path, "") assert.Equal(t, len(oAds), 0) assert.Equal(t, len(cAds), 0) + + // Filtered server should not be included + filteredServersMutex.Lock() + tmp := filteredServers + filteredServers = map[string]filterType{"origin1": permFiltered, "cache1": tempFiltered} + filteredServersMutex.Unlock() + defer func() { + filteredServersMutex.Lock() + filteredServers = tmp + filteredServersMutex.Unlock() + }() + + nsAd, oAds, cAds = getAdsForPath("/chtc") + assert.Equal(t, nsAd.Path, "/chtc") + assert.Equal(t, len(oAds), 0) + assert.Equal(t, len(cAds), 1) + assert.False(t, hasServerAdWithName(oAds, "origin1")) + assert.False(t, hasServerAdWithName(cAds, "cache1")) + assert.True(t, hasServerAdWithName(cAds, "cache2")) } func TestConfigCacheEviction(t *testing.T) { diff --git a/director/director.go b/director/director.go index 2315bf966..958b65cc7 100644 --- a/director/director.go +++ b/director/director.go @@ -35,14 +35,19 @@ type ( } listServerResponse struct { - Name string `json:"name"` - AuthURL string `json:"authUrl"` - URL string `json:"url"` // This is server's XRootD URL for file transfer - WebURL string `json:"webUrl"` // This is server's Web interface and API - Type common.ServerType `json:"type"` - Latitude float64 `json:"latitude"` - Longitude float64 `json:"longitude"` - Status HealthTestStatus `json:"status"` + Name string `json:"name"` + AuthURL string `json:"authUrl"` + BrokerURL string `json:"brokerUrl"` + URL string `json:"url"` // This is server's XRootD URL for file transfer + WebURL string `json:"webUrl"` // This is server's Web interface and API + Type common.ServerType `json:"type"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + EnableWrite bool `json:"enableWrite"` + EnableFallbackRead bool `json:"enableFallbackRead"` + Filtered bool `json:"filtered"` + FilteredType filterType `json:"filteredType"` + Status HealthTestStatus `json:"status"` } statResponse struct { @@ -92,15 +97,21 @@ func listServers(ctx *gin.Context) { if ok { healthStatus = healthUtil.Status } + filtered, ft := checkFilter(server.Name) res := listServerResponse{ - Name: server.Name, - AuthURL: server.AuthURL.String(), - URL: server.URL.String(), - WebURL: server.WebURL.String(), - Type: server.Type, - Latitude: server.Latitude, - Longitude: server.Longitude, - Status: healthStatus, + Name: server.Name, + BrokerURL: server.BrokerURL.String(), + AuthURL: server.AuthURL.String(), + URL: server.URL.String(), + WebURL: server.WebURL.String(), + Type: server.Type, + Latitude: server.Latitude, + Longitude: server.Longitude, + EnableWrite: server.EnableWrite, + EnableFallbackRead: server.EnableFallbackRead, + Filtered: filtered, + FilteredType: ft, + Status: healthStatus, } resList = append(resList, res) } @@ -148,11 +159,61 @@ func queryOrigins(ctx *gin.Context) { ctx.JSON(http.StatusOK, res) } +func handleFilterServer(ctx *gin.Context) { + sn := strings.TrimPrefix(ctx.Param("name"), "/") + if sn == "" { + ctx.JSON(http.StatusBadRequest, gin.H{"error": "name is a required path parameter"}) + return + } + filtered, ft := checkFilter(sn) + if filtered { + ctx.JSON(http.StatusBadRequest, gin.H{"error": "Can't filter a server that already has been fitlered with type " + ft}) + return + } + filteredServersMutex.Lock() + defer filteredServersMutex.Unlock() + + // If we previously temporarily allowed a server, we switch to permFiltered (reset) + if ft == tempAllowed { + filteredServers[sn] = permFiltered + } else { + filteredServers[sn] = tempFiltered + } + ctx.JSON(http.StatusOK, gin.H{"message": "success"}) +} + +func handleAllowServer(ctx *gin.Context) { + sn := strings.TrimPrefix(ctx.Param("name"), "/") + if sn == "" { + ctx.JSON(http.StatusBadRequest, gin.H{"error": "name is a required path parameter"}) + return + } + filtered, ft := checkFilter(sn) + if !filtered { + ctx.JSON(http.StatusBadRequest, gin.H{"error": "Can't allow a server that is not being filtered. " + ft}) + return + } + + filteredServersMutex.Lock() + defer filteredServersMutex.Unlock() + + if ft == tempFiltered { + // For temporarily filtered server, allowing them by removing the server from the map + delete(filteredServers, sn) + } else if ft == permFiltered { + // For servers to filter from the config, temporarily allow the server + filteredServers[sn] = tempAllowed + } + ctx.JSON(http.StatusOK, gin.H{"message": "success"}) +} + func RegisterDirectorWebAPI(router *gin.RouterGroup) { directorWebAPI := router.Group("/api/v1.0/director_ui") // Follow RESTful schema { directorWebAPI.GET("/servers", listServers) + directorWebAPI.PATCH("/servers/filter/*name", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleFilterServer) + directorWebAPI.PATCH("/servers/allow/*name", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleAllowServer) directorWebAPI.GET("/servers/origins/stat/*path", web_ui.AuthHandler, queryOrigins) directorWebAPI.HEAD("/servers/origins/stat/*path", web_ui.AuthHandler, queryOrigins) } diff --git a/director/director_api.go b/director/director_api.go index 2b5653484..9d2723703 100644 --- a/director/director_api.go +++ b/director/director_api.go @@ -24,6 +24,7 @@ import ( "github.com/jellydator/ttlcache/v3" "github.com/pelicanplatform/pelican/common" + "github.com/pelicanplatform/pelican/param" log "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" @@ -60,6 +61,32 @@ func listServerAds(serverTypes []common.ServerType) []common.ServerAd { return ads } +// Check if a server is filtered from "production" servers by +// checking if a serverName is in the filteredServers map +func checkFilter(serverName string) (bool, filterType) { + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + + status, ok := filteredServers[serverName] + // No filter entry + if !ok { + return false, "" + } else { + // Has filter entry + switch status { + case permFiltered: + return true, permFiltered + case tempFiltered: + return true, tempFiltered + case tempAllowed: + return false, tempAllowed + default: + log.Error("Unknown filterType: ", status) + return false, "" + } + } +} + // Configure TTL caches to enable cache eviction and other additional cache events handling logic // // The `ctx` is the context for listening to server shutdown event in order to cleanup internal cache eviction @@ -118,3 +145,17 @@ func ConfigTTLCache(ctx context.Context, egrp *errgroup.Group) { return nil }) } + +// Populate internal filteredServers map by Director.FilteredServers +func ConfigFilterdServers() { + filteredServersMutex.Lock() + defer filteredServersMutex.Unlock() + + if len(param.Director_FilteredServers.GetStringSlice()) == 0 { + return + } + + for _, sn := range param.Director_FilteredServers.GetStringSlice() { + filteredServers[sn] = permFiltered + } +} diff --git a/director/director_api_test.go b/director/director_api_test.go index d9a3330cc..298370b5f 100644 --- a/director/director_api_test.go +++ b/director/director_api_test.go @@ -170,3 +170,66 @@ func TestListServerAds(t *testing.T) { assert.True(t, adsCache[0] == mockCacheServerAd) }) } + +func TestCheckFilter(t *testing.T) { + testCases := []struct { + name string + mapItems map[string]filterType + serverToTest string + filtered bool + ft filterType + }{ + { + name: "empty-list-return-false", + serverToTest: "mock", + filtered: false, + }, + { + name: "dne-return-false", + serverToTest: "mock", + mapItems: map[string]filterType{"no-your-server": permFiltered}, + filtered: false, + }, + { + name: "perm-return-true", + serverToTest: "mock", + mapItems: map[string]filterType{"mock": permFiltered, "no-your-server": tempFiltered}, + filtered: true, + ft: permFiltered, + }, + { + name: "temp-filter-return-true", + serverToTest: "mock", + mapItems: map[string]filterType{"mock": tempFiltered, "no-your-server": permFiltered}, + filtered: true, + ft: tempFiltered, + }, + { + name: "temp-allow-return-false", + serverToTest: "mock", + mapItems: map[string]filterType{"mock": tempAllowed, "no-your-server": permFiltered}, + filtered: false, + ft: tempAllowed, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + filteredServersMutex.Lock() + tmpMap := filteredServers + filteredServers = tc.mapItems + filteredServersMutex.Unlock() + + defer func() { + filteredServersMutex.Lock() + filteredServers = tmpMap + filteredServersMutex.Unlock() + }() + + getFilter, getType := checkFilter(tc.serverToTest) + assert.Equal(t, tc.filtered, getFilter) + assert.Equal(t, tc.ft, getType) + + }) + } +} diff --git a/director/director_test.go b/director/director_test.go index 61b1cda60..9f8b5ebe2 100644 --- a/director/director_test.go +++ b/director/director_test.go @@ -20,6 +20,7 @@ package director import ( "encoding/json" + "io" "net/http" "net/http/httptest" "testing" @@ -45,24 +46,30 @@ func TestListServers(t *testing.T) { }() mocklistOriginRes := listServerResponse{ - Name: mockOriginServerAd.Name, - AuthURL: mockOriginServerAd.AuthURL.String(), - URL: mockOriginServerAd.URL.String(), - WebURL: mockOriginServerAd.WebURL.String(), - Type: mockOriginServerAd.Type, - Latitude: mockOriginServerAd.Latitude, - Longitude: mockOriginServerAd.Longitude, - Status: HealthStatusUnknown, + Name: mockOriginServerAd.Name, + BrokerURL: mockOriginServerAd.BrokerURL.String(), + AuthURL: mockOriginServerAd.AuthURL.String(), + URL: mockOriginServerAd.URL.String(), + WebURL: mockOriginServerAd.WebURL.String(), + Type: mockOriginServerAd.Type, + Latitude: mockOriginServerAd.Latitude, + Longitude: mockOriginServerAd.Longitude, + EnableWrite: mockOriginServerAd.EnableWrite, + EnableFallbackRead: mockOriginServerAd.EnableFallbackRead, + Status: HealthStatusUnknown, } mocklistCacheRes := listServerResponse{ - Name: mockCacheServerAd.Name, - AuthURL: mockCacheServerAd.AuthURL.String(), - URL: mockCacheServerAd.URL.String(), - WebURL: mockCacheServerAd.WebURL.String(), - Type: mockCacheServerAd.Type, - Latitude: mockCacheServerAd.Latitude, - Longitude: mockCacheServerAd.Longitude, - Status: HealthStatusUnknown, + Name: mockCacheServerAd.Name, + BrokerURL: mockCacheServerAd.BrokerURL.String(), + AuthURL: mockCacheServerAd.AuthURL.String(), + URL: mockCacheServerAd.URL.String(), + WebURL: mockCacheServerAd.WebURL.String(), + Type: mockCacheServerAd.Type, + Latitude: mockCacheServerAd.Latitude, + Longitude: mockCacheServerAd.Longitude, + EnableWrite: mockCacheServerAd.EnableWrite, + EnableFallbackRead: mockCacheServerAd.EnableFallbackRead, + Status: HealthStatusUnknown, } t.Run("query-origin", func(t *testing.T) { @@ -145,3 +152,188 @@ func TestListServers(t *testing.T) { require.Equal(t, 400, w.Code) }) } + +func TestHandleFilterServer(t *testing.T) { + t.Cleanup(func() { + filteredServersMutex.Lock() + defer filteredServersMutex.Unlock() + filteredServers = map[string]filterType{} + }) + router := gin.Default() + router.GET("/servers/filter/*name", handleFilterServer) + + t.Run("filter-server-success", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/filter/mock-dne", nil) + filteredServersMutex.Lock() + delete(filteredServers, "mock-dne") + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 200, w.Code) + + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + assert.Equal(t, tempFiltered, filteredServers["mock-dne"]) + }) + t.Run("filter-server-w-permFiltered", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/filter/mock-pf", nil) + filteredServersMutex.Lock() + filteredServers["mock-pf"] = permFiltered + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 400, w.Code) + + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + assert.Equal(t, permFiltered, filteredServers["mock-pf"]) + + resB, err := io.ReadAll(w.Body) + require.NoError(t, err) + assert.Contains(t, string(resB), "Can't filter a server that already has been fitlered") + }) + t.Run("filter-server-w-tempFiltered", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/filter/mock-tf", nil) + filteredServersMutex.Lock() + filteredServers["mock-tf"] = tempFiltered + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 400, w.Code) + + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + assert.Equal(t, tempFiltered, filteredServers["mock-tf"]) + + resB, err := io.ReadAll(w.Body) + require.NoError(t, err) + assert.Contains(t, string(resB), "Can't filter a server that already has been fitlered") + }) + t.Run("filter-server-w-tempAllowed", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/filter/mock-ta", nil) + filteredServersMutex.Lock() + filteredServers["mock-ta"] = tempAllowed + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 200, w.Code) + + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + assert.Equal(t, permFiltered, filteredServers["mock-ta"]) + }) + t.Run("filter-with-invalid-name", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/filter/", nil) + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 400, w.Code) + resB, err := io.ReadAll(w.Body) + require.NoError(t, err) + assert.Contains(t, string(resB), "name is a required path parameter") + }) +} + +func TestHandleAllowServer(t *testing.T) { + t.Cleanup(func() { + filteredServersMutex.Lock() + defer filteredServersMutex.Unlock() + filteredServers = map[string]filterType{} + }) + router := gin.Default() + router.GET("/servers/allow/*name", handleAllowServer) + + t.Run("allow-server-that-dne", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/allow/mock-dne", nil) + filteredServersMutex.Lock() + delete(filteredServers, "mock-dne") + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 400, w.Code) + resB, err := io.ReadAll(w.Body) + require.NoError(t, err) + assert.Contains(t, string(resB), "Can't allow a server that is not being filtered.") + }) + t.Run("allow-server-w-permFiltered", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/allow/mock-pf", nil) + filteredServersMutex.Lock() + filteredServers["mock-pf"] = permFiltered + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 200, w.Code) + + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + assert.Equal(t, tempAllowed, filteredServers["mock-pf"]) + }) + t.Run("allow-server-w-tempFiltered", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/allow/mock-tf", nil) + filteredServersMutex.Lock() + filteredServers["mock-tf"] = tempFiltered + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 200, w.Code) + + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + assert.Empty(t, filteredServers["mock-tf"]) + }) + t.Run("allow-server-w-tempAllowed", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/allow/mock-ta", nil) + filteredServersMutex.Lock() + filteredServers["mock-ta"] = tempAllowed + filteredServersMutex.Unlock() + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 400, w.Code) + + filteredServersMutex.RLock() + defer filteredServersMutex.RUnlock() + assert.Equal(t, tempAllowed, filteredServers["mock-ta"]) + + resB, err := io.ReadAll(w.Body) + require.NoError(t, err) + assert.Contains(t, string(resB), "Can't allow a server that is not being filtered.") + }) + t.Run("allow-with-invalid-name", func(t *testing.T) { + // Create a request to the endpoint + w := httptest.NewRecorder() + req, _ := http.NewRequest("GET", "/servers/allow/", nil) + router.ServeHTTP(w, req) + + // Check the response + require.Equal(t, 400, w.Code) + resB, err := io.ReadAll(w.Body) + require.NoError(t, err) + assert.Contains(t, string(resB), "name is a required path parameter") + }) +} diff --git a/docs/parameters.yaml b/docs/parameters.yaml index fa58d5cec..d4be04951 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -824,9 +824,7 @@ components: ["director"] --- name: Director.FilteredServers description: >- - A list of server names to not to redirect client requests to. This is mutual exclusive with - pausing redirect to a server using the web UI. If this is a non-empty list, the web UI - can't pause or resume redirect to a server. + A list of server names to not to redirect client requests to. type: stringSlice default: none components: ["director"] diff --git a/launchers/director_serve.go b/launchers/director_serve.go index 4b0479eb1..4ebf75c0d 100644 --- a/launchers/director_serve.go +++ b/launchers/director_serve.go @@ -50,6 +50,8 @@ func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group director.ConfigTTLCache(ctx, egrp) + director.ConfigFilterdServers() + // Configure the shortcut middleware to either redirect to a cache // or to an origin defaultResponse := param.Director_DefaultResponse.GetString() diff --git a/param/parameters.go b/param/parameters.go index 488a2f051..2178217c1 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -182,6 +182,7 @@ var ( var ( Cache_PermittedNamespaces = StringSliceParam{"Cache.PermittedNamespaces"} Director_CacheResponseHostnames = StringSliceParam{"Director.CacheResponseHostnames"} + Director_FilteredServers = StringSliceParam{"Director.FilteredServers"} Director_OriginResponseHostnames = StringSliceParam{"Director.OriginResponseHostnames"} Issuer_GroupRequirements = StringSliceParam{"Issuer.GroupRequirements"} Monitoring_AggregatePrefixes = StringSliceParam{"Monitoring.AggregatePrefixes"} diff --git a/param/parameters_struct.go b/param/parameters_struct.go index b30f6fb78..ed790d52d 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -53,6 +53,7 @@ type Config struct { CacheResponseHostnames []string DefaultResponse string EnableBroker bool + FilteredServers []string GeoIPLocation string MaxMindKeyFile string MaxStatResponse int @@ -280,6 +281,7 @@ type configWithType struct { CacheResponseHostnames struct { Type string; Value []string } DefaultResponse struct { Type string; Value string } EnableBroker struct { Type string; Value bool } + FilteredServers struct { Type string; Value []string } GeoIPLocation struct { Type string; Value string } MaxMindKeyFile struct { Type string; Value string } MaxStatResponse struct { Type string; Value int } From 601a72b1537707c5319f18b5e3be4da83f605a7d Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Mon, 11 Mar 2024 21:26:04 +0000 Subject: [PATCH 4/5] Add swagger doc to the new APIs --- .../app/api/docs/pelican-swagger.yaml | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/web_ui/frontend/app/api/docs/pelican-swagger.yaml b/web_ui/frontend/app/api/docs/pelican-swagger.yaml index 60b6ce8b3..a1eba7665 100644 --- a/web_ui/frontend/app/api/docs/pelican-swagger.yaml +++ b/web_ui/frontend/app/api/docs/pelican-swagger.yaml @@ -309,6 +309,10 @@ definitions: type: string description: The URL of the issuer on the server to check token validity example: "https://example-origin.com:8443" + broker_url: + type: string + description: The URL to the connection broker + example: "https://example-origin.com:8447" url: type: string description: The URL of XrootD service on the server to access objects @@ -329,6 +333,31 @@ definitions: type: number description: The longitude of the server based on its IP address default: 0 + enableWrite: + type: boolean + description: If the origin server enables write access. Only valid for type=Origin + default: false + enableFallbackRead: + type: boolean + description: If fall back to read from the origin when there's no cache to serve the file + default: false + filtered: + type: boolean + description: | + If the server is excluded from serving client object requests. + No traffic will be redirect to the server if the value is true + default: false + filteredType: + type: string + description: | + The type of filtering/allowance of the server. Can be permFilter|tempFilter|tempAllow + + * **permFilter** when the server is filtered by configuration parameter `Director.FilteredServers` + * **tempFilter** when the server is filtered by web API request at `/servers/filter/*name`. The filter + rule lives in-memory and will be reset at the server restart + * **tempAllow** when the server is filtered by configuration parameter but _allowed_ by the web API at `/servers/allow/*name`, + the change lives in-memory and will be overwritten by config parameter at the server restart + default: "" status: type: string description: The status of director file transfer test against the server. Can be Initializing|Unknown|OK|Error @@ -1187,6 +1216,83 @@ paths: minimum: 0 "400": description: "Bad request, query parameter is invalid" + schema: + type: object + $ref: "#/definitions/ErrorModel" + /director_ui/servers/filter/{name}: + patch: + summary: Filter a server from director redirecting + description: | + `Authentication Required` `Admin` + + + **The filter rules registered by this endpoint are in-memory and will be reset at the server restart.** + + If `Director.FilteredServers` is set, you may add additional, in-memory filter rules. + + If you want to persist the filtering rules, set `Director.FilteredServers` in your server config instead. + tags: + - "director_ui" + parameters: + - in: path + name: name + type: string + required: true + description: The server name to filter + produces: + - application/json + responses: + "200": + description: "OK" + schema: + type: object + $ref: "#/definitions/SuccessModel" + "400": + description: "Bad request. Either `name` is invalid or the server has been filtered" + schema: + type: object + $ref: "#/definitions/ErrorModel" + "403": + description: "Forbidden. Admin privilege required" + schema: + type: object + $ref: "#/definitions/ErrorModel" + /director_ui/servers/allow/{name}: + patch: + summary: Reset filtering rule for a server from director redirecting + description: | + `Authentication Required` `Admin` + + + **The reset of filter rules registered by this endpoint are in-memory and will be removed at the server restart.** + + You may reset the rules defined in `Director.FilteredServers` in-memory but they will be overwritten at the server restart + tags: + - "director_ui" + parameters: + - in: path + name: name + type: string + required: true + description: The server name to reset filtering + produces: + - application/json + responses: + "200": + description: "OK" + schema: + type: object + $ref: "#/definitions/SuccessModel" + "400": + description: "Bad request. Either `name` is invalid or the server is not being filtered" + schema: + type: object + $ref: "#/definitions/ErrorModel" + "403": + description: "Forbidden. Admin privilege required" + schema: + type: object + $ref: "#/definitions/ErrorModel" /director_ui/servers/origins/stat: get: tags: From c68cabbebb146655150ca4dc2caae0de42feda0a Mon Sep 17 00:00:00 2001 From: Haoming Meng Date: Mon, 18 Mar 2024 16:26:51 +0000 Subject: [PATCH 5/5] Address code review comments --- director/director.go | 64 +++++++++++++++++++++------------------ director/director_api.go | 6 ++-- director/director_test.go | 44 +++++++++++++-------------- docs/parameters.yaml | 3 +- 4 files changed, 62 insertions(+), 55 deletions(-) diff --git a/director/director.go b/director/director.go index 958b65cc7..851f10b47 100644 --- a/director/director.go +++ b/director/director.go @@ -35,19 +35,19 @@ type ( } listServerResponse struct { - Name string `json:"name"` - AuthURL string `json:"authUrl"` - BrokerURL string `json:"brokerUrl"` - URL string `json:"url"` // This is server's XRootD URL for file transfer - WebURL string `json:"webUrl"` // This is server's Web interface and API - Type common.ServerType `json:"type"` - Latitude float64 `json:"latitude"` - Longitude float64 `json:"longitude"` - EnableWrite bool `json:"enableWrite"` - EnableFallbackRead bool `json:"enableFallbackRead"` - Filtered bool `json:"filtered"` - FilteredType filterType `json:"filteredType"` - Status HealthTestStatus `json:"status"` + Name string `json:"name"` + AuthURL string `json:"authUrl"` + BrokerURL string `json:"brokerUrl"` + URL string `json:"url"` // This is server's XRootD URL for file transfer + WebURL string `json:"webUrl"` // This is server's Web interface and API + Type common.ServerType `json:"type"` + Latitude float64 `json:"latitude"` + Longitude float64 `json:"longitude"` + Writes bool `json:"enableWrite"` + DirectReads bool `json:"enableFallbackRead"` + Filtered bool `json:"filtered"` + FilteredType filterType `json:"filteredType"` + Status HealthTestStatus `json:"status"` } statResponse struct { @@ -99,19 +99,19 @@ func listServers(ctx *gin.Context) { } filtered, ft := checkFilter(server.Name) res := listServerResponse{ - Name: server.Name, - BrokerURL: server.BrokerURL.String(), - AuthURL: server.AuthURL.String(), - URL: server.URL.String(), - WebURL: server.WebURL.String(), - Type: server.Type, - Latitude: server.Latitude, - Longitude: server.Longitude, - EnableWrite: server.EnableWrite, - EnableFallbackRead: server.EnableFallbackRead, - Filtered: filtered, - FilteredType: ft, - Status: healthStatus, + Name: server.Name, + BrokerURL: server.BrokerURL.String(), + AuthURL: server.AuthURL.String(), + URL: server.URL.String(), + WebURL: server.WebURL.String(), + Type: server.Type, + Latitude: server.Latitude, + Longitude: server.Longitude, + Writes: server.Writes, + DirectReads: server.DirectReads, + Filtered: filtered, + FilteredType: ft, + Status: healthStatus, } resList = append(resList, res) } @@ -159,22 +159,25 @@ func queryOrigins(ctx *gin.Context) { ctx.JSON(http.StatusOK, res) } +// A gin route handler that given a server hostname through path variable `name`, +// checks and adds the server to a list of servers to be bypassed when the director redirects +// object requests from the client func handleFilterServer(ctx *gin.Context) { sn := strings.TrimPrefix(ctx.Param("name"), "/") if sn == "" { ctx.JSON(http.StatusBadRequest, gin.H{"error": "name is a required path parameter"}) return } - filtered, ft := checkFilter(sn) + filtered, filterType := checkFilter(sn) if filtered { - ctx.JSON(http.StatusBadRequest, gin.H{"error": "Can't filter a server that already has been fitlered with type " + ft}) + ctx.JSON(http.StatusBadRequest, gin.H{"error": "Can't filter a server that already has been fitlered with type " + filterType}) return } filteredServersMutex.Lock() defer filteredServersMutex.Unlock() // If we previously temporarily allowed a server, we switch to permFiltered (reset) - if ft == tempAllowed { + if filterType == tempAllowed { filteredServers[sn] = permFiltered } else { filteredServers[sn] = tempFiltered @@ -182,6 +185,9 @@ func handleFilterServer(ctx *gin.Context) { ctx.JSON(http.StatusOK, gin.H{"message": "success"}) } +// A gin route handler that given a server hostname through path variable `name`, +// checks and removes the server from a list of servers to be bypassed when the director redirects +// object requests from the client func handleAllowServer(ctx *gin.Context) { sn := strings.TrimPrefix(ctx.Param("name"), "/") if sn == "" { diff --git a/director/director_api.go b/director/director_api.go index 9d2723703..9b3a0e509 100644 --- a/director/director_api.go +++ b/director/director_api.go @@ -67,9 +67,9 @@ func checkFilter(serverName string) (bool, filterType) { filteredServersMutex.RLock() defer filteredServersMutex.RUnlock() - status, ok := filteredServers[serverName] + status, exists := filteredServers[serverName] // No filter entry - if !ok { + if !exists { return false, "" } else { // Has filter entry @@ -151,7 +151,7 @@ func ConfigFilterdServers() { filteredServersMutex.Lock() defer filteredServersMutex.Unlock() - if len(param.Director_FilteredServers.GetStringSlice()) == 0 { + if !param.Director_FilteredServers.IsSet() { return } diff --git a/director/director_test.go b/director/director_test.go index 9f8b5ebe2..1d7c5e73f 100644 --- a/director/director_test.go +++ b/director/director_test.go @@ -46,30 +46,30 @@ func TestListServers(t *testing.T) { }() mocklistOriginRes := listServerResponse{ - Name: mockOriginServerAd.Name, - BrokerURL: mockOriginServerAd.BrokerURL.String(), - AuthURL: mockOriginServerAd.AuthURL.String(), - URL: mockOriginServerAd.URL.String(), - WebURL: mockOriginServerAd.WebURL.String(), - Type: mockOriginServerAd.Type, - Latitude: mockOriginServerAd.Latitude, - Longitude: mockOriginServerAd.Longitude, - EnableWrite: mockOriginServerAd.EnableWrite, - EnableFallbackRead: mockOriginServerAd.EnableFallbackRead, - Status: HealthStatusUnknown, + Name: mockOriginServerAd.Name, + BrokerURL: mockOriginServerAd.BrokerURL.String(), + AuthURL: mockOriginServerAd.AuthURL.String(), + URL: mockOriginServerAd.URL.String(), + WebURL: mockOriginServerAd.WebURL.String(), + Type: mockOriginServerAd.Type, + Latitude: mockOriginServerAd.Latitude, + Longitude: mockOriginServerAd.Longitude, + Writes: mockOriginServerAd.Writes, + DirectReads: mockOriginServerAd.DirectReads, + Status: HealthStatusUnknown, } mocklistCacheRes := listServerResponse{ - Name: mockCacheServerAd.Name, - BrokerURL: mockCacheServerAd.BrokerURL.String(), - AuthURL: mockCacheServerAd.AuthURL.String(), - URL: mockCacheServerAd.URL.String(), - WebURL: mockCacheServerAd.WebURL.String(), - Type: mockCacheServerAd.Type, - Latitude: mockCacheServerAd.Latitude, - Longitude: mockCacheServerAd.Longitude, - EnableWrite: mockCacheServerAd.EnableWrite, - EnableFallbackRead: mockCacheServerAd.EnableFallbackRead, - Status: HealthStatusUnknown, + Name: mockCacheServerAd.Name, + BrokerURL: mockCacheServerAd.BrokerURL.String(), + AuthURL: mockCacheServerAd.AuthURL.String(), + URL: mockCacheServerAd.URL.String(), + WebURL: mockCacheServerAd.WebURL.String(), + Type: mockCacheServerAd.Type, + Latitude: mockCacheServerAd.Latitude, + Longitude: mockCacheServerAd.Longitude, + Writes: mockCacheServerAd.Writes, + DirectReads: mockCacheServerAd.DirectReads, + Status: HealthStatusUnknown, } t.Run("query-origin", func(t *testing.T) { diff --git a/docs/parameters.yaml b/docs/parameters.yaml index 380681b1d..6a7eecac1 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -1006,7 +1006,8 @@ components: ["director"] --- name: Director.FilteredServers description: >- - A list of server names to not to redirect client requests to. + A list of server host names to not to redirect client requests to. This is for admins to put a list of + servers in the federation into downtime. type: stringSlice default: none components: ["director"]