Skip to content

Commit

Permalink
fix: Rewrite the vfolder status migration in #1892 (#1936)
Browse files Browse the repository at this point in the history
  • Loading branch information
achimnol authored Feb 28, 2024
1 parent 29e940b commit 1fec5cf
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,206 +10,121 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql as pgsql
from sqlalchemy.sql import text

from ai.backend.manager.models.base import EnumValueType, IDColumn, metadata

# revision identifiers, used by Alembic.
revision = "7ff52ff68bfc"
down_revision = "a5319bfc7d7c"
branch_labels = None
depends_on = None


BATCH_SIZE = 100

ENUM_NAME = "vfolderoperationstatus"


class VFolderOperationStatus(enum.StrEnum):
# New enum values
DELETE_PENDING = "delete-pending" # vfolder is in trash bin
DELETE_COMPLETE = "delete-complete" # vfolder is deleted permanently, only DB row remains
DELETE_ERROR = "delete-error"

# Legacy enum values
LEGACY_DELETE_COMPLETE = "deleted-complete"
PURGE_ONGOING = "purge-ongoing"

# Original enum values
DELETE_ONGOING = "delete-ongoing" # vfolder is being deleted in storage
class OldVFolderOperationStatus(enum.StrEnum):
READY = "ready"
PERFORMING = "performing"
CLONING = "cloning"
MOUNTED = "mounted"
ERROR = "error"
DELETE_ONGOING = "delete-ongoing" # vfolder is being deleted
DELETE_COMPLETE = "deleted-complete" # vfolder is deleted
PURGE_ONGOING = "purge-ongoing" # vfolder is being removed permanently


vfolders = sa.Table(
"vfolders",
metadata,
IDColumn("id"),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
nullable=False,
),
sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
extend_existing=True,
)


def add_enum(enum_val: VFolderOperationStatus) -> None:
op.execute(f"ALTER TYPE {ENUM_NAME} ADD VALUE IF NOT EXISTS '{str(enum_val)}'")


def delete_enum(enum_val: VFolderOperationStatus) -> None:
op.execute(
def upgrade() -> None:
conn = op.get_bind()
# Relax the colum type from enum to varchar(64).
conn.execute(
text("ALTER TABLE vfolders ALTER COLUMN status TYPE varchar(64) USING status::text;")
)
conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status SET DEFAULT 'ready';"))
conn.execute(
text(
f"""DELETE FROM pg_enum
WHERE enumlabel = '{str(enum_val)}'
AND enumtypid = (
SELECT oid FROM pg_type WHERE typname = '{ENUM_NAME}'
)"""
"""\
UPDATE vfolders
SET status = CASE
WHEN status = 'deleted-complete' THEN 'delete-pending'
WHEN status = 'purge-ongoing' THEN 'delete-ongoing'
WHEN status = 'error' THEN 'delete-error'
ELSE status
END,
status_history = (
SELECT jsonb_object_agg(new_key, value)
FROM (
SELECT
CASE
WHEN key = 'deleted-complete' THEN 'delete-pending'
WHEN key = 'purge-ongoing' THEN 'delete-ongoing'
WHEN key = 'error' THEN 'delete-error'
ELSE key
END AS new_key,
value
FROM jsonb_each(status_history)
) AS subquery
);
"""
)
)


def update_legacy_to_new(
conn,
vfolder_t,
legacy_enum: VFolderOperationStatus,
new_enum: VFolderOperationStatus,
*,
legacy_enum_name: str | None = None,
new_enum_name: str | None = None,
) -> None:
_legacy_enum_name = legacy_enum_name or legacy_enum.name
_new_enum_name = new_enum_name or new_enum.name

while True:
stmt = (
sa.select([vfolder_t.c.id])
.where(
(vfolder_t.c.status == legacy_enum)
| (vfolder_t.c.status_history.has_key(_legacy_enum_name))
)
.limit(BATCH_SIZE)
)
result = conn.execute(stmt).fetchall()
vfolder_ids = [vf[0] for vf in result]

if not vfolder_ids:
break

# Update `status`
update_status = (
sa.update(vfolder_t)
.values({"status": new_enum})
.where((vfolder_t.c.id.in_(vfolder_ids)) & (vfolder_t.c.status == legacy_enum))
)
conn.execute(update_status)

# Update `status_history`
update_status_history = (
sa.update(vfolder_t)
.values({
"status_history": sa.func.jsonb_build_object(
_new_enum_name, vfolder_t.c.status_history.op("->>")(_legacy_enum_name)
)
+ vfolder_t.c.status_history.op("-")(_legacy_enum_name)
})
.where(
(vfolder_t.c.id.in_(vfolder_ids))
& (vfolder_t.c.status_history.has_key(_legacy_enum_name))
)
)
conn.execute(update_status_history)


def upgrade() -> None:
conn = op.get_bind()

add_enum(VFolderOperationStatus.DELETE_PENDING)
add_enum(VFolderOperationStatus.DELETE_COMPLETE)
add_enum(VFolderOperationStatus.DELETE_ERROR)
conn.commit()

vfolders = sa.Table(
conn.execute(text("DROP TYPE vfolderoperationstatus;"))
op.add_column(
"vfolders",
metadata,
IDColumn("id"),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
nullable=False,
"status_changed",
sa.DateTime(timezone=True),
nullable=True,
),
sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
extend_existing=True,
)

update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.PURGE_ONGOING, VFolderOperationStatus.DELETE_ONGOING
)
update_legacy_to_new(
conn,
vfolders,
VFolderOperationStatus.LEGACY_DELETE_COMPLETE,
VFolderOperationStatus.DELETE_PENDING,
legacy_enum_name="DELETE_COMPLETE",
)

delete_enum(VFolderOperationStatus.LEGACY_DELETE_COMPLETE)
delete_enum(VFolderOperationStatus.PURGE_ONGOING)

op.add_column(
"vfolders", sa.Column("status_changed", sa.DateTime(timezone=True), nullable=True)
)
op.create_index(
op.f("ix_vfolders_status_changed"), "vfolders", ["status_changed"], unique=False
op.f("ix_vfolders_status_changed"),
"vfolders",
["status_changed"],
unique=False,
)
conn.commit()


def downgrade() -> None:
conn = op.get_bind()

add_enum(VFolderOperationStatus.LEGACY_DELETE_COMPLETE)
add_enum(VFolderOperationStatus.PURGE_ONGOING)
conn.commit()

vfolders = sa.Table(
"vfolders",
metadata,
IDColumn("id"),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
nullable=False,
),
sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
extend_existing=True,
)

update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.DELETE_COMPLETE, VFolderOperationStatus.PURGE_ONGOING
) # `deleted` vfolders are not in DB rows in this downgraded version
update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.DELETE_ONGOING, VFolderOperationStatus.PURGE_ONGOING
conn.execute(
text(
"""\
UPDATE vfolders
SET status = CASE
WHEN status = 'delete-pending' THEN 'deleted-complete'
WHEN status = 'delete-complete' THEN 'purge-ongoing'
WHEN status = 'delete-ongoing' THEN 'purge-ongoing'
WHEN status = 'delete-error' THEN 'error'
ELSE status
END,
status_history = (
SELECT jsonb_object_agg(new_key, value)
FROM (
SELECT
CASE
WHEN key = 'delete-pending' THEN 'deleted-complete'
WHEN key = 'delete-complete' THEN 'purge-ongoing'
WHEN key = 'delete-ongoing' THEN 'purge-ongoing'
WHEN key = 'delete-error' THEN 'error'
ELSE key
END AS new_key,
value
FROM jsonb_each(status_history)
) AS subquery
);
"""
)
)
update_legacy_to_new(
conn, vfolders, VFolderOperationStatus.DELETE_ERROR, VFolderOperationStatus.ERROR
conn.execute(
text(
"CREATE TYPE vfolderoperationstatus AS ENUM (%s)"
% (",".join(f"'{choice.value}'" for choice in OldVFolderOperationStatus))
)
)
update_legacy_to_new(
conn,
vfolders,
VFolderOperationStatus.DELETE_PENDING,
VFolderOperationStatus.LEGACY_DELETE_COMPLETE,
new_enum_name="DELETE_COMPLETE",
conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status DROP DEFAULT;"))
conn.execute(
text(
"ALTER TABLE vfolders ALTER COLUMN status TYPE vfolderoperationstatus "
"USING status::vfolderoperationstatus;"
)
)

delete_enum(VFolderOperationStatus.DELETE_PENDING)
delete_enum(VFolderOperationStatus.DELETE_COMPLETE)
delete_enum(VFolderOperationStatus.DELETE_ERROR)

conn.execute(text("ALTER TABLE vfolders ALTER COLUMN status SET DEFAULT 'ready';"))
op.drop_index(op.f("ix_vfolders_status_changed"), table_name="vfolders")
op.drop_column("vfolders", "status_changed")

conn.commit()
38 changes: 37 additions & 1 deletion src/ai/backend/manager/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ class FixtureOpModes(enum.StrEnum):
UPDATE = "update"


T_Enum = TypeVar("T_Enum", bound=enum.Enum)
T_Enum = TypeVar("T_Enum", bound=enum.Enum, covariant=True)
T_StrEnum = TypeVar("T_StrEnum", bound=enum.Enum, covariant=True)


class EnumType(TypeDecorator, SchemaType, Generic[T_Enum]):
Expand Down Expand Up @@ -205,6 +206,41 @@ def python_type(self) -> T_Enum:
return self._enum_class


class StrEnumType(TypeDecorator, Generic[T_StrEnum]):
"""
Maps Postgres VARCHAR(64) column with a Python enum.StrEnum type.
"""

impl = sa.VARCHAR
cache_ok = True

def __init__(self, enum_cls: type[T_StrEnum], **opts) -> None:
self._opts = opts
super().__init__(length=64, **opts)
self._enum_cls = enum_cls

def process_bind_param(
self,
value: Optional[T_StrEnum],
dialect: Dialect,
) -> Optional[str]:
return value.value if value is not None else None

def process_result_value(
self,
value: str,
dialect: Dialect,
) -> Optional[T_StrEnum]:
return self._enum_cls(value) if value is not None else None

def copy(self, **kw) -> type[Self]:
return StrEnumType(self._enum_cls, **self._opts)

@property
def python_type(self) -> T_StrEnum:
return self._enum_class


class CurvePublicKeyColumn(TypeDecorator):
"""
A column type wrapper for string-based Z85-encoded CURVE public key.
Expand Down
7 changes: 4 additions & 3 deletions src/ai/backend/manager/models/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
OrderExprArg,
PaginatedList,
QuotaScopeIDType,
StrEnumType,
batch_multiresult,
generate_sql_info_for_gql_connection,
metadata,
Expand Down Expand Up @@ -253,9 +254,9 @@ class VFolderCloneInfo(NamedTuple):
sa.Column("cloneable", sa.Boolean, default=False, nullable=False),
sa.Column(
"status",
EnumValueType(VFolderOperationStatus),
StrEnumType(VFolderOperationStatus),
default=VFolderOperationStatus.READY,
server_default=VFolderOperationStatus.READY.value,
server_default=VFolderOperationStatus.READY,
nullable=False,
),
# status_history records the most recent status changes for each status
Expand Down Expand Up @@ -1207,7 +1208,7 @@ async def resolve_num_files(self, info: graphene.ResolveInfo) -> int:
"cloneable": ("vfolders_cloneable", None),
"status": (
"vfolders_status",
enum_field_getter(VFolderOperationStatus),
lambda s: VFolderOperationStatus(s),
),
}

Expand Down

0 comments on commit 1fec5cf

Please sign in to comment.