diff --git a/custom_components/illuminance/sensor.py b/custom_components/illuminance/sensor.py index 1b13ead..7a5d2ca 100644 --- a/custom_components/illuminance/sensor.py +++ b/custom_components/illuminance/sensor.py @@ -3,16 +3,30 @@ A Sensor platform that estimates outdoor illuminance from current weather conditions. """ -import datetime as dt +from __future__ import annotations + +import asyncio +from collections.abc import Mapping, Sequence +from datetime import date, datetime, timedelta from enum import IntEnum import logging from math import asin, cos, exp, radians, sin import re +from typing import Any, Union, cast +from astral import Elevation +from astral.location import Location import voluptuous as vol from homeassistant.components.darksky.weather import MAP_CONDITION as DSW_MAP_CONDITION -from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN, PLATFORM_SCHEMA +from homeassistant.components.sensor import ( + DOMAIN as SENSOR_DOMAIN, + PLATFORM_SCHEMA, + SensorDeviceClass, + SensorEntity, + SensorEntityDescription, + SensorStateClass, +) from homeassistant.const import ( ATTR_ATTRIBUTION, CONF_ENTITY_ID, @@ -24,16 +38,18 @@ STATE_UNAVAILABLE, STATE_UNKNOWN, ) -from homeassistant.core import callback +from homeassistant.core import HomeAssistant, Event, State, callback import homeassistant.helpers.config_validation as cv -from homeassistant.helpers.entity import Entity +from homeassistant.helpers.entity_platform import AddEntitiesCallback, EntityPlatform from homeassistant.helpers.event import async_track_state_change_event from homeassistant.helpers.sun import get_astral_location +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType import homeassistant.util.dt as dt_util +DOMAIN = "illuminance" DEFAULT_NAME = "Illuminance" -MIN_SCAN_INTERVAL = dt.timedelta(minutes=5) -DEFAULT_SCAN_INTERVAL = dt.timedelta(minutes=5) +MIN_SCAN_INTERVAL = timedelta(minutes=5) +DEFAULT_SCAN_INTERVAL = timedelta(minutes=5) DEFAULT_FALLBACK = 10 CONF_FALLBACK = "fallback" @@ -147,30 +163,32 @@ } ) -_20_MIN = dt.timedelta(minutes=20) -_40_MIN = dt.timedelta(minutes=40) +_20_MIN = timedelta(minutes=20) +_40_MIN = timedelta(minutes=40) + +Num = Union[float, int] -async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): - """Set up platform.""" +async def async_setup_platform( + hass: HomeAssistant, + config: ConfigType, + async_add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType | None = None, +) -> None: + """Set up sensors.""" - def get_loc_elev(event=None): + def get_loc_elev(event: Event | None = None) -> None: """Get HA Location object & elevation.""" - try: - loc, elev = get_astral_location(hass) - except TypeError: - loc = get_astral_location(hass) - elev = None - hass.data["illuminance"] = loc, elev + hass.data[DOMAIN] = get_astral_location(hass) - if "illuminance" not in hass.data: + if DOMAIN not in hass.data: get_loc_elev() hass.bus.async_listen(EVENT_CORE_CONFIG_UPDATE, get_loc_elev) async_add_entities([IlluminanceSensor(config)], True) -def _illumiance(elev): +def _illumiance(elev: Num) -> float: """Calculate illuminance from sun at given elevation.""" elev_rad = radians(elev) u = sin(elev_rad) @@ -197,28 +215,39 @@ class EntityStatus(IntEnum): OK_CONDITION = 4 -class IlluminanceSensor(Entity): +class IlluminanceSensor(SensorEntity): """Illuminance sensor.""" - _state = None _entity_status = EntityStatus.NOT_SEEN - _sk_mapping = None - _cd_mapping = None - _sk = None - _cond_desc = None + _sk_mapping: Sequence[tuple[Num, Sequence[str]]] | None = None + _cd_mapping: Mapping[str, str | None] | None = None + _sk: Num + _cond_desc: str | None = None _warned = False - - def __init__(self, config): - """Initialize.""" - self._entity_id = config[CONF_ENTITY_ID] - self._name = config[CONF_NAME] - self._mode = config[CONF_MODE] - if self._mode == MODE_SIMPLE: - self._sun_data = None - self._fallback = config[CONF_FALLBACK] + _sun_data: tuple[date, tuple[datetime, datetime, datetime, datetime]] | None = None + + def __init__(self, config: ConfigType) -> None: + """Initialize sensor.""" + name = config[CONF_NAME] + self.entity_description = SensorEntityDescription( + DOMAIN, + device_class=SensorDeviceClass.ILLUMINANCE, + name=name, + native_unit_of_measurement=LIGHT_LUX, + state_class=SensorStateClass.MEASUREMENT, + ) + self._attr_unique_id = name + self._entity_id: str = config[CONF_ENTITY_ID] + self._mode: str = config[CONF_MODE] + self._fallback: float = config[CONF_FALLBACK] @callback - def add_to_platform_start(self, hass, platform, parallel_updates): + def add_to_platform_start( + self, + hass: HomeAssistant, + platform: EntityPlatform, + parallel_updates: asyncio.Semaphore | None, + ) -> None: """Start adding an entity to a platform.""" # This method is called before first call to async_update. @@ -229,9 +258,10 @@ def add_to_platform_start(self, hass, platform, parallel_updates): self._get_divisor_from_weather_data(hass.states.get(self._entity_id)) @callback - def sensor_state_listener(event): - new_state = event.data["new_state"] - old_state = event.data["old_state"] + def sensor_state_listener(event: Event) -> None: + """Process input entity state update.""" + new_state: State | None = event.data["new_state"] + old_state: State | None = event.data["old_state"] if ( self._entity_status <= EntityStatus.NO_ATTRIBUTION or not old_state @@ -246,22 +276,7 @@ def sensor_state_listener(event): async_track_state_change_event(hass, self._entity_id, sensor_state_listener) ) - @property - def name(self): - """Return name.""" - return self._name - - @property - def state(self): - """Return state.""" - return self._state - - @property - def unit_of_measurement(self): - """Return unit of measurement.""" - return LIGHT_LUX - - async def async_update(self): + async def async_update(self) -> None: """Update state.""" if ( self._entity_status <= EntityStatus.NO_ATTRIBUTION @@ -278,17 +293,17 @@ async def async_update(self): # Calculate final illuminance. - self._state = round(illuminance / self._sk) + self._attr_native_value = round(illuminance / self._sk) _LOGGER.debug( "%s: Updating %s -> %i / %0.1f = %i", self.name, self._cond_desc, round(illuminance), self._sk, - self._state, + self._attr_native_value, ) - def _get_divisor_from_weather_data(self, entity_state): + def _get_divisor_from_weather_data(self, entity_state: State | None) -> None: """Get weather data from input entity.""" # Use fallback unless divisor can be successfully determined from weather data. @@ -306,6 +321,7 @@ def _get_divisor_from_weather_data(self, entity_state): # yet, try to determine them. if self._entity_status <= EntityStatus.NO_ATTRIBUTION: if raw_condition: + assert entity_state try: float(raw_condition) self._entity_status = EntityStatus.OK_CLOUD @@ -315,7 +331,9 @@ def _get_divisor_from_weather_data(self, entity_state): self._entity_id, ) except ValueError: - attribution = entity_state.attributes.get(ATTR_ATTRIBUTION) + attribution = cast( + Union[str, None], entity_state.attributes.get(ATTR_ATTRIBUTION) + ) self._get_mappings(attribution, entity_state.domain) if self._entity_status == EntityStatus.BAD: _LOGGER.error( @@ -373,6 +391,7 @@ def _get_divisor_from_weather_data(self, entity_state): return assert self._entity_status == EntityStatus.OK_CONDITION + assert self._sk_mapping if self._cd_mapping: condition = self._cd_mapping.get(raw_condition) @@ -387,7 +406,7 @@ def _get_divisor_from_weather_data(self, entity_state): return _LOGGER.error("%s: Unexpected current condition: %s", self.name, raw_condition) - def _get_mappings(self, attribution, domain): + def _get_mappings(self, attribution: str | None, domain: str) -> None: """Get sk -> conditions mappings.""" if not attribution: self._entity_status = EntityStatus.NO_ATTRIBUTION @@ -403,10 +422,10 @@ def _get_mappings(self, attribution, domain): self._entity_status = EntityStatus.BAD - def _calculate_illuminance(self, now): + def _calculate_illuminance(self, now: datetime) -> Num: """Calculate sunny illuminance.""" if self._mode == MODE_NORMAL: - return _illumiance(self._astral_event("solar_elevation", now)) + return _illumiance(cast(Num, self._astral_event("solar_elevation", now))) sun_factor = self._sun_factor(now) @@ -415,27 +434,25 @@ def _calculate_illuminance(self, now): if sun_factor == 0: # For historic reasons, return a value of 10. _LOGGER.debug("%s: Updating -> 10", self.name) - self._state = 10 + self._attr_native_value = 10 raise AbortUpdate return 10000 * sun_factor - def _astral_event(self, event, date_or_dt): + def _astral_event(self, event: str, date_or_dt: date | datetime) -> Any: """Get astral event.""" - loc, elev = self.hass.data["illuminance"] - if elev is None: - return getattr(loc, event)(date_or_dt) + loc, elev = cast(tuple[Location, Elevation], self.hass.data[DOMAIN]) return getattr(loc, event)(date_or_dt, observer_elevation=elev) - def _sun_factor(self, now): + def _sun_factor(self, now: datetime) -> Num: """Calculate sun factor.""" now_date = now.date() if self._sun_data and self._sun_data[0] == now_date: (sunrise_begin, sunrise_end, sunset_begin, sunset_end) = self._sun_data[1] else: - sunrise = self._astral_event("sunrise", now_date) - sunset = self._astral_event("sunset", now_date) + sunrise = cast(datetime, self._astral_event("sunrise", now_date)) + sunset = cast(datetime, self._astral_event("sunset", now_date)) sunrise_begin = sunrise - _20_MIN sunrise_end = sunrise + _40_MIN sunset_begin = sunset - _40_MIN