Skip to content

Commit

Permalink
handle setting enforce_parameter_schema in prefect.yaml (#16607)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Jan 7, 2025
1 parent 87bc756 commit b1810d9
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 38 deletions.
65 changes: 38 additions & 27 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datetime import timedelta
from getpass import GetPassWarning
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from uuid import UUID

import pydantic
Expand Down Expand Up @@ -398,7 +398,7 @@ async def deploy(
)
)

options = {
options: dict[str, Any] = {
"entrypoint": entrypoint,
"description": description,
"version": version,
Expand All @@ -417,11 +417,12 @@ async def deploy(
"params": params,
"sla": sla,
}

try:
deploy_configs, actions = _load_deploy_configs_and_actions(
prefect_file=prefect_file,
)
parsed_names = []
parsed_names: list[str] = []
for name in names or []:
if "*" in name:
parsed_names.extend(_parse_name_from_pattern(deploy_configs, name))
Expand Down Expand Up @@ -450,14 +451,18 @@ async def deploy(
prefect_file=prefect_file,
)
else:
deploy_config = deploy_configs[0] if deploy_configs else {}
# Accommodate passing in -n flow-name/deployment-name as well as -n deployment-name
options["names"] = [
name.split("/", 1)[-1] if "/" in name else name for name in parsed_names
]
options["enforce_parameter_schema"] = enforce_parameter_schema

# Only set enforce_parameter_schema in options if CLI flag was explicitly passed
if not enforce_parameter_schema:
options["enforce_parameter_schema"] = False

await _run_single_deploy(
deploy_config=deploy_configs[0] if deploy_configs else {},
deploy_config=deploy_config,
actions=actions,
options=options,
prefect_file=prefect_file,
Expand Down Expand Up @@ -670,7 +675,7 @@ async def _run_single_deploy(
)

## RUN BUILD AND PUSH STEPS
step_outputs = {}
step_outputs: dict[str, Any] = {}
if build_steps:
app.console.print("Running deployment build steps...")
step_outputs.update(
Expand Down Expand Up @@ -1232,12 +1237,12 @@ async def _generate_default_pull_action(

def _load_deploy_configs_and_actions(
prefect_file: Path,
) -> Tuple[List[Dict], Dict]:
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
"""
Load deploy configs and actions from a deployment configuration YAML file.
Returns:
Tuple[List[Dict], Dict]: a tuple of deployment configurations and actions
tuple[list[dict[str, Any]], dict[str, Any]]: a tuple of deployment configurations and actions
"""
try:
with prefect_file.open("r") as f:
Expand All @@ -1247,15 +1252,15 @@ def _load_deploy_configs_and_actions(
f"Unable to read the specified config file. Reason: {exc}. Skipping.",
style="yellow",
)
prefect_yaml_contents = {}
prefect_yaml_contents: dict[str, Any] = {}
if not isinstance(prefect_yaml_contents, dict):
app.console.print(
"Unable to parse the specified config file. Skipping.",
style="yellow",
)
prefect_yaml_contents = {}
prefect_yaml_contents: dict[str, Any] = {}

actions = {
actions: dict[str, Any] = {
"build": prefect_yaml_contents.get("build", []),
"push": prefect_yaml_contents.get("push", []),
"pull": prefect_yaml_contents.get("pull", []),
Expand All @@ -1266,7 +1271,9 @@ def _load_deploy_configs_and_actions(
return deploy_configs, actions


def _handle_pick_deploy_without_name(deploy_configs):
def _handle_pick_deploy_without_name(
deploy_configs: list[dict[str, Any]],
) -> list[dict[str, Any]]:
# Prompt the user to select one or more deployment configurations
selectable_deploy_configs = [
deploy_config for deploy_config in deploy_configs if deploy_config.get("name")
Expand Down Expand Up @@ -1331,7 +1338,9 @@ def _filter_matching_deploy_config(name, deploy_configs):
return matching_deployments


def _parse_name_from_pattern(deploy_configs, name_pattern):
def _parse_name_from_pattern(
deploy_configs: list[dict[str, Any]], name_pattern: str
) -> list[str]:
"""
Parse the deployment names from a user-provided pattern such as "flow-name/*" or "my-deployment-*"
Expand All @@ -1353,7 +1362,7 @@ def _parse_name_from_pattern(deploy_configs, name_pattern):
Returns:
List[str]: a list of deployment names that match the given pattern
"""
parsed_names = []
parsed_names: list[str] = []

name_pattern = re.escape(name_pattern).replace(r"\*", ".*")

Expand Down Expand Up @@ -1390,11 +1399,11 @@ def _parse_name_from_pattern(deploy_configs, name_pattern):


def _handle_pick_deploy_with_name(
deploy_configs,
names,
):
matched_deploy_configs = []
deployment_names = []
deploy_configs: list[dict[str, Any]],
names: list[str],
) -> list[dict[str, Any]]:
matched_deploy_configs: list[dict[str, Any]] = []
deployment_names: list[str] = []
for name in names:
matching_deployments = _filter_matching_deploy_config(name, deploy_configs)

Expand Down Expand Up @@ -1430,10 +1439,10 @@ def _handle_pick_deploy_with_name(


def _pick_deploy_configs(
deploy_configs,
names,
deploy_all,
):
deploy_configs: list[dict[str, Any]],
names: list[str],
deploy_all: bool,
) -> list[dict[str, Any]]:
"""
Return a list of deploy configs to deploy based on the given
deploy configs, names, and deploy_all flag.
Expand Down Expand Up @@ -1480,7 +1489,7 @@ def _pick_deploy_configs(
return []


def _extract_variable(variable: str) -> Dict[str, Any]:
def _extract_variable(variable: str) -> dict[str, Any]:
"""
Extracts a variable from a string. Variables can be in the format
key=value or a JSON object.
Expand All @@ -1505,7 +1514,9 @@ def _extract_variable(variable: str) -> Dict[str, Any]:
) from e


def _apply_cli_options_to_deploy_config(deploy_config, cli_options):
def _apply_cli_options_to_deploy_config(
deploy_config: dict[str, Any], cli_options: dict[str, Any]
) -> dict[str, Any]:
"""
Applies CLI options to a deploy config. CLI options take
precedence over values in the deploy config.
Expand All @@ -1528,7 +1539,7 @@ def _apply_cli_options_to_deploy_config(deploy_config, cli_options):
if len(cli_options.get("names", [])) == 1:
deploy_config["name"] = cli_options["names"][0]

variable_overrides = {}
variable_overrides: dict[str, Any] = {}
for cli_option, cli_value in cli_options.items():
if (
cli_option
Expand Down Expand Up @@ -1570,7 +1581,7 @@ def _apply_cli_options_to_deploy_config(deploy_config, cli_options):
deploy_config["schedules"].append({cli_option: value})

elif cli_option in ["param", "params"] and cli_value:
parameters = dict()
parameters: dict[str, Any] = {}
if cli_option == "param":
for p in cli_value or []:
k, unparsed_value = p.split("=", 1)
Expand Down
64 changes: 53 additions & 11 deletions tests/cli/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import tempfile
from datetime import timedelta
from pathlib import Path
from typing import Optional
from typing import Any, Optional
from unittest import mock
from uuid import UUID, uuid4

Expand Down Expand Up @@ -290,7 +290,7 @@ async def test_project_deploy(self, project_dir, prefect_client: PrefectClient):
assert deployment.enforce_parameter_schema

async def test_deploy_with_no_enforce_parameter_schema(
self, project_dir, work_pool, prefect_client
self, project_dir: Path, work_pool: WorkPool, prefect_client: PrefectClient
):
await run_sync_in_worker_thread(
invoke_and_assert,
Expand Down Expand Up @@ -1793,7 +1793,9 @@ async def test_project_deploy_exits_with_no_entrypoint_configured(self, work_poo
)

@pytest.mark.usefixtures("interactive_console", "project_dir")
async def test_deploy_without_name_interactive(self, work_pool, prefect_client):
async def test_deploy_without_name_interactive(
self, work_pool: WorkPool, prefect_client: PrefectClient
):
await run_sync_in_worker_thread(
invoke_and_assert,
command=(
Expand Down Expand Up @@ -1837,7 +1839,7 @@ async def test_deploy_without_work_pool_non_interactive(self):

@pytest.mark.usefixtures("interactive_console", "project_dir")
async def test_deploy_without_work_pool_interactive(
self, work_pool, prefect_client
self, work_pool: WorkPool, prefect_client: PrefectClient
):
await run_sync_in_worker_thread(
invoke_and_assert,
Expand Down Expand Up @@ -1885,7 +1887,10 @@ async def test_deploy_with_prefect_agent_work_pool_non_interactive(

@pytest.mark.usefixtures("interactive_console", "project_dir")
async def test_deploy_with_prefect_agent_work_pool_interactive(
self, work_pool, prefect_client, default_agent_pool
self,
work_pool: WorkPool,
prefect_client: PrefectClient,
default_agent_pool: WorkPool,
):
await run_sync_in_worker_thread(
invoke_and_assert,
Expand Down Expand Up @@ -1921,7 +1926,9 @@ async def test_deploy_with_prefect_agent_work_pool_interactive(
assert deployment.entrypoint == "./flows/hello.py:my_flow"

@pytest.mark.usefixtures("interactive_console", "project_dir")
async def test_deploy_with_push_pool_no_worker_start_message(self, push_work_pool):
async def test_deploy_with_push_pool_no_worker_start_message(
self, push_work_pool: WorkPool
):
await run_sync_in_worker_thread(
invoke_and_assert,
command=(
Expand All @@ -1943,7 +1950,9 @@ async def test_deploy_with_push_pool_no_worker_start_message(self, push_work_poo
)

@pytest.mark.usefixtures("interactive_console", "project_dir")
async def test_deploy_with_no_available_work_pool_interactive(self, prefect_client):
async def test_deploy_with_no_available_work_pool_interactive(
self, prefect_client: PrefectClient
):
await run_sync_in_worker_thread(
invoke_and_assert,
command="deploy ./flows/hello.py:my_flow -n test-name --interval 3600",
Expand Down Expand Up @@ -1985,7 +1994,7 @@ async def test_deploy_with_no_available_work_pool_interactive(self, prefect_clie

@pytest.mark.usefixtures("project_dir")
async def test_deploy_with_entrypoint_does_not_fail_with_missing_prefect_folder(
self, work_pool
self, work_pool: WorkPool
):
Path(".prefect").rmdir()
await run_sync_in_worker_thread(
Expand All @@ -2000,7 +2009,7 @@ async def test_deploy_with_entrypoint_does_not_fail_with_missing_prefect_folder(
@pytest.mark.parametrize("schedule_value", [None, {}])
@pytest.mark.usefixtures("project_dir", "interactive_console")
async def test_deploy_does_not_prompt_schedule_when_empty_schedule_prefect_yaml(
self, schedule_value, work_pool, prefect_client
self, schedule_value: Any, work_pool: WorkPool, prefect_client: PrefectClient
):
prefect_yaml_file = Path("prefect.yaml")
with prefect_yaml_file.open(mode="r") as f:
Expand Down Expand Up @@ -2042,7 +2051,7 @@ async def test_deploy_does_not_prompt_schedule_when_empty_schedule_prefect_yaml(
@pytest.mark.parametrize("build_value", [None, {}])
@pytest.mark.usefixtures("project_dir", "interactive_console")
async def test_deploy_does_not_prompt_build_docker_image_when_empty_build_action_prefect_yaml(
self, build_value, work_pool, prefect_client
self, build_value, work_pool: WorkPool, prefect_client: PrefectClient
):
prefect_yaml_file = Path("prefect.yaml")
with prefect_yaml_file.open(mode="r") as f:
Expand Down Expand Up @@ -2083,7 +2092,7 @@ async def test_deploy_does_not_prompt_build_docker_image_when_empty_build_action
)

async def test_deploy_with_bad_run_shell_script_raises(
self, project_dir, work_pool
self, project_dir: Path, work_pool: WorkPool
):
"""
Regression test for a bug where deployment steps would continue even when
Expand Down Expand Up @@ -2274,6 +2283,39 @@ async def test_git_option_present_when_remote_url(
),
)

@pytest.mark.usefixtures("project_dir")
async def test_deploy_respects_yaml_enforce_parameter_schema(
self, work_pool: WorkPool, prefect_client: PrefectClient
):
prefect_yaml_file = Path("prefect.yaml")
with prefect_yaml_file.open(mode="r") as f:
deploy_config = yaml.safe_load(f)

deploy_config["deployments"] = [
{
"name": "test-name",
"entrypoint": "flows/hello.py:my_flow",
"work_pool": {
"name": work_pool.name,
},
"enforce_parameter_schema": False,
}
]

with prefect_yaml_file.open(mode="w") as f:
yaml.safe_dump(deploy_config, f)

await run_sync_in_worker_thread(
invoke_and_assert,
command="deploy -n test-name",
expected_code=0,
)

deployment = await prefect_client.read_deployment_by_name(
"An important name/test-name"
)
assert not deployment.enforce_parameter_schema


class TestSchedules:
@pytest.mark.usefixtures("project_dir")
Expand Down

0 comments on commit b1810d9

Please sign in to comment.