diff --git a/0.10.1/.buildinfo b/0.10.1/.buildinfo
new file mode 100644
index 00000000..5f110ee4
--- /dev/null
+++ b/0.10.1/.buildinfo
@@ -0,0 +1,4 @@
+# Sphinx build info version 1
+# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
+config: bef638aa70d2274b6ec88c3170a97896
+tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/0.10.1/.doctrees/explanations.doctree b/0.10.1/.doctrees/explanations.doctree
new file mode 100644
index 00000000..90f61072
Binary files /dev/null and b/0.10.1/.doctrees/explanations.doctree differ
diff --git a/0.10.1/.doctrees/explanations/decisions.doctree b/0.10.1/.doctrees/explanations/decisions.doctree
new file mode 100644
index 00000000..bc96f7ec
Binary files /dev/null and b/0.10.1/.doctrees/explanations/decisions.doctree differ
diff --git a/0.10.1/.doctrees/explanations/decisions/0001-record-architecture-decisions.doctree b/0.10.1/.doctrees/explanations/decisions/0001-record-architecture-decisions.doctree
new file mode 100644
index 00000000..2f469806
Binary files /dev/null and b/0.10.1/.doctrees/explanations/decisions/0001-record-architecture-decisions.doctree differ
diff --git a/0.10.1/.doctrees/explanations/decisions/0002-switched-to-python-copier-template.doctree b/0.10.1/.doctrees/explanations/decisions/0002-switched-to-python-copier-template.doctree
new file mode 100644
index 00000000..9a30a0b2
Binary files /dev/null and b/0.10.1/.doctrees/explanations/decisions/0002-switched-to-python-copier-template.doctree differ
diff --git a/0.10.1/.doctrees/explanations/decisions/0003-make-library-sans-io.doctree b/0.10.1/.doctrees/explanations/decisions/0003-make-library-sans-io.doctree
new file mode 100644
index 00000000..60593ba1
Binary files /dev/null and b/0.10.1/.doctrees/explanations/decisions/0003-make-library-sans-io.doctree differ
diff --git a/0.10.1/.doctrees/explanations/performance.doctree b/0.10.1/.doctrees/explanations/performance.doctree
new file mode 100644
index 00000000..16d4d808
Binary files /dev/null and b/0.10.1/.doctrees/explanations/performance.doctree differ
diff --git a/0.10.1/.doctrees/explanations/sans-io.doctree b/0.10.1/.doctrees/explanations/sans-io.doctree
new file mode 100644
index 00000000..6f692ff3
Binary files /dev/null and b/0.10.1/.doctrees/explanations/sans-io.doctree differ
diff --git a/0.10.1/.doctrees/genindex.doctree b/0.10.1/.doctrees/genindex.doctree
new file mode 100644
index 00000000..d9b3361f
Binary files /dev/null and b/0.10.1/.doctrees/genindex.doctree differ
diff --git a/0.10.1/.doctrees/how-to.doctree b/0.10.1/.doctrees/how-to.doctree
new file mode 100644
index 00000000..f8fd63ef
Binary files /dev/null and b/0.10.1/.doctrees/how-to.doctree differ
diff --git a/0.10.1/.doctrees/how-to/contribute.doctree b/0.10.1/.doctrees/how-to/contribute.doctree
new file mode 100644
index 00000000..119fe0e1
Binary files /dev/null and b/0.10.1/.doctrees/how-to/contribute.doctree differ
diff --git a/0.10.1/.doctrees/how-to/introspect-panda.doctree b/0.10.1/.doctrees/how-to/introspect-panda.doctree
new file mode 100644
index 00000000..62ceff65
Binary files /dev/null and b/0.10.1/.doctrees/how-to/introspect-panda.doctree differ
diff --git a/0.10.1/.doctrees/how-to/library-hdf.doctree b/0.10.1/.doctrees/how-to/library-hdf.doctree
new file mode 100644
index 00000000..b4063367
Binary files /dev/null and b/0.10.1/.doctrees/how-to/library-hdf.doctree differ
diff --git a/0.10.1/.doctrees/how-to/poll-changes.doctree b/0.10.1/.doctrees/how-to/poll-changes.doctree
new file mode 100644
index 00000000..b02bf796
Binary files /dev/null and b/0.10.1/.doctrees/how-to/poll-changes.doctree differ
diff --git a/0.10.1/.doctrees/how-to/run-container.doctree b/0.10.1/.doctrees/how-to/run-container.doctree
new file mode 100644
index 00000000..b6f26077
Binary files /dev/null and b/0.10.1/.doctrees/how-to/run-container.doctree differ
diff --git a/0.10.1/.doctrees/index.doctree b/0.10.1/.doctrees/index.doctree
new file mode 100644
index 00000000..da1f88db
Binary files /dev/null and b/0.10.1/.doctrees/index.doctree differ
diff --git a/0.10.1/.doctrees/reference.doctree b/0.10.1/.doctrees/reference.doctree
new file mode 100644
index 00000000..e868c2eb
Binary files /dev/null and b/0.10.1/.doctrees/reference.doctree differ
diff --git a/0.10.1/.doctrees/reference/api.doctree b/0.10.1/.doctrees/reference/api.doctree
new file mode 100644
index 00000000..3c99e862
Binary files /dev/null and b/0.10.1/.doctrees/reference/api.doctree differ
diff --git a/0.10.1/.doctrees/tutorials.doctree b/0.10.1/.doctrees/tutorials.doctree
new file mode 100644
index 00000000..dd2b185c
Binary files /dev/null and b/0.10.1/.doctrees/tutorials.doctree differ
diff --git a/0.10.1/.doctrees/tutorials/commandline-hdf.doctree b/0.10.1/.doctrees/tutorials/commandline-hdf.doctree
new file mode 100644
index 00000000..7b6c1018
Binary files /dev/null and b/0.10.1/.doctrees/tutorials/commandline-hdf.doctree differ
diff --git a/0.10.1/.doctrees/tutorials/control.doctree b/0.10.1/.doctrees/tutorials/control.doctree
new file mode 100644
index 00000000..c9258506
Binary files /dev/null and b/0.10.1/.doctrees/tutorials/control.doctree differ
diff --git a/0.10.1/.doctrees/tutorials/installation.doctree b/0.10.1/.doctrees/tutorials/installation.doctree
new file mode 100644
index 00000000..ad3b03dd
Binary files /dev/null and b/0.10.1/.doctrees/tutorials/installation.doctree differ
diff --git a/0.10.1/.doctrees/tutorials/load-save.doctree b/0.10.1/.doctrees/tutorials/load-save.doctree
new file mode 100644
index 00000000..4a41ef27
Binary files /dev/null and b/0.10.1/.doctrees/tutorials/load-save.doctree differ
diff --git a/0.10.1/_downloads/61e87182d6ca3a4a1f4e441aec8c2743/commandline-hdf-1.hires.png b/0.10.1/_downloads/61e87182d6ca3a4a1f4e441aec8c2743/commandline-hdf-1.hires.png
new file mode 100644
index 00000000..7286c82d
Binary files /dev/null and b/0.10.1/_downloads/61e87182d6ca3a4a1f4e441aec8c2743/commandline-hdf-1.hires.png differ
diff --git a/0.10.1/_downloads/89d50336567b1a2b439acb87eb112581/commandline-hdf-1.png b/0.10.1/_downloads/89d50336567b1a2b439acb87eb112581/commandline-hdf-1.png
new file mode 100644
index 00000000..f0bce322
Binary files /dev/null and b/0.10.1/_downloads/89d50336567b1a2b439acb87eb112581/commandline-hdf-1.png differ
diff --git a/0.10.1/_downloads/c3bddecf4fb6222f2e2ec7db6e4e735c/commandline-hdf-1.pdf b/0.10.1/_downloads/c3bddecf4fb6222f2e2ec7db6e4e735c/commandline-hdf-1.pdf
new file mode 100644
index 00000000..5eba6bbc
Binary files /dev/null and b/0.10.1/_downloads/c3bddecf4fb6222f2e2ec7db6e4e735c/commandline-hdf-1.pdf differ
diff --git a/0.10.1/_downloads/dc1bb79c1d7b7174ee1af8866d1dff88/commandline-hdf-1.py b/0.10.1/_downloads/dc1bb79c1d7b7174ee1af8866d1dff88/commandline-hdf-1.py
new file mode 100644
index 00000000..a1048a59
--- /dev/null
+++ b/0.10.1/_downloads/dc1bb79c1d7b7174ee1af8866d1dff88/commandline-hdf-1.py
@@ -0,0 +1,6 @@
+import matplotlib.pyplot as plt
+import numpy as np
+for i in range(1, 4):
+    plt.plot(np.arange(1, 10000001) * i, label=f"Counter {i}")
+plt.legend()
+plt.show()
\ No newline at end of file
diff --git a/0.10.1/_images/commandline-hdf-1.png b/0.10.1/_images/commandline-hdf-1.png
new file mode 100644
index 00000000..f0bce322
Binary files /dev/null and b/0.10.1/_images/commandline-hdf-1.png differ
diff --git a/0.10.1/_images/tutorial_layout.png b/0.10.1/_images/tutorial_layout.png
new file mode 100644
index 00000000..f3646496
Binary files /dev/null and b/0.10.1/_images/tutorial_layout.png differ
diff --git a/0.10.1/_modules/index.html b/0.10.1/_modules/index.html
new file mode 100644
index 00000000..200a65ed
+import asyncio
+import logging
+from asyncio.streams import StreamReader, StreamWriter
+from collections.abc import AsyncGenerator, Iterable
+from contextlib import suppress
+from typing import Optional
+
+from .commands import Command, T
+from .connections import ControlConnection, DataConnection
+from .responses import Data
+
+# Define the public API of this module
+__all__ = ["AsyncioClient"]
+
+
+class _StreamHelper:
+ _reader: Optional[StreamReader] = None
+ _writer: Optional[StreamWriter] = None
+
+ @property
+ def reader(self) -> StreamReader:
+ assert self._reader, "connect() not called yet"
+ return self._reader
+
+ @property
+ def writer(self) -> StreamWriter:
+ assert self._writer, "connect() not called yet"
+ return self._writer
+
+ async def write_and_drain(self, data: bytes, timeout: Optional[float] = None):
+ writer = self.writer
+ writer.write(data)
+
+        # Cannot simply await the drain: if the remote end has disconnected,
+        # the drain will never complete as the OS cannot clear its send buffer.
+ write_task = asyncio.create_task(writer.drain())
+ _, pending = await asyncio.wait([write_task], timeout=timeout)
+ if len(pending):
+ for task in pending:
+ task.cancel()
+ raise asyncio.TimeoutError("Timeout writing data")
+
+ async def connect(self, host: str, port: int):
+ self._reader, self._writer = await asyncio.open_connection(host, port)
+
+ async def close(self):
+ writer = self.writer
+ self._reader = None
+ self._writer = None
+ writer.close()
+ await writer.wait_closed()
+
+
+
+class AsyncioClient:
+ """Asyncio implementation of a PandABlocks client.
+ For example::
+
+ async with AsyncioClient("hostname-or-ip") as client:
+ # Control port is now connected
+ resp1, resp2 = await asyncio.gather(client.send(cmd1), client.send(cmd2))
+ resp3 = await client.send(cmd3)
+ async for data in client.data():
+ handle(data)
+ # Control and data ports are now disconnected
+ """
+
+ def __init__(self, host: str):
+ self._host = host
+ self._ctrl_connection = ControlConnection()
+ self._ctrl_task: Optional[asyncio.Task] = None
+ self._ctrl_queues: dict[int, asyncio.Queue] = {}
+ self._ctrl_stream = _StreamHelper()
+
+
+ async def connect(self):
+ """Connect to the control port, and be ready to handle commands"""
+ await self._ctrl_stream.connect(self._host, 8888)
+
+ self._ctrl_task = asyncio.create_task(
+ self._ctrl_read_forever(self._ctrl_stream.reader)
+ )
+
+
+
+ def is_connected(self):
+ """True if there is a currently active connection.
+ NOTE: This does not indicate if the remote end is still connected."""
+ if self._ctrl_task and not self._ctrl_task.done():
+ return True
+ return False
+
+
+
+ async def close(self):
+ """Close the control connection, and wait for completion"""
+ assert self._ctrl_task, "connect() not called yet"
+ self._ctrl_task.cancel()
+ await self._ctrl_stream.close()
+
+
+ async def __aenter__(self) -> "AsyncioClient":
+ await self.connect()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.close()
+
+ async def _ctrl_read_forever(self, reader: asyncio.StreamReader):
+ """Continually read data from the stream reader and add to the data queue.
+
+ Args:
+ reader: The `StreamReader` to read from
+ """
+ while True:
+ received = await reader.read(4096)
+ if received == b"":
+ error_str = (
+ "Received an empty packet. Closing connection. "
+ "Has the PandA disconnected?"
+ )
+ logging.error(error_str)
+ raise ConnectionError(error_str)
+
+ try:
+ to_send = self._ctrl_connection.receive_bytes(received)
+ await self._ctrl_stream.write_and_drain(to_send)
+ for command, response in self._ctrl_connection.responses():
+ queue = self._ctrl_queues.pop(id(command))
+ queue.put_nowait(response)
+ except Exception:
+ logging.exception(f"Error handling '{received.decode()}'")
+
+
+ async def send(self, command: Command[T], timeout: Optional[float] = None) -> T:
+        """Send a command to the control port of the PandA, returning its response.
+
+ Args:
+ command: The `Command` to send
+ """
+ queue: asyncio.Queue[T] = asyncio.Queue()
+ # Need to use the id as non-frozen dataclasses don't hash
+ self._ctrl_queues[id(command)] = queue
+ to_send = self._ctrl_connection.send(command)
+ await self._ctrl_stream.write_and_drain(to_send, timeout)
+ response = await asyncio.wait_for(queue.get(), timeout)
+ if isinstance(response, Exception):
+ raise response
+ else:
+ return response
+
+
+
+ async def data(
+ self,
+ scaled: bool = True,
+ flush_period: Optional[float] = None,
+ frame_timeout: Optional[float] = None,
+ ) -> AsyncGenerator[Data, None]:
+ """Connect to data port and yield data frames
+
+ Args:
+            scaled: Whether to scale and average data frames; scaling reduces
+                throughput
+            flush_period: How often to flush partial data frames; None means
+                flush on every chunk of data from the server
+ frame_timeout: If no data is received for this amount of time, raise
+ `asyncio.TimeoutError`
+ """
+
+ stream = _StreamHelper()
+ connection = DataConnection()
+ queue: asyncio.Queue[Iterable[Data]] = asyncio.Queue()
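+        # Design note: the two producer coroutines defined below
+        # (periodic_flush and read_from_stream) feed this queue, and the
+        # loop at the bottom of this method is the single consumer that
+        # yields Data to the caller.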
+
+ def raise_timeouterror():
+ raise asyncio.TimeoutError(f"No data received for {frame_timeout}s")
+ yield
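+        # Note: the unreachable yield above makes raise_timeouterror() a
+        # generator, so creating it runs no code; the TimeoutError is only
+        # raised when the consumer loop below iterates over it after popping
+        # it from the queue.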
+
+ async def periodic_flush():
+ if flush_period is not None:
+ while True:
+ # Every flush_period seconds flush and queue data
+ await asyncio.sleep(flush_period)
+ queue.put_nowait(connection.flush())
+
+ async def read_from_stream():
+ reader = stream.reader
+ # Should we flush every FrameData?
+ flush_every_frame = flush_period is None
+ while True:
+ try:
+ recv = await asyncio.wait_for(reader.read(4096), frame_timeout)
+ except asyncio.TimeoutError:
+ queue.put_nowait(raise_timeouterror())
+ break
+ else:
+ queue.put_nowait(connection.receive_bytes(recv, flush_every_frame))
+
+ await stream.connect(self._host, 8889)
+ await stream.write_and_drain(connection.connect(scaled))
+ fut = asyncio.gather(periodic_flush(), read_from_stream())
+ try:
+ while True:
+ for data in await queue.get():
+ yield data
+ finally:
+ fut.cancel()
+ await stream.close()
+ with suppress(asyncio.CancelledError):
+ await fut
+
+
+
+import socket
+from collections.abc import Iterable, Iterator
+from typing import Optional, Union, overload
+
+from .commands import Command, T
+from .connections import ControlConnection, DataConnection
+from .responses import Data
+
+# Define the public API of this module
+__all__ = ["BlockingClient"]
+
+
+class _SocketHelper:
+ _socket: Optional[socket.socket] = None
+
+ def connect(self, host: str, port: int):
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((host, port))
+ s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+ self._socket = s
+
+ @property
+ def socket(self) -> socket.socket:
+ assert self._socket, "connect() not called yet"
+ return self._socket
+
+ def close(self):
+ s = self.socket
+ self._socket = None
+ s.shutdown(socket.SHUT_WR)
+ s.close()
+
+
+
+class BlockingClient:
+ """Blocking implementation of a PandABlocks client.
+ For example::
+
+ with BlockingClient("hostname-or-ip") as client:
+ # Control port is now connected
+ resp1, resp2 = client.send([cmd1, cmd2])
+ resp3 = client.send(cmd3)
+ for data in client.data():
+ handle(data)
+ # Control and data ports are now disconnected
+ """
+
+ def __init__(self, host: str):
+ self._host = host
+ self._ctrl_connection = ControlConnection()
+ self._ctrl_socket = _SocketHelper()
+
+
+ def connect(self):
+ """Connect to the control port, and be ready to handle commands"""
+ self._ctrl_socket.connect(self._host, 8888)
+
+
+
+ def close(self):
+ """Close the control connection, and wait for completion"""
+ self._ctrl_socket.close()
+
+
+ def __enter__(self) -> "BlockingClient":
+ self.connect()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ @overload
+ def send(self, commands: Command[T], timeout: Optional[int] = None) -> T: ...
+
+ @overload
+ def send(
+ self, commands: Iterable[Command], timeout: Optional[int] = None
+ ) -> list: ...
+
+
+ def send(
+ self,
+ commands: Union[Command[T], Iterable[Command]],
+ timeout: Optional[int] = None,
+ ):
+        """Send a command to the control port of the PandA, returning its response.
+
+ Args:
+            commands: If a single `Command`, return its response. If a list of
+                commands, return a list of responses
+            timeout: If no response in this time, raise `socket.timeout`
+ """
+ s = self._ctrl_socket.socket
+ s.settimeout(timeout)
+ if isinstance(commands, Command):
+ commands = [commands]
+ else:
+ commands = list(commands)
+ for command in commands:
+ to_send = self._ctrl_connection.send(command)
+ s.sendall(to_send)
+        # Rely on dicts being ordered; Ellipsis is shorthand for "no response yet"
+ cr = {id(command): ... for command in commands}
+ while ... in cr.values():
+ received = s.recv(4096)
+ to_send = self._ctrl_connection.receive_bytes(received)
+ s.sendall(to_send)
+ for command, response in self._ctrl_connection.responses():
+                assert cr[id(command)] is ..., f"Already got response for {command}"
+ cr[id(command)] = response
+ responses = list(cr.values())
+ for response in responses:
+ if isinstance(response, Exception):
+ raise response
+ if len(responses) == 1:
+ return responses[0]
+ else:
+ return responses
+
+
+
+ def data(
+ self, scaled: bool = True, frame_timeout: Optional[int] = None
+ ) -> Iterator[Data]:
+ """Connect to data port and yield data frames
+
+ Args:
+            scaled: Whether to scale and average data frames; scaling reduces
+                throughput
+ frame_timeout: If no data is received for this amount of time, raise
+ `socket.timeout`
+ """
+ data_socket = _SocketHelper()
+ data_socket.connect(self._host, 8889)
+
+ connection = DataConnection()
+ s = data_socket.socket
+ s.settimeout(frame_timeout) # close enough
+ s.sendall(connection.connect(scaled))
+ try:
+ while True:
+ received = s.recv(4096)
+ yield from connection.receive_bytes(received)
+ finally:
+ data_socket.close()
+
+
+
+import logging
+import re
+from collections.abc import Generator
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import (
+ Any,
+ Callable,
+ Generic,
+ Optional,
+ TypeVar,
+ Union,
+ overload,
+)
+
+from ._exchange import Exchange, ExchangeGenerator
+from .responses import (
+ BitMuxFieldInfo,
+ BitOutFieldInfo,
+ BlockInfo,
+ Changes,
+ EnumFieldInfo,
+ ExtOutBitsFieldInfo,
+ ExtOutFieldInfo,
+ FieldInfo,
+ PosMuxFieldInfo,
+ PosOutFieldInfo,
+ ScalarFieldInfo,
+ SubtypeTimeFieldInfo,
+ TableFieldDetails,
+ TableFieldInfo,
+ TimeFieldInfo,
+ UintFieldInfo,
+)
+
+# Define the public API of this module
+__all__ = [
+ "Command",
+ "CommandException",
+ "Raw",
+ "Get",
+ "GetLine",
+ "GetMultiline",
+ "Put",
+ "Append",
+ "Arm",
+ "Disarm",
+ "GetBlockInfo",
+ "GetFieldInfo",
+ "GetPcapBitsLabels",
+ "ChangeGroup",
+ "GetChanges",
+ "GetState",
+ "SetState",
+]
+
+
+T = TypeVar("T")
+T2 = TypeVar("T2")
+T3 = TypeVar("T3")
+T4 = TypeVar("T4")
+
+
+# Checks whether the server will interpret cmd as a table command: search for
+# the first of '?', '=', '<'; if '<' is found first then it's a multiline command.
+MULTILINE_COMMAND = re.compile(r"^[^?=]*<")
+
+
+def is_multiline_command(cmd: str):
+ return MULTILINE_COMMAND.match(cmd) is not None
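+# For example (illustrative):
+#   is_multiline_command("SEQ1.TABLE<")  -> True   ('<' found before '?'/'=')
+#   is_multiline_command("SEQ1.TABLE<<") -> True   (append is also multiline)
+#   is_multiline_command("PCAP.ACTIVE?") -> False  ('?' found first)
+#   is_multiline_command("PCAP.TRIG=X")  -> False  ('=' found first)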
+
+
+
+@dataclass
+class Command(Generic[T]):
+    """Abstract base class for all ControlConnection commands to inherit from"""
+
+ def execute(self) -> ExchangeGenerator[T]:
+ # A generator that sends lines to the PandA, gets lines back, and returns a
+ # response
+ raise NotImplementedError(self)
+
+
+
+
+class CommandException(Exception):
+    """Raised if a `Command` receives a malformed response"""
+
+
+
+# `_execute_commands()` actually returns a list with length equal to the number
+# of tasks passed; however, tuple is used, similarly to the annotation for
+# zip(), because typing does not support variadic type variables. See
+# typeshed PR #1550 for discussion.
+@overload
+def _execute_commands(c1: Command[T]) -> ExchangeGenerator[tuple[T]]: ...
+
+
+@overload
+def _execute_commands(
+ c1: Command[T], c2: Command[T2]
+) -> ExchangeGenerator[tuple[T, T2]]: ...
+
+
+@overload
+def _execute_commands(
+ c1: Command[T], c2: Command[T2], c3: Command[T3]
+) -> ExchangeGenerator[tuple[T, T2, T3]]: ...
+
+
+@overload
+def _execute_commands(
+ c1: Command[T], c2: Command[T2], c3: Command[T3], c4: Command[T4]
+) -> ExchangeGenerator[tuple[T, T2, T3, T4]]: ...
+
+
+@overload
+def _execute_commands(
+ *commands: Command[Any],
+) -> ExchangeGenerator[tuple[Any, ...]]: ...
+
+
+def _execute_commands(*commands):
+ """Call the `Command.execute` method on each of the commands to produce
+ some `Exchange` generators, which are yielded back to the connection,
+ then zip together the responses to those exchanges into a tuple"""
+ # If we add type annotations to this function then mypy complains:
+ # Overloaded function implementation does not accept all possible arguments
+ # As we want to type check this, we put the logic in _zip_with_return
+ ret = yield from _zip_with_return([command.execute() for command in commands])
+ return ret
+
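+# A sketch of how Command.execute() generators use this helper (the field
+# names here are illustrative, not real PandA fields):
+#
+#     desc, maximum = yield from _execute_commands(
+#         GetLine("*DESC.PULSE.DELAY"),
+#         GetLine("PULSE1.DELAY.MAX"),
+#     )
+#
+# Both exchanges are yielded to the connection together, so they complete in
+# a single round trip and their responses come back zipped into a tuple.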
+
+def _zip_with_return(
+ generators: list[ExchangeGenerator[Any]],
+) -> ExchangeGenerator[tuple[Any, ...]]:
+    # Sentinel to show which generators are not yet exhausted
+ pending = object()
+ returns = [pending] * len(generators)
+ while True:
+ yields: list[Exchange] = []
+ for i, gen in enumerate(generators):
+ # If we haven't exhausted the generator
+ if returns[i] is pending:
+ try:
+ # Get the exchanges that it wants to fill in
+ exchanges = next(gen)
+ except StopIteration as e:
+ # Generator is exhausted, store its return value
+ returns[i] = e.value
+ else:
+ # Add the exchanges to the list
+ if isinstance(exchanges, list):
+ yields += exchanges
+ else:
+ yields.append(exchanges)
+ if yields:
+ # There were some Exchanges yielded, so yield them all up
+ # for the Connection to fill in
+ yield yields
+ else:
+ # All the generators are exhausted, so return the tuple of all
+ # their return values
+ return tuple(returns)
+
+
+
+@dataclass
+class Raw(Command[list[str]]):
+ """Send a raw command
+
+ Args:
+ inp: The input lines to send
+
+ For example::
+
+ Raw(["PCAP.ACTIVE?"]) -> ["OK =1"]
+ Raw(["SEQ1.TABLE>", "1", "1", "0", "0", ""]) -> ["OK"]
+        Raw(["SEQ1.TABLE?"]) -> ["!1", "!1", "!0", "!0", "."]
+ """
+
+ inp: list[str]
+
+ def execute(self) -> ExchangeGenerator[list[str]]:
+ ex = Exchange(self.inp)
+ yield ex
+ return ex.received
+
+
+
+
+@dataclass
+class Get(Command[Union[str, list[str]]]):
+ """Get the value of a field or star command.
+
+ If the form of the expected return is known, consider using `GetLine`
+ or `GetMultiline` instead.
+
+ Args:
+ field: The field, attribute, or star command to get
+
+ For example::
+
+ Get("PCAP.ACTIVE") -> "1"
+ Get("SEQ1.TABLE") -> ["1048576", "0", "1000", "1000"]
+ Get("*IDN") -> "PandA 1.1..."
+ """
+
+ field: str
+
+ def execute(self) -> ExchangeGenerator[Union[str, list[str]]]:
+ ex = Exchange(f"{self.field}?")
+ yield ex
+ if ex.is_multiline:
+ return ex.multiline
+ else:
+ return ex.line
+
+
+
+
+@dataclass
+class GetLine(Command[str]):
+ """Get the value of a field or star command, when the result is expected to be a
+ single line.
+
+ Args:
+ field: The field, attribute, or star command to get
+
+ For example::
+
+ GetLine("PCAP.ACTIVE") -> "1"
+ GetLine("*IDN") -> "PandA 1.1..."
+ """
+
+ field: str
+
+ def execute(self) -> ExchangeGenerator[str]:
+ ex = Exchange(f"{self.field}?")
+ yield ex
+ return ex.line
+
+
+
+
+@dataclass
+class GetMultiline(Command[list[str]]):
+ """Get the value of a field or star command, when the result is expected to be a
+ multiline response.
+
+ Args:
+ field: The field, attribute, or star command to get
+
+ For example::
+
+ GetMultiline("SEQ1.TABLE") -> ["1048576", "0", "1000", "1000"]
+ GetMultiline("*METADATA.*") -> ["LABEL_FILTER1", "APPNAME", ...]
+ """
+
+ field: str
+
+ def execute(self) -> ExchangeGenerator[list[str]]:
+ ex = Exchange(f"{self.field}?")
+ yield ex
+ return ex.multiline
+
+
+
+
+@dataclass
+class Put(Command[None]):
+ """Put the value of a field.
+
+ Args:
+ field: The field, attribute, or star command to put
+ value: The value, either string or list of strings or empty, to put
+
+ For example::
+
+ Put("PCAP.TRIG", "PULSE1.OUT")
+ Put("SEQ1.TABLE", ["1048576", "0", "1000", "1000"])
+ Put("SFP3_SYNC_IN1.SYNC_RESET")
+ """
+
+ field: str
+ value: Union[str, list[str]] = ""
+
+ def execute(self) -> ExchangeGenerator[None]:
+ if isinstance(self.value, list):
+ # Multiline table with blank line to terminate
+ ex = Exchange([f"{self.field}<"] + self.value + [""])
+ else:
+ ex = Exchange(f"{self.field}={self.value}")
+ yield ex
+ ex.check_ok()
+
+
+
+
+@dataclass
+class Append(Command[None]):
+ """Append the value of a table field.
+
+ Args:
+ field: The field, attribute, or star command to append
+ value: The value, list of strings, to append
+
+ For example::
+
+ Append("SEQ1.TABLE", ["1048576", "0", "1000", "1000"])
+ """
+
+ field: str
+ value: list[str]
+
+ def execute(self) -> ExchangeGenerator[None]:
+ # Multiline table with blank line to terminate
+ ex = Exchange([f"{self.field}<<"] + self.value + [""])
+ yield ex
+ ex.check_ok()
+
+
+
+
+class Arm(Command[None]):
+ """Arm PCAP for an acquisition by sending ``*PCAP.ARM=``"""
+
+ def execute(self) -> ExchangeGenerator[None]:
+ ex = Exchange("*PCAP.ARM=")
+ yield ex
+ ex.check_ok()
+
+
+
+
+class Disarm(Command[None]):
+ """Disarm PCAP, stopping acquisition by sending ``*PCAP.DISARM=``"""
+
+ def execute(self) -> ExchangeGenerator[None]:
+ ex = Exchange("*PCAP.DISARM=")
+ yield ex
+ ex.check_ok()
+
+
+
+
+@dataclass
+class GetBlockInfo(Command[dict[str, BlockInfo]]):
+ """Get the name, number, and description of each block type
+ in a dictionary, alphabetically ordered
+
+ Args:
+ skip_description: If `True`, prevents retrieving the description
+ for each Block. This will reduce network calls.
+
+ For example::
+
+ GetBlockInfo() ->
+ {
+ "LUT": BlockInfo(number=8, description="Lookup table"),
+ "PCAP": BlockInfo(number=1, description="Position capture control"),
+ ...
+ }
+ """
+
+ skip_description: bool = False
+
+ def execute(self) -> ExchangeGenerator[dict[str, BlockInfo]]:
+ ex = Exchange("*BLOCKS?")
+ yield ex
+
+ blocks_list, commands = [], []
+ for line in ex.multiline:
+ block, num = line.split()
+ blocks_list.append((block, int(num)))
+ commands.append(GetLine(f"*DESC.{block}"))
+
+ if self.skip_description:
+ # Must use tuple() to match type returned by _execute_commands
+ description_values = tuple(None for _ in commands)
+ else:
+ description_values = yield from _execute_commands(*commands)
+
+ block_infos = {
+ block: BlockInfo(number=num, description=desc)
+ for (block, num), desc in sorted(zip(blocks_list, description_values))
+ }
+
+ return block_infos
+
+
+
+# The type of the generators used for creating the Get commands for each field
+# and setting the returned data into the FieldInfo structure
+_FieldGeneratorType = Generator[
+ Union[Exchange, list[Exchange]],
+ None,
+ tuple[str, FieldInfo],
+]
+
+
+
+@dataclass
+class GetFieldInfo(Command[dict[str, FieldInfo]]):
+ """Get the fields of a block, returning a `FieldInfo` (or appropriate subclass) for
+ each one, ordered to match the definition order in the PandA
+
+ Args:
+ block: The name of the block type
+ extended_metadata: If `True`, retrieves detailed metadata about a field and
+ all of its attributes. This will cause an additional network round trip.
+ If `False` only the field names and types will be returned. Default `True`.
+
+ For example::
+
+ GetFieldInfo("LUT") -> {
+ "INPA":
+ BitMuxFieldInfo(type='bit_mux',
+ subtype=None,
+ description='Input A',
+                            max_delay=5,
+                            labels=['TTLIN1.VAL', 'TTLIN2.VAL', ...]),
+ ...}
+ """
+
+ block: str
+ extended_metadata: bool = True
+
+ _commands_map: dict[
+ tuple[str, Optional[str]],
+ Callable[
+ [str, str, Optional[str]],
+ _FieldGeneratorType,
+ ],
+ ] = field(init=False, repr=False, default_factory=dict)
+
+ def __post_init__(self):
+ # Map a (type, subtype) to a method that returns the appropriate
+        # subclass of FieldInfo, and a list of all the Commands to request.
+ # Note that fields that do not have additional attributes are not listed.
+ self._commands_map = {
+ # Order matches that of PandA server's Field Types docs
+ ("time", None): self._time,
+ ("bit_out", None): self._bit_out,
+ ("pos_out", None): self._pos_out,
+ ("ext_out", "timestamp"): self._ext_out,
+ ("ext_out", "samples"): self._ext_out,
+ ("ext_out", "bits"): self._ext_out_bits,
+ ("bit_mux", None): self._bit_mux,
+ ("pos_mux", None): self._pos_mux,
+ ("table", None): self._table,
+ ("param", "uint"): self._uint,
+ ("read", "uint"): self._uint,
+ ("write", "uint"): self._uint,
+ ("param", "int"): self._no_attributes,
+ ("read", "int"): self._no_attributes,
+ ("write", "int"): self._no_attributes,
+ ("param", "scalar"): self._scalar,
+ ("read", "scalar"): self._scalar,
+ ("write", "scalar"): self._scalar,
+ ("param", "bit"): self._no_attributes,
+ ("read", "bit"): self._no_attributes,
+ ("write", "bit"): self._no_attributes,
+ ("param", "action"): self._no_attributes,
+ ("read", "action"): self._no_attributes,
+ ("write", "action"): self._no_attributes,
+ ("param", "lut"): self._no_attributes,
+ ("read", "lut"): self._no_attributes,
+ ("write", "lut"): self._no_attributes,
+ ("param", "enum"): self._enum,
+ ("read", "enum"): self._enum,
+ ("write", "enum"): self._enum,
+ ("param", "time"): self._subtype_time,
+ ("read", "time"): self._subtype_time,
+ ("write", "time"): self._subtype_time,
+ }
+
+ def _get_desc(self, field_name: str) -> GetLine:
+ """Create the Command to retrieve the description"""
+ return GetLine(f"*DESC.{self.block}.{field_name}")
+
+ def _uint(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, maximum = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetLine(f"{self.block}1.{field_name}.MAX"),
+ )
+
+ field_info = UintFieldInfo(field_type, field_subtype, desc, int(maximum))
+ return field_name, field_info
+
+ def _scalar(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, units, scale, offset = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetLine(f"{self.block}.{field_name}.UNITS"),
+ GetLine(f"{self.block}.{field_name}.SCALE"),
+ GetLine(f"{self.block}.{field_name}.OFFSET"),
+ )
+
+ field_info = ScalarFieldInfo(
+ field_type, field_subtype, desc, units, float(scale), int(offset)
+ )
+
+ return field_name, field_info
+
+ def _subtype_time(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, units_labels = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}.UNITS"),
+ )
+
+ field_info = SubtypeTimeFieldInfo(field_type, field_subtype, desc, units_labels)
+
+ return field_name, field_info
+
+ def _enum(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, labels = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}"),
+ )
+
+ field_info = EnumFieldInfo(field_type, field_subtype, desc, labels)
+
+ return field_name, field_info
+
+ def _time(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, units = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}.UNITS"),
+ )
+
+ field_info = TimeFieldInfo(field_type, field_subtype, desc, units)
+
+ return field_name, field_info
+
+ def _bit_out(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, capture_word, offset = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetLine(f"{self.block}1.{field_name}.CAPTURE_WORD"),
+ GetLine(f"{self.block}1.{field_name}.OFFSET"),
+ )
+
+ field_info = BitOutFieldInfo(
+ field_type, field_subtype, desc, capture_word, int(offset)
+ )
+
+ return field_name, field_info
+
+ def _bit_mux(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, max_delay, labels = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetLine(f"{self.block}1.{field_name}.MAX_DELAY"),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}"),
+ )
+
+ field_info = BitMuxFieldInfo(
+ field_type, field_subtype, desc, int(max_delay), labels
+ )
+
+ return field_name, field_info
+
+ def _pos_mux(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, labels = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}"),
+ )
+ field_info = PosMuxFieldInfo(field_type, field_subtype, desc, labels)
+
+ return field_name, field_info
+
+ def _table(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ # Ignore the ROW_WORDS attribute as it's new and won't be present on all PandAs,
+ # and there's no easy way to try it and catch an error while also running other
+ # Get commands at the same time
+ table_desc, max_length, fields = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetLine(f"{self.block}1.{field_name}.MAX_LENGTH"),
+ GetMultiline(f"{self.block}1.{field_name}.FIELDS"),
+ )
+
+ # Keep track of highest bit index
+ max_bit_offset: int = 0
+
+ desc_gets: list[GetLine] = []
+ enum_field_gets: list[GetMultiline] = []
+ enum_field_names: list[str] = []
+ fields_dict: dict[str, TableFieldDetails] = {}
+ for field_details in fields:
+ # Fields are of the form <bit_high>:<bit_low> <name> <subtype>
+ bit_range, name, subtype = field_details.split()
+ bit_high_str, bit_low_str = bit_range.split(":")
+ bit_high = int(bit_high_str)
+ bit_low = int(bit_low_str)
+
+ if bit_high > max_bit_offset:
+ max_bit_offset = bit_high
+
+ if subtype == "enum":
+ enum_field_gets.append(
+ GetMultiline(f"*ENUMS.{self.block}1.{field_name}[].{name}")
+ )
+ enum_field_names.append(name)
+
+ fields_dict[name] = TableFieldDetails(subtype, bit_low, bit_high)
+
+ desc_gets.append(GetLine(f"*DESC.{self.block}1.{field_name}[].{name}"))
+
+        # Calculate the number of 32-bit words that comprise one table row
+ row_words = max_bit_offset // 32 + 1
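+        # Worked example: if the highest field occupies bits 95:64 then
+        # max_bit_offset = 95 and 95 // 32 + 1 = 3 words (12 bytes) per row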
+
+        # The first len(enum_field_gets) items are enum labels, type list[str]
+        # The remaining items are descriptions, type str
+ labels_and_descriptions = yield from _execute_commands(
+ *enum_field_gets, *desc_gets
+ )
+
+ for name, labels in zip(
+ enum_field_names, labels_and_descriptions[: len(enum_field_gets)]
+ ):
+ fields_dict[name].labels = labels
+
+ for name, desc in zip(
+ fields_dict.keys(), labels_and_descriptions[len(enum_field_gets) :]
+ ):
+ fields_dict[name].description = desc
+
+ field_info = TableFieldInfo(
+ field_type,
+ field_subtype,
+ table_desc,
+ int(max_length),
+ fields_dict,
+ row_words,
+ )
+
+ return field_name, field_info
+
+ def _pos_out(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, capture_labels = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}.CAPTURE"),
+ )
+
+ field_info = PosOutFieldInfo(field_type, field_subtype, desc, capture_labels)
+ return field_name, field_info
+
+ def _ext_out(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, capture_labels = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}.CAPTURE"),
+ )
+
+ field_info = ExtOutFieldInfo(field_type, field_subtype, desc, capture_labels)
+ return field_name, field_info
+
+ def _ext_out_bits(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+ desc, bits, capture_labels = yield from _execute_commands(
+ self._get_desc(field_name),
+ GetMultiline(f"{self.block}.{field_name}.BITS"),
+ GetMultiline(f"*ENUMS.{self.block}.{field_name}.CAPTURE"),
+ )
+ field_info = ExtOutBitsFieldInfo(
+ field_type, field_subtype, desc, capture_labels, bits
+ )
+ return field_name, field_info
+
+ def _no_attributes(
+ self, field_name: str, field_type: str, field_subtype: Optional[str]
+ ) -> _FieldGeneratorType:
+        """Calling this method indicates the type-subtype pair is known and
+ has no attributes, so only a description needs to be retrieved"""
+ desc = yield from self._get_desc(field_name).execute()
+
+ return field_name, FieldInfo(field_type, field_subtype, desc)
+
+ def execute(self) -> ExchangeGenerator[dict[str, FieldInfo]]:
+ ex = Exchange(f"{self.block}.*?")
+ yield ex
+ unsorted: dict[int, tuple[str, FieldInfo]] = {}
+ field_generators: list[ExchangeGenerator] = []
+
+ for line in ex.multiline:
+ field_name, index, type_subtype = line.split(maxsplit=2)
+
+ field_type: str
+ subtype: Optional[str]
+            # Append None to the list below so there are always at least 2
+            # elements, letting us always unpack into subtype even if no
+            # split occurs.
+ field_type, subtype, *_ = [*type_subtype.split(maxsplit=1), None]
+
+ # Always create default FieldInfo. If necessary we will replace it later
+ # with a more type-specific version.
+ field_info = FieldInfo(field_type, subtype, None)
+
+ if self.extended_metadata:
+ try:
+ # Construct the list of type-specific generators
+ field_generators.append(
+ self._commands_map[(field_type, subtype)](
+ field_name, field_type, subtype
+ )
+ )
+
+ except KeyError:
+ # This exception will be hit if PandA ever defines new types
+ logging.exception(
+ f"Unknown type {(field_type, subtype)} detected for "
+ f"{field_name}, cannot retrieve extended information for it."
+ )
+ # We can assume the new field will have a description though
+ field_generators.append(
+ self._no_attributes(field_name, field_type, subtype)
+ )
+
+ # Keep track of order of fields as returned by PandA. Important for later
+ # matching descriptions back to their field.
+ unsorted[int(index)] = (field_name, field_info)
+
+ # Dict keeps insertion order, so insert in the order the server said
+ fields = {name: field for _, (name, field) in sorted(unsorted.items())}
+
+ if self.extended_metadata is False:
+ # Asked to not perform the requests for extra metadata.
+ return fields
+
+ field_name_info: tuple[tuple[str, FieldInfo], ...]
+ field_name_info = yield from _zip_with_return(field_generators)
+
+ fields.update(field_name_info)
+
+ return fields
+
+
+
+
+class GetPcapBitsLabels(Command):
+ """Get the labels for the bit fields in PCAP.
+
+ For example::
+
+ GetPcapBitsLabels() -> {"BITS0" : ["TTLIN1.VAL", "TTLIN2.VAL", ...], ...}
+ """
+
+ def execute(self) -> ExchangeGenerator[dict[str, list[str]]]:
+ ex = Exchange("PCAP.*?")
+ yield ex
+ bits_fields = []
+ for line in ex.multiline:
+ split = line.split()
+ if len(split) == 4:
+ field_name, _, field_type, field_subtype = split
+
+ if field_type == "ext_out" and field_subtype == "bits":
+ bits_fields.append(f"PCAP.{field_name}")
+
+ exchanges = [Exchange(f"{field}.BITS?") for field in bits_fields]
+ yield exchanges
+ bits = {field: ex.multiline for field, ex in zip(bits_fields, exchanges)}
+ return bits
+
+
+
+
+class ChangeGroup(Enum):
+ """Which group of values to ask for ``*CHANGES`` on:
+ https://pandablocks-server.readthedocs.io/en/latest/commands.html#system-commands
+ """
+
+ #: All the groups below
+ ALL = ""
+ #: Configuration settings
+ CONFIG = ".CONFIG"
+ #: Bits on the system bus
+ BITS = ".BITS"
+ #: Positions
+ POSN = ".POSN"
+ #: Polled read values
+ READ = ".READ"
+    #: Attributes (including capture enable flags)
+ ATTR = ".ATTR"
+ #: Table changes
+ TABLE = ".TABLE"
+    #: Metadata changes
+ METADATA = ".METADATA"
+
+
+
+
+@dataclass
+class GetChanges(Command[Changes]):
+ """Get a `Changes` object showing which fields have changed since the last
+ time this was called
+
+ Args:
+ group: Restrict to a particular `ChangeGroup`
+ get_multiline: If `True`, return values of multiline fields in the
+ `multiline_values` attribute. Note that this will invoke additional network
+ requests.
+ If `False` these fields will instead be returned in the `no_value`
+ attribute. Default value is `False`.
+
+ For example::
+
+ GetChanges() -> Changes(
+            values={"PCAP.TRIG": "PULSE1.OUT"},
+ no_value=["SEQ1.TABLE"],
+ in_error=["BAD.ENUM"],
+ multiline_values={}
+ )
+
+ GetChanges(ChangeGroup.ALL, True) -> Changes(
+ values={"PCAP.TRIG": "PULSE1.OUT"},
+ no_value=[],
+ in_error=["BAD.ENUM"],
+ multiline_values={"SEQ1.TABLE" : ["1", "2", "3",...]}
+ )
+ """
+
+ group: ChangeGroup = ChangeGroup.ALL
+ get_multiline: bool = False
+
+ def execute(self) -> ExchangeGenerator[Changes]:
+ ex = Exchange(f"*CHANGES{self.group.value}?")
+ yield ex
+ changes = Changes({}, [], [], {})
+ multivalue_get_commands: list[tuple[str, GetMultiline]] = []
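+        # Illustrative *CHANGES lines and how they are classified below:
+        #   "PCAP.TRIG=PULSE1.OUT" -> values (single line value)
+        #   "SEQ1.TABLE<"          -> no_value (or fetched if get_multiline)
+        #   "BAD.ENUM (error)"     -> in_error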
+ for line in ex.multiline:
+ if line[-1] == "<":
+ if self.get_multiline:
+ field = line[0:-1]
+ multivalue_get_commands.append((field, GetMultiline(field)))
+ else:
+ changes.no_value.append(line[:-1])
+
+ elif line.endswith("(error)"):
+ changes.in_error.append(line.split(" ", 1)[0])
+ else:
+ field, value = line.split("=", maxsplit=1)
+ changes.values[field] = value
+
+ if self.get_multiline:
+ multiline_vals = yield from _execute_commands(
+ *[item[1] for item in multivalue_get_commands]
+ )
+
+ for field, value in zip(
+ [item[0] for item in multivalue_get_commands], multiline_vals
+ ):
+ assert isinstance(value, list)
+ changes.multiline_values[field] = value
+
+ return changes
+
+
+
+
+@dataclass
+class GetState(Command[list[str]]):
+ """Get the state of all the fields in a PandA that should be saved as a
+ list of raw lines that could be sent with `SetState`.
+
+ NOTE: `GetState` may behave unexpectedly if `GetChanges` has previously been
+ called using the same client. The caller should use separate clients to avoid
+ potential issues.
+
+ For example::
+
+ GetState() -> [
+            "SEQ1.TABLE<B",
+            "234fds0SDklnmnr",
+            "",
+ "PCAP.TRIG=PULSE1.OUT",
+ ]
+ """
+
+ def execute(self) -> ExchangeGenerator[list[str]]:
+ # TODO: explain in detail how this works
+ # See: references/how-it-works
+ attr, config, table, metadata = yield from _execute_commands(
+ GetChanges(ChangeGroup.ATTR),
+ GetChanges(ChangeGroup.CONFIG),
+ GetChanges(ChangeGroup.TABLE),
+ GetChanges(ChangeGroup.METADATA),
+ )
+ # Add the single line values
+ line_values = dict(**attr.values, **config.values, **metadata.values)
+ state = [f"{k}={v}" for k, v in line_values.items()]
+ # Get the multiline values
+ multiline_keys, commands = [], []
+ for field_name in table.no_value:
+ # Get tables as base64
+ multiline_keys.append(f"{field_name}<B")
+ commands.append(GetMultiline(f"{field_name}.B"))
+ for field_name in metadata.no_value:
+ # Get metadata as string list
+ multiline_keys.append(f"{field_name}<")
+ commands.append(GetMultiline(f"{field_name}"))
+ multiline_values = yield from _execute_commands(*commands)
+ for k, v in zip(multiline_keys, multiline_values):
+ state += [k] + v + [""]
+ return state
+
+
+
+
+@dataclass
+class SetState(Command[None]):
+ """Set the state of all the fields in a PandA
+
+ Args:
+ state: A list of raw lines as produced by `GetState`
+
+ For example::
+
+ SetState([
+            "SEQ1.TABLE<B",
+            "234fds0SDklnmnr",
+            "",
+ "PCAP.TRIG=PULSE1.OUT",
+ ])
+ """
+
+ state: list[str]
+
+ def execute(self) -> ExchangeGenerator[None]:
+ commands: list[Raw] = []
+ command_lines: list[str] = []
+ for line in self.state:
+ command_lines.append(line)
+ first_line = len(command_lines) == 1
+ if (first_line and not is_multiline_command(line)) or not line:
+ # If not a multiline command
+ # Or blank line at the end of a multiline command
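+                # e.g. "PCAP.TRIG=PULSE1.OUT" forms a single-line Raw command,
+                # while "SEQ1.TABLE<B" starts one that is grouped up to and
+                # including the next blank line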
+ commands.append(Raw(command_lines))
+ command_lines = []
+ returns = yield from _execute_commands(*commands)
+ for command, ret in zip(commands, returns):
+ if ret != ["OK"]:
+ logging.warning(f"command {command.inp} failed with {ret}")
+
+
+import struct
+import sys
+import xml.etree.ElementTree as ET
+from collections import deque
+from collections.abc import Iterator
+from dataclasses import dataclass
+from typing import Any, Callable, Optional
+
+import numpy as np
+
+from ._exchange import Exchange, ExchangeGenerator, Exchanges
+from .commands import Command, CommandException
+from .responses import (
+ Data,
+ EndData,
+ EndReason,
+ FieldCapture,
+ FrameData,
+ ReadyData,
+ StartData,
+)
+
+# Define the public API of this module
+__all__ = [
+ "NeedMoreData",
+ "NoContextAvailable",
+ "Buffer",
+ "ControlConnection",
+ "DataConnection",
+]
+
+# The names of the samples fields used for averaging unscaled fields.
+# In newer versions it's GATE_DURATION, but we keep SAMPLES for backwards
+# compatibility
+GATE_DURATION_FIELD = "PCAP.GATE_DURATION.Value"
+SAMPLES_FIELD = "PCAP.SAMPLES.Value"
+
+
+
+class NeedMoreData(Exception):
+ """Raised if the `Buffer` isn't full enough to return the requested bytes"""
+
+
+
+
+class NoContextAvailable(Exception):
+ """Raised if there were no contexts available for this connection.
+ This may result from calling `ControlConnection.receive_bytes()` without calling
+ `ControlConnection.send()`, or if there were unmatched sends/receives"""
+
+
+
+
+class Buffer:
+ """Byte storage that provides line reader and bytes reader interfaces.
+ For example::
+
+ buf = Buffer()
+ buf += bytes_from_server
+ line = buf.read_line() # raises NeedMoreData if no line
+ for line in buf:
+ pass
+ bytes = buf.read_bytes(50) # raises NeedMoreData if not enough bytes
+ """
+
+ def __init__(self):
+ self._buf = bytearray()
+
+ def __iadd__(self, byteslike: bytes):
+ """Add some data from the server"""
+ self._buf += byteslike
+ return self
+
+ def _extract_frame(self, num_to_extract, num_to_discard=0) -> bytearray:
+ # extract num_to_extract bytes from the start of the buffer
+ frame = self._buf[:num_to_extract]
+ # Update the buffer in place, to take advantage of bytearray's
+ # optimized delete-from-beginning feature.
+ del self._buf[: num_to_extract + num_to_discard]
+ return frame
+
+
+ def read_bytes(self, num: int) -> bytearray:
+ """Read and pop num bytes from the beginning of the buffer, raising
+ `NeedMoreData` if the buffer isn't full enough to do so"""
+ if num > len(self._buf):
+ raise NeedMoreData()
+ else:
+ return self._extract_frame(num)
+
+
+
+ def peek_bytes(self, num: int) -> bytearray:
+ """Read but do not pop num bytes from the beginning of the buffer,
+ raising `NeedMoreData` if the buffer isn't full enough to do so"""
+ if num > len(self._buf):
+ raise NeedMoreData()
+ else:
+ return self._buf[:num]
+
+
+
+ def read_line(self):
+ """Read and pop a newline terminated line (without terminator)
+ from the beginning of the buffer, raising `NeedMoreData` if the
+ buffer isn't full enough to do so"""
+ idx = self._buf.find(b"\n")
+ if idx < 0:
+ raise NeedMoreData()
+ else:
+ return self._extract_frame(idx, num_to_discard=1)
+
+
+ def __iter__(self):
+ return self
+
+ def __next__(self) -> bytes:
+ try:
+ return self.read_line()
+ except NeedMoreData as err:
+ raise StopIteration() from err
+
+
+
+@dataclass
+class _ExchangeContext:
+ #: The exchange we should be filling
+ exchange: Exchange
+ #: The command that produced it
+ command: Command
+ #: If this was the last in the list, the generator to call next
+ generator: Optional[ExchangeGenerator[Any]] = None
+
+ def exception(self, e: Exception) -> CommandException:
+ msg = f"{self.command} raised error:\n{type(e).__name__}: {e}"
+ return CommandException(msg).with_traceback(e.__traceback__)
+
+
+
+class ControlConnection:
+ """Sans-IO connection to control port of PandA TCP server, supporting a
+ Command based interface. For example::
+
+ cc = ControlConnection()
+ # Connection says what bytes should be sent to execute command
+ to_send = cc.send(command)
+ socket.sendall(to_send)
+ while True:
+ # Repeatedly process bytes from the PandA
+ received = socket.recv()
+        # Send any subsequent bytes back to the PandA
+ to_send = cc.receive_bytes(received)
+ socket.sendall(to_send)
+ # And processing the produced responses
+        for command, response in cc.responses():
+ do_something_with(response)
+ """
+
+ def __init__(self) -> None:
+ self._buf = Buffer()
+ self._lines: list[str] = []
+ self._contexts: deque[_ExchangeContext] = deque()
+ self._responses: deque[tuple[Command, Any]] = deque()
+
+ def _update_contexts(self, lines: list[str], is_multiline=False) -> bytes:
+ to_send = b""
+ if len(self._contexts) == 0:
+ raise NoContextAvailable()
+ context = self._contexts.popleft()
+ # Update the exchange with what we've got
+ context.exchange.received = lines
+ context.exchange.is_multiline = is_multiline
+ # If we're given a generator to run then do so
+ if context.generator:
+ try:
+ # Return the bytes from sending the next bit of the command
+ exchanges = next(context.generator)
+ except StopIteration as e:
+ # Command complete, store the result
+ self._responses.append((context.command, e.value))
+ except Exception as e:
+ # Command failed, store an exception
+ self._responses.append((context.command, context.exception(e)))
+ else:
+ to_send = b"".join(
+ self._bytes_from_exchanges(
+ exchanges, context.command, context.generator
+ )
+ )
+ return to_send
+
+ def _bytes_from_exchanges(
+ self, exchanges: Exchanges, command: Command, generator: ExchangeGenerator[Any]
+ ) -> Iterator[bytes]:
+ if not isinstance(exchanges, list):
+ exchanges = [exchanges]
+        # No Exchanges when a Command's yield is empty, e.g. unexpected or
+        # unparseable data received from the PandA
+ if len(exchanges) == 0:
+ return
+ for ex in exchanges:
+ context = _ExchangeContext(ex, command)
+ self._contexts.append(context)
+ text = "\n".join(ex.to_send) + "\n"
+ yield text.encode()
+ # The last exchange gets the generator so it triggers the next thing to send
+ context.generator = generator
+
+
+ def receive_bytes(self, received: bytes) -> bytes:
+ """Tell the connection that you have received some bytes off the network.
+ Parse these into high level responses which are yielded back by `responses`.
+ Return any bytes to send back"""
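+        # Wire format reminder: a single line response arrives as-is
+        # (e.g. "OK =1"), while a multiline response arrives as "!" prefixed
+        # lines terminated by a lone "." (see the Raw examples in commands)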
+ self._buf += received
+ is_multiline = bool(self._lines)
+ to_send = b""
+ for line_b in self._buf:
+ line = line_b.decode()
+ if not is_multiline:
+ # Check if we need to switch to multiline mode
+ is_multiline = line.startswith("!") or line == "."
+ if is_multiline:
+ # Add a new line to the buffer
+ self._lines.append(line)
+ if line == ".":
+ # End of multiline mode, return what we've got
+ to_send += self._update_contexts(self._lines, is_multiline)
+ self._lines = []
+ is_multiline = False
+ else:
+ # Check a correctly formatted response
+ assert line.startswith(
+ "!"
+ ), f"Multiline response {line} doesn't start with !"
+ else:
+ # Single line mode
+ assert (
+ not self._lines
+ ), f"Multiline response {self._lines} not terminated"
+ to_send += self._update_contexts([line])
+ return to_send
+
+
+
+ def responses(self) -> Iterator[tuple[Command, Any]]:
+ """Get the (command, response) tuples generated as part of the last
+ receive_bytes"""
+ while self._responses:
+ yield self._responses.popleft()
+
+
+
+ def send(self, command: Command) -> bytes:
+ """Tell the connection you want to send an event, and it will return
+ some bytes to send down the network
+ """
+ # If not given a partially run generator, start one here
+ generator = command.execute()
+ exchanges = next(generator)
+ to_send = b"".join(self._bytes_from_exchanges(exchanges, command, generator))
+ return to_send
+
+
+
+
+
+class DataConnection:
+    """Sans-IO connection to data port of PandA TCP server, supporting a
+ flushable iterator interface. For example::
+
+ dc = DataConnection()
+ # Single connection string to send
+        to_send = dc.connect(scaled=True)
+ socket.sendall(to_send)
+ while True:
+ # Repeatedly process bytes from the PandA looking for data
+ received = socket.recv()
+ for data in dc.receive_bytes(received):
+ do_something_with(data)
+ """
+
+ def __init__(self) -> None:
+ # TODO: could support big endian, but are there any systems out there?
+ assert sys.byteorder == "little", "PandA sends data little endian"
+ # Store of bytes received so far to parse in the handlers
+ self._buf = Buffer()
+ # Header text from PandA with field info
+ self._header = ""
+ # The next parsing handler that should be called if there is data in buffer
+ self._next_handler: Optional[Callable[[], Optional[Iterator[Data]]]] = None
+ # numpy dtype of produced FrameData
+ self._frame_dtype = None
+ # frame data that has been received but not flushed yet
+ self._partial_data = bytearray()
+ # whether to flush after every frame
+ self._flush_every_frame = False
+
+ def _handle_connected(self):
+ # Get the response from connect()
+ line = self._buf.read_line()
+ assert line == b"OK", f"Expected OK, got {line!r}"
+ yield ReadyData()
+ self._next_handler = self._handle_header_start
+
+ def _handle_header_start(self):
+ # Discard lines until we see header start tag
+ line = self._buf.read_line()
+ if line == b"<header>":
+ self._header = line
+ self._next_handler = self._handle_header_body
+
+ def _handle_header_body(self):
+        # Accumulate header until the end tag, then parse and return
+ line = self._buf.read_line()
+ self._header += line
+ if line == b"</header>":
+ fields = []
+ root = ET.fromstring(self._header)
+
+ for field in root.find("fields"):
+ fields.append(
+ FieldCapture(
+ name=str(field.get("name")),
+ type=np.dtype(field.get("type")),
+ capture=str(field.get("capture")),
+ scale=float(scale)
+ if (scale := field.get("scale")) is not None
+ else None,
+ offset=float(offset)
+ if (offset := field.get("offset")) is not None
+ else None,
+ units=str(units)
+ if (units := field.get("units")) is not None
+ else None,
+ )
+ )
+ data = root.find("data")
+ sample_bytes = int(data.get("sample_bytes"))
+ if sample_bytes - sum(f.type.itemsize for f in fields) == 4:
+                # In raw mode with panda-server < 2.1 the samples field wasn't
+                # listed in the header unless specifically requested, but was
+                # still sent in the data
+ name, capture = SAMPLES_FIELD.rsplit(".", maxsplit=1)
+ fields.insert(
+ 0,
+ FieldCapture(
+ name=name,
+ type=np.dtype("uint32"),
+ capture=capture,
+ scale=None,
+ offset=None,
+ units=None,
+ ),
+ )
+ self._frame_dtype = np.dtype(
+ [(f"{f.name}.{f.capture}", f.type) for f in fields]
+ )
+
+ try:
+ hw_time_offset_ns = np.int64(data.get("hw_time_offset_ns", ""))
+ except ValueError:
+ hw_time_offset_ns = None
+
+ yield StartData(
+ fields=fields,
+ missed=int(data.get("missed")),
+ process=str(data.get("process")),
+ format=str(data.get("format")),
+ sample_bytes=sample_bytes,
+ arm_time=data.get("arm_time", None),
+ start_time=data.get("start_time", None),
+ hw_time_offset_ns=hw_time_offset_ns,
+ )
+ self._next_handler = self._handle_header_end
+
+ def _handle_header_end(self):
+ # Discard the newline at the end of the header
+ assert self._buf.read_bytes(1) == b"\n", "Expected newline at end of header"
+ self._next_handler = self._handle_data_start
+
+ def _handle_data_start(self):
+ # Handle "BIN " or "END "
+ bytes = self._buf.read_bytes(4)
+ if bytes == b"BIN ":
+ self._next_handler = self._handle_data_frame
+ elif bytes == b"END ":
+ self._next_handler = self._handle_data_end
+ else:
+ raise ValueError(f"Bad data '{bytes}'")
+
+ def _handle_data_frame(self):
+ # Handle a whole data frame
+ # Peek message length as uint32 LE
+ # length = len("BIN " + 4_bytes_encoded_length + data)
+ length = struct.unpack("<I", self._buf.peek_bytes(4))[0]
+ # we already read "BIN ", so read the rest
+ data = self._buf.read_bytes(length - 4)[4:]
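+        # e.g. a 32 byte frame is "BIN " (4 bytes, already consumed) + the
+        # length word (4 bytes) + 24 bytes of samples: read 28 and slice off
+        # the length word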
+ self._partial_data += data
+ # if told to flush now, then yield what we have
+ if self._flush_every_frame:
+ yield from self.flush()
+ self._next_handler = self._handle_data_start
+
+ def _handle_data_end(self):
+ # Handle the end reason
+ samples, reason = self._buf.read_line().split(maxsplit=1)
+ reason_enum = EndReason(reason.decode())
+ # Flush whatever is not already flushed
+ yield from self.flush()
+ yield EndData(samples=int(samples), reason=reason_enum)
+ self._next_handler = self._handle_header_start
+
+
+ def receive_bytes(self, received: bytes, flush_every_frame=True) -> Iterator[Data]:
+ """Tell the connection that you have received some bytes off the network.
+ Parse these into Data structures and yield them back.
+
+ Args:
+ received: the bytes you received from the socket
+ flush_every_frame: Whether to flush `FrameData` as soon as received.
+ If False then they will only be sent if `flush` is called or
+                the end of acquisition is reached
+ """
+ assert self._next_handler, "Connect not called"
+ self._flush_every_frame = flush_every_frame
+ self._buf += received
+ while True:
+ # Each of these handlers should call read at most once, so that
+ # if we don't have enough data we don't lose partial data
+ try:
+ ret = self._next_handler()
+ if ret:
+ # This is an iterator of Data objects
+ yield from ret
+ except NeedMoreData:
+ break
+
+
+
+[docs]
+ def flush(self) -> Iterator[FrameData]:
+ """If there is a partial data frame, pop and yield it"""
+ if self._partial_data:
+ # Make a numpy array wrapper to the bytearray, no copying here
+ data = np.frombuffer(self._partial_data, self._frame_dtype)
+ # Make a new bytearray, numpy view will keep the reference
+ # to the old one so can't clear it in place
+ self._partial_data = bytearray()
+ yield FrameData(data)
+
+
+
+[docs]
+ def connect(self, scaled: bool) -> bytes:
+ """Return the bytes that need to be sent on connection"""
+ assert not self._next_handler, "Can't connect while midway through collection"
+ self._next_handler = self._handle_connected
+ if scaled:
+ return b"XML FRAMED SCALED\n"
+ else:
+ return b"XML FRAMED RAW\n"
+
+
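+# A minimal synchronous sketch of driving the connection above by hand
+# (assuming the surrounding class is the library's DataConnection, and that
+# `sock` is a TCP socket already connected to the PandA data port; both names
+# are assumptions for illustration):
+#
+#     conn = DataConnection()
+#     sock.sendall(conn.connect(scaled=True))
+#     while True:
+#         for data in conn.receive_bytes(sock.recv(4096)):
+#             print(data)  # ReadyData, StartData, FrameData, EndData, ...
+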
+
+import logging
+import queue
+import threading
+from collections.abc import Iterator
+from typing import Any, Callable, Optional
+
+import h5py
+import numpy as np
+
+from pandablocks.commands import Arm
+
+from .asyncio import AsyncioClient
+from .connections import GATE_DURATION_FIELD, SAMPLES_FIELD
+from .responses import EndData, EndReason, FieldCapture, FrameData, ReadyData, StartData
+
+# Define the public API of this module
+__all__ = [
+ "Pipeline",
+ "HDFWriter",
+ "FrameProcessor",
+ "HDFDataOverrunException",
+ "create_pipeline",
+ "create_default_pipeline",
+ "stop_pipeline",
+ "write_hdf_files",
+]
+
+
+
+[docs]
+class HDFDataOverrunException(Exception):
+ """Raised if `DATA_OVERRUN` occurs while receiving data for HDF file"""
+
+
+
+class Stop:
+ def __str__(self) -> str:
+ return "<Stop>"
+
+ __repr__ = __str__
+
+
+STOP = Stop()
+
+
+
+[docs]
+class Pipeline(threading.Thread):
+ """Helper class that runs a pipeline consumer process in its own thread"""
+
+ #: Subclasses should create this dictionary with handlers for each data
+ #: type, returning transformed data that should be passed downstream
+ what_to_do: dict[type, Callable]
+ downstream: Optional["Pipeline"] = None
+
+ def __init__(self):
+ super().__init__()
+ self.queue: queue.Queue[Any] = queue.Queue() # type: ignore
+
+ def run(self):
+ while True:
+ data = self.queue.get()
+ if data is STOP:
+ # stop() called below
+ break
+ else:
+ func = self.what_to_do.get(type(data), None)
+ if func:
+ # If we have a handler, use it to transform the data
+ data = func(data)
+ if self.downstream and data is not None:
+ # Pass the (possibly transformed) data downstream
+ self.downstream.queue.put_nowait(data)
+
+
+[docs]
+ def stop(self):
+ """Stop the processing after the current queue has been emptied"""
+ self.queue.put(STOP)
+
+
+
+
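+# A hedged sketch of a custom pipeline element (illustrative, not part of the
+# library): handlers in `what_to_do` are keyed on the exact type of the data,
+# and whatever a handler returns is passed to the downstream element.
+#
+#     class EndLogger(Pipeline):
+#         def __init__(self):
+#             super().__init__()
+#             self.what_to_do = {EndData: self.log_end}
+#
+#         def log_end(self, data: EndData):
+#             logging.info(f"Acquisition ended: {data.reason}")
+#             return data  # pass on unchanged
+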
+
+[docs]
+class HDFWriter(Pipeline):
+ """Write an HDF file per data collection. Each field will be
+ written in a 1D dataset ``/<field.name>.<field.capture>``.
+
+ Args:
+ file_names: Iterator of file names. Must be full file paths. Will be called once
+ per file created.
+ capture_record_hdf_names: A dictionary of alternate dataset names to use for
+ each field. For example
+
+ .. code-block:: python
+
+ {
+ "COUNTER1.OUT": {
+ "Value": "name",
+ "Min": "name-min",
+ "Max": "name-max"
+ }
+ }
+ """
+
+ def __init__(
+ self,
+ file_names: Iterator[str],
+ capture_record_hdf_names: dict[str, dict[str, str]],
+ ):
+ super().__init__()
+ self.file_names = file_names
+ self.hdf_file: Optional[h5py.File] = None
+ self.datasets: list[h5py.Dataset] = []
+ self.capture_record_hdf_names = capture_record_hdf_names
+ self.what_to_do = {
+ StartData: self.open_file,
+ list: self.write_frame,
+ EndData: self.close_file,
+ }
+
+ def create_dataset(self, field: FieldCapture, raw: bool):
+ # Data written in a big stack, growing in that dimension
+ assert self.hdf_file, "File not open yet"
+
+ dataset_name = self.capture_record_hdf_names.get(field.name, {}).get(
+ field.capture, f"{field.name}.{field.capture}"
+ )
+
+ dtype = field.raw_mode_dataset_dtype if raw else field.type
+
+ return self.hdf_file.create_dataset(
+ f"/{dataset_name}",
+ dtype=dtype,
+ shape=(0,),
+ maxshape=(None,),
+ )
+
+ def open_file(self, data: StartData):
+ try:
+ self.file_path = next(self.file_names)
+ except StopIteration:
+ logging.exception(
+ "Not enough file names available when opening new HDF5 file"
+ )
+ raise
+ self.hdf_file = h5py.File(self.file_path, "w", libver="latest")
+ raw = data.process == "Raw"
+ self.datasets = [self.create_dataset(field, raw) for field in data.fields]
+ self.hdf_file.swmr_mode = True
+
+ # Save parameters
+ if data.arm_time is not None:
+ self.hdf_file.attrs["arm_time"] = data.arm_time
+ if data.start_time is not None:
+ self.hdf_file.attrs["start_time"] = data.start_time
+ if data.hw_time_offset_ns is not None:
+ self.hdf_file.attrs["hw_time_offset_ns"] = data.hw_time_offset_ns
+
+ logging.info(
+ f"Opened '{self.file_path}' with {data.sample_bytes} byte samples "
+ f"stored in {len(self.datasets)} datasets"
+ )
+
+ def write_frame(self, data: list[np.ndarray]):
+ for dataset, column in zip(self.datasets, data):
+ # Append to the end, flush when done
+ written = dataset.shape[0]
+ dataset.resize((written + column.shape[0],))
+ dataset[written:] = column
+ dataset.flush()
+
+ # Return the total number of samples now written (all datasets grow
+ # in step, so the last dataset's length is representative)
+ return dataset.shape[0]
+
+ def close_file(self, data: EndData):
+ assert self.hdf_file, "File not open yet"
+ self.hdf_file.close()
+ self.hdf_file = None
+ logging.info(
+ f"Closed '{self.file_path}' after receiving {data.samples} "
+ f"samples. End reason is '{data.reason.value}'"
+ )
+ self.file_path = ""
+
+
+
+
+[docs]
+class FrameProcessor(Pipeline):
+ """Scale field data according to the information in the StartData"""
+
+ def __init__(self) -> None:
+ super().__init__()
+ self.processors: list[Callable] = []
+ self.what_to_do = {
+ StartData: self.create_processors,
+ FrameData: self.scale_data,
+ }
+
+ def create_processor(self, field: FieldCapture, raw: bool):
+ column_name = f"{field.name}.{field.capture}"
+
+ if raw and field.capture == "Mean":
+
+ def mean_callable(data):
+ if GATE_DURATION_FIELD in data.dtype.names:
+ gate_duration = data[GATE_DURATION_FIELD]
+ else:
+ gate_duration = data[SAMPLES_FIELD]
+
+ return (data[column_name] * field.scale / gate_duration) + field.offset
+
+ return mean_callable
+ elif raw and field.has_scale_or_offset:
+ return lambda data: data[column_name] * field.scale + field.offset
+ else:
+ return lambda data: data[column_name]
+
+ def create_processors(self, data: StartData) -> StartData:
+ raw = data.process == "Raw"
+ self.processors = [self.create_processor(field, raw) for field in data.fields]
+ return data
+
+ def scale_data(self, data: FrameData) -> list[np.ndarray]:
+ return [process(data.data) for process in self.processors]
+
+
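+# Per sample, the processors built above apply, in raw mode:
+#
+#     value = raw * scale + offset                  # fields with scale/offset
+#     mean  = raw * scale / gate_duration + offset  # "Mean" capture fields
+#
+# where gate_duration is the PCAP gate-duration column if captured, falling
+# back to the samples column; fields without scaling pass through unchanged.
+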
+
+
+[docs]
+def create_default_pipeline(
+ file_names: Iterator[str],
+ capture_record_hdf_names: dict[str, dict[str, str]],
+ *additional_downstream_pipelines: Pipeline,
+) -> list[Pipeline]:
+ """Create the default processing pipeline consisting of one `FrameProcessor` and
+ one `HDFWriter`. See `create_pipeline` for more details.
+
+ Args:
+ file_names: Iterator of file names. Must be full file paths. Advanced
+ once per file created, as required by `HDFWriter`.
+ capture_record_hdf_names: A dictionary of dataset names to use for each field.
+ The keys are record names, the values are another dictionary of
+ capture type to dataset name.
+ additional_downstream_pipelines: Any number of additional pipelines to add
+ downstream.
+ """
+
+ return create_pipeline(
+ FrameProcessor(),
+ HDFWriter(file_names, capture_record_hdf_names),
+ *additional_downstream_pipelines,
+ )
+
+
+
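+# Illustrative use (the file paths and hand-wiring here are assumptions, not
+# library documentation): write each acquisition to a numbered file and tear
+# the pipeline down afterwards.
+#
+#     from itertools import count
+#     pipeline = create_default_pipeline(
+#         (f"/tmp/capture_{i}.h5" for i in count(1)), {}
+#     )
+#     try:
+#         ...  # put Data objects on pipeline[0].queue
+#     finally:
+#         stop_pipeline(pipeline)
+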
+
+[docs]
+def create_pipeline(*elements: Pipeline) -> list[Pipeline]:
+ """Create a pipeline of elements, wiring them and starting them before
+ returning them"""
+ pipeline: list[Pipeline] = []
+ for element in elements:
+ if pipeline:
+ pipeline[-1].downstream = element
+ pipeline.append(element)
+ element.start()
+ return pipeline
+
+
+
+
+[docs]
+def stop_pipeline(pipeline: list[Pipeline]):
+ """Stop and join each element of the pipeline"""
+ for element in pipeline:
+ # Note that we stop and join each element in turn.
+ # This ensures all data is flushed all the way down
+ # even if there is lots left in a queue
+ element.stop()
+ element.join()
+
+
+
+
+[docs]
+async def write_hdf_files(
+ client: AsyncioClient,
+ file_names: Iterator[str],
+ num: int = 1,
+ arm: bool = False,
+ flush_period: float = 1,
+):
+ """Connect to host PandA data port, and write num acquisitions
+ to HDF file according to scheme
+
+ Args:
+ client: The `AsyncioClient` to use for communications
+ file_names: Iterator of file names. Must be full file paths. Advanced
+ once per file created.
+ num: The number of acquisitions to store in separate files. 0 = Infinite capture
+ arm: Whether to arm PCAP at the start, and after each successful acquisition
+ flush_period: How often (in seconds) to flush partial data frames through
+ the pipeline
+
+ Raises:
+ HDFDataOverrunException: if there is a data overrun.
+ """
+ counter = 0
+
+ end_data = None
+ pipeline = create_default_pipeline(file_names, {})
+ try:
+ async for data in client.data(scaled=False, flush_period=flush_period):
+ pipeline[0].queue.put_nowait(data)
+ if isinstance(data, EndData):
+ end_data = data
+ counter += 1
+ if counter == num:
+ # We produced the right number of frames
+ break
+ if type(data) in (ReadyData, EndData) and arm:
+ # Told to arm at the beginning, and after each acquisition ends
+ await client.send(Arm())
+ if end_data and end_data.reason == EndReason.DATA_OVERRUN:
+ raise HDFDataOverrunException(
+ "Data overrun - streaming aborted! Last frame may be corrupt."
+ )
+ finally:
+ stop_pipeline(pipeline)
+
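+# A hedged usage sketch (hostname and file path are placeholders; assumes
+# AsyncioClient can be used as an async context manager):
+#
+#     import asyncio
+#
+#     async def capture():
+#         async with AsyncioClient("panda-hostname") as client:
+#             await write_hdf_files(
+#                 client, iter(["/tmp/panda.h5"]), num=1, arm=True
+#             )
+#
+#     asyncio.run(capture())
+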
+
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Optional
+
+import numpy as np
+
+# Define the public API of this module
+__all__ = [
+ "BlockInfo",
+ "FieldInfo",
+ "UintFieldInfo",
+ "ScalarFieldInfo",
+ "TimeFieldInfo",
+ "SubtypeTimeFieldInfo",
+ "EnumFieldInfo",
+ "BitOutFieldInfo",
+ "BitMuxFieldInfo",
+ "PosMuxFieldInfo",
+ "TableFieldDetails",
+ "TableFieldInfo",
+ "PosOutFieldInfo",
+ "ExtOutFieldInfo",
+ "ExtOutBitsFieldInfo",
+ "Changes",
+ "EndReason",
+ "FieldCapture",
+ "Data",
+ "ReadyData",
+ "StartData",
+ "FrameData",
+ "EndData",
+]
+
+# Control
+
+
+
+[docs]
+@dataclass
+class BlockInfo:
+ """Block number and description as exposed by the TCP server
+
+ Attributes:
+ number: The index of this block
+ description: The description for this block"""
+
+ number: int = 0
+ description: Optional[str] = None
+
+
+
+
+[docs]
+@dataclass
+class FieldInfo:
+ """Field type, subtype, description and labels as exposed by TCP server:
+ https://pandablocks-server.readthedocs.io/en/latest/fields.html#field-types
+
+ Note that many fields use a more specialised subclass of FieldInfo for
+ their additional attributes, e.g. a ``labels`` list of valid values for
+ fields with a defined set of values (such as subtype "enum").
+
+ Attributes:
+ type: Field type, like "param", "bit_out", "pos_mux", etc.
+ subtype: Some types have subtype, like "uint", "scalar", "lut", etc.
+ description: A description of the field
+ """
+
+ type: str
+ subtype: Optional[str]
+ description: Optional[str]
+
+
+
+
+[docs]
+@dataclass
+class UintFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "param","read", or "write" and subtype
+ "uint"""
+
+ max_val: int
+
+
+
+
+[docs]
+@dataclass
+class ScalarFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "param","read", or "write" and subtype
+ "scalar"""
+
+ units: str
+ scale: float
+ offset: float
+
+
+
+
+[docs]
+@dataclass
+class TimeFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "time"""
+
+ units_labels: list[str]
+
+
+
+
+[docs]
+@dataclass
+class SubtypeTimeFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "param","read", or "write" and subtype
+ "time"""
+
+ units_labels: list[str]
+
+
+
+
+[docs]
+@dataclass
+class EnumFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "param","read", or "write" and subtype
+ "enum"""
+
+ labels: list[str]
+
+
+
+
+[docs]
+@dataclass
+class BitOutFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "bit_out"""
+
+ capture_word: str
+ offset: int
+
+
+
+
+[docs]
+@dataclass
+class BitMuxFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "bit_mux"""
+
+ max_delay: int
+ labels: list[str]
+
+
+
+
+[docs]
+@dataclass
+class PosMuxFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "pos_mux"""
+
+ labels: list[str]
+
+
+
+
+[docs]
+@dataclass
+class TableFieldDetails:
+ """Info for each field in a table"""
+
+ subtype: str
+ bit_low: int
+ bit_high: int
+ description: Optional[str] = None
+ labels: Optional[list[str]] = None
+
+
+
+
+[docs]
+@dataclass
+class TableFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "table"""
+
+ max_length: int
+ fields: dict[str, TableFieldDetails]
+ row_words: int
+
+
+
+
+[docs]
+@dataclass
+class PosOutFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "pos_out"""
+
+ capture_labels: list[str]
+
+
+
+
+[docs]
+@dataclass
+class ExtOutFieldInfo(FieldInfo):
+ """Extended `FieldInfo` for fields with type "ext_out" and subtypes "timestamp"
+ or "samples"""
+
+ capture_labels: list[str]
+
+
+
+
+[docs]
+@dataclass
+class ExtOutBitsFieldInfo(ExtOutFieldInfo):
+ """Extended `ExtOutFieldInfo` for fields with type "ext_out" and subtype "bits"""
+
+ bits: list[str]
+
+
+
+
+[docs]
+@dataclass
+class Changes:
+ """The changes returned from a ``*CHANGES`` command"""
+
+ #: Map field -> value for single-line values that were returned
+ values: dict[str, str]
+ #: The fields that were present but without value
+ no_value: list[str]
+ #: The fields that were in error
+ in_error: list[str]
+ #: Map field -> value for multi-line values that were returned
+ multiline_values: dict[str, list[str]]
+
+
+
+# Data
+
+
+
+[docs]
+class EndReason(Enum):
+ """The reason that a PCAP acquisition completed"""
+
+ #: Experiment completed by falling edge of ``PCAP.ENABLE``
+ OK = "Ok"
+ #: Client disconnect detected
+ EARLY_DISCONNECT = "Early disconnect"
+ #: Client not taking data quickly enough, or network congestion, caused an
+ #: internal buffer overflow.
+ #: In raw unscaled mode (i.e., no server-side scaling), the most recent
+ #: `FrameData` is likely corrupted.
+ DATA_OVERRUN = "Data overrun"
+ #: Triggers too fast for configured data capture
+ FRAMING_ERROR = "Framing error"
+ #: Probable CPU overload on PandA, should not occur
+ DRIVER_DATA_OVERRUN = "Driver data overrun"
+ #: Data capture too fast for memory bandwidth
+ DMA_DATA_ERROR = "DMA data error"
+ # Reasons below this point are not from the server, they are generated in code
+ #: An unknown exception occurred during HDF5 file processing
+ UNKNOWN_EXCEPTION = "Unknown exception"
+ #: StartData packets did not match when trying to continue printing to a file
+ START_DATA_MISMATCH = "Start Data mismatched"
+ #: Experiment manually completed by ``DATA:CAPTURE``
+ MANUALLY_STOPPED = "Manually stopped"
+ #: Experiment manually completed by ``*PCAP.DISARM=`` command
+ DISARMED = "Disarmed"
+
+
+
+
+[docs]
+@dataclass
+class FieldCapture:
+ """Information about a field that is being captured
+
+ If scale, offset, and units are all `None`, then the field is a
+ ``PCAP.BITS`` field.
+
+ Attributes:
+ name: Name of captured field
+ type: Numpy data type of the field as transmitted
+ capture: Value of CAPTURE field used to enable this field
+ scale: Scaling factor
+ offset: Offset
+ units: Units string
+ """
+
+ name: str
+ type: np.dtype
+ capture: str
+ scale: Optional[float] = field(default=None)
+ offset: Optional[float] = field(default=None)
+ units: Optional[str] = field(default=None)
+
+ def __post_init__(self):
+ sou = (self.scale, self.offset, self.units)
+ if sou != (None, None, None) and None in sou:
+ raise ValueError(
+ f"If any of `scale={self.scale}`, `offset={self.offset}`"
+ f", or `units={self.units}` is set, all must be set."
+ )
+
+ @property
+ def raw_mode_dataset_dtype(self) -> np.dtype:
+ """We use double for all dtypes that have scale and offset."""
+ if self.scale is not None and self.offset is not None:
+ return np.dtype("float64")
+ return self.type
+
+ @property
+ def has_scale_or_offset(self) -> bool:
+ """Return True if this field is a PCAP.BITS or PCAP.SAMPLES field"""
+ return (self.scale is not None and self.offset is not None) and (
+ self.scale != 1 or self.offset != 0
+ )
+
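+# For example (values illustrative): scale, offset, and units must be set
+# together, as enforced by __post_init__ above.
+#
+#     FieldCapture(name="PCAP.BITS0", type=np.dtype("uint32"), capture="Value")
+#     # ok: a PCAP.BITS-style field, scale/offset/units all None
+#
+#     FieldCapture(name="COUNTER1.OUT", type=np.dtype("int32"),
+#                  capture="Value", scale=0.5)
+#     # ValueError: if any of scale/offset/units is set, all must be set
+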
+
+
+
+[docs]
+class Data:
+ """Baseclass for all data yielded while a PCAP acquisition is in progress"""
+
+
+
+[docs]
+@dataclass
+class ReadyData(Data):
+ """Yielded once when the connection is established and ready to take data"""
+
+
+
+
+[docs]
+@dataclass
+class StartData(Data):
+ """Yielded when a new PCAP acquisition starts.
+
+ Attributes:
+ fields: Information about each captured field as a `FieldCapture` object
+ missed: Number of samples missed by late data port connection
+ process: Data processing option, only "Scaled" or "Raw" are requested
+ format: Data delivery formatting, only "Framed" is requested
+ sample_bytes: Number of bytes in one sample
+ """
+
+ fields: list[FieldCapture]
+ missed: int
+ process: str
+ format: str
+ sample_bytes: int
+ arm_time: Optional[str]
+ start_time: Optional[str]
+ hw_time_offset_ns: Optional[int]
+
+
+
+
+[docs]
+@dataclass
+class FrameData(Data):
+ """Yielded when a new data frame is flushed.
+
+ Attributes:
+ data: A numpy `Structured Array <structured_arrays>`
+
+ Data is structured into complete columns. Each column name is
+ ``<name>.<capture>`` from the corresponding `FieldCapture`. Data
+ can be accessed with these column names. For example::
+
+ # Table view with 2 captured fields
+ >>> import numpy
+ >>> data = numpy.array([(0, 10),
+ ... (1, 11),
+ ... (2, 12)],
+ ... dtype=[('COUNTER1.OUT.Value', '<f8'), ('COUNTER2.OUT.Value', '<f8')])
+ >>> fdata = FrameData(data)
+ >>> (fdata.data[0]['COUNTER1.OUT.Value'], fdata.data[0]['COUNTER2.OUT.Value'])
+ (np.float64(0.0), np.float64(10.0))
+ >>> fdata.column_names # Column names
+ ('COUNTER1.OUT.Value', 'COUNTER2.OUT.Value')
+ >>> fdata.data['COUNTER1.OUT.Value'] # Column view
+ array([0., 1., 2.])
+ """
+
+ data: np.ndarray
+
+ @property
+ def column_names(self) -> tuple[str, ...]:
+ """Return all the column names"""
+ names = self.data.dtype.names
+ assert names, f"No column names for {self.data.dtype}"
+ return names
+
+
+
+
+[docs]
+@dataclass
+class EndData(Data):
+ """Yielded when a PCAP acquisition ends.
+
+ Attributes:
+ samples: The total number of samples (rows) that were yielded
+ reason: The `EndReason` for the end of acquisition
+ """
+
+ samples: int
+ reason: EndReason
+
+