From cb53c320e91b975a408f442cf0f8f44071a9d685 Mon Sep 17 00:00:00 2001 From: MeLike2D Date: Wed, 17 Mar 2021 23:12:29 -0700 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20convert=20bedrock=20to=20kotlinx.se?= =?UTF-8?q?rialization,=20add=20a=20player=20event,=20document=20the=20api?= =?UTF-8?q?,=20and=20other=20things?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 16 +- .obsidianrc | 2 +- API.md | 442 ++++++++++++++++++ Server/build.gradle | 2 + .../obsidian/bedrock/BedrockEventAdapter.kt | 8 +- .../obsidian/bedrock/BedrockEventListener.kt | 29 +- .../obsidian/bedrock/EventDispatcher.kt | 35 +- .../kotlin/obsidian/bedrock/codec/Codec.kt | 26 +- .../obsidian/bedrock/codec/CodecType.kt | 5 + .../obsidian/bedrock/codec/OpusCodec.kt | 15 +- .../gateway/AbstractMediaGatewayConnection.kt | 185 +++++--- .../gateway/MediaGatewayV4Connection.kt | 133 +++--- .../kotlin/obsidian/bedrock/gateway/Op.kt | 41 -- .../obsidian/bedrock/gateway/event/Command.kt | 145 ++++++ .../obsidian/bedrock/gateway/event/Event.kt | 109 +++++ .../obsidian/bedrock/gateway/event/Op.kt | 64 +++ .../bedrock/handler/ConnectionHandler.kt | 8 +- .../bedrock/handler/DiscordUDPConnection.kt | 21 +- .../main/kotlin/obsidian/server/Obsidian.kt | 33 +- .../kotlin/obsidian/server/io/Dispatch.kt | 22 +- .../main/kotlin/obsidian/server/io/Magma.kt | 73 ++- .../kotlin/obsidian/server/io/MagmaClient.kt | 84 ++-- .../src/main/kotlin/obsidian/server/io/Op.kt | 2 +- .../kotlin/obsidian/server/io/Operation.kt | 7 +- .../player/filter/impl/KaraokeFilter.kt | 4 + .../player/filter/impl/RotationFilter.kt | 2 + .../player/filter/impl/TimescaleFilter.kt | 45 +- .../kotlin/obsidian/server/util/Builders.kt | 3 - .../kotlin/obsidian/server/util/Extensions.kt | 3 +- 29 files changed, 1206 insertions(+), 358 deletions(-) create mode 100644 API.md delete mode 100644 Server/src/main/kotlin/obsidian/bedrock/gateway/Op.kt create mode 100644 Server/src/main/kotlin/obsidian/bedrock/gateway/event/Command.kt create mode 100644 Server/src/main/kotlin/obsidian/bedrock/gateway/event/Event.kt create mode 100644 Server/src/main/kotlin/obsidian/bedrock/gateway/event/Op.kt diff --git a/.gitignore b/.gitignore index 1a6164d..a136e16 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,16 @@ .gradle/ .idea/ +.settings/ -logs/ -build/ -Server/build/ -js-test/ +/logs/ +/build/ +/test/ +/target/ +/Server/build .obsidianrc +*.iml *.jar -application.yml -/js-test/ + +.project +.classpath diff --git a/.obsidianrc b/.obsidianrc index b800547..166280f 100644 --- a/.obsidianrc +++ b/.obsidianrc @@ -18,7 +18,7 @@ obsidian: playlist-page-limit: 6 logging: - level: debug + level: TRACE file: max-history: 30 diff --git a/API.md b/API.md new file mode 100644 index 0000000..44d4e1b --- /dev/null +++ b/API.md @@ -0,0 +1,442 @@ +# Obsidian API Documentation + +Welcome to the Obsidian API Documentation! This document describes mostly everything about Magma and the REST API. + +###### What's Magma? + +Magma is the name for the WebSocket and REST server! + +--- + +- **Current Version:** 1.0.0-pre + +## Magma REST + +As of version `1.0.0` of obsidian the REST API is only used for loading tracks. This will most likely change in future +releases. + +*p.s. nothing requires authorization... this will change in a future release, you should provide the authorization +header anyways* + +###### Load Tracks + +``` +GET /loadtracks?identifier=D-ocerKPufk +``` + +**Example Response** + +*this was actually taken from a response btw* + +```json +{ + "load_type": "TRACK_LOADED", + "playlist_info": null, + "tracks": [ + { + "track": "...", + "info": { + "title": "The Kid LAROI - SELFISH (Official Video)", + "author": "TheKidLAROIVEVO", + "uri": "https://www.youtube.com/watch?v=D-ocerKPufk", + "identifier": "D-ocerKPufk", + "length": 270000, + "position": 0, + "is_stream": false, + "is_seekable": true + } + } + ], + "exception": null +} +``` + +###### Decode Track(s) + +**Singular** + +``` +GET /decodetrack?track=QAAAkQIAKF... +``` + +**Example Response** + +```json +{ + "title": "The Kid LAROI - SELFISH (Official Video)", + "author": "TheKidLAROIVEVO", + "uri": "https://www.youtube.com/watch?v=D-ocerKPufk", + "identifier": "D-ocerKPufk", + "length": 270000, + "position": 0, + "is_stream": false, + "is_seekable": true +} +``` + +**Multiple** + +``` +POST /decodetracks + +{ + "tracks": [ + "..." + ] +} +``` + +**Example Response** + +ok, yknow what... idc man + +jk the response is just an array filled with track info objects (refer to the singular track response) + +## Magma WebSocket + +Magma also has a WebSocket used for real-time player updates, it also allows for player control + +##### Connecting + +To connect you must have these headers assigned + +``` +Authorization: Password configured in `.obsidianrc` +User-Id: The user id of the bot you're playing music with +``` + +**Close Codes** + +| close code | reason | +|:-----------|:---------------------------------------------------| +| 4001 | You specified the incorrect authorization | +| 4002 | You didn't specify the `User-Id` header | +| 4004 | A client with the specified user id already exists | +| 4005 | An error occurred while handling incoming frames | + +###### Payload Structure + +- **op:** numeric op code +- **d:** payload data + +```json +{ + "op": 0, + "d": {} +} +``` + +### Operations + +| op code/name | description | +|:--------|:--------| +| 0 • submit voice update | allows obsidian to connect to the discord voice server | +| 1 • stats | has resource usage for both the system and jvm, also includes player count | +| 2 • player event | dispatched when a player event occurs, e.g. track end, track start | +| 3 • player update | used to keep track of player state, e.g. current position and filters | +| 4 • play track | used to play tracks | +| 5 • stop track | stops the current track | +| 6 • pause | configures the pause state of the player | +| 7 • filters | configures the current filters | +| 8 • seek | seeks to the specified position | +| 9 • destroy | used to destroy players | + +#### Submit Voice Update + +The equivalent of `voiceUpdate` for lavalink and `voice-state-update` for andesite + +```json +{ + "op": 0, + "d": { + "guild_id": "751571246189379610", + "token": "your token", + "session_id": "your session id", + "endpoint": "smart.loyal.discord.gg" + } +} +``` + +- `token` and `endpoint` can be received from Discord's `VOICE_SERVER_UPDATE` dispatch event +- `session_id` can be received from Discord's `VOICE_STATE_UPDATE` dispatch event + +#### Stats + +todo + +#### Player Events + +List of current player events Example: + +```json +{ + "op": 2, + "d": { + "type": "TRACK_START", + "guild_id": "751571246189379610", + "track": "QAAAmAIAO0tTSSDigJMgUGF0aWVuY2UgKGZlYXQuIFlVTkdCTFVEICYgUG9sbyBHKSBbT2ZmaWNpYWwgQXVkaW9dAANLU0kAAAAAAALG8AALTXJmVTRhVGNVYU0AAQAraHR0cHM6Ly93d3cueW91dHViZS5jb20vd2F0Y2g/dj1NcmZVNGFUY1VhTQAHeW91dHViZQAAAAAAAAAA" + } +} +``` + +--- + +*code blocks below represent data in the `d` field* + +##### `TRACK_START` + +Dispatched when a track starts + +--- + +- `track` *self-explanatory* + +```json +{ + ... + "track": "..." +} +``` + +##### `TRACK_END` + +Dispatched when a track ends + +--- + +- `track` the track that had ended + +```json +{ + ... + "track": "..." +} +``` + +##### `TRACK_STUCK` + +dispatched when track playback is stuck + +--- + +- `track` *self-explanatory* +- `threshold_ms` The wait threshold that was exceeded for this event to trigger + +```json +{ + ... + "track": "", + "threshold_ms": 1000 +} +``` + +- [** + Lavaplayer**](https://github.com/sedmelluq/lavaplayer/blob/bec39953a037b318663fad76873fbab9ce13c033/main/src/main/java/com/sedmelluq/discord/lavaplayer/player/event/TrackStuckEvent.java) + +##### `TRACK_EXCEPTION` + +Dispatched when lavaplayer encounters an error while playing a track + +--- + +- `track` the track that threw an exception during playback +- `exception` *everything is self-explanatory* + - `message` + - `cause` + - `severity` *see [Severity Types](#severity-types)* + +```json +{ + ... + "track": "...", + "exception": { + "message": "This video is too cool for the people listening", + "cause": "Lack of coolness by the listeners", + "severity": "COMMON" + } +} +``` + +###### Severity Types + +- **COMMON**, **FAULT**, **SUSPICIOUS** + +For more information +visit **[this](https://github.com/sedmelluq/lavaplayer/blob/master/main/src/main/java/com/sedmelluq/discord/lavaplayer/tools/FriendlyException.java#L30-L46)** + +##### `WEBSOCKET_READY` + +Dispatched when the voice websocket receives the `READY` op + +--- + +- `target` voice server ip +- `ssrc` voice ssrc + +*refer to the Discord voice docs for what ssrc is for* + +```json +{ + ... + "target": "420.69.69.9", + "ssrc": 42069 +} +``` + +##### `WEBSOCKET_CLOSED` + +Dispatched when the voice websocket closes + +- `code` close code +- `reason` close reason + +```json +{ + ... + "code": 4000, + "reason": "" +} +``` + +#### Player Update + +- `frames` describes the number of frames, in the last minute, that have been sent or lost +- `current_track` describes the current playing track + - `track` the base64 encoded track + - `position` the current playback position + - `paused` whether playback is paused + +```json +{ + "op": 3, + "d": { + "guild_id": "751571246189379610", + "frames": { + "lost": 0, + "sent": 3000 + }, + "current_track": { + "track": "...", + "position": 3192719, + "paused": false + } + } +} +``` + +#### Play Track + +- `start_time` specifies the number of milliseconds to offset the track by +- `end_time` specifies when to stop the track (in milliseconds) +- `no_replace` when specified, the server will ignore this request if a track is already playing + +```json +{ + "op": 4, + "d": { + "guild_id": "...", + "track": "...", + "end_time": 30000, + "start_time": 130000, + "no_replace": false + } +} +``` + +#### Stop Track + +```json +{ + "op": 4, + "d": { + "guild_id": "..." + } +} +``` + +#### Pause + +- `state` whether playback should be paused. + +```json +{ + "op": 6, + "d": { + "state": true + } +} +``` + +#### Filters + +*p.s. this operation overrides any filters that were previously configured, also I didn't really care to fill out the +example lol* + +- `volume` the volume to set. `0.0` through `5.0` is accepted, where `1.0` is 100% +- `tremolo` creates a shuddering effect, where the volume quickly oscillates + - `frequency` Effect frequency • `0 < x` + - `depth` Effect depth • `0 < x ≤ 1` + + +- `equalizer` There are 15 bands (0-14) that can be configured. Each band has a gain and band field, band being the band + number and gain being a number between `-0.25` and `1.0` + `-0.25` means the band is completed muted and `0.25` meaning it's doubled + + +- `timescale` [Time stretch and pitch scale](https://en.wikipedia.org/wiki/Audio_time_stretching_and_pitch_scaling) + filter implementation + - `pitch` Sets the audio pitch + - `pitch_octaves` Sets the audio pitch in octaves, this cannot be used in conjunction with the other two options + - `pitch_semi_tones` Sets the audio pitch in semi tones, this cannot be used in conjunction with the other two pitch + options + - `rate` Sets the audio rate, cannot be used in conjunction with `rate_change` + - `rate_change` Sets the audio rate, in percentage, relative to the default + - `speed` Sets the playback speed, cannot be used in conjunction with `speed_change` + - `speed_change` Sets the playback speed, in percentage, relative to the default + + +- `karaoke` Uses equalization to eliminate part of a band, usually targeting vocals. None of these i have explanations + for... ask [natan](https://github.com/natanbc/lavadsp) ig + - `filter_band`, `filter_width`, `level`, `mono_level` + + +- `channel_mix` This filter mixes both channels (left and right), with a configurable factor on how much each channel + affects the other. With the defaults, both channels are kept independent of each other. Setting all factors to `0.5` + means both channels get the same audio + - `right_to_left` The current right-to-left factor. The default is `0.0` + - `right_to_right` The current right-to-right factor. The default is `1.0` + - `left_to_right` The current left-to-right factor. The default is `0.0` + - `left_to_left` The current left-to-left factor. The default is `1.0` + + +- `vibrato` Similar to tremolo. While tremolo oscillates the volume, vibrato oscillates the pitch + - `frequency` Effect frequency • `0 < x ≤ 14` + - `depth` Effect depth • `0 < x ≤ 1` + + +- `rotation` This filter simulates an audio source rotating around the listener + - `rotation_hz` The frequency the audio should rotate around the listener, in Hertz + + +- `low_pass` Higher frequencies get suppressed, while lower frequencies pass through this filter, thus the name low pass + - `smoothing` Smoothing to use. 20 is the default + +```json +{ + "op": 7, + "d": { + "guild_id": "...", + "volume": 1.0, + "timescale": {}, + "karaoke": {}, + "channel_mix": {}, + "vibrato": {}, + "rotation": {}, + "low_pass": {}, + "tremolo": {}, + "equalizer": { + "bands": [] + } + } +} +``` diff --git a/Server/build.gradle b/Server/build.gradle index 12c8c28..b12b3cd 100644 --- a/Server/build.gradle +++ b/Server/build.gradle @@ -87,8 +87,10 @@ compileKotlin { kotlinOptions { jvmTarget = "13" + incremental = true freeCompilerArgs += "-Xopt-in=kotlin.ExperimentalStdlibApi" freeCompilerArgs += "-Xopt-in=kotlinx.coroutines.ObsoleteCoroutinesApi" + freeCompilerArgs += "-Xopt-in=kotlinx.coroutines.ExperimentalCoroutinesApi" freeCompilerArgs += "-Xopt-in=io.ktor.locations.KtorExperimentalLocationsAPI" freeCompilerArgs += "-Xinline-classes" } diff --git a/Server/src/main/kotlin/obsidian/bedrock/BedrockEventAdapter.kt b/Server/src/main/kotlin/obsidian/bedrock/BedrockEventAdapter.kt index 7a8f53e..f2370a9 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/BedrockEventAdapter.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/BedrockEventAdapter.kt @@ -23,14 +23,8 @@ import org.json.JSONObject open class BedrockEventAdapter : BedrockEventListener { override suspend fun gatewayReady(target: NetworkAddress, ssrc: Int) = Unit - override suspend fun gatewayClosed(code: Int, byRemote: Boolean, reason: String?) = Unit - + override suspend fun gatewayClosed(code: Short, reason: String?) = Unit override suspend fun userConnected(id: String?, audioSSRC: Int, videoSSRC: Int, rtxSSRC: Int) = Unit - override suspend fun userDisconnected(id: String?) = Unit - - override suspend fun externalIPDiscovered(address: NetworkAddress) = Unit - override suspend fun sessionDescription(session: JSONObject?) = Unit - override suspend fun heartbeatDispatched(nonce: Long) = Unit override suspend fun heartbeatAcknowledged(nonce: Long) = Unit } \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/BedrockEventListener.kt b/Server/src/main/kotlin/obsidian/bedrock/BedrockEventListener.kt index 5b49205..67aa5c2 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/BedrockEventListener.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/BedrockEventListener.kt @@ -19,21 +19,34 @@ package obsidian.bedrock import io.ktor.util.network.* -import org.json.JSONObject interface BedrockEventListener { + /** + * Called when we receive a READY opcode from the voice server + * + * @param target The target address + * @param ssrc The ssrc + */ suspend fun gatewayReady(target: NetworkAddress, ssrc: Int) - suspend fun gatewayClosed(code: Int, byRemote: Boolean, reason: String?) + /** + * Called when the connection to the voice server closes. + * + * @param code Close code + * @param reason Close reason + */ + suspend fun gatewayClosed(code: Short, reason: String?) + /** + * Called whenever a user connects to the voice channel + * + * @param id The user's id + * @param audioSSRC Audio ssrc + * @param videoSSRC Video ssrc + * @param rtxSSRC Idk + */ suspend fun userConnected(id: String?, audioSSRC: Int, videoSSRC: Int, rtxSSRC: Int) - suspend fun userDisconnected(id: String?) - - suspend fun externalIPDiscovered(address: NetworkAddress) - - suspend fun sessionDescription(session: JSONObject?) - /** * Called whenever we dispatch a heartbeat. * diff --git a/Server/src/main/kotlin/obsidian/bedrock/EventDispatcher.kt b/Server/src/main/kotlin/obsidian/bedrock/EventDispatcher.kt index ef532c5..54091c0 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/EventDispatcher.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/EventDispatcher.kt @@ -19,7 +19,6 @@ package obsidian.bedrock import io.ktor.util.network.* -import org.json.JSONObject class EventDispatcher : BedrockEventListener { private var listeners = hashSetOf() @@ -31,34 +30,32 @@ class EventDispatcher : BedrockEventListener { listeners.remove(listener) override suspend fun gatewayReady(target: NetworkAddress, ssrc: Int) { - for (listener in listeners) listener.gatewayReady(target, ssrc) + for (listener in listeners) { + listener.gatewayReady(target, ssrc) + } } - override suspend fun gatewayClosed(code: Int, byRemote: Boolean, reason: String?) { - for (listener in listeners) listener.gatewayClosed(code, byRemote, reason) + override suspend fun gatewayClosed(code: Short, reason: String?) { + for (listener in listeners) { + listener.gatewayClosed(code, reason) + } } override suspend fun userConnected(id: String?, audioSSRC: Int, videoSSRC: Int, rtxSSRC: Int) { - for (listener in listeners) listener.userConnected(id, audioSSRC, videoSSRC, rtxSSRC) - } - - override suspend fun userDisconnected(id: String?) { - for (listener in listeners) listener.userDisconnected(id) - } - - override suspend fun externalIPDiscovered(address: NetworkAddress) { - for (listener in listeners) listener.externalIPDiscovered(address) - } - - override suspend fun sessionDescription(session: JSONObject?) { - for (listener in listeners) listener.sessionDescription(session) + for (listener in listeners) { + listener.userConnected(id, audioSSRC, videoSSRC, rtxSSRC) + } } override suspend fun heartbeatDispatched(nonce: Long) { - for (listener in listeners) listener.heartbeatDispatched(nonce) + for (listener in listeners) { + listener.heartbeatDispatched(nonce) + } } override suspend fun heartbeatAcknowledged(nonce: Long) { - for (listener in listeners) listener.heartbeatAcknowledged(nonce) + for (listener in listeners) { + listener.heartbeatAcknowledged(nonce) + } } } \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/codec/Codec.kt b/Server/src/main/kotlin/obsidian/bedrock/codec/Codec.kt index 1df5453..0ee918f 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/codec/Codec.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/codec/Codec.kt @@ -18,7 +18,7 @@ package obsidian.bedrock.codec -import org.json.JSONObject +import obsidian.bedrock.gateway.event.CodecDescription abstract class Codec { /** @@ -39,7 +39,7 @@ abstract class Codec { /** * The JSON description of this Codec. */ - abstract val jsonDescription: JSONObject + abstract val description: CodecDescription /** * The type of this codec, can only be audio @@ -63,8 +63,24 @@ abstract class Codec { return payloadType == (other as Codec).payloadType } + override fun hashCode(): Int { + var result = name.hashCode() + result = 31 * result + payloadType + result = 31 * result + priority + result = 31 * result + description.hashCode() + result = 31 * result + codecType.hashCode() + result = 31 * result + rtxPayloadType + + return result + } + companion object { - private val AUDIO_CODECS: List by lazy { listOf(OpusCodec.INSTANCE) } + /** + * List of all audio codecs available + */ + private val AUDIO_CODECS: List by lazy { + listOf(OpusCodec.INSTANCE) + } /** * Gets audio codec description by name. @@ -72,6 +88,8 @@ abstract class Codec { * @param name the codec name * @return Codec instance or null if the codec is not found/supported by Bedrock */ - fun getAudio(name: String): Codec? = AUDIO_CODECS.find { it.name == name } + fun getAudio(name: String): Codec? = AUDIO_CODECS.find { + it.name == name + } } } \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/codec/CodecType.kt b/Server/src/main/kotlin/obsidian/bedrock/codec/CodecType.kt index dc58a8b..798834f 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/codec/CodecType.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/codec/CodecType.kt @@ -18,6 +18,11 @@ package obsidian.bedrock.codec +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable + +@Serializable enum class CodecType { + @SerialName("audio") AUDIO } \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/codec/OpusCodec.kt b/Server/src/main/kotlin/obsidian/bedrock/codec/OpusCodec.kt index d1b6656..659d8ce 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/codec/OpusCodec.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/codec/OpusCodec.kt @@ -18,8 +18,7 @@ package obsidian.bedrock.codec -import obsidian.server.util.buildJson -import org.json.JSONObject +import obsidian.bedrock.gateway.event.CodecDescription class OpusCodec : Codec() { override val name = "opus" @@ -28,12 +27,12 @@ class OpusCodec : Codec() { override val payloadType: Byte = PAYLOAD_TYPE - override val jsonDescription: JSONObject = buildJson { - put("name", name) - put("payload_type", payloadType) - put("priority", priority) - put("type", "audio") - } + override val description = CodecDescription( + name = name, + payloadType = payloadType, + priority = priority, + type = CodecType.AUDIO + ) companion object { /** diff --git a/Server/src/main/kotlin/obsidian/bedrock/gateway/AbstractMediaGatewayConnection.kt b/Server/src/main/kotlin/obsidian/bedrock/gateway/AbstractMediaGatewayConnection.kt index ce213e0..2e01de4 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/gateway/AbstractMediaGatewayConnection.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/gateway/AbstractMediaGatewayConnection.kt @@ -27,19 +27,20 @@ import io.ktor.client.request.* import io.ktor.http.* import io.ktor.http.cio.websocket.* import io.ktor.util.* -import io.ktor.utils.io.* -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.flow.* import obsidian.bedrock.MediaConnection import obsidian.bedrock.VoiceServerInfo -import obsidian.server.util.buildJsonString -import org.json.JSONObject +import obsidian.bedrock.gateway.event.Command +import obsidian.bedrock.gateway.event.Event +import obsidian.server.io.MagmaClient.Companion.jsonParser import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.nio.charset.Charset +import java.util.concurrent.CancellationException import kotlin.coroutines.CoroutineContext abstract class AbstractMediaGatewayConnection( @@ -48,83 +49,48 @@ abstract class AbstractMediaGatewayConnection( version: Int ) : MediaGatewayConnection, CoroutineScope { + /** + * Whether the websocket is open + */ override var open = false + + /** + * Coroutine context + */ override val coroutineContext: CoroutineContext get() = Job() + Dispatchers.IO - private var eventFlow = MutableSharedFlow(extraBufferCapacity = Int.MAX_VALUE) - private var session: ClientWebSocketSession? = null - private val websocketUrl: String by lazy { "wss://${voiceServerInfo.endpoint.replace(":80", "")}/?v=$version" } + /** + * Broadcast channel + */ + private val channel = BroadcastChannel(1) /** - * Identifies this session + * Event flow */ - protected abstract suspend fun identify() + protected val eventFlow: Flow + get() = channel.openSubscription().asFlow().buffer(Channel.UNLIMITED) /** - * Sends a JSON encoded string to the voice server. - * - * @param op The operation code. - * @param data The operation data. + * Current websocket session */ - suspend inline fun sendPayload(op: Op, data: Any?) { - sendPayload(buildJsonString { - put("op", op.code) - if (data != null) { - put("d", data) - } - }) - } + private lateinit var socket: DefaultClientWebSocketSession /** - * Sends a text payload to the voice server websocket. - * - * @param text The text to send. + * Websocket url to use */ - suspend fun sendPayload(text: String) { - logger.trace("VS <- $text") - session?.send(Frame.Text(text)) + private val websocketUrl: String by lazy { + "wss://${voiceServerInfo.endpoint.replace(":80", "")}/?v=$version" } /** - * Closes the gateway connection. + * Closes the connection to the voice server * * @param code The close code. * @param reason The close reason. */ override suspend fun close(code: Short, reason: String?) { - session?.close(CloseReason(code, reason ?: "")) - } - - protected fun on(op: Op, block: suspend (data: JSONObject) -> Unit) { - eventFlow - .filter { it.getInt("op") == op.code } - .onEach { - try { - block(it) - } catch (ex: Exception) { - logger.error("Error while handling OP ${op.code}", ex) - } - }.launchIn(this) - } - - private suspend fun handleIncoming() { - val session = this.session ?: return - - session.incoming.asFlow().buffer(Channel.UNLIMITED) - .collect { - when (it) { - is Frame.Text -> handleFrame(it) - else -> { /* noop */ - } - } - } - } - - private suspend fun handleFrame(frame: Frame.Text) { - val json = JSONObject(frame.readText()) - logger.trace("VS -> $json") - eventFlow.emit(json) + channel.close() } /** @@ -138,7 +104,7 @@ abstract class AbstractMediaGatewayConnection( open = true while (open) { try { - session = client.webSocketSession { + socket = client.webSocketSession { url(websocketUrl) } } catch (ex: Exception) { @@ -155,13 +121,104 @@ abstract class AbstractMediaGatewayConnection( open = false } + + if (::socket.isInitialized) { + socket.close() + } + + val reason = withTimeoutOrNull(1500) { + socket.closeReason.await() + } + + try { + onClose(reason?.code ?: -1, reason?.message ?: "unknown") + } catch (ex: Exception) { + logger.error(ex) + } + } + + /** + * Sends a JSON encoded string to the voice server. + * + * @param command The command to send + */ + suspend fun sendPayload(command: Command) { + if (open) { + try { + val json = jsonParser.encodeToString(Command.Companion, command) + logger.trace("VS <<< $json") + socket.send(json) + } catch (ex: Exception) { + logger.error(ex) + } + } + } + + /** + * Identifies this session + */ + protected abstract suspend fun identify() + + /** + * Called when the websocket connection has closed. + * + * @param code Close code + * @param reason Close reason + */ + protected abstract suspend fun onClose(code: Short, reason: String?) + + /** + * Used to handle specific events that are received + * + * @param block + */ + protected inline fun on(crossinline block: suspend T.() -> Unit) { + eventFlow.filterIsInstance() + .onEach { + try { + block(it) + } catch (ex: Exception) { + logger.error(ex) + } + }.launchIn(this) + } + + /** + * Handles incoming frames from the voice server + */ + private suspend fun handleIncoming() { + val session = this.socket + session.incoming.asFlow().buffer(Channel.UNLIMITED) + .collect { + when (it) { + is Frame.Text, is Frame.Binary -> handleFrame(it) + else -> { /* noop */ + } + } + } + } + + /** + * Handles an incoming frame + * + * @param frame Frame that was received + */ + private suspend fun handleFrame(frame: Frame) { + val json = frame.data.toString(Charset.defaultCharset()) + + try { + logger.trace("VS >>> $json") + jsonParser.decodeFromString(Event.Companion, json)?.let { channel.send(it) } + } catch (ex: Exception) { + logger.error(ex) + } } companion object { val logger: Logger = LoggerFactory.getLogger(AbstractMediaGatewayConnection::class.java) val client = HttpClient(OkHttp) { install(WebSockets) } - fun ReceiveChannel.asFlow() = flow { + internal fun ReceiveChannel.asFlow() = flow { try { for (value in this@asFlow) emit(value) } catch (ignore: CancellationException) { diff --git a/Server/src/main/kotlin/obsidian/bedrock/gateway/MediaGatewayV4Connection.kt b/Server/src/main/kotlin/obsidian/bedrock/gateway/MediaGatewayV4Connection.kt index f66799d..2f3fcb9 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/gateway/MediaGatewayV4Connection.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/gateway/MediaGatewayV4Connection.kt @@ -24,11 +24,9 @@ import obsidian.bedrock.MediaConnection import obsidian.bedrock.VoiceServerInfo import obsidian.bedrock.codec.OpusCodec import obsidian.bedrock.crypto.EncryptionMode +import obsidian.bedrock.gateway.event.* import obsidian.bedrock.handler.DiscordUDPConnection import obsidian.bedrock.util.Interval -import obsidian.server.util.buildJson -import org.json.JSONArray -import org.json.JSONObject import java.util.* @ObsoleteCoroutinesApi @@ -44,45 +42,37 @@ class MediaGatewayV4Connection( private lateinit var encryptionModes: List init { - on(Op.HELLO) { - val heartbeatInterval = it.getJSONObject("d") - .getLong("heartbeat_interval") - + on { logger.debug("Received HELLO, heartbeat interval: $heartbeatInterval") startHeartbeating(heartbeatInterval) } - on(Op.READY) { - val data = it.getJSONObject("d") - - ssrc = data.getInt("ssrc") - address = NetworkAddress(data.getString("ip"), data.getInt("port")) - encryptionModes = data.getJSONArray("modes").toList().map(Any::toString) - + on { logger.debug("Received READY, ssrc: $ssrc") + // update state + this@MediaGatewayV4Connection.ssrc = ssrc + address = NetworkAddress(ip, port) + encryptionModes = modes + mediaConnection.eventDispatcher.gatewayReady(address!!, ssrc) selectProtocol("udp") } - on(Op.SESSION_DESCRIPTION) { - val data = it.getJSONObject("d") + on { + mediaConnection.eventDispatcher.heartbeatAcknowledged(nonce) + } + + on { if (mediaConnection.connectionHandler != null) { - mediaConnection.eventDispatcher.sessionDescription(data) - mediaConnection.connectionHandler?.handleSessionDescription(data) + mediaConnection.connectionHandler?.handleSessionDescription(this) } else { logger.warn("Received session description before protocol selection? (connection id = $rtcConnectionId)") } } - on(Op.CLIENT_CONNECT) { - val data = it.getJSONObject("d") - val user = data.getString("user_id") - val audioSsrc = data.optInt("audio_ssrc", 0) - val videoSsrc = data.optInt("video_ssrc", 0) - val rtxSsrc = data.optInt("rtx_ssrc", 0) - - mediaConnection.eventDispatcher.userConnected(user, audioSsrc, videoSsrc, rtxSsrc) + on { + mediaConnection.eventDispatcher.userConnected(userId, audioSsrc, videoSsrc, rtxSsrc) } } @@ -104,27 +94,23 @@ class MediaGatewayV4Connection( val externalAddress = connection.connect() logger.debug("Connected, our external address is '$externalAddress'") - mediaConnection.eventDispatcher.externalIPDiscovered(externalAddress) - - val udpInformation = buildJson { - put("address", externalAddress.address.hostAddress) - put("port", externalAddress.port) - put("mode", mode) - } - - sendPayload(Op.SELECT_PROTOCOL, buildJson { - put("protocol", "udp") - put("codecs", SUPPORTED_CODECS) - put("rtc_connection_id", rtcConnectionId.toString()) - put("data", udpInformation) - combineWith(udpInformation) - }) - - sendPayload(Op.CLIENT_CONNECT, buildJson { - put("audio_ssrc", ssrc) - put("video_ssrc", 0) - put("rtx_ssrc", 0) - }) + + sendPayload(SelectProtocol( + protocol = "udp", + codecs = SUPPORTED_CODECS, + connectionId = rtcConnectionId!!, + data = SelectProtocol.UDPInformation( + address = externalAddress.address.hostAddress, + port = externalAddress.port, + mode = mode + ) + )) + + sendPayload(Command.ClientConnect( + audioSsrc = ssrc, + videoSsrc = 0, + rtxSsrc = 0 + )) mediaConnection.connectionHandler = connection logger.debug("Waiting for session description...") @@ -137,12 +123,20 @@ class MediaGatewayV4Connection( override suspend fun identify() { logger.debug("Identifying...") - sendPayload(Op.IDENTIFY, buildJson { - put("server_id", mediaConnection.id.toString()) - put("user_id", mediaConnection.bedrockClient.clientId.toString()) - put("session_id", voiceServerInfo.sessionId) - put("token", voiceServerInfo.token) - }) + sendPayload(Identify( + token = voiceServerInfo.token, + guildId = mediaConnection.id, + userId = mediaConnection.bedrockClient.clientId, + sessionId = voiceServerInfo.sessionId + )) + } + + override suspend fun onClose(code: Short, reason: String?) { + if (interval.started) { + interval.stop() + } + + mediaConnection.eventDispatcher.gatewayClosed(code, reason) } /** @@ -151,11 +145,11 @@ class MediaGatewayV4Connection( * @param mask The speaking mask. */ override suspend fun updateSpeaking(mask: Int) { - sendPayload(Op.SPEAKING, buildJson { - put("speaking", mask) - put("delay", 0) - put("ssrc", ssrc) - }) + sendPayload(Speaking( + speaking = mask, + delay = 0, + ssrc = ssrc + )) } /** @@ -164,11 +158,11 @@ class MediaGatewayV4Connection( * @param delay Delay, in milliseconds, between heart-beats. */ @ObsoleteCoroutinesApi - private suspend fun startHeartbeating(delay: Long) { - interval.start(delay) { + private suspend fun startHeartbeating(delay: Double) { + interval.start(delay.toLong()) { val nonce = System.currentTimeMillis() - sendPayload(Op.HEARTBEAT, nonce) mediaConnection.eventDispatcher.heartbeatDispatched(nonce) + sendPayload(Heartbeat(nonce)) } } @@ -176,21 +170,6 @@ class MediaGatewayV4Connection( /** * All supported audio codecs. */ - val SUPPORTED_CODECS = buildJson { - put(OpusCodec.INSTANCE.jsonDescription) - } - - /** - * Combines this [JSONObject] with the provided [JSONObject] - * - * @param other The [JSONObject] to combine with. - */ - fun JSONObject.combineWith(other: JSONObject): JSONObject { - other.keySet() - .filter { this.has(it) } - .forEach { put(it, other.get(it)) } - - return this - } + val SUPPORTED_CODECS = listOf(OpusCodec.INSTANCE.description) } } \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/gateway/Op.kt b/Server/src/main/kotlin/obsidian/bedrock/gateway/Op.kt deleted file mode 100644 index cb9f150..0000000 --- a/Server/src/main/kotlin/obsidian/bedrock/gateway/Op.kt +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Obsidian - * Copyright (C) 2021 Mixtape-Bot - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package obsidian.bedrock.gateway - -enum class Op(val code: Int) { - IDENTIFY(0), - SELECT_PROTOCOL(1), - READY(2), - HEARTBEAT(3), - SESSION_DESCRIPTION(4), - SPEAKING(5), - HEARTBEAT_ACK(6), - HELLO(8), - CLIENT_CONNECT(12); - - companion object { - /** - * Finds the Op for the provided [code] - * - * @param code The operation code. - */ - operator fun get(code: Int): Op? = - values().find { it.code == code } - } -} \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Command.kt b/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Command.kt new file mode 100644 index 0000000..ec09c28 --- /dev/null +++ b/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Command.kt @@ -0,0 +1,145 @@ +package obsidian.bedrock.gateway.event + +import kotlinx.serialization.KSerializer +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.SerializationStrategy +import kotlinx.serialization.builtins.LongAsStringSerializer +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.json.JsonObject +import obsidian.bedrock.codec.CodecType +import java.util.* + +sealed class Command { + @Serializable + internal data class ClientConnect( + @SerialName("audio_ssrc") + val audioSsrc: Int, + + @SerialName("video_ssrc") + val videoSsrc: Int, + + @SerialName("rtx_ssrc") + val rtxSsrc: Int, + ) : Command() + + companion object : SerializationStrategy { + override val descriptor: SerialDescriptor = buildClassSerialDescriptor("Command") { + element("op", Op.descriptor) + element("d", JsonObject.serializer().descriptor) + } + + override fun serialize(encoder: Encoder, value: Command) { + val composite = encoder.beginStructure(descriptor) + when (value) { + is SelectProtocol -> { + composite.encodeSerializableElement(descriptor, 0, Op, Op.SelectProtocol) + composite.encodeSerializableElement(descriptor, 1, SelectProtocol.serializer(), value) + } + + is Heartbeat -> { + composite.encodeSerializableElement(descriptor, 0, Op, Op.Heartbeat) + composite.encodeSerializableElement(descriptor, 1, Heartbeat.serializer(), value) + } + + is ClientConnect -> { + composite.encodeSerializableElement(descriptor, 0, Op, Op.ClientConnect) + composite.encodeSerializableElement(descriptor, 1, ClientConnect.serializer(), value) + } + + is Identify -> { + composite.encodeSerializableElement(descriptor, 0, Op, Op.Identify) + composite.encodeSerializableElement(descriptor, 1, Identify.serializer(), value) + } + + is Speaking -> { + composite.encodeSerializableElement(descriptor, 0, Op, Op.Speaking) + composite.encodeSerializableElement(descriptor, 1, Speaking.serializer(), value) + } + + } + + composite.endStructure(descriptor) + } + } +} + +@Serializable +data class SelectProtocol( + val protocol: String, + val codecs: List, + @Serializable(with = UUIDSerializer::class) + @SerialName("rtc_connection_id") + val connectionId: UUID, + val data: UDPInformation +) : Command() { + @Serializable + data class UDPInformation( + val address: String, + val port: Int, + val mode: String + ) +} + +@Serializable +data class CodecDescription( + val name: String, + @SerialName("payload_type") + val payloadType: Byte, + val priority: Int, + val type: CodecType +) + +@Serializable +data class Heartbeat( + val nonce: Long +) : Command() { + companion object : SerializationStrategy { + override val descriptor: SerialDescriptor = + PrimitiveSerialDescriptor("Heartbeat", PrimitiveKind.LONG) + + override fun serialize(encoder: Encoder, value: Heartbeat) { + encoder.encodeLong(value.nonce) + } + } +} + +@Serializable +data class Identify( + val token: String, + + @Serializable(with = LongAsStringSerializer::class) + @SerialName("server_id") + val guildId: Long, + + @Serializable(with = LongAsStringSerializer::class) + @SerialName("user_id") + val userId: Long, + + @SerialName("session_id") + val sessionId: String +) : Command() + +@Serializable +data class Speaking( + val speaking: Int, + val delay: Int, + val ssrc: Int +) : Command() + +object UUIDSerializer : KSerializer { + override val descriptor: SerialDescriptor = + PrimitiveSerialDescriptor("UUID", PrimitiveKind.STRING) + + override fun serialize(encoder: Encoder, value: UUID) { + encoder.encodeString(value.toString()) + } + + override fun deserialize(decoder: Decoder): UUID = + UUID.fromString(decoder.decodeString()) +} diff --git a/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Event.kt b/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Event.kt new file mode 100644 index 0000000..20d8de0 --- /dev/null +++ b/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Event.kt @@ -0,0 +1,109 @@ +package obsidian.bedrock.gateway.event + +import kotlinx.serialization.* +import kotlinx.serialization.builtins.nullable +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.encoding.CompositeDecoder +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonObject +import obsidian.server.io.Operation + +sealed class Event { + companion object : DeserializationStrategy { + override val descriptor: SerialDescriptor = buildClassSerialDescriptor("Event") { + element("op", Op.descriptor) + element("d", JsonObject.serializer().descriptor, isOptional = true) + } + + @ExperimentalSerializationApi + override fun deserialize(decoder: Decoder): Event? { + var op: Op? = null + var data: Event? = null + + with(decoder.beginStructure(descriptor)) { + loop@ while (true) { + val idx = decodeElementIndex(descriptor) + fun decode(serializer: DeserializationStrategy) = + decodeSerializableElement(Operation.descriptor, idx, serializer) + + when (idx) { + CompositeDecoder.DECODE_DONE -> break@loop + + 0 -> + op = Op.deserialize(decoder) + + 1 -> data = + when (op) { + Op.Hello -> decode(Hello.serializer()) + Op.Ready -> decode(Ready.serializer()) + Op.HeartbeatAck -> decode(HeartbeatAck.serializer()) + Op.SessionDescription -> decode(SessionDescription.serializer()) + Op.ClientConnect -> decode(ClientConnect.serializer()) + + else -> { + decodeNullableSerializableElement(Operation.descriptor, idx, JsonElement.serializer().nullable) + data + } + } + } + } + + endStructure(descriptor) + return data + } + } + } +} + +@Serializable +data class Hello( + @SerialName("heartbeat_interval") + val heartbeatInterval: Double +) : Event() + +@Serializable +data class Ready( + val ssrc: Int, + val ip: String, + val port: Int, + val modes: List +) : Event() + +@Serializable +data class HeartbeatAck(val nonce: Long) : Event() { + companion object : DeserializationStrategy { + override val descriptor: SerialDescriptor + get() = PrimitiveSerialDescriptor("HeartbeatAck", PrimitiveKind.LONG) + + override fun deserialize(decoder: Decoder): HeartbeatAck = + HeartbeatAck(decoder.decodeLong()) + } +} + +@Serializable +data class SessionDescription( + val mode: String, + @SerialName("audio_codec") + val audioCodec: String, + @SerialName("secret_key") + val secretKey: List +) : Event() + +@Serializable +data class ClientConnect( + @SerialName("user_id") + val userId: String, + + @SerialName("audio_ssrc") + val audioSsrc: Int = 0, + + @SerialName("video_ssrc") + val videoSsrc: Int = 0, + + @SerialName("rtx_ssrc") + val rtxSsrc: Int = 0, +) : Event() \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Op.kt b/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Op.kt new file mode 100644 index 0000000..6945cc8 --- /dev/null +++ b/Server/src/main/kotlin/obsidian/bedrock/gateway/event/Op.kt @@ -0,0 +1,64 @@ +/* + * Obsidian + * Copyright (C) 2021 Mixtape-Bot + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package obsidian.bedrock.gateway.event + +import kotlinx.serialization.KSerializer +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder + +enum class Op(val code: Int) { + Unknown(Int.MIN_VALUE), + + // sent + Identify(0), + SelectProtocol(1), + Heartbeat(3), + Speaking(5), + + // received & sent + ClientConnect(12), + + // received + Ready(2), + SessionDescription(4), + HeartbeatAck(6), + Hello(8); + + companion object Serializer : KSerializer { + /** + * Finds the Op for the provided [code] + * + * @param code The operation code. + */ + operator fun get(code: Int): Op? = + values().find { it.code == code } + + override val descriptor: SerialDescriptor + get() = PrimitiveSerialDescriptor("op", PrimitiveKind.INT) + + override fun deserialize(decoder: Decoder): Op = + this[decoder.decodeInt()] ?: Unknown + + override fun serialize(encoder: Encoder, value: Op) = + encoder.encodeInt(value.code) + } +} \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/handler/ConnectionHandler.kt b/Server/src/main/kotlin/obsidian/bedrock/handler/ConnectionHandler.kt index 2e36fc5..d5af702 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/handler/ConnectionHandler.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/handler/ConnectionHandler.kt @@ -21,7 +21,7 @@ package obsidian.bedrock.handler import io.ktor.util.network.* import io.netty.buffer.ByteBuf import obsidian.bedrock.codec.Codec -import org.json.JSONObject +import obsidian.bedrock.gateway.event.SessionDescription import java.io.Closeable /** @@ -36,7 +36,7 @@ interface ConnectionHandler : Closeable { * * @param data The session description data. */ - suspend fun handleSessionDescription(data: JSONObject) + suspend fun handleSessionDescription(data: SessionDescription) /** * Connects to the Discord UDP Socket. @@ -45,9 +45,5 @@ interface ConnectionHandler : Closeable { */ suspend fun connect(): NetworkAddress - suspend fun sendFrame(codec: Codec, timestamp: Int, data: ByteBuf, start: Int) { - sendFrame(codec.payloadType, timestamp, data, start, false) - } - suspend fun sendFrame(payloadType: Byte, timestamp: Int, data: ByteBuf, start: Int, extension: Boolean) } \ No newline at end of file diff --git a/Server/src/main/kotlin/obsidian/bedrock/handler/DiscordUDPConnection.kt b/Server/src/main/kotlin/obsidian/bedrock/handler/DiscordUDPConnection.kt index dd41e73..2fab144 100644 --- a/Server/src/main/kotlin/obsidian/bedrock/handler/DiscordUDPConnection.kt +++ b/Server/src/main/kotlin/obsidian/bedrock/handler/DiscordUDPConnection.kt @@ -27,9 +27,9 @@ import obsidian.bedrock.Bedrock import obsidian.bedrock.MediaConnection import obsidian.bedrock.codec.Codec import obsidian.bedrock.crypto.EncryptionMode +import obsidian.bedrock.gateway.event.SessionDescription import obsidian.bedrock.util.NettyBootstrapFactory import obsidian.bedrock.util.writeV2 -import org.json.JSONObject import org.slf4j.Logger import org.slf4j.LoggerFactory import java.io.Closeable @@ -73,14 +73,12 @@ class DiscordUDPConnection( } } - override suspend fun handleSessionDescription(sessionDescription: JSONObject) { - val mode = sessionDescription.getString("mode") - val audioCodecName = sessionDescription.getString("audio_codec") - encryptionMode = EncryptionMode[mode] + override suspend fun handleSessionDescription(data: SessionDescription) { + encryptionMode = EncryptionMode[data.mode] - val audioCodec = Codec.getAudio(audioCodecName) - if (audioCodecName != null && audioCodec == null) { - logger.warn("Unsupported audio codec type: {}, no audio data will be polled", audioCodecName) + val audioCodec = Codec.getAudio(data.audioCodec) + if (audioCodec == null) { + logger.warn("Unsupported audio codec type: {}, no audio data will be polled", data.audioCodec) } checkNotNull(encryptionMode) { @@ -88,11 +86,8 @@ class DiscordUDPConnection( "protocol changed! Open an issue!" } - val keyArray = sessionDescription.getJSONArray("secret_key") - secretKey = ByteArray(keyArray.length()) - - for (i in secretKey!!.indices) { - secretKey!![i] = (keyArray.getInt(i) and 0xff).toByte() + secretKey = ByteArray(data.secretKey.size) { idx -> + (data.secretKey[idx] and 0xff).toByte() } connection.startFramePolling() diff --git a/Server/src/main/kotlin/obsidian/server/Obsidian.kt b/Server/src/main/kotlin/obsidian/server/Obsidian.kt index b4bb538..488c5a0 100644 --- a/Server/src/main/kotlin/obsidian/server/Obsidian.kt +++ b/Server/src/main/kotlin/obsidian/server/Obsidian.kt @@ -60,23 +60,22 @@ object Obsidian { @JvmStatic fun main(args: Array) { - val rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME) as Logger - val rollingFileAppender = RollingFileAppender().apply { - context = rootLogger.loggerContext - rollingPolicy = TimeBasedRollingPolicy().apply { - maxHistory = 30 - fileNamePattern = "logs/obsidian.%d{yyyyMMdd}.log" - } - - - file = "logs/obsidian.log" - encoder = PatternLayoutEncoder().apply { - pattern = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%20.-20thread] %-40.40logger{39} %-6level %msg%n" - } - } - - rootLogger.addAppender(rollingFileAppender) - rootLogger.level = Level.toLevel(config[LoggingConfig.Level]) +// val rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME) as Logger +// val rollingFileAppender = RollingFileAppender().apply { +// context = rootLogger.loggerContext +// rollingPolicy = TimeBasedRollingPolicy().apply { +// maxHistory = 30 +// fileNamePattern = "logs/obsidian.%d{yyyyMMdd}.log" +// } +// +// file = "logs/obsidian.log" +// encoder = PatternLayoutEncoder().apply { +// pattern = "%d{yyyy-MM-dd HH:mm:ss.SSS} [%20.-20thread] %-40.40logger{39} %-6level %msg%n" +// } +// } +// +// rootLogger.addAppender(rollingFileAppender) +// rootLogger.level = Level.toLevel(config[LoggingConfig.Level]) val server = embeddedServer(CIO, host = config[ObsidianConfig.Host], port = config[ObsidianConfig.Port]) { install(Locations) diff --git a/Server/src/main/kotlin/obsidian/server/io/Dispatch.kt b/Server/src/main/kotlin/obsidian/server/io/Dispatch.kt index 8aa96a8..a31d7ce 100644 --- a/Server/src/main/kotlin/obsidian/server/io/Dispatch.kt +++ b/Server/src/main/kotlin/obsidian/server/io/Dispatch.kt @@ -69,6 +69,11 @@ sealed class Dispatch { encodeSerializableElement(descriptor, 1, TrackStuckEvent.serializer(), value) } + is WebSocketOpenEvent -> { + encodeSerializableElement(descriptor, 0, Op, Op.PlayerEvent) + encodeSerializableElement(descriptor, 1, WebSocketOpenEvent.serializer(), value) + } + is WebSocketClosedEvent -> { encodeSerializableElement(descriptor, 0, Op, Op.PlayerEvent) encodeSerializableElement(descriptor, 1, WebSocketClosedEvent.serializer(), value) @@ -117,15 +122,23 @@ sealed class PlayerEvent : Dispatch() { } @Serializable -data class WebSocketClosedEvent( +data class WebSocketOpenEvent( @Serializable(with = LongAsStringSerializer::class) @SerialName("guild_id") override val guildId: Long, + val ssrc: Int, + val target: String +) : PlayerEvent() { + override val type: PlayerEventType = PlayerEventType.WEBSOCKET_OPEN +} - @SerialName("by_remote") - val byRemote: Boolean, +@Serializable +data class WebSocketClosedEvent( + @Serializable(with = LongAsStringSerializer::class) + @SerialName("guild_id") + override val guildId: Long, val reason: String?, - val code: Int + val code: Short ) : PlayerEvent() { override val type: PlayerEventType = PlayerEventType.WEBSOCKET_CLOSED } @@ -190,6 +203,7 @@ data class TrackExceptionEvent( } enum class PlayerEventType { + WEBSOCKET_OPEN, WEBSOCKET_CLOSED, TRACK_START, TRACK_END, diff --git a/Server/src/main/kotlin/obsidian/server/io/Magma.kt b/Server/src/main/kotlin/obsidian/server/io/Magma.kt index d9e3135..3ffb593 100644 --- a/Server/src/main/kotlin/obsidian/server/io/Magma.kt +++ b/Server/src/main/kotlin/obsidian/server/io/Magma.kt @@ -23,8 +23,10 @@ import io.ktor.http.cio.websocket.* import io.ktor.locations.* import io.ktor.request.* import io.ktor.routing.* +import io.ktor.util.* import io.ktor.util.pipeline.* import io.ktor.websocket.* +import kotlinx.coroutines.isActive import obsidian.server.io.MagmaCloseReason.CLIENT_EXISTS import obsidian.server.io.MagmaCloseReason.INVALID_AUTHORIZATION import obsidian.server.io.MagmaCloseReason.NO_USER_ID @@ -45,7 +47,40 @@ class Magma private constructor() { fun use(routing: Routing) { routing { - webSocket("/", handler = this@Magma::websocketHandler) + webSocket("/") { + val request = call.request + if (!ObsidianConfig.validateAuth(request.authorization())) { + logger.warn("Authentication failed from ${request.local.remoteHost}") + close(INVALID_AUTHORIZATION) + return@webSocket + } else { + logger.info("Incoming request from ${request.local.remoteHost}") + } + + val userId = request.headers["User-Id"]?.toLongOrNull() + if (userId == null) { + close(NO_USER_ID) + logger.info("${request.local.remoteHost}: Missing 'User-Id' header") + return@webSocket + } + + var client = clients[userId] + if (client != null) { + close(CLIENT_EXISTS) + return@webSocket + } + + client = MagmaClient(userId, this) + try { + client.listen() + } catch (ex: Throwable) { + logger.error(ex) + close(CloseReason(4005, ex.message ?: "unknown exception")) + } + + client.shutdown() + clients.remove(userId) + } get("/") { context.respondJson { @@ -61,42 +96,6 @@ class Magma private constructor() { Tracks(routing) } - /** - * Handles an incoming session - */ - private suspend fun websocketHandler(session: WebSocketServerSession) { - val request = session.call.request - if (!ObsidianConfig.validateAuth(request.authorization())) { - logger.warn("Authentication failed from ${request.local.remoteHost}") - session.close(INVALID_AUTHORIZATION) - return - } else { - logger.info("Incoming request from ${request.local.remoteHost}") - } - - val userId = request.headers["User-Id"]?.toLongOrNull() - if (userId == null) { - session.close(NO_USER_ID) - return - } - - var client = clients[userId] - if (client != null) { - session.close(CLIENT_EXISTS) - return - } - - client = MagmaClient(userId, session) - try { - client.listen() - } catch (ex: Throwable) { - session.close(CloseReason(4005, ex.message ?: "Unknown Error")) - } - - client.shutdown() - clients.remove(userId) - } - suspend fun shutdown() { if (clients.isNotEmpty()) { logger.info("Shutting down ${clients.size} clients.") diff --git a/Server/src/main/kotlin/obsidian/server/io/MagmaClient.kt b/Server/src/main/kotlin/obsidian/server/io/MagmaClient.kt index 20256f8..2315b24 100644 --- a/Server/src/main/kotlin/obsidian/server/io/MagmaClient.kt +++ b/Server/src/main/kotlin/obsidian/server/io/MagmaClient.kt @@ -21,6 +21,7 @@ package obsidian.server.io import com.sedmelluq.discord.lavaplayer.track.TrackMarker import io.ktor.http.cio.websocket.* import io.ktor.util.* +import io.ktor.util.network.* import io.ktor.websocket.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel @@ -39,10 +40,12 @@ import obsidian.server.util.TrackUtil import org.json.JSONObject import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.nio.charset.Charset import java.util.concurrent.ConcurrentHashMap import kotlin.coroutines.CoroutineContext import kotlin.reflect.full.* +@ExperimentalCoroutinesApi @Suppress("unused") class MagmaClient( private val clientId: Long, @@ -58,14 +61,6 @@ class MagmaClient( */ val links = ConcurrentHashMap() - /** - * JSON parser for events. - */ - private val jsonParser = Json { - ignoreUnknownKeys = true - isLenient = true - encodeDefaults = true - } /** * Events flow lol - idk kotlin @@ -83,7 +78,13 @@ class MagmaClient( init { on { - mediaConnectionFor(guildId).connect(VoiceServerInfo(sessionId, token, endpoint)) + val conn = mediaConnectionFor(guildId) + val link = links.computeIfAbsent(guildId) { + Link(this@MagmaClient, guildId) + } + + conn.connect(VoiceServerInfo(sessionId, token, endpoint)) + link.provideTo(conn) } on { @@ -148,21 +149,16 @@ class MagmaClient( } link.play(track) - - val connection: MediaConnection = mediaConnectionFor(guildId) - link.provideTo(connection) } } @ObsoleteCoroutinesApi suspend fun listen() { -// sendStats() - session.incoming.asFlow().buffer(Channel.UNLIMITED) .collect { when (it) { - is Frame.Text -> handleFrame(it) - else -> { /* no-op */ + is Frame.Binary, is Frame.Text -> handleIncomingFrame(it) + else -> { // no-op } } } @@ -181,18 +177,19 @@ class MagmaClient( } /** - * Handles an incoming [Frame.Text]. + * Handles an incoming [Frame]. * - * @param frame The received text frame, binary frames. + * @param frame The received text or binary frame. */ - private suspend fun handleFrame(frame: Frame.Text) { - val json = frame.readText() - logger.trace("$clientId -> $json") - - val operation = jsonParser.decodeFromString(Operation.Companion, json) - ?: return - - events.emit(operation) + private suspend fun handleIncomingFrame(frame: Frame) { + val json = frame.data.toString(Charset.defaultCharset()) + + try { + logger.trace("$clientId >>> $json") + jsonParser.decodeFromString(Operation, json)?.let { events.emit(it) } + } catch (ex: Exception) { + logger.error(ex) + } } private fun mediaConnectionFor(guildId: Long): MediaConnection { @@ -208,13 +205,12 @@ class MagmaClient( /** * Send a JSON payload to the client. * - * @param json The jason payload. + * @param dispatch The dispatch instance */ - public suspend fun send(dispatch: Dispatch) { + suspend fun send(dispatch: Dispatch) { val json = jsonParser.encodeToString(Dispatch.Companion, dispatch) - - session.send(Frame.Text(json)) - logger.trace("$clientId <- ${json}") + logger.trace("$clientId <- $json") + session.send(json) } internal suspend fun shutdown() { @@ -230,13 +226,22 @@ class MagmaClient( private var lastHeartbeat: Long? = null private var lastHeartbeatNonce: Long? = null - override suspend fun gatewayClosed(code: Int, byRemote: Boolean, reason: String?) { + override suspend fun gatewayReady(target: NetworkAddress, ssrc: Int) { + send( + WebSocketOpenEvent( + guildId = guildId, + ssrc = ssrc, + target = target.hostname + ) + ) + } + + override suspend fun gatewayClosed(code: Short, reason: String?) { send( WebSocketClosedEvent( guildId = guildId, reason = reason, - code = code, - byRemote = byRemote + code = code ) ) } @@ -276,6 +281,15 @@ class MagmaClient( } companion object { + /** + * JSON parser for everything. + */ + val jsonParser = Json { + ignoreUnknownKeys = true + isLenient = true + encodeDefaults = true + } + private val logger: Logger = LoggerFactory.getLogger(MagmaClient::class.java) private fun getGuildId(data: JSONObject): Long? = @@ -285,4 +299,4 @@ class MagmaClient( null } } -} \ No newline at end of file +} diff --git a/Server/src/main/kotlin/obsidian/server/io/Op.kt b/Server/src/main/kotlin/obsidian/server/io/Op.kt index 1718f84..fe7f8aa 100644 --- a/Server/src/main/kotlin/obsidian/server/io/Op.kt +++ b/Server/src/main/kotlin/obsidian/server/io/Op.kt @@ -65,4 +65,4 @@ enum class Op(val code: Int) { encoder.encodeInt(value.code) } -} \ No newline at end of file +} diff --git a/Server/src/main/kotlin/obsidian/server/io/Operation.kt b/Server/src/main/kotlin/obsidian/server/io/Operation.kt index ce17823..2ed222f 100644 --- a/Server/src/main/kotlin/obsidian/server/io/Operation.kt +++ b/Server/src/main/kotlin/obsidian/server/io/Operation.kt @@ -98,7 +98,10 @@ data class PlayTrack( ) : Operation() @Serializable -data class StopTrack(@SerialName("guild_id") val guildId: Long) : Operation() +data class StopTrack( + @SerialName("guild_id") + val guildId: Long +) : Operation() @Serializable data class SubmitVoiceUpdate( @@ -129,9 +132,11 @@ data class Filters( val equalizer: EqualizerFilter? = null, val timescale: TimescaleFilter? = null, val karaoke: KaraokeFilter? = null, + @SerialName("channel_mix") val channelMix: ChannelMixFilter? = null, val vibrato: VibratoFilter? = null, val rotation: RotationFilter? = null, + @SerialName("low_pass") val lowPass: LowPassFilter? = null ) : Operation() diff --git a/Server/src/main/kotlin/obsidian/server/player/filter/impl/KaraokeFilter.kt b/Server/src/main/kotlin/obsidian/server/player/filter/impl/KaraokeFilter.kt index 6ea767f..be726b2 100644 --- a/Server/src/main/kotlin/obsidian/server/player/filter/impl/KaraokeFilter.kt +++ b/Server/src/main/kotlin/obsidian/server/player/filter/impl/KaraokeFilter.kt @@ -21,14 +21,18 @@ package obsidian.server.player.filter.impl import com.github.natanbc.lavadsp.karaoke.KaraokePcmAudioFilter import com.sedmelluq.discord.lavaplayer.filter.FloatPcmAudioFilter import com.sedmelluq.discord.lavaplayer.format.AudioDataFormat +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import obsidian.server.player.filter.Filter @Serializable data class KaraokeFilter( val level: Float, + @SerialName("mono_level") val monoLevel: Float, + @SerialName("filter_band") val filterBand: Float, + @SerialName("filter_width") val filterWidth: Float, ) : Filter { override val enabled: Boolean diff --git a/Server/src/main/kotlin/obsidian/server/player/filter/impl/RotationFilter.kt b/Server/src/main/kotlin/obsidian/server/player/filter/impl/RotationFilter.kt index 2c1d85c..eb30091 100644 --- a/Server/src/main/kotlin/obsidian/server/player/filter/impl/RotationFilter.kt +++ b/Server/src/main/kotlin/obsidian/server/player/filter/impl/RotationFilter.kt @@ -21,11 +21,13 @@ package obsidian.server.player.filter.impl import com.github.natanbc.lavadsp.rotation.RotationPcmAudioFilter import com.sedmelluq.discord.lavaplayer.filter.FloatPcmAudioFilter import com.sedmelluq.discord.lavaplayer.format.AudioDataFormat +import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import obsidian.server.player.filter.Filter @Serializable data class RotationFilter( + @SerialName("rotation_hz") val rotationHz: Float = 5f ) : Filter { override val enabled: Boolean diff --git a/Server/src/main/kotlin/obsidian/server/player/filter/impl/TimescaleFilter.kt b/Server/src/main/kotlin/obsidian/server/player/filter/impl/TimescaleFilter.kt index 298a10e..dcc34ac 100644 --- a/Server/src/main/kotlin/obsidian/server/player/filter/impl/TimescaleFilter.kt +++ b/Server/src/main/kotlin/obsidian/server/player/filter/impl/TimescaleFilter.kt @@ -29,8 +29,12 @@ import obsidian.server.player.filter.FilterChain @Serializable data class TimescaleFilter( val pitch: Float = 1f, + val pitchOctaves: Float? = null, + val pitchSemiTones: Float? = null, val speed: Float = 1f, - val rate: Float = 1f + val speedChange: Float? = null, + val rate: Float = 1f, + val rateChange: Float? = null, ) : Filter { override val enabled: Boolean get() = @@ -51,12 +55,43 @@ data class TimescaleFilter( require(pitch > 0) { "'pitch' must be greater than 0" } + + if (pitchOctaves != null) { + require(!isSet(pitch, 1.0F) && pitchSemiTones == null) { + "'pitchOctaves' cannot be used in conjunction with 'pitch' and 'pitchSemiTones'" + } + } + + if (pitchSemiTones != null) { + require(!isSet(pitch, 1.0F) && pitchOctaves == null) { + "'pitchOctaves' cannot be used in conjunction with 'pitch' and 'pitchSemiTones'" + } + } + + if (speedChange != null) { + require(!isSet(speed, 1.0F)) { + "'speedChange' cannot be used in conjunction with 'speed'" + } + } + + if (rateChange != null) { + require(!isSet(rate, 1.0F)) { + "'rateChange' cannot be used in conjunction with 'rate'" + } + } } override fun build(format: AudioDataFormat, downstream: FloatPcmAudioFilter): FloatPcmAudioFilter = - TimescalePcmAudioFilter(downstream, format.channelCount, format.sampleRate) - .setPitch(pitch.toDouble()) - .setRate(rate.toDouble()) - .setSpeed(speed.toDouble()) + TimescalePcmAudioFilter(downstream, format.channelCount, format.sampleRate).also { af -> + af.pitch = pitch.toDouble() + af.rate = rate.toDouble() + af.speed = speed.toDouble() + + this.pitchOctaves?.let { af.setPitchOctaves(it.toDouble()) } + this.pitchSemiTones?.let { af.setPitchSemiTones(it.toDouble()) } + this.speedChange?.let { af.setSpeedChange(it.toDouble()) } + this.rateChange?.let { af.setRateChange(it.toDouble()) } + } + } diff --git a/Server/src/main/kotlin/obsidian/server/util/Builders.kt b/Server/src/main/kotlin/obsidian/server/util/Builders.kt index ffc983b..b76b8f0 100644 --- a/Server/src/main/kotlin/obsidian/server/util/Builders.kt +++ b/Server/src/main/kotlin/obsidian/server/util/Builders.kt @@ -18,9 +18,6 @@ package obsidian.server.util -inline fun buildJsonString(builder: T.() -> Unit): String = - buildJson(builder).toString() - inline fun buildJson(builder: T.() -> Unit): T = T::class.java .getDeclaredConstructor() diff --git a/Server/src/main/kotlin/obsidian/server/util/Extensions.kt b/Server/src/main/kotlin/obsidian/server/util/Extensions.kt index 4eb5250..d821643 100644 --- a/Server/src/main/kotlin/obsidian/server/util/Extensions.kt +++ b/Server/src/main/kotlin/obsidian/server/util/Extensions.kt @@ -38,5 +38,6 @@ suspend inline fun ApplicationCall.respondJson( respondText( json.toString(), status = status, - contentType = ContentType.Application.Json + contentType = ContentType.Application.Json, + configure = configure ) \ No newline at end of file