From 942c5234beca19c82c05651db37f964e2ba8ceb5 Mon Sep 17 00:00:00 2001 From: Alex Ruddick Date: Mon, 12 Aug 2024 16:37:16 -0500 Subject: [PATCH] WIP --- clickplc/driver.py | 76 +++++++++++++++++++++++++++++++++++ clickplc/tests/test_driver.py | 8 ++++ 2 files changed, 84 insertions(+) diff --git a/clickplc/driver.py b/clickplc/driver.py index 6553923..2817c5e 100644 --- a/clickplc/driver.py +++ b/clickplc/driver.py @@ -33,10 +33,13 @@ class ClickPLC(AsyncioModbusClient): 'c': 'bool', # (C)ontrol relay 't': 'bool', # (T)imer 'ct': 'bool', # (C)oun(t)er + 'sc': 'bool', # (S)ystem (C)ontrol Bit 'ds': 'int16', # (D)ata register (s)ingle 'dd': 'int32', # (D)ata register, (d)ouble 'dh': 'int16', # (D)ata register, (h)ex 'df': 'float', # (D)ata register (f)loating point + 'xd': None, # Input register + 'yd': None, # Output register 'td': 'int16', # (T)imer register 'ctd': 'int32', # (C)oun(t)er Current values, (d)ouble 'sd': 'int16', # (S)ystem (D)ata register, single @@ -326,6 +329,39 @@ async def _get_ct(self, start: int, end: int | None) -> dict | bool: coils = await self.read_coils(start_coil, count) return {f'ct{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} + async def _get_sc(self, start: int, end: int | None) -> dict | bool: + """Read SC addresses. + + SC entries start at 61441 (61442 in the Click software's 1-indexed + notation). This continues* for 1000 bits, ending at 62439. + + Note that some SC entries are read-only (Discrete Input), and others are + read-write (Coils). These are unfortunately mixed together. For example: + SC49 - di - 61489 - function code 02 + SC50 - coil - 61490 - function code 01 + SC51 - coil - 61490 - function code 01 + + The response always returns a full byte of data. If you request + a number of addresses not divisible by 8, it will have extra data. The + extra data here is discarded before returning. + """ + if start < 1 or start > 1000: + raise ValueError('SC start address must be 1-1000.') + + start_coil = 61441 + start - 1 + if end is None: + count = 1 + else: + if end <= start or end > 1000: + raise ValueError('SC end address must be >start and <=1000.') + end_coil = 62439 + end - 1 + count = end_coil - start_coil + 1 + + coils = await self.read_coils(start_coil, count) + if count == 1: + return coils.bits[0] + return {f'sc{(start + i)}': bit for i, bit in enumerate(coils.bits) if i < count} + async def _get_ds(self, start: int, end: int | None) -> dict | int: """Read DS registers. Called by `get`. @@ -559,6 +595,46 @@ async def _set_c(self, start: int, data: list[bool] | bool): else: await self.write_coil(coil, data) + async def _set_sc(self, start: int, data: list[bool] | bool): + """Set SC addresses. Called by `set`. + + For more information on the quirks of SC coils, read the `_get_sc` + docstring. + """ + raise NotImplementedError + if start < 1 or start > 1000: + raise ValueError('SC start address must be 1-1000.') + writeable = ( # only these are writeable + 50, # _PLC_Mode_Change_to_STOP + 51, # _Watchdog_Timer_Reset + 53, # _RTC_Date_Change + 55, # _RTC_Time_Change + 60, # _BT_Disable_Pairing (Plus only?) + 61, # _BT_Activate_Pairing (Plus only?) + 65, # _SD_Eject + 66, # _SD_Delete_All + 67, # _SD_Copy_System + 75, # _WLAN_Reset (Plus only?) + 76, # _Sub_CPU_Reset, + 120, # _Network_Time_Request + 121, # _Network_Time_DST + ) + coil = 1441 + start - 1 + raise ValueError(coil) + + if isinstance(data, list): + if len(data) > (1000 - start + 1): + raise ValueError('Data list longer than available addresses.') + for offset, value in enumerate(data): + if start + offset not in writeable: + raise ValueError(f'sd{start + offset} is not writeable') + await self.write_coil(coil, value) + else: + if start not in writeable: + raise ValueError(f'sd{start} is not writeable') + await self.write_coil(coil, data) + + async def _set_df(self, start: int, data: list[float] | float): """Set DF registers. Called by `set`. diff --git a/clickplc/tests/test_driver.py b/clickplc/tests/test_driver.py index c36aa3d..7b072dc 100644 --- a/clickplc/tests/test_driver.py +++ b/clickplc/tests/test_driver.py @@ -231,6 +231,14 @@ async def test_ct_error_handling(plc_driver): with pytest.raises(ValueError, match=r'CT end address must be >start and <=250.'): await plc_driver.get('ct1-ct251') +@pytest.mark.asyncio(loop_scope='session') +async def test_sc_error_handling(plc_driver): + """Ensure errors are handled for invalid requests of sc registers.""" + with pytest.raises(ValueError, match=r'SC start address must be 1-1000.'): + await plc_driver.get('sc1001') + with pytest.raises(ValueError, match=r'SC end address must be >start and <=1000.'): + await plc_driver.get('sc1-sc1001') + @pytest.mark.asyncio(loop_scope='session') async def test_dh_error_handling(plc_driver): """Ensure errors are handled for invalid requests of df registers."""