Skip to content

Commit

Permalink
Neo4J ChunkProcessor implementation + Multicontainer docker env setup
Browse files Browse the repository at this point in the history
  • Loading branch information
hajdul88 committed Oct 24, 2024
1 parent 7807ec0 commit df1c86a
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 40 deletions.
22 changes: 10 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
# Use the official Python image from Docker Hub as the base image
FROM python:3.9-slim

# Set the working directory inside the container
WORKDIR /app

# Copy the pyproject.toml and poetry.lock to the working directory inside the container
COPY pyproject.toml poetry.lock /app/

# Install Poetry (a dependency manager for Python)
RUN pip install poetry

# Install the dependencies listed in pyproject.toml
RUN poetry install --no-dev

# Copy the rest of the application code to the container

COPY ./src /app/src
COPY ./tests /app/tests
COPY ./example_data /app/example_data
COPY README.md /app/

# Set the working directory to the src directory
WORKDIR /app/src

# Set environment variables (optional)
ENV NEO4J_URI=bolt://neo4j:7687
ENV NEO4J_USER=neo4j
ENV NEO4J_PASSWORD=test
ENV directory="../example_data"
ENV mode="size"
ENV chunk_size="300"
ENV overlap_size="20"
ENV txt_separator="\n\n"
ENV NEO4J_URI="bolt://neo4j:7687"
ENV NEO4J_USER="neo4j"
ENV NEO4J_PASSWORD="test1234"


# Define the command that runs your application when the container starts
CMD ["poetry", "run", "python", "run_pipeline.py"]
35 changes: 23 additions & 12 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,42 @@ services:
neo4j:
image: neo4j:latest
environment:
NEO4J_AUTH: "neo4j/test1234" # Default credentials for Neo4j
NEO4JLABS_PLUGINS: '["graph-data-science"]' # Install Graph Data Science plugin
NEO4J_dbms_security_procedures_unrestricted: "gds.*,apoc.*" # Unrestrict GDS and APOC procedures
NEO4J_AUTH: "neo4j/test1234"
NEO4JLABS_PLUGINS: '["graph-data-science"]'
NEO4J_dbms_security_procedures_unrestricted: "gds.*,apoc.*"
ports:
- "7474:7474" # Neo4j web interface
- "7687:7687" # Bolt protocol for database connections
- "7474:7474" # Web interface
- "7687:7687" # Bolt for app
volumes:
- neo4j_data:/data
healthcheck:
test: ["CMD", "bash", "-c", "echo > /dev/tcp/localhost/7687"]
interval: 20s
timeout: 20s
retries: 5

app:
build:
context: .
dockerfile: Dockerfile
environment:
NEO4J_URI: bolt://neo4j:7687
NEO4J_USER: neo4j
NEO4J_PASSWORD: test1234
directory: "../example_data"
mode: "size"
chunk_size: "300"
overlap_size: "20"
txt_separator: "\n\n"
NEO4J_URI: "bolt://neo4j:7687"
NEO4J_USER: "neo4j"
NEO4J_PASSWORD: "test1234"
depends_on:
- neo4j
neo4j:
condition: service_healthy
ports:
- "8000:8000"
volumes:
- ./src:/app/src # Mount the source code
- ./tests:/app/tests # Mount the tests directory
- ./src:/app/src
- ./tests:/app/tests

volumes:
neo4j_data: # Persistent volume for Neo4j
neo4j_data:

68 changes: 68 additions & 0 deletions src/chunk_to_network/neo4j_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from neo4j import GraphDatabase

class ChunkProcessor:
    """Persist text chunks into Neo4j as a simple document graph.

    Each source file becomes a ``(:File {name})`` node, each chunk a
    ``(:Chunk {index, text})`` node. Files are linked to their chunks with
    ``[:CONTAINS]`` and consecutive chunks of the *same* file are linked
    with ``[:NEXT]``. All writes use ``MERGE`` so re-processing the same
    input is idempotent.
    """

    def __init__(self, neo4j_uri, neo4j_user, neo4j_password):
        """Open a Neo4j driver for the given bolt URI and credentials.

        Args:
            neo4j_uri: Bolt URI of the database (e.g. ``bolt://neo4j:7687``).
            neo4j_user: Username for basic auth.
            neo4j_password: Password for basic auth.
        """
        self.neo4j_uri = neo4j_uri
        self.neo4j_user = neo4j_user
        self.neo4j_password = neo4j_password
        self.driver = GraphDatabase.driver(self.neo4j_uri, auth=(self.neo4j_user, self.neo4j_password))

    def close(self):
        """Close the underlying driver. Safe to call even if it was never opened."""
        if self.driver:
            self.driver.close()

    def create_file_node(self, session, file_name):
        """MERGE a ``:File`` node identified by *file_name*."""
        query = """
        MERGE (f:File {name: $file_name})
        RETURN f
        """
        session.run(query, file_name=file_name)

    def create_chunk_node(self, session, chunk, index):
        """MERGE a ``:Chunk`` node identified by (*index*, *chunk* text)."""
        query = """
        MERGE (c:Chunk {index: $index, text: $text})
        RETURN c
        """
        session.run(query, index=index, text=chunk)

    def create_relationship_between_chunks(self, session, index1, index2):
        """MERGE a ``[:NEXT]`` edge from chunk *index1* to chunk *index2*."""
        query = """
        MATCH (c1:Chunk {index: $index1}), (c2:Chunk {index: $index2})
        MERGE (c1)-[:NEXT]->(c2)
        """
        session.run(query, index1=index1, index2=index2)

    def create_relationship_file_to_chunk(self, session, index1, index2):
        """MERGE a ``[:CONTAINS]`` edge from file named *index1* to chunk *index2*."""
        query = """
        MATCH (c1:File {name: $index1}), (c2:Chunk {index: $index2})
        MERGE (c1)-[:CONTAINS]->(c2)
        """
        session.run(query, index1=index1, index2=index2)

    async def process_chunks(self, directory_reader):
        """Consume ``directory_reader.read_files()`` and write the graph.

        The reader must be an async generator yielding ``(file_name, chunk)``
        tuples. Whitespace-only chunks are skipped. Chunk indices are global
        and monotonically increasing across all files; the ``NEXT`` chain is
        reset whenever the file name changes so chunks of different files are
        never linked.

        NOTE(review): the Neo4j session here is the synchronous driver API,
        so each ``session.run`` blocks the event loop between reads.

        Args:
            directory_reader: object exposing an async generator ``read_files()``.
        """
        previous_index = None
        # Fix: initialize explicitly. Previously this name was only assigned
        # at the bottom of the loop and its read in the file-change test was
        # guarded solely by short-circuit evaluation — a NameError waiting to
        # happen if the condition order ever changed.
        previous_file_name = None
        index = 1

        with self.driver.session() as session:
            async for file_name, chunk in directory_reader.read_files():
                if chunk.strip():

                    self.create_chunk_node(session, chunk, index)

                    self.create_file_node(session, file_name)

                    self.create_relationship_file_to_chunk(session, file_name, index)

                    # Break the NEXT chain at file boundaries.
                    if previous_index is None or file_name != previous_file_name:
                        previous_index = None

                    if previous_index is not None:
                        self.create_relationship_between_chunks(session, previous_index, index)

                    previous_index = index
                    previous_file_name = file_name
                    index += 1

        print("Chunk processing complete.")

5 changes: 2 additions & 3 deletions src/directory_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def read_files(self):
with optional overlap between chunks.
Yields:
str: The chunked content of each file.
Tuple[str, str]: The file and the actual chunk as strings
"""
if self.chunk_size <= self.overlap_size:
raise ValueError(f"The chunk size {self.chunk_size} is smaller than or equal to the overlapping size {self.overlap_size}.")
Expand All @@ -29,15 +29,14 @@ async def read_files(self):
reader = FileReaderFactory.get_reader(file_path)

# Choose the separator based on the file type

if file_name.endswith(".txt"):
separator = self.txt_separator
else:
separator = None # For unsupported file types, skip separator logic

# Read the file with the appropriate separator
async for chunk in reader.read(file_path, mode=self.mode, chunk_size=self.chunk_size, separator=separator, overlap_size=self.overlap_size):
yield chunk
yield file_name, chunk
except ValueError:
continue
# Skip unsupported files or errors
47 changes: 34 additions & 13 deletions src/run_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,51 @@
import os
import time
import asyncio
from directory_reader import DirectoryFileReader
from chunk_to_network import neo4j_processor as neo4j_ingestion


# This is only here for the local setup, otherwise its not needed.
def init():
    """Seed the process environment with default settings for a local run.

    Mirrors the variables the Docker setup provides, so the pipeline can be
    executed outside the container without any manual configuration.
    """
    local_defaults = {
        "directory": "../example_data",
        "mode": "size",
        "chunk_size": "300",
        "overlap_size": "20",
        "txt_separator": "\n\n",
        "NEO4J_URI": "bolt://localhost:7687",
        "NEO4J_USER": "neo4j",
        "NEO4J_PASSWORD": "test1234",
    }
    os.environ.update(local_defaults)

async def main():
"""
Main function to run the DirectoryFileReader and output chunked content
of .txt and .pdf files.
"""

directory = "../example_data"
## PARAMETERS
directory = os.environ['directory']
mode = os.environ['mode']
chunk_size = int(os.environ['chunk_size'])
overlap_size = int(os.environ['overlap_size'])
txt_separator = os.environ['txt_separator']
NEO4J_URI = os.environ['NEO4J_URI']
NEO4J_USER = os.environ['NEO4J_USER']
NEO4J_PASSWORD = os.environ['NEO4J_PASSWORD']

mode = "separator"
overlap_size = 0
txt_separator = "\n\n" # Example separator for text files (line break separator)
directory_reader = DirectoryFileReader(directory, mode=mode, chunk_size=chunk_size, txt_separator=txt_separator, overlap_size=overlap_size)

directory_reader = DirectoryFileReader(directory, mode=mode, chunk_size=500, txt_separator=txt_separator, overlap_size=overlap_size)
chunk_processor = neo4j_ingestion.ChunkProcessor(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)

file_chunks = []
await chunk_processor.process_chunks(directory_reader)

async for chunk in directory_reader.read_files():
if chunk.strip():
file_chunks.append(chunk)
chunk_processor.close()

print("All file chunks stored in memory:")
for index, chunk in enumerate(file_chunks):
print(f"Chunk {index + 1}:\n{chunk}\n{'-' * 80}")

if __name__ == "__main__":
asyncio.run(main())

#init() # To run locally
asyncio.run(main())
time.sleep(20)

0 comments on commit df1c86a

Please sign in to comment.