Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FR] Add IPv6 Support to CidrMatch #77

Closed
wants to merge 27 commits into from
Closed

Conversation

eric-forte-elastic
Copy link
Collaborator

@eric-forte-elastic eric-forte-elastic commented Oct 19, 2023

Issues

Relates https://github.com/elastic/ia-trade-team/issues/196

Alternative PR

This PR is an alternative version of #80

Details

This PR adds support for IPv6 to the cidrMatch function, for more context see the related issues.

IPv4 vs IPv6

This PR attempts to follow the same logical flow for the IPv4 version of cidrMatch. However, there are a few required changes worth noting to support some IPv6 specific features.

Compressed notation

  • IPv6 supports compressed notation where trailing zeroes can be truncated to :: and leading zeros can be dropped.
    • Example: 2001:0db8:0000:0000:0000:0000:0000:0001 is the same as 2001:db8::1
  • The generated regex patterns in the current implementation support addresses in either notation style.
  • When the python engine comparisons are made, the compressed notation addresses are expanded in order to be able to appropriately determine if the address is in the cidr range.

Increased address space

  • IPv4 uses 32 bit addresses where IPv6 uses 128 bit addresses.

  • In Python3 there are a number of different ways to convert these addresses into numeric values (or similar objects) for bitwise/masking comparison

  • In Python2 however, the reliable, native method for doing this is by using the struct.unpack function which unpacks the byte string (original string input from they query) into an integer.

    For IPv4 one can use the following unpack the string into a single integer value

     ip_bytes = socket.inet_aton(ip_string)
     (subnet_int,) = struct.unpack(">L", ip_bytes)

    For IPv6 since the address space is 128 bit instead of 32, we are unpacking the string into two unsigned long integers instead and recombining them into a single integer (after expanding IPv6 string in case it is in compressed notation).

     ip_string = cls.expand_ipv6_address(ip_string)
     ip_bytes = socket.inet_pton(socket.AF_INET6, ip_string)
     high, low = struct.unpack('>QQ', ip_bytes)
     address_int = (high << 64) | low

IPv4 Code

For existing IPv4 code this PR attempts to edit this code as little as possible. There are cases where significant readability improvements could be made by editing the code. If desired, happy to make these updated in this PR.

For instance the following update to make_octet_re

Updated "make_octet_re"

    @classmethod
    def make_octet_re(cls, start, end):
        """Convert an octet-range into a regular expression."""
        combos = []

        if start == 0 and end == 255:
            return cls.octet_re

        # IPv4 octet range
        if start >= 0 and start < 255 and end <= 255:
            for o in range(start, end + 1):
                combos.append("{:d}".format(o))
        # IPv6 h16 range
        elif start >= 0 and end <= 65535:
            for h16 in range(start, end + 1):
                combos.append("{:d}".format(h16))
        else:
            raise ValueError("Invalid octet range")

        return "(?:{})".format("|".join(combos))

Testing

The majority of the testing can be performed via the new ipv6 labeled unit tests in test_python_engine.py and test_functions.py. However, additional testing can be performed by calling the functions directly from a python script.

Example Python Script

from eql.functions import CidrMatch

# create a list of CIDR blocks to match against
cidr_blocks = ["192.168.0.0/24", "2001:db8::/32"]

# create an instance of the CidrMatch class
cidr_matcher = CidrMatch()


# call the run method with an IP address and the list of CIDR blocks
ip_address = "192.168.0.1"
result = cidr_matcher.run(ip_address, *cidr_blocks)

# print the result
print(result)  # True


# create a list of CIDR blocks to match against
cidr_blocks = ["2001:0db8:0000:0000:0000:0000:0000:0000/32", "fe80::/10"]

# create an instance of the CidrMatch class
cidr_matcher = CidrMatch()

# call the run method with an IPv6 address and the list of CIDR blocks
ipv6_address = "2001:0db8:0000:0000:0000:0000:0000:0001"
ipv6_address = "2001:db8::1"
ipv6_address = "fe80::1"
print(ipv6_address)
result = cidr_matcher.run(ipv6_address, *cidr_blocks)
print(result)

ipv6_address = CidrMatch.expand_ipv6_address(ipv6_address)
print(ipv6_address)
result = cidr_matcher.run(ipv6_address, *cidr_blocks)

# print the result
print(result)  # True

🔴 Important 🔴

The testing MUST be performed on both Python 3.7 and Python 2.7 as eql needs to be able to support both. Some editors such VSCode do not support debugging in Python 2.7 so one will have to run the unit tests via another method.

@eric-forte-elastic eric-forte-elastic added the enhancement New feature or request label Oct 19, 2023
@eric-forte-elastic eric-forte-elastic self-assigned this Oct 19, 2023
@eric-forte-elastic eric-forte-elastic marked this pull request as ready for review October 26, 2023 15:54
@eric-forte-elastic eric-forte-elastic linked an issue Oct 26, 2023 that may be closed by this pull request
eql/functions.py Outdated
socket.inet_pton(socket.AF_INET6, address)
# Check if the prefix length is a valid integer between 0 and 128
prefix_length = int(prefix_length)
if prefix_length < 0 or prefix_length > 128:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if prefix_length < 0 or prefix_length > 128:
if 0 < prefix_length <= 128:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, you already made a check_ipv6 method, so why not also create a check_ipv6_prefix method and simplify this to

def check_ipv6 ...

@classmethod
def check_ipv6_prefix(cls, prefix):
    """Check if an int is a valid prefix range (0-128)."""
    if prefix_length < 0 or prefix_length > 128:
         return True
    return False
        

@classmethod
def check_ipv6_cidr(cls, cidr):
    """Check if a string is a valid IPv6 CIDR range."""
    # Split the CIDR range into the address and prefix length
    address, prefix_length = cidr.split("/")
    prefix_length = int(prefix_length)
    if check_ipv6(address) and check_ipv6_prefix(prefix_length):
        return True
    return False

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also is / requirement enforced prior to this? Your expand_ipv6 method seems to imply it may not be present

Copy link
Collaborator Author

@eric-forte-elastic eric-forte-elastic Oct 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the IPv4 implementation there is a check against the regex pattern specified in cidr_compiled in effect happens in the check_ipv6_cidr path. The expand_ipv6 should only be used for ip addresses instead of CIDR ranges, so it would cause an error if there was a / present.

(edited)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can definitely make it closer to what you have above, but we will still need to try, except to catch a value error for the line address, prefix_length = cidr.split("/") in the event that there is not a / passed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g.

>>> x = "x"
>>> x.split("/")
['x']
>>> a, b = x.split("/")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: need more than 1 value to unpack

Co-authored-by: Justin Ibarra <[email protected]>
@eric-forte-elastic
Copy link
Collaborator Author

Closing this PR as its alternative has been merged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FR] Add IPv6 Support to CidrMatch
2 participants