Skip to content

Commit

Permalink
chore: cleanup some timestamp handling, mostly in bson (#3103)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish authored Jul 25, 2024
1 parent 6a744c1 commit 9e7b551
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 6 deletions.
54 changes: 54 additions & 0 deletions crates/datasources/src/bson/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::arrow::array::{
use datafusion::arrow::datatypes::{DataType, Field, Fields, TimeUnit};

use crate::bson::errors::{BsonError, Result};
use crate::common::util::try_parse_datetime;

/// Similar to arrow's `StructBuilder`, but specific for "shredding" bson
/// records.
Expand Down Expand Up @@ -293,6 +294,41 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
(RawBsonRef::String(v), DataType::Float64) => {
append_scalar!(Float64Builder, col, v.parse().unwrap_or_default())
}
(RawBsonRef::String(v), DataType::Date64) => {
append_scalar!(
Date64Builder,
col,
try_parse_datetime(v)?.timestamp_millis()
)
}
(RawBsonRef::String(v), DataType::Date32) => {
append_scalar!(
Date32Builder,
col,
try_parse_datetime(v)?.timestamp() as i32
)
}
(RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Millisecond, _)) => {
append_scalar!(
TimestampMillisecondBuilder,
col,
try_parse_datetime(v)?.timestamp_millis()
)
}
(RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Microsecond, _)) => {
append_scalar!(
TimestampMicrosecondBuilder,
col,
try_parse_datetime(v)?.timestamp_micros()
)
}
(RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Second, _)) => {
append_scalar!(
TimestampSecondBuilder,
col,
try_parse_datetime(v)?.timestamp()
)
}

// ObjectId
(RawBsonRef::ObjectId(v), DataType::Binary) => {
Expand Down Expand Up @@ -327,6 +363,15 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
))?
)
}
(RawBsonRef::Timestamp(v), DataType::Utf8) => {
append_scalar!(
StringBuilder,
col,
chrono::DateTime::from_timestamp_millis(v.time as i64 * 1000)
.ok_or_else(|| BsonError::InvalidValue(v.to_string()))?
.to_rfc3339()
)
}

// Datetime (actual timestamps that you'd actually use in an application)
(RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Second, _)) => {
Expand Down Expand Up @@ -355,6 +400,15 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
(RawBsonRef::DateTime(v), DataType::Date32) => {
append_scalar!(Date32Builder, col, (v.timestamp_millis() / 1000) as i32)
}
(RawBsonRef::DateTime(v), DataType::Utf8) => {
append_scalar!(
StringBuilder,
col,
chrono::DateTime::from_timestamp_millis(v.timestamp_millis())
.ok_or_else(|| BsonError::InvalidValue(v.to_string()))?
.to_rfc3339()
)
}

// Array
(RawBsonRef::Document(doc), DataType::Struct(_)) => {
Expand Down
6 changes: 6 additions & 0 deletions crates/datasources/src/bson/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ pub enum BsonError {

#[error("no objects found {0}")]
NotFound(String),

#[error(transparent)]
DateTimeParse(#[from] chrono::ParseError),

#[error("invalid value: {0}")]
InvalidValue(String),
}

impl From<BsonError> for DataFusionError {
Expand Down
20 changes: 19 additions & 1 deletion crates/datasources/src/common/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::Write;
use std::sync::Arc;

use chrono::{Duration, TimeZone, Utc};
use chrono::{Duration, NaiveTime, TimeZone, Utc};
use datafusion::arrow::array::{Array, ArrayRef, UInt64Array};
use datafusion::arrow::compute::{cast_with_options, CastOptions};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
Expand Down Expand Up @@ -205,6 +205,24 @@ pub fn create_count_record_batch(count: u64) -> RecordBatch {
.unwrap()
}

/// Parses a date or datetime string into a UTC [`chrono::DateTime`].
///
/// The following formats are attempted in order, and the first one that
/// parses successfully wins:
///
/// 1. RFC 3339 (e.g. `2024-07-25T10:30:00Z`)
/// 2. RFC 2822 (e.g. `Thu, 25 Jul 2024 10:30:00 +0000`)
/// 3. `%Y-%m-%d %H:%M:%S%.f` (fractional seconds are optional)
/// 4. `%Y-%m-%d` (interpreted as midnight)
///
/// Inputs that carry an offset (formats 1 and 2) are converted to UTC.
/// Inputs without an offset (formats 3 and 4) are taken to already be UTC.
///
/// # Errors
///
/// Returns the [`chrono::ParseError`] produced by the final (date-only)
/// attempt when none of the formats match.
pub fn try_parse_datetime(v: &str) -> Result<chrono::DateTime<chrono::Utc>, chrono::ParseError> {
    chrono::DateTime::parse_from_rfc3339(v)
        .or_else(|_| chrono::DateTime::parse_from_rfc2822(v))
        .map(|dt| dt.to_utc())
        .or_else(|_| {
            // `%.f` also matches an absent fraction, so this single pattern
            // covers both "12:34:56" and "12:34:56.789"; a separate attempt
            // without `%.f` could never succeed where this one fails.
            chrono::NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f")
                .map(|dt| dt.and_utc())
        })
        .or_else(|_| {
            chrono::NaiveDate::parse_from_str(v, "%Y-%m-%d")
                .map(|date| date.and_time(NaiveTime::default()).and_utc())
        })
}


#[cfg(test)]
mod tests {
Expand Down
17 changes: 12 additions & 5 deletions crates/datasources/src/sqlite/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,11 @@ impl Converter {
builder.append_null();
}
ValueRef::Text(t) => {
// TODO: Support other str formats
let t = std::str::from_utf8(t).unwrap();
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

// note: use this parsing order rather
// than util::try_parse_datetime,
// given the likely values of sqlite dates
let date = NaiveDate::parse_from_str(t, "%Y-%m-%d")
.or_else(|_| {
NaiveDateTime::parse_from_str(t, "%Y-%m-%d %H:%M:%S%.f")
Expand All @@ -339,6 +340,9 @@ impl Converter {
to: DataType::Date32,
cause: Some(e.to_string()),
})?;

let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();

let num_days_since_epoch =
date.signed_duration_since(epoch).num_days();
let i = i32::try_from(num_days_since_epoch).map_err(|e| {
Expand Down Expand Up @@ -436,9 +440,11 @@ impl Converter {
builder.append_null();
}
ValueRef::Text(t) => {
// TODO: Support other str formats
let t = std::str::from_utf8(t).unwrap();
let epoch = NaiveTime::from_hms_opt(0, 0, 0).unwrap();

// note: use this parsing order rather
// than util::try_parse_datetime,
// given the likely values of sqlite times
let time = NaiveTime::parse_from_str(t, "%H:%M:%S%.f")
.or_else(|_| NaiveTime::parse_from_str(t, "%H:%M:%S%"))
.or_else(|_| {
Expand Down Expand Up @@ -466,7 +472,8 @@ impl Converter {
cause: Some(e.to_string()),
})?;

let duration_since_midnight = time.signed_duration_since(epoch);
let duration_since_midnight =
time.signed_duration_since(NaiveTime::default());
let microseconds_since_midnight =
duration_since_midnight.num_microseconds().unwrap();
builder.append_value(microseconds_since_midnight);
Expand Down

0 comments on commit 9e7b551

Please sign in to comment.