From 2b926bb9dee1167192ce27739f709ec0183dc922 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 16 Aug 2024 14:43:11 -0500 Subject: [PATCH 1/2] Improve director restart behavior When the director restarts, - Detect whether a response is coming from Pelican or a SSL terminator application. In the latter case, retry the request a few times to allow the director to restart. - If the director has recently restarted, instead of sending a 404, return a 429 (too many requests) indicating the client should retry again soon. --- client/director.go | 71 +++++++++++++++++++++++++------------ director/director.go | 44 +++++++++++++++++------ docs/parameters.yaml | 11 ++++++ launchers/cache_serve.go | 7 ++-- launchers/director_serve.go | 3 +- launchers/launcher.go | 4 +-- launchers/origin_serve.go | 3 +- launchers/registry_serve.go | 3 +- origin/origin_ui.go | 2 +- param/parameters.go | 1 + param/parameters_struct.go | 2 ++ web_ui/oauth2_client.go | 2 +- web_ui/ui.go | 4 +++ 13 files changed, 114 insertions(+), 43 deletions(-) diff --git a/client/director.go b/client/director.go index 2ad92faea..a33c2c974 100644 --- a/client/director.go +++ b/client/director.go @@ -22,20 +22,30 @@ import ( "context" "encoding/json" "io" + "math/rand" "net/http" "net/url" "sort" "strconv" "strings" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/pelicanplatform/pelican/config" + "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" "github.com/pelicanplatform/pelican/utils" ) +func assumePelican(resp *http.Response) bool { + if !param.Client_AssumeDirectorServerHeader.GetBool() { + return false + } + return strings.HasPrefix(resp.Header.Get("Server"), "pelican/") +} + // Make a request to the director for a given verb/resource; return the // HTTP response object only if a 307 is returned. func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (resp *http.Response, err error) { @@ -52,34 +62,49 @@ func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (r }, } - req, err := http.NewRequestWithContext(ctx, verb, resourceUrl, nil) - if err != nil { - log.Errorln("Failed to create an HTTP request:", err) - return nil, err - } + var errMsg string + var body []byte + for idx := 0; idx < 10; idx++ { + var req *http.Request + req, err = http.NewRequestWithContext(ctx, verb, resourceUrl, nil) + if err != nil { + log.Errorln("Failed to create an HTTP request:", err) + return nil, err + } - // Include the Client's version as a User-Agent header. The Director will decide - // if it supports the version, and provide an error message in the case that it - // cannot. - req.Header.Set("User-Agent", getUserAgent("")) + // Include the Client's version as a User-Agent header. The Director will decide + // if it supports the version, and provide an error message in the case that it + // cannot. + req.Header.Set("User-Agent", getUserAgent("")) - // Perform the HTTP request - resp, err = client.Do(req) + // Perform the HTTP request + resp, err = client.Do(req) - if err != nil { - log.Errorln("Failed to get response from the director:", err) - return - } + if err != nil { + log.Errorln("Failed to get response from the director:", err) + return + } - defer resp.Body.Close() - log.Tracef("Director's response: %#v\n", resp) - // Check HTTP response -- should be 307 (redirect), else something went wrong - body, err := io.ReadAll(resp.Body) - if err != nil { - log.Errorln("Failed to read the body from the director response:", err) - return resp, err + defer resp.Body.Close() + log.Tracef("Director's response: %#v\n", resp) + // Check HTTP response -- should be 307 (redirect), else something went wrong + body, err = io.ReadAll(resp.Body) + if err != nil { + log.Errorln("Failed to read the body from the director response:", err) + return resp, err + } + errMsg = string(body) + + // If this isn't a Pelican process and we got an error, then sleep and retry + if (assumePelican(resp) && (resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError)) || + (resp.StatusCode == http.StatusTooManyRequests) { + log.Warnln("Director response not from a Pelican process; sleeping for", 3*idx+3, "seconds and retrying") + time.Sleep(time.Duration(3*idx+3)*time.Second + time.Duration(rand.Float32()*1000)*time.Millisecond) + } else { + break + } } - errMsg := string(body) + // The Content-Type will be alike "application/json; charset=utf-8" if utils.HasContentType(resp, "application/json") { var respErr server_structs.SimpleApiResp diff --git a/director/director.go b/director/director.go index b160fc361..e5d5fd85d 100644 --- a/director/director.go +++ b/director/director.go @@ -27,6 +27,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" @@ -44,6 +45,7 @@ import ( "github.com/pelicanplatform/pelican/token" "github.com/pelicanplatform/pelican/token_scopes" "github.com/pelicanplatform/pelican/utils" + "github.com/pelicanplatform/pelican/web_ui" ) type ( @@ -97,8 +99,14 @@ var ( statUtils = make(map[string]serverStatUtil) // The utilities for the stat call. The key is string form of ServerAd.URL statUtilsMutex = sync.RWMutex{} + + startupTime = time.Now() ) +func inStartupSequence() bool { + return time.Since(startupTime) <= 5*time.Minute +} + func getRedirectURL(reqPath string, ad server_structs.ServerAd, requiresAuth bool) (redirectURL url.URL) { var serverURL url.URL if requiresAuth && ad.AuthURL.String() != "" { @@ -360,10 +368,17 @@ func redirectToCache(ginCtx *gin.Context) { // report the lack of path first -- this is most important for the user because it tells them // they're trying to get an object that simply doesn't exist if namespaceAd.Path == "" { - ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "No namespace found for path. Either it doesn't exist, or the Director is experiencing problems", - }) + if inStartupSequence() { + ginCtx.JSON(http.StatusTooManyRequests, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "No cache serving requested prefix; director just restarted, try again later", + }) + } else { + ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "No namespace found for prefix. Either it doesn't exist, or the Director is experiencing problems", + }) + } return } // if err != nil, depth == 0, which is the default value for depth @@ -567,10 +582,17 @@ func redirectToOrigin(ginCtx *gin.Context) { // report the lack of path first -- this is most important for the user because it tells them // they're trying to get an object that simply doesn't exist if namespaceAd.Path == "" { - ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ - Status: server_structs.RespFailed, - Msg: "No namespace found for path. Either it doesn't exist, or the Director is experiencing problems", - }) + if inStartupSequence() { + ginCtx.JSON(http.StatusTooManyRequests, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "No origin serving requested prefix; director just restarted, try again later", + }) + } else { + ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{ + Status: server_structs.RespFailed, + Msg: "No namespace found for path. Either it doesn't exist, or the Director is experiencing problems", + }) + } return } @@ -836,6 +858,8 @@ func checkHostnameRedirects(c *gin.Context, incomingHost string) { // original request had been made to /api/v1.0/director/object/foo/bar func ShortcutMiddleware(defaultResponse string) gin.HandlerFunc { return func(c *gin.Context) { + web_ui.ServerHeaderMiddleware(c) + // If this is a request for getting public key, don't modify the path // If this is a request to the Prometheus API, don't modify the path if strings.HasPrefix(c.Request.URL.Path, "/.well-known/") || @@ -1304,7 +1328,7 @@ func collectClientVersionMetric(reqVer *version.Version, service string) { } func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) { - directorAPIV1 := router.Group("/api/v1.0/director") + directorAPIV1 := router.Group("/api/v1.0/director", web_ui.ServerHeaderMiddleware) { // Establish the routes used for cache/origin redirection directorAPIV1.GET("/object/*any", redirectToCache) @@ -1330,7 +1354,7 @@ func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) { directorAPIV1.GET("/discoverServers", discoverOriginCache) } - directorAPIV2 := router.Group("/api/v2.0/director") + directorAPIV2 := router.Group("/api/v2.0/director", web_ui.ServerHeaderMiddleware) { directorAPIV2.GET("/listNamespaces", listNamespacesV2) } diff --git a/docs/parameters.yaml b/docs/parameters.yaml index a4ce75510..6e2211e3b 100644 --- a/docs/parameters.yaml +++ b/docs/parameters.yaml @@ -449,6 +449,17 @@ default: 0 components: ["client"] hidden: true --- +name: Client.AssumeDirectorServerHeader +description: |+ + Assume the director service always sets the `Server` header to `pelican`. + + If enabled and a director response is missing this header, then we assume the response is suspect + (may be a default response from a SSL termination server, like Traefik, while pelican is restarting). +type: bool +default: false +components: ["client"] +hidden: true +--- ############################ # Origin-level Configs # ############################ diff --git a/launchers/cache_serve.go b/launchers/cache_serve.go index 6822ee876..c9120da86 100644 --- a/launchers/cache_serve.go +++ b/launchers/cache_serve.go @@ -41,6 +41,7 @@ import ( "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" "github.com/pelicanplatform/pelican/server_utils" + "github.com/pelicanplatform/pelican/web_ui" "github.com/pelicanplatform/pelican/xrootd" ) @@ -72,7 +73,7 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m // Register the web endpoints if param.Lotman_EnableAPI.GetBool() { log.Debugln("Registering Lotman API") - lotman.RegisterLotman(ctx, engine.Group("/")) + lotman.RegisterLotman(ctx, engine.Group("/", web_ui.ServerHeaderMiddleware)) } // Bind the c library funcs to Go if success := lotman.InitLotman(); !success { @@ -80,7 +81,7 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m } } - broker.RegisterBrokerCallback(ctx, engine.Group("/")) + broker.RegisterBrokerCallback(ctx, engine.Group("/", web_ui.ServerHeaderMiddleware)) broker.LaunchNamespaceKeyMaintenance(ctx, egrp) configPath, err := xrootd.ConfigXrootd(ctx, false) if err != nil { @@ -102,7 +103,7 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m // Director and origin also registers this metadata URL; avoid registering twice. if !modules.IsEnabled(server_structs.DirectorType) && !modules.IsEnabled(server_structs.OriginType) { - server_utils.RegisterOIDCAPI(engine.Group("/"), false) + server_utils.RegisterOIDCAPI(engine.Group("/", web_ui.ServerHeaderMiddleware), false) } log.Info("Launching cache") diff --git a/launchers/director_serve.go b/launchers/director_serve.go index df7e080cc..a2974e4f0 100644 --- a/launchers/director_serve.go +++ b/launchers/director_serve.go @@ -32,6 +32,7 @@ import ( "github.com/pelicanplatform/pelican/director" "github.com/pelicanplatform/pelican/metrics" "github.com/pelicanplatform/pelican/param" + "github.com/pelicanplatform/pelican/web_ui" ) func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group) error { @@ -77,7 +78,7 @@ func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group return errors.Wrap(err, "invalid URL for Director.SupportContactUrl") } } - rootGroup := engine.Group("/") + rootGroup := engine.Group("/", web_ui.ServerHeaderMiddleware) director.RegisterDirectorOIDCAPI(rootGroup) director.RegisterDirectorWebAPI(rootGroup) engine.Use(director.ShortcutMiddleware(defaultResponse)) diff --git a/launchers/launcher.go b/launchers/launcher.go index 458b3e9e0..9ba609a0b 100644 --- a/launchers/launcher.go +++ b/launchers/launcher.go @@ -122,7 +122,7 @@ func LaunchModules(ctx context.Context, modules server_structs.ServerType) (serv } if modules.IsEnabled(server_structs.BrokerType) { - rootGroup := engine.Group("/") + rootGroup := engine.Group("/", web_ui.ServerHeaderMiddleware) broker.RegisterBroker(ctx, rootGroup) broker.LaunchNamespaceKeyMaintenance(ctx, egrp) } @@ -192,7 +192,7 @@ func LaunchModules(ctx context.Context, modules server_structs.ServerType) (serv if err != nil { return } - rootGroup := engine.Group("/") + rootGroup := engine.Group("/", web_ui.ServerHeaderMiddleware) lc.Register(ctx, rootGroup) } diff --git a/launchers/origin_serve.go b/launchers/origin_serve.go index c07f0ea40..6262a0538 100644 --- a/launchers/origin_serve.go +++ b/launchers/origin_serve.go @@ -40,6 +40,7 @@ import ( "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/server_structs" "github.com/pelicanplatform/pelican/server_utils" + "github.com/pelicanplatform/pelican/web_ui" "github.com/pelicanplatform/pelican/xrootd" ) @@ -88,7 +89,7 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, // Director also registers this metadata URL; avoid registering twice. if !modules.IsEnabled(server_structs.DirectorType) { - server_utils.RegisterOIDCAPI(engine.Group("/"), false) + server_utils.RegisterOIDCAPI(engine.Group("/", web_ui.ServerHeaderMiddleware), false) } if param.Origin_EnableIssuer.GetBool() { diff --git a/launchers/registry_serve.go b/launchers/registry_serve.go index f5f9d2291..4fbb423c3 100644 --- a/launchers/registry_serve.go +++ b/launchers/registry_serve.go @@ -30,6 +30,7 @@ import ( "github.com/pelicanplatform/pelican/metrics" "github.com/pelicanplatform/pelican/param" "github.com/pelicanplatform/pelican/registry" + "github.com/pelicanplatform/pelican/web_ui" ) func RegistryServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group) error { @@ -64,7 +65,7 @@ func RegistryServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group go registry.PeriodicTopologyReload(ctx) } - rootRouterGroup := engine.Group("/") + rootRouterGroup := engine.Group("/", web_ui.ServerHeaderMiddleware) // Register routes for server/Pelican client facing APIs registry.RegisterRegistryAPI(rootRouterGroup) // Register routes for APIs to registry Web UI diff --git a/origin/origin_ui.go b/origin/origin_ui.go index 64f96dfc4..17a60d513 100644 --- a/origin/origin_ui.go +++ b/origin/origin_ui.go @@ -162,7 +162,7 @@ func handleExports(ctx *gin.Context) { } func RegisterOriginWebAPI(engine *gin.Engine) error { - originWebAPI := engine.Group("/api/v1.0/origin_ui") + originWebAPI := engine.Group("/api/v1.0/origin_ui", web_ui.ServerHeaderMiddleware) { originWebAPI.GET("/exports", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleExports) } diff --git a/param/parameters.go b/param/parameters.go index 32360b1ef..98bb98f9d 100644 --- a/param/parameters.go +++ b/param/parameters.go @@ -321,6 +321,7 @@ var ( Cache_EnableOIDC = BoolParam{"Cache.EnableOIDC"} Cache_EnableVoms = BoolParam{"Cache.EnableVoms"} Cache_SelfTest = BoolParam{"Cache.SelfTest"} + Client_AssumeDirectorServerHeader = BoolParam{"Client.AssumeDirectorServerHeader"} Client_DisableHttpProxy = BoolParam{"Client.DisableHttpProxy"} Client_DisableProxyFallback = BoolParam{"Client.DisableProxyFallback"} Debug = BoolParam{"Debug"} diff --git a/param/parameters_struct.go b/param/parameters_struct.go index 32eb78faf..d16a21a48 100644 --- a/param/parameters_struct.go +++ b/param/parameters_struct.go @@ -46,6 +46,7 @@ type Config struct { XRootDPrefix string `mapstructure:"xrootdprefix"` } `mapstructure:"cache"` Client struct { + AssumeDirectorServerHeader bool `mapstructure:"assumedirectorserverheader"` DisableHttpProxy bool `mapstructure:"disablehttpproxy"` DisableProxyFallback bool `mapstructure:"disableproxyfallback"` MaximumDownloadSpeed int `mapstructure:"maximumdownloadspeed"` @@ -340,6 +341,7 @@ type configWithType struct { XRootDPrefix struct { Type string; Value string } } Client struct { + AssumeDirectorServerHeader struct { Type string; Value bool } DisableHttpProxy struct { Type string; Value bool } DisableProxyFallback struct { Type string; Value bool } MaximumDownloadSpeed struct { Type string; Value int } diff --git a/web_ui/oauth2_client.go b/web_ui/oauth2_client.go index 4136c35a9..9c0add967 100644 --- a/web_ui/oauth2_client.go +++ b/web_ui/oauth2_client.go @@ -417,7 +417,7 @@ func ConfigOAuthClientAPIs(engine *gin.Engine) error { return err } - oauthGroup := engine.Group("/api/v1.0/auth/oauth", seHandler) + oauthGroup := engine.Group("/api/v1.0/auth/oauth", seHandler, ServerHeaderMiddleware) { oauthGroup.GET("/login", handleOAuthLogin) oauthGroup.GET("/callback", handleOAuthCallback) diff --git a/web_ui/ui.go b/web_ui/ui.go index cf658fcae..e585274c2 100644 --- a/web_ui/ui.go +++ b/web_ui/ui.go @@ -64,6 +64,10 @@ var ( const notFoundFilePath = "frontend/out/404/index.html" +func ServerHeaderMiddleware(ctx *gin.Context) { + ctx.Writer.Header().Add("Server", "pelican/"+config.GetVersion()) +} + func getConfigValues(ctx *gin.Context) { user := ctx.GetString("User") if user == "" { From 73f34c701e8407cfb0246caaa753e556c413418a Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 16 Aug 2024 13:40:37 -0500 Subject: [PATCH 2/2] Fix linter failures from new nil / len test. --- director/prom_query.go | 2 +- local_cache/local_cache.go | 2 +- registry/custom_reg_fields.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/director/prom_query.go b/director/prom_query.go index 8c4475c0f..59543eb6c 100644 --- a/director/prom_query.go +++ b/director/prom_query.go @@ -147,7 +147,7 @@ func parsePromRes(res promQLRes) (promParsed promQLParsed, err error) { ResultType: data.ResultType, } - if data.Result != nil && len(data.Result) > 0 { + if len(data.Result) > 0 { switch data.Result[0].(type) { case float64: // result: [unixtime, value] if len(data.Result) == 2 && (data.ResultType == "scalar" || data.ResultType == "string") { diff --git a/local_cache/local_cache.go b/local_cache/local_cache.go index fe3cd5f9b..40fb7af43 100644 --- a/local_cache/local_cache.go +++ b/local_cache/local_cache.go @@ -803,7 +803,7 @@ func (cr *cacheReader) peekError(ctx context.Context) (err error) { } func (cr *cacheReader) Read(p []byte) (n int, err error) { - if cr.buf != nil && len(cr.buf) > 0 { + if len(cr.buf) > 0 { bytesCopied := copy(p, cr.buf) if len(cr.buf) > bytesCopied { cr.buf = cr.buf[bytesCopied:] diff --git a/registry/custom_reg_fields.go b/registry/custom_reg_fields.go index 0cf78f3da..58c7d3559 100644 --- a/registry/custom_reg_fields.go +++ b/registry/custom_reg_fields.go @@ -282,7 +282,7 @@ func InitCustomRegistrationFields() error { return errors.New(fmt.Sprintf("Bad custom registration field, unsupported field type: %q with %q", conf.Name, conf.Type)) } if conf.Type == "enum" { - if (conf.Options == nil || len(conf.Options) == 0) && conf.OptionsUrl == "" { + if len(conf.Options) == 0 && conf.OptionsUrl == "" { return errors.New(fmt.Sprintf("Bad custom registration field, 'enum' type field does not have options or optionsUrl set: %q", conf.Name)) } }