From c184b27e5047b136fda018867b769395133b4a1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Fri, 30 Aug 2024 04:10:08 +1200 Subject: [PATCH 1/5] make sinks, sources, and pipelines generic on their errors --- .../src/pipeline/batching/data_pipeline.rs | 67 +++++++++++++----- pg_replicate/src/pipeline/data_pipeline.rs | 68 ++++++++++++++----- pg_replicate/src/pipeline/mod.rs | 21 +++--- pg_replicate/src/pipeline/sinks/bigquery.rs | 15 ++-- .../src/pipeline/sinks/duckdb/executor.rs | 12 +++- pg_replicate/src/pipeline/sinks/duckdb/mod.rs | 2 +- .../src/pipeline/sinks/duckdb/sink.rs | 29 +++++--- pg_replicate/src/pipeline/sinks/mod.rs | 64 ++++++----------- pg_replicate/src/pipeline/sinks/stdout.rs | 20 ++++-- pg_replicate/src/pipeline/sources/mod.rs | 14 ++-- pg_replicate/src/pipeline/sources/postgres.rs | 10 ++- 11 files changed, 201 insertions(+), 121 deletions(-) diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs index 8dd7ea4..9036460 100644 --- a/pg_replicate/src/pipeline/batching/data_pipeline.rs +++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs @@ -10,7 +10,7 @@ use crate::{ pipeline::{ batching::stream::BatchTimeoutStream, sinks::BatchSink, - sources::{postgres::CdcStreamError, Source, SourceError}, + sources::{postgres::CdcStreamError, CommonSourceError, Source}, PipelineAction, PipelineError, }, table::TableId, @@ -35,18 +35,24 @@ impl BatchDataPipeline { } } - async fn copy_table_schemas(&mut self) -> Result<(), PipelineError> { + async fn copy_table_schemas(&mut self) -> Result<(), PipelineError> { let table_schemas = self.source.get_table_schemas(); let table_schemas = table_schemas.clone(); if !table_schemas.is_empty() { - self.sink.write_table_schemas(table_schemas).await?; + self.sink + .write_table_schemas(table_schemas) + .await + .map_err(PipelineError::Sink)?; } Ok(()) } - async fn copy_tables(&mut self, copied_tables: &HashSet) -> Result<(), PipelineError> { + async fn copy_tables( + &mut self, + copied_tables: &HashSet, + ) -> Result<(), PipelineError> { let start = Instant::now(); let table_schemas = self.source.get_table_schemas(); @@ -60,12 +66,16 @@ impl BatchDataPipeline { continue; } - self.sink.truncate_table(table_schema.table_id).await?; + self.sink + .truncate_table(table_schema.table_id) + .await + .map_err(PipelineError::Sink)?; let table_rows = self .source .get_table_copy_stream(&table_schema.table_name, &table_schema.column_schemas) - .await?; + .await + .map_err(PipelineError::Source)?; let batch_timeout_stream = BatchTimeoutStream::new(table_rows, self.batch_config.clone()); @@ -77,16 +87,23 @@ impl BatchDataPipeline { //TODO: Avoid a vec copy let mut rows = Vec::with_capacity(batch.len()); for row in batch { - rows.push(row.map_err(SourceError::TableCopyStream)?); + rows.push(row.map_err(CommonSourceError::TableCopyStream)?); } self.sink .write_table_rows(rows, table_schema.table_id) - .await?; + .await + .map_err(PipelineError::Sink)?; } - self.sink.table_copied(table_schema.table_id).await?; + self.sink + .table_copied(table_schema.table_id) + .await + .map_err(PipelineError::Sink)?; } - self.source.commit_transaction().await?; + self.source + .commit_transaction() + .await + .map_err(PipelineError::Source)?; let end = Instant::now(); let seconds = (end - start).as_secs(); @@ -95,10 +112,17 @@ impl BatchDataPipeline { Ok(()) } - async fn copy_cdc_events(&mut self, last_lsn: PgLsn) -> Result<(), PipelineError> { + async fn copy_cdc_events( + &mut self, 
+ last_lsn: PgLsn, + ) -> Result<(), PipelineError> { let mut last_lsn: u64 = last_lsn.into(); last_lsn += 1; - let cdc_events = self.source.get_cdc_stream(last_lsn.into()).await?; + let cdc_events = self + .source + .get_cdc_stream(last_lsn.into()) + .await + .map_err(PipelineError::Source)?; pin!(cdc_events); @@ -117,13 +141,17 @@ impl BatchDataPipeline { { continue; } - let event = event.map_err(SourceError::CdcStream)?; + let event = event.map_err(CommonSourceError::CdcStream)?; if let CdcEvent::KeepAliveRequested { reply } = event { send_status_update = reply; }; events.push(event); } - let last_lsn = self.sink.write_cdc_events(events).await?; + let last_lsn = self + .sink + .write_cdc_events(events) + .await + .map_err(PipelineError::Sink)?; if send_status_update { info!("sending status update with lsn: {last_lsn}"); let inner = unsafe { @@ -136,15 +164,20 @@ impl BatchDataPipeline { .as_mut() .send_status_update(last_lsn) .await - .map_err(|e| PipelineError::SourceError(SourceError::StatusUpdate(e)))?; + .map_err(CommonSourceError::StatusUpdate)?; } } Ok(()) } - pub async fn start(&mut self) -> Result<(), PipelineError> { - let resumption_state = self.sink.get_resumption_state().await?; + pub async fn start(&mut self) -> Result<(), PipelineError> { + let resumption_state = self + .sink + .get_resumption_state() + .await + .map_err(PipelineError::Sink)?; + match self.action { PipelineAction::TableCopiesOnly => { self.copy_table_schemas().await?; diff --git a/pg_replicate/src/pipeline/data_pipeline.rs b/pg_replicate/src/pipeline/data_pipeline.rs index b504f20..f1b483c 100644 --- a/pg_replicate/src/pipeline/data_pipeline.rs +++ b/pg_replicate/src/pipeline/data_pipeline.rs @@ -4,7 +4,9 @@ use futures::StreamExt; use tokio::pin; use tokio_postgres::types::PgLsn; -use crate::{conversions::cdc_event::CdcEvent, pipeline::sources::SourceError, table::TableId}; +use crate::{ + conversions::cdc_event::CdcEvent, pipeline::sources::CommonSourceError, table::TableId, +}; use super::{sinks::Sink, sources::Source, PipelineAction, PipelineError}; @@ -23,18 +25,24 @@ impl DataPipeline { } } - async fn copy_table_schemas(&mut self) -> Result<(), PipelineError> { + async fn copy_table_schemas(&mut self) -> Result<(), PipelineError> { let table_schemas = self.source.get_table_schemas(); let table_schemas = table_schemas.clone(); if !table_schemas.is_empty() { - self.sink.write_table_schemas(table_schemas).await?; + self.sink + .write_table_schemas(table_schemas) + .await + .map_err(PipelineError::Sink)?; } Ok(()) } - async fn copy_tables(&mut self, copied_tables: &HashSet) -> Result<(), PipelineError> { + async fn copy_tables( + &mut self, + copied_tables: &HashSet, + ) -> Result<(), PipelineError> { let table_schemas = self.source.get_table_schemas(); let mut keys: Vec = table_schemas.keys().copied().collect(); @@ -46,58 +54,84 @@ impl DataPipeline { continue; } - self.sink.truncate_table(table_schema.table_id).await?; + self.sink + .truncate_table(table_schema.table_id) + .await + .map_err(PipelineError::Sink)?; let table_rows = self .source .get_table_copy_stream(&table_schema.table_name, &table_schema.column_schemas) - .await?; + .await + .map_err(PipelineError::Source)?; pin!(table_rows); while let Some(row) = table_rows.next().await { - let row = row.map_err(SourceError::TableCopyStream)?; + let row = row.map_err(CommonSourceError::TableCopyStream)?; self.sink .write_table_row(row, table_schema.table_id) - .await?; + .await + .map_err(PipelineError::Sink)?; } - 
self.sink.table_copied(table_schema.table_id).await?; + self.sink + .table_copied(table_schema.table_id) + .await + .map_err(PipelineError::Sink)?; } - self.source.commit_transaction().await?; + self.source + .commit_transaction() + .await + .map_err(PipelineError::Source)?; Ok(()) } - async fn copy_cdc_events(&mut self, last_lsn: PgLsn) -> Result<(), PipelineError> { + async fn copy_cdc_events( + &mut self, + last_lsn: PgLsn, + ) -> Result<(), PipelineError> { let mut last_lsn: u64 = last_lsn.into(); last_lsn += 1; - let cdc_events = self.source.get_cdc_stream(last_lsn.into()).await?; + let cdc_events = self + .source + .get_cdc_stream(last_lsn.into()) + .await + .map_err(PipelineError::Source)?; pin!(cdc_events); while let Some(cdc_event) = cdc_events.next().await { - let cdc_event = cdc_event.map_err(SourceError::CdcStream)?; + let cdc_event = cdc_event.map_err(CommonSourceError::CdcStream)?; let send_status_update = if let CdcEvent::KeepAliveRequested { reply } = cdc_event { reply } else { false }; - let last_lsn = self.sink.write_cdc_event(cdc_event).await?; + let last_lsn = self + .sink + .write_cdc_event(cdc_event) + .await + .map_err(PipelineError::Sink)?; if send_status_update { cdc_events .as_mut() .send_status_update(last_lsn) .await - .map_err(|e| PipelineError::SourceError(SourceError::StatusUpdate(e)))?; + .map_err(CommonSourceError::StatusUpdate)?; } } Ok(()) } - pub async fn start(&mut self) -> Result<(), PipelineError> { - let resumption_state = self.sink.get_resumption_state().await?; + pub async fn start(&mut self) -> Result<(), PipelineError> { + let resumption_state = self + .sink + .get_resumption_state() + .await + .map_err(PipelineError::Sink)?; match self.action { PipelineAction::TableCopiesOnly => { self.copy_table_schemas().await?; diff --git a/pg_replicate/src/pipeline/mod.rs b/pg_replicate/src/pipeline/mod.rs index 6ca972b..f8e401e 100644 --- a/pg_replicate/src/pipeline/mod.rs +++ b/pg_replicate/src/pipeline/mod.rs @@ -1,12 +1,12 @@ use std::collections::HashSet; +use sinks::SinkError; +use sources::SourceError; use thiserror::Error; use tokio_postgres::types::PgLsn; use crate::table::TableId; -use self::{sinks::SinkError, sources::SourceError}; - pub mod batching; pub mod data_pipeline; pub mod sinks; @@ -19,16 +19,19 @@ pub enum PipelineAction { Both, } +pub struct PipelineResumptionState { + pub copied_tables: HashSet, + pub last_lsn: PgLsn, +} + #[derive(Debug, Error)] -pub enum PipelineError { +pub enum PipelineError { #[error("source error: {0}")] - SourceError(#[from] SourceError), + Source(#[source] SrcErr), #[error("sink error: {0}")] - SinkError(#[from] SinkError), -} + Sink(#[source] SnkErr), -pub struct PipelineResumptionState { - pub copied_tables: HashSet, - pub last_lsn: PgLsn, + #[error("source error: {0}")] + CommonSource(#[from] sources::CommonSourceError), } diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs index dae47b8..4fc373c 100644 --- a/pg_replicate/src/pipeline/sinks/bigquery.rs +++ b/pg_replicate/src/pipeline/sinks/bigquery.rs @@ -33,6 +33,8 @@ pub enum BigQuerySinkError { CommitWithoutBegin, } +impl SinkError for BigQuerySinkError {} + pub struct BigQueryBatchSink { client: BigQueryClient, dataset_id: String, @@ -84,7 +86,8 @@ impl BigQueryBatchSink { #[async_trait] impl BatchSink for BigQueryBatchSink { - async fn get_resumption_state(&mut self) -> Result { + type Error = BigQuerySinkError; + async fn get_resumption_state(&mut self) -> Result { info!("getting resumption state from 
bigquery"); let copied_table_column_schemas = [ColumnSchema { name: "table_id".to_string(), @@ -140,7 +143,7 @@ impl BatchSink for BigQueryBatchSink { async fn write_table_schemas( &mut self, table_schemas: HashMap, - ) -> Result<(), SinkError> { + ) -> Result<(), Self::Error> { for table_schema in table_schemas.values() { let table_name = Self::table_name_in_bq(&table_schema.table_name); self.client @@ -161,7 +164,7 @@ impl BatchSink for BigQueryBatchSink { &mut self, mut table_rows: Vec, table_id: TableId, - ) -> Result<(), SinkError> { + ) -> Result<(), Self::Error> { let table_schema = self.get_table_schema(table_id)?; let table_name = Self::table_name_in_bq(&table_schema.table_name); let table_descriptor = table_schema.into(); @@ -177,7 +180,7 @@ impl BatchSink for BigQueryBatchSink { Ok(()) } - async fn write_cdc_events(&mut self, events: Vec) -> Result { + async fn write_cdc_events(&mut self, events: Vec) -> Result { let mut table_name_to_table_rows = HashMap::new(); let mut new_last_lsn = PgLsn::from(0); let mut final_lsn: Option = None; @@ -243,14 +246,14 @@ impl BatchSink for BigQueryBatchSink { Ok(committed_lsn) } - async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError> { + async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error> { self.client .insert_into_copied_tables(&self.dataset_id, table_id) .await?; Ok(()) } - async fn truncate_table(&mut self, _table_id: TableId) -> Result<(), SinkError> { + async fn truncate_table(&mut self, _table_id: TableId) -> Result<(), Self::Error> { Ok(()) } } diff --git a/pg_replicate/src/pipeline/sinks/duckdb/executor.rs b/pg_replicate/src/pipeline/sinks/duckdb/executor.rs index 6927bd7..b7eff30 100644 --- a/pg_replicate/src/pipeline/sinks/duckdb/executor.rs +++ b/pg_replicate/src/pipeline/sinks/duckdb/executor.rs @@ -1,14 +1,14 @@ use std::collections::HashMap; use thiserror::Error; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::{error::SendError, Receiver, Sender}; use tokio_postgres::types::{PgLsn, Type}; use tracing::error; use crate::{ clients::duckdb::DuckDbClient, conversions::{cdc_event::CdcEvent, table_row::TableRow}, - pipeline::PipelineResumptionState, + pipeline::{sinks::SinkError, PipelineResumptionState}, table::{ColumnSchema, TableId, TableName, TableSchema}, }; @@ -46,8 +46,16 @@ pub enum DuckDbExecutorError { #[error("commit message without begin message")] CommitWithoutBegin, + + #[error("no response received")] + NoResponseReceived, + + #[error("failed to send duckdb request")] + SendError(#[from] SendError), } +impl SinkError for DuckDbExecutorError {} + pub(super) struct DuckDbExecutor { pub(super) client: DuckDbClient, pub(super) req_receiver: Receiver, diff --git a/pg_replicate/src/pipeline/sinks/duckdb/mod.rs b/pg_replicate/src/pipeline/sinks/duckdb/mod.rs index a6cd74e..45dabe2 100644 --- a/pg_replicate/src/pipeline/sinks/duckdb/mod.rs +++ b/pg_replicate/src/pipeline/sinks/duckdb/mod.rs @@ -1,7 +1,7 @@ pub use executor::{DuckDbExecutorError, DuckDbRequest}; pub use sink::DuckDbSink; -use super::{Sink, SinkError}; +use super::Sink; mod executor; mod sink; diff --git a/pg_replicate/src/pipeline/sinks/duckdb/sink.rs b/pg_replicate/src/pipeline/sinks/duckdb/sink.rs index 86b7ad2..e5a114f 100644 --- a/pg_replicate/src/pipeline/sinks/duckdb/sink.rs +++ b/pg_replicate/src/pipeline/sinks/duckdb/sink.rs @@ -13,8 +13,8 @@ use crate::{ }; use super::{ - executor::{DuckDbExecutor, DuckDbResponse}, - DuckDbRequest, Sink, SinkError, + executor::{DuckDbExecutor, 
DuckDbExecutorError, DuckDbResponse}, + DuckDbRequest, Sink, }; pub struct DuckDbSink { req_sender: Sender, @@ -84,19 +84,24 @@ impl DuckDbSink { }) } - pub async fn execute(&mut self, req: DuckDbRequest) -> Result { + pub async fn execute( + &mut self, + req: DuckDbRequest, + ) -> Result { self.req_sender.send(req).await?; if let Some(res) = self.res_receiver.recv().await { Ok(res) } else { - Err(SinkError::NoResponseReceived) + Err(DuckDbExecutorError::NoResponseReceived) } } } #[async_trait] impl Sink for DuckDbSink { - async fn get_resumption_state(&mut self) -> Result { + type Error = DuckDbExecutorError; + + async fn get_resumption_state(&mut self) -> Result { let req = DuckDbRequest::GetResumptionState; match self.execute(req).await? { DuckDbResponse::ResumptionState(res) => { @@ -110,7 +115,7 @@ impl Sink for DuckDbSink { async fn write_table_schemas( &mut self, table_schemas: HashMap, - ) -> Result<(), SinkError> { + ) -> Result<(), Self::Error> { let req = DuckDbRequest::CreateTables(table_schemas); match self.execute(req).await? { DuckDbResponse::CreateTablesResponse(res) => { @@ -122,7 +127,11 @@ impl Sink for DuckDbSink { Ok(()) } - async fn write_table_row(&mut self, row: TableRow, table_id: TableId) -> Result<(), SinkError> { + async fn write_table_row( + &mut self, + row: TableRow, + table_id: TableId, + ) -> Result<(), Self::Error> { let req = DuckDbRequest::InsertRow(row, table_id); match self.execute(req).await? { DuckDbResponse::InsertRowResponse(res) => { @@ -133,7 +142,7 @@ impl Sink for DuckDbSink { Ok(()) } - async fn write_cdc_event(&mut self, event: CdcEvent) -> Result { + async fn write_cdc_event(&mut self, event: CdcEvent) -> Result { let req = DuckDbRequest::HandleCdcEvent(event); let last_lsn = match self.execute(req).await? { DuckDbResponse::HandleCdcEventResponse(res) => res?, @@ -142,7 +151,7 @@ impl Sink for DuckDbSink { Ok(last_lsn) } - async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError> { + async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error> { let req = DuckDbRequest::TableCopied(table_id); match self.execute(req).await? { DuckDbResponse::TableCopiedResponse(res) => { @@ -153,7 +162,7 @@ impl Sink for DuckDbSink { Ok(()) } - async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError> { + async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error> { let req = DuckDbRequest::TruncateTable(table_id); match self.execute(req).await? 
{ DuckDbResponse::TruncateTableResponse(res) => { diff --git a/pg_replicate/src/pipeline/sinks/mod.rs b/pg_replicate/src/pipeline/sinks/mod.rs index 6c2502a..5ae7968 100644 --- a/pg_replicate/src/pipeline/sinks/mod.rs +++ b/pg_replicate/src/pipeline/sinks/mod.rs @@ -1,16 +1,8 @@ use std::collections::HashMap; use async_trait::async_trait; -#[cfg(feature = "bigquery")] -use gcp_bigquery_client::error::BQError; -use thiserror::Error; -#[cfg(feature = "duckdb")] -use tokio::sync::mpsc::error::SendError; use tokio_postgres::types::PgLsn; -#[cfg(feature = "bigquery")] -use bigquery::BigQuerySinkError; - use crate::{ conversions::{cdc_event::CdcEvent, table_row::TableRow}, table::{TableId, TableSchema}, @@ -18,9 +10,6 @@ use crate::{ use super::PipelineResumptionState; -#[cfg(feature = "duckdb")] -use self::duckdb::{DuckDbExecutorError, DuckDbRequest}; - #[cfg(feature = "bigquery")] pub mod bigquery; #[cfg(feature = "duckdb")] @@ -28,55 +17,40 @@ pub mod duckdb; #[cfg(feature = "stdout")] pub mod stdout; -#[derive(Debug, Error)] -pub enum SinkError { - #[cfg(feature = "duckdb")] - #[error("failed to send duckdb request")] - SendError(#[from] SendError), - - #[cfg(feature = "duckdb")] - #[error("duckdb executor error: {0}")] - DuckDbExecutor(#[from] DuckDbExecutorError), - - #[error("no response received")] - NoResponseReceived, - - #[cfg(feature = "bigquery")] - #[error("bigquery sink error: {0}")] - BigQuerySink(#[from] BigQuerySinkError), - - //TODO: remove and use the one wrapped inside BigQuerySinkError - #[cfg(feature = "bigquery")] - #[error("bigquery error: {0}")] - BigQuery(#[from] BQError), -} +pub trait SinkError: std::error::Error + Send + Sync + 'static {} #[async_trait] pub trait Sink { - async fn get_resumption_state(&mut self) -> Result; + type Error: SinkError; + async fn get_resumption_state(&mut self) -> Result; async fn write_table_schemas( &mut self, table_schemas: HashMap, - ) -> Result<(), SinkError>; - async fn write_table_row(&mut self, row: TableRow, table_id: TableId) -> Result<(), SinkError>; - async fn write_cdc_event(&mut self, event: CdcEvent) -> Result; - async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError>; - async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError>; + ) -> Result<(), Self::Error>; + async fn write_table_row( + &mut self, + row: TableRow, + table_id: TableId, + ) -> Result<(), Self::Error>; + async fn write_cdc_event(&mut self, event: CdcEvent) -> Result; + async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error>; + async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error>; } #[async_trait] pub trait BatchSink { - async fn get_resumption_state(&mut self) -> Result; + type Error: SinkError; + async fn get_resumption_state(&mut self) -> Result; async fn write_table_schemas( &mut self, table_schemas: HashMap, - ) -> Result<(), SinkError>; + ) -> Result<(), Self::Error>; async fn write_table_rows( &mut self, rows: Vec, table_id: TableId, - ) -> Result<(), SinkError>; - async fn write_cdc_events(&mut self, events: Vec) -> Result; - async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError>; - async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError>; + ) -> Result<(), Self::Error>; + async fn write_cdc_events(&mut self, events: Vec) -> Result; + async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error>; + async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error>; } diff --git 
a/pg_replicate/src/pipeline/sinks/stdout.rs b/pg_replicate/src/pipeline/sinks/stdout.rs index 31056c9..3d0945c 100644 --- a/pg_replicate/src/pipeline/sinks/stdout.rs +++ b/pg_replicate/src/pipeline/sinks/stdout.rs @@ -1,4 +1,7 @@ -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + convert::Infallible, +}; use async_trait::async_trait; use tokio_postgres::types::PgLsn; @@ -14,9 +17,12 @@ use super::{Sink, SinkError}; pub struct StdoutSink; +impl SinkError for Infallible {} + #[async_trait] impl Sink for StdoutSink { - async fn get_resumption_state(&mut self) -> Result { + type Error = Infallible; + async fn get_resumption_state(&mut self) -> Result { Ok(PipelineResumptionState { copied_tables: HashSet::new(), last_lsn: PgLsn::from(0), @@ -26,7 +32,7 @@ impl Sink for StdoutSink { async fn write_table_schemas( &mut self, table_schemas: HashMap, - ) -> Result<(), SinkError> { + ) -> Result<(), Self::Error> { info!("{table_schemas:?}"); Ok(()) } @@ -35,22 +41,22 @@ impl Sink for StdoutSink { &mut self, row: TableRow, _table_id: TableId, - ) -> Result<(), SinkError> { + ) -> Result<(), Self::Error> { info!("{row:?}"); Ok(()) } - async fn write_cdc_event(&mut self, event: CdcEvent) -> Result { + async fn write_cdc_event(&mut self, event: CdcEvent) -> Result { info!("{event:?}"); Ok(PgLsn::from(0)) } - async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError> { + async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error> { info!("table {table_id} copied"); Ok(()) } - async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError> { + async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error> { info!("table {table_id} truncated"); Ok(()) } diff --git a/pg_replicate/src/pipeline/sources/mod.rs b/pg_replicate/src/pipeline/sources/mod.rs index 1746ad2..037bc22 100644 --- a/pg_replicate/src/pipeline/sources/mod.rs +++ b/pg_replicate/src/pipeline/sources/mod.rs @@ -13,8 +13,10 @@ use self::postgres::{ pub mod postgres; +pub trait SourceError: std::error::Error + Send + Sync + 'static {} + #[derive(Debug, Error)] -pub enum SourceError { +pub enum CommonSourceError { #[error("source error: {0}")] Postgres(#[from] PostgresSourceError), @@ -28,17 +30,21 @@ pub enum SourceError { StatusUpdate(#[from] StatusUpdateError), } +impl SourceError for CommonSourceError {} + #[async_trait] pub trait Source { + type Error: SourceError; + fn get_table_schemas(&self) -> &HashMap; async fn get_table_copy_stream( &self, table_name: &TableName, column_schemas: &[ColumnSchema], - ) -> Result; + ) -> Result; - async fn commit_transaction(&self) -> Result<(), SourceError>; + async fn commit_transaction(&self) -> Result<(), Self::Error>; - async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result; + async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result; } diff --git a/pg_replicate/src/pipeline/sources/postgres.rs b/pg_replicate/src/pipeline/sources/postgres.rs index 987047c..d640d58 100644 --- a/pg_replicate/src/pipeline/sources/postgres.rs +++ b/pg_replicate/src/pipeline/sources/postgres.rs @@ -41,6 +41,8 @@ pub enum PostgresSourceError { MissingSlotName, } +impl SourceError for PostgresSourceError {} + pub struct PostgresSource { replication_client: ReplicationClient, table_schemas: HashMap, @@ -108,6 +110,8 @@ impl PostgresSource { #[async_trait] impl Source for PostgresSource { + type Error = PostgresSourceError; + fn get_table_schemas(&self) -> &HashMap { &self.table_schemas } @@ -116,7 +120,7 @@ impl 
Source for PostgresSource { &self, table_name: &TableName, column_schemas: &[ColumnSchema], - ) -> Result { + ) -> Result { info!("starting table copy stream for table {table_name}"); let stream = self @@ -131,7 +135,7 @@ impl Source for PostgresSource { }) } - async fn commit_transaction(&self) -> Result<(), SourceError> { + async fn commit_transaction(&self) -> Result<(), Self::Error> { self.replication_client .commit_txn() .await @@ -139,7 +143,7 @@ impl Source for PostgresSource { Ok(()) } - async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result { + async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result { info!("starting cdc stream at lsn {start_lsn}"); let publication = self .publication() From 09f8be83bec9e28fe785e0d25f8b79b451279a2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Fri, 30 Aug 2024 21:21:47 +1200 Subject: [PATCH 2/5] add infallible error variants for sinks and sources those have to be separate, so we can't have both impls for std's infallible --- pg_replicate/src/pipeline/sinks/mod.rs | 6 ++++++ pg_replicate/src/pipeline/sinks/stdout.rs | 11 +++-------- pg_replicate/src/pipeline/sources/mod.rs | 5 +++++ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/pg_replicate/src/pipeline/sinks/mod.rs b/pg_replicate/src/pipeline/sinks/mod.rs index 5ae7968..bd7b03c 100644 --- a/pg_replicate/src/pipeline/sinks/mod.rs +++ b/pg_replicate/src/pipeline/sinks/mod.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use thiserror::Error; use tokio_postgres::types::PgLsn; use crate::{ @@ -19,6 +20,11 @@ pub mod stdout; pub trait SinkError: std::error::Error + Send + Sync + 'static {} +#[derive(Debug, Error)] +#[error("unreachable")] +pub enum InfallibleSinkError {} +impl SinkError for InfallibleSinkError {} + #[async_trait] pub trait Sink { type Error: SinkError; diff --git a/pg_replicate/src/pipeline/sinks/stdout.rs b/pg_replicate/src/pipeline/sinks/stdout.rs index 3d0945c..d1aab23 100644 --- a/pg_replicate/src/pipeline/sinks/stdout.rs +++ b/pg_replicate/src/pipeline/sinks/stdout.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - convert::Infallible, -}; +use std::collections::{HashMap, HashSet}; use async_trait::async_trait; use tokio_postgres::types::PgLsn; @@ -13,15 +10,13 @@ use crate::{ table::{TableId, TableSchema}, }; -use super::{Sink, SinkError}; +use super::{Sink, SinkError, InfallibleSinkError}; pub struct StdoutSink; -impl SinkError for Infallible {} - #[async_trait] impl Sink for StdoutSink { - type Error = Infallible; + type Error = InfallibleSinkError; async fn get_resumption_state(&mut self) -> Result { Ok(PipelineResumptionState { copied_tables: HashSet::new(), diff --git a/pg_replicate/src/pipeline/sources/mod.rs b/pg_replicate/src/pipeline/sources/mod.rs index 037bc22..4fbd2b9 100644 --- a/pg_replicate/src/pipeline/sources/mod.rs +++ b/pg_replicate/src/pipeline/sources/mod.rs @@ -15,6 +15,11 @@ pub mod postgres; pub trait SourceError: std::error::Error + Send + Sync + 'static {} +#[derive(Debug, Error)] +#[error("unreachable")] +pub enum InfallibleSourceError {} +impl SourceError for InfallibleSourceError {} + #[derive(Debug, Error)] pub enum CommonSourceError { #[error("source error: {0}")] From a4bcd326de5ccb43c2f4615704d16062f0c82af0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Sat, 31 Aug 2024 18:50:37 +1200 Subject: [PATCH 3/5] allow write_table_rows to handle errored rows itself --- .../src/pipeline/batching/data_pipeline.rs | 9 ++------- 
pg_replicate/src/pipeline/sinks/bigquery.rs | 17 +++++++++++------ pg_replicate/src/pipeline/sinks/mod.rs | 4 ++-- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs index 9036460..f3c2636 100644 --- a/pg_replicate/src/pipeline/batching/data_pipeline.rs +++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, time::Instant}; use futures::StreamExt; use tokio::pin; use tokio_postgres::types::PgLsn; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; use crate::{ conversions::cdc_event::{CdcEvent, CdcEventConversionError}, @@ -84,13 +84,8 @@ impl BatchDataPipeline { while let Some(batch) = batch_timeout_stream.next().await { info!("got {} table copy events in a batch", batch.len()); - //TODO: Avoid a vec copy - let mut rows = Vec::with_capacity(batch.len()); - for row in batch { - rows.push(row.map_err(CommonSourceError::TableCopyStream)?); - } self.sink - .write_table_rows(rows, table_schema.table_id) + .write_table_rows(batch, table_schema.table_id) .await .map_err(PipelineError::Sink)?; } diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs index 4fc373c..8c65385 100644 --- a/pg_replicate/src/pipeline/sinks/bigquery.rs +++ b/pg_replicate/src/pipeline/sinks/bigquery.rs @@ -9,7 +9,7 @@ use tracing::info; use crate::{ clients::bigquery::BigQueryClient, conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell}, - pipeline::PipelineResumptionState, + pipeline::{PipelineResumptionState, sources::postgres::TableCopyStreamError}, table::{ColumnSchema, TableId, TableName, TableSchema}, }; @@ -162,19 +162,24 @@ impl BatchSink for BigQueryBatchSink { async fn write_table_rows( &mut self, - mut table_rows: Vec, + mut table_rows: Vec>, table_id: TableId, ) -> Result<(), Self::Error> { let table_schema = self.get_table_schema(table_id)?; let table_name = Self::table_name_in_bq(&table_schema.table_name); let table_descriptor = table_schema.into(); - for table_row in &mut table_rows { - table_row.values.push(Cell::String("UPSERT".to_string())); - } + let new_rows = table_rows + .drain(..) 
+ .filter_map(|row| row.ok()) + .map(|mut row| { + row.values.push(Cell::String("UPSERT".to_string())); + row + }) + .collect::>(); self.client - .stream_rows(&self.dataset_id, table_name, &table_descriptor, &table_rows) + .stream_rows(&self.dataset_id, table_name, &table_descriptor, &new_rows) .await?; Ok(()) diff --git a/pg_replicate/src/pipeline/sinks/mod.rs b/pg_replicate/src/pipeline/sinks/mod.rs index bd7b03c..8ef876e 100644 --- a/pg_replicate/src/pipeline/sinks/mod.rs +++ b/pg_replicate/src/pipeline/sinks/mod.rs @@ -9,7 +9,7 @@ use crate::{ table::{TableId, TableSchema}, }; -use super::PipelineResumptionState; +use super::{sources::postgres::TableCopyStreamError, PipelineResumptionState}; #[cfg(feature = "bigquery")] pub mod bigquery; @@ -53,7 +53,7 @@ pub trait BatchSink { ) -> Result<(), Self::Error>; async fn write_table_rows( &mut self, - rows: Vec, + rows: Vec>, table_id: TableId, ) -> Result<(), Self::Error>; async fn write_cdc_events(&mut self, events: Vec) -> Result; From 15fecf63992136e44924e6ce71eea3b21347804d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Fri, 29 Nov 2024 20:34:21 +1300 Subject: [PATCH 4/5] format --- pg_replicate/src/pipeline/batching/data_pipeline.rs | 2 +- pg_replicate/src/pipeline/sinks/bigquery.rs | 2 +- pg_replicate/src/pipeline/sinks/stdout.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs index f3c2636..69626fe 100644 --- a/pg_replicate/src/pipeline/batching/data_pipeline.rs +++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, time::Instant}; use futures::StreamExt; use tokio::pin; use tokio_postgres::types::PgLsn; -use tracing::{debug, info, warn}; +use tracing::{debug, info}; use crate::{ conversions::cdc_event::{CdcEvent, CdcEventConversionError}, diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs index 8c65385..f598254 100644 --- a/pg_replicate/src/pipeline/sinks/bigquery.rs +++ b/pg_replicate/src/pipeline/sinks/bigquery.rs @@ -9,7 +9,7 @@ use tracing::info; use crate::{ clients::bigquery::BigQueryClient, conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell}, - pipeline::{PipelineResumptionState, sources::postgres::TableCopyStreamError}, + pipeline::{sources::postgres::TableCopyStreamError, PipelineResumptionState}, table::{ColumnSchema, TableId, TableName, TableSchema}, }; diff --git a/pg_replicate/src/pipeline/sinks/stdout.rs b/pg_replicate/src/pipeline/sinks/stdout.rs index d1aab23..a8d3579 100644 --- a/pg_replicate/src/pipeline/sinks/stdout.rs +++ b/pg_replicate/src/pipeline/sinks/stdout.rs @@ -10,7 +10,7 @@ use crate::{ table::{TableId, TableSchema}, }; -use super::{Sink, SinkError, InfallibleSinkError}; +use super::{InfallibleSinkError, Sink, SinkError}; pub struct StdoutSink; From 388e84dc1e516decc4e72416813b1f843262a80f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Fri, 29 Nov 2024 21:00:30 +1300 Subject: [PATCH 5/5] revert unrelated write_table_rows change --- .../src/pipeline/batching/data_pipeline.rs | 7 ++++++- pg_replicate/src/pipeline/sinks/bigquery.rs | 17 ++++++----------- pg_replicate/src/pipeline/sinks/mod.rs | 4 ++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs index 69626fe..9036460 100644 --- 
a/pg_replicate/src/pipeline/batching/data_pipeline.rs +++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs @@ -84,8 +84,13 @@ impl BatchDataPipeline { while let Some(batch) = batch_timeout_stream.next().await { info!("got {} table copy events in a batch", batch.len()); + //TODO: Avoid a vec copy + let mut rows = Vec::with_capacity(batch.len()); + for row in batch { + rows.push(row.map_err(CommonSourceError::TableCopyStream)?); + } self.sink - .write_table_rows(batch, table_schema.table_id) + .write_table_rows(rows, table_schema.table_id) .await .map_err(PipelineError::Sink)?; } diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs index f598254..4fc373c 100644 --- a/pg_replicate/src/pipeline/sinks/bigquery.rs +++ b/pg_replicate/src/pipeline/sinks/bigquery.rs @@ -9,7 +9,7 @@ use tracing::info; use crate::{ clients::bigquery::BigQueryClient, conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell}, - pipeline::{sources::postgres::TableCopyStreamError, PipelineResumptionState}, + pipeline::PipelineResumptionState, table::{ColumnSchema, TableId, TableName, TableSchema}, }; @@ -162,24 +162,19 @@ impl BatchSink for BigQueryBatchSink { async fn write_table_rows( &mut self, - mut table_rows: Vec>, + mut table_rows: Vec, table_id: TableId, ) -> Result<(), Self::Error> { let table_schema = self.get_table_schema(table_id)?; let table_name = Self::table_name_in_bq(&table_schema.table_name); let table_descriptor = table_schema.into(); - let new_rows = table_rows - .drain(..) - .filter_map(|row| row.ok()) - .map(|mut row| { - row.values.push(Cell::String("UPSERT".to_string())); - row - }) - .collect::>(); + for table_row in &mut table_rows { + table_row.values.push(Cell::String("UPSERT".to_string())); + } self.client - .stream_rows(&self.dataset_id, table_name, &table_descriptor, &new_rows) + .stream_rows(&self.dataset_id, table_name, &table_descriptor, &table_rows) .await?; Ok(()) diff --git a/pg_replicate/src/pipeline/sinks/mod.rs b/pg_replicate/src/pipeline/sinks/mod.rs index 8ef876e..bd7b03c 100644 --- a/pg_replicate/src/pipeline/sinks/mod.rs +++ b/pg_replicate/src/pipeline/sinks/mod.rs @@ -9,7 +9,7 @@ use crate::{ table::{TableId, TableSchema}, }; -use super::{sources::postgres::TableCopyStreamError, PipelineResumptionState}; +use super::PipelineResumptionState; #[cfg(feature = "bigquery")] pub mod bigquery; @@ -53,7 +53,7 @@ pub trait BatchSink { ) -> Result<(), Self::Error>; async fn write_table_rows( &mut self, - rows: Vec>, + rows: Vec, table_id: TableId, ) -> Result<(), Self::Error>; async fn write_cdc_events(&mut self, events: Vec) -> Result;
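
Reviewer note, illustration only: the sketch below shows the error-plumbing pattern this series introduces — marker traits for sink and source errors, a per-implementation associated `Error` type, an uninhabited "infallible" error enum, and a `PipelineError` generic over both the source error (`SrcErr`) and the sink error (`SnkErr`). Everything named `MemSink`, `MemSinkError`, `write_row`, and `copy_one_row` is invented for this example and is not part of pg_replicate; the real `Sink`/`Source` traits are async and carry more methods (`write_table_schemas`, `write_cdc_event`, `get_resumption_state`, and so on). The only assumed dependency is `thiserror`.

```rust
use std::collections::HashMap;
use thiserror::Error;

// Marker traits mirroring `SourceError` / `SinkError` from this series:
// any error a source or sink reports must be Send + Sync + 'static.
pub trait SourceError: std::error::Error + Send + Sync + 'static {}
pub trait SinkError: std::error::Error + Send + Sync + 'static {}

// Uninhabited errors for implementations that cannot fail, in the spirit of
// `InfallibleSinkError` / `InfallibleSourceError` from patch 2 (dedicated
// empty enums rather than `std::convert::Infallible`).
#[derive(Debug, Error)]
#[error("unreachable")]
pub enum InfallibleSourceError {}
impl SourceError for InfallibleSourceError {}

#[derive(Debug, Error)]
#[error("unreachable")]
pub enum InfallibleSinkError {}
impl SinkError for InfallibleSinkError {}

// Pipeline error generic over both sides, as `PipelineError<SrcErr, SnkErr>`
// in patch 1 (the `CommonSource` variant is elided here).
#[derive(Debug, Error)]
pub enum PipelineError<SrcErr: SourceError, SnkErr: SinkError> {
    #[error("source error: {0}")]
    Source(#[source] SrcErr),

    #[error("sink error: {0}")]
    Sink(#[source] SnkErr),
}

// Hypothetical, heavily trimmed stand-in for the real async `Sink` trait;
// only enough surface to show the associated-error wiring.
pub trait Sink {
    type Error: SinkError;
    fn write_row(&mut self, table_id: u32, row: String) -> Result<(), Self::Error>;
}

#[derive(Debug, Error)]
pub enum MemSinkError {
    #[error("unknown table id: {0}")]
    UnknownTable(u32),
}
impl SinkError for MemSinkError {}

// Toy in-memory sink with its own concrete error type.
#[derive(Default)]
pub struct MemSink {
    tables: HashMap<u32, Vec<String>>,
}

impl Sink for MemSink {
    type Error = MemSinkError;

    fn write_row(&mut self, table_id: u32, row: String) -> Result<(), Self::Error> {
        self.tables
            .get_mut(&table_id)
            .map(|rows| rows.push(row))
            .ok_or(MemSinkError::UnknownTable(table_id))
    }
}

// Callers lift the sink's concrete error into the generic pipeline error at
// the call site, matching the `.map_err(PipelineError::Sink)` calls added
// throughout patch 1.
fn copy_one_row<S: Sink>(
    sink: &mut S,
    table_id: u32,
    row: String,
) -> Result<(), PipelineError<InfallibleSourceError, S::Error>> {
    sink.write_row(table_id, row).map_err(PipelineError::Sink)
}

fn main() {
    let mut sink = MemSink::default();
    // No table 7 exists, so this surfaces as `PipelineError::Sink(UnknownTable(7))`.
    if let Err(e) = copy_one_row(&mut sink, 7, "hello".to_string()) {
        eprintln!("{e}");
    }
}
```

As in the series itself, errors are converted explicitly with `.map_err(PipelineError::Source)` / `.map_err(PipelineError::Sink)` rather than via `#[from]` conversions into a single catch-all enum; that keeps `PipelineError` free of feature-gated, sink-specific variants and lets each sink or source declare exactly the failure modes it can produce.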