Merge pull request #18 from midas-research/added_notifications_to_merged_cvat_audio

Added notifications and cloud to merged cvat audio
rohan220217 authored Nov 22, 2024
2 parents fc2171e + edd588f commit b81a78d
Showing 27 changed files with 1,022 additions and 7 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -66,3 +66,7 @@ cvat-core/reports

# produced by cvat/apps/iam/rules/tests/generate_tests.py
/cvat/apps/*/rules/*_test.gen.rego

# Custom
rsa
rsa.pub
95 changes: 95 additions & 0 deletions cvat/apps/engine/chunks.py
@@ -0,0 +1,95 @@
import os
import traceback
import subprocess


def get_video_duration(video_file):
    # Ask ffprobe for the container duration in seconds.
    result = subprocess.run(
        ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', video_file],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True  # decode stdout to str so it can be parsed as a float
    )
    duration = float(result.stdout)
    return duration


class MakeVideoChunks:
    @staticmethod
    def make(task_id, chunk_duration=1):
        try:
            current_file_path = os.path.abspath(__file__)
            print(f"Current file path: {current_file_path}")

            # Define the raw video directory
            raw_video_dir = f"/home/vignesh/Desktop/Desktop/IIITD/BTP.02/cvat/data/data/{task_id}/raw"
            print(f"Raw video directory: {raw_video_dir}")

            # Recursively search for .mp4 files in the raw video directory and its subdirectories
            input_files = []
            for root, dirs, files in os.walk(raw_video_dir):
                for file in files:
                    if file.endswith('.mp4'):
                        input_files.append(os.path.join(root, file))

            # Check if any .mp4 files are found
            if not input_files:
                raise FileNotFoundError("No .mp4 files found in the specified directory or subdirectories.")

            print(f"Input files: {input_files}")
            input_file = input_files[0]  # Use the first .mp4 file found
            output_folder = f"/home/vignesh/Desktop/Desktop/IIITD/BTP.02/cvat/data/data/{task_id}/compressed"

            # Create the output folder if it doesn't exist
            os.makedirs(output_folder, exist_ok=True)

            print(f"Processing video: {input_file}")

            # Retrieve video duration
            video_duration = get_video_duration(input_file)
            print(f"Video duration: {video_duration} seconds")

            # Define start and end times
            start_time = 0  # Start from the beginning of the video
            end_time = int(video_duration)  # Set end time to the duration of the video

            # Create chunks using a loop
            for i in range(start_time, end_time, chunk_duration):
                output_file = os.path.join(output_folder, f'{i}.mp4')

                # If the output file exists, remove it
                if os.path.exists(output_file):
                    print(f"File {output_file} already exists. Removing it.")
                    os.remove(output_file)

                # Note: with '-c copy' ffmpeg seeks to the nearest keyframe,
                # so chunk boundaries may be approximate rather than exact.
                command = [
                    'ffmpeg',
                    '-ss', str(i),              # Start time for the chunk
                    '-i', input_file,           # Input file
                    '-c', 'copy',               # Copy codec, no re-encoding
                    '-t', str(chunk_duration),  # Duration of the chunk
                    output_file                 # Output file path
                ]

                # Execute the command
                print(' '.join(command))
                subprocess.run(command)

            response = {
                "success": True,
                "message": None,
                "data": None,
                "error": None
            }

            return response
        except Exception as e:
            print(str(e))
            # format_exc() returns the traceback as a string; print_exc() would return None
            error = traceback.format_exc()

            response = {
                "success": False,
                "message": f"An unexpected error occurred, Error: {e}",
                "data": None,
                "error": error
            }

            return response
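
For orientation, a minimal sketch of driving the chunker above, assuming ffmpeg and ffprobe are on PATH, the hard-coded data directory exists for the task, and at least one .mp4 sits under its raw/ subdirectory; the task id value here is hypothetical:

from cvat.apps.engine.chunks import MakeVideoChunks

# 106 is a made-up task id for illustration; chunks are written to
# .../data/data/106/compressed as 0.mp4, 1.mp4, ... one per second.
response = MakeVideoChunks.make(task_id=106, chunk_duration=1)
if response["success"]:
    print("Video chunked into 1-second segments")
else:
    print(f"Chunking failed: {response['message']}")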
1 change: 0 additions & 1 deletion cvat/apps/engine/models.py
@@ -25,7 +25,6 @@

from cvat.apps.engine.utils import parse_specific_attributes
from cvat.apps.events.utils import cache_deleted

class SafeCharField(models.CharField):
    def get_prep_value(self, value):
        value = super().get_prep_value(value)
58 changes: 56 additions & 2 deletions cvat/apps/engine/task.py
@@ -39,6 +39,7 @@
from utils.dataset_manifest.core import VideoManifestValidator, is_dataset_manifest
from utils.dataset_manifest.utils import detect_related_images
from .cloud_provider import db_storage_to_storage_instance
from .chunks import MakeVideoChunks

slogger = ServerLogManager(__name__)

@@ -105,6 +106,7 @@ def _copy_data_from_share_point(
    ))

    for path in filtered_server_files:
        slogger.glob.info(f"Copying file: {path}")
        if server_dir is None:
            source_path = os.path.join(settings.SHARE_ROOT, os.path.normpath(path))
        else:
@@ -449,8 +451,10 @@ def _download_data_from_cloud_storage(
    files: List[str],
    upload_dir: str,
):
    slogger.glob.info(f"Downloading data from cloud storage: {files}")
    cloud_storage_instance = db_storage_to_storage_instance(db_storage)
    cloud_storage_instance.bulk_download_to_dir(files, upload_dir)
    slogger.glob.info(f"Downloaded data to {upload_dir}")

def _get_manifest_frame_indexer(start_frame=0, frame_step=1):
    return lambda frame_id: start_frame + frame_id * frame_step
@@ -559,6 +563,7 @@ def _create_thread(
slogger.glob.info("create task #{}".format(db_task.id))

job_file_mapping = _validate_job_file_mapping(db_task, data)
slogger.glob.info(f"Job file mapping: {job_file_mapping}")

db_data = db_task.data
upload_dir = db_data.get_upload_dirname() if db_data.storage != models.StorageChoice.SHARE else settings.SHARE_ROOT
@@ -700,24 +705,29 @@ def _update_status(msg: str) -> None:

    # count and validate uploaded files
    media = _count_files(data)
    slogger.glob.info(f"Media: {media}")
    media, task_mode = _validate_data(media, manifest_files)
    is_media_sorted = False

    if is_data_in_cloud:
        # first we need to filter files and keep only supported ones
        slogger.glob.info(f"Data in cloud")
        if any([v for k, v in media.items() if k != 'image']) and db_data.storage_method == models.StorageMethodChoice.CACHE:
            slogger.glob.info(f"Storage method: {db_data.storage_method}")
            # FUTURE-FIXME: This is a temporary workaround for creating tasks
            # with unsupported cloud storage data (video, archive, pdf) when use_cache is enabled
            db_data.storage_method = models.StorageMethodChoice.FILE_SYSTEM
-            _update_status("The 'use cache' option is ignored")
+            # _update_status("The 'use cache' option is ignored")

        if db_data.storage_method == models.StorageMethodChoice.FILE_SYSTEM or not settings.USE_CACHE:
            slogger.glob.info(f"Storage method: {db_data.storage_method}")
            filtered_data = []
            for files in (i for i in media.values() if i):
                filtered_data.extend(files)
            media_to_download = filtered_data

-            if media['image']:
+            if 'image' in media and media['image']:
                slogger.glob.info(f"Image in media")
                start_frame = db_data.start_frame
                stop_frame = len(filtered_data) - 1
                if data['stop_frame'] is not None:
@@ -726,40 +736,62 @@ def _update_status(msg: str) -> None:
                step = db_data.get_frame_step()
                if start_frame or step != 1 or stop_frame != len(filtered_data) - 1:
                    media_to_download = filtered_data[start_frame : stop_frame + 1: step]

            slogger.glob.info(f"Downloading data from cloud storage: {media_to_download}")
            _download_data_from_cloud_storage(db_data.cloud_storage, media_to_download, upload_dir)
            del media_to_download
            del filtered_data
            is_data_in_cloud = False
            db_data.storage = models.StorageChoice.LOCAL
            slogger.glob.info(f"DB Data Storage: {db_data.storage}")
        else:
            manifest = ImageManifestManager(db_data.get_manifest_path())

    if job_file_mapping is not None and task_mode != 'annotation':
        raise ValidationError("job_file_mapping can't be used with sequence-based data like videos")

    slogger.glob.info(f"Data: {data}")
    if data['server_files']:
        if db_data.storage == models.StorageChoice.LOCAL and not db_data.cloud_storage:
            # this means that the data has not been downloaded from the storage to the host
            slogger.glob.info(f"Copying data from share point")
            _copy_data_from_share_point(
                (data['server_files'] + [manifest_file]) if manifest_file else data['server_files'],
                upload_dir, data.get('server_files_path'), data.get('server_files_exclude'))
            manifest_root = upload_dir
            slogger.glob.info(f"Manifest Root: {manifest_root}")
        elif is_data_in_cloud:
            # we should sort media before sorting in the extractor because the manifest structure should match the sorted media
            if job_file_mapping is not None:
                slogger.glob.info(f"Job file mapping")
                filtered_files = []
                for f in itertools.chain.from_iterable(job_file_mapping):
                    if f not in data['server_files']:
                        raise ValidationError(f"Job mapping file {f} is not specified in input files")
                    filtered_files.append(f)
                data['server_files'] = filtered_files
                sorted_media = list(itertools.chain.from_iterable(job_file_mapping))
            else:
                slogger.glob.info(f"Sorting media")
                sorted_media = sort(media['image'], data['sorting_method'])
                media['image'] = sorted_media

            # Add logic to handle audio files from cloud storage
            if db_data.storage == models.StorageChoice.CLOUD_STORAGE:
                slogger.glob.info(f"Downloading data from cloud storage: {data['server_files']}")
                _download_data_from_cloud_storage(db_data.cloud_storage, data['server_files'], upload_dir)

            is_media_sorted = True

            if manifest_file:
                # Define task manifest content based on cloud storage manifest content and uploaded files
                slogger.glob.info(f"Creating task manifest based on cloud storage manifest content and uploaded files")
                _create_task_manifest_based_on_cloud_storage_manifest(
                    sorted_media, cloud_storage_manifest_prefix,
                    cloud_storage_manifest, manifest)
            else:  # without manifest file but with use_cache option
                # Define task manifest content based on list with uploaded files
                slogger.glob.info(f"Creating task manifest from cloud data: {db_data.cloud_storage, sorted_media, manifest}")
                _create_task_manifest_from_cloud_data(db_data.cloud_storage, sorted_media, manifest)

    av_scan_paths(upload_dir)
@@ -770,6 +802,7 @@ def _update_status(msg: str) -> None:
    # If upload from server_files image and directories
    # need to update images list by all found images in directories
    if (data['server_files']) and len(media['directory']) and len(media['image']):
        slogger.glob.info(f"Updating images list by all found images in directories: {media['directory']}")
        media['image'].extend(
            [os.path.relpath(image, upload_dir) for image in
                MEDIA_TYPES['directory']['extractor'](
@@ -1264,3 +1297,24 @@ def process_results(img_meta: list[tuple[str, int, tuple[int, int]]]):

slogger.glob.info("Found frames {} for Data #{}".format(db_data.size, db_data.id))
_save_task_to_db(db_task, job_file_mapping=job_file_mapping)

if MEDIA_TYPE == "video":
# Video Chunks overwrites
slogger.glob.info(f"Creating video chunks")
job_id_string = job.id
match = re.search(r'task-(\d+)', job_id_string)

if match:
task_id = match.group(1) # Extracted '106'
response = MakeVideoChunks.make(task_id)
slogger.glob.info(response)
else:
response = {
"success" : False,
"message" : "No match found."
}
slogger.glob.error(response)

# f = open( '/home/vignesh/Desktop/Desktop/IIITD/BTP.02/cvat/cvat/apps/engine/chunks.txt', 'w' )
# f.write( 'dict = ' + repr(response) + '\n' )
# f.close()
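
As an aside, a self-contained sketch of the task-id extraction used above; the exact format of job.id is not shown in this commit, so the sample strings are assumptions:

import re

def extract_task_id(job_id_string: str):
    # Mirror the re.search(r'task-(\d+)', ...) call above: pull the numeric
    # id that follows 'task-', or return None when the pattern is absent.
    match = re.search(r'task-(\d+)', job_id_string)
    return match.group(1) if match else None

print(extract_task_id("task-106"))      # -> '106'
print(extract_task_id("unrelated-id"))  # -> None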
11 changes: 11 additions & 0 deletions cvat/apps/engine/views.py
@@ -2218,6 +2218,17 @@ def save_segments(self, request):
            job.save()

            self.send_annotation_email(request, 'annotation')

            ## Notification
            from ..notifications.api import SendNotificationToSingleUser

            notification_response = SendNotificationToSingleUser(
                request.user.id,
                f"#{job.id} - Annotation Completed",
                f"This annotation was completed at {datetime.now()}. \nStatus: {job.ai_audio_annotation_status}",
                "info"
            )

            return Response({'success': True, 'segments': saved_segments}, status=status.HTTP_201_CREATED)

        except Exception as e:
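A sketch of invoking the notification helper directly, assuming the (user_id, title, body, level) signature implied by the call above and an importable cvat.apps.notifications.api module; the helper's return shape is not shown in this commit, and the view snippet also relies on datetime already being imported in views.py:

from datetime import datetime

from cvat.apps.notifications.api import SendNotificationToSingleUser

# Placeholder ids for illustration only.
user_id, job_id = 42, 7
SendNotificationToSingleUser(
    user_id,
    f"#{job_id} - Annotation Completed",
    f"This annotation was completed at {datetime.now()}.",
    "info",
)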
Empty file.
18 changes: 18 additions & 0 deletions cvat/apps/notifications/admin.py
@@ -0,0 +1,18 @@
from django.contrib import admin

from .models import *
# Register your models here.


class NotificationsAdmin(admin.ModelAdmin):
    model = Notifications


admin.site.register(Notifications, NotificationsAdmin)


class NotificationStatusAdmin(admin.ModelAdmin):
    model = NotificationStatus


admin.site.register(NotificationStatus, NotificationStatusAdmin)