-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: Resolve Issues in RestKafkaSender and SchemaRetriever #180
base: dev
Are you sure you want to change the base?
Conversation
|
||
val recordDataTypeInfo: TypeInfo = TypeInfo( | ||
type = RecordData::class, | ||
kotlinType = null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not define the ktype as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I attempted to define kotlinType using the following approaches:
typeOf<RecordData<K, V>>()
typeOf<RecordData<*, *>>()
typeOf<RecordData<Any, Any>>()
Here are the results:
KafkaSenderTrace: Failed to set reflection work:
java.lang.IllegalArgumentException: Class declares 0 type parameters, but 2 were provided.
at kotlin.reflect.full.KClassifiers.createType(KClassifiers.kt:53)
When trying:
try {
val type = typeOf<RecordData<*, *>>()
val reifiedType = type.javaType
typeInfo = TypeInfo(RecordData::class, reifiedType, type)
logger.trace("KafkaSenderTrace: Successfully set up the reflection work")
} catch (ex: Exception) {
logger.warn("KafkaSenderTrace: Failed to set reflection work: ", ex)
}
I encountered similar errors with:
val type = typeOf<RecordData<Any, Any>>()
val reifiedType = type.javaType
typeInfo = TypeInfo(RecordData::class, reifiedType, type)
I am uncertain if this approach is optimal. I would appreciate any suggestions for improvements or more efficient alternatives.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also from the TypeInfo class, kotlinType is null by default, unlike other properties:
public data class TypeInfo(
public val type: KClass<*>,
public val reifiedType: Type,
public val kotlinType: KType? = null
)
topic = value.topic.name, | ||
ofValue = true, | ||
schema = value.topic.valueSchema, | ||
version = -1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The POST issue is likely due to a bug here- https://github.com/RADAR-base/radar-commons/blame/4d2044f5a1ec7f7b6b8f4fa1d6433144e7501bac/radar-commons/src/main/java/org/radarbase/producer/schema/SchemaRestClient.kt#L122, this should be calling schemaGet
, not schemaPost
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we update it to a GET request, we should fetch the schemas by version or ID. We don't have either of these at the moment. Should I proceed with "latest", which will fetch the latest version?
This PR addresses two key issues:
1.
TypeInfo
Errors inRestKafkaSender
:The current approach using reflection and generics to set TypeInfo results in errors when using with
radar-commons-android
. This has been replaced with an implementation based on ParameterizedType, which is error-free.2. SchemaRetriever Metadata Behavior:
The
ContentConverters
implementation utilizesSchemaRetriever::metadata
, which sends a POST request to the /subject/topic_name endpoint, resulting in a 503 Service Unavailable error.This behavior was not observed in previous versions, previously, a GET request was sent to the
/subject/topic_name-key|value
endpoint if that fails, then the POST is used.These changes have been tested and work seamlessly with RADAR-pRMT. Suggestions for alternative solutions are welcome.