Skip to content

Commit

Permalink
Merge pull request #10 from numat/add_tags
Browse files Browse the repository at this point in the history
Add tags
  • Loading branch information
JamesJeffryes authored Jun 4, 2020
2 parents 437ba39 + 301d64e commit 2ee09bb
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 17 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
clickplc
clickplc [![Build Status](https://dev.azure.com/jjeffryes/jjeffryes/_apis/build/status/numat.clickplc?branchName=master)](https://dev.azure.com/jjeffryes/jjeffryes/_build/latest?definitionId=3&branchName=master)
========

Python ≥3.6 driver and command-line tool for [Koyo Ethernet ClickPLCs](https://www.automationdirect.com/adc/Overview/Catalog/Programmable_Controllers/CLICK_Series_PLCs_(Stackable_Micro_Brick)).
Expand Down Expand Up @@ -57,11 +57,30 @@ The entire API is `get` and `set`, and takes a range of inputs:
>>> await plc.set('y101', True) # Sets Y101 to true
```

#### Using named tags

The ClickPLC software provides the ability to export a file containing all the addresses in use
along with their user assign nicknames. To export this file, open the Address Picker, select
"Display MODBUS address" and export the file.
![Export Tags](tags_export.png)
If a path to this tags file is provided when the
driver is initialized, tag nicknames can be used to set values. Calling `.get()` without
arguments will return all named tags.
```python
async with ClickPLC('the-plc-ip-address', 'path-to-tags-csv') as plc:
await plc.set('myTagNickname', True)
print(await plc.get())
```

Currently, only X, Y, C, DS, and DF are supported:

| | | |
|---|---|---|
| x | bool | Input point |
| y | bool | Output point |
| c | bool | (C)ontrol relay |
| df | float | (D)ata register, (f)loating point |
| ds | int16 | (D)ata register, (s)igned int |

I personally haven't needed to use the other categories, but they are
straightforward to add if needed.
21 changes: 13 additions & 8 deletions clickplc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@ def command_line():
import json

parser = argparse.ArgumentParser(description="Control a ClickPLC from "
"the command line.")
parser.add_argument('address', help="The IP address of the ClickPLC.")
"the command line")
parser.add_argument('address', help="The IP address of the ClickPLC")
parser.add_argument('tags_file', default=None, required=False,
help="Optional: Path to a tags file for this PLC")
args = parser.parse_args()

async def get():
async with ClickPLC(args.address) as plc:
d = await plc.get('x001-x816')
d.update(await plc.get('y001-y816'))
d.update(await plc.get('c1-c100'))
d.update(await plc.get('df1-df100'))
d.update(await plc.get('ds1-ds100'))
async with ClickPLC(args.address, args.tags_file) as plc:
if args.tags_file is not None:
d = await plc.get()
else:
d = await plc.get('x001-x816')
d.update(await plc.get('y001-y816'))
d.update(await plc.get('c1-c100'))
d.update(await plc.get('df1-df100'))
d.update(await plc.get('ds1-ds100'))
print(json.dumps(d, indent=4))

loop = asyncio.get_event_loop()
Expand Down
109 changes: 103 additions & 6 deletions clickplc/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
Distributed under the GNU General Public License v2
Copyright (C) 2020 NuMat Technologies
"""
import csv
import pydoc
from collections import defaultdict
from string import digits
from typing import Union, List

Expand All @@ -29,14 +31,44 @@ class ClickPLC(AsyncioModbusClient):
'ds': 'int16', # (D)ata register (s)igned int
}

async def get(self, address: str) -> dict:
def __init__(self, address, tag_filepath='', timeout=1):
"""Initialize PLC connection and data structure.
Args:
address: The PLC IP address or DNS name
tag_filepath: Path to the PLC tags file
timeout (optional): Timeout when communicating with PLC. Default 1s.
"""
super().__init__(address, timeout)
self.tags = self._load_tags(tag_filepath)
self.active_addresses = self._get_address_ranges(self.tags)

def get_tags(self) -> dict:
"""Return all tags and associated configuration information.
Use this data for debugging or to provide more detailed
information on user interfaces.
Returns:
A dictionary containing information associated with each tag name.
"""
return self.tags

async def get(self, address: str = None) -> dict:
"""Get variables from the ClickPLC.
Args:
address: ClickPLC address(es) to get. Specify a range with a
hyphen, e.g. 'DF1-DF40'
data: A value or list of values to set.
If driver is loaded with a tags file this can be called without an
address to return all nicknamed addresses in the tags file
>>> plc.get()
{'P-101': 0.0, 'P-102': 0.0 ..., T-101:0.0}
Otherwise one or more internal variable can be requested
>>> plc.get('df1')
0.0
>>> plc.get('df1-df20')
Expand All @@ -47,11 +79,21 @@ async def get(self, address: str) -> dict:
This uses the ClickPLC's internal variable notation, which can be
found in the Address Picker of the ClickPLC software.
"""
if address is None:
if not self.tags:
raise ValueError('An address must be supplied to get if tags were not '
'provided when driver initialized')
results = {}
for category, address in self.active_addresses.items():
results.update(await getattr(self, '_get_' + category)(address['min'], address['max']))
return {tag_name: results[tag_info['id'].lower()]
for tag_name, tag_info in self.tags.items()}

if '-' in address:
start, end = address.split('-')
else:
start, end = address, None
i = start.index(next(s for s in start if s.isdigit()))
i = next(i for i, s in enumerate(start) if s.isdigit())
category, start_index = start[:i].lower(), int(start[i:])
end_index = None if end is None else int(end[i:])

Expand All @@ -73,15 +115,19 @@ async def set(self, address, data):
>>> plc.set('df1', 0.0) # Sets DF1 to 0.0
>>> plc.set('df1', [0.0, 0.0, 0.0]) # Sets DF1-DF3 to 0.0.
>>> plc.set('y101', True) # Sets Y101 to true
>>> plc.set('myTagNickname', True) # Sets address named myTagNickname to true
This uses the ClickPLC's internal variable notation, which can be
found in the Address Picker of the ClickPLC software.
found in the Address Picker of the ClickPLC software. If a tags file
was loaded at driver initalization, nicknames can be used instead.
"""
if address in self.tags:
address = self.tags[address]['id']

if not isinstance(data, list):
data = [data]

i = address.index(next(s for s in address if s.isdigit()))
i = next(i for i, s in enumerate(address) if s.isdigit())
category, index = address[:i].lower(), int(address[i:])
if category not in self.data_types:
raise ValueError(f"{category} currently unsupported.")
Expand Down Expand Up @@ -389,3 +435,54 @@ def _pack(value):
await self.write_registers(address, payload, skip_encode=True)
else:
await self.write_register(address, _pack(data), skip_encode=True)

def _load_tags(self, tag_filepath: str) -> dict:
"""Load tags from file path.
This tag file is optional but is needed to identify the appropriate variable names,
and modbus addresses for tags in use on the PLC.
"""
if not tag_filepath:
return {}
with open(tag_filepath) as csv_file:
csv_data = csv_file.read().splitlines()
csv_data[0] = csv_data[0].lstrip('## ')
parsed = {
row['Nickname']: {
'address': {
'start': int(row['Modbus Address']),
},
'id': row['Address'],
'comment': row['Address Comment'],
'type': self.data_types.get(
row['Address'].rstrip(digits).lower()
),
}
for row in csv.DictReader(csv_data)
if row['Nickname'] and not row['Nickname'].startswith("_")
}
for data in parsed.values():
if not data['comment']:
del data['comment']
if not data['type']:
raise TypeError(
f"{data['id']} is an unsupported data type. Open a "
"github issue at numat/clickplc to get it added."
)
sorted_tags = {k: parsed[k] for k in
sorted(parsed, key=lambda k: parsed[k]['address']['start'])}
return sorted_tags

@staticmethod
def _get_address_ranges(tags: dict) -> dict:
"""Parse the loaded tags to determine the range of addresses that must be
queried to return all values"""
address_dict = defaultdict(lambda: {'min': 1, 'max': 1})
for tag_info in tags.values():
i = next(i for i, s in enumerate(tag_info['id']) if s.isdigit())
category, index = tag_info['id'][:i].lower(), int(tag_info['id'][i:])
address_dict[category]['min'] = min(address_dict[category]['min'], index)
address_dict[category]['max'] = max(address_dict[category]['max'], index)
return address_dict

Empty file added clickplc/tests/__init__.py
Empty file.
3 changes: 3 additions & 0 deletions clickplc/tests/bad_tags.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Address,Data Type,Modbus Address,Function Code,Nickname,Initial Value,Retentive,Address Comment
Y301,BIT,8289,"FC=01,05,15","P_101",0,No,""
FOO1,BIT,16385,"FC=01,05,15","P_101_auto",0,No,""
30 changes: 30 additions & 0 deletions clickplc/tests/plc_tags.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Address,Data Type,Modbus Address,Function Code,Nickname,Initial Value,Retentive,Address Comment
X101,BIT,100033,"FC=02","_IO1_Module_Error",0,No,"On when module is not functioning"
X102,BIT,100034,"FC=02","_IO1_Module_Config",0,No,"On when module is initializing"
X103,BIT,100035,"FC=02","_IO1_CH1_Burnout",0,No,"On when CH1 senses burnout or open circuit"
X104,BIT,100036,"FC=02","_IO1_CH1_Under_Range",0,No,"On when CH1 receives under range input"
X105,BIT,100037,"FC=02","_IO1_CH1_Over_Range",0,No,"On when CH1 receives over range input"
X106,BIT,100038,"FC=02","_IO1_CH2_Burnout",0,No,"On when CH2 senses burnout or open circuit"
X107,BIT,100039,"FC=02","_IO1_CH2_Under_Range",0,No,"On when CH2 receives under range input"
X108,BIT,100040,"FC=02","_IO1_CH2_Over_Range",0,No,"On when CH2 receives over range input"
X109,BIT,100041,"FC=02","_IO1_CH3_Burnout",0,No,"On when CH3 senses burnout or open circuit"
X110,BIT,100042,"FC=02","_IO1_CH3_Under_Range",0,No,"On when CH3 receives under range input"
X111,BIT,100043,"FC=02","_IO1_CH3_Over_Range",0,No,"On when CH3 receives over range input"
X112,BIT,100044,"FC=02","_IO1_CH4_Burnout",0,No,"On when CH4 senses burnout or open circuit"
X113,BIT,100045,"FC=02","_IO1_CH4_Under_Range",0,No,"On when CH4 receives under range input"
X114,BIT,100046,"FC=02","_IO1_CH4_Over_Range",0,No,"On when CH4 receives over range input"
X201,BIT,100065,"FC=02","_IO2_Module_Error",0,No,"On when module is not functioning"
X202,BIT,100066,"FC=02","_IO2_Missing_24V",0,No,"On when missing external 24VDC input"
Y301,BIT,8289,"FC=01,05,15","P_101",0,No,""
Y302,BIT,8290,"FC=01,05,15","P_103",0,No,""
C1,BIT,16385,"FC=01,05,15","P_101_auto",0,No,""
C2,BIT,16386,"FC=01,05,15","P_102_auto",0,No,""
C10,BIT,16394,"FC=01,05,15","VAH_101_OK",0,No,""
C11,BIT,16395,"FC=01,05,15","VAHH_101_OK",0,No,""
C12,BIT,16396,"FC=01,05,15","IO2_Module_OK",0,No,""
C13,BIT,16397,"FC=01,05,15","IO2_24V_OK",0,No,""
DS100,INT,400100,"FC=03,06,16","TIC101_PID_ErrorCode",0,Yes,"PID Error Code"
DF1,FLOAT,428673,"FC=03,06,16","TI_101",0,Yes,""
DF5,FLOAT,428681,"FC=03,06,16","LI_102",0,Yes,""
DF6,FLOAT,428683,"FC=03,06,16","LI_101",0,Yes,""
DF7,FLOAT,428685,"FC=03,06,16","VI_101",0,Yes,""
49 changes: 47 additions & 2 deletions clickplc/test_driver.py → clickplc/tests/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,49 @@ def plc_driver():
return ClickPLC('fake ip')


@pytest.fixture
def tagged_driver():
return ClickPLC('fake ip', 'tests/plc_tags.csv')

@pytest.fixture
def expected_tags():
return {
'IO2_24V_OK': {'address': {'start': 16397}, 'id': 'C13', 'type': 'bool'},
'IO2_Module_OK': {'address': {'start': 16396}, 'id': 'C12', 'type': 'bool'},
'LI_101': {'address': {'start': 428683}, 'id': 'DF6', 'type': 'float'},
'LI_102': {'address': {'start': 428681}, 'id': 'DF5', 'type': 'float'},
'P_101': {'address': {'start': 8289}, 'id': 'Y301', 'type': 'bool'},
'P_101_auto': {'address': {'start': 16385}, 'id': 'C1', 'type': 'bool'},
'P_102_auto': {'address': {'start': 16386}, 'id': 'C2', 'type': 'bool'},
'P_103': {'address': {'start': 8290}, 'id': 'Y302', 'type': 'bool'},
'TIC101_PID_ErrorCode': {'address': {'start': 400100},
'comment': 'PID Error Code',
'id': 'DS100',
'type': 'int16'},
'TI_101': {'address': {'start': 428673}, 'id': 'DF1', 'type': 'float'},
'VAHH_101_OK': {'address': {'start': 16395}, 'id': 'C11', 'type': 'bool'},
'VAH_101_OK': {'address': {'start': 16394}, 'id': 'C10', 'type': 'bool'},
'VI_101': {'address': {'start': 428685}, 'id': 'DF7', 'type': 'float'}
}


def test_get_tags(tagged_driver, expected_tags):
assert expected_tags == tagged_driver.get_tags()


def test_unsupported_tags():
with pytest.raises(TypeError, match='unsupported data type'):
ClickPLC('fake ip', 'tests/bad_tags.csv')


@pytest.mark.asyncio
async def test_tagged_driver(tagged_driver, expected_tags):
await tagged_driver.set('VAH_101_OK', True)
state = await tagged_driver.get()
assert state.get('VAH_101_OK')
assert expected_tags.keys() == state.keys()


@pytest.mark.asyncio
@pytest.mark.parametrize('prefix', ['x', 'y'])
async def test_bool_roundtrip(plc_driver, prefix):
Expand Down Expand Up @@ -46,11 +89,13 @@ async def test_ds_roundtrip(plc_driver):

@pytest.mark.asyncio
async def test_get_error_handling(plc_driver):
with pytest.raises(ValueError, match='End address must be greater than start address.'):
with pytest.raises(ValueError, match='An address must be supplied'):
await plc_driver.get()
with pytest.raises(ValueError, match='End address must be greater than start address'):
await plc_driver.get('c3-c1')
with pytest.raises(ValueError, match='foo currently unsupported'):
await plc_driver.get('foo1')
with pytest.raises(ValueError, match='Inter-category ranges are unsupported.'):
with pytest.raises(ValueError, match='Inter-category ranges are unsupported'):
await plc_driver.get('c1-x3')


Expand Down
Binary file added tags_export.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 2ee09bb

Please sign in to comment.