Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Resolve Issues in RestKafkaSender and SchemaRetriever #180

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ class AvroContentConverter(

return coroutineScope {
val keySchema = async {
schemaRetriever.metadata(
schemaRetriever.getByVersion(
topic = value.topic.name,
ofValue = false,
schema = value.topic.keySchema,
version = -1,
)
}
val valueSchema = async {
schemaRetriever.metadata(
schemaRetriever.getByVersion(
topic = value.topic.name,
ofValue = true,
schema = value.topic.valueSchema,
version = -1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

@this-Aditya this-Aditya Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we update it to a GET request, we should fetch the schemas by version or ID. We don't have either of these at the moment. Should I proceed with "latest", which will fetch the latest version?

)
}
val maker = if (binary) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.radarbase.producer.rest

import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type

class RadarParameterizedType(
private val raw: Class<*>,
private val args: Array<Type>,
private val owner: Type? = null,
) : ParameterizedType {
override fun getRawType(): Type = raw
override fun getActualTypeArguments(): Array<Type> = args
override fun getOwnerType(): Type? = owner

override fun toString(): String {
return buildString {
append(raw.typeName)
if (args.isNotEmpty()) {
append('<')
append(args.joinToString(", ") { it.typeName })
append('>')
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ import org.radarbase.topic.AvroTopic
import org.radarbase.util.RadarProducerDsl
import org.slf4j.LoggerFactory
import java.io.IOException
import kotlin.reflect.javaType
import kotlin.reflect.typeOf
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

Expand Down Expand Up @@ -143,6 +141,16 @@ class RestKafkaSender(config: Config) : KafkaSender {
inner class RestKafkaTopicSender<K : Any, V : Any>(
override val topic: AvroTopic<K, V>,
) : KafkaTopicSender<K, V> {

val recordDataTypeInfo: TypeInfo = TypeInfo(
type = RecordData::class,
kotlinType = null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not define the ktype as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I attempted to define kotlinType using the following approaches:

typeOf<RecordData<K, V>>()
typeOf<RecordData<*, *>>()
typeOf<RecordData<Any, Any>>()

Here are the results:

KafkaSenderTrace: Failed to set reflection work: 
java.lang.IllegalArgumentException: Class declares 0 type parameters, but 2 were provided.
    at kotlin.reflect.full.KClassifiers.createType(KClassifiers.kt:53)

When trying:

try {
    val type = typeOf<RecordData<*, *>>()
    val reifiedType = type.javaType
    typeInfo = TypeInfo(RecordData::class, reifiedType, type)
    logger.trace("KafkaSenderTrace: Successfully set up the reflection work")
} catch (ex: Exception) {
    logger.warn("KafkaSenderTrace: Failed to set reflection work: ", ex)
}

I encountered similar errors with:

val type = typeOf<RecordData<Any, Any>>()
val reifiedType = type.javaType
typeInfo = TypeInfo(RecordData::class, reifiedType, type)

I am uncertain if this approach is optimal. I would appreciate any suggestions for improvements or more efficient alternatives.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also from the TypeInfo class, kotlinType is null by default, unlike other properties:

public data class TypeInfo(
    public val type: KClass<*>,
    public val reifiedType: Type,
    public val kotlinType: KType? = null
)

reifiedType = RadarParameterizedType(
raw = RecordData::class.java,
args = arrayOf(topic.keyClass, topic.valueClass),
),
)

override suspend fun send(records: RecordData<K, V>) = withContext(scope.coroutineContext) {
try {
val response: HttpResponse = restClient.post {
Expand Down Expand Up @@ -275,22 +283,13 @@ class RestKafkaSender(config: Config) : KafkaSender {

companion object {
private val logger = LoggerFactory.getLogger(RestKafkaSender::class.java)
private val recordDataTypeInfo: TypeInfo

val DEFAULT_TIMEOUT: Duration = 20.seconds
val KAFKA_REST_BINARY_ENCODING = ContentType("application", "vnd.radarbase.avro.v1+binary")
val KAFKA_REST_JSON_ENCODING = ContentType("application", "vnd.kafka.avro.v2+json")
val KAFKA_REST_ACCEPT = ContentType("application", "vnd.kafka.v2+json")
const val GZIP_CONTENT_ENCODING = "gzip"

init {
val kType = typeOf<RecordData<Any, Any>>()

@OptIn(ExperimentalStdlibApi::class)
val reifiedType = kType.javaType
recordDataTypeInfo = TypeInfo(RecordData::class, reifiedType, kType)
}

fun restKafkaSender(builder: Config.() -> Unit): RestKafkaSender =
RestKafkaSender(Config().apply(builder))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ class RestKafkaSenderTest {
val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema)
val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)
retriever.stub {
onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
}
webServer.enqueueJson("{\"offset\": 100}")
topicSender.send(key, value)
verify(retriever, times(1))
.metadata("test", false, keySchema)
.getByVersion("test", false, -1)
verify(retriever, times(1))
.metadata("test", true, valueSchema)
.getByVersion("test", true, -1)
val request = webServer.takeRequest()
assertEquals("/topics/test", request.path)
val body = READER.readTree(request.body.inputStream())
Expand Down Expand Up @@ -145,15 +145,15 @@ class RestKafkaSenderTest {
val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema)
val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)
retriever.stub {
onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
}
webServer.enqueueJson("{\"offset\": 100}")
topicSender.send(key, value)
verify(retriever, times(1))
.metadata("test", false, keySchema)
.getByVersion("test", false, -1)
verify(retriever, times(1))
.metadata("test", true, valueSchema)
.getByVersion("test", true, -1)
val request = webServer.takeRequest()
assertEquals("/topics/test", request.path)
var decoder = DecoderFactory.get().directBinaryDecoder(request.body.inputStream(), null)
Expand Down Expand Up @@ -192,15 +192,15 @@ class RestKafkaSenderTest {
val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)

retriever.stub {
onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
}
webServer.enqueueJson("{\"offset\": 100}")
topicSender.send(AvroRecordData(topic, key, listOf(value, value)))
verify(retriever, times(1))
.metadata("test", false, keySchema)
.getByVersion("test", false, -1)
verify(retriever, times(1))
.metadata("test", true, valueSchema)
.getByVersion("test", true, -1)
val request = webServer.takeRequest()
assertEquals("/topics/test", request.path)
val bodyString = request.body.readString(StandardCharsets.UTF_8)
Expand Down Expand Up @@ -293,8 +293,8 @@ class RestKafkaSenderTest {
val keySchemaMetadata = ParsedSchemaMetadata(10, 2, keySchema)
val valueSchemaMetadata = ParsedSchemaMetadata(10, 2, valueSchema)
retriever.stub {
onBlocking { metadata("test", false, keySchema) }.doReturn(keySchemaMetadata)
onBlocking { metadata("test", true, valueSchema) }.doReturn(valueSchemaMetadata)
onBlocking { getByVersion("test", false, -1) }.doReturn(keySchemaMetadata)
onBlocking { getByVersion("test", true, -1) }.doReturn(valueSchemaMetadata)
}
topicSender.send(key, value)

Expand Down
Loading