diff --git a/src/filter.rs b/src/filter.rs index ab359182..6ea9e148 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -33,6 +33,14 @@ extern "C" fn start() { Box::new(FilterRoot { context_id, config: Default::default(), + rate_limit_ok_metric_id: 0, + rate_limit_error_metric_id: 0, + rate_limit_over_limit_metric_id: 0, + rate_limit_failure_mode_allowed_metric_id: 0, + auth_ok_metric_id: 0, + auth_error_metric_id: 0, + auth_denied_metric_id: 0, + auth_failure_mode_allowed_metric_id: 0, }) }); } diff --git a/src/filter/http_context.rs b/src/filter/http_context.rs index a4ab0aec..7d5e80d0 100644 --- a/src/filter/http_context.rs +++ b/src/filter/http_context.rs @@ -1,5 +1,6 @@ use crate::configuration::action_set::ActionSet; use crate::configuration::{FailureMode, FilterConfig}; +use crate::envoy::StatusCode; use crate::operation_dispatcher::OperationDispatcher; use crate::service::GrpcService; use log::{debug, warn}; @@ -58,9 +59,21 @@ impl Filter { } Err(e) => { warn!("gRPC call failed! {e:?}"); - if let FailureMode::Deny = operation.get_failure_mode() { - self.send_http_response(500, vec![], Some(b"Internal Server Error.\n")) - } + match operation.get_failure_mode() { + FailureMode::Deny => { + operation + .get_service_handler() + .service_metrics + .report_error(); + self.send_http_response(500, vec![], Some(b"Internal Server Error.\n")) + } + FailureMode::Allow => { + operation + .get_service_handler() + .service_metrics + .report_allowed_on_failure(); + } + }; Action::Continue } } @@ -113,13 +126,37 @@ impl Context for Filter { let some_op = self.operation_dispatcher.borrow().get_operation(token_id); if let Some(operation) = some_op { - if GrpcService::process_grpc_response(operation, resp_size).is_ok() { - self.operation_dispatcher.borrow_mut().next(); - if let Some(_op) = self.operation_dispatcher.borrow_mut().next() { - } else { - self.resume_http_request() + match GrpcService::process_grpc_response(Rc::clone(&operation), resp_size) { + Ok(_) => { + operation.get_service_handler().service_metrics.report_ok(); + self.operation_dispatcher.borrow_mut().next(); + if self.operation_dispatcher.borrow_mut().next().is_none() { + self.resume_http_request() + } } - } + Err( + StatusCode::TooManyRequests | StatusCode::Unauthorized | StatusCode::Forbidden, + ) => { + operation + .get_service_handler() + .service_metrics + .report_rejected(); + } + Err(_) => match operation.get_failure_mode() { + FailureMode::Deny => { + operation + .get_service_handler() + .service_metrics + .report_error(); + } + FailureMode::Allow => { + operation + .get_service_handler() + .service_metrics + .report_allowed_on_failure(); + } + }, + }; } else { warn!("No Operation found with token_id: {token_id}"); GrpcService::handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode diff --git a/src/filter/root_context.rs b/src/filter/root_context.rs index 01b29ff5..73ec5822 100644 --- a/src/filter/root_context.rs +++ b/src/filter/root_context.rs @@ -1,11 +1,12 @@ -use crate::configuration::{FilterConfig, PluginConfiguration}; +use crate::configuration::{FilterConfig, PluginConfiguration, ServiceType}; use crate::filter::http_context::Filter; use crate::operation_dispatcher::OperationDispatcher; -use crate::service::{GrpcServiceHandler, HeaderResolver}; +use crate::service::{GrpcServiceHandler, HeaderResolver, ServiceMetrics}; use const_format::formatcp; use log::{debug, error, info}; +use proxy_wasm::hostcalls; use proxy_wasm::traits::{Context, HttpContext, RootContext}; -use proxy_wasm::types::ContextType; +use proxy_wasm::types::{ContextType, MetricType}; use std::collections::HashMap; use std::rc::Rc; @@ -18,6 +19,14 @@ const WASM_SHIM_HEADER: &str = "Kuadrant wasm module"; pub struct FilterRoot { pub context_id: u32, pub config: Rc, + pub rate_limit_ok_metric_id: u32, + pub rate_limit_error_metric_id: u32, + pub rate_limit_over_limit_metric_id: u32, + pub rate_limit_failure_mode_allowed_metric_id: u32, + pub auth_ok_metric_id: u32, + pub auth_error_metric_id: u32, + pub auth_denied_metric_id: u32, + pub auth_failure_mode_allowed_metric_id: u32, } impl RootContext for FilterRoot { @@ -30,6 +39,51 @@ impl RootContext for FilterRoot { "#{} {} {}: VM started", self.context_id, WASM_SHIM_HEADER, full_version ); + + self.rate_limit_ok_metric_id = + match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.ok") { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; + self.rate_limit_error_metric_id = + match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.error") { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; + self.rate_limit_over_limit_metric_id = + match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.over_limit") { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; + self.rate_limit_failure_mode_allowed_metric_id = match hostcalls::define_metric( + MetricType::Counter, + "kuadrant.rate_limit.failure_mode_allowed", + ) { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; + self.auth_ok_metric_id = + match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.ok") { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; + self.auth_error_metric_id = + match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.error") { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; + self.auth_denied_metric_id = + match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.denied") { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; + self.auth_failure_mode_allowed_metric_id = match hostcalls::define_metric( + MetricType::Counter, + "kuadrant.auth.failure_mode_allowed", + ) { + Ok(metric_id) => metric_id, + Err(e) => panic!("Error: {:?}", e), + }; true } @@ -46,6 +100,7 @@ impl RootContext for FilterRoot { .or_insert(Rc::from(GrpcServiceHandler::new( Rc::clone(grpc_service), Rc::clone(&header_resolver), + Rc::new(self.service_metrics(grpc_service.get_service_type())), ))); }); Some(Box::new(Filter { @@ -89,3 +144,22 @@ impl RootContext for FilterRoot { } impl Context for FilterRoot {} + +impl FilterRoot { + fn service_metrics(&self, service_type: &ServiceType) -> ServiceMetrics { + match service_type { + ServiceType::Auth => ServiceMetrics::new( + self.auth_ok_metric_id, + self.auth_error_metric_id, + self.auth_denied_metric_id, + self.auth_failure_mode_allowed_metric_id, + ), + ServiceType::RateLimit => ServiceMetrics::new( + self.rate_limit_ok_metric_id, + self.rate_limit_error_metric_id, + self.rate_limit_over_limit_metric_id, + self.rate_limit_failure_mode_allowed_metric_id, + ), + } + } +} diff --git a/src/operation_dispatcher.rs b/src/operation_dispatcher.rs index 07de507b..9bf8ce62 100644 --- a/src/operation_dispatcher.rs +++ b/src/operation_dispatcher.rs @@ -107,6 +107,10 @@ impl Operation { pub fn get_failure_mode(&self) -> &FailureMode { &self.service.failure_mode } + + pub fn get_service_handler(&self) -> &GrpcServiceHandler { + &self.service_handler + } } pub struct OperationDispatcher { diff --git a/src/service.rs b/src/service.rs index 5ca9635e..e66a7e44 100644 --- a/src/service.rs +++ b/src/service.rs @@ -90,6 +90,10 @@ impl GrpcService { FailureMode::Allow => hostcalls::resume_http_request().unwrap(), } } + + pub fn get_service_type(&self) -> &ServiceType { + &self.service.service_type + } } pub type GrpcCallFn = fn( @@ -109,13 +113,19 @@ pub type GrpcMessageBuildFn = pub struct GrpcServiceHandler { grpc_service: Rc, header_resolver: Rc, + pub service_metrics: Rc, } impl GrpcServiceHandler { - pub fn new(grpc_service: Rc, header_resolver: Rc) -> Self { + pub fn new( + grpc_service: Rc, + header_resolver: Rc, + service_metrics: Rc, + ) -> Self { Self { grpc_service, header_resolver, + service_metrics, } } @@ -201,3 +211,48 @@ impl TracingHeader { } } } + +pub struct ServiceMetrics { + ok_metric_id: u32, + error_metric_id: u32, + rejected_metric_id: u32, + failure_mode_allowed_metric_id: u32, +} + +impl ServiceMetrics { + pub fn new( + ok_metric_id: u32, + error_metric_id: u32, + rejected_metric_id: u32, + failure_mode_allowed_metric_id: u32, + ) -> Self { + Self { + ok_metric_id, + error_metric_id, + rejected_metric_id, + failure_mode_allowed_metric_id, + } + } + + fn report(metric_id: u32, offset: i64) { + if let Err(e) = hostcalls::increment_metric(metric_id, offset) { + warn!("report metric {metric_id}, error: {e:?}"); + } + } + + pub fn report_error(&self) { + Self::report(self.error_metric_id, 1); + } + + pub fn report_allowed_on_failure(&self) { + Self::report(self.failure_mode_allowed_metric_id, 1); + } + + pub fn report_ok(&self) { + Self::report(self.ok_metric_id, 1); + } + + pub fn report_rejected(&self) { + Self::report(self.rejected_metric_id, 1); + } +}