Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfixes, + XML ingestion, global api default setting, anki validation #47

Merged
merged 18 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .gitignore
Binary file not shown.
2 changes: 1 addition & 1 deletion App_Function_Libraries/Audio/Audio_Transcription_Lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,4 +332,4 @@ def save_audio_temp(audio_data, sample_rate=16000):

#
#
#######################################################################################################################
#######################################################################################################################
144 changes: 144 additions & 0 deletions App_Function_Libraries/Books/Book_Ingestion_Lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import zipfile
from datetime import datetime
import logging
import xml.etree.ElementTree as ET
import html2text
import csv
#
# External Imports
import ebooklib
Expand Down Expand Up @@ -241,6 +244,147 @@ def process_zip_file(zip_file,
return "\n".join(results)


def import_html(file_path, title=None, author=None, keywords=None, **kwargs):
    """
    Import an HTML file, convert it to markdown, and hand it to the
    markdown ingestion pipeline.

    If no title is supplied, the HTML <title> tag is used, falling back to
    the file's basename.
    NOTE(review): relies on BeautifulSoup being imported at module level —
    not visible in this hunk; confirm the import exists.
    """
    try:
        logging.info(f"Importing HTML file from {file_path}")

        converter = html2text.HTML2Text()
        converter.ignore_links = False

        with open(file_path, 'r', encoding='utf-8') as fh:
            html_content = fh.read()

        markdown_content = converter.handle(html_content)

        # Derive a title from the document itself when the caller gave none.
        if not title:
            soup = BeautifulSoup(html_content, 'html.parser')
            title_tag = soup.find('title')
            title = title_tag.string if title_tag else os.path.basename(file_path)

        return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs)

    except Exception as e:
        logging.exception(f"Error importing HTML file: {str(e)}")
        raise


def import_xml(file_path, title=None, author=None, keywords=None, **kwargs):
    """
    Import an XML file, flatten it to markdown, and hand it to the
    markdown ingestion pipeline.
    """
    try:
        logging.info(f"Importing XML file from {file_path}")
        document_root = ET.parse(file_path).getroot()

        # Render the XML tree as headed markdown sections.
        markdown_content = xml_to_markdown(document_root)

        return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs)
    except Exception as e:
        logging.exception(f"Error importing XML file: {str(e)}")
        raise


def import_opml(file_path, title=None, author=None, keywords=None, **kwargs):
    """
    Import an OPML outline file, convert it to a markdown table of
    contents, and hand it to the markdown ingestion pipeline.

    If no title is supplied, the OPML's own <title> element is used,
    falling back to the file's basename.
    """
    try:
        logging.info(f"Importing OPML file from {file_path}")
        document_root = ET.parse(file_path).getroot()

        # Prefer the document's declared title over the filename.
        if not title:
            title_elem = document_root.find(".//title")
            title = title_elem.text if title_elem is not None else os.path.basename(file_path)

        markdown_content = opml_to_markdown(document_root)

        return process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs)
    except Exception as e:
        logging.exception(f"Error importing OPML file: {str(e)}")
        raise


def xml_to_markdown(element, level=0):
    """
    Recursively convert an XML element tree to markdown.

    The root element (level 0) contributes no heading; each nested element
    becomes a heading whose depth is capped at markdown's six levels.
    Element text is emitted as a paragraph; attributes and tails are ignored.
    """
    parts = []

    # Root gets no heading; children become '#'-prefixed headings.
    if level > 0:
        parts.append(f"{'#' * min(level, 6)} {element.tag}\n\n")

    body_text = (element.text or "").strip()
    if body_text:
        parts.append(f"{body_text}\n\n")

    parts.extend(xml_to_markdown(child, level + 1) for child in element)

    return "".join(parts)


def opml_to_markdown(root):
    """
    Convert an OPML outline tree to a markdown table of contents.

    Each <outline> element becomes a list item ('text' attribute), with
    nesting expressed by indentation. Documents without a <body> yield
    just the heading.
    """
    def render(node, depth=0):
        lines = []
        for entry in node.findall("outline"):
            lines.append(f"{' ' * depth}- {entry.get('text', '')}\n")
            lines.append(render(entry, depth + 1))
        return "".join(lines)

    heading = "# Table of Contents\n\n"
    body = root.find(".//body")
    return heading + render(body) if body is not None else heading


def process_markdown_content(markdown_content, file_path, title, author, keywords, **kwargs):
    """
    Process markdown content and add it to the database.

    Args:
        markdown_content (str): The markdown text to ingest.
        file_path (str): Origin file path; stored as the media URL.
        title (str | None): Document title; falls back to the file's basename.
        author (str | None): Document author; falls back to "Unknown".
        keywords (str | Iterable[str] | None): Comma-separated string (or an
            iterable) of keywords; entries are whitespace-stripped and
            empties discarded.
        **kwargs: Optional 'summary' and 'custom_prompt' forwarded to the
            database layer.

    Returns:
        str: Human-readable status message including the database result.
    """
    info_dict = {
        'title': title or os.path.basename(file_path),
        'uploader': author or "Unknown",
        'ingestion_date': datetime.now().strftime('%Y-%m-%d')
    }

    # Create segments (you may want to adjust the chunking method)
    segments = [{'Text': markdown_content}]

    # Normalize keywords. The previous raw split(',') kept surrounding
    # whitespace and empty entries (e.g. "a, b," -> ['a', ' b', '']); it
    # also rejected list inputs. Accept both forms and clean the entries.
    if not keywords:
        keyword_list = []
    elif isinstance(keywords, str):
        keyword_list = [k.strip() for k in keywords.split(',') if k.strip()]
    else:
        keyword_list = [str(k).strip() for k in keywords if str(k).strip()]

    # Add to database
    result = add_media_to_database(
        url=file_path,
        info_dict=info_dict,
        segments=segments,
        summary=kwargs.get('summary', "No summary provided"),
        keywords=keyword_list,
        custom_prompt_input=kwargs.get('custom_prompt'),
        whisper_model="Imported",
        media_type="document",
        overwrite=False
    )

    return f"Document '{title}' imported successfully. Database result: {result}"


def import_file_handler(file,
title,
author,
Expand Down
147 changes: 146 additions & 1 deletion App_Function_Libraries/Chunk_Lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
import xml.etree.ElementTree as ET
#
# Import 3rd party
from openai import OpenAI
Expand All @@ -23,7 +24,6 @@
from sklearn.metrics.pairwise import cosine_similarity
#
# Import Local
from App_Function_Libraries.Tokenization_Methods_Lib import openai_tokenize
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
#######################################################################################################################
Expand Down Expand Up @@ -943,6 +943,151 @@ def chunk_ebook_by_chapters(text: str, chunk_options: Dict[str, Any]) -> List[Di
#
# End of ebook chapter chunking
#######################################################################################################################
#
# XML Chunking

def extract_xml_structure(element, path=""):
    """
    Recursively extract XML structure and content.

    Returns a list of (path, text) tuples in document order: an element's
    own stripped text first, then its attributes (addressed XPath-style as
    path/@name), then its descendants.
    """
    node_path = f"{path}/{element.tag}" if path else element.tag
    entries = []

    own_text = (element.text or "").strip()
    if own_text:
        entries.append((node_path, own_text))

    entries.extend(
        (f"{node_path}/@{name}", value) for name, value in element.attrib.items()
    )

    for child in element:
        entries.extend(extract_xml_structure(child, node_path))

    return entries


def chunk_xml(xml_text: str, chunk_options: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Enhanced XML chunking that preserves structure and hierarchy.

    The XML tree is flattened to (path, content) pairs, then packed into
    chunks of at most `max_size` words, optionally carrying the last
    `overlap` items into the next chunk for context.

    Args:
        xml_text (str): The XML content as a string
        chunk_options (Dict[str, Any]): Configuration options including:
            - max_size (int): Maximum chunk size in words (default: 1000)
            - overlap (int): Number of overlapping elements (default: 0)
            - language (str): Content language (default: 'english')

    Returns:
        List[Dict[str, Any]]: Chunks, each with 'text' and 'metadata'
        (XML paths, chunk index/total, and the chunking parameters used).

    Raises:
        ET.ParseError: If `xml_text` is not well-formed XML.
    """
    logging.debug("Starting XML chunking process...")

    try:
        root = ET.fromstring(xml_text)

        max_size = chunk_options.get('max_size', 1000)
        overlap = chunk_options.get('overlap', 0)
        language = chunk_options.get('language', 'english')
        logging.debug(f"Chunking parameters - max_size: {max_size}, overlap: {overlap}, language: {language}")

        flattened = extract_xml_structure(root)
        logging.debug(f"Extracted {len(flattened)} XML elements")

        chunks: List[Dict[str, Any]] = []
        pending: List[Tuple[str, str]] = []
        pending_words = 0

        def build_chunk(items, index):
            # One chunk dict: 'path: content' lines plus structural metadata.
            return {
                'text': '\n'.join(f"{p}: {c}" for p, c in items),
                'metadata': {
                    'paths': [p for p, _ in items],
                    'chunk_method': 'xml',
                    'chunk_index': index,
                    'max_size': max_size,
                    'overlap': overlap,
                    'language': language,
                    'root_tag': root.tag,
                    'xml_attributes': dict(root.attrib)
                }
            }

        for path, content in flattened:
            words = len(content.split())

            # Flush the pending chunk before this item would overflow it.
            if pending and pending_words + words > max_size:
                chunks.append(build_chunk(pending, len(chunks) + 1))

                if overlap > 0:
                    # Carry the tail of the flushed chunk into the next one.
                    pending = pending[-overlap:]
                    pending_words = sum(len(c.split()) for _, c in pending)
                    logging.debug(f"Created overlap chunk with {len(pending)} items")
                else:
                    pending = []
                    pending_words = 0

            pending.append((path, content))
            pending_words += words

        # Flush whatever remains.
        if pending:
            chunks.append(build_chunk(pending, len(chunks) + 1))

        # Stamp the final count onto every chunk's metadata.
        total = len(chunks)
        for chunk in chunks:
            chunk['metadata']['total_chunks'] = total

        logging.debug(f"XML chunking complete. Created {len(chunks)} chunks")
        return chunks

    except ET.ParseError as e:
        logging.error(f"XML parsing error: {str(e)}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error during XML chunking: {str(e)}")
        raise

#
# End of XML Chunking
#######################################################################################################################

#######################################################################################################################
#
Expand Down
Loading
Loading