diff --git a/.gitignore b/.gitignore
index c5116260..451a853a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,9 @@
 # VS Code
 .vscode/
 
+# Jetbrains
+.idea/
+
 # macOS
 .DS_Store
 
@@ -31,4 +34,4 @@ TPC-H_V*/
 # Tests
 regression.*
 results/
-.env
\ No newline at end of file
+.env
diff --git a/src/api/mod.rs b/src/api/mod.rs
index 2cdb683c..595b4f4b 100644
--- a/src/api/mod.rs
+++ b/src/api/mod.rs
@@ -18,3 +18,4 @@
 mod csv;
 mod duckdb;
 mod parquet;
+pub mod time_bucket;
diff --git a/src/api/time_bucket.rs b/src/api/time_bucket.rs
new file mode 100644
index 00000000..c64f19f2
--- /dev/null
+++ b/src/api/time_bucket.rs
@@ -0,0 +1,79 @@
+// Copyright (c) 2023-2024 Retake, Inc.
+//
+// This file is part of ParadeDB - Postgres for Search and Analytics
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use pgrx::*;
+
+/// Error message raised by every native `time_bucket()` stub in this file.
+// NOTE(review): presumably pgrx reports the panic as a Postgres ERROR carrying this text; confirm.
+const TIME_BUCKET_FALLBACK_ERROR: &str = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.";
+
+/// Native stub for `time_bucket(interval, date)`.
+///
+/// Always panics with [`TIME_BUCKET_FALLBACK_ERROR`]; it never computes a bucket.
+#[pg_extern(name = "time_bucket")]
+pub fn time_bucket_date(_bucket_width: Interval, _input: Date) -> Date {
+    panic!("{}", TIME_BUCKET_FALLBACK_ERROR);
+}
+
+/// Native stub for `time_bucket(interval, date, date)`, the third argument being an origin.
+///
+/// Always panics with [`TIME_BUCKET_FALLBACK_ERROR`]; it never computes a bucket.
+#[pg_extern(name = "time_bucket")]
+pub fn time_bucket_date_origin(_bucket_width: Interval, _input: Date, _origin: Date) -> Date {
+    panic!("{}", TIME_BUCKET_FALLBACK_ERROR);
+}
+
+/// Native stub for `time_bucket(interval, date, interval)`, the third argument being an offset.
+///
+/// Always panics with [`TIME_BUCKET_FALLBACK_ERROR`]; it never computes a bucket.
+#[pg_extern(name = "time_bucket")]
+pub fn time_bucket_date_offset(_bucket_width: Interval, _input: Date, _offset: Interval) -> Date {
+    panic!("{}", TIME_BUCKET_FALLBACK_ERROR);
+}
+
+/// Native stub for `time_bucket(interval, timestamp)`.
+///
+/// Always panics with [`TIME_BUCKET_FALLBACK_ERROR`]; it never computes a bucket.
+#[pg_extern(name = "time_bucket")]
+pub fn time_bucket_timestamp(_bucket_width: Interval, _input: Timestamp) -> Timestamp {
+    panic!("{}", TIME_BUCKET_FALLBACK_ERROR);
+}
+
+/// Native stub for `time_bucket(interval, timestamp, date)`, the third argument being an origin.
+///
+/// Always panics with [`TIME_BUCKET_FALLBACK_ERROR`]; it never computes a bucket.
+// NOTE(review): the Rust name says `offset` although the third argument is a `Date` origin.
+#[pg_extern(name = "time_bucket")]
+pub fn time_bucket_timestamp_offset_date(
+    _bucket_width: Interval,
+    _input: Timestamp,
+    _origin: Date,
+) -> Timestamp {
+    panic!("{}", TIME_BUCKET_FALLBACK_ERROR);
+}
+
+/// Native stub for `time_bucket(interval, timestamp, interval)`, the third argument being an offset.
+///
+/// Always panics with [`TIME_BUCKET_FALLBACK_ERROR`]; it never computes a bucket.
+#[pg_extern(name = "time_bucket")]
+pub fn time_bucket_timestamp_offset_interval(
+    _bucket_width: Interval,
+    _input: Timestamp,
+    _offset: Interval,
+) -> Timestamp {
+    panic!("{}", TIME_BUCKET_FALLBACK_ERROR);
+}
diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs
index c3cc2b3c..5852f973 100644
--- a/tests/fixtures/mod.rs
+++ b/tests/fixtures/mod.rs
@@ -15,16 +15,14 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 
-use std::{
-    fs::{self, File},
-    io::Read,
-    path::{Path, PathBuf},
-};
-
 use anyhow::Result;
 use async_std::task::block_on;
 use aws_config::{BehaviorVersion, Region};
 use aws_sdk_s3::primitives::ByteStream;
+use chrono::{DateTime, Duration};
+use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
+use datafusion::arrow::datatypes::TimeUnit::Millisecond;
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::{
     arrow::{datatypes::FieldRef, record_batch::RecordBatch},
     parquet::arrow::ArrowWriter,
@@ -35,6 +33,12 @@ use serde::Serialize;
 use serde_arrow::schema::{SchemaLike, TracingOptions};
 use shared::fixtures::tempfile::TempDir;
 use sqlx::PgConnection;
+use std::sync::Arc;
+use std::{
+    fs::{self, File},
+    io::Read,
+    path::{Path, PathBuf},
+};
 use testcontainers::ContainerAsync;
 use testcontainers_modules::{
     localstack::LocalStack,
@@ -223,3 +227,38 @@ pub fn tempfile() -> std::fs::File {
 pub fn duckdb_conn() -> duckdb::Connection {
     duckdb::Connection::open_in_memory().unwrap()
 }
+
+/// Builds the ten-row `value`/`timestamp` batch shared by the time-series fixtures.
+///
+/// Row `i` is stamped `offset_for_row(i)` after 60 seconds past the Unix epoch (1970-01-01T00:01:00).
+fn time_series_record_batch(offset_for_row: impl Fn(i64) -> Duration) -> Result<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("value", DataType::Int32, false),
+        Field::new("timestamp", DataType::Timestamp(Millisecond, None), false),
+    ]));
+
+    let start_time = DateTime::from_timestamp(60, 0).expect("60s past the epoch is representable");
+    let timestamps: Vec<i64> = (0..10)
+        .map(|row| (start_time + offset_for_row(row)).timestamp_millis())
+        .collect();
+
+    Ok(RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(Int32Array::from(vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8])),
+            Arc::new(TimestampMillisecondArray::from(timestamps)),
+        ],
+    )?)
+}
+
+/// Ten rows spaced one minute apart, starting at 1970-01-01T00:01:00.
+#[fixture]
+pub fn time_series_record_batch_minutes() -> Result<RecordBatch> {
+    time_series_record_batch(Duration::minutes)
+}
+
+/// Ten rows spaced 366 days apart, so each row lands in a different calendar year (1970 to 1979).
+#[fixture]
+pub fn time_series_record_batch_years() -> Result<RecordBatch> {
+    time_series_record_batch(|row| Duration::days(row * 366))
+}
diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs
new file mode 100644
index 00000000..f6c9ed07
--- /dev/null
+++ b/tests/time_bucket.rs
@@ -0,0 +1,180 @@
+// Copyright (c) 2023-2024 Retake, Inc.
+//
+// This file is part of ParadeDB - Postgres for Search and Analytics
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+mod fixtures;
+
+use anyhow::Result;
+use chrono::NaiveDateTime;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::parquet::arrow::ArrowWriter;
+use fixtures::*;
+use rstest::*;
+use shared::fixtures::arrow::primitive_setup_fdw_local_file_listing;
+use shared::fixtures::tempfile::TempDir;
+use sqlx::types::BigDecimal;
+use sqlx::PgConnection;
+use std::fs::File;
+use std::str::FromStr;
+use time::Date;
+use time::Month::January;
+
+/// Writes `batch` to a Parquet file in `tempdir` and creates the `timeseries` foreign table over it.
+fn setup_timeseries_table(
+    conn: &mut PgConnection,
+    tempdir: &TempDir,
+    batch: &RecordBatch,
+) -> Result<()> {
+    let parquet_path = tempdir.path().join("test_arrow_types.parquet");
+    let parquet_file = File::create(&parquet_path)?;
+
+    let mut writer = ArrowWriter::try_new(parquet_file, batch.schema(), None)?;
+    writer.write(batch)?;
+    writer.close()?;
+
+    let parquet_path = parquet_path.to_str().unwrap();
+    primitive_setup_fdw_local_file_listing(parquet_path, "MyTable").execute(conn);
+
+    format!(
+        "CREATE FOREIGN TABLE timeseries () SERVER parquet_server OPTIONS (files '{}')",
+        parquet_path
+    )
+    .execute(conn);
+
+    Ok(())
+}
+
+/// Asserts that `time_bucket` called with only a bucket width is rejected as an unknown function.
+fn assert_time_bucket_requires_input(conn: &mut PgConnection) {
+    let err = "SELECT time_bucket(INTERVAL '2 DAY') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
+        .execute_result(conn)
+        .expect_err("should have failed call to time_bucket() for timeseries data with incorrect parameters");
+    assert_eq!(
+        "error returned from database: function time_bucket(interval) does not exist",
+        err.to_string()
+    );
+}
+
+/// Builds one expected `(bucket, average)` row from a timestamp literal and a decimal literal.
+fn timestamp_row(bucket: &str, avg: &str) -> Result<(NaiveDateTime, BigDecimal)> {
+    Ok((bucket.parse()?, BigDecimal::from_str(avg)?))
+}
+
+/// Builds one expected `(bucket, average)` row for the bucket starting on January 1st of `year`.
+fn year_row(year: i32, avg: &str) -> Result<(Date, BigDecimal)> {
+    Ok((
+        Date::from_calendar_date(year, January, 1)?,
+        BigDecimal::from_str(avg)?,
+    ))
+}
+
+#[rstest]
+async fn test_time_bucket_minutes_duckdb(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
+    setup_timeseries_table(&mut conn, &tempdir, &time_series_record_batch_minutes()?)?;
+    assert_time_bucket_requires_input(&mut conn);
+
+    let data: Vec<(NaiveDateTime, BigDecimal)> = "SELECT time_bucket(INTERVAL '10 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
+        .fetch_result(&mut conn)?;
+    let expected = vec![
+        timestamp_row("1970-01-01T00:00:00", "3")?,
+        timestamp_row("1970-01-01T00:10:00", "8")?,
+    ];
+    assert_eq!(expected, data);
+
+    let data: Vec<(NaiveDateTime, BigDecimal)> = "SELECT time_bucket(INTERVAL '1 MINUTE', timestamp::TIMESTAMP) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
+        .fetch_result(&mut conn)?;
+    let expected = vec![
+        timestamp_row("1970-01-01T00:01:00", "1")?,
+        timestamp_row("1970-01-01T00:02:00", "-1")?,
+        timestamp_row("1970-01-01T00:03:00", "0")?,
+        timestamp_row("1970-01-01T00:04:00", "2")?,
+        timestamp_row("1970-01-01T00:05:00", "3")?,
+        timestamp_row("1970-01-01T00:06:00", "4")?,
+        timestamp_row("1970-01-01T00:07:00", "5")?,
+        timestamp_row("1970-01-01T00:08:00", "6")?,
+        timestamp_row("1970-01-01T00:09:00", "7")?,
+        timestamp_row("1970-01-01T00:10:00", "8")?,
+    ];
+    assert_eq!(expected, data);
+
+    let data: Vec<(NaiveDateTime, BigDecimal)> = "SELECT time_bucket(INTERVAL '10 MINUTE', timestamp::TIMESTAMP, INTERVAL '5 MINUTE') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
+        .fetch_result(&mut conn)?;
+    let expected = vec![
+        timestamp_row("1969-12-31T23:55:00", "0.5000")?,
+        timestamp_row("1970-01-01T00:05:00", "5.5000")?,
+    ];
+    assert_eq!(expected, data);
+
+    Ok(())
+}
+
+#[rstest]
+async fn test_time_bucket_years_duckdb(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
+    setup_timeseries_table(&mut conn, &tempdir, &time_series_record_batch_years()?)?;
+    assert_time_bucket_requires_input(&mut conn);
+
+    let data: Vec<(Date, BigDecimal)> = "SELECT time_bucket(INTERVAL '1 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
+        .fetch_result(&mut conn)?;
+    let expected = vec![
+        year_row(1970, "1")?,
+        year_row(1971, "-1")?,
+        year_row(1972, "0")?,
+        year_row(1973, "2")?,
+        year_row(1974, "3")?,
+        year_row(1975, "4")?,
+        year_row(1976, "5")?,
+        year_row(1977, "6")?,
+        year_row(1978, "7")?,
+        year_row(1979, "8")?,
+    ];
+    assert_eq!(expected, data);
+
+    let data: Vec<(Date, BigDecimal)> = "SELECT time_bucket(INTERVAL '5 YEAR', timestamp::DATE) AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
+        .fetch_result(&mut conn)?;
+    let expected = vec![year_row(1970, "1")?, year_row(1975, "6")?];
+    assert_eq!(expected, data);
+
+    let data: Vec<(Date, BigDecimal)> = "SELECT time_bucket(INTERVAL '2 YEAR', timestamp::DATE, DATE '1969-01-01') AS bucket, AVG(value) as avg_value FROM timeseries GROUP BY bucket ORDER BY bucket;"
+        .fetch_result(&mut conn)?;
+    let expected = vec![
+        year_row(1969, "1")?,
+        year_row(1971, "-0.5000")?,
+        year_row(1973, "2.5000")?,
+        year_row(1975, "4.5000")?,
+        year_row(1977, "6.5000")?,
+        year_row(1979, "8")?,
+    ];
+    assert_eq!(expected, data);
+
+    Ok(())
+}
+
+#[rstest]
+async fn test_time_bucket_fallback(mut conn: PgConnection) -> Result<()> {
+    let error_message = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.";
+    let trips_table = NycTripsTable::setup();
+    trips_table.execute(&mut conn);
+
+    let error = "SELECT time_bucket(INTERVAL '2 DAY', tpep_pickup_datetime::DATE) AS bucket, AVG(trip_distance) as avg_value FROM nyc_trips GROUP BY bucket ORDER BY bucket;"
+        .execute_result(&mut conn)
+        .expect_err("Should have error'ed when calling time_bucket() on non-FDW data.");
+    assert!(
+        error.to_string().contains(error_message),
+        "unexpected error from native time_bucket(): {error}"
+    );
+
+    Ok(())
+}