Skip to content

Commit

Permalink
All enlinkd model types
Browse files Browse the repository at this point in the history
Tested processing bridge/cdp/lldp
  • Loading branch information
mmahacek committed Jan 10, 2024
1 parent 5de0ccd commit ace7307
Showing 1 changed file with 189 additions and 16 deletions.
205 changes: 189 additions & 16 deletions pyonms/models/enlinkd.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,143 @@ class Element:
pass


@dataclass
class BridgeLinkRemoteNode:
"Bridge Link Remote Node"
bridgeRemote: str = None
bridgeRemoteUrl: str = None
bridgeRemotePort: str = None
bridgeRemotePortUrl: str = None
bridgeRemoteNodeId: int = None
bridgeRemoteMAC: str = None
bridgeRemoteIfIndex: int = None
bridgeRemoteIP: str = None

def __post_init__(self):
if self.bridgeRemoteUrl:
if remote_node := infer_node_from_url(url=self.bridgeRemoteUrl):
self.bridgeRemoteNodeId = remote_node
if self.bridgeRemotePort:
if remote_ifindex := infer_ifindex(text=self.bridgeRemotePort):
self.bridgeRemoteIfIndex = remote_ifindex
if remote_ip := infer_ip(text=self.bridgeRemotePort):
self.bridgeRemoteIP = remote_ip
if self.bridgeRemote:
if remote_mac := infer_mac(text=self.bridgeRemote):
self.bridgeRemoteMAC = remote_mac


@dataclass
class BridgeLink(Link):
"Bridge Link"
bridgeLocalPort: str = None
bridgeLocalPortUrl: str = None
bridgeInfo: str = None
bridgeLinkCreateTime: Union[str, datetime] = None
bridgeLinkLastPollTime: Union[str, datetime] = None
BridgeLinkRemoteNodes: List[BridgeLinkRemoteNode] = field(default_factory=list)
linkType = LinkType.BRIDGE
bridgeLocalNodeId: int = None
bridgeLocalMAC: str = None
bridgeLocalIfIndex: int = None

def __post_init__(self):
if isinstance(self.bridgeLinkCreateTime, str):
self.bridgeLinkCreateTime = utils.convert_link_time(
self.bridgeLinkCreateTime
)
if isinstance(self.bridgeLinkLastPollTime, str):
self.bridgeLinkLastPollTime = utils.convert_link_time(
self.bridgeLinkLastPollTime
)
if self.BridgeLinkRemoteNodes:
if isinstance(self.BridgeLinkRemoteNodes[0], dict):
remotes = []
for remote in self.BridgeLinkRemoteNodes:
remotes.append(BridgeLinkRemoteNode(**remote))
self.BridgeLinkRemoteNodes = remotes
if self.bridgeLocalPortUrl:
if local_node := infer_node_from_url(url=self.bridgeLocalPortUrl):
self.bridgeLocalNodeId = local_node
if self.bridgeLocalPort:
if local_mac := infer_mac(self.bridgeLocalPort):
self.bridgeLocalMAC = local_mac
if local_ifindex := infer_ifindex(self.bridgeLocalPort):
self.bridgeLocalIfIndex = local_ifindex


@dataclass
class CdpLink(Link):
"CDP Link"
cdpLocalPort: str = None
cdpLocalPortUrl: str = None
cdpCacheDevice: str = None
cdpCacheDeviceUrl: str = None
cdpCacheDevicePort: str = None
cdpCacheDevicePortUrl: str = None
cdpCachePlatform: str = None
cdpCreateTime: Union[str, datetime] = None
cdpLastPollTime: Union[str, datetime] = None
linkType = LinkType.CDP
cdpLocalNodeId: int = None
cdpCacheDeviceNodeId: int = None
cdpLocalIfIndex: int = None
cdpCacheDeviceIfIndex: int = None
cdpCacheDeviceIP: str = None

def __post_init__(self):
if isinstance(self.cdpCreateTime, str):
self.cdpCreateTime = utils.convert_link_time(self.cdpCreateTime)
if isinstance(self.cdpLastPollTime, str):
self.cdpLastPollTime = utils.convert_link_time(self.cdpLastPollTime)
if self.cdpLocalPortUrl:
if local_node := infer_node_from_url(url=self.cdpLocalPortUrl):
self.cdpLocalNodeId = local_node
if self.cdpCacheDeviceUrl:
if remote_node := infer_node_from_url(url=self.cdpCacheDeviceUrl):
self.cdpCacheDeviceNodeId = remote_node
if self.cdpLocalPort:
if local_ifindex := infer_ifindex(self.cdpLocalPort):
self.cdpLocalIfIndex = local_ifindex
if self.cdpCacheDevicePort:
if remote_ifindex := infer_ifindex(self.cdpCacheDevicePort):
self.cdpCacheDeviceIfIndex = remote_ifindex
if remote_ip := infer_ip(self.cdpCacheDevicePort):
self.cdpCacheDeviceIP = remote_ip


@dataclass
class IsIsLink(Link):
"IS-IS Link"
isisCircIfIndex: int = None
isisCircAdminState: str = None
isisISAdjNeighSysID: str = None
isisISAdjNeighSysType: str = None
isisISAdjNeighSysUrl: str = None
isisISAdjNeighSNPAAddress: str = None
isisISAdjNeighPort: str = None
isisISAdjState: str = None
isisISAdjNbrExtendedCircID: int = None
isisISAdjUrl: str = None
isisLinkCreateTime: Union[str, datetime] = None
isisLinkLastPollTime: Union[str, datetime] = None
linkType = LinkType.ISIS
isisLocalNodeId: int = None
isisRemNodeId: int = None
isisLocalMAC: str = None
isisRemMAC: str = None
isisLocalIfIndex: int = None
isisRemIfIndex: int = None

def __post_init__(self):
if isinstance(self.isisLinkCreateTime, str):
self.isisLinkCreateTime = utils.convert_link_time(self.isisLinkCreateTime)
if isinstance(self.isisLinkLastPollTime, str):
self.isisLinkLastPollTime = utils.convert_link_time(
self.isisLinkLastPollTime
)


@dataclass
class LldpLink(Link):
"LLDP Link"
Expand Down Expand Up @@ -75,6 +212,35 @@ def __post_init__(self):
self.lldpRemIfIndex = remote_ifindex


@dataclass
class OspfLink(Link):
"OSPF Link"
ospfLocalPort: str = None
ospfLocalPortUrl: str = None
ospfRemRouterId: str = None
ospfRemRouterUrl: str = None
ospfRemPort: str = None
ospfRemPortUrl: str = None
ospfLinkInfo: str = None
ospfLinkCreateTime: Union[str, datetime] = None
ospfLinkLastPollTime: Union[str, datetime] = None
linkType = LinkType.OSPF
ospfLocalNodeId: int = None
ospfRemNodeId: int = None
ospfLocalMAC: str = None
ospfRemMAC: str = None
ospfLocalIfIndex: int = None
ospfRemIfIndex: int = None

def __post_init__(self):
if isinstance(self.ospfLinkCreateTime, str):
self.ospfLinkCreateTime = utils.convert_link_time(self.ospfLinkCreateTime)
if isinstance(self.ospfLinkLastPollTime, str):
self.ospfLinkLastPollTime = utils.convert_link_time(
self.ospfLinkLastPollTime
)


@dataclass
class BridgeElement(Element):
"Bridge Element"
Expand Down Expand Up @@ -172,16 +338,30 @@ def __post_init__(self):
@dataclass
class Topology:
"Topology class"
lldp_links: List[LldpLink] = field(default_factory=list)
lldp_elements: List[LldpElement] = field(default_factory=list)
bridge_links: List[Link] = field(default_factory=list)
bridge_links: List[BridgeLink] = field(default_factory=list)
bridge_elements: List[BridgeElement] = field(default_factory=list)
cdp_links: List[Link] = field(default_factory=list)
cdp_links: List[CdpLink] = field(default_factory=list)
cdp_elements: List[CdpElement] = field(default_factory=list)
ospf_links: List[Link] = field(default_factory=list)
ospf_elements: List[OspfElement] = field(default_factory=list)
isis_links: List[Link] = field(default_factory=list)
isis_links: List[IsIsLink] = field(default_factory=list)
isis_elements: List[IsisElement] = field(default_factory=list)
lldp_links: List[LldpLink] = field(default_factory=list)
lldp_elements: List[LldpElement] = field(default_factory=list)
ospf_links: List[OspfLink] = field(default_factory=list)
ospf_elements: List[OspfElement] = field(default_factory=list)


def infer_ifindex(text: str) -> int:
"Infer ifIndex from string"
ifindex = re.search(r"^.*\(ifindex:(\d+)\).*", text)
if ifindex:
return int(ifindex.group(1))


def infer_ip(text: str) -> str:
"Infer IP address from string"
ip = re.search(r"^.*\((((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4})\).*", text)
if ip:
return ip.group(1)


def infer_node_from_url(url: str) -> int:
Expand All @@ -193,13 +373,6 @@ def infer_node_from_url(url: str) -> int:

def infer_mac(text: str) -> str:
"Infer MAC address from string"
mac = re.search(r"^.*\(macAddress:([0-9a-fA-F]+)\).*", text)
mac = re.search(r"^.*\((mac|macAddress):([0-9A-Fa-f]+)\).*", text)
if mac:
return mac.group(1)


def infer_ifindex(text: str) -> int:
"Infer ifIndex from string"
ifindex = re.search(r"^.*\(ifindex:(\d+)\).*", text)
if ifindex:
return int(ifindex.group(1))
return mac.group(2)

0 comments on commit ace7307

Please sign in to comment.