diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c842c79..4726bd6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,6 +22,7 @@ jobs: os: [ ubuntu-latest, macos-latest ] python-version: [ "3.8", "3.10", "3.12" ] resolver: [ mamba, conda, micromamba ] + mamba-version: [ "1.5.10", "latest" ] env: METAFLOW_CONDA_DEPENDENCY_RESOLVER: ${{ matrix.resolver }} METAFLOW_CONDA_TEST: 1 @@ -31,7 +32,7 @@ jobs: - uses: mamba-org/setup-micromamba@f8b8a1e23a26f60a44c853292711bacfd3eac822 # v1.9.0 with: - micromamba-version: latest + micromamba-version: ${{ matrix.mamba-version }} environment-file: dev-env.yml init-shell: bash create-args: >- diff --git a/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py b/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py index f2af6c8..815262f 100644 --- a/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py +++ b/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py @@ -21,6 +21,7 @@ from metaflow.plugins import DATASTORES from metaflow.metaflow_config import ( CONDA_ALL_ARCHS, + CONDA_DEPENDENCY_RESOLVER, CONDA_TEST, CONDA_SYS_DEPENDENCIES, DEFAULT_DATASTORE, @@ -241,6 +242,13 @@ def environment( help="Recreate the environment if it already exists and remove the `into` directory " "if it exists", ) +@click.option( + "--strict/--no-strict", + default=True, + is_flag=True, + show_default=True, + help="If True, fails if it cannot install the original Metaflow environment", +) @click.option( "--into-dir", default=None, @@ -269,6 +277,7 @@ def create( name: Optional[str], local_only: bool, force: bool, + strict: bool, into_dir: Optional[str], install_notebook: bool, pathspec: bool, @@ -299,26 +308,26 @@ def create( else: os.makedirs(into_dir) if install_notebook and name is None: - raise click.BadOptionUsage("--install-notebook requires --name") + raise click.BadOptionUsage("install-notebook", "requires --name") code_pkg = None - mf_version = None + mf_version = "" + mf_extensions_info = None if pathspec: env_name = "step:%s" % env_name alias_type, resolved_alias = resolve_env_alias(env_name) if alias_type == AliasType.PATHSPEC: if not pathspec: - raise click.BadOptionUsage( - "--pathspec used but environment name is not a pathspec" - ) + raise click.BadOptionUsage("pathspec", "environment name is not a pathspec") task = Step(resolved_alias, _namespace_check=False).task code_pkg = task.code - mf_version = task.metadata_dict["metaflow_version"] + mf_version = task.metadata_dict.get("metaflow_version", "") + mf_extensions_info = task["_graph_info"].data.get("extensions") else: if pathspec: raise click.BadOptionUsage( - "--pathspec not used but environment name is a pathspec" + "pathspec", "missing --pathspec; environment name is a pathspec" ) env_id_for_alias = cast(Conda, obj.conda).env_id_from_alias( @@ -339,61 +348,66 @@ def create( # We need to install ipykernel into the resolved environment obj.echo(" Resolving an environment compatible with Jupyter ...", nl=False) - # We use envsresolver to properly deal with builder environments and what not - resolver = EnvsResolver(obj.conda) - # We force the env_type to be the same as the base env since we don't modify that - # by adding these deps. - - # We also force the use of use_latest because we are not really doing anything - # that would require a re-resolve (ie: the user doesn't really care about the - # version of ipykernel most likely). - resolver.add_environment( - arch_id(), - user_deps={ - "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": ["ipykernel"] - }, - user_sources={}, - extras={}, - base_env=env, - local_only=local_only, - use_latest=":any:", - ) - resolver.resolve_environments(obj.echo) - update_envs = [] # type: List[ResolvedEnvironment] - if obj.datastore_type != "local" or CONDA_TEST: - # We may need to update caches - # Note that it is possible that something we needed to resolve, we don't need - # to cache (if we resolved to something already cached). - formats = set() # type: Set[str] - for _, resolved_env, f, _ in resolver.need_caching_environments( - include_builder_envs=True - ): - update_envs.append(resolved_env) - formats.update(f) - - cast(Conda, obj.conda).cache_environments( - update_envs, {"conda": list(formats)} + # We first check if `ipykernel` already exists in the environment. If it does, we + # can skip the whole resolution process. + if not any("ipykernel" == p.package_name for p in env.packages): + # We use envsresolver to properly deal with builder environments and what not + resolver = EnvsResolver(obj.conda) + # We force the env_type to be the same as the base env since we don't modify + # that by adding these deps. + + # We also force the use of use_latest because we are not really doing + # anything that would require a re-resolve (ie: the user doesn't really + # care about the version of ipykernel most likely). + resolver.add_environment( + arch_id(), + user_deps={ + "pypi" if env.env_type == EnvType.PYPI_ONLY else "conda": [ + "ipykernel" + ] + }, + user_sources={}, + extras={}, + base_env=env, + local_only=local_only, + use_latest=":any:", ) - else: - update_envs = [ - resolved_env - for _, resolved_env, _ in resolver.new_environments( + resolver.resolve_environments(obj.echo) + update_envs = [] # type: List[ResolvedEnvironment] + if obj.datastore_type != "local" or CONDA_TEST: + # We may need to update caches + # Note that it is possible that something we needed to resolve, we don't need + # to cache (if we resolved to something already cached). + formats = set() # type: Set[str] + for _, resolved_env, f, _ in resolver.need_caching_environments( include_builder_envs=True + ): + update_envs.append(resolved_env) + formats.update(f) + + cast(Conda, obj.conda).cache_environments( + update_envs, {"conda": list(formats)} ) - ] - cast(Conda, obj.conda).add_environments(update_envs) + else: + update_envs = [ + resolved_env + for _, resolved_env, _ in resolver.new_environments( + include_builder_envs=True + ) + ] + cast(Conda, obj.conda).add_environments(update_envs) - # Update the default environment - for _, resolved_env, _ in resolver.resolved_environments( - include_builder_envs=True - ): - cast(Conda, obj.conda).set_default_environment(resolved_env.env_id) + # Update the default environment + for _, resolved_env, _ in resolver.resolved_environments( + include_builder_envs=True + ): + cast(Conda, obj.conda).set_default_environment(resolved_env.env_id) - cast(Conda, obj.conda).write_out_environments() + cast(Conda, obj.conda).write_out_environments() - # We are going to be creating this new environment going forward (not the - # initial env we got) - _, env, _ = next(resolver.resolved_environments()) + # We are going to be creating this new environment going forward (not the + # initial env we got) + _, env, _ = next(resolver.resolved_environments()) delta_time = int(time.time() - start) obj.echo(" done in %d second%s." % (delta_time, plural_marker(delta_time))) @@ -422,10 +436,12 @@ def create( "Step '%s' does not have a code package -- " "downloading active Metaflow version only" % env_name ) - download_mf_version("./__conda_python", mf_version) + download_mf_version( + "./__conda_python", mf_version, mf_extensions_info, obj.echo, strict + ) obj.echo( - "Code package for %s downloaded into '%s' -- `__conda_python` is " - "the executable to use" % (env_name, into_dir) + "Python executable `__conda_python` for environment '%s' downloaded " + "into '%s'" % (env_name, into_dir) ) else: python_bin = os.path.join(obj.conda.create_for_name(name, env), "bin", "python") @@ -498,8 +514,9 @@ def create( f.write("\n") else: obj.echo( - "Created environment '%s' locally, activate with `%s activate %s`" - % (name, obj.conda.binary("conda"), name) + "Conda environment '%s' created locally, activate with " + "`CONDA_ENVS_DIRS=%s %s activate %s`" + % (name, obj.conda.root_env_dir, CONDA_DEPENDENCY_RESOLVER, name) ) cast(Conda, obj.conda).write_out_environments() diff --git a/metaflow_extensions/netflix_ext/cmd/environment/utils.py b/metaflow_extensions/netflix_ext/cmd/environment/utils.py index 55f4eec..2d78f7d 100644 --- a/metaflow_extensions/netflix_ext/cmd/environment/utils.py +++ b/metaflow_extensions/netflix_ext/cmd/environment/utils.py @@ -1,20 +1,20 @@ import json import os import re +import shutil import subprocess -from typing import Dict, List, Optional - -from metaflow import Step -from metaflow_extensions.netflix_ext.plugins.conda.conda import Conda -from metaflow_extensions.netflix_ext.plugins.conda.env_descr import ( - EnvID, - ResolvedEnvironment, - TStr, +import sys +from typing import Any, Dict, Optional + +from metaflow.extension_support import ( + dump_module_info, + get_extensions_in_dir, + update_package_info, ) -from metaflow_extensions.netflix_ext.plugins.conda.utils import arch_id -_deps_parse = re.compile(r"([^<>=!~]+)(.*)") +# _deps_parse = re.compile(r"([^<>=!~]+)(.*)") _ext_parse = re.compile(r"([-_\w]+)\(([^)]+)\)") +_git_version = re.compile(r"-git([0-9a-f]+)(-dirty)?$") name_to_pkg = {"netflix-ext": "metaflow-netflixext"} @@ -72,8 +72,41 @@ # return local_instances -def download_mf_version(executable: str, version_str: str): - def _install_pkg(pkg: str, ver: str): +def _merge_directories(src_dir, dest_dir): + # Due to a bug in PIP, we can't use --target to install namespace packages + # so we hack around it by merging directories manually. + + for root, dirs, files in os.walk(src_dir): + # Determine the path of the current directory relative to src_dir + relative_path = os.path.relpath(root, src_dir) + # Determine the corresponding path in the destination directory + dest_path = os.path.join(dest_dir, relative_path) + + # Create directories in the destination directory + for dir_name in dirs: + dest_dir_path = os.path.join(dest_path, dir_name) + if not os.path.exists(dest_dir_path): + os.makedirs(dest_dir_path) + + # Copy files to the destination directory + for file_name in files: + src_file_path = os.path.join(root, file_name) + dest_file_path = os.path.join(dest_path, file_name) + shutil.copy2(src_file_path, dest_file_path) + + +def download_mf_version( + executable: str, version_str: str, extension_info: Dict[str, Any], echo, fail_hard +): + def echo_or_fail(msg): + if fail_hard: + raise RuntimeError( + msg + + ". Use --no-strict to install latest versions if possible instead." + ) + echo("WARNING: " + msg + " -- installing latest version if possible.") + + def _install_pkg(pkg: str, ver: Optional[str]): try: subprocess.check_call( [ @@ -81,29 +114,118 @@ def _install_pkg(pkg: str, ver: str): "-m", "pip", "install", + "--quiet", "-t", ".", "--no-deps", - "%s==%s" % (pkg, ver), + "%s==%s" % (pkg, ver) if ver else pkg, ] ) except subprocess.CalledProcessError as e: raise RuntimeError( "Could not install version '%s' of '%s': %s" % (ver, pkg, e.stderr) - ) + ) from e + + if not version_str: + raise ValueError("Unknown version of Metaflow") s = version_str.split("+", 1) - _install_pkg("metaflow", s[0]) + if _git_version.search(s[0]): + # This is not a "public" release so we install the latest + echo_or_fail("Metaflow's version is non public (%s)" % s[0]) + _install_pkg("metaflow", None) + else: + _install_pkg("metaflow", s[0]) if len(s) == 1: return - # We now install the other packages, they are in the format name(ver);name(ver)... - s = s[1].split(";") - for pkg_desc in s: - m = _ext_parse.match(pkg_desc) - if not m: - raise ValueError("Metaflow extension '%s' is not a valid format" % pkg_desc) - pkg_name, pkg_version = m.groups() - pkg = name_to_pkg.get(pkg_name) - if pkg is None: - raise ValueError("Metaflow extension '%s' is not known" % pkg_name) - _install_pkg(pkg, pkg_version) + # We now install the other packages (extensions). + # If we have extension_info, we can get that information from there. + # That is ideal if we have that. If not, we do our best from the version string + # where packages are in the form name(vers);name(vers) but name is not necessarily + # the package name. + if extension_info: + wrong_version_info = set() + first_round = True + for pkg_name, pkg_info in extension_info["installed"].items(): + if pkg_name.startswith("_pythonpath"): + # Local package with *zero* information + echo_or_fail("Unknown extension present at runtime") + continue + if pkg_info["package_version"] == "" or _git_version.search( + pkg_info["package_version"] + ): + echo_or_fail( + "Extension '%s' has a non-public version (%s)" + % (pkg_info["extension_name"], pkg_info["package_version"]) + ) + pkg_version = pkg_info["dist_version"] + wrong_version_info.add(pkg_name) + _install_pkg(pkg_name, pkg_version) + if first_round: + shutil.move("metaflow_extensions", "metaflow_extensions_tmp") + first_round = False + else: + _merge_directories("metaflow_extensions", "metaflow_extensions_tmp") + shutil.rmtree("metaflow_extensions") + else: + s = s[1].split(";") + first_round = True + for pkg_desc in s: + m = _ext_parse.match(pkg_desc) + if not m: + # In some cases (older Metaflow), the version is not recorded so + # we just install the latest + echo_or_fail("Extension '%s' does not have a version" % pkg_desc) + pkg_name, pkg_version = pkg_desc, None + else: + pkg_name, pkg_version = m.groups() + pkg = name_to_pkg.get(pkg_name) + if pkg is None: + raise ValueError("Metaflow extension '%s' is not known" % pkg_name) + _install_pkg(pkg, pkg_version) + if first_round: + shutil.move("metaflow_extensions", "metaflow_extensions_tmp") + first_round = False + else: + _merge_directories("metaflow_extensions", "metaflow_extensions_tmp") + shutil.rmtree("metaflow_extensions") + # We now do a few things to make sure the Metaflow environment is recreated + # as closely as possible: + # - add a __init__.py file to the metaflow_extensions directory to prevent + # other extensions from being loaded + # - create a INFO file with the extension information. This will allow for the + # __init__.py file (since otherwise it is an error) and will also remove + # conflicts when trying to load the extensions. + # - we clean up all the dist-info directories that were created as part of the + # pip install. This is not strictly necessary but it is cleaner. + shutil.move("metaflow_extensions_tmp", "metaflow_extensions") + sys.path.insert(0, ".") + installed_packages, pkgs_per_extension_point = get_extensions_in_dir(os.getcwd()) + # Update the information with the reported version and name from extension_info + for pkg_name, pkg_info in extension_info["installed"].items(): + if pkg_name in installed_packages: + update_package_info( + pkg_to_update=installed_packages[pkg_name], + dist_version=pkg_info["dist_version"], + extension_name=pkg_info["extension_name"], + ) + if pkg_name not in wrong_version_info: + update_package_info( + pkg_to_update=installed_packages[pkg_name], + package_version=pkg_info["package_version"], + ) + + key, val = dump_module_info(installed_packages, pkgs_per_extension_point) + sys.path.pop(0) + + with open("metaflow_extensions/__init__.py", "w+", encoding="utf-8") as f: + f.write("# This file is automatically generated by Metaflow\n") + + with open(os.path.basename(INFO_FILE), "w+", encoding="utf-8") as f: + json.dump({key: val}, f) + + # Clean up the dist-info directories + for root, dirs, _ in os.walk(".", topdown=False): + for d in dirs: + if d.endswith(".dist-info"): + shutil.rmtree(os.path.join(root, d)) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda.py b/metaflow_extensions/netflix_ext/plugins/conda/conda.py index 8f34a91..e617f3d 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda.py @@ -58,6 +58,8 @@ CONDA_USE_REMOTE_LATEST, ) from metaflow.metaflow_environment import InvalidEnvironmentException + +from metaflow.system import _system_logger, _system_monitor from metaflow.util import get_username from metaflow._vendor.packaging.version import parse as parse_version @@ -389,12 +391,41 @@ def create_for_step( try: env_name = self._env_directory_from_envid(env.env_id) - return self.create_for_name(env_name, env, do_symlink) + to_return = None + s = time.time() + with _system_monitor.measure("metaflow.conda.create_for_step"): + to_return = self.create_for_name(env_name, env, do_symlink) + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="env_create_for_step", + payload={ + "qualifier_name": str(env.env_id), + "msg": "Environment created in %d seconds" % (time.time() - s), + }, + ) + return to_return except CondaException as e: + import traceback + + with _system_monitor.count("metaflow.conda.create_for_step.error"): + _system_logger.log_event( + level="error", + module="netflix_ext.conda", + name="env_create_for_step.error", + payload={ + "qualifier_name": str(env.env_id), + "msg": traceback.format_exc(), + }, + ) raise CondaStepException(e, [step_name]) from None def create_for_name( - self, name: str, env: ResolvedEnvironment, do_symlink: bool = False + self, + name: str, + env: ResolvedEnvironment, + do_symlink: bool = False, + quiet: bool = False, ) -> str: """ Creates a local instance of the resolved environment @@ -408,6 +439,9 @@ def create_for_name( do_symlink : bool, optional If True, creates a `__conda_python` symlink in the current directory pointing to the created Conda Python executable, by default False + quiet : bool, optional + If True, does not print status messages when creating the environment, + by default False Returns ------- @@ -430,7 +464,12 @@ def create_for_name( with CondaLockMultiDir( self.echo, self._package_dirs, self._package_dir_lockfile_name ): + if quiet: + techo = self.echo + self.echo = self._no_echo env_path = self._create(env, name) + if quiet: + self.echo = techo if do_symlink: os.symlink( @@ -442,12 +481,11 @@ def create_for_name( def create_builder_env(self, builder_env: ResolvedEnvironment) -> str: # A helper to build a named environment specifically for builder environments. # We are more quiet and have a specific name for it - techo = self.echo - self.echo = self._no_echo r = self.create_for_name( - self._env_builder_directory_from_envid(builder_env.env_id), builder_env + self._env_builder_directory_from_envid(builder_env.env_id), + builder_env, + quiet=True, ) - self.echo = techo return r @@ -1191,7 +1229,8 @@ def _cache_pkg(pkg: PackageSpecification, pkg_fmt: str, local_path: str) -> str: self._upload_to_ds(upload_files) delta_time = int(time.time() - start) self.echo( - " done in %d second%s." % (delta_time, plural_marker(delta_time)) + " done in %d second%s." % (delta_time, plural_marker(delta_time)), + timestamp=False, ) else: self.echo( @@ -1239,7 +1278,8 @@ def _cache_pkg(pkg: PackageSpecification, pkg_fmt: str, local_path: str) -> str: self._upload_to_ds(upload_files) delta_time = int(time.time() - start) self.echo( - " done in %d second%s." % (delta_time, plural_marker(delta_time)) + " done in %d second%s." % (delta_time, plural_marker(delta_time)), + timestamp=False, ) else: self.echo( @@ -2071,7 +2111,7 @@ def _check_match(dir_name: str) -> Optional[EnvID]: # similar reasons -- we only want to search the ones we created. # For micromamba OR if we are using a specific conda installation # (so with CONDA_LOCAL_PATH), only search there - env_dir = self._root_env_dir + env_dir = self.root_env_dir with CondaLock(self.echo, self._env_lock_file(os.path.join(env_dir, "_"))): # Grab a lock *once* on the parent directory so we pick anyname for # the "directory". @@ -2206,7 +2246,7 @@ def _package_dirs(self) -> List[str]: return info["pkgs_dirs"] @property - def _root_env_dir(self) -> str: + def root_env_dir(self) -> str: info = self._info # We rely on the first directory existing. This should be a fairly # easy check. @@ -2233,7 +2273,7 @@ def _info_no_lock(self) -> Dict[str, Any]: def _create(self, env: ResolvedEnvironment, env_name: str) -> str: # We first check to see if the environment exists -- if it does, we skip it - env_dir = os.path.join(self._root_env_dir, env_name) + env_dir = os.path.join(self.root_env_dir, env_name) self._cached_info = None @@ -2484,10 +2524,10 @@ def paths_and_handles(): def _env_lock_file(self, env_directory: str): # env_directory is either a name or a directory -- if name, it is assumed - # to be rooted at _root_env_dir + # to be rooted at root_env_dir parent_dir = os.path.split(env_directory)[0] if parent_dir == "": - parent_dir = self._root_env_dir + parent_dir = self.root_env_dir return os.path.join(parent_dir, "mf_env-creation.lock") @property @@ -2660,6 +2700,7 @@ def _acquire(self) -> None: try_count = 0 while True: try: + debug.conda_exec("Attempting to create lock at %s" % self.lock) self.fd = os.open(self.lock, os.O_CREAT | os.O_EXCL | os.O_RDWR) self.locked = True break @@ -2667,15 +2708,16 @@ def _acquire(self) -> None: if e.errno != errno.EEXIST: raise - if try_count < 3: - try_count += 1 - elif try_count == 3: + debug.conda_exec( + "Lock at %s already exists -- try %d" % (self.lock, try_count + 1) + ) + try_count += 1 + if try_count % 3 == 0: self.echo( "Waited %ds to acquire lock at '%s' -- if unexpected, " "please remove that file and retry" % (try_count * self.delay, self.lock) ) - try_count += 1 if self.timeout is None: raise CondaException( @@ -2724,6 +2766,7 @@ def __init__( def _acquire(self) -> None: start = time.time() + debug.conda_exec("Will acquire locks on %s" % ", ".join(self.dirs)) for d in self.dirs: full_file = os.path.join(d, self.lockfile) try_count = 0 @@ -2734,6 +2777,8 @@ def _acquire(self) -> None: raise while True: try: + debug.conda_exec("Attempting to create lock at %s" % full_file) + self.fd.append( os.open(full_file, os.O_CREAT | os.O_EXCL | os.O_RDWR) ) @@ -2742,15 +2787,17 @@ def _acquire(self) -> None: if e.errno != errno.EEXIST: raise - if try_count < 3: - try_count += 1 - elif try_count == 3: + debug.conda_exec( + "Lock at %s already exists -- try %d" + % (full_file, try_count + 1) + ) + try_count += 1 + if try_count % 3 == 0: self.echo( "Waited %ds to acquire lock at '%s' -- if unexpected, " "please remove that file and retry" % (try_count * self.delay, full_file) ) - try_count += 1 if self.timeout is None: raise CondaException( diff --git a/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py b/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py index 31d5fe0..bdd75f3 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/envsresolver.py @@ -30,6 +30,7 @@ CONDA_USE_REMOTE_LATEST, ) from metaflow.metaflow_environment import InvalidEnvironmentException +from metaflow.system import _system_monitor, _system_logger from metaflow._vendor.packaging.version import parse as parse_version from .env_descr import ( @@ -221,21 +222,32 @@ def resolve_environments(self, echo: Callable[..., None]): echo : Callable[..., None] Method to use to print things to the console """ - # At this point, we check in our backend storage if we have the files we need - need_resolution = [ - env_id - for env_id, req in self._requested_envs.items() - if req["resolved"] is None - ] - if debug.conda: - debug.conda_exec("Resolving environments:") - for env_id in need_resolution: - info = self._requested_envs[env_id] - debug.conda_exec( - "%s (%s): %s" % (env_id.req_id, env_id.full_id, str(info)) - ) - if len(need_resolution): - self._resolve_environments(echo, need_resolution) + with _system_monitor.measure("metaflow.conda.all_resolve"): + start = time.time() + # At this point, we check in our backend storage if we have the files we need + need_resolution = [ + env_id + for env_id, req in self._requested_envs.items() + if req["resolved"] is None + ] + if debug.conda: + debug.conda_exec("Resolving environments:") + for env_id in need_resolution: + info = self._requested_envs[env_id] + debug.conda_exec( + "%s (%s): %s" % (env_id.req_id, env_id.full_id, str(info)) + ) + if len(need_resolution): + self._resolve_environments(echo, need_resolution) + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="all_envs_resolved", + payload={ + "msg": "All environment resolved and cached in %d seconds" + % (time.time() - start) + }, + ) def all_environments( self, include_builder_envs: bool = False @@ -640,91 +652,104 @@ def _resolve( env_desc: Mapping[str, Any], builder_environments: Optional[Dict[str, List[EnvID]]], ) -> Tuple[EnvID, ResolvedEnvironment, Optional[List[ResolvedEnvironment]]]: - env_id = cast(EnvID, env_desc["id"]) - if builder_environments is None: - builder_environments = {} - - builder_envs = [ - self._builder_envs[builder_env_id]["resolved"] - for builder_env_id in builder_environments.get(env_id.req_id, []) - ] - - # Figure out the env_type - env_type = cast( - EnvType, env_desc.get("env_type") or env_type_for_deps(env_desc["deps"]) - ) + with _system_monitor.measure("metaflow.conda.resolve"): + start = time.time() + env_id = cast(EnvID, env_desc["id"]) + if builder_environments is None: + builder_environments = {} + + builder_envs = [ + self._builder_envs[builder_env_id]["resolved"] + for builder_env_id in builder_environments.get(env_id.req_id, []) + ] - # Create the resolver object - resolver = self.get_resolver(env_type)(self._conda) - - # Resolve the environment - if env_type == EnvType.PYPI_ONLY: - # Pypi only mode - # In this mode, we also allow (as a workaround for poor support for - # more advanced options in conda-lock (like git repo, local support, - # etc)) the inclusion of conda packages that are *not* python packages. - # To ensure this, we check the npconda packages, create an environment - # for it and check if that environment doesn't contain python deps. - # If that is the case, we then create the actual environment including - # both conda and npconda packages and re-resolve. We could maybe - # optimize to not resolve from scratch twice but given this is a rare - # situation and the cost is only during resolution, it doesn't seem - # worth it. - npconda_deps = env_desc["deps"].get("npconda", []) - if npconda_deps: - npcondaenv, _ = self.get_resolver(EnvType.CONDA_ONLY)( - self._conda - ).resolve( - EnvType.CONDA_ONLY, - {"npconda": npconda_deps}, - env_desc["sources"], - {}, - env_id.arch, - ) - if any((p.filename.startswith("python-") for p in npcondaenv.packages)): - raise InvalidEnvironmentException( - "Cannot specify a non-python Conda dependency that uses " - "python: %s. Please use the mixed mode instead." - % ", ".join([d.value for d in npconda_deps]) - ) - resolved_env, builder_envs = resolver.resolve( - env_type, - env_desc["deps"], - env_desc["sources"], - env_desc["extras"], - env_id.arch, - builder_envs, - env_desc["base"], - ) + # Figure out the env_type + env_type = cast( + EnvType, env_desc.get("env_type") or env_type_for_deps(env_desc["deps"]) + ) - if env_desc["base"]: - # We try to copy things over from the base environment as it contains - # potential caching information we don't need to rebuild. This also - # properly sets the user dependencies to what we want as opposed to - # including everything we resolved for. - merged_packages = [] # type: List[PackageSpecification] - base_packages = { - p.filename: p.to_dict() - for p in cast(ResolvedEnvironment, env_desc["base"]).packages - } - for p in resolved_env.packages: - existing_info = base_packages.get(p.filename) - if existing_info: - merged_packages.append( - PackageSpecification.from_dict(existing_info) + # Create the resolver object + resolver = self.get_resolver(env_type)(self._conda) + + # Resolve the environment + if env_type == EnvType.PYPI_ONLY: + # Pypi only mode + # In this mode, we also allow (as a workaround for poor support for + # more advanced options in conda-lock (like git repo, local support, + # etc)) the inclusion of conda packages that are *not* python packages. + # To ensure this, we check the npconda packages, create an environment + # for it and check if that environment doesn't contain python deps. + # If that is the case, we then create the actual environment including + # both conda and npconda packages and re-resolve. We could maybe + # optimize to not resolve from scratch twice but given this is a rare + # situation and the cost is only during resolution, it doesn't seem + # worth it. + npconda_deps = env_desc["deps"].get("npconda", []) + if npconda_deps: + npcondaenv, _ = self.get_resolver(EnvType.CONDA_ONLY)( + self._conda + ).resolve( + EnvType.CONDA_ONLY, + {"npconda": npconda_deps}, + env_desc["sources"], + {}, + env_id.arch, ) - else: - merged_packages.append(p) - resolved_env = ResolvedEnvironment( - env_desc["user_deps"], + if any( + (p.filename.startswith("python-") for p in npcondaenv.packages) + ): + raise InvalidEnvironmentException( + "Cannot specify a non-python Conda dependency that uses " + "python: %s. Please use the mixed mode instead." + % ", ".join([d.value for d in npconda_deps]) + ) + resolved_env, builder_envs = resolver.resolve( + env_type, + env_desc["deps"], env_desc["sources"], env_desc["extras"], env_id.arch, - all_packages=merged_packages, - env_type=resolved_env.env_type, - accurate_source=env_desc["base_accurate"], + builder_envs, + env_desc["base"], + ) + + if env_desc["base"]: + # We try to copy things over from the base environment as it contains + # potential caching information we don't need to rebuild. This also + # properly sets the user dependencies to what we want as opposed to + # including everything we resolved for. + merged_packages = [] # type: List[PackageSpecification] + base_packages = { + p.filename: p.to_dict() + for p in cast(ResolvedEnvironment, env_desc["base"]).packages + } + for p in resolved_env.packages: + existing_info = base_packages.get(p.filename) + if existing_info: + merged_packages.append( + PackageSpecification.from_dict(existing_info) + ) + else: + merged_packages.append(p) + resolved_env = ResolvedEnvironment( + env_desc["user_deps"], + env_desc["sources"], + env_desc["extras"], + env_id.arch, + all_packages=merged_packages, + env_type=resolved_env.env_type, + accurate_source=env_desc["base_accurate"], + ) + _system_logger.log_event( + level="info", + module="netflix_ext.conda", + name="env_resolved", + payload={ + "qualifier_name": str(env_id), + "msg": "Environment resolved in %d seconds" % (time.time() - start), + }, ) - return env_id, resolved_env, builder_envs + return env_id, resolved_env, builder_envs @staticmethod def extract_info_from_base( diff --git a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py index e142b7d..dcbc004 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_lock_resolver.py @@ -228,15 +228,10 @@ def resolve( # else: conda_exec_type = self._conda.conda_executable_type if conda_exec_type: - if conda_exec_type == "conda": - args.append(cast(str, self._conda.binary(conda_exec_type))) - else: - args.extend( - [ - cast(str, self._conda.binary(conda_exec_type)), - "--%s" % conda_exec_type, - ] - ) + args.append(cast(str, self._conda.binary(conda_exec_type))) + + if conda_exec_type != "conda": + args.append("--%s" % conda_exec_type) else: raise CondaException("Could not find conda binary for conda-lock") diff --git a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py index 077164e..02381ef 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/conda_resolver.py @@ -13,7 +13,12 @@ PackageSpecification, ResolvedEnvironment, ) -from ..utils import CondaException, channel_or_url, parse_explicit_url_conda +from ..utils import ( + CondaException, + channel_or_url, + clean_up_double_equal, + parse_explicit_url_conda, +) from . import Resolver @@ -40,7 +45,10 @@ def resolve( % ", ".join([p.package_name for p in local_packages]) ) sys_overrides = {k: v for d in deps.get("sys", []) for k, v in [d.split("==")]} - real_deps = list(chain(deps.get("conda", []), deps.get("npconda", []))) + real_deps = clean_up_double_equal( + chain(deps.get("conda", []), deps.get("npconda", [])) + ) + packages = [] # type: List[PackageSpecification] with tempfile.TemporaryDirectory() as mamba_dir: args = [ diff --git a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py index 925b690..f6b8aa7 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/resolvers/pip_resolver.py @@ -24,6 +24,7 @@ from ..utils import ( CondaException, arch_id, + clean_up_double_equal, correct_splitext, get_glibc_version, parse_explicit_path_pypi, @@ -254,17 +255,7 @@ def resolve( # Unfortunately, pip doesn't like things like ==<= so we need to strip # the == - for d in real_deps: - splits = d.split("==", 1) - if len(splits) == 1: - args.append(d) - else: - if splits[1][0] in ("=", "<", ">", "!", "~"): - # Something originally like pkg==<=ver - args.append("".join(splits)) - else: - # Something originally like pkg==ver - args.append(d) + args.extend(clean_up_double_equal(real_deps)) self._conda.call_binary(args, binary=builder_python, addl_env=addl_env) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/utils.py b/metaflow_extensions/netflix_ext/plugins/conda/utils.py index 828331c..9880582 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/utils.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/utils.py @@ -18,6 +18,7 @@ Any, Dict, FrozenSet, + Iterable, List, Mapping, NamedTuple, @@ -101,6 +102,9 @@ class AliasType(Enum): FAKEURL_PATHCOMPONENT = "_fake" +_double_equal_match = re.compile("==(?=[<=>!~])") + + class CondaException(MetaflowException): headline = "Conda ran into an error while setting up environment." @@ -468,6 +472,10 @@ def split_into_dict(deps: List[str]) -> Dict[str, str]: return result +def clean_up_double_equal(deps: Iterable[str]) -> List[str]: + return [_double_equal_match.sub("", d) for d in deps] + + def merge_dep_dicts( d1: Dict[str, str], d2: Dict[str, str], only_last_deps: bool = False ) -> Dict[str, str]: