Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【无需合入】节点计时器边界事件接入样例 #7149

Open
wants to merge 3 commits into
base: feature/issue_6653
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"pipeline.contrib.periodic_task",
"pipeline.contrib.external_plugins",
"pipeline.contrib.engine_admin",
"pipeline.contrib.node_timer_event",
"pipeline.django_signal_valve",
"pipeline_plugins",
"pipeline_plugins.components",
Expand Down Expand Up @@ -406,6 +407,15 @@ def _(s):

PIPELINE_DATA_BACKEND_AUTO_EXPIRE = True

# Set the node timer boundary-event pool scan interval to 0.1s so timer
# events are executed promptly.
PIPELINE_NODE_TIMER_EVENT_POOL_SCAN_INTERVAL = 0.1
PIPELINE_NODE_TIMER_EVENT_ADAPTER_CLASS = (
    "gcloud.taskflow3.domains.node_timeout_strategy." "NodeTimerEventWithTimeoutConfigAdapter"
)
# Reuse the node-timeout executing node pool.
PIPELINE_NODE_TIMER_EVENT_EXECUTING_POOL = "sops_executing_node_pool"


BAMBOO_PERIODIC_TASK_ROOT_PIPELINE_CONTEXT_PROVIER = "gcloud.taskflow3.context.root_pipeline_context_provider"
BAMBOO_PERIODIC_TASK_SUBPROCESS_CONTEXT_PROVIER = "gcloud.taskflow3.context.subprocess_context_provider"

Expand Down
85 changes: 85 additions & 0 deletions gcloud/taskflow3/domains/node_timeout_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,73 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
import re
import typing
from abc import ABCMeta, abstractmethod

from pipeline.contrib.node_timer_event.adapter import NodeTimerEventAdapter
from pipeline.contrib.node_timer_event.constants import TimerType
from pipeline.contrib.node_timer_event.handlers import BaseAction
from pipeline.contrib.node_timer_event.types import TimerEvent
from pipeline.contrib.node_timer_event.utils import parse_timer_defined
from pipeline.core.data.base import DataObject

from gcloud.taskflow3.models import TaskFlowInstance, TimeoutNodeConfig

logger = logging.getLogger(__name__)

EVENT_KEY_PATTERN = re.compile(r"(?P<node_id>[a-z0-9]+)_(?P<version>[a-z0-9]+)")


class NodeTimerEventWithTimeoutConfigAdapter(NodeTimerEventAdapter):
    """Timer-event adapter that folds the legacy SOPS node-timeout config in.

    If a ``TimeoutNodeConfig`` row exists for the node, it is converted into a
    timer event and prepended to any events the base adapter already loaded,
    so the timeout event always occupies index 1.
    """

    def __init__(self, node_id: str, version: str):

        super().__init__(node_id, version)

        self.events = self.events or []
        timeout_config: typing.Optional[TimeoutNodeConfig] = TimeoutNodeConfig.objects.filter(node_id=node_id).first()
        # BUGFIX: the original guard (`not timeout_config and not self.events`)
        # continued when timeout_config is None but events exist, then raised
        # AttributeError on `timeout_config.action` below. Everything past this
        # point depends on timeout_config, so bail out whenever it is missing.
        if not timeout_config:
            return

        if timeout_config.action == "forced_fail_and_skip":
            # Repeating cycle timer (up to 1000 repetitions) for this action.
            defined = f"R1000/PT{timeout_config.timeout}S"
            timer_type = TimerType.TIME_CYCLE.value
        else:
            # One-shot duration timer.
            defined = f"PT{timeout_config.timeout}S"
            timer_type = TimerType.TIME_DURATION.value

        # Place the timeout-derived event ahead of the node's own events.
        self.events = [
            {
                "enable": True,
                "action": timeout_config.action,
                "timer_type": timer_type,
                "defined": defined,
                "repetitions": parse_timer_defined(timer_type, defined)["repetitions"],
            }
        ] + self.events

        # Re-index: the timeout event is always index 1.
        for idx, event in enumerate(self.events, 1):
            event["index"] = idx

        self.index__event_map: typing.Dict[int, TimerEvent] = {event["index"]: event for event in self.events}
        self.root_pipeline_id = timeout_config.root_pipeline_id

    @classmethod
    def parse_event_key(cls, key: str) -> typing.Dict[str, typing.Union[str, int]]:
        """Parse a legacy ``{node_id}_{version}`` key; fall back to the base parser.

        Legacy keys carry no index, so they map to index 1 (the timeout event
        is placed first in ``__init__``).
        """
        match = EVENT_KEY_PATTERN.match(key)
        if match:
            key_info: typing.Dict[str, typing.Union[str, int]] = match.groupdict()
            # The timeout event is placed first.
            key_info["index"] = 1
            return key_info
        return super().parse_event_key(key)

    def fetch_keys_to_be_rem(self) -> typing.List[str]:
        """Return keys to remove, including the legacy-protocol key."""
        # Also remove the old-protocol key format: "{node_id}_{version}".
        return super().fetch_keys_to_be_rem() + [f"{self.node_id}_{self.version}"]


class NodeTimeoutStrategy(metaclass=ABCMeta):
TIMEOUT_NODE_OPERATOR = "sops_system"
Expand All @@ -35,6 +100,26 @@ def deal_with_timeout_node(self, task, node_id):
return fail_result


class ForcedFailAction(BaseAction):
    """Timer-event action that force-fails the node it is attached to."""

    class Meta:
        action_name = "forced_fail"

    def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
        logger.info("[Action(forced_fail)] do: data -> %s, parent_data -> %s", data, parent_data)
        task_id = parent_data.get_one_of_inputs("task_id")
        task_inst = TaskFlowInstance.objects.get(pk=task_id)
        task_inst.nodes_action("forced_fail", self.node_id, "sops_system")
        return True


class ForcedFailAndSkipAction(BaseAction):
    """Timer-event action for ``forced_fail_and_skip``; logs and reports success."""

    class Meta:
        action_name = "forced_fail_and_skip"

    def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
        logger.info("[Action(forced_fail_and_skip)] do: data -> %s, parent_data -> %s", data, parent_data)
        return True


node_timeout_handler = {
"forced_fail": ForcedFailStrategy(),
"forced_fail_and_skip": ForcedFailAndSkipStrategy(),
Expand Down
4 changes: 3 additions & 1 deletion gcloud/taskflow3/signals/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ def bamboo_engine_eri_post_set_state_handler(sender, node_id, to_state, version,
_finish_taskflow_and_send_signal(root_id, taskflow_finished, True)

try:
_node_timeout_info_update(settings.redis_inst, to_state, node_id, version)
# 切换到边界事件
# _node_timeout_info_update(settings.redis_inst, to_state, node_id, version)
pass
except Exception:
logger.exception("node_timeout_info_update error")

Expand Down
29 changes: 28 additions & 1 deletion gcloud/taskflow3/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
"""

import logging
from typing import Any, Dict, List, Optional
import typing
from collections import defaultdict
from typing import Any, Dict, List, Optional

from django.apps import apps
from django.utils.translation import ugettext_lazy as _
from pipeline.contrib.node_timer_event.constants import TimerType
from pipeline.core import constants as pipeline_constants
from pipeline.engine import states as pipeline_states
from pipeline.engine.utils import calculate_elapsed_time
Expand Down Expand Up @@ -173,3 +174,29 @@ def parse_node_timeout_configs(pipeline_tree: dict) -> list:
continue
configs.append({"action": action, "node_id": act_id, "timeout": timeout_seconds})
return {"result": True, "data": configs, "message": ""}


def convert_to_timer_events_config(pipeline_tree: dict):
    """Convert the legacy SOPS node-timeout config into timer boundary events.

    Used for testing the enhanced-service feature. Recurses into subprocesses;
    for each service activity whose ``timeout_config`` is enabled, rewrites
    ``act["events"]["timer_events"]`` with one equivalent timer event.

    :param pipeline_tree: pipeline tree dict, mutated in place.
    """
    for act in pipeline_tree[pipeline_constants.PE.activities].values():
        if act["type"] == pipeline_constants.PE.SubProcess:
            convert_to_timer_events_config(act[pipeline_constants.PE.pipeline])

        elif act["type"] == pipeline_constants.PE.ServiceActivity:
            timeout_config = act.get("timeout_config", {})
            if not timeout_config.get("enable"):
                continue
            action = timeout_config.get("action")
            timeout_seconds = timeout_config.get("seconds")
            # BUGFIX: the original built an invalid ISO-8601 duration such as
            # "PTNoneS" when "seconds" was missing; skip such nodes instead.
            if not timeout_seconds:
                continue
            if action == "forced_fail_and_skip":
                # Repeating cycle timer (up to 1000 repetitions).
                defined = f"R1000/PT{timeout_seconds}S"
                timer_type = TimerType.TIME_CYCLE.value
            else:
                # One-shot duration timer.
                defined = f"PT{timeout_seconds}S"
                timer_type = TimerType.TIME_DURATION.value

            act["events"] = {
                "timer_events": [
                    {"enable": True, "action": action, "timer_type": timer_type, "defined": defined}
                ]
            }
Loading