From bb738f92186298b948ece0840904221ad1c4fc54 Mon Sep 17 00:00:00 2001 From: Nadeshiko Manju Date: Mon, 25 Sep 2023 16:02:57 +0800 Subject: [PATCH] feat(service/postgresql): support connection pool (#3176) Signed-off-by: Manjusaka --- Cargo.lock | 13 ++++ core/Cargo.toml | 4 +- core/src/services/postgresql/backend.rs | 90 +++++++------------------ 3 files changed, 40 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 30fa76d24f87..30ccc8fc4504 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -578,6 +578,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "bb8-postgres" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ac82c42eb30889b5c4ee4763a24b8c566518171ebea648cd7e3bc532c60680" +dependencies = [ + "async-trait", + "bb8", + "tokio", + "tokio-postgres", +] + [[package]] name = "bigdecimal" version = "0.3.1" @@ -3950,6 +3962,7 @@ dependencies = [ "backon", "base64 0.21.2", "bb8", + "bb8-postgres", "bytes", "cacache", "chrono", diff --git a/core/Cargo.toml b/core/Cargo.toml index f37cacb5c42d..4d187d9c50d1 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -153,7 +153,7 @@ services-oss = [ "reqsign?/reqwest_request", ] services-persy = ["dep:persy"] -services-postgresql = ["dep:tokio-postgres"] +services-postgresql = ["dep:tokio-postgres", "dep:bb8", "dep:bb8-postgres"] services-redb = ["dep:redb"] services-redis = ["dep:redis"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] @@ -275,7 +275,7 @@ tokio-postgres = { version = "0.7.8", optional = true } tracing = { version = "0.1", optional = true } uuid = { version = "1", features = ["serde", "v4"] } mysql_async = { version = "0.32.2", optional = true } - +bb8-postgres = { version = "0.8.1", optional = true } [dev-dependencies] criterion = { version = "0.4", features = ["async", "async_tokio"] } diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs index 23eb127dd135..cbc4c7dbbabc 100644 --- a/core/src/services/postgresql/backend.rs +++ b/core/src/services/postgresql/backend.rs @@ -22,10 +22,10 @@ use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; +use bb8::Pool; +use bb8_postgres::PostgresConnectionManager; use tokio::sync::OnceCell; -use tokio_postgres::Client; use tokio_postgres::Config; -use tokio_postgres::Statement; use crate::raw::adapters::kv; use crate::raw::*; @@ -185,15 +185,11 @@ impl Builder for PostgresqlBuilder { ); Ok(PostgresqlBackend::new(Adapter { - client: OnceCell::new(), + pool: OnceCell::new(), config, table, key_field, value_field, - - statement_get: OnceCell::new(), - statement_set: OnceCell::new(), - statement_del: OnceCell::new(), }) .with_root(&root)) } @@ -204,17 +200,12 @@ pub type PostgresqlBackend = kv::Backend; #[derive(Clone)] pub struct Adapter { - client: OnceCell>, + pool: OnceCell>>>, config: Config, table: String, key_field: String, value_field: String, - - /// Prepared statements for get/put/delete. - statement_get: OnceCell, - statement_set: OnceCell, - statement_del: OnceCell, } impl Debug for Adapter { @@ -225,21 +216,14 @@ impl Debug for Adapter { } impl Adapter { - async fn get_client(&self) -> Result<&Client> { - self.client + async fn get_client(&self) -> Result<&Pool>> { + self.pool .get_or_try_init(|| async { // TODO: add tls support. - let (client, conn) = self.config.connect(tokio_postgres::NoTls).await?; - - // The connection object performs the actual communication with the database, - // so spawn it off to run on its own. - tokio::spawn(async move { - if let Err(e) = conn.await { - eprintln!("postgresql connection error: {}", e); - } - }); - - Ok(Arc::new(client)) + let manager = + PostgresConnectionManager::new(self.config.clone(), tokio_postgres::NoTls); + let pool = Pool::builder().build(manager).await?; + Ok(Arc::new(pool)) }) .await .map(|v| v.as_ref()) @@ -265,18 +249,11 @@ impl kv::Adapter for Adapter { "SELECT {} FROM {} WHERE {} = $1 LIMIT 1", self.value_field, self.table, self.key_field ); - let statement = self - .statement_get - .get_or_try_init(|| async { - self.get_client() - .await? - .prepare(&query) - .await - .map_err(Error::from) - }) - .await?; - - let rows = self.get_client().await?.query(statement, &[&path]).await?; + let connection = self.get_client().await?.get().await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err) + })?; + let statement = connection.prepare(&query).await?; + let rows = connection.query(&statement, &[&path]).await?; if rows.is_empty() { return Ok(None); } @@ -294,39 +271,22 @@ impl kv::Adapter for Adapter { ON CONFLICT ({key_field}) \ DO UPDATE SET {value_field} = EXCLUDED.{value_field}", ); - let statement = self - .statement_set - .get_or_try_init(|| async { - self.get_client() - .await? - .prepare(&query) - .await - .map_err(Error::from) - }) - .await?; - - let _ = self - .get_client() - .await? - .query(statement, &[&path, &value]) - .await?; + let connection = self.get_client().await?.get().await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err) + })?; + let statement = connection.prepare(&query).await?; + let _ = connection.query(&statement, &[&path, &value]).await?; Ok(()) } async fn delete(&self, path: &str) -> Result<()> { let query = format!("DELETE FROM {} WHERE {} = $1", self.table, self.key_field); - let statement = self - .statement_del - .get_or_try_init(|| async { - self.get_client() - .await? - .prepare(&query) - .await - .map_err(Error::from) - }) - .await?; + let connection = self.get_client().await?.get().await.map_err(|err| { + Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err) + })?; + let statement = connection.prepare(&query).await?; - let _ = self.get_client().await?.query(statement, &[&path]).await?; + let _ = connection.query(&statement, &[&path]).await?; Ok(()) } }