Skip to content

Commit

Permalink
Merge branch 'main' into switch_to_simplified_sync_service
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinaboos committed Oct 1, 2024
2 parents 5212ae5 + f522a1a commit 571536d
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 15 deletions.
110 changes: 100 additions & 10 deletions src/home/room_screen.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A room screen is the UI page that displays a single Room's timeline of events/messages
//! along with a message input bar at the bottom.
use std::{borrow::Cow, collections::BTreeMap, ops::{DerefMut, Range}, sync::{Arc, Mutex}};
use std::{borrow::Cow, collections::{BTreeMap, HashMap}, ops::{DerefMut, Range}, sync::{Arc, Mutex}, time::Instant};

use imbl::Vector;
use makepad_widgets::*;
Expand All @@ -17,20 +17,14 @@ use matrix_sdk_ui::timeline::{
};

use crate::{
avatar_cache::{self, AvatarCacheEntry},
event_preview::{text_preview_of_member_profile_change, text_preview_of_other_state, text_preview_of_redacted_message, text_preview_of_room_membership_change, text_preview_of_timeline_item},
media_cache::{MediaCache, MediaCacheEntry},
profile::{
avatar_cache::{self, AvatarCacheEntry}, event_preview::{text_preview_of_member_profile_change, text_preview_of_other_state, text_preview_of_redacted_message, text_preview_of_room_membership_change, text_preview_of_timeline_item}, home::main_content::MainContentWidgetRefExt, media_cache::{MediaCache, MediaCacheEntry}, profile::{
user_profile::{AvatarState, ShowUserProfileAction, UserProfile, UserProfileAndRoomId, UserProfilePaneInfo, UserProfileSlidingPaneRef, UserProfileSlidingPaneWidgetExt},
user_profile_cache,
},
shared::{
}, shared::{
avatar::{AvatarRef, AvatarWidgetRefExt},
html_or_plaintext::{HtmlOrPlaintextRef, HtmlOrPlaintextWidgetRefExt},
text_or_image::{TextOrImageRef, TextOrImageWidgetRefExt},
},
sliding_sync::{get_client, submit_async_request, take_timeline_update_receiver, MatrixRequest},
utils::{self, unix_time_millis_to_datetime, MediaFormatConst},
}, sliding_sync::{get_client, submit_async_request, take_timeline_update_receiver, MatrixRequest}, utils::{self, unix_time_millis_to_datetime, MediaFormatConst}
};
use rangemap::RangeSet;

Expand Down Expand Up @@ -933,6 +927,78 @@ struct RoomScreen {
#[rust] room_name: String,
/// The UI-relevant states for the room that this widget is currently displaying.
#[rust] tl_state: Option<TimelineUiState>,
/// 5 secs timer when scroll ends
#[rust] fully_read_timer: Timer,
}

impl RoomScreen{
fn send_user_read_receipts_based_on_scroll_pos(
&mut self,
cx: &mut Cx,
actions: &ActionsBuf,
) {
let portal_list = self.portal_list(id!(list));
//stopped scrolling
if portal_list.scrolled(actions) {
return;
}
let first_index = portal_list.first_id();

let Some(tl_state) = self.tl_state.as_mut() else { return };
let Some(room_id) = self.room_id.as_ref() else { return };
if let Some(ref mut index) = tl_state.prev_first_index {
// to detect change of scroll when scroll ends
if *index != first_index {
// scroll changed
self.fully_read_timer = cx.start_interval(5.0);
let time_now = std::time::Instant::now();
if first_index > *index {
// Store visible event messages with current time into a hashmap
let mut read_receipt_event = None;
for r in first_index .. (first_index + portal_list.visible_items() + 1) {
if let Some(v) = tl_state.items.get(r) {
if let Some(e) = v.as_event().and_then(|f| f.event_id()) {
read_receipt_event = Some(e.to_owned());
if !tl_state.read_event_hashmap.contains_key(&e.to_string()) {
tl_state.read_event_hashmap.insert(
e.to_string(),
(room_id.clone(), e.to_owned(), time_now, false),
);
}
}
}
}
if let Some(event_id) = read_receipt_event {
submit_async_request(MatrixRequest::ReadReceipt { room_id: room_id.clone(), event_id });
}
let mut fully_read_receipt_event = None;
// Implements sending fully read receipts when message is scrolled out of first row
for r in *index..first_index {
if let Some(v) = tl_state.items.get(r).clone() {
if let Some(e) = v.as_event().and_then(|f| f.event_id()) {
let mut to_remove = vec![];
for (event_id_string, (_, event_id)) in &tl_state.marked_fully_read_queue {
if e == event_id {
fully_read_receipt_event = Some(event_id.clone());
to_remove.push(event_id_string.clone());
}
}
for r in to_remove {
tl_state.marked_fully_read_queue.remove(&r);
}
}
}
}
if let Some(event_id) = fully_read_receipt_event {
submit_async_request(MatrixRequest::FullyReadReceipt { room_id: room_id.clone(), event_id: event_id.clone()});
}
}
*index = first_index;
}
} else {
tl_state.prev_first_index = Some(first_index);
}
}
}

impl Widget for RoomScreen {
Expand Down Expand Up @@ -997,6 +1063,8 @@ impl Widget for RoomScreen {
if curr_item_idx != new_item_idx {
log!("Timeline::handle_event(): jumping view from event index {curr_item_idx} to new index {new_item_idx}, scroll {new_item_scroll}, event ID {_event_id}");
portal_list.set_first_id_and_scroll(new_item_idx, new_item_scroll);
tl.prev_first_index = Some(new_item_idx);
cx.stop_timer(self.fully_read_timer);
}
}
// TODO: after an (un)ignore user event, all timelines are cleared.
Expand Down Expand Up @@ -1025,6 +1093,7 @@ impl Widget for RoomScreen {
// log!("Timeline::handle_event(): changed_indices: {changed_indices:?}, items len: {}\ncontent drawn: {:#?}\nprofile drawn: {:#?}", items.len(), tl.content_drawn_since_last_update, tl.profile_drawn_since_last_update);
}
tl.items = items;

}
TimelineUpdate::TimelineStartReached => {
log!("Timeline::handle_event(): timeline start reached for room {}", tl.room_id);
Expand Down Expand Up @@ -1081,6 +1150,7 @@ impl Widget for RoomScreen {
}

if let Event::Actions(actions) = event {
self.send_user_read_receipts_based_on_scroll_pos(cx, actions);
for action in actions {
// Handle actions on a message, e.g., clicking the reply button or clicking the reply preview.
match action.as_widget_action().cast() {
Expand Down Expand Up @@ -1336,6 +1406,19 @@ impl Widget for RoomScreen {
}
}

// Mark events as fully read after they have been displayed on screen for 5 seconds.
if self.fully_read_timer.is_event(event).is_some() {
if let (Some(ref mut tl_state), Some(ref _room_id)) = (&mut self.tl_state, &self.room_id) {
for (k, (room, event, start, ref mut moved_to_queue)) in &mut tl_state.read_event_hashmap {
if start.elapsed() > std::time::Duration::new(5, 0) && !*moved_to_queue{
tl_state.marked_fully_read_queue.insert(k.clone(), (room.clone(), event.clone()));
*moved_to_queue = true;
}
}
}
cx.stop_timer(self.fully_read_timer);
}

// Only forward visibility-related events (touch/tap/scroll) to the inner timeline view
// if the user profile sliding pane is not visible.
if event.requires_visibility() && pane.is_currently_shown(cx) {
Expand Down Expand Up @@ -1571,6 +1654,9 @@ impl RoomScreen {
replying_to: None,
saved_state: SavedState::default(),
message_highlight_animation_state: MessageHighlightAnimationState::default(),
prev_first_index: None,
read_event_hashmap: HashMap::new(),
marked_fully_read_queue: HashMap::new(),
};
(new_tl_state, true)
};
Expand Down Expand Up @@ -1823,6 +1909,10 @@ struct TimelineUiState {
/// Once the scrolling is started, the state becomes Pending.
/// If the animation was trigged, the state goes back to Off.
message_highlight_animation_state: MessageHighlightAnimationState,

prev_first_index: Option<usize>,
read_event_hashmap: HashMap<String, (OwnedRoomId, OwnedEventId, Instant, bool)>,
marked_fully_read_queue: HashMap<String, (OwnedRoomId, OwnedEventId)>,
}

/// The item index, scroll position, and optional unique IDs of the first `N` events
Expand Down
74 changes: 69 additions & 5 deletions src/sliding_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,34 @@ use eyeball_im::VectorDiff;
use futures_util::StreamExt;
use makepad_widgets::{error, log, warning, SignalToUI};
use matrix_sdk::{
config::RequestConfig, event_handler::EventHandlerDropGuard, media::MediaRequest, room::RoomMember, ruma::{
api::client::session::get_login_types::v3::LoginType, events::{room::{message::{ForwardThread, RoomMessageEventContent}, MediaSource}, FullStateEventContent}, OwnedEventId, OwnedMxcUri, OwnedRoomAliasId, OwnedRoomId, OwnedUserId, RoomId, UserId
}, sliding_sync::VersionBuilder, Client, Room
config::RequestConfig,
event_handler::EventHandlerDropGuard,
media::MediaRequest,
room::{Receipts, RoomMember},
ruma::{
api::client::{receipt::create_receipt::v3::ReceiptType, session::get_login_types::v3::LoginType},
events::{
receipt::ReceiptThread, room::{
message::{ForwardThread, RoomMessageEventContent},
MediaSource,
}, FullStateEventContent
},
OwnedEventId, OwnedMxcUri, OwnedRoomAliasId, OwnedRoomId, OwnedUserId, RoomId, UserId
},
sliding_sync::VersionBuilder,
Client,
Room,
};
use matrix_sdk_ui::{
room_list_service::{self, RoomListLoadingState},
sync_service::{self, SyncService},
timeline::{AnyOtherFullStateEventContent, EventTimelineItem, LiveBackPaginationStatus, RepliedToInfo, TimelineDetails, TimelineItemContent},
Timeline,
};
use matrix_sdk_ui::{room_list_service::{self, RoomListLoadingState}, sync_service::{self, SyncService}, timeline::{AnyOtherFullStateEventContent, EventTimelineItem, LiveBackPaginationStatus, RepliedToInfo, TimelineDetails, TimelineItemContent}, Timeline};
use tokio::{
runtime::Handle,
sync::mpsc::{UnboundedSender, UnboundedReceiver}, task::JoinHandle,
sync::mpsc::{UnboundedSender, UnboundedReceiver},
task::JoinHandle,
};
use unicode_segmentation::UnicodeSegmentation;
use std::{cmp::{max, min}, collections::{BTreeMap, BTreeSet}, path:: Path, sync::{Arc, Mutex, OnceLock}};
Expand Down Expand Up @@ -235,6 +255,16 @@ pub enum MatrixRequest {
/// Whether to subscribe or unsubscribe from typing notices for this room.
subscribe: bool,
},
/// Sends a read receipt for the given event in the given room.
ReadReceipt{
room_id: OwnedRoomId,
event_id: OwnedEventId,
},
/// Sends a fully-read receipt for the given event in the given room.
FullyReadReceipt{
room_id: OwnedRoomId,
event_id: OwnedEventId,
}
}

/// Submits a request to the worker thread to be executed asynchronously.
Expand Down Expand Up @@ -608,6 +638,40 @@ async fn async_worker(mut receiver: UnboundedReceiver<MatrixRequest>) -> Result<
});
}

MatrixRequest::ReadReceipt { room_id, event_id }=>{
let timeline = {
let all_room_info = ALL_ROOM_INFO.lock().unwrap();
let Some(room_info) = all_room_info.get(&room_id) else {
log!("BUG: room info not found when sending read receipt, room {room_id}, {event_id}");
continue;
};
room_info.timeline.clone()
};
let _send_rr_task = Handle::current().spawn(async move {
match timeline.send_single_receipt(ReceiptType::Read, ReceiptThread::Unthreaded, event_id.clone()).await {
Ok(sent) => log!("{} read receipt to room {room_id} for event {event_id}", if sent { "Sent" } else { "Already sent" }),
Err(_e) => error!("Failed to send read receipt to room {room_id} for event {event_id}; error: {_e:?}"),
}
});
},

MatrixRequest::FullyReadReceipt { room_id, event_id }=>{
let timeline = {
let all_room_info = ALL_ROOM_INFO.lock().unwrap();
let Some(room_info) = all_room_info.get(&room_id) else {
log!("BUG: room info not found when sending fully read receipt, room {room_id}, {event_id}");
continue;
};
room_info.timeline.clone()
};
let _send_frr_task = Handle::current().spawn(async move {
let receipt = Receipts::new().fully_read_marker(event_id.clone());
match timeline.send_multiple_receipts(receipt).await {
Ok(()) => log!("Sent fully read receipt to room {room_id}, event {event_id}"),
Err(_e) => error!("Failed to send fully read receipt to room {room_id}, event {event_id}; error: {_e:?}"),
}
});
}
}
}

Expand Down

0 comments on commit 571536d

Please sign in to comment.