From 9fde835673d3a227fd3f9740edf17076a5e01a28 Mon Sep 17 00:00:00 2001 From: morguldir Date: Fri, 30 Aug 2024 18:23:42 +0200 Subject: [PATCH] syncv3: read receipts extension (MSC3960) --- src/api/client/sync.rs | 25 +++++++++++++--- src/service/rooms/read_receipt/data.rs | 10 ++----- src/service/rooms/read_receipt/mod.rs | 41 +++++++++++++++++++++++--- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/src/api/client/sync.rs b/src/api/client/sync.rs index d95a88ffb..236b85dda 100644 --- a/src/api/client/sync.rs +++ b/src/api/client/sync.rs @@ -36,6 +36,7 @@ use ruma::{ state_res::Event, uint, DeviceId, EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId, }; +use service::rooms::read_receipt::pack_receipts; use tracing::{Instrument as _, Span}; use crate::{ @@ -1168,6 +1169,10 @@ pub(crate) async fn sync_events_v4_route( let mut device_list_changes = HashSet::new(); let mut device_list_left = HashSet::new(); + let mut receipts = sync_events::v4::Receipts { + rooms: BTreeMap::new(), + }; + let mut account_data = sync_events::v4::AccountData { global: Vec::new(), rooms: BTreeMap::new(), @@ -1509,7 +1514,21 @@ pub(crate) async fn sync_events_v4_route( .collect(), ); - if roomsince != &0 && timeline_pdus.is_empty() && account_data.rooms.get(room_id).is_some_and(Vec::is_empty) { + let room_receipts = services + .rooms + .read_receipt + .readreceipts_since(room_id, *roomsince); + let vector: Vec<_> = room_receipts.into_iter().collect(); + let receipt_size = vector.len(); + receipts + .rooms + .insert(room_id.clone(), pack_receipts(Box::new(vector.into_iter()))); + + if roomsince != &0 + && timeline_pdus.is_empty() + && account_data.rooms.get(room_id).is_some_and(Vec::is_empty) + && receipt_size == 0 + { continue; } @@ -1723,9 +1742,7 @@ pub(crate) async fn sync_events_v4_route( device_unused_fallback_key_types: None, }, account_data, - receipts: sync_events::v4::Receipts { - rooms: BTreeMap::new(), - }, + receipts, typing: sync_events::v4::Typing { rooms: BTreeMap::new(), }, diff --git a/src/service/rooms/read_receipt/data.rs b/src/service/rooms/read_receipt/data.rs index 0f400ff3a..0c156df38 100644 --- a/src/service/rooms/read_receipt/data.rs +++ b/src/service/rooms/read_receipt/data.rs @@ -2,17 +2,11 @@ use std::{mem::size_of, sync::Arc}; use conduit::{utils, Error, Result}; use database::Map; -use ruma::{ - events::{receipt::ReceiptEvent, AnySyncEphemeralRoomEvent}, - serde::Raw, - CanonicalJsonObject, OwnedUserId, RoomId, UserId, -}; +use ruma::{events::receipt::ReceiptEvent, serde::Raw, CanonicalJsonObject, RoomId, UserId}; +use super::AnySyncEphemeralRoomEventIter; use crate::{globals, Dep}; -type AnySyncEphemeralRoomEventIter<'a> = - Box)>> + 'a>; - pub(super) struct Data { roomuserid_privateread: Arc, roomuserid_lastprivatereadupdate: Arc, diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index d202d8935..da11e2a0f 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -1,10 +1,17 @@ mod data; -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; -use conduit::Result; +use conduit::{debug, Result}; use data::Data; -use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId}; +use ruma::{ + events::{ + receipt::{ReceiptEvent, ReceiptEventContent}, + AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, + }, + serde::Raw, + OwnedUserId, RoomId, UserId, +}; use crate::{sending, Dep}; @@ -17,6 +24,9 @@ struct Services { sending: Dep, } +type AnySyncEphemeralRoomEventIter<'a> = + Box)>> + 'a>; + impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { Ok(Arc::new(Self { @@ -44,7 +54,7 @@ impl Service { #[tracing::instrument(skip(self), level = "debug")] pub fn readreceipts_since<'a>( &'a self, room_id: &RoomId, since: u64, - ) -> impl Iterator)>> + 'a { + ) -> impl Iterator)>> + 'a { self.db.readreceipts_since(room_id, since) } @@ -65,3 +75,26 @@ impl Service { self.db.last_privateread_update(user_id, room_id) } } + +#[must_use] +pub fn pack_receipts(receipts: AnySyncEphemeralRoomEventIter<'_>) -> Raw> { + let mut json = BTreeMap::new(); + for (_user, _count, value) in receipts.flatten() { + let receipt = serde_json::from_str::>(value.json().get()); + if let Ok(value) = receipt { + for (event, receipt) in value.content { + json.insert(event, receipt); + } + } else { + debug!("failed to parse receipt: {:?}", receipt); + } + } + let content = ReceiptEventContent::from_iter(json); + + Raw::from_json( + serde_json::value::to_raw_value(&SyncEphemeralRoomEvent { + content, + }) + .expect("received valid json"), + ) +}