Skip to content

Commit

Permalink
Merge pull request #18 from findify/feature/type-mappers
Browse files Browse the repository at this point in the history
type mapper support
  • Loading branch information
shuttie authored May 17, 2022
2 parents 8926841 + 24a0c4f commit 7670584
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 11 deletions.
70 changes: 69 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ compiles OK. 2022.1 is fine with serializers derived with this library.

`flink-adt` is released to Maven-central. For SBT, add this snippet to `build.sbt`:
```scala
libraryDependencies += "io.findify" %% "flink-adt" % "0.5.0"
libraryDependencies += "io.findify" %% "flink-adt" % "0.6.1"
```

To use this library, swap `import org.apache.flink.api.scala._` with `import io.findify.flinkadt.api._` and enjoy.
Expand All @@ -50,6 +50,71 @@ function, which may happily generate you a kryo-based serializer in a place you
to do this type of wildcard import, make sure that you explicitly called `deriveTypeInformation`
for all the sealed traits in the current scope.

## Java types

flink-adt is a scala-specific library and won't derive TypeInformation for java classes (as they don't extend the `scala.Product` type).
But you can always fall back to flink's own POJO serializer in this way, so just make it implicit so flink-adt can pick it up:

```scala
import java.date.LocalDate
implicit val localDateTypeInfo: TypeInformation[LocalDate] = TypeInformation.of(classOf[LocalDate])
```

## Type mapping

Sometimes flink-adt may spot a type (usually a java one), which cannot be directly serialized as a case class, like this
example:
```scala
class WrappedString {
private var internal: String = ""

override def equals(obj: Any): Boolean = obj match {
case s: WrappedString => s.get == internal
case _ => false
}
def get: String = internal
def put(value: String) = {
internal = value
}
}
```

You can write a pair of explicit `TypeInformation[WrappedString]` and `Serializer[WrappedString]`, but it's extremely verbose,
and the class itself can be 1-to-1 mapped to a regular `String`. Flink-adt has a mechanism of type mappers to delegate serialization
of non-serializable types to existing serializers. For example:
```scala
class WrappedMapper extends TypeMapper[WrappedString, String] {
override def map(a: WrappedString): String = a.get

override def contramap(b: String): WrappedString = {
val str = new WrappedString
str.put(b)
str
}
}
implicit val mapper: TypeMapper[WrappedString, String] = new WrappedMapper()
// will treat WrappedString with String typeinfo:
implicit val ti: TypeInformation[WrappedString] = implicitly[TypeInformation[WrappedString]]
```

When there is a `TypeMapper[A,B]` in the scope to convert `A` to `B` and back, and type `B` has `TypeInformation[B]` available
in the scope also, then flink-adt will use a delegated existing typeinfo for `B` when it will spot type `A`.

Warning: on Scala 3, the TypeMapper should not be made anonymous. This example won't work, as anonymous implicit classes in
scala 3 are private, and Flink cannot instantiate it on restore without jvm17 incompatible reflection hacks:
```scala
// anonymous class, will fail on runtime on scala 3
implicit val mapper: TypeMapper[WrappedString, String] = new TypeMapper[WrappedString, String] {
override def map(a: WrappedString): String = a.get

override def contramap(b: String): WrappedString = {
val str = new WrappedString
str.put(b)
str
}
}
```

## Schema evolution

For the child case classes being part of ADT, `flink-adt` uses a Flink's `ScalaCaseClassSerializer`, so all the compatibility rules
Expand All @@ -66,6 +131,9 @@ For the sealed trait membership itself, `flink-adt` used an own serialization fo
This project uses a separate set of serializers for collections, instead of Flink's own TraversableSerializer. So probably you
may have issues while migrating state snapshots from TraversableSerializer to FlinkADT ones.

Starting from version 0.5.0+, this project strictly depends on Flink 1.15+, as starting from this version it can be cross-built
for scala 2.13 and scala 3.

## Licence

The MIT License (MIT)
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := "flink-adt"

version := "0.5.0"
version := "0.6.1"

lazy val `scala 2.12` = "2.12.15"
lazy val `scala 2.13` = "2.13.8"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.findify.flinkadt.api.mapper

import io.findify.flinkadt.api.serializer.MappedSerializer.TypeMapper

class BigDecMapper extends TypeMapper[scala.BigDecimal, java.math.BigDecimal] {
override def map(a: BigDecimal): java.math.BigDecimal = a.bigDecimal
override def contramap(b: java.math.BigDecimal): BigDecimal = BigDecimal(b)
}
10 changes: 10 additions & 0 deletions src/main/scala/io/findify/flinkadt/api/mapper/BigIntMapper.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.findify.flinkadt.api.mapper

import io.findify.flinkadt.api.serializer.MappedSerializer.TypeMapper

import java.math.BigInteger

class BigIntMapper() extends TypeMapper[scala.BigInt, java.math.BigInteger] {
override def contramap(b: BigInteger): BigInt = BigInt(b)
override def map(a: BigInt): BigInteger = a.bigInteger
}
35 changes: 27 additions & 8 deletions src/main/scala/io/findify/flinkadt/api/package.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package io.findify.flinkadt

import io.findify.flinkadt.api.mapper.{BigDecMapper, BigIntMapper}
import io.findify.flinkadt.api.serializer.MappedSerializer.TypeMapper
import io.findify.flinkadt.api.serializer._
import io.findify.flinkadt.api.typeinfo._
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.array._

import java.math.BigInteger
import scala.collection.mutable
import scala.reflect.{ClassTag, classTag}

Expand Down Expand Up @@ -92,6 +95,19 @@ package object api extends LowPrioImplicits {
implicit lazy val floatInfo: TypeInformation[Float] = BasicTypeInfo.getInfoFor(classOf[Float])
implicit lazy val longInfo: TypeInformation[Long] = BasicTypeInfo.getInfoFor(classOf[Long])
implicit lazy val shortInfo: TypeInformation[Short] = BasicTypeInfo.getInfoFor(classOf[Short])

implicit lazy val bigDecMapper: TypeMapper[scala.BigDecimal, java.math.BigDecimal] = new BigDecMapper()
implicit lazy val bigDecInfo: TypeInformation[BigDecimal] = mappedTypeInfo[scala.BigDecimal, java.math.BigDecimal]
implicit lazy val bigIntMapper: TypeMapper[scala.BigInt, java.math.BigInteger] = new BigIntMapper()
implicit lazy val bigIntInfo: TypeInformation[BigInt] = mappedTypeInfo[scala.BigInt, java.math.BigInteger]

implicit lazy val unitInfo: TypeInformation[Unit] = new UnitTypeInformation()
implicit def mappedTypeInfo[A: ClassTag, B](implicit
mapper: TypeMapper[A, B],
ti: TypeInformation[B]
): TypeInformation[A] =
new MappedTypeInformation[A, B](mapper, ti)

// serializers
implicit lazy val stringSerializer: TypeSerializer[String] = stringInfo.createSerializer(config)
implicit lazy val intSerializer: TypeSerializer[Int] = intInfo.createSerializer(config)
Expand All @@ -104,14 +120,17 @@ package object api extends LowPrioImplicits {
implicit lazy val shortSerializer: TypeSerializer[Short] = shortInfo.createSerializer(config)

// java
implicit lazy val jIntegerInfo: TypeInformation[java.lang.Integer] = BasicTypeInfo.INT_TYPE_INFO
implicit lazy val jLongInfo: TypeInformation[java.lang.Long] = BasicTypeInfo.LONG_TYPE_INFO
implicit lazy val jFloatInfo: TypeInformation[java.lang.Float] = BasicTypeInfo.FLOAT_TYPE_INFO
implicit lazy val jDoubleInfo: TypeInformation[java.lang.Double] = BasicTypeInfo.DOUBLE_TYPE_INFO
implicit lazy val jBooleanInfo: TypeInformation[java.lang.Boolean] = BasicTypeInfo.BOOLEAN_TYPE_INFO
implicit lazy val jByteInfo: TypeInformation[java.lang.Byte] = BasicTypeInfo.BYTE_TYPE_INFO
implicit lazy val jCharInfo: TypeInformation[java.lang.Character] = BasicTypeInfo.CHAR_TYPE_INFO
implicit lazy val jShortInfo: TypeInformation[java.lang.Short] = BasicTypeInfo.SHORT_TYPE_INFO
implicit lazy val jIntegerInfo: TypeInformation[java.lang.Integer] = BasicTypeInfo.INT_TYPE_INFO
implicit lazy val jLongInfo: TypeInformation[java.lang.Long] = BasicTypeInfo.LONG_TYPE_INFO
implicit lazy val jFloatInfo: TypeInformation[java.lang.Float] = BasicTypeInfo.FLOAT_TYPE_INFO
implicit lazy val jDoubleInfo: TypeInformation[java.lang.Double] = BasicTypeInfo.DOUBLE_TYPE_INFO
implicit lazy val jBooleanInfo: TypeInformation[java.lang.Boolean] = BasicTypeInfo.BOOLEAN_TYPE_INFO
implicit lazy val jByteInfo: TypeInformation[java.lang.Byte] = BasicTypeInfo.BYTE_TYPE_INFO
implicit lazy val jCharInfo: TypeInformation[java.lang.Character] = BasicTypeInfo.CHAR_TYPE_INFO
implicit lazy val jShortInfo: TypeInformation[java.lang.Short] = BasicTypeInfo.SHORT_TYPE_INFO
implicit lazy val jVoidInfo: TypeInformation[java.lang.Void] = BasicTypeInfo.VOID_TYPE_INFO
implicit lazy val jBigIntInfo: TypeInformation[BigInteger] = BasicTypeInfo.BIG_INT_TYPE_INFO
implicit lazy val jBigDecInfo: TypeInformation[java.math.BigDecimal] = BasicTypeInfo.BIG_DEC_TYPE_INFO

implicit def listCCInfo[T](implicit lc: ClassTag[T], ls: TypeSerializer[::[T]]): TypeInformation[::[T]] = {
drop(lc)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.findify.flinkadt.api.serializer

import io.findify.flinkadt.api.serializer.MappedSerializer.{MappedSerializerSnapshot, TypeMapper}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{
CompositeTypeSerializerSnapshot,
GenericTypeSerializerSnapshot,
SimpleTypeSerializerSnapshot,
TypeSerializer,
TypeSerializerSchemaCompatibility,
TypeSerializerSnapshot
}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil

case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[B]) extends SimpleSerializer[A] {
override def equals(obj: Any): Boolean = ser.equals(obj)

override def toString: String = ser.toString

override def hashCode(): Int = ser.hashCode()
override def getLength: Int = ser.getLength

override def serialize(record: A, target: DataOutputView): Unit = {
ser.serialize(mapper.map(record), target)
}

override def deserialize(reuse: A, source: DataInputView): A = {
mapper.contramap(ser.deserialize(mapper.map(reuse), source))
}

override def deserialize(source: DataInputView): A = mapper.contramap(ser.deserialize(source))

override def snapshotConfiguration(): TypeSerializerSnapshot[A] = new MappedSerializerSnapshot[A, B](mapper, ser)

override def createInstance(): A = mapper.contramap(ser.createInstance())
}

object MappedSerializer {
trait TypeMapper[A, B] {
def map(a: A): B
def contramap(b: B): A
}
class MappedSerializerSnapshot[A, B]() extends TypeSerializerSnapshot[A] {
var mapper: TypeMapper[A, B] = _
var ser: TypeSerializer[B] = _
def this(xmapper: TypeMapper[A, B], xser: TypeSerializer[B]) = {
this()
mapper = xmapper
ser = xser
}

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
val mapperClazz = InstantiationUtil.resolveClassByName[TypeMapper[A, B]](in, userCodeClassLoader)
mapper = InstantiationUtil.instantiate(mapperClazz)
val serClazz = InstantiationUtil.resolveClassByName(in, userCodeClassLoader)
ser = InstantiationUtil.instantiate(serClazz)
}

override def resolveSchemaCompatibility(newSerializer: TypeSerializer[A]): TypeSerializerSchemaCompatibility[A] =
TypeSerializerSchemaCompatibility.compatibleAsIs()

override def writeSnapshot(out: DataOutputView): Unit = {
out.writeUTF(mapper.getClass.getName)
out.writeUTF(ser.getClass.getName)
}

override def restoreSerializer(): TypeSerializer[A] = new MappedSerializer[A, B](mapper, ser)

override def getCurrentVersion: Int = 1
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.findify.flinkadt.api.serializer

import io.findify.flinkadt.api.serializer.UnitSerializer.UnitSerializerSnapshot
import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

import java.util.function.Supplier

class UnitSerializer extends SimpleSerializer[Unit] {
override def getLength: Int = 0

override def serialize(record: Unit, target: DataOutputView): Unit = {}

override def deserialize(reuse: Unit, source: DataInputView): Unit = {}

override def deserialize(source: DataInputView): Unit = {}

override def snapshotConfiguration(): TypeSerializerSnapshot[Unit] = new UnitSerializerSnapshot()

override def createInstance(): Unit = {}
}

object UnitSerializer {
class UnitSerializerSnapshot
extends SimpleTypeSerializerSnapshot[Unit](
new Supplier[TypeSerializer[Unit]] {
override def get(): TypeSerializer[Unit] = new UnitSerializer
}
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.findify.flinkadt.api.typeinfo

import io.findify.flinkadt.api.serializer.MappedSerializer
import io.findify.flinkadt.api.serializer.MappedSerializer.TypeMapper
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer

import scala.reflect.{ClassTag, classTag}

case class MappedTypeInformation[A: ClassTag, B](mapper: TypeMapper[A, B], nested: TypeInformation[B])
extends TypeInformation[A] {
override def createSerializer(config: ExecutionConfig): TypeSerializer[A] =
new MappedSerializer(mapper, nested.createSerializer(config))
override def isKeyType: Boolean = nested.isKeyType
override def getTotalFields: Int = nested.getTotalFields
override def isTupleType: Boolean = nested.isTupleType

override def canEqual(obj: Any): Boolean = obj match {
case m: MappedTypeInformation[_, _] => true
case _ => false
}
override def getTypeClass: Class[A] = classTag[A].runtimeClass.asInstanceOf[Class[A]]
override def getArity: Int = nested.getArity
override def isBasicType: Boolean = nested.isBasicType

override def toString: String = nested.toString

override def equals(obj: Any): Boolean = obj match {
case m: MappedTypeInformation[_, _] => (m.nested == nested) && m.mapper.equals(mapper)
case _ => false
}

override def hashCode(): Int = nested.hashCode()

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.findify.flinkadt.api.typeinfo

import io.findify.flinkadt.api.serializer.UnitSerializer
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer

class UnitTypeInformation extends TypeInformation[Unit] {
override def createSerializer(config: ExecutionConfig): TypeSerializer[Unit] = new UnitSerializer()
override def isKeyType: Boolean = true
override def getTotalFields: Int = 0
override def isTupleType: Boolean = false
override def canEqual(obj: Any): Boolean = obj.isInstanceOf[Unit]
override def getTypeClass: Class[Unit] = classOf[Unit]
override def getArity: Int = 0
override def isBasicType: Boolean = false

override def toString: String = "{}"

override def equals(obj: Any): Boolean = obj match {
case _: Unit => true
case _ => false
}

override def hashCode(): Int = ().hashCode()
}
43 changes: 43 additions & 0 deletions src/test/scala/io/findify/flinkadt/MappedTypeInfoTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.findify.flinkadt

import io.findify.flinkadt.MappedTypeInfoTest.WrappedString
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import io.findify.flinkadt.api._
import io.findify.flinkadt.api.serializer.MappedSerializer.TypeMapper
import org.apache.flink.api.common.typeinfo.TypeInformation

import scala.reflect.ClassTag

class MappedTypeInfoTest extends AnyFlatSpec with Matchers with TestUtils {
import MappedTypeInfoTest._
it should "derive TI for non-serializeable classes" in {
drop(implicitly[TypeInformation[WrappedString]])
}
}

object MappedTypeInfoTest {
class WrappedMapper extends TypeMapper[WrappedString, String] {
override def map(a: WrappedString): String = a.get

override def contramap(b: String): WrappedString = {
val str = new WrappedString
str.put(b)
str
}
}
implicit val mapper: TypeMapper[WrappedString, String] = new WrappedMapper()

class WrappedString {
private var internal: String = ""

override def equals(obj: Any): Boolean = obj match {
case s: WrappedString => s.get == internal
case _ => false
}
def get: String = internal
def put(value: String) = {
internal = value
}
}
}
Loading

0 comments on commit 7670584

Please sign in to comment.