Skip to content

DONT MERGE YET #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions lib/src/codecs/combiner.dart
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
import 'dart:convert';

/// A [Converter] that encodes multiple objects to a stream.
///
/// This is the counterpart to [Splitter] and like that class and its [split]
/// method, the crucial piece here is the [encode] method, and everything
/// else is boilerplate.
abstract class Combiner<S, T> extends Converter<S, T> {
/// Subclasses override to transform an object before it is added to the
/// output stream.
T encode(S input);
/// A [Converter] which when used to transform a Stream will transform each item with convert.
abstract class StreamMappingConverter<S, T> extends Converter<S, T> {
StreamMappingConverter();

factory StreamMappingConverter.from(T f(S s)) {
return StreamFnMappingConverter(f);
}

@override
convert(input) => encode(input);
startChunkedConversion(sink) => _MappingSink(sink, convert);
}

class StreamFnMappingConverter<S, T> extends StreamMappingConverter<S, T> {
final T Function(S) _f;

StreamFnMappingConverter(this._f);

@override
startChunkedConversion(sink) => _CombinerSink(sink, encode);
convert(x) => _f(x);
}

class _CombinerSink<S, T> extends ChunkedConversionSink<S> {
class _MappingSink<S, T> implements Sink<S> {
final Sink<T> _sink;
final T Function(S chunk) _serialize;
final T Function(S) _f;

_CombinerSink(this._sink, this._serialize);
_MappingSink(this._sink, this._f);

@override
void add(chunk) {
_sink.add(_serialize(chunk));
void add(x) {
_sink.add(_f(x));
}

@override
Expand Down
143 changes: 88 additions & 55 deletions lib/src/codecs/json.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import 'dart:async';
import 'dart:convert';

import 'combiner.dart';
import 'splitter.dart';

/// A [Converter] that splits a [String] into separate JSON forms, emitting a
/// parsed JSON object for each form.
Expand All @@ -22,81 +21,115 @@ import 'splitter.dart';
/// Likewise `[1,2}` is considered a valid form. Of course neither of those
/// examples, nor any other malformed JSON, will survive the call to
/// [jsonDecode()].
class JsonRepeatDecoder extends Splitter<String, dynamic> {

class JsonFramingDecoder extends Converter<String, String> {
final bool _strict;
final StringBuffer _buffer = StringBuffer();
JsonFramingDecoder(this._strict);

@override
String convert(String input) {
return input;
}

@override
Sink<String> startChunkedConversion(Sink<String> sink) {
return _JsonFramingSink(sink, _strict);
}
}

class JsonRepeatDecoder extends Converter<String, dynamic> {
final Converter<String, dynamic> _converter;

JsonRepeatDecoder({bool strict = false})
: _converter = JsonFramingDecoder(strict)
.fuse(StreamMappingConverter.from(JsonDecoder().convert));

@override
dynamic convert(String input) => _converter.convert(input);

JsonRepeatDecoder({bool strict = false}) : _strict = strict;
@override
Sink<String> startChunkedConversion(Sink<dynamic> sink) {
return _converter.startChunkedConversion(sink);
}
}

class _JsonFramingSink implements Sink<String> {
static const int _doubleQuote = 34;
static const int _leftBrace = 123;
static const int _rightBrace = 125;
static const int _leftBracket = 91;
static const int _rightBracket = 93;
static const int _backslash = 92;

@override
Stream split(stream) async* {
var skipEscape = false;
var quoted = false;
var stackDepth = 0;
final bool _strict;
final StringBuffer _buffer = StringBuffer();
var _skipEscape = false;
var _quoted = false;
var _stackDepth = 0;
final Sink<String> _sink;

_JsonFramingSink(this._sink, this._strict);

await for (final s in stream) {
var sliceStart = 0;
for (var i = 0; i < s.length; i++) {
var char = s.codeUnitAt(i);
// If prev char was escape, then skip this one.
if (skipEscape) {
skipEscape = false;
@override
void add(String s) {
var sliceStart = 0;
for (var i = 0; i < s.length; i++) {
var char = s.codeUnitAt(i);
// If prev char was escape, then skip this one.
if (_skipEscape) {
_skipEscape = false;
continue;
}
// Check for interesting chars.
switch (char) {
case _doubleQuote:
_quoted = !_quoted;
continue;
}
// Check for interesting chars.
switch (char) {
case _doubleQuote:
quoted = !quoted;
continue;
case _leftBrace:
case _leftBracket:
if (!quoted && 0 == stackDepth++ && !_strict) {
sliceStart = i;
_buffer.clear();
}
continue;
case _rightBrace:
case _rightBracket:
if (quoted) {
continue;
} else if (--stackDepth > 0) {
continue;
} else {
break;
}
case _backslash:
skipEscape = true;
case _leftBrace:
case _leftBracket:
if (!_quoted && 0 == _stackDepth++ && !_strict) {
// when lenient (not strict), discard all characters before the top-level dict/array
sliceStart = i;
_buffer.clear();
}
continue;
case _rightBrace:
case _rightBracket:
if (_quoted) {
continue;
default:
} else if (--_stackDepth > 0) {
continue;
}
// Reached the end of a JSON form.
var slice = s.substring(sliceStart, i + 1);
_buffer.write(slice);
yield jsonDecode(_buffer.toString());
_buffer.clear();
sliceStart = i + 1;
}
// If didn't exhaust the full string, hold the tail in the buffer.
if (sliceStart < s.length) {
_buffer.write(s.substring(sliceStart));
} else {
break;
}
case _backslash:
_skipEscape = true;
continue;
default:
continue;
}
// Reached the end of a JSON form.
var slice = s.substring(sliceStart, i + 1);
_buffer.write(slice);
sliceStart = i + 1;
String read = _buffer.toString();
_buffer.clear();
_sink.add(read);
}
_buffer.write(s.substring(sliceStart));
}

@override
void close() {
_buffer.clear();
_sink.close();
}
}

/// A [Converter] that encodes multiple JSON objects to strings.
///
/// The Dart-provided [JsonEncoder] closes its underlying stream after encoding
/// JSON object, which is annoying. This converter stays open for business.
class JsonRepeatEncoder extends Combiner<dynamic, String> {
@override
encode(input) => jsonEncode(input);
class JsonRepeatEncoder extends StreamFnMappingConverter<dynamic, String> {
JsonRepeatEncoder() : super(JsonEncoder().convert);
}
Loading