From adcbb49c60ac9d1a29b11a9c6f896b881a9c2049 Mon Sep 17 00:00:00 2001
From: Cristiano Singulani
Date: Wed, 28 Aug 2024 16:41:16 -0300
Subject: [PATCH] Added Combine Specz pipeline (#295)

* Added Combine Specz pipeline

* changed submodule branch

* changed submodule branch

* changed submodule branch
---
 .gitmodules                              |   2 +-
 backend/core/fixtures/initial_data.yaml  |  60 +++++++++-
 .../0039_release_indexing_column.py      |  19 +++
 backend/core/models/release.py           |   2 +
 backend/core/tasks.py                    |  67 ++++++++---
 backend/core/views/process.py            | 108 +++++++++---------
 backend/core/views/product.py            |  26 ++---
 backend/pzserver/celery.py               |  13 ++-
 orchestration/pipelines                  |   2 +-
 9 files changed, 208 insertions(+), 91 deletions(-)
 create mode 100644 backend/core/migrations/0039_release_indexing_column.py

diff --git a/.gitmodules b/.gitmodules
index aca4cd2..78aadd5 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "orchestration/pipelines"]
 	path = orchestration/pipelines
 	url = https://github.com/linea-it/pzserver_pipelines
-	branch = training_set_maker
+	branch = main
diff --git a/backend/core/fixtures/initial_data.yaml b/backend/core/fixtures/initial_data.yaml
index 369ba85..d20bd21 100644
--- a/backend/core/fixtures/initial_data.yaml
+++ b/backend/core/fixtures/initial_data.yaml
@@ -5,6 +5,7 @@
     display_name: Small Dataset
     description: Small dataset for example runs
     created_at: 2022-05-18 15:36:16.234786+00:00
+    indexing_column: id
 - model: core.producttype
   pk: 1
   fields:
@@ -45,6 +46,61 @@
     version: 0.0.1
     description: Training Set Maker pipeline
     created_at: 2022-05-18 15:36:59.830913+00:00
-    system_config: {"executor": {"local": {"n_workers": 2, "threads_per_worker": 2, "memory_limit": "1GiB"}, "linea-slurm": {"instance": {"cores": 54, "processes": 1, "memory": "123GiB", "queue": "cpu", "job_extra_directives": ["--propagate", "--time=2:00:00"]}, "adapt": {"maximum_jobs": 10}}}, "inputs": {"dataset": {"path": "/datasets/mini_dataset"}, "specz": [{"path": "/datasets/specz.parquet", "columns": {"ra": "ra", "dec": "dec"}}]}, "output_dir": "outputs", "param": {"suffixes": ["_specz", "_dataset"], "output_catalog_name": "tsm_cross_001", "radius_arcsec": 1, "n_neighbors": 1}}
+    system_config:
+      executor:
+        name: "local"
+        args:
+          n_workers: 2
+          threads_per_worker: 2
+          memory_limit: "1GiB"
+      inputs:
+        dataset:
+          path: "/datasets/data-example/mini_dataset"
+          columns:
+            id: "id"
+        specz:
+          - path: "/datasets/data-example/specz.parquet"
+            columns:
+              ra: "ra"
+              dec: "dec"
+      output_dir: "outputs"
+      param:
+        duplicate_criteria: "closest"
+        crossmatch:
+          output_catalog_name: "tsm_cross_001"
+          radius_arcsec: 1.0
+          n_neighbors: 1
     product_types_accepted: [1]
-    output_product_type: 2
\ No newline at end of file
+    output_product_type: 2
+- model: core.pipeline
+  pk: 2
+  fields:
+    name: combine_specz
+    display_name: Combine Specz Catalogs
+    version: 0.0.1
+    description: Combine Specz Catalogs pipeline
+    created_at: 2022-05-18 15:36:59.830913+00:00
+    system_config:
+      executor:
+        name: "local" # or "slurm"
+        args:
+          n_workers: 2
+          threads_per_worker: 2
+          memory_limit: "1GiB"
+      inputs:
+        specz:
+          - path: "/datasets/specz.parquet"
+            columns:
+              ra: "ra"
+              dec: "dec"
+              z: "z"
+          - path: "/datasets/specz.parquet"
+            columns:
+              ra: "ra"
+              dec: "dec"
+              z: "z"
+      output_dir: "outputs"
+      param:
+        debug: true
+    product_types_accepted: [1]
+    output_product_type: 1
diff --git a/backend/core/migrations/0039_release_indexing_column.py b/backend/core/migrations/0039_release_indexing_column.py
new file mode 100644
index 0000000..c3766b6
--- /dev/null
+++ b/backend/core/migrations/0039_release_indexing_column.py
@@ -0,0 +1,19 @@
+# Generated by Django 5.0.6 on 2024-08-26 20:01
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0038_alter_process_path_alter_release_name'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='release',
+            name='indexing_column',
+            field=models.CharField(default='id', max_length=255),
+            preserve_default=False,
+        ),
+    ]
diff --git a/backend/core/models/release.py b/backend/core/models/release.py
index a025874..62b3cc5 100644
--- a/backend/core/models/release.py
+++ b/backend/core/models/release.py
@@ -6,6 +6,8 @@ class Release(models.Model):
     display_name = models.CharField(max_length=255)
     description = models.TextField(null=True, blank=True)
     created_at = models.DateTimeField(auto_now_add=True)
+    indexing_column = models.CharField(max_length=255)
+
 
     def __str__(self):
         return f"{self.display_name}"
diff --git a/backend/core/tasks.py b/backend/core/tasks.py
index 311980b..bee303e 100644
--- a/backend/core/tasks.py
+++ b/backend/core/tasks.py
@@ -11,21 +11,56 @@
 from django.conf import settings
 from django.utils import dateparse, timezone
 
-logger = logging.getLogger('beat')
+logger = logging.getLogger("beat")
 
 maestro = Maestro(settings.ORCHEST_URL)
 
+@shared_task()
+def check_stopping():
+    logger.info("Checking processes stopping...")
+
+    procs_updated = []
+    monitoring_statuses = ["Stopping"]
+    procs_stopping = Process.objects.filter(status__in=monitoring_statuses)
+
+    for proc in procs_stopping:
+        logger.info(f"Consulting the {str(proc)} process status.")
+        proc_orches_id = proc.orchestration_process_id  # type: ignore
+
+        if not proc_orches_id:
+            message = f"Process {str(proc.pk)} without Orchestration ID."
+            logger.error(message)
+            proc.status = "Failed"
+            proc = update_dates(proc, {})
+            proc.save()
+            continue
+
+        proc_orchest = maestro.status(proc_orches_id)
+        proc_orchest_status = proc_orchest.get("status")  # type: ignore
+
+        logger.info(f"-> Process orchestration ID: {proc_orches_id}")
+        logger.info(f"-> Status: {proc_orchest_status}")
+
+        if not proc_orchest_status in monitoring_statuses:
+            proc.status = proc_orchest_status
+            proc.save()
+            logger.info(f"-> Process {str(proc)} updated.")
+            procs_updated.append(proc_orches_id)
+
+    return procs_updated
+
+
 @shared_task()
 def check_processes_finish():
     logger.info("Checking running processes...")
 
     procs_updated = []
-    active_statuses = ['Pending', 'Running']
+    active_statuses = ["Pending", "Running"]
     procs_running = Process.objects.filter(status__in=active_statuses)
 
     for proc in procs_running:
         logger.info(f"Consulting the {str(proc)} process status.")
-        proc_orches_id = proc.orchestration_process_id # type: ignore
+        proc_orches_id = proc.orchestration_process_id  # type: ignore
 
         if not proc_orches_id:
             message = f"Process {str(proc.pk)} without Orchestration ID."
@@ -36,13 +71,13 @@ def check_processes_finish():
             continue
 
         proc_orchest = maestro.status(proc_orches_id)
-        proc_orchest_status = proc_orchest.get('status') # type: ignore
+        proc_orchest_status = proc_orchest.get("status")  # type: ignore
 
         logger.info(f"-> Process orchestration ID: {proc_orches_id}")
         logger.info(f"-> Status: {proc_orchest_status}")
-        
-        if proc_orchest_status == 'Running' and not proc.status:
-            started_at = proc_orchest.get('started_at', str(proc.created_at))
+
+        if proc_orchest_status == "Running" and not proc.status:
+            started_at = proc_orchest.get("started_at", str(proc.created_at))
             proc.started_at = dateparse.parse_datetime(started_at)
             proc.save()
 
@@ -58,8 +93,8 @@ def update_dates(process, data):
-    started_at = data.get('started_at', str(process.created_at))
-    ended_at = data.get('ended_at', str(timezone.now()))
+    started_at = data.get("started_at", str(process.created_at))
+    ended_at = data.get("ended_at", str(timezone.now()))
     process.started_at = dateparse.parse_datetime(started_at)
     process.ended_at = dateparse.parse_datetime(ended_at)
     return process
@@ -82,18 +117,18 @@ def register_outputs(process_id):
 
     reg_product = RegistryProduct(process.upload.pk)
     process_file_dict = load_yaml(process_file)
-    outputs = process_file_dict.get('outputs', None)
+    outputs = process_file_dict.get("outputs", None)
 
     try:
         for output in outputs:
-            logger.debug('-> output: %s', output)
-            filepath = output.get('path')
-            rolename = output.get('role')
-            role_id = file_roles.get(rolename, file_roles.get('description'))
+            logger.debug("-> output: %s", output)
+            filepath = output.get("path")
+            rolename = output.get("role")
+            role_id = file_roles.get(rolename, file_roles.get("description"))
             upload_path = copy_upload(filepath, process.upload.path)
             reg_product.create_product_file(upload_path, role_id)
             process.upload.save()
-        
+
         reg_product.registry()
         process.upload.status = 1 # Published status
         process.upload.save()
@@ -108,7 +143,7 @@ def copy_upload(filepath, upload_dir):
     filepath = pathlib.Path(filepath)
     new_filepath = pathlib.Path(settings.MEDIA_ROOT, upload_dir, filepath.name)
-    logger.debug('new_filepath -> %s', str(new_filepath))
+    logger.debug("new_filepath -> %s", str(new_filepath))
     shutil.copyfile(str(filepath), str(new_filepath))
     return str(new_filepath)
 
diff --git a/backend/core/views/process.py b/backend/core/views/process.py
index ea6a663..ebe38cf 100644
--- a/backend/core/views/process.py
+++ b/backend/core/views/process.py
@@ -17,8 +17,7 @@
 
 
 class ProcessFilter(filters.FilterSet):
-    release__isnull = filters.BooleanFilter(
-        field_name="release", lookup_expr="isnull")
+    release__isnull = filters.BooleanFilter(field_name="release", lookup_expr="isnull")
     pipeline__or = filters.CharFilter(method="filter_pipeline")
     pipeline = filters.CharFilter(method="filter_pipeline")
     release_name__or = filters.CharFilter(method="filter_release")
@@ -35,23 +34,20 @@ class Meta:
 
     def filter_user(self, queryset, name, value):
         query = format_query_to_char(
-            name, value,
-            ["user__username", "user__first_name", "user__last_name"]
+            name, value, ["user__username", "user__first_name", "user__last_name"]
         )
         return queryset.filter(query)
 
     def filter_pipeline(self, queryset, name, value):
         query = format_query_to_char(
-            name, value,
-            ["pipeline__display_name", "pipeline__name"]
+            name, value, ["pipeline__display_name", "pipeline__name"]
         )
         return queryset.filter(query)
 
     def filter_release(self, queryset, name, value):
-        query = format_query_to_char(
name, value, ["release__display_name"]) + query = format_query_to_char(name, value, ["release__display_name"]) return queryset.filter(query) @@ -73,81 +69,84 @@ class ProcessViewSet(viewsets.ModelViewSet): ordering = ["-created_at"] def create(self, request): - print("USER: ", request.user) - print("PROCESS: ", request.data) - serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) try: + logger.debug(f"Create DB process: {request.data}") instance = self.perform_create(serializer) - - print("INSTANCE: ", instance) - process = Process.objects.get(pk=instance.pk) process.save() - except Exception as e: - content = {"error": str(e)} + logger.debug(f"Process ID {instance.pk} inserted.") + except Exception as err: + content = {"error": str(err)} + logger.error(err) return Response(content, status=status.HTTP_500_INTERNAL_SERVER_ERROR) try: - maestro = Maestro(url=settings.ORCHEST_URL) + orch_url = settings.ORCHEST_URL + logger.debug(f"Instantiating maestro: {orch_url}") + maestro = Maestro(url=orch_url) release_path = None + release_index_col = None + if process.release: - release_path = str(pathlib.Path( - settings.DATASETS_DIR, process.release.name - )) + release_path = str( + pathlib.Path(settings.DATASETS_DIR, process.release.name) + ) + release_index_col = process.release.indexing_column + logger.debug(f"Release: {process.release}") used_config = {} if process.used_config: used_config = process.used_config + + logger.debug(f"Config: {used_config}") _inputs = process.inputs.all() - print("INPUTS: ", _inputs) - inputfiles = [] for _input in _inputs: - print("INPUT: ", _input) main_file = _input.files.get(role=0) - filepath = pathlib.Path(settings.MEDIA_ROOT, _input.path, main_file.name) - print("FILEPATH: ", filepath) + filepath = pathlib.Path( + settings.MEDIA_ROOT, _input.path, main_file.name + ) - ra = self.__get_mapped_column(_input, 'RA') - dec = self.__get_mapped_column(_input, 'Dec') + ra = self.__get_mapped_column(_input, "RA") + dec = self.__get_mapped_column(_input, "Dec") + z = self.__get_mapped_column(_input, "z") - _file = {'path': str(filepath), 'columns': {'ra': ra, 'dec': dec }} + _file = {"path": str(filepath), "columns": {"ra": ra, "dec": dec, "z": z}} inputfiles.append(_file) - used_config['inputs'] = { - 'dataset': {'path': release_path}, - 'specz': inputfiles + used_config["inputs"] = { + "dataset": {"path": release_path, "columns": {"id": release_index_col}}, + "specz": inputfiles, } - print("USED CONFIG: ", used_config) + logger.debug(f"Inputs: {used_config.get('inputs')}") - orchestration_process = maestro.start( - pipeline=process.pipeline.name, - config=used_config + orch_process = maestro.start( + pipeline=process.pipeline.name, config=used_config ) + logger.debug(f"Process submitted: ORCH_ID {process.orchestration_process_id}") - print("ORCHESTRATION PROCESS: ", orchestration_process) - - process.orchestration_process_id = orchestration_process.get('id') - process.used_config = json.loads( - orchestration_process.get('used_config', None) - ) - process.path = orchestration_process.get('path_str') + process.orchestration_process_id = orch_process.get("id") + process.used_config = json.loads(orch_process.get("used_config", None)) + process.path = orch_process.get("path_str") process.save() + data = self.get_serializer(instance=process).data return Response(data, status=status.HTTP_201_CREATED) except Exception as e: - content = {"error": f"Orchestration API failure: {str(e)}"} + msg = f"Orchestration API failure: {str(e)}" + 
+            logger.error(msg)
+            content = {"error": msg}
             return Response(content, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
     def __get_mapped_column(self, product, column):
-        """ Get mapped column by column name
+        """Get mapped column by column name
 
         Args:
             product (Product): Product object
@@ -171,22 +170,22 @@ def perform_create(self, serializer):
 
         owned_by = self.request.user
 
-        #TODO: testar path pro release
-        
+        # TODO: testar path pro release
+
         upload = self.create_initial_upload(serializer, owned_by)
         return serializer.save(user=owned_by, upload=upload)
 
     def create_initial_upload(self, serializer, user):
         """_summary_"""
         data = serializer.initial_data
-        pipeline = Pipeline.objects.get(pk=data.get('pipeline'))
+        pipeline = Pipeline.objects.get(pk=data.get("pipeline"))
         upload_data = {
             "display_name": data.get("display_name"),
             "release": data.get("release", None),
             "pz_code": data.get("pz_code", None),
             "official_product": data.get("official_product", False),
             "description": data.get("description", None),
-            "product_type": pipeline.output_product_type.pk, # type: ignore
+            "product_type": pipeline.output_product_type.pk,  # type: ignore
         }
         product = CreateProduct(upload_data, user)
         check_prodtype = product.check_product_types()
@@ -202,9 +201,9 @@ def api_schema(self, request):
         meta = self.metadata_class()
         data = meta.determine_metadata(request, self)
         return Response(data)
-    
+
     @action(methods=["GET"], detail=True)
-    def stop(self, request):
+    def stop(self, request, *args, **kwargs):
         try:
             instance = self.get_object()
             _id = instance.pk
@@ -215,7 +214,8 @@ def stop(self, request):
                 raise ValueError(f"Process[{_id}]: orchestration process not found.")
 
             maestro = Maestro(url=settings.ORCHEST_URL)
-            orcdata = maestro.stop(orchestration_process_id)
+            maestro.stop(orchestration_process_id)
+            orcdata = maestro.status(orchestration_process_id)
             process.status = orcdata.get("status", "Stopping*")
             process.save()
             data = self.get_serializer(instance=process).data
@@ -223,12 +223,12 @@
         except Exception as err:
             data = {"error": str(err)}
             code_status = status.HTTP_500_INTERNAL_SERVER_ERROR
-        
+
         logger.info("Process[%s]: %s", str(process), data)
         return Response(data, status=code_status)
 
     def destroy(self, request, pk=None, *args, **kwargs):
         """Product can only be deleted by the OWNER or if the user
         has an admin profile.
""" @@ -236,4 +236,4 @@ def destroy(self, request, pk=None, *args, **kwargs): if instance.can_delete(self.request.user): return super(ProcessViewSet, self).destroy(request, pk, *args, **kwargs) else: - raise exceptions.PermissionDenied() \ No newline at end of file + raise exceptions.PermissionDenied() diff --git a/backend/core/views/product.py b/backend/core/views/product.py index 22e3ee5..65cf08c 100644 --- a/backend/core/views/product.py +++ b/backend/core/views/product.py @@ -34,15 +34,15 @@ class ProductFilter(filters.FilterSet): class Meta: model = Product - fields = [ - "internal_name", - "display_name", - "release", - "product_type", - "official_product", - "status", - "user", - ] + fields = { + "internal_name": ["exact", "in"], + "display_name": ["exact", "in"], + "release": ["exact", "in"], + "product_type": ["exact", "in"], + "official_product": ["exact", "in"], + "status": ["exact", "in"], + "user": ["exact", "in"], + } def filter_user(self, queryset, name, value): query = format_query_to_char( @@ -57,12 +57,12 @@ def filter_name(self, queryset, name, value): return queryset.filter(query) def filter_type_name(self, queryset, name, value): - query = format_query_to_char(name, value, ["product_type__display_name"]) + query = format_query_to_char(name, value, ["product_type__display_name", "product_type__name"]) return queryset.filter(query) def filter_release(self, queryset, name, value): - query = format_query_to_char(name, value, ["release__display_name"]) + query = format_query_to_char(name, value, ["release__display_name", "release__name"]) return queryset.filter(query) @@ -86,8 +86,8 @@ class ProductViewSet(viewsets.ModelViewSet): def create(self, request): - print('PRODUCT -> ', request.data) - print('PRODUCT (type) -> ', type(request.data)) + print("PRODUCT -> ", request.data) + print("PRODUCT (type) -> ", type(request.data)) print("USER -> ", request.user) print("USER (type) -> ", type(request.user)) diff --git a/backend/pzserver/celery.py b/backend/pzserver/celery.py index 4a547ca..19d0ace 100644 --- a/backend/pzserver/celery.py +++ b/backend/pzserver/celery.py @@ -3,15 +3,15 @@ from celery import Celery # Set the default Django settings module for the 'celery' program. -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pzserver.settings') +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "pzserver.settings") -app = Celery('orchestration') +app = Celery("orchestration") # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. -app.config_from_object('django.conf:settings', namespace='CELERY') +app.config_from_object("django.conf:settings", namespace="CELERY") # https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html app.conf.beat_schedule = { @@ -19,12 +19,17 @@ "task": "core.tasks.check_processes_finish", "schedule": 60.0, }, + "check-stopping": { + "task": "core.tasks.check_stopping", + "schedule": 120.0, + }, } app.conf.timezone = "UTC" # Load task modules from all registered Django apps. 
 app.autodiscover_tasks()
 
+
 @app.task(bind=True, ignore_result=True)
 def debug_task(self):
-    print(f'Request: {self.request!r}')
\ No newline at end of file
+    print(f"Request: {self.request!r}")
diff --git a/orchestration/pipelines b/orchestration/pipelines
index bffeb8a..04e7708 160000
--- a/orchestration/pipelines
+++ b/orchestration/pipelines
@@ -1 +1 @@
-Subproject commit bffeb8a6fc06fc0b77fb98a5c4ce41a50bebc5aa
+Subproject commit 04e7708704831e60eb50ee3b680159c8b0db139c