From 717f1002e31f0002a24474d92598fd8fe44edb99 Mon Sep 17 00:00:00 2001 From: jdeschamps <6367888+jdeschamps@users.noreply.github.com> Date: Wed, 27 Oct 2021 11:10:48 +0200 Subject: [PATCH] Add exceptions --- example.py | 2 +- microfpga/controller.py | 46 ++++----- microfpga/regint.py | 40 ++++---- microfpga/signals.py | 206 +++++++++++++++++++++------------------- 4 files changed, 152 insertions(+), 142 deletions(-) diff --git a/example.py b/example.py index 0050168..1c52bd2 100644 --- a/example.py +++ b/example.py @@ -3,7 +3,7 @@ import random as rd # create a MicroFPGA controller, this will automatically disconnect at the end -with cl.MicroFPGA(n_lasers=3, n_ttls=2, n_servos=3, n_pwms=1, n_ais=2) as mufpga: +with cl.MicroFPGA(n_laser=3, n_ttl=2, n_servo=3, n_pwm=1, n_ai=2) as mufpga: # checks if successful print(f'Connected to {mufpga.get_id()}') diff --git a/microfpga/controller.py b/microfpga/controller.py index f33c4c4..44d1707 100644 --- a/microfpga/controller.py +++ b/microfpga/controller.py @@ -4,7 +4,7 @@ class MicroFPGA: - def __init__(self, n_lasers, n_ttls, n_servos, n_pwms, n_ais): + def __init__(self, n_laser=0, n_ttl=0, n_servo=0, n_pwm=0, n_ai=0): self._serial = regint.RegisterInterface() self.device = self._serial.get_device() @@ -20,33 +20,33 @@ def __init__(self, n_lasers, n_ttls, n_servos, n_pwms, n_ais): if (self.version == signals.CURR_VER) and (self.id == signals.ID_AU or self.id == signals.ID_CU): # instantiates lasers - for i in range(n_lasers): + for i in range(n_laser): self._lasers.append(signals.LaserTrigger(i, self._serial)) # instantiates TTLs - for i in range(n_ttls): + for i in range(n_ttl): self._ttls.append(signals.Ttl(i, self._serial)) # instantiates lasers - for i in range(n_servos): + for i in range(n_servo): self._servos.append(signals.Servo(i, self._serial)) # instantiates lasers - for i in range(n_pwms): + for i in range(n_pwm): self._pwms.append(signals.Pwm(i, self._serial)) # instantiates lasers if self.id == signals.ID_AU: - for i in range(n_ais): + for i in range(n_ai): self._ais.append(signals.Analog(i, self._serial)) else: self.disconnect() if self.version != signals.CURR_VER: - raise Warning('Wrong version: expected ' + str(signals.CURR_VER) + \ + raise Warning('Wrong version: expected ' + str(signals.CURR_VER) + ', got ' + str(self.version) + '. The port has been disconnected') if self.id != signals.ID_AU and self.id != signals.ID_CU: - raise Warning('Wrong board id: expected ' + str(signals.ID_AU) + \ + raise Warning('Wrong board id: expected ' + str(signals.ID_AU) + ' (Au) or ' + str(signals.ID_CU) + ' (Cu), got ' + str( self.id) + '. The port has been disconnected') @@ -79,91 +79,91 @@ def get_number_analogs(self): return len(self._ais) def set_ttl_state(self, channel, value): - if channel >= 0 and channel < self.get_number_ttls(): + if 0 <= channel < self.get_number_ttls(): return self._ttls[channel].set_state(value) else: return False def get_ttl_state(self, channel): - if channel >= 0 and channel < self.get_number_ttls(): + if 0 <= channel < self.get_number_ttls(): return self._ttls[channel].get_state() else: return -1 def set_servo_state(self, channel, value): - if channel >= 0 and channel < self.get_number_servos(): + if 0 <= channel < self.get_number_servos(): return self._servos[channel].set_state(value) else: return False def get_servo_state(self, channel): - if channel >= 0 and channel < self.get_number_servos(): + if 0 <= channel < self.get_number_servos(): return self._servos[channel].get_state() else: return -1 def set_pwm_state(self, channel, value): - if channel >= 0 and channel < self.get_number_pwms(): + if 0 <= channel < self.get_number_pwms(): return self._pwms[channel].set_state(value) else: return False def get_pwm_state(self, channel): - if channel >= 0 and channel < self.get_number_pwms(): + if 0 <= channel < self.get_number_pwms(): return self._pwms[channel].get_state() else: return -1 def get_analog_state(self, channel): - if channel >= 0 and channel < self.get_number_analogs(): + if 0 <= channel < self.get_number_analogs(): return self._ais[channel].get_state() else: return -1 def set_mode(self, channel, value): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].set_mode(value) else: return False def get_mode(self, channel): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].get_mode() else: return -1 def set_duration(self, channel, value): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].set_duration(value) else: return False def get_duration(self, channel): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].get_duration() else: return -1 def set_sequence(self, channel, value): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].set_sequence(value) else: return False def get_sequence(self, channel): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].get_sequence() else: return -1 def set_laser_state(self, channel, mode, duration, sequence): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].set_state(mode, duration, sequence) else: return False def get_laser_state(self, channel): - if channel >= 0 and channel < self.get_number_lasers(): + if 0 <= channel < self.get_number_lasers(): return self._lasers[channel].get_state() else: return [-1, -1, -1] diff --git a/microfpga/regint.py b/microfpga/regint.py index addaedc..f764f90 100644 --- a/microfpga/regint.py +++ b/microfpga/regint.py @@ -1,57 +1,58 @@ import serial.tools.list_ports + class RegisterInterface: - + def __init__(self): self._device = self.__find_port() - + if self._device is not None: self._serial = serial.Serial(self._device, 1000000, timeout=1) self._connected = True else: self._serial = None self._connected = False - + def __find_port(self): AU_CU_VID = '0403:6010' VID_PID = 'VID:PID'[::-1] SER = ' SER' - + # list ports plist = list(serial.tools.list_ports.comports()) - + # checks vendor and product IDs for s in plist: - start = len(s.hwid)-s.hwid[::-1].find(VID_PID)+1 + start = len(s.hwid) - s.hwid[::-1].find(VID_PID) + 1 end = s.hwid.find(SER) - + vid_pid = s.hwid[start:end] if vid_pid == AU_CU_VID: return s.device - + def is_connected(self): return self._connected - + def disconnect(self): self._serial.close() - + def get_device(self): return self._device - + def __format_read_request(self, address): buff = bytearray(5) - + buff[0] = 0 << 7 buff[1] = address & 0xff buff[2] = (address >> 8) & 0xff buff[3] = (address >> 16) & 0xff buff[4] = (address >> 24) & 0xff - + return buff - + def __format_write_request(self, address, data): buff = bytearray(9) - + buff[0] = 1 << 7 buff[1] = address & 0xff buff[2] = (address >> 8) & 0xff @@ -61,19 +62,18 @@ def __format_write_request(self, address, data): buff[6] = (data >> 8) & 0xff buff[7] = (data >> 16) & 0xff buff[8] = (data >> 24) & 0xff - + return buff - + def __format_to_int(self, data): val = (data[0] & 0xff) | (data[1] & 0xff) << 8 | (data[2] & 0xff) << 16 | (data[3] & 0xff) << 24 return val - + def write(self, address, value): if self._connected: self._serial.write(self.__format_write_request(address, value)) - + def read(self, address): if self._connected: self._serial.write(self.__format_read_request(address)) return self.__format_to_int(self._serial.read(4)) - diff --git a/microfpga/signals.py b/microfpga/signals.py index 9b27352..31482c7 100644 --- a/microfpga/signals.py +++ b/microfpga/signals.py @@ -8,12 +8,12 @@ NUM_AI = 8 ADDR_MODE = 0 -ADDR_DUR = ADDR_MODE+NUM_LASERS -ADDR_SEQ = ADDR_DUR+NUM_LASERS -ADDR_TTL = ADDR_SEQ+NUM_LASERS -ADDR_SERVO = ADDR_TTL+NUM_TTL -ADDR_PWM = ADDR_SERVO+NUM_SERVOS -ADDR_AI = ADDR_PWM+NUM_PWM +ADDR_DUR = ADDR_MODE + NUM_LASERS +ADDR_SEQ = ADDR_DUR + NUM_LASERS +ADDR_TTL = ADDR_SEQ + NUM_LASERS +ADDR_SERVO = ADDR_TTL + NUM_TTL +ADDR_PWM = ADDR_SERVO + NUM_SERVOS +ADDR_AI = ADDR_PWM + NUM_PWM ADDR_VER = 100 ADDR_ID = 101 @@ -31,220 +31,230 @@ MAX_MODE = 4 MAX_DURATION = 65535 MAX_SEQUENCE = 65535 +MAX_AI = 65535 def format_sequence(string): b = True for ch in string: if ch != '0' and ch != '1': - b = False - + b = False + if b and len(string) == 16: - return int(string,2) - else : + return int(string, 2) + else: return -1 + class Signal(ABC): - + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface, output: bool = True): if channel_id < self.get_num_signal(): self.channel_id = channel_id self.output = output self._serial_com = serial_com else: - raise Exception('Index exceed maximum number of '+self.get_name()+' signals.') - + raise Exception(f'{channel_id} exceeds maximum number of {self.get_name()} signals.') + @abstractmethod def get_address(self): pass - - @abstractmethod - def is_allowed(self, value): + + @abstractmethod + def get_max(self): pass - - @abstractmethod + + def is_allowed(self, value): + return 0 <= value <= self.get_max() + + @abstractmethod def get_num_signal(self): pass - - @abstractmethod + + @abstractmethod def get_name(self): pass - + def set_state(self, value): if self.output and self.is_allowed(value): - self._serial_com.write(self.get_address()+self.channel_id, value) + self._serial_com.write(self.get_address() + self.channel_id, value) return True else: - return False - + raise ValueError(f'Value {value} not allowed in {self.get_name()} (channel {self.channel_id}).') + def get_state(self): - return self._serial_com.read(self.get_address()+self.channel_id) - - + return self._serial_com.read(self.get_address() + self.channel_id) + + class Ttl(Signal): - - def __init__(self, channel_id:int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): Signal.__init__(self, channel_id, serial_com) - + def get_address(self): return ADDR_TTL - - def is_allowed(self, value): - return (value >= 0) and (value <= MAX_TTL) + + def get_max(self): + return MAX_TTL def get_num_signal(self): return NUM_TTL - + def get_name(self): return 'TTL' - + + class Pwm(Signal): - - def __init__(self, channel_id:int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): Signal.__init__(self, channel_id, serial_com) - + def get_address(self): return ADDR_PWM - - def is_allowed(self, value): - return (value >= 0) and (value <= MAX_PWM) + + def get_max(self): + return MAX_PWM def get_num_signal(self): return NUM_PWM - + def get_name(self): return 'PWM' - + + class Servo(Signal): - - def __init__(self, channel_id:int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): Signal.__init__(self, channel_id, serial_com) - + def get_address(self): return ADDR_SERVO - - def is_allowed(self, value): - return (value >= 0) and (value <= MAX_SERVO) + + def get_max(self): + return MAX_SERVO def get_num_signal(self): return NUM_SERVOS - + def get_name(self): return 'Servos' - + + class Analog(Signal): - - def __init__(self, channel_id:int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): Signal.__init__(self, channel_id, serial_com, False) - + def get_address(self): return ADDR_AI - - def is_allowed(self, value): - return False + + def get_max(self): + return -1 def get_num_signal(self): return NUM_AI - + def get_name(self): return 'AI' - + + class _Mode(Signal): - - def __init__(self, channel_id:int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): Signal.__init__(self, channel_id, serial_com) - + def get_address(self): return ADDR_MODE - - def is_allowed(self, value): - return (value >= 0) and (value <= MAX_MODE) + + def get_max(self): + return MAX_MODE def get_num_signal(self): return NUM_LASERS - + def get_name(self): return 'Laser mode' + class _Duration(Signal): - - def __init__(self, channel_id:int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): Signal.__init__(self, channel_id, serial_com) - + def get_address(self): return ADDR_DUR - - def is_allowed(self, value): - return (value >= 0) and (value <= MAX_DUR) + + def get_max(self): + return MAX_DUR def get_num_signal(self): return NUM_LASERS - + def get_name(self): return 'Laser duration' - + + class _Sequence(Signal): - - def __init__(self, channel_id:int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): Signal.__init__(self, channel_id, serial_com) - + def get_address(self): return ADDR_SEQ - - def is_allowed(self, value): - return (value >= 0) and (value <= MAX_SEQ) - + + def get_max(self): + return MAX_SEQ + def get_num_signal(self): return NUM_LASERS - + def get_name(self): return 'Laser sequence' - - + + class LaserTrigger: MODE_OFF = 0 MODE_ON = 1 MODE_RISING = 2 MODE_FALLING = 3 MODE_CAMERA = 4 - - def __init__(self, channel_id: int, serial_com:regint.RegisterInterface): + + def __init__(self, channel_id: int, serial_com: regint.RegisterInterface): self.channel_id = channel_id - + self.mode = _Mode(channel_id, serial_com) self.duration = _Duration(channel_id, serial_com) self.seq = _Sequence(channel_id, serial_com) - + def set_mode(self, value): return self.mode.set_state(value) - + def get_mode(self): return self.mode.get_state() - + def set_duration(self, value): return self.duration.set_state(value) - + def get_duration(self): return self.duration.get_state() - + def set_sequence(self, value): return self.seq.set_state(value) - + def get_sequence(self): return self.seq.get_state() - + def set_state(self, mode, duration, sequence): b = self.set_mode(mode) if not b: return b - + b = self.set_duration(duration) if not b: return b - + b = self.set_sequence(sequence) return b - + def get_state(self): return [self.get_mode(), self.get_duration(), self.get_sequence()] - \ No newline at end of file