From cd266bd46d38332166bae97a3439d9eac474b3dc Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 14 Nov 2024 23:27:57 +0800 Subject: [PATCH 1/3] chore: add tool-versions --- .tool-versions | 1 + 1 file changed, 1 insertion(+) create mode 100644 .tool-versions diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..5b6ed22 --- /dev/null +++ b/.tool-versions @@ -0,0 +1 @@ +rust 1.82.0 From 24575a528f1506fbcaaa43594293a9b1fd1204c4 Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 14 Nov 2024 23:28:08 +0800 Subject: [PATCH 2/3] docs: add contributing section --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index b36285e..7c98da3 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,12 @@ See the [open issues](https://github.com/imor/pg_replicate/issues) for a full li Distributed under the Apache-2.0 License. See `LICENSE` for more information. +## Contributing + +If using the [asdf](https://asdf-vm.com/) package manager, run `asdf install`. + +Otherwise, ensure that dependencies match `.tool-versions.` + ## Docker To create the docker image for `replicator` run `docker build -f ./replicator/Dockerfile .` from the root of the repo. Similarly, to create the docker image for `api` run `docker build -f ./api/Dockerfile .`. From 4ed3d6e5772a735cb82c99ea85b8f490a8fc2eed Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 14 Nov 2024 23:48:35 +0800 Subject: [PATCH 3/3] feat: inital clickhouse client impl --- Cargo.toml | 1 + pg_replicate/Cargo.toml | 2 + pg_replicate/src/clients/clickhouse.rs | 89 ++++++++++++++++++++++++++ pg_replicate/src/clients/mod.rs | 2 + 4 files changed, 94 insertions(+) create mode 100644 pg_replicate/src/clients/clickhouse.rs diff --git a/Cargo.toml b/Cargo.toml index 8499657..423f81e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ tracing-subscriber = { version = "0.3", default-features = false } utoipa = { version = "4.2.3", default-features = false } utoipa-swagger-ui = { version = "7.1.0", default-features = false } uuid = { version = "1.10.0", default-features = false } +clickhouse = { version = "0.12.2", default-features = false} # [patch.crates-io] # gcp-bigquery-client = { path = "../gcp-bigquery-client" } diff --git a/pg_replicate/Cargo.toml b/pg_replicate/Cargo.toml index d8619eb..de8576a 100644 --- a/pg_replicate/Cargo.toml +++ b/pg_replicate/Cargo.toml @@ -23,6 +23,7 @@ bigdecimal = { workspace = true, features = ["std"] } bytes = { workspace = true } byteorder = { workspace = true } chrono = { workspace = true, features = ["serde"] } +clickhouse = { workspace = true, default-features = true, optional= true} duckdb = { workspace = true, optional = true } futures = { workspace = true } gcp-bigquery-client = { workspace = true, optional = true, features = [ @@ -59,6 +60,7 @@ tracing-subscriber = { workspace = true, default-features = true, features = [ [features] bigquery = ["dep:gcp-bigquery-client", "dep:prost"] +clickhouse = ["dep:clickhouse"] duckdb = ["dep:duckdb"] stdout = [] # When enabled converts unknown types to bytes diff --git a/pg_replicate/src/clients/clickhouse.rs b/pg_replicate/src/clients/clickhouse.rs new file mode 100644 index 0000000..e52b1c7 --- /dev/null +++ b/pg_replicate/src/clients/clickhouse.rs @@ -0,0 +1,89 @@ + +use clickhouse::Client; + +// use crate::{ +// conversions::{table_row::TableRow, Cell}, +// table::{ColumnSchema, TableId, TableName, TableSchema}, +// }; + + +pub struct ClickhouseClient { + client: Client, +} + +pub struct ClickhouseConfig { + url: String, + user: String, + pasword: String, + database: String +} + +impl ClickhouseClient { + pub fn create_client(config: &ClickhouseConfig) -> Result { + let client = Client::default() + // should include both protocol and port + .with_url(config.url) + .with_user(config.user) + .with_password(config.password) + .with_database(config.database); + Ok(ClickhouseClient { + client + }) + } + + // fn async_insert(client: &Client) -> + + + + pub fn create_table_if_missing( + &self, + table_name: &TableName, + column_schemas: &[ColumnSchema], + ) -> Result { + if self.table_exists(table_name)? { + Ok(false) + } else { + self.create_table(table_name, column_schemas)?; + Ok(true) + } + } + + + + pub fn create_table( + &self, + table_name: &TableName, + column_schemas: &[ColumnSchema], + ) -> Result<(), clickhouse::error::Error> { + let columns_spec = Self::create_columns_spec(column_schemas); + let query = format!( + "create table {}.{} {}", + table_name.schema, table_name.name, columns_spec + ); + self.conn.execute(&query, [])?; + Ok(()) + } + + pub fn table_exists(&self, table_name: &TableName) -> Result { + let query = + "select * from information_schema.tables where table_catalog = ? and table_schema = ? and table_name = ?;"; + let mut stmt = self.conn.prepare(query)?; + let exists = stmt.exists([&self.current_database, &table_name.schema, &table_name.name])?; + Ok(exists) + } + + pub fn insert_rows( + &self, + table_name: &TableName, + table_rows: &Vec, + ) -> Result<(), clickhouse::error::Error> { + let table_name = format!("{}.{}", table_name.schema, table_name.name); + let column_count = table_row.values.len(); + let query = Self::create_insert_row_query(&table_name, column_count); + let mut stmt = self.conn.prepare(&query)?; + stmt.execute(params_from_iter(table_row.values.iter()))?; + + Ok(()) + } + +} \ No newline at end of file diff --git a/pg_replicate/src/clients/mod.rs b/pg_replicate/src/clients/mod.rs index 115324a..ddfbc30 100644 --- a/pg_replicate/src/clients/mod.rs +++ b/pg_replicate/src/clients/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "clickhouse")] +pub mod clickhouse; #[cfg(feature = "bigquery")] pub mod bigquery; #[cfg(feature = "duckdb")]