From 078131ca295d233a7546fb92f14f42bfe01a939c Mon Sep 17 00:00:00 2001 From: Gyubong Lee Date: Thu, 7 Mar 2024 06:16:40 +0000 Subject: [PATCH] Replace janus queue with stdlib queue --- ...2c726d8a3_add_container_registry_tables.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/ai/backend/manager/models/alembic/versions/1d42c726d8a3_add_container_registry_tables.py b/src/ai/backend/manager/models/alembic/versions/1d42c726d8a3_add_container_registry_tables.py index 458f6f2738..84ab83839b 100644 --- a/src/ai/backend/manager/models/alembic/versions/1d42c726d8a3_add_container_registry_tables.py +++ b/src/ai/backend/manager/models/alembic/versions/1d42c726d8a3_add_container_registry_tables.py @@ -12,9 +12,9 @@ import sys from concurrent.futures import ThreadPoolExecutor from itertools import groupby +from queue import Queue from typing import Any, Final, Mapping, cast -import janus import sqlalchemy as sa import trafaret as t from alembic import op @@ -69,7 +69,7 @@ def get_async_etcd() -> AsyncEtcd: def migrate_data_etcd_to_psql(): - queue = janus.Queue() + queue = Queue() with ThreadPoolExecutor() as executor: @@ -82,8 +82,9 @@ def backup(etcd_container_registries: Mapping[str, Any]): # If there are no container registries, it returns an empty list. # If an error occurs while saving backup, it returns error. - def take_etcd_container_registries(queue: janus.Queue): - async def _take_container_registries(etcd: AsyncEtcd): + def take_etcd_container_registries(queue: Queue): + async def _take_container_registries(): + etcd = get_async_etcd() result = await etcd.get_prefix(ETCD_CONTAINER_REGISTRY_KEY) try: backup(result) @@ -92,12 +93,11 @@ async def _take_container_registries(etcd: AsyncEtcd): await etcd.delete_prefix(ETCD_CONTAINER_REGISTRY_KEY) return result - etcd = get_async_etcd() - queue.sync_q.put(asyncio.run(_take_container_registries(etcd))) + queue.put(asyncio.run(_take_container_registries())) executor.submit(take_etcd_container_registries, queue) - maybe_registries = queue.sync_q.get() + maybe_registries = queue.get() if isinstance(maybe_registries, Exception): err_msg = ( @@ -189,20 +189,20 @@ def merge_items(items): merged_items = [merge_items(items) for items in grouped_items.values()] - def put_etcd_container_registries(merged_items: list[Any], queue: janus.Queue): + def put_etcd_container_registries(merged_items: list[Any], queue: Queue): etcd = get_async_etcd() for item in merged_items: hostname = item.pop("hostname") asyncio.run(etcd.put_prefix(f"{ETCD_CONTAINER_REGISTRY_KEY}/{hostname}", item)) - queue.sync_q.put(True) + queue.put(True) - queue = janus.Queue() + queue = Queue() with ThreadPoolExecutor() as executor: executor.submit(put_etcd_container_registries, merged_items, queue) - queue.sync_q.get() + queue.get() def upgrade():