migrate
legnaleurc committed Dec 17, 2023
1 parent cd8a6c9 commit 930b064
Showing 8 changed files with 217 additions and 117 deletions.
File renamed without changes.
File renamed without changes.
@@ -1,15 +1,13 @@
 import sqlite3
 from contextlib import closing, contextmanager
 from logging import getLogger
+from pathlib import Path
 
-from wcpan.drive.core.drive import Node
+from wcpan.drive.core.types import Node
 
 
-DATABASE_PATH = "./data/_migrated.sqlite"
-
-
-def initialize_cache():
-    with migration_cache() as query:
+def initialize_cache(dsn: Path):
+    with migration_cache(dsn) as query:
         try:
             query.execute(
                 """
@@ -30,84 +28,84 @@ def initialize_cache():
         getLogger(__name__).exception("cache initialize failed")
 
 
-def is_migrated(node: Node):
-    with migration_cache() as query:
+def is_migrated(dsn: Path, node: Node):
+    with migration_cache(dsn) as query:
         query.execute(
             """
             SELECT id FROM migrated WHERE node_id = ? AND is_muxed = 1 AND is_coded = 1;
             """,
-            (node.id_,),
+            (node.id,),
         )
         row = query.fetchone()
         if not row:
             return False
         return True
 
 
-def has_cache(node: Node):
-    with migration_cache() as query:
+def has_cache(dsn: Path, node: Node):
+    with migration_cache(dsn) as query:
         query.execute(
             """
             SELECT id FROM migrated WHERE node_id = ?;
             """,
-            (node.id_,),
+            (node.id,),
         )
         row = query.fetchone()
         if not row:
             return False
         return True
 
 
-def need_transcode(node: Node):
-    with migration_cache() as query:
+def need_transcode(dsn: Path, node: Node):
+    with migration_cache(dsn) as query:
         query.execute(
             """
             SELECT id FROM migrated WHERE node_id = ? AND is_coded = 0;
             """,
-            (node.id_,),
+            (node.id,),
        )
         row = query.fetchone()
         if not row:
             return False
         return True
 
 
-def set_cache(node: Node, is_muxed: bool, is_coded: bool):
-    with migration_cache() as query:
+def set_cache(dsn: Path, node: Node, is_muxed: bool, is_coded: bool):
+    with migration_cache(dsn) as query:
         query.execute(
             """
             SELECT is_muxed, is_coded FROM migrated WHERE node_id = ?;
             """,
-            (node.id_,),
+            (node.id,),
         )
         row = query.fetchone()
         if not row:
             query.execute(
                 """
                 INSERT INTO migrated (node_id, is_muxed, is_coded) VALUES (?, ?, ?);
                 """,
-                (node.id_, is_muxed, is_coded),
+                (node.id, is_muxed, is_coded),
             )
             return
         query.execute(
             """
             UPDATE migrated SET is_muxed = ?, is_coded = ? WHERE node_id = ?;
             """,
-            (is_muxed, is_coded, node.id_),
+            (is_muxed, is_coded, node.id),
         )
 
 
-def unset_cache(node: Node):
-    with migration_cache() as query:
+def unset_cache(dsn: Path, node: Node):
+    with migration_cache(dsn) as query:
         query.execute(
             """
             DELETE FROM migrated WHERE node_id = ?;
             """,
-            (node.id_,),
+            (node.id,),
         )
 
 
 @contextmanager
-def migration_cache():
-    with sqlite3.connect(DATABASE_PATH) as db, closing(db.cursor()) as query:
+def migration_cache(dsn: Path):
+    with sqlite3.connect(dsn) as db, closing(db.cursor()) as query:
         yield query
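
For reference, a minimal sketch of how the reworked cache API is driven after this change (the SimpleNamespace stand-in for Node and the temporary directory are illustrative assumptions; real callers pass wcpan.drive.core.types.Node objects and a dsn under --data-path):

    from pathlib import Path
    from tempfile import TemporaryDirectory
    from types import SimpleNamespace

    # assuming initialize_cache, set_cache, is_migrated, need_transcode
    # are imported from the cache module above
    with TemporaryDirectory() as tmp:
        dsn = Path(tmp) / "_migrated.sqlite"
        initialize_cache(dsn)  # creates the migrated table at the given dsn

        node = SimpleNamespace(id="node-1")  # stand-in exposing the .id the cache reads
        set_cache(dsn, node, is_muxed=True, is_coded=False)
        assert need_transcode(dsn, node)   # is_coded is still 0
        assert not is_migrated(dsn, node)  # migrated requires both flags set

        set_cache(dsn, node, is_muxed=True, is_coded=True)
        assert is_migrated(dsn, node)

Passing dsn explicitly replaces the module-level DATABASE_PATH constant, so the cache location now follows the --data-path argument instead of a hard-coded ./data directory.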
drive/legacy/faststart/app/main.py → drive/app/faststart/main.py (97 changes: 55 additions & 42 deletions)
@@ -1,95 +1,105 @@
 #! /usr/bin/env python3
 
-import argparse
 import asyncio
 import sys
+from argparse import ArgumentParser, Namespace
 from concurrent.futures import Executor
+from contextlib import AsyncExitStack
 from logging import getLogger
 from logging.config import dictConfig
+from pathlib import PurePath, Path
 from tempfile import TemporaryDirectory
 
-from wcpan.drive.core.drive import Drive, DriveFactory
-from wcpan.drive.core.types import Node
-from wcpan.drive.core.util import create_executor
+from wcpan.drive.core.types import Node, Drive
+from wcpan.drive.cli.lib import create_drive_from_config
 from wcpan.logging import ConfigBuilder
 from wcpan.queue import AioQueue
 
+from app.lib import create_executor
+
 from .cache import initialize_cache
 from .processor import create_processor, is_oggmedia, is_realmedia
 
 
-async def main(args: list[str] = None):
-    if args is None:
-        args = sys.argv
-
+async def main(args: list[str] | None = None):
     kwargs = parse_args(args)
-    root_path_list: list[str] = kwargs.root_path
+    config_path = Path(kwargs.config_path)
+    data_path = Path(kwargs.data_path)
+    root_path_list = [PurePath(_) for _ in kwargs.root_path]
     remux_only: bool = kwargs.remux_only
     transcode_only: bool = kwargs.transcode_only
     cache_only: bool = kwargs.cache_only
     jobs: int = kwargs.jobs
     tmp_path: str | None = kwargs.tmp_path
 
-    initialize_cache()
+    data_path.mkdir(exist_ok=True, parents=True)
+
+    dsn = data_path / "_migrated.sqlite"
+    initialize_cache(dsn)
     dictConfig(
-        ConfigBuilder(path="./data/migrate.log")
+        ConfigBuilder(path=data_path / "migrate.log")
         .add("wcpan")
         .add("app", level="D")
         .to_dict()
     )
 
-    factory = DriveFactory()
-    factory.load_config()
-
-    with create_executor() as pool, TemporaryDirectory(
-        dir=tmp_path
-    ) as work_folder, AioQueue.fifo() as queue:
-        async with factory(pool=pool) as drive:
-            async for change in drive.sync():
-                getLogger(__name__).debug(change)
-
-            async for video_file in walk_root_list(drive, root_path_list):
-                await queue.push(
-                    node_work(
-                        video_file,
-                        drive=drive,
-                        work_folder=work_folder,
-                        pool=pool,
-                        remux_only=remux_only,
-                        transcode_only=transcode_only,
-                        cache_only=cache_only,
-                    )
-                )
-
-            await queue.consume(jobs)
+    async with AsyncExitStack() as stack:
+        pool = stack.enter_context(create_executor())
+        work_folder = Path(stack.enter_context(TemporaryDirectory(dir=tmp_path)))
+        queue = stack.enter_context(AioQueue[None].fifo())
+        drive = await stack.enter_async_context(create_drive_from_config(config_path))
+
+        async for change in drive.sync():
+            getLogger(__name__).debug(change)
+
+        async for video_file in walk_root_list(drive, root_path_list):
+            await queue.push(
+                node_work(
+                    video_file,
+                    drive=drive,
+                    work_folder=work_folder,
+                    dsn=dsn,
+                    pool=pool,
+                    remux_only=remux_only,
+                    transcode_only=transcode_only,
+                    cache_only=cache_only,
+                )
+            )
+
+        await queue.consume(jobs)
 
     return 0
 
 
-def parse_args(args: list[str]) -> argparse.Namespace:
-    parser = argparse.ArgumentParser("app")
+def parse_args(args: list[str] | None) -> Namespace:
+    if args is None:
+        args = sys.argv
+
+    parser = ArgumentParser("app")
 
-    parser.add_argument("--tmp-path", type=str)
-    parser.add_argument("--jobs", "-j", default=1)
+    parser.add_argument("--config-path", required=True, type=str)
+    parser.add_argument("--data-path", required=True, type=str)
 
     mutex_group = parser.add_mutually_exclusive_group()
     mutex_group.add_argument("--remux-only", action="store_true", default=False)
     mutex_group.add_argument("--transcode-only", action="store_true", default=False)
     mutex_group.add_argument("--cache-only", action="store_true", default=False)
 
+    parser.add_argument("--jobs", "-j", type=int, default=1)
+    parser.add_argument("--tmp-path", type=str)
+
     parser.add_argument("root_path", type=str, nargs="+")
 
     return parser.parse_args(args[1:])
 
 
-async def walk_root_list(drive: Drive, root_list: list[str]):
+async def walk_root_list(drive: Drive, root_list: list[PurePath]):
     for root_path in root_list:
         root_node = await drive.get_node_by_path(root_path)
         # TODO add log
         if not root_node:
             continue
 
-        async for root, folders, files in drive.walk(root_node):
+        async for _root, _folders, files in drive.walk(root_node):
             for file_ in files:
                 if (
                     not file_.is_video
@@ -104,14 +114,17 @@ async def node_work(
     node: Node,
     *,
     drive: Drive,
-    work_folder: str,
+    work_folder: Path,
+    dsn: Path,
     pool: Executor,
     remux_only: bool,
     transcode_only: bool,
     cache_only: bool,
 ):
     getLogger(__name__).info(f"processing {node.name}")
-    processor = create_processor(work_folder, pool, drive, node)
+    processor = create_processor(
+        work_folder=work_folder, dsn=dsn, pool=pool, drive=drive, node=node
+    )
     if not processor:
         getLogger(__name__).warning(f"no processor for {node.name}")
         return
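
The main structural change above replaces the nested with/async with blocks by a single AsyncExitStack, so synchronous and asynchronous resources share one flat scope and are torn down together. A minimal, self-contained sketch of the pattern (resource names here are illustrative, not the project's):

    import asyncio
    from contextlib import AsyncExitStack, asynccontextmanager, contextmanager

    @contextmanager
    def sync_resource(name: str):
        # stands in for create_executor() / TemporaryDirectory()
        print(f"open {name}")
        try:
            yield name
        finally:
            print(f"close {name}")

    @asynccontextmanager
    async def async_resource(name: str):
        # stands in for create_drive_from_config(...)
        print(f"open {name}")
        try:
            yield name
        finally:
            print(f"close {name}")

    async def main() -> None:
        async with AsyncExitStack() as stack:
            pool = stack.enter_context(sync_resource("pool"))
            drive = await stack.enter_async_context(async_resource("drive"))
            print(f"work with {pool} and {drive}")
        # everything entered on the stack is closed here, in reverse order

    asyncio.run(main())

With the reworked parse_args, an invocation would look something like (module path and file names here are illustrative): python -m app.faststart --config-path ./config.yaml --data-path ./data -j 2 /remote/videos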