Skip to content

Commit

Permalink
trying to implement main.py
Browse files Browse the repository at this point in the history
  • Loading branch information
o-murphy committed Apr 29, 2024
1 parent 07e95dd commit c585e65
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 78 deletions.
205 changes: 129 additions & 76 deletions pydfuutil/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
"""
import argparse
import errno
import io
import logging
import re
import sys
from enum import IntEnum
from enum import IntEnum, Enum, IntFlag
from typing import Any, Callable

import usb.core
from usb.backend import libusb1
import libusb_package

from pydfuutil import __version__, __copyright__, dfu

Expand All @@ -20,7 +22,7 @@


# TODO: not implemented yet

libusb1.get_backend(libusb_package.find_library)

def atoi(s: str) -> int:
"""
Expand All @@ -47,8 +49,8 @@ def usb_path2devnum(path: str) -> int:


def find_dfu_if(dev: usb.core.Device,
handler: Callable[[dfu.DfuIf, Any], int],
v: Any) -> int:
handler: Callable[[dfu.DfuIf, Any], Any] = None,
v: Any = None) -> int:
"""
Find DFU interface for a given USB device.
Expand All @@ -57,15 +59,16 @@ def find_dfu_if(dev: usb.core.Device,
:param v: Additional user-defined data for the callback function.
:return: 0 if no DFU interface is found, or the result of the handler function.
"""
desc = dev.get_active_configuration().desc

for cfg in desc:
configs = dev.configurations() # .desc

for cfg in configs:
for intf in cfg:
if intf.bInterfaceClass == 0xfe and intf.bInterfaceSubClass == 1:
dfu_if = dfu.DfuIf(
vendor=desc.idVendor,
product=desc.idProduct,
bcdDevice=desc.bcdDevice,
vendor=dev.idVendor,
product=dev.idProduct,
bcdDevice=dev.bcdDevice,
configuration=cfg.bConfigurationValue,
interface=intf.bInterfaceNumber,
altsetting=intf.bAlternateSetting,
Expand Down Expand Up @@ -173,20 +176,20 @@ def get_alt_name(dfu_if: dfu.DfuIf) -> [int, str]:
"""
dev = dfu_if.dev
cfg = dev.get_active_configuration()
intf = cfg[(dfu_if.interface, dfu_if.altsetting)]
intf: usb.core.Interface = cfg[(dfu_if.interface, dfu_if.altsetting)]

alt_name_str_idx = intf.iInterface

if alt_name_str_idx:
if not dfu_if.dev.handle:
if not dfu_if.dev:
try:
dfu_if.dev.handle = usb.util.find_descriptor(dev, find_all=True)
dfu_if.dev = usb.util.find_descriptor(dev, find_all=True)
except usb.core.USBError:
dfu_if.dev.handle = None
dfu_if.dev = None

if dfu_if.dev.handle:
if dfu_if.dev:
try:
return dfu_if.dev.handle.get_string(alt_name_str_idx, MAX_DESC_STR_LEN)
return usb.util.get_string(dev, alt_name_str_idx)
except usb.core.USBError:
return -1

Expand All @@ -209,24 +212,23 @@ def print_dfu_if(dfu_if: dfu.DfuIf, v: Any) -> int:
print(f"Found {'DFU' if dfu_if.flags & dfu.Mode.IFF_DFU else 'Runtime'}: "
f"[{dfu_if.vendor:04x}:{dfu_if.product:04x}] devnum={dfu_if.devnum}, "
f"cfg={dfu_if.configuration}, intf={dfu_if.interface}, "
f"alt={dfu_if.altsetting}, name=\"{name.decode('utf-8')}\"")
f"alt={dfu_if.altsetting}, name=\"{name}\"")

return 0


def list_dfu_interfaces(ctx: 'usb.core.Context') -> int:
def list_dfu_interfaces(ctx: list[usb.core.Device]) -> int:
"""
Walk the device tree and print out DFU devices.
:param ctx: The USB context.
:param dif: dfu.DfuIf
:param ctx: libusb context
:return: 0 on success.
"""
list = ctx.get_device_list()

for dev in list:
for dev in ctx:
find_dfu_if(dev, print_dfu_if, None)

usb.util.dispose_resources(ctx)
usb.util.dispose_resources(dev)
return 0


Expand All @@ -250,19 +252,6 @@ def alt_by_name(dfu_if: dfu.DfuIf, v: bytes) -> int:
return dfu_if.altsetting + 1


def _count_cb(dif: dfu.DfuIf, v: list) -> int:
"""
Callback function to count DFU interfaces.
:param dif: The DFU interface struct.
:param v: Pointer to the count variable.
:return: Always returns 0.
"""
count: list[int] = v
count[0] += 1
return 0


def count_dfu_interfaces(dev: usb.core.Device) -> int:
"""
Count DFU interfaces within a single device.
Expand All @@ -271,11 +260,16 @@ def count_dfu_interfaces(dev: usb.core.Device) -> int:
:return: The number of DFU interfaces found.
"""
num_found: int = 0
find_dfu_if(dev, _count_cb, num_found)
def count_cb(dif, v):
nonlocal num_found
num_found += v
return 0

find_dfu_if(dev, count_cb, 1)
return num_found


def iterate_dfu_devices(ctx: 'usb.core.Context', dif: dfu.DfuIf, action, user) -> int:
def iterate_dfu_devices(ctx: list[usb.core.Device], dif: dfu.DfuIf) -> list[usb.core.Device]:
"""
Iterate over all matching DFU capable devices within the system.
Expand All @@ -285,24 +279,21 @@ def iterate_dfu_devices(ctx: 'usb.core.Context', dif: dfu.DfuIf, action, user) -
:param user: Additional user-defined data.
:return: 0 on success, or an error code.
"""
list = ctx.get_device_list()
for dev in list:
desc = dev.get_device_descriptor()
retval = []
for dev in ctx:

if dif and (dif.flags & dfu.Mode.IFF_DEVNUM) and (dev.bus != dif.bus or dev.address != dif.devnum):
continue
if dif and (dif.flags & dfu.Mode.IFF_VENDOR) and desc.idVendor != dif.vendor:
if dif and (dif.flags & dfu.Mode.IFF_VENDOR) and dev.idVendor != dif.vendor:
continue
if dif and (dif.flags & dfu.Mode.IFF_PRODUCT) and desc.idProduct != dif.product:
if dif and (dif.flags & dfu.Mode.IFF_PRODUCT) and dev.idProduct != dif.product:
continue
if not count_dfu_interfaces(dev):
continue
retval.append(dev)

retval = action(dev, user)
if retval:
return retval

return 0
return retval


def found_dfu_device(dev: usb.core.Device, dif: dfu.DfuIf) -> int:
Expand All @@ -317,15 +308,15 @@ def found_dfu_device(dev: usb.core.Device, dif: dfu.DfuIf) -> int:
return 1


def get_first_dfu_device(ctx: 'usb.core.Context', dif: dfu.DfuIf) -> int:
"""
Find the first DFU-capable device and save it in dif.dev.
:param ctx: The USB context.
:param dif: The DFU interface struct.
:return: 0 on success, or an error code.
"""
return iterate_dfu_devices(ctx, dif, found_dfu_device, dif)
# def get_first_dfu_device(ctx: 'usb.core.Context', dif: dfu.DfuIf) -> int:
# """
# Find the first DFU-capable device and save it in dif.dev.
#
# :param ctx: The USB context.
# :param dif: The DFU interface struct.
# :return: 0 on success, or an error code.
# """
# return iterate_dfu_devices(ctx, dif, found_dfu_device, dif)


def count_one_dfu_device(dev: usb.core.Device, user: list) -> int:
Expand All @@ -341,18 +332,18 @@ def count_one_dfu_device(dev: usb.core.Device, user: list) -> int:
return 0


def count_dfu_devices(ctx: 'usb.core.Context', dif: dfu.DfuIf) -> int:
"""
Count the number of DFU devices connected to the USB context.
:param ctx: The libusb context.
:param dif: The DFU interface struct.
:return: The number of DFU devices found.
"""
num_found = 0

iterate_dfu_devices(ctx, dif, count_one_dfu_device, num_found)
return num_found
# def count_dfu_devices(ctx: list[usb.core.Device], dif: dfu.DfuIf) -> int:
# """
# Count the number of DFU devices connected to the USB context.
#
# :param ctx: The libusb context.
# :param dif: The DFU interface struct.
# :return: The number of DFU devices found.
# """
# num_found = 0
#
# iterate_dfu_devices(ctx, dif, count_one_dfu_device, num_found)
# return num_found


def parse_vendprod(string: str) -> tuple[int, int]:
Expand All @@ -365,12 +356,12 @@ def parse_vendprod(string: str) -> tuple[int, int]:
vendor = 0
product = 0

vendor_str, _, product_str = string.partition(':')
vendor_str, product_str = string.split(':')

if vendor_str:
vendor = int(vendor_str, 16)
vendor = atoi(vendor_str)
if product_str:
product = int(product_str, 16)
product = atoi(product_str)

return vendor, product

Expand Down Expand Up @@ -527,15 +518,15 @@ def get_cached_extra_descriptor(dev: usb.core.Device,


VERSION = (f"{__version__}\n\n"
f"('Copyright 2005-2008 Weston Schmidt, Harald Welte and OpenMoko Inc.\n')"
f"('Copyright 2005-2008 Weston Schmidt, Harald Welte and OpenMoko Inc.')\n"
f"{__copyright__}\n"
f"This program is Free Software and has ABSOLUTELY NO WARRANTY\n\n')")
f"This program is Free Software and has ABSOLUTELY NO WARRANTY')\n")


class Mode(IntEnum):
class Mode(IntFlag):
NONE = 0
VERSION = 1
LIST = 1
LIST = 2
DETACH = 3
UPLOAD = 4
DOWNLOAD = 5
Expand Down Expand Up @@ -586,6 +577,9 @@ def main() -> None:

dif: dfu.DfuIf = dfu.DfuIf()
file_name = None
mode = Mode.NONE
device_id_filter = None


if args.verbose:
VERBOSE = True
Expand Down Expand Up @@ -637,12 +631,71 @@ def main() -> None:
file_name = args.download

if args.reset:
mode = Mode.DOWNLOAD
final_reset = 1

if args.serial:
dfuse_options = args.serial

print(VERSION)

if mode == Mode.NONE:
print("Error: You need to specify one of -D or -U\n\n")
parser.print_help()
sys.exit(2)

filter = {}
if device_id_filter:
dif.vendor, dif.product = parse_vendprod(device_id_filter)
print(f"Filter on VID = {hex(dif.vendor)} PID = {hex(dif.product)}\n")
if dif.vendor:
dif.flags |= dfu.Mode.IFF_VENDOR
filter["idVendor"] = dif.vendor
if dif.product:
dif.flags |= dfu.Mode.IFF_PRODUCT
filter["idProduct"] = dif.vendor

if VERBOSE > 1:
dfu.logger.setLevel(logging.DEBUG)

# libusb init
libusb_ctx = list(usb.core.find(find_all=True, **filter))

if mode == Mode.LIST:
list_dfu_interfaces(libusb_ctx)
sys.exit(0)

dfu.init(5000)

dfu_capable = iterate_dfu_devices(libusb_ctx, dif)

if len(dfu_capable) == 0:
print("No DFU capable USB device found")
sys.exit(1)
elif len(dfu_capable) > 1:
# We cannot safely support more than one DFU capable device
# with same vendor/product ID, since during DFU we need to do
# a USB bus reset, after which the target device will get a
# new address */
print("More than one DFU capable USB device found, "
"you might try `--list' and then disconnect all but one "
"device")
sys.exit(3)

# get_first_dfu_device
if not (dev := dfu_capable[0]):
sys.exit(3)

# We have exactly one device. Its libusb_device is now in dif->dev

print("Opening DFU capable USB device... ")

def get_dfu_intf(dif_: dfu.DfuIf, v: Any = None):
nonlocal dif
dif = dif_

find_dfu_if(dev, get_dfu_intf)



if __name__ == '__main__':
main()
4 changes: 2 additions & 2 deletions pydfuutil/dfu.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ class DfuIf: # pylint: disable=too-many-instance-attributes

"""DfuIf structure implementation"""

vendor: int = field(default=0, )
product: int = field(default=0, )
vendor: int = field(default=None, )
product: int = field(default=None, )
bcdDevice: int = field(default=0, )
configuration: int = field(default=0, )
interface: int = field(default=0, )
Expand Down

0 comments on commit c585e65

Please sign in to comment.