From 0dc02f429442a19f113841e6335e79cb13a624eb Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 08:37:36 -0500 Subject: [PATCH 01/29] feat: 1414: Adds time_bucket() support for duckdb fdw Signed-off-by: Devan --- .idea/.gitignore | 5 ++ .idea/git_toolbox_blame.xml | 6 ++ .idea/modules.xml | 8 ++ .idea/pg_analytics.iml | 12 +++ .idea/vcs.xml | 6 ++ src/duckdb/mod.rs | 1 + src/duckdb/time_bucket.rs | 162 ++++++++++++++++++++++++++++++++++++ src/hooks/executor.rs | 5 ++ tests/fixtures/mod.rs | 29 ++++++- tests/time_bucket.rs | 79 ++++++++++++++++++ 10 files changed, 312 insertions(+), 1 deletion(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/git_toolbox_blame.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/pg_analytics.iml create mode 100644 .idea/vcs.xml create mode 100644 src/duckdb/time_bucket.rs create mode 100644 tests/time_bucket.rs diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..b58b603f --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,5 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ diff --git a/.idea/git_toolbox_blame.xml b/.idea/git_toolbox_blame.xml new file mode 100644 index 00000000..7dc12496 --- /dev/null +++ b/.idea/git_toolbox_blame.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000..cada3ffe --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/pg_analytics.iml b/.idea/pg_analytics.iml new file mode 100644 index 00000000..bbe0a70f --- /dev/null +++ b/.idea/pg_analytics.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000..35eb1ddf --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/src/duckdb/mod.rs b/src/duckdb/mod.rs index 11b69efc..24ff0fcf 100644 --- a/src/duckdb/mod.rs +++ b/src/duckdb/mod.rs @@ -22,3 +22,4 @@ pub mod iceberg; pub mod parquet; pub mod secret; pub mod utils; +pub mod time_bucket; diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs new file mode 100644 index 00000000..2c3d65fd --- /dev/null +++ b/src/duckdb/time_bucket.rs @@ -0,0 +1,162 @@ +// Copyright (c) 2023-2024 Retake, Inc. +// +// This file is part of ParadeDB - Postgres for Search and Analytics +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +use pgrx::iter::TableIterator; +use pgrx::*; +use std::fmt::{Display, Formatter}; + +pub enum TimeBucketInput { + Date(Date), + Timestamp(Timestamp), +} + +pub enum TimeBucketOffset { + Interval(Interval), + Date(Date), +} + +impl Display for TimeBucketInput { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TimeBucketInput::Date(input) => { + write!(f, "{}::DATE", input.to_string()) + } + TimeBucketInput::Timestamp(input) => { + write!(f, "{}", input.to_string()) + } + } + } +} + +impl Display for TimeBucketOffset { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TimeBucketOffset::Date(input) => { + write!(f, "DATE {}", input.to_string()) + } + TimeBucketOffset::Interval(input) => { + write!(f, "INTERVAL {}", input.to_string()) + } + } + } +} + +fn create_time_bucket( + bucket_width: Interval, + input: TimeBucketInput, + offset: Option, +) -> String { + if let Some(bucket_offset) = offset { + format!( + "SELECT time_bucket(INTERVAL {}, {}, {});", + bucket_width, input, bucket_offset + ) + } else { + format!("SELECT time_bucket(INTERVAL {}, {});", bucket_width, input) + } +} + +#[pg_extern(name = "time_bucket")] +pub fn time_bucket_date_no_offset( + bucket_width: Interval, + input: Date, +) -> TableIterator<'static, (name!(time_bucket, Date),)> { + let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Date(input), None); + + TableIterator::once((bucket_query + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +} + +#[pg_extern(name = "time_bucket")] +pub fn time_bucket_date_offset_date( + bucket_width: Interval, + input: Date, + offset: Date, +) -> TableIterator<'static, (name!(time_bucket, Date),)> { + let bucket_query = create_time_bucket( + bucket_width, + TimeBucketInput::Date(input), + Some(TimeBucketOffset::Date(offset)), + ); + + TableIterator::once((bucket_query + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +} + +#[pg_extern(name = "time_bucket")] +pub fn time_bucket_date_offset_interval( + bucket_width: Interval, + input: Date, + offset: Interval, +) -> TableIterator<'static, (name!(time_bucket, Date),)> { + let bucket_query = create_time_bucket( + bucket_width, + TimeBucketInput::Date(input), + Some(TimeBucketOffset::Interval(offset)), + ); + + TableIterator::once((bucket_query + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +} + +#[pg_extern(name = "time_bucket")] +pub fn time_bucket_timestamp( + bucket_width: Interval, + input: Timestamp, +) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { + let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Timestamp(input), None); + + TableIterator::once((bucket_query + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +} + +#[pg_extern(name = "time_bucket")] +pub fn time_bucket_timestamp_offset_date( + bucket_width: Interval, + input: Timestamp, + offset: Date, +) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { + let bucket_query = create_time_bucket( + bucket_width, + TimeBucketInput::Timestamp(input), + Some(TimeBucketOffset::Date(offset)), + ); + + TableIterator::once((bucket_query + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +} + +#[pg_extern(name = "time_bucket")] +pub fn time_bucket_timestamp_offset_interval( + bucket_width: Interval, + input: Timestamp, + offset: Interval, +) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { + let bucket_query = create_time_bucket( + bucket_width, + TimeBucketInput::Timestamp(input), + Some(TimeBucketOffset::Interval(offset)), + ); + + TableIterator::once((bucket_query + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +} diff --git a/src/hooks/executor.rs b/src/hooks/executor.rs index 5ff20cb9..ac891c55 100644 --- a/src/hooks/executor.rs +++ b/src/hooks/executor.rs @@ -60,6 +60,11 @@ pub async fn executor_run( } }); + if !is_duckdb_query && query.contains("time_bucket") { + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.\ + If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.") + } + if rtable.is_null() || query_desc.operation != pg_sys::CmdType_CMD_SELECT || !is_duckdb_query diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index c3cc2b3c..acbea211 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -20,15 +20,19 @@ use std::{ io::Read, path::{Path, PathBuf}, }; - +use std::sync::Arc; use anyhow::Result; use async_std::task::block_on; use aws_config::{BehaviorVersion, Region}; use aws_sdk_s3::primitives::ByteStream; +use chrono::{DateTime, Duration}; use datafusion::{ arrow::{datatypes::FieldRef, record_batch::RecordBatch}, parquet::arrow::ArrowWriter, }; +use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::datatypes::TimeUnit::Millisecond; use futures::future::{BoxFuture, FutureExt}; use rstest::*; use serde::Serialize; @@ -223,3 +227,26 @@ pub fn tempfile() -> std::fs::File { pub fn duckdb_conn() -> duckdb::Connection { duckdb::Connection::open_in_memory().unwrap() } + +pub fn time_series_record_batch() -> Result { + // Define the fields for each datatype + let fields = vec![ + Field::new("value", DataType::Int32, false), + Field::new("timestamp", DataType::Timestamp(Millisecond, None), false), + ]; + + let schema = Arc::new(Schema::new(fields)); + + let start_time = DateTime::from_timestamp(60, 0).unwrap(); + let timestamps: Vec = (0..10) + .map(|i| (start_time + Duration::minutes(i)).timestamp_millis()) + .collect(); + + Ok(RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8])), + Arc::new(TimestampMillisecondArray::from(timestamps)), + ], + )?) +} diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs new file mode 100644 index 00000000..922f9b67 --- /dev/null +++ b/tests/time_bucket.rs @@ -0,0 +1,79 @@ +// Copyright (c) 2023-2024 Retake, Inc. +// +// This file is part of ParadeDB - Postgres for Search and Analytics +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod fixtures; + +use anyhow::Result; +use chrono::NaiveDateTime; +use datafusion::parquet::arrow::ArrowWriter; +use fixtures::*; +use rstest::*; +use shared::fixtures::arrow::{primitive_setup_fdw_local_file_listing}; +use shared::fixtures::tempfile::TempDir; +use sqlx::PgConnection; +use std::fs::File; + +#[rstest] +async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { + let stored_batch = time_series_record_batch()?; + let parquet_path = tempdir.path().join("test_arrow_types.parquet"); + let parquet_file = File::create(&parquet_path)?; + + let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap(); + writer.write(&stored_batch)?; + writer.close()?; + + primitive_setup_fdw_local_file_listing(parquet_path.as_path().to_str().unwrap(), "MyTable") + .execute(&mut conn); + + format!( + "CREATE FOREIGN TABLE timeseries () SERVER parquet_server OPTIONS (files '{}')", + parquet_path.to_str().unwrap() + ) + .execute(&mut conn); + + match "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { + Ok(_) => {} + Err(error) => { + panic!( + "should have successfully called time_bucket() for timeseries data: {}", + error + ); + } + } + + match "SELECT time_bucket(INTERVAL '2 DAY') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { + Ok(_) => { + panic!( + "should have failed call to time_bucket() for timeseries data with incorrect parameters" + ); + } + Err(_) => {} + } + + let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '6 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + .fetch_result(&mut conn).unwrap(); + + assert_eq!(2, data.len()); + + let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + .fetch_result(&mut conn).unwrap(); + + assert_eq!(10, data.len()); + + Ok(()) +} From 61a482b4f99db126f83855d1ad9d8904bf27654e Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 08:38:12 -0500 Subject: [PATCH 02/29] feat: 1414: Adds time_bucket() support for duckdb fdw Signed-off-by: Devan --- .gitignore | 5 ++++- .idea/.gitignore | 5 ----- .idea/git_toolbox_blame.xml | 6 ------ .idea/modules.xml | 8 -------- .idea/pg_analytics.iml | 12 ------------ .idea/vcs.xml | 6 ------ 6 files changed, 4 insertions(+), 38 deletions(-) delete mode 100644 .idea/.gitignore delete mode 100644 .idea/git_toolbox_blame.xml delete mode 100644 .idea/modules.xml delete mode 100644 .idea/pg_analytics.iml delete mode 100644 .idea/vcs.xml diff --git a/.gitignore b/.gitignore index c5116260..451a853a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ # VS Code .vscode/ +# Jetbrains +.idea/ + # macOS .DS_Store @@ -31,4 +34,4 @@ TPC-H_V*/ # Tests regression.* results/ -.env \ No newline at end of file +.env diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index b58b603f..00000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ diff --git a/.idea/git_toolbox_blame.xml b/.idea/git_toolbox_blame.xml deleted file mode 100644 index 7dc12496..00000000 --- a/.idea/git_toolbox_blame.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index cada3ffe..00000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/pg_analytics.iml b/.idea/pg_analytics.iml deleted file mode 100644 index bbe0a70f..00000000 --- a/.idea/pg_analytics.iml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1ddf..00000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file From 9813b8a185ef53d228d6506504b05f13d536e9e2 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 08:45:07 -0500 Subject: [PATCH 03/29] fmt Signed-off-by: Devan --- src/duckdb/mod.rs | 2 +- src/duckdb/time_bucket.rs | 24 ++++++++++++------------ tests/fixtures/mod.rs | 18 +++++++++--------- tests/time_bucket.rs | 4 ++-- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/duckdb/mod.rs b/src/duckdb/mod.rs index 24ff0fcf..b06e84d5 100644 --- a/src/duckdb/mod.rs +++ b/src/duckdb/mod.rs @@ -21,5 +21,5 @@ pub mod delta; pub mod iceberg; pub mod parquet; pub mod secret; -pub mod utils; pub mod time_bucket; +pub mod utils; diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 2c3d65fd..583189dc 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -77,8 +77,8 @@ pub fn time_bucket_date_no_offset( let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Date(input), None); TableIterator::once((bucket_query - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] @@ -94,8 +94,8 @@ pub fn time_bucket_date_offset_date( ); TableIterator::once((bucket_query - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] @@ -111,8 +111,8 @@ pub fn time_bucket_date_offset_interval( ); TableIterator::once((bucket_query - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] @@ -123,8 +123,8 @@ pub fn time_bucket_timestamp( let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Timestamp(input), None); TableIterator::once((bucket_query - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] @@ -140,8 +140,8 @@ pub fn time_bucket_timestamp_offset_date( ); TableIterator::once((bucket_query - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] @@ -157,6 +157,6 @@ pub fn time_bucket_timestamp_offset_interval( ); TableIterator::once((bucket_query - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index acbea211..c510dfad 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -15,30 +15,30 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::{ - fs::{self, File}, - io::Read, - path::{Path, PathBuf}, -}; -use std::sync::Arc; use anyhow::Result; use async_std::task::block_on; use aws_config::{BehaviorVersion, Region}; use aws_sdk_s3::primitives::ByteStream; use chrono::{DateTime, Duration}; +use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::TimeUnit::Millisecond; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::{ arrow::{datatypes::FieldRef, record_batch::RecordBatch}, parquet::arrow::ArrowWriter, }; -use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray}; -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::arrow::datatypes::TimeUnit::Millisecond; use futures::future::{BoxFuture, FutureExt}; use rstest::*; use serde::Serialize; use serde_arrow::schema::{SchemaLike, TracingOptions}; use shared::fixtures::tempfile::TempDir; use sqlx::PgConnection; +use std::sync::Arc; +use std::{ + fs::{self, File}, + io::Read, + path::{Path, PathBuf}, +}; use testcontainers::ContainerAsync; use testcontainers_modules::{ localstack::LocalStack, diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 922f9b67..bae34c4f 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -22,7 +22,7 @@ use chrono::NaiveDateTime; use datafusion::parquet::arrow::ArrowWriter; use fixtures::*; use rstest::*; -use shared::fixtures::arrow::{primitive_setup_fdw_local_file_listing}; +use shared::fixtures::arrow::primitive_setup_fdw_local_file_listing; use shared::fixtures::tempfile::TempDir; use sqlx::PgConnection; use std::fs::File; @@ -44,7 +44,7 @@ async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<() "CREATE FOREIGN TABLE timeseries () SERVER parquet_server OPTIONS (files '{}')", parquet_path.to_str().unwrap() ) - .execute(&mut conn); + .execute(&mut conn); match "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => {} From c4cccf56e592be35b67b9b901c9fb94cda55d906 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 09:13:21 -0500 Subject: [PATCH 04/29] clippy linting Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 8 ++++---- tests/fixtures/mod.rs | 1 + tests/time_bucket.rs | 8 ++++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 583189dc..072c6845 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -32,10 +32,10 @@ impl Display for TimeBucketInput { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { TimeBucketInput::Date(input) => { - write!(f, "{}::DATE", input.to_string()) + write!(f, "{}::DATE", input) } TimeBucketInput::Timestamp(input) => { - write!(f, "{}", input.to_string()) + write!(f, "{}", input) } } } @@ -45,10 +45,10 @@ impl Display for TimeBucketOffset { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { TimeBucketOffset::Date(input) => { - write!(f, "DATE {}", input.to_string()) + write!(f, "DATE {}", input) } TimeBucketOffset::Interval(input) => { - write!(f, "INTERVAL {}", input.to_string()) + write!(f, "INTERVAL {}", input) } } } diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index c510dfad..f93bb321 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -228,6 +228,7 @@ pub fn duckdb_conn() -> duckdb::Connection { duckdb::Connection::open_in_memory().unwrap() } +#[allow(dead_code)] pub fn time_series_record_batch() -> Result { // Define the fields for each datatype let fields = vec![ diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index bae34c4f..0155edd2 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -26,6 +26,7 @@ use shared::fixtures::arrow::primitive_setup_fdw_local_file_listing; use shared::fixtures::tempfile::TempDir; use sqlx::PgConnection; use std::fs::File; +use time::Date; #[rstest] async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { @@ -46,6 +47,7 @@ async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<() ) .execute(&mut conn); + #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => {} Err(error) => { @@ -56,6 +58,7 @@ async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<() } } + #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => { panic!( @@ -75,5 +78,10 @@ async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<() assert_eq!(10, data.len()); + let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '1 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + .fetch_result(&mut conn).unwrap(); + + assert_eq!(1, data.len()); + Ok(()) } From 9af31d5ba8552fa3056b289097ef07e7f1e64d85 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 10:06:28 -0500 Subject: [PATCH 05/29] adds testing Signed-off-by: Devan --- tests/fixtures/mod.rs | 26 ++++++++++++++++- tests/time_bucket.rs | 66 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index f93bb321..5bd4a2ba 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -229,7 +229,7 @@ pub fn duckdb_conn() -> duckdb::Connection { } #[allow(dead_code)] -pub fn time_series_record_batch() -> Result { +pub fn time_series_record_batch_minutes() -> Result { // Define the fields for each datatype let fields = vec![ Field::new("value", DataType::Int32, false), @@ -251,3 +251,27 @@ pub fn time_series_record_batch() -> Result { ], )?) } + +#[allow(dead_code)] +pub fn time_series_record_batch_years() -> Result { + // Define the fields for each datatype + let fields = vec![ + Field::new("value", DataType::Int32, false), + Field::new("timestamp", DataType::Timestamp(Millisecond, None), false), + ]; + + let schema = Arc::new(Schema::new(fields)); + + let start_time = DateTime::from_timestamp(60, 0).unwrap(); + let timestamps: Vec = (0..10) + .map(|i| (start_time + Duration::days(i * 366)).timestamp_millis()) + .collect(); + + Ok(RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8])), + Arc::new(TimestampMillisecondArray::from(timestamps)), + ], + )?) +} diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 0155edd2..397ef163 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -29,8 +29,8 @@ use std::fs::File; use time::Date; #[rstest] -async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { - let stored_batch = time_series_record_batch()?; +async fn test_time_bucket_minutes(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { + let stored_batch = time_series_record_batch_minutes()?; let parquet_path = tempdir.path().join("test_arrow_types.parquet"); let parquet_file = File::create(&parquet_path)?; @@ -78,10 +78,70 @@ async fn test_time_bucket(mut conn: PgConnection, tempdir: TempDir) -> Result<() assert_eq!(10, data.len()); + + let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP, INTERVAL '5 MINUTE') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + .fetch_result(&mut conn).unwrap(); + + assert_eq!(10, data.len()); + + Ok(()) +} + + +#[rstest] +async fn test_time_bucket_years(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { + let stored_batch = time_series_record_batch_years()?; + let parquet_path = tempdir.path().join("test_arrow_types.parquet"); + let parquet_file = File::create(&parquet_path)?; + + let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap(); + writer.write(&stored_batch)?; + writer.close()?; + + primitive_setup_fdw_local_file_listing(parquet_path.as_path().to_str().unwrap(), "MyTable") + .execute(&mut conn); + + format!( + "CREATE FOREIGN TABLE timeseries () SERVER parquet_server OPTIONS (files '{}')", + parquet_path.to_str().unwrap() + ) + .execute(&mut conn); + + #[allow(clippy::single_match)] + match "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { + Ok(_) => {} + Err(error) => { + panic!( + "should have successfully called time_bucket() for timeseries data: {}", + error + ); + } + } + + #[allow(clippy::single_match)] + match "SELECT time_bucket(INTERVAL '2 DAY') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { + Ok(_) => { + panic!( + "should have failed call to time_bucket() for timeseries data with incorrect parameters" + ); + } + Err(_) => {} + } + let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '1 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); - assert_eq!(1, data.len()); + assert_eq!(10, data.len()); + + + let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '5 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + .fetch_result(&mut conn).unwrap(); + + assert_eq!(2, data.len()); + + let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '2 YEAR', timestamp::DATE, DATE '1980-01-01') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + .fetch_result(&mut conn).unwrap(); + assert_eq!(5, data.len()); Ok(()) } From 5ac3e6319744b5b170b284feaea1f1477c3421b4 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 10:18:44 -0500 Subject: [PATCH 06/29] adds test for postgres fallback Signed-off-by: Devan --- tests/time_bucket.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 397ef163..4d3660ac 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -145,3 +145,23 @@ async fn test_time_bucket_years(mut conn: PgConnection, tempdir: TempDir) -> Res assert_eq!(5, data.len()); Ok(()) } + +#[rstest] +async fn test_time_bucket_fallback(mut conn: PgConnection) -> Result<()> { + let error_message = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; + let trips_table = NycTripsTable::setup(); + trips_table.execute(&mut conn); + + #[allow(clippy::single_match)] + match "SELECT time_bucket(INTERVAL '2 DAY', tpep_pickup_datetime::DATE) AS bucket, AVG(trip_distance) as avg_value FROM nyc_trips GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { + Ok(_) => { + panic!("Should have error'ed when calling time_bucket() on non-FDW data.") + } + Err(error) => { + let a = error.to_string().contains(error_message); + assert_eq!(true, a); + } + } + + Ok(()) +} From 758dd535d61853d77a132d4f842105cfa28cdd15 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 10:22:14 -0500 Subject: [PATCH 07/29] clippy Signed-off-by: Devan --- tests/time_bucket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 4d3660ac..760b68ba 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -159,7 +159,7 @@ async fn test_time_bucket_fallback(mut conn: PgConnection) -> Result<()> { } Err(error) => { let a = error.to_string().contains(error_message); - assert_eq!(true, a); + assert!(a); } } From fe2ec75fe45b8a2ab0235fe631a375e22489b5af Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 10:26:49 -0500 Subject: [PATCH 08/29] fmt Signed-off-by: Devan --- tests/time_bucket.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 760b68ba..ad0b5f62 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -78,7 +78,6 @@ async fn test_time_bucket_minutes(mut conn: PgConnection, tempdir: TempDir) -> R assert_eq!(10, data.len()); - let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP, INTERVAL '5 MINUTE') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); @@ -87,7 +86,6 @@ async fn test_time_bucket_minutes(mut conn: PgConnection, tempdir: TempDir) -> R Ok(()) } - #[rstest] async fn test_time_bucket_years(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { let stored_batch = time_series_record_batch_years()?; @@ -105,7 +103,7 @@ async fn test_time_bucket_years(mut conn: PgConnection, tempdir: TempDir) -> Res "CREATE FOREIGN TABLE timeseries () SERVER parquet_server OPTIONS (files '{}')", parquet_path.to_str().unwrap() ) - .execute(&mut conn); + .execute(&mut conn); #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { @@ -133,7 +131,6 @@ async fn test_time_bucket_years(mut conn: PgConnection, tempdir: TempDir) -> Res assert_eq!(10, data.len()); - let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '5 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); From bc8852b9f5d3ead5961080d8f36ff24b7c141e76 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 15:24:26 -0500 Subject: [PATCH 09/29] remove unneeded code Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 127 +++++++------------------------------- 1 file changed, 24 insertions(+), 103 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 072c6845..f587d2f8 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -14,149 +14,70 @@ // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use pgrx::iter::TableIterator; -use pgrx::*; -use std::fmt::{Display, Formatter}; - -pub enum TimeBucketInput { - Date(Date), - Timestamp(Timestamp), -} - -pub enum TimeBucketOffset { - Interval(Interval), - Date(Date), -} - -impl Display for TimeBucketInput { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - TimeBucketInput::Date(input) => { - write!(f, "{}::DATE", input) - } - TimeBucketInput::Timestamp(input) => { - write!(f, "{}", input) - } - } - } -} - -impl Display for TimeBucketOffset { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - TimeBucketOffset::Date(input) => { - write!(f, "DATE {}", input) - } - TimeBucketOffset::Interval(input) => { - write!(f, "INTERVAL {}", input) - } - } - } -} -fn create_time_bucket( - bucket_width: Interval, - input: TimeBucketInput, - offset: Option, -) -> String { - if let Some(bucket_offset) = offset { - format!( - "SELECT time_bucket(INTERVAL {}, {}, {});", - bucket_width, input, bucket_offset - ) - } else { - format!("SELECT time_bucket(INTERVAL {}, {});", bucket_width, input) - } -} +use pgrx::*; +use pgrx::iter::TableIterator; #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_no_offset( - bucket_width: Interval, - input: Date, + _bucket_width: Interval, + _input: Date, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { - let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Date(input), None); - - TableIterator::once((bucket_query + TableIterator::once(("" .parse() .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_date( - bucket_width: Interval, - input: Date, - offset: Date, + _bucket_width: Interval, + _input: Date, + _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { - let bucket_query = create_time_bucket( - bucket_width, - TimeBucketInput::Date(input), - Some(TimeBucketOffset::Date(offset)), - ); - - TableIterator::once((bucket_query + TableIterator::once(("" .parse() .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_interval( - bucket_width: Interval, - input: Date, - offset: Interval, + _bucket_width: Interval, + _input: Date, + _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { - let bucket_query = create_time_bucket( - bucket_width, - TimeBucketInput::Date(input), - Some(TimeBucketOffset::Interval(offset)), - ); - - TableIterator::once((bucket_query + TableIterator::once(("" .parse() .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp( - bucket_width: Interval, - input: Timestamp, + _bucket_width: Interval, + _input: Timestamp, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { - let bucket_query = create_time_bucket(bucket_width, TimeBucketInput::Timestamp(input), None); - - TableIterator::once((bucket_query + TableIterator::once(("" .parse() .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_date( - bucket_width: Interval, - input: Timestamp, - offset: Date, + _bucket_width: Interval, + _input: Timestamp, + _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { - let bucket_query = create_time_bucket( - bucket_width, - TimeBucketInput::Timestamp(input), - Some(TimeBucketOffset::Date(offset)), - ); - - TableIterator::once((bucket_query + TableIterator::once(("" .parse() .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_interval( - bucket_width: Interval, - input: Timestamp, - offset: Interval, + _bucket_width: Interval, + _input: Timestamp, + _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { - let bucket_query = create_time_bucket( - bucket_width, - TimeBucketInput::Timestamp(input), - Some(TimeBucketOffset::Interval(offset)), - ); - - TableIterator::once((bucket_query + TableIterator::once(("" .parse() .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } From 8d0b7d5f32b20528aaf6230197884993e2f26b32 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 3 Aug 2024 15:41:17 -0500 Subject: [PATCH 10/29] fmt Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index f587d2f8..7456fc1f 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -15,8 +15,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use pgrx::*; use pgrx::iter::TableIterator; +use pgrx::*; #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_no_offset( From 606a22c40e442d7b16fd4eba5ce83cc4975a990b Mon Sep 17 00:00:00 2001 From: Devan Date: Mon, 5 Aug 2024 08:13:41 -0500 Subject: [PATCH 11/29] adding native PG UDF functionality to time_bucket() Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 123 +++++++++++++++++++++++++++++++------- src/hooks/executor.rs | 5 -- 2 files changed, 103 insertions(+), 25 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 7456fc1f..a83a4272 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -18,16 +18,75 @@ use pgrx::iter::TableIterator; use pgrx::*; +const HOURS_PER_MICRO: i64 = 3600000000; +const MINUTES_PER_MICRO: i64 = 60000000; + +fn set_date(year: i32, month: u8, day: u8) -> Date { + return Date::from(Timestamp::new( + year, + month, + day, + 0, + 0, + 0f64, + ).unwrap_or_else(|error| panic!("There was an error in date creation: {}", error))) +} + +fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp { + return Timestamp::new( + year, + month, + day, + hour, + minute, + 0f64, + ).unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error))) +} + +fn get_hours_delta(micros: i64, input: u8) -> u8 { + let micros_to_hours = (micros / HOURS_PER_MICRO) as u8; + if micros_to_hours == 0 { + return 0; + } + input % micros_to_hours +} + +fn get_minutes_delta(micros: i64, input: u8) -> u8 { + let micros_to_minutes = (micros / MINUTES_PER_MICRO) as u8; + if micros_to_minutes == 0 { + return 0; + } + input % micros_to_minutes +} + #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_no_offset( - _bucket_width: Interval, - _input: Date, -) -> TableIterator<'static, (name!(time_bucket, Date),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + bucket_width: Interval, + input: Date, +) -> Date { + let years = bucket_width.months() / 12; + if years != 0 { + let delta = input.year() as i32 % years; + return set_date(input.year() - delta, input.month(), input.day()) + } else if bucket_width.months() != 0 { + let delta = input.month() as i32 % bucket_width.months(); + return set_date(input.year(), input.month() - delta as u8, input.day()) + } else if bucket_width.days() != 0 { + let delta = input.day() as i32 % bucket_width.days(); + return set_date(input.year(), input.month(), input.day() - delta as u8) + } + + Date::from(Timestamp::new( + input.year(), + input.month(), + input.day(), + 0, + 0, + 0f64, + ).unwrap()) } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_date( _bucket_width: Interval, @@ -35,10 +94,11 @@ pub fn time_bucket_date_offset_date( _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_interval( _bucket_width: Interval, @@ -46,20 +106,42 @@ pub fn time_bucket_date_offset_interval( _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp( - _bucket_width: Interval, - _input: Timestamp, -) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + bucket_width: Interval, + input: Timestamp, +) -> Timestamp { + let years = bucket_width.months() / 12; + if years != 0 { + let delta = input.year() as i32 % years; + return set_timestamp(input.year() - delta, input.month(), input.day(), input.hour(), input.minute()) + } else if bucket_width.months() != 0 { + let delta = input.month() as i32 % bucket_width.months(); + return set_timestamp(input.year(), input.month() - delta as u8, input.day(), input.hour(), input.minute()) + } else if bucket_width.days() != 0 { + let delta = input.day() as i32 % bucket_width.days(); + return set_timestamp(input.year(), input.month(), input.day() - delta as u8, input.hour(), input.minute()) + } else if bucket_width.micros() != 0 { + let hours_delta = get_hours_delta(bucket_width.micros(), input.hour()); + let minutes_delta = get_minutes_delta(bucket_width.micros(), input.minute()); + return set_timestamp(input.year(), input.month(), input.day(), input.hour() - hours_delta, input.minute() - minutes_delta) + } + + Timestamp::new( + input.year(), + input.month(), + input.day(), + input.hour(), + input.minute(), + 0f64, + ).unwrap() } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_date( _bucket_width: Interval, @@ -67,10 +149,11 @@ pub fn time_bucket_timestamp_offset_date( _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_interval( _bucket_width: Interval, @@ -78,6 +161,6 @@ pub fn time_bucket_timestamp_offset_interval( _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } diff --git a/src/hooks/executor.rs b/src/hooks/executor.rs index ac891c55..5ff20cb9 100644 --- a/src/hooks/executor.rs +++ b/src/hooks/executor.rs @@ -60,11 +60,6 @@ pub async fn executor_run( } }); - if !is_duckdb_query && query.contains("time_bucket") { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.\ - If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.") - } - if rtable.is_null() || query_desc.operation != pg_sys::CmdType_CMD_SELECT || !is_duckdb_query From f8bb86f41fb2d468d349b7abee438b0bd0f35ad0 Mon Sep 17 00:00:00 2001 From: Devan Date: Mon, 5 Aug 2024 08:55:00 -0500 Subject: [PATCH 12/29] todo Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index a83a4272..b4c881f6 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -43,20 +43,12 @@ fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timesta ).unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error))) } -fn get_hours_delta(micros: i64, input: u8) -> u8 { - let micros_to_hours = (micros / HOURS_PER_MICRO) as u8; - if micros_to_hours == 0 { +fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 { + let micros_quotient = (micros / divisor) as u8; + if micros_quotient == 0 { return 0; } - input % micros_to_hours -} - -fn get_minutes_delta(micros: i64, input: u8) -> u8 { - let micros_to_minutes = (micros / MINUTES_PER_MICRO) as u8; - if micros_to_minutes == 0 { - return 0; - } - input % micros_to_minutes + input % micros_quotient } #[pg_extern(name = "time_bucket")] @@ -86,7 +78,7 @@ pub fn time_bucket_date_no_offset( ).unwrap()) } -// TODO: Need to implement offset +// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_date( _bucket_width: Interval, @@ -98,7 +90,7 @@ pub fn time_bucket_date_offset_date( .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } -// TODO: Need to implement offset +// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_interval( _bucket_width: Interval, @@ -126,8 +118,8 @@ pub fn time_bucket_timestamp( let delta = input.day() as i32 % bucket_width.days(); return set_timestamp(input.year(), input.month(), input.day() - delta as u8, input.hour(), input.minute()) } else if bucket_width.micros() != 0 { - let hours_delta = get_hours_delta(bucket_width.micros(), input.hour()); - let minutes_delta = get_minutes_delta(bucket_width.micros(), input.minute()); + let hours_delta = get_micros_delta(bucket_width.micros(), input.hour(), HOURS_PER_MICRO); + let minutes_delta = get_micros_delta(bucket_width.micros(), input.minute(), MINUTES_PER_MICRO); return set_timestamp(input.year(), input.month(), input.day(), input.hour() - hours_delta, input.minute() - minutes_delta) } @@ -141,7 +133,7 @@ pub fn time_bucket_timestamp( ).unwrap() } -// TODO: Need to implement offset +// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_date( _bucket_width: Interval, @@ -153,7 +145,7 @@ pub fn time_bucket_timestamp_offset_date( .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } -// TODO: Need to implement offset +// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_interval( _bucket_width: Interval, From 599f83cfe43d65d1307d22bb2668a5e209d3f8d4 Mon Sep 17 00:00:00 2001 From: Devan Date: Mon, 5 Aug 2024 11:55:12 -0500 Subject: [PATCH 13/29] fmt Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 101 ++++++++++++++++++++------------------ tests/time_bucket.rs | 9 ++-- 2 files changed, 55 insertions(+), 55 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index b4c881f6..32bf6f3f 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -22,25 +22,15 @@ const HOURS_PER_MICRO: i64 = 3600000000; const MINUTES_PER_MICRO: i64 = 60000000; fn set_date(year: i32, month: u8, day: u8) -> Date { - return Date::from(Timestamp::new( - year, - month, - day, - 0, - 0, - 0f64, - ).unwrap_or_else(|error| panic!("There was an error in date creation: {}", error))) + return Date::from( + Timestamp::new(year, month, day, 0, 0, 0f64) + .unwrap_or_else(|error| panic!("There was an error in date creation: {}", error)), + ); } fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp { - return Timestamp::new( - year, - month, - day, - hour, - minute, - 0f64, - ).unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error))) + return Timestamp::new(year, month, day, hour, minute, 0f64) + .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)); } fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 { @@ -52,30 +42,20 @@ fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 { } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_no_offset( - bucket_width: Interval, - input: Date, -) -> Date { +pub fn time_bucket_date_no_offset(bucket_width: Interval, input: Date) -> Date { let years = bucket_width.months() / 12; if years != 0 { let delta = input.year() as i32 % years; - return set_date(input.year() - delta, input.month(), input.day()) + return set_date(input.year() - delta, input.month(), input.day()); } else if bucket_width.months() != 0 { let delta = input.month() as i32 % bucket_width.months(); - return set_date(input.year(), input.month() - delta as u8, input.day()) + return set_date(input.year(), input.month() - delta as u8, input.day()); } else if bucket_width.days() != 0 { let delta = input.day() as i32 % bucket_width.days(); - return set_date(input.year(), input.month(), input.day() - delta as u8) + return set_date(input.year(), input.month(), input.day() - delta as u8); } - Date::from(Timestamp::new( - input.year(), - input.month(), - input.day(), - 0, - 0, - 0f64, - ).unwrap()) + Date::from(Timestamp::new(input.year(), input.month(), input.day(), 0, 0, 0f64).unwrap()) } // TODO: Need to implement offset for pg @@ -86,8 +66,8 @@ pub fn time_bucket_date_offset_date( _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } // TODO: Need to implement offset for pg @@ -98,29 +78,51 @@ pub fn time_bucket_date_offset_interval( _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_timestamp( - bucket_width: Interval, - input: Timestamp, -) -> Timestamp { +pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timestamp { let years = bucket_width.months() / 12; if years != 0 { let delta = input.year() as i32 % years; - return set_timestamp(input.year() - delta, input.month(), input.day(), input.hour(), input.minute()) + return set_timestamp( + input.year() - delta, + input.month(), + input.day(), + input.hour(), + input.minute(), + ); } else if bucket_width.months() != 0 { let delta = input.month() as i32 % bucket_width.months(); - return set_timestamp(input.year(), input.month() - delta as u8, input.day(), input.hour(), input.minute()) + return set_timestamp( + input.year(), + input.month() - delta as u8, + input.day(), + input.hour(), + input.minute(), + ); } else if bucket_width.days() != 0 { let delta = input.day() as i32 % bucket_width.days(); - return set_timestamp(input.year(), input.month(), input.day() - delta as u8, input.hour(), input.minute()) + return set_timestamp( + input.year(), + input.month(), + input.day() - delta as u8, + input.hour(), + input.minute(), + ); } else if bucket_width.micros() != 0 { let hours_delta = get_micros_delta(bucket_width.micros(), input.hour(), HOURS_PER_MICRO); - let minutes_delta = get_micros_delta(bucket_width.micros(), input.minute(), MINUTES_PER_MICRO); - return set_timestamp(input.year(), input.month(), input.day(), input.hour() - hours_delta, input.minute() - minutes_delta) + let minutes_delta = + get_micros_delta(bucket_width.micros(), input.minute(), MINUTES_PER_MICRO); + return set_timestamp( + input.year(), + input.month(), + input.day(), + input.hour() - hours_delta, + input.minute() - minutes_delta, + ); } Timestamp::new( @@ -130,7 +132,8 @@ pub fn time_bucket_timestamp( input.hour(), input.minute(), 0f64, - ).unwrap() + ) + .unwrap() } // TODO: Need to implement offset for pg @@ -141,8 +144,8 @@ pub fn time_bucket_timestamp_offset_date( _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } // TODO: Need to implement offset for pg @@ -153,6 +156,6 @@ pub fn time_bucket_timestamp_offset_interval( _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index ad0b5f62..d557349b 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -29,7 +29,7 @@ use std::fs::File; use time::Date; #[rstest] -async fn test_time_bucket_minutes(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { +async fn test_time_bucket_minutes_duckdb(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { let stored_batch = time_series_record_batch_minutes()?; let parquet_path = tempdir.path().join("test_arrow_types.parquet"); let parquet_file = File::create(&parquet_path)?; @@ -87,7 +87,7 @@ async fn test_time_bucket_minutes(mut conn: PgConnection, tempdir: TempDir) -> R } #[rstest] -async fn test_time_bucket_years(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { +async fn test_time_bucket_years_duckdb(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { let stored_batch = time_series_record_batch_years()?; let parquet_path = tempdir.path().join("test_arrow_types.parquet"); let parquet_file = File::create(&parquet_path)?; @@ -145,18 +145,15 @@ async fn test_time_bucket_years(mut conn: PgConnection, tempdir: TempDir) -> Res #[rstest] async fn test_time_bucket_fallback(mut conn: PgConnection) -> Result<()> { - let error_message = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; let trips_table = NycTripsTable::setup(); trips_table.execute(&mut conn); #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY', tpep_pickup_datetime::DATE) AS bucket, AVG(trip_distance) as avg_value FROM nyc_trips GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => { - panic!("Should have error'ed when calling time_bucket() on non-FDW data.") } Err(error) => { - let a = error.to_string().contains(error_message); - assert!(a); + panic!("Should not have error'ed when calling time_bucket() on non-FDW data: {}", error) } } From 0d552fb7d603a0ab51b26617829696e9632197ee Mon Sep 17 00:00:00 2001 From: Devan Date: Mon, 5 Aug 2024 15:51:35 -0500 Subject: [PATCH 14/29] clippy Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 32bf6f3f..bc979ed3 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -22,15 +22,15 @@ const HOURS_PER_MICRO: i64 = 3600000000; const MINUTES_PER_MICRO: i64 = 60000000; fn set_date(year: i32, month: u8, day: u8) -> Date { - return Date::from( + Date::from( Timestamp::new(year, month, day, 0, 0, 0f64) .unwrap_or_else(|error| panic!("There was an error in date creation: {}", error)), - ); + ) } fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp { - return Timestamp::new(year, month, day, hour, minute, 0f64) - .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)); + Timestamp::new(year, month, day, hour, minute, 0f64) + .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)) } fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 { @@ -45,7 +45,7 @@ fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 { pub fn time_bucket_date_no_offset(bucket_width: Interval, input: Date) -> Date { let years = bucket_width.months() / 12; if years != 0 { - let delta = input.year() as i32 % years; + let delta = input.year() % years; return set_date(input.year() - delta, input.month(), input.day()); } else if bucket_width.months() != 0 { let delta = input.month() as i32 % bucket_width.months(); @@ -86,7 +86,7 @@ pub fn time_bucket_date_offset_interval( pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timestamp { let years = bucket_width.months() / 12; if years != 0 { - let delta = input.year() as i32 % years; + let delta = input.year() % years; return set_timestamp( input.year() - delta, input.month(), From 22a26ae87d268dc44b5f69203271c02479de2bf6 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 7 Aug 2024 07:50:43 -0500 Subject: [PATCH 15/29] refactoring calculations for time_bucket() I found edge cases Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 163 +++++++++++++++++++------------------- 1 file changed, 80 insertions(+), 83 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index bc979ed3..9c0f3d04 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -15,64 +15,84 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use chrono::{DateTime, Datelike, NaiveDateTime, Timelike}; +use duckdb::arrow::temporal_conversions::SECONDS_IN_DAY; use pgrx::iter::TableIterator; use pgrx::*; -const HOURS_PER_MICRO: i64 = 3600000000; -const MINUTES_PER_MICRO: i64 = 60000000; +// Origin epoch for months/years intervals. +// The origin date is 2000-01-01. +// Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin +const ORIGIN_UNIX_EPOCH: i128 = 946684800; +// Origin epoch for days/minutes/seconds intervals. +// Date is set to 2000-01-03. +// Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin +const DAYS_ORIGIN_UNIX_EPOCH: i128 = 946857600; +const MICROS_PER_SECOND: i128 = 1000000; -fn set_date(year: i32, month: u8, day: u8) -> Date { +fn set_date(year: i32, month: u32, day: u32) -> Date { Date::from( - Timestamp::new(year, month, day, 0, 0, 0f64) + Timestamp::new(year, month as u8, day as u8, 0, 0, 0f64) .unwrap_or_else(|error| panic!("There was an error in date creation: {}", error)), ) } -fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp { - Timestamp::new(year, month, day, hour, minute, 0f64) +fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8, second: f64) -> Timestamp { + Timestamp::new(year, month, day, hour, minute, second) .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)) } -fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 { - let micros_quotient = (micros / divisor) as u8; - if micros_quotient == 0 { - return 0; +fn calculate_time_bucket(bucket_width_seconds: i128, input_unix_epoch: i128, months: i32) -> i128 { + if months != 0 { + let truncated_input_unix_epoch = + ((input_unix_epoch - ORIGIN_UNIX_EPOCH) / bucket_width_seconds) * bucket_width_seconds; + ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch + } else { + let truncated_input_unix_epoch = ((input_unix_epoch - DAYS_ORIGIN_UNIX_EPOCH) + / bucket_width_seconds) + * bucket_width_seconds; + DAYS_ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch } - input % micros_quotient } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_no_offset(bucket_width: Interval, input: Date) -> Date { - let years = bucket_width.months() / 12; - if years != 0 { - let delta = input.year() % years; - return set_date(input.year() - delta, input.month(), input.day()); - } else if bucket_width.months() != 0 { - let delta = input.month() as i32 % bucket_width.months(); - return set_date(input.year(), input.month() - delta as u8, input.day()); - } else if bucket_width.days() != 0 { - let delta = input.day() as i32 % bucket_width.days(); - return set_date(input.year(), input.month(), input.day() - delta as u8); - } +pub fn time_bucket_date(bucket_width: Interval, input: Date) -> Date { + let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; + let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; + let bucket_date = calculate_time_bucket( + bucket_width_seconds, + input_unix_epoch, + bucket_width.months(), + ); - Date::from(Timestamp::new(input.year(), input.month(), input.day(), 0, 0, 0f64).unwrap()) + if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { + set_date(dt.year(), dt.month(), dt.day()) + } else { + panic!("There was a problem setting the native datetime from provided unix epoch.") + } } -// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_offset_date( - _bucket_width: Interval, - _input: Date, - _offset: Date, -) -> TableIterator<'static, (name!(time_bucket, Date),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +pub fn time_bucket_date_origin(bucket_width: Interval, input: Date, origin: Date) -> Date { + let new_origin_epoch = (origin.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; + + let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; + let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; + let truncated_input_unix_epoch = + ((input_unix_epoch - new_origin_epoch) / bucket_width_seconds) * bucket_width_seconds; + + let bucket_date = ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch; + + if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { + set_date(dt.year(), dt.month(), dt.day()) + } else { + panic!("There was a problem setting the native datetime from provided unix epoch.") + } } // TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_offset_interval( +pub fn time_bucket_date_offset( _bucket_width: Interval, _input: Date, _offset: Interval, @@ -84,56 +104,33 @@ pub fn time_bucket_date_offset_interval( #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timestamp { - let years = bucket_width.months() / 12; - if years != 0 { - let delta = input.year() % years; - return set_timestamp( - input.year() - delta, - input.month(), - input.day(), - input.hour(), - input.minute(), - ); - } else if bucket_width.months() != 0 { - let delta = input.month() as i32 % bucket_width.months(); - return set_timestamp( - input.year(), - input.month() - delta as u8, - input.day(), - input.hour(), - input.minute(), - ); - } else if bucket_width.days() != 0 { - let delta = input.day() as i32 % bucket_width.days(); - return set_timestamp( - input.year(), - input.month(), - input.day() - delta as u8, - input.hour(), - input.minute(), - ); - } else if bucket_width.micros() != 0 { - let hours_delta = get_micros_delta(bucket_width.micros(), input.hour(), HOURS_PER_MICRO); - let minutes_delta = - get_micros_delta(bucket_width.micros(), input.minute(), MINUTES_PER_MICRO); - return set_timestamp( - input.year(), - input.month(), - input.day(), - input.hour() - hours_delta, - input.minute() - minutes_delta, - ); - } + let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; + let input_string = input.to_iso_string(); + let input_unix_epoch = NaiveDateTime::parse_from_str(&input_string, "%Y-%m-%dT%H:%M:%S") + .unwrap_or_else(|error| { + panic!( + "there was an error parsing the set TIMESTAMP value as a string: {}", + error + ) + }); + let bucket_date = calculate_time_bucket( + bucket_width_seconds, + input_unix_epoch.and_utc().timestamp() as i128, + bucket_width.months(), + ); - Timestamp::new( - input.year(), - input.month(), - input.day(), - input.hour(), - input.minute(), - 0f64, - ) - .unwrap() + if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { + set_timestamp( + dt.year(), + dt.month() as u8, + dt.day() as u8, + dt.hour() as u8, + dt.minute() as u8, + dt.second() as f64, + ) + } else { + panic!("There was a problem setting the native datetime from provided unix epoch.") + } } // TODO: Need to implement offset for pg From 42b4b787ac118393143a37cb844de840652a89cf Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 7 Aug 2024 07:57:23 -0500 Subject: [PATCH 16/29] Adding origin parameter Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 64 +++++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 9c0f3d04..c53a1838 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -42,7 +42,13 @@ fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8, second: f6 .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)) } -fn calculate_time_bucket(bucket_width_seconds: i128, input_unix_epoch: i128, months: i32) -> i128 { +fn calculate_time_bucket(bucket_width_seconds: i128, input_unix_epoch: i128, months: i32, override_origin_epoch: Option) -> i128 { + if let Some(new_origin_epoch) = override_origin_epoch { + let truncated_input_unix_epoch = + ((input_unix_epoch - new_origin_epoch) / bucket_width_seconds) * bucket_width_seconds; + return new_origin_epoch + truncated_input_unix_epoch; + } + if months != 0 { let truncated_input_unix_epoch = ((input_unix_epoch - ORIGIN_UNIX_EPOCH) / bucket_width_seconds) * bucket_width_seconds; @@ -63,6 +69,7 @@ pub fn time_bucket_date(bucket_width: Interval, input: Date) -> Date { bucket_width_seconds, input_unix_epoch, bucket_width.months(), + None ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -78,10 +85,13 @@ pub fn time_bucket_date_origin(bucket_width: Interval, input: Date, origin: Date let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; - let truncated_input_unix_epoch = - ((input_unix_epoch - new_origin_epoch) / bucket_width_seconds) * bucket_width_seconds; - let bucket_date = ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch; + let bucket_date = calculate_time_bucket( + bucket_width_seconds, + input_unix_epoch, + bucket_width.months(), + Some(new_origin_epoch) + ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { set_date(dt.year(), dt.month(), dt.day()) @@ -113,10 +123,12 @@ pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timest error ) }); + let bucket_date = calculate_time_bucket( bucket_width_seconds, input_unix_epoch.and_utc().timestamp() as i128, bucket_width.months(), + None ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -133,16 +145,44 @@ pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timest } } -// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_date( - _bucket_width: Interval, - _input: Timestamp, - _offset: Date, -) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + bucket_width: Interval, + input: Timestamp, + origin: Date, +) -> Timestamp { + let new_origin_epoch = (origin.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; + + let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; + let input_string = input.to_iso_string(); + let input_unix_epoch = NaiveDateTime::parse_from_str(&input_string, "%Y-%m-%dT%H:%M:%S") + .unwrap_or_else(|error| { + panic!( + "there was an error parsing the set TIMESTAMP value as a string: {}", + error + ) + }); + + let bucket_date = calculate_time_bucket( + bucket_width_seconds, + input_unix_epoch.and_utc().timestamp() as i128, + bucket_width.months(), + Some(new_origin_epoch) + ); + + if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { + set_timestamp( + dt.year(), + dt.month() as u8, + dt.day() as u8, + dt.hour() as u8, + dt.minute() as u8, + dt.second() as f64, + ) + } else { + panic!("There was a problem setting the native datetime from provided unix epoch.") + } + } // TODO: Need to implement offset for pg From 8a29b621d2525caf67772a0854984393fdabb48a Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 7 Aug 2024 08:20:56 -0500 Subject: [PATCH 17/29] Adding timestamp functions Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index c53a1838..ce33f3f9 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -49,6 +49,8 @@ fn calculate_time_bucket(bucket_width_seconds: i128, input_unix_epoch: i128, mon return new_origin_epoch + truncated_input_unix_epoch; } + // Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin + // DuckDB will change which origin it uses based on whether months are set in the INTERVAL. if months != 0 { let truncated_input_unix_epoch = ((input_unix_epoch - ORIGIN_UNIX_EPOCH) / bucket_width_seconds) * bucket_width_seconds; @@ -65,6 +67,7 @@ fn calculate_time_bucket(bucket_width_seconds: i128, input_unix_epoch: i128, mon pub fn time_bucket_date(bucket_width: Interval, input: Date) -> Date { let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; + let bucket_date = calculate_time_bucket( bucket_width_seconds, input_unix_epoch, @@ -116,6 +119,7 @@ pub fn time_bucket_date_offset( pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timestamp { let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; let input_string = input.to_iso_string(); + let input_unix_epoch = NaiveDateTime::parse_from_str(&input_string, "%Y-%m-%dT%H:%M:%S") .unwrap_or_else(|error| { panic!( @@ -155,6 +159,7 @@ pub fn time_bucket_timestamp_offset_date( let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; let input_string = input.to_iso_string(); + let input_unix_epoch = NaiveDateTime::parse_from_str(&input_string, "%Y-%m-%dT%H:%M:%S") .unwrap_or_else(|error| { panic!( From 90ccb9222effb335f853cb74f568cef93ba64910 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 7 Aug 2024 08:21:46 -0500 Subject: [PATCH 18/29] fmt Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index ce33f3f9..b01a80d9 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -42,7 +42,12 @@ fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8, second: f6 .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)) } -fn calculate_time_bucket(bucket_width_seconds: i128, input_unix_epoch: i128, months: i32, override_origin_epoch: Option) -> i128 { +fn calculate_time_bucket( + bucket_width_seconds: i128, + input_unix_epoch: i128, + months: i32, + override_origin_epoch: Option, +) -> i128 { if let Some(new_origin_epoch) = override_origin_epoch { let truncated_input_unix_epoch = ((input_unix_epoch - new_origin_epoch) / bucket_width_seconds) * bucket_width_seconds; @@ -72,7 +77,7 @@ pub fn time_bucket_date(bucket_width: Interval, input: Date) -> Date { bucket_width_seconds, input_unix_epoch, bucket_width.months(), - None + None, ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -93,7 +98,7 @@ pub fn time_bucket_date_origin(bucket_width: Interval, input: Date, origin: Date bucket_width_seconds, input_unix_epoch, bucket_width.months(), - Some(new_origin_epoch) + Some(new_origin_epoch), ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -132,7 +137,7 @@ pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timest bucket_width_seconds, input_unix_epoch.and_utc().timestamp() as i128, bucket_width.months(), - None + None, ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -172,7 +177,7 @@ pub fn time_bucket_timestamp_offset_date( bucket_width_seconds, input_unix_epoch.and_utc().timestamp() as i128, bucket_width.months(), - Some(new_origin_epoch) + Some(new_origin_epoch), ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -187,7 +192,6 @@ pub fn time_bucket_timestamp_offset_date( } else { panic!("There was a problem setting the native datetime from provided unix epoch.") } - } // TODO: Need to implement offset for pg From cf155d813f48c75b2b4f5f1969c0723d03eca5a8 Mon Sep 17 00:00:00 2001 From: Devan Date: Wed, 7 Aug 2024 09:26:39 -0500 Subject: [PATCH 19/29] (wip): working on offset Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 48 +++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index b01a80d9..3b91fd1e 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -47,6 +47,7 @@ fn calculate_time_bucket( input_unix_epoch: i128, months: i32, override_origin_epoch: Option, + override_offset_epoch: Option, ) -> i128 { if let Some(new_origin_epoch) = override_origin_epoch { let truncated_input_unix_epoch = @@ -54,6 +55,22 @@ fn calculate_time_bucket( return new_origin_epoch + truncated_input_unix_epoch; } + if let Some(new_offset_epoch) = override_offset_epoch { + if months != 0 { + let new_origin_epoch = (ORIGIN_UNIX_EPOCH - new_offset_epoch).abs(); + let truncated_input_unix_epoch = ((input_unix_epoch - new_origin_epoch) + / bucket_width_seconds) + * bucket_width_seconds; + return new_origin_epoch + truncated_input_unix_epoch; + } else { + let new_origin_epoch = (DAYS_ORIGIN_UNIX_EPOCH - new_offset_epoch).abs(); + let truncated_input_unix_epoch = ((input_unix_epoch - new_origin_epoch) + / bucket_width_seconds) + * bucket_width_seconds; + return new_origin_epoch + truncated_input_unix_epoch; + } + }; + // Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin // DuckDB will change which origin it uses based on whether months are set in the INTERVAL. if months != 0 { @@ -78,6 +95,7 @@ pub fn time_bucket_date(bucket_width: Interval, input: Date) -> Date { input_unix_epoch, bucket_width.months(), None, + None, ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -99,6 +117,7 @@ pub fn time_bucket_date_origin(bucket_width: Interval, input: Date, origin: Date input_unix_epoch, bucket_width.months(), Some(new_origin_epoch), + None, ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -108,16 +127,25 @@ pub fn time_bucket_date_origin(bucket_width: Interval, input: Date, origin: Date } } -// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_offset( - _bucket_width: Interval, - _input: Date, - _offset: Interval, -) -> TableIterator<'static, (name!(time_bucket, Date),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +pub fn time_bucket_date_offset(bucket_width: Interval, input: Date, offset: Interval) -> Date { + let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; + let offset_seconds = offset.as_micros() / MICROS_PER_SECOND; + let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; + + let bucket_date = calculate_time_bucket( + bucket_width_seconds, + input_unix_epoch, + bucket_width.months(), + None, + Some(offset_seconds), + ); + + if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { + set_date(dt.year(), dt.month(), dt.day()) + } else { + panic!("There was a problem setting the native datetime from provided unix epoch.") + } } #[pg_extern(name = "time_bucket")] @@ -138,6 +166,7 @@ pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timest input_unix_epoch.and_utc().timestamp() as i128, bucket_width.months(), None, + None, ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { @@ -178,6 +207,7 @@ pub fn time_bucket_timestamp_offset_date( input_unix_epoch.and_utc().timestamp() as i128, bucket_width.months(), Some(new_origin_epoch), + None, ); if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { From 7cdd2f68db01c23a0d3ac5ef45f00235756bd0e8 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 9 Aug 2024 08:55:58 -0500 Subject: [PATCH 20/29] revert to only push down time_bucket() to FDW duckdb relations Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 221 +++++++------------------------------- src/hooks/executor.rs | 4 + tests/time_bucket.rs | 5 +- 3 files changed, 45 insertions(+), 185 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 3b91fd1e..a878a1c0 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -15,24 +15,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use chrono::{DateTime, Datelike, NaiveDateTime, Timelike}; -use duckdb::arrow::temporal_conversions::SECONDS_IN_DAY; -use pgrx::iter::TableIterator; use pgrx::*; -// Origin epoch for months/years intervals. -// The origin date is 2000-01-01. -// Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin -const ORIGIN_UNIX_EPOCH: i128 = 946684800; -// Origin epoch for days/minutes/seconds intervals. -// Date is set to 2000-01-03. -// Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin -const DAYS_ORIGIN_UNIX_EPOCH: i128 = 946857600; -const MICROS_PER_SECOND: i128 = 1000000; -fn set_date(year: i32, month: u32, day: u32) -> Date { +fn set_date(year: i32, month: u8, day: u8) -> Date { Date::from( - Timestamp::new(year, month as u8, day as u8, 0, 0, 0f64) + Timestamp::new(year, month, day, 0, 0, 0f64) .unwrap_or_else(|error| panic!("There was an error in date creation: {}", error)), ) } @@ -42,196 +30,61 @@ fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8, second: f6 .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)) } -fn calculate_time_bucket( - bucket_width_seconds: i128, - input_unix_epoch: i128, - months: i32, - override_origin_epoch: Option, - override_offset_epoch: Option, -) -> i128 { - if let Some(new_origin_epoch) = override_origin_epoch { - let truncated_input_unix_epoch = - ((input_unix_epoch - new_origin_epoch) / bucket_width_seconds) * bucket_width_seconds; - return new_origin_epoch + truncated_input_unix_epoch; - } - - if let Some(new_offset_epoch) = override_offset_epoch { - if months != 0 { - let new_origin_epoch = (ORIGIN_UNIX_EPOCH - new_offset_epoch).abs(); - let truncated_input_unix_epoch = ((input_unix_epoch - new_origin_epoch) - / bucket_width_seconds) - * bucket_width_seconds; - return new_origin_epoch + truncated_input_unix_epoch; - } else { - let new_origin_epoch = (DAYS_ORIGIN_UNIX_EPOCH - new_offset_epoch).abs(); - let truncated_input_unix_epoch = ((input_unix_epoch - new_origin_epoch) - / bucket_width_seconds) - * bucket_width_seconds; - return new_origin_epoch + truncated_input_unix_epoch; - } - }; - - // Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin - // DuckDB will change which origin it uses based on whether months are set in the INTERVAL. - if months != 0 { - let truncated_input_unix_epoch = - ((input_unix_epoch - ORIGIN_UNIX_EPOCH) / bucket_width_seconds) * bucket_width_seconds; - ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch - } else { - let truncated_input_unix_epoch = ((input_unix_epoch - DAYS_ORIGIN_UNIX_EPOCH) - / bucket_width_seconds) - * bucket_width_seconds; - DAYS_ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch - } -} - #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date(bucket_width: Interval, input: Date) -> Date { - let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; - let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; - - let bucket_date = calculate_time_bucket( - bucket_width_seconds, - input_unix_epoch, - bucket_width.months(), - None, - None, - ); - - if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { - set_date(dt.year(), dt.month(), dt.day()) - } else { - panic!("There was a problem setting the native datetime from provided unix epoch.") - } +pub fn time_bucket_date(_bucket_width: Interval, input: Date) -> Date { + set_date(input.year(), input.day(), input.month()) } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_origin(bucket_width: Interval, input: Date, origin: Date) -> Date { - let new_origin_epoch = (origin.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; - - let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; - let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; - - let bucket_date = calculate_time_bucket( - bucket_width_seconds, - input_unix_epoch, - bucket_width.months(), - Some(new_origin_epoch), - None, - ); - - if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { - set_date(dt.year(), dt.month(), dt.day()) - } else { - panic!("There was a problem setting the native datetime from provided unix epoch.") - } +pub fn time_bucket_date_origin(_bucket_width: Interval, input: Date, _origin: Date) -> Date { + set_date(input.year(), input.day(), input.month()) } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_offset(bucket_width: Interval, input: Date, offset: Interval) -> Date { - let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; - let offset_seconds = offset.as_micros() / MICROS_PER_SECOND; - let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; - - let bucket_date = calculate_time_bucket( - bucket_width_seconds, - input_unix_epoch, - bucket_width.months(), - None, - Some(offset_seconds), - ); - - if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { - set_date(dt.year(), dt.month(), dt.day()) - } else { - panic!("There was a problem setting the native datetime from provided unix epoch.") - } +pub fn time_bucket_date_offset(_bucket_width: Interval, input: Date, _offset: Interval) -> Date { + set_date(input.year(), input.day(), input.month()) } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timestamp { - let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; - let input_string = input.to_iso_string(); - - let input_unix_epoch = NaiveDateTime::parse_from_str(&input_string, "%Y-%m-%dT%H:%M:%S") - .unwrap_or_else(|error| { - panic!( - "there was an error parsing the set TIMESTAMP value as a string: {}", - error - ) - }); - - let bucket_date = calculate_time_bucket( - bucket_width_seconds, - input_unix_epoch.and_utc().timestamp() as i128, - bucket_width.months(), - None, - None, - ); - - if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { - set_timestamp( - dt.year(), - dt.month() as u8, - dt.day() as u8, - dt.hour() as u8, - dt.minute() as u8, - dt.second() as f64, - ) - } else { - panic!("There was a problem setting the native datetime from provided unix epoch.") - } +pub fn time_bucket_timestamp(_bucket_width: Interval, input: Timestamp) -> Timestamp { + set_timestamp( + input.year(), + input.month(), + input.day(), + input.hour(), + input.minute(), + input.second(), + ) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_date( - bucket_width: Interval, + _bucket_width: Interval, input: Timestamp, - origin: Date, + _origin: Date, ) -> Timestamp { - let new_origin_epoch = (origin.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128; - - let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND; - let input_string = input.to_iso_string(); - - let input_unix_epoch = NaiveDateTime::parse_from_str(&input_string, "%Y-%m-%dT%H:%M:%S") - .unwrap_or_else(|error| { - panic!( - "there was an error parsing the set TIMESTAMP value as a string: {}", - error - ) - }); - - let bucket_date = calculate_time_bucket( - bucket_width_seconds, - input_unix_epoch.and_utc().timestamp() as i128, - bucket_width.months(), - Some(new_origin_epoch), - None, - ); - - if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) { - set_timestamp( - dt.year(), - dt.month() as u8, - dt.day() as u8, - dt.hour() as u8, - dt.minute() as u8, - dt.second() as f64, - ) - } else { - panic!("There was a problem setting the native datetime from provided unix epoch.") - } + set_timestamp( + input.year(), + input.month(), + input.day(), + input.hour(), + input.minute(), + input.second(), + ) } -// TODO: Need to implement offset for pg #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_interval( _bucket_width: Interval, - _input: Timestamp, + input: Timestamp, _offset: Interval, -) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) +) -> Timestamp { + set_timestamp( + input.year(), + input.month(), + input.day(), + input.hour(), + input.minute(), + input.second(), + ) } diff --git a/src/hooks/executor.rs b/src/hooks/executor.rs index 5ff20cb9..22e23191 100644 --- a/src/hooks/executor.rs +++ b/src/hooks/executor.rs @@ -60,6 +60,10 @@ pub async fn executor_run( } }); + if !is_duckdb_query && query.contains("time_bucket") { + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + } + if rtable.is_null() || query_desc.operation != pg_sys::CmdType_CMD_SELECT || !is_duckdb_query diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index d557349b..0e789861 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -145,15 +145,18 @@ async fn test_time_bucket_years_duckdb(mut conn: PgConnection, tempdir: TempDir) #[rstest] async fn test_time_bucket_fallback(mut conn: PgConnection) -> Result<()> { + let error_message = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; let trips_table = NycTripsTable::setup(); trips_table.execute(&mut conn); #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY', tpep_pickup_datetime::DATE) AS bucket, AVG(trip_distance) as avg_value FROM nyc_trips GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => { + panic!("Should have error'ed when calling time_bucket() on non-FDW data.") } Err(error) => { - panic!("Should not have error'ed when calling time_bucket() on non-FDW data: {}", error) + let a = error.to_string().contains(error_message); + assert!(a); } } From 67862f6a68b2c10fba8883ffee9e2ef44dfa3c7f Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 9 Aug 2024 08:58:54 -0500 Subject: [PATCH 21/29] fmt Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index a878a1c0..f90ea4ae 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -17,7 +17,6 @@ use pgrx::*; - fn set_date(year: i32, month: u8, day: u8) -> Date { Date::from( Timestamp::new(year, month, day, 0, 0, 0f64) From 5fed985741a3b879872ffb27f2b9e1af89a5af39 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 9 Aug 2024 10:15:45 -0500 Subject: [PATCH 22/29] add errors for methods Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 57 +++++++++------------------------------ src/hooks/executor.rs | 4 --- 2 files changed, 12 insertions(+), 49 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index f90ea4ae..424fc546 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -17,73 +17,40 @@ use pgrx::*; -fn set_date(year: i32, month: u8, day: u8) -> Date { - Date::from( - Timestamp::new(year, month, day, 0, 0, 0f64) - .unwrap_or_else(|error| panic!("There was an error in date creation: {}", error)), - ) -} - -fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8, second: f64) -> Timestamp { - Timestamp::new(year, month, day, hour, minute, second) - .unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)) -} - #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date(_bucket_width: Interval, input: Date) -> Date { - set_date(input.year(), input.day(), input.month()) +pub fn time_bucket_date(_bucket_width: Interval, _input: Date) -> Date { + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_origin(_bucket_width: Interval, input: Date, _origin: Date) -> Date { - set_date(input.year(), input.day(), input.month()) +pub fn time_bucket_date_origin(_bucket_width: Interval, _input: Date, _origin: Date) -> Date { + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_date_offset(_bucket_width: Interval, input: Date, _offset: Interval) -> Date { - set_date(input.year(), input.day(), input.month()) +pub fn time_bucket_date_offset(_bucket_width: Interval, _input: Date, _offset: Interval) -> Date { + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] -pub fn time_bucket_timestamp(_bucket_width: Interval, input: Timestamp) -> Timestamp { - set_timestamp( - input.year(), - input.month(), - input.day(), - input.hour(), - input.minute(), - input.second(), - ) +pub fn time_bucket_timestamp(_bucket_width: Interval, _input: Timestamp) -> Timestamp { + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_date( _bucket_width: Interval, - input: Timestamp, + _input: Timestamp, _origin: Date, ) -> Timestamp { - set_timestamp( - input.year(), - input.month(), - input.day(), - input.hour(), - input.minute(), - input.second(), - ) + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_interval( _bucket_width: Interval, - input: Timestamp, + _input: Timestamp, _offset: Interval, ) -> Timestamp { - set_timestamp( - input.year(), - input.month(), - input.day(), - input.hour(), - input.minute(), - input.second(), - ) + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } diff --git a/src/hooks/executor.rs b/src/hooks/executor.rs index 22e23191..5ff20cb9 100644 --- a/src/hooks/executor.rs +++ b/src/hooks/executor.rs @@ -60,10 +60,6 @@ pub async fn executor_run( } }); - if !is_duckdb_query && query.contains("time_bucket") { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); - } - if rtable.is_null() || query_desc.operation != pg_sys::CmdType_CMD_SELECT || !is_duckdb_query From 70e9865d500bbcc9fcb526410d3c16ac046a8247 Mon Sep 17 00:00:00 2001 From: Devan Date: Fri, 9 Aug 2024 15:45:27 -0500 Subject: [PATCH 23/29] typo Signed-off-by: Devan --- src/duckdb/time_bucket.rs | 12 ++++++------ tests/time_bucket.rs | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 424fc546..96d7ba53 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -19,22 +19,22 @@ use pgrx::*; #[pg_extern(name = "time_bucket")] pub fn time_bucket_date(_bucket_width: Interval, _input: Date) -> Date { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_origin(_bucket_width: Interval, _input: Date, _origin: Date) -> Date { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset(_bucket_width: Interval, _input: Date, _offset: Interval) -> Date { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp(_bucket_width: Interval, _input: Timestamp) -> Timestamp { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] @@ -43,7 +43,7 @@ pub fn time_bucket_timestamp_offset_date( _input: Timestamp, _origin: Date, ) -> Timestamp { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } #[pg_extern(name = "time_bucket")] @@ -52,5 +52,5 @@ pub fn time_bucket_timestamp_offset_interval( _input: Timestamp, _offset: Interval, ) -> Timestamp { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); } diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 0e789861..2720a08a 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -145,7 +145,7 @@ async fn test_time_bucket_years_duckdb(mut conn: PgConnection, tempdir: TempDir) #[rstest] async fn test_time_bucket_fallback(mut conn: PgConnection) -> Result<()> { - let error_message = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; + let error_message = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; let trips_table = NycTripsTable::setup(); trips_table.execute(&mut conn); From bdc0cad43531a5b76543ffaf69f6b002424203aa Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 10 Aug 2024 19:27:50 -0500 Subject: [PATCH 24/29] move folder and make error const Signed-off-by: Devan --- src/api/mod.rs | 1 + src/{duckdb => api}/time_bucket.rs | 14 ++++++++------ src/duckdb/mod.rs | 1 - 3 files changed, 9 insertions(+), 7 deletions(-) rename src/{duckdb => api}/time_bucket.rs (50%) diff --git a/src/api/mod.rs b/src/api/mod.rs index 2cdb683c..595b4f4b 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -18,3 +18,4 @@ mod csv; mod duckdb; mod parquet; +pub mod time_bucket; diff --git a/src/duckdb/time_bucket.rs b/src/api/time_bucket.rs similarity index 50% rename from src/duckdb/time_bucket.rs rename to src/api/time_bucket.rs index 96d7ba53..d8d362d8 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/api/time_bucket.rs @@ -17,24 +17,26 @@ use pgrx::*; +const TIME_BUCKET_FALLBACK_ERROR: &'static str = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; + #[pg_extern(name = "time_bucket")] pub fn time_bucket_date(_bucket_width: Interval, _input: Date) -> Date { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("{}", TIME_BUCKET_FALLBACK_ERROR); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_origin(_bucket_width: Interval, _input: Date, _origin: Date) -> Date { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("{}", TIME_BUCKET_FALLBACK_ERROR); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset(_bucket_width: Interval, _input: Date, _offset: Interval) -> Date { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("{}", TIME_BUCKET_FALLBACK_ERROR); } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp(_bucket_width: Interval, _input: Timestamp) -> Timestamp { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("{}", TIME_BUCKET_FALLBACK_ERROR); } #[pg_extern(name = "time_bucket")] @@ -43,7 +45,7 @@ pub fn time_bucket_timestamp_offset_date( _input: Timestamp, _origin: Date, ) -> Timestamp { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("{}", TIME_BUCKET_FALLBACK_ERROR); } #[pg_extern(name = "time_bucket")] @@ -52,5 +54,5 @@ pub fn time_bucket_timestamp_offset_interval( _input: Timestamp, _offset: Interval, ) -> Timestamp { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."); + panic!("{}", TIME_BUCKET_FALLBACK_ERROR); } diff --git a/src/duckdb/mod.rs b/src/duckdb/mod.rs index b06e84d5..11b69efc 100644 --- a/src/duckdb/mod.rs +++ b/src/duckdb/mod.rs @@ -21,5 +21,4 @@ pub mod delta; pub mod iceberg; pub mod parquet; pub mod secret; -pub mod time_bucket; pub mod utils; From a6457fc534cde8d42f40f18c4243e989af355544 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 10 Aug 2024 19:28:33 -0500 Subject: [PATCH 25/29] clippy Signed-off-by: Devan --- src/api/time_bucket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/time_bucket.rs b/src/api/time_bucket.rs index d8d362d8..c64f19f2 100644 --- a/src/api/time_bucket.rs +++ b/src/api/time_bucket.rs @@ -17,7 +17,7 @@ use pgrx::*; -const TIME_BUCKET_FALLBACK_ERROR: &'static str = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; +const TIME_BUCKET_FALLBACK_ERROR: &str = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues."; #[pg_extern(name = "time_bucket")] pub fn time_bucket_date(_bucket_width: Interval, _input: Date) -> Date { From ff8e7002fa17521e695da8e1dc2fab0e012cb9bf Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 10 Aug 2024 19:34:17 -0500 Subject: [PATCH 26/29] clippy Signed-off-by: Devan --- tests/fixtures/mod.rs | 2 -- tests/time_bucket.rs | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index 5bd4a2ba..a44c225f 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -230,7 +230,6 @@ pub fn duckdb_conn() -> duckdb::Connection { #[allow(dead_code)] pub fn time_series_record_batch_minutes() -> Result { - // Define the fields for each datatype let fields = vec![ Field::new("value", DataType::Int32, false), Field::new("timestamp", DataType::Timestamp(Millisecond, None), false), @@ -254,7 +253,6 @@ pub fn time_series_record_batch_minutes() -> Result { #[allow(dead_code)] pub fn time_series_record_batch_years() -> Result { - // Define the fields for each datatype let fields = vec![ Field::new("value", DataType::Int32, false), Field::new("timestamp", DataType::Timestamp(Millisecond, None), false), diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 2720a08a..5d86995e 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -78,10 +78,10 @@ async fn test_time_bucket_minutes_duckdb(mut conn: PgConnection, tempdir: TempDi assert_eq!(10, data.len()); - let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP, INTERVAL '5 MINUTE') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '10 MINUTE', timestamp::TIMESTAMP, INTERVAL '5 MINUTE') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); - assert_eq!(10, data.len()); + println!("{:?}", data); Ok(()) } From 0d7346ccfed6136db3d24f5331546403c97e0fa0 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 10 Aug 2024 20:49:16 -0500 Subject: [PATCH 27/29] add more tests for expected data Signed-off-by: Devan --- tests/time_bucket.rs | 135 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 127 insertions(+), 8 deletions(-) diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index 5d86995e..e1b189c3 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -24,9 +24,12 @@ use fixtures::*; use rstest::*; use shared::fixtures::arrow::primitive_setup_fdw_local_file_listing; use shared::fixtures::tempfile::TempDir; +use sqlx::types::BigDecimal; use sqlx::PgConnection; use std::fs::File; +use std::str::FromStr; use time::Date; +use time::Month::January; #[rstest] async fn test_time_bucket_minutes_duckdb(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { @@ -68,20 +71,51 @@ async fn test_time_bucket_minutes_duckdb(mut conn: PgConnection, tempdir: TempDi Err(_) => {} } - let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '6 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + let data: Vec<(NaiveDateTime, BigDecimal)> = "SELECT time_bucket(INTERVAL '10 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); assert_eq!(2, data.len()); - let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + let expected: Vec<(NaiveDateTime, BigDecimal)> = vec![ + ("1970-01-01T00:00:00".parse()?, BigDecimal::from_str("3")?), + ("1970-01-01T00:10:00".parse()?, BigDecimal::from_str("8")?), + ]; + assert_eq!(expected, data); + + let data: Vec<(NaiveDateTime, BigDecimal)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); assert_eq!(10, data.len()); - let data: Vec<(NaiveDateTime,)> = "SELECT time_bucket(INTERVAL '10 MINUTE', timestamp::TIMESTAMP, INTERVAL '5 MINUTE') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + let expected: Vec<(NaiveDateTime, BigDecimal)> = vec![ + ("1970-01-01T00:01:00".parse()?, BigDecimal::from_str("1")?), + ("1970-01-01T00:02:00".parse()?, BigDecimal::from_str("-1")?), + ("1970-01-01T00:03:00".parse()?, BigDecimal::from_str("0")?), + ("1970-01-01T00:04:00".parse()?, BigDecimal::from_str("2")?), + ("1970-01-01T00:05:00".parse()?, BigDecimal::from_str("3")?), + ("1970-01-01T00:06:00".parse()?, BigDecimal::from_str("4")?), + ("1970-01-01T00:07:00".parse()?, BigDecimal::from_str("5")?), + ("1970-01-01T00:08:00".parse()?, BigDecimal::from_str("6")?), + ("1970-01-01T00:09:00".parse()?, BigDecimal::from_str("7")?), + ("1970-01-01T00:10:00".parse()?, BigDecimal::from_str("8")?), + ]; + assert_eq!(expected, data); + + let data: Vec<(NaiveDateTime, BigDecimal)> = "SELECT time_bucket(INTERVAL '10 MINUTE', timestamp::TIMESTAMP, INTERVAL '5 MINUTE') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); + assert_eq!(2, data.len()); - println!("{:?}", data); + let expected: Vec<(NaiveDateTime, BigDecimal)> = vec![ + ( + "1969-12-31T23:55:00".parse()?, + BigDecimal::from_str("0.5000")?, + ), + ( + "1970-01-01T00:05:00".parse()?, + BigDecimal::from_str("5.5000")?, + ), + ]; + assert_eq!(expected, data); Ok(()) } @@ -126,20 +160,105 @@ async fn test_time_bucket_years_duckdb(mut conn: PgConnection, tempdir: TempDir) Err(_) => {} } - let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '1 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + let data: Vec<(Date, BigDecimal)> = "SELECT time_bucket(INTERVAL '1 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); assert_eq!(10, data.len()); - let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '5 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + let expected: Vec<(Date, BigDecimal)> = vec![ + ( + Date::from_calendar_date(1970, January, 1)?, + BigDecimal::from_str("1")?, + ), + ( + Date::from_calendar_date(1971, January, 1)?, + BigDecimal::from_str("-1")?, + ), + ( + Date::from_calendar_date(1972, January, 1)?, + BigDecimal::from_str("0")?, + ), + ( + Date::from_calendar_date(1973, January, 1)?, + BigDecimal::from_str("2")?, + ), + ( + Date::from_calendar_date(1974, January, 1)?, + BigDecimal::from_str("3")?, + ), + ( + Date::from_calendar_date(1975, January, 1)?, + BigDecimal::from_str("4")?, + ), + ( + Date::from_calendar_date(1976, January, 1)?, + BigDecimal::from_str("5")?, + ), + ( + Date::from_calendar_date(1977, January, 1)?, + BigDecimal::from_str("6")?, + ), + ( + Date::from_calendar_date(1978, January, 1)?, + BigDecimal::from_str("7")?, + ), + ( + Date::from_calendar_date(1979, January, 1)?, + BigDecimal::from_str("8")?, + ), + ]; + assert_eq!(expected, data); + + let data: Vec<(Date, BigDecimal)> = "SELECT time_bucket(INTERVAL '5 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); assert_eq!(2, data.len()); - let data: Vec<(Date,)> = "SELECT time_bucket(INTERVAL '2 YEAR', timestamp::DATE, DATE '1980-01-01') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" + let expected: Vec<(Date, BigDecimal)> = vec![ + ( + Date::from_calendar_date(1970, January, 1)?, + BigDecimal::from_str("1")?, + ), + ( + Date::from_calendar_date(1975, January, 1)?, + BigDecimal::from_str("6")?, + ), + ]; + assert_eq!(expected, data); + + let data: Vec<(Date, BigDecimal)> = "SELECT time_bucket(INTERVAL '2 YEAR', timestamp::DATE, DATE '1969-01-01') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" .fetch_result(&mut conn).unwrap(); - assert_eq!(5, data.len()); + assert_eq!(6, data.len()); + + let expected: Vec<(Date, BigDecimal)> = vec![ + ( + Date::from_calendar_date(1969, January, 1)?, + BigDecimal::from_str("1")?, + ), + ( + Date::from_calendar_date(1971, January, 1)?, + BigDecimal::from_str("-0.5000")?, + ), + ( + Date::from_calendar_date(1973, January, 1)?, + BigDecimal::from_str("2.5000")?, + ), + ( + Date::from_calendar_date(1975, January, 1)?, + BigDecimal::from_str("4.5000")?, + ), + ( + Date::from_calendar_date(1977, January, 1)?, + BigDecimal::from_str("6.5000")?, + ), + ( + Date::from_calendar_date(1979, January, 1)?, + BigDecimal::from_str("8")?, + ), + ]; + assert_eq!(expected, data); + Ok(()) } From 02e8c84a028b29d4d50052787faab5557f8317fb Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 10 Aug 2024 20:56:53 -0500 Subject: [PATCH 28/29] fmt tests file Signed-off-by: Devan --- tests/time_bucket.rs | 33 ++++++--------------------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index e1b189c3..f6c9ed07 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -50,25 +50,15 @@ async fn test_time_bucket_minutes_duckdb(mut conn: PgConnection, tempdir: TempDi ) .execute(&mut conn); - #[allow(clippy::single_match)] - match "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { - Ok(_) => {} - Err(error) => { - panic!( - "should have successfully called time_bucket() for timeseries data: {}", - error - ); - } - } - - #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => { panic!( "should have failed call to time_bucket() for timeseries data with incorrect parameters" ); } - Err(_) => {} + Err(err) => { + assert_eq!("error returned from database: function time_bucket(interval) does not exist", err.to_string()); + } } let data: Vec<(NaiveDateTime, BigDecimal)> = "SELECT time_bucket(INTERVAL '10 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" @@ -139,25 +129,15 @@ async fn test_time_bucket_years_duckdb(mut conn: PgConnection, tempdir: TempDir) ) .execute(&mut conn); - #[allow(clippy::single_match)] - match "SELECT time_bucket(INTERVAL '2 DAY', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { - Ok(_) => {} - Err(error) => { - panic!( - "should have successfully called time_bucket() for timeseries data: {}", - error - ); - } - } - - #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => { panic!( "should have failed call to time_bucket() for timeseries data with incorrect parameters" ); } - Err(_) => {} + Err(err) => { + assert_eq!("error returned from database: function time_bucket(interval) does not exist", err.to_string()); + } } let data: Vec<(Date, BigDecimal)> = "SELECT time_bucket(INTERVAL '1 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;" @@ -268,7 +248,6 @@ async fn test_time_bucket_fallback(mut conn: PgConnection) -> Result<()> { let trips_table = NycTripsTable::setup(); trips_table.execute(&mut conn); - #[allow(clippy::single_match)] match "SELECT time_bucket(INTERVAL '2 DAY', tpep_pickup_datetime::DATE) AS bucket, AVG(trip_distance) as avg_value FROM nyc_trips GROUP BY bucket ORDER BY bucket;".execute_result(&mut conn) { Ok(_) => { panic!("Should have error'ed when calling time_bucket() on non-FDW data.") From 5aab7101e4e6e1dd4083352d66743147a0fec988 Mon Sep 17 00:00:00 2001 From: Devan Date: Sat, 10 Aug 2024 21:27:47 -0500 Subject: [PATCH 29/29] rm dead code warning -- should be a fixture for rstest Signed-off-by: Devan --- tests/fixtures/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index a44c225f..5852f973 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -228,7 +228,7 @@ pub fn duckdb_conn() -> duckdb::Connection { duckdb::Connection::open_in_memory().unwrap() } -#[allow(dead_code)] +#[fixture] pub fn time_series_record_batch_minutes() -> Result { let fields = vec![ Field::new("value", DataType::Int32, false), @@ -251,7 +251,7 @@ pub fn time_series_record_batch_minutes() -> Result { )?) } -#[allow(dead_code)] +#[fixture] pub fn time_series_record_batch_years() -> Result { let fields = vec![ Field::new("value", DataType::Int32, false),