async_crawl_i2p.py
import aiohttp
import asyncio
from aiohttp_socks import ProxyConnector
from bs4 import BeautifulSoup
from datetime import datetime
from urllib.parse import urljoin
import csv
import os
from colorama import init, Fore
from utils import print_colored, get_random_user_agent, generate_secure_random_string, save_data_to_file, save_url_to_csv, save_url_to_temp_db, load_urls_from_temp_db
from crawler_constants import *
# Initialize colorama
init()
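# NOTE: the helpers imported from `utils` and the constants from `crawler_constants`
# (DATA_DIRECTORY, CSV_FILE_PATH) are defined elsewhere in this repository. The
# sketch below is only an illustration of the signatures this script appears to
# rely on, inferred from the call sites further down; it is not the actual
# implementation:
#
#   print_colored(text, color)                    # print text in the given colorama color
#   get_random_user_agent()                       # return a User-Agent string
#   generate_secure_random_string(length)         # random string used in saved filenames
#   save_data_to_file(data, directory, filename, i2p=False)
#   save_url_to_csv(filename, url, csv_path, i2p=False)
#   save_url_to_temp_db(url, i2p=False)
#   load_urls_from_temp_db()                      # collection of URLs already scraped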
async def web_crawler_with_saving_and_urls(id, url, session, connector):
    # Only crawl hidden-service addresses (.onion or .i2p); skip everything else.
    if ".onion" not in str(url) and ".i2p" not in str(url):
        return set()
    if not id:
        id = 1
    # Skip URLs that were already scraped in a previous pass.
    scraped_urls = load_urls_from_temp_db()
    if url in scraped_urls:
        print_colored(f"URL already scraped: {url}", Fore.MAGENTA)
        return set()
    try:
        # Add a random user agent to the headers
        headers = {'User-Agent': get_random_user_agent()}
        async with session.get(url, headers=headers, allow_redirects=True) as response:
            response.raise_for_status()  # Raise an HTTPError for bad responses
            if response.status == 200:
                # Get the final URL after following redirects
                final_url = str(response.url)
                print_colored(
                    f"Final URL after redirects: {final_url}", Fore.MAGENTA)
                page_text = await response.text()
                soup = BeautifulSoup(page_text, 'html.parser')
                base_url = final_url
                # Collect absolute URLs from every anchor tag, skipping mailto: links.
                urls_set = {
                    urljoin(base_url, link.get('href'))
                    for link in soup.find_all('a', href=True)
                    if not link.get('href').startswith('mailto:')
                }
                timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
                filename = f"{id}_{timestamp}_{generate_secure_random_string(8)}.html"
                save_data_to_file(page_text, DATA_DIRECTORY, filename, i2p=True)
                # Save the final URL to CSV
                save_url_to_csv(filename, final_url, CSV_FILE_PATH, i2p=True)
                # Save both the requested and the final URL to the temporary database
                save_url_to_temp_db(final_url, i2p=True)
                save_url_to_temp_db(url, i2p=True)
                return urls_set
            else:
                print_colored(
                    f"Failed to retrieve the page. Status code: {response.status}", Fore.MAGENTA)
                return set()
    except Exception as e:
        print_colored(
            f"Request failed for URL: {url}\nError: {e}", Fore.MAGENTA)
        return set()
async def recursive_crawler(url, session, connector, depth=1, max_depth=3, limit=False):
    # Stop descending once the depth limit is reached (only enforced when limit=True).
    if limit and depth > max_depth:
        return
    print_colored(f"\nCrawling URL (Depth {depth}): {url}", Fore.MAGENTA)
    found_urls = await web_crawler_with_saving_and_urls(depth, url, session, connector)
    # Crawl every discovered URL concurrently, one level deeper.
    tasks = [recursive_crawler(next_url, session, connector,
                               depth + 1, max_depth, limit) for next_url in found_urls]
    await asyncio.gather(*tasks)
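# Design note: with the default limit=False the depth check above is skipped, so the
# crawl only stops once no new URLs are found (already-scraped URLs return an empty
# set). Pass limit=True to recursive_crawler to enforce max_depth.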
async def main():
    test_url = "http://ramble.i2p/"
    url_to_crawl = test_url
    # Route all requests through the local I2P HTTP proxy (default port 4444).
    proxy_url = 'http://localhost:4444'
    connector = ProxyConnector.from_url(proxy_url)
    try:
        async with aiohttp.ClientSession(connector=connector) as session:
            await recursive_crawler(url_to_crawl, session=session, connector=connector)
    except KeyboardInterrupt:
        print_colored("KeyboardInterrupt received. Exiting...", Fore.RED)
    finally:
        # Remove the temporary working folder and everything inside it on exit.
        temp_folder_path = 'temp'
        try:
            for file_name in os.listdir(temp_folder_path):
                file_path = os.path.join(temp_folder_path, file_name)
                if os.path.isfile(file_path):
                    os.remove(file_path)
                elif os.path.isdir(file_path):
                    os.rmdir(file_path)
            os.rmdir(temp_folder_path)
        except Exception as e:
            print_colored(f"Error during cleanup: {str(e)}", Fore.MAGENTA)
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print_colored("\nKeyboardInterrupt received. Exiting...", Fore.RED)
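# Usage sketch (assumed environment, not documented in the original script): the
# crawler expects a running I2P router with its HTTP proxy enabled on
# localhost:4444 (the I2P default), plus the third-party packages imported above:
#
#   pip install aiohttp aiohttp-socks beautifulsoup4 colorama
#   python async_crawl_i2p.py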