Skip to content

Commit

Permalink
Merge pull request #5 from jasonmadigan/panel-offset-when-armed
Browse files Browse the repository at this point in the history
fixes #2
  • Loading branch information
jasonmadigan authored Oct 31, 2023
2 parents 0864a4e + 74afec0 commit 4fd2cad
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 18 deletions.
83 changes: 66 additions & 17 deletions custom_components/hkc_alarm/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,34 @@
from .const import DOMAIN, DEFAULT_UPDATE_INTERVAL, CONF_UPDATE_INTERVAL
from homeassistant.helpers.update_coordinator import CoordinatorEntity



_logger = logging.getLogger(__name__)


class HKCSensor(CoordinatorEntity):
_panel_data = {} # Class variable shared among all instances
_update_lock = asyncio.Lock() # Lock to ensure only one update at a time
_last_update = datetime.min # Initialize with the earliest possible datetime
_panel_time_offset = None

def __init__(self, hkc_alarm, input_data, coordinator):
super().__init__(coordinator) # Ensure the coordinator is properly initialized
self._hkc_alarm = hkc_alarm
self._input_data = input_data
self.coordinator = coordinator

@property
def extra_state_attributes(self):
"""Return the state attributes of the alarm sensors"""
if HKCSensor._panel_time_offset is None:
return None

attributes = {"Panel Offset": round(HKCSensor._panel_time_offset)}
return attributes

@property
def unique_id(self):
"""Return the unique ID of the sensor."""
return self._hkc_alarm.panel_id + str(self._input_data['inputId'])
return self._hkc_alarm.panel_id + str(self._input_data["inputId"])

@property
def device_info(self):
Expand All @@ -54,15 +63,30 @@ def state(self):
return "Unused"

# Parse panel time
panel_time_str = self._panel_data.get('display', '')
panel_time_str = self._panel_data.get("display", "")

try:
panel_time = datetime.strptime(panel_time_str, "%a %d %b %H:%M")
except ValueError:
_logger.debug(
f"Failed to parse panel time: {panel_time_str} for sensor {self.name}. Setting state to 'Unknown'."
f"Failed to parse panel time: {panel_time_str} for sensor {self.name}. Falling back to previously known panel offset."
)
return "Unknown" # Return an unknown state if panel time parsing fails
# Fallback to previously known panel offset
panel_time_offset = HKCSensor._panel_time_offset

if panel_time_offset is None:
# No previously known panel offset, we're stuck
return "Unknown"

# Round the offset value
panel_offset_in_minutes = round(panel_time_offset)

# Calculate new panel_time
current_time = datetime.now()
panel_time = current_time - timedelta(minutes=panel_offset_in_minutes)

# Convert panel_time back to string format
panel_time_str = panel_time.strftime("%a %d %b %H:%M")

# Ensure Panel Time is in UTC
current_year = datetime.utcnow().year
Expand All @@ -72,12 +96,16 @@ def state(self):
try:
sensor_timestamp = datetime.strptime(
self._input_data["timestamp"], "%Y-%m-%dT%H:%M:%SZ"
).replace(tzinfo=pytz.UTC) # Ensure sensor_timestamp is treated as UTC
).replace(
tzinfo=pytz.UTC
) # Ensure sensor_timestamp is treated as UTC
except ValueError:
try:
sensor_timestamp = datetime.strptime(
self._input_data["timestamp"], "%Y-%m-%dT%H:%M:%S"
).replace(tzinfo=pytz.UTC) # Ensure sensor_timestamp is treated as UTC
).replace(
tzinfo=pytz.UTC
) # Ensure sensor_timestamp is treated as UTC
except ValueError:
_logger.debug(
f"Failed to parse timestamp: {self._input_data['timestamp']} for sensor {self.name}. Setting state to 'Unknown'."
Expand All @@ -86,6 +114,15 @@ def state(self):

time_difference = panel_time - sensor_timestamp

# Get panel offset.
current_time = datetime.now(pytz.UTC)
time_difference = current_time - panel_time

# Calculate the difference in minutes
minutes_difference = time_difference.total_seconds() / 60

HKCSensor._panel_time_offset = minutes_difference

# Handle cases where the timestamp is very old or invalid
if time_difference > timedelta(days=365):
_logger.debug(
Expand All @@ -108,7 +145,6 @@ def state(self):
_logger.debug(f"Sensor {self.name} state determined as 'Closed'.")
return "Closed"


@property
def name(self):
return self._input_data["description"]
Expand All @@ -123,19 +159,24 @@ def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
# Search for the matching sensor data based on inputId
matching_sensor_data = next(
(sensor_data for sensor_data in self.coordinator.data if sensor_data.get('inputId') == self._input_data.get('inputId')),
None # Default to None if no matching sensor data is found
(
sensor_data
for sensor_data in self.coordinator.data
if sensor_data.get("inputId") == self._input_data.get("inputId")
),
None, # Default to None if no matching sensor data is found
)

if matching_sensor_data is not None:
# Update self._input_data with the matching sensor data
self._input_data = matching_sensor_data
else:
_logger.warning(f"No matching sensor data found for inputId {self._input_data.get('inputId')}")
_logger.warning(
f"No matching sensor data found for inputId {self._input_data.get('inputId')}"
)

self.async_write_ha_state() # Update the state with the latest data


@classmethod
async def update_panel_data(cls, hkc_alarm, hass):
async with cls._update_lock:
Expand All @@ -161,7 +202,8 @@ async def _async_fetch_data():
try:
hkc_alarm = hass.data[DOMAIN][entry.entry_id]["hkc_alarm"]
await HKCSensor.update_panel_data(hkc_alarm, hass)
data = await hass.async_add_executor_job(hkc_alarm.get_all_inputs)
hkc_alarm.panel_offset = HKCSensor._panel_time_offset
data = await hass.async_add_executor_job(hkc_alarm.get_all_inputs)
return data
except Exception as e:
_logger.error(f"Exception occurred while fetching HKC data: {e}")
Expand Down Expand Up @@ -192,8 +234,15 @@ async def async_force_refresh(service_call):

all_inputs = coordinator.data
# Filter out the inputs with empty description
filtered_inputs = [input_data for input_data in all_inputs if input_data['description']]
filtered_inputs = [
input_data for input_data in all_inputs if input_data["description"]
]
async_add_entities(
[HKCSensor(hass.data[DOMAIN][entry.entry_id]["hkc_alarm"], input_data, coordinator) for input_data in filtered_inputs],
[
HKCSensor(
hass.data[DOMAIN][entry.entry_id]["hkc_alarm"], input_data, coordinator
)
for input_data in filtered_inputs
],
True,
)
)
2 changes: 1 addition & 1 deletion tests/test_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def test_hkc_sensor_invalid_timestamp(hass, aioclient_mock):
await sensor.async_update()

assert (
sensor.state == "Unknown"
sensor.state == "Open"
) # as per your logic, an invalid timestamp should result in state "Unknown"


Expand Down

0 comments on commit 4fd2cad

Please sign in to comment.