feat: pipeline dependencies version update
normal-wls committed Apr 10, 2024
1 parent a442fad commit 996c5cb
Showing 20 changed files with 470 additions and 142 deletions.
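In summary, the diff moves the pipeline runtime onto newer dependency APIs: the cluster client from redis-py 4.x (falling back to the legacy rediscluster package where needed), and Celery 5-style task registration and control (current_app.task, app.conf.beat_schedule, app.control.revoke).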
5 changes: 4 additions & 1 deletion runtime/bamboo-pipeline/pipeline/apps.py
@@ -19,7 +19,10 @@
 from django.apps import AppConfig
 from django.conf import settings
 from redis.sentinel import Sentinel
-from rediscluster import RedisCluster
+try:
+    from redis.cluster import RedisCluster
+except ImportError:
+    from rediscluster import RedisCluster

 logger = logging.getLogger("root")

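The try/except import above keeps apps.py working across both redis generations: redis-py 4.1+ ships a native redis.cluster.RedisCluster, while older installs get the same class from the third-party rediscluster package. A minimal sketch of the pattern; the connection values are illustrative only, the real ones come from Django settings:

```python
try:
    from redis.cluster import RedisCluster  # redis-py >= 4.1
except ImportError:
    from rediscluster import RedisCluster  # legacy redis-py-cluster package

# Hypothetical connection settings for illustration.
client = RedisCluster(host="127.0.0.1", port=7000)
client.set("pipeline:example-key", "ok")
```

Either class exposes the same client surface for basic commands, so the rest of the app stays agnostic about which package provided it.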
12 changes: 12 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
48 changes: 48 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from celery import Task, current_app
from celery.schedules import maybe_schedule


class PipelinePeriodicTask(Task):
"""A task that adds itself to the :setting:`beat_schedule` setting."""

abstract = True
ignore_result = True
relative = False
options = None
compat = True

def __init__(self):
if not hasattr(self, 'run_every'):
raise NotImplementedError(
'Periodic tasks must have a run_every attribute')
self.run_every = maybe_schedule(self.run_every, self.relative)
super(PipelinePeriodicTask, self).__init__()

@classmethod
def on_bound(cls, app):
app.conf.beat_schedule[cls.name] = {
'task': cls.name,
'schedule': cls.run_every,
'args': (),
'kwargs': {},
'options': cls.options or {},
'relative': cls.relative,
}


def periodic_task(*args, **options):
"""Deprecated decorator, please use :setting:`beat_schedule`."""
return current_app.task(**dict({'base': PipelinePeriodicTask}, **options))
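
This new module appears to vendor the periodic-task helpers that Celery dropped in 5.x: on_bound registers each task's run_every into app.conf.beat_schedule, and periodic_task is a thin decorator over current_app.task with PipelinePeriodicTask as the base class. A usage sketch under that assumption; the task name and interval below are illustrative, not part of this commit:

```python
from datetime import timedelta

from pipeline.contrib.celery_tools.periodic import periodic_task


@periodic_task(run_every=timedelta(minutes=5), ignore_result=True)
def heartbeat():
    # Picked up by celery beat every five minutes once the app binds the task.
    print("pipeline heartbeat")
```

Because the decorator forwards run_every as a class attribute on the generated task, the hasattr check in __init__ passes and on_bound schedules the task automatically when the Celery app binds it.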

35 changes: 26 additions & 9 deletions runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py
@@ -16,7 +16,7 @@
 import traceback

 import pytz
-from celery import task
+from celery import current_app
 from django.utils import timezone
 from django.utils.module_loading import import_string
 from bamboo_engine import api as bamboo_engine_api
@@ -35,7 +35,7 @@
 logger = logging.getLogger("celery")


-@task(ignore_result=True)
+@current_app.task(ignore_result=True)
 def periodic_task_start(*args, **kwargs):
     try:
         periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"])
@@ -69,19 +69,28 @@ def periodic_task_start(*args, **kwargs):
         )

         result = instance.start(
-            periodic_task.creator, check_workers=False, priority=periodic_task.priority, queue=periodic_task.queue,
+            periodic_task.creator,
+            check_workers=False,
+            priority=periodic_task.priority,
+            queue=periodic_task.queue,
         )
     except Exception:
         et = traceback.format_exc()
         logger.error(et)
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=et,
+            start_success=False,
         )
         return

     if not result.result:
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=result.message,
+            start_success=False,
         )
         return

@@ -93,7 +102,7 @@ def periodic_task_start(*args, **kwargs):
     PeriodicTaskHistory.objects.record_schedule(periodic_task=periodic_task, pipeline_instance=instance, ex_data="")


-@task(ignore_result=True)
+@current_app.task(ignore_result=True)
 def bamboo_engine_periodic_task_start(*args, **kwargs):
     try:
         periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"])
@@ -147,16 +156,24 @@ def bamboo_engine_periodic_task_start(*args, **kwargs):
         et = traceback.format_exc()
         logger.error(et)
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=et,
+            start_success=False,
         )
         return

     if not result.result:
         PipelineInstance.objects.filter(id=instance.instance_id).update(
-            start_time=None, is_started=False, executor="",
+            start_time=None,
+            is_started=False,
+            executor="",
         )
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=result.message,
+            start_success=False,
        )
         return

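The decorator swap throughout this file tracks Celery 5, which removed the module-level shortcut `from celery import task`; registering tasks through the application instance is the supported route. A minimal sketch of the replacement pattern (demo_task is an illustrative name, not part of the commit):

```python
from celery import current_app


@current_app.task(ignore_result=True)
def demo_task(*args, **kwargs):
    # Celery 4 accepted `from celery import task` + @task(...);
    # Celery 5 requires binding the task to an app instance.
    return "ok"
```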
6 changes: 3 additions & 3 deletions runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py
@@ -16,7 +16,7 @@
 import ujson as json
 from copy import deepcopy

-from celery import task
+from celery import current_app
 from bamboo_engine import api as bamboo_engine_api

 from pipeline.component_framework.constants import LEGACY_PLUGINS_VERSION
@@ -112,7 +112,7 @@ def recursive_collect_components(activities, status_tree, instance_id, stack=None):
     return component_list


-@task
+@current_app.task
 def pipeline_post_save_statistics_task(instance_id):
     instance = PipelineInstance.objects.get(instance_id=instance_id)
     # Count standard plugins, subprocesses, and gateways in the pipeline
@@ -134,7 +134,7 @@ def pipeline_post_save_statistics_task(instance_id):
     )


-@task
+@current_app.task
 def pipeline_archive_statistics_task(instance_id):
     instance = PipelineInstance.objects.get(instance_id=instance_id)
     engine_ver = 1
94 changes: 76 additions & 18 deletions runtime/bamboo-pipeline/pipeline/engine/models/core.py
@@ -17,7 +17,12 @@
 import traceback

 from celery import current_app
-from celery.task.control import revoke
+
+try:
+    from celery.task.control import revoke
+except ModuleNotFoundError:
+    revoke = current_app.control.revoke
+
 from django.db import models, transaction
 from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
@@ -89,10 +94,16 @@ def prepare_for_pipeline(self, pipeline):
"""
# init runtime info
snapshot = ProcessSnapshot.objects.create_snapshot(
pipeline_stack=utils.Stack(), children=[], root_pipeline=pipeline, subprocess_stack=utils.Stack(),
pipeline_stack=utils.Stack(),
children=[],
root_pipeline=pipeline,
subprocess_stack=utils.Stack(),
)
process = self.create(
id=node_uniqid(), root_pipeline_id=pipeline.id, current_node_id=pipeline.start_event.id, snapshot=snapshot,
id=node_uniqid(),
root_pipeline_id=pipeline.id,
current_node_id=pipeline.start_event.id,
snapshot=snapshot,
)
process.push_pipeline(pipeline)
process.save()
@@ -362,7 +373,9 @@ def adjust_status(self, adjust_scope=None):
         if node_state in {states.FAILED, states.SUSPENDED}:
             # if current node failed or suspended
             Status.objects.batch_transit(
-                id_list=self.subprocess_stack, state=states.BLOCKED, from_state=states.RUNNING,
+                id_list=self.subprocess_stack,
+                state=states.BLOCKED,
+                from_state=states.RUNNING,
             )
             Status.objects.transit(self.root_pipeline.id, to_state=states.BLOCKED, is_pipeline=True)
         elif states.SUSPENDED in set(subproc_states):
@@ -372,7 +385,9 @@ def adjust_status(self, adjust_scope=None):
         elif pipeline_state == states.SUSPENDED:
             # if root pipeline suspended
             Status.objects.batch_transit(
-                id_list=self.subprocess_stack, state=pipeline_state, from_state=states.RUNNING,
+                id_list=self.subprocess_stack,
+                state=pipeline_state,
+                from_state=states.RUNNING,
             )

     def wake_up(self):
@@ -475,10 +490,14 @@ def destroy_and_wake_up_parent(self, destination_id):
         else:
             if parent.blocked_by_failure_or_suspended():
                 Status.objects.batch_transit(
-                    id_list=self.subprocess_stack, state=states.BLOCKED, from_state=states.RUNNING,
+                    id_list=self.subprocess_stack,
+                    state=states.BLOCKED,
+                    from_state=states.RUNNING,
                 )
                 Status.objects.transit(
-                    id=self.root_pipeline.id, to_state=states.BLOCKED, is_pipeline=True,
+                    id=self.root_pipeline.id,
+                    to_state=states.BLOCKED,
+                    is_pipeline=True,
                 )

         parent.save(save_snapshot=False)
@@ -540,7 +559,8 @@ def exit_gracefully(self, e):
         if not result.result:
             logger.error(
                 "process({process_id}) exit_gracefully status transit failed, current_node :{node_id}".format(
-                    process_id=self.id, node_id=current_node.id if current_node else self.current_node_id,
+                    process_id=self.id,
+                    node_id=current_node.id if current_node else self.current_node_id,
                 )
             )
         self.sleep(adjust_status=True)
@@ -615,7 +635,9 @@ def build_relationship(self, ancestor_id, descendant_id):
         relationships = [NodeRelationship(ancestor_id=descendant_id, descendant_id=descendant_id, distance=0)]
         for ancestor in ancestors:
             rel = NodeRelationship(
-                ancestor_id=ancestor.ancestor_id, descendant_id=descendant_id, distance=ancestor.distance + 1,
+                ancestor_id=ancestor.ancestor_id,
+                descendant_id=descendant_id,
+                distance=ancestor.distance + 1,
             )
             relationships.append(rel)
         self.bulk_create(relationships)
@@ -630,12 +652,26 @@ class NodeRelationship(models.Model):
     objects = RelationshipManager()

     def __unicode__(self):
-        return str("#{} -({})-> #{}".format(self.ancestor_id, self.distance, self.descendant_id,))
+        return str(
+            "#{} -({})-> #{}".format(
+                self.ancestor_id,
+                self.distance,
+                self.descendant_id,
+            )
+        )


 class StatusManager(models.Manager):
     def transit(
-        self, id, to_state, is_pipeline=False, appoint=False, start=False, name="", version=None, unchanged_pass=False,
+        self,
+        id,
+        to_state,
+        is_pipeline=False,
+        appoint=False,
+        start=False,
+        name="",
+        version=None,
+        unchanged_pass=False,
     ):
         """
         Try to change the state of a given node
@@ -679,7 +715,10 @@ def transit(
             return ActionResult(result=True, message="success", extra=status)

         if states.can_transit(
-            from_state=status.state, to_state=to_state, is_pipeline=is_pipeline, appoint=appoint,
+            from_state=status.state,
+            to_state=to_state,
+            is_pipeline=is_pipeline,
+            appoint=appoint,
         ):

             # the state of a pipeline cannot be changed while the engine is frozen
@@ -688,11 +727,17 @@ def transit(
             if subprocess_rel:
                 process = PipelineProcess.objects.get(id=subprocess_rel[0].process_id)
                 if process.is_frozen:
-                    return ActionResult(result=False, message="engine is frozen, can not perform operation",)
+                    return ActionResult(
+                        result=False,
+                        message="engine is frozen, can not perform operation",
+                    )

             processes = PipelineProcess.objects.filter(root_pipeline_id=id)
             if processes and processes[0].is_frozen:
-                return ActionResult(result=False, message="engine is frozen, can not perform operation",)
+                return ActionResult(
+                    result=False,
+                    message="engine is frozen, can not perform operation",
+                )

             if name:
                 status.name = name
@@ -772,7 +817,9 @@ def prepare_for_pipeline(self, pipeline):
         cls_str = str(pipeline.__class__)
         cls_name = pipeline.__class__.__name__[:NAME_MAX_LENGTH]
         self.create(
-            id=pipeline.id, state=states.READY, name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name,
+            id=pipeline.id,
+            state=states.READY,
+            name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name,
         )

     def fail(self, node, ex_data):
@@ -1287,7 +1334,11 @@ def record(self, name, kwargs, type, extra_kwargs, exec_trace):
         save_kwargs = json.dumps(save_kwargs)

         return self.create(
-            name=name, kwargs=save_kwargs, type=type, extra_kwargs=save_extra_kwargs, exec_trace=exec_trace,
+            name=name,
+            kwargs=save_kwargs,
+            type=type,
+            extra_kwargs=save_extra_kwargs,
+            exec_trace=exec_trace,
         )

     def resend(self, id):
@@ -1341,7 +1392,10 @@ def resend(self):
             )
         elif self.type == self.TASK_TYPE_NODE:
             NodeCeleryTask.objects.start_task(
-                node_id=self.extra_kwargs_dict["node_id"], task=task, kwargs=self.kwargs_dict, record_error=False,
+                node_id=self.extra_kwargs_dict["node_id"],
+                task=task,
+                kwargs=self.kwargs_dict,
+                record_error=False,
             )
         elif self.type == self.TASK_TYPE_SCHEDULE:
             ScheduleCeleryTask.objects.start_task(
@@ -1366,7 +1420,11 @@ def watch(cls, name, kwargs, type, extra_kwargs):
         except Exception:
             logger.exception("celery task({}) watcher catch error.".format(name))
             cls.objects.record(
-                name=name, kwargs=kwargs, type=type, extra_kwargs=extra_kwargs, exec_trace=traceback.format_exc(),
+                name=name,
+                kwargs=kwargs,
+                type=type,
+                extra_kwargs=extra_kwargs,
+                exec_trace=traceback.format_exc(),
             )
             # raise a specific exception to indicate that the failed send task has been caught
             raise exceptions.CeleryFailedTaskCatchException(name)
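
The import guard at the top of core.py handles the other Celery 5 removal in this commit: the celery.task.control module no longer exists, and remote-control commands hang off the app instance instead. A sketch of the same guard in isolation:

```python
from celery import current_app

try:
    from celery.task.control import revoke  # Celery 4.x location
except ModuleNotFoundError:
    revoke = current_app.control.revoke  # Celery 5.x: app-level control API

# The call shape is the same either way, e.g.:
# revoke(task_id, terminate=False)
```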