-
Notifications
You must be signed in to change notification settings - Fork 0
/
transformers.py
120 lines (96 loc) · 4.53 KB
/
transformers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from typing import Iterator
from numpy import array, average
import openai
import pandas as pd
import numpy as np
from config import TEXT_EMBEDDING_CHUNK_SIZE, EMBEDDINGS_MODEL, OPENAI_API_KEY
from database import load_vectors
# Set the API key on the openai module from the value imported from config.
openai.api_key = OPENAI_API_KEY
def get_col_average_from_list_of_lists(list_of_lists):
    """Return the average of each column in a list of lists.

    Args:
        list_of_lists: Sequence of equal-length numeric rows.

    Returns:
        A list holding one average per column. A single row is returned
        as-is (the same object, without float conversion). Empty input
        yields an empty list.
    """
    # Averaging zero rows would produce a scalar NaN (and a RuntimeWarning)
    # rather than a list, so answer the empty case explicitly.
    if len(list_of_lists) == 0:
        return []
    # One row is already its own column-wise average.
    if len(list_of_lists) == 1:
        return list_of_lists[0]
    return average(array(list_of_lists), axis=0).tolist()
# Embed a text chunk by chunk using a tokenizer and the configured embeddings model
def create_embeddings_for_text(text, tokenizer):
    """Return ``(text_embeddings, average_embedding)`` for a text.

    The text is split into token chunks by ``chunks`` using
    ``TEXT_EMBEDDING_CHUNK_SIZE``, each chunk is decoded back to a string
    with ``tokenizer.decode``, and all chunk strings are embedded in a
    single ``get_embeddings`` call with ``EMBEDDINGS_MODEL``.

    Returns:
        A 2-tuple: a list of ``(text_chunk, embedding)`` tuples, and the
        column-wise average of all chunk embeddings.
    """
    token_chunks = [*chunks(text, TEXT_EMBEDDING_CHUNK_SIZE, tokenizer)]
    decoded_chunks = list(map(tokenizer.decode, token_chunks))

    response_items = get_embeddings(decoded_chunks, EMBEDDINGS_MODEL)
    vectors = []
    for item in response_items:
        vectors.append(item["embedding"])

    paired = [
        (chunk_text, vector)
        for chunk_text, vector in zip(decoded_chunks, vectors)
    ]
    mean_vector = get_col_average_from_list_of_lists(vectors)
    return paired, mean_vector
def get_embeddings(text_array, engine):
    """Request embeddings for ``text_array`` from the given OpenAI engine.

    Returns the ``"data"`` entry of the embeddings response.
    """
    engine_client = openai.Engine(id=engine)
    response = engine_client.embeddings(input=text_array)
    return response["data"]
# Split a text into smaller chunks of size n, preferably ending at the end of a sentence
def chunks(text, n, tokenizer):
    """Yield successive token chunks of roughly ``n`` tokens from ``text``.

    Each chunk preferably ends where the decoded text ends with a full stop
    or a newline. The end position is searched backwards from ``1.5 * n``
    tokens down to (but excluding) ``0.5 * n`` tokens; if no such ending is
    found, the chunk is cut at ``n`` tokens.

    Args:
        text: The string to split.
        n: Target chunk size in tokens; must be positive.
        tokenizer: Object providing ``encode(text)`` and ``decode(tokens)``.

    Yields:
        Slices of the encoded token sequence, in order and without overlap.

    Raises:
        ValueError: If ``n`` is not positive. Because this is a generator,
            the error surfaces when iteration starts.
    """
    # A non-positive size would never advance the cursor correctly and
    # would loop forever, so reject it up front.
    if n <= 0:
        raise ValueError(f"n must be a positive chunk size, got {n}")

    tokens = tokenizer.encode(text)
    total = len(tokens)
    # Loop-invariant search window, in tokens relative to the chunk start.
    upper_span = int(1.5 * n)
    lower_span = int(0.5 * n)

    i = 0
    while i < total:
        # Find the nearest end of sentence within the search window,
        # scanning backwards from the far end.
        j = min(i + upper_span, total)
        while j > i + lower_span:
            # Decode the tokens and check for full stop or newline
            chunk = tokenizer.decode(tokens[i:j])
            if chunk.endswith((".", "\n")):
                break
            j -= 1
        # If no end of sentence found, use n tokens as the chunk size
        if j == i + lower_span:
            j = min(i + n, total)
        yield tokens[i:j]
        i = j
def get_unique_id_for_file_chunk(filename, chunk_index):
    """Build the id string for one chunk of a file: ``<filename>-!<chunk_index>``."""
    prefix = filename + "-!"
    index_text = str(chunk_index)
    return str(prefix + index_text)
def handle_file_string(file, tokenizer, redis_conn, text_embedding_field, index_name):
    """Embed one file's text and hand the resulting chunk vectors to ``load_vectors``.

    Args:
        file: Indexable pair where ``file[0]`` is the filename and
            ``file[1]`` is the file body as a string.
        tokenizer: Tokenizer passed through to ``create_embeddings_for_text``.
        redis_conn: Connection object passed to ``load_vectors``.
        text_embedding_field: Field name passed to ``load_vectors``.
        index_name: Not used by this function; kept so existing callers
            keep working.

    Returns:
        None. Failures while creating embeddings or uploading are printed
        rather than raised.
    """
    print("Handle file string called")
    filename, file_body_string = file[0], file[1]

    # Clean up the file string by replacing newlines and double spaces and semi-colons
    clean_file_body_string = (
        file_body_string.replace("  ", " ").replace("\n", "; ").replace(";", " ")
    )

    # Add the filename to the text to embed
    text_to_embed = "Filename is: {}; {}".format(filename, clean_file_body_string)
    print(text_to_embed)

    # Create embeddings for the text
    try:
        text_embeddings, _ = create_embeddings_for_text(text_to_embed, tokenizer)
    except Exception as e:
        print("[handle_file_string] Error creating embedding: {}".format(e))
        # Without embeddings there is nothing to upload; stop here instead
        # of failing below on an unbound ``text_embeddings``.
        return
    print("[handle_file_string] Created embedding for {}".format(filename))

    # Build one record per chunk: id, embedding vector, and metadata
    # (filename, chunk text, chunk index).
    vectors = [
        {
            "id": get_unique_id_for_file_chunk(filename, i),
            "vector": embedding,
            "metadata": {
                "filename": filename,
                "text_chunk": text_chunk,
                "file_chunk_index": i,
            },
        }
        for i, (text_chunk, embedding) in enumerate(text_embeddings)
    ]

    try:
        load_vectors(redis_conn, vectors, text_embedding_field)
    except Exception as e:
        print(f'Ran into a problem uploading to Redis: {e}')
# Make a class to generate batches for insertion
class BatchGenerator:
    """Split a DataFrame into consecutive row batches of at most ``batch_size`` rows.

    Instances are callable: ``generator(df)`` is the same as
    ``generator.to_batches(df)``.
    """

    def __init__(self, batch_size: int = 10) -> None:
        """Store the maximum number of rows per batch.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.batch_size = batch_size

    # Makes chunks out of an input DataFrame
    def to_batches(self, df: pd.DataFrame) -> Iterator[pd.DataFrame]:
        """Yield row-wise slices of ``df``, each with at most ``batch_size`` rows.

        A frame that fits in one batch (including an empty frame) is yielded
        unchanged as the single item. Otherwise the rows are divided into
        ``splits_num(len(df))`` nearly equal consecutive slices, with the
        earlier slices taking the one-row remainders.
        """
        total_rows = df.shape[0]
        splits = self.splits_num(total_rows)
        if splits <= 1:
            yield df
            return

        # Spread rows as evenly as possible: the first ``extra`` batches get
        # one additional row each.
        base_size, extra = divmod(total_rows, splits)
        start = 0
        for batch_index in range(splits):
            stop = start + base_size + (1 if batch_index < extra else 0)
            yield df.iloc[start:stop]
            start = stop

    # Determines how many chunks DataFrame contains
    def splits_num(self, elements: int) -> int:
        """Return the number of batches needed for ``elements`` rows.

        Uses ceiling division so that no batch exceeds ``batch_size``;
        rounding to nearest would under-count (e.g. 14 rows at size 10).
        """
        return -(-elements // self.batch_size)

    __call__ = to_batches