Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bogon.py updates #45

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 24 additions & 42 deletions kartograf/bogon.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@
# "fec0::/10",
]

# Sets of networks
SPECIAL_IPV4_NETWORKS = {ipaddress.ip_network(prefix) for prefix in SPECIAL_IPV4}
SPECIAL_IPV6_NETWORKS = {ipaddress.ip_network(prefix) for prefix in SPECIAL_IPV6}

def is_bogon_pfx(prefix):
"""
Expand All @@ -134,18 +137,8 @@ def is_bogon_pfx(prefix):
- https://bgpfilterguide.nlnog.net/guides/bogon_prefixes/
"""
network = ipaddress.ip_network(prefix)
version = network.version

if version == 4:
for ipv4_range in SPECIAL_IPV4:
if network.subnet_of(ipaddress.ip_network(ipv4_range)):
return True
elif version == 6:
for ipv6_range in SPECIAL_IPV6:
if network.subnet_of(ipaddress.ip_network(ipv6_range)):
return True

return False
networks = SPECIAL_IPV4_NETWORKS if network.version == 4 else SPECIAL_IPV6_NETWORKS
return any(network.subnet_of(special_net) for special_net in networks)


def is_bogon_asn(asn_raw):
Expand All @@ -159,38 +152,27 @@ def is_bogon_asn(asn_raw):
"""
asn = extract_asn(asn_raw)

if asn == 0:
# AS 0 is reserved, RFC7607
return True
if asn == 65535:
# Last 16 bit ASN, RFC7300
return True
if asn == 4294967295:
# Last 32 bit ASN, RFC7300
return True
if asn == 112:
# AS 112 is used by the AS112 project to sink misdirected DNS queries,
# RFC7534
return True
if asn == 23456:
# AS 23456 is reserved as AS_TRANS, RFC6793
return True
if 64496 <= asn <= 64511:
# AS 64496-64511 are reserved for documentation and sample code,
# RFC5398
return True
if 64512 <= asn <= 65534 or 4200000000 <= asn <= 4294967294:
# AS 64512-65534 and AS 4200000000-4294967294 are reserved for private
# use, RFC6996
return True
if 65536 <= asn <= 65551:
# AS 65536-65551 are reserved for documentation and sample code,
# RFC5398
return True
if 65552 <= asn <= 131071:
# IANA reserved ASNs, no RFC
reserved_asns = {
0, # Reserved, RFC7607
112, # AS112 project, RFC7534
23456, # AS_TRANS, RFC6793
65535, # Last 16 bit ASN, RFC7300
4294967295 # Last 32 bit ASN, RFC7300
}

if asn in reserved_asns:
return True

reserved_asn_ranges = {
(64496, 64511), # Documentation/Sample, RFC5398
(64512, 65534), # Private use, RFC6996
(65536, 65551), # Documentation/Sample, RFC5398
(65552, 131071), # IANA reserved, no RFC
(4200000000, 4294967294) # Private use, RFC6996
}

if any(start <= asn <= end for start, end in reserved_asn_ranges):
return True
return False


Expand Down
12 changes: 9 additions & 3 deletions kartograf/collectors/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx
from kartograf.util import parse_pfx


@timed
Expand All @@ -23,13 +23,16 @@ def parse_routeviews_pfx2as(context):
if ',' not in line and '_' not in line:
# Still need to check for bogons
prefix, asn = line.split(" ")
prefix = format_pfx(prefix)
prefix = parse_pfx(prefix)
asn = asn.upper().rstrip('\n')

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
continue

if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"Routeviews: parser encountered an invalid IP network: {prefix}")
continue

clean.write(f"{prefix} {asn}\n")
Expand All @@ -49,10 +52,13 @@ def parse_routeviews_pfx2as(context):
# Bogon prefixes and ASNs are excluded since they can not be used
# for routing.
prefix, asn = line.split(" ")
prefix = format_pfx(prefix)
prefix = parse_pfx(prefix)
asn = asn.upper().rstrip('\n')

if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"Routeviews: parser encountered an invalid IP network: {prefix}")
continue

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
Expand Down
7 changes: 5 additions & 2 deletions kartograf/irr/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx, rir_from_str
from kartograf.util import parse_pfx, rir_from_str


@timed
Expand Down Expand Up @@ -62,7 +62,7 @@ def parse_irr(context):
else:
continue

route = format_pfx(route)
route = parse_pfx(route)

last_modified = datetime.strptime(entry["last-modified"], '%Y-%m-%dT%H:%M:%SZ')
last_modified = last_modified.replace(tzinfo=timezone.utc)
Expand All @@ -71,6 +71,9 @@ def parse_irr(context):
# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
if not route or is_bogon_pfx(route) or is_bogon_asn(origin):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"IRR: parser encountered an invalid route: {route}")
continue

if context.max_encode and is_out_of_encoding_range(origin, context.max_encode):
Expand Down
7 changes: 5 additions & 2 deletions kartograf/rpki/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx
from kartograf.util import parse_pfx


@timed
Expand Down Expand Up @@ -55,12 +55,15 @@ def parse_rpki(context):
valid_since = roa['valid_since']

for vrp in roa['vrps']:
prefix = format_pfx(vrp['prefix'])
asn = vrp['asid']
prefix = parse_pfx(vrp['prefix'])

# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}")
continue

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
Expand Down
21 changes: 11 additions & 10 deletions kartograf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def wait_for_launch(wait):
time.sleep(1)


def format_pfx(pfx):
def parse_pfx(pfx):
"""
Attempt to format an IP network or address.
If invalid, return None.
Expand Down Expand Up @@ -163,12 +163,13 @@ def get_root_network(pfx):
Extract the top-level network from an IPv4 or IPv6 address.
Returns the value as an integer.
"""
network = format_pfx(pfx)
v = ipaddress.ip_network(network).version
if v == 4:
return int(network.split(".", maxsplit=1)[0])

root_net = network.split(":", maxsplit=1)[0]
if root_net:
return int(root_net, 16)
return 0
network = parse_pfx(pfx)
jurraca marked this conversation as resolved.
Show resolved Hide resolved
if network:
v = ipaddress.ip_network(network).version
if v == 4:
return int(network.split(".", maxsplit=1)[0])

root_net = network.split(":", maxsplit=1)[0]
if root_net:
return int(root_net, 16)
return None
81 changes: 81 additions & 0 deletions tests/bogon_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from kartograf.bogon import is_bogon_pfx, is_bogon_asn, extract_asn, is_out_of_encoding_range
from kartograf.util import parse_pfx

def test_special_asns():
special_cases = [0, 112, 23456, 65535, 4294967295]
for asn in special_cases:
assert is_bogon_asn(asn) is True
assert is_bogon_asn(f"AS{asn}") is True

def test_documentation_range():
assert is_bogon_asn(64496) is True
assert is_bogon_asn(64511) is True
assert is_bogon_asn(64503) is True

def test_private_use_range():
assert is_bogon_asn(64512) is True
assert is_bogon_asn(65534) is True
assert is_bogon_asn(4200000000) is True
assert is_bogon_asn(4294967294) is True

def test_valid_asns():
valid_asns = [13335, 15169, 32934, 16509] # Cloudflare, Google, Facebook, Amazon
for asn in valid_asns:
assert is_bogon_asn(asn) is False
assert is_bogon_asn(f"AS{asn}") is False

def test_asn_extraction():
assert extract_asn("AS12345") == 12345
assert extract_asn("as12345") == 12345
assert extract_asn(" AS12345 ") == 12345
assert extract_asn(12345) == 12345

def test_encoding_range():
assert is_out_of_encoding_range(33521665) is True
assert is_out_of_encoding_range(33521664) is False
assert is_out_of_encoding_range(100) is False

def test_bogon_ipv4_prefixes():
bogon_prefixes = [
"0.0.0.0/8", # RFC791
"10.0.0.0/8", # RFC1918 Private-Use
"127.0.0.0/8", # Loopback
"169.254.0.0/16", # Link Local
"192.168.0.0/16", # Private-Use
"224.0.0.0/4", # Multicast
"240.0.0.0/4" # Reserved
]
for prefix in bogon_prefixes:
assert is_bogon_pfx(prefix) is True

def test_valid_ipv4_prefixes():
valid_prefixes = [
"8.8.8.0/24", # Google DNS
"1.1.1.0/24", # Cloudflare
"104.16.0.0/12", # Cloudflare range
"157.240.0.0/16" # Facebook
]
for prefix in valid_prefixes:
assert is_bogon_pfx(prefix) is False

def test_bogon_ipv6_prefixes():
bogon_prefixes = [
"::/8", # IPv4-compatible
"100::/64", # Discard-Only
"2001:db8::/32", # Documentation
"fc00::/7", # Unique-Local
"fe80::/10", # Link-Local Unicast
"ff00::/8" # Multicast
]
for prefix in bogon_prefixes:
assert is_bogon_pfx(prefix) is True

def test_valid_ipv6_prefixes():
valid_prefixes = [
"2606:4700::/32", # Cloudflare
"2620:fe::/48", # Google
"2a03:2880::/32" # Facebook
]
for prefix in valid_prefixes:
network = parse_pfx(prefix)
assert is_bogon_pfx(network) is False
38 changes: 21 additions & 17 deletions tests/util_test.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,59 @@
from kartograf.util import format_pfx, is_valid_pfx, get_root_network
from kartograf.util import parse_pfx, is_valid_pfx, get_root_network

def test_valid_ipv4_network():
pfx = "192.144.11.0/24"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv4_addr():
pfx = "192.144.11.0"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv6_network():
pfx = "2001:db8::/64"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv6_addr():
pfx = "2001:db8::1"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_invalid_ip_network():
pfx = "192.1/asdf"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None


def test_invalid_input():
pfx = "no.slash"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None


def test_invalid_prefixes():
invalid_prefixes = [
"not.a.prefix",
"300.0.0.0/8", # Invalid IPv4
"2001:xyz::/32" # Invalid IPv6
]
for prefix in invalid_prefixes:
assert is_valid_pfx(prefix) is False


def test_private_network():
pfx = "0.128.0.0/24"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_ipv4_prefix_with_leading_zeros():
pfx = "010.10.00.00/16"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None
assert not is_valid_pfx(pfx)


def test_ipv6_prefix_with_leading_zeros():
pfx = "001:db8::0/24"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None
assert not is_valid_pfx(pfx)


Expand All @@ -53,10 +63,4 @@ def test_get_root_network():
ipv6 = "2001:db8::/64"
assert get_root_network(ipv6) == int("2001", 16)
invalid = "not.a.network"
# TODO: use pytest
try:
get_root_network(invalid)
except ValueError:
assert True
else:
assert False
assert get_root_network(invalid) is None
Loading