Skip to content

Commit

Permalink
Merge pull request #28 from wilsonh1/main
Browse files Browse the repository at this point in the history
Update Tecan config and plate robot
  • Loading branch information
rickwierenga authored Aug 2, 2023
2 parents 861d2e9 + 6c259c8 commit 5cebe3f
Show file tree
Hide file tree
Showing 4 changed files with 413 additions and 80 deletions.
176 changes: 119 additions & 57 deletions pylabrobot/liquid_handling/backends/tecan/EVO.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Resource,
Coordinate,
Liquid,
TecanPlateCarrier,
TecanTipRack,
TecanPlate,
TecanTip,
Expand All @@ -41,9 +42,9 @@ class TecanLiquidHandler(USBBackend, metaclass=ABCMeta):
@abstractmethod
def __init__(
self,
packet_read_timeout: int = 3,
read_timeout: int = 30,
write_timeout: int = 30,
packet_read_timeout: int = 120,
read_timeout: int = 300,
write_timeout: int = 300,
):
"""
Expand Down Expand Up @@ -155,11 +156,11 @@ class EVO(TecanLiquidHandler):
def __init__(
self,
diti_count: int = 0,
packet_read_timeout: int = 10,
read_timeout: int = 30,
write_timeout: int = 30,
packet_read_timeout: int = 120,
read_timeout: int = 300,
write_timeout: int = 300,
):
""" Create a new STAR interface.
""" Create a new EVO interface.
Args:
packet_read_timeout: timeout in seconds for reading a single packet.
Expand Down Expand Up @@ -243,14 +244,17 @@ async def setup(self):
if self.roma_connected: # position_initialization_x in reverse order from setup_arm
self.roma = RoMa(self, EVO.ROMA)
await self.roma.position_initialization_x()
# move to home position (TBD) after initialization
await self.roma.set_vector_coordinate_position(1, 9000, 2000, 2464, 1800, None, 1, 0)
await self.roma.action_move_vector_coordinate_position()
if self.liha_connected:
self.liha = LiHa(self, EVO.LIHA)
await self.liha.position_initialization_x()

self._num_channels = await self.liha.report_number_tips()
self._x_range = await self.liha.report_x_param(5)
self._y_range = (await self.liha.report_y_param(5))[0]
self._z_range = (await self.liha.report_z_param(5))[0] # TODO: assert all are same?
self._z_range = (await self.liha.report_z_param(5))[0]

# Initialize plungers. Assumes wash station assigned at rail 1.
await self.liha.set_z_travel_height([self._z_range] * self.num_channels)
Expand All @@ -263,8 +267,6 @@ async def setup(self):
await self.liha.move_plunger_relative([-100] * self.num_channels)
await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels)

# TODO: cache arm positions to prevent collisions

async def setup_arm(self, module):
try:
await self.send_command(module, command="PIA")
Expand All @@ -276,6 +278,10 @@ async def setup_arm(self, module):
await self.send_command(module, command="BMX", params=[2])
return True

async def _park_liha(self):
await self.liha.set_z_travel_height([self._z_range] * self.num_channels)
await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels)

# ============== LiquidHandlerBackend methods ==============

async def aspirate(
Expand Down Expand Up @@ -495,28 +501,29 @@ async def move_resource(self, move: Move):
""" Pick up a resource and move it to a new location. """

# TODO: implement PnP for moving tubes
assert self.roma_connected

x, y, z = self._roma_positions(move.resource, move.resource.get_absolute_location() )
z_range = await self.roma.report_z_param(5)
x, y, z = self._roma_positions(move.resource, move.resource.get_absolute_location(), z_range)
h = int(move.resource.get_size_y() * 10)
xt, yt, zt = self._roma_positions(move.resource, move.to)
xt, yt, zt = self._roma_positions(move.resource, move.to, z_range)

# move to resource
# TODO: check collision with other arms
await self.roma.set_smooth_move_x(1)
await self.roma.set_fast_speed_x(10000)
await self.roma.set_fast_speed_y(5000, 1500)
await self.roma.set_fast_speed_z(1000)
await self.roma.set_fast_speed_z(1300)
await self.roma.set_fast_speed_r(5000, 1500)
await self.roma.set_vector_coordinate_position(1, x, y, 1608, 900, None, 1, 0)
await self.roma.set_vector_coordinate_position(1, x, y, z["safe"], 900, None, 1, 0)
await self.roma.action_move_vector_coordinate_position()
await self.roma.set_smooth_move_x(0)

# pick up resource
await self.roma.position_absolute_g(900) # TODO: verify
await self.roma.set_target_window_class(1, 0, 0, 55, 135, 0)
await self.roma.set_vector_coordinate_position(1, x, y, 1241, 900, None, 1, 1)
await self.roma.set_target_window_class(1, 0, 0, 0, 135, 0)
await self.roma.set_vector_coordinate_position(1, x, y, z["travel"], 900, None, 1, 1)
# TODO verify z param
await self.roma.set_vector_coordinate_position(1, x, y, z, 900, None, 1, 0)
await self.roma.set_vector_coordinate_position(1, x, y, z["end"], 900, None, 1, 0)
await self.roma.action_move_vector_coordinate_position()
await self.roma.set_fast_speed_y(3500, 1000)
await self.roma.set_fast_speed_r(2000, 600)
Expand All @@ -528,21 +535,21 @@ async def move_resource(self, move: Move):
await self.roma.set_target_window_class(2, 0, 0, 0, 53, 0)
await self.roma.set_target_window_class(3, 0, 0, 0, 55, 0)
await self.roma.set_target_window_class(4, 45, 0, 0, 0, 0)
await self.roma.set_vector_coordinate_position(1, x, y, z, 900, None, 1, 1)
await self.roma.set_vector_coordinate_position(2, x, y, 1241, 900, None, 1, 2)
await self.roma.set_vector_coordinate_position(3, x, y, 1608, 900, None, 1, 3)
await self.roma.set_vector_coordinate_position(4, xt, yt, 1608, 900, None, 1, 4)
await self.roma.set_vector_coordinate_position(5, xt, yt, 1241, 900, None, 1, 3)
await self.roma.set_vector_coordinate_position(6, xt, yt, zt, 900, None, 1, 0)
await self.roma.set_vector_coordinate_position(1, x, y, z["end"], 900, None, 1, 1)
await self.roma.set_vector_coordinate_position(2, x, y, z["travel"], 900, None, 1, 2)
await self.roma.set_vector_coordinate_position(3, x, y, z["safe"], 900, None, 1, 3)
await self.roma.set_vector_coordinate_position(4, xt, yt, zt["safe"], 900, None, 1, 4)
await self.roma.set_vector_coordinate_position(5, xt, yt, zt["travel"], 900, None, 1, 3)
await self.roma.set_vector_coordinate_position(6, xt, yt, zt["end"], 900, None, 1, 0)
await self.roma.action_move_vector_coordinate_position()

# release resource
await self.roma.position_absolute_g(900)
await self.roma.set_fast_speed_y(5000, 1500)
await self.roma.set_fast_speed_r(5000, 1500)
await self.roma.set_vector_coordinate_position(1, xt, yt, zt, 900, None, 1, 1)
await self.roma.set_vector_coordinate_position(2, xt, yt, 1241, 900, None, 1, 2)
await self.roma.set_vector_coordinate_position(3, xt, yt, 1608, 900, None, 1, 0)
await self.roma.set_vector_coordinate_position(1, xt, yt, zt["end"], 900, None, 1, 1)
await self.roma.set_vector_coordinate_position(2, xt, yt, zt["travel"], 900, None, 1, 2)
await self.roma.set_vector_coordinate_position(3, xt, yt, zt["safe"], 900, None, 1, 0)
await self.roma.action_move_vector_coordinate_position()
await self.roma.set_fast_speed_y(3500, 1000)
await self.roma.set_fast_speed_r(2000, 600)
Expand Down Expand Up @@ -590,7 +597,7 @@ def get_z_position(z, z_off, tip_length):
for i, channel in enumerate(use_channels):
location = ops[i].resource.get_absolute_location() + ops[i].resource.center()
x_positions[channel] = int((location.x - 100) * 10)
y_positions[channel] = int((345 - location.y) * 10) # TODO: verify
y_positions[channel] = int((346.5 - location.y) * 10) # TODO: verify

par = ops[i].resource.parent
if not isinstance(par, (TecanPlate, TecanTipRack)):
Expand Down Expand Up @@ -634,11 +641,11 @@ def _aspirate_airgap(
assert tlc is not None
pvl[channel] = 0
if airgap == "lag":
sep[channel] = int(tlc.aspirate_lag_speed * 6) # 12? TODO: verify step unit
ppr[channel] = int(tlc.aspirate_lag_volume * 3) # 6?
sep[channel] = int(tlc.aspirate_lag_speed * 12) # 6? TODO: verify step unit
ppr[channel] = int(tlc.aspirate_lag_volume * 6) # 3?
elif airgap == "tag":
sep[channel] = int(tlc.aspirate_tag_speed * 6) # 12?
ppr[channel] = int(tlc.aspirate_tag_volume * 3) # 6?
sep[channel] = int(tlc.aspirate_tag_speed * 12) # 6?
ppr[channel] = int(tlc.aspirate_tag_volume * 6) # 3?

return pvl, sep, ppr

Expand Down Expand Up @@ -703,9 +710,9 @@ def _aspirate_action(
tlc = tecan_liquid_classes[i]
z = zadd[channel]
assert tlc is not None and z is not None
sep[channel] = int(tlc.aspirate_speed * 6) # 12?
sep[channel] = int(tlc.aspirate_speed * 12) # 6?
ssz[channel] = round(z * tlc.aspirate_speed / ops[i].volume)
mtr[channel] = round(ops[i].volume * 3) # 6?
mtr[channel] = round(ops[i].volume * 6) # 3?
ssz_r[channel] = int(tlc.aspirate_retract_speed * 10)

return ssz, sep, stz, mtr, ssz_r
Expand Down Expand Up @@ -733,29 +740,49 @@ def _dispense_action(
for i, channel in enumerate(use_channels):
tlc = tecan_liquid_classes[i]
assert tlc is not None
sep[channel] = int(tlc.dispense_speed * 6) # 12?
spp[channel] = int(tlc.dispense_breakoff * 6) # 12?
sep[channel] = int(tlc.dispense_speed * 12) # 6?
spp[channel] = int(tlc.dispense_breakoff * 12) # 6?
stz[channel] = 0
mtr[channel] = -round(ops[i].volume * 3) # 6?
mtr[channel] = -round(ops[i].volume * 6) # 3?

return sep, spp, stz, mtr

def _roma_positions(
self,
resource: Resource,
offset: Coordinate
) -> Tuple[int, int, int]:
offset: Coordinate,
z_range: int
) -> Tuple[int, int, Dict[str, int]]:
""" Creates x, y, and z positions used by RoMa ops. """

center = offset + resource.center()
return int((center.x - 100)* 10) + 1240, int((344.5 - center.y) * 10), int(center.z * 10) + 17
par = resource.parent
if par is None:
raise ValueError(f"Operation is not supported by resource {resource}.")
par = par.parent
if not isinstance(par, TecanPlateCarrier):
raise ValueError(f"Operation is not supported by resource {par}.")

if par.roma_x is None or par.roma_y is None or par.roma_z_safe is None \
or par.roma_z_travel is None or par.roma_z_end is None:
raise ValueError(f"Operation is not supported by resource {par}.")
x_position = int((offset.x - 100)* 10 + par.roma_x)
y_position = int((347.1 - (offset.y + resource.get_size_y())) * 10 + par.roma_y)
z_positions = {
"safe": z_range - int(par.roma_z_safe),
"travel": z_range - int(par.roma_z_travel - offset.z * 10),
"end": z_range - int(par.roma_z_end - offset.z * 10)
}

return x_position, y_position, z_positions


class EVOArm:
"""
Provides firmware commands for EVO arms
Provides firmware commands for EVO arms. Caches arm positions.
"""

_pos_cache: Dict[str, int] = {}

def __init__(
self,
backend: EVO,
Expand Down Expand Up @@ -791,17 +818,6 @@ async def report_y_param(self, param: int) -> List[int]:
command="RPY", params=[param]))["data"]
return resp

async def report_z_param(self, param: int) -> List[int]:
""" Report current parameters for z-axis.
Args:
param: 0 - current position, 5 - actual machine range
"""

resp: List[int] = (await self.backend.send_command(module=self.module,
command="RPZ", params=[param]))["data"]
return resp


class LiHa(EVOArm):
"""
Expand All @@ -816,6 +832,17 @@ async def initialize_plunger(self, tips):
"""
await self.backend.send_command(module=self.module, command="PID", params=[tips])

async def report_z_param(self, param: int) -> List[int]:
""" Report current parameters for z-axis.
Args:
param: 0 - current position, 5 - actual machine range
"""

resp: List[int] = (await self.backend.send_command(module=self.module,
command="RPZ", params=[param]))["data"]
return resp

async def report_number_tips(self) -> int:
""" Report number of tips on arm. """

Expand All @@ -832,10 +859,22 @@ async def position_absolute_all_axis(self, x: int, y: int, ys: int, z: List[int]
ys: absolute y spacing in 1/10 mm, must be between 90 and 380
z: absolute z position in 1/10 mm for each channel, must be in
allowed machine range
Raises:
TecanError: if moving to the target position causes a collision
"""

cur_x = EVOArm._pos_cache.setdefault(self.module, await self.report_x_param(0))
for module, pos in EVOArm._pos_cache.items():
if module == self.module:
continue
if cur_x < pos < x or x < pos < cur_x or abs(pos - x) < 1500:
raise TecanError("Invalid command (collision)", self.module, 2)

await self.backend.send_command(module=self.module, command="PAA", params=list([x, y, ys] + z))

EVOArm._pos_cache[self.module] = x

async def position_valve_logical(self, param: List[Optional[int]]):
""" Position valve logical for each channel.
Expand Down Expand Up @@ -1021,6 +1060,17 @@ class RoMa(EVOArm):
Provides firmware commands for the RoMa plate robot
"""

async def report_z_param(self, param: int) -> int:
""" Report current parameter for z-axis.
Args:
param: 0 - current position, 5 - actual machine range
"""

resp: List[int] = (await self.backend.send_command(module=self.module,
command="RPZ", params=[param]))["data"]
return resp[0]

async def report_r_param(self, param: int) -> int:
""" Report current parameter for r-axis.
Expand Down Expand Up @@ -1095,10 +1145,10 @@ async def set_fast_speed_r(self, speed: Optional[int], accel: Optional[int] = No
async def set_vector_coordinate_position(
self,
v: int,
x: Optional[int],
y: Optional[int],
z: Optional[int],
r: Optional[int],
x: int,
y: int,
z: int,
r: int,
g: Optional[int],
speed: int,
tw: int = 0
Expand All @@ -1114,8 +1164,18 @@ async def set_vector_coordinate_position(
g: aboslute g position in 1/10 mm
speed: speed select, 0 - slow, 1 - fast
tw: target window class, set with STW
Raises:
TecanError: if moving to the target position causes a collision
"""

cur_x = EVOArm._pos_cache.setdefault(self.module, await self.report_x_param(0))
for module, pos in EVOArm._pos_cache.items():
if module == self.module:
continue
if cur_x < pos < x or x < pos < cur_x or abs(pos - x) < 1500:
raise TecanError("Invalid command (collision)", self.module, 2)

await self.backend.send_command(module=self.module, command="SAA",
params=[v, x, y, z, r, g, speed, 0, tw])

Expand All @@ -1124,6 +1184,8 @@ async def action_move_vector_coordinate_position(self):

await self.backend.send_command(module=self.module, command="AAC")

EVOArm._pos_cache[self.module] = await self.report_x_param(0)

async def position_absolute_g(self, g: int):
""" Position absolute for G-axis
Expand Down
Loading

0 comments on commit 5cebe3f

Please sign in to comment.