Merge pull request #29 from civitaspo/develop
v0.1.0
civitaspo authored Oct 19, 2018
2 parents f5fcac1 + 60cbfb4 commit e28fca2
Showing 11 changed files with 310 additions and 171 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,11 @@
0.1.0 (2018-10-20)
==================

* [New Feature] Add `athena.ctas>` operator
* [Breaking Change] Remove the **keep_metadata** and **save_mode** options from the `athena.query>` operator.
* [Change] Make the **output** option in `athena.query>` optional.
* [Deprecated] Deprecate the **output** option in `athena.query>`.

0.0.6 (2018-10-14)
==================

51 changes: 38 additions & 13 deletions README.md
@@ -15,17 +15,21 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.0.6
- pro.civitaspo:digdag-operator-athena:0.1.0
athena:
auth_method: profile

+step1:
athena.query>: template.sql
output: s3://mybucket/prefix/

+step2:
echo>: ${athena.last_query}

+step3:
athena.ctas>:
select_query: template.sql
table: hoge
output: s3://mybucket/prefix/
```
# Configuration
@@ -83,19 +87,9 @@ Define the below options on properties (which is indicated by `-c`, `--config`).
- **athena.query>**: The SQL query statements or file to be executed. You can use digdag's template engine like `${...}` in the SQL query. (string, required)
- **token_prefix**: Prefix for `ClientRequestToken`, a unique case-sensitive string used to ensure that the request to create the query is idempotent (executes only once). In this plugin, the token is composed as `${token_prefix}-${session_uuid}-${hash value of query}`. (string, default: `"digdag-athena"`)
- **database**: The name of the database. (string, optional)
- **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required)
- **keep_metadata**: Indicate whether to keep the metadata after executing the query. (boolean, default: `false`)
- **NOTE**: If **keep_metadata** is false, the `athena.preview>` operator can be used only at this point (immediately after the query), because the Athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires the metadata.
- **NOTE**: [Athena supports CTAS](https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/), so digdag-operator-athena will support it as the `athena.ctas>` operator. After that, the **keep_metadata** option will be removed and the default behaviour will become the same as `keep_metadata: true` (the current default behaviour is the same as `keep_metadata: false`), because this option was added only because the metadata file gets in the way when the output CSV is used as another table.
- **save_mode**: Specify the expected behavior of saving the query results. Available values are `"append"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the explanation of each behaviour below. (string, default: `"overwrite"`)
- `"append"`: When saving the query results, even if other CSVs already exist, the query results are expected to be saved as another CSV.
- `"error_if_exists"`: When saving the query results, if other CSVs already exist, an exception is expected to be thrown.
- `"ignore"`: When saving the query results, if other CSVs already exist, the save operation is expected to neither save the query results nor change the existing data.
- `"overwrite"`: When saving the query results, if other CSVs already exist, existing data is expected to be overwritten by the query results. This operation is not atomic.
- **NOTE**: [Athena supports CTAS](https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/), so digdag-operator-athena will support it as the `athena.ctas>` operator. After that, the **save_mode** option will be removed and the behaviour will become the same as `save_mode: append` (the current default behaviour is the same as `save_mode: overwrite`), because this option was added only because the many duplicated output CSV files created by other executions sometimes get in the way when the output CSV is used as another table.
- **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-<AWS_REGION>"`)
- **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`)
- **preview**: Call the `athena.preview>` operator after running `athena.query>`. (boolean, default: `true`)
- **NOTE**: If **keep_metadata** is false, the `athena.preview>` operator can be used only at this point (immediately after the query), because the Athena [`GetQueryResults API`](https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html) requires the metadata.
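
As a rough illustration of the `athena.query>` options in their 0.1.0 form, a task that relies on the new default **output** location might look like the sketch below. The task name `+query_with_defaults` and the database name `my_database` are placeholders and do not come from this repository; `template.sql` is the file used in the example workflow.

```yaml
+query_with_defaults:
  athena.query>: template.sql
  database: my_database          # placeholder database name
  token_prefix: digdag-athena    # the documented default, written out explicitly
  timeout: 5m                    # shorter than the documented default of 10m
  preview: false                 # skip the follow-up athena.preview> call
```

Since **output** is omitted here, the results are expected to go to the documented default location rather than to a bucket chosen in the workflow.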

### Output Parameters

@@ -133,6 +127,37 @@ Define the below options on properties (which is indicated by `-c`, `--config`).
- **type**: The data type of the column. (string)
- **athena.last_preview.rows**: The rows in the preview results. (array of array)

## Configuration for `athena.ctas>` operator

### Options

- **select_query**: The SELECT statement, or a file containing it, that is executed to create a new table by [`Create Table As Select`](https://aws.amazon.com/jp/about-aws/whats-new/2018/10/athena_ctas_support/). You can use digdag's template engine like `${...}` in the SQL query. (string, required)
- **database**: The database name for query execution context. (string, optional)
- **table**: The table name for the new table (string, default: `digdag-athena-ctas-${session_uuid}`)
- **output**: Output location for data created by CTAS (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-<AWS_REGION>/Unsaved/${YEAR}/${MONTH}/${DAY}/${athena_query_id}/"`)
- **format**: The data format for the CTAS query results, such as `"orc"`, `"parquet"`, `"avro"`, `"json"`, or `"textfile"`. (string, default: `"parquet"`)
- **compression**: The compression type to use for `"orc"` or `"parquet"`. (string, default: `"snappy"`)
- **field_delimiter**: The field delimiter for files in CSV, TSV, and text files. This option is applied only when **format** is specific to text-based data storage formats. (string, optional)
- **partitioned_by**: An array list of columns by which the CTAS table will be partitioned. Verify that the names of partitioned columns are listed last in the list of columns in the SELECT statement. (array of string, optional)
- **bucketed_by**: An array list of buckets to bucket data. If omitted, Athena does not bucket your data in this query. (array of string, optional)
- **bucket_count**: The number of buckets for bucketing your data. If omitted, Athena does not bucket your data. (integer, optional)
- **additional_properties**: Additional properties for CTAS. These are passed to the CTAS `WITH` clause without escaping. (string to string map, optional)
- **table_mode**: Specify the expected behavior of CTAS results. Available values are `"default"`, `"empty_table"`, `"data_only"`. See the explanation of each behaviour below. (string, default: `"default"`)
- `"default"`: Take no special care. This option requires the least IAM privileges for digdag, but the behaviour depends on Athena.
- `"empty_table"`: Create a new empty table with the same schema as the select query results.
- `"data_only"`:
- **save_mode**: Specify the expected behavior of CTAS. Available values are `"none"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the explanation of each behaviour below. (string, default: `"overwrite"`)
- `"none"`: Take no special care. This option requires the least IAM privileges for digdag, but the behaviour depends on Athena.
- `"error_if_exists"`: Raise an error if the destination table or location exists.
- `"ignore"`: Skip the CTAS query if the destination table or location exists.
- `"overwrite"`: Drop the destination table and remove objects before executing CTAS. This operation is not atomic.
- **token_prefix**: Prefix for `ClientRequestToken`, a unique case-sensitive string used to ensure that the request to create the query is idempotent (executes only once). In this plugin, the token is composed as `${token_prefix}-${session_uuid}-${hash value of query}`. (string, default: `"digdag-athena-ctas"`)
- **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`)
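
To make the options above more concrete, here is a minimal sketch of a partitioned CTAS task. It assumes a hypothetical query file `daily_summary.sql` whose SELECT lists the `dt` column last (as **partitioned_by** requires); the task name, database, table, and the `daily_summary/` suffix under the README's example bucket are likewise placeholders.

```yaml
+create_partitioned_table:
  athena.ctas>:
  select_query: daily_summary.sql    # hypothetical file; its SELECT must list `dt` last
  database: my_database              # placeholder database name
  table: daily_summary               # placeholder table name
  output: s3://mybucket/prefix/daily_summary/
  format: parquet                    # documented default, written out explicitly
  compression: snappy                # documented default, written out explicitly
  partitioned_by: [dt]
  save_mode: error_if_exists         # fail instead of dropping an existing table
  timeout: 30m
```

Choosing `error_if_exists` over the default `overwrite` avoids the non-atomic drop-and-remove step, at the cost of failing when the table or location is already present.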

### Output Parameters

Nothing

# Development

## Run an Example
2 changes: 1 addition & 1 deletion build.gradle
@@ -6,7 +6,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.0.6'
version = '0.1.0'

def digdagVersion = '0.9.27'
def awsSdkVersion = "1.11.372"
13 changes: 9 additions & 4 deletions example/example.dig
@@ -4,16 +4,21 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.0.6
- pro.civitaspo:digdag-operator-athena:0.1.0
athena:
auth_method: profile
query:
output: ${output}
value: 5
value: 5

+step1:
athena.query>: template.sql

+step2:
echo>: ${athena}

+step3:
athena.ctas>:
select_query: template.sql
database: ${database}
table: hoge
output: ${output}

11 changes: 8 additions & 3 deletions example/run.sh
@@ -3,10 +3,15 @@
ROOT=$(cd $(dirname $0)/..; pwd)
EXAMPLE_ROOT=$ROOT/example
LOCAL_MAVEN_REPO=$ROOT/build/repo
OUTPUT="$1"
DATABASE="$1"
OUTPUT="$2"

if [ -z "$DATABASE" ]; then
echo "[ERROR] Set database as the first argument."
exit 1
fi
if [ -z "$OUTPUT" ]; then
echo "[ERROR] Set output s3 URI as the first argument."
echo "[ERROR] Set output s3 URI as the second argument."
exit 1
fi

@@ -17,5 +22,5 @@ fi
rm -rfv .digdag

## run
digdag run example.dig -c allow.properties -p repos=${LOCAL_MAVEN_REPO} -p output=${OUTPUT} --no-save
digdag run example.dig -c allow.properties -p repos=${LOCAL_MAVEN_REPO} -p output=${OUTPUT} -p database=${DATABASE} --no-save
)
2 changes: 1 addition & 1 deletion example/template.sql
@@ -1 +1 @@
select ${value}
select ${value} as a
@@ -1,12 +1,12 @@
package pro.civitaspo.digdag.plugin.athena

import java.util.{Arrays => JArrays, List => JList}
import java.lang.reflect.Constructor
import java.util.{Arrays => JArrays, List => JList}

import io.digdag.client.config.Config
import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine}
import javax.inject.Inject
import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaPreviewOperator, AthenaQueryOperator, AthenaRemoveMetadataOperator}
import pro.civitaspo.digdag.plugin.athena.operator.{AbstractAthenaOperator, AthenaCtasOperator, AthenaPreviewOperator, AthenaQueryOperator}

object AthenaPlugin {

@@ -17,9 +17,9 @@ object AthenaPlugin {

override def get(): JList[OperatorFactory] = {
JArrays.asList(
operatorFactory("athena.ctas", classOf[AthenaCtasOperator]),
operatorFactory("athena.query", classOf[AthenaQueryOperator]),
operatorFactory("athena.preview", classOf[AthenaPreviewOperator]),
operatorFactory("athena.remove_metadata", classOf[AthenaRemoveMetadataOperator])
operatorFactory("athena.preview", classOf[AthenaPreviewOperator])
)
}

@@ -13,11 +13,12 @@ import com.amazonaws.auth.{
SystemPropertiesCredentialsProvider
}
import com.amazonaws.auth.profile.{ProfileCredentialsProvider, ProfilesConfigFile}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions}
import com.amazonaws.services.athena.{AmazonAthena, AmazonAthenaClientBuilder}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.{AWSSecurityTokenService, AWSSecurityTokenServiceClientBuilder}
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest
import com.google.common.base.Optional
import io.digdag.client.config.{Config, ConfigException, ConfigFactory}
@@ -30,7 +31,7 @@ import scala.util.Try
abstract class AbstractAthenaOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
extends BaseOperator(context) {

protected val logger: Logger = LoggerFactory.getLogger(this.getClass)
protected val logger: Logger = LoggerFactory.getLogger(operatorName)
protected val cf: ConfigFactory = request.getConfig.getFactory
protected val params: Config = {
val elems: Seq[String] = operatorName.split("\\.")
@@ -77,34 +78,34 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon
finally s3.shutdown()
}

protected def withSts[T](f: AWSSecurityTokenService => T): T = {
val sts = buildSts
try f(sts)
finally sts.shutdown()
}

private def buildAthena: AmazonAthena = {
val builder = AmazonAthenaClientBuilder
.standard()
configureBuilderEndpointConfiguration(AmazonAthenaClientBuilder.standard())
.withClientConfiguration(clientConfiguration)
.withCredentials(credentialsProvider)

if (region.isPresent && endpoint.isPresent) {
val ec = new EndpointConfiguration(endpoint.get(), region.get())
builder.setEndpointConfiguration(ec)
}
else if (region.isPresent && !endpoint.isPresent) {
builder.setRegion(region.get())
}
else if (!region.isPresent && endpoint.isPresent) {
val r = Try(new DefaultAwsRegionProviderChain().getRegion).getOrElse(Regions.DEFAULT_REGION.getName)
val ec = new EndpointConfiguration(endpoint.get(), r)
builder.setEndpointConfiguration(ec)
}

builder.build()
.build()
}

private def buildS3: AmazonS3 = {
val builder = AmazonS3ClientBuilder
.standard()
configureBuilderEndpointConfiguration(AmazonS3ClientBuilder.standard())
.withClientConfiguration(clientConfiguration)
.withCredentials(credentialsProvider)
.build()
}

private def buildSts: AWSSecurityTokenService = {
configureBuilderEndpointConfiguration(AWSSecurityTokenServiceClientBuilder.standard())
.withClientConfiguration(clientConfiguration)
.withCredentials(credentialsProvider)
.build()
}

private def configureBuilderEndpointConfiguration[S <: AwsClientBuilder[S, T], T](builder: AwsClientBuilder[S, T]): AwsClientBuilder[S, T] = {
if (region.isPresent && endpoint.isPresent) {
val ec = new EndpointConfiguration(endpoint.get(), region.get())
builder.setEndpointConfiguration(ec)
@@ -117,8 +118,7 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon
val ec = new EndpointConfiguration(endpoint.get(), r)
builder.setEndpointConfiguration(ec)
}

builder.build()
builder
}

private def credentialsProvider: AWSCredentialsProvider = {