Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace static event name with dynamic based on event id #1029

Merged
merged 29 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
13d20ae
feat: replace static event name with dynamic based on event id
vordimous May 15, 2024
8c44723
update event.name test value
vordimous May 15, 2024
db6c249
Update incubator/catalog-filesystem/src/main/java/io/aklivity/zilla/r…
vordimous May 15, 2024
a9b16cd
Update runtime/exporter-stdout/src/main/java/io/aklivity/zilla/runtim…
vordimous May 15, 2024
db918eb
Update runtime/exporter-otlp/src/main/java/io/aklivity/zilla/runtime/…
vordimous May 15, 2024
5b3b00d
Build event name in Engine context
vordimous May 15, 2024
35f3254
fix build errors
vordimous May 16, 2024
5608829
update tests
vordimous May 16, 2024
34deb18
fix http binding event test
vordimous May 16, 2024
981fed4
move event names out of format
vordimous May 20, 2024
7bff9fe
correct code to pass tests
vordimous May 30, 2024
1ba8a8e
refactor the compact session topic event
vordimous May 30, 2024
30430b8
move the static reason into the event formatter
vordimous Jun 3, 2024
f3ccc93
update model validation event messages
vordimous Jun 3, 2024
c45effe
update jwt guard event message
vordimous Jun 3, 2024
74af1f5
update catalog filesystem event message
vordimous Jun 3, 2024
d11f441
Add generic tls error message
vordimous Jun 3, 2024
0d122ac
update tcp event message
vordimous Jun 3, 2024
7bdd8e3
add context to mqtt client connected message
vordimous Jun 3, 2024
a23579b
update kafka and http message formatting
vordimous Jun 3, 2024
4086ca4
update apicurio artifact fetch messages
vordimous Jun 3, 2024
369e109
update karapace schema registry message formatting
vordimous Jun 3, 2024
6cd31b3
fix test
vordimous Jun 3, 2024
656b19a
Update runtime/binding-tls/src/main/java/io/aklivity/zilla/runtime/bi…
vordimous Jun 3, 2024
187bd0e
Improve jwt message with added reason
vordimous Jun 3, 2024
140f628
rename
vordimous Jun 3, 2024
6377a8a
improve mqtt session topic compated event message
vordimous Jun 3, 2024
0b38f7a
update code coverage for guard-jwt project to 0.97
vordimous Jun 3, 2024
5d6e603
Merge branch 'develop' into feature/1013-use-full-event-id-and-the-ev…
vordimous Jun 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ telemetry:
events:
- qname: test.catalog0
id: catalog.filesystem.file.not.found
message: FILE_NOT_FOUND asyncapi/kafka.yaml
name: CATALOG_FILESYSTEM_FILE_NOT_FOUND
message: Unable to find file at (asyncapi/kafka.yaml) on the host filesystem.
catalogs:
catalog0:
type: filesystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@

public final class FilesystemEventFormatter implements EventFormatterSpi
{
private static final String FILE_NOT_FOUND = "FILE_NOT_FOUND %s";

private final EventFW eventRO = new EventFW();
private final FilesystemEventExFW schemaRegistryEventExRO = new FilesystemEventExFW();
private final FilesystemEventExFW filesystemEventExRO = new FilesystemEventExFW();

FilesystemEventFormatter(
Configuration config)
Expand All @@ -41,15 +39,15 @@ public String format(
int length)
{
final EventFW event = eventRO.wrap(buffer, index, index + length);
final FilesystemEventExFW extension = schemaRegistryEventExRO
final FilesystemEventExFW extension = filesystemEventExRO
.wrap(event.extension().buffer(), event.extension().offset(), event.extension().limit());
String result = null;
switch (extension.kind())
{
case FILE_NOT_FOUND:
{
FilesystemFileNotFoundExFW ex = extension.fileNotFound();
result = String.format(FILE_NOT_FOUND, asString(ex.location()));
result = String.format("Unable to find file at (%s) on the host filesystem.", asString(ex.location()));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ public int supplyEventId(
return 0;
}

@Override
public String supplyEventName(
int eventId)
{
return "";
}

@Override
public BindingHandler streamFactory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

public final class HttpEventFormatter implements EventFormatterSpi
{
private static final String REQUEST_ACCEPTED_FORMAT = "REQUEST_ACCEPTED %s %s %s %s %s";

private final EventFW eventRO = new EventFW();
private final HttpEventExFW httpEventExRO = new HttpEventExFW();

Expand All @@ -50,7 +48,7 @@ public String format(
case REQUEST_ACCEPTED:
{
HttpRequestAcceptedExFW ex = extension.requestAccepted();
result = String.format(REQUEST_ACCEPTED_FORMAT, identity(ex.identity()), asString(ex.scheme()), asString(ex.method()),
result = String.format("%s %s %s://%s%s", identity(ex.identity()), asString(ex.method()), asString(ex.scheme()),
asString(ex.authority()), asString(ex.path()));
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@

public final class KafkaEventFormatter implements EventFormatterSpi
{
private static final String AUTHORIZATION_FAILED_FORMAT = "AUTHORIZATION_FAILED %s";
private static final String API_VERSION_REJECTED_FORMAT = "API_VERSION_REJECTED %d %d";

private final EventFW eventRO = new EventFW();
private final KafkaEventExFW kafkaEventExRO = new KafkaEventExFW();

Expand All @@ -52,22 +49,22 @@ public String format(
case AUTHORIZATION_FAILED:
{
KafkaAuthorizationFailedExFW ex = extension.authorizationFailed();
result = String.format(AUTHORIZATION_FAILED_FORMAT, identity(ex.identity()));
result = String.format("Unable to authenticate client with identity (%s).", asString(ex.identity()));
break;
}
case API_VERSION_REJECTED:
{
final KafkaApiVersionRejectedExFW ex = extension.apiVersionRejected();
result = String.format(API_VERSION_REJECTED_FORMAT, ex.apiKey(), ex.apiVersion());
result = String.format("%d %d", ex.apiKey(), ex.apiVersion());
}
}
return result;
}

private static String identity(
StringFW identity)
private static String asString(
StringFW stringFW)
{
int length = identity.length();
return length <= 0 ? "-" : identity.asString();
String s = stringFW.asString();
return s == null ? "" : s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ public MqttKafkaEventContext(
public void onMqttConnectionReset(
long traceId,
long bindingId,
String16FW reason)
String16FW topic)
{
MqttKafkaEventExFW extension = mqttKafkaEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.nonCompactSessionsTopic(e -> e
.typeId(NON_COMPACT_SESSIONS_TOPIC.value())
.reason(reason))
.topic(topic))
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaEventExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaResetMqttConnectionExFW;
import io.aklivity.zilla.runtime.binding.mqtt.kafka.internal.types.event.MqttKafkaNonCompactSessionsTopicExFW;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.event.EventFormatterSpi;

public final class MqttKafkaEventFormatter implements EventFormatterSpi
{
private static final String NON_COMPACT_SESSIONS_TOPIC_FORMAT = "NON COMPACT SESSIONS TOPIC - %s";

private final EventFW eventRO = new EventFW();
private final MqttKafkaEventExFW mqttKafkaEventExRO = new MqttKafkaEventExFW();

Expand All @@ -48,8 +46,11 @@ public String format(
{
case NON_COMPACT_SESSIONS_TOPIC:
{
MqttKafkaResetMqttConnectionExFW ex = extension.nonCompactSessionsTopic();
result = String.format(NON_COMPACT_SESSIONS_TOPIC_FORMAT, asString(ex.reason()));
MqttKafkaNonCompactSessionsTopicExFW ex = extension.nonCompactSessionsTopic();
result = String.format(
"The sessions topic (%s) is not log compacted. Update the cleanup policy to enable log compaction.",
asString(ex.topic())
);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public class MqttKafkaSessionFactory implements MqttKafkaStreamFactory
public static final int MQTT_NOT_AUTHORIZED = 0x87;
public static final int MQTT_IMPLEMENTATION_SPECIFIC_ERROR = 0x83;
public static final String MQTT_INVALID_SESSION_TIMEOUT_REASON = "Invalid session expiry interval";
public static final String16FW MQTT_NON_COMPACT_SESSIONS_TOPIC = new String16FW("Sessions Kafka topic in non-compacted");
private static final KafkaConfigFW CONFIG_COMPACT_CLEANUP_POLICY = new KafkaConfigFW.Builder()
.wrap(new UnsafeBuffer(new byte[25]), 0, 25)
.name("cleanup.policy")
Expand Down Expand Up @@ -3469,7 +3468,7 @@ protected void onKafkaBegin(
.build();
delegate.doMqttWindow(authorization, traceId, 0, 0, 0);
delegate.doMqttReset(traceId, mqttResetEx);
events.onMqttConnectionReset(traceId, routedId, MQTT_NON_COMPACT_SESSIONS_TOPIC);
events.onMqttConnectionReset(traceId, routedId, delegate.sessionsTopic);
doKafkaWindow(traceId, authorization, 0, 0, 0, 0, 0);
doKafkaAbort(traceId, authorization);
break onKafkaBegin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

public final class MqttEventFormatter implements EventFormatterSpi
{
private static final String CLIENT_CONNECTED_FORMAT = "CLIENT_CONNECTED %s %s";

private final EventFW eventRO = new EventFW();
private final MqttEventExFW mqttEventExRO = new MqttEventExFW();

Expand All @@ -50,7 +48,10 @@ public String format(
case CLIENT_CONNECTED:
{
MqttClientConnectedExFW ex = extension.clientConnected();
result = String.format(CLIENT_CONNECTED_FORMAT, identity(ex.identity()), asString(ex.clientId()));
result = String.format("Session authorization (%s) was successful for client id (%s).",
identity(ex.identity()),
asString(ex.clientId())
);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@

public final class TcpEventFormatter implements EventFormatterSpi
{
private static final String DNS_FAILED_FORMAT = "DNS_FAILED %s";

private final EventFW eventRO = new EventFW();
private final TcpEventExFW tcpEventExRO = new TcpEventExFW();

Expand All @@ -50,7 +48,7 @@ public String format(
case DNS_FAILED:
{
final TcpDnsFailedExFW ex = extension.dnsFailed();
result = String.format(DNS_FAILED_FORMAT, asString(ex.address()));
result = String.format("Unable to resolve host dns for address (%s).", asString(ex.address()));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@

public final class TlsEventFormatter implements EventFormatterSpi
{
private static final String TLS_FAILED_FORMAT = "TLS_FAILED";
private static final String PROTOCOL_REJECTED_FORMAT = "PROTOCOL_REJECTED";
private static final String KEY_REJECTED_FORMAT = "KEY_REJECTED";
private static final String PEER_NOT_VERIFIED_FORMAT = "PEER_NOT_VERIFIED";
private static final String HANDSHAKE_FAILED_FORMAT = "HANDSHAKE_FAILED";

private final EventFW eventRO = new EventFW();
private final TlsEventExFW tlsEventExRO = new TlsEventExFW();

Expand All @@ -51,27 +45,27 @@ public String format(
{
case TLS_FAILED:
{
result = TLS_FAILED_FORMAT;
result = "There was a generic error detected by an SSL subsystem.";
break;
}
case TLS_PROTOCOL_REJECTED:
{
result = PROTOCOL_REJECTED_FORMAT;
result = "There was an error in the operation of the SSL protocol.";
break;
}
case TLS_KEY_REJECTED:
{
result = KEY_REJECTED_FORMAT;
result = "Bad SSL key due to misconfiguration of the server or client SSL certificate and private key.";
break;
}
case TLS_PEER_NOT_VERIFIED:
{
result = PEER_NOT_VERIFIED_FORMAT;
result = "The peer's identity could not be verified.";
break;
}
case TLS_HANDSHAKE_FAILED:
{
result = HANDSHAKE_FAILED_FORMAT;
result = "The client and server could not negotiate the desired level of security.";
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,13 @@ public int supplyEventId(
return 0;
}

@Override
public String supplyEventName(
int eventId)
{
return "";
}

@Override
public BindingHandler streamFactory()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ApicurioEventContext
private final int staleArtifactID;
private final int unretrievableArtifactId;
private final int retrievableArtifactSubjectVersionId;
private final int retrievableArtifactId;
private final int retrievedArtifactId;
private final MessageConsumer eventWriter;
private final Clock clock;

Expand All @@ -57,8 +57,8 @@ public ApicurioEventContext(
this.staleArtifactID = context.supplyEventId("catalog.apicurio.unretrievable.artifact.subject.version.stale.artifact");
this.unretrievableArtifactId = context.supplyEventId("catalog.apicurio.unretrievable.artifact.id");
this.retrievableArtifactSubjectVersionId = context.supplyEventId(
"catalog.apicurio.retrievable.artifact.subject.version");
this.retrievableArtifactId = context.supplyEventId("catalog.apicurio.retrievable.artifact.id");
"catalog.apicurio.retrieved.artifact.subject.version");
this.retrievedArtifactId = context.supplyEventId("catalog.apicurio.retrieved.artifact.id");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand Down Expand Up @@ -142,7 +142,7 @@ public void onRetrievableArtifactSubjectVersion(
{
ApicurioEventExFW extension = apicurioEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.retrievableArtifactSubjectVersion(e -> e
.retrievedArtifactSubjectVersion(e -> e
.typeId(RETRIEVED_ARTIFACT_SUBJECT_VERSION.value())
.subject(subject)
.version(version)
Expand All @@ -165,14 +165,14 @@ public void onRetrievableArtifactId(
{
ApicurioEventExFW extension = apicurioEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.retrievableArtifactId(e -> e
.retrievedArtifactId(e -> e
.typeId(RETRIEVED_ARTIFACT_ID.value())
.artifactId(artifactId)
)
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(retrievableArtifactId)
.id(retrievedArtifactId)
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.StringFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioEventExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievableArtifactIdExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievableArtifactSubjectVersionExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievedArtifactIdExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioRetrievedArtifactSubjectVersionExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioUnretrievableArtifactIdExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioUnretrievableArtifactSubjectVersionExFW;
import io.aklivity.zilla.runtime.catalog.apicurio.internal.types.event.ApicurioUnretrievableArtifactSubjectVersionStaleArtifactExFW;
Expand All @@ -29,13 +29,6 @@

public final class ApicurioEventFormatter implements EventFormatterSpi
{
private static final String UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION = "UNRETRIEVABLE_ARTIFACT %s %s";
private static final String UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION_STALE_ARTIFACT =
"UNRETRIEVABLE_ARTIFACT %s %s, USING_STALE_ARTIFACT %d";
private static final String UNRETRIEVABLE_ARTIFACT_ID = "UNRETRIEVABLE_ARTIFACT_ID %d";
private static final String RETRIEVED_ARTIFACT_SUBJECT_VERSION = "RETRIEVED_ARTIFACT_SUBJECT_VERSION %s %s";
private static final String RETRIEVED_ARTIFACT_ID = "RETRIEVED_ARTIFACT_ID %d";

private final EventFW eventRO = new EventFW();
private final ApicurioEventExFW schemaRegistryEventExRO = new ApicurioEventExFW();

Expand All @@ -58,33 +51,44 @@ public String format(
case UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION:
{
ApicurioUnretrievableArtifactSubjectVersionExFW ex = extension.unretrievableArtifactSubjectVersion();
result = String.format(UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION, asString(ex.subject()), asString(ex.version()));
result = String.format(
"Unable to fetch artifact for subject %s with version %s.",
asString(ex.subject()),
asString(ex.version())
);
break;
}
case UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION_STALE_ARTIFACT:
{
ApicurioUnretrievableArtifactSubjectVersionStaleArtifactExFW ex = extension
.unretrievableArtifactSubjectVersionStaleArtifact();
result = String.format(UNRETRIEVABLE_ARTIFACT_SUBJECT_VERSION_STALE_ARTIFACT, asString(ex.subject()),
asString(ex.version()), ex.artifactId());
result = String.format(
"Unable to fetch artifact for subject %s with version %s; using stale artifact with id %d.",
asString(ex.subject()),
asString(ex.version()),
ex.artifactId()
);
break;
}
case UNRETRIEVABLE_ARTIFACT_ID:
{
ApicurioUnretrievableArtifactIdExFW ex = extension.unretrievableArtifactId();
result = String.format(UNRETRIEVABLE_ARTIFACT_ID, ex.artifactId());
result = String.format("Unable to fetch artifact id %d.", ex.artifactId());
break;
}
case RETRIEVED_ARTIFACT_SUBJECT_VERSION:
{
ApicurioRetrievableArtifactSubjectVersionExFW ex = extension.retrievableArtifactSubjectVersion();
result = String.format(RETRIEVED_ARTIFACT_SUBJECT_VERSION, asString(ex.subject()), asString(ex.version()));
ApicurioRetrievedArtifactSubjectVersionExFW ex = extension.retrievedArtifactSubjectVersion();
result = String.format("Successfully fetched artifact for subject %s with version %s.",
asString(ex.subject()),
asString(ex.version())
);
break;
}
case RETRIEVED_ARTIFACT_ID:
{
ApicurioRetrievableArtifactIdExFW ex = extension.retrievableArtifactId();
result = String.format(RETRIEVED_ARTIFACT_ID, ex.artifactId());
ApicurioRetrievedArtifactIdExFW ex = extension.retrievedArtifactId();
result = String.format("Successfully fetched artifact id %d.", ex.artifactId());
break;
}
}
Expand Down
Loading