Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
alexrudd2 committed Dec 1, 2024
1 parent 7847b6e commit 942c523
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
76 changes: 76 additions & 0 deletions clickplc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ class ClickPLC(AsyncioModbusClient):
'c': 'bool', # (C)ontrol relay
't': 'bool', # (T)imer
'ct': 'bool', # (C)oun(t)er
'sc': 'bool', # (S)ystem (C)ontrol Bit
'ds': 'int16', # (D)ata register (s)ingle
'dd': 'int32', # (D)ata register, (d)ouble
'dh': 'int16', # (D)ata register, (h)ex
'df': 'float', # (D)ata register (f)loating point
'xd': None, # Input register
'yd': None, # Output register
'td': 'int16', # (T)imer register
'ctd': 'int32', # (C)oun(t)er Current values, (d)ouble
'sd': 'int16', # (S)ystem (D)ata register, single
Expand Down Expand Up @@ -326,6 +329,39 @@ async def _get_ct(self, start: int, end: int | None) -> dict | bool:
coils = await self.read_coils(start_coil, count)
return {f'ct{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count}

async def _get_sc(self, start: int, end: int | None) -> dict | bool:
"""Read SC addresses.
SC entries start at 61441 (61442 in the Click software's 1-indexed
notation). This continues* for 1000 bits, ending at 62439.
Note that some SC entries are read-only (Discrete Input), and others are
read-write (Coils). These are unfortunately mixed together. For example:
SC49 - di - 61489 - function code 02
SC50 - coil - 61490 - function code 01
SC51 - coil - 61490 - function code 01
The response always returns a full byte of data. If you request
a number of addresses not divisible by 8, it will have extra data. The
extra data here is discarded before returning.
"""
if start < 1 or start > 1000:
raise ValueError('SC start address must be 1-1000.')

start_coil = 61441 + start - 1
if end is None:
count = 1
else:
if end <= start or end > 1000:
raise ValueError('SC end address must be >start and <=1000.')
end_coil = 62439 + end - 1
count = end_coil - start_coil + 1

coils = await self.read_coils(start_coil, count)
if count == 1:
return coils.bits[0]
return {f'sc{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count}

async def _get_ds(self, start: int, end: int | None) -> dict | int:
"""Read DS registers. Called by `get`.
Expand Down Expand Up @@ -559,6 +595,46 @@ async def _set_c(self, start: int, data: list[bool] | bool):
else:
await self.write_coil(coil, data)

async def _set_sc(self, start: int, data: list[bool] | bool):
"""Set SC addresses. Called by `set`.
For more information on the quirks of SC coils, read the `_get_sc`
docstring.
"""
raise NotImplementedError
if start < 1 or start > 1000:
raise ValueError('SC start address must be 1-1000.')
writeable = ( # only these are writeable
50, # _PLC_Mode_Change_to_STOP
51, # _Watchdog_Timer_Reset
53, # _RTC_Date_Change
55, # _RTC_Time_Change
60, # _BT_Disable_Pairing (Plus only?)
61, # _BT_Activate_Pairing (Plus only?)
65, # _SD_Eject
66, # _SD_Delete_All
67, # _SD_Copy_System
75, # _WLAN_Reset (Plus only?)
76, # _Sub_CPU_Reset,
120, # _Network_Time_Request
121, # _Network_Time_DST
)
coil = 1441 + start - 1
raise ValueError(coil)

if isinstance(data, list):
if len(data) > (1000 - start + 1):
raise ValueError('Data list longer than available addresses.')
for offset, value in enumerate(data):
if start + offset not in writeable:
raise ValueError(f'sd{start + offset} is not writeable')
await self.write_coil(coil, value)
else:
if start not in writeable:
raise ValueError(f'sd{start} is not writeable')
await self.write_coil(coil, data)


async def _set_df(self, start: int, data: list[float] | float):
"""Set DF registers. Called by `set`.
Expand Down
8 changes: 8 additions & 0 deletions clickplc/tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,14 @@ async def test_ct_error_handling(plc_driver):
with pytest.raises(ValueError, match=r'CT end address must be >start and <=250.'):
await plc_driver.get('ct1-ct251')

@pytest.mark.asyncio(loop_scope='session')
async def test_sc_error_handling(plc_driver):
"""Ensure errors are handled for invalid requests of sc registers."""
with pytest.raises(ValueError, match=r'SC start address must be 1-1000.'):
await plc_driver.get('sc1001')
with pytest.raises(ValueError, match=r'SC end address must be >start and <=1000.'):
await plc_driver.get('sc1-sc1001')

@pytest.mark.asyncio(loop_scope='session')
async def test_dh_error_handling(plc_driver):
"""Ensure errors are handled for invalid requests of df registers."""
Expand Down

0 comments on commit 942c523

Please sign in to comment.