diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt
index 9b3a184f..35b8d64f 100644
--- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt
+++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroContentConverter.kt
@@ -25,17 +25,17 @@ class AvroContentConverter(
         return coroutineScope {
             val keySchema = async {
-                schemaRetriever.metadata(
+                schemaRetriever.getByVersion(
                     topic = value.topic.name,
                     ofValue = false,
-                    schema = value.topic.keySchema,
+                    version = -1,
                 )
             }
             val valueSchema = async {
-                schemaRetriever.metadata(
+                schemaRetriever.getByVersion(
                     topic = value.topic.name,
                     ofValue = true,
-                    schema = value.topic.valueSchema,
+                    version = -1,
                 )
             }
             val maker = if (binary) {
diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt
new file mode 100644
index 00000000..392725d3
--- /dev/null
+++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RadarParameterizedType.kt
@@ -0,0 +1,25 @@
+package org.radarbase.producer.rest
+
+import java.lang.reflect.ParameterizedType
+import java.lang.reflect.Type
+
+class RadarParameterizedType(
+    private val raw: Class<*>,
+    private val args: Array<Type>,
+    private val owner: Type? = null,
+) : ParameterizedType {
+    override fun getRawType(): Type = raw
+    override fun getActualTypeArguments(): Array<Type> = args
+    override fun getOwnerType(): Type? = owner
+
+    override fun toString(): String {
+        return buildString {
+            append(raw.typeName)
+            if (args.isNotEmpty()) {
+                append('<')
+                append(args.joinToString(", ") { it.typeName })
+                append('>')
+            }
+        }
+    }
+}
diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt
index 9c3cc082..efccbf84 100644
--- a/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt
+++ b/radar-commons/src/main/java/org/radarbase/producer/rest/RestKafkaSender.kt
@@ -61,8 +61,6 @@ import org.radarbase.topic.AvroTopic
 import org.radarbase.util.RadarProducerDsl
 import org.slf4j.LoggerFactory
 import java.io.IOException
-import kotlin.reflect.javaType
-import kotlin.reflect.typeOf
 import kotlin.time.Duration
 import kotlin.time.Duration.Companion.seconds
@@ -143,6 +141,16 @@ class RestKafkaSender(config: Config) : KafkaSender {
     inner class RestKafkaTopicSender<K : Any, V : Any>(
         override val topic: AvroTopic<K, V>,
     ) : KafkaTopicSender<K, V> {
+
+        val recordDataTypeInfo: TypeInfo = TypeInfo(
+            type = RecordData::class,
+            kotlinType = null,
+            reifiedType = RadarParameterizedType(
+                raw = RecordData::class.java,
+                args = arrayOf(topic.keyClass, topic.valueClass),
+            ),
+        )
+
         override suspend fun send(records: RecordData<K, V>) = withContext(scope.coroutineContext) {
             try {
                 val response: HttpResponse = restClient.post {
@@ -275,7 +283,6 @@ class RestKafkaSender(config: Config) : KafkaSender {

     companion object {
         private val logger = LoggerFactory.getLogger(RestKafkaSender::class.java)
-        private val recordDataTypeInfo: TypeInfo
         val DEFAULT_TIMEOUT: Duration = 20.seconds
         val KAFKA_REST_BINARY_ENCODING = ContentType("application", "vnd.radarbase.avro.v1+binary")
@@ -283,14 +290,6 @@ class RestKafkaSender(config: Config) : KafkaSender {
         val KAFKA_REST_ACCEPT = ContentType("application", "vnd.kafka.v2+json")
         const val GZIP_CONTENT_ENCODING = "gzip"

-        init {
-            val kType = typeOf<RecordData<Any, Any>>()
-
-            @OptIn(ExperimentalStdlibApi::class)
-            val reifiedType = kType.javaType
-            recordDataTypeInfo = TypeInfo(RecordData::class, reifiedType, kType)
-        }
-
         fun restKafkaSender(builder: Config.() -> Unit): RestKafkaSender =
             RestKafkaSender(Config().apply(builder))
     }
diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt b/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt
index 2194ba9a..d2c65432 100644
--- a/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt
+++ b/radar-commons/src/test/java/org/radarbase/producer/rest/RestKafkaSenderTest.kt
@@ -101,15 +101,15 @@ class RestKafkaSenderTest {
         val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema)
         val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)
         retriever.stub {
-            onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
-            onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
+            onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
+            onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
         }
         webServer.enqueueJson("{\"offset\": 100}")
         topicSender.send(key, value)
         verify(retriever, times(1))
-            .metadata("test", false, keySchema)
+            .getByVersion("test", false, -1)
         verify(retriever, times(1))
-            .metadata("test", true, valueSchema)
+            .getByVersion("test", true, -1)
         val request = webServer.takeRequest()
         assertEquals("/topics/test", request.path)
         val body = READER.readTree(request.body.inputStream())
@@ -145,15 +145,15 @@ class RestKafkaSenderTest {
         val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema)
         val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)
         retriever.stub {
-            onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
-            onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
+            onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
+            onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
         }
         webServer.enqueueJson("{\"offset\": 100}")
         topicSender.send(key, value)
         verify(retriever, times(1))
-            .metadata("test", false, keySchema)
+            .getByVersion("test", false, -1)
         verify(retriever, times(1))
-            .metadata("test", true, valueSchema)
+            .getByVersion("test", true, -1)
         val request = webServer.takeRequest()
         assertEquals("/topics/test", request.path)
         var decoder = DecoderFactory.get().directBinaryDecoder(request.body.inputStream(), null)
@@ -192,15 +192,15 @@ class RestKafkaSenderTest {
         val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)

         retriever.stub {
-            onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
-            onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
+            onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
+            onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
         }
         webServer.enqueueJson("{\"offset\": 100}")
         topicSender.send(AvroRecordData(topic, key, listOf(value, value)))
         verify(retriever, times(1))
-            .metadata("test", false, keySchema)
+            .getByVersion("test", false, -1)
         verify(retriever, times(1))
-            .metadata("test", true, valueSchema)
+            .getByVersion("test", true, -1)
         val request = webServer.takeRequest()
         assertEquals("/topics/test", request.path)
         val bodyString = request.body.readString(StandardCharsets.UTF_8)
@@ -293,8 +293,8 @@ class RestKafkaSenderTest {
         val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema)
         val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)
         retriever.stub {
-            onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
-            onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
+            onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
+            onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
         }
         topicSender.send(key, value)
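
Usage sketch (not part of the change set above): how the per-topic TypeInfo introduced in RestKafkaTopicSender can be built with RadarParameterizedType instead of the removed kotlin.reflect.typeOf reflection. The standalone helper and its name recordDataTypeInfoFor are illustrative only, and the RecordData/TypeInfo import paths are assumed from the module layout; the named TypeInfo parameters mirror the ones used in the diff.

import io.ktor.util.reflect.TypeInfo
import org.radarbase.data.RecordData
import org.radarbase.producer.rest.RadarParameterizedType

// Builds a TypeInfo describing RecordData<K, V> from plain Class objects,
// mirroring what RestKafkaTopicSender now constructs once per topic.
fun recordDataTypeInfoFor(keyClass: Class<*>, valueClass: Class<*>): TypeInfo = TypeInfo(
    type = RecordData::class,
    kotlinType = null,
    reifiedType = RadarParameterizedType(
        raw = RecordData::class.java,
        args = arrayOf(keyClass, valueClass),
    ),
)

// Example call with the same arguments a topic sender would supply:
// val info = recordDataTypeInfoFor(topic.keyClass, topic.valueClass)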