-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathversion.rs
82 lines (73 loc) · 2.81 KB
/
version.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use axum::extract::{Path, State};
use axum::{http::StatusCode, response::IntoResponse, routing, Json, Router};
use futures::{future, StreamExt};
use k8s_openapi::api::apps::v1::Deployment;
use kube::runtime::{reflector, watcher, WatchStreamExt};
use kube::{Api, Client, ResourceExt};
use tracing::{debug, warn};
#[derive(serde::Serialize, Clone)]
struct Entry {
container: String,
name: String,
namespace: String,
version: String,
}
type Cache = reflector::Store<Deployment>;
fn deployment_to_entry(d: &Deployment) -> Option<Entry> {
let name = d.name_any();
let namespace = d.namespace()?;
let tpl = d.spec.as_ref()?.template.spec.as_ref()?;
let img = tpl.containers.get(0)?.image.as_ref()?;
let splits = img.splitn(2, ':').collect::<Vec<_>>();
let (container, version) = match *splits.as_slice() {
[c, v] => (c.to_owned(), v.to_owned()),
[c] => (c.to_owned(), "latest".to_owned()),
_ => return None,
};
Some(Entry { name, namespace, container, version })
}
// - GET /versions/:namespace/:name
#[derive(serde::Deserialize, Debug)]
struct EntryPath {
name: String,
namespace: String,
}
async fn get_version(State(store): State<Cache>, Path(p): Path<EntryPath>) -> impl IntoResponse {
let key = reflector::ObjectRef::new(&p.name).within(&p.namespace);
if let Some(Some(e)) = store.get(&key).map(|d| deployment_to_entry(&d)) {
return Ok(Json(e));
}
Err((StatusCode::NOT_FOUND, "not found"))
}
// - GET /versions
async fn get_versions(State(store): State<Cache>) -> Json<Vec<Entry>> {
let data = store.state().iter().filter_map(|d| deployment_to_entry(d)).collect();
Json(data)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let api: Api<Deployment> = Api::all(client);
let (reader, writer) = reflector::store();
let watch = reflector(writer, watcher(api, Default::default()))
.default_backoff()
.touched_objects()
.for_each(|r| {
future::ready(match r {
Ok(o) => debug!("Saw {} in {}", o.name_any(), o.namespace().unwrap()),
Err(e) => warn!("watcher error: {e}"),
})
});
tokio::spawn(watch); // poll forever
let app = Router::new()
.route("/versions", routing::get(get_versions))
.route("/versions/:namespace/:name", routing::get(get_version))
.with_state(reader) // routes can read from the reflector store
.layer(tower_http::trace::TraceLayer::new_for_http())
// NB: routes added after TraceLayer are not traced
.route("/health", routing::get(|| async { "up" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await?;
axum::serve(listener, app.into_make_service()).await?;
Ok(())
}