- Release history — what was added and when.
- Recipes — learn tricky parts of streams.
stream-json is a micro-library, which provides a set of light-weight stream components to process huge JSON files with a minimal memory footprint. It can:
- Parse JSON files far exceeding available memory.
- Even individual primitive data items (keys, strings, and numbers) can be streamed piece-wise.
- Processing humongous files can take minutes or even hours. Shaving even a microsecond from each operation can save a lot of time waiting for results. That's why all stream-json components were meticulously optimized. See Performance for hints on speeding up pipelines.
- Stream using a SAX-inspired event-based API.
- Provide utilities to handle huge Django-like JSON database dumps.
- Support JSON Streaming protocol.
- Follow the conventions of stream-chain, a no-dependency micro-library.
It was meant to be a set of building blocks for data processing pipelines organized around JSON and JavaScript objects. Users can easily create their own "blocks" using provided facilities.
Companion projects:
- stream-csv-as-json streams huge CSV files in a format compatible with stream-json: rows as arrays of string values. If a header row is used, it can stream rows as objects with named fields.
- stream-chain is a micro helper to create data processing pipelines from streams, functions, generators, and asynchronous functions, and to present them as streams suitable to be building blocks for other pipelines.
- stream-join is a micro helper useful for merging side channels to connect different parts of a linear pipeline for more complex data processing scenarios.
This is an overview, which can be used as a cheat sheet. Click on individual components to see detailed API documentation with examples.
The main module returns a factory function, which produces instances of Parser decorated with emit().
The heart of the package is Parser — a streaming JSON parser, which consumes text and produces a stream of tokens. Both the standard JSON and JSON Streaming are supported.
const fs = require('fs');
const {parser} = require('stream-json');

const pipeline = fs.createReadStream('data.json').pipe(parser());
Sometimes we are presented with incorrect JSON input. When that happens, we can use the Verifier utility, which provides the exact position of an error.
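A minimal sketch of verifying a file before processing it (assuming Verifier is exposed at 'stream-json/utils/Verifier' with the usual lowercase factory; 'data.json' is just a placeholder name):
const fs = require('fs');
const {verifier} = require('stream-json/utils/Verifier'); // assumed location of Verifier

// Verifier is a Writable stream: it consumes text and fails on invalid JSON
const check = fs.createReadStream('data.json').pipe(verifier());
check.on('error', err => console.error('Invalid JSON:', err.message));
check.on('finish', () => console.log('data.json is valid JSON'));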
Filters can edit a stream of tokens on the fly. The following filters are provided:
- Pick picks out a subobject for processing ignoring the rest.
{total: 10000000, data: [...]}
// Pick can isolate 'data' and remove the outer object completely: [...]
const {pick} = require('stream-json/filters/Pick');
const picked = pipeline.pipe(pick({filter: 'data'}));
- Pick can be used to select one or more subobjects and stream them out as individual objects.
- If Pick picks more than one subobject, it is usually followed by StreamValues, described below.
- Replace replaces subobjects with something else or even removes them completely. It is used to remove unnecessary details, e.g., for performance reasons.
[{data: {...}, extra: {...}}, {data: {...}, extra: {...}}, ...]
// Replace can remove 'extra' or replace it with something else,
// like null (the default): [{data: {...}, extra: null}, ...]
const {replace} = require('stream-json/filters/Replace');
const replaced = pipeline.pipe(replace({filter: /^\d+\.extra\b/}));
- Ignore removes subobjects completely. It is a helper class based on Replace.
[{data: {...}, extra: {...}}, ...]
// Ignore can remove 'extra': [{data: {...}}, ...]
const {ignore} = require('stream-json/filters/Ignore');
const ignored = pipeline.pipe(ignore({filter: /^\d+\.extra\b/}));
- Filter filters out subobjects, preserving the original shape of the incoming data.
{total: 10000000, data: [...]}
// Filter can isolate 'data' preserving the original shape {data: [...]}
const {filter} = require('stream-json/filters/Filter');
const filtered = pipeline.pipe(filter({filter: /^data\b/}));
Filters are used after Parser and can be chained to achieve the desired effect.
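As an illustration of chaining, the sketch below isolates a subobject with Pick and then drops some of its fields with Ignore (the file name and filter expressions here are made up for this example):
const fs = require('fs');
const {chain} = require('stream-chain');
const {parser} = require('stream-json');
const {pick} = require('stream-json/filters/Pick');
const {ignore} = require('stream-json/filters/Ignore');

// a token stream for the 'data' subobject, with its 'extra' properties removed
const tokens = chain([
  fs.createReadStream('data.json'),
  parser(),
  pick({filter: 'data'}),
  ignore({filter: /\bextra\b/})
]);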
In many cases working at a token level can be tedious. Frequently, while a source file is huge, individual data pieces are relatively small and can fit in memory. A typical example is a database dump.
Additionally, all streamers support efficient and flexible filtering of data items, complementing and sometimes eliminating the need for token-level filters. See the objectFilter option in StreamBase for more details.
stream-json provides the following streaming helpers:
- StreamValues assumes that a token stream represents subsequent values and streams them out one by one. This usually happens in two major cases:
- We parse a JSON Streaming source:
1 "a" [] {} true // StreamValues will produce an object stream: {key: 0, value: 1} {key: 1, value: 'a'} {key: 2, value: []} {key: 3, value: {}} {key: 4, value: true}
const {streamValues} = require('stream-json/streamers/StreamValues'); const stream = pipeline.pipe(streamValues());
- We Pick several subobjects:
[{..., value: ...}, {..., value: ...}, ...]
// Pick can isolate 'value' and stream them separately: [...]
const stream = pipeline
  .pipe(pick({filter: /\bvalue\b/i}))
  .pipe(streamValues());
- StreamArray assumes that a token stream represents an array of objects and streams out assembled JavaScript objects. Only one array is valid as input.
[1, "a", [], {}, true] // StreamArray will produce an object stream: {key: 0, value: 1} {key: 1, value: 'a'} {key: 2, value: []} {key: 3, value: {}} {key: 4, value: true}
const {streamArray} = require('stream-json/streamers/StreamArray'); const stream = pipeline.pipe(streamArray());
- StreamObject assumes that a token stream represents an object and streams out its top-level properties. Only one object is valid as input.
{"a": 1, "b": "a", "c": [], "d": {}, "e": true} // StreamObject will produce an object stream: {key: 'a', value: 1} {key: 'b', value: 'a'} {key: 'c', value: []} {key: 'd', value: {}} {key: 'e', value: true}
const {streamObject} = require('stream-json/streamers/StreamObject'); const stream = pipeline.pipe(streamObject());
Streamers are used after Parser and optional filters. All of them support efficient filtering of objects while assembling: if it is determined that we have no interest in a certain object, it is abandoned and skipped without spending any more time on it.
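For instance, the objectFilter option mentioned earlier can be passed to a streamer. The sketch below is only an illustration (the 'active' field is invented, pipeline and streamArray come from the earlier examples, and the exact callback semantics are documented in StreamBase): the filter inspects the partially assembled object and accepts or rejects it as soon as it can decide.
const {streamArray} = require('stream-json/streamers/StreamArray');

// objectFilter receives the streamer's Assembler; returning true accepts the object,
// false rejects it, and undefined means "not decided yet, keep assembling"
const stream = pipeline.pipe(streamArray({
  objectFilter: asm => {
    const value = asm.current;
    if (value && typeof value.active === 'boolean') return value.active;
    // otherwise stay undecided until more tokens arrive
  }
}));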
Classes and functions to make streaming data processing enjoyable:
- Assembler receives a token stream and assembles JavaScript objects. It is used as a building block for streamers.
const {chain} = require('stream-chain');
const Asm = require('stream-json/Assembler');

const pipeline = chain([
  fs.createReadStream('data.json.gz'),
  zlib.createGunzip(),
  parser()
]);

const asm = Asm.connectTo(pipeline);
asm.on('done', asm => console.log(asm.current));
- Disassembler is a Transform stream. It receives a stream of JavaScript objects and converts them to a token stream. It is useful to edit objects using stream algorithms and with an alternative source of objects.
const {disassembler} = require('stream-json/Disassembler');

const pipeline = chain([
  fs.createReadStream('array.json.gz'),
  zlib.createGunzip(),
  parser(),
  streamArray(),
  disassembler(),
  pick({filter: 'value'}),
  streamValues()
]);
- Stringer is a Transform stream. It receives a token stream and converts it to text representing a JSON object. It is very useful when you want to edit a stream with filters and custom code, and save it back to a file.
const {stringer} = require('stream-json/Stringer');

chain([
  fs.createReadStream('data.json.gz'),
  zlib.createGunzip(),
  parser(),
  pick({filter: 'data'}),
  stringer(),
  zlib.createGzip(),
  fs.createWriteStream('edited.json.gz')
]);
- Emitter is a Writable stream. It consumes a token stream and emits tokens as events on itself.
const {emitter} = require('stream-json/Emitter');

const e = emitter();

chain([
  fs.createReadStream('data.json'),
  parser(),
  e
]);

let counter = 0;
e.on('startObject', () => ++counter);
e.on('finish', () => console.log(counter, 'objects'));
The following functions are included:
- emit() listens to a token stream and emits tokens as events on that stream. This is a light-weight version of Emitter. When the main module is requested, it returns a function, which creates a Parser instance, then it applies emit() so the user can use this simple API for immediate processing.
const emit = require('stream-json/utils/emit');

const pipeline = chain([
  fs.createReadStream('data.json'),
  parser()
]);
emit(pipeline);

let counter = 0;
pipeline.on('startObject', () => ++counter);
pipeline.on('finish', () => console.log(counter, 'objects'));
- withParser() creates an instance of Parser, creates an instance of a data stream with a provided function, connects them, and returns them as a chain.
const withParser = require('stream-json/utils/withParser');
const pipeline = withParser(pick, {filter: 'data'});
Each stream provided by stream-json implements withParser(options) as a static method:
const StreamArray = require('stream-json/streamers/StreamArray');
const pipeline = StreamArray.withParser();
- Batch accepts items and packs them into an array of a configurable size. It can be used as a performance optimization helper (see the sketch after this list).
- Verifier reads a text input and either completes successfully or fails with an error, which points to the offset, line, and position of the problem. It is a Writable stream.
- Utf8Stream reads buffers of text, which can be cut off in the middle of a multi-byte utf8 symbol. It sends downstream correctly sanitized text with all symbols properly decoded. It can be used as a base class for text-processing streams. All parsers use it as a foundation.
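Returning to Batch, here is a small sketch of batching streamed items (assuming Batch is exposed at 'stream-json/utils/Batch' with a batchSize option, and that pipeline and streamArray from the earlier examples are in scope; the size of 1000 is arbitrary):
const {batch} = require('stream-json/utils/Batch');

// pack individual {key, value} items into arrays of up to 1000 items
const batched = pipeline
  .pipe(streamArray())
  .pipe(batch({batchSize: 1000}));
batched.on('data', items => console.log('got a batch of', items.length, 'items'));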
In order to support JSONL more efficiently, the following helpers are provided:
- jsonl/Parser reads a JSONL file and produces a stream of JavaScript objects like StreamValues.
- jsonl/Stringer produces a JSONL file from a stream of JavaScript objects.
const {stringer} = require('stream-json/jsonl/Stringer');
const {parser} = require('stream-json/jsonl/Parser');
const {chain} = require('stream-chain');
const fs = require('fs');
const zlib = require('zlib');

// roundtrips data
const pipeline = chain([
  fs.createReadStream('sample1.jsonl.br'),
  zlib.createBrotliDecompress(),
  parser(),
  data => data.value,
  stringer(),
  zlib.createBrotliCompress(),
  fs.createWriteStream('sample2.jsonl.br')
]);
Frequently asked questions and tips can be found in FAQ.
Performance considerations are discussed in a separate document dedicated to Performance.
More documentation can be found in the README.
The test file tests/sample.json.gz is a combination of several publicly available datasets merged and compressed with gzip:
- a snapshot of publicly available Japanese statistics on birth and marriage in JSON.
- a snapshot of publicly available US Department of Housing and Urban Development - HUD's published metadata catalog (Schema Version 1.1).
- a small fake sample made up by me featuring non-ASCII keys, non-ASCII strings, and primitive data missing in the other two samples.
The test file tests/sample.jsonl.gz is the first 100 rows from a snapshot of "Database of COVID-19 Research Articles" for 7/9/2020, publicly provided by CDC at this link: https://www.cdc.gov/library/docs/covid19/ONLY_New_Articles_9July2020_Excel.xlsx, then converted to JSONL.