From fe9864b6e0476289d8970609707d0ee1f1b5065e Mon Sep 17 00:00:00 2001 From: jrconlin Date: Fri, 1 Nov 2024 10:25:52 -0700 Subject: [PATCH] feat: Delete ACK'd messages from storage to prevent some dupes Closes: SYNC-4466 --- .../src/identified/on_client_msg.rs | 59 +++++--- autoendpoint/src/extractors/message_id.rs | 140 +----------------- autoendpoint/src/routes/webpush.rs | 6 +- autopush-common/src/errors.rs | 2 + autopush-common/src/lib.rs | 1 + autopush-common/src/message_id.rs | 129 ++++++++++++++++ .../integration/test_integration_all_rust.py | 15 +- 7 files changed, 186 insertions(+), 166 deletions(-) create mode 100644 autopush-common/src/message_id.rs diff --git a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs index ea277cdf1..53672ee9a 100644 --- a/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs +++ b/autoconnect/autoconnect-ws/autoconnect-ws-sm/src/identified/on_client_msg.rs @@ -7,7 +7,7 @@ use autoconnect_common::{ broadcast::Broadcast, protocol::{BroadcastValue, ClientAck, ClientMessage, ServerMessage}, }; -use autopush_common::{endpoint::make_endpoint, util::sec_since_epoch}; +use autopush_common::{endpoint::make_endpoint, message_id::MessageId, util::sec_since_epoch}; use super::WebPushClient; use crate::error::{SMError, SMErrorKind}; @@ -34,8 +34,8 @@ impl WebPushClient { .await? .map_or_else(Vec::new, |smsg| vec![smsg])), ClientMessage::Ack { updates } => self.ack(&updates).await, - ClientMessage::Nack { code, .. } => { - self.nack(code); + ClientMessage::Nack { code, version } => { + self.nack(code, &version).await?; Ok(vec![]) } ClientMessage::Ping => Ok(vec![self.ping()?]), @@ -185,6 +185,14 @@ impl WebPushClient { // Check the list of unacked "direct" (unstored) notifications. 
We only want to // ack messages we've not yet seen and we have the right version, otherwise we could // have gotten an older, inaccurate ACK. + // Force remove all valid ACK messages. There were reports that some users received multiple + // instances of a previously "ACK"d message. This may happen if we send a batch of messages to + // the client, and there's one message that cannot be ACK'd for some reason. This would result + // in the `process_post_acks` function to not get called, so no `increment_storage` would + // happen. This meant that the next time the client reconnected, or a message event occurred + // the database may re-fetch data and resend it to the client, including previously ack'd + // messages. Deleting the message from the data store should prevent that. + let pos = self .ack_state .unacked_direct_notifs @@ -198,6 +206,12 @@ impl WebPushClient { ); self.ack_state.unacked_direct_notifs.remove(pos); self.stats.direct_acked += 1; + let message_id = MessageId::decrypt(&self.app_state.fernet, &notif.version) + .map_err(|_e| SMErrorKind::InvalidMessage("Invalid MessageID".to_owned()))?; + self.app_state + .db + .remove_message(&self.uaid, &message_id.sort_key()) + .await?; continue; }; @@ -217,22 +231,17 @@ impl WebPushClient { let n = &self.ack_state.unacked_stored_notifs[pos]; debug!("✅ Ack notif: {:?}", &n); // TODO: Record "ack'd" reliability_id, if present. - // Only force delete Topic messages, since they don't have a timestamp. - // Other messages persist in the database, to be, eventually, cleaned up by their - // TTL. We will need to update the `CurrentTimestamp` field for the channel - // record. Use that field to set the baseline timestamp for when to pull messages - // in the future. 
// Topic/legacy messages have no sortkey_timestamp - if n.sortkey_timestamp.is_none() { - debug!( - "✅ WebPushClient:ack removing Stored, sort_key: {}", - &n.chidmessageid() - ); - self.app_state - .db - .remove_message(&self.uaid, &n.chidmessageid()) - .await?; - } + debug!( + "✅ WebPushClient:ack removing Stored, sort_key: {}", + &n.chidmessageid() + ); + let message_id = MessageId::decrypt(&self.app_state.fernet, &notif.version) + .map_err(|_e| SMErrorKind::InvalidMessage("Invalid MessageID".to_owned()))?; + self.app_state + .db + .remove_message(&self.uaid, &message_id.sort_key()) + .await?; self.ack_state.unacked_stored_notifs.remove(pos); self.stats.stored_acked += 1; continue; @@ -250,18 +259,30 @@ impl WebPushClient { /// Negative Acknowledgement (a Client error occurred) of one or more Push /// Notifications - fn nack(&mut self, code: Option) { + async fn nack(&mut self, code: Option, _message_id: &str) -> Result<(), SMError> { trace!("WebPushClient:nack"); // only metric codes expected from the client (or 0) let code = code .and_then(|code| (301..=303).contains(&code).then_some(code)) .unwrap_or(0); + // TODO: There may be certain NAK codes indicating a message absolutely is rejected and + // should not be retried. In that case, drop the message. Waiting for Client to confirm + // the codes to absolutely reject. + /* + if code == 302 { + self.app_state + .db + .remove_message(&self.uaid, _message_id) + .await? 
+ } + */ self.app_state .metrics .incr_with_tags("ua.command.nack") .with_tag("code", &code.to_string()) .send(); self.stats.nacks += 1; + Ok(()) } /// Handle a WebPush Ping diff --git a/autoendpoint/src/extractors/message_id.rs b/autoendpoint/src/extractors/message_id.rs index c6fb35887..1b2f27ff8 100644 --- a/autoendpoint/src/extractors/message_id.rs +++ b/autoendpoint/src/extractors/message_id.rs @@ -1,139 +1 @@ -use crate::error::{ApiError, ApiErrorKind, ApiResult}; -use crate::server::AppState; -use actix_web::dev::Payload; -use actix_web::{web::Data, FromRequest, HttpRequest}; -use autopush_common::notification::{STANDARD_NOTIFICATION_PREFIX, TOPIC_NOTIFICATION_PREFIX}; -use fernet::MultiFernet; -use futures::future; -use uuid::Uuid; - -/// Holds information about a notification. The information is encoded and -/// encrypted into a "message ID" which is presented to the user. Later, the -/// user can send us the message ID to perform operations on the associated -/// notification (e.g. delete it). 
-#[derive(Debug)] -pub enum MessageId { - WithTopic { - uaid: Uuid, - channel_id: Uuid, - topic: String, - }, - WithoutTopic { - uaid: Uuid, - channel_id: Uuid, - timestamp: u64, - }, -} - -impl FromRequest for MessageId { - type Error = ApiError; - type Future = future::Ready>; - - fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { - let message_id_param = req - .match_info() - .get("message_id") - .expect("{message_id} must be part of the path"); - let app_state: Data = Data::extract(req) - .into_inner() - .expect("No server state found"); - - future::ready(MessageId::decrypt(&app_state.fernet, message_id_param)) - } -} - -impl MessageId { - /// Encode and encrypt the message ID - pub fn encrypt(&self, fernet: &MultiFernet) -> String { - let id_str = match self { - MessageId::WithTopic { - uaid, - channel_id, - topic, - } => format!( - "{}:{}:{}:{}", - TOPIC_NOTIFICATION_PREFIX, - &uaid.as_simple(), - &channel_id.as_simple(), - topic - ), - MessageId::WithoutTopic { - uaid, - channel_id, - timestamp, - } => format!( - "{}:{}:{}:{}", - STANDARD_NOTIFICATION_PREFIX, - uaid.as_simple(), - channel_id.as_simple(), - timestamp - ), - }; - - fernet.encrypt(id_str.as_bytes()) - } - - /// Decrypt and decode the message ID - pub fn decrypt(fernet: &MultiFernet, message_id: &str) -> ApiResult { - let decrypted_bytes = fernet - .decrypt(message_id) - .map_err(|_| ApiErrorKind::InvalidMessageId)?; - let decrypted_str = String::from_utf8_lossy(&decrypted_bytes); - let segments: Vec<_> = decrypted_str.split(':').collect(); - - if segments.len() != 4 { - return Err(ApiErrorKind::InvalidMessageId.into()); - } - - let (version, uaid, chid, topic_or_timestamp) = - (segments[0], segments[1], segments[2], segments[3]); - - match version { - "01" => Ok(MessageId::WithTopic { - uaid: Uuid::parse_str(uaid).map_err(|_| ApiErrorKind::InvalidMessageId)?, - channel_id: Uuid::parse_str(chid).map_err(|_| ApiErrorKind::InvalidMessageId)?, - topic: 
topic_or_timestamp.to_string(), - }), - "02" => Ok(MessageId::WithoutTopic { - uaid: Uuid::parse_str(uaid).map_err(|_| ApiErrorKind::InvalidMessageId)?, - channel_id: Uuid::parse_str(chid).map_err(|_| ApiErrorKind::InvalidMessageId)?, - timestamp: topic_or_timestamp - .parse() - .map_err(|_| ApiErrorKind::InvalidMessageId)?, - }), - _ => Err(ApiErrorKind::InvalidMessageId.into()), - } - } - - /// Get the UAID of the associated notification - pub fn uaid(&self) -> Uuid { - match self { - MessageId::WithTopic { uaid, .. } => *uaid, - MessageId::WithoutTopic { uaid, .. } => *uaid, - } - } - - /// Get the sort-key for the associated notification - pub fn sort_key(&self) -> String { - match self { - MessageId::WithTopic { - channel_id, topic, .. - } => format!( - "{}:{}:{}", - TOPIC_NOTIFICATION_PREFIX, - channel_id.as_hyphenated(), - topic - ), - MessageId::WithoutTopic { - channel_id, - timestamp, - .. - } => format!( - "{}:{}:{}", - STANDARD_NOTIFICATION_PREFIX, - timestamp, - channel_id.as_hyphenated() - ), - } - } -} +pub use autopush_common::message_id::MessageId; diff --git a/autoendpoint/src/routes/webpush.rs b/autoendpoint/src/routes/webpush.rs index 44f0a2d8b..a05fb7642 100644 --- a/autoendpoint/src/routes/webpush.rs +++ b/autoendpoint/src/routes/webpush.rs @@ -6,7 +6,7 @@ use crate::extractors::notification::Notification; use crate::extractors::routers::{RouterType, Routers}; use crate::server::AppState; use actix_web::web::Data; -use actix_web::HttpResponse; +use actix_web::{HttpRequest, HttpResponse}; /// Handle the `POST /wpush/{api_version}/{token}` and `POST /wpush/{token}` routes pub async fn webpush_route( @@ -30,9 +30,11 @@ pub async fn webpush_route( /// Handle the `DELETE /m/{message_id}` route pub async fn delete_notification_route( - message_id: MessageId, + req: HttpRequest, app_state: Data, ) -> ApiResult { + let message_id = MessageId::from_request(&app_state.fernet, req) + .map_err(|_| ApiErrorKind::InvalidMessageId)?; let sort_key = 
message_id.sort_key(); debug!("Deleting notification with sort-key {}", sort_key); trace!("message_id = {:?}", message_id); diff --git a/autopush-common/src/errors.rs b/autopush-common/src/errors.rs index 4402dcd6d..d73a10bfc 100644 --- a/autopush-common/src/errors.rs +++ b/autopush-common/src/errors.rs @@ -103,6 +103,8 @@ pub enum ApcErrorKind { BroadcastError(String), #[error("Payload Error: {0}")] PayloadError(String), + #[error("Invalid message ID")] + InvalidMessageId, #[error("General Error: {0}")] GeneralError(String), } diff --git a/autopush-common/src/lib.rs b/autopush-common/src/lib.rs index 8656e814c..a4febb43c 100644 --- a/autopush-common/src/lib.rs +++ b/autopush-common/src/lib.rs @@ -10,6 +10,7 @@ pub mod db; pub mod endpoint; pub mod errors; pub mod logging; +pub mod message_id; pub mod metrics; pub mod middleware; pub mod notification; diff --git a/autopush-common/src/message_id.rs b/autopush-common/src/message_id.rs new file mode 100644 index 000000000..3127db2b6 --- /dev/null +++ b/autopush-common/src/message_id.rs @@ -0,0 +1,129 @@ +use actix_web::HttpRequest; +use fernet::MultiFernet; +use uuid::Uuid; + +use crate::errors::{ApcErrorKind, Result}; +use crate::notification::{STANDARD_NOTIFICATION_PREFIX, TOPIC_NOTIFICATION_PREFIX}; + +/// Holds information about a notification. The information is encoded and +/// encrypted into a "message ID" which is presented to the user. Later, the +/// user can send us the message ID to perform operations on the associated +/// notification (e.g. delete it). 
+#[derive(Debug)] +pub enum MessageId { + WithTopic { + uaid: Uuid, + channel_id: Uuid, + topic: String, + }, + WithoutTopic { + uaid: Uuid, + channel_id: Uuid, + timestamp: u64, + }, +} + +impl MessageId { + /// Encode and encrypt the message ID + pub fn encrypt(&self, fernet: &MultiFernet) -> String { + let id_str = match self { + MessageId::WithTopic { + uaid, + channel_id, + topic, + } => format!( + "{}:{}:{}:{}", + TOPIC_NOTIFICATION_PREFIX, + &uaid.as_simple(), + &channel_id.as_simple(), + topic + ), + MessageId::WithoutTopic { + uaid, + channel_id, + timestamp, + } => format!( + "{}:{}:{}:{}", + STANDARD_NOTIFICATION_PREFIX, + uaid.as_simple(), + channel_id.as_simple(), + timestamp + ), + }; + + fernet.encrypt(id_str.as_bytes()) + } + + /// Decrypt and decode the message ID + pub fn decrypt(fernet: &MultiFernet, message_id: &str) -> Result { + let decrypted_bytes = fernet + .decrypt(message_id) + .map_err(|_| ApcErrorKind::InvalidMessageId)?; + let decrypted_str = String::from_utf8_lossy(&decrypted_bytes); + let segments: Vec<_> = decrypted_str.split(':').collect(); + + if segments.len() != 4 { + return Err(ApcErrorKind::InvalidMessageId.into()); + } + + let (version, uaid, chid, topic_or_timestamp) = + (segments[0], segments[1], segments[2], segments[3]); + + match version { + "01" => Ok(MessageId::WithTopic { + uaid: Uuid::parse_str(uaid).map_err(|_| ApcErrorKind::InvalidMessageId)?, + channel_id: Uuid::parse_str(chid).map_err(|_| ApcErrorKind::InvalidMessageId)?, + topic: topic_or_timestamp.to_string(), + }), + "02" => Ok(MessageId::WithoutTopic { + uaid: Uuid::parse_str(uaid).map_err(|_| ApcErrorKind::InvalidMessageId)?, + channel_id: Uuid::parse_str(chid).map_err(|_| ApcErrorKind::InvalidMessageId)?, + timestamp: topic_or_timestamp + .parse() + .map_err(|_| ApcErrorKind::InvalidMessageId)?, + }), + _ => Err(ApcErrorKind::InvalidMessageId.into()), + } + } + + pub fn from_request(fernet: &MultiFernet, req: HttpRequest) -> Result { + let message_id = req + 
.match_info() + .get("message_id") + .expect("{message_id} must be part of the path"); + + Self::decrypt(fernet, message_id) + } + + /// Get the UAID of the associated notification + pub fn uaid(&self) -> Uuid { + match self { + MessageId::WithTopic { uaid, .. } => *uaid, + MessageId::WithoutTopic { uaid, .. } => *uaid, + } + } + + /// Get the sort-key for the associated notification + pub fn sort_key(&self) -> String { + match self { + MessageId::WithTopic { + channel_id, topic, .. + } => format!( + "{}:{}:{}", + TOPIC_NOTIFICATION_PREFIX, + channel_id.as_hyphenated(), + topic + ), + MessageId::WithoutTopic { + channel_id, + timestamp, + .. + } => format!( + "{}:{}:{}", + STANDARD_NOTIFICATION_PREFIX, + timestamp, + channel_id.as_hyphenated() + ), + } + } +} diff --git a/tests/integration/test_integration_all_rust.py b/tests/integration/test_integration_all_rust.py index 4501f59b3..28434d9b9 100644 --- a/tests/integration/test_integration_all_rust.py +++ b/tests/integration/test_integration_all_rust.py @@ -1009,13 +1009,16 @@ async def test_topic_expired(registered_test_client: AsyncPushTestClient) -> Non @pytest.mark.parametrize("fixture_max_conn_logs", [4], indirect=True) -async def test_multiple_delivery_with_single_ack( +async def test_no_multiple_delivery_with_single_ack( registered_test_client: AsyncPushTestClient, ) -> None: """Test that the server provides the right unacknowledged messages if the client only acknowledges one of the received messages. Note: the `data` fields are constructed so that they return `FirstMessage` and `OtherMessage`, which may be useful for debugging. + + *Note*: Prior versions would return previously ACK'd messages along with + messages. This behavior has changed. 
""" uuid_data_1: bytes = ( b"\x16*\xec\xb4\xc7\xac\xb1\xa8\x1e" + str(uuid.uuid4()).encode() @@ -1035,6 +1038,7 @@ async def test_multiple_delivery_with_single_ack( result2 = await registered_test_client.get_notification(timeout=0.5) assert result2 != {} assert result2["data"] == base64url_encode(uuid_data_2) + # ACK the first message, this now should remove it from the list of pending messages. await registered_test_client.ack(result["channelID"], result["version"]) await registered_test_client.disconnect() @@ -1042,13 +1046,12 @@ async def test_multiple_delivery_with_single_ack( await registered_test_client.hello() result = await registered_test_client.get_notification(timeout=0.5) assert result != {} - assert result["data"] == base64url_encode(uuid_data_1) + # ensure that the message doesn't match the previously ACK'd message. + assert result["data"] != base64url_encode(uuid_data_1) + assert result["data"] == base64url_encode(uuid_data_2) assert result["messageType"] == ClientMessageType.NOTIFICATION.value - result2 = await registered_test_client.get_notification() - assert result2 != {} - assert result2["data"] == base64url_encode(uuid_data_2) + assert await registered_test_client.get_notification() is None await registered_test_client.ack(result["channelID"], result["version"]) - await registered_test_client.ack(result2["channelID"], result2["version"]) # Verify no messages are delivered await registered_test_client.disconnect()