extractor.py
"""Extract text from crawled HTML snapshots stored as per-site JSON files."""
import json
import logging
import os
from multiprocessing import Pool

from bs4 import BeautifulSoup as bs4

from util.argparsers import extractor_parser


def pool_job(datadir, html_json_file, logger_path):
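    """Extract text from one crawled-HTML JSON file and write it to disk.

    The input file is expected (from how it is read below) to look like
    {"pages": {<page_id>: {"url": ..., "html": ...}, ...}}; the output is a
    single JSON object holding every non-whitespace text node of every page.
    """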
    high_logger = my_custom_logger(logger_path)
    # File names follow the pattern "<area>_<site>_...", where area is one of
    # the crawl areas created in extract_text below.
    file_name = os.path.basename(html_json_file)
    area, site = file_name.split('_')[:2]
with open(f"{html_json_file}", "r", encoding='utf-8') as f:
jdata = json.load(f)
statement = f'== Extracting text from {site} {area} htmls'
print(statement)
high_logger.warning(statement)
platform_dict = {"platform": site, "area": area}
platform_dict['pages'] = []
    for page_key, page_info in jdata['pages'].items():
        page_dict = {"page_id": page_key, "source": page_info['url']}
        page_soup = bs4(page_info['html'], features='lxml')
        # Keep every non-whitespace text node of the parsed document, one line each.
        page_dict['text'] = [f"{string}\n" for string in page_soup.strings
                             if not string.isspace()]
        platform_dict['pages'].append(page_dict)
output_file = f"{datadir}/all_text/{area}/{site}.json"
with open(output_file, "w", encoding="utf-8") as f:
        json.dump(platform_dict, f)
statement = f"==== wrote to {output_file}"
print(statement)
high_logger.warning(statement)
statement = f"== Completed {site} {area} text extraction"
print(statement)
high_logger.warning(statement)
def extract_text(datadir, pools, **kwargs):
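    """Walk {datadir}/all_htmls/ and extract text from every *.json crawl file.

    When pools is truthy, files are processed concurrently in a worker pool of
    that size; otherwise they are processed sequentially in this process.
    """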
if pools:
pool = Pool(pools)
html_directory = f"{datadir}/all_htmls/"
    # Create the output tree (one sub-directory per crawl area) and the log
    # directory. exist_ok=True covers re-runs, and makedirs also creates any
    # missing parent directories, which the original per-directory mkdir
    # calls would not.
    for area in ("hatespeech", "misinformation", "copyright"):
        os.makedirs(f"{datadir}/all_text/{area}", exist_ok=True)
    os.makedirs(f"{datadir}/logs/extractor/", exist_ok=True)
logger_path = f"{datadir}/logs/extractor/prints.log"
high_logger = my_custom_logger(logger_path)
for file in os.listdir(f"{html_directory}"):
if file.endswith('.json'):
html_json_path = f"{html_directory}/{file}"
if pools:
pool.apply_async(pool_job, args=(datadir, html_json_path, logger_path), error_callback=pcb)
else:
pool_job(datadir, html_json_path, logger_path)
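    # Wait for all dispatched jobs to finish before logging completion.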
if pools:
pool.close()
pool.join()
statement = f"Completed Extraction for {datadir}"
print(statement)
high_logger.warning(statement)
def pcb(res):
print(f'One of the jobs errored: {res}')


def my_custom_logger(logger_name, level=logging.WARNING):
    """Return a logger that appends to the file at logger_name."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(level)
    # getLogger returns the same object per name, so guard against attaching
    # a duplicate handler (and duplicated log lines) on repeated calls.
    if not logger.handlers:
        logger.addHandler(logging.FileHandler(logger_name, mode='a'))
    return logger


if __name__ == "__main__":
args = extractor_parser.parse_args()
extract_text(**vars(args))
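# Typical invocation (flag spellings assumed; extractor_parser must provide
# "datadir" and "pools" destinations, as extract_text's signature requires):
#   python extractor.py --datadir /path/to/data --pools 4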