diff --git a/src/duckdb/time_bucket.rs b/src/duckdb/time_bucket.rs index 7456fc1f..a83a4272 100644 --- a/src/duckdb/time_bucket.rs +++ b/src/duckdb/time_bucket.rs @@ -18,16 +18,75 @@ use pgrx::iter::TableIterator; use pgrx::*; +const HOURS_PER_MICRO: i64 = 3600000000; +const MINUTES_PER_MICRO: i64 = 60000000; + +fn set_date(year: i32, month: u8, day: u8) -> Date { + return Date::from(Timestamp::new( + year, + month, + day, + 0, + 0, + 0f64, + ).unwrap_or_else(|error| panic!("There was an error in date creation: {}", error))) +} + +fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp { + return Timestamp::new( + year, + month, + day, + hour, + minute, + 0f64, + ).unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error))) +} + +fn get_hours_delta(micros: i64, input: u8) -> u8 { + let micros_to_hours = (micros / HOURS_PER_MICRO) as u8; + if micros_to_hours == 0 { + return 0; + } + input % micros_to_hours +} + +fn get_minutes_delta(micros: i64, input: u8) -> u8 { + let micros_to_minutes = (micros / MINUTES_PER_MICRO) as u8; + if micros_to_minutes == 0 { + return 0; + } + input % micros_to_minutes +} + #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_no_offset( - _bucket_width: Interval, - _input: Date, -) -> TableIterator<'static, (name!(time_bucket, Date),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + bucket_width: Interval, + input: Date, +) -> Date { + let years = bucket_width.months() / 12; + if years != 0 { + let delta = input.year() as i32 % years; + return set_date(input.year() - delta, input.month(), input.day()) + } else if bucket_width.months() != 0 { + let delta = input.month() as i32 % bucket_width.months(); + return set_date(input.year(), input.month() - delta as u8, input.day()) + } else if bucket_width.days() != 0 { + let delta = input.day() as i32 % bucket_width.days(); + return set_date(input.year(), input.month(), input.day() - delta as u8) + } + + Date::from(Timestamp::new( + input.year(), + input.month(), + input.day(), + 0, + 0, + 0f64, + ).unwrap()) } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_date( _bucket_width: Interval, @@ -35,10 +94,11 @@ pub fn time_bucket_date_offset_date( _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_date_offset_interval( _bucket_width: Interval, @@ -46,20 +106,42 @@ pub fn time_bucket_date_offset_interval( _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Date),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp( - _bucket_width: Interval, - _input: Timestamp, -) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { - TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + bucket_width: Interval, + input: Timestamp, +) -> Timestamp { + let years = bucket_width.months() / 12; + if years != 0 { + let delta = input.year() as i32 % years; + return set_timestamp(input.year() - delta, input.month(), input.day(), input.hour(), input.minute()) + } else if bucket_width.months() != 0 { + let delta = input.month() as i32 % bucket_width.months(); + return set_timestamp(input.year(), input.month() - delta as u8, input.day(), input.hour(), input.minute()) + } else if bucket_width.days() != 0 { + let delta = input.day() as i32 % bucket_width.days(); + return set_timestamp(input.year(), input.month(), input.day() - delta as u8, input.hour(), input.minute()) + } else if bucket_width.micros() != 0 { + let hours_delta = get_hours_delta(bucket_width.micros(), input.hour()); + let minutes_delta = get_minutes_delta(bucket_width.micros(), input.minute()); + return set_timestamp(input.year(), input.month(), input.day(), input.hour() - hours_delta, input.minute() - minutes_delta) + } + + Timestamp::new( + input.year(), + input.month(), + input.day(), + input.hour(), + input.minute(), + 0f64, + ).unwrap() } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_date( _bucket_width: Interval, @@ -67,10 +149,11 @@ pub fn time_bucket_timestamp_offset_date( _offset: Date, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } +// TODO: Need to implement offset #[pg_extern(name = "time_bucket")] pub fn time_bucket_timestamp_offset_interval( _bucket_width: Interval, @@ -78,6 +161,6 @@ pub fn time_bucket_timestamp_offset_interval( _offset: Interval, ) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> { TableIterator::once(("" - .parse() - .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) + .parse() + .unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),)) } diff --git a/src/hooks/executor.rs b/src/hooks/executor.rs index ac891c55..5ff20cb9 100644 --- a/src/hooks/executor.rs +++ b/src/hooks/executor.rs @@ -60,11 +60,6 @@ pub async fn executor_run( } }); - if !is_duckdb_query && query.contains("time_bucket") { - panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.\ - If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.") - } - if rtable.is_null() || query_desc.operation != pg_sys::CmdType_CMD_SELECT || !is_duckdb_query