Skip to content

Commit

Permalink
Fix: Check validity of all IP addresses assigned to node interfaces
Browse files Browse the repository at this point in the history
This fix extends the 'host bits' check to all IP addresses assigned to
node interfaces and verifies that the interface IP addresses are not
multicast addresses.

Closes #1779, #1778
  • Loading branch information
ipspace committed Jan 28, 2025
1 parent 58c076f commit ace97b9
Showing 1 changed file with 56 additions and 23 deletions.
79 changes: 56 additions & 23 deletions netsim/augment/links.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,29 +518,44 @@ def get_nth_ip_from_prefix(pfx: netaddr.IPNetwork, n_id: int) -> str:
check_interface_host_bits: Check whether the IP addresses on an interface have host bits. The check is disabled
for special prefixes (IPv4: /31, /32, IPv6: /127, /128)
"""
def check_interface_host_bits(intf: Box, node: Box) -> bool:
def check_interface_host_bits(intf: Box, node: Box, link: typing.Optional[Box] = None) -> bool:
OK = True

if 'ifname' in intf:
intf_name = f'{intf.ifname}({intf.name})' if 'name' in intf else intf.ifname
elif link is not None:
intf_name = link._linkname
else:
intf_name = ''

for af in log.AF_LIST:
if af not in intf: # Skip unusued AFs
continue
if not isinstance(intf[af],str): # Skip unnumbered interfaces
continue
pfx = netaddr.IPNetwork(intf[af])
if pfx[0].is_multicast():
log.error(
f'Interfaces cannot have multicast {af} addresses ({intf[af]} on {node.name}/{intf_name})',
log.IncorrectValue,
'links')
OK = False
continue

if pfx.last <= pfx.first + 1: # Are we dealing with special prefix (loopback or /31)
continue # ... then it's OK not to have host bits

if str(pfx) == str(pfx.cidr): # Does the IP address have host bits -- is it different from its CIDR subnet?
log.error(
f'Address {intf[af]} on interface {intf.ifname}/node {node.name} does not contain host bits',
f'Address {intf[af]} on node {node.name}/{intf_name} does not contain host bits',
log.IncorrectValue,
'links')
OK = False
continue

if str(pfx[-1]) == str(pfx.ip) and af == 'ipv4':
log.error(
f'Address {intf[af]} on interface {intf.ifname}/node {node.name} is a subnet broadcast address',
f'Address {intf[af]} on node {node.name}/{intf_name} is a subnet broadcast address',
log.IncorrectValue,
'links')
OK = False
Expand Down Expand Up @@ -571,17 +586,7 @@ def set_interface_address(intf: Box, af: str, pfx: netaddr.IPNetwork, node_id: i
'links')
return False

if str(intf_pfx) != str(intf_pfx.cidr): # Does the IP address have host bits -- is it different from its CIDR subnet?
return True # That's it -- the user knows what she's doing

if intf_pfx.last <= intf_pfx.first + 1: # Are we dealing with special prefix (loopback or /31)
return True # ... then it's OK not to have host bits

log.error(
f'Address {intf[af]} for node {intf.node} does not contain host bits',
log.IncorrectValue,
'links')
return False
return True # Further checks done in check_interface_host_bits

# No static interface address, or static address specified as relative node_id
try:
Expand All @@ -601,7 +606,8 @@ def set_interface_address(intf: Box, af: str, pfx: netaddr.IPNetwork, node_id: i
If the interface address is set, validate that it's a valid address (can't be int)
"""
def IPAM_unnumbered(link: Box, af: str, pfx: typing.Optional[bool], ndict: Box) -> None:
def IPAM_unnumbered(link: Box, af: str, pfx: typing.Optional[bool], ndict: Box) -> bool:
OK = True
for intf in link.interfaces:
if not af in intf: # No static address
if isinstance(pfx,bool): # Set it to link bool value if it exists
Expand All @@ -615,29 +621,43 @@ def IPAM_unnumbered(link: Box, af: str, pfx: typing.Optional[bool], ndict: Box)
f'Node {intf.node} is using host index {intf[af]} for {af} on an unnumbered link',
log.IncorrectValue,
'links')
OK = False

return OK

def IPAM_sequential(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> None:
def IPAM_sequential(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> bool:
start = 1 if pfx.last != pfx.first + 1 else 0
gwid = get_gateway_id(link)
OK = True
for count,intf in enumerate(link.interfaces):
if count + start == gwid: # Would the next address overlap with gateway ID
start = start + 1 # ... no big deal, just move the starting point ;)
set_interface_address(intf,af,pfx,count+start)
OK = set_interface_address(intf,af,pfx,count+start) and OK

return OK

def IPAM_p2p(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> None:
def IPAM_p2p(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> bool:
start = 1 if pfx.last != pfx.first + 1 else 0
OK = True
for count,intf in enumerate(sorted(link.interfaces, key=lambda intf: intf.node)):
set_interface_address(intf,af,pfx,count+start)
OK = set_interface_address(intf,af,pfx,count+start) and OK

def IPAM_id_based(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> None:
return OK

def IPAM_id_based(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> bool:
OK = True
for intf in link.interfaces:
set_interface_address(intf,af,pfx,ndict[intf.node].id)
OK = set_interface_address(intf,af,pfx,ndict[intf.node].id) and OK

return OK

def IPAM_loopback(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> None:
def IPAM_loopback(link: Box, af: str, pfx: netaddr.IPNetwork, ndict: Box) -> bool:
for intf in link.interfaces:
pfx.prefixlen = 128 if ':' in str(pfx) else 32
intf[af] = str(pfx)

return True

IPAM_dispatch: typing.Final[dict] = {
'unnumbered': IPAM_unnumbered,
'p2p': IPAM_p2p,
Expand Down Expand Up @@ -669,6 +689,7 @@ def assign_interface_addresses(link: Box, addr_pools: Box, ndict: Box, defaults:
link.pop('unnumbered')
return

error = False
for af in ('ipv4','ipv6'):
if not af in pfx_list: # Skip address families not used on the link
continue
Expand All @@ -686,6 +707,7 @@ def assign_interface_addresses(link: Box, addr_pools: Box, ndict: Box, defaults:
strings.extra_data_printout(f'{link}'),
log.IncorrectValue,
'links')
error = True
continue

if 'allocation' in pfx_list:
Expand All @@ -707,14 +729,25 @@ def assign_interface_addresses(link: Box, addr_pools: Box, ndict: Box, defaults:
more_hints=hints,
category=log.IncorrectValue,
module='links')
error = True
continue

if not allocation_policy in IPAM_dispatch:
log.error(
f'Invalid IP address allocation policy specified in prefix {pfx_list} found on {link._linkname}',
log.IncorrectValue,
'links')
IPAM_dispatch[allocation_policy](link,af,pfx_net,ndict) # execute IPAM policy to get AF addresses on interfaces
error = True

# execute IPAM policy to get AF addresses on interfaces
if not IPAM_dispatch[allocation_policy](link,af,pfx_net,ndict):
error = True

if error:
return

for intf in link.interfaces:
check_interface_host_bits(intf,ndict[intf.node],link)

"""
cleanup 'af: False' entries from interfaces
Expand Down

0 comments on commit ace97b9

Please sign in to comment.