Skip to content

Commit

Permalink
enhanced exception handling for llm and optimized video concatenation
Browse files (browse the repository at this point in the history)
  • Loading branch information
harry committed May 16, 2024
1 parent 37e5623 commit 4ca8d8d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 43 deletions.
85 changes: 49 additions & 36 deletions app/services/llm.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,6 +9,8 @@

from app.config import config

_max_retries = 5


def _generate_response(prompt: str) -> str:
content = ""
Expand Down Expand Up @@ -219,11 +221,8 @@ def generate_script(video_subject: str, language: str = "", paragraph_number: in

final_script = ""
logger.info(f"subject: {video_subject}")
# logger.debug(f"prompt: \n{prompt}")
response = _generate_response(prompt=prompt)

# Return the generated script
if response:
def format_response(response):
# Clean the script
# Remove asterisks, hashes
response = response.replace("*", "")
Expand All @@ -240,19 +239,30 @@ def generate_script(video_subject: str, language: str = "", paragraph_number: in
selected_paragraphs = paragraphs[:paragraph_number]

# Join the selected paragraphs into a single string
final_script = "\n\n".join(selected_paragraphs)
return "\n\n".join(selected_paragraphs)

# Print to console the number of paragraphs used
# logger.info(f"number of paragraphs used: {len(selected_paragraphs)}")
else:
logging.error("gpt returned an empty response")
for i in range(_max_retries):
try:
response = _generate_response(prompt=prompt)
if response:
final_script = format_response(response)
else:
logging.error("gpt returned an empty response")

# g4f may return an error message
if final_script and "当日额度已消耗完" in final_script:
raise ValueError(final_script)

# g4f may return an error message
if final_script and "当日额度已消耗完" in final_script:
raise ValueError(final_script)
if final_script:
break
except Exception as e:
logger.error(f"failed to generate script: {e}")

if i < _max_retries:
logger.warning(f"failed to generate video script, trying again... {i + 1}")

logger.success(f"completed: \n{final_script}")
return final_script
return final_script.strip()


def generate_terms(video_subject: str, video_script: str, amount: int = 5) -> List[str]:
Expand Down Expand Up @@ -283,25 +293,28 @@ def generate_terms(video_subject: str, video_script: str, amount: int = 5) -> Li
""".strip()

logger.info(f"subject: {video_subject}")
# logger.debug(f"prompt: \n{prompt}")
response = _generate_response(prompt)
search_terms = []

try:
search_terms = json.loads(response)
if not isinstance(search_terms, list) or not all(isinstance(term, str) for term in search_terms):
raise ValueError("response is not a list of strings.")

except (json.JSONDecodeError, ValueError):
# logger.warning(f"gpt returned an unformatted response. attempting to clean...")
# Attempt to extract list-like string and convert to list
match = re.search(r'\["(?:[^"\\]|\\.)*"(?:,\s*"[^"\\]*")*\]', response)
if match:
try:
search_terms = json.loads(match.group())
except json.JSONDecodeError:
logger.error(f"could not parse response: {response}")
return []
search_terms = []
for i in range(_max_retries):
try:
response = _generate_response(prompt)
search_terms = json.loads(response)
if not isinstance(search_terms, list) or not all(isinstance(term, str) for term in search_terms):
logger.error("response is not a list of strings.")
continue

except Exception as e:
match = re.search(r'\[.*]', response)
if match:
try:
search_terms = json.loads(match.group())
except json.JSONDecodeError:
pass

if search_terms and len(search_terms) > 0:
break
if i < _max_retries:
logger.warning(f"failed to generate video terms, trying again... {i + 1}")

logger.success(f"completed: \n{search_terms}")
return search_terms
Expand All @@ -310,8 +323,8 @@ def generate_terms(video_subject: str, video_script: str, amount: int = 5) -> Li
if __name__ == "__main__":
video_subject = "生命的意义是什么"
script = generate_script(video_subject=video_subject, language="zh-CN", paragraph_number=1)
# print("######################")
# print(script)
# search_terms = generate_terms(video_subject=video_subject, video_script=script, amount=5)
# print("######################")
# print(search_terms)
print("######################")
print(script)
search_terms = generate_terms(video_subject=video_subject, video_script=script, amount=5)
print("######################")
print(search_terms)
16 changes: 9 additions & 7 deletions app/services/video.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -49,26 +49,28 @@ def combine_videos(combined_video_path: str,

clips = []
video_duration = 0

raw_clips = []
for video_path in video_paths:
clip = VideoFileClip(video_path).without_audio()
clip_duration = clip.duration
start_time = 0

while start_time < clip_duration:
end_time = min(start_time + max_clip_duration, clip_duration)
split_clip = clip.subclip(start_time, end_time)
raw_clips.append(split_clip)
# logger.info(f"splitting from {start_time:.2f} to {end_time:.2f}, clip duration {clip_duration:.2f}, split_clip duration {split_clip.duration:.2f}")
start_time = end_time
# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
while video_duration < audio_duration:
# random video_paths order
if video_concat_mode.value == VideoConcatMode.random.value:
random.shuffle(raw_clips)
if video_concat_mode.value == VideoConcatMode.sequential.value:
break

# random video_paths order
if video_concat_mode.value == VideoConcatMode.random.value:
random.shuffle(raw_clips)

# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
while video_duration < audio_duration:
for clip in raw_clips:
# Check if clip is longer than the remaining audio
if (audio_duration - video_duration) < clip.duration:
Expand Down

0 comments on commit 4ca8d8d

Please sign in to comment.