From 4d76e7840435ebfe66cacddb691cbc7f4c5750bb Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 11:16:12 +0900 Subject: [PATCH 01/12] Remove output option (which is deprecated) from athena.ctas> operator --- .../plugin/athena/apas/AthenaApasOperator.scala | 6 +++--- .../plugin/athena/ctas/AthenaCtasOperator.scala | 15 +-------------- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala index 6607952..23a5f35 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/apas/AthenaApasOperator.scala @@ -137,7 +137,7 @@ class AthenaApasOperator(operatorName: String, subTask.set("+diff-schema", buildDiffSchemaInternalSubTaskConfig(comparisonTableName = dummyTable)) subTask.set("+drop-dummy", buildDropTableSubTaskConfig(tableName = dummyTable)) subTask.set("+store-data-by-ctas", buildCtasSubTaskConfig(tableName = dummyTable, - outputLocation = Option(location), + location = Option(location), format = fmt, compression = c, fieldDelimiter = fd, @@ -199,7 +199,7 @@ class AthenaApasOperator(operatorName: String, } protected def buildCtasSubTaskConfig(tableName: String, - outputLocation: Option[String] = None, + location: Option[String] = None, format: String, compression: Option[String] = None, fieldDelimiter: Option[String] = None, @@ -212,7 +212,7 @@ class AthenaApasOperator(operatorName: String, subTask.set("database", database) subTask.set("table", tableName) if (workGroup.isPresent) subTask.set("workgroup", workGroup.get()) - outputLocation.foreach(ol => subTask.set("output", ol)) + location.foreach(l => subTask.set("location", l)) subTask.set("format", format) compression.foreach(c => subTask.set("compression", c)) fieldDelimiter.foreach(fd => subTask.set("field_delimiter", fd)) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index b6634f7..4bf032b 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -86,20 +86,7 @@ class AthenaCtasOperator(operatorName: String, protected val database: Optional[String] = params.getOptional("database", classOf[String]) protected val table: String = params.get("table", classOf[String], defaultTableName) protected val workGroup: Optional[String] = params.getOptional("workgroup", classOf[String]) - @deprecated(message = "Use location option instead", since = "0.2.2") - protected val output: Optional[String] = params.getOptional("output", classOf[String]) - protected val location: Optional[String] = { - val l = params.getOptional("location", classOf[String]) - if (output.isPresent && l.isPresent) { - logger.warn(s"Use the value of location option: ${l.get()} although the value of output option (${output.get()}) is specified.") - l - } - else if (output.isPresent) { - logger.warn("output option is deprecated. Please use location option instead.") - output - } - else l - } + protected val location: Optional[String] = params.getOptional("location", classOf[String]) protected val format: String = params.get("format", classOf[String], "parquet") protected val compression: String = params.get("compression", classOf[String], "snappy") protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String]) From 4caa8f87cb4cda7d3b303b598b7c593b2ee5ccd2 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 11:25:40 +0900 Subject: [PATCH 02/12] Remove select_query option (which is deprecated) from athena.ctas> operator --- .../plugin/athena/ctas/AthenaCtasOperator.scala | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index 4bf032b..c13e376 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -72,17 +72,7 @@ class AthenaCtasOperator(operatorName: String, s"digdag_athena_ctas_${normalizedSessionUuid}_$random" } - @deprecated(message = "Use athena.ctas> instead.", since = "0.2.0") - protected val _selectQueryOrFile: Optional[String] = params.getOptional("select_query", classOf[String]) - protected val selectQueryOrFile: String = { - val _command = params.getOptional("_command", classOf[String]) - if (_selectQueryOrFile.isPresent && _command.isPresent) throw new ConfigException("Use athena.ctas> instead of 'select_query'") - else if (_command.isPresent) _command.get() - else { - logger.warn("'select_query' is deprecated. Use athena.ctas> instead.") - _selectQueryOrFile.get() - } - } + protected val selectQueryOrFile: String = params.get("_command", classOf[String]) protected val database: Optional[String] = params.getOptional("database", classOf[String]) protected val table: String = params.get("table", classOf[String], defaultTableName) protected val workGroup: Optional[String] = params.getOptional("workgroup", classOf[String]) From 873abf290e881c482283bc9a2b585885addbe0bb Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 11:35:01 +0900 Subject: [PATCH 03/12] Replace com.google.common.base.Optional -> scala.Option in athena.ctas> --- .../athena/ctas/AthenaCtasOperator.scala | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index c13e376..8e8ee82 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -3,7 +3,6 @@ package pro.civitaspo.digdag.plugin.athena.ctas import java.nio.charset.StandardCharsets.UTF_8 -import com.google.common.base.Optional import io.digdag.client.config.{Config, ConfigException} import io.digdag.spi.{ImmutableTaskResult, OperatorContext, TaskResult, TemplateEngine} import io.digdag.util.DurationParam @@ -73,16 +72,16 @@ class AthenaCtasOperator(operatorName: String, } protected val selectQueryOrFile: String = params.get("_command", classOf[String]) - protected val database: Optional[String] = params.getOptional("database", classOf[String]) + protected val database: Option[String] = Option(params.getOptional("database", classOf[String]).orNull()) protected val table: String = params.get("table", classOf[String], defaultTableName) - protected val workGroup: Optional[String] = params.getOptional("workgroup", classOf[String]) - protected val location: Optional[String] = params.getOptional("location", classOf[String]) + protected val workGroup: Option[String] = Option(params.getOptional("workgroup", classOf[String]).orNull()) + protected val location: Option[String] = Option(params.getOptional("location", classOf[String]).orNull()) protected val format: String = params.get("format", classOf[String], "parquet") protected val compression: String = params.get("compression", classOf[String], "snappy") - protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String]) + protected val fieldDelimiter: Option[String] = Option(params.getOptional("field_delimiter", classOf[String]).orNull()) protected val partitionedBy: Seq[String] = params.getListOrEmpty("partitioned_by", classOf[String]).asScala.toSeq protected val bucketedBy: Seq[String] = params.getListOrEmpty("bucketed_by", classOf[String]).asScala.toSeq - protected val bucketCount: Optional[Int] = params.getOptional("bucket_count", classOf[Int]) + protected val bucketCount: Option[Int] = Option(params.getOptional("bucket_count", classOf[Int]).orNull()) protected val additionalProperties: Map[String, String] = params.getMapOrEmpty("additional_properties", classOf[String], classOf[String]).asScala.toMap protected val tableMode: TableMode = TableMode(params.get("table_mode", classOf[String], "default")) protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite")) @@ -126,12 +125,12 @@ class AthenaCtasOperator(operatorName: String, override def runTask(): TaskResult = { saveMode match { - case SaveMode.ErrorIfExists if location.isPresent && hasObjects(location.get) => + case SaveMode.ErrorIfExists if location.isDefined && hasObjects(location.get) => throw new IllegalStateException(s"${location.get} already exists") - case SaveMode.Ignore if location.isPresent && hasObjects(location.get) => + case SaveMode.Ignore if location.isDefined && hasObjects(location.get) => logger.info(s"${location.get} already exists, so ignore this session.") return TaskResult.empty(request) - case SaveMode.Overwrite if location.isPresent => + case SaveMode.Overwrite if location.isDefined => logger.info(s"Overwrite ${location.get}") rmObjects(location.get) case _ => // do nothing @@ -160,19 +159,19 @@ class AthenaCtasOperator(operatorName: String, protected def generateCtasQuery(): String = { val propsBuilder = Map.newBuilder[String, String] - if (location.isPresent) propsBuilder += ("external_location" -> s"'${location.get}'") + location.foreach(l => propsBuilder += ("external_location" -> s"'$l'")) propsBuilder += ("format" -> s"'$format'") format match { case "parquet" => propsBuilder += ("parquet_compression" -> s"'$compression'") case "orc" => propsBuilder += ("orc_compression" -> s"'$compression'") case _ => logger.info(s"compression is not supported for format: $format.") } - if (fieldDelimiter.isPresent) propsBuilder += ("field_delimiter" -> s"'${fieldDelimiter.get}'") + fieldDelimiter.foreach(fd => propsBuilder += ("field_delimiter" -> s"'$fd'")) if (partitionedBy.nonEmpty) propsBuilder += ("partitioned_by" -> s"ARRAY[${partitionedBy.map(s => s"'$s'").mkString(",")}]") if (bucketedBy.nonEmpty) { propsBuilder += ("bucketed_by" -> s"ARRAY[${bucketedBy.map(s => s"'$s'").mkString(",")}]") - if (!bucketCount.isPresent) throw new ConfigException(s"`bucket_count` must be set if `bucketed_by` is set.") - propsBuilder += ("bucket_count" -> s"${bucketCount.get}") + if (bucketCount.isEmpty) throw new ConfigException(s"`bucket_count` must be set if `bucketed_by` is set.") + bucketCount.foreach(bc => propsBuilder += ("bucket_count" -> s"$bc")) } if (additionalProperties.nonEmpty) propsBuilder ++= additionalProperties @@ -213,8 +212,8 @@ class AthenaCtasOperator(operatorName: String, subTask.set("_type", "athena.query") subTask.set("_command", query) subTask.set("token_prefix", tokenPrefix) - if (database.isPresent) subTask.set("database", database) - if (workGroup.isPresent) subTask.set("workgroup", workGroup) + database.foreach(db => subTask.set("database", db)) + workGroup.foreach(wg => subTask.set("workgroup", wg)) subTask.set("timeout", timeout.toString) subTask.set("preview", false) From f02e51464df7b8e40364d4db41a47241ec7280b1 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 11:41:56 +0900 Subject: [PATCH 04/12] Add '/' suffix if location does not have '/' as suffix in athena.ctas> operator --- .../digdag/plugin/athena/ctas/AthenaCtasOperator.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index 8e8ee82..8c498f9 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -75,7 +75,12 @@ class AthenaCtasOperator(operatorName: String, protected val database: Option[String] = Option(params.getOptional("database", classOf[String]).orNull()) protected val table: String = params.get("table", classOf[String], defaultTableName) protected val workGroup: Option[String] = Option(params.getOptional("workgroup", classOf[String]).orNull()) - protected val location: Option[String] = Option(params.getOptional("location", classOf[String]).orNull()) + protected val location: Option[String] = { + Option(params.getOptional("location", classOf[String]).orNull()) match { + case Some(l) if !l.endsWith("/") => Option(s"$l/") + case option: Option[String] => option + } + } protected val format: String = params.get("format", classOf[String], "parquet") protected val compression: String = params.get("compression", classOf[String], "snappy") protected val fieldDelimiter: Option[String] = Option(params.getOptional("field_delimiter", classOf[String]).orNull()) From 375c1f58c5652b0e08f995db0bf23cd9b25798e7 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 11:54:16 +0900 Subject: [PATCH 05/12] Use scala.Option natively and use aws.s3 package directly. --- .../athena/ctas/AthenaCtasOperator.scala | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index 8c498f9..aa96dfd 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -130,15 +130,20 @@ class AthenaCtasOperator(operatorName: String, override def runTask(): TaskResult = { saveMode match { - case SaveMode.ErrorIfExists if location.isDefined && hasObjects(location.get) => + case SaveMode.ErrorIfExists if location.exists(aws.s3.hasObjects) => throw new IllegalStateException(s"${location.get} already exists") - case SaveMode.Ignore if location.isDefined && hasObjects(location.get) => + + case SaveMode.Ignore if location.exists(aws.s3.hasObjects) => logger.info(s"${location.get} already exists, so ignore this session.") return TaskResult.empty(request) - case SaveMode.Overwrite if location.isDefined => - logger.info(s"Overwrite ${location.get}") - rmObjects(location.get) - case _ => // do nothing + + case SaveMode.Overwrite => + location.foreach { l => + logger.info(s"Overwrite $l") + aws.s3.rm_r(l).foreach(uri => logger.info(s"Deleted: ${uri.toString}")) + } + + case _ => // do nothing } val subTask: Config = cf.create() @@ -151,16 +156,6 @@ class AthenaCtasOperator(operatorName: String, builder.build() } - protected def hasObjects(location: String): Boolean = - { - aws.s3.ls(location).nonEmpty - } - - protected def rmObjects(location: String): Unit = - { - aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}")) - } - protected def generateCtasQuery(): String = { val propsBuilder = Map.newBuilder[String, String] From d6b772709e0ad741ad64aa687897c95e4d5c5588 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 12:11:04 +0900 Subject: [PATCH 06/12] Use athena.drop_table> operator instead of athena.query> operator --- .../athena/ctas/AthenaCtasOperator.scala | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index aa96dfd..3c2a040 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -65,6 +65,8 @@ class AthenaCtasOperator(operatorName: String, } } + protected val DEFAULT_DATABASE_NAME = "default" + protected lazy val defaultTableName: String = { val normalizedSessionUuid: String = sessionUuid.replaceAll("-", "") val random: String = Random.alphanumeric.take(5).mkString @@ -92,6 +94,7 @@ class AthenaCtasOperator(operatorName: String, protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite")) protected val tokenPrefix: String = params.get("token_prefix", classOf[String], "digdag-athena-ctas") protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) + protected val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull()) protected lazy val selectQuery: String = { val t: Try[String] = @@ -147,9 +150,9 @@ class AthenaCtasOperator(operatorName: String, } val subTask: Config = cf.create() - if (saveMode.equals(SaveMode.Overwrite)) subTask.setNested("+drop-before-ctas", buildQuerySubTaskConfig(generateDropTableQuery())) - subTask.setNested("+ctas", buildQuerySubTaskConfig(generateCtasQuery())) - if (tableMode.equals(TableMode.DataOnly)) subTask.setNested("+drop-after-ctas", buildQuerySubTaskConfig(generateDropTableQuery())) + if (saveMode.equals(SaveMode.Overwrite)) subTask.setNested("+drop-before-ctas", buildDropTableSubTaskConfig()) + subTask.setNested("+ctas", buildCtasQuerySubTaskConfig()) + if (tableMode.equals(TableMode.DataOnly)) subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig()) val builder: ImmutableTaskResult.Builder = TaskResult.defaultBuilder(cf) builder.subtaskConfig(subTask) @@ -196,33 +199,44 @@ class AthenaCtasOperator(operatorName: String, | """.stripMargin } - protected def generateDropTableQuery(): String = + protected def putCommonSettingToSubTask(subTask: Config): Unit = { - s""" -- GENERATED BY digdag athena.ctas> operator - | DROP TABLE IF EXISTS $table - | """.stripMargin + subTask.set("auth_method", aws.conf.authMethod) + subTask.set("profile_name", aws.conf.profileName) + if (aws.conf.profileFile.isPresent) subTask.set("profile_file", aws.conf.profileFile.get()) + subTask.set("use_http_proxy", aws.conf.useHttpProxy) + subTask.set("region", aws.region) + if (aws.conf.endpoint.isPresent) subTask.set("endpoint", aws.conf.endpoint.get()) } - protected def buildQuerySubTaskConfig(query: String): Config = + protected def buildCtasQuerySubTaskConfig(): Config = { - logger.info(s"Will execute query in athena.query>: $query") - val subTask: Config = cf.create() subTask.set("_type", "athena.query") - subTask.set("_command", query) + subTask.set("_command", generateCtasQuery()) subTask.set("token_prefix", tokenPrefix) database.foreach(db => subTask.set("database", db)) workGroup.foreach(wg => subTask.set("workgroup", wg)) subTask.set("timeout", timeout.toString) subTask.set("preview", false) - subTask.set("auth_method", aws.conf.authMethod) - subTask.set("profile_name", aws.conf.profileName) - if (aws.conf.profileFile.isPresent) subTask.set("profile_file", aws.conf.profileFile.get()) - subTask.set("use_http_proxy", aws.conf.useHttpProxy) - subTask.set("region", aws.region) - if (aws.conf.endpoint.isPresent) subTask.set("endpoint", aws.conf.endpoint.get()) + putCommonSettingToSubTask(subTask) + + subTask + } + + protected def buildDropTableSubTaskConfig(): Config = + { + val subTask: Config = cf.create() + + subTask.set("_type", "athena.drop_table") + subTask.set("database", database.getOrElse(DEFAULT_DATABASE_NAME)) + subTask.set("table", table) + subTask.set("with_location", false) + catalogId.foreach(cid => subTask.set("catalog_id", cid)) + + putCommonSettingToSubTask(subTask) subTask } From e5a85b699407adbade86f8d3acd93db4e3d69ef2 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 12:11:49 +0900 Subject: [PATCH 07/12] Output info log if the table does not exist in athena.drop_table> operator. --- .../plugin/athena/drop_table/AthenaDropTableOperator.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/drop_table/AthenaDropTableOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/drop_table/AthenaDropTableOperator.scala index 4176f5a..de8aea7 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/drop_table/AthenaDropTableOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/drop_table/AthenaDropTableOperator.scala @@ -23,6 +23,7 @@ class AthenaDropTableOperator(operatorName: String, { if (!aws.glue.table.exists(catalogId, database, table)) { if (!ignoreIfNotExist) throw new IllegalStateException(s"The table '$database.$table' does not exist.") + logger.info(s"Do nothing because the table '$database.$table' does not exist.") return TaskResult.empty(cf) } From 4ddad0625df69e239d46f4c97bf5f7f363bb74d3 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 12:22:31 +0900 Subject: [PATCH 08/12] Use DEFAULT_DATABASE_NAME always if database option is not specifed in athena.ctas> operator --- .../digdag/plugin/athena/ctas/AthenaCtasOperator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index 3c2a040..a1a43d5 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -74,7 +74,7 @@ class AthenaCtasOperator(operatorName: String, } protected val selectQueryOrFile: String = params.get("_command", classOf[String]) - protected val database: Option[String] = Option(params.getOptional("database", classOf[String]).orNull()) + protected val database: String = params.getOptional("database", classOf[String]).or(DEFAULT_DATABASE_NAME) protected val table: String = params.get("table", classOf[String], defaultTableName) protected val workGroup: Option[String] = Option(params.getOptional("workgroup", classOf[String]).orNull()) protected val location: Option[String] = { @@ -216,7 +216,7 @@ class AthenaCtasOperator(operatorName: String, subTask.set("_type", "athena.query") subTask.set("_command", generateCtasQuery()) subTask.set("token_prefix", tokenPrefix) - database.foreach(db => subTask.set("database", db)) + subTask.set("database", database) workGroup.foreach(wg => subTask.set("workgroup", wg)) subTask.set("timeout", timeout.toString) subTask.set("preview", false) @@ -231,7 +231,7 @@ class AthenaCtasOperator(operatorName: String, val subTask: Config = cf.create() subTask.set("_type", "athena.drop_table") - subTask.set("database", database.getOrElse(DEFAULT_DATABASE_NAME)) + subTask.set("database", database) subTask.set("table", table) subTask.set("with_location", false) catalogId.foreach(cid => subTask.set("catalog_id", cid)) From 625b5b2ab0804b4cf3eb3df0e200f12f1c3ba19a Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 12:28:25 +0900 Subject: [PATCH 09/12] Check the table existence without running query in athena.ctas> operator --- .../athena/ctas/AthenaCtasOperator.scala | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index a1a43d5..47b15cb 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -133,13 +133,22 @@ class AthenaCtasOperator(operatorName: String, override def runTask(): TaskResult = { saveMode match { - case SaveMode.ErrorIfExists if location.exists(aws.s3.hasObjects) => - throw new IllegalStateException(s"${location.get} already exists") - - case SaveMode.Ignore if location.exists(aws.s3.hasObjects) => - logger.info(s"${location.get} already exists, so ignore this session.") - return TaskResult.empty(request) + case SaveMode.ErrorIfExists => + if (aws.glue.table.exists(catalogId, database, table)) { + throw new IllegalStateException(s"'$database.$table' already exists") + } + if (location.exists(aws.s3.hasObjects)) { + throw new IllegalStateException(s"${location.get} already exists") + } + case SaveMode.Ignore => + if (aws.glue.table.exists(catalogId, database, table)) { + logger.info(s"'$database.$table' already exists, so ignore this session.") + return TaskResult.empty(request) + } + if (location.exists(aws.s3.hasObjects)) { + logger.info(s"${location.get} already exists, so ignore this session.") + return TaskResult.empty(request) case SaveMode.Overwrite => location.foreach { l => logger.info(s"Overwrite $l") From 58e5a9e246af349377909e711f4caf1b710b8595 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 12:29:14 +0900 Subject: [PATCH 10/12] Use athena.drop_table> operator completely when athena.ctas> save_mode option is overwrite --- .../plugin/athena/ctas/AthenaCtasOperator.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala index 47b15cb..ad923b8 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala @@ -149,19 +149,19 @@ class AthenaCtasOperator(operatorName: String, if (location.exists(aws.s3.hasObjects)) { logger.info(s"${location.get} already exists, so ignore this session.") return TaskResult.empty(request) - case SaveMode.Overwrite => - location.foreach { l => - logger.info(s"Overwrite $l") - aws.s3.rm_r(l).foreach(uri => logger.info(s"Deleted: ${uri.toString}")) } case _ => // do nothing } val subTask: Config = cf.create() - if (saveMode.equals(SaveMode.Overwrite)) subTask.setNested("+drop-before-ctas", buildDropTableSubTaskConfig()) + if (saveMode.equals(SaveMode.Overwrite)) { + subTask.setNested("+drop-before-ctas", buildDropTableSubTaskConfig(with_location = true)) + } subTask.setNested("+ctas", buildCtasQuerySubTaskConfig()) - if (tableMode.equals(TableMode.DataOnly)) subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig()) + if (tableMode.equals(TableMode.DataOnly)) { + subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig(with_location = false)) + } val builder: ImmutableTaskResult.Builder = TaskResult.defaultBuilder(cf) builder.subtaskConfig(subTask) @@ -235,14 +235,14 @@ class AthenaCtasOperator(operatorName: String, subTask } - protected def buildDropTableSubTaskConfig(): Config = + protected def buildDropTableSubTaskConfig(with_location: Boolean): Config = { val subTask: Config = cf.create() subTask.set("_type", "athena.drop_table") subTask.set("database", database) subTask.set("table", table) - subTask.set("with_location", false) + subTask.set("with_location", with_location) catalogId.foreach(cid => subTask.set("catalog_id", cid)) putCommonSettingToSubTask(subTask) From 20e89c071e1620f63216e40dcedd73da35f7be5b Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 12:46:50 +0900 Subject: [PATCH 11/12] Output query log in athena.query> operator --- .../digdag/plugin/athena/query/AthenaQueryOperator.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/AthenaQueryOperator.scala index 036cd80..cfbb6db 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/AthenaQueryOperator.scala @@ -105,6 +105,7 @@ class AthenaQueryOperator(operatorName: String, override def runTask(): TaskResult = { + logger.info(s"Run the query: \n$query") val execution: QueryExecution = aws.athena.runQuery(query = query, database = Option(database.orNull), workGroup = Option(workGroup.orNull()), From d02ea653b24fa0c9b515ace1949e6600fef64694 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 30 Jul 2019 14:58:13 +0900 Subject: [PATCH 12/12] Ship v0.3.0 --- CHANGELOG.md | 12 ++++++++++++ README.md | 2 +- build.gradle | 2 +- example/example.dig | 2 +- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a011d2..612cc50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +0.3.0 (2019-07-30) +================== + +* [Breaking change -- `athena.ctas>`] Remove output option which has been deprecated since v0.2.2. +* [Breaking change -- `athena.ctas>`] Remove select_query option which has been deprecated since v0.2.0. +* [Note -- `athena.ctas>`] Replace com.google.common.base.Optional -> scala.Option. +* [Enhancement -- `athena.ctas>`] Add `/` as suffix if location option does not have. +* [Change -- `athena.ctas>`] Use `athena.drop_table>` operator instead of `athena.query>` operator when deleting the table. + * This change is to reduce the number of query executions. + * `athena.ctas>` operator depends on Glue privileges by this change. +* [Note -- `athena.ctas>`] Use `default` database if not database option specified. + 0.2.5 (2019-07-29) ================== diff --git a/README.md b/README.md index 05aaafc..f1486dc 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.2.5 + - pro.civitaspo:digdag-operator-athena:0.3.0 athena: auth_method: profile diff --git a/build.gradle b/build.gradle index e202561..6c3f647 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.2.5' +version = '0.3.0' def digdagVersion = '0.9.37' def awsSdkVersion = "1.11.587" diff --git a/example/example.dig b/example/example.dig index f9bb091..c1e2645 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.2.5 + - pro.civitaspo:digdag-operator-athena:0.3.0 athena: auth_method: profile value: 5