Skip to content

Commit

Permalink
Merge pull request #16 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.0.3
  • Loading branch information
civitaspo authored Aug 12, 2018
2 parents a4cf780 + 1333b9a commit e35e83c
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 36 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
0.0.3 (2018-08-13)
==================

* [New Feature / Destructive Change] Add `keep_metadata` option to indicate whether metadata file is expected to be kept on S3. This opiton is `false` by default, so the behaviour is changed when you use the same configuration as the prior version than this version. Please be careful.
* [New Feature / Destructive Change] Add `save_mode` option to specify the expected behavior of saving the query results. This option is `"overwrite"` by default, so the behaviour is changed when you use the same configuration as the prior version than this version. Please be careful.

0.0.2 (2018-08-09)
==================

Expand Down
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.0.2
- pro.civitaspo:digdag-operator-athena:0.0.3
athena:
auth_method: profile

Expand Down Expand Up @@ -83,7 +83,13 @@ Define the below options on properties (which is indicated by `-c`, `--config`).
- **athena.query>**: The SQL query statements or file to be executed. You can use digdag's template engine like `${...}` in the SQL query. (string, required)
- **token_prefix**: Prefix for `ClientRequestToken` that a unique case-sensitive string used to ensure the request to create the query is idempotent (executes only once). On this plugin, the token is composed like `${token_prefix}-${session_uuid}-${hash value of query}`. (string, default: `"digdag-athena"`)
- **database**: The name of the database. (string, optional)
- **output**: The location in Amazon S3 where your query results are stored, such as s3://path/to/query/bucket/. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required)
- **output**: The location in Amazon S3 where your query results are stored, such as `"s3://path/to/query/"`. For more information, see [Queries and Query Result Files](https://docs.aws.amazon.com/athena/latest/ug/querying.html). (string, required)
- **keep_metadata**: Indicate whether to keep the metadata after executing the query. (boolean, default: `false`)
- **save_mode**: Specify the expected behavior of saving the query results. Available values are `"append"`, `"error_if_exists"`, `"ignore"`, `"overwrite"`. See the below explanation of the behaviour. (string, default: `"overwrite"`)
- `"append"`: When saving the query results, even if other CSVs already exist, the query results are expected to be saved as another CSV.
- `"error_if_exists"`: When saving the query results, if other CSVs already exists, an exception is expected to be thrown.
- `"ignore"`: When saving the query results, if other CSVs already exists, the save operation is expected to not save the query results and to not change the existing data.
- `"overwrite"`: When saving the query results, if other CSVs already exist, existing data is expected to be overwritten by the query results. This operation is not atomic.
- **timeout**: Specify timeout period. (`DurationParam`, default: `"10m"`)

### Output Parameters
Expand Down
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.0.2'
version = '0.0.3'

def digdagVersion = '0.9.27'
def awsSdkVersion = "1.11.372"
Expand All @@ -27,8 +27,10 @@ dependencies {

// https://mvnrepository.com/artifact/org.scala-lang/scala-library
compile group: 'org.scala-lang', name: 'scala-library', version: scalaSemanticVersion
// https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-emr
// https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-athena
compile group: 'com.amazonaws', name: 'aws-java-sdk-athena', version: awsSdkVersion
// https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-s3
compile group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: awsSdkVersion
// https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sts
compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: awsSdkVersion
}
Expand Down
2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-athena:0.0.2
- pro.civitaspo:digdag-operator-athena:0.0.3
athena:
auth_method: profile
query:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import com.amazonaws.auth.profile.{ProfileCredentialsProvider, ProfilesConfigFil
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.regions.{DefaultAwsRegionProviderChain, Regions}
import com.amazonaws.services.athena.{AmazonAthena, AmazonAthenaClientBuilder}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.AssumeRoleRequest
import com.google.common.base.Optional
Expand Down Expand Up @@ -70,6 +71,12 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon
finally athena.shutdown()
}

protected def withS3[T](f: AmazonS3 => T): T = {
val s3 = buildS3
try f(s3)
finally s3.shutdown()
}

private def buildAthena: AmazonAthena = {
val builder = AmazonAthenaClientBuilder
.standard()
Expand All @@ -92,6 +99,28 @@ abstract class AbstractAthenaOperator(operatorName: String, context: OperatorCon
builder.build()
}

private def buildS3: AmazonS3 = {
val builder = AmazonS3ClientBuilder
.standard()
.withClientConfiguration(clientConfiguration)
.withCredentials(credentialsProvider)

if (region.isPresent && endpoint.isPresent) {
val ec = new EndpointConfiguration(endpoint.get(), region.get())
builder.setEndpointConfiguration(ec)
}
else if (region.isPresent && !endpoint.isPresent) {
builder.setRegion(region.get())
}
else if (!region.isPresent && endpoint.isPresent) {
val r = Try(new DefaultAwsRegionProviderChain().getRegion).getOrElse(Regions.DEFAULT_REGION.getName)
val ec = new EndpointConfiguration(endpoint.get(), r)
builder.setEndpointConfiguration(ec)
}

builder.build()
}

private def credentialsProvider: AWSCredentialsProvider = {
if (!roleArn.isPresent) return standardCredentialsProvider
assumeRoleCredentialsProvider(standardCredentialsProvider)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,83 @@ import com.amazonaws.services.athena.model.{
StartQueryExecutionRequest
}
import com.amazonaws.services.athena.model.QueryExecutionState.{CANCELLED, FAILED, QUEUED, RUNNING, SUCCEEDED}
import com.amazonaws.services.s3.AmazonS3URI
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.google.common.base.Optional
import com.google.common.collect.ImmutableList
import io.digdag.client.config.{Config, ConfigKey}
import io.digdag.client.config.{Config, ConfigException, ConfigKey}
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
import io.digdag.util.DurationParam
import pro.civitaspo.digdag.plugin.athena.wrapper.{NotRetryableException, ParamInGiveup, ParamInRetry, RetryableException, RetryExecutorWrapper}

import scala.collection.JavaConverters._
import scala.util.Try
import scala.util.hashing.MurmurHash3

class AthenaQueryOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine) {

sealed abstract class SaveMode

object SaveMode {
final case object Append extends SaveMode
final case object ErrorIfExists extends SaveMode
final case object Ignore extends SaveMode
final case object Overwrite extends SaveMode

def apply(mode: String): SaveMode = {
mode match {
case "append" => Append
case "error_if_exists" => ErrorIfExists
case "ignore" => Ignore
case "overwrite" => Overwrite
case unknown => throw new ConfigException(s"[$operatorName] save mode '$unknown' is unsupported.")
}
}
}

case class LastQuery(
id: String,
database: Option[String] = None,
query: String,
outputCsvUri: AmazonS3URI,
outputCsvMetadataUri: AmazonS3URI,
scanBytes: Option[Long] = None,
execMillis: Option[Long] = None,
state: QueryExecutionState,
stateChangeReason: Option[String] = None,
submittedAt: Option[Long] = None,
completedAt: Option[Long] = None
)

object LastQuery {

def apply(qe: QueryExecution): LastQuery = {
new LastQuery(
id = qe.getQueryExecutionId,
database = Try(Option(qe.getQueryExecutionContext.getDatabase)).getOrElse(None),
query = qe.getQuery,
outputCsvUri = new AmazonS3URI(qe.getResultConfiguration.getOutputLocation, false),
outputCsvMetadataUri = new AmazonS3URI(s"${qe.getResultConfiguration.getOutputLocation}.metadata", false),
scanBytes = Try(Option(qe.getStatistics.getDataScannedInBytes.toLong)).getOrElse(None),
execMillis = Try(Option(qe.getStatistics.getEngineExecutionTimeInMillis.toLong)).getOrElse(None),
state = QueryExecutionState.fromValue(qe.getStatus.getState),
stateChangeReason = Try(Option(qe.getStatus.getStateChangeReason)).getOrElse(None),
submittedAt = Try(Option(qe.getStatus.getSubmissionDateTime.getTime / 1000)).getOrElse(None),
completedAt = Try(Option(qe.getStatus.getCompletionDateTime.getTime / 1000)).getOrElse(None)
)
}
}

protected val queryOrFile: String = params.get("_command", classOf[String])
protected val tokenPrefix: String = params.get("token_prefix", classOf[String], "digdag-athena")
protected val database: Optional[String] = params.getOptional("database", classOf[String])
protected val output: String = params.get("output", classOf[String])
protected val output: AmazonS3URI = {
val o = params.get("output", classOf[String])
new AmazonS3URI(if (o.endsWith("/")) o else s"$o/")
}
protected val keepMetadata: Boolean = params.get("keep_metadata", classOf[Boolean], false)
protected val saveMode: SaveMode = SaveMode(params.get("save_mode", classOf[String], "overwrite"))
protected val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("10m"))

protected lazy val query: String = {
Expand All @@ -45,33 +105,64 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system
}

override def runTask(): TaskResult = {
val execId: String = withAthena { athena =>
val req = buildStartQueryExecutionRequest
val r = athena.startQueryExecution(req)
r.getQueryExecutionId
saveMode match {
case SaveMode.ErrorIfExists =>
val result = withS3(_.listObjectsV2(output.getBucket, output.getKey))
if (!result.getObjectSummaries.isEmpty) throw new IllegalStateException(s"[$operatorName] some objects already exists in `$output`.")
case SaveMode.Ignore =>
val result = withS3(_.listObjectsV2(output.getBucket, output.getKey))
if (!result.getObjectSummaries.isEmpty) {
logger.info(s"[$operatorName] some objects already exists in $output so do nothing in this session: `$sessionUuid`.")
val builder = TaskResult.defaultBuilder(request)
builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query")))
return builder.build()
}
case _ => // do nothing
}

val execId: String = startQueryExecution
val lastQuery: LastQuery = pollingQueryExecution(execId)

if (saveMode.equals(SaveMode.Overwrite)) {
withS3(_.listObjectsV2(output.getBucket, output.getKey)).getObjectSummaries.asScala
.filterNot(_.getKey.startsWith(lastQuery.outputCsvUri.getKey)) // filter output.csv and output.csv.metadata
.foreach { summary: S3ObjectSummary =>
logger.info(s"[$operatorName] Delete s3://${summary.getBucketName}/${summary.getKey}")
withS3(_.deleteObject(summary.getBucketName, summary.getKey))
}
}
if (!keepMetadata) {
logger.info(s"[$operatorName] Delete ${lastQuery.outputCsvMetadataUri.toString}.")
withS3(_.deleteObject(lastQuery.outputCsvMetadataUri.getBucket, lastQuery.outputCsvMetadataUri.getKey))
}

val qe: QueryExecution = pollingQueryExecution(execId)
val p: Config = buildStoredParamFromQueryExecution(qe)
logger.info(s"[$operatorName] Created ${lastQuery.outputCsvUri} (scan: ${lastQuery.scanBytes.orNull} bytes, time: ${lastQuery.execMillis.orNull}ms)")
val p: Config = buildLastQueryParam(lastQuery)

val builder = TaskResult.defaultBuilder(request)
builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_query")))
builder.storeParams(p)
builder.build()
}

def buildStartQueryExecutionRequest: StartQueryExecutionRequest = {
protected def startQueryExecution: String = {
val req = buildStartQueryExecutionRequest
val r = withAthena(_.startQueryExecution(req))
r.getQueryExecutionId
}

protected def buildStartQueryExecutionRequest: StartQueryExecutionRequest = {
val req = new StartQueryExecutionRequest()

req.setClientRequestToken(clientRequestToken)
if (database.isPresent) req.setQueryExecutionContext(new QueryExecutionContext().withDatabase(database.get()))
req.setQueryString(query)
req.setResultConfiguration(new ResultConfiguration().withOutputLocation(output))
req.setResultConfiguration(new ResultConfiguration().withOutputLocation(output.toString))

req
}

def pollingQueryExecution(execId: String): QueryExecution = {
protected def pollingQueryExecution(execId: String): LastQuery = {
val req = new GetQueryExecutionRequest().withQueryExecutionId(execId)

RetryExecutorWrapper()
Expand All @@ -95,14 +186,12 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system
}
.runInterruptible {
val r = withAthena(_.getQueryExecution(req))
val qe = r.getQueryExecution
val status = Option(qe.getStatus).getOrElse(throw new RetryableException("status is null"))
val stateStr = Option(status.getState).getOrElse(throw new RetryableException("state is null"))
val lastQuery = LastQuery(r.getQueryExecution)

QueryExecutionState.fromValue(stateStr) match {
lastQuery.state match {
case SUCCEEDED =>
logger.info(s"[$operatorName] query is `$SUCCEEDED`")
qe
lastQuery
case FAILED => throw new NotRetryableException(message = s"[$operatorName] query is `$FAILED`")
case CANCELLED => throw new NotRetryableException(message = s"[$operatorName] query is `$CANCELLED`")
case RUNNING => throw new RetryableException(message = s"query is `$RUNNING`")
Expand All @@ -111,20 +200,20 @@ class AthenaQueryOperator(operatorName: String, context: OperatorContext, system
}
}

def buildStoredParamFromQueryExecution(qe: QueryExecution): Config = {
protected def buildLastQueryParam(lastQuery: LastQuery): Config = {
val ret = cf.create()
val lastQueryParam = ret.getNestedOrSetEmpty("athena").getNestedOrSetEmpty("last_query")

lastQueryParam.set("id", qe.getQueryExecutionId)
lastQueryParam.set("database", Try(Option(qe.getQueryExecutionContext.getDatabase)).getOrElse(None).getOrElse(Optional.absent()))
lastQueryParam.set("query", qe.getQuery)
lastQueryParam.set("output", qe.getResultConfiguration.getOutputLocation)
lastQueryParam.set("scan_bytes", Try(Option(qe.getStatistics.getDataScannedInBytes)).getOrElse(None).getOrElse(Optional.absent()))
lastQueryParam.set("exec_millis", Try(Option(qe.getStatistics.getEngineExecutionTimeInMillis)).getOrElse(None).getOrElse(Optional.absent()))
lastQueryParam.set("state", Try(Option(qe.getStatus.getState)).getOrElse(None).getOrElse(Optional.absent()))
lastQueryParam.set("state_change_reason", Try(Option(qe.getStatus.getStateChangeReason)).getOrElse(None).getOrElse(Optional.absent()))
lastQueryParam.set("submitted_at", Try(Option(qe.getStatus.getSubmissionDateTime.getTime / 1000)).getOrElse(None).getOrElse(Optional.absent()))
lastQueryParam.set("completed_at", Try(Option(qe.getStatus.getCompletionDateTime.getTime / 1000)).getOrElse(None).getOrElse(Optional.absent()))
lastQueryParam.set("id", lastQuery.id)
lastQueryParam.set("database", lastQuery.database.getOrElse(Optional.absent()))
lastQueryParam.set("query", lastQuery.query)
lastQueryParam.set("output", lastQuery.outputCsvUri.toString)
lastQueryParam.set("scan_bytes", lastQuery.scanBytes.getOrElse(Optional.absent()))
lastQueryParam.set("exec_millis", lastQuery.execMillis.getOrElse(Optional.absent()))
lastQueryParam.set("state", lastQuery.state)
lastQueryParam.set("state_change_reason", lastQuery.stateChangeReason.getOrElse(Optional.absent()))
lastQueryParam.set("submitted_at", lastQuery.submittedAt.getOrElse(Optional.absent()))
lastQueryParam.set("completed_at", lastQuery.completedAt.getOrElse(Optional.absent()))

ret
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class RetryExecutorWrapper(exe: RetryExecutor, param: ParamInWrapper) {
RetryExecutorWrapper(exe.retryIf(r), param)
}

def onRetry(f: ParamInRetry => Unit): RetryExecutorWrapper = {
def onRetry(f: ParamInRetry => Unit = _ => ()): RetryExecutorWrapper = {
val r = new RetryAction {
override def onRetry(exception: Exception, retryCount: Int, retryLimit: Int, retryWait: Int): Unit = {
val totalWaitMillis: Int = param.totalWaitMillisCounter.next()
Expand Down Expand Up @@ -94,9 +94,7 @@ class RetryExecutorWrapper(exe: RetryExecutor, param: ParamInWrapper) {
}

private def executeWithWrappedRetryExecutorWrapper[T](f: RetryExecutorWrapper => T): T = {
val wrapped = if (!param.hasOnRetry) onRetry { _ =>
}
else this
val wrapped = if (!param.hasOnRetry) onRetry() else this
try f(wrapped)
catch {
case ex: RetryGiveupException => throw new NotRetryableException(cause = ex)
Expand Down

0 comments on commit e35e83c

Please sign in to comment.