Skip to content

Commit

Permalink
add try-finally to contextmanagers
Browse files Browse the repository at this point in the history
Signed-off-by: Jonathan Rico <[email protected]>
  • Loading branch information
jori-nordic committed Aug 15, 2022
1 parent 141ac67 commit 7e95a79
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 40 deletions.
24 changes: 13 additions & 11 deletions targettest/targettest/devkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
@contextmanager
def SeggerEmulator(family='UNKNOWN', id=None, core=None):
"""Instantiate the pynrfjprog API and optionally connect to a device."""
api = LowLevel.API(family)
api.open()
if id is not None:
api.connect_to_emu_with_snr(id)
try:
api = LowLevel.API(family)
api.open()
if id is not None:
api.connect_to_emu_with_snr(id)

if core is not None:
cpu = coproc[core]
api.select_coprocessor(cpu)
if core is not None:
cpu = coproc[core]
api.select_coprocessor(cpu)

yield api
yield api

if id is not None:
api.disconnect_from_emu()
api.close()
finally:
if id is not None:
api.disconnect_from_emu()
api.close()

coproc = {'APP': Parameters.CoProcessor.CP_APPLICATION,
'NET': Parameters.CoProcessor.CP_NETWORK}
Expand Down
58 changes: 29 additions & 29 deletions targettest/targettest/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,35 +67,35 @@ def FlashedDevice(request, family='NRF53', id=None, board='nrf5340dk_nrf5340_cpu

@contextmanager
def RPCDevice(device: Devkit, group='nrf_pytest'):
# Manage RPC transport
channel = UARTRPCChannel(port=device.port, group_name=group)
# Start receiving bytes
device.reset()
device.start_logging()

channel.start()
print('Wait for RPC ready')
# Wait until we have received the handshake/init packet
end_time = time.monotonic() + 5
while not channel.ready:
time.sleep(.1)
if time.monotonic() > end_time:
channel.close()
device.stop_logging()
raise Exception('Unresponsive device')

# Wait for the READY event (sent from main)
# This is a user-defined event, it's not part of the nrf-rpc init sequence.
event = channel.get_evt()
assert event.opcode == 0x01
print(f'[{device.port}] channel ready')

yield channel
print(f'[{device.port}] closing channel')

channel.close()
device.stop_logging()
device.halt()
try:
# Manage RPC transport
channel = UARTRPCChannel(port=device.port, group_name=group)
# Start receiving bytes
device.reset()
device.start_logging()

channel.start()
print('Wait for RPC ready')
# Wait until we have received the handshake/init packet
end_time = time.monotonic() + 5
while not channel.ready:
time.sleep(.1)
if time.monotonic() > end_time:
raise Exception('Unresponsive device')

# Wait for the READY event (sent from main)
# This is a user-defined event, it's not part of the nrf-rpc init sequence.
event = channel.get_evt()
assert event.opcode == 0x01
print(f'[{device.port}] channel ready')

yield channel

finally:
print(f'[{device.port}] closing channel')
channel.close()
device.stop_logging()
device.halt()


class TestDevice():
Expand Down

0 comments on commit 7e95a79

Please sign in to comment.