Skip to content

Commit

Permalink
Log storage_query errors on warn
Browse files Browse the repository at this point in the history
Logging the storage_query errors on warn allows better insights
into why a connection might be rejected by a client. Currently,
a user of Restate runs into the problem that the connection gets
rejected but it is unclear why.

This fixes #740.
  • Loading branch information
tillrohrmann committed Aug 26, 2023
1 parent 76fb0b8 commit 3c3c3b7
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/storage_query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ paste = { workspace = true}
schemars = { workspace = true, optional = true }
serde = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
10 changes: 8 additions & 2 deletions src/storage_query/src/pgwire_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::net::SocketAddr;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -40,6 +41,7 @@ use pgwire::api::{ClientInfo, MakeHandler, StatelessMakeHandler, Type};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use pgwire::messages::data::DataRow;
use pgwire::tokio::process_socket;
use tracing::warn;

pub(crate) struct HandlerFactory {
processor: Arc<StatelessMakeHandler<DfSessionService>>,
Expand All @@ -65,19 +67,23 @@ impl HandlerFactory {
}
}

pub fn spawn_connection(&self, incoming_socket: TcpStream) {
pub fn spawn_connection(&self, incoming_socket: TcpStream, addr: SocketAddr) {
let authenticator_ref = self.authenticator.make();
let processor_ref = self.processor.make();
let placeholder_ref = self.placeholder.make();
tokio::spawn(async move {
let _ = process_socket(
let result = process_socket(
incoming_socket,
None,
authenticator_ref,
processor_ref,
placeholder_ref,
)
.await;

if let Err(err) = result {
warn!("Failed processing socket for connection '{addr}': {err}");
}
});
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/storage_query/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::io::ErrorKind;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::select;
use tracing::warn;

pub type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down Expand Up @@ -66,8 +67,11 @@ impl PostgresQueryService {
loop {
select! {
incoming_socket = listener.accept() => {
if let Ok((stream, _addr)) = incoming_socket {
factory.spawn_connection(stream);
match incoming_socket {
Ok((stream, addr)) => factory.spawn_connection(stream, addr),
Err(err) => {
warn!("Failed to accept storage query connection: {err}");
}
}
},
_ = &mut shutdown => {
Expand Down

0 comments on commit 3c3c3b7

Please sign in to comment.