Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the ProactorEventLoop APIs on Windows #91

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 66 additions & 17 deletions serial_asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
"""\
Support asyncio with serial ports.

Posix platforms only, Python 3.5+ only.

Windows event loops can not wait for serial ports with the current
implementation. It should be possible to get that working though.
Python 3.5+ only.
"""
import asyncio
import os
Expand All @@ -30,6 +27,21 @@
__version__ = '0.6'


class SerialSocketAdaptor:
"""Wraps a Windows serial port file handle for use by the socket API.

The ProactorEventLoop sock_recv() and sock_sendall() methods, when called
with something that isn't actually a socket, will dispatch to the ReadFile
and WriteFile APIs we want to use. We just need to pass a file-like object,
which in this case means something implementing a fileno() method.
"""
def __init__(self, handle):
self.handle = handle

def fileno(self):
return self.handle


class SerialTransport(asyncio.Transport):
"""An asyncio transport model of a serial communication channel.

Expand Down Expand Up @@ -61,6 +73,10 @@ def __init__(self, loop, protocol, serial_instance):
self._poll_wait_time = 0.0005
self._max_out_waiting = 1024

# TODO: Ask pyserial to implement fileno() on win32 Serial objects
if os.name == "nt":
self._serial_handle = SerialSocketAdaptor(self._serial._port_handle)

# XXX how to support url handlers too

# Asynchronous I/O requires non-blocking devices
Expand Down Expand Up @@ -285,33 +301,66 @@ def _write_ready(self):
assert self._has_writer

if os.name == "nt":
def _poll_read(self):
if self._has_reader and not self._closing:
async def _await_read(self):
while not self._closing:
try:
self._has_reader = self._loop.call_later(self._poll_wait_time, self._poll_read)
if self.serial.in_waiting:
self._read_ready()
self.serial.timeout = None
first = await self._loop.sock_recv(self._serial_handle, 1)
if not first:
continue
self.serial.timeout = 0
rest = await self._loop.sock_recv(self._serial_handle,
self._max_read_size - 1)
self._protocol.data_received(first + rest)
except serial.SerialException as exc:
self._fatal_error(exc, 'Fatal write error on serial transport')
self._fatal_error(exc, 'Fatal read error on serial transport')

def _cleanup_reader(self, reader):
self._has_reader = False
if not reader.cancelled():
try:
reader.result()
except Exception:
raise

def _ensure_reader(self):
if not self._has_reader and not self._closing:
self._has_reader = self._loop.call_later(self._poll_wait_time, self._poll_read)
self._has_reader = self._loop.create_task(self._await_read())
self._has_reader.add_done_callback(self._cleanup_reader)

def _remove_reader(self):
if self._has_reader:
self._has_reader.cancel()
self._has_reader = False

def _poll_write(self):
if self._has_writer and not self._closing:
self._has_writer = self._loop.call_later(self._poll_wait_time, self._poll_write)
if self.serial.out_waiting < self._max_out_waiting:
self._write_ready()
async def _await_write(self):
while self._write_buffer and not self._closing:
data = b"".join(self._write_buffer)
self._write_buffer.clear()
try:
await self._loop.sock_sendall(self._serial_handle, data)
except BlockingIOError:
self._write_buffer.append(data)
except asyncio.CancelledError:
self._write_buffer.append(data)
raise
else:
self._maybe_resume_protocol()
if self._closing and self._flushed():
self._close()

def _cleanup_writer(self, writer):
self._has_writer = False
if not writer.cancelled():
try:
writer.result()
except Exception:
raise

def _ensure_writer(self):
if not self._has_writer and not self._closing:
self._has_writer = self._loop.call_soon(self._poll_write)
self._has_writer = self._loop.create_task(self._await_write())
self._has_writer.add_done_callback(self._cleanup_writer)

def _remove_writer(self):
if self._has_writer:
Expand Down
2 changes: 0 additions & 2 deletions test/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

"""

import os
import unittest
import asyncio

Expand All @@ -28,7 +27,6 @@
PORT = 'socket://%s:%s' % (HOST, _PORT)


@unittest.skipIf(os.name != 'posix', "asyncio not supported on platform")
class Test_asyncio(unittest.TestCase):
"""Test asyncio related functionality"""

Expand Down