From b931f9ae327bb11c274bbb7939b5fdcf542c2cd2 Mon Sep 17 00:00:00 2001 From: ymao2 Date: Tue, 25 Jul 2023 09:12:33 +0100 Subject: [PATCH 01/84] ANPL-0000 adding feature flag for message broker --- settings.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/settings.yaml b/settings.yaml index eebe69f6b..f7d02949f 100644 --- a/settings.yaml +++ b/settings.yaml @@ -11,6 +11,11 @@ enabled_features: _HOST_dev: false _HOST_prod: false _HOST_alpha: false + message_broker: + _DEFAULT: false + _HOST_dev: true + _HOST_prod: false + _HOST_alpha: false AWS_SERVICE_URL: From 4dd0f7ff0e1066a7ba9caa08f3ecabc239382aa6 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Tue, 25 Jul 2023 18:14:56 +0100 Subject: [PATCH 02/84] WIP celery --- controlpanel/__init__.py | 3 +++ controlpanel/api/__init__.py | 2 -- controlpanel/api/models/users3bucket.py | 25 +++++++++++++----- controlpanel/api/tasks.py | 13 ++++++++++ controlpanel/celery.py | 31 +++++++++++++++++++++++ controlpanel/frontend/consumers.py | 1 + controlpanel/frontend/tasks.py | 10 ++++++++ controlpanel/frontend/views/apps_mng.py | 4 ++- controlpanel/frontend/views/datasource.py | 3 +++ controlpanel/settings/common.py | 17 +++++++++++++ 10 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 controlpanel/api/tasks.py create mode 100644 controlpanel/celery.py create mode 100644 controlpanel/frontend/tasks.py diff --git a/controlpanel/__init__.py b/controlpanel/__init__.py index e69de29bb..fb989c4e6 100644 --- a/controlpanel/__init__.py +++ b/controlpanel/__init__.py @@ -0,0 +1,3 @@ +from .celery import app as celery_app + +__all__ = ('celery_app',) diff --git a/controlpanel/api/__init__.py b/controlpanel/api/__init__.py index 16b5858ba..e69de29bb 100644 --- a/controlpanel/api/__init__.py +++ b/controlpanel/api/__init__.py @@ -1,2 +0,0 @@ -# First-party/Local -import controlpanel.api.signals diff --git a/controlpanel/api/models/users3bucket.py b/controlpanel/api/models/users3bucket.py index 1e54f2f28..fc94ed4d2 100644 --- a/controlpanel/api/models/users3bucket.py +++ b/controlpanel/api/models/users3bucket.py @@ -44,12 +44,25 @@ def grant_bucket_access(self): bucket_arn=self.s3bucket.arn, access_level=self.access_level, ) - - cluster.User(self.user).grant_bucket_access( - self.s3bucket.arn, - self.access_level, - self.resources, - ) + from ..tasks import grant_user_bucket_access_task + # cluster.User(self.user).grant_bucket_access( + # self.s3bucket.arn, + # self.access_level, + # self.resources, + # ) + from controlpanel import celery_app + celery_app.send_task("controlpanel.api.tasks.grant_user_bucket_access_task", kwargs={ + "user_pk": self.user.pk, + "arn": self.s3bucket.arn, + "access_level": self.access_level, + "resources": self.resources, + }) + # grant_user_bucket_access_task.delay( + # self.user.pk, + # self.s3bucket.arn, + # self.access_level, + # self.resources, + # ) def revoke_bucket_access(self): cluster.User(self.user).revoke_bucket_access(self.s3bucket.arn) diff --git a/controlpanel/api/tasks.py b/controlpanel/api/tasks.py new file mode 100644 index 000000000..2a8b150bc --- /dev/null +++ b/controlpanel/api/tasks.py @@ -0,0 +1,13 @@ +from celery import shared_task +from controlpanel.api import cluster +from controlpanel.api.models import User + + +@shared_task() +def grant_user_bucket_access_task(user_pk, arn, access_level, resources): + user = User.objects.get(pk=user_pk) + cluster.User(user).grant_bucket_access( + arn, + access_level, + resources, + ) diff --git a/controlpanel/celery.py b/controlpanel/celery.py new file mode 100644 index 000000000..835afdb05 --- /dev/null +++ b/controlpanel/celery.py @@ -0,0 +1,31 @@ +import os + +from celery import Celery + +import dotenv +import django + + +app = Celery('controlpanels') + +dotenv.load_dotenv() + +# TODO update +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'controlpanel.settings.development') + +from controlpanel.utils import load_app_conf_from_file +load_app_conf_from_file() + +django.setup() +# Load task modules from all registered Django app configs. +app.config_from_object('django.conf:settings', namespace='CELERY') + +# Auto-discover tasks in all installed applications +app.autodiscover_tasks(["controlpanel.frontend", "controlpanel.api"]) + +# define in settings? +# app.conf.broker_url = 'redis://localhost:6379/0' + +@app.task(bind=True, ignore_result=True) +def debug_task(self): + print(f'Request: {self.request!r}') diff --git a/controlpanel/frontend/consumers.py b/controlpanel/frontend/consumers.py index a3f3895e7..b22c15e09 100644 --- a/controlpanel/frontend/consumers.py +++ b/controlpanel/frontend/consumers.py @@ -282,6 +282,7 @@ def update_home_status(home_directory, status): ) +# this used currently to run a task def start_background_task(task, message): async_to_sync(channel_layer.send)( "background_tasks", diff --git a/controlpanel/frontend/tasks.py b/controlpanel/frontend/tasks.py new file mode 100644 index 000000000..fc517321f --- /dev/null +++ b/controlpanel/frontend/tasks.py @@ -0,0 +1,10 @@ +from celery import shared_task +from time import sleep + + +@shared_task +def sleep_then_print(): + print("Going to sleep...") + sleep(3) + print("Awake!") + return "Done" diff --git a/controlpanel/frontend/views/apps_mng.py b/controlpanel/frontend/views/apps_mng.py index 93dbad396..22debf3a9 100644 --- a/controlpanel/frontend/views/apps_mng.py +++ b/controlpanel/frontend/views/apps_mng.py @@ -131,4 +131,6 @@ def _add_app_to_users(self, app, user): ) def _create_app_role(self, app): - cluster.App(app).create_iam_role() + # cluster.App(app).create_iam_role() + from controlpanel.frontend.tasks import create_app_role_task + create_app_role_task.delay() diff --git a/controlpanel/frontend/views/datasource.py b/controlpanel/frontend/views/datasource.py index 91ee55813..c3236de11 100644 --- a/controlpanel/frontend/views/datasource.py +++ b/controlpanel/frontend/views/datasource.py @@ -95,6 +95,9 @@ def get_context_data(self, *args, **kwargs): context["other_datasources"] = other_datasources context["other_datasources_admins"] = other_datasources_admins + + from ..tasks import sleep_then_print + sleep_then_print.delay() return context diff --git a/controlpanel/settings/common.py b/controlpanel/settings/common.py index 3256ba852..abadef24e 100644 --- a/controlpanel/settings/common.py +++ b/controlpanel/settings/common.py @@ -538,3 +538,20 @@ # volume name for the EFS directory for user homes EFS_VOLUME = os.environ.get("EFS_VOLUME") MAX_RELEASE_NAME_LEN = 53 + + +CELERY_BROKER_URL = "sqs://" +CELERY_CREATE_MISSING_QUEUES = False +CELERY_BROKER_TRANSPORT_OPTIONS = { + "region": "eu-west-1", + "queue_name_prefix": "django-", + "predefined_queues": { + "django-celery": { + "url": "https://sqs.eu-west-1.amazonaws.com/525294151996/django-celery" + }, + "mjc-test-1": { + "url": "https://sqs.eu-west-1.amazonaws.com/525294151996/mjc-test-1" + } + } +} +# CELERY_BROKER_URL = "redis://localhost:6379/0" From b03b4e8b05310c407638d15b8edc14aa0db76a46 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Wed, 26 Jul 2023 16:15:28 +0100 Subject: [PATCH 03/84] Fix celery settings to work with django --- controlpanel/celery.py | 20 ++++++-------------- controlpanel/settings/common.py | 1 - 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/controlpanel/celery.py b/controlpanel/celery.py index 835afdb05..2c4592074 100644 --- a/controlpanel/celery.py +++ b/controlpanel/celery.py @@ -1,30 +1,22 @@ import os - -from celery import Celery - import dotenv -import django +from celery import Celery -app = Celery('controlpanels') +from controlpanel.utils import load_app_conf_from_file dotenv.load_dotenv() - -# TODO update -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'controlpanel.settings.development') - -from controlpanel.utils import load_app_conf_from_file +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "controlpanel.settings") load_app_conf_from_file() -django.setup() +app = Celery('controlpanel') + # Load task modules from all registered Django app configs. app.config_from_object('django.conf:settings', namespace='CELERY') # Auto-discover tasks in all installed applications -app.autodiscover_tasks(["controlpanel.frontend", "controlpanel.api"]) +app.autodiscover_tasks() -# define in settings? -# app.conf.broker_url = 'redis://localhost:6379/0' @app.task(bind=True, ignore_result=True) def debug_task(self): diff --git a/controlpanel/settings/common.py b/controlpanel/settings/common.py index abadef24e..840806704 100644 --- a/controlpanel/settings/common.py +++ b/controlpanel/settings/common.py @@ -554,4 +554,3 @@ } } } -# CELERY_BROKER_URL = "redis://localhost:6379/0" From 64d291b59696f3564b4514d77a76ea2263a71e58 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Wed, 26 Jul 2023 16:47:09 +0100 Subject: [PATCH 04/84] Add celery with sqs to dependencies --- requirements.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index f03e4f583..ffbfe9ecf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,8 @@ asgiref==3.6.0 auth0-python==3.13.0 beautifulsoup4==4.12.2 -boto3==1.20.13 +boto3==1.26.143 +celery[sqs]==5.3.1 channels==4.0.0 channels-redis==4.0.0 daphne==4.0.0 @@ -33,5 +34,5 @@ pyyaml==6.0 rules==3.3 sentry-sdk==1.14.0 slackclient==2.8.1 -urllib3==1.26.11 +urllib3==1.26.16 uvicorn[standard]==0.20.0 From 3956f57ef327dcd5d1e8bd603d8865baa81c5264 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Wed, 26 Jul 2023 17:29:32 +0100 Subject: [PATCH 05/84] Fix app_old_url to use . in webapp-detail.html (#1187) --- controlpanel/frontend/jinja2/webapp-detail.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlpanel/frontend/jinja2/webapp-detail.html b/controlpanel/frontend/jinja2/webapp-detail.html index 7b7d0c3af..d4062721e 100644 --- a/controlpanel/frontend/jinja2/webapp-detail.html +++ b/controlpanel/frontend/jinja2/webapp-detail.html @@ -12,7 +12,7 @@ {% set page_name = "webapps" %} {% set page_title = app.name %} {% set app_domain = settings.APP_DOMAIN %} -{% set app_old_url = "https://" + app.slug + "/" + settings.APP_DOMAIN_BEFORE_MIGRATION %} +{% set app_old_url = "https://" + app.slug + "." + settings.APP_DOMAIN_BEFORE_MIGRATION %} {% set feature_enabled = settings.features.app_migration.enabled %} {% set app_admins_html %} From f998011c8b0b96033a7ca9f6b4d40c585f35d181 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Thu, 27 Jul 2023 14:26:12 +0100 Subject: [PATCH 06/84] Remove calls to tasks --- controlpanel/api/models/users3bucket.py | 24 +++++------------------ controlpanel/frontend/views/apps_mng.py | 4 +--- controlpanel/frontend/views/datasource.py | 2 -- 3 files changed, 6 insertions(+), 24 deletions(-) diff --git a/controlpanel/api/models/users3bucket.py b/controlpanel/api/models/users3bucket.py index fc94ed4d2..b7f585b98 100644 --- a/controlpanel/api/models/users3bucket.py +++ b/controlpanel/api/models/users3bucket.py @@ -44,25 +44,11 @@ def grant_bucket_access(self): bucket_arn=self.s3bucket.arn, access_level=self.access_level, ) - from ..tasks import grant_user_bucket_access_task - # cluster.User(self.user).grant_bucket_access( - # self.s3bucket.arn, - # self.access_level, - # self.resources, - # ) - from controlpanel import celery_app - celery_app.send_task("controlpanel.api.tasks.grant_user_bucket_access_task", kwargs={ - "user_pk": self.user.pk, - "arn": self.s3bucket.arn, - "access_level": self.access_level, - "resources": self.resources, - }) - # grant_user_bucket_access_task.delay( - # self.user.pk, - # self.s3bucket.arn, - # self.access_level, - # self.resources, - # ) + cluster.User(self.user).grant_bucket_access( + self.s3bucket.arn, + self.access_level, + self.resources, + ) def revoke_bucket_access(self): cluster.User(self.user).revoke_bucket_access(self.s3bucket.arn) diff --git a/controlpanel/frontend/views/apps_mng.py b/controlpanel/frontend/views/apps_mng.py index 22debf3a9..93dbad396 100644 --- a/controlpanel/frontend/views/apps_mng.py +++ b/controlpanel/frontend/views/apps_mng.py @@ -131,6 +131,4 @@ def _add_app_to_users(self, app, user): ) def _create_app_role(self, app): - # cluster.App(app).create_iam_role() - from controlpanel.frontend.tasks import create_app_role_task - create_app_role_task.delay() + cluster.App(app).create_iam_role() diff --git a/controlpanel/frontend/views/datasource.py b/controlpanel/frontend/views/datasource.py index c3236de11..9844a7811 100644 --- a/controlpanel/frontend/views/datasource.py +++ b/controlpanel/frontend/views/datasource.py @@ -96,8 +96,6 @@ def get_context_data(self, *args, **kwargs): context["other_datasources"] = other_datasources context["other_datasources_admins"] = other_datasources_admins - from ..tasks import sleep_then_print - sleep_then_print.delay() return context From 7068236de244c581ee0ba5cb7ec28f351d2cdb6a Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Thu, 27 Jul 2023 17:02:31 +0100 Subject: [PATCH 07/84] Add method to time functions --- controlpanel/frontend/views/apps_mng.py | 6 ++++++ controlpanel/utils.py | 13 +++++++++++++ 2 files changed, 19 insertions(+) diff --git a/controlpanel/frontend/views/apps_mng.py b/controlpanel/frontend/views/apps_mng.py index 93dbad396..07a5383a1 100644 --- a/controlpanel/frontend/views/apps_mng.py +++ b/controlpanel/frontend/views/apps_mng.py @@ -12,6 +12,7 @@ UserS3Bucket, ) from controlpanel.frontend.consumers import start_background_task +from controlpanel.utils import time_it class AppManager: @@ -86,10 +87,12 @@ def trigger_tasks_for_ip_range_update(self, user, pre_update_obj, updated_obj): def _create_app(self, **kwargs): return App.objects.create(**kwargs) + @time_it def _add_ip_allowlists(self, app, envs, ip_allowlists): for env in envs: AppIPAllowList.objects.update_records(app, env, ip_allowlists) + @time_it def _create_auth_settigs( self, app, envs, github_api_token, disable_authentication, connections ): @@ -100,6 +103,7 @@ def _create_auth_settigs( connections=connections, ) + @time_it def _create_or_link_datasource(self, app, user, bucket_data): if bucket_data.get("new_datasource_name"): bucket = S3Bucket.objects.create( @@ -123,6 +127,7 @@ def _create_or_link_datasource(self, app, user, bucket_data): access_level="readonly", ) + @time_it def _add_app_to_users(self, app, user): UserApp.objects.create( app=app, @@ -130,5 +135,6 @@ def _add_app_to_users(self, app, user): is_admin=True, ) + @time_it def _create_app_role(self, app): cluster.App(app).create_iam_role() diff --git a/controlpanel/utils.py b/controlpanel/utils.py index b2a2e1bd8..d6ed0ad9b 100644 --- a/controlpanel/utils.py +++ b/controlpanel/utils.py @@ -1,4 +1,5 @@ # Standard library +import time from base64 import b64encode import os import re @@ -216,3 +217,15 @@ def encrypt_data_by_using_public_key(public_key: str, data: str) -> str: sealed_box = public.SealedBox(public_key) encrypted = sealed_box.encrypt(data.encode("utf-8")) return b64encode(encrypted).decode("utf-8") + + +def time_it(func): + def wrapper(*args, **kwargs): + start = time.time() + result = func(*args, **kwargs) + end = time.time() + elapsed = end - start + print(f"{func.__name__} took {elapsed:.5f} secs") + return result + + return wrapper From b692b398e570c2c43ac05decace438591ec7d0f9 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Thu, 27 Jul 2023 17:03:04 +0100 Subject: [PATCH 08/84] Task to create AWS role for an app --- controlpanel/api/tasks.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/controlpanel/api/tasks.py b/controlpanel/api/tasks.py index 2a8b150bc..e20f1e58f 100644 --- a/controlpanel/api/tasks.py +++ b/controlpanel/api/tasks.py @@ -1,13 +1,27 @@ from celery import shared_task + from controlpanel.api import cluster -from controlpanel.api.models import User +from controlpanel.api.models import App + + +@shared_task(bind=True, max_retries=3) +def create_app_aws_role(self, app_pk): + try: + app = App.objects.get(pk=app_pk) + except App.DoesNotExist as exc: + # the app does not exist, so add message back to the queue. Is this the best way? + raise self.retry(exc=exc, countdown=5) + + # this will catch when a role already exists + cluster.App(app).create_iam_role() + +# @shared_task +# def grant_user_bucket_access_task(user_pk, arn, access_level, resources): +# user = User.objects.get(pk=user_pk) +# cluster.User(user).grant_bucket_access( +# arn, +# access_level, +# resources, +# ) -@shared_task() -def grant_user_bucket_access_task(user_pk, arn, access_level, resources): - user = User.objects.get(pk=user_pk) - cluster.User(user).grant_bucket_access( - arn, - access_level, - resources, - ) From dad2ec318f80a3fd9f7216e445a1028fa60f3c33 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Fri, 28 Jul 2023 13:00:18 +0100 Subject: [PATCH 09/84] Add tests for create_app_aws_role task --- tests/api/tasks/__init__.py | 0 tests/api/tasks/test_create_app_aws_role.py | 29 +++++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 tests/api/tasks/__init__.py create mode 100644 tests/api/tasks/test_create_app_aws_role.py diff --git a/tests/api/tasks/__init__.py b/tests/api/tasks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/api/tasks/test_create_app_aws_role.py b/tests/api/tasks/test_create_app_aws_role.py new file mode 100644 index 000000000..c7095afa6 --- /dev/null +++ b/tests/api/tasks/test_create_app_aws_role.py @@ -0,0 +1,29 @@ +import pytest +from unittest.mock import patch +from celery.exceptions import Retry +from model_mommy import mommy + +from controlpanel.api.tasks import create_app_aws_role + + +@pytest.mark.django_db +@patch("controlpanel.api.tasks.create_app_aws_role.retry") +@patch("controlpanel.api.tasks.cluster") +def test_retry_when_app_does_not_exist(cluster, retry): + retry.side_effect = Retry + + with pytest.raises(Retry): + create_app_aws_role(app_pk=1) + + cluster.App.assert_not_called() + + +@pytest.mark.django_db +@patch("controlpanel.api.tasks.cluster") +def test_app_exists(cluster): + app = mommy.make("api.App") + + create_app_aws_role(app_pk=app.pk) + + cluster.App.assert_called_once_with(app) + cluster.App.return_value.create_iam_role.assert_called_once() From ba4d260b9c0dd410d75de8008b71bf0013c41609 Mon Sep 17 00:00:00 2001 From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com> Date: Mon, 31 Jul 2023 13:50:47 +0100 Subject: [PATCH 10/84] Add create_app_auth_settings task --- controlpanel/api/tasks.py | 30 +++++++++++++++++-- .../tasks/test_create_app_auth_settings.py | 0 2 files changed, 28 insertions(+), 2 deletions(-) create mode 100644 tests/api/tasks/test_create_app_auth_settings.py diff --git a/controlpanel/api/tasks.py b/controlpanel/api/tasks.py index e20f1e58f..2589870d4 100644 --- a/controlpanel/api/tasks.py +++ b/controlpanel/api/tasks.py @@ -1,21 +1,47 @@ from celery import shared_task from controlpanel.api import cluster -from controlpanel.api.models import App +from controlpanel.api.models import App, User +# TODO use acks_late=True, acks_on_failure_or_timeout=False? +# this should mean that for any failure it is will be added back to the queue +# after SQS timeout visibility has passed. Could result in tasks being run forever +# unless SQS defines a limit? @shared_task(bind=True, max_retries=3) def create_app_aws_role(self, app_pk): try: app = App.objects.get(pk=app_pk) except App.DoesNotExist as exc: # the app does not exist, so add message back to the queue. Is this the best way? - raise self.retry(exc=exc, countdown=5) + raise self.retry(exc=exc, countdown=5, throw=False) # this will catch when a role already exists cluster.App(app).create_iam_role() +@shared_task(bind=True) +def create_app_auth_settings(self, app_pk, user_pk, envs, disable_authentication, connections): + try: + app = App.objects.get(pk=app_pk) + except App.DoesNotExist as exc: + # the app does not exist, so add message back to the queue. Is this the best way? + raise self.retry(exc=exc, countdown=5) + + user = User.objects.get(pk=user_pk) + if not user.github_api_token: + # user doesnt have a githubapi token, log this? + return + + for env in envs: + cluster.App(app, user.github_api_token).create_auth_settings( + env_name=env, + disable_authentication=disable_authentication, + connections=connections, + ) + + + # @shared_task # def grant_user_bucket_access_task(user_pk, arn, access_level, resources): # user = User.objects.get(pk=user_pk) diff --git a/tests/api/tasks/test_create_app_auth_settings.py b/tests/api/tasks/test_create_app_auth_settings.py new file mode 100644 index 000000000..e69de29bb From 64a408af6fa70203b27289499e52fd14be75d648 Mon Sep 17 00:00:00 2001 From: ymao2 Date: Mon, 31 Jul 2023 14:05:56 +0100 Subject: [PATCH 11/84] ANPL-0000 Add parts for sending message --- controlpanel/__init__.py | 5 + controlpanel/api/aws.py | 123 ++++++++++++- controlpanel/api/aws_auth.py | 13 +- controlpanel/api/message_broker.py | 167 ++++++++++++++++++ controlpanel/api/migrations/0030_task.py | 44 +++++ controlpanel/api/models/__init__.py | 1 + controlpanel/api/models/app.py | 35 ++++ controlpanel/api/models/apps3bucket.py | 12 +- controlpanel/api/models/s3bucket.py | 5 +- controlpanel/api/models/task.py | 50 ++++++ controlpanel/api/models/users3bucket.py | 13 +- controlpanel/api/tasks/__init__.py | 9 + controlpanel/api/tasks/app.py | 34 ++++ controlpanel/api/tasks/s3bucket.py | 66 +++++++ controlpanel/api/tasks/task_base.py | 74 ++++++++ controlpanel/api/tasks/user.py | 0 controlpanel/api/urls.py | 5 + controlpanel/api/views/__init__.py | 1 + controlpanel/api/views/tasks.py | 33 ++++ controlpanel/api/views/tool_deployments.py | 2 +- controlpanel/celery.py | 104 +++++++++++ controlpanel/frontend/consumers.py | 28 +-- .../frontend/jinja2/includes/app-list.html | 2 +- .../frontend/jinja2/includes/task-list.html | 41 +++++ .../frontend/jinja2/webapp-detail.html | 8 +- .../static/javascripts/modules/confirm.js | 8 +- .../static/javascripts/modules/track-tasks.js | 58 ++++++ controlpanel/frontend/views/app.py | 21 ++- controlpanel/frontend/views/apps_mng.py | 19 +- controlpanel/frontend/views/reset.py | 2 +- controlpanel/frontend/views/tool.py | 2 +- controlpanel/settings/common.py | 22 +++ controlpanel/utils.py | 24 +++ controlpanel/worker.py | 0 requirements.txt | 2 + 35 files changed, 972 insertions(+), 61 deletions(-) create mode 100644 controlpanel/api/message_broker.py create mode 100644 controlpanel/api/migrations/0030_task.py create mode 100644 controlpanel/api/models/task.py create mode 100644 controlpanel/api/tasks/__init__.py create mode 100644 controlpanel/api/tasks/app.py create mode 100644 controlpanel/api/tasks/s3bucket.py create mode 100644 controlpanel/api/tasks/task_base.py create mode 100644 controlpanel/api/tasks/user.py create mode 100644 controlpanel/api/views/tasks.py create mode 100644 controlpanel/celery.py create mode 100644 controlpanel/frontend/jinja2/includes/task-list.html create mode 100644 controlpanel/frontend/static/javascripts/modules/track-tasks.js create mode 100644 controlpanel/worker.py diff --git a/controlpanel/__init__.py b/controlpanel/__init__.py index e69de29bb..15d7c5085 100644 --- a/controlpanel/__init__.py +++ b/controlpanel/__init__.py @@ -0,0 +1,5 @@ +# This will make sure the app is always imported when +# Django starts so that shared_task will use this app. +from .celery import app as celery_app + +__all__ = ('celery_app',) diff --git a/controlpanel/api/aws.py b/controlpanel/api/aws.py index 2b85e0188..839c74769 100644 --- a/controlpanel/api/aws.py +++ b/controlpanel/api/aws.py @@ -307,16 +307,19 @@ def save_policy_document(self, policy_document): class AWSService: - def __init__(self, assume_role_name=None, profile_name=None): + def __init__(self, assume_role_name=None, profile_name=None, region_name=None): self.assume_role_name = assume_role_name self.profile_name = profile_name + self.region_name = region_name or settings.AWS_DEFAULT_REGION self.aws_sessions = AWSCredentialSessionSet() @property def boto3_session(self): return self.aws_sessions.get_session( - assume_role_name=self.assume_role_name, profile_name=self.profile_name + assume_role_name=self.assume_role_name, + profile_name=self.profile_name, + region_name=self.region_name ) @@ -808,3 +811,119 @@ def get_secret_if_found(self, secret_name: str) -> Optional[dict]: if self.has_existed(secret_name): return self.get_secret(secret_name) return {} + + +class AWSSQS(AWSService): + + def __init__(self, assume_role_name=None, profile_name=None): + super(AWSSQS, self).__init__( + assume_role_name=assume_role_name, + profile_name=profile_name, + region_name=settings.SQS_REGION + ) + self.client = self.boto3_session.resource("sqs") + + # def __init__(self, assume_role_name=None, profile_name=None): + # self.assume_role_name = assume_role_name + # self.profile_name = profile_name + # + # self.aws_sessions = AWSCredentialSessionSet(region_name=settings.SQS_REGION) + # self.client = self.boto3_session.resource("sqs") + # + def get_queue(self, name): + """ + Gets an SQS queue by name. + + :param name: The name that was used to create the queue. + :return: A Queue object. + """ + try: + queue = self.client.get_queue_by_name(QueueName=name) + log.info("Got queue '%s' with URL=%s", name, queue.url) + except botocore.exceptions.ClientError as error: + log.exception("Couldn't get queue named %s.", name) + raise error + else: + return queue + + def send_message(self, queue_name, message_body, message_attributes=None): + """ + Send a message to an Amazon SQS queue. + + :param queue_name: The queue that receives the message. + :param message_body: The body text of the message. + :param message_attributes: Custom attributes of the message. These are key-value + pairs that can be whatever you want. + :return: The response from SQS that contains the assigned message ID. + """ + if not message_attributes: + message_attributes = {} + + queue = self.get_queue(name=queue_name) + try: + response = queue.send_message( + MessageBody=message_body, + MessageAttributes=message_attributes + ) + except botocore.exceptions.ClientError as error: + log.exception("Send message failed: %s", message_body) + raise error + else: + return response + + def receive_messages(self, queue_name, max_number, wait_time): + """ + Receive a batch of messages in a single request from an SQS queue. + + :param queue_name: The queue from which to receive messages. + :param max_number: The maximum number of messages to receive. The actual number + of messages received might be less. + :param wait_time: The maximum time to wait (in seconds) before returning. When + this number is greater than zero, long polling is used. This + can result in reduced costs and fewer false empty responses. + :return: The list of Message objects received. These each contain the body + of the message and metadata and custom attributes. + """ + queue = self.get_queue(name=queue_name) + try: + messages = queue.receive_messages( + MessageAttributeNames=['All'], + MaxNumberOfMessages=max_number, + WaitTimeSeconds=wait_time + ) + for msg in messages: + log.info("Received message: %s: %s", msg.message_id, msg.body) + except botocore.exceptions.ClientError as error: + log.exception("Couldn't receive messages from queue: %s", queue_name) + raise error + else: + return messages + + def delete_messages(self, queue, messages): + """ + Delete a batch of messages from a queue in a single request. + + :param queue: The queue from which to delete the messages. + :param messages: The list of messages to delete. + :return: The response from SQS that contains the list of successful and failed + message deletions. + """ + try: + entries = [{ + 'Id': str(ind), + 'ReceiptHandle': msg.receipt_handle + } for ind, msg in enumerate(messages)] + response = queue.delete_messages(Entries=entries) + if 'Successful' in response: + for msg_meta in response['Successful']: + log.info("Deleted %s", messages[int(msg_meta['Id'])].receipt_handle) + if 'Failed' in response: + for msg_meta in response['Failed']: + log.warning( + "Could not delete %s", + messages[int(msg_meta['Id'])].receipt_handle + ) + except botocore.exceptions.ClientError: + log.exception("Couldn't delete messages from queue %s", queue) + else: + return response diff --git a/controlpanel/api/aws_auth.py b/controlpanel/api/aws_auth.py index 97b8b0d76..ee0881c18 100644 --- a/controlpanel/api/aws_auth.py +++ b/controlpanel/api/aws_auth.py @@ -113,8 +113,13 @@ class AWSCredentialSessionSet(metaclass=SingletonMeta): def __init__(self): self.credential_sessions = {} - def get_session(self, profile_name: str = None, assume_role_name: str = None): - credential_session_key = "{}_{}".format(profile_name, assume_role_name) + def get_session( + self, + profile_name: str = None, + assume_role_name: str = None, + region_name: str = None): + credential_session_key = "{}_{}_{}".format( + profile_name, assume_role_name, region_name) if credential_session_key not in self.credential_sessions: log.warn( "(for monitoring purpose) Initialising the session ({})".format( @@ -122,6 +127,8 @@ def get_session(self, profile_name: str = None, assume_role_name: str = None): ) ) self.credential_sessions[credential_session_key] = BotoSession( - profile_name=profile_name, assume_role_name=assume_role_name + region_name=region_name, + profile_name=profile_name, + assume_role_name=assume_role_name ).refreshable_session() return self.credential_sessions[credential_session_key] diff --git a/controlpanel/api/message_broker.py b/controlpanel/api/message_broker.py new file mode 100644 index 000000000..f61efc296 --- /dev/null +++ b/controlpanel/api/message_broker.py @@ -0,0 +1,167 @@ +import os +import socket +import uuid +import json +import base64 +from collections.abc import Mapping + +from controlpanel.api.aws import AWSSQS + + +class MessageProtocolError(Exception): + pass + + +class MessageProtocol: + + def __init__(self, task_id: str, task_name: str, queue_name: str, args=None, kwargs=None): + self.task_id = task_id + self.task_name = task_name + self.queue_name = queue_name + self.args = args or () + self.kwargs = kwargs or {} + self. _validate() + + def _validate(self): + if not isinstance(self.args, (list, tuple)): + raise TypeError('task args must be a list or tuple') + if not isinstance(self.kwargs, Mapping): + raise TypeError('task keyword arguments must be a mapping') + try: + uuid.UUID(str(self.task_id)) + except ValueError: + raise TypeError('task id arguments must be a uuid') + if not self.task_name: + raise TypeError('task name arguments must not be blank') + if not self.queue_name: + raise TypeError('queue name arguments must not be blank') + + def prepare_message(self): + raise NotImplementedError("Note implemented") + + +class SimpleBase64: + """Base64 codec.""" + + @staticmethod + def str_to_bytes(s): + """Convert str to bytes.""" + if isinstance(s, str): + return s.encode() + return s + + @staticmethod + def bytes_to_str(s): + """Convert bytes to str.""" + if isinstance(s, bytes): + return s.decode(errors='replace') + return s + + def encode(self, s): + return self.bytes_to_str(base64.b64encode(self.str_to_bytes(s))) + + def decode(self, s): + return base64.b64decode(self.str_to_bytes(s)) + + +class CeleryTaskMessage(MessageProtocol): + + #: Default body encoding. + DEFAULT_BODY_ENCODING = 'base64' + DEFAULT_CONTENT_TYPE = "application/json" + DEFAULT_CONTENT_ENCODING = "utf-8" + DEFAULT_PRIORITY = 0 + + DEFAULT_FUNC_SIGNATURE = None + + codecs = {'base64': SimpleBase64()} + + @staticmethod + def _anon_nodename(): + """Return the nodename for this process (not a worker). + + This is used for e.g. the origin task message field. + """ + return f"{os.getpid()}@{socket.gethostname()}" + + def _init_message(self): + headers = { + 'lang': 'py', + 'task': self.task_name, + 'id': self.task_id, + 'group': None, + 'root_id': self.task_id, + 'parent_id': None, + 'origin': self._anon_nodename() + } + + message = dict( + headers=headers, + properties={ + 'correlation_id': self.task_id, + }, + body=( + self.args, self.kwargs, { + 'callbacks': self.DEFAULT_FUNC_SIGNATURE, + 'errbacks': self.DEFAULT_FUNC_SIGNATURE, + 'chain': self.DEFAULT_FUNC_SIGNATURE, + 'chord': self.DEFAULT_FUNC_SIGNATURE, + }, + ) + ) + return message + + def prepare_message(self): + message = self._init_message() + properties = message.get("properties") or {} + info = properties.setdefault('delivery_info', {}) + info['priority'] = self.DEFAULT_PRIORITY or 0 + message['content-encoding'] = self.DEFAULT_CONTENT_ENCODING + message['content-type'] = self.DEFAULT_CONTENT_TYPE + message['body'], body_encoding = self.encode_body( + json.dumps(message['body']), self.DEFAULT_BODY_ENCODING + ) + props = message['properties'] + props.update( + body_encoding=body_encoding, + delivery_tag=str(uuid.uuid4()), + ) + props['delivery_info'].update( + routing_key=self.queue_name, + ) + encoded_message, _ = self.encode_body(json.dumps(message), self.DEFAULT_BODY_ENCODING) + return encoded_message + + def encode_body(self, body, encoding=None): + return self.codecs.get(encoding).encode(body), encoding + + +class MessageBrokerClient: + DEFAULT_MESSAGE_PROTOCOL = "celery" + + MESSAGE_PROTOCOL_MAP_TABLE = { + "celery": CeleryTaskMessage + } + + def __init__(self, message_protocol=None): + self.message_protocol = message_protocol or self.DEFAULT_MESSAGE_PROTOCOL + self.client = self._get_client() + + def _get_client(self): + return AWSSQS() + + def send_message(self, task_id, task_name, queue_name, args): + message_class = self.MESSAGE_PROTOCOL_MAP_TABLE.get(self.message_protocol) + if not message_class: + raise MessageProtocolError("Not support!") + + message = message_class( + task_id=task_id, + task_name=task_name, + queue_name=queue_name, + args=tuple(args) + ).prepare_message() + self._get_client().send_message(queue_name=queue_name, message_body=message) + return message + + diff --git a/controlpanel/api/migrations/0030_task.py b/controlpanel/api/migrations/0030_task.py new file mode 100644 index 000000000..d231a8915 --- /dev/null +++ b/controlpanel/api/migrations/0030_task.py @@ -0,0 +1,44 @@ +# Generated by Django 4.2.1 on 2023-07-30 14:52 + +from django.db import migrations, models +import django_extensions.db.fields + + +class Migration(migrations.Migration): + dependencies = [ + ("api", "0029_remove_tool_target_infrastructure"), + ] + + operations = [ + migrations.CreateModel( + name="Task", + fields=[ + ( + "created", + django_extensions.db.fields.CreationDateTimeField( + auto_now_add=True, verbose_name="created" + ), + ), + ( + "modified", + django_extensions.db.fields.ModificationDateTimeField( + auto_now=True, verbose_name="modified" + ), + ), + ("entity_class", models.CharField(max_length=20)), + ("entity_description", models.CharField(max_length=128)), + ("entity_id", models.BigIntegerField()), + ("user_id", models.CharField(max_length=128)), + ("task_id", models.UUIDField(primary_key=True, serialize=False)), + ("task_name", models.CharField(max_length=60)), + ("task_description", models.CharField(max_length=128)), + ("queue_name", models.CharField(max_length=60)), + ("completed", models.BooleanField(default=False)), + ("message_body", models.CharField(max_length=4000)), + ], + options={ + "db_table": "control_panel_api_task", + "ordering": ("entity_class", "entity_id"), + }, + ), + ] diff --git a/controlpanel/api/models/__init__.py b/controlpanel/api/models/__init__.py index 139a18257..e9383894b 100644 --- a/controlpanel/api/models/__init__.py +++ b/controlpanel/api/models/__init__.py @@ -16,3 +16,4 @@ from controlpanel.api.models.userapp import UserApp from controlpanel.api.models.users3bucket import UserS3Bucket from controlpanel.api.models.app_ip_allowlist import AppIPAllowList +from controlpanel.api.models.task import Task diff --git a/controlpanel/api/models/app.py b/controlpanel/api/models/app.py index 98036d78d..1432febf3 100644 --- a/controlpanel/api/models/app.py +++ b/controlpanel/api/models/app.py @@ -13,6 +13,7 @@ from controlpanel.api import auth0, cluster from controlpanel.api.models import IPAllowlist from controlpanel.utils import github_repository_name, s3_slugify, webapp_release_name +from controlpanel.api import tasks class App(TimeStampedModel): @@ -34,6 +35,11 @@ class App(TimeStampedModel): # are not within the fields which will be searched frequently app_conf = models.JSONField(null=True) + # Non database field just for passing extra parameters + disable_authentication = False + connections = {} + user = None + DEFAULT_AUTH_CATEGORY = "primary" KEY_WORD_FOR_AUTH_SETTINGS = "auth_settings" @@ -43,6 +49,13 @@ class Meta: db_table = "control_panel_api_app" ordering = ("name",) + def __init__(self, *args, **kwargs): + """Overwrite this constructor to pass some non-field parameter""" + self.disable_authentication = kwargs.pop("disable_authentication", False) + self.connections = kwargs.pop("connections", {}) + self.user = kwargs.pop("user", None) + super().__init__(*args, **kwargs) + def __repr__(self): return f"" @@ -303,3 +316,25 @@ class DeleteCustomerError(Exception): App.AddCustomerError = AddCustomerError App.DeleteCustomerError = DeleteCustomerError + + +from django.db.models.signals import post_save, post_delete +from django.dispatch import receiver + + +@receiver(post_save, sender=App) +def trigger_app_create_related_messages(sender, instance, created, **kwargs): + if created: + tasks.AppCreateRole(instance, instance.user).create_task() + tasks.AppCreateAuth(instance, instance.user, extra_data=dict( + disable_authentication=instance.disable_authentication, + connections=instance.connections + )).create_task() + + +@receiver(post_delete, sender=App) +def remove_app_related_tasks(sender, instance, **kwargs): + from controlpanel.api.models import Task + related_app_tasks = Task.objects.filter(entity_class="App", entity_id=instance.id) + for task in related_app_tasks: + task.delete() diff --git a/controlpanel/api/models/apps3bucket.py b/controlpanel/api/models/apps3bucket.py index 81740de15..fe60b0bf7 100644 --- a/controlpanel/api/models/apps3bucket.py +++ b/controlpanel/api/models/apps3bucket.py @@ -4,6 +4,7 @@ # First-party/Local from controlpanel.api import cluster from controlpanel.api.models.access_to_s3bucket import AccessToS3Bucket +from controlpanel.api import tasks class AppS3Bucket(AccessToS3Bucket): @@ -33,11 +34,12 @@ def __repr__(self): return f"" def grant_bucket_access(self): - cluster.App(self.app).grant_bucket_access( - self.s3bucket.arn, - self.access_level, - self.resources, - ) + tasks.S3BucketGrantToApp(self).create_task() + # cluster.App(self.app).grant_bucket_access( + # self.s3bucket.arn, + # self.access_level, + # self.resources, + # ) def revoke_bucket_access(self): cluster.App(self.app).revoke_bucket_access(self.s3bucket.arn) diff --git a/controlpanel/api/models/s3bucket.py b/controlpanel/api/models/s3bucket.py index b1dcb4776..4f8c7717e 100644 --- a/controlpanel/api/models/s3bucket.py +++ b/controlpanel/api/models/s3bucket.py @@ -13,6 +13,7 @@ from controlpanel.api import cluster, validators from controlpanel.api.models.apps3bucket import AppS3Bucket from controlpanel.api.models.users3bucket import UserS3Bucket +from controlpanel.api import tasks def s3bucket_console_url(name): @@ -133,7 +134,9 @@ def save(self, *args, **kwargs): if is_create: bucket_owner = kwargs.pop("bucket_owner", self.bucket_owner) - self.cluster.create(bucket_owner) + + # self.cluster.create(bucket_owner) + tasks.S3BucketCreate(self, self.created_by).create_task() # XXX created_by is always set if model is saved by the API view if self.created_by: diff --git a/controlpanel/api/models/task.py b/controlpanel/api/models/task.py new file mode 100644 index 000000000..815d48be1 --- /dev/null +++ b/controlpanel/api/models/task.py @@ -0,0 +1,50 @@ +import json + +# Third-party +from django.db import models +from django_extensions.db.models import TimeStampedModel + +from controlpanel.utils import send_sse + + +class Task(TimeStampedModel): + """ + Use the task table to track the basic status of task fired from the app + """ + + entity_class = models.CharField(max_length=20) + entity_description = models.CharField(max_length=128) + entity_id = models.BigIntegerField() + user_id = models.CharField(max_length=128) + task_id = models.UUIDField(primary_key=True) + task_name = models.CharField(max_length=60) + task_description = models.CharField(max_length=128) + queue_name = models.CharField(max_length=60) + completed = models.BooleanField(default=False) + message_body = models.CharField(max_length=4000) + + class Meta: + db_table = "control_panel_api_task" + ordering = ("entity_class", "entity_id") + + def __repr__(self): + return f"" + + def save(self, *args, **kwargs): + super().save(*args, **kwargs) + + if self.completed: + payload = { + "entity_name": self.entity_description, + "task_description": self.task_description, + "status": "COMPLETED", + } + send_sse( + self.user_id, + { + "event": "taskStatus", + "data": json.dumps(payload), + }, + ) + + diff --git a/controlpanel/api/models/users3bucket.py b/controlpanel/api/models/users3bucket.py index 1e54f2f28..63098e651 100644 --- a/controlpanel/api/models/users3bucket.py +++ b/controlpanel/api/models/users3bucket.py @@ -4,6 +4,7 @@ # First-party/Local from controlpanel.api import cluster from controlpanel.api.models.access_to_s3bucket import AccessToS3Bucket +from controlpanel.api import tasks class UserS3Bucket(AccessToS3Bucket): @@ -44,12 +45,12 @@ def grant_bucket_access(self): bucket_arn=self.s3bucket.arn, access_level=self.access_level, ) - - cluster.User(self.user).grant_bucket_access( - self.s3bucket.arn, - self.access_level, - self.resources, - ) + tasks.S3BucketGrantToUser(self, self.user).create_task() + # cluster.User(self.user).grant_bucket_access( + # self.s3bucket.arn, + # self.access_level, + # self.resources, + # ) def revoke_bucket_access(self): cluster.User(self.user).revoke_bucket_access(self.s3bucket.arn) diff --git a/controlpanel/api/tasks/__init__.py b/controlpanel/api/tasks/__init__.py new file mode 100644 index 000000000..18ba2e6ca --- /dev/null +++ b/controlpanel/api/tasks/__init__.py @@ -0,0 +1,9 @@ + +from controlpanel.api.tasks.app import AppCreateRole, AppCreateAuth +from controlpanel.api.tasks.s3bucket import ( + S3BucketCreate, + S3BucketGrantToUser, + S3BucketGrantToApp, + S3BucketRevokeFromUser, + S3BucketRevokeFromApp +) diff --git a/controlpanel/api/tasks/app.py b/controlpanel/api/tasks/app.py new file mode 100644 index 000000000..55b97e398 --- /dev/null +++ b/controlpanel/api/tasks/app.py @@ -0,0 +1,34 @@ +from controlpanel.api.tasks.task_base import TaskBase + + +class AppCreateRole(TaskBase): + ENTITY_CLASS = "App" + + @property + def task_name(self): + return "controlpanel.celery.create_app_role" + + @property + def task_description(self): + return "creating aws role" + + +class AppCreateAuth(AppCreateRole): + + # QUEUE_NAME = "auth_queue" + + @property + def task_name(self): + return "controlpanel.celery.create_auth_settings" + + def _get_args_list(self): + return [ + self.entity.id, + self.user.id, + self.extra_data.get('disable_authentication'), + self.extra_data.get('connections'), + ] + + @property + def task_description(self): + return "creating auth settings" diff --git a/controlpanel/api/tasks/s3bucket.py b/controlpanel/api/tasks/s3bucket.py new file mode 100644 index 000000000..fdca1232f --- /dev/null +++ b/controlpanel/api/tasks/s3bucket.py @@ -0,0 +1,66 @@ +from controlpanel.api.tasks.task_base import TaskBase + + +class S3BucketCreate(TaskBase): + ENTITY_CLASS = "S3Bucket" + QUEUE_NAME = "s3_queue" + + @property + def task_name(self): + return "controlpanel.celery.create_s3bucket" + + @property + def task_description(self): + return "creating s3 bucket" + + +class S3BucketGrantToUser(TaskBase): + ENTITY_CLASS = "UserS3Bucket" + QUEUE_NAME = "iam_queue" + + @property + def task_name(self): + return "controlpanel.celery.s3bucket_grant_to_user" + + @property + def task_description(self): + return "granting access to the user" + + +class S3BucketGrantToApp(TaskBase): + ENTITY_CLASS = "AppS3Bucket" + QUEUE_NAME = "iam_queue" + + @property + def task_name(self): + return "controlpanel.celery.s3bucket_grant_to_app" + + @property + def task_description(self): + return "granting access to the app" + + +class S3BucketRevokeFromUser(TaskBase): + ENTITY_CLASS = "UserS3Bucket" + QUEUE_NAME = "iam_queue" + + @property + def task_name(self): + return "controlpanel.celery.s3bucket_revoke_from_user" + + @property + def task_description(self): + return "revoking access from the user" + + +class S3BucketRevokeFromApp(TaskBase): + ENTITY_CLASS = "AppS3Bucket" + QUEUE_NAME = "iam_queue" + + @property + def task_name(self): + return "controlpanel.celery.s3bucket_from_app" + + @property + def task_description(self): + return "revoking access from the app" diff --git a/controlpanel/api/tasks/task_base.py b/controlpanel/api/tasks/task_base.py new file mode 100644 index 000000000..d6ea807d9 --- /dev/null +++ b/controlpanel/api/tasks/task_base.py @@ -0,0 +1,74 @@ +import uuid +from django.conf import settings + +from controlpanel.api.message_broker import MessageBrokerClient +from controlpanel.api.models.task import Task + + +class TaskError(Exception): + pass + + +class TaskBase: + + QUEUE_NAME = settings.DEFAULT_QUEUE + ENTITY_CLASS = None + + def __init__(self, entity, user=None, extra_data=None): + self._message_broker_client = None + self.entity = entity + self.user = user + self.extra_data = extra_data + self._validate() + + def _validate(self): + if not self.entity: + raise TaskError("Please provide entity instance") + if self.user and self.user.__class__.__name__ != "User": + raise TaskError("The instance has to be user class") + if self.entity.__class__.__name__ != self.ENTITY_CLASS: + raise TaskError(f"The instance has to be {self.ENTITY_CLASS} class") + + @property + def task_id(self): + return str(uuid.uuid4()) + + @property + def task_description(self): + raise NotImplementedError("Not implemented") + + @property + def task_name(self): + raise NotImplementedError("Not implemented") + + @property + def message_broker_client(self): + if self._message_broker_client is None: + self._message_broker_client = MessageBrokerClient() + return self._message_broker_client + + def _get_args_list(self): + args = [self.entity.id] + if self.user: + args.append(self.user.id) + return args + + def create_task(self): + task_id = self.task_id + message = self.message_broker_client.send_message( + task_id=task_id, + task_name=self.task_name, + queue_name=self.QUEUE_NAME, + args=self._get_args_list() + ) + Task.objects.create( + entity_class=self.ENTITY_CLASS, + entity_description=self.entity.name, + entity_id=self.entity.id, + user_id=self.user.auth0_id if self.user else 'None', + task_id=task_id, + task_description=self.task_description, + task_name=self.task_name, + queue_name=self.QUEUE_NAME, + message_body=message + ) diff --git a/controlpanel/api/tasks/user.py b/controlpanel/api/tasks/user.py new file mode 100644 index 000000000..e69de29bb diff --git a/controlpanel/api/urls.py b/controlpanel/api/urls.py index 89b551751..d8031516d 100644 --- a/controlpanel/api/urls.py +++ b/controlpanel/api/urls.py @@ -36,4 +36,9 @@ views.ToolDeploymentAPIView.as_view(), name="tool-deployments", ), + path( + "tasks//", + views.TaskAPIView.as_view(), + name="tasks", + ), ] diff --git a/controlpanel/api/views/__init__.py b/controlpanel/api/views/__init__.py index bffde87fb..cd76283df 100644 --- a/controlpanel/api/views/__init__.py +++ b/controlpanel/api/views/__init__.py @@ -16,3 +16,4 @@ from controlpanel.api.views.tools import ToolViewSet from controlpanel.api.views.apps import AppByNameViewSet from controlpanel.api.views.repos import RepoApi, RepoEnvironmentAPI +from controlpanel.api.views.tasks import TaskAPIView diff --git a/controlpanel/api/views/tasks.py b/controlpanel/api/views/tasks.py new file mode 100644 index 000000000..69cb75162 --- /dev/null +++ b/controlpanel/api/views/tasks.py @@ -0,0 +1,33 @@ +# Third-party +from rest_framework import status +from rest_framework.generics import GenericAPIView +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from controlpanel.api.models import Task +from controlpanel.api.message_broker import MessageBrokerClient + + +class TaskAPIView(GenericAPIView): + + http_method_names = ["post"] + permission_classes = (IsAuthenticated,) + + def _send_message(self, task_id): + task = Task.objects.filter(task_id=task_id).first() + if task: + message_client = MessageBrokerClient() + message_client.client.send_message( + queue_name=task.queue_name, + message_body=task.message_body + ) + + def post(self, request, *args, **kwargs): + task_id = self.kwargs["task_id"] + task_action = self.kwargs["action"] + task_action_function = getattr(self, f"_{task_action}", None) + if task_action_function and callable(task_action_function): + task_action_function(task_id) + return Response(status=status.HTTP_200_OK) + else: + return Response(status=status.HTTP_400_BAD_REQUEST) diff --git a/controlpanel/api/views/tool_deployments.py b/controlpanel/api/views/tool_deployments.py index 4444f71a8..6d81e1981 100644 --- a/controlpanel/api/views/tool_deployments.py +++ b/controlpanel/api/views/tool_deployments.py @@ -6,7 +6,7 @@ # First-party/Local from controlpanel.api import serializers -from controlpanel.frontend.consumers import start_background_task +from controlpanel.utils import start_background_task class ToolDeploymentAPIView(GenericAPIView): diff --git a/controlpanel/celery.py b/controlpanel/celery.py new file mode 100644 index 000000000..52f260df6 --- /dev/null +++ b/controlpanel/celery.py @@ -0,0 +1,104 @@ +import os +import dotenv +from celery import Celery +from time import time +import json + +# First-party/Local +from controlpanel.utils import load_app_conf_from_file +dotenv.load_dotenv() + + +# Set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'controlpanel.settings') +load_app_conf_from_file() + +app = Celery('controlpanel') + +# Using a string here means the worker doesn't have to serialize +# the configuration object to child processes. +# - namespace='CELERY' means all celery-related configuration keys +# should have a `CELERY_` prefix. +app.config_from_object('django.conf:settings') +# Load task modules from all registered Django apps. +app.autodiscover_tasks() + + +@app.task(bind=True, ignore_result=True) +def debug_task(self): + print(f'Request: {self.request!r}') + + +@app.task(bind=True) +def create_app_role(self, app_id, user_id): + from controlpanel.api.models.app import App + app = App.objects.get(pk=app_id) + app.description = json.dumps({"app_role": {"task_id": self.request.id, "time": str(int(time()))}}) + app.save() + from controlpanel.api.models.task import Task + task = Task.objects.filter(task_id=self.request.id).first() + if task: + task.completed = True + task.save() + + +@app.task(bind=True) +def create_auth_settings(self, app_id, user_id, auth_flag, connections): + from controlpanel.api.models.app import App + app = App.objects.get(pk=app_id) + app.description = json.dumps({"app_auth": {"task_id": self.request.id, "time": str(int(time()))}}) + app.save() + + from controlpanel.api.models.task import Task + task = Task.objects.filter(task_id=self.request.id).first() + if task: + task.completed = True + task.save() + + +@app.task(bind=True) +def create_s3bucket(self, s3bucket_id, user_id): + from controlpanel.api.models.s3bucket import S3Bucket + from controlpanel.api.models.task import Task + s3bucket = S3Bucket.objects.get(pk=s3bucket_id) + if s3bucket: + task = Task.objects.filter(task_id=self.request.id).first() + if task: + task.completed = True + task.save() + + +@app.task(bind=True) +def create_s3bucket(self, s3bucket_id): + from controlpanel.api.models.s3bucket import S3Bucket + from controlpanel.api.models.task import Task + s3bucket = S3Bucket.objects.get(pk=s3bucket_id) + if s3bucket: + task = Task.objects.filter(task_id=self.request.id).first() + if task: + task.completed = True + task.save() + + +@app.task(bind=True) +def s3bucket_grant_to_user(self, users3bucket_id, user_id): + from controlpanel.api.models.users3bucket import UserS3Bucket + from controlpanel.api.models.task import Task + s3bucket = UserS3Bucket.objects.get(pk=users3bucket_id) + if s3bucket: + task = Task.objects.filter(task_id=self.request.id).first() + if task: + task.completed = True + task.save() + + +@app.task(bind=True) +def s3bucket_grant_to_app(self, apps3bucket_id): + from controlpanel.api.models.apps3bucket import AppS3Bucket + from controlpanel.api.models.task import Task + s3bucket = AppS3Bucket.objects.get(pk=apps3bucket_id) + if s3bucket: + task = Task.objects.filter(task_id=self.request.id).first() + if task: + task.completed = True + task.save() diff --git a/controlpanel/frontend/consumers.py b/controlpanel/frontend/consumers.py index a3f3895e7..15612a034 100644 --- a/controlpanel/frontend/consumers.py +++ b/controlpanel/frontend/consumers.py @@ -9,9 +9,9 @@ # Third-party import structlog -from asgiref.sync import async_to_sync +# from asgiref.sync import async_to_sync from channels.consumer import SyncConsumer -from channels.layers import get_channel_layer +# from channels.layers import get_channel_layer from django.db import transaction # First-party/Local @@ -31,12 +31,12 @@ ToolDeployment, User, ) -from controlpanel.utils import PatchedAsyncHttpConsumer, sanitize_dns_label +from controlpanel.utils import (PatchedAsyncHttpConsumer, sanitize_dns_label, send_sse) WORKER_HEALTH_FILENAME = "/tmp/worker_health.txt" -channel_layer = get_channel_layer() +# channel_layer = get_channel_layer() log = structlog.getLogger(__name__) @@ -239,16 +239,6 @@ def workers_health(self, message): log.debug("Worker health ping task executed") -def send_sse(user_id, event): - """ - Tell the SSEConsumer to send an event to the specified user - """ - async_to_sync(channel_layer.group_send)( - sanitize_dns_label(user_id), - {"type": "sse.event", **event}, - ) - - def update_tool_status(tool_deployment, id_token, status): user = tool_deployment.user tool = tool_deployment.tool @@ -282,16 +272,6 @@ def update_home_status(home_directory, status): ) -def start_background_task(task, message): - async_to_sync(channel_layer.send)( - "background_tasks", - { - "type": task, - **message, - }, - ) - - def wait_for_deployment(tool_deployment, id_token): status = TOOL_DEPLOYING while status == TOOL_DEPLOYING: diff --git a/controlpanel/frontend/jinja2/includes/app-list.html b/controlpanel/frontend/jinja2/includes/app-list.html index 0954212d6..f5338ec20 100644 --- a/controlpanel/frontend/jinja2/includes/app-list.html +++ b/controlpanel/frontend/jinja2/includes/app-list.html @@ -7,7 +7,7 @@ {% macro app_list(apps, user) %} {%- set num_apps = apps|length %} - +
diff --git a/controlpanel/frontend/jinja2/includes/task-list.html b/controlpanel/frontend/jinja2/includes/task-list.html new file mode 100644 index 000000000..cb6cd7fc3 --- /dev/null +++ b/controlpanel/frontend/jinja2/includes/task-list.html @@ -0,0 +1,41 @@ +{% macro task_list(tasks, csrf_input) %} +{%- set num_tasks = tasks|length %} +
App name
+ + + + + + + + + {%- for task in tasks %} + + + + + + {% endfor %} + + + + + + +
Task descriptionCreate time + Action +
{{ task.task_description }}{{ task.created }} +
+ {{ csrf_input }} + +
+
+ {{ num_tasks }} task{% if num_tasks != 1 %}s{% endif %} +
+{% endmacro %} diff --git a/controlpanel/frontend/jinja2/webapp-detail.html b/controlpanel/frontend/jinja2/webapp-detail.html index 7b7d0c3af..461712ec5 100644 --- a/controlpanel/frontend/jinja2/webapp-detail.html +++ b/controlpanel/frontend/jinja2/webapp-detail.html @@ -8,12 +8,14 @@ {% from "user/macro.html" import user_name %} {% from "includes/yesno.html" import yes_no %} {% from "includes/app-deployment-settings.html" import app_deployment_settings %} +{% from "includes/task-list.html" import task_list %} {% set page_name = "webapps" %} {% set page_title = app.name %} {% set app_domain = settings.APP_DOMAIN %} {% set app_old_url = "https://" + app.slug + "/" + settings.APP_DOMAIN_BEFORE_MIGRATION %} {% set feature_enabled = settings.features.app_migration.enabled %} +{% set mq_feature_enabled = settings.features.message_broker.enabled %} {% set app_admins_html %} {% include "modals/app_admins.html" %} @@ -72,6 +74,10 @@

Migration status:

{% endif %} {% endif %} + {% if mq_feature_enabled and app_tasks%} +

Incomplete pp tasks

+ {{ task_list(app_tasks, csrf_input) }} + {% endif %} {% if feature_enabled and repo_access_error_msg %}
@@ -128,7 +134,7 @@

Deployment Pipeline

-
+
{% if feature_enabled %} {% if github_settings_access_error_msg %}