Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Savepoint deserialization fixup - The class is an inner class, but not statically accessible. #7270

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package pl.touk.nussknacker.engine.process.typeinformation

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.{ListTypeInfo, MapTypeInfo, MultisetTypeInfo, RowTypeInfo}
import org.apache.flink.types.Row
import pl.touk.nussknacker.engine.api.context.ValidationContext
Expand Down Expand Up @@ -99,20 +98,10 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection
}

private def createScalaMapTypeInformation(typingResult: TypedObjectTypingResult) =
TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)
TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType))

private def createJavaMapTypeInformation(typingResult: TypedObjectTypingResult) =
TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)

protected def constructIntermediateCompatibilityResult(
newNestedSerializers: Array[TypeSerializer[_]],
oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = {
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
newNestedSerializers.map(_.snapshotConfiguration()),
oldNestedSerializerSnapshots
)
}
TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType))

def forValueWithContext[T](
validationContext: ValidationContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,36 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
import java.{util => jutil}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult

case class TypedJavaMapTypeInformation(
informations: Map[String, TypeInformation[_]],
buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
informations: Map[String, TypeInformation[_]]
) extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) {

override def createSerializer(
serializers: Array[(String, TypeSerializer[_])]
): TypeSerializer[jutil.Map[String, AnyRef]] =
TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedJavaMapSerializer(serializers)

}

@SerialVersionUID(1L)
case class TypedJavaMapSerializer(
override val serializers: Array[(String, TypeSerializer[_])],
override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
override val serializers: Array[(String, TypeSerializer[_])]
) extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers)
with BaseJavaMapBasedSerializer[AnyRef, jutil.Map[String, AnyRef]] {

override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] =
TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedJavaMapSerializer(serializers)

override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap()

override def snapshotConfiguration(
snapshots: Array[(String, TypeSerializerSnapshot[_])]
): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) {
override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
buildIntermediateSchemaCompatibilityResultFunction
}
): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots)

}

abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {
final class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {

def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
Expand All @@ -48,6 +42,6 @@ abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializer
override protected def restoreSerializer(
restored: Array[(String, TypeSerializer[_])]
): TypeSerializer[jutil.Map[String, AnyRef]] =
TypedJavaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
TypedJavaMapSerializer(restored)

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import com.github.ghik.silencer.silent
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult
import org.apache.flink.api.common.typeutils.{
CompositeTypeSerializerUtil,
TypeSerializer,
TypeSerializerSchemaCompatibility,
TypeSerializerSnapshot
}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult

import scala.reflect.ClassTag

Expand Down Expand Up @@ -57,15 +57,6 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[
def createSerializer(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]
}

object TypedObjectBasedTypeInformation {

type BuildIntermediateSchemaCompatibilityResult = (
Array[TypeSerializer[_]],
Array[TypeSerializerSnapshot[_]]
) => CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing]

}

//We use Array instead of List here, as we need access by index, which is faster for array
abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, TypeSerializer[_])])
extends TypeSerializer[T]
Expand Down Expand Up @@ -132,17 +123,13 @@ abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String,

def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]

def buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
}

abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnapshot[T] with LazyLogging {

protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _
private val constructIntermediateCompatibilityResultMethodName = "constructIntermediateCompatibilityResult"

def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
this.serializersSnapshots = serializers
}
protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _

override def getCurrentVersion: Int = 1

Expand Down Expand Up @@ -182,10 +169,10 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
val newKeys = newSerializers.map(_._1)
val commons = currentKeys.intersect(newKeys)

val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1))
val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))
val newSerializersToUse: Array[(String, TypeSerializer[_])] = newSerializers.filter(k => commons.contains(k._1))
val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))

val fieldsCompatibility = buildIntermediateSchemaCompatibilityResult(
val fieldsCompatibility = constructIntermediateCompatibilityResultProxied(
newSerializersToUse.map(_._2),
snapshotsToUse.map(_._2)
)
Expand Down Expand Up @@ -237,7 +224,33 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
}
}

val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult
private def constructIntermediateCompatibilityResultProxied(
newNestedSerializers: Array[TypeSerializer[_]],
nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
): IntermediateCompatibilityResult[_] = {
// signature of CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult has been changed between flink 1.18/1.19
// Because of contract of serialization/deserialization of TypeSerializerSnapshot in can't be easily provided by TypeInformationDetection SPI mechanism
try {
val newMethod = classOf[CompositeTypeSerializerUtil].getMethod(
constructIntermediateCompatibilityResultMethodName,
classOf[Array[TypeSerializerSnapshot[_]]],
classOf[Array[TypeSerializerSnapshot[_]]]
)
newMethod
.invoke(null, newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots)
.asInstanceOf[IntermediateCompatibilityResult[_]]
} catch {
case _: NoSuchMethodException =>
val oldMethod = classOf[CompositeTypeSerializerUtil].getMethod(
constructIntermediateCompatibilityResultMethodName,
classOf[Array[TypeSerializer[_]]],
classOf[Array[TypeSerializerSnapshot[_]]]
)
oldMethod
.invoke(null, newNestedSerializers, nestedSerializerSnapshots)
.asInstanceOf[IntermediateCompatibilityResult[_]]
}
}

override def restoreSerializer(): TypeSerializer[T] = restoreSerializer(serializersSnapshots.map {
case (k, snapshot) => (k, snapshot.restoreSerializer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,21 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult

case class TypedScalaMapTypeInformation(
informations: Map[String, TypeInformation[_]],
buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
informations: Map[String, TypeInformation[_]]
) extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) {

override def createSerializer(
serializers: Array[(String, TypeSerializer[_])]
): TypeSerializer[Map[String, _ <: AnyRef]] =
TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedScalaMapSerializer(serializers)

}

@SerialVersionUID(1L)
case class TypedScalaMapSerializer(
override val serializers: Array[(String, TypeSerializer[_])],
override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
override val serializers: Array[(String, TypeSerializer[_])]
) extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers)
with LazyLogging {

Expand All @@ -36,20 +33,17 @@ case class TypedScalaMapSerializer(
override def get(value: Map[String, _ <: AnyRef], k: String): AnyRef = value.getOrElse(k, null)

override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[Map[String, _ <: AnyRef]] =
TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
TypedScalaMapSerializer(serializers)

override def createInstance(): Map[String, _ <: AnyRef] = Map.empty

override def snapshotConfiguration(
snapshots: Array[(String, TypeSerializerSnapshot[_])]
): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) {
override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
buildIntermediateSchemaCompatibilityResultFunction
}
): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots)

}

abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {
final class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {

def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
this()
Expand All @@ -59,6 +53,6 @@ abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerialize
override protected def restoreSerializer(
restored: Array[(String, TypeSerializer[_])]
): TypeSerializer[Map[String, _ <: AnyRef]] =
TypedScalaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
TypedScalaMapSerializer(restored)

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,20 +264,17 @@ class TypingResultAwareTypeInformationDetectionSpec
}

private def assertNested(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_] => Assertion)*): Unit = {
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
case TypedScalaMapSerializer(array, _) =>
array.zipAll(nested.toList, null, null).foreach {
case ((name, serializer), (expectedName, expectedSerializer)) =>
name shouldBe expectedName
expectedSerializer(serializer)
}
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
array.zipAll(nested.toList, null, null).foreach { case ((name, serializer), (expectedName, expectedSerializer)) =>
name shouldBe expectedName
expectedSerializer(serializer)
}
}
}

private def assertMapSerializers(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_])*) = {
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
case TypedScalaMapSerializer(array, _) =>
array.toList shouldBe nested.toList
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
array.toList shouldBe nested.toList
}
}

Expand Down
Loading