Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to DuckDB v1.1.1 #121

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@
"DUCKDB_EXTENSION_ICU_LINKED",
"DUCKDB_EXTENSION_JSON_LINKED",
"DUCKDB_EXTENSION_AUTOLOAD_DEFAULT=1",
"DUCKDB_EXTENSION_AUTOINSTALL_DEFAULT=1"
"DUCKDB_EXTENSION_AUTOINSTALL_DEFAULT=1",
"NDEBUG"
],
"cflags_cc": [
"-frtti",
Expand Down

Large diffs are not rendered by default.

18 changes: 14 additions & 4 deletions src/duckdb/extension/json/include/json_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,16 @@ struct JSONCommon {
};

//! Get JSON value using JSON path query (safe, checks the path query)
static inline yyjson_val *Get(yyjson_val *val, const string_t &path_str) {
static inline yyjson_val *Get(yyjson_val *val, const string_t &path_str, bool integral_argument) {
auto ptr = path_str.GetData();
auto len = path_str.GetSize();
if (len == 0) {
return GetUnsafe(val, ptr, len);
}
if (integral_argument) {
auto str = "$[" + path_str.GetString() + "]";
return GetUnsafe(val, str.c_str(), str.length());
}
switch (*ptr) {
case '/': {
// '/' notation must be '\0'-terminated
Expand All @@ -260,9 +264,15 @@ struct JSONCommon {
}
return GetUnsafe(val, ptr, len);
}
default:
auto str = "/" + string(ptr, len);
return GetUnsafe(val, str.c_str(), len + 1);
default: {
string path;
if (memchr(ptr, '"', len)) {
path = "/" + string(ptr, len);
} else {
path = "$.\"" + path_str.GetString() + "\"";
}
return GetUnsafe(val, path.c_str(), path.length());
}
}
}

Expand Down
14 changes: 11 additions & 3 deletions src/duckdb/extension/json/include/json_executors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#pragma once

#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "json_functions.hpp"

Expand Down Expand Up @@ -88,11 +89,18 @@ struct JSONExecutors {
}
} else { // Columnref path
D_ASSERT(info.path_type == JSONCommon::JSONPathType::REGULAR);
auto &paths = args.data[1];
unique_ptr<Vector> casted_paths;
if (args.data[1].GetType().id() == LogicalTypeId::VARCHAR) {
casted_paths = make_uniq<Vector>(args.data[1]);
} else {
casted_paths = make_uniq<Vector>(LogicalTypeId::VARCHAR);
VectorOperations::DefaultCast(args.data[1], *casted_paths, args.size(), true);
}
BinaryExecutor::ExecuteWithNulls<string_t, string_t, T>(
inputs, paths, result, args.size(), [&](string_t input, string_t path, ValidityMask &mask, idx_t idx) {
inputs, *casted_paths, result, args.size(),
[&](string_t input, string_t path, ValidityMask &mask, idx_t idx) {
auto doc = JSONCommon::ReadDocument(input, JSONCommon::READ_FLAG, lstate.json_allocator.GetYYAlc());
auto val = JSONCommon::Get(doc->root, path);
auto val = JSONCommon::Get(doc->root, path, args.data[1].GetType().IsIntegral());
if (SET_NULL_IF_NOT_FOUND && !val) {
mask.SetInvalid(idx);
return T {};
Expand Down
2 changes: 1 addition & 1 deletion src/duckdb/extension/json/json_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static DefaultMacro json_macros[] = {
"json_group_structure",
{"x", nullptr},
{{nullptr, nullptr}},
"json_structure(json_group_array(x))->'0'"},
"json_structure(json_group_array(x))->0"},
{DEFAULT_SCHEMA, "json", {"x", nullptr}, {{nullptr, nullptr}}, "json_extract(x, '$')"},
{nullptr, nullptr, {nullptr}, {{nullptr, nullptr}}, nullptr}};

Expand Down
23 changes: 16 additions & 7 deletions src/duckdb/extension/json/json_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,25 @@ static JSONPathType CheckPath(const Value &path_val, string &path, size_t &len)
const auto path_str_val = path_val.DefaultCastAs(LogicalType::VARCHAR);
auto path_str = path_str_val.GetValueUnsafe<string_t>();
len = path_str.GetSize();
auto ptr = path_str.GetData();
const auto ptr = path_str.GetData();
// Empty strings and invalid $ paths yield an error
if (len == 0) {
throw BinderException("Empty JSON path");
}
JSONPathType path_type = JSONPathType::REGULAR;
if (*ptr == '$') {
path_type = JSONCommon::ValidatePath(ptr, len, true);
}
// Copy over string to the bind data
if (*ptr == '/' || *ptr == '$') {
path = string(ptr, len);
} else {
} else if (path_val.type().IsIntegral()) {
path = "$[" + string(ptr, len) + "]";
} else if (memchr(ptr, '"', len)) {
path = "/" + string(ptr, len);
len++;
} else {
path = "$.\"" + string(ptr, len) + "\"";
}
len = path.length();
if (*path.c_str() == '$') {
path_type = JSONCommon::ValidatePath(path.c_str(), len, true);
}
return path_type;
}
Expand Down Expand Up @@ -67,7 +71,11 @@ unique_ptr<FunctionData> JSONReadFunctionData::Bind(ClientContext &context, Scal
path_type = CheckPath(path_val, path, len);
}
}
bound_function.arguments[1] = LogicalType::VARCHAR;
if (arguments[1]->return_type.IsIntegral()) {
bound_function.arguments[1] = LogicalType::BIGINT;
} else {
bound_function.arguments[1] = LogicalType::VARCHAR;
}
if (path_type == JSONCommon::JSONPathType::WILDCARD) {
bound_function.return_type = LogicalType::LIST(bound_function.return_type);
}
Expand Down Expand Up @@ -117,6 +125,7 @@ unique_ptr<FunctionData> JSONReadManyFunctionData::Bind(ClientContext &context,

JSONFunctionLocalState::JSONFunctionLocalState(Allocator &allocator) : json_allocator(allocator) {
}

JSONFunctionLocalState::JSONFunctionLocalState(ClientContext &context)
: JSONFunctionLocalState(BufferAllocator::Get(context)) {
}
Expand Down
14 changes: 11 additions & 3 deletions src/duckdb/extension/json/json_functions/json_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@ static inline string_t ExtractFromVal(yyjson_val *val, yyjson_alc *alc, Vector &
return JSONCommon::WriteVal<yyjson_val>(val, alc);
}

static inline string_t ExtractStringFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &, idx_t) {
return yyjson_is_str(val) ? string_t(unsafe_yyjson_get_str(val), unsafe_yyjson_get_len(val))
: JSONCommon::WriteVal<yyjson_val>(val, alc);
static inline string_t ExtractStringFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &mask, idx_t idx) {
switch (yyjson_get_tag(val)) {
case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
mask.SetInvalid(idx);
return string_t {};
case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NOESC:
case YYJSON_TYPE_STR | YYJSON_SUBTYPE_NONE:
return string_t(unsafe_yyjson_get_str(val), unsafe_yyjson_get_len(val));
default:
return JSONCommon::WriteVal<yyjson_val>(val, alc);
}
}

static void ExtractFunction(DataChunk &args, ExpressionState &state, Vector &result) {
Expand Down
7 changes: 4 additions & 3 deletions src/duckdb/extension/json/json_functions/json_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace duckdb {

static inline string_t ValueFromVal(yyjson_val *val, yyjson_alc *alc, Vector &, ValidityMask &mask, idx_t idx) {
switch (yyjson_get_tag(val)) {
case YYJSON_TYPE_NULL | YYJSON_SUBTYPE_NONE:
case YYJSON_TYPE_ARR | YYJSON_SUBTYPE_NONE:
case YYJSON_TYPE_OBJ | YYJSON_SUBTYPE_NONE:
mask.SetInvalid(idx);
Expand All @@ -22,12 +23,12 @@ static void ValueManyFunction(DataChunk &args, ExpressionState &state, Vector &r
}

static void GetValueFunctionsInternal(ScalarFunctionSet &set, const LogicalType &input_type) {
set.AddFunction(ScalarFunction({input_type, LogicalType::BIGINT}, LogicalType::JSON(), ValueFunction,
set.AddFunction(ScalarFunction({input_type, LogicalType::BIGINT}, LogicalType::VARCHAR, ValueFunction,
JSONReadFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
set.AddFunction(ScalarFunction({input_type, LogicalType::VARCHAR}, LogicalType::JSON(), ValueFunction,
set.AddFunction(ScalarFunction({input_type, LogicalType::VARCHAR}, LogicalType::VARCHAR, ValueFunction,
JSONReadFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
set.AddFunction(ScalarFunction({input_type, LogicalType::LIST(LogicalType::VARCHAR)},
LogicalType::LIST(LogicalType::JSON()), ValueManyFunction,
LogicalType::LIST(LogicalType::VARCHAR), ValueManyFunction,
JSONReadManyFunctionData::Bind, nullptr, nullptr, JSONFunctionLocalState::Init));
}

Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/extension/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ void ColumnReader::PrepareRead(parquet_filter_t &filter) {
break;
case PageType::DICTIONARY_PAGE:
PreparePage(page_hdr);
if (page_hdr.dictionary_page_header.num_values < 0) {
throw std::runtime_error("Invalid dictionary page header (num_values < 0)");
}
Dictionary(std::move(block), page_hdr.dictionary_page_header.num_values);
break;
default:
Expand Down
97 changes: 54 additions & 43 deletions src/duckdb/extension/parquet/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1209,47 +1209,6 @@ class IntervalColumnWriter : public BasicColumnWriter {
}
};

//===--------------------------------------------------------------------===//
// Geometry Column Writer
//===--------------------------------------------------------------------===//
// This class just wraps another column writer, but also calculates the extent
// of the geometry column by updating the geodata object with every written
// vector.
template <class WRITER_IMPL>
class GeometryColumnWriter : public WRITER_IMPL {
GeoParquetColumnMetadata geo_data;
GeoParquetColumnMetadataWriter geo_data_writer;
string column_name;

public:
void Write(ColumnWriterState &state, Vector &vector, idx_t count) override {
// Just write normally
WRITER_IMPL::Write(state, vector, count);

// And update the geodata object
geo_data_writer.Update(geo_data, vector, count);
}
void FinalizeWrite(ColumnWriterState &state) override {
WRITER_IMPL::FinalizeWrite(state);

// Add the geodata object to the writer
this->writer.GetGeoParquetData().geometry_columns[column_name] = geo_data;
}

public:
GeometryColumnWriter(ClientContext &context, ParquetWriter &writer, idx_t schema_idx, vector<string> schema_path_p,
idx_t max_repeat, idx_t max_define, bool can_have_nulls, string name)
: WRITER_IMPL(writer, schema_idx, std::move(schema_path_p), max_repeat, max_define, can_have_nulls),
geo_data_writer(context), column_name(std::move(name)) {

auto &geo_data = writer.GetGeoParquetData();
if (geo_data.primary_geometry_column.empty()) {
// Set the first column to the primary column
geo_data.primary_geometry_column = column_name;
}
}
};

//===--------------------------------------------------------------------===//
// String Column Writer
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -1563,6 +1522,58 @@ class StringColumnWriter : public BasicColumnWriter {
}
};

//===--------------------------------------------------------------------===//
// WKB Column Writer
//===--------------------------------------------------------------------===//
// Used to store the metadata for a WKB-encoded geometry column when writing
// GeoParquet files.
class WKBColumnWriterState final : public StringColumnWriterState {
public:
WKBColumnWriterState(ClientContext &context, duckdb_parquet::format::RowGroup &row_group, idx_t col_idx)
: StringColumnWriterState(row_group, col_idx), geo_data(), geo_data_writer(context) {
}

GeoParquetColumnMetadata geo_data;
GeoParquetColumnMetadataWriter geo_data_writer;
};

class WKBColumnWriter final : public StringColumnWriter {
public:
WKBColumnWriter(ClientContext &context_p, ParquetWriter &writer, idx_t schema_idx, vector<string> schema_path_p,
idx_t max_repeat, idx_t max_define, bool can_have_nulls, string name)
: StringColumnWriter(writer, schema_idx, std::move(schema_path_p), max_repeat, max_define, can_have_nulls),
column_name(std::move(name)), context(context_p) {

this->writer.GetGeoParquetData().RegisterGeometryColumn(column_name);
}

unique_ptr<ColumnWriterState> InitializeWriteState(duckdb_parquet::format::RowGroup &row_group) override {
auto result = make_uniq<WKBColumnWriterState>(context, row_group, row_group.columns.size());
RegisterToRowGroup(row_group);
return std::move(result);
}
void Write(ColumnWriterState &state, Vector &vector, idx_t count) override {
StringColumnWriter::Write(state, vector, count);

auto &geo_state = state.Cast<WKBColumnWriterState>();
geo_state.geo_data_writer.Update(geo_state.geo_data, vector, count);
}

void FinalizeWrite(ColumnWriterState &state) override {
StringColumnWriter::FinalizeWrite(state);

// Add the geodata object to the writer
const auto &geo_state = state.Cast<WKBColumnWriterState>();

// Merge this state's geo column data with the writer's geo column data
writer.GetGeoParquetData().FlushColumnMeta(column_name, geo_state.geo_data);
}

private:
string column_name;
ClientContext &context;
};

//===--------------------------------------------------------------------===//
// Enum Column Writer
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -2234,8 +2245,8 @@ unique_ptr<ColumnWriter> ColumnWriter::CreateWriterRecursive(ClientContext &cont
schema_path.push_back(name);

if (type.id() == LogicalTypeId::BLOB && type.GetAlias() == "WKB_BLOB") {
return make_uniq<GeometryColumnWriter<StringColumnWriter>>(context, writer, schema_idx, std::move(schema_path),
max_repeat, max_define, can_have_nulls, name);
return make_uniq<WKBColumnWriter>(context, writer, schema_idx, std::move(schema_path), max_repeat, max_define,
can_have_nulls, name);
}

switch (type.id()) {
Expand Down
19 changes: 19 additions & 0 deletions src/duckdb/extension/parquet/geo_parquet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,17 @@ GeoParquetFileMetadata::TryRead(const duckdb_parquet::format::FileMetaData &file
return nullptr;
}

void GeoParquetFileMetadata::FlushColumnMeta(const string &column_name, const GeoParquetColumnMetadata &meta) {
// Lock the metadata
lock_guard<mutex> glock(write_lock);

auto &column = geometry_columns[column_name];

// Combine the metadata
column.geometry_types.insert(meta.geometry_types.begin(), meta.geometry_types.end());
column.bbox.Combine(meta.bbox);
}

void GeoParquetFileMetadata::Write(duckdb_parquet::format::FileMetaData &file_meta_data) const {

yyjson_mut_doc *doc = yyjson_mut_doc_new(nullptr);
Expand Down Expand Up @@ -349,6 +360,14 @@ bool GeoParquetFileMetadata::IsGeometryColumn(const string &column_name) const {
return geometry_columns.find(column_name) != geometry_columns.end();
}

void GeoParquetFileMetadata::RegisterGeometryColumn(const string &column_name) {
lock_guard<mutex> glock(write_lock);
if (primary_geometry_column.empty()) {
primary_geometry_column = column_name;
}
geometry_columns[column_name] = GeoParquetColumnMetadata();
}

unique_ptr<ColumnReader> GeoParquetFileMetadata::CreateColumnReader(ParquetReader &reader,
const LogicalType &logical_type,
const SchemaElement &s_ele, idx_t schema_idx_p,
Expand Down
16 changes: 10 additions & 6 deletions src/duckdb/extension/parquet/include/geo_parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,29 @@ class GeoParquetColumnMetadataWriter {
void Update(GeoParquetColumnMetadata &meta, Vector &vector, idx_t count);
};

struct GeoParquetFileMetadata {
class GeoParquetFileMetadata {
public:
// Try to read GeoParquet metadata. Returns nullptr if not found, invalid or the required spatial extension is not
// available.
static unique_ptr<GeoParquetFileMetadata> TryRead(const duckdb_parquet::format::FileMetaData &file_meta_data,
ClientContext &context);
void Write(duckdb_parquet::format::FileMetaData &file_meta_data) const;

public:
// Default to 1.1.0 for now
string version = "1.1.0";
string primary_geometry_column;
unordered_map<string, GeoParquetColumnMetadata> geometry_columns;
void FlushColumnMeta(const string &column_name, const GeoParquetColumnMetadata &meta);
const unordered_map<string, GeoParquetColumnMetadata> &GetColumnMeta() const;

unique_ptr<ColumnReader> CreateColumnReader(ParquetReader &reader, const LogicalType &logical_type,
const duckdb_parquet::format::SchemaElement &s_ele, idx_t schema_idx_p,
idx_t max_define_p, idx_t max_repeat_p, ClientContext &context);

bool IsGeometryColumn(const string &column_name) const;
void RegisterGeometryColumn(const string &column_name);

private:
mutex write_lock;
string version = "1.1.0";
string primary_geometry_column;
unordered_map<string, GeoParquetColumnMetadata> geometry_columns;
};

} // namespace duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ class TemplatedColumnReader : public ColumnReader {

void Offsets(uint32_t *offsets, uint8_t *defines, uint64_t num_values, parquet_filter_t &filter,
idx_t result_offset, Vector &result) override {
if (!dict) {
throw IOException(
"Parquet file is likely corrupted, cannot have dictionary offsets without seeing a dictionary first.");
if (!dict || dict->len == 0) {
throw IOException("Parquet file is likely corrupted, cannot have dictionary offsets without seeing a "
"non-empty dictionary first.");
}
if (HasDefines()) {
OffsetsInternal<true>(*dict, offsets, defines, num_values, filter, result_offset, result);
Expand Down
Loading
Loading