From a4d9a9c84639603e6881d89ec0ae4bb9936dbae3 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Mon, 20 May 2024 19:24:53 -0400 Subject: [PATCH 1/6] fix: improve create client performance --- src/stores/client.rs | 124 ++++++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 48 deletions(-) diff --git a/src/stores/client.rs b/src/stores/client.rs index 8aefa7a..a31bead 100644 --- a/src/stores/client.rs +++ b/src/stores/client.rs @@ -47,63 +47,91 @@ impl ClientStore for sqlx::PgPool { client.token ); - let mut transaction = self.begin().await?; - - // Statement for locking based on the client id to prevent an issue #230 - // and locking based on the token to prevent an issue #292 - let start = Instant::now(); - sqlx::query( - "SELECT - pg_advisory_xact_lock(abs(hashtext($1::text))), - pg_advisory_xact_lock(abs(hashtext($2::text)))", - ) - .bind(id) - .bind(client.token.clone()) - .execute(&mut transaction) - .await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_pg_advisory_xact_lock", start); + #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] + pub struct ClientSelect { + pub id: String, + pub device_token: String, } + let query = " + SELECT * + FROM public.clients + WHERE id = $1 + OR device_token = $2 + FOR UPDATE + "; let start = Instant::now(); - sqlx::query("DELETE FROM public.clients WHERE id = $1 OR device_token = $2") + let res = sqlx::query_as::(query) .bind(id) .bind(client.token.clone()) - .execute(&mut transaction) - .await?; + .fetch_one(self) + .await; if let Some(metrics) = metrics { metrics.postgres_query("create_client_delete", start); } - let start = Instant::now(); - let mut insert_query = sqlx::QueryBuilder::new( - "INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)", - ); - insert_query.push_values( - vec![( - id, - tenant_id, - client.push_type, - client.token, - client.always_raw, - )], - |mut b, client| { - b.push_bind(client.0) - .push_bind(client.1) - .push_bind(client.2) - .push_bind(client.3) - .push_bind(client.4); - }, - ); - insert_query.build().execute(&mut transaction).await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_insert", start); - } - - let start = Instant::now(); - transaction.commit().await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_commit", start); + let existing_client = match res { + Err(sqlx::Error::RowNotFound) => { + let start = Instant::now(); + let mut insert_query = sqlx::QueryBuilder::new( + "INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)", + ); + insert_query.push_values( + vec![( + id, + tenant_id, + client.push_type, + client.token, + client.always_raw, + )], + |mut b, client| { + b.push_bind(client.0) + .push_bind(client.1) + .push_bind(client.2) + .push_bind(client.3) + .push_bind(client.4); + }, + ); + insert_query.build().execute(self).await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_insert", start); + } + return Ok(()); + } + Err(e) => return Err(e.into()), + Ok(row) => row, + }; + + if existing_client.id == id && existing_client.device_token != client.token { + let query = " + UPDATE public.clients + SET device_token = $2 + WHERE id = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(id) + .bind(client.token) + .execute(self) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_device_token", start); + } + } else if existing_client.device_token == client.token && existing_client.id != id { + let query = " + UPDATE public.clients + SET id = $2 + WHERE device_token = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(client.token) + .bind(id) + .execute(self) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_id", start); + } } Ok(()) From f8c7dc015a8a7b4fe7f4de763d03375a569e4f27 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Mon, 20 May 2024 20:15:45 -0400 Subject: [PATCH 2/6] fix: failing tests --- src/stores/client.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/stores/client.rs b/src/stores/client.rs index a31bead..19de3e6 100644 --- a/src/stores/client.rs +++ b/src/stores/client.rs @@ -105,13 +105,19 @@ impl ClientStore for sqlx::PgPool { if existing_client.id == id && existing_client.device_token != client.token { let query = " UPDATE public.clients - SET device_token = $2 + SET device_token = $2, + push_type = $3, + always_raw = $4, + tenant_id = $5 WHERE id = $1 "; let start = Instant::now(); sqlx::query(query) .bind(id) .bind(client.token) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) .execute(self) .await?; if let Some(metrics) = metrics { @@ -120,13 +126,19 @@ impl ClientStore for sqlx::PgPool { } else if existing_client.device_token == client.token && existing_client.id != id { let query = " UPDATE public.clients - SET id = $2 + SET id = $2, + push_type = $3, + always_raw = $4, + tenant_id = $5 WHERE device_token = $1 "; let start = Instant::now(); sqlx::query(query) .bind(client.token) .bind(id) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) .execute(self) .await?; if let Some(metrics) = metrics { From 5ffce16922ad974ea0a75b22fc5df2179c92e23e Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 21 May 2024 08:45:23 -0400 Subject: [PATCH 3/6] fix: tests --- src/stores/client.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/stores/client.rs b/src/stores/client.rs index 19de3e6..6b877d7 100644 --- a/src/stores/client.rs +++ b/src/stores/client.rs @@ -51,6 +51,7 @@ impl ClientStore for sqlx::PgPool { pub struct ClientSelect { pub id: String, pub device_token: String, + pub tenant_id: String, } let query = " @@ -124,6 +125,14 @@ impl ClientStore for sqlx::PgPool { metrics.postgres_query("create_client_update_device_token", start); } } else if existing_client.device_token == client.token && existing_client.id != id { + let mut notification_query_builder = + sqlx::QueryBuilder::new("DELETE FROM public.notifications WHERE client_id = "); + notification_query_builder.push_bind(existing_client.id); + notification_query_builder.push(" and tenant_id = "); + notification_query_builder.push_bind(existing_client.tenant_id); + let notification_query = notification_query_builder.build(); + self.execute(notification_query).await?; + let query = " UPDATE public.clients SET id = $2, @@ -144,6 +153,25 @@ impl ClientStore for sqlx::PgPool { if let Some(metrics) = metrics { metrics.postgres_query("create_client_update_id", start); } + } else { + let query = " + UPDATE public.clients + SET push_type = $2, + always_raw = $3, + tenant_id = $4 + WHERE id = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(id) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) + .execute(self) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_id", start); + } } Ok(()) From 2cfcca4daaec71662b946da70f79375c814078a9 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 21 May 2024 08:53:29 -0400 Subject: [PATCH 4/6] fix: metrics --- src/stores/client.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/stores/client.rs b/src/stores/client.rs index 6b877d7..d136938 100644 --- a/src/stores/client.rs +++ b/src/stores/client.rs @@ -131,7 +131,11 @@ impl ClientStore for sqlx::PgPool { notification_query_builder.push(" and tenant_id = "); notification_query_builder.push_bind(existing_client.tenant_id); let notification_query = notification_query_builder.build(); + let start = Instant::now(); self.execute(notification_query).await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_delete_notification", start); + } let query = " UPDATE public.clients From f7f260fc5143a399943924b8e7a14624d776550f Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 21 May 2024 09:29:06 -0400 Subject: [PATCH 5/6] fix: missing transaction --- src/stores/client.rs | 208 +++++++++++++++++++++++-------------------- 1 file changed, 112 insertions(+), 96 deletions(-) diff --git a/src/stores/client.rs b/src/stores/client.rs index d136938..d5fdef5 100644 --- a/src/stores/client.rs +++ b/src/stores/client.rs @@ -54,6 +54,12 @@ impl ClientStore for sqlx::PgPool { pub tenant_id: String, } + let start = Instant::now(); + let mut transaction = self.begin().await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_begin", start); + } + let query = " SELECT * FROM public.clients @@ -65,119 +71,129 @@ impl ClientStore for sqlx::PgPool { let res = sqlx::query_as::(query) .bind(id) .bind(client.token.clone()) - .fetch_one(self) + .fetch_one(&mut transaction) .await; if let Some(metrics) = metrics { metrics.postgres_query("create_client_delete", start); } let existing_client = match res { - Err(sqlx::Error::RowNotFound) => { - let start = Instant::now(); - let mut insert_query = sqlx::QueryBuilder::new( - "INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)", - ); - insert_query.push_values( - vec![( - id, - tenant_id, - client.push_type, - client.token, - client.always_raw, - )], - |mut b, client| { - b.push_bind(client.0) - .push_bind(client.1) - .push_bind(client.2) - .push_bind(client.3) - .push_bind(client.4); - }, - ); - insert_query.build().execute(self).await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_insert", start); - } - return Ok(()); - } + Err(sqlx::Error::RowNotFound) => None, Err(e) => return Err(e.into()), - Ok(row) => row, + Ok(row) => Some(row), }; - if existing_client.id == id && existing_client.device_token != client.token { - let query = " - UPDATE public.clients - SET device_token = $2, - push_type = $3, - always_raw = $4, - tenant_id = $5 - WHERE id = $1 - "; - let start = Instant::now(); - sqlx::query(query) - .bind(id) - .bind(client.token) - .bind(client.push_type) - .bind(client.always_raw) - .bind(tenant_id) - .execute(self) - .await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_update_device_token", start); - } - } else if existing_client.device_token == client.token && existing_client.id != id { - let mut notification_query_builder = - sqlx::QueryBuilder::new("DELETE FROM public.notifications WHERE client_id = "); - notification_query_builder.push_bind(existing_client.id); - notification_query_builder.push(" and tenant_id = "); - notification_query_builder.push_bind(existing_client.tenant_id); - let notification_query = notification_query_builder.build(); - let start = Instant::now(); - self.execute(notification_query).await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_delete_notification", start); - } + if let Some(existing_client) = existing_client { + if existing_client.id == id && existing_client.device_token != client.token { + let query = " + UPDATE public.clients + SET device_token = $2, + push_type = $3, + always_raw = $4, + tenant_id = $5 + WHERE id = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(id) + .bind(client.token) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_device_token", start); + } + } else if existing_client.device_token == client.token && existing_client.id != id { + let query = " + DELETE FROM public.notifications + WHERE client_id = $1 + AND tenant_id = $2 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(existing_client.id) + .bind(existing_client.tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_delete_notifications", start); + } - let query = " - UPDATE public.clients - SET id = $2, - push_type = $3, - always_raw = $4, - tenant_id = $5 - WHERE device_token = $1 - "; - let start = Instant::now(); - sqlx::query(query) - .bind(client.token) - .bind(id) - .bind(client.push_type) - .bind(client.always_raw) - .bind(tenant_id) - .execute(self) - .await?; - if let Some(metrics) = metrics { - metrics.postgres_query("create_client_update_id", start); + let query = " + UPDATE public.clients + SET id = $2, + push_type = $3, + always_raw = $4, + tenant_id = $5 + WHERE device_token = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(client.token) + .bind(id) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_id", start); + } + } else { + let query = " + UPDATE public.clients + SET push_type = $2, + always_raw = $3, + tenant_id = $4 + WHERE id = $1 + "; + let start = Instant::now(); + sqlx::query(query) + .bind(id) + .bind(client.push_type) + .bind(client.always_raw) + .bind(tenant_id) + .execute(&mut transaction) + .await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_update_id", start); + } } } else { - let query = " - UPDATE public.clients - SET push_type = $2, - always_raw = $3, - tenant_id = $4 - WHERE id = $1 - "; let start = Instant::now(); - sqlx::query(query) - .bind(id) - .bind(client.push_type) - .bind(client.always_raw) - .bind(tenant_id) - .execute(self) - .await?; + let mut insert_query = sqlx::QueryBuilder::new( + "INSERT INTO public.clients (id, tenant_id, push_type, device_token, always_raw)", + ); + insert_query.push_values( + vec![( + id, + tenant_id, + client.push_type, + client.token, + client.always_raw, + )], + |mut b, client| { + b.push_bind(client.0) + .push_bind(client.1) + .push_bind(client.2) + .push_bind(client.3) + .push_bind(client.4); + }, + ); + insert_query.build().execute(&mut transaction).await?; if let Some(metrics) = metrics { - metrics.postgres_query("create_client_update_id", start); + metrics.postgres_query("create_client_insert", start); } } + let start = Instant::now(); + transaction.commit().await?; + if let Some(metrics) = metrics { + metrics.postgres_query("create_client_commit", start); + } + Ok(()) } From 776802abb7dc8bc473b2f2c5b2a85d5ff23d4b77 Mon Sep 17 00:00:00 2001 From: Chris Smith Date: Tue, 21 May 2024 09:32:59 -0400 Subject: [PATCH 6/6] chore: use functional mapping --- src/stores/client.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/stores/client.rs b/src/stores/client.rs index d5fdef5..6ba5e47 100644 --- a/src/stores/client.rs +++ b/src/stores/client.rs @@ -68,21 +68,20 @@ impl ClientStore for sqlx::PgPool { FOR UPDATE "; let start = Instant::now(); - let res = sqlx::query_as::(query) + let existing_client = sqlx::query_as::(query) .bind(id) .bind(client.token.clone()) .fetch_one(&mut transaction) - .await; + .await + .map(Some) + .or_else(|e| match e { + sqlx::Error::RowNotFound => Ok(None), + e => Err(e), + })?; if let Some(metrics) = metrics { metrics.postgres_query("create_client_delete", start); } - let existing_client = match res { - Err(sqlx::Error::RowNotFound) => None, - Err(e) => return Err(e.into()), - Ok(row) => Some(row), - }; - if let Some(existing_client) = existing_client { if existing_client.id == id && existing_client.device_token != client.token { let query = "