diff --git a/runtime/bamboo-pipeline/pipeline/apps.py b/runtime/bamboo-pipeline/pipeline/apps.py index f1f2f831..8e8eb166 100644 --- a/runtime/bamboo-pipeline/pipeline/apps.py +++ b/runtime/bamboo-pipeline/pipeline/apps.py @@ -19,7 +19,10 @@ from django.apps import AppConfig from django.conf import settings from redis.sentinel import Sentinel -from rediscluster import RedisCluster +try: + from redis.cluster import RedisCluster +except ImportError: + from rediscluster import RedisCluster logger = logging.getLogger("root") diff --git a/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py new file mode 100644 index 00000000..90101cd1 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from celery import Task, current_app +from celery.schedules import maybe_schedule + + +class PipelinePeriodicTask(Task): + """A task that adds itself to the :setting:`beat_schedule` setting.""" + + abstract = True + ignore_result = True + relative = False + options = None + compat = True + + def __init__(self): + if not hasattr(self, 'run_every'): + raise NotImplementedError( + 'Periodic tasks must have a run_every attribute') + self.run_every = maybe_schedule(self.run_every, self.relative) + super(PipelinePeriodicTask, self).__init__() + + @classmethod + def on_bound(cls, app): + app.conf.beat_schedule[cls.name] = { + 'task': cls.name, + 'schedule': cls.run_every, + 'args': (), + 'kwargs': {}, + 'options': cls.options or {}, + 'relative': cls.relative, + } + + +def periodic_task(*args, **options): + """Deprecated decorator, please use :setting:`beat_schedule`.""" + return current_app.task(**dict({'base': PipelinePeriodicTask}, **options)) + diff --git a/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py index 90da26e2..76d0d366 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py @@ -16,7 +16,7 @@ import traceback import pytz -from celery import task +from celery import current_app from django.utils import timezone from django.utils.module_loading import import_string from bamboo_engine import api as bamboo_engine_api @@ -35,7 +35,7 @@ logger = logging.getLogger("celery") -@task(ignore_result=True) +@current_app.task(ignore_result=True) def periodic_task_start(*args, **kwargs): try: periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"]) @@ -69,19 +69,28 @@ def periodic_task_start(*args, **kwargs): ) result = instance.start( - periodic_task.creator, check_workers=False, priority=periodic_task.priority, queue=periodic_task.queue, + periodic_task.creator, + check_workers=False, + priority=periodic_task.priority, + queue=periodic_task.queue, ) except Exception: et = traceback.format_exc() logger.error(et) PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=et, + start_success=False, ) return if not result.result: PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=result.message, + start_success=False, ) return @@ -93,7 +102,7 @@ def periodic_task_start(*args, **kwargs): PeriodicTaskHistory.objects.record_schedule(periodic_task=periodic_task, pipeline_instance=instance, ex_data="") -@task(ignore_result=True) +@current_app.task(ignore_result=True) def bamboo_engine_periodic_task_start(*args, **kwargs): try: periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"]) @@ -147,16 +156,24 @@ def bamboo_engine_periodic_task_start(*args, **kwargs): et = traceback.format_exc() logger.error(et) PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=et, + start_success=False, ) return if not result.result: PipelineInstance.objects.filter(id=instance.instance_id).update( - start_time=None, is_started=False, executor="", + start_time=None, + is_started=False, + executor="", ) PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=result.message, + start_success=False, ) return diff --git a/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py index a18ab110..f6e921f1 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py @@ -16,7 +16,7 @@ import ujson as json from copy import deepcopy -from celery import task +from celery import current_app from bamboo_engine import api as bamboo_engine_api from pipeline.component_framework.constants import LEGACY_PLUGINS_VERSION @@ -112,7 +112,7 @@ def recursive_collect_components(activities, status_tree, instance_id, stack=Non return component_list -@task +@current_app.task def pipeline_post_save_statistics_task(instance_id): instance = PipelineInstance.objects.get(instance_id=instance_id) # 统计流程标准插件个数,子流程个数,网关个数 @@ -134,7 +134,7 @@ def pipeline_post_save_statistics_task(instance_id): ) -@task +@current_app.task def pipeline_archive_statistics_task(instance_id): instance = PipelineInstance.objects.get(instance_id=instance_id) engine_ver = 1 diff --git a/runtime/bamboo-pipeline/pipeline/engine/models/core.py b/runtime/bamboo-pipeline/pipeline/engine/models/core.py index aaa7551b..6b621559 100644 --- a/runtime/bamboo-pipeline/pipeline/engine/models/core.py +++ b/runtime/bamboo-pipeline/pipeline/engine/models/core.py @@ -17,7 +17,12 @@ import traceback from celery import current_app -from celery.task.control import revoke + +try: + from celery.task.control import revoke +except ModuleNotFoundError: + revoke = current_app.control.revoke + from django.db import models, transaction from django.utils import timezone from django.utils.translation import ugettext_lazy as _ @@ -89,10 +94,16 @@ def prepare_for_pipeline(self, pipeline): """ # init runtime info snapshot = ProcessSnapshot.objects.create_snapshot( - pipeline_stack=utils.Stack(), children=[], root_pipeline=pipeline, subprocess_stack=utils.Stack(), + pipeline_stack=utils.Stack(), + children=[], + root_pipeline=pipeline, + subprocess_stack=utils.Stack(), ) process = self.create( - id=node_uniqid(), root_pipeline_id=pipeline.id, current_node_id=pipeline.start_event.id, snapshot=snapshot, + id=node_uniqid(), + root_pipeline_id=pipeline.id, + current_node_id=pipeline.start_event.id, + snapshot=snapshot, ) process.push_pipeline(pipeline) process.save() @@ -362,7 +373,9 @@ def adjust_status(self, adjust_scope=None): if node_state in {states.FAILED, states.SUSPENDED}: # if current node failed or suspended Status.objects.batch_transit( - id_list=self.subprocess_stack, state=states.BLOCKED, from_state=states.RUNNING, + id_list=self.subprocess_stack, + state=states.BLOCKED, + from_state=states.RUNNING, ) Status.objects.transit(self.root_pipeline.id, to_state=states.BLOCKED, is_pipeline=True) elif states.SUSPENDED in set(subproc_states): @@ -372,7 +385,9 @@ def adjust_status(self, adjust_scope=None): elif pipeline_state == states.SUSPENDED: # if root pipeline suspended Status.objects.batch_transit( - id_list=self.subprocess_stack, state=pipeline_state, from_state=states.RUNNING, + id_list=self.subprocess_stack, + state=pipeline_state, + from_state=states.RUNNING, ) def wake_up(self): @@ -475,10 +490,14 @@ def destroy_and_wake_up_parent(self, destination_id): else: if parent.blocked_by_failure_or_suspended(): Status.objects.batch_transit( - id_list=self.subprocess_stack, state=states.BLOCKED, from_state=states.RUNNING, + id_list=self.subprocess_stack, + state=states.BLOCKED, + from_state=states.RUNNING, ) Status.objects.transit( - id=self.root_pipeline.id, to_state=states.BLOCKED, is_pipeline=True, + id=self.root_pipeline.id, + to_state=states.BLOCKED, + is_pipeline=True, ) parent.save(save_snapshot=False) @@ -540,7 +559,8 @@ def exit_gracefully(self, e): if not result.result: logger.error( "process({process_id}) exit_gracefully status transit failed, current_node :{node_id}".format( - process_id=self.id, node_id=current_node.id if current_node else self.current_node_id, + process_id=self.id, + node_id=current_node.id if current_node else self.current_node_id, ) ) self.sleep(adjust_status=True) @@ -615,7 +635,9 @@ def build_relationship(self, ancestor_id, descendant_id): relationships = [NodeRelationship(ancestor_id=descendant_id, descendant_id=descendant_id, distance=0)] for ancestor in ancestors: rel = NodeRelationship( - ancestor_id=ancestor.ancestor_id, descendant_id=descendant_id, distance=ancestor.distance + 1, + ancestor_id=ancestor.ancestor_id, + descendant_id=descendant_id, + distance=ancestor.distance + 1, ) relationships.append(rel) self.bulk_create(relationships) @@ -630,12 +652,26 @@ class NodeRelationship(models.Model): objects = RelationshipManager() def __unicode__(self): - return str("#{} -({})-> #{}".format(self.ancestor_id, self.distance, self.descendant_id,)) + return str( + "#{} -({})-> #{}".format( + self.ancestor_id, + self.distance, + self.descendant_id, + ) + ) class StatusManager(models.Manager): def transit( - self, id, to_state, is_pipeline=False, appoint=False, start=False, name="", version=None, unchanged_pass=False, + self, + id, + to_state, + is_pipeline=False, + appoint=False, + start=False, + name="", + version=None, + unchanged_pass=False, ): """ 尝试改变某个节点的状态 @@ -679,7 +715,10 @@ def transit( return ActionResult(result=True, message="success", extra=status) if states.can_transit( - from_state=status.state, to_state=to_state, is_pipeline=is_pipeline, appoint=appoint, + from_state=status.state, + to_state=to_state, + is_pipeline=is_pipeline, + appoint=appoint, ): # 在冻结状态下不能改变 pipeline 的状态 @@ -688,11 +727,17 @@ def transit( if subprocess_rel: process = PipelineProcess.objects.get(id=subprocess_rel[0].process_id) if process.is_frozen: - return ActionResult(result=False, message="engine is frozen, can not perform operation",) + return ActionResult( + result=False, + message="engine is frozen, can not perform operation", + ) processes = PipelineProcess.objects.filter(root_pipeline_id=id) if processes and processes[0].is_frozen: - return ActionResult(result=False, message="engine is frozen, can not perform operation",) + return ActionResult( + result=False, + message="engine is frozen, can not perform operation", + ) if name: status.name = name @@ -772,7 +817,9 @@ def prepare_for_pipeline(self, pipeline): cls_str = str(pipeline.__class__) cls_name = pipeline.__class__.__name__[:NAME_MAX_LENGTH] self.create( - id=pipeline.id, state=states.READY, name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name, + id=pipeline.id, + state=states.READY, + name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name, ) def fail(self, node, ex_data): @@ -1287,7 +1334,11 @@ def record(self, name, kwargs, type, extra_kwargs, exec_trace): save_kwargs = json.dumps(save_kwargs) return self.create( - name=name, kwargs=save_kwargs, type=type, extra_kwargs=save_extra_kwargs, exec_trace=exec_trace, + name=name, + kwargs=save_kwargs, + type=type, + extra_kwargs=save_extra_kwargs, + exec_trace=exec_trace, ) def resend(self, id): @@ -1341,7 +1392,10 @@ def resend(self): ) elif self.type == self.TASK_TYPE_NODE: NodeCeleryTask.objects.start_task( - node_id=self.extra_kwargs_dict["node_id"], task=task, kwargs=self.kwargs_dict, record_error=False, + node_id=self.extra_kwargs_dict["node_id"], + task=task, + kwargs=self.kwargs_dict, + record_error=False, ) elif self.type == self.TASK_TYPE_SCHEDULE: ScheduleCeleryTask.objects.start_task( @@ -1366,7 +1420,11 @@ def watch(cls, name, kwargs, type, extra_kwargs): except Exception: logger.exception("celery task({}) watcher catch error.".format(name)) cls.objects.record( - name=name, kwargs=kwargs, type=type, extra_kwargs=extra_kwargs, exec_trace=traceback.format_exc(), + name=name, + kwargs=kwargs, + type=type, + extra_kwargs=extra_kwargs, + exec_trace=traceback.format_exc(), ) # raise specific exception to indicate that send fail task have been catched raise exceptions.CeleryFailedTaskCatchException(name) diff --git a/runtime/bamboo-pipeline/pipeline/engine/tasks.py b/runtime/bamboo-pipeline/pipeline/engine/tasks.py index 956350f8..93f54d37 100644 --- a/runtime/bamboo-pipeline/pipeline/engine/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/engine/tasks.py @@ -14,9 +14,8 @@ import logging import datetime from dateutil.relativedelta import relativedelta -from celery import task +from celery import current_app from celery.schedules import crontab -from celery.task import periodic_task from django.db import transaction, connection from pipeline.conf import default_settings @@ -34,11 +33,12 @@ History, ) from pipeline.models import PipelineInstance +from pipeline.contrib.celery_tools.periodic import periodic_task logger = logging.getLogger("celery") -@task(ignore_result=True) +@current_app.task(ignore_result=True) def process_unfreeze(process_id): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -48,7 +48,7 @@ def process_unfreeze(process_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def start(process_id): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -67,7 +67,7 @@ def start(process_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def dispatch(child_id): process = PipelineProcess.objects.get(id=child_id) if not process.is_alive: @@ -77,7 +77,7 @@ def dispatch(child_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def process_wake_up(process_id, current_node_id=None, call_from_child=False): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -103,7 +103,7 @@ def process_wake_up(process_id, current_node_id=None, call_from_child=False): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def wake_up(process_id): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -114,7 +114,7 @@ def wake_up(process_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def batch_wake_up(process_id_list, pipeline_id): # success_when_unchanged to deal with parallel gateway subprocess wake up action_result = Status.objects.transit(pipeline_id, to_state=states.RUNNING, is_pipeline=True, unchanged_pass=True) @@ -126,7 +126,7 @@ def batch_wake_up(process_id_list, pipeline_id): ProcessCeleryTask.objects.bind(process_id, task_id) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def wake_from_schedule(process_id, service_act_id): process = PipelineProcess.objects.get(id=process_id) process.wake_up() @@ -136,12 +136,12 @@ def wake_from_schedule(process_id, service_act_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def service_schedule(process_id, schedule_id, data_id=None): schedule.schedule(process_id, schedule_id, data_id) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def node_timeout_check(node_id, version, root_pipeline_id): NodeCeleryTask.objects.destroy(node_id) state = Status.objects.state_for(node_id, version=version, may_not_exist=True) diff --git a/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py b/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py index 76bc4c6f..77c160da 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py @@ -14,9 +14,9 @@ import logging from typing import Optional -from celery import task -from celery.decorators import periodic_task +from celery import current_app from celery.schedules import crontab +from pipeline.contrib.celery_tools.periodic import periodic_task from django.conf import settings from bamboo_engine import metrics @@ -47,7 +47,7 @@ def _observe_message_delay(metric: metrics.Histogram, headers: dict): logger.exception("%s observe err" % metric) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def execute( process_id: int, node_id: str, @@ -86,7 +86,7 @@ def execute( ) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def schedule( process_id: int, node_id: str, diff --git a/runtime/bamboo-pipeline/pipeline/log/tasks.py b/runtime/bamboo-pipeline/pipeline/log/tasks.py index 394a1d5b..a5aef126 100644 --- a/runtime/bamboo-pipeline/pipeline/log/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/log/tasks.py @@ -13,11 +13,11 @@ import logging -from celery.decorators import periodic_task from celery.schedules import crontab from django.conf import settings from pipeline.log.models import LogEntry +from pipeline.contrib.celery_tools.periodic import periodic_task logger = logging.getLogger(__name__) diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/__init__.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery4.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery4.py new file mode 100644 index 00000000..f34dc676 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery4.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from __future__ import absolute_import, unicode_literals + +from optparse import make_option as Option + +from celery.bin import celery + +from pipeline.management.commands.app import app +from pipeline.management.commands.base import CeleryCommand + +base = celery.CeleryCommand(app=app) + + +class Command(CeleryCommand): + """The celery command.""" + + help = "celery commands, see celery help" + options = ( + Option("-A", "--app", default=None), + Option("--broker", default=None), + Option("--loader", default=None), + Option("--config", default=None), + Option("--workdir", default=None, dest="working_directory"), + Option("--result-backend", default=None), + Option("--no-color", "-C", action="store_true", default=None), + Option("--quiet", "-q", action="store_true"), + ) + if base.get_options() is not None: + options = options + CeleryCommand.options + base.get_options() + + def run_from_argv(self, argv): + argv = self.handle_default_options(argv) + base.execute_from_commandline(["{0[0]} {0[1]}".format(argv)] + argv[2:],) diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery5.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery5.py new file mode 100644 index 00000000..49339fb1 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery5.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from celery.bin.celery import celery +from click.exceptions import Exit +from django.core.management import BaseCommand + + +class Command(BaseCommand): + """The celery command.""" + + help = "celery commands, see celery help" + + def run_from_argv(self, argv): + try: + celery.main(args=argv[2:], standalone_mode=False) + except Exit as e: + print(f"celery command error: {e}") + return e.exit_code diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat4.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat4.py new file mode 100644 index 00000000..2ef3aef4 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat4.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from __future__ import absolute_import, unicode_literals + +from optparse import make_option as Option + +from celery.bin import beat + +from pipeline.management.commands.app import app +from pipeline.management.commands.base import CeleryCommand + +beat = beat.beat(app=app) + + +class Command(CeleryCommand): + """Run the celery periodic task scheduler.""" + + help = 'Old alias to the "celery beat" command.' + options = ( + Option("-A", "--app", default=None), + Option("--broker", default=None), + Option("--loader", default=None), + Option("--config", default=None), + Option("--workdir", default=None, dest="working_directory"), + Option("--result-backend", default=None), + Option("--no-color", "-C", action="store_true", default=None), + Option("--quiet", "-q", action="store_true"), + ) + if beat.get_options() is not None: + options = options + CeleryCommand.options + beat.get_options() + + def handle(self, *args, **options): + beat.run(*args, **options) diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat5.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat5.py new file mode 100644 index 00000000..88ec7be5 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat5.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from celery.bin.celery import celery +from click.exceptions import Exit +from django.core.management import BaseCommand + + +class Command(BaseCommand): + """The celery command.""" + + help = "celery commands, see celery help" + + def run_from_argv(self, argv): + try: + celery.main(args=["beat", *argv[2:]], standalone_mode=False) + except Exit as e: + print(f"celery command error: {e}") + return e.exit_code diff --git a/runtime/bamboo-pipeline/pipeline/management/commands/celery.py b/runtime/bamboo-pipeline/pipeline/management/commands/celery.py index f34dc676..4dd3bc9a 100644 --- a/runtime/bamboo-pipeline/pipeline/management/commands/celery.py +++ b/runtime/bamboo-pipeline/pipeline/management/commands/celery.py @@ -10,35 +10,14 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from __future__ import absolute_import, unicode_literals +import celery -from optparse import make_option as Option +major_version = celery.VERSION.major -from celery.bin import celery -from pipeline.management.commands.app import app -from pipeline.management.commands.base import CeleryCommand +if major_version < 5: + from ..celery_version_adapter.celery4 import Command +else: + from ..celery_version_adapter.celery5 import Command -base = celery.CeleryCommand(app=app) - - -class Command(CeleryCommand): - """The celery command.""" - - help = "celery commands, see celery help" - options = ( - Option("-A", "--app", default=None), - Option("--broker", default=None), - Option("--loader", default=None), - Option("--config", default=None), - Option("--workdir", default=None, dest="working_directory"), - Option("--result-backend", default=None), - Option("--no-color", "-C", action="store_true", default=None), - Option("--quiet", "-q", action="store_true"), - ) - if base.get_options() is not None: - options = options + CeleryCommand.options + base.get_options() - - def run_from_argv(self, argv): - argv = self.handle_default_options(argv) - base.execute_from_commandline(["{0[0]} {0[1]}".format(argv)] + argv[2:],) +Command = Command diff --git a/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py b/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py index 73e0cfe7..56ddd711 100644 --- a/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py +++ b/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py @@ -1,36 +1,24 @@ +# -*- coding: utf-8 -*- """ - -Start the celery clock service from the Django management command. - +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. """ -from __future__ import absolute_import, unicode_literals - -from optparse import make_option as Option - -from celery.bin import beat - -from pipeline.management.commands.app import app -from pipeline.management.commands.base import CeleryCommand -beat = beat.beat(app=app) +import celery +major_version = celery.VERSION.major -class Command(CeleryCommand): - """Run the celery periodic task scheduler.""" - help = 'Old alias to the "celery beat" command.' - options = ( - Option("-A", "--app", default=None), - Option("--broker", default=None), - Option("--loader", default=None), - Option("--config", default=None), - Option("--workdir", default=None, dest="working_directory"), - Option("--result-backend", default=None), - Option("--no-color", "-C", action="store_true", default=None), - Option("--quiet", "-q", action="store_true"), - ) - if beat.get_options() is not None: - options = options + CeleryCommand.options + beat.get_options() +if major_version < 5: + from ..celery_version_adapter.celerybeat4 import Command +else: + from ..celery_version_adapter.celerybeat5 import Command - def handle(self, *args, **options): - beat.run(*args, **options) +Command = Command \ No newline at end of file diff --git a/runtime/bamboo-pipeline/pipeline/utils/celery_tools.py b/runtime/bamboo-pipeline/pipeline/utils/celery_tools.py new file mode 100644 index 00000000..acc0a65e --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/utils/celery_tools.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +from celery import Task, current_app +from celery.schedules import maybe_schedule + + +class PipelinePeriodicTask(Task): + """A task that adds itself to the :setting:`beat_schedule` setting.""" + + abstract = True + ignore_result = True + relative = False + options = None + compat = True + + def __init__(self): + if not hasattr(self, 'run_every'): + raise NotImplementedError( + 'Periodic tasks must have a run_every attribute') + self.run_every = maybe_schedule(self.run_every, self.relative) + super(PipelinePeriodicTask, self).__init__() + + @classmethod + def on_bound(cls, app): + app.conf.beat_schedule[cls.name] = { + 'task': cls.name, + 'schedule': cls.run_every, + 'args': (), + 'kwargs': {}, + 'options': cls.options or {}, + 'relative': cls.relative, + } + + +def periodic_task(*args, **options): + """Deprecated decorator, please use :setting:`beat_schedule`.""" + return current_app.task(**dict({'base': PipelinePeriodicTask}, **options)) + diff --git a/runtime/bamboo-pipeline/poetry.lock b/runtime/bamboo-pipeline/poetry.lock index e90b49e3..d77735dd 100644 --- a/runtime/bamboo-pipeline/poetry.lock +++ b/runtime/bamboo-pipeline/poetry.lock @@ -17,6 +17,20 @@ category = "dev" optional = false python-versions = "*" +[[package]] +name = "asgiref" +version = "3.4.1" +description = "ASGI specs, helper code, and adapters" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +typing-extensions = {version = "*", markers = "python_version < \"3.8\""} + +[package.extras] +tests = ["mypy (>=0.800)", "pytest", "pytest-asyncio"] + [[package]] name = "atomicwrites" version = "1.4.0" @@ -34,10 +48,10 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" [package.extras] -dev = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "zope.interface", "furo", "sphinx", "sphinx-notfound-page", "pre-commit"] -docs = ["furo", "sphinx", "zope.interface", "sphinx-notfound-page"] -tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "zope.interface"] -tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins"] +dev = ["coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "sphinx", "sphinx-notfound-page", "zope.interface"] +docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"] +tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "zope.interface"] +tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"] [[package]] name = "bamboo-engine" @@ -133,12 +147,12 @@ vine = "1.3.0" [package.extras] arangodb = ["pyArango (>=1.3.2)"] auth = ["cryptography"] -azureblockblob = ["azure-storage (==0.36.0)", "azure-common (==1.1.5)", "azure-storage-common (==1.1.0)"] +azureblockblob = ["azure-common (==1.1.5)", "azure-storage (==0.36.0)", "azure-storage-common (==1.1.0)"] brotli = ["brotli (>=1.0.0)", "brotlipy (>=0.7.0)"] cassandra = ["cassandra-driver (<3.21.0)"] consul = ["python-consul"] cosmosdbsql = ["pydocumentdb (==2.3.2)"] -couchbase = ["couchbase-cffi (<3.0.0)", "couchbase (<3.0.0)"] +couchbase = ["couchbase (<3.0.0)", "couchbase-cffi (<3.0.0)"] couchdb = ["pycouchdb"] django = ["Django (>=1.11)"] dynamodb = ["boto3 (>=1.9.178)"] @@ -227,18 +241,19 @@ python-versions = ">=3.6, <3.7" [[package]] name = "django" -version = "2.2.24" +version = "3.2.25" description = "A high-level Python Web framework that encourages rapid development and clean, pragmatic design." category = "main" optional = false -python-versions = ">=3.5" +python-versions = ">=3.6" [package.dependencies] +asgiref = ">=3.3.2,<4" pytz = "*" sqlparse = ">=0.2.2" [package.extras] -argon2 = ["argon2-cffi (>=16.1.0)"] +argon2 = ["argon2-cffi (>=19.1.0)"] bcrypt = ["bcrypt"] [[package]] @@ -257,7 +272,7 @@ python-crontab = ">=2.3.4" [[package]] name = "django-timezone-field" -version = "4.2.1" +version = "4.2.3" description = "A Django app providing database and form fields for pytz timezone objects." category = "main" optional = false @@ -314,9 +329,9 @@ typing-extensions = {version = ">=3.6.4", markers = "python_version < \"3.8\""} zipp = ">=0.5" [package.extras] -docs = ["sphinx", "jaraco.packaging (>=8.2)", "rst.linker (>=1.9)"] +docs = ["jaraco.packaging (>=8.2)", "rst.linker (>=1.9)", "sphinx"] perf = ["ipython"] -testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "packaging", "pep517", "pyfakefs", "flufl.flake8", "pytest-perf (>=0.9.2)", "pytest-black (>=0.3.7)", "pytest-mypy", "importlib-resources (>=1.3)"] +testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pep517", "pyfakefs", "pytest (>=4.6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-flake8", "pytest-mypy", "pytest-perf (>=0.9.2)"] [[package]] name = "iniconfig" @@ -405,7 +420,7 @@ optional = false python-versions = ">=3.6" [package.extras] -build = ["twine", "wheel", "blurb"] +build = ["blurb", "twine", "wheel"] docs = ["sphinx"] test = ["pytest (<5.4)", "pytest-cov"] @@ -456,8 +471,8 @@ python-versions = ">=3.6" importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} [package.extras] -testing = ["pytest-benchmark", "pytest"] -dev = ["tox", "pre-commit"] +dev = ["pre-commit", "tox"] +testing = ["pytest", "pytest-benchmark"] [[package]] name = "prometheus-client" @@ -573,7 +588,7 @@ name = "redis-py-cluster" version = "2.1.0" description = "Library for communicating with Redis Clusters. Built on top of redis-py lib" category = "main" -optional = false +optional = true python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4" [package.dependencies] @@ -592,7 +607,7 @@ python-versions = "*" [[package]] name = "requests" -version = "2.26.0" +version = "2.27.1" description = "Python HTTP for Humans." category = "main" optional = false @@ -672,7 +687,7 @@ python-versions = "*" [[package]] name = "ujson" -version = "4.1.0" +version = "4.3.0" description = "Ultra fast JSON encoder and decoder for Python" category = "main" optional = false @@ -688,7 +703,7 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" [package.extras] brotli = ["brotlipy (>=0.6.0)"] -secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"] +secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)"] socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [[package]] @@ -708,8 +723,8 @@ optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" [package.extras] +dev = ["coverage", "pallets-sphinx-themes", "pytest", "pytest-timeout", "sphinx", "sphinx-issues", "tox"] watchdog = ["watchdog"] -dev = ["sphinx-issues", "pallets-sphinx-themes", "sphinx", "tox", "coverage", "pytest-timeout", "pytest"] [[package]] name = "zipp" @@ -720,17 +735,24 @@ optional = false python-versions = ">=3.6" [package.extras] -docs = ["sphinx", "jaraco.packaging (>=8.2)", "rst.linker (>=1.9)"] -testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-cov", "pytest-enabler (>=1.0.1)", "jaraco.itertools", "func-timeout", "pytest-black (>=0.3.7)", "pytest-mypy"] +docs = ["jaraco.packaging (>=8.2)", "rst.linker (>=1.9)", "sphinx"] +testing = ["func-timeout", "jaraco.itertools", "pytest (>=4.6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.0.1)", "pytest-flake8", "pytest-mypy"] + +[extras] +redis3 = ["redis-py-cluster"] [metadata] lock-version = "1.1" python-versions = ">= 3.6, < 3.8" -content-hash = "f721517ea7b3b399c454d39eb72f5c99517aaf5d2cf8a875734b4e0cc2107f84" +content-hash = "fc74dba2b47e2cbc204e1209f60c09f7bcda542f415eddb8dd7883557e47a8e5" [metadata.files] amqp = [] appdirs = [] +asgiref = [ + {file = "asgiref-3.4.1-py3-none-any.whl", hash = "sha256:ffc141aa908e6f175673e7b1b3b7af4fdb0ecb738fc5c8b88f69f055c2415214"}, + {file = "asgiref-3.4.1.tar.gz", hash = "sha256:4ef1ab46b484e3c706329cedeff284a5d40824200638503f5768edb6de7d58e9"}, +] atomicwrites = [] attrs = [] bamboo-engine = [] @@ -745,7 +767,10 @@ click = [] colorama = [] coverage = [] dataclasses = [] -django = [] +django = [ + {file = "Django-3.2.25-py3-none-any.whl", hash = "sha256:a52ea7fcf280b16f7b739cec38fa6d3f8953a5456986944c3ca97e79882b4e38"}, + {file = "Django-3.2.25.tar.gz", hash = "sha256:7ca38a78654aee72378594d63e51636c04b8e28574f5505dff630895b5472777"}, +] django-celery-beat = [] django-timezone-field = [] factory-boy = [] diff --git a/runtime/bamboo-pipeline/pyproject.toml b/runtime/bamboo-pipeline/pyproject.toml index 16fbc202..6f7d4e62 100644 --- a/runtime/bamboo-pipeline/pyproject.toml +++ b/runtime/bamboo-pipeline/pyproject.toml @@ -10,23 +10,27 @@ packages = [ [tool.poetry.dependencies] python = ">= 3.6, < 3.8" -celery = "^4.4.0" -Django = "^2.2 || ^3.0" -requests = "^2.22.0" -django-celery-beat = "^2.1.0" +celery = ">=4.4.0, <6" +Django = ">=2.2, <5" +requests = "^2.22" +django-celery-beat = "^2.1" Mako = "^1.1.4" pytz = "2019.3" bamboo-engine = "2.6.1" jsonschema = "^2.5.1" -ujson = "4.1.*" -pyparsing = "^2.2.0" -redis = "^3.2.0" -redis-py-cluster = "2.1.0" -django-timezone-field = "^4.0" -Werkzeug = "^1.0.0" -prometheus-client = "^0.9.0" +ujson = "^4" +pyparsing = "^2.2" +redis = ">=3.2.0, <6" +django-timezone-field = "^4" +Werkzeug = "^1" +prometheus-client = "^0.9" boto3 = "^1.9.130" +redis-py-cluster = { version = "2.1.0", optional = true } + +[tool.poetry.extras] +redis3 = ["redis-py-cluster"] + [tool.poetry.dev-dependencies] pytest = "^6.2.2" black = "^20.8b1" diff --git a/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py b/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py index 7939a9a0..749178f5 100644 --- a/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py +++ b/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py @@ -14,7 +14,7 @@ import json from typing import Optional -from celery import task +from celery import current_app from bamboo_engine.eri import ExecuteInterruptPoint, ScheduleInterruptPoint from bamboo_engine.engine import Engine @@ -28,7 +28,7 @@ from .runtime import ChoasBambooDjangoRuntime -@task(ignore_result=True) +@current_app.task(ignore_result=True) def chaos_execute( process_id: int, node_id: str, @@ -70,7 +70,7 @@ def chaos_execute( ) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def chaos_schedule( process_id: int, node_id: str,