Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statistics #53

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,419 changes: 2,408 additions & 1,011 deletions gen-nodejs/parquet_types.js

Large diffs are not rendered by default.

64 changes: 30 additions & 34 deletions lib/codec/rle.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const varint = require('varint')

function encodeRunBitpacked(values, opts) {
if (values.length % 8 !== 0) {
throw 'must be a multiple of 8';
for (let i = 0; i < values.length % 8; i++) {
values.push(0);
}

let buf = Buffer.alloc(Math.ceil(opts.bitWidth * (values.length / 8)));
Expand Down Expand Up @@ -50,39 +50,35 @@ exports.encodeValues = function(type, values, opts) {
}

let buf = Buffer.alloc(0);
let runs = [];
for (let cur = 0; cur < values.length; cur += 8) {
let repeating = true;
for (let i = 1; i < 8; ++i) {
if (values[cur + i] !== values[cur]) {
repeating = false;
let run = [];
let repeats = 0;

for (let i = 0; i < values.length; i++) {
// If we are at the beginning of a run and the next value is same we start
// collecting repeated values
if ( repeats === 0 && run.length % 8 === 0 && values[i] === values[i+1]) {
// If we have any data in runs we need to encode them
if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
run = [];
}
repeats = 1;
} else if (repeats > 0 && values[i] === values[i-1]) {
repeats += 1;
} else {
// If values changes we need to post any previous repeated values
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[i-1], repeats, opts)]);
repeats = 0;
}
run.push(values[i]);
}

const append =
runs.length > 0 &&
(runs[runs.length - 1][1] !== null) === repeating &&
(!repeating || runs[runs.length - 1][1] === values[cur]);

if (!append) {
runs.push([cur, repeating ? values[cur] : null]);
}
}

for (let i = values.length - (values.length % 8); i < values.length; ++i) {
runs.push([i, values[i]]);
}

for (let i = 0; i < runs.length; ++i) {
const begin = runs[i][0];
const end = i < runs.length - 1 ? runs[i + 1][0] : values.length;
const rep = runs[i][1];

if (rep === null) {
buf = Buffer.concat([buf, encodeRunBitpacked(values.slice(begin, end), opts)]);
} else {
buf = Buffer.concat([buf, encodeRunRepeated(rep, end - begin, opts)]);
}
if (repeats) {
buf = Buffer.concat([buf, encodeRunRepeated(values[values.length-1], repeats, opts)]);
} else if (run.length) {
buf = Buffer.concat([buf, encodeRunBitpacked(run, opts)]);
}

if (opts.disableEnvelope) {
Expand All @@ -94,7 +90,7 @@ exports.encodeValues = function(type, values, opts) {
buf.copy(envelope, 4);

return envelope;
}
};

function decodeRunBitpacked(cursor, count, opts) {
if (count % 8 !== 0) {
Expand Down Expand Up @@ -144,11 +140,11 @@ exports.decodeValues = function(type, cursor, count, opts) {
values.push(...decodeRunRepeated(cursor, count, opts));
}
}
values = values.slice(0,count);

if (values.length !== count) {
throw "invalid RLE encoding";
}

return values;
}

};
54 changes: 52 additions & 2 deletions lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const parquet_util = require('./util')
const parquet_schema = require('./schema')
const parquet_codec = require('./codec')
const parquet_compression = require('./compression')
const parquet_types = require('./types');

/**
* Parquet File Magic String
Expand Down Expand Up @@ -118,6 +119,18 @@ class ParquetReader {
this.schema = new parquet_schema.ParquetSchema(
decodeSchema(
this.metadata.schema.splice(1)));

/* decode any statistics values */
if (this.metadata.row_groups) {
this.metadata.row_groups.forEach(row => row.columns.forEach( col => {
const stats = col.meta_data.statistics;
if (stats) {
const field = this.schema.findField(col.meta_data.path_in_schema);
stats.max_value = decodeStatisticsValue(stats.max_value, field);
stats.min_value = decodeStatisticsValue(stats.min_value, field);
}
}));
}
}

/**
Expand Down Expand Up @@ -294,6 +307,38 @@ function decodeValues(type, encoding, cursor, count, opts) {
return parquet_codec[encoding].decodeValues(type, cursor, count, opts);
}


/**
 * Decode a raw statistics value (min_value/max_value) from the thrift
 * metadata into a JS value for the given schema field.
 *
 * @param {Buffer|null|undefined} value - raw bytes from the thrift Statistics
 *   struct; may be absent when the writer did not record statistics.
 * @param {Object} column - schema field descriptor (primitiveType,
 *   originalType, typeLength, ...) as built by lib/schema.js.
 * @returns {*} decoded value, or undefined when no value was recorded.
 */
function decodeStatisticsValue(value, column) {
  // `== null` deliberately matches both null and undefined: thrift leaves
  // unset optional fields undefined, and `undefined.length` would throw.
  if (value == null || !value.length) {
    return undefined;
  }
  // BYTE_ARRAY statistics are already raw bytes; every other primitive is
  // stored PLAIN-encoded and must be decoded first.
  if (!column.primitiveType.includes('BYTE_ARRAY')) {
    value = decodeValues(column.primitiveType, 'PLAIN', {buffer: Buffer.from(value), offset: 0}, 1, column);
    if (value.length === 1) value = value[0];
  }
  // Map the primitive back to its logical (original) type, e.g. UTF8, DATE.
  if (column.originalType) {
    value = parquet_types.fromPrimitive(column.originalType, value);
  }
  return value;
}

/**
 * Decode the min/max statistics of a thrift Statistics struct in place
 * for the given schema field, and mirror them onto the legacy
 * `min`/`max` properties.
 *
 * @param {Object|null|undefined} statistics - thrift Statistics struct.
 * @param {Object} column - schema field descriptor used for decoding.
 * @returns {Object|undefined} the mutated statistics object, or undefined
 *   when no statistics were present.
 */
function decodeStatistics(statistics, column) {
  if (!statistics) {
    return;
  }
  // `!= null` skips both null and undefined; passing undefined on to the
  // value decoder would otherwise raise a TypeError on `.length`.
  if (statistics.min_value != null) {
    statistics.min_value = decodeStatisticsValue(statistics.min_value, column);
  }
  if (statistics.max_value != null) {
    statistics.max_value = decodeStatisticsValue(statistics.max_value, column);
  }

  // Deprecated pre parquet-format 2.4 field names, kept for consumers
  // that still read `min`/`max`.
  statistics.min = statistics.min_value;
  statistics.max = statistics.max_value;

  return statistics;
}

function decodeDataPages(buffer, opts) {
let cursor = {
buffer: buffer,
Expand All @@ -310,19 +355,24 @@ function decodeDataPages(buffer, opts) {

while (cursor.offset < cursor.size) {
const pageHeader = new parquet_thrift.PageHeader();
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer);
cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset));

const pageType = parquet_util.getThriftEnum(
parquet_thrift.PageType,
pageHeader.type);

let pageData = null;

switch (pageType) {
case 'DATA_PAGE':
pageHeader.data_page_header.statistics = decodeStatistics(pageHeader.data_page_header.statistics, opts.column);
pageData = decodeDataPage(cursor, pageHeader, opts);

break;
case 'DATA_PAGE_V2':
pageHeader.data_page_header_v2.statistics = decodeStatistics(pageHeader.data_page_header_v2.statistics, opts.column);
pageData = decodeDataPageV2(cursor, pageHeader, opts);

break;
default:
throw "invalid page type: " + pageType;
Expand Down Expand Up @@ -407,7 +457,7 @@ function decodeDataPage(cursor, header, opts) {
function decodeDataPageV2(cursor, header, opts) {
const cursorEnd = cursor.offset + header.compressed_page_size;

const valueCount = header.data_page_header_v2.num_values;
const valueCount = header.data_page_header_v2.num_rows;
const valueCountNonNull = valueCount - header.data_page_header_v2.num_nulls;
const valueEncoding = parquet_util.getThriftEnum(
parquet_thrift.Encoding,
Expand Down
2 changes: 2 additions & 0 deletions lib/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
rLevelMax: rLevelMax,
dLevelMax: dLevelMax,
isNested: true,
statistics: opts.statistics,
fieldCount: Object.keys(opts.fields).length,
fields: buildFields(
opts.fields,
Expand Down Expand Up @@ -150,6 +151,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) {
path: path.concat([name]),
repetitionType: repetitionType,
encoding: opts.encoding,
statistics: opts.statistics,
compression: opts.compression,
typeLength: opts.typeLength || typeDef.typeLength,
rLevelMax: rLevelMax,
Expand Down
11 changes: 10 additions & 1 deletion lib/shred.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ exports.shredRecord = function(schema, record, buffer) {
dlevels: [],
rlevels: [],
values: [],
distinct_values: new Set(),
count: 0
};
}
Expand All @@ -42,19 +43,24 @@ exports.shredRecord = function(schema, record, buffer) {
/* if no error during shredding, add the shredded record to the buffer */
if (!('columnData' in buffer) || !('rowCount' in buffer)) {
buffer.rowCount = 0;
buffer.pageRowCount = 0;
buffer.columnData = {};
buffer.pages = {};

for (let field of schema.fieldList) {
buffer.columnData[field.path] = {
dlevels: [],
rlevels: [],
values: [],
distinct_values: new Set(),
count: 0
};
buffer.pages[field.path] = [];
}
}

buffer.rowCount += 1;
buffer.pageRowCount += 1;
for (let field of schema.fieldList) {
Array.prototype.push.apply(
buffer.columnData[field.path].rlevels,
Expand All @@ -68,9 +74,11 @@ exports.shredRecord = function(schema, record, buffer) {
buffer.columnData[field.path].values,
recordShredded[field.path].values);

[...recordShredded[field.path].distinct_values].forEach(value => buffer.columnData[field.path].distinct_values.add(value));

buffer.columnData[field.path].count += recordShredded[field.path].count;
}
}
};

function shredRecordInternal(fields, record, data, rlvl, dlvl) {
for (let fieldName in fields) {
Expand Down Expand Up @@ -125,6 +133,7 @@ function shredRecordInternal(fields, record, data, rlvl, dlvl) {
rlvl_i,
field.dLevelMax);
} else {
data[field.path].distinct_values.add(values[i]);
data[field.path].values.push(parquet_types.toPrimitive(fieldType, values[i]));
data[field.path].rlevels.push(rlvl_i);
data[field.path].dlevels.push(field.dLevelMax);
Expand Down
19 changes: 18 additions & 1 deletion lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,23 @@
const fs = require('fs');
const thrift = require('thrift');


/**
 * Patched TFramedTransport: thrift's stock readString() always decodes the
 * frame bytes as utf8, which corrupts binary payloads. This subclass decodes
 * to a string only when the utf8 round-trip is lossless, and otherwise hands
 * back the original bytes untouched (see http://bit.ly/2GXeZEF).
 */
class fixedTFramedTransport extends thrift.TFramedTransport {
  readString(len) {
    this.ensureAvailable(len);
    const end = this.readPos + len;
    const raw = this.inBuf.slice(this.readPos, end);
    const decoded = this.inBuf.toString('utf8', this.readPos, end);
    this.readPos = end;
    // Re-encoding the decoded string must reproduce the exact input bytes;
    // if it does not, the payload was not valid utf8 — return the buffer.
    return Buffer.from(decoded).equals(raw) ? decoded : raw;
  }
}


/**
* Helper function that serializes a thrift object into a buffer
*/
Expand All @@ -24,7 +41,7 @@ exports.decodeThrift = function(obj, buf, offset) {
offset = 0;
}

var transport = new thrift.TFramedTransport(buf);
var transport = new fixedTFramedTransport(buf);
transport.readPos = offset;
var protocol = new thrift.TCompactProtocol(transport);
obj.read(protocol);
Expand Down
Loading