Discover Data and Capture records from PVI. Breaks tests.
jsouter committed Dec 8, 2023
1 parent e510d24 commit e2cf4f9
Showing 3 changed files with 157 additions and 52 deletions.
3 changes: 1 addition & 2 deletions src/ophyd_async/panda/writers/__init__.py
@@ -1,4 +1,3 @@
 from .hdf_writer import PandaHDFWriter
-from .panda_hdf import PandaHDF
 
-__all__ = ["PandaHDFWriter", "PandaHDF"]
+__all__ = ["PandaHDFWriter"]
170 changes: 141 additions & 29 deletions src/ophyd_async/panda/writers/hdf_writer.py
@@ -1,70 +1,178 @@
 import asyncio
-from typing import AsyncIterator, Dict, List, Optional
+from typing import (
+    AsyncIterator,
+    Dict,
+    List,
+    Optional,
+    FrozenSet,
+    Callable,
+    get_type_hints,
+)
+import atexit
+from enum import Enum
 
 from bluesky.protocols import Asset, Descriptor, Hints
 
+from p4p.client.thread import Context
+
 from ophyd_async.core import (
     DEFAULT_TIMEOUT,
     AsyncStatus,
     DetectorWriter,
+    Device,
     DirectoryProvider,
     NameProvider,
     set_and_wait_for_value,
+    Signal,
     wait_for_value,
 )
 
-from .panda_hdf import PandaHDF, _HDFDataset, _HDFFile
+from ophyd_async.epics.signal import (
+    pvi_get,
+    epics_signal_r,
+    epics_signal_rw,
+    epics_signal_w,
+    epics_signal_x,
+)
 
+from .panda_hdf import DataBlock, _HDFDataset, _HDFFile


+class SimpleCapture(str, Enum):
+    No = "No"
+    Value = "Value"
+
+
+class Capture(str, Enum):
+    No = "No"
+    Value = "Value"
+    Diff = "Diff"
+    Sum = "Sum"
+    Mean = "Mean"
+    Min = "Min"
+    Max = "Max"
+    MinMax = "Min Max"
+    MinMaxMean = "Min Max Mean"
+
+
 class PandaHDFWriter(DetectorWriter):
+    # hdf: DataBlock
+    _ctxt: Optional[Context] = None
+
+    @property
+    def ctxt(self) -> Context:
+        if PandaHDFWriter._ctxt is None:
+            PandaHDFWriter._ctxt = Context("pva", nt=False)
+
+            @atexit.register
+            def _del_ctxt():
+                # If we don't do this we get messages like this on close:
+                #   Error in sys.excepthook:
+                #   Original exception was:
+                PandaHDFWriter._ctxt = None
+
+        return PandaHDFWriter._ctxt
+
+    async def connect(self, sim=False) -> None:
+        pvi_info = await pvi_get(self._prefix + ":PVI", self.ctxt) if not sim else {}
+        desired_signals = {}
+        for block_name, block in self._to_capture.items():
+            if block_name not in desired_signals:
+                desired_signals[block_name] = []
+            for signal_name in block:
+                desired_signals[block_name].append(
+                    [f"{signal_name}_capture", SimpleCapture]
+                )
+        if "hdf5" not in desired_signals:
+            desired_signals["hdf5"] = []
+            for signal_name, hint in get_type_hints(self.hdf5).items():
+                dtype = hint.__args__[0]
+                desired_signals["hdf5"].append([signal_name, dtype])
+        for block_name, block_signals in desired_signals.items():
+            if block_name not in pvi_info:
+                continue
+            if not hasattr(self, block_name):
+                setattr(self, block_name, Device())
+            block_pvi = await pvi_get(pvi_info[block_name]["d"], self.ctxt)
+            block = getattr(self, block_name)
+            for signal_name, dtype in block_signals:
+                if signal_name not in block_pvi:
+                    continue
+                signal_pvi = block_pvi[signal_name]
+                operations = frozenset(signal_pvi.keys())
+                pvs = [signal_pvi[i] for i in operations]
+                write_pv = pvs[0]
+                read_pv = write_pv if len(pvs) == 1 else pvs[1]
+                pv_ctxt = self.ctxt.get(read_pv)
+                if dtype is SimpleCapture:  # capture record
+                    if set(pv_ctxt.value.choices) == set(v.value for v in Capture):
+                        dtype = Capture
+                signal = self.pvi_mapping[operations](
+                    dtype, "pva://" + read_pv, "pva://" + write_pv
+                )
+                setattr(block, signal_name, signal)
+            await block.connect()

     def __init__(
         self,
-        hdf: PandaHDF,
+        prefix: str,
         directory_provider: DirectoryProvider,
         name_provider: NameProvider,
+        **to_capture: List[str],
     ) -> None:
-        self.hdf = hdf
+        self._connected_pvs = Device()
+        self._prefix = prefix
+        self.hdf5 = DataBlock()  # needs a name
         self._directory_provider = directory_provider
         self._name_provider = name_provider
 
+        self._to_capture = to_capture
         self._capture_status: Optional[AsyncStatus] = None
         self._datasets: List[_HDFDataset] = []
         self._file: Optional[_HDFFile] = None
         self._multiplier = 1
+        self.pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = {
+            frozenset({"r", "w"}): lambda dtype, rpv, wpv: epics_signal_rw(
+                dtype, rpv, wpv
+            ),
+            frozenset({"rw"}): lambda dtype, rpv, wpv: epics_signal_rw(dtype, rpv, wpv),
+            frozenset({"r"}): lambda dtype, rpv, wpv: epics_signal_r(dtype, rpv),
+            frozenset({"w"}): lambda dtype, rpv, wpv: epics_signal_w(dtype, wpv),
+            frozenset({"x"}): lambda dtype, rpv, wpv: epics_signal_x(wpv),
+        }

     async def open(self, multiplier: int = 1) -> Dict[str, Descriptor]:
         self._file = None
         info = self._directory_provider()
         await asyncio.gather(
-            self.hdf.file_path.set(info.directory_path),
-            self.hdf.file_name.set(f"{info.filename_prefix}.h5"),
+            self.hdf5.filepath.set(info.directory_path),
+            self.hdf5.filename.set(f"{info.filename_prefix}.h5"),
         )
 
         # Overwrite num_capture to go forever
-        await self.hdf.num_capture.set(0)
+        await self.hdf5.numcapture.set(0)
         # Wait for it to start, stashing the status that tells us when it finishes
-        self._capture_status = await set_and_wait_for_value(self.hdf.capture, True)
+        await self.hdf5.capture.set(True)
+        self._capture_status = await wait_for_value(
+            self.hdf5.capturing, 1, DEFAULT_TIMEOUT
+        )
         name = self._name_provider()
         if multiplier > 1:
             raise ValueError(
                 "All PandA datasets should be scalar, multiplier should be 1"
             )
         self._multiplier = multiplier
         self._datasets = []
-        # Add all the scalar datasets
-        for ds_name, ds_path in self.hdf.scalar_datasets.items():
-            self._datasets.append(
-                _HDFDataset(
-                    name,
-                    ds_name,
-                    ds_path,
-                    [],
-                    multiplier,
-                )
-            )
+        for block, block_signals in self._to_capture.items():
+            for signal in block_signals:
+                self._datasets.append(
+                    _HDFDataset(
+                        name, block, signal, f"{block}:{signal}".upper(), [], multiplier
+                    )
+                )
 
         describe = {
             ds.name: Descriptor(
-                source=self.hdf.full_file_name.source,
+                source=self.hdf5.fullfilename.source,
                 shape=ds.shape,
                 dtype="array" if ds.shape else "number",
                 external="STREAM:",
@@ -80,32 +188,36 @@ def matcher(value: int) -> bool:
             return value // self._multiplier >= index
 
         matcher.__name__ = f"index_at_least_{index}"
-        await wait_for_value(self.hdf.num_written, matcher, timeout=timeout)
+        await wait_for_value(self.hdf5.numwritten_rbv, matcher, timeout=timeout)
 
     async def get_indices_written(self) -> int:
-        num_written = await self.hdf.num_written.get_value()
+        value = await self.hdf5.numwritten_rbv.get_value()
+        num_written = await self.hdf5.numwritten_rbv.get_value()
         return num_written // self._multiplier
 
     async def collect_stream_docs(self, indices_written: int) -> AsyncIterator[Asset]:
         # TODO: fail if we get dropped frames
-        await self.hdf.flush_now.set(True)
+        await self.hdf5.flushnow.set(True)
         if indices_written:
             if not self._file:
                 self._file = _HDFFile(
-                    await self.hdf.full_file_name.get_value(), self._datasets
+                    await self.hdf5.fullfilename.get_value(), self._datasets
                 )
                 for doc in self._file.stream_resources():
                     ds_name = doc["resource_kwargs"]["name"]
-                    capturing = getattr(self.hdf, "capturing_" + ds_name, None)
-                    if capturing and await capturing.get_value():
+                    ds_block = doc["resource_kwargs"]["block"]
+                    block = getattr(self, ds_block)
+                    # capturing = getattr(self.hdf5, "capturing_" + ds_name, None)
+                    capturing = getattr(block, f"{ds_name}_capture")
+                    if capturing and await capturing.get_value() != Capture.No:
                         yield "stream_resource", doc
             for doc in self._file.stream_data(indices_written):
                 yield "stream_datum", doc
 
     async def close(self):
         # Already done a caput callback in _capture_status, so can't do one here
-        await self.hdf.capture.set(False, wait=False)
-        await wait_for_value(self.hdf.capture, False, DEFAULT_TIMEOUT)
+        await self.hdf5.capture.set(False, wait=False)
+        await wait_for_value(self.hdf5.capturing, False, DEFAULT_TIMEOUT)
         if self._capture_status:
             # We kicked off an open, so wait for it to return
             await self._capture_status
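
For orientation, a minimal usage sketch of the rewritten writer (not part of this
commit). The PV prefix "PANDA1" and the counter1 block/signal names are illustrative
assumptions; the constructor arguments and the connect/open/close flow follow the
code above, with the directory provider stubbed as any callable returning an object
with directory_path and filename_prefix attributes:

    # Hypothetical wiring; the PV prefix and block/signal names are assumptions.
    import asyncio
    from dataclasses import dataclass

    from ophyd_async.panda.writers import PandaHDFWriter

    @dataclass
    class DirInfo:
        # The two attributes open() reads from self._directory_provider()
        directory_path: str
        filename_prefix: str

    writer = PandaHDFWriter(
        "PANDA1",                          # prefix: PVI info fetched from PANDA1:PVI
        lambda: DirInfo("/tmp", "panda"),  # DirectoryProvider for the HDF5 file
        lambda: "panda",                   # NameProvider: device name for dataset keys
        counter1=["out"],                  # **to_capture: block name -> signals
    )

    async def main():
        await writer.connect()  # PVI discovery attaches counter1 and hdf5 sub-devices
        await writer.open()     # arms HDF5 capture, returns per-dataset descriptors
        await writer.close()    # stops capture and waits for the file to finish

    asyncio.run(main())
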
36 changes: 15 additions & 21 deletions src/ophyd_async/panda/writers/panda_hdf.py
@@ -4,34 +4,27 @@
 from event_model import StreamDatum, StreamResource, compose_stream_resource
 
 from ophyd_async.core.device import Device
-from ophyd_async.epics.signal.signal import epics_signal_r, epics_signal_rw
+from ophyd_async.core import (
+    SignalR,
+    SignalRW,
+)
 
 
-class PandaHDF(Device):
-    def __init__(self, prefix: str, name: str = "", **scalar_datasets: str) -> None:
-        # Define some signals
-        self.file_path = epics_signal_rw(str, prefix + ":HDF5:FilePath")
-        self.file_name = epics_signal_rw(str, prefix + ":HDF5:FileName")
-        self.full_file_name = epics_signal_r(str, prefix + ":HDF5:FullFileName")
-        self.num_capture = epics_signal_rw(int, prefix + ":HDF5:NumCapture")
-        self.num_written = epics_signal_r(int, prefix + ":HDF5:NumWritten_RBV")
-        self.capture = epics_signal_rw(
-            bool, prefix + ":HDF5:Capturing", prefix + ":HDF5:Capture"
-        )
-        self.flush_now = epics_signal_rw(bool, prefix + ":HDF5:FlushNow")
-        self.scalar_datasets = scalar_datasets
-        for ds_name, ds_path in self.scalar_datasets.items():
-            setattr(
-                self,
-                "capturing_" + ds_name,
-                epics_signal_r(bool, prefix + ":" + ds_path + ":CAPTURE"),
-            )
-        super(PandaHDF, self).__init__(name)
+class DataBlock(Device):
+    filepath: SignalRW[str]
+    filename: SignalRW[str]
+    fullfilename: SignalR[str]
+    numcapture: SignalRW[int]
+    flushnow: SignalRW[bool]
+    capture: SignalRW[bool]
+    capturing: SignalR[bool]
+    numwritten_rbv: SignalR[int]
 
 
 @dataclass
 class _HDFDataset:
     device_name: str
+    block: str
     name: str
     path: str
     shape: List[int]
@@ -49,6 +42,7 @@ def __init__(self, full_file_name: str, datasets: List[_HDFDataset]) -> None:
                 resource_path=full_file_name,
                 resource_kwargs={
                     "name": ds.name,
+                    "block": ds.block,
                     "path": ds.path + ".Value",
                     "multiplier": ds.multiplier,
                 },
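
The new DataBlock declares its signals as bare type annotations instead of
constructing them in __init__; PandaHDFWriter.connect() reads those annotations
and attaches concrete pva-backed signals discovered via PVI. A short sketch of
the annotation inspection this relies on (inspecting the class directly, which
yields the same hints):

    # Walk DataBlock's annotations the same way connect() builds desired_signals.
    from typing import get_type_hints

    from ophyd_async.panda.writers.panda_hdf import DataBlock

    for signal_name, hint in get_type_hints(DataBlock).items():
        dtype = hint.__args__[0]  # e.g. SignalRW[str] -> str
        print(signal_name, dtype.__name__)
    # filepath str, filename str, fullfilename str, numcapture int, ...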
