forked from OxfordIonTrapGroup/entangler-core
-
Notifications
You must be signed in to change notification settings - Fork 1
/
driver.py
executable file
·304 lines (248 loc) · 11.3 KB
/
driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""ARTIQ kernel interface to the entangler core.
NOTE: requires ARTIQ >= 5, for the
:func:`artiq.coredevice.rtio.rtio_input_timestamped_data` RUST syscall.
"""
import numpy as np
from artiq.coredevice.rtio import rtio_input_data
from artiq.coredevice.rtio import rtio_input_timestamped_data
from artiq.coredevice.rtio import rtio_output
from artiq.language.core import delay_mu
from artiq.language.core import kernel
from artiq.language.types import TInt32
from artiq.language.types import TInt64
from artiq.language.types import TList
from artiq.language.types import TTuple
from entangler.phy_registers import ADDRESS_READ, ADDRESS_WRITE
from entangler.config import settings
class Entangler:
    """Sequences remote entanglement experiments between a master and a slave.

    This is the ARTIQ kernel-side driver: every ``@kernel`` method talks to the
    entangler gateware through a single RTIO channel, using the register
    addresses from :mod:`entangler.phy_registers`.
    """

    # label internal variables as constant to optimize compilation.
    kernel_invariants = {
        "core",
        "channel",
        "is_master",
        "ref_period_mu",
        "num_inputs",
        "num_outputs",
        "_SEQUENCER_TIME_MASK",
        "_ADDRESS_WRITE",
        "_ADDRESS_READ",
        "_NUM_ALLOWED_PATTERNS",
        "_PATTERN_LENGTH_MASK",
        "_PATTERN_WIDTH",
    }

    def __init__(self, dmgr, channel, is_master=True, core_device="core"):
        """Fast sequencer for generating remote entanglement.

        Args:
            dmgr (artiq.DeviceManager): ARTIQ device manager
            channel (int): RTIO channel number
            is_master (bool, optional): Is this Kasli the sequencer master or the
                slave. Defaults to True.
            core_device (str, optional): Core device name. Defaults to "core".
        """
        self.core = dmgr.get(core_device)
        self.channel = channel
        self.is_master = is_master
        # Duration of one coarse RTIO cycle, in machine units.
        self.ref_period_mu = self.core.seconds_to_mu(self.core.coarse_ref_period)
        self.num_outputs = settings.NUM_OUTPUT_CHANNELS
        self.num_inputs = settings.NUM_ENTANGLER_INPUT_SIGNALS
        self._SEQUENCER_TIME_MASK = (1 << settings.FULL_COUNTER_WIDTH) - 1
        self._ADDRESS_WRITE = ADDRESS_WRITE
        self._ADDRESS_READ = ADDRESS_READ
        self._NUM_ALLOWED_PATTERNS = settings.NUM_PATTERNS_ALLOWED
        # One pattern holds one bit per entangler input, so it is
        # ``_PATTERN_WIDTH`` bits wide. The mask that isolates a single pattern
        # must therefore be derived from the number of inputs, NOT from the
        # number of allowed patterns (the two only coincide when both are equal).
        self._PATTERN_WIDTH = settings.NUM_ENTANGLER_INPUT_SIGNALS
        self._PATTERN_LENGTH_MASK = (1 << settings.NUM_ENTANGLER_INPUT_SIGNALS) - 1

    @kernel
    def init(self):
        """Initialize the ``Entangler`` core gateware settings.

        Calls :meth:`set_config` with its defaults (outputs not driven by the
        core, standalone mode), which also writes the ``is_master`` flag.
        """
        self.set_config()  # Write is_master

    @kernel
    def _write(self, addr, value):
        """Write parameter.

        This method advances the timeline by one coarse RTIO cycle.

        Args:
            addr: parameter memory address.
            value: Data to be written.
        """
        # RTIO target = channel number in the upper bits, register address in
        # the low 8 bits.
        rtio_output((self.channel << 8) | addr, value)
        delay_mu(self.ref_period_mu)

    @kernel
    def _read(self, addr: TInt32) -> TInt32:
        """Read parameter.

        This method does not advance the timeline but consumes all slack.

        Args:
            addr: Memory location address.

        Returns:
            Value of the ``Entangler`` setting (register) that you are querying.
        """
        # Issue the read request (data word is unused), then block on the reply.
        rtio_output((self.channel << 8) | addr, 0)
        return rtio_input_data(self.channel)

    @kernel
    def set_config(self, enable=False, standalone=True):
        """Configure the core gateware.

        The configuration word written to the ``CONFIG`` register is:
        bit 0 = ``enable``, bit 1 = ``is_master``, bit 2 = ``standalone``.

        Args:
            enable: allow core to drive outputs (otherwise they are connected to
                normal TTLOut phys). Do not enable if the cycle length and timing
                parameters are not set or you are trying to use the outputs
                elsewhere for other things.
                NOTE: you must also disable the entangler once done to use outputs.
            standalone: don't attempt synchronization with partner, just run when
                ready. Used for testing and single-trap mode.
        """
        data = 0
        if enable:
            data |= 1
        if self.is_master:
            data |= 1 << 1
        if standalone:
            data |= 1 << 2
        self._write(self._ADDRESS_WRITE.CONFIG, data)

    @kernel
    def set_timing_mu(self, channel: TInt32, t_start_mu: TInt32, t_stop_mu: TInt32):
        """Set the output channel timing and input gate times.

        ``t_start_mu`` and ``t_stop_mu`` define a window.
        For an output, this window is when the output signal is HIGH (Logic 1).
        For an input, this window is when the Entangler can register an input pulse
        (positive edge triggered). Minimum values for t_start_mu/t_stop_mu is 8 (mu).

        Times are in machine units.
        For output channels the timing resolution is the coarse clock (8ns), and
        the times are relative to the start of the entanglement cycle.

        For gate channels, if you are using a reference pulse then the time is
        relative to the reference pulse (422 pulse input).
        Otherwise it is relative to the cycle start (IonPhoton).
        Input gating has fine timing resolution (1ns).

        The start / stop times can be between 0 and the cycle length
        (i.e for a cycle length of 800ns, stop can be at most 800ns).
        (in mu, 100*8ns is typically 800).
        If the stop time is after the cycle length, the pulse stops at the cycle
        length. If the start is after the cycle length there is no pulse.

        Channels are numbered with the outputs first, followed by the inputs:
        outputs occupy ``[0, num_outputs)`` and inputs occupy
        ``[num_outputs, num_outputs + num_inputs)``, where the # of I/O is defined
        in settings.toml. So to find the channel number for input #2 (0-indexed):
        ``in2chan = driver.num_outputs + 2``.
        Likewise, input #0: ``in0chan = driver.num_outputs + 0``.
        Note that changing the number of inputs/outputs requires re-compiling the
        gateware for the Kasli/Entangler.

        NOTE: if both ``gate_start, gate_stop < 8``, or if ``gate_start >= gate_stop``,
        the inputs will not register.

        Args:
            channel: output/input channel index, numbered as described above.
            t_start_mu: start of the window, in machine units.
            t_stop_mu: end of the window, in machine units.
        """
        if channel < self.num_outputs:
            # remove the fine timestamp from outputs
            t_start_mu = t_start_mu >> 3
            t_stop_mu = t_stop_mu >> 3

            # TODO: don't know why add 1...
            t_start_mu += 1
            t_stop_mu += 1

        # Truncate to settings.FULL_COUNTER_WIDTH.
        t_start_mu &= self._SEQUENCER_TIME_MASK
        t_stop_mu &= self._SEQUENCER_TIME_MASK

        # Convert to channel write address
        channel = self._ADDRESS_WRITE.TIMING + channel

        # Stop time is packed into the upper half-word, start time into the lower.
        # NOTE(review): the fixed shift of 16 assumes FULL_COUNTER_WIDTH <= 16
        # -- confirm against the gateware register layout.
        self._write(channel, (t_stop_mu << 16) | t_start_mu)

    @kernel
    def set_timing(self, channel, t_start, t_stop):
        """Set the output channel timing and relative gate times.

        Times are in seconds. See :meth:`set_timing_mu` for details.
        """
        t_start_mu = np.int32(self.core.seconds_to_mu(t_start))
        t_stop_mu = np.int32(self.core.seconds_to_mu(t_stop))
        self.set_timing_mu(channel, t_start_mu, t_stop_mu)

    @kernel
    def set_cycle_length_mu(self, t_cycle_mu: TInt32):
        """Set the entanglement cycle length.

        If the herald module does not signal success by this time the loop
        repeats. Resolution is coarse_ref_period.

        Args:
            t_cycle_mu: cycle length in machine units; the lowest 3 bits (the
                fine-time part) are discarded before writing.
        """
        t_cycle_mu = t_cycle_mu >> 3
        self._write(self._ADDRESS_WRITE.TCYCLE, t_cycle_mu)

    @kernel
    def set_cycle_length(self, t_cycle):
        """Set the entanglement cycle length.

        Times are in seconds. See :meth:`set_cycle_length_mu` for details.
        """
        t_cycle_mu = np.int32(self.core.seconds_to_mu(t_cycle))
        self.set_cycle_length_mu(t_cycle_mu)

    @kernel
    def set_patterns(self, patterns: TList(TInt32)):
        """Set the count patterns that cause the entangler loop to exit.

        Up to ``settings.NUM_PATTERNS_ALLOWED`` patterns can be set (typically 4).
        Each pattern has one bit per entangler input (typically 4 bits), with the
        order (MSB first):
            input3, input2, input1, input0

        E.g. to set a match on input0 only: set_patterns([0b0001])
        to match on BOTH input3 & input1: set_patterns([0b1010])
        To match on either (input0) or (input1 & input3):
        set_patterns([0b0001, 0b1010]).

        NOTE: inputs are in numerical order in their respective DIO/TTL bank.
        Thus, input0 is usually either TTLX-IN0 or TTLX-IN4.

        The register word is laid out as: pattern ``i`` in bits
        ``[i * width, (i + 1) * width)``, followed by one enable bit per pattern
        starting at bit ``num_allowed_patterns * width``.

        Args:
            patterns: list of pattern bitmasks; bits above the pattern width are
                discarded.
        """
        data = 0
        assert len(patterns) <= self._NUM_ALLOWED_PATTERNS
        # Index loop kept deliberately: this is compiled by the ARTIQ kernel
        # compiler, which supports only a restricted subset of Python.
        for i in range(len(patterns)):
            # Pattern bits, truncated to a single pattern's width.
            data |= (patterns[i] & self._PATTERN_LENGTH_MASK) << (
                self._PATTERN_WIDTH * i
            )
            # Enable bit for pattern ``i``.
            data |= 1 << (self._NUM_ALLOWED_PATTERNS * self._PATTERN_WIDTH + i)
        self._write(self._ADDRESS_WRITE.PATTERNS, data)

    @kernel
    def run_mu(self, duration_mu) -> TTuple([TInt64, TInt32]):
        """Run the entanglement sequence until success, or duration_mu has elapsed.

        NOTE: THIS IS A BLOCKING CALL (eats all slack).

        Args:
            duration_mu (int): Timeout duration of this entanglement cycle, in mu.

        Returns:
            tuple of (timestamp, reason).
            timestamp is the RTIO time at the end of the final cycle.
            reason is 0x3fff if there was a timeout, or a bitfield giving the
            herald matches if there was a success.
        """
        # The gateware counts the timeout in coarse cycles: drop the fine bits.
        duration_mu = duration_mu >> 3
        self._write(self._ADDRESS_WRITE.RUN, duration_mu)
        # Following func is only in ARTIQ >= 5, don't have in dev environment
        # pylint: disable=no-name-in-module
        # NOTE(review): the RTIO input timeout passed here is -1, which presumably
        # means "wait indefinitely" for the gateware's reply -- confirm against
        # the ARTIQ RTIO documentation.
        return rtio_input_timestamped_data(np.int64(-1), self.channel)

    @kernel
    def run(self, duration):
        """Run the entanglement sequence.

        See :meth:`run_mu` for details. NOTE: this is a blocking call (eats all
        slack).

        Duration is in seconds, max of about 4 seconds.
        NOTE(review): the duration is converted to machine units and cast to a
        signed 32-bit integer here; with 1 mu = 1 ns that limits the usable range
        to roughly 2.1 s rather than 4 s -- confirm before relying on long
        timeouts.
        """
        duration_mu = np.int32(self.core.seconds_to_mu(duration))
        return self.run_mu(duration_mu)

    @kernel
    def get_status(self):
        """Get status of the entangler gateware.

        Returns:
            (int): 3 flag bits, MSB -> LSB:
                last run timed out; last run was a success; ready to start

        NOTE: ready_to_start is usually not set b/c only asserted during run()
        """
        return self._read(self._ADDRESS_READ.STATUS)

    @kernel
    def get_ncycles(self):
        """Get the number of cycles the core has completed.

        This value is reset every :meth:`run` call, so this is the number since the
        last :meth:`run` call.
        """
        return self._read(self._ADDRESS_READ.NCYCLES)

    @kernel
    def get_ntriggers(self):
        """Get the number of 422pulsed triggers the core has received.

        This value is reset every :meth:`run` call, so this is the number since the
        last :meth:`run` call.
        """
        return self._read(self._ADDRESS_READ.NTRIGGERS)

    @kernel
    def get_time_remaining(self):
        """Return the remaining number of clock cycles until the core times out.

        NOTE: This only works as expected if the Entangler matches a pattern.
        This cannot be used during :meth:`run` because that call blocks execution.
        """
        return self._read(self._ADDRESS_READ.TIME_REMAINING)

    @kernel
    def get_timestamp_mu(self, channel: TInt32) -> TInt32:
        """Get the input timestamp for an input channel.

        Channels are numbered from (0, settings.NUM_ENTANGLER_INPUT_SIGNALS)
        (add 1 if using a reference).

        The timestamp is the time offset, in mu, from the start of the cycle to
        the detected rising edge.

        Args:
            channel: input channel index, added to the base ``TIMESTAMP`` read
                address to select the register.
        """
        return self._read(np.int32(self._ADDRESS_READ.TIMESTAMP) + channel)