Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Delete ACK'd messages from storage to prevent some dupes #795

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use autoconnect_common::{
broadcast::Broadcast,
protocol::{BroadcastValue, ClientAck, ClientMessage, ServerMessage},
};
use autopush_common::{endpoint::make_endpoint, util::sec_since_epoch};
use autopush_common::{endpoint::make_endpoint, message_id::MessageId, util::sec_since_epoch};

use super::WebPushClient;
use crate::error::{SMError, SMErrorKind};
Expand All @@ -34,8 +34,8 @@ impl WebPushClient {
.await?
.map_or_else(Vec::new, |smsg| vec![smsg])),
ClientMessage::Ack { updates } => self.ack(&updates).await,
ClientMessage::Nack { code, .. } => {
self.nack(code);
ClientMessage::Nack { code, version } => {
self.nack(code, &version).await?;
Ok(vec![])
}
ClientMessage::Ping => Ok(vec![self.ping()?]),
Expand Down Expand Up @@ -185,6 +185,14 @@ impl WebPushClient {
// Check the list of unacked "direct" (unstored) notifications. We only want to
// ack messages we've not yet seen and we have the right version, otherwise we could
// have gotten an older, inaccurate ACK.
// Force remove all valid ACK messages. There were reports that some users received multiple
// instance of a previously "ACK"d message. This may happen if we send a batch of messages to
// the client, and there's one message that cannot be ACK'd for some reason. This would result
// in the `process_post_acks` function to not get called, so no `increment_storage` would
// happen. This meant that the next time the client reconnected, or a message event occurred
// the database may re-fetch data and resend it to the client, including previously ack'd
// messages. Deleting the message from the data store should prevent that.

let pos = self
.ack_state
.unacked_direct_notifs
Expand All @@ -198,6 +206,12 @@ impl WebPushClient {
);
self.ack_state.unacked_direct_notifs.remove(pos);
self.stats.direct_acked += 1;
let message_id = MessageId::decrypt(&self.app_state.fernet, &notif.version)
.map_err(|_e| SMErrorKind::InvalidMessage("Invalid MessageID".to_owned()))?;
self.app_state
.db
.remove_message(&self.uaid, &message_id.sort_key())
.await?;
continue;
};

Expand All @@ -217,22 +231,17 @@ impl WebPushClient {
let n = &self.ack_state.unacked_stored_notifs[pos];
debug!("✅ Ack notif: {:?}", &n);
// TODO: Record "ack'd" reliability_id, if present.
// Only force delete Topic messages, since they don't have a timestamp.
// Other messages persist in the database, to be, eventually, cleaned up by their
// TTL. We will need to update the `CurrentTimestamp` field for the channel
// record. Use that field to set the baseline timestamp for when to pull messages
// in the future.
// Topic/legacy messages have no sortkey_timestamp
if n.sortkey_timestamp.is_none() {
debug!(
"✅ WebPushClient:ack removing Stored, sort_key: {}",
&n.chidmessageid()
);
self.app_state
.db
.remove_message(&self.uaid, &n.chidmessageid())
.await?;
}
debug!(
"✅ WebPushClient:ack removing Stored, sort_key: {}",
&n.chidmessageid()
);
let message_id = MessageId::decrypt(&self.app_state.fernet, &notif.version)
.map_err(|_e| SMErrorKind::InvalidMessage("Invalid MessageID".to_owned()))?;
self.app_state
.db
.remove_message(&self.uaid, &message_id.sort_key())
.await?;
self.ack_state.unacked_stored_notifs.remove(pos);
self.stats.stored_acked += 1;
continue;
Expand All @@ -250,18 +259,30 @@ impl WebPushClient {

/// Negative Acknowledgement (a Client error occurred) of one or more Push
/// Notifications
fn nack(&mut self, code: Option<i32>) {
async fn nack(&mut self, code: Option<i32>, _message_id: &str) -> Result<(), SMError> {
trace!("WebPushClient:nack");
// only metric codes expected from the client (or 0)
let code = code
.and_then(|code| (301..=303).contains(&code).then_some(code))
.unwrap_or(0);
// TODO: There may be certain NAK codes indicating a message absolutley is rejected and
// should not be retried. In that case, drop the message. Waiting for Client to confirm
// the codes to absolutely reject.
/*
if code == 302 {
self.app_state
.db
.remove_message(&self.uaid, _message_id)
.await?
}
*/
self.app_state
.metrics
.incr_with_tags("ua.command.nack")
.with_tag("code", &code.to_string())
.send();
self.stats.nacks += 1;
Ok(())
}

/// Handle a WebPush Ping
Expand Down
140 changes: 1 addition & 139 deletions autoendpoint/src/extractors/message_id.rs
Original file line number Diff line number Diff line change
@@ -1,139 +1 @@
use crate::error::{ApiError, ApiErrorKind, ApiResult};
use crate::server::AppState;
use actix_web::dev::Payload;
use actix_web::{web::Data, FromRequest, HttpRequest};
use autopush_common::notification::{STANDARD_NOTIFICATION_PREFIX, TOPIC_NOTIFICATION_PREFIX};
use fernet::MultiFernet;
use futures::future;
use uuid::Uuid;

/// Holds information about a notification. The information is encoded and
/// encrypted into a "message ID" which is presented to the user. Later, the
/// user can send us the message ID to perform operations on the associated
/// notification (e.g. delete it).
#[derive(Debug)]
pub enum MessageId {
WithTopic {
uaid: Uuid,
channel_id: Uuid,
topic: String,
},
WithoutTopic {
uaid: Uuid,
channel_id: Uuid,
timestamp: u64,
},
}

impl FromRequest for MessageId {
type Error = ApiError;
type Future = future::Ready<Result<Self, Self::Error>>;

fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let message_id_param = req
.match_info()
.get("message_id")
.expect("{message_id} must be part of the path");
let app_state: Data<AppState> = Data::extract(req)
.into_inner()
.expect("No server state found");

future::ready(MessageId::decrypt(&app_state.fernet, message_id_param))
}
}

impl MessageId {
/// Encode and encrypt the message ID
pub fn encrypt(&self, fernet: &MultiFernet) -> String {
let id_str = match self {
MessageId::WithTopic {
uaid,
channel_id,
topic,
} => format!(
"{}:{}:{}:{}",
TOPIC_NOTIFICATION_PREFIX,
&uaid.as_simple(),
&channel_id.as_simple(),
topic
),
MessageId::WithoutTopic {
uaid,
channel_id,
timestamp,
} => format!(
"{}:{}:{}:{}",
STANDARD_NOTIFICATION_PREFIX,
uaid.as_simple(),
channel_id.as_simple(),
timestamp
),
};

fernet.encrypt(id_str.as_bytes())
}

/// Decrypt and decode the message ID
pub fn decrypt(fernet: &MultiFernet, message_id: &str) -> ApiResult<Self> {
let decrypted_bytes = fernet
.decrypt(message_id)
.map_err(|_| ApiErrorKind::InvalidMessageId)?;
let decrypted_str = String::from_utf8_lossy(&decrypted_bytes);
let segments: Vec<_> = decrypted_str.split(':').collect();

if segments.len() != 4 {
return Err(ApiErrorKind::InvalidMessageId.into());
}

let (version, uaid, chid, topic_or_timestamp) =
(segments[0], segments[1], segments[2], segments[3]);

match version {
"01" => Ok(MessageId::WithTopic {
uaid: Uuid::parse_str(uaid).map_err(|_| ApiErrorKind::InvalidMessageId)?,
channel_id: Uuid::parse_str(chid).map_err(|_| ApiErrorKind::InvalidMessageId)?,
topic: topic_or_timestamp.to_string(),
}),
"02" => Ok(MessageId::WithoutTopic {
uaid: Uuid::parse_str(uaid).map_err(|_| ApiErrorKind::InvalidMessageId)?,
channel_id: Uuid::parse_str(chid).map_err(|_| ApiErrorKind::InvalidMessageId)?,
timestamp: topic_or_timestamp
.parse()
.map_err(|_| ApiErrorKind::InvalidMessageId)?,
}),
_ => Err(ApiErrorKind::InvalidMessageId.into()),
}
}

/// Get the UAID of the associated notification
pub fn uaid(&self) -> Uuid {
match self {
MessageId::WithTopic { uaid, .. } => *uaid,
MessageId::WithoutTopic { uaid, .. } => *uaid,
}
}

/// Get the sort-key for the associated notification
pub fn sort_key(&self) -> String {
match self {
MessageId::WithTopic {
channel_id, topic, ..
} => format!(
"{}:{}:{}",
TOPIC_NOTIFICATION_PREFIX,
channel_id.as_hyphenated(),
topic
),
MessageId::WithoutTopic {
channel_id,
timestamp,
..
} => format!(
"{}:{}:{}",
STANDARD_NOTIFICATION_PREFIX,
timestamp,
channel_id.as_hyphenated()
),
}
}
}
pub use autopush_common::message_id::MessageId;
6 changes: 4 additions & 2 deletions autoendpoint/src/routes/webpush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::extractors::notification::Notification;
use crate::extractors::routers::{RouterType, Routers};
use crate::server::AppState;
use actix_web::web::Data;
use actix_web::HttpResponse;
use actix_web::{HttpRequest, HttpResponse};

/// Handle the `POST /wpush/{api_version}/{token}` and `POST /wpush/{token}` routes
pub async fn webpush_route(
Expand All @@ -30,9 +30,11 @@ pub async fn webpush_route(

/// Handle the `DELETE /m/{message_id}` route
pub async fn delete_notification_route(
message_id: MessageId,
req: HttpRequest,
app_state: Data<AppState>,
) -> ApiResult<HttpResponse> {
let message_id = MessageId::from_request(&app_state.fernet, req)
.map_err(|_| ApiErrorKind::InvalidMessageId)?;
let sort_key = message_id.sort_key();
debug!("Deleting notification with sort-key {}", sort_key);
trace!("message_id = {:?}", message_id);
Expand Down
2 changes: 2 additions & 0 deletions autopush-common/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ pub enum ApcErrorKind {
BroadcastError(String),
#[error("Payload Error: {0}")]
PayloadError(String),
#[error("Invalid message ID")]
InvalidMessageId,
#[error("General Error: {0}")]
GeneralError(String),
}
Expand Down
1 change: 1 addition & 0 deletions autopush-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod db;
pub mod endpoint;
pub mod errors;
pub mod logging;
pub mod message_id;
pub mod metrics;
pub mod middleware;
pub mod notification;
Expand Down
Loading