Skip to content

Commit

Permalink
AvlEventHandler async fix, remove_avl_event_handler added
Browse files Browse the repository at this point in the history
  • Loading branch information
o-murphy committed Aug 13, 2024
1 parent fc740d1 commit a98d3f0
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 23 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ wialon.start_polling(token=TOKEN, logout_finally=False)
### Critical requests execution
Some requests to services like `Render`, `Reports`, `Messages` requires blocking other requests to be executed together per single session.
* Use the `@wialon.lock_session` decorator to block async loop till your operation done
* You can apply `@wialon.session_lock` also for handlers, order of decorators doesn't matter
* You can apply `@wialon.session_lock` also for handlers
* You can use `@wialon.session_lock` inside the methods when [inheriting Wialon](#extending-aio-wialon)

```python
Expand Down
32 changes: 27 additions & 5 deletions aiowialon/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, scheme: Literal['https', 'http'] = 'https',
self.__base_url = f"{scheme}://{host}:{port if port else 443 if scheme == 'https' else 80}"
self.__base_api_url: str = urljoin(self.__base_url, 'wialon/ajax.html')

self.__handlers: Dict[str, AvlEventHandler] = {}
self.__avl_event_handlers: Dict[str, AvlEventHandler] = {}
self.__on_session_open: Optional[LoginCallback] = None
self.__on_session_close: Optional[LogoutCallback] = None

Expand Down Expand Up @@ -135,20 +135,37 @@ def avl_event_handler(self, filter_: Optional[AvlEventFilter] = None) -> Callabl

def wrapper(callback: AvlEventCallback):
handler = AvlEventHandler(callback, filter_)
if callback.__name__ in self.__handlers:
raise KeyError(f"Detected EventHandler duplicate {callback.__name__}")
self.__handlers[callback.__name__] = handler
if callback.__name__ in self.__avl_event_handlers:
raise KeyError(f"Detected AVLEventHandler duplicate {callback.__name__}")
self.__avl_event_handlers[callback.__name__] = handler
return callback

return wrapper

def remove_avl_event_handler(self, callback: [str, AvlEventCallback]):
"""Manually remove AVL event handler"""

if callable(callback):
callback = callback.__name__
if isinstance(callback, str):
handler = self.__avl_event_handlers.pop(callback)
asyncio.create_task(handler.cleanup())
else:
warnings.warn(f"Can't remove AVL event handler: {callback}")

async def _process_event_handlers(self, event: AvlEvent) -> None:
"""Process event handlers for current item"""

for _, handler in self.__handlers.items():
for _, handler in self.__avl_event_handlers.items():
if await handler(event):
break

async def _cleanup_event_handlers(self) -> None:
"""Cleanup event handlers"""

for _, handler in self.__avl_event_handlers.items():
await handler.cleanup()

async def start_polling(self, timeout: Union[int, float] = 2,
logout_finally: bool = True,
**params: Unpack[LoginParams]) -> None:
Expand Down Expand Up @@ -183,6 +200,7 @@ async def stop_polling(self, logout: bool = False) -> None:
if self.__polling_task:
logger.info("Stopping polling task")
self.__polling_task.cancel()
await self._cleanup_event_handlers()
with suppress(asyncio.CancelledError):
await self.__polling_task
self.__polling_task = None
Expand Down Expand Up @@ -242,13 +260,16 @@ async def _polling(self, timeout: Union[int, float] = 2) -> None:

async def avl_evts(self) -> Any:
"""Call avl_event request"""

await self._session_lock_event.wait()
if self.__polling_task:
warnings.warn("Polling running, don't recommended to call 'avl_evts' manually",
WialonWarning)
url = urljoin(self.__base_url, 'avl_evts')
params = {
'sid': self._sid
}
print('evt')
return await self.request('avl_evts', url, params)

# pylint: disable=unused-argument
Expand Down Expand Up @@ -295,6 +316,7 @@ async def multipart(self, call: Coroutine[Any, Any, Any],
"""Adapter method for 'Wialon.call()' coroutine
to send multipart data to server"""

await self._session_lock_event.wait()
if not self._is_call(call) or not call.cr_frame:
raise TypeError("Coroutine is not an Wialon.call")
coroutine_locals = call.cr_frame.f_locals
Expand Down
57 changes: 42 additions & 15 deletions aiowialon/types/avl_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ class AvlEventHandler:
def __init__(self,
callback: AvlEventCallback,
filter_: Optional[AvlEventFilter] = None) -> None:
self._callback: AvlEventCallback = callback
self._filter: Optional[AvlEventFilter] = filter_
self._callback: AvlEventCallback
self._filter: Optional[AvlEventFilter]
self._tasks: List[asyncio.Task] = []

self.callback = callback
self.filter = filter_
Expand All @@ -87,29 +88,55 @@ async def __call__(self, event: AvlEvent) -> bool:
"""

if not self._filter:
await self.__handle(event)
await self.__process_event(event)
return True
if self._filter is not None:
if self._filter(event):
await self.__handle(event)
await self.__process_event(event)
return True
return False

async def __handle(self, event: AvlEvent) -> None:
async def __process_event(self, event: AvlEvent) -> None:
"""
Executes the callback function with handled AvlEvent,
suppressing the exceptions if callback raises it to prevent app braiking
suppressing the exceptions if callback raises it to prevent app breaking.
"""

logger.info("Got AVL event %s", event)
try:
with suppress(asyncio.CancelledError):
await self._callback(event)
except asyncio.CancelledError:
logger.info("%s cancelled", self._callback.__name__)
except (WialonError, aiohttp.ClientError) as e:
logger.error("Exception happened on %s", self._callback.__name__)
logger.exception(e)
with suppress(asyncio.CancelledError):
# Wrap the callback with a try-except block to handle exceptions
async def wrapped_callback(event: AvlEvent):
try:
await self._callback(event)
except (WialonError, aiohttp.ClientError) as e:
logger.error("Exception happened in %s", self._callback.__name__)
logger.exception(e)

callback_task = asyncio.create_task(
wrapped_callback(event),
name=f"AvlEventHandler ({len(self._tasks)}): {self._callback.__name__}"
)
self._tasks.append(callback_task)
callback_task.add_done_callback(self.__cleanup_task)

def __cleanup_task(self, task: asyncio.Task):
"""Remove the task from the list once it's done"""

task.cancel()
with suppress(asyncio.CancelledError):
task.__await__()
if task in self._tasks:
self._tasks.remove(task)
logger.debug("Task completed and removed: %s", task.get_name())

async def cleanup(self):
"""cleaning the AvlEventHandler tasks"""

logger.debug("Cleaning up AvlEventHandler: %s, cancelling all tasks",
self._callback.__name__)
for task in self._tasks:
self.__cleanup_task(task)
logger.debug("All handler tasks cancelled")

@property
def callback(self) -> AvlEventCallback:
Expand Down Expand Up @@ -139,7 +166,7 @@ def filter(self, filter_: Optional[AvlEventFilter] = None) -> None:

if filter_ and not callable(filter_):
raise TypeError(f'AvlEventHandler.filter_ must be a type of {AvlEventFilter}')
self.filter_ = filter_
self._filter = filter_


__all__ = (
Expand Down
6 changes: 4 additions & 2 deletions examples/aiolock.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ async def register_avl_events(session_login):
return await wialon.core_update_data_flags(spec=spec)


@wialon.avl_event_handler(lambda event: event.data.i == 734455)
@wialon.avl_event_handler()
@wialon.session_lock
async def unit_event(event: AvlEvent):
print("Handler got event:", event)
for i in range(5):
print("Waiting lock release", i)
print("Waiting lock release", i, "item:", event.data.i)
await asyncio.sleep(1)
# remove handler
wialon.remove_avl_event_handler(unit_event.__name__)


if __name__ == "__main__":
Expand Down

0 comments on commit a98d3f0

Please sign in to comment.