-
Notifications
You must be signed in to change notification settings - Fork 57
/
3b-fetch-ip-addresses-future-exceptions.py
47 lines (34 loc) · 1.24 KB
/
3b-fetch-ip-addresses-future-exceptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from collections import namedtuple
import time
import asyncio
import aiohttp
import traceback
Service = namedtuple('Service', ('name', 'url', 'ip_attr'))
SERVICES = (
Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)
async def aiohttp_get_json(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def fetch_ip(service):
start = time.time()
print('Fetching IP from {}'.format(service.name))
try:
json_response = await aiohttp_get_json(service.url)
except:
return '{} is unresponsive'.format(service.name)
ip = json_response[service.ip_attr]
return '{} finished with result: {}, took: {:.2f} seconds'.format(
service.name, ip, time.time() - start)
async def main():
futures = [fetch_ip(service) for service in SERVICES]
done, _ = await asyncio.wait(futures)
for future in done:
try:
print(future.result())
except:
print("Unexpected error: {}".format(traceback.format_exc()))
asyncio.run(main())