diff --git a/server/dive_server/__init__.py b/server/dive_server/__init__.py index 40b2010fb..cbbc6ccb7 100644 --- a/server/dive_server/__init__.py +++ b/server/dive_server/__init__.py @@ -14,7 +14,7 @@ from .client_webroot import ClientWebroot from .crud_annotation import GroupItem, RevisionLogItem, TrackItem -from .event import process_fs_import, process_s3_import, send_new_user_email +from .event import DIVES3Imports, process_fs_import, process_s3_import, send_new_user_email from .views_annotation import AnnotationResource from .views_configuration import ConfigurationResource from .views_dataset import DatasetResource @@ -52,6 +52,18 @@ def load(self, info): ) info["serverRoot"].api = info["serverRoot"].girder.api + diveS3Import = DIVES3Imports() + events.bind( + "rest.post.assetstore/:id/import.before", + "process_s3_import_before", + diveS3Import.process_s3_import_before, + ) + + events.bind( + "rest.post.assetstore/:id/import.after", + "process_s3_import_after", + diveS3Import.process_s3_import_after, + ) events.bind( "filesystem_assetstore_imported", "process_fs_import", diff --git a/server/dive_server/event.py b/server/dive_server/event.py index 7a428d3ba..e7ff12b7c 100644 --- a/server/dive_server/event.py +++ b/server/dive_server/event.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timedelta import os from bson.objectid import ObjectId @@ -19,6 +19,7 @@ FPSMarker, ImageSequenceType, LargeImageType, + MarkForPostProcess, TypeMarker, VideoType, imageRegex, @@ -26,6 +27,8 @@ videoRegex, ) +from . import crud_rpc + def send_new_user_email(event): try: @@ -45,7 +48,6 @@ def process_assetstore_import(event, meta: dict): info = event.info objectType = info.get("type") importPath = info.get("importPath") - now = datetime.now() if not importPath or not objectType or objectType != "item": return @@ -59,47 +61,86 @@ def process_assetstore_import(event, meta: dict): } ) - # TODO figure out what's going on here? - if imageRegex.search(importPath): dataset_type = ImageSequenceType elif videoRegex.search(importPath): - # Look for exisitng video dataset directory + # Look for existing video dataset directory parentFolder = Folder().findOne({"_id": item["folderId"]}) userId = parentFolder['creatorId'] or parentFolder['baseParentId'] user = User().findOne({'_id': ObjectId(userId)}) foldername = f'Video {item["name"]}' + # resuse existing folder if it already exists with same name dest = Folder().createFolder(parentFolder, foldername, creator=user, reuseExisting=True) - if dest['created'] < now: - # Remove the old item, replace it with the new one. + now = datetime.now() + if now - dest['created'] > timedelta(hours=1): + # Remove the old referenced item, replace it with the new one. oldItem = Item().findOne({'folderId': dest['_id'], 'name': item['name']}) if oldItem is not None: + if oldItem['meta'].get('codec', False): + meta = { + 'source_video': oldItem['meta'].get('source_video', None), + 'transcoder': oldItem['meta'].get('ffmpeg', None), + 'originalFps': oldItem['meta'].get('originalFps', None), + 'originalFpsString': oldItem['meta'].get('originalFpsString', None), + 'codec': oldItem['meta'].get('codec', None), + } + item['meta'].update(meta) + Item().save(item) Item().remove(oldItem) Item().move(item, dest) + # Set the dataset to Video Type dataset_type = VideoType - elif largeImageRegEx.search(importPath): - dataset_type = LargeImageType if dataset_type is not None: # Update metadata of parent folder - # FPS is hardcoded for now Item().save(item) folder = Folder().findOne({"_id": item["folderId"]}) root, _ = os.path.split(importPath) + # if the parent folder is not marked as a DIVE Dataset, Mark it. if not asbool(fromMeta(folder, DatasetMarker)): folder["meta"].update( { - TypeMarker: dataset_type, - FPSMarker: DefaultVideoFPS, - DatasetMarker: True, + TypeMarker: dataset_type, # Sets to video + FPSMarker: -1, # auto calculate the FPS from import AssetstoreSourcePathMarker: root, + MarkForPostProcess: True, # skip transcode or transcode if required **meta, } ) Folder().save(folder) +def convert_video_recursive(folder, user): + subFolders = list(Folder().childFolders(folder, 'folder', user)) + for child in subFolders: + if child.get('meta', {}).get(MarkForPostProcess, False): + child['meta']['MarkForPostProcess'] = False + Folder().save(child) + crud_rpc.postprocess(user, child, False, True) + convert_video_recursive(child, user) + + +class DIVES3Imports: + destinationId = None + destinationType = None + + def process_s3_import_before(self, event): + self.destinationId = event.info.get('params', {}).get('destinationId') + self.destinationType = event.info.get('params', {}).get('destinationType') + + def process_s3_import_after(self, event): + if self.destinationType == 'folder' and self.destinationId is not None: + # go through all sub folders and add a new script to convert + destinationFolder = Folder().findOne({"_id": ObjectId(self.destinationId)}) + print(destinationFolder) + userId = destinationFolder['creatorId'] or destinationFolder['baseParentId'] + user = User().findOne({'_id': ObjectId(userId)}) + convert_video_recursive(destinationFolder, user) + self.destinationId = None + self.destinationType = None + + def process_fs_import(event): return process_assetstore_import(event, {AssetstoreSourceMarker: 'filesystem'}) diff --git a/server/dive_server/views_rpc.py b/server/dive_server/views_rpc.py index 8e81086e3..f578bff77 100644 --- a/server/dive_server/views_rpc.py +++ b/server/dive_server/views_rpc.py @@ -5,8 +5,11 @@ from girder.api.rest import Resource from girder.constants import AccessType from girder.models.folder import Folder +from girder.models.item import Item from girder.models.token import Token +from dive_utils import asbool, fromMeta +from dive_utils.constants import DatasetMarker, FPSMarker, MarkForPostProcess, TypeMarker from dive_utils.types import PipelineDescription from . import crud, crud_rpc @@ -22,7 +25,9 @@ def __init__(self, resourceName): self.route("POST", ("pipeline",), self.run_pipeline_task) self.route("POST", ("train",), self.run_training) self.route("POST", ("postprocess", ":id"), self.postprocess) + self.route("POST", ("convert_dive", ":id"), self.convert_dive) self.route("POST", ("convert_large_image", ":id"), self.convert_large_image) + self.route("POST", ("batch_postprocess", ":id"), self.batch_postprocess) @access.user @autoDescribeRoute( @@ -134,6 +139,64 @@ def postprocess(self, folder, skipJobs, skipTranscoding, additive, additivePrepe self.getCurrentUser(), folder, skipJobs, skipTranscoding, additive, additivePrepend, set ) + @access.user + @autoDescribeRoute( + Description("Post-processing to be run after media/annotation import") + .modelParam( + "id", + description="Item ID containing the file to process", + model=Item, + level=AccessType.WRITE, + ) + .param( + "skipJobs", + "Whether to skip processing that might dispatch worker jobs", + paramType="formData", + dataType="boolean", + default=False, + required=False, + ) + .param( + "skipTranscoding", + "Whether to skip processing that might dispatch worker jobs", + paramType="formData", + dataType="boolean", + default=True, + required=False, + ) + ) + def convert_dive(self, item, skipJobs, skipTranscoding): + # Get the parent folder and create a new subFolder then move the item into that folder. + parentFolder = Folder().findOne({"_id": item["folderId"]}) + user = self.getCurrentUser() + if parentFolder: + foldername = f'Video {item["name"]}' + destFolder = Folder().createFolder( + parentFolder, foldername, creator=user, reuseExisting=True + ) + Item().move(item, destFolder) + if not asbool(fromMeta(destFolder, DatasetMarker)): + destFolder["meta"].update( + { + TypeMarker: 'video', + FPSMarker: -1, # auto calculate the FPS from import + } + ) + Folder().save(destFolder) + crud_rpc.postprocess(self.getCurrentUser(), destFolder, skipJobs, skipTranscoding) + return str(destFolder['_id']) + return '' + + def get_marked_for_postprocess(self, folder, user, datasets, limit): + subFolders = list(Folder().childFolders(folder, 'folder', user)) + for child in subFolders: + if child.get('meta', {}).get(MarkForPostProcess, False): + if len(datasets) < limit: + datasets.append(child) + else: + return + self.get_marked_for_postprocess(child, user, datasets, limit) + @access.user @autoDescribeRoute( Description("Convert folder of images to large images").modelParam( @@ -145,3 +208,47 @@ def postprocess(self, folder, skipJobs, skipTranscoding, additive, additivePrepe ) def convert_large_image(self, folder): return crud_rpc.convert_large_image(self.getCurrentUser(), folder) + + + @access.user + @autoDescribeRoute( + Description("Post-processing for after S3 Imports") + .modelParam( + "id", + description="Folder containing the items to process", + model=Folder, + level=AccessType.WRITE, + ) + .param( + "skipJobs", + "Whether to skip processing that might dispatch worker jobs", + paramType="formData", + dataType="boolean", + default=False, + required=False, + ) + .param( + "skipTranscoding", + "Whether to skip processing that might dispatch worker jobs", + paramType="formData", + dataType="boolean", + default=False, + required=False, + ) + .param( + "limit", + "Number of Jobs to start to attempt to convert to DIVE format", + paramType="formData", + dataType="integer", + default=100, + required=False, + ) + ) + def batch_postprocess(self, folder, skipJobs, skipTranscoding, limit): + # get a list of possible Datasets + datasets = [] + self.get_marked_for_postprocess(folder, self.getCurrentUser(), datasets, limit) + for subFolder in datasets: + subFolder['meta']['MarkForPostProcess'] = False + Folder().save(subFolder) + crud_rpc.postprocess(self.getCurrentUser(), subFolder, skipJobs, skipTranscoding) \ No newline at end of file diff --git a/server/dive_utils/constants.py b/server/dive_utils/constants.py index 381aae80a..5abef6ecf 100644 --- a/server/dive_utils/constants.py +++ b/server/dive_utils/constants.py @@ -106,6 +106,7 @@ TypeMarker = "type" AssetstoreSourceMarker = "import_source" AssetstoreSourcePathMarker = "import_path" +MarkForPostProcess = "MarkForPostProcess" FPSMarker = "fps" OriginalFPSMarker = "originalFps" OriginalFPSStringMarker = "originalFpsString"