diff --git a/custom_components/hkc_alarm/sensor.py b/custom_components/hkc_alarm/sensor.py index 27edebb..4a31f4b 100644 --- a/custom_components/hkc_alarm/sensor.py +++ b/custom_components/hkc_alarm/sensor.py @@ -12,14 +12,14 @@ from .const import DOMAIN, DEFAULT_UPDATE_INTERVAL, CONF_UPDATE_INTERVAL from homeassistant.helpers.update_coordinator import CoordinatorEntity - - _logger = logging.getLogger(__name__) + class HKCSensor(CoordinatorEntity): _panel_data = {} # Class variable shared among all instances _update_lock = asyncio.Lock() # Lock to ensure only one update at a time _last_update = datetime.min # Initialize with the earliest possible datetime + _panel_time_offset = None def __init__(self, hkc_alarm, input_data, coordinator): super().__init__(coordinator) # Ensure the coordinator is properly initialized @@ -27,10 +27,19 @@ def __init__(self, hkc_alarm, input_data, coordinator): self._input_data = input_data self.coordinator = coordinator + @property + def extra_state_attributes(self): + """Return the state attributes of the alarm sensors""" + if HKCSensor._panel_time_offset is None: + return None + + attributes = {"Panel Offset": round(HKCSensor._panel_time_offset)} + return attributes + @property def unique_id(self): """Return the unique ID of the sensor.""" - return self._hkc_alarm.panel_id + str(self._input_data['inputId']) + return self._hkc_alarm.panel_id + str(self._input_data["inputId"]) @property def device_info(self): @@ -54,15 +63,30 @@ def state(self): return "Unused" # Parse panel time - panel_time_str = self._panel_data.get('display', '') + panel_time_str = self._panel_data.get("display", "") try: panel_time = datetime.strptime(panel_time_str, "%a %d %b %H:%M") except ValueError: _logger.debug( - f"Failed to parse panel time: {panel_time_str} for sensor {self.name}. Setting state to 'Unknown'." + f"Failed to parse panel time: {panel_time_str} for sensor {self.name}. Falling back to previously known panel offset." ) - return "Unknown" # Return an unknown state if panel time parsing fails + # Fallback to previously known panel offset + panel_time_offset = HKCSensor._panel_time_offset + + if panel_time_offset is None: + # No previously known panel offset, we're stuck + return "Unknown" + + # Round the offset value + panel_offset_in_minutes = round(panel_time_offset) + + # Calculate new panel_time + current_time = datetime.now() + panel_time = current_time - timedelta(minutes=panel_offset_in_minutes) + + # Convert panel_time back to string format + panel_time_str = panel_time.strftime("%a %d %b %H:%M") # Ensure Panel Time is in UTC current_year = datetime.utcnow().year @@ -72,12 +96,16 @@ def state(self): try: sensor_timestamp = datetime.strptime( self._input_data["timestamp"], "%Y-%m-%dT%H:%M:%SZ" - ).replace(tzinfo=pytz.UTC) # Ensure sensor_timestamp is treated as UTC + ).replace( + tzinfo=pytz.UTC + ) # Ensure sensor_timestamp is treated as UTC except ValueError: try: sensor_timestamp = datetime.strptime( self._input_data["timestamp"], "%Y-%m-%dT%H:%M:%S" - ).replace(tzinfo=pytz.UTC) # Ensure sensor_timestamp is treated as UTC + ).replace( + tzinfo=pytz.UTC + ) # Ensure sensor_timestamp is treated as UTC except ValueError: _logger.debug( f"Failed to parse timestamp: {self._input_data['timestamp']} for sensor {self.name}. Setting state to 'Unknown'." @@ -86,6 +114,15 @@ def state(self): time_difference = panel_time - sensor_timestamp + # Get panel offset. + current_time = datetime.now(pytz.UTC) + time_difference = current_time - panel_time + + # Calculate the difference in minutes + minutes_difference = time_difference.total_seconds() / 60 + + HKCSensor._panel_time_offset = minutes_difference + # Handle cases where the timestamp is very old or invalid if time_difference > timedelta(days=365): _logger.debug( @@ -108,7 +145,6 @@ def state(self): _logger.debug(f"Sensor {self.name} state determined as 'Closed'.") return "Closed" - @property def name(self): return self._input_data["description"] @@ -123,19 +159,24 @@ def _handle_coordinator_update(self) -> None: """Handle updated data from the coordinator.""" # Search for the matching sensor data based on inputId matching_sensor_data = next( - (sensor_data for sensor_data in self.coordinator.data if sensor_data.get('inputId') == self._input_data.get('inputId')), - None # Default to None if no matching sensor data is found + ( + sensor_data + for sensor_data in self.coordinator.data + if sensor_data.get("inputId") == self._input_data.get("inputId") + ), + None, # Default to None if no matching sensor data is found ) if matching_sensor_data is not None: # Update self._input_data with the matching sensor data self._input_data = matching_sensor_data else: - _logger.warning(f"No matching sensor data found for inputId {self._input_data.get('inputId')}") + _logger.warning( + f"No matching sensor data found for inputId {self._input_data.get('inputId')}" + ) self.async_write_ha_state() # Update the state with the latest data - @classmethod async def update_panel_data(cls, hkc_alarm, hass): async with cls._update_lock: @@ -161,7 +202,8 @@ async def _async_fetch_data(): try: hkc_alarm = hass.data[DOMAIN][entry.entry_id]["hkc_alarm"] await HKCSensor.update_panel_data(hkc_alarm, hass) - data = await hass.async_add_executor_job(hkc_alarm.get_all_inputs) + hkc_alarm.panel_offset = HKCSensor._panel_time_offset + data = await hass.async_add_executor_job(hkc_alarm.get_all_inputs) return data except Exception as e: _logger.error(f"Exception occurred while fetching HKC data: {e}") @@ -192,8 +234,15 @@ async def async_force_refresh(service_call): all_inputs = coordinator.data # Filter out the inputs with empty description - filtered_inputs = [input_data for input_data in all_inputs if input_data['description']] + filtered_inputs = [ + input_data for input_data in all_inputs if input_data["description"] + ] async_add_entities( - [HKCSensor(hass.data[DOMAIN][entry.entry_id]["hkc_alarm"], input_data, coordinator) for input_data in filtered_inputs], + [ + HKCSensor( + hass.data[DOMAIN][entry.entry_id]["hkc_alarm"], input_data, coordinator + ) + for input_data in filtered_inputs + ], True, - ) \ No newline at end of file + ) diff --git a/tests/test_sensor.py b/tests/test_sensor.py index 9d0115f..1bdeb38 100644 --- a/tests/test_sensor.py +++ b/tests/test_sensor.py @@ -59,7 +59,7 @@ async def test_hkc_sensor_invalid_timestamp(hass, aioclient_mock): await sensor.async_update() assert ( - sensor.state == "Unknown" + sensor.state == "Open" ) # as per your logic, an invalid timestamp should result in state "Unknown"