diff --git a/src/main/kotlin/io/xconn/xconn/Session.kt b/src/main/kotlin/io/xconn/xconn/Session.kt index 248838f..39769da 100644 --- a/src/main/kotlin/io/xconn/xconn/Session.kt +++ b/src/main/kotlin/io/xconn/xconn/Session.kt @@ -6,10 +6,16 @@ import io.xconn.wampproto.messages.Call import io.xconn.wampproto.messages.Error import io.xconn.wampproto.messages.Goodbye import io.xconn.wampproto.messages.Message +import io.xconn.wampproto.messages.Publish +import io.xconn.wampproto.messages.Published import io.xconn.wampproto.messages.Register import io.xconn.wampproto.messages.Registered +import io.xconn.wampproto.messages.Subscribe +import io.xconn.wampproto.messages.Subscribed import io.xconn.wampproto.messages.Unregister import io.xconn.wampproto.messages.Unregistered +import io.xconn.wampproto.messages.Unsubscribe +import io.xconn.wampproto.messages.Unsubscribed import io.xconn.wampproto.messages.Yield import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope @@ -20,6 +26,7 @@ import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import kotlin.coroutines.cancellation.CancellationException +import io.xconn.wampproto.messages.Event as EventMsg import io.xconn.wampproto.messages.Invocation as InvocationMsg import io.xconn.wampproto.messages.Result as ResultMsg @@ -32,6 +39,12 @@ class Session(private val baseSession: BaseSession) { private val registerRequests: MutableMap = mutableMapOf() private val registrations: MutableMap Result> = mutableMapOf() private val unregisterRequests: MutableMap = mutableMapOf() + + private val publishRequests: MutableMap> = mutableMapOf() + private val subscribeRequests: MutableMap = mutableMapOf() + private val subscriptions: MutableMap Unit> = mutableMapOf() + private val unsubscribeRequests: MutableMap = mutableMapOf() + private val goodbyeRequest: CompletableDeferred = CompletableDeferred() init { @@ -108,6 +121,30 @@ class Session(private val baseSession: BaseSession) { request.completable.complete(Unit) } } + is Published -> { + val request = publishRequests.remove(message.requestID) + request?.complete(Unit) + } + is Subscribed -> { + val request = subscribeRequests.remove(message.requestID) + if (request != null) { + subscriptions[message.subscriptionID] = request.endpoint + request.completable.complete(Subscription(message.subscriptionID)) + } + } + is EventMsg -> { + val endpoint = subscriptions[message.subscriptionID] + if (endpoint != null) { + endpoint(Event(message.args, message.kwargs, message.details)) + } + } + is Unsubscribed -> { + val request = unsubscribeRequests.remove(message.requestID) + if (request != null) { + subscriptions.remove(request.subscriptionID) + request.completable.complete(Unit) + } + } is Goodbye -> { goodbyeRequest.complete(Unit) } @@ -156,7 +193,7 @@ class Session(private val baseSession: BaseSession) { suspend fun register( procedure: String, endpoint: (Invocation) -> Result, - options: Map? = null, + options: Map? = emptyMap(), ): CompletableDeferred { val register = Register(nextID, procedure, options) @@ -178,4 +215,51 @@ class Session(private val baseSession: BaseSession) { return completable } + + suspend fun publish( + topic: String, + args: List? = null, + kwargs: Map? = null, + options: Map = emptyMap(), + ): CompletableDeferred? { + val publish = Publish(nextID, topic, args, kwargs, options) + + baseSession.send(wampSession.sendMessage(publish)) + + val ack = options["acknowledge"] as? Boolean ?: false + if (ack) { + val completer = CompletableDeferred() + publishRequests[publish.requestID] = completer + + return completer + } + + return null + } + + suspend fun subscribe( + topic: String, + endpoint: (Event) -> Unit, + options: Map? = emptyMap(), + ): CompletableDeferred { + val subscribe = Subscribe(nextID, topic, options) + + val completable = CompletableDeferred() + subscribeRequests[subscribe.requestID] = SubscribeRequest(completable, endpoint) + + baseSession.send(wampSession.sendMessage(subscribe)) + + return completable + } + + suspend fun unsubscribe(sub: Subscription): CompletableDeferred { + val unsubscribe = Unsubscribe(nextID, sub.subscriptionID) + + val completable = CompletableDeferred() + unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completable, sub.subscriptionID) + + baseSession.send(wampSession.sendMessage(unsubscribe)) + + return completable + } } diff --git a/src/main/kotlin/io/xconn/xconn/Types.kt b/src/main/kotlin/io/xconn/xconn/Types.kt index 0b93824..c6b5434 100644 --- a/src/main/kotlin/io/xconn/xconn/Types.kt +++ b/src/main/kotlin/io/xconn/xconn/Types.kt @@ -100,3 +100,21 @@ data class UnregisterRequest( val completable: CompletableDeferred, val registrationID: Long, ) + +data class Subscription(val subscriptionID: Long) + +data class SubscribeRequest( + val completable: CompletableDeferred, + val endpoint: (Event) -> Unit, +) + +data class Event( + val args: List? = emptyList(), + val kwargs: Map? = emptyMap(), + val details: Map = emptyMap(), +) + +data class UnsubscribeRequest( + val completable: CompletableDeferred, + val subscriptionID: Long, +)