diff --git a/src/yt2doc/factories.py b/src/yt2doc/factories.py
index 44940d6..2d6fc54 100644
--- a/src/yt2doc/factories.py
+++ b/src/yt2doc/factories.py
@@ -15,6 +15,7 @@
 from yt2doc.formatting.formatter import MarkdownFormatter
 from yt2doc.formatting.llm_topic_segmenter import LLMTopicSegmenter
 from yt2doc.formatting.llm_adapter import LLMAdapter
+from yt2doc.formatting.paragraphs_segmenter import ParagraphsSegmenter
 from yt2doc.yt2doc import Yt2Doc
 
 
@@ -43,6 +44,7 @@ def get_yt2doc(
         )
 
     sat = SaT(sat_model)
+    paragraphs_segmenter = ParagraphsSegmenter(sat=sat)
     if segment_unchaptered is True:
         if llm_model is None:
             raise LLMModelNotSpecified(
@@ -57,9 +59,12 @@ def get_yt2doc(
         )
         llm_adapter = LLMAdapter(llm_client=llm_client, llm_model=llm_model)
         llm_topic_segmenter = LLMTopicSegmenter(llm_adapter=llm_adapter)
-        formatter = MarkdownFormatter(sat=sat, topic_segmenter=llm_topic_segmenter)
+        formatter = MarkdownFormatter(
+            paragraphs_segmenter=paragraphs_segmenter,
+            topic_segmenter=llm_topic_segmenter,
+        )
     else:
-        formatter = MarkdownFormatter(sat=sat)
+        formatter = MarkdownFormatter(paragraphs_segmenter=paragraphs_segmenter)
 
     video_info_extractor = MediaInfoExtractor(temp_dir=temp_dir)
     transcriber = Transcriber(
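The factory change above swaps the raw `SaT` dependency for a `ParagraphsSegmenter` and hands that to `MarkdownFormatter`. A minimal wiring sketch under the new constructor signatures (the `"sat-3l"` model name is borrowed from the tests further down and is only an example):

```python
# Sketch: building the formatter without LLM topic segmentation
# (the segment_unchaptered branch disabled), mirroring get_yt2doc above.
from wtpsplit import SaT

from yt2doc.formatting.formatter import MarkdownFormatter
from yt2doc.formatting.paragraphs_segmenter import ParagraphsSegmenter

sat = SaT("sat-3l")  # example model; the factory receives sat_model as a parameter
paragraphs_segmenter = ParagraphsSegmenter(sat=sat)

# topic_segmenter defaults to None, so only the paragraphs segmenter is required.
formatter = MarkdownFormatter(paragraphs_segmenter=paragraphs_segmenter)
```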
diff --git a/src/yt2doc/formatting/formatter.py b/src/yt2doc/formatting/formatter.py
index 2eabc66..486ae9d 100644
--- a/src/yt2doc/formatting/formatter.py
+++ b/src/yt2doc/formatting/formatter.py
@@ -1,8 +1,6 @@
 import typing
 import logging
 
-from wtpsplit import SaT
-
 from yt2doc.extraction import interfaces as extraction_interfaces
 from yt2doc.formatting import interfaces
 
@@ -12,24 +10,23 @@
 class MarkdownFormatter:
     def __init__(
         self,
-        sat: SaT,
+        paragraphs_segmenter: interfaces.IParagraphsSegmenter,
         topic_segmenter: typing.Optional[interfaces.ITopicSegmenter] = None,
     ) -> None:
-        self.sat = sat
+        self.paragraphs_segmenter = paragraphs_segmenter
         self.topic_segmenter = topic_segmenter
         self.video_title_template = "# {name}"
        self.chapter_title_template = "## {name}"
 
-    def _paragraph_text(self, text: str) -> str:
-        if len(text) < 15:
-            return text
-        logger.info("Splitting text into paragraphs with Segment Any Text.")
-        paragraphed_sentences: typing.List[typing.List[str]] = self.sat.split(
-            text, do_paragraph_segmentation=True, verbose=True
-        )
-        paragraphs = ["".join(sentences) for sentences in paragraphed_sentences]
-        paragraphed_text = "\n\n".join(paragraphs)
-        return paragraphed_text
+    @staticmethod
+    def _paragraphs_to_text(
+        paragraphs: typing.Sequence[typing.Sequence[interfaces.Sentence]],
+    ) -> str:
+        paragraph_texts = []
+        for paragraph in paragraphs:
+            paragraph_text = "".join(sentence.text for sentence in paragraph)
+            paragraph_texts.append(paragraph_text)
+        return "\n\n".join(paragraph_texts)
 
     def format_chaptered_transcript(
         self, chaptered_transcript: extraction_interfaces.ChapteredTranscript
@@ -42,24 +39,21 @@ def format_chaptered_transcript(
             and len(chaptered_transcript.chapters) == 1
         ):
             transcript_segments = chaptered_transcript.chapters[0].segments
-            full_text = "".join([segment.text for segment in transcript_segments])
-            logger.info(
-                "Splitting text into paragraphs with Segment Any Text for topic segmentation."
+            paragraphed_sentences = self.paragraphs_segmenter.segment(
+                transcription_segments=transcript_segments
             )
-            paragraphed_sentences: typing.List[typing.List[str]] = self.sat.split(
-                full_text, do_paragraph_segmentation=True, verbose=True
+            chapters = self.topic_segmenter.segment(
+                sentences_in_paragraphs=paragraphed_sentences
             )
-            chapters = self.topic_segmenter.segment(paragraphs=paragraphed_sentences)
             chapter_and_text_list = [
-                (chapter.title, chapter.text) for chapter in chapters
+                (chapter.title, self._paragraphs_to_text(chapter.paragraphs))
+                for chapter in chapters
             ]
         else:
             for chapter in chaptered_transcript.chapters:
-                chapter_text = self._paragraph_text(
-                    "".join(s.text for s in chapter.segments)
-                )
-                chapter_and_text_list.append((chapter.title, chapter_text.strip()))
+                chapter_full_text = "".join(s.text for s in chapter.segments)
+                chapter_and_text_list.append((chapter.title, chapter_full_text.strip()))
 
         transcript_text = "\n\n".join(
             [
diff --git a/src/yt2doc/formatting/interfaces.py b/src/yt2doc/formatting/interfaces.py
index d9921de..c2fde0a 100644
--- a/src/yt2doc/formatting/interfaces.py
+++ b/src/yt2doc/formatting/interfaces.py
@@ -13,7 +13,7 @@ class Sentence(BaseModel):
 
 class Chapter(BaseModel):
     title: str
-    text: str
+    paragraphs: typing.Sequence[typing.Sequence[Sentence]]
 
 
 class FormattedTranscript(BaseModel):
@@ -29,7 +29,7 @@ class FormattedPlaylist(BaseModel):
 class IParagraphsSegmenter(typing.Protocol):
     def segment(
         self, transcription_segments: typing.Sequence[transcription_interfaces.Segment]
-    ) -> typing.Sequence[typing.Sequence[Sentence]]: ...
+    ) -> typing.List[typing.List[Sentence]]: ...
 
 
 class ILLMAdapter(typing.Protocol):
@@ -44,7 +44,7 @@ def generate_title_for_paragraphs(
 
 class ITopicSegmenter(typing.Protocol):
     def segment(
-        self, paragraphs: typing.List[typing.List[str]]
+        self, sentences_in_paragraphs: typing.List[typing.List[Sentence]]
    ) -> typing.Sequence[Chapter]: ...
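With the interface change above, a `Chapter` now carries nested `Sentence` objects instead of a pre-joined string, so sentence-level start times survive until rendering. A small sketch of the new shape and of flattening it back to text the way `_paragraphs_to_text` does (the field names come from this diff; the literal values are invented):

```python
from yt2doc.formatting.interfaces import Chapter, Sentence

chapter = Chapter(
    title="Intro",
    paragraphs=[
        [
            Sentence(text="Hello world! ", start_second=0.0),
            Sentence(text="This is a test. ", start_second=1.0),
        ],
    ],
)

# Equivalent of MarkdownFormatter._paragraphs_to_text: join the sentences of each
# paragraph, then separate paragraphs with a blank line.
text = "\n\n".join(
    "".join(sentence.text for sentence in paragraph)
    for paragraph in chapter.paragraphs
)
```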
diff --git a/src/yt2doc/formatting/llm_topic_segmenter.py b/src/yt2doc/formatting/llm_topic_segmenter.py
index 9fb4827..aae78cf 100644
--- a/src/yt2doc/formatting/llm_topic_segmenter.py
+++ b/src/yt2doc/formatting/llm_topic_segmenter.py
@@ -12,19 +12,14 @@ class LLMTopicSegmenter:
     def __init__(self, llm_adapter: interfaces.ILLMAdapter) -> None:
         self.llm_adapter = llm_adapter
 
-    def _get_title_for_chapter(self, paragraphs: typing.List[typing.List[str]]) -> str:
-        truncated_paragraphs = [p[:10] for p in paragraphs]
-        return self.llm_adapter.generate_title_for_paragraphs(
-            paragraphs=truncated_paragraphs
-        )
-
     def segment(
-        self, paragraphs: typing.List[typing.List[str]]
+        self,
+        sentences_in_paragraphs: typing.List[typing.List[interfaces.Sentence]],
     ) -> typing.Sequence[interfaces.Chapter]:
         group_size = 8
         grouped_paragraphs_with_overlap = [
-            (i, paragraphs[i : i + group_size])
-            for i in range(0, len(paragraphs), group_size - 1)
+            (i, sentences_in_paragraphs[i : i + group_size])
+            for i in range(0, len(sentences_in_paragraphs), group_size - 1)
         ]
         logger.info(
             f"grouped_paragraphs_with_overlap: {grouped_paragraphs_with_overlap}"
@@ -37,9 +32,13 @@ def segment(
             truncated_group_paragraphs = [
                 paragraph[:truncate_sentence_index] for paragraph in grouped_paragraphs
             ]
+            truncated_group_paragraphs_texts = [
+                [sentence.text for sentence in paragraph]
+                for paragraph in truncated_group_paragraphs
+            ]
             paragraph_indexes = self.llm_adapter.get_topic_changing_paragraph_indexes(
-                paragraphs=truncated_group_paragraphs
+                paragraphs=truncated_group_paragraphs_texts,
             )
 
             logger.info(f"paragraph indexes from LLM: {paragraph_indexes}")
@@ -49,34 +48,46 @@ def segment(
             topic_changed_indexes += aligned_indexes
 
         if len(topic_changed_indexes) == 0:
-            paragraph_texts = ["".join(paragraph) for paragraph in paragraphs]
-            text = "\n\n".join(paragraph_texts)
+            truncated_paragraphs_in_chapter = [p[:10] for p in sentences_in_paragraphs]
+            truncated_paragraphs_texts = [
+                [sentence.text for sentence in paragraph]
+                for paragraph in truncated_paragraphs_in_chapter
+            ]
+            title = self.llm_adapter.generate_title_for_paragraphs(
+                paragraphs=truncated_paragraphs_texts
+            )
             return [
                 interfaces.Chapter(
-                    title=self._get_title_for_chapter(paragraphs=paragraphs),
-                    text=text,
+                    title=title,
+                    paragraphs=sentences_in_paragraphs,
                 )
             ]
 
-        chapter_paragraphs: typing.List[typing.List[typing.List[str]]] = []
-        current_chapter_paragraphs: typing.List[typing.List[str]] = []
-        for index, paragraph in enumerate(paragraphs):
+        paragraphs_in_chapters: typing.List[
+            typing.List[typing.List[interfaces.Sentence]]
+        ] = []
+        current_chapter_paragraphs: typing.List[typing.List[interfaces.Sentence]] = []
+        for index, sentences_in_paragraph in enumerate(sentences_in_paragraphs):
             if index in topic_changed_indexes:
-                chapter_paragraphs.append(current_chapter_paragraphs)
+                paragraphs_in_chapters.append(current_chapter_paragraphs)
                 current_chapter_paragraphs = []
-            current_chapter_paragraphs.append(paragraph)
-        chapter_paragraphs.append(current_chapter_paragraphs)
+            current_chapter_paragraphs.append(sentences_in_paragraph)
+        paragraphs_in_chapters.append(current_chapter_paragraphs)
+
+        chapters: typing.List[interfaces.Chapter] = []
+        for paragraphs_in_chapter in tqdm(
+            paragraphs_in_chapters, desc="Generating titles for chapters"
+        ):
+            truncated_paragraphs_in_chapter = [p[:10] for p in paragraphs_in_chapter]
+            truncated_paragraphs_texts = [
+                [sentence.text for sentence in paragraph]
+                for paragraph in truncated_paragraphs_in_chapter
+            ]
+            title = self.llm_adapter.generate_title_for_paragraphs(
+                paragraphs=truncated_paragraphs_texts
+            )
+            chapters.append(
+                interfaces.Chapter(title=title, paragraphs=paragraphs_in_chapter)
+            )
 
-        chapter_titles_and_texts: typing.List[typing.Tuple[str, str]] = []
-        for chapter in tqdm(chapter_paragraphs, desc="Generating titles for chapters"):
-            paragraphs_: typing.List[str] = []
-            for paragraph in chapter:
-                paragraph_text = "".join(paragraph)
-                paragraphs_.append(paragraph_text)
-            title = self._get_title_for_chapter(paragraphs=chapter)
-            chapter_titles_and_texts.append((title, "\n\n".join(paragraphs_)))
-        chapters = [
-            interfaces.Chapter(title=title, text=text)
-            for title, text in chapter_titles_and_texts
-        ]
         return chapters
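The segmenter's overall flow is unchanged: paragraphs are scanned in overlapping windows of `group_size = 8`, the LLM reports the indexes where the topic changes, and the paragraph list is cut into chapters at those indexes. A stripped-down sketch of that cutting step, with plain strings standing in for paragraphs of `Sentence` objects:

```python
from typing import List

paragraphs = ["p0", "p1", "p2", "p3", "p4"]  # stand-ins for List[Sentence] paragraphs
topic_changed_indexes = [2, 4]  # aligned indexes collected from the LLM upstream

chapters: List[List[str]] = []
current: List[str] = []
for index, paragraph in enumerate(paragraphs):
    if index in topic_changed_indexes:
        chapters.append(current)  # a topic change closes the running chapter
        current = []
    current.append(paragraph)
chapters.append(current)  # the tail becomes the final chapter

assert chapters == [["p0", "p1"], ["p2", "p3"], ["p4"]]
```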
diff --git a/src/yt2doc/formatting/paragraphs_segmenter.py b/src/yt2doc/formatting/paragraphs_segmenter.py
index b190086..3f500b1 100644
--- a/src/yt2doc/formatting/paragraphs_segmenter.py
+++ b/src/yt2doc/formatting/paragraphs_segmenter.py
@@ -1,10 +1,13 @@
 import typing
+import logging
 
 from wtpsplit import SaT
 
 from yt2doc.formatting import interfaces
 from yt2doc.transcription import interfaces as transcription_interfaces
 
+logger = logging.getLogger(__file__)
+
 
 class ParagraphsSegmenter:
     def __init__(self, sat: SaT) -> None:
@@ -12,61 +15,72 @@ def __init__(self, sat: SaT) -> None:
 
     def segment(
         self, transcription_segments: typing.Sequence[transcription_interfaces.Segment]
-    ) -> typing.Sequence[typing.Sequence[interfaces.Sentence]]:
+    ) -> typing.List[typing.List[interfaces.Sentence]]:
         # Get sentences from SaT
         full_text = "".join(s.text for s in transcription_segments)
-        paragraphed_texts = self.sat.split(full_text, do_paragraph_segmentation=True, verbose=True)
+        logger.info("Splitting text into paragraphs with Segment Any Text.")
+        paragraphed_texts = self.sat.split(
+            full_text, do_paragraph_segmentation=True, verbose=True
+        )
 
         # Align timestamps
         segments_text = "".join(s.text for s in transcription_segments)
         segments_pos = 0  # Position in segments text
         curr_segment_idx = 0  # Current segment index
         curr_segment_offset = 0  # Position within current segment
-
+
         result_paragraphs = []
-
+
         for paragraph in paragraphed_texts:
             result_sentences = []
-
+
             for sentence in paragraph:
                 # Find matching position for this sentence
                 sentence_pos = 0  # Position in current sentence
-
+
                 # Find start position
                 start_segment_idx = curr_segment_idx
-
+
                 # Match characters exactly including spaces
                 while sentence_pos < len(sentence):
                     if segments_pos >= len(segments_text):
                         break
-
+
                     # Match characters exactly
                     if sentence[sentence_pos] == segments_text[segments_pos]:
                         sentence_pos += 1
                         segments_pos += 1
                         curr_segment_offset += 1
                         # Update segment index if needed
-                        while (curr_segment_idx < len(transcription_segments) - 1 and
-                               curr_segment_offset >= len(transcription_segments[curr_segment_idx].text)):
+                        while curr_segment_idx < len(
+                            transcription_segments
+                        ) - 1 and curr_segment_offset >= len(
+                            transcription_segments[curr_segment_idx].text
+                        ):
                             curr_segment_offset = 0
                             curr_segment_idx += 1
                     else:
                         # If no match, move forward in segments
                         segments_pos += 1
                         curr_segment_offset += 1
-                        while (curr_segment_idx < len(transcription_segments) - 1 and
-                               curr_segment_offset >= len(transcription_segments[curr_segment_idx].text)):
+                        while curr_segment_idx < len(
+                            transcription_segments
+                        ) - 1 and curr_segment_offset >= len(
+                            transcription_segments[curr_segment_idx].text
+                        ):
                             curr_segment_offset = 0
                             curr_segment_idx += 1
-
+
                 # Create sentence with aligned timestamp
                 result_sentences.append(
                     interfaces.Sentence(
                         text=sentence,
-                        start_second=transcription_segments[start_segment_idx].start_second
+                        start_second=transcription_segments[
+                            start_segment_idx
+                        ].start_second,
                     )
                 )
-
+
             result_paragraphs.append(result_sentences)
-
+
         return result_paragraphs
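`ParagraphsSegmenter.segment` now preserves timing: each sentence returned by SaT is matched character by character against the concatenated segment texts, and the sentence inherits the `start_second` of the segment in which it begins. A usage sketch with a stubbed SaT, in the same spirit as the unit test at the end of this patch (the segment texts here are invented):

```python
from unittest.mock import MagicMock

from yt2doc.formatting.paragraphs_segmenter import ParagraphsSegmenter
from yt2doc.transcription.interfaces import Segment

mock_sat = MagicMock()
mock_sat.split.return_value = [["Hello world! ", "This is a test. "]]  # one paragraph

segments = [
    Segment(start_second=0.0, end_second=1.0, text=" Hello"),
    Segment(start_second=1.0, end_second=2.0, text=" world! This"),
    Segment(start_second=2.0, end_second=3.0, text=" is a test."),
]

paragraphs = ParagraphsSegmenter(sat=mock_sat).segment(transcription_segments=segments)
assert paragraphs[0][0].start_second == 0.0  # "Hello world! " starts in segment 0
assert paragraphs[0][1].start_second == 1.0  # "This is a test. " starts in segment 1
```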
diff --git a/tests/integ/formatting/test_formatter.py b/tests/integ/formatting/test_formatter.py
index 0984f6b..8b3f42f 100644
--- a/tests/integ/formatting/test_formatter.py
+++ b/tests/integ/formatting/test_formatter.py
@@ -10,6 +10,7 @@
 from src.yt2doc.formatting.formatter import MarkdownFormatter
 from src.yt2doc.formatting.llm_topic_segmenter import LLMTopicSegmenter
 from src.yt2doc.formatting.llm_adapter import LLMAdapter
+from src.yt2doc.formatting.paragraphs_segmenter import ParagraphsSegmenter
 from src.yt2doc.extraction.interfaces import ChapteredTranscript, TranscriptChapter
 from src.yt2doc.transcription.interfaces import Segment
 
@@ -93,10 +94,15 @@ def test_format_chaptered_transcript_basic(
 ) -> None:
     # Arrange
     sat = SaT("sat-3l")
-    formatter = MarkdownFormatter(sat=sat)
+    paragraphs_segmenter = ParagraphsSegmenter(sat=sat)
+    formatter = MarkdownFormatter(paragraphs_segmenter=paragraphs_segmenter)
 
     segments_dicts = [
-        {"start_second": segment.start_second, "end_second": segment.end_second, "text": segment.text}
+        {
+            "start_second": segment.start_second,
+            "end_second": segment.end_second,
+            "text": segment.text,
+        }
         for segment in mock_transcript_segments
     ]
 
@@ -154,11 +160,18 @@ def mock_generate_title_for_paragraphs(
     )
 
     sat = SaT("sat-3l")
+    paragraphs_segmenter = ParagraphsSegmenter(sat=sat)
     segmenter = LLMTopicSegmenter(llm_adapter=mock_llm_adapter)
-    formatter = MarkdownFormatter(sat=sat, topic_segmenter=segmenter)
+    formatter = MarkdownFormatter(
+        paragraphs_segmenter=paragraphs_segmenter, topic_segmenter=segmenter
+    )
 
     segments_dicts = [
-        {"start_second": segment.start_second, "end_second": segment.end_second, "text": segment.text}
+        {
+            "start_second": segment.start_second,
+            "end_second": segment.end_second,
+            "text": segment.text,
+        }
         for segment in mock_transcript_segments
     ]
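The integration test exercises `LLMTopicSegmenter` through a mocked adapter. Since `ILLMAdapter` is a `typing.Protocol`, any object with the two methods the segmenter calls will do for local experiments; a hypothetical no-op stub might look like this (signatures inferred from the calls in this diff, not taken from the real adapter):

```python
import typing


class NoOpLLMAdapter:
    """Stub adapter: never reports a topic change and returns a fixed title."""

    def get_topic_changing_paragraph_indexes(
        self, paragraphs: typing.List[typing.List[str]]
    ) -> typing.List[int]:
        return []

    def generate_title_for_paragraphs(
        self, paragraphs: typing.List[typing.List[str]]
    ) -> str:
        return "Untitled chapter"


# e.g. LLMTopicSegmenter(llm_adapter=NoOpLLMAdapter()) yields a single chapter,
# since an empty index list takes the len(topic_changed_indexes) == 0 branch above.
```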
", # Fifth paragraph, third sentence + ], ] - + # Create test transcription segments that split text differently segments = [ Segment(start_second=0.0, end_second=1.0, text=" Hello"), @@ -41,7 +41,9 @@ def test_segment_aligns_timestamps_correctly() -> None: Segment(start_second=2.0, end_second=3.0, text=" is a"), Segment(start_second=3.0, end_second=4.0, text=" test. Another"), Segment(start_second=4.0, end_second=5.0, text=" paragraph here. Only"), - Segment(start_second=5.0, end_second=6.0, text=" 0.1 percent people get it. And"), + Segment( + start_second=5.0, end_second=6.0, text=" 0.1 percent people get it. And" + ), Segment(start_second=6.0, end_second=7.0, text=" even more. This is a"), Segment(start_second=7.0, end_second=8.0, text=" longer sentence that spans"), Segment(start_second=8.0, end_second=9.0, text=" multiple segments and tests"), @@ -49,16 +51,18 @@ def test_segment_aligns_timestamps_correctly() -> None: Segment(start_second=10.0, end_second=11.0, text=" text blocks. Short"), Segment(start_second=11.0, end_second=12.0, text=" text. Followed by"), Segment(start_second=12.0, end_second=13.0, text=" another. And one"), - Segment(start_second=13.0, end_second=14.0, text=" more for good measure. Final"), + Segment( + start_second=13.0, end_second=14.0, text=" more for good measure. Final" + ), Segment(start_second=14.0, end_second=15.0, text=" paragraph to conclude our"), Segment(start_second=15.0, end_second=16.0, text=" test. With some extra"), Segment(start_second=16.0, end_second=17.0, text=" content. And a final"), - Segment(start_second=17.0, end_second=18.0, text=" closing statement.") + Segment(start_second=17.0, end_second=18.0, text=" closing statement."), ] - + segmenter = ParagraphsSegmenter(mock_sat) result = segmenter.segment(segments) - + # Verify structure: should be sequence of paragraphs containing sequences of sentences assert len(result) == 5 # Five paragraphs assert len(result[0]) == 2 # First paragraph has two sentences @@ -66,48 +70,51 @@ def test_segment_aligns_timestamps_correctly() -> None: assert len(result[2]) == 1 # Third paragraph has one sentence assert len(result[3]) == 3 # Fourth paragraph has three sentences assert len(result[4]) == 3 # Fifth paragraph has three sentences - + # Verify first paragraph assert result[0][0].text == "Hello world! " assert result[0][0].start_second == 0.0 - + assert result[0][1].text == "This is a test. " assert result[0][1].start_second == 1.0 - + # Verify second paragraph assert result[1][0].text == "Another paragraph here. " assert result[1][0].start_second == 3.0 - + assert result[1][1].text == "Only 0.1 percent people get it. " assert result[1][1].start_second == 4.0 - + assert result[1][2].text == "And even more. " assert result[1][2].start_second == 5.0 - + # Verify third paragraph (long sentence spanning multiple segments) - assert result[2][0].text == "This is a longer sentence that spans multiple segments and tests our handling of longer text blocks. " + assert ( + result[2][0].text + == "This is a longer sentence that spans multiple segments and tests our handling of longer text blocks. " + ) assert result[2][0].start_second == 6.0 - + # Verify fourth paragraph assert result[3][0].text == "Short text. " assert result[3][0].start_second == 10.0 - + assert result[3][1].text == "Followed by another. " assert result[3][1].start_second == 11.0 - + assert result[3][2].text == "And one more for good measure. 
" assert result[3][2].start_second == 12.0 - + # Verify fifth paragraph assert result[4][0].text == "Final paragraph to conclude our test. " assert result[4][0].start_second == 13.0 - + assert result[4][1].text == "With some extra content. " assert result[4][1].start_second == 15.0 - + assert result[4][2].text == "And a final closing statement. " assert result[4][2].start_second == 16.0 - + # Verify SaT was called correctly with complete text mock_sat.split.assert_called_once_with( " Hello world! This is a test. Another paragraph here. Only 0.1 percent people get it. " @@ -116,5 +123,5 @@ def test_segment_aligns_timestamps_correctly() -> None: "more for good measure. Final paragraph to conclude our test. With some extra " "content. And a final closing statement.", do_paragraph_segmentation=True, - verbose=True + verbose=True, )