diff --git a/graph-proxy/Cargo.lock b/graph-proxy/Cargo.lock
index 0c362212..3c3cacb8 100644
--- a/graph-proxy/Cargo.lock
+++ b/graph-proxy/Cargo.lock
@@ -174,6 +174,7 @@ dependencies = [
  "static_assertions_next",
  "tempfile",
  "thiserror 1.0.68",
+ "url",
 ]
 
 [[package]]
diff --git a/graph-proxy/Cargo.toml b/graph-proxy/Cargo.toml
index ca021676..862c9a63 100644
--- a/graph-proxy/Cargo.toml
+++ b/graph-proxy/Cargo.toml
@@ -16,7 +16,7 @@ exclude = ["test-resources/"]
 [dependencies]
 anyhow = { version = "1.0.95" }
 argo-workflows-openapi = { path = "./argo-workflows-openapi" }
-async-graphql = { version = "7.0.13", features = ["chrono"] }
+async-graphql = { version = "7.0.13", features = ["chrono", "url"] }
 async-graphql-axum = { version = "7.0.13" }
 axum = { version = "0.7.9" }
 axum-extra = { version = "0.9.6", features = ["typed-header"] }
diff --git a/graph-proxy/src/graphql/workflows.rs b/graph-proxy/src/graphql/workflows.rs
index cf8cfd94..8d2d4b5b 100644
--- a/graph-proxy/src/graphql/workflows.rs
+++ b/graph-proxy/src/graphql/workflows.rs
@@ -1,8 +1,8 @@
 use super::{Visit, VisitInput, CLIENT};
 use crate::ArgoServerUrl;
 use argo_workflows_openapi::{
-    APIResult, IoArgoprojWorkflowV1alpha1NodeStatus, IoArgoprojWorkflowV1alpha1Workflow,
-    IoArgoprojWorkflowV1alpha1WorkflowStatus,
+    APIResult, IoArgoprojWorkflowV1alpha1Artifact, IoArgoprojWorkflowV1alpha1NodeStatus,
+    IoArgoprojWorkflowV1alpha1Workflow, IoArgoprojWorkflowV1alpha1WorkflowStatus,
 };
 use async_graphql::{
     connection::{Connection, CursorType, Edge, EmptyFields, OpaqueCursor},
@@ -10,7 +10,7 @@ use async_graphql::{
 };
 use axum_extra::headers::{authorization::Bearer, Authorization};
 use chrono::{DateTime, Utc};
-use std::{collections::HashMap, ops::Deref};
+use std::{collections::HashMap, ops::Deref, path::Path};
 use tracing::{debug, instrument};
 use url::Url;
 
@@ -30,6 +30,12 @@ pub(super) enum WorkflowParsingError {
     UnrecognisedTaskDisplayName,
     #[error("status was expected but was not present")]
     MissingWorkflowStatus,
+    #[error("artifact.s3 was expected but was not present")]
+    UnrecognisedArtifactStore,
+    #[error("artifact.s3.key was expected but was not present")]
+    MissingArtifactKey,
+    #[error("artifact file name is not valid UTF-8")]
+    InvalidArtifactFilename,
 }
 
 /// A Workflow consisting of one or more [`Task`]s
@@ -174,7 +180,7 @@ impl WorkflowRunningStatus<'_> {
             .data_unchecked::<Option<Authorization<Bearer>>>()
             .to_owned();
         let nodes = fetch_missing_task_info(url, token, self.manifest, self.metadata).await?;
-        Ok(TaskMap(nodes).into_tasks())
+        Ok(TaskMap(nodes).into_tasks(self.metadata))
     }
 }
 
@@ -242,7 +248,7 @@ impl WorkflowCompleteStatus<'_> {
             .data_unchecked::<Option<Authorization<Bearer>>>()
             .to_owned();
         let nodes = fetch_missing_task_info(url, token, self.manifest, self.metadata).await?;
-        Ok(TaskMap(nodes).into_tasks())
+        Ok(TaskMap(nodes).into_tasks(self.metadata))
     }
 }
 
@@ -275,16 +281,60 @@ impl TryFrom for TaskStatus {
     }
 }
 
+/// An output produced by a [`Task`] within a [`Workflow`]
+#[allow(clippy::missing_docs_in_private_items)]
+#[derive(Debug)]
+struct Artifact<'a> {
+    manifest: &'a IoArgoprojWorkflowV1alpha1Artifact,
+    metadata: &'a Metadata,
+    node_id: &'a str,
+}
+
+#[Object]
+impl Artifact<'_> {
+    /// The file name of the artifact
+    async fn name(&self) -> Result<&str, WorkflowParsingError> {
+        let path = Path::new(
+            self.manifest
+                .s3
+                .as_ref()
+                .ok_or(WorkflowParsingError::UnrecognisedArtifactStore)?
+                .key
+                .as_ref()
+                .ok_or(WorkflowParsingError::MissingArtifactKey)?,
+        );
+        path.file_name()
+            .unwrap_or_default()
+            .to_str()
+            .ok_or(WorkflowParsingError::InvalidArtifactFilename)
+    }
+
+    /// The download URL for the artifact
+    async fn url(&self, ctx: &Context<'_>) -> Url {
+        let server_url = ctx.data_unchecked::<ArgoServerUrl>().deref();
+        let mut url = server_url.clone();
+        url.path_segments_mut().unwrap().extend([
+            "artifacts",
+            &self.metadata.visit.to_string(),
+            &self.metadata.name,
+            self.node_id,
+            &self.manifest.name,
+        ]);
+        url
+    }
+}
+
 /// A Task created by a workflow
 #[allow(clippy::missing_docs_in_private_items)]
 #[derive(Debug, Clone)]
-struct Task {
+struct Task<'a> {
     node_status: IoArgoprojWorkflowV1alpha1NodeStatus,
     depends: Vec<String>,
+    metadata: &'a Metadata,
 }
 
 #[Object]
-impl Task {
+impl Task<'_> {
     /// Unique name of the task
     async fn id(&self) -> &String {
         &self.node_status.id
@@ -318,6 +368,22 @@ impl Task {
     async fn dependencies(&self) -> Vec<String> {
         self.node_status.children.clone()
     }
+
+    /// Artifacts produced by a task
+    async fn artifacts(&self) -> Vec<Artifact> {
+        match self.node_status.outputs.as_ref() {
+            None => Vec::new(),
+            Some(outputs) => outputs
+                .artifacts
+                .iter()
+                .map(|manifest| Artifact {
+                    manifest,
+                    metadata: self.metadata,
+                    node_id: &self.node_status.id,
+                })
+                .collect::<Vec<_>>(),
+        }
+    }
 }
 
 async fn fetch_missing_task_info(
@@ -373,7 +439,7 @@ impl TaskMap {
     }
 
     /// Converts [`TaskMap`] into [`Vec<Task>`]
-    fn into_tasks(self) -> Vec<Task> {
+    fn into_tasks(self, metadata: &Metadata) -> Vec<Task> {
         let mut relationship_map = TaskMap::generate_relationship_map(&self);
         self.0
             .into_iter()
@@ -382,6 +448,7 @@ impl TaskMap {
                 Task {
                     node_status,
                     depends,
+                    metadata,
                 }
             })
             .collect::<Vec<_>>()
@@ -1060,4 +1127,77 @@ mod tests {
         schema.execute(query).await.into_result().unwrap();
         workflows_endpoint.assert_async().await;
     }
+
+    #[tokio::test]
+    async fn get_artifacts_of_succeeded_workflow_query() {
+        let workflow_name = "numpy-benchmark-wdkwj";
+        let visit = Visit {
+            proposal_code: "mg".to_string(),
+            proposal_number: 36964,
+            number: 1,
+        };
+
+        let mut server = mockito::Server::new_async().await;
+        let mut response_file_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        response_file_path.push("test-assets");
+        response_file_path.push("get-workflow-wdkwj.json");
+        let workflow_endpoint = server
+            .mock(
+                "GET",
+                &format!("/api/v1/workflows/{}/{}", visit, workflow_name)[..],
+            )
+            .with_status(200)
+            .with_header("content-type", "application/json")
+            .with_body_from_file(response_file_path)
+            .create_async()
+            .await;
+
+        let argo_server_url = Url::parse(&server.url()).unwrap();
+        let schema = root_schema_builder()
+            .data(ArgoServerUrl(argo_server_url))
+            .data(None::<Authorization<Bearer>>)
+            .finish();
+        let query = format!(
+            r#"
+            query {{
+                workflow(name: "{}", visit: {{proposalCode: "{}", proposalNumber: {}, number: {}}}) {{
+                    status {{
+                        ...on WorkflowSucceededStatus {{
+                            tasks {{
+                                artifacts {{
+                                    name
+                                    url
+                                }}
+                            }}
+                        }}
+                    }}
+                }}
+            }}
+            "#,
+            workflow_name, visit.proposal_code, visit.proposal_number, visit.number
+        );
+        let resp = schema.execute(query).await.into_result().unwrap();
+
+        workflow_endpoint.assert_async().await;
+        let expected_download_url = format!(
+            "{}/artifacts/{}/{}/{}/main-logs",
+            server.url(),
+            visit,
+            workflow_name,
+            workflow_name
+        );
+        let expected_data = json!({
+            "workflow": {
+                "status": {
+                    "tasks": [{
+                        "artifacts": [{
+                            "name": "main.log",
+                            "url": expected_download_url
+                        }]
+                    }]
+                }
+            }
+        });
+        assert_eq!(resp.data.into_json().unwrap(), expected_data);
+    }
 }