Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Minecraft servers in containers through Kubernetes #3

Draft
wants to merge 57 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
9ef25b6
Add role to msm-app deployment with access to kubernetes API.
battery-staple Jul 22, 2024
c115802
Add sample monitor deployment
battery-staple Jul 23, 2024
9e861ad
Add jar downloading skeleton to monitor
battery-staple Jul 23, 2024
45ed9ce
Remove extraneous properties and update Java version in monitor Docke…
battery-staple Jul 23, 2024
f28456a
Add KubernetesRunner skeleton and fill out prepareEnvironment
battery-staple Jul 23, 2024
6a84ae3
Improve monitor deployment to use named target ports
battery-staple Jul 23, 2024
8f28e96
Make monitor not download jar each time
battery-staple Jul 23, 2024
18b0f9a
Fix monitor port names to keep them under the 15 character limit
battery-staple Jul 25, 2024
008a7de
Configure traefik gateway to control access to app and running servers
battery-staple Jul 25, 2024
70a4955
Extract service exclusively for minecraft, allowing reassigning that …
battery-staple Jul 25, 2024
d61d313
Allow running servers in Kubernetes
battery-staple Jul 25, 2024
bc42e71
Extract PipingMinecraftServerProcess from MinecraftServerProcessImpl …
battery-staple Jul 26, 2024
29891b2
Implement `MinecraftServerProcess` for Kubernetes servers with `Minec…
battery-staple Jul 26, 2024
26fbb6c
Add SHA-1 hash validation for jar downloads in monitor
battery-staple Jul 27, 2024
e5ba219
Removed double Base64 encoding of monitor deployment secret
battery-staple Jul 27, 2024
e2b77c3
Fixed websocket connection between app and monitor
battery-staple Jul 27, 2024
876c3e8
Fixed creation and running of Minecraft server pods
battery-staple Jul 27, 2024
22139d9
Allow usage of the Kubernetes runner to the frontend
battery-staple Jul 27, 2024
b893ef4
Change monitor names to include their server's UUID
battery-staple Jul 27, 2024
44bc9b4
Use random tokens for communication with monitors
battery-staple Jul 27, 2024
96f2eff
Inject Kubernetes API Client and associated objects
battery-staple Jul 27, 2024
db0c813
Persist Kubernetes environments between runs
battery-staple Jul 27, 2024
9b19663
Persist and use random auth tokens
battery-staple Jul 29, 2024
7ec70de
Fix monitor PVC name
battery-staple Jul 29, 2024
4c0961a
Fix bug where some console messages were not sent to the frontend
battery-staple Jul 29, 2024
e92f405
Fix initialization order bug preventing cleanup of old runs
battery-staple Jul 29, 2024
23a3ba6
Refactor PipingMinecraftServerProcess to be more idiomatic Kotlin
battery-staple Jul 29, 2024
d626d34
Recreate Websocket connection to monitor pods after they expire
battery-staple Jul 31, 2024
4a1bfe8
Add error handling and exponential backoff to make monitor Websocket …
battery-staple Jul 31, 2024
cad26b6
Remove repeated logging of each Websocket message. This declutters th…
battery-staple Jul 31, 2024
dfed39a
Refactor websocket connection logic in MinecraftServerPod
battery-staple Jul 31, 2024
b30b946
Introduce tryWithBackoff to simplify backoff in websocket and databas…
battery-staple Aug 1, 2024
87bb17f
Allow MinecraftServerPod to watch the kubernetes pod and update when …
battery-staple Aug 1, 2024
4611a42
Fix persistence of `LocalMinecraftServerEnvironment`s
battery-staple Aug 1, 2024
b92fc03
Refactor MinecraftServerPod to improve reliability, including recover…
battery-staple Aug 17, 2024
77c337f
Rename MinecraftServerPod to DeploymentProcess. Since this class now …
battery-staple Aug 17, 2024
a2b1208
Prevent KubernetesRunner#prepareEnvironment from failing silently
battery-staple Aug 17, 2024
c479404
Implement KubernetesRunner#cleanupEnvironment
battery-staple Aug 18, 2024
58afef9
Move PipingMinecraftServerProcess to its own file
battery-staple Aug 18, 2024
a799325
Fix a bug where a CurrentRun that continues while the application fai…
battery-staple Aug 19, 2024
7ca6da9
Ensure `MinecraftServerRunner`s only clean up their own `CurrentRunRe…
battery-staple Aug 20, 2024
711df0d
Clean up monitor code
battery-staple Aug 20, 2024
4f57d6e
Extract request building logic in monitor into function
battery-staple Aug 20, 2024
110ecad
Fix accidental usage of the Watch logger in other files
battery-staple Aug 20, 2024
9a01df5
Add necessary permissions to allow deleting Kubernetes servers
battery-staple Aug 22, 2024
de62b0c
Handle deletion of K8s servers when some components have been (errone…
battery-staple Aug 23, 2024
316fb8f
Improve logging in `RestAPIServiceImple#createServer`
battery-staple Jan 5, 2025
e3e3986
Make `KubernetesEnvironment` take a `MinecraftServer` rather than jus…
battery-staple Jan 5, 2025
99ab309
Stop running Kubernetes servers immediately when created (wait until …
battery-staple Jan 5, 2025
0630c3b
Save log files by run UUID on monitor for later access
battery-staple Jan 7, 2025
af9ac23
Make MonitorTokenRepository#getTokenForServer correctly return null w…
battery-staple Jan 8, 2025
71cd545
Improve documentation and logging in `KubernetesRunner#initialProcess`
battery-staple Jan 10, 2025
c96f59f
Fix log message order in the websocket console route
battery-staple Jan 10, 2025
3598d5d
Move `MutexGuardedResource` to the shared library
battery-staple Jan 10, 2025
3e44950
Add more idiomatic factory methods for initializing `MutexGuardedReso…
battery-staple Jan 10, 2025
8c3def7
Move custom status page exceptions into shared library
battery-staple Jan 12, 2025
ed7eb77
Add optional messages to status page exceptions
battery-staple Jan 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move PipingMinecraftServerProcess to its own file
battery-staple committed Aug 18, 2024

Verified

This commit was signed with the committer’s verified signature.
battery-staple Rohen Giralt
commit 58afef903115d06979fa0808031a595eddab6d17
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
package com.rohengiralt.shared.serverProcess

import com.rohengiralt.shared.serverProcess.MinecraftServerProcess.ProcessMessage
import com.rohengiralt.shared.serverProcess.ServerIO.Input.InputMessage
import com.rohengiralt.shared.serverProcess.ServerIO.Output
import com.rohengiralt.shared.util.assertAllPropertiesNotNull
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.*
import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.flow.Flow
import kotlin.time.Duration

/**
@@ -74,182 +66,3 @@ interface MinecraftServerProcess {
}
}

abstract class PipingMinecraftServerProcess(protected val serverName: String) : MinecraftServerProcess {
override val output: Flow<ProcessMessage<Output>> by lazy {
assertInv()
_output.asSharedFlow()
}

override val input: SendChannel<String> by lazy {
assertInv()
_input
}
override val interleavedIO: Flow<ProcessMessage<ServerIO>> by lazy {
assertInv()
_interleavedIO.asSharedFlow()
}

/**
* The [MutableSharedFlow] that underlies [interleavedIO].
* This field is necessary to allow sending to the flow from within this class but not from outside.
*/
private val _interleavedIO: MutableSharedFlow<ProcessMessage<ServerIO>> = MutableSharedFlow(Channel.UNLIMITED)

/**
* The [Channel] that underlies [input].
* This field is necessary to allow sending to the channel from within this class but to discourage doing the same
* from the outside.
*/
private val _input: Channel<String> = Channel()

/**
* The [MutableSharedFlow] that underlies [output].
* This field is necessary to allow sending to the flow from within this class but not from outside.
*/
private val _output: MutableSharedFlow<ProcessMessage<Output>> = MutableSharedFlow(Channel.UNLIMITED)
private val scope = CoroutineScope(Dispatchers.IO) // All jobs are likely to block often, so Dispatchers.IO is best
private val jobs = mutableListOf<Job>()
private val jobsAreInitialized = AtomicBoolean()
private val logger = LoggerFactory.getLogger(PipingMinecraftServerProcess::class.java) // not this; we want to disambiguate between sub- and superclass operations

/**
* The job that handles piping from [input] into the process' standard input and [interleavedIO].
*/
private suspend fun inputChannelJob() {
logger.info("Input channel job started")
_input
.consumeAsFlow()
.flowOn(Dispatchers.IO)
.collect { input ->
try {
logger.trace("Got new input: $input")
val cleanedInput = input.trimEnd()

trySend(cleanedInput)

logger.trace("Sending input message to interleavedIO")
_interleavedIO.emit(ProcessMessage.IO(InputMessage(cleanedInput)))

logger.debug("Sent input: $input")
} catch (e: IOException) {
logger.warn("Cannot send input $input", e)
}
}
}

/**
* Sends a message to the minecraft server, or throws an IOException if not possible.
*/
protected abstract suspend fun trySend(input: String)

/**
* The job that handles piping from the process' standard output
* and standard error into [output] and [interleavedIO].
*/
private suspend fun outputChannelJob() = coroutineScope {
logger.info("Output channel job started")
launch { pipeOutputJob(stdOut, createMessage = Output::LogMessage, streamName = "stdout") }
launch { pipeOutputJob(stdError, createMessage = Output::ErrorMessage, streamName = "stderr") }
logger.debug("Output channel job ended")
}

/**
* Helper method to pipe from an output stream into [output] and [interleavedIO].
*/
private suspend fun pipeOutputJob(output: Flow<String>?, createMessage: (String) -> Output, streamName: String) {
try {
if (output == null) {
logger.error("Cannot read from stream $streamName")
return
}

output.collect {
try {
logger.trace("[SERVER $serverName $streamName]: $it")

_output.emit(ProcessMessage.IO(createMessage(it)))

_interleavedIO.emit(ProcessMessage.IO(createMessage(it)))
} catch (e: IOException) {
logger.warn("Cannot read server output, got exception $e")
}
}
} catch (e: CancellationException) {
logger.info("Output channel job for $streamName cancelled")
} catch (e: Throwable) {
logger.error("Output stream $streamName threw error $e")
} finally {
logger.info("Output stream $streamName ended")
}
}

/**
* A flow that contains each message in the server's standard out as it is sent.
*/
protected abstract val stdOut: Flow<String>?

/**
* A flow that contains each message in the server's standard error as it is sent.
*/
protected abstract val stdError: Flow<String>?

/**
* The job that handles cleanup when the process ends.
*/
private suspend fun endJob() {
var status: Int? = null
try {
status = withContext(Dispatchers.IO) {
waitForExit()
}
logger.info("Minecraft Server ended with exit code ${status ?: "unknown"}")
cancelAllJobs()
} catch (e: CancellationException) {
logger.info("Process end job cancelled")
} catch (e: Throwable) {
logger.error("Process ended with error $e")
cancelAllJobs()
} finally {
@OptIn(ExperimentalCoroutinesApi::class)
_output.resetReplayCache()
_output.emit(ProcessMessage.ProcessEnd(status))
}
}

/**
* Suspends until the server ends.
* @return the server's exit code, or null if unknown
*/
protected abstract suspend fun waitForExit(): Int?

/**
* Helper method to cancel all running jobs to avoid wasting resources after the process ends.
*/
private fun cancelAllJobs() {
@Suppress("ControlFlowWithEmptyBody")
while (!jobsAreInitialized.get()) {} // TODO: Remove inefficient spin loop
jobs.forEach { job -> job.cancel() }
}

/**
* Begins piping to and from [input], [output], and [interleavedIO].
* Precondition: if this function is called at class construction,
* it must be called at the end of the constructor after all properties are initialized.
* If this method is not called by a subclass, it will be automatically called
* on first access of [input], [output], or [interleavedIO]
*/
protected fun initIO() {
assertAllPropertiesNotNull() // Ensure all properties are initialized

jobs += scope.launch { inputChannelJob() }
jobs += scope.launch { outputChannelJob() }
jobs += scope.launch { endJob() }

jobsAreInitialized.set(true)
logger.debug("Launched all jobs")
}

private fun assertInv() {
assert(jobsAreInitialized.get()) { "Jobs not yet all initialized" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package com.rohengiralt.shared.serverProcess

import com.rohengiralt.shared.util.assertAllPropertiesNotNull
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.*
import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean

abstract class PipingMinecraftServerProcess(protected val serverName: String) : MinecraftServerProcess {
override val output: Flow<MinecraftServerProcess.ProcessMessage<ServerIO.Output>> by lazy {
assertInv()
_output.asSharedFlow()
}

override val input: SendChannel<String> by lazy {
assertInv()
_input
}
override val interleavedIO: Flow<MinecraftServerProcess.ProcessMessage<ServerIO>> by lazy {
assertInv()
_interleavedIO.asSharedFlow()
}

/**
* The [MutableSharedFlow] that underlies [interleavedIO].
* This field is necessary to allow sending to the flow from within this class but not from outside.
*/
private val _interleavedIO: MutableSharedFlow<MinecraftServerProcess.ProcessMessage<ServerIO>> =
MutableSharedFlow(Channel.UNLIMITED)

/**
* The [Channel] that underlies [input].
* This field is necessary to allow sending to the channel from within this class but to discourage doing the same
* from the outside.
*/
private val _input: Channel<String> = Channel()

/**
* The [MutableSharedFlow] that underlies [output].
* This field is necessary to allow sending to the flow from within this class but not from outside.
*/
private val _output: MutableSharedFlow<MinecraftServerProcess.ProcessMessage<ServerIO.Output>> =
MutableSharedFlow(Channel.UNLIMITED)
private val scope = CoroutineScope(Dispatchers.IO) // All jobs are likely to block often, so Dispatchers.IO is best
private val jobs = mutableListOf<Job>()
private val jobsAreInitialized = AtomicBoolean()
private val logger =
LoggerFactory.getLogger(PipingMinecraftServerProcess::class.java) // not this; we want to disambiguate between sub- and superclass operations

/**
* The job that handles piping from [input] into the process' standard input and [interleavedIO].
*/
private suspend fun inputChannelJob() {
logger.info("Input channel job started")
_input
.consumeAsFlow()
.flowOn(Dispatchers.IO)
.collect { input ->
try {
logger.trace("Got new input: $input")
val cleanedInput = input.trimEnd()

trySend(cleanedInput)

logger.trace("Sending input message to interleavedIO")
_interleavedIO.emit(
MinecraftServerProcess.ProcessMessage.IO(
ServerIO.Input.InputMessage(
cleanedInput
)
)
)

logger.debug("Sent input: $input")
} catch (e: IOException) {
logger.warn("Cannot send input $input", e)
}
}
}

/**
* Sends a message to the minecraft server, or throws an IOException if not possible.
*/
protected abstract suspend fun trySend(input: String)

/**
* The job that handles piping from the process' standard output
* and standard error into [output] and [interleavedIO].
*/
private suspend fun outputChannelJob() = coroutineScope {
logger.info("Output channel job started")
launch { pipeOutputJob(stdOut, createMessage = ServerIO.Output::LogMessage, streamName = "stdout") }
launch { pipeOutputJob(stdError, createMessage = ServerIO.Output::ErrorMessage, streamName = "stderr") }
logger.debug("Output channel job ended")
}

/**
* Helper method to pipe from an output stream into [output] and [interleavedIO].
*/
private suspend fun pipeOutputJob(output: Flow<String>?, createMessage: (String) -> ServerIO.Output, streamName: String) {
try {
if (output == null) {
logger.error("Cannot read from stream $streamName")
return
}

output.collect {
try {
logger.trace("[SERVER $serverName $streamName]: $it")

_output.emit(MinecraftServerProcess.ProcessMessage.IO(createMessage(it)))

_interleavedIO.emit(MinecraftServerProcess.ProcessMessage.IO(createMessage(it)))
} catch (e: IOException) {
logger.warn("Cannot read server output, got exception $e")
}
}
} catch (e: CancellationException) {
logger.info("Output channel job for $streamName cancelled")
} catch (e: Throwable) {
logger.error("Output stream $streamName threw error $e")
} finally {
logger.info("Output stream $streamName ended")
}
}

/**
* A flow that contains each message in the server's standard out as it is sent.
*/
protected abstract val stdOut: Flow<String>?

/**
* A flow that contains each message in the server's standard error as it is sent.
*/
protected abstract val stdError: Flow<String>?

/**
* The job that handles cleanup when the process ends.
*/
private suspend fun endJob() {
var status: Int? = null
try {
status = withContext(Dispatchers.IO) {
waitForExit()
}
logger.info("Minecraft Server ended with exit code ${status ?: "unknown"}")
cancelAllJobs()
} catch (e: CancellationException) {
logger.info("Process end job cancelled")
} catch (e: Throwable) {
logger.error("Process ended with error $e")
cancelAllJobs()
} finally {
@OptIn(ExperimentalCoroutinesApi::class)
_output.resetReplayCache()
_output.emit(MinecraftServerProcess.ProcessMessage.ProcessEnd(status))
}
}

/**
* Suspends until the server ends.
* @return the server's exit code, or null if unknown
*/
protected abstract suspend fun waitForExit(): Int?

/**
* Helper method to cancel all running jobs to avoid wasting resources after the process ends.
*/
private fun cancelAllJobs() {
@Suppress("ControlFlowWithEmptyBody")
while (!jobsAreInitialized.get()) {} // TODO: Remove inefficient spin loop
jobs.forEach { job -> job.cancel() }
}

/**
* Begins piping to and from [input], [output], and [interleavedIO].
* Precondition: if this function is called at class construction,
* it must be called at the end of the constructor after all properties are initialized.
* If this method is not called by a subclass, it will be automatically called
* on first access of [input], [output], or [interleavedIO]
*/
protected fun initIO() {
assertAllPropertiesNotNull() // Ensure all properties are initialized

jobs += scope.launch { inputChannelJob() }
jobs += scope.launch { outputChannelJob() }
jobs += scope.launch { endJob() }

jobsAreInitialized.set(true)
logger.debug("Launched all jobs")
}

private fun assertInv() {
assert(jobsAreInitialized.get()) { "Jobs not yet all initialized" }
}
}