Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#1230 from haoming29/fix-origin-reg…
Browse files Browse the repository at this point in the history
…-not-updated

Fix origin registration status not updated
  • Loading branch information
haoming29 authored May 6, 2024
2 parents d708ec3 + d4845b1 commit 263ea8d
Showing 1 changed file with 33 additions and 66 deletions.
99 changes: 33 additions & 66 deletions origin/reg_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type (
// A TTL cache to save namespace registration status
// The TTL depends on the Status: For Status == StatusRegistrationError, the cache item won't expire,
// meaning a failure registration stays in the cache, until the registration is successful.
// For other Status, TTL is set to 5min
var registrationsStatus = ttlcache.New(ttlcache.WithTTL[string, RegistrationStatus](5 * time.Minute))
// For other Status, TTL is set to 1 min
var registrationsStatus = ttlcache.New(ttlcache.WithTTL[string, RegistrationStatus](1 * time.Minute))

var RegistryNotImplErr = errors.New("the running version of the registry didn't implmenet this function")

Expand Down Expand Up @@ -122,31 +122,35 @@ func FetchRegStatus(prefixes []string) (*server_structs.CheckNamespaceCompleteRe

// Fetch the registration status, generate access token for editing the
// registration at the registry, and store the status to the TTL cache
func FetchAndSetRegStatus(prefix string) error {
res, err := FetchRegStatus([]string{prefix})
func FetchAndSetRegStatus(prefixes ...string) error {
res, err := FetchRegStatus(prefixes)
if err == RegistryNotImplErr {
registrationsStatus.Set(
prefix,
RegistrationStatus{Status: RegStatusNotSupported, Msg: RegistryNotImplErr.Error()},
ttlcache.DefaultTTL,
)
for _, prefix := range prefixes {
registrationsStatus.Set(
prefix,
RegistrationStatus{Status: RegStatusNotSupported, Msg: RegistryNotImplErr.Error()},
ttlcache.DefaultTTL,
)
}
return nil // If not implemented, we simply set the status to unknown and return
} else if err != nil {
return err
}
result, ok := res.Results[prefix]
if !ok {
return fmt.Errorf("registry response does not contain status for prefix %s", prefix)
}
internalStatus := RegIncomplete
if result.Completed {
internalStatus = RegCompleted
for _, prefix := range prefixes {
result, ok := res.Results[prefix]
if !ok {
return fmt.Errorf("registry response does not contain status for prefix %s", prefix)
}
internalStatus := RegIncomplete
if result.Completed {
internalStatus = RegCompleted
}
registrationsStatus.Set(
prefix,
RegistrationStatus{Status: internalStatus, EditUrl: result.EditUrl, Msg: result.Msg},
ttlcache.DefaultTTL,
)
}
registrationsStatus.Set(
prefix,
RegistrationStatus{Status: internalStatus, EditUrl: result.EditUrl, Msg: result.Msg},
ttlcache.DefaultTTL,
)
return nil
}

Expand Down Expand Up @@ -175,61 +179,24 @@ func wrapExportsByStatus(exports []server_utils.OriginExport) ([]exportWithStatu
return wrappedExports, nil
}

// fetch and populate the cache with the result
resStatus, err := FetchRegStatus(prefixQ)
// For registry <7.8, this function is not supported
if err == RegistryNotImplErr {
for _, export := range fetchQ {
wrappedExport := exportWithStatus{
Status: RegStatusNotSupported,
OriginExport: export,
}
wrappedExports = append(wrappedExports, wrappedExport)
registrationsStatus.Set(
export.FederationPrefix,
RegistrationStatus{Status: RegStatusNotSupported, Msg: RegistryNotImplErr.Error()},
ttlcache.DefaultTTL,
)
}
} else if err != nil {
// fetch and populate the cache with the result in a batch
if err := FetchAndSetRegStatus(prefixQ...); err != nil {
return nil, errors.Wrap(err, "failed to fetch registration status from the registry")
}

// Populate the fetched items
for _, export := range fetchQ {
status, ok := resStatus.Results[export.FederationPrefix]
if !ok {
statusErrMsg := fmt.Sprintf("status for the prefix %s was not found from registry response", export.FederationPrefix)
if cachedItem := registrationsStatus.Get(export.FederationPrefix); cachedItem != nil {
regStatus := cachedItem.Value()
wrappedExport := exportWithStatus{
Status: RegStatusNotSupported,
StatusDescription: statusErrMsg,
Status: regStatus.Status,
EditUrl: regStatus.EditUrl,
StatusDescription: regStatus.Msg,
OriginExport: export,
}
wrappedExports = append(wrappedExports, wrappedExport)
registrationsStatus.Set(
export.FederationPrefix,
RegistrationStatus{Status: RegStatusNotSupported, Msg: statusErrMsg},
ttlcache.DefaultTTL,
)
} else {
internalStatus := RegIncomplete
if status.Completed {
internalStatus = RegCompleted
}
wrappedExport := exportWithStatus{
Status: internalStatus,
EditUrl: status.EditUrl,
StatusDescription: status.Msg,
OriginExport: export,
}
wrappedExports = append(wrappedExports, wrappedExport)
registrationsStatus.Set(
export.FederationPrefix,
RegistrationStatus{Status: internalStatus, EditUrl: status.EditUrl, Msg: status.Msg},
ttlcache.DefaultTTL,
)
log.Errorf("failed to get the registration status from internal cache for %s", export.FederationPrefix)
}
}

return wrappedExports, nil
}

0 comments on commit 263ea8d

Please sign in to comment.