Skip to content

Commit

Permalink
feat(sdk): support for observing m.beacon events
Browse files Browse the repository at this point in the history
  • Loading branch information
torrybr committed Nov 27, 2024
1 parent cefd5a2 commit e10d8eb
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 8 deletions.
1 change: 1 addition & 0 deletions crates/matrix-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub use sliding_sync::{
#[cfg(feature = "uniffi")]
uniffi::setup_scaffolding!();

pub mod live_location_share;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;

Expand Down
83 changes: 83 additions & 0 deletions crates/matrix-sdk/src/live_location_share.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Types for live location sharing.
//!
//! Live location sharing allows users to share their real-time location with
//! others in a room via [MSC3489](https://github.com/matrix-org/matrix-spec-proposals/pull/3489).
use async_stream::stream;
use futures_util::Stream;
use ruma::{
events::{
beacon::OriginalSyncBeaconEvent, beacon_info::BeaconInfoEventContent,
location::LocationContent,
},
MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId,
};

use crate::{event_handler::ObservableEventHandler, Client, Room};

/// An observable live location.
#[derive(Debug)]
pub struct ObservableLiveLocation {
observable_room_events: ObservableEventHandler<(OriginalSyncBeaconEvent, Room)>,
}

impl ObservableLiveLocation {
/// Create a new `ObservableLiveLocation` for a particular room.
pub fn new(client: &Client, room_id: &RoomId) -> Self {
Self { observable_room_events: client.observe_room_events(room_id) }
}

/// Get a stream of [`LiveLocationShare`].
pub fn subscribe(&self) -> impl Stream<Item = LiveLocationShare> {
let stream = self.observable_room_events.subscribe();
stream! {
for await (event, room) in stream {
yield LiveLocationShare {
last_location: LastLocation {
location: event.content.location,
ts: event.origin_server_ts,
},
beacon_info: room
.get_user_beacon_info(&event.sender)
.await
.ok()
.map(|info| info.content),
user_id: event.sender,
};
}
}
}
}

/// Details of the last known location beacon.
#[derive(Clone, Debug)]
pub struct LastLocation {
/// The most recent location content of the user.
pub location: LocationContent,
/// The timestamp of when the location was updated.
pub ts: MilliSecondsSinceUnixEpoch,
}

/// Details of a users live location share.
#[derive(Clone, Debug)]
pub struct LiveLocationShare {
/// The user's last known location.
pub last_location: LastLocation,
/// Information about the associated beacon event.
pub beacon_info: Option<BeaconInfoEventContent>,
/// The user ID of the person sharing their live location.
pub user_id: OwnedUserId,
}
20 changes: 15 additions & 5 deletions crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ use crate::{
error::{BeaconError, WrongRoomState},
event_cache::{self, EventCacheDropHandles, RoomEventCache},
event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
live_location_share::ObservableLiveLocation,
media::{MediaFormat, MediaRequestParameters},
notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
room::power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
Expand Down Expand Up @@ -2990,17 +2991,18 @@ impl Room {
Ok(())
}

/// Get the beacon information event in the room for the current user.
/// Get the beacon information event in the room for the `user_id`.
///
/// # Errors
///
/// Returns an error if the event is redacted, stripped, not found or could
/// not be deserialized.
async fn get_user_beacon_info(
pub(crate) async fn get_user_beacon_info(
&self,
user_id: &UserId,
) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
let raw_event = self
.get_state_event_static_for_key::<BeaconInfoEventContent, _>(self.own_user_id())
.get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
.await?
.ok_or(BeaconError::NotFound)?;

Expand Down Expand Up @@ -3053,7 +3055,7 @@ impl Room {
) -> Result<send_state_event::v3::Response, BeaconError> {
self.ensure_room_joined()?;

let mut beacon_info_event = self.get_user_beacon_info().await?;
let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
beacon_info_event.content.stop();
Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
}
Expand All @@ -3075,7 +3077,7 @@ impl Room {
) -> Result<send_message_event::v3::Response, BeaconError> {
self.ensure_room_joined()?;

let beacon_info_event = self.get_user_beacon_info().await?;
let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;

if beacon_info_event.content.is_live() {
let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
Expand Down Expand Up @@ -3166,6 +3168,14 @@ impl Room {
},
}
}

/// Observe live location sharing events for this room.
///
/// The returned observable will receive the newest event for each sync
/// response that contains an `m.beacon` event.
pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
ObservableLiveLocation::new(&self.client, self.room_id())
}
}

#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
Expand Down
227 changes: 224 additions & 3 deletions crates/matrix-sdk/tests/integration/room/beacon/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::time::{Duration, UNIX_EPOCH};

use matrix_sdk::config::SyncSettings;
use matrix_sdk_test::{async_test, mocks::mock_encryption_state, test_json, DEFAULT_TEST_ROOM_ID};
use ruma::{event_id, time::SystemTime};
use futures_util::{pin_mut, StreamExt as _};
use js_int::uint;
use matrix_sdk::{config::SyncSettings, live_location_share::LiveLocationShare};
use matrix_sdk_test::{
async_test, mocks::mock_encryption_state, sync_timeline_event, test_json, JoinedRoomBuilder,
SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
};
use ruma::{event_id, events::location::AssetType, time::SystemTime, MilliSecondsSinceUnixEpoch};
use serde_json::json;
use wiremock::{
matchers::{body_partial_json, header, method, path_regex},
Expand Down Expand Up @@ -153,3 +158,219 @@ async fn test_send_location_beacon_with_expired_live_share() {

assert!(response.is_err());
}

#[async_test]
async fn test_most_recent_event_in_stream() {
let (client, server) = logged_in_client_with_server().await;

let mut sync_builder = SyncResponseBuilder::new();

let current_time = MilliSecondsSinceUnixEpoch::now();
let millis_time = current_time
.to_system_time()
.unwrap()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

mock_sync(
&server,
json!({
"next_batch": "s526_47314_0_7_1_1_1_1_1",
"rooms": {
"join": {
*DEFAULT_TEST_ROOM_ID: {
"state": {
"events": [
{
"content": {
"description": "Live Share",
"live": true,
"org.matrix.msc3488.ts": millis_time,
"timeout": 3000,
"org.matrix.msc3488.asset": { "type": "m.self" }
},
"event_id": "$15139375514XsgmR:localhost",
"origin_server_ts": millis_time,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"type": "org.matrix.msc3672.beacon_info",
"unsigned": {
"age": 7034220
}
},
]
}
}
}
}

}),
None,
)
.await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;

let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();

let observable_live_location_shares = room.observe_live_location_shares();
let stream = observable_live_location_shares.subscribe();
pin_mut!(stream);

let mut timeline_events = Vec::new();

for nth in 0..25 {
timeline_events.push(sync_timeline_event!({
"content": {
"m.relates_to": {
"event_id": "$15139375514XsgmR:localhost",
"rel_type": "m.reference"
},
"org.matrix.msc3488.location": {
"uri": format!("geo:{nth}.9575274619722,12.494122581370175;u={nth}")
},
"org.matrix.msc3488.ts": 1_636_829_458
},
"event_id": format!("$event_for_stream_{nth}"),
"origin_server_ts": 1_636_829_458,
"sender": "@example:localhost",
"type": "org.matrix.msc3672.beacon",
"unsigned": {
"age": 598971
}
}));
}

sync_builder.add_joined_room(
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events),
);

mock_sync(&server, sync_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;

// Stream should only process the latest beacon event for the user, ignoring any
// previous events.
let LiveLocationShare { user_id, last_location, beacon_info } =
stream.next().await.expect("Another live location was expected");

assert_eq!(user_id.to_string(), "@example:localhost");

assert_eq!(last_location.location.uri, "geo:24.9575274619722,12.494122581370175;u=24");

assert!(last_location.location.description.is_none());
assert!(last_location.location.zoom_level.is_none());
assert_eq!(last_location.ts, MilliSecondsSinceUnixEpoch(uint!(1_636_829_458)));

let beacon_info = beacon_info.expect("Live location share is missing the beacon_info");

assert!(beacon_info.live);
assert!(beacon_info.is_live());
assert_eq!(beacon_info.description, Some("Live Share".to_owned()));
assert_eq!(beacon_info.timeout, Duration::from_millis(3000));
assert_eq!(beacon_info.ts, current_time);
assert_eq!(beacon_info.asset.type_, AssetType::Self_);
}

#[async_test]
async fn test_observe_single_live_location_share() {
let (client, server) = logged_in_client_with_server().await;

let current_time = MilliSecondsSinceUnixEpoch::now();
let millis_time = current_time
.to_system_time()
.unwrap()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

mock_sync(
&server,
json!({
"next_batch": "s526_47314_0_7_1_1_1_1_1",
"rooms": {
"join": {
*DEFAULT_TEST_ROOM_ID: {
"state": {
"events": [
{
"content": {
"description": "Test Live Share",
"live": true,
"org.matrix.msc3488.ts": millis_time,
"timeout": 3000,
"org.matrix.msc3488.asset": { "type": "m.self" }
},
"event_id": "$test_beacon_info",
"origin_server_ts": millis_time,
"sender": "@example:localhost",
"state_key": "@example:localhost",
"type": "org.matrix.msc3672.beacon_info",
}
]
}
}
}
}
}),
None,
)
.await;

let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;

let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
let observable_live_location_shares = room.observe_live_location_shares();
let stream = observable_live_location_shares.subscribe();
pin_mut!(stream);

let timeline_event = sync_timeline_event!({
"content": {
"m.relates_to": {
"event_id": "$test_beacon_info",
"rel_type": "m.reference"
},
"org.matrix.msc3488.location": {
"uri": "geo:10.000000,20.000000;u=5"
},
"org.matrix.msc3488.ts": 1_636_829_458
},
"event_id": "$location_event",
"origin_server_ts": millis_time,
"sender": "@example:localhost",
"type": "org.matrix.msc3672.beacon",
});

mock_sync(
&server,
SyncResponseBuilder::new()
.add_joined_room(
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(timeline_event),
)
.build_json_sync_response(),
None,
)
.await;

let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;

let LiveLocationShare { user_id, last_location, beacon_info } =
stream.next().await.expect("Another live location was expected");

assert_eq!(user_id.to_string(), "@example:localhost");
assert_eq!(last_location.location.uri, "geo:10.000000,20.000000;u=5");
assert_eq!(last_location.ts, current_time);

let beacon_info = beacon_info.expect("Live location share is missing the beacon_info");

assert!(beacon_info.live);
assert!(beacon_info.is_live());
assert_eq!(beacon_info.description, Some("Test Live Share".to_owned()));
assert_eq!(beacon_info.timeout, Duration::from_millis(3000));
assert_eq!(beacon_info.ts, current_time);
}

0 comments on commit e10d8eb

Please sign in to comment.