Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance : improve time to open a room. #3186

Merged
merged 5 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
Expand All @@ -67,73 +65,76 @@ import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.rustcomponents.sdk.FormattedBody
import org.matrix.rustcomponents.sdk.MessageFormat
import org.matrix.rustcomponents.sdk.RoomMessageEventContentWithoutRelation
import org.matrix.rustcomponents.sdk.SendAttachmentJoinHandle
import org.matrix.rustcomponents.sdk.TimelineChange
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem
import org.matrix.rustcomponents.sdk.messageEventContentFromHtml
import org.matrix.rustcomponents.sdk.messageEventContentFromMarkdown
import org.matrix.rustcomponents.sdk.use
import timber.log.Timber
import uniffi.matrix_sdk_ui.EventItemOrigin
import uniffi.matrix_sdk_ui.LiveBackPaginationStatus
import java.io.File
import java.util.Date
import java.util.concurrent.atomic.AtomicBoolean
import org.matrix.rustcomponents.sdk.Timeline as InnerTimeline

private const val INITIAL_MAX_SIZE = 50
private const val PAGINATION_SIZE = 50

class RustTimeline(
private val inner: InnerTimeline,
isLive: Boolean,
private val isLive: Boolean,
systemClock: SystemClock,
roomCoroutineScope: CoroutineScope,
isKeyBackupEnabled: Boolean,
private val matrixRoom: MatrixRoom,
private val dispatcher: CoroutineDispatcher,
lastLoginTimestamp: Date?,
private val roomContentForwarder: RoomContentForwarder,
private val onNewSyncedEvent: () -> Unit,
onNewSyncedEvent: () -> Unit,
) : Timeline {
private val initLatch = CompletableDeferred<Unit>()
private val isInit = AtomicBoolean(false)

private val _timelineItems: MutableStateFlow<List<MatrixTimelineItem>> =
MutableStateFlow(emptyList())

private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(
lastLoginTimestamp = lastLoginTimestamp,
isRoomEncrypted = matrixRoom.isEncrypted,
isKeyBackupEnabled = isKeyBackupEnabled,
dispatcher = dispatcher,
)

private val roomBeginningPostProcessor = RoomBeginningPostProcessor()
private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock)
private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive)

private val timelineEventContentMapper = TimelineEventContentMapper()
private val inReplyToMapper = InReplyToMapper(timelineEventContentMapper)
private val timelineItemFactory = MatrixTimelineItemMapper(
private val timelineItemMapper = MatrixTimelineItemMapper(
fetchDetailsForEvent = this::fetchDetailsForEvent,
roomCoroutineScope = roomCoroutineScope,
virtualTimelineItemMapper = VirtualTimelineItemMapper(),
eventTimelineItemMapper = EventTimelineItemMapper(
contentMapper = timelineEventContentMapper
)
)

private val timelineDiffProcessor = MatrixTimelineDiffProcessor(
timelineItems = _timelineItems,
timelineItemFactory = timelineItemFactory,
timelineItemFactory = timelineItemMapper,
)
private val encryptedHistoryPostProcessor = TimelineEncryptedHistoryPostProcessor(
lastLoginTimestamp = lastLoginTimestamp,
isRoomEncrypted = matrixRoom.isEncrypted,
isKeyBackupEnabled = isKeyBackupEnabled,
dispatcher = dispatcher,
)
private val timelineItemsSubscriber = TimelineItemsSubscriber(
timeline = inner,
roomCoroutineScope = roomCoroutineScope,
timelineDiffProcessor = timelineDiffProcessor,
initLatch = initLatch,
isInit = isInit,
dispatcher = dispatcher,
onNewSyncedEvent = onNewSyncedEvent,
)

private val roomBeginningPostProcessor = RoomBeginningPostProcessor()
private val loadingIndicatorsPostProcessor = LoadingIndicatorsPostProcessor(systemClock)
private val lastForwardIndicatorsPostProcessor = LastForwardIndicatorsPostProcessor(isLive)

private val backPaginationStatus = MutableStateFlow(
Timeline.PaginationStatus(isPaginating = false, hasMoreToLoad = true)
Expand All @@ -145,36 +146,28 @@ class RustTimeline(

init {
roomCoroutineScope.launch(dispatcher) {
inner.timelineDiffFlow()
.onEach { diffs ->
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
onNewSyncedEvent()
}
postDiffs(diffs)
}
.launchIn(this)

launch {
fetchMembers()
}

fetchMembers()
if (isLive) {
// When timeline is live, we need to listen to the back pagination status as
// sdk can automatically paginate backwards.
inner.liveBackPaginationStatus()
.onEach { backPaginationStatus ->
updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) {
when (backPaginationStatus) {
is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline)
is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true)
}
}
}
.launchIn(this)
registerBackPaginationStatusListener()
}
}
}

private fun CoroutineScope.registerBackPaginationStatusListener() {
inner.liveBackPaginationStatus()
.onEach { backPaginationStatus ->
updatePaginationStatus(Timeline.PaginationDirection.BACKWARDS) {
when (backPaginationStatus) {
is LiveBackPaginationStatus.Idle -> it.copy(isPaginating = false, hasMoreToLoad = !backPaginationStatus.hitStartOfTimeline)
is LiveBackPaginationStatus.Paginating -> it.copy(isPaginating = true, hasMoreToLoad = true)
}
}
}
.launchIn(this)
}

override val membershipChangeEventReceived: Flow<Unit> = timelineDiffProcessor.membershipChangeEventReceived

override suspend fun sendReadReceipt(eventId: EventId, receiptType: ReceiptType): Result<Unit> {
Expand Down Expand Up @@ -248,13 +241,15 @@ class RustTimeline(
// Keep lastForwardIndicatorsPostProcessor last
.let { items -> lastForwardIndicatorsPostProcessor.process(items) }
}
}.onStart {
timelineItemsSubscriber.subscribeIfNeeded()
}

override fun close() {
inner.close()
}

private suspend fun fetchMembers() = withContext(dispatcher) {
private fun CoroutineScope.fetchMembers() = launch(dispatcher) {
initLatch.await()
try {
inner.fetchMembers()
Expand All @@ -263,32 +258,6 @@ class RustTimeline(
}
}

private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
ensureActive()
timelineDiffProcessor.postItems(it)
}
isInit.set(true)
initLatch.complete(Unit)
}

private suspend fun postDiffs(diffs: List<TimelineDiff>) {
val diffsToProcess = diffs.toMutableList()
if (!isInit.get()) {
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
if (resetDiff != null) {
// Keep using the postItems logic so we can post the timelineItems asap.
postItems(resetDiff.reset() ?: emptyList())
diffsToProcess.remove(resetDiff)
}
}
initLatch.await()
if (diffsToProcess.isNotEmpty()) {
timelineDiffProcessor.postDiffs(diffsToProcess)
}
}

override suspend fun sendMessage(body: String, htmlBody: String?, mentions: List<Mention>): Result<Unit> = withContext(dispatcher) {
messageEventContentFromParts(body, htmlBody).withMentions(mentions.map()).use { content ->
runCatching<Unit> {
Expand Down Expand Up @@ -550,12 +519,6 @@ class RustTimeline(
}
}

private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
return runCatching {
inner.fetchDetailsForEvent(eventId.value)
}
}

override suspend fun loadReplyDetails(eventId: EventId): InReplyTo = withContext(dispatcher) {
val timelineItem = _timelineItems.value.firstOrNull { timelineItem ->
timelineItem is MatrixTimelineItem.Event && timelineItem.eventId == eventId
Expand All @@ -572,4 +535,10 @@ class RustTimeline(
inner.loadReplyDetails(eventId.value).use(inReplyToMapper::map)
}
}

private suspend fun fetchDetailsForEvent(eventId: EventId): Result<Unit> {
return runCatching {
inner.fetchDetailsForEvent(eventId.value)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2024 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.element.android.libraries.matrix.impl.timeline

import io.element.android.libraries.core.coroutine.childScope
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.matrix.rustcomponents.sdk.Timeline
import org.matrix.rustcomponents.sdk.TimelineChange
import org.matrix.rustcomponents.sdk.TimelineDiff
import org.matrix.rustcomponents.sdk.TimelineItem
import uniffi.matrix_sdk_ui.EventItemOrigin
import java.util.concurrent.atomic.AtomicBoolean

private const val INITIAL_MAX_SIZE = 50

/**
* This class is responsible for subscribing to a timeline and post the items/diffs to the timelineDiffProcessor.
* It will also trigger a callback when a new synced event is received.
* It will also handle the initial items and make sure they are posted before any diff.
* When closing the room subscription, it will also unsubscribe automatically.
*/
internal class TimelineItemsSubscriber(
roomCoroutineScope: CoroutineScope,
dispatcher: CoroutineDispatcher,
private val timeline: Timeline,
private val timelineDiffProcessor: MatrixTimelineDiffProcessor,
private val initLatch: CompletableDeferred<Unit>,
private val isInit: AtomicBoolean,
private val onNewSyncedEvent: () -> Unit,
) {
private var subscriptionCount = 0
private val mutex = Mutex()

private val coroutineScope = roomCoroutineScope.childScope(dispatcher, "TimelineItemsSubscriber")

suspend fun subscribeIfNeeded() = mutex.withLock {
if (subscriptionCount == 0) {
timeline.timelineDiffFlow()
.onEach { diffs ->
if (diffs.any { diff -> diff.eventOrigin() == EventItemOrigin.SYNC }) {
onNewSyncedEvent()
}
postDiffs(diffs)
}
.launchIn(coroutineScope)
}
subscriptionCount++
}

suspend fun unsubscribeIfNeeded() = mutex.withLock {
bmarty marked this conversation as resolved.
Show resolved Hide resolved
when (subscriptionCount) {
0 -> return@withLock
1 -> {
coroutineScope.coroutineContext.cancelChildren()
}
}
subscriptionCount--
}

private suspend fun postItems(items: List<TimelineItem>) = coroutineScope {
// Split the initial items in multiple list as there is no pagination in the cached data, so we can post timelineItems asap.
items.chunked(INITIAL_MAX_SIZE).reversed().forEach {
ensureActive()
timelineDiffProcessor.postItems(it)
}
isInit.set(true)
initLatch.complete(Unit)
}

private suspend fun postDiffs(diffs: List<TimelineDiff>) {
val diffsToProcess = diffs.toMutableList()
if (!isInit.get()) {
val resetDiff = diffsToProcess.firstOrNull { it.change() == TimelineChange.RESET }
if (resetDiff != null) {
// Keep using the postItems logic so we can post the timelineItems asap.
postItems(resetDiff.reset() ?: emptyList())
diffsToProcess.remove(resetDiff)
}
}
initLatch.await()
if (diffsToProcess.isNotEmpty()) {
timelineDiffProcessor.postDiffs(diffsToProcess)
}
}
}
Loading