Skip to content

Commit

Permalink
GH-2179 Switch to using an Server-Sent Event endpoint for Console (Re…
Browse files Browse the repository at this point in the history
…solve #2120)

* rename internal CliEndpoint class to follow file naming

* initial implementation

* don't hang when stopping server

* remove unused sse handler parameter

* replace event source library

* add todo to error toast

* minor cleanup

* close all sse connections when shutting down reposilite

* ping sse connection every second (closes connection if ping fails)

* close sse connection before page is reloaded/refreshed

* use an iterator instead of forEach to avoid CME

* remove TODO comments about endpoint name

* better openapi data

* remove unneeded termination check

* remove redundant users check

* use `ctx.ip()` instead of extension

* update connection error toast message

* update readystate constant comment

* move onbeforeload call into onopen listener

* ping every 5 seconds instead of 1 second

* disable watcher before closing sse clients

* rename consumer function to handleSseLiveLog

* stop stack overflow log spam when sse connection is terminated

* bump journalist version, fixes CME errors

* move watcher into sse handler

* minor cleanup

* remove final TODO comment
  • Loading branch information
granny authored Aug 18, 2024
1 parent 80ba6a4 commit 647bb7f
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 28 deletions.
4 changes: 2 additions & 2 deletions reposilite-backend/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ dependencies {
val jansi = "2.4.1"
implementation("org.fusesource.jansi:jansi:$jansi")

val journalist = "1.0.10"
val journalist = "1.0.12"
api("com.reposilite:journalist:$journalist")
implementation("com.reposilite:journalist-slf4j:$journalist")
implementation("com.reposilite:journalist-tinylog:$journalist")
Expand Down Expand Up @@ -280,4 +280,4 @@ tasks["jacocoTestCoverageVerification"].mustRunAfter(tasks["jacocoTestReport"])
//
//tasks.withType<DetektCreateBaselineTask>().configureEach {
// jvmTarget = "11"
//}
//}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import com.reposilite.console.HelpCommand
import com.reposilite.console.LevelCommand
import com.reposilite.console.StopCommand
import com.reposilite.console.api.CommandsSetupEvent
import com.reposilite.console.infrastructure.CliEndpoint
import com.reposilite.console.infrastructure.ConsoleWebSocketHandler
import com.reposilite.console.infrastructure.ConsoleEndpoint
import com.reposilite.console.infrastructure.ConsoleSseHandler
import com.reposilite.plugin.api.Facade
import com.reposilite.plugin.api.Plugin
import com.reposilite.plugin.api.ReposiliteDisposeEvent
Expand All @@ -35,13 +36,21 @@ import com.reposilite.plugin.reposilite
import com.reposilite.web.api.HttpServerInitializationEvent
import com.reposilite.web.api.RoutingSetupEvent
import com.reposilite.web.application.WebSettings
import io.javalin.http.sse.SseClient

@Plugin(name = "console", dependencies = [ "shared-configuration", "failure", "access-token", "authentication" ])
internal class ConsolePlugin : ReposilitePlugin() {

override fun initialize(): Facade {
val sharedConfigurationFacade = facade<SharedConfigurationFacade>()
val consoleFacade = ConsoleComponents(this, facade()).consoleFacade()
val client = ConsoleSseHandler(
journalist = reposilite().journalist,
accessTokenFacade = facade(),
authenticationFacade = facade(),
forwardedIp = sharedConfigurationFacade.getDomainSettings<WebSettings>().computed { it.forwardedIp },
scheduler = reposilite().scheduler
)

event { _: ReposiliteInitializeEvent ->
consoleFacade.registerCommand(HelpCommand(consoleFacade))
Expand All @@ -66,14 +75,18 @@ internal class ConsolePlugin : ReposilitePlugin() {
event.config.router.mount {
it.ws(
"/api/console/sock",
CliEndpoint(
ConsoleWebSocketHandler(
journalist = reposilite().journalist,
accessTokenFacade = facade(),
authenticationFacade = facade(),
consoleFacade = consoleFacade,
forwardedIp = sharedConfigurationFacade.getDomainSettings<WebSettings>().computed { it.forwardedIp }
)
)
it.sse(
"/api/console/log",
client::handleSseLiveLog
)
}
}

Expand All @@ -91,6 +104,7 @@ internal class ConsolePlugin : ReposilitePlugin() {

event { _: ReposiliteDisposeEvent ->
consoleFacade.commandExecutor.stop()
client.users.keys.forEach(SseClient::close)
}

return consoleFacade
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.reposilite.console.infrastructure

import com.reposilite.ReposiliteJournalist
import com.reposilite.auth.AuthenticationFacade
import com.reposilite.auth.api.Credentials
import com.reposilite.shared.ErrorResponse
import com.reposilite.shared.extractFromHeader
import com.reposilite.shared.unauthorized
import com.reposilite.token.AccessTokenFacade
import com.reposilite.token.AccessTokenPermission
import io.javalin.http.Context
import io.javalin.http.Header
import io.javalin.http.sse.SseClient
import io.javalin.openapi.HttpMethod
import io.javalin.openapi.OpenApi
import io.javalin.openapi.OpenApiParam
import io.javalin.openapi.OpenApiResponse
import panda.std.Result
import panda.std.reactive.Reference
import java.util.*
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

private const val SSE_EVENT_NAME = "log"

data class SseSession(
val identifier: String,
val subscriberId: Int,
val scheduler: ScheduledFuture<*>
)

internal class ConsoleSseHandler(
private val journalist: ReposiliteJournalist,
private val accessTokenFacade: AccessTokenFacade,
private val authenticationFacade: AuthenticationFacade,
private val forwardedIp: Reference<String>,
private val scheduler: ScheduledExecutorService
) {

internal val users: WeakHashMap<SseClient, SseSession> = WeakHashMap()

@OpenApi(
path = "/api/console/log",
methods = [HttpMethod.GET],
headers = [OpenApiParam(name = "Authorization", description = "Name and secret provided as basic auth credentials", required = true)],
description = "Streams the output of logs through an SSE Connection.",
responses = [
OpenApiResponse(
status = "200",
description = "Continuously sends out the log as messages under the `log` event. Sends a keepalive ping through comments."
)
],
tags = ["Console"]
)
fun handleSseLiveLog(sse: SseClient) {
sse.keepAlive()
sse.onClose { ->
val session = users.remove(sse) ?: return@onClose
session.scheduler.cancel(false)
journalist.logger.info("CLI | ${session.identifier} closed connection")
journalist.unsubscribe(session.subscriberId)
}

authenticateContext(sse.ctx())
.peek { identifier ->
journalist.logger.info("CLI | $identifier accessed remote console")

val subscriberId = journalist.subscribe {
// stop stack overflow log spam
if (!sse.terminated()) {
sse.sendEvent(SSE_EVENT_NAME, it.value)
}
}

val watcher = scheduler.scheduleWithFixedDelay({
sse.sendComment("ping")
}, 5, 5, TimeUnit.SECONDS)

users[sse] = SseSession(identifier, subscriberId, watcher)

journalist.cachedLogger.messages.forEach { message ->
sse.sendEvent(SSE_EVENT_NAME, message.value)
}
}
.onError {
journalist.logger.info("CLI | ${it.message} (${it.status})")
sse.sendEvent(SSE_EVENT_NAME, it)
sse.close()
}
}

private fun authenticateContext(connection: Context): Result<String, ErrorResponse> {
return extractFromHeader(connection.header(Header.AUTHORIZATION))
.map { (name, secret) ->
Credentials(
host = connection.ip(),
name = name,
secret = secret
)
}
.flatMap { authenticationFacade.authenticateByCredentials(it) }
.filter(
{ accessTokenFacade.hasPermission(it.identifier, AccessTokenPermission.MANAGER) },
{ unauthorized("Unauthorized CLI access request from ${connection.ip()}") }
)
.map { "${it.name}@${connection.ip()}" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private data class WsSession(
val subscriberId: Int,
)

internal class CliEndpoint(
internal class ConsoleWebSocketHandler(
private val journalist: ReposiliteJournalist,
private val accessTokenFacade: AccessTokenFacade,
private val authenticationFacade: AuthenticationFacade,
Expand Down
6 changes: 6 additions & 0 deletions reposilite-frontend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions reposilite-frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"apexcharts": "^3.51.0",
"axios": "^1.7.4",
"downloadjs": "^1.4.7",
"extended-eventsource": "^1.4.9",
"mime-types": "^2.1.35",
"mosha-vue-toastify": "^1.0.23",
"pretty-bytes": "^6.1.1",
Expand Down
8 changes: 4 additions & 4 deletions reposilite-frontend/src/components/console/ConsoleView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ const props = defineProps({
const { levels, log, logMessage, filter, clearLog } = useLog()
const {
onOpen, onMessage, onClose, onError,
const {
onOpen, onMessage, onClose, onError,
connect,
//close,
command,
Expand Down Expand Up @@ -60,7 +60,7 @@ const setupConnection = () => {
}
onError.value = error => {
console.log(error)
createToast(`Console connection error - Make sure that WebSockets are enabled.`, { type: 'danger' })
createToast(`Console connection error - Cannot establish SSE connection.`, { type: 'danger' })
}
onClose.value = () => createToast('Connection with console has been lost', { type: 'danger' })
createToast('Connecting to the remote console', { type: 'info', })
Expand Down Expand Up @@ -111,4 +111,4 @@ watch(
/>
</div>
</div>
</template>
</template>
12 changes: 11 additions & 1 deletion reposilite-frontend/src/store/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,17 @@ const createClient = (defaultName, defaultSecret) => {
return get("/api/auth/me")
},
},
console: {},
console: {
execute(command) {
return axios.post(
createURL(`/api/console/execute`), command, {
headers: {
...defaultAuthorization().headers,
}
}
)
}
},
status: {
instance() {
return get("/api/status/instance")
Expand Down
46 changes: 28 additions & 18 deletions reposilite-frontend/src/store/console/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@

import { ref } from "vue"
import { createURL } from '../client'
import { EventSource as Eventsource } from 'extended-eventsource';
import { useSession } from "../session.js";

const { client } = useSession()

const connection = ref()
const command = ref("")

export default function useConsole() {
const consoleAddress = createURL("/api/console/sock")
.replace("https", "wss")
.replace("http", "ws")
const consoleAddress = createURL("/api/console/log");

const isConnected = () =>
connection.value?.readyState === WebSocket.OPEN
const isConnected = () => {
// using built-in EventSource for readystate constants
// the ones from extended-eventsource return undefined for some reason
return connection.value?.readyState === EventSource.OPEN
}

const close = () => {
if (isConnected())
Expand All @@ -45,7 +50,7 @@ export default function useConsole() {

const execute = () => {
addCommandToHistory(command.value)
connection.value.send(command.value)
client.value.console.execute(command.value)
command.value = ''
}

Expand Down Expand Up @@ -74,30 +79,35 @@ export default function useConsole() {

const connect = (token) => {
try {
connection.value = new WebSocket(consoleAddress)
connection.value = new Eventsource(consoleAddress, {
headers: {
Authorization: `xBasic ${btoa(`${token.name}:${token.secret}`)}`
},
disableRetry: true
})

connection.value.onopen = () => {
connection.value.send(`Authorization:${token.name}:${token.secret}`)
// this is needed to stop an error from appearing in console when
// switching/refreshing the page without closing the connection
window.onbeforeunload = function () {
close();
};

onOpen?.value()
}

connection.value.onmessage = (event) => {
if (event.data != "keep-alive" && !event.data.toString().includes("GET /api/status/instance from"))
connection.value.addEventListener("log", (event) => {
if (!event.data.toString().includes("GET /api/status/instance from"))
onMessage?.value(event.data)
}
})

connection.value.onerror = (error) =>
connection.value.onerror = (error) => {
onError?.value(error)
}

connection.value.onclose = () =>
onClose?.value()

const keepAliveInterval = setInterval(() => {
if (isConnected())
connection?.value?.send("keep-alive")
else
clearInterval(keepAliveInterval)
}, 1000 * 5)
} catch (error) {
onError?.value(error)
}
Expand Down

0 comments on commit 647bb7f

Please sign in to comment.