Skip to content

Commit

Permalink
Fix topology downtime integration by using downtime URL and not ns JSON
Browse files Browse the repository at this point in the history
This changes the overall approach for grabbing downtime information from Topology.
Previously, we generated the list of downed servers by getting a diff of the servers
returned in the namespaces JSON when querying that endpoint with `?include_downed=<0/1>`.
The issue with this approach is that Topology services marked as Pelican services are
explicitly excluded from the returned JSON.

However, Pelican services that are recorded in Topology are still included in the downtime
URL, so this new approach grabs the list of resource names from that instead. That was
probably the right approach all along, but topology endpoints remain inscrutable ¯\_(ツ)_/¯

This also corrects a documentation issue that claimed filtered servers could be added
in the Director using their hostname. In fact, they need to be filtered using their
site name!
  • Loading branch information
jhiemstrawisc committed Aug 29, 2024
1 parent e0bed5a commit f543f9d
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 223 deletions.
5 changes: 3 additions & 2 deletions config/resources/osdf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ Xrootd:
DetailedMonitoringHost: xrd-mon.osgstorage.org
Federation:
DiscoveryUrl: osg-htc.org
TopologyURL: https://topology.opensciencegrid.org
TopologyNamespaceURL: https://topology.opensciencegrid.org/osdf/namespaces?production=1
TopologyUrl: https://topology.opensciencegrid.org
TopologyNamespaceUrl: https://topology.opensciencegrid.org/osdf/namespaces?production=1
TopologyDowntimeUrl: https://topology.opensciencegrid.org/rgdowntime/xml
TopologyReloadInterval: 4.5m
Registry:
RequireCacheApproval: true
Expand Down
85 changes: 48 additions & 37 deletions director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@ package director

import (
"context"
"encoding/xml"
"net/http"
"net/url"
"strings"
"time"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/server_structs"
"github.com/pelicanplatform/pelican/server_utils"
"github.com/pelicanplatform/pelican/utils"
)

// Consolite two ServerAds that share the same ServerAd.URL. For all but the capability fields,
Expand All @@ -54,7 +58,7 @@ func consolidateDupServerAd(newAd, existingAd server_structs.ServerAd) server_st

// Takes in server information from topology and handles converting the necessary bits into a new Pelican
// ServerAd.
func parseServerAdFromTopology(server server_utils.Server, serverType server_structs.ServerType, caps server_structs.Capabilities) server_structs.ServerAd {
func parseServerAdFromTopology(server server_structs.TopoServer, serverType server_structs.ServerType, caps server_structs.Capabilities) server_structs.ServerAd {
serverAd := server_structs.ServerAd{}
serverAd.Type = serverType.String()
serverAd.Name = server.Resource
Expand Down Expand Up @@ -118,30 +122,26 @@ func parseServerAdFromTopology(server server_utils.Server, serverType server_str
return serverAd
}

// Do a subtraction of excludeDowned set from the includeDowned set to find cache servers
// that are in downtime
//
// The excludeDowned is a list of running OSDF topology servers
// The includeDowned is a list of running and downed OSDF topology servers
func findDownedTopologyCache(excludeDowned, includeDowned []server_utils.Server) (caches []server_utils.Server) {
for _, included := range includeDowned {
found := false
for _, excluded := range excludeDowned {
if included == excluded {
found = true
break
}
}
if !found {
caches = append(caches, included)
}
// Use the topology downtime endpoint to create the list of downed servers. Servers are tracked using their
// resource name, NOT their FQDN.
func updateDowntimeFromTopology(ctx context.Context) error {
dtUrlStr := param.Federation_TopologyDowntimeUrl.GetString()
_, err := url.Parse(dtUrlStr)
if err != nil {
return errors.Wrapf(err, "encountered an invalid URL %s when parsing configured topology downtime URL", dtUrlStr)
}
tr := config.GetTransport()
resp, err := utils.MakeRequest(ctx, tr, dtUrlStr, http.MethodGet, nil, nil)
if err != nil {
return errors.Wrapf(err, "failed to fetch topology downtime from %s", dtUrlStr)
}
return
}

// Update filteredServers based on topology downtime
func updateDowntimeFromTopology(excludedNss, includedNss *server_utils.TopologyNamespacesJSON) {
downedCaches := findDownedTopologyCache(excludedNss.Caches, includedNss.Caches)
// Parse the big blurb of XML into a struct.
var downtimeInfo server_structs.TopoDowntimeInfo
err = xml.Unmarshal(resp, &downtimeInfo)
if err != nil {
return errors.Wrap(err, "failed to unmarshal topology downtime XML")
}

filteredServersMutex.Lock()
defer filteredServersMutex.Unlock()
Expand All @@ -151,33 +151,44 @@ func updateDowntimeFromTopology(excludedNss, includedNss *server_utils.TopologyN
delete(filteredServers, key)
}
}
for _, dc := range downedCaches {
if sAd := serverAds.Get(dc.Endpoint); sAd == nil {
// The downed cache is not in the director yet
filteredServers[dc.Resource] = topoFiltered
} else {
// If we have the cache in the director, use it's name as the key
filteredServers[sAd.Value().Name] = topoFiltered

const timeLayout = "Jan 2, 2006 15:04 PM MST" // see https://pkg.go.dev/time#pkg-constants
for _, downtime := range downtimeInfo.CurrentDowntimes.Downtimes {
parsedStartDT, err := time.Parse(timeLayout, downtime.StartTime)
if err != nil {
log.Warningf("Could not put %s into downtime because its start time '%s' could not be parsed: %s", downtime.ResourceName, downtime.StartTime, err)
continue
}

parsedEndDT, err := time.Parse(timeLayout, downtime.EndTime)
if err != nil {
log.Warningf("Could not put %s into downtime because its end time '%s' could not be parsed: %s", downtime.ResourceName, downtime.EndTime, err)
continue
}

currentTime := time.Now()
if parsedStartDT.Before(currentTime) && parsedEndDT.After(currentTime) {
filteredServers[downtime.ResourceName] = topoFiltered
}
}
log.Infof("The following servers are put in downtime: %#v", filteredServers)

log.Infof("The following servers are currently configured in downtime: %#v", filteredServers)
return nil
}

// Populate internal cache with origin/cache ads
func AdvertiseOSDF(ctx context.Context) error {
namespaces, err := server_utils.GetTopologyJSON(ctx, false)
namespaces, err := server_utils.GetTopologyJSON(ctx)
if err != nil {
return errors.Wrapf(err, "Failed to get topology JSON")
}

// Second call to fetch all servers (including servers in downtime)
includedNss, err := server_utils.GetTopologyJSON(ctx, true)
err = updateDowntimeFromTopology(ctx)
if err != nil {
return errors.Wrapf(err, "Failed to get topology JSON with server in downtime included (include_downed)")
// Don't treat this as a fatal error, but log it in a loud way.
log.Errorf("Unable to generate downtime list for servers from topology: %v", err)
}

updateDowntimeFromTopology(namespaces, includedNss)

cacheAdMap := make(map[string]*server_structs.Advertisement) // key is serverAd.URL.String()
originAdMap := make(map[string]*server_structs.Advertisement) // key is serverAd.URL.String()
tGen := server_structs.TokenGen{}
Expand Down
Loading

0 comments on commit f543f9d

Please sign in to comment.