diff --git a/fnirsi_logger.py b/fnirsi_logger.py index 17a9238..5187c46 100755 --- a/fnirsi_logger.py +++ b/fnirsi_logger.py @@ -1,353 +1,297 @@ #!/usr/bin/env python3 +import logging import sys import os.path import time +from dataclasses import dataclass +from enum import Enum, auto +from typing import Optional, Callable import usb.core import usb.util import argparse -from typing import Union - - -# FNB48 -# Bus 001 Device 020: ID 0483:003a STMicroelectronics FNB-48 -VID = 0x0483 -PID_FNB48 = 0x003A - -# iManufacturer 1 FNIRSI -# iProduct 2 FNB-48 -# iSerial 3 0001A0000000 - -# C1 -# Bus 001 Device 029: ID 0483:003b STMicroelectronics USB Tester -PID_C1 = 0x003B - -# FNB58 -VID_FNB58 = 0x2E3C -PID_FNB58 = 0x5558 - -# FNB48S -# Bus 001 Device 003: ID 2e3c:0049 FNIRSI USB Tester -VID_FNB48S = 0x2E3C -PID_FNB48S = 0x0049 - - -def setup_crc(): - import crc - - # $ ./reveng -w 8 -s $(shuf -n 100 /tmp/dump2.txt) - # width=8 poly=0x39 init=0x42 refin=false refout=false xorout=0x00 check=0x4b residue=0x00 name=(none) - width = 8 - poly = 0x39 - init_value = 0x42 - final_xor_value = 0x00 - reverse_input = False - reverse_output = False - configuration = crc.Configuration(width, poly, init_value, final_xor_value, reverse_input, reverse_output) - if hasattr(crc, "CrcCalculator"): # crc 1.x - crc_calculator = crc.CrcCalculator(configuration, use_table=True) - return crc_calculator.calculate_checksum - else: # crc 2.x+ - calculator = crc.Calculator(configuration, optimized=True) - return calculator.checksum - - -def find_device(): - is_fnb58_or_fnb48s = False - dev = usb.core.find(idVendor=VID, idProduct=PID_FNB48) - if dev is None: - dev = usb.core.find(idVendor=VID, idProduct=PID_C1) - if dev is None: - dev = usb.core.find(idVendor=VID_FNB58, idProduct=PID_FNB58) - if dev: - is_fnb58_or_fnb48s = True - if dev is None: - dev = usb.core.find(idVendor=VID_FNB48S, idProduct=PID_FNB48S) - if dev: - is_fnb58_or_fnb48s = True - return dev, is_fnb58_or_fnb48s - - -def find_hid_interface_num(dev): - # https://github.com/pyusb/pyusb/issues/76#issuecomment-118460796 - for cfg in dev: - for interface in cfg: - if interface.bInterfaceClass == 0x03: # HID class - return interface.bInterfaceNumber - - -def ensure_interface_not_busy(dev, interface): - if dev.is_kernel_driver_active(interface.bInterfaceNumber): + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +class DeviceModel(Enum): + FNB48 = auto() + C1 = auto() + FNB58 = auto() + FNB48S = auto() + +@dataclass +class DeviceInfo: + vid: int + pid: int + model: DeviceModel + refresh_rate: float + +DEVICE_MAP = { + # FNB48 + # Bus 001 Device 020: ID 0483:003a STMicroelectronics FNB-48 + (0x0483, 0x003A): DeviceInfo(0x0483, 0x003A, DeviceModel.FNB48, 0.003), + # C1 + # Bus 001 Device 029: ID 0483:003b STMicroelectronics USB Tester + (0x0483, 0x003B): DeviceInfo(0x0483, 0x003B, DeviceModel.C1, 0.003), + # FNB58 + (0x2E3C, 0x5558): DeviceInfo(0x2E3C, 0x5558, DeviceModel.FNB58, 1.0), + # FNB48S + # Bus 001 Device 003: ID 2e3c:0049 FNIRSI USB Tester + (0x2E3C, 0x0049): DeviceInfo(0x2E3C, 0x0049, DeviceModel.FNB48S, 1.0), +} + +@dataclass +class MeasurementData: + timestamp: float + voltage: float + current: float + dp: float + dn: float + temperature: float + energy: float + capacity: float + +class USBMeter: + def __init__(self, verbose: bool = False, crc: bool = False, alpha: float = 0.9): + self.verbose = verbose + self.use_crc = crc + self.alpha = alpha + self.energy = 0.0 + self.capacity = 0.0 + self.temp_ema = None + self.crc_calculator = self._setup_crc() if crc else None + self.device_info = None + self.device = None + self.ep_in = None + self.ep_out = None + + def _setup_crc(self) -> Optional[Callable]: try: - dev.detach_kernel_driver(interface.bInterfaceNumber) - except usb.core.USBError as e: - print(f"Could not detatch kernel driver from interface({interface.bInterfaceNumber}): {e}", file=sys.stderr) - sys.exit(1) - - -def ensure_all_interfaces_not_busy(dev): - for cfg in dev: - for interface in cfg: - ensure_interface_not_busy(dev, interface) - - -def print_configs(dev): - for cfg in dev: - print("cfg", cfg.bConfigurationValue, file=sys.stderr) - for interface in cfg: - print( - " ", - "interface", - interface.bInterfaceNumber, - interface.bAlternateSetting, - file=sys.stderr, - ) - for ep in interface: - print(" ", "ep", ep.bEndpointAddress, file=sys.stderr) - - -def print_configs_overview(dev): - for cfg in dev: - for interface in cfg: - print(interface) - - -# State for accumulating in decodee -energy = 0.0 -capacity = 0.0 - - -def decode(data, calculate_crc, time_interval, alpha): - global energy - global capacity - temp_ema = None - - # Data is 64 bytes (64 bytes of HID data minus vendor constant 0xaa) - # First byte is HID vendor constant 0xaa - # Second byte is payload type: - # 0x04 is data packet - # Other types (0x03 and maybe other ones) is unknown - # Next 4 samples each 15 bytes. 60 bytes total. - # At the end 2 bytes: - # 1 byte is semi constant with unknown purpose. - # 1 byte (last) is a 8-bit CRC checksum - - packet_type = data[1] - if packet_type != 0x04: - # ignore all non-data packets - # print("Ignoring") - return - - if calculate_crc: - actual_checksum = data[-1] - expected_checksum = calculate_crc(bytearray(data[1:-1])) - if actual_checksum != expected_checksum: - print( - f"Ignoring packet of length {len(data)} with unexpected checksum. " - f"Expected: {expected_checksum:02x} Actual: {actual_checksum:02x}", - file=sys.stderr, + import crc + width = 8 + poly = 0x39 + init_value = 0x42 + final_xor_value = 0x00 + config = crc.Configuration( + width, poly, init_value, final_xor_value, + reverse_input=False, reverse_output=False ) - return - - t0 = time.time() - 4 * time_interval - for i in range(4): - offset = 2 + 15 * i - # 4 + 4 + 2 + 2 + 2 + 1 (unknown constant 1) = 15 bytes - voltage = ( - data[offset + 3] * 256 * 256 * 256 - + data[offset + 2] * 256 * 256 - + data[offset + 1] * 256 - + data[offset + 0] - ) / 100000 - current = ( - data[offset + 7] * 256 * 256 * 256 - + data[offset + 6] * 256 * 256 - + data[offset + 5] * 256 - + data[offset + 4] - ) / 100000 - dp = (data[offset + 8] + data[offset + 9] * 256) / 1000 - dn = (data[offset + 10] + data[offset + 11] * 256) / 1000 - # unknown12 = data[offset + 12] # ? constant 1 # some PD info? - # It does not look to be a sign of current. I tried reversing - # USB-C-In and USB-C-Out, which does reversed orientation of the - # blue arrow for current on device screen, but unknown12 remains 1. - # print(f"unknown{offset+12} {unknown12:02x}") - temp_C = (data[offset + 13] + data[offset + 14] * 256) / 10.0 - if temp_ema is not None: - temp_ema = temp_C * (1.0 - alpha) + temp_ema * alpha - else: - temp_ema = temp_C - power = voltage * current - # TODO(baryluk): This might be slightly inaccurate, if there is sampling jitter. - energy += power * time_interval - capacity += current * time_interval - t = t0 + i * time_interval - print( - f"{t:.3f} {i} {voltage:7.5f} {current:7.5f} {dp:5.3f} {dn:5.3f} " - f"{temp_ema:6.3f} {energy:.6f} {capacity:.6f}" + if hasattr(crc, "CrcCalculator"): + return crc.CrcCalculator(config, use_table=True).calculate_checksum + return crc.Calculator(config, optimized=True).checksum + except ImportError: + logger.warning("CRC package not found, disabling CRC checks") + return None + + def find_device(self) -> None: + for (vid, pid), info in DEVICE_MAP.items(): + device = usb.core.find(idVendor=vid, idProduct=pid) + if device: + self.device = device + self.device_info = info + logger.debug(f"Found {info.model.name} device") + return + raise RuntimeError("No supported USB meter found") + + def setup_device(self) -> None: + self.device.reset() + + if self.verbose: + self._print_device_info() + + # Find and setup HID interface + interface_num = self._find_hid_interface() + self._detach_kernel_driver(interface_num) + + # Configure device + self.device.set_configuration() + cfg = self.device.get_active_configuration() + intf = cfg[(interface_num, 0)] + + # Get endpoints + self.ep_out = self._find_endpoint(intf, usb.util.ENDPOINT_OUT) + self.ep_in = self._find_endpoint(intf, usb.util.ENDPOINT_IN) + + def _find_hid_interface(self) -> int: + for cfg in self.device: + for interface in cfg: + if interface.bInterfaceClass == 0x03: # HID class + return interface.bInterfaceNumber + raise RuntimeError("No HID interface found") + + def _detach_kernel_driver(self, interface_num: int) -> None: + if self.device.is_kernel_driver_active(interface_num): + try: + self.device.detach_kernel_driver(interface_num) + except usb.core.USBError as e: + raise RuntimeError(f"Could not detach kernel driver: {e}") + + def _find_endpoint(self, interface, direction) -> usb.core.Endpoint: + return usb.util.find_descriptor( + interface, + custom_match=lambda e: + usb.util.endpoint_direction(e.bEndpointAddress) == direction ) - # unknown62 = data[62] # data[-2] - # print(f"unknown62 {unknown:02x}") - # print() - -def request_data(is_fnb58_or_fnb48s, ep_out): - # Setup communication with power meter - ep_out.write(b"\xaa\x81" + b"\x00" * 61 + b"\x8e") - ep_out.write(b"\xaa\x82" + b"\x00" * 61 + b"\x96") - - if is_fnb58_or_fnb48s: - ep_out.write(b"\xaa\x82" + b"\x00" * 61 + b"\x96") - else: - ep_out.write(b"\xaa\x83" + b"\x00" * 61 + b"\x9e") + def _print_device_info(self) -> None: + logger.debug("Device configuration:") + for cfg in self.device: + logger.debug(f"Config {cfg.bConfigurationValue}") + for interface in cfg: + logger.debug(f" Interface {interface.bInterfaceNumber}") + for ep in interface: + logger.debug(f" Endpoint {ep.bEndpointAddress:02x}") + + def initialize_communication(self) -> None: + init_sequence = [ + (b"\xaa\x81", b"\x8e"), + (b"\xaa\x82", b"\x96"), + ] + + if self.device_info.model in (DeviceModel.FNB58, DeviceModel.FNB48S): + init_sequence.append((b"\xaa\x82", b"\x96")) + else: + init_sequence.append((b"\xaa\x83", b"\x9e")) + + for prefix, suffix in init_sequence: + self.ep_out.write(prefix + b"\x00" * 61 + suffix) + time.sleep(0.01) + + def decode_packet(self, data: bytes, timestamp: float) -> Optional[MeasurementData]: + # Data is 64 bytes (64 bytes of HID data minus vendor constant 0xaa) + # First byte is HID vendor constant 0xaa + # Second byte is payload type: + # 0x04 is data packet + # Other types (0x03 and maybe other ones) is unknown + # Next 4 samples each 15 bytes. 60 bytes total. + # At the end 2 bytes: + # 1 byte is semi constant with unknown purpose. + # 1 byte (last) is a 8-bit CRC checksum + + if data[1] != 0x04: # Not a data packet + return None + + if self.use_crc and self.crc_calculator: + if not self._verify_crc(data): + return None + + measurements = [] + base_time = timestamp - 0.04 # 4 samples, 10ms each + + for i in range(4): + offset = 2 + 15 * i + measurement = self._decode_measurement(data[offset:offset+15], base_time + i * 0.01) + measurements.append(measurement) + + return measurements[-1] # Return most recent measurement + + def _decode_measurement(self, data: bytes, timestamp: float) -> MeasurementData: + voltage = int.from_bytes(data[0:4], 'little') / 100000 + current = int.from_bytes(data[4:8], 'little') / 100000 + dp = int.from_bytes(data[8:10], 'little') / 1000 + dn = int.from_bytes(data[10:12], 'little') / 1000 + temp_C = int.from_bytes(data[13:15], 'little') / 10.0 + + # Update running totals + power = voltage * current + self.energy += power * 0.01 # 10ms interval + self.capacity += current * 0.01 + # Update EMA temperature + if self.temp_ema is None: + self.temp_ema = temp_C + else: + self.temp_ema = temp_C * (1.0 - self.alpha) + self.temp_ema * self.alpha + + return MeasurementData( + timestamp=timestamp, + voltage=voltage, + current=current, + dp=dp, + dn=dn, + temperature=self.temp_ema, + energy=self.energy, + capacity=self.capacity + ) -# Argument handling -def str2bool(v: str) -> bool: - vl = v.lower() - if vl in ("yes", "true", "t", "1"): + def _verify_crc(self, data: bytes) -> bool: + actual = data[-1] + expected = self.crc_calculator(bytearray(data[1:-1])) + if actual != expected: + logger.warning( + f"CRC mismatch: expected {expected:02x}, got {actual:02x}" + ) + return False return True - if vl in ("no", "false", "f", "0"): - return False - assert False, "Argument must be a boolean (true or false)" + def run(self) -> None: + print("timestamp voltage_V current_A dp_V dn_V temp_C_ema energy_Ws capacity_As") + + next_refresh = time.time() + self.device_info.refresh_rate + stop = False + + while not stop: + try: + data = self.ep_in.read(64, timeout=5000) + measurement = self.decode_packet(data, time.time()) + + if measurement: + print( + f"{measurement.timestamp:.3f} {measurement.voltage:7.5f} " + f"{measurement.current:7.5f} {measurement.dp:5.3f} " + f"{measurement.dn:5.3f} {measurement.temperature:6.3f} " + f"{measurement.energy:.6f} {measurement.capacity:.6f}" + ) + + if time.time() >= next_refresh: + next_refresh = time.time() + self.device_info.refresh_rate + self.ep_out.write(b"\xaa\x83" + b"\x00" * 61 + b"\x9e") + + if os.path.exists("fnirsi_stop"): + stop = True + + except KeyboardInterrupt: + logger.info("Keyboard interrupt received, stopping...") + stop = True + + self._drain_buffer() + + def _drain_buffer(self) -> None: + logger.debug("Draining USB buffer...") + try: + while True: + data = self.ep_in.read(64, timeout=1000) + if data and self.verbose: + logger.debug(f"Drained {len(data)} bytes") + except usb.core.USBTimeoutError: + logger.debug("Buffer drain complete") def main(): - parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("--crc", type=str2bool, help="Enable CRC checks", default=False) - parser.add_argument("--verbose", type=str2bool, help="Show some extra initalization information", default=False) - parser.add_argument("--alpha", type=float, help="Set temperature smoothing factor", default=0.9) + parser = argparse.ArgumentParser() + parser.add_argument("--crc", type=bool, default=False, help="Enable CRC checks") + parser.add_argument("--verbose", type=bool, default=False, help="Enable verbose logging") + parser.add_argument("--alpha", type=float, default=0.9, help="Temperature EMA factor") args = parser.parse_args() - crc_calculator = None - if args.crc: - try: - crc_calculator = setup_crc() # can be None - print("CRC checks enabled", file=sys.stderr) - except ModuleNotFoundError: - print("Warning: crc package not found. crc checks will not be performed", file=sys.stderr) - crc_calculator = None - except Exception as e: - print("Warning: crc calculator got exception: {e}, disabling crc checks", file=sys.stderr) - crc_calculator = None - - # At the moment only 100 sps is supported - sps = 100 - time_interval = 1.0 / sps - - dev, is_fnb58_or_fnb48s = find_device() - assert dev, "Device not found" - if args.verbose: - if is_fnb58_or_fnb48s: - print("Found FNB48S or FNB58 device", file=sys.stderr) - else: - print("Found FNB48 or C1 device", file=sys.stderr) - - dev.reset() - - if args.verbose: - print("Configs:", file=sys.stderr) - print_configs(dev) - print("", file=sys.stderr) - - if args.verbose: - print("Configs overview:", file=sys.stderr) - print_configs_overview(dev) - print("", file=sys.stderr) - - interface_hid_num = find_hid_interface_num(dev) - if args.verbose: - print(f"Using interface_hid_num={interface_hid_num}", file=sys.stderr) - - if args.verbose: - print(f"Ensuring all interfaces not busy and detaching kernel driver, if needed …", file=sys.stderr) - ensure_all_interfaces_not_busy(dev) - - # if args.verbose: - # print(f"Claining interface …", file=sys.stderr) - # usb.util.claim_interface(dev, 0) - - if args.verbose: - print(f"Setting configuration …", file=sys.stderr) - # Set the active configuration. With no arguments, the first - # configuration will be the active one - dev.set_configuration() - - # Get an endpoint instance - cfg = dev.get_active_configuration() - intf = cfg[(interface_hid_num, 0)] - - ep_out = usb.util.find_descriptor( - intf, - # match the first OUT endpoint - custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT, - ) - - ep_in = usb.util.find_descriptor( - intf, - # match the first IN endpoint - custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN, - ) - - assert ep_in - assert ep_out - if args.verbose: - print(f"Starting data request …", file=sys.stderr) - - request_data(is_fnb58_or_fnb48s, ep_out) - - print() # Extra line so concatenation work better in gnuplot. - print("timestamp sample_in_packet voltage_V current_A dp_V dn_V temp_C_ema energy_Ws capacity_As") - - time.sleep(0.1) - refresh = 1.0 if is_fnb58_or_fnb48s else 0.003 # 1 s for FNB58 / FNB48S, 3 ms for others - continue_time = time.time() + refresh - - alpha = args.alpha - - stop = False - while not stop: - try: - data = ep_in.read(size_or_buffer=64, timeout=5000) - - # print("".join([f"{x:02x}" for x in data])) - - decode(data, crc_calculator, time_interval, alpha) - - if time.time() >= continue_time: - continue_time = time.time() + refresh - ep_out.write(b"\xaa\x83" + b"\x00" * 61 + b"\x9e") - except KeyboardInterrupt: - print( - "Terminating due to the keyboard interrupt, please wait until draining is complete …", file=sys.stderr - ) - stop = True - - if os.path.exists("fnirsi_stop"): - stop = True + logger.setLevel(logging.DEBUG) + meter = USBMeter(verbose=args.verbose, crc=args.crc, alpha=args.alpha) + try: - while True: - # Exhaust data in descriptor - if args.verbose: - print("Please wait until draining is finished …", file=sys.stderr) - # We do not know exactly how many bytes to read (even if we track things like refresh - # and continue_time it might be tricky), so just read with timeout. - data = ep_in.read(size_or_buffer=64, timeout=1000) - if data: - if args.verbose: - print(f"During termination drained {len(data)} bytes", file=sys.stderr) - except usb.core.USBTimeoutError as e: - # Exception [Errno 110] Operation timed out - # This is expected during draining, and in fact indicates a success. - if args.verbose: - print("Exiting …", file=sys.stderr) + meter.find_device() + meter.setup_device() + meter.initialize_communication() + meter.run() except Exception as e: - print(f"Exception {type(e)} {e}", file=sys.stderr) - raise - + logger.error(f"Error: {e}") + sys.exit(1) if __name__ == "__main__": - main() + main() \ No newline at end of file