Optimize/import performance #131

Merged (6 commits, Feb 26, 2024)
11 changes: 1 addition & 10 deletions deepdataspace/globals.py
@@ -41,17 +41,8 @@
_mongo_user = urllib.parse.quote_plus(MONGODB_USER)
_mongo_pass = urllib.parse.quote_plus(MONGODB_PASS)
_mongo_url = f"mongodb://{_mongo_user}:{_mongo_pass}@{MONGODB_HOST}:{MONGODB_PORT}/{MONGODB_DBNAME}"
_mongo_client = MongoClient(_mongo_url, authMechanism="SCRAM-SHA-256")
_mongo_client = MongoClient(_mongo_url, authMechanism="SCRAM-SHA-256", maxPoolSize=None)
MongoDB = _mongo_client[MONGODB_DBNAME]

# init redis client
Redis = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DBNAME, password=REDIS_PASS)

# init sentry client
# TODO: sentry is not necessary for dds tool, remove it as soon as possible
if SENTRY_DSN is not None:
sample_rate = 0.1 if ENV == RunningEnv.Prod else 1.0
sentry_sdk.init(dsn=SENTRY_DSN,
traces_sample_rate=sample_rate,
environment=ENV, )
sentry_sdk.set_tag("os.user", get_os_username())
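
The one-line change above lifts pymongo's default connection cap so concurrent import workers no longer queue on the pool. A minimal standalone sketch of the same option, with a placeholder URI and credentials rather than the project's real settings:

from pymongo import MongoClient

# Placeholder URI for illustration only; the project builds its URI from env vars.
client = MongoClient(
    "mongodb://user:secret@localhost:27017/dds",
    authMechanism="SCRAM-SHA-256",
    maxPoolSize=None,  # None (or 0) removes pymongo's default cap of 100 connections per server
)
db = client["dds"]
db.command("ping")  # confirm the client can reach the server

With the cap removed, threads that would otherwise block waiting for a free pooled connection can each hold their own.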
277 changes: 233 additions & 44 deletions deepdataspace/io/importer.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions deepdataspace/model/_base.py
@@ -5,6 +5,7 @@
"""

import abc
import logging
import time
from threading import Lock
from typing import ClassVar
@@ -13,6 +14,7 @@
from typing import Tuple

from pydantic import BaseModel as _Base
from pymongo import WriteConcern
from pymongo.collection import Collection
from pymongo.operations import UpdateOne
from pymongo.typings import _DocumentType
@@ -24,6 +26,8 @@
_batch_save_queue = {} # a dict of batch save queue for every collection, {'collection_name': batch_save_queue, }
_batch_update_queue = {} # a dict of batch update queue for every collection, {'collection_name': batch_update_queue, }

logger = logging.getLogger("model.base")


def current_ts():
"""
@@ -53,20 +57,13 @@ def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:

raise NotImplementedError

def post_init(self):
"""
Post init hook for initializing a model object.
"""
pass

@classmethod
def from_dict(cls, data: dict):
"""
Convert a python dict to a model object.
"""

obj = cls.parse_obj(data)
obj.post_init()
return obj

def to_dict(self, include: list = None, exclude: list = None):
@@ -232,6 +229,8 @@ def batch_update(cls, filters: dict, set_data: dict = None, unset_data: dict = N
co = cls.get_collection()
if co is None:
return None
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)

op = UpdateOne(filters, {"$set": set_data, "$unset": unset_data})

@@ -257,6 +256,8 @@ def finish_batch_update(cls):
op_lock = cls._get_batch_op_lock()
with op_lock:
co = cls.get_collection()
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)
queue = _batch_update_queue.setdefault(cls_id, [])
if queue:
co.bulk_write(queue)
@@ -275,7 +276,6 @@ def save(self, refresh=False):
If refresh is True, the object will be re-fetched from mongodb after saving.
"""

self.post_init()
co = self.get_collection()
if co is None:
return None
@@ -293,7 +293,6 @@ def batch_save(self, batch_size: int = 20, set_on_insert: Dict = None):
new_self = co.find_one({"_id": _id})
new_self.pop("_id", None)
self.__dict__.update(new_self)
self.post_init()
return self

def batch_save(self, batch_size: int = 20, set_on_insert: Dict = None):
@@ -303,13 +302,12 @@ def batch_save(self, batch_size: int = 20, set_on_insert: Dict = None):
:param batch_size: the batch size. We will only write to mongodb when the batch is full.
:param set_on_insert: the fields only need to be set when we are inserting a new object.
"""

self.post_init()

cls = self.__class__
co = cls.get_collection()
if co is None:
return None
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)

_id = self.__dict__.get("id", None)
if _id is None:
@@ -348,6 +346,8 @@ def finish_batch_save(cls):
op_lock = _batch_lock[cls_id]
with op_lock:
co = cls.get_collection()
wc = WriteConcern(w=0)
co = co.with_options(write_concern=wc)
queue = _batch_save_queue.setdefault(cls_id, [])
if queue:
co.bulk_write(queue)
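
The _base.py hunks above route batch saves and batch updates through an unacknowledged write concern before calling bulk_write. A rough standalone sketch of that queue-then-flush pattern; the collection name and filter values are made up for the example:

from pymongo import MongoClient, UpdateOne, WriteConcern

client = MongoClient("mongodb://localhost:27017")  # placeholder connection
images = client["dds"]["images"]                   # illustrative collection name

# w=0: the driver does not wait for per-write acknowledgement from the server,
# trading error reporting and durability guarantees for import throughput.
fast_images = images.with_options(write_concern=WriteConcern(w=0))

ops = [UpdateOne({"id": i}, {"$set": {"flag": 0}, "$unset": {"tmp": ""}}, upsert=True)
       for i in range(1000)]
if ops:
    result = fast_images.bulk_write(ops, ordered=False)  # one batched round trip
    # result.acknowledged is False here, so counts such as modified_count are unavailable

The shape mirrors finish_batch_update and finish_batch_save above: operations accumulate in a per-collection queue and are written in a single bulk_write call.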
70 changes: 20 additions & 50 deletions deepdataspace/model/dataset.py
@@ -27,6 +27,7 @@
from deepdataspace.model.image import ImageModel
from deepdataspace.model.label import Label
from deepdataspace.utils.file import create_file_url
from deepdataspace.utils.function import count_block_time
from deepdataspace.utils.string import get_str_md5

logger = logging.getLogger("io.model.dataset")
@@ -103,7 +104,7 @@ def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:
group_name: str = None

_batch_queue: Dict[int, ImageModel] = {}
_batch_size: int = 100
_batch_size: int = 200

@classmethod
def create_dataset(cls,
@@ -139,6 +140,7 @@ def create_dataset(cls,
dataset.path = path or dataset.path
dataset.files = files or dataset.files
dataset.name = name
dataset.num_images = Image(dataset.id).count_num({})
dataset.save()
return dataset
else:
@@ -148,42 +150,11 @@
dataset = cls(name=name, id=id_, type=type, path=path,
files=files, status=DatasetStatus.Ready,
description=description, description_func=description_func)
dataset.post_init()
dataset.num_images = Image(dataset.id).count_num({})
dataset.save()
return dataset

@classmethod
def get_importing_dataset(cls,
name: str,
id_: str = None,
type: str = None,
path: str = None,
files: dict = None,
) -> "DataSet":
"""
This is the same as create_dataset.
But if the dataset is new, its status will be set to "waiting" instead of "ready".
"""

if id_:
dataset = DataSet.find_one({"id": id_})
if dataset is not None:
dataset.type = type or dataset.type
dataset.path = path or dataset.path
dataset.files = files or dataset.files
dataset.name = name
dataset.save()
return dataset
else:
id_ = uuid.uuid4().hex

files = files or {}
dataset = cls(name=name, id=id_, type=type, path=path, files=files, status=DatasetStatus.Waiting)
dataset.post_init()
dataset.save()
return dataset

def _add_cover(self, force_update: bool = False):
def add_cover(self, force_update: bool = False):
has_cover = bool(self.cover_url)
if has_cover and not force_update:
return
@@ -257,17 +228,16 @@ def add_image(self,
image.flag = flag or image.flag
image.flag_ts = flag_ts or image.flag_ts
image.metadata = metadata or image.metadata
image.post_init()
image._dataset = self # this saves a db query

image.save()
self.num_images = Model.count_num({})
self._add_cover()
self.add_cover()

# save whitelist to redis
whitelist_dirs = set()
self._add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self._add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)
if whitelist_dirs:
Redis.sadd(RedisKey.DatasetImageDirs, *whitelist_dirs)

@@ -326,7 +296,7 @@ def batch_add_image(self,
return image

@staticmethod
def _add_local_file_url_to_whitelist(url: str, whitelist: set):
def add_local_file_url_to_whitelist(url: str, whitelist: set):
if not url or not url.startswith("/files/local_files"):
return

@@ -377,7 +347,7 @@ def _batch_save_image_batch(self):
object_types.add(AnnotationType.Segmentation)
if obj.alpha and AnnotationType.Matting not in object_types:
object_types.add(AnnotationType.Matting)
self._add_local_file_url_to_whitelist(obj.alpha, whitelist_dirs)
self.add_local_file_url_to_whitelist(obj.alpha, whitelist_dirs)
if obj.points and AnnotationType.KeyPoints not in object_types:
object_types.add(AnnotationType.KeyPoints)

@@ -387,8 +357,8 @@ def _batch_save_image_batch(self):
image.batch_save(batch_size=self._batch_size, set_on_insert={"idx": image.idx})
idx += 1

self._add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self._add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url, whitelist_dirs)
self.add_local_file_url_to_whitelist(image.url_full_res, whitelist_dirs)

# finish batch saves
IModel.finish_batch_save()
@@ -406,9 +376,9 @@ def _batch_save_image_batch(self):

self._batch_queue.clear()

def batch_save_image(self, enforce: bool = False):
def batch_save_image(self):
batch_is_full = len(self._batch_queue) >= self._batch_size
if batch_is_full or enforce:
if batch_is_full:
self._batch_save_image_batch()
return True
return False
@@ -419,7 +389,7 @@ def finish_batch_add_image(self):
This saves all images in the buffer queue to database.
"""
self._batch_save_image_batch()
self._add_cover()
self.add_cover()

def eval_description(self):
"""
@@ -449,16 +419,16 @@ def cascade_delete(dataset: "DataSet"):
return

dataset_id = dataset.id
print(f"dataset [{dataset_id}] is found, deleting...")
logger.info(f"dataset [{dataset_id}] is found, deleting...")

print(f"dataset [{dataset_id}] is found, deleting categories...")
logger.info(f"dataset [{dataset_id}] is found, deleting categories...")
Category.delete_many({"dataset_id": dataset_id})

print(f"dataset [{dataset_id}] is found, deleting labels...")
logger.info(f"dataset [{dataset_id}] is found, deleting labels...")
Label.delete_many({"dataset_id": dataset_id})

print(f"dataset [{dataset_id}] is found, deleting images...")
logger.info(f"dataset [{dataset_id}] is found, deleting images...")
Image(dataset_id).get_collection().drop()

DataSet.delete_many({"id": dataset_id})
print(f"dataset [{dataset_id}] is deleted.")
logger.info(f"dataset [{dataset_id}] is deleted.")
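
The dataset.py changes raise the image batch size from 100 to 200, drop the enforce flag from batch_save_image, and switch the cascade-delete messages from print to a module logger. A condensed sketch of the buffer-and-flush idea behind those batch methods; the class below is a simplified stand-in, not deepdataspace's actual API:

import logging

logger = logging.getLogger("example.batch")


class ImageBatcher:
    def __init__(self, batch_size: int = 200):
        self.batch_size = batch_size  # flush threshold, mirrors _batch_size = 200
        self._queue = []

    def add(self, image: dict) -> bool:
        """Queue one image; flush automatically once the batch is full."""
        self._queue.append(image)
        if len(self._queue) >= self.batch_size:
            self._flush()
            return True
        return False

    def finish(self):
        """Flush whatever is left, e.g. at the end of an import run."""
        self._flush()

    def _flush(self):
        if not self._queue:
            return
        logger.info("writing %d images in one bulk operation", len(self._queue))
        # the real code issues a single bulk write here instead of one
        # insert per image, which is where the import speed-up comes from
        self._queue.clear()


batcher = ImageBatcher()
for i in range(450):
    batcher.add({"idx": i})
batcher.finish()  # flushes the trailing partial batch of 50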