Skip to content

Commit

Permalink
feature: 云原生应用 部署阶段 支持中断 (#683)
Browse files Browse the repository at this point in the history
  • Loading branch information
shabbywu authored Sep 19, 2023
1 parent 56be7d1 commit d1e5ec3
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class DeploymentOperationSLZ(serializers.Serializer):

version_info = VersionInfoSLZ(help_text="版本信息")
err_detail = serializers.CharField(help_text="错误原因")
has_requested_int = serializers.BooleanField(help_text="部署是否被中断")


class OfflineOperationSLZ(serializers.Serializer):
Expand Down
46 changes: 46 additions & 0 deletions apiserver/paasng/paasng/engine/deploy/bg_wait/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making
蓝鲸智云 - PaaS 平台 (BlueKing - PaaS System) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the specific language governing permissions and
limitations under the License.
We undertake not to change the open source license (MIT license) applicable
to the current version of the project delivered to anyone in the future.
"""
from typing import Any, Optional

from pydantic import BaseModel, validator


class AbortedDetailsPolicy(BaseModel):
"""`policy` field of `AbortedDetails`"""

reason: str
name: str
is_interrupted: bool = False


class AbortedDetails(BaseModel):
"""A model for storing aborted details, such as "reason" and other infos
:param extra_data: reserved field for storing extra info
"""

aborted: bool
policy: Optional[AbortedDetailsPolicy]
extra_data: Optional[Any]

@validator('policy', always=True)
def data_not_empty(cls, v, values, **kwargs):
if values.get('aborted') and v is None:
raise ValueError('"data" can not be empty when aborted is "True"!')
return v
165 changes: 156 additions & 9 deletions apiserver/paasng/paasng/engine/deploy/bg_wait/wait_bkapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,151 @@
"""Wait for BkApp's reconciliation cycle to be stable"""
import datetime
import logging
from typing import Optional
import time
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple

from blue_krill.async_utils.poll_task import CallbackHandler, CallbackResult, PollingResult, TaskPoller
from blue_krill.async_utils.poll_task import (
CallbackHandler,
CallbackResult,
PollingMetadata,
PollingResult,
PollingStatus,
TaskPoller,
)
from django.utils import timezone
from pydantic import ValidationError as PyDanticValidationError

from paas_wl.cnative.specs.constants import CNATIVE_DEPLOY_STATUS_POLLING_FAILURE_LIMITS, DeployStatus
from paas_wl.cnative.specs.models import AppModelDeploy
from paas_wl.cnative.specs.resource import ModelResState, MresConditionParser, get_mres_from_cluster
from paas_wl.cnative.specs.signals import post_cnative_env_deploy
from paasng.engine.constants import JobStatus
from paasng.engine.deploy.bg_wait.base import AbortedDetails, AbortedDetailsPolicy
from paasng.engine.exceptions import StepNotInPresetListError
from paasng.engine.models import Deployment
from paasng.engine.models.phases import DeployPhaseTypes
from paasng.engine.workflow.flow import DeploymentStateMgr
from paasng.platform.applications.models import ModuleEnvironment

logger = logging.getLogger(__name__)


class AppModelDeployStatusPoller(TaskPoller):
class AbortPolicy(ABC):
"""Base class for wait policy"""

@abstractmethod
def get_reason(self) -> str:
"""Reason of abortion"""

@property
def is_interrupted(self) -> bool:
"""If the wait procedure was aborted by current policy, whether is was considered as an interruption
or a regular failure.
"""
return False

@abstractmethod
def evaluate(self, already_waited: float, extra_params: Dict) -> bool:
"""Determine if current wait procedure should be aborted
:param already_waited: how long since current procedure has been started, in seconds
:param extra_params: extra params of current procedure
"""


class UserInterruptedPolicy(AbortPolicy):
"""Abort procedure when user requested an interruption"""

def get_reason(self) -> str:
return 'User interrupted release'

@property
def is_interrupted(self) -> bool:
return True

def evaluate(self, already_waited: float, params: Dict) -> bool:
deployment_id = params.get('deployment_id')
if not deployment_id:
logger.warning('Deployment was not provided for UserInterruptedPolicy, will not proceed.')
return False

try:
deployment = Deployment.objects.get(pk=deployment_id)
except Deployment.DoesNotExist:
logger.warning('Deployment not exists for UserInterruptedPolicy, will not proceed.')
return False
return bool(deployment.release_int_requested_at)


class WaitProcedurePoller(TaskPoller):
"""Base class of process waiting procedure
`params` schema:
:param module_env_id: id of ModuleEnvironment object
"""

# Abort policies were extra rules which were used to break current polling procedure
abort_policies: List[AbortPolicy] = []

def __init__(self, params: Dict, metadata: PollingMetadata):
super().__init__(params, metadata)
self.env = ModuleEnvironment.objects.get(pk=self.params['env_id'])

def query(self) -> PollingResult:
already_waited = time.time() - self.metadata.query_started_at
logger.info(f'wait procedure started {already_waited} seconds, env: {self.env}')
# Check all abort policies
for policy in self.abort_policies:
if policy.evaluate(already_waited, self.params):
policy_name = policy.__class__.__name__
logger.info(f'AbortPolicy: {policy_name} evaluated, got positive result, abort current procedure')
return PollingResult(
PollingStatus.DONE,
data=AbortedDetails(
aborted=True,
policy=AbortedDetailsPolicy(
reason=policy.get_reason(),
name=policy_name,
is_interrupted=policy.is_interrupted,
),
).dict(),
)

polling_result = self.get_status()
if polling_result.status != PollingStatus.DOING:
# reserve original data in `extra_data` field
polling_result.data = AbortedDetails(aborted=False, extra_data=polling_result.data).dict()
return polling_result

def get_status(self) -> PollingResult:
raise NotImplementedError()


class WaitAppModelReady(WaitProcedurePoller):
"""A task poller to query status for fresh AppModelDeploy objects
It takes below params:
- deploy_id: int, ID of AppModelDeploy object
`params` schema:
:param module_env_id: id of ModuleEnvironment object
:param deploy_id: int, ID of AppModelDeploy object
:param deployment_id: Optional[uuid]: ID of Deployment object
"""

# over 15 min considered as timeout
overall_timeout_seconds = 15 * 60
# Abort policies were extra rules which were used to break current polling procedure
abort_policies: List[AbortPolicy] = [UserInterruptedPolicy()]

def query(self) -> PollingResult:
def get_status(self) -> PollingResult:
dp = AppModelDeploy.objects.get(id=self.params['deploy_id'])
mres = get_mres_from_cluster(
ModuleEnvironment.objects.get(
application_id=dp.application_id, module_id=dp.module_id, environment=dp.environment_name
)
)

if not mres:
return PollingResult.doing()

Expand Down Expand Up @@ -90,13 +199,19 @@ class DeployStatusHandler(CallbackHandler):

def handle(self, result: CallbackResult, poller: TaskPoller):
dp = AppModelDeploy.objects.get(id=poller.params['deploy_id'])
if result.is_exception:

is_interrupted, err_message, extra_data = self.parse_result(result)
if result.is_exception or (not is_interrupted and err_message):
logger.warning('Error polling AppModelDeploy status, result: %s', result)
state = ModelResState(DeployStatus.ERROR, 'internal', 'error polling deploy status')
state = ModelResState(DeployStatus.ERROR, 'internal', err_message)
update_status(dp, state)
elif is_interrupted:
logger.warning('polling AppModelDeploy is interrupted')
state = ModelResState(DeployStatus.UNKNOWN, 'interrupted', err_message)
update_status(dp, state)
else:
logger.info('Update AppModelDeploy status with data: %s', result.data)
update_status(dp, result.data['state'], last_transition_time=result.data['last_update'])
update_status(dp, extra_data['state'], last_transition_time=extra_data['last_update'])

dp.refresh_from_db()
# 需要更新 deploy step 的状态
Expand All @@ -105,7 +220,7 @@ def handle(self, result: CallbackResult, poller: TaskPoller):
state_mgr = DeploymentStateMgr.from_deployment_id(
deployment_id=deployment_id, phase_type=DeployPhaseTypes.RELEASE
)
job_status = deploy_status_to_job_status(dp.status)
job_status = deploy_status_to_job_status(dp.status) if not is_interrupted else JobStatus.INTERRUPTED
try:
step_obj = state_mgr.phase.get_step_by_name(name="检测部署结果")
step_obj.mark_and_write_to_stream(state_mgr.stream, job_status)
Expand All @@ -117,6 +232,38 @@ def handle(self, result: CallbackResult, poller: TaskPoller):
# 在部署流程结束后,发送信号触发操作审计等后续步骤(不支持创建监控告警规则)
post_cnative_env_deploy.send(dp.environment, deploy=dp)

def parse_result(self, result: CallbackResult) -> Tuple[bool, str, Dict]:
"""Get detailed error message. if error message was empty, release was considered succeeded
:returns: (is_interrupted, error_msg, extra_data)
"""
if result.is_exception:
return False, "error polling deploy status", result.data

aborted_details = self.get_aborted_details(result)
if not aborted_details:
return False, "invalid polling result", result.data

if aborted_details.aborted:
assert aborted_details.policy is not None, 'policy must not be None' # Make type checker happy
return (
aborted_details.policy.is_interrupted,
aborted_details.policy.reason,
aborted_details.extra_data or {},
)

# if error message was empty, release was considered succeeded
return False, "", aborted_details.extra_data or {}

@staticmethod
def get_aborted_details(result: CallbackResult) -> Optional[AbortedDetails]:
"""If current release was aborted, return detailed info"""
try:
details = AbortedDetails.parse_obj(result.data)
except PyDanticValidationError:
return None
return details


def update_status(dp: AppModelDeploy, state: ModelResState, last_transition_time: Optional[datetime.datetime] = None):
"""Update deployment status, `last_transition_time` will always be updated
Expand Down
30 changes: 3 additions & 27 deletions apiserver/paasng/paasng/engine/deploy/bg_wait/wait_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
We undertake not to change the open source license (MIT license) applicable
to the current version of the project delivered to anyone in the future.
"""
from paasng.engine.deploy.bg_wait.base import AbortedDetails, AbortedDetailsPolicy

"""Wait processes to match certain conditions"""
import logging
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type
from typing import Dict, List, Optional, Type

from blue_krill.async_utils.poll_task import PollingMetadata, PollingResult, PollingStatus, TaskPoller
from pydantic import BaseModel, validator

from paas_wl.workloads.processes.processes import PlainProcess
from paas_wl.workloads.processes.shim import ProcessManager
Expand Down Expand Up @@ -274,28 +275,3 @@ def get_status(self, processes: List[PlainProcess]) -> PollingResult:

logger.info(f'All processes has been updated to {self.release_version}, env: {self.env}')
return PollingResult.done()


class AbortedDetailsPolicy(BaseModel):
"""`policy` field of `AbortedDetails`"""

reason: str
name: str
is_interrupted: bool = False


class AbortedDetails(BaseModel):
"""A model for storing aborted details, such as "reason" and other infos
:param extra_data: reserved field for storing extra info
"""

aborted: bool
policy: Optional[AbortedDetailsPolicy]
extra_data: Optional[Any]

@validator('policy', always=True)
def data_not_empty(cls, v, values, **kwargs):
if values.get('aborted') and v is None:
raise ValueError('"data" can not be empty when aborted is "True"!')
return v
3 changes: 2 additions & 1 deletion apiserver/paasng/paasng/engine/deploy/release/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
from paasng.engine.configurations.image import update_image_runtime_config
from paasng.engine.configurations.ingress import AppDefaultDomains, AppDefaultSubpaths
from paasng.engine.constants import JobStatus, ReleaseStatus
from paasng.engine.deploy.bg_wait.wait_deployment import AbortedDetails, wait_for_release
from paasng.engine.deploy.bg_wait.base import AbortedDetails
from paasng.engine.deploy.bg_wait.wait_deployment import wait_for_release
from paasng.engine.exceptions import StepNotInPresetListError
from paasng.engine.models.deployment import Deployment
from paasng.engine.models.phases import DeployPhaseTypes
Expand Down
6 changes: 3 additions & 3 deletions apiserver/paasng/paasng/engine/deploy/release/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from paas_wl.resources.base.kres import KNamespace
from paas_wl.resources.utils.basic import get_client_by_app
from paasng.engine.constants import JobStatus
from paasng.engine.deploy.bg_wait.wait_bkapp import AppModelDeployStatusPoller, DeployStatusHandler
from paasng.engine.deploy.bg_wait.wait_bkapp import DeployStatusHandler, WaitAppModelReady
from paasng.engine.exceptions import StepNotInPresetListError
from paasng.engine.models.phases import DeployPhaseTypes
from paasng.engine.workflow import DeployStep
Expand Down Expand Up @@ -156,8 +156,8 @@ def release_by_k8s_operator(

# TODO: 统计成功 metrics
# Poll status in background
AppModelDeployStatusPoller.start(
{'deploy_id': app_model_deploy.id, 'deployment_id': deployment_id}, DeployStatusHandler
WaitAppModelReady.start(
{'env_id': env.id, 'deploy_id': app_model_deploy.id, 'deployment_id': deployment_id}, DeployStatusHandler
)
return str(app_model_deploy.id)

Expand Down
2 changes: 1 addition & 1 deletion apiserver/paasng/tests/api/apigw/test_cnative.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_create(self, api_client, bk_app, bk_module, bk_stag_env, with_wl_apps,
}
# Mock out the interactions with k8s cluster
with mock.patch("paasng.engine.deploy.release.operator.apply_bkapp_to_k8s", return_value=manifest), mock.patch(
'paasng.engine.deploy.release.operator.AppModelDeployStatusPoller.start',
'paasng.engine.deploy.release.operator.WaitAppModelReady.start',
return_value=None,
), mock.patch("paasng.engine.deploy.release.operator.svc_disc"), mock.patch(
"paasng.engine.deploy.release.operator.ensure_namespace"
Expand Down
3 changes: 1 addition & 2 deletions apiserver/paasng/tests/engine/processes/test_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
from blue_krill.async_utils.poll_task import PollingMetadata, PollingStatus
from django.dispatch import receiver

from paasng.engine.deploy.bg_wait.base import AbortedDetails, AbortedDetailsPolicy
from paasng.engine.deploy.bg_wait.wait_deployment import (
AbortedDetails,
AbortedDetailsPolicy,
DynamicReadyTimeoutPolicy,
TooManyRestartsPolicy,
UserInterruptedPolicy,
Expand Down
Loading

0 comments on commit d1e5ec3

Please sign in to comment.