Skip to content

Commit 29dc268

Browse files
authored
usb backend for opentrons thermocycler standalone operation (#673)
1 parent 20633c7 commit 29dc268

File tree

2 files changed

+337
-1
lines changed

2 files changed

+337
-1
lines changed
Lines changed: 336 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,336 @@
1+
# For direct control of the Opentrons Thermocycler to any USB port.
2+
# Does not require an Opentrons liquid handler to use.
3+
4+
import asyncio
5+
from typing import List, Optional
6+
7+
from pylabrobot.thermocycling.backend import ThermocyclerBackend
8+
from pylabrobot.thermocycling.standard import (
9+
BlockStatus,
10+
LidStatus,
11+
Protocol,
12+
)
13+
14+
try:
15+
import serial.tools.list_ports
16+
from opentrons.drivers.thermocycler import ThermocyclerDriverFactory
17+
from opentrons.drivers.thermocycler.abstract import AbstractThermocyclerDriver
18+
from opentrons.drivers.types import ThermocyclerLidStatus
19+
20+
USE_OPENTRONS_DRIVER = True
21+
_import_error = None
22+
except ImportError as e:
23+
USE_OPENTRONS_DRIVER = False
24+
_import_error = e
25+
26+
27+
async def set_temperature_no_pause(
28+
driver,
29+
temperature: float,
30+
hold_time_seconds: Optional[float],
31+
hold_time_minutes: Optional[float],
32+
ramp_rate: Optional[float],
33+
volume: Optional[float],
34+
) -> None:
35+
"""Set temperature without waiting for completion."""
36+
seconds = hold_time_seconds if hold_time_seconds is not None else 0
37+
minutes = hold_time_minutes if hold_time_minutes is not None else 0
38+
total_seconds = seconds + (minutes * 60)
39+
hold_time = total_seconds if total_seconds > 0 else 0
40+
41+
if ramp_rate is not None:
42+
await driver.set_ramp_rate(ramp_rate=ramp_rate)
43+
44+
await driver.set_plate_temperature(temp=temperature, hold_time=hold_time, volume=volume)
45+
46+
47+
async def wait_for_block_target(driver) -> None:
48+
"""Wait for block temperature to reach target."""
49+
max_attempts = 300 # 5 minutes max wait (300 * 1 second)
50+
attempt = 0
51+
52+
while attempt < max_attempts:
53+
try:
54+
plate_temp = await driver.get_plate_temperature()
55+
if plate_temp.target is not None and abs(plate_temp.current - plate_temp.target) < 1.0:
56+
break
57+
except Exception as e:
58+
if "invalid thermistor" in str(e).lower() or "error" in str(e).lower():
59+
raise RuntimeError(f"Thermocycler hardware error: {e}")
60+
print(f"Temperature check failed (attempt {attempt + 1}), retrying: {e}")
61+
attempt += 1
62+
await asyncio.sleep(1.0)
63+
else:
64+
raise TimeoutError(f"Temperature did not reach target within {max_attempts} seconds")
65+
66+
67+
async def execute_cycle_step(
68+
driver,
69+
temperature: float,
70+
hold_time_seconds: float,
71+
ramp_rate: Optional[float] = None,
72+
volume: Optional[float] = None,
73+
) -> None:
74+
"""Execute a single thermocycler step."""
75+
await set_temperature_no_pause(
76+
driver=driver,
77+
temperature=temperature,
78+
hold_time_seconds=hold_time_seconds,
79+
hold_time_minutes=None,
80+
ramp_rate=ramp_rate,
81+
volume=volume,
82+
)
83+
await wait_for_block_target(driver)
84+
85+
86+
class OpentronsThermocyclerUSBBackend(ThermocyclerBackend):
87+
"""USB backend for the Opentrons GEN-1/GEN-2 Thermocycler."""
88+
89+
SUPPORTED_USB_IDS = {
90+
(0x04D8, 0xED8C), # legacy Microchip bridge
91+
(0x0483, 0xED8D), # STMicroelectronics bridge seen in newer units
92+
}
93+
94+
def __init__(self):
95+
"""Create a new USB backend."""
96+
super().__init__()
97+
if not USE_OPENTRONS_DRIVER:
98+
raise RuntimeError("Opentrons thermocycler driver not available") from _import_error
99+
100+
self._driver: Optional[AbstractThermocyclerDriver] = None
101+
self._current_protocol: Optional[Protocol] = None
102+
self._loop: Optional[asyncio.AbstractEventLoop] = None
103+
104+
self._total_cycle_count: Optional[int] = None
105+
self._current_cycle_index: Optional[int] = None
106+
self._total_step_count: Optional[int] = None
107+
self._current_step_index: Optional[int] = None
108+
109+
def _clear_cycle_counters(self) -> None:
110+
self._total_cycle_count = None
111+
self._current_cycle_index = None
112+
self._total_step_count = None
113+
self._current_step_index = None
114+
115+
async def _execute_cycle_step(
116+
self,
117+
temperature: float,
118+
hold_time_seconds: float,
119+
ramp_rate: Optional[float] = None,
120+
volume: Optional[float] = None,
121+
) -> None:
122+
"""Execute a single thermocycler step (uses shared utility)."""
123+
assert self._driver is not None
124+
await execute_cycle_step(
125+
driver=self._driver,
126+
temperature=temperature,
127+
hold_time_seconds=hold_time_seconds,
128+
ramp_rate=ramp_rate,
129+
volume=volume,
130+
)
131+
132+
async def _execute_cycles(
133+
self,
134+
protocol: Protocol,
135+
repetitions: int,
136+
volume: Optional[float],
137+
) -> None:
138+
"""Execute cycles of temperature steps directly from protocol (with cycle tracking)."""
139+
assert self._driver is not None
140+
self._total_cycle_count = repetitions
141+
self._total_step_count = 0
142+
for stage in protocol.stages:
143+
for _ in range(stage.repeats):
144+
for step in stage.steps:
145+
self._total_step_count += 1
146+
147+
self._current_step_index = 0
148+
for rep in range(repetitions):
149+
self._current_cycle_index = rep + 1
150+
for stage in protocol.stages:
151+
for _ in range(stage.repeats):
152+
for step in stage.steps:
153+
if len(set(step.temperature)) != 1:
154+
raise ValueError(
155+
f"Opentrons thermocycler only supports a single unique temperature per step, got {set(step.temperature)}"
156+
)
157+
temperature = step.temperature[0]
158+
hold_time = step.hold_seconds
159+
ramp_rate = step.rate if step.rate is not None else None
160+
self._current_step_index += 1
161+
await execute_cycle_step(
162+
driver=self._driver,
163+
temperature=temperature,
164+
hold_time_seconds=hold_time,
165+
ramp_rate=ramp_rate,
166+
volume=volume,
167+
)
168+
169+
async def run_protocol(self, protocol: Protocol, block_max_volume: float):
170+
"""Execute thermocycler protocol using similar execution logic from thermocycler.py.
171+
172+
Implements specific to opentrons thermocycler:
173+
- Multiple stages with repeats
174+
- Individual step tracking
175+
- Cycle counting
176+
- Ramp rate control
177+
- Temperature waiting
178+
"""
179+
try:
180+
# Execute all steps as one cycle
181+
await self._execute_cycles(
182+
protocol=protocol,
183+
repetitions=1, # Protocol is executed once
184+
volume=block_max_volume,
185+
)
186+
finally:
187+
self._clear_cycle_counters()
188+
189+
self._current_protocol = protocol
190+
191+
async def setup(self, port: Optional[str] = None):
192+
"""Setup the USB connection to the thermocycler."""
193+
if self._loop is None:
194+
self._loop = asyncio.get_event_loop()
195+
196+
if port is None:
197+
ports = serial.tools.list_ports.comports()
198+
opentrons_ports = [p for p in ports if (p.vid, p.pid) in self.SUPPORTED_USB_IDS]
199+
if len(opentrons_ports) == 0:
200+
raise RuntimeError(
201+
f"No Opentrons Thermocycler found with supported VID:PID pairs: {self.SUPPORTED_USB_IDS}"
202+
)
203+
elif len(opentrons_ports) > 1:
204+
available_ports = [p.device for p in opentrons_ports]
205+
raise RuntimeError(
206+
f"Multiple Opentrons Thermocyclers found: {available_ports}. Please specify the port explicitly."
207+
)
208+
else:
209+
port = opentrons_ports[0].device
210+
211+
self._driver = await ThermocyclerDriverFactory.create(port, self._loop)
212+
213+
async def stop(self):
214+
if self._driver is not None:
215+
await self.deactivate_block()
216+
await self.deactivate_lid()
217+
await self._driver.disconnect()
218+
self._driver = None
219+
220+
async def open_lid(self):
221+
assert self._driver is not None
222+
await self._driver.open_lid()
223+
224+
async def close_lid(self):
225+
assert self._driver is not None
226+
await self._driver.close_lid()
227+
228+
async def lift_plate(self):
229+
"""Lift the thermocycler plate to un-stick and robustly pick up with robot arm."""
230+
assert self._driver is not None
231+
await self._driver.lift_plate()
232+
233+
async def jog_lid(self, angle: float):
234+
"""Jog the lid to a specific angle position."""
235+
assert self._driver is not None
236+
await self._driver.jog_lid(angle)
237+
238+
async def set_block_temperature(self, temperature: List[float]):
239+
"""Set block temperature in °C. Only single unique temperature supported.
240+
use set_ramp_rate inependently to control ramp rate to determined temperature
241+
"""
242+
if len(set(temperature)) != 1:
243+
raise ValueError(
244+
f"Opentrons thermocycler only supports a single unique block temperature, got {set(temperature)}"
245+
)
246+
temp_value = temperature[0]
247+
assert self._driver is not None
248+
await self._driver.set_plate_temperature(temp_value)
249+
250+
async def set_lid_temperature(self, temperature: List[float]):
251+
"""Set lid temperature in °C. Only single unique temperature supported."""
252+
if len(set(temperature)) != 1:
253+
raise ValueError(
254+
f"Opentrons thermocycler only supports a single unique lid temperature, got {set(temperature)}"
255+
)
256+
temp_value = temperature[0]
257+
assert self._driver is not None
258+
await self._driver.set_lid_temperature(temp_value)
259+
260+
async def set_ramp_rate(self, ramp_rate: float):
261+
"""Set the temperature ramp rate in °C/minute."""
262+
assert self._driver is not None
263+
await self._driver.set_ramp_rate(ramp_rate)
264+
265+
async def deactivate_block(self):
266+
"""Deactivate the block heater."""
267+
assert self._driver is not None
268+
await self._driver.deactivate_block()
269+
270+
async def deactivate_lid(self):
271+
"""Deactivate the lid heater."""
272+
assert self._driver is not None
273+
await self._driver.deactivate_lid()
274+
275+
async def get_device_info(self) -> dict:
276+
assert self._driver is not None
277+
return await self._driver.get_device_info()
278+
279+
async def get_block_current_temperature(self) -> List[float]:
280+
assert self._driver is not None
281+
plate_temp = await self._driver.get_plate_temperature()
282+
return [plate_temp.current]
283+
284+
async def get_block_target_temperature(self) -> List[float]:
285+
assert self._driver is not None
286+
plate_temp = await self._driver.get_plate_temperature()
287+
if plate_temp.target is not None:
288+
return [plate_temp.target]
289+
raise RuntimeError("Block target temperature is not set.")
290+
291+
async def get_lid_current_temperature(self) -> List[float]:
292+
assert self._driver is not None
293+
lid_temp = await self._driver.get_lid_temperature()
294+
return [lid_temp.current]
295+
296+
async def get_lid_target_temperature(self) -> List[float]:
297+
assert self._driver is not None
298+
lid_temp = await self._driver.get_lid_temperature()
299+
if lid_temp.target is not None:
300+
return [lid_temp.target]
301+
raise RuntimeError("Lid target temperature is not set.")
302+
303+
async def get_lid_open(self) -> bool:
304+
"""Return True if the lid is open."""
305+
assert self._driver is not None
306+
lid_status = await self._driver.get_lid_status()
307+
return lid_status == ThermocyclerLidStatus.OPEN
308+
309+
async def get_lid_status(self) -> LidStatus:
310+
assert self._driver is not None
311+
lid_temp = await self._driver.get_lid_temperature()
312+
if lid_temp.target is not None and abs(lid_temp.current - lid_temp.target) < 1.0:
313+
return LidStatus.HOLDING_AT_TARGET
314+
return LidStatus.IDLE
315+
316+
async def get_block_status(self) -> BlockStatus:
317+
assert self._driver is not None
318+
plate_temp = await self._driver.get_plate_temperature()
319+
if plate_temp.target is not None and abs(plate_temp.current - plate_temp.target) < 1.0:
320+
return BlockStatus.HOLDING_AT_TARGET
321+
return BlockStatus.IDLE
322+
323+
async def get_hold_time(self) -> float:
324+
raise NotImplementedError("USB driver doesn't provide hold time information")
325+
326+
async def get_current_cycle_index(self) -> int:
327+
return self._current_cycle_index if self._current_cycle_index is not None else 0
328+
329+
async def get_total_cycle_count(self) -> int:
330+
return self._total_cycle_count if self._total_cycle_count is not None else 0
331+
332+
async def get_current_step_index(self) -> int:
333+
return self._current_step_index if self._current_step_index is not None else 0
334+
335+
async def get_total_step_count(self) -> int:
336+
return self._total_step_count if self._total_step_count is not None else 0

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
extras_visualizer = extras_websockets
2020

21-
extras_opentrons = ["opentrons-http-api-client", "opentrons-shared-data"]
21+
extras_opentrons = ["opentrons-http-api-client", "opentrons-shared-data", "opentrons"]
2222

2323
extras_server = [
2424
"flask[async]",

0 commit comments

Comments
 (0)