From b03a453bc86392175c17762a18f3e0a38e7a39a0 Mon Sep 17 00:00:00 2001 From: Romain Cledat Date: Tue, 20 Aug 2024 00:56:13 -0700 Subject: [PATCH] More WIP --- metaflow/__init__.py | 2 + metaflow/cli.py | 42 ++- metaflow/decorators.py | 62 +++- metaflow/flowspec.py | 140 +++++++- metaflow/package.py | 2 +- metaflow/parameters.py | 16 +- metaflow/plugins/aws/batch/batch_decorator.py | 4 +- .../kubernetes/kubernetes_decorator.py | 4 +- metaflow/plugins/pypi/conda_decorator.py | 12 +- metaflow/plugins/timeout_decorator.py | 4 +- metaflow/runner/click_api.py | 4 +- metaflow/runtime.py | 175 ++++++---- metaflow/user_configs.py | 310 +++++++++++++----- metaflow/util.py | 11 + 14 files changed, 590 insertions(+), 198 deletions(-) diff --git a/metaflow/__init__.py b/metaflow/__init__.py index c676a5c7a9b..b032ca2bc3f 100644 --- a/metaflow/__init__.py +++ b/metaflow/__init__.py @@ -105,6 +105,8 @@ class and related decorators. from .parameters import Parameter, JSONTypeClass +from .user_configs import Config, FlowConfig, config_expr, eval_config + JSONType = JSONTypeClass() # data layer diff --git a/metaflow/cli.py b/metaflow/cli.py index 29c6701cc71..0bf7a46e365 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -1,4 +1,5 @@ import inspect +import json import os import sys import traceback @@ -46,6 +47,7 @@ resolve_identity, write_latest_run_id, ) +from .user_configs import LocalFileInput ERASE_TO_EOL = "\033[K" HIGHLIGHT = "red" @@ -522,7 +524,7 @@ def init(obj, run_id=None, task_id=None, tags=None, **kwargs): obj.monitor, run_id=run_id, ) - obj.flow._set_constants(obj.graph, kwargs) + obj.flow._set_constants(obj.graph, kwargs, obj.config_options) runtime.persist_constants(task_id=task_id) @@ -771,7 +773,7 @@ def run( write_latest_run_id(obj, runtime.run_id) write_file(run_id_file, runtime.run_id) - obj.flow._set_constants(obj.graph, kwargs) + obj.flow._set_constants(obj.graph, kwargs, obj.config_options) runtime.print_workflow_info() runtime.persist_constants() write_file( @@ -783,6 +785,7 @@ def run( "/".join((obj.flow.name, runtime.run_id)), ), ) + runtime.execute() @@ -842,7 +845,7 @@ def version(obj): @tracing.cli_entrypoint("cli/start") -@decorators.add_decorator_options +@decorators.add_decorator_and_config_options @click.command( cls=click.CommandCollection, sources=[cli] + plugins.get_plugin_cli(), @@ -910,6 +913,15 @@ def version(obj): type=click.Choice(MONITOR_SIDECARS), help="Monitoring backend type", ) +@click.option( + "--local-info-file", + type=LocalFileInput(exists=True, readable=True, dir_okay=False, resolve_path=True), + required=False, + default=None, + help="A filename containing a subset of the INFO file. Internal use only.", + hidden=True, + is_eager=True, +) @click.pass_context def start( ctx, @@ -923,7 +935,8 @@ def start( pylint=None, event_logger=None, monitor=None, - **deco_options + local_info_file=None, + **deco_and_config_options ): global echo if quiet: @@ -940,11 +953,17 @@ def start( echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False) echo(" for *%s*" % resolve_identity(), fg="magenta") + # At this point, we are able to resolve the user-configuration options so we can + # process all those decorators that the user added that will modify the flow based + # on those configurations. It is important to do this as early as possible since it + # actually modifies the flow itself + ctx.obj.flow = ctx.obj.flow._process_config_funcs(deco_and_config_options) + cli_args._set_top_kwargs(ctx.params) ctx.obj.echo = echo ctx.obj.echo_always = echo_always ctx.obj.is_quiet = quiet - ctx.obj.graph = FlowGraph(ctx.obj.flow.__class__) + ctx.obj.graph = ctx.obj.flow._graph ctx.obj.logger = logger ctx.obj.check = _check ctx.obj.pylint = pylint @@ -996,6 +1015,19 @@ def start( ctx.obj.monitor, ) + ctx.obj.config_options = { + k: v + for k, v in deco_and_config_options.items() + if k in ctx.command.config_options + } + deco_options = { + k: v + for k, v in deco_and_config_options.items() + if k not in ctx.command.config_options + } + + decorators._resolve_configs(ctx.obj.flow) + # It is important to initialize flow decorators early as some of the # things they provide may be used by some of the objects initialized after. decorators._init_flow_decorators( diff --git a/metaflow/decorators.py b/metaflow/decorators.py index 262102551e4..2ac396701d9 100644 --- a/metaflow/decorators.py +++ b/metaflow/decorators.py @@ -12,6 +12,7 @@ ) from .parameters import current_flow +from .user_configs import DelayEvaluator from metaflow._vendor import click @@ -123,6 +124,30 @@ def __init__(self, attributes=None, statically_defined=False): else: raise InvalidDecoratorAttribute(self.name, k, self.defaults) + def resolve_configs(self): + """ + Resolve any configuration options that may be set in the decorator's attributes + """ + + def _resolve_delayed_evaluator(v): + if isinstance(v, DelayEvaluator): + return v() + if isinstance(v, dict): + return { + _resolve_delayed_evaluator(k): _resolve_delayed_evaluator(v) + for k, v in v.items() + } + if isinstance(v, list): + return [_resolve_delayed_evaluator(x) for x in v] + if isinstance(v, tuple): + return tuple(_resolve_delayed_evaluator(x) for x in v) + if isinstance(v, set): + return {_resolve_delayed_evaluator(x) for x in v} + return v + + for k, v in self.attributes.items(): + self.attributes[k] = _resolve_delayed_evaluator(v) + @classmethod def _parse_decorator_spec(cls, deco_spec): if len(deco_spec) == 0: @@ -202,11 +227,28 @@ def get_top_level_options(self): # compare this to parameters.add_custom_parameters -def add_decorator_options(cmd): - seen = {} +def add_decorator_and_config_options(cmd): + config_seen = {} flow_cls = getattr(current_flow, "flow_cls", None) if flow_cls is None: return cmd + + parameters = [p for _, p in flow_cls._get_parameters() if p.IS_FLOW_PARAMETER] + # Add configuration options + for arg in parameters[::-1]: + kwargs = arg.option_kwargs(False) + if arg.name in config_seen: + msg = ( + "Multiple configurations use the same name '%s'. Please change the " + "names of some of your configurations" % arg.name + ) + raise MetaflowException(msg) + config_seen[arg.name] = arg + cmd.params.insert(0, click.Option(("--" + arg.name,), **kwargs)) + + cmd.config_options = set(config_seen.keys()) + seen = {} + # Add decorator options for deco in flow_decorators(flow_cls): for option, kwargs in deco.options.items(): if option in seen: @@ -217,6 +259,12 @@ def add_decorator_options(cmd): % (deco.name, option, seen[option]) ) raise MetaflowInternalError(msg) + elif option in config_seen: + msg = ( + "Flow decorator '%s' uses an option '%s' which is also " + "used by a configuration. Please change the name of the " + "configuration" % (deco.name, option) + ) else: seen[option] = deco.name cmd.params.insert(0, click.Option(("--" + option,), **kwargs)) @@ -510,6 +558,16 @@ def _attach_decorators_to_step(step, decospecs): step.decorators.append(deco) +def _resolve_configs(flow): + # We get the datastore for the _parameters step which can contain + for decorators in flow._flow_decorators.values(): + for deco in decorators: + deco.resolve_configs() + for step in flow: + for deco in step.decorators: + deco.resolve_configs() + + def _init_flow_decorators( flow, graph, environment, flow_datastore, metadata, logger, echo, deco_options ): diff --git a/metaflow/flowspec.py b/metaflow/flowspec.py index 6f277f4f1a9..29300ef79cf 100644 --- a/metaflow/flowspec.py +++ b/metaflow/flowspec.py @@ -6,7 +6,7 @@ from itertools import islice from types import FunctionType, MethodType -from typing import Any, Callable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Callable, Generator, List, Optional, Tuple from . import cmd_with_io, parameters from .parameters import DelayedEvaluationParameter, Parameter @@ -17,6 +17,7 @@ ) from .graph import FlowGraph from .unbounded_foreach import UnboundedForeachInput +from .user_configs import ConfigInput, ConfigValue from .util import to_pod from .metaflow_config import INCLUDE_FOREACH_STACK, MAXIMUM_FOREACH_VALUE_CHARS @@ -67,10 +68,66 @@ def __new__(cls, name, bases, dct): # This makes sure to give _flow_decorators to each # child class (and not share it with the FlowSpec base # class). This is important to not make a "global" - # _flow_decorators + # _flow_decorators. Same deal with user configurations f._flow_decorators = {} + f._user_configs = {} + + # We also cache parameter names to avoid having to recompute what is a parameter + # in the dir of a flow + f._cached_parameters = None + + # Finally attach all functions that need to be evaluated once user configurations + # are available + f._config_funcs = [] + return f + @property + def configs(cls) -> Generator[Tuple[str, "ConfigValue"], None, None]: + """ + Iterate over all user configurations in this flow + + Use this to parameterize your flow based on configuration. As an example: + ``` + def parametrize(flow): + val = next(flow.configs)[1].steps.start.cpu + flow.start = environment(vars={'mycpu': val})(flow.start) + return flow + + @parametrize + class TestFlow(FlowSpec): + config = Config('myconfig.json') + + @step + def start(self): + pass + ``` + can be used to add an environment decorator to the `start` step. + + Yields + ------ + Tuple[str, ConfigValue] + Iterates over the configurations of the flow + """ + # When configs are parsed, they are loaded in _user_configs + for name, value in cls._user_configs.items(): + yield name, ConfigValue(value) + + @property + def steps(cls) -> Generator[Tuple[str, Any], None, None]: + """ + Iterate over all the steps in this flow + + Yields + ------ + Tuple[str, Any] + A tuple with the step name and the step itself + """ + for var in dir(cls): + potential_step = getattr(cls, var) + if callable(potential_step) and hasattr(potential_step, "is_step"): + yield var, potential_step + class FlowSpec(metaclass=_FlowSpecMeta): """ @@ -92,6 +149,9 @@ class FlowSpec(metaclass=_FlowSpecMeta): "_cached_input", "_graph", "_flow_decorators", + "_user_configs", + "_cached_parameters", + "_config_funcs", "_steps", "index", "input", @@ -144,14 +204,7 @@ def script_name(self) -> str: fname = fname[:-1] return os.path.basename(fname) - def _set_constants(self, graph, kwargs): - from metaflow.decorators import ( - flow_decorators, - ) # To prevent circular dependency - - # Persist values for parameters and other constants (class level variables) - # only once. This method is called before persist_constants is called to - # persist all values set using setattr + def _check_parameters(self): seen = set() for var, param in self._get_parameters(): norm = param.name.lower() @@ -162,13 +215,69 @@ def _set_constants(self, graph, kwargs): "case-insensitive." % param.name ) seen.add(norm) - seen.clear() + + def _process_config_funcs(self, config_options): + current_cls = self.__class__ + + # Fast path for no user configurations + if not self._config_funcs: + return self + + # We need to convert all the user configurations from DelayedEvaluationParameters + # to actual values so they can be used as is in the config functions. + + # We then reset them to be proper parameters so they can be re-evaluated in + # _set_constants + to_reset_params = [] + self._check_parameters() + for var, param in self._get_parameters(): + if not param.IS_FLOW_PARAMETER: + continue + to_reset_params.append((var, param)) + val = config_options[param.name.replace("-", "_").lower()] + if isinstance(val, DelayedEvaluationParameter): + val = val() + setattr(current_cls, var, val) + + # Run all the functions. They will now be able to access the configuration + # values directly from the class + for func in self._config_funcs: + current_cls = func(current_cls) + + # Reset all configs that were already present in the class. + # TODO: This means that users can't override configs directly. Not sure if this + # is a pattern we want to support + for var, param in to_reset_params: + setattr(current_cls, var, param) + + # We reset cached_parameters on the very off chance that the user added + # more configurations based on the configuration + current_cls._cached_parameters = None + + # Set the current flow class we are in (the one we just created) + parameters.replace_flow_context(current_cls) + return current_cls(use_cli=False) + + def _set_constants(self, graph, kwargs, config_options): + from metaflow.decorators import ( + flow_decorators, + ) # To prevent circular dependency + + # Persist values for parameters and other constants (class level variables) + # only once. This method is called before persist_constants is called to + # persist all values set using setattr + self._check_parameters() + + seen = set() self._success = True parameters_info = [] for var, param in self._get_parameters(): seen.add(var) - val = kwargs[param.name.replace("-", "_").lower()] + if param.IS_FLOW_PARAMETER: + val = config_options[param.name.replace("-", "_").lower()] + else: + val = kwargs[param.name.replace("-", "_").lower()] # Support for delayed evaluation of parameters. if isinstance(val, DelayedEvaluationParameter): val = val() @@ -213,6 +322,11 @@ def _set_constants(self, graph, kwargs): @classmethod def _get_parameters(cls): + if cls._cached_parameters is not None: + for var in cls._cached_parameters: + yield var, getattr(cls, var) + return + build_list = [] for var in dir(cls): if var[0] == "_" or var in cls._NON_PARAMETERS: continue @@ -221,7 +335,9 @@ def _get_parameters(cls): except: continue if isinstance(val, Parameter): + build_list.append(var) yield var, val + cls._cached_parameters = build_list def _set_datastore(self, datastore): self._datastore = datastore diff --git a/metaflow/package.py b/metaflow/package.py index a3431f03e68..26778ea8088 100644 --- a/metaflow/package.py +++ b/metaflow/package.py @@ -152,7 +152,7 @@ def path_tuples(self): def _add_info(self, tar): info = tarfile.TarInfo(os.path.basename(INFO_FILE)) - env = self.environment.get_environment_info(include_ext_info=True) + env = self.environment.get_environment_info(full_info=True) buf = BytesIO() buf.write(json.dumps(env).encode("utf-8")) buf.seek(0) diff --git a/metaflow/parameters.py b/metaflow/parameters.py index 5923765df4f..aebe781b59c 100644 --- a/metaflow/parameters.py +++ b/metaflow/parameters.py @@ -72,6 +72,16 @@ def flow_context(flow_cls): context_proto = None +def replace_flow_context(flow_cls): + """ + Replace the current flow context with a new flow class. This is used + when we change the current flow class after having run user configuration functions + """ + current_flow.flow_cls_stack = current_flow.flow_cls_stack[1:] + current_flow.flow_cls_stack.insert(0, flow_cls) + current_flow.flow_cls = current_flow.flow_cls_stack[0] + + class JSONTypeClass(click.ParamType): name = "JSON" @@ -293,6 +303,8 @@ class MyFlow(FlowSpec): If True, show the default value in the help text. """ + IS_FLOW_PARAMETER = False + def __init__( self, name: str, @@ -433,7 +445,9 @@ def wrapper(cmd): flow_cls = getattr(current_flow, "flow_cls", None) if flow_cls is None: return cmd - parameters = [p for _, p in flow_cls._get_parameters()] + parameters = [ + p for _, p in flow_cls._get_parameters() if not p.IS_FLOW_PARAMETER + ] for arg in parameters[::-1]: kwargs = arg.option_kwargs(deploy_mode) cmd.params.insert(0, click.Option(("--" + arg.name,), **kwargs)) diff --git a/metaflow/plugins/aws/batch/batch_decorator.py b/metaflow/plugins/aws/batch/batch_decorator.py index 89c63c43f72..f475f19558c 100644 --- a/metaflow/plugins/aws/batch/batch_decorator.py +++ b/metaflow/plugins/aws/batch/batch_decorator.py @@ -134,8 +134,8 @@ class BatchDecorator(StepDecorator): package_sha = None run_time_limit = None - def __init__(self, attributes=None, statically_defined=False): - super(BatchDecorator, self).__init__(attributes, statically_defined) + def resolve_configs(self): + super(BatchDecorator, self).resolve_configs() # If no docker image is explicitly specified, impute a default image. if not self.attributes["image"]: diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 486bd09b1df..9b277806610 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -131,8 +131,8 @@ class KubernetesDecorator(StepDecorator): package_sha = None run_time_limit = None - def __init__(self, attributes=None, statically_defined=False): - super(KubernetesDecorator, self).__init__(attributes, statically_defined) + def resolve_configs(self): + super(KubernetesDecorator, self).resolve_configs() if not self.attributes["namespace"]: self.attributes["namespace"] = KUBERNETES_NAMESPACE diff --git a/metaflow/plugins/pypi/conda_decorator.py b/metaflow/plugins/pypi/conda_decorator.py index 9b1c604a41c..6063a9c4fd2 100644 --- a/metaflow/plugins/pypi/conda_decorator.py +++ b/metaflow/plugins/pypi/conda_decorator.py @@ -49,8 +49,8 @@ class CondaStepDecorator(StepDecorator): # CONDA_CHANNELS in their environment. For pinning specific packages to specific # conda channels, users can specify channel::package as the package name. - def __init__(self, attributes=None, statically_defined=False): - super(CondaStepDecorator, self).__init__(attributes, statically_defined) + def resolve_configs(self): + super(CondaStepDecorator, self).resolve_configs() # Support legacy 'libraries=' attribute for the decorator. self.attributes["packages"] = { @@ -161,9 +161,7 @@ def runtime_init(self, flow, graph, package, run_id): encoding="utf-8", ) as f: f.write( - json.dumps( - self.environment.get_environment_info(include_ext_info=True) - ) + json.dumps(self.environment.get_environment_info(full_info=True)) ) # Support metaflow extensions. @@ -321,8 +319,8 @@ class CondaFlowDecorator(FlowDecorator): "disabled": None, } - def __init__(self, attributes=None, statically_defined=False): - super(CondaFlowDecorator, self).__init__(attributes, statically_defined) + def resolve_configs(self): + super(CondaFlowDecorator, self).resolve_configs() # Support legacy 'libraries=' attribute for the decorator. self.attributes["packages"] = { diff --git a/metaflow/plugins/timeout_decorator.py b/metaflow/plugins/timeout_decorator.py index e2c04dbcb31..648e318d36a 100644 --- a/metaflow/plugins/timeout_decorator.py +++ b/metaflow/plugins/timeout_decorator.py @@ -37,8 +37,8 @@ class TimeoutDecorator(StepDecorator): name = "timeout" defaults = {"seconds": 0, "minutes": 0, "hours": 0} - def __init__(self, *args, **kwargs): - super(TimeoutDecorator, self).__init__(*args, **kwargs) + def resolve_configs(self): + super().resolve_configs() # Initialize secs in __init__ so other decorators could safely use this # value without worrying about decorator order. # Convert values in attributes to type:int since they can be type:str diff --git a/metaflow/runner/click_api.py b/metaflow/runner/click_api.py index 90569c8b4ad..5dccfbea91d 100644 --- a/metaflow/runner/click_api.py +++ b/metaflow/runner/click_api.py @@ -32,7 +32,7 @@ UUIDParameterType, ) from metaflow._vendor.typeguard import TypeCheckError, check_type -from metaflow.decorators import add_decorator_options +from metaflow.decorators import add_decorator_and_config_options from metaflow.exception import MetaflowException from metaflow.includefile import FilePathClass from metaflow.parameters import JSONTypeClass, flow_context @@ -186,7 +186,7 @@ def from_cli(cls, flow_file: str, cli_collection: Callable) -> Callable: flow_cls = extract_flow_class_from_file(flow_file) flow_parameters = [p for _, p in flow_cls._get_parameters()] with flow_context(flow_cls) as _: - add_decorator_options(cli_collection) + add_decorator_and_config_options(cli_collection) class_dict = {"__module__": "metaflow", "_API_NAME": flow_file} command_groups = cli_collection.sources diff --git a/metaflow/runtime.py b/metaflow/runtime.py index d5fbc0b6837..f9535c5433c 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -6,9 +6,11 @@ """ from __future__ import print_function +import json import os import sys import fcntl +import tempfile import time import subprocess from datetime import datetime @@ -39,6 +41,9 @@ UBF_CONTROL, UBF_TASK, ) + +from .user_configs import ConfigInput, dump_config_values + import metaflow.tracing as tracing MAX_WORKERS = 16 @@ -417,82 +422,95 @@ def execute(self): else: self._queue_push("start", {}) progress_tstamp = time.time() - try: - # main scheduling loop - exception = None - while self._run_queue or self._active_tasks[0] > 0 or self._cloned_tasks: - # 1. are any of the current workers finished? - if self._cloned_tasks: - finished_tasks = self._cloned_tasks - # reset the list of cloned tasks and let poll_workers handle - # the remaining transition - self._cloned_tasks = [] - else: - finished_tasks = list(self._poll_workers()) - # 2. push new tasks triggered by the finished tasks to the queue - self._queue_tasks(finished_tasks) - # 3. if there are available worker slots, pop and start tasks - # from the queue. - self._launch_workers() - - if time.time() - progress_tstamp > PROGRESS_INTERVAL: - progress_tstamp = time.time() - tasks_print = ", ".join( - [ - "%s (%d running; %d done)" % (k, v[0], v[1]) - for k, v in self._active_tasks.items() - if k != 0 and v[0] > 0 - ] - ) - if self._active_tasks[0] == 0: - msg = "No tasks are running." + with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as config_file: + # Configurations are passed through a file to avoid overloading the + # command-line. We only need to create this file once and it can be reused + # for any task launch + config_key, config_value = dump_config_values(self._flow) + if config_value: + json.dump({config_key: config_value}, config_file) + config_file.flush() + self._config_file_name = config_file.name + else: + self._config_file_name = None + try: + # main scheduling loop + exception = None + while ( + self._run_queue or self._active_tasks[0] > 0 or self._cloned_tasks + ): + # 1. are any of the current workers finished? + if self._cloned_tasks: + finished_tasks = self._cloned_tasks + # reset the list of cloned tasks and let poll_workers handle + # the remaining transition + self._cloned_tasks = [] else: - if self._active_tasks[0] == 1: - msg = "1 task is running: " + finished_tasks = list(self._poll_workers()) + # 2. push new tasks triggered by the finished tasks to the queue + self._queue_tasks(finished_tasks) + # 3. if there are available worker slots, pop and start tasks + # from the queue. + self._launch_workers() + + if time.time() - progress_tstamp > PROGRESS_INTERVAL: + progress_tstamp = time.time() + tasks_print = ", ".join( + [ + "%s (%d running; %d done)" % (k, v[0], v[1]) + for k, v in self._active_tasks.items() + if k != 0 and v[0] > 0 + ] + ) + if self._active_tasks[0] == 0: + msg = "No tasks are running." else: - msg = "%d tasks are running: " % self._active_tasks[0] - msg += "%s." % tasks_print + if self._active_tasks[0] == 1: + msg = "1 task is running: " + else: + msg = "%d tasks are running: " % self._active_tasks[0] + msg += "%s." % tasks_print - self._logger(msg, system_msg=True) + self._logger(msg, system_msg=True) - if len(self._run_queue) == 0: - msg = "No tasks are waiting in the queue." - else: - if len(self._run_queue) == 1: - msg = "1 task is waiting in the queue: " + if len(self._run_queue) == 0: + msg = "No tasks are waiting in the queue." else: - msg = "%d tasks are waiting in the queue." % len( - self._run_queue - ) + if len(self._run_queue) == 1: + msg = "1 task is waiting in the queue: " + else: + msg = "%d tasks are waiting in the queue." % len( + self._run_queue + ) - self._logger(msg, system_msg=True) - if len(self._unprocessed_steps) > 0: - if len(self._unprocessed_steps) == 1: - msg = "%s step has not started" % ( - next(iter(self._unprocessed_steps)), - ) - else: - msg = "%d steps have not started: " % len( - self._unprocessed_steps - ) - msg += "%s." % ", ".join(self._unprocessed_steps) self._logger(msg, system_msg=True) - - except KeyboardInterrupt as ex: - self._logger("Workflow interrupted.", system_msg=True, bad=True) - self._killall() - exception = ex - raise - except Exception as ex: - self._logger("Workflow failed.", system_msg=True, bad=True) - self._killall() - exception = ex - raise - finally: - # on finish clean tasks - for step in self._flow: - for deco in step.decorators: - deco.runtime_finished(exception) + if len(self._unprocessed_steps) > 0: + if len(self._unprocessed_steps) == 1: + msg = "%s step has not started" % ( + next(iter(self._unprocessed_steps)), + ) + else: + msg = "%d steps have not started: " % len( + self._unprocessed_steps + ) + msg += "%s." % ", ".join(self._unprocessed_steps) + self._logger(msg, system_msg=True) + + except KeyboardInterrupt as ex: + self._logger("Workflow interrupted.", system_msg=True, bad=True) + self._killall() + exception = ex + raise + except Exception as ex: + self._logger("Workflow failed.", system_msg=True, bad=True) + self._killall() + exception = ex + raise + finally: + # on finish clean tasks + for step in self._flow: + for deco in step.decorators: + deco.runtime_finished(exception) # assert that end was executed and it was successful if ("end", ()) in self._finished: @@ -901,7 +919,7 @@ def _launch_worker(self, task): ) return - worker = Worker(task, self._max_log_size) + worker = Worker(task, self._max_log_size, self._config_file_name) for fd in worker.fds(): self._workers[fd] = worker self._poll.add(fd) @@ -1181,7 +1199,6 @@ def __init__( # Open the output datastore only if the task is not being cloned. if not self._is_cloned: self.new_attempt() - for deco in decos: deco.runtime_task_created( self._ds, @@ -1448,6 +1465,13 @@ def __init__(self, task): for deco in flow_decorators(self.task.flow): self.top_level_options.update(deco.get_top_level_options()) + # We also pass configuration options using the kv. syntax which will cause + # the configuration options to be loaded from the INFO file (or local-info-file + # in the case of the local runtime) + self.top_level_options.update( + {k: ConfigInput.make_key_name(k) for k in self.task.flow._user_configs} + ) + self.commands = ["step"] self.command_args = [self.task.step] self.command_options = { @@ -1498,8 +1522,9 @@ def __str__(self): class Worker(object): - def __init__(self, task, max_logs_size): + def __init__(self, task, max_logs_size, config_file_name): self.task = task + self._config_file_name = config_file_name self._proc = self._launch() if task.retries > task.user_code_retries: @@ -1551,6 +1576,12 @@ def _launch(self): self.task.user_code_retries, self.task.ubf_context, ) + + # Add user configurations using a file to avoid using up too much space on the + # command line + if self._config_file_name: + args.top_level_options["local-info-file"] = self._config_file_name + # Pass configuration options env.update(args.get_env()) env["PYTHONUNBUFFERED"] = "x" tracing.inject_tracing_vars(env) diff --git a/metaflow/user_configs.py b/metaflow/user_configs.py index 1570af25b55..3de376a8f19 100644 --- a/metaflow/user_configs.py +++ b/metaflow/user_configs.py @@ -1,15 +1,16 @@ import json import os -from typing import Any, Dict, Optional, Union, TYPE_CHECKING +from typing import Any, Callable, Dict, Optional, Union, TYPE_CHECKING from metaflow import INFO_FILE from metaflow._vendor import click -from .exception import MetaflowException +from .exception import MetaflowException, MetaflowInternalError from .parameters import ( DelayedEvaluationParameter, Parameter, + ParameterContext, current_flow, ) import functools @@ -37,21 +38,23 @@ # return tracefunc_closure -def dump_config_values(flow: FlowSpec): - if hasattr(flow, "_user_configs"): +def dump_config_values(flow: "FlowSpec"): + if flow._user_configs: return "user_configs", flow._user_configs return None, None -def load_config_values() -> Optional[Dict[str, Any]]: +def load_config_values(info_file: Optional[str] = None) -> Optional[Dict[str, Any]]: + if info_file is None: + info_file = INFO_FILE try: - with open(INFO_FILE, encoding="utf-8") as contents: + with open(info_file, encoding="utf-8") as contents: return json.load(contents).get("user_configs", {}) except IOError: return None -class ConfigValue(object): +class ConfigValue: # Thin wrapper to allow configuration values to be accessed using a "." notation # as well as a [] notation. @@ -61,21 +64,20 @@ def __init__(self, data: Dict[str, Any]): for key, value in data.items(): if isinstance(value, dict): value = ConfigValue(value) - elif isinstance(value, list): - value = [ConfigValue(v) for v in value] setattr(self, key, value) def __getitem__(self, key): value = self._data[key] if isinstance(value, dict): value = ConfigValue(value) - elif isinstance(value, list): - value = [ConfigValue(v) for v in value] return value def __repr__(self): return repr(self._data) + def __str__(self): + return json.dumps(self._data) + class ConfigInput(click.ParamType): name = "ConfigInput" @@ -88,58 +90,43 @@ class ConfigInput(click.ParamType): # (ie: even if Runner is evoked in that task, we won't "share" this global value's # usage). loaded_configs = None # type: Optional[Dict[str, Dict[str, Any]]] + info_file = None # type: Optional[str] - def __init__(self): - self._flow_cls = getattr(current_flow, "flow_cls", None) - if self._flow_cls is None: - raise MetaflowException("ConfigInput can only be used inside a flow") - if not hasattr(self._flow_cls, "_user_configs"): - self._flow_cls._user_configs = {} + def __init__(self, parser: Optional[Callable[[str], Dict[str, Any]]] = None): + self._parser = parser @staticmethod - def _make_key_name(name: str) -> str: + def make_key_name(name: str) -> str: return "kv." + name.lower() + @classmethod + def set_info_file(cls, info_file: str): + cls.info_file = info_file + @classmethod def get_config(cls, config_name: str) -> Optional[Dict[str, Any]]: if cls.loaded_configs is None: - all_configs = load_config_values() + all_configs = load_config_values(cls.info_file) if all_configs is None: raise MetaflowException( "Could not load expected configuration values " - "the INFO file. This is a Metaflow bug. Please contact support." + "from the INFO file. This is a Metaflow bug. Please contact support." ) cls.loaded_configs = all_configs return cls.loaded_configs.get(config_name, None) + # @tracefunc def convert(self, value, param, ctx): + flow_cls = getattr(current_flow, "flow_cls", None) + if flow_cls is None: + # We are not executing inside a flow (ie: not the CLI) + raise MetaflowException("Config object can only be used in a FlowSpec") + # Click can call convert multiple times, so we need to make sure to only # convert once. - if isinstance(value, (ConfigValue, DelayedEvaluationParameter)): + if isinstance(value, ConfigValue): return value - # There are two paths we need to worry about: - # - Scenario A: deploying to a scheduler - # A.1 In this case, when deploying (using `step-functions create` for example), - # the value passed to click (or the default value) will be converted and we - # will: - # - store the configuration in the flow object under _user_configs (so that it - # can later be dumped to the INFO file when packaging) - # - return a DelayedEvaluationParameter object so that when the scheduler - # evaluates it (with return_str set to True), it gets back the *string* - # kv. which indicates that this - # configuration should be fetched from INFO - # A.2 When the scheduler runs the flow, the value returned in A.1 (the kv. - # string) will be passed to convert again. This time, we directly return a - # ConfigValue after having fetched/loaded the configuration from INFO. - # - # - Scenario B: running with the native Runtime - # The value passed in will be similarly stored under _user_configs. We also - # return a DelayedEvaluationParameter object but when the _set_constants in - # the runtime calls it, it calls it with return_str set to False and it will - # return a ConfigValue directly which can then be persisted in the artifact - # store. - # The value we get in to convert can be: # - a dictionary # - a path to a YAML or JSON file @@ -149,22 +136,11 @@ def convert(self, value, param, ctx): # can access it when packaging to store it in the INFO file. The config itself # will be stored as regular artifacts (the ConfigValue object basically) - def _delay_eval(name: str, value: ConfigValue, return_str=False): - if return_str: - # Scenario A.1 when deploy_time_eval is called by the scheduler - # (or, in some cases, some schedulers directly identify the - # DelayedEvaluationParameter value and call it directory with - # return_str=True) - return name - # Scenario B - return value - if isinstance(value, dict): - # Scenario A.1 or B. - self._flow_cls._user_configs[self._make_key_name(param.name)] = value - return DelayedEvaluationParameter( - param.name, "value", functools.partial(_delay_eval, param.name, value) - ) + if self._parser: + value = self._parser(value) + flow_cls._user_configs[param.name] = value + return ConfigValue(value) elif not isinstance(value, str): raise MetaflowException( "Configuration value for '%s' must be a string or a dictionary" @@ -173,12 +149,13 @@ def _delay_eval(name: str, value: ConfigValue, return_str=False): # Here we are sure we have a string if value.startswith("kv."): - # This is scenario A.2 - value = self.get_config(value) + value = self.get_config(value[3:]) if value is None: raise MetaflowException( "Could not find configuration '%s' in INFO file" % value ) + # We also set in flow_cls as this makes it easier to access + flow_cls._user_configs[param.name] = value return ConfigValue(value) elif os.path.isfile(value): @@ -189,26 +166,33 @@ def _delay_eval(name: str, value: ConfigValue, return_str=False): raise MetaflowException( "Could not read configuration file '%s'" % value ) from e - try: - value = json.loads(content) - except json.JSONDecodeError as e: - raise MetaflowException( - "Configuration file '%s' is not valid JSON" % value - ) from e - # TODO: Support YAML - self._flow_cls._user_configs[self._make_key_name(param.name)] = value + if self._parser: + value = self._parser(content) + else: + try: + if self._parser: + value = self._parser(content) + + value = json.loads(content) + except json.JSONDecodeError as e: + raise MetaflowException( + "Configuration file '%s' is not valid JSON" % value + ) from e + # TODO: Support YAML + flow_cls._user_configs[param.name] = value else: - try: - value = json.loads(value) - except json.JSONDecodeError as e: - raise MetaflowException( - "Configuration value for '%s' is not valid JSON" % param.name - ) from e - # TODO: Support YAML - self._flow_cls._user_configs[self._make_key_name(param.name)] = value - return DelayedEvaluationParameter( - param.name, "value", functools.partial(_delay_eval, param.name, value) - ) + if self._parser: + value = self._parser(value) + else: + try: + value = json.loads(value) + except json.JSONDecodeError as e: + raise MetaflowException( + "Configuration value for '%s' is not valid JSON" % param.name + ) from e + # TODO: Support YAML + flow_cls._user_configs[param.name] = value + return ConfigValue(value) def __str__(self): return repr(self) @@ -217,9 +201,140 @@ def __repr__(self): return "ConfigInput" +class LocalFileInput(click.Path): + name = "LocalFileInput" + + def convert(self, value, param, ctx): + super().convert(value, param, ctx) + ConfigInput.set_info_file(value) + # This purposefully returns None which means it is *not* passed down + # when commands use ctx.parent.parent.params to get all the configuration + # values. + + def __str__(self): + return repr(self) + + def __repr__(self): + return "LocalFileInput" + + ConfigArgType = Union[str, Dict[str, Any]] +class DelayEvaluator: + """ + Small wrapper that allows the evaluation of a Config() value in a delayed manner. + This is used when we want to use config.* values in decorators for example. + """ + + def __init__(self, config_expr: str, is_var_only=True): + self._config_expr = config_expr + if is_var_only: + self._access = [] + else: + self._access = None + self._is_var_only = is_var_only + + def __getattr__(self, name): + if self._access is None: + raise AttributeError() + self._access.append(name) + return self + + def __call__(self): + flow_cls = getattr(current_flow, "flow_cls", None) + if flow_cls is None: + # We are not executing inside a flow (ie: not the CLI) + raise MetaflowException( + "Config object can only be used directly in the FlowSpec defining them. " + "If using outside of the FlowSpec, please use ConfigEval" + ) + if self._access is not None: + # Build the final expression by adding all the fields in access as . fields + self._config_expr = ".".join([self._config_expr] + self._access) + # Evaluate the expression setting the config values as local variables + return eval( + self._config_expr, + globals(), + {k: ConfigValue(v) for k, v in flow_cls._user_configs.items()}, + ) + + +def config_expr(expr: str) -> DelayEvaluator: + return DelayEvaluator(expr) + + +def eval_config(f: Callable[["FlowSpec"], "FlowSpec"]) -> "FlowSpec": + """ + Decorator to allow you to add Python decorators to a FlowSpec that makes use of + user configurations. + + As an example: + + ``` + def parameterize(f): + for s in f: + # Iterate over all the steps + if s.name in f.config.add_env_to_steps: + setattr(f, s.name) = environment(vars={**f.config.env_vars})(s) + return f + + @eval_config(parameterize) + class MyFlow(FlowSpec): + config = Config("config") + ... + ``` + + allows you to add an environment decorator to all steps in `add_env_to_steps`. Both + the steps to add this decorator to and the values to add are extracted from the + configuration passed to the Flow through config. + + Parameters + ---------- + f : Callable[[FlowSpec], FlowSpec] + Decorator function + + Returns + ------- + FlowSpec + The modified FlowSpec + """ + + def _wrapper(flow_spec: "FlowSpec"): + flow_spec._config_funcs.append(f) + return flow_spec + + return _wrapper + + +class FlowConfig(DelayEvaluator): + def __init__(self, config_name: str): + """ + Small wrapper to allow you to refer to a flow's configuration in a flow-level + decorator. + + As an example: + + @project(name=FlowConfig("config").project.name) + class MyFlow(FlowSpec): + config = Config("config") + ... + + This will allow you to specify a `project.name` value in your configuration + and have it used in the flow-level decorator. + + Without this construct, it would be difficult to access `config` inside the + arguments of the decorator. + + Parameters + ---------- + config_name : str + Name of the configuration being used. This should be the name given to + the `Config` constructor. + """ + super().__init__(config_name, is_var_only=True) + + class Config(Parameter): """ Includes a configuration for this flow. @@ -233,34 +348,49 @@ class Config(Parameter): Parameters ---------- name : str - User-visible parameter name. - default : Union[ConfigArgType, Callable[[ParameterContext], ConfigArgType]] - Default configuration either as a path to a file, the string representation of - a YAML or JSON file or a dictionary. If specified as a function, the function - will be evaluated to get the value to use. + User-visible configuration name. + default : Union[str, Dict[str, Any], Callable[[ParameterContext], Union[str, Dict[str, Any]]]], optional, default None + Default value for the parameter. A function + implies that the value will be computed using that function. + help : str, optional, default None + Help text to show in `run --help`. required : bool, default False Require that the user specified a value for the parameter. - `required=True` implies that the `default` value is ignored. - help : str, optional - Help text to show in `run --help`. + `required=True` implies that the `default` is not used. + parser : Callable[[str], Dict[str, Any]], optional, default None show_default : bool, default True If True, show the default value in the help text. """ + IS_FLOW_PARAMETER = True + def __init__( self, name: str, - required: bool = False, + default: Optional[ + Union[ + str, + Dict[str, Any], + Callable[[ParameterContext], Union[str, Dict[str, Any]]], + ] + ] = None, help: Optional[str] = None, - **kwargs: Dict[str, str] + required: bool = False, + parser: Optional[Callable[[str], Dict[str, Any]]] = None, + **kwargs: Dict[str, str], ): super(Config, self).__init__( name, + default=default, required=required, help=help, - type=ConfigInput(), + type=ConfigInput(parser), **kwargs, ) def load_parameter(self, v): return v + + def __getattr__(self, name): + ev = DelayEvaluator(self.name, is_var_only=True) + return ev.__getattr__(name) diff --git a/metaflow/util.py b/metaflow/util.py index e95ab9f0f87..348323ee42b 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -296,6 +296,9 @@ def get_metaflow_root(): def dict_to_cli_options(params): + # Prevent circular imports + from metaflow.user_configs import ConfigInput, ConfigValue + for k, v in params.items(): # Omit boolean options set to false or None, but preserve options with an empty # string argument. @@ -304,11 +307,19 @@ def dict_to_cli_options(params): # keyword in Python, so we call it 'decospecs' in click args if k == "decospecs": k = "with" + orig_k = k k = k.replace("_", "-") v = v if isinstance(v, (list, tuple, set)) else [v] for value in v: yield "--%s" % k if not isinstance(value, bool): + if isinstance(value, ConfigValue): + # For ConfigValues, we don't send them as is but instead pass + # the special value that will look up the config value in the + # INFO file + yield ConfigInput.make_key_name(orig_k) + continue + value = to_unicode(value) # Of the value starts with $, assume the caller wants shell variable