diff --git a/buildSrc/src/main/kotlin/Versions.kt b/buildSrc/src/main/kotlin/Versions.kt index 7d58bee..f1e5d37 100644 --- a/buildSrc/src/main/kotlin/Versions.kt +++ b/buildSrc/src/main/kotlin/Versions.kt @@ -1,6 +1,6 @@ @Suppress("ConstPropertyName") object Versions { - const val project = "0.7.6" + const val project = "0.7.7" const val java = 17 const val kotlin = "1.9.22" diff --git a/radar-gateway/src/main/kotlin/org/radarbase/gateway/config/GatewayServerConfig.kt b/radar-gateway/src/main/kotlin/org/radarbase/gateway/config/GatewayServerConfig.kt index 3b52e17..bb8cb5a 100644 --- a/radar-gateway/src/main/kotlin/org/radarbase/gateway/config/GatewayServerConfig.kt +++ b/radar-gateway/src/main/kotlin/org/radarbase/gateway/config/GatewayServerConfig.kt @@ -12,6 +12,11 @@ data class GatewayServerConfig( * This protects against memory overflows. */ val maxRequestSize: Long = 24 * 1024 * 1024, + /** + * Maximum time in seconds to wait for a request to complete. + * This timeout is applied to the co-routine context, not to the Grizzly server. + */ + val requestTimeout: Int = 30, /** * Whether JMX should be enabled. Disable if not needed, for higher performance. */ diff --git a/radar-gateway/src/main/kotlin/org/radarbase/gateway/resource/KafkaTopics.kt b/radar-gateway/src/main/kotlin/org/radarbase/gateway/resource/KafkaTopics.kt index ddcc815..dabfcd6 100644 --- a/radar-gateway/src/main/kotlin/org/radarbase/gateway/resource/KafkaTopics.kt +++ b/radar-gateway/src/main/kotlin/org/radarbase/gateway/resource/KafkaTopics.kt @@ -15,6 +15,7 @@ import jakarta.ws.rs.container.Suspended import jakarta.ws.rs.core.Context import jakarta.ws.rs.core.Response import org.radarbase.auth.authorization.Permission.MEASUREMENT_CREATE +import org.radarbase.gateway.config.GatewayConfig import org.radarbase.gateway.inject.ProcessAvro import org.radarbase.gateway.io.AvroProcessor import org.radarbase.gateway.io.BinaryToAvroConverter @@ -27,6 +28,7 @@ import org.radarbase.jersey.service.AsyncCoroutineService import org.slf4j.LoggerFactory import java.io.IOException import java.io.InputStream +import kotlin.time.Duration.Companion.seconds /** Topics submission and listing. Requests need authentication. */ @Path("/topics") @@ -41,10 +43,17 @@ import java.io.InputStream class KafkaTopics( @Context private val kafkaAdminService: KafkaAdminService, @Context private val producerPool: ProducerPool, + @Context config: GatewayConfig, @Context private val asyncService: AsyncCoroutineService, ) { + + val timeout = config.server.requestTimeout.seconds + @GET - fun topics(@Suspended asyncResponse: AsyncResponse) = asyncService.runAsCoroutine(asyncResponse) { + fun topics( + @Context config: GatewayConfig, + @Suspended asyncResponse: AsyncResponse, + ) = asyncService.runAsCoroutine(asyncResponse, timeout) { kafkaAdminService.listTopics() } @@ -54,7 +63,7 @@ class KafkaTopics( fun topic( @PathParam("topic_name") topic: String, @Suspended asyncResponse: AsyncResponse, - ) = asyncService.runAsCoroutine(asyncResponse) { + ) = asyncService.runAsCoroutine(asyncResponse, timeout) { kafkaAdminService.topicInfo(topic) } @@ -82,13 +91,14 @@ class KafkaTopics( tree: JsonNode?, @PathParam("topic_name") topic: String, @Context avroProcessor: AvroProcessor, + @Context config: GatewayConfig, @Suspended asyncResponse: AsyncResponse, ) { if (tree == null) { asyncResponse.resume(HttpBadRequestException("missing_body", "Missing contents in body")) return } - asyncService.runAsCoroutine(asyncResponse) { + asyncService.runAsCoroutine(asyncResponse, timeout) { val processingResult = avroProcessor.process(topic, tree) producerPool.produce(topic, processingResult.records) TopicPostResponse(processingResult.keySchemaId, processingResult.valueSchemaId) @@ -105,13 +115,14 @@ class KafkaTopics( input: InputStream?, @Context binaryToAvroConverter: BinaryToAvroConverter, @PathParam("topic_name") topic: String, + @Context config: GatewayConfig, @Suspended asyncResponse: AsyncResponse, ) { if (input == null) { asyncResponse.resume(HttpBadRequestException("missing_body", "Missing contents in body")) return } - asyncService.runAsCoroutine(asyncResponse) { + asyncService.runAsCoroutine(asyncResponse, timeout) { val processingResult = try { binaryToAvroConverter.process(topic, input) } catch (ex: IOException) {