From adb96a3ef5fb2ef89a69a8adb8a8b4a99953ea41 Mon Sep 17 00:00:00 2001 From: Max Rakitin Date: Fri, 17 May 2024 14:44:12 -0400 Subject: [PATCH] Prototyped IOC to accept the a dict of arrays to save - the ophyd/bluesky side should do the logic and call the saver when done --- src/srx_caproto_iocs/base.py | 2 +- src/srx_caproto_iocs/utils.py | 16 ++-- src/srx_caproto_iocs/zebra/caproto_ioc.py | 110 ++++++++++------------ 3 files changed, 58 insertions(+), 70 deletions(-) diff --git a/src/srx_caproto_iocs/base.py b/src/srx_caproto_iocs/base.py index 1bf4658..412d33c 100644 --- a/src/srx_caproto_iocs/base.py +++ b/src/srx_caproto_iocs/base.py @@ -16,7 +16,7 @@ from ophyd import Device, EpicsSignal, EpicsSignalRO from ophyd.status import SubscriptionStatus -from .utils import now, save_hdf5_1d +from .utils import now, save_hdf5_nd class AcqStatuses(Enum): diff --git a/src/srx_caproto_iocs/utils.py b/src/srx_caproto_iocs/utils.py index fd17f4c..737fe3d 100644 --- a/src/srx_caproto_iocs/utils.py +++ b/src/srx_caproto_iocs/utils.py @@ -15,10 +15,9 @@ def now(as_object=False): return _now.isoformat() -def save_hdf5_1d( +def save_hdf5_zebra( fname, data, - group_path="data", dtype="float32", mode="x", ): @@ -33,12 +32,13 @@ def save_hdf5_1d( a Read/write if exists, create otherwise """ with h5py.File(fname, mode, libver="latest") as h5file_desc: - dataset = h5file_desc.create_dataset( - group_path, - data=data, - dtype=dtype, - ) - dataset.flush() + for pvname, value in data.items(): + dataset = h5file_desc.create_dataset( + pvname, + data=value, + dtype=dtype, + ) + dataset.flush() def save_hdf5_nd( diff --git a/src/srx_caproto_iocs/zebra/caproto_ioc.py b/src/srx_caproto_iocs/zebra/caproto_ioc.py index 153a9b8..405fd8f 100644 --- a/src/srx_caproto_iocs/zebra/caproto_ioc.py +++ b/src/srx_caproto_iocs/zebra/caproto_ioc.py @@ -2,11 +2,10 @@ import textwrap -from caproto.asyncio.client import Context -from caproto.server import run, template_arg_parser +from caproto.server import pvproperty, run, template_arg_parser from ..base import CaprotoSaveIOC, check_args -from ..utils import now +from ..utils import now, save_hdf5_zebra # def export_nano_zebra_data(zebra, filepath, fastaxis): # j = 0 @@ -76,10 +75,36 @@ # read_attrs=["pc.data.enc1", "pc.data.enc2", "pc.data.enc3", "pc.data.time"], # ) +DEFAULT_MAX_LENGTH = 100_000 + class ZebraSaveIOC(CaprotoSaveIOC): """Zebra caproto save IOC.""" + enc1 = pvproperty( + value=0, + doc="enc1 data", + max_length=DEFAULT_MAX_LENGTH, + ) + + enc2 = pvproperty( + value=0, + doc="enc2 data", + max_length=DEFAULT_MAX_LENGTH, + ) + + enc3 = pvproperty( + value=0, + doc="enc3 data", + max_length=DEFAULT_MAX_LENGTH, + ) + + zebra_time = pvproperty( + value=0, + doc="zebra time", + max_length=DEFAULT_MAX_LENGTH, + ) + # enc1 = Cpt(EpicsSignal, "PC_ENC1") # enc2 = Cpt(EpicsSignal, "PC_ENC2") # enc3 = Cpt(EpicsSignal, "PC_ENC3") @@ -87,9 +112,8 @@ class ZebraSaveIOC(CaprotoSaveIOC): # data_in_progress = pvproperty() # + # time_d = pvproperty() - # enc1_d = pvproperty() - # enc2_d = pvproperty() - # enc3_d = pvproperty() + # enc1_d = pvproperty()TypeError: ZebraSaveIOC._get_current_dataset() got an unexpected keyword argument 'frame' + # pulse_step = pvproperty() def __init__(self, *args, external_pvs=None, **kwargs): @@ -105,60 +129,23 @@ def __init__(self, *args, external_pvs=None, **kwargs): # ret = await super()._stage(*args, **kwargs) # return ret - async def _get_current_dataset(self, frame, external_pv="enc1"): - client_context = Context() - (pvobject,) = await client_context.get_pvs(self._external_pvs[external_pv]) - print(f"{pvobject = }") - # pvobject = pvobjects[0] - ret = await pvobject.read() + async def _get_current_dataset( + self, *args, **kwargs + ): # , frame, external_pv="enc1"): + # client_context = Context() + # (pvobject,) = await client_context.get_pvs(self._external_pvs[external_pv]) + # print(f"{pvobject = }") + # # pvobject = pvobjects[0] + # ret = await pvobject.read() - dataset = ret.data + dataset = {} + for pvname in ["enc1", "enc2", "enc3", "zebra_time"]: + dataset[pvname] = getattr(self, pvname).value - print(f"{now()}:\n{dataset} {dataset.shape}") + print(f"{now()}:\n{dataset}") return dataset - @acquire.putter - @no_reentry - async def acquire(self, instance, value): - """The acquire method to perform an individual acquisition of a data point.""" - if ( - value != AcqStatuses.ACQUIRING.value - # or self.stage.value not in [True, StageStates.STAGED.value] - ): - return False - - if ( - instance.value in [True, AcqStatuses.ACQUIRING.value] - and value == AcqStatuses.ACQUIRING.value - ): - print( - f"The device is already acquiring. Please wait until the '{AcqStatuses.IDLE.value}' status." - ) - return True - - await self.acquire.write(AcqStatuses.ACQUIRING.value) - - # Delegate saving the resulting data to a blocking callback in a thread. - payload = { - "filename": self.full_file_path.value, - "data": await self._get_current_dataset(frame=self.frame_num.value), - "uid": str(uuid.uuid4()), - "timestamp": ttime.time(), - "frame_number": self.frame_num.value, - } - - await self._request_queue.async_put(payload) - response = await self._response_queue.async_get() - - if response["success"]: - # Increment the counter only on a successful saving of the file. - await self.frame_num.write(self.frame_num.value + 1) - - # await self.acquire.write(AcqStatuses.IDLE.value) - - return False - @staticmethod def saver(request_queue, response_queue): """The saver callback for threading-based queueing.""" @@ -166,12 +153,11 @@ def saver(request_queue, response_queue): received = request_queue.get() filename = received["filename"] data = received["data"] - frame_number = received["frame_number"] + # 'frame_number' is not used for this exporter. + frame_number = received["frame_number"] # noqa: F841 try: - save_hdf5_1d(fname=filename, data=data, mode="x", group_path="enc1") - print( - f"{now()}: saved {frame_number=} {data.shape} data into:\n {filename}" - ) + save_hdf5_zebra(fname=filename, data=data, mode="x") + print(f"{now()}: saved data into:\n {filename}") success = True error_message = "" @@ -192,7 +178,9 @@ def saver(request_queue, response_queue): ) ioc_options, run_options = check_args(parser, split_args) - external_pv_prefix = ioc_options["prefix"].replace("{{", "{").replace("}}", "}") # "XF:05IDD-ES:1{Dev:Zebra2}:" + external_pv_prefix = ( + ioc_options["prefix"].replace("{{", "{").replace("}}", "}") + ) # "XF:05IDD-ES:1{Dev:Zebra2}:" external_pvs = { "pulse_step": external_pv_prefix + "PC_PULSE_STEP",