Skip to content
This repository has been archived by the owner on Jul 10, 2024. It is now read-only.

Commit

Permalink
SUBMARINE-1117. Connect API for CLI Experiments
Browse files Browse the repository at this point in the history
### What is this PR for?
1. Implement API with CLI for experiments with nice format and async fetching status
2. Implement e2e test for CLI experiments

### What type of PR is it?
[Feature]

### Todos

### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1117

### How should this be tested?

e2e test is implemented

### Screenshots (if appropriate)
![](https://i.imgur.com/3LSB8yq.gif)

### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? Yes

Author: atosystem <[email protected]>

Signed-off-by: kuanhsun <[email protected]>

Closes #831 from atosystem/SUBMARINE-1117 and squashes the following commits:

48b5a62 [atosystem] SUBMARINE-1117. remove redundant line
02fbc11 [atosystem] SUBMARINE-1117. remove print
f7fcb1e [atosystem] SUBMARINE-1117. fix error
707582e [atosystem] SUBMARINE-1117. remove wait experiment finish in test
7b0050e [atosystem] SUBMARINE-1117. test github action no wait
e4eaf4a [atosystem] SUBMARINE-1117. test github action
23e0594 [atosystem] SUBMARINE-1117. test github action
f1e8b00 [atosystem] SUBMARINE-1117. test github action
9d7afc7 [atosystem] SUBMARINE-1117. revise hostname to port
45cbe82 [atosystem] SUBMARINE-1117. test github action
4cb1daa [atosystem] SUBMARINE-1117. test github action
ac26b77 [atosystem] SUBMARINE-1117. test github action
3430043 [atosystem] SUBMARINE-1117. test github action
208d0cb [atosystem] SUBMARINE-1117. double check cli config
f2790af [atosystem] SUBMARINE-1117. fix e2e test wrong port
db91cb4 [atosystem] SUBMARINE-1117. change generate_host
4a2985c [atosystem] SUBMARINE-1117. lint
5a9d1cf [atosystem] SUBMARINE-1117. Use config in experiment api lient
ef38ddd [atosystem] SUBMARINE-1117. Change python import path in command.py
0df473a [atosystem] SUBMARINE-1117. Change python import path
6a16b0f [atosystem] SUBMARINE-1117. Set default flag to no-wait
40d0197 [atosystem] SUBMARINE-1117. Add wait and no-wait options for deleting experiments
3083c4f [atosystem] SUBMARINE-1117. Fix code for the comments
2e49d23 [atosystem] SUBMARINE-1117. lint
24ecd4d [atosystem] SUBMARINE-1117. Fix e2e test
d8bd7d6 [atosystem] SUBMARINE-1117. Add Integration Tests for CLI Experiments
7943995 [atosystem] SUBMARINE-1117. Finished CLI for Experiment
  • Loading branch information
atosystem authored and KUAN-HSUN-LI committed Jan 15, 2022
1 parent 0c2d71b commit 28e3b76
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 20 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ jobs:
pip install --no-cache-dir tensorflow-addons
pip install --no-cache-dir tf_slim
pip install -r ./submarine-sdk/pysubmarine/github-actions/test-requirements.txt
- name: Set CLI config for e2e pytest
run: |
submarine config set connection.hostname localhost
submarine config set connection.port 8080
submarine config list
- name: Run integration test
working-directory: ./submarine-sdk/pysubmarine
run: pytest --cov=submarine -vs -m "e2e"
Expand Down
124 changes: 120 additions & 4 deletions submarine-sdk/pysubmarine/submarine/cli/experiment/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,140 @@
under the License.
"""

import json
import time

import click
from rich.console import Console
from rich.json import JSON as richJSON
from rich.panel import Panel
from rich.table import Table

from submarine.cli.config.config import loadConfig
from submarine.client.api.experiment_client import ExperimentClient
from submarine.client.exceptions import ApiException

submarineCliConfig = loadConfig()
if submarineCliConfig is None:
exit(1)
experimentClient = ExperimentClient(
host="http://{}:{}".format(
submarineCliConfig.connection.hostname, submarineCliConfig.connection.port
)
)

POLLING_INTERVAL = 1 # sec
TIMEOUT = 30 # sec


@click.command("experiment")
def list_experiment():
"""List experiments"""
click.echo("list experiment!")
COLS_TO_SHOW = ["Name", "Id", "Tags", "Finished Time", "Created Time", "Running Time", "Status"]
console = Console()
try:
thread = experimentClient.list_experiments_async()
timeout = time.time() + TIMEOUT
with console.status("[bold green] Fetching Experiments..."):
while not thread.ready():
time.sleep(POLLING_INTERVAL)
if time.time() > timeout:
console.print("[bold red] Timeout!")
return

result = thread.get()
results = result.result

results = list(
map(
lambda r: [
r["spec"]["meta"]["name"],
r["experimentId"],
",".join(r["spec"]["meta"]["tags"]),
r["finishedTime"],
r["createdTime"],
r["runningTime"],
r["status"],
],
results,
)
)

table = Table(title="List of Experiments")

for col in COLS_TO_SHOW:
table.add_column(col, overflow="fold")
for res in results:
table.add_row(*res)

console.print(table)

except ApiException as err:
if err.body is not None:
errbody = json.loads(err.body)
click.echo("[Api Error] {}".format(errbody["message"]))
else:
click.echo("[Api Error] {}".format(err))


@click.command("experiment")
@click.argument("id")
def get_experiment(id):
"""Get experiments"""
click.echo("get experiment! id={}".format(id))
console = Console()
try:
thread = experimentClient.get_experiment_async(id)
timeout = time.time() + TIMEOUT
with console.status("[bold green] Fetching Experiment(id = {} )...".format(id)):
while not thread.ready():
time.sleep(POLLING_INTERVAL)
if time.time() > timeout:
console.print("[bold red] Timeout!")
return

result = thread.get()
result = result.result

json_data = richJSON.from_data(result)
console.print(Panel(json_data, title="Experiment(id = {} )".format(id)))
except ApiException as err:
if err.body is not None:
errbody = json.loads(err.body)
click.echo("[Api Error] {}".format(errbody["message"]))
else:
click.echo("[Api Error] {}".format(err))


@click.command("experiment")
@click.argument("id")
def delete_experiment(id):
@click.option("--wait/--no-wait", is_flag=True, default=False)
def delete_experiment(id, wait):
"""Delete experiment"""
click.echo("delete experiment! id={}".format(id))
console = Console()
try:
thread = experimentClient.delete_experiment_async(id)
timeout = time.time() + TIMEOUT
with console.status("[bold green] Deleting Experiment(id = {} )...".format(id)):
while not thread.ready():
time.sleep(POLLING_INTERVAL)
if time.time() > timeout:
console.print("[bold red] Timeout!")
return

result = thread.get()
result = result.result

if wait:
if result["status"] == "Deleted":
console.print("[bold green] Experiment(id = {} ) deleted".format(id))
else:
console.print("[bold red] Failed")
json_data = richJSON.from_data(result)
console.print(Panel(json_data, title="Experiment(id = {} )".format(id)))

except ApiException as err:
if err.body is not None:
errbody = json.loads(err.body)
click.echo("[Api Error] {}".format(errbody["message"]))
else:
click.echo("[Api Error] {}".format(err))
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def generate_host():
"""
submarine_server_dns_name = str(os.environ.get("SUBMARINE_SERVER_DNS_NAME"))
submarine_server_port = str(os.environ.get("SUBMARINE_SERVER_PORT"))
host = submarine_server_dns_name + ":" + submarine_server_port
host = "http://" + submarine_server_dns_name + ":" + submarine_server_port
return host


Expand Down Expand Up @@ -107,6 +107,15 @@ def get_experiment(self, id):
response = self.experiment_api.get_experiment(id=id)
return response.result

def get_experiment_async(self, id):
"""
Get the experiment's detailed info by id (async)
:param id: submarine experiment id
:return: multiprocessing.pool.ApplyResult
"""
thread = self.experiment_api.get_experiment(id=id, async_req=True)
return thread

def list_experiments(self, status=None):
"""
List all experiment for the user
Expand All @@ -116,6 +125,15 @@ def list_experiments(self, status=None):
response = self.experiment_api.list_experiments(status=status)
return response.result

def list_experiments_async(self, status=None):
"""
List all experiment for the user (async)
:param status: Accepted, Created, Running, Succeeded, Deleted
:return: multiprocessing.pool.ApplyResult
"""
thread = self.experiment_api.list_experiments(status=status, async_req=True)
return thread

def delete_experiment(self, id):
"""
Delete the Submarine experiment
Expand All @@ -125,6 +143,15 @@ def delete_experiment(self, id):
response = self.experiment_api.delete_experiment(id)
return response.result

def delete_experiment_async(self, id):
"""
Delete the Submarine experiment (async)
:param id: Submarine experiment id
:return: The detailed info about deleted submarine experiment
"""
thread = self.experiment_api.delete_experiment(id, async_req=True)
return thread

def get_log(self, id, onlyMaster=False):
"""
Get training logs of all pod of the experiment.
Expand Down
85 changes: 70 additions & 15 deletions submarine-sdk/pysubmarine/tests/cli/test_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,84 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from click.testing import CliRunner

import submarine
from submarine.cli import main
from submarine.client.models.code_spec import CodeSpec
from submarine.client.models.environment_spec import EnvironmentSpec
from submarine.client.models.experiment_meta import ExperimentMeta
from submarine.client.models.experiment_spec import ExperimentSpec
from submarine.client.models.experiment_task_spec import ExperimentTaskSpec

TEST_CONSOLE_WIDTH = 191

def test_list_experiment():
runner = CliRunner()
result = runner.invoke(main.entry_point, ["list", "experiment"])

@pytest.mark.e2e
def test_all_experiment_e2e():
"""E2E Test for using submarine CLI to access submarine experiment
To run this test, you should first set
your submarine CLI config `port` to 8080 and `hostname` to localhost
i.e. please execute the commands in your terminal:
submarine config set connection.hostname localhost
submarine config set connection.port 8080
"""
# set env to display full table
runner = CliRunner(env={"COLUMNS": str(TEST_CONSOLE_WIDTH)})
# check if cli config is correct for testing
result = runner.invoke(main.entry_point, ["config", "get", "connection.port"])
assert result.exit_code == 0
assert "list experiment!" in result.output
assert "connection.port={}".format(8080) in result.output

submarine_client = submarine.ExperimentClient(host="http://localhost:8080")
environment = EnvironmentSpec(image="apache/submarine:tf-dist-mnist-test-1.0")
experiment_meta = ExperimentMeta(
name="mnist-dist",
namespace="default",
framework="Tensorflow",
cmd="python /var/tf_dist_mnist/dist_mnist.py --train_steps=100",
env_vars={"ENV1": "ENV1"},
)

def test_get_experiment():
mock_experiment_id = "0"
runner = CliRunner()
result = runner.invoke(main.entry_point, ["get", "experiment", mock_experiment_id])
assert result.exit_code == 0
assert "get experiment! id={}".format(mock_experiment_id) in result.output
worker_spec = ExperimentTaskSpec(resources="cpu=1,memory=1024M", replicas=1)
ps_spec = ExperimentTaskSpec(resources="cpu=1,memory=1024M", replicas=1)

code_spec = CodeSpec(sync_mode="git", url="https://github.com/apache/submarine.git")

experiment_spec = ExperimentSpec(
meta=experiment_meta,
environment=environment,
code=code_spec,
spec={"Ps": ps_spec, "Worker": worker_spec},
)

experiment = submarine_client.create_experiment(experiment_spec=experiment_spec)
experiment = submarine_client.get_experiment(experiment["experimentId"])

def test_delete_experiment():
mock_experiment_id = "0"
runner = CliRunner()
result = runner.invoke(main.entry_point, ["delete", "experiment", mock_experiment_id])
# test list experiment
result = runner.invoke(main.entry_point, ["list", "experiment"])
assert result.exit_code == 0
assert "delete experiment! id={}".format(mock_experiment_id) in result.output
assert "List of Experiments" in result.output
assert experiment["spec"]["meta"]["name"] in result.output
assert experiment["experimentId"] in result.output
assert experiment["createdTime"] in result.output
if experiment["runningTime"] is not None:
assert experiment["runningTime"] in result.output
if experiment["status"] is not None:
assert experiment["status"] in result.output

# test get experiment
result = runner.invoke(main.entry_point, ["get", "experiment", experiment["experimentId"]])
assert "Experiment(id = {} )".format(experiment["experimentId"]) in result.output
assert experiment["spec"]["environment"]["image"] in result.output

# test delete experiment (blocking mode)
result = runner.invoke(
main.entry_point, ["delete", "experiment", experiment["experimentId"], "--wait"]
)
assert "Experiment(id = {} ) deleted".format(experiment["experimentId"]) in result.output

# test get experiment fail after delete
result = runner.invoke(main.entry_point, ["get", "experiment", experiment["experimentId"]])
assert "[Api Error] Not found experiment." in result.output

0 comments on commit 28e3b76

Please sign in to comment.