Skip to content

Commit

Permalink
feat: 1414: Adds time_bucket() support for duckdb fdw (#32)
Browse files Browse the repository at this point in the history
* feat: 1414: Adds time_bucket() support for duckdb fdw

Signed-off-by: Devan <[email protected]>

* feat: 1414: Adds time_bucket() support for duckdb fdw

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* clippy linting

Signed-off-by: Devan <[email protected]>

* adds testing

Signed-off-by: Devan <[email protected]>

* adds test for postgres fallback

Signed-off-by: Devan <[email protected]>

* clippy

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* remove unneeded code

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* adding native PG UDF functionality to time_bucket()

Signed-off-by: Devan <[email protected]>

* todo

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* clippy

Signed-off-by: Devan <[email protected]>

* refactoring calculations for time_bucket() after finding edge cases

Signed-off-by: Devan <[email protected]>

* Adding origin parameter

Signed-off-by: Devan <[email protected]>

* Adding timestamp functions

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* (wip): working on offset

Signed-off-by: Devan <[email protected]>

* revert to only push down time_bucket() to FDW duckdb relations

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* add errors for methods

Signed-off-by: Devan <[email protected]>

* typo

Signed-off-by: Devan <[email protected]>

* move folder and make error const

Signed-off-by: Devan <[email protected]>

* clippy

Signed-off-by: Devan <[email protected]>

* clippy

Signed-off-by: Devan <[email protected]>

* add more tests for expected data

Signed-off-by: Devan <[email protected]>

* fmt tests file

Signed-off-by: Devan <[email protected]>

* rm dead code warning -- should be a fixture for rstest

Signed-off-by: Devan <[email protected]>

---------

Signed-off-by: Devan <[email protected]>
  • Loading branch information
devanbenz authored Aug 19, 2024
1 parent 8c2130b commit 73ddff2
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 7 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# VS Code
.vscode/

# JetBrains
.idea/

# macOS
.DS_Store

Expand Down Expand Up @@ -31,4 +34,4 @@ TPC-H_V*/
# Tests
regression.*
results/
.env
.env
1 change: 1 addition & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
mod csv;
mod duckdb;
mod parquet;
pub mod time_bucket;
58 changes: 58 additions & 0 deletions src/api/time_bucket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2023-2024 Retake, Inc.
//
// This file is part of ParadeDB - Postgres for Search and Analytics
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use pgrx::*;

/// Message used as the panic text by every `time_bucket` overload defined in
/// this module.
///
/// None of those overloads computes a bucket; each one panics with this
/// message, which tells the caller that `time_bucket()` must be used with a
/// DuckDB FDW and where to request a native fallback.
const TIME_BUCKET_FALLBACK_ERROR: &str = "Function `time_bucket()` must be used with a DuckDB FDW. Native postgres does not support this function. If you believe this function should be implemented natively as a fallback please submit a ticket to https://github.com/paradedb/pg_analytics/issues.";

// Placeholder for the `(Interval, Date)` overload of `time_bucket`.
//
// Exposed under the SQL name `time_bucket` through `pg_extern`. Both
// arguments are ignored: the body unconditionally panics with
// `TIME_BUCKET_FALLBACK_ERROR`, so no `Date` is ever returned.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date(_bucket_width: Interval, _input: Date) -> Date {
    panic!("{message}", message = TIME_BUCKET_FALLBACK_ERROR)
}

// Placeholder for the `(Interval, Date, Date)` overload of `time_bucket`,
// where the third argument is the bucket origin.
//
// Exposed under the SQL name `time_bucket` through `pg_extern`. All
// arguments are ignored: the body unconditionally panics with
// `TIME_BUCKET_FALLBACK_ERROR`, so no `Date` is ever returned.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_origin(_bucket_width: Interval, _input: Date, _origin: Date) -> Date {
    panic!("{message}", message = TIME_BUCKET_FALLBACK_ERROR)
}

// Placeholder for the `(Interval, Date, Interval)` overload of `time_bucket`,
// where the third argument is the bucket offset.
//
// Exposed under the SQL name `time_bucket` through `pg_extern`. All
// arguments are ignored: the body unconditionally panics with
// `TIME_BUCKET_FALLBACK_ERROR`, so no `Date` is ever returned.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_date_offset(_bucket_width: Interval, _input: Date, _offset: Interval) -> Date {
    panic!("{message}", message = TIME_BUCKET_FALLBACK_ERROR)
}

// Placeholder for the `(Interval, Timestamp)` overload of `time_bucket`.
//
// Exposed under the SQL name `time_bucket` through `pg_extern`. Both
// arguments are ignored: the body unconditionally panics with
// `TIME_BUCKET_FALLBACK_ERROR`, so no `Timestamp` is ever returned.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp(_bucket_width: Interval, _input: Timestamp) -> Timestamp {
    panic!("{message}", message = TIME_BUCKET_FALLBACK_ERROR)
}

// Placeholder for the `(Interval, Timestamp, Date)` overload of `time_bucket`.
//
// NOTE(review): the Rust name says "offset" but the third parameter is
// `_origin: Date`; the name is left as-is because it is part of the exported
// symbol.
//
// Exposed under the SQL name `time_bucket` through `pg_extern`. All
// arguments are ignored: the body unconditionally panics with
// `TIME_BUCKET_FALLBACK_ERROR`, so no `Timestamp` is ever returned.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_date(
    _bucket_width: Interval,
    _input: Timestamp,
    _origin: Date,
) -> Timestamp {
    panic!("{message}", message = TIME_BUCKET_FALLBACK_ERROR)
}

// Placeholder for the `(Interval, Timestamp, Interval)` overload of
// `time_bucket`, where the third argument is the bucket offset.
//
// Exposed under the SQL name `time_bucket` through `pg_extern`. All
// arguments are ignored: the body unconditionally panics with
// `TIME_BUCKET_FALLBACK_ERROR`, so no `Timestamp` is ever returned.
#[pg_extern(name = "time_bucket")]
pub fn time_bucket_timestamp_offset_interval(
    _bucket_width: Interval,
    _input: Timestamp,
    _offset: Interval,
) -> Timestamp {
    panic!("{message}", message = TIME_BUCKET_FALLBACK_ERROR)
}
62 changes: 56 additions & 6 deletions tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::{
fs::{self, File},
io::Read,
path::{Path, PathBuf},
};

use anyhow::Result;
use async_std::task::block_on;
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::primitives::ByteStream;
use chrono::{DateTime, Duration};
use datafusion::arrow::array::{Int32Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit::Millisecond;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::{
arrow::{datatypes::FieldRef, record_batch::RecordBatch},
parquet::arrow::ArrowWriter,
Expand All @@ -35,6 +33,12 @@ use serde::Serialize;
use serde_arrow::schema::{SchemaLike, TracingOptions};
use shared::fixtures::tempfile::TempDir;
use sqlx::PgConnection;
use std::sync::Arc;
use std::{
fs::{self, File},
io::Read,
path::{Path, PathBuf},
};
use testcontainers::ContainerAsync;
use testcontainers_modules::{
localstack::LocalStack,
Expand Down Expand Up @@ -223,3 +227,49 @@ pub fn tempfile() -> std::fs::File {
/// Opens a fresh in-memory DuckDB connection.
///
/// # Panics
///
/// Panics if `duckdb::Connection::open_in_memory` returns an error.
pub fn duckdb_conn() -> duckdb::Connection {
    let connection = duckdb::Connection::open_in_memory();
    connection.unwrap()
}

/// Builds a ten-row time-series `RecordBatch` whose rows are one minute apart.
///
/// Columns:
/// * `value` — non-nullable `Int32`, holding `[1, -1, 0, 2, 3, 4, 5, 6, 7, 8]`.
/// * `timestamp` — non-nullable millisecond timestamp without a time zone,
///   starting at `DateTime::from_timestamp(60, 0)` (60 seconds after the Unix
///   epoch) and advancing by one minute per row.
///
/// # Errors
///
/// Propagates any error returned by `RecordBatch::try_new`.
#[fixture]
pub fn time_series_record_batch_minutes() -> Result<RecordBatch> {
    // Row i is `base + i minutes`, stored as epoch milliseconds.
    let base = DateTime::from_timestamp(60, 0).unwrap();
    let timestamps: Vec<i64> = (0..10)
        .map(Duration::minutes)
        .map(|step| (base + step).timestamp_millis())
        .collect();
    let values: Vec<i32> = vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8];

    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int32, false),
        Field::new("timestamp", DataType::Timestamp(Millisecond, None), false),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(values)),
            Arc::new(TimestampMillisecondArray::from(timestamps)),
        ],
    )?;
    Ok(batch)
}

/// Builds a ten-row time-series `RecordBatch` whose rows are 366 days apart.
///
/// Columns:
/// * `value` — non-nullable `Int32`, holding `[1, -1, 0, 2, 3, 4, 5, 6, 7, 8]`.
/// * `timestamp` — non-nullable millisecond timestamp without a time zone,
///   starting at `DateTime::from_timestamp(60, 0)` (60 seconds after the Unix
///   epoch) and advancing by 366 days per row. The fixed 366-day step
///   presumably guarantees each row lands in a different calendar year.
///
/// # Errors
///
/// Propagates any error returned by `RecordBatch::try_new`.
#[fixture]
pub fn time_series_record_batch_years() -> Result<RecordBatch> {
    // Row i is `base + i * 366 days`, stored as epoch milliseconds.
    let base = DateTime::from_timestamp(60, 0).unwrap();
    let timestamps: Vec<i64> = (0..10)
        .map(|row| Duration::days(row * 366))
        .map(|step| (base + step).timestamp_millis())
        .collect();
    let values: Vec<i32> = vec![1, -1, 0, 2, 3, 4, 5, 6, 7, 8];

    let schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int32, false),
        Field::new("timestamp", DataType::Timestamp(Millisecond, None), false),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(values)),
            Arc::new(TimestampMillisecondArray::from(timestamps)),
        ],
    )?;
    Ok(batch)
}
Loading

0 comments on commit 73ddff2

Please sign in to comment.