diff --git a/apiserver/paasng/paas_wl/workloads/processes/drf_serializers.py b/apiserver/paasng/paas_wl/workloads/processes/drf_serializers.py index 0fc179d516..e14336bf85 100644 --- a/apiserver/paasng/paas_wl/workloads/processes/drf_serializers.py +++ b/apiserver/paasng/paas_wl/workloads/processes/drf_serializers.py @@ -151,6 +151,7 @@ class DeploymentOperationSLZ(serializers.Serializer): version_info = VersionInfoSLZ(help_text="版本信息") err_detail = serializers.CharField(help_text="错误原因") + has_requested_int = serializers.BooleanField(help_text="部署是否被中断") class OfflineOperationSLZ(serializers.Serializer): diff --git a/apiserver/paasng/paasng/engine/deploy/bg_wait/base.py b/apiserver/paasng/paasng/engine/deploy/bg_wait/base.py new file mode 100644 index 0000000000..bb5395714e --- /dev/null +++ b/apiserver/paasng/paasng/engine/deploy/bg_wait/base.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making +蓝鲸智云 - PaaS 平台 (BlueKing - PaaS System) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except +in compliance with the License. You may obtain a copy of the License at + + http://opensource.org/licenses/MIT + +Unless required by applicable law or agreed to in writing, software distributed under +the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +either express or implied. See the License for the specific language governing permissions and +limitations under the License. + +We undertake not to change the open source license (MIT license) applicable +to the current version of the project delivered to anyone in the future. 
+""" +from typing import Any, Optional + +from pydantic import BaseModel, validator + + +class AbortedDetailsPolicy(BaseModel): + """`policy` field of `AbortedDetails`""" + + reason: str + name: str + is_interrupted: bool = False + + +class AbortedDetails(BaseModel): + """A model for storing aborted details, such as "reason" and other infos + + :param extra_data: reserved field for storing extra info + """ + + aborted: bool + policy: Optional[AbortedDetailsPolicy] + extra_data: Optional[Any] + + @validator('policy', always=True) + def data_not_empty(cls, v, values, **kwargs): + if values.get('aborted') and v is None: + raise ValueError('"data" can not be empty when aborted is "True"!') + return v diff --git a/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_bkapp.py b/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_bkapp.py index 6d5190ca76..364dbbc68b 100644 --- a/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_bkapp.py +++ b/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_bkapp.py @@ -19,17 +19,29 @@ """Wait for BkApp's reconciliation cycle to be stable""" import datetime import logging -from typing import Optional +import time +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Tuple -from blue_krill.async_utils.poll_task import CallbackHandler, CallbackResult, PollingResult, TaskPoller +from blue_krill.async_utils.poll_task import ( + CallbackHandler, + CallbackResult, + PollingMetadata, + PollingResult, + PollingStatus, + TaskPoller, +) from django.utils import timezone +from pydantic import ValidationError as PyDanticValidationError from paas_wl.cnative.specs.constants import CNATIVE_DEPLOY_STATUS_POLLING_FAILURE_LIMITS, DeployStatus from paas_wl.cnative.specs.models import AppModelDeploy from paas_wl.cnative.specs.resource import ModelResState, MresConditionParser, get_mres_from_cluster from paas_wl.cnative.specs.signals import post_cnative_env_deploy from paasng.engine.constants import JobStatus +from 
paasng.engine.deploy.bg_wait.base import AbortedDetails, AbortedDetailsPolicy from paasng.engine.exceptions import StepNotInPresetListError +from paasng.engine.models import Deployment from paasng.engine.models.phases import DeployPhaseTypes from paasng.engine.workflow.flow import DeploymentStateMgr from paasng.platform.applications.models import ModuleEnvironment @@ -37,24 +49,121 @@ logger = logging.getLogger(__name__) -class AppModelDeployStatusPoller(TaskPoller): +class AbortPolicy(ABC): + """Base class for wait policy""" + + @abstractmethod + def get_reason(self) -> str: + """Reason of abortion""" + + @property + def is_interrupted(self) -> bool: + """If the wait procedure was aborted by current policy, whether it was considered as an interruption + or a regular failure. + """ + return False + + @abstractmethod + def evaluate(self, already_waited: float, extra_params: Dict) -> bool: + """Determine if current wait procedure should be aborted + + :param already_waited: how long since current procedure has been started, in seconds + :param extra_params: extra params of current procedure + """ + + +class UserInterruptedPolicy(AbortPolicy): + """Abort procedure when user requested an interruption""" + + def get_reason(self) -> str: + return 'User interrupted release' + + @property + def is_interrupted(self) -> bool: + return True + + def evaluate(self, already_waited: float, params: Dict) -> bool: + deployment_id = params.get('deployment_id') + if not deployment_id: + logger.warning('Deployment was not provided for UserInterruptedPolicy, will not proceed.') + return False + + try: + deployment = Deployment.objects.get(pk=deployment_id) + except Deployment.DoesNotExist: + logger.warning('Deployment not exists for UserInterruptedPolicy, will not proceed.') + return False + return bool(deployment.release_int_requested_at) + + +class WaitProcedurePoller(TaskPoller): + """Base class of process waiting procedure + + `params` schema: + :param env_id: id of 
ModuleEnvironment object + """ + + # Abort policies were extra rules which were used to break current polling procedure + abort_policies: List[AbortPolicy] = [] + + def __init__(self, params: Dict, metadata: PollingMetadata): + super().__init__(params, metadata) + self.env = ModuleEnvironment.objects.get(pk=self.params['env_id']) + + def query(self) -> PollingResult: + already_waited = time.time() - self.metadata.query_started_at + logger.info(f'wait procedure started {already_waited} seconds, env: {self.env}') + # Check all abort policies + for policy in self.abort_policies: + if policy.evaluate(already_waited, self.params): + policy_name = policy.__class__.__name__ + logger.info(f'AbortPolicy: {policy_name} evaluated, got positive result, abort current procedure') + return PollingResult( + PollingStatus.DONE, + data=AbortedDetails( + aborted=True, + policy=AbortedDetailsPolicy( + reason=policy.get_reason(), + name=policy_name, + is_interrupted=policy.is_interrupted, + ), + ).dict(), + ) + + polling_result = self.get_status() + if polling_result.status != PollingStatus.DOING: + # reserve original data in `extra_data` field + polling_result.data = AbortedDetails(aborted=False, extra_data=polling_result.data).dict() + return polling_result + + def get_status(self) -> PollingResult: + raise NotImplementedError() + + +class WaitAppModelReady(WaitProcedurePoller): """A task poller to query status for fresh AppModelDeploy objects It takes below params: - - deploy_id: int, ID of AppModelDeploy object + `params` schema: + :param env_id: id of ModuleEnvironment object + :param deploy_id: int, ID of AppModelDeploy object + :param deployment_id: Optional[uuid]: ID of Deployment object """ # over 15 min considered as timeout overall_timeout_seconds = 15 * 60 + # Abort policies were extra rules which were used to break current polling procedure + abort_policies: List[AbortPolicy] = [UserInterruptedPolicy()] - def query(self) -> PollingResult: + def get_status(self) -> 
PollingResult: dp = AppModelDeploy.objects.get(id=self.params['deploy_id']) mres = get_mres_from_cluster( ModuleEnvironment.objects.get( application_id=dp.application_id, module_id=dp.module_id, environment=dp.environment_name ) ) + if not mres: return PollingResult.doing() @@ -90,13 +199,19 @@ class DeployStatusHandler(CallbackHandler): def handle(self, result: CallbackResult, poller: TaskPoller): dp = AppModelDeploy.objects.get(id=poller.params['deploy_id']) - if result.is_exception: + + is_interrupted, err_message, extra_data = self.parse_result(result) + if result.is_exception or (not is_interrupted and err_message): logger.warning('Error polling AppModelDeploy status, result: %s', result) - state = ModelResState(DeployStatus.ERROR, 'internal', 'error polling deploy status') + state = ModelResState(DeployStatus.ERROR, 'internal', err_message) + update_status(dp, state) + elif is_interrupted: + logger.warning('polling AppModelDeploy is interrupted') + state = ModelResState(DeployStatus.UNKNOWN, 'interrupted', err_message) update_status(dp, state) else: logger.info('Update AppModelDeploy status with data: %s', result.data) - update_status(dp, result.data['state'], last_transition_time=result.data['last_update']) + update_status(dp, extra_data['state'], last_transition_time=extra_data['last_update']) dp.refresh_from_db() # 需要更新 deploy step 的状态 @@ -105,7 +220,7 @@ def handle(self, result: CallbackResult, poller: TaskPoller): state_mgr = DeploymentStateMgr.from_deployment_id( deployment_id=deployment_id, phase_type=DeployPhaseTypes.RELEASE ) - job_status = deploy_status_to_job_status(dp.status) + job_status = deploy_status_to_job_status(dp.status) if not is_interrupted else JobStatus.INTERRUPTED try: step_obj = state_mgr.phase.get_step_by_name(name="检测部署结果") step_obj.mark_and_write_to_stream(state_mgr.stream, job_status) @@ -117,6 +232,38 @@ def handle(self, result: CallbackResult, poller: TaskPoller): # 在部署流程结束后,发送信号触发操作审计等后续步骤(不支持创建监控告警规则) 
post_cnative_env_deploy.send(dp.environment, deploy=dp) + def parse_result(self, result: CallbackResult) -> Tuple[bool, str, Dict]: + """Get detailed error message. if error message was empty, release was considered succeeded + + :returns: (is_interrupted, error_msg, extra_data) + """ + if result.is_exception: + return False, "error polling deploy status", result.data + + aborted_details = self.get_aborted_details(result) + if not aborted_details: + return False, "invalid polling result", result.data + + if aborted_details.aborted: + assert aborted_details.policy is not None, 'policy must not be None' # Make type checker happy + return ( + aborted_details.policy.is_interrupted, + aborted_details.policy.reason, + aborted_details.extra_data or {}, + ) + + # if error message was empty, release was considered succeeded + return False, "", aborted_details.extra_data or {} + + @staticmethod + def get_aborted_details(result: CallbackResult) -> Optional[AbortedDetails]: + """If current release was aborted, return detailed info""" + try: + details = AbortedDetails.parse_obj(result.data) + except PyDanticValidationError: + return None + return details + def update_status(dp: AppModelDeploy, state: ModelResState, last_transition_time: Optional[datetime.datetime] = None): """Update deployment status, `last_transition_time` will always be updated diff --git a/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_deployment.py b/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_deployment.py index da6964c532..b765138762 100644 --- a/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_deployment.py +++ b/apiserver/paasng/paasng/engine/deploy/bg_wait/wait_deployment.py @@ -16,14 +16,15 @@ We undertake not to change the open source license (MIT license) applicable to the current version of the project delivered to anyone in the future. 
""" +from paasng.engine.deploy.bg_wait.base import AbortedDetails, AbortedDetailsPolicy + """Wait processes to match certain conditions""" import logging import time from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional, Type +from typing import Dict, List, Optional, Type from blue_krill.async_utils.poll_task import PollingMetadata, PollingResult, PollingStatus, TaskPoller -from pydantic import BaseModel, validator from paas_wl.workloads.processes.processes import PlainProcess from paas_wl.workloads.processes.shim import ProcessManager @@ -274,28 +275,3 @@ def get_status(self, processes: List[PlainProcess]) -> PollingResult: logger.info(f'All processes has been updated to {self.release_version}, env: {self.env}') return PollingResult.done() - - -class AbortedDetailsPolicy(BaseModel): - """`policy` field of `AbortedDetails`""" - - reason: str - name: str - is_interrupted: bool = False - - -class AbortedDetails(BaseModel): - """A model for storing aborted details, such as "reason" and other infos - - :param extra_data: reserved field for storing extra info - """ - - aborted: bool - policy: Optional[AbortedDetailsPolicy] - extra_data: Optional[Any] - - @validator('policy', always=True) - def data_not_empty(cls, v, values, **kwargs): - if values.get('aborted') and v is None: - raise ValueError('"data" can not be empty when aborted is "True"!') - return v diff --git a/apiserver/paasng/paasng/engine/deploy/release/legacy.py b/apiserver/paasng/paasng/engine/deploy/release/legacy.py index 8e0ee8c2c4..054f67f0a6 100644 --- a/apiserver/paasng/paasng/engine/deploy/release/legacy.py +++ b/apiserver/paasng/paasng/engine/deploy/release/legacy.py @@ -37,7 +37,8 @@ from paasng.engine.configurations.image import update_image_runtime_config from paasng.engine.configurations.ingress import AppDefaultDomains, AppDefaultSubpaths from paasng.engine.constants import JobStatus, ReleaseStatus -from paasng.engine.deploy.bg_wait.wait_deployment import 
AbortedDetails, wait_for_release +from paasng.engine.deploy.bg_wait.base import AbortedDetails +from paasng.engine.deploy.bg_wait.wait_deployment import wait_for_release from paasng.engine.exceptions import StepNotInPresetListError from paasng.engine.models.deployment import Deployment from paasng.engine.models.phases import DeployPhaseTypes diff --git a/apiserver/paasng/paasng/engine/deploy/release/operator.py b/apiserver/paasng/paasng/engine/deploy/release/operator.py index 3df30def2d..ad46e3284e 100644 --- a/apiserver/paasng/paasng/engine/deploy/release/operator.py +++ b/apiserver/paasng/paasng/engine/deploy/release/operator.py @@ -33,7 +33,7 @@ from paas_wl.resources.base.kres import KNamespace from paas_wl.resources.utils.basic import get_client_by_app from paasng.engine.constants import JobStatus -from paasng.engine.deploy.bg_wait.wait_bkapp import AppModelDeployStatusPoller, DeployStatusHandler +from paasng.engine.deploy.bg_wait.wait_bkapp import DeployStatusHandler, WaitAppModelReady from paasng.engine.exceptions import StepNotInPresetListError from paasng.engine.models.phases import DeployPhaseTypes from paasng.engine.workflow import DeployStep @@ -156,8 +156,8 @@ def release_by_k8s_operator( # TODO: 统计成功 metrics # Poll status in background - AppModelDeployStatusPoller.start( - {'deploy_id': app_model_deploy.id, 'deployment_id': deployment_id}, DeployStatusHandler + WaitAppModelReady.start( + {'env_id': env.id, 'deploy_id': app_model_deploy.id, 'deployment_id': deployment_id}, DeployStatusHandler ) return str(app_model_deploy.id) diff --git a/apiserver/paasng/tests/api/apigw/test_cnative.py b/apiserver/paasng/tests/api/apigw/test_cnative.py index d47586b11a..5fd09cc2d9 100644 --- a/apiserver/paasng/tests/api/apigw/test_cnative.py +++ b/apiserver/paasng/tests/api/apigw/test_cnative.py @@ -94,7 +94,7 @@ def test_create(self, api_client, bk_app, bk_module, bk_stag_env, with_wl_apps, } # Mock out the interactions with k8s cluster with 
mock.patch("paasng.engine.deploy.release.operator.apply_bkapp_to_k8s", return_value=manifest), mock.patch( - 'paasng.engine.deploy.release.operator.AppModelDeployStatusPoller.start', + 'paasng.engine.deploy.release.operator.WaitAppModelReady.start', return_value=None, ), mock.patch("paasng.engine.deploy.release.operator.svc_disc"), mock.patch( "paasng.engine.deploy.release.operator.ensure_namespace" diff --git a/apiserver/paasng/tests/engine/processes/test_wait.py b/apiserver/paasng/tests/engine/processes/test_wait.py index 267f2dc60d..7a3c5d1dc7 100644 --- a/apiserver/paasng/tests/engine/processes/test_wait.py +++ b/apiserver/paasng/tests/engine/processes/test_wait.py @@ -28,9 +28,8 @@ from blue_krill.async_utils.poll_task import PollingMetadata, PollingStatus from django.dispatch import receiver +from paasng.engine.deploy.bg_wait.base import AbortedDetails, AbortedDetailsPolicy from paasng.engine.deploy.bg_wait.wait_deployment import ( - AbortedDetails, - AbortedDetailsPolicy, DynamicReadyTimeoutPolicy, TooManyRestartsPolicy, UserInterruptedPolicy, diff --git a/apiserver/paasng/tests/paas_wl/cnative/specs/test_tasks.py b/apiserver/paasng/tests/paas_wl/cnative/specs/test_tasks.py index 937f63cb79..65751ce8ba 100644 --- a/apiserver/paasng/tests/paas_wl/cnative/specs/test_tasks.py +++ b/apiserver/paasng/tests/paas_wl/cnative/specs/test_tasks.py @@ -26,7 +26,7 @@ from paas_wl.cnative.specs.constants import DeployStatus, MResConditionType, MResPhaseType from paas_wl.cnative.specs.resource import ModelResState -from paasng.engine.deploy.bg_wait.wait_bkapp import AppModelDeployStatusPoller, DeployStatusHandler +from paasng.engine.deploy.bg_wait.wait_bkapp import AbortedDetails, DeployStatusHandler, WaitAppModelReady from tests.paas_wl.cnative.specs.utils import create_cnative_deploy, create_condition, create_res_with_conds pytestmark = pytest.mark.django_db(databases=["default", "workloads"]) @@ -41,10 +41,10 @@ def dp(bk_stag_env, bk_stag_wl_app, bk_user): def 
poller(bk_stag_env, dp): """A poller fixture for testing""" metadata = PollingMetadata(retries=0, query_started_at=time.time(), queried_count=1) - return AppModelDeployStatusPoller(params={'deploy_id': dp.id}, metadata=metadata) + return WaitAppModelReady(params={'env_id': bk_stag_env.id, 'deploy_id': dp.id}, metadata=metadata) -class TestAppModelDeployStatusPoller: +class TestWaitAppModelReady: @patch('paasng.engine.deploy.bg_wait.wait_bkapp.get_mres_from_cluster', return_value=create_res_with_conds([])) def test_pending(self, mocker, dp, poller): ret = poller.query() @@ -71,8 +71,10 @@ def test_progressing(self, mocker, dp, poller): def test_stable(self, mocker, poller): ret = poller.query() assert ret.status == PollingStatus.DONE - assert 'state' in ret.data - assert 'last_update' in ret.data + + extra_data = ret.data["extra_data"] + assert 'state' in extra_data + assert 'last_update' in extra_data class TestDeployStatusHandler: @@ -86,10 +88,13 @@ def test_handle_ready(self, dp, poller): DeployStatusHandler().handle( result=CallbackResult( status=CallbackStatus.NORMAL, - data={ - 'state': ModelResState(DeployStatus.READY, 'ready', 'foo ready'), - 'last_update': arrow.get('2020-10-10').datetime, - }, + data=AbortedDetails( + aborted=False, + extra_data={ + 'state': ModelResState(DeployStatus.READY, 'ready', 'foo ready'), + 'last_update': arrow.get('2020-10-10').datetime, + }, + ).dict(), ), poller=poller, ) @@ -100,3 +105,22 @@ def test_handle_ready(self, dp, poller): assert dp.reason == 'ready' assert dp.message == 'foo ready' assert dp.last_transition_time.date() == datetime.date(2020, 10, 10) + + def test_is_interrupted(self, dp, poller): + DeployStatusHandler().handle( + result=CallbackResult( + status=CallbackStatus.NORMAL, + data=AbortedDetails( + aborted=True, + policy={"is_interrupted": True, "reason": "User interrupted release", "name": ""}, + extra_data={}, + ).dict(), + ), + poller=poller, + ) + + dp.refresh_from_db() + # Assert deploy has been 
updated + assert dp.status == DeployStatus.UNKNOWN + assert dp.reason == 'interrupted' + assert dp.message == 'User interrupted release'