The MATCH clause describes a pattern in the graph.
The pattern is given in an ASCII art representation
where nodes are depicted by a pair of parentheses, (),
and relationships are depicted with two dashes --, sometimes
including more information in square brackets -[]-.
Thus (iij:AS)-[:MEMBER_OF]-(ix:IXP)
describes a path that starts from a node we'll call iij
that connects to another node we'll call ix.
iij and ix are arbitrary identifiers that allow us to refer to a
certain node later on.
In IYP all nodes and relationships have a type (called
labels for nodes) that conveys what the nodes and relationships represent.
The labels/types are given after the colon; for example, (:AS)
is a node representing an AS, and -[:MEMBER_OF]- is a relationship
of type MEMBER_OF.
The WHERE clause describes conditions for nodes
or relationships that match the pattern. Here we specify that the
node called iij should have a property asn that equals 2497.
The RETURN clause describes the nodes and links we want to display.
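Putting the pieces together, the query described above can also be run programmatically. A minimal sketch, assuming the official neo4j Python driver and a local IYP instance; the connection URI and credentials are placeholders:

from neo4j import GraphDatabase

# The Cypher query explained above: the IXPs that AS2497 (iij) is a member of.
QUERY = """
MATCH (iij:AS)-[:MEMBER_OF]-(ix:IXP)
WHERE iij.asn = 2497
RETURN iij, ix
"""

# Placeholder connection details; adjust URI and credentials to your deployment.
driver = GraphDatabase.driver('neo4j://localhost:7687', auth=('neo4j', 'password'))
with driver.session() as session:
    for record in session.run(QUERY):
        print(record['ix'])
driver.close()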
@@ -90,8 +90,8 @@ Node and relationship properties
All relationships in IYP have at least these properties:
reference_org: The organisation that provides the data.
-reference_url: The URL where the data comes from.
-reference_time: The time when the data was retrieved.
+reference_url_data: The URL where the data comes from.
+reference_time_fetch: The time when the data was retrieved.
reference_name: The name of the script that pushed data to IYP.
@@ -100,7 +100,7 @@ Node and relationship properties
Filter on properties
The previous example had a condition on the asn property of the AS node; you can also filter on relationship properties. For example, this query looks for IIJ memberships other than those given by PeeringDB.
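A query of the kind referenced above could look like the following sketch (illustrative only; it assumes the relationship property names listed earlier and that PeeringDB-derived relationships carry reference_org = 'PeeringDB'). It can be run with the same driver setup as the previous sketch:

# IIJ (AS2497) memberships whose supporting data was not provided by PeeringDB.
QUERY = """
MATCH (iij:AS {asn: 2497})-[m:MEMBER_OF]-(ix:IXP)
WHERE m.reference_org <> 'PeeringDB'
RETURN iij, m, ix
"""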
From 00ee92051480bbdccb5f8fc18a0a16830c77952a Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Mon, 12 Feb 2024 07:42:48 +0000
Subject: [PATCH 02/22] Add info URL and modification timestamp for Alice-LG
---
iyp/crawlers/alice_lg/__init__.py | 63 ++++++++++++++++++++++++-------
1 file changed, 49 insertions(+), 14 deletions(-)
diff --git a/iyp/crawlers/alice_lg/__init__.py b/iyp/crawlers/alice_lg/__init__.py
index 09449d4..8f7f685 100644
--- a/iyp/crawlers/alice_lg/__init__.py
+++ b/iyp/crawlers/alice_lg/__init__.py
@@ -1,10 +1,9 @@
import ipaddress
import logging
import os
-import sys
from collections import defaultdict
from concurrent.futures import as_completed
-from datetime import datetime
+from datetime import datetime, timezone
from json import JSONDecodeError
from typing import Iterable, Tuple
@@ -84,6 +83,10 @@ def __init__(self,
# URLs to the API
url = url.rstrip('/')
+ if url.endswith('/api/v1'):
+ self.reference['reference_url_info'] = url[:-len('/api/v1')]
+ else:
+ logging.warning(f'Data URL does not end with "/api/v1", will not set info URL: {url}')
self.urls = {
'routeservers': f'{url}/routeservers',
'neighbors': url + '/routeservers/{rs}/neighbors',
@@ -97,6 +100,8 @@ def __init__(self,
# List of neighbor dicts. Each dict contains information about the route server,
# so we do not keep track of that separately.
self.neighbors = list()
+ # Dict mapping routeserver_id to the cache time of that server.
+ self.routeserver_cached_at = dict()
# Dict mapping (routeserver_id, neighbor_id) tuple to a list of route dicts.
self.routes = dict()
# If routes should be fetched or not.
@@ -123,8 +128,6 @@ def decode_json(resp: Response, *args, **kwargs) -> None:
try:
resp.data = resp.json()
except JSONDecodeError as e:
- print(f'Failed to retrieve data for {resp.url}', file=sys.stderr)
- print(f'Error while reading json data: {e}', file=sys.stderr)
logging.error(f'Error while reading json data: {e}')
logging.error(resp.status_code)
logging.error(resp.headers)
@@ -160,8 +163,6 @@ def fetch_urls(self, urls: list, additional_data=list()) -> Iterable:
except Exception as e:
logging.error(f'Failed to retrieve data for {future}')
logging.error(e)
- print(f'Failed to retrieve data for {future}', file=sys.stderr)
- print(e, file=sys.stderr)
return False, dict(), None
def fetch_url(self, url: str) -> Tuple[bool, dict]:
@@ -177,7 +178,6 @@ def __fetch_routeservers(self) -> None:
logging.info('Using cached route server information.')
self.routeservers = self.cache_handler.load_cached_object(routeserver_object_name)
else:
- print(f'Fetching route servers from {self.urls["routeservers"]}')
logging.info(f'Fetching route servers from {self.urls["routeservers"]}')
is_ok, routeservers_root = self.fetch_url(self.urls['routeservers'])
if not is_ok:
@@ -190,17 +190,43 @@ def __fetch_neighbors(self) -> None:
neighbor_object_name = 'neighbors'
if self.cache_handler.cached_object_exists(neighbor_object_name):
logging.info('Using cached neighbor information.')
- self.neighbors = self.cache_handler.load_cached_object(neighbor_object_name)
+ neighbor_object = self.cache_handler.load_cached_object(neighbor_object_name)
+ self.routeserver_cached_at = neighbor_object['routeserver_cached_at']
+ self.neighbors = neighbor_object['neighbors']
else:
- print(f'Fetching neighbor information from {len(self.routeservers)} route servers.')
logging.info(f'Fetching neighbor information from {len(self.routeservers)} route servers.')
neighbor_urls = [self.urls['neighbors'].format(rs=rs['id']) for rs in self.routeservers]
failed_routeservers = list()
- for is_ok, neighbor_list_root, routeserver_id in self.fetch_urls(neighbor_urls,
- additional_data=self.routeservers):
+ for is_ok, neighbor_list_root, routeserver in self.fetch_urls(neighbor_urls,
+ additional_data=self.routeservers):
+ routeserver_id = routeserver['id']
if not is_ok:
failed_routeservers.append(routeserver_id)
continue
+ try:
+ cached_at_str = neighbor_list_root['api']['cache_status']['cached_at']
+ except KeyError:
+ cached_at_str = str()
+ if cached_at_str:
+ cached_at = None
+ # Alice-LG uses nanosecond-granularity timestamps, which are not
+ # valid ISO format...
+ try:
+ pre, suf = cached_at_str.rsplit('.', maxsplit=1)
+ if not suf or not suf.endswith('Z'):
+ raise ValueError(f'Fractional seconds missing or timestamp not ending with "Z" (not UTC): '
+ f'{cached_at_str}')
+ suf = suf[:-1] # Strip "Z".
+ if not suf.isdigit():
+ raise ValueError(f'Fractional seconds are not digits: {cached_at_str}')
+ # Reduce to six digits (ms).
+ suf = suf[:6]
+ cached_at_str = f'{pre}.{suf}'
+ cached_at = datetime.fromisoformat(cached_at_str).replace(tzinfo=timezone.utc)
+ except ValueError as e:
+ logging.warning(f'Failed to get cached_at timestamp for routeserver "{routeserver_id}": {e}')
+ if cached_at:
+ self.routeserver_cached_at[routeserver_id] = cached_at
# Spelling of neighbors/neighbours field is not consistent...
if 'neighbors' in neighbor_list_root:
neighbor_list = neighbor_list_root['neighbors']
@@ -208,10 +234,11 @@ def __fetch_neighbors(self) -> None:
neighbor_list = neighbor_list_root['neighbours']
else:
logging.error(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}')
- print(f'Missing "neighbors"/"neighbours" field in reply: {neighbor_list_root}', file=sys.stderr)
continue
self.neighbors += neighbor_list
- self.cache_handler.save_cached_object(neighbor_object_name, self.neighbors)
+ neighbor_object = {'routeserver_cached_at': self.routeserver_cached_at,
+ 'neighbors': self.neighbors}
+ self.cache_handler.save_cached_object(neighbor_object_name, neighbor_object)
if failed_routeservers:
logging.warning(f'Failed to get neighbor information for {len(failed_routeservers)} routeservers: '
f'{failed_routeservers}')
@@ -343,7 +370,15 @@ def run(self) -> None:
if ('details:route_changes' in flattened_neighbor
and isinstance(flattened_neighbor['details:route_changes'], flatdict.FlatDict)):
flattened_neighbor.pop('details:route_changes')
- self.reference['reference_url_data'] = self.urls['neighbors'].format(rs=neighbor['routeserver_id'])
+ routeserver_id = neighbor['routeserver_id']
+ self.reference['reference_url_data'] = self.urls['neighbors'].format(rs=routeserver_id)
+ if routeserver_id in self.routeserver_cached_at:
+ self.reference['reference_time_modification'] = self.routeserver_cached_at[routeserver_id]
+ else:
+ logging.info(f'No modification time for routeserver: {routeserver_id}')
+ # Set to None to not reuse value of previous loop iteration.
+ self.reference['reference_time_modification'] = None
+
member_of_rels.append({'src_id': member_asn, # Translate to QID later.
'dst_id': n.data['ixp_qid'],
'props': [flattened_neighbor, self.reference.copy()]})
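As a standalone illustration of the cached_at handling added above: Alice-LG reports nanosecond fractional seconds, which datetime.fromisoformat() cannot parse, so the fraction is truncated to microseconds first. The helper name and sample value below are illustrative:

from datetime import datetime, timezone

def parse_cached_at(cached_at_str: str) -> datetime:
    # Split off the fractional seconds and make sure the string is a UTC timestamp.
    pre, suf = cached_at_str.rsplit('.', maxsplit=1)
    if not suf or not suf.endswith('Z'):
        raise ValueError(f'Fractional seconds missing or not ending with "Z": {cached_at_str}')
    suf = suf[:-1]  # Strip "Z".
    if not suf.isdigit():
        raise ValueError(f'Fractional seconds are not digits: {cached_at_str}')
    # Keep at most six digits (microseconds), the highest precision datetime supports.
    return datetime.fromisoformat(f'{pre}.{suf[:6]}').replace(tzinfo=timezone.utc)

print(parse_cached_at('2024-02-12T07:42:48.123456789Z'))
# 2024-02-12 07:42:48.123456+00:00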
From 948ff145d6783048ed8764c607d08b5fac8c9fc6 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Mon, 12 Feb 2024 08:33:01 +0000
Subject: [PATCH 03/22] Add info URL and modification timestamp for BGPKIT
---
iyp/crawlers/bgpkit/__init__.py | 20 +++++++++++++++++---
iyp/crawlers/bgpkit/peerstats.py | 4 ++++
iyp/crawlers/bgpkit/pfx2asn.py | 19 ++++++++++++++++---
3 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/iyp/crawlers/bgpkit/__init__.py b/iyp/crawlers/bgpkit/__init__.py
index 1894490..523d078 100644
--- a/iyp/crawlers/bgpkit/__init__.py
+++ b/iyp/crawlers/bgpkit/__init__.py
@@ -1,5 +1,7 @@
import bz2
import json
+import logging
+from datetime import datetime, timezone
import requests
@@ -9,9 +11,9 @@
class AS2RelCrawler(BaseCrawler):
def __init__(self, organization, url, name, af):
"""Initialization: set the address family attribute (af)"""
-
- self.af = af
super().__init__(organization, url, name)
+ self.af = af
+ self.reference['reference_url_info'] = 'https://data.bgpkit.com/as2rel/README.txt'
def run(self):
"""Fetch the AS relationship file from BGPKIT website and process lines one by
@@ -19,7 +21,19 @@ def run(self):
req = requests.get(self.url, stream=True)
if req.status_code != 200:
- raise RequestStatusError('Error while fetching AS relationships')
+ raise RequestStatusError(f'Error while fetching AS relationships: {req.status_code}')
+
+ try:
+ last_modified_str = req.headers['Last-Modified']
+ # All HTTP dates are in UTC:
+ # https://www.rfc-editor.org/rfc/rfc2616#section-3.3.1
+ last_modified = datetime.strptime(last_modified_str,
+ '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = last_modified
+ except KeyError:
+ logging.warning('No Last-Modified header; will not set modification time.')
+ except ValueError as e:
+ logging.error(f'Failed to parse Last-Modified header "{last_modified_str}": {e}')
rels = []
asns = set()
diff --git a/iyp/crawlers/bgpkit/peerstats.py b/iyp/crawlers/bgpkit/peerstats.py
index e70a92a..58cf8cb 100644
--- a/iyp/crawlers/bgpkit/peerstats.py
+++ b/iyp/crawlers/bgpkit/peerstats.py
@@ -17,6 +17,9 @@
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://data.bgpkit.com/peer-stats/README.md'
def run(self):
"""Fetch peer stats for each collector."""
@@ -49,6 +52,7 @@ def run(self):
prev_day -= timedelta(days=1)
logging.warning("Today's data not yet available!")
+ self.reference['reference_time_modification'] = self.now
for collector in collectors:
url = URL.format(collector=collector, year=self.now.year,
month=self.now.month, day=self.now.day,
diff --git a/iyp/crawlers/bgpkit/pfx2asn.py b/iyp/crawlers/bgpkit/pfx2asn.py
index 89f5a93..3f150b2 100644
--- a/iyp/crawlers/bgpkit/pfx2asn.py
+++ b/iyp/crawlers/bgpkit/pfx2asn.py
@@ -4,6 +4,7 @@
import logging
import os
import sys
+from datetime import datetime, timezone
import requests
@@ -22,7 +23,19 @@ def run(self):
req = requests.get(URL, stream=True)
if req.status_code != 200:
- raise RequestStatusError('Error while fetching pfx2as relationships')
+ raise RequestStatusError(f'Error while fetching pfx2as relationships: {req.status_code}')
+
+ try:
+ last_modified_str = req.headers['Last-Modified']
+ # All HTTP dates are in UTC:
+ # https://www.rfc-editor.org/rfc/rfc2616#section-3.3.1
+ last_modified = datetime.strptime(last_modified_str,
+ '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = last_modified
+ except KeyError:
+ logging.warning('No Last-Modified header; will not set modification time.')
+ except ValueError as e:
+ logging.error(f'Failed to parse Last-Modified header "{last_modified_str}": {e}')
entries = []
asns = set()
@@ -35,7 +48,7 @@ def run(self):
req.close()
- logging.info('Pushing nodes to neo4j...\n')
+ logging.info('Pushing nodes to neo4j...')
# get ASNs and prefixes IDs
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
self.prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)
@@ -48,7 +61,7 @@ def run(self):
links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]}) # Set AS name
- logging.info('Pushing links to neo4j...\n')
+ logging.info('Pushing links to neo4j...')
# Push all links to IYP
self.iyp.batch_add_links('ORIGINATE', links)
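Both BGPKIT crawlers above derive the modification time from the Last-Modified HTTP header. A standalone sketch of that parsing; the header value is an example:

from datetime import datetime, timezone

last_modified_str = 'Mon, 12 Feb 2024 08:33:01 GMT'  # e.g. req.headers['Last-Modified']
# HTTP dates are always expressed in GMT (RFC 2616, section 3.3.1), so the naive
# datetime returned by strptime can safely be marked as UTC.
last_modified = datetime.strptime(last_modified_str,
                                  '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
print(last_modified)  # 2024-02-12 08:33:01+00:00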
From abaee2ea4fbe83cd52cb806ffbba151eb6fbad4a Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Mon, 12 Feb 2024 09:02:33 +0000
Subject: [PATCH 04/22] Add info URL and modification timestamp for BGP.Tools
---
iyp/__init__.py | 12 ++++++++++++
iyp/crawlers/bgptools/anycast_prefixes.py | 11 ++++++++++-
iyp/crawlers/bgptools/as_names.py | 4 ++--
iyp/crawlers/bgptools/tags.py | 4 ++--
4 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/iyp/__init__.py b/iyp/__init__.py
index 31eb2dd..b367f72 100644
--- a/iyp/__init__.py
+++ b/iyp/__init__.py
@@ -8,6 +8,7 @@
from typing import Optional
import requests
+from github import Github
from neo4j import GraphDatabase
BATCH_SIZE = 50000
@@ -79,6 +80,17 @@ def dict2str(d, eq=':', pfx=''):
return '{' + ','.join(data) + '}'
+def get_commit_datetime(repo, file_path):
+ """Get the datetime of the latest commit modifying a file in a GitHub repository.
+
+ repo: The name of the repository in org/repo format, e.g.,
+ "InternetHealthReport/internet-yellow-pages"
+ file_path: The path to the file relative to the repository root, e.g.,
+ "iyp/__init__.py"
+ """
+ return Github().get_repo(repo).get_commits(path=file_path)[0].commit.committer.date
+
+
class RequestStatusError(requests.HTTPError):
def __init__(self, message):
self.message = message
diff --git a/iyp/crawlers/bgptools/anycast_prefixes.py b/iyp/crawlers/bgptools/anycast_prefixes.py
index ea1b447..e096262 100644
--- a/iyp/crawlers/bgptools/anycast_prefixes.py
+++ b/iyp/crawlers/bgptools/anycast_prefixes.py
@@ -6,7 +6,8 @@
import requests
-from iyp import BaseCrawler, ConnectionError, RequestStatusError
+from iyp import (BaseCrawler, ConnectionError, RequestStatusError,
+ get_commit_datetime)
# Organization name and URL to data
ORG = 'BGP.Tools'
@@ -38,6 +39,12 @@ def fetch_dataset(url: str):
class Crawler(BaseCrawler):
# Base Crawler provides access to IYP via self.iyp
# and setup a dictionary with the org/url/today's date in self.reference
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.repo = 'bgptools/anycast-prefixes'
+ self.v4_file = 'anycatch-v4-prefixes.txt'
+ self.v6_file = 'anycatch-v6-prefixes.txt'
+ self.reference['reference_url_info'] = 'https://bgp.tools/kb/anycatch'
def run(self):
ipv4_prefixes_url = get_dataset_url(URL, 4)
@@ -53,12 +60,14 @@ def run(self):
# Fetch data and push to IYP.
# Overriding the reference_url_data according to prefixes
self.reference['reference_url_data'] = ipv4_prefixes_url
+ self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v4_file)
ipv4_prefixes_response = fetch_dataset(ipv4_prefixes_url)
logging.info('IPv4 prefixes fetched successfully.')
self.update(ipv4_prefixes_response, ipv4_prefixes_filename)
logging.info('IPv4 prefixes pushed to IYP.')
self.reference['reference_url_data'] = ipv6_prefixes_url
+ self.reference['reference_time_modification'] = get_commit_datetime(self.repo, self.v6_file)
ipv6_prefixes_response = fetch_dataset(ipv6_prefixes_url)
logging.info('IPv6 prefixes fetched successfully.')
self.update(ipv6_prefixes_response, ipv6_prefixes_filename)
diff --git a/iyp/crawlers/bgptools/as_names.py b/iyp/crawlers/bgptools/as_names.py
index e14adc3..53854a2 100644
--- a/iyp/crawlers/bgptools/as_names.py
+++ b/iyp/crawlers/bgptools/as_names.py
@@ -15,13 +15,13 @@
class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://bgp.tools/kb/api'
self.headers = {
'user-agent': 'IIJ/Internet Health Report - admin@ihr.live'
}
- super().__init__(organization, url, name)
-
def run(self):
"""Fetch the AS name file from BGP.Tools website and push it to IYP."""
diff --git a/iyp/crawlers/bgptools/tags.py b/iyp/crawlers/bgptools/tags.py
index ac93b6e..3a20fa1 100644
--- a/iyp/crawlers/bgptools/tags.py
+++ b/iyp/crawlers/bgptools/tags.py
@@ -37,13 +37,13 @@
class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://bgp.tools/kb/api'
self.headers = {
'user-agent': 'IIJ/Internet Health Report - admin@ihr.live'
}
- super().__init__(organization, url, name)
-
def run(self):
"""Fetch the AS name file from BGP.Tools website and process lines one by
one."""
From fc942c291cbe6aa7f1fab187afbc4c1cff143148 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Mon, 12 Feb 2024 09:30:36 +0000
Subject: [PATCH 05/22] Add info URL and modification timestamp for CAIDA
---
iyp/crawlers/caida/asrank.py | 3 +++
iyp/crawlers/caida/ix_asns.py | 4 ++++
iyp/crawlers/caida/ixs.py | 4 ++++
3 files changed, 11 insertions(+)
diff --git a/iyp/crawlers/caida/asrank.py b/iyp/crawlers/caida/asrank.py
index c565182..3cc8d9c 100644
--- a/iyp/crawlers/caida/asrank.py
+++ b/iyp/crawlers/caida/asrank.py
@@ -16,6 +16,9 @@
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://asrank.caida.org/'
def run(self):
"""Fetch networks information from ASRank and push to IYP."""
diff --git a/iyp/crawlers/caida/ix_asns.py b/iyp/crawlers/caida/ix_asns.py
index a84ff02..4e49ecc 100644
--- a/iyp/crawlers/caida/ix_asns.py
+++ b/iyp/crawlers/caida/ix_asns.py
@@ -3,6 +3,7 @@
import logging
import os
import sys
+from datetime import timezone
import arrow
import flatdict
@@ -35,9 +36,12 @@ def __init__(self, organization, url, name):
else:
# for loop was not 'broken', no file available
raise Exception('No recent CAIDA ix-asns file available')
+ date = date.datetime.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
logging.info('going to use this URL: ' + url)
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt'
+ self.reference['reference_time_modification'] = date
def run(self):
"""Fetch the latest file and process lines one by one."""
diff --git a/iyp/crawlers/caida/ixs.py b/iyp/crawlers/caida/ixs.py
index fdc3f31..5c854d1 100644
--- a/iyp/crawlers/caida/ixs.py
+++ b/iyp/crawlers/caida/ixs.py
@@ -4,6 +4,7 @@
import logging
import os
import sys
+from datetime import timezone
import arrow
import requests
@@ -36,9 +37,12 @@ def __init__(self, organization, url, name):
else:
# for loop was not 'broken', no file available
raise Exception('No recent CAIDA ix-asns file available')
+ date = date.datetime.replace(day=1, hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
logging.info('going to use this URL: ' + url)
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt'
+ self.reference['reference_time_modification'] = date
def run(self):
"""Fetch the latest file and process lines one by one."""
From 4549db1a479f66331fb43c1b51838e0565c18d49 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Tue, 13 Feb 2024 02:30:42 +0000
Subject: [PATCH 06/22] Update CAIDA ix crawlers
We can actually get a more precise time from the first line of the
files.
---
iyp/crawlers/caida/ix_asns.py | 13 ++++++++++++-
iyp/crawlers/caida/ixs.py | 13 ++++++++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/iyp/crawlers/caida/ix_asns.py b/iyp/crawlers/caida/ix_asns.py
index 4e49ecc..6fbb86e 100644
--- a/iyp/crawlers/caida/ix_asns.py
+++ b/iyp/crawlers/caida/ix_asns.py
@@ -3,7 +3,7 @@
import logging
import os
import sys
-from datetime import timezone
+from datetime import datetime, timezone
import arrow
import flatdict
@@ -43,6 +43,16 @@ def __init__(self, organization, url, name):
self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt'
self.reference['reference_time_modification'] = date
+ def __set_modification_time_from_metadata_line(self, line):
+ try:
+ date_str = json.loads(line.lstrip('#'))['date']
+ date = datetime.strptime(date_str, '%Y.%m.%d %H:%M:%S').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except (json.JSONDecodeError, KeyError, ValueError) as e:
+ logging.warning(f'Failed to get modification date from metadata line: {line.strip()}')
+ logging.warning(e)
+ logging.warning('Using date from filename.')
+
def run(self):
"""Fetch the latest file and process lines one by one."""
@@ -56,6 +66,7 @@ def run(self):
# Find all possible values and create corresponding nodes
for line in req.text.splitlines():
if line.startswith('#'):
+ self.__set_modification_time_from_metadata_line(line)
continue
ix = json.loads(line)
diff --git a/iyp/crawlers/caida/ixs.py b/iyp/crawlers/caida/ixs.py
index 5c854d1..2301c09 100644
--- a/iyp/crawlers/caida/ixs.py
+++ b/iyp/crawlers/caida/ixs.py
@@ -4,7 +4,7 @@
import logging
import os
import sys
-from datetime import timezone
+from datetime import datetime, timezone
import arrow
import requests
@@ -44,6 +44,16 @@ def __init__(self, organization, url, name):
self.reference['reference_url_info'] = 'https://publicdata.caida.org/datasets/ixps/README.txt'
self.reference['reference_time_modification'] = date
+ def __set_modification_time_from_metadata_line(self, line):
+ try:
+ date_str = json.loads(line.lstrip('#'))['date']
+ date = datetime.strptime(date_str, '%Y.%m.%d %H:%M:%S').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except (json.JSONDecodeError, KeyError, ValueError) as e:
+ logging.warning(f'Failed to get modification date from metadata line: {line.strip()}')
+ logging.warning(e)
+ logging.warning('Using date from filename.')
+
def run(self):
"""Fetch the latest file and process lines one by one."""
@@ -62,6 +72,7 @@ def run(self):
# Find all possible values and create corresponding nodes
for line in req.text.splitlines():
if line.startswith('#'):
+ self.__set_modification_time_from_metadata_line(line)
continue
ix = json.loads(line)
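Both CAIDA crawlers now read a more precise modification time from the JSON metadata comment on the first line of the file. A standalone sketch of that parsing; the metadata line is a simplified example:

import json
from datetime import datetime, timezone

line = '# {"date": "2024.02.01 00:02:22"}'  # simplified example of the files' first line
date_str = json.loads(line.lstrip('#'))['date']
date = datetime.strptime(date_str, '%Y.%m.%d %H:%M:%S').replace(tzinfo=timezone.utc)
print(date)  # 2024-02-01 00:02:22+00:00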
From a1dc4eadcd2ac809b551fb6b3c2d9049701924c6 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Tue, 13 Feb 2024 02:57:40 +0000
Subject: [PATCH 07/22] Add info URL and modification timestamp for Cisco
Umbrella
---
iyp/crawlers/cisco/umbrella_top1M.py | 32 +++++++++++++++++++++++++++-
1 file changed, 31 insertions(+), 1 deletion(-)
diff --git a/iyp/crawlers/cisco/umbrella_top1M.py b/iyp/crawlers/cisco/umbrella_top1M.py
index 714681b..5b607f5 100644
--- a/iyp/crawlers/cisco/umbrella_top1M.py
+++ b/iyp/crawlers/cisco/umbrella_top1M.py
@@ -3,6 +3,7 @@
import logging
import os
import sys
+from datetime import datetime, timedelta, timezone
from zipfile import ZipFile
import requests
@@ -17,6 +18,33 @@
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://s3-us-west-1.amazonaws.com/umbrella-static/index.html'
+
+ def __set_modification_time(self):
+ """Set the modification time by looking for the last available historical file.
+ The current (non-historical) file is created on the next day.
+
+ For example, if a file for 2024-02-13 is available, it means the current file
+ was created on 2024-02-14.
+ """
+ hist_url = 'http://s3-us-west-1.amazonaws.com/umbrella-static/top-1m-%Y-%m-%d.csv.zip'
+ date = datetime.now(tz=timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
+ for attempt in range(7):
+ r = requests.head(date.strftime(hist_url))
+ if r.ok:
+ break
+ date -= timedelta(days=1)
+ else:
+ logging.warning(f'Failed to find historical list within search interval (>{date}); '
+ 'Will not set modification time.')
+ return
+
+ # date now points to the last available historical file, which means the
+ # current file is the day after this date.
+ self.reference['reference_time_modification'] = date + timedelta(days=1)
+ logging.info(self.reference)
def run(self):
"""Fetch Umbrella top 1M and push to IYP."""
@@ -26,7 +54,9 @@ def run(self):
logging.info('Downloading latest list...')
req = requests.get(URL)
if req.status_code != 200:
- raise RequestStatusError('Error while fetching Cisco Umbrella Top 1M csv file')
+ raise RequestStatusError(f'Error while fetching Cisco Umbrella Top 1M csv file: {req.status_code}')
+
+ self.__set_modification_time()
links = []
# open zip file and read top list
From d018f491c4e516232a956cf208ffa85d91170597 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Tue, 13 Feb 2024 05:26:23 +0000
Subject: [PATCH 08/22] Add info URL for Citizen Lab
It is not clear what the correct value for the modification timestamp
would be.
---
iyp/crawlers/citizenlab/urldb.py | 3 +++
1 file changed, 3 insertions(+)
diff --git a/iyp/crawlers/citizenlab/urldb.py b/iyp/crawlers/citizenlab/urldb.py
index 109b3aa..ebd1837 100644
--- a/iyp/crawlers/citizenlab/urldb.py
+++ b/iyp/crawlers/citizenlab/urldb.py
@@ -23,6 +23,9 @@ def generate_url(suffix):
class Crawler(BaseCrawler):
# Base Crawler provides access to IYP via self.iyp
# and set up a dictionary with the org/url/today's date in self.reference
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://github.com/citizenlab/test-lists'
def run(self):
# Fetch country codes to generate urls
From d2386055859d1d0640a41ff3d5165ef7ae7f24ba Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Tue, 13 Feb 2024 06:51:25 +0000
Subject: [PATCH 09/22] Add info URL and modification timestamp for Cloudflare
Also remove API-gated reference URL from ranking_bucket crawler.
---
iyp/crawlers/cloudflare/dns_top_locations.py | 19 ++++++++++++++++++-
iyp/crawlers/cloudflare/ranking_bucket.py | 17 +++++++++++++++--
iyp/crawlers/cloudflare/top100.py | 17 +++++++++++++++--
3 files changed, 48 insertions(+), 5 deletions(-)
diff --git a/iyp/crawlers/cloudflare/dns_top_locations.py b/iyp/crawlers/cloudflare/dns_top_locations.py
index 46e46a0..8f40586 100644
--- a/iyp/crawlers/cloudflare/dns_top_locations.py
+++ b/iyp/crawlers/cloudflare/dns_top_locations.py
@@ -9,6 +9,7 @@
import logging
import os
import sys
+from datetime import datetime, timezone
import flatdict
import requests
@@ -42,6 +43,12 @@ def __init__(self, organization, url, name):
# Initialize IYP connection
super().__init__(organization, url, name)
+ # Not super elegant.
+ if name == 'cloudflare.dns_top_ases':
+ self.reference['reference_url_info'] = 'https://developers.cloudflare.com/api/operations/radar-get-dns-top-ases' # noqa: E501
+ elif name == 'cloudflare.dns_top_locations':
+ self.reference['reference_url_info'] = 'https://developers.cloudflare.com/radar/investigate/dns/#top-locations' # noqa: E501
+
# Fetch domain names registered in IYP
existing_dn = self.iyp.tx.run(
f"""MATCH (dn:DomainName)-[r:RANK]-(:Ranking)
@@ -117,7 +124,17 @@ def run(self):
for i, file in enumerate(files):
with open(file, 'rb') as fp:
# Process line one after the other
- for domain_top in json.load(fp)['result'].items():
+ results = json.load(fp)['result']
+ if not self.reference['reference_time_modification']:
+ # Get the reference time from the first file.
+ try:
+ date_str = results['meta']['dateRange'][0]['endTime']
+ date = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except (KeyError, ValueError, TypeError) as e:
+ logging.warning(f'Failed to get modification time: {e}')
+
+ for domain_top in results.items():
self.compute_link(domain_top)
if i % 100 == 0:
diff --git a/iyp/crawlers/cloudflare/ranking_bucket.py b/iyp/crawlers/cloudflare/ranking_bucket.py
index b152864..6d9a02e 100644
--- a/iyp/crawlers/cloudflare/ranking_bucket.py
+++ b/iyp/crawlers/cloudflare/ranking_bucket.py
@@ -3,6 +3,7 @@
import logging
import os
import sys
+from datetime import datetime, timezone
import requests
from requests.adapters import HTTPAdapter, Retry
@@ -12,7 +13,7 @@
# Organization name and URL to data
ORG = 'Cloudflare'
URL_DATASETS = 'https://api.cloudflare.com/client/v4/radar/datasets?limit=10&offset=0&datasetType=RANKING_BUCKET&format=json' # noqa: E501
-URL = ''
+URL = 'https://api.cloudflare.com/client/v4/radar/datasets'
URL_DL = 'https://api.cloudflare.com/client/v4/radar/datasets/download'
NAME = 'cloudflare.ranking_bucket'
@@ -27,6 +28,9 @@ class Crawler(BaseCrawler):
#
# Cloudflare ranks second and third level domain names (not host names).
# See https://blog.cloudflare.com/radar-domain-rankings/
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://developers.cloudflare.com/radar/investigate/domain-ranking-datasets/' # noqa: E501
def run(self):
"""Fetch data and push to IYP."""
@@ -61,6 +65,16 @@ def run(self):
datasets = list()
all_domains = set()
for dataset in datasets_json['result']['datasets']:
+ if not self.reference['reference_time_modification']:
+ # Get modification time from first dataset. Should be the same for all
+ # datasets.
+ try:
+ date_str = dataset['meta']['targetDateEnd']
+ date = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except (KeyError, ValueError, TypeError) as e:
+ logging.warning(f'Failed to get modification time: {e}')
+
# Get the dataset URL
req = req_session.post(URL_DL, json={'datasetId': dataset['id']})
if req.status_code != 200:
@@ -92,7 +106,6 @@ def run(self):
dataset_title = f'Cloudflare {dataset["title"]}'
logging.info(f'Processing dataset: {dataset_title}')
print(f'Processing dataset: {dataset_title}')
- self.reference['reference_url_data'] = dataset['url']
ranking_id = self.iyp.get_node('Ranking',
{
'name': dataset_title,
diff --git a/iyp/crawlers/cloudflare/top100.py b/iyp/crawlers/cloudflare/top100.py
index a92d5ee..2a6b63d 100644
--- a/iyp/crawlers/cloudflare/top100.py
+++ b/iyp/crawlers/cloudflare/top100.py
@@ -3,6 +3,7 @@
import logging
import os
import sys
+from datetime import datetime, timezone
import requests
@@ -24,6 +25,9 @@ class Crawler(BaseCrawler):
#
# Cloudflare ranks second and third level domain names (not host names).
# See https://blog.cloudflare.com/radar-domain-rankings/
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://developers.cloudflare.com/radar/investigate/domain-ranking-datasets/' # noqa: E501
def run(self):
"""Fetch data and push to IYP."""
@@ -40,10 +44,19 @@ def run(self):
req = requests.get(self.reference['reference_url_data'], headers=headers)
if req.status_code != 200:
print(f'Cannot download data {req.status_code}: {req.text}')
- raise RequestStatusError('Error while fetching data file')
+ raise RequestStatusError(f'Error while fetching data file: {req.status_code}')
+
+ results = req.json()['result']
+
+ try:
+ date_str = results['meta']['dateRange'][0]['endTime']
+ date = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except (KeyError, ValueError, TypeError) as e:
+ logging.warning(f'Failed to get modification time: {e}')
# Process line one after the other
- for i, _ in enumerate(map(self.update, req.json()['result']['top'])):
+ for i, _ in enumerate(map(self.update, results['top'])):
sys.stderr.write(f'\rProcessed {i} lines')
sys.stderr.write('\n')
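The dns_top_locations and top100 crawlers above use the same pattern: the endTime of the dateRange metadata returned by the Radar API is parsed as a UTC timestamp. A standalone sketch with a trimmed example of the response structure:

from datetime import datetime, timezone

results = {'meta': {'dateRange': [{'endTime': '2024-02-12T00:00:00Z'}]}}  # trimmed example
date_str = results['meta']['dateRange'][0]['endTime']
date = datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
print(date)  # 2024-02-12 00:00:00+00:00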
From 736564dfd8e9abc7107d319055cd372991597e47 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Tue, 13 Feb 2024 06:57:19 +0000
Subject: [PATCH 10/22] Add info URL and modification timestamp for emileaben
---
iyp/crawlers/emileaben/as_names.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/iyp/crawlers/emileaben/as_names.py b/iyp/crawlers/emileaben/as_names.py
index cebe71c..66d4b10 100644
--- a/iyp/crawlers/emileaben/as_names.py
+++ b/iyp/crawlers/emileaben/as_names.py
@@ -6,7 +6,7 @@
import requests
-from iyp import BaseCrawler, RequestStatusError
+from iyp import BaseCrawler, RequestStatusError, get_commit_datetime
# Organization name and URL to data
ORG = 'emileaben'
@@ -17,6 +17,10 @@
class Crawler(BaseCrawler):
# Base Crawler provides access to IYP via self.iyp
# and setup a dictionary with the org/url/today's date in self.reference
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://github.com/emileaben/asnames'
+ self.reference['reference_time_modification'] = get_commit_datetime('emileaben/asnames', 'asnames.csv')
def run(self):
# Create a temporary directory
From d37aa4ea0da32fca9bc6ebdb2101ac9e5893d007 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Tue, 13 Feb 2024 10:26:53 +0000
Subject: [PATCH 11/22] Add info URL and modification timestamp for IHR
---
iyp/crawlers/ihr/__init__.py | 15 +++++++------
iyp/crawlers/ihr/country_dependency.py | 30 ++++++++++++++------------
iyp/crawlers/ihr/rov.py | 23 +++++++++++---------
3 files changed, 37 insertions(+), 31 deletions(-)
diff --git a/iyp/crawlers/ihr/__init__.py b/iyp/crawlers/ihr/__init__.py
index cb5f676..9ff8517 100644
--- a/iyp/crawlers/ihr/__init__.py
+++ b/iyp/crawlers/ihr/__init__.py
@@ -1,6 +1,6 @@
import csv
import os
-from datetime import datetime, time, timezone
+from datetime import timezone
import arrow
import lz4.frame
@@ -34,6 +34,7 @@ class HegemonyCrawler(BaseCrawler):
def __init__(self, organization, url, name, af):
self.af = af
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://ihr.iijlab.net/ihr/en-us/documentation#AS_dependency'
def run(self):
"""Fetch data from file and push to IYP."""
@@ -50,12 +51,12 @@ def run(self):
url = self.url.format(year=today.year, month=today.month, day=today.day)
req = requests.head(url)
- self.reference = {
- 'reference_url_data': url,
- 'reference_org': self.organization,
- 'reference_name': self.name,
- 'reference_time_fetch': datetime.combine(today.date(), time.min, timezone.utc)
- }
+ self.reference['reference_url_data'] = url
+ self.reference['reference_time_modification'] = today.datetime.replace(hour=0,
+ minute=0,
+ second=0,
+ microsecond=0,
+ tzinfo=timezone.utc)
os.makedirs('tmp/', exist_ok=True)
os.system(f'wget {url} -P tmp/')
diff --git a/iyp/crawlers/ihr/country_dependency.py b/iyp/crawlers/ihr/country_dependency.py
index 18e2a1e..1a8d4af 100644
--- a/iyp/crawlers/ihr/country_dependency.py
+++ b/iyp/crawlers/ihr/country_dependency.py
@@ -3,7 +3,7 @@
import logging
import os
import sys
-from datetime import datetime, time, timezone
+from datetime import datetime, timezone
import arrow
import iso3166
@@ -37,6 +37,7 @@ def __init__(self, organization, url, name):
self.http_session.mount('https://', HTTPAdapter(max_retries=retries))
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://ihr.iijlab.net/ihr/en-us/documentation#Country_s_network_dependency' # noqa: E501
def run(self):
"""Fetch data from API and push to IYP."""
@@ -49,14 +50,8 @@ def run(self):
raise RequestStatusError('Error while fetching data for ' + cc)
data = json.loads(req.text)
ranking = data['results']
-
- # Setup references
- self.reference = {
- 'reference_org': ORG,
- 'reference_url_data': URL,
- 'reference_name': NAME,
- 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc)
- }
+ if not ranking:
+ continue
# Setup rankings' node
country_qid = self.iyp.get_node('Country',
@@ -65,15 +60,22 @@ def run(self):
}
)
- countryrank_statements = []
- if country_qid is not None:
- countryrank_statements = [('COUNTRY', country_qid, self.reference)]
-
# Find the latest timebin in the data
last_timebin = '1970-01-01'
for r in ranking:
if arrow.get(r['timebin']) > arrow.get(last_timebin):
last_timebin = r['timebin']
+ self.reference['reference_url_data'] = self.url + f'&timebin={last_timebin}'
+ self.reference['reference_time_modification'] = None
+ try:
+ date = datetime.strptime(last_timebin, '%Y-%m-%dT%H:%M:%SZ').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except ValueError as e:
+ logging.warning(f'Failed to get modification time: {e}')
+
+ countryrank_statements = []
+ if country_qid is not None:
+ countryrank_statements = [('COUNTRY', country_qid, self.reference.copy())]
# Make ranking and push data
links = []
@@ -106,7 +108,7 @@ def run(self):
links.append({
'src_id': self.asn_id[asn['asn']],
'dst_id': self.countryrank_qid,
- 'props': [self.reference, asn]
+ 'props': [self.reference.copy(), asn]
})
# Push links to IYP
diff --git a/iyp/crawlers/ihr/rov.py b/iyp/crawlers/ihr/rov.py
index b8a3e29..4d5daf8 100644
--- a/iyp/crawlers/ihr/rov.py
+++ b/iyp/crawlers/ihr/rov.py
@@ -3,7 +3,7 @@
import logging
import os
import sys
-from datetime import datetime, time, timezone
+from datetime import timezone
import arrow
import lz4.frame
@@ -45,6 +45,9 @@ def close(self):
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://ihr-archive.iijlab.net/ihr/rov/README.txt'
def run(self):
"""Fetch data from file and push to IYP."""
@@ -60,12 +63,12 @@ def run(self):
today = today.shift(days=-1)
url = URL.format(year=today.year, month=today.month, day=today.day)
- self.reference = {
- 'reference_org': ORG,
- 'reference_url_data': url,
- 'reference_name': NAME,
- 'reference_time_fetch': datetime.combine(today.date(), time.min, timezone.utc)
- }
+ self.reference['reference_url_data'] = url
+ self.reference['reference_time_modification'] = today.datetime.replace(hour=0,
+ minute=0,
+ second=0,
+ microsecond=0,
+ tzinfo=timezone.utc)
os.makedirs('tmp/', exist_ok=True)
os.system(f'wget {url} -P tmp/')
@@ -73,7 +76,7 @@ def run(self):
local_filename = 'tmp/' + url.rpartition('/')[2]
self.csv = lz4Csv(local_filename)
- logging.warning('Getting node IDs from neo4j...\n')
+ logging.info('Getting node IDs from neo4j...')
asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn')
prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix')
tag_id = self.iyp.batch_get_nodes_by_single_prop('Tag', 'label')
@@ -84,7 +87,7 @@ def run(self):
dep_links = []
country_links = []
- logging.warning('Computing links...\n')
+ logging.info('Computing links...')
for line in csv.reader(self.csv, quotechar='"', delimiter=',', skipinitialspace=True):
# header
# id, timebin, prefix, hege, af, visibility, rpki_status, irr_status,
@@ -158,7 +161,7 @@ def run(self):
self.csv.close()
# Push links to IYP
- logging.warning('Pushing links to neo4j...\n')
+ logging.info('Pushing links to neo4j...')
self.iyp.batch_add_links('ORIGINATE', orig_links)
self.iyp.batch_add_links('CATEGORIZED', tag_links)
self.iyp.batch_add_links('DEPENDS_ON', dep_links)
From 0d920c02417b92353f3462aac2184d5e3681f98f Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 02:02:09 +0000
Subject: [PATCH 12/22] Add info URL and modification timestamp for Internet
Intelligence Lab
---
iyp/crawlers/inetintel/as_org.py | 16 ++++++++++++++++
1 file changed, 16 insertions(+)
diff --git a/iyp/crawlers/inetintel/as_org.py b/iyp/crawlers/inetintel/as_org.py
index 6faa657..71676d9 100644
--- a/iyp/crawlers/inetintel/as_org.py
+++ b/iyp/crawlers/inetintel/as_org.py
@@ -4,6 +4,7 @@
import sys
import tempfile
from collections import defaultdict
+from datetime import datetime, timezone
import pandas as pd
import requests
@@ -40,6 +41,21 @@ def get_latest_dataset_url(github_repo: str, data_dir: str, file_extension: str)
class Crawler(BaseCrawler):
# Base Crawler provides access to IYP via self.iyp
# and set up a dictionary with the org/url/today's date in self.reference
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://github.com/InetIntel/Dataset-AS-to-Organization-Mapping'
+ self.__get_modification_time_from_url()
+
+ def __get_modification_time_from_url(self):
+ expected_suffix = '.json'
+ try:
+ if not URL.endswith(expected_suffix):
+ raise ValueError(f'Expected "{expected_suffix}" file for data URL')
+ _, date_str = URL[:-len(expected_suffix)].rsplit('.', maxsplit=1)
+ date = datetime.strptime(date_str, '%Y-%m').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except ValueError as e:
+ logging.warning(f'Failed to set modification time: {e}')
def run(self):
"""Fetch data and push to IYP."""
From b3081c557609fd5a28f5dc55db79cc868eade498 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 02:02:55 +0000
Subject: [PATCH 13/22] Add info URL and modification timestamp for NRO
---
iyp/crawlers/nro/delegated_stats.py | 17 ++++++++++++++---
1 file changed, 14 insertions(+), 3 deletions(-)
diff --git a/iyp/crawlers/nro/delegated_stats.py b/iyp/crawlers/nro/delegated_stats.py
index ded0bbe..ea96368 100644
--- a/iyp/crawlers/nro/delegated_stats.py
+++ b/iyp/crawlers/nro/delegated_stats.py
@@ -4,6 +4,7 @@
import os
import sys
from collections import defaultdict
+from datetime import datetime, timezone
import requests
@@ -18,6 +19,9 @@
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://www.nro.net/wp-content/uploads/nro-extended-stats-readme5.txt'
def run(self):
"""Fetch the delegated stat file from RIPE website and process lines one by
@@ -43,8 +47,15 @@ def run(self):
if line.strip().startswith('#'):
continue
- # skip version and summary lines
fields_value = line.split('|')
+ # get modification time from version line
+ if len(fields_value) == 7 and fields_value[0].isdigit():
+ try:
+ date = datetime.strptime(fields_value[5], '%Y%m%d').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except ValueError as e:
+ logging.warning(f'Failed to set modification time: {e}')
+ # skip summary lines
if len(fields_value) < 8:
continue
@@ -65,7 +76,7 @@ def run(self):
prefixes.add(prefix)
# Create all nodes
- logging.warning('Pushing nodes to neo4j...\n')
+ logging.warning('Pushing nodes to neo4j...')
opaqueid_id = self.iyp.batch_get_nodes_by_single_prop('OpaqueID', 'id', opaqueids)
prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)
country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries)
@@ -120,7 +131,7 @@ def run(self):
status_links[rec['status'].upper()].append(
{'src_id': prefix_qid, 'dst_id': opaqueid_qid, 'props': [reference]})
- logging.warning('Pusing links to neo4j...\n')
+ logging.warning('Pushing links to neo4j...')
# Push all links to IYP
self.iyp.batch_add_links('COUNTRY', country_links)
for label, links in status_links.items():
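The NRO crawler above now takes the modification time from the version header of the delegated stats file. A standalone sketch; the version line is illustrative and follows the field layout (version|registry|serial|records|startdate|enddate|UTCoffset) documented in the readme linked in the patch:

from datetime import datetime, timezone

line = '2|nro|20240214|123456|19830705|20240213|+0000'  # illustrative version line
fields_value = line.split('|')
if len(fields_value) == 7 and fields_value[0].isdigit():
    date = datetime.strptime(fields_value[5], '%Y%m%d').replace(tzinfo=timezone.utc)
    print(date)  # 2024-02-13 00:00:00+00:00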
From 3d3c135d3f213a15ab28d8dd8b71683293720363 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 03:18:34 +0000
Subject: [PATCH 14/22] Add info URL and modification timestamp for OpenINTEL
Also add a proper check for latest available data.
---
iyp/__init__.py | 6 ++
iyp/crawlers/openintel/__init__.py | 94 +++++++++++++++---------------
2 files changed, 54 insertions(+), 46 deletions(-)
diff --git a/iyp/__init__.py b/iyp/__init__.py
index b367f72..00b62dc 100644
--- a/iyp/__init__.py
+++ b/iyp/__init__.py
@@ -121,6 +121,12 @@ def __init__(self, message):
super().__init__(self.message)
+class DataNotAvailableError(Exception):
+ def __init__(self, message):
+ self.message = message
+ super().__init__(self.message)
+
+
class IYP(object):
def __init__(self):
diff --git a/iyp/crawlers/openintel/__init__.py b/iyp/crawlers/openintel/__init__.py
index 597e754..ddb5415 100644
--- a/iyp/crawlers/openintel/__init__.py
+++ b/iyp/crawlers/openintel/__init__.py
@@ -1,7 +1,6 @@
# Simple Python script to fetch domain name to IP address mappings from OpenINTEL data
# OpenIntelCrawler is based on code from Mattijs Jonker
-import argparse
import json
import logging
import os
@@ -15,7 +14,7 @@
import pandas as pd
import requests
-from iyp import BaseCrawler, RequestStatusError
+from iyp import BaseCrawler, DataNotAvailableError
TMP_DIR = './tmp'
os.makedirs(TMP_DIR, exist_ok=True)
@@ -29,13 +28,11 @@
OPENINTEL_ACCESS_KEY = config['openintel']['access_key']
OPENINTEL_SECRET_KEY = config['openintel']['secret_key']
-
-def valid_date(s):
- try:
- return datetime.strptime(s, '%Y-%m-%d')
- except ValueError:
- msg = 'not a valid ISO 8601 date: {0!r}'.format(s)
- raise argparse.ArgumentTypeError(msg)
+# We use the AWS interface to get data, but can not provide AWS URLs as data source, so
+# at least for the Tranco and Umbrella datasets we can point to the publicly available
+# archives.
+TRANCO_REFERENCE_URL_DATA_FMT = 'https://data.openintel.nl/data/tranco1m/%Y/openintel-tranco1m-%Y%m%d.tar'
+UMBRELLA_REFERENCE_URL_DATA_FMT = 'https://data.openintel.nl/data/umbrella1m/%Y/openintel-umbrella1m-%Y%m%d.tar'
class OpenIntelCrawler(BaseCrawler):
@@ -45,6 +42,11 @@ def __init__(self, organization, url, name, dataset):
self.dataset = dataset
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://www.openintel.nl/'
+ if dataset == 'tranco':
+ self.reference['reference_url_info'] = 'https://data.openintel.nl/data/tranco1m'
+ elif dataset == 'umbrella':
+ self.reference['reference_url_info'] = 'https://data.openintel.nl/data/umbrella1m'
def get_parquet(self):
"""Fetch the forward DNS data, populate a data frame, and process lines one by
@@ -72,48 +74,40 @@ def get_parquet(self):
# OpenINTEL measurement data objects base prefix
FDNS_WAREHOUSE_S3 = 'category=fdns/type=warehouse'
- # check on the website if yesterday's data is available
- yesterday = arrow.utcnow().shift(days=-1)
- # FIXME Check at the proper place. Remove flake8 exception afterwards.
- # flake8: noqa
- # url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
- # try:
- # req = requests.head(url)
-
- # attempt = 3
- # while req.status_code != 200 and attempt > 0:
- # print(req.status_code)
- # attempt -= 1
- # yesterday = yesterday.shift(days=-1)
- # url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
- # req = requests.head(url)
-
- # except requests.exceptions.ConnectionError:
- # logging.warning("Cannot reach OpenINTEL website, try yesterday's data")
- # yesterday = arrow.utcnow().shift(days=-1)
- # url = URL.format(year=yesterday.year, month=yesterday.month, day=yesterday.day)
-
- logging.warning(f'Fetching data for {yesterday}')
+ # Get latest available data.
+ date = arrow.utcnow()
+ for lookback_days in range(6):
+ objects = list(WAREHOUSE_BUCKET.objects.filter(
+ # Build a partition path for the given source and date
+ Prefix=os.path.join(
+ FDNS_WAREHOUSE_S3,
+ 'source={}'.format(self.dataset),
+ 'year={}'.format(date.year),
+ 'month={:02d}'.format(date.month),
+ 'day={:02d}'.format(date.day)
+ )).all())
+ if len(objects) > 0:
+ break
+ date = date.shift(days=-1)
+ else:
+ logging.error('Failed to find data within the specified lookback interval.')
+ raise DataNotAvailableError('Failed to find data within the specified lookback interval.')
+ self.reference['reference_time_modification'] = \
+ date.datetime.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc)
+ if self.dataset == 'tranco':
+ self.reference['reference_url_data'] = date.strftime(TRANCO_REFERENCE_URL_DATA_FMT)
+ elif self.dataset == 'umbrella':
+ self.reference['reference_url_data'] = date.strftime(UMBRELLA_REFERENCE_URL_DATA_FMT)
- # Start one day before ? # TODO remove this line?
- yesterday = yesterday.shift(days=-1)
+ logging.info(f'Fetching data for {date.strftime("%Y-%m-%d")}')
# Iterate objects in bucket with given (source, date)-partition prefix
- for i_obj in WAREHOUSE_BUCKET.objects.filter(
- # Build a partition path for the given source and date
- Prefix=os.path.join(
- FDNS_WAREHOUSE_S3,
- 'source={}'.format(self.dataset),
- 'year={}'.format(yesterday.year),
- 'month={:02d}'.format(yesterday.month),
- 'day={:02d}'.format(yesterday.day)
- )
- ):
+ for i_obj in objects:
# Open a temporary file to download the Parquet object into
with tempfile.NamedTemporaryFile(mode='w+b',
dir=TMP_DIR,
- prefix='{}.'.format(yesterday.date().isoformat()),
+ prefix='{}.'.format(date.date().isoformat()),
suffix='.parquet',
delete=True) as tempFile:
@@ -249,6 +243,7 @@ class DnsDependencyCrawler(BaseCrawler):
def __init__(self, organization, url, name):
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://dnsgraph.dacs.utwente.nl'
@staticmethod
def remove_root(name):
@@ -279,7 +274,14 @@ def run(self):
break
else:
logging.error('Failed to find data within the specified lookback interval.')
- raise RequestStatusError('Failed to find data within the specified lookback interval.')
+ raise DataNotAvailableError('Failed to find data within the specified lookback interval.')
+
+ # Shift to Monday and set to midnight.
+ mod_date = (current_date - timedelta(days=current_date.weekday())).replace(hour=0,
+ minute=0,
+ second=0,
+ microsecond=0)
+ self.reference['reference_time_modification'] = mod_date
logging.info('Reading connections')
connections = pd.read_json(f'{base_url}/connections.json.gz', lines=True)
@@ -289,7 +291,7 @@ def run(self):
# Currently there are only DOMAIN and HOSTNAME entries in from_nodeType, but
# maybe that changes in the future.
connections.loc[connections['from_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'from_nodeKey'] = \
- connections.loc[connections['from_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'from_nodeKey'].map(self.remove_root)
+ connections.loc[connections['from_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'from_nodeKey'].map(self.remove_root) # noqa: E501
connections.loc[connections['to_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'to_nodeKey'] = \
connections.loc[connections['to_nodeType'].isin(('DOMAIN', 'HOSTNAME')), 'to_nodeKey'].map(self.remove_root)
# Normalize IPv6 addresses.
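In the DnsDependencyCrawler above, the modification time is shifted to the Monday of the current data's week, presumably because the dataset is organized per week. A small worked example of that expression:

from datetime import datetime, timedelta, timezone

current_date = datetime(2024, 2, 14, 3, 18, tzinfo=timezone.utc)  # a Wednesday
# weekday() is 0 for Monday, so subtracting it as days lands on Monday of the same week.
mod_date = (current_date - timedelta(days=current_date.weekday())).replace(hour=0, minute=0,
                                                                           second=0, microsecond=0)
print(mod_date)  # 2024-02-12 00:00:00+00:00 (Monday)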
From d2fac4a2151a69e40f2b65e62aae1412bb01f04d Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 04:20:43 +0000
Subject: [PATCH 15/22] Add info URL and modification timestamp for Packet
Clearing House
---
iyp/crawlers/pch/__init__.py | 28 ++++++++++++----------------
1 file changed, 12 insertions(+), 16 deletions(-)
diff --git a/iyp/crawlers/pch/__init__.py b/iyp/crawlers/pch/__init__.py
index df258f3..7483ef1 100644
--- a/iyp/crawlers/pch/__init__.py
+++ b/iyp/crawlers/pch/__init__.py
@@ -15,7 +15,8 @@
from requests_futures.sessions import FuturesSession
from urllib3.util.retry import Retry
-from iyp import AddressValueError, BaseCrawler, CacheHandler
+from iyp import (AddressValueError, BaseCrawler, CacheHandler,
+ DataNotAvailableError)
from iyp.crawlers.pch.show_bgp_parser import ShowBGPParser
PARALLEL_DOWNLOADS = 8
@@ -61,6 +62,7 @@ def __init__(self, organization: str, url: str, name: str, af: int):
self.collector_site_url = str()
self.__initialize_session()
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://www.pch.net/resources/Routing_Data/'
def __initialize_session(self) -> None:
self.session = FuturesSession(max_workers=PARALLEL_DOWNLOADS)
@@ -120,16 +122,14 @@ def fetch_collector_site(self) -> str:
lookback = today - self.MAX_LOOKBACK
if lookback.month == curr_month:
logging.error('Failed to find current data.')
- print('Failed to find current data.', file=sys.stderr)
- return str()
+ raise DataNotAvailableError('Failed to find current data.')
self.collector_site_url = self.url + today.strftime('%Y/%m/')
resp: Response = self.session.get(self.collector_site_url).result()
if resp.ok:
return resp.text
logging.warning(f'Failed to retrieve collector site from: {self.collector_site_url}')
logging.error('Failed to find current data.')
- print('Failed to find current data.', file=sys.stderr)
- return str()
+ raise DataNotAvailableError('Failed to find current data.')
@staticmethod
def filter_route_collector_links(links: ResultSet) -> list:
@@ -175,10 +175,9 @@ def probe_latest_set(self, collector_name: str) -> datetime:
return curr_date
curr_date -= timedelta(days=1)
logging.error('Failed to find current data.')
- print('Failed to find current data.', file=sys.stderr)
- return None
+ raise DataNotAvailableError('Failed to find current data.')
- def fetch(self) -> bool:
+ def fetch(self) -> None:
"""Fetch and cache all data.
First get a list of collector names and their associated files. Then fetch the
@@ -202,8 +201,6 @@ def fetch(self) -> bool:
self.collector_site_url, collector_names = self.cache_handler.load_cached_object(collector_names_name)
else:
collector_site = self.fetch_collector_site()
- if not collector_site:
- return True
soup = BeautifulSoup(collector_site, features='html.parser')
links = soup.find_all('a')
collector_names = self.filter_route_collector_links(links)
@@ -218,8 +215,10 @@ def fetch(self) -> bool:
# files (one per collector) if the data for the current date
# is not yet available for all collectors.
latest_available_date = self.probe_latest_set(collector_names[0])
- if latest_available_date is None:
- return True
+ self.reference['reference_time_modification'] = latest_available_date.replace(hour=0,
+ minute=0,
+ second=0,
+ microsecond=0)
curr_date = datetime.now(tz=timezone.utc)
max_lookback = curr_date - self.MAX_LOOKBACK
@@ -270,14 +269,11 @@ def fetch(self) -> bool:
print(f'Failed to find current data for {len(failed_fetches)} collectors: {failed_fetches}',
file=sys.stderr)
- return False
-
def run(self) -> None:
"""Fetch data from PCH, parse the files, and push nodes and relationships to the
database."""
# Pre-fetch all data.
- if self.fetch():
- return
+ self.fetch()
# Parse files in parallel.
logging.info(f'Parsing {len(self.collector_files)} collector files.')
From 9fa906a55ebe5cc56b41f0075623e84a36f5d0da Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 04:57:59 +0000
Subject: [PATCH 16/22] Add info URL and modification timestamp for PeeringDB
---
iyp/crawlers/peeringdb/fac.py | 8 ++++++--
iyp/crawlers/peeringdb/ix.py | 33 +++++++++++++++++++++++++++------
iyp/crawlers/peeringdb/org.py | 8 ++++++--
3 files changed, 39 insertions(+), 10 deletions(-)
diff --git a/iyp/crawlers/peeringdb/fac.py b/iyp/crawlers/peeringdb/fac.py
index 834e96f..d86dbd6 100644
--- a/iyp/crawlers/peeringdb/fac.py
+++ b/iyp/crawlers/peeringdb/fac.py
@@ -10,7 +10,8 @@
import requests_cache
from iyp import BaseCrawler
-from iyp.crawlers.peeringdb.ix import handle_social_media
+from iyp.crawlers.peeringdb.ix import (handle_social_media,
+ set_reference_time_from_metadata)
# NOTES This script should be executed after peeringdb.org
@@ -44,6 +45,7 @@ def __init__(self, organization, url, name):
self.requests = requests_cache.CachedSession(os.path.join(CACHE_DIR, ORG), expire_after=CACHE_DURATION)
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20fac'
def run(self):
"""Fetch facilities information from PeeringDB and push to IYP."""
@@ -54,7 +56,9 @@ def run(self):
logging.error('Error while fetching peeringDB data')
raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}')
- facilities = json.loads(req.text)['data']
+ result = req.json()
+ set_reference_time_from_metadata(self.reference, result)
+ facilities = result['data']
# compute nodes
facs = set()
diff --git a/iyp/crawlers/peeringdb/ix.py b/iyp/crawlers/peeringdb/ix.py
index 2b86e78..7263166 100644
--- a/iyp/crawlers/peeringdb/ix.py
+++ b/iyp/crawlers/peeringdb/ix.py
@@ -58,6 +58,15 @@ def handle_social_media(d: dict, website_set: set = None):
d[f'social_media_{service}'] = identifier
+def set_reference_time_from_metadata(reference_dict, data):
+ try:
+ generated_timestamp = data['meta']['generated']
+ date = datetime.fromtimestamp(generated_timestamp, tz=timezone.utc)
+ reference_dict['reference_time_modification'] = date
+ except (KeyError, ValueError) as e:
+ logging.warning(f'Failed to set modification time: {e}')
+
+
class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
"""Initialisation for pushing peeringDB IXPs to IYP."""
@@ -68,21 +77,27 @@ def __init__(self, organization, url, name):
'reference_org': ORG,
'reference_name': NAME,
'reference_url_data': URL_PDB_IXS,
- 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc)
+ 'reference_url_info': 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20ix',
+ 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc),
+ 'reference_time_modification': None
}
self.reference_lan = {
'reference_org': ORG,
'reference_name': NAME,
'reference_url_data': URL_PDB_LANS,
- 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc)
+ 'reference_url_info': 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20ixlan',
+ 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc),
+ 'reference_time_modification': None
}
self.reference_netfac = {
'reference_org': ORG,
'reference_name': NAME,
'reference_url_data': URL_PDB_NETFAC,
- 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc)
+ 'reference_url_info': 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20netfac',
+ 'reference_time_fetch': datetime.combine(datetime.utcnow(), time.min, timezone.utc),
+ 'reference_time_modification': None
}
# keep track of added networks
@@ -110,7 +125,9 @@ def run(self):
logging.error(f'Error while fetching IXs data\n({req.status_code}) {req.text}')
raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}')
- self.ixs = json.loads(req.text)['data']
+ result = req.json()
+ set_reference_time_from_metadata(self.reference_ix, result)
+ self.ixs = result['data']
# Register IXPs
logging.warning('Pushing IXP info...')
@@ -122,7 +139,9 @@ def run(self):
logging.error(f'Error while fetching IXLANs data\n({req.status_code}) {req.text}')
raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}')
- ixlans = json.loads(req.text)['data']
+ result = req.json()
+ set_reference_time_from_metadata(self.reference_lan, result)
+ ixlans = result['data']
# index ixlans by their id
self.ixlans = {}
@@ -139,7 +158,9 @@ def run(self):
logging.error(f'Error while fetching IXLANs data\n({req.status_code}) {req.text}')
raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}')
- self.netfacs = json.loads(req.text)['data']
+ result = req.json()
+ set_reference_time_from_metadata(self.reference_netfac, result)
+ self.netfacs = result['data']
self.register_net_fac()
def register_net_fac(self):
diff --git a/iyp/crawlers/peeringdb/org.py b/iyp/crawlers/peeringdb/org.py
index c1d3301..f1f1beb 100644
--- a/iyp/crawlers/peeringdb/org.py
+++ b/iyp/crawlers/peeringdb/org.py
@@ -10,7 +10,8 @@
import requests_cache
from iyp import BaseCrawler
-from iyp.crawlers.peeringdb.ix import handle_social_media
+from iyp.crawlers.peeringdb.ix import (handle_social_media,
+ set_reference_time_from_metadata)
ORG = 'PeeringDB'
@@ -41,6 +42,7 @@ def __init__(self, organization, url, name):
self.requests = requests_cache.CachedSession(os.path.join(CACHE_DIR, ORG), expire_after=CACHE_DURATION)
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://www.peeringdb.com/apidocs/#tag/api/operation/list%20org'
def run(self):
"""Fetch organizations information from PeeringDB and push to IYP."""
@@ -50,7 +52,9 @@ def run(self):
logging.error('Error while fetching peeringDB data')
raise Exception(f'Cannot fetch peeringdb data, status code={req.status_code}\n{req.text}')
- organizations = json.loads(req.text)['data']
+ result = req.json()
+ set_reference_time_from_metadata(self.reference, result)
+ organizations = result['data']
# compute nodes
orgs = set()
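The helper reads the generation timestamp that PeeringDB includes in the meta section of its API responses and stores it as a UTC datetime. A standalone sketch, using a made-up payload (the epoch value is illustrative):

import logging
from datetime import datetime, timezone


def set_reference_time_from_metadata(reference_dict, data):
    # Mirrors the helper added in ix.py: 'generated' is a Unix epoch in UTC.
    try:
        generated_timestamp = data['meta']['generated']
        date = datetime.fromtimestamp(generated_timestamp, tz=timezone.utc)
        reference_dict['reference_time_modification'] = date
    except (KeyError, ValueError) as e:
        logging.warning(f'Failed to set modification time: {e}')


# Hypothetical PeeringDB-style response.
result = {'meta': {'generated': 1707868800}, 'data': []}
reference = {'reference_time_modification': None}
set_reference_time_from_metadata(reference, result)
print(reference['reference_time_modification'])  # 2024-02-14 00:00:00+00:00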
From e77e7bd338bd2fa67c7cbff147ede85475f283f0 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 05:16:04 +0000
Subject: [PATCH 17/22] Move datetime from Last-Modified header to shared
function
---
iyp/__init__.py | 17 +++++++++++++++++
iyp/crawlers/bgpkit/__init__.py | 17 +++--------------
iyp/crawlers/bgpkit/pfx2asn.py | 16 +++-------------
3 files changed, 23 insertions(+), 27 deletions(-)
diff --git a/iyp/__init__.py b/iyp/__init__.py
index 00b62dc..f9fccef 100644
--- a/iyp/__init__.py
+++ b/iyp/__init__.py
@@ -91,6 +91,23 @@ def get_commit_datetime(repo, file_path):
return Github().get_repo(repo).get_commits(path=file_path)[0].commit.committer.date
+def set_modification_time_from_last_modified_header(reference, response):
+ """Set the reference_time_modification field of the specified reference dict to the
+ datetime parsed from the Last-Modified header of the specified response if
+ possible."""
+ try:
+ last_modified_str = response.headers['Last-Modified']
+ # All HTTP dates are in UTC:
+ # https://www.rfc-editor.org/rfc/rfc2616#section-3.3.1
+ last_modified = datetime.strptime(last_modified_str,
+ '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
+ reference['reference_time_modification'] = last_modified
+ except KeyError:
+ logging.warning('No Last-Modified header; will not set modification time.')
+ except ValueError as e:
+ logging.error(f'Failed to parse Last-Modified header "{last_modified_str}": {e}')
+
+
class RequestStatusError(requests.HTTPError):
def __init__(self, message):
self.message = message
diff --git a/iyp/crawlers/bgpkit/__init__.py b/iyp/crawlers/bgpkit/__init__.py
index 523d078..e5d8e3b 100644
--- a/iyp/crawlers/bgpkit/__init__.py
+++ b/iyp/crawlers/bgpkit/__init__.py
@@ -1,11 +1,10 @@
import bz2
import json
-import logging
-from datetime import datetime, timezone
import requests
-from iyp import BaseCrawler, RequestStatusError
+from iyp import (BaseCrawler, RequestStatusError,
+ set_modification_time_from_last_modified_header)
class AS2RelCrawler(BaseCrawler):
@@ -23,17 +22,7 @@ def run(self):
if req.status_code != 200:
raise RequestStatusError(f'Error while fetching AS relationships: {req.status_code}')
- try:
- last_modified_str = req.headers['Last-Modified']
- # All HTTP dates are in UTC:
- # https://www.rfc-editor.org/rfc/rfc2616#section-3.3.1
- last_modified = datetime.strptime(last_modified_str,
- '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
- self.reference['reference_time_modification'] = last_modified
- except KeyError:
- logging.warning('No Last-Modified header; will not set modification time.')
- except ValueError as e:
- logging.error(f'Failed to parse Last-Modified header "{last_modified_str}": {e}')
+ set_modification_time_from_last_modified_header(self.reference, req)
rels = []
asns = set()
diff --git a/iyp/crawlers/bgpkit/pfx2asn.py b/iyp/crawlers/bgpkit/pfx2asn.py
index 3f150b2..76d0b69 100644
--- a/iyp/crawlers/bgpkit/pfx2asn.py
+++ b/iyp/crawlers/bgpkit/pfx2asn.py
@@ -4,11 +4,11 @@
import logging
import os
import sys
-from datetime import datetime, timezone
import requests
-from iyp import BaseCrawler, RequestStatusError
+from iyp import (BaseCrawler, RequestStatusError,
+ set_modification_time_from_last_modified_header)
URL = 'https://data.bgpkit.com/pfx2as/pfx2as-latest.json.bz2'
ORG = 'BGPKIT'
@@ -25,17 +25,7 @@ def run(self):
if req.status_code != 200:
raise RequestStatusError(f'Error while fetching pfx2as relationships: {req.status_code}')
- try:
- last_modified_str = req.headers['Last-Modified']
- # All HTTP dates are in UTC:
- # https://www.rfc-editor.org/rfc/rfc2616#section-3.3.1
- last_modified = datetime.strptime(last_modified_str,
- '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
- self.reference['reference_time_modification'] = last_modified
- except KeyError:
- logging.warning('No Last-Modified header; will not set modification time.')
- except ValueError as e:
- logging.error(f'Failed to parse Last-Modified header "{last_modified_str}": {e}')
+ set_modification_time_from_last_modified_header(self.reference, req)
entries = []
asns = set()
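The shared helper simply parses the RFC 2616 date format carried by the Last-Modified header; a short sketch of that parsing step, with an illustrative header value:

from datetime import datetime, timezone

last_modified_str = 'Wed, 14 Feb 2024 05:16:04 GMT'  # example header value
# %Z matches the literal 'GMT' but leaves the result naive, hence the replace().
last_modified = datetime.strptime(last_modified_str,
                                  '%a, %d %b %Y %H:%M:%S %Z').replace(tzinfo=timezone.utc)
print(last_modified.isoformat())  # 2024-02-14T05:16:04+00:00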
From 25ae40220f54dace31abeee1665cfc2cd7ce2b57 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 05:17:59 +0000
Subject: [PATCH 18/22] Add info URL and modification timestamp for RIPE
---
iyp/crawlers/ripe/as_names.py | 7 +++++--
iyp/crawlers/ripe/atlas_measurements.py | 3 +++
iyp/crawlers/ripe/atlas_probes.py | 3 +++
iyp/crawlers/ripe/roa.py | 6 ++++--
4 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/iyp/crawlers/ripe/as_names.py b/iyp/crawlers/ripe/as_names.py
index bd4ea21..609cdca 100644
--- a/iyp/crawlers/ripe/as_names.py
+++ b/iyp/crawlers/ripe/as_names.py
@@ -5,7 +5,8 @@
import requests
-from iyp import BaseCrawler, RequestStatusError
+from iyp import (BaseCrawler, RequestStatusError,
+ set_modification_time_from_last_modified_header)
URL = 'https://ftp.ripe.net/ripe/asnames/asn.txt'
ORG = 'RIPE NCC'
@@ -19,7 +20,9 @@ def run(self):
req = requests.get(URL)
if req.status_code != 200:
- raise RequestStatusError('Error while fetching AS names')
+ raise RequestStatusError(f'Error while fetching AS names: {req.status_code}')
+
+ set_modification_time_from_last_modified_header(self.reference, req)
lines = []
asns = set()
diff --git a/iyp/crawlers/ripe/atlas_measurements.py b/iyp/crawlers/ripe/atlas_measurements.py
index 1987e6f..25c7b77 100644
--- a/iyp/crawlers/ripe/atlas_measurements.py
+++ b/iyp/crawlers/ripe/atlas_measurements.py
@@ -24,6 +24,9 @@ class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
self.__initialize_session()
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://atlas.ripe.net/docs/apis/rest-api-manual/measurements/'
+ # Atlas API is real-time, i.e., we can use the same timestamp.
+ self.reference['reference_time_modification'] = self.reference['reference_time_fetch']
def __initialize_session(self) -> None:
self.session = Session()
diff --git a/iyp/crawlers/ripe/atlas_probes.py b/iyp/crawlers/ripe/atlas_probes.py
index 03c1ddb..2a94c61 100644
--- a/iyp/crawlers/ripe/atlas_probes.py
+++ b/iyp/crawlers/ripe/atlas_probes.py
@@ -24,6 +24,9 @@ class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
self.__initialize_session()
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://atlas.ripe.net/docs/apis/rest-api-manual/probes/'
+ # Atlas API is real-time, i.e., we can use the same timestamp.
+ self.reference['reference_time_modification'] = self.reference['reference_time_fetch']
def __initialize_session(self) -> None:
self.session = Session()
diff --git a/iyp/crawlers/ripe/roa.py b/iyp/crawlers/ripe/roa.py
index b4a62af..aeaae42 100644
--- a/iyp/crawlers/ripe/roa.py
+++ b/iyp/crawlers/ripe/roa.py
@@ -4,7 +4,7 @@
import os
import sys
from collections import defaultdict
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from io import BytesIO
import requests
@@ -23,7 +23,7 @@ class Crawler(BaseCrawler):
def __init__(self, organization, url, name):
"""Initialize IYP and statements for pushed data."""
- now = datetime.utcnow()
+ now = datetime.now(tz=timezone.utc)
self.date_path = f'{now.year}/{now.month:02d}/{now.day:02d}'
# Check if today's data is available
@@ -36,6 +36,8 @@ def __init__(self, organization, url, name):
logging.warning("Using yesterday's data: " + self.date_path)
super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://rpki-study.github.io/rpki-archive/'
+ self.reference['reference_time_modification'] = now.replace(hour=0, minute=0, second=0)
def run(self):
"""Fetch data from RIPE and push to IYP."""
From b5b3bf4fc95f3514d9b5b3abb8afad2492c621c2 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 05:35:49 +0000
Subject: [PATCH 19/22] Add info URL and modification timestamp for Stanford
---
iyp/crawlers/stanford/asdb.py | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/iyp/crawlers/stanford/asdb.py b/iyp/crawlers/stanford/asdb.py
index 9aaecca..b32186c 100644
--- a/iyp/crawlers/stanford/asdb.py
+++ b/iyp/crawlers/stanford/asdb.py
@@ -4,7 +4,7 @@
import os
import re
import sys
-from datetime import datetime
+from datetime import datetime, timezone
import bs4
import requests
@@ -32,6 +32,19 @@ def get_latest_asdb_dataset_url(asdb_stanford_data_url: str, file_name_format: s
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://asdb.stanford.edu/'
+ self.__set_modification_time_from_url()
+
+ def __set_modification_time_from_url(self):
+ fmt = 'https://asdb.stanford.edu/data/%Y-%m_categorized_ases.csv'
+ try:
+ date = datetime.strptime(URL, fmt).replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except ValueError as e:
+ logging.warning(f'Failed to set modification time: {e}')
+
def run(self):
"""Fetch the ASdb file and push it to IYP."""
From 51f411e8260bf04e4e38c0c94d741f77b62c8e1b Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 05:38:13 +0000
Subject: [PATCH 20/22] Add info URL and modification timestamp for Tranco
---
iyp/crawlers/tranco/top1M.py | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git a/iyp/crawlers/tranco/top1M.py b/iyp/crawlers/tranco/top1M.py
index e4b1923..6b4eab8 100644
--- a/iyp/crawlers/tranco/top1M.py
+++ b/iyp/crawlers/tranco/top1M.py
@@ -7,7 +7,8 @@
import requests
-from iyp import BaseCrawler, RequestStatusError
+from iyp import (BaseCrawler, RequestStatusError,
+ set_modification_time_from_last_modified_header)
# URL to Tranco top 1M
URL = 'https://tranco-list.eu/top-1m.csv.zip'
@@ -16,6 +17,19 @@
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://tranco-list.eu/methodology'
+
+ def __set_data_url(self):
+ """Set the data URL using the permanent ID of the current list, which stays
+ valid once the permalink is updated."""
+ try:
+ res = requests.get('https://tranco-list.eu/top-1m-id')
+ res.raise_for_status()
+ self.reference['reference_url_data'] = f'https://tranco-list.eu/download_daily/{res.text}'
+ except requests.HTTPError as e:
+ logging.warning(f'Failed to update data URL: {e}')
def run(self):
"""Fetch Tranco top 1M and push to IYP."""
@@ -27,6 +41,9 @@ def run(self):
if req.status_code != 200:
raise RequestStatusError('Error while fetching Tranco csv file')
+ set_modification_time_from_last_modified_header(self.reference, req)
+ self.__set_data_url()
+
links = []
domains = set()
# open zip file and read top list
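The new __set_data_url step swaps the moving top-1m.csv.zip permalink for a download URL tied to the list ID that was current at fetch time, so the reference keeps pointing at the exact list that was imported. Roughly (the list ID shown is made up, and stripping whitespace is a defensive choice of this sketch, not part of the patch):

import logging
import requests

try:
    res = requests.get('https://tranco-list.eu/top-1m-id')
    res.raise_for_status()
    list_id = res.text.strip()  # e.g. 'K25GW' (hypothetical ID)
    reference_url_data = f'https://tranco-list.eu/download_daily/{list_id}'
    print(reference_url_data)
except requests.HTTPError as e:
    logging.warning(f'Failed to update data URL: {e}')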
From 9d79c1198c9f38d603a27730788a77bffea83d8e Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Wed, 14 Feb 2024 05:52:33 +0000
Subject: [PATCH 21/22] Add info URL and modification timestamp for Virginia
Tech
---
iyp/crawlers/virginiatech/rovista.py | 27 +++++++++++++++++++++------
1 file changed, 21 insertions(+), 6 deletions(-)
diff --git a/iyp/crawlers/virginiatech/rovista.py b/iyp/crawlers/virginiatech/rovista.py
index a11b4a9..b72e734 100644
--- a/iyp/crawlers/virginiatech/rovista.py
+++ b/iyp/crawlers/virginiatech/rovista.py
@@ -2,6 +2,7 @@
import logging
import os
import sys
+from datetime import datetime, timezone
import requests
@@ -13,10 +14,21 @@
class Crawler(BaseCrawler):
+ def __init__(self, organization, url, name):
+ super().__init__(organization, url, name)
+ self.reference['reference_url_info'] = 'https://rovista.netsecurelab.org/'
+
+ def __set_modification_time(self, entry):
+ try:
+ date_str = entry['lastUpdatedDate']
+ date = datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)
+ self.reference['reference_time_modification'] = date
+ except (KeyError, ValueError) as e:
+ logging.warning(f'Failed to set modification time: {e}')
def run(self):
"""Get RoVista data from their API."""
- batch_size = 1000 # Adjust batch size as needed
+ batch_size = 1000
offset = 0
entries = []
asns = set()
@@ -25,26 +37,29 @@ def run(self):
# Make a request with the current offset
response = requests.get(URL, params={'offset': offset, 'count': batch_size})
if response.status_code != 200:
- raise RequestStatusError('Error while fetching RoVista data')
+ raise RequestStatusError(f'Error while fetching RoVista data: {response.status_code}')
data = response.json().get('data', [])
for entry in data:
+ if not self.reference['reference_time_modification']:
+ self.__set_modification_time(entry)
asns.add(entry['asn'])
if entry['ratio'] > 0.5:
- entries.append({'asn': entry['asn'], 'ratio': entry['ratio'], 'label': 'Validating RPKI ROV'})
+ entries.append({'asn': entry['asn'], 'ratio': entry['ratio']})
else:
- entries.append({'asn': entry['asn'], 'ratio': entry['ratio'], 'label': 'Not Validating RPKI ROV'})
+ entries.append({'asn': entry['asn'], 'ratio': entry['ratio']})
# Move to the next page
offset += 1
# Break the loop if there's no more data
if len(data) < batch_size:
break
+
logging.info('Pushing nodes to neo4j...')
# get ASNs and prefixes IDs
self.asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn', asns)
- tag_id_not_valid = self.iyp.get_node('Tag', {'label': 'Not Validating RPKI ROV'}, create=True)
- tag_id_valid = self.iyp.get_node('Tag', {'label': 'Validating RPKI ROV'}, create=True)
+ tag_id_not_valid = self.iyp.get_node('Tag', {'label': 'Not Validating RPKI ROV'})
+ tag_id_valid = self.iyp.get_node('Tag', {'label': 'Validating RPKI ROV'})
# Compute links
links = []
for entry in entries:
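The RoVista modification time is taken from the lastUpdatedDate field of the first record returned by the API. A sketch of that conversion with a made-up record:

import logging
from datetime import datetime, timezone

entry = {'asn': 2497, 'ratio': 0.9, 'lastUpdatedDate': '2024-02-13'}  # hypothetical record
try:
    date = datetime.strptime(entry['lastUpdatedDate'], '%Y-%m-%d').replace(tzinfo=timezone.utc)
    print(date)  # 2024-02-13 00:00:00+00:00
except (KeyError, ValueError) as e:
    logging.warning(f'Failed to set modification time: {e}')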
From fd6f8be2b30ee9f28477fae072c64884f1d40d67 Mon Sep 17 00:00:00 2001
From: Malte Tashiro
Date: Thu, 15 Feb 2024 04:10:37 +0000
Subject: [PATCH 22/22] Add timezone support to Alice-LG crawler
AMS-IX does not use UTC.
---
iyp/crawlers/alice_lg/__init__.py | 24 +++++++++++++++---------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git a/iyp/crawlers/alice_lg/__init__.py b/iyp/crawlers/alice_lg/__init__.py
index 8f7f685..d7bbdfe 100644
--- a/iyp/crawlers/alice_lg/__init__.py
+++ b/iyp/crawlers/alice_lg/__init__.py
@@ -3,7 +3,7 @@
import os
from collections import defaultdict
from concurrent.futures import as_completed
-from datetime import datetime, timezone
+from datetime import datetime
from json import JSONDecodeError
from typing import Iterable, Tuple
@@ -213,16 +213,22 @@ def __fetch_neighbors(self) -> None:
# valid ISO format...
try:
pre, suf = cached_at_str.rsplit('.', maxsplit=1)
- if not suf or not suf.endswith('Z'):
- raise ValueError(f'Fractional seconds missing or timestamp not ending with "Z" (not UTC): '
- f'{cached_at_str}')
- suf = suf[:-1] # Strip "Z".
- if not suf.isdigit():
+ if suf.endswith('Z'):
+ # UTC
+ frac_seconds = suf[:-1]
+ tz_suffix = '+00:00'
+ elif '+' in suf:
+ # Hopefully a timezone identifier of form +HH:MM
+ frac_seconds, tz_suffix = suf.split('+')
+ tz_suffix = '+' + tz_suffix
+ else:
+ raise ValueError(f'Failed to get timezone from timestamp: {cached_at_str}')
+ if not frac_seconds.isdigit():
raise ValueError(f'Fractional seconds are not digits: {cached_at_str}')
# Reduce to six digits (ms).
- suf = suf[:6]
- cached_at_str = f'{pre}.{suf}'
- cached_at = datetime.fromisoformat(cached_at_str).replace(tzinfo=timezone.utc)
+ frac_seconds = frac_seconds[:6]
+ cached_at_str = f'{pre}.{frac_seconds}{tz_suffix}'
+ cached_at = datetime.fromisoformat(cached_at_str)
except ValueError as e:
logging.warning(f'Failed to get cached_at timestamp for routeserver "{routeserver_id}": {e}')
if cached_at:
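The reworked parsing accepts both UTC timestamps ending in 'Z' and timestamps carrying an explicit '+HH:MM' offset, and it trims the fractional seconds to the six digits that datetime.fromisoformat() accepts. A standalone sketch of that logic (the example timestamps are illustrative; negative offsets still raise, matching the patch):

from datetime import datetime


def parse_cached_at(cached_at_str: str) -> datetime:
    pre, suf = cached_at_str.rsplit('.', maxsplit=1)
    if suf.endswith('Z'):
        # UTC
        frac_seconds, tz_suffix = suf[:-1], '+00:00'
    elif '+' in suf:
        # Hopefully a timezone identifier of the form +HH:MM
        frac_seconds, tz_suffix = suf.split('+')
        tz_suffix = '+' + tz_suffix
    else:
        raise ValueError(f'Failed to get timezone from timestamp: {cached_at_str}')
    if not frac_seconds.isdigit():
        raise ValueError(f'Fractional seconds are not digits: {cached_at_str}')
    # Reduce to six digits (microseconds) so fromisoformat() can parse it.
    return datetime.fromisoformat(f'{pre}.{frac_seconds[:6]}{tz_suffix}')


print(parse_cached_at('2024-02-15T04:10:37.123456789Z'))       # UTC input
print(parse_cached_at('2024-02-15T05:10:37.123456789+01:00'))  # offset input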