added buffered reading to tokenizer #46

Open · wants to merge 6 commits into base: master
25 changes: 18 additions & 7 deletions src/json_stream/httpx/__init__.py
@@ -1,5 +1,4 @@
 import json_stream
-from json_stream.select_tokenizer import default_tokenizer


 CONTENT_CHUNK_SIZE = 10 * 1024
@@ -9,9 +8,21 @@ def _to_iterable(response, chunk_size):
     return response.iter_bytes(chunk_size=chunk_size)


-def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)
-
-
-def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
+def load(response, persistent=False, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE, buffering=0, **kwargs):
+    return json_stream.load(
+        _to_iterable(response, chunk_size),
+        persistent=persistent,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
+
+
+def visit(response, visitor, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE, buffering=0, **kwargs):
+    return json_stream.visit(
+        _to_iterable(response, chunk_size),
+        visitor,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
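
For context: the httpx wrapper keeps buffering=0 as its default, presumably because the response is already pulled in chunk_size-byte pieces, so no extra read buffering is layered on top. A minimal usage sketch, assuming an httpx streaming response; the URL and the top-level "items" key are illustrative, not part of this diff:

```python
import httpx
import json_stream.httpx

# Stream a response and parse it lazily through the updated wrapper.
with httpx.stream("GET", "https://example.com/data.json") as response:
    data = json_stream.httpx.load(response, buffering=0)
    for item in data["items"]:   # "items" is an assumed key
        print(item)
```
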
8 changes: 3 additions & 5 deletions src/json_stream/loader.py
@@ -1,11 +1,9 @@
 from json_stream.base import StreamingJSONBase, TokenType
-from json_stream.iterators import ensure_file
-from json_stream.select_tokenizer import default_tokenizer
+from json_stream.select_tokenizer import get_token_stream


-def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer):
-    fp = ensure_file(fp_or_iterable)
-    token_stream = tokenizer(fp)
+def load(fp_or_iterable, persistent=False, tokenizer=None, buffering=-1, **kwargs):
+    token_stream = get_token_stream(fp_or_iterable, tokenizer=tokenizer, buffering=buffering, **kwargs)
     token_type, token = next(token_stream)
     if token_type == TokenType.OPERATOR:
         return StreamingJSONBase.factory(token, token_stream, persistent)
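
The core loader now defaults to buffering=-1. Going by the new tokenize() signature further down in this diff, 0 keeps the old one-character reads, a negative value selects io.DEFAULT_BUFFER_SIZE, and a positive value is the number of characters fetched per read. A small sketch; the file name and key are assumptions:

```python
import json_stream

with open("large.json", "r") as f:
    data = json_stream.load(f, buffering=-1)   # negative: io.DEFAULT_BUFFER_SIZE-sized reads
    print(data["name"])                        # "name" is an assumed key
```
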
27 changes: 20 additions & 7 deletions src/json_stream/requests/__init__.py
@@ -1,5 +1,4 @@
 import json_stream
-from json_stream.select_tokenizer import default_tokenizer


 CONTENT_CHUNK_SIZE = 10 * 1024
@@ -9,9 +8,23 @@ def _to_iterable(response, chunk_size):
     return response.iter_content(chunk_size=chunk_size)


-def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)
-
-
-def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
+def load(response, persistent=False, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE,
+         buffering=0, **kwargs):
+    return json_stream.load(
+        _to_iterable(response, chunk_size),
+        persistent=persistent,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
+
+
+def visit(response, visitor, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE,
+          buffering=0, **kwargs):
+    return json_stream.visit(
+        _to_iterable(response, chunk_size),
+        visitor,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
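
The requests wrapper mirrors the httpx one. A sketch of the visitor-style entry point; the URL and visitor body are illustrative assumptions:

```python
import requests
import json_stream.requests

def visitor(value, path):
    # path is the tuple of keys/indices leading to this leaf value
    print(path, value)

with requests.get("https://example.com/data.json", stream=True) as response:
    json_stream.requests.visit(response, visitor, buffering=0)
```
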
20 changes: 14 additions & 6 deletions src/json_stream/select_tokenizer.py
@@ -1,12 +1,20 @@
 from warnings import warn

+from json_stream.iterators import ensure_file
 from json_stream.tokenizer import tokenize
 from json_stream_rs_tokenizer import rust_tokenizer_or_raise, ExtensionException

-try:
-    default_tokenizer = rust_tokenizer_or_raise()
-except ExtensionException as e:
-    warn(str(e), category=ImportWarning)  # ImportWarnings are ignored by default
-    default_tokenizer = tokenize

-__all__ = ['default_tokenizer']
+def get_tokenizer(**kwargs):
+    try:
+        return rust_tokenizer_or_raise(**kwargs)
+    except ExtensionException as e:
+        warn(str(e), category=ImportWarning)  # ImportWarnings are ignored by default
+        return tokenize
+
+
+def get_token_stream(fp_or_iterable, tokenizer, **tokenizer_kwargs):
+    fp = ensure_file(fp_or_iterable)
+    if tokenizer is None:
+        tokenizer = get_tokenizer(**tokenizer_kwargs)
+    return tokenizer(fp, **tokenizer_kwargs)
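
When tokenizer is None, get_token_stream() falls back to get_tokenizer(), which prefers the Rust extension and degrades to the pure-Python tokenize() with an ImportWarning. A sketch of pinning the pure-Python tokenizer explicitly; the file name and buffer size are assumptions:

```python
from json_stream import load
from json_stream.tokenizer import tokenize

with open("data.json", "r") as f:
    data = load(f, tokenizer=tokenize, buffering=4096)   # 4096-character reads
    for key in data:   # iterating a streaming object yields its keys
        print(key)
```
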
5 changes: 2 additions & 3 deletions src/json_stream/tests/__init__.py
@@ -3,14 +3,13 @@
 from itertools import zip_longest
 from unittest import TestCase

-from json_stream.select_tokenizer import default_tokenizer

 from json_stream import load
 from json_stream.base import TransientAccessException


 class JSONLoadTestCase(TestCase):
-    def _test_object(self, obj, persistent, binary=False, tokenizer=default_tokenizer):
+    def _test_object(self, obj, persistent, binary=False, tokenizer=None):
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer)), list(obj))
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer).keys()), list(obj.keys()))
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer).values()), list(obj.values()))
@@ -40,7 +39,7 @@ def _test_object(self, obj, persistent, binary=False, tokenizer=default_tokenizer):
         with self.assertRaises(TransientAccessException):
             data.items()  # can't get keys

-    def _test_list(self, obj, persistent, binary=False, tokenizer=default_tokenizer):
+    def _test_list(self, obj, persistent, binary=False, tokenizer=None):
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer)), list(obj))
         if persistent:
             self.assertEqual(len(self._to_data(obj, persistent, binary, tokenizer)), len(obj))
2 changes: 1 addition & 1 deletion src/json_stream/tests/test_buffering.py
@@ -15,7 +15,7 @@ def data_in_chunks(data, chunk_size=15):
                 yield part

         json_string = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'
-        stream = json_stream.load(data_in_chunks(json_string))
+        stream = json_stream.load(data_in_chunks(json_string), buffering=0)

         for task in stream["tasks"]:
             happenings.append(('item', to_standard_types(task)))
6 changes: 3 additions & 3 deletions src/json_stream/tests/test_tokenizer_integration.py
@@ -5,20 +5,20 @@

 from json_stream import load

-from json_stream.select_tokenizer import default_tokenizer
+from json_stream.select_tokenizer import get_tokenizer

 from json_stream.tests import JSONLoadTestCase


 @skipUnless(hasattr(json_stream_rs_tokenizer, 'RustTokenizer'), 'rust tokenizer not available')
 class TestRSTokenizer(JSONLoadTestCase):
     def test_load_object(self):
-        self.assertIs(default_tokenizer, json_stream_rs_tokenizer.RustTokenizer)
+        self.assertIs(get_tokenizer(), json_stream_rs_tokenizer.RustTokenizer)
         obj = {"a": 1, "b": None, "c": True}
         self._test_object(obj, persistent=False)

     def test_load_object_binary(self):
-        self.assertIs(default_tokenizer, json_stream_rs_tokenizer.RustTokenizer)
+        self.assertIs(get_tokenizer(), json_stream_rs_tokenizer.RustTokenizer)
         obj = {"a": 1, "b": None, "c": True}
         self._test_object(obj, persistent=False, binary=True)

21 changes: 14 additions & 7 deletions src/json_stream/tokenizer.py
@@ -78,7 +78,7 @@ def _ensure_text(stream):
     return stream


-def tokenize(stream):
+def tokenize(stream, *, buffering=-1, **_):
     stream = _ensure_text(stream)

     def is_delimiter(char):
@@ -365,9 +365,19 @@ def process_char(char):

         return advance, next_state
     state = State.WHITESPACE
-    c = stream.read(1)
-    index = 0
-    while c:
+    if not buffering:
+        buffering = 1
+    elif buffering <= 0:
+        buffering = io.DEFAULT_BUFFER_SIZE
+    buffering = buffering.__index__()
+    buffer = stream.read(buffering)
+    c = None
+    index = -1
+    advance = True
+    while buffer or not advance:
+        if advance:
+            c, buffer = buffer[0], buffer[1:] or stream.read(buffering)
+            index += 1
         try:
             advance, state = process_char(c)
         except ValueError as e:
@@ -376,9 +386,6 @@ def process_char(char):
             completed = False
             token = []
             yield now_token
-        if advance:
-            c = stream.read(1)
-            index += 1
     process_char(SpecialChar.EOF)
     if completed:
         yield now_token
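
The loop above replaces per-character read() calls with reads of up to `buffering` characters, consuming the buffer one character at a time and refilling it only when it runs dry; the `advance` flag lets the same character be re-processed without another read. A standalone sketch of just that read pattern, with illustrative names rather than the tokenizer's own:

```python
import io

def chars(stream, buffering=-1):
    if not buffering:               # 0 keeps the old single-character reads
        buffering = 1
    elif buffering < 0:             # negative selects the platform default
        buffering = io.DEFAULT_BUFFER_SIZE
    buffer = stream.read(buffering)
    while buffer:
        yield buffer[0]
        buffer = buffer[1:] or stream.read(buffering)   # refill only when empty

print(list(chars(io.StringIO("abc"), buffering=2)))     # ['a', 'b', 'c']
```
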
8 changes: 3 additions & 5 deletions src/json_stream/visitor.py
@@ -1,6 +1,5 @@
 from json_stream.base import StreamingJSONObject, StreamingJSONList, StreamingJSONBase
-from json_stream.iterators import ensure_file
-from json_stream.select_tokenizer import default_tokenizer
+from json_stream.select_tokenizer import get_token_stream


 def _visit(obj, visitor, path):
@@ -19,9 +18,8 @@ def _visit(obj, visitor, path):
         visitor(obj, path)


-def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer):
-    fp = ensure_file(fp_or_iterator)
-    token_stream = tokenizer(fp)
+def visit(fp_or_iterable, visitor, tokenizer=None, buffering=-1, **kwargs):
+    token_stream = get_token_stream(fp_or_iterable, tokenizer=tokenizer, buffering=buffering, **kwargs)
     _, token = next(token_stream)
     obj = StreamingJSONBase.factory(token, token_stream, persistent=False)
     _visit(obj, visitor, ())
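
visit() also accepts a plain iterable of byte chunks, which get_token_stream() wraps via ensure_file() before tokenizing. A sketch with illustrative chunks and buffer size:

```python
import json_stream

chunks = [b'{"a": ', b'[1, 2, ', b'3]}']

def visitor(value, path):
    print(path, value)   # e.g. ('a', 0) 1

json_stream.visit(chunks, visitor, buffering=16)
```
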