Skip to content

Commit

Permalink
add --motherduck-db-name option to allow the user to pass db name
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 8, 2024
1 parent ac8b7f6 commit 5fd56d1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 12 deletions.
28 changes: 20 additions & 8 deletions pg_replicate/examples/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,24 @@ struct DbArgs {
}

#[derive(Debug, clap::Args)]
#[group(required = true, multiple = false)]
pub struct DuckDbOptions {
/// DuckDb file name
#[clap(long)]
duckdb_file: Option<String>,

/// motherduck access token
#[clap(flatten)]
motherduck: Option<MotherDuckOptions>,
}

#[derive(Debug, Args)]
struct MotherDuckOptions {
/// MotherDuck access token
#[clap(long)]
motherduck_access_token: Option<String>,
motherduck_access_token: String,

/// MotherDuck database name
#[clap(long)]
motherduck_db_name: String,
}

#[derive(Debug, Subcommand)]
Expand Down Expand Up @@ -122,12 +131,15 @@ async fn main_impl() -> Result<(), Box<dyn Error>> {
}
};

let duckdb_sink = match (
db_args.duckdb.duckdb_file,
db_args.duckdb.motherduck_access_token,
) {
let duckdb_sink = match (db_args.duckdb.duckdb_file, db_args.duckdb.motherduck) {
(Some(duckdb_file), None) => DuckDbSink::file(duckdb_file).await?,
(None, Some(access_token)) => DuckDbSink::mother_duck(&access_token).await?,
(None, Some(motherduck)) => {
DuckDbSink::mother_duck(
&motherduck.motherduck_access_token,
&motherduck.motherduck_db_name,
)
.await?
}
_ => {
unreachable!()
}
Expand Down
7 changes: 5 additions & 2 deletions pg_replicate/src/clients/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ impl DuckDbClient {
Ok(DuckDbClient { conn })
}

pub fn open_mother_duck(access_token: &str) -> Result<DuckDbClient, duckdb::Error> {
pub fn open_mother_duck(
access_token: &str,
db_name: &str,
) -> Result<DuckDbClient, duckdb::Error> {
let conf = Config::default()
.with("motherduck_token", access_token)?
.with("custom_user_agent", "pg_replicate")?;

let conn = Connection::open_with_flags("md:my_db", conf)?;
let conn = Connection::open_with_flags(format!("md:{db_name}"), conf)?;
Ok(DuckDbClient { conn })
}

Expand Down
7 changes: 5 additions & 2 deletions pg_replicate/src/pipeline/sinks/duckdb/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ impl DuckDbSink {
})
}

pub async fn mother_duck(access_token: &str) -> Result<DuckDbSink, duckdb::Error> {
pub async fn mother_duck(
access_token: &str,
db_name: &str,
) -> Result<DuckDbSink, duckdb::Error> {
let (req_sender, req_receiver) = channel(CHANNEL_SIZE);
let (res_sender, res_receiver) = channel(CHANNEL_SIZE);
let client = DuckDbClient::open_mother_duck(access_token)?;
let client = DuckDbClient::open_mother_duck(access_token, db_name)?;
let executor = DuckDbExecutor {
client,
req_receiver,
Expand Down

0 comments on commit 5fd56d1

Please sign in to comment.