Skip to content

Commit

Permalink
Merge pull request #14 from alexrudd2/txt
Browse files Browse the repository at this point in the history
Add TXT datatype
  • Loading branch information
alexrudd2 authored Jul 2, 2024
2 parents 0c8e92d + 0c9b500 commit a70e300
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
77 changes: 75 additions & 2 deletions clickplc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pydoc
from collections import defaultdict
from string import digits
from typing import Any, ClassVar
from typing import Any, ClassVar, overload

from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
Expand All @@ -39,6 +39,7 @@ class ClickPLC(AsyncioModbusClient):
'td': 'int16', # (T)imer register
'ctd': 'int32', # (C)oun(t)er Current values, (d)ouble
'sd': 'int16', # (S)ystem (D)ata register, single
'txt': 'str', # ASCII Text
}

def __init__(self, address, tag_filepath='', timeout=1):
Expand Down Expand Up @@ -134,7 +135,6 @@ async def set(self, address: str, data):
"""
if address in self.tags:
address = self.tags[address]['id']

if not isinstance(data, list):
data = [data]

Expand Down Expand Up @@ -472,6 +472,46 @@ async def _get_sd(self, start: int, end: int) -> dict | int:
return decoder.decode_16bit_int()
return {f'sd{n}': decoder.decode_16bit_int() for n in range(start, end + 1)}

@overload
async def _get_txt(self, start: int, end: None) -> str: ...
@overload
async def _get_txt(self, start: int, end: int) -> dict[str, str]: ...
async def _get_txt(self, start, end):
"""Read txt registers. Called by `get`.
TXT entries start at Modbus address 36864 (36865 in the Click software's
1-indexed notation). Each TXT entry takes 8 bits - which means a pair of
adjacent TXT entries are packed into a single 16-bit register. For some
strange reason they are packed little-endian, so the registers must be
manually decoded.
"""
if start < 1 or start > 1000:
raise ValueError('TXT must be in [1, 1000]')
if end is not None and (end < 1 or end > 1000):
raise ValueError('TXT end must be in [1, 1000]')

address = 36864 + (start - 1) // 2
if end is None:
registers = await self.read_registers(address, 1)
decoder = BinaryPayloadDecoder.fromRegisters(registers)
if start % 2: # if starting on the second byte of a 16-bit register, discard the MSB
decoder.decode_string()
return decoder.decode_string().decode()

count = 1 + (end - start) // 2 + (start - 1) % 2
registers = await self.read_registers(address, count)
decoder = BinaryPayloadDecoder.fromRegisters(registers) # endian irrelevant; manual decode
r = ''
for _ in range(count):
msb = chr(decoder.decode_8bit_int())
lsb = chr(decoder.decode_8bit_int())
r += lsb + msb
if end % 2: # if ending on the first byte of a 16-bit register, discard the final LSB
r = r[:-1]
if not start % 2:
r = r[1:] # if starting on the last byte of a 16-bit register, discard the first MSB
return {f'txt{start}-txt{end}': r}

async def _set_y(self, start: int, data: list[bool] | bool):
"""Set Y addresses. Called by `set`.
Expand Down Expand Up @@ -631,6 +671,39 @@ def _pack(values: list[int]):
else:
await self.write_register(address, _pack([data]), skip_encode=True)

async def _set_txt(self, start: int, data: str | list[str]):
"""Set TXT registers. Called by `set`.
See _get_txt for more information.
"""
if start < 1 or start > 1000:
raise ValueError('TXT must be in [1, 1000]')
address = 36864 + (start - 1) // 2

def _pack(values: str):
assert len(values) % 2 == 0
builder = BinaryPayloadBuilder() # endianness irrelevant; manual packing
for i in range(0, len(values), 2):
builder.add_8bit_uint(ord(values[i + 1]))
builder.add_8bit_uint(ord(values[i + 0]))
return builder.build()

assert isinstance(data, list)
if len(data) > 1000 - start + 1:
raise ValueError('Data list longer than available addresses.')
string = data[0]

# two 8-bit text addresses are packed into one 16-bit modbus register
# and we can't mask a modbus write (i.e. all 16 bits must be written)
# thus, if changing a single address retrieve and write its 'twin' address back
if len(string) % 2:
if start % 2:
string += await self._get_txt(start=start + 1, end=None)
else:
string = await self._get_txt(start=start - 1, end=None) + string
payload = _pack(string)
await self.write_registers(address, payload, skip_encode=True)

def _load_tags(self, tag_filepath: str) -> dict:
"""Load tags from file path.
Expand Down
17 changes: 17 additions & 0 deletions clickplc/tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,23 @@ async def test_dd_roundtrip(plc_driver):
await plc_driver.set('dd1000', 1000)
assert await plc_driver.get('dd1000') == 1000

@pytest.mark.asyncio(scope='session')
async def test_txt_roundtrip(plc_driver):
"""Confirm texts are read back correctly after being set."""
await plc_driver.set('txt1', 'AB')
await plc_driver.set('txt3', 'CDEF')
await plc_driver.set('txt7', 'G')
expected = {'txt1-txt7': 'ABCDEFG'}
assert expected == await plc_driver.get('txt1-txt7')
expected = {'txt2-txt7': 'BCDEFG'}
assert expected == await plc_driver.get('txt2-txt7')

await plc_driver.set('txt1000', '0')
assert await plc_driver.get('txt1000') == '0'
await plc_driver.set('txt999', '9')
assert await plc_driver.get('txt999') == '9'
assert await plc_driver.get('txt1000') == '0' # ensure txt999 did not clobber it

@pytest.mark.asyncio(scope='session')
async def test_get_error_handling(plc_driver):
"""Confirm the driver gives an error on invalid get() calls."""
Expand Down

0 comments on commit a70e300

Please sign in to comment.