Skip to content

Commit

Permalink
use binary format for CdcStream
Browse files: browse the repository at this point in the history
  • Loading branch information
imor committed Nov 12, 2024
1 parent 8b70449 commit 18dbd5b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 47 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ k8s-openapi = { version = "0.23.0", default-features = false }
kube = { version = "0.96.0", default-features = false }
pg_escape = { version = "0.1.1", default-features = false }
pin-project-lite = { version = "0.2", default-features = false }
postgres-protocol = { git = "https://github.com/MaterializeInc/rust-postgres", rev = "548f00c4cbcf7bd714c29cfd2fea7685040c9987" }
postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "548f00c4cbcf7bd714c29cfd2fea7685040c9987" }
postgres-protocol = { git = "https://github.com/imor/rust-postgres", rev = "20265ef38e32a06f76b6f9b678e2077fc2211f6b" }
postgres-replication = { git = "https://github.com/imor/rust-postgres", default-features = false, rev = "20265ef38e32a06f76b6f9b678e2077fc2211f6b" }
prost = { version = "0.13.1", default-features = false }
rand = { version = "0.8.5", default-features = false }
reqwest = { version = "0.12", default-features = false }
Expand All @@ -39,7 +39,7 @@ serde_json = { version = "1.0", default-features = false }
sqlx = { version = "0.8.2", default-features = false }
thiserror = "1.0"
tokio = { version = "1.38", default-features = false }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "548f00c4cbcf7bd714c29cfd2fea7685040c9987" }
tokio-postgres = { git = "https://github.com/imor/rust-postgres", default-features = false, rev = "20265ef38e32a06f76b6f9b678e2077fc2211f6b" }
tracing = { version = "0.1", default-features = false }
tracing-actix-web = { version = "0.7", default-features = false }
tracing-bunyan-formatter = { version = "0.3", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion pg_replicate/src/clients/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ impl ReplicationClient {
start_lsn: PgLsn,
) -> Result<LogicalReplicationStream, ReplicationClientError> {
let options = format!(
r#"("proto_version" '1', "publication_names" {})"#,
r#"("proto_version" '1', "publication_names" {}, "binary")"#,
quote_literal(publication)
);

Expand Down
73 changes: 30 additions & 43 deletions pg_replicate/src/conversions/cdc_event.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
use std::{
collections::HashMap,
num::{ParseFloatError, ParseIntError},
str::{from_utf8, FromStr, ParseBoolError, Utf8Error},
str::{ParseBoolError, Utf8Error},
string::FromUtf8Error,
};

use bigdecimal::{BigDecimal, ParseBigDecimalError};
use bigdecimal::ParseBigDecimalError;
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime};
use postgres_replication::protocol::{
BeginBody, CommitBody, DeleteBody, InsertBody, LogicalReplicationMessage, RelationBody,
ReplicationMessage, TupleData, TypeBody, UpdateBody,
};
use thiserror::Error;
use tokio_postgres::types::Type;
use tokio_postgres::types::{FromSql, Type};
use uuid::Uuid;

use crate::{
conversions::hex::from_bytea_hex,
pipeline::batching::BatchBoundary,
table::{ColumnSchema, TableId, TableSchema},
};
Expand All @@ -34,6 +33,9 @@ pub enum CdcEventConversionError {
#[error("unchanged toast not yet supported")]
UnchangedToastNotSupported,

#[error("text format not supported")]
TextFormatNotSupported,

#[error("invalid string value")]
InvalidStr(#[from] Utf8Error),

Expand Down Expand Up @@ -84,6 +86,9 @@ pub enum CdcEventConversionError {

#[error("invalid column name: {0}")]
InvalidColumnName(String),

#[error("row get error: {0:?}")]
RowGetError(#[from] Box<dyn std::error::Error + Sync + Send>),
}

/// Field-less unit struct. Its `impl` block (only partially visible here)
/// matches on a column's Postgres `Type` and decodes the raw tuple bytes
/// into the corresponding `Cell` variant.
pub struct CdcEventConverter;
Expand All @@ -98,97 +103,79 @@ impl CdcEventConverter {
TupleData::UnchangedToast => {
return Err(CdcEventConversionError::UnchangedToastNotSupported)
}
TupleData::Text(bytes) => &bytes[..],
TupleData::Text(_) => return Err(CdcEventConversionError::TextFormatNotSupported),
TupleData::Binary(bytes) => &bytes[..],
};

match *typ {
Type::BOOL => {
let val = from_utf8(bytes)?;
let val = match val {
"t" => true,
"f" => false,
_ => val.parse()?,
};
let val = bool::from_sql(typ, bytes)?;
Ok(Cell::Bool(val))
}
Type::CHAR | Type::BPCHAR | Type::VARCHAR | Type::NAME | Type::TEXT => {
let val = from_utf8(bytes)?;
let val = String::from_sql(typ, bytes)?;
Ok(Cell::String(val.to_string()))
}
Type::INT2 => {
let val = from_utf8(bytes)?;
let val: i16 = val.parse()?;
let val = i16::from_sql(typ, bytes)?;
Ok(Cell::I16(val))
}
Type::INT4 => {
let val = from_utf8(bytes)?;
let val: i32 = val.parse()?;
let val = i32::from_sql(typ, bytes)?;
Ok(Cell::I32(val))
}
Type::INT8 => {
let val = from_utf8(bytes)?;
let val: i64 = val.parse()?;
let val = i64::from_sql(typ, bytes)?;
Ok(Cell::I64(val))
}
Type::FLOAT4 => {
let val = from_utf8(bytes)?;
let val: f32 = val.parse()?;
let val = f32::from_sql(typ, bytes)?;
Ok(Cell::F32(val))
}
Type::FLOAT8 => {
let val = from_utf8(bytes)?;
let val: f64 = val.parse()?;
let val = f64::from_sql(typ, bytes)?;
Ok(Cell::F64(val))
}
Type::NUMERIC => {
let val = from_utf8(bytes)?;
let val = BigDecimal::from_str(val)?;
let val = PgNumeric::new(Some(val));
let val = PgNumeric::from_sql(typ, bytes)?;
Ok(Cell::Numeric(val))
}
Type::BYTEA => {
let val = from_utf8(bytes)?;
let val = from_bytea_hex(val)?;
let val = Vec::<u8>::from_sql(typ, bytes)?;
Ok(Cell::Bytes(val))
}
Type::DATE => {
let val = from_utf8(bytes)?;
let val = NaiveDate::parse_from_str(val, "%Y-%m-%d")?;
let val = NaiveDate::from_sql(typ, bytes)?;
Ok(Cell::Date(val))
}
Type::TIME => {
let val = from_utf8(bytes)?;
let val = NaiveTime::parse_from_str(val, "%H:%M:%S%.f")?;
let val = NaiveTime::from_sql(typ, bytes)?;
Ok(Cell::Time(val))
}
Type::TIMESTAMP => {
let val = from_utf8(bytes)?;
let val = NaiveDateTime::parse_from_str(val, "%Y-%m-%d %H:%M:%S%.f")?;
let val = NaiveDateTime::from_sql(typ, bytes)?;
Ok(Cell::TimeStamp(val))
}
Type::TIMESTAMPTZ => {
let val = from_utf8(bytes)?;
let val = DateTime::<FixedOffset>::parse_from_rfc3339(val)?;
let val = DateTime::<FixedOffset>::from_sql(typ, bytes)?;
Ok(Cell::TimeStampTz(val.into()))
}
Type::UUID => {
let val = from_utf8(bytes)?;
let val = Uuid::parse_str(val)?;
let val = Uuid::from_sql(typ, bytes)?;
Ok(Cell::Uuid(val))
}
Type::JSON | Type::JSONB => {
let val = from_utf8(bytes)?;
let val = serde_json::from_str(val)?;
let val = serde_json::Value::from_sql(typ, bytes)?;
Ok(Cell::Json(val))
}
Type::OID => {
let val = from_utf8(bytes)?;
let val: u32 = val.parse()?;
let val = u32::from_sql(typ, bytes)?;
Ok(Cell::U32(val))
}
#[cfg(feature = "unknown_types_to_bytes")]
_ => {
let s = String::from_utf8(bytes.to_vec())?;
Ok(Cell::String(s))
let val = String::from_sql(typ, bytes)?;
Ok(Cell::String(val.to_string()))
}
#[cfg(not(feature = "unknown_types_to_bytes"))]
_ => Err(CdcEventConversionError::UnsupportedType(
Expand Down

0 comments on commit 18dbd5b

Please sign in to comment.