Skip to content

Commit

Permalink
[dss] Reduce contention on OI creation/mutation endpoint (#1004)
Browse files Browse the repository at this point in the history
  • Loading branch information
barroco authored Feb 23, 2024
1 parent b42f7e5 commit 9147b9b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pkg/scd/operational_intents_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ func (a *Server) PutOperationalIntentReference(ctx context.Context, manager stri
action := func(ctx context.Context, r repos.Repository) (err error) {
var version int32 // Version of the Operational Intent (0 means creation requested).

// Lock subscriptions based on the cell to reduce the number of retries under concurrent load.
// See issue #1002 for details.
err = r.LockSubscriptionsOnCells(ctx, cells)
if err != nil {
return stacktrace.Propagate(err, "Unable to acquire lock")
}

// Get existing OperationalIntent, if any, and validate request
old, err := r.GetOperationalIntent(ctx, id)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/scd/repos/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package repos

import (
"context"

"github.com/golang/geo/s2"
dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
)
Expand Down Expand Up @@ -51,6 +51,9 @@ type Subscription interface {
// specified Subscription and returns the resulting corresponding
// notification indices.
IncrementNotificationIndices(ctx context.Context, subscriptionIds []dssmodels.ID) ([]int, error)

// LockSubscriptionsOnCells locks the subscriptions of interest on specific cells.
LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error
}

type UssAvailability interface {
Expand Down
31 changes: 31 additions & 0 deletions pkg/scd/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,34 @@ func (c *repo) IncrementNotificationIndices(ctx context.Context, subscriptionIds

return indices, nil
}

func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error {
cids := make([]int64, len(cells))

for i, cell := range cells {
cids[i] = int64(cell)
}

var pgCids pgtype.Int8Array
err := pgCids.Set(cids)
if err != nil {
return stacktrace.Propagate(err, "Failed to convert array to jackc/pgtype")
}

const query = `
SELECT
id
FROM
scd_subscriptions
WHERE
cells && $1
FOR UPDATE
`

_, err = c.q.Exec(ctx, query, pgCids)
if err != nil {
return stacktrace.Propagate(err, "Error in query: %s", query)
}

return nil
}

0 comments on commit 9147b9b

Please sign in to comment.