Skip to content

Commit

Permalink
Move decorator inside EmulatorSimulator.
Browse files Browse the repository at this point in the history
This removes one lint warning due to protected access, but it also simplifies
the code because we don't need to keep a separate `self._grpc_port` attribute.
This will also help in the migration to strongly-typed config classes.

PiperOrigin-RevId: 598649852
  • Loading branch information
kenjitoyama authored and copybara-github committed Jan 15, 2024
1 parent c42d1f5 commit e410766
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 27 deletions.
57 changes: 34 additions & 23 deletions android_env/components/simulators/emulator/emulator_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,6 @@ def _pick_emulator_grpc_port() -> int:
return portpicker.pick_unused_port()


def _reconnect_on_grpc_error(func):
"""Decorator function for reconnecting to emulator upon grpc errors."""

def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs) # pytype: disable=missing-parameter # always-use-return-annotations
except grpc.RpcError:
logging.exception('RpcError caught. Reconnecting to emulator...')
emu = args[0] # The first arg of the function is "self"
emu._emulator_stub, emu._snapshot_stub = emu._connect_to_emulator( # pylint: disable=protected-access
emu._grpc_port # pylint: disable=protected-access
)
return func(*args, **kwargs) # pytype: disable=missing-parameter # always-use-return-annotations

return wrapper


class EmulatorBootError(errors.SimulatorError):
"""Raised when an emulator failed to boot."""

Expand Down Expand Up @@ -139,7 +122,6 @@ def __init__(
# we assume the emulator already exists and there's no need to launch.
if is_existing_emulator_provided(self._emulator_launcher_args):
self._existing_emulator_provided = True
self._grpc_port = self._emulator_launcher_args['grpc_port']
logging.info('Connecting to existing emulator "%r"',
self.adb_device_name())
else:
Expand All @@ -148,7 +130,7 @@ def __init__(
self._emulator_launcher_args['emulator_console_port'] = (
portpicker.pick_unused_port()
)
self._grpc_port = _pick_emulator_grpc_port()
self._emulator_launcher_args['grpc_port'] = _pick_emulator_grpc_port()

self._channel = None
self._emulator_stub: emulator_controller_pb2_grpc.EmulatorControllerStub | None = (
Expand Down Expand Up @@ -189,14 +171,29 @@ def __init__(
self._emulator_launcher_args.update({
'adb_path': self._adb_controller_config.adb_path,
'adb_server_port': self._adb_controller_config.adb_server_port,
'grpc_port': self._grpc_port,
'grpc_port': self._emulator_launcher_args['grpc_port'],
'tmp_dir': tmp_dir,
})
logging.info('emulator_launcher_args: %r', self._emulator_launcher_args)
self._launcher = emulator_launcher.EmulatorLauncher(
**self._emulator_launcher_args)
self._logfile_path = logfile_path or self._launcher.logfile_path()

def _reconnect_on_grpc_error(func):
"""Decorator function for reconnecting to emulator upon grpc errors."""

def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs) # pytype: disable=missing-parameter # always-use-return-annotations
except grpc.RpcError:
logging.exception('RpcError caught. Reconnecting to emulator...')
self._emulator_stub, self._snapshot_stub = self._connect_to_emulator(
self._emulator_launcher_args['grpc_port']
)
return func(self, *args, **kwargs) # pytype: disable=missing-parameter # always-use-return-annotations

return wrapper

def get_logs(self) -> str:
"""Returns logs recorded by the emulator."""
if self._logfile_path and os.path.exists(self._logfile_path):
Expand Down Expand Up @@ -246,7 +243,7 @@ def _launch_impl(self) -> None:
self._launcher.launch_emulator_process()
# Establish grpc connection to emulator process.
self._emulator_stub, self._snapshot_stub = self._connect_to_emulator(
self._grpc_port
self._emulator_launcher_args['grpc_port']
)

# Confirm booted status.
Expand Down Expand Up @@ -372,6 +369,9 @@ def _connect_to_emulator(
def _confirm_booted(self, startup_wait_time_sec: int = 300):
"""Waits until the emulator is fully booted."""

assert (
self._emulator_stub is not None
), 'Emulator stub has not been initialized yet.'
start_time = time.time()
deadline = start_time + startup_wait_time_sec
success = False
Expand Down Expand Up @@ -409,7 +409,9 @@ def send_touch(self, touches: list[tuple[int, int, bool, int]]) -> None:
3 identifier: Identifies a particular finger in a multitouch event.
"""

assert self._emulator_stub, 'Emulator stub has not been initialized yet.'
assert (
self._emulator_stub is not None
), 'Emulator stub has not been initialized yet.'
touch_events = [
emulator_controller_pb2.Touch(
x=t[0], y=t[1], pressure=int(t[2]), identifier=t[3])
Expand All @@ -427,11 +429,15 @@ def send_key(self, keycode: np.int32, event_type: str) -> None:
See the emulator_controller_pb2 for details.
event_type: Type of key event to be sent.
"""

event_types = emulator_controller_pb2.KeyboardEvent.KeyEventType.keys()
if event_type not in event_types:
raise ValueError(
f'Event type must be one of {event_types} but is {event_type}.')

assert (
self._emulator_stub is not None
), 'Emulator stub has not been initialized yet.'
self._emulator_stub.sendKey(
emulator_controller_pb2.KeyboardEvent(
codeType=emulator_controller_pb2.KeyboardEvent.KeyCodeType.XKB,
Expand All @@ -445,7 +451,10 @@ def send_key(self, keycode: np.int32, event_type: str) -> None:
@_reconnect_on_grpc_error
def get_screenshot(self) -> np.ndarray:
"""Fetches the latest screenshot from the emulator."""
assert self._emulator_stub, 'Emulator stub has not been initialized yet.'

assert (
self._emulator_stub is not None
), 'Emulator stub has not been initialized yet.'
assert self._image_format, 'ImageFormat has not been initialized yet.'
image_proto = self._emulator_stub.getScreenshot(self._image_format)
h, w = image_proto.format.height, image_proto.format.width
Expand All @@ -461,6 +470,8 @@ def _shutdown_emulator(self):
logging.info('Emulator (%r) is not up.', self.adb_device_name())
return

assert self._launcher is not None, 'Launcher is already down.'

logging.info('Shutting down the emulator (%r)...', self.adb_device_name())
self._emulator_stub.setVmState(
emulator_controller_pb2.VmRunState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,31 @@ def test_logfile_path(self, mock_open, unused_mock_exists):

@mock.patch.object(portpicker, 'is_port_free', return_value=True)
def test_grpc_port(self, unused_mock_portpicker):

launcher_args = {}
simulator = emulator_simulator.EmulatorSimulator(
tmp_dir=absltest.get_default_test_tmpdir(),
emulator_launcher_args={},
emulator_launcher_args=launcher_args,
adb_controller_config=config_classes.AdbControllerConfig(
adb_path='/my/adb',
adb_server_port=5037,
),
)
self.assertEqual(simulator._grpc_port, 8554)
self.assertEqual(launcher_args['grpc_port'], 8554)

@mock.patch.object(portpicker, 'is_port_free', return_value=False)
def test_grpc_port_unavailable(self, unused_mock_portpicker):

launcher_args = {}
simulator = emulator_simulator.EmulatorSimulator(
tmp_dir=absltest.get_default_test_tmpdir(),
emulator_launcher_args={},
emulator_launcher_args=launcher_args,
adb_controller_config=config_classes.AdbControllerConfig(
adb_path='/my/adb',
adb_server_port=5037,
),
)
self.assertNotEqual(simulator._grpc_port, 8554)
self.assertNotEqual(launcher_args['grpc_port'], 8554)

def test_launch_operation_order(self):
"""Makes sure that adb_controller is started before Emulator is launched."""
Expand Down

0 comments on commit e410766

Please sign in to comment.