Skip to content

Commit

Permalink
update karapace schema registry message formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
vordimous committed Jun 3, 2024
1 parent 4086ca4 commit 369e109
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ApicurioEventContext
private final int staleArtifactID;
private final int unretrievableArtifactId;
private final int retrievableArtifactSubjectVersionId;
private final int retrievableArtifactId;
private final int retrievedArtifactId;
private final MessageConsumer eventWriter;
private final Clock clock;

Expand All @@ -57,8 +57,8 @@ public ApicurioEventContext(
this.staleArtifactID = context.supplyEventId("catalog.apicurio.unretrievable.artifact.subject.version.stale.artifact");
this.unretrievableArtifactId = context.supplyEventId("catalog.apicurio.unretrievable.artifact.id");
this.retrievableArtifactSubjectVersionId = context.supplyEventId(
"catalog.apicurio.retrievable.artifact.subject.version");
this.retrievableArtifactId = context.supplyEventId("catalog.apicurio.retrievable.artifact.id");
"catalog.apicurio.retrieved.artifact.subject.version");
this.retrievedArtifactId = context.supplyEventId("catalog.apicurio.retrieved.artifact.id");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand Down Expand Up @@ -142,7 +142,7 @@ public void onRetrievableArtifactSubjectVersion(
{
ApicurioEventExFW extension = apicurioEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.retrievableArtifactSubjectVersion(e -> e
.retrievedArtifactSubjectVersion(e -> e
.typeId(RETRIEVED_ARTIFACT_SUBJECT_VERSION.value())
.subject(subject)
.version(version)
Expand All @@ -165,14 +165,14 @@ public void onRetrievableArtifactId(
{
ApicurioEventExFW extension = apicurioEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.retrievableArtifactId(e -> e
.retrievedArtifactId(e -> e
.typeId(RETRIEVED_ARTIFACT_ID.value())
.artifactId(artifactId)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(retrievableArtifactId)
.id(retrievedArtifactId)
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.StringFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioEventExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievableArtifactIdExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievableArtifactSubjectVersionExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievedArtifactIdExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievedArtifactSubjectVersionExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioUnretrievableArtifactIdExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioUnretrievableArtifactSubjectVersionExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioUnretrievableArtifactSubjectVersionStaleArtifactExFW;
Expand Down Expand Up @@ -78,7 +78,7 @@ public String format(
}
case RETRIEVED_ARTIFACT_SUBJECT_VERSION:
{
ApicurioRetrievableArtifactSubjectVersionExFW ex = extension.retrievableArtifactSubjectVersion();
ApicurioRetrievedArtifactSubjectVersionExFW ex = extension.retrievedArtifactSubjectVersion();
result = String.format("Successfully fetched artifact for subject %s with version %s.",
asString(ex.subject()),
asString(ex.version())
Expand All @@ -87,7 +87,7 @@ public String format(
}
case RETRIEVED_ARTIFACT_ID:
{
ApicurioRetrievableArtifactIdExFW ex = extension.retrievableArtifactId();
ApicurioRetrievedArtifactIdExFW ex = extension.retrievedArtifactId();
result = String.format("Successfully fetched artifact id %d.", ex.artifactId());
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class KarapaceEventContext
private final int staleSchemaID;
private final int unretrievableSchemaId;
private final int retrievableSchemaSubjectVersionId;
private final int retrievableSchemaId;
private final int retrievedSchemaId;
private final MessageConsumer eventWriter;
private final Clock clock;

Expand All @@ -55,8 +55,8 @@ public KarapaceEventContext(
this.unretrievableSchemaSubjectVersionId = context.supplyEventId("catalog.karapace.unretrievable.schema.subject.version");
this.staleSchemaID = context.supplyEventId("catalog.karapace.unretrievable.schema.subject.version.stale.schema");
this.unretrievableSchemaId = context.supplyEventId("catalog.karapace.unretrievable.schema.id");
this.retrievableSchemaSubjectVersionId = context.supplyEventId("catalog.karapace.retrievable.schema.subject.version");
this.retrievableSchemaId = context.supplyEventId("catalog.karapace.retrievable.schema.id");
this.retrievableSchemaSubjectVersionId = context.supplyEventId("catalog.karapace.retrieved.schema.subject.version");
this.retrievedSchemaId = context.supplyEventId("catalog.karapace.retrieved.schema.id");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand Down Expand Up @@ -140,7 +140,7 @@ public void onRetrievableSchemaSubjectVersion(
{
KarapaceEventExFW extension = karapaceEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.retrievableSchemaSubjectVersion(e -> e
.retrievedSchemaSubjectVersion(e -> e
.typeId(RETRIEVED_SCHEMA_SUBJECT_VERSION.value())
.subject(subject)
.version(version)
Expand All @@ -163,14 +163,14 @@ public void onRetrievableSchemaId(
{
KarapaceEventExFW extension = karapaceEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.retrievableSchemaId(e -> e
.retrievedSchemaId(e -> e
.typeId(RETRIEVED_SCHEMA_ID.value())
.schemaId(schemaId)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(retrievableSchemaId)
.id(retrievedSchemaId)
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,44 @@ public String format(
case UNRETRIEVABLE_SCHEMA_SUBJECT_VERSION:
{
KarapaceUnretrievableSchemaSubjectVersionExFW ex = extension.unretrievableSchemaSubjectVersion();
result = String.format("%s %s", asString(ex.subject()), asString(ex.version()));
result = String.format(
"Unable to fetch schema for subject %s with version %s.",
asString(ex.subject()),
asString(ex.version())
);
break;
}
case UNRETRIEVABLE_SCHEMA_SUBJECT_VERSION_STALE_SCHEMA:
{
KarapaceUnretrievableSchemaSubjectVersionStaleSchemaExFW ex = extension
.unretrievableSchemaSubjectVersionStaleSchema();
result = String.format("%s %s, using stale schema: %d", asString(ex.subject()),
asString(ex.version()), ex.schemaId());
result = String.format(
"Unable to fetch schema for subject %s with version %s; using stale schema with id %d.",
asString(ex.subject()),
asString(ex.version()),
ex.schemaId()
);
break;
}
case UNRETRIEVABLE_SCHEMA_ID:
{
KarapaceUnretrievableSchemaIdExFW ex = extension.unretrievableSchemaId();
result = String.format("%d", ex.schemaId());
result = String.format("Unable to fetch schema id %d.", ex.schemaId());
break;
}
case RETRIEVED_SCHEMA_SUBJECT_VERSION:
{
KarapaceRetrievableSchemaSubjectVersionExFW ex = extension.retrievableSchemaSubjectVersion();
result = String.format("%s %s", asString(ex.subject()), asString(ex.version()));
KarapaceRetrievableSchemaSubjectVersionExFW ex = extension.retrievedSchemaSubjectVersion();
result = String.format("Successfully fetched schema for subject %s with version %s.",
asString(ex.subject()),
asString(ex.version())
);
break;
}
case RETRIEVED_SCHEMA_ID:
{
KarapaceRetrievableSchemaIdExFW ex = extension.retrievableSchemaId();
result = String.format("%d", ex.schemaId());
KarapaceRetrievableSchemaIdExFW ex = extension.retrievedSchemaId();
result = String.format("Successfully fetched schema id %d.", ex.schemaId());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ scope apicurio
int32 artifactId;
}

struct ApicurioRetrievableArtifactSubjectVersionEx extends core::stream::Extension
struct ApicurioRetrievedArtifactSubjectVersionEx extends core::stream::Extension
{
string8 subject;
string8 version;
}

struct ApicurioRetrievableArtifactIdEx extends core::stream::Extension
struct ApicurioRetrievedArtifactIdEx extends core::stream::Extension
{
int32 artifactId;
}
Expand All @@ -59,8 +59,8 @@ scope apicurio
case UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION: ApicurioUnretrievableArtifactSubjectVersionEx unretrievableArtifactSubjectVersion;
case UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION_STALE_ARTIFACT: ApicurioUnretrievableArtifactSubjectVersionStaleArtifactEx unretrievableArtifactSubjectVersionStaleArtifact;
case UNRETRIEVABLE_ARTIFACT_ID: ApicurioUnretrievableArtifactIdEx unretrievableArtifactId;
case RETRIEVED_ARTIFACT_SUBJECT_VERSION: ApicurioRetrievableArtifactSubjectVersionEx retrievableArtifactSubjectVersion;
case RETRIEVED_ARTIFACT_ID: ApicurioRetrievableArtifactIdEx retrievableArtifactId;
case RETRIEVED_ARTIFACT_SUBJECT_VERSION: ApicurioRetrievedArtifactSubjectVersionEx retrievedArtifactSubjectVersion;
case RETRIEVED_ARTIFACT_ID: ApicurioRetrievedArtifactIdEx retrievedArtifactId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ telemetry:
name: CATALOG_APICURIO_UNRETRIEVABLE_ARTIFACT_ID
message: Unable to fetch artifact id 1.
- qname: test.catalog0
id: catalog.apicurio.retrievable.artifact.id
name: CATALOG_APICURIO_RETRIEVABLE_ARTIFACT_ID
id: catalog.apicurio.retrieved.artifact.id
name: CATALOG_APICURIO_RETRIEVED_ARTIFACT_ID
message: Successfully fetched artifact id 1.
catalogs:
catalog0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ telemetry:
name: CATALOG_APICURIO_UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION_STALE_ARTIFACT
message: Unable to fetch artifact for subject artifactId with version latest; using stale artifact with id 1.
- qname: test.catalog0
id: catalog.apicurio.retrievable.artifact.subject.version
name: CATALOG_APICURIO_RETRIEVABLE_ARTIFACT_SUBJECT_VERSION
id: catalog.apicurio.retrieved.artifact.subject.version
name: CATALOG_APICURIO_RETRIEVED_ARTIFACT_SUBJECT_VERSION
message: Successfully fetched artifact for subject artifactId with version latest.
catalogs:
catalog0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ scope karapace
int32 schemaId;
}

struct KarapaceRetrievableSchemaSubjectVersionEx extends core::stream::Extension
struct KarapaceRetrievedSchemaSubjectVersionEx extends core::stream::Extension
{
string8 subject;
string8 version;
}

struct KarapaceRetrievableSchemaIdEx extends core::stream::Extension
struct KarapaceRetrievedSchemaIdEx extends core::stream::Extension
{
int32 schemaId;
}
Expand All @@ -59,8 +59,8 @@ scope karapace
case UNRETRIEVABLE_SCHEMA_SUBJECT_VERSION: KarapaceUnretrievableSchemaSubjectVersionEx unretrievableSchemaSubjectVersion;
case UNRETRIEVABLE_SCHEMA_SUBJECT_VERSION_STALE_SCHEMA: KarapaceUnretrievableSchemaSubjectVersionStaleSchemaEx unretrievableSchemaSubjectVersionStaleSchema;
case UNRETRIEVABLE_SCHEMA_ID: KarapaceUnretrievableSchemaIdEx unretrievableSchemaId;
case RETRIEVED_SCHEMA_SUBJECT_VERSION: KarapaceRetrievableSchemaSubjectVersionEx retrievableSchemaSubjectVersion;
case RETRIEVED_SCHEMA_ID: KarapaceRetrievableSchemaIdEx retrievableSchemaId;
case RETRIEVED_SCHEMA_SUBJECT_VERSION: KarapaceRetrievedSchemaSubjectVersionEx retrievedSchemaSubjectVersion;
case RETRIEVED_SCHEMA_ID: KarapaceRetrievedSchemaIdEx retrievedSchemaId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ telemetry:
- qname: test.catalog0
id: catalog.karapace.unretrievable.schema.id
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA_ID
message: 9
message: Unable to fetch schema id 9.
- qname: test.catalog0
id: catalog.karapace.retrievable.schema.id
id: catalog.karapace.retrieved.schema.id
name: CATALOG_KARAPACE_RETRIEVED_SCHEMA_ID
message: 9
message: Successfully fetched schema id 9.
catalogs:
catalog0:
type: karapace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ telemetry:
events:
- qname: test.catalog0
id: catalog.karapace.unretrievable.schema.subject.version
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA
message: items-snapshots-value latest
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA_SUBJECT_VERSION
message: Unable to fetch schema for subject items-snapshots-value with version latest.
- qname: test.catalog0
id: catalog.karapace.unretrievable.schema.subject.version.stale.schema
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA
message: items-snapshots-value latest, USING_STALE_SCHEMA 9
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA_SUBJECT_VERSION_STALE_SCHEMA
message: Unable to fetch schema for subject items-snapshots-value with version latest; using stale schema with id 9.
- qname: test.catalog0
id: catalog.karapace.retrievable.schema.subject.version
id: catalog.karapace.retrieved.schema.subject.version
name: CATALOG_KARAPACE_RETRIEVED_SCHEMA_SUBJECT_VERSION
message: items-snapshots-value latest
message: Successfully fetched schema for subject items-snapshots-value with version latest.
catalogs:
catalog0:
type: karapace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ telemetry:
- qname: test.catalog0
id: catalog.karapace.unretrievable.schema.id
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA_ID
message: 1
message: Unable to fetch schema id 1.
catalogs:
catalog0:
type: karapace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ telemetry:
events:
- qname: test.catalog0
id: catalog.karapace.unretrievable.schema.subject.version
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA
message: items-snapshots-value latest
name: CATALOG_KARAPACE_UNRETRIEVABLE_SCHEMA_SUBJECT_VERSION
message: Unable to fetch schema for subject items-snapshots-value with version latest.
catalogs:
catalog0:
type: karapace
Expand Down

0 comments on commit 369e109

Please sign in to comment.