Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add metrics for ACK codes. #793

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions autoconnect/autoconnect-common/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub struct ClientAck {
pub channel_id: Uuid,
// The corresponding version number for the message.
pub version: String,
pub code: Option<u32>,
}

#[derive(Debug, Serialize)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ mod tests {
Notification {
channel_id: *channel_id,
ttl,
timestamp: sec_since_epoch(),
recv_timestamp: sec_since_epoch(),
sortkey_timestamp: Some(ms_since_epoch()),
..Default::default()
}
Expand All @@ -421,7 +421,7 @@ mod tests {
async fn expired_increments_storage() {
let mut db = MockDbClient::new();
let mut seq = mockall::Sequence::new();
let timestamp = sec_since_epoch();
let timestamp = ms_since_epoch();
// No topic messages
db.expect_fetch_topic_messages()
.times(1)
Expand Down Expand Up @@ -465,7 +465,6 @@ mod tests {
.in_sequence(&mut seq)
.withf(move |_, ts| ts == &timestamp)
.return_once(|_, _| Ok(()));

// No check_storage called here (via default ClientFlags)
let (mut client, _) = wpclient(
DUMMY_UAID,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use cadence::CountedExt;
use cadence::{Counted, CountedExt};
use uuid::Uuid;

use autoconnect_common::{
Expand Down Expand Up @@ -179,22 +179,30 @@ impl WebPushClient {
/// Acknowledge receipt of one or more Push Notifications
async fn ack(&mut self, updates: &[ClientAck]) -> Result<Vec<ServerMessage>, SMError> {
trace!("✅ WebPushClient:ack");
let _ = self.app_state.metrics.incr("ua.command.ack");
let mut codes: HashMap<u32, u32> = HashMap::new();

for notif in updates {
// Check the list of unacked "direct" (unstored) notifications. We only want to
// ack messages we've not yet seen and we have the right version, otherwise we could
// have gotten an older, inaccurate ACK.
// Since the `version` is the `message_id` which is an encrypted string containing
// the uaid, channel_id, and message timestamp in ms, it should be unique enough
// that we do not need any other values.

let pos = self
.ack_state
.unacked_direct_notifs
.iter()
.position(|n| n.channel_id == notif.channel_id && n.version == notif.version);
.position(|n| n.version == notif.version);
if let Some(code) = &notif.code {
codes.insert(*code, codes.get(code).unwrap_or(&0) + 1);
}
// We found one, so delete it from our list of unacked messages
if let Some(pos) = pos {
debug!("✅ Ack (Direct)";
"channel_id" => notif.channel_id.as_hyphenated().to_string(),
"version" => &notif.version
"version" => &notif.version,
"code" => &notif.code.unwrap_or_default(),
);
self.ack_state.unacked_direct_notifs.remove(pos);
self.stats.direct_acked += 1;
Expand All @@ -206,7 +214,7 @@ impl WebPushClient {
.ack_state
.unacked_stored_notifs
.iter()
.position(|n| n.channel_id == notif.channel_id && n.version == notif.version);
.position(|n| n.version == notif.version);
if let Some(pos) = pos {
debug!(
"✅ Ack (Stored)";
Expand Down Expand Up @@ -237,8 +245,26 @@ impl WebPushClient {
self.stats.stored_acked += 1;
continue;
};

// The client returned an ACK for a message that we don't recognize. This shouldn't happen,
// so we should raise a bit of a stink.
info!("✖️🟥 Found unknown ACK: {:?}", &notif.version);
let _ = self
.app_state
.metrics
.incr("ua.command.ack.unknown_version");
}

// Return metrics associated with the various possible return codes.
if !codes.is_empty() {
for (key, val) in codes.into_iter() {
self.app_state
.metrics
.count_with_tags("ua.command.ack", val)
.with_tag("code", &key.to_string())
.send();
}
}
if self.ack_state.unacked_notifs() {
// Wait for the Client to Ack all notifications before further
// processing
Expand Down
12 changes: 6 additions & 6 deletions autoendpoint/src/extractors/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use uuid::Uuid;
/// Extracts notification data from `Subscription` and request data
#[derive(Clone, Debug)]
pub struct Notification {
/// Unique message_id for this notification
/// Unique message_id for this notification generated from the UAID, ChannelID, and other key elements.
pub message_id: String,
/// The subscription information block
pub subscription: Subscription,
/// Set of associated crypto headers
pub headers: NotificationHeaders,
/// UNIX timestamp in seconds
pub timestamp: u64,
pub recv_timestamp: u64,
/// UNIX timestamp in milliseconds
pub sort_key_timestamp: u64,
/// The encrypted notification body
Expand Down Expand Up @@ -58,7 +58,7 @@ impl FromRequest for Notification {
};

let headers = NotificationHeaders::from_request(&req, data.is_some())?;
let timestamp = sec_since_epoch();
let recv_timestamp = sec_since_epoch();
let sort_key_timestamp = ms_since_epoch();
let message_id = Self::generate_message_id(
&app_state.fernet,
Expand All @@ -82,7 +82,7 @@ impl FromRequest for Notification {
message_id,
subscription,
headers,
timestamp,
recv_timestamp,
sort_key_timestamp,
data,
})
Expand All @@ -100,7 +100,7 @@ impl From<Notification> for autopush_common::notification::Notification {
version: notification.message_id,
ttl: notification.headers.ttl as u64,
topic,
timestamp: notification.timestamp,
recv_timestamp: notification.recv_timestamp,
data: notification.data,
sortkey_timestamp,
reliability_id: notification.subscription.reliability_id,
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Notification {
map.insert("version", serde_json::to_value(&self.message_id)?);
map.insert("ttl", serde_json::to_value(self.headers.ttl)?);
map.insert("topic", serde_json::to_value(&self.headers.topic)?);
map.insert("timestamp", serde_json::to_value(self.timestamp)?);
map.insert("timestamp", serde_json::to_value(self.recv_timestamp)?);
if let Some(reliability_id) = &self.subscription.reliability_id {
map.insert("reliability_id", serde_json::to_value(reliability_id)?);
}
Expand Down
4 changes: 3 additions & 1 deletion autoendpoint/src/routers/apns/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,9 @@ impl Router for ApnsRouter {
apns_priority: Some(Priority::High),
apns_topic: Some(topic),
apns_collapse_id: None,
apns_expiration: Some(notification.timestamp + notification.headers.ttl as u64),
apns_expiration: Some(
notification.recv_timestamp + notification.headers.ttl as u64,
),
..Default::default()
},
);
Expand Down
4 changes: 2 additions & 2 deletions autoendpoint/src/routers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub fn incr_success_metrics(
metrics
.time_with_tags(
"notification.total_request_time",
(autopush_common::util::sec_since_epoch() - notification.timestamp) * 1000,
(autopush_common::util::sec_since_epoch() - notification.recv_timestamp) * 1000,
)
.with_tag("platform", platform)
.with_tag("app_id", app_id)
Expand Down Expand Up @@ -255,7 +255,7 @@ pub mod tests {
encryption_key: Some("test-encryption-key".to_string()),
crypto_key: Some("test-crypto-key".to_string()),
},
timestamp: 0,
recv_timestamp: 0,
sort_key_timestamp: 0,
data,
}
Expand Down
3 changes: 2 additions & 1 deletion autoendpoint/src/routers/webpush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ impl Router for WebPushRouter {
self.metrics
.time_with_tags(
"notification.total_request_time",
(notification.timestamp - autopush_common::util::sec_since_epoch())
(notification.recv_timestamp
- autopush_common::util::sec_since_epoch())
* 1000,
)
.with_tag("platform", "websocket")
Expand Down
6 changes: 3 additions & 3 deletions autopush-common/src/db/bigtable/bigtable_client/cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct Cell {
pub value_index: usize,
/// "Timestamp" in milliseconds. This value is used by the family
/// garbage collection rules and may not reflect reality.
pub timestamp: SystemTime,
pub timestamp_st: SystemTime,
pub labels: Vec<String>, // not sure if these are used?
}

Expand All @@ -26,7 +26,7 @@ impl Default for Cell {
Self {
family: String::default(),
qualifier: String::default(),
timestamp: SystemTime::now(),
timestamp_st: SystemTime::now(),
labels: Vec::new(),
value: Vec::new(),
value_index: 0,
Expand All @@ -41,7 +41,7 @@ impl From<PartialCell> for Cell {
qualifier: partial.qualifier,
value: partial.value,
value_index: partial.value_index,
timestamp: partial.timestamp,
timestamp_st: partial.timestamp_st,
labels: partial.labels,
}
}
Expand Down
10 changes: 5 additions & 5 deletions autopush-common/src/db/bigtable/bigtable_client/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) struct PartialCell {
/// Timestamps are returned as microseconds, but need to be
/// specified as milliseconds (even though the function asks
/// for microseconds, you * 1000 the mls).
pub(crate) timestamp: SystemTime,
pub(crate) timestamp_st: SystemTime,
/// Not sure if or how these are used
pub(crate) labels: Vec<String>,
/// The data buffer.
Expand All @@ -43,7 +43,7 @@ impl Default for PartialCell {
Self {
family: String::default(),
qualifier: String::default(),
timestamp: SystemTime::now(),
timestamp_st: SystemTime::now(),
labels: Vec::new(),
value: Vec::new(),
value_index: 0,
Expand Down Expand Up @@ -210,7 +210,7 @@ impl RowMerger {
// record the timestamp for this cell. (Note: this is not the clock time that it was
// created, but the timestamp that was used for it's creation. It is used by the
// garbage collector.)
cell.timestamp =
cell.timestamp_st =
SystemTime::UNIX_EPOCH + Duration::from_micros(chunk.timestamp_micros as u64);

// If there are additional labels for this cell, record them.
Expand Down Expand Up @@ -320,7 +320,7 @@ impl RowMerger {
row_in_progress.last_qualifier.clone_from(&qualifier);
let qualifier_cells = vec![Cell {
family: cell_in_progress.family.clone(),
timestamp: cell_in_progress.timestamp,
timestamp_st: cell_in_progress.timestamp_st,
labels: cell_in_progress.labels.clone(),
qualifier: cell_in_progress.qualifier.clone(),
value: cell_in_progress.value.clone(),
Expand All @@ -336,7 +336,7 @@ impl RowMerger {
}

// reset the cell in progress
cell_in_progress.timestamp = SystemTime::now();
cell_in_progress.timestamp_st = SystemTime::now();
cell_in_progress.value.clear();
cell_in_progress.value_index = 0;

Expand Down
Loading