Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GRPC service metrics #118

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ extern "C" fn start() {
Box::new(FilterRoot {
context_id,
config: Default::default(),
rate_limit_ok_metric_id: 0,
rate_limit_error_metric_id: 0,
rate_limit_over_limit_metric_id: 0,
rate_limit_failure_mode_allowed_metric_id: 0,
auth_ok_metric_id: 0,
auth_error_metric_id: 0,
auth_denied_metric_id: 0,
auth_failure_mode_allowed_metric_id: 0,
})
});
}
55 changes: 46 additions & 9 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::configuration::action_set::ActionSet;
use crate::configuration::{FailureMode, FilterConfig};
use crate::envoy::StatusCode;
use crate::operation_dispatcher::OperationDispatcher;
use crate::service::GrpcService;
use log::{debug, warn};
Expand Down Expand Up @@ -58,9 +59,21 @@ impl Filter {
}
Err(e) => {
warn!("gRPC call failed! {e:?}");
if let FailureMode::Deny = operation.get_failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
match operation.get_failure_mode() {
FailureMode::Deny => {
operation
.get_service_handler()
.service_metrics
.report_error();
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
FailureMode::Allow => {
operation
.get_service_handler()
.service_metrics
.report_allowed_on_failure();
}
};
Action::Continue
}
}
Expand Down Expand Up @@ -113,13 +126,37 @@ impl Context for Filter {
let some_op = self.operation_dispatcher.borrow().get_operation(token_id);

if let Some(operation) = some_op {
if GrpcService::process_grpc_response(operation, resp_size).is_ok() {
self.operation_dispatcher.borrow_mut().next();
if let Some(_op) = self.operation_dispatcher.borrow_mut().next() {
} else {
self.resume_http_request()
match GrpcService::process_grpc_response(Rc::clone(&operation), resp_size) {
Ok(_) => {
operation.get_service_handler().service_metrics.report_ok();
self.operation_dispatcher.borrow_mut().next();
if self.operation_dispatcher.borrow_mut().next().is_none() {
self.resume_http_request()
}
}
}
Err(
StatusCode::TooManyRequests | StatusCode::Unauthorized | StatusCode::Forbidden,
) => {
operation
.get_service_handler()
.service_metrics
.report_rejected();
}
Err(_) => match operation.get_failure_mode() {
FailureMode::Deny => {
operation
.get_service_handler()
.service_metrics
.report_error();
}
FailureMode::Allow => {
operation
.get_service_handler()
.service_metrics
.report_allowed_on_failure();
}
},
};
} else {
warn!("No Operation found with token_id: {token_id}");
GrpcService::handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode
Expand Down
80 changes: 77 additions & 3 deletions src/filter/root_context.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::configuration::{FilterConfig, PluginConfiguration};
use crate::configuration::{FilterConfig, PluginConfiguration, ServiceType};
use crate::filter::http_context::Filter;
use crate::operation_dispatcher::OperationDispatcher;
use crate::service::{GrpcServiceHandler, HeaderResolver};
use crate::service::{GrpcServiceHandler, HeaderResolver, ServiceMetrics};
use const_format::formatcp;
use log::{debug, error, info};
use proxy_wasm::hostcalls;
use proxy_wasm::traits::{Context, HttpContext, RootContext};
use proxy_wasm::types::ContextType;
use proxy_wasm::types::{ContextType, MetricType};
use std::collections::HashMap;
use std::rc::Rc;

Expand All @@ -18,6 +19,14 @@ const WASM_SHIM_HEADER: &str = "Kuadrant wasm module";
pub struct FilterRoot {
pub context_id: u32,
pub config: Rc<FilterConfig>,
pub rate_limit_ok_metric_id: u32,
pub rate_limit_error_metric_id: u32,
pub rate_limit_over_limit_metric_id: u32,
pub rate_limit_failure_mode_allowed_metric_id: u32,
pub auth_ok_metric_id: u32,
pub auth_error_metric_id: u32,
pub auth_denied_metric_id: u32,
pub auth_failure_mode_allowed_metric_id: u32,
}

impl RootContext for FilterRoot {
Expand All @@ -30,6 +39,51 @@ impl RootContext for FilterRoot {
"#{} {} {}: VM started",
self.context_id, WASM_SHIM_HEADER, full_version
);

self.rate_limit_ok_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.ok") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.rate_limit_error_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.error") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.rate_limit_over_limit_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.rate_limit.over_limit") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.rate_limit_failure_mode_allowed_metric_id = match hostcalls::define_metric(
MetricType::Counter,
"kuadrant.rate_limit.failure_mode_allowed",
) {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_ok_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.ok") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_error_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.error") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_denied_metric_id =
match hostcalls::define_metric(MetricType::Counter, "kuadrant.auth.denied") {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
self.auth_failure_mode_allowed_metric_id = match hostcalls::define_metric(
MetricType::Counter,
"kuadrant.auth.failure_mode_allowed",
) {
Ok(metric_id) => metric_id,
Err(e) => panic!("Error: {:?}", e),
};
true
}

Expand All @@ -46,6 +100,7 @@ impl RootContext for FilterRoot {
.or_insert(Rc::from(GrpcServiceHandler::new(
Rc::clone(grpc_service),
Rc::clone(&header_resolver),
Rc::new(self.service_metrics(grpc_service.get_service_type())),
)));
});
Some(Box::new(Filter {
Expand Down Expand Up @@ -89,3 +144,22 @@ impl RootContext for FilterRoot {
}

impl Context for FilterRoot {}

impl FilterRoot {
fn service_metrics(&self, service_type: &ServiceType) -> ServiceMetrics {
match service_type {
ServiceType::Auth => ServiceMetrics::new(
self.auth_ok_metric_id,
self.auth_error_metric_id,
self.auth_denied_metric_id,
self.auth_failure_mode_allowed_metric_id,
),
ServiceType::RateLimit => ServiceMetrics::new(
self.rate_limit_ok_metric_id,
self.rate_limit_error_metric_id,
self.rate_limit_over_limit_metric_id,
self.rate_limit_failure_mode_allowed_metric_id,
),
}
}
}
4 changes: 4 additions & 0 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@
pub fn get_failure_mode(&self) -> &FailureMode {
&self.service.failure_mode
}

pub fn get_service_handler(&self) -> &GrpcServiceHandler {
&self.service_handler
}
}

pub struct OperationDispatcher {
Expand Down Expand Up @@ -281,7 +285,7 @@
}

fn build_grpc_service_handler() -> GrpcServiceHandler {
GrpcServiceHandler::new(Rc::new(Default::default()), Rc::new(Default::default()))

Check failure on line 288 in src/operation_dispatcher.rs

View workflow job for this annotation

GitHub Actions / Test Suite

this function takes 3 arguments but 2 arguments were supplied

Check failure on line 288 in src/operation_dispatcher.rs

View workflow job for this annotation

GitHub Actions / Clippy

this function takes 3 arguments but 2 arguments were supplied
}

fn conditions_apply_fn_stub(_action: &Action) -> bool {
Expand Down
57 changes: 56 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ impl GrpcService {
FailureMode::Allow => hostcalls::resume_http_request().unwrap(),
}
}

pub fn get_service_type(&self) -> &ServiceType {
&self.service.service_type
}
}

pub type GrpcCallFn = fn(
Expand All @@ -109,13 +113,19 @@ pub type GrpcMessageBuildFn =
pub struct GrpcServiceHandler {
grpc_service: Rc<GrpcService>,
header_resolver: Rc<HeaderResolver>,
pub service_metrics: Rc<ServiceMetrics>,
}

impl GrpcServiceHandler {
pub fn new(grpc_service: Rc<GrpcService>, header_resolver: Rc<HeaderResolver>) -> Self {
pub fn new(
grpc_service: Rc<GrpcService>,
header_resolver: Rc<HeaderResolver>,
service_metrics: Rc<ServiceMetrics>,
) -> Self {
Self {
grpc_service,
header_resolver,
service_metrics,
}
}

Expand Down Expand Up @@ -201,3 +211,48 @@ impl TracingHeader {
}
}
}

pub struct ServiceMetrics {
ok_metric_id: u32,
error_metric_id: u32,
rejected_metric_id: u32,
failure_mode_allowed_metric_id: u32,
}

impl ServiceMetrics {
pub fn new(
ok_metric_id: u32,
error_metric_id: u32,
rejected_metric_id: u32,
failure_mode_allowed_metric_id: u32,
) -> Self {
Self {
ok_metric_id,
error_metric_id,
rejected_metric_id,
failure_mode_allowed_metric_id,
}
}

fn report(metric_id: u32, offset: i64) {
if let Err(e) = hostcalls::increment_metric(metric_id, offset) {
warn!("report metric {metric_id}, error: {e:?}");
}
}

pub fn report_error(&self) {
Self::report(self.error_metric_id, 1);
}

pub fn report_allowed_on_failure(&self) {
Self::report(self.failure_mode_allowed_metric_id, 1);
}

pub fn report_ok(&self) {
Self::report(self.ok_metric_id, 1);
}

pub fn report_rejected(&self) {
Self::report(self.rejected_metric_id, 1);
}
}
Loading