Skip to content

Commit

Permalink
add api endpoint to get pipeline status
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Oct 4, 2024
1 parent cf88514 commit 338a23d
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 4 deletions.
53 changes: 53 additions & 0 deletions api/src/k8s_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ pub enum K8sError {
Kube(#[from] kube::Error),
}

pub enum PodPhase {
Pending,
Running,
Succeeded,
Failed,
Unknown,
}

impl From<&str> for PodPhase {
fn from(value: &str) -> Self {
match value {
"Pending" => PodPhase::Pending,
"Running" => PodPhase::Running,
"Succeeded" => PodPhase::Succeeded,
"Failed" => PodPhase::Failed,
_ => PodPhase::Unknown,
}
}
}

#[async_trait]
pub trait K8sClient {
async fn create_or_update_postgres_secret(
Expand Down Expand Up @@ -57,6 +77,8 @@ pub trait K8sClient {

async fn delete_stateful_set(&self, prefix: &str) -> Result<(), K8sError>;

async fn get_pod_phase(&self, prefix: &str) -> Result<PodPhase, K8sError>;

async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError>;
}

Expand Down Expand Up @@ -361,6 +383,37 @@ impl K8sClient for HttpK8sClient {
Ok(())
}

async fn get_pod_phase(&self, prefix: &str) -> Result<PodPhase, K8sError> {
info!("getting pod status");
let pod_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}-0");
let pod = match self.pods_api.get(&pod_name).await {
Ok(pod) => pod,
Err(e) => match e {
kube::Error::Api(ref er) => {
if er.code == 404 {
return Ok(PodPhase::Succeeded);
}
return Err(e.into());
}
e => return Err(e.into()),
},
};
let phase = pod
.status
.map(|status| {
let phase: PodPhase = status
.phase
.map(|phase| {
let phase: PodPhase = phase.as_str().into();
phase
})
.unwrap_or(PodPhase::Unknown);
phase
})
.unwrap_or(PodPhase::Unknown);
Ok(phase)
}

async fn delete_pod(&self, prefix: &str) -> Result<(), K8sError> {
info!("deleting pod");
let pod_name = format!("{prefix}-{STATEFUL_SET_NAME_SUFFIX}-0");
Expand Down
47 changes: 46 additions & 1 deletion api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
sources::{source_exists, Source, SourceConfig, SourcesDbError},
},
encryption::EncryptionKey,
k8s_client::{HttpK8sClient, K8sClient, K8sError},
k8s_client::{HttpK8sClient, K8sClient, K8sError, PodPhase},
replicator_config,
worker::Secrets,
};
Expand Down Expand Up @@ -402,6 +402,51 @@ pub async fn stop_pipeline(
Ok(HttpResponse::Ok().finish())
}

#[derive(Serialize, ToSchema)]
pub enum PipelineStatus {
Stopped,
Starting,
Started,
Stopping,
Unknown,
}

#[utoipa::path(
context_path = "/v1",
responses(
(status = 200, description = "Get pipeline status"),
(status = 500, description = "Internal server error")
)
)]
#[get("/pipelines/{pipeline_id}/status")]
pub async fn get_pipeline_status(
req: HttpRequest,
pool: Data<PgPool>,
k8s_client: Data<Arc<HttpK8sClient>>,
pipeline_id: Path<i64>,
) -> Result<impl Responder, PipelineError> {
let tenant_id = extract_tenant_id(&req)?;
let pipeline_id = pipeline_id.into_inner();

let replicator = db::replicators::read_replicator_by_pipeline_id(&pool, tenant_id, pipeline_id)
.await?
.ok_or(PipelineError::ReplicatorNotFound(pipeline_id))?;

let prefix = create_prefix(tenant_id, replicator.id);

let pod_phase = k8s_client.get_pod_phase(&prefix).await?;

let status = match pod_phase {
PodPhase::Pending => PipelineStatus::Starting,
PodPhase::Running => PipelineStatus::Started,
PodPhase::Succeeded => PipelineStatus::Stopped,
PodPhase::Failed => PipelineStatus::Stopped,
PodPhase::Unknown => PipelineStatus::Unknown,
};

Ok(Json(status))
}

async fn read_data(
pool: &PgPool,
tenant_id: i64,
Expand Down
8 changes: 5 additions & 3 deletions api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use crate::{
GetImageResponse, PostImageRequest, PostImageResponse,
},
pipelines::{
create_pipeline, delete_pipeline, read_all_pipelines, read_pipeline, start_pipeline,
stop_pipeline, update_pipeline, GetPipelineResponse, PostPipelineRequest,
PostPipelineResponse,
create_pipeline, delete_pipeline, get_pipeline_status, read_all_pipelines,
read_pipeline, start_pipeline, stop_pipeline, update_pipeline, GetPipelineResponse,
PostPipelineRequest, PostPipelineResponse,
},
sinks::{
create_sink, delete_sink, read_all_sinks, read_sink, update_sink, GetSinkResponse,
Expand Down Expand Up @@ -132,6 +132,7 @@ pub async fn run(
crate::routes::pipelines::update_pipeline,
crate::routes::pipelines::delete_pipeline,
crate::routes::pipelines::read_all_pipelines,
crate::routes::pipelines::get_pipeline_status,
crate::routes::tenants::create_tenant,
crate::routes::tenants::read_tenant,
crate::routes::tenants::update_tenant,
Expand Down Expand Up @@ -218,6 +219,7 @@ pub async fn run(
.service(read_all_pipelines)
.service(start_pipeline)
.service(stop_pipeline)
.service(get_pipeline_status)
//tables
.service(read_table_names)
//publications
Expand Down

0 comments on commit 338a23d

Please sign in to comment.