Skip to content

Commit

Permalink
parse json and bytea array types from text format
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Nov 27, 2024
1 parent 440cb64 commit fe419f2
Showing 1 changed file with 30 additions and 37 deletions.
67 changes: 30 additions & 37 deletions pg_replicate/src/conversions/cdc_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use postgres_replication::protocol::{
};
use thiserror::Error;
use tokio_postgres::types::{FromSql, Type};
use tracing::info;
use uuid::Uuid;

use crate::{
Expand Down Expand Up @@ -270,9 +269,6 @@ pub enum ArrayParseError {

#[error("missing brances")]
MissingBraces,

#[error("missing double quotes")]
MissingDoubleQuotes,
}

impl TextFormatConverter {
Expand All @@ -290,47 +286,43 @@ impl TextFormatConverter {
}

let mut res = vec![];
let mut str = &str[1..(str.len() - 1)];
let str = &str[1..(str.len() - 1)];
let mut val_str = String::with_capacity(10);
let mut in_quotes = false;
let mut in_escape = false;
let mut chars = str.chars();
let mut done = str.is_empty();

while !done {
let start_idx = 0;
let mut end_idx = str.len();
let mut chars = str.char_indices();
loop {
match chars.next() {
Some((i, ',')) if !in_quotes => {
end_idx = i;
break;
}
Some((_, '"')) => in_quotes = !in_quotes,
Some(_) => {}
Some(c) => match c {
c if in_escape => {
val_str.push(c);
in_escape = false;
}
'"' => in_quotes = !in_quotes,
'\\' => in_escape = true,
',' if !in_quotes => {
break;
}
c => {
val_str.push(c);
}
},
None => {
done = true;
break;
}
}
}
let val_str = &str[start_idx..end_idx];
let val_str = if val_str.starts_with('"') {
if !val_str.ends_with('"') {
return Err(ArrayParseError::MissingDoubleQuotes.into());
}
&val_str[1..(val_str.len() - 1)]
} else {
val_str
};
info!("VAL STR: {val_str}");
let val = if val_str.to_lowercase() == "null" {
None
} else {
parse(val_str)?
parse(&val_str)?
};
res.push(val);
if !done {
str = &str[end_idx + 1..];
}
val_str.clear();
}

Ok(Cell::Array(m(res)))
Expand All @@ -344,7 +336,6 @@ impl FromTupleData for TextFormatConverter {
bytes: &[u8],
) -> Result<Cell, CdcEventConversionError> {
let str = str::from_utf8(bytes)?;
info!("TYP: {typ:#?}, STR: {str:#?}");
match *typ {
Type::BOOL => Ok(Cell::Bool(parse_bool(str)?)),
Type::BOOL_ARRAY => TextFormatConverter::parse_array(
Expand Down Expand Up @@ -391,10 +382,11 @@ impl FromTupleData for TextFormatConverter {
ArrayCell::Numeric,
),
Type::BYTEA => Ok(Cell::Bytes(hex::from_bytea_hex(str)?)),
// Type::BYTEA_ARRAY => {
// let val = Vec::<Option<Vec<u8>>>::from_sql(typ, bytes)?;
// Ok(Cell::Array(ArrayCell::Bytes(val)))
// }
Type::BYTEA_ARRAY => TextFormatConverter::parse_array(
str,
|str| Ok(Some(hex::from_bytea_hex(str)?)),
ArrayCell::Bytes,
),
Type::DATE => {
let val = NaiveDate::parse_from_str(str, "%Y-%m-%d")?;
Ok(Cell::Date(val))
Expand Down Expand Up @@ -453,10 +445,11 @@ impl FromTupleData for TextFormatConverter {
let val = serde_json::from_str(str)?;
Ok(Cell::Json(val))
}
// Type::JSON_ARRAY | Type::JSONB_ARRAY => {
// let val = Vec::<Option<serde_json::Value>>::from_sql(typ, bytes)?;
// Ok(Cell::Array(ArrayCell::Json(val)))
// }
Type::JSON_ARRAY | Type::JSONB_ARRAY => TextFormatConverter::parse_array(
str,
|str| Ok(Some(serde_json::from_str(str)?)),
ArrayCell::Json,
),
Type::OID => {
let val: u32 = str.parse()?;
Ok(Cell::U32(val))
Expand Down

0 comments on commit fe419f2

Please sign in to comment.