From 479a096c011eb09fa158842da342054fb8eb7f06 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Mon, 25 Nov 2024 19:41:54 +0530 Subject: [PATCH] fix #59: failure when replica identity full is set --- pg_replicate/src/clients/bigquery.rs | 8 ++--- pg_replicate/src/clients/duckdb.rs | 12 ++++---- pg_replicate/src/clients/postgres.rs | 30 +++++++++---------- pg_replicate/src/pipeline/sinks/bigquery.rs | 6 ++-- .../src/pipeline/sinks/duckdb/executor.rs | 4 +-- pg_replicate/src/table.rs | 2 +- 6 files changed, 31 insertions(+), 31 deletions(-) diff --git a/pg_replicate/src/clients/bigquery.rs b/pg_replicate/src/clients/bigquery.rs index ec3249f..e0b23f0 100644 --- a/pg_replicate/src/clients/bigquery.rs +++ b/pg_replicate/src/clients/bigquery.rs @@ -142,7 +142,7 @@ impl BigQueryClient { } fn add_primary_key_clause(column_schemas: &[ColumnSchema], s: &mut String) { - let identity_columns = column_schemas.iter().filter(|s| s.identity); + let identity_columns = column_schemas.iter().filter(|s| s.primary); s.push_str("primary key ("); @@ -164,7 +164,7 @@ impl BigQueryClient { s.push(','); } - let has_identity_cols = column_schemas.iter().any(|s| s.identity); + let has_identity_cols = column_schemas.iter().any(|s| s.primary); if has_identity_cols { Self::add_primary_key_clause(column_schemas, &mut s); } else { @@ -445,7 +445,7 @@ impl BigQueryClient { let mut remove_comma = false; for (cell, column) in table_row.values.iter().zip(column_schemas) { - if !column.identity { + if !column.primary { s.push_str(&column.name); s.push_str(" = "); Self::cell_to_query_value(cell, &mut s); @@ -473,7 +473,7 @@ impl BigQueryClient { let mut remove_and = false; for (cell, column) in table_row.values.iter().zip(column_schemas) { - if column.identity { + if column.primary { s.push_str(&column.name); s.push_str(" = "); Self::cell_to_query_value(cell, s); diff --git a/pg_replicate/src/clients/duckdb.rs b/pg_replicate/src/clients/duckdb.rs index 513dbcc..389a29f 100644 --- a/pg_replicate/src/clients/duckdb.rs +++ b/pg_replicate/src/clients/duckdb.rs @@ -142,7 +142,7 @@ impl DuckDbClient { s.push(' '); let typ = Self::postgres_to_duckdb_type(&column_schema.typ); s.push_str(typ); - if column_schema.identity { + if column_schema.primary { s.push_str(" primary key"); }; } @@ -231,12 +231,12 @@ impl DuckDbClient { let non_identity_cells = column_schemas .iter() .zip(table_row.values.iter()) - .filter(|(s, _)| !s.identity) + .filter(|(s, _)| !s.primary) .map(|(_, c)| c); let identity_cells = column_schemas .iter() .zip(table_row.values.iter()) - .filter(|(s, _)| s.identity) + .filter(|(s, _)| s.primary) .map(|(_, c)| c); stmt.execute(params_from_iter(non_identity_cells.chain(identity_cells)))?; Ok(()) @@ -250,7 +250,7 @@ impl DuckDbClient { s.push_str(" set "); let mut remove_comma = false; - let non_identity_columns = column_schemas.iter().filter(|s| !s.identity); + let non_identity_columns = column_schemas.iter().filter(|s| !s.primary); for column in non_identity_columns { s.push_str(&column.name); s.push_str(" = ?,"); @@ -270,7 +270,7 @@ impl DuckDbClient { s.push_str(" where "); let mut remove_and = false; - let identity_columns = column_schemas.iter().filter(|s| s.identity); + let identity_columns = column_schemas.iter().filter(|s| s.primary); for column in identity_columns { s.push_str(&column.name); s.push_str(" = ? and "); @@ -298,7 +298,7 @@ impl DuckDbClient { let identity_cells = column_schemas .iter() .zip(table_row.values.iter()) - .filter(|(s, _)| s.identity) + .filter(|(s, _)| s.primary) .map(|(_, c)| c); stmt.execute(params_from_iter(identity_cells))?; Ok(()) diff --git a/pg_replicate/src/clients/postgres.rs b/pg_replicate/src/clients/postgres.rs index e73a73d..0a6b377 100644 --- a/pg_replicate/src/clients/postgres.rs +++ b/pg_replicate/src/clients/postgres.rs @@ -130,19 +130,19 @@ impl ReplicationClient { table_id: TableId, ) -> Result, ReplicationClientError> { let column_info_query = format!( - "SELECT a.attname, + "select a.attname, a.atttypid, a.atttypmod, a.attnotnull, - a.attnum = ANY(i.indkey) is_identity - FROM pg_catalog.pg_attribute a - LEFT JOIN pg_catalog.pg_index i - ON (i.indexrelid = pg_get_replica_identity_index({})) - WHERE a.attnum > 0::pg_catalog.int2 - AND NOT a.attisdropped - AND a.attrelid = {} - ORDER BY a.attnum", - table_id, table_id + coalesce(i.indisprimary, false) primary + from pg_attribute a + left join pg_index i + on a.attrelid = i.indrelid + and a.attnum = any(i.indkey) + where a.attnum > 0::int2 + and not a.attisdropped + and a.attrelid = {table_id} + ", ); let mut column_schemas = vec![]; @@ -195,11 +195,11 @@ impl ReplicationClient { ))? == "f"; - let identity = - row.try_get("is_identity")? + let primary = + row.try_get("primary")? .ok_or(ReplicationClientError::MissingColumn( - "attnum".to_string(), - "pg_attribute".to_string(), + "indisprimary".to_string(), + "pg_index".to_string(), ))? == "t"; @@ -208,7 +208,7 @@ impl ReplicationClient { typ, modifier, nullable, - identity, + primary, }) } } diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs index b389224..9c133c0 100644 --- a/pg_replicate/src/pipeline/sinks/bigquery.rs +++ b/pg_replicate/src/pipeline/sinks/bigquery.rs @@ -87,7 +87,7 @@ impl BatchSink for BigQueryBatchSink { typ: Type::INT4, modifier: 0, nullable: false, - identity: true, + primary: true, }]; self.client @@ -104,14 +104,14 @@ impl BatchSink for BigQueryBatchSink { typ: Type::INT8, modifier: 0, nullable: false, - identity: true, + primary: true, }, ColumnSchema { name: "lsn".to_string(), typ: Type::INT8, modifier: 0, nullable: false, - identity: false, + primary: false, }, ]; if self diff --git a/pg_replicate/src/pipeline/sinks/duckdb/executor.rs b/pg_replicate/src/pipeline/sinks/duckdb/executor.rs index e96c26f..6927bd7 100644 --- a/pg_replicate/src/pipeline/sinks/duckdb/executor.rs +++ b/pg_replicate/src/pipeline/sinks/duckdb/executor.rs @@ -156,7 +156,7 @@ impl DuckDbExecutor { typ: Type::INT4, modifier: 0, nullable: false, - identity: true, + primary: true, }]; self.client .create_schema_if_missing(&copied_tables_table_name.schema)?; @@ -173,7 +173,7 @@ impl DuckDbExecutor { typ: Type::INT8, modifier: 0, nullable: false, - identity: true, + primary: true, }]; if self .client diff --git a/pg_replicate/src/table.rs b/pg_replicate/src/table.rs index d08b8ef..1b64e25 100644 --- a/pg_replicate/src/table.rs +++ b/pg_replicate/src/table.rs @@ -31,7 +31,7 @@ pub struct ColumnSchema { pub typ: Type, pub modifier: TypeModifier, pub nullable: bool, - pub identity: bool, + pub primary: bool, } pub type TableId = u32;