Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Rewrite the vfolder status migration in #1892 #1936

Merged
merged 2 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,206 +10,121 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql as pgsql
from sqlalchemy.sql import text

from ai.backend.manager.models.base import EnumValueType, IDColumn, metadata

# revision identifiers, used by Alembic.
revision = "7ff52ff68bfc"
down_revision = "a5319bfc7d7c"
branch_labels = None
depends_on = None


# Upper bound on the number of vfolder ids selected per iteration by the
# batched status rewrite (used as the LIMIT of its SELECT statement).
BATCH_SIZE = 100

# Name of the PostgreSQL enum type targeted by the enum-altering statements.
ENUM_NAME = "vfolderoperationstatus"


class VFolderOperationStatus(enum.StrEnum):
# New enum values
DELETE_PENDING = "delete-pending" # vfolder is in trash bin
DELETE_COMPLETE = "delete-complete" # vfolder is deleted permanently, only DB row remains
DELETE_ERROR = "delete-error"

# Legacy enum values
LEGACY_DELETE_COMPLETE = "deleted-complete"
PURGE_ONGOING = "purge-ongoing"

# Original enum values
DELETE_ONGOING = "delete-ongoing" # vfolder is being deleted in storage
class OldVFolderOperationStatus(enum.StrEnum):
READY = "ready"
PERFORMING = "performing"
CLONING = "cloning"
MOUNTED = "mounted"
ERROR = "error"
DELETE_ONGOING = "delete-ongoing" # vfolder is being deleted
DELETE_COMPLETE = "deleted-complete" # vfolder is deleted
PURGE_ONGOING = "purge-ongoing" # vfolder is being removed permanently


# Minimal module-level definition of the "vfolders" table: only the id,
# status and status_history columns are declared here.
# extend_existing=True is passed so that this definition can be layered on a
# "vfolders" table already present in the shared `metadata` object.
vfolders = sa.Table(
"vfolders",
metadata,
IDColumn("id"),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
nullable=False,
),
# JSONB column; nullable, with a SQL NULL as the client-side default.
sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
extend_existing=True,
)


def add_enum(enum_val: VFolderOperationStatus) -> None:
    """
    Add ``enum_val`` as a label of the ``ENUM_NAME`` PostgreSQL enum type.

    The statement uses ``ADD VALUE IF NOT EXISTS``, so calling this for a
    label that is already present does not raise.

    :param enum_val: The enum member whose string form becomes the new label.
    """
    op.execute(f"ALTER TYPE {ENUM_NAME} ADD VALUE IF NOT EXISTS '{str(enum_val)}'")


def delete_enum(enum_val: VFolderOperationStatus) -> None:
op.execute(
def upgrade() -> None:
conn = op.get_bind()
# Relax the column type from enum to varchar(64).
conn.execute(
text("ALTER TABLE vfolders ALTER COLUMN status TYPE varchar(64) USING status::text;")
)
conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status SET DEFAULT 'ready';"))
conn.execute(
text(
f"""DELETE FROM pg_enum
WHERE enumlabel = '{str(enum_val)}'
AND enumtypid = (
SELECT oid FROM pg_type WHERE typname = '{ENUM_NAME}'
)"""
"""\
UPDATE vfolders
SET status = CASE
WHEN status = 'deleted-complete' THEN 'delete-pending'
WHEN status = 'purge-ongoing' THEN 'delete-ongoing'
WHEN status = 'error' THEN 'delete-error'
ELSE status
END,
status_history = (
SELECT jsonb_object_agg(new_key, value)
FROM (
SELECT
CASE
WHEN key = 'deleted-complete' THEN 'delete-pending'
WHEN key = 'purge-ongoing' THEN 'delete-ongoing'
WHEN key = 'error' THEN 'delete-error'
ELSE key
END AS new_key,
value
FROM jsonb_each(status_history)
) AS subquery
);
"""
)
)


def update_legacy_to_new(
    conn,
    vfolder_t,
    legacy_enum: VFolderOperationStatus,
    new_enum: VFolderOperationStatus,
    *,
    legacy_enum_name: str | None = None,
    new_enum_name: str | None = None,
) -> None:
    """
    Rewrite vfolder rows from ``legacy_enum`` to ``new_enum`` in batches.

    Each iteration selects at most ``BATCH_SIZE`` row ids whose ``status``
    equals ``legacy_enum`` or whose ``status_history`` contains the legacy
    key, then updates ``status`` and ``status_history`` for those ids.
    The loop ends when the selection comes back empty.

    :param conn: Connection used to execute the statements.
    :param vfolder_t: Table object exposing ``id``, ``status`` and
        ``status_history`` columns.
    :param legacy_enum: Status value to be replaced.
    :param new_enum: Status value to write instead.
    :param legacy_enum_name: Key to look for in ``status_history``;
        defaults to ``legacy_enum.name``.
    :param new_enum_name: Key to write into ``status_history``;
        defaults to ``new_enum.name``.
    """
    _legacy_enum_name = legacy_enum_name or legacy_enum.name
    _new_enum_name = new_enum_name or new_enum.name

    while True:
        # Columns are passed positionally: the legacy list form
        # ``select([col])`` is rejected by SQLAlchemy 2.0.
        stmt = (
            sa.select(vfolder_t.c.id)
            .where(
                (vfolder_t.c.status == legacy_enum)
                | (vfolder_t.c.status_history.has_key(_legacy_enum_name))
            )
            .limit(BATCH_SIZE)
        )
        result = conn.execute(stmt).fetchall()
        vfolder_ids = [vf[0] for vf in result]

        if not vfolder_ids:
            break

        # Update `status`
        update_status = (
            sa.update(vfolder_t)
            .values({"status": new_enum})
            .where((vfolder_t.c.id.in_(vfolder_ids)) & (vfolder_t.c.status == legacy_enum))
        )
        conn.execute(update_status)

        # Update `status_history`: build a one-entry object that stores the
        # legacy key's value (extracted as text by `->>`) under the new key,
        # and combine it with the history from which the legacy key is removed.
        # NOTE(review): the combination relies on how SQLAlchemy renders `+`
        # for these operands -- confirm it is emitted as JSONB concatenation.
        update_status_history = (
            sa.update(vfolder_t)
            .values({
                "status_history": sa.func.jsonb_build_object(
                    _new_enum_name, vfolder_t.c.status_history.op("->>")(_legacy_enum_name)
                )
                + vfolder_t.c.status_history.op("-")(_legacy_enum_name)
            })
            .where(
                (vfolder_t.c.id.in_(vfolder_ids))
                & (vfolder_t.c.status_history.has_key(_legacy_enum_name))
            )
        )
        conn.execute(update_status_history)


def upgrade() -> None:
conn = op.get_bind()

add_enum(VFolderOperationStatus.DELETE_PENDING)
add_enum(VFolderOperationStatus.DELETE_COMPLETE)
add_enum(VFolderOperationStatus.DELETE_ERROR)
conn.commit()

vfolders = sa.Table(
conn.execute(text("DROP TYPE vfolderoperationstatus;"))
op.add_column(
"vfolders",
metadata,
IDColumn("id"),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
nullable=False,
"status_changed",
sa.DateTime(timezone=True),
nullable=True,
),
sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
extend_existing=True,
)

update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.PURGE_ONGOING, VFolderOperationStatus.DELETE_ONGOING
)
update_legacy_to_new(
conn,
vfolders,
VFolderOperationStatus.LEGACY_DELETE_COMPLETE,
VFolderOperationStatus.DELETE_PENDING,
legacy_enum_name="DELETE_COMPLETE",
)

delete_enum(VFolderOperationStatus.LEGACY_DELETE_COMPLETE)
delete_enum(VFolderOperationStatus.PURGE_ONGOING)

op.add_column(
"vfolders", sa.Column("status_changed", sa.DateTime(timezone=True), nullable=True)
)
op.create_index(
op.f("ix_vfolders_status_changed"), "vfolders", ["status_changed"], unique=False
op.f("ix_vfolders_status_changed"),
"vfolders",
["status_changed"],
unique=False,
)
conn.commit()


def downgrade() -> None:
conn = op.get_bind()

add_enum(VFolderOperationStatus.LEGACY_DELETE_COMPLETE)
add_enum(VFolderOperationStatus.PURGE_ONGOING)
conn.commit()

vfolders = sa.Table(
"vfolders",
metadata,
IDColumn("id"),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
nullable=False,
),
sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
extend_existing=True,
)

update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.DELETE_COMPLETE, VFolderOperationStatus.PURGE_ONGOING
) # `deleted` vfolders are not in DB rows in this downgraded version
update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.DELETE_ONGOING, VFolderOperationStatus.PURGE_ONGOING
conn.execute(
text(
"""\
UPDATE vfolders
SET status = CASE
WHEN status = 'delete-pending' THEN 'deleted-complete'
WHEN status = 'delete-complete' THEN 'purge-ongoing'
WHEN status = 'delete-ongoing' THEN 'purge-ongoing'
WHEN status = 'delete-error' THEN 'error'
ELSE status
END,
status_history = (
SELECT jsonb_object_agg(new_key, value)
FROM (
SELECT
CASE
WHEN key = 'delete-pending' THEN 'deleted-complete'
WHEN key = 'delete-complete' THEN 'purge-ongoing'
WHEN key = 'delete-ongoing' THEN 'purge-ongoing'
WHEN key = 'delete-error' THEN 'error'
ELSE key
END AS new_key,
value
FROM jsonb_each(status_history)
) AS subquery
);
"""
)
)
update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.DELETE_ERROR, VFolderOperationStatus.ERROR
conn.execute(
text(
"CREATE TYPE vfolderoperationstatus AS ENUM (%s)"
% (",".join(f"'{choice.value}'" for choice in OldVFolderOperationStatus))
)
)
update_legacy_to_new(
conn,
vfolders,
VFolderOperationStatus.DELETE_PENDING,
VFolderOperationStatus.LEGACY_DELETE_COMPLETE,
new_enum_name="DELETE_COMPLETE",
conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status DROP DEFAULT;"))
conn.execute(
text(
"ALTER TABLE vfolders ALTER COLUMN status TYPE vfolderoperationstatus "
"USING status::vfolderoperationstatus;"
)
)

delete_enum(VFolderOperationStatus.DELETE_PENDING)
delete_enum(VFolderOperationStatus.DELETE_COMPLETE)
delete_enum(VFolderOperationStatus.DELETE_ERROR)

conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status SET DEFAULT 'ready';"))
op.drop_index(op.f("ix_vfolders_status_changed"), table_name="vfolders")
op.drop_column("vfolders", "status_changed")

conn.commit()
38 changes: 37 additions & 1 deletion src/ai/backend/manager/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class FixtureOpModes(enum.StrEnum):
UPDATE = "update"


T_Enum = TypeVar("T_Enum", bound=enum.Enum)
T_Enum = TypeVar("T_Enum", bound=enum.Enum, covariant=True)
T_StrEnum = TypeVar("T_StrEnum", bound=enum.Enum, covariant=True)


class EnumType(TypeDecorator, SchemaType, Generic[T_Enum]):
Expand Down Expand Up @@ -205,6 +206,41 @@ def python_type(self) -> T_Enum:
return self._enum_class


class StrEnumType(TypeDecorator, Generic[T_StrEnum]):
    """
    Maps Postgres VARCHAR(64) column with a Python enum.StrEnum type.

    Values are bound as the enum member's ``.value`` string and loaded back by
    calling the enum class on the stored string. ``None`` passes through
    unchanged in both directions.
    """

    impl = sa.VARCHAR
    cache_ok = True

    def __init__(self, enum_cls: type[T_StrEnum], **opts) -> None:
        """
        :param enum_cls: The enum class used to convert loaded strings.
        :param opts: Extra keyword arguments forwarded to the underlying
            type constructor; kept so that ``copy()`` can replay them.
        """
        self._opts = opts
        super().__init__(length=64, **opts)
        self._enum_cls = enum_cls

    def process_bind_param(
        self,
        value: Optional[T_StrEnum],
        dialect: Dialect,
    ) -> Optional[str]:
        """Convert an enum member to the string stored in the column."""
        return value.value if value is not None else None

    def process_result_value(
        self,
        value: Optional[str],
        dialect: Dialect,
    ) -> Optional[T_StrEnum]:
        """Convert a stored string back to the corresponding enum member."""
        return self._enum_cls(value) if value is not None else None

    def copy(self, **kw) -> "StrEnumType[T_StrEnum]":
        """Return a new instance built from the same enum class and options."""
        return StrEnumType(self._enum_cls, **self._opts)

    @property
    def python_type(self) -> type[T_StrEnum]:
        """The enum class this column type maps to."""
        # The attribute assigned in __init__ is `_enum_cls`; reading
        # `_enum_class` here raised AttributeError.
        return self._enum_cls


class CurvePublicKeyColumn(TypeDecorator):
"""
A column type wrapper for string-based Z85-encoded CURVE public key.
Expand Down
7 changes: 4 additions & 3 deletions src/ai/backend/manager/models/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
OrderExprArg,
PaginatedList,
QuotaScopeIDType,
StrEnumType,
batch_multiresult,
generate_sql_info_for_gql_connection,
metadata,
Expand Down Expand Up @@ -253,9 +254,9 @@ class VFolderCloneInfo(NamedTuple):
sa.Column("cloneable", sa.Boolean, default=False, nullable=False),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
StrEnumType(VFolderOperationStatus),
default=VFolderOperationStatus.READY,
server_default=VFolderOperationStatus.READY.value,
server_default=VFolderOperationStatus.READY,
nullable=False,
),
# status_history records the most recent status changes for each status
Expand Down Expand Up @@ -1207,7 +1208,7 @@ async def resolve_num_files(self, info: graphene.ResolveInfo) -> int:
"cloneable": ("vfolders_cloneable", None),
"status": (
"vfolders_status",
enum_field_getter(VFolderOperationStatus),
lambda s: VFolderOperationStatus(s),
),
}

Expand Down
Loading