Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In SMB and ParquetAvroIOTap set GenericDataSupplier and read schemas #5121

Merged
merged 12 commits into from
Jan 18, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{ScioIO, Tap, TapOf, TapT}
import com.spotify.scio.parquet.read.{ParquetRead, ParquetReadConfiguration, ReadSupportFactory}
import com.spotify.scio.parquet.{BeamInputFile, GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.parquet.{GcsConnectorUtil, ParquetConfiguration}
import com.spotify.scio.testing.TestDataManager
import com.spotify.scio.util.{FilenamePolicySupplier, Functions, ScioUtil}
import com.spotify.scio.values.SCollection
Expand All @@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.{
AvroDataSupplier,
AvroParquetInputFormat,
AvroParquetReader,
AvroReadSupport,
AvroWriteSupport,
GenericDataSupplier
Expand All @@ -61,11 +60,10 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
override type WriteP = ParquetAvroIO.WriteParam
override val tapT: TapT.Aux[T, T] = TapOf[T]

private val cls = ScioUtil.classOf[T]

override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = {
val bCoder = CoderMaterializer.beam(sc, Coder[T])
sc.pipeline.getCoderRegistry.registerCoderForClass(ScioUtil.classOf[T], bCoder)
params.setupConfig()
params.read(sc, path)(Coder[T])
}

Expand Down Expand Up @@ -101,7 +99,7 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
)(ScioUtil.strippedPath(path), suffix)
val dynamicDestinations = DynamicFileDestinations
.constant(fp, SerializableFunctions.identity[T])
val job = Job.getInstance(ParquetConfiguration.ofNullable(conf))
val job = Job.getInstance(conf)
if (isLocalRunner) GcsConnectorUtil.setCredentials(job)

val sink = new ParquetAvroFileBasedSink[T](
Expand All @@ -116,9 +114,9 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
}

override protected def write(data: SCollection[T], params: WriteP): Tap[T] = {
val isAssignable = classOf[SpecificRecord].isAssignableFrom(cls)
val writerSchema = if (isAssignable) ReflectData.get().getSchema(cls) else params.schema
val conf = ParquetConfiguration.ofNullable(params.conf)
val avroClass = ScioUtil.classOf[T]
val isSpecific: Boolean = classOf[SpecificRecord] isAssignableFrom avroClass
val writerSchema = if (isSpecific) ReflectData.get().getSchema(avroClass) else params.schema

data.applyInternal(
parquetOut(
Expand All @@ -127,7 +125,7 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
params.suffix,
params.numShards,
params.compression,
conf,
ParquetConfiguration.ofNullable(params.conf),
params.filenamePolicySupplier,
params.prefix,
params.shardNameTemplate,
Expand All @@ -144,8 +142,6 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[
}

object ParquetAvroIO {
private lazy val log = LoggerFactory.getLogger(getClass)

object ReadParam {
val DefaultProjection: Schema = null
val DefaultPredicate: FilterPredicate = null
Expand All @@ -168,75 +164,70 @@ object ParquetAvroIO {
conf: Configuration = ReadParam.DefaultConfiguration,
suffix: String = null
) {
lazy val confOrDefault = ParquetConfiguration.ofNullable(conf)
val avroClass: Class[A] = ScioUtil.classOf[A]
val isSpecific: Boolean = classOf[SpecificRecord] isAssignableFrom avroClass
val readSchema: Schema =
if (isSpecific) ReflectData.get().getSchema(avroClass) else projection

def read(sc: ScioContext, path: String)(implicit coder: Coder[T]): SCollection[T] = {
val jobConf = ParquetConfiguration.ofNullable(conf)
if (ParquetReadConfiguration.getUseSplittableDoFn(confOrDefault, sc.options)) {
readSplittableDoFn(sc, path)
} else {
readLegacy(sc, path)
}
}

def setupConfig(): Unit = {
AvroReadSupport.setAvroReadSchema(confOrDefault, readSchema)
if (projection != null) {
AvroReadSupport.setRequestedProjection(confOrDefault, projection)
}

if (predicate != null) {
ParquetInputFormat.setFilterPredicate(confOrDefault, predicate)
}

// Needed to make GenericRecord read by parquet-avro work with Beam's
// org.apache.beam.sdk.extensions.avro.coders.AvroCoder
if (!isSpecific) {
jobConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false)

if (jobConf.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
jobConf.setClass(
confOrDefault.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false)
if (confOrDefault.get(AvroReadSupport.AVRO_DATA_SUPPLIER) == null) {
confOrDefault.setClass(
AvroReadSupport.AVRO_DATA_SUPPLIER,
classOf[GenericDataSupplier],
classOf[AvroDataSupplier]
)
}
}

if (ParquetReadConfiguration.getUseSplittableDoFn(jobConf, sc.options)) {
readSplittableDoFn(sc, jobConf, path)
} else {
readLegacy(sc, jobConf, path)
}
}

private def readSplittableDoFn(sc: ScioContext, conf: Configuration, path: String)(implicit
private def readSplittableDoFn(sc: ScioContext, path: String)(implicit
coder: Coder[T]
): SCollection[T] = {
AvroReadSupport.setAvroReadSchema(conf, readSchema)
if (projection != null) {
AvroReadSupport.setRequestedProjection(conf, projection)
}
if (predicate != null) {
ParquetInputFormat.setFilterPredicate(conf, predicate)
}

val filePattern = ScioUtil.filePattern(path, suffix)
val bCoder = CoderMaterializer.beam(sc, coder)
val cleanedProjectionFn = ClosureCleaner.clean(projectionFn)

sc.applyTransform(
ParquetRead.read[A, T](
ReadSupportFactory.avro,
new SerializableConfiguration(conf),
path,
new SerializableConfiguration(confOrDefault),
filePattern,
Functions.serializableFn(cleanedProjectionFn)
)
).setCoder(bCoder)
}

private def readLegacy(sc: ScioContext, conf: Configuration, path: String)(implicit
private def readLegacy(sc: ScioContext, path: String)(implicit
coder: Coder[T]
): SCollection[T] = {
val job = Job.getInstance(conf)
GcsConnectorUtil.setInputPaths(sc, job, path)
val job = Job.getInstance(confOrDefault)
val filePattern = ScioUtil.filePattern(path, suffix)
GcsConnectorUtil.setInputPaths(sc, job, filePattern)
job.setInputFormatClass(classOf[AvroParquetInputFormat[T]])
job.getConfiguration.setClass("key.class", classOf[Void], classOf[Void])
job.getConfiguration.setClass("value.class", avroClass, avroClass)
AvroParquetInputFormat.setAvroReadSchema(job, readSchema)

if (projection != null) {
AvroParquetInputFormat.setRequestedProjection(job, projection)
}
if (predicate != null) {
ParquetInputFormat.setFilterPredicate(job.getConfiguration, predicate)
}

val g = ClosureCleaner.clean(projectionFn) // defeat closure
val aCls = avroClass
Expand Down Expand Up @@ -285,30 +276,3 @@ object ParquetAvroIO {
tempDirectory: String = WriteParam.DefaultTempDirectory
)
}

final case class ParquetAvroTap[A, T: ClassTag: Coder](
path: String,
params: ParquetAvroIO.ReadParam[A, T]
) extends Tap[T] {
override def value: Iterator[T] = {
val filePattern = ScioUtil.filePattern(path, params.suffix)
val xs = FileSystems.`match`(filePattern).metadata().asScala.toList
xs.iterator.flatMap { metadata =>
val reader = AvroParquetReader
.builder[A](BeamInputFile.of(metadata.resourceId()))
.withConf(ParquetConfiguration.ofNullable(params.conf))
.build()
new Iterator[T] {
private var current: A = reader.read()
override def hasNext: Boolean = current != null
override def next(): T = {
val r = params.projectionFn(current)
current = reader.read()
r
}
}
}
}
override def open(sc: ScioContext): SCollection[T] =
sc.read(ParquetAvroIO[T](path))(params)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.parquet.avro
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this class out of the package object.


import com.spotify.scio.parquet.ParquetOutputFile
import org.apache.avro.Schema
import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetWriter}

import java.nio.channels.WritableByteChannel

/**
 * A Beam `FileIO.Sink` that writes elements of type `T` to a Parquet file through
 * `AvroParquetWriter`.
 *
 * @param schema
 *   Avro schema handed to the writer; only its string form is kept as a field
 * @param compression
 *   compression codec passed to the writer builder
 * @param conf
 *   serializable Hadoop configuration passed to the writer builder and used to look up the row
 *   group size
 */
class ParquetAvroSink[T](
  schema: Schema,
  val compression: CompressionCodecName,
  val conf: SerializableConfiguration
) extends FileIO.Sink[T] {
  // The schema is stored as a string and re-parsed in `open`.
  // NOTE(review): presumably so the sink does not hold a Schema instance when serialized — confirm.
  private val schemaString = schema.toString
  // Assigned in `open`; remains null until then.
  private var writer: ParquetWriter[T] = _

  /** Builds the Parquet writer on top of the given channel. */
  override def open(channel: WritableByteChannel): Unit = {
    val hadoopConf = conf.get
    val parsedSchema = new Schema.Parser().parse(schemaString)
    // https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
    val blockSize =
      hadoopConf.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE)
    val builder = AvroParquetWriter.builder[T](new ParquetOutputFile(channel))
    writer = builder
      .withSchema(parsedSchema)
      .withCompressionCodec(compression)
      .withConf(hadoopConf)
      .withRowGroupSize(blockSize)
      .build()
  }

  /** Hands one element to the underlying Parquet writer. */
  override def write(element: T): Unit = {
    writer.write(element)
  }

  /** Closes the underlying Parquet writer. */
  override def flush(): Unit = {
    writer.close()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.parquet.avro

import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.io.Tap
import com.spotify.scio.parquet.{BeamInputFile, ParquetConfiguration}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io._
import org.apache.parquet.avro.AvroParquetReader

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

/**
 * A [[Tap]] over the Parquet files matching `path` (combined with `params.suffix`), read as Avro
 * records of type `A` and mapped to `T` with `params.projectionFn`.
 *
 * @param path
 *   base path of the Parquet files
 * @param params
 *   read parameters; supplies the suffix, the Hadoop configuration and the projection function
 */
final case class ParquetAvroTap[A, T: ClassTag: Coder](
  path: String,
  params: ParquetAvroIO.ReadParam[A, T]
) extends Tap[T] {

  /**
   * Iterates over every record of every matched file.
   *
   * Calls `params.setupConfig()` before reading and builds one `AvroParquetReader` per matched
   * file. Each reader is closed as soon as it reports end-of-file, so fully consumed files do not
   * keep their handles open. A reader whose iterator is abandoned half-way is not closed.
   */
  override def value: Iterator[T] = {
    val filePattern = ScioUtil.filePattern(path, params.suffix)
    params.setupConfig()

    val xs = FileSystems.`match`(filePattern).metadata().asScala.toList
    xs.iterator.flatMap { metadata =>
      val reader = AvroParquetReader
        .builder[A](BeamInputFile.of(metadata.resourceId()))
        .withConf(params.confOrDefault)
        .build()
      new Iterator[T] {
        // Look-ahead record; null once the reader is exhausted (and closed).
        private var current: A = advance()

        // Reads the next record and releases the reader when the end of the file is reached.
        private def advance(): A = {
          val record = reader.read()
          if (record == null) reader.close()
          record
        }

        override def hasNext: Boolean = current != null

        override def next(): T = {
          // Guard against reading from a reader that has already been closed.
          if (!hasNext) throw new NoSuchElementException("next on exhausted Parquet reader")
          val r = params.projectionFn(current)
          current = advance()
          r
        }
      }
    }
  }

  /** Re-opens the same data as an `SCollection` by reading `ParquetAvroIO` with the same params. */
  override def open(sc: ScioContext): SCollection[T] =
    sc.read(ParquetAvroIO[T](path))(params)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@
package com.spotify.scio.parquet

import com.spotify.scio.parquet.avro.syntax.Syntax
import org.apache.avro.Schema
import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetWriter}

import java.nio.channels.WritableByteChannel

/**
* Main package for Parquet Avro APIs. Import all.
Expand All @@ -41,28 +33,4 @@ package object avro extends Syntax {

/** Alias for `me.lyh.parquet.avro.Predicate`. */
val Predicate = me.lyh.parquet.avro.Predicate

class ParquetAvroSink[T](
schema: Schema,
val compression: CompressionCodecName,
val conf: SerializableConfiguration
) extends FileIO.Sink[T] {
private val schemaString = schema.toString
private var writer: ParquetWriter[T] = _
override def open(channel: WritableByteChannel): Unit = {
val schema = new Schema.Parser().parse(schemaString)
// https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
val rowGroupSize =
conf.get.getInt(ParquetOutputFormat.BLOCK_SIZE, ParquetWriter.DEFAULT_BLOCK_SIZE)
writer = AvroParquetWriter
.builder[T](new ParquetOutputFile(channel))
.withSchema(schema)
.withCompressionCodec(compression)
.withConf(conf.get)
.withRowGroupSize(rowGroupSize)
.build
}
override def write(element: T): Unit = writer.write(element)
override def flush(): Unit = writer.close()
}
}
Loading