Skip to content

Commit

Permalink
feat: Add ClientTagsRouter (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernauer authored Jan 16, 2024
1 parent f4c9cae commit 468daf9
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ This is the most flexible way of defining routing rules.
3. ExplainCostsRouter: This router executes an `explain {query}` [EXPLAIN](https://trino.io/docs/current/sql/explain.html?highlight=explain) query for every incoming query.
Trino will respond with an resource estimation the query will consume.
Please note that this functional heavily depends on [Table statistics](https://trino.io/docs/current/optimizer/statistics.html) being present for the access tables to get meaningful estimations.
4. [ClientTagsRouter](./routing/ClientTagsRouter.md): Route queries based on client tags send in the `X-Trino-Client-Tags` header.

## 3. Choosing cluster from cluster group

Expand Down
37 changes: 37 additions & 0 deletions docs/routing/ClientTagsRouter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# ClientTagsRouter

This router routes queries based on client tags send in the `X-Trino-Client-Tags` header.
It supports routing a query based on the presence of one tag from a given list OR on the presence of all tags in the list

## One of a list of tags

Let's imagine you want all queries with the tag `etl`, `etl-airflow` **or** `etc-special` to end up the the cluster group `etl`.

You can achieve this with the following config:

```yaml
routers:
- clientTags:
oneOf: ["etl", "etl-airflow", "etl-special"]
trinoClusterGroup: etl
```
## All of a list of tags
A different scenario is that you want to route all queries that have all the required tags, let's say they need the tag `etl` and `system=foo`, as this system executes very very large queries.

You can achieve this with the following config:

```yaml
routers:
- clientTags:
allOf: ["etl", "system=foo"]
trinoClusterGroup: etl-foo
- clientTags:
oneOf: ["etl", "etl-airflow", "etl-special"]
trinoClusterGroup: etl
```

## More flexible routing

If the `oneOf` and `allOf` do not fulfill your routing needs please have a look at the [PythonScriptRouter](./PythonScriptRouter.md), which allows you to execute arbitrary Python script with the most flexibility.
1 change: 1 addition & 0 deletions docs/routing/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ Currently the following routers are implemented:
1. TrinoRoutingGroupHeaderRouter
2. [PythonScriptRouter](./PythonScriptRouter.md)
3. ExplainCostsRouter
4. [ClientTagsRouter](./ClientTagsRouter.md)
24 changes: 23 additions & 1 deletion trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{collections::HashMap, fmt::Debug, fs::File, path::PathBuf, time::Duration};
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
fs::File,
path::PathBuf,
time::Duration,
};

use serde::Deserialize;
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -162,6 +168,7 @@ pub enum RoutingConfig {
ExplainCosts(ExplainCostsRouterConfig),
TrinoRoutingGroupHeader(TrinoRoutingGroupHeaderRouterConfig),
PythonScript(PythonScriptRouterConfig),
ClientTags(ClientTagsRouterConfig),
}

#[derive(Clone, Debug, Deserialize)]
Expand Down Expand Up @@ -218,6 +225,21 @@ pub struct PythonScriptRouterConfig {
pub script: String,
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ClientTagsRouterConfig {
#[serde(flatten)]
pub tag_matching_strategy: TagMatchingStrategy,
pub trino_cluster_group: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum TagMatchingStrategy {
AllOf(HashSet<String>),
OneOf(HashSet<String>),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ScalerConfig {
Expand Down
189 changes: 189 additions & 0 deletions trino-lb/src/routing/client_tags.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::collections::HashSet;

use http::HeaderValue;
use snafu::Snafu;
use tracing::{instrument, warn};
use trino_lb_core::{
config::{ClientTagsRouterConfig, TagMatchingStrategy},
sanitization::Sanitize,
};

use crate::routing::RouterImplementationTrait;

const TRINO_CLIENT_TAGS_HEADER: &str = "x-trino-client-tags";

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display(
"Configuration error: The configured target cluster group {cluster_group} does not exist"
))]
TargetClusterGroupNotFound { cluster_group: String },
}

pub struct ClientTagsRouter {
config: ClientTagsRouterConfig,
}

impl ClientTagsRouter {
#[instrument(name = "ClientTagsRouter::new")]
pub fn new(
config: &ClientTagsRouterConfig,
valid_target_groups: HashSet<String>,
) -> Result<Self, Error> {
if !valid_target_groups.contains(&config.trino_cluster_group) {
TargetClusterGroupNotFoundSnafu {
cluster_group: &config.trino_cluster_group,
}
.fail()?;
}

Ok(Self {
config: config.clone(),
})
}
}

impl RouterImplementationTrait for ClientTagsRouter {
#[instrument(
name = "ClientTagHeadersRouter::route"
skip(self),
fields(headers = ?headers.sanitize()),
)]
async fn route(&self, query: &str, headers: &http::HeaderMap) -> Option<String> {
if let Some(Ok(client_tags)) = headers
.get(TRINO_CLIENT_TAGS_HEADER)
.map(HeaderValue::to_str)
{
let client_tags = client_tags
.split(',')
.map(String::from)
.collect::<HashSet<_>>();
match &self.config.tag_matching_strategy {
TagMatchingStrategy::OneOf(one_of) => {
if !one_of.is_disjoint(&client_tags) {
return Some(self.config.trino_cluster_group.clone());
}
}
TagMatchingStrategy::AllOf(all_of) => {
if all_of.is_subset(&client_tags) {
return Some(self.config.trino_cluster_group.clone());
}
}
}
}

None
}
}

#[cfg(test)]
mod tests {
use super::*;

use http::{HeaderMap, HeaderName};
use rstest::rstest;

#[rstest]
#[case(None, None)]
#[case(Some("foo"), Some("my-target"))]
#[case(Some("bar"), Some("my-target"))]
#[case(Some("bak"), Some("my-target"))]
#[case(Some("system=airflow"), Some("my-target"))]
#[case(Some("foo,bar,bak,system=airflow"), Some("my-target"))]
#[case(Some("bak,foo,bar,system=airflow"), Some("my-target"))]
#[case(Some("foo,bar,bak,something-else"), Some("my-target"))]
#[tokio::test]
async fn test_routing_with_one_of(
#[case] x_trino_client_tags: Option<&str>,
#[case] expected: Option<&str>,
) {
let config = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
oneOf: ["foo", "bar", "bak", "system=airflow"]
"#,
)
.unwrap();
let router = ClientTagsRouter::new(&config, HashSet::from(["my-target".to_string()]))
.expect("Failed to create ClientTagsRouter");
let mut headers = HeaderMap::new();

if let Some(x_trino_client_tags) = x_trino_client_tags {
headers.insert(
HeaderName::from_static("x-trino-client-tags"),
x_trino_client_tags
.parse()
.expect("Failed to create x-trino-client-tags header"),
);
}

assert_eq!(router.route("", &headers).await.as_deref(), expected);
}

#[rstest]
#[case(None, None)]
#[case(Some("foo"), None)]
#[case(Some("bar"), None)]
#[case(Some("bak"), None)]
#[case(Some("foo,bar,bak"), None)]
#[case(Some("foo,bar,bak,system=airflow"), Some("my-target"))]
#[case(Some("bak,foo,bar,system=airflow"), Some("my-target"))]
#[case(Some("foo,bar,bak,system=airflow,something-else"), Some("my-target"))]
#[tokio::test]
async fn test_routing_with_all_of(
#[case] x_trino_client_tags: Option<&str>,
#[case] expected: Option<&str>,
) {
let config = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
allOf: ["foo", "bar", "bak", "system=airflow"]
"#,
)
.unwrap();
let router = ClientTagsRouter::new(&config, HashSet::from(["my-target".to_string()]))
.expect("Failed to create ClientTagsRouter");
let mut headers = HeaderMap::new();

if let Some(x_trino_client_tags) = x_trino_client_tags {
headers.insert(
HeaderName::from_static("x-trino-client-tags"),
x_trino_client_tags
.parse()
.expect("Failed to create x-trino-client-tags header"),
);
}

assert_eq!(router.route("", &headers).await.as_deref(), expected);
}

/// Seems like whatever comes first takes precedence
#[test]
fn test_configuring_one_of_and_all_of() {
let config: ClientTagsRouterConfig = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
allOf: ["allOf"]
oneOf: ["oneOf"]
"#,
)
.unwrap();
assert!(matches!(
config.tag_matching_strategy,
TagMatchingStrategy::AllOf(_)
));

let config: ClientTagsRouterConfig = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
oneOf: ["oneOf"]
allOf: ["allOf"]
"#,
)
.unwrap();
assert!(matches!(
config.tag_matching_strategy,
TagMatchingStrategy::OneOf(_)
));
}
}
25 changes: 24 additions & 1 deletion trino-lb/src/routing/explain_costs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use snafu::{ResultExt, Snafu};
use tracing::{instrument, warn};
use trino_lb_core::sanitization::Sanitize;
Expand All @@ -10,6 +12,11 @@ use crate::{

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display(
"Configuration error: The configured target cluster group {cluster_group} does not exist"
))]
TargetClusterGroupNotFound { cluster_group: String },

#[snafu(display("Failed to create Trino client"))]
ExtractTrinoHost { source: trino_client::Error },
}
Expand All @@ -21,7 +28,23 @@ pub struct ExplainCostsRouter {

impl ExplainCostsRouter {
#[instrument(name = "ExplainCostsRouter::new")]
pub fn new(config: &ExplainCostsRouterConfig) -> Result<Self, Error> {
pub fn new(
config: &ExplainCostsRouterConfig,
valid_target_groups: HashSet<String>,
) -> Result<Self, Error> {
for ExplainCostTargetConfig {
trino_cluster_group,
..
} in &config.targets
{
if !valid_target_groups.contains(trino_cluster_group) {
TargetClusterGroupNotFoundSnafu {
cluster_group: trino_cluster_group,
}
.fail()?;
}
}

let trino_client = TrinoClient::new(&config.trino_cluster_to_run_explain_query)
.context(ExtractTrinoHostSnafu)?;

Expand Down
21 changes: 18 additions & 3 deletions trino-lb/src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use trino_lb_core::sanitization::Sanitize;

use crate::config::{Config, RoutingConfig};

mod client_tags;
mod explain_costs;
mod python_script;
mod trino_routing_group_header;

pub use client_tags::ClientTagsRouter;
pub use explain_costs::ExplainCostsRouter;
pub use python_script::PythonScriptRouter;
pub use trino_routing_group_header::TrinoRoutingGroupHeaderRouter;
Expand All @@ -21,6 +23,9 @@ pub enum Error {
#[snafu(display("Failed to create python script router"))]
CreatePythonScriptRouter { source: python_script::Error },

#[snafu(display("Failed to create client tags router"))]
CreateClientTagsRouter { source: client_tags::Error },

#[snafu(display("Configuration error: The router {router:?} is configured to route to trinoClusterGroup {trino_cluster_group:?} which does not exist"))]
ConfigErrorClusterGroupDoesNotExist {
router: String,
Expand Down Expand Up @@ -48,9 +53,12 @@ impl Router {
let targets = router_config.targets.iter().map(|t| &t.trino_cluster_group);
check_every_target_group_exists(targets, cluster_groups, "ExplainCostsRouter")?;

ExplainCostsRouter::new(router_config)
.context(CreateExplainCostsRouterSnafu)?
.into()
ExplainCostsRouter::new(
router_config,
config.trino_cluster_groups.keys().cloned().collect(),
)
.context(CreateExplainCostsRouterSnafu)?
.into()
}
RoutingConfig::TrinoRoutingGroupHeader(router_config) => {
TrinoRoutingGroupHeaderRouter::new(
Expand All @@ -65,6 +73,12 @@ impl Router {
)
.context(CreatePythonScriptRouterSnafu)?
.into(),
RoutingConfig::ClientTags(router_config) => ClientTagsRouter::new(
router_config,
config.trino_cluster_groups.keys().cloned().collect(),
)
.context(CreateClientTagsRouterSnafu)?
.into(),
};
routers.push(router);
}
Expand Down Expand Up @@ -114,6 +128,7 @@ pub enum RoutingImplementation {
ExplainCosts(ExplainCostsRouter),
TrinoRoutingGroupHeader(TrinoRoutingGroupHeaderRouter),
PythonScript(PythonScriptRouter),
ClientTagHeaders(ClientTagsRouter),
}

#[instrument(skip(targets))]
Expand Down
2 changes: 1 addition & 1 deletion trino-lb/src/routing/python_script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ mod tests {
},
valid_target_groups,
)
.expect("Failed to create PythonScriptRouter router")
.expect("Failed to create PythonScriptRouter")
}

fn get_headers(x_trino_source: Option<&str>, x_trino_client_tags: Option<&str>) -> HeaderMap {
Expand Down

0 comments on commit 468daf9

Please sign in to comment.