Skip to content

Commit

Permalink
grpc service metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Eguzki Astiz Lezaun <[email protected]>
  • Loading branch information
eguzki committed Oct 25, 2024
1 parent 9b6766f commit ef37667
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 13 deletions.
8 changes: 8 additions & 0 deletions src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ extern "C" fn start() {
Box::new(FilterRoot {
context_id,
config: Default::default(),
rate_limit_ok_metric_id: 0,
rate_limit_error_metric_id: 0,
rate_limit_over_limit_metric_id: 0,
rate_limit_failure_mode_allowed_metric_id: 0,
auth_ok_metric_id: 0,
auth_error_metric_id: 0,
auth_denied_metric_id: 0,
auth_failure_mode_allowed_metric_id: 0,
})
});
}
55 changes: 46 additions & 9 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::configuration::action_set::ActionSet;
use crate::configuration::{FailureMode, FilterConfig};
use crate::envoy::StatusCode;
use crate::operation_dispatcher::OperationDispatcher;
use crate::service::GrpcService;
use log::{debug, warn};
Expand Down Expand Up @@ -58,9 +59,21 @@ impl Filter {
}
Err(e) => {
warn!("gRPC call failed! {e:?}");
if let FailureMode::Deny = operation.get_failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
match operation.get_failure_mode() {
FailureMode::Deny => {
operation
.get_service_handler()
.service_metrics
.report_error();
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
FailureMode::Allow => {
operation
.get_service_handler()
.service_metrics
.report_allowed_on_failure();
}
};
Action::Continue
}
}
Expand Down Expand Up @@ -113,13 +126,37 @@ impl Context for Filter {
let some_op = self.operation_dispatcher.borrow().get_operation(token_id);

if let Some(operation) = some_op {
if GrpcService::process_grpc_response(operation, resp_size).is_ok() {
self.operation_dispatcher.borrow_mut().next();
if let Some(_op) = self.operation_dispatcher.borrow_mut().next() {
} else {
self.resume_http_request()
match GrpcService::process_grpc_response(Rc::clone(&operation), resp_size) {
Ok(_) => {
operation.get_service_handler().service_metrics.report_ok();
self.operation_dispatcher.borrow_mut().next();
if self.operation_dispatcher.borrow_mut().next().is_none() {
self.resume_http_request()
}
}
}
Err(
StatusCode::TooManyRequests | StatusCode::Unauthorized | StatusCode::Forbidden,
) => {
operation
.get_service_handler()
.service_metrics
.report_rejected();
}
Err(_) => match operation.get_failure_mode() {
FailureMode::Deny => {
operation
.get_service_handler()
.service_metrics
.report_error();
}
FailureMode::Allow => {
operation
.get_service_handler()
.service_metrics
.report_allowed_on_failure();
}
},
};
} else {
warn!("No Operation found with token_id: {token_id}");
GrpcService::handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode
Expand Down
80 changes: 77 additions & 3 deletions src/filter/root_context.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::configuration::{FilterConfig, PluginConfiguration};
use crate::configuration::{FilterConfig, PluginConfiguration, ServiceType};
use crate::filter::http_context::Filter;
use crate::operation_dispatcher::OperationDispatcher;
use crate::service::{GrpcServiceHandler, HeaderResolver};
use crate::service::{GrpcServiceHandler, HeaderResolver, ServiceMetrics};
use const_format::formatcp;
use log::{debug, error, info};
use proxy_wasm::hostcalls;
use proxy_wasm::traits::{Context, HttpContext, RootContext};
use proxy_wasm::types::ContextType;
use proxy_wasm::types::{ContextType, MetricType};
use std::collections::HashMap;
use std::rc::Rc;

Expand All @@ -18,6 +19,14 @@ const WASM_SHIM_HEADER: &str = "Kuadrant wasm module";
/// Root context state for the filter: the context id, the shared filter
/// configuration, and one host metric id per (service kind, outcome) counter.
///
/// The metric ids are created as `0` and later overwritten with the ids
/// returned by `hostcalls::define_metric` for the corresponding
/// `kuadrant.*` counters.
pub struct FilterRoot {
pub context_id: u32,
pub config: Rc<FilterConfig>,
/// Id of the `kuadrant.rate_limit.ok` counter.
pub rate_limit_ok_metric_id: u32,
/// Id of the `kuadrant.rate_limit.error` counter.
pub rate_limit_error_metric_id: u32,
/// Id of the `kuadrant.rate_limit.over_limit` counter.
pub rate_limit_over_limit_metric_id: u32,
/// Id of the `kuadrant.rate_limit.failure_mode_allowed` counter.
pub rate_limit_failure_mode_allowed_metric_id: u32,
/// Id of the `kuadrant.auth.ok` counter.
pub auth_ok_metric_id: u32,
/// Id of the `kuadrant.auth.error` counter.
pub auth_error_metric_id: u32,
/// Id of the `kuadrant.auth.denied` counter.
pub auth_denied_metric_id: u32,
/// Id of the `kuadrant.auth.failure_mode_allowed` counter.
pub auth_failure_mode_allowed_metric_id: u32,
}

impl RootContext for FilterRoot {
Expand All @@ -30,6 +39,51 @@ impl RootContext for FilterRoot {
"#{} {} {}: VM started",
self.context_id, WASM_SHIM_HEADER, full_version
);

self.rate_limit_ok_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.ok") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.rate_limit_error_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.error") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.rate_limit_over_limit_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.over_limit") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.rate_limit_failure_mode_allowed_metric_id = match hostcalls::define_metric(
MetricType::Counter,
"kuadrant.rate_limit.failure_mode_allowed",
) {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_ok_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.ok") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_error_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.error") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_denied_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.denied") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_failure_mode_allowed_metric_id = match hostcalls::define_metric(
MetricType::Counter,
"kuadrant.auth.failure_mode_allowed",
) {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
true
}

Expand All @@ -46,6 +100,7 @@ impl RootContext for FilterRoot {
.or_insert(Rc::from(GrpcServiceHandler::new(
Rc::clone(grpc_service),
Rc::clone(&header_resolver),
Rc::new(self.service_metrics(grpc_service.get_service_type())),
)));
});
Some(Box::new(Filter {
Expand Down Expand Up @@ -89,3 +144,22 @@ impl RootContext for FilterRoot {
}

impl Context for FilterRoot {}

impl FilterRoot {
    /// Builds the [`ServiceMetrics`] for the given service kind.
    ///
    /// Selects the `auth_*` or `rate_limit_*` metric ids stored on this root
    /// context and hands them to `ServiceMetrics::new` in the order
    /// (ok, error, rejected, failure-mode-allowed). The "rejected" slot is
    /// filled by the `denied` id for auth and the `over_limit` id for rate
    /// limiting.
    fn service_metrics(&self, service_type: &ServiceType) -> ServiceMetrics {
        let (ok_id, error_id, rejected_id, allowed_on_failure_id) = match service_type {
            ServiceType::Auth => (
                self.auth_ok_metric_id,
                self.auth_error_metric_id,
                self.auth_denied_metric_id,
                self.auth_failure_mode_allowed_metric_id,
            ),
            ServiceType::RateLimit => (
                self.rate_limit_ok_metric_id,
                self.rate_limit_error_metric_id,
                self.rate_limit_over_limit_metric_id,
                self.rate_limit_failure_mode_allowed_metric_id,
            ),
        };

        ServiceMetrics::new(ok_id, error_id, rejected_id, allowed_on_failure_id)
    }
}
4 changes: 4 additions & 0 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl Operation {
/// Borrows the failure mode configured on this operation's service.
pub fn get_failure_mode(&self) -> &FailureMode {
&self.service.failure_mode
}

/// Borrows the gRPC service handler held by this operation.
///
/// Callers use it to reach the handler's public `service_metrics` field.
pub fn get_service_handler(&self) -> &GrpcServiceHandler {
&self.service_handler
}
}

pub struct OperationDispatcher {
Expand Down
57 changes: 56 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ impl GrpcService {
FailureMode::Allow => hostcalls::resume_http_request().unwrap(),
}
}

/// Borrows the service type of the wrapped service definition.
pub fn get_service_type(&self) -> &ServiceType {
&self.service.service_type
}
}

pub type GrpcCallFn = fn(
Expand All @@ -109,13 +113,19 @@ pub type GrpcMessageBuildFn =
/// Groups a gRPC service with the header resolver and the metric counters
/// that are reported for it.
pub struct GrpcServiceHandler {
// Shared handle to the gRPC service this handler works with.
grpc_service: Rc<GrpcService>,
// Shared header resolver.
header_resolver: Rc<HeaderResolver>,
/// Counters for this service; public so code holding the handler can call
/// the `report_*` methods directly.
pub service_metrics: Rc<ServiceMetrics>,
}

impl GrpcServiceHandler {
pub fn new(grpc_service: Rc<GrpcService>, header_resolver: Rc<HeaderResolver>) -> Self {
pub fn new(
grpc_service: Rc<GrpcService>,
header_resolver: Rc<HeaderResolver>,
service_metrics: Rc<ServiceMetrics>,
) -> Self {
Self {
grpc_service,
header_resolver,
service_metrics,
}
}

Expand Down Expand Up @@ -201,3 +211,48 @@ impl TracingHeader {
}
}
}

/// Host metric ids for the four outcome counters reported for one service.
///
/// This is plain data (four `u32` ids), so it derives the usual value-type
/// traits: it can be logged with `{:?}`, copied, and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServiceMetrics {
    /// Counter incremented by `report_ok`.
    ok_metric_id: u32,
    /// Counter incremented by `report_error`.
    error_metric_id: u32,
    /// Counter incremented by `report_rejected`.
    rejected_metric_id: u32,
    /// Counter incremented by `report_allowed_on_failure`.
    failure_mode_allowed_metric_id: u32,
}

impl ServiceMetrics {
pub fn new(
ok_metric_id: u32,
error_metric_id: u32,
rejected_metric_id: u32,
failure_mode_allowed_metric_id: u32,
) -> Self {
Self {
ok_metric_id,
error_metric_id,
rejected_metric_id,
failure_mode_allowed_metric_id,
}
}

fn report(metric_id: u32, offset: i64) {
if let Err(e) = hostcalls::increment_metric(metric_id, offset) {
warn!("report metric {metric_id}, error: {e:?}");
}
}

pub fn report_error(&self) {
Self::report(self.error_metric_id, 1);
}

pub fn report_allowed_on_failure(&self) {
Self::report(self.failure_mode_allowed_metric_id, 1);
}

pub fn report_ok(&self) {
Self::report(self.ok_metric_id, 1);
}

pub fn report_rejected(&self) {
Self::report(self.rejected_metric_id, 1);
}
}

0 comments on commit ef37667

Please sign in to comment.