Skip to content

Commit

Permalink
Give the Director the ability to detect version mismatches
Browse files Browse the repository at this point in the history
This is a port of PR PelicanPlatform#178. Creating a new PR was simpler route to fix the rebase
  • Loading branch information
jhiemstrawisc committed Oct 5, 2023
1 parent d0729f2 commit ec4fdbd
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 10 deletions.
37 changes: 33 additions & 4 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package client

import (
"encoding/json"
"io"
"net/http"
"net/url"
"sort"
Expand All @@ -30,6 +32,10 @@ import (
log "github.com/sirupsen/logrus"
)

type directorResponse struct {
Error string `json:"error"`
}

// Simple parser to that takes a "values" string from a header and turns it
// into a map of key/value pairs
func HeaderParser(values string) (retMap map[string]string) {
Expand Down Expand Up @@ -129,16 +135,39 @@ func QueryDirector(source string, directorUrl string) (resp *http.Response, err
},
}

log.Debugln("Querying OSDF Director at", resourceUrl)
resp, err = client.Get(resourceUrl)
log.Debugln("Director's response:", resp)
req, err := http.NewRequest("GET", resourceUrl, nil)
if err != nil {
log.Errorln("Failed to create an HTTP request:", err)
return nil, err
}

// Include the Client's version as a User-Agent header. The Director will decide
// if it supports the version, and provide an error message in the case that it
// cannot.
userAgent := "pelican-client/" + ObjectClientOptions.Version
req.Header.Set("User-Agent", userAgent)

// Perform the HTTP request
resp, err = client.Do(req)

if err != nil {
log.Errorln("Failed to get response from OSDF Director:", err)
log.Errorln("Failed to get response from the director:", err)
return
}

defer resp.Body.Close()
log.Debugln("Director's response:", resp)

// Check HTTP response -- should be 307 (redirect), else something went wrong
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 307 {
var respErr directorResponse
if unmarshalErr := json.Unmarshal(body, &respErr); unmarshalErr != nil { // Error creating json
return nil, errors.Wrap(unmarshalErr, "Could not unmarshall the director's response")
}
return nil, errors.Errorf("The director reported an error: %s\n", respErr.Error)
}

return
}

Expand Down
4 changes: 2 additions & 2 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestQueryDirector(t *testing.T) {
expectedLocation := "http://redirect.com"
handler := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Location", expectedLocation)
w.WriteHeader(http.StatusFound)
w.WriteHeader(http.StatusTemporaryRedirect)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
Expand All @@ -206,7 +206,7 @@ func TestQueryDirector(t *testing.T) {
}

// Check the HTTP status code
if actualResp.StatusCode != http.StatusFound {
if actualResp.StatusCode != http.StatusTemporaryRedirect {
t.Errorf("Expected HTTP status code %d, but got %d", http.StatusFound, actualResp.StatusCode)
}
}
77 changes: 77 additions & 0 deletions director/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@ import (
"net/netip"
"net/url"
"path"
"regexp"
"strings"

"github.com/gin-gonic/gin"
"github.com/hashicorp/go-version"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

var (
minClientVersion, _ = version.NewVersion("7.0.0")
minOriginVersion, _ = version.NewVersion("7.0.0")
)

func getRedirectURL(reqPath string, ad ServerAd, requiresAuth bool) (redirectURL url.URL) {
var serverURL url.URL
if requiresAuth {
Expand Down Expand Up @@ -85,7 +93,61 @@ func getFinalRedirectURL(rurl url.URL, authzEscaped string) string {
return rurl.String()
}

func versionCompatCheck(ginCtx *gin.Context) error {
// Check that the version of whichever service (eg client, origin, etc) is talking to the Director
// is actually something the Director thinks it can communicate with

// The service/version is sent via User-Agent header in the form "pelican-<service>/<version>"
userAgentSlc := ginCtx.Request.Header["User-Agent"]
if len(userAgentSlc) < 1 {
return errors.New("No user agent could be found")
}

// gin gives us a slice of user agents. Since pelican services should only ever
// send one UA, assume that it is the 0-th element of the slice.
userAgent := userAgentSlc[0]

// Make sure we're working with something that's formatted the way we expect. If we
// don't match, then we're definitely not coming from one of the services, so we
// let things go without an error. Maybe someone is using curl?
uaRegExp := regexp.MustCompile(`^pelican-[^\/]+\/\d+\.\d+\.\d+`)
if matches := uaRegExp.MatchString(userAgent); !matches {
return nil
}

userAgentSplit := strings.Split(userAgent, "/")
// Grab the actual service/version that's using the Director. There may be different versioning
// requirements between origins, clients, and other services.
service := (strings.Split(userAgentSplit[0], "-"))[1]
reqVerStr := userAgentSplit[1]
reqVer, err := version.NewVersion(reqVerStr)
if err != nil {
return errors.Wrapf(err, "Could not parse service version as a semantic version: %s\n", reqVerStr)
}

var minCompatVer *version.Version
switch service {
case "client":
minCompatVer = minClientVersion
case "origin":
minCompatVer = minOriginVersion
}

if reqVer.LessThan(minCompatVer) {
return errors.Errorf("The director does not support your %s version (%s). Please update to %s or newer.", service, reqVer.String(), minCompatVer.String())
}

return nil
}

func RedirectToCache(ginCtx *gin.Context) {
err := versionCompatCheck(ginCtx)
if err != nil {
log.Debugf("A version incompatibility was encountered while redirecting to a cache and no response was served: %v", err)
ginCtx.JSON(500, gin.H{"error": "Incompatible versions detected: " + fmt.Sprintf("%v", err)})
return
}

reqPath := path.Clean("/" + ginCtx.Request.URL.Path)
reqPath = strings.TrimPrefix(reqPath, "/api/v1.0/director/object")
ipAddr, err := getRealIP(ginCtx)
Expand Down Expand Up @@ -157,6 +219,13 @@ func RedirectToCache(ginCtx *gin.Context) {
}

func RedirectToOrigin(ginCtx *gin.Context) {
err := versionCompatCheck(ginCtx)
if err != nil {
log.Debugf("A version incompatibility was encountered while redirecting to an origin and no response was served: %v", err)
ginCtx.JSON(500, gin.H{"error": "Incompatible versions detected: " + fmt.Sprintf("%v", err)})
return
}

reqPath := path.Clean("/" + ginCtx.Request.URL.Path)
reqPath = strings.TrimPrefix(reqPath, "/api/v1.0/director/origin")

Expand Down Expand Up @@ -222,6 +291,14 @@ func RegisterOrigin(ctx *gin.Context) {
ctx.JSON(401, gin.H{"error": "Bearer token not present in the 'Authorization' header"})
return
}

err := versionCompatCheck(ctx)
if err != nil {
log.Debugf("A version incompatibility was encountered while registering an origin and no response was served: %v", err)
ctx.JSON(500, gin.H{"error": "Incompatible versions detected: " + fmt.Sprintf("%v", err)})
return
}

ad := OriginAdvertise{}
if ctx.ShouldBind(&ad) != nil {
ctx.JSON(400, gin.H{"error": "Invalid origin registration"})
Expand Down
41 changes: 39 additions & 2 deletions director/redirect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ func TestDirectorRegistration(t *testing.T) {
r.POST("/", RegisterOrigin)
c.Request, _ = http.NewRequest(http.MethodPost, "/", bytes.NewBuffer([]byte(`{"Namespaces": [{"Path": "/foo/bar", "URL": "https://get-your-tokens.org"}]}`)))

c.Request.Header.Set("Authorization", string(signed))
c.Request.Header.Set("Authorization", "Bearer "+string(signed))
c.Request.Header.Set("Content-Type", "application/json")
// Hard code the current min version. When this test starts failing because of new stuff in the Director,
// we'll know that means it's time to update the min version in redirect.go
c.Request.Header.Set("User-Agent", "pelican-origin/7.0.0")

r.ServeHTTP(w, c.Request)

Expand Down Expand Up @@ -183,11 +186,45 @@ func TestDirectorRegistration(t *testing.T) {
rInv.POST("/", RegisterOrigin)
cInv.Request, _ = http.NewRequest(http.MethodPost, "/", bytes.NewBuffer([]byte(`{"Namespaces": [{"Path": "/foo/bar", "URL": "https://get-your-tokens.org"}]}`)))

cInv.Request.Header.Set("Authorization", string(signedInv))
cInv.Request.Header.Set("Authorization", "Bearer "+string(signedInv))
cInv.Request.Header.Set("Content-Type", "application/json")
// Hard code the current min version. When this test starts failing because of new stuff in the Director,
// we'll know that means it's time to update the min version in redirect.go
cInv.Request.Header.Set("User-Agent", "pelican-origin/7.0.0")

rInv.ServeHTTP(wInv, cInv.Request)
assert.Equal(t, 400, wInv.Result().StatusCode, "Expected failing status code of 400")
body, _ := io.ReadAll(wInv.Result().Body)
assert.Equal(t, `{"error":"Authorization token verification failed"}`, string(body), "Failure wasn't because token verification failed")

// Repeat again but with bad origin version
wInv = httptest.NewRecorder()
cInv, rInv = gin.CreateTestContext(wInv)
tsInv = httptest.NewServer(http.HandlerFunc(func(wInv http.ResponseWriter, req *http.Request) {
assert.Equal(t, "POST", req.Method, "Not POST Method")
_, err := wInv.Write([]byte(":)"))
assert.NoError(t, err)
}))
defer tsInv.Close()
cInv.Request = &http.Request{
URL: &url.URL{},
}

// Sign token with the good key
signedInv, err = jwt.Sign(tok, jwt.WithKey(jwa.ES512, pKey))
assert.NoError(t, err, "Error signing token")

// Create the request and set the headers
rInv.POST("/", RegisterOrigin)
cInv.Request, _ = http.NewRequest(http.MethodPost, "/", bytes.NewBuffer([]byte(`{"Namespaces": [{"Path": "/foo/bar", "URL": "https://get-your-tokens.org"}]}`)))

cInv.Request.Header.Set("Authorization", "Bearer "+string(signedInv))
cInv.Request.Header.Set("Content-Type", "application/json")
cInv.Request.Header.Set("User-Agent", "pelican-origin/6.0.0")

rInv.ServeHTTP(wInv, cInv.Request)
assert.Equal(t, 500, wInv.Result().StatusCode, "Expected failing status code of 500")
body, _ = io.ReadAll(wInv.Result().Body)
assert.Equal(t, `{"error":"Incompatible versions detected: The director does not support your origin version (6.0.0). Please update to 7.0.0 or newer."}`,
string(body), "Failure wasn't because of version incompatibility")
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/go-kit/log v0.2.1
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd
github.com/hashicorp/go-version v1.6.0
github.com/jellydator/ttlcache/v3 v3.0.1
github.com/jsipprell/keyctl v1.0.4-0.20211208153515-36ca02672b6c
github.com/lestrrat-go/jwx/v2 v2.0.11
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJ
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
Expand Down
15 changes: 13 additions & 2 deletions origin_ui/advertise.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package origin_ui
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"
Expand All @@ -34,6 +34,10 @@ import (
"github.com/spf13/viper"
)

type directorResponse struct {
Error string `json:"error"`
}

func PeriodicAdvertiseOrigin() error {
ticker := time.NewTicker(1 * time.Minute)
go func() {
Expand Down Expand Up @@ -119,6 +123,8 @@ func AdvertiseOrigin() error {

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+token)
userAgent := "pelican-origin/" + client.ObjectClientOptions.Version
req.Header.Set("User-Agent", userAgent)

// We should switch this over to use the common transport, but for that to happen
// that function needs to be exported from pelican
Expand All @@ -131,8 +137,13 @@ func AdvertiseOrigin() error {
}
defer resp.Body.Close()

body, _ = io.ReadAll(resp.Body)
if resp.StatusCode > 299 {
return fmt.Errorf("Error response %v from director registration: %v", resp.StatusCode, resp.Status)
var respErr directorResponse
if unmarshalErr := json.Unmarshal(body, &respErr); unmarshalErr != nil { // Error creating json
return errors.Wrapf(unmarshalErr, "Could not unmarshall the director's response, which responded %v from director registration: %v", resp.StatusCode, resp.Status)
}
return errors.Errorf("Error during director registration: %v\n", respErr.Error)
}

return nil
Expand Down

0 comments on commit ec4fdbd

Please sign in to comment.