Skip to content

Commit

Permalink
parse bytea type from text format
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Nov 27, 2024
1 parent c8e1c9a commit a2a8edb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 2 deletions.
11 changes: 9 additions & 2 deletions pg_replicate/src/conversions/cdc_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ use tracing::info;
use uuid::Uuid;

use crate::{
conversions::bool::parse_bool,
conversions::{bool::parse_bool, hex},
pipeline::batching::BatchBoundary,
table::{ColumnSchema, TableId, TableSchema},
};

use super::{bool::ParseBoolError, numeric::PgNumeric, table_row::TableRow, ArrayCell, Cell};
use super::{
bool::ParseBoolError, hex::ByteaHexParseError, numeric::PgNumeric, table_row::TableRow,
ArrayCell, Cell,
};

#[derive(Debug, Error)]
pub enum CdcEventConversionError {
Expand Down Expand Up @@ -51,6 +54,9 @@ pub enum CdcEventConversionError {
#[error("invalid numeric: {0}")]
InvalidNumeric(#[from] ParseBigDecimalError),

#[error("invalid bytea: {0}")]
InvalidBytea(#[from] ByteaHexParseError),

#[error("invalid uuid: {0}")]
InvalidUuid(#[from] uuid::Error),

Expand Down Expand Up @@ -317,6 +323,7 @@ impl FromTupleData for TextFormatConverter {
// let val = Vec::<u8>::from_sql(typ, bytes)?;
// Ok(Cell::Bytes(val))
// }
Type::BYTEA => Ok(Cell::Bytes(hex::from_bytea_hex(str)?)),
// Type::BYTEA_ARRAY => {
// let val = Vec::<Option<Vec<u8>>>::from_sql(typ, bytes)?;
// Ok(Cell::Array(ArrayCell::Bytes(val)))
Expand Down
35 changes: 35 additions & 0 deletions pg_replicate/src/conversions/hex.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::num::ParseIntError;

use thiserror::Error;

#[derive(Debug, Error)]
pub enum ByteaHexParseError {
#[error("missing prefix '\\x'")]
InvalidPrefix,

#[error("invalid byte")]
OddNumerOfDigits,

#[error("parse int result: {0}")]
ParseInt(#[from] ParseIntError),
}

pub fn from_bytea_hex(s: &str) -> Result<Vec<u8>, ByteaHexParseError> {
if s.len() < 2 || &s[..2] != "\\x" {
return Err(ByteaHexParseError::InvalidPrefix);
}

let mut result = Vec::with_capacity((s.len() - 2) / 2);
let s = &s[2..];

if s.len() % 2 != 0 {
return Err(ByteaHexParseError::OddNumerOfDigits);
}

for i in (0..s.len()).step_by(2) {
let val = u8::from_str_radix(&s[i..i + 2], 16)?;
result.push(val);
}

Ok(result)
}
1 change: 1 addition & 0 deletions pg_replicate/src/conversions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use uuid::Uuid;

pub mod bool;
pub mod cdc_event;
pub mod hex;
pub mod numeric;
pub mod table_row;

Expand Down

0 comments on commit a2a8edb

Please sign in to comment.