Skip to content
This repository has been archived by the owner on Jan 10, 2023. It is now read-only.

Workaroud for UnicodeDecodeError #171

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions adb/adb_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

import io
import os
import socket
import posixpath
import socket

from adb import adb_protocol
from adb import common
Expand Down Expand Up @@ -371,9 +371,12 @@ def Shell(self, command, timeout_ms=None):
command: Shell command to run
timeout_ms: Maximum time to allow the command to run.
"""
return self.protocol_handler.Command(
if not isinstance(command, bytes):
command = command.encode('utf-8')
data = self.protocol_handler.BytesCommand(
self._handle, service=b'shell', command=command,
timeout_ms=timeout_ms)
return data.decode('utf-8')

def StreamingShell(self, command, timeout_ms=None):
"""Run command on the device, yielding each line of output.
Expand All @@ -389,6 +392,20 @@ def StreamingShell(self, command, timeout_ms=None):
self._handle, service=b'shell', command=command,
timeout_ms=timeout_ms)

def BytesStreamingShell(self, command, timeout_ms=None):
"""Run command on the device, yielding each line of output.

Args:
command: Command to run on the target.
timeout_ms: Maximum time to allow the command to run.

Yields:
The responses from the shell command.
"""
return self.protocol_handler.BytesStreamingCommand(
self._handle, service=b'shell', command=command,
timeout_ms=timeout_ms)

def Logcat(self, options, timeout_ms=None):
"""Run 'shell logcat' and stream the output to stdout.

Expand All @@ -398,6 +415,17 @@ def Logcat(self, options, timeout_ms=None):
"""
return self.StreamingShell('logcat %s' % options, timeout_ms)

def NonStreamingLogcat(self, options, timeout_ms=None):
"""Run 'shell logcat' and stream the output to stdout.

Args:
options: Arguments to pass to 'logcat'.
timeout_ms: Maximum time to allow the command to run.
"""
command = ('logcat %s' % options).encode('utf-8')
data = b''.join(self.BytesStreamingShell(command, timeout_ms))
return data.decode('utf-8')

def InteractiveShell(self, cmd=None, strip_cmd=True, delim=None, strip_delim=True):
"""Get stdout from the currently open interactive shell and optionally run a command
on the device, returning all output.
Expand Down
56 changes: 55 additions & 1 deletion adb/adb_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import struct
import time
import warnings
from io import BytesIO

from adb import usb_exceptions

# Maximum amount of data in an ADB packet.
Expand Down Expand Up @@ -431,13 +433,65 @@ def StreamingCommand(cls, usb, service, command='', timeout_ms=None):
Yields:
The responses from the service.
"""
warnings.warn(DeprecationWarning("This method may fail with utf-8 multi-bytes sequences"))
if not isinstance(command, bytes):
command = command.encode('utf8')
connection = cls.Open(
usb, destination=b'%s:%s' % (service, command),
timeout_ms=timeout_ms)
for data in connection.ReadUntilClose():
yield data.decode('utf8')
# TODO: find a fix for this code without changing public API
yield data.decode('utf-8')

@classmethod
def BytesCommand(cls, usb, service, command=b'', timeout_ms=None):
"""One complete set of USB packets for a single command.

Sends service:command in a new connection, reading the data for the
response. All the data is held in memory, large responses will be slow and
can fill up memory.

Args:
usb: USB device handle with BulkRead and BulkWrite methods.
service: The service on the device to talk to.
command: The command to send to the service.
timeout_ms: Timeout for USB packets, in milliseconds.

Raises:
InterleavedDataError: Multiple streams running over usb.
InvalidCommandError: Got an unexpected response command.

Returns:
The response from the service.
"""
return b''.join(cls.BytesStreamingCommand(usb, service, command, timeout_ms))

@classmethod
def BytesStreamingCommand(cls, usb, service, command=b'', timeout_ms=None):
"""One complete set of USB packets for a single command.

Sends service:command in a new connection, reading the data for the
response. All the data is held in memory, large responses will be slow and
can fill up memory.

Args:
usb: USB device handle with BulkRead and BulkWrite methods.
service: The service on the device to talk to.
command: The command to send to the service.
timeout_ms: Timeout for USB packets, in milliseconds.

Raises:
InterleavedDataError: Multiple streams running over usb.
InvalidCommandError: Got an unexpected response command.

Returns:
The response from the service.
"""
connection = cls.Open(
usb, destination=b'%s:%s' % (service, command),
timeout_ms=timeout_ms)
for data in connection.ReadUntilClose():
yield data

@classmethod
def InteractiveShellCommand(cls, conn, cmd=None, strip_cmd=True, delim=None, strip_delim=True, clean_stdout=True):
Expand Down
74 changes: 66 additions & 8 deletions test/adb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@
# limitations under the License.
"""Tests for adb."""

from io import BytesIO
import struct
import unittest
from mock import mock
from io import BytesIO
from unittest import skip

import common_stub
from mock import mock

from adb import common
from adb import adb_commands
from adb import adb_protocol
from adb.usb_exceptions import TcpTimeoutException, DeviceNotFoundError
import common_stub

from adb import common
from adb.usb_exceptions import TcpTimeoutException

BANNER = b'blazetest'
LOCAL_ID = 1
Expand Down Expand Up @@ -140,7 +140,7 @@ def testUninstall(self):

def testStreamingResponseShell(self):
command = b'keepin it real big'
# expect multiple lines
# expect multiple bulks

responses = ['other stuff, ', 'and some words.']

Expand All @@ -149,11 +149,69 @@ def testStreamingResponseShell(self):
dev = adb_commands.AdbCommands()
dev.ConnectDevice(handle=usb, banner=BANNER)
response_count = 0
for (expected,actual) in zip(responses, dev.StreamingShell(command)):
for (expected, actual) in zip(responses, dev.StreamingShell(command)):
self.assertEqual(expected, actual)
response_count = response_count + 1
self.assertEqual(len(responses), response_count)

@skip('Need fixing')
def testStreamingResponseShellWithMultiBytesSequences(self):
command = b'keepin it real big'
# expect multiple bulks

responses = [b'\xe2', b'\x81\x82'] # utf-8 encoded split Hiragana A (U+3042)

usb = self._ExpectCommand(b'shell', command, *responses)

dev = adb_commands.AdbCommands()
dev.ConnectDevice(handle=usb, banner=BANNER)

dev.StreamingShell(command)

def testShellWithMultiBytesSequences(self):
command = b'keepin it real big'
# expect multiple bulks

responses = [b'\xe3', b'\x81\x82'] # utf-8 encoded split Hiragana A (U+3042)

usb = self._ExpectCommand(b'shell', command, *responses)

dev = adb_commands.AdbCommands()
dev.ConnectDevice(handle=usb, banner=BANNER)

res = dev.Shell(command)
self.assertEqual(u'\u3042', res)

def testBytesStreamingResponseShell(self):
command = b'keepin it real big'
# expect multiple bulks

responses = [b'other stuff, ', b'and some words.']

usb = self._ExpectCommand(b'shell', command, *responses)

dev = adb_commands.AdbCommands()
dev.ConnectDevice(handle=usb, banner=BANNER)
response_count = 0
for (expected, actual) in zip(responses, dev.BytesStreamingShell(command)):
self.assertEqual(expected, actual)
response_count = response_count + 1
self.assertEqual(len(responses), response_count)

def testNonStreamingLogcatWithMultiBytesSequences(self):
command = b'logcat test\xe3\x81\x82'
# expect multiple bulks

responses = [b'\xe3', b'\x81\x82'] # utf-8 encoded split Hiragana A (U+3042)

usb = self._ExpectCommand(b'shell', command, *responses)

dev = adb_commands.AdbCommands()
dev.ConnectDevice(handle=usb, banner=BANNER)

res = dev.NonStreamingLogcat(u'test\u3042')
self.assertEqual(u'\u3042', res)

def testReboot(self):
usb = self._ExpectCommand(b'reboot', b'', b'')
dev = adb_commands.AdbCommands()
Expand Down