Skip to content

Commit

Permalink
[scd] OIR upsert: push down cleanup of implicit subscription into CRDB
Browse files Browse the repository at this point in the history
  • Loading branch information
Shastick committed Sep 10, 2024
1 parent 9e399b3 commit fdb7849
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 53 deletions.
46 changes: 0 additions & 46 deletions pkg/scd/operational_intents_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,6 @@ import (
"github.com/interuss/stacktrace"
)

// subscriptionIsImplicitAndOnlyAttachedToOIR will check if:
// - the subscription is defined and is implicit
// - the subscription is attached to the specified operational intent
// - the subscription is not attached to any other operational intent
//
// This is to be used in contexts where an implicit subscription may need to be cleaned up: if true is returned,
// the subscription can be safely removed after the operational intent is deleted or attached to another subscription.
//
// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method.
//
// See https://github.com/interuss/dss/issues/1059 for more details
func subscriptionIsImplicitAndOnlyAttachedToOIR(ctx context.Context, r repos.Repository, oirID dssmodels.ID, subscription *scdmodels.Subscription) (bool, error) {
if subscription == nil {
return false, nil
}
if !subscription.ImplicitSubscription {
return false, nil
}
// Get the Subscription's dependent OperationalIntents
dependentOps, err := r.GetDependentOperationalIntents(ctx, subscription.ID)
if err != nil {
return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents")
}
if len(dependentOps) == 0 {
return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents")
} else if len(dependentOps) == 1 && dependentOps[0] == oirID {
return true, nil
}
return false, nil
}

// DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at
// the specified version.
func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest,
Expand Down Expand Up @@ -837,13 +806,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize

// Determine if the previous subscription is being replaced and if it will need to be cleaned up
previousSubIsBeingReplaced := previousSub != nil && validParams.subscriptionID != previousSub.ID
removePreviousImplicitSubscription := false
if previousSubIsBeingReplaced {
removePreviousImplicitSubscription, err = subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, validParams.id, previousSub)
if err != nil {
return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed")
}
}

// attachedSub is the subscription that will end up being attached to the OIR
// it defaults to the previous subscription (which may be nil), and may be updated if required by the parameters
Expand Down Expand Up @@ -916,14 +878,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo")
}

// Check if the previously attached subscription should be removed
if removePreviousImplicitSubscription {
err = r.DeleteSubscription(ctx, previousSub.ID)
if err != nil {
return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription")
}
}

notifyVolume, err := computeNotificationVolume(old, validParams.uExtent)
if err != nil {
return stacktrace.Propagate(err, "Failed to compute notification volume")
Expand Down
52 changes: 45 additions & 7 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,51 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err
func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) {
var (
upsertOperationsQuery = fmt.Sprintf(`
UPSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13)
RETURNING
%s`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
WITH previous_implicit_sub AS (
-- get the current subscription id if:
-- - it exists
-- - it is implicit
-- - the OIR's subscription is being updated (ie, the new subscription id is different from the old one)
SELECT
scd_subscriptions.id
FROM scd_operations
JOIN scd_subscriptions ON scd_operations.subscription_id = scd_subscriptions.id
WHERE
scd_operations.id = $1
AND
scd_subscriptions.implicit = true
AND
-- in SQL, X != NULL will always be false:
-- this condition needs to cover cases where the new subscription is undefined,
-- so we add an explicit 'IS NULL' check.
(scd_subscriptions.id != $9 OR $9 IS NULL)
),
upserted_oir AS (
-- actual insertion/update statement
UPSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13)
RETURNING
%s
),
dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being mutated (!)
SELECT id
FROM scd_operations
WHERE subscription_id = (SELECT id FROM previous_implicit_sub)
),
deleted_subscription_id AS (
-- We are guaranteed to only delete something here if the OIR is being updated. Upon creation
-- previous_implicit_sub will be empty
DELETE FROM scd_subscriptions
WHERE id = (SELECT id FROM previous_implicit_sub)
AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being updated is still counted here, hence a value of 1
RETURNING id
)
-- return the upserted OIR
SELECT * FROM upserted_oir
`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
)

cids := make([]int64, len(operation.Cells))
Expand Down

0 comments on commit fdb7849

Please sign in to comment.