diff --git a/cli/src/api_client.rs b/cli/src/api_client.rs index e024898..efa7866 100644 --- a/cli/src/api_client.rs +++ b/cli/src/api_client.rs @@ -34,6 +34,24 @@ pub enum SourceConfig { }, } +impl Display for SourceConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let SourceConfig::Postgres { + host, + port, + name, + username, + password, + slot_name, + publication, + } = self; + write!( + f, + "host: {host}, port: {port}, name: {name}, username: {username}, password: {password:?}, slot_name: {slot_name}, publication: {publication}", + ) + } +} + #[derive(serde::Serialize, serde::Deserialize)] pub enum SinkConfig { BigQuery { @@ -48,11 +66,28 @@ pub enum SinkConfig { }, } +impl Display for SinkConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let SinkConfig::BigQuery { + project_id, + dataset_id, + service_account_key, + } = self; + write!(f, "project_id: {project_id}, dataset_id: {dataset_id}, service_account_key: {service_account_key}") + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct PipelineConfig { pub config: BatchConfig, } +impl Display for PipelineConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "config: {}", self.config) + } +} + #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct BatchConfig { /// maximum batch size in number of events @@ -62,6 +97,16 @@ pub struct BatchConfig { pub max_fill_secs: u64, } +impl Display for BatchConfig { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "max_size: {}, max_fill_secs: {}", + self.max_size, self.max_fill_secs + ) + } +} + #[derive(Serialize)] pub struct CreateTenantRequest { pub name: String, @@ -137,6 +182,16 @@ pub struct SourceResponse { pub config: SourceConfig, } +impl Display for SourceResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "tenant_id: {}, id: {}, config: {}", + self.tenant_id, self.id, self.config + ) + } +} + #[derive(Serialize)] pub struct CreateSinkRequest { pub config: SinkConfig, @@ -165,6 +220,16 @@ pub struct SinkResponse { pub config: SinkConfig, } +impl Display for SinkResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "tenant_id: {}, id: {}, config: {}", + self.tenant_id, self.id, self.config + ) + } +} + #[derive(Serialize)] pub struct CreatePipelineRequest { pub source_id: i64, @@ -192,6 +257,16 @@ pub struct PipelineResponse { pub config: PipelineConfig, } +impl Display for PipelineResponse { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "tenant_id: {}, id: {}, source_id: {}, sink_id: {}, config: {}", + self.tenant_id, self.id, self.source_id, self.sink_id, self.config + ) + } +} + #[derive(Serialize)] pub struct UpdatePipelineRequest { pub source_id: i64, @@ -282,14 +357,20 @@ impl ApiClient { .await?) } - // pub async fn read_source(&self, tenant_id: i64, source_id: i64) -> reqwest::Response { - // self.client - // .get(&format!("{}/v1/sources/{source_id}", &self.address)) - // .header("tenant_id", tenant_id) - // .send() - // .await - // .expect("failed to execute request") - // } + pub async fn read_source( + &self, + tenant_id: i64, + source_id: i64, + ) -> Result { + Ok(self + .client + .get(&format!("{}/v1/sources/{source_id}", &self.address)) + .header("tenant_id", tenant_id) + .send() + .await? + .json() + .await?) + } // pub async fn update_source( // &self, @@ -340,14 +421,20 @@ impl ApiClient { .await?) } - // pub async fn read_sink(&self, tenant_id: i64, sink_id: i64) -> reqwest::Response { - // self.client - // .get(&format!("{}/v1/sinks/{sink_id}", &self.address)) - // .header("tenant_id", tenant_id) - // .send() - // .await - // .expect("failed to execute request") - // } + pub async fn read_sink( + &self, + tenant_id: i64, + sink_id: i64, + ) -> Result { + Ok(self + .client + .get(&format!("{}/v1/sinks/{sink_id}", &self.address)) + .header("tenant_id", tenant_id) + .send() + .await? + .json() + .await?) + } // pub async fn update_sink( // &self, @@ -398,14 +485,20 @@ impl ApiClient { .await?) } - // pub async fn read_pipeline(&self, tenant_id: i64, pipeline_id: i64) -> reqwest::Response { - // self.client - // .get(&format!("{}/v1/pipelines/{pipeline_id}", &self.address)) - // .header("tenant_id", tenant_id) - // .send() - // .await - // .expect("failed to execute request") - // } + pub async fn read_pipeline( + &self, + tenant_id: i64, + pipeline_id: i64, + ) -> Result { + Ok(self + .client + .get(&format!("{}/v1/pipelines/{pipeline_id}", &self.address)) + .header("tenant_id", tenant_id) + .send() + .await? + .json() + .await?) + } // pub async fn update_pipeline( // &self, diff --git a/cli/src/commands.rs b/cli/src/commands.rs index c849447..3371a9f 100644 --- a/cli/src/commands.rs +++ b/cli/src/commands.rs @@ -3,9 +3,9 @@ use thiserror::Error; use crate::{ api_client::ApiClient, - pipelines::create_pipeline, - sinks::create_sink, - sources::create_source, + pipelines::{create_pipeline, show_pipeline}, + sinks::{create_sink, show_sink}, + sources::{create_source, show_source}, tenants::{create_tenant, delete_tenant, list_tenants, show_tenant, update_tenant}, }; @@ -135,9 +135,30 @@ pub async fn execute_commands( println!("error reading tenant: {e:?}"); } }, - (Command::Show, SubCommand::Sources) => todo!(), - (Command::Show, SubCommand::Sinks) => todo!(), - (Command::Show, SubCommand::Pipelines) => todo!(), + (Command::Show, SubCommand::Sources) => match show_source(api_client, editor).await { + Ok(source) => { + println!("source: {source}") + } + Err(e) => { + println!("error reading source: {e:?}"); + } + }, + (Command::Show, SubCommand::Sinks) => match show_sink(api_client, editor).await { + Ok(sink) => { + println!("sink: {sink}") + } + Err(e) => { + println!("error reading sink: {e:?}"); + } + }, + (Command::Show, SubCommand::Pipelines) => match show_pipeline(api_client, editor).await { + Ok(pipeline) => { + println!("pipeline: {pipeline}") + } + Err(e) => { + println!("error reading pipeline: {e:?}"); + } + }, (Command::List, SubCommand::Tenants) => match list_tenants(api_client).await { Ok(tenants) => { println!("tenants: "); diff --git a/cli/src/pipelines.rs b/cli/src/pipelines.rs index 743b172..5c2ad92 100644 --- a/cli/src/pipelines.rs +++ b/cli/src/pipelines.rs @@ -3,8 +3,9 @@ use rustyline::DefaultEditor; use crate::{ api_client::{ ApiClient, BatchConfig, CreatePipelineRequest, CreatePipelineResponse, PipelineConfig, + PipelineResponse, }, - get_u64, get_usize, + get_id, get_u64, get_usize, sinks::get_sink_id, sources::get_source_id, tenants::get_tenant_id, @@ -33,9 +34,17 @@ pub async fn create_pipeline( Ok(pipeline) } -// pub fn get_pipeline_id(editor: &mut DefaultEditor) -> Result { -// get_id(editor, "enter pipeline id: ") -// } +pub async fn show_pipeline( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result { + let tenant_id = get_tenant_id(editor)?; + let pipeline_id = get_pipeline_id(editor)?; + + let pipeline = api_client.read_pipeline(tenant_id, pipeline_id).await?; + + Ok(pipeline) +} fn get_pipeline_config(editor: &mut DefaultEditor) -> Result { let max_size = get_usize(editor, "enter max batch size: ")?; @@ -46,3 +55,7 @@ fn get_pipeline_config(editor: &mut DefaultEditor) -> Result Result { + get_id(editor, "enter pipeline id: ") +} diff --git a/cli/src/sinks.rs b/cli/src/sinks.rs index 80871d3..95625ae 100644 --- a/cli/src/sinks.rs +++ b/cli/src/sinks.rs @@ -1,7 +1,7 @@ use rustyline::DefaultEditor; use crate::{ - api_client::{ApiClient, CreateSinkRequest, CreateSinkResponse, SinkConfig}, + api_client::{ApiClient, CreateSinkRequest, CreateSinkResponse, SinkConfig, SinkResponse}, get_id, get_string, tenants::get_tenant_id, CliError, @@ -20,6 +20,18 @@ pub async fn create_sink( Ok(sink) } +pub async fn show_sink( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result { + let tenant_id = get_tenant_id(editor)?; + let sink_id = get_sink_id(editor)?; + + let sink = api_client.read_sink(tenant_id, sink_id).await?; + + Ok(sink) +} + fn get_sink_config(editor: &mut DefaultEditor) -> Result { let project_id = get_string(editor, "enter project_id: ")?; let dataset_id = get_string(editor, "enter dataset_id: ")?; diff --git a/cli/src/sources.rs b/cli/src/sources.rs index 3242ff7..fa612a0 100644 --- a/cli/src/sources.rs +++ b/cli/src/sources.rs @@ -1,7 +1,9 @@ use rustyline::DefaultEditor; use crate::{ - api_client::{ApiClient, CreateSourceRequest, CreateSourceResponse, SourceConfig}, + api_client::{ + ApiClient, CreateSourceRequest, CreateSourceResponse, SourceConfig, SourceResponse, + }, get_id, get_string, tenants::get_tenant_id, CliError, @@ -20,6 +22,18 @@ pub async fn create_source( Ok(source) } +pub async fn show_source( + api_client: &ApiClient, + editor: &mut DefaultEditor, +) -> Result { + let tenant_id = get_tenant_id(editor)?; + let source_id = get_source_id(editor)?; + + let source = api_client.read_source(tenant_id, source_id).await?; + + Ok(source) +} + fn get_source_config(editor: &mut DefaultEditor) -> Result { let host = get_string(editor, "enter host: ")?; let port = get_string(editor, "enter port: ")?;