Skip to content

Commit

Permalink
Update vendored DuckDB sources to 1ab172e
Browse files Browse the repository at this point in the history
  • Loading branch information
duckdblabs-bot committed Nov 8, 2024
1 parent 1ab172e commit 3ca8544
Show file tree
Hide file tree
Showing 28 changed files with 348 additions and 186 deletions.
6 changes: 4 additions & 2 deletions src/duckdb/src/execution/index/fixed_size_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,12 @@ void FixedSizeBuffer::Serialize(PartialBlockManager &partial_block_manager, cons
allocation.partial_block = std::move(p_block_for_index);
}

partial_block_manager.RegisterPartialBlock(std::move(allocation));

// resetting this buffer
buffer_handle.Destroy();

// register the partial block
partial_block_manager.RegisterPartialBlock(std::move(allocation));

block_handle = block_manager.RegisterBlock(block_pointer.block_id);
D_ASSERT(block_handle->BlockId() < MAXIMUM_BLOCK);

Expand Down
3 changes: 2 additions & 1 deletion src/duckdb/src/function/table/copy_csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ static void WriteQuotedString(WriteStream &writer, WriteCSVData &csv_data, const
// force quote is disabled: check if we need to add quotes anyway
force_quote = RequiresQuotes(csv_data, str, len);
}
if (force_quote) {
// If a quote is set to none (i.e., null-terminator) we skip the quotation
if (force_quote && options.dialect_options.state_machine_options.quote.GetValue() != '\0') {
// quoting is enabled: we might need to escape things in the string
bool requires_escape = false;
// simple CSV
Expand Down
19 changes: 12 additions & 7 deletions src/duckdb/src/function/table/query_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,39 @@ static unique_ptr<SubqueryRef> ParseSubquery(const string &query, const ParserOp
return duckdb::make_uniq<SubqueryRef>(std::move(select_stmt));
}

static void UnionTablesQuery(TableFunctionBindInput &input, string &query) {
static string UnionTablesQuery(TableFunctionBindInput &input) {
for (auto &input_val : input.inputs) {
if (input_val.IsNull()) {
throw BinderException("Cannot use NULL as function argument");
}
}
string result;
string by_name = (input.inputs.size() == 2 &&
(input.inputs[1].type().id() == LogicalTypeId::BOOLEAN && input.inputs[1].GetValue<bool>()))
? "BY NAME "
: ""; // 'by_name' variable defaults to false
if (input.inputs[0].type().id() == LogicalTypeId::VARCHAR) {
query += "FROM " + KeywordHelper::WriteOptionallyQuoted(input.inputs[0].ToString());
auto from_path = input.inputs[0].ToString();
auto qualified_name = QualifiedName::Parse(from_path);
result += "FROM " + qualified_name.ToString();
} else if (input.inputs[0].type() == LogicalType::LIST(LogicalType::VARCHAR)) {
string union_all_clause = " UNION ALL " + by_name + "FROM ";
const auto &children = ListValue::GetChildren(input.inputs[0]);

if (children.empty()) {
throw InvalidInputException("Input list is empty");
}

query += "FROM " + KeywordHelper::WriteOptionallyQuoted(children[0].ToString());
auto qualified_name = QualifiedName::Parse(children[0].ToString());
result += "FROM " + qualified_name.ToString();
for (size_t i = 1; i < children.size(); ++i) {
auto child = children[i].ToString();
query += union_all_clause + KeywordHelper::WriteOptionallyQuoted(child);
auto qualified_name = QualifiedName::Parse(child);
result += union_all_clause + qualified_name.ToString();
}
} else {
throw InvalidInputException("Expected a table or a list with tables as input");
}
return result;
}

static unique_ptr<TableRef> QueryBindReplace(ClientContext &context, TableFunctionBindInput &input) {
Expand All @@ -52,8 +58,7 @@ static unique_ptr<TableRef> QueryBindReplace(ClientContext &context, TableFuncti
}

static unique_ptr<TableRef> TableBindReplace(ClientContext &context, TableFunctionBindInput &input) {
string query;
UnionTablesQuery(input, query);
auto query = UnionTablesQuery(input);
auto subquery_ref =
ParseSubquery(query, context.GetParserOptions(), "Expected a table or a list with tables as input");
return std::move(subquery_ref);
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/src/function/table/version/pragma_version.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef DUCKDB_PATCH_VERSION
#define DUCKDB_PATCH_VERSION "4-dev1594"
#define DUCKDB_PATCH_VERSION "4-dev1619"
#endif
#ifndef DUCKDB_MINOR_VERSION
#define DUCKDB_MINOR_VERSION 1
Expand All @@ -8,10 +8,10 @@
#define DUCKDB_MAJOR_VERSION 1
#endif
#ifndef DUCKDB_VERSION
#define DUCKDB_VERSION "v1.1.4-dev1594"
#define DUCKDB_VERSION "v1.1.4-dev1619"
#endif
#ifndef DUCKDB_SOURCE_ID
#define DUCKDB_SOURCE_ID "0ccf3c25cc"
#define DUCKDB_SOURCE_ID "2d1b7d796d"
#endif
#include "duckdb/function/table/system_functions.hpp"
#include "duckdb/main/database.hpp"
Expand Down
8 changes: 6 additions & 2 deletions src/duckdb/src/include/duckdb/common/file_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class FileBuffer {
virtual ~FileBuffer();

Allocator &allocator;
//! The type of the buffer
FileBufferType type;
//! The buffer that users can write to
data_ptr_t buffer;
//! The size of the portion that users can write to, this is equivalent to internal_size - BLOCK_HEADER_SIZE
Expand All @@ -47,6 +45,10 @@ class FileBuffer {

void Clear();

FileBufferType GetBufferType() const {
return type;
}

// Same rules as the constructor. We will add room for a header, in additio to
// the requested user bytes. We will then sector-align the result.
void Resize(uint64_t user_size);
Expand All @@ -68,6 +70,8 @@ class FileBuffer {
void Initialize(DebugInitialize info);

protected:
//! The type of the buffer
FileBufferType type;
//! The pointer to the internal buffer that will be read or written, including the buffer header
data_ptr_t internal_buffer;
//! The aligned size as passed to the constructor. This is the size that is read or written to disk.
Expand Down
1 change: 1 addition & 0 deletions src/duckdb/src/include/duckdb/parser/qualified_name.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct QualifiedName {
//! Parse the (optional) schema and a name from a string in the format of e.g. "schema"."table"; if there is no dot
//! the schema will be set to INVALID_SCHEMA
static QualifiedName Parse(const string &input);
string ToString() const;
};

struct QualifiedColumnName {
Expand Down
3 changes: 3 additions & 0 deletions src/duckdb/src/include/duckdb/storage/block_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace duckdb {
class BlockHandle;
class BufferHandle;
class BufferManager;
class ClientContext;
class DatabaseInstance;
Expand Down Expand Up @@ -86,6 +87,8 @@ class BlockManager {
//! Register a block with the given block id in the base file
shared_ptr<BlockHandle> RegisterBlock(block_id_t block_id);
//! Convert an existing in-memory buffer into a persistent disk-backed block
shared_ptr<BlockHandle> ConvertToPersistent(block_id_t block_id, shared_ptr<BlockHandle> old_block,
BufferHandle old_handle);
shared_ptr<BlockHandle> ConvertToPersistent(block_id_t block_id, shared_ptr<BlockHandle> old_block);

void UnregisterBlock(BlockHandle &block);
Expand Down
101 changes: 74 additions & 27 deletions src/duckdb/src/include/duckdb/storage/buffer/block_handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,9 @@ struct TempBufferPoolReservation : BufferPoolReservation {
}
};

class BlockHandle : public enable_shared_from_this<BlockHandle> {
friend class BlockManager;
friend struct BufferEvictionNode;
friend class BufferHandle;
friend class BufferManager;
friend class StandardBufferManager;
friend class BufferPool;
friend struct EvictionQueue;
using BlockLock = unique_lock<mutex>;

class BlockHandle : public enable_shared_from_this<BlockHandle> {
public:
BlockHandle(BlockManager &block_manager, block_id_t block_id, MemoryTag tag);
BlockHandle(BlockManager &block_manager, block_id_t block_id, MemoryTag tag, unique_ptr<FileBuffer> buffer,
Expand All @@ -73,21 +67,24 @@ class BlockHandle : public enable_shared_from_this<BlockHandle> {
BlockManager &block_manager;

public:
block_id_t BlockId() {
block_id_t BlockId() const {
return block_id;
}

void ResizeBuffer(idx_t block_size, int64_t memory_delta) {
D_ASSERT(buffer);
// resize and adjust current memory
buffer->Resize(block_size);
memory_usage = NumericCast<idx_t>(NumericCast<int64_t>(memory_usage) + memory_delta);
D_ASSERT(memory_usage == buffer->AllocSize());
idx_t EvictionSequenceNumber() const {
return eviction_seq_num;
}

idx_t NextEvictionSequenceNumber() {
return ++eviction_seq_num;
}

int32_t Readers() const {
return readers;
}
int32_t DecrementReaders() {
return --readers;
}

inline bool IsSwizzled() const {
return !unswizzled;
Expand All @@ -113,27 +110,75 @@ class BlockHandle : public enable_shared_from_this<BlockHandle> {
return destroy_buffer_upon == DestroyBufferUpon::BLOCK;
}

inline const idx_t &GetMemoryUsage() const {
inline idx_t GetMemoryUsage() const {
return memory_usage;
}

bool IsUnloaded() {
bool IsUnloaded() const {
return state == BlockState::BLOCK_UNLOADED;
}

void SetEvictionQueueIndex(const idx_t index) {
D_ASSERT(!eviction_queue_idx.IsValid()); // Cannot overwrite
D_ASSERT(buffer->type == FileBufferType::MANAGED_BUFFER); // MANAGED_BUFFER only (at least, for now)
// can only be set once
D_ASSERT(eviction_queue_idx == DConstants::INVALID_INDEX);
// MANAGED_BUFFER only (at least, for now)
D_ASSERT(GetBufferType() == FileBufferType::MANAGED_BUFFER);
eviction_queue_idx = index;
}

private:
idx_t GetEvictionQueueIndex() const {
return eviction_queue_idx;
}

FileBufferType GetBufferType() const {
return buffer_type;
}

BlockState GetState() const {
return state;
}

int64_t GetLRUTimestamp() const {
return lru_timestamp_msec;
}

void SetLRUTimestamp(int64_t timestamp_msec) {
lru_timestamp_msec = timestamp_msec;
}

BlockLock GetLock() {
return BlockLock(lock);
}

//! Gets a reference to the buffer - the lock must be held
unique_ptr<FileBuffer> &GetBuffer(BlockLock &l);

void ChangeMemoryUsage(BlockLock &l, int64_t delta);
BufferPoolReservation &GetMemoryCharge(BlockLock &l);
//! Merge a new memory reservation
void MergeMemoryReservation(BlockLock &, BufferPoolReservation reservation);
//! Resize the memory allocation
void ResizeMemory(BlockLock &, idx_t alloc_size);

//! Resize the actual buffer
void ResizeBuffer(BlockLock &, idx_t block_size, int64_t memory_delta);
BufferHandle Load(unique_ptr<FileBuffer> buffer = nullptr);
BufferHandle LoadFromBuffer(data_ptr_t data, unique_ptr<FileBuffer> reusable_buffer);
unique_ptr<FileBuffer> UnloadAndTakeBlock();
void Unload();
bool CanUnload();
BufferHandle LoadFromBuffer(BlockLock &l, data_ptr_t data, unique_ptr<FileBuffer> reusable_buffer,
BufferPoolReservation reservation);
unique_ptr<FileBuffer> UnloadAndTakeBlock(BlockLock &);
void Unload(BlockLock &);

//! Returns whether or not the block can be unloaded
//! Note that while this method does not require a lock, whether or not a block can be unloaded can change if the
//! lock is not held
bool CanUnload() const;

void ConvertToPersistent(BlockLock &, BlockHandle &new_block, unique_ptr<FileBuffer> new_buffer);

private:
void VerifyMutex(unique_lock<mutex> &l) const;

private:
//! The block-level lock
mutex lock;
//! Whether or not the block is loaded/unloaded
Expand All @@ -143,7 +188,9 @@ class BlockHandle : public enable_shared_from_this<BlockHandle> {
//! The block id of the block
const block_id_t block_id;
//! Memory tag
MemoryTag tag;
const MemoryTag tag;
//! File buffer type
const FileBufferType buffer_type;
//! Pointer to loaded data (if any)
unique_ptr<FileBuffer> buffer;
//! Internal eviction sequence number
Expand All @@ -154,13 +201,13 @@ class BlockHandle : public enable_shared_from_this<BlockHandle> {
atomic<DestroyBufferUpon> destroy_buffer_upon;
//! The memory usage of the block (when loaded). If we are pinning/loading
//! an unloaded block, this tells us how much memory to reserve.
idx_t memory_usage;
atomic<idx_t> memory_usage;
//! Current memory reservation / usage
BufferPoolReservation memory_charge;
//! Does the block contain any memory pointers?
const char *unswizzled;
//! Index for eviction queue (FileBufferType::MANAGED_BUFFER only, for now)
optional_idx eviction_queue_idx;
atomic<idx_t> eviction_queue_idx;
};

} // namespace duckdb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class FileBuffer;
class BufferHandle {
public:
DUCKDB_API BufferHandle();
DUCKDB_API explicit BufferHandle(shared_ptr<BlockHandle> handle);
DUCKDB_API explicit BufferHandle(shared_ptr<BlockHandle> handle, optional_ptr<FileBuffer> node);
DUCKDB_API ~BufferHandle();
// disable copy constructors
BufferHandle(const BufferHandle &other) = delete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ struct AlpCompressionState : public CompressionState {
// Store the offset to the end of metadata (to be used as a backwards pointer in decoding)
Store<uint32_t>(NumericCast<uint32_t>(total_segment_size), dataptr);

handle.Destroy();
checkpoint_state.FlushSegment(std::move(current_segment), total_segment_size);
checkpoint_state.FlushSegment(std::move(current_segment), std::move(handle), total_segment_size);
data_bytes_used = 0;
vectors_flushed = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,7 @@ struct AlpRDCompressionState : public CompressionState {
// Store the Dictionary
memcpy((void *)dataptr, (void *)state.left_parts_dict, actual_dictionary_size_bytes);

handle.Destroy();
checkpoint_state.FlushSegment(std::move(current_segment), total_segment_size);
checkpoint_state.FlushSegment(std::move(current_segment), std::move(handle), total_segment_size);
data_bytes_used = 0;
vectors_flushed = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class StandardBufferManager : public BufferManager {

//! When the BlockHandle reaches 0 readers, this creates a new FileBuffer for this BlockHandle and
//! overwrites the data within with garbage. Any readers that do not hold the pin will notice
void VerifyZeroReaders(shared_ptr<BlockHandle> &handle);
void VerifyZeroReaders(BlockLock &l, shared_ptr<BlockHandle> &handle);

void BatchRead(vector<shared_ptr<BlockHandle>> &handles, const map<block_id_t, idx_t> &load_map,
block_id_t first_block, block_id_t last_block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ struct ColumnCheckpointState {
public:
virtual unique_ptr<BaseStatistics> GetStatistics();

virtual void FlushSegment(unique_ptr<ColumnSegment> segment, idx_t segment_size);
virtual void FlushSegmentInternal(unique_ptr<ColumnSegment> segment, idx_t segment_size);
virtual void FlushSegment(unique_ptr<ColumnSegment> segment, BufferHandle handle, idx_t segment_size);
virtual PersistentColumnData ToPersistentData();

PartialBlockManager &GetPartialBlockManager() {
Expand Down
5 changes: 5 additions & 0 deletions src/duckdb/src/parser/qualified_name.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
#include "duckdb/parser/qualified_name.hpp"
#include "duckdb/parser/parsed_data/parse_info.hpp"

namespace duckdb {

string QualifiedName::ToString() const {
return ParseInfo::QualifierToString(catalog, schema, name);
}

QualifiedName QualifiedName::Parse(const string &input) {
string catalog;
string schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,13 @@ unique_ptr<BoundTableRef> Binder::Bind(TableFunctionRef &ref) {
D_ASSERT(ref.function->type == ExpressionType::FUNCTION);
auto &fexpr = ref.function->Cast<FunctionExpression>();

string catalog = fexpr.catalog;
string schema = fexpr.schema;
Binder::BindSchemaOrCatalog(context, catalog, schema);

// fetch the function from the catalog
auto &func_catalog = *GetCatalogEntry(CatalogType::TABLE_FUNCTION_ENTRY, fexpr.catalog, fexpr.schema,
fexpr.function_name, OnEntryNotFound::THROW_EXCEPTION, error_context);
auto &func_catalog = *GetCatalogEntry(CatalogType::TABLE_FUNCTION_ENTRY, catalog, schema, fexpr.function_name,
OnEntryNotFound::THROW_EXCEPTION, error_context);

if (func_catalog.type == CatalogType::TABLE_MACRO_ENTRY) {
auto &macro_func = func_catalog.Cast<TableMacroCatalogEntry>();
Expand Down
Loading

0 comments on commit 3ca8544

Please sign in to comment.