Skip to content

Commit

Permalink
feat: add dynamic csv
Browse files Browse the repository at this point in the history
  • Loading branch information
klDen committed Jan 5, 2024
1 parent 1671db7 commit 91bb15c
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 33 deletions.
46 changes: 14 additions & 32 deletions scio-extra/src/main/scala/com/spotify/scio/extra/csv/CsvIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,29 @@

package com.spotify.scio.extra.csv

import java.io.{Reader, Writer}
import java.nio.channels.{Channels, WritableByteChannel}
import java.nio.charset.StandardCharsets
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.io._
import com.spotify.scio.ScioContext
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.util.FilenamePolicySupplier
import com.spotify.scio.util.{FilenamePolicySupplier, ScioUtil}
import com.spotify.scio.values.SCollection

import kantan.csv._
import kantan.codecs.compat._
import kantan.csv.CsvConfiguration.{Header, QuotePolicy}
import kantan.csv.engine.ReaderEngine
import kantan.csv.ops._
import org.apache.beam.sdk.{io => beam}
import org.apache.beam.sdk.io.{Compression, FileIO}
import org.apache.beam.sdk.io.FileIO.ReadableFile
import kantan.csv.{CsvConfiguration, HeaderCodec, HeaderDecoder, HeaderEncoder}
import org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment
import org.apache.beam.sdk.transforms.{DoFn, PTransform, ParDo}
import org.apache.beam.sdk.io.FileIO.ReadableFile
import org.apache.beam.sdk.io.{Compression, FileIO}
import org.apache.beam.sdk.transforms.DoFn.{Element, OutputReceiver, ProcessElement}
import org.apache.beam.sdk.transforms.{DoFn, PTransform, ParDo}
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.{io => beam}

import java.io.Reader
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets

/**
* This package uses a CSV mapper called [[https://nrinaudo.github.io/kantan.csv/ Kantan]].
Expand All @@ -62,7 +64,7 @@ import org.apache.beam.sdk.values.PCollection
* {{{
* case class User(name: String, age: Int)
* implicit val decoder = RowDecoder.ordered { (name: String, age: Int) => User(name, age) }
* val csvConfiguration = CsvIO.ReadParam(csvConfiguration = CsvIO.DefaultCsvConfig.withoutHeader)
* val csvConfiguration = CsvIO.ReadParam(csvConfiguration = CsvIO.DefaultCsvConfiguration.withoutHeader)
* val users: SCollection[User] = scioContext.csvFile(path, csvConfiguration)
* }}}
*
Expand Down Expand Up @@ -194,7 +196,7 @@ object CsvIO {
.withSuffix(params.suffix)
.withNumShards(params.numShards)
.withCompression(params.compression)
.via(new CsvSink(params.csvConfiguration))
.via(new com.spotify.scio.extra.csv.CsvSink(params.csvConfiguration))

private def read[T: HeaderDecoder: Coder](sc: ScioContext, path: String, params: ReadParam) = {
val filePattern = ScioUtil.filePattern(path, params.suffix)
Expand Down Expand Up @@ -238,24 +240,4 @@ object CsvIO {
.foreach(out.output)
}
}

final private class CsvSink[T: HeaderEncoder](csvConfig: CsvConfiguration)
extends FileIO.Sink[T] {
var csvWriter: CsvWriter[T] = _
var byteChannelWriter: Writer = _

override def open(channel: WritableByteChannel): Unit = {
byteChannelWriter = Channels.newWriter(channel, StandardCharsets.UTF_8.name())
csvWriter = byteChannelWriter.asCsvWriter[T](csvConfig)
}

override def write(element: T): Unit = {
csvWriter.write(element)
()
}

override def flush(): Unit =
byteChannelWriter.flush()
}

}
45 changes: 45 additions & 0 deletions scio-extra/src/main/scala/com/spotify/scio/extra/csv/CsvSink.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.extra.csv

import kantan.csv.ops.toCsvOutputOps
import kantan.csv.{CsvConfiguration, CsvWriter, HeaderEncoder}
import org.apache.beam.sdk.io.FileIO

import java.io.Writer
import java.nio.channels.{Channels, WritableByteChannel}
import java.nio.charset.StandardCharsets

final private[scio] class CsvSink[T: HeaderEncoder](csvConfig: CsvConfiguration)
extends FileIO.Sink[T] {
@transient private var csvWriter: CsvWriter[T] = _
@transient private var byteChannelWriter: Writer = _

override def open(channel: WritableByteChannel): Unit = {
byteChannelWriter = Channels.newWriter(channel, StandardCharsets.UTF_8.name())
csvWriter = byteChannelWriter.asCsvWriter[T](csvConfig)
}

override def write(element: T): Unit = {
csvWriter.write(element)
()
}

override def flush(): Unit =
byteChannelWriter.flush()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.extra.csv

import com.spotify.scio.extra.csv.dynamic.syntax.AllSyntax

/**
* CSV package for dynamic destinations. Import All.
*
* {{{
* import com.spotify.scio.extra.csv.dynamic._
* }}}
*/
package object dynamic extends AllSyntax
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.extra.csv.dynamic.syntax

trait AllSyntax extends SCollectionSyntax
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.extra.csv.dynamic.syntax

import com.spotify.scio.annotations.experimental
import com.spotify.scio.coders.Coder
import com.spotify.scio.extra.csv.{CsvIO, CsvSink}
import com.spotify.scio.io.dynamic.syntax.DynamicSCollectionOps.writeDynamic
import com.spotify.scio.io.{ClosedTap, EmptyTap}
import com.spotify.scio.values.SCollection
import kantan.csv.{CsvConfiguration, HeaderEncoder}
import org.apache.beam.sdk.io.Compression

final class DynamicCsvSCollectionOps[T](
private val self: SCollection[T]
) extends AnyVal {

/** Save this SCollection of records as CSV files written to dynamic destinations. */
@experimental
def saveAsDynamicCsvFile(
path: String,
suffix: String = CsvIO.WriteParam.DefaultSuffix,
prefix: String = CsvIO.WriteParam.DefaultPrefix,
numShards: Int = CsvIO.WriteParam.DefaultNumShards,
compression: Compression = CsvIO.WriteParam.DefaultCompression,
tempDirectory: String = CsvIO.WriteParam.DefaultTempDirectory,
csvConfig: CsvConfiguration = CsvIO.WriteParam.DefaultCsvConfig
)(
destinationFn: T => String
)(implicit coder: Coder[T], enc: HeaderEncoder[T]): ClosedTap[Nothing] = {
if (self.context.isTest) {
throw new NotImplementedError(
"CSV file with dynamic destinations cannot be used in a test context"
)
} else {
val sink = new CsvSink(csvConfig)
val write = writeDynamic(
path = path,
destinationFn = destinationFn,
numShards = numShards,
prefix = prefix,
suffix = suffix,
tempDirectory = tempDirectory
).withCompression(compression).via(sink)
self.applyInternal(write)
}
ClosedTap[Nothing](EmptyTap)
}
}

trait SCollectionSyntax {
implicit def dynamicCsvSCollectionOps[T](
sc: SCollection[T]
): DynamicCsvSCollectionOps[T] =
new DynamicCsvSCollectionOps(sc)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Spotify AB.
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,4 +19,11 @@ package com.spotify.scio.extra

import com.spotify.scio.extra.csv.syntax.AllSyntax

/**
* Main package for CSV type-safe APIs. Import all.
*
* {{{
* import com.spotify.scio.extra.csv._
* }}}
*/
package object csv extends AllSyntax
Loading

0 comments on commit 91bb15c

Please sign in to comment.