Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ClientTagsRouter #9

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ This is the most flexible way of defining routing rules.
3. ExplainCostsRouter: This router executes an `explain {query}` [EXPLAIN](https://trino.io/docs/current/sql/explain.html?highlight=explain) query for every incoming query.
Trino will respond with an resource estimation the query will consume.
Please note that this functional heavily depends on [Table statistics](https://trino.io/docs/current/optimizer/statistics.html) being present for the access tables to get meaningful estimations.
4. [ClientTagsRouter](./routing/ClientTagsRouter.md): Route queries based on client tags send in the `X-Trino-Client-Tags` header.

## 3. Choosing cluster from cluster group

Expand Down
37 changes: 37 additions & 0 deletions docs/routing/ClientTagsRouter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# ClientTagsRouter

This router routes queries based on client tags send in the `X-Trino-Client-Tags` header.
It supports routing a query based on the presence of one tag from a given list OR on the presence of all tags in the list

## One of a list of tags

Let's imagine you want all queries with the tag `etl`, `etl-airflow` **or** `etc-special` to end up the the cluster group `etl`.

You can achieve this with the following config:

```yaml
routers:
- clientTags:
oneOf: ["etl", "etl-airflow", "etl-special"]
trinoClusterGroup: etl
```

## All of a list of tags

A different scenario is that you want to route all queries that have all the required tags, let's say they need the tag `etl` and `system=foo`, as this system executes very very large queries.

You can achieve this with the following config:

```yaml
routers:
- clientTags:
allOf: ["etl", "system=foo"]
trinoClusterGroup: etl-foo
- clientTags:
oneOf: ["etl", "etl-airflow", "etl-special"]
trinoClusterGroup: etl
```

## More flexible routing

If the `oneOf` and `allOf` do not fulfill your routing needs please have a look at the [PythonScriptRouter](./PythonScriptRouter.md), which allows you to execute arbitrary Python script with the most flexibility.
1 change: 1 addition & 0 deletions docs/routing/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ Currently the following routers are implemented:
1. TrinoRoutingGroupHeaderRouter
2. [PythonScriptRouter](./PythonScriptRouter.md)
3. ExplainCostsRouter
4. [ClientTagsRouter](./ClientTagsRouter.md)
24 changes: 23 additions & 1 deletion trino-lb-core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{collections::HashMap, fmt::Debug, fs::File, path::PathBuf, time::Duration};
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
fs::File,
path::PathBuf,
time::Duration,
};

use serde::Deserialize;
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -162,6 +168,7 @@ pub enum RoutingConfig {
ExplainCosts(ExplainCostsRouterConfig),
TrinoRoutingGroupHeader(TrinoRoutingGroupHeaderRouterConfig),
PythonScript(PythonScriptRouterConfig),
ClientTags(ClientTagsRouterConfig),
}

#[derive(Clone, Debug, Deserialize)]
Expand Down Expand Up @@ -218,6 +225,21 @@ pub struct PythonScriptRouterConfig {
pub script: String,
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ClientTagsRouterConfig {
#[serde(flatten)]
pub tag_matching_strategy: TagMatchingStrategy,
pub trino_cluster_group: String,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum TagMatchingStrategy {
AllOf(HashSet<String>),
OneOf(HashSet<String>),
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ScalerConfig {
Expand Down
189 changes: 189 additions & 0 deletions trino-lb/src/routing/client_tags.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::collections::HashSet;

use http::HeaderValue;
use snafu::Snafu;
use tracing::{instrument, warn};
use trino_lb_core::{
config::{ClientTagsRouterConfig, TagMatchingStrategy},
sanitization::Sanitize,
};

use crate::routing::RouterImplementationTrait;

const TRINO_CLIENT_TAGS_HEADER: &str = "x-trino-client-tags";

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display(
"Configuration error: The configured target cluster group {cluster_group} does not exist"
))]
TargetClusterGroupNotFound { cluster_group: String },
}

pub struct ClientTagsRouter {
config: ClientTagsRouterConfig,
}

impl ClientTagsRouter {
#[instrument(name = "ClientTagsRouter::new")]
pub fn new(
config: &ClientTagsRouterConfig,
valid_target_groups: HashSet<String>,
) -> Result<Self, Error> {
if !valid_target_groups.contains(&config.trino_cluster_group) {
TargetClusterGroupNotFoundSnafu {
cluster_group: &config.trino_cluster_group,
}
.fail()?;
}

Ok(Self {
config: config.clone(),
})
}
}

impl RouterImplementationTrait for ClientTagsRouter {
#[instrument(
name = "ClientTagHeadersRouter::route"
skip(self),
fields(headers = ?headers.sanitize()),
)]
async fn route(&self, query: &str, headers: &http::HeaderMap) -> Option<String> {
if let Some(Ok(client_tags)) = headers
.get(TRINO_CLIENT_TAGS_HEADER)
.map(HeaderValue::to_str)
{
let client_tags = client_tags
.split(',')
.map(String::from)
.collect::<HashSet<_>>();
match &self.config.tag_matching_strategy {
TagMatchingStrategy::OneOf(one_of) => {
if !one_of.is_disjoint(&client_tags) {
return Some(self.config.trino_cluster_group.clone());
}
}
TagMatchingStrategy::AllOf(all_of) => {
if all_of.is_subset(&client_tags) {
return Some(self.config.trino_cluster_group.clone());
}
}
}
}

None
}
}

#[cfg(test)]
mod tests {
use super::*;

use http::{HeaderMap, HeaderName};
use rstest::rstest;

#[rstest]
#[case(None, None)]
#[case(Some("foo"), Some("my-target"))]
#[case(Some("bar"), Some("my-target"))]
#[case(Some("bak"), Some("my-target"))]
#[case(Some("system=airflow"), Some("my-target"))]
#[case(Some("foo,bar,bak,system=airflow"), Some("my-target"))]
#[case(Some("bak,foo,bar,system=airflow"), Some("my-target"))]
#[case(Some("foo,bar,bak,something-else"), Some("my-target"))]
#[tokio::test]
async fn test_routing_with_one_of(
#[case] x_trino_client_tags: Option<&str>,
#[case] expected: Option<&str>,
) {
let config = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
oneOf: ["foo", "bar", "bak", "system=airflow"]
"#,
)
.unwrap();
let router = ClientTagsRouter::new(&config, HashSet::from(["my-target".to_string()]))
.expect("Failed to create ClientTagsRouter");
let mut headers = HeaderMap::new();

if let Some(x_trino_client_tags) = x_trino_client_tags {
headers.insert(
HeaderName::from_static("x-trino-client-tags"),
x_trino_client_tags
.parse()
.expect("Failed to create x-trino-client-tags header"),
);
}

assert_eq!(router.route("", &headers).await.as_deref(), expected);
}

#[rstest]
#[case(None, None)]
#[case(Some("foo"), None)]
#[case(Some("bar"), None)]
#[case(Some("bak"), None)]
#[case(Some("foo,bar,bak"), None)]
#[case(Some("foo,bar,bak,system=airflow"), Some("my-target"))]
#[case(Some("bak,foo,bar,system=airflow"), Some("my-target"))]
#[case(Some("foo,bar,bak,system=airflow,something-else"), Some("my-target"))]
#[tokio::test]
async fn test_routing_with_all_of(
#[case] x_trino_client_tags: Option<&str>,
#[case] expected: Option<&str>,
) {
let config = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
allOf: ["foo", "bar", "bak", "system=airflow"]
"#,
)
.unwrap();
let router = ClientTagsRouter::new(&config, HashSet::from(["my-target".to_string()]))
.expect("Failed to create ClientTagsRouter");
let mut headers = HeaderMap::new();

if let Some(x_trino_client_tags) = x_trino_client_tags {
headers.insert(
HeaderName::from_static("x-trino-client-tags"),
x_trino_client_tags
.parse()
.expect("Failed to create x-trino-client-tags header"),
);
}

assert_eq!(router.route("", &headers).await.as_deref(), expected);
}

/// Seems like whatever comes first takes precedence
#[test]
fn test_configuring_one_of_and_all_of() {
let config: ClientTagsRouterConfig = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
allOf: ["allOf"]
oneOf: ["oneOf"]
"#,
)
.unwrap();
assert!(matches!(
config.tag_matching_strategy,
TagMatchingStrategy::AllOf(_)
));

let config: ClientTagsRouterConfig = serde_yaml::from_str(
r#"
trinoClusterGroup: my-target
oneOf: ["oneOf"]
allOf: ["allOf"]
"#,
)
.unwrap();
assert!(matches!(
config.tag_matching_strategy,
TagMatchingStrategy::OneOf(_)
));
}
}
25 changes: 24 additions & 1 deletion trino-lb/src/routing/explain_costs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashSet;

use snafu::{ResultExt, Snafu};
use tracing::{instrument, warn};
use trino_lb_core::sanitization::Sanitize;
Expand All @@ -10,6 +12,11 @@ use crate::{

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display(
"Configuration error: The configured target cluster group {cluster_group} does not exist"
))]
TargetClusterGroupNotFound { cluster_group: String },

#[snafu(display("Failed to create Trino client"))]
ExtractTrinoHost { source: trino_client::Error },
}
Expand All @@ -21,7 +28,23 @@ pub struct ExplainCostsRouter {

impl ExplainCostsRouter {
#[instrument(name = "ExplainCostsRouter::new")]
pub fn new(config: &ExplainCostsRouterConfig) -> Result<Self, Error> {
pub fn new(
config: &ExplainCostsRouterConfig,
valid_target_groups: HashSet<String>,
) -> Result<Self, Error> {
sbernauer marked this conversation as resolved.
Show resolved Hide resolved
for ExplainCostTargetConfig {
trino_cluster_group,
..
} in &config.targets
{
if !valid_target_groups.contains(trino_cluster_group) {
TargetClusterGroupNotFoundSnafu {
cluster_group: trino_cluster_group,
}
.fail()?;
}
}

let trino_client = TrinoClient::new(&config.trino_cluster_to_run_explain_query)
.context(ExtractTrinoHostSnafu)?;

Expand Down
21 changes: 18 additions & 3 deletions trino-lb/src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use trino_lb_core::sanitization::Sanitize;

use crate::config::{Config, RoutingConfig};

mod client_tags;
mod explain_costs;
mod python_script;
mod trino_routing_group_header;

pub use client_tags::ClientTagsRouter;
pub use explain_costs::ExplainCostsRouter;
pub use python_script::PythonScriptRouter;
pub use trino_routing_group_header::TrinoRoutingGroupHeaderRouter;
Expand All @@ -21,6 +23,9 @@ pub enum Error {
#[snafu(display("Failed to create python script router"))]
CreatePythonScriptRouter { source: python_script::Error },

#[snafu(display("Failed to create client tags router"))]
CreateClientTagsRouter { source: client_tags::Error },

#[snafu(display("Configuration error: The router {router:?} is configured to route to trinoClusterGroup {trino_cluster_group:?} which does not exist"))]
ConfigErrorClusterGroupDoesNotExist {
router: String,
Expand Down Expand Up @@ -48,9 +53,12 @@ impl Router {
let targets = router_config.targets.iter().map(|t| &t.trino_cluster_group);
check_every_target_group_exists(targets, cluster_groups, "ExplainCostsRouter")?;

ExplainCostsRouter::new(router_config)
.context(CreateExplainCostsRouterSnafu)?
.into()
ExplainCostsRouter::new(
router_config,
config.trino_cluster_groups.keys().cloned().collect(),
)
.context(CreateExplainCostsRouterSnafu)?
.into()
}
RoutingConfig::TrinoRoutingGroupHeader(router_config) => {
TrinoRoutingGroupHeaderRouter::new(
Expand All @@ -65,6 +73,12 @@ impl Router {
)
.context(CreatePythonScriptRouterSnafu)?
.into(),
RoutingConfig::ClientTags(router_config) => ClientTagsRouter::new(
router_config,
config.trino_cluster_groups.keys().cloned().collect(),
)
.context(CreateClientTagsRouterSnafu)?
.into(),
};
routers.push(router);
}
Expand Down Expand Up @@ -114,6 +128,7 @@ pub enum RoutingImplementation {
ExplainCosts(ExplainCostsRouter),
TrinoRoutingGroupHeader(TrinoRoutingGroupHeaderRouter),
PythonScript(PythonScriptRouter),
ClientTagHeaders(ClientTagsRouter),
}

#[instrument(skip(targets))]
Expand Down
2 changes: 1 addition & 1 deletion trino-lb/src/routing/python_script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ mod tests {
},
valid_target_groups,
)
.expect("Failed to create PythonScriptRouter router")
.expect("Failed to create PythonScriptRouter")
}

fn get_headers(x_trino_source: Option<&str>, x_trino_client_tags: Option<&str>) -> HeaderMap {
Expand Down