Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add seq table2cmd #52

Merged
merged 6 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/reference/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ from code:
- `pandablocks.asyncio`: An asyncio client that uses the control and data connections
- `pandablocks.blocking`: A blocking client that uses the control and data connections
- `pandablocks.hdf`: Some helpers for efficiently writing data responses to disk
- `pandablocks.utils`: General utility methods for use with pandablocks


.. automodule:: pandablocks.commands
Expand Down Expand Up @@ -82,3 +83,10 @@ from code:
gives multi-CPU benefits without hitting the limit of the GIL.

.. seealso:: `library-hdf`, `performance`

.. automodule:: pandablocks.utils

Utilities
---------

This package contains general methods for working with pandablocks.
129 changes: 129 additions & 0 deletions pandablocks/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from typing import Dict, Iterable, List, Sequence, Union, cast

import numpy as np
import numpy.typing as npt

from pandablocks.responses import TableFieldInfo

UnpackedArray = Union[
npt.NDArray[np.int32],
npt.NDArray[np.uint8],
npt.NDArray[np.uint16],
evalott100 marked this conversation as resolved.
Show resolved Hide resolved
npt.NDArray[np.bool_],
Sequence[str],
]


def words_to_table(
words: Iterable[str], table_field_info: TableFieldInfo
) -> Dict[str, UnpackedArray]:
"""Unpacks the given `packed` data based on the fields provided.
Returns the unpacked data in {column_name: column_data} column-indexed format

Args:
words: An iterable of data for this table, from PandA. Each item is
expected to be the string representation of a uint32.
table_fields_info: The info for tables, containing the number of words per row,
and the bit information for fields.
Returns:
unpacked: A dict containing record information, where keys are field names
and values are numpy arrays of record values in that column.
evalott100 marked this conversation as resolved.
Show resolved Hide resolved
"""

row_words = table_field_info.row_words
data = np.array(words, dtype=np.uint32)
# Convert 1-D array into 2-D, one row element per row in the PandA table
data = data.reshape(len(data) // row_words, row_words)
packed = data.T

unpacked: Dict[str, UnpackedArray] = {}

for field_name, field_info in table_field_info.fields.items():
offset = field_info.bit_low
bit_length = field_info.bit_high - field_info.bit_low + 1

# The word offset indicates which column this field is in
# (column is exactly one 32-bit word)
word_offset = offset // 32

# bit offset is location of field inside the word
bit_offset = offset & 0x1F

# Mask to remove every bit that isn't in the range we want
mask = (1 << bit_length) - 1

value = (packed[word_offset] >> bit_offset) & mask

if field_info.subtype == "int":
# First convert from 2's complement to offset, then add in offset.
packing_value = (value ^ (1 << (bit_length - 1))) + (-1 << (bit_length - 1))
packing_value = value.astype(np.int32)
elif field_info.labels:
packing_value = [field_info.labels[x] for x in value]
elif bit_length == 1:
packing_value = value.astype(np.bool_)
else:
packing_value = value

unpacked.update({field_name: packing_value})

return unpacked


def table_to_words(
table: Dict[str, Iterable], table_field_info: TableFieldInfo
) -> List[str]:
"""Convert records based on the field definitions into the format PandA expects
for table writes.

Args:
table: A dict containing record information, where keys are field names
and values are iterables of record values in that column.
table_field_info: The info for tables, containing the dict `fields` for
information on each field, and the number of words per row.
Returns:
List[str]: The list of data ready to be sent to PandA
"""
row_words = table_field_info.row_words

# Iterate over the zipped fields and their associated records to construct the
# packed array.
packed = None

for column_name, column in table.items():
field_details = table_field_info.fields[column_name]
if field_details.labels:
# Must convert the list of ints into strings
column = [field_details.labels.index(x) for x in column]

# PandA always handles tables in uint32 format
column_value = np.uint32(np.array(column))
evalott100 marked this conversation as resolved.
Show resolved Hide resolved

if packed is None:
# Create 1-D array sufficiently long to exactly hold the entire table, cast
# to prevent type error, this will still work if column is another iterable
# e.g numpy array
column = cast(List, column)
packed = np.zeros((len(column), row_words), dtype=np.uint32)
else:
assert len(packed) == len(column), (
f"Table record {column_name} has mismatched length "
"compared to other records, cannot pack data"
)

offset = field_details.bit_low

# The word offset indicates which column this field is in
# (each column is one 32-bit word)
word_offset = offset // 32
# bit offset is location of field inside the word
bit_offset = offset & 0x1F

# Slice to get the column to apply the values to.
# bit shift the value to the relevant bits of the word
packed[:, word_offset] |= column_value << bit_offset

assert isinstance(packed, np.ndarray), "Table has no columns" # Squash mypy warning

# 2-D array -> 1-D array -> list[int] -> list[str]
return [str(x) for x in packed.flatten().tolist()]
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[build-system]
# Pin versions compatible with dls-python3 for reproducible wheels
requires = ["setuptools==44.1.1", "wheel==0.33.1"]
build-backend = "setuptools.build_meta"
build-backend = "setuptools.build_meta"
202 changes: 202 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
from typing import Dict, Iterable, List, OrderedDict

import pytest

from pandablocks.responses import TableFieldDetails, TableFieldInfo
from pandablocks.utils import table_to_words, words_to_table


@pytest.fixture
def table_fields() -> Dict[str, TableFieldDetails]:
"""Table field definitions, taken from a SEQ.TABLE instance.
Associated with table_data and table_field_info fixtures"""
return {
"REPEATS": TableFieldDetails(
subtype="uint",
bit_low=0,
bit_high=15,
description="Number of times the line will repeat",
labels=None,
),
"TRIGGER": TableFieldDetails(
subtype="enum",
bit_low=16,
bit_high=19,
description="The trigger condition to start the phases",
labels=[
"Immediate",
"BITA=0",
"BITA=1",
"BITB=0",
"BITB=1",
"BITC=0",
"BITC=1",
"POSA>=POSITION",
"POSA<=POSITION",
"POSB>=POSITION",
"POSB<=POSITION",
"POSC>=POSITION",
"POSC<=POSITION",
],
),
"POSITION": TableFieldDetails(
subtype="int",
bit_low=32,
bit_high=63,
description="The position that can be used in trigger condition",
labels=None,
),
"TIME1": TableFieldDetails(
subtype="uint",
bit_low=64,
bit_high=95,
description="The time the optional phase 1 should take",
labels=None,
),
"OUTA1": TableFieldDetails(
subtype="uint",
bit_low=20,
bit_high=20,
description="Output A value during phase 1",
labels=None,
),
"OUTB1": TableFieldDetails(
subtype="uint",
bit_low=21,
bit_high=21,
description="Output B value during phase 1",
labels=None,
),
"OUTC1": TableFieldDetails(
subtype="uint",
bit_low=22,
bit_high=22,
description="Output C value during phase 1",
labels=None,
),
"OUTD1": TableFieldDetails(
subtype="uint",
bit_low=23,
bit_high=23,
description="Output D value during phase 1",
labels=None,
),
"OUTE1": TableFieldDetails(
subtype="uint",
bit_low=24,
bit_high=24,
description="Output E value during phase 1",
labels=None,
),
"OUTF1": TableFieldDetails(
subtype="uint",
bit_low=25,
bit_high=25,
description="Output F value during phase 1",
labels=None,
),
"TIME2": TableFieldDetails(
subtype="uint",
bit_low=96,
bit_high=127,
description="The time the mandatory phase 2 should take",
labels=None,
),
"OUTA2": TableFieldDetails(
subtype="uint",
bit_low=26,
bit_high=26,
description="Output A value during phase 2",
labels=None,
),
"OUTB2": TableFieldDetails(
subtype="uint",
bit_low=27,
bit_high=27,
description="Output B value during phase 2",
labels=None,
),
"OUTC2": TableFieldDetails(
subtype="uint",
bit_low=28,
bit_high=28,
description="Output C value during phase 2",
labels=None,
),
"OUTD2": TableFieldDetails(
subtype="uint",
bit_low=29,
bit_high=29,
description="Output D value during phase 2",
labels=None,
),
"OUTE2": TableFieldDetails(
subtype="uint",
bit_low=30,
bit_high=30,
description="Output E value during phase 2",
labels=None,
),
"OUTF2": TableFieldDetails(
subtype="uint",
bit_low=31,
bit_high=31,
description="Output F value during phase 2",
labels=None,
),
}


@pytest.fixture
def table_field_info(table_fields) -> TableFieldInfo:
"""Table data associated with table_fields and table_data fixtures"""
return TableFieldInfo(
"table", None, "Sequencer table of lines", 16384, table_fields, 4
)


def ensure_matching_order(list1: List, list2: List):
old_index = 0
for list1_element in list1:
new_index = list2.index(list1_element)
if new_index < old_index:
return False
old_index = new_index


def test_table_to_words_and_words_to_table(table_field_info: TableFieldInfo):
table: Dict[str, Iterable] = dict(
REPEATS=[1, 0],
TRIGGER=["Immediate", "Immediate"],
POSITION=[-20, 2**31 - 1],
TIME1=[12, 2**32 - 1],
TIME2=[32, 1],
)

table["OUTA1"] = [False, True]
table["OUTA2"] = [True, False]
for key in "BCDEF":
table[f"OUT{key}1"] = table[f"OUT{key}2"] = [False, False]

words = table_to_words(table, table_field_info)
assert words == [
"67108865",
"4294967276",
"12",
"32",
"1048576",
"2147483647",
"4294967295",
"1",
]
output = words_to_table(words, table_field_info)

# Test the correct keys are outputted
assert output.keys() == table.keys()

# Check the items have been inserted in panda order
sorted_table = OrderedDict({key: table[key] for key in output.keys()})
assert sorted_table != OrderedDict(table)

# Check the values are the same
assert [(x, list(y)) for x, y in output.items()] == list(sorted_table.items())