Skip to content

Commit

Permalink
Prototyped IOC to accept the a dict of arrays to save - the ophyd/blu…
Browse files Browse the repository at this point in the history
…esky side should do the logic and call the saver when done
  • Loading branch information
mrakitin authored and SRX Operator committed May 17, 2024
1 parent 6524861 commit adb96a3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 70 deletions.
2 changes: 1 addition & 1 deletion src/srx_caproto_iocs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd.status import SubscriptionStatus

from .utils import now, save_hdf5_1d
from .utils import now, save_hdf5_nd


class AcqStatuses(Enum):
Expand Down
16 changes: 8 additions & 8 deletions src/srx_caproto_iocs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ def now(as_object=False):
return _now.isoformat()


def save_hdf5_1d(
def save_hdf5_zebra(
fname,
data,
group_path="data",
dtype="float32",
mode="x",
):
Expand All @@ -33,12 +32,13 @@ def save_hdf5_1d(
a Read/write if exists, create otherwise
"""
with h5py.File(fname, mode, libver="latest") as h5file_desc:
dataset = h5file_desc.create_dataset(
group_path,
data=data,
dtype=dtype,
)
dataset.flush()
for pvname, value in data.items():
dataset = h5file_desc.create_dataset(
pvname,
data=value,
dtype=dtype,
)
dataset.flush()


def save_hdf5_nd(
Expand Down
110 changes: 49 additions & 61 deletions src/srx_caproto_iocs/zebra/caproto_ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import textwrap

from caproto.asyncio.client import Context
from caproto.server import run, template_arg_parser
from caproto.server import pvproperty, run, template_arg_parser

from ..base import CaprotoSaveIOC, check_args
from ..utils import now
from ..utils import now, save_hdf5_zebra

# def export_nano_zebra_data(zebra, filepath, fastaxis):
# j = 0
Expand Down Expand Up @@ -76,20 +75,45 @@
# read_attrs=["pc.data.enc1", "pc.data.enc2", "pc.data.enc3", "pc.data.time"],
# )

DEFAULT_MAX_LENGTH = 100_000


class ZebraSaveIOC(CaprotoSaveIOC):
"""Zebra caproto save IOC."""

enc1 = pvproperty(
value=0,
doc="enc1 data",
max_length=DEFAULT_MAX_LENGTH,
)

enc2 = pvproperty(
value=0,
doc="enc2 data",
max_length=DEFAULT_MAX_LENGTH,
)

enc3 = pvproperty(
value=0,
doc="enc3 data",
max_length=DEFAULT_MAX_LENGTH,
)

zebra_time = pvproperty(
value=0,
doc="zebra time",
max_length=DEFAULT_MAX_LENGTH,
)

# enc1 = Cpt(EpicsSignal, "PC_ENC1")
# enc2 = Cpt(EpicsSignal, "PC_ENC2")
# enc3 = Cpt(EpicsSignal, "PC_ENC3")
# enc4 = Cpt(EpicsSignal, "PC_ENC4")

# data_in_progress = pvproperty() # +
# time_d = pvproperty()
# enc1_d = pvproperty()
# enc2_d = pvproperty()
# enc3_d = pvproperty()
# enc1_d = pvproperty()TypeError: ZebraSaveIOC._get_current_dataset() got an unexpected keyword argument 'frame'

# pulse_step = pvproperty()

def __init__(self, *args, external_pvs=None, **kwargs):
Expand All @@ -105,73 +129,35 @@ def __init__(self, *args, external_pvs=None, **kwargs):
# ret = await super()._stage(*args, **kwargs)
# return ret

async def _get_current_dataset(self, frame, external_pv="enc1"):
client_context = Context()
(pvobject,) = await client_context.get_pvs(self._external_pvs[external_pv])
print(f"{pvobject = }")
# pvobject = pvobjects[0]
ret = await pvobject.read()
async def _get_current_dataset(
self, *args, **kwargs
): # , frame, external_pv="enc1"):
# client_context = Context()
# (pvobject,) = await client_context.get_pvs(self._external_pvs[external_pv])
# print(f"{pvobject = }")
# # pvobject = pvobjects[0]
# ret = await pvobject.read()

dataset = ret.data
dataset = {}
for pvname in ["enc1", "enc2", "enc3", "zebra_time"]:
dataset[pvname] = getattr(self, pvname).value

print(f"{now()}:\n{dataset} {dataset.shape}")
print(f"{now()}:\n{dataset}")

return dataset

@acquire.putter
@no_reentry
async def acquire(self, instance, value):
"""The acquire method to perform an individual acquisition of a data point."""
if (
value != AcqStatuses.ACQUIRING.value
# or self.stage.value not in [True, StageStates.STAGED.value]
):
return False

if (
instance.value in [True, AcqStatuses.ACQUIRING.value]
and value == AcqStatuses.ACQUIRING.value
):
print(
f"The device is already acquiring. Please wait until the '{AcqStatuses.IDLE.value}' status."
)
return True

await self.acquire.write(AcqStatuses.ACQUIRING.value)

# Delegate saving the resulting data to a blocking callback in a thread.
payload = {
"filename": self.full_file_path.value,
"data": await self._get_current_dataset(frame=self.frame_num.value),
"uid": str(uuid.uuid4()),
"timestamp": ttime.time(),
"frame_number": self.frame_num.value,
}

await self._request_queue.async_put(payload)
response = await self._response_queue.async_get()

if response["success"]:
# Increment the counter only on a successful saving of the file.
await self.frame_num.write(self.frame_num.value + 1)

# await self.acquire.write(AcqStatuses.IDLE.value)

return False

@staticmethod
def saver(request_queue, response_queue):
"""The saver callback for threading-based queueing."""
while True:
received = request_queue.get()
filename = received["filename"]
data = received["data"]
frame_number = received["frame_number"]
# 'frame_number' is not used for this exporter.
frame_number = received["frame_number"] # noqa: F841
try:
save_hdf5_1d(fname=filename, data=data, mode="x", group_path="enc1")
print(
f"{now()}: saved {frame_number=} {data.shape} data into:\n {filename}"
)
save_hdf5_zebra(fname=filename, data=data, mode="x")
print(f"{now()}: saved data into:\n {filename}")

success = True
error_message = ""
Expand All @@ -192,7 +178,9 @@ def saver(request_queue, response_queue):
)
ioc_options, run_options = check_args(parser, split_args)

external_pv_prefix = ioc_options["prefix"].replace("{{", "{").replace("}}", "}") # "XF:05IDD-ES:1{Dev:Zebra2}:"
external_pv_prefix = (
ioc_options["prefix"].replace("{{", "{").replace("}}", "}")
) # "XF:05IDD-ES:1{Dev:Zebra2}:"

external_pvs = {
"pulse_step": external_pv_prefix + "PC_PULSE_STEP",
Expand Down

0 comments on commit adb96a3

Please sign in to comment.