Upgrade TorchX to Support Kubeflow Pipelines v2 #1073

Status: Open. Wants to merge 2 commits into base: main.
4 changes: 4 additions & 0 deletions .gitignore
@@ -141,3 +141,7 @@ wordlist.dic
pipeline.yaml

/codecov
.python-version
enhanced_distributed_pipeline.yaml
task_configs_pipeline.yaml
local_outputs
16 changes: 8 additions & 8 deletions dev-requirements.txt
@@ -12,9 +12,10 @@ google-cloud-logging==3.10.0
google-cloud-runtimeconfig==0.34.0
hydra-core
ipython
kfp==1.8.22
# pin protobuf to the version that is required by kfp
protobuf==3.20.3
kfp>=2.8.0
kfp-kubernetes>=1.4.0 # For Kubernetes-specific features in KFP v2
# kfp v2 is compatible with protobuf 4+
protobuf>=4.21.0
mlflow-skinny
moto~=5.0.8
pyre-extensions
@@ -40,8 +41,7 @@ lintrunner-adapters


# reduce backtracking
grpcio==1.62.1
grpcio-status==1.48.1
googleapis-common-protos==1.63.0
google-api-core==2.18.0
protobuf==3.20.3 # kfp==1.8.22 needs protobuf < 4
grpcio>=1.62.1
grpcio-status>=1.48.1
googleapis-common-protos>=1.63.0
google-api-core>=2.18.0
5 changes: 4 additions & 1 deletion setup.py
@@ -87,7 +87,10 @@ def get_nightly_version():
"google-cloud-logging>=3.0.0",
"google-cloud-runtimeconfig>=0.33.2",
],
"kfp": ["kfp==1.6.2"],
# KFP 2.0+ is not supported yet, see https://github.com/pytorch/torchx/issues/123
"kfp": [
"kfp>=2.8.0"
], # optional: required for Kubeflow Pipelines integration
"kubernetes": ["kubernetes>=11"],
"ray": ["ray>=1.12.1"],
"dev": dev_reqs,
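The `kfp` extra now resolves to kfp>=2.8.0, so `pip install "torchx[kfp]"` pulls in the v2 SDK. A minimal, illustrative guard (not part of this PR) against a leftover kfp v1 install before using the TorchX KFP adapter:

import importlib.metadata

# Check the installed kfp version; the adapter in this PR targets KFP v2.
kfp_version = importlib.metadata.version("kfp")
if int(kfp_version.split(".")[0]) < 2:
    raise RuntimeError(
        f"torchx.pipelines.kfp now targets Kubeflow Pipelines v2, found kfp=={kfp_version}; "
        'reinstall with `pip install "torchx[kfp]"` to get kfp>=2.8.0'
    )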
58 changes: 32 additions & 26 deletions torchx/examples/pipelines/kfp/advanced_pipeline.py
@@ -28,15 +28,15 @@
import sys
from typing import Dict

import kfp
import torchx

from kfp import compiler, dsl
from torchx import specs
from torchx.components.dist import ddp as dist_ddp
from torchx.components.serve import torchserve
from torchx.components.utils import copy as utils_copy, python as utils_python
from torchx.pipelines.kfp.adapter import container_from_app


parser = argparse.ArgumentParser(description="example kfp pipeline")

# %%
@@ -238,48 +238,54 @@
# cluster.
#
# The KFP adapter currently doesn't track the input and outputs so the
# containers need to have their dependencies specified via `.after()`.
# containers need to have their dependencies specified.
#
# We call `.set_tty()` to make the logs from the components more responsive for
# example purposes.
# We no longer need to call `.set_tty()` as that was a v1 feature.


@dsl.pipeline(
name="TorchX Advanced Pipeline",
description="Advanced KFP pipeline with TorchX components",
)
def pipeline() -> None:
# container_from_app creates a KFP container from the TorchX app
# container_from_app creates a KFP v2 task from the TorchX app
# definition.
copy = container_from_app(copy_app)
copy.container.set_tty()
copy_task = container_from_app(copy_app)
copy_task.set_display_name("Download Data")

datapreproc = container_from_app(datapreproc_app)
datapreproc.container.set_tty()
datapreproc.after(copy)
datapreproc_task = container_from_app(datapreproc_app)
datapreproc_task.set_display_name("Preprocess Data")
# In KFP v2, dependencies are automatically handled based on data flow
# If you need explicit dependencies, you need to pass outputs as inputs
datapreproc_task.after(copy_task)

# For the trainer we want to log that UI metadata so you can access
# tensorboard from the UI.
trainer = container_from_app(trainer_app, ui_metadata=ui_metadata)
trainer.container.set_tty()
trainer.after(datapreproc)
trainer_task = container_from_app(trainer_app, ui_metadata=ui_metadata)
trainer_task.set_display_name("Train Model")
trainer_task.after(datapreproc_task)

if False:
serve = container_from_app(serve_app)
serve.container.set_tty()
serve.after(trainer)
serve_task = container_from_app(serve_app)
serve_task.set_display_name("Serve Model")
serve_task.after(trainer_task)

if False:
# Serve and interpret only require the trained model so we can run them
# in parallel to each other.
interpret = container_from_app(interpret_app)
interpret.container.set_tty()
interpret.after(trainer)
interpret_task = container_from_app(interpret_app)
interpret_task.set_display_name("Interpret Model")
interpret_task.after(trainer_task)


kfp.compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)
if __name__ == "__main__":
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)

with open("pipeline.yaml", "rt") as f:
print(f.read())
with open("pipeline.yaml", "rt") as f:
print(f.read())

# %%
# Once this has all run you should have a pipeline file (typically
# pipeline.yaml) that you can upload to your KFP cluster via the UI or
# a kfp.Client.
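The updated comment in the pipeline above notes that KFP v2 infers dependencies from data flow, with `.after()` reserved for purely ordering-based dependencies. A small self-contained sketch (illustrative, not from this PR) of the data-flow style:

from kfp import dsl

@dsl.component
def produce() -> str:
    return "preprocessed-data"

@dsl.component
def consume(text: str) -> None:
    print(text)

@dsl.pipeline(name="dataflow-deps")
def example() -> None:
    producer = produce()
    # Passing producer.output as an input creates the dependency implicitly,
    # so no explicit .after() call is needed.
    consume(text=producer.output)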
40 changes: 26 additions & 14 deletions torchx/examples/pipelines/kfp/dist_pipeline.py
@@ -12,17 +12,21 @@
======================================

This is an example KFP pipeline that uses resource_from_app to launch a
distributed operator using the kubernetes/volcano job scheduler. This only works
distributed job using the kubernetes/volcano job scheduler. This only works
in Kubernetes KFP clusters with https://volcano.sh/en/docs/ installed on them.
"""

import kfp
from kfp import compiler, dsl
from torchx import specs
from torchx.pipelines.kfp.adapter import resource_from_app


@dsl.pipeline(
name="distributed-pipeline",
description="A distributed pipeline using Volcano job scheduler",
)
def pipeline() -> None:
# First we define our AppDef for the component, we set
# First we define our AppDef for the component
echo_app = specs.AppDef(
name="test-dist",
roles=[
@@ -36,31 +40,39 @@ def pipeline() -> None:
],
)

# To convert the TorchX AppDef into a KFP container we use
# the resource_from_app adapter. This takes generates a KFP Kubernetes
# resource operator definition from the TorchX app def and instantiates it.
echo_container: kfp.dsl.BaseOp = resource_from_app(echo_app, queue="default")
# To convert the TorchX AppDef into a KFP v2 task that creates
# a Volcano job, we use the resource_from_app adapter.
# This generates a task that uses kubectl to create the Volcano job.
echo_task = resource_from_app(echo_app, queue="default")

# Set display name for better visualization
echo_task.set_display_name("Distributed Echo Job")


# %%
# To generate the pipeline definition file we need to call into the KFP compiler
# with our pipeline function.

kfp.compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)
if __name__ == "__main__":
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path="pipeline.yaml",
)

with open("pipeline.yaml", "rt") as f:
print(f.read())
with open("pipeline.yaml", "rt") as f:
print(f.read())

# %%
# Once this has all run you should have a pipeline file (typically
# pipeline.yaml) that you can upload to your KFP cluster via the UI or
# a kfp.Client.
#
# Note: In KFP v2, for more advanced Kubernetes resource manipulation,
# consider using the kfp-kubernetes extension library which provides
# better integration with Kubernetes resources.
#
# See the
# `KFP SDK Examples <https://www.kubeflow.org/docs/components/pipelines/legacy-v1/tutorials/sdk-examples/#examples>`_
# `KFP SDK Examples <https://www.kubeflow.org/docs/components/pipelines/user-guides/core-functions/create-a-pipeline-run/>`_
# for more info on launching KFP pipelines.

# %%
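The note above points to the kfp-kubernetes extension (added to dev-requirements.txt in this PR) for richer control over Kubernetes resources in KFP v2. A short, illustrative sketch of mounting an existing PersistentVolumeClaim onto a task; the PVC name is hypothetical:

from kfp import dsl, kubernetes  # kubernetes namespace comes from kfp-kubernetes

@dsl.component
def train() -> None:
    print("training on shared volume")

@dsl.pipeline(name="pvc-example")
def example() -> None:
    task = train()
    # "shared-data" is a hypothetical, pre-existing PVC in the cluster.
    kubernetes.mount_pvc(task, pvc_name="shared-data", mount_path="/data")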