diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a285cd..88a2b0d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +0.0.12 (2019-05-23) +=================== +* [Enhancement] Follow latest python runner script used by `ecs_task.py>`. The changes resolve the same issues that the bellow p-rs resolve. + * [Support type hints for Python3 on py> operator by chezou · Pull Request \#905 · treasure\-data/digdag](https://github.com/treasure-data/digdag/pull/905) + * [Fix default argument check on py> operator by chezou · Pull Request \#913 · treasure\-data/digdag](https://github.com/treasure-data/digdag/pull/913) + * [Fix digdag\.env\.add\_subtask for python3 by sonots · Pull Request \#972 · treasure\-data/digdag](https://github.com/treasure-data/digdag/pull/972) + 0.0.11 (2019-01-24) =================== * [Enhancement] `ecs_task.wait>` operator supports changeable interval and exponential backoff storategy. @Mulyu++ diff --git a/README.md b/README.md index c7eb099..89f2db1 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.11 + - pro.civitaspo:digdag-operator-ecs_task:0.0.12 ecs_task: auth_method: profile tmp_storage: diff --git a/build.gradle b/build.gradle index a62ac09..88f1719 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.0.11' +version = '0.0.12' def digdagVersion = '0.9.31' def scalaSemanticVersion = "2.12.6" diff --git a/example/ecs_task.py/echo.py b/example/ecs_task.py/echo.py index 3eb2ed6..7980ad4 100644 --- a/example/ecs_task.py/echo.py +++ b/example/ecs_task.py/echo.py @@ -1,6 +1,6 @@ import yaml -def echo(message): +def echo(message: str): print(yaml.dump(message)) diff --git a/example/example.dig b/example/example.dig index 3d6b616..010c6d0 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.11 + - pro.civitaspo:digdag-operator-ecs_task:0.0.12 ecs_task: auth_method: profile tmp_storage: diff --git a/src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/py/runner.py b/src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/py/runner.py index edc206c..e05039a 100644 --- a/src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/py/runner.py +++ b/src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/py/runner.py @@ -1,14 +1,14 @@ ######### -# Copy from https://raw.githubusercontent.com/treasure-data/digdag/52ff276bcc0aed23bf5a0df6c7a7c7b155c22d53/digdag-standards/src/main/resources/digdag/standards/py/runner.py +# Copy from https://raw.githubusercontent.com/treasure-data/digdag/6c81976334b78b3b776e357c7e9244f6bbe2711a/digdag-standards/src/main/resources/digdag/standards/py/runner.py # Then, customize a bit about error handling ######### -import collections +import sys +import os +import json import imp import inspect -import json -import os -import sys +import collections import traceback command = sys.argv[1] @@ -31,7 +31,6 @@ # fake digdag module already imported digdag_mod = sys.modules['digdag'] = imp.new_module('digdag') - class Env(object): def __init__(self, digdag_env_mod): self.params = digdag_env_mod.params @@ -60,7 +59,7 @@ def add_subtask(self, function=None, **params): command = ".".join([function.im_class.__module__, function.im_class.__name__, function.__name__]) else: # Python 3 - command = ".".join([function.__module__, function.__name__]) + command = ".".join([function.__module__, function.__qualname__]) config = params config["py>"] = command else: @@ -76,13 +75,12 @@ def add_subtask(self, function=None, **params): self.subtask_config["+subtask" + str(self.subtask_index)] = config self.subtask_index += 1 - digdag_mod.env = Env(digdag_env_mod) +import digdag # add the archive path to improt path sys.path.append(os.path.abspath(os.getcwd())) - def digdag_inspect_command(command): # package.name.Class.method fragments = command.split(".") @@ -112,12 +110,17 @@ def digdag_inspect_command(command): else: return (callable_type, None) - def digdag_inspect_arguments(callable_type, exclude_self, params): if callable_type == object.__init__: # object.__init__ accepts *varargs and **keywords but it throws exception return {} - spec = inspect.getargspec(callable_type) + if hasattr(inspect, 'getfullargspec'): # Python3 + spec = inspect.getfullargspec(callable_type) + keywords_ = spec.varkw + else: # Python 2 + spec = inspect.getargspec(callable_type) + keywords_ = spec.keywords + args = {} for idx, key in enumerate(spec.args): if exclude_self and idx == 0: @@ -125,7 +128,7 @@ def digdag_inspect_arguments(callable_type, exclude_self, params): if key in params: args[key] = params[key] else: - if spec.defaults is None or len(spec.defaults) < idx: + if spec.defaults is None or idx < len(spec.args) - len(spec.defaults): # this keyword is required but not in params. raising an error. if hasattr(callable_type, '__qualname__'): # Python 3 @@ -136,13 +139,13 @@ def digdag_inspect_arguments(callable_type, exclude_self, params): else: name = callable_type.__name__ raise TypeError("Method '%s' requires parameter '%s' but not set" % (name, key)) - if spec.keywords: + if keywords_: # above code was only for validation return params else: return args - +##### begin: Custom Error Handling Code ##### status_params = {} def with_error_handler(func, **func_args): try: @@ -154,6 +157,7 @@ def with_error_handler(func, **func_args): status_params['error_message'] = str(e) status_params['error_stacktrace'] = traceback.format_exc() print('message: {}, stacktrace: {}'.format(str(e), traceback.format_exc())) +##### end: Custom Error Handling Code ##### callable_type, method_name = digdag_inspect_command(command) @@ -163,11 +167,13 @@ def with_error_handler(func, **func_args): method = getattr(instance, method_name) method_args = digdag_inspect_arguments(method, True, params) + # Replace the below code to customize error hadling # result = method(**method_args) result = with_error_handler(method, **method_args) else: args = digdag_inspect_arguments(callable_type, False, params) + # Replace the below code to customize error hadling # result = callable_type(**args) result = with_error_handler(callable_type, **args) @@ -175,9 +181,10 @@ def with_error_handler(func, **func_args): 'subtask_config': digdag_env.subtask_config, 'export_params': digdag_env.export_params, 'store_params': digdag_env.store_params, + #'state_params': digdag_env.state_params, # only for retrying 'status_params': status_params, # only for ecs_task.command_result_internal - # 'state_params': digdag_env.state_params, # only for retrying } with open(out_file, 'w') as f: json.dump(out, f) + diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala index ff1c5c5..6cd8fac 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala @@ -2,6 +2,6 @@ package pro.civitaspo.digdag.plugin package object ecs_task { - val VERSION: String = "0.0.11" + val VERSION: String = "0.0.12" }