Skip to content

Commit

Permalink
fix: Correctly change nextUri to trino-lb
Browse files Browse the repository at this point in the history
Previously we did only swap the host, but not the protocol or port...
  • Loading branch information
sbernauer committed Jan 11, 2024
1 parent 189f20e commit ee33e5d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 30 deletions.
22 changes: 5 additions & 17 deletions trino-lb-core/src/trino_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

use prusto::{QueryError, Warning};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use tracing::instrument;
use url::Url;

Expand All @@ -22,12 +22,6 @@ pub enum Error {
#[snafu(display("Failed to parse nextUri Trino send us"))]
ParseNextUriFromTrino { source: url::ParseError },

#[snafu(display("The trino-lb address {trino_lb_addr} has no host"))]
TrinoLbAddrHasNoHost { trino_lb_addr: Url },

#[snafu(display("Failed to change the host of the nextUri"))]
ChangeHostOfNextUri { source: url::ParseError },

#[snafu(display("Failed to determine the elapsed time of a queued query. Are all system clocks of trino-lb instances in sync?"))]
DetermineElapsedTime { source: SystemTimeError },

Expand Down Expand Up @@ -159,17 +153,11 @@ impl TrinoQueryApiResponse {
#[instrument(
fields(trino_lb_addr = %trino_lb_addr),
)]
pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> {
pub fn change_next_uri_to_trino_lb(&mut self, mut trino_lb_addr: Url) -> Result<(), Error> {
if let Some(next_uri) = &self.next_uri {
let mut next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
next_uri
.set_host(Some(trino_lb_addr.host_str().context(
TrinoLbAddrHasNoHostSnafu {
trino_lb_addr: trino_lb_addr.clone(),
},
)?))
.context(ChangeHostOfNextUriSnafu)?;
self.next_uri = Some(next_uri.to_string());
let next_uri = Url::parse(next_uri).context(ParseNextUriFromTrinoSnafu)?;
trino_lb_addr.set_path(next_uri.path());
self.next_uri = Some(trino_lb_addr.to_string());
}

Ok(())
Expand Down
37 changes: 26 additions & 11 deletions trino-lb/src/http_server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
fmt::Debug,
net::{Ipv6Addr, SocketAddr},
path::PathBuf,
sync::Arc,
time::Duration,
};
Expand All @@ -27,14 +28,20 @@ mod v1;

#[derive(Snafu, Debug)]
pub enum Error {
#[snafu(display("Failed configure HTTP server"))]
ConfigureHttpServer { source: std::io::Error },
#[snafu(display(
"Failed configure HTTP server PEM cert at {cert_pem_file:?} and PEM key at {key_pem_file:?}"
))]
ConfigureServerTrustAndKeystore {
source: std::io::Error,
cert_pem_file: PathBuf,
key_pem_file: PathBuf,
},

#[snafu(display("Failed start HTTP server"))]
StartHttpServer { source: std::io::Error },

#[snafu(display(
"In case https is used the `tls.certPemFile` and `tls.keyPemFile` option must be set"
"In case https is used the `tls.certPemFile` and `tls.keyPemFile` options must be set"
))]
CertsMissing {},
}
Expand Down Expand Up @@ -118,12 +125,15 @@ pub async fn start_http_server(
// Start https server
let listen_addr = SocketAddr::from((Ipv6Addr::UNSPECIFIED, 8443));
info!(%listen_addr, "Starting server");
let tls_config = RustlsConfig::from_pem_file(
tls_config.cert_pem_file.context(CertsMissingSnafu)?,
tls_config.key_pem_file.context(CertsMissingSnafu)?,
)
.await
.context(ConfigureHttpServerSnafu)?;

let cert_pem_file = tls_config.cert_pem_file.context(CertsMissingSnafu)?;
let key_pem_file = tls_config.key_pem_file.context(CertsMissingSnafu)?;
let tls_config = RustlsConfig::from_pem_file(&cert_pem_file, &key_pem_file)
.await
.context(ConfigureServerTrustAndKeystoreSnafu {
cert_pem_file,
key_pem_file,
})?;

axum_server::bind_rustls(listen_addr, tls_config)
.handle(handle)
Expand All @@ -142,18 +152,23 @@ pub async fn start_http_server(
.context(StartHttpServerSnafu)?;
}

info!("Shut down");

Ok(())
}

async fn graceful_shutdown(handle: Handle) {
wait_for_shutdown_signal().await;

info!("Sending graceful shutdown signal");
info!("Shutting down gracefully");

// Signal the server to shutdown using Handle.
handle.graceful_shutdown(Some(Duration::from_secs(5)));
loop {
info!(connection = handle.connection_count(), "Alive connections");
info!(
connections = handle.connection_count(),
"Waiting for all connections to close"
);
sleep(Duration::from_secs(1)).await;
}
}
Expand Down
6 changes: 4 additions & 2 deletions trino-lb/src/http_server/v1/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ async fn queue_or_hand_over_query(
)?;

trino_query_api_response
.change_next_uri_to_trino_lb(&state.config.trino_lb.external_address)
.change_next_uri_to_trino_lb(
state.config.trino_lb.external_address.clone(),
)
.context(ModifyNextUriSnafu)?;

info!(
Expand Down Expand Up @@ -463,7 +465,7 @@ async fn handle_query_running_on_trino(
if trino_query_api_response.next_uri.is_some() {
// Change the nextUri to actually point to trino-lb instead of Trino.
trino_query_api_response
.change_next_uri_to_trino_lb(&state.config.trino_lb.external_address)
.change_next_uri_to_trino_lb(state.config.trino_lb.external_address.clone())
.context(ModifyNextUriSnafu)?;
} else {
info!(%query_id, "Query completed (no next_uri send)");
Expand Down

0 comments on commit ee33e5d

Please sign in to comment.