Skip to content

Commit

Permalink
fix: cloud events - convert kafka prefix (#2055)
Browse files Browse the repository at this point in the history
* fix: cloud events - convert kafka prefix

* using a conversion map instead
  • Loading branch information
octonato authored Mar 6, 2024
1 parent 3c38b2c commit e021e2d
Show file tree
Hide file tree
Showing 15 changed files with 69 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ private[testkit] class OutgoingMessagesImpl(

override def expectOneTyped[T](clazz: Class[T], timeout: time.Duration): TestKitMessage[T] = {
val msg = expectMsgInternal(destinationProbe, timeout, Some(clazz))
val metadata = new MetadataImpl(msg.getMessage.getMetadata.entries)
val metadata = MetadataImpl.of(msg.getMessage.getMetadata.entries)
val scalaPb = ScalaPbAny(typeUrlFor(metadata), msg.getMessage.payload)

val decodedMsg = if (typeUrlFor(metadata).startsWith(JsonSupport.KALIX_JSON)) {
Expand All @@ -392,7 +392,7 @@ private[testkit] class OutgoingMessagesImpl(
}

private def anyFromMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[_] = {
val metadata = new MetadataImpl(m.metadata.getOrElse(Metadata.defaultInstance).entries)
val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata.defaultInstance).entries)
val anyMsg = if (typeUrlFor(metadata).startsWith(JsonSupport.KALIX_JSON)) {
m.payload.toStringUtf8
} else {
Expand Down Expand Up @@ -503,7 +503,7 @@ private[testkit] class TopicImpl(

override def expectOneTyped[T](clazz: Class[T], timeout: time.Duration): TestKitMessage[T] = {
val msg = expectMsgInternal(destinationProbe, timeout, Some(clazz))
val metadata = new MetadataImpl(msg.getMessage.getMetadata.entries)
val metadata = MetadataImpl.of(msg.getMessage.getMetadata.entries)
val scalaPb = ScalaPbAny(typeUrlFor(metadata), msg.getMessage.payload)

val decodedMsg = if (typeUrlFor(metadata).startsWith(JsonSupport.KALIX_JSON)) {
Expand All @@ -519,7 +519,7 @@ private[testkit] class TopicImpl(
}

private def anyFromMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[_] = {
val metadata = new MetadataImpl(m.metadata.getOrElse(Metadata.defaultInstance).entries)
val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata.defaultInstance).entries)
val anyMsg = if (typeUrlFor(metadata).startsWith(JsonSupport.KALIX_JSON)) {
m.payload.toStringUtf8
} else {
Expand Down Expand Up @@ -579,7 +579,7 @@ private[testkit] case class TestKitMessageImpl[P](payload: P, metadata: SdkMetad

private[testkit] object TestKitMessageImpl {
def ofProtocolMessage(m: kalix.testkit.protocol.eventing_test_backend.Message): TestKitMessage[ByteString] = {
val metadata = new MetadataImpl(m.metadata.getOrElse(Metadata()).entries)
val metadata = MetadataImpl.of(m.metadata.getOrElse(Metadata()).entries)
TestKitMessageImpl[ByteString](m.payload, metadata).asInstanceOf[TestKitMessage[ByteString]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import java.util.Optional
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._

private[kalix] class MetadataImpl(val entries: Seq[MetadataEntry]) extends Metadata with CloudEvent {
private[kalix] class MetadataImpl private (val entries: Seq[MetadataEntry]) extends Metadata with CloudEvent {

override def has(key: String): Boolean = entries.exists(_.key.equalsIgnoreCase(key))

Expand Down Expand Up @@ -89,28 +89,28 @@ private[kalix] class MetadataImpl(val entries: Seq[MetadataEntry]) extends Metad
override def set(key: String, value: String): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
MetadataImpl.of(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
}

override def setBinary(key: String, value: ByteBuffer): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
MetadataImpl.of(removeKey(key) :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
}

override def add(key: String, value: String): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
MetadataImpl.of(entries :+ MetadataEntry(key, MetadataEntry.Value.StringValue(value)))
}

override def addBinary(key: String, value: ByteBuffer): MetadataImpl = {
Objects.requireNonNull(key, "Key must not be null")
Objects.requireNonNull(value, "Value must not be null")
new MetadataImpl(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
MetadataImpl.of(entries :+ MetadataEntry(key, MetadataEntry.Value.BytesValue(ByteString.copyFrom(value))))
}

override def remove(key: String): MetadataImpl = new MetadataImpl(removeKey(key))
override def remove(key: String): MetadataImpl = MetadataImpl.of(removeKey(key))

override def clear(): MetadataImpl = MetadataImpl.Empty

Expand All @@ -137,7 +137,7 @@ private[kalix] class MetadataImpl(val entries: Seq[MetadataEntry]) extends Metad
} else this

override def asCloudEvent(id: String, source: URI, `type`: String): MetadataImpl =
new MetadataImpl(
MetadataImpl.of(
entries.filterNot(e => MetadataImpl.CeRequired(e.key)) ++
Seq(
MetadataEntry(MetadataImpl.CeSpecversion, MetadataEntry.Value.StringValue(MetadataImpl.CeSpecversionValue)),
Expand Down Expand Up @@ -274,8 +274,19 @@ object MetadataImpl {
val CeSubject = "ce-subject"
val CeTime = "ce-time"
val CeRequired: Set[String] = Set(CeSpecversion, CeId, CeSource, CeType)
private val AllCeAttributes = CeRequired ++ Set(CeDataschema, CeDatacontenttype, CeSubject, CeTime)

val Empty = new MetadataImpl(Vector.empty)
/**
* Maps alternative prefixed keys to our default key format, ie: ce-.
*
* For the moment, only the Kafka prefix is in use, ie: ce_, but others might be needed in future.
*/
private val alternativeKeyFormats = AllCeAttributes.map { attr =>
val key = attr.replaceFirst("^ce-", "ce_")
(key, attr)
}.toMap

val Empty = MetadataImpl.of(Vector.empty)

val JwtClaimPrefix = "_kalix-jwt-claim-"

Expand All @@ -291,4 +302,17 @@ object MetadataImpl {
throw new RuntimeException(s"Unknown metadata implementation: ${other.getClass}, cannot send")
}

def of(entries: Seq[MetadataEntry]): MetadataImpl = {
val transformedEntries =
entries.map { entry =>
// is incoming ce key in one of the alternative formats?
// if so, convert key to our internal default key format
alternativeKeyFormats.get(entry.key) match {
case Some(defaultKey) => MetadataEntry(defaultKey, entry.value)
case _ => entry
}
}

new MetadataImpl(transformedEntries)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private[javasdk] final class ActionsImpl(
.handleStreamedIn(
call.name,
messages.map { message =>
val metadata = new MetadataImpl(message.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata = MetadataImpl.of(message.metadata.map(_.entries.toVector).getOrElse(Nil))
val decodedPayload = service.messageCodec.decodeMessage(
message.payload.getOrElse(throw new IllegalArgumentException("No command payload")))
MessageEnvelope.of(decodedPayload, metadata)
Expand Down Expand Up @@ -333,7 +333,7 @@ private[javasdk] final class ActionsImpl(
.handleStreamed(
call.name,
messages.map { message =>
val metadata = new MetadataImpl(message.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata = MetadataImpl.of(message.metadata.map(_.entries.toVector).getOrElse(Nil))
val decodedPayload = service.messageCodec.decodeMessage(
message.payload.getOrElse(throw new IllegalArgumentException("No command payload")))
MessageEnvelope.of(decodedPayload, metadata)
Expand Down Expand Up @@ -364,7 +364,7 @@ private[javasdk] final class ActionsImpl(
messageCodec: MessageCodec,
spanContext: Option[SpanContext],
serviceName: String): ActionContext = {
val metadata = new MetadataImpl(in.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata = MetadataImpl.of(in.metadata.map(_.entries.toVector).getOrElse(Nil))
val updatedMetadata = spanContext.map(metadataWithTracing(metadata, _)).getOrElse(metadata)
new ActionContextImpl(updatedMetadata, messageCodec, system, serviceName, telemetries)
}
Expand All @@ -381,7 +381,7 @@ private[javasdk] final class ActionsImpl(
log.trace(
"Updated metadata with trace context: [{}]",
l.toList.filter(m => m.key == TRACE_PARENT_KEY || m.key == TRACE_STATE_KEY))
new MetadataImpl(l.toSeq)
MetadataImpl.of(l.toSeq)
}

}
Expand Down Expand Up @@ -411,7 +411,7 @@ class ActionContextImpl(

override def componentCallMetadata: MetadataImpl = {
if (metadata.has(Telemetry.TRACE_PARENT_KEY)) {
new MetadataImpl(
MetadataImpl.of(
List(
MetadataEntry(
Telemetry.TRACE_PARENT_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ final class EventSourcedEntitiesImpl(
val cmd =
service.messageCodec.decodeMessage(
command.payload.getOrElse(throw ProtocolException(command, "No command payload")))
val metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata = MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil))
val context =
new CommandContextImpl(thisEntityId, sequence, command.name, command.id, metadata)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ object ReplicatedEntitiesImpl {

override val commandName: String = command.name

override val metadata: Metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil))
override val metadata: Metadata = MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil))

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private final class TraceInstrumentation(
*/
override def buildSpan(service: Service, command: Command): Option[Span] = {
if (logger.isTraceEnabled) logger.trace("Building span for command [{}].", command)
val metadata = new MetadataImpl(command.metadata.map(_.entries).getOrElse(Nil))
val metadata = MetadataImpl.of(command.metadata.map(_.entries).getOrElse(Nil))
if (metadata.get(TRACE_PARENT_KEY).isPresent) {
if (logger.isTraceEnabled) logger.trace("`traceparent` found")

Expand All @@ -212,7 +212,7 @@ private final class TraceInstrumentation(
override def buildSpan(service: Service, command: ActionCommand): Option[Span] = {
if (logger.isTraceEnabled) logger.trace("Building span for action command [{}].", command)

val metadata = new MetadataImpl(command.metadata.map(_.entries).getOrElse(Nil))
val metadata = MetadataImpl.of(command.metadata.map(_.entries).getOrElse(Nil))
if (metadata.get(TRACE_PARENT_KEY).isPresent) {
if (logger.isTraceEnabled) logger.trace("`traceparent` found")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ final class ValueEntitiesImpl(
throw ProtocolException(command, "No command payload for Value entity")

case InCommand(command) =>
val metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata = MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil))

if (log.isTraceEnabled) log.trace("Metadata entries [{}].", metadata.entries)
val span = instrumentations(service.serviceName).buildSpan(service, command)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService],

val commandName = receiveEvent.commandName
val msg = service.messageCodec.decodeMessage(receiveEvent.payload.get)
val metadata = new MetadataImpl(receiveEvent.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata = MetadataImpl.of(receiveEvent.metadata.map(_.entries.toVector).getOrElse(Nil))
val context = new UpdateContextImpl(service.viewId, commandName, metadata)

val effect =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow
Future.failed(ProtocolException(command, "No command payload for Workflow"))

case InCommand(command) =>
val metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil))
val metadata = MetadataImpl.of(command.metadata.map(_.entries.toVector).getOrElse(Nil))

val context = new CommandContextImpl(workflowId, command.name, command.id, metadata, system)
val timerScheduler = new TimerSchedulerImpl(service.messageCodec, system, context.componentCallMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MetadataImplSpec extends AnyWordSpec with Matchers with OptionValues {
metadata("_kalix-jwt-claim-sub" -> "some-subject").jwtClaims.subject().toScala.value shouldBe "some-subject"
}

"support getting the expiriation JWT claim" in {
"support getting the expiration JWT claim" in {
metadata("_kalix-jwt-claim-exp" -> "12345").jwtClaims.expirationTime().toScala.value shouldBe Instant
.ofEpochSecond(12345)
}
Expand Down Expand Up @@ -159,10 +159,20 @@ class MetadataImplSpec extends AnyWordSpec with Matchers with OptionValues {
val mdRedirect = md.withStatusCode(Redirect.MOVED_PERMANENTLY)
mdRedirect.get("_kalix-http-code").toScala.value shouldBe "301"
}

"support creationg with CloudEvents prefixed with ce_" in {
val md = metadata("ce_id" -> "id", "ce_source" -> "source", "ce_specversion" -> "1.0", "ce_type" -> "foo")
md.isCloudEvent shouldBe true
val ce = md.asCloudEvent()
ce.id() shouldBe "id"
ce.source().toString shouldBe "source"
ce.specversion() shouldBe "1.0"
ce.`type`() shouldBe "foo"
}
}

private def metadata(entries: (String, String)*): Metadata = {
new MetadataImpl(entries.map { case (key, value) =>
MetadataImpl.of(entries.map { case (key, value) =>
MetadataEntry(key, MetadataEntry.Value.StringValue(value))
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ trait Metadata extends Iterable[MetadataEntry] {
}

object Metadata {
val empty: Metadata = new MetadataImpl(JMetadataImpl.Empty)
val empty: Metadata = MetadataImpl(JMetadataImpl.Empty)
}

/** A metadata entry. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[scalasdk] final case class ScalaDeferredCallAdapter[I, O](javaSdkDeferre
extends DeferredCall[I, O] {
override def message: I = javaSdkDeferredCall.message
override def metadata: Metadata =
new MetadataImpl(javaSdkDeferredCall.metadata.asInstanceOf[kalix.javasdk.impl.MetadataImpl])
MetadataImpl(javaSdkDeferredCall.metadata.asInstanceOf[kalix.javasdk.impl.MetadataImpl])

def execute(): Future[O] = javaSdkDeferredCall.execute().asScala

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ trait InternalContext {
* the caller. It's empty by default because only actions and workflows can to call other components. Of the two, only
* actions have traces and can pass them around using `def components`.
*/
def componentCallMetadata: MetadataImpl = new MetadataImpl(javasdk.impl.MetadataImpl.Empty)
def componentCallMetadata: MetadataImpl = MetadataImpl(javasdk.impl.MetadataImpl.Empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
private[kalix] object MetadataImpl {
def apply(impl: kalix.javasdk.impl.MetadataImpl): MetadataImpl = new MetadataImpl(impl)
def apply(entries: Seq[ProtocolMetadataEntry]): MetadataImpl = MetadataImpl(
new kalix.javasdk.impl.MetadataImpl(entries))
kalix.javasdk.impl.MetadataImpl.of(entries))
}

private[kalix] class MetadataImpl(val impl: kalix.javasdk.impl.MetadataImpl) extends Metadata with CloudEvent {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ private[scalasdk] final case class ScalaActionContextAdapter(javaSdkContext: jav
override def componentCallMetadata: MetadataImpl = {
metadata.get(Telemetry.TRACE_PARENT_KEY) match {
case Some(traceparent) =>
new MetadataImpl(
new kalix.javasdk.impl.MetadataImpl(
List(MetadataEntry(Telemetry.TRACE_PARENT_KEY, MetadataEntry.Value.StringValue(traceparent)))))
case None => new MetadataImpl(kalix.javasdk.impl.MetadataImpl.Empty)
MetadataImpl(
kalix.javasdk.impl.MetadataImpl
.of(List(MetadataEntry(Telemetry.TRACE_PARENT_KEY, MetadataEntry.Value.StringValue(traceparent)))))
case None => MetadataImpl(kalix.javasdk.impl.MetadataImpl.Empty)
}
}

Expand Down

0 comments on commit e021e2d

Please sign in to comment.