
Commit

Added ingestion pipeline.
SawyerCzupka committed Jan 29, 2025
1 parent 5f82f42 commit c831115
Showing 5 changed files with 361 additions and 1 deletion.
18 changes: 18 additions & 0 deletions ml-api/src/ml_api/config.py
@@ -1,4 +1,5 @@
from pydantic_settings import BaseSettings
from pathlib import Path


class Settings(BaseSettings):
@@ -20,5 +21,22 @@ class Settings(BaseSettings):
RETRIEVAL_TOP_K: int = 20
RETRIEVAL_MMR_THRESHOLD: float = 0.7

# RAG Config
CHUNK_SIZE: int = 512
CHUNK_OVERLAP: int = 64

# Ingestion
DATA_BASE_DIR: Path = Path(
"/scope/scope-data/gef/output"
)  # Files are laid out as {DATA_BASE_DIR}/{project_id}/{document_id}.{extension}
INGEST_BATCH_SIZE: int = 10
# Single file
READER_NUM_WORKERS_SINGLE: int = 4
PIPELINE_NUM_WORKERS_SINGLE: int = 1

# Multi-file
READER_NUM_WORKERS_BATCH: int = 4
PIPELINE_NUM_WORKERS_BATCH: int = 4


settings = Settings()
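
Since Settings extends pydantic's BaseSettings, each of the ingestion knobs above can be overridden through environment variables at startup. A minimal sketch, assuming nothing beyond what the class declares (the override values here are illustrative, not defaults from this commit):

import os

# Hypothetical overrides; pydantic-settings matches environment variables to
# field names (case-insensitively by default) when Settings() is instantiated.
os.environ["CHUNK_SIZE"] = "1024"
os.environ["INGEST_BATCH_SIZE"] = "25"

from ml_api.config import Settings

settings = Settings()
assert settings.CHUNK_SIZE == 1024
assert settings.INGEST_BATCH_SIZE == 25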
153 changes: 153 additions & 0 deletions ml-api/src/ml_api/ingestion/gef_documents.py
@@ -0,0 +1,153 @@
import os
import re
from enum import Enum
from typing import List


class DocumentType(str, Enum):
CEO_ENDORSEMENT = "CEO Endorsement"
REVIEW_SHEET_CEO_ENDORSEMENT = "Review Sheet for CEO Endorsement"
PROJECT_IMPLEMENTATION_REPORT = "Project Implementation Report (PIR)"
PROJECT_IDENTIFICATION_FORM = "Project Identification Form (PIF)"
REVIEW_SHEET_PIF = "Review Sheet for PIF"
STAP_REVIEW = "STAP Review"
AGENCY_PROJECT_DOCUMENT = "Agency Project Document"
FSP_PIF_DOCUMENT = "FSP PIF Document"
FSP_CEO_ENDORSEMENT_DOCUMENT = "FSP CEO Endorsement Document"
MSP_CEO_APPROVAL_DOCUMENT = "MSP CEO Approval Document"
MSP_PIF_DOCUMENT = "MSP PIF Document"
CEO_PIF_CLEARANCE_LETTER = "CEO PIF Clearance Letter"
CEO_ENDORSEMENT_LETTER = "CEO Endorsement Letter"
PPG_APPROVAL_LETTER = "PPG Approval Letter"
MIDTERM_REVIEW = "Midterm Review (MTR)"
CHILD_FSP_CEO_ENDORSEMENT_DOCUMENT = "Child FSP CEO Endorsement Document"
COUNCIL_NOTIFICATION_LETTER = "Council Notification Letter"
PPG_DOCUMENT = "PPG Document"
ANNEXES_APPENDIXES = "Annexes/Appendixes to Project Documents"
AGENCY_RESPONSE_MATRIX = "Agency Response Matrix"
UNKNOWN = "Unknown"


def extract_and_identify_filename(filename):
original_filename = extract_original_filename(filename)
return identify_document_type(original_filename)


def select_doc_types_for_project(project_id, data_dir: str = "../data/gef-7/"):
return select_document_types(get_doc_types_for_project(project_id, data_dir))


def get_doc_types_for_project(project_id, data_dir):
doc_types: List[DocumentType] = []
for filename in os.listdir(data_dir + str(project_id)):
doc_type = extract_and_identify_filename(filename)
doc_types += [doc_type]
return list(set(doc_types))


def select_document_types(available_types: List[DocumentType]) -> List[DocumentType]:
selected_types = []

# Priority order based on the email
priority_order = [
# DocumentType.TERMINAL_EVALUATION,
DocumentType.MIDTERM_REVIEW,
DocumentType.PROJECT_IMPLEMENTATION_REPORT,
DocumentType.CEO_ENDORSEMENT,
]

# First, check for the highest priority document available
for doc_type in priority_order:
if doc_type in available_types:
selected_types.append(doc_type)
break

# If we selected a PIR, also include CEO Endorsement if available
if (
selected_types
and selected_types[0] == DocumentType.PROJECT_IMPLEMENTATION_REPORT
):
if DocumentType.CEO_ENDORSEMENT in available_types:
selected_types.append(DocumentType.CEO_ENDORSEMENT)

# If we only have CEO Endorsement, include it
if not selected_types and DocumentType.CEO_ENDORSEMENT in available_types:
selected_types.append(DocumentType.CEO_ENDORSEMENT)

# Include any other available monitoring or evaluation reports
for doc_type in priority_order:
if doc_type in available_types and doc_type not in selected_types:
selected_types.append(doc_type)

return selected_types


def extract_original_filename(filename):
"""
Extract the original name of the file as downloaded from GEF.
This function uses a regular expression to match a specific pattern in the filename.
The pattern is expected to be in the format "p<digits>_doc<digits>__<original_filename>".
If the filename matches this pattern, the function extracts and returns the original filename.
If the filename does not match the pattern, the function returns the original filename as is.
Args:
filename (str): The name of the file to be processed.
Returns:
str: The extracted original filename if the pattern matches, otherwise the input filename.
"""
"""Extract the original name of the file as downloaded from GEF"""

pattern = (
r"^p\d+_doc\d+__(.+)$"  # captures everything after the "p<digits>_doc<digits>__" prefix
)
match = re.match(pattern, filename)

if match:
return match.group(1)
else:
return filename


def identify_document_type(filename: str) -> DocumentType:
patterns = [
# The more specific review-sheet patterns are listed before the generic
# endorsement/PIF patterns, which would otherwise match those filenames first.
(
r"_ReviewSheet_CEOEndorsement\.pdf$",
DocumentType.REVIEW_SHEET_CEO_ENDORSEMENT,
),
(r"_CEOEndorsement\.pdf$", DocumentType.CEO_ENDORSEMENT),
(
r"^ProjectImplementationReportPIR_",
DocumentType.PROJECT_IMPLEMENTATION_REPORT,
),
(r"_ReviewSheet_PIF\.pdf$", DocumentType.REVIEW_SHEET_PIF),
(r"_PIF\.pdf$", DocumentType.PROJECT_IDENTIFICATION_FORM),
(r"^STAPreview_|_STAPReview\.pdf$", DocumentType.STAP_REVIEW),
(r"^Agencyprojectdocument_", DocumentType.AGENCY_PROJECT_DOCUMENT),
(r"^FSPPIFdocument_", DocumentType.FSP_PIF_DOCUMENT),
(r"^FSPCEOEndorsementdocument_", DocumentType.FSP_CEO_ENDORSEMENT_DOCUMENT),
(r"^MSPCEOApprovaldocument_", DocumentType.MSP_CEO_APPROVAL_DOCUMENT),
(r"^MSPPIFdocument_", DocumentType.MSP_PIF_DOCUMENT),
(r"^CEOPIFClearanceLetter_", DocumentType.CEO_PIF_CLEARANCE_LETTER),
(r"^CEOEndorsementLetter_", DocumentType.CEO_ENDORSEMENT_LETTER),
(r"^PPGApprovalLetter_", DocumentType.PPG_APPROVAL_LETTER),
(r"^MidtermReviewMTR_", DocumentType.MIDTERM_REVIEW),
(
r"^ChildFSPCEOEndorsementdocument_",
DocumentType.CHILD_FSP_CEO_ENDORSEMENT_DOCUMENT,
),
(
r"^CouncilNotificationLetterof(ChildProjectunderaProgramforreview|CEOEndorsementofaFSP)_",
DocumentType.COUNCIL_NOTIFICATION_LETTER,
),
(r"^PPGdocument_", DocumentType.PPG_DOCUMENT),
(r"^Annexesappendixestotheprojectdocuments_", DocumentType.ANNEXES_APPENDIXES),
(r"^Agencyresponsematrix_", DocumentType.AGENCY_RESPONSE_MATRIX),
]

for pattern, doc_type in patterns:
if re.search(pattern, filename):
return doc_type

return DocumentType.UNKNOWN
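
A short usage sketch for the helpers above; the filename is made up to illustrate the expected p<project>_doc<id>__<original name> layout rather than taken from real GEF data:

from ml_api.ingestion.gef_documents import DocumentType, extract_original_filename, identify_document_type, select_document_types

# Hypothetical downloaded filename.
name = "p1234_doc5678__ProjectImplementationReportPIR_2023.pdf"
original = extract_original_filename(name)  # "ProjectImplementationReportPIR_2023.pdf"
doc_type = identify_document_type(original)  # DocumentType.PROJECT_IMPLEMENTATION_REPORT

# Priority selection: a PIR also pulls in the CEO Endorsement when both exist.
available = [DocumentType.PROJECT_IMPLEMENTATION_REPORT, DocumentType.CEO_ENDORSEMENT]
selected = select_document_types(available)
# -> [PROJECT_IMPLEMENTATION_REPORT, CEO_ENDORSEMENT]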
129 changes: 128 additions & 1 deletion ml-api/src/ml_api/ingestion/ingestion_service.py
@@ -3,4 +3,131 @@
This includes parsing the raw document to text, chunking the text with metadata,
and indexing the chunks into the Qdrant database to be used in inference.
"""
"""

import logging
from pathlib import Path

from llama_index.core.readers import SimpleDirectoryReader
from llama_index.core.vector_stores.types import BasePydanticVectorStore
from ml_api.config import settings
from ml_api.utils.qdrant import get_qdrant_vector_store

from .metadata import file_metadata
from .pipeline import get_pipeline

logger = logging.getLogger(__name__)


class IngestionService:
"""Service to ingest data into the Qdrant database and manage state."""

def __init__(self, vector_store: BasePydanticVectorStore | None = None):
self.vector_store = (
get_qdrant_vector_store() if vector_store is None else vector_store
)
self.data_base_dir = settings.DATA_BASE_DIR
self.pipeline = get_pipeline()

def ingest_single_file(self, file_path: Path) -> bool:
"""Ingest a specific file into the qdrant database.
Args:
file_path (Path): the path to the file to ingest
"""

try:
# Try to ingest the file
reader = SimpleDirectoryReader(
input_files=[file_path], file_metadata=file_metadata
)

docs = reader.load_data(num_workers=settings.READER_NUM_WORKERS_SINGLE)
logger.info(f"Loaded {len(docs)} documents from {file_path}")

# Pipeline includes embeddings and vector db, so this is all we need to run
processed_nodes = self.pipeline.run(
show_progress=True,
documents=docs,
num_workers=settings.PIPELINE_NUM_WORKERS_SINGLE,
)

logger.info(
f"Processed & ingested {len(processed_nodes)} nodes from {file_path}"
)
return True

except Exception as e:
logger.error(f"Failed to ingest file {file_path}: {e}")
return False

def ingest_files(self, file_paths: list[Path]):
"""
Ingests a group of files into the vector database. Expects a pre-batched list of file paths.
Args:
file_paths (list[Path]): A list of file paths to be ingested.
Note:
Errors during ingestion are caught and logged rather than re-raised.
"""
try:
reader = SimpleDirectoryReader(
input_files=file_paths, file_metadata=file_metadata
)

docs = reader.load_data(num_workers=settings.READER_NUM_WORKERS_BATCH)
logger.info(f"Loaded {len(docs)} documents from {len(file_paths)} files.")

processed_nodes = self.pipeline.run(
show_progress=True,
documents=docs,
num_workers=settings.PIPELINE_NUM_WORKERS_BATCH,
)
logger.info(
f"Processed & ingested {len(processed_nodes)} nodes from {len(file_paths)} files."
)

except Exception as e:
logger.error(f"Batch ingestion failed: {e}", exc_info=True)

def _batch_files(
self, files: list[Path], batch_size: int = settings.INGEST_BATCH_SIZE
):
"""Batch files into groups of a certain size.
Args:
files (list[Path]): The list of files to be batched.
batch_size (int, optional): The size of each batch. Defaults to settings.INGEST_BATCH_SIZE.
Yields:
list[Path]: A batch of files.
"""
for i in range(0, len(files), batch_size):
yield files[i : i + batch_size]

def ingest_directory(self, directory: Path = settings.DATA_BASE_DIR):
"""Ingest all files in a directory.
TODO: Make this check the database for existing files and only ingest new ones.
"""

all_files = [
p for p in directory.glob("**/*") if p.is_file()
]  # All files in the directory and its subdirectories; subdirectory paths themselves are skipped

if not all_files:
logger.warning(
f"No files found in directory {directory}, aborting ingestion."
)
return

batches = list(self._batch_files(all_files))

num_processed_files = 0
for i, batch in enumerate(batches):
self.ingest_files(batch)
num_processed_files += len(batch)
logger.info(
f"Progress: batch {i + 1}/{len(batches)} | {num_processed_files}/{len(all_files)} files processed."
)
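
A hypothetical driver for the service above, assuming the Qdrant instance and embedding model behind get_qdrant_vector_store() and get_pipeline() are reachable; the file path is illustrative only:

from pathlib import Path

from ml_api.ingestion.ingestion_service import IngestionService

service = IngestionService()

# Ingest one document (returns False, and logs the error, on failure)...
ok = service.ingest_single_file(Path("/scope/scope-data/gef/output/1234/p1234_doc5678__Example_PIF.pdf"))

# ...or walk DATA_BASE_DIR recursively, INGEST_BATCH_SIZE files per batch.
service.ingest_directory()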
37 changes: 37 additions & 0 deletions ml-api/src/ml_api/ingestion/metadata.py
@@ -0,0 +1,37 @@
import logging
import os
from .gef_documents import identify_document_type

logger = logging.getLogger(__name__)


def file_metadata(filename: str) -> dict[str, str]:
"""
Extracts metadata from the given filename.
"""
logger.debug("Extracting metadata from filename: %s", filename)
base_name = os.path.basename(filename)
project_id, doc_id = parse_filename(base_name)

doc_type = identify_document_type(base_name)

return {
"filename": base_name,
"extension": os.path.splitext(base_name)[1],
"project_id": project_id,
"doc_id": doc_id,
"doc_type": doc_type,
}


def parse_filename(filename: str) -> tuple[str, str]:
"""
Parses the filename to extract project and document IDs.
"""
parts = filename.split("_")
project_id = parts[0][1:]
doc_id = parts[1].split(".")[0][3:]
logger.debug(
"Parsed filename %s into project_id=%s, doc_id=%s", filename, project_id, doc_id
)
return project_id, doc_id
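
For reference, on a hypothetical filename in the expected layout the parser splits out the numeric IDs as follows:

from ml_api.ingestion.metadata import file_metadata, parse_filename

# "p1234" -> project_id "1234"; "doc5678" -> doc_id "5678" (illustrative values).
print(parse_filename("p1234_doc5678__Example_CEOEndorsement.pdf"))  # ("1234", "5678")

meta = file_metadata("/some/dir/p1234_doc5678__Example_CEOEndorsement.pdf")
print(meta["doc_type"])  # DocumentType.CEO_ENDORSEMENT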
25 changes: 25 additions & 0 deletions ml-api/src/ml_api/ingestion/pipeline.py
@@ -0,0 +1,25 @@
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import TransformComponent

from ml_api.config import settings
from ml_api.utils.qdrant import get_qdrant_vector_store
from ml_api.utils.embeddings import get_embed_model


def get_pipeline():
qdrant = get_qdrant_vector_store()
embed_model = get_embed_model()

transformations: list[TransformComponent] = [
SentenceSplitter(
chunk_size=settings.CHUNK_SIZE,
chunk_overlap=settings.CHUNK_OVERLAP,
include_metadata=True,
),
embed_model,
]

pipeline = IngestionPipeline(transformations=transformations, vector_store=qdrant)

return pipeline
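
A minimal standalone sketch of the pipeline above, assuming the Qdrant and embedding utilities are configured; the document text is made up:

from llama_index.core import Document

from ml_api.ingestion.pipeline import get_pipeline

pipeline = get_pipeline()

# Nodes are split, embedded, and upserted into Qdrant because the pipeline was
# built with vector_store=qdrant; run() also returns the processed nodes.
nodes = pipeline.run(documents=[Document(text="Hypothetical GEF project summary text.")])
print(len(nodes))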
