From a1ef14f18adabc2346c52ed07d3fdd95c41b5285 Mon Sep 17 00:00:00 2001 From: Rodriguez Date: Fri, 17 Nov 2023 11:17:41 +0100 Subject: [PATCH] integrate kd3005p and fix usb hunt --- .../panduza_platform/connectors/udev_tty.py | 6 +- .../devices/korad/ka3005/dev_ka3005.py | 101 ++++++++++-------- .../korad/ka3005/itf_korad_ka3005p_ammeter.py | 30 +++--- .../korad/ka3005/itf_korad_ka3005p_bpc.py | 35 +++--- .../ka3005/itf_korad_ka3005p_voltmeter.py | 31 +++--- platform/tests/manual/load_config.py | 11 +- 6 files changed, 120 insertions(+), 94 deletions(-) diff --git a/platform/panduza_platform/connectors/udev_tty.py b/platform/panduza_platform/connectors/udev_tty.py index d227929..5772aa7 100644 --- a/platform/panduza_platform/connectors/udev_tty.py +++ b/platform/panduza_platform/connectors/udev_tty.py @@ -27,9 +27,9 @@ def SerialPortFromUsbSetting(**kwargs): print(f"kwargs={kwargs}\n") # Get parameters - usb_vendor = None if "usb_vendor" not in kwargs else kwargs["usb_vendor"] - usb_model = None if "usb_model" not in kwargs else kwargs["usb_model"] - usb_serial_short = None if "usb_serial_id" not in kwargs else kwargs["usb_serial_id"] + usb_vendor = None if "usb_vendor" not in kwargs else kwargs["usb_vendor"] + usb_model = None if "usb_model" not in kwargs else kwargs["usb_model"] + usb_serial_short = None if "usb_serial_short" not in kwargs else kwargs["usb_serial_short"] # Deep debug if ENABLE_LOCAL_DEBUG: diff --git a/platform/panduza_platform/devices/korad/ka3005/dev_ka3005.py b/platform/panduza_platform/devices/korad/ka3005/dev_ka3005.py index a58b0cb..2a71c74 100644 --- a/platform/panduza_platform/devices/korad/ka3005/dev_ka3005.py +++ b/platform/panduza_platform/devices/korad/ka3005/dev_ka3005.py @@ -35,59 +35,70 @@ def _PZA_DEV_config(self): async def _PZA_DEV_hunt(self): """ """ - print( HuntUsbDevs('0416', '5011', 'tty') ) - - # connector = await ConnectorSerialTty.Get(serial_port_name='/dev/ttyACM0') - # status = await connector.write_and_read_until("*IDN?", time_lock_s=0.5) - - # print(">>> ", status) - - return [] + bag = [] + + matches = HuntUsbDevs('0416', '5011', 'tty') + for match in matches: + # print('------', match) + devname = match.get('DEVNAME', None) + if devname: + try: + # self.log.info(devname) + connector = await ConnectorSerialTty.Get( + serial_port_name=devname, + serial_baudrate=9600 + ) + IDN = await connector.write_and_read_until("*IDN?", time_lock_s=0.5) + self.log.info(f">>>>>>>-------------------- {IDN}") + + if IDN.decode().startswith("KORAD KD3005P"): + ma = self._PZA_DEV_config()["manufacturer"] + mo = self._PZA_DEV_config()["model"] + ref = f"{ma}.{mo}" + bag.append({ + "ref": ref, + "settings": { + "usb_serial_short": match.get('ID_SERIAL_SHORT', None) + } + }) + + except asyncio.exceptions.TimeoutError: + print("tiemout") + + return bag + # --- - def _PZA_DEV_interfaces_generator(self): + async def _PZA_DEV_mount_interfaces(self): """ """ - port = self.get_settings().get("serial_port") - baudrate = 9600 - number_of_channel = 1 + settings = self.get_settings() - interfaces = [] - for chan in range(0, number_of_channel): - interfaces.append( - { - "name": f":channel_{chan}:_ctrl", - "driver": "korad.ka3005p.bpc", - "settings": { - "serial_port_name": port, - "serial_baudrate": baudrate - } - } - ) - interfaces.append( - { - "name": f":channel_{chan}:_vm", - "driver": "korad.ka3005p.voltmeter", - "settings": { - "serial_port_name": port, - "serial_baudrate": baudrate - } - } - ) - interfaces.append( - { - "name": f":channel_{chan}:_am", - "driver": "korad.ka3005p.ammeter", - "settings": { - "serial_port_name": port, - "serial_baudrate": baudrate - } - } - ) + if ('usb_serial_short' not in settings) and ('serial_port_name' not in settings): + raise Exception("At least one settings must be set") + + const_settings = { + "usb_vendor": '0416', + "usb_model": '5011', + "serial_baudrate": 9600 + } + + settings.update(const_settings) + + self.log.info(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!{settings}") + + self.mount_interface( + InterfaceKoradKa3005pBPC(name=f":channel_0:_ctrl", settings=settings) + ) + self.mount_interface( + InterfaceKoradKa3005pAmmeter(name=f":channel_0:_am", settings=settings) + ) + self.mount_interface( + InterfaceKoradKa3005pVoltmeter(name=f":channel_0:_vm", settings=settings) + ) - return interfaces diff --git a/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_ammeter.py b/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_ammeter.py index 0e98012..725f001 100644 --- a/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_ammeter.py +++ b/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_ammeter.py @@ -7,21 +7,25 @@ class InterfaceKoradKa3005pAmmeter(MetaDriverAmmeter): + # --- - async def _PZA_DRV_loop_init(self): - """Driver initialization + def __init__(self, name=None, settings={}) -> None: + """Constructor """ + self.settings = settings + super().__init__(name=name) - # Load settings - assert_that(tree, has_key("settings")) - settings = tree["settings"] - assert_that(settings, instance_of(dict)) - - # Checks - assert_that(settings, has_key("serial_baudrate")) + # --- - self.serial_connector = await ConnectorSerialTty.Get(loop,**settings) + async def _PZA_DRV_loop_init(self): + """Driver initialization + """ + # Get the Serial Connector + self.serial_connector = await ConnectorSerialTty.Get(**self.settings) + + # + self.channel = 1 # Call meta class BPC ini await super()._PZA_DRV_loop_init() @@ -29,8 +33,8 @@ async def _PZA_DRV_loop_init(self): # --- async def _PZA_DRV_AMMETER_read_measure_value(self): - await self.serial_connector.write("IOUT1?", time_lock_s=COMMAND_TIME_LOCK) - current = await self.serial_connector.read(n_bytes=5) - return float(current) + current = await self.serial_connector.write_and_read_until(f"IOUT{self.channel}?\n", time_lock_s=COMMAND_TIME_LOCK, read_duration_s=0.1) + return float(current.strip()) + # return float(current[1:].strip()) # --- \ No newline at end of file diff --git a/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_bpc.py b/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_bpc.py index fb0168e..1d01e61 100644 --- a/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_bpc.py +++ b/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_bpc.py @@ -9,8 +9,14 @@ COMMAND_TIME_LOCK=0.1 class InterfaceKoradKa3005pBPC(MetaDriverBpc): - # ============================================================================= - # FROM MetaDriverBpc + + # --- + + def __init__(self, name=None, settings={}) -> None: + """Constructor + """ + self.settings = settings + super().__init__(name=name) # --- @@ -27,16 +33,11 @@ def _PZA_DRV_BPC_config(self): async def _PZA_DRV_loop_init(self): """Driver initialization """ - - # Load settings - assert_that(tree, has_key("settings")) - settings = tree["settings"] - assert_that(settings, instance_of(dict)) - - # Checks - assert_that(settings, has_key("serial_baudrate")) - - self.serial_connector = await ConnectorSerialTty.Get(loop,**settings) + # Get the Serial Connector + self.serial_connector = await ConnectorSerialTty.Get(**self.settings) + + # + self.channel = 1 # Call meta class BPC ini await super()._PZA_DRV_loop_init() @@ -46,8 +47,8 @@ async def _PZA_DRV_loop_init(self): async def _PZA_DRV_BPC_read_enable_value(self): # await asyncio.sleep(1) - await self.serial_connector.write("STATUS?", time_lock_s=COMMAND_TIME_LOCK) - status = await self.serial_connector.read(n_bytes=1) + status = await self.serial_connector.write_and_read_until(f"STATUS?", time_lock_s=COMMAND_TIME_LOCK, read_duration_s=0.1) + return bool(status[0] & (1 << 6)) async def _PZA_DRV_BPC_write_enable_value(self, v): @@ -56,8 +57,7 @@ async def _PZA_DRV_BPC_write_enable_value(self, v): # --- async def _PZA_DRV_BPC_read_voltage_value(self): - await self.serial_connector.write("VSET1?", time_lock_s=COMMAND_TIME_LOCK) - voltage = await self.serial_connector.read(n_bytes=5) + voltage = await self.serial_connector.write_and_read_until(f"VSET{self.channel}?\n", time_lock_s=COMMAND_TIME_LOCK, read_duration_s=0.1) return float(voltage) async def _PZA_DRV_BPC_write_voltage_value(self, v): @@ -72,8 +72,7 @@ async def _PZA_DRV_BPC_read_voltage_decimals(self): # --- async def _PZA_DRV_BPC_read_current_value(self): - await self.serial_connector.write("ISET1?", time_lock_s=COMMAND_TIME_LOCK) - current = await self.serial_connector.read(n_bytes=6) + current = await self.serial_connector.write_and_read_until(f"ISET{self.channel}?\n", time_lock_s=COMMAND_TIME_LOCK, read_duration_s=0.1) return float(current[0:5]) async def _PZA_DRV_BPC_write_current_value(self, v): diff --git a/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_voltmeter.py b/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_voltmeter.py index c299a58..bbc4657 100644 --- a/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_voltmeter.py +++ b/platform/panduza_platform/devices/korad/ka3005/itf_korad_ka3005p_voltmeter.py @@ -9,28 +9,31 @@ class InterfaceKoradKa3005pVoltmeter(MetaDriverVoltmeter): # --- - async def _PZA_DRV_loop_init(self): - """Driver initialization + def __init__(self, name=None, settings={}) -> None: + """Constructor """ + self.settings = settings + super().__init__(name=name) - # Load settings - assert_that(tree, has_key("settings")) - settings = tree["settings"] - assert_that(settings, instance_of(dict)) - - # Checks - assert_that(settings, has_key("serial_baudrate")) + # --- - self.serial_connector = await ConnectorSerialTty.Get(loop,**settings) + async def _PZA_DRV_loop_init(self): + """Driver initialization + """ + # Get the Serial Connector + self.serial_connector = await ConnectorSerialTty.Get(**self.settings) + # + self.channel = 1 + # Call meta class BPC ini await super()._PZA_DRV_loop_init() # --- async def _PZA_DRV_VOLTMETER_read_measure_value(self): - await self.serial_connector.write("VOUT1?", time_lock_s=COMMAND_TIME_LOCK) - voltage = await self.serial_connector.read(n_bytes=5) - return float(voltage) + voltage = await self.serial_connector.write_and_read_until(f"VOUT{self.channel}?\n", time_lock_s=COMMAND_TIME_LOCK, read_duration_s=0.1) + return float(voltage.strip()) + # return float(voltage[1:].strip()) - # --- \ No newline at end of file + # --- diff --git a/platform/tests/manual/load_config.py b/platform/tests/manual/load_config.py index 65c6691..4dafd9f 100644 --- a/platform/tests/manual/load_config.py +++ b/platform/tests/manual/load_config.py @@ -43,10 +43,19 @@ # ] # }) +# plat.dtree.content.set({ +# "devices": [ +# {'ref': 'Korad.KA3005P', 'settings': {'usb_serial_short': '0034A5A10458'}} +# ] +# }) + plat.dtree.content.set({ "devices": [ - {'ref': 'Panduza.FakeBps', 'settings': {'number_of_channel': 1}}, + {'ref': 'Korad.KA3005P', 'settings': {'usb_serial_short': '0034A5A10458'}}, + {'ref': 'Hanmatek.Hm310t', 'settings': {}}, + {'ref': 'Tenma.72-2710', 'settings': {'usb_serial_short': '003222D50454'}} ] }) +