Skip to content

Commit

Permalink
Merge pull request #72 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.3.0
  • Loading branch information
civitaspo authored Jul 30, 2019
2 parents 5e88785 + a270ba8 commit 56e7d18
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 76 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
0.3.0 (2019-07-30)
==================

* [Breaking change -- `athena.ctas>`] Remove output option which has been deprecated since v0.2.2.
* [Breaking change -- `athena.ctas>`] Remove select_query option which has been deprecated since v0.2.0.
* [Note -- `athena.ctas>`] Replace com.google.common.base.Optional -> scala.Option.
* [Enhancement -- `athena.ctas>`] Add `/` as suffix if location option does not have.
* [Change -- `athena.ctas>`] Use `athena.drop_table>` operator instead of `athena.query>` operator when deleting the table.
* This change is to reduce the number of query executions.
* `athena.ctas>` operator depends on Glue privileges by this change.
* [Note -- `athena.ctas>`] Use `default` database if not database option specified.

0.2.5 (2019-07-29)
==================

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.2.5
- pro.civitaspo:digdag-operator-athena:0.3.0
athena:
auth_method: profile

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.2.5'
version = '0.3.0'

def digdagVersion = '0.9.37'
def awsSdkVersion = "1.11.587"
Expand Down
2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.2.5
- pro.civitaspo:digdag-operator-athena:0.3.0
athena:
auth_method: profile
value: 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class AthenaApasOperator(operatorName: String,
subTask.set("+diff-schema", buildDiffSchemaInternalSubTaskConfig(comparisonTableName = dummyTable))
subTask.set("+drop-dummy", buildDropTableSubTaskConfig(tableName = dummyTable))
subTask.set("+store-data-by-ctas", buildCtasSubTaskConfig(tableName = dummyTable,
outputLocation = Option(location),
location = Option(location),
format = fmt,
compression = c,
fieldDelimiter = fd,
Expand Down Expand Up @@ -199,7 +199,7 @@ class AthenaApasOperator(operatorName: String,
}

protected def buildCtasSubTaskConfig(tableName: String,
outputLocation: Option[String] = None,
location: Option[String] = None,
format: String,
compression: Option[String] = None,
fieldDelimiter: Option[String] = None,
Expand All @@ -212,7 +212,7 @@ class AthenaApasOperator(operatorName: String,
subTask.set("database", database)
subTask.set("table", tableName)
if (workGroup.isPresent) subTask.set("workgroup", workGroup.get())
outputLocation.foreach(ol => subTask.set("output", ol))
location.foreach(l => subTask.set("location", l))
subTask.set("format", format)
compression.foreach(c => subTask.set("compression", c))
fieldDelimiter.foreach(fd => subTask.set("field_delimiter", fd))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pro.civitaspo.digdag.plugin.athena.ctas

import java.nio.charset.StandardCharsets.UTF_8

import com.google.common.base.Optional
import io.digdag.client.config.{Config, ConfigException}
import io.digdag.spi.{ImmutableTaskResult, OperatorContext, TaskResult, TemplateEngine}
import io.digdag.util.DurationParam
Expand Down Expand Up @@ -66,51 +65,36 @@ class AthenaCtasOperator(operatorName: String,
}
}

protected val DEFAULT_DATABASE_NAME = "default"

protected lazy val defaultTableName: String = {
val normalizedSessionUuid: String = sessionUuid.replaceAll("-", "")
val random: String = Random.alphanumeric.take(5).mkString
s"digdag_athena_ctas_${normalizedSessionUuid}_$random"
}

@deprecated(message = "Use athena.ctas> instead.", since = "0.2.0")
protected val _selectQueryOrFile: Optional[String] = params.getOptional("select_query", classOf[String])
protected val selectQueryOrFile: String = {
val _command = params.getOptional("_command", classOf[String])
if (_selectQueryOrFile.isPresent && _command.isPresent) throw new ConfigException("Use athena.ctas> instead of 'select_query'")
else if (_command.isPresent) _command.get()
else {
logger.warn("'select_query' is deprecated. Use athena.ctas> instead.")
_selectQueryOrFile.get()
}
}
protected val database: Optional[String] = params.getOptional("database", classOf[String])
protected val selectQueryOrFile: String = params.get("_command", classOf[String])
protected val database: String = params.getOptional("database", classOf[String]).or(DEFAULT_DATABASE_NAME)
protected val table: String = params.get("table", classOf[String], defaultTableName)
protected val workGroup: Optional[String] = params.getOptional("workgroup", classOf[String])
@deprecated(message = "Use location option instead", since = "0.2.2")
protected val output: Optional[String] = params.getOptional("output", classOf[String])
protected val location: Optional[String] = {
val l = params.getOptional("location", classOf[String])
if (output.isPresent && l.isPresent) {
logger.warn(s"Use the value of location option: ${l.get()} although the value of output option (${output.get()}) is specified.")
l
protected val workGroup: Option[String] = Option(params.getOptional("workgroup", classOf[String]).orNull())
protected val location: Option[String] = {
Option(params.getOptional("location", classOf[String]).orNull()) match {
case Some(l) if !l.endsWith("/") => Option(s"$l/")
case option: Option[String] => option
}
else if (output.isPresent) {
logger.warn("output option is deprecated. Please use location option instead.")
output
}
else l
}
protected val format: String = params.get("format", classOf[String], "parquet")
protected val compression: String = params.get("compression", classOf[String], "snappy")
protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String])
protected val fieldDelimiter: Option[String] = Option(params.getOptional("field_delimiter", classOf[String]).orNull())
protected val partitionedBy: Seq[String] = params.getListOrEmpty("partitioned_by", classOf[String]).asScala.toSeq
protected val bucketedBy: Seq[String] = params.getListOrEmpty("bucketed_by", classOf[String]).asScala.toSeq
protected val bucketCount: Optional[Int] = params.getOptional("bucket_count", classOf[Int])
protected val bucketCount: Option[Int] = Option(params.getOptional("bucket_count", classOf[Int]).orNull())
protected val additionalProperties: Map[String, String] = params.getMapOrEmpty("additional_properties", classOf[String], classOf[String]).asScala.toMap
protected val tableMode: TableMode = TableMode(params.get("table_mode", classOf[String], "default"))
protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite"))
protected val tokenPrefix: String = params.get("token_prefix", classOf[String], "digdag-athena-ctas")
protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m"))
protected val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())

protected lazy val selectQuery: String = {
val t: Try[String] =
Expand Down Expand Up @@ -149,53 +133,57 @@ class AthenaCtasOperator(operatorName: String,
override def runTask(): TaskResult =
{
saveMode match {
case SaveMode.ErrorIfExists if location.isPresent && hasObjects(location.get) =>
throw new IllegalStateException(s"${location.get} already exists")
case SaveMode.Ignore if location.isPresent && hasObjects(location.get) =>
logger.info(s"${location.get} already exists, so ignore this session.")
return TaskResult.empty(request)
case SaveMode.Overwrite if location.isPresent =>
logger.info(s"Overwrite ${location.get}")
rmObjects(location.get)
case _ => // do nothing
case SaveMode.ErrorIfExists =>
if (aws.glue.table.exists(catalogId, database, table)) {
throw new IllegalStateException(s"'$database.$table' already exists")
}
if (location.exists(aws.s3.hasObjects)) {
throw new IllegalStateException(s"${location.get} already exists")
}

case SaveMode.Ignore =>
if (aws.glue.table.exists(catalogId, database, table)) {
logger.info(s"'$database.$table' already exists, so ignore this session.")
return TaskResult.empty(request)
}
if (location.exists(aws.s3.hasObjects)) {
logger.info(s"${location.get} already exists, so ignore this session.")
return TaskResult.empty(request)
}

case _ => // do nothing
}

val subTask: Config = cf.create()
if (saveMode.equals(SaveMode.Overwrite)) subTask.setNested("+drop-before-ctas", buildQuerySubTaskConfig(generateDropTableQuery()))
subTask.setNested("+ctas", buildQuerySubTaskConfig(generateCtasQuery()))
if (tableMode.equals(TableMode.DataOnly)) subTask.setNested("+drop-after-ctas", buildQuerySubTaskConfig(generateDropTableQuery()))
if (saveMode.equals(SaveMode.Overwrite)) {
subTask.setNested("+drop-before-ctas", buildDropTableSubTaskConfig(with_location = true))
}
subTask.setNested("+ctas", buildCtasQuerySubTaskConfig())
if (tableMode.equals(TableMode.DataOnly)) {
subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig(with_location = false))
}

val builder: ImmutableTaskResult.Builder = TaskResult.defaultBuilder(cf)
builder.subtaskConfig(subTask)
builder.build()
}

protected def hasObjects(location: String): Boolean =
{
aws.s3.ls(location).nonEmpty
}

protected def rmObjects(location: String): Unit =
{
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
}

protected def generateCtasQuery(): String =
{
val propsBuilder = Map.newBuilder[String, String]
if (location.isPresent) propsBuilder += ("external_location" -> s"'${location.get}'")
location.foreach(l => propsBuilder += ("external_location" -> s"'$l'"))
propsBuilder += ("format" -> s"'$format'")
format match {
case "parquet" => propsBuilder += ("parquet_compression" -> s"'$compression'")
case "orc" => propsBuilder += ("orc_compression" -> s"'$compression'")
case _ => logger.info(s"compression is not supported for format: $format.")
}
if (fieldDelimiter.isPresent) propsBuilder += ("field_delimiter" -> s"'${fieldDelimiter.get}'")
fieldDelimiter.foreach(fd => propsBuilder += ("field_delimiter" -> s"'$fd'"))
if (partitionedBy.nonEmpty) propsBuilder += ("partitioned_by" -> s"ARRAY[${partitionedBy.map(s => s"'$s'").mkString(",")}]")
if (bucketedBy.nonEmpty) {
propsBuilder += ("bucketed_by" -> s"ARRAY[${bucketedBy.map(s => s"'$s'").mkString(",")}]")
if (!bucketCount.isPresent) throw new ConfigException(s"`bucket_count` must be set if `bucketed_by` is set.")
propsBuilder += ("bucket_count" -> s"${bucketCount.get}")
if (bucketCount.isEmpty) throw new ConfigException(s"`bucket_count` must be set if `bucketed_by` is set.")
bucketCount.foreach(bc => propsBuilder += ("bucket_count" -> s"$bc"))
}
if (additionalProperties.nonEmpty) propsBuilder ++= additionalProperties

Expand All @@ -220,33 +208,44 @@ class AthenaCtasOperator(operatorName: String,
| """.stripMargin
}

protected def generateDropTableQuery(): String =
protected def putCommonSettingToSubTask(subTask: Config): Unit =
{
s""" -- GENERATED BY digdag athena.ctas> operator
| DROP TABLE IF EXISTS $table
| """.stripMargin
subTask.set("auth_method", aws.conf.authMethod)
subTask.set("profile_name", aws.conf.profileName)
if (aws.conf.profileFile.isPresent) subTask.set("profile_file", aws.conf.profileFile.get())
subTask.set("use_http_proxy", aws.conf.useHttpProxy)
subTask.set("region", aws.region)
if (aws.conf.endpoint.isPresent) subTask.set("endpoint", aws.conf.endpoint.get())
}

protected def buildQuerySubTaskConfig(query: String): Config =
protected def buildCtasQuerySubTaskConfig(): Config =
{
logger.info(s"Will execute query in athena.query>: $query")

val subTask: Config = cf.create()

subTask.set("_type", "athena.query")
subTask.set("_command", query)
subTask.set("_command", generateCtasQuery())
subTask.set("token_prefix", tokenPrefix)
if (database.isPresent) subTask.set("database", database)
if (workGroup.isPresent) subTask.set("workgroup", workGroup)
subTask.set("database", database)
workGroup.foreach(wg => subTask.set("workgroup", wg))
subTask.set("timeout", timeout.toString)
subTask.set("preview", false)

subTask.set("auth_method", aws.conf.authMethod)
subTask.set("profile_name", aws.conf.profileName)
if (aws.conf.profileFile.isPresent) subTask.set("profile_file", aws.conf.profileFile.get())
subTask.set("use_http_proxy", aws.conf.useHttpProxy)
subTask.set("region", aws.region)
if (aws.conf.endpoint.isPresent) subTask.set("endpoint", aws.conf.endpoint.get())
putCommonSettingToSubTask(subTask)

subTask
}

protected def buildDropTableSubTaskConfig(with_location: Boolean): Config =
{
val subTask: Config = cf.create()

subTask.set("_type", "athena.drop_table")
subTask.set("database", database)
subTask.set("table", table)
subTask.set("with_location", with_location)
catalogId.foreach(cid => subTask.set("catalog_id", cid))

putCommonSettingToSubTask(subTask)

subTask
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class AthenaDropTableOperator(operatorName: String,
{
if (!aws.glue.table.exists(catalogId, database, table)) {
if (!ignoreIfNotExist) throw new IllegalStateException(s"The table '$database.$table' does not exist.")
logger.info(s"Do nothing because the table '$database.$table' does not exist.")
return TaskResult.empty(cf)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class AthenaQueryOperator(operatorName: String,

override def runTask(): TaskResult =
{
logger.info(s"Run the query: \n$query")
val execution: QueryExecution = aws.athena.runQuery(query = query,
database = Option(database.orNull),
workGroup = Option(workGroup.orNull()),
Expand Down

0 comments on commit 56e7d18

Please sign in to comment.