From cb9ab7463c27f30532f944ff9d3adb0636e42364 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 14 Oct 2024 12:25:55 +0200 Subject: [PATCH] proxy: split out the console-redirect backend flow (#9270) removes the ConsoleRedirect backend from the main auth::Backends enum, copy-paste the existing crate::proxy::task_main structure to use the ConsoleRedirectBackend exclusively. This makes the logic a bit simpler at the cost of some fairly trivial code duplication. --- proxy/src/auth/backend/console_redirect.rs | 37 +++- proxy/src/auth/backend/mod.rs | 72 ++----- proxy/src/bin/local_proxy.rs | 2 +- proxy/src/bin/proxy.rs | 97 +++++---- proxy/src/console_redirect_proxy.rs | 217 +++++++++++++++++++++ proxy/src/lib.rs | 1 + proxy/src/proxy/mod.rs | 6 +- proxy/src/proxy/tests/mod.rs | 2 +- proxy/src/serverless/backend.rs | 7 +- proxy/src/serverless/mod.rs | 2 +- proxy/src/serverless/websocket.rs | 2 +- 11 files changed, 333 insertions(+), 112 deletions(-) create mode 100644 proxy/src/console_redirect_proxy.rs diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index 127be545e1d8..457410ec8cec 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -1,18 +1,24 @@ use crate::{ - auth, compute, + auth, + cache::Cached, + compute, config::AuthenticationConfig, context::RequestMonitoring, - control_plane::{self, provider::NodeInfo}, + control_plane::{self, provider::NodeInfo, CachedNodeInfo}, error::{ReportableError, UserFacingError}, + proxy::connect_compute::ComputeConnectBackend, stream::PqStream, waiters, }; +use async_trait::async_trait; use pq_proto::BeMessage as Be; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_postgres::config::SslMode; use tracing::{info, info_span}; +use super::ComputeCredentialKeys; + #[derive(Debug, Error)] pub(crate) enum WebAuthError { #[error(transparent)] @@ -25,6 +31,7 @@ pub(crate) enum WebAuthError { Io(#[from] std::io::Error), } +#[derive(Debug)] pub struct ConsoleRedirectBackend { console_uri: reqwest::Url, } @@ -66,17 +73,31 @@ impl ConsoleRedirectBackend { Self { console_uri } } - pub(super) fn url(&self) -> &reqwest::Url { - &self.console_uri - } - pub(crate) async fn authenticate( &self, ctx: &RequestMonitoring, auth_config: &'static AuthenticationConfig, client: &mut PqStream, - ) -> auth::Result { - authenticate(ctx, auth_config, &self.console_uri, client).await + ) -> auth::Result { + authenticate(ctx, auth_config, &self.console_uri, client) + .await + .map(ConsoleRedirectNodeInfo) + } +} + +pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo); + +#[async_trait] +impl ComputeConnectBackend for ConsoleRedirectNodeInfo { + async fn wake_compute( + &self, + _ctx: &RequestMonitoring, + ) -> Result { + Ok(Cached::new_uncached(self.0.clone())) + } + + fn get_keys(&self) -> &ComputeCredentialKeys { + &ComputeCredentialKeys::None } } diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index 27c9f1876eac..96e1a787ed1b 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -22,7 +22,7 @@ use crate::cache::Cached; use crate::context::RequestMonitoring; use crate::control_plane::errors::GetAuthInfoError; use crate::control_plane::provider::{CachedRoleSecret, ControlPlaneBackend}; -use crate::control_plane::{AuthSecret, NodeInfo}; +use crate::control_plane::AuthSecret; use crate::intern::EndpointIdInt; use crate::metrics::Metrics; use crate::proxy::connect_compute::ComputeConnectBackend; @@ -66,11 +66,9 @@ impl std::ops::Deref for MaybeOwned<'_, T> { /// * However, when we substitute `T` with [`ComputeUserInfoMaybeEndpoint`], /// this helps us provide the credentials only to those auth /// backends which require them for the authentication process. -pub enum Backend<'a, T, D> { +pub enum Backend<'a, T> { /// Cloud API (V2). ControlPlane(MaybeOwned<'a, ControlPlaneBackend>, T), - /// Authentication via a web browser. - ConsoleRedirect(MaybeOwned<'a, ConsoleRedirectBackend>, D), /// Local proxy uses configured auth credentials and does not wake compute Local(MaybeOwned<'a, LocalBackend>), } @@ -91,7 +89,7 @@ impl Clone for Box { } } -impl std::fmt::Display for Backend<'_, (), ()> { +impl std::fmt::Display for Backend<'_, ()> { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ControlPlane(api, ()) => match &**api { @@ -107,46 +105,39 @@ impl std::fmt::Display for Backend<'_, (), ()> { #[cfg(test)] ControlPlaneBackend::Test(_) => fmt.debug_tuple("ControlPlane::Test").finish(), }, - Self::ConsoleRedirect(backend, ()) => fmt - .debug_tuple("ConsoleRedirect") - .field(&backend.url().as_str()) - .finish(), Self::Local(_) => fmt.debug_tuple("Local").finish(), } } } -impl Backend<'_, T, D> { +impl Backend<'_, T> { /// Very similar to [`std::option::Option::as_ref`]. /// This helps us pass structured config to async tasks. - pub(crate) fn as_ref(&self) -> Backend<'_, &T, &D> { + pub(crate) fn as_ref(&self) -> Backend<'_, &T> { match self { Self::ControlPlane(c, x) => Backend::ControlPlane(MaybeOwned::Borrowed(c), x), - Self::ConsoleRedirect(c, x) => Backend::ConsoleRedirect(MaybeOwned::Borrowed(c), x), Self::Local(l) => Backend::Local(MaybeOwned::Borrowed(l)), } } } -impl<'a, T, D> Backend<'a, T, D> { +impl<'a, T> Backend<'a, T> { /// Very similar to [`std::option::Option::map`]. /// Maps [`Backend`] to [`Backend`] by applying /// a function to a contained value. - pub(crate) fn map(self, f: impl FnOnce(T) -> R) -> Backend<'a, R, D> { + pub(crate) fn map(self, f: impl FnOnce(T) -> R) -> Backend<'a, R> { match self { Self::ControlPlane(c, x) => Backend::ControlPlane(c, f(x)), - Self::ConsoleRedirect(c, x) => Backend::ConsoleRedirect(c, x), Self::Local(l) => Backend::Local(l), } } } -impl<'a, T, D, E> Backend<'a, Result, D> { +impl<'a, T, E> Backend<'a, Result> { /// Very similar to [`std::option::Option::transpose`]. /// This is most useful for error handling. - pub(crate) fn transpose(self) -> Result, E> { + pub(crate) fn transpose(self) -> Result, E> { match self { Self::ControlPlane(c, x) => x.map(|x| Backend::ControlPlane(c, x)), - Self::ConsoleRedirect(c, x) => Ok(Backend::ConsoleRedirect(c, x)), Self::Local(l) => Ok(Backend::Local(l)), } } @@ -414,12 +405,11 @@ async fn authenticate_with_secret( classic::authenticate(ctx, info, client, config, secret).await } -impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint, &()> { +impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> { /// Get username from the credentials. pub(crate) fn get_user(&self) -> &str { match self { Self::ControlPlane(_, user_info) => &user_info.user, - Self::ConsoleRedirect(_, ()) => "web", Self::Local(_) => "local", } } @@ -433,7 +423,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint, &()> { allow_cleartext: bool, config: &'static AuthenticationConfig, endpoint_rate_limiter: Arc, - ) -> auth::Result> { + ) -> auth::Result> { let res = match self { Self::ControlPlane(api, user_info) => { info!( @@ -454,14 +444,6 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint, &()> { .await?; Backend::ControlPlane(api, credentials) } - // NOTE: this auth backend doesn't use client credentials. - Self::ConsoleRedirect(backend, ()) => { - info!("performing web authentication"); - - let info = backend.authenticate(ctx, config, client).await?; - - Backend::ConsoleRedirect(backend, info) - } Self::Local(_) => { return Err(auth::AuthError::bad_auth_method("invalid for local proxy")) } @@ -472,14 +454,13 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint, &()> { } } -impl Backend<'_, ComputeUserInfo, &()> { +impl Backend<'_, ComputeUserInfo> { pub(crate) async fn get_role_secret( &self, ctx: &RequestMonitoring, ) -> Result { match self { Self::ControlPlane(api, user_info) => api.get_role_secret(ctx, user_info).await, - Self::ConsoleRedirect(_, ()) => Ok(Cached::new_uncached(None)), Self::Local(_) => Ok(Cached::new_uncached(None)), } } @@ -492,45 +473,19 @@ impl Backend<'_, ComputeUserInfo, &()> { Self::ControlPlane(api, user_info) => { api.get_allowed_ips_and_secret(ctx, user_info).await } - Self::ConsoleRedirect(_, ()) => Ok((Cached::new_uncached(Arc::new(vec![])), None)), Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)), } } } #[async_trait::async_trait] -impl ComputeConnectBackend for Backend<'_, ComputeCredentials, NodeInfo> { +impl ComputeConnectBackend for Backend<'_, ComputeCredentials> { async fn wake_compute( &self, ctx: &RequestMonitoring, ) -> Result { match self { Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await, - Self::ConsoleRedirect(_, info) => Ok(Cached::new_uncached(info.clone())), - Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())), - } - } - - fn get_keys(&self) -> &ComputeCredentialKeys { - match self { - Self::ControlPlane(_, creds) => &creds.keys, - Self::ConsoleRedirect(_, _) => &ComputeCredentialKeys::None, - Self::Local(_) => &ComputeCredentialKeys::None, - } - } -} - -#[async_trait::async_trait] -impl ComputeConnectBackend for Backend<'_, ComputeCredentials, &()> { - async fn wake_compute( - &self, - ctx: &RequestMonitoring, - ) -> Result { - match self { - Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await, - Self::ConsoleRedirect(_, ()) => { - unreachable!("web auth flow doesn't support waking the compute") - } Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())), } } @@ -538,7 +493,6 @@ impl ComputeConnectBackend for Backend<'_, ComputeCredentials, &()> { fn get_keys(&self) -> &ComputeCredentialKeys { match self { Self::ControlPlane(_, creds) => &creds.keys, - Self::ConsoleRedirect(_, ()) => &ComputeCredentialKeys::None, Self::Local(_) => &ComputeCredentialKeys::None, } } diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs index c781af846a2e..c92ebbc51f3c 100644 --- a/proxy/src/bin/local_proxy.rs +++ b/proxy/src/bin/local_proxy.rs @@ -291,7 +291,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig /// auth::Backend is created at proxy startup, and lives forever. fn build_auth_backend( args: &LocalProxyCliArgs, -) -> anyhow::Result<&'static auth::Backend<'static, (), ()>> { +) -> anyhow::Result<&'static auth::Backend<'static, ()>> { let auth_backend = proxy::auth::Backend::Local(proxy::auth::backend::MaybeOwned::Owned( LocalBackend::new(args.compute), )); diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 3f4c2df80954..3c0e66dec3f1 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -314,7 +314,10 @@ async fn main() -> anyhow::Result<()> { let config = build_config(&args)?; let auth_backend = build_auth_backend(&args)?; - info!("Authentication backend: {}", auth_backend); + match auth_backend { + Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"), + Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"), + }; info!("Using region: {}", args.aws_region); let region_provider = @@ -461,26 +464,41 @@ async fn main() -> anyhow::Result<()> { // client facing tasks. these will exit on error or on cancellation // cancellation returns Ok(()) let mut client_tasks = JoinSet::new(); - if let Some(proxy_listener) = proxy_listener { - client_tasks.spawn(proxy::proxy::task_main( - config, - auth_backend, - proxy_listener, - cancellation_token.clone(), - cancellation_handler.clone(), - endpoint_rate_limiter.clone(), - )); - } + match auth_backend { + Either::Left(auth_backend) => { + if let Some(proxy_listener) = proxy_listener { + client_tasks.spawn(proxy::proxy::task_main( + config, + auth_backend, + proxy_listener, + cancellation_token.clone(), + cancellation_handler.clone(), + endpoint_rate_limiter.clone(), + )); + } - if let Some(serverless_listener) = serverless_listener { - client_tasks.spawn(serverless::task_main( - config, - auth_backend, - serverless_listener, - cancellation_token.clone(), - cancellation_handler.clone(), - endpoint_rate_limiter.clone(), - )); + if let Some(serverless_listener) = serverless_listener { + client_tasks.spawn(serverless::task_main( + config, + auth_backend, + serverless_listener, + cancellation_token.clone(), + cancellation_handler.clone(), + endpoint_rate_limiter.clone(), + )); + } + } + Either::Right(auth_backend) => { + if let Some(proxy_listener) = proxy_listener { + client_tasks.spawn(proxy::console_redirect_proxy::task_main( + config, + auth_backend, + proxy_listener, + cancellation_token.clone(), + cancellation_handler.clone(), + )); + } + } } client_tasks.spawn(proxy::context::parquet::worker( @@ -510,7 +528,7 @@ async fn main() -> anyhow::Result<()> { )); } - if let auth::Backend::ControlPlane(api, _) = auth_backend { + if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend { if let proxy::control_plane::provider::ControlPlaneBackend::Management(api) = &**api { match (redis_notifications_client, regional_redis_client.clone()) { (None, None) => {} @@ -663,7 +681,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { webauth_confirmation_timeout: args.webauth_confirmation_timeout, }; - let config = Box::leak(Box::new(ProxyConfig { + let config = ProxyConfig { tls_config, metric_collection, allow_self_signed_compute: args.allow_self_signed_compute, @@ -677,7 +695,9 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { connect_to_compute_retry_config: config::RetryConfig::parse( &args.connect_to_compute_retry, )?, - })); + }; + + let config = Box::leak(Box::new(config)); tokio::spawn(config.connect_compute_locks.garbage_collect_worker()); @@ -687,8 +707,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { /// auth::Backend is created at proxy startup, and lives forever. fn build_auth_backend( args: &ProxyCliArgs, -) -> anyhow::Result<&'static auth::Backend<'static, (), ()>> { - let auth_backend = match &args.auth_backend { +) -> anyhow::Result, &'static ConsoleRedirectBackend>> { + match &args.auth_backend { AuthBackendType::Console => { let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?; let project_info_cache_config: ProjectInfoCacheOptions = @@ -738,12 +758,11 @@ fn build_auth_backend( wake_compute_endpoint_rate_limiter, ); let api = control_plane::provider::ControlPlaneBackend::Management(api); - auth::Backend::ControlPlane(MaybeOwned::Owned(api), ()) - } + let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ()); - AuthBackendType::Web => { - let url = args.uri.parse()?; - auth::Backend::ConsoleRedirect(MaybeOwned::Owned(ConsoleRedirectBackend::new(url)), ()) + let config = Box::leak(Box::new(auth_backend)); + + Ok(Either::Left(config)) } #[cfg(feature = "testing")] @@ -751,11 +770,23 @@ fn build_auth_backend( let url = args.auth_endpoint.parse()?; let api = control_plane::provider::mock::Api::new(url, !args.is_private_access_proxy); let api = control_plane::provider::ControlPlaneBackend::PostgresMock(api); - auth::Backend::ControlPlane(MaybeOwned::Owned(api), ()) + + let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ()); + + let config = Box::leak(Box::new(auth_backend)); + + Ok(Either::Left(config)) } - }; - Ok(Box::leak(Box::new(auth_backend))) + AuthBackendType::Web => { + let url = args.uri.parse()?; + let backend = ConsoleRedirectBackend::new(url); + + let config = Box::leak(Box::new(backend)); + + Ok(Either::Right(config)) + } + } } #[cfg(test)] diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs new file mode 100644 index 000000000000..9e1797672021 --- /dev/null +++ b/proxy/src/console_redirect_proxy.rs @@ -0,0 +1,217 @@ +use crate::auth::backend::ConsoleRedirectBackend; +use crate::config::{ProxyConfig, ProxyProtocolV2}; +use crate::proxy::{ + prepare_client_connection, run_until_cancelled, ClientRequestError, ErrorSource, +}; +use crate::{ + cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal}, + context::RequestMonitoring, + error::ReportableError, + metrics::{Metrics, NumClientConnectionsGuard}, + protocol2::read_proxy_protocol, + proxy::handshake::{handshake, HandshakeData}, +}; +use futures::TryFutureExt; +use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, Instrument}; + +use crate::proxy::{ + connect_compute::{connect_to_compute, TcpMechanism}, + passthrough::ProxyPassthrough, +}; + +pub async fn task_main( + config: &'static ProxyConfig, + backend: &'static ConsoleRedirectBackend, + listener: tokio::net::TcpListener, + cancellation_token: CancellationToken, + cancellation_handler: Arc, +) -> anyhow::Result<()> { + scopeguard::defer! { + info!("proxy has shut down"); + } + + // When set for the server socket, the keepalive setting + // will be inherited by all accepted client sockets. + socket2::SockRef::from(&listener).set_keepalive(true)?; + + let connections = tokio_util::task::task_tracker::TaskTracker::new(); + + while let Some(accept_result) = + run_until_cancelled(listener.accept(), &cancellation_token).await + { + let (socket, peer_addr) = accept_result?; + + let conn_gauge = Metrics::get() + .proxy + .client_connections + .guard(crate::metrics::Protocol::Tcp); + + let session_id = uuid::Uuid::new_v4(); + let cancellation_handler = Arc::clone(&cancellation_handler); + + tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection"); + + connections.spawn(async move { + let (socket, peer_addr) = match read_proxy_protocol(socket).await { + Err(e) => { + error!("per-client task finished with an error: {e:#}"); + return; + } + Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => { + error!("missing required proxy protocol header"); + return; + } + Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => { + error!("proxy protocol header not supported"); + return; + } + Ok((socket, Some(addr))) => (socket, addr.ip()), + Ok((socket, None)) => (socket, peer_addr.ip()), + }; + + match socket.inner.set_nodelay(true) { + Ok(()) => {} + Err(e) => { + error!("per-client task finished with an error: failed to set socket option: {e:#}"); + return; + } + }; + + let ctx = RequestMonitoring::new( + session_id, + peer_addr, + crate::metrics::Protocol::Tcp, + &config.region, + ); + let span = ctx.span(); + + let startup = Box::pin( + handle_client( + config, + backend, + &ctx, + cancellation_handler, + socket, + conn_gauge, + ) + .instrument(span.clone()), + ); + let res = startup.await; + + match res { + Err(e) => { + // todo: log and push to ctx the error kind + ctx.set_error_kind(e.get_error_kind()); + error!(parent: &span, "per-client task finished with an error: {e:#}"); + } + Ok(None) => { + ctx.set_success(); + } + Ok(Some(p)) => { + ctx.set_success(); + ctx.log_connect(); + match p.proxy_pass().instrument(span.clone()).await { + Ok(()) => {} + Err(ErrorSource::Client(e)) => { + error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}"); + } + Err(ErrorSource::Compute(e)) => { + error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}"); + } + } + } + } + }); + } + + connections.close(); + drop(listener); + + // Drain connections + connections.wait().await; + + Ok(()) +} + +pub(crate) async fn handle_client( + config: &'static ProxyConfig, + backend: &'static ConsoleRedirectBackend, + ctx: &RequestMonitoring, + cancellation_handler: Arc, + stream: S, + conn_gauge: NumClientConnectionsGuard<'static>, +) -> Result>, ClientRequestError> { + info!( + protocol = %ctx.protocol(), + "handling interactive connection from client" + ); + + let metrics = &Metrics::get().proxy; + let proto = ctx.protocol(); + let request_gauge = metrics.connection_requests.guard(proto); + + let tls = config.tls_config.as_ref(); + + let record_handshake_error = !ctx.has_private_peer_addr(); + let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client); + let do_handshake = handshake(ctx, stream, tls, record_handshake_error); + let (mut stream, params) = + match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? { + HandshakeData::Startup(stream, params) => (stream, params), + HandshakeData::Cancel(cancel_key_data) => { + return Ok(cancellation_handler + .cancel_session(cancel_key_data, ctx.session_id()) + .await + .map(|()| None)?) + } + }; + drop(pause); + + ctx.set_db_options(params.clone()); + + let user_info = match backend + .authenticate(ctx, &config.authentication_config, &mut stream) + .await + { + Ok(auth_result) => auth_result, + Err(e) => { + return stream.throw_error(e).await?; + } + }; + + let mut node = connect_to_compute( + ctx, + &TcpMechanism { + params: ¶ms, + locks: &config.connect_compute_locks, + }, + &user_info, + config.allow_self_signed_compute, + config.wake_compute_retry_config, + config.connect_to_compute_retry_config, + ) + .or_else(|e| stream.throw_error(e)) + .await?; + + let session = cancellation_handler.get_session(); + prepare_client_connection(&node, &session, &mut stream).await?; + + // Before proxy passing, forward to compute whatever data is left in the + // PqStream input buffer. Normally there is none, but our serverless npm + // driver in pipeline mode sends startup, password and first query + // immediately after opening the connection. + let (stream, read_buf) = stream.into_inner(); + node.stream.write_all(&read_buf).await?; + + Ok(Some(ProxyPassthrough { + client: stream, + aux: node.aux.clone(), + compute: node, + _req: request_gauge, + _conn: conn_gauge, + _cancel: session, + })) +} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8d274baa10b9..74bc778a36c4 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -95,6 +95,7 @@ pub mod cache; pub mod cancellation; pub mod compute; pub mod config; +pub mod console_redirect_proxy; pub mod context; pub mod control_plane; pub mod error; diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 3a43ccb74a6f..b2b5a7f43d6c 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -61,7 +61,7 @@ pub async fn run_until_cancelled( pub async fn task_main( config: &'static ProxyConfig, - auth_backend: &'static auth::Backend<'static, (), ()>, + auth_backend: &'static auth::Backend<'static, ()>, listener: tokio::net::TcpListener, cancellation_token: CancellationToken, cancellation_handler: Arc, @@ -248,7 +248,7 @@ impl ReportableError for ClientRequestError { #[allow(clippy::too_many_arguments)] pub(crate) async fn handle_client( config: &'static ProxyConfig, - auth_backend: &'static auth::Backend<'static, (), ()>, + auth_backend: &'static auth::Backend<'static, ()>, ctx: &RequestMonitoring, cancellation_handler: Arc, stream: S, @@ -356,7 +356,7 @@ pub(crate) async fn handle_client( /// Finish client connection initialization: confirm auth success, send params, etc. #[tracing::instrument(skip_all)] -async fn prepare_client_connection

( +pub(crate) async fn prepare_client_connection

( node: &compute::PostgresConnection, session: &cancellation::Session

, stream: &mut PqStream, diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index 3861ddc8edff..58fb36dba754 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -552,7 +552,7 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn fn helper_create_connect_info( mechanism: &TestConnectMechanism, -) -> auth::Backend<'static, ComputeCredentials, &()> { +) -> auth::Backend<'static, ComputeCredentials> { let user_info = auth::Backend::ControlPlane( MaybeOwned::Owned(ControlPlaneBackend::Test(Box::new(mechanism.clone()))), ComputeCredentials { diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 9e49478cf3ec..2b060af9e1e3 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -42,7 +42,7 @@ pub(crate) struct PoolingBackend { pub(crate) local_pool: Arc>, pub(crate) pool: Arc>, pub(crate) config: &'static ProxyConfig, - pub(crate) auth_backend: &'static crate::auth::Backend<'static, (), ()>, + pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>, pub(crate) endpoint_rate_limiter: Arc, } @@ -135,9 +135,6 @@ impl PoolingBackend { keys: crate::auth::backend::ComputeCredentialKeys::None, }) } - crate::auth::Backend::ConsoleRedirect(_, ()) => Err(AuthError::auth_failed( - "JWT login over web auth proxy is not supported", - )), crate::auth::Backend::Local(_) => { let keys = self .config @@ -264,7 +261,7 @@ impl PoolingBackend { info!(%conn_id, "local_pool: opening a new connection '{conn_info}'"); let mut node_info = match &self.auth_backend { - auth::Backend::ControlPlane(_, ()) | auth::Backend::ConsoleRedirect(_, ()) => { + auth::Backend::ControlPlane(_, ()) => { unreachable!("only local_proxy can connect to local postgres") } auth::Backend::Local(local) => local.node_info.clone(), diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 95f64e972c07..3131adada4cc 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -55,7 +55,7 @@ pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api"; pub async fn task_main( config: &'static ProxyConfig, - auth_backend: &'static crate::auth::Backend<'static, (), ()>, + auth_backend: &'static crate::auth::Backend<'static, ()>, ws_listener: TcpListener, cancellation_token: CancellationToken, cancellation_handler: Arc, diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index fd0f0cac7f34..f5a692cf404e 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -129,7 +129,7 @@ impl AsyncBufRead for WebSocketRw { pub(crate) async fn serve_websocket( config: &'static ProxyConfig, - auth_backend: &'static crate::auth::Backend<'static, (), ()>, + auth_backend: &'static crate::auth::Backend<'static, ()>, ctx: RequestMonitoring, websocket: OnUpgrade, cancellation_handler: Arc,