From f2d8355a3e7b14406a51acd116002c9bb6aec6c8 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Wed, 14 Feb 2018 13:45:31 -0500 Subject: [PATCH] Encode pages when pageRowCount > PageSize This moves data into encoded buffer as soon as possible, reducing memory requirements for the whole rowGroup --- lib/reader.js | 2 +- lib/shred.js | 4 +++ lib/writer.js | 85 +++++++++++++++++++++++++++++---------------- test/integration.js | 52 +++++++++++++++++++++++++-- 4 files changed, 110 insertions(+), 33 deletions(-) diff --git a/lib/reader.js b/lib/reader.js index 43e78d9d..21ce54d3 100644 --- a/lib/reader.js +++ b/lib/reader.js @@ -310,7 +310,7 @@ function decodeDataPages(buffer, opts) { while (cursor.offset < cursor.size) { const pageHeader = new parquet_thrift.PageHeader(); - cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer); + cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset)); const pageType = parquet_util.getThriftEnum( parquet_thrift.PageType, diff --git a/lib/shred.js b/lib/shred.js index 1e082b1d..5270de9f 100644 --- a/lib/shred.js +++ b/lib/shred.js @@ -42,7 +42,9 @@ exports.shredRecord = function(schema, record, buffer) { /* if no error during shredding, add the shredded record to the buffer */ if (!('columnData' in buffer) || !('rowCount' in buffer)) { buffer.rowCount = 0; + buffer.pageRowCount = 0; buffer.columnData = {}; + buffer.pages = {}; for (let field of schema.fieldList) { buffer.columnData[field.path] = { @@ -51,10 +53,12 @@ exports.shredRecord = function(schema, record, buffer) { values: [], count: 0 }; + buffer.pages[field.path] = []; } } buffer.rowCount += 1; + buffer.pageRowCount += 1; for (let field of schema.fieldList) { Array.prototype.push.apply( buffer.columnData[field.path].rlevels, diff --git a/lib/writer.js b/lib/writer.js index f1130e81..d820c0eb 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -93,7 +93,12 @@ class ParquetWriter { parquet_shredder.shredRecord(this.schema, row, this.rowBuffer); + if (this.rowBuffer.pageRowCount > this.envelopeWriter.pageSize) { + encodePages(this.schema, this.rowBuffer, { useDataPageV2: this.envelopeWriter.useDataPageV2}); + } + if (this.rowBuffer.rowCount >= this.rowGroupSize) { + encodePages(this.schema, this.rowBuffer, { useDataPageV2: this.envelopeWriter.useDataPageV2}); await this.envelopeWriter.writeRowGroup(this.rowBuffer); this.rowBuffer = {}; } @@ -113,6 +118,7 @@ class ParquetWriter { this.closed = true; if (this.rowBuffer.rowCount > 0 || this.rowBuffer.rowCount >= this.rowGroupSize) { + encodePages(this.schema, this.rowBuffer, { useDataPageV2: this.envelopeWriter.useDataPageV2}); await this.envelopeWriter.writeRowGroup(this.rowBuffer); this.rowBuffer = {}; } @@ -177,7 +183,7 @@ class ParquetEnvelopeWriter { this.offset = fileOffset; this.rowCount = 0; this.rowGroups = []; - this.pageSize = PARQUET_DEFAULT_PAGE_SIZE; + this.pageSize = opts.pageSize || PARQUET_DEFAULT_PAGE_SIZE; this.useDataPageV2 = ("useDataPageV2" in opts) ? 
opts.useDataPageV2 : true; } @@ -287,6 +293,47 @@ function encodeValues(type, encoding, values, opts) { return parquet_codec[encoding].encodeValues(type, values, opts); } +function encodePages(schema, rowBuffer, opts) { + if (!rowBuffer.pageRowCount) { + return; + } + + for (let field of schema.fieldList) { + if (field.isNested) { + continue; + } + + let page; + const values = rowBuffer.columnData[field.path]; + + if (opts.useDataPageV2) { + page = encodeDataPageV2( + field, + values.count, + rowBuffer.pageRowCount, + values.values, + values.rlevels, + values.dlevels); + } else { + page = encodeDataPage( + field, + values.count, + values.values, + values.rlevels, + values.dlevels); + } + + rowBuffer.pages[field.path].push( {page, count: rowBuffer.pageRowCount}); + + values.values = []; + values.rlevels = []; + values.dlevels = []; + values.count = 0; + } + + rowBuffer.pageRowCount = 0; +} + /** * Encode a parquet data page */ @@ -407,41 +454,21 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels valuesBufCompressed]); } + + + /** * Encode an array of values into a parquet column chunk */ -function encodeColumnChunk(values, opts) { - /* encode data page(s) */ - let pages = []; - - { - let dataPage; - if (opts.useDataPageV2) { - dataPage = encodeDataPageV2( - opts.column, - values.count, - opts.rowCount, - values.values, - values.rlevels, - values.dlevels); - } else { - dataPage = encodeDataPage( - opts.column, - values.count, - values.values, - values.rlevels, - values.dlevels); - } - - pages.push(dataPage); - } +function encodeColumnChunk(pages, opts) { - let pagesBuf = Buffer.concat(pages); + let pagesBuf = Buffer.concat(pages.map(d => d.page)); + let count = pages.reduce((p,d) => p + d.count, 0); /* prepare metadata header */ let metadata = new parquet_thrift.ColumnMetaData(); metadata.path_in_schema = opts.column.path; - metadata.num_values = values.count; + metadata.num_values = count; metadata.data_page_offset = opts.baseOffset; metadata.encodings = []; metadata.total_uncompressed_size = pagesBuf.length; @@ -481,7 +508,7 @@ function encodeRowGroup(schema, data, opts) { } let cchunkData = encodeColumnChunk( - data.columnData[field.path], + data.pages[field.path], { column: field, baseOffset: opts.baseOffset + body.length, diff --git a/test/integration.js b/test/integration.js index 5cfb2089..4e40273a 100644 --- a/test/integration.js +++ b/test/integration.js @@ -4,6 +4,8 @@ const fs = require('fs'); const os = require('os'); const assert = chai.assert; const parquet = require('../parquet.js'); +const parquet_thrift = require('../gen-nodejs/parquet_types'); +const parquet_util = require('../lib/util'); const objectStream = require('object-stream'); const TEST_NUM_ROWS = 10000; @@ -112,6 +114,43 @@ async function writeTestFile(opts) { await writer.close(); } +async function sampleColumnHeaders() { + let reader = await parquet.ParquetReader.openFile('fruits.parquet'); + let column = reader.metadata.row_groups[0].columns[0]; + let buffer = await reader.envelopeReader.read(+column.meta_data.data_page_offset, +column.meta_data.total_compressed_size); + + let cursor = { + buffer: buffer, + offset: 0, + size: buffer.length + }; + + const pages = []; + + while (cursor.offset < cursor.size) { + const pageHeader = new parquet_thrift.PageHeader(); + cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset)); + pages.push(pageHeader); + cursor.offset += pageHeader.compressed_page_size; + } + + return {column, pages}; +} + +async 
function verifyPages() { + let rowCount = 0; + const column = await sampleColumnHeaders(); + + column.pages.forEach(d => { + let header = d.data_page_header || d.data_page_header_v2; + assert.isAbove(header.num_values,0); + rowCount += header.num_values; + }); + + assert.isAbove(column.pages.length,1); + assert.equal(rowCount, column.column.meta_data.num_values); +} + async function readTestFile() { let reader = await parquet.ParquetReader.openFile('fruits.parquet'); assert.equal(reader.getRowCount(), TEST_NUM_ROWS * 4); @@ -299,9 +338,13 @@ describe('Parquet', function() { }); it('write a test file and then read it back', function() { - const opts = { useDataPageV2: false, compression: 'UNCOMPRESSED' }; + const opts = { useDataPageV2: false, pageSize: 2000, compression: 'UNCOMPRESSED' }; return writeTestFile(opts).then(readTestFile); }); + + it('verify that data is split into pages', function() { + return verifyPages(); + }); }); describe('with DataPageHeaderV2', function() { @@ -311,10 +354,14 @@ describe('Parquet', function() { }); it('write a test file and then read it back', function() { - const opts = { useDataPageV2: true, compression: 'UNCOMPRESSED' }; + const opts = { useDataPageV2: true, pageSize: 2000, compression: 'UNCOMPRESSED' }; return writeTestFile(opts).then(readTestFile); }); + it('verify that data is split into pages', function() { + return verifyPages(); + }); + it('write a test file with GZIP compression', function() { const opts = { useDataPageV2: true, compression: 'GZIP' }; return writeTestFile(opts); @@ -374,4 +421,3 @@ describe('Parquet', function() { }); }); -
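
Usage sketch (illustrative, not part of the patch; the module name 'parquetjs', the schema fields, and the output file name are assumptions): with this change, a pageSize option passed through ParquetWriter.openFile is forwarded to the envelope writer and treated as a row-count threshold. Whenever the shredded buffer's pageRowCount exceeds it, the buffered values are encoded into a data page immediately, so a column chunk in the finished file holds several pages instead of one.

    const parquet = require('parquetjs');

    // Hypothetical two-column schema, just to exercise the writer.
    const schema = new parquet.ParquetSchema({
      name:     { type: 'UTF8' },
      quantity: { type: 'INT64' }
    });

    async function writeExample() {
      // pageSize is compared against the number of buffered rows
      // (rowBuffer.pageRowCount), so writing 10000 rows with pageSize: 2000
      // should produce multiple data pages per column chunk, which is what
      // verifyPages() asserts in the test changes above.
      const writer = await parquet.ParquetWriter.openFile(schema, 'example.parquet', {
        useDataPageV2: true,
        pageSize: 2000
      });

      for (let i = 0; i < 10000; i++) {
        await writer.appendRow({ name: 'row' + i, quantity: i });
      }

      await writer.close();
    }

    writeExample().catch(err => console.error(err));

Because encodePages() empties values, rlevels and dlevels once a page has been encoded, peak memory while writing is bounded by roughly one page of raw values plus the already-encoded pages, rather than by the raw values of an entire row group.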