Skip to content

Commit

Permalink
Rework transport to leave re-connect to user (#150)
Browse files Browse the repository at this point in the history
* Rework transport to leave re-connect to user

* Make compatible with older python

* Don't signal connection loss when asked to close

* Avoid blocking teardown

* Split connect from constructions

* Allow setting a connection timeout

* Raise timeout exception on timeout

* Convert more errors

* Correct linting

* Adjust some linting issues

* More flake fixes

* More lint fixes

* Inject constructed transport

* Change init order

* Use decorators to hide low level exceptions

* Make compatible with legacy python

* Fix lint

* Suppress errors on close
  • Loading branch information
elupus authored Feb 25, 2024
1 parent d5122c3 commit 17c8feb
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 90 deletions.
228 changes: 149 additions & 79 deletions RFXtrx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
# pylint: disable=R0903, invalid-name
# pylint: disable= too-many-lines

import functools
import glob
import socket
import threading
import time
import logging
from contextlib import suppress

from time import sleep

Expand Down Expand Up @@ -674,6 +675,21 @@ def __str__(self):
return "{0} device=[{1}]".format(
type(self), self.device)


class ConnectionEvent(RFXtrxEvent):
""" Connection event """
def __init__(self):
super().__init__(None)


class ConnectionLost(ConnectionEvent):
""" Connection lost """


class ConnectionDone(ConnectionEvent):
""" Connection lost """


###############################################################################
# DummySerial class
###############################################################################
Expand Down Expand Up @@ -730,10 +746,19 @@ def close(self):
self._close_event.set()


###############################################################################
# RFXtrxTransportError class
###############################################################################


class RFXtrxTransportError(Exception):
""" Connection error """

###############################################################################
# RFXtrxTransport class
###############################################################################


class RFXtrxTransport:
""" Abstract superclass for all transport mechanisms """

Expand All @@ -757,12 +782,40 @@ def parse(data):
return obj
return None

def connect(self, timeout=None):
""" connect to device """

def reset(self):
""" reset the rfxtrx device """

def close(self):
""" close connection to rfxtrx device """

def receive_blocking(self):
""" Wait until a packet is received and return with an RFXtrxEvent """

def send(self, data):
""" Send the given packet """


def transport_errors(message):
""" Decorator to wrap low level errors in known error. """
def _errors(func):
@functools.wraps(func)
def __errors(instance: RFXtrxTransport, *args, **kargs):
try:
return func(instance, *args, **kargs)
except (socket.error,
serial.SerialException,
OSError) as exception:
_LOGGER.debug("%s failed: %s", message,
str(exception), exc_info=True)
raise RFXtrxTransportError(
"{0} failed: {1}".format(message, exception)
) from exception
return __errors
return _errors

###############################################################################
# PySerialTransport class
###############################################################################
Expand All @@ -774,45 +827,39 @@ class PySerialTransport(RFXtrxTransport):
def __init__(self, port):
self.port = port
self.serial = None
self._run_event = threading.Event()
self._run_event.set()
self.connect()

def connect(self):
@transport_errors("connect")
def connect(self, timeout=None):
""" Open a serial connexion """
try:
self.serial = serial.Serial(self.port, 38400, timeout=0.1)
except serial.serialutil.SerialException:
self.serial = serial.Serial(self.port, 38400)
except serial.SerialException:
port = glob.glob('/dev/serial/by-id/usb-RFXCOM_*-port0')
if len(port) < 1:
return
self.serial = serial.Serial(port[0], 38400, timeout=0.1)
raise
_LOGGER.debug("Attempting connection by name %s", port)
self.serial = serial.Serial(port[0], 38400)

@transport_errors("receive")
def receive_blocking(self):
return self._receive_packet()

def _receive_packet(self):
""" Wait until a packet is received and return with an RFXtrxEvent """
data = None
while self._run_event.is_set():
try:
data = self.serial.read()
except TypeError:
continue
except serial.serialutil.SerialException:
try:
self.connect()
except serial.serialutil.SerialException:
time.sleep(5)
continue
if not data or data == '\x00':
continue
pkt = bytearray(data)
data = self.serial.read(pkt[0])
data = self.serial.read()
if data == '\x00':
return None
pkt = bytearray(data)
while len(pkt) < pkt[0]+1:
data = self.serial.read(pkt[0]+1 - len(pkt))
pkt.extend(bytearray(data))
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)

@transport_errors("send")
def send(self, data):
""" Send the given packet """
if isinstance(data, bytearray):
Expand All @@ -827,17 +874,19 @@ def send(self, data):
)
self.serial.write(pkt)

@transport_errors("reset")
def reset(self):
""" Reset the RFXtrx """
self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
self.send(b'\x0D\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00')
sleep(0.3) # Should work with 0.05, but not for me
self.serial.flushInput()

@transport_errors("close")
def close(self):
""" close connection to rfxtrx device """
self._run_event.clear()
self.serial.close()

with suppress(serial.SerialException):
self.serial.close()

###############################################################################
# PyNetworkTransport class
Expand All @@ -850,44 +899,40 @@ class PyNetworkTransport(RFXtrxTransport):
def __init__(self, hostport):
self.hostport = hostport # must be a (host, port) tuple
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._run_event = threading.Event()
self._run_event.set()
self.connect()

def connect(self):
@transport_errors("connect")
def connect(self, timeout=None):
""" Open a socket connection """
try:
self.sock.connect(self.hostport)
_LOGGER.info("Connected to network socket")
except socket.error:
_LOGGER.error('Failed to create socket, check host port config')
# This may throw exception for use by caller:
self.sock.connect(self.hostport)
self.sock.settimeout(timeout)
self.sock.connect(self.hostport)
self.sock.settimeout(None)
_LOGGER.debug("Connected to network socket")

@transport_errors("receive")
def receive_blocking(self):
""" Wait until a packet is received and return with an RFXtrxEvent """
data = None
while self._run_event.is_set():
try:
data = self.sock.recv(1)
except socket.error:
try:
self.connect()
except socket.error:
time.sleep(5)
continue
if not data or data == '\x00':
continue
pkt = bytearray(data)
while len(pkt) < pkt[0]+1:
data = self.sock.recv(pkt[0]+1 - len(pkt))
pkt.extend(bytearray(data))
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)
return self._receive_packet()

def _receive_packet(self):
""" Wait until a packet is received and return with an RFXtrxEvent """
data = self.sock.recv(1)
if data == b'':
raise RFXtrxTransportError("Server was shutdown")
if data == '\x00':
return None
pkt = bytearray(data)
while len(pkt) < pkt[0]+1:
data = self.sock.recv(pkt[0]+1 - len(pkt))
if data == b'':
raise RFXtrxTransportError("Server was shutdown")
pkt.extend(bytearray(data))
_LOGGER.debug(
"Recv: %s",
" ".join("0x{0:02x}".format(x) for x in pkt)
)
return self.parse(pkt)

@transport_errors("send")
def send(self, data):
""" Send the given packet """
if isinstance(data, bytearray):
Expand All @@ -902,17 +947,23 @@ def send(self, data):
)
self.sock.send(pkt)

@transport_errors("reset")
def reset(self):
""" Reset the RFXtrx """
self.send(b'\x0D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
sleep(0.3)
self.sock.sendall(b'')

try:
self.send(b'\x0D\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00')
sleep(0.3)
self.sock.sendall(b'')
except socket.error as exception:
raise RFXtrxTransportError(
"Reset failed: {0}".format(exception)) from exception

@transport_errors("close")
def close(self):
""" close connection to rfxtrx device """
self._run_event.clear()
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
with suppress(socket.error):
self.sock.close()


class DummyTransport(RFXtrxTransport):
Expand All @@ -922,6 +973,9 @@ def __init__(self, device=""):
self.device = device
self._close_event = threading.Event()

def connect(self, timeout=None):
pass

def receive(self, data=None):
""" Emulate a receive by parsing the given data """
if data is None:
Expand Down Expand Up @@ -958,6 +1012,8 @@ class DummyTransport2(PySerialTransport):
def __init__(self, device=""):
self.serial = _dummySerial(device, 38400, timeout=0.1)
self._run_event = threading.Event()

def connect(self, timeout=None):
self._run_event.set()


Expand All @@ -966,22 +1022,34 @@ class Connect:
Has methods for sensors.
"""
# pylint: disable=too-many-instance-attributes, too-many-arguments
def __init__(self, device, event_callback=None,
transport_protocol=PySerialTransport,
def __init__(self, transport, event_callback=None,
modes=None):
self._run_event = threading.Event()
self._sensors = {}
self._status = None
self._modes = modes
self._thread = threading.Thread(target=self._connect, daemon=True)
self.event_callback = event_callback
self.transport: RFXtrxTransport = transport

self.transport = transport_protocol(device)
self._thread = threading.Thread(target=self._connect)
self._thread.daemon = True
def connect(self, timeout=None):
"""Connect to device."""
self.transport.connect(timeout)
self._thread.start()
self._run_event.wait()
if not self._run_event.wait(timeout):
self.close_connection()
raise TimeoutError()

def _connect(self):
try:
self._connect_internal()
except RFXtrxTransportError as exception:
_LOGGER.info("Connection lost %s", exception)
finally:
if self.event_callback and self._run_event.is_set():
self.event_callback(ConnectionLost())

def _connect_internal(self):
"""Connect """
self.transport.reset()
self._status = self.send_get_status()
Expand All @@ -998,6 +1066,8 @@ def _connect(self):
self.send_start()

self._run_event.set()
if self.event_callback:
self.event_callback(ConnectionDone())

while self._run_event.is_set():
event = self.transport.receive_blocking()
Expand Down
4 changes: 2 additions & 2 deletions RFXtrx/lowlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2278,7 +2278,7 @@ def load_receive(self, data):
(data[10] << 8) + data[11])
self.prodwatthours = ((data[12] * pow(2, 24)) + (data[13] << 16) +
(data[14] << 8) + data[15])
self.tarif_num = (data[16] & 0x0f)
self.tarif_num = data[16] & 0x0f
self.voltage = data[17] + 200
self.currentwatt = (data[18] << 8) + data[19]
self.state_byte = data[20]
Expand Down Expand Up @@ -2378,7 +2378,7 @@ def set_transmit(self, subtype, seqnbr, id1, id2, sound):
self.id2 = id2
self.sound = sound
self.rssi = 0
self.rssi_byte = (self.rssi << 4)
self.rssi_byte = self.rssi << 4
self.data = bytearray([self.packetlength, self.packettype,
self.subtype, self.seqnbr,
self.id1, self.id2, self.sound,
Expand Down
3 changes: 2 additions & 1 deletion examples/receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def main():

modes_list = sys.argv[2].split() if len(sys.argv) > 2 else None
print ("modes: ", modes_list)
core = RFXtrx.Core(rfxcom_device, print_callback, modes=modes_list)
core = RFXtrx.Connect(RFXtrx.PySerialTransport(rfxcom_device), print_callback, modes=modes_list)
core.connect()

print (core)
while True:
Expand Down
1 change: 1 addition & 0 deletions examples/send.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from time import sleep

transport = PySerialTransport('/dev/cu.usbserial-05VN8GHS')
transport.connect()
transport.reset()

while True:
Expand Down
Loading

0 comments on commit 17c8feb

Please sign in to comment.