Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In SMB and ParquetAvroIOTap set GenericDataSupplier and read schemas #5121

Merged
merged 12 commits into from
Jan 18, 2024

Conversation

shnapz
Copy link
Contributor

@shnapz shnapz commented Dec 14, 2023

@@ -0,0 +1,38 @@
package com.spotify.scio.parquet.avro
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved this class out from package object

val filePattern = ScioUtil.filePattern(path, params.suffix)
val conf = ParquetConfiguration
.ofNullable(params.conf)
conf.setReadSchemas(params)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except of these two lines, the rest of the class remains the same, just moved

@@ -241,21 +241,24 @@ public Read<T> withPredicate(Predicate<T> predicate) {
return toBuilder().setPredicate(predicate).build();
}

@SuppressWarnings("unchecked")
@Override
FileOperations<T> getFileOperations() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this to reflect the same style as in Write and TransformOutput

@Test
public void testSpecificRecord() throws Exception {
final ParquetAvroFileOperations<AvroGeneratedUser> fileOperations =
ParquetAvroFileOperations.of(AvroGeneratedUser.getClassSchema());
ParquetAvroFileOperations.of(AvroGeneratedUser.class);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Prod we use the overload with Schema for GenericRecords only

Copy link

codecov bot commented Dec 16, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (41c8df7) 62.99% compared to head (1ba9610) 63.06%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5121      +/-   ##
==========================================
+ Coverage   62.99%   63.06%   +0.06%     
==========================================
  Files         298      300       +2     
  Lines       10946    10945       -1     
  Branches      754      760       +6     
==========================================
+ Hits         6895     6902       +7     
+ Misses       4051     4043       -8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

final Schema schema =
getRecordClass() == null
? getSchema()
: new ReflectData(getRecordClass().getClassLoader()).getSchema(getRecordClass());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is identical resolution of schema through ReflectData in ParquetAvroFileOperations

@shnapz shnapz marked this pull request as ready for review December 18, 2023 15:27
@shnapz shnapz requested a review from clairemcginty December 18, 2023 15:27

implicit class ConfigurationSyntax(conf: Configuration) {

def setReadSchemas[A, T](params: ParquetAvroIO.ReadParam[A, T]): Unit = {
Copy link
Contributor

@clairemcginty clairemcginty Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of broadening the scope of this method and moving it into a class method of ParquetAvroIO.ReadParam? Like:

 final case class ReadParam[A: ClassTag, T: ClassTag] private (
    projectionFn: A => T,
    projection: Schema = ReadParam.DefaultProjection,
    predicate: FilterPredicate = ReadParam.DefaultPredicate,
    conf: Configuration = ReadParam.DefaultConfiguration,
    suffix: String = null
  ) {
   val avroClass: Class[A] = ScioUtil.classOf[A]
   val isSpecific = classOf[SpecificRecord] isAssignableFrom ScioUtil.classOf[T] 
   
   private[avro] def getReadConfiguration(): Configuration = {
       val c = ParquetConfiguration.ofNullable(conf)
       // Set schema/projection
       val readSchema: Schema = if (isSpecific) ReflectData.get().getSchema(avroClass) else params.projection

       AvroReadSupport.setAvroReadSchema(readSchema)
       if (params.projection != null) { /* set projection... */ }
       if (params.predicate != null) { /* set predicate */ }
   
       // Set data supplier
       if (!isSpecific && conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
         ....
      }
  }

That way it's easy to access from both read and tap methods 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! I missed it, the great place for common functions on input conf!

@shnapz shnapz requested a review from clairemcginty December 18, 2023 21:40
@shnapz shnapz force-pushed the akabas/ParquetAvro-dataSupplier branch from 1673bf2 to b952934 Compare December 18, 2023 23:01
@@ -219,13 +212,27 @@ object ParquetAvroIO {
}
}

def setReadSchemas[A, T](params: ParquetAvroIO.ReadParam[A, T]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadParam is already parameterized with [A, T]. I don't think we want type shadowing here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this function

val filePattern = ScioUtil.filePattern(path, params.suffix)
val conf = ParquetConfiguration
.ofNullable(params.conf)
params.setReadSchemas(params)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to do this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably was tired when committed this, I am changing the approach

@shnapz shnapz requested a review from RustedBones January 3, 2024 23:56
@shnapz shnapz force-pushed the akabas/ParquetAvro-dataSupplier branch from aa95561 to 1b8ddac Compare January 17, 2024 01:24
Copy link
Contributor

@RustedBones RustedBones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that using an empty config as parameter allows some simplification, but I'd still prefer to pass a null there for consistency.

This is a personal preference, not a blocker. @clairemcginty might also have an opinion about it.

@@ -159,7 +145,7 @@ object ParquetAvroIO {
object ReadParam {
val DefaultProjection: Schema = null
val DefaultPredicate: FilterPredicate = null
val DefaultConfiguration: Configuration = null
def DefaultConfiguration: Configuration = ParquetConfiguration.empty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, I'd prefer keeping this as null and handle it internally

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I think, partly because it matches other ScioIO param conventions and partly because I'm worried that over time we'll forget why this was made a def and refactor it back into a val :)

Copy link
Contributor Author

@shnapz shnapz Jan 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to challenge this approach :) but fully agree, it is so easy to introduce a bug. As an alternative would be full clone of ParquetConfiguration after used passed it, because we modify this instance and we don't know what else a user does

Comment on lines 228 to 230
Functions.serializableFn { x =>
cleanedProjectionFn(x)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. can we revert to previous syntax?

current = reader.read()
r
}
private[avro] def setupConfigAndGetSchema[T: ClassTag](): Schema = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would prefer getting rid of this helper method an just inlining it in ParquetAvroIO::write as before. Especially since once we merge the Parquet 1.13.1 upgrade, we'll be able to remove all the logical type checks, so there won't be much logic left :D

Alternately, we could convert it it into a method that just gets the Schema for a type T:

private[avro] def getSchemaForClass[T: ClassTag](): Schema = {
   val avroClass = ScioUtil.classOf[T]
   val isSpecific: Boolean = classOf[SpecificRecord] isAssignableFrom avroClass
    if (isSpecific) ReflectData.get().getSchema(avroClass) else schema
}

since that logic is reused a bit in this file :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. In case of write it is used only once, but I aimed to have similarity with read param handling. I will inline

public static <V extends IndexedRecord> ParquetAvroFileOperations<V> of(
Schema schema, CompressionCodecName compression, FilterPredicate predicate,
Configuration conf) {
if (conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer setting this in ParquetAvroReader::prepareReader, similar to how we set model in ParquetAvroSink::open -- that way this logic doesn't depend on the user choosing this constructor specifically :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh right, with the help of newly introduced private final Class<ValueT> recordClass; we can do that

@RustedBones RustedBones added this to the 0.14.0 milestone Jan 17, 2024
@shnapz shnapz force-pushed the akabas/ParquetAvro-dataSupplier branch from 0cbfe14 to 338dc55 Compare January 18, 2024 03:51
_.parquetAvroFile[TestRecord](_).map(identity)
)(_.saveAsParquetAvroFile(_))

forAllCases(readConfigs) { case (c, _) =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main idea of changes in this file is to test both SDR and Legacy in the same test cases

@RustedBones RustedBones merged commit b48d9d6 into main Jan 18, 2024
11 checks passed
@RustedBones RustedBones deleted the akabas/ParquetAvro-dataSupplier branch January 18, 2024 08:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants