From 986a730d310369eefd287101caf1debd83e0b8c5 Mon Sep 17 00:00:00 2001 From: woohung Date: Sun, 23 Feb 2025 20:09:58 +0300 Subject: [PATCH] feat: Add support for NetBox v4.x in Annet (related to annetutil/annet#247) Co-authored-by: Artem Kotik --- annet/adapters/netbox/common/models.py | 4 +- annet/adapters/netbox/common/storage_base.py | 344 +++++++++++++++++++ annet/adapters/netbox/provider.py | 27 +- annet/adapters/netbox/v24/models.py | 295 ++++++++++++++++ annet/adapters/netbox/v24/storage.py | 2 +- annet/adapters/netbox/v37/models.py | 21 ++ annet/adapters/netbox/v37/storage.py | 323 +---------------- annet/adapters/netbox/v41/__init__.py | 0 annet/adapters/netbox/v41/models.py | 39 +++ annet/adapters/netbox/v41/storage.py | 14 + annet/adapters/netbox/v42/__init__.py | 0 annet/adapters/netbox/v42/models.py | 20 ++ annet/adapters/netbox/v42/storage.py | 14 + 13 files changed, 778 insertions(+), 325 deletions(-) create mode 100644 annet/adapters/netbox/common/storage_base.py create mode 100644 annet/adapters/netbox/v24/models.py create mode 100644 annet/adapters/netbox/v37/models.py create mode 100644 annet/adapters/netbox/v41/__init__.py create mode 100644 annet/adapters/netbox/v41/models.py create mode 100644 annet/adapters/netbox/v41/storage.py create mode 100644 annet/adapters/netbox/v42/__init__.py create mode 100644 annet/adapters/netbox/v42/models.py create mode 100644 annet/adapters/netbox/v42/storage.py diff --git a/annet/adapters/netbox/common/models.py b/annet/adapters/netbox/common/models.py index bca6be79..3ed8bcf5 100644 --- a/annet/adapters/netbox/common/models.py +++ b/annet/adapters/netbox/common/models.py @@ -53,7 +53,7 @@ def _dump__list_key(self): class Prefix(DumpableView): id: int prefix: str - site: Optional[Entity] + # `site` deprecated since v4.2, replace in derived classes. vrf: Optional[Entity] tenant: Optional[Entity] vlan: Optional[Entity] @@ -168,7 +168,7 @@ class NetboxDevice(Entity): display: str device_type: DeviceType - device_role: Entity + # `device_role` deprecated since v4.0, replace in derived classes. tenant: Optional[Entity] platform: Optional[Entity] serial: str diff --git a/annet/adapters/netbox/common/storage_base.py b/annet/adapters/netbox/common/storage_base.py new file mode 100644 index 00000000..f5da84e3 --- /dev/null +++ b/annet/adapters/netbox/common/storage_base.py @@ -0,0 +1,344 @@ +import ssl +from ipaddress import ip_interface +from logging import getLogger +from typing import Any, Optional, List, Union, Dict, cast + +from adaptix import P +from adaptix.conversion import impl_converter, link, link_constant +from annetbox.v37 import models as api_models + +from annet.adapters.netbox.common import models +from annet.adapters.netbox.common.manufacturer import ( + get_hw, get_breed, +) +from annet.adapters.netbox.common.query import NetboxQuery, FIELD_VALUE_SEPARATOR +from annet.adapters.netbox.common.storage_opts import NetboxStorageOpts +from annet.annlib.netdev.views.hardware import HardwareView +from annet.storage import Storage, Device, Interface + + +logger = getLogger(__name__) + + +class BaseNetboxStorage(Storage): + """ + Base class for Netbox storage adapters. + Attributes: + netbox_class: The Netbox class to use for API interactions from Annetbox. + api_models: The API models used by the storage from Annetbox. + device_model: The model for Netbox devices. + prefix_model: The model for Netbox prefixes. + interface_model: The model for Netbox interfaces. + ipaddress_model: The model for Netbox IP addresses. + """ + netbox_class = None + api_models = api_models + device_model = models.NetboxDevice + prefix_model = models.Prefix + interface_model = models.Interface + ipaddress_model = models.IpAddress + + def __init__(self, opts: Optional[NetboxStorageOpts] = None): + ctx: Optional[ssl.SSLContext] = None + url = "" + token = "" + self.exact_host_filter = False + threads = 1 + if opts: + if opts.insecure: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + url = opts.url + token = opts.token + threads = opts.threads + self.exact_host_filter = opts.exact_host_filter + + if self.netbox_class is None: + raise ValueError("netbox_class is not set in the derived class") + self.netbox = self.netbox_class(url=url, token=token, ssl_context=ctx, threads=threads) + self._initialize_impl_converter_methods() + self._all_fqdns: Optional[list[str]] = None + self._id_devices: dict[int, self.device_model] = {} + self._name_devices: dict[str, self.device_model] = {} + self._short_name_devices: dict[str, self.device_model] = {} + + def _initialize_impl_converter_methods(self): + @impl_converter(recipe=[ + link(P[self.api_models.Device].name, P[self.device_model].hostname), + link(P[self.api_models.Device].name, P[self.device_model].fqdn), + ]) + def extend_device_base( + device: self.api_models.Device, + interfaces: List[self.interface_model], + hw: Optional[HardwareView], + breed: str, + storage: Storage, + ) -> self.device_model: + ... + + def extend_device( + device: self.api_models.Device, + interfaces: List[self.interface_model], + storage: Storage, + ) -> self.device_model: + platform_name: str = "" + breed: str = "" + hw = HardwareView("", "") + if device.platform: + platform_name = device.platform.name + if device.device_type and device.device_type.manufacturer: + breed = get_breed( + device.device_type.manufacturer.name, + device.device_type.model, + ) + hw = get_hw( + device.device_type.manufacturer.name, + device.device_type.model, + platform_name, + ) + res = extend_device_base( + device=device, + interfaces=interfaces, + breed=breed, + hw=hw, + storage=storage, + ) + return res + + + @impl_converter( + recipe=[link_constant(P[self.interface_model].lag_min_links, value=None)], + ) + def extend_interface( + interface: self.api_models.Interface, + ip_addresses: List[self.ipaddress_model], + ) -> self.interface_model: + ... + + + @impl_converter + def extend_ip_address( + ip_address: self.ipaddress_model, prefix: Optional[self.prefix_model], + ) -> self.ipaddress_model: + ... + + self.extend_device_base = extend_device_base + self.extend_device = extend_device + self.extend_interface = extend_interface + self.extend_ip_address = extend_ip_address + + def __enter__(self): + return self + + def __exit__(self, _, __, ___): + pass + + def resolve_object_ids_by_query(self, query: NetboxQuery): + return [ + d.id for d in self._load_devices(query) + ] + + def resolve_fdnds_by_query(self, query: NetboxQuery): + return [ + d.name for d in self._load_devices(query) + ] + + def resolve_all_fdnds(self) -> list[str]: + if self._all_fqdns is None: + self._all_fqdns = [ + d.name + for d in self.netbox.dcim_all_devices_brief().results + ] + return self._all_fqdns + + def make_devices( + self, + query: Union[NetboxQuery, list], + preload_neighbors=False, + use_mesh=None, + preload_extra_fields=False, + **kwargs, + ) -> List[device_model]: + if isinstance(query, list): + query = NetboxQuery.new(query) + + devices = [] + if query.is_host_query(): + globs = [] + for glob in query.globs: + if glob in self._name_devices: + devices.append(self._name_devices[glob]) + if glob in self._short_name_devices: + devices.append(self._short_name_devices[glob]) + else: + globs.append(glob) + if not globs: + return devices + query = NetboxQuery.new(globs) + + return devices + self._make_devices( + query=query, + preload_neighbors=preload_neighbors, + use_mesh=use_mesh, + preload_extra_fields=preload_extra_fields, + **kwargs + ) + + def _make_devices( + self, + query: NetboxQuery, + preload_neighbors=False, + use_mesh=None, + preload_extra_fields=False, + **kwargs, + ) -> List[device_model]: + device_ids = { + device.id: self.extend_device( + device=device, + interfaces=[], + storage=self, + ) + for device in self._load_devices(query) + } + if not device_ids: + return [] + + for device in device_ids.values(): + self._record_device(device) + + interfaces = self._load_interfaces(list(device_ids)) + for interface in interfaces: + device_ids[interface.device.id].interfaces.append(interface) + + return list(device_ids.values()) + + def _record_device(self, device: device_model): + self._id_devices[device.id] = device + self._short_name_devices[device.name] = device + if not self.exact_host_filter: + short_name = device.name.split(".")[0] + self._short_name_devices[short_name] = device + + def _load_devices(self, query: NetboxQuery) -> List[api_models.Device]: + if not query.globs: + return [] + query_groups = parse_glob(self.exact_host_filter, query) + return [ + device + for device in self.netbox.dcim_all_devices(**query_groups).results + if _match_query(self.exact_host_filter, query, device) + ] + + def _extend_interfaces(self, interfaces: List[interface_model]) -> List[interface_model]: + extended_ifaces = { + interface.id: self.extend_interface(interface, []) + for interface in interfaces + } + + ips = self.netbox.ipam_all_ip_addresses(interface_id=list(extended_ifaces)) + ip_to_cidrs: Dict[str, str] = {ip.address: str(ip_interface(ip.address).network) for ip in ips.results} + prefixes = self.netbox.ipam_all_prefixes(prefix=list(ip_to_cidrs.values())) + cidr_to_prefix: Dict[str, models.Prefix] = {x.prefix: x for x in prefixes.results} + + for ip in ips.results: + cidr = ip_to_cidrs[ip.address] + ip = self.extend_ip_address(ip, prefix=cidr_to_prefix.get(cidr)) + extended_ifaces[ip.assigned_object_id].ip_addresses.append(ip) + return list(extended_ifaces.values()) + + def _load_interfaces(self, device_ids: List[int]) -> List[interface_model]: + interfaces = self.netbox.dcim_all_interfaces(device_id=device_ids) + return self._extend_interfaces(interfaces.results) + + def _load_interfaces_by_id(self, ids: List[int]) -> List[interface_model]: + interfaces = self.netbox.dcim_all_interfaces_by_id(id=ids) + return self._extend_interfaces(interfaces.results) + + def get_device( + self, obj_id, preload_neighbors=False, use_mesh=None, + **kwargs, + ) -> device_model: + if obj_id in self._id_devices: + return self._id_devices[obj_id] + + device = self.netbox.dcim_device(obj_id) + interfaces = self._load_interfaces([device.id]) + + res = self.extend_device( + device=device, + storage=self, + interfaces=interfaces, + ) + self._record_device(res) + return res + + def flush_perf(self): + pass + + def search_connections(self, device: Device, neighbor: Device) -> list[tuple[Interface, Interface]]: + if device.storage is not self: + raise ValueError("device does not belong to this storage") + if neighbor.storage is not self: + raise ValueError("neighbor does not belong to this storage") + # both devices are NetboxDevice if they are loaded from this storage + res = [] + for local_port in device.interfaces: + if not local_port.connected_endpoints: + continue + for endpoint in local_port.connected_endpoints: + if endpoint.device.id == neighbor.id: + for remote_port in neighbor.interfaces: + if remote_port.name == endpoint.name: + res.append((local_port, remote_port)) + break + return res + + +def _match_query(exact_host_filter: bool, query: NetboxQuery, device_data: api_models.Device) -> bool: + """ + Additional filtering after netbox due to limited backend logic. + """ + if exact_host_filter: + return True # nothing to check, all filtering is done by netbox + hostnames = [subquery.strip() for subquery in query.globs if FIELD_VALUE_SEPARATOR not in subquery] + if not hostnames: + return True # no hostnames to check + + short_name = device_data.name.split(".")[0] + for hostname in hostnames: + hostname = hostname.strip().rstrip(".") + if short_name == hostname or device_data.name == hostname: + return True + return False + + +def _hostname_dot_hack(raw_query: str) -> str: + # there is no proper way to lookup host by its hostname + # ie find "host" with fqdn "host.example.com" + # besides using name__ic (ie startswith) + # since there is no direct analogue for this field in netbox + # so we need to add a dot to hostnames (top-level fqdn part) + # so we would not receive devices with a common name prefix + def add_dot(raw_query: Any) -> Any: + if isinstance(raw_query, str) and "." not in raw_query: + raw_query = raw_query + "." + return raw_query + + if isinstance(raw_query, list): + for i, name in enumerate(raw_query): + raw_query[i] = add_dot(name) + elif isinstance(raw_query, str): + raw_query = add_dot(raw_query) + + return raw_query + + +def parse_glob(exact_host_filter: bool, query: NetboxQuery) -> dict[str, list[str]]: + query_groups = cast(dict[str, list[str]], query.parse_query()) + if names := query_groups.pop("name", None): + if exact_host_filter: + query_groups["name__ie"] = names + else: + query_groups["name__ic"] = [_hostname_dot_hack(name) for name in names] + return query_groups diff --git a/annet/adapters/netbox/provider.py b/annet/adapters/netbox/provider.py index e3849556..3e647e44 100644 --- a/annet/adapters/netbox/provider.py +++ b/annet/adapters/netbox/provider.py @@ -1,6 +1,6 @@ from typing import Dict, Any, Optional -from dataclass_rest.exceptions import ClientError +from dataclass_rest.exceptions import ClientError, ClientLibraryError from annet.storage import StorageProvider, Storage from annet.connectors import AdapterWithName, AdapterWithConfig, T @@ -9,21 +9,34 @@ from .common.query import NetboxQuery from .v24.storage import NetboxStorageV24 from .v37.storage import NetboxStorageV37 +from .v41.storage import NetboxStorageV41 +from .v42.storage import NetboxStorageV42 def storage_factory(opts: NetboxStorageOpts) -> Storage: client = NetboxStatusClient(opts.url, opts.token, opts.insecure) + version_class_map = { + "3.": NetboxStorageV37, + "4.0": NetboxStorageV41, + "4.1": NetboxStorageV41, + "4.2": NetboxStorageV42, + } + + status = None + try: status = client.status() + for version_prefix, storage_class in version_class_map.items(): + if status.netbox_version.startswith(version_prefix): + return storage_class(opts) + except ClientError as e: if e.status_code == 404: - # old version do not support status reqeust return NetboxStorageV24(opts) - raise - if status.netbox_version.startswith("3."): - return NetboxStorageV37(opts) - else: - raise ValueError(f"Unsupported version: {status.netbox_version}") + else: + raise ValueError(f"Unsupported version: {status.netbox_version}") + except ClientLibraryError: + raise ValueError(f"Connection error: Unable to reach Netbox at URL: {opts.url}") class NetboxProvider(StorageProvider, AdapterWithName, AdapterWithConfig): diff --git a/annet/adapters/netbox/v24/models.py b/annet/adapters/netbox/v24/models.py new file mode 100644 index 00000000..bca6be79 --- /dev/null +++ b/annet/adapters/netbox/v24/models.py @@ -0,0 +1,295 @@ +from dataclasses import dataclass, field +from datetime import datetime, timezone +from ipaddress import ip_interface, IPv6Interface +from typing import List, Optional, Any, Dict, Sequence, Callable + +from annet.annlib.netdev.views.dump import DumpableView +from annet.annlib.netdev.views.hardware import HardwareView, lag_name, svi_name +from annet.storage import Storage + + +@dataclass +class Entity(DumpableView): + id: int + name: str + + @property + def _dump__list_key(self): + return self.name + + +@dataclass +class Label: + value: str + label: str + + +@dataclass +class IpFamily: + value: int + label: str + + +@dataclass +class DeviceType: + id: int + manufacturer: Entity + model: str + + +@dataclass +class DeviceIp(DumpableView): + id: int + display: str + address: str + family: int + + @property + def _dump__list_key(self): + return self.address + + +@dataclass +class Prefix(DumpableView): + id: int + prefix: str + site: Optional[Entity] + vrf: Optional[Entity] + tenant: Optional[Entity] + vlan: Optional[Entity] + role: Optional[Entity] + status: Label + is_pool: bool + custom_fields: dict[str, Any] + created: datetime + last_updated: datetime + description: Optional[str] = "" + + @property + def _dump__list_key(self): + return self.prefix + + +@dataclass +class IpAddress(DumpableView): + id: int + assigned_object_id: int + display: str + family: IpFamily + address: str + status: Label + tags: List[Entity] + created: datetime + last_updated: datetime + prefix: Optional[Prefix] = None + vrf: Optional[Entity] = None + + @property + def _dump__list_key(self): + return self.address + + +@dataclass +class InterfaceConnectedEndpoint(Entity): + device: Entity + + +@dataclass +class InterfaceType: + value: str + label: str + + +@dataclass +class InterfaceMode: + value: str + label: str + + +@dataclass +class InterfaceVlan(Entity): + vid: int + + +@dataclass +class Interface(Entity): + device: Entity + enabled: bool + description: str + type: InterfaceType + connected_endpoints: Optional[list[InterfaceConnectedEndpoint]] + mode: Optional[InterfaceMode] + untagged_vlan: Optional[InterfaceVlan] + tagged_vlans: Optional[List[InterfaceVlan]] + display: str = "" + ip_addresses: List[IpAddress] = field(default_factory=list) + vrf: Optional[Entity] = None + mtu: int | None = None + lag: Entity | None = None + lag_min_links: int | None = None + + def add_addr(self, address_mask: str, vrf: str | None) -> None: + addr = ip_interface(address_mask) + if vrf is None: + vrf_obj = None + else: + vrf_obj = Entity(id=0, name=vrf) + + if isinstance(addr, IPv6Interface): + family = IpFamily(value=6, label="IPv6") + else: + family = IpFamily(value=4, label="IPv4") + + for existing_addr in self.ip_addresses: + if existing_addr.address == address_mask and ( + (existing_addr.vrf is None and vrf is None) or + (existing_addr.vrf is not None and existing_addr.vrf.name == vrf) + ): + return + self.ip_addresses.append(IpAddress( + id=0, + display=address_mask, + address=address_mask, + vrf=vrf_obj, + prefix=None, + family=family, + created=datetime.now(timezone.utc), + last_updated=datetime.now(timezone.utc), + tags=[], + status=Label(value="active", label="Active"), + assigned_object_id=self.id, + )) + + +@dataclass +class NetboxDevice(Entity): + url: str + storage: Storage + + display: str + device_type: DeviceType + device_role: Entity + tenant: Optional[Entity] + platform: Optional[Entity] + serial: str + asset_tag: Optional[str] + site: Entity + rack: Optional[Entity] + position: Optional[float] + face: Optional[Label] + status: Label + primary_ip: Optional[DeviceIp] + primary_ip4: Optional[DeviceIp] + primary_ip6: Optional[DeviceIp] + tags: List[Entity] + custom_fields: Dict[str, Any] + created: datetime + last_updated: datetime + + fqdn: str + hostname: str + hw: Optional[HardwareView] + breed: str + + interfaces: List[Interface] + + @property + def neighbors(self) -> List["Entity"]: + return [ + endpoint.device + for iface in self.interfaces + if iface.connected_endpoints + for endpoint in iface.connected_endpoints + if endpoint.device + ] + + # compat + @property + def neighbours_fqdns(self) -> list[str]: + return [dev.name for dev in self.neighbors] + + @property + def neighbours_ids(self): + return [dev.id for dev in self.neighbors] + + def __hash__(self): + return hash((self.id, type(self))) + + def __eq__(self, other): + return type(self) is type(other) and self.url == other.url + + def is_pc(self) -> bool: + custom_breed_pc = ("Mellanox", "NVIDIA", "Moxa", "Nebius") + return self.device_type.manufacturer.name in custom_breed_pc or self.breed == "pc" + + def _make_interface(self, name: str, type: InterfaceType) -> Interface: + return Interface( + name=name, + device=self, + enabled=True, + description="", + type=type, + id=0, + vrf=None, + display=name, + untagged_vlan=None, + tagged_vlans=[], + ip_addresses=[], + connected_endpoints=[], + mode=None, + ) + + def _lag_name(self, lag: int) -> str: + return lag_name(self.hw, lag) + + def make_lag(self, lag: int, ports: Sequence[str], lag_min_links: int | None) -> Interface: + new_name = self._lag_name(lag) + for target_interface in self.interfaces: + if target_interface.name == new_name: + return target_interface + lag_interface = self._make_interface( + name=new_name, + type=InterfaceType(value="lag", label="Link Aggregation Group (LAG)"), + ) + lag_interface.lag_min_links = lag_min_links + for interface in self.interfaces: + if interface.name in ports: + interface.lag = lag_interface + self.interfaces.append(lag_interface) + return lag_interface + + def _svi_name(self, svi: int) -> str: + return svi_name(self.hw, svi) + + def add_svi(self, svi: int) -> Interface: + name = self._svi_name(svi) + for interface in self.interfaces: + if interface.name == name: + return interface + interface = self._make_interface( + name=name, + type=InterfaceType("virtual", "Virtual") + ) + self.interfaces.append(interface) + return interface + + def _subif_name(self, interface: str, subif: int) -> str: + return f"{interface}.{subif}" + + def add_subif(self, interface: str, subif: int) -> Interface: + name = self._subif_name(interface, subif) + for target_port in self.interfaces: + if target_port.name == name: + return target_port + target_port = self._make_interface( + name=name, + type=InterfaceType("virtual", "Virtual") + ) + self.interfaces.append(target_port) + return target_port + + def find_interface(self, name: str) -> Optional[Interface]: + for iface in self.interfaces: + if iface.name == name: + return iface + return None diff --git a/annet/adapters/netbox/v24/storage.py b/annet/adapters/netbox/v24/storage.py index 46b1dd08..a5842a28 100644 --- a/annet/adapters/netbox/v24/storage.py +++ b/annet/adapters/netbox/v24/storage.py @@ -4,7 +4,7 @@ from annetbox.v24 import models as api_models from annetbox.v24.client_sync import NetboxV24 -from annet.adapters.netbox.common import models +from annet.adapters.netbox.v24 import models from annet.adapters.netbox.common.manufacturer import ( get_hw, get_breed, ) diff --git a/annet/adapters/netbox/v37/models.py b/annet/adapters/netbox/v37/models.py new file mode 100644 index 00000000..0306c13d --- /dev/null +++ b/annet/adapters/netbox/v37/models.py @@ -0,0 +1,21 @@ +from dataclasses import dataclass +from typing import Optional +from annet.adapters.netbox.common.models import IpAddress, NetboxDevice, Entity, Prefix + + +@dataclass +class PrefixV37(Prefix): + site: Optional[Entity] = None + + +@dataclass +class IpAddressV37(IpAddress): + prefix: Optional[PrefixV37] = None + + +@dataclass +class NetboxDeviceV37(NetboxDevice): + device_role: Entity + + def __hash__(self): + return hash((self.id, type(self))) diff --git a/annet/adapters/netbox/v37/storage.py b/annet/adapters/netbox/v37/storage.py index 56cb2e45..fbcaaf18 100644 --- a/annet/adapters/netbox/v37/storage.py +++ b/annet/adapters/netbox/v37/storage.py @@ -1,321 +1,14 @@ -import ssl -from collections import defaultdict -from ipaddress import ip_interface -from logging import getLogger -from typing import Any, Optional, List, Union, Dict, cast - -from adaptix import P -from adaptix.conversion import impl_converter, link, link_constant from annetbox.v37 import models as api_models from annetbox.v37.client_sync import NetboxV37 -from annet.adapters.netbox.common import models -from annet.adapters.netbox.common.manufacturer import ( - get_hw, get_breed, -) -from annet.adapters.netbox.common.query import NetboxQuery, FIELD_VALUE_SEPARATOR -from annet.adapters.netbox.common.storage_opts import NetboxStorageOpts -from annet.annlib.netdev.views.hardware import HardwareView -from annet.storage import Storage, Device, Interface - -logger = getLogger(__name__) - - -@impl_converter(recipe=[ - link(P[api_models.Device].name, P[models.NetboxDevice].hostname), - link(P[api_models.Device].name, P[models.NetboxDevice].fqdn), -]) -def extend_device_base( - device: api_models.Device, - interfaces: List[models.Interface], - hw: Optional[HardwareView], - breed: str, - storage: Storage, -) -> models.NetboxDevice: - ... - - -def extend_device( - device: api_models.Device, - interfaces: List[models.Interface], - storage: Storage, -) -> models.NetboxDevice: - platform_name: str = "" - breed: str = "" - hw = HardwareView("", "") - if device.platform: - platform_name = device.platform.name - if device.device_type and device.device_type.manufacturer: - breed = get_breed( - device.device_type.manufacturer.name, - device.device_type.model, - ) - hw = get_hw( - device.device_type.manufacturer.name, - device.device_type.model, - platform_name, - ) - res = extend_device_base( - device=device, - interfaces=interfaces, - breed=breed, - hw=hw, - storage=storage, - ) - return res - - -@impl_converter( - recipe=[link_constant(P[models.Interface].lag_min_links, value=None)], -) -def extend_interface( - interface: api_models.Interface, - ip_addresses: List[models.IpAddress], -) -> models.Interface: - ... - - -@impl_converter -def extend_ip_address( - ip_address: models.IpAddress, prefix: Optional[models.Prefix], -) -> models.IpAddress: - ... - - -class NetboxStorageV37(Storage): - def __init__(self, opts: Optional[NetboxStorageOpts] = None): - ctx: Optional[ssl.SSLContext] = None - url = "" - token = "" - self.exact_host_filter = False - threads = 1 - if opts: - if opts.insecure: - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - url = opts.url - token = opts.token - threads = opts.threads - self.exact_host_filter = opts.exact_host_filter - - self.netbox = NetboxV37(url=url, token=token, ssl_context=ctx, threads=threads) - self._all_fqdns: Optional[list[str]] = None - self._id_devices: dict[int, models.NetboxDevice] = {} - self._name_devices: dict[str, models.NetboxDevice] = {} - self._short_name_devices: dict[str, models.NetboxDevice] = {} - - def __enter__(self): - return self - - def __exit__(self, _, __, ___): - pass - - def resolve_object_ids_by_query(self, query: NetboxQuery): - return [ - d.id for d in self._load_devices(query) - ] - - def resolve_fdnds_by_query(self, query: NetboxQuery): - return [ - d.name for d in self._load_devices(query) - ] - - def resolve_all_fdnds(self) -> list[str]: - if self._all_fqdns is None: - self._all_fqdns = [ - d.name - for d in self.netbox.dcim_all_devices_brief().results - ] - return self._all_fqdns - - def make_devices( - self, - query: Union[NetboxQuery, list], - preload_neighbors=False, - use_mesh=None, - preload_extra_fields=False, - **kwargs, - ) -> List[models.NetboxDevice]: - if isinstance(query, list): - query = NetboxQuery.new(query) - - devices = [] - if query.is_host_query(): - globs = [] - for glob in query.globs: - if glob in self._name_devices: - devices.append(self._name_devices[glob]) - if glob in self._short_name_devices: - devices.append(self._short_name_devices[glob]) - else: - globs.append(glob) - if not globs: - return devices - query = NetboxQuery.new(globs) - - return devices + self._make_devices( - query=query, - preload_neighbors=preload_neighbors, - use_mesh=use_mesh, - preload_extra_fields=preload_extra_fields, - **kwargs - ) - - def _make_devices( - self, - query: NetboxQuery, - preload_neighbors=False, - use_mesh=None, - preload_extra_fields=False, - **kwargs, - ) -> List[models.NetboxDevice]: - device_ids = { - device.id: extend_device( - device=device, - interfaces=[], - storage=self, - ) - for device in self._load_devices(query) - } - if not device_ids: - return [] - - for device in device_ids.values(): - self._record_device(device) - - interfaces = self._load_interfaces(list(device_ids)) - for interface in interfaces: - device_ids[interface.device.id].interfaces.append(interface) - - return list(device_ids.values()) - - def _record_device(self, device: models.NetboxDevice): - self._id_devices[device.id] = device - self._short_name_devices[device.name] = device - if not self.exact_host_filter: - short_name = device.name.split(".")[0] - self._short_name_devices[short_name] = device - - def _load_devices(self, query: NetboxQuery) -> List[api_models.Device]: - if not query.globs: - return [] - query_groups = parse_glob(self.exact_host_filter, query) - return [ - device - for device in self.netbox.dcim_all_devices(**query_groups).results - if _match_query(self.exact_host_filter, query, device) - ] - - def _extend_interfaces(self, interfaces: List[models.Interface]) -> List[models.Interface]: - extended_ifaces = { - interface.id: extend_interface(interface, []) - for interface in interfaces - } - - ips = self.netbox.ipam_all_ip_addresses(interface_id=list(extended_ifaces)) - ip_to_cidrs: Dict[str, str] = {ip.address: str(ip_interface(ip.address).network) for ip in ips.results} - prefixes = self.netbox.ipam_all_prefixes(prefix=list(ip_to_cidrs.values())) - cidr_to_prefix: Dict[str, models.Prefix] = {x.prefix: x for x in prefixes.results} - - for ip in ips.results: - cidr = ip_to_cidrs[ip.address] - ip = extend_ip_address(ip, prefix=cidr_to_prefix.get(cidr)) - extended_ifaces[ip.assigned_object_id].ip_addresses.append(ip) - return list(extended_ifaces.values()) - - def _load_interfaces(self, device_ids: List[int]) -> List[models.Interface]: - interfaces = self.netbox.dcim_all_interfaces(device_id=device_ids) - return self._extend_interfaces(interfaces.results) - - def _load_interfaces_by_id(self, ids: List[int]) -> List[models.Interface]: - interfaces = self.netbox.dcim_all_interfaces_by_id(id=ids) - return self._extend_interfaces(interfaces.results) - - def get_device( - self, obj_id, preload_neighbors=False, use_mesh=None, - **kwargs, - ) -> models.NetboxDevice: - if obj_id in self._id_devices: - return self._id_devices[obj_id] - - device = self.netbox.dcim_device(obj_id) - interfaces = self._load_interfaces([device.id]) - - res = extend_device( - device=device, - storage=self, - interfaces=interfaces, - ) - self._record_device(res) - return res - - def flush_perf(self): - pass - - def search_connections(self, device: Device, neighbor: Device) -> list[tuple[Interface, Interface]]: - if device.storage is not self: - raise ValueError("device does not belong to this storage") - if neighbor.storage is not self: - raise ValueError("neighbor does not belong to this storage") - # both devices are NetboxDevice if they are loaded from this storage - res = [] - for local_port in device.interfaces: - if not local_port.connected_endpoints: - continue - for endpoint in local_port.connected_endpoints: - if endpoint.device.id == neighbor.id: - for remote_port in neighbor.interfaces: - if remote_port.name == endpoint.name: - res.append((local_port, remote_port)) - break - return res - - -def _match_query(exact_host_filter: bool, query: NetboxQuery, device_data: api_models.Device) -> bool: - """ - Additional filtering after netbox due to limited backend logic. - """ - if exact_host_filter: - return True # nothing to check, all filtering is done by netbox - hostnames = [subquery.strip() for subquery in query.globs if FIELD_VALUE_SEPARATOR not in subquery] - if not hostnames: - return True # no hostnames to check - - short_name = device_data.name.split(".")[0] - for hostname in hostnames: - hostname = hostname.strip().rstrip(".") - if short_name == hostname or device_data.name == hostname: - return True - return False - - -def _hostname_dot_hack(raw_query: str) -> str: - # there is no proper way to lookup host by its hostname - # ie find "host" with fqdn "host.example.com" - # besides using name__ic (ie startswith) - # since there is no direct analogue for this field in netbox - # so we need to add a dot to hostnames (top-level fqdn part) - # so we would not receive devices with a common name prefix - def add_dot(raw_query: Any) -> Any: - if isinstance(raw_query, str) and "." not in raw_query: - raw_query = raw_query + "." - return raw_query - - if isinstance(raw_query, list): - for i, name in enumerate(raw_query): - raw_query[i] = add_dot(name) - elif isinstance(raw_query, str): - raw_query = add_dot(raw_query) +from annet.adapters.netbox.common.storage_base import BaseNetboxStorage +from annet.adapters.netbox.v37.models import IpAddressV37, NetboxDeviceV37, PrefixV37 - return raw_query +class NetboxStorageV37(BaseNetboxStorage): + netbox_class = NetboxV37 + api_models = api_models + device_model = NetboxDeviceV37 + prefix_model = PrefixV37 + ipaddress_model = IpAddressV37 -def parse_glob(exact_host_filter: bool, query: NetboxQuery) -> dict[str, list[str]]: - query_groups = cast(dict[str, list[str]], query.parse_query()) - if names := query_groups.pop("name", None): - if exact_host_filter: - query_groups["name__ie"] = names - else: - query_groups["name__ic"] = [_hostname_dot_hack(name) for name in names] - return query_groups diff --git a/annet/adapters/netbox/v41/__init__.py b/annet/adapters/netbox/v41/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/annet/adapters/netbox/v41/models.py b/annet/adapters/netbox/v41/models.py new file mode 100644 index 00000000..ab60e446 --- /dev/null +++ b/annet/adapters/netbox/v41/models.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass +from typing import Optional + +from annet.adapters.netbox.common.models import Entity, IpAddress, NetboxDevice, DeviceIp, IpFamily, Prefix + + +@dataclass +class PrefixV41(Prefix): + site: Optional[Entity] = None + + +@dataclass +class IpAddressV41(IpAddress): + prefix: Optional[PrefixV41] = None + + +@dataclass +class DeviceRole: + id: int + url: str + + +@dataclass +class DeviceIpV41(DeviceIp): + id: int + display: str + address: str + family: IpFamily + + +@dataclass +class NetboxDeviceV41(NetboxDevice): + role: DeviceRole + primary_ip: Optional[DeviceIpV41] + primary_ip4: Optional[DeviceIpV41] + primary_ip6: Optional[DeviceIpV41] + + def __hash__(self): + return hash((self.id, type(self))) diff --git a/annet/adapters/netbox/v41/storage.py b/annet/adapters/netbox/v41/storage.py new file mode 100644 index 00000000..e6da16b0 --- /dev/null +++ b/annet/adapters/netbox/v41/storage.py @@ -0,0 +1,14 @@ +from annetbox.v41.client_sync import NetboxV41 +from annetbox.v41 import models as api_models + +from annet.adapters.netbox.v41.models import IpAddressV41, NetboxDeviceV41, PrefixV41 +from annet.adapters.netbox.common.storage_base import BaseNetboxStorage + + +class NetboxStorageV41(BaseNetboxStorage): + netbox_class = NetboxV41 + api_models = api_models + device_model = NetboxDeviceV41 + prefix_model = PrefixV41 + ipaddress_model = IpAddressV41 + diff --git a/annet/adapters/netbox/v42/__init__.py b/annet/adapters/netbox/v42/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/annet/adapters/netbox/v42/models.py b/annet/adapters/netbox/v42/models.py new file mode 100644 index 00000000..06ab36f0 --- /dev/null +++ b/annet/adapters/netbox/v42/models.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass +from typing import Optional +from annet.adapters.netbox.common.models import Prefix, IpAddress, Entity +from annet.adapters.netbox.v41.models import NetboxDeviceV41 + + +@dataclass +class PrefixV42(Prefix): + scope: Optional[Entity] = None + + +@dataclass +class IpAddressV42(IpAddress): + prefix: Optional[PrefixV42] = None + + +@dataclass +class NetboxDeviceV42(NetboxDeviceV41): + def __hash__(self): + return hash((self.id, type(self))) diff --git a/annet/adapters/netbox/v42/storage.py b/annet/adapters/netbox/v42/storage.py new file mode 100644 index 00000000..79354900 --- /dev/null +++ b/annet/adapters/netbox/v42/storage.py @@ -0,0 +1,14 @@ +from annetbox.v42.client_sync import NetboxV42 +from annetbox.v42 import models as api_models + +from annet.adapters.netbox.common.storage_base import BaseNetboxStorage +from annet.adapters.netbox.v41.models import NetboxDeviceV41 +from annet.adapters.netbox.v42.models import PrefixV42, IpAddressV42 + + +class NetboxStorageV42(BaseNetboxStorage): + netbox_class = NetboxV42 + api_models = api_models + device_model = NetboxDeviceV41 + prefix_model = PrefixV42 + ipaddress_model = IpAddressV42