Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor visitors' codec lists, and aggregate them into a conference property. #1137

Merged
merged 11 commits into from
Feb 27, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ interface ChatRoomMember {
/** The statistics id if any. */
val statsId: String?

/** The supported video codecs if any */
val videoCodecs: List<String>?

/**
* The list of features advertised as XMPP capabilities. Note that although the features are cached (XEP-0115),
* the first time [features] is accessed it may block waiting for a disco#info response!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import org.jitsi.xmpp.extensions.jitsimeet.AudioMutedExtension
import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.StartMutedPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.StatsId
Expand All @@ -36,6 +37,7 @@ import org.jitsi.xmpp.extensions.jitsimeet.VideoMutedExtension
import org.jivesoftware.smack.packet.Presence
import org.jivesoftware.smack.packet.StandardExtensionElement
import org.jivesoftware.smackx.caps.packet.CapsExtension
import org.json.simple.JSONArray
import org.jxmpp.jid.EntityFullJid
import org.jxmpp.jid.Jid

Expand Down Expand Up @@ -68,6 +70,8 @@ class ChatRoomMemberImpl(
private set
override var statsId: String? = null
private set
override var videoCodecs: List<String>? = null
private set
override var isAudioMuted = true
private set
override var isVideoMuted = true
Expand Down Expand Up @@ -150,6 +154,7 @@ class ChatRoomMemberImpl(
fun processPresence(presence: Presence) {
require(presence.from == occupantJid) { "Ignoring presence for a different member: ${presence.from}" }

val firstPresence = (this.presence == null)
this.presence = presence
presence.getExtension(UserInfoPacketExt::class.java)?.let {
val newStatus = it.isRobot
Expand Down Expand Up @@ -185,12 +190,14 @@ class ChatRoomMemberImpl(
isJibri = false
}

val oldRole = role
chatRoom.getOccupant(this)?.let { role = fromSmack(it.role, it.affiliation) }
if ((role == MemberRole.VISITOR) != (oldRole == MemberRole.VISITOR)) {
var newRole: MemberRole = MemberRole.VISITOR
chatRoom.getOccupant(this)?.let { newRole = fromSmack(it.role, it.affiliation) }
if (!firstPresence && (role == MemberRole.VISITOR) != (newRole == MemberRole.VISITOR)) {
// This will mess up various member counts
// TODO: Should we try to update them, instead?
logger.warn("Member role changed from $oldRole to $role - not supported!")
logger.warn("Member role changed from $role to $newRole - not supported!")
} else {
role = newRole
}

isTranscriber = isJigasi && presence.getExtension(TranscriptionStatusExtension::class.java) != null
Expand All @@ -209,6 +216,36 @@ class ChatRoomMemberImpl(
presence.getExtension(StatsId::class.java)?.let {
statsId = it.statsId
}

presence.getExtension(JitsiParticipantCodecList::class.java)?.let {
if (!firstPresence && it.codecs != videoCodecs) {
logger.warn("Video codec list changed from $videoCodecs to ${it.codecs} - not supported!")
} else {
if (!it.codecs.contains("vp8")) {
if (firstPresence) {
logger.warn("Video codec list {${it.codecs}} does not contain vp8! Adding manually.")
}
videoCodecs = it.codecs + "vp8"
} else {
videoCodecs = it.codecs
}
}
} ?: // Older clients sent a single codec in codecType rather than all supported ones in codecList
presence.getExtensionElement("jitsi_participant_codecType", "jabber:client")?.let {
if (it is StandardExtensionElement) {
val codec = it.text.lowercase()
val codecList = if (codec == "vp8") {
listOf(codec)
} else {
listOf(codec, "vp8")
}
if (!firstPresence && codecList != videoCodecs) {
logger.warn("Video codec list changed from $videoCodecs to $codecList - not supported!")
} else {
videoCodecs = codecList
}
}
}
}

/**
Expand Down Expand Up @@ -240,6 +277,7 @@ class ChatRoomMemberImpl(
this["is_jibri"] = isJibri
this["is_jigasi"] = isJigasi
this["role"] = role.toString()
this["video_codecs"] = JSONArray().apply { videoCodecs?.let { addAll(it) } }
this["stats_id"] = statsId.toString()
this["is_audio_muted"] = isAudioMuted
this["is_video_muted"] = isVideoMuted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public class JitsiMeetConferenceImpl
return null;
});

/**
* The aggregated count of visitors' supported codecs
*/
private final PreferenceAggregator visitorCodecs;

/**
* The {@link JibriRecorder} instance used to provide live streaming through
* Jibri.
Expand Down Expand Up @@ -290,6 +295,16 @@ public JitsiMeetConferenceImpl(
TimeUnit.MILLISECONDS);


visitorCodecs = new PreferenceAggregator(
logger,
(codecs) -> {
setConferenceProperty(
ConferenceProperties.KEY_VISITOR_CODECS,
String.join(",", codecs)
);
return null;
});

logger.info("Created new conference.");
}

Expand Down Expand Up @@ -816,7 +831,7 @@ private void inviteChatMember(ChatRoomMember chatRoomMember, boolean justJoined)
}
else if (participant.getChatMember().getRole() == MemberRole.VISITOR)
{
visitorAdded();
visitorAdded(participant.getChatMember().getVideoCodecs());
}
}

Expand Down Expand Up @@ -1042,7 +1057,7 @@ private void terminateParticipant(
}
else if (removed.getChatMember().getRole() == MemberRole.VISITOR)
{
visitorRemoved();
visitorRemoved(removed.getChatMember().getVideoCodecs());
}
}
}
Expand Down Expand Up @@ -1579,6 +1594,7 @@ else if (member.isJigasi())
}
}
o.put("visitor_count", visitorCount);
o.put("visitor_codecs", visitorCodecs.debugState());
o.put("participant_count", participantCount);
o.put("jibri_count", jibriCount);
o.put("jigasi_count", jigasiCount);
Expand Down Expand Up @@ -2013,15 +2029,23 @@ private void rescheduleSingleParticipantTimeout()
}

/** Called when a new visitor has been added to the conference. */
private void visitorAdded()
private void visitorAdded(List<String> codecs)
{
visitorCount.adjustValue(+1);
if (codecs != null)
{
visitorCodecs.addPreference(codecs);
}
}

/** Called when a new visitor has been added to the conference. */
private void visitorRemoved()
private void visitorRemoved(List<String> codecs)
{
visitorCount.adjustValue(-1);
if (codecs != null)
{
visitorCodecs.removePreference(codecs);
Copy link
Member

@bgrozev bgrozev Feb 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We allow codecs to change from null to a non-null, so if a visitor joins with no codecs and then adds some it would mess with the count. It's a bit of a stretch, but one could trick jicofo into upgrading to av1 thus breaking video for some participants.
Maybe make ChatRoomMemberImpl.videoCodecs a lazy val and initialize it with the first presence received?

Unless I misunderstand how the aggregator works

}
}

/**
Expand Down
150 changes: 150 additions & 0 deletions jicofo/src/main/kotlin/org/jitsi/jicofo/util/PreferenceAggregator.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Jicofo, the Jitsi Conference Focus.
*
* Copyright @ 2015-Present 8x8, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.jicofo.util

import org.jitsi.utils.OrderedJsonObject
import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.logging2.createChildLogger
import org.json.simple.JSONArray

/** Aggregate lists of preferences coming from a large group of people, such that the resulting aggregated
* list consists of preference items supported by everyone, and in a rough consensus of preference order.
*
* The intended use case is maintaining the list of supported codecs for conference visitors.
*
* Preference orders are aggregated using the Borda count; this isn't theoretically optimal, but it should be
* good enough, and it's computationally cheap.
*/
class PreferenceAggregator(
parentLogger: Logger,
private val onChanged: (List<String>) -> Unit
) {
private val logger = createChildLogger(parentLogger)
private val lock = Any()

var aggregate: List<String> = emptyList()
private set

var count = 0
private set

private val values = mutableMapOf<String, ValueInfo>()

/**
* Add a preference to the aggregator.
*/
fun addPreference(prefs: List<String>) {
val distinctPrefs = prefs.distinct()
if (distinctPrefs != prefs) {
logger.warn("Preferences $prefs contains repeated values")
}
synchronized(lock) {
count++
distinctPrefs.forEachIndexed { index, element ->
val info = values.computeIfAbsent(element) { ValueInfo() }
info.count++
info.rankAggregate += index
}
updateAggregate()
}
}

/**
* Remove a preference from the aggregator.
*/
fun removePreference(prefs: List<String>) {
val distinctPrefs = prefs.distinct()
if (distinctPrefs != prefs) {
logger.warn("Preferences $prefs contains repeated values")
}
synchronized(lock) {
count--
check(count >= 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be force to fail by sending initial presence with no codecs, then updating with some codecs, then leaving repeatedly

"Preference count $count should not be negative"
}
distinctPrefs.forEachIndexed { index, element ->
val info = values[element]
check(info != null) {
"Preference info for $element should exist when preferences are being removed"
}
info.count--
check(info.count >= 0) {
"Preference count for $element ${info.count} should not be negative"
}
info.rankAggregate -= index
check(info.rankAggregate >= 0) {
"Preference rank aggregate for $element ${info.rankAggregate} should not be negative"
}
if (info.count == 0) {
check(info.rankAggregate == 0) {
"Preference rank aggregate for $element ${info.rankAggregate} should be zero " +
"when preference count is 0"
}
values.remove(element)
}
}
updateAggregate()
}
}

fun reset() {
synchronized(lock) {
aggregate = emptyList()
count = 0
values.clear()
}
}

fun debugState() = OrderedJsonObject().apply {
synchronized(lock) {
put("count", count)
put(
"ranks",
OrderedJsonObject().apply {
[email protected]()
.sortedBy { it.value.rankAggregate }
.forEach { put(it.key, it.value.debugState()) }
}
)
put("aggregate", JSONArray().apply { addAll(aggregate) })
}
}

private fun updateAggregate() {
val newAggregate = values.asSequence()
.filter { it.value.count == count }
.sortedBy { it.value.rankAggregate }
.map { it.key }
.toList()
if (aggregate != newAggregate) {
aggregate = newAggregate
/* ?? Do we need to drop the lock before calling this? */
onChanged(aggregate)
}
}

private class ValueInfo {
var count = 0
var rankAggregate = 0

fun debugState() = OrderedJsonObject().apply {
put("count", count)
put("rank_aggregate", rankAggregate)
}
}
}
6 changes: 6 additions & 0 deletions jicofo/src/main/kotlin/org/jitsi/jicofo/xmpp/Smack.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.jitsi.xmpp.extensions.jitsimeet.ConferenceIqProvider
import org.jitsi.xmpp.extensions.jitsimeet.FeatureExtension
import org.jitsi.xmpp.extensions.jitsimeet.FeaturesExtension
import org.jitsi.xmpp.extensions.jitsimeet.IceStatePacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantCodecList
import org.jitsi.xmpp.extensions.jitsimeet.JitsiParticipantRegionPacketExtension
import org.jitsi.xmpp.extensions.jitsimeet.JsonMessageExtension
import org.jitsi.xmpp.extensions.jitsimeet.LoginUrlIqProvider
Expand Down Expand Up @@ -119,6 +120,11 @@ fun registerXmppExtensions() {
StatsId.NAMESPACE,
StatsId.Provider()
)
ProviderManager.addExtensionProvider(
JitsiParticipantCodecList.ELEMENT,
JitsiParticipantCodecList.NAMESPACE,
DefaultPacketExtensionProvider(JitsiParticipantCodecList::class.java)
)

// Add the extensions used for handling the inviting of transcriber
ProviderManager.addExtensionProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class JibriChatRoomMember(
override val isVideoMuted: Boolean get() = TODO("Not yet implemented")
override val region: String? get() = TODO("Not yet implemented")
override val statsId: String? get() = TODO("Not yet implemented")
override val videoCodecs: List<String>? get() = TODO("Not yet implemented")
override val features: Set<Features> get() = TODO("Not yet implemented")
override val debugState: OrderedJsonObject get() = TODO("Not yet implemented")

Expand Down
Loading
Loading