Skip to content

Commit

Permalink
Fix data sending to Kafka REST proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
blootsvoets committed Sep 20, 2023
1 parent 3de88b5 commit 2b6c3e8
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.radarbase.producer.io

import io.ktor.http.content.*
import io.ktor.utils.io.*
import io.ktor.http.ContentType
import io.ktor.http.content.OutgoingContent
import io.ktor.utils.io.ByteWriteChannel

/**
 * [OutgoingContent] implementation that streams the request body by delegating
 * to the injected [writeAction] suspend lambda.
 *
 * NOTE(review): this is a diff excerpt — the class's closing brace lies outside
 * the visible hunk.
 */
class FunctionalWriteChannelContent(
override val contentType: ContentType,
private val writeAction: suspend (ByteWriteChannel) -> Unit,
) : OutgoingContent.WriteChannelContent() {
// Streaming entry point: invokes the injected writer on the outgoing channel.
override suspend fun writeTo(channel: ByteWriteChannel) = writeAction(channel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class AvroContentConverter(
valueSchemaMetadata = valueSchema.await(),
)
}
maker.createContent()
maker.createContent(contentType)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package org.radarbase.producer.rest

import io.ktor.http.content.*
import io.ktor.http.ContentType
import io.ktor.http.content.OutgoingContent

/**
 * Factory for the HTTP request body ([OutgoingContent]) that serializes a batch
 * of Avro records (per the commit message, for sending to a Kafka REST proxy).
 *
 * NOTE(review): the diff replaces the zero-arg createContent() with the
 * ContentType-aware overload; only the latter is expected in the merged file.
 */
interface AvroRecordContent {
fun createContent(): OutgoingContent
fun createContent(contentType: ContentType): OutgoingContent
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.radarbase.producer.rest

import io.ktor.http.content.*
import io.ktor.http.ContentType
import io.ktor.http.content.OutgoingContent
import org.radarbase.data.RecordData
import org.radarbase.data.RemoteSchemaEncoder
import org.radarbase.producer.avro.AvroDataMapperFactory
Expand Down Expand Up @@ -35,11 +36,12 @@ class BinaryRecordContent<V : Any>(
"missing key schema version"
}

override fun createContent(): OutgoingContent = FunctionalWriteChannelContent { channel ->
DirectBinaryEncoder(channel).use {
it.writeRecords()
// Builds the streaming request body: records are written through a
// DirectBinaryEncoder directly into the outgoing channel, and the supplied
// contentType is now propagated to the content (the fix in this commit).
// The encoder is closed by use { } once writing completes.
override fun createContent(contentType: ContentType): OutgoingContent =
FunctionalWriteChannelContent(contentType) { channel ->
DirectBinaryEncoder(channel).use {
it.writeRecords()
}
}
}

private suspend fun BinaryEncoder.writeRecords() {
startItem()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package org.radarbase.producer.rest

import io.ktor.http.content.*
import io.ktor.utils.io.*
import io.ktor.http.ContentType
import io.ktor.http.content.OutgoingContent
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.writeByte
import io.ktor.utils.io.writeFully
import org.radarbase.data.RecordData
import org.radarbase.data.RemoteSchemaEncoder
import org.radarbase.producer.io.FunctionalWriteChannelContent
import org.radarbase.producer.schema.ParsedSchemaMetadata
import org.slf4j.LoggerFactory

class JsonRecordContent<K : Any, V : Any>(
private val records: RecordData<K, V>,
Expand All @@ -26,8 +28,8 @@ class JsonRecordContent<K : Any, V : Any>(
readerSchema = valueSchemaMetadata.schema,
)

override fun createContent(): OutgoingContent =
FunctionalWriteChannelContent { it.writeRecords() }
// Streams the JSON-encoded records into the outgoing channel; the supplied
// contentType is now carried through to the content (the fix in this commit).
override fun createContent(contentType: ContentType): OutgoingContent =
FunctionalWriteChannelContent(contentType) { it.writeRecords() }

private suspend fun ByteWriteChannel.writeRecords() {
writeByte('{'.code)
Expand Down Expand Up @@ -60,7 +62,5 @@ class JsonRecordContent<K : Any, V : Any>(
val KEY = "{\"key\":".toByteArray()
val VALUE = ",\"value\":".toByteArray()
val END = "]}".toByteArray()

private val logger = LoggerFactory.getLogger(JsonRecordContent::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ class RestKafkaSender(config: Config) : KafkaSender {
private fun HttpClientConfig<*>.configure() {
timeout(connectionTimeout)
install(ContentNegotiation) {
this.register(
register(
KAFKA_REST_BINARY_ENCODING,
AvroContentConverter(schemaRetriever, binary = true),
)
this.register(
register(
KAFKA_REST_JSON_ENCODING,
AvroContentConverter(schemaRetriever, binary = false),
)
Expand All @@ -118,14 +118,11 @@ class RestKafkaSender(config: Config) : KafkaSender {
inner class RestKafkaTopicSender<K : Any, V : Any>(
override val topic: AvroTopic<K, V>,
) : KafkaTopicSender<K, V> {
@OptIn(ExperimentalStdlibApi::class)
override suspend fun send(records: RecordData<K, V>) = scope.async {
try {
val response: HttpResponse = restClient.post {
url("topics/${topic.name}")
val kType = typeOf<RecordData<Any, Any>>()
val reifiedType = kType.javaType
setBody(records, TypeInfo(RecordData::class, reifiedType, kType))
setBody(records, recordDataTypeInfo)
}
if (response.status.isSuccess()) {
_connectionState.didConnect()
Expand Down Expand Up @@ -253,11 +250,20 @@ class RestKafkaSender(config: Config) : KafkaSender {

companion object {
    private val logger = LoggerFactory.getLogger(RestKafkaSender::class.java)

    /**
     * [TypeInfo] describing the [RecordData] request body, computed once here
     * instead of on every send() call. Initialized directly at the declaration
     * rather than via a separate declare + init { } split, which was needlessly
     * stateful; behavior is identical.
     */
    @OptIn(ExperimentalStdlibApi::class)
    private val recordDataTypeInfo: TypeInfo = typeOf<RecordData<Any, Any>>()
        .let { kType -> TypeInfo(RecordData::class, kType.javaType, kType) }

    // Default timeout (20 s) — presumably the default for connectionTimeout;
    // TODO confirm against Config.
    val DEFAULT_TIMEOUT: Duration = 20.seconds

    /** Content type registered for the binary Avro encoding. */
    val KAFKA_REST_BINARY_ENCODING = ContentType("application", "vnd.radarbase.avro.v1+binary")

    /** Content type registered for the JSON Avro encoding. */
    val KAFKA_REST_JSON_ENCODING = ContentType("application", "vnd.kafka.avro.v2+json")

    /** Content-Encoding header value used when gzip compression is enabled. */
    const val GZIP_CONTENT_ENCODING = "gzip"

    /** DSL-style factory: builds a [RestKafkaSender] from a configured [Config]. */
    fun restKafkaSender(builder: Config.() -> Unit): RestKafkaSender =
        RestKafkaSender(Config().apply(builder))
}
Expand Down

0 comments on commit 2b6c3e8

Please sign in to comment.