Skip to content

Commit

Permalink
Add unique IDs (#31)
Browse files Browse the repository at this point in the history
Allows entities to be managed on UI Entities page.
Support HA versions 2021.12 and newer.
  • Loading branch information
pnbruckner authored Jan 9, 2023
1 parent f9411f6 commit 6534141
Showing 1 changed file with 86 additions and 69 deletions.
155 changes: 86 additions & 69 deletions custom_components/illuminance/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,30 @@
A Sensor platform that estimates outdoor illuminance from current weather conditions.
"""
import datetime as dt
from __future__ import annotations

import asyncio
from collections.abc import Mapping, Sequence
from datetime import date, datetime, timedelta
from enum import IntEnum
import logging
from math import asin, cos, exp, radians, sin
import re
from typing import Any, Union, cast

from astral import Elevation
from astral.location import Location
import voluptuous as vol

from homeassistant.components.darksky.weather import MAP_CONDITION as DSW_MAP_CONDITION
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN, PLATFORM_SCHEMA
from homeassistant.components.sensor import (
DOMAIN as SENSOR_DOMAIN,
PLATFORM_SCHEMA,
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import (
ATTR_ATTRIBUTION,
CONF_ENTITY_ID,
Expand All @@ -24,16 +38,18 @@
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import callback
from homeassistant.core import HomeAssistant, Event, State, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback, EntityPlatform
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.sun import get_astral_location
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
import homeassistant.util.dt as dt_util

DOMAIN = "illuminance"
DEFAULT_NAME = "Illuminance"
MIN_SCAN_INTERVAL = dt.timedelta(minutes=5)
DEFAULT_SCAN_INTERVAL = dt.timedelta(minutes=5)
MIN_SCAN_INTERVAL = timedelta(minutes=5)
DEFAULT_SCAN_INTERVAL = timedelta(minutes=5)
DEFAULT_FALLBACK = 10

CONF_FALLBACK = "fallback"
Expand Down Expand Up @@ -147,30 +163,32 @@
}
)

_20_MIN = dt.timedelta(minutes=20)
_40_MIN = dt.timedelta(minutes=40)
_20_MIN = timedelta(minutes=20)
_40_MIN = timedelta(minutes=40)

Num = Union[float, int]


async def async_setup_platform(hass, config, async_add_entities, discovery_info=None):
"""Set up platform."""
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up sensors."""

def get_loc_elev(event=None):
def get_loc_elev(event: Event | None = None) -> None:
"""Get HA Location object & elevation."""
try:
loc, elev = get_astral_location(hass)
except TypeError:
loc = get_astral_location(hass)
elev = None
hass.data["illuminance"] = loc, elev
hass.data[DOMAIN] = get_astral_location(hass)

if "illuminance" not in hass.data:
if DOMAIN not in hass.data:
get_loc_elev()
hass.bus.async_listen(EVENT_CORE_CONFIG_UPDATE, get_loc_elev)

async_add_entities([IlluminanceSensor(config)], True)


def _illumiance(elev):
def _illumiance(elev: Num) -> float:
"""Calculate illuminance from sun at given elevation."""
elev_rad = radians(elev)
u = sin(elev_rad)
Expand All @@ -197,28 +215,39 @@ class EntityStatus(IntEnum):
OK_CONDITION = 4


class IlluminanceSensor(Entity):
class IlluminanceSensor(SensorEntity):
"""Illuminance sensor."""

_state = None
_entity_status = EntityStatus.NOT_SEEN
_sk_mapping = None
_cd_mapping = None
_sk = None
_cond_desc = None
_sk_mapping: Sequence[tuple[Num, Sequence[str]]] | None = None
_cd_mapping: Mapping[str, str | None] | None = None
_sk: Num
_cond_desc: str | None = None
_warned = False

def __init__(self, config):
"""Initialize."""
self._entity_id = config[CONF_ENTITY_ID]
self._name = config[CONF_NAME]
self._mode = config[CONF_MODE]
if self._mode == MODE_SIMPLE:
self._sun_data = None
self._fallback = config[CONF_FALLBACK]
_sun_data: tuple[date, tuple[datetime, datetime, datetime, datetime]] | None = None

def __init__(self, config: ConfigType) -> None:
"""Initialize sensor."""
name = config[CONF_NAME]
self.entity_description = SensorEntityDescription(
DOMAIN,
device_class=SensorDeviceClass.ILLUMINANCE,
name=name,
native_unit_of_measurement=LIGHT_LUX,
state_class=SensorStateClass.MEASUREMENT,
)
self._attr_unique_id = name
self._entity_id: str = config[CONF_ENTITY_ID]
self._mode: str = config[CONF_MODE]
self._fallback: float = config[CONF_FALLBACK]

@callback
def add_to_platform_start(self, hass, platform, parallel_updates):
def add_to_platform_start(
self,
hass: HomeAssistant,
platform: EntityPlatform,
parallel_updates: asyncio.Semaphore | None,
) -> None:
"""Start adding an entity to a platform."""
# This method is called before first call to async_update.

Expand All @@ -229,9 +258,10 @@ def add_to_platform_start(self, hass, platform, parallel_updates):
self._get_divisor_from_weather_data(hass.states.get(self._entity_id))

@callback
def sensor_state_listener(event):
new_state = event.data["new_state"]
old_state = event.data["old_state"]
def sensor_state_listener(event: Event) -> None:
"""Process input entity state update."""
new_state: State | None = event.data["new_state"]
old_state: State | None = event.data["old_state"]
if (
self._entity_status <= EntityStatus.NO_ATTRIBUTION
or not old_state
Expand All @@ -246,22 +276,7 @@ def sensor_state_listener(event):
async_track_state_change_event(hass, self._entity_id, sensor_state_listener)
)

@property
def name(self):
"""Return name."""
return self._name

@property
def state(self):
"""Return state."""
return self._state

@property
def unit_of_measurement(self):
"""Return unit of measurement."""
return LIGHT_LUX

async def async_update(self):
async def async_update(self) -> None:
"""Update state."""
if (
self._entity_status <= EntityStatus.NO_ATTRIBUTION
Expand All @@ -278,17 +293,17 @@ async def async_update(self):

# Calculate final illuminance.

self._state = round(illuminance / self._sk)
self._attr_native_value = round(illuminance / self._sk)
_LOGGER.debug(
"%s: Updating %s -> %i / %0.1f = %i",
self.name,
self._cond_desc,
round(illuminance),
self._sk,
self._state,
self._attr_native_value,
)

def _get_divisor_from_weather_data(self, entity_state):
def _get_divisor_from_weather_data(self, entity_state: State | None) -> None:
"""Get weather data from input entity."""

# Use fallback unless divisor can be successfully determined from weather data.
Expand All @@ -306,6 +321,7 @@ def _get_divisor_from_weather_data(self, entity_state):
# yet, try to determine them.
if self._entity_status <= EntityStatus.NO_ATTRIBUTION:
if raw_condition:
assert entity_state
try:
float(raw_condition)
self._entity_status = EntityStatus.OK_CLOUD
Expand All @@ -315,7 +331,9 @@ def _get_divisor_from_weather_data(self, entity_state):
self._entity_id,
)
except ValueError:
attribution = entity_state.attributes.get(ATTR_ATTRIBUTION)
attribution = cast(
Union[str, None], entity_state.attributes.get(ATTR_ATTRIBUTION)
)
self._get_mappings(attribution, entity_state.domain)
if self._entity_status == EntityStatus.BAD:
_LOGGER.error(
Expand Down Expand Up @@ -373,6 +391,7 @@ def _get_divisor_from_weather_data(self, entity_state):
return

assert self._entity_status == EntityStatus.OK_CONDITION
assert self._sk_mapping

if self._cd_mapping:
condition = self._cd_mapping.get(raw_condition)
Expand All @@ -387,7 +406,7 @@ def _get_divisor_from_weather_data(self, entity_state):
return
_LOGGER.error("%s: Unexpected current condition: %s", self.name, raw_condition)

def _get_mappings(self, attribution, domain):
def _get_mappings(self, attribution: str | None, domain: str) -> None:
"""Get sk -> conditions mappings."""
if not attribution:
self._entity_status = EntityStatus.NO_ATTRIBUTION
Expand All @@ -403,10 +422,10 @@ def _get_mappings(self, attribution, domain):

self._entity_status = EntityStatus.BAD

def _calculate_illuminance(self, now):
def _calculate_illuminance(self, now: datetime) -> Num:
"""Calculate sunny illuminance."""
if self._mode == MODE_NORMAL:
return _illumiance(self._astral_event("solar_elevation", now))
return _illumiance(cast(Num, self._astral_event("solar_elevation", now)))

sun_factor = self._sun_factor(now)

Expand All @@ -415,27 +434,25 @@ def _calculate_illuminance(self, now):
if sun_factor == 0:
# For historic reasons, return a value of 10.
_LOGGER.debug("%s: Updating -> 10", self.name)
self._state = 10
self._attr_native_value = 10
raise AbortUpdate

return 10000 * sun_factor

def _astral_event(self, event, date_or_dt):
def _astral_event(self, event: str, date_or_dt: date | datetime) -> Any:
"""Get astral event."""
loc, elev = self.hass.data["illuminance"]
if elev is None:
return getattr(loc, event)(date_or_dt)
loc, elev = cast(tuple[Location, Elevation], self.hass.data[DOMAIN])
return getattr(loc, event)(date_or_dt, observer_elevation=elev)

def _sun_factor(self, now):
def _sun_factor(self, now: datetime) -> Num:
"""Calculate sun factor."""
now_date = now.date()

if self._sun_data and self._sun_data[0] == now_date:
(sunrise_begin, sunrise_end, sunset_begin, sunset_end) = self._sun_data[1]
else:
sunrise = self._astral_event("sunrise", now_date)
sunset = self._astral_event("sunset", now_date)
sunrise = cast(datetime, self._astral_event("sunrise", now_date))
sunset = cast(datetime, self._astral_event("sunset", now_date))
sunrise_begin = sunrise - _20_MIN
sunrise_end = sunrise + _40_MIN
sunset_begin = sunset - _40_MIN
Expand Down

0 comments on commit 6534141

Please sign in to comment.