Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added context manager for Device ('with'-support) #108

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ List devices
Device class
------------

.. autoclass:: hid.device
.. autoclass:: hid.Device
:members:
:undoc-members:
56 changes: 27 additions & 29 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import hid

for device_dict in hid.enumerate():
keys = list(device_dict.keys())
keys.sort()
RobertoRoos marked this conversation as resolved.
Show resolved Hide resolved
for key in keys:
print("%s : %s" % (key, device_dict[key]))
print()
Expand All @@ -22,34 +21,33 @@ for device_dict in hid.enumerate():
try:
print("Opening the device")

h = hid.device()
h.open(0x534C, 0x0001) # TREZOR VendorID/ProductID

print("Manufacturer: %s" % h.get_manufacturer_string())
print("Product: %s" % h.get_product_string())
print("Serial No: %s" % h.get_serial_number_string())

# enable non-blocking mode
h.set_nonblocking(1)

# write some data to the device
print("Write the data")
h.write([0, 63, 35, 35] + [0] * 61)

# wait
time.sleep(0.05)

# read back the answer
print("Read the data")
while True:
d = h.read(64)
if d:
print(d)
else:
break

print("Closing the device")
h.close()
h = hid.Device()
with h.open(0x534C, 0x0001): # TREZOR VendorID/ProductID

print("Manufacturer: %s" % h.get_manufacturer_string())
print("Product: %s" % h.get_product_string())
print("Serial No: %s" % h.get_serial_number_string())

# enable non-blocking mode
h.set_nonblocking(1)

# write some data to the device
print("Write the data")
h.write([0, 63, 35, 35] + [0] * 61)

# wait
time.sleep(0.05)

# read back the answer
print("Read the data")
while True:
d = h.read(64)
if d:
print(d)
else:
break

print("Closing the device")

except IOError as ex:
print(ex)
Expand Down
97 changes: 75 additions & 22 deletions hid.pyx
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
import sys
import warnings
from chid cimport *
from libc.stddef cimport wchar_t, size_t
from cpython.unicode cimport PyUnicode_FromUnicode


cdef extern from "ctype.h":
int wcslen(wchar_t*)


cdef extern from "stdlib.h":
void free(void* ptr)
void* malloc(size_t size)


cdef extern from *:
object PyUnicode_FromWideChar(const wchar_t *w, Py_ssize_t size)
Py_ssize_t PyUnicode_AsWideChar(object unicode, wchar_t *w, Py_ssize_t size)


cdef object U(wchar_t *wcs):
if wcs == NULL:
return ''
cdef int n = wcslen(wcs)
return PyUnicode_FromWideChar(wcs, n)


def enumerate(int vendor_id=0, int product_id=0):
"""Return a list of discovered HID devices.

Expand Down Expand Up @@ -65,10 +71,22 @@ def enumerate(int vendor_id=0, int product_id=0):
hid_free_enumeration(info)
return res

cdef class device:

cdef class Device:
"""Device class.

A device instance can be used to read from and write to a HID device.

Use it like:

.. code-block:: text

h = hid.device()
with h.open(0x0000, 0x0000):
h.write(...)

The context manager will take care of closing the device again, regardless of
exceptions.
"""
cdef hid_device *_c_hid

Expand All @@ -82,6 +100,8 @@ cdef class device:
:param serial_number:
:type serial_number: unicode, optional
:raises IOError:
:raises MemoryError:
:raises ValueError:
"""
cdef wchar_t * cserial_number = NULL
cdef int serial_len
Expand All @@ -94,14 +114,16 @@ cdef class device:
raise MemoryError()
result = PyUnicode_AsWideChar(serial_number, cserial_number, serial_len)
if result == -1:
raise ValueError("invalid serial number string")
raise ValueError("Invalid serial number string")
cserial_number[serial_len] = 0 # Must explicitly null-terminate
self._c_hid = hid_open(vendor_id, product_id, cserial_number)
finally:
if cserial_number != NULL:
free(cserial_number)
if self._c_hid == NULL:
raise IOError('open failed')
raise IOError("Failed to open device")

return self

def open_path(self, bytes path):
"""Open connection by path.
Expand All @@ -113,7 +135,9 @@ cdef class device:
cdef char* cbuff = path
self._c_hid = hid_open_path(cbuff)
if self._c_hid == NULL:
raise IOError('open failed')
raise IOError("Failed to open device")
RobertoRoos marked this conversation as resolved.
Show resolved Hide resolved

return self

def close(self):
"""Close connection.
Expand All @@ -124,6 +148,28 @@ cdef class device:
hid_close(self._c_hid)
self._c_hid = NULL

def __enter__(self):
"""Opening statement for `with ...` clause.

Either `open...()` method must be called in the same line!

The `with ...` context manager will make sure the object is opened and closed
(regardless of error status).

Specify either `path` or the other values. See :func:`~hid.device.open` and
:func:`~hid.open_path` for usages.
"""
if self._c_hid == NULL:
raise IOError('Device not open, make sure to call it like: '
RobertoRoos marked this conversation as resolved.
Show resolved Hide resolved
'`with device.open(): ...`')

def __exit__(self, exc_type, exc_val, exc_tb):
"""Closing statement for `with ...` clause.

This makes a try-finally clause in the user code unnecessary
"""
self.close()

def write(self, buff):
"""Accept a list of integers (0-255) and send them to the device.

Expand All @@ -133,7 +179,7 @@ cdef class device:
:rtype: int
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
# convert to bytes
if sys.version_info < (3, 0):
buff = ''.join(map(chr, buff))
Expand All @@ -156,7 +202,7 @@ cdef class device:
:rtype: int
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
return hid_set_nonblocking(self._c_hid, v)

def read(self, int max_length, int timeout_ms=0):
Expand All @@ -170,7 +216,7 @@ cdef class device:
:rtype: List[int]
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
cdef unsigned char lbuff[16]
cdef unsigned char* cbuff
cdef size_t c_max_length = max_length
Expand All @@ -188,7 +234,7 @@ cdef class device:
with nogil:
n = hid_read(c_hid, cbuff, c_max_length)
if n is -1:
raise IOError('read error')
raise IOError("Read error")
res = []
for i in range(n):
res.append(cbuff[i])
Expand All @@ -206,11 +252,11 @@ cdef class device:
:raises IOError:
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
cdef wchar_t buff[255]
cdef int r = hid_get_manufacturer_string(self._c_hid, buff, 255)
if r < 0:
raise IOError('get manufacturer string error')
raise IOError("Read error while getting manufacturer string")
return U(buff)


Expand All @@ -223,11 +269,11 @@ cdef class device:
:raises IOError:
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
cdef wchar_t buff[255]
cdef int r = hid_get_product_string(self._c_hid, buff, 255)
if r < 0:
raise IOError('get product string error')
raise IOError("Read error while getting product string")
return U(buff)

def get_serial_number_string(self):
Expand All @@ -239,11 +285,11 @@ cdef class device:
:raises IOError:
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
cdef wchar_t buff[255]
cdef int r = hid_get_serial_number_string(self._c_hid, buff, 255)
if r < 0:
raise IOError('get serial number string error')
raise IOError("Read error while getting serial number string")
return U(buff)

def get_indexed_string(self, index):
Expand All @@ -255,12 +301,12 @@ cdef class device:
:raises IOError:
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
cdef wchar_t buff[255]
cdef unsigned char c_index = index
cdef int r = hid_get_indexed_string(self._c_hid, c_index, buff, 255)
if r < 0:
raise IOError('get indexed string error')
raise IOError("Read error while getting indexed string")
return U(buff)

def send_feature_report(self, buff):
Expand All @@ -272,7 +318,7 @@ cdef class device:
:rtype: int
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
# convert to bytes
if sys.version_info < (3, 0):
buff = ''.join(map(chr, buff))
Expand All @@ -299,7 +345,7 @@ cdef class device:
:raises IOError:
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
cdef hid_device * c_hid = self._c_hid
cdef unsigned char lbuff[16]
cdef unsigned char* cbuff
Expand All @@ -315,7 +361,7 @@ cdef class device:
n = hid_get_feature_report(c_hid, cbuff, c_max_length)
res = []
if n < 0:
raise IOError('read error')
raise IOError("Read error")
for i in range(n):
res.append(cbuff[i])
finally:
Expand All @@ -336,7 +382,7 @@ cdef class device:
:raises IOError:
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
cdef hid_device * c_hid = self._c_hid
cdef unsigned char lbuff[16]
cdef unsigned char* cbuff
Expand All @@ -352,7 +398,7 @@ cdef class device:
n = hid_get_input_report(c_hid, cbuff, c_max_length)
res = []
if n < 0:
raise IOError('read error')
raise IOError("Read error")
for i in range(n):
res.append(cbuff[i])
finally:
Expand All @@ -369,5 +415,12 @@ cdef class device:
:raises IOError:
"""
if self._c_hid == NULL:
raise ValueError('not open')
raise ValueError("Device is not open")
return U(<wchar_t*>hid_error(self._c_hid))


cdef class device(Device):
"""Old alias for the `Device` class, use the new version instead!"""

def __cinit__(self):
warnings.warn("Use the `Device` class instead", DeprecationWarning)
RobertoRoos marked this conversation as resolved.
Show resolved Hide resolved
Loading