This repository has been archived by the owner on Jun 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
repeater.py
175 lines (135 loc) · 5.26 KB
/
repeater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
Abstract base class for a service that keeps repeating the same action.
This is useful for implementing broadcasting or polling services.
Example use:
import asyncio
from datetime import datetime
from brewblox_service.repeater import RepeaterFeature, features, scheduler
class Greeter(RepeaterFeature):
async def prepare(self):
print('Hello, I am starting now')
async def run(self):
await asyncio.sleep(5)
print(datetime.now())
def setup(app):
scheduler.setup(app)
features.add(app, Greeter(app))
"""
import asyncio
from abc import abstractmethod
from typing import Optional
from aiohttp import web
from brewblox_service import (brewblox_logger, features, models, scheduler,
strex)
# Module-level logger shared by everything in this file.
# NOTE(review): `dedupe=True` presumably makes the logger drop repeated identical
# messages; the run() error handler relies on duplicates being filtered.
# Confirm against brewblox_logger.
LOGGER = brewblox_logger(__name__, dedupe=True)
class RepeaterCancelled(Exception):
    """
    This can be raised during either prepare() or run() to permanently cancel execution.

    The repeater logs the cancellation at info level and stops its background task
    without re-raising.
    """
class RepeaterFeature(features.ServiceFeature):
    """Base class for Aiohttp handler classes that implement a background task.

    RepeaterFeature wraps the `prepare()` and `run()` functions in an `asyncio.Task`,
    and handles the boilerplate code for creation and cleanup.

    `prepare()` is called once after the background task is started.
    Afterwards, `run()` is called in a loop until the task is stopped.

    The background task is stopped when either:
    - The service stops, and `shutdown()` is called.
    - `end()` is called manually.
    - `prepare()` raises any exception.
    - `prepare()` or `run()` raise a `RepeaterCancelled` exception.

    Any other exception raised by `run()` is logged, after which `run()` is
    called again. The loop yields to the event loop once after every failed
    iteration, so a `run()` that fails before it awaits anything cannot
    block the service or prevent cancellation.

    The `startup()`, `before_shutdown()`, and `shutdown()` functions
    are inherited from `ServiceFeature`.

    During typical app lifetime, functions are called in this order:
    - `startup()`
    - `prepare()`
    - `run()` [repeated until shutdown]
    - `before_shutdown()`
    - `shutdown()`
    """

    def __init__(self, app: web.Application, autostart: bool = True, **kwargs):
        """
        Args:
            app: The application this feature belongs to.
                 `app['config']` is read for its `debug` flag.
            autostart: If True, `start()` is called from the startup hook.
                       If False, the implementation must call `start()` itself.
            **kwargs: Forwarded unchanged to the `ServiceFeature` constructor.
        """
        super().__init__(app, **kwargs)

        config: models.BaseServiceConfig = app['config']
        self._autostart: bool = autostart
        # Handle of the running background task; None when not started or ended.
        self._task: Optional[asyncio.Task] = None
        # Passed as `tb=` to strex() when logging run() errors.
        self._debug: bool = config.debug

    async def _startup(self, app: web.Application):
        """
        Overrides the private ServiceFeature startup hook.
        This avoids a gotcha where subclasses have to call `super().startup(app)`
        for RepeaterFeature, but not for ServiceFeature.
        """
        await super()._startup(app)
        if self._autostart:
            await self.start()

    async def _shutdown(self, app: web.Application):
        """
        Overrides the private ServiceFeature shutdown hook.
        This avoids a gotcha where subclasses have to call `super().shutdown(app)`
        for RepeaterFeature, but not for ServiceFeature.
        """
        # Stop the background task first, then run the parent shutdown hook.
        await self.end()
        await super()._shutdown(app)

    async def __repeat(self):
        """
        Body of the background task: call `prepare()` once, then `run()` in a loop.

        Returns when `prepare()` or `run()` raises `RepeaterCancelled`.
        Re-raises `asyncio.CancelledError`, and any other exception from `prepare()`.
        Other exceptions from `run()` are logged and the loop continues.
        """
        last_ok = True

        try:
            LOGGER.debug(f'--> prepare {self}')
            await self.prepare()
            LOGGER.debug(f'<-- prepare {self}')

        except asyncio.CancelledError:
            raise

        except RepeaterCancelled:
            LOGGER.info(f'{self} cancelled during prepare().')
            return

        except Exception as ex:
            LOGGER.error(f'{self} error during prepare(): {strex(ex)}')
            # Bare raise re-raises the active exception with its traceback intact.
            raise

        while True:
            try:
                await self.run()

                # Only announce recovery once after one or more failed runs.
                if not last_ok:
                    LOGGER.info(f'{self} resumed OK')
                    last_ok = True

            except asyncio.CancelledError:
                raise

            except RepeaterCancelled:
                LOGGER.info(f'{self} cancelled during run().')
                return

            except Exception as ex:
                # Duplicate log messages are automatically filtered
                LOGGER.error(f'{self} error during run(): {strex(ex, tb=self._debug)}')
                last_ok = False

            if not last_ok:
                # run() may have failed before reaching any suspending await
                # (e.g. before its rate-limiting sleep). Yield once so the event
                # loop keeps running and this task stays cancellable.
                await asyncio.sleep(0)

    @property
    def active(self) -> bool:
        """
        Indicates whether the background task is currently running: not finished, not cancelled.
        """
        return bool(self._task and not self._task.done())

    async def start(self) -> None:
        """
        Initializes the background task.
        By default called during startup, but implementations can disable this by using
        `autostart=False` in the constructor.

        Will cancel the previous task if called repeatedly.
        """
        await self.end()
        self._task = await scheduler.create(self.app, self.__repeat())

    async def end(self) -> None:
        """
        Ends the background task.
        Always called during shutdown, but can be safely called earlier.
        """
        await scheduler.cancel(self.app, self._task)
        self._task = None

    async def prepare(self) -> None:
        """
        One-time preparation.
        Any errors raised here will cause the repeater to abort.
        Raise RepeaterCancelled to abort without error logs.
        """

    @abstractmethod
    async def run(self) -> None:
        """
        This function will be called on repeat.
        It is advisable to implement rate limiting through the use of
        `await asyncio.sleep(interval)`
        """