-
Notifications
You must be signed in to change notification settings - Fork 0
/
indexing_pipeline.py
90 lines (72 loc) · 3.19 KB
/
indexing_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
from pathlib import Path
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.core.pipeline import Pipeline
from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
from source.git_root_finder import GitRootFinder
from source.logger import LoggerMixin
from source.transcription_and_metadata_to_document import (
TranscriptionAndMetadataToDocument,
)
class IndexingPipeline(LoggerMixin):
    """
    One-time indexing pipeline that converts transcriptions and metadata into
    embedded documents stored in a Qdrant collection.

    Run this once after all data is available; afterwards only the
    GPNChatPipeline needs to run. The stages, wired in sequence:

    - TranscriptionAndMetadataToDocument: loads transcription + metadata files
      as Haystack documents.
    - DocumentSplitter: cuts each document into overlapping sentence windows.
    - SentenceTransformersDocumentEmbedder: embeds each window with a
      multilingual MiniLM model (384-dim vectors).
    - DocumentWriter: persists the embedded segments into Qdrant.

    A diagram of the wiring is rendered to "indexing_pipeline.png" at the
    repository root.
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): recreate_index=True wipes the collection on every
        # construction — intended for a one-shot indexing run.
        document_store = QdrantDocumentStore(
            location="http://localhost:6333",
            recreate_index=True,
            return_embedding=True,
            wait_result_from_api=True,
            embedding_dim=384,
            index="gpn-chat",
            use_sparse_embeddings=False,
            sparse_idf=True,
        )

        self.pipeline = Pipeline()

        # Registration order matches the logical data flow; names are the
        # handles used for connect() below and for run() inputs.
        stages = {
            "textfile_loader": TranscriptionAndMetadataToDocument(),
            "splitter": DocumentSplitter(
                split_by="sentence", split_length=5, split_overlap=2
            ),
            "embedder": SentenceTransformersDocumentEmbedder(
                model="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
            ),
            "writer": DocumentWriter(document_store),
        }
        for stage_name, stage in stages.items():
            self.pipeline.add_component(instance=stage, name=stage_name)

        self.pipeline.connect(sender="textfile_loader", receiver="splitter")
        self.pipeline.connect(sender="splitter", receiver="embedder")
        self.pipeline.connect(sender="embedder.documents", receiver="writer")

        # Render the pipeline graph next to the repository root for reference.
        self.pipeline.draw(Path(GitRootFinder.get()) / "indexing_pipeline.png")

    def run(self) -> None:
        """
        Execute the pipeline over every file in the repository's "data"
        directory and log on completion.

        :return: None
        """
        data_directory = str(Path(GitRootFinder.get()) / "data")
        self.pipeline.run({"textfile_loader": {"data_directory": data_directory}})
        self.log.info("The indexing pipeline finished successfully")
def main() -> None:
    """Build the indexing pipeline and index all available data."""
    IndexingPipeline().run()


if __name__ == "__main__":
    main()