Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial debugging for issue #149 #152

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions custom_components/hikvision_next/isapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class AlarmServer:
"""Holds alarm server info"""

# Uses pylint invalid names to not break previous versions
id: int # pylint: disable=invalid-name
ipAddress: str # pylint: disable=invalid-name
portNo: int # pylint: disable=invalid-name
url: str # pylint: disable=invalid-name
Expand Down Expand Up @@ -141,6 +142,7 @@ class HikDeviceInfo:
support_alarm_server: bool = False
support_channel_zero: bool = False
support_event_mutex_checking: bool = False
support_event_subscription: bool = False
input_ports: int = 0
output_ports: int = 0
rtsp_port: int = 554
Expand Down Expand Up @@ -214,6 +216,7 @@ async def get_hardware_info(self):
self.device_info.support_holiday_mode = deep_get(capabilities, "SysCap.isSupportHolidy", False)
self.device_info.support_channel_zero = deep_get(capabilities, "RacmCap.isSupportZeroChan", False)
self.device_info.support_event_mutex_checking = capabilities.get("isSupportGetmutexFuncErrMsg", False)
self.device_info.support_event_subscription = deep_get(capabilities, "SysCap.isSupportSubscribeEvent", False)
self.device_info.input_ports = int(deep_get(capabilities, "SysCap.IOCap.IOInputPortNums", 0))
self.device_info.output_ports = int(deep_get(capabilities, "SysCap.IOCap.IOOutputPortNums", 0))

Expand All @@ -223,6 +226,10 @@ async def get_hardware_info(self):
)
self.device_info.support_alarm_server = bool(await self.get_alarm_server())

_LOGGER.debug(f"support_event_subscription: {self.device_info.support_event_subscription}")
_LOGGER.debug(f"support_alarm_server: {self.device_info.support_alarm_server}")
_LOGGER.debug(f"supported_events: {self.supported_events}")

await self.get_protocols()

# Set if NVR based on whether more than 1 supported IP or analog cameras
Expand Down Expand Up @@ -648,6 +655,7 @@ async def set_event_enabled_state(self, channel_id: int, event: EventInfo, is_en
return
data[node]["enabled"] = new_state
xml = xmltodict.unparse(data)
_LOGGER.debug(f"[PUT] {self.isapi.host}/ISAPI/{event.url} request data is {xml}")
response = await self.request(PUT, event.url, data=xml)
_LOGGER.debug("[PUT] %s/ISAPI/%s %s", self.isapi.host, event.url, response)
else:
Expand Down Expand Up @@ -717,9 +725,41 @@ def _get_event_notification_host(self, data: Node) -> Node:
# <HttpHostNotificationList xmlns="http://www.isapi.org/ver20/XMLSchema">
return hosts

async def _get_alarm_server_capabilities(self) -> None:
"""Get capability set of HTTP listening server."""

try:
data = await self.isapi.Event.notification.httpHosts.capabilities(method=GET)
except HTTPStatusError as ex:
_LOGGER.error(f"Cannot GET {self.isapi.host}/ISAPI/Event/notification/httpHosts/capabilities. {ex}")
return

_LOGGER.debug("%s/ISAPI/Event/notification/httpHosts/capabilities %s", self.isapi.host, data)

async def _test_alarm_server_by_id(self, id: int) -> None:
"""Detect whether the listening server is working normally."""

try:
data = await self.isapi.Event.notification.httpHosts[id](method=GET)
except HTTPStatusError as ex:
_LOGGER.error(f"Cannot GET {self.isapi.host}/ISAPI/Event/notification/httpHosts/{id}. {ex}")
return None
_LOGGER.debug("%s/ISAPI/Event/notification/httpHosts/%d %s", self.isapi.host, id, data)

xml = xmltodict.unparse(data)
_LOGGER.debug(f"[POST] {self.isapi.host}/ISAPI/Event/notification/httpHosts/{id}/test request data is {xml}")
try:
data = await self.isapi.Event.notification.httpHosts[id].test(method=POST, data=xml)
except HTTPStatusError as ex:
_LOGGER.error(f"Cannot POST {self.isapi.host}/ISAPI/Event/notification/httpHosts/{id}/test. {ex}")
return

_LOGGER.debug("[POST] %s/ISAPI/Event/notification/httpHosts/%d/test %s", self.isapi.host, id, data)

async def get_alarm_server(self) -> AlarmServer | None:
"""Get event notifications listener server URL."""

await self._get_alarm_server_capabilities()
try:
data = await self.isapi.Event.notification.httpHosts(method=GET)
except HTTPStatusError:
Expand All @@ -730,6 +770,7 @@ async def get_alarm_server(self) -> AlarmServer | None:
host = self._get_event_notification_host(data)

alarm_server = AlarmServer(
id=int(host.get("id")),
ipAddress=host.get("ipAddress"),
portNo=int(host.get("portNo")),
url=host.get("url"),
Expand All @@ -741,6 +782,7 @@ async def get_alarm_server(self) -> AlarmServer | None:
async def set_alarm_server(self, base_url: str, path: str) -> None:
"""Set event notifications listener server."""

await self._get_alarm_server_capabilities()
address = urlparse(base_url)
data = await self.isapi.Event.notification.httpHosts(method=GET)
_LOGGER.debug("%s/ISAPI/Event/notification/httpHosts %s", self.isapi.host, data)
Expand All @@ -751,6 +793,7 @@ async def set_alarm_server(self, base_url: str, path: str) -> None:
and host.get("portNo") == str(address.port)
and host["url"] == path
):
await self._test_alarm_server_by_id(int(host.get("id")))
return
host["url"] = path
host["protocolType"] = address.scheme.upper()
Expand All @@ -761,12 +804,15 @@ async def set_alarm_server(self, base_url: str, path: str) -> None:
host["httpAuthenticationMethod"] = "none"

xml = xmltodict.unparse(data)
_LOGGER.debug(f"[PUT] {self.isapi.host}/ISAPI/Event/notification/httpHosts request data is {xml}")
response = await self.isapi.Event.notification.httpHosts(method=PUT, data=xml)
_LOGGER.debug(
"[PUT] %s/ISAPI/Event/notification/httpHosts %s",
self.isapi.host,
response,
)
await self._get_alarm_server_capabilities()
await self._test_alarm_server_by_id(int(host.get("id")))

async def request(self, method: str, url: str, present: str = "dict", **data) -> Any:
"""Send request"""
Expand Down