Skip to content

Commit 551393c

Browse files
Savepoint deserialization fixup - The class 'pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedScalaMapSerializer$$anon$1' is not instantiable: The class is an inner class, but not statically accessible.
1 parent 13e570b commit 551393c

File tree

5 files changed

+56
-69
lines changed

5 files changed

+56
-69
lines changed

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala

+2-13
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package pl.touk.nussknacker.engine.process.typeinformation
22

33
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
4-
import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot}
54
import org.apache.flink.api.java.typeutils.{ListTypeInfo, MapTypeInfo, MultisetTypeInfo, RowTypeInfo}
65
import org.apache.flink.types.Row
76
import pl.touk.nussknacker.engine.api.context.ValidationContext
@@ -99,20 +98,10 @@ class TypingResultAwareTypeInformationDetection extends TypeInformationDetection
9998
}
10099

101100
private def createScalaMapTypeInformation(typingResult: TypedObjectTypingResult) =
102-
TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)
101+
TypedScalaMapTypeInformation(typingResult.fields.mapValuesNow(forType))
103102

104103
private def createJavaMapTypeInformation(typingResult: TypedObjectTypingResult) =
105-
TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType), constructIntermediateCompatibilityResult)
106-
107-
protected def constructIntermediateCompatibilityResult(
108-
newNestedSerializers: Array[TypeSerializer[_]],
109-
oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
110-
): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = {
111-
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
112-
newNestedSerializers.map(_.snapshotConfiguration()),
113-
oldNestedSerializerSnapshots
114-
)
115-
}
104+
TypedJavaMapTypeInformation(typingResult.fields.mapValuesNow(forType))
116105

117106
def forValueWithContext[T](
118107
validationContext: ValidationContext,

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedJavaMapBasedTypeInformation.scala

+7-13
Original file line numberDiff line numberDiff line change
@@ -3,42 +3,36 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
33
import java.{util => jutil}
44
import org.apache.flink.api.common.typeinfo.TypeInformation
55
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
6-
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult
76

87
case class TypedJavaMapTypeInformation(
9-
informations: Map[String, TypeInformation[_]],
10-
buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
8+
informations: Map[String, TypeInformation[_]]
119
) extends TypedObjectBasedTypeInformation[jutil.Map[String, AnyRef]](informations) {
1210

1311
override def createSerializer(
1412
serializers: Array[(String, TypeSerializer[_])]
1513
): TypeSerializer[jutil.Map[String, AnyRef]] =
16-
TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
14+
TypedJavaMapSerializer(serializers)
1715

1816
}
1917

2018
@SerialVersionUID(1L)
2119
case class TypedJavaMapSerializer(
22-
override val serializers: Array[(String, TypeSerializer[_])],
23-
override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
20+
override val serializers: Array[(String, TypeSerializer[_])]
2421
) extends TypedObjectBasedTypeSerializer[jutil.Map[String, AnyRef]](serializers)
2522
with BaseJavaMapBasedSerializer[AnyRef, jutil.Map[String, AnyRef]] {
2623

2724
override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[jutil.Map[String, AnyRef]] =
28-
TypedJavaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
25+
TypedJavaMapSerializer(serializers)
2926

3027
override def createInstance(): jutil.Map[String, AnyRef] = new jutil.HashMap()
3128

3229
override def snapshotConfiguration(
3330
snapshots: Array[(String, TypeSerializerSnapshot[_])]
34-
): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots) {
35-
override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
36-
buildIntermediateSchemaCompatibilityResultFunction
37-
}
31+
): TypeSerializerSnapshot[jutil.Map[String, AnyRef]] = new TypedJavaMapSerializerSnapshot(snapshots)
3832

3933
}
4034

41-
abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {
35+
final class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[jutil.Map[String, AnyRef]] {
4236

4337
def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
4438
this()
@@ -48,6 +42,6 @@ abstract class TypedJavaMapSerializerSnapshot extends TypedObjectBasedSerializer
4842
override protected def restoreSerializer(
4943
restored: Array[(String, TypeSerializer[_])]
5044
): TypeSerializer[jutil.Map[String, AnyRef]] =
51-
TypedJavaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
45+
TypedJavaMapSerializer(restored)
5246

5347
}

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedObjectBasedTypeInformation.scala

+33-20
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import com.github.ghik.silencer.silent
44
import com.typesafe.scalalogging.LazyLogging
55
import org.apache.flink.api.common.ExecutionConfig
66
import org.apache.flink.api.common.typeinfo.TypeInformation
7+
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil.IntermediateCompatibilityResult
78
import org.apache.flink.api.common.typeutils.{
89
CompositeTypeSerializerUtil,
910
TypeSerializer,
1011
TypeSerializerSchemaCompatibility,
1112
TypeSerializerSnapshot
1213
}
1314
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
14-
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult
1515

1616
import scala.reflect.ClassTag
1717

@@ -57,15 +57,6 @@ abstract class TypedObjectBasedTypeInformation[T: ClassTag](informations: Array[
5757
def createSerializer(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]
5858
}
5959

60-
object TypedObjectBasedTypeInformation {
61-
62-
type BuildIntermediateSchemaCompatibilityResult = (
63-
Array[TypeSerializer[_]],
64-
Array[TypeSerializerSnapshot[_]]
65-
) => CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing]
66-
67-
}
68-
6960
//We use Array instead of List here, as we need access by index, which is faster for array
7061
abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String, TypeSerializer[_])])
7162
extends TypeSerializer[T]
@@ -132,17 +123,13 @@ abstract class TypedObjectBasedTypeSerializer[T](val serializers: Array[(String,
132123

133124
def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[T]
134125

135-
def buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
136126
}
137127

138128
abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnapshot[T] with LazyLogging {
139129

140-
protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _
130+
private val constructIntermediateCompatibilityResultMethodName = "constructIntermediateCompatibilityResult"
141131

142-
def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
143-
this()
144-
this.serializersSnapshots = serializers
145-
}
132+
protected var serializersSnapshots: Array[(String, TypeSerializerSnapshot[_])] = _
146133

147134
override def getCurrentVersion: Int = 1
148135

@@ -182,10 +169,10 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
182169
val newKeys = newSerializers.map(_._1)
183170
val commons = currentKeys.intersect(newKeys)
184171

185-
val newSerializersToUse = newSerializers.filter(k => commons.contains(k._1))
186-
val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))
172+
val newSerializersToUse: Array[(String, TypeSerializer[_])] = newSerializers.filter(k => commons.contains(k._1))
173+
val snapshotsToUse = serializersSnapshots.filter(k => commons.contains(k._1))
187174

188-
val fieldsCompatibility = buildIntermediateSchemaCompatibilityResult(
175+
val fieldsCompatibility = constructIntermediateCompatibilityResultProxied(
189176
newSerializersToUse.map(_._2),
190177
snapshotsToUse.map(_._2)
191178
)
@@ -237,7 +224,33 @@ abstract class TypedObjectBasedSerializerSnapshot[T] extends TypeSerializerSnaps
237224
}
238225
}
239226

240-
val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult
227+
private def constructIntermediateCompatibilityResultProxied(
228+
newNestedSerializers: Array[TypeSerializer[_]],
229+
nestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]]
230+
): IntermediateCompatibilityResult[_] = {
231+
// signature of CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult changed between flink 1.18/1.19
232+
// Because of contract of serialization/deserialization of TypeSerializerSnapshot in can't be easily provided by TypeInformationDetection SPI mechanism
233+
try {
234+
val newMethod = classOf[CompositeTypeSerializerUtil].getMethod(
235+
constructIntermediateCompatibilityResultMethodName,
236+
classOf[Array[TypeSerializerSnapshot[_]]],
237+
classOf[Array[TypeSerializerSnapshot[_]]]
238+
)
239+
newMethod
240+
.invoke(newNestedSerializers.map(_.snapshotConfiguration()), nestedSerializerSnapshots)
241+
.asInstanceOf[IntermediateCompatibilityResult[_]]
242+
} catch {
243+
case _: NoSuchMethodException =>
244+
val oldMethod = classOf[CompositeTypeSerializerUtil].getMethod(
245+
constructIntermediateCompatibilityResultMethodName,
246+
classOf[Array[TypeSerializer[_]]],
247+
classOf[Array[TypeSerializerSnapshot[_]]]
248+
)
249+
oldMethod
250+
.invoke(newNestedSerializers, nestedSerializerSnapshots)
251+
.asInstanceOf[IntermediateCompatibilityResult[_]]
252+
}
253+
}
241254

242255
override def restoreSerializer(): TypeSerializer[T] = restoreSerializer(serializersSnapshots.map {
243256
case (k, snapshot) => (k, snapshot.restoreSerializer())

engine/flink/executor/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/internal/typedobject/TypedScalaMapBasedTypeInformation.scala

+7-13
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,21 @@ package pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject
33
import com.typesafe.scalalogging.LazyLogging
44
import org.apache.flink.api.common.typeinfo.TypeInformation
55
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
6-
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedObjectBasedTypeInformation.BuildIntermediateSchemaCompatibilityResult
76

87
case class TypedScalaMapTypeInformation(
9-
informations: Map[String, TypeInformation[_]],
10-
buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
8+
informations: Map[String, TypeInformation[_]]
119
) extends TypedObjectBasedTypeInformation[Map[String, _ <: AnyRef]](informations) {
1210

1311
override def createSerializer(
1412
serializers: Array[(String, TypeSerializer[_])]
1513
): TypeSerializer[Map[String, _ <: AnyRef]] =
16-
TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
14+
TypedScalaMapSerializer(serializers)
1715

1816
}
1917

2018
@SerialVersionUID(1L)
2119
case class TypedScalaMapSerializer(
22-
override val serializers: Array[(String, TypeSerializer[_])],
23-
override val buildIntermediateSchemaCompatibilityResultFunction: BuildIntermediateSchemaCompatibilityResult
20+
override val serializers: Array[(String, TypeSerializer[_])]
2421
) extends TypedObjectBasedTypeSerializer[Map[String, _ <: AnyRef]](serializers)
2522
with LazyLogging {
2623

@@ -36,20 +33,17 @@ case class TypedScalaMapSerializer(
3633
override def get(value: Map[String, _ <: AnyRef], k: String): AnyRef = value.getOrElse(k, null)
3734

3835
override def duplicate(serializers: Array[(String, TypeSerializer[_])]): TypeSerializer[Map[String, _ <: AnyRef]] =
39-
TypedScalaMapSerializer(serializers, buildIntermediateSchemaCompatibilityResultFunction)
36+
TypedScalaMapSerializer(serializers)
4037

4138
override def createInstance(): Map[String, _ <: AnyRef] = Map.empty
4239

4340
override def snapshotConfiguration(
4441
snapshots: Array[(String, TypeSerializerSnapshot[_])]
45-
): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots) {
46-
override val buildIntermediateSchemaCompatibilityResult: BuildIntermediateSchemaCompatibilityResult =
47-
buildIntermediateSchemaCompatibilityResultFunction
48-
}
42+
): TypeSerializerSnapshot[Map[String, _ <: AnyRef]] = new TypedScalaMapSerializerSnapshot(snapshots)
4943

5044
}
5145

52-
abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {
46+
final class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerializerSnapshot[Map[String, _ <: AnyRef]] {
5347

5448
def this(serializers: Array[(String, TypeSerializerSnapshot[_])]) = {
5549
this()
@@ -59,6 +53,6 @@ abstract class TypedScalaMapSerializerSnapshot extends TypedObjectBasedSerialize
5953
override protected def restoreSerializer(
6054
restored: Array[(String, TypeSerializer[_])]
6155
): TypeSerializer[Map[String, _ <: AnyRef]] =
62-
TypedScalaMapSerializer(restored, buildIntermediateSchemaCompatibilityResult)
56+
TypedScalaMapSerializer(restored)
6357

6458
}

engine/flink/executor/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala

+7-10
Original file line numberDiff line numberDiff line change
@@ -264,20 +264,17 @@ class TypingResultAwareTypeInformationDetectionSpec
264264
}
265265

266266
private def assertNested(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_] => Assertion)*): Unit = {
267-
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
268-
case TypedScalaMapSerializer(array, _) =>
269-
array.zipAll(nested.toList, null, null).foreach {
270-
case ((name, serializer), (expectedName, expectedSerializer)) =>
271-
name shouldBe expectedName
272-
expectedSerializer(serializer)
273-
}
267+
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
268+
array.zipAll(nested.toList, null, null).foreach { case ((name, serializer), (expectedName, expectedSerializer)) =>
269+
name shouldBe expectedName
270+
expectedSerializer(serializer)
271+
}
274272
}
275273
}
276274

277275
private def assertMapSerializers(serializer: TypeSerializer[_], nested: (String, TypeSerializer[_])*) = {
278-
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) {
279-
case TypedScalaMapSerializer(array, _) =>
280-
array.toList shouldBe nested.toList
276+
inside(serializer.asInstanceOf[TypeSerializer[Map[String, _ <: AnyRef]]]) { case TypedScalaMapSerializer(array) =>
277+
array.toList shouldBe nested.toList
281278
}
282279
}
283280

0 commit comments

Comments
 (0)