Skip to content

Commit

Permalink
load tags during driver init and return only those tags if get called…
Browse files Browse the repository at this point in the history
… w/o args
  • Loading branch information
JamesJeffryes committed May 19, 2020
1 parent 487dd36 commit 7d97245
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 12 deletions.
17 changes: 11 additions & 6 deletions clickplc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ def command_line():
parser = argparse.ArgumentParser(description="Control a ClickPLC from "
"the command line.")
parser.add_argument('address', help="The IP address of the ClickPLC.")
parser.add_argument('tags_file', default=None,
help="Optional: Path to a tags file for this PLC" )
args = parser.parse_args()

async def get():
async with ClickPLC(args.address) as plc:
d = await plc.get('x001-x816')
d.update(await plc.get('y001-y816'))
d.update(await plc.get('c1-c100'))
d.update(await plc.get('df1-df100'))
d.update(await plc.get('ds1-ds100'))
async with ClickPLC(args.address, args.tags_file) as plc:
if args.tags_file is not None:
d = await plc.get()
else:
d = await plc.get('x001-x816')
d.update(await plc.get('y001-y816'))
d.update(await plc.get('c1-c100'))
d.update(await plc.get('df1-df100'))
d.update(await plc.get('ds1-ds100'))
print(json.dumps(d, indent=4))

loop = asyncio.get_event_loop()
Expand Down
101 changes: 97 additions & 4 deletions clickplc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
Distributed under the GNU General Public License v2
Copyright (C) 2020 NuMat Technologies
"""
import csv
import pydoc
from collections import defaultdict
from string import digits
from typing import Union, List

Expand All @@ -29,14 +31,44 @@ class ClickPLC(AsyncioModbusClient):
'ds': 'int16', # (D)ata register (s)igned int
}

async def get(self, address: str) -> dict:
def __init__(self, address, tag_filepath='', timeout=1):
"""Initialize PLC connection and data structure.
Args:
address: The PLC IP address or DNS name
tag_filepath: Path to the PLC tags file
timeout (optional): Timeout when communicating with PLC. Default 1s.
"""
super().__init__(address, timeout)
self.tags = self._load_tags(tag_filepath)
self.active_addresses = self._get_address_ranges(self.tags)

def get_tags(self) -> dict:
"""Return all tags and associated configuration information.
Use this data for debugging or to provide more detailed
information on user interfaces.
Returns:
A dictionary containing information associated with each tag name.
"""
return self.tags

async def get(self, address: str = None) -> dict:
"""Get variables from the ClickPLC.
Args:
address: ClickPLC address(es) to get. Specify a range with a
hyphen, e.g. 'DF1-DF40'
data: A value or list of values to set.
If driver is loaded with a tags file this can be called without an
address to return all nicknamed addresses in the tags file
>>> plc.get()
{'P-101': 0.0, 'P-102': 0.0 ..., T-101:0.0}
Otherwise one or more internal variable can be requested
>>> plc.get('df1')
0.0
>>> plc.get('df1-df20')
Expand All @@ -47,11 +79,21 @@ async def get(self, address: str) -> dict:
This uses the ClickPLC's internal variable notation, which can be
found in the Address Picker of the ClickPLC software.
"""
if address is None:
if not self.tags:
raise ValueError('An address must be supplied to get if tags were not '
'provided when driver initialized')
results = {}
for category, address in self.active_addresses.items():
results.update(await getattr(self, '_get_' + category)(address['min'], address['max']))
return {nickname: results[tag_info['id'].lower()]
for nickname, tag_info in self.tags.items()}

if '-' in address:
start, end = address.split('-')
else:
start, end = address, None
i = start.index(next(s for s in start if s.isdigit()))
i = next(i for i, s in enumerate(start) if s.isdigit())
category, start_index = start[:i].lower(), int(start[i:])
end_index = None if end is None else int(end[i:])

Expand Down Expand Up @@ -81,7 +123,7 @@ async def set(self, address, data):
if not isinstance(data, list):
data = [data]

i = address.index(next(s for s in address if s.isdigit()))
i = next(i for i, s in enumerate(address) if s.isdigit())
category, index = address[:i].lower(), int(address[i:])
if category not in self.data_types:
raise ValueError(f"{category} currently unsupported.")
Expand Down Expand Up @@ -389,3 +431,54 @@ def _pack(value):
await self.write_registers(address, payload, skip_encode=True)
else:
await self.write_register(address, _pack(data), skip_encode=True)

def _load_tags(self, tag_filepath: str) -> dict:
"""Load tags from file path.
This tag file is optional but is needed to identify the appropriate variable names,
and modbus addresses for tags in use on the PLC.
"""
if not tag_filepath:
return {}
with open(tag_filepath) as csv_file:
csv_data = csv_file.read().splitlines()
csv_data[0] = csv_data[0].lstrip('## ')
parsed = {
row['Nickname']: {
'address': {
'start': int(row['Modbus Address']),
},
'id': row['Address'],
'comment': row['Address Comment'],
'type': self.data_types.get(
row['Address'].rstrip(digits).lower()
),
}
for row in csv.DictReader(csv_data)
if row['Nickname'] and not row['Nickname'].startswith("_")
}
for data in parsed.values():
if not data['comment']:
del data['comment']
if not data['type']:
raise TypeError(
f"{data['id']} is an unsupported data type. Open a "
"github issue at numat/productivity to get it added."
)
sorted_tags = {k: parsed[k] for k in
sorted(parsed, key=lambda k: parsed[k]['address']['start'])}
return sorted_tags

@staticmethod
def _get_address_ranges(tags: dict) -> dict:
"""Parse the loaded tags to determine the range of addresses that must be
queried to return all values"""
address_dict = defaultdict(lambda: {'min': 1, 'max': 1})
for tag_info in tags.values():
i = next(i for i, s in enumerate(tag_info['id']) if s.isdigit())
category, index = tag_info['id'][:i].lower(), int(tag_info['id'][i:])
address_dict[category]['min'] = min(address_dict[category]['min'], index)
address_dict[category]['max'] = max(address_dict[category]['max'], index)
return address_dict

Empty file added clickplc/tests/__init__.py
Empty file.
30 changes: 30 additions & 0 deletions clickplc/tests/plc_tags.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Address,Data Type,Modbus Address,Function Code,Nickname,Initial Value,Retentive,Address Comment
X101,BIT,100033,"FC=02","_IO1_Module_Error",0,No,"On when module is not functioning"
X102,BIT,100034,"FC=02","_IO1_Module_Config",0,No,"On when module is initializing"
X103,BIT,100035,"FC=02","_IO1_CH1_Burnout",0,No,"On when CH1 senses burnout or open circuit"
X104,BIT,100036,"FC=02","_IO1_CH1_Under_Range",0,No,"On when CH1 receives under range input"
X105,BIT,100037,"FC=02","_IO1_CH1_Over_Range",0,No,"On when CH1 receives over range input"
X106,BIT,100038,"FC=02","_IO1_CH2_Burnout",0,No,"On when CH2 senses burnout or open circuit"
X107,BIT,100039,"FC=02","_IO1_CH2_Under_Range",0,No,"On when CH2 receives under range input"
X108,BIT,100040,"FC=02","_IO1_CH2_Over_Range",0,No,"On when CH2 receives over range input"
X109,BIT,100041,"FC=02","_IO1_CH3_Burnout",0,No,"On when CH3 senses burnout or open circuit"
X110,BIT,100042,"FC=02","_IO1_CH3_Under_Range",0,No,"On when CH3 receives under range input"
X111,BIT,100043,"FC=02","_IO1_CH3_Over_Range",0,No,"On when CH3 receives over range input"
X112,BIT,100044,"FC=02","_IO1_CH4_Burnout",0,No,"On when CH4 senses burnout or open circuit"
X113,BIT,100045,"FC=02","_IO1_CH4_Under_Range",0,No,"On when CH4 receives under range input"
X114,BIT,100046,"FC=02","_IO1_CH4_Over_Range",0,No,"On when CH4 receives over range input"
X201,BIT,100065,"FC=02","_IO2_Module_Error",0,No,"On when module is not functioning"
X202,BIT,100066,"FC=02","_IO2_Missing_24V",0,No,"On when missing external 24VDC input"
Y301,BIT,8289,"FC=01,05,15","P_101",0,No,""
Y302,BIT,8290,"FC=01,05,15","P_103",0,No,""
C1,BIT,16385,"FC=01,05,15","P_101_auto",0,No,""
C2,BIT,16386,"FC=01,05,15","P_102_auto",0,No,""
C10,BIT,16394,"FC=01,05,15","VAH_101_OK",0,No,""
C11,BIT,16395,"FC=01,05,15","VAHH_101_OK",0,No,""
C12,BIT,16396,"FC=01,05,15","IO2_Module_OK",0,No,""
C13,BIT,16397,"FC=01,05,15","IO2_24V_OK",0,No,""
DS100,INT,400100,"FC=03,06,16","TIC101_PID_ErrorCode",0,Yes,"PID Error Code"
DF1,FLOAT,428673,"FC=03,06,16","TI_101",0,Yes,""
DF5,FLOAT,428681,"FC=03,06,16","LI_102",0,Yes,""
DF6,FLOAT,428683,"FC=03,06,16","LI_101",0,Yes,""
DF7,FLOAT,428685,"FC=03,06,16","VI_101",0,Yes,""
41 changes: 39 additions & 2 deletions clickplc/test_driver.py → clickplc/tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,41 @@ def plc_driver():
return ClickPLC('fake ip')


@pytest.fixture
def tagged_driver():
return ClickPLC('fake ip', 'tests/plc_tags.csv')

@pytest.fixture
def expected_tags():
return {
'IO2_24V_OK': {'address': {'start': 16397}, 'id': 'C13', 'type': 'bool'},
'IO2_Module_OK': {'address': {'start': 16396}, 'id': 'C12', 'type': 'bool'},
'LI_101': {'address': {'start': 428683}, 'id': 'DF6', 'type': 'float'},
'LI_102': {'address': {'start': 428681}, 'id': 'DF5', 'type': 'float'},
'P_101': {'address': {'start': 8289}, 'id': 'Y301', 'type': 'bool'},
'P_101_auto': {'address': {'start': 16385}, 'id': 'C1', 'type': 'bool'},
'P_102_auto': {'address': {'start': 16386}, 'id': 'C2', 'type': 'bool'},
'P_103': {'address': {'start': 8290}, 'id': 'Y302', 'type': 'bool'},
'TIC101_PID_ErrorCode': {'address': {'start': 400100},
'comment': 'PID Error Code',
'id': 'DS100',
'type': 'int16'},
'TI_101': {'address': {'start': 428673}, 'id': 'DF1', 'type': 'float'},
'VAHH_101_OK': {'address': {'start': 16395}, 'id': 'C11', 'type': 'bool'},
'VAH_101_OK': {'address': {'start': 16394}, 'id': 'C10', 'type': 'bool'},
'VI_101': {'address': {'start': 428685}, 'id': 'DF7', 'type': 'float'}
}


def test_get_tags(tagged_driver, expected_tags):
assert expected_tags == tagged_driver.get_tags()


@pytest.mark.asyncio
async def test_get_tagged_driver(tagged_driver, expected_tags):
assert expected_tags.keys() == (await tagged_driver.get()).keys()


@pytest.mark.asyncio
@pytest.mark.parametrize('prefix', ['x', 'y'])
async def test_bool_roundtrip(plc_driver, prefix):
Expand Down Expand Up @@ -46,11 +81,13 @@ async def test_ds_roundtrip(plc_driver):

@pytest.mark.asyncio
async def test_get_error_handling(plc_driver):
with pytest.raises(ValueError, match='End address must be greater than start address.'):
with pytest.raises(ValueError, match='An address must be supplied'):
await plc_driver.get()
with pytest.raises(ValueError, match='End address must be greater than start address'):
await plc_driver.get('c3-c1')
with pytest.raises(ValueError, match='foo currently unsupported'):
await plc_driver.get('foo1')
with pytest.raises(ValueError, match='Inter-category ranges are unsupported.'):
with pytest.raises(ValueError, match='Inter-category ranges are unsupported'):
await plc_driver.get('c1-x3')


Expand Down

0 comments on commit 7d97245

Please sign in to comment.