Skip to content

Commit

Permalink
0.9.0 Release.
Browse files Browse the repository at this point in the history
  • Loading branch information
Adminiuga authored Jul 23, 2019
2 parents 7df8062 + 82bb1d8 commit 21af65c
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 25 deletions.
4 changes: 2 additions & 2 deletions bellows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
MAJOR_VERSION = 0
MINOR_VERSION = 8
PATCH_VERSION = '2'
MINOR_VERSION = 9
PATCH_VERSION = '0'
__short_version__ = '{}.{}'.format(MAJOR_VERSION, MINOR_VERSION)
__version__ = '{}.{}'.format(__short_version__, PATCH_VERSION)
98 changes: 98 additions & 0 deletions bellows/thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import asyncio
import logging

import sys

import functools
from concurrent.futures import ThreadPoolExecutor

LOGGER = logging.getLogger(__name__)


class EventLoopThread:
''' Run a parallel event loop in a separate thread '''
def __init__(self):
self.loop = None
self.thread_complete = None

def run_coroutine_threadsafe(self, coroutine):
current_loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
return asyncio.wrap_future(future, loop=current_loop)

def _thread_main(self, init_task):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

try:
self.loop.run_until_complete(init_task)
self.loop.run_forever()
finally:
self.loop.close()
self.loop = None

async def start(self):
current_loop = asyncio.get_event_loop()
if self.loop is not None and not self.loop.is_closed():
return

executor_opts = {'max_workers': 1}
if sys.version_info[:2] >= (3, 6):
executor_opts['thread_name_prefix'] = __name__
executor = ThreadPoolExecutor(**executor_opts)

thread_started_future = current_loop.create_future()

async def init_task():
current_loop.call_soon_threadsafe(thread_started_future.set_result, None)

# Use current loop so current loop has a reference to the long-running thread as one of its tasks
thread_complete = current_loop.run_in_executor(executor, self._thread_main, init_task())
self.thread_complete = thread_complete
current_loop.call_soon(executor.shutdown, False)
await thread_started_future
return thread_complete

def force_stop(self):
if self.loop is not None:
self.loop.call_soon_threadsafe(self.loop.stop)


class ThreadsafeProxy:
''' Proxy class which enforces threadsafe non-blocking calls
This class can be used to wrap an object to ensure any calls
using that object's methods are done on a particular event loop
'''
def __init__(self, obj, obj_loop):
self._obj = obj
self._obj_loop = obj_loop

def __getattr__(self, name):
func = getattr(self._obj, name)
if not callable(func):
raise TypeError("Can only use ThreadsafeProxy with callable attributes: {}.{}".format(
self._obj.__class__.__name__, name))

def func_wrapper(*args, **kwargs):
loop = self._obj_loop
curr_loop = asyncio.get_event_loop()
call = functools.partial(func, *args, **kwargs)
if loop == curr_loop:
return call()
if loop.is_closed():
# Disconnected
LOGGER.warning("Attempted to use a closed event loop")
return
if asyncio.iscoroutinefunction(func):
future = asyncio.run_coroutine_threadsafe(call(), loop)
return asyncio.wrap_future(future, loop=curr_loop)
else:
def check_result_wrapper():
result = call()
if result is not None:
raise TypeError("ThreadsafeProxy can only wrap functions with no return value \
\nUse an async method to return values: {}.{}".format(
self._obj.__class__.__name__, name))

loop.call_soon_threadsafe(check_result_wrapper)
return func_wrapper
40 changes: 29 additions & 11 deletions bellows/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import serial
import serial_asyncio

from bellows.thread import EventLoopThread, ThreadsafeProxy

import bellows.types as t

LOGGER = logging.getLogger(__name__)
Expand All @@ -27,7 +29,7 @@ class Gateway(asyncio.Protocol):
class Terminator:
pass

def __init__(self, application, connected_future=None):
def __init__(self, application, connected_future=None, connection_done_future=None):
self._send_seq = 0
self._rec_seq = 0
self._buffer = b''
Expand All @@ -36,6 +38,7 @@ def __init__(self, application, connected_future=None):
self._connected_future = connected_future
self._sendq = asyncio.Queue()
self._pending = (-1, None)
self._connection_done_future = connection_done_future

def connection_made(self, transport):
"""Callback when the uart is connected"""
Expand Down Expand Up @@ -173,20 +176,22 @@ def _reset_cleanup(self, future):

def connection_lost(self, exc):
"""Port was closed unexpectedly."""
if self._connection_done_future:
self._connection_done_future.set_result(exc)
if exc is None:
LOGGER.debug("Closed serial connection")
return

LOGGER.error("Lost serial connection: %s", exc)
self._application.connection_lost(exc)

def reset(self):
async def reset(self):
"""Send a reset frame and init internal state."""
LOGGER.debug("Resetting ASH")
if self._reset_future is not None:
LOGGER.error(("received new reset request while an existing "
"one is in progress"))
return self._reset_future
return await self._reset_future

self._send_seq = 0
self._rec_seq = 0
Expand All @@ -197,10 +202,10 @@ def reset(self):
self._pending[1].set_result(True)
self._pending = (-1, None)

self._reset_future = asyncio.Future()
self._reset_future = asyncio.get_event_loop().create_future()
self._reset_future.add_done_callback(self._reset_cleanup)
self.write(self._rst_frame())
return asyncio.wait_for(self._reset_future, timeout=RESET_TIMEOUT)
return await asyncio.wait_for(self._reset_future, timeout=RESET_TIMEOUT)

async def _send_task(self):
"""Send queue handler"""
Expand All @@ -212,7 +217,7 @@ async def _send_task(self):
success = False
rxmit = 0
while not success:
self._pending = (seq, asyncio.Future())
self._pending = (seq, asyncio.get_event_loop().create_future())
self.write(self._data_frame(data, seq, rxmit))
rxmit = 1
success = await self._pending[1]
Expand Down Expand Up @@ -305,12 +310,12 @@ def _unstuff(self, s):
return out


async def connect(port, baudrate, application, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
async def _connect(port, baudrate, application):
loop = asyncio.get_event_loop()

connection_future = asyncio.Future()
protocol = Gateway(application, connection_future)
connection_future = loop.create_future()
connection_done_future = loop.create_future()
protocol = Gateway(application, connection_future, connection_done_future)

transport, protocol = await serial_asyncio.create_serial_connection(
loop,
Expand All @@ -324,4 +329,17 @@ async def connect(port, baudrate, application, loop=None):

await connection_future

thread_safe_protocol = ThreadsafeProxy(protocol, loop)
return thread_safe_protocol, connection_done_future


async def connect(port, baudrate, application, use_thread=True):
if use_thread:
application = ThreadsafeProxy(application, asyncio.get_event_loop())
thread = EventLoopThread()
await thread.start()
protocol, connection_done = await thread.run_coroutine_threadsafe(_connect(port, baudrate, application))
connection_done.add_done_callback(lambda _: thread.force_stop())
else:
protocol, _ = await _connect(port, baudrate, application)
return protocol
2 changes: 1 addition & 1 deletion tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ async def mocksend(method, nwk, aps_frame, seq, data):
return [returnvals.pop(0)]

def mock_get_device(*args, **kwargs):
dev = Device(app, mock.sentinel.ieee, mock.sentinel.nwk)
dev = Device(app, mock.sentinel.ieee, 0xaa55)
dev.node_desc = mock.MagicMock()
dev.node_desc.is_end_device = is_an_end_dev
return dev
Expand Down
Loading

0 comments on commit 21af65c

Please sign in to comment.