Refactored AvroParquet ReadParams and improved tests
shnapz committed Dec 21, 2023
1 parent b952934 commit aa95561
Showing 3 changed files with 183 additions and 148 deletions.
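For orientation, the public API whose internals this commit rearranges looks roughly like the sketch below. `parquetAvroFile` and `saveAsParquetAvroFile` are scio's parquet-avro syntax; the `Account` record class and the paths are placeholders, not part of this change:

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.parquet.avro._

object ParquetAvroExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    // Reads are driven by ParquetAvroIO.ReadParam below; writes by WriteParam.
    // `Account` stands in for any Avro SpecificRecord class.
    sc.parquetAvroFile[Account]("gs://bucket/in/*.parquet")
      .map(_.getId.toString)
      .saveAsTextFile("gs://bucket/out/ids")

    sc.run()
  }
}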
ParquetAvroIO.scala
@@ -22,7 +22,7 @@ import com.spotify.scio.ScioContext
 import com.spotify.scio.coders.{Coder, CoderMaterializer}
 import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
 import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
-import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, ParquetConfiguration}
+import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration}
 import com.spotify.scio.testing.TestDataManager
 import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
 import com.spotify.scio.values.SCollection
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.parquet.avro.{
   AvroDataSupplier,
   AvroParquetInputFormat,
-  AvroParquetReader,
   AvroReadSupport,
   AvroWriteSupport,
   GenericDataSupplier
@@ -61,11 +60,10 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
   override type WriteP = ParquetAvroIO.WriteParam
   override val tapT: TapT.Aux[T, T] = TapOf[T]
 
-  private val cls = ScioUtil.classOf[T]
-
   override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
     val bCoder = CoderMaterializer.beam(sc, Coder[T])
     sc.pipeline.getCoderRegistry.registerCoderForClass(ScioUtil.classOf[T], bCoder)
+    params.setupConfig()
     params.read(sc, path)(Coder[T])
   }
 
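The `registerCoderForClass` call above pins the Beam coder for the record class before the read expands. As a standalone illustration of that Beam pattern (the GenericRecord/schema choice here is illustrative, not scio's wiring):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder

// Register a coder so later transforms can infer it for GenericRecord,
// mirroring sc.pipeline.getCoderRegistry.registerCoderForClass above.
def registerGenericRecordCoder(pipeline: Pipeline, schema: Schema): Unit =
  pipeline.getCoderRegistry.registerCoderForClass(
    classOf[GenericRecord],
    AvroCoder.of(classOf[GenericRecord], schema)
  )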
@@ -116,19 +114,7 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
   }
 
   override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
-    val isAssignable = classOf[SpecificRecord].isAssignableFrom(cls)
-    val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else params.schema
-    val conf = ParquetConfiguration.ofNullable(params.conf)
-    if (
-      conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO.containsLogicalType(
-        writerSchema
-      )
-    ) {
-      ParquetAvroIO.log.warn(
-        s"Detected a logical type in schema `$writerSchema`, but Configuration key `${AvroWriteSupport.AVRO_DATA_SUPPLIER}`" +
-          s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
-      )
-    }
+    val writerSchema = params.setupConfigAndGetSchema[T]()
 
     data.applyInternal(
       parquetOut(
@@ -137,7 +123,7 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
         params.suffix,
         params.numShards,
         params.compression,
-        conf,
+        params.conf,
         params.filenamePolicySupplier,
         params.prefix,
         params.shardNameTemplate,
@@ -159,7 +145,7 @@ object ParquetAvroIO {
   object ReadParam {
     val DefaultProjection: Schema = null
     val DefaultPredicate: FilterPredicate = null
-    val DefaultConfiguration: Configuration = null
+    def DefaultConfiguration: Configuration = ParquetConfiguration.empty()
     val DefaultSuffix: String = null
 
     private[scio] def apply[T: ClassTag](params: WriteParam): ReadParam[T, T] =
@@ -184,10 +170,16 @@ object ParquetAvroIO {
       if (isSpecific) ReflectData.get().getSchema(avroClass) else projection
 
     def read(sc: ScioContext, path: String)(implicit coder: Coder[T]): SCollection[T] = {
-      val jobConf = ParquetConfiguration.ofNullable(conf)
+      if (ParquetReadConfiguration.getUseSplittableDoFn(conf, sc.options)) {
+        readSplittableDoFn(sc, path)
+      } else {
+        readLegacy(sc, path)
+      }
+    }
 
+    def setupConfig(): Unit = {
       if (
-        jobConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO
+        conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO
           .containsLogicalType(
             readSchema
           )
@@ -198,74 +190,57 @@ object ParquetAvroIO {
         )
       }
 
-      // Needed to make GenericRecord read by parquet-avro work with Beam's
-      // org.apache.beam.sdk.extensions.avro.coders.AvroCoder
-      if (!isSpecific) {
-        jobConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false)
-        setDefaultGenericDataSupplier()
-      }
-
-      if (ParquetReadConfiguration.getUseSplittableDoFn(jobConf, sc.options)) {
-        readSplittableDoFn(sc, jobConf, path)
-      } else {
-        readLegacy(sc, jobConf, path)
+      AvroReadSupport.setAvroReadSchema(conf, readSchema)
+      if (projection != null) {
+        AvroReadSupport.setRequestedProjection(conf, projection)
+      }
-      }
-    }
 
-    def setReadSchemas[A, T](params: ParquetAvroIO.ReadParam[A, T]): Unit = {
-      AvroReadSupport.setAvroReadSchema(conf, params.readSchema)
-      if (params.projection != null) {
-        AvroReadSupport.setRequestedProjection(conf, params.projection)
+      if (predicate != null) {
+        ParquetInputFormat.setFilterPredicate(conf, predicate)
       }
-    }
 
-    def setDefaultGenericDataSupplier(): Unit = {
-      if (conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
-        conf.setClass(
-          AvroReadSupport.AVRO_DATA_SUPPLIER,
-          classOf[GenericDataSupplier],
-          classOf[AvroDataSupplier]
-        )
+      // Needed to make GenericRecord read by parquet-avro work with Beam's
+      // org.apache.beam.sdk.extensions.avro.coders.AvroCoder
+      if (!isSpecific) {
+        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false)
+        if (conf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
+          conf.setClass(
+            AvroReadSupport.AVRO_DATA_SUPPLIER,
+            classOf[GenericDataSupplier],
+            classOf[AvroDataSupplier]
+          )
+        }
       }
+    }
 
-    private def readSplittableDoFn(sc: ScioContext, conf: Configuration, path: String)(implicit
+    private def readSplittableDoFn(sc: ScioContext, path: String)(implicit
       coder: Coder[T]
     ): SCollection[T] = {
-      setReadSchemas(this)
-      if (predicate != null) {
-        ParquetInputFormat.setFilterPredicate(conf, predicate)
-      }
-
+      val filePattern = ScioUtil.filePattern(path, suffix)
       val bCoder = CoderMaterializer.beam(sc, coder)
       val cleanedProjectionFn = ClosureCleaner.clean(projectionFn)
 
       sc.applyTransform(
         ParquetRead.read[A, T](
           ReadSupportFactory.avro,
           new SerializableConfiguration(conf),
-          path,
-          Functions.serializableFn(cleanedProjectionFn)
+          filePattern,
+          Functions.serializableFn { x =>
+            cleanedProjectionFn(x)
+          }
         )
       ).setCoder(bCoder)
     }
 
-    private def readLegacy(sc: ScioContext, conf: Configuration, path: String)(implicit
+    private def readLegacy(sc: ScioContext, path: String)(implicit
       coder: Coder[T]
     ): SCollection[T] = {
       val job = Job.getInstance(conf)
-      GcsConnectorUtil.setInputPaths(sc, job, path)
+      val filePattern = ScioUtil.filePattern(path, suffix)
+      GcsConnectorUtil.setInputPaths(sc, job, filePattern)
       job.setInputFormatClass(classOf[AvroParquetInputFormat[T]])
       job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void])
       job.getConfiguration.setClass("value.class", avroClass, avroClass)
-      AvroParquetInputFormat.setAvroReadSchema(job, readSchema)
-
-      if (projection != null) {
-        AvroParquetInputFormat.setRequestedProjection(job, projection)
-      }
-      if (predicate != null) {
-        ParquetInputFormat.setFilterPredicate(job.getConfiguration, predicate)
-      }
-
       val g = ClosureCleaner.clean(projectionFn) // defeat closure
       val aCls = avroClass
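`read` now dispatches between `readSplittableDoFn` and `readLegacy` via `ParquetReadConfiguration.getUseSplittableDoFn`, so a caller can pin the choice through the read Configuration. A sketch, assuming `ParquetReadConfiguration.UseSplittableDoFn` is the key that `getUseSplittableDoFn` consults (the key name is an inference, not shown in this diff):

import com.spotify.scio.parquet.ParquetConfiguration
import com.spotify.scio.parquet.read.ParquetReadConfiguration

// Force the legacy Hadoop-input-format read instead of the splittable DoFn
// path; pass this conf into the read so getUseSplittableDoFn sees it.
val conf = ParquetConfiguration.of(ParquetReadConfiguration.UseSplittableDoFn -> false)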
@@ -305,7 +280,7 @@ object ParquetAvroIO {
     val DefaultNumShards: Int = 0
     val DefaultSuffix: String = ".parquet"
     val DefaultCompression: CompressionCodecName = CompressionCodecName.ZSTD
-    val DefaultConfiguration: Configuration = null
+    def DefaultConfiguration: Configuration = ParquetConfiguration.empty()
     val DefaultFilenamePolicySupplier: FilenamePolicySupplier = null
     val DefaultPrefix: String = null
     val DefaultShardNameTemplate: String = null
@@ -322,5 +297,25 @@ object ParquetAvroIO {
     prefix: String = WriteParam.DefaultPrefix,
     shardNameTemplate: String = WriteParam.DefaultShardNameTemplate,
     tempDirectory: String = WriteParam.DefaultTempDirectory
-  )
+  ) {
+
+    private[avro] def setupConfigAndGetSchema[T: ClassTag](): Schema = {
+      val avroClass = ScioUtil.classOf[T]
+      val isSpecific: Boolean = classOf[SpecificRecord] isAssignableFrom avroClass
+      val writerSchema = if (isSpecific) ReflectData.get().getSchema(avroClass) else schema
+
+      if (
+        conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER) == null && ParquetAvroIO.containsLogicalType(
+          writerSchema
+        )
+      ) {
+        ParquetAvroIO.log.warn(
+          s"Detected a logical type in schema `$writerSchema`, but Configuration key `${AvroWriteSupport.AVRO_DATA_SUPPLIER}`" +
+            s"was not set to a logical type supplier. See https://spotify.github.io/scio/io/Parquet.html#logical-types for more information."
+        )
+      }
+
+      writerSchema
+    }
+  }
 }
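Both `ReadParam.setupConfig` and `WriteParam.setupConfigAndGetSchema` gate their warning on `ParquetAvroIO.containsLogicalType`, which this diff references but does not show. A plausible shape for such a check, as an assumption rather than the actual implementation:

import org.apache.avro.Schema
import scala.jdk.CollectionConverters._

// True if the schema, or any nested record field, union branch, array
// element, or map value, declares an Avro logical type.
def containsLogicalType(s: Schema): Boolean =
  s.getLogicalType != null || (s.getType match {
    case Schema.Type.RECORD => s.getFields.asScala.exists(f => containsLogicalType(f.schema()))
    case Schema.Type.UNION  => s.getTypes.asScala.exists(containsLogicalType)
    case Schema.Type.ARRAY  => containsLogicalType(s.getElementType)
    case Schema.Type.MAP    => containsLogicalType(s.getValueType)
    case _                  => false
  })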
ParquetAvroTap.scala
@@ -35,18 +35,13 @@ final case class ParquetAvroTap[A, T: ClassTag: Coder](
 ) extends Tap[T] {
   override def value: Iterator[T] = {
     val filePattern = ScioUtil.filePattern(path, params.suffix)
-    val conf = ParquetConfiguration
-      .ofNullable(params.conf)
-    params.setReadSchemas(params)
-    if (!params.isSpecific) {
-      params.setDefaultGenericDataSupplier()
-    }
+    params.setupConfig()
 
     val xs = FileSystems.`match`(filePattern).metadata().asScala.toList
     xs.iterator.flatMap { metadata =>
       val reader = AvroParquetReader
         .builder[A](BeamInputFile.of(metadata.resourceId()))
-        .withConf(conf)
+        .withConf(params.conf)
         .build()
       new Iterator[T] {
         private var current: A = reader.read()
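The tap's `value` iterator wraps the same parquet-avro reader pattern; standalone, outside scio, it reduces to roughly this (the local path is a placeholder):

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Drain a parquet-avro reader until read() returns null, as the tap's
// Iterator does with `current`.
val reader = AvroParquetReader
  .builder[GenericRecord](
    HadoopInputFile.fromPath(new Path("/tmp/part-00000.parquet"), new Configuration())
  )
  .build()
Iterator.continually(reader.read()).takeWhile(_ != null).foreach(println)
reader.close()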

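The logical-type warning in both params points at scio's Parquet docs; the remedy it suggests is registering an AvroDataSupplier that knows about logical types. A sketch of such a supplier — the conversion choices are illustrative, and `ExampleLogicalTypeSupplier` is a made-up name rather than the supplier the docs recommend:

import org.apache.avro.Conversions
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericData
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroDataSupplier, AvroReadSupport}

// Supplies a GenericData model with logical-type conversions registered, so
// decimal and timestamp-millis columns materialize as BigDecimal/Instant
// values rather than raw bytes/longs.
class ExampleLogicalTypeSupplier extends AvroDataSupplier {
  override def get(): GenericData = {
    val data = new GenericData()
    data.addLogicalTypeConversion(new Conversions.DecimalConversion)
    data.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion)
    data
  }
}

// Wiring it into the Configuration checked by setupConfig() silences the warning.
val conf = new Configuration()
conf.setClass(
  AvroReadSupport.AVRO_DATA_SUPPLIER,
  classOf[ExampleLogicalTypeSupplier],
  classOf[AvroDataSupplier]
)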