Skip to content

Commit

Permalink
Merge pull request InternetHealthReport#136 from m-appel/112-better-formatting-checks-for-ip-prefixes-and-countries
Browse files Browse the repository at this point in the history

Ensure proper formatting of IPs and IP prefixes.
  • Loading branch information
romain-fontugne authored May 28, 2024
2 parents 08a820e + 579a252 commit ffaf2f2
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 82 deletions.
6 changes: 3 additions & 3 deletions iyp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import bz2
import glob
import ipaddress
import logging
import os
import pickle
Expand All @@ -16,9 +17,8 @@
prop_formatters = {
# asn is stored as an int
'asn': int,
# ipv6 is stored in lowercase
'ip': str.lower,
'prefix': str.lower,
'ip': lambda s: ipaddress.ip_address(s).compressed,
'prefix': lambda s: ipaddress.ip_network(s).compressed,
# country code is kept in capital letter
'country_code': lambda s: str.upper(str.strip(s))
}
Expand Down
11 changes: 9 additions & 2 deletions iyp/crawlers/bgpkit/pfx2asn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import os
import sys
from ipaddress import ip_network

import requests

Expand Down Expand Up @@ -32,7 +33,13 @@ def run(self):
prefixes = set()

for entry in json.load(bz2.open(req.raw)):
prefixes.add(entry['prefix'])
try:
prefix = ip_network(entry['prefix']).compressed
except ValueError as e:
logging.warning(f'Ignoring malformed prefix: "{entry["prefix"]}": {e}')
continue
entry['prefix'] = prefix
prefixes.add(prefix)
asns.add(entry['asn'])
entries.append(entry)

Expand All @@ -49,7 +56,7 @@ def run(self):
asn_qid = self.asn_id[entry['asn']]
prefix_qid = self.prefix_id[entry['prefix']]

links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]}) # Set AS name
links.append({'src_id': asn_qid, 'dst_id': prefix_qid, 'props': [self.reference, entry]})

logging.info('Pushing links to neo4j...')
# Push all links to IYP
Expand Down
15 changes: 10 additions & 5 deletions iyp/crawlers/bgptools/anycast_prefixes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import sys
import tempfile
from ipaddress import ip_network

import requests

Expand Down Expand Up @@ -82,11 +83,15 @@ def update(self, res, filename: str):

with open(filename, 'r') as file:
for line in file:
line = line.strip()
prefixes.add(line)
lines.append(line)

prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)
try:
prefix = ip_network(line.strip()).compressed
except ValueError as e:
logging.warning(f'Ignoring malformed prefix: "{line.strip()}": {e}')
continue
prefixes.add(prefix)
lines.append(prefix)

prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)
tag_id = self.iyp.get_node('Tag', {'label': 'Anycast'})

links = []
Expand Down
8 changes: 7 additions & 1 deletion iyp/crawlers/ihr/rov.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
from datetime import timezone
from ipaddress import ip_network

import arrow
import lz4.frame
Expand Down Expand Up @@ -99,7 +100,12 @@ def run(self):
rec['visibility'] = float(rec['visibility'])
rec['af'] = int(rec['af'])

prefix = rec['prefix']
try:
prefix = ip_network(rec['prefix']).compressed
except ValueError as e:
logging.warning(f'Ignoring malformed prefix: "{rec["prefix"]}": {e}')
continue

if prefix not in prefix_id:
prefix_id[prefix] = self.iyp.get_node('Prefix', {'prefix': prefix})

Expand Down
198 changes: 129 additions & 69 deletions iyp/crawlers/nro/delegated_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
from collections import defaultdict
from datetime import datetime, timezone
from ipaddress import IPv4Address, IPv4Network

import requests

Expand All @@ -23,6 +24,35 @@ def __init__(self, organization, url, name):
super().__init__(organization, url, name)
self.reference['reference_url_info'] = 'https://www.nro.net/wp-content/uploads/nro-extended-stats-readme5.txt'

@staticmethod
def ffs(x):
"""Returns the index, counting from 0, of the least significant set bit in
`x`."""
return (x & -x).bit_length() - 1

@staticmethod
def decompose_prefix(ip, hosts):
# First address of this range
start = IPv4Address(ip)
# Last address of this range
stop = start + hosts - 1
remaining = int.from_bytes(stop.packed, byteorder='big') - int.from_bytes(start.packed, byteorder='big') + 1
next_address = start
while remaining > 0:
# Get the largest possible prefix length by checking the last bit set.
next_address_packed = int.from_bytes(next_address.packed, byteorder='big')
first_bit = Crawler.ffs(next_address_packed)
# Get the number of host bits required to cover the remaining addresses. If
# remaining is not a power of 2, round down to not cover too much.
required_host_bits = int(math.log2(remaining))
# Due to the last bit set in next_address, we can not choose more host bits,
# only less.
host_bits = min(first_bit, required_host_bits)
next_prefix = IPv4Network(f'{next_address}/{32 - host_bits}')
remaining -= next_prefix.num_addresses
next_address = next_prefix.broadcast_address + 1
yield str(next_prefix)

def run(self):
"""Fetch the delegated stat file from RIPE website and process lines one by
one."""
Expand All @@ -32,109 +62,139 @@ def run(self):
raise RequestStatusError('Error while fetching delegated file')

asn_id = self.iyp.batch_get_nodes_by_single_prop('AS', 'asn')
asn_in_iyp = sorted(asn_id.keys())

# Read delegated-stats file. see documentation:
# https://www.nro.net/wp-content/uploads/nro-extended-stats-readme5.txt
self.fields_name = ['registry', 'cc', 'type', 'start', 'value', 'date', 'status', 'opaque-id']

# Compute nodes
opaqueids = set()
prefixes = set()
countries = set()
asn_country_links = list()
prefix_country_links = list()
asn_status_links = defaultdict(list)
prefix_status_links = defaultdict(list)

logging.info('Parsing file')
for line in req.text.splitlines():
# skip comments
# Skip comments.
if line.strip().startswith('#'):
continue

fields_value = line.split('|')
# get modification time from version line
# Get modification time from version line.
if len(fields_value) == 7 and fields_value[0].isdigit():
try:
date = datetime.strptime(fields_value[5], '%Y%m%d').replace(tzinfo=timezone.utc)
self.reference['reference_time_modification'] = date
except ValueError as e:
logging.warning(f'Failed to set modification time: {e}')
# skip summary lines
# Skip summary lines.
if len(fields_value) < 8:
continue

# parse records
# Parse record lines.
rec = dict(zip(self.fields_name, fields_value))
rec['value'] = int(rec['value'])
rec['status'] = rec['status'].upper()

countries.add(rec['cc'])
opaqueids.add(rec['opaque-id'])

if rec['type'] == 'ipv4' or rec['type'] == 'ipv6':
# compute prefix length
prefix_len = rec['value']
if rec['type'] == 'ipv4':
prefix_len = int(32 - math.log2(rec['value']))

prefix = f"{rec['start']}/{prefix_len}"
prefixes.add(prefix)

# Create all nodes
logging.warning('Pushing nodes to neo4j...')
opaqueid_id = self.iyp.batch_get_nodes_by_single_prop('OpaqueID', 'id', opaqueids)
prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes)
country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries)

# Compute links
country_links = []
status_links = defaultdict(list)
additional_props = {'registry': rec['registry']}

for line in req.text.splitlines():
# skip comments
if line.strip().startswith('#'):
continue

# skip version and summary lines
fields_value = line.split('|')
if len(fields_value) < 8:
continue

# parse records
rec = dict(zip(self.fields_name, fields_value))
rec['value'] = int(rec['value'])

reference = dict(self.reference)
reference['registry'] = rec['registry']
country_qid = country_id[rec['cc']]
opaqueid_qid = opaqueid_id[rec['opaque-id']]

# ASN record
if rec['type'] == 'asn':
for i in range(int(rec['start']), int(rec['start']) + int(rec['value']), 1):
if i not in asn_id:
continue

asn_qid = asn_id[i]

country_links.append({'src_id': asn_qid, 'dst_id': country_qid, 'props': [reference]})
status_links[rec['status'].upper()].append(
{'src_id': asn_qid, 'dst_id': opaqueid_qid, 'props': [reference]})

# prefix record
start_asn = int(rec['start'])
asns_to_link = list()
if rec['value'] == 1 and start_asn in asn_id:
# Fast path.
asns_to_link = [start_asn]
elif rec['value'] > 1:
as_range_start = start_asn
as_range_end = start_asn + rec['value'] - 1
# Get overlap between existing ASes and the AS range specified in
# the record.
asns_to_link = [a for a in asn_in_iyp if as_range_start <= a <= as_range_end]
if asns_to_link:
# Only add if ASN is already present.
countries.add(rec['cc'])
opaqueids.add(rec['opaque-id'])
for i in asns_to_link:
asn_qid = asn_id[i]
asn_country_links.append({'src_id': asn_qid,
'dst_id': rec['cc'],
'props': [self.reference, additional_props]})
asn_status_links[rec['status']].append({'src_id': asn_qid,
'dst_id': rec['opaque-id'],
'props': [self.reference, additional_props]})
elif rec['type'] == 'ipv4' or rec['type'] == 'ipv6':

# compute prefix length
countries.add(rec['cc'])
opaqueids.add(rec['opaque-id'])
# Compute the prefix length.
# 'value' is a CIDR prefix length for IPv6, but a count of hosts for
# IPv4.
start = rec['start']
prefix_len = rec['value']
if rec['type'] == 'ipv4':
prefix_len = int(32 - math.log2(rec['value']))
# Some IPv4 prefixes are not CIDR aligned.
# Either their size is not a power of two and/or their start address
# is not aligned with their size.
needs_decomposition = False
prefix_len = 32 - math.log2(rec['value'])
if not prefix_len.is_integer():
needs_decomposition = True
else:
# Size is CIDR aligned
prefix = f'{start}/{int(prefix_len)}'
try:
IPv4Network(prefix)
except ValueError:
# Start address is not aligned.
needs_decomposition = True
if needs_decomposition:
# Decompose into CIDR prefixes.
record_prefixes = [prefix for prefix in self.decompose_prefix(start, rec['value'])]
else:
# Valid prefix, no decomposition required.
record_prefixes = [prefix]
else:
# IPv6 prefixes are always CIDR aligned.
prefix = f'{start}/{prefix_len}'
record_prefixes = [prefix]
# Add prefix(es) to IYP set.
prefixes.update(record_prefixes)
for prefix in record_prefixes:
# Create links for prefix(es)
prefix_country_links.append({'src_id': prefix,
'dst_id': rec['cc'],
'props': [self.reference, additional_props]})
prefix_status_links[rec['status']].append({'src_id': prefix,
'dst_id': rec['opaque-id'],
'props': [self.reference, additional_props]})

prefix = f"{rec['start']}/{prefix_len}"
prefix_qid = prefix_id[prefix]

country_links.append({'src_id': prefix_qid, 'dst_id': country_qid, 'props': [reference]})
status_links[rec['status'].upper()].append(
{'src_id': prefix_qid, 'dst_id': opaqueid_qid, 'props': [reference]})
# Create all nodes
opaqueid_id = self.iyp.batch_get_nodes_by_single_prop('OpaqueID', 'id', opaqueids, all=False)
prefix_id = self.iyp.batch_get_nodes_by_single_prop('Prefix', 'prefix', prefixes, all=False)
country_id = self.iyp.batch_get_nodes_by_single_prop('Country', 'country_code', countries, all=False)

# Replace with QIDs
for link in asn_country_links:
link['dst_id'] = country_id[link['dst_id']]
for link in prefix_country_links:
link['src_id'] = prefix_id[link['src_id']]
link['dst_id'] = country_id[link['dst_id']]
for links in asn_status_links.values():
for link in links:
link['dst_id'] = opaqueid_id[link['dst_id']]
for links in prefix_status_links.values():
for link in links:
link['src_id'] = prefix_id[link['src_id']]
link['dst_id'] = opaqueid_id[link['dst_id']]

logging.warning('Pushing links to neo4j...')
# Push all links to IYP
self.iyp.batch_add_links('COUNTRY', country_links)
for label, links in status_links.items():
self.iyp.batch_add_links('COUNTRY', asn_country_links)
self.iyp.batch_add_links('COUNTRY', prefix_country_links)
for label, links in asn_status_links.items():
self.iyp.batch_add_links(label, links)
for label, links in prefix_status_links.items():
self.iyp.batch_add_links(label, links)


Expand Down
6 changes: 6 additions & 0 deletions iyp/crawlers/pch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from ipaddress import ip_network
from multiprocessing import Pool
from typing import Iterable, Tuple

Expand Down Expand Up @@ -294,6 +295,11 @@ def run(self) -> None:
raw_links = defaultdict(set)
for collector_name, prefix_map in prefix_maps.items():
for prefix, asn_set in prefix_map.items():
try:
prefix = ip_network(prefix).compressed
except ValueError as e:
logging.warning(f'Ignoring malformed prefix: "{prefix}": {e}')
continue
ases.update(asn_set)
prefixes.add(prefix)
for asn in asn_set:
Expand Down
Loading

0 comments on commit ffaf2f2

Please sign in to comment.