Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remote write for metric-explorer-prometheus #542

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion metrics-exporter-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ categories = ["development-tools::debugging"]
keywords = ["metrics", "telemetry", "prometheus"]

[features]
default = ["http-listener", "push-gateway"]
default = ["http-listener", "push-gateway","remote-write"]
async-runtime = ["tokio", "hyper-util/tokio"]
http-listener = ["async-runtime", "ipnet", "tracing", "_hyper-server"]
uds-listener = ["http-listener"]
push-gateway = ["async-runtime", "tracing", "_hyper-client"]
remote-write = ["_hyper-client","async-runtime","dep:prost","dep:snap","dep:prometheus-parse"]
_hyper-server = ["http-body-util", "hyper/server", "hyper-util/server-auto"]
_hyper-client = [
"http-body-util",
Expand All @@ -48,7 +49,10 @@ metrics-util = { version = "^0.18", path = "../metrics-util", default-features =
"registry",
"summary",
] }
prometheus-parse = {version = "0.2.5", optional = true}
prost = {workspace = true, optional = true}
quanta = { workspace = true }
snap = { version = "1.1.1", optional = true}
thiserror = { workspace = true }
tokio = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
Expand All @@ -63,6 +67,10 @@ tracing-subscriber = { workspace = true, features = ["fmt"] }
name = "prometheus_push_gateway"
required-features = ["push-gateway"]

[[example]]
name = "prometheus_remote_write"
required-features = ["remote-write"]

[[example]]
name = "prometheus_server"
required-features = ["http-listener"]
Expand Down
73 changes: 73 additions & 0 deletions metrics-exporter-prometheus/examples/prometheus_remote_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/// Make sure to run this example with `--features remote-write` to properly enable remote write support.
#[allow(unused_imports)]
use std::thread;
use std::time::Duration;

#[allow(unused_imports)]
use metrics::{counter, gauge, histogram};
use metrics::{describe_counter, describe_histogram};
#[allow(unused_imports)]
use metrics_exporter_prometheus::PrometheusBuilder;
#[allow(unused_imports)]
use metrics_util::MetricKindMask;

use quanta::Clock;
use rand::{thread_rng, Rng};

fn main() {
tracing_subscriber::fmt::init();

PrometheusBuilder::new()
.with_remote_write(
"http://127.0.0.1:9091/metrics/job/example",
Duration::from_secs(10),
"test-agent",
)
.expect("remote write endpoint should be valid")
.idle_timeout(
MetricKindMask::COUNTER | MetricKindMask::HISTOGRAM,
Some(Duration::from_secs(10)),
)
.install()
.expect("failed to install Prometheus recorder");

// We register these metrics, which gives us a chance to specify a description for them. The
// Prometheus exporter records this description and adds it as HELP text when the endpoint is
// scraped.
//
// Registering metrics ahead of using them is not required, but is the only way to specify the
// description of a metric.
describe_counter!("tcp_server_loops", "The iterations of the TCP server event loop so far.");
describe_histogram!(
"tcp_server_loop_delta_secs",
"The time taken for iterations of the TCP server event loop."
);

let clock = Clock::new();
let mut last = None;

counter!("idle_metric").increment(1);
gauge!("testing").set(42.0);

// Loop over and over, pretending to do some work.
loop {
counter!("tcp_server_loops", "system" => "foo").increment(1);

if let Some(t) = last {
let delta: Duration = clock.now() - t;
histogram!("tcp_server_loop_delta_secs", "system" => "foo").record(delta);
}

let increment_gauge = thread_rng().gen_bool(0.75);
let gauge = gauge!("lucky_iterations");
if increment_gauge {
gauge.increment(1.0);
} else {
gauge.decrement(1.0);
}

last = Some(clock.now());

thread::sleep(Duration::from_millis(750));
}
}
4 changes: 3 additions & 1 deletion metrics-exporter-prometheus/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub enum BuildError {
/// The given push gateway endpoint is not a valid URI.
#[error("push gateway endpoint is not valid: {0}")]
InvalidPushGatewayEndpoint(String),

/// The given push gateway endpoint is not a valid URI.
#[error("remote write endpoint is not valid: {0}")]
InvalidRemoteWriteEndpoint(String),
/// No exporter configuration was present.
///
/// This generally only occurs when HTTP listener support is disabled, but no push gateway
Expand Down
35 changes: 35 additions & 0 deletions metrics-exporter-prometheus/src/exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,37 @@ impl PrometheusBuilder {
Ok(self)
}

/// Configures the exporter to push periodic requests to endpoint by [remote write protocol](https://prometheus.io/docs/specs/remote_write_spec/).
///
/// Running in remote write mode is mutually exclusive with the HTTP listener/push gateway i.e. enabling the remote write will
/// disable the HTTP listener/push gateway, and vise versa.
///
/// Defaults to disabled.
///
/// ## Errors
///
/// If the given endpoint cannot be parsed into a valid URI, an error variant will be returned describing the error.
///
#[cfg(feature = "remote-write")]
#[cfg_attr(docsrs, doc(cfg(feature = "remote-write")))]
pub fn with_remote_write<T>(
mut self,
endpoint: T,
interval: Duration,
user_agent: &str,
) -> Result<Self, BuildError>
where
T: AsRef<str>,
{
self.exporter_config = ExporterConfig::RemoteWrite {
endpoint: Uri::try_from(endpoint.as_ref())
.map_err(|e| BuildError::InvalidRemoteWriteEndpoint(e.to_string()))?,
interval,
user_agent: user_agent.to_string(),
};
Ok(self)
}

/// Configures the exporter to expose an HTTP listener that functions as a [scrape endpoint], listening on a Unix
/// Domain socket at the given path
///
Expand Down Expand Up @@ -486,6 +517,10 @@ impl PrometheusBuilder {
endpoint, interval, username, password, handle,
)
}
#[cfg(feature = "remote-write")]
ExporterConfig::RemoteWrite { endpoint, interval, user_agent } => {
super::remote_write::new_remote_write(endpoint, interval, handle, &user_agent)
}
},
))
}
Expand Down
10 changes: 10 additions & 0 deletions metrics-exporter-prometheus/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ enum ExporterConfig {
password: Option<String>,
},

// Run a remote write task sending to the given `endpoint` after `interval` time has elapsed,
// infinitely.
#[cfg(feature = "remote-write")]
RemoteWrite { endpoint: Uri, interval: Duration, user_agent: String },

#[allow(dead_code)]
Unconfigured,
}
Expand All @@ -60,6 +65,8 @@ impl ExporterConfig {
Self::HttpListener { .. } => "http-listener",
#[cfg(feature = "push-gateway")]
Self::PushGateway { .. } => "push-gateway",
#[cfg(feature = "remote-write")]
Self::RemoteWrite { .. } => "remote-write",
Self::Unconfigured => "unconfigured,",
}
}
Expand All @@ -71,4 +78,7 @@ mod http_listener;
#[cfg(feature = "push-gateway")]
mod push_gateway;

#[cfg(feature = "remote-write")]
mod remote_write;

pub(crate) mod builder;
68 changes: 68 additions & 0 deletions metrics-exporter-prometheus/src/exporter/remote_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::time::Duration;

use http_body_util::{BodyExt, Collected, Full};
use hyper::{body::Bytes, Uri};
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use tracing::error;

use crate::PrometheusHandle;

use super::ExporterFuture;

// Creates an ExporterFuture implementing a remote write.
pub(super) fn new_remote_write(
endpoint: Uri,
interval: Duration,
handle: PrometheusHandle,
user_agent: &str,
) -> ExporterFuture {
let user_agent = user_agent.to_string();
Box::pin(async move {
let https = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.expect("no native root CA certificates found")
.https_or_http()
.enable_http1()
.build();
let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new())
.pool_idle_timeout(Duration::from_secs(30))
.build(https);

loop {
// Sleep for `interval` amount of time, and then do a push.
tokio::time::sleep(interval).await;

let binary = handle.render_remote_write_format();

let req = match binary.build_http_request(&endpoint, &user_agent) {
Ok(req) => req,
Err(err) => {
error!("failed to build http remote write request {}", err);
continue;
}
};
match client.request(req).await {
Ok(response) => {
if !response.status().is_success() {
let status = response.status();
let status = status.canonical_reason().unwrap_or_else(|| status.as_str());
let body = response
.into_body()
.collect()
.await
.map(Collected::to_bytes)
.map_err(|_| ())
.and_then(|b| String::from_utf8(b[..].to_vec()).map_err(|_| ()))
.unwrap_or_else(|()| String::from("<failed to read response body>"));
error!(
message = "unexpected status after pushing metrics to remote write",
status,
%body,
);
}
}
Err(e) => error!("error sending request to remote write {}: {:?}", endpoint, e),
}
}
})
}
7 changes: 4 additions & 3 deletions metrics-exporter-prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ pub use distribution::{Distribution, DistributionBuilder};

mod exporter;
pub use self::exporter::builder::PrometheusBuilder;
#[cfg(any(feature = "http-listener", feature = "push-gateway"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http-listener", feature = "push-gateway"))))]
#[cfg(any(feature = "http-listener", feature = "push-gateway", feature = "remote-write"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http-listener", feature = "push-gateway", feature = "remote-write"))))]
pub use self::exporter::ExporterFuture;

#[cfg(feature = "remote-write")]
mod remote_write_proto;
pub mod formatting;
mod recorder;

Expand Down
16 changes: 14 additions & 2 deletions metrics-exporter-prometheus/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) struct Inner {
}

impl Inner {
fn get_recent_metrics(&self) -> Snapshot {
pub(crate) fn get_recent_metrics(&self) -> Snapshot {
let mut counters = HashMap::new();
let counter_handles = self.registry.get_counter_handles();
for (key, counter) in counter_handles {
Expand Down Expand Up @@ -108,7 +108,12 @@ impl Inner {
histogram.get_inner().clear_with(|samples| entry.record_samples(samples));
}
}

/// Render metric to [Remote-Write format](https://prometheus.io/docs/specs/remote_write_spec/)
#[cfg(feature = "remote-write")]
fn render_remote_write_format(&self) -> crate::remote_write_proto::WriteRequest {
use crate::remote_write_proto::WriteRequest;
WriteRequest::from_raw(self)
}
fn render(&self) -> String {
let Snapshot { mut counters, mut distributions, mut gauges } = self.get_recent_metrics();

Expand Down Expand Up @@ -289,6 +294,13 @@ impl PrometheusHandle {
self.inner.render()
}

/// Takes a snapshot of the metrics held by the recorder and generates a payload conforming to
/// the Prometheus remote write format.
#[cfg(feature = "remote-write")]
pub fn render_remote_write_format(&self) -> crate::remote_write_proto::WriteRequest {
self.inner.render_remote_write_format()
}

/// Performs upkeeping operations to ensure metrics held by recorder are up-to-date and do not
/// grow unboundedly.
pub fn run_upkeep(&self) {
Expand Down
Loading