From 216b6e0d14c8ce62a096334d878d52c8fb06c31c Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 15 Oct 2018 09:07:59 +0900 Subject: [PATCH 01/37] Remove keep_metadata and save_mode option from athena.query operator --- README.md | 10 -- .../digdag/plugin/athena/AthenaPlugin.scala | 10 +- .../athena/operator/AthenaQueryOperator.scala | 108 +++--------------- .../AthenaRemoveMetadataOperator.scala | 20 ---- 4 files changed, 18 insertions(+), 130 deletions(-) delete mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala diff --git a/README.md b/README.md index 3a2bd91..5e7fd5f 100644 --- a/README.md +++ b/README.md @@ -84,18 +84,8 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **token_prefix**: Prefix for `ClientRequestToken` that a unique case-sensitive string used to ensure the request to create the query is idempotent (executes only once). On this plugin, the token is composed like `${token_prefix}-${session_uuid}-${hash value of query}`. (string, default: `"digdag-athena"`) - **database**: The name of the database. (string, optional) - **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required) -- **keep_metadata**: Indicate whether to keep the metadata after executing the query. (boolean, default: `false`) - - **NOTE**: If **keep_metadata** is false, `athena.preview>` operator cannot be used except in this time, because athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires metadata. - - **NOTE**: [Athena supports CTAS](https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/), so digdag-operator-athena will support it as `athena.ctas>` operator. 
After that, 'keep_metadata' option will be removed and the default behaviour will become the same as `keep_metadata: true` (the current default behaviour is the same as `keep_metadata: false`) because this option was added for that the metadata file is obstructive when using the output csv as another table. -- **save_mode**: Specify the expected behavior of saving the query results. Available values are `"append"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`) - - `"append"`: When saving the query results, even if other CSVs already exist, the query results are expected to be saved as another CSV. - - `"error_if_exists"`: When saving the query results, if other CSVs already exists, an exception is expected to be thrown. - - `"ignore"`: When saving the query results, if other CSVs already exists, the save operation is expected to not save the query results and to not change the existing data. - - `"overwrite"`: When saving the query results, if other CSVs already exist, existing data is expected to be overwritten by the query results. This operation is not atomic. - - **NOTE**: [Athena supports CTAS](https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/), so digdag-operator-athena will support it as `athena.ctas>` operator. After that, 'save_mode' option will be removed and the behaviour will become the same as `save_mode: append` (the current default behaviour is the same as `save_mode: overwrite`) because this option was added for that lots of duplicated output csv files which are created by other executions are sometimes obstructive when using the output csv as another table. - **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`) - **preview**: Call `athena.preview>` operator after run `athena.query>`. 
(boolean, default: `true`) - - **NOTE**: If **keep_metadata** is false, `athena.preview>` operator cannot be used except in this time, because athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires metadata. ### Output Parameters diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala index fd74e82..68044fb 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala @@ -1,12 +1,12 @@ package pro.civitaspo.digdag.plugin.athena -import java.util.{Arrays => JArrays, List => JList} import java.lang.reflect.Constructor +import java.util.{Arrays => JArrays, List => JList} import io.digdag.client.config.Config import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} import javax.inject.Inject -import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator, AthenaRemoveMetadataOperator} +import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator} object AthenaPlugin { @@ -16,11 +16,7 @@ object AthenaPlugin { @Inject protected var templateEngine: TemplateEngine = null override def get(): JList[OperatorFactory] = { - JArrays.asList( - operatorFactory("athena.query", classOf[AthenaQueryOperator]), - operatorFactory("athena.preview", classOf[AthenaPreviewOperator]), - operatorFactory("athena.remove_metadata", classOf[AthenaRemoveMetadataOperator]) - ) + JArrays.asList(operatorFactory("athena.query", classOf[AthenaQueryOperator]), operatorFactory("athena.preview", classOf[AthenaPreviewOperator])) } private def operatorFactory[T <: AbstractAthenaOperator](operatorName: String, klass: Class[T]): OperatorFactory = { diff --git 
a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index f1d1ff5..09e3d9c 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -13,15 +13,13 @@ import com.amazonaws.services.athena.model.{ } import com.amazonaws.services.athena.model.QueryExecutionState.{CANCELLED, FAILED, QUEUED, RUNNING, SUCCEEDED} import com.amazonaws.services.s3.AmazonS3URI -import com.amazonaws.services.s3.model.S3ObjectSummary import com.google.common.base.Optional import com.google.common.collect.ImmutableList -import io.digdag.client.config.{Config, ConfigException, ConfigKey} +import io.digdag.client.config.{Config, ConfigKey} import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} import io.digdag.util.DurationParam import pro.civitaspo.digdag.plugin.athena.wrapper.{NotRetryableException, ParamInGiveup, ParamInRetry, RetryableException, RetryExecutorWrapper} -import scala.collection.JavaConverters._ import scala.util.Try import scala.util.hashing.MurmurHash3 @@ -32,25 +30,6 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) } - sealed abstract class SaveMode - - object SaveMode { - final case object Append extends SaveMode - final case object ErrorIfExists extends SaveMode - final case object Ignore extends SaveMode - final case object Overwrite extends SaveMode - - def apply(mode: String): SaveMode = { - mode match { - case "append" => Append - case "error_if_exists" => ErrorIfExists - case "ignore" => Ignore - case "overwrite" => Overwrite - case unknown => throw new ConfigException(s"[$operatorName] save mode '$unknown' is unsupported.") - } - } - } - case class LastQuery( id: String, database: Option[String] = None, 
@@ -88,23 +67,10 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system protected val tokenPrefix: String = params.get("token_prefix", classOf[String], "digdag-athena") protected val database: Optional[String] = params.getOptional("database", classOf[String]) protected val output: AmazonS3URI = { + // TODO: to optional val o = params.get("output", classOf[String]) AmazonS3URI(if (o.endsWith("/")) o else s"$o/") } - - @deprecated protected val keepMetadata: Boolean = { - logger.warn( - "Athena supports CTAS, so digdag-operator-athena will support it as `athena.ctas>` operator. After that, 'keep_metadata' option will be removed and the default behaviour will become the same as `keep_metadata: true` (the current default behaviour is the same as `keep_metadata: false`) because this option was added for that the metadata file is obstructive when using the output csv as another table." - ) - params.get("keep_metadata", classOf[Boolean], false) - } - - @deprecated protected val saveMode: SaveMode = { - logger.warn( - "Athena supports CTAS, so digdag-operator-athena will support it as `athena.ctas>` operator. After that, 'save_mode' option will be removed and the behaviour will become the same as `save_mode: append` (the current default behaviour is the same as `save_mode: overwrite`) because this option was added for that lots of duplicated output csv files which are created by other executions are sometimes obstructive when using the output csv as another table." 
- ) - SaveMode(params.get("save_mode", classOf[String], "overwrite")) - } protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) protected val preview: Boolean = params.get("preview", classOf[Boolean], true) @@ -121,41 +87,29 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system s"$tokenPrefix-$sessionUuid-$queryHash" } - override def runTask(): TaskResult = { - saveMode match { - case SaveMode.ErrorIfExists => - val result = withS3(_.listObjectsV2(output.getBucket, output.getKey)) - if (!result.getObjectSummaries.isEmpty) throw new IllegalStateException(s"[$operatorName] some objects already exists in `$output`.") - case SaveMode.Ignore => - val result = withS3(_.listObjectsV2(output.getBucket, output.getKey)) - if (!result.getObjectSummaries.isEmpty) { - logger.info(s"[$operatorName] some objects already exists in $output so do nothing in this session: `$sessionUuid`.") - val builder = TaskResult.defaultBuilder(request) - builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query"))) - return builder.build() - } - case _ => // do nothing + @deprecated private def showMessageIfUnsupportedOptionExists() = { + if (params.getOptional("keep_metadata", classOf[Boolean]).isPresent) { + logger.warn("`keep_metadata` option has removed, and the behaviour is the same as `keep_metadata: true`.") + } + if (params.getOptional("save_mode", classOf[String]).isPresent) { + logger.warn("`save_mode` option has removed, and the behaviour is the same as `save_mode: append` .") } + } + + override def runTask(): TaskResult = { + showMessageIfUnsupportedOptionExists() val execId: String = startQueryExecution val lastQuery: LastQuery = pollingQueryExecution(execId) - if (saveMode.equals(SaveMode.Overwrite)) { - withS3(_.listObjectsV2(output.getBucket, output.getKey)).getObjectSummaries.asScala - .filterNot(_.getKey.startsWith(lastQuery.outputCsvUri.getKey)) // filter output.csv and 
output.csv.metadata - .foreach { summary: S3ObjectSummary => - logger.info(s"[$operatorName] Delete s3://${summary.getBucketName}/${summary.getKey}") - withS3(_.deleteObject(summary.getBucketName, summary.getKey)) - } - } - + // TODO: Change outputCsvUri, logger.info(s"[$operatorName] Created ${lastQuery.outputCsvUri} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)") val p: Config = buildLastQueryParam(lastQuery) val builder = TaskResult.defaultBuilder(request) builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query"))) builder.storeParams(p) - builder.subtaskConfig(buildSubTaskConfig(lastQuery)) + if (preview) builder.subtaskConfig(buildPreviewSubTaskConfig(lastQuery)) builder.build() } @@ -221,7 +175,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system lastQueryParam.set("id", lastQuery.id) lastQueryParam.set("database", lastQuery.database.getOrElse(Optional.absent())) lastQueryParam.set("query", lastQuery.query) - lastQueryParam.set("output", lastQuery.outputCsvUri.toString) + lastQueryParam.set("output", lastQuery.outputCsvUri.toString) // TODO: It's not always csv, so should change. 
lastQueryParam.set("scan_bytes", lastQuery.scanBytes.getOrElse(Optional.absent())) lastQueryParam.set("exec_millis", lastQuery.execMillis.getOrElse(Optional.absent())) lastQueryParam.set("state", lastQuery.state) @@ -232,23 +186,6 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system ret } - protected def buildSubTaskConfig(lastQuery: LastQuery): Config = { - val subTask: Config = cf.create() - val export: Config = subTask.getNestedOrSetEmpty("_export").getNestedOrSetEmpty("athena") - - export.set("auth_method", authMethod) - export.set("profile_name", profileName) - if (profileFile.isPresent) export.set("profile_file", profileFile.get()) - export.set("use_http_proxy", useHttpProxy) - if (region.isPresent) export.set("region", region.get()) - if (endpoint.isPresent) export.set("endpoint", endpoint.get()) - - if (preview) subTask.setNested("+run-preview", buildPreviewSubTaskConfig(lastQuery)) - if (!keepMetadata) subTask.setNested("+run-remove-matadata", buildRemoveMetadataSubTaskConfig(lastQuery)) - - subTask - } - protected def buildPreviewSubTaskConfig(lastQuery: LastQuery): Config = { val subTask: Config = cf.create() subTask.set("_type", "athena.preview") @@ -264,19 +201,4 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system subTask } - - protected def buildRemoveMetadataSubTaskConfig(lastQuery: LastQuery): Config = { - val subTask: Config = cf.create() - subTask.set("_type", "athena.remove_metadata") - subTask.set("_command", lastQuery.outputCsvMetadataUri.toString) - - subTask.set("auth_method", authMethod) - subTask.set("profile_name", profileName) - if (profileFile.isPresent) subTask.set("profile_file", profileFile.get()) - subTask.set("use_http_proxy", useHttpProxy) - if (region.isPresent) subTask.set("region", region.get()) - if (endpoint.isPresent) subTask.set("endpoint", endpoint.get()) - - subTask - } } diff --git 
a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala deleted file mode 100644 index 88636ce..0000000 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaRemoveMetadataOperator.scala +++ /dev/null @@ -1,20 +0,0 @@ -package pro.civitaspo.digdag.plugin.athena.operator -import com.amazonaws.services.s3.AmazonS3URI -import io.digdag.client.config.Config -import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} - -class AthenaRemoveMetadataOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) - extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { - - object AmazonS3URI { - def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) - } - - protected val metadataUri: AmazonS3URI = AmazonS3URI(params.get("_command", classOf[String])) - - override def runTask(): TaskResult = { - logger.info(s"[$operatorName] Delete ${metadataUri.toString}.") - withS3(_.deleteObject(metadataUri.getBucket, metadataUri.getKey)) - TaskResult.empty(request) - } -} From 2ca58273af50602bf7819108e5ea28f94711507c Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 14 Oct 2018 00:38:39 +0900 Subject: [PATCH 02/37] Add athena.ctas> operator to README --- README.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/README.md b/README.md index 5e7fd5f..dd3d520 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,46 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **type**: The data type of the column. (string) - **athena.last_preview.rows**: The rows in the preview results. 
(array of array) +## Configuration for `athena.ctas>` operator + +### Options + +- **select_query**: The select SQL statements or file to be executed for a new table by [`Create Table As Select`](https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/). You can use digdag's template engine like `${...}` in the SQL query. (string, required) +- **database**: The database name for query execution context. (string, optional) +- **table**: The table name for the new table. (string, default: `digdag-athena-ctas-${session_uuid}`) +- **output**: Output location for data created by CTAS. (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-/Unsaved/${YEAR}/${MONTH}/${DAY}/${athena_query_id}/"`) +- **format**: The data format for the CTAS query results, such as `"orc"`, `"parquet"`, `"avro"`, `"json"`, or `"textfile"`. (string, default: `"parquet"`) +- **compression**: The compression type to use for `"orc"` or `"parquet"`. (string, default: `"snappy"`) +- **field_delimiter**: The field delimiter for files in CSV, TSV, and text files. This option is applied only when **format** is specific to text-based data storage formats. (string, optional) +- **partitioned_by**: An array list of columns by which the CTAS table will be partitioned. Verify that the names of partitioned columns are listed last in the list of columns in the SELECT statement. (array of string, optional) +- **bucketed_by**: An array list of buckets to bucket data. If omitted, Athena does not bucket your data in this query. (array of string, optional) +- **bucket_count**: The number of buckets for bucketing your data. If omitted, Athena does not bucket your data. (integer, optional) +- **additional_properties**: Additional properties for CTAS. These are used for CTAS WITH clause without escaping. (string to string map, optional) +- **save_mode**: Specify the expected behavior of saving CTAS results. Available values are `"default"`, `"empty_table"`, `"data_only"`. 
See the below explanation of the behaviour. (string, default: `"default"`) + - `"default"`: Do not take any special care. This option requires the least IAM privileges for digdag, but the behaviour depends on Athena. + - `"empty_table"`: Create a new empty table with the same schema as the select query results. + - `"data_only"`: +- **query_mode**: Specify the expected behavior of CTAS query. Available values are `"none"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`) + - `"none"`: Do not take any special care. This option requires the least IAM privileges for digdag, but the behaviour depends on Athena. + - `"error_if_exists"`: Raise error if the destination table or location exists. + - `"ignore"`: Skip CTAS query if the destination table or location exists. + - `"overwrite"`: Drop the destination table and remove objects before executing CTAS. This operation is not atomic. +- **token_prefix**: Prefix for `ClientRequestToken` that is a unique case-sensitive string used to ensure the request to create the query is idempotent (executes only once). On this plugin, the token is composed like `${token_prefix}-${session_uuid}-${hash value of query}`. (string, default: `"digdag-athena-ctas"`) +- **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`) + +### Output Parameters + +- **athena.last_ctas_query.id**: The unique identifier for each query execution. (string) +- **athena.last_ctas_query.database**: The name of the database. (string) +- **athena.last_ctas_query.query**: The SQL query statements which the query execution ran. (string) +- **athena.last_ctas_query.output**: The location in Amazon S3 where your query results are stored. (string) +- **athena.last_ctas_query.scan_bytes**: The number of bytes in the data that was queried. (long) +- **athena.last_ctas_query.exec_millis**: The number of milliseconds that the query took to execute. 
(long) +- **athena.last_ctas_query.state**: The state of query execution. SUBMITTED indicates that the query is queued for execution. RUNNING indicates that the query is scanning data and returning results. SUCCEEDED indicates that the query completed without error. FAILED indicates that the query experienced an error and did not complete processing. CANCELLED indicates that user input interrupted query execution. (string) +- **athena.last_ctas_query.state_change_reason**: Further detail about the status of the query. (string) +- **athena.last_ctas_query.submitted_at**: The unix timestamp that the query was submitted. (integer) +- **athena.last_ctas_query.completed_at**: The unix timestamp that the query completed. (integer) + # Development ## Run an Example From a2b7ce08ef783605fefc864cbc8a373fc727a49d Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 14 Oct 2018 00:51:47 +0900 Subject: [PATCH 03/37] Add a TODO comment --- .../digdag/plugin/athena/operator/AbstractAthenaOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala index 18c18f2..11eb5b7 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala @@ -30,7 +30,7 @@ import scala.util.Try abstract class AbstractAthenaOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) extends BaseOperator(context) { - protected val logger: Logger = LoggerFactory.getLogger(this.getClass) + protected val logger: Logger = LoggerFactory.getLogger(this.getClass) // TODO: use operatorName protected val cf: ConfigFactory = request.getConfig.getFactory protected val params: Config = { val elems: Seq[String] = operatorName.split("\\.") From 
e1b851cb8b6228d1776f47ef997ea8cc88a77c25 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 14 Oct 2018 00:50:13 +0900 Subject: [PATCH 04/37] Add athena.ctas operator minimally --- .../digdag/plugin/athena/AthenaPlugin.scala | 13 +++++++++++-- .../plugin/athena/operator/AthenaCtasOperator.scala | 11 +++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala index 68044fb..e093b8b 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala @@ -6,7 +6,12 @@ import java.util.{Arrays => JArrays, List => JList} import io.digdag.client.config.Config import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} import javax.inject.Inject -import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator} +import pro.civitaspo.digdag.plugin.athena.operator.{ + AbstractAthenaOperator, + AthenaCtasOperator, + AthenaPreviewOperator, + AthenaQueryOperator, +} object AthenaPlugin { @@ -16,7 +21,11 @@ object AthenaPlugin { @Inject protected var templateEngine: TemplateEngine = null override def get(): JList[OperatorFactory] = { - JArrays.asList(operatorFactory("athena.query", classOf[AthenaQueryOperator]), operatorFactory("athena.preview", classOf[AthenaPreviewOperator])) + JArrays.asList( + operatorFactory("athena.ctas", classOf[AthenaCtasOperator]), + operatorFactory("athena.query", classOf[AthenaQueryOperator]), + operatorFactory("athena.preview", classOf[AthenaPreviewOperator]) + ) } private def operatorFactory[T <: AbstractAthenaOperator](operatorName: String, klass: Class[T]): OperatorFactory = { diff --git 
a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala new file mode 100644 index 0000000..f96a3b7 --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -0,0 +1,11 @@ +package pro.civitaspo.digdag.plugin.athena.operator +import io.digdag.client.config.Config +import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} + +class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) + extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { + + override def runTask(): TaskResult = { + null + } +} From 5b784d198f30f6459f1c45be31d3d7a52f8c0b1e Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 14 Oct 2018 04:48:10 +0900 Subject: [PATCH 05/37] Write options for athena.ctas --- .../athena/operator/AthenaCtasOperator.scala | 111 +++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index f96a3b7..8acbe27 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -1,10 +1,119 @@ package pro.civitaspo.digdag.plugin.athena.operator -import io.digdag.client.config.Config + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import com.amazonaws.services.athena.model.{QueryExecution, QueryExecutionState} +import com.amazonaws.services.s3.AmazonS3URI +import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest +import com.google.common.base.Optional +import io.digdag.client.config.{Config, ConfigException} import io.digdag.spi.{OperatorContext, 
TaskResult, TemplateEngine} +import io.digdag.util.DurationParam + +import scala.collection.JavaConverters._ +import scala.util.Try class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) { + object AmazonS3URI { + def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) + } + + sealed abstract class SaveMode + + object SaveMode { + final case object Default extends SaveMode + final case object EmptyTable extends SaveMode + final case object DataOnly extends SaveMode + + def apply(mode: String): SaveMode = { + mode match { + case "default" => Default + case "empty_table" => EmptyTable + case "data_only" => DataOnly + case unknown => throw new ConfigException(s"[$operatorName] save_mode '$unknown' is unsupported.") + } + } + } + + sealed abstract class QueryMode + + object QueryMode { + final case object None extends QueryMode + final case object ErrorIfExists extends QueryMode + final case object Ignore extends QueryMode + final case object Overwrite extends QueryMode + + def apply(mode: String): QueryMode = { + mode match { + case "none" => None + case "error_if_exists" => ErrorIfExists + case "ignore" => Ignore + case "overwrite" => Overwrite + case unknown => throw new ConfigException(s"[$operatorName] query_mode '$unknown' is unsupported.") + } + } + } + + case class LastCtasQuery( + id: String, + database: Option[String] = None, + query: String, + outputCsvUri: AmazonS3URI, + outputCsvMetadataUri: AmazonS3URI, + scanBytes: Option[Long] = None, + execMillis: Option[Long] = None, + state: QueryExecutionState, + stateChangeReason: Option[String] = None, + submittedAt: Option[Long] = None, + completedAt: Option[Long] = None + ) + + object LastCtasQuery { + + def apply(qe: QueryExecution): LastCtasQuery = { + new LastCtasQuery( + id = qe.getQueryExecutionId, + database = 
Try(Option(qe.getQueryExecutionContext.getDatabase)).getOrElse(None), + query = qe.getQuery, + outputCsvUri = AmazonS3URI(qe.getResultConfiguration.getOutputLocation), + outputCsvMetadataUri = AmazonS3URI(s"${qe.getResultConfiguration.getOutputLocation}.metadata"), + scanBytes = Try(Option(qe.getStatistics.getDataScannedInBytes.toLong)).getOrElse(None), + execMillis = Try(Option(qe.getStatistics.getEngineExecutionTimeInMillis.toLong)).getOrElse(None), + state = QueryExecutionState.fromValue(qe.getStatus.getState), + stateChangeReason = Try(Option(qe.getStatus.getStateChangeReason)).getOrElse(None), + submittedAt = Try(Option(qe.getStatus.getSubmissionDateTime.getTime / 1000)).getOrElse(None), + completedAt = Try(Option(qe.getStatus.getCompletionDateTime.getTime / 1000)).getOrElse(None) + ) + } + } + + protected val selectQueryOrFile: String = params.get("select_query", classOf[String]) + protected val database: Optional[String] = params.getOptional("database", classOf[String]) + protected val table: String = params.get("table", classOf[String], s"digdag-athena-ctas-$sessionUuid") + protected val output: Optional[String] = params.getOptional("output", classOf[String]) // gen default later + protected val format: String = params.get("format", classOf[String], "parquet") + protected val compression: String = params.get("compression", classOf[String], "snappy") + protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String]) + protected val partitionedBy: Seq[String] = params.getListOrEmpty("partitioned_by", classOf[String]).asScala + protected val bucketedBy: Seq[String] = params.getListOrEmpty("bucketed_by", classOf[String]).asScala + protected val bucketCount: Optional[Int] = params.getOptional("bucket_count", classOf[Int]) + protected val additionalProperties: Map[String, String] = params.getMapOrEmpty("additional_properties", classOf[String], classOf[String]).asScala.toMap + protected val saveMode: SaveMode = 
SaveMode(params.get("save_mode", classOf[String], "default")) + protected val queryMode: QueryMode = QueryMode(params.get("query_mode", classOf[String], "overwrite")) + protected val tokenPrefix: String = params.get("token_prefix", classOf[String], "digdag-athena-ctas") + protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) + + protected lazy val selectQuery: String = { + val t: Try[String] = Try { + val f: File = workspace.getFile(selectQueryOrFile) + workspace.templateFile(templateEngine, f.getPath, UTF_8, params) + } + t.getOrElse(selectQueryOrFile) + } + override def runTask(): TaskResult = { null } From 6ee910e94ac0efd64ed9440df851b9ab7b7f22ac Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 14 Oct 2018 04:56:19 +0900 Subject: [PATCH 06/37] Gather endpoint configuration for aws client --- .../operator/AbstractAthenaOperator.scala | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala index 11eb5b7..ad8946a 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala @@ -13,6 +13,7 @@ import com.amazonaws.auth.{ SystemPropertiesCredentialsProvider } import com.amazonaws.auth.profile.{ProfileCredentialsProvider, ProfilesConfigFile} +import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} import com.amazonaws.services.athena.{AmazonAthena, AmazonAthenaClientBuilder} @@ -78,33 +79,20 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon } private def buildAthena: AmazonAthena = { - val builder = 
AmazonAthenaClientBuilder - .standard() + configureBuilderEndpointConfiguration(AmazonAthenaClientBuilder.standard()) .withClientConfiguration(clientConfiguration) .withCredentials(credentialsProvider) - - if (region.isPresent && endpoint.isPresent) { - val ec = new EndpointConfiguration(endpoint.get(), region.get()) - builder.setEndpointConfiguration(ec) - } - else if (region.isPresent && !endpoint.isPresent) { - builder.setRegion(region.get()) - } - else if (!region.isPresent && endpoint.isPresent) { - val r = Try(new DefaultAwsRegionProviderChain().getRegion).getOrElse(Regions.DEFAULT_REGION.getName) - val ec = new EndpointConfiguration(endpoint.get(), r) - builder.setEndpointConfiguration(ec) - } - - builder.build() + .build() } private def buildS3: AmazonS3 = { - val builder = AmazonS3ClientBuilder - .standard() + configureBuilderEndpointConfiguration(AmazonS3ClientBuilder.standard()) .withClientConfiguration(clientConfiguration) .withCredentials(credentialsProvider) + .build() + } + private def configureBuilderEndpointConfiguration[S, T](builder: AwsClientBuilder[S, T]): AwsClientBuilder[S, T] = { if (region.isPresent && endpoint.isPresent) { val ec = new EndpointConfiguration(endpoint.get(), region.get()) builder.setEndpointConfiguration(ec) @@ -117,8 +105,7 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon val ec = new EndpointConfiguration(endpoint.get(), r) builder.setEndpointConfiguration(ec) } - - builder.build() + builder } private def credentialsProvider: AWSCredentialsProvider = { From 45590a93f0e52a6f1f5d5f34f5fd20d70444d33b Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 14 Oct 2018 05:00:42 +0900 Subject: [PATCH 07/37] Add sts client --- .../athena/operator/AbstractAthenaOperator.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala 
b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala index ad8946a..d21d940 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala @@ -18,7 +18,7 @@ import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} import com.amazonaws.services.athena.{AmazonAthena, AmazonAthenaClientBuilder} import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder +import com.amazonaws.services.securitytoken.{AWSSecurityTokenService, AWSSecurityTokenServiceClientBuilder} import com.amazonaws.services.securitytoken.model.AssumeRoleRequest import com.google.common.base.Optional import io.digdag.client.config.{Config, ConfigException, ConfigFactory} @@ -78,6 +78,12 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon finally s3.shutdown() } + protected def withSts[T](f: AWSSecurityTokenService => T): T = { + val sts = buildSts + try f(sts) + finally sts.shutdown() + } + private def buildAthena: AmazonAthena = { configureBuilderEndpointConfiguration(AmazonAthenaClientBuilder.standard()) .withClientConfiguration(clientConfiguration) @@ -92,6 +98,13 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon .build() } + private def buildSts: AWSSecurityTokenService = { + configureBuilderEndpointConfiguration(AWSSecurityTokenServiceClientBuilder.standard()) + .withClientConfiguration(clientConfiguration) + .withCredentials(credentialsProvider) + .build() + } + private def configureBuilderEndpointConfiguration[S, T](builder: AwsClientBuilder[S, T]): AwsClientBuilder[S, T] = { if (region.isPresent && endpoint.isPresent) { val ec = new EndpointConfiguration(endpoint.get(), region.get()) From 
f5f3b56ef3b16ef2b99b31d10a08a1c4f646a8e3 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sun, 14 Oct 2018 05:15:21 +0900 Subject: [PATCH 08/37] Add queryResultsDefaultBucket var --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 8acbe27..3e8a2d5 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -3,6 +3,7 @@ package pro.civitaspo.digdag.plugin.athena.operator import java.io.File import java.nio.charset.StandardCharsets.UTF_8 +import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} import com.amazonaws.services.athena.model.{QueryExecution, QueryExecutionState} import com.amazonaws.services.s3.AmazonS3URI import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest @@ -114,6 +115,12 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC t.getOrElse(selectQueryOrFile) } + protected lazy val queryResultsDefaultBucket: String = { + val accountId: String = withSts(_.getCallerIdentity(new GetCallerIdentityRequest())).getAccount + val r = region.or(Try(new DefaultAwsRegionProviderChain().getRegion).getOrElse(Regions.DEFAULT_REGION.getName)) + s"s3://aws-athena-query-results-$accountId-$r" + } + override def runTask(): TaskResult = { null } From 5dbb8887d6a1ee084ed5bbc80752b0818bec3a6c Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 16 Oct 2018 22:10:12 +0900 Subject: [PATCH 09/37] Add AmazonS3URI under aws package --- .../civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala diff --git 
a/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala new file mode 100644 index 0000000..17a853e --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala @@ -0,0 +1,7 @@ +package pro.civitaspo.digdag.plugin.athena.aws + +import com.amazonaws.services.s3.AmazonS3URI + +object AmazonS3URI { + def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) +} From 18237a8040a21fdccc3db69bd9d1bef6d0577f25 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 16 Oct 2018 22:11:55 +0900 Subject: [PATCH 10/37] Move LastQuery to query package --- .../plugin/athena/query/LastQuery.scala | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala new file mode 100644 index 0000000..ee0ddff --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala @@ -0,0 +1,37 @@ +package pro.civitaspo.digdag.plugin.athena.query +import com.amazonaws.services.athena.model.{QueryExecution, QueryExecutionState} +import com.amazonaws.services.s3.AmazonS3URI +import pro.civitaspo.digdag.plugin.athena.aws.AmazonS3URI + +import scala.util.Try + +case class LastQuery( + id: String, + database: Option[String] = None, + query: String, + output: AmazonS3URI, + scanBytes: Option[Long] = None, + execMillis: Option[Long] = None, + state: QueryExecutionState, + stateChangeReason: Option[String] = None, + submittedAt: Option[Long] = None, + completedAt: Option[Long] = None +) + +object LastQuery { + + def apply(qe: QueryExecution): LastQuery = { + new LastQuery( + id = qe.getQueryExecutionId, + database = Try(Option(qe.getQueryExecutionContext.getDatabase)).getOrElse(None), + query = qe.getQuery, + 
output = AmazonS3URI(qe.getResultConfiguration.getOutputLocation), + scanBytes = Try(Option(qe.getStatistics.getDataScannedInBytes.toLong)).getOrElse(None), + execMillis = Try(Option(qe.getStatistics.getEngineExecutionTimeInMillis.toLong)).getOrElse(None), + state = QueryExecutionState.fromValue(qe.getStatus.getState), + stateChangeReason = Try(Option(qe.getStatus.getStateChangeReason)).getOrElse(None), + submittedAt = Try(Option(qe.getStatus.getSubmissionDateTime.getTime / 1000)).getOrElse(None), + completedAt = Try(Option(qe.getStatus.getCompletionDateTime.getTime / 1000)).getOrElse(None) + ) + } +} From 6b6ac7eb8c243fe92a672600f64db318f36d8a50 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 08:36:54 +0900 Subject: [PATCH 11/37] Remove Ctas OutputParameter --- README.md | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/README.md b/README.md index dd3d520..1f1e12e 100644 --- a/README.md +++ b/README.md @@ -152,16 +152,7 @@ Define the below options on properties (which is indicated by `-c`, `--config`). ### Output Parameters -- **athena.last_ctas_query.id**: The unique identifier for each query execution. (string) -- **athena.last_ctas_query.database**: The name of the database. (string) -- **athena.last_ctas_query.query**: The SQL query statements which the query execution ran. (string) -- **athena.last_ctas_query.output**: The location in Amazon S3 where your query results are stored. (string) -- **athena.last_ctas_query.scan_bytes**: The number of bytes in the data that was queried. (long) -- **athena.last_ctas_query.exec_millis**: The number of milliseconds that the query took to execute. (long) -- **athena.last_ctas_query.state**: The state of query execution. SUBMITTED indicates that the query is queued for execution. RUNNING indicates that the query is scanning data and returning results. SUCCEEDED indicates that the query completed without error. 
FAILED indicates that the query experienced an error and did not complete processing. CANCELLED indicates that user input interrupted query execution. (string) -- **athena.last_ctas_query.state_change_reason**: Further detail about the status of the query. (string) -- **athena.last_ctas_query.submitted_at**: The unix timestamp that the query was submitted. (integer) -- **athena.last_ctas_query.completed_at**: The unix timestamp that the query completed. (integer) +Nothing # Development From 2540e9e9d77f6c8c675317205e1a83afd5c1372f Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 08:41:15 +0900 Subject: [PATCH 12/37] Stop to refactor --- .../plugin/athena/aws/AmazonS3URI.scala | 7 ---- .../plugin/athena/query/LastQuery.scala | 37 ------------------- 2 files changed, 44 deletions(-) delete mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala delete mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala deleted file mode 100644 index 17a853e..0000000 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/AmazonS3URI.scala +++ /dev/null @@ -1,7 +0,0 @@ -package pro.civitaspo.digdag.plugin.athena.aws - -import com.amazonaws.services.s3.AmazonS3URI - -object AmazonS3URI { - def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) -} diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala deleted file mode 100644 index ee0ddff..0000000 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/query/LastQuery.scala +++ /dev/null @@ -1,37 +0,0 @@ -package pro.civitaspo.digdag.plugin.athena.query -import com.amazonaws.services.athena.model.{QueryExecution, QueryExecutionState} -import com.amazonaws.services.s3.AmazonS3URI -import 
pro.civitaspo.digdag.plugin.athena.aws.AmazonS3URI - -import scala.util.Try - -case class LastQuery( - id: String, - database: Option[String] = None, - query: String, - output: AmazonS3URI, - scanBytes: Option[Long] = None, - execMillis: Option[Long] = None, - state: QueryExecutionState, - stateChangeReason: Option[String] = None, - submittedAt: Option[Long] = None, - completedAt: Option[Long] = None -) - -object LastQuery { - - def apply(qe: QueryExecution): LastQuery = { - new LastQuery( - id = qe.getQueryExecutionId, - database = Try(Option(qe.getQueryExecutionContext.getDatabase)).getOrElse(None), - query = qe.getQuery, - output = AmazonS3URI(qe.getResultConfiguration.getOutputLocation), - scanBytes = Try(Option(qe.getStatistics.getDataScannedInBytes.toLong)).getOrElse(None), - execMillis = Try(Option(qe.getStatistics.getEngineExecutionTimeInMillis.toLong)).getOrElse(None), - state = QueryExecutionState.fromValue(qe.getStatus.getState), - stateChangeReason = Try(Option(qe.getStatus.getStateChangeReason)).getOrElse(None), - submittedAt = Try(Option(qe.getStatus.getSubmissionDateTime.getTime / 1000)).getOrElse(None), - completedAt = Try(Option(qe.getStatus.getCompletionDateTime.getTime / 1000)).getOrElse(None) - ) - } -} From f87bb2b05d862891315a112358545a31e21a27fe Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 08:43:11 +0900 Subject: [PATCH 13/37] Change lastQuery parameter: output --- .../plugin/athena/operator/AthenaQueryOperator.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index 09e3d9c..92f2b2d 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -34,8 +34,7 @@ class 
AthenaQueryOperator(operatorName: String, context: OperatorContext, system id: String, database: Option[String] = None, query: String, - outputCsvUri: AmazonS3URI, - outputCsvMetadataUri: AmazonS3URI, + output: AmazonS3URI, scanBytes: Option[Long] = None, execMillis: Option[Long] = None, state: QueryExecutionState, @@ -51,8 +50,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system id = qe.getQueryExecutionId, database = Try(Option(qe.getQueryExecutionContext.getDatabase)).getOrElse(None), query = qe.getQuery, - outputCsvUri = AmazonS3URI(qe.getResultConfiguration.getOutputLocation), - outputCsvMetadataUri = AmazonS3URI(s"${qe.getResultConfiguration.getOutputLocation}.metadata"), + output = AmazonS3URI(qe.getResultConfiguration.getOutputLocation), scanBytes = Try(Option(qe.getStatistics.getDataScannedInBytes.toLong)).getOrElse(None), execMillis = Try(Option(qe.getStatistics.getEngineExecutionTimeInMillis.toLong)).getOrElse(None), state = QueryExecutionState.fromValue(qe.getStatus.getState), @@ -102,8 +100,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system val execId: String = startQueryExecution val lastQuery: LastQuery = pollingQueryExecution(execId) - // TODO: Change outputCsvUri, - logger.info(s"[$operatorName] Created ${lastQuery.outputCsvUri} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)") + logger.info(s"[$operatorName] Created ${lastQuery.output} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)") val p: Config = buildLastQueryParam(lastQuery) val builder = TaskResult.defaultBuilder(request) @@ -175,7 +172,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system lastQueryParam.set("id", lastQuery.id) lastQueryParam.set("database", lastQuery.database.getOrElse(Optional.absent())) lastQueryParam.set("query", lastQuery.query) - lastQueryParam.set("output", lastQuery.outputCsvUri.toString) // TODO: 
It's not always csv, so should change. + lastQueryParam.set("output", lastQuery.output.toString) // TODO: It's not always csv, so should change. lastQueryParam.set("scan_bytes", lastQuery.scanBytes.getOrElse(Optional.absent())) lastQueryParam.set("exec_millis", lastQuery.execMillis.getOrElse(Optional.absent())) lastQueryParam.set("state", lastQuery.state) From e518eda1399be7a54bad18ff094d875c189dcaa2 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 08:44:27 +0900 Subject: [PATCH 14/37] Use operator name instead of ClassName as logger prefix --- .../digdag/plugin/athena/operator/AbstractAthenaOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala index d21d940..e5d457d 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala @@ -31,7 +31,7 @@ import scala.util.Try abstract class AbstractAthenaOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) extends BaseOperator(context) { - protected val logger: Logger = LoggerFactory.getLogger(this.getClass) // TODO: use operatorName + protected val logger: Logger = LoggerFactory.getLogger(operatorName) protected val cf: ConfigFactory = request.getConfig.getFactory protected val params: Config = { val elems: Seq[String] = operatorName.split("\\.") From 02e37f26050915fa8ad0f50002a557ea4b655815 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 08:58:05 +0900 Subject: [PATCH 15/37] Fix compile error --- .../digdag/plugin/athena/operator/AbstractAthenaOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala index e5d457d..c38c259 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AbstractAthenaOperator.scala @@ -105,7 +105,7 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon .build() } - private def configureBuilderEndpointConfiguration[S, T](builder: AwsClientBuilder[S, T]): AwsClientBuilder[S, T] = { + private def configureBuilderEndpointConfiguration[S <: AwsClientBuilder[S, T], T](builder: AwsClientBuilder[S, T]): AwsClientBuilder[S, T] = { if (region.isPresent && endpoint.isPresent) { val ec = new EndpointConfiguration(endpoint.get(), region.get()) builder.setEndpointConfiguration(ec) From 40b0bfac44faf531c79c459fd3af0932b311d903 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 23:20:11 +0900 Subject: [PATCH 16/37] Make output option to optional in athena.query operator --- README.md | 2 +- .../athena/operator/AthenaCtasOperator.scala | 8 -------- .../athena/operator/AthenaQueryOperator.scala | 19 ++++++++++++++----- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 1f1e12e..bbc37e6 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ Define the below options on properties (which is indicated by `-c`, `--config`). - **athena.query>**: The SQL query statements or file to be executed. You can use digdag's template engine like `${...}` in the SQL query. (string, required) - **token_prefix**: Prefix for `ClientRequestToken` that a unique case-sensitive string used to ensure the request to create the query is idempotent (executes only once). On this plugin, the token is composed like `${token_prefix}-${session_uuid}-${hash value of query}`. 
(string, default: `"digdag-athena"`) - **database**: The name of the database. (string, optional) -- **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required) +- **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-<AWS_REGION>"`) - **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`) - **preview**: Call `athena.preview>` operator after run `athena.query>`. (boolean, default: `true`) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 3e8a2d5..ba5ffbd 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -3,10 +3,8 @@ package pro.civitaspo.digdag.plugin.athena.operator import java.io.File import java.nio.charset.StandardCharsets.UTF_8 -import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} import com.amazonaws.services.athena.model.{QueryExecution, QueryExecutionState} import com.amazonaws.services.s3.AmazonS3URI -import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest import com.google.common.base.Optional import io.digdag.client.config.{Config, ConfigException} import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} @@ -115,12 +113,6 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC t.getOrElse(selectQueryOrFile) } - protected lazy val queryResultsDefaultBucket: String = { - val
accountId: String = withSts(_.getCallerIdentity(new GetCallerIdentityRequest())).getAccount - val r = region.or(Try(new DefaultAwsRegionProviderChain().getRegion).getOrElse(Regions.DEFAULT_REGION.getName)) - s"s3://aws-athena-query-results-$accountId-$r" - } - override def runTask(): TaskResult = { null } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index 92f2b2d..ab750ad 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -3,6 +3,7 @@ package pro.civitaspo.digdag.plugin.athena.operator import java.nio.charset.StandardCharsets.UTF_8 import java.time.Duration +import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions} import com.amazonaws.services.athena.model.{ GetQueryExecutionRequest, QueryExecution, @@ -13,6 +14,7 @@ import com.amazonaws.services.athena.model.{ } import com.amazonaws.services.athena.model.QueryExecutionState.{CANCELLED, FAILED, QUEUED, RUNNING, SUCCEEDED} import com.amazonaws.services.s3.AmazonS3URI +import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest import com.google.common.base.Optional import com.google.common.collect.ImmutableList import io.digdag.client.config.{Config, ConfigKey} @@ -64,11 +66,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system protected val queryOrFile: String = params.get("_command", classOf[String]) protected val tokenPrefix: String = params.get("token_prefix", classOf[String], "digdag-athena") protected val database: Optional[String] = params.getOptional("database", classOf[String]) - protected val output: AmazonS3URI = { - // TODO: to optional - val o = params.get("output", classOf[String]) - AmazonS3URI(if (o.endsWith("/")) o else s"$o/") - } + protected val outputOptional: 
Optional[String] = params.getOptional("output", classOf[String]) protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) protected val preview: Boolean = params.get("preview", classOf[Boolean], true) @@ -85,6 +83,17 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system s"$tokenPrefix-$sessionUuid-$queryHash" } + protected lazy val output: AmazonS3URI = { + AmazonS3URI { + if (outputOptional.isPresent) outputOptional.get() + else { + val accountId: String = withSts(_.getCallerIdentity(new GetCallerIdentityRequest())).getAccount + val r = region.or(Try(new DefaultAwsRegionProviderChain().getRegion).getOrElse(Regions.DEFAULT_REGION.getName)) + s"s3://aws-athena-query-results-$accountId-$r" + } + } + } + @deprecated private def showMessageIfUnsupportedOptionExists() = { if (params.getOptional("keep_metadata", classOf[Boolean]).isPresent) { logger.warn("`keep_metadata` option has removed, and the behaviour is the same as `keep_metadata: true`.") From 899f9b63482f3b9f6d3327dc6762827f8947159d Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 23:24:14 +0900 Subject: [PATCH 17/37] Add deprecated signature and message for output option of athena.query operator --- .../plugin/athena/operator/AthenaQueryOperator.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index ab750ad..7a170e2 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -66,7 +66,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system protected val queryOrFile: String = params.get("_command", classOf[String]) protected val tokenPrefix: String = 
params.get("token_prefix", classOf[String], "digdag-athena") protected val database: Optional[String] = params.getOptional("database", classOf[String]) - protected val outputOptional: Optional[String] = params.getOptional("output", classOf[String]) + @deprecated protected val outputOptional: Optional[String] = params.getOptional("output", classOf[String]) protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) protected val preview: Boolean = params.get("preview", classOf[Boolean], true) @@ -94,13 +94,16 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system } } - @deprecated private def showMessageIfUnsupportedOptionExists() = { + @deprecated private def showMessageIfUnsupportedOptionExists(): Unit = { if (params.getOptional("keep_metadata", classOf[Boolean]).isPresent) { logger.warn("`keep_metadata` option has removed, and the behaviour is the same as `keep_metadata: true`.") } if (params.getOptional("save_mode", classOf[String]).isPresent) { logger.warn("`save_mode` option has removed, and the behaviour is the same as `save_mode: append` .") } + if (outputOptional.isPresent) { + logger.warn("`output` option will be removed, and the current default value will be always used.") + } } override def runTask(): TaskResult = { From 6f5d91a8aab82a8b79685237ff1e7e5ba158c81a Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Wed, 17 Oct 2018 23:30:34 +0900 Subject: [PATCH 18/37] Remove output parameter implementations --- .../athena/operator/AthenaCtasOperator.scala | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index ba5ffbd..6f7bddc 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ 
b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -3,7 +3,6 @@ package pro.civitaspo.digdag.plugin.athena.operator import java.io.File import java.nio.charset.StandardCharsets.UTF_8 -import com.amazonaws.services.athena.model.{QueryExecution, QueryExecutionState} import com.amazonaws.services.s3.AmazonS3URI import com.google.common.base.Optional import io.digdag.client.config.{Config, ConfigException} @@ -56,39 +55,6 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC } } - case class LastCtasQuery( - id: String, - database: Option[String] = None, - query: String, - outputCsvUri: AmazonS3URI, - outputCsvMetadataUri: AmazonS3URI, - scanBytes: Option[Long] = None, - execMillis: Option[Long] = None, - state: QueryExecutionState, - stateChangeReason: Option[String] = None, - submittedAt: Option[Long] = None, - completedAt: Option[Long] = None - ) - - object LastCtasQuery { - - def apply(qe: QueryExecution): LastCtasQuery = { - new LastCtasQuery( - id = qe.getQueryExecutionId, - database = Try(Option(qe.getQueryExecutionContext.getDatabase)).getOrElse(None), - query = qe.getQuery, - outputCsvUri = AmazonS3URI(qe.getResultConfiguration.getOutputLocation), - outputCsvMetadataUri = AmazonS3URI(s"${qe.getResultConfiguration.getOutputLocation}.metadata"), - scanBytes = Try(Option(qe.getStatistics.getDataScannedInBytes.toLong)).getOrElse(None), - execMillis = Try(Option(qe.getStatistics.getEngineExecutionTimeInMillis.toLong)).getOrElse(None), - state = QueryExecutionState.fromValue(qe.getStatus.getState), - stateChangeReason = Try(Option(qe.getStatus.getStateChangeReason)).getOrElse(None), - submittedAt = Try(Option(qe.getStatus.getSubmissionDateTime.getTime / 1000)).getOrElse(None), - completedAt = Try(Option(qe.getStatus.getCompletionDateTime.getTime / 1000)).getOrElse(None) - ) - } - } - protected val selectQueryOrFile: String = params.get("select_query", classOf[String]) protected val database: 
Optional[String] = params.getOptional("database", classOf[String]) protected val table: String = params.get("table", classOf[String], s"digdag-athena-ctas-$sessionUuid") From 6c6bb8187bd203cd0f18ddb7fca898d5a1165898 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Thu, 18 Oct 2018 09:10:39 +0900 Subject: [PATCH 19/37] Add CTAS query generator --- .../athena/operator/AthenaCtasOperator.scala | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 6f7bddc..1c606e7 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -58,7 +58,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected val selectQueryOrFile: String = params.get("select_query", classOf[String]) protected val database: Optional[String] = params.getOptional("database", classOf[String]) protected val table: String = params.get("table", classOf[String], s"digdag-athena-ctas-$sessionUuid") - protected val output: Optional[String] = params.getOptional("output", classOf[String]) // gen default later + protected val output: Optional[String] = params.getOptional("output", classOf[String]) protected val format: String = params.get("format", classOf[String], "parquet") protected val compression: String = params.get("compression", classOf[String], "snappy") protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String]) @@ -82,4 +82,43 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC override def runTask(): TaskResult = { null } + + protected def generateCtasQuery(): String = { + val propsBuilder = Map.newBuilder[String, String] + if (output.isPresent) propsBuilder += 
("external_location" -> s"'${output.get}'") + propsBuilder += ("format" -> s"'$format'") + format match { + case "parquet" => propsBuilder += ("parquet_compression" -> s"'$compression'") + case "orc" => propsBuilder += ("orc_compression" -> s"'$compression'") + case _ => logger.info(s"compression is not supported for format: $format.") + } + if (fieldDelimiter.isPresent) propsBuilder += ("field_delimiter" -> s"'${fieldDelimiter.get}'") + if (partitionedBy.nonEmpty) propsBuilder += ("partitioned_by" -> s"ARRAY[${partitionedBy.map(s => s"'$s'").mkString(",")}]") + if (bucketedBy.nonEmpty) { + propsBuilder += ("bucketed_by" -> s"ARRAY[${bucketedBy.map(s => s"'$s'").mkString(",")}]") + if (!bucketCount.isPresent) throw new ConfigException(s"`bucket_count` must be set if `bucketed_by` is set.") + propsBuilder += ("bucket_count" -> s"${bucketCount.get}") + } + if (additionalProperties.nonEmpty) propsBuilder ++= additionalProperties + + val propStr: String = propsBuilder.result().map { case (k, v) => s"$k = $v" }.mkString(",\n") + val createTableClause: String = queryMode match { + case QueryMode.Ignore => "CREATE TABLE IF NOT EXISTS" + case _ => "CREATE TABLE" + } + val dataHint: String = saveMode match { + case SaveMode.EmptyTable => "WITH NO DATA" + case _ => "WITH DATA" + } + + s""" -- GENERATED BY digdag athena.ctas> operator + | $createTableClause "$table" + | WITH ( + | $propStr + | ) + | AS + | $selectQuery + | $dataHint + """.stripMargin + } } From 258e6b25e043cd964961a7a2e0d6ff6d8747bd09 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Fri, 19 Oct 2018 08:37:52 +0900 Subject: [PATCH 20/37] Rename options: save_mode -> table_mode, query_mode -> save_mode --- README.md | 4 +- .../athena/operator/AthenaCtasOperator.scala | 44 +++++++++---------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index bbc37e6..482235b 100644 --- a/README.md +++ b/README.md @@ -138,11 +138,11 @@ Define the below options on properties (which is 
indicated by `-c`, `--config`). - **bucketed_by**: An array list of buckets to bucket data. If omitted, Athena does not bucket your data in this query. (array of string, optional) - **bucket_count**: The number of buckets for bucketing your data. If omitted, Athena does not bucket your data. (integer, optional) - **additional_properties**: Additional properties for CTAS. These are used for CTAS WITH clause without escaping. (string to string map, optional) -- **save_mode**: Specify the expected behavior of saving CTAS results. Available values are `"default"`, `"empty_table"`, `"data_only"`. See the below explanation of the behaviour. (string, default: `"default"`) +- **table_mode**: Specify the expected behavior of CTAS results. Available values are `"default"`, `"empty"`, `"data_only"`. See the below explanation of the behaviour. (string, default: `"default"`) - `"default"`: Do not do any care. This option require the least IAM privileges for digdag, but the behaviour depends on Athena. - `"empty_table"`: Create a new empty table with the same schema as the select query results. - `"data_only"`: -- **query_mode**: Specify the expected behavior of CTAS query. Available values are `"none"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`) +- **save_mode**: Specify the expected behavior of CTAS. Available values are `"none"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`) - `"none"`: Do not do any care. This option require the least IAM privileges for digdag, but the behaviour depends on Athena. - `"error_if_exists"`: Raise error if the distination table or location exists. - `"ignore"`: Skip CTAS query if the distination table or location exists. 
diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 1c606e7..09d60bb 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -19,38 +19,38 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC def apply(path: String): AmazonS3URI = new AmazonS3URI(path, false) } - sealed abstract class SaveMode + sealed abstract class TableMode - object SaveMode { - final case object Default extends SaveMode - final case object EmptyTable extends SaveMode - final case object DataOnly extends SaveMode + object TableMode { + final case object Default extends TableMode + final case object Empty extends TableMode + final case object DataOnly extends TableMode - def apply(mode: String): SaveMode = { + def apply(mode: String): TableMode = { mode match { case "default" => Default - case "empty_table" => EmptyTable + case "empty" => Empty case "data_only" => DataOnly - case unknown => throw new ConfigException(s"[$operatorName] save_mode '$unknown' is unsupported.") + case unknown => throw new ConfigException(s"[$operatorName] table_mode '$unknown' is unsupported.") } } } - sealed abstract class QueryMode + sealed abstract class SaveMode - object QueryMode { - final case object None extends QueryMode - final case object ErrorIfExists extends QueryMode - final case object Ignore extends QueryMode - final case object Overwrite extends QueryMode + object SaveMode { + final case object None extends SaveMode + final case object ErrorIfExists extends SaveMode + final case object Ignore extends SaveMode + final case object Overwrite extends SaveMode - def apply(mode: String): QueryMode = { + def apply(mode: String): SaveMode = { mode match { case "none" => None case "error_if_exists" => ErrorIfExists 
case "ignore" => Ignore case "overwrite" => Overwrite - case unknown => throw new ConfigException(s"[$operatorName] query_mode '$unknown' is unsupported.") + case unknown => throw new ConfigException(s"[$operatorName] save_mode '$unknown' is unsupported.") } } } @@ -66,8 +66,8 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected val bucketedBy: Seq[String] = params.getListOrEmpty("bucketed_by", classOf[String]).asScala protected val bucketCount: Optional[Int] = params.getOptional("bucket_count", classOf[Int]) protected val additionalProperties: Map[String, String] = params.getMapOrEmpty("additional_properties", classOf[String], classOf[String]).asScala.toMap - protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "default")) - protected val queryMode: QueryMode = QueryMode(params.get("query_mode", classOf[String], "overwrite")) + protected val tableMode: TableMode = TableMode(params.get("table_mode", classOf[String], "default")) + protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite")) protected val tokenPrefix: String = params.get("token_prefix", classOf[String], "digdag-athena-ctas") protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m")) @@ -102,12 +102,12 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC if (additionalProperties.nonEmpty) propsBuilder ++= additionalProperties val propStr: String = propsBuilder.result().map { case (k, v) => s"$k = $v" }.mkString(",\n") - val createTableClause: String = queryMode match { - case QueryMode.Ignore => "CREATE TABLE IF NOT EXISTS" + val createTableClause: String = saveMode match { + case SaveMode.Ignore => "CREATE TABLE IF NOT EXISTS" case _ => "CREATE TABLE" } - val dataHint: String = saveMode match { - case SaveMode.EmptyTable => "WITH NO DATA" + val dataHint: String = tableMode match { + case TableMode.Empty => 
"WITH NO DATA" case _ => "WITH DATA" } From 161c10043cf9d9b38631291830c4d62944940778 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Fri, 19 Oct 2018 08:39:11 +0900 Subject: [PATCH 21/37] Change var name: output -> outputOptional --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 09d60bb..14cacb1 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -58,7 +58,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected val selectQueryOrFile: String = params.get("select_query", classOf[String]) protected val database: Optional[String] = params.getOptional("database", classOf[String]) protected val table: String = params.get("table", classOf[String], s"digdag-athena-ctas-$sessionUuid") - protected val output: Optional[String] = params.getOptional("output", classOf[String]) + protected val outputOptional: Optional[String] = params.getOptional("output", classOf[String]) protected val format: String = params.get("format", classOf[String], "parquet") protected val compression: String = params.get("compression", classOf[String], "snappy") protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String]) @@ -85,7 +85,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected def generateCtasQuery(): String = { val propsBuilder = Map.newBuilder[String, String] - if (output.isPresent) propsBuilder += ("external_location" -> s"'${output.get}'") + if (outputOptional.isPresent) propsBuilder += ("external_location" -> s"'${ outputOptional.get}'") propsBuilder += ("format" -> 
s"'$format'") format match { case "parquet" => propsBuilder += ("parquet_compression" -> s"'$compression'") From 73589bf8c8bf6e40f208c8bdd10e3c88584b1c96 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Fri, 19 Oct 2018 08:40:31 +0900 Subject: [PATCH 22/37] Revert "Change var name: output -> outputOptional" This reverts commit b911e2a --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 14cacb1..09d60bb 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -58,7 +58,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected val selectQueryOrFile: String = params.get("select_query", classOf[String]) protected val database: Optional[String] = params.getOptional("database", classOf[String]) protected val table: String = params.get("table", classOf[String], s"digdag-athena-ctas-$sessionUuid") - protected val outputOptional: Optional[String] = params.getOptional("output", classOf[String]) + protected val output: Optional[String] = params.getOptional("output", classOf[String]) protected val format: String = params.get("format", classOf[String], "parquet") protected val compression: String = params.get("compression", classOf[String], "snappy") protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String]) @@ -85,7 +85,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected def generateCtasQuery(): String = { val propsBuilder = Map.newBuilder[String, String] - if (outputOptional.isPresent) propsBuilder += ("external_location" -> s"'${ outputOptional.get}'") + if 
(output.isPresent) propsBuilder += ("external_location" -> s"'${output.get}'") propsBuilder += ("format" -> s"'$format'") format match { case "parquet" => propsBuilder += ("parquet_compression" -> s"'$compression'") From c32a61b749c40a2334bbcb6cd80bb003e58008eb Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Fri, 19 Oct 2018 09:11:10 +0900 Subject: [PATCH 23/37] Add pre-processing codes --- .../athena/operator/AthenaCtasOperator.scala | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 09d60bb..6ca3f68 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -4,6 +4,7 @@ import java.io.File import java.nio.charset.StandardCharsets.UTF_8 import com.amazonaws.services.s3.AmazonS3URI +import com.amazonaws.services.s3.model.{DeleteObjectsRequest, DeleteObjectsResult} import com.google.common.base.Optional import io.digdag.client.config.{Config, ConfigException} import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} @@ -80,9 +81,31 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC } override def runTask(): TaskResult = { + saveMode match { + case SaveMode.ErrorIfExists if output.isPresent && hasObjects(output.get) => + throw new IllegalStateException(s"${output.get} already exists") + case SaveMode.Ignore if output.isPresent && hasObjects(output.get) => + logger.info(s"${output.get} already exists, so ignore this session.") + return TaskResult.empty(request) + case SaveMode.Overwrite if output.isPresent => + logger.info(s"Overwrite ${output.get}") + rmObjects(output.get) + case _ => // do nothing + } null } + protected def hasObjects(location: String): Boolean = { + val uri: AmazonS3URI = 
AmazonS3URI(location) + !withS3(_.listObjectsV2(uri.getBucket, uri.getKey)).getObjectSummaries.isEmpty + } + + protected def rmObjects(location: String): Unit = { + val uri: AmazonS3URI = AmazonS3URI(location) + val r: DeleteObjectsResult = withS3(_.deleteObjects(new DeleteObjectsRequest(uri.getBucket).withKeys(uri.getKey))) + r.getDeletedObjects.asScala.foreach(o => logger.info(s"Deleted: s3://${uri.getBucket}/${o.getKey}")) + } + protected def generateCtasQuery(): String = { val propsBuilder = Map.newBuilder[String, String] if (output.isPresent) propsBuilder += ("external_location" -> s"'${output.get}'") From afa7e82e1b2775be8232af7529a1544f617c67a7 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 01:06:46 +0900 Subject: [PATCH 24/37] Add DROP TABLE subtask config --- .../athena/operator/AthenaCtasOperator.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 6ca3f68..380b9fb 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -144,4 +144,29 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC | $dataHint """.stripMargin } + + protected def buildDropTableSubTaskConfig(): Config = { + val dropTableQuery: String = + s""" -- GENERATED BY digdag athena.ctas> operator + | DROP TABLE IF EXISTS \"$table\" + | """.stripMargin + + val subTask: Config = cf.create() + + subTask.set("_type", "athena.query") + subTask.set("_command", dropTableQuery) + subTask.set("token_prefix", tokenPrefix) + if (database.isPresent) subTask.set("database", database) + subTask.set("timeout", timeout.toString) + subTask.set("preview", false) + + subTask.set("auth_method", authMethod) + subTask.set("profile_name", 
profileName) + if (profileFile.isPresent) subTask.set("profile_file", profileFile.get()) + subTask.set("use_http_proxy", useHttpProxy) + if (region.isPresent) subTask.set("region", region.get()) + if (endpoint.isPresent) subTask.set("endpoint", endpoint.get()) + + subTask + } } From c30bae1432131b4beacc269f3dbd77f8417d1a4a Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 01:10:27 +0900 Subject: [PATCH 25/37] Add a sign to query --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 380b9fb..60170ea 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -142,7 +142,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC | AS | $selectQuery | $dataHint - """.stripMargin + | """.stripMargin } protected def buildDropTableSubTaskConfig(): Config = { From ede49aa2b6903cb510896ac8591418f632222fb1 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 01:33:01 +0900 Subject: [PATCH 26/37] Simplify query subtask --- .../plugin/athena/operator/AthenaCtasOperator.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 60170ea..98fbdf5 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -145,16 +145,17 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC | """.stripMargin } - 
protected def buildDropTableSubTaskConfig(): Config = { - val dropTableQuery: String = - s""" -- GENERATED BY digdag athena.ctas> operator - | DROP TABLE IF EXISTS \"$table\" - | """.stripMargin + protected def generateDropTableQuery(): String = { + s""" -- GENERATED BY digdag athena.ctas> operator + | DROP TABLE IF EXISTS \"$table\" + | """.stripMargin + } + protected def buildQuerySubTaskConfig(query: String): Config = { val subTask: Config = cf.create() subTask.set("_type", "athena.query") - subTask.set("_command", dropTableQuery) + subTask.set("_command", query) subTask.set("token_prefix", tokenPrefix) if (database.isPresent) subTask.set("database", database) subTask.set("timeout", timeout.toString) From e516efaa976f74d3327102536099786dbc95b6b6 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 01:43:37 +0900 Subject: [PATCH 27/37] Write CTAS workflow --- .../plugin/athena/operator/AthenaCtasOperator.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 98fbdf5..a74bf36 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -7,7 +7,7 @@ import com.amazonaws.services.s3.AmazonS3URI import com.amazonaws.services.s3.model.{DeleteObjectsRequest, DeleteObjectsResult} import com.google.common.base.Optional import io.digdag.client.config.{Config, ConfigException} -import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine} +import io.digdag.spi.{ImmutableTaskResult, OperatorContext, TaskResult, TemplateEngine} import io.digdag.util.DurationParam import scala.collection.JavaConverters._ @@ -92,7 +92,15 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC rmObjects(output.get) case _ 
=> // do nothing } - null + + val subTask: Config = cf.create() + if (saveMode.equals(SaveMode.Overwrite)) subTask.setNested("+drop-before-ctas", buildQuerySubTaskConfig(generateDropTableQuery())) + subTask.setNested("+ctas", buildQuerySubTaskConfig(generateCtasQuery())) + if (tableMode.equals(TableMode.DataOnly)) subTask.setNested("+drop-after-ctas", buildQuerySubTaskConfig(generateDropTableQuery())) + + val builder: ImmutableTaskResult.Builder = TaskResult.builder() + builder.subtaskConfig(subTask) + builder.build() } protected def hasObjects(location: String): Boolean = { From f616b86e17188ec8890ec62a08c491929b8ee9a0 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:16:46 +0900 Subject: [PATCH 28/37] Cannot escape in DROP TABLE Clause --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index a74bf36..69d65a6 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -155,7 +155,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected def generateDropTableQuery(): String = { s""" -- GENERATED BY digdag athena.ctas> operator - | DROP TABLE IF EXISTS \"$table\" + | DROP TABLE IF EXISTS $table | """.stripMargin } From c9acc157084bb2a54c12334f101e4ea93df05e84 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:19:09 +0900 Subject: [PATCH 29/37] Fix Cannot build TaskResult, some of required attributes are not set [exportParams, storeParams, report] --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 69d65a6..2d06d2d 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -98,7 +98,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC subTask.setNested("+ctas", buildQuerySubTaskConfig(generateCtasQuery())) if (tableMode.equals(TableMode.DataOnly)) subTask.setNested("+drop-after-ctas", buildQuerySubTaskConfig(generateDropTableQuery())) - val builder: ImmutableTaskResult.Builder = TaskResult.builder() + val builder: ImmutableTaskResult.Builder = TaskResult.defaultBuilder(cf) builder.subtaskConfig(subTask) builder.build() } From 524d2db0fda69c201695ca9b6e97c2fd9702804a Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:20:12 +0900 Subject: [PATCH 30/37] Collect targets to be removed --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 2d06d2d..5095326 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -110,7 +110,8 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC protected def rmObjects(location: String): Unit = { val uri: AmazonS3URI = AmazonS3URI(location) - val r: DeleteObjectsResult = withS3(_.deleteObjects(new DeleteObjectsRequest(uri.getBucket).withKeys(uri.getKey))) + val keys: Seq[String] = withS3(_.listObjectsV2(uri.getBucket, uri.getKey)).getObjectSummaries.asScala.map(_.getKey) + 
val r: DeleteObjectsResult = withS3(_.deleteObjects(new DeleteObjectsRequest(uri.getBucket).withKeys(keys: _*))) r.getDeletedObjects.asScala.foreach(o => logger.info(s"Deleted: s3://${uri.getBucket}/${o.getKey}")) } From 11e24e45170337f97c6c4e59b51ea1b6274ecdf2 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:20:35 +0900 Subject: [PATCH 31/37] log required query --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 5095326..4eeeb8a 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -161,6 +161,8 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC } protected def buildQuerySubTaskConfig(query: String): Config = { + logger.info(s"Will execute query in athena.query>: $query") + val subTask: Config = cf.create() subTask.set("_type", "athena.query") From 72e8042af6ccaf99242c89872daef2df2ffb4e78 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:21:25 +0900 Subject: [PATCH 32/37] Use athena.query> default output value for in athena.ctas> --- .../digdag/plugin/athena/operator/AthenaCtasOperator.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala index 4eeeb8a..f6679db 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaCtasOperator.scala @@ -171,6 +171,7 @@ class AthenaCtasOperator(operatorName: String, context: OperatorContext, systemC if 
(database.isPresent) subTask.set("database", database) subTask.set("timeout", timeout.toString) subTask.set("preview", false) + subTask.set("output", Optional.absent()) subTask.set("auth_method", authMethod) subTask.set("profile_name", profileName) From 04b13e1f457870431b00613a5504ce72dbe081ae Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:22:02 +0900 Subject: [PATCH 33/37] Add new examples for athena.ctas> --- example/example.dig | 11 ++++++++--- example/run.sh | 11 ++++++++--- example/template.sql | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/example/example.dig b/example/example.dig index d9b36a9..adc23e9 100644 --- a/example/example.dig +++ b/example/example.dig @@ -7,9 +7,7 @@ _export: - pro.civitaspo:digdag-operator-athena:0.0.6 athena: auth_method: profile - query: - output: ${output} - value: 5 + value: 5 +step1: athena.query>: template.sql @@ -17,3 +15,10 @@ _export: +step2: echo>: ${athena} ++stap3: + athena.ctas>: + select_query: template.sql + database: ${database} + table: hoge + output: ${output} + diff --git a/example/run.sh b/example/run.sh index 3c0b2eb..d1c7ba4 100755 --- a/example/run.sh +++ b/example/run.sh @@ -3,10 +3,15 @@ ROOT=$(cd $(dirname $0)/..; pwd) EXAMPLE_ROOT=$ROOT/example LOCAL_MAVEN_REPO=$ROOT/build/repo -OUTPUT="$1" +DATABASE="$1" +OUTPUT="$2" +if [ -z "$DATABASE" ]; then + echo "[ERROR] Set database as the first argument." + exit 1 +fi if [ -z "$OUTPUT" ]; then - echo "[ERROR] Set output s3 URI as the first argument." + echo "[ERROR] Set output s3 URI as the second argument." 
exit 1 fi @@ -17,5 +22,5 @@ fi rm -rfv .digdag ## run - digdag run example.dig -c allow.properties -p repos=${LOCAL_MAVEN_REPO} -p output=${OUTPUT} --no-save + digdag run example.dig -c allow.properties -p repos=${LOCAL_MAVEN_REPO} -p output=${OUTPUT} -p database=${DATABASE} --no-save ) diff --git a/example/template.sql b/example/template.sql index e753441..e2de457 100644 --- a/example/template.sql +++ b/example/template.sql @@ -1 +1 @@ -select ${value} +select ${value} as a From a7235c8ace953dc4713dfa106457622328d5d33e Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:29:46 +0900 Subject: [PATCH 34/37] Do not use query output location for log to avoid confusing --- .../digdag/plugin/athena/operator/AthenaQueryOperator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala index 7a170e2..1e4b6ca 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/operator/AthenaQueryOperator.scala @@ -112,7 +112,7 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system val execId: String = startQueryExecution val lastQuery: LastQuery = pollingQueryExecution(execId) - logger.info(s"[$operatorName] Created ${lastQuery.output} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)") + logger.info(s"[$operatorName] Executed ${lastQuery.id} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)") val p: Config = buildLastQueryParam(lastQuery) val builder = TaskResult.defaultBuilder(request) From 6e532aa01e4c8de9ffd7386d8da695b04d1c76c0 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:43:58 +0900 Subject: [PATCH 35/37] ./gradlew spotlessapply --- 
.../pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala index e093b8b..386302e 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala @@ -6,12 +6,7 @@ import java.util.{Arrays => JArrays, List => JList} import io.digdag.client.config.Config import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} import javax.inject.Inject -import pro.civitaspo.digdag.plugin.athena.operator.{ - AbstractAthenaOperator, - AthenaCtasOperator, - AthenaPreviewOperator, - AthenaQueryOperator, -} +import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaCtasOperator, AthenaPreviewOperator, AthenaQueryOperator} object AthenaPlugin { From d36d69bdf302243db27d13cf2dbaa81cfc414dad Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:56:27 +0900 Subject: [PATCH 36/37] Fix usage on README --- README.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 482235b..fa160c8 100644 --- a/README.md +++ b/README.md @@ -15,17 +15,21 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.0.6 + - pro.civitaspo:digdag-operator-athena:0.1.0 athena: auth_method: profile +step1: athena.query>: template.sql - output: s3://mybucket/prefix/ +step2: echo>: ${athena.last_query} ++step3: + athena.ctas>: + select_query: template.sql + table: hoge + output: s3://mybucket/prefix/ ``` # Configuration From 4c265dfc69b33dbd47771159277264deafc75a02 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Sat, 20 Oct 2018 05:56:36 +0900 Subject: [PATCH 37/37] Ship v0.1.0 --- CHANGELOG.md | 8 ++++++++ build.gradle | 2 +- 
example/example.dig | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a9a08cb..f96d9d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +0.1.0 (2018-10-20) +================== + +* [New Feature] Add `athena.ctas>` operator +* [Breaking Change] Remove the **keep_metadata** and **save_mode** options from the `athena.query>` operator. +* [Change] Make the **output** option of `athena.query>` optional (no longer required). +* [Deprecated] Deprecate the **output** option of `athena.query>`. + 0.0.6 (2018-10-14) ================== diff --git a/build.gradle b/build.gradle index 3c061ca..af824f5 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.0.6' +version = '0.1.0' def digdagVersion = '0.9.27' def awsSdkVersion = "1.11.372" diff --git a/example/example.dig b/example/example.dig index adc23e9..bce25d4 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-athena:0.0.6 + - pro.civitaspo:digdag-operator-athena:0.1.0 athena: auth_method: profile value: 5