Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 1414: Adds time_bucket() support for duckdb fdw #32

Merged
merged 38 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0dc02f4
feat: 1414: Adds time_bucket() support for duckdb fdw
devanbenz Aug 3, 2024
61a482b
feat: 1414: Adds time_bucket() support for duckdb fdw
devanbenz Aug 3, 2024
9813b8a
fmt
devanbenz Aug 3, 2024
c4cccf5
clippy linting
devanbenz Aug 3, 2024
9af31d5
adds testing
devanbenz Aug 3, 2024
5ac3e63
adds test for postgres fallback
devanbenz Aug 3, 2024
758dd53
clippy
devanbenz Aug 3, 2024
fe2ec75
fmt
devanbenz Aug 3, 2024
bc8852b
remove unneeded code
devanbenz Aug 3, 2024
8d0b7d5
fmt
devanbenz Aug 3, 2024
606a22c
adding native PG UDF functionality to time_bucket()
devanbenz Aug 5, 2024
f8bb86f
todo
devanbenz Aug 5, 2024
599f83c
fmt
devanbenz Aug 5, 2024
0d552fb
clippy
devanbenz Aug 5, 2024
22a26ae
refactoring calculations for time_bucket() I found edge cases
devanbenz Aug 7, 2024
42b4b78
Adding origin parameter
devanbenz Aug 7, 2024
8a29b62
Adding timestamp functions
devanbenz Aug 7, 2024
90ccb92
fmt
devanbenz Aug 7, 2024
cf155d8
(wip): working on offset
devanbenz Aug 7, 2024
7b7073f
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 8, 2024
7cdd2f6
revert to only push down time_bucket() to FDW duckdb relations
devanbenz Aug 9, 2024
ab11f71
Merge branch 'feat/time-bucket' of github.com:devanbenz/pg_analytics …
devanbenz Aug 9, 2024
67862f6
fmt
devanbenz Aug 9, 2024
5fed985
add errors for methods
devanbenz Aug 9, 2024
b1ce9a9
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 9, 2024
70e9865
typo
devanbenz Aug 9, 2024
686bbd7
Merge branch 'feat/time-bucket' of github.com:devanbenz/pg_analytics …
devanbenz Aug 9, 2024
bdc0cad
move folder and make error const
devanbenz Aug 11, 2024
a6457fc
clippy
devanbenz Aug 11, 2024
ff8e700
clippy
devanbenz Aug 11, 2024
0d7346c
add more tests for expected data
devanbenz Aug 11, 2024
a0910b0
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 11, 2024
02e8c84
fmt tests file
devanbenz Aug 11, 2024
8e61a4e
Merge branch 'feat/time-bucket' of github.com:devanbenz/pg_analytics …
devanbenz Aug 11, 2024
5aab710
rm dead code warning -- should be a fixture for rstest
devanbenz Aug 11, 2024
b0f6477
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 12, 2024
0507691
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 13, 2024
87e1358
Merge branch 'dev' into feat/time-bucket
devanbenz Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# VS Code
.vscode/

# Jetbrains
.idea/

# macOS
.DS_Store

Expand Down Expand Up @@ -31,4 +34,4 @@ TPC-H_V*/
# Tests
regression.*
results/
.env
.env
1 change: 1 addition & 0 deletions src/duckdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ pub mod iceberg;
pub mod parquet;
pub mod secret;
pub mod time_bucket;
pub mod utils;
162 changes: 162 additions & 0 deletions src/duckdb/time_bucket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright (c) 2023-2024 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use pgrx::iter::TableIterator;
use pgrx::*;
devanbenz marked this conversation as resolved.
Show resolved Hide resolved
use std::fmt::{Display, Formatter};

/// The value being bucketed by `time_bucket()`: either a SQL `DATE` or a
/// SQL `TIMESTAMP`. Rendered into the pushed-down DuckDB query via `Display`.
pub enum TimeBucketInput {
    Date(Date),
    Timestamp(Timestamp),
}

/// Optional third argument to `time_bucket()` that shifts bucket boundaries:
/// either an `INTERVAL` offset or a `DATE` origin. Rendered via `Display`.
pub enum TimeBucketOffset {
    Interval(Interval),
    Date(Date),
}

impl Display for TimeBucketInput {
    /// Renders the input as a DuckDB SQL expression fragment
    /// (`<value>::DATE` for dates, the bare value for timestamps).
    ///
    /// Fix: pass the value directly to `write!` instead of calling
    /// `.to_string()` first — the extra call allocates an intermediate
    /// `String` for no benefit (clippy: `to_string_in_format_args`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            // NOTE(review): the literal is interpolated unquoted, e.g.
            // `2024-08-01::DATE`; DuckDB normally expects a quoted literal
            // ('2024-08-01'::DATE) — confirm against the generated query text.
            TimeBucketInput::Date(input) => write!(f, "{}::DATE", input),
            TimeBucketInput::Timestamp(input) => write!(f, "{}", input),
        }
    }
}

impl Display for TimeBucketOffset {
    /// Renders the offset as a DuckDB SQL expression fragment
    /// (`DATE <value>` or `INTERVAL <value>`).
    ///
    /// Fix: pass the value directly to `write!` instead of calling
    /// `.to_string()` first — the extra call allocates an intermediate
    /// `String` for no benefit (clippy: `to_string_in_format_args`).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            // NOTE(review): DuckDB literals usually need quoting
            // (e.g. INTERVAL '1 day', DATE '2024-08-01') — verify the
            // unquoted form parses in the pushed-down query.
            TimeBucketOffset::Date(input) => write!(f, "DATE {}", input),
            TimeBucketOffset::Interval(input) => write!(f, "INTERVAL {}", input),
        }
    }
}

/// Builds the SQL text for a DuckDB `time_bucket()` call with the given
/// bucket width, input value, and optional offset/origin argument.
fn create_time_bucket(
    bucket_width: Interval,
    input: TimeBucketInput,
    offset: Option<TimeBucketOffset>,
) -> String {
    match offset {
        // Three-argument form: width, input, and an offset/origin.
        Some(bucket_offset) => format!(
            "SELECT time_bucket(INTERVAL {}, {}, {});",
            bucket_width, input, bucket_offset
        ),
        // Two-argument form: width and input only.
        None => format!("SELECT time_bucket(INTERVAL {}, {});", bucket_width, input),
    }
}

/// `time_bucket(INTERVAL, DATE)` overload with no offset argument.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_no_offset(
    bucket_width: Interval,
    input: Date,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
    let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Date(input), None);

    // NOTE(review): this parses the generated SQL *text* as a Date; it can
    // only succeed if this body is never executed natively (the executor hook
    // rejects non-DuckDB time_bucket() queries) — confirm.
    let bucket = bucket_query
        .parse()
        .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err));
    TableIterator::once((bucket,))
}

/// `time_bucket(INTERVAL, DATE, DATE)` overload with a DATE origin.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_date(
    bucket_width: Interval,
    input: Date,
    offset: Date,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
    let bucket_query = create_time_bucket(
        bucket_width,
        TimeBucketInput::Date(input),
        Some(TimeBucketOffset::Date(offset)),
    );

    // NOTE(review): parses the generated SQL text as a Date; assumed never to
    // run natively because the executor hook rejects such queries — confirm.
    let bucket = bucket_query
        .parse()
        .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err));
    TableIterator::once((bucket,))
}

/// `time_bucket(INTERVAL, DATE, INTERVAL)` overload with an INTERVAL offset.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_interval(
    bucket_width: Interval,
    input: Date,
    offset: Interval,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
    let bucket_query = create_time_bucket(
        bucket_width,
        TimeBucketInput::Date(input),
        Some(TimeBucketOffset::Interval(offset)),
    );

    // NOTE(review): parses the generated SQL text as a Date; assumed never to
    // run natively because the executor hook rejects such queries — confirm.
    let bucket = bucket_query
        .parse()
        .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err));
    TableIterator::once((bucket,))
}

/// `time_bucket(INTERVAL, TIMESTAMP)` overload with no offset argument.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp(
    bucket_width: Interval,
    input: Timestamp,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
    let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Timestamp(input), None);

    // NOTE(review): parses the generated SQL text as a Timestamp; assumed
    // never to run natively because the executor hook rejects such queries.
    let bucket = bucket_query
        .parse()
        .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err));
    TableIterator::once((bucket,))
}

/// `time_bucket(INTERVAL, TIMESTAMP, DATE)` overload with a DATE origin.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_date(
    bucket_width: Interval,
    input: Timestamp,
    offset: Date,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
    let bucket_query = create_time_bucket(
        bucket_width,
        TimeBucketInput::Timestamp(input),
        Some(TimeBucketOffset::Date(offset)),
    );

    // NOTE(review): parses the generated SQL text as a Timestamp; assumed
    // never to run natively because the executor hook rejects such queries.
    let bucket = bucket_query
        .parse()
        .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err));
    TableIterator::once((bucket,))
}

/// `time_bucket(INTERVAL, TIMESTAMP, INTERVAL)` overload with an INTERVAL offset.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_interval(
    bucket_width: Interval,
    input: Timestamp,
    offset: Interval,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
    let bucket_query = create_time_bucket(
        bucket_width,
        TimeBucketInput::Timestamp(input),
        Some(TimeBucketOffset::Interval(offset)),
    );

    // NOTE(review): parses the generated SQL text as a Timestamp; assumed
    // never to run natively because the executor hook rejects such queries.
    let bucket = bucket_query
        .parse()
        .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err));
    TableIterator::once((bucket,))
}
5 changes: 5 additions & 0 deletions src/hooks/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ pub async fn executor_run(
}
});

if !is_duckdb_query && query.contains("time_bucket") {
devanbenz marked this conversation as resolved.
Show resolved Hide resolved
panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.\
If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.")
}

if rtable.is_null()
|| query_desc.operation != pg_sys::CmdType_CMD_SELECT
|| !is_duckdb_query
Expand Down
29 changes: 28 additions & 1 deletion tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ use std::{
io::Read,
path::{Path, PathBuf},
};

use std::sync::Arc;
use anyhow::Result;
use async_std::task::block_on;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::primitives::ByteStream;
use chrono::{DateTime, Duration};
use datafusion::{
arrow::{datatypes::FieldRef, record_batch::RecordBatch},
parquet::arrow::ArrowWriter,
};
use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::datatypes::TimeUnit::Millisecond;
use futures::future::{BoxFuture, FutureExt};
use rstest::*;
use serde::Serialize;
Expand Down Expand Up @@ -223,3 +227,26 @@ pub fn tempfile() -> std::fs::File {
/// Test fixture: opens a fresh in-memory DuckDB connection.
/// Panics if the connection cannot be created (a bug in the test setup).
pub fn duckdb_conn() -> duckdb::Connection {
    let connection = duckdb::Connection::open_in_memory();
    connection.unwrap()
}

/// Builds a ten-row in-memory time-series batch for tests: an Int32 `value`
/// column and a millisecond-precision `timestamp` column with rows spaced one
/// minute apart, starting at epoch + 60s.
pub fn time_series_record_batch() -> Result<RecordBatch> {
    // Two non-nullable columns: the measurement and its timestamp.
    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int32, false),
        Field::new("timestamp", DataType::Timestamp(Millisecond, None), false),
    ]));

    // Ten timestamps, one minute apart, beginning one minute after the epoch.
    let start_time = DateTime::from_timestamp(60, 0).unwrap();
    let timestamps: Vec<i64> = (0..10)
        .map(|minute| (start_time + Duration::minutes(minute)).timestamp_millis())
        .collect();

    let values = Int32Array::from(vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8]);
    let times = TimestampMillisecondArray::from(timestamps);

    let batch = RecordBatch::try_new(schema, vec![Arc::new(values), Arc::new(times)])?;
    Ok(batch)
}
79 changes: 79 additions & 0 deletions tests/time_bucket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2023-2024 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod fixtures;

use anyhow::Result;
use chrono::NaiveDateTime;
use datafusion::parquet::arrow::ArrowWriter;
use fixtures::*;
use rstest::*;
use shared::fixtures::arrow::{primitive_setup_fdw_local_file_listing};
use shared::fixtures::tempfile::TempDir;
use sqlx::PgConnection;
use std::fs::File;

/// End-to-end test for `time_bucket()` pushdown: writes a small time-series
/// parquet file, exposes it through the parquet FDW, then checks that
/// DATE-input and TIMESTAMP-input calls succeed (with the expected bucket
/// counts) and that a call with too few arguments fails.
#[rstest]
async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
    // Write the fixture batch out as a parquet file the FDW can read.
    let stored_batch = time_series_record_batch()?;
    let parquet_path = tempdir.path().join("test_arrow_types.parquet");
    let parquet_file = File::create(&parquet_path)?;

    let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap();
    writer.write(&stored_batch)?;
    writer.close()?;

    primitive_setup_fdw_local_file_listing(parquet_path.as_path().to_str().unwrap(), "MyTable")
        .execute(&mut conn);

    format!(
        "CREATE FOREIGN TABLE timeseries () SERVER parquet_server OPTIONS (files '{}')",
        parquet_path.to_str().unwrap()
    )
    .execute(&mut conn);

    // A well-formed DATE-input call must succeed.
    if let Err(error) = "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) {
        panic!(
            "should have successfully called time_bucket() for timeseries data: {}",
            error
        );
    }

    // A call missing the input column must be rejected.
    if "SELECT time_bucket(INTERVAL '2 DAY') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn).is_ok() {
        panic!(
            "should have failed call to time_bucket() for timeseries data with incorrect parameters"
        );
    }

    // Ten one-minute-apart rows fold into two 6-minute buckets…
    let six_minute_buckets: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '6 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
        .fetch_result(&mut conn).unwrap();
    assert_eq!(2, six_minute_buckets.len());

    // …and ten 1-minute buckets.
    let one_minute_buckets: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
        .fetch_result(&mut conn).unwrap();
    assert_eq!(10, one_minute_buckets.len());

    Ok(())
}
Loading