feature: node timer boundary event integration sample
ZhuoZhuoCrayon committed Oct 31, 2023
1 parent d7e15a8 commit d7cde35
Showing 4 changed files with 65 additions and 5 deletions.
4 changes: 4 additions & 0 deletions config/default.py
@@ -89,6 +89,7 @@
"pipeline.contrib.periodic_task",
"pipeline.contrib.external_plugins",
"pipeline.contrib.engine_admin",
"pipeline.contrib.node_timer_event",
"pipeline.django_signal_valve",
"pipeline_plugins",
"pipeline_plugins.components",
@@ -406,6 +407,9 @@ def _(s):

PIPELINE_DATA_BACKEND_AUTO_EXPIRE = True

# Set the node timer boundary event scan interval to 0.1 so that timer events fire promptly
PIPELINE_NODE_TIMER_EVENT_POOL_SCAN_INTERVAL = 0.1

BAMBOO_PERIODIC_TASK_ROOT_PIPELINE_CONTEXT_PROVIER = "gcloud.taskflow3.context.root_pipeline_context_provider"
BAMBOO_PERIODIC_TASK_SUBPROCESS_CONTEXT_PROVIER = "gcloud.taskflow3.context.subprocess_context_provider"

13 changes: 9 additions & 4 deletions gcloud/core/apis/drf/viewsets/taskflow.py
@@ -22,6 +22,7 @@
from django.utils.translation import ugettext_lazy as _
from django_filters import FilterSet
from drf_yasg.utils import swagger_auto_schema
from pipeline.contrib.node_timer_event.api import batch_create_node_timer_event_config
from pipeline.eri.models import State
from pipeline.exceptions import PipelineException
from pipeline.models import PipelineInstance, Snapshot
@@ -69,7 +70,8 @@
from gcloud.iam_auth.utils import get_common_flow_allowed_actions_for_user, get_flow_allowed_actions_for_user
from gcloud.taskflow3.domains.auto_retry import AutoRetryNodeStrategyCreator
from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
from gcloud.taskflow3.models import TaskConfig, TaskFlowInstance, TaskFlowRelation, TimeoutNodeConfig
from gcloud.taskflow3.models import TaskConfig, TaskFlowInstance, TaskFlowRelation
from gcloud.taskflow3.utils import convert_to_timer_events_config
from gcloud.tasktmpl3.models import TaskTemplate
from gcloud.utils import concurrent
from gcloud.utils.strings import standardize_name, standardize_pipeline_node_name
@@ -441,6 +443,11 @@ def perform_create(self, serializer):
"func_claim" if serializer.validated_data["flow_type"] == "common_func" else "execute_task"
)
serializer.validated_data["template_id"] = template.id

# convert timeout_config to timer_events
convert_to_timer_events_config(pipeline_instance.execution_data)
pipeline_instance.execution_snapshot.save()

# create taskflow
serializer.save()
# create auto retry strategy
@@ -449,9 +456,7 @@
)
arn_creator.batch_create_strategy(pipeline_instance.execution_data)

# create timeout config
TimeoutNodeConfig.objects.batch_create_node_timeout_config(
taskflow_id=serializer.instance.id,
batch_create_node_timer_event_config(
root_pipeline_id=pipeline_instance.instance_id,
pipeline_tree=pipeline_instance.execution_data,
)
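For context, a minimal sketch of how timer event configs are now created in perform_create, replacing the previous TimeoutNodeConfig-based call. Only names that appear in the diff above are used; the helper name create_timer_event_configs is hypothetical and the surrounding serializer/template handling is elided.

from pipeline.contrib.node_timer_event.api import batch_create_node_timer_event_config

from gcloud.taskflow3.utils import convert_to_timer_events_config


def create_timer_event_configs(pipeline_instance):
    # Rewrite the legacy timeout_config on each node into timer boundary events
    # and persist the updated execution data.
    convert_to_timer_events_config(pipeline_instance.execution_data)
    pipeline_instance.execution_snapshot.save()

    # Register the timer boundary events for this root pipeline.
    batch_create_node_timer_event_config(
        root_pipeline_id=pipeline_instance.instance_id,
        pipeline_tree=pipeline_instance.execution_data,
    )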
24 changes: 24 additions & 0 deletions gcloud/taskflow3/domains/node_timeout_strategy.py
@@ -10,8 +10,16 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
from abc import ABCMeta, abstractmethod

from pipeline.contrib.node_timer_event.handlers import BaseAction, register_action
from pipeline.core.data.base import DataObject

from gcloud.taskflow3.models import TaskFlowInstance

logger = logging.getLogger(__name__)


class NodeTimeoutStrategy(metaclass=ABCMeta):
TIMEOUT_NODE_OPERATOR = "sops_system"
@@ -35,6 +43,22 @@ def deal_with_timeout_node(self, task, node_id):
return fail_result


@register_action("forced_fail")
class ForcedFailAction(BaseAction):
def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
logger.info("[Action(forced_fail)] do: data -> %s, parent_data -> %s", data, parent_data)
task_inst = TaskFlowInstance.objects.get(pk=parent_data.get_one_of_inputs("task_id"))
task_inst.nodes_action("forced_fail", self.node_id, "sops_system")
return True


@register_action("forced_fail_and_skip")
class ForcedFailAndSkipAction(BaseAction):
def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
logger.info("[Action(forced_fail_and_skip)] do: data -> %s, parent_data -> %s", data, parent_data)
return True


node_timeout_handler = {
"forced_fail": ForcedFailStrategy(),
"forced_fail_and_skip": ForcedFailAndSkipStrategy(),
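For illustration, a hedged sketch of how an additional timer action could be registered, following the register_action/BaseAction pattern added above. The action name "log_only" and the class LogOnlyAction are hypothetical; self.node_id is used the same way ForcedFailAction uses it.

import logging

from pipeline.contrib.node_timer_event.handlers import BaseAction, register_action
from pipeline.core.data.base import DataObject

logger = logging.getLogger(__name__)


@register_action("log_only")
class LogOnlyAction(BaseAction):
    def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
        # self.node_id identifies the node whose timer boundary event fired.
        logger.info("[Action(log_only)] timer fired for node %s", self.node_id)
        return True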
29 changes: 28 additions & 1 deletion gcloud/taskflow3/utils.py
@@ -12,12 +12,13 @@
"""

import logging
from typing import Any, Dict, List, Optional
import typing
from collections import defaultdict
from typing import Any, Dict, List, Optional

from django.apps import apps
from django.utils.translation import ugettext_lazy as _
from pipeline.contrib.node_timer_event.constants import TimerType
from pipeline.core import constants as pipeline_constants
from pipeline.engine import states as pipeline_states
from pipeline.engine.utils import calculate_elapsed_time
@@ -173,3 +174,29 @@ def parse_node_timeout_configs(pipeline_tree: dict) -> list:
continue
configs.append({"action": action, "node_id": act_id, "timeout": timeout_seconds})
return {"result": True, "data": configs, "message": ""}


def convert_to_timer_events_config(pipeline_tree: dict):
"""将标准运维原来的节点超时配置,转为计时器边界事件,用于测试增强服务功能"""
for act_id, act in pipeline_tree[pipeline_constants.PE.activities].items():
if act["type"] == pipeline_constants.PE.SubProcess:
convert_to_timer_events_config(act[pipeline_constants.PE.pipeline])

elif act["type"] == pipeline_constants.PE.ServiceActivity:
timeout_config = act.get("timeout_config", {})
enable = timeout_config.get("enable")
if not enable:
continue
action = timeout_config.get("action")
timeout_seconds = timeout_config.get("seconds")
if action == "forced_fail_and_skip":
defined = f"R1000/PT{timeout_seconds}S"
timer_type = TimerType.TIME_CYCLE.value
else:
defined = f"PT{timeout_seconds}S"
timer_type = TimerType.TIME_DURATION.value

act["events"] = {}
act["events"]["timer_events"] = [
{"enable": True, "action": action, "timer_type": timer_type, "defined": defined}
]

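For reference, a small sketch of the mapping convert_to_timer_events_config produces for a single activity; the "forced_fail" action and the 30-second timeout are illustrative values.

from pipeline.contrib.node_timer_event.constants import TimerType

# Legacy node timeout configuration stored on an activity:
timeout_config = {"enable": True, "action": "forced_fail", "seconds": 30}

# Resulting timer boundary event: a one-shot ISO 8601 duration ("PT30S").
# A "forced_fail_and_skip" action would instead get a repeating definition
# ("R1000/PT30S") with TimerType.TIME_CYCLE.
timer_events = [
    {
        "enable": True,
        "action": "forced_fail",
        "timer_type": TimerType.TIME_DURATION.value,
        "defined": "PT30S",
    }
]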