Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve director restart behavior #1565

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 48 additions & 23 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,30 @@ import (
"context"
"encoding/json"
"io"
"math/rand"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/utils"
)

func assumePelican(resp *http.Response) bool {
if !param.Client_AssumeDirectorServerHeader.GetBool() {
return false
}
return strings.HasPrefix(resp.Header.Get("Server"), "pelican/")
}

// Make a request to the director for a given verb/resource; return the
// HTTP response object only if a 307 is returned.
func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (resp *http.Response, err error) {
Expand All @@ -52,34 +62,49 @@ func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (r
},
}

req, err := http.NewRequestWithContext(ctx, verb, resourceUrl, nil)
if err != nil {
log.Errorln("Failed to create an HTTP request:", err)
return nil, err
}
var errMsg string
var body []byte
for idx := 0; idx < 10; idx++ {
var req *http.Request
req, err = http.NewRequestWithContext(ctx, verb, resourceUrl, nil)
if err != nil {
log.Errorln("Failed to create an HTTP request:", err)
return nil, err
}

// Include the Client's version as a User-Agent header. The Director will decide
// if it supports the version, and provide an error message in the case that it
// cannot.
req.Header.Set("User-Agent", getUserAgent(""))
// Include the Client's version as a User-Agent header. The Director will decide
// if it supports the version, and provide an error message in the case that it
// cannot.
req.Header.Set("User-Agent", getUserAgent(""))

// Perform the HTTP request
resp, err = client.Do(req)
// Perform the HTTP request
resp, err = client.Do(req)

if err != nil {
log.Errorln("Failed to get response from the director:", err)
return
}
if err != nil {
log.Errorln("Failed to get response from the director:", err)
return
}

defer resp.Body.Close()
log.Tracef("Director's response: %#v\n", resp)
// Check HTTP response -- should be 307 (redirect), else something went wrong
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Errorln("Failed to read the body from the director response:", err)
return resp, err
defer resp.Body.Close()
log.Tracef("Director's response: %#v\n", resp)
// Check HTTP response -- should be 307 (redirect), else something went wrong
body, err = io.ReadAll(resp.Body)
if err != nil {
log.Errorln("Failed to read the body from the director response:", err)
return resp, err
}
errMsg = string(body)

// If this isn't a Pelican process and we got an error, then sleep and retry
if (assumePelican(resp) && (resp.StatusCode == http.StatusBadGateway || resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusInternalServerError)) ||
(resp.StatusCode == http.StatusTooManyRequests) {
log.Warnln("Director response not from a Pelican process; sleeping for", 3*idx+3, "seconds and retrying")
time.Sleep(time.Duration(3*idx+3)*time.Second + time.Duration(rand.Float32()*1000)*time.Millisecond)
} else {
break
}
}
errMsg := string(body)

// The Content-Type will be alike "application/json; charset=utf-8"
if utils.HasContentType(resp, "application/json") {
var respErr server_structs.SimpleApiResp
Expand Down
44 changes: 34 additions & 10 deletions director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/pelicanplatform/pelican/token"
"github.com/pelicanplatform/pelican/token_scopes"
"github.com/pelicanplatform/pelican/utils"
"github.com/pelicanplatform/pelican/web_ui"
)

type (
Expand Down Expand Up @@ -97,8 +99,14 @@ var (

statUtils = make(map[string]serverStatUtil) // The utilities for the stat call. The key is string form of ServerAd.URL
statUtilsMutex = sync.RWMutex{}

startupTime = time.Now()
)

func inStartupSequence() bool {
return time.Since(startupTime) <= 5*time.Minute
}

func getRedirectURL(reqPath string, ad server_structs.ServerAd, requiresAuth bool) (redirectURL url.URL) {
var serverURL url.URL
if requiresAuth && ad.AuthURL.String() != "" {
Expand Down Expand Up @@ -360,10 +368,17 @@ func redirectToCache(ginCtx *gin.Context) {
// report the lack of path first -- this is most important for the user because it tells them
// they're trying to get an object that simply doesn't exist
if namespaceAd.Path == "" {
ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: "No namespace found for path. Either it doesn't exist, or the Director is experiencing problems",
})
if inStartupSequence() {
ginCtx.JSON(http.StatusTooManyRequests, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: "No cache serving requested prefix; director just restarted, try again later",
})
} else {
ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: "No namespace found for prefix. Either it doesn't exist, or the Director is experiencing problems",
})
}
return
}
// if err != nil, depth == 0, which is the default value for depth
Expand Down Expand Up @@ -567,10 +582,17 @@ func redirectToOrigin(ginCtx *gin.Context) {
// report the lack of path first -- this is most important for the user because it tells them
// they're trying to get an object that simply doesn't exist
if namespaceAd.Path == "" {
ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: "No namespace found for path. Either it doesn't exist, or the Director is experiencing problems",
})
if inStartupSequence() {
ginCtx.JSON(http.StatusTooManyRequests, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: "No origin serving requested prefix; director just restarted, try again later",
})
} else {
ginCtx.JSON(http.StatusNotFound, server_structs.SimpleApiResp{
Status: server_structs.RespFailed,
Msg: "No namespace found for path. Either it doesn't exist, or the Director is experiencing problems",
})
}
return
}

Expand Down Expand Up @@ -836,6 +858,8 @@ func checkHostnameRedirects(c *gin.Context, incomingHost string) {
// original request had been made to /api/v1.0/director/object/foo/bar
func ShortcutMiddleware(defaultResponse string) gin.HandlerFunc {
return func(c *gin.Context) {
web_ui.ServerHeaderMiddleware(c)

// If this is a request for getting public key, don't modify the path
// If this is a request to the Prometheus API, don't modify the path
if strings.HasPrefix(c.Request.URL.Path, "/.well-known/") ||
Expand Down Expand Up @@ -1304,7 +1328,7 @@ func collectClientVersionMetric(reqVer *version.Version, service string) {
}

func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) {
directorAPIV1 := router.Group("/api/v1.0/director")
directorAPIV1 := router.Group("/api/v1.0/director", web_ui.ServerHeaderMiddleware)
{
// Establish the routes used for cache/origin redirection
directorAPIV1.GET("/object/*any", redirectToCache)
Expand All @@ -1330,7 +1354,7 @@ func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) {
directorAPIV1.GET("/discoverServers", discoverOriginCache)
}

directorAPIV2 := router.Group("/api/v2.0/director")
directorAPIV2 := router.Group("/api/v2.0/director", web_ui.ServerHeaderMiddleware)
{
directorAPIV2.GET("/listNamespaces", listNamespacesV2)
}
Expand Down
2 changes: 1 addition & 1 deletion director/prom_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func parsePromRes(res promQLRes) (promParsed promQLParsed, err error) {
ResultType: data.ResultType,
}

if data.Result != nil && len(data.Result) > 0 {
if len(data.Result) > 0 {
switch data.Result[0].(type) {
case float64: // result: [unixtime, value]
if len(data.Result) == 2 && (data.ResultType == "scalar" || data.ResultType == "string") {
Expand Down
11 changes: 11 additions & 0 deletions docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,17 @@ default: 0
components: ["client"]
hidden: true
---
name: Client.AssumeDirectorServerHeader
description: |+
Assume the director service always sets the `Server` header to `pelican`.

If enabled and a director response is missing this header, then we assume the response is suspect
(may be a default response from a SSL termination server, like Traefik, while pelican is restarting).
type: bool
default: false
components: ["client"]
hidden: true
---
############################
# Origin-level Configs #
############################
Expand Down
7 changes: 4 additions & 3 deletions launchers/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/web_ui"
"github.com/pelicanplatform/pelican/xrootd"
)

Expand Down Expand Up @@ -72,15 +73,15 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
// Register the web endpoints
if param.Lotman_EnableAPI.GetBool() {
log.Debugln("Registering Lotman API")
lotman.RegisterLotman(ctx, engine.Group("/"))
lotman.RegisterLotman(ctx, engine.Group("/", web_ui.ServerHeaderMiddleware))
}
// Bind the c library funcs to Go
if success := lotman.InitLotman(); !success {
return nil, errors.New("Failed to initialize lotman")
}
}

broker.RegisterBrokerCallback(ctx, engine.Group("/"))
broker.RegisterBrokerCallback(ctx, engine.Group("/", web_ui.ServerHeaderMiddleware))
broker.LaunchNamespaceKeyMaintenance(ctx, egrp)
configPath, err := xrootd.ConfigXrootd(ctx, false)
if err != nil {
Expand All @@ -102,7 +103,7 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m

// Director and origin also registers this metadata URL; avoid registering twice.
if !modules.IsEnabled(server_structs.DirectorType) && !modules.IsEnabled(server_structs.OriginType) {
server_utils.RegisterOIDCAPI(engine.Group("/"), false)
server_utils.RegisterOIDCAPI(engine.Group("/", web_ui.ServerHeaderMiddleware), false)
}

log.Info("Launching cache")
Expand Down
3 changes: 2 additions & 1 deletion launchers/director_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pelicanplatform/pelican/director"
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/web_ui"
)

func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group) error {
Expand Down Expand Up @@ -77,7 +78,7 @@ func DirectorServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group
return errors.Wrap(err, "invalid URL for Director.SupportContactUrl")
}
}
rootGroup := engine.Group("/")
rootGroup := engine.Group("/", web_ui.ServerHeaderMiddleware)
director.RegisterDirectorOIDCAPI(rootGroup)
director.RegisterDirectorWebAPI(rootGroup)
engine.Use(director.ShortcutMiddleware(defaultResponse))
Expand Down
4 changes: 2 additions & 2 deletions launchers/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func LaunchModules(ctx context.Context, modules server_structs.ServerType) (serv
}

if modules.IsEnabled(server_structs.BrokerType) {
rootGroup := engine.Group("/")
rootGroup := engine.Group("/", web_ui.ServerHeaderMiddleware)
broker.RegisterBroker(ctx, rootGroup)
broker.LaunchNamespaceKeyMaintenance(ctx, egrp)
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func LaunchModules(ctx context.Context, modules server_structs.ServerType) (serv
if err != nil {
return
}
rootGroup := engine.Group("/")
rootGroup := engine.Group("/", web_ui.ServerHeaderMiddleware)
lc.Register(ctx, rootGroup)
}

Expand Down
3 changes: 2 additions & 1 deletion launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/web_ui"
"github.com/pelicanplatform/pelican/xrootd"
)

Expand Down Expand Up @@ -88,7 +89,7 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group,

// Director also registers this metadata URL; avoid registering twice.
if !modules.IsEnabled(server_structs.DirectorType) {
server_utils.RegisterOIDCAPI(engine.Group("/"), false)
server_utils.RegisterOIDCAPI(engine.Group("/", web_ui.ServerHeaderMiddleware), false)
}

if param.Origin_EnableIssuer.GetBool() {
Expand Down
3 changes: 2 additions & 1 deletion launchers/registry_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pelicanplatform/pelican/metrics"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/registry"
"github.com/pelicanplatform/pelican/web_ui"
)

func RegistryServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group) error {
Expand Down Expand Up @@ -64,7 +65,7 @@ func RegistryServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group
go registry.PeriodicTopologyReload(ctx)
}

rootRouterGroup := engine.Group("/")
rootRouterGroup := engine.Group("/", web_ui.ServerHeaderMiddleware)
// Register routes for server/Pelican client facing APIs
registry.RegisterRegistryAPI(rootRouterGroup)
// Register routes for APIs to registry Web UI
Expand Down
2 changes: 1 addition & 1 deletion local_cache/local_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func (cr *cacheReader) peekError(ctx context.Context) (err error) {
}

func (cr *cacheReader) Read(p []byte) (n int, err error) {
if cr.buf != nil && len(cr.buf) > 0 {
if len(cr.buf) > 0 {
bytesCopied := copy(p, cr.buf)
if len(cr.buf) > bytesCopied {
cr.buf = cr.buf[bytesCopied:]
Expand Down
2 changes: 1 addition & 1 deletion origin/origin_ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func handleExports(ctx *gin.Context) {
}

func RegisterOriginWebAPI(engine *gin.Engine) error {
originWebAPI := engine.Group("/api/v1.0/origin_ui")
originWebAPI := engine.Group("/api/v1.0/origin_ui", web_ui.ServerHeaderMiddleware)
{
originWebAPI.GET("/exports", web_ui.AuthHandler, web_ui.AdminAuthHandler, handleExports)
}
Expand Down
1 change: 1 addition & 0 deletions param/parameters.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions param/parameters_struct.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion registry/custom_reg_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func InitCustomRegistrationFields() error {
return errors.New(fmt.Sprintf("Bad custom registration field, unsupported field type: %q with %q", conf.Name, conf.Type))
}
if conf.Type == "enum" {
if (conf.Options == nil || len(conf.Options) == 0) && conf.OptionsUrl == "" {
if len(conf.Options) == 0 && conf.OptionsUrl == "" {
return errors.New(fmt.Sprintf("Bad custom registration field, 'enum' type field does not have options or optionsUrl set: %q", conf.Name))
}
}
Expand Down
2 changes: 1 addition & 1 deletion web_ui/oauth2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func ConfigOAuthClientAPIs(engine *gin.Engine) error {
return err
}

oauthGroup := engine.Group("/api/v1.0/auth/oauth", seHandler)
oauthGroup := engine.Group("/api/v1.0/auth/oauth", seHandler, ServerHeaderMiddleware)
{
oauthGroup.GET("/login", handleOAuthLogin)
oauthGroup.GET("/callback", handleOAuthCallback)
Expand Down
Loading
Loading