From 4fbd1c89b584f81013cd4a59671e215334a1fa9f Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Wed, 25 Sep 2024 15:42:05 +0200 Subject: [PATCH 1/9] Support Snowflake (#5500) --- .github/workflows/ci.yml | 4 +- README.md | 1 + build.sbt | 19 +++ .../spotify/scio/snowflake/SnowflakeIO.scala | 134 ++++++++++++++++++ .../scio/snowflake/SnowflakeOptions.scala | 100 +++++++++++++ .../com/spotify/scio/snowflake/package.scala | 29 ++++ .../scio/snowflake/syntax/AllSyntax.scala | 20 +++ .../snowflake/syntax/SCollectionSyntax.scala | 129 +++++++++++++++++ .../snowflake/syntax/ScioContextSyntax.scala | 47 ++++++ 9 files changed, 481 insertions(+), 2 deletions(-) create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 477c4964e8..0dbe3c2f66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,11 +120,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target + run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-snowflake/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target + run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-snowflake/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/README.md b/README.md index c13c9467fe..14f32f499a 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,7 @@ Scio includes the following artifacts: - `scio-redis`: add-on for Redis - `scio-repl`: extension of the Scala REPL with Scio specific operations - `scio-smb`: add-on for Sort Merge Bucket operations +- `scio-snowflake`: add-on for Snowflake IO - `scio-tensorflow`: add-on for TensorFlow TFRecords IO and prediction - `scio-test`: all following test utilities. Add to your project as a "test" dependency - `scio-test-core`: test core utilities diff --git a/build.sbt b/build.sbt index 9616cb2d8c..90bc151998 100644 --- a/build.sbt +++ b/build.sbt @@ -671,6 +671,7 @@ lazy val scio = project `scio-redis`, `scio-repl`, `scio-smb`, + `scio-snowflake`, `scio-tensorflow`, `scio-test-core`, `scio-test-google-cloud-platform`, @@ -1265,6 +1266,24 @@ lazy val `scio-parquet` = project ) ) +lazy val `scio-snowflake` = project + .in(file("scio-snowflake")) + .dependsOn( + `scio-core` % "compile;test->test" + ) + .settings(commonSettings) + .settings( + description := "Scio add-on for Neo4J", + libraryDependencies ++= Seq( + // compile + "org.apache.beam" % "beam-sdks-java-core" % beamVersion, + "org.apache.beam" % "beam-sdks-java-io-snowflake" % beamVersion, + "com.nrinaudo" %% "kantan.csv" % kantanCsvVersion, + "com.nrinaudo" %% "kantan.codecs" % kantanCodecsVersion + ), + tlMimaPreviousVersions := Set.empty // TODO: remove once released + ) + val tensorFlowMetadataSourcesDir = settingKey[File]("Directory containing TensorFlow metadata proto files") val tensorFlowMetadata = taskKey[Seq[File]]("Retrieve TensorFlow metadata proto files") diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala new file mode 100644 index 0000000000..5984910857 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala @@ -0,0 +1,134 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.values.SCollection +import kantan.csv.{RowDecoder, RowEncoder} +import org.apache.beam.sdk.io.snowflake.SnowflakeIO.{CsvMapper, UserDataMapper} +import org.apache.beam.sdk.io.{snowflake => beam} + +object SnowflakeIO { + + private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) = { + + val datasourceInitial = beam.SnowflakeIO.DataSourceConfiguration + .create() + + val datasourceWithAuthent = connectionOptions.authenticationOptions match { + case SnowflakeUsernamePasswordAuthenticationOptions(username, password) => + datasourceInitial.withUsernamePasswordAuth(username, password) + case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) => + datasourceInitial.withKeyPairPathAuth(username, privateKeyPath) + case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) => + datasourceInitial.withKeyPairPathAuth(username, privateKeyPath, passphrase) + case SnowflakeOAuthTokenAuthenticationOptions(token) => + datasourceInitial.withOAuth(token) + } + + val datasourceBeforeSchema = datasourceWithAuthent + .withServerName(connectionOptions.serverName) + .withDatabase(connectionOptions.database) + .withRole(connectionOptions.role) + .withWarehouse(connectionOptions.warehouse) + + connectionOptions.schema + .map(schema => datasourceBeforeSchema.withSchema(schema)) + .getOrElse(datasourceBeforeSchema) + } + + private[snowflake] def buildCsvMapper[T](rowDecoder: RowDecoder[T]): CsvMapper[T] = + new CsvMapper[T] { + override def mapRow(parts: Array[String]): T = { + val unsnowedParts = parts.map { + case "\\N" => "" // needs to be mapped to an Option + case other => other + }.toSeq + rowDecoder.unsafeDecode(unsnowedParts) + } + } + + private[snowflake] def prepareRead[T]( + snowflakeOptions: SnowflakeOptions, + sc: ScioContext + )(implicit rowDecoder: RowDecoder[T], coder: Coder[T]): beam.SnowflakeIO.Read[T] = + beam.SnowflakeIO + .read() + .withDataSourceConfiguration( + SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions) + ) + .withStagingBucketName(snowflakeOptions.stagingBucketName) + .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) + .withCsvMapper(buildCsvMapper(rowDecoder)) + .withCoder(CoderMaterializer.beam(sc, coder)) +} + +sealed trait SnowflakeIO[T] extends ScioIO[T] + +final case class SnowflakeSelect[T](snowflakeOptions: SnowflakeOptions, select: String)(implicit + rowDecoder: RowDecoder[T], + coder: Coder[T] +) extends SnowflakeIO[T] { + + override type ReadP = Unit + override type WriteP = Unit + override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromQuery(select)) + + override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = + throw new UnsupportedOperationException("SnowflakeSelect is read-only") + + override def tap(params: ReadP): Tap[Nothing] = EmptyTap +} + +final case class SnowflakeTable[T](snowflakeOptions: SnowflakeOptions, table: String)(implicit + rowDecoder: RowDecoder[T], + rowEncoder: RowEncoder[T], + coder: Coder[T] +) extends SnowflakeIO[T] { + + override type ReadP = Unit + override type WriteP = Unit + override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromTable(table)) + + override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = { + data.applyInternal( + beam.SnowflakeIO + .write[T]() + .withDataSourceConfiguration( + SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions) + ) + .to(table) + .withStagingBucketName(snowflakeOptions.stagingBucketName) + .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) + .withUserDataMapper(new UserDataMapper[T] { + override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray + }) + ) + EmptyTap + } + + override def tap(params: ReadP): Tap[Nothing] = EmptyTap +} diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala new file mode 100644 index 0000000000..4ee8aaaefb --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala @@ -0,0 +1,100 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake + +trait SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake username/password authentication. + * + * @param username + * username + * @param password + * password + */ +final case class SnowflakeUsernamePasswordAuthenticationOptions( + username: String, + password: String +) extends SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake key pair authentication. + * + * @param username + * username + * @param privateKeyPath + * path to the private key + * @param privateKeyPassphrase + * passphrase for the private key (optional) + */ +final case class SnowflakeKeyPairAuthenticationOptions( + username: String, + privateKeyPath: String, + privateKeyPassphrase: Option[String] +) extends SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake OAuth token authentication. + * + * @param token + * OAuth token + */ +final case class SnowflakeOAuthTokenAuthenticationOptions( + token: String +) extends SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake connection. + * + * @param authenticationOptions + * authentication options + * @param serverName + * server name (e.g. "account.region.snowflakecomputing.com") + * @param database + * database name + * @param role + * role name + * @param warehouse + * warehouse name + * @param schema + * schema name (optional) + */ +final case class SnowflakeConnectionOptions( + authenticationOptions: SnowflakeAuthenticationOptions, + serverName: String, + database: String, + role: String, + warehouse: String, + schema: Option[String] +) + +/** + * Options for configuring a Neo4J driver. + * + * @param connectionOptions + * connection options + * @param stagingBucketName + * Snowflake staging bucket name where CSV files will be stored + * @param storageIntegrationName + * Storage integration name as created in Snowflake + */ +final case class SnowflakeOptions( + connectionOptions: SnowflakeConnectionOptions, + stagingBucketName: String, + storageIntegrationName: String +) diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala new file mode 100644 index 0000000000..2c80313e7a --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio + +import com.spotify.scio.snowflake.syntax.AllSyntax + +/** + * Main package for Snowflake APIs. Import all. + * + * {{{ + * import com.spotify.scio.snowflake._ + * }}} + */ +package object snowflake extends AllSyntax diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala new file mode 100644 index 0000000000..3ce04e0130 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake.syntax + +trait AllSyntax extends ScioContextSyntax with SCollectionSyntax diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala new file mode 100644 index 0000000000..1ff2c3f235 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake.syntax + +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.{EmptyTap, Tap} +import com.spotify.scio.snowflake.SnowflakeOptions +import com.spotify.scio.values.SCollection +import kantan.csv.{RowDecoder, RowEncoder} +import org.apache.beam.sdk.io.snowflake.SnowflakeIO.UserDataMapper +import org.apache.beam.sdk.io.{snowflake => beam} + +/** + * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Snowflake methods. + */ +final class SnowflakeSCollectionOps[T](private val self: SCollection[T]) extends AnyVal { + + import com.spotify.scio.snowflake.SnowflakeIO._ + + /** + * Execute the provided SQL query in Snowflake, COPYing the result in CSV format to the provided + * bucket, and return an [[SCollection]] of provided type, reading this bucket. + * + * The [[SCollection]] is generated using [[kantan.csv.RowDecoded]]. [[SCollection]] type + * properties must then match the order of the columns of the SELECT, that will be copied to the + * bucket. + * + * @see + * ''Reading from Snowflake'' in the + * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] + * @param snowflakeConf + * options for configuring a Snowflake integration + * @param query + * SQL select query + * @return + * [[SCollection]] containing the query results as parsed from the CSV bucket copied from + * Snowflake + */ + def snowflakeSelect[U]( + snowflakeConf: SnowflakeOptions, + query: String + )(implicit + rowDecoder: RowDecoder[U], + coder: Coder[U] + ): SCollection[U] = + self.context.applyTransform(prepareRead(snowflakeConf, self.context).fromQuery(query)) + + /** + * Copy the provided Snowflake table in CSV format to the provided bucket, and * return an + * [[SCollection]] of provided type, reading this bucket. + * + * The [[SCollection]] is generated using [[kantan.csv.RowDecoded]]. [[SCollection]] type + * properties must then match the order of the columns of the table, that will be copied to the + * bucket. + * + * @see + * ''Reading from Snowflake'' in the + * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] + * @param snowflakeConf + * options for configuring a Snowflake integration + * @param table + * table + * @return + * [[SCollection]] containing the table elements as parsed from the CSV bucket copied from + * Snowflake table + */ + def snowflakeTable[U]( + snowflakeConf: SnowflakeOptions, + table: String + )(implicit + rowDecoder: RowDecoder[U], + coder: Coder[U] + ): SCollection[U] = + self.context.applyTransform(prepareRead(snowflakeConf, self.context).fromTable(table)) + + /** + * Save this SCollection as a Snowflake database table. The [[SCollection]] is written to CSV + * files in a bucket, using the provided [[kantan.csv.RowEncoder]] to encode each element as a CSV + * row. The bucket is then COPYied to the Snowflake table. + * + * @see + * ''Writing to Snowflake tables'' in the + * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] + * + * @param snowflakeOptions + * options for configuring a Snowflake connexion + * @param table + * Snowflake table + */ + def saveAsSnowflakeTable( + snowflakeOptions: SnowflakeOptions, + table: String + )(implicit rowEncoder: RowEncoder[T], coder: Coder[T]): Tap[Nothing] = { + self.applyInternal( + beam.SnowflakeIO + .write[T]() + .withDataSourceConfiguration( + dataSourceConfiguration(snowflakeOptions.connectionOptions) + ) + .to(table) + .withStagingBucketName(snowflakeOptions.stagingBucketName) + .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) + .withUserDataMapper(new UserDataMapper[T] { + override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray + }) + ) + EmptyTap + } +} + +trait SCollectionSyntax { + implicit def snowflakeSCollectionOps[T](sc: SCollection[T]): SnowflakeSCollectionOps[T] = + new SnowflakeSCollectionOps(sc) +} diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala new file mode 100644 index 0000000000..846f437da2 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.snowflake.{SnowflakeOptions, SnowflakeSelect} +import com.spotify.scio.values.SCollection +import kantan.csv.RowDecoder + +/** Enhanced version of [[ScioContext]] with Snowflake methods. */ +final class SnowflakeScioContextOps(private val self: ScioContext) extends AnyVal { + + /** + * Get an SCollection for a Snowflake SQL query + * + * @param snowflakeOptions + * options for configuring a Snowflake connexion + * @param query + * Snowflake SQL select query + */ + def snowflakeQuery[T: RowDecoder: Coder]( + snowflakeOptions: SnowflakeOptions, + query: String + ): SCollection[T] = + self.read(SnowflakeSelect(snowflakeOptions, query)) + +} +trait ScioContextSyntax { + implicit def snowflakeScioContextOps(sc: ScioContext): SnowflakeScioContextOps = + new SnowflakeScioContextOps(sc) +} From 0859b24cd8daf0923e3a858fa664a89c6d8e8a75 Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Mon, 4 Nov 2024 16:01:02 +0100 Subject: [PATCH 2/9] fixup! Support Snowflake (#5500) --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 90bc151998..527bb99f5c 100644 --- a/build.sbt +++ b/build.sbt @@ -1273,7 +1273,7 @@ lazy val `scio-snowflake` = project ) .settings(commonSettings) .settings( - description := "Scio add-on for Neo4J", + description := "Scio add-on for Snowflake", libraryDependencies ++= Seq( // compile "org.apache.beam" % "beam-sdks-java-core" % beamVersion, From 8b0da460b2f18fd2eabfd6436137378d030f6054 Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Mon, 4 Nov 2024 16:32:30 +0100 Subject: [PATCH 3/9] fixup! Support Snowflake (#5500) --- .../spotify/scio/snowflake/SnowflakeIO.scala | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala index 5984910857..5018b56858 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala @@ -17,6 +17,7 @@ package com.spotify.scio.snowflake +import scala.util.chaining._ import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} @@ -27,32 +28,29 @@ import org.apache.beam.sdk.io.{snowflake => beam} object SnowflakeIO { - private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) = { - - val datasourceInitial = beam.SnowflakeIO.DataSourceConfiguration + private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) = + beam.SnowflakeIO.DataSourceConfiguration .create() - - val datasourceWithAuthent = connectionOptions.authenticationOptions match { - case SnowflakeUsernamePasswordAuthenticationOptions(username, password) => - datasourceInitial.withUsernamePasswordAuth(username, password) - case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) => - datasourceInitial.withKeyPairPathAuth(username, privateKeyPath) - case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) => - datasourceInitial.withKeyPairPathAuth(username, privateKeyPath, passphrase) - case SnowflakeOAuthTokenAuthenticationOptions(token) => - datasourceInitial.withOAuth(token) - } - - val datasourceBeforeSchema = datasourceWithAuthent - .withServerName(connectionOptions.serverName) - .withDatabase(connectionOptions.database) - .withRole(connectionOptions.role) - .withWarehouse(connectionOptions.warehouse) - - connectionOptions.schema - .map(schema => datasourceBeforeSchema.withSchema(schema)) - .getOrElse(datasourceBeforeSchema) - } + .pipe(ds => + connectionOptions.authenticationOptions match { + case SnowflakeUsernamePasswordAuthenticationOptions(username, password) => + ds.withUsernamePasswordAuth(username, password) + case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) => + ds.withKeyPairPathAuth(username, privateKeyPath) + case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) => + ds.withKeyPairPathAuth(username, privateKeyPath, passphrase) + case SnowflakeOAuthTokenAuthenticationOptions(token) => + ds.withOAuth(token) + } + ) + .pipe(ds => + ds + .withServerName(connectionOptions.serverName) + .withDatabase(connectionOptions.database) + .withRole(connectionOptions.role) + .withWarehouse(connectionOptions.warehouse) + ) + .pipe(ds => connectionOptions.schema.map(schema => ds.withSchema(schema)).getOrElse(ds)) private[snowflake] def buildCsvMapper[T](rowDecoder: RowDecoder[T]): CsvMapper[T] = new CsvMapper[T] { From b6c8906a72e872c580a55d2073d361330a84619b Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Fri, 8 Nov 2024 23:19:28 +0100 Subject: [PATCH 4/9] fixup! Support Snowflake (#5500) --- .../src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala index 5018b56858..de38cfec34 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala @@ -50,7 +50,7 @@ object SnowflakeIO { .withRole(connectionOptions.role) .withWarehouse(connectionOptions.warehouse) ) - .pipe(ds => connectionOptions.schema.map(schema => ds.withSchema(schema)).getOrElse(ds)) + .pipe(ds => connectionOptions.schema.fold(ds)(ds.withSchema)) private[snowflake] def buildCsvMapper[T](rowDecoder: RowDecoder[T]): CsvMapper[T] = new CsvMapper[T] { From 48252d2054717e494f4ead6444f7537eefac2dc4 Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Fri, 8 Nov 2024 23:24:55 +0100 Subject: [PATCH 5/9] fixup! Support Snowflake (#5500) --- .../scala/com/spotify/scio/snowflake/SnowflakeOptions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala index 4ee8aaaefb..a43fb74fe9 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala @@ -17,7 +17,7 @@ package com.spotify.scio.snowflake -trait SnowflakeAuthenticationOptions +sealed trait SnowflakeAuthenticationOptions /** * Options for a Snowflake username/password authentication. From 5ccff09a4f5fd77ef988ca8379c0ca490b1f462f Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 18 Nov 2024 17:27:35 +0100 Subject: [PATCH 6/9] Add IO parameters --- .../com/spotify/scio/neo4j/Neo4jIOTest.scala | 2 +- .../spotify/scio/snowflake/SnowflakeIO.scala | 220 ++++++++++++------ .../scio/snowflake/SnowflakeOptions.scala | 118 +++++----- .../snowflake/syntax/SCollectionSyntax.scala | 146 +++++------- .../snowflake/syntax/ScioContextSyntax.scala | 73 +++++- .../scio/snowflake/SnowflakeIOTest.scala | 43 ++++ 6 files changed, 382 insertions(+), 220 deletions(-) create mode 100644 scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala diff --git a/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala b/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala index 3f6b5fb94f..7c3121233e 100644 --- a/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala +++ b/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala @@ -1,5 +1,5 @@ /* - * Copyright 2022 Spotify AB. + * Copyright 2024 Spotify AB. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala index de38cfec34..a710a1d1d6 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala @@ -20,77 +20,145 @@ package com.spotify.scio.snowflake import scala.util.chaining._ import com.spotify.scio.ScioContext import com.spotify.scio.coders.{Coder, CoderMaterializer} -import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT, TestIO} +import com.spotify.scio.util.ScioUtil import com.spotify.scio.values.SCollection -import kantan.csv.{RowDecoder, RowEncoder} +import kantan.csv.{RowCodec, RowDecoder, RowEncoder} import org.apache.beam.sdk.io.snowflake.SnowflakeIO.{CsvMapper, UserDataMapper} +import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema +import org.apache.beam.sdk.io.snowflake.enums.{CreateDisposition, WriteDisposition} import org.apache.beam.sdk.io.{snowflake => beam} +import org.joda.time.Duration object SnowflakeIO { + final def apply[T](opts: SnowflakeConnectionOptions, query: String): SnowflakeIO[T] = + new SnowflakeIO[T] with TestIO[T] { + final override val tapT = EmptyTapOf[T] + override def testId: String = s"SnowflakeIO(${snowflakeIoId(opts, query)})" + } + + private[snowflake] def snowflakeIoId(opts: SnowflakeConnectionOptions, target: String): String = { + // source params + val params = Option(opts.database).map(db => s"db=$db") ++ + Option(opts.warehouse).map(db => s"warehouse=$db") + s"${opts.url}${params.mkString("?", "&", "")}:$target" + } + + object ReadParam { + type ConfigOverride[T] = beam.SnowflakeIO.Read[T] => beam.SnowflakeIO.Read[T] + + val DefaultStagingBucketName: String = null + val DefaultQuotationMark: String = null + val DefaultConfigOverride = null + } + final case class ReadParam[T]( + storageIntegrationName: String, + stagingBucketName: String = ReadParam.DefaultStagingBucketName, + quotationMark: String = ReadParam.DefaultQuotationMark, + configOverride: ReadParam.ConfigOverride[T] = ReadParam.DefaultConfigOverride + ) + + object WriteParam { + type ConfigOverride[T] = beam.SnowflakeIO.Write[T] => beam.SnowflakeIO.Write[T] + + val DefaultTableSchema: SnowflakeTableSchema = null + val DefaultCreateDisposition: CreateDisposition = null + val DefaultWriteDisposition: WriteDisposition = null + val DefaultSnowPipe: String = null + val DefaultShardNumber: Integer = null + val DefaultFlushRowLimit: Integer = null + val DefaultFlushTimeLimit: Duration = null + val DefaultStorageIntegrationName: String = null + val DefaultStagingBucketName: String = null + val DefaultQuotationMark: String = null + val DefaultConfigOverride = null + } + final case class WriteParam[T]( + tableSchema: SnowflakeTableSchema = WriteParam.DefaultTableSchema, + createDisposition: CreateDisposition = WriteParam.DefaultCreateDisposition, + writeDisposition: WriteDisposition = WriteParam.DefaultWriteDisposition, + snowPipe: String = WriteParam.DefaultSnowPipe, + shardNumber: Integer = WriteParam.DefaultShardNumber, + flushRowLimit: Integer = WriteParam.DefaultFlushRowLimit, + flushTimeLimit: Duration = WriteParam.DefaultFlushTimeLimit, + storageIntegrationName: String = WriteParam.DefaultStorageIntegrationName, + stagingBucketName: String = WriteParam.DefaultStagingBucketName, + quotationMark: String = WriteParam.DefaultQuotationMark, + configOverride: WriteParam.ConfigOverride[T] = WriteParam.DefaultConfigOverride + ) + private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) = beam.SnowflakeIO.DataSourceConfiguration .create() - .pipe(ds => - connectionOptions.authenticationOptions match { - case SnowflakeUsernamePasswordAuthenticationOptions(username, password) => + .withUrl(connectionOptions.url) + .pipe { ds => + import SnowflakeAuthenticationOptions._ + Option(connectionOptions.authenticationOptions).fold(ds) { + case UsernamePassword(username, password) => ds.withUsernamePasswordAuth(username, password) - case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) => + case KeyPair(username, privateKeyPath, None) => ds.withKeyPairPathAuth(username, privateKeyPath) - case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) => + case KeyPair(username, privateKeyPath, Some(passphrase)) => ds.withKeyPairPathAuth(username, privateKeyPath, passphrase) - case SnowflakeOAuthTokenAuthenticationOptions(token) => - ds.withOAuth(token) + case OAuthToken(token) => + ds.withOAuth(token).withAuthenticator("oauth") } - ) + } + .pipe(ds => Option(connectionOptions.database).fold(ds)(ds.withDatabase)) + .pipe(ds => Option(connectionOptions.role).fold(ds)(ds.withRole)) + .pipe(ds => Option(connectionOptions.warehouse).fold(ds)(ds.withWarehouse)) .pipe(ds => - ds - .withServerName(connectionOptions.serverName) - .withDatabase(connectionOptions.database) - .withRole(connectionOptions.role) - .withWarehouse(connectionOptions.warehouse) + Option(connectionOptions.loginTimeout) + .map[Integer](_.getStandardSeconds.toInt) + .fold(ds)(ds.withLoginTimeout) ) - .pipe(ds => connectionOptions.schema.fold(ds)(ds.withSchema)) - - private[snowflake] def buildCsvMapper[T](rowDecoder: RowDecoder[T]): CsvMapper[T] = - new CsvMapper[T] { - override def mapRow(parts: Array[String]): T = { - val unsnowedParts = parts.map { - case "\\N" => "" // needs to be mapped to an Option - case other => other - }.toSeq - rowDecoder.unsafeDecode(unsnowedParts) - } - } + .pipe(ds => Option(connectionOptions.schema).fold(ds)(ds.withSchema)) + + private[snowflake] def csvMapper[T: RowDecoder]: CsvMapper[T] = { (parts: Array[String]) => + val unsnowedParts = parts.map { + case "\\N" => "" // needs to be mapped to an Option + case other => other + }.toSeq + RowDecoder[T].unsafeDecode(unsnowedParts) + } - private[snowflake] def prepareRead[T]( - snowflakeOptions: SnowflakeOptions, - sc: ScioContext - )(implicit rowDecoder: RowDecoder[T], coder: Coder[T]): beam.SnowflakeIO.Read[T] = - beam.SnowflakeIO - .read() - .withDataSourceConfiguration( - SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions) - ) - .withStagingBucketName(snowflakeOptions.stagingBucketName) - .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) - .withCsvMapper(buildCsvMapper(rowDecoder)) - .withCoder(CoderMaterializer.beam(sc, coder)) + private[snowflake] def userDataMapper[T: RowEncoder]: UserDataMapper[T] = { (element: T) => + RowEncoder[T].encode(element).toArray + } } sealed trait SnowflakeIO[T] extends ScioIO[T] -final case class SnowflakeSelect[T](snowflakeOptions: SnowflakeOptions, select: String)(implicit +final case class SnowflakeSelect[T](connectionOptions: SnowflakeConnectionOptions, select: String)( + implicit rowDecoder: RowDecoder[T], coder: Coder[T] ) extends SnowflakeIO[T] { - override type ReadP = Unit + import SnowflakeIO._ + + override type ReadP = ReadParam[T] override type WriteP = Unit override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromQuery(select)) + override def testId: String = s"SnowflakeIO(${snowflakeIoId(connectionOptions, select)})" + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { + val tempDirectory = ScioUtil.tempDirOrDefault(params.stagingBucketName, sc).toString + val t = beam.SnowflakeIO + .read[T]() + .fromQuery(select) + .withDataSourceConfiguration(dataSourceConfiguration(connectionOptions)) + .withStorageIntegrationName(params.storageIntegrationName) + .withStagingBucketName(tempDirectory) + .pipe(r => Option(params.quotationMark).fold(r)(r.withQuotationMark)) + .withCsvMapper(csvMapper) + .withCoder(CoderMaterializer.beam(sc, coder)) + .pipe(r => Option(params.configOverride).fold(r)(_(r))) + + sc.applyTransform(t) + } override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = throw new UnsupportedOperationException("SnowflakeSelect is read-only") @@ -98,33 +166,55 @@ final case class SnowflakeSelect[T](snowflakeOptions: SnowflakeOptions, select: override def tap(params: ReadP): Tap[Nothing] = EmptyTap } -final case class SnowflakeTable[T](snowflakeOptions: SnowflakeOptions, table: String)(implicit - rowDecoder: RowDecoder[T], - rowEncoder: RowEncoder[T], +final case class SnowflakeTable[T](connectionOptions: SnowflakeConnectionOptions, table: String)( + implicit + rowCodec: RowCodec[T], // use codec for tap coder: Coder[T] ) extends SnowflakeIO[T] { - override type ReadP = Unit - override type WriteP = Unit - override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + import SnowflakeIO._ + + override type ReadP = ReadParam[T] + override type WriteP = WriteParam[T] + override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] // TODO Create a tap + + override def testId: String = s"SnowflakeIO(${snowflakeIoId(connectionOptions, table)})" - override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = - sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromTable(table)) + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { + val tempDirectory = ScioUtil.tempDirOrDefault(params.stagingBucketName, sc).toString + val t = beam.SnowflakeIO + .read[T]() + .fromTable(table) + .withDataSourceConfiguration(dataSourceConfiguration(connectionOptions)) + .withStorageIntegrationName(params.storageIntegrationName) + .withStagingBucketName(tempDirectory) + .pipe(r => Option(params.quotationMark).fold(r)(r.withQuotationMark)) + .withCsvMapper(csvMapper) + .withCoder(CoderMaterializer.beam(sc, coder)) + .pipe(r => Option(params.configOverride).fold(r)(_(r))) + + sc.applyTransform(t) + } override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = { - data.applyInternal( - beam.SnowflakeIO - .write[T]() - .withDataSourceConfiguration( - SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions) - ) - .to(table) - .withStagingBucketName(snowflakeOptions.stagingBucketName) - .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) - .withUserDataMapper(new UserDataMapper[T] { - override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray - }) - ) + val tempDirectory = ScioUtil.tempDirOrDefault(params.stagingBucketName, data.context).toString + val t = beam.SnowflakeIO + .write[T]() + .withDataSourceConfiguration(dataSourceConfiguration(connectionOptions)) + .to(table) + .pipe(w => Option(params.createDisposition).fold(w)(w.withCreateDisposition)) + .pipe(w => Option(params.writeDisposition).fold(w)(w.withWriteDisposition)) + .pipe(w => Option(params.snowPipe).fold(w)(w.withSnowPipe)) + .pipe(w => Option(params.shardNumber).fold(w)(w.withShardsNumber)) + .pipe(w => Option(params.flushRowLimit).fold(w)(w.withFlushRowLimit)) + .pipe(w => Option(params.flushTimeLimit).fold(w)(w.withFlushTimeLimit)) + .pipe(w => Option(params.quotationMark).fold(w)(w.withQuotationMark)) + .pipe(w => Option(params.storageIntegrationName).fold(w)(w.withStorageIntegrationName)) + .withStagingBucketName(tempDirectory) + .withUserDataMapper(userDataMapper) + .pipe(w => Option(params.configOverride).fold(w)(_(w))) + + data.applyInternal(t) EmptyTap } diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala index a43fb74fe9..cdcd801866 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala @@ -17,84 +17,76 @@ package com.spotify.scio.snowflake +import org.joda.time.Duration + sealed trait SnowflakeAuthenticationOptions -/** - * Options for a Snowflake username/password authentication. - * - * @param username - * username - * @param password - * password - */ -final case class SnowflakeUsernamePasswordAuthenticationOptions( - username: String, - password: String -) extends SnowflakeAuthenticationOptions +object SnowflakeAuthenticationOptions { -/** - * Options for a Snowflake key pair authentication. - * - * @param username - * username - * @param privateKeyPath - * path to the private key - * @param privateKeyPassphrase - * passphrase for the private key (optional) - */ -final case class SnowflakeKeyPairAuthenticationOptions( - username: String, - privateKeyPath: String, - privateKeyPassphrase: Option[String] -) extends SnowflakeAuthenticationOptions + /** + * Snowflake username/password authentication. + * + * @param username + * username + * @param password + * password + */ + final case class UsernamePassword( + username: String, + password: String + ) extends SnowflakeAuthenticationOptions -/** - * Options for a Snowflake OAuth token authentication. - * - * @param token - * OAuth token - */ -final case class SnowflakeOAuthTokenAuthenticationOptions( - token: String -) extends SnowflakeAuthenticationOptions + /** + * Key pair authentication. + * + * @param username + * username + * @param privateKeyPath + * path to the private key + * @param privateKeyPassphrase + * passphrase for the private key (optional) + */ + final case class KeyPair( + username: String, + privateKeyPath: String, + privateKeyPassphrase: Option[String] = None + ) extends SnowflakeAuthenticationOptions + + /** + * OAuth token authentication. + * + * @param token + * OAuth token + */ + final case class OAuthToken(token: String) extends SnowflakeAuthenticationOptions + +} /** * Options for a Snowflake connection. * * @param authenticationOptions * authentication options - * @param serverName - * server name (e.g. "account.region.snowflakecomputing.com") + * @param url + * Sets URL of Snowflake server in following format: + * "jdbc:snowflake://[host]:[port].snowflakecomputing.com" * @param database - * database name + * database to use * @param role - * role name + * user's role to be used when running queries on Snowflake * @param warehouse * warehouse name * @param schema - * schema name (optional) + * schema to use when connecting to Snowflake + * @param loginTimeout + * Sets loginTimeout that will be used in [[net.snowflake.client.jdbc.SnowflakeBasicDataSource]]. */ final case class SnowflakeConnectionOptions( - authenticationOptions: SnowflakeAuthenticationOptions, - serverName: String, - database: String, - role: String, - warehouse: String, - schema: Option[String] -) - -/** - * Options for configuring a Neo4J driver. - * - * @param connectionOptions - * connection options - * @param stagingBucketName - * Snowflake staging bucket name where CSV files will be stored - * @param storageIntegrationName - * Storage integration name as created in Snowflake - */ -final case class SnowflakeOptions( - connectionOptions: SnowflakeConnectionOptions, - stagingBucketName: String, - storageIntegrationName: String + url: String, + authenticationOptions: SnowflakeAuthenticationOptions = null, + database: String = null, + role: String = null, + warehouse: String = null, + schema: String = null, + loginTimeout: Duration = null ) diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala index 1ff2c3f235..3679dc593c 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala @@ -18,108 +18,88 @@ package com.spotify.scio.snowflake.syntax import com.spotify.scio.coders.Coder -import com.spotify.scio.io.{EmptyTap, Tap} -import com.spotify.scio.snowflake.SnowflakeOptions +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.snowflake.{SnowflakeConnectionOptions, SnowflakeIO, SnowflakeTable} import com.spotify.scio.values.SCollection -import kantan.csv.{RowDecoder, RowEncoder} -import org.apache.beam.sdk.io.snowflake.SnowflakeIO.UserDataMapper -import org.apache.beam.sdk.io.{snowflake => beam} +import kantan.csv.RowCodec +import org.apache.beam.sdk.io.snowflake.data.SnowflakeTableSchema +import org.apache.beam.sdk.io.snowflake.enums.{CreateDisposition, WriteDisposition} +import org.joda.time.Duration /** * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Snowflake methods. */ final class SnowflakeSCollectionOps[T](private val self: SCollection[T]) extends AnyVal { - import com.spotify.scio.snowflake.SnowflakeIO._ - /** - * Execute the provided SQL query in Snowflake, COPYing the result in CSV format to the provided - * bucket, and return an [[SCollection]] of provided type, reading this bucket. - * - * The [[SCollection]] is generated using [[kantan.csv.RowDecoded]]. [[SCollection]] type - * properties must then match the order of the columns of the SELECT, that will be copied to the - * bucket. - * - * @see - * ''Reading from Snowflake'' in the - * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] - * @param snowflakeConf - * options for configuring a Snowflake integration - * @param query - * SQL select query - * @return - * [[SCollection]] containing the query results as parsed from the CSV bucket copied from - * Snowflake - */ - def snowflakeSelect[U]( - snowflakeConf: SnowflakeOptions, - query: String - )(implicit - rowDecoder: RowDecoder[U], - coder: Coder[U] - ): SCollection[U] = - self.context.applyTransform(prepareRead(snowflakeConf, self.context).fromQuery(query)) - - /** - * Copy the provided Snowflake table in CSV format to the provided bucket, and * return an - * [[SCollection]] of provided type, reading this bucket. - * - * The [[SCollection]] is generated using [[kantan.csv.RowDecoded]]. [[SCollection]] type - * properties must then match the order of the columns of the table, that will be copied to the - * bucket. + * Save this SCollection as a Snowflake database table. The [[SCollection]] is written to CSV + * files in a bucket, using a provided [[kantan.csv.RowEncoder]] to encode each element as a CSV + * row. The bucket is then COPYied to the Snowflake table. * * @see * ''Reading from Snowflake'' in the * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] - * @param snowflakeConf + * @param connectionOptions * options for configuring a Snowflake integration * @param table - * table + * table name to be written in Snowflake + * @param tableSchema + * table schema to be used during creating table + * @param createDisposition + * disposition to be used during table preparation + * @param writeDisposition + * disposition to be used during writing to table phase + * @param snowPipe + * name of created + * [[https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro SnowPipe]] in Snowflake + * dashboard + * @param shardNumber + * number of shards that are created per window + * @param flushRowLimit + * number of row limit that will be saved to the staged file and then loaded to Snowflake + * @param flushTimeLimit + * duration how often staged files will be created and then how often ingested by Snowflake + * during streaming + * @param storageIntegrationName + * Storage Integration in Snowflake to be used + * @param stagingBucketName + * cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement. + * @param quotationMark + * Snowflake-specific quotations around strings * @return * [[SCollection]] containing the table elements as parsed from the CSV bucket copied from * Snowflake table */ - def snowflakeTable[U]( - snowflakeConf: SnowflakeOptions, - table: String - )(implicit - rowDecoder: RowDecoder[U], - coder: Coder[U] - ): SCollection[U] = - self.context.applyTransform(prepareRead(snowflakeConf, self.context).fromTable(table)) - - /** - * Save this SCollection as a Snowflake database table. The [[SCollection]] is written to CSV - * files in a bucket, using the provided [[kantan.csv.RowEncoder]] to encode each element as a CSV - * row. The bucket is then COPYied to the Snowflake table. - * - * @see - * ''Writing to Snowflake tables'' in the - * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] - * - * @param snowflakeOptions - * options for configuring a Snowflake connexion - * @param table - * Snowflake table - */ - def saveAsSnowflakeTable( - snowflakeOptions: SnowflakeOptions, - table: String - )(implicit rowEncoder: RowEncoder[T], coder: Coder[T]): Tap[Nothing] = { - self.applyInternal( - beam.SnowflakeIO - .write[T]() - .withDataSourceConfiguration( - dataSourceConfiguration(snowflakeOptions.connectionOptions) - ) - .to(table) - .withStagingBucketName(snowflakeOptions.stagingBucketName) - .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) - .withUserDataMapper(new UserDataMapper[T] { - override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray - }) + def saveAsSnowflake( + connectionOptions: SnowflakeConnectionOptions, + table: String, + tableSchema: SnowflakeTableSchema = SnowflakeIO.WriteParam.DefaultTableSchema, + createDisposition: CreateDisposition = SnowflakeIO.WriteParam.DefaultCreateDisposition, + writeDisposition: WriteDisposition = SnowflakeIO.WriteParam.DefaultWriteDisposition, + snowPipe: String = SnowflakeIO.WriteParam.DefaultSnowPipe, + shardNumber: Integer = SnowflakeIO.WriteParam.DefaultShardNumber, + flushRowLimit: Integer = SnowflakeIO.WriteParam.DefaultFlushRowLimit, + flushTimeLimit: Duration = SnowflakeIO.WriteParam.DefaultFlushTimeLimit, + storageIntegrationName: String = SnowflakeIO.WriteParam.DefaultStorageIntegrationName, + stagingBucketName: String = SnowflakeIO.WriteParam.DefaultStagingBucketName, + quotationMark: String = SnowflakeIO.WriteParam.DefaultQuotationMark, + configOverride: SnowflakeIO.WriteParam.ConfigOverride[T] = + SnowflakeIO.WriteParam.DefaultConfigOverride + )(implicit rowCodec: RowCodec[T], coder: Coder[T]): ClosedTap[Nothing] = { + val param = SnowflakeIO.WriteParam( + tableSchema = tableSchema, + createDisposition = createDisposition, + writeDisposition = writeDisposition, + snowPipe = snowPipe, + shardNumber = shardNumber, + flushRowLimit = flushRowLimit, + flushTimeLimit = flushTimeLimit, + storageIntegrationName = storageIntegrationName, + stagingBucketName = stagingBucketName, + quotationMark = quotationMark, + configOverride = configOverride ) - EmptyTap + self.write(SnowflakeTable[T](connectionOptions, table))(param) } } diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala index 846f437da2..392267b29b 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala @@ -19,9 +19,14 @@ package com.spotify.scio.snowflake.syntax import com.spotify.scio.ScioContext import com.spotify.scio.coders.Coder -import com.spotify.scio.snowflake.{SnowflakeOptions, SnowflakeSelect} +import com.spotify.scio.snowflake.{ + SnowflakeConnectionOptions, + SnowflakeIO, + SnowflakeSelect, + SnowflakeTable +} import com.spotify.scio.values.SCollection -import kantan.csv.RowDecoder +import kantan.csv.{RowCodec, RowDecoder} /** Enhanced version of [[ScioContext]] with Snowflake methods. */ final class SnowflakeScioContextOps(private val self: ScioContext) extends AnyVal { @@ -29,16 +34,68 @@ final class SnowflakeScioContextOps(private val self: ScioContext) extends AnyVa /** * Get an SCollection for a Snowflake SQL query * - * @param snowflakeOptions + * @param connectionOptions * options for configuring a Snowflake connexion * @param query * Snowflake SQL select query + * @param storageIntegrationName + * Storage Integration in Snowflake to be used + * @param stagingBucketName + * cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement. + * @param quotationMark + * Snowflake-specific quotations around strings + */ + def snowflakeQuery[T]( + connectionOptions: SnowflakeConnectionOptions, + query: String, + storageIntegrationName: String, + stagingBucketName: String = SnowflakeIO.ReadParam.DefaultStagingBucketName, + quotationMark: String = SnowflakeIO.ReadParam.DefaultQuotationMark, + configOverride: SnowflakeIO.ReadParam.ConfigOverride[T] = + SnowflakeIO.ReadParam.DefaultConfigOverride + )(implicit rowDecoder: RowDecoder[T], coder: Coder[T]): SCollection[T] = { + val param = SnowflakeIO.ReadParam( + storageIntegrationName = storageIntegrationName, + stagingBucketName = stagingBucketName, + quotationMark = quotationMark, + configOverride = configOverride + ) + self.read(SnowflakeSelect(connectionOptions, query))(param) + } + + /** + * Get an SCollection for a Snowflake table + * + * @param connectionOptions + * options for configuring a Snowflake connexion + * @param table + * Snowflake table + * @param storageIntegrationName + * Storage Integration in Snowflake to be used + * @param stagingBucketName + * cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement. + * @param quotationMark + * Snowflake-specific quotations around strings */ - def snowflakeQuery[T: RowDecoder: Coder]( - snowflakeOptions: SnowflakeOptions, - query: String - ): SCollection[T] = - self.read(SnowflakeSelect(snowflakeOptions, query)) + def snowflakeTable[T]( + connectionOptions: SnowflakeConnectionOptions, + table: String, + storageIntegrationName: String, + stagingBucketName: String = SnowflakeIO.ReadParam.DefaultStagingBucketName, + quotationMark: String = SnowflakeIO.ReadParam.DefaultQuotationMark, + configOverride: SnowflakeIO.ReadParam.ConfigOverride[T] = + SnowflakeIO.ReadParam.DefaultConfigOverride + )(implicit rowDecoder: RowDecoder[T], coder: Coder[T]): SCollection[T] = { + // create a read only codec + implicit val codec: RowCodec[T] = RowCodec.from(rowDecoder, null) + val param = SnowflakeIO.ReadParam( + storageIntegrationName = storageIntegrationName, + stagingBucketName = stagingBucketName, + quotationMark = quotationMark, + configOverride = configOverride + ) + self.read(SnowflakeTable(connectionOptions, table))(param) + } } trait ScioContextSyntax { diff --git a/scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala b/scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala new file mode 100644 index 0000000000..289e318e78 --- /dev/null +++ b/scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala @@ -0,0 +1,43 @@ +package com.spotify.scio.snowflake + +import com.spotify.scio.testing.ScioIOSpec +import kantan.csv.RowCodec + +object SnowflakeIOTest { + final case class Data(value: String) +} + +class SnowflakeIOTest extends ScioIOSpec { + + import SnowflakeIOTest._ + + val connectionOptions = SnowflakeConnectionOptions( + url = "jdbc:snowflake://host.snowflakecomputing.com" + ) + + implicit val rowCodecData: RowCodec[Data] = RowCodec.caseCodec(Data.apply)(Data.unapply) + + "SnowflakeIO" should "support query input" in { + val input = Seq(Data("a"), Data("b"), Data("c")) + val query = "SELECT * FROM table" + testJobTestInput(input, query)(SnowflakeIO(connectionOptions, _))( + _.snowflakeQuery(connectionOptions, _, "storage-integration") + ) + } + + it should "support table input" in { + val input = Seq(Data("a"), Data("b"), Data("c")) + val table = "table" + testJobTestInput(input, table)(SnowflakeIO(connectionOptions, _))( + _.snowflakeTable(connectionOptions, _, "storage-integration") + ) + } + + it should "support table output" in { + val output = Seq(Data("a"), Data("b"), Data("c")) + val table = "table" + testJobTestOutput(output, table)(SnowflakeIO(connectionOptions, _))( + _.saveAsSnowflake(connectionOptions, _) + ) + } +} From 095cf07b442c75e82793ae3382cf3abd42807c4b Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 18 Nov 2024 17:29:30 +0100 Subject: [PATCH 7/9] Consistent naming --- .../main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala index a710a1d1d6..910d4477a3 100644 --- a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala @@ -130,7 +130,7 @@ object SnowflakeIO { sealed trait SnowflakeIO[T] extends ScioIO[T] -final case class SnowflakeSelect[T](connectionOptions: SnowflakeConnectionOptions, select: String)( +final case class SnowflakeSelect[T](connectionOptions: SnowflakeConnectionOptions, query: String)( implicit rowDecoder: RowDecoder[T], coder: Coder[T] @@ -142,13 +142,13 @@ final case class SnowflakeSelect[T](connectionOptions: SnowflakeConnectionOption override type WriteP = Unit override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] - override def testId: String = s"SnowflakeIO(${snowflakeIoId(connectionOptions, select)})" + override def testId: String = s"SnowflakeIO(${snowflakeIoId(connectionOptions, query)})" override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = { val tempDirectory = ScioUtil.tempDirOrDefault(params.stagingBucketName, sc).toString val t = beam.SnowflakeIO .read[T]() - .fromQuery(select) + .fromQuery(query) .withDataSourceConfiguration(dataSourceConfiguration(connectionOptions)) .withStorageIntegrationName(params.storageIntegrationName) .withStagingBucketName(tempDirectory) From c28d92576fa49a002de99d9c226e2ee81ee39d2e Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 18 Nov 2024 17:32:56 +0100 Subject: [PATCH 8/9] Headers --- .../com/spotify/scio/neo4j/Neo4jIOTest.scala | 2 +- .../spotify/scio/snowflake/SnowflakeIOTest.scala | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala b/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala index 7c3121233e..3f6b5fb94f 100644 --- a/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala +++ b/scio-neo4j/src/test/scala/com/spotify/scio/neo4j/Neo4jIOTest.scala @@ -1,5 +1,5 @@ /* - * Copyright 2024 Spotify AB. + * Copyright 2022 Spotify AB. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala b/scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala index 289e318e78..1fbc901c8b 100644 --- a/scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala +++ b/scio-snowflake/src/test/scala/com/spotify/scio/snowflake/SnowflakeIOTest.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.snowflake import com.spotify.scio.testing.ScioIOSpec From 2a2ac467e4c674b8ef2f4e4feb962231a82e7c99 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Wed, 20 Nov 2024 11:36:58 +0100 Subject: [PATCH 9/9] Add missing dep --- build.sbt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index 527bb99f5c..a03ac889b9 100644 --- a/build.sbt +++ b/build.sbt @@ -1276,10 +1276,11 @@ lazy val `scio-snowflake` = project description := "Scio add-on for Snowflake", libraryDependencies ++= Seq( // compile - "org.apache.beam" % "beam-sdks-java-core" % beamVersion, - "org.apache.beam" % "beam-sdks-java-io-snowflake" % beamVersion, + "com.nrinaudo" %% "kantan.codecs" % kantanCodecsVersion, "com.nrinaudo" %% "kantan.csv" % kantanCsvVersion, - "com.nrinaudo" %% "kantan.codecs" % kantanCodecsVersion + "joda-time" % "joda-time" % jodaTimeVersion, + "org.apache.beam" % "beam-sdks-java-core" % beamVersion, + "org.apache.beam" % "beam-sdks-java-io-snowflake" % beamVersion ), tlMimaPreviousVersions := Set.empty // TODO: remove once released )