diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs
index 8dd7ea4..9036460 100644
--- a/pg_replicate/src/pipeline/batching/data_pipeline.rs
+++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs
@@ -10,7 +10,7 @@ use crate::{
     pipeline::{
         batching::stream::BatchTimeoutStream,
         sinks::BatchSink,
-        sources::{postgres::CdcStreamError, Source, SourceError},
+        sources::{postgres::CdcStreamError, CommonSourceError, Source},
         PipelineAction, PipelineError,
     },
     table::TableId,
@@ -35,18 +35,24 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
         }
     }
 
-    async fn copy_table_schemas(&mut self) -> Result<(), PipelineError> {
+    async fn copy_table_schemas(&mut self) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
         let table_schemas = self.source.get_table_schemas();
         let table_schemas = table_schemas.clone();
 
         if !table_schemas.is_empty() {
-            self.sink.write_table_schemas(table_schemas).await?;
+            self.sink
+                .write_table_schemas(table_schemas)
+                .await
+                .map_err(PipelineError::Sink)?;
         }
 
         Ok(())
     }
 
-    async fn copy_tables(&mut self, copied_tables: &HashSet<TableId>) -> Result<(), PipelineError> {
+    async fn copy_tables(
+        &mut self,
+        copied_tables: &HashSet<TableId>,
+    ) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
         let start = Instant::now();
 
         let table_schemas = self.source.get_table_schemas();
@@ -60,12 +66,16 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
                 continue;
             }
 
-            self.sink.truncate_table(table_schema.table_id).await?;
+            self.sink
+                .truncate_table(table_schema.table_id)
+                .await
+                .map_err(PipelineError::Sink)?;
 
             let table_rows = self
                 .source
                 .get_table_copy_stream(&table_schema.table_name, &table_schema.column_schemas)
-                .await?;
+                .await
+                .map_err(PipelineError::Source)?;
 
             let batch_timeout_stream =
                 BatchTimeoutStream::new(table_rows, self.batch_config.clone());
@@ -77,16 +87,23 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
                 //TODO: Avoid a vec copy
                 let mut rows = Vec::with_capacity(batch.len());
                 for row in batch {
-                    rows.push(row.map_err(SourceError::TableCopyStream)?);
+                    rows.push(row.map_err(CommonSourceError::TableCopyStream)?);
                 }
                 self.sink
                     .write_table_rows(rows, table_schema.table_id)
-                    .await?;
+                    .await
+                    .map_err(PipelineError::Sink)?;
             }
 
-            self.sink.table_copied(table_schema.table_id).await?;
+            self.sink
+                .table_copied(table_schema.table_id)
+                .await
+                .map_err(PipelineError::Sink)?;
         }
-        self.source.commit_transaction().await?;
+        self.source
+            .commit_transaction()
+            .await
+            .map_err(PipelineError::Source)?;
 
         let end = Instant::now();
         let seconds = (end - start).as_secs();
@@ -95,10 +112,17 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
         Ok(())
     }
 
-    async fn copy_cdc_events(&mut self, last_lsn: PgLsn) -> Result<(), PipelineError> {
+    async fn copy_cdc_events(
+        &mut self,
+        last_lsn: PgLsn,
+    ) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
         let mut last_lsn: u64 = last_lsn.into();
         last_lsn += 1;
-        let cdc_events = self.source.get_cdc_stream(last_lsn.into()).await?;
+        let cdc_events = self
+            .source
+            .get_cdc_stream(last_lsn.into())
+            .await
+            .map_err(PipelineError::Source)?;
 
         pin!(cdc_events);
 
@@ -117,13 +141,17 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
             {
                 continue;
             }
-            let event = event.map_err(SourceError::CdcStream)?;
+            let event = event.map_err(CommonSourceError::CdcStream)?;
             if let CdcEvent::KeepAliveRequested { reply } = event {
                 send_status_update = reply;
             };
             events.push(event);
         }
-        let last_lsn = self.sink.write_cdc_events(events).await?;
+        let last_lsn = self
+            .sink
+            .write_cdc_events(events)
+            .await
+            .map_err(PipelineError::Sink)?;
         if send_status_update {
             info!("sending status update with lsn: {last_lsn}");
             let inner = unsafe {
@@ -136,15 +164,20 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
                 .as_mut()
                 .send_status_update(last_lsn)
                 .await
-                .map_err(|e| PipelineError::SourceError(SourceError::StatusUpdate(e)))?;
+                .map_err(CommonSourceError::StatusUpdate)?;
             }
         }
 
         Ok(())
     }
 
-    pub async fn start(&mut self) -> Result<(), PipelineError> {
-        let resumption_state = self.sink.get_resumption_state().await?;
+    pub async fn start(&mut self) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
+        let resumption_state = self
+            .sink
+            .get_resumption_state()
+            .await
+            .map_err(PipelineError::Sink)?;
+
         match self.action {
             PipelineAction::TableCopiesOnly => {
                 self.copy_table_schemas().await?;
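Reviewer note: with `PipelineError` now generic over the source and sink error types (see pipeline/mod.rs further down), the old blanket `#[from]` conversions are gone, so every fallible source/sink call above wraps its error explicitly via `map_err(PipelineError::Source)` or `map_err(PipelineError::Sink)`. A minimal standalone sketch of why a bare `?` stops working once the enum carries two independent error parameters; `WrapperError`, `fallible`, and `caller` are illustrative names, not from this patch:

    // With two generic error slots, blanket `From` impls for both would
    // overlap, so no `From<E>` exists and `?` cannot convert on its own.
    enum WrapperError<E> {
        Inner(E), // plays the role of PipelineError::Sink
    }

    fn fallible() -> Result<(), std::io::Error> {
        Ok(())
    }

    fn caller() -> Result<(), WrapperError<std::io::Error>> {
        // `fallible()?` alone would not compile; naming the variant in
        // `map_err` spells out the conversion, as the pipeline code does.
        fallible().map_err(WrapperError::Inner)?;
        Ok(())
    }
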
diff --git a/pg_replicate/src/pipeline/data_pipeline.rs b/pg_replicate/src/pipeline/data_pipeline.rs
index b504f20..f1b483c 100644
--- a/pg_replicate/src/pipeline/data_pipeline.rs
+++ b/pg_replicate/src/pipeline/data_pipeline.rs
@@ -4,7 +4,9 @@ use futures::StreamExt;
 use tokio::pin;
 use tokio_postgres::types::PgLsn;
 
-use crate::{conversions::cdc_event::CdcEvent, pipeline::sources::SourceError, table::TableId};
+use crate::{
+    conversions::cdc_event::CdcEvent, pipeline::sources::CommonSourceError, table::TableId,
+};
 
 use super::{sinks::Sink, sources::Source, PipelineAction, PipelineError};
 
@@ -23,18 +25,24 @@ impl<Src: Source, Snk: Sink> DataPipeline<Src, Snk> {
         }
     }
 
-    async fn copy_table_schemas(&mut self) -> Result<(), PipelineError> {
+    async fn copy_table_schemas(&mut self) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
        let table_schemas = self.source.get_table_schemas();
        let table_schemas = table_schemas.clone();
 
         if !table_schemas.is_empty() {
-            self.sink.write_table_schemas(table_schemas).await?;
+            self.sink
+                .write_table_schemas(table_schemas)
+                .await
+                .map_err(PipelineError::Sink)?;
         }
 
         Ok(())
     }
 
-    async fn copy_tables(&mut self, copied_tables: &HashSet<TableId>) -> Result<(), PipelineError> {
+    async fn copy_tables(
+        &mut self,
+        copied_tables: &HashSet<TableId>,
+    ) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
         let table_schemas = self.source.get_table_schemas();
 
         let mut keys: Vec<TableId> = table_schemas.keys().copied().collect();
@@ -46,58 +54,84 @@ impl<Src: Source, Snk: Sink> DataPipeline<Src, Snk> {
                 continue;
             }
 
-            self.sink.truncate_table(table_schema.table_id).await?;
+            self.sink
+                .truncate_table(table_schema.table_id)
+                .await
+                .map_err(PipelineError::Sink)?;
 
             let table_rows = self
                 .source
                 .get_table_copy_stream(&table_schema.table_name, &table_schema.column_schemas)
-                .await?;
+                .await
+                .map_err(PipelineError::Source)?;
 
             pin!(table_rows);
 
             while let Some(row) = table_rows.next().await {
-                let row = row.map_err(SourceError::TableCopyStream)?;
+                let row = row.map_err(CommonSourceError::TableCopyStream)?;
                 self.sink
                     .write_table_row(row, table_schema.table_id)
-                    .await?;
+                    .await
+                    .map_err(PipelineError::Sink)?;
             }
 
-            self.sink.table_copied(table_schema.table_id).await?;
+            self.sink
+                .table_copied(table_schema.table_id)
+                .await
+                .map_err(PipelineError::Sink)?;
         }
-        self.source.commit_transaction().await?;
+        self.source
+            .commit_transaction()
+            .await
+            .map_err(PipelineError::Source)?;
 
         Ok(())
     }
 
-    async fn copy_cdc_events(&mut self, last_lsn: PgLsn) -> Result<(), PipelineError> {
+    async fn copy_cdc_events(
+        &mut self,
+        last_lsn: PgLsn,
+    ) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
         let mut last_lsn: u64 = last_lsn.into();
         last_lsn += 1;
-        let cdc_events = self.source.get_cdc_stream(last_lsn.into()).await?;
+        let cdc_events = self
+            .source
+            .get_cdc_stream(last_lsn.into())
+            .await
+            .map_err(PipelineError::Source)?;
 
         pin!(cdc_events);
 
         while let Some(cdc_event) = cdc_events.next().await {
-            let cdc_event = cdc_event.map_err(SourceError::CdcStream)?;
+            let cdc_event = cdc_event.map_err(CommonSourceError::CdcStream)?;
             let send_status_update = if let CdcEvent::KeepAliveRequested { reply } = cdc_event {
                 reply
             } else {
                 false
             };
-            let last_lsn = self.sink.write_cdc_event(cdc_event).await?;
+            let last_lsn = self
+                .sink
+                .write_cdc_event(cdc_event)
+                .await
+                .map_err(PipelineError::Sink)?;
 
             if send_status_update {
                 cdc_events
                     .as_mut()
                     .send_status_update(last_lsn)
                     .await
-                    .map_err(|e| PipelineError::SourceError(SourceError::StatusUpdate(e)))?;
+                    .map_err(CommonSourceError::StatusUpdate)?;
             }
         }
 
         Ok(())
     }
 
-    pub async fn start(&mut self) -> Result<(), PipelineError> {
-        let resumption_state = self.sink.get_resumption_state().await?;
+    pub async fn start(&mut self) -> Result<(), PipelineError<Src::Error, Snk::Error>> {
+        let resumption_state = self
+            .sink
+            .get_resumption_state()
+            .await
+            .map_err(PipelineError::Sink)?;
 
         match self.action {
             PipelineAction::TableCopiesOnly => {
                 self.copy_table_schemas().await?;
diff --git a/pg_replicate/src/pipeline/mod.rs b/pg_replicate/src/pipeline/mod.rs
index 6ca972b..f8e401e 100644
--- a/pg_replicate/src/pipeline/mod.rs
+++ b/pg_replicate/src/pipeline/mod.rs
@@ -1,12 +1,12 @@
 use std::collections::HashSet;
 
+use sinks::SinkError;
+use sources::SourceError;
 use thiserror::Error;
 use tokio_postgres::types::PgLsn;
 
 use crate::table::TableId;
 
-use self::{sinks::SinkError, sources::SourceError};
-
 pub mod batching;
 pub mod data_pipeline;
 pub mod sinks;
@@ -19,16 +19,19 @@ pub enum PipelineAction {
     Both,
 }
 
+pub struct PipelineResumptionState {
+    pub copied_tables: HashSet<TableId>,
+    pub last_lsn: PgLsn,
+}
+
 #[derive(Debug, Error)]
-pub enum PipelineError {
+pub enum PipelineError<SrcErr: SourceError, SnkErr: SinkError> {
     #[error("source error: {0}")]
-    SourceError(#[from] SourceError),
+    Source(#[source] SrcErr),
 
     #[error("sink error: {0}")]
-    SinkError(#[from] SinkError),
-}
+    Sink(#[source] SnkErr),
 
-pub struct PipelineResumptionState {
-    pub copied_tables: HashSet<TableId>,
-    pub last_lsn: PgLsn,
+    #[error("source error: {0}")]
+    CommonSource(#[from] sources::CommonSourceError),
 }
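The enum itself: `Source` and `Sink` carry the pipeline's concrete error types, and only `CommonSource` keeps `#[from]`, so shared stream errors can still be raised with a bare `?` inside the pipeline while the generic variants need the explicit `map_err` seen above. A hedged consumer sketch, assuming only the crate and module paths introduced in this patch:

    use pg_replicate::pipeline::{sinks::SinkError, sources::SourceError, PipelineError};

    // Generic over whichever source/sink pair the pipeline was built with;
    // the bounds mirror those on the PipelineError declaration above.
    fn report<SrcErr: SourceError, SnkErr: SinkError>(err: PipelineError<SrcErr, SnkErr>) {
        match err {
            PipelineError::Source(e) => eprintln!("source failed: {e}"),
            PipelineError::Sink(e) => eprintln!("sink failed: {e}"),
            PipelineError::CommonSource(e) => eprintln!("replication stream failed: {e}"),
        }
    }
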
diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs
index dae47b8..4fc373c 100644
--- a/pg_replicate/src/pipeline/sinks/bigquery.rs
+++ b/pg_replicate/src/pipeline/sinks/bigquery.rs
@@ -33,6 +33,8 @@ pub enum BigQuerySinkError {
     CommitWithoutBegin,
 }
 
+impl SinkError for BigQuerySinkError {}
+
 pub struct BigQueryBatchSink {
     client: BigQueryClient,
     dataset_id: String,
@@ -84,7 +86,8 @@ impl BigQueryBatchSink {
 
 #[async_trait]
 impl BatchSink for BigQueryBatchSink {
-    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, SinkError> {
+    type Error = BigQuerySinkError;
+    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error> {
         info!("getting resumption state from bigquery");
         let copied_table_column_schemas = [ColumnSchema {
             name: "table_id".to_string(),
@@ -140,7 +143,7 @@
     async fn write_table_schemas(
         &mut self,
         table_schemas: HashMap<TableId, TableSchema>,
-    ) -> Result<(), SinkError> {
+    ) -> Result<(), Self::Error> {
         for table_schema in table_schemas.values() {
             let table_name = Self::table_name_in_bq(&table_schema.table_name);
             self.client
@@ -161,7 +164,7 @@
         &mut self,
         mut table_rows: Vec<TableRow>,
         table_id: TableId,
-    ) -> Result<(), SinkError> {
+    ) -> Result<(), Self::Error> {
         let table_schema = self.get_table_schema(table_id)?;
         let table_name = Self::table_name_in_bq(&table_schema.table_name);
         let table_descriptor = table_schema.into();
@@ -177,7 +180,7 @@
         Ok(())
     }
 
-    async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, SinkError> {
+    async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, Self::Error> {
         let mut table_name_to_table_rows = HashMap::new();
         let mut new_last_lsn = PgLsn::from(0);
         let mut final_lsn: Option<PgLsn> = None;
@@ -243,14 +246,14 @@
         Ok(committed_lsn)
     }
 
-    async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError> {
+    async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error> {
         self.client
             .insert_into_copied_tables(&self.dataset_id, table_id)
             .await?;
         Ok(())
     }
 
-    async fn truncate_table(&mut self, _table_id: TableId) -> Result<(), SinkError> {
+    async fn truncate_table(&mut self, _table_id: TableId) -> Result<(), Self::Error> {
         Ok(())
     }
 }
diff --git a/pg_replicate/src/pipeline/sinks/duckdb/executor.rs b/pg_replicate/src/pipeline/sinks/duckdb/executor.rs
index 6927bd7..b7eff30 100644
--- a/pg_replicate/src/pipeline/sinks/duckdb/executor.rs
+++ b/pg_replicate/src/pipeline/sinks/duckdb/executor.rs
@@ -1,14 +1,14 @@
 use std::collections::HashMap;
 
 use thiserror::Error;
-use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::sync::mpsc::{error::SendError, Receiver, Sender};
 use tokio_postgres::types::{PgLsn, Type};
 use tracing::error;
 
 use crate::{
     clients::duckdb::DuckDbClient,
     conversions::{cdc_event::CdcEvent, table_row::TableRow},
-    pipeline::PipelineResumptionState,
+    pipeline::{sinks::SinkError, PipelineResumptionState},
     table::{ColumnSchema, TableId, TableName, TableSchema},
 };
 
@@ -46,8 +46,16 @@ pub enum DuckDbExecutorError {
 
     #[error("commit message without begin message")]
     CommitWithoutBegin,
+
+    #[error("no response received")]
+    NoResponseReceived,
+
+    #[error("failed to send duckdb request")]
+    SendError(#[from] SendError<DuckDbRequest>),
 }
 
+impl SinkError for DuckDbExecutorError {}
+
 pub(super) struct DuckDbExecutor {
     pub(super) client: DuckDbClient,
     pub(super) req_receiver: Receiver<DuckDbRequest>,
diff --git a/pg_replicate/src/pipeline/sinks/duckdb/mod.rs b/pg_replicate/src/pipeline/sinks/duckdb/mod.rs
index a6cd74e..45dabe2 100644
--- a/pg_replicate/src/pipeline/sinks/duckdb/mod.rs
+++ b/pg_replicate/src/pipeline/sinks/duckdb/mod.rs
@@ -1,7 +1,7 @@
 pub use executor::{DuckDbExecutorError, DuckDbRequest};
 pub use sink::DuckDbSink;
 
-use super::{Sink, SinkError};
+use super::Sink;
 
 mod executor;
 mod sink;
diff --git a/pg_replicate/src/pipeline/sinks/duckdb/sink.rs b/pg_replicate/src/pipeline/sinks/duckdb/sink.rs
index 86b7ad2..e5a114f 100644
--- a/pg_replicate/src/pipeline/sinks/duckdb/sink.rs
+++ b/pg_replicate/src/pipeline/sinks/duckdb/sink.rs
@@ -13,8 +13,8 @@ use crate::{
 };
 
 use super::{
-    executor::{DuckDbExecutor, DuckDbResponse},
-    DuckDbRequest, Sink, SinkError,
+    executor::{DuckDbExecutor, DuckDbExecutorError, DuckDbResponse},
+    DuckDbRequest, Sink,
 };
 
 pub struct DuckDbSink {
     req_sender: Sender<DuckDbRequest>,
@@ -84,19 +84,24 @@
         })
     }
 
-    pub async fn execute(&mut self, req: DuckDbRequest) -> Result<DuckDbResponse, SinkError> {
+    pub async fn execute(
+        &mut self,
+        req: DuckDbRequest,
+    ) -> Result<DuckDbResponse, DuckDbExecutorError> {
         self.req_sender.send(req).await?;
         if let Some(res) = self.res_receiver.recv().await {
             Ok(res)
         } else {
-            Err(SinkError::NoResponseReceived)
+            Err(DuckDbExecutorError::NoResponseReceived)
         }
     }
 }
 
 #[async_trait]
 impl Sink for DuckDbSink {
-    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, SinkError> {
+    type Error = DuckDbExecutorError;
+
+    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error> {
         let req = DuckDbRequest::GetResumptionState;
         match self.execute(req).await? {
             DuckDbResponse::ResumptionState(res) => {
@@ -110,7 +115,7 @@ impl Sink for DuckDbSink {
     async fn write_table_schemas(
         &mut self,
         table_schemas: HashMap<TableId, TableSchema>,
-    ) -> Result<(), SinkError> {
+    ) -> Result<(), Self::Error> {
         let req = DuckDbRequest::CreateTables(table_schemas);
         match self.execute(req).await? {
             DuckDbResponse::CreateTablesResponse(res) => {
@@ -122,7 +127,11 @@
         Ok(())
     }
 
-    async fn write_table_row(&mut self, row: TableRow, table_id: TableId) -> Result<(), SinkError> {
+    async fn write_table_row(
+        &mut self,
+        row: TableRow,
+        table_id: TableId,
+    ) -> Result<(), Self::Error> {
         let req = DuckDbRequest::InsertRow(row, table_id);
         match self.execute(req).await? {
             DuckDbResponse::InsertRowResponse(res) => {
@@ -133,7 +142,7 @@
         Ok(())
     }
 
-    async fn write_cdc_event(&mut self, event: CdcEvent) -> Result<PgLsn, SinkError> {
+    async fn write_cdc_event(&mut self, event: CdcEvent) -> Result<PgLsn, Self::Error> {
         let req = DuckDbRequest::HandleCdcEvent(event);
         let last_lsn = match self.execute(req).await? {
             DuckDbResponse::HandleCdcEventResponse(res) => res?,
@@ -142,7 +151,7 @@
         Ok(last_lsn)
     }
 
-    async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError> {
+    async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error> {
         let req = DuckDbRequest::TableCopied(table_id);
         match self.execute(req).await? {
             DuckDbResponse::TableCopiedResponse(res) => {
@@ -153,7 +162,7 @@
         Ok(())
     }
 
-    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError> {
+    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error> {
         let req = DuckDbRequest::TruncateTable(table_id);
         match self.execute(req).await? {
             DuckDbResponse::TruncateTableResponse(res) => {
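Folding `NoResponseReceived` and the channel `SendError` into `DuckDbExecutorError` keeps the request/response plumbing self-contained: `execute` now returns the executor's own error type, and the retained `#[from]` means the `?` on `send` still converts. The same shape in isolation, a toy sketch with `u32` standing in for the request and response types (none of these names are from the patch):

    use tokio::sync::mpsc::{self, error::SendError};

    #[derive(Debug)]
    enum ExecError {
        NoResponse,
        Send(SendError<u32>),
    }

    // The equivalent of the #[from] attribute, written out by hand.
    impl From<SendError<u32>> for ExecError {
        fn from(e: SendError<u32>) -> Self {
            ExecError::Send(e)
        }
    }

    async fn execute(
        tx: &mpsc::Sender<u32>,
        rx: &mut mpsc::Receiver<u32>,
        req: u32,
    ) -> Result<u32, ExecError> {
        tx.send(req).await?; // SendError converts via the From impl
        rx.recv().await.ok_or(ExecError::NoResponse)
    }
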
diff --git a/pg_replicate/src/pipeline/sinks/mod.rs b/pg_replicate/src/pipeline/sinks/mod.rs
index 6c2502a..bd7b03c 100644
--- a/pg_replicate/src/pipeline/sinks/mod.rs
+++ b/pg_replicate/src/pipeline/sinks/mod.rs
@@ -1,16 +1,9 @@
 use std::collections::HashMap;
 
 use async_trait::async_trait;
-#[cfg(feature = "bigquery")]
-use gcp_bigquery_client::error::BQError;
 use thiserror::Error;
-#[cfg(feature = "duckdb")]
-use tokio::sync::mpsc::error::SendError;
 use tokio_postgres::types::PgLsn;
 
-#[cfg(feature = "bigquery")]
-use bigquery::BigQuerySinkError;
-
 use crate::{
     conversions::{cdc_event::CdcEvent, table_row::TableRow},
     table::{TableId, TableSchema},
@@ -18,9 +11,6 @@ use crate::{
 
 use super::PipelineResumptionState;
 
-#[cfg(feature = "duckdb")]
-use self::duckdb::{DuckDbExecutorError, DuckDbRequest};
-
 #[cfg(feature = "bigquery")]
 pub mod bigquery;
 #[cfg(feature = "duckdb")]
@@ -28,55 +18,45 @@ pub mod duckdb;
 #[cfg(feature = "stdout")]
 pub mod stdout;
 
-#[derive(Debug, Error)]
-pub enum SinkError {
-    #[cfg(feature = "duckdb")]
-    #[error("failed to send duckdb request")]
-    SendError(#[from] SendError<DuckDbRequest>),
-
-    #[cfg(feature = "duckdb")]
-    #[error("duckdb executor error: {0}")]
-    DuckDbExecutor(#[from] DuckDbExecutorError),
-
-    #[error("no response received")]
-    NoResponseReceived,
+pub trait SinkError: std::error::Error + Send + Sync + 'static {}
 
-    #[cfg(feature = "bigquery")]
-    #[error("bigquery sink error: {0}")]
-    BigQuerySink(#[from] BigQuerySinkError),
-
-    //TODO: remove and use the one wrapped inside BigQuerySinkError
-    #[cfg(feature = "bigquery")]
-    #[error("bigquery error: {0}")]
-    BigQuery(#[from] BQError),
-}
+#[derive(Debug, Error)]
+#[error("unreachable")]
+pub enum InfallibleSinkError {}
+impl SinkError for InfallibleSinkError {}
 
 #[async_trait]
 pub trait Sink {
-    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, SinkError>;
+    type Error: SinkError;
+    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error>;
     async fn write_table_schemas(
         &mut self,
         table_schemas: HashMap<TableId, TableSchema>,
-    ) -> Result<(), SinkError>;
-    async fn write_table_row(&mut self, row: TableRow, table_id: TableId) -> Result<(), SinkError>;
-    async fn write_cdc_event(&mut self, event: CdcEvent) -> Result<PgLsn, SinkError>;
-    async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError>;
-    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError>;
+    ) -> Result<(), Self::Error>;
+    async fn write_table_row(
+        &mut self,
+        row: TableRow,
+        table_id: TableId,
+    ) -> Result<(), Self::Error>;
+    async fn write_cdc_event(&mut self, event: CdcEvent) -> Result<PgLsn, Self::Error>;
+    async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error>;
+    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error>;
 }
 
 #[async_trait]
 pub trait BatchSink {
-    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, SinkError>;
+    type Error: SinkError;
+    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error>;
     async fn write_table_schemas(
         &mut self,
         table_schemas: HashMap<TableId, TableSchema>,
-    ) -> Result<(), SinkError>;
+    ) -> Result<(), Self::Error>;
     async fn write_table_rows(
         &mut self,
         rows: Vec<TableRow>,
         table_id: TableId,
-    ) -> Result<(), SinkError>;
-    async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, SinkError>;
-    async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError>;
-    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError>;
+    ) -> Result<(), Self::Error>;
+    async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, Self::Error>;
+    async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error>;
+    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error>;
 }
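`SinkError` is a bare marker trait, and the empty `InfallibleSinkError` enum is the usual stand-in for the unstable never type: it has no variants, so a `Result<_, InfallibleSinkError>` can only ever be `Ok`. A sink outside this crate would opt in roughly like this (`MySinkError` is a hypothetical name):

    use thiserror::Error;

    use pg_replicate::pipeline::sinks::SinkError;

    #[derive(Debug, Error)]
    pub enum MySinkError {
        #[error("connection lost")]
        ConnectionLost,
    }

    // The marker impl is all that is required; the pipeline wraps this
    // type in PipelineError::Sink through the associated Error type.
    impl SinkError for MySinkError {}
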
diff --git a/pg_replicate/src/pipeline/sinks/stdout.rs b/pg_replicate/src/pipeline/sinks/stdout.rs
index 31056c9..a8d3579 100644
--- a/pg_replicate/src/pipeline/sinks/stdout.rs
+++ b/pg_replicate/src/pipeline/sinks/stdout.rs
@@ -10,13 +10,14 @@ use crate::{
     table::{TableId, TableSchema},
 };
 
-use super::{Sink, SinkError};
+use super::{InfallibleSinkError, Sink, SinkError};
 
 pub struct StdoutSink;
 
 #[async_trait]
 impl Sink for StdoutSink {
-    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, SinkError> {
+    type Error = InfallibleSinkError;
+    async fn get_resumption_state(&mut self) -> Result<PipelineResumptionState, Self::Error> {
         Ok(PipelineResumptionState {
             copied_tables: HashSet::new(),
             last_lsn: PgLsn::from(0),
@@ -26,7 +27,7 @@ impl Sink for StdoutSink {
     async fn write_table_schemas(
         &mut self,
         table_schemas: HashMap<TableId, TableSchema>,
-    ) -> Result<(), SinkError> {
+    ) -> Result<(), Self::Error> {
         info!("{table_schemas:?}");
         Ok(())
     }
@@ -35,22 +36,22 @@
         &mut self,
         row: TableRow,
         _table_id: TableId,
-    ) -> Result<(), SinkError> {
+    ) -> Result<(), Self::Error> {
         info!("{row:?}");
         Ok(())
     }
 
-    async fn write_cdc_event(&mut self, event: CdcEvent) -> Result<PgLsn, SinkError> {
+    async fn write_cdc_event(&mut self, event: CdcEvent) -> Result<PgLsn, Self::Error> {
         info!("{event:?}");
         Ok(PgLsn::from(0))
     }
 
-    async fn table_copied(&mut self, table_id: TableId) -> Result<(), SinkError> {
+    async fn table_copied(&mut self, table_id: TableId) -> Result<(), Self::Error> {
         info!("table {table_id} copied");
         Ok(())
     }
 
-    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), SinkError> {
+    async fn truncate_table(&mut self, table_id: TableId) -> Result<(), Self::Error> {
         info!("table {table_id} truncated");
         Ok(())
     }
diff --git a/pg_replicate/src/pipeline/sources/mod.rs b/pg_replicate/src/pipeline/sources/mod.rs
index 1746ad2..4fbd2b9 100644
--- a/pg_replicate/src/pipeline/sources/mod.rs
+++ b/pg_replicate/src/pipeline/sources/mod.rs
@@ -13,8 +13,15 @@ use self::postgres::{
 
 pub mod postgres;
 
+pub trait SourceError: std::error::Error + Send + Sync + 'static {}
+
+#[derive(Debug, Error)]
+#[error("unreachable")]
+pub enum InfallibleSourceError {}
+impl SourceError for InfallibleSourceError {}
+
 #[derive(Debug, Error)]
-pub enum SourceError {
+pub enum CommonSourceError {
     #[error("source error: {0}")]
     Postgres(#[from] PostgresSourceError),
 
@@ -28,17 +35,21 @@ pub enum CommonSourceError {
     StatusUpdate(#[from] StatusUpdateError),
 }
 
+impl SourceError for CommonSourceError {}
+
 #[async_trait]
 pub trait Source {
+    type Error: SourceError;
+
     fn get_table_schemas(&self) -> &HashMap<TableId, TableSchema>;
 
     async fn get_table_copy_stream(
         &self,
         table_name: &TableName,
         column_schemas: &[ColumnSchema],
-    ) -> Result<TableCopyStream, SourceError>;
+    ) -> Result<TableCopyStream, Self::Error>;
 
-    async fn commit_transaction(&self) -> Result<(), SourceError>;
+    async fn commit_transaction(&self) -> Result<(), Self::Error>;
 
-    async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result<CdcStream, SourceError>;
+    async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result<CdcStream, Self::Error>;
 }
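The source side mirrors the sinks: `SourceError` is the marker trait, `CommonSourceError` keeps the stream-level failures (`TableCopyStream`, `CdcStream`, `StatusUpdate`) that the pipeline itself raises, and each `Source` impl names its own error through the associated type. A hypothetical non-Postgres source would plug in the same way:

    use thiserror::Error;

    use pg_replicate::pipeline::sources::SourceError;

    #[derive(Debug, Error)]
    pub enum MySourceError {
        #[error("fetch failed: {0}")]
        Fetch(String),
    }

    impl SourceError for MySourceError {}
    // An `impl Source for MySource` would then declare
    // `type Error = MySourceError;`, as postgres.rs does below.
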
diff --git a/pg_replicate/src/pipeline/sources/postgres.rs b/pg_replicate/src/pipeline/sources/postgres.rs
index 987047c..d640d58 100644
--- a/pg_replicate/src/pipeline/sources/postgres.rs
+++ b/pg_replicate/src/pipeline/sources/postgres.rs
@@ -41,6 +41,8 @@ pub enum PostgresSourceError {
     MissingSlotName,
 }
 
+impl SourceError for PostgresSourceError {}
+
 pub struct PostgresSource {
     replication_client: ReplicationClient,
     table_schemas: HashMap<TableId, TableSchema>,
@@ -108,6 +110,8 @@ impl PostgresSource {
 
 #[async_trait]
 impl Source for PostgresSource {
+    type Error = PostgresSourceError;
+
     fn get_table_schemas(&self) -> &HashMap<TableId, TableSchema> {
         &self.table_schemas
     }
@@ -116,7 +120,7 @@
         &self,
         table_name: &TableName,
         column_schemas: &[ColumnSchema],
-    ) -> Result<TableCopyStream, SourceError> {
+    ) -> Result<TableCopyStream, Self::Error> {
         info!("starting table copy stream for table {table_name}");
 
         let stream = self
@@ -131,7 +135,7 @@ impl Source for PostgresSource {
         })
     }
 
-    async fn commit_transaction(&self) -> Result<(), SourceError> {
+    async fn commit_transaction(&self) -> Result<(), Self::Error> {
         self.replication_client
             .commit_txn()
             .await
@@ -139,7 +143,7 @@ impl Source for PostgresSource {
         Ok(())
     }
 
-    async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result<CdcStream, SourceError> {
+    async fn get_cdc_stream(&self, start_lsn: PgLsn) -> Result<CdcStream, Self::Error> {
         info!("starting cdc stream at lsn {start_lsn}");
 
         let publication = self
            .publication()