Skip to content

Commit

Permalink
refactoring calculations for time_bucket() I found edge cases
Browse files Browse the repository at this point in the history
Signed-off-by: Devan <[email protected]>
  • Loading branch information
devanbenz committed Aug 7, 2024
1 parent 0d552fb commit 22a26ae
Showing 1 changed file with 80 additions and 83 deletions.
163 changes: 80 additions & 83 deletions src/duckdb/time_bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,84 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use chrono::{DateTime, Datelike, NaiveDateTime, Timelike};
use duckdb::arrow::temporal_conversions::SECONDS_IN_DAY;
use pgrx::iter::TableIterator;
use pgrx::*;

const HOURS_PER_MICRO: i64 = 3600000000;
const MINUTES_PER_MICRO: i64 = 60000000;
// Origin epoch for months/years intervals.
// The origin date is 2000-01-01.
// Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin
const ORIGIN_UNIX_EPOCH: i128 = 946684800;
// Origin epoch for days/minutes/seconds intervals.
// Date is set to 2000-01-03.
// Please see: https://duckdb.org/docs/sql/functions/date.html#time_bucketbucket_width-date-origin
const DAYS_ORIGIN_UNIX_EPOCH: i128 = 946857600;
const MICROS_PER_SECOND: i128 = 1000000;

fn set_date(year: i32, month: u8, day: u8) -> Date {
fn set_date(year: i32, month: u32, day: u32) -> Date {
Date::from(
Timestamp::new(year, month, day, 0, 0, 0f64)
Timestamp::new(year, month as u8, day as u8, 0, 0, 0f64)
.unwrap_or_else(|error| panic!("There was an error in date creation: {}", error)),
)
}

fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8) -> Timestamp {
Timestamp::new(year, month, day, hour, minute, 0f64)
fn set_timestamp(year: i32, month: u8, day: u8, hour: u8, minute: u8, second: f64) -> Timestamp {
Timestamp::new(year, month, day, hour, minute, second)
.unwrap_or_else(|error| panic!("There was an error in timestamp creation: {}", error))
}

fn get_micros_delta(micros: i64, input: u8, divisor: i64) -> u8 {
let micros_quotient = (micros / divisor) as u8;
if micros_quotient == 0 {
return 0;
fn calculate_time_bucket(bucket_width_seconds: i128, input_unix_epoch: i128, months: i32) -> i128 {
if months != 0 {
let truncated_input_unix_epoch =
((input_unix_epoch - ORIGIN_UNIX_EPOCH) / bucket_width_seconds) * bucket_width_seconds;
ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch
} else {
let truncated_input_unix_epoch = ((input_unix_epoch - DAYS_ORIGIN_UNIX_EPOCH)
/ bucket_width_seconds)
* bucket_width_seconds;
DAYS_ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch
}
input % micros_quotient
}

#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_no_offset(bucket_width: Interval, input: Date) -> Date {
let years = bucket_width.months() / 12;
if years != 0 {
let delta = input.year() % years;
return set_date(input.year() - delta, input.month(), input.day());
} else if bucket_width.months() != 0 {
let delta = input.month() as i32 % bucket_width.months();
return set_date(input.year(), input.month() - delta as u8, input.day());
} else if bucket_width.days() != 0 {
let delta = input.day() as i32 % bucket_width.days();
return set_date(input.year(), input.month(), input.day() - delta as u8);
}
pub fn time_bucket_date(bucket_width: Interval, input: Date) -> Date {
let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND;
let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128;
let bucket_date = calculate_time_bucket(
bucket_width_seconds,
input_unix_epoch,
bucket_width.months(),
);

Date::from(Timestamp::new(input.year(), input.month(), input.day(), 0, 0, 0f64).unwrap())
if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) {
set_date(dt.year(), dt.month(), dt.day())
} else {
panic!("There was a problem setting the native datetime from provided unix epoch.")
}
}

// TODO: Need to implement offset for pg
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_date(
_bucket_width: Interval,
_input: Date,
_offset: Date,
) -> TableIterator<'static, (name!(time_bucket, Date),)> {
TableIterator::once((""
.parse()
.unwrap_or_else(|err| panic!("There was an error while parsing time_bucket(): {}", err)),))
pub fn time_bucket_date_origin(bucket_width: Interval, input: Date, origin: Date) -> Date {
let new_origin_epoch = (origin.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128;

let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND;
let input_unix_epoch = (input.to_unix_epoch_days() as i64 * SECONDS_IN_DAY) as i128;
let truncated_input_unix_epoch =
((input_unix_epoch - new_origin_epoch) / bucket_width_seconds) * bucket_width_seconds;

let bucket_date = ORIGIN_UNIX_EPOCH + truncated_input_unix_epoch;

if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) {
set_date(dt.year(), dt.month(), dt.day())
} else {
panic!("There was a problem setting the native datetime from provided unix epoch.")
}
}

// TODO: Need to implement offset for pg
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset_interval(
pub fn time_bucket_date_offset(
_bucket_width: Interval,
_input: Date,
_offset: Interval,
Expand All @@ -84,56 +104,33 @@ pub fn time_bucket_date_offset_interval(

#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp(bucket_width: Interval, input: Timestamp) -> Timestamp {
let years = bucket_width.months() / 12;
if years != 0 {
let delta = input.year() % years;
return set_timestamp(
input.year() - delta,
input.month(),
input.day(),
input.hour(),
input.minute(),
);
} else if bucket_width.months() != 0 {
let delta = input.month() as i32 % bucket_width.months();
return set_timestamp(
input.year(),
input.month() - delta as u8,
input.day(),
input.hour(),
input.minute(),
);
} else if bucket_width.days() != 0 {
let delta = input.day() as i32 % bucket_width.days();
return set_timestamp(
input.year(),
input.month(),
input.day() - delta as u8,
input.hour(),
input.minute(),
);
} else if bucket_width.micros() != 0 {
let hours_delta = get_micros_delta(bucket_width.micros(), input.hour(), HOURS_PER_MICRO);
let minutes_delta =
get_micros_delta(bucket_width.micros(), input.minute(), MINUTES_PER_MICRO);
return set_timestamp(
input.year(),
input.month(),
input.day(),
input.hour() - hours_delta,
input.minute() - minutes_delta,
);
}
let bucket_width_seconds = bucket_width.as_micros() / MICROS_PER_SECOND;
let input_string = input.to_iso_string();
let input_unix_epoch = NaiveDateTime::parse_from_str(&input_string, "%Y-%m-%dT%H:%M:%S")
.unwrap_or_else(|error| {
panic!(
"there was an error parsing the set TIMESTAMP value as a string: {}",
error
)
});
let bucket_date = calculate_time_bucket(
bucket_width_seconds,
input_unix_epoch.and_utc().timestamp() as i128,
bucket_width.months(),
);

Timestamp::new(
input.year(),
input.month(),
input.day(),
input.hour(),
input.minute(),
0f64,
)
.unwrap()
if let Some(dt) = DateTime::from_timestamp(bucket_date as i64, 0) {
set_timestamp(
dt.year(),
dt.month() as u8,
dt.day() as u8,
dt.hour() as u8,
dt.minute() as u8,
dt.second() as f64,
)
} else {
panic!("There was a problem setting the native datetime from provided unix epoch.")
}
}

// TODO: Need to implement offset for pg
Expand Down

0 comments on commit 22a26ae

Please sign in to comment.