diff --git a/crates/matrix-sdk/src/lib.rs b/crates/matrix-sdk/src/lib.rs index 370061fa8de..5730ed086ce 100644 --- a/crates/matrix-sdk/src/lib.rs +++ b/crates/matrix-sdk/src/lib.rs @@ -94,6 +94,7 @@ pub use sliding_sync::{ #[cfg(feature = "uniffi")] uniffi::setup_scaffolding!(); +pub mod live_location_share; #[cfg(any(test, feature = "testing"))] pub mod test_utils; diff --git a/crates/matrix-sdk/src/live_location_share.rs b/crates/matrix-sdk/src/live_location_share.rs new file mode 100644 index 00000000000..f2d3e4fa097 --- /dev/null +++ b/crates/matrix-sdk/src/live_location_share.rs @@ -0,0 +1,83 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Types for live location sharing. +//! +//! Live location sharing allows users to share their real-time location with +//! others in a room via [MSC3489](https://github.com/matrix-org/matrix-spec-proposals/pull/3489). +use async_stream::stream; +use futures_util::Stream; +use ruma::{ + events::{ + beacon::OriginalSyncBeaconEvent, beacon_info::BeaconInfoEventContent, + location::LocationContent, + }, + MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, +}; + +use crate::{event_handler::ObservableEventHandler, Client, Room}; + +/// An observable live location. +#[derive(Debug)] +pub struct ObservableLiveLocation { + observable_room_events: ObservableEventHandler<(OriginalSyncBeaconEvent, Room)>, +} + +impl ObservableLiveLocation { + /// Create a new `ObservableLiveLocation` for a particular room. + pub fn new(client: &Client, room_id: &RoomId) -> Self { + Self { observable_room_events: client.observe_room_events(room_id) } + } + + /// Get a stream of [`LiveLocationShare`]. + pub fn subscribe(&self) -> impl Stream { + let stream = self.observable_room_events.subscribe(); + stream! { + for await (event, room) in stream { + yield LiveLocationShare { + last_location: LastLocation { + location: event.content.location, + ts: event.origin_server_ts, + }, + beacon_info: room + .get_user_beacon_info(&event.sender) + .await + .ok() + .map(|info| info.content), + user_id: event.sender, + }; + } + } + } +} + +/// Details of the last known location beacon. +#[derive(Clone, Debug)] +pub struct LastLocation { + /// The most recent location content of the user. + pub location: LocationContent, + /// The timestamp of when the location was updated. + pub ts: MilliSecondsSinceUnixEpoch, +} + +/// Details of a users live location share. +#[derive(Clone, Debug)] +pub struct LiveLocationShare { + /// The user's last known location. + pub last_location: LastLocation, + /// Information about the associated beacon event. + pub beacon_info: Option, + /// The user ID of the person sharing their live location. + pub user_id: OwnedUserId, +} diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 00484d31457..c5634b4e1ed 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -132,6 +132,7 @@ use crate::{ error::{BeaconError, WrongRoomState}, event_cache::{self, EventCacheDropHandles, RoomEventCache}, event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent}, + live_location_share::ObservableLiveLocation, media::{MediaFormat, MediaRequestParameters}, notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode}, room::power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt}, @@ -2990,17 +2991,18 @@ impl Room { Ok(()) } - /// Get the beacon information event in the room for the current user. + /// Get the beacon information event in the room for the `user_id`. /// /// # Errors /// /// Returns an error if the event is redacted, stripped, not found or could /// not be deserialized. - async fn get_user_beacon_info( + pub(crate) async fn get_user_beacon_info( &self, + user_id: &UserId, ) -> Result, BeaconError> { let raw_event = self - .get_state_event_static_for_key::(self.own_user_id()) + .get_state_event_static_for_key::(user_id) .await? .ok_or(BeaconError::NotFound)?; @@ -3053,7 +3055,7 @@ impl Room { ) -> Result { self.ensure_room_joined()?; - let mut beacon_info_event = self.get_user_beacon_info().await?; + let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?; beacon_info_event.content.stop(); Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?) } @@ -3075,7 +3077,7 @@ impl Room { ) -> Result { self.ensure_room_joined()?; - let beacon_info_event = self.get_user_beacon_info().await?; + let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?; if beacon_info_event.content.is_live() { let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None); @@ -3166,6 +3168,14 @@ impl Room { }, } } + + /// Observe live location sharing events for this room. + /// + /// The returned observable will receive the newest event for each sync + /// response that contains an `m.beacon` event. + pub fn observe_live_location_shares(&self) -> ObservableLiveLocation { + ObservableLiveLocation::new(&self.client, self.room_id()) + } } #[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))] diff --git a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs index d83bfda7c70..2f3403087a4 100644 --- a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs +++ b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs @@ -1,8 +1,13 @@ use std::time::{Duration, UNIX_EPOCH}; -use matrix_sdk::config::SyncSettings; -use matrix_sdk_test::{async_test, mocks::mock_encryption_state, test_json, DEFAULT_TEST_ROOM_ID}; -use ruma::{event_id, time::SystemTime}; +use futures_util::{pin_mut, StreamExt as _}; +use js_int::uint; +use matrix_sdk::{config::SyncSettings, live_location_share::LiveLocationShare}; +use matrix_sdk_test::{ + async_test, mocks::mock_encryption_state, sync_timeline_event, test_json, JoinedRoomBuilder, + SyncResponseBuilder, DEFAULT_TEST_ROOM_ID, +}; +use ruma::{event_id, events::location::AssetType, time::SystemTime, MilliSecondsSinceUnixEpoch}; use serde_json::json; use wiremock::{ matchers::{body_partial_json, header, method, path_regex}, @@ -153,3 +158,219 @@ async fn test_send_location_beacon_with_expired_live_share() { assert!(response.is_err()); } + +#[async_test] +async fn test_most_recent_event_in_stream() { + let (client, server) = logged_in_client_with_server().await; + + let mut sync_builder = SyncResponseBuilder::new(); + + let current_time = MilliSecondsSinceUnixEpoch::now(); + let millis_time = current_time + .to_system_time() + .unwrap() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + + mock_sync( + &server, + json!({ + "next_batch": "s526_47314_0_7_1_1_1_1_1", + "rooms": { + "join": { + *DEFAULT_TEST_ROOM_ID: { + "state": { + "events": [ + { + "content": { + "description": "Live Share", + "live": true, + "org.matrix.msc3488.ts": millis_time, + "timeout": 3000, + "org.matrix.msc3488.asset": { "type": "m.self" } + }, + "event_id": "$15139375514XsgmR:localhost", + "origin_server_ts": millis_time, + "sender": "@example:localhost", + "state_key": "@example:localhost", + "type": "org.matrix.msc3672.beacon_info", + "unsigned": { + "age": 7034220 + } + }, + ] + } + } + } + } + + }), + None, + ) + .await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + + let observable_live_location_shares = room.observe_live_location_shares(); + let stream = observable_live_location_shares.subscribe(); + pin_mut!(stream); + + let mut timeline_events = Vec::new(); + + for nth in 0..25 { + timeline_events.push(sync_timeline_event!({ + "content": { + "m.relates_to": { + "event_id": "$15139375514XsgmR:localhost", + "rel_type": "m.reference" + }, + "org.matrix.msc3488.location": { + "uri": format!("geo:{nth}.9575274619722,12.494122581370175;u={nth}") + }, + "org.matrix.msc3488.ts": 1_636_829_458 + }, + "event_id": format!("$event_for_stream_{nth}"), + "origin_server_ts": 1_636_829_458, + "sender": "@example:localhost", + "type": "org.matrix.msc3672.beacon", + "unsigned": { + "age": 598971 + } + })); + } + + sync_builder.add_joined_room( + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events), + ); + + mock_sync(&server, sync_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + // Stream should only process the latest beacon event for the user, ignoring any + // previous events. + let LiveLocationShare { user_id, last_location, beacon_info } = + stream.next().await.expect("Another live location was expected"); + + assert_eq!(user_id.to_string(), "@example:localhost"); + + assert_eq!(last_location.location.uri, "geo:24.9575274619722,12.494122581370175;u=24"); + + assert!(last_location.location.description.is_none()); + assert!(last_location.location.zoom_level.is_none()); + assert_eq!(last_location.ts, MilliSecondsSinceUnixEpoch(uint!(1_636_829_458))); + + let beacon_info = beacon_info.expect("Live location share is missing the beacon_info"); + + assert!(beacon_info.live); + assert!(beacon_info.is_live()); + assert_eq!(beacon_info.description, Some("Live Share".to_owned())); + assert_eq!(beacon_info.timeout, Duration::from_millis(3000)); + assert_eq!(beacon_info.ts, current_time); + assert_eq!(beacon_info.asset.type_, AssetType::Self_); +} + +#[async_test] +async fn test_observe_single_live_location_share() { + let (client, server) = logged_in_client_with_server().await; + + let current_time = MilliSecondsSinceUnixEpoch::now(); + let millis_time = current_time + .to_system_time() + .unwrap() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_millis() as u64; + + mock_sync( + &server, + json!({ + "next_batch": "s526_47314_0_7_1_1_1_1_1", + "rooms": { + "join": { + *DEFAULT_TEST_ROOM_ID: { + "state": { + "events": [ + { + "content": { + "description": "Test Live Share", + "live": true, + "org.matrix.msc3488.ts": millis_time, + "timeout": 3000, + "org.matrix.msc3488.asset": { "type": "m.self" } + }, + "event_id": "$test_beacon_info", + "origin_server_ts": millis_time, + "sender": "@example:localhost", + "state_key": "@example:localhost", + "type": "org.matrix.msc3672.beacon_info", + } + ] + } + } + } + } + }), + None, + ) + .await; + + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + let observable_live_location_shares = room.observe_live_location_shares(); + let stream = observable_live_location_shares.subscribe(); + pin_mut!(stream); + + let timeline_event = sync_timeline_event!({ + "content": { + "m.relates_to": { + "event_id": "$test_beacon_info", + "rel_type": "m.reference" + }, + "org.matrix.msc3488.location": { + "uri": "geo:10.000000,20.000000;u=5" + }, + "org.matrix.msc3488.ts": 1_636_829_458 + }, + "event_id": "$location_event", + "origin_server_ts": millis_time, + "sender": "@example:localhost", + "type": "org.matrix.msc3672.beacon", + }); + + mock_sync( + &server, + SyncResponseBuilder::new() + .add_joined_room( + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(timeline_event), + ) + .build_json_sync_response(), + None, + ) + .await; + + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let LiveLocationShare { user_id, last_location, beacon_info } = + stream.next().await.expect("Another live location was expected"); + + assert_eq!(user_id.to_string(), "@example:localhost"); + assert_eq!(last_location.location.uri, "geo:10.000000,20.000000;u=5"); + assert_eq!(last_location.ts, current_time); + + let beacon_info = beacon_info.expect("Live location share is missing the beacon_info"); + + assert!(beacon_info.live); + assert!(beacon_info.is_live()); + assert_eq!(beacon_info.description, Some("Test Live Share".to_owned())); + assert_eq!(beacon_info.timeout, Duration::from_millis(3000)); + assert_eq!(beacon_info.ts, current_time); +}