Experimental bytes support #15

Draft
wants to merge 1 commit into base: master
4 changes: 2 additions & 2 deletions src/ohio/__init__.py
@@ -1,6 +1,6 @@
"""I/O extras"""
from .baseio import (IOClosed, StreamTextIOBase)
from .iterio import IteratorTextIO
from .baseio import (IOClosed, StreamTextIOBase, StreamBufferedIOBase)
from .iterio import IteratorTextIO, IteratorBufferedIO
from .csvio import (csv_text, CsvWriterTextIO, CsvDictWriterTextIO)
from .pipeio import PipeTextIO, pipe_text

55 changes: 47 additions & 8 deletions src/ohio/baseio.py
@@ -12,18 +12,18 @@ def __init__(self, *args):
super().__init__(*args)


class StreamTextIOBase(io.TextIOBase):
class StreamIOBase(object):
"""Readable file-like abstract base class.

Concrete classes may implement method `__next_chunk__` to return
chunks (or all) of the text to be read.

"""
def __init__(self):
self._remainder = ''

def __next_chunk__(self):
raise NotImplementedError("StreamTextIOBase subclasses must implement __next_chunk__")
raise NotImplementedError("StreamIOBase subclasses must implement __next_chunk__")

def _get_empty_value(self):
raise NotImplementedError("StreamIOBase subclasses must implement _get_empty_value")

def readable(self):
if self.closed:
@@ -50,7 +50,7 @@ def read(self, size=None):
if size is not None and size < 0:
size = None

result = ''
result = self._get_empty_value()

while size is None or size > 0:
content = self._read1(size)
@@ -68,10 +68,11 @@ def readline(self):
if self.closed:
raise IOClosed()

result = ''
result = self._get_empty_value()
newline = self._get_newline()

while True:
index = self._remainder.find('\n')
index = self._remainder.find(newline)
if index == -1:
result += self._remainder
try:
@@ -85,3 +86,41 @@ def readline(self):
break

return result


class StreamTextIOBase(StreamIOBase, io.TextIOBase):
"""Readable file-like abstract base class for text.

Concrete classes may implement method `__next_chunk__` to return
chunks (or all) of the text to be read.

"""

def __init__(self):
self._remainder = ''

@staticmethod
def _get_empty_value():
return ''

@staticmethod
def _get_newline():
return '\n'

Member:

Couldn't the interface be condensed?

class StreamIOBase(io.TextIOBase):

    def __init__(self):
        self._remainder = self.constructor()

    # (etc)

class StreamTextIOBase(StreamIOBase):

    constructor = str
    newline = '\n'

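(Purely illustrative, not code from this PR: a toy, self-contained version of that class-attribute pattern, extended with the bytes counterpart this branch is after. The `_StreamBase`, `_TextStream` and `_BytesStream` names are made up for the sketch.)

# Toy sketch only: not ohio's implementation, just the class-attribute
# pattern proposed above, with the bytes counterpart added.
class _StreamBase:
    constructor = None   # subclasses set this to str or bytes
    newline = None

    def __init__(self):
        self._remainder = self.constructor()


class _TextStream(_StreamBase):
    constructor = str
    newline = '\n'


class _BytesStream(_StreamBase):
    constructor = bytes
    newline = b'\n'


assert _TextStream()._remainder == ''
assert _BytesStream()._remainder == b''
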

Member:

It seems like there would be utility in condensing it even further than that, however.

class StreamTextIOBase(io.TextIOBase):

    def __init__(self, buffer_cls=str):
        self._buffer_cls = buffer_cls

        if self._buffer_cls is str:
            self._newline = '\n'
        elif self._buffer_cls is bytes:
            self._newline = b'\n'
        else:
            raise TypeError("StreamTextIOBase supports buffers of type 'str' or 'bytes' not: %r" % self._buffer_cls)

        self._remainder = self._buffer_cls()

    …

Not only would that be less boilerplate here; it would perhaps also be simpler down the line:

class IteratorTextIO(baseio.StreamTextIOBase):

    def __init__(self, iterable, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__iterator__ = iter(iterable)

…

And in your bytes iterator case, just: my_iterator = IteratorTextIO(my_iterable, bytes).



class StreamBufferedIOBase(StreamIOBase, io.BufferedIOBase):

Member:

I might be missing it, but is there a reason for the change in base class? It seems like it could still inherit TextIOBase.


Author:

This is just how Python describes working with the different types of streams. https://docs.python.org/3/library/io.html#text-i-o

I didn't try and test TextIO as a base class, but I didn't see any reason to deviate from the docs if we're working with bytes here.
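For reference (stdlib only, not this PR's code): the split the docs describe is visible in the built-in classes; text streams subclass io.TextIOBase and produce str, while binary streams subclass io.BufferedIOBase and produce bytes.

import io

# Stdlib illustration of the hierarchy split: StringIO is a TextIOBase,
# BytesIO is a BufferedIOBase, and their read() types differ accordingly.
text_stream = io.StringIO('spam\n')
byte_stream = io.BytesIO(b'spam\n')

assert isinstance(text_stream, io.TextIOBase)
assert isinstance(byte_stream, io.BufferedIOBase)
assert isinstance(text_stream.read(), str)
assert isinstance(byte_stream.read(), bytes)
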


Member:

Yep, I reviewed the docs too, just didn't think it mattered. But thanks, I see what you mean now.

I had been meaning to play around with stuff like that, namely giving it a base of, say, IOBase (rather than TextIOBase) – #3. Really, it's just a pass-through, so it doesn't seem like it should care either way (and that's why I like the lazy type-detection version, discussed below).

"""Readable file-like abstract base class for bytes.

Concrete classes may implement method `__next_chunk__` to return
chunks (or all) of the bytes to be read.
"""
def __init__(self):
self._remainder = b''

@staticmethod
def _get_empty_value():
return b''

@staticmethod
def _get_newline():
return b'\n'
11 changes: 9 additions & 2 deletions src/ohio/iterio.py
@@ -1,12 +1,19 @@
from . import baseio


class IteratorTextIO(baseio.StreamTextIOBase):
"""Readable file-like interface for iterable text streams."""
class IteratorIO(baseio.StreamIOBase):
"""Readable file-like interface for iterable streams."""

def __init__(self, iterable):
super().__init__()
self.__iterator__ = iter(iterable)

def __next_chunk__(self):
return next(self.__iterator__)


class IteratorTextIO(IteratorIO, baseio.StreamTextIOBase):
pass

class IteratorBufferedIO(IteratorIO, baseio.StreamBufferedIOBase):
pass
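
A small usage sketch of the new class, assuming this branch as written: any iterable of bytes chunks becomes a readable, binary file-like object, with readline() splitting on b'\n' and keeping the newline.

import ohio  # this branch's ohio, which exports IteratorBufferedIO

chunks = (line.encode('utf-8') for line in ('a,b\r\n', '1,2\r\n'))
buffered = ohio.IteratorBufferedIO(chunks)

assert buffered.readline() == b'a,b\r\n'  # first chunk, newline retained
assert buffered.read() == b'1,2\r\n'      # read() drains the rest
buffered.close()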
5 changes: 5 additions & 0 deletions test/__init__.py
@@ -23,3 +23,8 @@ def ex_csv_stream():
yield '1/4/09 20:11,Product1,1200,Mastercard,Fleur\r\n'
yield '1/2/09 20:09,Product1,1200,Mastercard,adam\r\n'
yield '1/4/09 13:17,Product1,1200,Mastercard,Renee Elisabeth\r\n'


def ex_csv_bytestream():
for csvline in ex_csv_stream():
yield csvline.encode('utf-8')
97 changes: 96 additions & 1 deletion test/iterio_test.py
@@ -5,7 +5,7 @@

import ohio

from . import ex_csv_stream
from . import ex_csv_stream, ex_csv_bytestream


class TestIteratorTextIO:
@@ -101,3 +101,98 @@ def test_write_methods(self, buffer, method_name, method_args):

with pytest.raises(io.UnsupportedOperation):
method(*method_args)


class TestIteratorBufferedIO:

Member:

And since a lot of this is repetition, I would hope that condensing the base classes would mean we could get away with just a couple of extra tests on the original class, to cover just the added branches/cases. (A rough sketch of that follows at the end of this diff.)


@pytest.fixture
def csv_stream(self):
# only necessary to *spy* on iteration (calls to __next__)
iter_mock = unittest.mock.MagicMock(**{
'return_value.__next__.side_effect': ex_csv_bytestream(),
})
return unittest.mock.Mock(__iter__=iter_mock)

@pytest.fixture
def buffer(self, csv_stream):
return ohio.IteratorBufferedIO(csv_stream)

def test_context_manager(self, buffer):
assert not buffer.closed

with buffer as buffer1:
assert buffer is buffer1
assert not buffer.closed

assert buffer.closed

def test_readable(self, buffer):
assert buffer.readable()

def test_readable_closed(self, buffer):
buffer.close()

with pytest.raises(ohio.IOClosed):
buffer.readable()

def test_read(self, buffer):
all_content = b''.join(ex_csv_bytestream())
assert buffer.read() == all_content
assert buffer.__iterator__.__next__.call_count == 11

def test_read_closed(self, buffer):
buffer.close()

with pytest.raises(ohio.IOClosed):
buffer.read()

def test_read_parts(self, buffer):
for (iteration, size, chunk) in (
(1, 5, b'Trans'),
(1, 15, b'action_date,Pro'),
(2, 43, b'duct,Price,Payment_Type,Name\r\n1/2/09 6:17,P'),
):
assert buffer.read(size) == chunk
assert buffer.__iterator__.__next__.call_count == iteration

assert buffer.read(None)
assert buffer.__iterator__.__next__.call_count == 11

def test_readline(self, buffer):
for (count, line) in enumerate(ex_csv_bytestream(), 1):
assert buffer.readline() == line
assert buffer.__iterator__.__next__.call_count == count

def test_readline_closed(self, buffer):
buffer.close()

with pytest.raises(ohio.IOClosed):
buffer.readline()

def test_readlines(self, buffer):
assert buffer.readlines() == list(ex_csv_bytestream())
assert buffer.__iterator__.__next__.call_count == 11

def test_iter(self, buffer):
for (count, (buffer_line, example_line)) in enumerate(zip(buffer, ex_csv_bytestream()), 1):
assert buffer_line == example_line
assert buffer.__iterator__.__next__.call_count == count

def test_not_seekable(self, buffer):
assert not buffer.seekable()

def test_not_writable(self, buffer):
assert not buffer.writable()

@pytest.mark.parametrize('method_name,method_args', (
('seek', ()),
('tell', ()),
('truncate', ()),
('write', ()),
('writelines', (['hi\n'],)),
))
def test_write_methods(self, buffer, method_name, method_args):
method = getattr(buffer, method_name)

with pytest.raises(io.UnsupportedOperation):
method(*method_args)
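
Regarding the repetition noted above, a rough sketch of one way to consolidate the two test classes. This is hypothetical: it parametrizes over the text and bytes implementations, omits the mock-based call-count spying used above for brevity, and the implementation/buffer fixture names are invented here.

import pytest

import ohio

from . import ex_csv_stream, ex_csv_bytestream


@pytest.fixture(params=[
    (ohio.IteratorTextIO, ex_csv_stream),
    (ohio.IteratorBufferedIO, ex_csv_bytestream),
])
def implementation(request):
    return request.param


@pytest.fixture
def buffer(implementation):
    (io_cls, make_stream) = implementation
    return io_cls(make_stream())


def test_read(implementation, buffer):
    (_io_cls, make_stream) = implementation
    empty = next(make_stream())[:0]  # '' or b'', as appropriate
    assert buffer.read() == empty.join(make_stream())


def test_readline(implementation, buffer):
    (_io_cls, make_stream) = implementation
    for line in make_stream():
        assert buffer.readline() == line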