diff --git a/custom_components/hikvision_next/binary_sensor.py b/custom_components/hikvision_next/binary_sensor.py index 0dc81d5..5a104da 100644 --- a/custom_components/hikvision_next/binary_sensor.py +++ b/custom_components/hikvision_next/binary_sensor.py @@ -8,7 +8,7 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback from .const import DATA_ISAPI, DOMAIN, EVENTS -from .isapi import ISAPI, AnalogCamera, EventInfo, IPCamera +from .isapi import EventInfo async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None: @@ -22,39 +22,25 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e # Camera Events for camera in isapi.cameras: for event in camera.supported_events: - entities.append(CameraEventBinarySensor(isapi, camera, event)) + entities.append(EventBinarySensor(isapi, camera.id, event)) # NVR Events if isapi.device_info.is_nvr: - for io_event in isapi.device_info.supported_events: - entities.append(NVREventBinarySensor(isapi, io_event)) + for event in isapi.device_info.supported_events: + entities.append(EventBinarySensor(isapi, 0, event)) async_add_entities(entities) -class CameraEventBinarySensor(BinarySensorEntity): +class EventBinarySensor(BinarySensorEntity): """Event detection sensor.""" _attr_has_entity_name = True _attr_is_on = False - def __init__(self, isapi, camera: AnalogCamera | IPCamera, event: EventInfo) -> None: + def __init__(self, isapi, device_id: int, event: EventInfo) -> None: self.entity_id = ENTITY_ID_FORMAT.format(event.unique_id) self._attr_unique_id = self.entity_id - self._attr_name = EVENTS[event.id]["label"] + self._attr_name = f"{EVENTS[event.id]['label']}{' ' + str(event.io_port_id) if event.io_port_id != 0 else ''}" self._attr_device_class = EVENTS[event.id]["device_class"] - self._attr_device_info = isapi.get_device_info(camera.id) - - -class NVREventBinarySensor(BinarySensorEntity): - """IO Event detection sensor.""" - - _attr_has_entity_name = True - _attr_is_on = False - - def __init__(self, isapi: ISAPI, event: EventInfo) -> None: - self.entity_id = ENTITY_ID_FORMAT.format(event.unique_id) - self._attr_unique_id = self.entity_id - self._attr_name = f"{EVENTS[event.id]['label']} {event.channel_id}" - self._attr_device_class = EVENTS[event.id]["device_class"] - self._attr_device_info = isapi.get_device_info(0) + self._attr_device_info = isapi.get_device_info(device_id) diff --git a/custom_components/hikvision_next/const.py b/custom_components/hikvision_next/const.py index 214f4ea..b9336a2 100644 --- a/custom_components/hikvision_next/const.py +++ b/custom_components/hikvision_next/const.py @@ -19,13 +19,12 @@ HOLIDAY_MODE_SWITCH_LABEL = "Holiday mode" ALARM_SERVER_SENSOR_LABEL_FORMAT = "Alarm Server {}" -DEVICE_TYPE_IP_CAMERA = "IPCamera" -DEVICE_TYPE_ANALOG_CAMERA = "AnalogCamera" -DEVICE_TYPE_NVR = "NVR" +CONNECTION_TYPE_DIRECT = "Direct" +CONNECTION_TYPE_PROXIED = "Proxied" HIKVISION_EVENT = f"{DOMAIN}_event" EVENT_BASIC: Final = "basic" -EVENT_NVR_BASIC: Final = "nvr_basic" +EVENT_IO: Final = "io" EVENT_SMART: Final = "smart" EVENTS = { "motiondetection": { @@ -81,11 +80,11 @@ "device_class": BinarySensorDeviceClass.MOTION, }, "io": { - "type": EVENT_NVR_BASIC, + "type": EVENT_IO, "label": "Alarm Input", - "channel_attr": "inputIOPortID", - "url_path": "IO/inputs", - "slug": "IOInputPort", + "slug": "inputs", + "direct_node": "IOInputPort", + "proxied_node": "IOProxyInputPort", "device_class": BinarySensorDeviceClass.MOTION } } diff --git a/custom_components/hikvision_next/coordinator.py b/custom_components/hikvision_next/coordinator.py index 0f48118..aad60b3 100644 --- a/custom_components/hikvision_next/coordinator.py +++ b/custom_components/hikvision_next/coordinator.py @@ -57,16 +57,15 @@ async def _async_update_data(self): except Exception as ex: # pylint: disable=broad-except self.isapi.handle_exception(ex, f"Cannot fetch state for {event.id}") - # Get NVR outputs status - if self.isapi.device_info.is_nvr: - for i in range(1, self.isapi.device_info.output_ports + 1): - try: - entity_id = ENTITY_ID_FORMAT.format( - f"{slugify(self.isapi.device_info.serial_no.lower())}_{i}_alarm_output" - ) - data[entity_id] = await self.isapi.get_port_status("output", i) - except Exception as ex: # pylint: disable=broad-except - self.isapi.handle_exception(ex, f"Cannot fetch state for {event.id}") + # Get output port(s) status + for i in range(1, self.isapi.device_info.output_ports + 1): + try: + entity_id = ENTITY_ID_FORMAT.format( + f"{slugify(self.isapi.device_info.serial_no.lower())}_{i}_alarm_output" + ) + data[entity_id] = await self.isapi.get_port_status("output", i) + except Exception as ex: # pylint: disable=broad-except + self.isapi.handle_exception(ex, f"Cannot fetch state for {event.id}") # Refresh HDD data try: diff --git a/custom_components/hikvision_next/diagnostics.py b/custom_components/hikvision_next/diagnostics.py index e7dc975..ef265a2 100644 --- a/custom_components/hikvision_next/diagnostics.py +++ b/custom_components/hikvision_next/diagnostics.py @@ -63,44 +63,36 @@ async def _async_get_diagnostics( info.update({"Entity Data": to_json(coordinator.data)}) # Add raw device info - info.update(await get_isapi_data("RAW Device Info", isapi.isapi.System.deviceInfo, "DeviceInfo")) - - # Add raw camera info - info.update( - await get_isapi_data( - "RAW Analog Camera Info", - isapi.isapi.System.Video.inputs.channels, - "VideoInputChannelList", - ) - ) - info.update( - await get_isapi_data( - "RAW IP Camera Info", - isapi.isapi.ContentMgmt.InputProxy.channels, - "InputProxyChannelList", - ) - ) + info.update(await get_isapi_data("RAW Device Info", isapi.isapi.System.deviceInfo)) + + # Add raw camera info - Direct connected + info.update(await get_isapi_data("RAW Analog Camera Info", isapi.isapi.System.Video.inputs.channels)) + + # Add raw camera info - Proxy connected + info.update(await get_isapi_data("RAW IP Camera Info", isapi.isapi.ContentMgmt.InputProxy.channels)) # Add raw capabilities - info.update(await get_isapi_data("RAW Capabilities Info", isapi.isapi.System.capabilities, "DeviceCap")) + info.update(await get_isapi_data("RAW Capabilities Info", isapi.isapi.System.capabilities)) # Add raw supported events - info.update(await get_isapi_data("RAW Events Info", isapi.isapi.Event.triggers, "")) + info.update(await get_isapi_data("RAW Events Info", isapi.isapi.Event.triggers)) + + # Add IO info - direct connected + info.update(await get_isapi_data("Direct IO Inputs", isapi.isapi.System.IO.inputs)) + info.update(await get_isapi_data("Direct IO Outputs", isapi.isapi.System.IO.outputs)) + + # Add IO info - proxy connected + info.update(await get_isapi_data("Proxied IO Inputs", isapi.isapi.ContentMgmt.IOProxy.inputs)) + info.update(await get_isapi_data("Proxied IO Outputs", isapi.isapi.ContentMgmt.IOProxy.outputs)) # Add raw streams info - info.update(await get_isapi_data("RAW Streams Info", isapi.isapi.Streaming.channels, "StreamingChannelList")) + info.update(await get_isapi_data("RAW Streams Info", isapi.isapi.Streaming.channels)) # Add raw holiday info - info.update(await get_isapi_data("RAW Holiday Info", isapi.isapi.System.Holidays, "HolidayList")) + info.update(await get_isapi_data("RAW Holiday Info", isapi.isapi.System.Holidays)) # Add alarms server info - info.update( - await get_isapi_data( - "RAW Alarm Server Info", - isapi.isapi.Event.notification.httpHosts, - "HttpHostNotificationList", - ) - ) + info.update(await get_isapi_data("RAW Alarm Server Info", isapi.isapi.Event.notification.httpHosts)) return info diff --git a/custom_components/hikvision_next/isapi.py b/custom_components/hikvision_next/isapi.py index 5a1ea9f..8dee28e 100644 --- a/custom_components/hikvision_next/isapi.py +++ b/custom_components/hikvision_next/isapi.py @@ -22,12 +22,11 @@ from homeassistant.util import slugify from .const import ( - DEVICE_TYPE_ANALOG_CAMERA, - DEVICE_TYPE_IP_CAMERA, - DEVICE_TYPE_NVR, + CONNECTION_TYPE_DIRECT, + CONNECTION_TYPE_PROXIED, DOMAIN, EVENT_BASIC, - EVENT_NVR_BASIC, + EVENT_IO, EVENTS, EVENTS_ALTERNATE_ID, MUTEX_ALTERNATE_IDS, @@ -59,6 +58,7 @@ class AlertInfo: """Holds NVR/Camera event notification info""" channel_id: int + io_port_id: int event_id: str device_serial_no: Optional[str] mac: str = "" @@ -78,6 +78,7 @@ class EventInfo: id: str channel_id: int + io_port_id: int unique_id: str url: str notifiers: list[str] = field(default_factory=list) @@ -88,6 +89,7 @@ class SupportedEventsInfo: """Holds supported event info for NVR/IP Camera""" channel_id: int + io_port_id: int event_id: str notifications: list[str] = field(default_factory=list) @@ -155,6 +157,7 @@ class AnalogCamera: model: str serial_no: str input_port: int + connection_type: str streams: list[CameraStreamInfo] = field(default_factory=list) supported_events: list[EventInfo] = field(default_factory=list) @@ -210,7 +213,9 @@ async def get_hardware_info(self): input_ports=int(deep_get(capabilities, "SysCap.IOCap.IOInputPortNums", 0)), output_ports=int(deep_get(capabilities, "SysCap.IOCap.IOOutputPortNums", 0)), storage=await self.get_storage_devices(), - supported_events=await self.get_nvr_event_capabilities(hw_info.get("serialNumber"), self.supported_events) + supported_events=await self.get_device_event_capabilities( + self.supported_events, hw_info.get("serialNumber"), 0 + ) ) await self.get_protocols() @@ -233,10 +238,11 @@ async def get_cameras(self): serial_no=self.device_info.serial_no, firmware=self.device_info.firmware, input_port=1, + connection_type=CONNECTION_TYPE_DIRECT, ip_addr=self.device_info.ip_address, streams=await self.get_camera_streams(1), - supported_events=await self.get_camera_event_capabilities( - self.supported_events, 1, DEVICE_TYPE_IP_CAMERA + supported_events=await self.get_device_event_capabilities( + self.supported_events, self.device_info.serial_no, 1, CONNECTION_TYPE_DIRECT ), ) ) @@ -279,13 +285,15 @@ async def get_cameras(self): serial_no=serial_no, firmware=source.get("firmwareVersion"), input_port=int(source.get("srcInputPort")), + connection_type=CONNECTION_TYPE_PROXIED, ip_addr=source.get("ipAddress"), ip_port=source.get("managePortNo"), streams=await self.get_camera_streams(camera_id), - supported_events=await self.get_camera_event_capabilities( + supported_events=await self.get_device_event_capabilities( self.supported_events, + self.device_info.serial_no, camera_id, - DEVICE_TYPE_IP_CAMERA, + CONNECTION_TYPE_PROXIED, ), ) ) @@ -318,11 +326,13 @@ async def get_cameras(self): model=analog_camera.get("resDesc"), serial_no=device_serial_no, input_port=int(analog_camera.get("inputPort")), + connection_type=CONNECTION_TYPE_DIRECT, streams=await self.get_camera_streams(camera_id), - supported_events=await self.get_camera_event_capabilities( + supported_events=await self.get_device_event_capabilities( self.supported_events, + self.device_info.serial_no, camera_id, - DEVICE_TYPE_ANALOG_CAMERA, + CONNECTION_TYPE_DIRECT, ), ) ) @@ -351,64 +361,46 @@ async def get_protocols(self): except HTTPStatusError: pass - async def get_camera_event_capabilities( + async def get_device_event_capabilities( self, supported_events: list[SupportedEventsInfo], - channel_id: int, - camera_type: str, + serial_no: str, + device_id: int, + connection_type: str = CONNECTION_TYPE_DIRECT, ) -> list[EventInfo]: - """Get events support by camera device and integration""" + """Get events support by device (device id: NVR = 0, camera > 0)""" events = [] - camera_supported_events = [s for s in supported_events if ( - s.channel_id == int(channel_id) - and s.event_id in EVENTS - and EVENTS[s.event_id].get("type") != EVENT_NVR_BASIC - )] + if device_id == 0: # NVR + device_supported_events = [s for s in supported_events if ( + s.event_id in EVENTS and EVENTS[s.event_id].get("type") == EVENT_IO + )] + else: # Camera + device_supported_events = [s for s in supported_events if ( + s.channel_id == int(device_id) + and s.event_id in EVENTS + )] + + for event in device_supported_events: + # Build unique_id + device_id_param = f"_{device_id}" if device_id != 0 else "" + io_port_id_param = f"_{event.io_port_id}" if event.io_port_id != 0 else "" + unique_id = ( + f"{slugify(serial_no.lower())}{device_id_param}{io_port_id_param}_{event.event_id}" + ) - for event in camera_supported_events: if EVENTS.get(event.event_id): event_info = EventInfo( id=event.event_id, channel_id=event.channel_id, - unique_id=f"{slugify(self.device_info.serial_no.lower())}_{channel_id}_{event.event_id}", - url=self.get_event_url( - event.event_id, - channel_id, - self.device_info.is_nvr, - camera_type, - ), + io_port_id=event.io_port_id, + unique_id=unique_id, + url=self.get_event_url(event, connection_type), notifiers=event.notifications, ) events.append(event_info) return events - async def get_nvr_event_capabilities( - self, serial_no: str, supported_events: list[SupportedEventsInfo] - ) -> list[EventInfo]: - """Get events supported by the NVR""" - events = [] - - nvr_supported_events = [s for s in supported_events if ( - s.event_id in EVENTS and EVENTS[s.event_id].get("type") == EVENT_NVR_BASIC - )] - - for event in nvr_supported_events: - event_info = EventInfo( - id=event.event_id, - channel_id=event.channel_id, - unique_id=f"{slugify(serial_no.lower())}_{event.channel_id}_{event.event_id}", - url=self.get_event_url( - event.event_id, - event.channel_id, - True, - DEVICE_TYPE_NVR, - ), - notifiers=event.notifications, - ) - events.append(event_info) - return events - async def get_supported_events_info(self): """Get list of all supported events available""" events = [] @@ -421,17 +413,14 @@ async def get_supported_events_info(self): supported_events = deep_get(event_triggers, "EventTriggerList.EventTrigger") for support_event in supported_events: - event_type = support_event.get("eventType") - channel = support_event.get( - "videoInputChannelID", - support_event.get("dynVideoInputChannelID", support_event.get("inputIOPortID", 0)), - ) notifications = support_event.get("EventTriggerNotificationList", {}) - # Fix for empty EventTriggerNotificationList in IP camera if not notifications: continue + event_type = support_event.get("eventType") + channel = support_event.get("videoInputChannelID", support_event.get("dynVideoInputChannelID", 0)) + io_port = support_event.get("inputIOPortID", support_event.get("dynInputIOPortID", 0)) notifications = notifications.get("EventTriggerNotification", []) if not isinstance(notifications, list): @@ -444,6 +433,7 @@ async def get_supported_events_info(self): events.append( SupportedEventsInfo( channel_id=int(channel), + io_port_id=int(io_port), event_id=event_type.lower(), notifications=[notify.get("notificationMethod") for notify in notifications] if notifications @@ -453,25 +443,30 @@ async def get_supported_events_info(self): return events - def get_event_url(self, event_id: str, channel_id: int, is_nvr: bool, device_type: str) -> str: + def get_event_url(self, event: SupportedEventsInfo, connection_type: str) -> str: """Get event ISAPI URL.""" - event_type = EVENTS[event_id]["type"] - url_path = EVENTS[event_id].get("url_path") - slug = EVENTS[event_id]["slug"] if not url_path else url_path - - if is_nvr and device_type == DEVICE_TYPE_IP_CAMERA and event_type == EVENT_BASIC: - # ISAPI/ContentMgmt/InputProxy/channels/{channel_id}/video/{event} - url = f"ContentMgmt/InputProxy/channels/{channel_id}/video/{slug}" - elif (is_nvr and device_type == DEVICE_TYPE_ANALOG_CAMERA or not is_nvr) and event_type == EVENT_BASIC: - # ISAPI/System/Video/inputs/channels/{channel_id}/{event} - url = f"System/Video/inputs/channels/{channel_id}/{slug}" - elif is_nvr and device_type == DEVICE_TYPE_NVR and event_type == EVENT_NVR_BASIC: - # ISAPI/System/{event}/inputs/{channel_id} - url = f"System/{slug}/{channel_id}" + event_type = EVENTS[event.event_id]["type"] + slug = EVENTS[event.event_id]["slug"] + + if event_type == EVENT_BASIC: + if connection_type == CONNECTION_TYPE_PROXIED: + # ISAPI/ContentMgmt/InputProxy/channels/{channel_id}/video/{event} + url = f"ContentMgmt/InputProxy/channels/{event.channel_id}/video/{slug}" + else: + # ISAPI/System/Video/inputs/channels/{channel_id}/{event} + url = f"System/Video/inputs/channels/{event.channel_id}/{slug}" + + elif event_type == EVENT_IO: + if connection_type == CONNECTION_TYPE_PROXIED: + # ISAPI/ContentMgmt/IOProxy/{slug}/{channel_id} + url = f"ContentMgmt/IOProxy/{slug}/{event.io_port_id}" + else: + # ISAPI/System/IO/{slug}}/{channel_id} + url = f"System/IO/{slug}/{event.io_port_id}" else: # ISAPI/Smart/{event}/{channel_id} - url = f"Smart/{slug}/{channel_id}" + url = f"Smart/{slug}/{event.channel_id}" return url async def get_camera_streams(self, channel_id: int) -> list[CameraStreamInfo]: @@ -524,22 +519,22 @@ async def get_storage_devices(self): _LOGGER.debug("%s/ISAPI/ContentMgmt/Storage %s", self.isapi.host, storage_info) for storage in storage_info: - storage = storage.get("hdd") - if not isinstance(storage, list): - storage = [storage] - if storage: - for hdd in storage: - storage_list.append( - HDDInfo( - id=int(hdd.get("id")), - name=hdd.get("hddName"), - type=hdd.get("hddType"), - status=hdd.get("status"), - capacity=int(hdd.get("capacity")), - freespace=int(hdd.get("freeSpace")), - property=hdd.get("property"), + if storage := storage.get("hdd"): + if not isinstance(storage, list): + storage = [storage] + if storage: + for hdd in storage: + storage_list.append( + HDDInfo( + id=int(hdd.get("id")), + name=hdd.get("hddName"), + type=hdd.get("hddType"), + status=hdd.get("status"), + capacity=int(hdd.get("capacity")), + freespace=int(hdd.get("freeSpace")), + property=hdd.get("property"), + ) ) - ) return storage_list @@ -551,30 +546,6 @@ def get_storage_device_by_id(self, device_id: int) -> HDDInfo | None: # Storage id does not exist return None - async def get_port_status(self, port_type: str, port_no: int) -> str: - """Get status of physical ports""" - if port_type == "input": - status = await self.isapi.System.IO.inputs[port_no].status(method=GET) - _LOGGER.debug("%s/ISAPI/System/IO/inputs/%s/status %s", self.isapi.host, port_no, status) - else: - status = await self.isapi.System.IO.outputs[port_no].status(method=GET) - _LOGGER.debug("%s/ISAPI/System/IO/outputs/%s/status %s", self.isapi.host, port_no, status) - - if status.get("IOPortStatus"): - return status["IOPortStatus"].get("ioState") - - async def set_port_state(self, port_no: int, turn_on: bool): - """Set status of output port""" - data = {} - if turn_on: - data["IOPortData"] = {"outputState": "high"} - else: - data["IOPortData"] = {"outputState": "low"} - - xml = xmltodict.unparse(data) - response = await self.isapi.System.IO.outputs[port_no].trigger(method=PUT, data=xml) - _LOGGER.debug("[PUT] %s/ISAPI/System/IO/outputs/%s/trigger %s", self.isapi.host, port_no, response) - def get_device_info(self, device_id: int = 0) -> DeviceInfo: """Return device registry information.""" if device_id == 0: @@ -599,13 +570,30 @@ def get_device_info(self, device_id: int = 0) -> DeviceInfo: via_device=(DOMAIN, self.device_info.serial_no) if self.device_info.is_nvr else None, ) + def get_event_state_node(self, event: EventInfo) -> str: + """Get xml key for event state""" + slug = EVENTS[event.id]["slug"] + + # Alternate node name for some event types + if event.channel_id == 0: # NVR + if EVENTS[event.id].get("direct_node"): + slug = EVENTS[event.id]["direct_node"] + else: + camera = self.get_camera_by_id(event.channel_id) + if camera.connection_type == CONNECTION_TYPE_DIRECT and EVENTS[event.id].get("direct_node"): + slug = EVENTS[event.id]["direct_node"] + + if camera.connection_type == CONNECTION_TYPE_PROXIED and EVENTS[event.id].get("proxied_node"): + slug = EVENTS[event.id]["proxied_node"] + + node = slug[0].upper() + slug[1:] + return node + async def get_event_enabled_state(self, event: EventInfo) -> bool: """Get event detection state.""" - state = await self.request(GET, event.url) - slug = EVENTS[event.id]["slug"] - node = slug[0].upper() + slug[1:] - return str_to_bool(state[node]["enabled"]) + node = self.get_event_state_node(event) + return str_to_bool(state[node].get("enabled", False)) if state.get(node) else False async def get_event_switch_mutex(self, event: EventInfo, channel_id: int) -> list[MutexIssue]: """Get if event is mutually exclusive with enabled events""" @@ -647,14 +635,13 @@ async def set_event_enabled_state(self, channel_id: int, event: EventInfo, is_en # Validate that this event switch is not mutually exclusive with another enabled one mutex_issues = [] - if is_enabled and self.device_info.support_event_mutex_checking: + if channel_id != 0 and is_enabled and self.device_info.support_event_mutex_checking: mutex_issues = await self.get_event_switch_mutex(event, channel_id) if not mutex_issues: data = await self.request(GET, event.url) _LOGGER.debug("%s/ISAPI/%s %s", self.isapi.host, event.url, data) - slug = EVENTS[event.id]["slug"] - node = slug[0].upper() + slug[1:] + node = self.get_event_state_node(event) new_state = bool_to_str(is_enabled) if new_state == data[node]["enabled"]: return @@ -667,6 +654,30 @@ async def set_event_enabled_state(self, channel_id: int, event: EventInfo, is_en f"You cannot enable {EVENTS[event.id]['label']} events. Please disable {EVENTS[mutex_issues[0].event_id]['label']} on channels {mutex_issues[0].channels} first" ) + async def get_port_status(self, port_type: str, port_no: int) -> str: + """Get status of physical ports""" + if port_type == "input": + status = await self.isapi.System.IO.inputs[port_no].status(method=GET) + _LOGGER.debug("%s/ISAPI/System/IO/inputs/%s/status %s", self.isapi.host, port_no, status) + else: + status = await self.isapi.System.IO.outputs[port_no].status(method=GET) + _LOGGER.debug("%s/ISAPI/System/IO/outputs/%s/status %s", self.isapi.host, port_no, status) + + if status.get("IOPortStatus"): + return status["IOPortStatus"].get("ioState") + + async def set_port_state(self, port_no: int, turn_on: bool): + """Set status of output port""" + data = {} + if turn_on: + data["IOPortData"] = {"outputState": "high"} + else: + data["IOPortData"] = {"outputState": "low"} + + xml = xmltodict.unparse(data) + response = await self.isapi.System.IO.outputs[port_no].trigger(method=PUT, data=xml) + _LOGGER.debug("[PUT] %s/ISAPI/System/IO/outputs/%s/trigger %s", self.isapi.host, port_no, response) + async def get_holiday_enabled_state(self, holiday_index=0) -> bool: """Get holiday state""" @@ -800,7 +811,6 @@ def parse_event_notification(xml: str) -> AlertInfo: xml = xml.replace("&", "&") data = xmltodict.parse(xml) - alert = data["EventNotificationAlert"] event_id = alert.get("eventType") @@ -808,27 +818,22 @@ def parse_event_notification(xml: str) -> AlertInfo: # None: _LOGGER.debug("Alert: %s", alert) serial_no = self.isapi.device_info.serial_no.lower() - unique_id = f"binary_sensor.{slugify(serial_no)}_{alert.channel_id}" f"_{alert.event_id}" + + device_id_param = f"_{alert.channel_id}" if alert.channel_id != 0 else "" + io_port_id_param = f"_{alert.io_port_id}" if alert.io_port_id != 0 else "" + unique_id = ( + f"binary_sensor.{slugify(serial_no)}{device_id_param}{io_port_id_param}_{alert.event_id}" + ) + + _LOGGER.debug("UNIQUE_ID: %s", unique_id) + + # unique_id = f"binary_sensor.{slugify(serial_no)}_{alert.channel_id}" f"_{alert.event_id}" entity_registry = async_get(self.hass) entity_id = entity_registry.async_get_entity_id(Platform.BINARY_SENSOR, DOMAIN, unique_id) if entity_id: @@ -156,11 +165,14 @@ def trigger_sensor(self, xml: str) -> None: def fire_hass_event(self, alert: AlertInfo): """Fire HASS event""" + camera_name = "" + if camera := self.isapi.get_camera_by_id(alert.channel_id): + camera_name = camera.name - camera = self.isapi.get_camera_by_id(alert.channel_id) message = { "channel_id": alert.channel_id, - "camera_name": camera.name, + "io_port_id": alert.io_port_id, + "camera_name": camera_name, "event_id": alert.event_id, } diff --git a/custom_components/hikvision_next/switch.py b/custom_components/hikvision_next/switch.py index b578c77..89ebcd1 100644 --- a/custom_components/hikvision_next/switch.py +++ b/custom_components/hikvision_next/switch.py @@ -31,77 +31,43 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_e secondary_coordinator = config.get(SECONDARY_COORDINATOR) entities = [] + + # Camera supported events for camera in events_coordinator.isapi.cameras: for event in camera.supported_events: - entities.append(CameraEventSwitch(camera, event, events_coordinator)) + entities.append(EventSwitch(camera.id, event, events_coordinator)) + # NVR supported events if events_coordinator.isapi.device_info.is_nvr: for event in events_coordinator.isapi.device_info.supported_events: - entities.append(NVREventSwitch(event, events_coordinator)) + entities.append(EventSwitch(0, event, events_coordinator)) - for i in range(1, events_coordinator.isapi.device_info.output_ports + 1): - entities.append(NVROutputSwitch(events_coordinator, i)) + # Output port switch + for i in range(1, events_coordinator.isapi.device_info.output_ports + 1): + entities.append(NVROutputSwitch(events_coordinator, i)) + # Holiday mode switch if secondary_coordinator.isapi.device_info.support_holiday_mode: entities.append(HolidaySwitch(secondary_coordinator)) async_add_entities(entities) -class CameraEventSwitch(CoordinatorEntity, SwitchEntity): - """Detection events switch.""" - - _attr_has_entity_name = True - _attr_icon = "mdi:eye-outline" - - def __init__(self, camera, event: EventInfo, coordinator) -> None: - super().__init__(coordinator) - self.entity_id = ENTITY_ID_FORMAT.format(event.unique_id) - self._attr_unique_id = self.entity_id - self._attr_device_info = coordinator.isapi.get_device_info(camera.id) - self._attr_name = EVENT_SWITCH_LABEL_FORMAT.format(EVENTS[event.id]["label"]) - self.camera = camera - self.event = event - - @property - def is_on(self) -> bool | None: - return self.coordinator.data.get(self.entity_id) - - async def async_turn_on(self, **kwargs: Any) -> None: - try: - await self.coordinator.isapi.set_event_enabled_state(self.camera.id, self.event, True) - except Exception as ex: - raise ex - finally: - await self.coordinator.async_request_refresh() - - async def async_turn_off(self, **kwargs: Any) -> None: - try: - await self.coordinator.isapi.set_event_enabled_state(self.camera.id, self.event, False) - except Exception as ex: - raise ex - finally: - await self.coordinator.async_request_refresh() - - @property - def extra_state_attributes(self): - attrs = {} - attrs["notify_HA"] = True if "center" in self.event.notifiers else False - return attrs - - -class NVREventSwitch(CoordinatorEntity, SwitchEntity): +class EventSwitch(CoordinatorEntity, SwitchEntity): """Detection events switch.""" _attr_has_entity_name = True _attr_icon = "mdi:eye-outline" - def __init__(self, event: EventInfo, coordinator) -> None: + def __init__(self, device_id: int, event: EventInfo, coordinator) -> None: super().__init__(coordinator) self.entity_id = ENTITY_ID_FORMAT.format(event.unique_id) self._attr_unique_id = self.entity_id - self._attr_device_info = coordinator.isapi.get_device_info(0) - self._attr_name = EVENT_SWITCH_LABEL_FORMAT.format(f"{EVENTS[event.id]['label']} {event.channel_id}") + self._attr_device_info = coordinator.isapi.get_device_info(device_id) + self._attr_name = EVENT_SWITCH_LABEL_FORMAT.format( + f"{EVENTS[event.id]['label']}{' ' + str(event.io_port_id) if event.io_port_id != 0 else ''}" + ) + self.device_id = device_id self.event = event @property @@ -110,7 +76,7 @@ def is_on(self) -> bool | None: async def async_turn_on(self, **kwargs: Any) -> None: try: - await self.coordinator.isapi.set_event_enabled_state(self.event.channel_id, self.event, True) + await self.coordinator.isapi.set_event_enabled_state(self.device_id, self.event, True) except Exception as ex: raise ex finally: @@ -118,7 +84,7 @@ async def async_turn_on(self, **kwargs: Any) -> None: async def async_turn_off(self, **kwargs: Any) -> None: try: - await self.coordinator.isapi.set_event_enabled_state(self.event.channel_id, self.event, False) + await self.coordinator.isapi.set_event_enabled_state(self.device_id, self.event, False) except Exception as ex: raise ex finally: