Skip to content

Commit

Permalink
feat: JSON schema evolution support (#1760)
Browse files Browse the repository at this point in the history
* feat: JSON schema evolution support

* fmt

* documentation + moving @migration to the rest of annotations

* fixing event handlers with version

* handling migration for ES subscriptions

* fixing tests

* minor improvements

* fixing test

* ranaming to JsonMigration

* removing unused classes

* replacing split to substring

* JsonProperties removed

* workflow serialization integration test

* fmt
  • Loading branch information
aludwiko authored Aug 21, 2023
1 parent 2886e7c commit 00297b0
Show file tree
Hide file tree
Showing 53 changed files with 1,884 additions and 111 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ lazy val javaSdkProtobuf = project
// We need to generate the java files for things like entity_key.proto so that downstream libraries can use them
// without needing to generate them themselves
Compile / PB.targets += PB.gens.java -> crossTarget.value / "akka-grpc" / "main",
Test / javacOptions ++= Seq("-parameters"), // for Jackson
Test / akkaGrpcGeneratedSources := Seq(AkkaGrpc.Client),
Test / PB.protoSources ++= (Compile / PB.protoSources).value,
Test / PB.targets += PB.gens.java -> crossTarget.value / "akka-grpc" / "test")
Expand Down Expand Up @@ -126,7 +127,7 @@ lazy val javaSdkProtobufTestKit = project

lazy val javaSdkSpring = project
.in(file("sdk/java-sdk-spring"))
.dependsOn(javaSdkProtobuf)
.dependsOn(javaSdkProtobuf % "compile->compile;test->test")
.dependsOn(devTools % IntegrationTest)
.dependsOn(javaSdkProtobufTestKit % IntegrationTest)
.enablePlugins(AkkaGrpcPlugin, BuildInfoPlugin, PublishSonatype, IntegrationTests)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public Wallet deposit(int amount) {

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonSubTypes({
@JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class),
@JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class)})
@JsonSubTypes.Type(value = WithdrawResult.WithdrawSucceed.class, name = "withdraw-succeed"),
@JsonSubTypes.Type(value = WithdrawResult.WithdrawFailed.class, name = "withdraw-failed")})
public sealed interface WithdrawResult {
record WithdrawFailed(String errorMsg) implements WithdrawResult {
}
Expand All @@ -43,8 +43,8 @@ record WithdrawSucceed() implements WithdrawResult {

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME)
@JsonSubTypes({
@JsonSubTypes.Type(value = DepositResult.DepositSucceed.class),
@JsonSubTypes.Type(value = DepositResult.DepositFailed.class)})
@JsonSubTypes.Type(value = DepositResult.DepositSucceed.class, name = "deposit-succeed"),
@JsonSubTypes.Type(value = DepositResult.DepositFailed.class, name = "deposit-failed")})
public sealed interface DepositResult {
record DepositFailed(String errorMsg) implements DepositResult {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kalix.javasdk;

import com.fasterxml.jackson.databind.JsonNode;

import java.util.List;

/**
* Allows to specify dedicated strategy for JSON schema evolution.
* <p>
* It is used when deserializing data of older version than the
* {@link JsonMigration#currentVersion}. You implement the transformation of the
* JSON structure in the {@link JsonMigration#transform} method. If you have changed the
* class name you should add it to {@link JsonMigration#supportedClassNames}.
*/
public abstract class JsonMigration {

/**
* Define current version, that is, the value used when serializing new data. The first version, when no
* migration was used, is always 0.
*/
public abstract int currentVersion();

/**
* Define the supported forward version this migration can read (must be greater or equal than `currentVersion`).
* If this value is different from {@link JsonMigration#currentVersion} a {@link JsonMigration#transform} will be used to downcast
* the received payload to the current schema.
*/
public int supportedForwardVersion() {
return currentVersion();
}

/**
* Implement the transformation of the incoming JSON structure to the current
* JSON structure. The `JsonNode` is mutable so you can add and remove fields,
* or change values. Note that you have to cast to specific sub-classes such
* as `ObjectNode` and `ArrayNode` to get access to mutators.
*
* @param fromVersion the version of the old data
* @param json the incoming JSON data
*/
public JsonNode transform(int fromVersion, JsonNode json) {
return json;
}

/**
* Override this method if you have changed the class name. Return
* all old class names.
*/
public List<String> supportedClassNames() {
return List.of();
}
}
89 changes: 70 additions & 19 deletions sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/JsonSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@
package kalix.javasdk;

import akka.Done;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import kalix.javasdk.impl.ByteStringEncoding;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.protobuf.*;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -130,11 +132,27 @@ public static <T> Any encodeJson(T value, String jsonType) {
* the JSON string as bytes as value and a type URL starting with "json.kalix.io/".
*
* @param valueClass The type of class to deserialize the object to, the class must have the
* proper Jackson annotations for deserialization.
* proper Jackson annotations for deserialization.
* @param any The protobuf Any object to deserialize.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*/
public static <T> T decodeJson(Class<T> valueClass, Any any) {
return decodeJson(valueClass, any, Optional.empty());
}

/**
* Decode the given protobuf Any object to an instance of T using Jackson. The object must have
* the JSON string as bytes as value and a type URL starting with "json.kalix.io/".
*
* @param valueClass The type of class to deserialize the object to, the class must have the
* proper Jackson annotations for deserialization.
* @param any The protobuf Any object to deserialize.
* @param jacksonMigration The optional @{@link JsonMigration} implementation used for deserialization.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*/
public static <T> T decodeJson(Class<T> valueClass, Any any, Optional<? extends JsonMigration> jacksonMigration) {
if (!any.getTypeUrl().startsWith(KALIX_JSON)) {
throw new IllegalArgumentException(
"Protobuf bytes with type url ["
Expand All @@ -145,7 +163,24 @@ public static <T> T decodeJson(Class<T> valueClass, Any any) {
} else {
try {
ByteString decodedBytes = ByteStringEncoding.decodePrimitiveBytes(any.getValue());
return objectMapper.readValue(decodedBytes.toByteArray(), valueClass);
if (jacksonMigration.isPresent()) {
int fromVersion = parseVersion(any.getTypeUrl());
JsonMigration migration = jacksonMigration.get();
int currentVersion = migration.currentVersion();
int supportedForwardVersion = migration.supportedForwardVersion();
if (fromVersion < currentVersion) {
return migrate(valueClass, decodedBytes, fromVersion, migration);
} else if (fromVersion == currentVersion) {
return objectMapper.readValue(decodedBytes.toByteArray(), valueClass);
} else if (fromVersion <= supportedForwardVersion) {
return migrate(valueClass, decodedBytes, fromVersion, migration);
} else {
throw new IllegalStateException("Migration version " + supportedForwardVersion + " is " +
"behind version " + fromVersion + " of deserialized type [" + valueClass.getName() + "]");
}
} else {
return objectMapper.readValue(decodedBytes.toByteArray(), valueClass);
}
} catch (IOException e) {
throw new IllegalArgumentException(
"JSON with type url ["
Expand All @@ -158,14 +193,30 @@ public static <T> T decodeJson(Class<T> valueClass, Any any) {
}
}

private static <T> T migrate(Class<T> valueClass, ByteString decodedBytes, int fromVersion, JsonMigration jsonMigration) throws IOException {
JsonNode jsonNode = objectMapper.readTree(decodedBytes.toByteArray());
JsonNode newJsonNode = jsonMigration.transform(fromVersion, jsonNode);
return objectMapper.treeToValue(newJsonNode, valueClass);
}

private static int parseVersion(String typeUrl) {
int versionSeparatorIndex = typeUrl.lastIndexOf("#");
if (versionSeparatorIndex > 0) {
String maybeVersion = typeUrl.substring(versionSeparatorIndex + 1);
return Integer.parseInt(maybeVersion);
} else {
return 0;
}
}

public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> valueClass, Class<C> collectionType, Any any) {
if (!any.getTypeUrl().startsWith(KALIX_JSON)) {
throw new IllegalArgumentException(
"Protobuf bytes with type url ["
+ any.getTypeUrl()
+ "] cannot be decoded as JSON, must start with ["
+ KALIX_JSON
+ "]");
"Protobuf bytes with type url ["
+ any.getTypeUrl()
+ "] cannot be decoded as JSON, must start with ["
+ KALIX_JSON
+ "]");
} else {
try {
ByteString decodedBytes = ByteStringEncoding.decodePrimitiveBytes(any.getValue());
Expand Down Expand Up @@ -212,9 +263,9 @@ public void serialize(Done value, JsonGenerator gen, SerializerProvider serializ
class DoneDeserializer extends JsonDeserializer<Done> {

@Override
public Done deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException {
if (p.currentToken() == JsonToken.START_OBJECT && p.nextToken() == JsonToken.END_OBJECT){
return Done.getInstance();
public Done deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
if (p.currentToken() == JsonToken.START_OBJECT && p.nextToken() == JsonToken.END_OBJECT) {
return Done.getInstance();
} else {
throw JsonMappingException.from(ctxt, "Cannot deserialize Done class, expecting empty object '{}'");
}
Expand Down
50 changes: 50 additions & 0 deletions sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kalix.javasdk;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;
import java.util.Optional;


public class DummyClass {
public String stringValue;
public int intValue;
public Optional<String> optionalStringValue;

@JsonCreator
public DummyClass(String stringValue, int intValue, Optional<String> optionalStringValue) {
this.stringValue = stringValue;
this.intValue = intValue;
this.optionalStringValue = optionalStringValue;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DummyClass that = (DummyClass) o;
return intValue == that.intValue && Objects.equals(stringValue, that.stringValue) && Objects.equals(optionalStringValue, that.optionalStringValue);
}

@Override
public int hashCode() {
return Objects.hash(stringValue, intValue, optionalStringValue);
}
}
57 changes: 57 additions & 0 deletions sdk/java-sdk-protobuf/src/test/java/kalix/javasdk/DummyClass2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kalix.javasdk;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class DummyClass2 {
public String stringValue;
public int intValue;
public String mandatoryStringValue;

@JsonCreator
public DummyClass2(String stringValue, int intValue, String mandatoryStringValue) {
this.stringValue = stringValue;
this.intValue = intValue;
this.mandatoryStringValue = mandatoryStringValue;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DummyClass2 that = (DummyClass2) o;
return intValue == that.intValue && Objects.equals(stringValue, that.stringValue) && Objects.equals(mandatoryStringValue, that.mandatoryStringValue);
}

@Override
public int hashCode() {
return Objects.hash(stringValue, intValue, mandatoryStringValue);
}

@Override
public String toString() {
return "DummyClass2{" +
"stringValue='" + stringValue + '\'' +
", intValue=" + intValue +
", mandatoryStringValue='" + mandatoryStringValue + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kalix.javasdk;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;

public class DummyClass2Migration extends JsonMigration {
@Override
public int currentVersion() {
return 1;
}

@Override
public JsonNode transform(int fromVersion, JsonNode json) {
if (fromVersion < 1) {
return ((ObjectNode) json).set("mandatoryStringValue", TextNode.valueOf("mandatory-value"));
} else {
return json;
}
}
}
Loading

0 comments on commit 00297b0

Please sign in to comment.