From e2cf4f950dbc9e39571ec02685646c402517d8ff Mon Sep 17 00:00:00 2001 From: James Souter Date: Fri, 8 Dec 2023 13:24:28 +0000 Subject: [PATCH] Discover Data and Capture records from PVI. breaks tests --- src/ophyd_async/panda/writers/__init__.py | 3 +- src/ophyd_async/panda/writers/hdf_writer.py | 170 ++++++++++++++++---- src/ophyd_async/panda/writers/panda_hdf.py | 36 ++--- 3 files changed, 157 insertions(+), 52 deletions(-) diff --git a/src/ophyd_async/panda/writers/__init__.py b/src/ophyd_async/panda/writers/__init__.py index 55a21fc142..a623a16a16 100644 --- a/src/ophyd_async/panda/writers/__init__.py +++ b/src/ophyd_async/panda/writers/__init__.py @@ -1,4 +1,3 @@ from .hdf_writer import PandaHDFWriter -from .panda_hdf import PandaHDF -__all__ = ["PandaHDFWriter", "PandaHDF"] +__all__ = ["PandaHDFWriter"] diff --git a/src/ophyd_async/panda/writers/hdf_writer.py b/src/ophyd_async/panda/writers/hdf_writer.py index 84ed22a42c..cb62a71e95 100644 --- a/src/ophyd_async/panda/writers/hdf_writer.py +++ b/src/ophyd_async/panda/writers/hdf_writer.py @@ -1,49 +1,159 @@ import asyncio -from typing import AsyncIterator, Dict, List, Optional +from typing import ( + AsyncIterator, + Dict, + List, + Optional, + FrozenSet, + Callable, + get_type_hints, +) +import atexit +from enum import Enum from bluesky.protocols import Asset, Descriptor, Hints +from p4p.client.thread import Context + from ophyd_async.core import ( DEFAULT_TIMEOUT, AsyncStatus, DetectorWriter, + Device, DirectoryProvider, NameProvider, - set_and_wait_for_value, + Signal, wait_for_value, ) -from .panda_hdf import PandaHDF, _HDFDataset, _HDFFile +from ophyd_async.epics.signal import ( + pvi_get, + epics_signal_r, + epics_signal_rw, + epics_signal_w, + epics_signal_x, +) + +from .panda_hdf import DataBlock, _HDFDataset, _HDFFile + + +class SimpleCapture(str, Enum): + No = "No" + Value = "Value" + + +class Capture(str, Enum): + No = "No" + Value = "Value" + Diff = "Diff" + Sum = "Sum" + Mean = "Mean" + Min = "Min" + Max = "Max" + MinMax = "Min Max" + MinMaxMean = "Min Max Mean" class PandaHDFWriter(DetectorWriter): + # hdf: DataBlock + _ctxt: Optional[Context] = None + + @property + def ctxt(self) -> Context: + if PandaHDFWriter._ctxt is None: + PandaHDFWriter._ctxt = Context("pva", nt=False) + + @atexit.register + def _del_ctxt(): + # If we don't do this we get messages like this on close: + # Error in sys.excepthook: + # Original exception was: + PandaHDFWriter._ctxt = None + + return PandaHDFWriter._ctxt + + async def connect(self, sim=False) -> None: + pvi_info = await pvi_get(self._prefix + ":PVI", self.ctxt) if not sim else {} + desired_signals = {} + for block_name, block in self._to_capture.items(): + if block_name not in desired_signals: + desired_signals[block_name] = [] + for signal_name in block: + desired_signals[block_name].append( + [f"{signal_name}_capture", SimpleCapture] + ) + if "hdf5" not in desired_signals: + desired_signals["hdf5"] = [] + for signal_name, hint in get_type_hints(self.hdf5).items(): + dtype = hint.__args__[0] + desired_signals["hdf5"].append([signal_name, dtype]) + for block_name, block_signals in desired_signals.items(): + if block_name not in pvi_info: + continue + if not hasattr(self, block_name): + setattr(self, block_name, Device()) + block_pvi = await pvi_get(pvi_info[block_name]["d"], self.ctxt) + block = getattr(self, block_name) + for signal_name, dtype in block_signals: + if signal_name not in block_pvi: + continue + signal_pvi = block_pvi[signal_name] + operations = frozenset(signal_pvi.keys()) + pvs = [signal_pvi[i] for i in operations] + write_pv = pvs[0] + read_pv = write_pv if len(pvs) == 1 else pvs[1] + pv_ctxt = self.ctxt.get(read_pv) + if dtype is SimpleCapture: # capture record + if set(pv_ctxt.value.choices) == set(v.value for v in Capture): + dtype = Capture + signal = self.pvi_mapping[operations]( + dtype, "pva://" + read_pv, "pva://" + write_pv + ) + setattr(block, signal_name, signal) + await block.connect() + def __init__( self, - hdf: PandaHDF, + prefix: str, directory_provider: DirectoryProvider, name_provider: NameProvider, + **to_capture: List[str], ) -> None: - self.hdf = hdf + self._connected_pvs = Device() + self._prefix = prefix + self.hdf5 = DataBlock() # needs a name self._directory_provider = directory_provider self._name_provider = name_provider - + self._to_capture = to_capture self._capture_status: Optional[AsyncStatus] = None self._datasets: List[_HDFDataset] = [] self._file: Optional[_HDFFile] = None self._multiplier = 1 + self.pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = { + frozenset({"r", "w"}): lambda dtype, rpv, wpv: epics_signal_rw( + dtype, rpv, wpv + ), + frozenset({"rw"}): lambda dtype, rpv, wpv: epics_signal_rw(dtype, rpv, wpv), + frozenset({"r"}): lambda dtype, rpv, wpv: epics_signal_r(dtype, rpv), + frozenset({"w"}): lambda dtype, rpv, wpv: epics_signal_w(dtype, wpv), + frozenset({"x"}): lambda dtype, rpv, wpv: epics_signal_x(wpv), + } async def open(self, multiplier: int = 1) -> Dict[str, Descriptor]: self._file = None info = self._directory_provider() await asyncio.gather( - self.hdf.file_path.set(info.directory_path), - self.hdf.file_name.set(f"{info.filename_prefix}.h5"), + self.hdf5.filepath.set(info.directory_path), + self.hdf5.filename.set(f"{info.filename_prefix}.h5"), ) # Overwrite num_capture to go forever - await self.hdf.num_capture.set(0) + await self.hdf5.numcapture.set(0) # Wait for it to start, stashing the status that tells us when it finishes - self._capture_status = await set_and_wait_for_value(self.hdf.capture, True) + await self.hdf5.capture.set(True) + self._capture_status = await wait_for_value( + self.hdf5.capturing, 1, DEFAULT_TIMEOUT + ) name = self._name_provider() if multiplier > 1: raise ValueError( @@ -51,20 +161,18 @@ async def open(self, multiplier: int = 1) -> Dict[str, Descriptor]: ) self._multiplier = multiplier self._datasets = [] - # Add all the scalar datasets - for ds_name, ds_path in self.hdf.scalar_datasets.items(): - self._datasets.append( - _HDFDataset( - name, - ds_name, - ds_path, - [], - multiplier, + + for block, block_signals in self._to_capture.items(): + for signal in block_signals: + self._datasets.append( + _HDFDataset( + name, block, signal, f"{block}:{signal}".upper(), [], multiplier + ) ) - ) + describe = { ds.name: Descriptor( - source=self.hdf.full_file_name.source, + source=self.hdf5.fullfilename.source, shape=ds.shape, dtype="array" if ds.shape else "number", external="STREAM:", @@ -80,32 +188,36 @@ def matcher(value: int) -> bool: return value // self._multiplier >= index matcher.__name__ = f"index_at_least_{index}" - await wait_for_value(self.hdf.num_written, matcher, timeout=timeout) + await wait_for_value(self.hdf5.numwritten_rbv, matcher, timeout=timeout) async def get_indices_written(self) -> int: - num_written = await self.hdf.num_written.get_value() + value = await self.hdf5.numwritten_rbv.get_value() + num_written = await self.hdf5.numwritten_rbv.get_value() return num_written // self._multiplier async def collect_stream_docs(self, indices_written: int) -> AsyncIterator[Asset]: # TODO: fail if we get dropped frames - await self.hdf.flush_now.set(True) + await self.hdf5.flushnow.set(True) if indices_written: if not self._file: self._file = _HDFFile( - await self.hdf.full_file_name.get_value(), self._datasets + await self.hdf5.fullfilename.get_value(), self._datasets ) for doc in self._file.stream_resources(): ds_name = doc["resource_kwargs"]["name"] - capturing = getattr(self.hdf, "capturing_" + ds_name, None) - if capturing and await capturing.get_value(): + ds_block = doc["resource_kwargs"]["block"] + block = getattr(self, ds_block) + # capturing = getattr(self.hdf5, "capturing_" + ds_name, None) + capturing = getattr(block, f"{ds_name}_capture") + if capturing and await capturing.get_value() != Capture.No: yield "stream_resource", doc for doc in self._file.stream_data(indices_written): yield "stream_datum", doc async def close(self): # Already done a caput callback in _capture_status, so can't do one here - await self.hdf.capture.set(False, wait=False) - await wait_for_value(self.hdf.capture, False, DEFAULT_TIMEOUT) + await self.hdf5.capture.set(False, wait=False) + await wait_for_value(self.hdf5.capturing, False, DEFAULT_TIMEOUT) if self._capture_status: # We kicked off an open, so wait for it to return await self._capture_status diff --git a/src/ophyd_async/panda/writers/panda_hdf.py b/src/ophyd_async/panda/writers/panda_hdf.py index 4a6b7e5126..2f4c89ff54 100644 --- a/src/ophyd_async/panda/writers/panda_hdf.py +++ b/src/ophyd_async/panda/writers/panda_hdf.py @@ -4,34 +4,27 @@ from event_model import StreamDatum, StreamResource, compose_stream_resource from ophyd_async.core.device import Device -from ophyd_async.epics.signal.signal import epics_signal_r, epics_signal_rw +from ophyd_async.core import ( + SignalR, + SignalRW, +) -class PandaHDF(Device): - def __init__(self, prefix: str, name: str = "", **scalar_datasets: str) -> None: - # Define some signals - self.file_path = epics_signal_rw(str, prefix + ":HDF5:FilePath") - self.file_name = epics_signal_rw(str, prefix + ":HDF5:FileName") - self.full_file_name = epics_signal_r(str, prefix + ":HDF5:FullFileName") - self.num_capture = epics_signal_rw(int, prefix + ":HDF5:NumCapture") - self.num_written = epics_signal_r(int, prefix + ":HDF5:NumWritten_RBV") - self.capture = epics_signal_rw( - bool, prefix + ":HDF5:Capturing", prefix + ":HDF5:Capture" - ) - self.flush_now = epics_signal_rw(bool, prefix + ":HDF5:FlushNow") - self.scalar_datasets = scalar_datasets - for ds_name, ds_path in self.scalar_datasets.items(): - setattr( - self, - "capturing_" + ds_name, - epics_signal_r(bool, prefix + ":" + ds_path + ":CAPTURE"), - ) - super(PandaHDF, self).__init__(name) +class DataBlock(Device): + filepath: SignalRW[str] + filename: SignalRW[str] + fullfilename: SignalR[str] + numcapture: SignalRW[int] + flushnow: SignalRW[bool] + capture: SignalRW[bool] + capturing: SignalR[bool] + numwritten_rbv: SignalR[int] @dataclass class _HDFDataset: device_name: str + block: str name: str path: str shape: List[int] @@ -49,6 +42,7 @@ def __init__(self, full_file_name: str, datasets: List[_HDFDataset]) -> None: resource_path=full_file_name, resource_kwargs={ "name": ds.name, + "block": ds.block, "path": ds.path + ".Value", "multiplier": ds.multiplier, },