data_pipeline.py
import os
import subprocess
from typing import List

import adalflow as adal
from adalflow.components.data_process import TextSplitter, ToEmbeddings
from adalflow.core.db import LocalDB
from adalflow.utils import get_adalflow_default_root_path

from config import configs

# TODO: fix the delay in the data pipeline by tuning chunk_size and chunk_overlap
# (an earlier draft used split_by="word", chunk_size=500, chunk_overlap=100).
def prepare_data_pipeline():
    """Build the document-processing pipeline: split text into chunks, then embed each chunk."""
    splitter = TextSplitter(**configs["text_splitter"])
    embedder = adal.Embedder(
        model_client=configs["embedder"]["model_client"](),
        model_kwargs=configs["embedder"]["model_kwargs"],
    )
    embedder_transformer = ToEmbeddings(
        embedder=embedder, batch_size=configs["embedder"]["batch_size"]
    )
    # Sequential chains the splitter and the embedder: each document is split
    # first, then every chunk is embedded.
    data_transformer = adal.Sequential(splitter, embedder_transformer)
    return data_transformer
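
# A minimal usage sketch for the pipeline above, assuming configs["text_splitter"]
# and configs["embedder"] are populated in config.py; the sample text is made up
# for illustration.
#
# pipeline = prepare_data_pipeline()
# docs = [adal.Document(text="AdalFlow is a library for building LLM applications.")]
# transformed = pipeline(docs)  # split into chunks, then embed each chunk
# print(len(transformed), transformed[0])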
def download_github_repo(repo_url, local_path):
"""
Downloads a GitHub repository to a specified local path.
Args:
repo_url (str): The URL of the GitHub repository to clone.
local_path (str): The local directory where the repository will be cloned.
    Returns:
        str: The stdout of the `git clone` command, or an error message on failure.
"""
try:
# Check if Git is installed
print(f"local_path: {local_path}")
subprocess.run(
["git", "--version"],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Ensure the local path exists
os.makedirs(local_path, exist_ok=True)
# Clone the repository
result = subprocess.run(
["git", "clone", repo_url, local_path],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return result.stdout.decode("utf-8")
except subprocess.CalledProcessError as e:
return f"Error during cloning: {e.stderr.decode('utf-8')}"
except Exception as e:
return f"An unexpected error occurred: {str(e)}"
def read_all_documents(path: str):
    """
    Recursively reads all documents in a directory and its subdirectories.

    Args:
        path (str): The root directory path.

    Returns:
        list: A list of `adal.Document` objects, one per readable file.
    """
    documents = []
    paths = []
    for root, _, files in os.walk(path):
        for file in files:
            file_path = os.path.join(root, file)
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    documents.append(f.read())
                paths.append(file_path)
            except Exception as e:
                print(f"Error reading {file_path}: {e}")
    return [
        adal.Document(text=doc, meta_data={"title": file_path})
        for doc, file_path in zip(documents, paths)
    ]
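
# Example: load a cloned repository into adal.Document objects. Binary files
# (images, .git objects, etc.) fail the utf-8 read and are skipped with a
# printed warning.
#
# docs = read_all_documents(os.path.join(get_adalflow_default_root_path(), "AdalFlow"))
# print(len(docs), docs[0].meta_data)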
def transform_documents_and_save_to_db(documents: List[adal.Document], db_path: str):
"""
Transforms a list of documents and saves them to a local database.
Args:
documents (list): A list of `Document` objects.
db_path (str): The path to the local database file.
"""
    # Get the data transformer
    data_transformer = prepare_data_pipeline()

    # Save the documents to a local database
    db = LocalDB("microsoft_lmops")
db.register_transformer(transformer=data_transformer, key="split_and_embed")
db.load(documents)
db.transform(key="split_and_embed")
db.save_state(filepath=db_path)
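
# Reading the saved state back, as a sketch: LocalDB.load_state and
# transformed_items are the counterparts used in the commented experiment at
# the bottom of this file.
#
# db = LocalDB.load_state(filepath=db_path)
# transformed_docs = db.transformed_items["split_and_embed"]
# print(len(transformed_docs))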
def chat_with_adalflow_lib():
"""
(1) Download repo: https://github.com/SylphAI-Inc/AdalFlow
(2) Read all documents in the repo
(3) Transform the documents using the data pipeline
(4) Save the transformed documents to a local database
"""
# Download the repository
repo_url = "https://github.com/SylphAI-Inc/AdalFlow"
local_path = os.path.join(get_adalflow_default_root_path(), "AdalFlow")
download_github_repo(repo_url, local_path)
# Read all documents in the repository
documents = read_all_documents(local_path)
# Transform the documents using the data pipeline
db_path = os.path.join(get_adalflow_default_root_path(), "db_adalflow")
transform_documents_and_save_to_db(documents, db_path)
if __name__ == "__main__":
from adalflow.utils import get_logger
adal.setup_env()
chat_with_adalflow_lib()
# # get_logger()
# repo_url = "https://github.com/microsoft/LMOps"
# from adalflow.utils import get_adalflow_default_root_path
# local_path = os.path.join(get_adalflow_default_root_path(), "LMOps")
# # download_github_repo(repo_url, local_path)
# target_path = os.path.join(local_path, "prompt_optimization")
# documents = read_all_documents(target_path)
# print(len(documents))
# print(documents[0])
# # transformed_documents = prepare_data_pipeline()(documents[0:2])
# # print(len(transformed_documents))
# # print(transformed_documents[0])
# # save to local db
# # from adalflow.core.db import LocalDB
# db = LocalDB("microsft_lomps")
# key = "split_and_embed"
# print(prepare_data_pipeline())
# db.register_transformer(transformer=prepare_data_pipeline(), key=key)
# db.load(documents)
# db.transform(key=key)
# transformed_docs = db.transformed_items[key]
# print(len(transformed_docs))
# print(transformed_docs[0])
    # db_path = os.path.join(get_adalflow_default_root_path(), "db_microsoft_lmops")
# db.save_state(filepath=db_path)
    # db = LocalDB.load_state(filepath=db_path)