diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index e287d1f43..d34765855 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -16,6 +16,37 @@ import ( "github.com/interuss/stacktrace" ) +// subscriptionIsImplicitAndOnlyAttachedToOIR will check if: +// - the subscription is defined and is implicit +// - the subscription is attached to the specified operational intent +// - the subscription is not attached to any other operational intent +// +// This is to be used in contexts where an implicit subscription may need to be cleaned up: if true is returned, +// the subscription can be safely removed after the operational intent is deleted or attached to another subscription. +// +// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method. +// +// See https://github.com/interuss/dss/issues/1059 for more details +func subscriptionIsImplicitAndOnlyAttachedToOIR(ctx context.Context, r repos.Repository, oirID dssmodels.ID, subscription *scdmodels.Subscription) (bool, error) { + if subscription == nil { + return false, nil + } + if !subscription.ImplicitSubscription { + return false, nil + } + // Get the Subscription's dependent OperationalIntents + dependentOps, err := r.GetDependentOperationalIntents(ctx, subscription.ID) + if err != nil { + return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents") + } + if len(dependentOps) == 0 { + return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents") + } else if len(dependentOps) == 1 && dependentOps[0] == oirID { + return true, nil + } + return false, nil +} + // DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at // the specified version. func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest, @@ -69,29 +100,20 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest } // Get the Subscription supporting the OperationalIntent, if one is defined - var sub *scdmodels.Subscription - removeImplicitSubscription := false + var previousSubscription *scdmodels.Subscription if old.SubscriptionID != nil { - sub, err = r.GetSubscription(ctx, *old.SubscriptionID) + previousSubscription, err = r.GetSubscription(ctx, *old.SubscriptionID) if err != nil { return stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo") } - if sub == nil { + if previousSubscription == nil { return stacktrace.NewError("OperationalIntent's Subscription missing from repo") } + } - if sub.ImplicitSubscription { - // Get the Subscription's dependent OperationalIntents - dependentOps, err := r.GetDependentOperationalIntents(ctx, sub.ID) - if err != nil { - return stacktrace.Propagate(err, "Could not find dependent OperationalIntents") - } - if len(dependentOps) == 0 { - return stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents") - } else if len(dependentOps) == 1 { - removeImplicitSubscription = true - } - } + removeImplicitSubscription, err := subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, id, previousSubscription) + if err != nil { + return stacktrace.Propagate(err, "Could not determine if Subscription can be removed") } // Gather the subscriptions that need to be notified @@ -119,7 +141,7 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest // removeImplicitSubscription is only true if the OIR had a subscription defined if removeImplicitSubscription { // Automatically remove a now-unused implicit Subscription - err = r.DeleteSubscription(ctx, sub.ID) + err = r.DeleteSubscription(ctx, previousSubscription.ID) if err != nil { return stacktrace.Propagate(err, "Unable to delete associated implicit Subscription") } @@ -810,33 +832,63 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize // For an OIR being created, version starts at 0 version := int32(0) + var previousSub *scdmodels.Subscription if old != nil { version = int32(old.Version) + // Fetch the previous OIR's subscription if it exists + if old.SubscriptionID != nil { + previousSub, err = r.GetSubscription(ctx, *old.SubscriptionID) + if err != nil { + return stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo") + } + } } - var sub *scdmodels.Subscription + // Determine if the previous subscription is being replaced and if it will need to be cleaned up + previousSubIsBeingReplaced := previousSub != nil && validParams.subscriptionID != previousSub.ID + removePreviousImplicitSubscription := false + if previousSubIsBeingReplaced { + removePreviousImplicitSubscription, err = subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, validParams.id, previousSub) + if err != nil { + return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed") + } + } + + // attachedSub is the subscription that will end up being attached to the OIR + // it defaults to the previous subscription (which may be nil), and may be updated if required by the parameters + attachedSub := previousSub if validParams.subscriptionID.Empty() { - // Create an implicit subscription if one has been requested. - // Requesting neither an explicit nor an implicit subscription is allowed for ACCEPTED states: - // for other states, an error will have been returned earlier. - // if no implicit subscription is requested and we reached this point, we will proceed without subscription + // No subscription ID was provided: + // check if an implicit subscription should be created, otherwise do nothing if validParams.implicitSubscription.requested { - if sub, err = createAndStoreNewImplicitSubscription(ctx, r, manager, validParams); err != nil { + // Parameters for a new implicit subscription have been passed: we will create + // a new implicit subscription even if another subscription was attached to this OIR before, + // regardless of whether it was an implicit subscription or not. + if attachedSub, err = createAndStoreNewImplicitSubscription(ctx, r, manager, validParams); err != nil { return stacktrace.Propagate(err, "Failed to create implicit subscription") } + } else { + // If no subscription ID is provided and no implicit subscription is requested, + // the OIR should have no attached subscription + attachedSub = nil } } else { // Attempt to rely on the specified subscription - sub, err = r.GetSubscription(ctx, validParams.subscriptionID) - if err != nil { - return stacktrace.Propagate(err, "Unable to get depended-upon Subscription") - } - if sub == nil { - return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Depended-upon Subscription %s does not exist", validParams.subscriptionID) + // If it is different from the previous subscription, we need to fetch it from the store + // in order to ensure it correctly covers the OIR. + // We do the check below in order to avoid re-fetching the subscription if it has not changed + if attachedSub == nil || previousSubIsBeingReplaced { + attachedSub, err = r.GetSubscription(ctx, validParams.subscriptionID) + if err != nil { + return stacktrace.Propagate(err, "Unable to get requested Subscription from store") + } + if attachedSub == nil { + return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Specified Subscription %s does not exist", validParams.subscriptionID) + } } // We need to confirm that it is owned by the calling manager - if sub.Manager != manager { + if attachedSub.Manager != manager { return stacktrace.Propagate( // We do a bit of wrapping gymnastics because the root error message will be sent in the response, // and we don't want to include the effective manager in there. @@ -845,27 +897,27 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize // The propagation message will end in the logs and help with debugging. "Subscription %s owned by %s, but %s attempted to use it for an OperationalIntent", validParams.subscriptionID, - sub.Manager, + attachedSub.Manager, manager, ) } // We need to ensure the subscription covers the OIR's geo-temporal extent - sub, err = ensureSubscriptionCoversOIR(ctx, r, sub, validParams) + attachedSub, err = ensureSubscriptionCoversOIR(ctx, r, attachedSub, validParams) if err != nil { return stacktrace.Propagate(err, "Failed to ensure subscription covers OIR") } } if validParams.state.RequiresKey() { - responseConflict, err = validateKeyAndProvideConflictResponse(ctx, r, manager, validParams, sub) + responseConflict, err = validateKeyAndProvideConflictResponse(ctx, r, manager, validParams, attachedSub) if err != nil { return stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), "Failed to validate key") } } // Construct the new OperationalIntent - op := validParams.toOIR(manager, sub, version+1) + op := validParams.toOIR(manager, attachedSub, version+1) // Upsert the OperationalIntent op, err = r.UpsertOperationalIntent(ctx, op) @@ -873,6 +925,14 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo") } + // Check if the previously attached subscription should be removed + if removePreviousImplicitSubscription { + err = r.DeleteSubscription(ctx, previousSub.ID) + if err != nil { + return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription") + } + } + notifyVolume, err := computeNotificationVolume(old, validParams.uExtent) if err != nil { return stacktrace.Propagate(err, "Failed to compute notification volume")