From de6dbfc521aa9518ecd278588bebbb9c106f49bd Mon Sep 17 00:00:00 2001 From: pc48m8n1 Date: Mon, 28 Oct 2024 14:19:18 -0700 Subject: [PATCH 1/7] Improved Google Capirca functionality by enabling Fortigate support, modernizing outdated Python 2 code to Python 3 for improved compatibility, adding comprehensive documentation for clarity, and refactoring functions for enhanced structure and readability. --- capirca/aclgen.py | 28 +- capirca/lib/fortigate.py | 963 ++++++++++++++++++++++++++++ capirca/lib/policy.py | 19 + capirca/lib/policy_simple.py | 3 + policies/pol/sample_fortigate.pol | 50 ++ policies/pol/sample_multitarget.pol | 2 + tests/lib/fortigate_test.py | 496 ++++++++++++++ 7 files changed, 1552 insertions(+), 9 deletions(-) create mode 100644 capirca/lib/fortigate.py create mode 100644 policies/pol/sample_fortigate.pol create mode 100644 tests/lib/fortigate_test.py diff --git a/capirca/aclgen.py b/capirca/aclgen.py index fe29dbe7..f9374e20 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -24,34 +24,35 @@ from absl import logging from capirca.lib import aclgenerator from capirca.lib import arista -from capirca.lib import arista_tp +#from capirca.lib import arista_tp from capirca.lib import aruba from capirca.lib import brocade from capirca.lib import cisco from capirca.lib import ciscoasa -from capirca.lib import cisconx +#from capirca.lib import cisconx from capirca.lib import ciscoxr from capirca.lib import cloudarmor from capirca.lib import gce -from capirca.lib import gce_vpc_tf +#from capirca.lib import gce_vpc_tf from capirca.lib import gcp_hf +from capirca.lib import fortigate from capirca.lib import ipset from capirca.lib import iptables from capirca.lib import juniper -from capirca.lib import juniperevo +#from capirca.lib import juniperevo from capirca.lib import junipermsmpc from capirca.lib import junipersrx -from capirca.lib import k8s +#from capirca.lib import k8s from capirca.lib import naming from capirca.lib import nftables from capirca.lib import nsxv -from capirca.lib import nsxt -from capirca.lib import openconfig +#from capirca.lib import nsxt +#from capirca.lib import openconfig from capirca.lib import packetfilter from capirca.lib import paloaltofw from capirca.lib import pcap from capirca.lib import policy -from capirca.lib import sonic +#from capirca.lib import sonic from capirca.lib import speedway from capirca.lib import srxlo from capirca.lib import windows_advfirewall @@ -198,6 +199,7 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, sonic_pol = False k8s_pol = False gce_vpc_tf_pol = False + fcl = False try: with open(input_file) as f: @@ -286,6 +288,8 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, gca = copy.deepcopy(pol) if 'k8s' in platforms: k8s_pol = copy.deepcopy(pol) + if 'fortigate' in platforms: + fcl = copy.deepcopy(pol) acl_obj: aclgenerator.ACLGenerator @@ -447,6 +451,11 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, RenderACL( str(acl_obj), acl_obj.SUFFIX, output_directory, input_file, write_files) + + if fcl: + acl_obj = fortigate.Fortigate(fcl, exp_info) + RenderACL(str(acl_obj), acl_obj.SUFFIX, output_directory, + input_file, write_files) # TODO(robankeny) add additional errors. except ( @@ -465,7 +474,8 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, gce.Error, gce_vpc_tf.Error, cloudarmor.Error, - k8s.Error) as e: + k8s.Error, + fortigate.Error) as e: raise ACLGeneratorError('Error generating target ACL for %s:\n%s' % (input_file, e)) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py new file mode 100644 index 00000000..45e8d2c4 --- /dev/null +++ b/capirca/lib/fortigate.py @@ -0,0 +1,963 @@ +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Version 1.1.14 + +"""Fortigate generator.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import datetime +from absl import logging +import six + +from capirca.lib import aclgenerator +from capirca.lib import nacaddr + +_ACTION_TABLE = { + 'accept': 'accept', + 'deny': 'deny', + 'next': 'next', + 'reject': 'reject', + 'reject-with-tcp-rst': 'reject', # set deny-tcp-with-icmp enable +} +_SP = ' ' +_DEFAULT_COMMENT = 'Generated by Capirca' +_SUPPORT_VERBATIM_TERM = False +_COMMENT_MAX_LENGTH = 1024 + + +class Error(Exception): + """Generic error class.""" + + +class FilterError(Error): + """Generic pol Filter class.""" + + +class FortiGateValueError(Error): + """Raised when invalid values provided.""" + + +class FortiGateFindServiceError(Error): + """Raised when unable to get the service name.""" + + +class FortiGateDuplicateTermError(Error): + """Raised when duplicate term found.""" + + +class FortiGatePortDoesNotExistError(Error): + """Raised when port is not found in ports list.""" + + +class FortiGateScheduleDateError(Error): + """Raised expiration date is invalid.""" + + +class FortigatePortMap(): + """Map port numbers to service names.""" + _PORTS_TCP = { + '179': 'BGP', + '53': 'DNS', + '7': 'PING', + '79': 'FINGER', + '21': 'FTP', + '70': 'GOPHER', + '443': 'HTTPS', + '194': 'IRC', + '2049': 'NFS', + '119': 'NNTP', + '110': 'POP3', + '1723': 'PPTP', + '25': 'SMTP', + '22': 'SSH', + '517': 'TALK', + '23': 'TELNET', + '540': 'UUCP', + '80': 'HTTP', + '993': 'IMAPS', + '3389': 'RDP', + '3306': 'MYSQL', + '1433': 'MS-SQL', + '1812': 'RADIUS', + '995': 'POP3S', + '465': 'SMTPS', + '389': 'LDAP', + '69': 'TFTP', + 'all': 'ALL_TCP' + } + _PORTS_UDP = { + '53': 'DNS', + '7': 'PING', + '500': 'IKE', + '2049': 'NFS', + '123': 'NTP', + '520': 'RIP', + '161': 'SNMP', + '162': 'snmptrap', + '514': 'SYSLOG', + '517': 'TALK', + '69': 'TFTP', + '37': 'TIMESTAMP', + '1812': 'RADIUS', + '67': 'DHCP', + 'all': 'ALL_UDP' + } + _PORTS_SCTP = { + #'53': 'DNS', + #'7': 'PING', + 'all': 'ALL_SCTP' + } + _PROTO_MAP = { + 'icmp': 'ALL_ICMP', + 'icmpv6': 'ALL_ICMP6', + #'gre': 'GRE', + #'ip': 'ALL', + 'tcp': _PORTS_TCP, + 'udp': _PORTS_UDP, + 'sctp': _PORTS_SCTP + } + + @staticmethod + def get_protocol(protocol, port=None): + """Converts a port number to a service name. + + Args: + protocol: string representing protocol (tcp, udp, etc) + port: integer representing the port number + + Returns: + string + + Raises: + FortiGateValueError: When unsupported protocol is used. + FortiGatePortDoesNotExistError: if the port does not exist. + FortiGateFindServiceError: when unable to find the requested service. + """ + f_proto = FortigatePortMap._PROTO_MAP.get(protocol, None) + if f_proto is None: + raise FortiGateValueError( + f"{protocol} protocol is unsupported, supported protocols = { + list(FortigatePortMap._PROTO_MAP.keys())}") + + if isinstance(f_proto, six.string_types): + return f_proto + if port: + try: + return f_proto[port] + except KeyError as exc: + raise FortiGatePortDoesNotExistError(f"Port {exc} does not exist") from exc + else: + raise FortiGateFindServiceError( + f'service not found from {protocol} protocol and {port} port') + + +class ObjectsContainer(): + """A Container that holds service and network objects.""" + + def __init__(self): + self.verbose = True + self._sys_settings = set() + self._dict_addresses = set() + self._dict_addrgrps = {} + self._dict_services = {} + self._dict_svcgrps = {} + self._dict_schedules = {} + + def get_sys_settings(self): + """Returns the collected addresses. + + """ + settings = [] + for setting in self._sys_settings: + settings += [_SP + setting] + + return settings + + def get_fw_addresses(self, ip_v=4): + """ + Returns the collected firewall addresses. + + Args: + ip_v: an integer. version 4 or 6. + + Returns: a list. contains address objects. + + """ + addresses = [] + addresses_v = [] + for addr in self._dict_addresses: + if ip_v == 4 and not isinstance(addr, nacaddr.IPv6): + addresses_v += [addr] + if ip_v == 6 and isinstance(addr, nacaddr.IPv6): + addresses_v += [addr] + + for addr in sorted(addresses_v): + addresses.extend(self.get_fw_address_obj(addr, self.verbose)) + + return addresses + + def get_fw_address_obj(self, addr, verbose=True): + """Gets firewall address objects. + + """ + addr_name = addr.with_prefixlen + address = [] + address += [f'{_SP} edit "{addr_name}"'] + if verbose: + addr_comment = _DEFAULT_COMMENT + if len(addr.text): + addr_comment += " (" + addr.text + ")" + address += [f'{_SP * 2} set comment "{self.fix_comment_length(addr_comment)}"'] + if not isinstance(addr, nacaddr.IPv6): + address += [f'{_SP * 2} set subnet {addr_name}'] + if isinstance(addr, nacaddr.IPv6): + address += [f'{_SP * 2} set ip6 {addr_name}'] + address += [_SP + 'next'] + + return address + + def get_fw_addrgrps(self, ip_v=4): + """Returns the collected address groups.""" + addrgrps = [] + for addrgrp_name, addrgrp_item in self._dict_addrgrps.items(): + if addrgrp_item[0] == ip_v: + address = addrgrp_item[1] + exclude_address = addrgrp_item[2] + addrgrps += [f'{_SP} edit "{addrgrp_name}"'] + if self.verbose: + addrgrps += [f'{_SP * 2} set comment "{_DEFAULT_COMMENT}"'] + if address: + addrgrps += [f'{_SP * 2} set member {" ".join(f"\"{v}\"" for v in address)}'] + else: + addrgrps += [_SP * 2 + 'set member "all"'] + if exclude_address: + addrgrps += [_SP * 2 + 'set exclude enable'] + addrgrps += [f'{_SP * 2} set exclude-member {" ".join(f"\"{v}\"" for v in exclude_address)}'] + addrgrps += [_SP + 'next'] + + return addrgrps + + def get_port_range(self, port): + """ + Returns the port range as a string. + + If the input port is a single integer, returns that value. + If the input port is a list of two integers, returns a range string (e.g. "1-10"). + + :param port: The port number or list of port numbers + :return: A string representing the port range + """ + port_range = '' + if len(port) == 1: + port_range = str(port[0]) + elif len(port) > 1: + port_range = str(port[0]) + if port[0] != port[1]: + port_range = str(min(port[0], port[1])) + '-' + str(max(port[0], port[1])) + + return port_range + + def get_fw_services(self): + """Returns the collected services.""" + fw_services = [] + for service_name in sorted(self._dict_services.keys()): + fw_services += [_SP + 'edit ' + service_name] + if self.verbose: + fw_services += [f'{_SP * 2} set comment {_DEFAULT_COMMENT}'] + for service_item in self._dict_services[service_name]: + fw_services += [_SP * 2 + service_item] + fw_services += [_SP + 'next'] + + return fw_services + + def get_fw_svcgrps(self): + """Returns the collected service groups.""" + svcgrps = [] + for svcgrp, value in self._dict_svcgrps.items(): + svcgrps += [f'{_SP} edit {svcgrp}'] + if self.verbose: + svcgrps += [f'{_SP * 2} set comment {_DEFAULT_COMMENT}'] + svcgrps += [_SP * 2 + value] + svcgrps += [_SP + 'next'] + + return svcgrps + + def get_fw_schedules(self): + """Returns the collected schedules.""" + schedules = [] + for schedule_name, schedule_date in self._dict_schedules.items(): + schedules.extend([f'{_SP} edit {schedule_name}', + f'{_SP * 2} set end {schedule_date}', + 'next']) + return schedules + + def process_action_setting(self, action): + """Process reject action.""" + found_action = _ACTION_TABLE.get(action) + if found_action == 'reject': + self._sys_settings.add('set deny-tcp-with-icmp enable') + + return action + + def add_address_to_fw_addrgrps( + self, + addrgrp_name, + address, + address_exclude): + """Add address and exclude to address group store.""" + address_v4 = [x.with_prefixlen for x in address if + not isinstance(x, nacaddr.IPv6)] + address_v6 = [x.with_prefixlen for x in address if + isinstance(x, nacaddr.IPv6)] + address_exclude_v4 = [x.with_prefixlen for x in address_exclude if + not isinstance(x, nacaddr.IPv6)] + address_exclude_v6 = [x.with_prefixlen for x in address_exclude if + isinstance(x, nacaddr.IPv6)] + + if address_exclude_v6: + raise FortiGateValueError( + 'Exclude IPv6 address is unsupported: {}'.join([f'{x.with_prefix_len}' for x in address_exclude_v6])) + + addr_names = [] + if address_v4 or address_exclude_v4: + addr_name = self.generate_address_or_addrgrp( + addrgrp_name, address_v4, address_exclude_v4, 4) + addr_names += [(4, addr_name)] + + if address_v6: + addr_name6 = self.generate_address_or_addrgrp( + addrgrp_name + '6', address_v6, None, 6) + addr_names += [(6, addr_name6)] + + return addr_names or 'all' + + def generate_address_or_addrgrp( + self, + addrgrp_name, + address, + address_exclude, + ip_v): + """ + Generates an address or address group. + + Args: + addrgrp_name: a string. name of address group + address: a string. ipv4 or ipv6 address + address_exclude: a string. ipv4 or ipv6 address + ip_v: an integer. 4 or 6 for ipVersion + + Returns: string + + """ + if not address and not address_exclude: + return 'all' + + if not address_exclude: + if len(address) == 1: + return address[0] + + if addrgrp_name not in self._dict_addrgrps: + self._dict_addrgrps[addrgrp_name] = [ + ip_v, address, address_exclude] + + return addrgrp_name + + def get_defined_service(self, protocol, port): + """return service if find service in defined map.""" + try: + service = FortigatePortMap.get_protocol(protocol, port) + return service + except FortiGatePortDoesNotExistError: + pass + + return None + + def add_service_to_fw_services(self, term_name, protocol_ports): + """Add service to services store.""" + protocols = set() + portranges = set() + for protocol, portrange in protocol_ports.items(): + protocols.add(protocol) + for range1 in portrange: + portranges.add(str(range1)) + + service_name = term_name + "-svc" + if service_name not in self._dict_services: + protocol_set = set() + for protocol, port_ranges in protocol_ports.items(): + portrange_str = ' '.join(str(v) for v in sorted(port_ranges)) + protocol_set.add( + f'set {protocol.lower()}-portrange {portrange_str}') + + self._dict_services[service_name] = sorted(protocol_set) + + return service_name + + def add_icmp_to_fw_services( + self, + protocol, + icmp_type, + normalized_icmptype, + icmp_code): + """ + Processes ICMP Additions to Firewall Services. + + """ + # icmp-types + if not normalized_icmptype and not icmp_code: + return 'ALL_ICMP6' if protocol == 'icmpv6' else 'ALL_ICMP' + + icmp_service_name = protocol + '-type-' + icmp_type + \ + (('-' + str(icmp_code)) if icmp_code else '') + if icmp_service_name not in self._dict_services: + protocol_set = [] + protocol_set += [f'set protocol {"ICMP6" if protocol == "icmpv6" else "ICMP"}'] + if normalized_icmptype: + protocol_set += [f'set icmptype {normalized_icmptype}'] + if icmp_code: + protocol_set += [f'set icmpcode {str(icmp_code)}'] + + self._dict_services[icmp_service_name] = protocol_set + + return icmp_service_name + + def add_icmp_service_grp(self, term_name, icmp_service_grp): + """ + Add an ICMP service group to the FortiGate configuration. + + Args: + term_name (str): The name of the term. + icmp_service_grp (list): A list of members to include in the service group. + + Returns: + str: The name of the new service group, or the name of an existing one if it already exists. + """ + icmp_service_grp_name = term_name + "-svcgrp" + if icmp_service_grp_name not in self._dict_svcgrps: + icmp_members = 'set member ' + (' ').join(sorted(icmp_service_grp)) + for key, value in self._dict_svcgrps.items(): + if icmp_members == value: + return key + self._dict_svcgrps[icmp_service_grp_name] = icmp_members + + return icmp_service_grp_name + + def add_expiration_to_fw_schedules(self, expiration): + """Add expiry date to schedule store.""" + schedule_name = expiration[-10:] + '_' + expiration[:5] + if schedule_name not in self._dict_schedules: + self._dict_schedules[schedule_name] = expiration + return schedule_name + + def fix_comment_length(self, comment): + """Return a comment which is equal or shorter than _COMMENT_MAX_LENGTH. + _COMMENT_MAX_LENGTH truncated as necessary. + """ + return comment[:_COMMENT_MAX_LENGTH] + + +class Term(aclgenerator.Term): + """Single Firewall Policy.""" + + _PLATFORM = 'fortigate' + _NGFW_MODE = 'profile-based' + CURRENT_ID = 0 + + def __init__(self, term, object_container, verbose=True): + super().__init__(term) + self._term = term + self._obj_container = object_container + self._term.verbose = verbose + + self.id_ = type(self).CURRENT_ID + if type(self).CURRENT_ID > 0: + type(self).CURRENT_ID += 1 + + def _get_services_name(self, protocols, destination_ports, source_ports): + """Get the service name, if not exist create it. + + Args: + protocols: list of protocols + destination_ports: list of destination ports + source_ports: list of source ports + + Returns: + string (all services separated by spaces). + """ + #if not protocols: + # raise FortiGateFindServiceError('protocol not found') + + ports = set() + # fortigate does not allow empty destination_ports + if not destination_ports and source_ports: + # source ports only, to set destination ports = 1-65535 + destination_ports.append((1, 65535)) + + if len(destination_ports): + for destination_port in destination_ports: + dest_port_range = self._obj_container.get_port_range(destination_port) + if source_ports: + for source_port in source_ports: + src_port_range = self._obj_container.get_port_range(source_port) + ports.add(dest_port_range + ':' + src_port_range) + else: + ports.add(dest_port_range) + + if not ports: + ports.add('all') + + ports = sorted(ports) + + services = set() + portranges = {} + for protocol in protocols: + port_map = FortigatePortMap() + if port_map.get_protocol(protocol) is None: + raise FortiGateValueError( + f"fortigate does not support {protocol} protocol") + + if protocol in {'icmp', 'icmpv6'}: + ip_v = 4 if protocol == 'icmp' else 6 + icmp_type_dict = {} + for icmp_type in self._term.icmp_type: + normalized_icmptype = self.NormalizeIcmpTypes( + [icmp_type], protocols, ip_v) + if normalized_icmptype: + icmp_type_dict[icmp_type] = normalized_icmptype[0] + + icmp_service_grp = set() + icmp_service_name = '' + for icmp_type in sorted( + icmp_type_dict, key=icmp_type_dict.get): + if self._term.icmp_code: + for each_code in sorted(self._term.icmp_code): + icmp_service_name = self._obj_container.add_icmp_to_fw_services( + protocol, icmp_type, icmp_type_dict[icmp_type], each_code) + icmp_service_grp.add(icmp_service_name) + else: + icmp_service_name = self._obj_container.add_icmp_to_fw_services( + protocol, + icmp_type, + icmp_type_dict[icmp_type], + self._term.icmp_code) + icmp_service_grp.add(icmp_service_name) + + if len(icmp_service_grp) > 1: + service = self._obj_container.add_icmp_service_grp( + self._term.name, icmp_service_grp) + else: + service = icmp_service_name + services.add(service) + else: + for port in ports: + service = self._obj_container.get_defined_service(protocol, port) + if service: + if service == 'ALL_SCTP': + if protocol not in portranges: + portranges[protocol] = set() + portranges[protocol].add('1-65535') + else: + services.add(service) + else: + if protocol not in portranges: + portranges[protocol] = set() + portranges[protocol].add(port) + + if portranges: + service = self._obj_container.add_service_to_fw_services( + self._term.name, portranges) + services.add(service) + + return ' '.join(sorted(services)) or 'ALL' + + def _generate_address_names(self, *addresses): + """Generate the addresses names (object-network names).""" + for group in addresses: + for addr in group: + if addr: + self._obj_container._dict_addresses.add(addr) + + def _process_verbatim_term(self): + """Process verbatim term output""" + # If Term includes verbatim token only + # Warning and skip this term + if not _SUPPORT_VERBATIM_TERM: + logging.warning( + 'WARNING: Term %s is a verbatim term. ' + 'term will not be rendered.', + self.term.name) + return '' + # output verbatim term and warning + output = [] + for verbatim_line in self.term.verbatim: + platform, contents = verbatim_line + if platform == self._PLATFORM: + output += [str(contents)] + logging.warning( + 'WARNING: Term %s is a verbatim term. ' + 'to ensure the term output is' + 'valid FortiGate items.', self.term.name) + return (_SP * 2 + ('\n' + _SP * 2).join(output)) if output else '' + + def _process_verbatim_item(self): + """Process verbatim output""" + # Term verbatim output + output = [] + if self.term.verbatim: + for verbatim_line in self.term.verbatim: + platform, contents = verbatim_line + if platform == self._PLATFORM: + output += [_SP * 2 + str(contents)] + + return output + + def _convert_date(self, expiration): + """Covert date format yyyy-mm-dd hh:mi to hh:mi yyyy/mm/dd.""" + try: + schedule_date = expiration.strftime('%H:%M %Y/%m/%d') + return schedule_date + except ValueError as e: + raise FortiGateScheduleDateError('Expiration is invalid datetime format.') from e + + def __str__(self): + lines = [] + + # process verbatim term + if self.term.verbatim and ( + not self.term.protocol or not self.term.action): + return self._process_verbatim_term() + + # Not support next action, skip this term + action = self._obj_container.process_action_setting( + self._term.action[0]) + if action == 'next': + return '' + + self._generate_address_names( + self._term.destination_address, + self._term.source_address, + self._term.destination_address_exclude, + self._term.source_address_exclude) + + dest_addresses = self._obj_container.add_address_to_fw_addrgrps( + self._term.name + '-dstgrp', + self._term.destination_address, + self._term.destination_address_exclude) + src_addresses = self._obj_container.add_address_to_fw_addrgrps( + self._term.name + '-srcgrp', + self._term.source_address, + self._term.source_address_exclude) + + services = self._get_services_name( + sorted( + self._term.protocol), + self._term.destination_port, + self._term.source_port) + + schedule_name = None + if self._term.expiration: + schedule_date = self._convert_date(self._term.expiration) + schedule_name = self._obj_container.add_expiration_to_fw_schedules( + schedule_date) + + lines += [f"{_SP * 2} set name {self._term.name}"] + # Owner (implement as comment) + if not self._term.comment: + self._term.comment = [_DEFAULT_COMMENT] + if self._term.owner: + self._term.comment += [f"Owner: {self._term.owner}"] + if self._term.comment and self._term.verbose: + lines += [f"{_SP * 2} set comments {self._obj_container.FixCommentLength((' ').join(self._term.comment))}"] + lines += [f"{_SP * 2} set srcintf {self._term.source_interface or 'any'}"] + lines += [f"{_SP * 2} set dstintf {self._term.destination_interface or 'any'}"] + exist_src6 = False + exist_dst6 = False + if isinstance(dest_addresses, list): + for (ip_v, addr_name) in dest_addresses: + lines += [f"{_SP * 2} set {('dstaddr' if ip_v == 4 else 'dstaddr6')} '{addr_name}'"] + if ip_v == 6: + exist_dst6 = True + else: + lines += [f"{_SP * 2} set dstaddr '{dest_addresses}'"] + if isinstance(src_addresses, list): + for (ip_v, addr_name) in src_addresses: + lines += [f"{_SP * 2} set {('srcaddr' if ip_v == 4 else 'srcaddr6')} '{addr_name}'"] + if ip_v == 6: + exist_src6 = True + else: + lines += [f"{_SP * 2} set srcaddr '{src_addresses}'"] + if exist_src6 and not exist_dst6: + lines += [_SP * 2 + 'set dstaddr6 "all"'] + elif not exist_src6 and exist_dst6: + lines += [_SP * 2 + 'set srcaddr6 "all"'] + + # if self._term.destination_address_exclude + # and not self._term.destination_address: + # lines += [_SP*2 + 'set dstaddr-negate enable'] + # if self._term.source_address_exclude + # and not self._term.source_address: + # lines += [_SP*2 + 'set srcaddr-negate enable'] + + # process verbatim items + # if self._term.verbatim: + # lines.extend(self._process_verbatim_item()) + + lines += [f"{_SP * 2} set action {action if action == 'accept' else 'deny'}"] + if action == 'reject': + lines += [_SP * 2 + 'set send-deny-packet enable'] + + if services: + if self._NGFW_MODE == 'policy-based': + lines += [_SP * 2 + 'set enforce-default-app-port disable'] + lines += [f"{_SP * 2} set service '{', '.join(services)}'"] + else: + if self._NGFW_MODE == 'policy-based': + lines += [_SP * 2 + 'set enforce-default-app-port enable'] + + + # if self._NGFW_MODE == 'profile-based': + # if self._term.av_profile or self._term.webfilter_profile or + # self._term.ssl_ssh_profile or self._term.dnsfilter_profile or + # self._term.ips_sensor: + # lines += [_SP*2 + 'set utm-status enable'] + + if self._NGFW_MODE == 'policy-based' and self._term.application_id: + lines += [f"{_SP * 2} set application {' '.join(str(v) for v in sorted(self._term.application_id))}"] + + lines += [f"{_SP * 2} set schedule {schedule_name if schedule_name else 'always'}"] + + # opts = [str(x) for x in self._term.option] + # if ('tcp-established' in opts or 'established' in opts): + # lines += [_SP*2 + 'set something'] + + if self._term.logging: + if self._term.logging == 'log-both': + lines += [_SP * 2 + 'set logtraffic all'] + lines += [_SP * 2 + 'set logtraffic-start enable'] + elif self._term.logging == 'disable': + lines += [_SP * 2 + 'set logtraffic disable'] + else: + lines += [_SP * 2 + 'set logtraffic all'] + + return '\n'.join(lines) + + +class Fortigate(aclgenerator.ACLGenerator): + """A Fortigate policy object.""" + + _PLATFORM = 'fortigate' + _NGFW_MODE = 'profile-based' + _DEFAULT_PROTOCOL = 'ALL' + SUFFIX = '.fcl' + + def __init__(self, *args, **kwargs): + self._obj_container = ObjectsContainer() + super().__init__(*args, **kwargs) + + def _BuildTokens(self): + """Build supported tokens for platform. + + Returns: + tuple containing both supported tokens and sub tokens. + """ + supported_tokens, supported_sub_tokens = super()._BuildTokens() + supported_tokens |= {'source_interface', + 'destination_interface', + 'source_address_exclude', + 'destination_address_exclude', + 'icmp_type', + 'icmp_code', + 'application_id'} + + supported_sub_tokens.update({'option': {'tcp-established'}, + # Warning, some of these are mapped + # differently. See _ACTION_TABLE + 'action': {'accept', 'deny', + 'next', 'reject', + 'reject-with-tcp-rst'}}) + + return supported_tokens, supported_sub_tokens + + def _TranslatePolicy(self, pol, exp_info): + """Translate Capirca pol to fortigate pol.""" + self.fortigate_policies = [] + current_date = datetime.datetime.utcnow().date() + exp_info_date = current_date + datetime.timedelta(weeks=exp_info) + + term_dup_check = set() + + for header, terms in pol.filters: + if self._PLATFORM not in header.platforms: + continue + + # fortigate option format: + # target:: fortigate from-id n + # target:: fortigate from-id n ngfw-mode profile-based | + # policy-based + filter_options = header.FilterOptions(self._PLATFORM) + + verbose = True + if 'noverbose' in filter_options: + filter_options.remove('noverbose') + verbose = False + + self._obj_container.verbose = verbose + + my_filter = {} + if len(filter_options) == 1: + my_filter[filter_options[0]] = '' + if len(filter_options) > 1: + my_filter[filter_options[0]] = filter_options[1] + if len(filter_options) > 3: + for key in filter_options: + if key == my_filter[2]: + raise FilterError('Fortigate filter arguments are duplicated: ' + key) + my_filter[filter_options[2]] = filter_options[3] + + # default from-id is 0 + Term.CURRENT_ID = 0 + # default ngfw_mode = profile-based + self.ngfw_mode = 'profile-based' + for filter_key, filter_val in my_filter.items(): + if filter_key == 'from-id': + from_id = int(filter_val) + if from_id < 1: + raise FilterError( + 'FortiGate from-id must be more than zero') + Term.CURRENT_ID = int(from_id) + elif filter_key == 'ngfw-mode': + if filter_val not in ['profile-based', 'policy-based']: + raise FilterError('FortiGate ngfw-mode only supports profile-based or policy-based') + self.ngfw_mode = filter_val + else: + raise FilterError( + 'FortiGate only support from-id and ngfw-mode filter') + + Term._NGFW_MODE = self.ngfw_mode + + for term in terms: + term.name = self.FixTermLength(term.name) + + filter_name = header.FilterName(self._PLATFORM) + if term.stateless_reply: + logging.warning( + 'WARNING: Term %s in policy %s is a stateless reply ' + 'term and will not be rendered. FortiGates are stateful', + term.name, + filter_name) + continue + if term.expiration: + if term.expiration <= exp_info_date: + logging.info( + 'INFO: Term %s in policy %s expires ' + 'in less than two weeks.', + term.name, filter_name) + if term.expiration <= current_date: + logging.warning( + 'WARNING: Term %s in policy %s is expired and ' + 'will not be rendered.', + term.name, filter_name) + continue + if term.name in term_dup_check: + raise FortiGateDuplicateTermError(f"You have a duplicate term: {term.name}") + term_dup_check.add(term.name) + + new_term = Term(term, self._obj_container, verbose) + + self.fortigate_policies += [(header, term.name, new_term)] + + def _get_fw_policies(self): + target_policies = [] + for (_, _, term) in self.fortigate_policies: + term_str = str(term) + if term_str != '': + target_policies += [_SP + f'edit {term.id}'] + target_policies += [term_str] + target_policies += [_SP + 'next'] + + return target_policies + + def __str__(self): + fw_policies = self._get_fw_policies() + + start_sys_settings = ['config sys setting'] + start_addresses_v4 = ['config firewall address'] + start_addresses_v6 = ['config firewall address6'] + start_addrgrps_v4 = ['config firewall addrgrp'] + start_addrgrps_v6 = ['config firewall addrgrp6'] + start_services = ['config firewall service custom'] + start_svcgrps = ['config firewall service group'] + start_schedules = ['config firewall schedule onetime'] + start_policies = [] + if self._NGFW_MODE == 'profile-based': + start_policies = ['config firewall policy'] + else: + start_policies = ['config firewall security-policy'] + end = ['end'] + + sys_settings = [] + if self._obj_container.get_sys_settings(): + sys_settings = start_sys_settings + \ + self._obj_container.get_sys_settings() + \ + end + [''] + + fw_addresses = [] + if self._obj_container.get_fw_addresses(4): + fw_addresses += start_addresses_v4 + \ + self._obj_container.get_fw_addresses(4) + \ + end + [''] + if self._obj_container.get_fw_addresses(6): + fw_addresses += start_addresses_v6 + \ + self._obj_container.get_fw_addresses(6) + \ + end + [''] + + fw_addr_grps = [] + if self._obj_container.get_fw_addrgrps(4): + fw_addr_grps += start_addrgrps_v4 + \ + self._obj_container.get_fw_addrgrps(4) + \ + end + [''] + if self._obj_container.get_fw_addrgrps(6): + fw_addr_grps += start_addrgrps_v6 + \ + self._obj_container.get_fw_addrgrps(6) + \ + end + [''] + + fw_services = [] + if self._obj_container.get_fw_services(): + fw_services = start_services + \ + self._obj_container.get_fw_services() + \ + end + [''] + + fw_svc_grps = [] + if self._obj_container.get_fw_svcgrps(): + fw_svc_grps = start_svcgrps + \ + self._obj_container.get_fw_svcgrps() + \ + end + [''] + + fw_schedules = [] + if self._obj_container.get_fw_schedules(): + fw_schedules = start_schedules + \ + self._obj_container.get_fw_schedules() + \ + end + [''] + + fw_policies = start_policies + fw_policies + end + + target = sys_settings + fw_addresses + fw_addr_grps + \ + fw_services + fw_svc_grps + fw_schedules + fw_policies + + return '\n'.join(target) diff --git a/capirca/lib/policy.py b/capirca/lib/policy.py index e5e980de..d7f0e890 100644 --- a/capirca/lib/policy.py +++ b/capirca/lib/policy.py @@ -365,6 +365,7 @@ class Term: source-zone: VarType.SZONE versa-application: VarType.VERSA_APPLICATION vpn: VarType.VPN + application-id: VarType.APPLICATION_ID """ # fmt: skip ICMP_TYPE = { 4: { @@ -516,6 +517,8 @@ def __init__(self, obj): self.flattened_saddr = None self.flattened_daddr = None self.stateless_reply = False + # fortigate specific + self.application_id = [] # AddObject touches variables which might not have been initialized # further up so this has to be at the end. @@ -878,6 +881,8 @@ def __str__(self): ret_str.append(' source_zone: %s' % sorted(self.source_zone)) if self.destination_zone: ret_str.append(' destination_zone: %s' % sorted(self.destination_zone)) + if self.application_id: + ret_str.append(' application_id: %s' % self.application_id) return '\n'.join(ret_str) @@ -993,6 +998,8 @@ def __eq__(self, other): return False if sorted(self.traffic_type) != sorted(other.traffic_type): return False + if sorted(self.application_id) != sorted(other.application_id): + return False # vpn if self.vpn != other.vpn: @@ -1268,6 +1275,8 @@ def AddObject(self, obj): self.source_zone.append(x.value) elif x.var_type is VarType.DZONE: self.destination_zone.append(x.value) + elif x.var_type is VarType.APPLICATION_ID: + self.application_id.append(x.value) else: raise TermObjectTypeError( "%s isn't a type I know how to deal with (contains '%s')" @@ -1363,6 +1372,8 @@ def AddObject(self, obj): self.target_service_accounts.append(obj.value) elif obj.var_type is VarType.FILTER_TERM: self.filter_term = obj.value + elif obj.var_type is VarType.APPLICATION_ID: + self.application_id.append(obj.value) else: raise TermObjectTypeError( "%s isn't a type I know how to deal with" % (type(obj)) @@ -1716,6 +1727,7 @@ class VarType: DECAPSULATE = 67 SOURCE_SERVICE_ACCOUNTS = 68 VERSA_APPLICATION = 69 + APPLICATION_ID = 70 def __init__(self, var_type, value): self.var_type = var_type @@ -1955,6 +1967,7 @@ def __ne__(self, other): 'VERBATIM', 'VERSA_APPLICATION', 'VPN', + 'APPLICATION_ID', ) literals = r':{},-/' @@ -2038,6 +2051,7 @@ def __ne__(self, other): 'verbatim': 'VERBATIM', 'versa-application': 'VERSA_APPLICATION', 'vpn': 'VPN', + 'application-id': 'APPLICATION_ID', } # disable linting warnings for lexx/yacc code @@ -2650,6 +2664,11 @@ def p_pan_application_spec(p): for apps in p[4]: p[0].append(VarType(VarType.PAN_APPLICATION, apps)) +def p_application_id_spec(p): + """ application_id_spec : APPLICATION_ID ':' ':' one_or_more_ints """ + p[0] = [] + for apps in p[4]: + p[0].append(VarType(VarType.APPLICATION_ID, apps)) def p_interface_spec(p): """interface_spec : SINTERFACE ':' ':' STRING diff --git a/capirca/lib/policy_simple.py b/capirca/lib/policy_simple.py index 3e848130..2f474fe1 100644 --- a/capirca/lib/policy_simple.py +++ b/capirca/lib/policy_simple.py @@ -331,6 +331,8 @@ class VersaApplication(Field): class Vpn(Field): """A vpn field.""" +class ApplicationID(Field): + """A application id field.""" destination_address_fields = (DestinationAddress, DestinationExclude, DestinationPrefix) @@ -391,6 +393,7 @@ class Vpn(Field): 'vpn': Vpn, 'encapsulate': Encapsulate, 'decapsulate': Decapsulate, + 'application-id': ApplicationID, } diff --git a/policies/pol/sample_fortigate.pol b/policies/pol/sample_fortigate.pol new file mode 100644 index 00000000..33b2aae1 --- /dev/null +++ b/policies/pol/sample_fortigate.pol @@ -0,0 +1,50 @@ +# Header Option: from-id -- Tells Capirca to number firewall +# policies starting at the provided integer. +# Header Option: ngfw-mode -- Default is profile. +# If Fortigate is using policy-based NGFW mode add 'ngfw-mode policy' + +header { + target:: fortigate +} + +term allow-web-outbound{ + source-address:: INTERNAL + destination-port:: HTTP HTTPS + source-port:: HTTP + protocol:: tcp udp + expiration:: 2020-12-20 + logging:: syslog + action:: accept +} + +term customers-policy { + destination-address:: INTERNAL + destination-exclude:: NTP_SERVERS + destination-port:: DNS HTTPS + source-exclude:: NTP_SERVERS + protocol:: tcp udp + action:: reject +} + +term customers-policy2 { + source-interface:: port2 + destination-interface:: port1 + source-address:: INTERNAL + source-port:: SMTP + destination-address:: MAIL_SERVERS + destination-port:: SMTP + protocol:: tcp udp + comment:: "this a test policy" + owner:: foo@google.com + action:: accept +} + +term ipv6-outbound{ + source-address:: LINKLOCAL + destination-address:: SITELOCAL LINKLOCAL + destination-port:: HTTP HTTPS + source-port:: HTTP + protocol:: tcp + expiration:: 2020-12-20 + action:: accept +} \ No newline at end of file diff --git a/policies/pol/sample_multitarget.pol b/policies/pol/sample_multitarget.pol index cf93f95f..da4d1a12 100644 --- a/policies/pol/sample_multitarget.pol +++ b/policies/pol/sample_multitarget.pol @@ -19,6 +19,7 @@ header { target:: brocade edge-inbound target:: cisconx edge-inbound target:: ciscoxr edge-inbound + target:: fortigate } #include 'includes/untrusted-networks-blocking.inc' @@ -63,6 +64,7 @@ header { target:: cisco edge-outbound mixed target:: speedway OUTPUT target:: ciscoasa asa_out + target:: fortigate } term deny-to-bad-destinations { diff --git a/tests/lib/fortigate_test.py b/tests/lib/fortigate_test.py new file mode 100644 index 00000000..b80cc95f --- /dev/null +++ b/tests/lib/fortigate_test.py @@ -0,0 +1,496 @@ +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unittest for fortigate policy rendering module.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import re +import string +import unittest +import mock + +from capirca.lib import fortigate +from capirca.lib import nacaddr +from capirca.lib import naming +from capirca.lib import policy + + +GOOD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate from-id 2 +} +""" + +GOOD_HEADER_1 = """ +header { + comment:: "this is a test acl" + target:: fortigate ngfw-mode policy-based +} +""" + +BAD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate edge-filter +} +""" + +TERM_TEMPLATE = """ +term good-term-2 {{ + source-interface:: {src_interface} + destination-interface:: {dest_interface} + protocol:: {protocol} + destination-address:: {dest_addr} + destination-port:: {dest_port} + source-address:: {src_addr} + source-port:: {src_port} + action:: {action} + logging:: {logging} +}} +""" + +_SP = ' ' + +EXP_INFO = 2 + + +class CustomFormatter(string.Formatter): + """ + Checks the custom formatter for fortigate output. + + """ + DEFAULT_VALUES = { + 'src_interface': 'wan1', + 'dest_interface': 'wan2', + 'protocol': 'tcp', + 'src_addr': 'SOME_HOST', + 'dest_addr': 'SOME_HOST', + 'src_port': 'HTTP', + 'dest_port': 'HTTP', + 'action': 'accept', + 'logging': 'true' + } + + def format(*args, **kwargs): + if 'remove_fields' in kwargs or 'add_fields' in kwargs: + args = list(args) + + if 'remove_fields' in kwargs: + for field in kwargs['remove_fields']: + remove_regex = '.*' + field + '.*' + args[1] = re.sub(remove_regex, '', args[1]) + + if 'add_fields' in kwargs: + add_fields_string = "" + for field, value in kwargs['add_fields'].items(): + add_fields_string += " " + field + ":: " + value + "\n" + args[1] = args[1][:-3] + add_fields_string + args[1][-3:] + + return string.Formatter.format(*args, **kwargs) + + def get_value(self, key, args, kwds): + try: + return kwds[key] + except KeyError: + return self.DEFAULT_VALUES[key] + + +class FortigateTest(unittest.TestCase): + """ + Fortigate test class. + + """ + def setUp(self): + self.naming = mock.create_autospec(naming.Naming) + + def get_addr_side_eff(host): + hosts = { + 'SOME_HOST': [nacaddr.IP('10.0.0.0/8')], + 'SOME_HOST2': [nacaddr.IP('20.0.0.0/8')], + 'SOME_HOST6': [nacaddr.IP('fec0::/10')] + } + return hosts[host] + + def get_port_side_eff(*args): + hosts = { + 'HTTP': ['80'], + 'HTTPS': ['443'], + 'SSH': ['22'], + 'WHOIS': ['43'] + } + return hosts[args[0]] + + self.naming.GetNetAddr.side_effect = get_addr_side_eff + self.naming.GetServiceByProto.side_effect = get_port_side_eff + self.fmt = CustomFormatter() + + def testGoodHeader(self): + """ + Tests a good header value. + + """ + term = self.fmt.format(TERM_TEMPLATE) + acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + term, + self.naming), EXP_INFO) + + expected_sig = 'edit 2' + + get_net_calls = [mock.call('SOME_HOST')] * 2 + get_server_by_proto_calls = [mock.call('HTTP', 'tcp')] * 2 + + self.assertIn(expected_sig, str(acl), '[%s]' % str(acl)) + self.naming.GetNetAddr.assert_has_calls(get_net_calls) + self.naming.GetServiceByProto.assert_has_calls(get_server_by_proto_calls) + + def testBadHeader(self): + """ + Tests a bad header value. + + """ + term = self.fmt.format(TERM_TEMPLATE) + parsed_p = policy.ParsePolicy(BAD_HEADER + term, + self.naming) + + self.assertRaises(fortigate.FilterError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testAction(self): + """ + Tests the action detection. + + """ + accept_term = self.fmt.format(TERM_TEMPLATE, action='accept') + deny_term = self.fmt.format(TERM_TEMPLATE, action='deny') + reject_term = self.fmt.format(TERM_TEMPLATE, action='reject') + + accept_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + accept_term, + self.naming), EXP_INFO) + deny_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + deny_term, + self.naming), EXP_INFO) + reject_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + reject_term, + self.naming), EXP_INFO) + + accept_sig = 'set action accept' + deny_sig = 'set action deny' + reject_sig = 'set send-deny-packet enable' + reject_sys_sig = ('config sys setting\n' + + _SP + 'set deny-tcp-with-icmp enable\n' + + 'end\n') + + self.assertIn( + accept_sig, str(accept_acl), '[%s]' % str(accept_acl)) + self.assertIn( + deny_sig, str(deny_sig), '[%s]' % str(deny_acl)) + self.assertIn( + reject_sys_sig, str(reject_acl), '[%s]' % str(reject_acl)) + self.assertTrue( + deny_sig in str(reject_acl) and reject_sig in str(reject_acl), + '[%s]' % str(reject_acl)) + + def testAddresses(self): + """ + Tests an address object. + + """ + diff_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST', + dest_addr='SOME_HOST2') + same_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST2', + dest_addr='SOME_HOST2') + any_src_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_addr',)) + any_dest_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_addr',)) + # testing for IPv6 + same_addr6_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST6', + dest_addr='SOME_HOST6') + + diff_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + diff_addr_term, + self.naming), EXP_INFO) + + same_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + same_addr_term, + self.naming), EXP_INFO) + + any_src_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_src_term, + self.naming), EXP_INFO) + + any_dest_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_dest_term, + self.naming), EXP_INFO) + + same_addr6_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + same_addr6_term, + self.naming), EXP_INFO) + + src_sig = 'set srcaddr "10.0.0.0/8"' + dest_sig = 'set dstaddr "20.0.0.0/8"' + any_dest_sig = 'set dstaddr "all"' + any_src_sig = 'set srcaddr "all"' + src_sig_v6 = 'set srcaddr6 "fec0::/10"' + dest_sig_v6 = 'set dstaddr6 "fec0::/10"' + + self.assertTrue( + src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), + '[%s]' % str(diff_addr_acl)) + # [] check acl generate one 'set subnet' for dup addresses + self.assertEqual( + str(same_addr_acl).count('set subnet'), 1) + self.assertIn( + any_src_sig, str(any_src_acl), '[%s]' % str(any_src_acl)) + self.assertIn( + any_dest_sig, str(any_dest_acl), '[%s]' % str(any_dest_acl)) + self.assertTrue( + src_sig_v6 in str(same_addr6_acl) + and dest_sig_v6 in str(same_addr6_acl), + '[%s]' % str(same_addr6_acl)) + + def testServices(self): + """ + Tests services objects. + + """ + dest_only_term = self.fmt.format(TERM_TEMPLATE, + dest_port='HTTP', + remove_fields=('src_port',)) + diff_port_term = self.fmt.format(TERM_TEMPLATE, + dest_port='HTTP HTTPS', + remove_fields=('src_port',)) + dup_port_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + dest_port='HTTP') + icmp_term = self.fmt.format(TERM_TEMPLATE, + protocol='icmp', + add_fields={'icmp-type': 'echo-request'}, + remove_fields=('src_addr', 'dest_addr', + 'dest_port', 'src_port')) + ip_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_port', 'src_port')) + custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') + #print("\icmp_term=========\n", icmp_term) + + dest_only_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + dest_only_term, + self.naming), EXP_INFO) + diff_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + diff_port_term, + self.naming), EXP_INFO) + dup_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + dup_port_term, + self.naming), EXP_INFO) + icmp_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + icmp_term, + self.naming), EXP_INFO) + ip_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + ip_term, + self.naming), EXP_INFO) + custom_port_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + custom_port_term, + self.naming), EXP_INFO) + #print("\ncustom_port_acl=========\n", custom_port_acl) + + dest_only_sig = 'set service HTTP\n' + diff_sig = 'set service HTTP HTTPS\n' + dup_sig = 'set service good-term-2-svc\n' + icmp_sig = 'set service icmp-type-echo-request\n' + ip_sig = 'set service ALL_TCP\n' + custom_port_sig = ('config firewall service custom\n' + + _SP + 'edit good-term-2-svc\n' + + _SP*2 + 'set comment "Generated by Capirca"\n' + + _SP*2 + 'set tcp-portrange 80:43\n' + + _SP + 'next\n') + + self.assertIn( + dest_only_sig, str(dest_only_acl), '[%s]' % str(dest_only_acl)) + self.assertIn( + diff_sig, str(diff_acl), '[%s]' % str(diff_acl)) + self.assertIn( + dup_sig, str(dup_acl), '[%s]' % str(dup_acl)) + self.assertIn( + icmp_sig, str(icmp_acl), '[%s]' % str(icmp_acl)) + self.assertIn( + ip_sig, str(ip_acl), '[%s]' % str(ip_acl)) + self.assertIn( + custom_port_sig, str(custom_port_acl), '[%s]' % str(custom_port_acl)) + + def testInterfaces(self): + """ + Tests interfaces. + + """ + no_interfaces_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_interface', + 'dest_interface')) + #print("no_interfaces_term=", no_interfaces_term) + src_only_int_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + remove_fields=('dest_interface',)) + dest_only_int_term = self.fmt.format(TERM_TEMPLATE, + dest_interface='wan2', + remove_fields=('src_interface',)) + both_interfaces_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + dest_interface='wan2',) + + no_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, + self.naming), EXP_INFO) + src_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + src_only_int_term, + self.naming), EXP_INFO) + dest_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, + self.naming), EXP_INFO) + both_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, + self.naming), EXP_INFO) + + no_interfaces_sig = 'set srcintf any\n' + _SP*2 + 'set dstintf any' + src_int_only_sig = 'set srcintf wan1\n' + _SP*2 + 'set dstintf any' + dest_int_only_sig = 'set srcintf any\n' + _SP*2 + 'set dstintf wan2' + both_interfaces_sig = 'set srcintf wan1\n' + _SP*2 + 'set dstintf wan2' + + self.assertIn( + no_interfaces_sig, str(no_interfaces_acl), + '[%s]' % str(no_interfaces_acl)) + self.assertIn( + src_int_only_sig, str(src_only_int_acl), + '[%s]' % str(src_only_int_acl)) + self.assertIn( + dest_int_only_sig, str(dest_only_int_acl), + '[%s]' % str(dest_only_int_acl)) + self.assertIn( + both_interfaces_sig, str(both_interfaces_acl), + '[%s]' % str(both_interfaces_acl)) + + def testExpiration(self): + """ + Tests expiration / schedule object. + + """ + no_expiration_term = self.fmt.format(TERM_TEMPLATE) + expiration_term = self.fmt.format(TERM_TEMPLATE, + add_fields={'expiration': + '2022-12-31', + 'comment': + '"test expiration"'}) + + no_expiration_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_expiration_term, + self.naming), EXP_INFO) + expiration_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + expiration_term, + self.naming), EXP_INFO) + + no_expiration_sig = 'set schedule always' + expiration_sig = 'set schedule 2022/12/31_00:00' + expiration_config_sig = ('config firewall schedule onetime\n' + + _SP + 'edit 2022/12/31_00:00\n' + + _SP*2 + 'set end 00:00 2022/12/31\n' + + _SP + 'next\n' + + 'end\n') + + self.assertIn( + no_expiration_sig, str(no_expiration_acl), + '[%s]' % str(no_expiration_acl)) + self.assertTrue( + expiration_config_sig in str(expiration_acl) + and expiration_sig in str(expiration_acl), + '[%s]' % str(expiration_acl)) + + def testApplication_ID(self): + """ + Tests an application ID being used. + + """ + application_term = self.fmt.format(TERM_TEMPLATE, + add_fields={'application-id': '15816'}, + remove_fields=('src_addr', 'src_port')) + + application_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER_1 + application_term, + self.naming), EXP_INFO) + + application_sig = 'set application 15816' + + self.assertIn( + application_sig, str(application_acl), + '[%s]' % str(application_acl)) + + def testLogging(self): + """ + Tests logger input. + + """ + log_term = self.fmt.format(TERM_TEMPLATE, + logging='true') + no_log_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('logging',)) + + log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + log_term, + self.naming), EXP_INFO) + no_log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_log_term, + self.naming), EXP_INFO) + + log_sig = 'set logtraffic all' + + self.assertIn( + log_sig, str(log_acl), '[%s]' % str(log_acl)) + self.assertNotIn( + log_sig, str(no_log_term), '[%s]' % str(no_log_acl)) + + def testDuplicateTermError(self): + """ + Tests for duplicate term detection. + + """ + term = self.fmt.format(TERM_TEMPLATE, logging='true') + duplicate_terms = term + term + parsed_p = policy.ParsePolicy(GOOD_HEADER + duplicate_terms, + self.naming) + + self.assertRaises(fortigate.FortiGateDuplicateTermError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testPortMap(self): + """ + Tests port map object. + + """ + port_map = fortigate.FortigatePortMap() + self.assertEqual('SSH', port_map.get_protocol('tcp', '22')) + self.assertRaises(fortigate.FortiGatePortDoesNotExistError, + port_map.get_protocol, + 'tcp', 5000) + self.assertRaises(fortigate.FortiGateValueError, + port_map.get_protocol, + 'bad_proto', 22) From e67d1692323efa740c901e90ab431cc5f20fdb87 Mon Sep 17 00:00:00 2001 From: pc48m8n1 Date: Tue, 29 Oct 2024 11:02:49 -0700 Subject: [PATCH 2/7] Add support for Fortigate Local-In Policy. --- capirca/aclgen.py | 11 +- capirca/lib/fortigatelocalin.py | 97 +++++ policies/pol/sample_fortigate_localin.pol | 50 +++ policies/pol/sample_multitarget.pol | 2 + tests/lib/fortigatelocalin_test.py | 496 ++++++++++++++++++++++ 5 files changed, 655 insertions(+), 1 deletion(-) create mode 100644 capirca/lib/fortigatelocalin.py create mode 100644 policies/pol/sample_fortigate_localin.pol create mode 100644 tests/lib/fortigatelocalin_test.py diff --git a/capirca/aclgen.py b/capirca/aclgen.py index f9374e20..783439b2 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -36,6 +36,7 @@ #from capirca.lib import gce_vpc_tf from capirca.lib import gcp_hf from capirca.lib import fortigate +from capirca.lib import fortigatelocalin from capirca.lib import ipset from capirca.lib import iptables from capirca.lib import juniper @@ -200,6 +201,7 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, k8s_pol = False gce_vpc_tf_pol = False fcl = False + lipfcl = False try: with open(input_file) as f: @@ -290,6 +292,8 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, k8s_pol = copy.deepcopy(pol) if 'fortigate' in platforms: fcl = copy.deepcopy(pol) + if 'fortigatelocalin' in platforms: + lipfcl = copy.deepcopy(pol) acl_obj: aclgenerator.ACLGenerator @@ -456,6 +460,10 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, acl_obj = fortigate.Fortigate(fcl, exp_info) RenderACL(str(acl_obj), acl_obj.SUFFIX, output_directory, input_file, write_files) + if lipfcl: + acl_obj = fortigatelocalin.FortigateLocalIn(lipfcl, exp_info) + RenderACL(str(acl_obj), acl_obj.SUFFIX, output_directory, + input_file, write_files) # TODO(robankeny) add additional errors. except ( @@ -475,7 +483,8 @@ def RenderFile(base_directory: str, input_file: pathlib.Path, gce_vpc_tf.Error, cloudarmor.Error, k8s.Error, - fortigate.Error) as e: + fortigate.Error, + fortigatelocalin.Error) as e: raise ACLGeneratorError('Error generating target ACL for %s:\n%s' % (input_file, e)) diff --git a/capirca/lib/fortigatelocalin.py b/capirca/lib/fortigatelocalin.py new file mode 100644 index 00000000..b43f061e --- /dev/null +++ b/capirca/lib/fortigatelocalin.py @@ -0,0 +1,97 @@ +# Copyright 2022 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Fortigate local-in generator. + +This is a subclass of Fortigate generator. +""" + +from capirca.lib import fortigate + +class FortigateLocalIn(fortigate.Fortigate): + """Fortigate local-in generator.""" + + _PLATFORM = 'fortigatelocalin' + + def __str__(self): + fw_policies = self._get_fw_policies() + + start_sys_settings = ['config sys setting'] + start_addresses_v4 = ['config firewall address'] + start_addresses_v6 = ['config firewall address6'] + start_addrgrps_v4 = ['config firewall addrgrp'] + start_addrgrps_v6 = ['config firewall addrgrp6'] + start_services = ['config firewall service custom'] + start_svcgrps = ['config firewall service group'] + start_schedules = ['config firewall schedule onetime'] + start_policies = ['config firewall local-in-policy'] + end = ['end'] + + sys_settings = [] + if self._obj_container.get_sys_settings(): + sys_settings = start_sys_settings + \ + self._obj_container.get_sys_settings() + \ + end + [''] + + fw_addresses = [] + if self._obj_container.get_fw_addresses(4): + fw_addresses += start_addresses_v4 + \ + self._obj_container.get_fw_addresses(4) + \ + end + [''] + if self._obj_container.get_fw_addresses(6): + fw_addresses += start_addresses_v6 + \ + self._obj_container.get_fw_addresses(6) + \ + end + [''] + + fw_addr_grps = [] + if self._obj_container.get_fw_addrgrps(4): + fw_addr_grps += start_addrgrps_v4 + \ + self._obj_container.get_fw_addrgrps(4) + \ + end + [''] + if self._obj_container.get_fw_addrgrps(6): + fw_addr_grps += start_addrgrps_v6 + \ + self._obj_container.get_fw_addrgrps(6) + \ + end + [''] + + fw_services = [] + if self._obj_container.get_fw_services(): + fw_services = start_services + \ + self._obj_container.get_fw_services() + \ + end + [''] + + fw_svc_grps = [] + if self._obj_container.get_fw_svcgrps(): + fw_svc_grps = start_svcgrps + \ + self._obj_container.get_fw_svcgrps() + \ + end + [''] + + fw_schedules = [] + if self._obj_container.get_fw_schedules(): + fw_schedules = start_schedules + \ + self._obj_container.get_fw_schedules() + \ + end + [''] + + fw_policies = start_policies + fw_policies + end + + target = sys_settings + fw_addresses + fw_addr_grps + \ + fw_services + fw_svc_grps + fw_schedules + fw_policies + + return '\n'.join(target) + +class Error(Exception): + pass + + +class FilterDirectionError(Error): + pass diff --git a/policies/pol/sample_fortigate_localin.pol b/policies/pol/sample_fortigate_localin.pol new file mode 100644 index 00000000..bff9f2c4 --- /dev/null +++ b/policies/pol/sample_fortigate_localin.pol @@ -0,0 +1,50 @@ +# Header Option: from-id -- Tells Capirca to number firewall +# policies starting at the provided integer. +# Header Option: ngfw-mode -- Default is profile. +# If Fortigate is using policy-based NGFW mode add 'ngfw-mode policy' + +header { + target:: fortigatelocalin +} + +term allow-web-outbound{ + source-address:: INTERNAL + destination-port:: HTTP HTTPS + source-port:: HTTP + protocol:: tcp udp + expiration:: 2020-12-20 + logging:: syslog + action:: accept +} + +term customers-policy { + destination-address:: INTERNAL + destination-exclude:: NTP_SERVERS + destination-port:: DNS HTTPS + source-exclude:: NTP_SERVERS + protocol:: tcp udp + action:: reject +} + +term customers-policy2 { + source-interface:: port2 + destination-interface:: port1 + source-address:: INTERNAL + source-port:: SMTP + destination-address:: MAIL_SERVERS + destination-port:: SMTP + protocol:: tcp udp + comment:: "this a test policy" + owner:: foo@google.com + action:: accept +} + +term ipv6-outbound{ + source-address:: LINKLOCAL + destination-address:: SITELOCAL LINKLOCAL + destination-port:: HTTP HTTPS + source-port:: HTTP + protocol:: tcp + expiration:: 2020-12-20 + action:: accept +} \ No newline at end of file diff --git a/policies/pol/sample_multitarget.pol b/policies/pol/sample_multitarget.pol index da4d1a12..8d60b2f4 100644 --- a/policies/pol/sample_multitarget.pol +++ b/policies/pol/sample_multitarget.pol @@ -20,6 +20,7 @@ header { target:: cisconx edge-inbound target:: ciscoxr edge-inbound target:: fortigate + target:: fortigatelocalin } #include 'includes/untrusted-networks-blocking.inc' @@ -65,6 +66,7 @@ header { target:: speedway OUTPUT target:: ciscoasa asa_out target:: fortigate + target:: fortigatelocalin } term deny-to-bad-destinations { diff --git a/tests/lib/fortigatelocalin_test.py b/tests/lib/fortigatelocalin_test.py new file mode 100644 index 00000000..b80cc95f --- /dev/null +++ b/tests/lib/fortigatelocalin_test.py @@ -0,0 +1,496 @@ +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unittest for fortigate policy rendering module.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import re +import string +import unittest +import mock + +from capirca.lib import fortigate +from capirca.lib import nacaddr +from capirca.lib import naming +from capirca.lib import policy + + +GOOD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate from-id 2 +} +""" + +GOOD_HEADER_1 = """ +header { + comment:: "this is a test acl" + target:: fortigate ngfw-mode policy-based +} +""" + +BAD_HEADER = """ +header { + comment:: "this is a test acl" + target:: fortigate edge-filter +} +""" + +TERM_TEMPLATE = """ +term good-term-2 {{ + source-interface:: {src_interface} + destination-interface:: {dest_interface} + protocol:: {protocol} + destination-address:: {dest_addr} + destination-port:: {dest_port} + source-address:: {src_addr} + source-port:: {src_port} + action:: {action} + logging:: {logging} +}} +""" + +_SP = ' ' + +EXP_INFO = 2 + + +class CustomFormatter(string.Formatter): + """ + Checks the custom formatter for fortigate output. + + """ + DEFAULT_VALUES = { + 'src_interface': 'wan1', + 'dest_interface': 'wan2', + 'protocol': 'tcp', + 'src_addr': 'SOME_HOST', + 'dest_addr': 'SOME_HOST', + 'src_port': 'HTTP', + 'dest_port': 'HTTP', + 'action': 'accept', + 'logging': 'true' + } + + def format(*args, **kwargs): + if 'remove_fields' in kwargs or 'add_fields' in kwargs: + args = list(args) + + if 'remove_fields' in kwargs: + for field in kwargs['remove_fields']: + remove_regex = '.*' + field + '.*' + args[1] = re.sub(remove_regex, '', args[1]) + + if 'add_fields' in kwargs: + add_fields_string = "" + for field, value in kwargs['add_fields'].items(): + add_fields_string += " " + field + ":: " + value + "\n" + args[1] = args[1][:-3] + add_fields_string + args[1][-3:] + + return string.Formatter.format(*args, **kwargs) + + def get_value(self, key, args, kwds): + try: + return kwds[key] + except KeyError: + return self.DEFAULT_VALUES[key] + + +class FortigateTest(unittest.TestCase): + """ + Fortigate test class. + + """ + def setUp(self): + self.naming = mock.create_autospec(naming.Naming) + + def get_addr_side_eff(host): + hosts = { + 'SOME_HOST': [nacaddr.IP('10.0.0.0/8')], + 'SOME_HOST2': [nacaddr.IP('20.0.0.0/8')], + 'SOME_HOST6': [nacaddr.IP('fec0::/10')] + } + return hosts[host] + + def get_port_side_eff(*args): + hosts = { + 'HTTP': ['80'], + 'HTTPS': ['443'], + 'SSH': ['22'], + 'WHOIS': ['43'] + } + return hosts[args[0]] + + self.naming.GetNetAddr.side_effect = get_addr_side_eff + self.naming.GetServiceByProto.side_effect = get_port_side_eff + self.fmt = CustomFormatter() + + def testGoodHeader(self): + """ + Tests a good header value. + + """ + term = self.fmt.format(TERM_TEMPLATE) + acl = fortigate.Fortigate(policy.ParsePolicy(GOOD_HEADER + term, + self.naming), EXP_INFO) + + expected_sig = 'edit 2' + + get_net_calls = [mock.call('SOME_HOST')] * 2 + get_server_by_proto_calls = [mock.call('HTTP', 'tcp')] * 2 + + self.assertIn(expected_sig, str(acl), '[%s]' % str(acl)) + self.naming.GetNetAddr.assert_has_calls(get_net_calls) + self.naming.GetServiceByProto.assert_has_calls(get_server_by_proto_calls) + + def testBadHeader(self): + """ + Tests a bad header value. + + """ + term = self.fmt.format(TERM_TEMPLATE) + parsed_p = policy.ParsePolicy(BAD_HEADER + term, + self.naming) + + self.assertRaises(fortigate.FilterError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testAction(self): + """ + Tests the action detection. + + """ + accept_term = self.fmt.format(TERM_TEMPLATE, action='accept') + deny_term = self.fmt.format(TERM_TEMPLATE, action='deny') + reject_term = self.fmt.format(TERM_TEMPLATE, action='reject') + + accept_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + accept_term, + self.naming), EXP_INFO) + deny_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + deny_term, + self.naming), EXP_INFO) + reject_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + reject_term, + self.naming), EXP_INFO) + + accept_sig = 'set action accept' + deny_sig = 'set action deny' + reject_sig = 'set send-deny-packet enable' + reject_sys_sig = ('config sys setting\n' + + _SP + 'set deny-tcp-with-icmp enable\n' + + 'end\n') + + self.assertIn( + accept_sig, str(accept_acl), '[%s]' % str(accept_acl)) + self.assertIn( + deny_sig, str(deny_sig), '[%s]' % str(deny_acl)) + self.assertIn( + reject_sys_sig, str(reject_acl), '[%s]' % str(reject_acl)) + self.assertTrue( + deny_sig in str(reject_acl) and reject_sig in str(reject_acl), + '[%s]' % str(reject_acl)) + + def testAddresses(self): + """ + Tests an address object. + + """ + diff_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST', + dest_addr='SOME_HOST2') + same_addr_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST2', + dest_addr='SOME_HOST2') + any_src_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_addr',)) + any_dest_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_addr',)) + # testing for IPv6 + same_addr6_term = self.fmt.format(TERM_TEMPLATE, + src_addr='SOME_HOST6', + dest_addr='SOME_HOST6') + + diff_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + diff_addr_term, + self.naming), EXP_INFO) + + same_addr_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + same_addr_term, + self.naming), EXP_INFO) + + any_src_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_src_term, + self.naming), EXP_INFO) + + any_dest_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + any_dest_term, + self.naming), EXP_INFO) + + same_addr6_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + same_addr6_term, + self.naming), EXP_INFO) + + src_sig = 'set srcaddr "10.0.0.0/8"' + dest_sig = 'set dstaddr "20.0.0.0/8"' + any_dest_sig = 'set dstaddr "all"' + any_src_sig = 'set srcaddr "all"' + src_sig_v6 = 'set srcaddr6 "fec0::/10"' + dest_sig_v6 = 'set dstaddr6 "fec0::/10"' + + self.assertTrue( + src_sig in str(diff_addr_acl) and dest_sig in str(diff_addr_acl), + '[%s]' % str(diff_addr_acl)) + # [] check acl generate one 'set subnet' for dup addresses + self.assertEqual( + str(same_addr_acl).count('set subnet'), 1) + self.assertIn( + any_src_sig, str(any_src_acl), '[%s]' % str(any_src_acl)) + self.assertIn( + any_dest_sig, str(any_dest_acl), '[%s]' % str(any_dest_acl)) + self.assertTrue( + src_sig_v6 in str(same_addr6_acl) + and dest_sig_v6 in str(same_addr6_acl), + '[%s]' % str(same_addr6_acl)) + + def testServices(self): + """ + Tests services objects. + + """ + dest_only_term = self.fmt.format(TERM_TEMPLATE, + dest_port='HTTP', + remove_fields=('src_port',)) + diff_port_term = self.fmt.format(TERM_TEMPLATE, + dest_port='HTTP HTTPS', + remove_fields=('src_port',)) + dup_port_term = self.fmt.format(TERM_TEMPLATE, + src_port='HTTP', + dest_port='HTTP') + icmp_term = self.fmt.format(TERM_TEMPLATE, + protocol='icmp', + add_fields={'icmp-type': 'echo-request'}, + remove_fields=('src_addr', 'dest_addr', + 'dest_port', 'src_port')) + ip_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('dest_port', 'src_port')) + custom_port_term = self.fmt.format(TERM_TEMPLATE, src_port='WHOIS') + #print("\icmp_term=========\n", icmp_term) + + dest_only_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + dest_only_term, + self.naming), EXP_INFO) + diff_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + diff_port_term, + self.naming), EXP_INFO) + dup_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + dup_port_term, + self.naming), EXP_INFO) + icmp_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + icmp_term, + self.naming), EXP_INFO) + ip_acl = fortigate.Fortigate(policy.ParsePolicy( + GOOD_HEADER + ip_term, + self.naming), EXP_INFO) + custom_port_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + custom_port_term, + self.naming), EXP_INFO) + #print("\ncustom_port_acl=========\n", custom_port_acl) + + dest_only_sig = 'set service HTTP\n' + diff_sig = 'set service HTTP HTTPS\n' + dup_sig = 'set service good-term-2-svc\n' + icmp_sig = 'set service icmp-type-echo-request\n' + ip_sig = 'set service ALL_TCP\n' + custom_port_sig = ('config firewall service custom\n' + + _SP + 'edit good-term-2-svc\n' + + _SP*2 + 'set comment "Generated by Capirca"\n' + + _SP*2 + 'set tcp-portrange 80:43\n' + + _SP + 'next\n') + + self.assertIn( + dest_only_sig, str(dest_only_acl), '[%s]' % str(dest_only_acl)) + self.assertIn( + diff_sig, str(diff_acl), '[%s]' % str(diff_acl)) + self.assertIn( + dup_sig, str(dup_acl), '[%s]' % str(dup_acl)) + self.assertIn( + icmp_sig, str(icmp_acl), '[%s]' % str(icmp_acl)) + self.assertIn( + ip_sig, str(ip_acl), '[%s]' % str(ip_acl)) + self.assertIn( + custom_port_sig, str(custom_port_acl), '[%s]' % str(custom_port_acl)) + + def testInterfaces(self): + """ + Tests interfaces. + + """ + no_interfaces_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('src_interface', + 'dest_interface')) + #print("no_interfaces_term=", no_interfaces_term) + src_only_int_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + remove_fields=('dest_interface',)) + dest_only_int_term = self.fmt.format(TERM_TEMPLATE, + dest_interface='wan2', + remove_fields=('src_interface',)) + both_interfaces_term = self.fmt.format(TERM_TEMPLATE, + src_interface='wan1', + dest_interface='wan2',) + + no_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_interfaces_term, + self.naming), EXP_INFO) + src_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + src_only_int_term, + self.naming), EXP_INFO) + dest_only_int_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + dest_only_int_term, + self.naming), EXP_INFO) + both_interfaces_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + both_interfaces_term, + self.naming), EXP_INFO) + + no_interfaces_sig = 'set srcintf any\n' + _SP*2 + 'set dstintf any' + src_int_only_sig = 'set srcintf wan1\n' + _SP*2 + 'set dstintf any' + dest_int_only_sig = 'set srcintf any\n' + _SP*2 + 'set dstintf wan2' + both_interfaces_sig = 'set srcintf wan1\n' + _SP*2 + 'set dstintf wan2' + + self.assertIn( + no_interfaces_sig, str(no_interfaces_acl), + '[%s]' % str(no_interfaces_acl)) + self.assertIn( + src_int_only_sig, str(src_only_int_acl), + '[%s]' % str(src_only_int_acl)) + self.assertIn( + dest_int_only_sig, str(dest_only_int_acl), + '[%s]' % str(dest_only_int_acl)) + self.assertIn( + both_interfaces_sig, str(both_interfaces_acl), + '[%s]' % str(both_interfaces_acl)) + + def testExpiration(self): + """ + Tests expiration / schedule object. + + """ + no_expiration_term = self.fmt.format(TERM_TEMPLATE) + expiration_term = self.fmt.format(TERM_TEMPLATE, + add_fields={'expiration': + '2022-12-31', + 'comment': + '"test expiration"'}) + + no_expiration_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_expiration_term, + self.naming), EXP_INFO) + expiration_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + expiration_term, + self.naming), EXP_INFO) + + no_expiration_sig = 'set schedule always' + expiration_sig = 'set schedule 2022/12/31_00:00' + expiration_config_sig = ('config firewall schedule onetime\n' + + _SP + 'edit 2022/12/31_00:00\n' + + _SP*2 + 'set end 00:00 2022/12/31\n' + + _SP + 'next\n' + + 'end\n') + + self.assertIn( + no_expiration_sig, str(no_expiration_acl), + '[%s]' % str(no_expiration_acl)) + self.assertTrue( + expiration_config_sig in str(expiration_acl) + and expiration_sig in str(expiration_acl), + '[%s]' % str(expiration_acl)) + + def testApplication_ID(self): + """ + Tests an application ID being used. + + """ + application_term = self.fmt.format(TERM_TEMPLATE, + add_fields={'application-id': '15816'}, + remove_fields=('src_addr', 'src_port')) + + application_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER_1 + application_term, + self.naming), EXP_INFO) + + application_sig = 'set application 15816' + + self.assertIn( + application_sig, str(application_acl), + '[%s]' % str(application_acl)) + + def testLogging(self): + """ + Tests logger input. + + """ + log_term = self.fmt.format(TERM_TEMPLATE, + logging='true') + no_log_term = self.fmt.format(TERM_TEMPLATE, + remove_fields=('logging',)) + + log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + log_term, + self.naming), EXP_INFO) + no_log_acl = fortigate.Fortigate( + policy.ParsePolicy(GOOD_HEADER + no_log_term, + self.naming), EXP_INFO) + + log_sig = 'set logtraffic all' + + self.assertIn( + log_sig, str(log_acl), '[%s]' % str(log_acl)) + self.assertNotIn( + log_sig, str(no_log_term), '[%s]' % str(no_log_acl)) + + def testDuplicateTermError(self): + """ + Tests for duplicate term detection. + + """ + term = self.fmt.format(TERM_TEMPLATE, logging='true') + duplicate_terms = term + term + parsed_p = policy.ParsePolicy(GOOD_HEADER + duplicate_terms, + self.naming) + + self.assertRaises(fortigate.FortiGateDuplicateTermError, + fortigate.Fortigate, + parsed_p, + EXP_INFO) + + def testPortMap(self): + """ + Tests port map object. + + """ + port_map = fortigate.FortigatePortMap() + self.assertEqual('SSH', port_map.get_protocol('tcp', '22')) + self.assertRaises(fortigate.FortiGatePortDoesNotExistError, + port_map.get_protocol, + 'tcp', 5000) + self.assertRaises(fortigate.FortiGateValueError, + port_map.get_protocol, + 'bad_proto', 22) From 1ca876a415e4470c33f1334305993e1e0feda253 Mon Sep 17 00:00:00 2001 From: pc48m8n1 Date: Tue, 5 Nov 2024 20:54:00 -0800 Subject: [PATCH 3/7] Add support for local-in-policy for IPv4 and make policy IDs sequential --- capirca/lib/fortigate.py | 20 +- def/NETWORK.net | 43 ++++ .../sample_fortigate_localin_google_ipv4.pol | 36 ++++ .../sample_fortigate_localin_google_ipv6.pol | 201 ++++++++++++++++++ 4 files changed, 291 insertions(+), 9 deletions(-) create mode 100644 policies/pol/sample_fortigate_localin_google_ipv4.pol create mode 100644 policies/pol/sample_fortigate_localin_google_ipv6.pol diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index 45e8d2c4..ada220fc 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -162,9 +162,9 @@ def get_protocol(protocol, port=None): return f_proto[port] except KeyError as exc: raise FortiGatePortDoesNotExistError(f"Port {exc} does not exist") from exc - else: - raise FortiGateFindServiceError( - f'service not found from {protocol} protocol and {port} port') + # else: + # raise FortiGateFindServiceError( + # f'service not found from {protocol} protocol and {port} port') class ObjectsContainer(): @@ -528,9 +528,9 @@ def _get_services_name(self, protocols, destination_ports, source_ports): portranges = {} for protocol in protocols: port_map = FortigatePortMap() - if port_map.get_protocol(protocol) is None: - raise FortiGateValueError( - f"fortigate does not support {protocol} protocol") + # if port_map.get_protocol(protocol) is None: + # raise FortiGateValueError( + # f"fortigate does not support {protocol} protocol") if protocol in {'icmp', 'icmpv6'}: ip_v = 4 if protocol == 'icmp' else 6 @@ -683,7 +683,7 @@ def __str__(self): if self._term.owner: self._term.comment += [f"Owner: {self._term.owner}"] if self._term.comment and self._term.verbose: - lines += [f"{_SP * 2} set comments {self._obj_container.FixCommentLength((' ').join(self._term.comment))}"] + lines += [f"{_SP * 2} set comments {self._obj_container.fix_comment_length((' ').join(self._term.comment))}"] lines += [f"{_SP * 2} set srcintf {self._term.source_interface or 'any'}"] lines += [f"{_SP * 2} set dstintf {self._term.destination_interface or 'any'}"] exist_src6 = False @@ -725,7 +725,7 @@ def __str__(self): if services: if self._NGFW_MODE == 'policy-based': lines += [_SP * 2 + 'set enforce-default-app-port disable'] - lines += [f"{_SP * 2} set service '{', '.join(services)}'"] + lines += [f"{_SP * 2} set service '{services}'"] else: if self._NGFW_MODE == 'policy-based': lines += [_SP * 2 + 'set enforce-default-app-port enable'] @@ -884,12 +884,14 @@ def _TranslatePolicy(self, pol, exp_info): def _get_fw_policies(self): target_policies = [] + policy_id = 0 for (_, _, term) in self.fortigate_policies: term_str = str(term) if term_str != '': - target_policies += [_SP + f'edit {term.id}'] + target_policies += [_SP + f'edit {policy_id}'] target_policies += [term_str] target_policies += [_SP + 'next'] + policy_id += 1 return target_policies diff --git a/def/NETWORK.net b/def/NETWORK.net index e636805d..659c80ac 100644 --- a/def/NETWORK.net +++ b/def/NETWORK.net @@ -100,3 +100,46 @@ PUBLIC_IPV6_SERVERS = 2606:700:e:550:b01a::b00a # Example public web server WEB_IPV6_SERVERS = 2620:15c:2c4:202:b0e7:158f:6a7a:3188/128 # Example web server +HTTPS-PROXY-1 = 255.255.255.255 # HTTPS-PROXY-1 +HTTPS-PROXY-2 = 255.255.255.255 # HTTPS-PROXY-2 +HTTPS-PROXY-3 = 255.255.255.255 # HTTPS-PROXY-3 +HTTPS-PROXY-4 = 255.255.255.255 # HTTPS-PROXY-4 + +HTTPS-PROXY-NETS = HTTPS-PROXY-1 + HTTPS-PROXY-2 + HTTPS-PROXY-3 + HTTPS-PROXY-4 + +SNMP-SERVER-1 = 255.255.255.255 # SNMP-SERVER-1 +SNMP-SERVER-2 = 255.255.255.255 # SNMP-SERVER-2 +SNMP-SERVER-3 = 255.255.255.255 # SNMP-SERVER-3 +SNMP-SERVER-4 = 255.255.255.255 # SNMP-SERVER-4 + +SNMP-SERVERS-NETS = SNMP-SERVER-1 + SNMP-SERVER-2 + SNMP-SERVER-3 + SNMP-SERVER-4 + +DNS-VIP-1 = 255.255.255.255 # DNS-VIP-1 +GNTP-VIP-1 = 255.255.255.255 # GNTP-VIP-1 +SYSLOG-SERVER-1 = 255.255.255.255 # SYSLOG-SERVER-1 +SSH-RELAY-1 = 255.255.255.255 # SSH-RELAY-1 +NETFLOW-ANYCAST-1 = 255.255.255.255 # NETFLOW-ANYCAST-1 +GATEWAYS-LOOPBACK-1 = 0/48 +RFC1918-10 = 10.0.0.0 255.0.0.0 +RFC1918-172 = 172.16.0.0 255.240.0.0 +RFC1918-192 = 192.168.0.0 255.255.0.0 +RFC6598 = 100.64.0.0 255.192.0.0 +REGIONAL-FGM-1 = 255.255.255.255 +REGIONAL-FGM-2 = 255.255.255.255 +REGIONAL-LOOPBACKS = 255.255.255.0 +P2P-RANGE = 255.255.255.0 + +RFC1918-RFC6598 = RFC1918-10 + RFC1918-172 + RFC1918-192 + RFC6598 + +SSH-RELAY-NET = SSH-RELAY-1 +REGIONAL-FGMS = REGIONAL-FGM-1 + REGIONAL-FGM-2 \ No newline at end of file diff --git a/policies/pol/sample_fortigate_localin_google_ipv4.pol b/policies/pol/sample_fortigate_localin_google_ipv4.pol new file mode 100644 index 00000000..68a6cf7c --- /dev/null +++ b/policies/pol/sample_fortigate_localin_google_ipv4.pol @@ -0,0 +1,36 @@ +header { + target:: fortigatelocalin +} + +term accept-https-requests{ + source-interface:: loopback-1 + destination-interface:: loopback-1 + source-address:: HTTPS-PROXY-NETS + destination-address:: REGIONAL-LOOPBACKS + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-https-requests" + action:: accept +} + +term accept-ssh-requests{ + source-interface:: loopback-1 + destination-interface:: loopback-1 + source-address:: SSH-RELAY-NET + destination-address:: REGIONAL-LOOPBACKS + destination-port:: SSH + protocol:: tcp + comment:: "accept-ssh-requests" + action:: accept +} + +term accept-snmp-requests{ + source-interface:: loopback-1 + destination-interface:: loopback-1 + source-address:: SNMP-SERVERS-NETS + destination-address:: REGIONAL-LOOPBACKS + destination-port:: SNMP + protocol:: udp + comment:: "accept-snmp-requests" + action:: accept +} diff --git a/policies/pol/sample_fortigate_localin_google_ipv6.pol b/policies/pol/sample_fortigate_localin_google_ipv6.pol new file mode 100644 index 00000000..62f5c9b4 --- /dev/null +++ b/policies/pol/sample_fortigate_localin_google_ipv6.pol @@ -0,0 +1,201 @@ +terms: { + name: "accept-https-requests-v6" + source_net: "http-porxy6-nets" + destination_net: "regional-loopbacks6" + interface: "loopback-1" + source_protocols: { + ip_protocol: TCP + } + destination_protocols: { + port_spec: { + port: 443 + } + ip_protocol: TCP + } + fields: { + action: ACCEPT + priority_offset: -1 + } +} +terms: { + name: "accept-ssh-requests-v6" + source_net: "ssh-relay6-nets" + destination_net: "regional-loopbacks6" + interface: "loopback-1" + source_protocols: { + port_spec: { + lowest_port: 1024 + highest_port: 65535 + } + ip_protocol: TCP + } + destination_protocols: { + port_spec: { + port: 22 + } + ip_protocol: TCP + } + fields: { + action: ACCEPT + priority_offset: -2 + } +} +terms: { + name: "accept-snmp-requests-v6" + source_net: "snmp-servers-nets-v6" + destination_net: "regional-loopbacks6" + interface: "loopback-1" + source_protocols: { + ip_protocol: UDP + } + destination_protocols: { + port_spec: { + port: 161 + } + ip_protocol: UDP + } + fields: { + action: ACCEPT + priority_offset: -3 + } +} +terms: { + name: "accept-bgp-to-loopbacks-v6" + source_net: "loopback-nets-v6" + destination_net: "regional-loopbacks6" + interface: "loopback-2" + source_protocols: { + port_spec: { + lowest_port: 1024 + highest_port: 65535 + } + ip_protocol: TCP + } + destination_protocols: { + port_spec: { + port: 179 + } + ip_protocol: TCP + } + fields: { + action: ACCEPT + priority_offset: -4 + } +} +terms: { + name: "accept-esp-ah-tunnel-wan1-v6" + source_net: "p2p-range6" + destination_net: "all" + interface: "WAN-INTF-1" + source_protocols: { + ip_protocol: AH + ip_protocol: ESP + } + destination_protocols: { + ip_protocol: AH + ip_protocol: ESP + } + fields: { + action: ACCEPT + priority_offset: -5 + } +} +terms: { + name: "accept-ike-tunnel-wan1-v6" + source_net: "p2p-range6" + destination_net: "all" + interface: "WAN-INTF-1" + source_protocols: + port_spec: { + port: 500 + port: 4500 + } + ip_protocol: UDP + } + fields: { + action: ACCEPT + priority_offset: -6 + } +} +terms: { + name: "accept-esp-ah-tunnel-wan2-v6" + source_net: "p2p-range6" + destination_net: "all" + interface: "WAN-INTF-2" + source_protocols: { + ip_protocol: AH + ip_protocol: ESP + } + destination_protocols: { + ip_protocol: AH + ip_protocol: ESP + } + fields: { + action: ACCEPT + priority_offset: -7 + } +} +terms: { + name: "accept-ike-tunnel-wan2-v6" + source_net: "p2p-range6" + destination_net: "all" + interface: "WAN-INTF-2" + source_protocols: + port_spec: { + port: 500 + port: 4500 + } + ip_protocol: UDP + } + fields: { + action: ACCEPT + priority_offset: -8 + } +} +terms: { + name: "accept-bgp-wan-v6" + source_net: "all" + destination_net: "all" + interface: "WAN-INTF1" + source_protocols: { + port_spec: { + lowest_port: 1024 + highest_port: 65535 + } + ip_protocol: TCP + } + destination_protocols: { + port_spec: { + port: 179 + } + ip_protocol: TCP + } + fields: { + action: ACCEPT + priority_offset: -7 + } +} +terms: { + name: "accept-fgm-v6" + source_net: "regional-fgms6" + destination_net: "all" + interface: "WAN-INTF1" + source_protocols: { + port_spec: { + lowest_port: 1024 + highest_port: 65535 + } + ip_protocol: TCP + } + destination_protocols: { + port_spec: { + port: 541 + port: 542 + } + ip_protocol: TCP + } + fields: { + action: ACCEPT + priority_offset: -8 + } +} \ No newline at end of file From b650d1ad76bef298f0129f3dfdb1c1532ac0f877 Mon Sep 17 00:00:00 2001 From: pc48m8n1 Date: Fri, 8 Nov 2024 07:26:19 -0800 Subject: [PATCH 4/7] Enhanced and tested IPv4/IPv6 network examples and policy samples, with full code cleanup and improved consistency --- capirca/lib/fortigate.py | 35 +-- def/NETWORK.net | 31 +- .../sample_fortigate_localin_google_ipv4.pol | 77 +++++ .../sample_fortigate_localin_google_ipv6.pol | 294 ++++++------------ 4 files changed, 213 insertions(+), 224 deletions(-) diff --git a/capirca/lib/fortigate.py b/capirca/lib/fortigate.py index ada220fc..85a0b499 100644 --- a/capirca/lib/fortigate.py +++ b/capirca/lib/fortigate.py @@ -162,9 +162,9 @@ def get_protocol(protocol, port=None): return f_proto[port] except KeyError as exc: raise FortiGatePortDoesNotExistError(f"Port {exc} does not exist") from exc - # else: - # raise FortiGateFindServiceError( - # f'service not found from {protocol} protocol and {port} port') + else: + raise FortiGateFindServiceError( + f'service not found from {protocol} protocol and {port} port') class ObjectsContainer(): @@ -500,8 +500,6 @@ def _get_services_name(self, protocols, destination_ports, source_ports): Returns: string (all services separated by spaces). """ - #if not protocols: - # raise FortiGateFindServiceError('protocol not found') ports = set() # fortigate does not allow empty destination_ports @@ -527,11 +525,6 @@ def _get_services_name(self, protocols, destination_ports, source_ports): services = set() portranges = {} for protocol in protocols: - port_map = FortigatePortMap() - # if port_map.get_protocol(protocol) is None: - # raise FortiGateValueError( - # f"fortigate does not support {protocol} protocol") - if protocol in {'icmp', 'icmpv6'}: ip_v = 4 if protocol == 'icmp' else 6 icmp_type_dict = {} @@ -707,17 +700,6 @@ def __str__(self): elif not exist_src6 and exist_dst6: lines += [_SP * 2 + 'set srcaddr6 "all"'] - # if self._term.destination_address_exclude - # and not self._term.destination_address: - # lines += [_SP*2 + 'set dstaddr-negate enable'] - # if self._term.source_address_exclude - # and not self._term.source_address: - # lines += [_SP*2 + 'set srcaddr-negate enable'] - - # process verbatim items - # if self._term.verbatim: - # lines.extend(self._process_verbatim_item()) - lines += [f"{_SP * 2} set action {action if action == 'accept' else 'deny'}"] if action == 'reject': lines += [_SP * 2 + 'set send-deny-packet enable'] @@ -729,23 +711,12 @@ def __str__(self): else: if self._NGFW_MODE == 'policy-based': lines += [_SP * 2 + 'set enforce-default-app-port enable'] - - - # if self._NGFW_MODE == 'profile-based': - # if self._term.av_profile or self._term.webfilter_profile or - # self._term.ssl_ssh_profile or self._term.dnsfilter_profile or - # self._term.ips_sensor: - # lines += [_SP*2 + 'set utm-status enable'] if self._NGFW_MODE == 'policy-based' and self._term.application_id: lines += [f"{_SP * 2} set application {' '.join(str(v) for v in sorted(self._term.application_id))}"] lines += [f"{_SP * 2} set schedule {schedule_name if schedule_name else 'always'}"] - # opts = [str(x) for x in self._term.option] - # if ('tcp-established' in opts or 'established' in opts): - # lines += [_SP*2 + 'set something'] - if self._term.logging: if self._term.logging == 'log-both': lines += [_SP * 2 + 'set logtraffic all'] diff --git a/def/NETWORK.net b/def/NETWORK.net index 659c80ac..6cc80c75 100644 --- a/def/NETWORK.net +++ b/def/NETWORK.net @@ -142,4 +142,33 @@ RFC1918-RFC6598 = RFC1918-10 SSH-RELAY-NET = SSH-RELAY-1 REGIONAL-FGMS = REGIONAL-FGM-1 - REGIONAL-FGM-2 \ No newline at end of file + REGIONAL-FGM-2 + +HTTP-PROXY6-1 = 1:1:1:1::/128 # HTTP-PROXY6-1 +HTTP-PROXY6-2 = 2:2:2:2::/128 # HTTP-PROXY6-2 +HTTP-PROXY6-3 = 3:3:3:3::/128 # HTTP-PROXY6-3 +HTTP-PROXY6-4 = 4:4:4:4::/128 # HTTP-PROXY6-4 + +HTTP-PROXY6-NETS = HTTP-PROXY6-1 + HTTP-PROXY6-2 + HTTP-PROXY6-3 + HTTP-PROXY6-4 + +SNMP-SERVER6-1 = 1:1:1:1::/128 # SNMP-SERVER6-1 +SNMP-SERVER6-2 = 2:2:2:2::/128 # SNMP-SERVER6-2 +SNMP-SERVER6-3 = 3:3:3:3::/128 # SNMP-SERVER6-3 +SNMP-SERVER6-4 = 4:4:4:4::/128 # SNMP-SERVER6-4 + +SNMP-SERVER6-NETS = SNMP-SERVER6-1 + SNMP-SERVER6-2 + SNMP-SERVER6-3 + SNMP-SERVER6-4 + +SYSLOG-SERVER6 = 1:1:1:1::/128 # SYSLOG-SERVER6 +SSH-RELAY6 = 2:2:2:2::/128 # SSH-RELAY6 + +SSH-RELAY6-NETS = SSH-RELAY6 + +P2P-RANGE6 = 2:2:2:2::/48 # P2P-RANGE6 +REGIONAL-LOOPBACKS6 = 3:3:3:3::/48 # REGIONAL-LOOPBACKS6 +REGIONAL-FGMS6 = 3:3:3:3::/48 # REGIONAL-FGMS6 \ No newline at end of file diff --git a/policies/pol/sample_fortigate_localin_google_ipv4.pol b/policies/pol/sample_fortigate_localin_google_ipv4.pol index 68a6cf7c..886a2655 100644 --- a/policies/pol/sample_fortigate_localin_google_ipv4.pol +++ b/policies/pol/sample_fortigate_localin_google_ipv4.pol @@ -34,3 +34,80 @@ term accept-snmp-requests{ comment:: "accept-snmp-requests" action:: accept } + +term accept-bgp-loopbacks{ + source-interface:: loopback-1 + destination-interface:: loopback-1 + source-address:: REGIONAL-LOOPBACKS + destination-address:: REGIONAL-LOOPBACKS + destination-port:: BGP + protocol:: tcp + comment:: "accept-bgp-loopbacks" + action:: accept +} + +term accept-esp-ah-tunnel-wan1{ + source-interface:: x1.15 + destination-interface:: x1.15 + source-address:: P2P-RANGE + destination-address:: ANY + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-esp-ah-tunnel-wan1" + action:: accept +} + +term accept-ike-tunnel-wan1{ + source-interface:: x1.15 + destination-interface:: x1.15 + source-address:: P2P-RANGE + destination-address:: ANY + destination-port:: IKE + protocol:: udp + comment:: "accept-ike-tunnel-wan1" + action:: accept +} + +term accept-esp-ah-tunnel-wan2{ + source-interface:: x3.1 + destination-interface:: x3.1 + source-address:: P2P-RANGE + destination-address:: ANY + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-esp-ah-tunnel-wan2" + action:: accept +} + +term accept-ike-tunnel-wan2{ + source-interface:: x3.1 + destination-interface:: x3.1 + source-address:: P2P-RANGE + destination-address:: ANY + destination-port:: IKE + protocol:: udp + comment:: "accept-ike-tunnel-wan2" + action:: accept +} + +term accept-bgp-wan{ + source-interface:: x3.2 + destination-interface:: x3.2 + source-address:: P2P-RANGE + destination-address:: ANY + destination-port:: BGP + protocol:: tcp + comment:: "accept-bgp-wan" + action:: accept +} + +term accept-Fortigate-managers-request{ + source-interface:: x5.5 + destination-interface:: x5.5 + source-address:: REGIONAL-FGMS + destination-address:: ANY + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-Fortigate-managers-request" + action:: accept +} \ No newline at end of file diff --git a/policies/pol/sample_fortigate_localin_google_ipv6.pol b/policies/pol/sample_fortigate_localin_google_ipv6.pol index 62f5c9b4..8787cb5e 100644 --- a/policies/pol/sample_fortigate_localin_google_ipv6.pol +++ b/policies/pol/sample_fortigate_localin_google_ipv6.pol @@ -1,201 +1,113 @@ -terms: { - name: "accept-https-requests-v6" - source_net: "http-porxy6-nets" - destination_net: "regional-loopbacks6" - interface: "loopback-1" - source_protocols: { - ip_protocol: TCP - } - destination_protocols: { - port_spec: { - port: 443 - } - ip_protocol: TCP - } - fields: { - action: ACCEPT - priority_offset: -1 - } +header { + target:: fortigatelocalin } -terms: { - name: "accept-ssh-requests-v6" - source_net: "ssh-relay6-nets" - destination_net: "regional-loopbacks6" - interface: "loopback-1" - source_protocols: { - port_spec: { - lowest_port: 1024 - highest_port: 65535 - } - ip_protocol: TCP - } - destination_protocols: { - port_spec: { - port: 22 - } - ip_protocol: TCP - } - fields: { - action: ACCEPT - priority_offset: -2 - } + +term accept-https-requests-v6{ + source-interface:: loopback-1 + destination-interface:: loopback-1 + source-address:: HTTP-PROXY6-NETS + destination-address:: REGIONAL-LOOPBACKS + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-https-requests-v6" + action:: accept } -terms: { - name: "accept-snmp-requests-v6" - source_net: "snmp-servers-nets-v6" - destination_net: "regional-loopbacks6" - interface: "loopback-1" - source_protocols: { - ip_protocol: UDP - } - destination_protocols: { - port_spec: { - port: 161 - } - ip_protocol: UDP - } - fields: { - action: ACCEPT - priority_offset: -3 - } + +term accept-ssh-requests-v6{ + source-interface:: loopback-1 + destination-interface:: loopback-1 + source-address:: SSH-RELAY6-NETS + destination-address:: REGIONAL-LOOPBACKS + destination-port:: SSH + protocol:: tcp + comment:: "accept-ssh-requests-v6" + action:: accept } -terms: { - name: "accept-bgp-to-loopbacks-v6" - source_net: "loopback-nets-v6" - destination_net: "regional-loopbacks6" - interface: "loopback-2" - source_protocols: { - port_spec: { - lowest_port: 1024 - highest_port: 65535 - } - ip_protocol: TCP - } - destination_protocols: { - port_spec: { - port: 179 - } - ip_protocol: TCP - } - fields: { - action: ACCEPT - priority_offset: -4 - } + +term accept-snmp-requests-v6{ + source-interface:: loopback-1 + destination-interface:: loopback-1 + source-address:: SNMP-SERVER6-NETS + destination-address:: REGIONAL-LOOPBACKS + destination-port:: SNMP + protocol:: udp + comment:: "accept-snmp-requests-v6" + action:: accept } -terms: { - name: "accept-esp-ah-tunnel-wan1-v6" - source_net: "p2p-range6" - destination_net: "all" - interface: "WAN-INTF-1" - source_protocols: { - ip_protocol: AH - ip_protocol: ESP - } - destination_protocols: { - ip_protocol: AH - ip_protocol: ESP - } - fields: { - action: ACCEPT - priority_offset: -5 - } + +term accept-bgp-to-loopbacks-v6{ + source-interface:: loopback-2 + destination-interface:: loopback-2 + source-address:: REGIONAL-LOOPBACKS6 + destination-address:: REGIONAL-LOOPBACKS6 + destination-port:: BGP + protocol:: tcp + comment:: "accept-bgp-to-loopbacks-v6" + action:: accept } -terms: { - name: "accept-ike-tunnel-wan1-v6" - source_net: "p2p-range6" - destination_net: "all" - interface: "WAN-INTF-1" - source_protocols: - port_spec: { - port: 500 - port: 4500 - } - ip_protocol: UDP - } - fields: { - action: ACCEPT - priority_offset: -6 - } + +term accept-esp-ah-tunnel-wan1-v6{ + source-interface:: x1.15 + destination-interface:: x1.15 + source-address:: P2P-RANGE6 + destination-address:: ANY_V6 + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-esp-ah-tunnel-wan1-v6" + action:: accept } -terms: { - name: "accept-esp-ah-tunnel-wan2-v6" - source_net: "p2p-range6" - destination_net: "all" - interface: "WAN-INTF-2" - source_protocols: { - ip_protocol: AH - ip_protocol: ESP - } - destination_protocols: { - ip_protocol: AH - ip_protocol: ESP - } - fields: { - action: ACCEPT - priority_offset: -7 - } + +term accept-ike-tunnel-wan1-v6{ + source-interface:: x1.15 + destination-interface:: x1.15 + source-address:: P2P-RANGE6 + destination-address:: ANY_V6 + destination-port:: IKE + protocol:: udp + comment:: "accept-ike-tunnel-wan1-v6" + action:: accept } -terms: { - name: "accept-ike-tunnel-wan2-v6" - source_net: "p2p-range6" - destination_net: "all" - interface: "WAN-INTF-2" - source_protocols: - port_spec: { - port: 500 - port: 4500 - } - ip_protocol: UDP - } - fields: { - action: ACCEPT - priority_offset: -8 - } + +term accept-esp-ah-tunnel-wan2-v6{ + source-interface:: x3.1 + destination-interface:: x3.1 + source-address:: P2P-RANGE6 + destination-address:: ANY_V6 + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-esp-ah-tunnel-wan2-v6" + action:: accept } -terms: { - name: "accept-bgp-wan-v6" - source_net: "all" - destination_net: "all" - interface: "WAN-INTF1" - source_protocols: { - port_spec: { - lowest_port: 1024 - highest_port: 65535 - } - ip_protocol: TCP - } - destination_protocols: { - port_spec: { - port: 179 - } - ip_protocol: TCP - } - fields: { - action: ACCEPT - priority_offset: -7 - } + +term accept-ike-tunnel-wan2-v6{ + source-interface:: x3.1 + destination-interface:: x3.1 + source-address:: P2P-RANGE6 + destination-address:: ANY_V6 + destination-port:: IKE + protocol:: udp + comment:: "accept-ike-tunnel-wan2-v6" + action:: accept } -terms: { - name: "accept-fgm-v6" - source_net: "regional-fgms6" - destination_net: "all" - interface: "WAN-INTF1" - source_protocols: { - port_spec: { - lowest_port: 1024 - highest_port: 65535 - } - ip_protocol: TCP - } - destination_protocols: { - port_spec: { - port: 541 - port: 542 - } - ip_protocol: TCP - } - fields: { - action: ACCEPT - priority_offset: -8 - } + +term accept-bgp-wan-v6{ + source-interface:: x6.15 + destination-interface:: x6.15 + source-address:: ANY_V6 + destination-address:: ANY_V6 + destination-port:: BGP + protocol:: tcp + comment:: "accept-bgp-wan-v6" + action:: accept +} + +term accept-fgm-v6{ + source-interface:: x6.15 + destination-interface:: v6.15 + source-address:: REGIONAL-FGMS6 + destination-address:: ANY_V6 + destination-port:: HTTPS + protocol:: tcp + comment:: "accept-fgm-v6" + action:: accept } \ No newline at end of file From 592c9986777b6f8ef20dd187f6caba3fe10f27fc Mon Sep 17 00:00:00 2001 From: pc48m8n1 Date: Fri, 8 Nov 2024 07:49:28 -0800 Subject: [PATCH 5/7] Fix: Use IPv6 regional-loopbacks in the policy sample --- policies/pol/sample_fortigate_localin_google_ipv6.pol | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/policies/pol/sample_fortigate_localin_google_ipv6.pol b/policies/pol/sample_fortigate_localin_google_ipv6.pol index 8787cb5e..96fbc587 100644 --- a/policies/pol/sample_fortigate_localin_google_ipv6.pol +++ b/policies/pol/sample_fortigate_localin_google_ipv6.pol @@ -6,7 +6,7 @@ term accept-https-requests-v6{ source-interface:: loopback-1 destination-interface:: loopback-1 source-address:: HTTP-PROXY6-NETS - destination-address:: REGIONAL-LOOPBACKS + destination-address:: REGIONAL-LOOPBACKS6 destination-port:: HTTPS protocol:: tcp comment:: "accept-https-requests-v6" @@ -17,7 +17,7 @@ term accept-ssh-requests-v6{ source-interface:: loopback-1 destination-interface:: loopback-1 source-address:: SSH-RELAY6-NETS - destination-address:: REGIONAL-LOOPBACKS + destination-address:: REGIONAL-LOOPBACKS6 destination-port:: SSH protocol:: tcp comment:: "accept-ssh-requests-v6" @@ -28,7 +28,7 @@ term accept-snmp-requests-v6{ source-interface:: loopback-1 destination-interface:: loopback-1 source-address:: SNMP-SERVER6-NETS - destination-address:: REGIONAL-LOOPBACKS + destination-address:: REGIONAL-LOOPBACKS6 destination-port:: SNMP protocol:: udp comment:: "accept-snmp-requests-v6" From 96ecb987cfc5fa26f6249f15c1b2bef256364cbb Mon Sep 17 00:00:00 2001 From: pc48m8n1 Date: Mon, 11 Nov 2024 18:51:56 -0800 Subject: [PATCH 6/7] Uncomment vendor imports --- capirca/aclgen.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/capirca/aclgen.py b/capirca/aclgen.py index 783439b2..4f47cf07 100644 --- a/capirca/aclgen.py +++ b/capirca/aclgen.py @@ -24,36 +24,36 @@ from absl import logging from capirca.lib import aclgenerator from capirca.lib import arista -#from capirca.lib import arista_tp +from capirca.lib import arista_tp from capirca.lib import aruba from capirca.lib import brocade from capirca.lib import cisco from capirca.lib import ciscoasa -#from capirca.lib import cisconx +from capirca.lib import cisconx from capirca.lib import ciscoxr from capirca.lib import cloudarmor from capirca.lib import gce -#from capirca.lib import gce_vpc_tf +from capirca.lib import gce_vpc_tf from capirca.lib import gcp_hf from capirca.lib import fortigate from capirca.lib import fortigatelocalin from capirca.lib import ipset from capirca.lib import iptables from capirca.lib import juniper -#from capirca.lib import juniperevo +from capirca.lib import juniperevo from capirca.lib import junipermsmpc from capirca.lib import junipersrx -#from capirca.lib import k8s +from capirca.lib import k8s from capirca.lib import naming from capirca.lib import nftables from capirca.lib import nsxv -#from capirca.lib import nsxt -#from capirca.lib import openconfig +from capirca.lib import nsxt +from capirca.lib import openconfig from capirca.lib import packetfilter from capirca.lib import paloaltofw from capirca.lib import pcap from capirca.lib import policy -#from capirca.lib import sonic +from capirca.lib import sonic from capirca.lib import speedway from capirca.lib import srxlo from capirca.lib import windows_advfirewall From d6e440ae63d6300cbda16f07c72c2d62221fa24e Mon Sep 17 00:00:00 2001 From: pc48m8n1 Date: Mon, 11 Nov 2024 23:01:50 -0800 Subject: [PATCH 7/7] Rename policy files --- ..._localin_google_ipv4.pol => sample_fortigate_localin_ipv4.pol} | 0 ..._localin_google_ipv6.pol => sample_fortigate_localin_ipv6.pol} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename policies/pol/{sample_fortigate_localin_google_ipv4.pol => sample_fortigate_localin_ipv4.pol} (100%) rename policies/pol/{sample_fortigate_localin_google_ipv6.pol => sample_fortigate_localin_ipv6.pol} (100%) diff --git a/policies/pol/sample_fortigate_localin_google_ipv4.pol b/policies/pol/sample_fortigate_localin_ipv4.pol similarity index 100% rename from policies/pol/sample_fortigate_localin_google_ipv4.pol rename to policies/pol/sample_fortigate_localin_ipv4.pol diff --git a/policies/pol/sample_fortigate_localin_google_ipv6.pol b/policies/pol/sample_fortigate_localin_ipv6.pol similarity index 100% rename from policies/pol/sample_fortigate_localin_google_ipv6.pol rename to policies/pol/sample_fortigate_localin_ipv6.pol