Port To.safe macro to Scala 3 #10

Open
wants to merge 31 commits into
base: scala3-main
Changes from 23 commits
31 commits
84cb877
Add first implementation of macro interpreter
vincenzobaz Mar 29, 2021
a3186d6
Correct syntax
vincenzobaz Mar 29, 2021
e55b1a1
Continue To.safe macro implementation
MaximeKjaer Mar 30, 2021
ccf83a8
Update implicit parameter to Scala 3 syntax
MaximeKjaer Mar 30, 2021
55a9393
Remove withDottyCompath from scio-macros. Add withDottyCompat to over…
vincenzobaz Mar 19, 2021
b40e626
Fix compilatin issue
vincenzobaz Apr 1, 2021
7c4c7e4
Make Scala 3 To.safe macro compile
MaximeKjaer Apr 1, 2021
d90ed49
Make To.safe take implicit parameters in Scala 3
MaximeKjaer Apr 7, 2021
3a9c506
Add To.safe test
MaximeKjaer Apr 9, 2021
7d87a58
Relace expr-based schema interpreter with a reflection-based one
vincenzobaz Apr 12, 2021
bb323dc
Add simple tests
vincenzobaz Apr 16, 2021
2556de8
Enable scio-core tests in ci
vincenzobaz Apr 16, 2021
91b6f0d
Use scalatest it in To.safe tests
MaximeKjaer Apr 19, 2021
78dfc9d
Add missing cases to To.safe
MaximeKjaer Apr 19, 2021
e227e8c
Add tests for To.safe conversions of Java types
MaximeKjaer Apr 19, 2021
fa5c956
Prototype a compile time schema generation for java beans
vincenzobaz Apr 27, 2021
6658e8b
Simplify test
vincenzobaz Apr 27, 2021
afb37aa
Specify versions
vincenzobaz Apr 26, 2021
4aa8e57
Drop .tree usage in javabean schema derivation
vincenzobaz Apr 28, 2021
232995b
Drop usage of .tree in IsJavaBean macro
vincenzobaz Apr 28, 2021
47044fe
Add JavaBeans
vincenzobaz Apr 28, 2021
d4b9f8c
Disable Scala 2 test
vincenzobaz Apr 28, 2021
75c4d4e
Reorder cases in To.safe interpreter
MaximeKjaer May 5, 2021
c49e5e9
Update to Scala 3
vincenzobaz Jun 7, 2021
93d192f
Fix IsJavaBean
vincenzobaz Jun 7, 2021
e082144
Update scalatest
vincenzobaz Jun 7, 2021
997be91
Update sbt
vincenzobaz Jun 7, 2021
26f5857
Disable scoverage plugin to compile in Scala 3
vincenzobaz Jun 7, 2021
63f0dcc
Migrate ci to 3.0.0
vincenzobaz Jun 7, 2021
1004f2d
Fix typo
vincenzobaz Jun 7, 2021
1ec8f25
Use transparent to whitebox isjavabean
vincenzobaz Jun 7, 2021
6 changes: 4 additions & 2 deletions .github/workflows/migration.yml
@@ -23,7 +23,7 @@ jobs:
# - scio-extra/compile
# - scio-extra/test
- scio-core/compile
# - scio-core/test
- scio-core/test
# - scio-examples/compile
# - scio-examples/test
# - scio-redis/compile
@@ -54,5 +54,7 @@ jobs:
uses: coursier/cache-action@v5
- name: java 8 setup
uses: olafurpg/setup-scala@v10
- name: Compile
# - name: Scala 2
# run: sbt "++2.13.5;${{ matrix.task }}"
- name: Scala 3
run: sbt "++3.0.0-RC2;${{ matrix.task }}"
3 changes: 2 additions & 1 deletion build.sbt
@@ -584,7 +584,8 @@ lazy val `scio-macros`: Project = project
libraryDependencies ++= Seq(
"com.esotericsoftware" % "kryo-shaded" % kryoVersion,
"org.apache.beam" % "beam-sdks-java-extensions-sql" % beamVersion,
"org.apache.avro" % "avro" % avroVersion
"org.apache.avro" % "avro" % avroVersion,
"org.scalatest" %% "scalatest" % scalatestVersion % Test
),
// Scala 2 dependencies
libraryDependencies ++= {
159 changes: 154 additions & 5 deletions scio-core/src/main/scala-3/com/spotify/scio/schemas/To.scala
@@ -18,15 +18,165 @@
package com.spotify.scio.schemas

import org.apache.beam.sdk.schemas.{SchemaCoder, Schema => BSchema}
import BSchema.{ FieldType => BFieldType }

import scala.compiletime._
import scala.deriving._
import scala.quoted._
import scala.reflect.ClassTag
import scala.collection.mutable
import com.spotify.scio.IsJavaBean.checkGetterAndSetters

object ToMacro {
def safeImpl[I, O](si: Expr[Schema[I]])(implicit q: Quotes): Expr[To[I, O]] = {
???

def safeImpl[I: scala.quoted.Type, O: scala.quoted.Type](
iSchema: Expr[Schema[I]],
oSchema: Expr[Schema[O]]
)(using Quotes): Expr[To[I, O]] = {
import scala.quoted.quotes.reflect.{report, TypeRepr}

(interpret[I] , interpret[O]) match {
case (None, None) => report.throwError(
s"""
|Could not interpret input schema:
| ${iSchema.show}
|Could not interpret output schema:
| ${oSchema.show}
|""".stripMargin
)
case (None, _) => report.throwError("Could not interpret input schema: " + iSchema.show)
case (_, None) => report.throwError("Could not interpret output schema: " + oSchema.show)
case (Some(sIn), Some(sOut)) =>
val schemaOut: BSchema = SchemaMaterializer.fieldType(sOut).getRowSchema()
val schemaIn: BSchema = SchemaMaterializer.fieldType(sIn).getRowSchema()
val classTagOpt = Expr.summon[ClassTag[O]]
if (classTagOpt.isEmpty) {
report.throwError(s"Could not summon Expr[ClassTag[${TypeRepr.of[O].show}]]")
}
val classTag = classTagOpt.get
To.checkCompatibility(schemaIn, schemaOut)('{ To.unchecked[I, O](using $iSchema, $oSchema, $classTag) })
.fold(message => report.throwError(message), identity)
}
}

private def sequence[T](ls: List[Option[T]]): Option[List[T]] =
if ls.exists(_.isEmpty) then None
else Some(ls.collect { case Some(x) => x })

private def interpret[T: scala.quoted.Type](using Quotes): Option[Schema[T]] =
Type.of[T] match {

@jto jto Apr 20, 2021

Unless I'm missing something this means that the possible set of supported schema types is effectively limited to those already defined in scio.

If a user adds a given instance of Schema for a type that isn't supported (say, for example, a Java class), the derivation will simply ignore it. This could be a problem for aliaswa support in Schema but probably something we can live with. We don't really expect users to define their own instances of Schema.

I'd be curious to see what the implementation looks like for Java beans and Avro's SpecificRecord.

Unless I'm missing something this means that the possible set of supported schema types is effectively limited to those already defined in scio.

Yes, that's the assumption here.

If a user adds a given instance of Schema for a type that isn't supported (say, for example, a Java class), the derivation will simply ignore it. This could be a problem for aliaswa support in Schema but probably something we can live with. We don't really expect users to define their own instances of Schema.

It's still possible to support those use cases by registering custom interpreters to be tried as a last resort. Admittedly, that would complicate things and can be subtle: think of the case where the user defines the schema and uses it in the same project.

This could be a problem for aliaswa support in Schema

It's not clear what you mean above. Could you please clarify?

I'd be curious to see what the implementation looks like for Java beans and Avro's SpecificRecord.

@vincenzobaz and @MaximeKjaer are going to work on it. I'll let them report progress on it.
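
To make the trade-off above concrete, here is a minimal sketch of a closed interpreter with an optional last-resort fallback. It is a toy model, not scio code: `Tpe`, `Sch`, `ClosedInterpreter` and the `fallback` parameter are hypothetical names introduced only for illustration. Only the `sequence` helper mirrors the one in this diff.

```scala
object ClosedInterpreter {
  // Simplified description of a type, standing in for what the macro inspects.
  sealed trait Tpe
  case object StringTpe extends Tpe
  case object IntTpe extends Tpe
  final case class ListTpe(item: Tpe) extends Tpe
  final case class CaseClassTpe(fields: List[(String, Tpe)]) extends Tpe
  final case class UnknownTpe(name: String) extends Tpe // anything not listed above

  // Simplified schema, standing in for Schema[T].
  sealed trait Sch
  case object StringSch extends Sch
  case object IntSch extends Sch
  final case class ListSch(item: Sch) extends Sch
  final case class RecordSch(fields: List[(String, Sch)]) extends Sch

  // Same idea as the `sequence` helper in the diff: one None makes the whole result None.
  def sequence[T](ls: List[Option[T]]): Option[List[T]] =
    if (ls.exists(_.isEmpty)) None else Some(ls.flatten)

  // Known shapes are handled by the fixed cases; everything else goes to `fallback`
  // (hypothetical hook), which rejects by default.
  def interpret(t: Tpe, fallback: Tpe => Option[Sch] = _ => None): Option[Sch] =
    t match {
      case StringTpe     => Some(StringSch)
      case IntTpe        => Some(IntSch)
      case ListTpe(item) => interpret(item, fallback).map(s => ListSch(s))
      case CaseClassTpe(fields) =>
        val interpreted = fields.map { case (name, fieldTpe) =>
          interpret(fieldTpe, fallback).map(s => (name, s))
        }
        sequence(interpreted).map(fs => RecordSch(fs))
      case other => fallback(other)
    }
}
```

With the default fallback, a record that contains a single unknown field type is rejected as a whole. Passing a `fallback` is one possible way to model the "custom interpreters tried as a last resort" idea.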

Hey. Yeah sorry I wanted to talk about LogicalType actually but got the exact term used in beam wrong... I think that's the only case for which a user might want to define a custom type. Last time I checked the implementation was broken in beam and fixing it did not seem to be the priority so I guess we can live without it.

The most common use case for us would be to convert from avro's SpecificRecord to case classes and vice-versa so this one needs to be working really well.

I was checking out how we could handle Java beans and I think I hit an important blocker.
It is really easy to work around the implicit instance of IsJavaBean[T], because an instance of that type is just empty evidence; moreover, the code to instantiate it is already ported to a macro. It is straightforward to add a case to the pattern match, which could look like this:

 case _ if TypeRepr.of[T].typeSymbol.flags.is(Flags.JavaDefined) && Try(IsJavaBean.isJavaBeanImpl[T]).isSuccess => 
    ???

I also noticed that we only need a BSchema for the compatibility check, so we might avoid reaching for a RawRecord[T] altogether.

The major pain point is that JavaBeanSchema.schemaFor, which is used to create the BSchema in question, requires a Class[T] (which explains the implicit requirement of a ClassTag[T]). Class[T] is available only at runtime, so it is not possible to have instances of it in the macro.

I am not sure how we can handle this problem besides maybe implementing our own schema derivation for java beans.
What do you think?
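
For context on why a `Class[T]` ties this to runtime, the sketch below lists bean properties with the JDK's `java.beans.Introspector`, which needs a loaded `Class` to reflect on. It is only an illustration under stated assumptions, not Beam's `JavaBeanSchema` implementation: `PersonBean` and `RuntimeBeanInspection` are hypothetical names.

```scala
import java.beans.Introspector

// Hypothetical bean, written in Scala but following the JavaBean getter/setter convention.
class PersonBean extends java.io.Serializable {
  private var firstName: String = null
  private var age: Int = 0
  def getFirstName: String = firstName
  def setFirstName(value: String): Unit = { firstName = value }
  def getAge: Int = age
  def setAge(value: Int): Unit = { age = value }
}

object RuntimeBeanInspection {
  // Returns (property name, simple type name) for every readable and writable property.
  // The Class value only exists once the class has been loaded, that is, at runtime.
  def properties(cls: Class[?]): List[(String, String)] =
    Introspector
      .getBeanInfo(cls, classOf[Object]) // stop at Object to skip the `class` property
      .getPropertyDescriptors
      .toList
      .filter(pd => pd.getReadMethod != null && pd.getWriteMethod != null)
      .map(pd => pd.getName -> pd.getPropertyType.getSimpleName)
      .sortBy(_._1)
}
```

A compile-time derivation would have to recover the same names and types from the getter symbols instead of from a `Class` instance.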

This blocker might affect AvroInstances.avroSchema as well, given that it relies on an implicit ClassTag.

@jto jto Apr 27, 2021

However this might result in a compile time Schema which diverges from the one generated at runtime by the beam

This might be the case but seems unlikely. I think reimplementing it is an acceptable solution. We can always have tests to validate that the Schema is the same as what Beam would derive.

IIRC for Avro we actually only need to access the Avro Schema. There's a schema field in the generated code, so it should be possible to access it at compile time since it's just a String?

For avro I see two methods:

  • implicit def avroSchema[T <: SpecificRecord: ClassTag]: Schema[T] which relies on the ClassTag to feed a Class to TypeDescriptor and therefore suffers from the same problem as the Java bean discussed here.
  • def fromAvroSchema(schema: org.apache.avro.Schema): Schema[GenericRecord] which is neither generic nor implicit.

I imagine that you are referring to the second one. In this case we can pattern match on the call to fromAvroSchema, but we have no compile-time information about schema: avro.Schema, so I am not sure how to obtain the Schema[GenericRecord].

@jto commit 93de09c proposes an implementation of compile time schema derivation for java beans similar to the one used for case classes. It seems to be accepted by (simple) tests.
Does it look reasonable to you?

Yep. Seems reasonable :)

case '[java.lang.Byte] => Some(Schema.jByteSchema.asInstanceOf[Schema[T]])
case '[Array[java.lang.Byte]] => Some(Schema.jBytesSchema.asInstanceOf[Schema[T]])
case '[java.lang.Short] => Some(Schema.jShortSchema.asInstanceOf[Schema[T]])
case '[java.lang.Integer] => Some(Schema.jIntegerSchema.asInstanceOf[Schema[T]])
case '[java.lang.Long] => Some(Schema.jLongSchema.asInstanceOf[Schema[T]])
case '[java.lang.Float] => Some(Schema.jFloatSchema.asInstanceOf[Schema[T]])
case '[java.lang.Double] => Some(Schema.jDoubleSchema.asInstanceOf[Schema[T]])
case '[java.math.BigDecimal] => Some(Schema.jBigDecimalSchema.asInstanceOf[Schema[T]])
case '[java.lang.Boolean] => Some(Schema.jBooleanSchema.asInstanceOf[Schema[T]])
case '[java.util.List[u]] =>
for (itemSchema) <- interpret[u]
yield Schema.jListSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[java.util.ArrayList[u]] =>
for (itemSchema) <- interpret[u]
yield Schema.jArrayListSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[java.util.Map[k, v]] =>
for {
keySchema <- interpret[k]
valueSchema <- interpret[v]
} yield Schema.jMapSchema(using keySchema, valueSchema).asInstanceOf[Schema[T]]
// TODO javaBeanSchema
// TODO javaEnumSchema
case '[java.time.LocalDate] => Some(Schema.jLocalDate.asInstanceOf[Schema[T]])

case '[String] => Some(Schema.stringSchema.asInstanceOf[Schema[T]])
case '[Byte] => Some(Schema.byteSchema.asInstanceOf[Schema[T]])
case '[Array[Byte]] => Some(Schema.bytesSchema.asInstanceOf[Schema[T]])
case '[Short] => Some(Schema.sortSchema.asInstanceOf[Schema[T]])
case '[Int] => Some(Schema.intSchema.asInstanceOf[Schema[T]])
case '[Long] => Some(Schema.longSchema.asInstanceOf[Schema[T]])
case '[Float] => Some(Schema.floatSchema.asInstanceOf[Schema[T]])
case '[Double] => Some(Schema.doubleSchema.asInstanceOf[Schema[T]])
case '[BigDecimal] => Some(Schema.bigDecimalSchema.asInstanceOf[Schema[T]])
case '[Boolean] => Some(Schema.booleanSchema.asInstanceOf[Schema[T]])
case '[Option[u]] =>
for (itemSchema <- interpret[u])
yield Schema.optionSchema(using itemSchema).asInstanceOf[Schema[T]]
// TODO Array[T]
case '[List[u]] =>
for (itemSchema <- interpret[u])
yield Schema.listSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[mutable.ArrayBuffer[u]] =>
for (itemSchema <- interpret[u])
yield Schema.arrayBufferSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[mutable.Buffer[u]] =>
for (itemSchema <- interpret[u])
yield Schema.bufferSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[mutable.Set[u]] =>
for (itemSchema <- interpret[u])
yield Schema.mutableSetSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[Set[u]] =>
for (itemSchema <- interpret[u])
yield Schema.setSchema(using itemSchema).asInstanceOf[Schema[T]]
// TODO SortedSet[T]
case '[mutable.ListBuffer[u]] =>
for (itemSchema <- interpret[u])
yield Schema.listBufferSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[Vector[u]] =>
for (itemSchema <- interpret[u])
yield Schema.vectorSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[mutable.Map[k, v]] =>
for {
keySchema <- interpret[k]
valueSchema <- interpret[v]
} yield Schema.mutableMapSchema(using keySchema, valueSchema).asInstanceOf[Schema[T]]
case '[Map[k, v]] =>
for {
keySchema <- interpret[k]
valueSchema <- interpret[v]
} yield Schema.mapSchema(using keySchema, valueSchema).asInstanceOf[Schema[T]]
case '[Seq[u]] =>
for (itemSchema <- interpret[u])
yield Schema.seqSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[TraversableOnce[u]] =>
for (itemSchema <- interpret[u])
yield Schema.traversableOnceSchema(using itemSchema).asInstanceOf[Schema[T]]
case '[Iterable[u]] =>
for (itemSchema <- interpret[u])
yield Schema.iterableSchema(using itemSchema).asInstanceOf[Schema[T]]
case _ =>
import quotes.reflect._
val tp = TypeRepr.of[T]
val tpSymbol: Symbol = tp.typeSymbol

// if case class iterate and recurse, else sorry
if tp <:< TypeRepr.of[Product] && tpSymbol.caseFields.nonEmpty then {
val schemasOpt: List[Option[(String, Schema[Any])]] = tpSymbol.caseFields.map { (f: Symbol) =>
assert(f.isValDef)
val fieldName = f.name
val fieldType: TypeRepr = tp.memberType(f)
fieldType.asType match {
// pattern match to create a bind <3
case '[u] => interpret[u].asInstanceOf[Option[Schema[Any]]].map(s => (fieldName, s))
}
}
sequence(schemasOpt).map(schemas => Record(schemas.toArray, null, null))
} else if tpSymbol.flags.is(Flags.JavaDefined) && scala.util.Try(checkGetterAndSetters[T]).isSuccess then {
val schemasOpt = tpSymbol.declaredMethods.collect {
case s if s.name.toString.startsWith("get") && s.isDefDef=>
val fieldName: String = s.name.toString.drop(3)
val fieldType: TypeRepr = tp.memberType(s)
fieldType match {
case MethodType(_, _, returnTpt) =>
returnTpt.asType match {
case '[u] => interpret[u].asInstanceOf[Option[Schema[Any]]].map(s => (fieldName, s))
}
}
}
// RawRecord is used for JavaBeans, not Record
sequence(schemasOpt).map(schemas => Record(schemas.toArray, null, null))
} else None
}
}

trait ToMacro {
@@ -36,7 +186,6 @@ trait ToMacro {
* at compile time.
* @see To#unsafe
*/
// TODO: scala3
inline def safe[I, O](inline si: Schema[I], inline so: Schema[O]): To[I, O] =
???
inline def safe[I, O](using inline iSchema: Schema[I], inline oSchema: Schema[O]): To[I, O] =
${ ToMacro.safeImpl('iSchema, 'oSchema) }
}
28 changes: 28 additions & 0 deletions scio-core/src/test/scala/com/spotify/scio/JavaBeanA.java
@@ -0,0 +1,28 @@
package com.spotify.scio;

class JavaBeanA implements java.io.Serializable {
private String firstName = null;
private String lastName = null;
private int age = 0;

public JavaBeanA() {
}
public String getFirstName(){
return firstName;
}
public String getLastName(){
return lastName;
}
public int getAge(){
return age;
}
public void setFirstName(String firstName){
this.firstName = firstName;
}
public void setLastName(String lastName){
this.lastName = lastName;
}
public void setAge(int age){
this.age = age;
}
}
28 changes: 28 additions & 0 deletions scio-core/src/test/scala/com/spotify/scio/JavaBeanB.java
@@ -0,0 +1,28 @@
package com.spotify.scio;

class JavaBeanB implements java.io.Serializable {
private String name = null;
private String uuid = null;
private int money = 0;

public JavaBeanB() {
}
public String getName(){
return name;
}
public String getUuid(){
return uuid;
}
public int getMoney(){
return money;
}
public void setName(String name){
this.name = name;
}
public void setUuid(String uuid){
this.uuid = uuid;
}
public void setMoney(int money){
this.money = money;
}
}
28 changes: 28 additions & 0 deletions scio-core/src/test/scala/com/spotify/scio/JavaBeanC.java
@@ -0,0 +1,28 @@
package com.spotify.scio;

class JavaBeanC implements java.io.Serializable {
private String firstName = null;
private String lastName = null;
private int age = 0;

public JavaBeanC() {
}
public String getFirstName(){
return firstName;
}
public String getLastName(){
return lastName;
}
public int getAge(){
return age;
}
public void setFirstName(String firstName){
this.firstName = firstName;
}
public void setLastName(String lastName){
this.lastName = lastName;
}
public void setAge(int age){
this.age = age;
}
}
64 changes: 64 additions & 0 deletions scio-core/src/test/scala/com/spotify/scio/ToSafeSuite.scala
@@ -0,0 +1,64 @@
package com.spotify.scio

import com.spotify.scio.schemas.To

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

case class JavaListInt(l: java.util.List[java.lang.Integer])
case class JavaListString(l: java.util.List[java.lang.String])
case class ListInt(l: List[Int])
case class JavaSource(b: java.lang.Boolean)
case class Source(b: Boolean)
case class Dest(b: Boolean)
case class Mistake(b: Int)
case class Mistake2(c: Boolean)

case class Sources(name: String, links: List[Array[Byte]])
case class Destinations(name: String, links: List[Array[Byte]])
case class DestinationsWrong(name: String, links: List[Array[Int]])

class ToSafeTest extends AnyFlatSpec with Matchers {
"To.safe" should "generate a conversion on compatible flat case class schemas" in {
To.safe[Source, Dest]
}

it should "generate a conversion between java.lang.Boolean and Boolean" in {
To.safe[JavaSource, Source]
To.safe[Source, JavaSource]
}

it should "generate a conversion between java.util.List[java.lang.Integer] and List[Int]" in {
To.safe[JavaListInt, ListInt]
To.safe[ListInt, JavaListInt]
}

it should "fail on incompatible Java types" in {
"To.safe[JavaListString, JavaListInt]" shouldNot compile
"To.safe[JavaListString, ListInt]" shouldNot compile
}

it should "fail on incompatible flat case class schemas" in {
"To.safe[Source, Mistake2]" shouldNot compile
"To.safe[Source, Mistake]" shouldNot compile
}

it should "generate a conversion on compatible nested case class schemas" in {
To.safe[Sources, Destinations]
}

it should "fail on incompatible nested case class schemas" in {
"To.safe[Sources, DestinationsWrong]" shouldNot compile
}

it should "work with java beans" in {
"To.safe[JavaBeanA, JavaBeanB]" shouldNot compile
"To.safe[JavaBeanB, JavaBeanA]" shouldNot compile

"To.safe[JavaBeanB, JavaBeanC]" shouldNot compile
"To.safe[JavaBeanC, JavaBeanB]" shouldNot compile

To.safe[JavaBeanA, JavaBeanC]
To.safe[JavaBeanC, JavaBeanA]
}
}