Skip to content

Commit

Permalink
Replace janus queue with stdlib queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Mar 7, 2024
1 parent e9ef02b commit 078131c
Showing 1 changed file with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import sys
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby
from queue import Queue
from typing import Any, Final, Mapping, cast

import janus
import sqlalchemy as sa
import trafaret as t
from alembic import op
Expand Down Expand Up @@ -69,7 +69,7 @@ def get_async_etcd() -> AsyncEtcd:


def migrate_data_etcd_to_psql():
queue = janus.Queue()
queue = Queue()

with ThreadPoolExecutor() as executor:

Expand All @@ -82,8 +82,9 @@ def backup(etcd_container_registries: Mapping[str, Any]):

# If there are no container registries, it returns an empty list.
# If an error occurs while saving backup, it returns error.
def take_etcd_container_registries(queue: janus.Queue):
async def _take_container_registries(etcd: AsyncEtcd):
def take_etcd_container_registries(queue: Queue):
async def _take_container_registries():
etcd = get_async_etcd()
result = await etcd.get_prefix(ETCD_CONTAINER_REGISTRY_KEY)
try:
backup(result)
Expand All @@ -92,12 +93,11 @@ async def _take_container_registries(etcd: AsyncEtcd):
await etcd.delete_prefix(ETCD_CONTAINER_REGISTRY_KEY)
return result

etcd = get_async_etcd()
queue.sync_q.put(asyncio.run(_take_container_registries(etcd)))
queue.put(asyncio.run(_take_container_registries()))

executor.submit(take_etcd_container_registries, queue)

maybe_registries = queue.sync_q.get()
maybe_registries = queue.get()

if isinstance(maybe_registries, Exception):
err_msg = (
Expand Down Expand Up @@ -189,20 +189,20 @@ def merge_items(items):

merged_items = [merge_items(items) for items in grouped_items.values()]

def put_etcd_container_registries(merged_items: list[Any], queue: janus.Queue):
def put_etcd_container_registries(merged_items: list[Any], queue: Queue):
etcd = get_async_etcd()
for item in merged_items:
hostname = item.pop("hostname")
asyncio.run(etcd.put_prefix(f"{ETCD_CONTAINER_REGISTRY_KEY}/{hostname}", item))

queue.sync_q.put(True)
queue.put(True)

queue = janus.Queue()
queue = Queue()

with ThreadPoolExecutor() as executor:
executor.submit(put_etcd_container_registries, merged_items, queue)

queue.sync_q.get()
queue.get()


def upgrade():
Expand Down

0 comments on commit 078131c

Please sign in to comment.