Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added experimental ble-nus-repl support. #135

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions rshell/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,15 @@ def connect_telnet(name, ip_address=None, user='micro', password='python'):
add_device(dev)


def connect_ble(mac):
if not QUIET:
if mac == "ble":
print('Connecting ble...')
else:
print('Connecting ble to %s...' % (mac))
dev = DeviceBle(mac)
add_device(dev)

def connect_serial(port, baud=115200, wait=0):
"""Connect to a MicroPython board via a serial port."""
if not QUIET:
Expand Down Expand Up @@ -1679,6 +1688,32 @@ def timeout(self, value):
pass


class DeviceBle(Device):

def __init__(self, mac):
self.dev_name_short = 'mpus ({})'.format(mac, "" if mac == "ble" else mac)
self.dev_name_long = self.dev_name_short

try:
pyb = Pyboard(mac)
except KeyboardInterrupt:
raise DeviceError('Interrupted')
Device.__init__(self, pyb)

def default_board_name(self):
return 'mpus'

@property
def timeout(self):
"""Not implemented for the ble connection."""
return None

@timeout.setter
def timeout(self, value):
"""Not implemented for the ble connection."""
pass


class AutoBool(object):
"""A simple class which allows a boolean to be set to False in conjunction
with a with: statement.
Expand Down Expand Up @@ -2099,6 +2134,7 @@ def do_connect(self, line):
"""connect TYPE TYPE_PARAMS
connect serial port [baud]
connect telnet ip-address-or-name
connect ble [mac-addr]

Connects a pyboard to rshell.
"""
Expand Down Expand Up @@ -2128,6 +2164,12 @@ def do_connect(self, line):
return
name = args[1]
connect_telnet(name)
elif connect_type == "ble":
if num_args < 2:
mac = "ble"
else:
mac = args[1]
connect_ble(mac)
else:
print_err('Unrecognized connection TYPE: {}'.format(connect_type))

Expand Down
148 changes: 133 additions & 15 deletions rshell/pyboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,113 @@ def inWaiting(self):
else:
return n_waiting


class BleNusToSerial:
NUS_SERVICE_ID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
NUS_RX_CHAR = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
NUS_TX_CHAR = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'

def __init__(self, mac_addr=None, read_timeout=None):
import asyncio
from bleak import BleakClient, BleakScanner
from collections import deque
from threading import Thread

self.read_timeout = read_timeout
self.ble = None
self.tx_fifo = deque()
self.rx_fifo = deque()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

async def _scan_and_conn(_self):
async with BleakScanner() as scanner:
await asyncio.sleep(5.0)
devices = list(sorted(await scanner.get_discovered_devices(), key=lambda d: d.rssi, reverse=True))
for d in devices:
# print(d, d.rssi)
if d.name == "mpus" and BleNusToSerial.NUS_SERVICE_ID in d.metadata["uuids"]:
# Correct device, lets connect device and if success run ble thread in main
# print("Correct device")
ble = BleakClient(d.address, loop=_self.loop)
con_succ = await ble.connect()
# print("Connected...", con_succ)
if con_succ:
_self.ble = ble

return

self.loop.run_until_complete(_scan_and_conn(self))
# print("Done init")
if self.ble is None:
# print("No conn")
raise PyboardError('Failed to establish a ble connection with the board')
else:
self.thr_run = True
self.thr = Thread(target=self._ble_thread)
self.thr.daemon = True
self.thr.start()

def _ble_thread(self):
import asyncio
async def _ble_async(_self):
def not_rx(sender, data):
# print("Got", data)
_self.rx_fifo.extend(list(data))

await _self.ble.start_notify(BleNusToSerial.NUS_RX_CHAR, not_rx)
while _self.thr_run:
if len(_self.tx_fifo) > 0:
data = b''
# Write in chunks
while len(_self.tx_fifo) > 0 and len(data) < 16: # Should be 22, but this works
data += bytes([_self.tx_fifo.popleft()])
await self.ble.write_gatt_char(BleNusToSerial.NUS_TX_CHAR, data, True)
else:
await asyncio.sleep(0.1) # Sleep while waiting
await _self.ble.stop_notify(BleNusToSerial.NUS_RX_CHAR)

res = self.loop.run_until_complete(_ble_async(self))

def __del__(self):
self.close()

def close(self):
async def _close(_ble):
await _ble.disconnect()

if self.ble:
if self.thr and self.thr.is_alive():
self.thr_run = False
self.thr.join()
self.loop.run_until_complete(_close(self.ble))
self.loop.close()
print("Clean end")

def read(self, size=1):
timeout_count = 0
while len(self.rx_fifo) < size:
time.sleep(0.25)
if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
break
timeout_count += 1

data = b''
while len(data) < size and len(self.rx_fifo) > 0:
data += bytes([self.rx_fifo.popleft()])
return data

def write(self, data):
while len(self.tx_fifo) > 22:
time.sleep(0.1) # Throttle write to ensure successful writes
self.tx_fifo.extend(list(data))
# print("Writing", data)
return len(data)

def inWaiting(self):
return len(self.rx_fifo)


def parse_bool(str):
return str == '1' or str.lower() == 'true'

Expand All @@ -124,6 +231,9 @@ def __init__(self, device, baudrate=115200, user='micro', password='python', wai
if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
# device looks like an IP address
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
elif device and (device == "ble" or (device[0] in '0123456789abcdefABCDEF' and device.count(":") == 5)):
# device looks like mac address
self.serial = BleNusToSerial(mac_addr=(device if device != "ble" else None), read_timeout=10)
else:
import serial
delayed = False
Expand Down Expand Up @@ -191,22 +301,30 @@ def enter_raw_repl(self):
n = self.serial.inWaiting()

self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
print(data)
raise PyboardError('could not enter raw repl')
# Do not soft reset if ble, if needed add microcode to be executed
if not isinstance(self.serial, BleNusToSerial):
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
print(data)
raise PyboardError('could not enter raw repl')

self.serial.write(b'\x04') # ctrl-D: soft reset
data = self.read_until(1, b'soft reboot\r\n')
if not data.endswith(b'soft reboot\r\n'):
print(data)
raise PyboardError('could not enter raw repl')
# By splitting this into 2 reads, it allows boot.py to print stuff,
# which will show up after the soft reboot and before the raw REPL.
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
print(data)
raise PyboardError('could not enter raw repl')
else:
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
print(data)
raise PyboardError('could not enter raw repl')

self.serial.write(b'\x04') # ctrl-D: soft reset
data = self.read_until(1, b'soft reboot\r\n')
if not data.endswith(b'soft reboot\r\n'):
print(data)
raise PyboardError('could not enter raw repl')
# By splitting this into 2 reads, it allows boot.py to print stuff,
# which will show up after the soft reboot and before the raw REPL.
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
print(data)
raise PyboardError('could not enter raw repl')

def exit_raw_repl(self):
self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL
Expand Down