Skip to content

Commit

Permalink
Merge pull request #116 from RADAR-base/feature/timeout-duration-config
Browse files Browse the repository at this point in the history
Impl. configuration of co-routine request timeouts
  • Loading branch information
pvannierop authored Nov 27, 2024
2 parents 8aa3885 + 73272ca commit 71e7cc7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@Suppress("ConstPropertyName")
object Versions {
const val project = "0.7.6"
const val project = "0.7.7"

const val java = 17
const val kotlin = "1.9.22"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ data class GatewayServerConfig(
* This protects against memory overflows.
*/
val maxRequestSize: Long = 24 * 1024 * 1024,
/**
* Maximum time in seconds to wait for a request to complete.
* This timeout is applied to the co-routine context, not to the Grizzly server.
*/
val requestTimeout: Int = 30,
/**
* Whether JMX should be enabled. Disable if not needed, for higher performance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import jakarta.ws.rs.container.Suspended
import jakarta.ws.rs.core.Context
import jakarta.ws.rs.core.Response
import org.radarbase.auth.authorization.Permission.MEASUREMENT_CREATE
import org.radarbase.gateway.config.GatewayConfig
import org.radarbase.gateway.inject.ProcessAvro
import org.radarbase.gateway.io.AvroProcessor
import org.radarbase.gateway.io.BinaryToAvroConverter
Expand All @@ -27,6 +28,7 @@ import org.radarbase.jersey.service.AsyncCoroutineService
import org.slf4j.LoggerFactory
import java.io.IOException
import java.io.InputStream
import kotlin.time.Duration.Companion.seconds

/** Topics submission and listing. Requests need authentication. */
@Path("/topics")
Expand All @@ -41,10 +43,17 @@ import java.io.InputStream
class KafkaTopics(
@Context private val kafkaAdminService: KafkaAdminService,
@Context private val producerPool: ProducerPool,
@Context config: GatewayConfig,
@Context private val asyncService: AsyncCoroutineService,
) {

val timeout = config.server.requestTimeout.seconds

@GET
fun topics(@Suspended asyncResponse: AsyncResponse) = asyncService.runAsCoroutine(asyncResponse) {
fun topics(
@Context config: GatewayConfig,
@Suspended asyncResponse: AsyncResponse,
) = asyncService.runAsCoroutine(asyncResponse, timeout) {
kafkaAdminService.listTopics()
}

Expand All @@ -54,7 +63,7 @@ class KafkaTopics(
fun topic(
@PathParam("topic_name") topic: String,
@Suspended asyncResponse: AsyncResponse,
) = asyncService.runAsCoroutine(asyncResponse) {
) = asyncService.runAsCoroutine(asyncResponse, timeout) {
kafkaAdminService.topicInfo(topic)
}

Expand Down Expand Up @@ -82,13 +91,14 @@ class KafkaTopics(
tree: JsonNode?,
@PathParam("topic_name") topic: String,
@Context avroProcessor: AvroProcessor,
@Context config: GatewayConfig,
@Suspended asyncResponse: AsyncResponse,
) {
if (tree == null) {
asyncResponse.resume(HttpBadRequestException("missing_body", "Missing contents in body"))
return
}
asyncService.runAsCoroutine(asyncResponse) {
asyncService.runAsCoroutine(asyncResponse, timeout) {
val processingResult = avroProcessor.process(topic, tree)
producerPool.produce(topic, processingResult.records)
TopicPostResponse(processingResult.keySchemaId, processingResult.valueSchemaId)
Expand All @@ -105,13 +115,14 @@ class KafkaTopics(
input: InputStream?,
@Context binaryToAvroConverter: BinaryToAvroConverter,
@PathParam("topic_name") topic: String,
@Context config: GatewayConfig,
@Suspended asyncResponse: AsyncResponse,
) {
if (input == null) {
asyncResponse.resume(HttpBadRequestException("missing_body", "Missing contents in body"))
return
}
asyncService.runAsCoroutine(asyncResponse) {
asyncService.runAsCoroutine(asyncResponse, timeout) {
val processingResult = try {
binaryToAvroConverter.process(topic, input)
} catch (ex: IOException) {
Expand Down

0 comments on commit 71e7cc7

Please sign in to comment.