Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Base for push notification system using websockets #426

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions server/src/main/kotlin/suwayomi/tachidesk/event/Event.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package suwayomi.tachidesk.event

import suwayomi.tachidesk.event.enums.EventType
import java.util.UUID

data class Event<T>(
val id: UUID? = UUID.randomUUID(),
val type: EventType,
val entity: T
)
41 changes: 41 additions & 0 deletions server/src/main/kotlin/suwayomi/tachidesk/event/EventDispatcher.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package suwayomi.tachidesk.event

import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

abstract class EventDispatcher<T : EventEntity> {
private val clients = ConcurrentHashMap<String, WsContext>()
private val eventQueue = CopyOnWriteArrayList<Event<T>>()

fun addClient(ctx: WsContext) {
clients[ctx.sessionId] = ctx
}

fun removeClient(ctx: WsContext) {
clients.remove(key = ctx.sessionId)
}

abstract fun notifyClient(ctx: WsContext, event: Event<T>)

abstract fun handleRequest(ctx: WsMessageContext)

fun notifyAllClients(event: Event<T>) {
clients.forEach {
notifyClient(ctx = it.value, event = event)
}
}

fun enqueue(event: Event<T>) {
eventQueue += event
}

// to be consumed in the client based on
// the event type in a notifications screen
fun queue() = eventQueue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this should be used for. Could you expand on what would be the content of "Notifications screen"?


fun dequeue(event: Event<T>) {
eventQueue.removeIf { it.id == event.id }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package suwayomi.tachidesk.event

abstract class EventEntity
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package suwayomi.tachidesk.event.enums

enum class EventType {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be some example events for these types?

SYSTEM,
INFORMATIVE,
STATIC,
DYNAMIC
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ object DownloadController {
/** Download queue stats */
fun downloadsWS(ws: WsConfig) {
ws.onConnect { ctx ->
DownloadManager.addClient(ctx)
DownloadManager.eventDispatcher.addClient(ctx)
DownloadManager.notifyClient(ctx)
}
ws.onMessage { ctx ->
DownloadManager.handleRequest(ctx)
DownloadManager.eventDispatcher.handleRequest(ctx)
}
ws.onClose { ctx ->
DownloadManager.removeClient(ctx)
DownloadManager.eventDispatcher.removeClient(ctx)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package suwayomi.tachidesk.manga.impl.download

import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import suwayomi.tachidesk.event.Event
import suwayomi.tachidesk.event.EventDispatcher
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus

class DownloadEventDispatcher : EventDispatcher<DownloadStatus>() {
override fun notifyClient(ctx: WsContext, event: Event<DownloadStatus>) {
ctx.send(
event.entity
)
}

override fun handleRequest(ctx: WsMessageContext) {
when (ctx.message()) {
"STATUS" -> DownloadManager.notifyClient(ctx)
else -> ctx.send(
"""
|Invalid command.
|Supported commands are:
| - STATUS
| sends the current download status
|
""".trimMargin()
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,30 @@ package suwayomi.tachidesk.manga.impl.download
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import suwayomi.tachidesk.event.Event
import suwayomi.tachidesk.event.enums.EventType
import suwayomi.tachidesk.manga.impl.Manga.getManga
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
import suwayomi.tachidesk.manga.model.table.ChapterTable
import suwayomi.tachidesk.manga.model.table.toDataClass
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

object DownloadManager {
private val clients = ConcurrentHashMap<String, WsContext>()
val eventDispatcher = DownloadEventDispatcher()
private val downloadQueue = CopyOnWriteArrayList<DownloadChapter>()
private var downloader: Downloader? = null

fun addClient(ctx: WsContext) {
clients[ctx.sessionId] = ctx
}

fun removeClient(ctx: WsContext) {
clients.remove(ctx.sessionId)
}

fun notifyClient(ctx: WsContext) {
ctx.send(
getStatus()
)
}

fun handleRequest(ctx: WsMessageContext) {
when (ctx.message()) {
"STATUS" -> notifyClient(ctx)
else -> ctx.send(
"""
|Invalid command.
|Supported commands are:
| - STATUS
| sends the current download status
|
""".trimMargin()
)
}
}

private fun notifyAllClients() {
val status = getStatus()
clients.forEach {
it.value.send(status)
}
eventDispatcher.notifyClient(ctx, getStatus())
}

private fun getStatus(): DownloadStatus {
return DownloadStatus(
private fun getStatus(): Event<DownloadStatus> {
val status = DownloadStatus(
if (downloader == null ||
downloadQueue.none { it.state == Downloading }
) {
Expand All @@ -73,6 +41,10 @@ object DownloadManager {
},
downloadQueue
)
return Event(
type = EventType.STATIC,
entity = status
)
}

suspend fun enqueue(chapterIndex: Int, mangaId: Int) {
Expand All @@ -92,12 +64,16 @@ object DownloadManager {
)
start()
}
notifyAllClients()
val status = getStatus()
eventDispatcher.enqueue(status)
eventDispatcher.notifyAllClients(status)
}

fun unqueue(chapterIndex: Int, mangaId: Int) {
downloadQueue.removeIf { it.mangaId == mangaId && it.chapterIndex == chapterIndex }
notifyAllClients()
val status = getStatus()
eventDispatcher.dequeue(status)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this would ever do anything. getStatus() always returns new Event which has new randomUUID(). dequeue(status) then look for this new id for removal.

Also, I don't think this unqueue is supposed to dequeue the status. unqueue is used for removing specific download job. It should notify client with new status (which will be missing the unqueued download task) which it does below. I'm not sure what this code is meant to do.

eventDispatcher.notifyAllClients(status)
}

fun start() {
Expand All @@ -107,11 +83,11 @@ object DownloadManager {
}

if (downloader == null) {
downloader = Downloader(downloadQueue) { notifyAllClients() }
downloader = Downloader(downloadQueue) { eventDispatcher.notifyAllClients(getStatus()) }
downloader!!.start()
}

notifyAllClients()
eventDispatcher.notifyAllClients(getStatus())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could notifyAllClients() maybe have been left in and only its implementation changed to use eventDispatcher? It seems like this line is repeated a lot.

}

fun stop() {
Expand All @@ -121,13 +97,13 @@ object DownloadManager {
}
}
downloader = null
notifyAllClients()
eventDispatcher.notifyAllClients(getStatus())
}

fun clear() {
stop()
downloadQueue.clear()
notifyAllClients()
eventDispatcher.notifyAllClients(getStatus())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package suwayomi.tachidesk.manga.impl.download.model
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */

import suwayomi.tachidesk.event.EventEntity
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
Expand All @@ -19,4 +20,4 @@ class DownloadChapter(
var state: DownloadState = Queued,
var progress: Float = 0f,
var tries: Int = 0
)
) : EventEntity()
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package suwayomi.tachidesk.manga.impl.download.model

import suwayomi.tachidesk.event.EventEntity

/*
* Copyright (C) Contributors to the Suwayomi project
*
Expand All @@ -10,4 +12,4 @@ package suwayomi.tachidesk.manga.impl.download.model
data class DownloadStatus(
val status: String,
val queue: List<DownloadChapter>
)
) : EventEntity()