From ae4cfd8502d960339ae66b0bd4e799539badc7d0 Mon Sep 17 00:00:00 2001 From: Ziggy Jonsson Date: Fri, 2 Mar 2018 08:44:27 -0500 Subject: [PATCH] Add statistics to pages and columns Default for all columns unless `statistics: false` in the field definition --- lib/reader.js | 34 ++++++++++++++++- lib/schema.js | 2 + lib/shred.js | 7 +++- lib/util.js | 19 +++++++++- lib/writer.js | 92 ++++++++++++++++++++++++++++++++++++++++++--- test/integration.js | 27 +++++++++++++ 6 files changed, 173 insertions(+), 8 deletions(-) diff --git a/lib/reader.js b/lib/reader.js index 21ce54d3..a586290e 100644 --- a/lib/reader.js +++ b/lib/reader.js @@ -7,6 +7,7 @@ const parquet_util = require('./util') const parquet_schema = require('./schema') const parquet_codec = require('./codec') const parquet_compression = require('./compression') +const parquet_types = require('./types'); /** * Parquet File Magic String @@ -294,6 +295,32 @@ function decodeValues(type, encoding, cursor, count, opts) { return parquet_codec[encoding].decodeValues(type, cursor, count, opts); } + +function decodeStatisticsValue(value, column) { + if (column.primitiveType !== 'BYTE_ARRAY') { + value = decodeValues(column.primitiveType,'PLAIN',{buffer: Buffer.from(value), offset: 0}, 1, column); + if (value.length === 1) value = value[0]; + + } + if (column.originalType) { + value = parquet_types.fromPrimitive(column.originalType, value); + } + return value; +} + +function decodeStatistics(statistics, column) { + if (!statistics) { + return; + } + statistics.min_value = decodeStatisticsValue(statistics.min_value, column); + statistics.max_value = decodeStatisticsValue(statistics.max_value, column); + + statistics.min = statistics.min_value; + statistics.max = statistics.max_value; + + return statistics; +} + function decodeDataPages(buffer, opts) { let cursor = { buffer: buffer, @@ -310,19 +337,24 @@ function decodeDataPages(buffer, opts) { while (cursor.offset < cursor.size) { const pageHeader = new parquet_thrift.PageHeader(); - cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset)); + cursor.offset += parquet_util.decodeThrift(pageHeader, cursor.buffer.slice(cursor.offset)); const pageType = parquet_util.getThriftEnum( parquet_thrift.PageType, pageHeader.type); let pageData = null; + switch (pageType) { case 'DATA_PAGE': + pageHeader.data_page_header.statistics = decodeStatistics(pageHeader.data_page_header.statistics, opts.column); pageData = decodeDataPage(cursor, pageHeader, opts); + break; case 'DATA_PAGE_V2': + pageHeader.data_page_header_v2.statistics = decodeStatistics(pageHeader.data_page_header_v2.statistics, opts.column); pageData = decodeDataPageV2(cursor, pageHeader, opts); + break; default: throw "invalid page type: " + pageType; diff --git a/lib/schema.js b/lib/schema.js index 9cb7fa30..897c3552 100644 --- a/lib/schema.js +++ b/lib/schema.js @@ -108,6 +108,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) { rLevelMax: rLevelMax, dLevelMax: dLevelMax, isNested: true, + statistics: opts.statistics, fieldCount: Object.keys(opts.fields).length, fields: buildFields( opts.fields, @@ -150,6 +151,7 @@ function buildFields(schema, rLevelParentMax, dLevelParentMax, path) { path: path.concat([name]), repetitionType: repetitionType, encoding: opts.encoding, + statistics: opts.statistics, compression: opts.compression, typeLength: opts.typeLength || typeDef.typeLength, rLevelMax: rLevelMax, diff --git a/lib/shred.js b/lib/shred.js index 5270de9f..bdb9a719 100644 --- a/lib/shred.js +++ b/lib/shred.js @@ -33,6 +33,7 @@ exports.shredRecord = function(schema, record, buffer) { dlevels: [], rlevels: [], values: [], + distinct_values: new Set(), count: 0 }; } @@ -51,6 +52,7 @@ exports.shredRecord = function(schema, record, buffer) { dlevels: [], rlevels: [], values: [], + distinct_values: new Set(), count: 0 }; buffer.pages[field.path] = []; @@ -72,9 +74,11 @@ exports.shredRecord = function(schema, record, buffer) { buffer.columnData[field.path].values, recordShredded[field.path].values); + [...recordShredded[field.path].distinct_values].forEach(value => buffer.columnData[field.path].distinct_values.add(value)); + buffer.columnData[field.path].count += recordShredded[field.path].count; } -} +}; function shredRecordInternal(fields, record, data, rlvl, dlvl) { for (let fieldName in fields) { @@ -129,6 +133,7 @@ function shredRecordInternal(fields, record, data, rlvl, dlvl) { rlvl_i, field.dLevelMax); } else { + data[field.path].distinct_values.add(values[i]); data[field.path].values.push(parquet_types.toPrimitive(fieldType, values[i])); data[field.path].rlevels.push(rlvl_i); data[field.path].dlevels.push(field.dLevelMax); diff --git a/lib/util.js b/lib/util.js index 91356ca9..a6d2e773 100644 --- a/lib/util.js +++ b/lib/util.js @@ -2,6 +2,23 @@ const fs = require('fs'); const thrift = require('thrift'); + +/** We need to use a patched version of TFramedTransport where + * readString returns the original buffer instead of a string if the + * buffer can not be safely encoded as utf8 (see http://bit.ly/2GXeZEF) + */ + +class fixedTFramedTransport extends thrift.TFramedTransport { + readString(len) { + this.ensureAvailable(len); + var buffer = this.inBuf.slice(this.readPos, this.readPos + len); + var str = this.inBuf.toString('utf8', this.readPos, this.readPos + len); + this.readPos += len; + return (Buffer.from(str).equals(buffer)) ? str : buffer; + } +} + + /** * Helper function that serializes a thrift object into a buffer */ @@ -24,7 +41,7 @@ exports.decodeThrift = function(obj, buf, offset) { offset = 0; } - var transport = new thrift.TFramedTransport(buf); + var transport = new fixedTFramedTransport(buf); transport.readPos = offset; var protocol = new thrift.TCompactProtocol(transport); obj.read(protocol); diff --git a/lib/writer.js b/lib/writer.js index d820c0eb..2da87a08 100644 --- a/lib/writer.js +++ b/lib/writer.js @@ -7,6 +7,7 @@ const parquet_shredder = require('./shred') const parquet_util = require('./util') const parquet_codec = require('./codec') const parquet_compression = require('./compression') +const parquet_types = require('./types'); /** * Parquet File Magic String @@ -293,6 +294,27 @@ function encodeValues(type, encoding, values, opts) { return parquet_codec[encoding].encodeValues(type, values, opts); } +function encodeStatisticsValue(value, column) { + if (column.originalType) { + value = parquet_types.toPrimitive(column.originalType,value); + } + if (column.primitiveType !== 'BYTE_ARRAY') { + value = encodeValues(column.primitiveType,'PLAIN',[value],column); + } + return value; +} + +function encodeStatistics(statistics,column) { + statistics = Object.assign({},statistics); + statistics.min_value = encodeStatisticsValue(statistics.min_value, column); + statistics.max_value = encodeStatisticsValue(statistics.max_value, column); + + statistics.max = statistics.max_value; + statistics.min = statistics.min_value; + + return new parquet_thrift.Statistics(statistics); +} + function encodePages(schema, rowBuffer, opts) { if (!rowBuffer.pageRowCount) { return; @@ -305,6 +327,23 @@ function encodePages(schema, rowBuffer, opts) { let page; const values = rowBuffer.columnData[field.path]; + + let statistics; + + if (field.statistics !== false) { + statistics = {}; + [...values.distinct_values].forEach( (v,i) => { + if (i === 0 || v > statistics.max_value) { + statistics.max_value = v; + } + if (i === 0 || v < statistics.min_value) { + statistics.min_value = v; + } + }); + + statistics.null_count = values.count - values.values.length; + statistics.distinct_count = values.distinct_values.size; + } if (opts.useDataPageV2) { page = encodeDataPageV2( @@ -313,18 +352,26 @@ function encodePages(schema, rowBuffer, opts) { rowBuffer.pageRowCount, values.values, values.rlevels, - values.dlevels); + values.dlevels, + statistics); } else { page = encodeDataPage( field, values.count, values.values, values.rlevels, - values.dlevels); + values.dlevels, + statistics); } - rowBuffer.pages[field.path].push( {page, count: rowBuffer.pageRowCount}); + rowBuffer.pages[field.path].push({ + page, + statistics, + distinct_values: values.distinct_values, + count: rowBuffer.pageRowCount + }); + values.distinct_values = new Set(); values.values = []; values.rlevels = []; values.dlevels = []; @@ -337,7 +384,7 @@ function encodePages(schema, rowBuffer, opts) { /** * Encode a parquet data page */ -function encodeDataPage(column, valueCount, values, rlevels, dlevels) { +function encodeDataPage(column, valueCount, values, rlevels, dlevels, statistics) { /* encode values */ let valuesBuf = encodeValues( column.primitiveType, @@ -374,6 +421,9 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) { pageHeader.compressed_page_size = pageBody.length; pageHeader.data_page_header = new parquet_thrift.DataPageHeader(); pageHeader.data_page_header.num_values = valueCount; + if (column.statistics !== false) { + pageHeader.data_page_header.statistics = encodeStatistics(statistics, column); + } pageHeader.data_page_header.encoding = parquet_thrift.Encoding[column.encoding]; pageHeader.data_page_header.definition_level_encoding = @@ -388,7 +438,7 @@ function encodeDataPage(column, valueCount, values, rlevels, dlevels) { /** * Encode a parquet data page (v2) */ -function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels) { +function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels, statistics) { /* encode values */ let valuesBuf = encodeValues( column.primitiveType, @@ -433,6 +483,10 @@ function encodeDataPageV2(column, valueCount, rowCount, values, rlevels, dlevels pageHeader.data_page_header_v2.num_nulls = valueCount - values.length; pageHeader.data_page_header_v2.num_rows = rowCount; + if (column.statistics !== false) { + pageHeader.data_page_header_v2.statistics = encodeStatistics(statistics, column); + } + pageHeader.uncompressed_page_size = rLevelsBuf.length + dLevelsBuf.length + valuesBuf.length; @@ -478,6 +532,34 @@ function encodeColumnChunk(pages, opts) { metadata.codec = parquet_thrift.CompressionCodec[ opts.useDataPageV2 ? opts.column.compression : 'UNCOMPRESSED']; + /* compile statistics */ + let statistics = {}; + let distinct_values = new Set(); + statistics.null_count = 0; + statistics.distinct_count = 0; + + + for (let i = 0; i < pages.length; i++) { + let page = pages[i]; + + if (opts.column.statistics !== false) { + + if (page.statistics.max_value > statistics.max_value || i == 0) { + statistics.max_value = page.statistics.max_value; + } + if (page.statistics.min_value < statistics.min_value || i == 0) { + statistics.min_value = page.statistics.min_value; + } + statistics.null_count += page.statistics.null_count; + page.distinct_values.forEach(value => distinct_values.add(value)); + } + } + + if (opts.column.statistics !== false) { + statistics.distinct_count = distinct_values.size; + metadata.statistics = encodeStatistics(statistics, opts.column); + } + /* list encodings */ let encodingsSet = {}; encodingsSet[PARQUET_RDLVL_ENCODING] = true; diff --git a/test/integration.js b/test/integration.js index 4e40273a..279d7e7c 100644 --- a/test/integration.js +++ b/test/integration.js @@ -151,6 +151,25 @@ async function verifyPages() { assert.equal(rowCount, column.column.meta_data.num_values); } +async function verifyStatistics() { + const column = await sampleColumnHeaders(); + const colStats = column.column.meta_data.statistics; + + assert.equal(colStats.max_value, 'oranges'); + assert.equal(colStats.min_value, 'apples'); + assert.equal(colStats.null_count, 0); + assert.equal(colStats.distinct_count, 4); + + column.pages.forEach( (d, i) => { + let header = d.data_page_header || d.data_page_header_v2; + let pageStats = header.statistics; + assert.equal(pageStats.null_count,0); + assert.equal(pageStats.distinct_count, 4); + assert.equal(pageStats.max_value, 'oranges'); + assert.equal(pageStats.min_value, 'apples'); + }); +} + async function readTestFile() { let reader = await parquet.ParquetReader.openFile('fruits.parquet'); assert.equal(reader.getRowCount(), TEST_NUM_ROWS * 4); @@ -345,6 +364,10 @@ describe('Parquet', function() { it('verify that data is split into pages', function() { return verifyPages(); }); + + it('verify statistics', function() { + return verifyStatistics(); + }); }); describe('with DataPageHeaderV2', function() { @@ -362,6 +385,10 @@ describe('Parquet', function() { return verifyPages(); }); + it('verify statistics', function() { + return verifyStatistics(); + }); + it('write a test file with GZIP compression', function() { const opts = { useDataPageV2: true, compression: 'GZIP' }; return writeTestFile(opts);