forked from kube-rs/kube
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom_client_trace.rs
62 lines (56 loc) · 2.65 KB
/
custom_client_trace.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Custom client example with TraceLayer.
use http::{Request, Response};
use hyper::Body;
use k8s_openapi::api::core::v1::Pod;
use std::time::Duration;
use tower::ServiceBuilder;
use tower_http::{decompression::DecompressionLayer, trace::TraceLayer};
use tracing::{Span, *};
use kube::{client::ConfigExt, Api, Client, Config, ResourceExt};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let config = Config::infer().await?;
let https = config.openssl_https_connector()?;
let service = ServiceBuilder::new()
.layer(config.base_uri_layer())
// showcase rate limiting; max 10rps, and 4 concurrent
.layer(tower::limit::RateLimitLayer::new(10, Duration::from_secs(1)))
.layer(tower::limit::ConcurrencyLimitLayer::new(4))
// Add `DecompressionLayer` to make request headers interesting.
.layer(DecompressionLayer::new())
.layer(
// Attribute names follow [Semantic Conventions].
// [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-client
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
tracing::debug_span!(
"HTTP",
http.method = %request.method(),
http.url = %request.uri(),
http.status_code = tracing::field::Empty,
otel.name = %format!("HTTP {}", request.method()),
otel.kind = "client",
otel.status_code = tracing::field::Empty,
)
})
.on_request(|request: &Request<Body>, _span: &Span| {
tracing::debug!("payload: {:?} headers: {:?}", request.body(), request.headers())
})
.on_response(|response: &Response<Body>, latency: Duration, span: &Span| {
let status = response.status();
span.record("http.status_code", status.as_u16());
if status.is_client_error() || status.is_server_error() {
span.record("otel.status_code", "ERROR");
}
tracing::debug!("finished in {}ms", latency.as_millis())
}),
)
.service(hyper::Client::builder().build(https));
let client = Client::new(service, config.default_namespace);
let pods: Api<Pod> = Api::default_namespaced(client);
for p in pods.list(&Default::default()).await? {
info!("{}", p.name_any());
}
Ok(())
}