diff --git a/HABApp/__init__.py b/HABApp/__init__.py index e367c67f..e85b9cd7 100644 --- a/HABApp/__init__.py +++ b/HABApp/__init__.py @@ -1,4 +1,3 @@ -import HABApp.util import HABApp.config # @@ -9,6 +8,7 @@ import HABApp.rule import HABApp.runtime +import HABApp.util from HABApp.rule import Rule from HABApp.parameters import Parameter diff --git a/HABApp/__version__.py b/HABApp/__version__.py index 2c69a781..c107b415 100644 --- a/HABApp/__version__.py +++ b/HABApp/__version__.py @@ -1 +1 @@ -__VERSION__ = '0.12.4' +__VERSION__ = '0.12.5' diff --git a/HABApp/core/Items.py b/HABApp/core/Items.py index 08797cf7..0fdf7b33 100644 --- a/HABApp/core/Items.py +++ b/HABApp/core/Items.py @@ -40,4 +40,6 @@ def set_item(item: __BaseItem): def pop_item(name: str) -> __BaseItem: - return _ALL_ITEMS.pop(name) + item = _ALL_ITEMS.pop(name) + item._on_item_remove() + return item diff --git a/HABApp/core/items/__init__.py b/HABApp/core/items/__init__.py index 681a1da4..299d7f5c 100644 --- a/HABApp/core/items/__init__.py +++ b/HABApp/core/items/__init__.py @@ -1,3 +1,4 @@ from .base_valueitem import BaseValueItem from .item import Item from .item_color import ColorItem +from .item_aggregation import AggregationItem diff --git a/HABApp/core/items/base_item.py b/HABApp/core/items/base_item.py index a6e7b4cf..c913fea8 100644 --- a/HABApp/core/items/base_item.py +++ b/HABApp/core/items/base_item.py @@ -5,7 +5,7 @@ from pytz import utc import HABApp -from .item_times import BaseWatch, ChangedTime, UpdatedTime +from .base_item_times import BaseWatch, ChangedTime, UpdatedTime class BaseItem: @@ -105,3 +105,8 @@ def listen_event(self, callback: typing.Callable[[typing.Any], typing.Any], """ rule = HABApp.rule.get_parent_rule() return rule.listen_event(self._name, callback=callback, event_type=event_type) + + def _on_item_remove(self): + """This function gets called when the item is removed from the item registry + """ + pass diff --git a/HABApp/core/items/base_item_times.py b/HABApp/core/items/base_item_times.py new file mode 100644 index 00000000..036349e6 --- /dev/null +++ b/HABApp/core/items/base_item_times.py @@ -0,0 +1,59 @@ +import asyncio +import datetime +import typing + +from HABApp.core.wrapper import log_exception +from .base_item_watch import BaseWatch, ItemNoUpdateWatch, ItemNoChangeWatch +from ..const import loop + + +class ItemTimes: + WATCH: typing.Union[typing.Type[ItemNoUpdateWatch], typing.Type[ItemNoChangeWatch]] + + def __init__(self, name: str, dt: datetime.datetime): + self.name: str = name + self.dt: datetime.datetime = dt + self.tasks: typing.List[BaseWatch] = [] + + def set(self, dt: datetime.datetime, events=True): + self.dt = dt + if not self.tasks: + return + + if events: + asyncio.run_coroutine_threadsafe(self.schedule_events(), loop) + return None + + def add_watch(self, secs: typing.Union[int, float]) -> BaseWatch: + assert secs > 0, secs + + # don't add the watch two times + for t in self.tasks: + if t._secs == secs: + return t + w = self.WATCH(self.name, secs) + self.tasks.append(w) + return w + + @log_exception + async def schedule_events(self): + clean = False + for t in self.tasks: + if t._secs <= 0: + clean = True + else: + # Schedule the new task, todo: rename to asyncio.create_task once we go py3.7 only + asyncio.ensure_future(t._schedule_event()) + + # remove canceled tasks + if clean: + self.tasks = [t for t in self.tasks if t._secs > 0] + return None + + +class UpdatedTime(ItemTimes): + WATCH = ItemNoUpdateWatch + + +class ChangedTime(ItemTimes): + WATCH = ItemNoChangeWatch diff --git a/HABApp/core/items/item_times.py b/HABApp/core/items/base_item_watch.py similarity index 52% rename from HABApp/core/items/item_times.py rename to HABApp/core/items/base_item_watch.py index cbfaf85e..c783a657 100644 --- a/HABApp/core/items/item_times.py +++ b/HABApp/core/items/base_item_watch.py @@ -1,9 +1,7 @@ import asyncio -import datetime import typing import HABApp -from HABApp.core.wrapper import log_exception from ..const import loop from ..events import ItemNoChangeEvent, ItemNoUpdateEvent @@ -56,55 +54,3 @@ class ItemNoUpdateWatch(BaseWatch): class ItemNoChangeWatch(BaseWatch): EVENT = ItemNoChangeEvent - - -class ItemTimes: - WATCH: typing.Union[typing.Type[ItemNoUpdateWatch], typing.Type[ItemNoChangeWatch]] - - def __init__(self, name: str, dt: datetime.datetime): - self.name: str = name - self.dt: datetime.datetime = dt - self.tasks: typing.List[BaseWatch] = [] - - def set(self, dt: datetime.datetime, events=True): - self.dt = dt - if not self.tasks: - return - - if events: - asyncio.run_coroutine_threadsafe(self.schedule_events(), loop) - return None - - def add_watch(self, secs: typing.Union[int, float]) -> BaseWatch: - assert secs > 0, secs - - # don't add the watch two times - for t in self.tasks: - if t._secs == secs: - return t - w = self.WATCH(self.name, secs) - self.tasks.append(w) - return w - - @log_exception - async def schedule_events(self): - clean = False - for t in self.tasks: - if t._secs <= 0: - clean = True - else: - # Schedule the new task, todo: rename to asyncio.create_task once we go py3.7 only - asyncio.ensure_future(t._schedule_event()) - - # remove canceled tasks - if clean: - self.tasks = [t for t in self.tasks if t._secs > 0] - return None - - -class UpdatedTime(ItemTimes): - WATCH = ItemNoUpdateWatch - - -class ChangedTime(ItemTimes): - WATCH = ItemNoChangeWatch diff --git a/HABApp/core/items/item_aggregation.py b/HABApp/core/items/item_aggregation.py new file mode 100644 index 00000000..dc959cc8 --- /dev/null +++ b/HABApp/core/items/item_aggregation.py @@ -0,0 +1,122 @@ +import asyncio +import collections +import time +import typing + +import HABApp +from . import BaseValueItem +from ..wrapper import ignore_exception + + +class AggregationItem(BaseValueItem): + + @classmethod + def get_create_item(cls, name: str): + """Creates a new AggregationItem in HABApp and returns it or returns the already existing one with the given name + + :param name: item name + :return: item + """ + assert isinstance(name, str), type(name) + + try: + item = HABApp.core.Items.get_item(name) + except HABApp.core.Items.ItemNotFoundException: + item = cls(name) + HABApp.core.Items.set_item(item) + + assert isinstance(item, cls), f'{cls} != {type(item)}' + return item + + def __init__(self, name: str): + super().__init__(name) + self.__period: float = 0 + self.__aggregation_func: typing.Callable[[typing.Iterable], typing.Any] = lambda x: None + + self._ts: typing.Deque[float] = collections.deque() + self._vals: typing.Deque[typing.Any] = collections.deque() + + self.__listener: typing.Optional[HABApp.core.EventBusListener] = None + + def aggregation_func(self, func: typing.Callable[[typing.Iterable], typing.Any]) -> 'AggregationItem': + """Set the function which will be used to aggregate all values. E.g. ``min`` or ``max`` + + :param func: The function which takes an iterator an returns an aggregated value. + Important: the function must be **non blocking**! + """ + self.__aggregation_func = func + return self + + def aggregation_period(self, period: typing.Union[float, int]) -> 'AggregationItem': + """Set the period in which the items will be aggregated + + :param period: period in seconds + """ + assert period > 0, period + self.__period = period + return self + + def aggregation_source(self, source: typing.Union[BaseValueItem, str]) -> 'AggregationItem': + """Set the source item which changes will be aggregated + + :param item_or_name: name or Item obj + """ + # If we already have one we cancel it + if self.__listener is not None: + self.__listener.cancel() + self.__listener = None + + self.__listener = HABApp.core.EventBusListener( + topic=source.name if isinstance(source, HABApp.core.items.BaseValueItem) else source, + callback=HABApp.core.WrappedFunction(self._add_value, name=f'{self.name}.add_value'), + event_type=HABApp.core.events.ValueChangeEvent + ) + HABApp.core.EventBus.add_listener(self.__listener) + return self + + def _on_item_remove(self): + if self.__listener is not None: + self.__listener.cancel() + self.__listener = None + + async def __force_update(self): + start = time.time() + await asyncio.sleep(self.__period) + sleep = time.time() - start + + # we need to sleep minimum the period, otherwise the value doesn't fall out of the interval + # sometimes asyncio.sleep returns a little bit too early - this is what gets prevented here + if sleep < self.__period: + await asyncio.sleep(self.__period - sleep) + + self._aggregate() + + async def _add_value(self, event: 'HABApp.core.events.ValueChangeEvent'): + self._ts.append(time.time()) + self._vals.append(event.value) + + # do another update when the value has fallen ouf of the period + asyncio.ensure_future(self.__force_update()) + + self._aggregate() + return None + + @ignore_exception + def _aggregate(self): + # first remove entries which are too old + now = time.time() + while True: + ct = len(self._ts) + if ct <= 1: + break + + # we keep one item from before the period because its value is valid into the period + if (now - self._ts[1]) <= self.__period: + break + + self._ts.popleft() + self._vals.popleft() + + # old entries are removed -> now do the aggregation + val = self.__aggregation_func(self._vals) + self.post_value(val) diff --git a/HABApp/core/wrappedfunction.py b/HABApp/core/wrappedfunction.py index 4abd969f..280a16b6 100644 --- a/HABApp/core/wrappedfunction.py +++ b/HABApp/core/wrappedfunction.py @@ -14,6 +14,7 @@ import HABApp + default_logger = logging.getLogger('HABApp.Worker') @@ -44,7 +45,7 @@ def run(self, *args, **kwargs): if self.is_async: # schedule run async, we need to pass the event loop because we can create an async WrappedFunction # from a worker thread (if we have a mixture between async and non-async)! - asyncio.run_coroutine_threadsafe(self.__async_run(*args, **kwargs), loop=WrappedFunction._EVENT_LOOP) + asyncio.run_coroutine_threadsafe(self.async_run(*args, **kwargs), loop=WrappedFunction._EVENT_LOOP) else: self.__time_submitted = time.time() WrappedFunction._WORKERS.submit(self.__run, *args, **kwargs) @@ -71,7 +72,7 @@ def __format_traceback(self, e: Exception, *args, **kwargs): ) ) - async def __async_run(self, *args, **kwargs): + async def async_run(self, *args, **kwargs): try: await self._func(*args, **kwargs) except Exception as e: diff --git a/HABApp/core/wrapper.py b/HABApp/core/wrapper.py index b1b6d4ce..05b54591 100644 --- a/HABApp/core/wrapper.py +++ b/HABApp/core/wrapper.py @@ -92,11 +92,27 @@ def __init__(self, logger: typing.Optional[Logger] = None, log_level: int = logg self.log_level = log_level self.ignore_exception: bool = ignore_exception + self.raised_exception = False + + self.proc_tb: typing.Optional[typing.Callable[[list], list]] = None + def __enter__(self): - pass + self.raised_exception = False def __exit__(self, exc_type, exc_val, exc_tb): - tb = traceback.format_exc() + # no exception -> we exit gracefully + if exc_type is None and exc_val is None: + return True + + self.raised_exception = True + + tb = traceback.format_exception(exc_type, exc_val, exc_tb) + # there is an inconsistent use of newlines and array entries so we normalize it + tb = '\n'.join(map(lambda x: x.strip(' \n'), tb)) + tb = tb.splitlines() + # possibility to reprocess tb + if self.proc_tb is not None: + tb = self.proc_tb(tb) # try to get the parent function name try: @@ -106,15 +122,15 @@ def __exit__(self, exc_type, exc_val, exc_tb): # log error if self.log is not None: - self.log.log(self.log_level, f'Error {exc_val} in {f_name}:') - for l in tb.splitlines(): + self.log.log(self.log_level, f'Error "{exc_val}" in {f_name}:') + for l in tb: self.log.log(self.log_level, l) # send Error to internal event bus so we can reprocess it and notify the user HABApp.core.EventBus.post_event( HABApp.core.const.topics.WARNINGS if self.log_level == logging.WARNING else HABApp.core.const.topics.ERRORS, HABApp.core.events.habapp_events.HABAppError( - func_name=f_name, exception=exc_val, traceback=tb + func_name=f_name, exception=exc_val, traceback='\n'.join(tb) ) ) return self.ignore_exception diff --git a/HABApp/rule_manager/rule_file.py b/HABApp/rule_manager/rule_file.py index 4667282c..299782ef 100644 --- a/HABApp/rule_manager/rule_file.py +++ b/HABApp/rule_manager/rule_file.py @@ -1,12 +1,10 @@ import collections import logging import runpy -import traceback import typing from pathlib import Path -if typing.TYPE_CHECKING: - import HABApp +import HABApp log = logging.getLogger(f'HABApp.Rules') @@ -56,10 +54,19 @@ def unload(self): log.debug(f'File {self.path} successfully unloaded!') return None + def __process_tc(self, tb: list): + tb = tb[-5:] + tb.insert(0, f"Could not load {self.path}!") + return [l.replace('', self.path.name) for l in tb] + def load(self): created_rules = [] - try: + + ign = HABApp.core.wrapper.ExceptionToHABApp(logger=log) + ign.proc_tb = self.__process_tc + + with ign: # It seems like python 3.8 doesn't allow path like objects any more: # https://github.com/spacemanspiff2007/HABApp/issues/111 runpy.run_path(str(self.path), run_name=str(self.path), init_globals={ @@ -67,10 +74,14 @@ def load(self): '__HABAPP__RULE_FILE__': self, '__HABAPP__RULES': created_rules, }) - except Exception as e: - log.error(f"Could not load {self.path}: {e}!") - for l in traceback.format_exc().splitlines()[-5:]: - log.error(l) + + if ign.raised_exception: + # unload all rule instances which might have already been created otherwise they might + # still listen to events and do stuff + for rule in created_rules: + with ign: + rule._unload() + return None len_found = len(created_rules) if not len_found: diff --git a/HABApp/rule_manager/rule_manager.py b/HABApp/rule_manager/rule_manager.py index ff2700fc..6c6abbfe 100644 --- a/HABApp/rule_manager/rule_manager.py +++ b/HABApp/rule_manager/rule_manager.py @@ -185,7 +185,7 @@ def request_file_load(self, event: HABApp.core.events.habapp_events.RequestFileL self.files[path_str] = file = RuleFile(self, path) file.load() except Exception: - log.error(f"Could not (fully) load {path}!") + log.error(f"Could not load {path}!") for l in traceback.format_exc().splitlines(): log.error(l) return None diff --git a/HABApp/util/counter_item.py b/HABApp/util/counter_item.py index ceeecf12..884faa12 100644 --- a/HABApp/util/counter_item.py +++ b/HABApp/util/counter_item.py @@ -1,8 +1,9 @@ -from HABApp.core.items import Item from threading import Lock +import HABApp -class CounterItem(Item): + +class CounterItem(HABApp.core.items.Item): """Implements a simple thread safe counter""" # todo: Max Value and events when counter is 0 or has max value diff --git a/_doc/advanced_usage.rst b/_doc/advanced_usage.rst index 44ba0104..f99684b7 100644 --- a/_doc/advanced_usage.rst +++ b/_doc/advanced_usage.rst @@ -40,3 +40,33 @@ an own notifier in case there are errors (e.g. Pushover). .. autoclass:: HABApp.core.events.habapp_events.HABAppError :members: + +AggregationItem +------------------------------ +The aggregation item is an item which takes the values of another item as an input. +It then allows to process these values and generate an aggregated output based on it. +The item makes implementing time logic like "Has it been dark for the last hour?" or +"Was there frost during the last six hours?" really easy. +And since it is just like a normal item triggering on changes etc. is possible, too. + +.. execute_code:: + :hide_output: + + from HABApp.core.items import AggregationItem + my_agg = AggregationItem('MyAggregationItem') + + # Connect the source item with the aggregation item + my_agg.aggregation_source('MyInputItem') + + # Aggregate all changes in the last hour + my_agg.aggregation_period(3600) + + # Use max as an aggregation function + my_agg.aggregation_func = max + +The value of ``my_add`` in the example will now always be the maximum of ``MyInputItem`` in the last two hours. +It will automatically update and always reflect the latest changes of ``MyInputItem``. + + +.. autoclass:: HABApp.core.items.AggregationItem + :members: \ No newline at end of file diff --git a/_doc/rule.rst b/_doc/rule.rst index a16d1c9c..ba1e697f 100644 --- a/_doc/rule.rst +++ b/_doc/rule.rst @@ -14,7 +14,7 @@ Rule Interacting with items ------------------------------ -Items are like variables. They have a name and a state (which can be anything). +Items are like variables. They have a name and a value (which can be anything). Items from openhab use the item name from openhab and get created when HABApp successfully connects to openhab or when the openhab configuration changes. Items from MQTT use the topic as item name and get created as soon as a message gets processed. diff --git a/tests/test_core/test_items/test_item_aggregation.py b/tests/test_core/test_items/test_item_aggregation.py new file mode 100644 index 00000000..62c38a2d --- /dev/null +++ b/tests/test_core/test_items/test_item_aggregation.py @@ -0,0 +1,58 @@ +import asyncio + +import pytest + +import HABApp + + +@pytest.yield_fixture() +def event_loop(): + HABApp.core.WrappedFunction._EVENT_LOOP = HABApp.core.const.loop + yield HABApp.core.const.loop + + +@pytest.mark.asyncio +async def test_aggregation_item(): + agg = HABApp.core.items.AggregationItem.get_create_item('MyAggregation') + src = HABApp.core.items.Item.get_create_item('MySource') + + INTERVAL = 0.2 + + agg.aggregation_period(INTERVAL * 6) + agg.aggregation_source(src) + agg.aggregation_func(max) + + async def post_val(t, v): + await asyncio.sleep(t) + src.post_value(v) + + + asyncio.ensure_future(post_val(1 * INTERVAL, 1)) + asyncio.ensure_future(post_val(2 * INTERVAL, 3)) + asyncio.ensure_future(post_val(3 * INTERVAL, 5)) + asyncio.ensure_future(post_val(4 * INTERVAL, 4)) + asyncio.ensure_future(post_val(5 * INTERVAL, 2)) + + await asyncio.sleep(INTERVAL + INTERVAL / 2) + assert agg.value == 1 + + await asyncio.sleep(INTERVAL) + assert agg.value == 3 + + await asyncio.sleep(INTERVAL) + assert agg.value == 5 + + await asyncio.sleep(INTERVAL * 6) # 0.6 because the value reaches into the interval! + assert agg.value == 5 + + await asyncio.sleep(INTERVAL) + assert agg.value == 4 + + await asyncio.sleep(INTERVAL) + assert agg.value == 2 + + await asyncio.sleep(INTERVAL) + assert agg.value == 2 + + await asyncio.sleep(INTERVAL) + assert agg.value == 2 diff --git a/tests/test_core/test_wrapper.py b/tests/test_core/test_wrapper.py new file mode 100644 index 00000000..bf20686e --- /dev/null +++ b/tests/test_core/test_wrapper.py @@ -0,0 +1,45 @@ +import logging +from unittest.mock import MagicMock + +import pytest + +import HABApp +from HABApp.core.wrapper import ExceptionToHABApp + +log = logging.getLogger('WrapperTest') + + +@pytest.fixture +def p_mock(): + post_event = HABApp.core.EventBus.post_event + HABApp.core.EventBus.post_event = m = MagicMock() + yield m + HABApp.core.EventBus.post_event = post_event + + +def test_error_catch(p_mock): + + p_mock.assert_not_called() + + with ExceptionToHABApp(log, logging.WARNING): + pass + p_mock.assert_not_called() + + with ExceptionToHABApp(log, logging.WARNING): + 1 / 0 + p_mock.assert_called_once() + print(p_mock.call_args[0][0] == 'HABApp.Warnings') + + +def test_error_level(p_mock): + with ExceptionToHABApp(log, logging.WARNING): + 1 / 0 + p_mock.assert_called_once() + assert p_mock.call_args[0][0] == HABApp.core.const.topics.WARNINGS + + p_mock.reset_mock() + + with ExceptionToHABApp(log): + 1 / 0 + p_mock.assert_called_once() + assert p_mock.call_args[0][0] == HABApp.core.const.topics.ERRORS