From 2b6c3e80b44a5849817d072f12cc1c8cafa8e74b Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Wed, 20 Sep 2023 11:03:57 +0200 Subject: [PATCH] Fix data sending to Kafka REST proxy --- .../io/FunctionalWriteChannelContent.kt | 6 ++++-- .../producer/rest/AvroContentConverter.kt | 2 +- .../producer/rest/AvroRecordContent.kt | 5 +++-- .../producer/rest/BinaryRecordContent.kt | 12 +++++++----- .../producer/rest/JsonRecordContent.kt | 14 +++++++------- .../radarbase/producer/rest/RestKafkaSender.kt | 18 ++++++++++++------ 6 files changed, 34 insertions(+), 23 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt b/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt index 2bf60d01..2b94d22d 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/io/FunctionalWriteChannelContent.kt @@ -1,9 +1,11 @@ package org.radarbase.producer.io -import io.ktor.http.content.* -import io.ktor.utils.io.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent +import io.ktor.utils.io.ByteWriteChannel class FunctionalWriteChannelContent( + override val contentType: ContentType, private val writeAction: suspend (ByteWriteChannel) -> Unit, ) : OutgoingContent.WriteChannelContent() { override suspend fun writeTo(channel: ByteWriteChannel) = writeAction(channel) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt index b8634e33..7aa19f61 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt @@ -51,7 +51,7 @@ class AvroContentConverter( valueSchemaMetadata = valueSchema.await(), ) } - maker.createContent() + maker.createContent(contentType) } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt index 5bcea266..2fc734e6 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroRecordContent.kt @@ -1,7 +1,8 @@ package org.radarbase.producer.rest -import io.ktor.http.content.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent interface AvroRecordContent { - fun createContent(): OutgoingContent + fun createContent(contentType: ContentType): OutgoingContent } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt index 42785c50..5fa62de9 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordContent.kt @@ -1,6 +1,7 @@ package org.radarbase.producer.rest -import io.ktor.http.content.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent import org.radarbase.data.RecordData import org.radarbase.data.RemoteSchemaEncoder import org.radarbase.producer.avro.AvroDataMapperFactory @@ -35,11 +36,12 @@ class BinaryRecordContent( "missing key schema version" } - override fun createContent(): OutgoingContent = FunctionalWriteChannelContent { channel -> - DirectBinaryEncoder(channel).use { - it.writeRecords() + override fun createContent(contentType: ContentType): OutgoingContent = + FunctionalWriteChannelContent(contentType) { channel -> + DirectBinaryEncoder(channel).use { + it.writeRecords() + } } - } private suspend fun BinaryEncoder.writeRecords() { startItem() diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt index 7fa481b9..2415abf3 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/JsonRecordContent.kt @@ -1,12 +1,14 @@ package org.radarbase.producer.rest -import io.ktor.http.content.* -import io.ktor.utils.io.* +import io.ktor.http.ContentType +import io.ktor.http.content.OutgoingContent +import io.ktor.utils.io.ByteWriteChannel +import io.ktor.utils.io.writeByte +import io.ktor.utils.io.writeFully import org.radarbase.data.RecordData import org.radarbase.data.RemoteSchemaEncoder import org.radarbase.producer.io.FunctionalWriteChannelContent import org.radarbase.producer.schema.ParsedSchemaMetadata -import org.slf4j.LoggerFactory class JsonRecordContent( private val records: RecordData, @@ -26,8 +28,8 @@ class JsonRecordContent( readerSchema = valueSchemaMetadata.schema, ) - override fun createContent(): OutgoingContent = - FunctionalWriteChannelContent { it.writeRecords() } + override fun createContent(contentType: ContentType): OutgoingContent = + FunctionalWriteChannelContent(contentType) { it.writeRecords() } private suspend fun ByteWriteChannel.writeRecords() { writeByte('{'.code) @@ -60,7 +62,5 @@ class JsonRecordContent( val KEY = "{\"key\":".toByteArray() val VALUE = ",\"value\":".toByteArray() val END = "]}".toByteArray() - - private val logger = LoggerFactory.getLogger(JsonRecordContent::class.java) } } diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt index 852c3609..f0c69e89 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt @@ -89,11 +89,11 @@ class RestKafkaSender(config: Config) : KafkaSender { private fun HttpClientConfig<*>.configure() { timeout(connectionTimeout) install(ContentNegotiation) { - this.register( + register( KAFKA_REST_BINARY_ENCODING, AvroContentConverter(schemaRetriever, binary = true), ) - this.register( + register( KAFKA_REST_JSON_ENCODING, AvroContentConverter(schemaRetriever, binary = false), ) @@ -118,14 +118,11 @@ class RestKafkaSender(config: Config) : KafkaSender { inner class RestKafkaTopicSender( override val topic: AvroTopic, ) : KafkaTopicSender { - @OptIn(ExperimentalStdlibApi::class) override suspend fun send(records: RecordData) = scope.async { try { val response: HttpResponse = restClient.post { url("topics/${topic.name}") - val kType = typeOf>() - val reifiedType = kType.javaType - setBody(records, TypeInfo(RecordData::class, reifiedType, kType)) + setBody(records, recordDataTypeInfo) } if (response.status.isSuccess()) { _connectionState.didConnect() @@ -253,11 +250,20 @@ class RestKafkaSender(config: Config) : KafkaSender { companion object { private val logger = LoggerFactory.getLogger(RestKafkaSender::class.java) + private val recordDataTypeInfo: TypeInfo + val DEFAULT_TIMEOUT: Duration = 20.seconds val KAFKA_REST_BINARY_ENCODING = ContentType("application", "vnd.radarbase.avro.v1+binary") val KAFKA_REST_JSON_ENCODING = ContentType("application", "vnd.kafka.avro.v2+json") const val GZIP_CONTENT_ENCODING = "gzip" + init { + val kType = typeOf>() + @OptIn(ExperimentalStdlibApi::class) + val reifiedType = kType.javaType + recordDataTypeInfo = TypeInfo(RecordData::class, reifiedType, kType) + } + fun restKafkaSender(builder: Config.() -> Unit): RestKafkaSender = RestKafkaSender(Config().apply(builder)) }