From 5b4e8918715a1d2e4abf77ed4eb3252486a19109 Mon Sep 17 00:00:00 2001 From: Eric Garver Date: Tue, 23 Feb 2021 09:18:33 -0500 Subject: [PATCH] fix(ipset): disallow overlapping entries These are already being blocked by the ipset backend, but we should catch them higher up to avoid differences in the backends. --- src/firewall/client.py | 4 +++- src/firewall/core/fw_ipset.py | 4 +++- src/firewall/core/ipset.py | 13 +++++++++++++ src/firewall/server/config_ipset.py | 5 ++++- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/firewall/client.py b/src/firewall/client.py index 4ff6349a9..2aa8a0d9e 100644 --- a/src/firewall/client.py +++ b/src/firewall/client.py @@ -33,7 +33,7 @@ from firewall.core.base import DEFAULT_ZONE_TARGET, DEFAULT_POLICY_TARGET, DEFAULT_POLICY_PRIORITY from firewall.dbus_utils import dbus_to_python from firewall.core.rich import Rich_Rule -from firewall.core.ipset import normalize_ipset_entry +from firewall.core.ipset import normalize_ipset_entry, check_entry_overlaps_existing from firewall import errors from firewall.errors import FirewallError @@ -1618,6 +1618,7 @@ def setEntries(self, entries): raise FirewallError(errors.IPSET_WITH_TIMEOUT) _entries = set() for _entry in dbus_to_python(entries, list): + check_entry_overlaps_existing(_entry, _entries) _entries.add(normalize_ipset_entry(_entry)) self.settings[5] = list(_entries) @handle_exceptions @@ -1627,6 +1628,7 @@ def addEntry(self, entry): raise FirewallError(errors.IPSET_WITH_TIMEOUT) entry = normalize_ipset_entry(entry) if entry not in self.settings[5]: + check_entry_overlaps_existing(entry, self.settings[5]) self.settings[5].append(entry) else: raise FirewallError(errors.ALREADY_ENABLED, entry) diff --git a/src/firewall/core/fw_ipset.py b/src/firewall/core/fw_ipset.py index e53489494..a285fd4a4 100644 --- a/src/firewall/core/fw_ipset.py +++ b/src/firewall/core/fw_ipset.py @@ -25,7 +25,7 @@ from firewall.core.logger import log from firewall.core.ipset import remove_default_create_options as rm_def_cr_opts, \ - normalize_ipset_entry + normalize_ipset_entry, check_entry_overlaps_existing from firewall.core.io.ipset import IPSet from firewall import errors from firewall.errors import FirewallError @@ -196,6 +196,7 @@ def add_entry(self, name, entry): if entry in obj.entries: raise FirewallError(errors.ALREADY_ENABLED, "'%s' already is in '%s'" % (entry, name)) + check_entry_overlaps_existing(entry, obj.entries) try: for backend in self.backends(): @@ -245,6 +246,7 @@ def set_entries(self, name, entries): _entries = set() for _entry in entries: + check_entry_overlaps_existing(_entry, _entries) _entries.add(normalize_ipset_entry(_entry)) entries = list(_entries) diff --git a/src/firewall/core/ipset.py b/src/firewall/core/ipset.py index 5bb21856f..d6defa395 100644 --- a/src/firewall/core/ipset.py +++ b/src/firewall/core/ipset.py @@ -302,3 +302,16 @@ def normalize_ipset_entry(entry): _entry.append(_part) return ",".join(_entry) + +def check_entry_overlaps_existing(entry, entries): + """ Check if entry overlaps any entry in the list of entries """ + # Only check simple types + if len(entry.split(",")) > 1: + return + + for itr in entries: + try: + if ipaddress.ip_network(itr, strict=False).overlaps(ipaddress.ip_network(entry, strict=False)): + raise FirewallError(errors.INVALID_ENTRY, "Entry '{}' overlaps with existing entry '{}'".format(itr, entry)) + except ValueError: + pass diff --git a/src/firewall/server/config_ipset.py b/src/firewall/server/config_ipset.py index 18ef5783d..f33c2a029 100644 --- a/src/firewall/server/config_ipset.py +++ b/src/firewall/server/config_ipset.py @@ -33,7 +33,8 @@ dbus_introspection_prepare_properties, \ dbus_introspection_add_properties from firewall.core.io.ipset import IPSet -from firewall.core.ipset import IPSET_TYPES, normalize_ipset_entry +from firewall.core.ipset import IPSET_TYPES, normalize_ipset_entry, \ + check_entry_overlaps_existing from firewall.core.logger import log from firewall.server.decorators import handle_exceptions, \ dbus_handle_exceptions, dbus_service_method @@ -408,6 +409,7 @@ def getEntries(self, sender=None): # pylint: disable=W0613 def setEntries(self, entries, sender=None): _entries = set() for _entry in dbus_to_python(entries, list): + check_entry_overlaps_existing(_entry, _entries) _entries.add(normalize_ipset_entry(_entry)) entries = list(_entries) log.debug1("%s.setEntries('[%s]')", self._log_prefix, @@ -432,6 +434,7 @@ def addEntry(self, entry, sender=None): raise FirewallError(errors.IPSET_WITH_TIMEOUT) if entry in settings[5]: raise FirewallError(errors.ALREADY_ENABLED, entry) + check_entry_overlaps_existing(entry, settings[5]) settings[5].append(entry) self.update(settings)