Skip to content

Commit

Permalink
read sources, sinks and pipelines in cli
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 26, 2024
1 parent d0658c8 commit cbd4e5a
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 36 deletions.
141 changes: 117 additions & 24 deletions cli/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ pub enum SourceConfig {
},
}

impl Display for SourceConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let SourceConfig::Postgres {
host,
port,
name,
username,
password,
slot_name,
publication,
} = self;
write!(
f,
"host: {host}, port: {port}, name: {name}, username: {username}, password: {password:?}, slot_name: {slot_name}, publication: {publication}",
)
}
}

#[derive(serde::Serialize, serde::Deserialize)]
pub enum SinkConfig {
BigQuery {
Expand All @@ -48,11 +66,28 @@ pub enum SinkConfig {
},
}

impl Display for SinkConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let SinkConfig::BigQuery {
project_id,
dataset_id,
service_account_key,
} = self;
write!(f, "project_id: {project_id}, dataset_id: {dataset_id}, service_account_key: {service_account_key}")
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct PipelineConfig {
pub config: BatchConfig,
}

impl Display for PipelineConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "config: {}", self.config)
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct BatchConfig {
/// maximum batch size in number of events
Expand All @@ -62,6 +97,16 @@ pub struct BatchConfig {
pub max_fill_secs: u64,
}

impl Display for BatchConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"max_size: {}, max_fill_secs: {}",
self.max_size, self.max_fill_secs
)
}
}

#[derive(Serialize)]
pub struct CreateTenantRequest {
pub name: String,
Expand Down Expand Up @@ -137,6 +182,16 @@ pub struct SourceResponse {
pub config: SourceConfig,
}

impl Display for SourceResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"tenant_id: {}, id: {}, config: {}",
self.tenant_id, self.id, self.config
)
}
}

#[derive(Serialize)]
pub struct CreateSinkRequest {
pub config: SinkConfig,
Expand Down Expand Up @@ -165,6 +220,16 @@ pub struct SinkResponse {
pub config: SinkConfig,
}

impl Display for SinkResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"tenant_id: {}, id: {}, config: {}",
self.tenant_id, self.id, self.config
)
}
}

#[derive(Serialize)]
pub struct CreatePipelineRequest {
pub source_id: i64,
Expand Down Expand Up @@ -192,6 +257,16 @@ pub struct PipelineResponse {
pub config: PipelineConfig,
}

impl Display for PipelineResponse {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"tenant_id: {}, id: {}, source_id: {}, sink_id: {}, config: {}",
self.tenant_id, self.id, self.source_id, self.sink_id, self.config
)
}
}

#[derive(Serialize)]
pub struct UpdatePipelineRequest {
pub source_id: i64,
Expand Down Expand Up @@ -282,14 +357,20 @@ impl ApiClient {
.await?)
}

// pub async fn read_source(&self, tenant_id: i64, source_id: i64) -> reqwest::Response {
// self.client
// .get(&format!("{}/v1/sources/{source_id}", &self.address))
// .header("tenant_id", tenant_id)
// .send()
// .await
// .expect("failed to execute request")
// }
pub async fn read_source(
&self,
tenant_id: i64,
source_id: i64,
) -> Result<SourceResponse, ApiClientError> {
Ok(self
.client
.get(&format!("{}/v1/sources/{source_id}", &self.address))
.header("tenant_id", tenant_id)
.send()
.await?
.json()
.await?)
}

// pub async fn update_source(
// &self,
Expand Down Expand Up @@ -340,14 +421,20 @@ impl ApiClient {
.await?)
}

// pub async fn read_sink(&self, tenant_id: i64, sink_id: i64) -> reqwest::Response {
// self.client
// .get(&format!("{}/v1/sinks/{sink_id}", &self.address))
// .header("tenant_id", tenant_id)
// .send()
// .await
// .expect("failed to execute request")
// }
pub async fn read_sink(
&self,
tenant_id: i64,
sink_id: i64,
) -> Result<SinkResponse, ApiClientError> {
Ok(self
.client
.get(&format!("{}/v1/sinks/{sink_id}", &self.address))
.header("tenant_id", tenant_id)
.send()
.await?
.json()
.await?)
}

// pub async fn update_sink(
// &self,
Expand Down Expand Up @@ -398,14 +485,20 @@ impl ApiClient {
.await?)
}

// pub async fn read_pipeline(&self, tenant_id: i64, pipeline_id: i64) -> reqwest::Response {
// self.client
// .get(&format!("{}/v1/pipelines/{pipeline_id}", &self.address))
// .header("tenant_id", tenant_id)
// .send()
// .await
// .expect("failed to execute request")
// }
pub async fn read_pipeline(
&self,
tenant_id: i64,
pipeline_id: i64,
) -> Result<PipelineResponse, ApiClientError> {
Ok(self
.client
.get(&format!("{}/v1/pipelines/{pipeline_id}", &self.address))
.header("tenant_id", tenant_id)
.send()
.await?
.json()
.await?)
}

// pub async fn update_pipeline(
// &self,
Expand Down
33 changes: 27 additions & 6 deletions cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use thiserror::Error;

use crate::{
api_client::ApiClient,
pipelines::create_pipeline,
sinks::create_sink,
sources::create_source,
pipelines::{create_pipeline, show_pipeline},
sinks::{create_sink, show_sink},
sources::{create_source, show_source},
tenants::{create_tenant, delete_tenant, list_tenants, show_tenant, update_tenant},
};

Expand Down Expand Up @@ -135,9 +135,30 @@ pub async fn execute_commands(
println!("error reading tenant: {e:?}");
}
},
(Command::Show, SubCommand::Sources) => todo!(),
(Command::Show, SubCommand::Sinks) => todo!(),
(Command::Show, SubCommand::Pipelines) => todo!(),
(Command::Show, SubCommand::Sources) => match show_source(api_client, editor).await {
Ok(source) => {
println!("source: {source}")
}
Err(e) => {
println!("error reading source: {e:?}");
}
},
(Command::Show, SubCommand::Sinks) => match show_sink(api_client, editor).await {
Ok(sink) => {
println!("sink: {sink}")
}
Err(e) => {
println!("error reading sink: {e:?}");
}
},
(Command::Show, SubCommand::Pipelines) => match show_pipeline(api_client, editor).await {
Ok(pipeline) => {
println!("pipeline: {pipeline}")
}
Err(e) => {
println!("error reading pipeline: {e:?}");
}
},
(Command::List, SubCommand::Tenants) => match list_tenants(api_client).await {
Ok(tenants) => {
println!("tenants: ");
Expand Down
21 changes: 17 additions & 4 deletions cli/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use rustyline::DefaultEditor;
use crate::{
api_client::{
ApiClient, BatchConfig, CreatePipelineRequest, CreatePipelineResponse, PipelineConfig,
PipelineResponse,
},
get_u64, get_usize,
get_id, get_u64, get_usize,
sinks::get_sink_id,
sources::get_source_id,
tenants::get_tenant_id,
Expand Down Expand Up @@ -33,9 +34,17 @@ pub async fn create_pipeline(
Ok(pipeline)
}

// pub fn get_pipeline_id(editor: &mut DefaultEditor) -> Result<i64, CliError> {
// get_id(editor, "enter pipeline id: ")
// }
pub async fn show_pipeline(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<PipelineResponse, CliError> {
let tenant_id = get_tenant_id(editor)?;
let pipeline_id = get_pipeline_id(editor)?;

let pipeline = api_client.read_pipeline(tenant_id, pipeline_id).await?;

Ok(pipeline)
}

fn get_pipeline_config(editor: &mut DefaultEditor) -> Result<PipelineConfig, CliError> {
let max_size = get_usize(editor, "enter max batch size: ")?;
Expand All @@ -46,3 +55,7 @@ fn get_pipeline_config(editor: &mut DefaultEditor) -> Result<PipelineConfig, Cli
};
Ok(PipelineConfig { config })
}

pub fn get_pipeline_id(editor: &mut DefaultEditor) -> Result<i64, CliError> {
get_id(editor, "enter pipeline id: ")
}
14 changes: 13 additions & 1 deletion cli/src/sinks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use rustyline::DefaultEditor;

use crate::{
api_client::{ApiClient, CreateSinkRequest, CreateSinkResponse, SinkConfig},
api_client::{ApiClient, CreateSinkRequest, CreateSinkResponse, SinkConfig, SinkResponse},
get_id, get_string,
tenants::get_tenant_id,
CliError,
Expand All @@ -20,6 +20,18 @@ pub async fn create_sink(
Ok(sink)
}

pub async fn show_sink(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<SinkResponse, CliError> {
let tenant_id = get_tenant_id(editor)?;
let sink_id = get_sink_id(editor)?;

let sink = api_client.read_sink(tenant_id, sink_id).await?;

Ok(sink)
}

fn get_sink_config(editor: &mut DefaultEditor) -> Result<SinkConfig, CliError> {
let project_id = get_string(editor, "enter project_id: ")?;
let dataset_id = get_string(editor, "enter dataset_id: ")?;
Expand Down
16 changes: 15 additions & 1 deletion cli/src/sources.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use rustyline::DefaultEditor;

use crate::{
api_client::{ApiClient, CreateSourceRequest, CreateSourceResponse, SourceConfig},
api_client::{
ApiClient, CreateSourceRequest, CreateSourceResponse, SourceConfig, SourceResponse,
},
get_id, get_string,
tenants::get_tenant_id,
CliError,
Expand All @@ -20,6 +22,18 @@ pub async fn create_source(
Ok(source)
}

pub async fn show_source(
api_client: &ApiClient,
editor: &mut DefaultEditor,
) -> Result<SourceResponse, CliError> {
let tenant_id = get_tenant_id(editor)?;
let source_id = get_source_id(editor)?;

let source = api_client.read_source(tenant_id, source_id).await?;

Ok(source)
}

fn get_source_config(editor: &mut DefaultEditor) -> Result<SourceConfig, CliError> {
let host = get_string(editor, "enter host: ")?;
let port = get_string(editor, "enter port: ")?;
Expand Down

0 comments on commit cbd4e5a

Please sign in to comment.