From 6aa82c01091e3a3c21bb458958a7094a3d16e58f Mon Sep 17 00:00:00 2001 From: EdgeNeko Date: Thu, 4 Jul 2024 21:13:51 +0800 Subject: [PATCH] Rebuild local utilities --- app/Controllers/admin.py | 20 +++-- app/Models/errors.py | 10 +++ app/Services/index_service.py | 7 +- app/Services/storage/local_storage.py | 17 ++-- app/Services/storage/s3_compatible_storage.py | 3 +- app/Services/upload_service.py | 23 ++++- app/util/local_file_utility.py | 12 +++ main.py | 2 +- readme.md | 40 +++++---- scripts/local_indexing.py | 86 +++++-------------- 10 files changed, 108 insertions(+), 112 deletions(-) create mode 100644 app/Models/errors.py create mode 100644 app/util/local_file_utility.py diff --git a/app/Controllers/admin.py b/app/Controllers/admin.py index 97dc420..b4c3416 100644 --- a/app/Controllers/admin.py +++ b/app/Controllers/admin.py @@ -13,12 +13,14 @@ from app.Models.api_response.admin_api_response import ServerInfoResponse, ImageUploadResponse, \ DuplicateValidationResponse from app.Models.api_response.base import NekoProtocol +from app.Models.errors import PointDuplicateError from app.Models.img_data import ImageData from app.Services.authentication import force_admin_token_verify from app.Services.provider import ServiceProvider from app.Services.vector_db_context import PointNotFoundError from app.config import config -from app.util.generate_uuid import generate_uuid, generate_uuid_from_sha1 +from app.util.generate_uuid import generate_uuid_from_sha1 +from app.util.local_file_utility import VALID_IMAGE_EXTENSIONS admin_router = APIRouter(dependencies=[Depends(force_admin_token_verify)], tags=["Admin"]) @@ -106,19 +108,19 @@ async def upload_image(image_file: Annotated[UploadFile, File(description="The i img_type = IMAGE_MIMES[image_file.content_type.lower()] elif image_file.filename: extension = PurePath(image_file.filename).suffix.lower() - if extension in {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'}: + if extension in VALID_IMAGE_EXTENSIONS: img_type 
= extension[1:] if not img_type: logger.warning("Failed to infer image format of the uploaded image. Content Type: {}, Filename: {}", image_file.content_type, image_file.filename) raise HTTPException(415, "Unsupported image format.") img_bytes = await image_file.read() - img_id = generate_uuid(img_bytes) - if img_id in services.upload_service.uploading_ids or len( - await services.db_context.validate_ids([str(img_id)])) != 0: # check for duplicate points - logger.warning("Duplicate upload request for image id: {}", img_id) - raise HTTPException(409, f"The uploaded point is already contained in the database! entity id: {img_id}") - + try: + img_id = await services.upload_service.assign_image_id(img_bytes) + except PointDuplicateError as ex: + raise HTTPException(409, + f"The uploaded point is already contained in the database! entity id: {ex.entity_id}") \ + from ex try: image = Image.open(BytesIO(img_bytes)) image.verify() @@ -136,7 +138,7 @@ async def upload_image(image_file: Annotated[UploadFile, File(description="The i format=img_type, index_date=datetime.now()) - await services.upload_service.upload_image(image_data, img_bytes, model.skip_ocr, model.local_thumbnail) + await services.upload_service.queue_upload_image(image_data, img_bytes, model.skip_ocr, model.local_thumbnail) return ImageUploadResponse(message="OK. 
Image added to upload queue.", image_id=img_id) diff --git a/app/Models/errors.py b/app/Models/errors.py new file mode 100644 index 0000000..921399b --- /dev/null +++ b/app/Models/errors.py @@ -0,0 +1,10 @@ +from uuid import UUID + + +class PointDuplicateError(ValueError): + def __init__(self, message: str, entity_id: UUID | None = None): + self.message = message + self.entity_id = entity_id + super().__init__(message) + + pass diff --git a/app/Services/index_service.py b/app/Services/index_service.py index 0b673a0..00cdcf5 100644 --- a/app/Services/index_service.py +++ b/app/Services/index_service.py @@ -1,6 +1,7 @@ from PIL import Image from fastapi.concurrency import run_in_threadpool +from app.Models.errors import PointDuplicateError from app.Models.img_data import ImageData from app.Services.lifespan_service import LifespanService from app.Services.ocr_services import OCRService @@ -9,10 +10,6 @@ from app.config import config -class PointDuplicateError(ValueError): - pass - - class IndexService(LifespanService): def __init__(self, ocr_service: OCRService, transformers_service: TransformersService, db_context: VectorDbContext): self._ocr_service = ocr_service @@ -45,7 +42,7 @@ async def _is_point_duplicate(self, image_data: list[ImageData]) -> bool: async def index_image(self, image: Image.Image, image_data: ImageData, skip_ocr=False, skip_duplicate_check=False, background=False): if not skip_duplicate_check and (await self._is_point_duplicate([image_data])): - raise PointDuplicateError("The uploaded points are contained in the database!") + raise PointDuplicateError("The uploaded points are contained in the database!", image_data.id) if background: await run_in_threadpool(self._prepare_image, image, image_data, skip_ocr) diff --git a/app/Services/storage/local_storage.py b/app/Services/storage/local_storage.py index 88c26cf..fe04a8a 100644 --- a/app/Services/storage/local_storage.py +++ b/app/Services/storage/local_storage.py @@ -5,13 +5,13 @@ from typing 
import Optional, AsyncGenerator import aiofiles -from aiopath import Path as asyncPath from loguru import logger from app.Services.storage.base import BaseStorage, FileMetaDataT, RemoteFilePathType, LocalFilePathType from app.Services.storage.exception import RemoteFileNotFoundError, LocalFileNotFoundError, RemoteFilePermissionError, \ LocalFilePermissionError, LocalFileExistsError, RemoteFileExistsError from app.config import config +from app.util.local_file_utility import glob_local_files def transform_exception(param: str): @@ -129,16 +129,13 @@ async def list_files(self, batch_max_files: Optional[int] = None, valid_extensions: Optional[set[str]] = None) \ -> AsyncGenerator[list[RemoteFilePathType], None]: - _path = asyncPath(self.file_path_warp(path)) + local_path = self.file_path_warp(path) files = [] - if valid_extensions is None: - valid_extensions = {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'} - async for file in _path.glob(pattern): - if file.suffix.lower() in valid_extensions: - files.append(syncPath(file)) - if batch_max_files is not None and len(files) == batch_max_files: - yield files - files = [] + for file in glob_local_files(local_path, pattern, valid_extensions): + files.append(file) + if batch_max_files is not None and len(files) == batch_max_files: + yield files + files = [] if files: yield files diff --git a/app/Services/storage/s3_compatible_storage.py b/app/Services/storage/s3_compatible_storage.py index 9199c4d..195b751 100644 --- a/app/Services/storage/s3_compatible_storage.py +++ b/app/Services/storage/s3_compatible_storage.py @@ -18,6 +18,7 @@ from app.Services.storage.exception import LocalFileNotFoundError, RemoteFileNotFoundError, RemoteFilePermissionError, \ RemoteFileExistsError from app.config import config +from app.util.local_file_utility import VALID_IMAGE_EXTENSIONS def transform_exception(func): @@ -138,7 +139,7 @@ async def list_files(self, valid_extensions: Optional[set[str]] = None) \ -> 
AsyncGenerator[list[RemoteFilePathType], None]: if valid_extensions is None: - valid_extensions = {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'} + valid_extensions = VALID_IMAGE_EXTENSIONS files = [] # In opendal, current path should be "" instead of "." _path = "" if self._file_path_str_warp(path) == "." else self._file_path_str_warp(path) diff --git a/app/Services/upload_service.py b/app/Services/upload_service.py index 6520a0c..d5f7585 100644 --- a/app/Services/upload_service.py +++ b/app/Services/upload_service.py @@ -1,17 +1,21 @@ import asyncio import gc +import io +import pathlib from io import BytesIO from PIL import Image from loguru import logger from app.Models.api_models.admin_query_params import UploadImageThumbnailMode +from app.Models.errors import PointDuplicateError from app.Models.img_data import ImageData -from app.Services.lifespan_service import LifespanService from app.Services.index_service import IndexService +from app.Services.lifespan_service import LifespanService from app.Services.storage import StorageService from app.Services.vector_db_context import VectorDbContext from app.config import config +from app.util.generate_uuid import generate_uuid class UploadService(LifespanService): @@ -75,11 +79,24 @@ async def _upload_task(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bo img.close() - async def upload_image(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bool, - thumbnail_mode: UploadImageThumbnailMode): + async def queue_upload_image(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bool, + thumbnail_mode: UploadImageThumbnailMode): self.uploading_ids.add(img_data.id) await self._queue.put((img_data, img_bytes, skip_ocr, thumbnail_mode)) logger.success("Image {} added to upload queue. 
Queue Length: {} [+1]", img_data.id, self._queue.qsize()) + async def assign_image_id(self, img_file: pathlib.Path | io.BytesIO | bytes): + img_id = generate_uuid(img_file) + # check for duplicate points + if img_id in self.uploading_ids or len(await self._db_context.validate_ids([str(img_id)])) != 0: + logger.warning("Duplicate upload request for image id: {}", img_id) + raise PointDuplicateError(f"The uploaded point is already contained in the database! entity id: {img_id}", + img_id) + return img_id + + async def sync_upload_image(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bool, + thumbnail_mode: UploadImageThumbnailMode): + await self._upload_task(img_data, img_bytes, skip_ocr, thumbnail_mode) + def get_queue_size(self): return self._queue.qsize() diff --git a/app/util/local_file_utility.py b/app/util/local_file_utility.py new file mode 100644 index 0000000..389dcc9 --- /dev/null +++ b/app/util/local_file_utility.py @@ -0,0 +1,12 @@ +from pathlib import Path + +VALID_IMAGE_EXTENSIONS = {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'} + + +def glob_local_files(path: Path, pattern: str = "*", valid_extensions: set[str] = None): + if valid_extensions is None: + valid_extensions = VALID_IMAGE_EXTENSIONS + + for file in path.glob(pattern): + if file.suffix.lower() in valid_extensions: + yield file diff --git a/main.py b/main.py index aa9d312..4d41847 100644 --- a/main.py +++ b/main.py @@ -62,7 +62,7 @@ def parse_args(): environment.local_indexing = True from scripts import local_indexing - asyncio.run(local_indexing.main(args)) + asyncio.run(local_indexing.main(args.local_index_target_dir)) elif args.local_create_thumbnail: from scripts import local_create_thumbnail diff --git a/readme.md b/readme.md index 3126068..c85d0e0 100644 --- a/readme.md +++ b/readme.md @@ -88,30 +88,13 @@ Local file storage does not require an additional database deployment process, b ``` This operation will create a collection in the Qdrant database with the same name as 
`config.QDRANT_COLL` to store image vectors. -7. (Optional) In development deployment and small-scale deployment, you can use the built-in static file indexing and - service functions of this application. Use the following command to index your local image directory: - ```shell - python main.py --local-index - ``` - This operation will copy all image files in the `` directory to - the `config.STATIC_FILE_PATH` directory (default is `./static`) and write the image information to the Qdrant - database. - - Then run the following command to generate thumbnails for all images in the static directory: - - ```shell - python main.py --local-create-thumbnail - ``` - - If you want to deploy on a large scale, you can use OSS storage services like `MinIO` to store image files in OSS and - then write the image information to the Qdrant database. -8. Run this application: +7. Run this application: ```shell python main.py ``` You can use `--host` to specify the IP address you want to bind to (default is 0.0.0.0) and `--port` to specify the port you want to bind to (default is 8000). -9. (Optional) Deploy the front-end application: [NekoImageGallery.App](https://github.com/hv0905/NekoImageGallery.App) +8. (Optional) Deploy the front-end application: [NekoImageGallery.App](https://github.com/hv0905/NekoImageGallery.App) is a simple web front-end application for this project. If you want to deploy it, please refer to its [deployment documentation](https://github.com/hv0905/NekoImageGallery.App). @@ -172,6 +155,25 @@ the [official documentation](https://docs.docker.com/config/containers/resource_ docker compose up -d ``` +### Upload images to NekoImageGallery + +There are several ways to upload images to NekoImageGallery + +- Through the web interface: You can use the web interface to upload images to the server. The web interface is provided + by [NekoImageGallery.App](https://github.com/hv0905/NekoImageGallery.App). 
Make sure you have enabled the **Admin API** + and set your **Admin Token** in the configuration file. +- Through local indexing: This is suitable for local deployment or when the images you want to upload are already on the + server. + Use the following command to index your local image directory: + ```shell + python main.py --local-index + ``` + The above command will recursively upload all images in the specified directory and its subdirectories to the server. +- Through the API: You can use the upload API provided by NekoImageGallery to upload images. + Make sure you have enabled the **Admin API** and set your **Admin Token** in the configuration file. + This method is suitable for automated image uploading. Check out [API documentation](#-api-documentation) for more + information. + ## 📚 API Documentation The API documentation is provided by FastAPI's built-in Swagger UI. You can access the API documentation by visiting
await services.upload_service.assign_image_id(file_path) + image_data = ImageData(id=img_id, + local=True, + categories=[], + starred=False, + format=file_path.suffix[1:], # remove the dot + index_date=datetime.now()) + await services.upload_service.sync_upload_image(image_data, file_path.read_bytes(), skip_ocr=False, + thumbnail_mode=UploadImageThumbnailMode.IF_NECESSARY) + except PointDuplicateError as ex: + logger.warning("Image {} already exists in the database", file_path) except PIL.UnidentifiedImageError as e: - logger.error("Error when opening image {}: {}", file_path, e) - return - image_id = uuid.UUID(uuid_str) if uuid_str else generate_uuid(file_path) - img_ext = file_path.suffix - imgdata = ImageData(id=image_id, - url=await services.storage_service.active_storage.url(f'{image_id}{img_ext}'), - index_date=datetime.now(), - format=img_ext[1:], - local=True) - try: - # This has already been checked for duplicated, so there's no need to double-check. - await services.index_service.index_image(img, imgdata, skip_duplicate_check=True) - except Exception as e: logger.error("Error when processing image {}: {}", file_path, e) - return - # copy to static - await services.storage_service.active_storage.upload(file_path, f'{image_id}{img_ext}') - - -async def copy_and_index_batch(file_path_list: list[tuple[Path, str]]): - for file_path_uuid_tuple in file_path_list: - await copy_and_index(file_path_uuid_tuple[0], uuid_str=file_path_uuid_tuple[1]) @logger.catch() -async def main(args): +async def main(root_directory): global services - config.storage.method = StorageMode.LOCAL # ensure to use LocalStorage services = ServiceProvider() await services.onload() - root = Path(args.local_index_target_dir) - # First, check if the database is empty - item_number = await services.db_context.get_counts(exact=False) - if item_number == 0: - # database is empty, do as usual - logger.warning("The database is empty, Will not check for duplicate points.") - async for item in 
services.storage_service.active_storage.list_files(root, batch_max_files=1): - await copy_and_index(item[0]) - else: - # database is not empty, check for duplicate points - logger.warning("The database is not empty, Will check for duplicate points.") - async for itm in services.storage_service.active_storage.list_files(root, batch_max_files=5000): - local_file_path_with_uuid_list = fetch_path_uuid_list(itm) - local_file_uuid_list = [itm[1] for itm in local_file_path_with_uuid_list] - duplicate_uuid_list = await services.db_context.validate_ids(local_file_uuid_list) - if len(duplicate_uuid_list) > 0: - duplicate_uuid_list = set(duplicate_uuid_list) - local_file_path_with_uuid_list = [item for item in local_file_path_with_uuid_list - if item[1] not in duplicate_uuid_list] - logger.info("Found {} duplicate points, of which {} are duplicates in the database. " - "The remaining {} points will be indexed.", - len(itm) - len(local_file_path_with_uuid_list), len(duplicate_uuid_list), - len(local_file_path_with_uuid_list)) - else: - logger.info("Found {} duplicate points, of which {} are duplicates in the database." - " The remaining {} points will be indexed.", - 0, 0, len(local_file_path_with_uuid_list)) - await copy_and_index_batch(local_file_path_with_uuid_list) + root = Path(root_directory) + + for idx, item in enumerate(glob_local_files(root, '**/*')): + logger.info("[{}] Indexing {}", idx, str(item)) + await index_task(item) - logger.success("Indexing completed! {} images indexed", overall_count) + logger.success("Indexing completed!")