From 1fec5cfd8b10064ec1eb90cec8ce3163a0fd19de Mon Sep 17 00:00:00 2001
From: Joongi Kim
Date: Wed, 28 Feb 2024 17:01:40 +0900
Subject: [PATCH] fix: Rewrite the vfolder status migration in #1892 (#1936)

---
 ...2ff68bfc_detail_vfolder_deletion_status.py | 275 ++++++------------
 src/ai/backend/manager/models/base.py         |  41 ++-
 src/ai/backend/manager/models/vfolder.py      |   7 +-
 3 files changed, 146 insertions(+), 177 deletions(-)

diff --git a/src/ai/backend/manager/models/alembic/versions/7ff52ff68bfc_detail_vfolder_deletion_status.py b/src/ai/backend/manager/models/alembic/versions/7ff52ff68bfc_detail_vfolder_deletion_status.py
index ce02631f4d..1ef890e631 100644
--- a/src/ai/backend/manager/models/alembic/versions/7ff52ff68bfc_detail_vfolder_deletion_status.py
+++ b/src/ai/backend/manager/models/alembic/versions/7ff52ff68bfc_detail_vfolder_deletion_status.py
@@ -10,11 +10,8 @@
 
 import sqlalchemy as sa
 from alembic import op
-from sqlalchemy.dialects import postgresql as pgsql
 from sqlalchemy.sql import text
 
-from ai.backend.manager.models.base import EnumValueType, IDColumn, metadata
-
 # revision identifiers, used by Alembic.
 revision = "7ff52ff68bfc"
 down_revision = "a5319bfc7d7c"
@@ -22,194 +19,126 @@
 depends_on = None
 
 
-BATCH_SIZE = 100
-
-ENUM_NAME = "vfolderoperationstatus"
-
-
-class VFolderOperationStatus(enum.StrEnum):
-    # New enum values
-    DELETE_PENDING = "delete-pending"  # vfolder is in trash bin
-    DELETE_COMPLETE = "delete-complete"  # vfolder is deleted permanently, only DB row remains
-    DELETE_ERROR = "delete-error"
-
-    # Legacy enum values
-    LEGACY_DELETE_COMPLETE = "deleted-complete"
-    PURGE_ONGOING = "purge-ongoing"
-
-    # Original enum values
-    DELETE_ONGOING = "delete-ongoing"  # vfolder is being deleted in storage
+class OldVFolderOperationStatus(enum.StrEnum):
+    """Pre-migration enum labels, used to recreate the DB type on downgrade."""
+    READY = "ready"
+    PERFORMING = "performing"
+    CLONING = "cloning"
+    MOUNTED = "mounted"
     ERROR = "error"
+    DELETE_ONGOING = "delete-ongoing"  # vfolder is being deleted
+    DELETE_COMPLETE = "deleted-complete"  # vfolder is deleted
+    PURGE_ONGOING = "purge-ongoing"  # vfolder is being removed permanently
 
 
-vfolders = sa.Table(
-    "vfolders",
-    metadata,
-    IDColumn("id"),
-    sa.Column(
-        "status",
-        EnumValueType(VFolderOperationStatus),
-        nullable=False,
-    ),
-    sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
-    extend_existing=True,
-)
-
-
-def add_enum(enum_val: VFolderOperationStatus) -> None:
-    op.execute(f"ALTER TYPE {ENUM_NAME} ADD VALUE IF NOT EXISTS '{str(enum_val)}'")
-
-
-def delete_enum(enum_val: VFolderOperationStatus) -> None:
-    op.execute(
+def upgrade() -> None:
+    """
+    Relax ``vfolders.status`` from the ``vfolderoperationstatus`` enum to
+    varchar(64), rename the legacy status values (both in ``status`` and in
+    the keys of ``status_history``), drop the enum type, and add the indexed
+    ``status_changed`` column.
+    """
+    conn = op.get_bind()
+    # Relax the column type from enum to varchar(64).
+    conn.execute(
+        text("ALTER TABLE vfolders ALTER COLUMN status TYPE varchar(64) USING status::text;")
+    )
+    conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status SET DEFAULT 'ready';"))
+    conn.execute(
         text(
-            f"""DELETE FROM pg_enum
-            WHERE enumlabel = '{str(enum_val)}'
-            AND enumtypid = (
-                SELECT oid FROM pg_type WHERE typname = '{ENUM_NAME}'
-            )"""
+            """\
+            UPDATE vfolders
+            SET status = CASE
+                WHEN status = 'deleted-complete' THEN 'delete-pending'
+                WHEN status = 'purge-ongoing' THEN 'delete-ongoing'
+                WHEN status = 'error' THEN 'delete-error'
+                ELSE status
+            END,
+            status_history = (
+                SELECT jsonb_object_agg(new_key, value)
+                FROM (
+                    SELECT
+                        CASE
+                            WHEN key = 'deleted-complete' THEN 'delete-pending'
+                            WHEN key = 'purge-ongoing' THEN 'delete-ongoing'
+                            WHEN key = 'error' THEN 'delete-error'
+                            ELSE key
+                        END AS new_key,
+                        value
+                    FROM jsonb_each(status_history)
+                ) AS subquery
+            );
+        """
         )
     )
-
-
-def update_legacy_to_new(
-    conn,
-    vfolder_t,
-    legacy_enum: VFolderOperationStatus,
-    new_enum: VFolderOperationStatus,
-    *,
-    legacy_enum_name: str | None = None,
-    new_enum_name: str | None = None,
-) -> None:
-    _legacy_enum_name = legacy_enum_name or legacy_enum.name
-    _new_enum_name = new_enum_name or new_enum.name
-
-    while True:
-        stmt = (
-            sa.select([vfolder_t.c.id])
-            .where(
-                (vfolder_t.c.status == legacy_enum)
-                | (vfolder_t.c.status_history.has_key(_legacy_enum_name))
-            )
-            .limit(BATCH_SIZE)
-        )
-        result = conn.execute(stmt).fetchall()
-        vfolder_ids = [vf[0] for vf in result]
-
-        if not vfolder_ids:
-            break
-
-        # Update `status`
-        update_status = (
-            sa.update(vfolder_t)
-            .values({"status": new_enum})
-            .where((vfolder_t.c.id.in_(vfolder_ids)) & (vfolder_t.c.status == legacy_enum))
-        )
-        conn.execute(update_status)
-
-        # Update `status_history`
-        update_status_history = (
-            sa.update(vfolder_t)
-            .values({
-                "status_history": sa.func.jsonb_build_object(
-                    _new_enum_name, vfolder_t.c.status_history.op("->>")(_legacy_enum_name)
-                )
-                + vfolder_t.c.status_history.op("-")(_legacy_enum_name)
-            })
-            .where(
-                (vfolder_t.c.id.in_(vfolder_ids))
-                & (vfolder_t.c.status_history.has_key(_legacy_enum_name))
-            )
-        )
-        conn.execute(update_status_history)
-
-
-def upgrade() -> None:
-    conn = op.get_bind()
-
-    add_enum(VFolderOperationStatus.DELETE_PENDING)
-    add_enum(VFolderOperationStatus.DELETE_COMPLETE)
-    add_enum(VFolderOperationStatus.DELETE_ERROR)
-    conn.commit()
-
-    vfolders = sa.Table(
+    conn.execute(text("DROP TYPE vfolderoperationstatus;"))
+    op.add_column(
         "vfolders",
-        metadata,
-        IDColumn("id"),
         sa.Column(
-            "status",
-            EnumValueType(VFolderOperationStatus),
-            nullable=False,
+            "status_changed",
+            sa.DateTime(timezone=True),
+            nullable=True,
         ),
-        sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
-        extend_existing=True,
-    )
-
-    update_legacy_to_new(
-        conn, vfolders, VFolderOperationStatus.PURGE_ONGOING, VFolderOperationStatus.DELETE_ONGOING
-    )
-    update_legacy_to_new(
-        conn,
-        vfolders,
-        VFolderOperationStatus.LEGACY_DELETE_COMPLETE,
-        VFolderOperationStatus.DELETE_PENDING,
-        legacy_enum_name="DELETE_COMPLETE",
-    )
-
-    delete_enum(VFolderOperationStatus.LEGACY_DELETE_COMPLETE)
-    delete_enum(VFolderOperationStatus.PURGE_ONGOING)
-
-    op.add_column(
-        "vfolders", sa.Column("status_changed", sa.DateTime(timezone=True), nullable=True)
     )
     op.create_index(
-        op.f("ix_vfolders_status_changed"), "vfolders", ["status_changed"], unique=False
+        op.f("ix_vfolders_status_changed"),
+        "vfolders",
+        ["status_changed"],
+        unique=False,
     )
-    conn.commit()
 
 
 def downgrade() -> None:
+    """
+    Rename the status values back to the old labels, restore the
+    ``vfolderoperationstatus`` enum column type, and drop ``status_changed``.
+
+    The value mapping is lossy: both 'delete-complete' and 'delete-ongoing'
+    become 'purge-ongoing'.
+    """
     conn = op.get_bind()
-
-    add_enum(VFolderOperationStatus.LEGACY_DELETE_COMPLETE)
-    add_enum(VFolderOperationStatus.PURGE_ONGOING)
-    conn.commit()
-
-    vfolders = sa.Table(
-        "vfolders",
-        metadata,
-        IDColumn("id"),
-        sa.Column(
-            "status",
-            EnumValueType(VFolderOperationStatus),
-            nullable=False,
-        ),
-        sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
-        extend_existing=True,
-    )
-
-    update_legacy_to_new(
-        conn, vfolders, VFolderOperationStatus.DELETE_COMPLETE, VFolderOperationStatus.PURGE_ONGOING
-    )  # `deleted` vfolders are not in DB rows in this downgraded version
-    update_legacy_to_new(
-        conn, vfolders, VFolderOperationStatus.DELETE_ONGOING, VFolderOperationStatus.PURGE_ONGOING
+    conn.execute(
+        text(
+            """\
+            UPDATE vfolders
+            SET status = CASE
+                WHEN status = 'delete-pending' THEN 'deleted-complete'
+                WHEN status = 'delete-complete' THEN 'purge-ongoing'
+                WHEN status = 'delete-ongoing' THEN 'purge-ongoing'
+                WHEN status = 'delete-error' THEN 'error'
+                ELSE status
+            END,
+            status_history = (
+                SELECT jsonb_object_agg(new_key, value)
+                FROM (
+                    SELECT
+                        CASE
+                            WHEN key = 'delete-pending' THEN 'deleted-complete'
+                            WHEN key = 'delete-complete' THEN 'purge-ongoing'
+                            WHEN key = 'delete-ongoing' THEN 'purge-ongoing'
+                            WHEN key = 'delete-error' THEN 'error'
+                            ELSE key
+                        END AS new_key,
+                        value
+                    FROM jsonb_each(status_history)
+                ) AS subquery
+            );
+        """
+        )
     )
-    update_legacy_to_new(
-        conn, vfolders, VFolderOperationStatus.DELETE_ERROR, VFolderOperationStatus.ERROR
+    conn.execute(
+        text(
+            "CREATE TYPE vfolderoperationstatus AS ENUM (%s)"
+            % (",".join(f"'{choice.value}'" for choice in OldVFolderOperationStatus))
+        )
     )
-    update_legacy_to_new(
-        conn,
-        vfolders,
-        VFolderOperationStatus.DELETE_PENDING,
-        VFolderOperationStatus.LEGACY_DELETE_COMPLETE,
-        new_enum_name="DELETE_COMPLETE",
+    conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status DROP DEFAULT;"))
+    conn.execute(
+        text(
+            "ALTER TABLE vfolders ALTER COLUMN status TYPE vfolderoperationstatus "
+            "USING status::vfolderoperationstatus;"
+        )
     )
-
-    delete_enum(VFolderOperationStatus.DELETE_PENDING)
-    delete_enum(VFolderOperationStatus.DELETE_COMPLETE)
-    delete_enum(VFolderOperationStatus.DELETE_ERROR)
-
+    conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status SET DEFAULT 'ready';"))
     op.drop_index(op.f("ix_vfolders_status_changed"), table_name="vfolders")
     op.drop_column("vfolders", "status_changed")
-
-    conn.commit()
diff --git a/src/ai/backend/manager/models/base.py b/src/ai/backend/manager/models/base.py
index 2767c008f9..77730f1ad0 100644
--- a/src/ai/backend/manager/models/base.py
+++ b/src/ai/backend/manager/models/base.py
@@ -120,7 +120,8 @@ class FixtureOpModes(enum.StrEnum):
     UPDATE = "update"
 
 
-T_Enum = TypeVar("T_Enum", bound=enum.Enum)
+T_Enum = TypeVar("T_Enum", bound=enum.Enum, covariant=True)
+T_StrEnum = TypeVar("T_StrEnum", bound=enum.Enum, covariant=True)
 
 
 class EnumType(TypeDecorator, SchemaType, Generic[T_Enum]):
@@ -205,6 +206,44 @@ def python_type(self) -> T_Enum:
         return self._enum_class
 
 
+class StrEnumType(TypeDecorator, Generic[T_StrEnum]):
+    """
+    Maps a Postgres VARCHAR(64) column to a Python enum.StrEnum type.
+
+    Values are stored as the enum members' string values and converted back
+    to members of the given enum class when loaded; NULL maps to None.
+    """
+
+    impl = sa.VARCHAR
+    cache_ok = True
+
+    def __init__(self, enum_cls: type[T_StrEnum], **opts) -> None:
+        self._opts = opts
+        super().__init__(length=64, **opts)
+        self._enum_cls = enum_cls
+
+    def process_bind_param(
+        self,
+        value: Optional[T_StrEnum],
+        dialect: Dialect,
+    ) -> Optional[str]:
+        return value.value if value is not None else None
+
+    def process_result_value(
+        self,
+        value: Optional[str],
+        dialect: Dialect,
+    ) -> Optional[T_StrEnum]:
+        return self._enum_cls(value) if value is not None else None
+
+    def copy(self, **kw) -> type[Self]:
+        return StrEnumType(self._enum_cls, **self._opts)
+
+    @property
+    def python_type(self) -> T_StrEnum:
+        return self._enum_cls
+
+
 class CurvePublicKeyColumn(TypeDecorator):
     """
     A column type wrapper for string-based Z85-encoded CURVE public key.
diff --git a/src/ai/backend/manager/models/vfolder.py b/src/ai/backend/manager/models/vfolder.py index be106e3d51..f5bce4d780 100644 --- a/src/ai/backend/manager/models/vfolder.py +++ b/src/ai/backend/manager/models/vfolder.py @@ -57,6 +57,7 @@ OrderExprArg, PaginatedList, QuotaScopeIDType, + StrEnumType, batch_multiresult, generate_sql_info_for_gql_connection, metadata, @@ -253,9 +254,9 @@ class VFolderCloneInfo(NamedTuple): sa.Column("cloneable", sa.Boolean, default=False, nullable=False), sa.Column( "status", - EnumValueType(VFolderOperationStatus), + StrEnumType(VFolderOperationStatus), default=VFolderOperationStatus.READY, - server_default=VFolderOperationStatus.READY.value, + server_default=VFolderOperationStatus.READY, nullable=False, ), # status_history records the most recent status changes for each status @@ -1207,7 +1208,7 @@ async def resolve_num_files(self, info: graphene.ResolveInfo) -> int: "cloneable": ("vfolders_cloneable", None), "status": ( "vfolders_status", - enum_field_getter(VFolderOperationStatus), + lambda s: VFolderOperationStatus(s), ), }