Rebuild local utilities
hv0905 committed Jul 4, 2024
1 parent d01e75e commit 6aa82c0
Showing 10 changed files with 108 additions and 112 deletions.
20 changes: 11 additions & 9 deletions app/Controllers/admin.py
@@ -13,12 +13,14 @@
from app.Models.api_response.admin_api_response import ServerInfoResponse, ImageUploadResponse, \
DuplicateValidationResponse
from app.Models.api_response.base import NekoProtocol
+ from app.Models.errors import PointDuplicateError
from app.Models.img_data import ImageData
from app.Services.authentication import force_admin_token_verify
from app.Services.provider import ServiceProvider
from app.Services.vector_db_context import PointNotFoundError
from app.config import config
- from app.util.generate_uuid import generate_uuid, generate_uuid_from_sha1
+ from app.util.generate_uuid import generate_uuid_from_sha1
+ from app.util.local_file_utility import VALID_IMAGE_EXTENSIONS

admin_router = APIRouter(dependencies=[Depends(force_admin_token_verify)], tags=["Admin"])

@@ -106,19 +108,19 @@ async def upload_image(image_file: Annotated[UploadFile, File(description="The i
img_type = IMAGE_MIMES[image_file.content_type.lower()]
elif image_file.filename:
extension = PurePath(image_file.filename).suffix.lower()
-         if extension in {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'}:
+         if extension in VALID_IMAGE_EXTENSIONS:
img_type = extension[1:]
if not img_type:
logger.warning("Failed to infer image format of the uploaded image. Content Type: {}, Filename: {}",
image_file.content_type, image_file.filename)
raise HTTPException(415, "Unsupported image format.")
img_bytes = await image_file.read()
-     img_id = generate_uuid(img_bytes)
-     if img_id in services.upload_service.uploading_ids or len(
-             await services.db_context.validate_ids([str(img_id)])) != 0:  # check for duplicate points
-         logger.warning("Duplicate upload request for image id: {}", img_id)
-         raise HTTPException(409, f"The uploaded point is already contained in the database! entity id: {img_id}")
-
+     try:
+         img_id = await services.upload_service.assign_image_id(img_bytes)
+     except PointDuplicateError as ex:
+         raise HTTPException(409,
+                             f"The uploaded point is already contained in the database! entity id: {ex.entity_id}") \
+             from ex
try:
image = Image.open(BytesIO(img_bytes))
image.verify()
@@ -136,7 +138,7 @@ async def upload_image(image_file: Annotated[UploadFile, File(description="The i
format=img_type,
index_date=datetime.now())

-     await services.upload_service.upload_image(image_data, img_bytes, model.skip_ocr, model.local_thumbnail)
+     await services.upload_service.queue_upload_image(image_data, img_bytes, model.skip_ocr, model.local_thumbnail)
return ImageUploadResponse(message="OK. Image added to upload queue.", image_id=img_id)


10 changes: 10 additions & 0 deletions app/Models/errors.py
@@ -0,0 +1,10 @@
+ from uuid import UUID
+
+
+ class PointDuplicateError(ValueError):
+     def __init__(self, message: str, entity_id: UUID | None = None):
+         self.message = message
+         self.entity_id = entity_id
+         super().__init__(message)
+
+     pass
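
For reference, the replacement exception carries the conflicting point's UUID on the instance, so callers can report it without parsing the message. A minimal catch-pattern sketch (the `assign_id` callable is a hypothetical stand-in for a coroutine like `assign_image_id` below, not part of this commit):

```python
from app.Models.errors import PointDuplicateError

async def report_duplicate(assign_id, img_bytes: bytes):
    try:
        return await assign_id(img_bytes)
    except PointDuplicateError as ex:
        # entity_id is optional and may be None if the raiser had no UUID at hand
        print(f"duplicate image, existing point: {ex.entity_id}")
        raise
```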
7 changes: 2 additions & 5 deletions app/Services/index_service.py
@@ -1,6 +1,7 @@
from PIL import Image
from fastapi.concurrency import run_in_threadpool

+ from app.Models.errors import PointDuplicateError
from app.Models.img_data import ImageData
from app.Services.lifespan_service import LifespanService
from app.Services.ocr_services import OCRService
@@ -9,10 +10,6 @@
from app.config import config


- class PointDuplicateError(ValueError):
-     pass
-
-
class IndexService(LifespanService):
def __init__(self, ocr_service: OCRService, transformers_service: TransformersService, db_context: VectorDbContext):
self._ocr_service = ocr_service
@@ -45,7 +42,7 @@ async def _is_point_duplicate(self, image_data: list[ImageData]) -> bool:
async def index_image(self, image: Image.Image, image_data: ImageData, skip_ocr=False, skip_duplicate_check=False,
background=False):
if not skip_duplicate_check and (await self._is_point_duplicate([image_data])):
raise PointDuplicateError("The uploaded points are contained in the database!")
raise PointDuplicateError("The uploaded points are contained in the database!", image_data.id)

if background:
await run_in_threadpool(self._prepare_image, image, image_data, skip_ocr)
17 changes: 7 additions & 10 deletions app/Services/storage/local_storage.py
@@ -5,13 +5,13 @@
from typing import Optional, AsyncGenerator

import aiofiles
- from aiopath import Path as asyncPath
from loguru import logger

from app.Services.storage.base import BaseStorage, FileMetaDataT, RemoteFilePathType, LocalFilePathType
from app.Services.storage.exception import RemoteFileNotFoundError, LocalFileNotFoundError, RemoteFilePermissionError, \
LocalFilePermissionError, LocalFileExistsError, RemoteFileExistsError
from app.config import config
+ from app.util.local_file_utility import glob_local_files


def transform_exception(param: str):
@@ -129,16 +129,13 @@ async def list_files(self,
batch_max_files: Optional[int] = None,
valid_extensions: Optional[set[str]] = None) \
-> AsyncGenerator[list[RemoteFilePathType], None]:
-         _path = asyncPath(self.file_path_warp(path))
+         local_path = self.file_path_warp(path)
files = []
-         if valid_extensions is None:
-             valid_extensions = {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'}
-         async for file in _path.glob(pattern):
-             if file.suffix.lower() in valid_extensions:
-                 files.append(syncPath(file))
-                 if batch_max_files is not None and len(files) == batch_max_files:
-                     yield files
-                     files = []
+         for file in glob_local_files(local_path, pattern, valid_extensions):
+             files.append(file)
+             if batch_max_files is not None and len(files) == batch_max_files:
+                 yield files
+                 files = []
if files:
yield files

3 changes: 2 additions & 1 deletion app/Services/storage/s3_compatible_storage.py
@@ -18,6 +18,7 @@
from app.Services.storage.exception import LocalFileNotFoundError, RemoteFileNotFoundError, RemoteFilePermissionError, \
RemoteFileExistsError
from app.config import config
+ from app.util.local_file_utility import VALID_IMAGE_EXTENSIONS


def transform_exception(func):
@@ -138,7 +139,7 @@ async def list_files(self,
valid_extensions: Optional[set[str]] = None) \
-> AsyncGenerator[list[RemoteFilePathType], None]:
if valid_extensions is None:
-             valid_extensions = {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'}
+             valid_extensions = VALID_IMAGE_EXTENSIONS
files = []
# In opendal, current path should be "" instead of "."
_path = "" if self._file_path_str_warp(path) == "." else self._file_path_str_warp(path)
23 changes: 20 additions & 3 deletions app/Services/upload_service.py
@@ -1,17 +1,21 @@
import asyncio
import gc
+ import io
+ import pathlib
from io import BytesIO

from PIL import Image
from loguru import logger

from app.Models.api_models.admin_query_params import UploadImageThumbnailMode
+ from app.Models.errors import PointDuplicateError
from app.Models.img_data import ImageData
- from app.Services.lifespan_service import LifespanService
from app.Services.index_service import IndexService
+ from app.Services.lifespan_service import LifespanService
from app.Services.storage import StorageService
from app.Services.vector_db_context import VectorDbContext
from app.config import config
+ from app.util.generate_uuid import generate_uuid


class UploadService(LifespanService):
@@ -75,11 +79,24 @@ async def _upload_task(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bo

img.close()

-     async def upload_image(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bool,
-                            thumbnail_mode: UploadImageThumbnailMode):
+     async def queue_upload_image(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bool,
+                                  thumbnail_mode: UploadImageThumbnailMode):
self.uploading_ids.add(img_data.id)
await self._queue.put((img_data, img_bytes, skip_ocr, thumbnail_mode))
logger.success("Image {} added to upload queue. Queue Length: {} [+1]", img_data.id, self._queue.qsize())

+     async def assign_image_id(self, img_file: pathlib.Path | io.BytesIO | bytes):
+         img_id = generate_uuid(img_file)
+         # check for duplicate points
+         if img_id in self.uploading_ids or len(await self._db_context.validate_ids([str(img_id)])) != 0:
+             logger.warning("Duplicate upload request for image id: {}", img_id)
+             raise PointDuplicateError(f"The uploaded point is already contained in the database! entity id: {img_id}",
+                                       img_id)
+         return img_id
+
+     async def sync_upload_image(self, img_data: ImageData, img_bytes: bytes, skip_ocr: bool,
+                                 thumbnail_mode: UploadImageThumbnailMode):
+         await self._upload_task(img_data, img_bytes, skip_ocr, thumbnail_mode)
+
def get_queue_size(self):
return self._queue.qsize()
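
Taken together, the new surface splits ID assignment from the upload itself: `assign_image_id` runs the duplicate check up front, after which a caller picks the queued path (`queue_upload_image`, as the admin controller above does) or the blocking path (`sync_upload_image`, as the rewritten indexing script below does). A minimal caller sketch, assuming an already-initialized `ServiceProvider` named `services` and a caller-known format string:

```python
from datetime import datetime

from app.Models.api_models.admin_query_params import UploadImageThumbnailMode
from app.Models.img_data import ImageData

async def upload_one(services, img_bytes: bytes, fmt: str = "png"):
    # Raises PointDuplicateError if the UUID is already queued or stored.
    img_id = await services.upload_service.assign_image_id(img_bytes)
    image_data = ImageData(id=img_id, local=True, categories=[], starred=False,
                           format=fmt, index_date=datetime.now())
    # Queued path: returns once the task is enqueued; swap in
    # sync_upload_image(...) to block until indexing finishes.
    await services.upload_service.queue_upload_image(
        image_data, img_bytes, skip_ocr=False,
        thumbnail_mode=UploadImageThumbnailMode.IF_NECESSARY)
```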
12 changes: 12 additions & 0 deletions app/util/local_file_utility.py
@@ -0,0 +1,12 @@
+ from pathlib import Path
+
+ VALID_IMAGE_EXTENSIONS = {'.jpg', '.png', '.jpeg', '.jfif', '.webp', '.gif'}
+
+
+ def glob_local_files(path: Path, pattern: str = "*", valid_extensions: set[str] = None):
+     if valid_extensions is None:
+         valid_extensions = VALID_IMAGE_EXTENSIONS
+
+     for file in path.glob(pattern):
+         if file.suffix.lower() in valid_extensions:
+             yield file
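
Because `glob_local_files` is a plain generator shared by the storage back ends and the indexing script, it can be exercised on its own. A quick sketch (the `./static` directory is illustrative):

```python
from pathlib import Path

from app.util.local_file_utility import glob_local_files

# Recursively list images under ./static; with no explicit extension set,
# the VALID_IMAGE_EXTENSIONS default filters out non-image files.
for image_file in glob_local_files(Path("./static"), pattern="**/*"):
    print(image_file.resolve())
```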
2 changes: 1 addition & 1 deletion main.py
@@ -62,7 +62,7 @@ def parse_args():
environment.local_indexing = True
from scripts import local_indexing

-         asyncio.run(local_indexing.main(args))
+         asyncio.run(local_indexing.main(args.local_index_target_dir))
elif args.local_create_thumbnail:
from scripts import local_create_thumbnail

40 changes: 21 additions & 19 deletions readme.md
@@ -88,30 +88,13 @@ Local file storage does not require an additional database deployment process, b
```
This operation will create a collection in the Qdrant database with the same name as `config.QDRANT_COLL` to store
image vectors.
- 7. (Optional) In development deployment and small-scale deployment, you can use the built-in static file indexing and
-    service functions of this application. Use the following command to index your local image directory:
-    ```shell
-    python main.py --local-index <path-to-your-image-directory>
-    ```
-    This operation will copy all image files in the `<path-to-your-image-directory>` directory to
-    the `config.STATIC_FILE_PATH` directory (default is `./static`) and write the image information to the Qdrant
-    database.
-    Then run the following command to generate thumbnails for all images in the static directory:
-    ```shell
-    python main.py --local-create-thumbnail
-    ```
-    If you want to deploy on a large scale, you can use OSS storage services like `MinIO` to store image files in OSS and
-    then write the image information to the Qdrant database.
- 8. Run this application:
+ 7. Run this application:
```shell
python main.py
```
You can use `--host` to specify the IP address you want to bind to (default is 0.0.0.0) and `--port` to specify the
port you want to bind to (default is 8000).
- 9. (Optional) Deploy the front-end application: [NekoImageGallery.App](https://github.com/hv0905/NekoImageGallery.App)
+ 8. (Optional) Deploy the front-end application: [NekoImageGallery.App](https://github.com/hv0905/NekoImageGallery.App)
is a simple web front-end application for this project. If you want to deploy it, please refer to
its [deployment documentation](https://github.com/hv0905/NekoImageGallery.App).
@@ -172,6 +155,25 @@ the [official documentation](https://docs.docker.com/config/containers/resource_
docker compose up -d
```
+ ### Upload images to NekoImageGallery
+
+ There are several ways to upload images to NekoImageGallery:
+
+ - Through the web interface: You can use the web interface to upload images to the server. The web interface is provided
+   by [NekoImageGallery.App](https://github.com/hv0905/NekoImageGallery.App). Make sure you have enabled the
+   **Admin API** and set your **Admin Token** in the configuration file.
+ - Through local indexing: This is suitable for local deployment or when the images you want to upload are already on the
+   server. Use the following command to index your local image directory:
+   ```shell
+   python main.py --local-index <path-to-your-image-directory>
+   ```
+   The above command will recursively upload all images in the specified directory and its subdirectories to the server.
+ - Through the API: You can use the upload API provided by NekoImageGallery to upload images.
+   Make sure you have enabled the **Admin API** and set your **Admin Token** in the configuration file.
+   This method is suitable for automated image uploading; a hedged client sketch follows this file's diff. Check out the
+   [API documentation](#-api-documentation) for more information.
+
## 📚 API Documentation
The API documentation is provided by FastAPI's built-in Swagger UI. You can access the API documentation by visiting
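To accompany the readme's new upload section, here is a hedged sketch of the API method (third bullet above). Only the `image_file` form field name comes from the admin controller diff; the route path, HTTP method, and token header name are assumptions to verify against the Swagger UI at `/docs`:

```python
import requests

SERVER = "http://localhost:8000"
ADMIN_TOKEN = "your-admin-token"  # must match the server's configured admin token

with open("cat.jpg", "rb") as f:
    # Assumed route and header name; confirm against the generated API docs.
    resp = requests.post(f"{SERVER}/admin/upload",
                         headers={"x-admin-token": ADMIN_TOKEN},
                         files={"image_file": f})
resp.raise_for_status()
print(resp.json())  # expect an ImageUploadResponse with the assigned image id
```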
86 changes: 22 additions & 64 deletions scripts/local_indexing.py
@@ -1,86 +1,44 @@
- import uuid
from datetime import datetime
from pathlib import Path

import PIL
from PIL import Image
from loguru import logger

+ from app.Models.api_models.admin_query_params import UploadImageThumbnailMode
+ from app.Models.errors import PointDuplicateError
from app.Models.img_data import ImageData
from app.Services.provider import ServiceProvider
from app.config import config, StorageMode
- from app.util.generate_uuid import generate_uuid
- from .local_utility import fetch_path_uuid_list
-
- overall_count = 0
+ from app.util.local_file_utility import glob_local_files

services: ServiceProvider | None = None


- async def copy_and_index(file_path: Path, uuid_str: str = None):
-     global overall_count
-     overall_count += 1
-     logger.info("[{}] Indexing {}", str(overall_count), str(file_path))
+ async def index_task(file_path: Path):
try:
-         img = Image.open(file_path)
+         img_id = await services.upload_service.assign_image_id(file_path)
+         image_data = ImageData(id=img_id,
+                                local=True,
+                                categories=[],
+                                starred=False,
+                                format=file_path.suffix[1:],  # remove the dot
+                                index_date=datetime.now())
+         await services.upload_service.sync_upload_image(image_data, file_path.read_bytes(), skip_ocr=False,
+                                                         thumbnail_mode=UploadImageThumbnailMode.IF_NECESSARY)
+     except PointDuplicateError as ex:
+         logger.warning("Image {} already exists in the database", file_path)
except PIL.UnidentifiedImageError as e:
logger.error("Error when opening image {}: {}", file_path, e)
-         return
-     image_id = uuid.UUID(uuid_str) if uuid_str else generate_uuid(file_path)
-     img_ext = file_path.suffix
-     imgdata = ImageData(id=image_id,
-                         url=await services.storage_service.active_storage.url(f'{image_id}{img_ext}'),
-                         index_date=datetime.now(),
-                         format=img_ext[1:],
-                         local=True)
-     try:
-         # This has already been checked for duplicated, so there's no need to double-check.
-         await services.index_service.index_image(img, imgdata, skip_duplicate_check=True)
-     except Exception as e:
-         logger.error("Error when processing image {}: {}", file_path, e)
-         return
-     # copy to static
-     await services.storage_service.active_storage.upload(file_path, f'{image_id}{img_ext}')


- async def copy_and_index_batch(file_path_list: list[tuple[Path, str]]):
-     for file_path_uuid_tuple in file_path_list:
-         await copy_and_index(file_path_uuid_tuple[0], uuid_str=file_path_uuid_tuple[1])


@logger.catch()
- async def main(args):
+ async def main(root_directory):
global services
config.storage.method = StorageMode.LOCAL # ensure to use LocalStorage
services = ServiceProvider()
await services.onload()
-     root = Path(args.local_index_target_dir)
-     # First, check if the database is empty
-     item_number = await services.db_context.get_counts(exact=False)
-     if item_number == 0:
-         # database is empty, do as usual
-         logger.warning("The database is empty, Will not check for duplicate points.")
-         async for item in services.storage_service.active_storage.list_files(root, batch_max_files=1):
-             await copy_and_index(item[0])
-     else:
-         # database is not empty, check for duplicate points
-         logger.warning("The database is not empty, Will check for duplicate points.")
-         async for itm in services.storage_service.active_storage.list_files(root, batch_max_files=5000):
-             local_file_path_with_uuid_list = fetch_path_uuid_list(itm)
-             local_file_uuid_list = [itm[1] for itm in local_file_path_with_uuid_list]
-             duplicate_uuid_list = await services.db_context.validate_ids(local_file_uuid_list)
-             if len(duplicate_uuid_list) > 0:
-                 duplicate_uuid_list = set(duplicate_uuid_list)
-                 local_file_path_with_uuid_list = [item for item in local_file_path_with_uuid_list
-                                                   if item[1] not in duplicate_uuid_list]
-                 logger.info("Found {} duplicate points, of which {} are duplicates in the database. "
-                             "The remaining {} points will be indexed.",
-                             len(itm) - len(local_file_path_with_uuid_list), len(duplicate_uuid_list),
-                             len(local_file_path_with_uuid_list))
-             else:
-                 logger.info("Found {} duplicate points, of which {} are duplicates in the database."
-                             " The remaining {} points will be indexed.",
-                             0, 0, len(local_file_path_with_uuid_list))
-             await copy_and_index_batch(local_file_path_with_uuid_list)
+     root = Path(root_directory)
+
+     for idx, item in enumerate(glob_local_files(root, '**/*')):
+         logger.info("[{}] Indexing {}", idx, str(item))
+         await index_task(item)

logger.success("Indexing completed! {} images indexed", overall_count)
logger.success("Indexing completed!")
