Skip to content

Commit

Permalink
added support for s3
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen G Pope authored and Stephen G Pope committed Oct 17, 2024
2 parents 9b4a1fd + 81c98ca commit 9929163
Show file tree
Hide file tree
Showing 18 changed files with 277 additions and 120 deletions.
55 changes: 54 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,62 @@
import os

# API key used to authenticate incoming requests; the service refuses to
# start without it.
API_KEY = os.environ.get('API_KEY')
if not API_KEY:
    raise ValueError("API_KEY environment variable is not set")

# GCP environment variables (default to empty string when unset).
GCP_SA_CREDENTIALS = os.environ.get('GCP_SA_CREDENTIALS', '')
GDRIVE_USER = os.environ.get('GDRIVE_USER', '')
GCP_BUCKET_NAME = os.environ.get('GCP_BUCKET_NAME', '')

# S3 (DigitalOcean Spaces) environment variables (default to empty string when unset).
S3_ENDPOINT_URL = os.environ.get('S3_ENDPOINT_URL', '')
S3_ACCESS_KEY = os.environ.get('S3_ACCESS_KEY', '')
S3_SECRET_KEY = os.environ.get('S3_SECRET_KEY', '')

def validate_env_vars(provider):
    """Validate the required environment variables for the selected storage provider.

    Args:
        provider: Storage provider key, either ``'GCP'`` or ``'S3'``.

    Raises:
        ValueError: If *provider* is unknown, or if any required environment
            variable is missing or empty.
    """
    required_vars = {
        'GCP': ['GCP_BUCKET_NAME', 'GCP_SA_CREDENTIALS'],
        'S3': ['S3_ENDPOINT_URL', 'S3_ACCESS_KEY', 'S3_SECRET_KEY'],
    }
    # NOTE(review): S3_BUCKET_NAME and S3_REGION are read by
    # S3CompatibleProvider but are not validated here — confirm whether
    # they should be required for the 'S3' provider as well.

    if provider not in required_vars:
        # Fail with a clear message instead of an opaque KeyError.
        raise ValueError(f"Unknown storage provider: {provider}")

    missing_vars = [var for var in required_vars[provider] if not os.getenv(var)]
    if missing_vars:
        raise ValueError(f"Missing environment variables for {provider} storage: {', '.join(missing_vars)}")

class CloudStorageProvider:
    """Abstract base for cloud storage providers.

    Concrete subclasses (e.g. GCPStorageProvider, S3CompatibleProvider)
    must implement :meth:`upload_file`.
    """

    def upload_file(self, file_path: str) -> str:
        # Subclasses override this to upload the file and return its URL.
        raise NotImplementedError("upload_file must be implemented by subclasses")

class GCPStorageProvider(CloudStorageProvider):
    """Storage provider that uploads files to a Google Cloud Storage bucket."""

    def __init__(self):
        # Resolve the target bucket once, at construction time.
        self.bucket_name = os.environ.get('GCP_BUCKET_NAME')

    def upload_file(self, file_path: str) -> str:
        """Upload the file at *file_path* to GCS and return the resulting URL."""
        # Imported lazily so GCP dependencies are only loaded when GCP is used.
        from services.gcp_toolkit import upload_to_gcs

        return upload_to_gcs(file_path, self.bucket_name)

class S3CompatibleProvider(CloudStorageProvider):
    """Storage provider for S3-compatible services (e.g., DigitalOcean Spaces)."""

    def __init__(self):
        # Capture the full S3 connection configuration from the environment.
        env = os.environ.get
        self.bucket_name = env('S3_BUCKET_NAME')
        self.region = env('S3_REGION')
        self.endpoint_url = env('S3_ENDPOINT_URL')
        self.access_key = env('S3_ACCESS_KEY')
        self.secret_key = env('S3_SECRET_KEY')

    def upload_file(self, file_path: str) -> str:
        """Upload the file at *file_path* to the configured bucket and return its URL."""
        # Lazy import keeps S3 dependencies out of the GCP-only path.
        from services.s3_toolkit import upload_to_s3

        return upload_to_s3(
            file_path,
            self.bucket_name,
            self.region,
            self.endpoint_url,
            self.access_key,
            self.secret_key,
        )

def get_storage_provider() -> CloudStorageProvider:
    """Return the storage provider implied by the environment.

    S3 takes precedence whenever S3_BUCKET_NAME is set; otherwise GCP is
    used. The chosen provider's environment variables are validated first.
    """
    use_s3 = bool(os.getenv('S3_BUCKET_NAME'))
    validate_env_vars('S3' if use_s3 else 'GCP')
    return S3CompatibleProvider() if use_s3 else GCPStorageProvider()
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ google-auth-httplib2
google-api-python-client
google-cloud-storage
psutil
Pillow
boto3
Pillow
22 changes: 11 additions & 11 deletions routes/audio_mixing.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
from flask import Blueprint, request, jsonify
from flask import current_app
from flask import Blueprint
from app_utils import *
import uuid
import threading
import logging
from services.audio_mixing import process_audio_mixing
from services.authentication import authenticate

from services.gcp_toolkit import upload_to_gcs # Ensure this import is present
from services.cloud_storage import upload_file

audio_mixing_bp = Blueprint('audio_mixing', __name__)
logger = logging.getLogger(__name__)
Expand All @@ -30,7 +26,6 @@
})
@queue_task_wrapper(bypass_queue=False)
def audio_mixing(job_id, data):

video_url = data.get('video_url')
audio_url = data.get('audio_url')
video_vol = data.get('video_vol', 100)
Expand All @@ -42,14 +37,19 @@ def audio_mixing(job_id, data):
logger.info(f"Job {job_id}: Received audio mixing request for {video_url} and {audio_url}")

try:
# Process audio and video mixing
output_filename = process_audio_mixing(
video_url, audio_url, video_vol, audio_vol, output_length, job_id, webhook_url
)
gcs_url = upload_to_gcs(output_filename)

return gcs_url, "/audio-mixing", 200
# Upload the mixed file using the unified upload_file() method
cloud_url = upload_file(output_filename)

logger.info(f"Job {job_id}: Mixed media uploaded to cloud storage: {cloud_url}")

# Return the cloud URL for the uploaded file
return cloud_url, "/audio-mixing", 200

except Exception as e:

logger.error(f"Job {job_id}: Error during audio mixing process - {str(e)}")
return str(e), "/audio-mixing", 500

15 changes: 11 additions & 4 deletions routes/caption_video.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import logging
from services.caption_video import process_captioning
from services.authentication import authenticate
from services.gcp_toolkit import upload_to_gcs
from services.cloud_storage import upload_file
import os

caption_bp = Blueprint('caption', __name__)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,8 +61,14 @@ def caption_video(job_id, data):
output_filename = process_captioning(video_url, captions, caption_type, options, job_id)
logger.info(f"Job {job_id}: Captioning process completed successfully")

return output_filename, "/caption-video", 200

# Upload the captioned video using the unified upload_file() method
cloud_url = upload_file(output_filename)

logger.info(f"Job {job_id}: Captioned video uploaded to cloud storage: {cloud_url}")

# Return the cloud URL for the uploaded file
return cloud_url, "/caption-video", 200

except Exception as e:
logger.error(f"Job {job_id}: Error during captioning process - {str(e)}", exc_info=True)
return str(e), "/caption-video", 500
return str(e), "/caption-video", 500
10 changes: 7 additions & 3 deletions routes/combine_videos.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from services.ffmpeg_toolkit import process_video_combination
from services.authentication import authenticate
from services.cloud_storage import upload_file

combine_bp = Blueprint('combine', __name__)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -38,11 +39,14 @@ def combine_videos(job_id, data):
logger.info(f"Job {job_id}: Received combine-videos request for {len(media_urls)} videos")

try:
gcs_url = process_video_combination(media_urls, job_id)
output_file = process_video_combination(media_urls, job_id)
logger.info(f"Job {job_id}: Video combination process completed successfully")

return gcs_url, "/combine-videos", 200

cloud_url = upload_file(output_file)
logger.info(f"Job {job_id}: Combined video uploaded to cloud storage: {cloud_url}")

return cloud_url, "/combine-videos", 200

except Exception as e:
logger.error(f"Job {job_id}: Error during video combination process - {str(e)}")
return str(e), "/combine-videos", 500
24 changes: 16 additions & 8 deletions routes/extract_keyframes.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from flask import Blueprint, request, jsonify
from flask import current_app
from flask import Blueprint
from app_utils import *
import logging
from services.extract_keyframes import process_keyframe_extraction
from services.authentication import authenticate
from services.gcp_toolkit import upload_to_gcs
from services.cloud_storage import upload_file

extract_keyframes_bp = Blueprint('extract_keyframes', __name__)
logger = logging.getLogger(__name__)
Expand All @@ -30,11 +29,20 @@ def extract_keyframes(job_id, data):
logger.info(f"Job {job_id}: Received keyframe extraction request for {video_url}")

try:
image_urls = process_keyframe_extraction(video_url, job_id)
response = {"image_urls": [{"image_url": url} for url in image_urls]}

return response, "/extract-keyframes", 200
# Process keyframe extraction
image_paths = process_keyframe_extraction(video_url, job_id)

# Upload each extracted keyframe and collect the cloud URLs
image_urls = []
for image_path in image_paths:
cloud_url = upload_file(image_path)
image_urls.append({"image_url": cloud_url})

logger.info(f"Job {job_id}: Keyframes uploaded to cloud storage")

# Return the URLs of the uploaded keyframes
return {"image_urls": image_urls}, "/extract-keyframes", 200

except Exception as e:
logger.error(f"Job {job_id}: Error during keyframe extraction - {str(e)}")
return str(e), "/extract-keyframes", 500
return str(e), "/extract-keyframes", 500
20 changes: 13 additions & 7 deletions routes/image_to_video.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from flask import Blueprint, request, jsonify
from flask import current_app
from flask import Blueprint
from app_utils import *
import logging
from services.image_to_video import process_image_to_video
from services.authentication import authenticate
from services.gcp_toolkit import upload_to_gcs
from services.cloud_storage import upload_file

image_to_video_bp = Blueprint('image_to_video', __name__)
logger = logging.getLogger(__name__)
Expand All @@ -29,20 +28,27 @@ def image_to_video(job_id, data):
image_url = data.get('image_url')
length = data.get('length', 5)
frame_rate = data.get('frame_rate', 30)
zoom_speed = data.get('zoom_speed', 3)/100
zoom_speed = data.get('zoom_speed', 3) / 100
webhook_url = data.get('webhook_url')
id = data.get('id')

logger.info(f"Job {job_id}: Received image to video request for {image_url}")

try:
# Process image to video conversion
output_filename = process_image_to_video(
image_url, length, frame_rate, zoom_speed, job_id, webhook_url
)
gcs_url = upload_to_gcs(output_filename)

return gcs_url, "/image-to-video", 200
# Upload the resulting file using the unified upload_file() method
cloud_url = upload_file(output_filename)

# Log the successful upload
logger.info(f"Job {job_id}: Converted video uploaded to cloud storage: {cloud_url}")

# Return the cloud URL for the uploaded file
return cloud_url, "/image-to-video", 200

except Exception as e:
logger.error(f"Job {job_id}: Error processing image to video: {str(e)}", exc_info=True)
return str(e), "/image-to-video", 500
return str(e), "/image-to-video", 500
31 changes: 11 additions & 20 deletions routes/media_to_mp3.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# routes/media_to_mp3.py
from flask import Blueprint, current_app
from app_utils import *
import logging
from services.ffmpeg_toolkit import process_conversion
from services.authentication import authenticate
from services.gcp_toolkit import upload_to_gcs
from services.cloud_storage import upload_file
import os

convert_bp = Blueprint('convert', __name__)
logger = logging.getLogger(__name__)

GCP_BUCKET_NAME = os.getenv('GCP_BUCKET_NAME')

@convert_bp.route('/media-to-mp3', methods=['POST'])
@authenticate
@validate_payload({
Expand All @@ -26,30 +25,22 @@
})
@queue_task_wrapper(bypass_queue=False)
def convert_media_to_mp3(job_id, data):
media_url = data.get('media_url')
media_url = data['media_url']
webhook_url = data.get('webhook_url')
id = data.get('id')
bitrate = data.get('bitrate', '128k') # Default to 128k if not provided
bitrate = data.get('bitrate', '128k')

logger.info(f"Job {job_id}: Received conversion request for {media_url} with bitrate {bitrate}")
logger.info(f"Job {job_id}: Received media-to-mp3 request for media URL: {media_url}")

try:
output_path = process_conversion(media_url, job_id, bitrate)
logger.info(f"Job {job_id}: Conversion completed. Output file: {output_path}")

if not GCP_BUCKET_NAME:
raise Exception("GCP_BUCKET_NAME is not set.")

logger.info(f"Job {job_id}: Uploading to Google Cloud Storage bucket '{GCP_BUCKET_NAME}'...")
uploaded_file_url = upload_to_gcs(output_path, GCP_BUCKET_NAME)

if not uploaded_file_url:
raise Exception(f"Failed to upload the output file {output_path}")
output_file = process_conversion(media_url, job_id, bitrate)
logger.info(f"Job {job_id}: Media conversion process completed successfully")

logger.info(f"Job {job_id}: File uploaded successfully. URL: {uploaded_file_url}")
cloud_url = upload_file(output_file)
logger.info(f"Job {job_id}: Converted media uploaded to cloud storage: {cloud_url}")

return uploaded_file_url, "/media-to-mp3", 200
return cloud_url, "/media-to-mp3", 200

except Exception as e:
logger.error(f"Job {job_id}: Error during processing - {str(e)}")
logger.error(f"Job {job_id}: Error during media conversion process - {str(e)}")
return str(e), "/media-to-mp3", 500
10 changes: 5 additions & 5 deletions routes/transcribe_media.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
from services.transcription import process_transcription
from services.authentication import authenticate
from services.gcp_toolkit import upload_to_gcs
from services.cloud_storage import upload_file

transcribe_bp = Blueprint('transcribe', __name__)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -37,14 +37,14 @@ def transcribe(job_id, data):
result = process_transcription(media_url, output, max_chars)
logger.info(f"Job {job_id}: Transcription process completed successfully")

# If the result is a file path, upload it to GCS
# If the result is a file path, upload it using the unified upload_file() method
if output in ['srt', 'vtt', 'ass']:
gcs_url = upload_to_gcs(result)
cloud_url = upload_file(result)
os.remove(result) # Remove the temporary file after uploading
return gcs_url, "/transcribe-media", 200
return cloud_url, "/transcribe-media", 200
else:
return result, "/transcribe-media", 200

except Exception as e:
logger.error(f"Job {job_id}: Error during transcription process - {str(e)}")
return str(e), "/transcribe-media", 500
return str(e), "/transcribe-media", 500
17 changes: 16 additions & 1 deletion routes/v1/ffmpeg_compose.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from flask import Blueprint, request, jsonify
from flask import current_app
from app_utils import *
Expand All @@ -6,6 +7,7 @@
import logging
from services.v1.ffmpeg_compose import process_ffmpeg_compose
from services.authentication import authenticate
from services.cloud_storage import upload_file

v1_ffmpeg_compose_bp = Blueprint('v1_ffmpeg_compose', __name__)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -90,7 +92,20 @@ def ffmpeg_api(job_id, data):
logger.info(f"Job {job_id}: Received flexible FFmpeg request")

try:
output_urls = process_ffmpeg_compose(data, job_id)

output_filenames = process_ffmpeg_compose(data, job_id)

# Upload output files to cloud storage and create result array
output_urls = []
for output_filename in output_filenames:
if os.path.exists(output_filename):
upload_url = upload_file(output_filename)
output_urls.append({"file_url": upload_url})
os.remove(output_filename) # Clean up local output file after upload
else:
raise Exception(f"Expected output file {output_filename} not found")


return output_urls, "/v1/ffmpeg/compose", 200

except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
from .file_management import download_file
from .gcp_toolkit import upload_to_gcs
from .transcription import process_transcription
from .caption_video import process_captioning # Add this line
from .caption_video import process_captioning
Loading

0 comments on commit 9929163

Please sign in to comment.