Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 1414: Adds time_bucket() support for duckdb fdw #32

Merged
merged 38 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0dc02f4
feat: 1414: Adds time_bucket() support for duckdb fdw
devanbenz Aug 3, 2024
61a482b
feat: 1414: Adds time_bucket() support for duckdb fdw
devanbenz Aug 3, 2024
9813b8a
fmt
devanbenz Aug 3, 2024
c4cccf5
clippy linting
devanbenz Aug 3, 2024
9af31d5
adds testing
devanbenz Aug 3, 2024
5ac3e63
adds test for postgres fallback
devanbenz Aug 3, 2024
758dd53
clippy
devanbenz Aug 3, 2024
fe2ec75
fmt
devanbenz Aug 3, 2024
bc8852b
remove unneeded code
devanbenz Aug 3, 2024
8d0b7d5
fmt
devanbenz Aug 3, 2024
606a22c
adding native PG UDF functionality to time_bucket()
devanbenz Aug 5, 2024
f8bb86f
todo
devanbenz Aug 5, 2024
599f83c
fmt
devanbenz Aug 5, 2024
0d552fb
clippy
devanbenz Aug 5, 2024
22a26ae
refactoring calculations for time_bucket() I found edge cases
devanbenz Aug 7, 2024
42b4b78
Adding origin parameter
devanbenz Aug 7, 2024
8a29b62
Adding timestamp functions
devanbenz Aug 7, 2024
90ccb92
fmt
devanbenz Aug 7, 2024
cf155d8
(wip): working on offset
devanbenz Aug 7, 2024
7b7073f
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 8, 2024
7cdd2f6
revert to only push down time_bucket() to FDW duckdb relations
devanbenz Aug 9, 2024
ab11f71
Merge branch 'feat/time-bucket' of github.com:devanbenz/pg_analytics …
devanbenz Aug 9, 2024
67862f6
fmt
devanbenz Aug 9, 2024
5fed985
add errors for methods
devanbenz Aug 9, 2024
b1ce9a9
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 9, 2024
70e9865
typo
devanbenz Aug 9, 2024
686bbd7
Merge branch 'feat/time-bucket' of github.com:devanbenz/pg_analytics …
devanbenz Aug 9, 2024
bdc0cad
move folder and make error const
devanbenz Aug 11, 2024
a6457fc
clippy
devanbenz Aug 11, 2024
ff8e700
clippy
devanbenz Aug 11, 2024
0d7346c
add more tests for expected data
devanbenz Aug 11, 2024
a0910b0
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 11, 2024
02e8c84
fmt tests file
devanbenz Aug 11, 2024
8e61a4e
Merge branch 'feat/time-bucket' of github.com:devanbenz/pg_analytics …
devanbenz Aug 11, 2024
5aab710
rm dead code warning -- should be a fixture for rstest
devanbenz Aug 11, 2024
b0f6477
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 12, 2024
0507691
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 13, 2024
87e1358
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# VS Code
.vscode/

# JetBrains
.idea/

# macOS
.DS_Store

Expand Down Expand Up @@ -31,4 +34,4 @@ TPC-H_V*/
# Tests
regression.*
results/
.env
.env
1 change: 1 addition & 0 deletions src/duckdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ pub mod delta;
pub mod iceberg;
pub mod parquet;
pub mod secret;
pub mod time_bucket;
pub mod utils;
161 changes: 161 additions & 0 deletions src/duckdb/time_bucket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) 2023-2024 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use pgrx::iter::TableIterator;
use pgrx::*;
devanbenz marked this conversation as resolved.
Show resolved Hide resolved

// Microsecond conversion factors used when bucketing by sub-day widths.
// NOTE(review): the names read inverted -- the values are the number of
// microseconds in one minute and in one hour respectively.
const MINUTES_PER_MICRO: i64 = 60 * 1_000_000;
const HOURS_PER_MICRO: i64 = 60 * MINUTES_PER_MICRO;

/// Builds a `Date` from calendar fields by first constructing a midnight
/// `Timestamp` and converting it.
///
/// # Panics
/// Panics with the timestamp constructor's error when it rejects the fields.
fn set_date(year: i32, month: u8, day: u8) -> Date {
    match Timestamp::new(year, month, day, 0, 0, 0f64) {
        Ok(midnight) => Date::from(midnight),
        Err(error) => panic!("There was an error in date creation: {}", error),
    }
}

/// Builds a `Timestamp` from calendar fields with the seconds fixed at zero.
///
/// # Panics
/// Panics with the timestamp constructor's error when it rejects the fields.
fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp {
    match Timestamp::new(year, month, day, hour, minute, 0f64) {
        Ok(timestamp) => timestamp,
        Err(error) => panic!("There was an error in timestamp creation: {}", error),
    }
}

/// Returns how far `input` (an hour or minute field) lies past the start of
/// its bucket when the bucket width is `micros` microseconds and one unit of
/// `input` lasts `divisor` microseconds.
///
/// The width is first converted to whole units (`micros / divisor`). When that
/// is not a positive number -- the width is shorter than one unit, is
/// negative, or `divisor` is zero -- no adjustment is made and `0` is
/// returned. Otherwise the result is `input` modulo the width in units.
fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 {
    // `checked_div` turns a zero divisor (or overflow) into `None` instead of a panic.
    let units = match micros.checked_div(divisor) {
        Some(units) if units > 0 => units,
        _ => return 0,
    };
    // Do the modulo in i64: narrowing the width to u8 first would silently wrap
    // widths of 256 units or more (e.g. 300 minutes would become 44).
    // The remainder is at most `input`, so casting back to u8 is lossless.
    (i64::from(input) % units) as u8
}

// Truncates `input` to the start of the bucket of width `bucket_width` that
// contains it.
//
// * A width with a month component (this includes years) is bucketed on a
//   month grid anchored at 2000-01-01; the result is the first day of the
//   bucket's first month.
// * Otherwise a width with a day component is bucketed on a day grid anchored
//   at 2000-01-03.
// * Any other width (zero, or sub-day only) returns the input date unchanged.
//
// The anchors follow the grid origins documented for DuckDB's time_bucket().
// As before, only the highest-order non-zero component of the interval is
// used and the sign of the width is ignored.
//
// Panics (via `set_date`) if the computed date cannot be represented.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_no_offset(bucket_width: Interval, input: Date) -> Date {
    // Days since 1970-01-01 for a proleptic Gregorian calendar date.
    fn days_from_civil(year: i64, month: i64, day: i64) -> i64 {
        // Treat March as the first month so the leap day falls at year end.
        let shifted_year = if month <= 2 { year - 1 } else { year };
        let shifted_month = if month > 2 { month - 3 } else { month + 9 };
        let era = shifted_year.div_euclid(400);
        let year_of_era = shifted_year.rem_euclid(400);
        let day_of_year = (153 * shifted_month + 2) / 5 + day - 1;
        let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
        era * 146_097 + day_of_era - 719_468
    }

    // Inverse of `days_from_civil`: (year, month, day) for a day number.
    fn civil_from_days(days: i64) -> (i64, i64, i64) {
        let shifted = days + 719_468;
        let era = shifted.div_euclid(146_097);
        let day_of_era = shifted.rem_euclid(146_097);
        let year_of_era =
            (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
        let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
        let shifted_month = (5 * day_of_year + 2) / 153;
        let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
        let month = if shifted_month < 10 {
            shifted_month + 3
        } else {
            shifted_month - 9
        };
        let year = year_of_era + era * 400 + if month <= 2 { 1 } else { 0 };
        (year, month, day)
    }

    let year = i64::from(input.year());
    let month = i64::from(input.month());
    let day = i64::from(input.day());

    let width_months = i64::from(bucket_width.months()).abs();
    if width_months != 0 {
        // Zero-based month count since 2000-01, so the modulo can never yield
        // "month 0" the way a modulo on the 1-based month number did.
        let month_index = (year - 2000) * 12 + (month - 1);
        let bucket_start = month_index - month_index.rem_euclid(width_months);
        // Both casts are lossless: the year stays within i32 range for any
        // i32 width, and the month is in 1..=12.
        return set_date(
            (2000 + bucket_start.div_euclid(12)) as i32,
            (bucket_start.rem_euclid(12) + 1) as u8,
            1,
        );
    }

    let width_days = i64::from(bucket_width.days()).abs();
    if width_days != 0 {
        let origin = days_from_civil(2000, 1, 3);
        let day_number = days_from_civil(year, month, day);
        let bucket_start = day_number - (day_number - origin).rem_euclid(width_days);
        let (start_year, start_month, start_day) = civil_from_days(bucket_start);
        // Month and day are in calendar range; the year is within a few
        // million years of the input, well inside i32.
        return set_date(start_year as i32, start_month as u8, start_day as u8);
    }

    set_date(input.year(), input.month(), input.day())
}

// TODO: implement the `offset` argument for the native Postgres time_bucket().
devanbenz marked this conversation as resolved.
Show resolved Hide resolved
// Placeholder for the (bucket_width, date, date offset) overload. All three
// arguments are ignored: the single returned row is whatever parsing an empty
// string as a `Date` yields, and a failed parse panics with the message below.
// NOTE(review): parsing "" presumably always fails -- confirm.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_date(
    _bucket_width: Interval,
    _input: Date,
    _offset: Date,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
    let placeholder: Date = match "".parse() {
        Ok(value) => value,
        Err(err) => panic!("There was an error while parsing time_bucket(): {}", err),
    };
    TableIterator::once((placeholder,))
}

// TODO: implement the `offset` argument for the native Postgres time_bucket().
devanbenz marked this conversation as resolved.
Show resolved Hide resolved
// Placeholder for the (bucket_width, date, interval offset) overload. All
// three arguments are ignored: the single returned row is whatever parsing an
// empty string as a `Date` yields, and a failed parse panics with the message
// below.
// NOTE(review): parsing "" presumably always fails -- confirm.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_interval(
    _bucket_width: Interval,
    _input: Date,
    _offset: Interval,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
    let placeholder: Date = match "".parse() {
        Ok(value) => value,
        Err(err) => panic!("There was an error while parsing time_bucket(): {}", err),
    };
    TableIterator::once((placeholder,))
}

// Truncates `input` to the start of the bucket of width `bucket_width` that
// contains it.
//
// * A width with a month component (this includes years) is bucketed on a
//   month grid anchored at 2000-01-01; the result is midnight on the first day
//   of the bucket's first month.
// * Otherwise a width with a day component is bucketed on a day grid anchored
//   at 2000-01-03; the result is midnight on the bucket's first day.
// * Otherwise a width with a sub-day component rounds the hour and minute
//   fields down independently via `get_micros_delta`.
//   NOTE(review): that is only exact for widths that divide an hour or a day
//   evenly; sub-minute widths are not bucketed at all.
// * A zero width returns the input truncated to the minute.
//
// The anchors follow the grid origins documented for DuckDB's time_bucket().
// As before, only the highest-order non-zero component of the interval is
// used, the sign of month/day widths is ignored, and seconds are always
// dropped from the result.
//
// Panics (via `set_timestamp`) if the computed timestamp cannot be represented.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timestamp {
    // Days since 1970-01-01 for a proleptic Gregorian calendar date.
    fn days_from_civil(year: i64, month: i64, day: i64) -> i64 {
        // Treat March as the first month so the leap day falls at year end.
        let shifted_year = if month <= 2 { year - 1 } else { year };
        let shifted_month = if month > 2 { month - 3 } else { month + 9 };
        let era = shifted_year.div_euclid(400);
        let year_of_era = shifted_year.rem_euclid(400);
        let day_of_year = (153 * shifted_month + 2) / 5 + day - 1;
        let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
        era * 146_097 + day_of_era - 719_468
    }

    // Inverse of `days_from_civil`: (year, month, day) for a day number.
    fn civil_from_days(days: i64) -> (i64, i64, i64) {
        let shifted = days + 719_468;
        let era = shifted.div_euclid(146_097);
        let day_of_era = shifted.rem_euclid(146_097);
        let year_of_era =
            (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
        let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
        let shifted_month = (5 * day_of_year + 2) / 153;
        let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
        let month = if shifted_month < 10 {
            shifted_month + 3
        } else {
            shifted_month - 9
        };
        let year = year_of_era + era * 400 + if month <= 2 { 1 } else { 0 };
        (year, month, day)
    }

    let year = i64::from(input.year());
    let month = i64::from(input.month());
    let day = i64::from(input.day());

    let width_months = i64::from(bucket_width.months()).abs();
    if width_months != 0 {
        // Zero-based month count since 2000-01, so the modulo can never yield
        // "month 0" the way a modulo on the 1-based month number did.
        let month_index = (year - 2000) * 12 + (month - 1);
        let bucket_start = month_index - month_index.rem_euclid(width_months);
        // Both casts are lossless: the year stays within i32 range for any
        // i32 width, and the month is in 1..=12.
        return set_timestamp(
            (2000 + bucket_start.div_euclid(12)) as i32,
            (bucket_start.rem_euclid(12) + 1) as u8,
            1,
            0,
            0,
        );
    }

    let width_days = i64::from(bucket_width.days()).abs();
    if width_days != 0 {
        let origin = days_from_civil(2000, 1, 3);
        let day_number = days_from_civil(year, month, day);
        let bucket_start = day_number - (day_number - origin).rem_euclid(width_days);
        let (start_year, start_month, start_day) = civil_from_days(bucket_start);
        // Month and day are in calendar range; the year is within a few
        // million years of the input, well inside i32.
        return set_timestamp(start_year as i32, start_month as u8, start_day as u8, 0, 0);
    }

    if bucket_width.micros() != 0 {
        let hours_delta = get_micros_delta(bucket_width.micros(), input.hour(), HOURS_PER_MICRO);
        let minutes_delta =
            get_micros_delta(bucket_width.micros(), input.minute(), MINUTES_PER_MICRO);
        // Each delta is a remainder of its own field, so neither subtraction
        // can underflow.
        return set_timestamp(
            input.year(),
            input.month(),
            input.day(),
            input.hour() - hours_delta,
            input.minute() - minutes_delta,
        );
    }

    set_timestamp(
        input.year(),
        input.month(),
        input.day(),
        input.hour(),
        input.minute(),
    )
}

// TODO: implement the `offset` argument for the native Postgres time_bucket().
// Placeholder for the (bucket_width, timestamp, date offset) overload. All
// three arguments are ignored: the single returned row is whatever parsing an
// empty string as a `Timestamp` yields, and a failed parse panics with the
// message below.
// NOTE(review): parsing "" presumably always fails -- confirm.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_date(
    _bucket_width: Interval,
    _input: Timestamp,
    _offset: Date,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
    let placeholder: Timestamp = match "".parse() {
        Ok(value) => value,
        Err(err) => panic!("There was an error while parsing time_bucket(): {}", err),
    };
    TableIterator::once((placeholder,))
}

// TODO: implement the `offset` argument for the native Postgres time_bucket().
// Placeholder for the (bucket_width, timestamp, interval offset) overload. All
// three arguments are ignored: the single returned row is whatever parsing an
// empty string as a `Timestamp` yields, and a failed parse panics with the
// message below.
// NOTE(review): parsing "" presumably always fails -- confirm.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_interval(
    _bucket_width: Interval,
    _input: Timestamp,
    _offset: Interval,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
    let placeholder: Timestamp = match "".parse() {
        Ok(value) => value,
        Err(err) => panic!("There was an error while parsing time_bucket(): {}", err),
    };
    TableIterator::once((placeholder,))
}
64 changes: 58 additions & 6 deletions tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::{
fs::{self, File},
io::Read,
path::{Path, PathBuf},
};

use anyhow::Result;
use async_std::task::block_on;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::primitives::ByteStream;
use chrono::{DateTime, Duration};
use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit::Millisecond;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::{
arrow::{datatypes::FieldRef, record_batch::RecordBatch},
parquet::arrow::ArrowWriter,
Expand All @@ -35,6 +33,12 @@ use serde::Serialize;
use serde_arrow::schema::{SchemaLike, TracingOptions};
use shared::fixtures::tempfile::TempDir;
use sqlx::PgConnection;
use std::sync::Arc;
use std::{
fs::{self, File},
io::Read,
path::{Path, PathBuf},
};
use testcontainers::ContainerAsync;
use testcontainers_modules::{
localstack::LocalStack,
Expand Down Expand Up @@ -223,3 +227,51 @@ pub fn tempfile() -> std::fs::File {
pub fn duckdb_conn() -> duckdb::Connection {
    // Opens an in-memory DuckDB connection; a failure to open panics.
    let connection = duckdb::Connection::open_in_memory();
    connection.unwrap()
}

#[allow(dead_code)]
devanbenz marked this conversation as resolved.
Show resolved Hide resolved
/// Builds a ten-row record batch with a non-nullable `value` (Int32) column
/// and a non-nullable `timestamp` (millisecond precision, no time zone)
/// column whose rows are one minute apart, starting 60 seconds after the
/// Unix epoch.
///
/// # Errors
/// Returns the error from `RecordBatch::try_new` if the batch is rejected.
pub fn time_series_record_batch_minutes() -> Result<RecordBatch> {
    let first_tick = DateTime::from_timestamp(60, 0).unwrap();
    let mut timestamps: Vec<i64> = Vec::with_capacity(10);
    for step in 0..10 {
        timestamps.push((first_tick + Duration::minutes(step)).timestamp_millis());
    }

    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int32, false),
        Field::new("timestamp", DataType::Timestamp(Millisecond, None), false),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8])),
            Arc::new(TimestampMillisecondArray::from(timestamps)),
        ],
    )?;
    Ok(batch)
}

/// Builds a ten-row record batch with a non-nullable `value` (Int32) column
/// and a non-nullable `timestamp` (millisecond precision, no time zone)
/// column whose rows are 366 days apart, starting 60 seconds after the Unix
/// epoch.
///
/// # Errors
/// Returns the error from `RecordBatch::try_new` if the batch is rejected.
#[allow(dead_code)]
pub fn time_series_record_batch_years() -> Result<RecordBatch> {
    let first_tick = DateTime::from_timestamp(60, 0).unwrap();
    let mut timestamps: Vec<i64> = Vec::with_capacity(10);
    for step in 0..10 {
        timestamps.push((first_tick + Duration::days(step * 366)).timestamp_millis());
    }

    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int32, false),
        Field::new("timestamp", DataType::Timestamp(Millisecond, None), false),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8])),
            Arc::new(TimestampMillisecondArray::from(timestamps)),
        ],
    )?;
    Ok(batch)
}
Loading