-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added functions to convert to and from column indexed tables
- Loading branch information
1 parent
c509dfd
commit 77eaad3
Showing
3 changed files
with
311 additions
and
62 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,65 +1,140 @@ | ||
from typing import Dict, List, Sequence, Union | ||
|
||
import numpy as np | ||
import numpy.typing as npt | ||
|
||
def seq_table2cmd(repeats: int, trigger: int, position: int, time1: int, phase1: dict, time2: int, phase2: dict): | ||
from pandablocks.responses import TableFieldInfo | ||
|
||
UnpackedArray = Union[ | ||
npt.NDArray[np.int32], npt.NDArray[np.uint8], npt.NDArray[np.uint16] | ||
] | ||
|
||
|
||
def words_to_table( | ||
words: Sequence[str], table_field_info: TableFieldInfo | ||
) -> Dict[str, List]: | ||
"""Unpacks the given `packed` data based on the fields provided. | ||
Returns the unpacked data in {column_name: column_data} column-indexed format | ||
Args: | ||
words: The list of data for this table, from PandA. Each item is | ||
expected to be the string representation of a uint32. | ||
table_fields: The list of fields present in the packed data. | ||
Returns: | ||
unpacked: A dict of lists, one item per field. | ||
""" | ||
Function to encode (repeats, trigger, phase1, phase2) into 32bit integer (code). | ||
Return a list of str [code, position, time1, time2] to send to pandabox sequencer (SEQ) | ||
table. | ||
Example: | ||
```python | ||
keys = ["a", "b", "c", "d", "e", "f"] | ||
repeats = 1 | ||
time1 = 0 | ||
position = 0 | ||
trigger = "Immediate" | ||
phase1 = {key: False for key in keys} | ||
time2 = 0 | ||
phase2 = {key: False if key != "a" else True for key in keys} | ||
pandabox.send( | ||
Put( | ||
"SEQ1.TABLE",seq_table( | ||
repeats, trigger, position, time1, phase2, time2, phase2 | ||
), | ||
) | ||
) | ||
``` | ||
|
||
row_words = table_field_info.row_words | ||
data = np.array(words, dtype=np.uint32) | ||
# Convert 1-D array into 2-D, one row element per row in the PandA table | ||
data = data.reshape(len(data) // row_words, row_words) | ||
packed = data.T | ||
|
||
# Ensure fields are in bit-order | ||
table_fields = dict( | ||
sorted( | ||
table_field_info.fields.items(), | ||
key=lambda item: item[1].bit_low, | ||
) | ||
) | ||
|
||
unpacked: Dict[str, List] = {} | ||
|
||
for field_name, field_info in table_fields.items(): | ||
offset = field_info.bit_low | ||
bit_length = field_info.bit_high - field_info.bit_low + 1 | ||
|
||
# The word offset indicates which column this field is in | ||
# (column is exactly one 32-bit word) | ||
word_offset = offset // 32 | ||
|
||
# bit offset is location of field inside the word | ||
bit_offset = offset & 0x1F | ||
|
||
# Mask to remove every bit that isn't in the range we want | ||
mask = (1 << bit_length) - 1 | ||
|
||
value: UnpackedArray = (packed[word_offset] >> bit_offset) & mask | ||
|
||
if field_info.subtype == "int": | ||
# First convert from 2's complement to offset, then add in offset. | ||
value = (value ^ (1 << (bit_length - 1))) + (-1 << (bit_length - 1)) | ||
value = value.astype(np.int32) | ||
else: | ||
# Use shorter types, as these are used in waveform creation | ||
if bit_length <= 8: | ||
value = value.astype(np.uint8) | ||
elif bit_length <= 16: | ||
value = value.astype(np.uint16) | ||
|
||
value_list = value.tolist() | ||
# Convert back to label from integer | ||
if field_info.labels: | ||
value_list = [field_info.labels[x] for x in value_list] | ||
|
||
unpacked.update({field_name: value_list}) | ||
|
||
return unpacked | ||
|
||
|
||
def table_to_words( | ||
table: Dict[str, Sequence], table_field_info: TableFieldInfo | ||
) -> List[str]: | ||
"""Pack the records based on the field definitions into the format PandA expects | ||
for table writes. | ||
Args: | ||
row_words: The number of 32-bit words per row | ||
table_fields_info: The info for tables, containing the number of words per row, | ||
and the bit information for fields. | ||
Returns: | ||
List[str]: The list of data ready to be sent to PandA | ||
""" | ||
trigger_options = { | ||
"Immediate": 0, | ||
"bita=0": 1, | ||
"bita=1": 2, | ||
"bitb=0": 3, | ||
"bitb=1": 4, | ||
"bitc=0": 5, | ||
"bitc=1": 6, | ||
"posa>=position": 7, | ||
"posa<=position": 8, | ||
"posb>=position": 9, | ||
"posb<=position": 10, | ||
"posc>=position": 11, | ||
"posc<=position": 12, | ||
} | ||
|
||
# _b binary code | ||
repeats_b = "{0:016b}".format(repeats) # 16 bits | ||
trigger_b = "{0:04b}".format(trigger_options[trigger]) # 4 bits (17-20) | ||
phase1_b = "" | ||
for key, value in sorted(phase1.items()): # 6 bits (a-f) | ||
phase1_b = "1" + phase1_b if value else "0" + phase1_b | ||
phase2_b = "" | ||
for key, value in sorted(phase2.items()): # 6 bits (a-f) | ||
phase2_b = "1" + phase2_b if value else "0" + phase2_b | ||
code_b = phase2_b + phase1_b + trigger_b + repeats_b # 32 bits code | ||
code = int(code_b, 2) | ||
|
||
# a table line = [code position time1 time2] | ||
pos_cmd = [ | ||
f"{code:d}", | ||
f"{int(position):d}", | ||
f"{int(time1):d}", | ||
f"{int(time2):d}", | ||
] | ||
|
||
return pos_cmd | ||
row_words = table_field_info.row_words | ||
|
||
# Ensure fields are in bit-order | ||
table_fields = dict( | ||
sorted( | ||
table_field_info.fields.items(), | ||
key=lambda item: item[1].bit_low, | ||
) | ||
) | ||
|
||
# Iterate over the zipped fields and their associated records to construct the | ||
# packed array. | ||
packed = None | ||
|
||
for column_name, column in table.items(): | ||
field_details = table_fields[column_name] | ||
if field_details.labels: | ||
# Must convert the list of ints into strings | ||
column = [field_details.labels.index(x) for x in column] | ||
|
||
# PandA always handles tables in uint32 format | ||
column_value = np.uint32(np.array(column)) | ||
|
||
if packed is None: | ||
# Create 1-D array sufficiently long to exactly hold the entire table | ||
packed = np.zeros((len(column), row_words), dtype=np.uint32) | ||
else: | ||
assert len(packed) == len(column), ( | ||
f"Table record {column_name} has mismatched length " | ||
"compared to other records, cannot pack data" | ||
) | ||
|
||
offset = field_details.bit_low | ||
|
||
# The word offset indicates which column this field is in | ||
# (each column is one 32-bit word) | ||
word_offset = offset // 32 | ||
# bit offset is location of field inside the word | ||
bit_offset = offset & 0x1F | ||
|
||
# Slice to get the column to apply the values to. | ||
# bit shift the value to the relevant bits of the word | ||
packed[:, word_offset] |= column_value << bit_offset | ||
|
||
assert isinstance(packed, np.ndarray) # Squash mypy warning | ||
|
||
# 2-D array -> 1-D array -> list[int] -> list[str] | ||
return [str(x) for x in packed.flatten().tolist()] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
[build-system] | ||
# Pin versions compatible with dls-python3 for reproducible wheels | ||
requires = ["setuptools==44.1.1", "wheel==0.33.1"] | ||
build-backend = "setuptools.build_meta" | ||
build-backend = "setuptools.build_meta" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
from typing import Dict | ||
from pandablocks.utils import table_to_words, words_to_table | ||
from pandablocks.responses import TableFieldDetails, TableFieldInfo | ||
import pytest | ||
|
||
|
||
@pytest.fixture | ||
def table_fields() -> Dict[str, TableFieldDetails]: | ||
"""Table field definitions, taken from a SEQ.TABLE instance. | ||
Associated with table_data and table_field_info fixtures""" | ||
return { | ||
"REPEATS": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=0, | ||
bit_high=15, | ||
description="Number of times the line will repeat", | ||
labels=None, | ||
), | ||
"TRIGGER": TableFieldDetails( | ||
subtype="enum", | ||
bit_low=16, | ||
bit_high=19, | ||
description="The trigger condition to start the phases", | ||
labels=[ | ||
"Immediate", | ||
"BITA=0", | ||
"BITA=1", | ||
"BITB=0", | ||
"BITB=1", | ||
"BITC=0", | ||
"BITC=1", | ||
"POSA>=POSITION", | ||
"POSA<=POSITION", | ||
"POSB>=POSITION", | ||
"POSB<=POSITION", | ||
"POSC>=POSITION", | ||
"POSC<=POSITION", | ||
], | ||
), | ||
"POSITION": TableFieldDetails( | ||
subtype="int", | ||
bit_low=32, | ||
bit_high=63, | ||
description="The position that can be used in trigger condition", | ||
labels=None, | ||
), | ||
"TIME1": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=64, | ||
bit_high=95, | ||
description="The time the optional phase 1 should take", | ||
labels=None, | ||
), | ||
"OUTA1": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=20, | ||
bit_high=20, | ||
description="Output A value during phase 1", | ||
labels=None, | ||
), | ||
"OUTB1": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=21, | ||
bit_high=21, | ||
description="Output B value during phase 1", | ||
labels=None, | ||
), | ||
"OUTC1": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=22, | ||
bit_high=22, | ||
description="Output C value during phase 1", | ||
labels=None, | ||
), | ||
"OUTD1": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=23, | ||
bit_high=23, | ||
description="Output D value during phase 1", | ||
labels=None, | ||
), | ||
"OUTE1": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=24, | ||
bit_high=24, | ||
description="Output E value during phase 1", | ||
labels=None, | ||
), | ||
"OUTF1": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=25, | ||
bit_high=25, | ||
description="Output F value during phase 1", | ||
labels=None, | ||
), | ||
"TIME2": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=96, | ||
bit_high=127, | ||
description="The time the mandatory phase 2 should take", | ||
labels=None, | ||
), | ||
"OUTA2": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=26, | ||
bit_high=26, | ||
description="Output A value during phase 2", | ||
labels=None, | ||
), | ||
"OUTB2": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=27, | ||
bit_high=27, | ||
description="Output B value during phase 2", | ||
labels=None, | ||
), | ||
"OUTC2": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=28, | ||
bit_high=28, | ||
description="Output C value during phase 2", | ||
labels=None, | ||
), | ||
"OUTD2": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=29, | ||
bit_high=29, | ||
description="Output D value during phase 2", | ||
labels=None, | ||
), | ||
"OUTE2": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=30, | ||
bit_high=30, | ||
description="Output E value during phase 2", | ||
labels=None, | ||
), | ||
"OUTF2": TableFieldDetails( | ||
subtype="uint", | ||
bit_low=31, | ||
bit_high=31, | ||
description="Output F value during phase 2", | ||
labels=None, | ||
), | ||
} | ||
|
||
|
||
@pytest.fixture | ||
def table_field_info(table_fields) -> TableFieldInfo: | ||
"""Table data associated with table_fields and table_data fixtures""" | ||
return TableFieldInfo( | ||
"table", None, "Sequencer table of lines", 16384, table_fields, 4 | ||
) | ||
|
||
|
||
def test_table_to_words_and_words_to_table(table_field_info): | ||
table = dict( | ||
REPEATS=[1, 0], | ||
TRIGGER=["Immediate", "Immediate"], | ||
POSITION=[0, 1], | ||
TIME1=[0, 1], | ||
TIME2=[0, 1], | ||
) | ||
|
||
table["OUTA1"] = [False, True] | ||
table["OUTA2"] = [True, False] | ||
for key in "BCDEF": | ||
table[f"OUT{key}1"] = table[f"OUT{key}2"] = [False, False] | ||
|
||
words = table_to_words(table, table_field_info) | ||
assert words == ["67108865", "0", "0", "0", "1048576", "1", "1", "1"] | ||
|
||
# Verify the methods are inverse | ||
assert words_to_table(words, table_field_info) == table |