import requests
from bs4 import BeautifulSoup, Comment
import pandas as pd
import os, schedule, time, logging, re, nltk, json, random
from nltk.corpus import stopwords, wordnet
from nltk.stem import PorterStemmer
from concurrent.futures import ThreadPoolExecutor
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin, urlparse
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import numpy as np
# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Load or set default configuration
config = {
    "MAX_VECTOR_SIZE": 10000,  # Hypervector dimensionality
    "TEST_SPLIT": 0.1,
    "URLS": [
        'https://www.bbc.com',
        'https://www.cnn.com',
    ],
    "RATE_LIMIT": 1  # seconds between requests to prevent overloading
}
# Load configuration from file if it exists
try:
    with open('config.json', 'r') as f:
        config.update(json.load(f))
except FileNotFoundError:
    logging.info("No config file found, using default configuration.")
# Apply configuration
MAX_VECTOR_SIZE = config['MAX_VECTOR_SIZE']
TEST_SPLIT = config['TEST_SPLIT']
URLS = config['URLS']
RATE_LIMIT = config['RATE_LIMIT']
# Prepare data storage
DATA_DIR = 'data'
os.makedirs(DATA_DIR, exist_ok=True)
EXISTING_FEATURES_PATH = os.path.join(DATA_DIR, 'existing_features.pkl')
EXISTING_LABELS_PATH = os.path.join(DATA_DIR, 'existing_labels.pkl')
LABEL_MAP_PATH = os.path.join(DATA_DIR, 'label_map.pkl')
# NLTK setup for enhanced preprocessing
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')
nltk.download('punkt')
stop_words = set(stopwords.words('english'))
porter = PorterStemmer()
visited_urls = set()
robot_cache = {}
# Initialize or load label map
if os.path.exists(LABEL_MAP_PATH):
    label_map = pd.read_pickle(LABEL_MAP_PATH)
else:
    label_map = {'Long Text': 0, 'Short Text': 1, 'Media Rich': 2}
# Function to respect rate limiting
def rate_limit_delay():
    time.sleep(RATE_LIMIT)
# Retry mechanism with exponential backoff
def fetch_with_retries(url, retries=3, backoff=2):
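    """Fetch url with up to `retries` attempts, waiting backoff**attempt seconds after each failure."""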
    for attempt in range(retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            if attempt == retries - 1:
                logging.error(f"Request failed after {retries} attempts for {url}: {e}")
                return None
            else:
                logging.warning(f"Retrying {url} in {backoff ** attempt} seconds due to: {e}")
                time.sleep(backoff ** attempt)
# Caching robots.txt
def can_fetch(url):
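    """Check the domain's robots.txt (cached per domain) to see whether crawling url is allowed."""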
    match = re.match(r'https?://[^/]+', url)
    if not match:
        return False  # malformed URL; re.findall(...)[0] would have raised IndexError here
    domain = match.group(0)
    if domain not in robot_cache:
        rp = RobotFileParser()
        rp.set_url(domain + '/robots.txt')
        try:
            rp.read()
        except Exception as e:
            logging.warning(f"Could not read robots.txt for {domain}: {e}")
        robot_cache[domain] = rp
    return robot_cache[domain].can_fetch('*', url)
# Get internal links with base URL
def get_internal_links(soup, base_url):
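    """Return absolute URLs of links on the parsed page that stay on the same domain as base_url."""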
    base_domain = urlparse(base_url).netloc
    internal_links = set()
    for link in soup.find_all('a', href=True):
        # urljoin resolves relative hrefs against the page URL; plain string
        # concatenation produced malformed URLs for hrefs without a leading '/'
        href = urljoin(base_url, link['href'])
        parsed = urlparse(href)
        if parsed.scheme in ('http', 'https') and parsed.netloc == base_domain:
            internal_links.add(href)
    return internal_links
# Web Scraping with Error Handling, Retry, and Recursive Depth
def scrape_website(url, depth=0, max_depth=2):
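    """Scrape url and, below max_depth, its internal links; return (body text, soup) or (None, None)."""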
print(f"Scraping {url} at depth {depth}...")
if depth > max_depth:
return None, None
rate_limit_delay() # Apply rate limiting
response = fetch_with_retries(url)
if response:
soup = BeautifulSoup(response.content, 'html.parser')
body_tag = soup.find('body')
if body_tag:
for element in body_tag(text=lambda text: isinstance(text, Comment)):
element.extract()
body_content = ' '.join(body_tag.stripped_strings)
if depth < max_depth:
internal_links = get_internal_links(soup, url)
for link in internal_links:
if can_fetch(link) and link not in visited_urls:
link_content, _ = scrape_website(link, depth + 1)
if link_content:
body_content += ' ' + link_content
visited_urls.add(link)
else:
logging.error(f"Failed to scrape {link}")
elif not can_fetch(link):
logging.info(f"Scraping of {link} disallowed by robots.txt")
print(f"Scraped successfully: {url}")
return body_content, soup
return None, None
# Heuristic Labeling with Improved Strategy
def generate_label(soup):
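    """Heuristically label a page: media-rich first, then long vs. short text by character count."""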
    if soup is None:
        return None
    text_length = len(soup.get_text())
    num_images = len(soup.find_all('img'))
    num_videos = len(soup.find_all('video'))
    if num_images > 5 or num_videos > 0:
        return 'Media Rich'
    elif text_length > 1000:
        return 'Long Text'
    else:
        return 'Short Text'
# Hyperdimensional Computing (HDC) based data preprocessing
def generate_random_hypervector(size):
"""Generate a random hypervector of +1/-1 with the specified dimensionality."""
    return np.random.choice([-1, 1], size=size)
def superposition(vectors):
"""Perform superposition (element-wise summing) of multiple hypervectors."""
    return np.sign(np.sum(vectors, axis=0))
def bind(vectors):
"""Perform binding (element-wise multiplication) of hypervectors."""
    result = vectors[0]
    for vector in vectors[1:]:
        result = result * vector
    return result
word_memory = {}  # item memory: stable word -> hypervector mapping
def text_to_hypervector(text):
    """Encode text as the superposition of per-word hypervectors (cached so
    each word keeps the same vector across calls, as HDC encoding requires)."""
    words = clean_text(text).split()
    for word in words:
        word_memory.setdefault(word, generate_random_hypervector(MAX_VECTOR_SIZE))
    word_vectors = [word_memory[word] for word in words] or [np.zeros(MAX_VECTOR_SIZE, dtype=int)]
    return superposition(word_vectors)  # hyperdimensional encoding through superposition
# Text cleaning
def clean_text(text):
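    """Lowercase, then strip bracketed text, URLs, HTML tags, non-letters, and stopwords."""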
    text = text.lower()
    text = re.sub(r'\[.*?\]', '', text)
    text = re.sub(r'https?://\S+|www\.\S+', '', text)
    text = re.sub(r'<.*?>+', '', text)
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    text = ' '.join([word for word in text.split() if word not in stop_words])
    return text
# Symbolic Inference: Neuro-Symbolic Classification
def symbolic_inference(features):
"""Perform neuro-symbolic reasoning based on the hypervector input."""
# Example neuro-symbolic logic:
# 1. Check for certain characteristics (e.g., length, specific patterns).
# 2. Use logical rules for classification (e.g., length-based or content-based rules).
predictions = []
for feature in features:
magnitude = np.sum(np.abs(feature)) # Example: Summing absolute values as a proxy for content magnitude
if magnitude > 5000:
predictions.append(0) # 'Long Text'
elif magnitude < 3000 and random.random() > 0.5:
predictions.append(1) # 'Short Text'
else:
predictions.append(2) # 'Media Rich'
return predictions
# Data Augmentation
def augment_text(text):
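    """Return a copy of text with roughly 10% of its words replaced by WordNet synonyms."""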
    words = text.split()
    if not words:
        return text  # guard: random.randint below would raise on empty input
    augmented_text = words.copy()
    # Synonym replacement
    n_replace = max(1, int(len(words) * 0.1))
    for _ in range(n_replace):
        idx_to_replace = random.randint(0, len(words) - 1)
        synonyms = get_synonyms(words[idx_to_replace])
        if synonyms:
            synonym = random.choice(synonyms)
            augmented_text[idx_to_replace] = synonym
    return ' '.join(augmented_text)
def get_synonyms(word):
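    """Collect WordNet synonyms for word, excluding the word itself."""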
    synonyms = set()
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            synonym = lemma.name().replace('_', ' ')
            if synonym != word:
                synonyms.add(synonym)
    return list(synonyms)
def random_deletion(text, p=0.1):
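    """Drop each word independently with probability p (single-word texts are kept intact)."""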
    words = text.split()
    if len(words) == 1:
        return text
    retained_words = [word for word in words if random.random() > p]
    return ' '.join(retained_words)
def random_swap(text, n_swaps=1):
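    """Swap n_swaps random pairs of words in the text."""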
    words = text.split()
    if len(words) < 2:
        return text
    for _ in range(n_swaps):
        idx1, idx2 = random.sample(range(len(words)), 2)
        words[idx1], words[idx2] = words[idx2], words[idx1]
    return ' '.join(words)
# Preprocessing Data
def preprocess_data(raw_data):
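    """Clean each raw text, add three augmented variants, and encode everything as hypervectors."""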
    augmented_data = []
    for text in raw_data:
        clean_text_data = clean_text(text)
        augmented_data.append(clean_text_data)
        augmented_data.append(augment_text(clean_text_data))
        augmented_data.append(random_deletion(clean_text_data))
        augmented_data.append(random_swap(clean_text_data))
    hypervectors = [text_to_hypervector(text) for text in augmented_data]
    return hypervectors
# Data integration with Error Handling
def integrate_data(new_data, existing_data_path):
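    """Append new_data as rows to the pickled dataset at existing_data_path and return the combined frame."""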
    if new_data is None:
        return None
    if os.path.exists(existing_data_path):
        existing_data = pd.read_pickle(existing_data_path)
    else:
        existing_data = pd.DataFrame()
    integrated_data = pd.concat([existing_data, pd.DataFrame(new_data)], ignore_index=True)
    integrated_data.to_pickle(existing_data_path)
    return integrated_data
# Evaluation and Performance Metrics
def evaluate_model(predictions, labels):
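    """Log weighted precision, recall, and F1, then plot the confusion matrix."""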
    precision = precision_score(labels, predictions, average='weighted', zero_division=0)
    recall = recall_score(labels, predictions, average='weighted', zero_division=0)
    f1 = f1_score(labels, predictions, average='weighted', zero_division=0)
    logging.info(f'Precision: {precision}, Recall: {recall}, F1 Score: {f1}')
    cm = confusion_matrix(labels, predictions)
    plot_confusion_matrix(cm)
# Plot Confusion Matrix
def plot_confusion_matrix(cm):
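    """Render the confusion matrix as a heatmap."""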
    plt.figure(figsize=(10, 7))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()
    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()
# Concurrency and Automated Retraining Setup
def automated_retraining():
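    """Scrape all configured URLs concurrently, grow the stored dataset, classify, and report metrics."""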
print("Starting automated retraining...")
with ThreadPoolExecutor(max_workers=len(URLS)) as executor:
futures = [executor.submit(scrape_website, url, 0) for url in URLS]
for future in futures:
body_content, soup = future.result()
if body_content and soup:
label = generate_label(soup)
update_label_map(label)
preprocessed_content = preprocess_data([body_content])
integrated_features = integrate_data(preprocessed_content, EXISTING_FEATURES_PATH)
integrated_labels = integrate_data([label], EXISTING_LABELS_PATH)
if integrated_features is not None and integrated_labels is not None:
predictions = symbolic_inference(integrated_features) # Neuro-symbolic reasoning for classification
evaluate_model(predictions, integrated_labels)
def update_label_map(label):
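    """Add an unseen label to the label map and persist the map to disk."""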
    global label_map
    if label not in label_map:
        new_index = max(label_map.values()) + 1
        label_map[label] = new_index
        pd.to_pickle(label_map, LABEL_MAP_PATH)
        print(f"Updated label map with new label: {label}")
# Run schedule
if __name__ == "__main__":
    automated_retraining()
    schedule.every().day.at("01:00").do(automated_retraining)
    try:
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        logging.info("Program interrupted by user. Exiting...")