Skip to content

Commit

Permalink
0.12.5 (#116)
Browse files Browse the repository at this point in the history
* Created AggregationItem & small cleanup
* fixed Error None in __cleanup_objs
* reworked loading of rules
* Added some documentation
  • Loading branch information
spacemanspiff2007 authored Mar 18, 2020
1 parent 3d1cab4 commit 46d47bd
Show file tree
Hide file tree
Showing 17 changed files with 374 additions and 77 deletions.
2 changes: 1 addition & 1 deletion HABApp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import HABApp.util

import HABApp.config
#
Expand All @@ -9,6 +8,7 @@
import HABApp.rule
import HABApp.runtime

import HABApp.util
from HABApp.rule import Rule
from HABApp.parameters import Parameter

Expand Down
2 changes: 1 addition & 1 deletion HABApp/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__VERSION__ = '0.12.4'
__VERSION__ = '0.12.5'
4 changes: 3 additions & 1 deletion HABApp/core/Items.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ def set_item(item: __BaseItem):


def pop_item(name: str) -> __BaseItem:
return _ALL_ITEMS.pop(name)
item = _ALL_ITEMS.pop(name)
item._on_item_remove()
return item
1 change: 1 addition & 0 deletions HABApp/core/items/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .base_valueitem import BaseValueItem
from .item import Item
from .item_color import ColorItem
from .item_aggregation import AggregationItem
7 changes: 6 additions & 1 deletion HABApp/core/items/base_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pytz import utc

import HABApp
from .item_times import BaseWatch, ChangedTime, UpdatedTime
from .base_item_times import BaseWatch, ChangedTime, UpdatedTime


class BaseItem:
Expand Down Expand Up @@ -105,3 +105,8 @@ def listen_event(self, callback: typing.Callable[[typing.Any], typing.Any],
"""
rule = HABApp.rule.get_parent_rule()
return rule.listen_event(self._name, callback=callback, event_type=event_type)

def _on_item_remove(self):
"""This function gets called when the item is removed from the item registry
"""
pass
59 changes: 59 additions & 0 deletions HABApp/core/items/base_item_times.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import datetime
import typing

from HABApp.core.wrapper import log_exception
from .base_item_watch import BaseWatch, ItemNoUpdateWatch, ItemNoChangeWatch
from ..const import loop


class ItemTimes:
WATCH: typing.Union[typing.Type[ItemNoUpdateWatch], typing.Type[ItemNoChangeWatch]]

def __init__(self, name: str, dt: datetime.datetime):
self.name: str = name
self.dt: datetime.datetime = dt
self.tasks: typing.List[BaseWatch] = []

def set(self, dt: datetime.datetime, events=True):
self.dt = dt
if not self.tasks:
return

if events:
asyncio.run_coroutine_threadsafe(self.schedule_events(), loop)
return None

def add_watch(self, secs: typing.Union[int, float]) -> BaseWatch:
assert secs > 0, secs

# don't add the watch two times
for t in self.tasks:
if t._secs == secs:
return t
w = self.WATCH(self.name, secs)
self.tasks.append(w)
return w

@log_exception
async def schedule_events(self):
clean = False
for t in self.tasks:
if t._secs <= 0:
clean = True
else:
# Schedule the new task, todo: rename to asyncio.create_task once we go py3.7 only
asyncio.ensure_future(t._schedule_event())

# remove canceled tasks
if clean:
self.tasks = [t for t in self.tasks if t._secs > 0]
return None


class UpdatedTime(ItemTimes):
WATCH = ItemNoUpdateWatch


class ChangedTime(ItemTimes):
WATCH = ItemNoChangeWatch
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import asyncio
import datetime
import typing

import HABApp
from HABApp.core.wrapper import log_exception
from ..const import loop
from ..events import ItemNoChangeEvent, ItemNoUpdateEvent

Expand Down Expand Up @@ -56,55 +54,3 @@ class ItemNoUpdateWatch(BaseWatch):

class ItemNoChangeWatch(BaseWatch):
EVENT = ItemNoChangeEvent


class ItemTimes:
WATCH: typing.Union[typing.Type[ItemNoUpdateWatch], typing.Type[ItemNoChangeWatch]]

def __init__(self, name: str, dt: datetime.datetime):
self.name: str = name
self.dt: datetime.datetime = dt
self.tasks: typing.List[BaseWatch] = []

def set(self, dt: datetime.datetime, events=True):
self.dt = dt
if not self.tasks:
return

if events:
asyncio.run_coroutine_threadsafe(self.schedule_events(), loop)
return None

def add_watch(self, secs: typing.Union[int, float]) -> BaseWatch:
assert secs > 0, secs

# don't add the watch two times
for t in self.tasks:
if t._secs == secs:
return t
w = self.WATCH(self.name, secs)
self.tasks.append(w)
return w

@log_exception
async def schedule_events(self):
clean = False
for t in self.tasks:
if t._secs <= 0:
clean = True
else:
# Schedule the new task, todo: rename to asyncio.create_task once we go py3.7 only
asyncio.ensure_future(t._schedule_event())

# remove canceled tasks
if clean:
self.tasks = [t for t in self.tasks if t._secs > 0]
return None


class UpdatedTime(ItemTimes):
WATCH = ItemNoUpdateWatch


class ChangedTime(ItemTimes):
WATCH = ItemNoChangeWatch
122 changes: 122 additions & 0 deletions HABApp/core/items/item_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import asyncio
import collections
import time
import typing

import HABApp
from . import BaseValueItem
from ..wrapper import ignore_exception


class AggregationItem(BaseValueItem):

@classmethod
def get_create_item(cls, name: str):
"""Creates a new AggregationItem in HABApp and returns it or returns the already existing one with the given name
:param name: item name
:return: item
"""
assert isinstance(name, str), type(name)

try:
item = HABApp.core.Items.get_item(name)
except HABApp.core.Items.ItemNotFoundException:
item = cls(name)
HABApp.core.Items.set_item(item)

assert isinstance(item, cls), f'{cls} != {type(item)}'
return item

def __init__(self, name: str):
super().__init__(name)
self.__period: float = 0
self.__aggregation_func: typing.Callable[[typing.Iterable], typing.Any] = lambda x: None

self._ts: typing.Deque[float] = collections.deque()
self._vals: typing.Deque[typing.Any] = collections.deque()

self.__listener: typing.Optional[HABApp.core.EventBusListener] = None

def aggregation_func(self, func: typing.Callable[[typing.Iterable], typing.Any]) -> 'AggregationItem':
"""Set the function which will be used to aggregate all values. E.g. ``min`` or ``max``
:param func: The function which takes an iterator an returns an aggregated value.
Important: the function must be **non blocking**!
"""
self.__aggregation_func = func
return self

def aggregation_period(self, period: typing.Union[float, int]) -> 'AggregationItem':
"""Set the period in which the items will be aggregated
:param period: period in seconds
"""
assert period > 0, period
self.__period = period
return self

def aggregation_source(self, source: typing.Union[BaseValueItem, str]) -> 'AggregationItem':
"""Set the source item which changes will be aggregated
:param item_or_name: name or Item obj
"""
# If we already have one we cancel it
if self.__listener is not None:
self.__listener.cancel()
self.__listener = None

self.__listener = HABApp.core.EventBusListener(
topic=source.name if isinstance(source, HABApp.core.items.BaseValueItem) else source,
callback=HABApp.core.WrappedFunction(self._add_value, name=f'{self.name}.add_value'),
event_type=HABApp.core.events.ValueChangeEvent
)
HABApp.core.EventBus.add_listener(self.__listener)
return self

def _on_item_remove(self):
if self.__listener is not None:
self.__listener.cancel()
self.__listener = None

async def __force_update(self):
start = time.time()
await asyncio.sleep(self.__period)
sleep = time.time() - start

# we need to sleep minimum the period, otherwise the value doesn't fall out of the interval
# sometimes asyncio.sleep returns a little bit too early - this is what gets prevented here
if sleep < self.__period:
await asyncio.sleep(self.__period - sleep)

self._aggregate()

async def _add_value(self, event: 'HABApp.core.events.ValueChangeEvent'):
self._ts.append(time.time())
self._vals.append(event.value)

# do another update when the value has fallen ouf of the period
asyncio.ensure_future(self.__force_update())

self._aggregate()
return None

@ignore_exception
def _aggregate(self):
# first remove entries which are too old
now = time.time()
while True:
ct = len(self._ts)
if ct <= 1:
break

# we keep one item from before the period because its value is valid into the period
if (now - self._ts[1]) <= self.__period:
break

self._ts.popleft()
self._vals.popleft()

# old entries are removed -> now do the aggregation
val = self.__aggregation_func(self._vals)
self.post_value(val)
5 changes: 3 additions & 2 deletions HABApp/core/wrappedfunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import HABApp


default_logger = logging.getLogger('HABApp.Worker')


Expand Down Expand Up @@ -44,7 +45,7 @@ def run(self, *args, **kwargs):
if self.is_async:
# schedule run async, we need to pass the event loop because we can create an async WrappedFunction
# from a worker thread (if we have a mixture between async and non-async)!
asyncio.run_coroutine_threadsafe(self.__async_run(*args, **kwargs), loop=WrappedFunction._EVENT_LOOP)
asyncio.run_coroutine_threadsafe(self.async_run(*args, **kwargs), loop=WrappedFunction._EVENT_LOOP)
else:
self.__time_submitted = time.time()
WrappedFunction._WORKERS.submit(self.__run, *args, **kwargs)
Expand All @@ -71,7 +72,7 @@ def __format_traceback(self, e: Exception, *args, **kwargs):
)
)

async def __async_run(self, *args, **kwargs):
async def async_run(self, *args, **kwargs):
try:
await self._func(*args, **kwargs)
except Exception as e:
Expand Down
26 changes: 21 additions & 5 deletions HABApp/core/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,27 @@ def __init__(self, logger: typing.Optional[Logger] = None, log_level: int = logg
self.log_level = log_level
self.ignore_exception: bool = ignore_exception

self.raised_exception = False

self.proc_tb: typing.Optional[typing.Callable[[list], list]] = None

def __enter__(self):
pass
self.raised_exception = False

def __exit__(self, exc_type, exc_val, exc_tb):
tb = traceback.format_exc()
# no exception -> we exit gracefully
if exc_type is None and exc_val is None:
return True

self.raised_exception = True

tb = traceback.format_exception(exc_type, exc_val, exc_tb)
# there is an inconsistent use of newlines and array entries so we normalize it
tb = '\n'.join(map(lambda x: x.strip(' \n'), tb))
tb = tb.splitlines()
# possibility to reprocess tb
if self.proc_tb is not None:
tb = self.proc_tb(tb)

# try to get the parent function name
try:
Expand All @@ -106,15 +122,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):

# log error
if self.log is not None:
self.log.log(self.log_level, f'Error {exc_val} in {f_name}:')
for l in tb.splitlines():
self.log.log(self.log_level, f'Error "{exc_val}" in {f_name}:')
for l in tb:
self.log.log(self.log_level, l)

# send Error to internal event bus so we can reprocess it and notify the user
HABApp.core.EventBus.post_event(
HABApp.core.const.topics.WARNINGS if self.log_level == logging.WARNING else HABApp.core.const.topics.ERRORS,
HABApp.core.events.habapp_events.HABAppError(
func_name=f_name, exception=exc_val, traceback=tb
func_name=f_name, exception=exc_val, traceback='\n'.join(tb)
)
)
return self.ignore_exception
Loading

0 comments on commit 46d47bd

Please sign in to comment.