Skip to content

Commit

Permalink
Add exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
jdeschamps committed Oct 27, 2021
1 parent c5f06cb commit 717f100
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 142 deletions.
2 changes: 1 addition & 1 deletion example.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import random as rd

# create a MicroFPGA controller, this will automatically disconnect at the end
with cl.MicroFPGA(n_lasers=3, n_ttls=2, n_servos=3, n_pwms=1, n_ais=2) as mufpga:
with cl.MicroFPGA(n_laser=3, n_ttl=2, n_servo=3, n_pwm=1, n_ai=2) as mufpga:
# checks if successful
print(f'Connected to {mufpga.get_id()}')

Expand Down
46 changes: 23 additions & 23 deletions microfpga/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class MicroFPGA:

def __init__(self, n_lasers, n_ttls, n_servos, n_pwms, n_ais):
def __init__(self, n_laser=0, n_ttl=0, n_servo=0, n_pwm=0, n_ai=0):
self._serial = regint.RegisterInterface()
self.device = self._serial.get_device()

Expand All @@ -20,33 +20,33 @@ def __init__(self, n_lasers, n_ttls, n_servos, n_pwms, n_ais):

if (self.version == signals.CURR_VER) and (self.id == signals.ID_AU or self.id == signals.ID_CU):
# instantiates lasers
for i in range(n_lasers):
for i in range(n_laser):
self._lasers.append(signals.LaserTrigger(i, self._serial))

# instantiates TTLs
for i in range(n_ttls):
for i in range(n_ttl):
self._ttls.append(signals.Ttl(i, self._serial))

# instantiates lasers
for i in range(n_servos):
for i in range(n_servo):
self._servos.append(signals.Servo(i, self._serial))

# instantiates lasers
for i in range(n_pwms):
for i in range(n_pwm):
self._pwms.append(signals.Pwm(i, self._serial))

# instantiates lasers
if self.id == signals.ID_AU:
for i in range(n_ais):
for i in range(n_ai):
self._ais.append(signals.Analog(i, self._serial))
else:
self.disconnect()
if self.version != signals.CURR_VER:
raise Warning('Wrong version: expected ' + str(signals.CURR_VER) + \
raise Warning('Wrong version: expected ' + str(signals.CURR_VER) +
', got ' + str(self.version) + '. The port has been disconnected')

if self.id != signals.ID_AU and self.id != signals.ID_CU:
raise Warning('Wrong board id: expected ' + str(signals.ID_AU) + \
raise Warning('Wrong board id: expected ' + str(signals.ID_AU) +
' (Au) or ' + str(signals.ID_CU) + ' (Cu), got ' + str(
self.id) + '. The port has been disconnected')

Expand Down Expand Up @@ -79,91 +79,91 @@ def get_number_analogs(self):
return len(self._ais)

def set_ttl_state(self, channel, value):
if channel >= 0 and channel < self.get_number_ttls():
if 0 <= channel < self.get_number_ttls():
return self._ttls[channel].set_state(value)
else:
return False

def get_ttl_state(self, channel):
if channel >= 0 and channel < self.get_number_ttls():
if 0 <= channel < self.get_number_ttls():
return self._ttls[channel].get_state()
else:
return -1

def set_servo_state(self, channel, value):
if channel >= 0 and channel < self.get_number_servos():
if 0 <= channel < self.get_number_servos():
return self._servos[channel].set_state(value)
else:
return False

def get_servo_state(self, channel):
if channel >= 0 and channel < self.get_number_servos():
if 0 <= channel < self.get_number_servos():
return self._servos[channel].get_state()
else:
return -1

def set_pwm_state(self, channel, value):
if channel >= 0 and channel < self.get_number_pwms():
if 0 <= channel < self.get_number_pwms():
return self._pwms[channel].set_state(value)
else:
return False

def get_pwm_state(self, channel):
if channel >= 0 and channel < self.get_number_pwms():
if 0 <= channel < self.get_number_pwms():
return self._pwms[channel].get_state()
else:
return -1

def get_analog_state(self, channel):
if channel >= 0 and channel < self.get_number_analogs():
if 0 <= channel < self.get_number_analogs():
return self._ais[channel].get_state()
else:
return -1

def set_mode(self, channel, value):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].set_mode(value)
else:
return False

def get_mode(self, channel):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].get_mode()
else:
return -1

def set_duration(self, channel, value):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].set_duration(value)
else:
return False

def get_duration(self, channel):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].get_duration()
else:
return -1

def set_sequence(self, channel, value):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].set_sequence(value)
else:
return False

def get_sequence(self, channel):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].get_sequence()
else:
return -1

def set_laser_state(self, channel, mode, duration, sequence):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].set_state(mode, duration, sequence)
else:
return False

def get_laser_state(self, channel):
if channel >= 0 and channel < self.get_number_lasers():
if 0 <= channel < self.get_number_lasers():
return self._lasers[channel].get_state()
else:
return [-1, -1, -1]
Expand Down
40 changes: 20 additions & 20 deletions microfpga/regint.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,58 @@
import serial.tools.list_ports


class RegisterInterface:

def __init__(self):
self._device = self.__find_port()

if self._device is not None:
self._serial = serial.Serial(self._device, 1000000, timeout=1)
self._connected = True
else:
self._serial = None
self._connected = False

def __find_port(self):
AU_CU_VID = '0403:6010'
VID_PID = 'VID:PID'[::-1]
SER = ' SER'

# list ports
plist = list(serial.tools.list_ports.comports())

# checks vendor and product IDs
for s in plist:
start = len(s.hwid)-s.hwid[::-1].find(VID_PID)+1
start = len(s.hwid) - s.hwid[::-1].find(VID_PID) + 1
end = s.hwid.find(SER)

vid_pid = s.hwid[start:end]
if vid_pid == AU_CU_VID:
return s.device

def is_connected(self):
return self._connected

def disconnect(self):
self._serial.close()

def get_device(self):
return self._device

def __format_read_request(self, address):
buff = bytearray(5)

buff[0] = 0 << 7
buff[1] = address & 0xff
buff[2] = (address >> 8) & 0xff
buff[3] = (address >> 16) & 0xff
buff[4] = (address >> 24) & 0xff

return buff

def __format_write_request(self, address, data):
buff = bytearray(9)

buff[0] = 1 << 7
buff[1] = address & 0xff
buff[2] = (address >> 8) & 0xff
Expand All @@ -61,19 +62,18 @@ def __format_write_request(self, address, data):
buff[6] = (data >> 8) & 0xff
buff[7] = (data >> 16) & 0xff
buff[8] = (data >> 24) & 0xff

return buff

def __format_to_int(self, data):
val = (data[0] & 0xff) | (data[1] & 0xff) << 8 | (data[2] & 0xff) << 16 | (data[3] & 0xff) << 24
return val

def write(self, address, value):
if self._connected:
self._serial.write(self.__format_write_request(address, value))

def read(self, address):
if self._connected:
self._serial.write(self.__format_read_request(address))
return self.__format_to_int(self._serial.read(4))

Loading

0 comments on commit 717f100

Please sign in to comment.