Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yugabyte] Yugabyte compatible implementation #1143

Merged
merged 11 commits into from
Feb 11, 2025
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,11 @@ jobs:
run: make down-locally
- name: Start local DSS instance
run: make start-locally
# TODO: [Yugabyte migration] Prober is failing randomly possibly due to misconfigured transaction isolation
# Test disabled to be addressed in a different increment
# - name: Probe local DSS instance
# run: make probe-locally
- name: Run Qualifier against local DSS instance
run: make qualify-locally
- name: Bring down local DSS instance
run: make down-locally
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
CREATE INDEX s_cell_idx ON subscriptions USING ybgin (cells);
CREATE INDEX isa_cell_idx ON identification_service_areas USING ybgin (cells);
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This migration drops ybgin indexes due to existing limitation to use it with other indexes.
-- https://docs.yugabyte.com/preview/explore/ysql-language-features/indexes-constraints/gin/#limitations.
DROP INDEX s_cell_idx;
DROP INDEX isa_cell_idx;

UPDATE schema_versions set schema_version = 'v1.0.1' WHERE onerow_enforcer = TRUE;

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE INDEX sc_cells_idx ON scd_constraints USING ybgin (cells);
CREATE INDEX ss_cells_idx ON scd_subscriptions USING ybgin (cells);
CREATE INDEX so_cells_idx ON scd_operations USING ybgin (cells);
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- This migration drops ybgin indexes due to existing limitation to use it with other indexes.
-- https://docs.yugabyte.com/preview/explore/ysql-language-features/indexes-constraints/gin/#limitations.
DROP INDEX sc_cells_idx;
DROP INDEX ss_cells_idx;
DROP INDEX so_cells_idx;

UPDATE schema_versions set schema_version = 'v1.0.1' WHERE onerow_enforcer = TRUE;

21 changes: 18 additions & 3 deletions build/dev/docker-compose_dss.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ services:

local-dss-ybdb:
image: yugabytedb/yugabyte:2024.1.2.0-b77
command: bin/yugabyted start --background=false
# ysql_output_buffer_size needs to be increased to allow ysql to retry read restart errors. https://docs.yugabyte.com/preview/reference/configuration/yb-tserver/#ysql-output-buffer-size
command: bin/yugabyted start --background=false --tserver_flags="ysql_output_buffer_size=1048576"
ports:
- "7000:7000"
- "9000:9000"
Expand All @@ -35,6 +36,11 @@ services:
networks:
- dss_sandbox_default_network
profiles: ["with-yugabyte"]
healthcheck:
test: ["CMD", "/home/yugabyte/postgres/bin/pg_isready", "-h", "local-dss-ybdb"]
interval: 5s
timeout: 5s
retries: 10

local-dss-rid-bootstrapper-ybdb:
build:
Expand All @@ -43,7 +49,7 @@ services:
image: interuss-local/dss
command: /usr/bin/db-manager migrate --schemas_dir=/db-schemas/yugabyte/rid --db_version "latest" --cockroach_host local-dss-ybdb --cockroach_user yugabyte --cockroach_port 5433
depends_on:
local-dss-crdb:
local-dss-ybdb:
condition: service_healthy
networks:
- dss_sandbox_default_network
Expand All @@ -56,7 +62,7 @@ services:
image: interuss-local/dss
entrypoint: /usr/bin/db-manager migrate --schemas_dir=/db-schemas/yugabyte/scd --db_version "latest" --cockroach_host local-dss-ybdb --cockroach_user yugabyte --cockroach_port 5433
depends_on:
local-dss-crdb:
local-dss-ybdb:
condition: service_healthy
networks:
- dss_sandbox_default_network
Expand Down Expand Up @@ -94,6 +100,8 @@ services:
volumes:
- $PWD/../test-certs:/var/test-certs:ro
- $PWD/startup/core_service.sh:/startup/core_service.sh:ro
environment:
COMPOSE_PROFILES: ${COMPOSE_PROFILES}
command: /startup/core_service.sh ${DEBUG_ON:-0}
ports:
- "4000:4000"
Expand All @@ -103,13 +111,20 @@ services:
condition: service_completed_successfully
local-dss-scd-bootstrapper:
condition: service_completed_successfully
local-dss-rid-bootstrapper-ybdb:
condition: service_completed_successfully
required: false
local-dss-scd-bootstrapper-ybdb:
condition: service_completed_successfully
required: false
networks:
- dss_sandbox_default_network
healthcheck:
test: wget -O - 'http://localhost/healthy' || exit 1
interval: 3m
start_period: 30s
start_interval: 5s
profiles: ["", "with-yugabyte"]

local-dss-dummy-oauth:
build:
Expand Down
19 changes: 15 additions & 4 deletions build/dev/startup/core_service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@

DEBUG_ON=${1:-0}

# POSIX compliant test to check if with-yugabyte profile is enabled.
if [ "${COMPOSE_PROFILES#*"with-yugabyte"}" != "${COMPOSE_PROFILES}" ]; then
echo "Using Yugabyte"
DATASTORE_CONNECTION="-cockroach_host local-dss-ybdb -cockroach_user yugabyte --cockroach_port 5433"
else
echo "Using CockroachDB"
DATASTORE_CONNECTION="-cockroach_host local-dss-crdb"
fi

if [ "$DEBUG_ON" = "1" ]; then
echo "Debug Mode: on"

dlv --headless --listen=:4000 --api-version=2 --accept-multiclient exec --continue /usr/bin/core-service -- \
-cockroach_host local-dss-crdb \
# Linter is disabled to properly unwrap $DATASTORE_CONNECTION.
# shellcheck disable=SC2086
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@barroco barroco Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, thanks for the link. Let's address it in a subsequent PR due to its possible complexity.

dlv --headless --listen=:4000 --api-version=2 --accept-multiclient exec --continue /usr/bin/core-service -- ${DATASTORE_CONNECTION} \
-public_key_files /var/test-certs/auth2.pem \
-log_format console \
-dump_requests \
Expand All @@ -20,8 +30,9 @@ if [ "$DEBUG_ON" = "1" ]; then
else
echo "Debug Mode: off"

/usr/bin/core-service \
-cockroach_host local-dss-crdb \
# Linter is disabled to properly unwrap $DATASTORE_CONNECTION.
# shellcheck disable=SC2086
/usr/bin/core-service ${DATASTORE_CONNECTION} \
-public_key_files /var/test-certs/auth2.pem \
-log_format console \
-dump_requests \
Expand Down
17 changes: 12 additions & 5 deletions pkg/rid/store/cockroach/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import (
)

const (
// currentMajorSchemaVersion is the current major schema version.
currentMajorSchemaVersion = 4
// The current major schema version per datastore type.
currentCrdbMajorSchemaVersion = 4
currentYugabyteMajorSchemaVersion = 1

// Records expire if current time is <expiredDurationInMin> minutes more than records' endTime.
expiredDurationInMin = 30
Expand Down Expand Up @@ -91,8 +92,12 @@ func (s *Store) CheckCurrentMajorSchemaVersion(ctx context.Context) error {
return stacktrace.NewError("Remote ID database has not been bootstrapped with Schema Manager, Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas")
}

if currentMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for remote ID! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas", vs, currentMajorSchemaVersion)
if s.db.Version.Type == datastore.CockroachDB && currentCrdbMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for remote ID! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas", vs, currentCrdbMajorSchemaVersion)
}

if s.db.Version.Type == datastore.Yugabyte && currentYugabyteMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for remote ID! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#updgrading-database-schemas", vs, currentYugabyteMajorSchemaVersion)
}

return nil
Expand Down Expand Up @@ -121,7 +126,9 @@ func (s *Store) Transact(ctx context.Context, f func(repo repos.Repository) erro

ctx = crdb.WithMaxRetries(ctx, flags.ConnectParameters().MaxRetries)

return crdbpgx.ExecuteTx(ctx, s.db.Pool, pgx.TxOptions{}, func(tx pgx.Tx) error {
return crdbpgx.ExecuteTx(ctx, s.db.Pool, pgx.TxOptions{
IsoLevel: pgx.Serializable,
}, func(tx pgx.Tx) error {
// Is this recover still necessary?
defer recoverRollbackRepanic(ctx, tx)
return f(&repo{
Expand Down
6 changes: 3 additions & 3 deletions pkg/rid/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *repo) MaxSubscriptionCountInCellsByOwner(ctx context.Context, cells s2.
// strict we could keep this count in memory, (or in some other storage).
var query = `
SELECT
IFNULL(MAX(subscriptions_per_cell_id), 0)
COALESCE(MAX(subscriptions_per_cell_id), 0)
FROM (
SELECT
COUNT(*) AS subscriptions_per_cell_id
Expand All @@ -96,11 +96,11 @@ func (r *repo) MaxSubscriptionCountInCellsByOwner(ctx context.Context, cells s2.
FROM subscriptions
WHERE owner = $1
AND ends_at >= $2
)
) as q1
WHERE
cell_id = ANY($3)
GROUP BY cell_id
)`
) as q2`

row := r.QueryRow(ctx, query, owner, r.clock.Now(), dssql.CellUnionToCellIds(cells))
var ret int
Expand Down
4 changes: 3 additions & 1 deletion pkg/scd/store/cockroach/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ func init() {
func (u *repo) UpsertUssAvailability(ctx context.Context, s *scdmodels.UssAvailabilityStatus) (*scdmodels.UssAvailabilityStatus, error) {
var (
upsertQuery = fmt.Sprintf(`
UPSERT INTO
INSERT INTO
scd_uss_availability
(%s)
VALUES
($1, $2, transaction_timestamp())
ON CONFLICT (id) DO UPDATE
SET availability = $2, updated_at = transaction_timestamp()
RETURNING
%s`, availabilityFieldsWithoutPrefix, availabilityFieldsWithPrefix)
)
Expand Down
32 changes: 29 additions & 3 deletions pkg/scd/store/cockroach/constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,38 @@ func (c *repo) GetConstraint(ctx context.Context, id dssmodels.ID) (*scdmodels.C
func (c *repo) UpsertConstraint(ctx context.Context, s *scdmodels.Constraint) (*scdmodels.Constraint, error) {
var (
upsertQuery = fmt.Sprintf(`
UPSERT INTO
INSERT INTO
scd_constraints
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp())
ON CONFLICT (%s) DO UPDATE
SET %s = $2,
%s = $3,
%s = $4,
%s = $5,
%s = $6,
%s = $7,
%s = $8,
%s = $9,
%s = transaction_timestamp()
WHERE scd_constraints.%s = $1
RETURNING
%s`, constraintFieldsWithoutPrefix, constraintFieldsWithPrefix)
%s`,
constraintFieldsWithoutPrefix,
constraintFieldsWithIndices[0],
constraintFieldsWithIndices[1],
constraintFieldsWithIndices[2],
constraintFieldsWithIndices[3],
constraintFieldsWithIndices[4],
constraintFieldsWithIndices[5],
constraintFieldsWithIndices[6],
constraintFieldsWithIndices[7],
constraintFieldsWithIndices[8],
constraintFieldsWithIndices[9],
constraintFieldsWithIndices[0],
constraintFieldsWithPrefix,
)
)

cids, err := dsssql.CellUnionToCellIdsWithValidation(s.Cells)
Expand Down Expand Up @@ -203,7 +228,8 @@ func (c *repo) SearchConstraints(ctx context.Context, v4d *dssmodels.Volume4D) (
COALESCE(starts_at <= $3, true)
AND
COALESCE(ends_at >= $2, true)
LIMIT $4`, constraintFieldsWithoutPrefix)
LIMIT $4;
`, constraintFieldsWithoutPrefix)
)

// TODO: Lazily calculate & cache spatial covering so that it is only ever
Expand Down
37 changes: 34 additions & 3 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,44 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err
func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) {
var (
upsertOperationsQuery = fmt.Sprintf(`
UPSERT INTO
INSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13)
RETURNING
%s`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
ON CONFLICT (%s) DO UPDATE
SET %s = $2,
%s = $3,
%s = $4,
%s = $5,
%s = $6,
%s = $7,
%s = $8,
%s = $9,
%s = transaction_timestamp(),
%s = $10,
%s = $11,
%s = $12,
%s = $13
RETURNING
%s`,
operationFieldsWithoutPrefix,
operationFieldsWithIndices[0],
operationFieldsWithIndices[1],
operationFieldsWithIndices[2],
operationFieldsWithIndices[3],
operationFieldsWithIndices[4],
operationFieldsWithIndices[5],
operationFieldsWithIndices[6],
operationFieldsWithIndices[7],
operationFieldsWithIndices[8],
operationFieldsWithIndices[9],
operationFieldsWithIndices[10],
operationFieldsWithIndices[11],
operationFieldsWithIndices[12],
operationFieldsWithIndices[13],
operationFieldsWithPrefix,
)
)

cids := make([]int64, len(operation.Cells))
Expand Down
15 changes: 10 additions & 5 deletions pkg/scd/store/cockroach/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
)

const (
// currentMajorSchemaVersion is the current major schema version.
currentMajorSchemaVersion = 3
// The current major schema version per datastore type.
currentCrdbMajorSchemaVersion = 3
currentYugabyteMajorSchemaVersion = 1
)

var (
Expand Down Expand Up @@ -66,8 +67,12 @@ func (s *Store) CheckCurrentMajorSchemaVersion(ctx context.Context) error {
return stacktrace.NewError("Strategic conflict detection database has not been bootstrapped with Schema Manager, Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}

if currentMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for strategic conflict detection! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas", vs, currentMajorSchemaVersion)
if s.db.Version.Type == datastore.CockroachDB && currentCrdbMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for strategic conflict detection! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas", vs, currentCrdbMajorSchemaVersion)
}

if s.db.Version.Type == datastore.Yugabyte && currentYugabyteMajorSchemaVersion != vs.Major {
return stacktrace.NewError("Unsupported schema version for strategic conflict detection! Got %s, requires major version of %d. Please check https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas", vs, currentYugabyteMajorSchemaVersion)
}

return nil
Expand All @@ -84,7 +89,7 @@ func (s *Store) Interact(_ context.Context) (repos.Repository, error) {
// Transact implements store.Transactor interface.
func (s *Store) Transact(ctx context.Context, f func(context.Context, repos.Repository) error) error {
ctx = crdb.WithMaxRetries(ctx, flags.ConnectParameters().MaxRetries)
return crdbpgx.ExecuteTx(ctx, s.db.Pool, pgx.TxOptions{}, func(tx pgx.Tx) error {
return crdbpgx.ExecuteTx(ctx, s.db.Pool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
return f(ctx, &repo{
q: tx,
clock: s.clock,
Expand Down
Loading