Skip to content

Commit

Permalink
fix #59: failure when replica identity full is set
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Nov 25, 2024
1 parent d4f0a55 commit 479a096
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 31 deletions.
8 changes: 4 additions & 4 deletions pg_replicate/src/clients/bigquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl BigQueryClient {
}

fn add_primary_key_clause(column_schemas: &[ColumnSchema], s: &mut String) {
let identity_columns = column_schemas.iter().filter(|s| s.identity);
let identity_columns = column_schemas.iter().filter(|s| s.primary);

s.push_str("primary key (");

Expand All @@ -164,7 +164,7 @@ impl BigQueryClient {
s.push(',');
}

let has_identity_cols = column_schemas.iter().any(|s| s.identity);
let has_identity_cols = column_schemas.iter().any(|s| s.primary);
if has_identity_cols {
Self::add_primary_key_clause(column_schemas, &mut s);
} else {
Expand Down Expand Up @@ -445,7 +445,7 @@ impl BigQueryClient {
let mut remove_comma = false;

for (cell, column) in table_row.values.iter().zip(column_schemas) {
if !column.identity {
if !column.primary {
s.push_str(&column.name);
s.push_str(" = ");
Self::cell_to_query_value(cell, &mut s);
Expand Down Expand Up @@ -473,7 +473,7 @@ impl BigQueryClient {

let mut remove_and = false;
for (cell, column) in table_row.values.iter().zip(column_schemas) {
if column.identity {
if column.primary {
s.push_str(&column.name);
s.push_str(" = ");
Self::cell_to_query_value(cell, s);
Expand Down
12 changes: 6 additions & 6 deletions pg_replicate/src/clients/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl DuckDbClient {
s.push(' ');
let typ = Self::postgres_to_duckdb_type(&column_schema.typ);
s.push_str(typ);
if column_schema.identity {
if column_schema.primary {
s.push_str(" primary key");
};
}
Expand Down Expand Up @@ -231,12 +231,12 @@ impl DuckDbClient {
let non_identity_cells = column_schemas
.iter()
.zip(table_row.values.iter())
.filter(|(s, _)| !s.identity)
.filter(|(s, _)| !s.primary)
.map(|(_, c)| c);
let identity_cells = column_schemas
.iter()
.zip(table_row.values.iter())
.filter(|(s, _)| s.identity)
.filter(|(s, _)| s.primary)
.map(|(_, c)| c);
stmt.execute(params_from_iter(non_identity_cells.chain(identity_cells)))?;
Ok(())
Expand All @@ -250,7 +250,7 @@ impl DuckDbClient {
s.push_str(" set ");

let mut remove_comma = false;
let non_identity_columns = column_schemas.iter().filter(|s| !s.identity);
let non_identity_columns = column_schemas.iter().filter(|s| !s.primary);
for column in non_identity_columns {
s.push_str(&column.name);
s.push_str(" = ?,");
Expand All @@ -270,7 +270,7 @@ impl DuckDbClient {
s.push_str(" where ");

let mut remove_and = false;
let identity_columns = column_schemas.iter().filter(|s| s.identity);
let identity_columns = column_schemas.iter().filter(|s| s.primary);
for column in identity_columns {
s.push_str(&column.name);
s.push_str(" = ? and ");
Expand Down Expand Up @@ -298,7 +298,7 @@ impl DuckDbClient {
let identity_cells = column_schemas
.iter()
.zip(table_row.values.iter())
.filter(|(s, _)| s.identity)
.filter(|(s, _)| s.primary)
.map(|(_, c)| c);
stmt.execute(params_from_iter(identity_cells))?;
Ok(())
Expand Down
30 changes: 15 additions & 15 deletions pg_replicate/src/clients/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,19 @@ impl ReplicationClient {
table_id: TableId,
) -> Result<Vec<ColumnSchema>, ReplicationClientError> {
let column_info_query = format!(
"SELECT a.attname,
"select a.attname,
a.atttypid,
a.atttypmod,
a.attnotnull,
a.attnum = ANY(i.indkey) is_identity
FROM pg_catalog.pg_attribute a
LEFT JOIN pg_catalog.pg_index i
ON (i.indexrelid = pg_get_replica_identity_index({}))
WHERE a.attnum > 0::pg_catalog.int2
AND NOT a.attisdropped
AND a.attrelid = {}
ORDER BY a.attnum",
table_id, table_id
coalesce(i.indisprimary, false) primary
from pg_attribute a
left join pg_index i
on a.attrelid = i.indrelid
and a.attnum = any(i.indkey)
where a.attnum > 0::int2
and not a.attisdropped
and a.attrelid = {table_id}
",
);

let mut column_schemas = vec![];
Expand Down Expand Up @@ -195,11 +195,11 @@ impl ReplicationClient {
))?
== "f";

let identity =
row.try_get("is_identity")?
let primary =
row.try_get("primary")?
.ok_or(ReplicationClientError::MissingColumn(
"attnum".to_string(),
"pg_attribute".to_string(),
"indisprimary".to_string(),
"pg_index".to_string(),
))?
== "t";

Expand All @@ -208,7 +208,7 @@ impl ReplicationClient {
typ,
modifier,
nullable,
identity,
primary,
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions pg_replicate/src/pipeline/sinks/bigquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl BatchSink for BigQueryBatchSink {
typ: Type::INT4,
modifier: 0,
nullable: false,
identity: true,
primary: true,
}];

self.client
Expand All @@ -104,14 +104,14 @@ impl BatchSink for BigQueryBatchSink {
typ: Type::INT8,
modifier: 0,
nullable: false,
identity: true,
primary: true,
},
ColumnSchema {
name: "lsn".to_string(),
typ: Type::INT8,
modifier: 0,
nullable: false,
identity: false,
primary: false,
},
];
if self
Expand Down
4 changes: 2 additions & 2 deletions pg_replicate/src/pipeline/sinks/duckdb/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl DuckDbExecutor {
typ: Type::INT4,
modifier: 0,
nullable: false,
identity: true,
primary: true,
}];
self.client
.create_schema_if_missing(&copied_tables_table_name.schema)?;
Expand All @@ -173,7 +173,7 @@ impl DuckDbExecutor {
typ: Type::INT8,
modifier: 0,
nullable: false,
identity: true,
primary: true,
}];
if self
.client
Expand Down
2 changes: 1 addition & 1 deletion pg_replicate/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct ColumnSchema {
pub typ: Type,
pub modifier: TypeModifier,
pub nullable: bool,
pub identity: bool,
pub primary: bool,
}

pub type TableId = u32;
Expand Down

0 comments on commit 479a096

Please sign in to comment.