diff --git a/CHANGELOG.md b/CHANGELOG.md index b094b4f..71721d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +0.0.7 (2018-11-19) +================== + +* [Enhancement] Add examples for scripting operators and update `ecs_task.run>` example. +* [Enhancement] Always normalize ECS Task family name. +* [Enhancement] Update aws-java-sdk 1.11.433 -> 1.11.451 +* [Enhancement] Add new options (`secrets`, `tags`) that follow ECS new release. `ipc_mode` and `pid_mode` are not supported yet because aws-java-sdk does not supports them. + 0.0.6 (2018-11-13) ================== diff --git a/README.md b/README.md index 3e697b7..41ca36a 100644 --- a/README.md +++ b/README.md @@ -15,34 +15,129 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.6 + - pro.civitaspo:digdag-operator-ecs_task:0.0.7 ecs_task: auth_method: profile - -+step0: - sh>: echo '{"store_params":{"civi":"taspo"}}' | aws s3 cp - ${output} - -+step1: - ecs_task.run>: - def: - network_mode: Host - container_definitions: - - name: uploader - image: amazonlinux:2 - command: [yum, install, '-y', awscli] - essential: true - memory: 500 - cpu: 10 - family: hello_world - cluster: ${cluster} - count: 1 - result_s3_uri: ${output} - -+step2: - echo>: ${civi} + tmp_storage: + type: s3 + uri: ${output} + family_prefix: hello- + cluster: ${cluster} + network_mode: host + memory: 1 GB + task_role_arn: ${task_role_arn} + ++ecs_task.run: + +step1: + ecs_task.run>: + def: + network_mode: host + container_definitions: + - name: step1 + image: civitaspo/python-awscli:latest + command: [echo, step1] + essential: true + memory: 500 + cpu: 1 + family: step1 + count: 1 + +step2: + ecs_task.run>: + def: + network_mode: host + container_definitions: + - name: step2 + image: civitaspo/python-awscli:latest + command: [echo, step2] + essential: true + memory: 500 + cpu: 1 + family: step2 + count: 1 + +step3: + ecs_task.run>: + def: + network_mode: host + container_definitions: + - name: step3 + image: civitaspo/python-awscli:latest + command: + - sh + - -c + - echo '{"store_params":{"civi":"taspo"}}' | aws s3 cp - ${output}/${session_uuid}.json + essential: true + memory: 500 + cpu: 1 + task_role_arn: ${task_role_arn} + family: step3 + count: 1 + result_s3_uri: ${output}/${session_uuid}.json + + +step4: + echo>: ${civi} + ++ecs_task.sh: + +step0: + ecs_task.sh>: env + image: civitaspo/digdag-awscli:latest + _export: + message: + message: 'hello ecs_task.rb' + created_by: civitaspo + ++ecs_task.rb: + +step0: + ecs_task.rb>: echo + require: echo + gem_install: [awesome_print] + image: civitaspo/ruby-awscli:latest + _export: + message: + message: 'hello ecs_task.rb' + created_by: civitaspo + ++ecs_task.py: + +step0: + ecs_task.py>: echo.echo + pip_install: [PyYaml] + image: civitaspo/python-awscli:latest + _export: + message: + message: 'hello ecs_task.py' + created_by: civitaspo + ++ecs_task.embulk: + _export: + path_prefix: ./csv/ + +dig: + ecs_task.embulk>: + in: + type: file + path_prefix: ${path_prefix} + parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: ',' + quote: '"' + escape: '"' + null_string: 'NULL' + skip_header_lines: 0 + columns: + - {name: id, type: long} + - {name: comment, type: string} + out: + type: stdout + image: civitaspo/embulk-awscli:latest + + +file: + ecs_task.embulk>: template.yml + image: civitaspo/embulk-awscli:latest ``` +See [example](./example). + # Configuration ## Remarks @@ -138,12 +233,8 @@ In addition, the below configurations exist. - **execution_role_arn**: The Amazon Resource Name (ARN) of the task execution role that the Amazon ECS container agent and the Docker daemon can assume. (string, optional) - **family_prefix**: The family name prefix for a task definition. This is used if **family** is not defined. (string, default: `""`) - **family_infix**: The family name infix for a task definition. This is used if **family** is not defined. (string, default: `"${task_name}"`) - - The default value is replaced as below: - - `+` -> `_` - - `^` -> `_` - - `=` -> `_` - **family_suffix**: The family name sufix for a task definition. This is used if **family** is not defined. (string, default: `""`) -- **family**: You must specify a `family` for a task definition, which allows you to track multiple versions of the same task definition. The `family` is used as a name for your task definition. Up to 255 letters (uppercase and lowercase), numbers, hyphens, and underscores are allowed. (string, default: `"${family_prefix}${family_infix}${family_suffix}"`) +- **family**: You must specify a `family` for a task definition, which allows you to track multiple versions of the same task definition. The `family` is used as a name for your task definition. Up to 255 letters (uppercase and lowercase), numbers, hyphens, and underscores are allowed. If invalid charactors are found, these are replaced to `"_"`. (string, default: `"${family_prefix}${family_infix}${family_suffix}"`) - **memory**: The amount of memory (in MiB) used by the task. It can be expressed as an integer using MiB, for example `1024`, or as a string using GB, for example `1GB` or `1 GB`, in a task definition. String values are converted to an integer indicating the MiB when the task definition is registered. (string, optional) - See the docs for more info: [ECS-RegisterTaskDefinition-request-memory](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RegisterTaskDefinition.html#ECS-RegisterTaskDefinition-request-memory) - **network_mode**: The Docker networking mode to use for the containers in the task. The valid values are `none`, `bridge`, `awsvpc`, and `host`. The default Docker network mode is `bridge`. If using the Fargate launch type, the `awsvpc` network mode is required. If using the EC2 launch type, any network mode can be used. If the network mode is set to `none`, you can't specify port mappings in your container definitions, and the task's containers do not have external connectivity. The `host` and `awsvpc` network modes offer the highest networking performance for containers because they use the EC2 network stack instead of the virtualized network stack provided by the `bridge` mode. With the `host` and `awsvpc` network modes, exposed container ports are mapped directly to the corresponding host port (for the `host` network mode) or the attached elastic network interface port (for the `awsvpc` network mode), so you cannot take advantage of dynamic host port mappings. If the network mode is `awsvpc`, the task is allocated an Elastic Network Interface, and you must specify the **network_configuration** option when you create a service or run a task with the task definition. For more information, see [Task Networking](http://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-networking.html) in the Amazon Elastic Container Service Developer Guide. If the network mode is `host`, you can't run multiple instantiations of the same task on a single container instance when port mappings are used. Docker for Windows uses different network modes than Docker for Linux. When you register a task definition with Windows containers, you must not specify a network mode. (string, optional) @@ -175,6 +266,8 @@ In addition, the below configurations exist. - **pseudo_terminal**: When this parameter is `true`, a TTY is allocated. (boolean, optional) - **readonly_root_filesystem**: When this parameter is `true`, the container is given read-only access to its root file system. (boolean, optional) - **repository_credentials**: The private repository authentication credentials to use. The configuration map is the same as the snake-cased [API_RepositoryCredentials](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RepositoryCredentials.html). (map, optional) +- **secrets**: The secrets to pass to the container. (array of map, optional) + - The configuration map is the same as the snake-cased [API_Secret](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_Secret.html). - **system_controls**: A list of namespaced kernel parameters to set in the container. For more information, see [ECS-Type-ContainerDefinition-systemControls](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-systemControls). (array of map, optional) - The configuration map is the same as the snake-cased [API_SystemControl](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_SystemControl.html). - **ulimits**: A list of ulimits to set in the container. (array of map, optional) @@ -195,6 +288,8 @@ In addition, the below configurations exist. - The configuration map is the same as the snake-cased [API_PlacementStrategy](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_PlacementStrategy.html). - **platform_version**: The platform version on which to run your task. If one is not specified, the latest version is used by default. (string, optional) - **started_by**: An optional tag specified when a task is started. (string, optional) +- **tags**: The metadata that you apply to the task to help you categorize and organize them. Each tag consists of a key and an optional value, both of which you define. Tag keys can have a maximum character length of 128 characters, and tag values can have a maximum length of 256 characters. (string to string map, optional) + - For using this option, require [migrating Amazon ECS deployment to the new ARN and resource ID format](Migrating your Amazon ECS deployment to the new ARN and resource ID format). - **workspace_s3_uri_prefix**: S3 uri prefix for using as workspace. (string, required) - Currently, input params, output params, stdout, stderr, and internal scripts are put on S3, and then they are not removed. So it's insecure unless strict access control to S3. - This option is **deprecated**. Please use **tmp_storage** option instead. @@ -245,7 +340,7 @@ aws configure ### 3) run an example ```sh -./example/run.sh +./example/run.sh ${ECS Cluster Name} ${S3 URI Prefix for tmp storage} ${ECS Task Role ARN} ``` ## (TODO) Run Tests diff --git a/build.gradle b/build.gradle index e36184e..15dd253 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.0.6' +version = '0.0.7' def digdagVersion = '0.9.31' def scalaSemanticVersion = "2.12.6" @@ -29,7 +29,7 @@ dependencies { ['ecs', 's3', 'sts'].each { svc -> // https://mvnrepository.com/artifact/com.amazonaws/ - compile group: 'com.amazonaws', name: "aws-java-sdk-${svc}", version: '1.11.433' + compile group: 'com.amazonaws', name: "aws-java-sdk-${svc}", version: '1.11.451' } } diff --git a/example/ecs_task.embulk/csv/data.01.csv b/example/ecs_task.embulk/csv/data.01.csv new file mode 100644 index 0000000..231608c --- /dev/null +++ b/example/ecs_task.embulk/csv/data.01.csv @@ -0,0 +1,3 @@ +1,aaa +2,bbb +3,ccc diff --git a/example/ecs_task.embulk/csv/data.02.csv b/example/ecs_task.embulk/csv/data.02.csv new file mode 100644 index 0000000..231608c --- /dev/null +++ b/example/ecs_task.embulk/csv/data.02.csv @@ -0,0 +1,3 @@ +1,aaa +2,bbb +3,ccc diff --git a/example/ecs_task.embulk/example.dig b/example/ecs_task.embulk/example.dig new file mode 100644 index 0000000..250e2e4 --- /dev/null +++ b/example/ecs_task.embulk/example.dig @@ -0,0 +1,29 @@ + ++ecs_task.embulk: + _export: + path_prefix: ./csv/ + +dig: + ecs_task.embulk>: + in: + type: file + path_prefix: ${path_prefix} + parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: ',' + quote: '"' + escape: '"' + null_string: 'NULL' + skip_header_lines: 0 + columns: + - {name: id, type: long} + - {name: comment, type: string} + out: + type: stdout + image: civitaspo/embulk-awscli:latest + + +file: + ecs_task.embulk>: template.yml + image: civitaspo/embulk-awscli:latest + diff --git a/example/ecs_task.embulk/template.yml b/example/ecs_task.embulk/template.yml new file mode 100644 index 0000000..c3a98be --- /dev/null +++ b/example/ecs_task.embulk/template.yml @@ -0,0 +1,18 @@ +in: + type: file + path_prefix: ${path_prefix} + parser: + charset: UTF-8 + newline: CRLF + type: csv + delimiter: ',' + quote: '"' + escape: '"' + null_string: 'NULL' + skip_header_lines: 0 + columns: + - {name: id, type: long} + - {name: comment, type: string} +out: + type: stdout + diff --git a/example/ecs_task.py/echo.py b/example/ecs_task.py/echo.py new file mode 100644 index 0000000..3eb2ed6 --- /dev/null +++ b/example/ecs_task.py/echo.py @@ -0,0 +1,6 @@ +import yaml + + +def echo(message): + print(yaml.dump(message)) + diff --git a/example/ecs_task.py/example.dig b/example/ecs_task.py/example.dig new file mode 100644 index 0000000..07ba4bd --- /dev/null +++ b/example/ecs_task.py/example.dig @@ -0,0 +1,11 @@ + ++ecs_task.py: + +step0: + ecs_task.py>: echo.echo + pip_install: [PyYaml] + image: civitaspo/python-awscli:latest + _export: + message: + message: 'hello ecs_task.py' + created_by: civitaspo + diff --git a/example/ecs_task.rb/echo.rb b/example/ecs_task.rb/echo.rb new file mode 100644 index 0000000..b8f89fc --- /dev/null +++ b/example/ecs_task.rb/echo.rb @@ -0,0 +1,5 @@ +require 'awesome_print' + +def echo(message) + ap message +end diff --git a/example/ecs_task.rb/example.dig b/example/ecs_task.rb/example.dig new file mode 100644 index 0000000..47644dd --- /dev/null +++ b/example/ecs_task.rb/example.dig @@ -0,0 +1,12 @@ + ++ecs_task.rb: + +step0: + ecs_task.rb>: echo + require: echo + gem_install: [awesome_print] + image: civitaspo/ruby-awscli:latest + _export: + message: + message: 'hello ecs_task.rb' + created_by: civitaspo + diff --git a/example/ecs_task.run/example.dig b/example/ecs_task.run/example.dig new file mode 100644 index 0000000..de786fd --- /dev/null +++ b/example/ecs_task.run/example.dig @@ -0,0 +1,49 @@ + ++ecs_task.run: + +step1: + ecs_task.run>: + def: + network_mode: host + container_definitions: + - name: step1 + image: civitaspo/python-awscli:latest + command: [echo, step1] + essential: true + memory: 500 + cpu: 1 + family: step1 + count: 1 + +step2: + ecs_task.run>: + def: + network_mode: host + container_definitions: + - name: step2 + image: civitaspo/python-awscli:latest + command: [echo, step2] + essential: true + memory: 500 + cpu: 1 + family: step2 + count: 1 + +step3: + ecs_task.run>: + def: + network_mode: host + container_definitions: + - name: step3 + image: civitaspo/python-awscli:latest + command: + - sh + - -c + - echo '{"store_params":{"civi":"taspo"}}' | aws s3 cp - ${output}/${session_uuid}.json + essential: true + memory: 500 + cpu: 1 + task_role_arn: ${task_role_arn} + family: step3 + count: 1 + result_s3_uri: ${output}/${session_uuid}.json + + +step4: + echo>: ${civi} diff --git a/example/ecs_task.sh/example.dig b/example/ecs_task.sh/example.dig new file mode 100644 index 0000000..ec6835b --- /dev/null +++ b/example/ecs_task.sh/example.dig @@ -0,0 +1,10 @@ + ++ecs_task.sh: + +step0: + ecs_task.sh>: env + image: civitaspo/digdag-awscli:latest + _export: + message: + message: 'hello ecs_task.rb' + created_by: civitaspo + diff --git a/example/example.dig b/example/example.dig index 15d0fd1..91b8e85 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,26 +4,34 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.6 + - pro.civitaspo:digdag-operator-ecs_task:0.0.7 ecs_task: auth_method: profile + tmp_storage: + type: s3 + uri: ${output} + family_prefix: hello- + cluster: ${cluster} + network_mode: host + memory: 1 GB + # NOTE: For using this option, require the ECS Cluster migration. See ttps://aws.amazon.com/jp/blogs/compute/migrating-your-amazon-ecs-deployment-to-the-new-arn-and-resource-id-format-2/ + # tags: + # environment: development + # created_by: digdag-operator-ecs_task + # digdag.session_uuid: ${session_uuid} + task_role_arn: ${task_role_arn} -+step0: - sh>: echo '{"store_params":{"civi":"taspo"}}' | aws s3 cp - ${output} -+step1: - ecs_task.run>: - def: - network_mode: Host - container_definitions: - - name: uploader - image: amazonlinux:2 - command: [yum, install, '-y', awscli] - essential: true - memory: 500 - cpu: 10 - family: hello_world - cluster: ${cluster} - count: 1 - result_s3_uri: ${output} -+step2: - echo>: ${civi} ++ecs_task.sh: + call>: ecs_task.sh/example + ++ecs_task.rb: + call>: ecs_task.rb/example + ++ecs_task.py: + call>: ecs_task.py/example + ++ecs_task.embulk: + call>: ecs_task.embulk/example + ++ecs_task.run: + call>: ecs_task.run/example diff --git a/example/run.sh b/example/run.sh index fa1a646..d2023d0 100755 --- a/example/run.sh +++ b/example/run.sh @@ -6,6 +6,7 @@ LOCAL_MAVEN_REPO=$ROOT/build/repo CLUSTER="$1" OUTPUT="$2" +TASK_ROLE_ARN="$3" if [ -z "$CLUSTER" ]; then echo "[ERROR] Set cluster as the first argument." @@ -15,6 +16,10 @@ if [ -z "$OUTPUT" ]; then echo "[ERROR] Set output s3 URI as the second argument." exit 1 fi +if [ -z "$TASK_ROLE_ARN" ]; then + echo "[ERROR] Set task role arn as the third argument." + exit 1 +fi ( cd $EXAMPLE_ROOT @@ -23,5 +28,11 @@ fi rm -rfv .digdag ## run - digdag run example.dig -c digdag.properties -p repos=${LOCAL_MAVEN_REPO} -p output=${OUTPUT} -p cluster=${CLUSTER} --no-save + digdag run example.dig \ + -c digdag.properties \ + -p repos=${LOCAL_MAVEN_REPO} \ + -p output=${OUTPUT} \ + -p cluster=${CLUSTER} \ + -p task_role_arn=${TASK_ROLE_ARN} \ + --no-save ) diff --git a/example/template.txt b/example/template.txt deleted file mode 100644 index d794f34..0000000 --- a/example/template.txt +++ /dev/null @@ -1 +0,0 @@ -Worked? ${message} diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/AbstractEcsTaskCommandOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/AbstractEcsTaskCommandOperator.scala index 1235882..cd31636 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/AbstractEcsTaskCommandOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/AbstractEcsTaskCommandOperator.scala @@ -56,7 +56,7 @@ abstract class AbstractEcsTaskCommandOperator(operatorName: String, context: Ope protected def collectEnvironments(): Map[String, String] = { val vars: PrivilegedVariables = context.getPrivilegedVariables vars.getKeys.asScala.foldLeft(Map.empty[String, String]) { (env, key) => - if (isValidEnvKey(key)) { + if (isValidEnvKey(key)) { env ++ Map(key -> vars.get(key)) } else { diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala index 6e91d86..bbc478a 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala @@ -8,6 +8,7 @@ import pro.civitaspo.digdag.plugin.ecs_task.VERSION import pro.civitaspo.digdag.plugin.ecs_task.aws.AwsConf import scala.collection.JavaConverters._ +import scala.util.matching.Regex case class EcsTaskCommandRunner( tmpStorage: TmpStorage, @@ -30,13 +31,17 @@ case class EcsTaskCommandRunner( val taskName: String = params.get("task_name", classOf[String]) val familyPrefix: String = params.get("family_prefix", classOf[String], "") val familySuffix: String = params.get("family_suffix", classOf[String], "") - val familyInfix: String = params.get("family_infix", classOf[String], taskName.replaceAll("\\+", "_").replaceAll("\\^", "_").replaceAll("\\=", "_")) + val familyInfix: String = params.get("family_infix", classOf[String], taskName) val family: String = params.get("family", classOf[String], s"$familyPrefix$familyInfix$familySuffix") + val ipcMode: Optional[String] = params.getOptional("ipc_mode", classOf[String]) val memory: Optional[String] = params.getOptional("memory", classOf[String]) val networkMode: Optional[String] = params.getOptional("network_mode", classOf[String]) + val pidMode: Optional[String] = params.getOptional("pid_mode", classOf[String]) // NOTE: Use `ecs_task.run>`'s one. // val placementConstraints: Seq[TaskDefinitionPlacementConstraint] = params.parseListOrGetEmpty("placement_constraints", classOf[Config]).asScala.map(configureTaskDefinitionPlacementConstraint).map(_.get) val requiresCompatibilities: Seq[String] = params.parseListOrGetEmpty("requires_compatibilities", classOf[String]).asScala // Valid Values: EC2 | FARGATE + // NOTE: Use the same var as `ecs_task.run>`'s one. + // val tags: Optional[Config] = params.getOptionalNested("tags") val taskRoleArn: Optional[String] = params.getOptional("task_role_arn", classOf[String]) val volumes: Seq[Config] = params.parseListOrGetEmpty("volumes", classOf[Config]).asScala @@ -69,12 +74,13 @@ case class EcsTaskCommandRunner( // NOTE: If you set it by container level, use the `overrides` option. // val memoryReservation: Optional[Int] = params.getOptional("memory_reservation", classOf[Int]) val mountPoints: Seq[Config] = params.parseListOrGetEmpty("mount_points", classOf[Config]).asScala - val containerName: String = params.get("container_name", classOf[String], family) + val containerName: Optional[String] = params.getOptional("container_name", classOf[String]) val portMappings: Seq[Config] = params.parseListOrGetEmpty("port_mappings", classOf[Config]).asScala val privileged: Optional[Boolean] = params.getOptional("privileged", classOf[Boolean]) val pseudoTerminal: Optional[Boolean] = params.getOptional("pseudo_terminal", classOf[Boolean]) val readonlyRootFilesystem: Optional[Boolean] = params.getOptional("readonly_root_filesystem", classOf[Boolean]) val repositoryCredentials: Optional[Config] = params.getOptionalNested("repository_credentials") + val secrets: Seq[Config] = params.parseListOrGetEmpty("secrets", classOf[Config]).asScala val systemControls: Seq[Config] = params.parseListOrGetEmpty("system_controls", classOf[Config]).asScala val ulimits: Seq[Config] = params.parseListOrGetEmpty("ulimits", classOf[Config]).asScala val user: Optional[String] = params.getOptional("user", classOf[String]) @@ -92,12 +98,15 @@ case class EcsTaskCommandRunner( val placementStrategy: Seq[Config] = params.parseListOrGetEmpty("placement_strategy", classOf[Config]).asScala val platformVersion: Optional[String] = params.getOptional("platform_version", classOf[String]) val startedBy: Optional[String] = params.getOptional("started_by", classOf[String]) + val tags: Optional[Config] = params.getOptionalNested("tags") // NOTE: Generated by ecs_task.register operator // val taskDefinition: String = params.get("task_definition", classOf[String]) // For ecs_task.wait operator val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("15m")) + lazy val normalizedFamily: String = normalizeFamily(family) + def run(): TaskResult = { val subTasks: Config = cf.create() subTasks.setNested("+register", ecsTaskRegisterSubTask()) @@ -130,6 +139,7 @@ case class EcsTaskCommandRunner( subTask.set("placement_strategy", placementStrategy.asJava) subTask.setOptional("platform_version", platformVersion) subTask.setOptional("started_by", startedBy) + subTask.setOptional("tags", tags) subTask.set("task_definition", "${last_ecs_task_register.task_definition_arn}") } } @@ -171,10 +181,13 @@ case class EcsTaskCommandRunner( c.set("container_definitions", (Seq(containerDefinitionConfig()) ++ sidecars).asJava) c.setOptional("cpu", cpu) c.setOptional("execution_role_arn", executionRoleArn) - c.set("family", family) + c.set("family", normalizedFamily) + c.setOptional("ipc_mode", ipcMode) c.setOptional("memory", memory) c.setOptional("network_mode", networkMode) + c.setOptional("pid_mode", pidMode) c.set("requires_compatibilities", requiresCompatibilities.asJava) + c.setOptional("tags", tags) c.setOptional("task_role_arn", taskRoleArn) c.set("volumes", volumes.asJava) @@ -203,12 +216,13 @@ case class EcsTaskCommandRunner( c.setOptional("linux_parameters", linuxParameters) c.setOptional("log_configuration", logConfiguration) c.set("mount_points", mountPoints.asJava) - c.set("name", containerName) + c.set("name", containerName.or(normalizedFamily)) c.set("port_mappings", portMappings.asJava) c.setOptional("privileged", privileged) c.setOptional("pseudo_terminal", pseudoTerminal) c.setOptional("readonly_root_filesystem", readonlyRootFilesystem) c.setOptional("repository_credentials", repositoryCredentials) + c.set("secrets", secrets.asJava) c.set("system_controls", systemControls.asJava) c.set("ulimits", ulimits.asJava) c.setOptional("user", user) @@ -218,4 +232,12 @@ case class EcsTaskCommandRunner( c } + protected def normalizeFamily(family: String): String = { + // ref. https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RegisterTaskDefinition.html#ECS-RegisterTaskDefinition-request-family + val validLetterRegex: Regex = "[a-zA-Z0-9_-]".r + val after: String = family.map { case l @ validLetterRegex() => l; case _ => "_" }.mkString + if (!family.contentEquals(after)) logger.warn(s"Normalized family: $family -> $after") + after + } + } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala index 12b12eb..f52b9bd 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala @@ -2,6 +2,6 @@ package pro.civitaspo.digdag.plugin package object ecs_task { - val VERSION: String = "0.0.6" + val VERSION: String = "0.0.7" } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/register/EcsTaskRegisterOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/register/EcsTaskRegisterOperator.scala index 930fa74..6213b30 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/register/EcsTaskRegisterOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/register/EcsTaskRegisterOperator.scala @@ -15,7 +15,9 @@ import com.amazonaws.services.ecs.model.{ RegisterTaskDefinitionRequest, RegisterTaskDefinitionResult, RepositoryCredentials, + Secret, SystemControl, + Tag, TaskDefinitionPlacementConstraint, Tmpfs, Ulimit, @@ -43,12 +45,15 @@ class EcsTaskRegisterOperator(operatorName: String, context: OperatorContext, sy val cpu: Optional[String] = c.getOptional("cpu", classOf[String]) val executionRoleArn: Optional[String] = c.getOptional("execution_role_arn", classOf[String]) val family: String = c.get("family", classOf[String]) + val ipcMode: Optional[String] = c.getOptional("ipc_mode", classOf[String]) val memory: Optional[String] = c.getOptional("memory", classOf[String]) val networkMode: Optional[String] = c.getOptional("network_mode", classOf[String]) + val pidMode: Optional[String] = c.getOptional("pid_mode", classOf[String]) val placementConstraints: Seq[TaskDefinitionPlacementConstraint] = c.parseListOrGetEmpty("placement_constraints", classOf[Config]).asScala.map(configureTaskDefinitionPlacementConstraint).map(_.get) val requiresCompatibilities: Seq[String] = c.parseListOrGetEmpty("requires_compatibilities", classOf[String]).asScala // Valid Values: EC2 | FARGATE + val tags: Seq[Tag] = c.getMapOrEmpty("tags", classOf[String], classOf[String]).asScala.map(t => new Tag().withKey(t._1).withValue(t._2)).toSeq val taskRoleArn: Optional[String] = c.getOptional("task_role_arn", classOf[String]) val volumes: Seq[Volume] = c.parseListOrGetEmpty("volumes", classOf[Config]).asScala.map(configureVolume).map(_.get) @@ -56,10 +61,13 @@ class EcsTaskRegisterOperator(operatorName: String, context: OperatorContext, sy if (cpu.isPresent) req.setCpu(cpu.get) if (executionRoleArn.isPresent) req.setExecutionRoleArn(executionRoleArn.get) req.setFamily(family) + if (ipcMode.isPresent) throw new UnsupportedOperationException("Currently aws-java-sdk does not support ipc_mode.") if (memory.isPresent) req.setMemory(memory.get) if (networkMode.isPresent) req.setNetworkMode(networkMode.get) + if (pidMode.isPresent) throw new UnsupportedOperationException("Currently aws-java-sdk does not support pid_mode.") if (placementConstraints.nonEmpty) req.setPlacementConstraints(placementConstraints.asJava) if (requiresCompatibilities.nonEmpty) req.setRequiresCompatibilities(requiresCompatibilities.asJava) + if (tags.nonEmpty) req.setTags(tags.asJava) if (taskRoleArn.isPresent) req.setTaskRoleArn(taskRoleArn.get) if (volumes.nonEmpty) req.setVolumes(volumes.asJava) @@ -104,6 +112,7 @@ class EcsTaskRegisterOperator(operatorName: String, context: OperatorContext, sy val pseudoTerminal: Optional[Boolean] = c.getOptional("pseudo_terminal", classOf[Boolean]) val readonlyRootFilesystem: Optional[Boolean] = c.getOptional("readonly_root_filesystem", classOf[Boolean]) val repositoryCredentials: Optional[RepositoryCredentials] = configureRepositoryCredentials(c.parseNestedOrGetEmpty("repository_credentials")) + val secrets: Seq[Secret] = c.parseListOrGetEmpty("secrets", classOf[Config]).asScala.map(configureSecrets).map(_.get) val systemControls: Seq[SystemControl] = c.parseListOrGetEmpty("system_controls", classOf[Config]).asScala.map(configureSystemControl).map(_.get) val ulimits: Seq[Ulimit] = c.parseListOrGetEmpty("ulimits", classOf[Config]).asScala.map(configureUlimit).map(_.get) val user: Optional[String] = c.getOptional("user", classOf[String]) @@ -138,6 +147,7 @@ class EcsTaskRegisterOperator(operatorName: String, context: OperatorContext, sy if (pseudoTerminal.isPresent) cd.setPseudoTerminal(pseudoTerminal.get) if (readonlyRootFilesystem.isPresent) cd.setReadonlyRootFilesystem(readonlyRootFilesystem.get) if (repositoryCredentials.isPresent) cd.setRepositoryCredentials(repositoryCredentials.get) + if (secrets.nonEmpty) cd.setSecrets(secrets.asJava) if (systemControls.nonEmpty) cd.setSystemControls(systemControls.asJava) if (ulimits.nonEmpty) cd.setUlimits(ulimits.asJava) if (user.isPresent) cd.setUser(user.get) @@ -282,6 +292,19 @@ class EcsTaskRegisterOperator(operatorName: String, context: OperatorContext, sy Optional.of(rc) } + protected def configureSecrets(c: Config): Optional[Secret] = { + if (c.isEmpty) return Optional.absent() + + val name: String = c.get("name", classOf[String]) + val valueFrom: String = c.get("value_from", classOf[String]) + + val s: Secret = new Secret() + s.setName(name) + s.setValueFrom(valueFrom) + + Optional.of(s) + } + protected def configureSystemControl(c: Config): Optional[SystemControl] = { if (c.isEmpty) return Optional.absent() diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/run/EcsTaskRunInternalOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/run/EcsTaskRunInternalOperator.scala index 070dd34..2f0078d 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/run/EcsTaskRunInternalOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/run/EcsTaskRunInternalOperator.scala @@ -9,6 +9,7 @@ import com.amazonaws.services.ecs.model.{ PlacementStrategy, RunTaskRequest, RunTaskResult, + Tag, TaskOverride } import com.google.common.base.Optional @@ -38,6 +39,7 @@ class EcsTaskRunInternalOperator(operatorName: String, context: OperatorContext, params.parseListOrGetEmpty("placement_strategy", classOf[Config]).asScala.map(configurePlacementStrategy).map(_.get) val platformVersion: Optional[String] = params.getOptional("platform_version", classOf[String]) val startedBy: Optional[String] = params.getOptional("started_by", classOf[String]) + val tags: Seq[Tag] = params.getMapOrEmpty("tags", classOf[String], classOf[String]).asScala.map(t => new Tag().withKey(t._1).withValue(t._2)).toSeq val taskDefinition: String = params.get("task_definition", classOf[String]) // generated by ecs_task.register> operator if not set. val runRequestRetryInterval: DurationParam = params.get("run_request_retry_interval", classOf[DurationParam], DurationParam.parse("5s")) @@ -56,6 +58,7 @@ class EcsTaskRunInternalOperator(operatorName: String, context: OperatorContext, if (placementStrategy.nonEmpty) req.setPlacementStrategy(placementStrategy.asJava) if (platformVersion.isPresent) req.setPlatformVersion(platformVersion.get) if (startedBy.isPresent) req.setStartedBy(startedBy.get) + if (tags.nonEmpty) req.setTags(tags.asJava) req.setTaskDefinition(taskDefinition) req