Skip to content

Commit

Permalink
Make the registry aware of topology
Browse files Browse the repository at this point in the history
I still need to sort out how this will play with the key chaining PR.
  • Loading branch information
jhiemstrawisc committed Dec 6, 2023
1 parent cafecbb commit c1bfee6
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 79 deletions.
17 changes: 14 additions & 3 deletions cmd/namespace_registry_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
"syscall"

"github.com/pkg/errors"

nsregistry "github.com/pelicanplatform/pelican/namespace_registry"
"github.com/pelicanplatform/pelican/web_ui"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/namespace_registry"
"github.com/pelicanplatform/pelican/web_ui"
)

func serveNamespaceRegistry( /*cmd*/ *cobra.Command /*args*/, []string) error {
Expand All @@ -40,6 +41,16 @@ func serveNamespaceRegistry( /*cmd*/ *cobra.Command /*args*/, []string) error {
return errors.Wrap(err, "Unable to initialize the namespace registry database")
}

if config.GetPreferredPrefix() == "OSDF" {
log.Info("Populating registry with namespaces from OSG topology service...")
if err := nsregistry.PopulateTopology(); err != nil {
panic(errors.Wrap(err, "Unable to populate topology table"))
}

// Checks topology for updates every 10 minutes
go nsregistry.PeriodicTopologyReload()
}

engine, err := web_ui.GetEngine()
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions config/resources/osdf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ Xrootd:
Federation:
DiscoveryUrl: osg-htc.org
TopologyNamespaceURL: https://topology.opensciencegrid.org/stashcache/namespaces.json
TopologyReloadInterval: 10
76 changes: 7 additions & 69 deletions director/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,19 @@
package director

import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/pelicanplatform/pelican/param"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

type (
Server struct {
AuthEndpoint string `json:"auth_endpoint"`
Endpoint string `json:"endpoint"`
Resource string `json:"resource"`
}

CredentialGeneration struct {
BasePath string `json:"base_path"`
Issuer string `json:"issuer"`
MaxScopeDepth int `json:"max_scope_depth"`
Strategy string `json:"strategy"`
VaultIssuer string `json:"vault_issuer"`
VaultServer string `json:"vault_server"`
}

Namespace struct {
Caches []Server `json:"caches"`
Origins []Server `json:"origins"`
CredentialGeneration CredentialGeneration `json:"credential_generation"`
DirlistHost string `json:"dirlisthost"`
Path string `json:"path"`
ReadHTTPS bool `json:"readhttps"`
UseTokenOnRead bool `json:"usetokenonread"`
WritebackHost string `json:"writebackhost"`
}

NamespaceJSON struct {
Caches []Server `json:"caches"`
Namespaces []Namespace `json:"namespaces"`
}
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/utils"
)

func parseServerAd(server Server, serverType ServerType) ServerAd {

func parseServerAd(server utils.Server, serverType ServerType) ServerAd {
serverAd := ServerAd{}
serverAd.Type = serverType
serverAd.Name = server.Resource
Expand Down Expand Up @@ -101,37 +67,9 @@ func parseServerAd(server Server, serverType ServerType) ServerAd {

// Populate internal cache with origin/cache ads
func AdvertiseOSDF() error {
topoNamespaceUrl := param.Federation_TopologyNamespaceUrl.GetString()
if topoNamespaceUrl == "" {
return errors.New("Topology namespaces.json configuration option (`Federation.TopologyNamespaceURL`) not set")
}

req, err := http.NewRequest("GET", topoNamespaceUrl, nil)
if err != nil {
return errors.Wrap(err, "Failure when getting OSDF namespace data from topology")
}

req.Header.Set("Accept", "application/json")

client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return errors.Wrap(err, "Failure when getting response for OSDF namespace data")
}
defer resp.Body.Close()

if resp.StatusCode > 299 {
return fmt.Errorf("Error response %v from OSDF namespace endpoint: %v", resp.StatusCode, resp.Status)
}

respBytes, err := io.ReadAll(resp.Body)
namespaces, err := utils.GetTopologyJSON()
if err != nil {
return errors.Wrap(err, "Failure when reading OSDF namespace response")
}

var namespaces NamespaceJSON
if err = json.Unmarshal(respBytes, &namespaces); err != nil {
return errors.Wrapf(err, "Failure when parsing JSON response from topology URL %v", topoNamespaceUrl)
return errors.Wrapf(err, "Failed to get topology JSON")
}

cacheAdMap := make(map[ServerAd][]NamespaceAd)
Expand Down Expand Up @@ -183,7 +121,7 @@ func PeriodicCacheReload() {
// The ad cache times out every 15 minutes, so update it every
// 10. If a key isn't updated, it will survive for 5 minutes
// and then disappear
time.Sleep(time.Minute * 10)
time.Sleep(time.Minute * param.Federation_TopologyReloadInterval.GetDuration())
err := AdvertiseOSDF()
if err != nil {
log.Warningf("Failed to re-advertise: %s. Will try again later",
Expand Down
10 changes: 9 additions & 1 deletion docs/parameters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,15 @@ description: >-
type: url
osdf_default: https://topology.opensciencegrid.org/stashcache/namespaces.json
default: none
components: ["director"]
components: ["director", "nsregistry"]
---
name: Federation.TopologyReloadInterval
description: >-
The frequency, in minutes, that topology should be reloaded.
type: duration
osdf_default: 10
default: none
components: ["director", "nsregistry"]
---
name: Federation.DirectorUrl
description: >-
Expand Down
161 changes: 156 additions & 5 deletions namespace_registry/registry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ import (
"database/sql"
"os"
"path/filepath"
"time"

"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/pelicanplatform/pelican/param"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"

// commented sqlite driver requires CGO
// _ "github.com/mattn/go-sqlite3" // SQLite driver
_ "modernc.org/sqlite"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/param"
"github.com/pelicanplatform/pelican/utils"
)

type Namespace struct {
Expand Down Expand Up @@ -63,13 +66,39 @@ func createNamespaceTable() {

_, err := db.Exec(query)
if err != nil {
log.Fatalf("Failed to create table: %v", err)
log.Fatalf("Failed to create namespace table: %v", err)
}
}

func createTopologyTable() {
query := `
CREATE TABLE IF NOT EXISTS topology (
id INTEGER PRIMARY KEY AUTOINCREMENT,
prefix TEXT NOT NULL UNIQUE
);`

_, err := db.Exec(query)
if err != nil {
log.Fatalf("Failed to create topology table: %v", err)
}
}

func namespaceExists(prefix string) (bool, error) {
checkQuery := `SELECT prefix FROM namespace WHERE prefix = ?`
result, err := db.Query(checkQuery, prefix)
var checkQuery string
var args []interface{}
if config.GetPreferredPrefix() == "OSDF" {
checkQuery = `
SELECT prefix FROM namespace WHERE prefix = ?
UNION
SELECT prefix FROM topology WHERE prefix = ?
`
args = []interface{}{prefix, prefix}
} else {
checkQuery = `SELECT prefix FROM namespace WHERE prefix = ?`
args = []interface{}{prefix}
}

result, err := db.Query(checkQuery, args...)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -211,6 +240,128 @@ func InitializeDB() error {
return db.Ping()
}

func modifyTopologyTable(prefixes []string, mode string) error {
if len(prefixes) == 0 {
return nil // nothing to do!
}

var query string
switch mode {
case "add":
query = `INSERT INTO topology (prefix) VALUES (?)`
case "del":
query = `DELETE FROM topology WHERE prefix = ?`
default:
return errors.New("invalid mode, use 'add' or 'del'")
}

tx, err := db.Begin()
if err != nil {
return err
}

stmt, err := tx.Prepare(query)
if err != nil {
return err
}
defer stmt.Close()

for _, prefix := range prefixes {
_, err := stmt.Exec(prefix)
if err != nil {
if errRoll := tx.Rollback(); errRoll != nil {
log.Errorln("Failed to rollback transaction:", errRoll)
}
return err
}
}

// One nice batch commit
err = tx.Commit()
if err != nil {
return err
}

return nil
}

// Create a table in the registry to store namespace prefixes from topology
func PopulateTopology() error {
// Create the toplogy table
createTopologyTable()

// The topology table may already exist from before, it may not. Because of this
// we need to add to the table any prefixes that are in topology, delete from the
// table any that aren't in topology, and skip any that exist in both.

// First get all that are in the table. At time of writing, this is ~57 entries,
// and that number should be monotonically decreasing. We're safe to load into mem.
retrieveQuery := "SELECT prefix FROM topology"
rows, err := db.Query(retrieveQuery)
if err != nil {
return errors.Wrap(err, "Could not construct topology database query")
}
defer rows.Close()

nsFromTopoTable := make(map[string]bool)
for rows.Next() {
var existingPrefix string
if err := rows.Scan(&existingPrefix); err != nil {
return errors.Wrap(err, "Error while scanning rows from topology table")
}
nsFromTopoTable[existingPrefix] = true
}
rows.Close()

// Next, get the values from topology
namespaces, err := utils.GetTopologyJSON()
if err != nil {
return errors.Wrapf(err, "Failed to get topology JSON")
}

// Be careful here, the ns object we iterate over is from topology,
// and it's not the same ns object we use elsewhere in this file.
nsFromTopoJSON := make(map[string]bool)
for _, ns := range namespaces.Namespaces {
nsFromTopoJSON[ns.Path] = true
}

toAdd := []string{}
toDelete := []string{}
// If in topo and not in the table, add
for prefix := range nsFromTopoJSON {
if found := nsFromTopoTable[prefix]; !found {
toAdd = append(toAdd, prefix)
}
}
// If in table and not in topo, delete
for prefix := range nsFromTopoTable {
if found := nsFromTopoJSON[prefix]; !found {
toDelete = append(toDelete, prefix)
}
}

if err := modifyTopologyTable(toAdd, "add"); err != nil {
return errors.Wrap(err, "Failed to update topology table with new values")
}
if err := modifyTopologyTable(toDelete, "del"); err != nil {
return errors.Wrap(err, "Failed to clean old values from topology table")
}

return nil
}

func PeriodicTopologyReload() {
for {
time.Sleep(time.Minute * param.Federation_TopologyReloadInterval.GetDuration())
err := PopulateTopology()
if err != nil {
log.Warningf("Failed to re-populate topology table: %s. Will try again later",
err)
}
}
}

func ShutdownDB() {
err := db.Close()
if err != nil {
Expand Down
Loading

0 comments on commit c1bfee6

Please sign in to comment.