From b437801863269acc2fb3fe39c826118046986e7b Mon Sep 17 00:00:00 2001
From: imhuwq
Date: Thu, 28 Mar 2024 15:02:21 +0800
Subject: [PATCH] feature(dataset indices): create necessary indices on startup

---
 deepdataspace/task/__init__.py | 34 ++++++++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/deepdataspace/task/__init__.py b/deepdataspace/task/__init__.py
index 7d6ecd3..e666b98 100644
--- a/deepdataspace/task/__init__.py
+++ b/deepdataspace/task/__init__.py
@@ -13,6 +13,10 @@ from celery.signals import worker_ready
 
 from deepdataspace import environs
+from deepdataspace.model.dataset import DataSet
+from deepdataspace.model.dataset import DatasetStatus
+from deepdataspace.model.image import Image
+from deepdataspace.task.celery import app
 from deepdataspace.task.celery import app
 from deepdataspace.task.ping import ping
 
 
@@ -71,8 +75,38 @@ def import_and_process_data_dir(data_dir: str, enforce: bool = False, auto_trigg
         logger.error(str(err))
 
 
+@app.task
+def create_dataset_index():
+    """
+    Create index for all datasets.
+    """
+
+    logger.info(f"create_dataset_index starts")
+
+    total = DataSet.count_num({})
+    datasets = DataSet.find_many({})
+    for idx, dataset in enumerate(datasets):
+        logger.info(f"[{idx + 1}/{total}] creating index for dataset {dataset.id}")
+        if dataset.status != DatasetStatus.Ready:
+            logger.info(f"dataset status is not {DatasetStatus.Ready}, skip it")
+        else:
+            ImageModel = Image(dataset.id)
+            ImageModel.get_collection().create_index([
+                ("objects.category_id", 1),
+            ])
+            logger.info(f"dataset created index on field: objects.category_id")
+
+            ImageModel.get_collection().create_index([
+                ("idx", 1)
+            ])
+            logger.info(f"dataset created index on field: idx")
+
+
 @worker_ready.connect
 def startup_jobs(sender=None, headers=None, body=None, **kwargs):
+    # ensure necessary indices
+    create_dataset_index.apply_async()
+
     # import all datasets
     data_dir = environs.DATASET_DIR
     import_and_process_data_dir.apply_async(args=(data_dir, False, True))
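
Reviewer sketch (not part of the patch): the task above ultimately issues two PyMongo create_index calls per dataset. Below is a minimal standalone sketch of those calls using plain pymongo, assuming a local mongodb:// URI, a "deepdataspace" database name, and a hypothetical images_<dataset_id> collection name; in the real code the collection comes from Image(dataset.id).get_collection().

    # Minimal sketch, assuming plain pymongo and an illustrative collection naming scheme.
    from pymongo import MongoClient

    def ensure_image_indices(mongo_uri: str, db_name: str, dataset_id: str) -> None:
        """Create the two indices the startup task relies on.

        create_index is a no-op when an equivalent index already exists, so it is
        safe to call on every worker startup.
        """
        client = MongoClient(mongo_uri)
        collection = client[db_name][f"images_{dataset_id}"]  # hypothetical name for illustration

        # Multikey index over the embedded annotations; supports category filtering
        # such as collection.find({"objects.category_id": some_category_id}).
        collection.create_index([("objects.category_id", 1)])

        # Ascending index on the per-dataset image ordinal; supports sorting/paging by "idx".
        collection.create_index([("idx", 1)])

    if __name__ == "__main__":
        # Example invocation against an assumed local MongoDB instance.
        ensure_image_indices("mongodb://localhost:27017", "deepdataspace", "demo-dataset-id")

Because create_index is idempotent, re-running create_dataset_index on every worker_ready signal only costs a scan of existing index definitions, not a rebuild.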