
[WIP] Provide spark catalog, dsv2 and use parquet for copy/unload #120

Closed
wants to merge 175 commits

Conversation


@parisni parisni commented Jan 3, 2023

This PR:

  1. merges [WIP] Issue #69: Support for DSV2 (#70) for datasource v2 on master; fixes "Feature request: upgrade to datasource v2" (#119)
  2. spark catalog feature for reading, writing and DDL from Spark SQL to Redshift (see readme.md and the usage sketch after this list); fixes "Feature request: provide spark catalog" (#118)
  3. a cache with a TTL on s3 for each table (for analytics use cases); fixes "Feature request: cache queries on s3 with TTL" (#114)
  4. fixes empty parquet files when there are no rows in Redshift; fixes "Feature request: unload as parquet file" (#116)
  5. provides COPY from parquet; fixes "Feature request: copy data with parquet format" (#117)
  6. supports Redshift column comments and faster table schema discovery
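
For orientation, a minimal usage sketch of the new read path (the option names unloadformat and table_minutes_ttl are taken from the diff discussed below; the format string, connection details, table and tempdir are placeholders/assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("redshift-dsv2-demo").getOrCreate()

// DSv2 read using the options introduced in this PR.
val df = spark.read
  .format("io.github.spark_redshift_community.spark.redshift")
  .option("url", "jdbc:redshift://host:5439/db?user=user&password=pass") // placeholder connection
  .option("dbtable", "public.my_table")                                  // placeholder table
  .option("tempdir", "s3a://bucket/tmp/")                                // placeholder unload location
  .option("unloadformat", "parquet")                                     // "csv" is the default
  .option("table_minutes_ttl", "60")                                     // -1 disables the s3 cache
  .load()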

rxin and others added 30 commits November 8, 2017 09:29
…563_remove-itests-from-public

Remove itests. Fix jdbc url. Update Redshift jdbc driver
…488_cleanup-fix-double-to-float

Fix double type to float and cleanup
…486_avoid-log-creds

datalake-486 avoid log creds
…4899_empty-string-to-null

Empty string is converted to null
…un - fix for STS token aws access in progress
…ion between different libraries versions! Tests pass and can compile spark-on-paasta and spark successfullygit add src/ project/
@parisni parisni marked this pull request as ready for review January 3, 2023 15:05
@parisni parisni changed the title Provide spark catalog, dsv2 and use parquet for copy/unload [WIP] Provide spark catalog, dsv2 and use parquet for copy/unload Jan 3, 2023
@@ -0,0 +1 @@
sbt.version=1.7.1
Collaborator

what is this file about? I don't think it's needed, and it also doesn't match the other sbt.version

Comment on lines 1 to 14
addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")

addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")

addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")

Collaborator

why remove these plugins? Some, like dependency-graph, are just nice to have, but I'm pretty sure others like sbt-release and sbt-pgp are needed in order to release the jars to Sonatype.

Author

Will revert that commit about sbt. For obscure reasons I need them to build in my current dev setup.

build.sbt Outdated
Comment on lines 100 to 102
releaseCrossBuild := true,
licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0")),
releasePublishArtifactsAction := PgpKeys.publishSigned.value,
Collaborator

I'm not sure about changes in this file either

Author

Same, ignore this, will revert

@@ -570,6 +607,12 @@ and use that as a temp location for this data.
<td>Determined by the JDBC URL's subprotocol</td>
<td>The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL's subprotocol.</td>
</tr>
Collaborator

let's add a readme entry for the unload format too

Comment on lines +44 to +45
"unloadformat" -> "csv",
"table_minutes_ttl" -> "-1"
Collaborator

If I understand the PR correctly, only the v2 sources implement these parameters. Do you intend to make the v1 sources respect these parameters too? I think it's ok if they don't, but then we should probably make a distinct parameters class for dsv2 to make it clearer to users what is available in each version.

That would also let us make the v2 sources default to parquet unloadformat without breaking any backwards compatibility.
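
A rough sketch of such a distinct dsv2 parameters class (names and shape are illustrative; only the option names and defaults come from this PR):

// Illustrative only: a v2-specific wrapper keeps the v1 documentation accurate.
case class RedshiftV2Parameters(raw: Map[String, String]) {
  // Could default to "parquet" for v2 without touching v1 behaviour.
  def unloadFormat: String = raw.getOrElse("unloadformat", "csv")
  // -1 disables the s3 cache; any positive value is a TTL in minutes.
  def tableMinutesTtl: Int = raw.getOrElse("table_minutes_ttl", "-1").toInt
}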

Author

I could backport the TTL for data source v1. But your proposal makes sense.

Author
@parisni parisni Jan 3, 2023

DSv2 provides limit pushdown in Spark 3.3.x, so dsv1 and a lot of the code about the CSV format could just be removed instead. That's the alternative I'd like to discuss.

import io.github.spark_redshift_community.spark.redshift


class RedshiftCatalog extends JDBCTableCatalog {
Collaborator

can we add any test coverage of the catalog capabilities?

Author

Yeah, the catalog is a good way to test the whole thing
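
One possible smoke test, assuming the catalog is registered through the usual spark.sql.catalog.* properties (the config keys follow JDBCTableCatalog conventions; jdbcUrl and the schema are placeholder fixtures):

test("catalog lists redshift tables") {
  spark.conf.set("spark.sql.catalog.redshift",
    "io.github.spark_redshift_community.spark.redshift.RedshiftCatalog")
  spark.conf.set("spark.sql.catalog.redshift.url", jdbcUrl) // placeholder test fixture
  // Exercises namespace listing, table resolution and schema discovery end to end.
  assert(spark.sql("SHOW TABLES IN redshift.public").collect().nonEmpty)
}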

(ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
}
override def invalidateTable(ident: Identifier): Unit = {
// TODO When refresh table, then drop the s3 folder
Collaborator

is this a todo within this PR or for later? One of the readme entries mentioned we could invalidate the cache by doing a table refresh. I'm not sure if that is the same thing as invalidateTable.

Author

You're right, and I am about to implement it
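
A rough sketch of that invalidation, assuming the cached unload lives under a per-table prefix below the configured tempdir (the layout and the helper name are assumptions, not this PR's actual code):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: drop a table's cached unload folder so the next read goes back to Redshift.
def dropCachedUnload(tempDir: String, namespace: Seq[String], table: String, conf: Configuration): Unit = {
  val prefix = new Path(s"$tempDir/${(namespace :+ table).mkString(".")}")
  val fs = FileSystem.get(new URI(prefix.toString), conf)
  if (fs.exists(prefix)) {
    fs.delete(prefix, true) // recursively delete all cache candidates for this table
  }
}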

Comment on lines +45 to +47
val convertedReadSchema = StructType(readDataSchema()
.copy().map(field => field.copy(dataType = StringType)))
val convertedDataSchema = StructType(dataSchema.copy().map(x => x.copy(dataType = StringType)))
Collaborator

nitpicking: only need to do these conversions if we're in csv mode
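
A minimal sketch of that guard (unloadFormat is an assumed accessor on the parameters; readDataSchema() is the method from the snippet above):

import org.apache.spark.sql.types.{StringType, StructType}

val convertedReadSchema =
  if (unloadFormat == "csv") {
    // CSV unloads come back as text, so widen every field to StringType before parsing.
    StructType(readDataSchema().map(_.copy(dataType = StringType)))
  } else {
    readDataSchema() // parquet already carries the real types
  }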

Comment on lines +64 to +68
/**
* A name to identify this table. Implementations should provide a meaningful name, like the
* database and table name from catalog, or the location of files for this table.
*/
override def name(): String = "redshift"
Collaborator

I don't know how this name is used in Spark. Do we need to provide something more descriptive if we want users to be able to load multiple tables? We have the getTableNameOrSubquery param, which might be nice to include in this name depending on how it is used.
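
For example, something along these lines (params.getTableNameOrSubquery is the accessor mentioned above; whether params is in scope here is an assumption):

// Sketch: make the displayed name distinguish the underlying relation.
override def name(): String = s"redshift:${params.getTableNameOrSubquery}"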


val jdbcWrapper: JDBCWrapper = DefaultJDBCWrapper

private def buildUnloadStmt(
Collaborator

related to the other comment about v1 sources supporting ttl and parquet. IIRC there is a build-unload-stmt method somewhere already, and we might be able to share code between the v1 and v2 sources to have a single unload stmt builder?

Author

Yeah refactoring this would be valuable
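
For illustration, a shared builder could look roughly like this (the method shape and escaping are simplified assumptions; the UNLOAD clauses are standard Redshift syntax):

// Sketch of a single unload-statement builder that both the v1 and v2 paths could call.
def buildUnloadStmt(query: String, s3Path: String, iamRole: String, format: String): String = {
  val escaped = query.replace("'", "''") // UNLOAD embeds the query as a quoted string literal
  val formatClause =
    if (format.equalsIgnoreCase("parquet")) "FORMAT AS PARQUET"
    else "ESCAPE NULL AS '@NULL@'" // text/CSV-style unload options
  s"UNLOAD ('$escaped') TO '$s3Path' IAM_ROLE '$iamRole' $formatClause"
}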

Comment on lines 50 to 51
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
Collaborator

curious why we import here instead of at the top of the module like usual.

Author

Well, in case I'd like to move the logic somewhere else this will help a bit. Nothing really important

This avoids s3 file listings to find the last cache candidate
Also uses hadoop FS instead of the aws low-level client
@@ -179,15 +181,44 @@ private[redshift] class JDBCWrapper {
val isSigned = rsmd.isSigned(i + 1)
val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
val columnType = getCatalystType(dataType, fieldSize, fieldScale, isSigned)
val comment = comments.get(columnName)
if(!comment.isEmpty){
fields(i) = StructField(columnName, columnType, nullable, comment.get)
Collaborator

Nit: Indentation.

@@ -165,7 +166,8 @@ private[redshift] class JDBCWrapper {
// the underlying JDBC driver implementation implements PreparedStatement.getMetaData() by
// executing the query. It looks like the standard Redshift and Postgres JDBC drivers don't do
// this but we leave the LIMIT condition here as a safety-net to guard against perf regressions.
val ps = conn.prepareStatement(s"SELECT * FROM $table LIMIT 1")
val comments = resolveComments(conn, table)
val ps = conn.prepareStatement(s"SELECT * FROM $table LIMIT 0")
Collaborator
@88manpreet 88manpreet Jun 15, 2023

Is the LIMIT changed to 0 since we only care about column metadata, making it slightly more performant?

Author

Yeah definitely
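
For context, a minimal sketch of the metadata-only pattern being discussed (plain JDBC; the connection comes from elsewhere):

import java.sql.Connection

// With LIMIT 0 the statement returns no rows, but its ResultSetMetaData still
// describes every column, which is all the schema discovery needs.
def columnCount(conn: Connection, table: String): Int = {
  val ps = conn.prepareStatement(s"SELECT * FROM $table LIMIT 0")
  try ps.executeQuery().getMetaData.getColumnCount
  finally ps.close()
}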

@@ -179,15 +181,44 @@ private[redshift] class JDBCWrapper {
val isSigned = rsmd.isSigned(i + 1)
val nullable = rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
val columnType = getCatalystType(dataType, fieldSize, fieldScale, isSigned)
val comment = comments.get(columnName)
Collaborator

I believe this is added to preserve the comments?

Author

indeed

@parisni
Author

parisni commented Jun 15, 2023

Found two issues with this:

  • Spark's parallelism for reading the parquet files equals the number of files, which makes read performance after the unload poor. It would be better to just read the unload folder and skip the manifest (see the sketch after this list).
  • When the query is cancelled on the Redshift side, no error occurs and the lib returns a dataframe with the current, incomplete state of the content.
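
A minimal sketch of the first point (the unload location is a placeholder):

// Reading the unload folder directly lets Spark derive splits from file sizes,
// instead of one partition per file listed in the manifest.
val unloaded = spark.read.parquet("s3a://bucket/tempdir/table-prefix/") // placeholder unload location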

checkAnswer(
sqlContext.sql("select * from test_table"),
TestUtils.expectedData)
withUnloadFormat {
Collaborator

If I understand correctly, RedshiftReadSuite only covers the 'csv' format?
If so, are there plans to extend it to the 'parquet' format too?
If not, I believe the changes in this particular file are a no-op.
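
For reference, one way such a helper could cover both formats (a sketch only; the actual withUnloadFormat in this PR may be wired differently):

protected var unloadFormat: String = "csv"

// Run the enclosed assertions once per unload format; the suite's read helper is assumed
// to pass `unloadFormat` along as the "unloadformat" option.
def withUnloadFormat(body: => Unit): Unit = {
  Seq("csv", "parquet").foreach { format =>
    unloadFormat = format
    body
  }
}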

/**
* Create a new DataFrameReader using common options for reading from Redshift.
*/
override protected def read: DataFrameReader = {
Collaborator

Is this test file complete? Are there plans to add more tests similar to RedshiftReadSuite.scala?

/**
* The AWS SSE-KMS key to use for encryption during UNLOAD operations
* instead of AWS's default encryption
*/
def sseKmsKey: Option[String] = parameters.get("sse_kms_key")

/**
* The Int value to write for nulls when using CSV.
Collaborator

Slightly confused by this comment.
Did you intend to write "The Int value to write for ttl when using CSV."?

@smoy
Member

smoy commented Sep 1, 2023

Because sensitive material was recently introduced, I had to rewrite history using the procedure here: https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/removing-sensitive-data-from-a-repository

This creates a lot more conflicts in this pull request. If this PR is still wanted, please open a new one instead.

In addition, the AWS contribution has brought along many improvements that include some of the intended features of this original PR. Check #128

@smoy smoy closed this Sep 1, 2023