diff --git a/src/srx_caproto_iocs/base.py b/src/srx_caproto_iocs/base.py index ceb456b..cd9beab 100644 --- a/src/srx_caproto_iocs/base.py +++ b/src/srx_caproto_iocs/base.py @@ -7,12 +7,13 @@ from enum import Enum from pathlib import Path -import numpy as np +import skimage.data from caproto import ChannelType from caproto.ioc_examples.mini_beamline import no_reentry from caproto.server import PVGroup, pvproperty, run, template_arg_parser from ophyd import Component as Cpt from ophyd import Device, EpicsSignal, EpicsSignalRO +from ophyd.status import SubscriptionStatus from .utils import now, save_hdf5 @@ -142,14 +143,18 @@ async def stage(self, instance, value): return False - def _get_current_dataset(self): - return np.random.random((10, 20)) + def _get_current_dataset(self, frame): + dataset = skimage.data.cells3d().sum(axis=1) + return dataset[frame, ...] @acquire.putter @no_reentry async def acquire(self, instance, value): """The acquire method to perform an individual acquisition of a data point.""" - if value != AcqStatuses.ACQUIRING.value: + if ( + value != AcqStatuses.ACQUIRING.value + # or self.stage.value not in [True, StageStates.STAGED.value] + ): return False if ( @@ -161,12 +166,16 @@ async def acquire(self, instance, value): ) return True + await self.acquire.write(AcqStatuses.ACQUIRING.value) + # Delegate saving the resulting data to a blocking callback in a thread. payload = { "filename": self.full_file_path.value, - "data": self._get_current_dataset(), + "data": self._get_current_dataset(frame=self.frame_num.value), "uid": str(uuid.uuid4()), "timestamp": ttime.time(), + "frame_number": self.frame_num.value, + "update_existing": self.frame_num.value > 0, } await self._request_queue.async_put(payload) @@ -176,6 +185,8 @@ async def acquire(self, instance, value): # Increment the counter only on a successful saving of the file. await self.frame_num.write(self.frame_num.value + 1) + # await self.acquire.write(AcqStatuses.IDLE.value) + return False @staticmethod @@ -185,9 +196,15 @@ def saver(request_queue, response_queue): received = request_queue.get() filename = received["filename"] data = received["data"] + frame_number = received["frame_number"] + update_existing = received["update_existing"] try: - save_hdf5(fname=filename, data=data) - print(f"{now()}: saved {data.shape} data into:\n {filename}") + save_hdf5( + fname=filename, data=data, mode="a", update_existing=update_existing + ) + print( + f"{now()}: saved {frame_number=} {data.shape} data into:\n {filename}" + ) success = True error_message = "" @@ -213,6 +230,41 @@ class OphydDeviceWithCaprotoIOC(Device): full_file_path = Cpt(EpicsSignalRO, "full_file_path", string=True) frame_num = Cpt(EpicsSignal, "frame_num") ioc_stage = Cpt(EpicsSignal, "stage", string=True) + acquire = Cpt(EpicsSignal, "acquire", string=True) + + def set(self, command): + """The set method with values for staging and acquiring.""" + + print(f"{now()}: {command = }") + if command in [StageStates.STAGED.value, "stage"]: + expected_old_value = StageStates.UNSTAGED.value + expected_new_value = StageStates.STAGED.value + obj = self.ioc_stage + cmd = StageStates.STAGED.value + + if command in [StageStates.UNSTAGED.value, "unstage"]: + expected_old_value = StageStates.STAGED.value + expected_new_value = StageStates.UNSTAGED.value + obj = self.ioc_stage + cmd = StageStates.UNSTAGED.value + + if command in [AcqStatuses.ACQUIRING.value, "acquire"]: + expected_old_value = AcqStatuses.ACQUIRING.value + expected_new_value = AcqStatuses.IDLE.value + obj = self.acquire + cmd = AcqStatuses.ACQUIRING.value + + def cb(value, old_value, **kwargs): + # pylint: disable=unused-argument + print(f"{now()}: {old_value} -> {value}") + if value == expected_new_value and old_value == expected_old_value: + return True + return False + + st = SubscriptionStatus(obj, callback=cb, run=False) + print(f"{now()}: {cmd = }") + obj.put(cmd) + return st def check_args(parser_, split_args_): diff --git a/src/srx_caproto_iocs/utils.py b/src/srx_caproto_iocs/utils.py index 8348255..07e99cb 100644 --- a/src/srx_caproto_iocs/utils.py +++ b/src/srx_caproto_iocs/utils.py @@ -23,25 +23,35 @@ def save_hdf5( mode="x", update_existing=False, ): - """The function to export the data to an HDF5 file.""" - h5file_desc = h5py.File(fname, mode, libver="latest") - frame_shape = data.shape - if not update_existing: - group = h5file_desc.create_group(group_name) - dataset = group.create_dataset( - "data/data", - data=np.full(fill_value=np.nan, shape=(1, *frame_shape)), - maxshape=(None, *frame_shape), - chunks=(1, *frame_shape), - dtype=dtype, - ) - frame_num = 0 - else: - dataset = h5file_desc[f"{group_name}/{group_path}"] - frame_num = dataset.shape[0] - - h5file_desc.swmr_mode = True - - dataset.resize((frame_num + 1, *frame_shape)) - dataset[frame_num, :, :] = data - dataset.flush() + """The function to export the data to an HDF5 file. + + Check https://docs.h5py.org/en/stable/high/file.html#opening-creating-files for modes: + + r Readonly, file must exist (default) + r+ Read/write, file must exist + w Create file, truncate if exists + w- or x Create file, fail if exists + a Read/write if exists, create otherwise + """ + with h5py.File(fname, mode, libver="latest") as h5file_desc: + frame_shape = data.shape + if not update_existing: + group = h5file_desc.create_group(group_name) + dataset = group.create_dataset( + group_path, + data=np.full(fill_value=np.nan, shape=(1, *frame_shape)), + maxshape=(None, *frame_shape), + chunks=(1, *frame_shape), + dtype=dtype, + ) + frame_num = 0 + else: + dataset = h5file_desc[f"{group_name}/{group_path}"] + frame_num = dataset.shape[0] + + # https://docs.h5py.org/en/stable/swmr.html + h5file_desc.swmr_mode = True + + dataset.resize((frame_num + 1, *frame_shape)) + dataset[frame_num, :, :] = data + dataset.flush() diff --git a/tests/conftest.py b/tests/conftest.py index 77e76f4..5137c09 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -26,7 +26,7 @@ def base_ophyd_device(): @pytest.fixture(scope="session") -def base_caproto_ioc(wait=3): +def base_caproto_ioc(wait=5): first_three = ".".join(socket.gethostbyname(socket.gethostname()).split(".")[:3]) broadcast = f"{first_three}.255" diff --git a/tests/test_base_ophyd.py b/tests/test_base_ophyd.py index 9d3fae4..2ed76f7 100644 --- a/tests/test_base_ophyd.py +++ b/tests/test_base_ophyd.py @@ -1,17 +1,21 @@ from __future__ import annotations import tempfile +import time as ttime from pathlib import Path +import h5py import pytest -from ophyd.status import SubscriptionStatus from srx_caproto_iocs.utils import now @pytest.mark.cloud_friendly() -@pytest.mark.parametrize("date_template", ["%Y/%m/", "%Y/%m/%d", "mydir/%Y/%m/%d"]) -def test_base_ophyd_templates(base_caproto_ioc, base_ophyd_device, date_template): +# @pytest.mark.parametrize("date_template", ["%Y/%m/", "%Y/%m/%d", "mydir/%Y/%m/%d"]) +@pytest.mark.parametrize("date_template", ["%Y/%m/"]) +def test_base_ophyd_templates( + base_caproto_ioc, base_ophyd_device, date_template, num_frames=50 +): with tempfile.TemporaryDirectory(prefix="/tmp/") as tmpdirname: date = now(as_object=True) write_dir_root = Path(tmpdirname) @@ -25,16 +29,23 @@ def test_base_ophyd_templates(base_caproto_ioc, base_ophyd_device, date_template dev.write_dir.put(dir_template) dev.file_name.put(file_template) - def cb(value, old_value, **kwargs): - if value == "staged" and old_value == "unstaged": - return True - return False - - st = SubscriptionStatus(dev.ioc_stage, callback=cb, run=False) - dev.ioc_stage.put("staged") - st.wait() + dev.set("stage").wait() full_file_path = dev.full_file_path.get() print(f"{full_file_path = }") + for i in range(num_frames): + print(f"Collecting frame {i}...") + dev.set("acquire").wait() + ttime.sleep(0.1) + + dev.set("unstage").wait() + assert full_file_path, "The returned 'full_file_path' did not change." + assert Path(full_file_path).is_file(), f"No such file '{full_file_path}'" + + with h5py.File(full_file_path, "r", swmr=True) as f: + dataset = f["/entry/data/data"] + assert dataset.shape == (num_frames, 256, 256) + + ttime.sleep(1.0)