Skip to content

Commit

Permalink
adding native PG UDF functionality to time_bucket()
Browse files Browse the repository at this point in the history
Signed-off-by: Devan <[email protected]>
  • Loading branch information
devanbenz committed Aug 5, 2024
1 parent 8d0b7d5 commit 606a22c
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 25 deletions.
123 changes: 103 additions & 20 deletions src/duckdb/time_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,66 +18,149 @@
use pgrx::iter::TableIterator;
use pgrx::*;

const HOURS_PER_MICRO: i64 = 3600000000;
const MINUTES_PER_MICRO: i64 = 60000000;

fn set_date(year: i32, month: u8, day: u8) -> Date {
return Date::from(Timestamp::new(
year,
month,
day,
0,
0,
0f64,
).unwrap_or_else(|error| panic!("There was an error in date creation: {}", error)))
}

fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp {
return Timestamp::new(
year,
month,
day,
hour,
minute,
0f64,
).unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error)))
}

fn get_hours_delta(micros: i64, input: u8) -> u8 {
let micros_to_hours = (micros / HOURS_PER_MICRO) as u8;
if micros_to_hours == 0 {
return 0;
}
input % micros_to_hours
}

fn get_minutes_delta(micros: i64, input: u8) -> u8 {
let micros_to_minutes = (micros / MINUTES_PER_MICRO) as u8;
if micros_to_minutes == 0 {
return 0;
}
input % micros_to_minutes
}

#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_no_offset(
_bucket_width: Interval,
_input: Date,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
TableIterator::once((""
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
bucket_width: Interval,
input: Date,
) -> Date {
let years = bucket_width.months() / 12;
if years != 0 {
let delta = input.year() as i32 % years;
return set_date(input.year() - delta, input.month(), input.day())
} else if bucket_width.months() != 0 {
let delta = input.month() as i32 % bucket_width.months();
return set_date(input.year(), input.month() - delta as u8, input.day())
} else if bucket_width.days() != 0 {
let delta = input.day() as i32 % bucket_width.days();
return set_date(input.year(), input.month(), input.day() - delta as u8)
}

Date::from(Timestamp::new(
input.year(),
input.month(),
input.day(),
0,
0,
0f64,
).unwrap())
}

// TODO: Need to implement offset
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_date(
_bucket_width: Interval,
_input: Date,
_offset: Date,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
TableIterator::once((""
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
}

// TODO: Need to implement offset
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_interval(
_bucket_width: Interval,
_input: Date,
_offset: Interval,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
TableIterator::once((""
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
}

#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp(
_bucket_width: Interval,
_input: Timestamp,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
TableIterator::once((""
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
bucket_width: Interval,
input: Timestamp,
) -> Timestamp {
let years = bucket_width.months() / 12;
if years != 0 {
let delta = input.year() as i32 % years;
return set_timestamp(input.year() - delta, input.month(), input.day(), input.hour(), input.minute())
} else if bucket_width.months() != 0 {
let delta = input.month() as i32 % bucket_width.months();
return set_timestamp(input.year(), input.month() - delta as u8, input.day(), input.hour(), input.minute())
} else if bucket_width.days() != 0 {
let delta = input.day() as i32 % bucket_width.days();
return set_timestamp(input.year(), input.month(), input.day() - delta as u8, input.hour(), input.minute())
} else if bucket_width.micros() != 0 {
let hours_delta = get_hours_delta(bucket_width.micros(), input.hour());
let minutes_delta = get_minutes_delta(bucket_width.micros(), input.minute());
return set_timestamp(input.year(), input.month(), input.day(), input.hour() - hours_delta, input.minute() - minutes_delta)
}

Timestamp::new(
input.year(),
input.month(),
input.day(),
input.hour(),
input.minute(),
0f64,
).unwrap()
}

// TODO: Need to implement offset
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_date(
_bucket_width: Interval,
_input: Timestamp,
_offset: Date,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
TableIterator::once((""
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
}

// TODO: Need to implement offset
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_interval(
_bucket_width: Interval,
_input: Timestamp,
_offset: Interval,
) -> TableIterator<'static, (name!(time_bucket, Timestamp),)> {
TableIterator::once((""
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
}
5 changes: 0 additions & 5 deletions src/hooks/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ pub async fn executor_run(
}
});

if !is_duckdb_query && query.contains("time_bucket") {
panic!("Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function.\
If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.")
}

if rtable.is_null()
|| query_desc.operation != pg_sys::CmdType_CMD_SELECT
|| !is_duckdb_query
Expand Down

0 comments on commit 606a22c

Please sign in to comment.