Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pubsub #3

Merged
merged 3 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 85 additions & 1 deletion src/main/kotlin/io/xconn/xconn/Session.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ import io.xconn.wampproto.messages.Call
import io.xconn.wampproto.messages.Error
import io.xconn.wampproto.messages.Goodbye
import io.xconn.wampproto.messages.Message
import io.xconn.wampproto.messages.Publish
import io.xconn.wampproto.messages.Published
import io.xconn.wampproto.messages.Register
import io.xconn.wampproto.messages.Registered
import io.xconn.wampproto.messages.Subscribe
import io.xconn.wampproto.messages.Subscribed
import io.xconn.wampproto.messages.Unregister
import io.xconn.wampproto.messages.Unregistered
import io.xconn.wampproto.messages.Unsubscribe
import io.xconn.wampproto.messages.Unsubscribed
import io.xconn.wampproto.messages.Yield
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
Expand All @@ -20,6 +26,7 @@ import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import kotlin.coroutines.cancellation.CancellationException
import io.xconn.wampproto.messages.Event as EventMsg
import io.xconn.wampproto.messages.Invocation as InvocationMsg
import io.xconn.wampproto.messages.Result as ResultMsg

Expand All @@ -32,6 +39,12 @@ class Session(private val baseSession: BaseSession) {
private val registerRequests: MutableMap<Long, RegisterRequest> = mutableMapOf()
private val registrations: MutableMap<Long, (Invocation) -> Result> = mutableMapOf()
private val unregisterRequests: MutableMap<Long, UnregisterRequest> = mutableMapOf()

private val publishRequests: MutableMap<Long, CompletableDeferred<Unit>> = mutableMapOf()
private val subscribeRequests: MutableMap<Long, SubscribeRequest> = mutableMapOf()
private val subscriptions: MutableMap<Long, (Event) -> Unit> = mutableMapOf()
private val unsubscribeRequests: MutableMap<Long, UnsubscribeRequest> = mutableMapOf()

private val goodbyeRequest: CompletableDeferred<Unit> = CompletableDeferred()

init {
Expand Down Expand Up @@ -108,6 +121,30 @@ class Session(private val baseSession: BaseSession) {
request.completable.complete(Unit)
}
}
is Published -> {
val request = publishRequests.remove(message.requestID)
request?.complete(Unit)
}
is Subscribed -> {
val request = subscribeRequests.remove(message.requestID)
if (request != null) {
subscriptions[message.subscriptionID] = request.endpoint
request.completable.complete(Subscription(message.subscriptionID))
}
}
is EventMsg -> {
val endpoint = subscriptions[message.subscriptionID]
if (endpoint != null) {
endpoint(Event(message.args, message.kwargs, message.details))
}
}
is Unsubscribed -> {
val request = unsubscribeRequests.remove(message.requestID)
if (request != null) {
subscriptions.remove(request.subscriptionID)
request.completable.complete(Unit)
}
}
is Goodbye -> {
goodbyeRequest.complete(Unit)
}
Expand Down Expand Up @@ -156,7 +193,7 @@ class Session(private val baseSession: BaseSession) {
suspend fun register(
procedure: String,
endpoint: (Invocation) -> Result,
options: Map<String, Any>? = null,
options: Map<String, Any>? = emptyMap(),
): CompletableDeferred<Registration> {
val register = Register(nextID, procedure, options)

Expand All @@ -178,4 +215,51 @@ class Session(private val baseSession: BaseSession) {

return completable
}

suspend fun publish(
topic: String,
args: List<Any>? = null,
kwargs: Map<String, Any>? = null,
options: Map<String, Any> = emptyMap(),
): CompletableDeferred<Unit>? {
val publish = Publish(nextID, topic, args, kwargs, options)

baseSession.send(wampSession.sendMessage(publish))

val ack = options["acknowledge"] as? Boolean ?: false
if (ack) {
val completer = CompletableDeferred<Unit>()
publishRequests[publish.requestID] = completer

return completer
}

return null
}

suspend fun subscribe(
topic: String,
endpoint: (Event) -> Unit,
options: Map<String, Any>? = emptyMap(),
): CompletableDeferred<Subscription> {
val subscribe = Subscribe(nextID, topic, options)

val completable = CompletableDeferred<Subscription>()
subscribeRequests[subscribe.requestID] = SubscribeRequest(completable, endpoint)

baseSession.send(wampSession.sendMessage(subscribe))

return completable
}

suspend fun unsubscribe(sub: Subscription): CompletableDeferred<Unit> {
val unsubscribe = Unsubscribe(nextID, sub.subscriptionID)

val completable = CompletableDeferred<Unit>()
unsubscribeRequests[unsubscribe.requestID] = UnsubscribeRequest(completable, sub.subscriptionID)

baseSession.send(wampSession.sendMessage(unsubscribe))

return completable
}
}
18 changes: 18 additions & 0 deletions src/main/kotlin/io/xconn/xconn/Types.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,21 @@ data class UnregisterRequest(
val completable: CompletableDeferred<Unit>,
val registrationID: Long,
)

data class Subscription(val subscriptionID: Long)

data class SubscribeRequest(
val completable: CompletableDeferred<Subscription>,
val endpoint: (Event) -> Unit,
)

data class Event(
val args: List<Any>? = emptyList(),
val kwargs: Map<String, Any>? = emptyMap(),
val details: Map<String, Any> = emptyMap(),
)

data class UnsubscribeRequest(
val completable: CompletableDeferred<Unit>,
val subscriptionID: Long,
)