From 4358c13d7700f6fb3168815adc4a4869fea0336e Mon Sep 17 00:00:00 2001 From: Ming Date: Thu, 22 Aug 2024 16:21:00 -0400 Subject: [PATCH] chore: Remove `shared` dependency (#91) * remove shared * remove wildcard imports * suppress linter --- Cargo.toml | 11 +- src/lib.rs | 12 +- tests/explain.rs | 5 +- tests/fixtures/arrow.rs | 651 ++++++++++++++++++++++++++ tests/fixtures/db.rs | 198 ++++++++ tests/fixtures/mod.rs | 27 +- tests/fixtures/tables/duckdb_types.rs | 149 ++++++ tests/fixtures/tables/mod.rs | 19 + tests/fixtures/tables/nyc_trips.rs | 240 ++++++++++ tests/scan.rs | 18 +- tests/settings.rs | 3 +- tests/table_config.rs | 13 +- tests/time_bucket.rs | 10 +- 13 files changed, 1305 insertions(+), 51 deletions(-) create mode 100644 tests/fixtures/arrow.rs create mode 100644 tests/fixtures/db.rs create mode 100644 tests/fixtures/tables/duckdb_types.rs create mode 100644 tests/fixtures/tables/mod.rs create mode 100644 tests/fixtures/tables/nyc_trips.rs diff --git a/Cargo.toml b/Cargo.toml index 80ed1b09..9e6d39aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,6 @@ serde = "1.0.201" serde_json = "1.0.120" signal-hook = "0.3.17" signal-hook-async-std = "0.2.2" -shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4854652" } strum = { version = "0.26.3", features = ["derive"] } supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "27af09b" } thiserror = "1.0.59" @@ -40,22 +39,24 @@ uuid = "1.9.1" [dev-dependencies] aws-config = "1.5.1" aws-sdk-s3 = "1.34.0" +bigdecimal = { version = "0.3.0", features = ["serde"] } +bytes = "1.7.1" datafusion = "37.1.0" deltalake = { version = "0.17.3", features = ["datafusion"] } futures = "0.3.30" pgrx-tests = "0.11.3" rstest = "0.19.0" serde_arrow = { version = "0.11.3", features = ["arrow-51"] } -shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4854652", features = [ - "fixtures", -] } -sqlx = { version = "0.7.4", features = [ +soa_derive = "0.13.0" +sqlx = { version = "0.7.3", features = [ "postgres", "runtime-async-std", "time", "bigdecimal", "uuid", + "chrono", ] } +tempfile = "3.12.0" testcontainers = "0.16.7" testcontainers-modules = { version = "0.4.2", features = ["localstack"] } time = { version = "0.3.34", features = ["serde"] } diff --git a/src/lib.rs b/src/lib.rs index 535aef38..e04d7101 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,13 +23,10 @@ mod schema; use hooks::ExtensionHook; use pgrx::*; -use shared::{ - gucs::PostgresGlobalGucSettings, - // telemetry::{setup_telemetry_background_worker, ParadeExtension}, -}; +// TODO: Reactivate once we've properly integrated with the monorepo // A static variable is required to host grand unified configuration settings. 
-pub static GUCS: PostgresGlobalGucSettings = PostgresGlobalGucSettings::new(); +// pub static GUCS: PostgresGlobalGucSettings = PostgresGlobalGucSettings::new(); pg_module_magic!(); @@ -42,9 +39,8 @@ pub extern "C" fn _PG_init() { register_hook(&mut EXTENSION_HOOK) }; - GUCS.init("pg_analytics"); - - // TODO: Reactivate once we've properly integrated with the monorepo + // TODO: Depends on above TODO + // GUCS.init("pg_analytics"); // setup_telemetry_background_worker(ParadeExtension::PgAnalytics); } diff --git a/tests/explain.rs b/tests/explain.rs index cc892f67..05cbbe1e 100644 --- a/tests/explain.rs +++ b/tests/explain.rs @@ -17,11 +17,14 @@ mod fixtures; +use crate::fixtures::db::Query; +use crate::fixtures::{conn, s3, S3}; use anyhow::Result; -use fixtures::*; use rstest::*; use sqlx::PgConnection; +use crate::fixtures::tables::nyc_trips::NycTripsTable; + const S3_BUCKET: &str = "test-trip-setup"; const S3_KEY: &str = "test_trip_setup.parquet"; diff --git a/tests/fixtures/arrow.rs b/tests/fixtures/arrow.rs new file mode 100644 index 00000000..0661178b --- /dev/null +++ b/tests/fixtures/arrow.rs @@ -0,0 +1,651 @@ +#![allow(dead_code)] + +// Copyright (c) 2023-2024 Retake, Inc. +// +// This file is part of ParadeDB - Postgres for Search and Analytics +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::sync::Arc; + +use anyhow::{bail, Result}; +use bigdecimal::{BigDecimal, ToPrimitive}; +use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Timelike}; +use datafusion::arrow::array::*; +use datafusion::arrow::buffer::Buffer; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use datafusion::arrow::record_batch::RecordBatch; +use pgrx::pg_sys::InvalidOid; +use pgrx::PgBuiltInOids; +use sqlx::postgres::PgRow; +use sqlx::{Postgres, Row, TypeInfo, ValueRef}; + +fn array_data() -> ArrayData { + let values: [u8; 12] = *b"helloparquet"; + let offsets: [i32; 4] = [0, 5, 5, 12]; // Note: Correct the offsets to accurately reflect boundaries + + ArrayData::builder(DataType::Binary) + .len(3) // Set length to 3 to match other arrays + .add_buffer(Buffer::from_slice_ref(&offsets[..])) + .add_buffer(Buffer::from_slice_ref(&values[..])) + .build() + .unwrap() +} + +// Fixed size binary is not supported yet, but this will be useful for test data when we do support. 
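+// The 15-byte buffer below packs three 5-byte values: "hello", "there", "arrow".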
+fn fixed_size_array_data() -> ArrayData { + let values: [u8; 15] = *b"hellotherearrow"; // Ensure length is consistent + + ArrayData::builder(DataType::FixedSizeBinary(5)) + .len(3) + .add_buffer(Buffer::from(&values[..])) + .build() + .unwrap() +} + +fn binary_array_data() -> ArrayData { + let values: [u8; 12] = *b"helloparquet"; + let offsets: [i64; 4] = [0, 5, 5, 12]; + + ArrayData::builder(DataType::LargeBinary) + .len(3) // Ensure length is consistent + .add_buffer(Buffer::from_slice_ref(&offsets[..])) + .add_buffer(Buffer::from_slice_ref(&values[..])) + .build() + .unwrap() +} + +/// A separate version of the primitive_record_batch fixture, +/// narrowed to only the types that Delta Lake supports. +pub fn delta_primitive_record_batch() -> Result { + let fields = vec![ + Field::new("boolean_col", DataType::Boolean, false), + Field::new("int8_col", DataType::Int8, false), + Field::new("int16_col", DataType::Int16, false), + Field::new("int32_col", DataType::Int32, false), + Field::new("int64_col", DataType::Int64, false), + Field::new("float32_col", DataType::Float32, false), + Field::new("float64_col", DataType::Float64, false), + Field::new("date32_col", DataType::Date32, false), + Field::new("binary_col", DataType::Binary, false), + Field::new("utf8_col", DataType::Utf8, false), + ]; + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(BooleanArray::from(vec![true, true, false])), + Arc::new(Int8Array::from(vec![1, -1, 0])), + Arc::new(Int16Array::from(vec![1, -1, 0])), + Arc::new(Int32Array::from(vec![1, -1, 0])), + Arc::new(Int64Array::from(vec![1, -1, 0])), + Arc::new(Float32Array::from(vec![1.0, -1.0, 0.0])), + Arc::new(Float64Array::from(vec![1.0, -1.0, 0.0])), + Arc::new(Date32Array::from(vec![18262, 18263, 18264])), + Arc::new(BinaryArray::from(array_data())), + Arc::new(StringArray::from(vec![ + Some("Hello"), + Some("There"), + Some("World"), + ])), + ], + )?; + + Ok(batch) +} + +// Used to test case sensitivity in column names +pub fn record_batch_with_casing() -> Result { + let fields = vec![Field::new("Boolean_Col", DataType::Boolean, false)]; + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(BooleanArray::from(vec![true, true, false]))], + )?; + + Ok(batch) +} + +// Blows up deltalake, so comment out for now. 
+pub fn primitive_record_batch() -> Result { + // Define the fields for each datatype + let fields = vec![ + Field::new("boolean_col", DataType::Boolean, true), + Field::new("int8_col", DataType::Int8, false), + Field::new("int16_col", DataType::Int16, false), + Field::new("int32_col", DataType::Int32, false), + Field::new("int64_col", DataType::Int64, false), + Field::new("uint8_col", DataType::UInt8, false), + Field::new("uint16_col", DataType::UInt16, false), + Field::new("uint32_col", DataType::UInt32, false), + Field::new("uint64_col", DataType::UInt64, false), + Field::new("float32_col", DataType::Float32, false), + Field::new("float64_col", DataType::Float64, false), + Field::new("date32_col", DataType::Date32, false), + Field::new("date64_col", DataType::Date64, false), + Field::new("binary_col", DataType::Binary, false), + Field::new("large_binary_col", DataType::LargeBinary, false), + Field::new("utf8_col", DataType::Utf8, false), + Field::new("large_utf8_col", DataType::LargeUtf8, false), + ]; + + // Create a schema from the fields + let schema = Arc::new(Schema::new(fields)); + + // Create a RecordBatch + Ok(RecordBatch::try_new( + schema, + vec![ + Arc::new(BooleanArray::from(vec![ + Some(true), + Some(true), + Some(false), + ])), + Arc::new(Int8Array::from(vec![1, -1, 0])), + Arc::new(Int16Array::from(vec![1, -1, 0])), + Arc::new(Int32Array::from(vec![1, -1, 0])), + Arc::new(Int64Array::from(vec![1, -1, 0])), + Arc::new(UInt8Array::from(vec![1, 2, 0])), + Arc::new(UInt16Array::from(vec![1, 2, 0])), + Arc::new(UInt32Array::from(vec![1, 2, 0])), + Arc::new(UInt64Array::from(vec![1, 2, 0])), + Arc::new(Float32Array::from(vec![1.0, -1.0, 0.0])), + Arc::new(Float64Array::from(vec![1.0, -1.0, 0.0])), + Arc::new(Date32Array::from(vec![18262, 18263, 18264])), + Arc::new(Date64Array::from(vec![ + 1609459200000, + 1609545600000, + 1609632000000, + ])), + Arc::new(BinaryArray::from(array_data())), + Arc::new(LargeBinaryArray::from(binary_array_data())), + Arc::new(StringArray::from(vec![ + Some("Hello"), + Some("There"), + Some("World"), + ])), + Arc::new(LargeStringArray::from(vec![ + Some("Hello"), + Some("There"), + Some("World"), + ])), + ], + )?) 
+} + +pub fn primitive_create_foreign_data_wrapper( + wrapper: &str, + handler: &str, + validator: &str, +) -> String { + format!("CREATE FOREIGN DATA WRAPPER {wrapper} HANDLER {handler} VALIDATOR {validator}") +} + +pub fn primitive_create_server(server: &str, wrapper: &str) -> String { + format!("CREATE SERVER {server} FOREIGN DATA WRAPPER {wrapper}") +} + +pub fn primitive_create_user_mapping_options(user: &str, server: &str) -> String { + format!("CREATE USER MAPPING FOR {user} SERVER {server}",) +} + +pub fn auto_create_table(server: &str, table: &str) -> String { + format!("CREATE FOREIGN TABLE {table} () SERVER {server}") +} + +fn create_field_definition(fields: &[(&str, &str)]) -> String { + fields + .iter() + .map(|(field_name, field_type)| format!("{field_name} {field_type}")) + .collect::>() + .join(",") +} + +pub fn create_foreign_table(server: &str, table: &str, fields: &[(&str, &str)]) -> String { + let fields_definition = create_field_definition(fields); + format!("CREATE FOREIGN TABLE {table} ({fields_definition}) SERVER {server}") +} + +pub fn setup_fdw_local_parquet_file_listing( + local_file_path: &str, + table: &str, + fields: &[(&str, &str)], +) -> String { + let create_foreign_data_wrapper = primitive_create_foreign_data_wrapper( + "parquet_wrapper", + "parquet_fdw_handler", + "parquet_fdw_validator", + ); + let create_server = primitive_create_server("parquet_server", "parquet_wrapper"); + let create_table = create_foreign_table("parquet_server", table, fields); + + format!( + r#" + {create_foreign_data_wrapper}; + {create_server}; + {create_table} OPTIONS (files '{local_file_path}'); + "# + ) +} + +// Some fields have been commented out to get tests to pass +// See https://github.com/paradedb/paradedb/issues/1299 +fn primitive_table_columns() -> Vec<(&'static str, &'static str)> { + vec![ + ("boolean_col", "boolean"), + ("int8_col", "smallint"), + ("int16_col", "smallint"), + ("int32_col", "integer"), + ("int64_col", "bigint"), + ("uint8_col", "smallint"), + ("uint16_col", "integer"), + ("uint32_col", "bigint"), + ("uint64_col", "numeric(20)"), + ("float32_col", "real"), + ("float64_col", "double precision"), + ("date32_col", "date"), + ("date64_col", "date"), + ("binary_col", "bytea"), + ("large_binary_col", "bytea"), + ("utf8_col", "text"), + ("large_utf8_col", "text"), + ] +} + +pub fn primitive_create_table(server: &str, table: &str) -> String { + create_foreign_table(server, table, &primitive_table_columns()) +} + +fn primitive_delta_table_columns() -> Vec<(&'static str, &'static str)> { + vec![ + ("boolean_col", "boolean"), + ("int8_col", "smallint"), + ("int16_col", "smallint"), + ("int32_col", "integer"), + ("int64_col", "bigint"), + ("float32_col", "real"), + ("float64_col", "double precision"), + ("date32_col", "date"), + ("binary_col", "bytea"), + ("utf8_col", "text"), + ] +} + +pub fn primitive_create_delta_table(server: &str, table: &str) -> String { + create_foreign_table(server, table, &primitive_delta_table_columns()) +} + +pub fn primitive_setup_fdw_s3_listing( + s3_endpoint: &str, + s3_object_path: &str, + table: &str, +) -> String { + let create_foreign_data_wrapper = primitive_create_foreign_data_wrapper( + "parquet_wrapper", + "parquet_fdw_handler", + "parquet_fdw_validator", + ); + let create_user_mapping_options = + primitive_create_user_mapping_options("public", "parquet_server"); + let create_server = primitive_create_server("parquet_server", "parquet_wrapper"); + let create_table = primitive_create_table("parquet_server", table); + + 
format!( + r#" + {create_foreign_data_wrapper}; + {create_server}; + {create_user_mapping_options} OPTIONS (type 'S3', region 'us-east-1', endpoint '{s3_endpoint}', use_ssl 'false', url_style 'path'); + {create_table} OPTIONS (files '{s3_object_path}'); + "# + ) +} + +pub fn primitive_setup_fdw_s3_delta( + s3_endpoint: &str, + s3_object_path: &str, + table: &str, +) -> String { + let create_foreign_data_wrapper = primitive_create_foreign_data_wrapper( + "delta_wrapper", + "delta_fdw_handler", + "delta_fdw_validator", + ); + let create_user_mapping_options = + primitive_create_user_mapping_options("public", "delta_server"); + let create_server = primitive_create_server("delta_server", "delta_wrapper"); + let create_table = primitive_create_delta_table("delta_server", table); + + format!( + r#" + {create_foreign_data_wrapper}; + {create_server}; + {create_user_mapping_options} OPTIONS (type 'S3', region 'us-east-1', endpoint '{s3_endpoint}', use_ssl 'false', url_style 'path'); + {create_table} OPTIONS (files '{s3_object_path}'); + "# + ) +} + +pub fn primitive_setup_fdw_local_file_listing(local_file_path: &str, table: &str) -> String { + setup_fdw_local_parquet_file_listing(local_file_path, table, &primitive_table_columns()) +} + +pub fn primitive_setup_fdw_local_file_delta(local_file_path: &str, table: &str) -> String { + let create_foreign_data_wrapper = primitive_create_foreign_data_wrapper( + "delta_wrapper", + "delta_fdw_handler", + "delta_fdw_validator", + ); + let create_server = primitive_create_server("delta_server", "delta_wrapper"); + let create_table = primitive_create_delta_table("delta_server", table); + + format!( + r#" + {create_foreign_data_wrapper}; + {create_server}; + {create_table} OPTIONS (files '{local_file_path}'); + "# + ) +} + +pub fn setup_local_file_listing_with_casing(local_file_path: &str, table: &str) -> String { + let create_foreign_data_wrapper = primitive_create_foreign_data_wrapper( + "parquet_wrapper", + "parquet_fdw_handler", + "parquet_fdw_validator", + ); + let create_server = primitive_create_server("parquet_server", "parquet_wrapper"); + let create_table = auto_create_table("parquet_server", table); + + format!( + r#" + {create_foreign_data_wrapper}; + {create_server}; + {create_table} OPTIONS (files '{local_file_path}', preserve_casing 'true'); + "# + ) +} + +fn valid(data_type: &DataType, oid: u32) -> bool { + let oid = match PgBuiltInOids::from_u32(oid) { + Ok(oid) => oid, + _ => return false, + }; + match data_type { + DataType::Null => false, + DataType::Boolean => matches!(oid, PgBuiltInOids::BOOLOID), + DataType::Int8 => matches!(oid, PgBuiltInOids::INT2OID), + DataType::Int16 => matches!(oid, PgBuiltInOids::INT2OID), + DataType::Int32 => matches!(oid, PgBuiltInOids::INT4OID), + DataType::Int64 => matches!(oid, PgBuiltInOids::INT8OID), + DataType::UInt8 => matches!(oid, PgBuiltInOids::INT2OID), + DataType::UInt16 => matches!(oid, PgBuiltInOids::INT4OID), + DataType::UInt32 => matches!(oid, PgBuiltInOids::INT8OID), + DataType::UInt64 => matches!(oid, PgBuiltInOids::NUMERICOID), + DataType::Float16 => false, // Not supported yet. 
+        DataType::Float32 => matches!(oid, PgBuiltInOids::FLOAT4OID),
+        DataType::Float64 => matches!(oid, PgBuiltInOids::FLOAT8OID),
+        DataType::Timestamp(_, _) => matches!(oid, PgBuiltInOids::TIMESTAMPOID),
+        DataType::Date32 => matches!(oid, PgBuiltInOids::DATEOID),
+        DataType::Date64 => matches!(oid, PgBuiltInOids::DATEOID),
+        DataType::Time32(_) => matches!(oid, PgBuiltInOids::TIMEOID),
+        DataType::Time64(_) => matches!(oid, PgBuiltInOids::TIMEOID),
+        DataType::Duration(_) => false, // Not supported yet.
+        DataType::Interval(_) => false, // Not supported yet.
+        DataType::Binary => matches!(oid, PgBuiltInOids::BYTEAOID),
+        DataType::FixedSizeBinary(_) => false, // Not supported yet.
+        DataType::LargeBinary => matches!(oid, PgBuiltInOids::BYTEAOID),
+        DataType::BinaryView => matches!(oid, PgBuiltInOids::BYTEAOID),
+        DataType::Utf8 => matches!(oid, PgBuiltInOids::TEXTOID),
+        DataType::LargeUtf8 => matches!(oid, PgBuiltInOids::TEXTOID),
+        // Remaining types are not supported yet.
+        DataType::Utf8View => false,
+        DataType::List(_) => false,
+        DataType::ListView(_) => false,
+        DataType::FixedSizeList(_, _) => false,
+        DataType::LargeList(_) => false,
+        DataType::LargeListView(_) => false,
+        DataType::Struct(_) => false,
+        DataType::Union(_, _) => false,
+        DataType::Dictionary(_, _) => false,
+        DataType::Decimal128(_, _) => false,
+        DataType::Decimal256(_, _) => false,
+        DataType::Map(_, _) => false,
+        DataType::RunEndEncoded(_, _) => false,
+    }
+}
+
+fn decode<'r, T: sqlx::Decode<'r, Postgres> + sqlx::Type<Postgres>>(
+    field: &Field,
+    row: &'r PgRow,
+) -> Result<T> {
+    let field_name = field.name();
+    let field_type = field.data_type();
+
+    let col = row.try_get_raw(field.name().as_str())?;
+    let info = col.type_info();
+    let oid = info.oid().map(|o| o.0).unwrap_or(InvalidOid.into());
+    if !valid(field_type, oid) {
+        bail!(
+            "field '{}' has arrow type '{}', which cannot be read from postgres type '{}'",
+            field.name(),
+            field.data_type(),
+            info.name()
+        )
+    }
+
+    Ok(row.try_get(field_name.as_str())?)
+}
+
+pub fn schema_to_batch(schema: &SchemaRef, rows: &[PgRow]) -> Result<RecordBatch> {
+    let unix_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
+    let arrays = schema
+        .fields()
+        .into_iter()
+        .map(|field| {
+            Ok(match field.data_type() {
+                DataType::Boolean => Arc::new(BooleanArray::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<bool>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Int8 => Arc::new(Int8Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<i16>>(field, row))
+                        .map(|row| row.map(|o| o.map(|n| n as i8)))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Int16 => Arc::new(Int16Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<i16>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Int32 => Arc::new(Int32Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<i32>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Int64 => Arc::new(Int64Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<i64>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::UInt8 => Arc::new(UInt8Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<i16>>(field, row))
+                        .map(|row| row.map(|o| o.map(|n| n as u8)))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::UInt16 => Arc::new(UInt16Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<i32>>(field, row))
+                        .map(|row| row.map(|o| o.map(|n| n as u16)))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::UInt32 => Arc::new(UInt32Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<i64>>(field, row))
+                        .map(|row| row.map(|o| o.map(|n| n as u32)))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::UInt64 => Arc::new(UInt64Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<BigDecimal>>(field, row))
+                        .map(|row| row.map(|o| o.and_then(|n| n.to_u64())))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Float32 => Arc::new(Float32Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<f32>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Float64 => Arc::new(Float64Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<f64>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Timestamp(unit, _) => match unit {
+                    TimeUnit::Second => Arc::new(TimestampSecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveDateTime>>(field, row))
+                            .map(|row| row.map(|o| o.map(|n| n.and_utc().timestamp())))
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                    TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveDateTime>>(field, row))
+                            .map(|row| row.map(|o| o.map(|n| n.and_utc().timestamp_millis())))
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                    TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveDateTime>>(field, row))
+                            .map(|row| row.map(|o| o.map(|n| n.and_utc().timestamp_micros())))
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                    TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveDateTime>>(field, row))
+                            .map(|row| {
+                                row.map(|o| o.and_then(|n| n.and_utc().timestamp_nanos_opt()))
+                            })
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                },
+                DataType::Date32 => Arc::new(Date32Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<NaiveDate>>(field, row))
+                        .map(|row| {
+                            row.map(|o| {
+                                o.map(|n| n.signed_duration_since(unix_epoch).num_days() as i32)
+                            })
+                        })
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Date64 => Arc::new(Date64Array::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<NaiveDate>>(field, row))
+                        .map(|row| {
+                            row.map(|o| {
+                                o.map(|n| n.signed_duration_since(unix_epoch).num_milliseconds())
+                            })
+                        })
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Time32(unit) => match unit {
+                    TimeUnit::Second => Arc::new(Time32SecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveTime>>(field, row))
+                            .map(|row| row.map(|o| o.map(|n| n.num_seconds_from_midnight() as i32)))
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                    TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveTime>>(field, row))
+                            .map(|row| {
+                                row.map(|o| {
+                                    o.map(|n| {
+                                        (n.num_seconds_from_midnight() * 1000
+                                            + (n.nanosecond() / 1_000_000))
+                                            as i32
+                                    })
+                                })
+                            })
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                    TimeUnit::Microsecond => bail!("arrow time32 does not support microseconds"),
+                    TimeUnit::Nanosecond => bail!("arrow time32 does not support nanoseconds"),
+                },
+                DataType::Time64(unit) => match unit {
+                    TimeUnit::Second => bail!("arrow time64 does not support seconds"),
+                    TimeUnit::Millisecond => bail!("arrow time64 does not support milliseconds"),
+                    TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveTime>>(field, row))
+                            .map(|row| {
+                                row.map(|o| {
+                                    o.map(|n| {
+                                        (n.num_seconds_from_midnight() * 1_000_000
+                                            + (n.nanosecond() / 1_000))
+                                            as i64
+                                    })
+                                })
+                            })
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                    TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from(
+                        rows.iter()
+                            .map(|row| decode::<Option<NaiveTime>>(field, row))
+                            .map(|row| {
+                                row.map(|o| {
+                                    o.map(|n| {
+                                        (n.num_seconds_from_midnight() as u64 * 1_000_000_000
+                                            + (n.nanosecond() as u64))
+                                            .try_into()
+                                            .ok()
+                                            .unwrap_or(i64::MAX)
+                                    })
+                                })
+                            })
+                            .collect::<Result<Vec<_>>>()?,
+                    )) as ArrayRef,
+                },
+                DataType::Binary => Arc::new(BinaryArray::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<&[u8]>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::LargeBinary => Arc::new(LargeBinaryArray::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<&[u8]>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::Utf8 => Arc::new(StringArray::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<&str>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                DataType::LargeUtf8 => Arc::new(LargeStringArray::from(
+                    rows.iter()
+                        .map(|row| decode::<Option<&str>>(field, row))
+                        .collect::<Result<Vec<_>>>()?,
+                )) as ArrayRef,
+                _ => bail!("cannot read into arrow type '{}'", field.data_type()),
+            })
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(RecordBatch::try_new(schema.clone(), arrays)?)
+}
diff --git a/tests/fixtures/db.rs b/tests/fixtures/db.rs
new file mode 100644
index 00000000..8b61f836
--- /dev/null
+++ b/tests/fixtures/db.rs
@@ -0,0 +1,198 @@
+#![allow(dead_code)]
+
+// Copyright (c) 2023-2024 Retake, Inc.
+//
+// This file is part of ParadeDB - Postgres for Search and Analytics
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+// TECH DEBT: This file is a copy of the `db.rs` file from https://github.com/paradedb/paradedb/blob/dev/shared/src/fixtures/db.rs
+// We duplicated it because the paradedb repo may use a different version of pgrx than pg_analytics, but eventually we should
+// move this into a separate crate without any dependencies on pgrx.
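+//
+// A typical usage sketch (hypothetical table and column names): bring `Query` into
+// scope and call its methods directly on SQL strings, e.g.
+// `"CREATE TABLE t (a int)".execute(&mut conn)` or
+// `"SELECT a FROM t".fetch::<(i32,)>(&mut conn)`.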
+
+use super::arrow::schema_to_batch;
+use async_std::prelude::Stream;
+use async_std::stream::StreamExt;
+use async_std::task::block_on;
+use bytes::Bytes;
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use sqlx::{
+    postgres::PgRow,
+    testing::{TestArgs, TestContext, TestSupport},
+    ConnectOptions, Decode, Executor, FromRow, PgConnection, Postgres, Type,
+};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+pub struct Db {
+    context: TestContext<Postgres>,
+}
+
+impl Db {
+    pub async fn new() -> Self {
+        // Use a timestamp as a unique identifier.
+        let path = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_micros()
+            .to_string();
+
+        let args = TestArgs::new(Box::leak(path.into_boxed_str()));
+        let context = Postgres::test_context(&args)
+            .await
+            .unwrap_or_else(|err| panic!("could not create test database: {err:#?}"));
+
+        Self { context }
+    }
+
+    pub async fn connection(&self) -> PgConnection {
+        self.context
+            .connect_opts
+            .connect()
+            .await
+            .unwrap_or_else(|err| panic!("failed to connect to test database: {err:#?}"))
+    }
+}
+
+impl Drop for Db {
+    fn drop(&mut self) {
+        let db_name = self.context.db_name.to_string();
+        async_std::task::spawn(async move {
+            Postgres::cleanup_test(db_name.as_str()).await.unwrap();
+        });
+    }
+}
+
+pub trait Query
+where
+    Self: AsRef<str> + Sized,
+{
+    fn execute(self, connection: &mut PgConnection) {
+        block_on(async {
+            connection.execute(self.as_ref()).await.unwrap();
+        })
+    }
+
+    fn execute_result(self, connection: &mut PgConnection) -> Result<(), sqlx::Error> {
+        block_on(async { connection.execute(self.as_ref()).await })?;
+        Ok(())
+    }
+
+    fn fetch<T>(self, connection: &mut PgConnection) -> Vec<T>
+    where
+        T: for<'r> FromRow<'r, <Postgres as sqlx::Database>::Row> + Send + Unpin,
+    {
+        block_on(async {
+            sqlx::query_as::<_, T>(self.as_ref())
+                .fetch_all(connection)
+                .await
+                .unwrap_or_else(|_| panic!("error in query '{}'", self.as_ref()))
+        })
+    }
+
+    fn fetch_dynamic(self, connection: &mut PgConnection) -> Vec<PgRow> {
+        block_on(async {
+            sqlx::query(self.as_ref())
+                .fetch_all(connection)
+                .await
+                .unwrap_or_else(|_| panic!("error in query '{}'", self.as_ref()))
+        })
+    }
+
+    /// A convenient helper for processing PgRow results from Postgres into a DataFusion RecordBatch.
+    /// It's important to note that the retrieved RecordBatch may not necessarily have the same
+    /// column order as your Postgres table, or the Parquet file behind a foreign table.
+    /// You shouldn't expect to be able to test two RecordBatches directly for equality.
+    /// Instead, just test the column equality for each column, like so:
+    ///
+    /// assert_eq!(stored_batch.num_columns(), retrieved_batch.num_columns());
+    /// for field in stored_batch.schema().fields() {
+    ///     assert_eq!(
+    ///         stored_batch.column_by_name(field.name()),
+    ///         retrieved_batch.column_by_name(field.name())
+    ///     )
+    /// }
+    ///
+    fn fetch_recordbatch(self, connection: &mut PgConnection, schema: &SchemaRef) -> RecordBatch {
+        block_on(async {
+            let rows = sqlx::query(self.as_ref())
+                .fetch_all(connection)
+                .await
+                .unwrap_or_else(|_| panic!("error in query '{}'", self.as_ref()));
+            schema_to_batch(schema, &rows).expect("could not convert rows to RecordBatch")
+        })
+    }
+
+    fn fetch_scalar<T>(self, connection: &mut PgConnection) -> Vec<T>
+    where
+        T: Type<Postgres> + for<'a> Decode<'a, sqlx::Postgres> + Send + Unpin,
+    {
+        block_on(async {
+            sqlx::query_scalar(self.as_ref())
+                .fetch_all(connection)
+                .await
+                .unwrap_or_else(|_| panic!("error in query '{}'", self.as_ref()))
+        })
+    }
+
+    fn fetch_one<T>(self, connection: &mut PgConnection) -> T
+    where
+        T: for<'r> FromRow<'r, <Postgres as sqlx::Database>::Row> + Send + Unpin,
+    {
+        block_on(async {
+            sqlx::query_as::<_, T>(self.as_ref())
+                .fetch_one(connection)
+                .await
+                .unwrap_or_else(|_| panic!("error in query '{}'", self.as_ref()))
+        })
+    }
+
+    fn fetch_result<T>(self, connection: &mut PgConnection) -> Result<Vec<T>, sqlx::Error>
+    where
+        T: for<'r> FromRow<'r, <Postgres as sqlx::Database>::Row> + Send + Unpin,
+    {
+        block_on(async {
+            sqlx::query_as::<_, T>(self.as_ref())
+                .fetch_all(connection)
+                .await
+        })
+    }
+
+    fn fetch_collect<T, B>(self, connection: &mut PgConnection) -> B
+    where
+        T: for<'r> FromRow<'r, <Postgres as sqlx::Database>::Row> + Send + Unpin,
+        B: FromIterator<T>,
+    {
+        self.fetch(connection).into_iter().collect::<B>()
+    }
+}
+
+impl Query for String {}
+impl Query for &String {}
+impl Query for &str {}
+
+pub trait DisplayAsync: Stream<Item = Result<Bytes, sqlx::Error>> + Sized {
+    fn to_csv(self) -> String {
+        let mut csv_str = String::new();
+        let mut stream = Box::pin(self);
+
+        while let Some(chunk) = block_on(stream.as_mut().next()) {
+            let chunk = chunk.unwrap();
+            csv_str.push_str(&String::from_utf8_lossy(&chunk));
+        }
+
+        csv_str
+    }
+}
+
+impl<T> DisplayAsync for T where T: Stream<Item = Result<Bytes, sqlx::Error>> + Send + Sized {}
diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs
index e4a663c0..31cca9d2 100644
--- a/tests/fixtures/mod.rs
+++ b/tests/fixtures/mod.rs
@@ -15,14 +15,10 @@
 // You should have received a copy of the GNU Affero General Public License
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
-use std::{ - fs::{self, File}, - io::Cursor, - io::Read, - path::{Path, PathBuf}, -}; +pub mod arrow; +pub mod db; +pub mod tables; -use anyhow::Context; use anyhow::Result; use async_std::task::block_on; use aws_config::{BehaviorVersion, Region}; @@ -38,7 +34,6 @@ use futures::future::{BoxFuture, FutureExt}; use rstest::*; use serde::Serialize; use serde_arrow::schema::{SchemaLike, TracingOptions}; -use shared::fixtures::tempfile::TempDir; use sqlx::PgConnection; use testcontainers::ContainerAsync; use testcontainers_modules::{ @@ -46,11 +41,8 @@ use testcontainers_modules::{ testcontainers::{runners::AsyncRunner, RunnableImage}, }; -pub use shared::fixtures::db::*; -#[allow(unused_imports)] -pub use shared::fixtures::tables::*; -#[allow(unused_imports)] -pub use shared::fixtures::utils::*; +use crate::fixtures::db::*; +use crate::fixtures::tables::nyc_trips::NycTripsTable; #[fixture] pub fn database() -> Db { @@ -251,13 +243,8 @@ pub async fn s3() -> S3 { } #[fixture] -pub fn tempdir() -> TempDir { - shared::fixtures::tempfile::tempdir().unwrap() -} - -#[fixture] -pub fn tempfile() -> std::fs::File { - shared::fixtures::tempfile::tempfile().unwrap() +pub fn tempdir() -> tempfile::TempDir { + tempfile::tempdir().unwrap() } #[fixture] diff --git a/tests/fixtures/tables/duckdb_types.rs b/tests/fixtures/tables/duckdb_types.rs new file mode 100644 index 00000000..0d3715c9 --- /dev/null +++ b/tests/fixtures/tables/duckdb_types.rs @@ -0,0 +1,149 @@ +#![allow(dead_code)] + +// Copyright (c) 2023-2024 Retake, Inc. +// +// This file is part of ParadeDB - Postgres for Search and Analytics +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
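+// The fixture below mirrors DuckDB's scalar types: the helpers build SQL to create and
+// populate a `duckdb_types_test` table in DuckDB, COPY it out to Parquet, and declare a
+// parquet_fdw foreign table over the exported file so the same rows can be read back
+// through Postgres.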
+ +use sqlx::postgres::types::PgInterval; +use sqlx::types::{BigDecimal, Json, Uuid}; +use sqlx::FromRow; +use std::collections::HashMap; +use time::{Date, OffsetDateTime, PrimitiveDateTime, Time}; + +#[derive(Debug, PartialEq, FromRow)] +pub struct DuckdbTypesTable { + pub bool_col: bool, + pub tinyint_col: i16, + pub smallint_col: i16, + pub integer_col: i32, + pub bigint_col: i64, + pub utinyint_col: i32, + pub usmallint_col: i32, + pub uinteger_col: i64, + pub ubigint_col: BigDecimal, + pub float_col: f64, + pub double_col: f64, + pub timestamp_col: PrimitiveDateTime, + pub date_col: Date, + pub time_col: Time, + pub interval_col: PgInterval, + pub hugeint_col: f64, + pub uhugeint_col: f64, + pub varchar_col: String, + pub blob_col: String, + pub decimal_col: BigDecimal, + pub timestamp_s_col: PrimitiveDateTime, + pub timestamp_ms_col: PrimitiveDateTime, + pub timestamp_ns_col: PrimitiveDateTime, + pub list_col: Vec, + pub struct_col: Json>, + pub array_col: [i32; 3], + pub uuid_col: Uuid, + pub time_tz_col: Time, + pub timestamp_tz_col: OffsetDateTime, +} + +impl DuckdbTypesTable { + pub fn create_duckdb_table() -> String { + DUCKDB_TYPES_TABLE_CREATE.to_string() + } + + pub fn export_duckdb_table(path: &str) -> String { + format!("COPY duckdb_types_test TO '{path}' (FORMAT PARQUET)") + } + + pub fn populate_duckdb_table() -> String { + DUCKDB_TYPES_TABLE_INSERT.to_string() + } + + pub fn create_foreign_table(path: &str) -> String { + format!( + r#" + CREATE FOREIGN DATA WRAPPER parquet_wrapper HANDLER parquet_fdw_handler VALIDATOR parquet_fdw_validator; + CREATE SERVER parquet_server FOREIGN DATA WRAPPER parquet_wrapper; + CREATE FOREIGN TABLE duckdb_types_test () SERVER parquet_server OPTIONS (files '{path}'); + "# + ) + } +} + +static DUCKDB_TYPES_TABLE_CREATE: &str = r#" +CREATE TABLE duckdb_types_test ( + bool_col BOOLEAN, + tinyint_col TINYINT, + smallint_col SMALLINT, + integer_col INTEGER, + bigint_col BIGINT, + utinyint_col UTINYINT, + usmallint_col USMALLINT, + uinteger_col UINTEGER, + ubigint_col UBIGINT, + float_col FLOAT, + double_col DOUBLE, + timestamp_col TIMESTAMP, + date_col DATE, + time_col TIME, + interval_col INTERVAL, + hugeint_col HUGEINT, + uhugeint_col UHUGEINT, + varchar_col VARCHAR, + blob_col BLOB, + decimal_col DECIMAL, + timestamp_s_col TIMESTAMP_S, + timestamp_ms_col TIMESTAMP_MS, + timestamp_ns_col TIMESTAMP_NS, + list_col INTEGER[], + struct_col STRUCT(a VARCHAR, b VARCHAR), + array_col INTEGER[3], + uuid_col UUID, + time_tz_col TIMETZ, + timestamp_tz_col TIMESTAMPTZ +); +"#; + +static DUCKDB_TYPES_TABLE_INSERT: &str = r#" +INSERT INTO duckdb_types_test VALUES ( + TRUE, + 127, + 32767, + 2147483647, + 9223372036854775807, + 255, + 65535, + 4294967295, + 18446744073709551615, + 1.23, + 2.34, + '2023-06-27 12:34:56', + '2023-06-27', + '12:34:56', + INTERVAL '1 day', + 12345678901234567890, + 12345678901234567890, + 'Example text', + '\x41', + 12345.67, + '2023-06-27 12:34:56', + '2023-06-27 12:34:56.789', + '2023-06-27 12:34:56.789123', + [1, 2, 3], + ROW('abc', 'def'), + [1, 2, 3], + '550e8400-e29b-41d4-a716-446655440000', + '12:34:56+02', + '2023-06-27 12:34:56+02' +); +"#; diff --git a/tests/fixtures/tables/mod.rs b/tests/fixtures/tables/mod.rs new file mode 100644 index 00000000..dab3016a --- /dev/null +++ b/tests/fixtures/tables/mod.rs @@ -0,0 +1,19 @@ +// Copyright (c) 2023-2024 Retake, Inc. 
+//
+// This file is part of ParadeDB - Postgres for Search and Analytics
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+pub mod duckdb_types;
+pub mod nyc_trips;
diff --git a/tests/fixtures/tables/nyc_trips.rs b/tests/fixtures/tables/nyc_trips.rs
new file mode 100644
index 00000000..f1d92723
--- /dev/null
+++ b/tests/fixtures/tables/nyc_trips.rs
@@ -0,0 +1,240 @@
+// Copyright (c) 2023-2024 Retake, Inc.
+//
+// This file is part of ParadeDB - Postgres for Search and Analytics
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use serde::{Deserialize, Serialize};
+use soa_derive::StructOfArray;
+use sqlx::FromRow;
+
+#[derive(Debug, PartialEq, FromRow, StructOfArray, Default, Serialize, Deserialize)]
+pub struct NycTripsTable {
+    #[sqlx(rename = "VendorID", default)]
+    #[serde(rename = "VendorID")]
+    pub vendor_id: Option<i32>,
+    // For now, we're commenting out the datetime fields because they present a problem
+    // when serializing to parquet with serde-arrow. While these fields do exist in the
+    // nyc_trips Postgres table that we create, we'll entirely skip reading them into
+    // Rust with sqlx.
+    // pub tpep_pickup_datetime: Option<PrimitiveDateTime>,
+    // pub tpep_dropoff_datetime: Option<PrimitiveDateTime>,
+    pub passenger_count: Option<i64>,
+    pub trip_distance: Option<f64>,
+    #[sqlx(rename = "RatecodeID", default)]
+    #[serde(rename = "RatecodeID")]
+    pub ratecode_id: Option<f64>,
+    #[sqlx(rename = "store_and_fwd_flag", default)]
+    #[serde(rename = "store_and_fwd_flag")]
+    pub store_and_fwd_flag: Option<String>,
+    #[sqlx(rename = "PULocationID", default)]
+    #[serde(rename = "PULocationID")]
+    pub pu_location_id: Option<f32>,
+    #[sqlx(rename = "DOLocationID", default)]
+    #[serde(rename = "DOLocationID")]
+    pub do_location_id: Option<f32>,
+    pub payment_type: Option<f64>,
+    pub fare_amount: Option<f64>,
+    pub extra: Option<f64>,
+    pub mta_tax: Option<f64>,
+    pub tip_amount: Option<f64>,
+    pub tolls_amount: Option<f64>,
+    pub improvement_surcharge: Option<f64>,
+    pub total_amount: Option<f64>,
+}
+
+impl NycTripsTable {
+    pub fn setup() -> String {
+        NYC_TRIPS_TABLE_SETUP.to_string()
+    }
+
+    fn create_s3_foreign_data_wrapper() -> String {
+        r#"CREATE FOREIGN DATA WRAPPER parquet_wrapper HANDLER parquet_fdw_handler VALIDATOR parquet_fdw_validator"#.into()
+    }
+
+    fn create_s3_server() -> String {
+        r#"CREATE SERVER nyc_trips_server FOREIGN DATA WRAPPER parquet_wrapper"#.into()
+    }
+
+    fn create_s3_user_mapping() -> String {
+        r#"CREATE USER MAPPING FOR public SERVER nyc_trips_server"#.into()
+    }
+
+    fn create_table() -> String {
+        r#"
+        CREATE FOREIGN TABLE trips (
+            "VendorID"              INT,
+            -- Commented out until serde-arrow serialization issue is addressed.
+            -- "tpep_pickup_datetime"  TIMESTAMP,
+            -- "tpep_dropoff_datetime" TIMESTAMP,
+            "passenger_count"       BIGINT,
+            "trip_distance"         DOUBLE PRECISION,
+            "RatecodeID"            DOUBLE PRECISION,
+            "store_and_fwd_flag"    TEXT,
+            "PULocationID"          REAL,
+            "DOLocationID"          REAL,
+            "payment_type"          DOUBLE PRECISION,
+            "fare_amount"           DOUBLE PRECISION,
+            "extra"                 DOUBLE PRECISION,
+            "mta_tax"               DOUBLE PRECISION,
+            "tip_amount"            DOUBLE PRECISION,
+            "tolls_amount"          DOUBLE PRECISION,
+            "improvement_surcharge" DOUBLE PRECISION,
+            "total_amount"          DOUBLE PRECISION
+        )
+        SERVER nyc_trips_server
+        "#
+        .into()
+    }
+
+    pub fn setup_s3_listing_fdw(s3_endpoint: &str, s3_object_path: &str) -> String {
+        let create_foreign_data_wrapper = Self::create_s3_foreign_data_wrapper();
+        let create_server = Self::create_s3_server();
+        let create_table = Self::create_table();
+        let create_user_mapping = Self::create_s3_user_mapping();
+        format!(
+            r#"
+            {create_foreign_data_wrapper};
+            {create_server};
+            {create_user_mapping} OPTIONS (type 'S3', region 'us-east-1', endpoint '{s3_endpoint}', use_ssl 'false', url_style 'path');
+            {create_table} OPTIONS (files '{s3_object_path}');
+        "#
+        )
+    }
+}
+
+static NYC_TRIPS_TABLE_SETUP: &str = r#"
+CREATE TABLE nyc_trips (
+    "VendorID"              INT,
+    "tpep_pickup_datetime"  TIMESTAMP,
+    "tpep_dropoff_datetime" TIMESTAMP,
+    "passenger_count"       BIGINT,
+    "trip_distance"         DOUBLE PRECISION,
+    "RatecodeID"            DOUBLE PRECISION,
+    "store_and_fwd_flag"    TEXT,
+    "PULocationID"          REAL,
+    "DOLocationID"          REAL,
+    "payment_type"          DOUBLE PRECISION,
+    "fare_amount"           DOUBLE PRECISION,
+    "extra"                 DOUBLE PRECISION,
+    "mta_tax"               DOUBLE PRECISION,
+    "tip_amount"            DOUBLE PRECISION,
+    "tolls_amount"          DOUBLE PRECISION,
+    "improvement_surcharge" DOUBLE PRECISION,
+    "total_amount"          DOUBLE PRECISION
+);
+
+INSERT INTO nyc_trips ("VendorID", tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, "RatecodeID", store_and_fwd_flag, "PULocationID", "DOLocationID", payment_type, fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, total_amount)
+VALUES
+(2, '2024-01-24
15:17:12', '2024-01-24 15:34:53', 1, 3.33, 1, 'N', 239, 246, 1, 20.5, 0, 0.5, 3, 0, 1, 27.5), +(2, '2024-01-24 15:52:24', '2024-01-24 16:01:39', 1, 1.61, 1, 'N', 234, 249, 1, 10.7, 0, 0.5, 3.67, 0, 1, 18.37), +(2, '2024-01-24 15:08:55', '2024-01-24 15:31:35', 1, 4.38, 1, 'N', 88, 211, 1, 25.4, 0, 0.5, 5.88, 0, 1, 35.28), +(2, '2024-01-24 15:42:55', '2024-01-24 15:51:35', 1, 0.95, 1, 'N', 211, 234, 1, 9.3, 0, 0.5, 2.66, 0, 1, 15.96), +(2, '2024-01-24 15:52:23', '2024-01-24 16:12:53', 1, 2.58, 1, 'N', 68, 144, 1, 18.4, 0, 0.5, 4.48, 0, 1, 26.88), +(1, '2024-01-24 15:30:55', '2024-01-24 16:38:46', 1, 15.8, 2, 'N', 164, 132, 1, 70, 2.5, 0.5, 10, 6.94, 1, 90.94), +(2, '2024-01-24 15:21:48', '2024-01-24 15:59:06', 2, 7.69, 1, 'N', 231, 161, 1, 40.8, 0, 0.5, 6.5, 0, 1, 51.3), +(2, '2024-01-24 15:47:59', '2024-01-24 16:12:38', 1, 8.31, 1, 'N', 138, 262, 1, 35.2, 5, 0.5, 10, 6.94, 1, 62.89), +(2, '2024-01-24 15:55:32', '2024-01-24 16:23:01', 1, 8.47, 1, 'N', 132, 192, 2, 36.6, 0, 0.5, 0, 0, 1, 39.85), +(1, '2024-01-24 15:02:22', '2024-01-24 15:13:11', 1, 1.4, 1, 'N', 226, 7, 2, 11.4, 0, 0.5, 0, 0, 1, 12.9), +(1, '2024-01-24 15:49:04', '2024-01-24 15:55:15', 1, 0.9, 1, 'N', 43, 237, 1, 7.9, 5, 0.5, 2.85, 0, 1, 17.25), +(2, '2024-01-24 15:10:53', '2024-01-24 15:20:45', 1, 0.55, 1, 'N', 237, 237, 1, 10, 0, 0.5, 2.8, 0, 1, 16.8), +(1, '2024-01-24 15:09:28', '2024-01-24 16:21:23', 1, 16.2, 5, 'N', 230, 132, 1, 86.55, 0, 0, 17.5, 0, 1, 105.05), +(2, '2024-01-24 15:14:11', '2024-01-24 15:27:17', 1, 0.74, 1, 'N', 236, 237, 2, 12.1, 0, 0.5, 0, 0, 1, 16.1), +(2, '2024-01-24 15:56:34', '2024-01-24 16:27:32', 1, 3.79, 1, 'N', 230, 144, 1, 27.5, 0, 0.5, 7.88, 0, 1, 39.38), +(2, '2024-01-24 15:31:32', '2024-01-24 15:46:48', 2, 1.9, 1, 'N', 246, 161, 1, 14.9, 0, 0.5, 3.78, 0, 1, 22.68), +(2, '2024-01-24 15:50:45', '2024-01-24 16:22:14', 1, 6.82, 1, 'N', 162, 261, 1, 33.8, 0, 0.5, 3.78, 0, 1, 41.58), +(2, '2024-01-24 15:54:18', '2024-01-24 16:24:41', 1, 8.26, 1, 'N', 138, 262, 1, 37.3, 5, 0.5, 10.65, 6.94, 1, 65.64), +(2, '2024-01-24 15:11:02', '2024-01-24 15:33:35', 1, 1.6, 1, 'N', 162, 263, 1, 19.1, 0, 0.5, 4.62, 0, 1, 27.72), +(2, '2024-01-24 15:20:01', '2024-01-24 15:34:38', 2, 1.79, 1, 'N', 68, 163, 2, 14.2, 0, 0.5, 0, 0, 1, 18.2), +(2, '2024-01-24 15:50:36', '2024-01-24 15:59:20', 1, 0.58, 1, 'N', 162, 229, 1, 9.3, 0, 0.5, 3.33, 0, 1, 16.63), +(1, '2024-01-24 15:04:08', '2024-01-24 15:23:57', 1, 2, 1, 'N', 246, 161, 1, 14.9, 2.5, 0.5, 1, 0, 1, 19.9), +(1, '2024-01-24 15:25:27', '2024-01-24 15:37:29', 1, 1.6, 1, 'N', 161, 233, 1, 10.7, 2.5, 0.5, 3.65, 0, 1, 18.35), +(1, '2024-01-24 15:40:53', '2024-01-24 15:45:56', 1, 1.1, 1, 'Y', 233, 162, 1, 7.2, 2.5, 0.5, 2.24, 0, 1, 13.44), +(1, '2024-01-24 15:56:09', '2024-01-24 16:05:35', 1, 1.6, 1, 'N', 237, 239, 1, 10, 2.5, 0.5, 4.2, 0, 1, 18.2), +(2, '2024-01-24 15:03:07', '2024-01-24 15:21:19', 2, 5.73, 5, 'N', 180, 132, 1, 84, 0, 0, 17, 0, 1, 102), +(2, '2024-01-24 16:02:45', '2024-01-24 16:11:52', 1, 1.1, 1, 'N', 263, 141, 1, 10, 0, 0.5, 2.1, 0, 1, 16.1), +(2, '2024-01-24 15:19:51', '2024-01-24 15:30:56', 1, 0.77, 1, 'N', 162, 161, 1, 10.7, 0, 0.5, 2.94, 0, 1, 17.64), +(2, '2024-01-24 15:32:10', '2024-01-24 15:39:06', 1, 0.85, 1, 'N', 161, 170, 1, 7.9, 0, 0.5, 2.98, 0, 1, 14.88), +(2, '2024-01-24 15:44:04', '2024-01-24 15:56:43', 2, 1.07, 1, 'N', 170, 163, 1, 12.1, 0, 0.5, 3.22, 0, 1, 19.32), +(2, '2024-01-24 15:57:39', '2024-01-24 16:02:55', 1, 0.54, 1, 'N', 161, 237, 1, 6.5, 0, 0.5, 2.1, 0, 1, 12.6), +(1, '2024-01-24 15:04:50', '2024-01-24 15:25:58', 2, 2.9, 1, 
'N', 161, 246, 1, 21.9, 2.5, 0.5, 5.15, 0, 1, 31.05), +(2, '2024-01-24 15:27:35', '2024-01-24 15:50:28', 1, 2.11, 1, 'N', 164, 79, 1, 20.5, 0, 0.5, 4.9, 0, 1, 29.4), +(2, '2024-01-24 15:13:53', '2024-01-24 15:55:09', 3, 5.62, 1, 'N', 161, 261, 1, 38, 0, 0.5, 8.4, 0, 1, 50.4), +(1, '2024-01-24 15:29:37', '2024-01-24 15:50:25', 1, 2.2, 1, 'N', 237, 230, 1, 18.4, 2.5, 0.5, 5.55, 0, 1, 27.95), +(1, '2024-01-24 15:34:29', '2024-01-24 15:45:41', 1, 2, 1, 'N', 142, 151, 1, 12.1, 2.5, 0.5, 3.22, 0, 1, 19.32), +(1, '2024-01-24 15:54:16', '2024-01-24 16:04:40', 2, 1.6, 1, 'N', 238, 143, 1, 10.7, 5, 0.5, 3.4, 0, 1, 20.6), +(2, '2024-01-24 15:05:20', '2024-01-24 15:16:38', 1, 1.27, 1, 'N', 142, 230, 2, 11.4, 0, 0.5, 0, 0, 1, 15.4), +(2, '2024-01-24 15:21:05', '2024-01-24 16:36:49', 1, 7.49, 1, 'N', 163, 181, 1, 61.8, 0, 0.5, 21.82, 6.94, 1, 94.56), +(2, '2024-01-24 15:13:19', '2024-01-24 15:28:32', 1, 2.51, 1, 'N', 143, 236, 1, 16.3, 0, 0.5, 4.06, 0, 1, 24.36), +(2, '2024-01-24 15:38:01', '2024-01-24 15:49:52', 1, 1.83, 1, 'N', 239, 262, 1, 12.8, 0, 0.5, 4.2, 0, 1, 21), +(2, '2024-01-24 15:09:19', '2024-01-24 15:26:41', 1, 2.42, 1, 'N', 238, 237, 1, 17, 0, 0.5, 4.2, 0, 1, 25.2), +(2, '2024-01-24 15:30:22', '2024-01-24 15:45:27', 1, 2.25, 1, 'N', 237, 233, 1, 15.6, 0, 0.5, 3.92, 0, 1, 23.52), +(1, '2024-01-24 15:57:50', '2024-01-24 16:45:02', 0, 15, 1, 'N', 138, 265, 2, 60.4, 9.25, 0.5, 0, 6.94, 1, 78.09), +(2, '2024-01-24 15:41:46', '2024-01-24 15:50:08', 1, 0.8, 1, 'N', 161, 100, 1, 8.6, 0, 0.5, 2.52, 0, 1, 15.12), +(2, '2024-01-24 15:54:22', '2024-01-24 15:59:06', 1, 0.5, 1, 'N', 100, 164, 2, 6.5, 0, 0.5, 0, 0, 1, 10.5), +(2, '2024-01-24 15:25:27', '2024-01-24 15:34:11', 2, 1.09, 1, 'N', 164, 234, 1, 9.3, 0, 0.5, 3.99, 0, 1, 17.29), +(2, '2024-01-24 15:14:18', '2024-01-24 15:22:17', 1, 0.78, 1, 'N', 234, 249, 1, 8.6, 0, 0.5, 2.52, 0, 1, 15.12), +(2, '2024-01-24 15:33:41', '2024-01-24 15:47:12', 1, 1.54, 1, 'N', 113, 231, 1, 12.8, 0, 0.5, 5.04, 0, 1, 21.84), +(2, '2024-01-24 15:53:15', '2024-01-24 16:04:11', 1, 1.63, 1, 'N', 125, 68, 1, 12.1, 0, 0.5, 2.42, 0, 1, 18.52), +(1, '2024-01-24 15:13:03', '2024-01-24 15:23:58', 1, 1.4, 1, 'N', 142, 161, 1, 10, 2.5, 0.5, 2.8, 0, 1, 16.8), +(1, '2024-01-24 15:31:49', '2024-01-24 15:46:47', 1, 1.8, 1, 'N', 161, 68, 1, 12.8, 2.5, 0.5, 3.36, 0, 1, 20.16), +(1, '2024-01-24 15:48:50', '2024-01-24 16:06:14', 1, 1.1, 1, 'N', 68, 246, 1, 12.1, 2.5, 0.5, 2, 0, 1, 18.1), +(2, '2024-01-24 15:17:46', '2024-01-24 15:28:19', 1, 1.02, 1, 'N', 236, 236, 1, 10.7, 0, 0.5, 3.67, 0, 1, 18.37), +(2, '2024-01-24 15:30:25', '2024-01-24 15:38:09', 1, 0.84, 1, 'N', 236, 141, 1, 8.6, 0, 0.5, 2.52, 0, 1, 15.12), +(2, '2024-01-24 15:47:13', '2024-01-24 15:50:30', 1, 0.54, 1, 'N', 237, 162, 1, 5.8, 0, 0.5, 2.45, 0, 1, 12.25), +(1, '2024-01-24 15:04:49', '2024-01-24 15:29:05', 1, 6.6, 1, 'N', 132, 134, 1, 27.5, 1.75, 0.5, 0, 0, 1, 30.75), +(1, '2024-01-24 15:52:43', '2024-01-24 16:48:43', 1, 16.3, 2, 'N', 132, 230, 1, 70, 4.25, 0.5, 15.15, 0, 1, 90.9), +(1, '2024-01-24 15:10:42', '2024-01-24 16:07:13', 1, 16.9, 2, 'N', 162, 132, 1, 70, 2.5, 0.5, 16.15, 6.94, 1, 97.09), +(1, '2024-01-24 15:24:26', '2024-01-24 15:53:43', 1, 3.1, 1, 'N', 236, 164, 2, 25.4, 2.5, 0.5, 0, 0, 1, 29.4), +(1, '2024-01-24 15:55:46', '2024-01-24 16:02:04', 1, 0.8, 1, 'N', 164, 107, 1, 7.9, 2.5, 0.5, 2.35, 0, 1, 14.25), +(1, '2024-01-24 15:57:50', '2024-01-24 16:21:27', 1, 2.9, 1, 'N', 75, 143, 1, 21.9, 5, 0.5, 5.65, 0, 1, 34.05), +(2, '2024-01-24 15:56:42', '2024-01-24 16:01:57', 1, 0.73, 1, 'N', 237, 162, 2, 7.2, 0, 
0.5, 0, 0, 1, 11.2), +(2, '2024-01-24 15:02:26', '2024-01-24 15:14:20', 1, 1.41, 1, 'N', 151, 41, 2, 12.1, 0, 0.5, 0, 0, 1, 13.6), +(2, '2024-01-24 15:43:11', '2024-01-24 15:52:26', 1, 2.03, 1, 'N', 75, 239, 1, 12.1, 0, 0.5, 3.22, 0, 1, 19.32), +(1, '2024-01-24 15:09:57', '2024-01-24 15:17:06', 1, 0.9, 1, 'N', 186, 234, 1, 8.6, 2.5, 0.5, 1.5, 0, 1, 14.1), +(1, '2024-01-24 15:15:44', '2024-01-24 16:03:27', 1, 5.2, 1, 'N', 234, 41, 2, 40.8, 2.5, 0.5, 0, 0, 1, 44.8), +(2, '2024-01-24 15:03:30', '2024-01-24 15:15:18', 1, 1.74, 1, 'N', 142, 162, 1, 12.8, 0, 0.5, 3, 0, 1, 19.8), +(2, '2024-01-24 15:16:18', '2024-01-24 15:26:54', 1, 1.02, 1, 'N', 162, 230, 1, 10.7, 0, 0.5, 2.94, 0, 1, 17.64), +(1, '2024-01-24 15:09:12', '2024-01-24 15:26:06', 1, 2.5, 1, 'N', 163, 43, 2, 15.6, 2.5, 0.5, 0, 0, 1, 19.6), +(1, '2024-01-24 15:36:01', '2024-01-24 16:09:08', 1, 3.4, 1, 'N', 238, 164, 1, 26.8, 2.5, 0.5, 3.08, 0, 1, 33.88), +(1, '2024-01-24 15:01:40', '2024-01-24 15:30:58', 1, 4, 1, 'N', 231, 181, 1, 23.3, 2.5, 0.5, 6.85, 0, 1, 34.15), +(1, '2024-01-24 15:44:58', '2024-01-24 16:02:01', 1, 1, 1, 'N', 97, 33, 2, 13.5, 0, 0.5, 0, 0, 1, 15), +(1, '2024-01-24 15:08:08', '2024-01-24 15:19:26', 1, 1.1, 1, 'N', 262, 75, 2, 10.7, 2.5, 0.5, 0, 0, 1, 14.7), +(1, '2024-01-24 15:24:26', '2024-01-24 15:51:30', 1, 2.8, 1, 'N', 75, 48, 1, 21.9, 2.5, 0.5, 5.2, 0, 1, 31.1), +(1, '2024-01-24 15:05:32', '2024-01-24 16:11:42', 1, 8.1, 1, 'N', 186, 85, 2, 49.2, 2.5, 0.5, 0, 0, 1, 53.2), +(1, '2024-01-24 15:16:02', '2024-01-24 15:25:14', 1, 0.5, 1, 'N', 162, 161, 1, 9.3, 2.5, 0.5, 2.65, 0, 1, 15.95), +(1, '2024-01-24 15:29:34', '2024-01-24 15:34:45', 1, 0.3, 1, 'N', 161, 162, 2, 6.5, 2.5, 0.5, 0, 0, 1, 10.5), +(1, '2024-01-24 15:56:23', '2024-01-24 16:12:18', 1, 1.4, 1, 'N', 48, 164, 1, 14.9, 2.5, 0.5, 3.75, 0, 1, 22.65), +(1, '2024-01-24 15:22:06', '2024-01-24 15:46:23', 1, 4.4, 1, 'N', 68, 238, 1, 26.1, 2.5, 0.5, 7.5, 0, 1, 37.6), +(2, '2024-01-24 15:28:46', '2024-01-24 15:43:33', 1, 1.49, 1, 'N', 113, 186, 1, 13.5, 0, 0.5, 3.5, 0, 1, 21), +(2, '2024-01-24 15:49:11', '2024-01-24 16:03:14', 1, 1.49, 1, 'N', 90, 161, 1, 13.5, 0, 0.5, 3.5, 0, 1, 21), +(1, '2024-01-24 15:09:45', '2024-01-24 15:43:41', 1, 2.6, 1, 'N', 158, 170, 1, 28.2, 2.5, 0.5, 6.4, 0, 1, 38.6), +(2, '2024-01-24 15:10:12', '2024-01-24 15:30:12', 1, 2.64, 1, 'N', 186, 141, 1, 19.1, 0, 0.5, 2, 0, 1, 25.1), +(2, '2024-01-24 15:08:02', '2024-01-24 15:20:36', 1, 1.59, 1, 'N', 142, 161, 1, 13.5, 0, 0.5, 3.5, 0, 1, 21), +(2, '2024-01-24 15:54:25', '2024-01-24 16:25:45', 1, 3.55, 1, 'N', 236, 234, 1, 27.5, 0, 0.5, 6.3, 0, 1, 37.8), +(2, '2024-01-24 15:09:55', '2024-01-24 15:22:14', 1, 1.85, 1, 'N', 236, 143, 1, 13.5, 0, 0.5, 2, 0, 1, 19.5), +(2, '2024-01-24 15:33:37', '2024-01-24 15:39:20', 2, 0.59, 1, 'N', 238, 238, 1, 7.2, 0, 0.5, 2.24, 0, 1, 13.44), +(2, '2024-01-24 15:58:14', '2024-01-24 16:02:46', 2, 0.42, 1, 'N', 239, 142, 1, 5.8, 0, 0.5, 1.96, 0, 1, 11.76), +(2, '2024-01-24 15:05:34', '2024-01-24 15:51:33', 1, 11.54, 1, 'N', 138, 142, 1, 52, 5, 0.5, 13.94, 6.94, 1, 83.63), +(2, '2024-01-24 15:19:22', '2024-01-24 15:28:49', 1, 1.38, 1, 'N', 230, 143, 1, 10.7, 0, 0.5, 1.47, 0, 1, 16.17), +(2, '2024-01-24 15:22:30', '2024-01-24 15:47:17', 1, 3.6, 1, 'N', 163, 74, 2, 22.6, 0, 0.5, 0, 0, 1, 26.6), +(1, '2024-01-24 15:51:41', '2024-01-24 15:54:17', 1, 0.3, 1, 'N', 249, 90, 1, 4.4, 5, 0.5, 2, 0, 1, 12.9), +(2, '2024-01-24 15:02:26', '2024-01-24 15:07:59', 1, 0.66, 1, 'N', 161, 163, 1, 7.2, 0, 0.5, 2.24, 0, 1, 13.44), +(2, '2024-01-24 15:09:01', '2024-01-24 15:25:34', 1, 
1.38, 1, 'N', 163, 236, 1, 14.9, 0, 0.5, 1, 0, 1, 19.9), +(1, '2024-01-24 15:06:58', '2024-01-24 15:24:35', 1, 1.4, 1, 'N', 236, 161, 1, 14.9, 2.5, 0.5, 3.8, 0, 1, 22.7), +(1, '2024-01-24 15:39:09', '2024-01-24 16:03:25', 1, 2.5, 1, 'N', 233, 68, 1, 19.8, 2.5, 0.5, 4.75, 0, 1, 28.55), +(2, '2024-01-24 15:21:47', '2024-01-24 15:55:15', 1, 8.65, 1, 'N', 70, 230, 2, 40.8, 5, 0.5, 0, 6.94, 1, 58.49), +(2, '2024-01-24 15:32:46', '2024-01-24 16:01:04', 1, 2.16, 1, 'N', 113, 79, 1, 23.3, 0, 0.5, 8.19, 0, 1, 35.49), +(2, '2024-01-24 15:37:00', '2024-01-24 16:01:28', 1, 4.56, 1, 'N', 261, 170, 1, 25.4, 0, 0.5, 5.88, 0, 1, 35.28); +"#; diff --git a/tests/scan.rs b/tests/scan.rs index dbec5c16..1fea10b8 100644 --- a/tests/scan.rs +++ b/tests/scan.rs @@ -19,25 +19,29 @@ mod fixtures; use std::fs::File; +use crate::fixtures::arrow::{ + delta_primitive_record_batch, primitive_record_batch, primitive_setup_fdw_local_file_delta, + primitive_setup_fdw_local_file_listing, primitive_setup_fdw_s3_delta, + primitive_setup_fdw_s3_listing, +}; +use crate::fixtures::db::Query; +use crate::fixtures::{conn, duckdb_conn, s3, tempdir, S3}; use anyhow::Result; use datafusion::parquet::arrow::ArrowWriter; use deltalake::operations::create::CreateBuilder; use deltalake::writer::{DeltaWriter, RecordBatchWriter}; -use fixtures::*; use rstest::*; -use shared::fixtures::arrow::{ - delta_primitive_record_batch, primitive_record_batch, primitive_setup_fdw_local_file_delta, - primitive_setup_fdw_local_file_listing, primitive_setup_fdw_s3_delta, - primitive_setup_fdw_s3_listing, -}; -use shared::fixtures::tempfile::TempDir; use sqlx::postgres::types::PgInterval; use sqlx::types::{BigDecimal, Json, Uuid}; use sqlx::PgConnection; use std::collections::HashMap; use std::str::FromStr; +use tempfile::TempDir; use time::macros::{date, datetime, time}; +use crate::fixtures::tables::duckdb_types::DuckdbTypesTable; +use crate::fixtures::tables::nyc_trips::NycTripsTable; + const S3_TRIPS_BUCKET: &str = "test-trip-setup"; const S3_TRIPS_KEY: &str = "test_trip_setup.parquet"; diff --git a/tests/settings.rs b/tests/settings.rs index 73784c12..6bb8c197 100644 --- a/tests/settings.rs +++ b/tests/settings.rs @@ -1,7 +1,8 @@ mod fixtures; +use crate::fixtures::conn; +use crate::fixtures::db::Query; use anyhow::Result; -use fixtures::*; use rstest::*; use sqlx::PgConnection; diff --git a/tests/table_config.rs b/tests/table_config.rs index 58ead6e7..e40eee99 100644 --- a/tests/table_config.rs +++ b/tests/table_config.rs @@ -17,17 +17,18 @@ mod fixtures; -use anyhow::Result; -use datafusion::parquet::arrow::ArrowWriter; -use fixtures::*; -use rstest::*; -use shared::fixtures::arrow::{ +use crate::fixtures::arrow::{ primitive_record_batch, primitive_setup_fdw_local_file_listing, record_batch_with_casing, setup_local_file_listing_with_casing, }; -use shared::fixtures::tempfile::TempDir; +use crate::fixtures::db::Query; +use crate::fixtures::{conn, tempdir}; +use anyhow::Result; +use datafusion::parquet::arrow::ArrowWriter; +use rstest::*; use sqlx::PgConnection; use std::fs::File; +use tempfile::TempDir; #[rstest] async fn test_table_case_sensitivity(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { diff --git a/tests/time_bucket.rs b/tests/time_bucket.rs index f6c9ed07..45fcabb5 100644 --- a/tests/time_bucket.rs +++ b/tests/time_bucket.rs @@ -17,17 +17,21 @@ mod fixtures; +use crate::fixtures::arrow::primitive_setup_fdw_local_file_listing; +use crate::fixtures::db::Query; +use crate::fixtures::tables::nyc_trips::NycTripsTable; +use 
crate::fixtures::{ + conn, tempdir, time_series_record_batch_minutes, time_series_record_batch_years, +}; use anyhow::Result; use chrono::NaiveDateTime; use datafusion::parquet::arrow::ArrowWriter; -use fixtures::*; use rstest::*; -use shared::fixtures::arrow::primitive_setup_fdw_local_file_listing; -use shared::fixtures::tempfile::TempDir; use sqlx::types::BigDecimal; use sqlx::PgConnection; use std::fs::File; use std::str::FromStr; +use tempfile::TempDir; use time::Date; use time::Month::January;