Encode pages when pageRowCount > PageSize
This moves data into encoded page buffers as soon as possible, reducing the memory needed to hold a whole row group.
ZJONSSON committed Feb 27, 2018
1 parent 1fa58b5 commit f2d8355
Showing 4 changed files with 110 additions and 33 deletions.
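
For illustration, a minimal usage sketch of the new behavior (assumptions: the module is required as ../parquet.js the way the tests below do it, and the pageSize option is forwarded to the envelope writer as in this diff; note that pageSize is compared against a row count, pageRowCount, not a byte size):

const parquet = require('../parquet.js');

async function writeExample() {
  const schema = new parquet.ParquetSchema({
    name:     { type: 'UTF8' },
    quantity: { type: 'INT64' }
  });

  // pageSize: once this many rows have been shredded, they are encoded into a
  // data page immediately instead of sitting in memory until the row group is flushed.
  const writer = await parquet.ParquetWriter.openFile(schema, 'example.parquet', {
    pageSize: 2000,
    useDataPageV2: true
  });

  await writer.appendRow({ name: 'apple', quantity: 10 });
  await writer.close();
}
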
lib/reader.js: 1 addition & 1 deletion
@@ -310,7 +310,7 @@ function decodeDataPages(buffer, opts) {

while (cursor.offset < cursor.size) {
const pageHeader = new parquet_thrift.PageHeader();
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer);
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));

const pageType = parquet_util.getThriftEnum(
parquet_thrift.PageType,
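
The slice matters once a column chunk holds more than one page: decodeThrift evidently deserializes from the start of the buffer it is given and returns the number of bytes consumed, so without the slice every iteration would decode the first header again. A minimal sketch of walking successive page headers (mirroring the sampleColumnHeaders helper added to the tests below; chunkBuffer stands in for the raw column chunk bytes):

const parquet_thrift = require('../gen-nodejs/parquet_types');
const parquet_util = require('../lib/util');

function listPageHeaders(chunkBuffer) {
  let cursor = { buffer: chunkBuffer, offset: 0, size: chunkBuffer.length };
  const headers = [];

  while (cursor.offset < cursor.size) {
    const pageHeader = new parquet_thrift.PageHeader();
    // decode the header that starts at the current offset, then advance past it
    cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));
    headers.push(pageHeader);
    // skip the page body to land on the next header
    cursor.offset += pageHeader.compressed_page_size;
  }

  return headers;
}
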
lib/shred.js: 4 additions & 0 deletions
@@ -42,7 +42,9 @@ exports.shredRecord = function(schema, record, buffer) {
/* if no error during shredding, add the shredded record to the buffer */
if (!('columnData' in buffer) || !('rowCount' in buffer)) {
buffer.rowCount = 0;
buffer.pageRowCount = 0;
buffer.columnData = {};
buffer.pages = {};

for (let field of schema.fieldList) {
buffer.columnData[field.path] = {
@@ -51,10 +53,12 @@
values: [],
count: 0
};
buffer.pages[field.path] = [];
}
}

buffer.rowCount += 1;
buffer.pageRowCount += 1;
for (let field of schema.fieldList) {
Array.prototype.push.apply(
buffer.columnData[field.path].rlevels,
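
Taken together with the writer changes that follow, the shred buffer now tracks both the raw values for the page being filled and the pages already encoded; an illustrative shape (field names taken from this diff, one column path shown as an example):

const buffer = {
  rowCount: 12000,      // rows shredded into the current row group so far
  pageRowCount: 340,    // rows accumulated since the last page was encoded
  columnData: {         // raw values awaiting page encoding; emptied by encodePages()
    'name': { dlevels: [], rlevels: [], values: [], count: 0 }
  },
  pages: {              // encoded pages per column, each entry { page: Buffer, count: <rows in page> }
    'name': []
  }
};
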
lib/writer.js: 56 additions & 29 deletions
@@ -93,7 +93,12 @@ class ParquetWriter {

parquet_shredder.shredRecord(this.schema, row, this.rowBuffer);

if (this.rowBuffer.pageRowCount > this.envelopeWriter.pageSize) {
encodePages(this.schema, this.rowBuffer, { useDataPageV2: this.envelopeWriter.useDataPageV2});
}

if (this.rowBuffer.rowCount >= this.rowGroupSize) {
encodePages(this.schema, this.rowBuffer, { useDataPageV2: this.envelopeWriter.useDataPageV2});
await this.envelopeWriter.writeRowGroup(this.rowBuffer);
this.rowBuffer = {};
}
@@ -113,6 +118,7 @@ class ParquetWriter {
this.closed = true;

if (this.rowBuffer.rowCount > 0 || this.rowBuffer.rowCount >= this.rowGroupSize) {
encodePages(this.schema, this.rowBuffer, { useDataPageV2: this.envelopeWriter.useDataPageV2});
await this.envelopeWriter.writeRowGroup(this.rowBuffer);
this.rowBuffer = {};
}
@@ -177,7 +183,7 @@ class ParquetEnvelopeWriter {
this.offset = fileOffset;
this.rowCount = 0;
this.rowGroups = [];
this.pageSize = PARQUET_DEFAULT_PAGE_SIZE;
this.pageSize = opts.pageSize || PARQUET_DEFAULT_PAGE_SIZE;
this.useDataPageV2 = ("useDataPageV2" in opts) ? opts.useDataPageV2 : true;
}

@@ -287,6 +293,47 @@ function encodeValues(type, encoding, values, opts) {
return parquet_codec[encoding].encodeValues(type, values, opts);
}

function encodePages(schema, rowBuffer, opts) {
if (!rowBuffer.pageRowCount) {
return;
}

for (let field of schema.fieldList) {
if (field.isNested) {
continue;
}

let page;
const values = rowBuffer.columnData[field.path];

if (opts.useDataPageV2) {
page = encodeDataPageV2(
field,
values.count,
rowBuffer.pageRowCount,
values.values,
values.rlevels,
values.dlevels);
} else {
page = encodeDataPage(
field,
values.count,
values.values,
values.rlevels,
values.dlevels);
}

rowBuffer.pages[field.path].push( {page, count: rowBuffer.pageRowCount});

values.values = [];
values.rlevels = [];
values.dlevels = [];
values.count = 0;
}

rowBuffer.pageRowCount = 0;
}

/**
* Encode a parquet data page
*/
@@ -407,41 +454,21 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels
valuesBufCompressed]);
}




/**
* Encode an array of values into a parquet column chunk
*/
function encodeColumnChunk(values, opts) {
/* encode data page(s) */
let pages = [];

{
let dataPage;
if (opts.useDataPageV2) {
dataPage = encodeDataPageV2(
opts.column,
values.count,
opts.rowCount,
values.values,
values.rlevels,
values.dlevels);
} else {
dataPage = encodeDataPage(
opts.column,
values.count,
values.values,
values.rlevels,
values.dlevels);
}

pages.push(dataPage);
}
function encodeColumnChunk(pages, opts) {

let pagesBuf = Buffer.concat(pages);
let pagesBuf = Buffer.concat(pages.map(d => d.page));
let count = pages.reduce((p,d) => p + d.count, 0);

/* prepare metadata header */
let metadata = new parquet_thrift.ColumnMetaData();
metadata.path_in_schema = opts.column.path;
metadata.num_values = values.count;
metadata.num_values = count;
metadata.data_page_offset = opts.baseOffset;
metadata.encodings = [];
metadata.total_uncompressed_size = pagesBuf.length;
@@ -481,7 +508,7 @@ function encodeRowGroup(schema, data, opts) {
}

let cchunkData = encodeColumnChunk(
data.columnData[field.path],
data.pages[field.path],
{
column: field,
baseOffset: opts.baseOffset + body.length,
test/integration.js: 49 additions & 3 deletions
@@ -4,6 +4,8 @@ const fs = require('fs');
const os = require('os');
const assert = chai.assert;
const parquet = require('../parquet.js');
const parquet_thrift = require('../gen-nodejs/parquet_types');
const parquet_util = require('../lib/util');
const objectStream = require('object-stream');

const TEST_NUM_ROWS = 10000;
@@ -112,6 +114,43 @@ async function writeTestFile(opts) {
await writer.close();
}

async function sampleColumnHeaders() {
let reader = await parquet.ParquetReader.openFile('fruits.parquet');
let column = reader.metadata.row_groups[0].columns[0];
let buffer = await reader.envelopeReader.read(+column.meta_data.data_page_offset, +column.meta_data.total_compressed_size);

let cursor = {
buffer: buffer,
offset: 0,
size: buffer.length
};

const pages = [];

while (cursor.offset < cursor.size) {
const pageHeader = new parquet_thrift.PageHeader();
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));
pages.push(pageHeader);
cursor.offset += pageHeader.compressed_page_size;
}

return {column, pages};
}

async function verifyPages() {
let rowCount = 0;
const column = await sampleColumnHeaders();

column.pages.forEach(d => {
let header = d.data_page_header || d.data_page_header_v2;
assert.isAbove(header.num_values,0);
rowCount += header.num_values;
});

assert.isAbove(column.pages.length,1);
assert.equal(rowCount, column.column.meta_data.num_values);
}

async function readTestFile() {
let reader = await parquet.ParquetReader.openFile('fruits.parquet');
assert.equal(reader.getRowCount(), TEST_NUM_ROWS * 4);
@@ -299,9 +338,13 @@ describe('Parquet', function() {
});

it('write a test file and then read it back', function() {
const opts = { useDataPageV2: false, compression: 'UNCOMPRESSED' };
const opts = { useDataPageV2: false, pageSize: 2000, compression: 'UNCOMPRESSED' };
return writeTestFile(opts).then(readTestFile);
});

it('verify that data is split into pages', function() {
return verifyPages();
});
});

describe('with DataPageHeaderV2', function() {
@@ -311,10 +354,14 @@
});

it('write a test file and then read it back', function() {
const opts = { useDataPageV2: true, compression: 'UNCOMPRESSED' };
const opts = { useDataPageV2: true, pageSize: 2000, compression: 'UNCOMPRESSED' };
return writeTestFile(opts).then(readTestFile);
});

it('verify that data is split into pages', function() {
return verifyPages();
});

it('write a test file with GZIP compression', function() {
const opts = { useDataPageV2: true, compression: 'GZIP' };
return writeTestFile(opts);
@@ -374,4 +421,3 @@ describe('Parquet', function() {
});

});
