Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downtime set by origin and cache #1996

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 39 additions & 28 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,16 @@ func (f filterType) String() string {
}
}

// recordAd does the following for an incoming ServerAd and []NamespaceAdV2 pair:
//
// 1. Update the ServerAd by setting server location and updating server topology attribute
// 2. Record the ServerAd and NamespaceAdV2 to the TTL cache
// 3. Set up the server `stat` call utilities
// 4. Set up utilities for collecting origin/health server file transfer test status
// 5. Return the updated ServerAd. The ServerAd passed in will not be modified
func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]server_structs.NamespaceAdV2) (updatedAd server_structs.ServerAd) {
if err := updateLatLong(&sAd); err != nil {
if geoIPError, ok := err.(geoIPError); ok {
labels := geoIPError.labels
metrics.PelicanDirectorGeoIPErrors.With(labels).Inc()
}
log.Debugln("Failed to lookup GeoIP coordinates for host", sAd.URL.Host)
}

if sAd.URL.String() == "" {
log.Errorf("The URL of the serverAd %#v is empty. Cannot set the TTL cache.", sAd)
return
// getServerAd returns the existing server ad of the given server url from the cache
func getServerAd(serverUrl string) (*server_structs.Advertisement, error) {
if serverUrl == "" {
return nil, errors.Errorf("The URL of the serverAd is empty. Cannot set the TTL cache.")
}
// Since servers from topology always use http, while servers from Pelican always use https
// we want to ignore the scheme difference when checking duplicates (only consider hostname:port)
rawURL := sAd.URL.String() // could be http (topology) or https (Pelican or some topology ones)
httpURL := sAd.URL.String()
httpsURL := sAd.URL.String()
rawURL := serverUrl // could be http (topology) or https (Pelican or some topology ones)
httpURL := serverUrl
httpsURL := serverUrl
if strings.HasPrefix(rawURL, "https") {
httpURL = "http" + strings.TrimPrefix(rawURL, "https")
}
Expand All @@ -113,21 +98,47 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]
if existing == nil {
existing = serverAds.Get(rawURL)
}
if existing == nil {
return nil, errors.Errorf("The server ad for %s is not found in the cache", serverUrl)
}
return existing.Value(), nil
}

// recordAd does the following for an incoming ServerAd and []NamespaceAdV2 pair:
//
// 1. Update the ServerAd by setting server location and updating server topology attribute
// 2. Record the ServerAd and NamespaceAdV2 to the TTL cache
// 3. Set up the server `stat` call utilities
// 4. Set up utilities for collecting origin/health server file transfer test status
// 5. Return the updated ServerAd. The ServerAd passed in will not be modified
func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]server_structs.NamespaceAdV2) (updatedAd server_structs.ServerAd) {
if err := updateLatLong(&sAd); err != nil {
if geoIPError, ok := err.(geoIPError); ok {
labels := geoIPError.labels
metrics.PelicanDirectorGeoIPErrors.With(labels).Inc()
}
log.Debugln("Failed to lookup GeoIP coordinates for host", sAd.URL.Host)
}

existing, err := getServerAd(sAd.URL.String())
if err != nil {
log.Debugf("No existing server ad for %s: %v", sAd.URL.String(), err)
}

// There's an existing ad in the cache
if existing != nil {
if sAd.FromTopology && !existing.Value().FromTopology {
if sAd.FromTopology && !existing.FromTopology {
// if the incoming is from topology but the existing is from Pelican
log.Debugf("The ServerAd generated from topology with name %s and URL %s was ignored because there's already a Pelican ad for this server", sAd.Name, sAd.URL.String())
return
}
if !sAd.FromTopology && existing.Value().FromTopology {
if !sAd.FromTopology && existing.FromTopology {
// Pelican server will overwrite topology one. We leave a message to let admin know
log.Debugf("The existing ServerAd generated from topology with name %s and URL %s is replaced by the Pelican server with name %s", existing.Value().Name, existing.Value().URL.String(), sAd.Name)
serverAds.Delete(existing.Value().URL.String())
log.Debugf("The existing ServerAd generated from topology with name %s and URL %s is replaced by the Pelican server with name %s", existing.Name, existing.URL.String(), sAd.Name)
serverAds.Delete(existing.URL.String())
}
if !sAd.FromTopology && !existing.Value().FromTopology { // Only copy the IO Load value for Pelican server
sAd.IOLoad = existing.Value().GetIOLoad() // we copy the value from the existing serverAD to be consistent
if !sAd.FromTopology && !existing.FromTopology { // Only copy the IO Load value for Pelican server
sAd.IOLoad = existing.GetIOLoad() // we copy the value from the existing serverAD to be consistent
}
}

Expand Down
100 changes: 100 additions & 0 deletions director/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,6 +1516,103 @@ func collectClientVersionMetric(reqVer *version.Version, service string) {
metrics.PelicanDirectorClientVersionTotal.With(prometheus.Labels{"version": shortenedVersion, "service": service}).Inc()
}

// setDowntimeByServer is the endpoint origin and cache servers call to set
// their own downtime in the director.
//
// Steps, in order (each failure writes a SimpleApiResp with RespFailed and returns):
//  1. Bind the JSON body to a DowntimeRequest; 400 if binding fails.
//  2. Look up the server ad for the requested URL with getServerAd; 404 if the
//     lookup returns an error or the ad has an empty name.
//  3. Read the Authorization header; 403 if it is absent or empty.
//  4. Derive the server namespace from the ad type ("Origin" or "Cache");
//     404 if no namespace results.
//  5. Check the token against that namespace with verifyAdvertiseToken; 403 if
//     it returns an error or reports the token as not acceptable.
//  6. Append the server name as the "name" route param, hand off to
//     handleFilterServer (enable) or handleAllowServer (disable), then write a
//     200 success message.
func setDowntimeByServer(ctx *gin.Context) {
	// reject writes a failure response with the given HTTP status and message.
	reject := func(code int, msg string) {
		ctx.JSON(code, server_structs.SimpleApiResp{
			Status: server_structs.RespFailed,
			Msg:    msg,
		})
	}

	var req server_structs.DowntimeRequest
	if bindErr := ctx.ShouldBindBodyWith(&req, binding.JSON); bindErr != nil {
		reject(http.StatusBadRequest, "Invalid downtime setting request")
		return
	}

	// Server's XRootD URL starting with http:// or https://, and ending with a port number
	target := req.ServerUrl
	ad, lookupErr := getServerAd(target)
	if lookupErr != nil {
		reject(http.StatusNotFound, "Server not found in cached server ads: "+target)
		return
	}

	name := ad.Name
	if name == "" {
		reject(http.StatusNotFound, "Server name not found")
		return
	}

	// Indexing a missing header key yields a nil slice, so a length check
	// covers both "absent" and "present but empty".
	authValues := ctx.Request.Header["Authorization"]
	if len(authValues) == 0 {
		reject(http.StatusForbidden, "Bearer token not present in the 'Authorization' header")
		return
	}
	bearer := strings.TrimPrefix(authValues[0], "Bearer ")

	// Only origins and caches map to a namespace; any other type leaves it empty.
	var namespace string
	switch ad.ServerAd.Type {
	case "Origin":
		namespace = server_structs.GetOriginNs(name)
	case "Cache":
		namespace = server_structs.GetCacheNS(name)
	}
	if namespace == "" {
		reject(http.StatusNotFound, "Server namespace not found")
		return
	}

	authorized, verifyErr := verifyAdvertiseToken(ctx, bearer, namespace)
	if verifyErr != nil {
		// NOTE(review): this is a direct equality check; a wrapped
		// adminApprovalErr would fall through to the generic message — confirm
		// verifyAdvertiseToken never wraps it.
		switch verifyErr {
		case adminApprovalErr:
			reject(http.StatusForbidden, fmt.Sprintf("%s was not approved by an administrator", req.ServerUrl))
		default:
			reject(http.StatusForbidden, fmt.Sprintf("Authorization token verification failed %v", verifyErr))
		}
		return
	}
	if !authorized {
		reject(http.StatusForbidden, "Authorization token verification failed. Token missing required scope")
		return
	}

	// The delegated handlers are given the server through the "name" param.
	ctx.Params = append(ctx.Params, gin.Param{Key: "name", Value: name})

	// Set the downtime
	if req.EnableDowntime {
		handleFilterServer(ctx)
	} else {
		handleAllowServer(ctx)
	}

	// NOTE(review): handleFilterServer/handleAllowServer presumably write their
	// own response (including on failure); if so, the call below emits a second
	// body and may report success after a failure — confirm.
	ctx.JSON(http.StatusOK, server_structs.SimpleApiResp{
		Status: server_structs.RespOK,
		Msg:    fmt.Sprintf("Successfully set the downtime of %s %s to %v", ad.Type, name, req.EnableDowntime),
	})
}

func collectDirectorRedirectionMetric(ctx *gin.Context, destination string) {
labels := prometheus.Labels{
"destination": destination,
Expand Down Expand Up @@ -1571,6 +1668,9 @@ func RegisterDirectorAPI(ctx context.Context, router *gin.RouterGroup) {
// so that director can be our point of contact for collecting system-level metrics.
// Rename the endpoint to reflect such plan.
directorAPIV1.GET("/discoverServers", discoverOriginCache)

// Cache/Origin sets their own downtime
directorAPIV1.POST("/downtime", setDowntimeByServer)
}

directorAPIV2 := router.Group("/api/v2.0/director", web_ui.ServerHeaderMiddleware)
Expand Down
Loading
Loading