diff --git a/build.sbt b/build.sbt index dee72f76a2..d279d31d8e 100644 --- a/build.sbt +++ b/build.sbt @@ -83,6 +83,7 @@ lazy val javaSdkProtobuf = project // We need to generate the java files for things like entity_key.proto so that downstream libraries can use them // without needing to generate them themselves Compile / PB.targets += PB.gens.java -> crossTarget.value / "akka-grpc" / "main", + Test / javacOptions ++= Seq("-parameters"), // for Jackson Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client), Test / PB.protoSources ++= (Compile / PB.protoSources).value, Test / PB.targets += PB.gens.java -> crossTarget.value / "akka-grpc" / "test") @@ -126,7 +127,7 @@ lazy val javaSdkProtobufTestKit = project lazy val javaSdkSpring = project .in(file("sdk/java-sdk-spring")) - .dependsOn(javaSdkProtobuf) + .dependsOn(javaSdkProtobuf % "compile->compile;test->test") .dependsOn(devTools % IntegrationTest) .dependsOn(javaSdkProtobufTestKit % IntegrationTest) .enablePlugins(AkkaGrpcPlugin, BuildInfoPlugin, PublishSonatype, IntegrationTests) diff --git a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java index 6f25cbfa28..8665d7433e 100644 --- a/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java +++ b/samples/java-spring-transfer-workflow-compensation/src/main/java/com/example/wallet/WalletEntity.java @@ -31,8 +31,8 @@ public Wallet deposit(int amount) { @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) @JsonSubTypes({ - @JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class), - @JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class)}) + @JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class, name = "withdraw-succeed"), + @JsonSubTypes.Type(value = WithdrawResult.WithdrawFailed.class, name = "withdraw-failed")}) public sealed interface WithdrawResult { record WithdrawFailed(String errorMsg) implements WithdrawResult { } @@ -43,8 +43,8 @@ record WithdrawSucceed() implements WithdrawResult { @JsonTypeInfo(use = JsonTypeInfo.Id.NAME) @JsonSubTypes({ - @JsonSubTypes.Type(value = DepositResult.DepositSucceed.class), - @JsonSubTypes.Type(value = DepositResult.DepositFailed.class)}) + @JsonSubTypes.Type(value = DepositResult.DepositSucceed.class, name = "deposit-succeed"), + @JsonSubTypes.Type(value = DepositResult.DepositFailed.class, name = "deposit-failed")}) public sealed interface DepositResult { record DepositFailed(String errorMsg) implements DepositResult { } diff --git a/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonMigration.java b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonMigration.java new file mode 100644 index 0000000000..81113c5a6a --- /dev/null +++ b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonMigration.java @@ -0,0 +1,68 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk; + +import com.fasterxml.jackson.databind.JsonNode; + +import java.util.List; + +/** + * Allows to specify dedicated strategy for JSON schema evolution. + *

+ * It is used when deserializing data of older version than the + * {@link JsonMigration#currentVersion}. You implement the transformation of the + * JSON structure in the {@link JsonMigration#transform} method. If you have changed the + * class name you should add it to {@link JsonMigration#supportedClassNames}. + */ +public abstract class JsonMigration { + + /** + * Define current version, that is, the value used when serializing new data. The first version, when no + * migration was used, is always 0. + */ + public abstract int currentVersion(); + + /** + * Define the supported forward version this migration can read (must be greater or equal than `currentVersion`). + * If this value is different from {@link JsonMigration#currentVersion} a {@link JsonMigration#transform} will be used to downcast + * the received payload to the current schema. + */ + public int supportedForwardVersion() { + return currentVersion(); + } + + /** + * Implement the transformation of the incoming JSON structure to the current + * JSON structure. The `JsonNode` is mutable so you can add and remove fields, + * or change values. Note that you have to cast to specific sub-classes such + * as `ObjectNode` and `ArrayNode` to get access to mutators. + * + * @param fromVersion the version of the old data + * @param json the incoming JSON data + */ + public JsonNode transform(int fromVersion, JsonNode json) { + return json; + } + + /** + * Override this method if you have changed the class name. Return + * all old class names. + */ + public List supportedClassNames() { + return List.of(); + } +} diff --git a/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonSupport.java b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonSupport.java index cb5edd4d15..9ed9fe4738 100644 --- a/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonSupport.java +++ b/sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonSupport.java @@ -17,25 +17,27 @@ package kalix.javasdk; import akka.Done; -import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import kalix.javasdk.impl.ByteStringEncoding; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.protobuf.*; import java.io.IOException; import java.util.Collection; @@ -130,11 +132,27 @@ public static Any encodeJson(T value, String jsonType) { * the JSON string as bytes as value and a type URL starting with "json.kalix.io/". * * @param valueClass The type of class to deserialize the object to, the class must have the - * proper Jackson annotations for deserialization. + * proper Jackson annotations for deserialization. + * @param any The protobuf Any object to deserialize. * @return The decoded object * @throws IllegalArgumentException if the given value cannot be decoded to a T */ public static T decodeJson(Class valueClass, Any any) { + return decodeJson(valueClass, any, Optional.empty()); + } + + /** + * Decode the given protobuf Any object to an instance of T using Jackson. The object must have + * the JSON string as bytes as value and a type URL starting with "json.kalix.io/". + * + * @param valueClass The type of class to deserialize the object to, the class must have the + * proper Jackson annotations for deserialization. + * @param any The protobuf Any object to deserialize. + * @param jacksonMigration The optional @{@link JsonMigration} implementation used for deserialization. + * @return The decoded object + * @throws IllegalArgumentException if the given value cannot be decoded to a T + */ + public static T decodeJson(Class valueClass, Any any, Optional jacksonMigration) { if (!any.getTypeUrl().startsWith(KALIX_JSON)) { throw new IllegalArgumentException( "Protobuf bytes with type url [" @@ -145,7 +163,24 @@ public static T decodeJson(Class valueClass, Any any) { } else { try { ByteString decodedBytes = ByteStringEncoding.decodePrimitiveBytes(any.getValue()); - return objectMapper.readValue(decodedBytes.toByteArray(), valueClass); + if (jacksonMigration.isPresent()) { + int fromVersion = parseVersion(any.getTypeUrl()); + JsonMigration migration = jacksonMigration.get(); + int currentVersion = migration.currentVersion(); + int supportedForwardVersion = migration.supportedForwardVersion(); + if (fromVersion < currentVersion) { + return migrate(valueClass, decodedBytes, fromVersion, migration); + } else if (fromVersion == currentVersion) { + return objectMapper.readValue(decodedBytes.toByteArray(), valueClass); + } else if (fromVersion <= supportedForwardVersion) { + return migrate(valueClass, decodedBytes, fromVersion, migration); + } else { + throw new IllegalStateException("Migration version " + supportedForwardVersion + " is " + + "behind version " + fromVersion + " of deserialized type [" + valueClass.getName() + "]"); + } + } else { + return objectMapper.readValue(decodedBytes.toByteArray(), valueClass); + } } catch (IOException e) { throw new IllegalArgumentException( "JSON with type url [" @@ -158,14 +193,30 @@ public static T decodeJson(Class valueClass, Any any) { } } + private static T migrate(Class valueClass, ByteString decodedBytes, int fromVersion, JsonMigration jsonMigration) throws IOException { + JsonNode jsonNode = objectMapper.readTree(decodedBytes.toByteArray()); + JsonNode newJsonNode = jsonMigration.transform(fromVersion, jsonNode); + return objectMapper.treeToValue(newJsonNode, valueClass); + } + + private static int parseVersion(String typeUrl) { + int versionSeparatorIndex = typeUrl.lastIndexOf("#"); + if (versionSeparatorIndex > 0) { + String maybeVersion = typeUrl.substring(versionSeparatorIndex + 1); + return Integer.parseInt(maybeVersion); + } else { + return 0; + } + } + public static > C decodeJsonCollection(Class valueClass, Class collectionType, Any any) { if (!any.getTypeUrl().startsWith(KALIX_JSON)) { throw new IllegalArgumentException( - "Protobuf bytes with type url [" - + any.getTypeUrl() - + "] cannot be decoded as JSON, must start with [" - + KALIX_JSON - + "]"); + "Protobuf bytes with type url [" + + any.getTypeUrl() + + "] cannot be decoded as JSON, must start with [" + + KALIX_JSON + + "]"); } else { try { ByteString decodedBytes = ByteStringEncoding.decodePrimitiveBytes(any.getValue()); @@ -212,9 +263,9 @@ public void serialize(Done value, JsonGenerator gen, SerializerProvider serializ class DoneDeserializer extends JsonDeserializer { @Override - public Done deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException { - if (p.currentToken() == JsonToken.START_OBJECT && p.nextToken() == JsonToken.END_OBJECT){ - return Done.getInstance(); + public Done deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + if (p.currentToken() == JsonToken.START_OBJECT && p.nextToken() == JsonToken.END_OBJECT) { + return Done.getInstance(); } else { throw JsonMappingException.from(ctxt, "Cannot deserialize Done class, expecting empty object '{}'"); } diff --git a/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass.java b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass.java new file mode 100644 index 0000000000..fcc4ee3b8e --- /dev/null +++ b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; +import java.util.Optional; + + +public class DummyClass { + public String stringValue; + public int intValue; + public Optional optionalStringValue; + + @JsonCreator + public DummyClass(String stringValue, int intValue, Optional optionalStringValue) { + this.stringValue = stringValue; + this.intValue = intValue; + this.optionalStringValue = optionalStringValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DummyClass that = (DummyClass) o; + return intValue == that.intValue && Objects.equals(stringValue, that.stringValue) && Objects.equals(optionalStringValue, that.optionalStringValue); + } + + @Override + public int hashCode() { + return Objects.hash(stringValue, intValue, optionalStringValue); + } +} diff --git a/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass2.java b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass2.java new file mode 100644 index 0000000000..3b58011dc0 --- /dev/null +++ b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass2.java @@ -0,0 +1,57 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class DummyClass2 { + public String stringValue; + public int intValue; + public String mandatoryStringValue; + + @JsonCreator + public DummyClass2(String stringValue, int intValue, String mandatoryStringValue) { + this.stringValue = stringValue; + this.intValue = intValue; + this.mandatoryStringValue = mandatoryStringValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DummyClass2 that = (DummyClass2) o; + return intValue == that.intValue && Objects.equals(stringValue, that.stringValue) && Objects.equals(mandatoryStringValue, that.mandatoryStringValue); + } + + @Override + public int hashCode() { + return Objects.hash(stringValue, intValue, mandatoryStringValue); + } + + @Override + public String toString() { + return "DummyClass2{" + + "stringValue='" + stringValue + '\'' + + ", intValue=" + intValue + + ", mandatoryStringValue='" + mandatoryStringValue + '\'' + + '}'; + } +} diff --git a/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass2Migration.java b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass2Migration.java new file mode 100644 index 0000000000..aa96f5f1ab --- /dev/null +++ b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass2Migration.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; + +public class DummyClass2Migration extends JsonMigration { + @Override + public int currentVersion() { + return 1; + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + if (fromVersion < 1) { + return ((ObjectNode) json).set("mandatoryStringValue", TextNode.valueOf("mandatory-value")); + } else { + return json; + } + } +} diff --git a/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClassMigration.java b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClassMigration.java new file mode 100644 index 0000000000..ca8d9f1e91 --- /dev/null +++ b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClassMigration.java @@ -0,0 +1,44 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class DummyClassMigration extends JsonMigration { + @Override + public int currentVersion() { + return 0; + } + + @Override + public int supportedForwardVersion() { + return 1; + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + if (fromVersion == 1) { + ObjectNode objectNode = ((ObjectNode) json); + objectNode.set("optionalStringValue", objectNode.get("mandatoryStringValue")); + objectNode.remove("mandatoryStringValue"); + return objectNode; + } else { + return json; + } + } +} diff --git a/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClassRenamed.java b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClassRenamed.java new file mode 100644 index 0000000000..4bc7cfa904 --- /dev/null +++ b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClassRenamed.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; +import java.util.Optional; + +public class DummyClassRenamed { + public String stringValue; + public int intValue; + public Optional optionalStringValue; + + @JsonCreator + public DummyClassRenamed(String stringValue, int intValue, Optional optionalStringValue) { + this.stringValue = stringValue; + this.intValue = intValue; + this.optionalStringValue = optionalStringValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DummyClassRenamed that = (DummyClassRenamed) o; + return intValue == that.intValue && Objects.equals(stringValue, that.stringValue) && Objects.equals(optionalStringValue, that.optionalStringValue); + } + + @Override + public int hashCode() { + return Objects.hash(stringValue, intValue, optionalStringValue); + } +} diff --git a/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/workflow/DummyClassRenamedMigration.java b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/workflow/DummyClassRenamedMigration.java new file mode 100644 index 0000000000..9621defcd5 --- /dev/null +++ b/sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/workflow/DummyClassRenamedMigration.java @@ -0,0 +1,40 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.workflow; + +import com.fasterxml.jackson.databind.JsonNode; +import kalix.javasdk.DummyClass2; +import kalix.javasdk.JsonMigration; + +import java.util.List; + +public class DummyClassRenamedMigration extends JsonMigration { + @Override + public int currentVersion() { + return 1; + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + return json; + } + + @Override + public List supportedClassNames() { + return List.of(DummyClass2.class.getName()); + } +} diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/JsonSupportSpec.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/JsonSupportSpec.scala index 2b2dffa356..2d11278d29 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/JsonSupportSpec.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/javasdk/JsonSupportSpec.scala @@ -17,10 +17,14 @@ package kalix.javasdk import java.util +import java.util.Optional import scala.beans.BeanProperty import akka.Done +import com.google.protobuf.Any +import com.google.protobuf.UnsafeByteOperations +import kalix.javasdk.impl.ByteStringEncoding import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -41,6 +45,69 @@ class JsonSupportSpec extends AnyWordSpec with Matchers { JsonSupport.decodeJson(classOf[MyJsonable], any).field should ===("foo") } + "serialize and deserialize DummyClass" in { + val dummyClass = new DummyClass("123", 321, Optional.of("test")) + val any = JsonSupport.encodeJson(dummyClass) + any.getTypeUrl should ===(JsonSupport.KALIX_JSON + classOf[DummyClass].getName) + val decoded = JsonSupport.decodeJson(classOf[DummyClass], any, Optional.of(new DummyClassMigration)) + decoded shouldBe dummyClass + } + + "deserialize missing field as optional none" in { + val bytes = UnsafeByteOperations.unsafeWrap("""{"stringValue":"123","intValue":321}""".getBytes) + val encodedBytes = ByteStringEncoding.encodePrimitiveBytes(bytes) + val any = + Any.newBuilder.setTypeUrl(JsonSupport.KALIX_JSON + classOf[DummyClass].getName).setValue(encodedBytes).build + + val decoded = JsonSupport.decodeJson(classOf[DummyClass], any, Optional.of(new DummyClassMigration)) + decoded shouldBe new DummyClass("123", 321, Optional.empty()) + } + + "deserialize null field as optional none" in { + val bytes = + UnsafeByteOperations.unsafeWrap("""{"stringValue":"123","intValue":321,"optionalStringValue":null}""".getBytes) + val encodedBytes = ByteStringEncoding.encodePrimitiveBytes(bytes) + val any = + Any.newBuilder.setTypeUrl(JsonSupport.KALIX_JSON + classOf[DummyClass].getName).setValue(encodedBytes).build + + val decoded = JsonSupport.decodeJson(classOf[DummyClass], any, Optional.of(new DummyClassMigration)) + decoded shouldBe new DummyClass("123", 321, Optional.empty()) + } + + "deserialize mandatory field with migration" in { + val bytes = UnsafeByteOperations.unsafeWrap("""{"stringValue":"123","intValue":321}""".getBytes) + val encodedBytes = ByteStringEncoding.encodePrimitiveBytes(bytes) + val any = + Any.newBuilder.setTypeUrl(JsonSupport.KALIX_JSON + classOf[DummyClass2].getName).setValue(encodedBytes).build + + val decoded = JsonSupport.decodeJson(classOf[DummyClass2], any, Optional.of(new DummyClass2Migration)) + decoded shouldBe new DummyClass2("123", 321, "mandatory-value") + } + + "deserialize renamed class" in { + val bytes = UnsafeByteOperations.unsafeWrap("""{"stringValue":"123","intValue":321}""".getBytes) + val encodedBytes = ByteStringEncoding.encodePrimitiveBytes(bytes) + val any = + Any.newBuilder.setTypeUrl(JsonSupport.KALIX_JSON + classOf[DummyClass].getName).setValue(encodedBytes).build + + val decoded = JsonSupport.decodeJson(classOf[DummyClassRenamed], any) + decoded shouldBe new DummyClassRenamed("123", 321, Optional.empty()) + } + + "deserialize forward from DummyClass2 to DummyClass" in { + val bytes = UnsafeByteOperations.unsafeWrap( + """{"stringValue":"123","intValue":321,"mandatoryStringValue":"value"}""".getBytes) + val encodedBytes = ByteStringEncoding.encodePrimitiveBytes(bytes) + val any = + Any.newBuilder + .setTypeUrl(JsonSupport.KALIX_JSON + classOf[DummyClass2].getName + "#1") + .setValue(encodedBytes) + .build + + val decoded = JsonSupport.decodeJson(classOf[DummyClass], any, Optional.of(new DummyClassMigration)) + decoded shouldBe new DummyClass("123", 321, Optional.of("value")) + } + "serialize and deserialize Akka Done class" in { val done = Done.getInstance() val any = JsonSupport.encodeJson(done) diff --git a/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/workflow/WorkflowMessages.scala b/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/workflow/WorkflowMessages.scala index d79e4905c6..fa2b4027d8 100644 --- a/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/workflow/WorkflowMessages.scala +++ b/sdk/java-sdk-protobuf/src/test/scala/kalix/testkit/workflow/WorkflowMessages.scala @@ -24,9 +24,9 @@ import kalix.protocol.entity.Command import kalix.protocol.workflow_entity.WorkflowStreamIn.{ Message => InMessage } import kalix.protocol.workflow_entity.WorkflowStreamOut.{ Message => OutMessage } import kalix.protocol.workflow_entity._ +import kalix.protocol.workflow_entity.{ NoTransition => ProtoNoTransition } import kalix.testkit.entity.EntityMessages import scalapb.{ GeneratedMessage => ScalaPbMessage } -import kalix.protocol.workflow_entity.{ NoTransition => ProtoNoTransition } object WorkflowMessages extends EntityMessages { @@ -68,6 +68,14 @@ object WorkflowMessages extends EntityMessages { InMessage.Step(executeStep) } + def executeStep(id: Long, stepName: String): InMessage = { + InMessage.Step(ExecuteStep(id, stepName, None, None)) + } + + def executeStep(id: Long, stepName: String, state: ScalaPbAny): InMessage = { + InMessage.Step(ExecuteStep(id, stepName, None, Some(state))) + } + def getNextStep(id: Long, stepName: String, input: JavaPbMessage): InMessage = { val nextStep = GetNextStep.defaultInstance @@ -77,6 +85,15 @@ object WorkflowMessages extends EntityMessages { InMessage.Transition(nextStep) } + def getNextStep(id: Long, stepName: String, input: ScalaPbAny): InMessage = { + val nextStep = + GetNextStep.defaultInstance + .withCommandId(id) + .withStepName(stepName) + .withResult(input) + InMessage.Transition(nextStep) + } + def actionFailure(id: Long, description: String, statusCode: Status.Code): OutMessage = { val failure = component.Failure(id, description, statusCode.value()) val failureClientAction = WorkflowClientAction.defaultInstance.withFailure(failure) @@ -88,6 +105,43 @@ object WorkflowMessages extends EntityMessages { WorkflowStreamOut.Message.Effect(failureEffect) } + def workflowActionReply(payload: Option[ScalaPbAny]): Option[WorkflowClientAction] = { + Some(WorkflowClientAction(WorkflowClientAction.Action.Reply(component.Reply(payload, None)))) + } + + def stepTransition(stepName: String) = + WorkflowEffect.Transition.StepTransition(StepTransition(stepName)) + + def reply(id: Long, payload: ScalaPbAny): OutMessage = + replyAction(id, workflowActionReply(Some(payload)), None, WorkflowEffect.Transition.NoTransition(NoTransition())) + + def reply(id: Long, payload: ScalaPbAny, transition: WorkflowEffect.Transition): OutMessage = + replyAction(id, workflowActionReply(Some(payload)), None, transition) + + def reply(id: Long, payload: ScalaPbAny, state: ScalaPbAny, transition: WorkflowEffect.Transition): OutMessage = + replyAction(id, workflowActionReply(Some(payload)), Some(state), transition) + + def replyAction( + id: Long, + action: Option[WorkflowClientAction], + state: Option[ScalaPbAny], + transition: WorkflowEffect.Transition): OutMessage = { + OutMessage.Effect(WorkflowEffect(id, action, state, transition)) + } + + def stepExecuted(id: Long, stepName: String, result: ScalaPbAny): OutMessage = { + OutMessage.Response(StepResponse(id, stepName, StepResponse.Response.Executed(StepExecuted(Some(result))))) + } + + def end(id: Long, state: ScalaPbAny): OutMessage = { + OutMessage.Effect( + WorkflowEffect( + id, + workflowActionReply(None), + Some(state), + WorkflowEffect.Transition.EndTransition(EndTransition()))) + } + def config(): OutMessage = WorkflowStreamOut.Message.Config(WorkflowConfig(defaultStepConfig = Some(StepConfig("", None, None)))) } diff --git a/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java b/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java index 58d6c7eec5..f1e13fafc1 100644 --- a/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java +++ b/sdk/java-sdk-spring/src/it/java/com/example/wiring/SpringSdkIntegrationTest.java @@ -242,25 +242,37 @@ public void verifyEchoActionRequestParamWithTypedForward() { public void verifyStreamActions() { List messageList = - webClient - .get() - .uri("/echo/repeat/abc/times/3") - .retrieve() - .bodyToFlux(Message.class) - .toStream() - .collect(Collectors.toList()); + webClient + .get() + .uri("/echo/repeat/abc/times/3") + .retrieve() + .bodyToFlux(Message.class) + .toStream() + .collect(Collectors.toList()); assertThat(messageList).hasSize(3); } + @Test + public void verifyCounterEventSourceSubscription2() throws InterruptedException { + + String entityId = "hello1"; + execute(componentClient.forEventSourcedEntity(entityId) + .call(CounterEntity::increase) + .params(777)); + + Thread.sleep(60000); + + } + @Test public void verifyCounterEventSourceSubscription() { // GIVEN IncreaseAction is subscribed to CounterEntity events // WHEN the CounterEntity is requested to increase 42\ String entityId = "hello1"; execute(componentClient.forEventSourcedEntity(entityId) - .call(CounterEntity::increase) - .params(42)); + .call(CounterEntity::increase) + .params(42)); // THEN IncreaseAction receives the event 42 and increases the counter 1 more await() diff --git a/sdk/java-sdk-spring/src/main/java/kalix/javasdk/annotations/Migration.java b/sdk/java-sdk-spring/src/main/java/kalix/javasdk/annotations/Migration.java new file mode 100644 index 0000000000..4bbd4c468c --- /dev/null +++ b/sdk/java-sdk-spring/src/main/java/kalix/javasdk/annotations/Migration.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.annotations; + +import kalix.javasdk.JsonMigration; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to assign a @{@link JsonMigration} implementation for a given class. + * Can be combined with @{@link TypeName} annotation. + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface Migration { + + Class value(); +} diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/CommandHandler.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/CommandHandler.scala index 245d0895fa..b3078432fe 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/CommandHandler.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/CommandHandler.scala @@ -43,7 +43,8 @@ case class CommandHandler( val lastParam = javaMethod.method.getParameterTypes.last if (lastParam.getAnnotation(classOf[JsonSubTypes]) != null) { lastParam.getAnnotation(classOf[JsonSubTypes]).value().exists { subType => - inputTypeUrl == messageCodec.typeUrlFor(subType.value()) + inputTypeUrl == messageCodec + .typeUrlFor(subType.value()) //TODO requires more changes to be used with JsonMigration } } else false } @@ -51,12 +52,13 @@ case class CommandHandler( def lookupInvoker(inputTypeUrl: String): Option[MethodInvoker] = methodInvokers - .get(inputTypeUrl) + .get(messageCodec.removeVersion(inputTypeUrl)) .orElse(lookupMethodAcceptingSubType(inputTypeUrl)) def getInvoker(inputTypeUrl: String): MethodInvoker = lookupInvoker(inputTypeUrl).getOrElse { - throw new NoSuchElementException(s"Couldn't find any entry for typeUrl [${inputTypeUrl}] in [${methodInvokers}].") + throw new NoSuchElementException( + s"Couldn't find any entry for typeUrl [$inputTypeUrl] in [${methodInvokers.view.mapValues(_.method.getName)}].") } } diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptor.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptor.scala index 94a0cf5f11..528468786f 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptor.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptor.scala @@ -59,6 +59,7 @@ import kalix.javasdk.impl.reflection.RestServiceIntrospector.UnhandledParameter import kalix.javasdk.impl.reflection.ServiceMethod import kalix.javasdk.impl.reflection.SubscriptionServiceMethod import kalix.javasdk.impl.reflection.SyntheticRequestServiceMethod +import kalix.javasdk.impl.reflection.VirtualServiceMethod // TODO: abstract away spring dependency import org.springframework.web.bind.annotation.RequestMethod @@ -259,28 +260,22 @@ private[kalix] object ComponentDescriptor { case method: SubscriptionServiceMethod => val methodInvokers = - serviceMethod.javaMethodOpt.map { meth => + serviceMethod.javaMethodOpt + .map { meth => - val parameterExtractors: ParameterExtractorsArray = - Array(ParameterExtractors.AnyBodyExtractor(method.inputType)) + val parameterExtractors: ParameterExtractorsArray = + Array(ParameterExtractors.AnyBodyExtractor(method.inputType)) - val typeUrl = messageCodec.typeUrlFor(method.inputType) - (typeUrl, MethodInvoker(meth, parameterExtractors)) - }.toMap + val typeUrls = messageCodec.typeUrlsFor(method.inputType) + typeUrls.map(_ -> MethodInvoker(meth, parameterExtractors)).toMap + } + .getOrElse(Map.empty) CommandHandler(grpcMethodName, messageCodec, JavaPbAny.getDescriptor, methodInvokers) - case method: AnyJsonRequestServiceMethod => - val methodInvokers = serviceMethod.javaMethodOpt.map { meth => - - val parameterExtractors: ParameterExtractorsArray = - Array(ParameterExtractors.AnyBodyExtractor(method.inputType)) - - val typeUrl = messageCodec.typeUrlFor(method.inputType) - (typeUrl, MethodInvoker(meth, parameterExtractors)) - }.toMap - - CommandHandler(grpcMethodName, messageCodec, JavaPbAny.getDescriptor, methodInvokers) + case _: VirtualServiceMethod => + //java method is empty + CommandHandler(grpcMethodName, messageCodec, JavaPbAny.getDescriptor, Map.empty) case _: DeleteServiceMethod => val methodInvokers = serviceMethod.javaMethodOpt.map { meth => diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptorFactory.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptorFactory.scala index 08f91624c5..410b4a88f7 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptorFactory.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptorFactory.scala @@ -472,13 +472,12 @@ private[impl] object ComponentDescriptorFactory { groupedSubscriptions.collect { case (source, kMethods) if kMethods.size > 1 => val methodsMap = - kMethods.map { k => + kMethods.flatMap { k => val methodParameterTypes = k.serviceMethod.javaMethodOpt.get.getParameterTypes // it is safe to pick the last parameter. An action has one and View has two. In the View always the last is the event val eventParameter = methodParameterTypes.last - val typeUrl = messageCodec.typeUrlFor(eventParameter) - (typeUrl, k.serviceMethod.javaMethodOpt.get) + messageCodec.typeUrlsFor(eventParameter).map(typeUrl => (typeUrl, k.serviceMethod.javaMethodOpt.get)) }.toMap KalixMethod( diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/JsonMessageCodec.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/JsonMessageCodec.scala index 75e13ae7fc..91b997a3e2 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/JsonMessageCodec.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/JsonMessageCodec.scala @@ -16,24 +16,29 @@ package kalix.javasdk.impl -import com.google.protobuf.ByteString -import com.google.protobuf.BytesValue - import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap + +import scala.jdk.CollectionConverters._ + +import com.google.protobuf.ByteString +import com.google.protobuf.BytesValue import com.google.protobuf.any.{ Any => ScalaPbAny } import com.google.protobuf.{ Any => JavaPbAny } import kalix.javasdk.JsonSupport +import kalix.javasdk.annotations.Migration import kalix.javasdk.annotations.TypeName import kalix.javasdk.impl.AnySupport.BytesPrimitive +import kalix.javasdk.impl.reflection.MigrationExtractor.extractMigration import org.slf4j.LoggerFactory private[kalix] class JsonMessageCodec extends MessageCodec { private val log = LoggerFactory.getLogger(getClass) + private[kalix] case class TypeHint(currenTypeHintWithVersion: String, allTypeHints: List[String]) - private val cache: ConcurrentMap[Class[_], String] = new ConcurrentHashMap() - private[kalix] val reversedCache: ConcurrentMap[String, Class[_]] = new ConcurrentHashMap() + private val typeHints: ConcurrentMap[Class[_], TypeHint] = new ConcurrentHashMap() + private[kalix] val reversedTypeHints: ConcurrentMap[String, Class[_]] = new ConcurrentHashMap() /** * In the Java SDK, output data are encoded to Json. @@ -44,7 +49,7 @@ private[kalix] class JsonMessageCodec extends MessageCodec { case javaPbAny: JavaPbAny => ScalaPbAny.fromJavaProto(javaPbAny) case scalaPbAny: ScalaPbAny => scalaPbAny case bytes: Array[Byte] => ScalaPbAny.fromJavaProto(JavaPbAny.pack(BytesValue.of(ByteString.copyFrom(bytes)))) - case other => ScalaPbAny.fromJavaProto(JsonSupport.encodeJson(other, lookupTypeHint(other))) + case other => ScalaPbAny.fromJavaProto(JsonSupport.encodeJson(other, lookupTypeHintWithVersion(other))) } } @@ -53,20 +58,34 @@ private[kalix] class JsonMessageCodec extends MessageCodec { value match { case javaPbAny: JavaPbAny => javaPbAny case scalaPbAny: ScalaPbAny => ScalaPbAny.toJavaProto(scalaPbAny) - case other => JsonSupport.encodeJson(other, lookupTypeHint(other)) + case other => JsonSupport.encodeJson(other, lookupTypeHintWithVersion(other)) } } - private def lookupTypeHint(value: Any): String = - lookupTypeHint(value.getClass) + private def lookupTypeHintWithVersion(value: Any): String = + lookupTypeHint(value.getClass).currenTypeHintWithVersion + + private[kalix] def lookupTypeHint(clz: Class[_]): TypeHint = { + typeHints.computeIfAbsent(clz, computeTypeHint) + } - private[kalix] def lookupTypeHint(clz: Class[_]): String = { + private def computeTypeHint(clz: Class[_]): TypeHint = { val typeName = Option(clz.getAnnotation(classOf[TypeName])) .collect { case ann if ann.value().trim.nonEmpty => ann.value() } .getOrElse(clz.getName) - cache.computeIfAbsent(clz, _ => typeName) + + val (version, supportedClassNames) = getVersionAndSupportedClassNames(clz) + val typeNameWithVersion = typeName + (if (version == 0) "" else "#" + version) + //TODO verify if this could be replaced by sth smarter/safer - reversedCache.compute( + addToReversedCache(clz, typeName) + supportedClassNames.foreach(className => addToReversedCache(clz, className)) + + TypeHint(typeNameWithVersion, typeName :: supportedClassNames) + } + + private def addToReversedCache(clz: Class[_], typeName: String) = { + reversedTypeHints.compute( typeName, (_, currentValue) => { if (currentValue == null) { @@ -78,21 +97,40 @@ private[kalix] class JsonMessageCodec extends MessageCodec { "Collision with existing existing mapping " + currentValue + " -> " + typeName + ". The same type name can't be used for other class " + clz) } }) + } - typeName + private def getVersionAndSupportedClassNames(clz: Class[_]): (Int, List[String]) = { + Option(clz.getAnnotation(classOf[Migration])) + .map(_.value()) + .map(migrationClass => migrationClass.getConstructor().newInstance()) + .map(migration => + (migration.currentVersion(), migration.supportedClassNames().asScala.toList)) //TODO what about TypeName + .getOrElse((0, List.empty)) } def typeUrlFor(clz: Class[_]) = { if (clz == classOf[Array[Byte]]) { BytesPrimitive.fullName } else { - JsonSupport.KALIX_JSON + lookupTypeHint(clz) + JsonSupport.KALIX_JSON + lookupTypeHint(clz).currenTypeHintWithVersion + } + } + + def typeUrlsFor(clz: Class[_]): List[String] = { + if (clz == classOf[Array[Byte]]) { + List(BytesPrimitive.fullName) + } else { + lookupTypeHint(clz).allTypeHints.map(JsonSupport.KALIX_JSON + _) } } override def decodeMessage(value: ScalaPbAny): Any = { value } + + private[kalix] def removeVersion(typeName: String) = { + typeName.split("#").head + } } /** @@ -105,12 +143,12 @@ private[kalix] class StrictJsonMessageCodec(delegate: JsonMessageCodec) extends override def decodeMessage(value: ScalaPbAny): Any = if (value.typeUrl.startsWith(JsonSupport.KALIX_JSON)) { val any = ScalaPbAny.toJavaProto(value) - val typeName = value.typeUrl.replace(JsonSupport.KALIX_JSON, "") - val typeClass = delegate.reversedCache.get(typeName) + val typeName = delegate.removeVersion(value.typeUrl.replace(JsonSupport.KALIX_JSON, "")) + val typeClass = delegate.reversedTypeHints.get(typeName) if (typeClass == null) { throw new IllegalStateException(s"Cannot decode ${value.typeUrl} message type. Class mapping not found.") } else { - JsonSupport.decodeJson(typeClass, any) + JsonSupport.decodeJson(typeClass, any, extractMigration(typeClass)) } } else { value diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedHandlersExtractor.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedHandlersExtractor.scala index 667ec3fbcc..5d9574d2e9 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedHandlersExtractor.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedHandlersExtractor.scala @@ -78,10 +78,10 @@ object EventSourcedHandlersExtractor { } EventSourceEntityHandlers( - handlers = validHandlers.map { case (classType, methods) => - messageCodec.typeUrlFor(classType) -> MethodInvoker( - methods.head, - ParameterExtractors.AnyBodyExtractor[AnyRef](classType)) + handlers = validHandlers.flatMap { case (classType, methods) => + val invoker = MethodInvoker(methods.head, ParameterExtractors.AnyBodyExtractor[AnyRef](classType)) + //in case of schema evolution more types can point to the same invoker + messageCodec.typeUrlsFor(classType).map(typeUrl => typeUrl -> invoker) }, errors = errorsForSignatures ++ errorsForDuplicates.toList ++ missingEventHandler) } diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala index 42b7810156..0a0c7e5e09 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/eventsourcedentity/ReflectiveEventSourcedEntityRouter.scala @@ -26,8 +26,9 @@ import kalix.javasdk.eventsourcedentity.CommandContext import kalix.javasdk.eventsourcedentity.EventSourcedEntity import kalix.javasdk.impl.CommandHandler import kalix.javasdk.impl.InvocationContext -import kalix.javasdk.impl.MethodInvoker import kalix.javasdk.impl.JsonMessageCodec +import kalix.javasdk.impl.MethodInvoker +import kalix.javasdk.impl.reflection.MigrationExtractor.extractMigration class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedEntity[S, E]]( override protected val entity: ES, @@ -41,10 +42,11 @@ class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedEntity[S, E]]( commandName, throw new HandlerNotFoundException("command", commandName, commandHandlers.keySet)) - private def eventHandlerLookup(eventName: String) = + private def eventHandlerLookup(eventName: String) = { eventHandlerMethods.getOrElse( - eventName, - throw new HandlerNotFoundException("event", eventName, commandHandlers.keySet)) + messageCodec.removeVersion(eventName), + throw new HandlerNotFoundException("event", eventName, eventHandlerMethods.keySet)) + } override def handleEvent(state: S, event: E): S = { @@ -107,7 +109,10 @@ class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedEntity[S, E]]( entity._internalSetCurrentState(s) case s => val deserializedState = - JsonSupport.decodeJson(entityStateType, ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny])) + JsonSupport.decodeJson( + entityStateType, + ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny]), + extractMigration(entityStateType)) entity._internalSetCurrentState(deserializedState) } } diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/reflection/MigrationExtractor.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/reflection/MigrationExtractor.scala new file mode 100644 index 0000000000..fdc3bf1ba3 --- /dev/null +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/reflection/MigrationExtractor.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.impl.reflection + +import java.util.Optional +import kalix.javasdk.JsonMigration +import kalix.javasdk.annotations.Migration + +object MigrationExtractor { + + def extractMigration(clazz: Class[_]): Optional[JsonMigration] = { + if (clazz.getAnnotation(classOf[Migration]) != null) { + val migration = clazz + .getAnnotation(classOf[Migration]) + .value() + .getConstructor() + .newInstance() + Optional.of(migration) + } else { + Optional.empty() + } + } +} diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/reflection/ParameterExtractor.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/reflection/ParameterExtractor.scala index 75b83657ff..cf9719066e 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/reflection/ParameterExtractor.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/reflection/ParameterExtractor.scala @@ -28,6 +28,7 @@ import kalix.javasdk.JsonSupport import kalix.javasdk.Metadata import kalix.javasdk.impl.AnySupport import kalix.javasdk.impl.ErrorHandling.BadRequestException +import kalix.javasdk.impl.reflection.MigrationExtractor.extractMigration /** * Extracts method parameters from an invocation context for the purpose of passing them to a reflective invocation call @@ -63,7 +64,7 @@ object ParameterExtractors { val bytes = dm.getField(JavaPbAny.getDescriptor.findFieldByName("value")).asInstanceOf[ByteString] AnySupport.decodePrimitiveBytes(bytes).toByteArray.asInstanceOf[T] } else { - JsonSupport.decodeJson(cls, toAny(dm)) + JsonSupport.decodeJson(cls, toAny(dm), extractMigration(cls)) } } diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/valueentity/ReflectiveValueEntityRouter.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/valueentity/ReflectiveValueEntityRouter.scala index 64dc6dae25..4df640086e 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/valueentity/ReflectiveValueEntityRouter.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/valueentity/ReflectiveValueEntityRouter.scala @@ -22,6 +22,7 @@ import com.google.protobuf.any.{ Any => ScalaPbAny } import kalix.javasdk.JsonSupport import kalix.javasdk.impl.CommandHandler import kalix.javasdk.impl.InvocationContext +import kalix.javasdk.impl.reflection.MigrationExtractor.extractMigration import kalix.javasdk.valueentity.CommandContext import kalix.javasdk.valueentity.ValueEntity @@ -73,7 +74,10 @@ class ReflectiveValueEntityRouter[S, E <: ValueEntity[S]]( entity._internalSetCurrentState(s) case s => val deserializedState = - JsonSupport.decodeJson(entityStateType, ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny])) + JsonSupport.decodeJson( + entityStateType, + ScalaPbAny.toJavaProto(s.asInstanceOf[ScalaPbAny]), + extractMigration(entityStateType)) entity._internalSetCurrentState(deserializedState) } } diff --git a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala index 9fe9e99e91..b8bdb4d1ba 100644 --- a/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala +++ b/sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/workflow/ReflectiveWorkflowRouter.scala @@ -19,7 +19,6 @@ package kalix.javasdk.impl.workflow import com.google.protobuf.any.{ Any => ScalaPbAny } import kalix.javasdk.impl.CommandHandler import kalix.javasdk.impl.InvocationContext -import kalix.javasdk.impl.workflow.WorkflowRouter import kalix.javasdk.workflow.CommandContext import kalix.javasdk.workflow.Workflow diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/action/TestESSubscriptionAction.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/action/TestESSubscriptionAction.java new file mode 100644 index 0000000000..57f382369b --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/action/TestESSubscriptionAction.java @@ -0,0 +1,37 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.action; + +import kalix.javasdk.annotations.Subscribe; +import kalix.javasdk.eventsourcedentity.TestESEvent; +import kalix.javasdk.eventsourcedentity.TestEventSourcedEntity; + +@Subscribe.EventSourcedEntity(value = TestEventSourcedEntity.class, ignoreUnknown = true) +public class TestESSubscriptionAction extends Action { + + public Effect handleEvent2(TestESEvent.Event2 event) { + return effects().reply(event.newName()); + } + + public Effect handleEvent3(TestESEvent.Event3 event) { + return effects().reply(event.b()); + } + + public Effect handleEvent4(TestESEvent.Event4 event) { + return effects().reply(event.anotherString()); + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event1Migration.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event1Migration.java new file mode 100644 index 0000000000..184fc7b56b --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event1Migration.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.eventsourcedentity; + +import kalix.javasdk.JsonMigration; + +import java.util.List; + +public class Event1Migration extends JsonMigration { + @Override + public int currentVersion() { + return 1; + } + + @Override + public List supportedClassNames() { + return List.of(OldTestESEvent.OldEvent1.class.getName()); + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event2Migration.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event2Migration.java new file mode 100644 index 0000000000..a858c6d0ec --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event2Migration.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.eventsourcedentity; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import kalix.javasdk.JsonMigration; + +import java.util.List; + +public class Event2Migration extends JsonMigration { + @Override + public int currentVersion() { + return 1; + } + + @Override + public List supportedClassNames() { + return List.of(OldTestESEvent.OldEvent2.class.getName()); + } + + @Override + public JsonNode transform(int fromVersion, JsonNode jsonNode) { + if (fromVersion < 1) { + ObjectNode objectNode = (ObjectNode) jsonNode; + objectNode.set("newName", IntNode.valueOf(321)); + objectNode.remove("i"); + return objectNode; + } else { + return jsonNode; + } + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event4Migration.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event4Migration.java new file mode 100644 index 0000000000..1dec560366 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/Event4Migration.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.eventsourcedentity; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import kalix.javasdk.JsonMigration; + +import java.util.List; + +public class Event4Migration extends JsonMigration { + @Override + public int currentVersion() { + return 2; + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + if (fromVersion < 2) { + TextNode s = (TextNode) json.get("anotherString"); + return ((ObjectNode) json).set("anotherString", TextNode.valueOf(s.textValue() + "-v2")); + } else { + return json; + } + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/OldTestESEvent.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/OldTestESEvent.java new file mode 100644 index 0000000000..6fc50b3a28 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/OldTestESEvent.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.eventsourcedentity; + +import kalix.javasdk.annotations.TypeName; + +public interface OldTestESEvent { + + record OldEvent1(String s) implements OldTestESEvent { + } + + record OldEvent2(int i) implements OldTestESEvent { + } + + @TypeName("old-event-3") + record OldEvent3(boolean b) implements OldTestESEvent { + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestESEvent.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestESEvent.java new file mode 100644 index 0000000000..f32a0742bf --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestESEvent.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.eventsourcedentity; + +import kalix.javasdk.annotations.Migration; +import kalix.javasdk.annotations.TypeName; + +public interface TestESEvent { + + @Migration(Event1Migration.class) + record Event1(String s) implements TestESEvent { + } + + @Migration(Event2Migration.class) + record Event2(int newName) implements TestESEvent { + } + + @TypeName("old-event-3") + record Event3(boolean b) implements OldTestESEvent { + } + + @Migration(Event4Migration.class) + record Event4(String anotherString) implements OldTestESEvent { + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestESState.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestESState.java new file mode 100644 index 0000000000..b9c84591f9 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestESState.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.eventsourcedentity; + +public record TestESState(String s, int i, boolean b, String anotherString) { +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestEventSourcedEntity.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestEventSourcedEntity.java new file mode 100644 index 0000000000..a8c0836627 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/eventsourcedentity/TestEventSourcedEntity.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.eventsourcedentity; + +import kalix.javasdk.annotations.EventHandler; +import kalix.javasdk.annotations.Id; +import kalix.javasdk.annotations.TypeId; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; + +@Id("id") +@TypeId("es") +@RequestMapping("/es") +public class TestEventSourcedEntity extends EventSourcedEntity { + + @Override + public TestESState emptyState() { + return new TestESState("", 0, false, ""); + } + + @GetMapping + public Effect get() { + return effects().reply(currentState()); + } + + @EventHandler + public TestESState apply(TestESEvent.Event1 event1) { + return new TestESState(event1.s(), currentState().i(), currentState().b(), currentState().anotherString()); + } + + @EventHandler + public TestESState apply(TestESEvent.Event2 event2) { + return new TestESState(currentState().s(), event2.newName(), currentState().b(), currentState().anotherString()); + } + + @EventHandler + public TestESState apply(TestESEvent.Event3 event3) { + return new TestESState(currentState().s(), currentState().i(), event3.b(), currentState().anotherString()); + } + + @EventHandler + public TestESState apply(TestESEvent.Event4 event4) { + return new TestESState(currentState().s(), currentState().i(), currentState().b(), event4.anotherString()); + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState0.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState0.java new file mode 100644 index 0000000000..23f7f8c90b --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState0.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.valueentity; + +public record TestVEState0(String s, int i) { +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState1.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState1.java new file mode 100644 index 0000000000..fe04816be0 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState1.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.valueentity; + +public record TestVEState1(String s, int i) { +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState2.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState2.java new file mode 100644 index 0000000000..c0c8eacb27 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState2.java @@ -0,0 +1,23 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.valueentity; + +import kalix.javasdk.annotations.Migration; + +@Migration(TestVEState2Migration.class) +public record TestVEState2(String s, int i, String newValue) { +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState2Migration.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState2Migration.java new file mode 100644 index 0000000000..ebfc117a2d --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestVEState2Migration.java @@ -0,0 +1,42 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.valueentity; + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import kalix.javasdk.JsonMigration; + + +public class TestVEState2Migration extends JsonMigration { + + @Override + public int currentVersion() { + return 1; + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + if (fromVersion < 1) { + return ((ObjectNode) json).set("newValue", TextNode.valueOf("newValue")); + } else { + return null; + + } + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntity.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntity.java new file mode 100644 index 0000000000..ae99346544 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntity.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.valueentity; + +import kalix.javasdk.annotations.Id; +import kalix.javasdk.annotations.TypeId; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; + +@Id("id") +@TypeId("ve") +@RequestMapping("/ve") +public class TestValueEntity extends ValueEntity { + + @Override + public TestVEState1 emptyState() { + return new TestVEState1("empty", 1); + } + + @GetMapping + public Effect get() { + return effects().reply(currentState()); + } + +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntityMigration.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntityMigration.java new file mode 100644 index 0000000000..36a15de420 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/valueentity/TestValueEntityMigration.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.valueentity; + +import kalix.javasdk.annotations.Id; +import kalix.javasdk.annotations.TypeId; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; + +@Id("id") +@TypeId("ve") +@RequestMapping("/ve") +public class TestValueEntityMigration extends ValueEntity { + + @GetMapping + public Effect get() { + return effects().reply(currentState()); + } + +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/javasdk/workflow/TestWorkflowSerialization.java b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/workflow/TestWorkflowSerialization.java new file mode 100644 index 0000000000..b6af7550dd --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/javasdk/workflow/TestWorkflowSerialization.java @@ -0,0 +1,61 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.workflow; + +import kalix.javasdk.annotations.Id; +import kalix.javasdk.annotations.TypeId; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; + +import java.util.concurrent.CompletableFuture; + +@Id("id") +@TypeId("workflow") +@RequestMapping("/workflow") +public class TestWorkflowSerialization extends Workflow { + + public interface Result { + record Failed(String errorMsg) implements Result { + } + + record Succeed() implements Result { + } + } + + + @Override + public WorkflowDef definition() { + var testStep = step("test") + .asyncCall(() -> CompletableFuture.completedFuture(new Result.Succeed())) + .andThen(Result.class, result -> effects().updateState("success").end()); + + return workflow().addStep(testStep); + } + + @GetMapping + public Effect start() { + return effects() + .updateState("empty") + .transitionTo("test") + .thenReply("ok"); + } + + @GetMapping + public Effect get() { + return effects().reply(currentState()); + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EmployeeCreatedMigration.java b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EmployeeCreatedMigration.java new file mode 100644 index 0000000000..06b504ac23 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EmployeeCreatedMigration.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.spring.testmodels.eventsourcedentity; + +import kalix.javasdk.JsonMigration; + +import java.util.List; + +public class EmployeeCreatedMigration extends JsonMigration { + @Override + public int currentVersion() { + return 1; + } + + @Override + public List supportedClassNames() { + return List.of("old-created"); + } +} diff --git a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EmployeeEvent.java b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EmployeeEvent.java index 9c69da2998..ab3547bc36 100644 --- a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EmployeeEvent.java +++ b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EmployeeEvent.java @@ -16,11 +16,13 @@ package kalix.spring.testmodels.eventsourcedentity; +import kalix.javasdk.annotations.Migration; import kalix.javasdk.annotations.TypeName; public interface EmployeeEvent { @TypeName("created") + @Migration(EmployeeCreatedMigration.class) final class EmployeeCreated implements EmployeeEvent { public final String firstName; diff --git a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EventSourcedEntitiesTestModels.java b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EventSourcedEntitiesTestModels.java index 01d183bf4a..fe5a7062e4 100644 --- a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EventSourcedEntitiesTestModels.java +++ b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/eventsourcedentity/EventSourcedEntitiesTestModels.java @@ -16,9 +16,23 @@ package kalix.spring.testmodels.eventsourcedentity; -import kalix.javasdk.annotations.*; +import kalix.javasdk.JsonMigration; +import kalix.javasdk.annotations.Migration; +import kalix.javasdk.annotations.Acl; +import kalix.javasdk.annotations.EventHandler; +import kalix.javasdk.annotations.GenerateId; +import kalix.javasdk.annotations.Id; +import kalix.javasdk.annotations.JWT; +import kalix.javasdk.annotations.TypeId; import kalix.javasdk.eventsourcedentity.EventSourcedEntity; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; + +import java.util.List; public class EventSourcedEntitiesTestModels { @@ -46,6 +60,26 @@ public Employee onEvent(EmployeeEvent event) { @RequestMapping("/eventsourced/{id}") public static class CounterEventSourcedEntity extends EventSourcedEntity { + @Migration(EventMigration.class) + public record Event(String s) { + } + + public static class EventMigration extends JsonMigration { + + public EventMigration() { + } + + @Override + public int currentVersion() { + return 1; + } + + @Override + public List supportedClassNames() { + return List.of("additional-mapping"); + } + } + @GetMapping("/int/{number}") public Integer getInteger(@PathVariable Integer number) { return number; @@ -57,7 +91,7 @@ public Integer changeInteger(@PathVariable Integer number) { } @EventHandler - public Integer receiveStringEvent(String event) { + public Integer receiveStringEvent(Event event) { return 0; } diff --git a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/subscriptions/PubSubTestModels.java b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/subscriptions/PubSubTestModels.java index b5f2d1b4d3..d283aafe7b 100644 --- a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/subscriptions/PubSubTestModels.java +++ b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/subscriptions/PubSubTestModels.java @@ -19,25 +19,25 @@ import kalix.javasdk.action.Action; import kalix.javasdk.annotations.Acl; import kalix.javasdk.annotations.Publish; +import kalix.javasdk.annotations.Query; import kalix.javasdk.annotations.Subscribe; +import kalix.javasdk.annotations.Table; +import kalix.javasdk.view.View; import kalix.spring.testmodels.Done; import kalix.spring.testmodels.Message; import kalix.spring.testmodels.Message2; +import kalix.spring.testmodels.eventsourcedentity.Employee; import kalix.spring.testmodels.eventsourcedentity.EmployeeEvent.EmployeeCreated; +import kalix.spring.testmodels.eventsourcedentity.EmployeeEvent.EmployeeEmailUpdated; +import kalix.spring.testmodels.eventsourcedentity.EventSourcedEntitiesTestModels.CounterEventSourcedEntity; +import kalix.spring.testmodels.eventsourcedentity.EventSourcedEntitiesTestModels.EmployeeEntity; import kalix.spring.testmodels.valueentity.AssignedCounter; import kalix.spring.testmodels.valueentity.Counter; import kalix.spring.testmodels.valueentity.CounterState; -import kalix.javasdk.view.View; -import kalix.javasdk.annotations.Query; -import kalix.javasdk.annotations.Table; -import kalix.spring.testmodels.eventsourcedentity.Employee; -import kalix.spring.testmodels.eventsourcedentity.EmployeeEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; -import kalix.spring.testmodels.eventsourcedentity.EventSourcedEntitiesTestModels.CounterEventSourcedEntity; -import kalix.spring.testmodels.eventsourcedentity.EventSourcedEntitiesTestModels.EmployeeEntity; public class PubSubTestModels {//TODO shall we remove this class and move things to ActionTestModels and ViewTestModels @@ -96,6 +96,18 @@ public Action.Effect onDelete() { } } + @Subscribe.EventSourcedEntity(EmployeeEntity.class) + public static class SubscribeToEventSourcedEmployee extends Action { + + public Effect methodOne(EmployeeCreated message) { + return effects().reply(message); + } + + public Effect methodTwo(EmployeeEmailUpdated message) { + return effects().reply(message); + } + } + public static class SubscribeToEventSourcedEntityAction extends Action { @Subscribe.EventSourcedEntity(CounterEventSourcedEntity.class) @@ -722,7 +734,7 @@ public UpdateEffect onCreate(EmployeeCreated evt) { .updateState(new Employee(evt.firstName, evt.lastName, evt.email)); } - public UpdateEffect onEmailUpdate(EmployeeEvent.EmployeeEmailUpdated eeu) { + public UpdateEffect onEmailUpdate(EmployeeEmailUpdated eeu) { var employee = viewState(); return effects().updateState(new Employee(employee.firstName, employee.lastName, eeu.email)); } @@ -743,7 +755,7 @@ public Effect transform(EmployeeCreated created) { return effects().reply(created.toString()); } - public Effect transform(EmployeeEvent.EmployeeEmailUpdated emailUpdated) { + public Effect transform(EmployeeEmailUpdated emailUpdated) { return effects().reply(emailUpdated.toString()); } @@ -756,7 +768,7 @@ public Effect transform(EmployeeCreated created) { return effects().reply(created.toString()); } - public Effect transform(EmployeeEvent.EmployeeEmailUpdated emailUpdated) { + public Effect transform(EmployeeEmailUpdated emailUpdated) { return effects().reply(emailUpdated.toString()); } } @@ -770,7 +782,7 @@ public UpdateEffect onCreate(EmployeeCreated evt) { .updateState(new Employee(evt.firstName, evt.lastName, evt.email)); } - public UpdateEffect onEmailUpdate(EmployeeEvent.EmployeeEmailUpdated eeu) { + public UpdateEffect onEmailUpdate(EmployeeEmailUpdated eeu) { var employee = viewState(); return effects().updateState(new Employee(employee.firstName, employee.lastName, eeu.email)); } diff --git a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/valueentity/CounterState.java b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/valueentity/CounterState.java index f558e211cb..104d10d075 100644 --- a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/valueentity/CounterState.java +++ b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/valueentity/CounterState.java @@ -16,6 +16,9 @@ package kalix.spring.testmodels.valueentity; +import kalix.javasdk.annotations.Migration; + +@Migration(CounterStateMigration.class) public class CounterState { public final String id; diff --git a/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/valueentity/CounterStateMigration.java b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/valueentity/CounterStateMigration.java new file mode 100644 index 0000000000..de67aadbd9 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/java/kalix/spring/testmodels/valueentity/CounterStateMigration.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.spring.testmodels.valueentity; + +import kalix.javasdk.JsonMigration; + +import java.util.List; + +public class CounterStateMigration extends JsonMigration { + @Override + public int currentVersion() { + return 1; + } + + @Override + public List supportedClassNames() { + return List.of("counter-state"); + } +} diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionDescriptorFactorySpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionDescriptorFactorySpec.scala index ecfc8bfea2..928188ba94 100644 --- a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionDescriptorFactorySpec.scala +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionDescriptorFactorySpec.scala @@ -84,6 +84,7 @@ import kalix.spring.testmodels.subscriptions.PubSubTestModels.RestWithPublishToT import kalix.spring.testmodels.subscriptions.PubSubTestModels.StreamSubscriptionWithPublishToTopicAction import kalix.spring.testmodels.subscriptions.PubSubTestModels.SubscribeOnlyOneToEventSourcedEntityActionTypeLevel import kalix.spring.testmodels.subscriptions.PubSubTestModels.SubscribeToBytesFromTopicAction +import kalix.spring.testmodels.subscriptions.PubSubTestModels.SubscribeToEventSourcedEmployee import kalix.spring.testmodels.subscriptions.PubSubTestModels.SubscribeToEventSourcedEntityAction import kalix.spring.testmodels.subscriptions.PubSubTestModels.SubscribeToTopicAction import kalix.spring.testmodels.subscriptions.PubSubTestModels.SubscribeToTopicCombinedAction @@ -98,6 +99,7 @@ import kalix.spring.testmodels.subscriptions.PubSubTestModels.TypeLevelESWithPub import kalix.spring.testmodels.subscriptions.PubSubTestModels.TypeLevelSubscribeToValueEntityWithRestAction import kalix.spring.testmodels.subscriptions.PubSubTestModels.TypeLevelTopicSubscriptionWithPublishToTopicAction import kalix.spring.testmodels.subscriptions.PubSubTestModels.VEWithPublishToTopicAction +import kalix.spring.testmodels.valueentity.CounterState import org.scalatest.wordspec.AnyWordSpec class ActionDescriptorFactorySpec extends AnyWordSpec with ComponentDescriptorSuite { @@ -319,6 +321,25 @@ class ActionDescriptorFactorySpec extends AnyWordSpec with ComponentDescriptorSu } } + "generate mapping with Event Sourced Subscription annotations" in { + assertDescriptor[SubscribeToEventSourcedEmployee] { desc => + + val onUpdateMethodDescriptor = findMethodByName(desc, "KalixSyntheticMethodOnESEmployee") + onUpdateMethodDescriptor.isServerStreaming shouldBe false + onUpdateMethodDescriptor.isClientStreaming shouldBe false + + val onUpdateMethod = desc.commandHandlers("KalixSyntheticMethodOnESEmployee") + onUpdateMethod.requestMessageDescriptor.getFullName shouldBe JavaPbAny.getDescriptor.getFullName + + val eventing = findKalixServiceOptions(desc).getEventing.getIn + eventing.getEventSourcedEntity shouldBe "employee" + + // in case of @Migration, it should map 2 type urls to the same method + onUpdateMethod.methodInvokers.view.mapValues(_.method.getName).toMap should + contain only ("json.kalix.io/created" -> "methodOne", "json.kalix.io/old-created" -> "methodOne", "json.kalix.io/emailUpdated" -> "methodTwo") + } + } + "generate combined mapping with Event Sourced Entity Subscription annotation" in { assertDescriptor[SubscribeToEventSourcedEntityAction] { desc => val methodDescriptor = findMethodByName(desc, "KalixSyntheticMethodOnESCounterentity") @@ -355,9 +376,14 @@ class ActionDescriptorFactorySpec extends AnyWordSpec with ComponentDescriptorSu val eventing = findKalixMethodOptions(onUpdateMethodDescriptor).getEventing.getIn eventing.getValueEntity shouldBe "ve-counter" - // should have a default extractor for any payload - val javaMethod = onUpdateMethod.methodInvokers.values.head - javaMethod.parameterExtractors.length shouldBe 1 + // in case of @Migration, it should map 2 type urls to the same method + onUpdateMethod.methodInvokers should have size 2 + onUpdateMethod.methodInvokers.values.map { javaMethod => + javaMethod.parameterExtractors.length shouldBe 1 + } + onUpdateMethod.methodInvokers.view.mapValues(_.method.getName).toMap should + contain only ("json.kalix.io/counter-state" -> "onUpdate", "json.kalix.io/" + classOf[ + CounterState].getName -> "onUpdate") } } @@ -374,9 +400,14 @@ class ActionDescriptorFactorySpec extends AnyWordSpec with ComponentDescriptorSu val eventing = findKalixServiceOptions(desc).getEventing.getIn eventing.getValueEntity shouldBe "ve-counter" - // should have a default extractor for any payload - val javaMethod = onUpdateMethod.methodInvokers.values.head - javaMethod.parameterExtractors.length shouldBe 1 + // in case of @Migration, it should map 2 type urls to the same method + onUpdateMethod.methodInvokers should have size 2 + onUpdateMethod.methodInvokers.values.map { javaMethod => + javaMethod.parameterExtractors.length shouldBe 1 + } + onUpdateMethod.methodInvokers.view.mapValues(_.method.getName).toMap should + contain only ("json.kalix.io/counter-state" -> "onUpdate", "json.kalix.io/" + classOf[ + CounterState].getName -> "onUpdate") } } diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionsImplSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionsImplSpec.scala new file mode 100644 index 0000000000..c37b121ec0 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ActionsImplSpec.scala @@ -0,0 +1,107 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.impl + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.scaladsl.adapter._ +import com.google.protobuf.any.Any.toJavaProto +import com.google.protobuf.any.{ Any => ScalaPbAny } +import kalix.javasdk.JsonSupport +import kalix.javasdk.JsonSupport.decodeJson +import kalix.javasdk.action.ActionCreationContext +import kalix.javasdk.action.ReflectiveActionProvider +import kalix.javasdk.action.TestESSubscriptionAction +import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent1 +import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent2 +import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent3 +import kalix.javasdk.eventsourcedentity.TestESEvent.Event4 +import kalix.javasdk.impl.action.ActionService +import kalix.javasdk.impl.action.ActionsImpl +import kalix.protocol.action.ActionCommand +import kalix.protocol.action.ActionResponse +import kalix.protocol.action.Actions +import kalix.protocol.component.Reply +import org.scalatest.BeforeAndAfterAll +import org.scalatest.Inside +import org.scalatest.OptionValues +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +class ActionsImplSpec + extends ScalaTestWithActorTestKit + with AnyWordSpecLike + with Matchers + with BeforeAndAfterAll + with Inside + with OptionValues + with ScalaFutures { + + private val classicSystem = system.toClassic + + def create(provider: ReflectiveActionProvider[_], messageCodec: MessageCodec): Actions = { + val actionFactory: ActionFactory = ctx => provider.newRouter(ctx) + val service = new ActionService(actionFactory, provider.serviceDescriptor(), Array(), messageCodec, None) + + val services = Map(provider.serviceDescriptor().getFullName -> service) + + new ActionsImpl(classicSystem, services, new AbstractContext(classicSystem) {}) + } + + "The action service" should { + "check event migration for subscription" in { + val jsonMessageCodec = new JsonMessageCodec() + val actionProvider = ReflectiveActionProvider.of( + classOf[TestESSubscriptionAction], + jsonMessageCodec, + (_: ActionCreationContext) => new TestESSubscriptionAction) + + val service = create(actionProvider, jsonMessageCodec) + val serviceName = actionProvider.serviceDescriptor().getFullName + + val event1 = jsonMessageCodec.encodeScala(new OldEvent1("state")) + val reply1 = service.handleUnary(toActionCommand(serviceName, event1)).futureValue + //ignore event1 + reply1.response shouldBe ActionResponse.Response.Empty + + val event2 = new JsonMessageCodec().encodeScala(new OldEvent2(123)) + val reply2 = service.handleUnary(toActionCommand(serviceName, event2)).futureValue + inside(reply2.response) { case ActionResponse.Response.Reply(Reply(Some(payload), _, _)) => + decodeJson(classOf[Integer], toJavaProto(payload)) shouldBe 321 //migration reverts numbers + } + + val event3 = new JsonMessageCodec().encodeScala(new OldEvent3(true)) + val reply3 = service.handleUnary(toActionCommand(serviceName, event3)).futureValue + inside(reply3.response) { case ActionResponse.Response.Reply(Reply(Some(payload), _, _)) => + decodeJson(classOf[Boolean], toJavaProto(payload)) shouldBe true + } + + val event4OldVersionNumber = JsonSupport.encodeJson(new Event4("value"), classOf[Event4].getName + "#1") + val event4 = + new JsonMessageCodec().encodeScala(event4OldVersionNumber) + val reply4 = service.handleUnary(toActionCommand(serviceName, event4)).futureValue + inside(reply4.response) { case ActionResponse.Response.Reply(Reply(Some(payload), _, _)) => + decodeJson(classOf[String], toJavaProto(payload)) shouldBe "value-v2" //-v2 from migration + } + } + } + + private def toActionCommand(serviceName: String, event1: ScalaPbAny) = { + ActionCommand(serviceName, "KalixSyntheticMethodOnESEs", Some(event1)) + } + +} diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/DescriptorPrinter.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/DescriptorPrinter.scala index 638b736892..b197de3c17 100644 --- a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/DescriptorPrinter.scala +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/DescriptorPrinter.scala @@ -17,9 +17,11 @@ package kalix.javasdk.impl import kalix.spring.testmodels.subscriptions.PubSubTestModels.MissingTopicForTopicSubscription - import scala.reflect.ClassTag +import kalix.javasdk.eventsourcedentity.TestEventSourcedEntity +import kalix.javasdk.valueentity.TestValueEntity + /** * Utility class to quickly print descriptors */ @@ -29,7 +31,7 @@ object DescriptorPrinter { ComponentDescriptor.descriptorFor(ev.runtimeClass, new JsonMessageCodec) def main(args: Array[String]) = { - val descriptor = descriptorFor[MissingTopicForTopicSubscription] + val descriptor = descriptorFor[TestEventSourcedEntity] println(ProtoDescriptorRenderer.toString(descriptor.fileDescriptor)) } } diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/EvenSourcedEntitiesImplSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/EvenSourcedEntitiesImplSpec.scala new file mode 100644 index 0000000000..875caf0771 --- /dev/null +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/EvenSourcedEntitiesImplSpec.scala @@ -0,0 +1,73 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.impl + +import com.google.protobuf.ByteString._ +import com.google.protobuf.any.{ Any => ScalaPbAny } +import kalix.javasdk.JsonSupport +import kalix.javasdk.eventsourced.ReflectiveEventSourcedEntityProvider +import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent1 +import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent2 +import kalix.javasdk.eventsourcedentity.OldTestESEvent.OldEvent3 +import kalix.javasdk.eventsourcedentity.TestESEvent +import kalix.javasdk.eventsourcedentity.TestESEvent.Event4 +import kalix.javasdk.eventsourcedentity.TestESState +import kalix.javasdk.eventsourcedentity.TestEventSourcedEntity +import kalix.javasdk.impl.eventsourcedentity.TestEventSourcedService +import kalix.testkit.TestProtocol +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class EvenSourcedEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { + + import kalix.testkit.eventsourcedentity.EventSourcedMessages._ + + "EventSourcedEntityImpl" should { + + "recover es state based on old events version" in { + val entityId = "1" + val jsonMessageCodec = new JsonMessageCodec() + val service = new TestEventSourcedService( + ReflectiveEventSourcedEntityProvider + .of[TestESState, TestESEvent, TestEventSourcedEntity]( + classOf[TestEventSourcedEntity], + new JsonMessageCodec(), + _ => new TestEventSourcedEntity())) + val protocol = TestProtocol(service.port) + val entity = protocol.eventSourced.connect() + + entity.send(init(classOf[TestEventSourcedEntity].getName, entityId)) + entity.send(event(1, jsonMessageCodec.encodeJava(new OldEvent1("state")))) + entity.send(event(2, jsonMessageCodec.encodeJava(new OldEvent2(123)))) + entity.send(event(3, jsonMessageCodec.encodeJava(new OldEvent3(true)))) + entity.send( + event(4, JsonSupport.encodeJson(new Event4("value"), classOf[Event4].getName + "#1")) + ) //current version is 2 + + entity.send(command(1, entityId, "Get", emptySyntheticRequest("Get"))) + //321 because of Event2Migration + entity.expect(reply(1, jsonMessageCodec.encodeJava(new TestESState("state", 321, true, "value-v2")))) + protocol.terminate() + service.terminate() + } + } + + private def emptySyntheticRequest(methodName: String) = { + ScalaPbAny(s"type.googleapis.com/kalix.javasdk.eventsourcedentity.${methodName}KalixSyntheticRequest", EMPTY) + } +} diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/JsonMessageCodecSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/JsonMessageCodecSpec.scala index 6c2d27baf0..7a2c3800a3 100644 --- a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/JsonMessageCodecSpec.scala +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/JsonMessageCodecSpec.scala @@ -16,14 +16,24 @@ package kalix.javasdk.impl +import java.util +import java.util.Optional + import com.fasterxml.jackson.annotation.JsonCreator +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.IntNode +import com.fasterxml.jackson.databind.node.ObjectNode import com.google.protobuf.any.{ Any => ScalaPbAny } import com.google.protobuf.{ Any => JavaPbAny } +import kalix.javasdk.JsonMigration import kalix.javasdk.JsonSupport +import kalix.javasdk.annotations.Migration import kalix.javasdk.annotations.TypeName import kalix.javasdk.impl.JsonMessageCodecSpec.Cat import kalix.javasdk.impl.JsonMessageCodecSpec.Dog import kalix.javasdk.impl.JsonMessageCodecSpec.SimpleClass +import kalix.javasdk.impl.JsonMessageCodecSpec.SimpleClassUpdated +import kalix.javasdk.impl.JsonMessageCodecSpec.SimpleClassUpdatedMigration import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -40,6 +50,25 @@ object JsonMessageCodecSpec { @JsonCreator case class SimpleClass(str: String, in: Int) + class SimpleClassUpdatedMigration extends JsonMigration { + override def currentVersion(): Int = 1 + override def transform(fromVersion: Int, jsonNode: JsonNode): JsonNode = { + if (fromVersion == 0) { + jsonNode.asInstanceOf[ObjectNode].set("newField", IntNode.valueOf(1)) + } else { + jsonNode + } + } + + override def supportedClassNames(): util.List[String] = { + util.List.of(classOf[SimpleClass].getName) + } + } + + @JsonCreator + @Migration(classOf[SimpleClassUpdatedMigration]) + case class SimpleClassUpdated(str: String, in: Int, newField: Int) + object AnnotatedWithTypeName { sealed trait Animal @@ -49,6 +78,9 @@ object JsonMessageCodecSpec { @TypeName("elephant") final case class Elephant(name: String, age: Int) extends Animal + + @TypeName("elephant") + final case class IndianElephant(name: String, age: Int) extends Animal } object AnnotatedWithEmptyTypeName { @@ -76,6 +108,19 @@ class JsonMessageCodecSpec extends AnyWordSpec with Matchers { encoded.getTypeUrl shouldBe jsonTypeUrlWith("kalix.javasdk.impl.JsonMessageCodecSpec$SimpleClass") } + "add version number to typeUrl" in { + //new codec to avoid collision with SimpleClass + val encoded = new JsonMessageCodec().encodeJava(SimpleClassUpdated("abc", 10, 123)) + encoded.getTypeUrl shouldBe jsonTypeUrlWith("kalix.javasdk.impl.JsonMessageCodecSpec$SimpleClassUpdated#1") + } + + "decode with new schema version" in { + val encoded = messageCodec.encodeJava(SimpleClass("abc", 10)) + val decoded = + JsonSupport.decodeJson(classOf[SimpleClassUpdated], encoded, Optional.of(new SimpleClassUpdatedMigration)) + decoded shouldBe SimpleClassUpdated("abc", 10, 1) + } + "not re-encode (wrap) to JavaPbAny" in { val encoded: JavaPbAny = messageCodec.encodeJava(SimpleClass("abc", 10)) val reEncoded = messageCodec.encodeJava(encoded) @@ -126,10 +171,35 @@ class JsonMessageCodecSpec extends AnyWordSpec with Matchers { decoded shouldBe value } + "decode message with new version" in { + //old schema + val value = SimpleClass("abc", 10) + val encoded = new JsonMessageCodec().encodeScala(value) + + //new schema, simulating restart + val messageCodecAfterRestart = new JsonMessageCodec() + messageCodecAfterRestart.typeUrlFor(classOf[SimpleClassUpdated]) + val decoded = new StrictJsonMessageCodec(messageCodecAfterRestart).decodeMessage(encoded) + + decoded shouldBe SimpleClassUpdated(value.str, value.in, 1) + } + { import JsonMessageCodecSpec.AnnotatedWithTypeName.Elephant + import JsonMessageCodecSpec.AnnotatedWithTypeName.IndianElephant import JsonMessageCodecSpec.AnnotatedWithTypeName.Lion + "fail when using the same TypeName" in { + val encodedElephant = messageCodec.encodeJava(Elephant("Dumbo", 1)) + encodedElephant.getTypeUrl shouldBe jsonTypeUrlWith("elephant") + + val exception = intercept[IllegalStateException] { + messageCodec.encodeJava(IndianElephant("Dumbo", 1)) + } + + exception.getMessage shouldBe "Collision with existing existing mapping class kalix.javasdk.impl.JsonMessageCodecSpec$AnnotatedWithTypeName$Elephant -> elephant. The same type name can't be used for other class class kalix.javasdk.impl.JsonMessageCodecSpec$AnnotatedWithTypeName$IndianElephant" + } + "use TypeName if available (java)" in { val encodedLion = messageCodec.encodeJava(Lion("Simba")) diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ValueEntitiesImplSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ValueEntitiesImplSpec.scala new file mode 100644 index 0000000000..01a406a8be --- /dev/null +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/ValueEntitiesImplSpec.scala @@ -0,0 +1,92 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.impl + +import com.google.protobuf.ByteString._ +import com.google.protobuf.any.{ Any => ScalaPbAny } +import kalix.javasdk.impl.valueentity.TestValueService +import kalix.javasdk.valueentity.ReflectiveValueEntityProvider +import kalix.javasdk.valueentity.TestVEState0 +import kalix.javasdk.valueentity.TestVEState1 +import kalix.javasdk.valueentity.TestVEState2 +import kalix.javasdk.valueentity.TestValueEntity +import kalix.javasdk.valueentity.TestValueEntityMigration +import kalix.testkit.TestProtocol +import kalix.testkit.valueentity.ValueEntityMessages +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class ValueEntitiesImplSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll { + import ValueEntityMessages._ + + "EntityImpl" should { + + "recover entity when state name has been changed" in { + val entityId = "1" + val jsonMessageCodec = new JsonMessageCodec() + val service = new TestValueService( + ReflectiveValueEntityProvider + .of[TestVEState1, TestValueEntity](classOf[TestValueEntity], jsonMessageCodec, _ => new TestValueEntity())) + val protocol = TestProtocol(service.port) + val entity = protocol.valueEntity.connect() + //old state + entity.send( + init( + classOf[TestValueEntity].getName, + entityId, + jsonMessageCodec.encodeJava(new TestVEState0("old-state", 12)))) + + entity.send(command(1, entityId, "Get", emptySyntheticRequest("Get"))) + //new state + entity.expect(reply(1, jsonMessageCodec.encodeJava(new TestVEState1("old-state", 12)))) + protocol.terminate() + service.terminate() + } + + "recover entity when state change has non-backward compatible change" in { + val entityId = "1" + val jsonMessageCodec = new JsonMessageCodec() + + val service: TestValueService = new TestValueService( + ReflectiveValueEntityProvider + .of[TestVEState2, TestValueEntityMigration]( + classOf[TestValueEntityMigration], + jsonMessageCodec, + _ => new TestValueEntityMigration())) + val protocol: TestProtocol = TestProtocol(service.port) + val entity = protocol.valueEntity.connect() + //old state + entity.send( + init( + classOf[TestValueEntityMigration].getName, + entityId, + jsonMessageCodec.encodeJava(new TestVEState0("old-state", 12)))) + + entity.send(command(1, entityId, "Get", emptySyntheticRequest("Get"))) + //migrated state + entity.expect(reply(1, jsonMessageCodec.encodeJava(new TestVEState2("old-state", 12, "newValue")))) + + protocol.terminate() + service.terminate() + } + } + + private def emptySyntheticRequest(methodName: String) = { + ScalaPbAny(s"type.googleapis.com/kalix.javasdk.valueentity.${methodName}KalixSyntheticRequest", EMPTY) + } +} diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/WorkflowImplSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/WorkflowImplSpec.scala new file mode 100644 index 0000000000..ecb3d5339e --- /dev/null +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/WorkflowImplSpec.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2021 Lightbend Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kalix.javasdk.impl + +import com.google.protobuf.ByteString._ +import com.google.protobuf.any.{ Any => ScalaPbAny } +import kalix.javasdk.impl.workflow.TestWorkflow +import kalix.javasdk.workflow.ReflectiveWorkflowProvider +import kalix.javasdk.workflow.TestWorkflowSerialization +import kalix.javasdk.workflow.TestWorkflowSerialization.Result +import kalix.testkit.TestProtocol +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Millis +import org.scalatest.time.Seconds +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpec + +class WorkflowImplSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eventually { + import kalix.testkit.workflow.WorkflowMessages._ + + implicit private val patience: PatienceConfig = + PatienceConfig(Span(10, Seconds), Span(100, Millis)) + + "Workflow" should { + + "deserialize response" in { + val entityId = "1" + val jsonMessageCodec = new JsonMessageCodec() + val service = new TestWorkflow( + ReflectiveWorkflowProvider + .of[String, TestWorkflowSerialization]( + classOf[TestWorkflowSerialization], + jsonMessageCodec, + _ => new TestWorkflowSerialization())) + val protocol = TestProtocol(service.port) + val workflow = protocol.workflow.connect() + + workflow.send(init(classOf[TestWorkflowSerialization].getName, entityId)) + + workflow.expect(config()) + + val emptyState = jsonMessageCodec.encodeScala("empty") + val stepResult = jsonMessageCodec.encodeScala(new Result.Succeed()) + + workflow.send(command(1, entityId, "Start", emptySyntheticRequest("Start"))) + workflow.expect(reply(1, jsonMessageCodec.encodeScala("ok"), emptyState, stepTransition("test"))) + + workflow.send(executeStep(2, "test", emptyState)) + workflow.expect(stepExecuted(2, "test", stepResult)) + + workflow.send(getNextStep(3, "test", stepResult)) + workflow.expect(end(3, jsonMessageCodec.encodeScala("success"))) + + workflow.send(command(1, entityId, "Get", emptySyntheticRequest("Get"))) + workflow.expect(reply(1, jsonMessageCodec.encodeScala("success"))) + + protocol.terminate() + service.terminate() + } + } + + private def emptySyntheticRequest(methodName: String) = { + ScalaPbAny(s"type.googleapis.com/kalix.javasdk.workflow.${methodName}KalixSyntheticRequest", EMPTY) + } +} diff --git a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/eventsourceentity/EventSourcedHandlersExtractorSpec.scala b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/eventsourceentity/EventSourcedHandlersExtractorSpec.scala index 264ee73f54..6a6f071dfa 100644 --- a/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/eventsourceentity/EventSourcedHandlersExtractorSpec.scala +++ b/sdk/java-sdk-spring/src/test/scala/kalix/javasdk/impl/eventsourceentity/EventSourcedHandlersExtractorSpec.scala @@ -16,6 +16,7 @@ package kalix.javasdk.impl.eventsourceentity +import kalix.javasdk.JsonSupport import kalix.javasdk.impl.JsonMessageCodec import kalix.javasdk.impl.eventsourcedentity.EventSourcedHandlersExtractor import kalix.spring.testmodels.eventsourcedentity.EmployeeEvent.EmployeeEmailUpdated @@ -30,18 +31,24 @@ class EventSourcedHandlersExtractorSpec extends AnyWordSpec with Matchers { private final val messageCodec = new JsonMessageCodec private final val intTypeUrl = messageCodec.typeUrlFor(classOf[Integer]) - private final val stringTypeUrl = messageCodec.typeUrlFor(classOf[String]) + private final val eventTypeUrl = messageCodec.typeUrlFor(classOf[CounterEventSourcedEntity.Event]) + private final val additionalMappingTypeUrl = JsonSupport.KALIX_JSON + "additional-mapping" "EventSourcedHandlersExtractor" should { "extract public well-annotated handlers keyed by event type received as unique parameter" in { val result = EventSourcedHandlersExtractor.handlersFrom(classOf[CounterEventSourcedEntity], messageCodec) - result.handlers.size shouldBe 2 + result.handlers.size shouldBe 3 result.handlers.get(intTypeUrl).map { m => m.method.getName shouldBe "receivedIntegerEvent" m.method.getParameterCount shouldBe 1 } - result.handlers.get(stringTypeUrl).map { m => + result.handlers.get(eventTypeUrl).map { m => + m.method.getName shouldBe "receiveStringEvent" + m.method.getParameterCount shouldBe 1 + } + //additional type pointing to the same handler to support events schema evolution + result.handlers.get(additionalMappingTypeUrl).map { m => m.method.getName shouldBe "receiveStringEvent" m.method.getParameterCount shouldBe 1 }