diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/csv/CsvIO.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/CsvIO.scala index 9003938e12..fc08dfe0b1 100644 --- a/scio-extra/src/main/scala/com/spotify/scio/extra/csv/CsvIO.scala +++ b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/CsvIO.scala @@ -17,27 +17,26 @@ package com.spotify.scio.extra.csv -import java.io.{Reader, Writer} -import java.nio.channels.{Channels, WritableByteChannel} -import java.nio.charset.StandardCharsets +import com.spotify.scio.ScioContext import com.spotify.scio.coders.Coder import com.spotify.scio.io._ -import com.spotify.scio.ScioContext -import com.spotify.scio.util.ScioUtil -import com.spotify.scio.util.FilenamePolicySupplier +import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil} import com.spotify.scio.values.SCollection -import kantan.csv._ -import kantan.codecs.compat._ import kantan.csv.CsvConfiguration.{Header, QuotePolicy} import kantan.csv.engine.ReaderEngine import kantan.csv.ops._ -import org.apache.beam.sdk.{io => beam} -import org.apache.beam.sdk.io.{Compression, FileIO} -import org.apache.beam.sdk.io.FileIO.ReadableFile +import kantan.csv.{CsvConfiguration, HeaderCodec, HeaderDecoder, HeaderEncoder} import org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment -import org.apache.beam.sdk.transforms.{DoFn, PTransform, ParDo} +import org.apache.beam.sdk.io.FileIO.ReadableFile +import org.apache.beam.sdk.io.{Compression, FileIO} import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement} +import org.apache.beam.sdk.transforms.{DoFn, PTransform, ParDo} import org.apache.beam.sdk.values.PCollection +import org.apache.beam.sdk.{io => beam} + +import java.io.Reader +import java.nio.channels.Channels +import java.nio.charset.StandardCharsets /** * This package uses a CSV mapper called [[https://nrinaudo.github.io/kantan.csv/ Kantan]]. @@ -62,7 +61,7 @@ import org.apache.beam.sdk.values.PCollection * {{{ * case class User(name: String, age: Int) * implicit val decoder = RowDecoder.ordered { (name: String, age: Int) => User(name, age) } - * val csvConfiguration = CsvIO.ReadParam(csvConfiguration = CsvIO.DefaultCsvConfig.withoutHeader) + * val csvConfiguration = CsvIO.ReadParam(csvConfiguration = CsvIO.DefaultCsvConfiguration.withoutHeader) * val users: SCollection[User] = scioContext.csvFile(path, csvConfiguration) * }}} * @@ -238,24 +237,4 @@ object CsvIO { .foreach(out.output) } } - - final private class CsvSink[T: HeaderEncoder](csvConfig: CsvConfiguration) - extends FileIO.Sink[T] { - var csvWriter: CsvWriter[T] = _ - var byteChannelWriter: Writer = _ - - override def open(channel: WritableByteChannel): Unit = { - byteChannelWriter = Channels.newWriter(channel, StandardCharsets.UTF_8.name()) - csvWriter = byteChannelWriter.asCsvWriter[T](csvConfig) - } - - override def write(element: T): Unit = { - csvWriter.write(element) - () - } - - override def flush(): Unit = - byteChannelWriter.flush() - } - } diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/dynamic.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/dynamic.scala new file mode 100644 index 0000000000..021851622b --- /dev/null +++ b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/dynamic.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.extra.csv + +import com.spotify.scio.extra.csv.dynamic.syntax.AllSyntax + +/** + * CSV package for dynamic destinations. Import All. + * + * {{{ + * import com.spotify.scio.extra.csv.dynamic._ + * }}} + */ +package object dynamic extends AllSyntax diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/syntax/AllSyntax.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/syntax/AllSyntax.scala new file mode 100644 index 0000000000..648a848cb3 --- /dev/null +++ b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/syntax/AllSyntax.scala @@ -0,0 +1,19 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.extra.csv.dynamic.syntax + +trait AllSyntax extends SCollectionSyntax diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/syntax/SCollectionSyntax.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/syntax/SCollectionSyntax.scala new file mode 100644 index 0000000000..f8ae1698b7 --- /dev/null +++ b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/dynamic/syntax/SCollectionSyntax.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.extra.csv.dynamic.syntax + +import com.spotify.scio.annotations.experimental +import com.spotify.scio.coders.Coder +import com.spotify.scio.extra.csv.{CsvIO, CsvSink} +import com.spotify.scio.io.dynamic.syntax.DynamicSCollectionOps.writeDynamic +import com.spotify.scio.io.{ClosedTap, EmptyTap} +import com.spotify.scio.values.SCollection +import kantan.csv.{CsvConfiguration, HeaderEncoder} +import org.apache.beam.sdk.io.Compression + +final class DynamicCsvSCollectionOps[T]( + private val self: SCollection[T] +) extends AnyVal { + + /** Save this SCollection of records as CSV files written to dynamic destinations. */ + @experimental + def saveAsDynamicCsvFile( + path: String, + suffix: String = CsvIO.WriteParam.DefaultSuffix, + prefix: String = CsvIO.WriteParam.DefaultPrefix, + numShards: Int = CsvIO.WriteParam.DefaultNumShards, + compression: Compression = CsvIO.WriteParam.DefaultCompression, + tempDirectory: String = CsvIO.WriteParam.DefaultTempDirectory, + csvConfig: CsvConfiguration = CsvIO.WriteParam.DefaultCsvConfig + )( + destinationFn: T => String + )(implicit coder: Coder[T], enc: HeaderEncoder[T]): ClosedTap[Nothing] = { + if (self.context.isTest) { + throw new NotImplementedError( + "CSV file with dynamic destinations cannot be used in a test context" + ) + } else { + val sink = new CsvSink(csvConfig) + val write = writeDynamic( + path = path, + destinationFn = destinationFn, + numShards = numShards, + prefix = prefix, + suffix = suffix, + tempDirectory = tempDirectory + ).withCompression(compression).via(sink) + self.applyInternal(write) + } + ClosedTap[Nothing](EmptyTap) + } +} + +trait SCollectionSyntax { + implicit def dynamicCsvSCollectionOps[T]( + sc: SCollection[T] + ): DynamicCsvSCollectionOps[T] = + new DynamicCsvSCollectionOps(sc) +} diff --git a/scio-extra/src/main/scala/com/spotify/scio/extra/csv/package.scala b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/package.scala index 50ce879a68..7c78a88de4 100644 --- a/scio-extra/src/main/scala/com/spotify/scio/extra/csv/package.scala +++ b/scio-extra/src/main/scala/com/spotify/scio/extra/csv/package.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020 Spotify AB. + * Copyright 2024 Spotify AB. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,5 +18,38 @@ package com.spotify.scio.extra import com.spotify.scio.extra.csv.syntax.AllSyntax +import kantan.csv.ops.toCsvOutputOps +import kantan.csv.{CsvConfiguration, CsvWriter, HeaderEncoder} +import org.apache.beam.sdk.io.FileIO -package object csv extends AllSyntax +import java.io.Writer +import java.nio.channels.{Channels, WritableByteChannel} +import java.nio.charset.StandardCharsets + +/** + * Main package for CSV type-safe APIs. Import all. + * + * {{{ + * import com.spotify.scio.extra.csv._ + * }}} + */ +package object csv extends AllSyntax { + final private[scio] class CsvSink[T: HeaderEncoder](csvConfig: CsvConfiguration) + extends FileIO.Sink[T] { + @transient private var csvWriter: CsvWriter[T] = _ + @transient private var byteChannelWriter: Writer = _ + + override def open(channel: WritableByteChannel): Unit = { + byteChannelWriter = Channels.newWriter(channel, StandardCharsets.UTF_8.name()) + csvWriter = byteChannelWriter.asCsvWriter[T](csvConfig) + } + + override def write(element: T): Unit = { + csvWriter.write(element) + () + } + + override def flush(): Unit = + byteChannelWriter.flush() + } +} diff --git a/scio-extra/src/test/scala/com/spotify/scio/extra/csv/dynamic/CsvDynamicTest.scala b/scio-extra/src/test/scala/com/spotify/scio/extra/csv/dynamic/CsvDynamicTest.scala new file mode 100644 index 0000000000..a435855b14 --- /dev/null +++ b/scio-extra/src/test/scala/com/spotify/scio/extra/csv/dynamic/CsvDynamicTest.scala @@ -0,0 +1,181 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.extra.csv.dynamic + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.testing.PipelineSpec +import com.spotify.scio.values.SCollection +import org.apache.commons.io.FileUtils + +import java.nio.file.{Files, Path} +import scala.jdk.CollectionConverters.IteratorHasAsScala +import scala.util.Random + +case class TypedRecord( + int: Int, + long: Long, + float: Float, + bool: Boolean, + string: String +) + +trait CsvDynamicTest extends PipelineSpec { + val aNames: Seq[String] = Seq("Anna", "Alice", "Albrecht", "Amy", "Arlo", "Agnes") + val bNames: Seq[String] = Seq("Bob", "Barbara", "Barry", "Betty", "Brody", "Bard") + + def dynamicTest[T: Coder]( + input: Seq[T], + save: (SCollection[T], Path) => ClosedTap[_], + read: (ScioContext, Path) => (SCollection[String], SCollection[String]) + ): Unit = { + val tmpDir = Files.createTempDirectory("csv-dynamic-io-") + val sc = ScioContext() + save(sc.parallelize(input), tmpDir) + sc.run() + verifyOutput(tmpDir, "0", "1") + + val sc2 = ScioContext() + val (lines0, lines1) = read(sc2, tmpDir) + + lines0 should containInAnyOrder(aNames) + lines1 should containInAnyOrder(bNames) + sc2.run() + FileUtils.deleteDirectory(tmpDir.toFile) + } + + private def verifyOutput(path: Path, expected: String*): Unit = { + val p = path + val actual = Files + .list(p) + .iterator() + .asScala + .filterNot(_.toFile.getName.startsWith(".")) + .toSet + actual shouldBe expected.map(p.resolve).toSet + } +} + +class TypedCsvDynamicTest extends CsvDynamicTest { + import kantan.csv._ + import com.spotify.scio.extra.csv._ + + it should "write with headers" in { + implicit val encoder: HeaderEncoder[TypedRecord] = + HeaderEncoder.caseEncoder( + "int", + "long", + "float", + "bool", + "string" + )(TypedRecord.unapply) + implicit val decoder: HeaderDecoder[TypedRecord] = HeaderDecoder.decoder( + "int", + "long", + "float", + "bool", + "string" + )(TypedRecord.apply) + + def typedRec(int: Int, name: String): TypedRecord = + TypedRecord(int, 0L, 0f, false, name) + val input: Seq[TypedRecord] = + Random.shuffle(aNames.map(n => typedRec(0, n)) ++ bNames.map(n => typedRec(1, n))) + + dynamicTest[TypedRecord]( + input, + (scoll, tmpDir) => + scoll.saveAsDynamicCsvFile(tmpDir.toAbsolutePath.toString)(t => s"${t.int}"), + (sc2, tmpDir) => { + val lines0 = sc2.csvFile(s"$tmpDir/0/*.csv").map(_.string) + val lines1 = sc2.csvFile(s"$tmpDir/1/*.csv").map(_.string) + (lines0, lines1) + } + ) + } + + it should "write without headers" in { + implicit val encoder: HeaderEncoder[TypedRecord] = + HeaderEncoder.caseEncoder( + "int", + "long", + "float", + "bool", + "string" + )(TypedRecord.unapply) + implicit val decoder: RowDecoder[TypedRecord] = RowDecoder.ordered { + (int: Int, long: Long, float: Float, bool: Boolean, string: String) => + TypedRecord(int, long, float, bool, string) + } + val writeConfig = + CsvIO.WriteParam.DefaultCsvConfig.copy(header = CsvConfiguration.Header.None) + val readConfig = + CsvIO.ReadParam(csvConfiguration = CsvIO.DefaultCsvConfiguration.withoutHeader) + + def typedRec(int: Int, name: String): TypedRecord = + TypedRecord(int, 0L, 0f, false, name) + val input: Seq[TypedRecord] = + Random.shuffle(aNames.map(n => typedRec(0, n)) ++ bNames.map(n => typedRec(1, n))) + + dynamicTest[TypedRecord]( + input, + (scoll, tmpDir) => + scoll.saveAsDynamicCsvFile(tmpDir.toAbsolutePath.toString, csvConfig = writeConfig)(t => + s"${t.int}" + ), + (sc2, tmpDir) => { + val lines0 = sc2.csvFile(s"$tmpDir/0/*.csv", readConfig).map(_.string) + val lines1 = sc2.csvFile(s"$tmpDir/1/*.csv", readConfig).map(_.string) + (lines0, lines1) + } + ) + } + + it should "write with a row encoder" in { + implicit val encoder: RowEncoder[TypedRecord] = + RowEncoder.encoder(0, 1, 2, 3, 4)((t: TypedRecord) => + (t.int, t.long, t.float, t.bool, t.string) + ) + implicit val decoder: RowDecoder[TypedRecord] = RowDecoder.ordered { + (int: Int, long: Long, float: Float, bool: Boolean, string: String) => + TypedRecord(int, long, float, bool, string) + } + val writeConfig = + CsvIO.WriteParam.DefaultCsvConfig.copy(header = CsvConfiguration.Header.None) + val readConfig = + CsvIO.ReadParam(csvConfiguration = CsvIO.DefaultCsvConfiguration.withoutHeader) + + def typedRec(int: Int, name: String): TypedRecord = + TypedRecord(int, 0L, 0f, false, name) + val input: Seq[TypedRecord] = + Random.shuffle(aNames.map(n => typedRec(0, n)) ++ bNames.map(n => typedRec(1, n))) + + dynamicTest[TypedRecord]( + input, + (scoll, tmpDir) => + scoll.saveAsDynamicCsvFile(tmpDir.toAbsolutePath.toString, csvConfig = writeConfig)(t => + s"${t.int}" + ), + (sc2, tmpDir) => { + val lines0 = sc2.csvFile(s"$tmpDir/0/*.csv", readConfig).map(_.string) + val lines1 = sc2.csvFile(s"$tmpDir/1/*.csv", readConfig).map(_.string) + (lines0, lines1) + } + ) + } +}