Merge pull request #125 from m-appel/121-add-informative-reference-url-and-data-modification-time

Add informative reference url and data modification time
romain-fontugne authored Feb 20, 2024
2 parents de1f514 + fd6f8be commit 9568521
Showing 37 changed files with 515 additions and 185 deletions.
53 changes: 46 additions & 7 deletions iyp/__init__.py
@@ -8,6 +8,7 @@
from typing import Optional

import requests
from github import Github
from neo4j import GraphDatabase

BATCH_SIZE = 50000
@@ -79,6 +80,34 @@ def dict2str(d, eq=':', pfx=''):
return '{' + ','.join(data) + '}'


def get_commit_datetime(repo, file_path):
"""Get the datetime of the latest commit modifying a file in a GitHub repository.
repo: The name of the repository in org/repo format, e.g.,
"InternetHealthReport/internet-yellow-pages"
file_path: The path to the file relative to the repository root, e.g.,
"iyp/__init__.py"
"""
return Github().get_repo(repo).get_commits(path=file_path)[0].commit.committer.date
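
For illustration, a minimal sketch of how a crawler can call this helper (mirroring the anycast_prefixes crawler further down; an unauthenticated GitHub client is assumed, so API rate limits may apply):

from iyp import get_commit_datetime

# Repository and file path reused from the anycast_prefixes crawler below;
# any public repository/file pair works the same way.
modified_at = get_commit_datetime('bgptools/anycast-prefixes', 'anycatch-v4-prefixes.txt')
print(modified_at)  # datetime of the newest commit touching the file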


def set_modification_time_from_last_modified_header(reference, response):
"""Set the reference_time_modification field of the specified reference dict to the
datetime parsed from the Last-Modified header of the specified response if
possible."""
try:
last_modified_str = response.headers['Last-Modified']
# All HTTP dates are in UTC:
# https://www.rfc-editor.org/rfc/rfc2616#section-3.3.1
last_modified = datetime.strptime(last_modified_str,
'%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
reference['reference_time_modification'] = last_modified
except KeyError:
logging.warning('No Last-Modified header; will not set modification time.')
except ValueError as e:
logging.error(f'Failed to parse Last-Modified header "{last_modified_str}": {e}')
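
A minimal usage sketch, assuming a server that returns a Last-Modified header (the URL is only an example, reused from the BGPKIT pfx2asn crawler below; inside a crawler the first argument would be self.reference):

import requests

from iyp import set_modification_time_from_last_modified_header

reference = {}
resp = requests.get('https://data.bgpkit.com/pfx2as/pfx2as-latest.json.bz2', stream=True)
set_modification_time_from_last_modified_header(reference, resp)
# UTC datetime if the header was present and parseable, otherwise the key stays unset.
print(reference.get('reference_time_modification'))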


class RequestStatusError(requests.HTTPError):
def __init__(self, message):
self.message = message
@@ -109,6 +138,12 @@ def __init__(self, message):
super().__init__(self.message)


class DataNotAvailableError(Exception):
def __init__(self, message):
self.message = message
super().__init__(self.message)


class IYP(object):

def __init__(self):
@@ -548,9 +583,9 @@ def add_links(self, src_node, links):
for i, (type, dst_node, prop) in enumerate(links):

assert 'reference_org' in prop
assert 'reference_url' in prop
assert 'reference_url_data' in prop
assert 'reference_name' in prop
assert 'reference_time' in prop
assert 'reference_time_fetch' in prop

prop = format_properties(prop)

@@ -589,10 +624,12 @@ def __init__(self):
"""IYP and references initialization."""

self.reference = {
'reference_org': 'Internet Yellow Pages',
'reference_url': 'https://iyp.iijlab.net',
'reference_name': 'iyp',
'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc)
'reference_org': 'Internet Yellow Pages',
'reference_url_data': 'https://iyp.iijlab.net',
'reference_url_info': str(),
'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc),
'reference_time_modification': None
}

# connection to IYP database
@@ -617,8 +654,10 @@ def __init__(self, organization, url, name):
self.reference = {
'reference_name': name,
'reference_org': organization,
'reference_url': url,
'reference_time': datetime.combine(datetime.utcnow(), time.min, timezone.utc)
'reference_url_data': url,
'reference_url_info': str(),
'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc),
'reference_time_modification': None
}

# connection to IYP database
70 changes: 56 additions & 14 deletions iyp/crawlers/alice_lg/__init__.py
@@ -1,7 +1,6 @@
import ipaddress
import logging
import os
import sys
from collections import defaultdict
from concurrent.futures import as_completed
from datetime import datetime
@@ -84,6 +83,10 @@ def __init__(self,

# URLs to the API
url = url.rstrip('/')
if url.endswith('/api/v1'):
self.reference['reference_url_info'] = url[:-len('/api/v1')]
else:
logging.warning(f'Data URL does not end with "/api/v1", will not set info URL: {url}')
self.urls = {
'routeservers': f'{url}/routeservers',
'neighbors': url + '/routeservers/{rs}/neighbors',
@@ -97,6 +100,8 @@ def __init__(self,
# List of neighbor dicts. Each dict contains information about the route server,
# so we do not keep track of that separately.
self.neighbors = list()
# Dict mapping routeserver_id to the cache time of that server.
self.routeserver_cached_at = dict()
# Dict mapping (routeserver_id, neighbor_id) tuple to a list of route dicts.
self.routes = dict()
# If routes should be fetched or not.
@@ -123,8 +128,6 @@ def decode_json(resp: Response, *args, **kwargs) -> None:
try:
resp.data = resp.json()
except JSONDecodeError as e:
print(f'Failed to retrieve data for {resp.url}', file=sys.stderr)
print(f'Error while reading json data: {e}', file=sys.stderr)
logging.error(f'Error while reading json data: {e}')
logging.error(resp.status_code)
logging.error(resp.headers)
@@ -160,8 +163,6 @@ def fetch_urls(self, urls: list, additional_data=list()) -> Iterable:
except Exception as e:
logging.error(f'Failed to retrieve data for {future}')
logging.error(e)
print(f'Failed to retrieve data for {future}', file=sys.stderr)
print(e, file=sys.stderr)
return False, dict(), None

def fetch_url(self, url: str) -> Tuple[bool, dict]:
@@ -177,7 +178,6 @@ def __fetch_routeservers(self) -> None:
logging.info('Using cached route server information.')
self.routeservers = self.cache_handler.load_cached_object(routeserver_object_name)
else:
print(f'Fetching route servers from {self.urls["routeservers"]}')
logging.info(f'Fetching route servers from {self.urls["routeservers"]}')
is_ok, routeservers_root = self.fetch_url(self.urls['routeservers'])
if not is_ok:
@@ -190,28 +190,61 @@ def __fetch_neighbors(self) -> None:
neighbor_object_name = 'neighbors'
if self.cache_handler.cached_object_exists(neighbor_object_name):
logging.info('Using cached neighbor information.')
self.neighbors = self.cache_handler.load_cached_object(neighbor_object_name)
neighbor_object = self.cache_handler.load_cached_object(neighbor_object_name)
self.routeserver_cached_at = neighbor_object['routeserver_cached_at']
self.neighbors = neighbor_object['neighbors']
else:
print(f'Fetching neighbor information from {len(self.routeservers)} route servers.')
logging.info(f'Fetching neighbor information from {len(self.routeservers)} route servers.')
neighbor_urls = [self.urls['neighbors'].format(rs=rs['id']) for rs in self.routeservers]
failed_routeservers = list()
for is_ok, neighbor_list_root, routeserver_id in self.fetch_urls(neighbor_urls,
additional_data=self.routeservers):
for is_ok, neighbor_list_root, routeserver in self.fetch_urls(neighbor_urls,
additional_data=self.routeservers):
routeserver_id = routeserver['id']
if not is_ok:
failed_routeservers.append(routeserver_id)
continue
try:
cached_at_str = neighbor_list_root['api']['cache_status']['cached_at']
except KeyError:
cached_at_str = str()
if cached_at_str:
cached_at = None
# Alice-LG uses nanosecond-granularity timestamps, which are not
# valid ISO format...
try:
pre, suf = cached_at_str.rsplit('.', maxsplit=1)
if suf.endswith('Z'):
# UTC
frac_seconds = suf[:-1]
tz_suffix = '+00:00'
elif '+' in suf:
# Hopefully a timezone identifier of form +HH:MM
frac_seconds, tz_suffix = suf.split('+')
tz_suffix = '+' + tz_suffix
else:
raise ValueError(f'Failed to get timezone from timestamp :{cached_at_str}')
if not frac_seconds.isdigit():
raise ValueError(f'Fractional seconds are not digits: {cached_at_str}')
# Reduce to six digits (ms).
frac_seconds = frac_seconds[:6]
cached_at_str = f'{pre}.{frac_seconds}{tz_suffix}'
cached_at = datetime.fromisoformat(cached_at_str)
except ValueError as e:
logging.warning(f'Failed to get cached_at timestamp for routeserver "{routeserver_id}": {e}')
if cached_at:
self.routeserver_cached_at[routeserver_id] = cached_at
# Spelling of neighbors/neighbours field is not consistent...
if 'neighbors' in neighbor_list_root:
neighbor_list = neighbor_list_root['neighbors']
elif 'neighbours' in neighbor_list_root:
neighbor_list = neighbor_list_root['neighbours']
else:
logging.error(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}')
print(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}', file=sys.stderr)
continue
self.neighbors += neighbor_list
self.cache_handler.save_cached_object(neighbor_object_name, self.neighbors)
neighbor_object = {'routeserver_cached_at': self.routeserver_cached_at,
'neighbors': self.neighbors}
self.cache_handler.save_cached_object(neighbor_object_name, neighbor_object)
if failed_routeservers:
logging.warning(f'Failed to get neighbor information for {len(failed_routeservers)} routeservers: '
f'{failed_routeservers}')
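
As a standalone illustration of the cached_at normalization above, applied to a hypothetical nanosecond-precision timestamp of the kind Alice-LG returns:

from datetime import datetime

cached_at_str = '2024-02-19T10:26:32.123456789Z'  # made-up example value

pre, suf = cached_at_str.rsplit('.', maxsplit=1)
frac_seconds = suf[:-1][:6]  # drop the trailing 'Z', keep at most microsecond precision
cached_at = datetime.fromisoformat(f'{pre}.{frac_seconds}+00:00')
print(cached_at)  # 2024-02-19 10:26:32.123456+00:00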
@@ -343,7 +376,15 @@ def run(self) -> None:
if ('details:route_changes' in flattened_neighbor
and isinstance(flattened_neighbor['details:route_changes'], flatdict.FlatDict)):
flattened_neighbor.pop('details:route_changes')
self.reference['reference_url'] = self.urls['neighbors'].format(rs=neighbor['routeserver_id'])
routeserver_id = neighbor['routeserver_id']
self.reference['reference_url_data'] = self.urls['neighbors'].format(rs=routeserver_id)
if routeserver_id in self.routeserver_cached_at:
self.reference['reference_time_modification'] = self.routeserver_cached_at[routeserver_id]
else:
logging.info(f'No modification time for routeserver: {routeserver_id}')
# Set to None to not reuse value of previous loop iteration.
self.reference['reference_time_modification'] = None

member_of_rels.append({'src_id': member_asn, # Translate to QID later.
'dst_id': n.data['ixp_qid'],
'props': [flattened_neighbor, self.reference.copy()]})
@@ -354,7 +395,8 @@ def run(self) -> None:
if self.fetch_routes:
logging.info('Iterating routes.')
for (routeserver_id, neighbor_id), routes in self.routes.items():
self.reference['reference_url'] = self.urls['routes'].format(rs=routeserver_id, neighbor=neighbor_id)
self.reference['reference_url_data'] = self.urls['routes'].format(rs=routeserver_id,
neighbor=neighbor_id)
for route in routes:
prefix = ipaddress.ip_network(route['network']).compressed
origin_asn = route['bgp']['as_path'][-1]
11 changes: 7 additions & 4 deletions iyp/crawlers/bgpkit/__init__.py
@@ -3,23 +3,26 @@

import requests

from iyp import BaseCrawler, RequestStatusError
from iyp import (BaseCrawler, RequestStatusError,
set_modification_time_from_last_modified_header)


class AS2RelCrawler(BaseCrawler):
def __init__(self, organization, url, name, af):
"""Initialization: set the address family attribute (af)"""

self.af = af
super().__init__(organization, url, name)
self.af = af
self.reference['reference_url_info'] = 'https://data.bgpkit.com/as2rel/README.txt'

def run(self):
"""Fetch the AS relationship file from BGPKIT website and process lines one by
one."""

req = requests.get(self.url, stream=True)
if req.status_code != 200:
raise RequestStatusError('Error while fetching AS relationships')
raise RequestStatusError(f'Error while fetching AS relationships: {req.status_code}')

set_modification_time_from_last_modified_header(self.reference, req)

rels = []
asns = set()
6 changes: 5 additions & 1 deletion iyp/crawlers/bgpkit/peerstats.py
@@ -17,6 +17,9 @@


class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
super().__init__(organization, url, name)
self.reference['reference_url_info'] = 'https://data.bgpkit.com/peer-stats/README.md'

def run(self):
"""Fetch peer stats for each collector."""
@@ -49,6 +52,7 @@ def run(self):
prev_day -= timedelta(days=1)
logging.warning("Today's data not yet available!")

self.reference['reference_time_modification'] = self.now
for collector in collectors:
url = URL.format(collector=collector, year=self.now.year,
month=self.now.month, day=self.now.day,
@@ -65,7 +69,7 @@ def run(self):
'BGPCollector',
{'name': stats['collector'], 'project': stats['project']}
)
self.reference['reference_url'] = url
self.reference['reference_url_data'] = url

asns = set()

11 changes: 7 additions & 4 deletions iyp/crawlers/bgpkit/pfx2asn.py
@@ -7,7 +7,8 @@

import requests

from iyp import BaseCrawler, RequestStatusError
from iyp import (BaseCrawler, RequestStatusError,
set_modification_time_from_last_modified_header)

URL = 'https://data.bgpkit.com/pfx2as/pfx2as-latest.json.bz2'
ORG = 'BGPKIT'
@@ -22,7 +23,9 @@ def run(self):

req = requests.get(URL, stream=True)
if req.status_code != 200:
raise RequestStatusError('Error while fetching pfx2as relationships')
raise RequestStatusError(f'Error while fetching pfx2as relationships: {req.status_code}')

set_modification_time_from_last_modified_header(self.reference, req)

entries = []
asns = set()
@@ -35,7 +38,7 @@

req.close()

logging.info('Pushing nodes to neo4j...\n')
logging.info('Pushing nodes to neo4j...')
# get ASNs and prefixes IDs
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)
@@ -48,7 +51,7 @@

links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]}) # Set AS name

logging.info('Pushing links to neo4j...\n')
logging.info('Pushing links to neo4j...')
# Push all links to IYP
self.iyp.batch_add_links('ORIGINATE', links)

16 changes: 13 additions & 3 deletions iyp/crawlers/bgptools/anycast_prefixes.py
@@ -6,7 +6,8 @@

import requests

from iyp import BaseCrawler, ConnectionError, RequestStatusError
from iyp import (BaseCrawler, ConnectionError, RequestStatusError,
get_commit_datetime)

# Organization name and URL to data
ORG = 'BGP.Tools'
@@ -38,6 +39,12 @@ def fetch_dataset(url: str):
class Crawler(BaseCrawler):
# Base Crawler provides access to IYP via self.iyp
# and setup a dictionary with the org/url/today's date in self.reference
def __init__(self, organization, url, name):
super().__init__(organization, url, name)
self.repo = 'bgptools/anycast-prefixes'
self.v4_file = 'anycatch-v4-prefixes.txt'
self.v6_file = 'anycatch-v6-prefixes.txt'
self.reference['reference_url_info'] = 'https://bgp.tools/kb/anycatch'

def run(self):
ipv4_prefixes_url = get_dataset_url(URL, 4)
@@ -51,13 +58,16 @@ def run(self):
ipv6_prefixes_filename = os.path.join(tmpdir, 'anycast_ipv6_prefixes.txt')

# Fetch data and push to IYP.
self.reference['reference_url'] = ipv4_prefixes_url # Overriding the reference_url according to prefixes
# Overriding the reference_url_data according to prefixes
self.reference['reference_url_data'] = ipv4_prefixes_url
self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v4_file)
ipv4_prefixes_response = fetch_dataset(ipv4_prefixes_url)
logging.info('IPv4 prefixes fetched successfully.')
self.update(ipv4_prefixes_response, ipv4_prefixes_filename)
logging.info('IPv4 prefixes pushed to IYP.')

self.reference['reference_url'] = ipv6_prefixes_url
self.reference['reference_url_data'] = ipv6_prefixes_url
self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v6_file)
ipv6_prefixes_response = fetch_dataset(ipv6_prefixes_url)
logging.info('IPv6 prefixes fetched successfully.')
self.update(ipv6_prefixes_response, ipv6_prefixes_filename)