Skip to content

Commit

Permalink
Improve DM performance (#310)
Browse files Browse the repository at this point in the history
* implement the peer inbox id

* add the binaries

* log errors
  • Loading branch information
nplasterer authored Oct 30, 2024
1 parent bbdac09 commit a39b6c2
Show file tree
Hide file tree
Showing 11 changed files with 1,319 additions and 637 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class DmTest {

assertEquals(
runBlocking
{ dm.peerInboxId() },
{ dm.peerInboxId },
alixClient.inboxId,
)
}
Expand Down
4 changes: 2 additions & 2 deletions library/src/main/java/libxmtp-version.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Version: 9331eb7d
Version: 257063e9
Branch: main
Date: 2024-10-23 16:57:08 +0000
Date: 2024-10-29 02:56:38 +0000
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ sealed class Conversation {
is V1 -> conversationV1.peerAddress
is V2 -> conversationV2.peerAddress
is Group -> runBlocking { group.peerInboxIds().joinToString(",") }
is Dm -> runBlocking { dm.peerInboxId() }
is Dm -> dm.peerInboxId
}
}

Expand All @@ -327,7 +327,7 @@ sealed class Conversation {
is V1 -> listOf(conversationV1.peerAddress)
is V2 -> listOf(conversationV2.peerAddress)
is Group -> runBlocking { group.peerInboxIds() }
is Dm -> runBlocking { listOf(dm.peerInboxId()) }
is Dm -> listOf(dm.peerInboxId)
}
}

Expand Down
29 changes: 29 additions & 0 deletions library/src/main/java/org/xmtp/android/library/Conversations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import uniffi.xmtpv3.FfiListMessagesOptions
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback
import uniffi.xmtpv3.FfiPermissionPolicySet
import uniffi.xmtpv3.FfiSubscribeException
import uniffi.xmtpv3.FfiV2SubscribeRequest
import uniffi.xmtpv3.FfiV2Subscription
import uniffi.xmtpv3.FfiV2SubscriptionCallback
Expand Down Expand Up @@ -561,6 +562,10 @@ data class Conversations(
trySend(Conversation.Group(Group(client, conversation)))
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP Conversation stream", error.message.toString())
}
}
val stream = libXMTPConversations?.stream(conversationCallback)
?: throw XMTPException("Client does not support Groups")
Expand All @@ -572,6 +577,10 @@ data class Conversations(
override fun onConversation(conversation: FfiConversation) {
trySend(Conversation.Group(Group(client, conversation)))
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP Conversation stream", error.message.toString())
}
}

val stream = libXMTPConversations?.streamGroups(groupCallback)
Expand All @@ -584,6 +593,10 @@ data class Conversations(
override fun onConversation(conversation: FfiConversation) {
trySend(Group(client, conversation))
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP Group stream", error.message.toString())
}
}
val stream = libXMTPConversations?.streamGroups(groupCallback)
?: throw XMTPException("Client does not support Groups")
Expand Down Expand Up @@ -614,6 +627,10 @@ data class Conversations(
trySend(it)
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP all group message stream", error.message.toString())
}
}
val stream = libXMTPConversations?.streamAllGroupMessages(messageCallback)
?: throw XMTPException("Client does not support Groups")
Expand All @@ -628,6 +645,10 @@ data class Conversations(
trySend(it)
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP all group message stream", error.message.toString())
}
}
val stream = libXMTPConversations?.streamAllGroupMessages(messageCallback)
?: throw XMTPException("Client does not support Groups")
Expand All @@ -650,6 +671,10 @@ data class Conversations(
}
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP all message stream", error.message.toString())
}
}

val stream = libXMTPConversations?.streamAllMessages(messageCallback)
Expand All @@ -675,6 +700,10 @@ data class Conversations(
}
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP all message stream", error.message.toString())
}
}

val stream = libXMTPConversations?.streamAllMessages(messageCallback)
Expand Down
19 changes: 13 additions & 6 deletions library/src/main/java/org/xmtp/android/library/Dm.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.xmtp.android.library

import android.util.Log
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
Expand All @@ -20,6 +21,7 @@ import uniffi.xmtpv3.FfiDirection
import uniffi.xmtpv3.FfiListMessagesOptions
import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback
import uniffi.xmtpv3.FfiSubscribeException
import java.util.Date
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.DurationUnit
Expand All @@ -34,6 +36,9 @@ class Dm(val client: Client, private val libXMTPGroup: FfiConversation) {
val createdAt: Date
get() = Date(libXMTPGroup.createdAtNs() / 1_000_000)

val peerInboxId: String
get() = libXMTPGroup.dmPeerInboxId()

private val metadata: FfiConversationMetadata
get() = libXMTPGroup.groupMetadata()

Expand Down Expand Up @@ -169,12 +174,6 @@ class Dm(val client: Client, private val libXMTPGroup: FfiConversation) {
return libXMTPGroup.listMembers().map { Member(it) }
}

suspend fun peerInboxId(): String {
val ids = members().map { it.inboxId }.toMutableList()
ids.remove(client.inboxId)
return ids.first()
}

fun streamMessages(): Flow<DecodedMessage> = callbackFlow {
val messageCallback = object : FfiMessageCallback {
override fun onMessage(message: FfiMessage) {
Expand All @@ -183,6 +182,10 @@ class Dm(val client: Client, private val libXMTPGroup: FfiConversation) {
trySend(it)
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP Dm stream", error.message.toString())
}
}

val stream = libXMTPGroup.stream(messageCallback)
Expand All @@ -197,6 +200,10 @@ class Dm(val client: Client, private val libXMTPGroup: FfiConversation) {
trySend(it)
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP Dm stream", error.message.toString())
}
}

val stream = libXMTPGroup.stream(messageCallback)
Expand Down
10 changes: 10 additions & 0 deletions library/src/main/java/org/xmtp/android/library/Group.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.xmtp.android.library

import android.util.Log
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
Expand All @@ -24,6 +25,7 @@ import uniffi.xmtpv3.FfiMessage
import uniffi.xmtpv3.FfiMessageCallback
import uniffi.xmtpv3.FfiMetadataField
import uniffi.xmtpv3.FfiPermissionUpdateType
import uniffi.xmtpv3.FfiSubscribeException
import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.PermissionOption
import uniffi.xmtpv3.org.xmtp.android.library.libxmtp.PermissionPolicySet
import java.util.Date
Expand Down Expand Up @@ -409,6 +411,10 @@ class Group(val client: Client, private val libXMTPGroup: FfiConversation) {
trySend(it)
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP Group stream", error.message.toString())
}
}

val stream = libXMTPGroup.stream(messageCallback)
Expand All @@ -423,6 +429,10 @@ class Group(val client: Client, private val libXMTPGroup: FfiConversation) {
trySend(it)
}
}

override fun onError(error: FfiSubscribeException) {
Log.e("XMTP Group stream", error.message.toString())
}
}

val stream = libXMTPGroup.stream(messageCallback)
Expand Down
Loading

0 comments on commit a39b6c2

Please sign in to comment.