Skip to content

Commit

Permalink
S3 Import Processing Handling (#1387)
Browse files Browse the repository at this point in the history
* Adds better S3 import processing based on UMD/Janssen version of dive

* add batch processing
  • Loading branch information
BryonLewis authored Dec 1, 2023
1 parent 56d57ab commit 8934ab4
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 14 deletions.
14 changes: 13 additions & 1 deletion server/dive_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from .client_webroot import ClientWebroot
from .crud_annotation import GroupItem, RevisionLogItem, TrackItem
from .event import process_fs_import, process_s3_import, send_new_user_email
from .event import DIVES3Imports, process_fs_import, process_s3_import, send_new_user_email
from .views_annotation import AnnotationResource
from .views_configuration import ConfigurationResource
from .views_dataset import DatasetResource
Expand Down Expand Up @@ -52,6 +52,18 @@ def load(self, info):
)
info["serverRoot"].api = info["serverRoot"].girder.api

diveS3Import = DIVES3Imports()
events.bind(
"rest.post.assetstore/:id/import.before",
"process_s3_import_before",
diveS3Import.process_s3_import_before,
)

events.bind(
"rest.post.assetstore/:id/import.after",
"process_s3_import_after",
diveS3Import.process_s3_import_after,
)
events.bind(
"filesystem_assetstore_imported",
"process_fs_import",
Expand Down
67 changes: 54 additions & 13 deletions server/dive_server/event.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timedelta
import os

from bson.objectid import ObjectId
Expand All @@ -19,13 +19,16 @@
FPSMarker,
ImageSequenceType,
LargeImageType,
MarkForPostProcess,
TypeMarker,
VideoType,
imageRegex,
largeImageRegEx,
videoRegex,
)

from . import crud_rpc


def send_new_user_email(event):
try:
Expand All @@ -45,7 +48,6 @@ def process_assetstore_import(event, meta: dict):
info = event.info
objectType = info.get("type")
importPath = info.get("importPath")
now = datetime.now()

if not importPath or not objectType or objectType != "item":
return
Expand All @@ -59,47 +61,86 @@ def process_assetstore_import(event, meta: dict):
}
)

# TODO figure out what's going on here?

if imageRegex.search(importPath):
dataset_type = ImageSequenceType

elif videoRegex.search(importPath):
# Look for exisitng video dataset directory
# Look for existing video dataset directory
parentFolder = Folder().findOne({"_id": item["folderId"]})
userId = parentFolder['creatorId'] or parentFolder['baseParentId']
user = User().findOne({'_id': ObjectId(userId)})
foldername = f'Video {item["name"]}'
# resuse existing folder if it already exists with same name
dest = Folder().createFolder(parentFolder, foldername, creator=user, reuseExisting=True)
if dest['created'] < now:
# Remove the old item, replace it with the new one.
now = datetime.now()
if now - dest['created'] > timedelta(hours=1):
# Remove the old referenced item, replace it with the new one.
oldItem = Item().findOne({'folderId': dest['_id'], 'name': item['name']})
if oldItem is not None:
if oldItem['meta'].get('codec', False):
meta = {
'source_video': oldItem['meta'].get('source_video', None),
'transcoder': oldItem['meta'].get('ffmpeg', None),
'originalFps': oldItem['meta'].get('originalFps', None),
'originalFpsString': oldItem['meta'].get('originalFpsString', None),
'codec': oldItem['meta'].get('codec', None),
}
item['meta'].update(meta)
Item().save(item)
Item().remove(oldItem)
Item().move(item, dest)
# Set the dataset to Video Type
dataset_type = VideoType
elif largeImageRegEx.search(importPath):
dataset_type = LargeImageType

if dataset_type is not None:
# Update metadata of parent folder
# FPS is hardcoded for now
Item().save(item)
folder = Folder().findOne({"_id": item["folderId"]})
root, _ = os.path.split(importPath)
# if the parent folder is not marked as a DIVE Dataset, Mark it.
if not asbool(fromMeta(folder, DatasetMarker)):
folder["meta"].update(
{
TypeMarker: dataset_type,
FPSMarker: DefaultVideoFPS,
DatasetMarker: True,
TypeMarker: dataset_type, # Sets to video
FPSMarker: -1, # auto calculate the FPS from import
AssetstoreSourcePathMarker: root,
MarkForPostProcess: True, # skip transcode or transcode if required
**meta,
}
)
Folder().save(folder)


def convert_video_recursive(folder, user):
    """Recursively post-process every subfolder flagged for post-processing.

    Walks the folder tree depth-first; any child folder whose metadata
    carries the ``MarkForPostProcess`` flag has the flag cleared, is saved,
    and is handed to ``crud_rpc.postprocess`` (skipJobs=False,
    skipTranscoding=True).

    :param folder: parent Girder folder document to walk.
    :param user: user document used for permission-filtered child lookup
        and as the owner of dispatched post-processing jobs.
    """
    for child in Folder().childFolders(folder, 'folder', user):
        if child.get('meta', {}).get(MarkForPostProcess, False):
            # Clear the flag through the shared constant so the key written
            # here always matches the key read above (previously a raw
            # 'MarkForPostProcess' string literal was used for the write).
            child['meta'][MarkForPostProcess] = False
            Folder().save(child)
            crud_rpc.postprocess(user, child, False, True)
        convert_video_recursive(child, user)


class DIVES3Imports:
    """Carries S3-import destination info from the ``before`` to the ``after`` event.

    Girder's ``rest.post.assetstore/:id/import.after`` event does not repeat
    the request parameters, so ``process_s3_import_before`` records the
    destination and ``process_s3_import_after`` consumes (and clears) it.
    """

    # Destination recorded by the most recent `before` event; cleared after use.
    destinationId = None
    destinationType = None

    def process_s3_import_before(self, event):
        """Remember the import's destination folder/type before it runs."""
        params = event.info.get('params', {})
        self.destinationId = params.get('destinationId')
        self.destinationType = params.get('destinationType')

    def process_s3_import_after(self, event):
        """After the import completes, post-process imported datasets recursively."""
        if self.destinationType == 'folder' and self.destinationId is not None:
            destinationFolder = Folder().findOne({"_id": ObjectId(self.destinationId)})
            # Fall back to the base parent when the folder has no recorded
            # creator (e.g. system-created folders).
            userId = destinationFolder['creatorId'] or destinationFolder['baseParentId']
            user = User().findOne({'_id': ObjectId(userId)})
            convert_video_recursive(destinationFolder, user)
        # Reset state so an unrelated later import is not mistaken for this one.
        # (Removed a leftover debug print of the destination folder.)
        self.destinationId = None
        self.destinationType = None


def process_fs_import(event):
    """Handle a filesystem assetstore import, tagging items with their source."""
    source_meta = {AssetstoreSourceMarker: 'filesystem'}
    return process_assetstore_import(event, source_meta)

Expand Down
107 changes: 107 additions & 0 deletions server/dive_server/views_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
from girder.api.rest import Resource
from girder.constants import AccessType
from girder.models.folder import Folder
from girder.models.item import Item
from girder.models.token import Token

from dive_utils import asbool, fromMeta
from dive_utils.constants import DatasetMarker, FPSMarker, MarkForPostProcess, TypeMarker
from dive_utils.types import PipelineDescription

from . import crud, crud_rpc
Expand All @@ -22,7 +25,9 @@ def __init__(self, resourceName):
self.route("POST", ("pipeline",), self.run_pipeline_task)
self.route("POST", ("train",), self.run_training)
self.route("POST", ("postprocess", ":id"), self.postprocess)
self.route("POST", ("convert_dive", ":id"), self.convert_dive)
self.route("POST", ("convert_large_image", ":id"), self.convert_large_image)
self.route("POST", ("batch_postprocess", ":id"), self.batch_postprocess)

@access.user
@autoDescribeRoute(
Expand Down Expand Up @@ -134,6 +139,64 @@ def postprocess(self, folder, skipJobs, skipTranscoding, additive, additivePrepe
self.getCurrentUser(), folder, skipJobs, skipTranscoding, additive, additivePrepend, set
)

@access.user
@autoDescribeRoute(
    Description("Post-processing to be run after media/annotation import")
    .modelParam(
        "id",
        description="Item ID containing the file to process",
        model=Item,
        level=AccessType.WRITE,
    )
    .param(
        "skipJobs",
        "Whether to skip processing that might dispatch worker jobs",
        paramType="formData",
        dataType="boolean",
        default=False,
        required=False,
    )
    .param(
        "skipTranscoding",
        # Fixed copy-pasted description: this flag controls transcoding,
        # not job dispatch.
        "Whether to skip transcoding the video when it is already web-safe",
        paramType="formData",
        dataType="boolean",
        default=True,
        required=False,
    )
)
def convert_dive(self, item, skipJobs, skipTranscoding):
    """Wrap a loose media item in its own folder and post-process it as a DIVE dataset.

    Creates (or reuses) a child folder named ``Video <item name>`` under the
    item's parent, moves the item into it, marks the folder as a video
    dataset when it is not one already, and dispatches post-processing.

    :returns: the destination folder id as a string, or ``''`` when the
        item's parent folder cannot be found.
    """
    parentFolder = Folder().findOne({"_id": item["folderId"]})
    user = self.getCurrentUser()
    if not parentFolder:
        return ''
    foldername = f'Video {item["name"]}'
    destFolder = Folder().createFolder(
        parentFolder, foldername, creator=user, reuseExisting=True
    )
    Item().move(item, destFolder)
    if not asbool(fromMeta(destFolder, DatasetMarker)):
        destFolder["meta"].update(
            {
                TypeMarker: 'video',
                FPSMarker: -1,  # auto calculate the FPS from import
            }
        )
        Folder().save(destFolder)
    # Reuse the user fetched above instead of querying the session a second time.
    crud_rpc.postprocess(user, destFolder, skipJobs, skipTranscoding)
    return str(destFolder['_id'])

def get_marked_for_postprocess(self, folder, user, datasets, limit):
    """Collect up to ``limit`` descendant folders flagged for post-processing.

    Walks the folder tree depth-first, appending each folder whose metadata
    carries ``MarkForPostProcess`` to ``datasets`` in place; a branch stops
    as soon as the limit is reached.
    """
    children = list(Folder().childFolders(folder, 'folder', user))
    for child in children:
        is_marked = child.get('meta', {}).get(MarkForPostProcess, False)
        if is_marked:
            if len(datasets) >= limit:
                return
            datasets.append(child)
        self.get_marked_for_postprocess(child, user, datasets, limit)

@access.user
@autoDescribeRoute(
Description("Convert folder of images to large images").modelParam(
Expand All @@ -145,3 +208,47 @@ def postprocess(self, folder, skipJobs, skipTranscoding, additive, additivePrepe
)
def convert_large_image(self, folder):
return crud_rpc.convert_large_image(self.getCurrentUser(), folder)


@access.user
@autoDescribeRoute(
    Description("Post-processing for after S3 Imports")
    .modelParam(
        "id",
        description="Folder containing the items to process",
        model=Folder,
        level=AccessType.WRITE,
    )
    .param(
        "skipJobs",
        "Whether to skip processing that might dispatch worker jobs",
        paramType="formData",
        dataType="boolean",
        default=False,
        required=False,
    )
    .param(
        "skipTranscoding",
        # Fixed copy-pasted description: this flag controls transcoding,
        # not job dispatch.
        "Whether to skip transcoding the video when it is already web-safe",
        paramType="formData",
        dataType="boolean",
        default=False,
        required=False,
    )
    .param(
        "limit",
        "Number of Jobs to start to attempt to convert to DIVE format",
        paramType="formData",
        dataType="integer",
        default=100,
        required=False,
    )
)
def batch_postprocess(self, folder, skipJobs, skipTranscoding, limit):
    """Post-process up to ``limit`` descendant datasets marked during an S3 import.

    Finds folders flagged with ``MarkForPostProcess`` beneath ``folder``,
    clears the flag on each, and dispatches post-processing for it.
    """
    user = self.getCurrentUser()
    # Gather candidate dataset folders, bounded by `limit`.
    datasets = []
    self.get_marked_for_postprocess(folder, user, datasets, limit)
    for subFolder in datasets:
        # Clear the flag through the shared constant so the key written here
        # always matches the key read by get_marked_for_postprocess
        # (previously a raw 'MarkForPostProcess' string literal).
        subFolder['meta'][MarkForPostProcess] = False
        Folder().save(subFolder)
        crud_rpc.postprocess(user, subFolder, skipJobs, skipTranscoding)
1 change: 1 addition & 0 deletions server/dive_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
TypeMarker = "type"
AssetstoreSourceMarker = "import_source"
AssetstoreSourcePathMarker = "import_path"
MarkForPostProcess = "MarkForPostProcess"
FPSMarker = "fps"
OriginalFPSMarker = "originalFps"
OriginalFPSStringMarker = "originalFpsString"
Expand Down

0 comments on commit 8934ab4

Please sign in to comment.