forked from DevAlone/proxy_py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
async_requests.py
78 lines (59 loc) · 2.35 KB
/
async_requests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import aiohttp
import json
from aiosocks.connector import ProxyConnector, ProxyClientRequest
async def get(url, **kwargs):
    """Send a GET request to ``url``.

    Every extra keyword argument is handed to ``request`` untouched, and
    whatever ``request`` produces is returned to the caller.
    """
    result = await request('get', url, **kwargs)
    return result
async def post(url, data, **kwargs):
    """Send a POST request to ``url`` and return the result of ``request``.

    A ``data`` value that is a ``dict`` or ``str`` is forwarded as the
    ``json`` keyword argument; any other value is forwarded as ``data``.
    All remaining keyword arguments are passed through unchanged.
    """
    # Type membership has to be checked with isinstance: an identity test
    # such as ``data is dict`` only matches the type object itself, never an
    # actual payload, so the JSON branch would be unreachable.
    # NOTE(review): ``str`` payloads are routed to ``json`` because the branch
    # lists ``str`` explicitly - confirm no caller passes pre-serialized JSON.
    if isinstance(data, (dict, str)):
        kwargs['json'] = data
    else:
        kwargs['data'] = data

    return await request('post', url, **kwargs)
async def request(method, url, **kwargs):
    """Perform an HTTP request and return it wrapped in a ``Response``.

    Keyword arguments are forwarded to ``session.request`` except for:

    * ``cookies`` - removed from the request arguments and given to the
      ``aiohttp.ClientSession`` created here. It is discarded when
      ``override_session`` is supplied, because no session is created then.
    * ``override_session`` - a session object to send the request through
      instead of creating a temporary ``aiohttp.ClientSession``.

    Defaults applied when the caller does not provide them: a ``timeout`` of
    10 and a ``User-Agent`` header taken from ``get_random_user_agent()``.
    When ``proxy`` is a string starting with ``socks`` the temporary session
    is built with ``ProxyConnector`` and ``ProxyClientRequest``.
    """
    session_kwargs = {}
    if 'cookies' in kwargs:
        session_kwargs['cookies'] = kwargs.pop('cookies')

    kwargs.setdefault('timeout', 10)

    headers = kwargs.get('headers')
    if headers is None:
        # Covers both a missing key and an explicit ``headers=None``.
        kwargs['headers'] = {'User-Agent': get_random_user_agent()}
    elif 'User-Agent' not in headers:
        # Build a new dict rather than writing into the caller's mapping, so
        # a headers dict reused across calls is not modified behind the
        # caller's back.
        kwargs['headers'] = {
            **headers,
            'User-Agent': get_random_user_agent(),
        }

    override_session = kwargs.pop('override_session', None)
    if override_session is not None:
        async with override_session.request(method, url, **kwargs) as response:
            return await Response.from_aiohttp_response(response)

    # The SOCKS connector is only needed for a session created here, so it is
    # set up after the override branch. ``proxy`` may be absent or None.
    proxy = kwargs.get('proxy')
    if isinstance(proxy, str) and proxy.startswith('socks'):
        session_kwargs['connector'] = ProxyConnector(remote_resolve=False)
        session_kwargs['request_class'] = ProxyClientRequest

    async with aiohttp.ClientSession(**session_kwargs) as session:
        async with session.request(method, url, **kwargs) as response:
            return await Response.from_aiohttp_response(response)
class Response:
    """Snapshot of an HTTP response: its status and its body text.

    The originating aiohttp response object, when available, is kept on
    ``aiohttp_response``.
    """

    def __init__(self, status, text, aiohttp_response=None):
        """Store ``status``, ``text`` and the optional source response."""
        self.status = status
        self.text = text
        self.aiohttp_response = aiohttp_response

    @classmethod
    async def from_aiohttp_response(cls, aiohttp_response):
        """Build an instance from an aiohttp-style response object.

        Reads ``aiohttp_response.status`` and awaits
        ``aiohttp_response.text()``. Being a classmethod, it returns an
        instance of the class it is called on, so subclasses get their own
        type back.
        """
        return cls(
            status=aiohttp_response.status,
            text=await aiohttp_response.text(),
            aiohttp_response=aiohttp_response,
        )

    def __str__(self):
        """Return a JSON object string holding ``status`` and ``text``."""
        return json.dumps({
            'status': self.status,
            'text': self.text,
        })

    # repr() deliberately produces the same JSON text as str().
    __repr__ = __str__
def get_random_user_agent():
    """Return the User-Agent header value used for outgoing requests.

    Despite the name, the value is currently a fixed string.
    """
    # TODO: do it - return an actually random agent; the original note
    # suggests ``UserAgent().random``.
    user_agent = 'Mozilla/5.0 (Windows NT;) Gecko/20100101 Firefox/58.0'
    return user_agent