From 001813836bebe5dacf32407d33ec433669f3fd6d Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Tue, 16 Jan 2024 16:27:10 +0100 Subject: [PATCH] feat: Add ClientTagsRouter --- docs/design.md | 1 + docs/routing/ClientTagsRouter.md | 37 +++++ docs/routing/index.md | 1 + trino-lb-core/src/config.rs | 24 +++- trino-lb/src/routing/client_tags.rs | 189 ++++++++++++++++++++++++++ trino-lb/src/routing/explain_costs.rs | 25 +++- trino-lb/src/routing/mod.rs | 21 ++- trino-lb/src/routing/python_script.rs | 2 +- 8 files changed, 294 insertions(+), 6 deletions(-) create mode 100644 docs/routing/ClientTagsRouter.md create mode 100644 trino-lb/src/routing/client_tags.rs diff --git a/docs/design.md b/docs/design.md index 2e1d9aa..b3e9080 100644 --- a/docs/design.md +++ b/docs/design.md @@ -52,6 +52,7 @@ This is the most flexible way of defining routing rules. 3. ExplainCostsRouter: This router executes an `explain {query}` [EXPLAIN](https://trino.io/docs/current/sql/explain.html?highlight=explain) query for every incoming query. Trino will respond with an resource estimation the query will consume. Please note that this functional heavily depends on [Table statistics](https://trino.io/docs/current/optimizer/statistics.html) being present for the access tables to get meaningful estimations. +4. [ClientTagsRouter](./routing/ClientTagsRouter.md): Route queries based on client tags send in the `X-Trino-Client-Tags` header. ## 3. Choosing cluster from cluster group diff --git a/docs/routing/ClientTagsRouter.md b/docs/routing/ClientTagsRouter.md new file mode 100644 index 0000000..ddd7551 --- /dev/null +++ b/docs/routing/ClientTagsRouter.md @@ -0,0 +1,37 @@ +# ClientTagsRouter + +This router routes queries based on client tags send in the `X-Trino-Client-Tags` header. +It supports routing a query based on the presence of one tag from a given list OR on the presence of all tags in the list + +## One of a list of tags + +Let's imagine you want all queries with the tag `etl`, `etl-airflow` **or** `etc-special` to end up the the cluster group `etl`. + +You can achieve this with the following config: + +```yaml +routers: + - clientTags: + oneOf: ["etl", "etl-airflow", "etl-special"] + trinoClusterGroup: etl +``` + +## All of a list of tags + +A different scenario is that you want to route all queries that have all the required tags, let's say they need the tag `etl` and `system=foo`, as this system executes very very large queries. + +You can achieve this with the following config: + +```yaml +routers: + - clientTags: + allOf: ["etl", "system=foo"] + trinoClusterGroup: etl-foo + - clientTags: + oneOf: ["etl", "etl-airflow", "etl-special"] + trinoClusterGroup: etl +``` + +## More flexible routing + +If the `oneOf` and `allOf` do not fulfill your routing needs please have a look at the [PythonScriptRouter](./PythonScriptRouter.md), which allows you to execute arbitrary Python script with the most flexibility. diff --git a/docs/routing/index.md b/docs/routing/index.md index afd0da7..523f441 100644 --- a/docs/routing/index.md +++ b/docs/routing/index.md @@ -10,3 +10,4 @@ Currently the following routers are implemented: 1. TrinoRoutingGroupHeaderRouter 2. [PythonScriptRouter](./PythonScriptRouter.md) 3. ExplainCostsRouter +4. [ClientTagsRouter](./ClientTagsRouter.md) diff --git a/trino-lb-core/src/config.rs b/trino-lb-core/src/config.rs index 2551313..527e0c8 100644 --- a/trino-lb-core/src/config.rs +++ b/trino-lb-core/src/config.rs @@ -1,4 +1,10 @@ -use std::{collections::HashMap, fmt::Debug, fs::File, path::PathBuf, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + fs::File, + path::PathBuf, + time::Duration, +}; use serde::Deserialize; use snafu::{ResultExt, Snafu}; @@ -162,6 +168,7 @@ pub enum RoutingConfig { ExplainCosts(ExplainCostsRouterConfig), TrinoRoutingGroupHeader(TrinoRoutingGroupHeaderRouterConfig), PythonScript(PythonScriptRouterConfig), + ClientTags(ClientTagsRouterConfig), } #[derive(Clone, Debug, Deserialize)] @@ -218,6 +225,21 @@ pub struct PythonScriptRouterConfig { pub script: String, } +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ClientTagsRouterConfig { + #[serde(flatten)] + pub tag_matching_strategy: TagMatchingStrategy, + pub trino_cluster_group: String, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum TagMatchingStrategy { + AllOf(HashSet), + OneOf(HashSet), +} + #[derive(Clone, Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub enum ScalerConfig { diff --git a/trino-lb/src/routing/client_tags.rs b/trino-lb/src/routing/client_tags.rs new file mode 100644 index 0000000..a38b3bc --- /dev/null +++ b/trino-lb/src/routing/client_tags.rs @@ -0,0 +1,189 @@ +use std::collections::HashSet; + +use http::HeaderValue; +use snafu::Snafu; +use tracing::{instrument, warn}; +use trino_lb_core::{ + config::{ClientTagsRouterConfig, TagMatchingStrategy}, + sanitization::Sanitize, +}; + +use crate::routing::RouterImplementationTrait; + +const TRINO_CLIENT_TAGS_HEADER: &str = "x-trino-client-tags"; + +#[derive(Snafu, Debug)] +pub enum Error { + #[snafu(display( + "Configuration error: The configured target cluster group {cluster_group} does not exist" + ))] + TargetClusterGroupNotFound { cluster_group: String }, +} + +pub struct ClientTagsRouter { + config: ClientTagsRouterConfig, +} + +impl ClientTagsRouter { + #[instrument(name = "ClientTagsRouter::new")] + pub fn new( + config: &ClientTagsRouterConfig, + valid_target_groups: HashSet, + ) -> Result { + if !valid_target_groups.contains(&config.trino_cluster_group) { + TargetClusterGroupNotFoundSnafu { + cluster_group: &config.trino_cluster_group, + } + .fail()?; + } + + Ok(Self { + config: config.clone(), + }) + } +} + +impl RouterImplementationTrait for ClientTagsRouter { + #[instrument( + name = "ClientTagHeadersRouter::route" + skip(self), + fields(headers = ?headers.sanitize()), + )] + async fn route(&self, query: &str, headers: &http::HeaderMap) -> Option { + if let Some(Ok(client_tags)) = headers + .get(TRINO_CLIENT_TAGS_HEADER) + .map(HeaderValue::to_str) + { + let client_tags = client_tags + .split(',') + .map(String::from) + .collect::>(); + match &self.config.tag_matching_strategy { + TagMatchingStrategy::OneOf(one_of) => { + if !one_of.is_disjoint(&client_tags) { + return Some(self.config.trino_cluster_group.clone()); + } + } + TagMatchingStrategy::AllOf(all_of) => { + if all_of.is_subset(&client_tags) { + return Some(self.config.trino_cluster_group.clone()); + } + } + } + } + + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use http::{HeaderMap, HeaderName}; + use rstest::rstest; + + #[rstest] + #[case(None, None)] + #[case(Some("foo"), Some("my-target"))] + #[case(Some("bar"), Some("my-target"))] + #[case(Some("bak"), Some("my-target"))] + #[case(Some("system=airflow"), Some("my-target"))] + #[case(Some("foo,bar,bak,system=airflow"), Some("my-target"))] + #[case(Some("bak,foo,bar,system=airflow"), Some("my-target"))] + #[case(Some("foo,bar,bak,something-else"), Some("my-target"))] + #[tokio::test] + async fn test_routing_with_one_of( + #[case] x_trino_client_tags: Option<&str>, + #[case] expected: Option<&str>, + ) { + let config = serde_yaml::from_str( + r#" + trinoClusterGroup: my-target + oneOf: ["foo", "bar", "bak", "system=airflow"] + "#, + ) + .unwrap(); + let router = ClientTagsRouter::new(&config, HashSet::from(["my-target".to_string()])) + .expect("Failed to create ClientTagsRouter"); + let mut headers = HeaderMap::new(); + + if let Some(x_trino_client_tags) = x_trino_client_tags { + headers.insert( + HeaderName::from_static("x-trino-client-tags"), + x_trino_client_tags + .parse() + .expect("Failed to create x-trino-client-tags header"), + ); + } + + assert_eq!(router.route("", &headers).await.as_deref(), expected); + } + + #[rstest] + #[case(None, None)] + #[case(Some("foo"), None)] + #[case(Some("bar"), None)] + #[case(Some("bak"), None)] + #[case(Some("foo,bar,bak"), None)] + #[case(Some("foo,bar,bak,system=airflow"), Some("my-target"))] + #[case(Some("bak,foo,bar,system=airflow"), Some("my-target"))] + #[case(Some("foo,bar,bak,system=airflow,something-else"), Some("my-target"))] + #[tokio::test] + async fn test_routing_with_all_of( + #[case] x_trino_client_tags: Option<&str>, + #[case] expected: Option<&str>, + ) { + let config = serde_yaml::from_str( + r#" + trinoClusterGroup: my-target + allOf: ["foo", "bar", "bak", "system=airflow"] + "#, + ) + .unwrap(); + let router = ClientTagsRouter::new(&config, HashSet::from(["my-target".to_string()])) + .expect("Failed to create ClientTagsRouter"); + let mut headers = HeaderMap::new(); + + if let Some(x_trino_client_tags) = x_trino_client_tags { + headers.insert( + HeaderName::from_static("x-trino-client-tags"), + x_trino_client_tags + .parse() + .expect("Failed to create x-trino-client-tags header"), + ); + } + + assert_eq!(router.route("", &headers).await.as_deref(), expected); + } + + /// Seems like whatever comes first takes precedence + #[test] + fn test_configuring_one_of_and_all_of() { + let config: ClientTagsRouterConfig = serde_yaml::from_str( + r#" + trinoClusterGroup: my-target + allOf: ["allOf"] + oneOf: ["oneOf"] + "#, + ) + .unwrap(); + assert!(matches!( + config.tag_matching_strategy, + TagMatchingStrategy::AllOf(_) + )); + + let config: ClientTagsRouterConfig = serde_yaml::from_str( + r#" + trinoClusterGroup: my-target + oneOf: ["oneOf"] + allOf: ["allOf"] + "#, + ) + .unwrap(); + assert!(matches!( + config.tag_matching_strategy, + TagMatchingStrategy::OneOf(_) + )); + } +} diff --git a/trino-lb/src/routing/explain_costs.rs b/trino-lb/src/routing/explain_costs.rs index 368633e..8959ee0 100644 --- a/trino-lb/src/routing/explain_costs.rs +++ b/trino-lb/src/routing/explain_costs.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use snafu::{ResultExt, Snafu}; use tracing::{instrument, warn}; use trino_lb_core::sanitization::Sanitize; @@ -10,6 +12,11 @@ use crate::{ #[derive(Snafu, Debug)] pub enum Error { + #[snafu(display( + "Configuration error: The configured target cluster group {cluster_group} does not exist" + ))] + TargetClusterGroupNotFound { cluster_group: String }, + #[snafu(display("Failed to create Trino client"))] ExtractTrinoHost { source: trino_client::Error }, } @@ -21,7 +28,23 @@ pub struct ExplainCostsRouter { impl ExplainCostsRouter { #[instrument(name = "ExplainCostsRouter::new")] - pub fn new(config: &ExplainCostsRouterConfig) -> Result { + pub fn new( + config: &ExplainCostsRouterConfig, + valid_target_groups: HashSet, + ) -> Result { + for ExplainCostTargetConfig { + trino_cluster_group, + .. + } in &config.targets + { + if !valid_target_groups.contains(trino_cluster_group) { + TargetClusterGroupNotFoundSnafu { + cluster_group: trino_cluster_group, + } + .fail()?; + } + } + let trino_client = TrinoClient::new(&config.trino_cluster_to_run_explain_query) .context(ExtractTrinoHostSnafu)?; diff --git a/trino-lb/src/routing/mod.rs b/trino-lb/src/routing/mod.rs index 769fff6..785b3b8 100644 --- a/trino-lb/src/routing/mod.rs +++ b/trino-lb/src/routing/mod.rs @@ -5,10 +5,12 @@ use trino_lb_core::sanitization::Sanitize; use crate::config::{Config, RoutingConfig}; +mod client_tags; mod explain_costs; mod python_script; mod trino_routing_group_header; +pub use client_tags::ClientTagsRouter; pub use explain_costs::ExplainCostsRouter; pub use python_script::PythonScriptRouter; pub use trino_routing_group_header::TrinoRoutingGroupHeaderRouter; @@ -21,6 +23,9 @@ pub enum Error { #[snafu(display("Failed to create python script router"))] CreatePythonScriptRouter { source: python_script::Error }, + #[snafu(display("Failed to create client tags router"))] + CreateClientTagsRouter { source: client_tags::Error }, + #[snafu(display("Configuration error: The router {router:?} is configured to route to trinoClusterGroup {trino_cluster_group:?} which does not exist"))] ConfigErrorClusterGroupDoesNotExist { router: String, @@ -48,9 +53,12 @@ impl Router { let targets = router_config.targets.iter().map(|t| &t.trino_cluster_group); check_every_target_group_exists(targets, cluster_groups, "ExplainCostsRouter")?; - ExplainCostsRouter::new(router_config) - .context(CreateExplainCostsRouterSnafu)? - .into() + ExplainCostsRouter::new( + router_config, + config.trino_cluster_groups.keys().cloned().collect(), + ) + .context(CreateExplainCostsRouterSnafu)? + .into() } RoutingConfig::TrinoRoutingGroupHeader(router_config) => { TrinoRoutingGroupHeaderRouter::new( @@ -65,6 +73,12 @@ impl Router { ) .context(CreatePythonScriptRouterSnafu)? .into(), + RoutingConfig::ClientTags(router_config) => ClientTagsRouter::new( + router_config, + config.trino_cluster_groups.keys().cloned().collect(), + ) + .context(CreateClientTagsRouterSnafu)? + .into(), }; routers.push(router); } @@ -114,6 +128,7 @@ pub enum RoutingImplementation { ExplainCosts(ExplainCostsRouter), TrinoRoutingGroupHeader(TrinoRoutingGroupHeaderRouter), PythonScript(PythonScriptRouter), + ClientTagHeaders(ClientTagsRouter), } #[instrument(skip(targets))] diff --git a/trino-lb/src/routing/python_script.rs b/trino-lb/src/routing/python_script.rs index 3922a9d..32de1ce 100644 --- a/trino-lb/src/routing/python_script.rs +++ b/trino-lb/src/routing/python_script.rs @@ -132,7 +132,7 @@ mod tests { }, valid_target_groups, ) - .expect("Failed to create PythonScriptRouter router") + .expect("Failed to create PythonScriptRouter") } fn get_headers(x_trino_source: Option<&str>, x_trino_client_tags: Option<&str>) -> HeaderMap {