Skip to content

Commit

Permalink
feat: fetch schema from schema-registry by schema string
Browse files Browse the repository at this point in the history
* enable users to lookup the schema id for a given schema string
  • Loading branch information
rroesch1 committed Oct 28, 2024
1 parent ea13e01 commit f9dd1b9
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 21 deletions.
6 changes: 4 additions & 2 deletions api-docs/docs/classes/SchemaRegistry.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ Deserializes the given data and schema into its original form.

**getSchema**(`schema`): [`Schema`](../interfaces/Schema.md)

**`method`**
Get a schema from Schema Registry by version and subject.
**`method`** Get a schema from Schema Registry
* if only `schema.subject` is set: returns the latest schema for the given subject
* if `schema.subject` and `schema.schema` is set: returns the schema for the given schema string
* if `schema.subject` and `schema.version` is set: returns the schema for the given version

#### Parameters

Expand Down
3 changes: 2 additions & 1 deletion api-docs/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,8 @@ export class SchemaRegistry {
constructor(schemaRegistryConfig: SchemaRegistryConfig);
/**
* @method
* Get a schema from Schema Registry by version and subject.
* Get latest schema from Schema Registry by subject.
* Alternatively a specific schema version can be fetched by either specifing schema.version of schema.schema
* @param {Schema} schema - Schema configuration.
* @returns {Schema} - Schema.
*/
Expand Down
30 changes: 21 additions & 9 deletions schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,25 +274,37 @@ func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.Sch
return srClient
}

// getSchema returns the schema for the given subject and schema ID and version.
// getSchema returns either the latest schema for the given subject or a specific version (if given) or the schema for the given schema string (if given)
func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema {
// If EnableCache is set, check if the schema is in the cache.
if schema.EnableCaching {
if schema, ok := k.schemaCache[schema.Subject]; ok {
return schema
if cachedSchema, ok := k.schemaCache[schema.Subject]; ok {
// the cache should contain the latest version of a schema for the given subject
// we must not return the cached schema if it does not match the requested version or schema string
if (schema.Version == 0 && schema.Schema != "") || schema.Version == cachedSchema.Version || schema.Schema == cachedSchema.Schema {
return cachedSchema;
}
}
}

runtime := k.vu.Runtime()
// The client always caches the schema.
var schemaInfo *srclient.Schema
var err error
// Default version of the schema is the latest version.
if schema.Version == 0 {
var isLatestSchema = false;
if schema.Schema != "" { // fetch schema for given schema string
var schemaType srclient.SchemaType
if schema.SchemaType != nil {
schemaType = *schema.SchemaType
} else {
schemaType = srclient.Avro
}
schemaInfo, err = client.LookupSchema(schema.Subject, schema.Schema, schemaType, schema.References...)
} else if schema.Version == 0 { // fetch schema by version
schemaInfo, err = client.GetLatestSchema(schema.Subject)
} else {
schemaInfo, err = client.GetSchemaByVersion(
schema.Subject, schema.Version)
} else { // fetch latest schema for given subject
schemaInfo, err = client.GetSchemaByVersion(schema.Subject, schema.Version)
isLatestSchema = true;
}

if err == nil {
Expand All @@ -306,7 +318,7 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema)
Subject: schema.Subject,
}
// If the Cache is set, cache the schema.
if wrappedSchema.EnableCaching {
if wrappedSchema.EnableCaching && isLatestSchema {
k.schemaCache[wrappedSchema.Subject] = wrappedSchema
}
return wrappedSchema
Expand Down
64 changes: 55 additions & 9 deletions schema_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ func TestGetSubjectNameCanUseRecordNameStrategyWithNamespace(t *testing.T) {
// TestSchemaRegistryClientClass tests the schema registry client class.
func TestSchemaRegistryClientClass(t *testing.T) {
test := getTestModuleInstance(t)
avroSchema := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}`
avroSchema1 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}]}`
avroSchema2 := `{"type":"record","name":"Schema","namespace":"com.example.person","fields":[{"name":"field","type":"string"}, {"name":"field2","type":"int"}]}`

require.NoError(t, test.moveToVUCode())
assert.NotPanics(t, func() {
Expand All @@ -287,21 +288,36 @@ func TestSchemaRegistryClientClass(t *testing.T) {
})
assert.NotNil(t, client)

// Create a schema and send it to the registry.
// Create first schema and send it to the registry.
createSchema := client.Get("createSchema").Export().(func(sobek.FunctionCall) sobek.Value)
newSchema := createSchema(sobek.FunctionCall{
newSchema1 := createSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"schema": avroSchema,
"schema": avroSchema1,
"schemaType": srclient.Avro,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", newSchema.Subject)
assert.Equal(t, 0, newSchema.Version)
assert.Equal(t, "test-subject", newSchema1.Subject)
assert.Equal(t, 1, newSchema1.Version)

// Create second schema and send it to the registry.
newSchema2 := createSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"schema": avroSchema2,
"schemaType": srclient.Avro,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", newSchema2.Subject)
assert.Equal(t, 2, newSchema2.Version)

// Get the latest version of the schema from the registry.
getSchema := client.Get("getSchema").Export().(func(sobek.FunctionCall) sobek.Value)
Expand All @@ -316,16 +332,46 @@ func TestSchemaRegistryClientClass(t *testing.T) {
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", currentSchema.Subject)
assert.Equal(t, 1, currentSchema.Version)
assert.Equal(t, avroSchema, currentSchema.Schema)
assert.Equal(t, 2, currentSchema.Version)
assert.Equal(t, avroSchema2, currentSchema.Schema)

// get schema by schema string
schemaByString := getSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"schema": avroSchema1,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", schemaByString.Subject)
assert.Equal(t, 1, schemaByString.Version)
assert.Equal(t, avroSchema1, schemaByString.Schema)

// get schema by version
schemaByVersion := getSchema(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"subject": "test-subject",
"version": 1,
},
),
},
}).Export().(*Schema)
assert.Equal(t, "test-subject", schemaByVersion.Subject)
assert.Equal(t, 1, schemaByVersion.Version)
assert.Equal(t, avroSchema1, schemaByVersion.Schema)

// Get the subject name based on the given subject name config.
getSubjectName := client.Get("getSubjectName").Export().(func(sobek.FunctionCall) sobek.Value)
subjectName := getSubjectName(sobek.FunctionCall{
Arguments: []sobek.Value{
test.module.vu.Runtime().ToValue(
map[string]interface{}{
"schema": avroSchema,
"schema": avroSchema1,
"topic": "test-topic",
"subjectNameStrategy": TopicRecordNameStrategy,
"element": Value,
Expand Down
6 changes: 6 additions & 0 deletions scripts/test_avro_with_schema_registry.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ const valueSchemaObject = schemaRegistry.createSchema({
schemaType: SCHEMA_TYPE_AVRO,
});

// if you want use a schema which has already been created you can fetch it by the schema string
const valueSchemaObjectExisting = schemaRegistry.get({
subject: valueSubjectName,
schema: valueSchema,
});

export default function () {
for (let index = 0; index < 100; index++) {
let messages = [
Expand Down

0 comments on commit f9dd1b9

Please sign in to comment.