import asyncio
import json
import os
import re
from collections import defaultdict
from copy import copy
from pathlib import Path
from pprint import pprint
from time import time
from typing import List, Dict, Any
from uuid import uuid4
import numpy as np
import requests
import tiktoken
import torch
from bs4 import BeautifulSoup, Comment
from doctran import Doctran, ExtractProperty
from dotenv import load_dotenv, find_dotenv
from evaluate import load
from langchain_community.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings # from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
)
from loguru import logger
import openai
from pdfminer.high_level import extract_text
from sklearn.metrics.pairwise import cosine_similarity
_ = load_dotenv(find_dotenv())
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
MAX_EMBEDDING_TOKEN_LENGTH = 512
OPENAI_EMBEDDING_MODEL_NAME = "text-embedding-ada-002"
HUGGINGFACE_EMBEDDING_MODEL_NAME = "nomic-embed-text-v1" # "e5-base-v2"
HUGGINGFACE_EMBEDDING_PATH = "nomic-ai/nomic-embed-text-v1" # "intfloat/e5-base-v2"
HUGGINGFACE_EMBEDDING_PREFIX = "" # e.g. e5-base-v2 might require "query: " to better match QA pairs
ALLOW_parallel_gen_embed_section_content = True
# Normalized by default
embed_OPENAI = OpenAIEmbeddings(model=OPENAI_EMBEDDING_MODEL_NAME, )
embed_HF = HuggingFaceEmbeddings(model_name=HUGGINGFACE_EMBEDDING_PATH, model_kwargs = {"device": "cuda" if torch.cuda.is_available() else "cpu", "trust_remote_code": True}, encode_kwargs={"normalize_embeddings": True})
def _num_tokens_from_string(string: str, encoding_name: str = "gpt-4o-mini") -> int:
"""Returns the number of tokens in a text string."""
try:
encoding = tiktoken.get_encoding(encoding_name)
except ValueError:
encoding = tiktoken.encoding_for_model(encoding_name)
num_tokens = len(encoding.encode(string))
return num_tokens
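# Usage sketch (illustrative): count tokens for a short string with the default encoder.
#   _num_tokens_from_string("Deep learning is a subset of machine learning.")  # -> a small int, roughly one token per word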
def truncated_pprint(obj, N=5):
"""Pretty print an object, truncating lists and strings to N items/characters
for easier viewing of plan_json objects"""
def truncate(item, N):
if isinstance(item, list) and N is not None:
return item[:N] + (["..."] if len(item) > N else [])
if isinstance(item, str) and N is not None:
N = 125
return item[:N] + ("..." if len(item) > N else "")
return item
def trunc_recursive(item, N):
if isinstance(item, list):
return [trunc_recursive(i, N) for i in truncate(item, N)]
elif isinstance(item, dict):
return {k: trunc_recursive(v, N) for k, v in item.items()}
else:
return truncate(item, N)
truncated_obj = trunc_recursive(obj, N)
pprint(truncated_obj, sort_dicts=False)
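# Usage sketch (illustrative): preview a plan-like dict without dumping full embeddings.
#   truncated_pprint({"title": "Example", "plan": [{"section": "h2 Intro", "content": "x" * 500}]}, N=3)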
def load_arxiv_paper(path: str | Path) -> Dict[str, str]:
"""Load an arXiv paper from disk to a dict with keys "Title", "Abstract", "Content",
and "References"."""
doc = Path(path)
# Extract all text from pdf
text = extract_text(doc)
title = doc.stem
title = title.replace("_", " ")
# The pattern searches for "abstract" followed by any content.
# Then, it looks for one of the potential following sections:
# "I. Introduction", "1. Introduction", or "Contents".
# We use a positive lookahead (?=...) to assert that the introduction or contents
# pattern exists, but we don't include it in the main match.
pattern = r"(?:abstract|this\s+paper)(.*?)(?=([i1][. ]+introduction|contents))"
    # The re.DOTALL flag allows the . in the pattern to match newline characters.
match = re.search(pattern, text.lower(), re.DOTALL)
abstract = ""
abstract_end = 0
if match:
abstract_end = match.end()
abstract = match.group(1).strip()
# Extract references section
pattern = r"references\s*\n"
matches = [match for match in re.finditer(pattern, text.lower())]
ref_list = []
references = ""
reference_start = len(text)
if matches:
final_match = matches[-1]
reference_start = final_match.start()
references = text[reference_start:]
# Regex to extract reference number and description
ref_pattern = r"\[(\d+)\]\s+(.*?)(?=\n\s*\[\d+\]|$)"
# Find all matches
matches = re.findall(ref_pattern, references, re.DOTALL)
for ref_number, ref_description in matches:
ref_list.append({
"resource_id": int(ref_number),
"resource_description": ref_description.replace('\n', '').strip()
})
ref_list = sorted(ref_list, key=lambda x: x["resource_id"])
content = text[abstract_end:reference_start]
print("EXTRACTION OVERVIEW:\nTitle:"+title[:50].replace("\n","\\n")+"...\nAbstract:"+abstract[:50].replace("\n","\\n")+"...\nContent:"+content[:50].replace("\n","\\n")+"...\nReferences:"+references[:50].replace("\n","\\n")+"...")
article_dict = {
"title": title,
"abstract": abstract,
"content": content,
"references": ref_list,
}
return article_dict
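# Usage sketch (the PDF path is hypothetical):
#   paper = load_arxiv_paper("data/arxiv/example_paper.pdf")
#   print(paper["title"], "| abstract chars:", len(paper["abstract"]), "| references:", len(paper["references"]))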
def split_patents_into_individual_files(patents_file: str | Path) -> None:
"""Read in a file containing many patents. Split each patent into its own file, keeping
only the english parts, and write to disk."""
# Read in file
with open(patents_file, "r") as f:
lines = f.readlines()
# Get all eng sections
lines_en = [line for line in lines if "\ten\t" in line]
# Split each on TITLE and write to its own file with TITLE as filename
os.makedirs("data/patents", exist_ok=True)
title = "no title found"
# Create dict of patents
patents: defaultdict = defaultdict(str)
for i, x in enumerate(lines_en):
if "\tTITLE\t" in x:
title = x.split("\t")[-1].strip()
patents[title] += x
# Write each patent to its own file
for title, content in patents.items():
        filename_friendly_title = "".join(i for i in title if i not in r"\/:*?<>|")
with open(f"data/patents/{filename_friendly_title}.txt", "w") as f:
f.write(content)
logger.info(f"Wrote file: {filename_friendly_title}.txt")
def load_patent_file(patent_file: str | Path) -> Dict[str, str]:
"""Read in a patent file and return a dict with keys as section titles and values the content.
Parameters
----------
    patent_file : str | Path
Path to the patent file.
Returns
-------
patent_dict : dict
Dict with keys as section titles and values the content. Keys are ['title',
'descr', 'claim_1', 'claim_2', ..., 'claim_n', 'pdfep']. Not all patents
will have all keys. All will have 'title' at a minimum.
"""
logger.info(f"Loading patent file: {patent_file}")
# Read file
with open(patent_file, "r") as f:
lines: list = f.readlines()
# Get all english sections
lines_en: list = [line for line in lines if "\ten\t" in line]
# Convert into dict with keys as section titles and values the content
patent_dict = {}
total_claims = 1
for x in lines_en:
if "\tTITLE\t" in x:
patent_dict["title"] = x
elif "\tDESCR\t" in x:
patent_dict["descr"] = x
elif "\tCLAIM\t" in x:
# Some patents have multiple claims, so we need to number them
patent_dict[f"claim_{total_claims}"] = x
total_claims += 1
elif "\tPDFEP" in x:
patent_dict["pdfep"] = x
else:
raise ValueError(
f"Expected sections in [TITLE, DESCR, CLAIM, PDFEP]. Received: {x}"
)
    logger.info(
        f"Extracted {len(patent_dict)} sections from the patent file, "
        f"with keys: {list(patent_dict.keys())}"
    )
return patent_dict
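# Usage sketch (the file path is hypothetical):
#   patent = load_patent_file("data/patents/Example patent.txt")
#   print(list(patent.keys()))  # e.g. ['title', 'descr', 'claim_1', 'pdfep']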
def load_wikipedia_url(url: str) -> Dict[str, str]:
"""Extracts the content from a Wikipedia URL into a dictionary. The keys are the header
names + indicator of header level e.g. 'h2 Definitions'. The values are the content
underneath each header tags.
Only extracts content the user will see. Ignores hidden content and Contents list.
Parameters
----------
url : str
The URL of the Wikipedia page to extract content from.
Returns
-------
article_dict : dict
A dictionary of the content extracted from the Wikipedia page.
"""
if not "wikipedia" in url:
raise ValueError("URL is not a wikipedia URL. Received: " + url)
r = requests.get(url)
html_content = r.text
# Create a BeautifulSoup object
soup = BeautifulSoup(html_content, "html.parser")
# Remove unwanted tags: script, style, [document], head, title
for element in soup(["script", "style", "head", "title", "[document]"]):
element.decompose()
# Also remove HTML comments
for element in soup.find_all(string=lambda text: isinstance(text, Comment)):
element.extract()
# Define the header tags to find
tags = ["h1", "h2", "h3", "h4", "h5", "h6"]
found_tags = soup.find_all(tags)
# Extract tags and associated content into a dict
article_dict = {}
for tag in found_tags:
content = []
next_tag = tag.find_next()
# Look for next tags until the next header tag
while next_tag and next_tag.name not in tags:
# Reference section can contain both p and li tags
if "reference" in str(next_tag).lower() and next_tag.name in ["p", "li"]:
content.append(next_tag.get_text(strip=False))
elif next_tag.name == "p":
content.append(next_tag.get_text(strip=False))
next_tag = next_tag.find_next()
key = f"{tag.name} {tag.get_text(strip=True)}"
article_dict[key] = " ".join(content)
for key in list(
article_dict.keys()
): # Using list() to avoid changing the dictionary size during iteration
if key.endswith("[edit]"):
new_key = key.rsplit("[edit]", 1)[0]
article_dict[new_key] = article_dict.pop(key)
del article_dict["h2 Contents"] # TODO: check what is this specific case
num_sections = len(article_dict.keys())
logger.info(
f"Successfully downloaded content from Wikipedia page {url}. "
f"Extracted {num_sections} sections."
)
#print("EXTRACTION OVERVIEW:\nTitle:"+title[:50].replace("\n","\\n")+"...\nAbstract:"+abstract[:50].replace("\n","\\n")+"...\nContent:"+content[:50].replace("\n","\\n")+"...\nReferences:"+references[:50].replace("\n","\\n")+"...")
print("EXTRACTION OVERVIEW:")
    # Print each key and the first 80 characters of its value
for key in article_dict.keys():
print(f"{key}: {article_dict[key][:80]}...")
return article_dict
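# Usage sketch:
#   article = load_wikipedia_url("https://en.wikipedia.org/wiki/Simulated_annealing")
#   print(list(article.keys())[:3])  # e.g. ['h1 Simulated annealing', 'h2 Overview', ...]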
async def _extract_title(string: str) -> str:
"""Extract a title from `string` that is max 7 words long."""
doctran = Doctran(
openai_api_key=os.getenv("OPENAI_API_KEY"), openai_model="gpt-4o-mini"
)
document = doctran.parse(content=string)
properties = ExtractProperty(
name="title",
description="The title of the document (max 7 words).",
type="string",
required=True,
)
try:
document = document.extract(properties=[properties]).execute()
return document.transformed_content
except Exception as e:
logger.error(f"Error extracting title from string: {e}")
return "None"
async def divide_sections_if_too_large(
article_dict: Dict[str, str],
max_section_token_length: int = MAX_EMBEDDING_TOKEN_LENGTH,
doc_type: str = "patent",
) -> Dict[str, str]:
"""This function takes an existing dictionary containing the section heaadings and
content (from above functions), checks if any section is too large (i.e., more
than MAX_EMBEDDING_TOKEN_LENGTH tokens), divides such sections into smaller sections, generates a new
title, and returns the updated dictionary
"""
if doc_type not in ["patent", "wikipedia", "arxiv"]:
raise ValueError(
f"doc_type must be one of 'patent', 'wikipedia', or 'arxiv'. Got {doc_type}."
)
logger.info("Dividing sections if too large in plan and section content.")
final_dict: Dict = {}
start_dict = copy(article_dict)
def is_reference_section(heading: str):
"""Returns True if heading is a reference section."""
result = re.search(r"reference|further reading|see also|bibliography|external links", heading, re.IGNORECASE)
return result
for heading, content in start_dict.items():
content_length = len(content)
if content_length == 0:
final_dict[heading] = " "
        elif content_length <= max_section_token_length:  # character count is an upper bound on token count
final_dict[heading] = content
else:
num_tokens = _num_tokens_from_string(content)
logger.info(f"Content character length: {len(content)} tokens: {num_tokens} for '{heading}'")
# Each section must contain something, otherwise the embedding models fail
if num_tokens == 0:
final_dict[heading] = " "
# If the section is small enough, add it to the final dict
elif num_tokens <= max_section_token_length:
final_dict[heading] = content
# If section is too big, split into smaller sections, extract title, and add to final dict
else:
# Split
char_splitter = RecursiveCharacterTextSplitter(
chunk_size=max_section_token_length,
chunk_overlap=0,
# ' ' separator means sometimes sentences will be cut in two to ensure
# the chunk size is not exceeded
separators=["\n\n", "\n", ". ", ".", ", ", ",", " "],
length_function=_num_tokens_from_string,
)
splits: List[str] = char_splitter.split_text(content)
# Keep heading the same but add numbers to sections e.g. 'h2 Reference' -> 'h2 Reference 1'
# TODO - add a continue statement here?
if doc_type in ["wikipedia", "arxiv"] and is_reference_section(heading):
for i, split in enumerate(splits, start=1):
new_heading = f"{heading} {i}"
final_dict[new_heading] = split
logger.info(
f"Added '{new_heading}' split original heading '{heading}'"
)
else:
# Create new titles for each split
for split in splits:
# Headings are of the form h1, h2, h3 etc. we split it into more of the same level
if doc_type == "wikipedia":
heading_level = int(heading[1])
title = await _extract_title(split)
new_heading = f"h{heading_level} {title}"
# Heading levels aren't important for other doc_types
else:
new_heading = await _extract_title(split)
final_dict[new_heading] = split
logger.info(
f"Added '{new_heading}' split original heading '{heading}'"
)
n_keys_start = len(start_dict.keys())
n_keys_final = len(final_dict.keys())
logger.info(
f"\n\tFinished dividing sections if too large in plan and section content."
f"\n\tStarted with {n_keys_start} sections and got {n_keys_final} final sections."
f"\n\tThat's a {n_keys_final / n_keys_start:.2f}x increase in sections"
)
return final_dict
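# Usage sketch: the coroutine is awaited (or run via asyncio) after one of the loaders above.
#   article = load_wikipedia_url("https://en.wikipedia.org/wiki/Simulated_annealing")
#   article = asyncio.run(divide_sections_if_too_large(article, doc_type="wikipedia"))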
def _gen_embed_section_content(
heading: str, content: str, id: int = 1, total_sections: int = 1
) -> Dict[str, str | list[float]]:
"""Given a heading and content, returns a dictionary with the heading, content,
and embeddings of the heading and content.
Parameters
----------
heading : str
The heading of the section.
content : str
The content of the section.
id : int, optional
The id of the section, by default 1
total_sections : int, optional
The total number of sections, by default 1
"""
# Get resource indexes used in content
resources_ids = sorted(set([int(num) for num in re.findall(r"\[(\d+)\]", content)]))
section_json = {
"section_id": id,
"section": heading,
"content": content,
"section_embedding_1": embed_OPENAI.embed_query(heading),
"section_embedding_2": embed_HF.embed_query(HUGGINGFACE_EMBEDDING_PREFIX + heading),
"content_embedding_1": embed_OPENAI.embed_query(content),
"content_embedding_2": embed_HF.embed_query(HUGGINGFACE_EMBEDDING_PREFIX + content),
"resources_used": resources_ids
}
logger.info(
f"{id}/{total_sections} - created section + content embeddings for {heading} - SECTION: {heading[:50]}... CONTENT: {content[:50]}..."
)
return section_json
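# Usage sketch (single section; the text is illustrative):
#   section = _gen_embed_section_content("h2 Overview", "Simulated annealing is ... [1]", id=1, total_sections=1)
#   section["resources_used"]  # -> [1]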
def _gen_embed_section_content_batch(
headings: List[str], contents: List[str], ids: List[int], total_sections: int
) -> List[Dict[str, str | List[float]]]:
"""Given lists of headings and contents, returns a list of dictionaries
with the headings, contents, and embeddings for each section.
Parameters
----------
headings : List[str]
The headings of the sections.
contents : List[str]
The contents of the sections.
ids : List[int]
The ids of the sections.
total_sections : int
The total number of sections.
"""
section_jsons = []
headings = [heading if isinstance(heading, str) else "" for heading in headings]
contents = [content if isinstance(content, str) else "" for content in contents]
heading_embeddings_1 = embed_OPENAI.embed_documents(headings)
content_embeddings_1 = embed_OPENAI.embed_documents(contents)
    def HF_embed_split(texts, n):
        # Embed in n contiguous chunks so the concatenated output keeps the input order.
        chunk = -(-len(texts) // n)  # ceiling division
        batches = [texts[i * chunk:(i + 1) * chunk] for i in range(n)]
        return sum((embed_HF.embed_documents([HUGGINGFACE_EMBEDDING_PREFIX + t for t in b]) for b in batches if b), [])
    def HF_embed_with_retry(texts, label):
        # Double the number of chunks (halve the batch size) whenever embedding fails, e.g. on GPU OOM.
        n = 1
        while True:
            try:
                return HF_embed_split(texts, n)
            except Exception:
                n *= 2
                print(f"HF embedding of {label} failed; retrying in {n} smaller batches")
    heading_embeddings_2 = HF_embed_with_retry(headings, "headings")
    content_embeddings_2 = HF_embed_with_retry(contents, "contents")
for id, heading, content, emb_h1, emb_h2, emb_c1, emb_c2 in zip(
ids, headings, contents, heading_embeddings_1, heading_embeddings_2, content_embeddings_1, content_embeddings_2):
# Get resource indexes used in content
resources_ids = sorted(set([int(num) for num in re.findall(r"\[(\d+)\]", content)]))
section_jsons.append({
"section_id": id,
"section": heading,
"content": content,
"section_embedding_1": emb_h1,
"section_embedding_2": emb_h2,
"content_embedding_1": emb_c1,
"content_embedding_2": emb_c2,
"resources_used": resources_ids
})
logger.info(
f"{id}/{total_sections} - created section + content embeddings for {heading} - SECTION: {heading[:50]}... CONTENT: {content[:50]}..."
)
return section_jsons
from concurrent.futures import ThreadPoolExecutor, as_completed
def _gen_embed_plan(plan: List[dict], i: int) -> List[float]:
"""Calculate plan embedding by averaging the section embeddings and content embeddings
sequentially.
Parameters
----------
plan : List[dict]
List of dictionaries, each containing the section and content embeddings.
i : int
The index of the embedding to use.
"""
try:
s1_mean = np.mean([x[f"section_embedding_{i}"] for x in plan], axis=0)
c1_mean = np.mean([x[f"content_embedding_{i}"] for x in plan], axis=0)
except KeyError:
raise KeyError(
f"Could not find section_embedding_{i} or content_embedding_{i} in "
f"every element in plan. Please check that every element has both of these "
f"keys."
)
total_mean = np.mean([c1_mean, s1_mean], axis=0)
total_mean = list(total_mean)
logger.info(f"Created plan embedding {i}")
return total_mean
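# Usage sketch: `plan` is the list of section dicts built by the functions above;
# index 1 averages the OpenAI embeddings, index 2 the HuggingFace ones.
#   plan_embedding_1 = _gen_embed_plan(plan, 1)
#   plan_embedding_2 = _gen_embed_plan(plan, 2)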
def _parallel_gen_embed_section_content(headings, content, start_index, total_sections, initial_plan=None):
# Collect data for batch processing
headings_to_process = []
contents_to_process = []
indices = []
for i, (heading, content) in enumerate(zip(headings[start_index:], content[start_index:]), start=1):
headings_to_process.append(heading)
contents_to_process.append(content)
indices.append(i)
# Process embeddings in batch
plan = _gen_embed_section_content_batch(headings_to_process, contents_to_process, indices, total_sections)
# Merge dict data for each element of plan if initial_plan is provided
if initial_plan:
for i, (p, ip) in enumerate(zip(plan, initial_plan), start=1):
p.update(ip)
return plan
def generate_embeddings_plan_and_section_content(
article_dict: Dict, doc_type: str = "patent"
) -> Dict:
"""Given a dictionary of the article content, returns a dictionary with the title,
abstract, plan and associated embeddings.
"""
    doc_type_error_msg = (
        "doc_type must be one of 'patent', 'wikipedia', 'arxiv', or 'latex'. "
        f"Received {doc_type}"
    )
if doc_type not in ["patent", "wikipedia", "arxiv", "latex"]:
raise ValueError(doc_type_error_msg)
logger.info("Creating plan json")
if doc_type == "wikipedia":
headings = list(article_dict.keys())
content = list(article_dict.values())
# Wikipedia titles are of form 'h1 Example' so we remove the 'h1 '
title = headings[0][3:]
abstract = content[0]
total_sections = len(headings) - 1
start_index = 1
elif doc_type == "patent":
headings = list(article_dict.keys())
content = list(article_dict.values())
# Titles are separated by tabs, the last element is the actual title
title = content[0].split("\t")[-1].strip()
# Remove illegal characters from title (it's used as a filename)
title = "".join(i for i in title if i not in "\/:*?<>|")
try:
abstract = content[1]
except IndexError:
abstract = "no abstract"
total_sections = len(headings) - 2
start_index = 2
elif doc_type in ["arxiv"]:
        # Exclude the reference list from the section headings/content (key may be "references" or "References")
        key_to_exclude = "references"
        headings = [key for key in article_dict.keys() if key.lower() != key_to_exclude]
        content = [value for key, value in article_dict.items() if key.lower() != key_to_exclude]
# The first key/value pairs in arxiv dicts are {'Title': title, 'Abstract': abstract}
# so we take the first two elements of content
title = content[0]
try:
abstract = content[1]
except IndexError:
abstract = "no abstract"
total_sections = len(headings) - 2
start_index = 2
elif doc_type == "latex":
# Loop through article_dict["plan"] and extract headings and content
headings = [x["section"] for x in article_dict["plan"]]
content = [x["content"] for x in article_dict["plan"]]
title = article_dict["title"]
abstract = article_dict["abstract"]
start_index = 0
total_sections = len(headings)
else:
raise ValueError(doc_type_error_msg)
logger.info("Title: " + title)
logger.info("Abstract: " + abstract)
if ALLOW_parallel_gen_embed_section_content:
plan = _parallel_gen_embed_section_content(headings, content, start_index, total_sections, article_dict.get("plan", None))
else:
plan = [
_gen_embed_section_content(
heading, content, id=i, total_sections=total_sections
)
for i, (heading, content) in enumerate(
zip(headings[start_index:], content[start_index:]), start=1
)
]
# Get Resources
resources = []
try:
resources = article_dict.get("references", [])
if isinstance(resources, dict): # for Wikipedia
resources = list(resources.values())
for i in range(len(resources)):
resources[i]["resource_embedding"] = embed_OPENAI.embed_query(resources[i]["resource_description"])
except Exception as e:
logger.debug(f"Error occurred while processing resources {e}")
plan_embed_1 = _gen_embed_plan(plan, 1)
plan_embed_2 = _gen_embed_plan(plan, 2)
try:
plan_json = {
"id": str(uuid4()) if "id" not in article_dict else article_dict["id"],
"title": title,
"abstract": abstract,
"title_embedding_1": embed_OPENAI.embed_query(title),
"title_embedding_2": embed_HF.embed_query(HUGGINGFACE_EMBEDDING_PREFIX + title),
"abstract_embedding_1": embed_OPENAI.embed_query(abstract),
"abstract_embedding_2": embed_HF.embed_query(HUGGINGFACE_EMBEDDING_PREFIX + abstract),
"plan": plan,
"resources": resources,
"plan_embedding_1": plan_embed_1,
"plan_embedding_2": plan_embed_2,
"embedding1_model": OPENAI_EMBEDDING_MODEL_NAME,
"embedding2_model": HUGGINGFACE_EMBEDDING_MODEL_NAME,
"success": True,
"error": None,
}
except Exception as e:
plan_json = {
"id": str(uuid4()),
"title": title,
"abstract": abstract,
"title_embedding_1": None,
"title_embedding_2": None,
"abstract_embedding_1": None,
"abstract_embedding_2": None,
"plan": plan,
"resources": [],
"plan_embedding_1": None,
"plan_embedding_2": None,
"embedding1_model": OPENAI_EMBEDDING_MODEL_NAME,
"embedding2_model": HUGGINGFACE_EMBEDDING_MODEL_NAME,
"success": False,
"error": str(e),
}
logger.info("Finished creating plan json")
return plan_json
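# Usage sketch for the Wikipedia path end-to-end (output filename is hypothetical):
#   article = load_wikipedia_url("https://en.wikipedia.org/wiki/Dual-phase_evolution")
#   article = asyncio.run(divide_sections_if_too_large(article, doc_type="wikipedia"))
#   plan_json = generate_embeddings_plan_and_section_content(article, doc_type="wikipedia")
#   Path("output").mkdir(exist_ok=True)
#   Path("output/dual_phase_evolution.json").write_text(json.dumps(plan_json))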
async def get_embeddings(
input: str | List[str], model: str = OPENAI_EMBEDDING_MODEL_NAME
) -> List[float] | List[List[float]]:
"""This function takes one string or a list of strings and a model name,
generates an embedding for each string in the list using the specified model,
and returns a list of embeddings, each represented as a list of floating point
numbers.
Parameters
----------
input : str | List[str]
The input string or list of strings to be embedded.
    model : str, optional [OPENAI_EMBEDDING_MODEL_NAME, HUGGINGFACE_EMBEDDING_MODEL_NAME]
The name of the model to be used for embedding.
"""
if model == OPENAI_EMBEDDING_MODEL_NAME:
embedder = embed_OPENAI # OpenAIEmbeddings(model=model)
elif model == HUGGINGFACE_EMBEDDING_MODEL_NAME:
embedder = embed_HF # HuggingFaceEmbeddings(model_name=HUGGINGFACE_EMBEDDING_PATH, model_kwargs = {"device": "cuda" if torch.cuda.is_available() else "cpu", "trust_remote_code": True}, encode_kwargs={"normalize_embeddings": True})
else:
raise ValueError(
f"Model name must be OPENAI_EMBEDDING_MODEL_NAME or HUGGINGFACE_EMBEDDING_MODEL_NAME. Received {model}"
)
if isinstance(input, str):
try:
return await embedder.aembed_query(input)
except NotImplementedError as e:
return embedder.embed_query(input)
elif isinstance(input, list):
try:
return await embedder.aembed_documents(input)
except NotImplementedError as e:
return embedder.embed_documents(input)
else:
raise ValueError(
f"Input must be a string or a list of strings. Received {type(input)}"
)
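# Usage sketch: embed one string or a batch with either configured model.
#   vec = asyncio.run(get_embeddings("simulated annealing", model=OPENAI_EMBEDDING_MODEL_NAME))
#   vecs = asyncio.run(get_embeddings(["query one", "query two"], model=HUGGINGFACE_EMBEDDING_MODEL_NAME))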
def _compare_documents(
document: str | Path | Dict[str, Any],
prediction: str | Path | Dict[str, Any],
compare_on: str = "section",
) -> Dict[str, Any]:
"""Compare the 'compare_on' sections of document and prediction. Calculate MAUVE,
and ROUGE-L scores on the actual text, and cosine similarity on the embeddings.
Parameters
----------
document : Dict[str, Any]
        Dictionary containing the ground truth document. Must contain the keys
'plan' and 'id'.
prediction : Dict[str, Any]
Dictionary containing the prediction to compare against. Must contain the keys
'plan' and 'id'.
compare_on : str, optional ['section', 'content']
Whether to compare on the 'section' level i.e. the plan of the document, or
the 'content' level.
"""
if compare_on not in ["section", "content"]:
raise ValueError(
f"`compare_on` must be 'section' or 'content'. Received {compare_on}"
)
if isinstance(document, str) or isinstance(document, Path):
with open(document, "r") as f:
document = json.load(f)
if isinstance(prediction, str) or isinstance(prediction, Path):
with open(prediction, "r") as f:
prediction = json.load(f)
if not isinstance(document, dict) or not isinstance(prediction, dict):
raise TypeError(
"Both `document` and `prediction` must be dictionaries. Received "
f"{type(document)} and {type(prediction)}"
)
if "plan" not in document or "plan" not in prediction:
raise ValueError(
f'Both `document` and `prediction` must contain the key "plan". At least '
f"one of them does not."
)
start = time()
doc1_name = f"ID: {document['id']} Title: {document['title']}"
doc2_name = f"ID: {prediction['id']} Title: {prediction['title']}"
logger.info(
f"\n\tStarting to compare two documents on {compare_on}:"
f"\n\t\t{doc1_name}"
f"\n\t\t{doc2_name}"
)
# mauve = load("mauve")
rouge = load("rouge")
section_results = []
doc_plan: List[Dict[str, Any]] = document["plan"]
predict_plan: List[Dict[str, Any]] = prediction["plan"]
logger.info(
f"\n\t{doc1_name} has {len(doc_plan)} sections."
f"\n\t{doc2_name} has {len(predict_plan)} sections."
)
total_comparisons = min(len(doc_plan), len(predict_plan))
    # If the plans have different lengths, the comparison only goes up to the shorter one
for idx, (p_dict, d_dict) in enumerate(zip(predict_plan, doc_plan), start=1):
# Compute MAUVE
# mauve_results = mauve.compute(
# predictions=[p_dict[compare_on]], references=[d_dict[compare_on]],verbose=True
# )
# mauve_score = mauve_results.mauve
# Compute ROUGE-L
results = rouge.compute(
predictions=[p_dict[compare_on]],
references=[d_dict[compare_on]],
rouge_types=["rougeL"],
)
rouge_score = results["rougeL"]
# Compute cosine distance between both section embeddings
cosine_1 = cosine_similarity(
[p_dict[f"{compare_on}_embedding_1"]], [d_dict[f"{compare_on}_embedding_1"]]
)[0][0]
cosine_2 = cosine_similarity(
[p_dict[f"{compare_on}_embedding_2"]], [d_dict[f"{compare_on}_embedding_2"]]
)[0][0]
# Combine results
result = {
"section_id": idx,
#"mauve_similarity": mauve_score,
"rouge_L_similarity": rouge_score,
"embedding1_cosine_similarity": cosine_1,
"embedding2_cosine_similarity": cosine_2,
}
section_results.append(result)
logger.info(f"{idx}/{total_comparisons} sections compared.")
    # Calculate total scores
# mauve_total = np.mean([x["mauve_similarity"] for x in section_results])
rouge_total = np.mean([x["rouge_L_similarity"] for x in section_results])
cosine_1_total = np.mean(
[x["embedding1_cosine_similarity"] for x in section_results]
)
cosine_2_total = np.mean(
[x["embedding2_cosine_similarity"] for x in section_results]
)
total_results = {
# "mauve_similarity": mauve_total,
"rouge_L_similarity": rouge_total,
"embedding1_cosine_similarity": cosine_1_total,
"embedding2_cosine_similarity": cosine_2_total,
}
if compare_on == "section":
compare_on = "plan"
output = {
"document_id": document["id"],
"prediction_id": prediction["id"],
f"{compare_on}_total_similarity": total_results,
f"{compare_on}_bysection_similarity": section_results,
}
end = time()
seconds = end - start
mins = seconds / 60
logger.info(
f"\n\tFinished comparing document {compare_on}s:"
f"\n\t\tThat took: {mins:.2f} mins ({seconds:.0f} seconds)"
)
return output
def compare_documents_sections(
document1: str | Path | Dict[str, Any],
document2: str | Path | Dict[str, Any],
) -> Dict[str, Any]:
"""This function takes two documents, a comparison method, compares the section
headings (also called plans) of the documents in order using the specified method,
and returns a dictionary containing the similarity scores.
Definition: a document's 'plan' is the headings and subheadings of the document in
order.
Example Usage:
>>> url_1 = 'https://en.wikipedia.org/wiki/Simulated_annealing'
>>> url_2 = 'https://en.wikipedia.org/wiki/Dual-phase_evolution'
>>> doc_1 = await extract_plan_and_content_wikipedia(url_1)
>>> doc_2 = await extract_plan_and_content_wikipedia(url_2)
    >>> compare_plan = compare_documents_sections(doc_1, doc_2)
"""
return _compare_documents(document1, document2, compare_on="section")
def compare_documents_content(
document1: str | Path | Dict[str, Any],
document2: str | Path | Dict[str, Any],
) -> Dict[str, Any]:
"""This function takes two documents, a comparison method, compares the sections
of the documents using the specified method, and returns a dictionary containing
the section-wise similarity scores.
Definition: a document's 'content' is the text under the headings and subheadings
Example Usage:
>>> url_1 = 'https://en.wikipedia.org/wiki/Simulated_annealing'
>>> url_2 = 'https://en.wikipedia.org/wiki/Dual-phase_evolution'
>>> doc_1 = await extract_plan_and_content_wikipedia(url_1)
>>> doc_2 = await extract_plan_and_content_wikipedia(url_2)
    >>> compare_sections = compare_documents_content(doc_1, doc_2)
"""
# TODO - do we really need method? Or can we just do every metric every time?
return _compare_documents(document1, document2, compare_on="content")
def generate_title(input: str | Path, doc_type: str) -> str:
"""Extracts the title from the input based on the document type."""
if doc_type == "wikipedia":
# For Wikipedia, we use the last part of the URL as the title
        title = str(input).split("/")[-1].replace("_", " ")
elif doc_type == "arxiv":
# For arXiv, we use the file name without extension as the title
title = Path(input).stem.replace("_", " ")
elif doc_type == "patent":
# For patents, we read the first line and extract the title
with open(input, "r") as f:
first_line = f.readline()
title = first_line.split("\t")[-1].strip()
title = "".join(i for i in title if i not in "\/:*?<>|") # Remove illegal characters
else:
raise ValueError(f"doc_type must be one of 'patent', 'wikipedia', or 'arxiv'. Received {doc_type}")
return title
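# Usage sketch (inputs are hypothetical):
#   generate_title("https://en.wikipedia.org/wiki/Simulated_annealing", "wikipedia")  # -> 'Simulated annealing'
#   generate_title("data/arxiv/example_paper.pdf", "arxiv")                           # -> 'example paper'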
import uuid
# Helper functions for LaTeX processing
def latex_extract_citations(text, references):
    # \cite / \citet / \citep commands may contain several comma-separated keys
    raw_groups = re.findall(r'\\cite[tp]?\{([^}]+)\}', text)
    citations = [key.strip() for group in raw_groups for key in group.split(",")]
    return list({citation for citation in citations if citation in references})
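# Usage sketch (toy snippet; `refs` would normally be the citation keys from the .bib file):
#   refs = ["smith2020", "doe2019", "unused2018"]
#   latex_extract_citations(r"As shown in \citep{smith2020,doe2019}.", refs)  # -> ['smith2020', 'doe2019'] (order not guaranteed)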
def plan_create_section_entry(section_id, section_title, content="", resources_cited=None, references=()):
    if resources_cited is None:
        resources_cited = []
return {
"section_id": section_id,
"section": section_title,
"content": content,
"resources_used": sorted(set([i for i, ref in enumerate(references) if ref in resources_cited])),
"resources_cited_key": resources_cited,
}
def latex_clean_content(content):
# Remove lines starting with %
content = '\n'.join(line for line in content.split('\n') if not line.strip().startswith('%'))
# Remove unnecessary LaTeX commands
content = re.sub(r'\\(usepackage|documentclass)\{.*?\}', '', content)
# Remove LaTeX environments
#content = re.sub(r'\\begin\{.*?\}.*?\\end\{.*?\}', '', content, flags=re.DOTALL)
# Remove extra whitespace
content = re.sub(r'\s+', ' ', content).strip()
return content
def latex_sections_hierarchical_numbering(sections, method="counters"):
section_counters = {"section": 0, "subsection": 0, "subsubsection": 0}
numbered_sections = []
for section_type, section_title in sections:
if method == "counters":
if section_type == "section":
section_counters["section"] += 1
section_counters["subsection"] = 0
section_counters["subsubsection"] = 0
section_number = f"{section_counters['section']}"
elif section_type == "subsection":
section_counters["subsection"] += 1
section_counters["subsubsection"] = 0
section_number = f"{section_counters['section']}.{section_counters['subsection']}"
elif section_type == "subsubsection":
section_counters["subsubsection"] += 1
section_number = f"{section_counters['section']}.{section_counters['subsection']}.{section_counters['subsubsection']}"
section_title = f"{section_number} {section_title}"
numbered_sections.append((section_type, section_title))
return numbered_sections
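# Usage sketch: numbers a flat (type, title) list the way LaTeX counters would.
#   latex_sections_hierarchical_numbering([("section", "Intro"), ("subsection", "Background"), ("section", "Methods")])
#   # -> [('section', '1 Intro'), ('subsection', '1.1 Background'), ('section', '2 Methods')]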
def biblatex_extract_resources(bibtex_content):
resources = []
entries = re.findall(r'@(\w+)\{([^,]+),(.+?)\n\}', bibtex_content, re.DOTALL | re.IGNORECASE)
for i, (entry_type, citation_key, content) in enumerate(entries):
title_match = re.search(r'title\s*=\s*\{(.+?)\}', content, re.DOTALL | re.IGNORECASE)
author_match = re.search(r'author\s*=\s*\{(.+?)\}', content, re.DOTALL | re.IGNORECASE)
year_match = re.search(r'year\s*=\s*\{(.+?)\}', content)
url_match = re.search(r'url\s*=\s*\{(.+?)\}', content)
doi_match = re.search(r'doi\s*=\s*\{(.+?)\}', content, re.DOTALL)
title = title_match.group(1).strip() if title_match else None
author = author_match.group(1).strip() if author_match else None
year = year_match.group(1).strip() if year_match else None
url = url_match.group(1).strip() if url_match else doi_match.group(1).strip() if doi_match else None
resources.append({
"resource_id": i + 1,
"resource_key": citation_key.strip(),
"resource_description": f"{title if title else ''}\nAuthor:{author if author else ''}\nYear:{year if year else ''}",
"url": url
})
return resources
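# Usage sketch (minimal, hypothetical BibTeX entry):
#   bib = "@article{smith2020,\n  title = {An Example Title},\n  author = {Smith, J.},\n  year = {2020}\n}"
#   biblatex_extract_resources(bib)[0]["resource_key"]  # -> 'smith2020'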
# Main function to extract plan and content from LaTeX files
async def extract_plan_and_content_latex(tex_file: Path, without_embeddings=False, output_dir = './output/latex') -> Dict[str, Any]:
data_dir = tex_file.parent
tex_filename = tex_file.name
bib_filename = tex_filename.replace('.tex', '.bib')