diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml
index d8ae63b..1e91e4d 100644
--- a/.devcontainer/docker-compose.yml
+++ b/.devcontainer/docker-compose.yml
@@ -9,6 +9,9 @@ services:
       - ..:/workspaces:cached
       - ./archive/log/backend:/archive/log
       - ./archive/data:/archive/data
+      - ./orchestration/pipelines:/pipelines
+      - ./orchestration/processes:/processes
+      - ./orchestration/datasets:/datasets
     # Overrides default command so things don't shut down after the process ends.
     command: sleep infinity
diff --git a/.gitignore b/.gitignore
index 0756aee..3ccd2ab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,11 @@ nginx.conf
 # Docker Compose
 docker-compose.yml
 .env.local
+
+# Orchestration
+orchestration/db
+orchestration/processes
+orchestration/logs
+orchestration/rabbitmq/*
+!orchestration/rabbitmq/enabled_plugins
+
diff --git a/README.md b/README.md
index f3c26d5..697fb4d 100644
--- a/README.md
+++ b/README.md
@@ -98,9 +98,51 @@ In the development environment it is not necessary to change Ngnix settings. But if a local change is needed, copy the `nginx_development.conf` file to `nginx.conf` Also change the `docker-compose.yml` file in the ngnix service at the line `- ./nginx_development.conf:/etc/nginx/conf.d/default.conf:ro`. In this way, the ngnix.conf file represents your local environment, if you make any modifications that are necessary for the project, copy this modification to the template file, as the nginx.conf file is not part of the repository.
+
+### Orchestration setup
+
+The Pz Server uses [orchestration](https://github.com/linea-it/orchestration/) to run its pipelines. To configure it, first create the directories it needs:
+
+``` bash
+mkdir orchestration/db orchestration/logs orchestration/processes
+```
+
+The next step is to add a virtual host to your local machine. On Linux, add the line `127.0.0.1 orchestration` to `/etc/hosts`. The file should look like this:
+
+``` bash
+127.0.0.1 localhost
+127.0.0.1 orchestration
+
+# The following lines are desirable for IPv6 capable hosts
+::1 ip6-localhost ip6-loopback
+fe00::0 ip6-localnet
+ff00::0 ip6-mcastprefix
+ff02::1 ip6-allnodes
+ff02::2 ip6-allrouters
+```
+
+Start the orchestration with the command:
+
+``` bash
+docker-compose up orchestration
+```
+
+Then follow the procedure for adding an authentication app described in this [link](https://github.com/linea-it/orchestration?tab=readme-ov-file#how-to-use-using-client-credential). Note that, when integrated with the Pz Server, the orchestration is served at a URL other than `http://localhost`; in this case, the admin page is [http://orchestration/admin/oauth2_provider/application/add/](http://orchestration/admin/oauth2_provider/application/add/).
+
+The `CLIENT ID` and `SECRET KEY` values obtained in the previous step must also be set in the Pz Server's `.env`, which should look similar to this:
+
+``` bash
+# Client ID and Client Secret must be registered in Django Admin
+# after backend setup, in the Django OAuth Applications interface
+ORC_CLIENT_ID=wD85gkYeqGEQvVWv5o3Cx6ppBlfDl2S88dek8Exp
+ORC_CLIENT_SECRET=eM2dhhxa2vovfaAXmMwqR1M8TdGhVmBjT7co5uaA9pI4aKPDZGxtBtDG5LHfhHvZUabbSP5aUDRpTLpUJAiGS0ScNuhktbuCwuSPiz0bmEftEROJ3ZzzKp2aDNO7Vx0k
+```
+
+This is enough to have the orchestration working, with its image pinned in `orchestration/docker-compose.yml`.
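+For reference, the pin is simply the `image` field of the orchestration service in `orchestration/docker-compose.yml`. A minimal sketch of what that entry might look like (the registry path and tag below are illustrative, not the actual published image):
+
+``` yaml
+services:
+  orchestration:
+    # Pin an explicit release instead of a floating tag such as "latest",
+    # so the development environment stays reproducible.
+    image: ghcr.io/linea-it/orchestration:v0.1.0  # hypothetical tag
+```
+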
+If you want to change the orchestration version, just change the image in `orchestration/docker-compose.yml`.
+
 Once this is done, the development environment setup process is complete.
 
-Finally, to start the whole application: 
+Finally, to start the whole application:
 
 ``` bash
 docker-compose up
@@ -125,6 +167,7 @@ Go to Django ADMIN (for local installation, open a web browser and go to the URL
 
 The installation is done, you can now test the newly configured application.
 
+
 ### Some example commands
 
 Turn on background environment (if you have the application already running on the terminal, stop it with `CTRL + C` keys and up ir again, but in the background using `-d` argument):
diff --git a/backend/core/admin.py b/backend/core/admin.py
index fe802c5..96e783a 100644
--- a/backend/core/admin.py
+++ b/backend/core/admin.py
@@ -1,11 +1,18 @@
-from core.models import (Product, ProductContent, ProductFile, ProductType,
-                         Profile, Release)
+from core.models import (Pipeline, Process, Product, ProductContent,
+                         ProductFile, ProductType, Profile, Release)
 from django import forms
 from django.contrib import admin
 from django.contrib.auth.admin import UserAdmin
 from django.contrib.auth.models import User
 
 
+@admin.register(Process)
+class ProcessAdmin(admin.ModelAdmin):
+    list_display = ("id", "pipeline", "status", "user", "created_at")
+    exclude = ["path"]
+    search_fields = ("pipeline__name", "status")
+
+
 @admin.register(ProductType)
 class ProductTypeAdmin(admin.ModelAdmin):
     list_display = ("id", "name", "display_name", "created_at")
@@ -20,6 +27,13 @@ class ReleaseAdmin(admin.ModelAdmin):
     search_fields = ("name", "display_name")
 
 
+@admin.register(Pipeline)
+class PipelineAdmin(admin.ModelAdmin):
+    list_display = ("id", "name", "display_name", "created_at")
+
+    search_fields = ("name", "display_name")
+
+
 class ProductAdminForm(forms.ModelForm):
     class Meta:
         model = Product
diff --git a/backend/core/migrations/0029_pipeline.py b/backend/core/migrations/0029_pipeline.py
new file mode 100644
index 0000000..d5ced5f
--- /dev/null
+++ b/backend/core/migrations/0029_pipeline.py
@@ -0,0 +1,25 @@
+# Generated by Django 5.0.6 on 2024-05-27 21:32
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0028_productfile_created_productfile_updated'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='Pipeline',
+            fields=[
+                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('name', models.CharField(max_length=255)),
+                ('display_name', models.CharField(max_length=255)),
+                ('version', models.CharField(max_length=55)),
+                ('description', models.TextField(blank=True, null=True)),
+                ('created_at', models.DateTimeField(auto_now_add=True)),
+                ('system_config', models.JSONField(blank=True, null=True)),
+            ],
+        ),
+    ]
diff --git a/backend/core/migrations/0030_process.py b/backend/core/migrations/0030_process.py
new file mode 100644
index 0000000..9f3217b
--- /dev/null
+++ b/backend/core/migrations/0030_process.py
@@ -0,0 +1,35 @@
+# Generated by Django 5.0.6 on 2024-05-28 15:39
+
+import django.db.models.deletion
+from django.conf import settings
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0029_pipeline'),
+        migrations.swappable_dependency(settings.AUTH_USER_MODEL),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='Process',
+            fields=[
+                ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('pipeline_version', models.CharField(blank=True, default=None, max_length=255, null=True)),
+                ('used_config', models.JSONField(blank=True, null=True)),
+                ('created_at', models.DateTimeField(auto_now_add=True)),
+                ('started_at', models.DateTimeField(blank=True, null=True)),
+                ('ended_at', models.DateTimeField(blank=True, null=True)),
+                ('task_id', models.CharField(blank=True, default=None, max_length=255, null=True)),
+                ('status', models.CharField(default='Pending', max_length=55)),
+                ('path', models.FilePathField(blank=True, default=None, null=True, verbose_name='Path')),
+                ('comment', models.TextField(blank=True, null=True)),
+                ('inputs', models.ManyToManyField(related_name='processes', to='core.product')),
+                ('pipeline', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='processes', to='core.pipeline')),
+                ('release', models.ForeignKey(blank=True, default=None, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='processes', to='core.release')),
+                ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='processes', to=settings.AUTH_USER_MODEL)),
+            ],
+        ),
+    ]
diff --git a/backend/core/migrations/0031_pipeline_product_types_accepted.py b/backend/core/migrations/0031_pipeline_product_types_accepted.py
new file mode 100644
index 0000000..10b436b
--- /dev/null
+++ b/backend/core/migrations/0031_pipeline_product_types_accepted.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.0.6 on 2024-05-29 15:12
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0030_process'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='pipeline',
+            name='product_types_accepted',
+            field=models.ManyToManyField(related_name='pipelines', to='core.producttype'),
+        ),
+    ]
diff --git a/backend/core/migrations/0032_process_upload_alter_process_inputs.py b/backend/core/migrations/0032_process_upload_alter_process_inputs.py
new file mode 100644
index 0000000..958e6d8
--- /dev/null
+++ b/backend/core/migrations/0032_process_upload_alter_process_inputs.py
@@ -0,0 +1,25 @@
+# Generated by Django 5.0.6 on 2024-05-29 17:45
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0031_pipeline_product_types_accepted'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='process',
+            name='upload',
+            field=models.ForeignKey(default=1, on_delete=django.db.models.deletion.CASCADE, related_name='upload', to='core.product'),
+            preserve_default=False,
+        ),
+        migrations.AlterField(
+            model_name='process',
+            name='inputs',
+            field=models.ManyToManyField(related_name='inputs', to='core.product'),
+        ),
+    ]
diff --git a/backend/core/migrations/0033_alter_process_upload.py b/backend/core/migrations/0033_alter_process_upload.py
new file mode 100644
index 0000000..ed99e64
--- /dev/null
+++ b/backend/core/migrations/0033_alter_process_upload.py
@@ -0,0 +1,19 @@
+# Generated by Django 5.0.6 on 2024-05-29 17:55
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0032_process_upload_alter_process_inputs'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='process',
+            name='upload',
+            field=models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='upload', to='core.product'),
+        ),
+    ]
diff --git a/backend/core/migrations/0034_process_upload_product_type.py b/backend/core/migrations/0034_process_upload_product_type.py
new file mode 100644
index 0000000..43af886
--- /dev/null
+++ b/backend/core/migrations/0034_process_upload_product_type.py
@@ -0,0 +1,20 @@
+# Generated by Django 5.0.6 on 2024-05-29 22:01
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0033_alter_process_upload'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='process',
+            name='upload_product_type',
+            field=models.ForeignKey(default=2, on_delete=django.db.models.deletion.CASCADE, related_name='upload_product_type', to='core.producttype'),
+            preserve_default=False,
+        ),
+    ]
diff --git a/backend/core/migrations/0035_remove_process_upload_product_type_and_more.py b/backend/core/migrations/0035_remove_process_upload_product_type_and_more.py
new file mode 100644
index 0000000..074ea3e
--- /dev/null
+++ b/backend/core/migrations/0035_remove_process_upload_product_type_and_more.py
@@ -0,0 +1,30 @@
+# Generated by Django 5.0.6 on 2024-05-31 14:53
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('core', '0034_process_upload_product_type'),
+    ]
+
+    operations = [
+        migrations.RemoveField(
+            model_name='process',
+            name='upload_product_type',
+        ),
+        migrations.AddField(
+            model_name='pipeline',
+            name='output_product_type',
+            field=models.ForeignKey(default=2, on_delete=django.db.models.deletion.CASCADE, related_name='output_product_type', to='core.producttype'),
+            preserve_default=False,
+        ),
+        migrations.AddField(
+            model_name='process',
+            name='display_name',
+            field=models.CharField(default='Test DN', max_length=255),
+            preserve_default=False,
+        ),
+    ]
diff --git a/backend/core/models/__init__.py b/backend/core/models/__init__.py
index 4a0bc9f..f4fcd78 100644
--- a/backend/core/models/__init__.py
+++ b/backend/core/models/__init__.py
@@ -1,6 +1,8 @@
 from core.models.release import Release
 from core.models.product_type import ProductType
-from core.models.product import Product
+from core.models.product import Product, ProductStatus
 from core.models.product_content import ProductContent
 from core.models.product_file import ProductFile
 from core.models.user_profile import Profile
+from core.models.pipeline import Pipeline
+from core.models.process import Process
diff --git a/backend/core/models/pipeline.py b/backend/core/models/pipeline.py
new file mode 100644
index 0000000..ffdf6d1
--- /dev/null
+++ b/backend/core/models/pipeline.py
@@ -0,0 +1,24 @@
+from core.models import ProductType
+from django.db import models
+
+
+class Pipeline(models.Model):
+
+    name = models.CharField(max_length=255)
+    display_name = models.CharField(max_length=255)
+    version = models.CharField(max_length=55)
+    description = models.TextField(null=True, blank=True)
+    created_at = models.DateTimeField(auto_now_add=True)
+    system_config = models.JSONField(null=True, blank=True)
+    product_types_accepted = models.ManyToManyField(
+        ProductType, related_name="pipelines"
+    )
+    output_product_type = models.ForeignKey(
+        ProductType,
+        on_delete=models.CASCADE,
+        related_name="output_product_type",
+    )
+
+    def __str__(self):
+        return f"{self.display_name}"
+
\ No newline at end of file
diff --git a/backend/core/models/process.py b/backend/core/models/process.py
new file mode 100644
index 0000000..6226ccc
--- /dev/null
+++ b/backend/core/models/process.py
@@ -0,0 +1,64 @@
+import pathlib
+import shutil
+
+from core.models import Pipeline, Product, ProductStatus, Release
+from django.conf import settings
+from django.contrib.auth.models import User
+from django.db import models
+
+
+class Process(models.Model):
+    display_name = models.CharField(max_length=255)
+    pipeline = models.ForeignKey(
+        Pipeline, on_delete=models.CASCADE, related_name="processes"
+    )
+    pipeline_version = models.CharField(
+        max_length=255, null=True, blank=True, default=None
+    )
+    used_config = models.JSONField(null=True, blank=True)
+    inputs = models.ManyToManyField(Product, related_name="inputs")
+    release = models.ForeignKey(
+        Release,
+        on_delete=models.CASCADE,
+        related_name="processes",
+        null=True,
+        blank=True,
+        default=None,
+    )
+    upload = models.OneToOneField(
+        Product,
+        on_delete=models.CASCADE,
+        related_name="upload",
+    )
+    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="processes")
+    created_at = models.DateTimeField(auto_now_add=True)
+    started_at = models.DateTimeField(null=True, blank=True)
+    ended_at = models.DateTimeField(null=True, blank=True)
+    task_id = models.CharField(max_length=255, null=True, blank=True, default=None)
+    status = models.IntegerField(
+        verbose_name="Status",
+        default=ProductStatus.REGISTERING,
+        choices=ProductStatus.choices,
+    )
+    path = models.FilePathField(
+        verbose_name="Path", null=True, blank=True, default=None
+    )
+    comment = models.TextField(null=True, blank=True)
+
+    def __str__(self):
+        return f"{self.pipeline}-{str(self.pk).zfill(8)}"
+
+    def delete(self, *args, **kwargs):
+        process_path = pathlib.Path(settings.PROCESSING_DIR, str(self.path))
+        if process_path.exists():
+            self.rmtree(process_path)
+
+        super().delete(*args, **kwargs)
+
+    @staticmethod
+    def rmtree(process_path):
+        try:
+            # WARN: is not run by admin
+            shutil.rmtree(process_path)
+        except OSError as e:
+            raise OSError("Failed to remove directory: [ %s ] %s" % (process_path, e))
\ No newline at end of file
diff --git a/backend/core/models/product.py b/backend/core/models/product.py
index b576d10..f0db996 100644
--- a/backend/core/models/product.py
+++ b/backend/core/models/product.py
@@ -75,4 +75,5 @@ def can_delete(self, user) -> bool:
 
 def can_update(self, user) -> bool:
     if self.user.id == user.id or user.profile.is_admin():
         return True
-    return False
\ No newline at end of file
+    return False
+
\ No newline at end of file
diff --git a/backend/core/pipeline_objects.py b/backend/core/pipeline_objects.py
new file mode 100644
index 0000000..634c032
--- /dev/null
+++ b/backend/core/pipeline_objects.py
@@ -0,0 +1,48 @@
+from pathlib import Path
+
+from core.utils import get_pipeline, get_pipelines
+from pydantic import BaseModel, validator
+
+
+class Pipeline:
+    def __init__(self):
+        self.__raw_pipelines = get_pipelines()
+
+    def all(self):
+        pipelines = []
+        for pipename, data in self.__raw_pipelines.items():
+            data["name"] = pipename
+            pipelines.append(PipelineModel(**data))
+        return pipelines
+
+    def get(self, name):
+        data = self.__raw_pipelines.get(name, {})
+        data["name"] = name
+        return PipelineModel(**data)
+
+
+class PipelineModel(BaseModel):
+    name: str
+    path: str
+    executor: str
+    runner: str
+    executable: str
+    version: str
+    display_name: str | None
+    schema_config: str | None
+
+    @validator('path', pre=True)
+    def validate_path(cls, value):
+        assert Path(value).is_dir(), f"Folder '{value}' not found."
+        return value
+
+    @validator('schema_config', pre=True)
+    def validate_config(cls, value):
+        assert Path(value).is_file(), f"File '{value}' not found."
+        return value
+
+
+if __name__ == "__main__":
+    from core.utils import get_pipeline
+    pipe_info = get_pipeline('cross_lsdb')
+    pipeline = PipelineModel(**pipe_info)
\ No newline at end of file
diff --git a/backend/core/serializers/__init__.py b/backend/core/serializers/__init__.py
index ec9acd3..78fb0f9 100644
--- a/backend/core/serializers/__init__.py
+++ b/backend/core/serializers/__init__.py
@@ -1,6 +1,8 @@
-from core.serializers.release import ReleaseSerializer
-from core.serializers.product_type import ProductTypeSerializer
+from core.serializers.pipeline import PipelineSerializer
+from core.serializers.process import ProcessSerializer
 from core.serializers.product import ProductSerializer
 from core.serializers.product_content import ProductContentSerializer
 from core.serializers.product_file import ProductFileSerializer
+from core.serializers.product_type import ProductTypeSerializer
+from core.serializers.release import ReleaseSerializer
 from core.serializers.user import UserSerializer
diff --git a/backend/core/serializers/pipeline.py b/backend/core/serializers/pipeline.py
new file mode 100644
index 0000000..2cdd4c7
--- /dev/null
+++ b/backend/core/serializers/pipeline.py
@@ -0,0 +1,8 @@
+from core.models import Pipeline
+from rest_framework import serializers
+
+
+class PipelineSerializer(serializers.ModelSerializer):
+    class Meta:
+        model = Pipeline
+        fields = "__all__"
diff --git a/backend/core/serializers/process.py b/backend/core/serializers/process.py
new file mode 100644
index 0000000..3ffd00e
--- /dev/null
+++ b/backend/core/serializers/process.py
@@ -0,0 +1,59 @@
+from core.models import Process, Product, Release
+from rest_framework import serializers
+
+
+class ProcessSerializer(serializers.ModelSerializer):
+
+    release = serializers.PrimaryKeyRelatedField(
+        queryset=Release.objects.all(), many=False, allow_null=True, required=False
+    )
+    # upload = serializers.PrimaryKeyRelatedField(
+    #     queryset=Product.objects.all(), many=False
+    # )
+    release_name = serializers.SerializerMethodField()
+    pipeline_name = serializers.SerializerMethodField()
+    pipeline_version = serializers.SerializerMethodField()
+    status = serializers.SerializerMethodField()
+    owned_by = serializers.SerializerMethodField()
+    is_owner = serializers.SerializerMethodField()
+    # can_delete = serializers.SerializerMethodField()
+    # can_update = serializers.SerializerMethodField()
+
+    class Meta:
+        model = Process
+        read_only_fields = ("pipeline_version", "is_owner", "upload", "status")
+        exclude = ("user", "path")
+
+    def get_pipeline_name(self, obj):
+        return obj.pipeline.name
+
+    def get_pipeline_version(self, obj):
+        return obj.pipeline.version
+
+    def get_status(self, obj):
+        return obj.upload.status
+        # return "REG"
+
+    def get_release_name(self, obj):
+        try:
+            return obj.release.display_name
+        except AttributeError:
+            return None
+
+    def get_owned_by(self, obj):
+        return obj.user.username
+
+    def get_is_owner(self, obj):
+        current_user = self.context["request"].user
+        return obj.user.pk == current_user.pk
+
+    # def get_can_delete(self, obj):
+    #     current_user = self.context["request"].user
+    #     return obj.can_delete(current_user)
+
+    # def get_can_update(self, obj):
+    #     current_user = self.context["request"].user
+    #     return obj.can_update(current_user)
\ No newline at end of file
diff --git a/backend/core/utils.py b/backend/core/utils.py
new file mode 100644
index 0000000..85237bc
--- /dev/null
+++ b/backend/core/utils.py
@@ -0,0 +1,93 @@
+import importlib
+import importlib.util
+import json
+import logging
+import pathlib
+import sys
+
+import yaml
+from django.conf import settings
+from django.db.models import Q
+
+logger = logging.getLogger()
+
+
+def get_pipelines():
+    sys_pipes_file = pathlib.Path(settings.PIPELINES_DIR, 'pipelines.yaml')
+    with open(sys_pipes_file, encoding="utf-8") as _file:
+        return yaml.safe_load(_file)
+
+
+def get_pipeline(name):
+    system_pipelines = get_pipelines()
+    pipeline = system_pipelines.get(name, None)
+    assert pipeline, f"Pipeline {name} not found."
+    pipeline['name'] = name
+    return pipeline
+
+
+def load_config(schema_path, config=None):
+    # Avoid a mutable default argument: fall back to an empty dict here.
+    mod = load_module_from_file(schema_path)
+    return mod.Config(**(config or {}))
+
+
+def import_module(module):
+    return importlib.import_module(module)
+
+
+def load_module_from_file(module):
+    spec = importlib.util.spec_from_file_location(f"{module}.mod", module)
+    assert spec, f"No module named '{module}'."
+    mod = importlib.util.module_from_spec(spec)
+    assert mod, f"Failed to import python module: {module}"
+    sys.modules[f"{module}.mod"] = mod
+    spec.loader.exec_module(mod)
+    return mod
+
+
+def load_executor(executor):
+    assert validate_executor(executor), f"No executor named '{executor}'."
+    mod = import_module(f"core.executors.{executor}")
+    return getattr(mod, f"Executor{executor.capitalize()}")
+
+
+def validate_executor(executor):
+    try: import_module(f"core.executors.{executor}")
+    except ModuleNotFoundError: return False
+    return True
+
+
+def validate_json(data):
+    try: json.loads(data)
+    except ValueError: return False
+    return True
+
+
+def validate_config(config):
+    if not config: return True
+    return validate_json(config) and isinstance(json.loads(config), dict)
+
+
+def get_returncode(process_dir):
+    try:
+        with open(f"{process_dir}/return.code", encoding="utf-8") as _file:
+            content = _file.readline()
+            return int(content.replace('\n',''))
+    except Exception as err:
+        logger.error(f"Error when retrieving return code: {err}")
+        return -1
+
+
+def format_query_to_char(key, value, fields):
+    condition = Q.OR if key.endswith("__or") else Q.AND
+    values = value.split(",")
+    query = Q()
+
+    for value in values:
+        subfilter = Q()
+        for field in fields:
+            subfilter.add(Q(**{f"{field}__icontains": value}), Q.OR)
+
+        query.add(subfilter, condition)
+
+    return query
\ No newline at end of file
diff --git a/backend/core/views/__init__.py b/backend/core/views/__init__.py
index e63b8e6..bac1068 100644
--- a/backend/core/views/__init__.py
+++ b/backend/core/views/__init__.py
@@ -1,10 +1,9 @@
-from core.views.release import ReleaseViewSet
-from core.views.product_type import ProductTypeViewSet
+from core.views.pipeline import PipelineViewSet
+from core.views.process import ProcessViewSet
 from core.views.product import ProductViewSet
 from core.views.product_content import ProductContentViewSet
 from core.views.product_file import ProductFileViewSet
-from core.views.user import LoggedUserView
-from core.views.user import GetToken
-from core.views.user import CsrfToOauth
-from core.views.user import Logout
-from core.views.user import UserViewSet
+from core.views.product_type import ProductTypeViewSet
+from core.views.release import ReleaseViewSet
+from core.views.user import (CsrfToOauth, GetToken, LoggedUserView, Logout,
+                             UserViewSet)
diff --git a/backend/core/views/create_product.py b/backend/core/views/create_product.py
new file mode 100644
index 0000000..9d7f53f
--- /dev/null
+++ b/backend/core/views/create_product.py
@@ -0,0 +1,155 @@
+import logging
+import pathlib
+
+from core.models import Product
+from core.serializers import ProductSerializer
+from django.conf import settings
+
+
+class CreateProduct:
+
+    def __init__(self, data, user):
+        self.__log = logging.getLogger("create_product")
+        self.__log.debug(f"Creating product: {data}")
+
+        serializer = ProductSerializer(data=data)
+        serializer.is_valid(raise_exception=True)
+
+        self.__data = self.__perform_create(serializer, user)
+        self.__check_official_product(user)
+
+    def save(self):
+        can_save = self.check_product_types()
+
+        if not can_save.get("success"):
+            return can_save.get("message")
+
+        self.__set_internal_name()
+        self.__create_product_path()
+
+        self.__log.debug(f"Product ID {self.__data.pk} created")
+
+    def __check_official_product(self, user):
+        """Checks if the product is official and if the user has permission
+        to save an official product.
+
+        Args:
+            user (User): User object
+
+        Raises:
+            ValueError: if the user has no permission
+
+        Returns:
+            bool
+        """
+
+        is_official = self.__data.official_product
+
+        if is_official:
+            if user.profile.is_admin() is False:
+                self.__delete()
+                raise ValueError(
+                    "Not allowed. Only users with admin permissions "
+                    "can create official products."
+                )
+
+        return True
+
+    @property
+    def data(self):
+        return self.__data
+
+    def get(self):
+        """Returns Product object
+
+        Returns:
+            Product object
+        """
+        return Product.objects.get(pk=self.__data.pk)
+
+    def __set_internal_name(self):
+        """Sets the internal name based on the primary key and display name"""
+
+        # change spaces to "_", convert to lowercase, remove trailing spaces.
+        name = self.__data.display_name.replace(" ", "_").lower().strip().strip("\n")
+
+        # strip any non-alphanumeric character except "_"
+        name = "".join(e for e in name if e.isalnum() or e == "_")
+        self.__data.internal_name = f"{self.__data.pk}_{name}"
+        self.__data.save()
+
+    def __create_product_path(self):
+        """Create product path"""
+
+        # Create product path
+        relative_path = f"{self.__data.product_type.name}/{self.__data.internal_name}"
+        path = pathlib.Path(settings.MEDIA_ROOT, relative_path)
+        path.mkdir(parents=True, exist_ok=True)
+
+        self.__data.path = relative_path
+        self.__data.save()
+
+    def check_product_types(self):
+        """Checks product types by applying a certain business rule.
+
+        Returns:
+            dict: {'message': {'entity': list(str)}, 'success': bool}
+        """
+
+        if not self.__data:
+            return {"message": {"product": ["No data."]}, "success": False,}
+
+        # Release is not allowed in Spec-z Catalog
+        if (
+            self.__data.release
+            and self.__data.product_type.name == "specz_catalog"
+        ):
+            self.__delete()
+            return {
+                "message": {"release": [
+                    "Release must be null on Spec-z Catalogs products."
+                ]}, "success": False,
+            }
+
+        # Pzcode is only allowed in Validations Results and Photo-z Table
+        if self.__data.pz_code and self.__data.product_type.name in (
+            "training_set",
+            "specz_catalog",
+        ):
+            dn = self.__data.product_type.display_name
+            pzc = self.__data.pz_code
+            self.__delete()
+            return {
+                "message": {"pz_code": [
+                    f"Pz Code must be null on {dn} products. '{pzc}'"
+                ]}, "success": False,
+            }
+
+        return {"message": {"product_type": ["Success!"]}, "success": True,}
+
+    def __perform_create(self, serializer, user):
+        """Add user"""
+
+        uploaded_by = user
+        return serializer.save(user=uploaded_by)
+
+    def __delete(self):
+        """Delete product"""
+
+        if self.__data:
+            self.__data.path = f"{settings.MEDIA_ROOT}/{self.__data.path}"
+            self.__data.delete()
+            self.__data = None
diff --git a/backend/core/views/pipeline.py b/backend/core/views/pipeline.py
new file mode 100644
index 0000000..814c3aa
--- /dev/null
+++ b/backend/core/views/pipeline.py
@@ -0,0 +1,26 @@
+from core import models
+from core.serializers import PipelineSerializer
+from rest_framework import viewsets
+from rest_framework.decorators import action
+from rest_framework.response import Response
+
+
+class PipelineViewSet(viewsets.ReadOnlyModelViewSet):
+    queryset = models.Pipeline.objects.all()
+    serializer_class = PipelineSerializer
+    filterset_fields = [
+        "id",
+        "name",
+    ]
+    search_fields = [
+        "display_name",
+        "description",
+    ]
+    ordering = ["-created_at"]
+
+    @action(methods=["GET"], detail=True)
+    def api_schema(self, request, pk=None):
+        meta = self.metadata_class()
+        data = meta.determine_metadata(request, self)
+        return Response(data)
\ No newline at end of file
diff --git a/backend/core/views/process.py b/backend/core/views/process.py
new file mode 100644
index 0000000..c0e515e
--- /dev/null
+++ b/backend/core/views/process.py
@@ -0,0 +1,133 @@
+from core.models import Pipeline, Process
+from core.serializers import ProcessSerializer
+from core.utils import format_query_to_char
+from core.views.create_product import CreateProduct
+from django_filters import rest_framework as filters
+from rest_framework import exceptions, status, viewsets
+from rest_framework.decorators import action
+from rest_framework.response import Response
+
+
+class ProcessFilter(filters.FilterSet):
+    release__isnull = filters.BooleanFilter(
+        field_name="release", lookup_expr="isnull")
+    pipeline__or = filters.CharFilter(method="filter_pipeline")
+    pipeline = filters.CharFilter(method="filter_pipeline")
+    release_name__or = filters.CharFilter(method="filter_release")
+    release_name = filters.CharFilter(method="filter_release")
+
+    class Meta:
+        model = Process
+        fields = [
+            "pipeline",
+            "status",
+            "release",
+            "user",
+        ]
+
+    def filter_user(self, queryset, name, value):
+        query = format_query_to_char(
+            name, value,
+            ["user__username", "user__first_name", "user__last_name"]
+        )
+
+        return queryset.filter(query)
+
+    def filter_pipeline(self, queryset, name, value):
+        query = format_query_to_char(
+            name, value,
+            ["pipeline__display_name", "pipeline__name"]
+        )
+
+        return queryset.filter(query)
+
+    def filter_release(self, queryset, name, value):
+        query = format_query_to_char(
+            name, value, ["release__display_name"])
+        return queryset.filter(query)
+
+
+class ProcessViewSet(viewsets.ModelViewSet):
+    queryset = Process.objects.all()
+    serializer_class = ProcessSerializer
+    search_fields = [
+        "pipeline__name",
+        "pipeline__display_name",
+        "user__username",
+        "user__first_name",
+        "user__last_name",
+    ]
+    filterset_class = ProcessFilter
+    ordering_fields = [
+        "id",
+        "created_at",
+    ]
+    ordering = ["-created_at"]
+
+    def create(self, request):
+        serializer = self.get_serializer(data=request.data)
+        serializer.is_valid(raise_exception=True)
+
+        try:
+            instance = self.perform_create(serializer)
+
+            process = Process.objects.get(pk=instance.pk)
+            process.save()
+
+            data = self.get_serializer(instance=process).data
+            return Response(data, status=status.HTTP_201_CREATED)
+
+        except Exception as e:
+            content = {"error": str(e)}
+            return Response(content, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+    def perform_create(self, serializer):
+        """Add user and upload"""
+
+        owned_by = self.request.user
+        upload = self.create_initial_upload(serializer, owned_by)
+        return serializer.save(user=owned_by, upload=upload)
+
+    def create_initial_upload(self, serializer, user):
+        """Creates the initial upload product that will hold the process output."""
+        data = serializer.initial_data
+        pipeline = Pipeline.objects.get(pk=data.get('pipeline'))
+        upload_data = {
+            "display_name": data.get("display_name"),
+            "release": data.get("release", None),
+            "pz_code": data.get("pz_code", None),
+            "official_product": data.get("official_product", False),
+            "description": data.get("description", None),
+            "product_type": pipeline.output_product_type.pk,
+        }
+        product = CreateProduct(upload_data, user)
+        check_prodtype = product.check_product_types()
+
+        if not check_prodtype.get("success"):
+            raise ValueError(check_prodtype.get("message"))
+
+        product.save()
+        return product.data
+
+    @action(methods=["GET"], detail=True)
+    def api_schema(self, request, pk=None):
+        meta = self.metadata_class()
+        data = meta.determine_metadata(request, self)
+        return Response(data)
+
+    def destroy(self, request, pk=None, *args, **kwargs):
+        """Process can only be deleted by the OWNER or if the user
+        has an admin profile.
+        """
+
+        instance = self.get_object()
+        if instance.can_delete(self.request.user):
+            return super(ProcessViewSet, self).destroy(request, pk, *args, **kwargs)
+        else:
+            raise exceptions.PermissionDenied()
\ No newline at end of file
diff --git a/backend/core/views/product.py b/backend/core/views/product.py
index 7d7cc4e..1aa6c28 100644
--- a/backend/core/views/product.py
+++ b/backend/core/views/product.py
@@ -4,30 +4,27 @@
 import secrets
 import tempfile
 import zipfile
-from json import dumps, loads
+from json import loads
 from pathlib import Path
 
-import pandas as pd
 from core.models import Product
-from core.pagination import CustomPageNumberPagination
 from core.product_handle import FileHandle, NotTableError
-from core.serializers import ProductContentSerializer, ProductSerializer
+from core.serializers import ProductSerializer
+from core.utils import format_query_to_char
+from core.views.create_product import CreateProduct
 from core.views.registry_product import RegistryProduct
 from django.conf import settings
-from django.contrib.auth.models import User
 from django.core.paginator import Paginator
 from django.db.models import Q
-from django.http import FileResponse, JsonResponse
+from django.http import FileResponse
 from django_filters import rest_framework as filters
 from rest_framework import exceptions, status, viewsets
 from rest_framework.decorators import action
-from rest_framework.pagination import PageNumberPagination
 from rest_framework.response import Response
 
 
 class ProductFilter(filters.FilterSet):
-    release__isnull = filters.BooleanFilter(
-        field_name="release", lookup_expr="isnull")
+    release__isnull = filters.BooleanFilter(field_name="release", lookup_expr="isnull")
     uploaded_by__or = filters.CharFilter(method="filter_user")
     uploaded_by = filters.CharFilter(method="filter_user")
     product_type_name__or = filters.CharFilter(method="filter_type_name")
@@ -50,44 +47,26 @@ class Meta:
         ]
 
     def filter_user(self, queryset, name, value):
-        query = self.format_query_to_char(
-            name, value, ["user__username",
-                          "user__first_name", "user__last_name"]
+        query = format_query_to_char(
+            name, value, ["user__username", "user__first_name", "user__last_name"]
         )
 
         return queryset.filter(query)
 
     def filter_name(self, queryset, name, value):
-        query = self.format_query_to_char(name, value, ["display_name"])
+        query = format_query_to_char(name, value, ["display_name"])
         return queryset.filter(query)
 
     def filter_type_name(self, queryset, name, value):
-        query = self.format_query_to_char(
-            name, value, ["product_type__display_name"])
+        query = format_query_to_char(name, value, ["product_type__display_name"])
        return queryset.filter(query)
 
     def filter_release(self, queryset, name, value):
-        query = self.format_query_to_char(
-            name, value, ["release__display_name"])
+        query = format_query_to_char(name, value, ["release__display_name"])
         return queryset.filter(query)
 
-    @staticmethod
-    def format_query_to_char(key, value, fields):
-        condition = Q.OR if key.endswith("__or") else Q.AND
-        values = value.split(",")
-        query = Q()
-
-        for value in values:
-            subfilter = Q()
-            for field in fields:
-                subfilter.add(Q(**{f"{field}__icontains": value}), Q.OR)
-
-            query.add(subfilter, condition)
-
-        return query
-
 
 class ProductViewSet(viewsets.ModelViewSet):
     queryset = Product.objects.all()
@@ -108,90 +87,23 @@ class ProductViewSet(viewsets.ModelViewSet):
     ordering = ["-created_at"]
 
     def create(self, request):
-        serializer = self.get_serializer(data=request.data)
-        serializer.is_valid(raise_exception=True)
-        instance = self.perform_create(serializer)
-
         try:
-            product = Product.objects.get(pk=instance.pk)
+            product = CreateProduct(request.data, request.user)
+            check_prodtype = product.check_product_types()
 
-            # Verifica se o produto é oficial,
-            # Apenas user que fazem parte do Group=Admin podem criar produtos oficiais.
-            if product.official_product is True:
-                if request.user.profile.is_admin() is False:
-                    return Response(
-                        {
-                            "error": "Not allowed. Only users with admin permissions can create official products."
-                        },
-                        status=status.HTTP_403_FORBIDDEN,
-                    )
-
-            # Cria um internal name
-            name = self.get_internal_name(product.display_name)
-            product.internal_name = f"{product.pk}_{name}"
-
-            # Cria um path para o produto
-            relative_path = f"{product.product_type.name}/{product.internal_name}"
-            # TODO: Talves mover a criação do path do produto para a parte do upload dos arquivos.
-            path = pathlib.Path(settings.MEDIA_ROOT, relative_path)
-            path.mkdir(parents=True, exist_ok=True)
-
-            product.path = relative_path
-
-            # Verificar campos relacionados ao Produt Type.
-
-            # Release is not allowed in Spec-z Catalog
-            if (
-                product.release
-                and product.product_type.name == "specz_catalog"
-            ):
+            if not check_prodtype.get("success"):
                 return Response(
-                    {"release": [
-                        "Release must be null on Spec-z Catalogs products."]},
-                    status=status.HTTP_400_BAD_REQUEST,
-                )
-
-            # Pzcode is only allowed in Validations Results and Photo-z Table
-            if product.pz_code and product.product_type.name in (
-                "training_set",
-                "specz_catalog",
-            ):
-                return Response(
-                    {
-                        "pz_code": [
-                            f"Pz Code must be null on {product.product_type.display_name} products. '{product.pz_code}'"
-                        ]
-                    },
-                    status=status.HTTP_400_BAD_REQUEST,
+                    check_prodtype.get("message"), status=status.HTTP_400_BAD_REQUEST
                 )
 
             product.save()
-
-            data = self.get_serializer(instance=product).data
+            data = self.get_serializer(instance=product.data).data
             return Response(data, status=status.HTTP_201_CREATED)
 
         except Exception as e:
             content = {"error": str(e)}
             return Response(content, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
-    def perform_create(self, serializer):
-        """Create user and add internal_name"""
-
-        uploaded_by = self.request.user
-        return serializer.save(user=uploaded_by)
-
-    def get_internal_name(self, display_name):
-        """
-        Creates an internal name without special characters or spaces.
-        The internal name can be used for paths, urls and tablenames.
-        """
-
-        # change spaces to "_", convert to lowercase, remove trailing spaces.
-        name = display_name.replace(" ", "_").lower().strip().strip("\n")
-
-        # strip any non-alphanumeric character except "_"
-        return "".join(e for e in name if e.isalnum() or e == "_")
-
     @action(methods=["GET"], detail=True)
     def download(self, request, **kwargs):
         """Download product"""
@@ -201,7 +113,8 @@ def download(self, request, **kwargs):
             with tempfile.TemporaryDirectory() as tmpdirname:
                 # Cria um arquivo zip no diretório tmp com os arquivos do produto
                 zip_file = self.zip_product(
-                    product.internal_name, product.path, tmpdirname)
+                    product.internal_name, product.path, tmpdirname
+                )
 
                 # Abre o arquivo e envia em bites para o navegador
                 mimetype, _ = mimetypes.guess_type(zip_file)
@@ -211,8 +124,7 @@
                 file_handle = open(zip_file, "rb")
                 response = FileResponse(file_handle, content_type=mimetype)
                 response["Content-Length"] = size
-                response["Content-Disposition"] = "attachment; filename={}".format(
-                    name)
+                response["Content-Disposition"] = "attachment; filename={}".format(name)
                 return response
         except Exception as e:
             content = {"error": str(e)}
@@ -238,8 +150,7 @@ def download_main_file(self, request, **kwargs):
 
             response = FileResponse(file_handle, content_type=mimetype)
             response["Content-Length"] = size
-            response["Content-Disposition"] = "attachment; filename={}".format(
-                name)
+            response["Content-Disposition"] = "attachment; filename={}".format(name)
             return response
         except Exception as e:
             content = {"error": str(e)}
@@ -247,8 +158,8 @@
     @action(methods=["GET"], detail=True)
     def read_data(self, request, **kwargs):
-        page = int(request.GET.get('page', 1))
-        page_size = int(request.GET.get('page_size', 100))
+        page = int(request.GET.get("page", 1))
+        page_size = int(request.GET.get("page_size", 100))
         product = self.get_object()
         product_file = product.files.get(role=0)
@@ -256,18 +167,20 @@
         try:
             df = FileHandle(main_file_path).to_df()
-            records = loads(df.to_json(orient='records'))
+            records = loads(df.to_json(orient="records"))
             paginator = Paginator(records, page_size)
             records = paginator.get_page(page)
 
-            return Response({
-                'count': df.shape[0],
-                'columns': df.columns,
-                'results': records.object_list})
+            return Response(
+                {
+                    "count": df.shape[0],
+                    "columns": df.columns,
+                    "results": records.object_list,
+                }
+            )
         except NotTableError as e:
-            content = {
-                "message": "Table preview not available for this product type."}
+            content = {"message": "Table preview not available for this product type."}
             return Response(content, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
         except Exception as e:
             content = {"message": str(e)}
@@ -341,8 +254,7 @@ def pending_publication(self, request, **kwargs):
         try:
             # Procura por produtos criados pelo usuario que ainda não foram publicados
-            product = Product.objects.filter(
-                status=0, user_id=request.user.id).first()
+            product = Product.objects.filter(status=0, user_id=request.user.id).first()
 
             if product:
                 # Retorna o produto
@@ -358,8 +270,7 @@ def pending_publication(self, request, **kwargs):
 
     def zip_product(self, internal_name, path, tmpdir):
         product_path = pathlib.Path(settings.MEDIA_ROOT, path)
-        thash = ''.join(secrets.choice(secrets.token_hex(16))
-                        for i in range(5))
+        thash = "".join(secrets.choice(secrets.token_hex(16)) for i in range(5))
         zip_name = f"{internal_name}_{thash}.zip"
         zip_path = pathlib.Path(tmpdir, zip_name)
@@ -379,9 +290,11 @@ def zip_product(self, internal_name, path, tmpdir):
         return zip_path
 
     def destroy(self, request, pk=None, *args, **kwargs):
-        """Produto só pode ser excluido pelo DONO ou se o usuario tiver profile de admin.
+        """Product can only be deleted by the OWNER or if the user has an
+        admin profile.
         """
-        # Regra do admin atualizada na issue: #192 - https://github.com/linea-it/pzserver_app/issues/192
+        # Admin rule updated in issue:
+        # 192 - https://github.com/linea-it/pzserver_app/issues/192
         instance = self.get_object()
         if instance.can_delete(self.request.user):
             return super(ProductViewSet, self).destroy(request, pk, *args, **kwargs)
diff --git a/backend/core/views/registry_product.py b/backend/core/views/registry_product.py
index bb84aed..8752d80 100644
--- a/backend/core/views/registry_product.py
+++ b/backend/core/views/registry_product.py
@@ -15,6 +15,7 @@ def __init__(self, product_id):
         self.log = self.get_log()
 
         self.log.info("----------------------------")
+        self.log.info("Product ID: [%s]" % product_id)
 
         self.product = Product.objects.get(pk=product_id)
diff --git a/backend/pzserver/settings.py b/backend/pzserver/settings.py
index 2e4ae38..f5613b0 100644
--- a/backend/pzserver/settings.py
+++ b/backend/pzserver/settings.py
@@ -299,3 +299,12 @@
         },
     },
 }
+
+# Directory that will contain the pipelines' processing outputs.
+PROCESSING_DIR = os.getenv("PROCESSING_DIR", "/processes")
+
+# Directory that will contain the pipelines' source code.
+PIPELINES_DIR = os.getenv("PIPELINES_DIR", "/pipelines")
+
+# Directory that will contain the datasets.
+DATASETS_DIR = os.getenv("DATASETS_DIR", "/datasets")
\ No newline at end of file
diff --git a/backend/pzserver/urls.py b/backend/pzserver/urls.py
index 9bbc18f..768a7ec 100644
--- a/backend/pzserver/urls.py
+++ b/backend/pzserver/urls.py
@@ -14,31 +14,22 @@
 2. Add a URL to urlpatterns:  path('blog/', include('blog.urls'))
 """
 # from core.api import viewsets as products_viewsets
-from core.views import (
-    CsrfToOauth,
-    GetToken,
-    LoggedUserView,
-    Logout,
-    ProductContentViewSet,
-    ProductFileViewSet,
-    ProductTypeViewSet,
-    ProductViewSet,
-    ReleaseViewSet,
-    UserViewSet,
-)
+from core.views import (CsrfToOauth, GetToken, LoggedUserView, Logout,
+                        PipelineViewSet, ProcessViewSet, ProductContentViewSet,
+                        ProductFileViewSet, ProductTypeViewSet, ProductViewSet,
+                        ReleaseViewSet, UserViewSet)
 from django.contrib import admin
 from django.urls import include, path
-from drf_spectacular.views import (
-    SpectacularAPIView,
-    SpectacularRedocView,
-    SpectacularSwaggerView,
-)
+from drf_spectacular.views import (SpectacularAPIView, SpectacularRedocView,
+                                   SpectacularSwaggerView)
 from rest_framework import routers
 
 route = routers.DefaultRouter()
 route.register(r"users", UserViewSet, basename="users")
 route.register(r"releases", ReleaseViewSet, basename="releases")
+route.register(r"pipelines", PipelineViewSet, basename="pipelines")
+route.register(r"processes", ProcessViewSet, basename="processes")
 route.register(r"product-types", ProductTypeViewSet, basename="product_types")
 route.register(r"products", ProductViewSet, basename="products")
 route.register(r"product-contents", ProductContentViewSet, basename="product_contents")
diff --git a/backend/requirements.txt b/backend/requirements.txt
index b785cb4..8b598c1 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -27,3 +27,4 @@ social-auth-core==4.5.4
 tables-io==0.9.6
 tables==3.9.2
 uWSGI==2.0.25.1
+pydantic==2.7.1
diff --git a/docker-compose-development.yml b/docker-compose-development.yml
index 3696fdd..770ad80 100644
--- a/docker-compose-development.yml
+++ b/docker-compose-development.yml
@@ -40,6 +40,34 @@ services:
     stdin_open: true
     command: yarn dev
 
+  orchestration:
+    extends:
+      file: ./orchestration/docker-compose.yml
+      service: orchestration
+
+  celery_local_worker:
+    extends:
+      file: ./orchestration/docker-compose.yml
+      service: celery_local_worker
+
+  celery_flower:
+    extends:
+      file: ./orchestration/docker-compose.yml
+      service: celery_flower
+
+  rabbitmq:
+    image: "rabbitmq:3.12.12-management"
+    hostname: "rabbitmq"
+    env_file:
+      - ./orchestration/.orchestration-env
+    ports:
+      - "15672:15672"
+      - "5672:5672"
+    volumes:
+      - "./orchestration/rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins"
+      - "./orchestration/rabbitmq/data/:/var/lib/rabbitmq/"
+      - "./orchestration/rabbitmq/log/:/var/log/rabbitmq/"
+
   nginx:
     image: nginx:1.21.6-alpine
     ports:
@@ -51,3 +79,6 @@
     depends_on:
       - backend
       - frontend
+      - orchestration
+      - rabbitmq
+      - celery_flower
\ No newline at end of file
diff --git a/env_template b/env_template
index 37e913f..1f01b94 100644
--- a/env_template
+++ b/env_template
@@ -47,4 +47,13 @@ DJANGO_CSRF_TRUSTED_ORIGINS=http://localhost http://127.0.0.1
 
 # Shibboleth / Satosa Auth
 # Url para login utilizando Shibboleth
-# AUTH_SHIB_URL=
\ No newline at end of file
+# AUTH_SHIB_URL=
+
+# Directory that will contain the pipelines' processing outputs.
+PROCESSING_DIR=/processes
+
+# Directory that will contain the pipelines' source code.
+PIPELINES_DIR=/pipelines
+
+# Directory that will contain the datasets.
+DATASETS_DIR=/datasets
\ No newline at end of file
diff --git a/nginx_development.conf b/nginx_development.conf
index 49d0f9c..1e5f1e6 100644
--- a/nginx_development.conf
+++ b/nginx_development.conf
@@ -2,13 +2,17 @@ upstream pzapi {
     server backend:8000;
 }
 
+upstream orchestapi {
+    server orchestration:8000;
+}
+
 upstream pzfrontend {
     server frontend:3000;
 }
 
 server {
-    listen 8080;
+    server_name localhost;
 
     client_max_body_size 200M;
@@ -25,7 +29,7 @@ server {
     fastcgi_read_timeout 120s;
 
     # access_log  /var/log/nginx/host.access.log  main;
-    
+
     # Proxy pass to frontend development server with live relead
     # Based in this article: https://nathanfriend.io/2018/05/14/live-reloading-an-angular-2-app-behind-nginx.html
     location / {
@@ -36,7 +40,7 @@
         proxy_set_header X-Forwarded-For $remote_addr;
         proxy_set_header X-Real-IP $remote_addr;
-        proxy_set_header Host $host; 
+        proxy_set_header Host $host;
 
         # live reload
         proxy_http_version 1.1;
@@ -50,7 +54,7 @@
         proxy_http_version 1.1;
         proxy_set_header Upgrade $http_upgrade;
         proxy_set_header Connection "upgrade";
-    } 
+    }
 
     location /api {
         include uwsgi_params;
@@ -77,5 +81,41 @@
         alias /var/www/coverage/;
         try_files $uri $uri/ /index.html;
         autoindex off;
-    }
-}
\ No newline at end of file
+    }
+
+    # Rabbitmq Management
+    location /rabbitmq/ {
+        proxy_pass http://rabbitmq:15672/;
+        rewrite ^/rabbitmq/(.*)$ /$1 break;
+    }
+}
+
+server {
+    listen 8080;
+    server_name orchestration;
+
+    location /api {
+        include uwsgi_params;
+        uwsgi_pass orchestapi;
+    }
+
+    location /admin {
+        include uwsgi_params;
+        uwsgi_pass orchestapi;
+    }
+
+    location /o {
+        include uwsgi_params;
+        uwsgi_pass orchestapi;
+    }
+
+    location /django_static {
+        include uwsgi_params;
+        uwsgi_pass orchestapi;
+    }
+
+    # Celery Flower
+    location /flower {
+        proxy_pass http://celery_flower:5555;
+    }
+}
diff --git a/orchestration/.orchestration-env b/orchestration/.orchestration-env
new file mode 100644
index 0000000..963cb22
--- /dev/null
+++ b/orchestration/.orchestration-env
@@ -0,0 +1,43 @@
+# Backend/Django
+# SECURITY WARNING: don't run with debug turned on in production!
+DEBUG=1
+LOGGING_LEVEL="DEBUG"
+AUTORELOAD=1
+
+# CORS
+DJANGO_ALLOWED_HOSTS="orchestration localhost 127.0.0.1 [::1]"
+ALLOWED_HOSTS="orchestration localhost 127.0.0.1 [::1]"
+DJANGO_CSRF_TRUSTED_ORIGINS="http://orchestration http://localhost http://127.0.0.1"
+
+# AMQP
+RABBITMQ_HOST="rabbitmq"
+# RABBITMQ_HOST="host-gateway"
+RABBITMQ_PORT="5672"
+RABBITMQ_ERLANG_COOKIE="SWQOKODSQALRPCLNMEQG"
+RABBITMQ_DEFAULT_USER="orcadmin"
+RABBITMQ_DEFAULT_PASS="adminorc"
+RABBITMQ_DEFAULT_VHOST="/"
+
+# Database
+# DB_ENGINE=django.db.backends.postgresql
+# DB_USER=orchadmin
+# DB_PASSWORD=adminorch
+# DB_DATABASE=orchestration
+# DB_HOST=database
+# DB_PORT=5432
+
+# SECURITY WARNING: keep the secret key used in production secret!
+SECRET_KEY=YvbMFaR6cJUB5x9PBK6KciWljqbavSfA7K9sqZD-cpM
+
+DB_DIR="/db"
+
+LOG_DIR="/logs"
+
+# Directory that will contain the pipelines' processing outputs.
+PROCESSING_DIR=/processes
+
+# Directory that will contain the pipelines' source code.
+PIPELINES_DIR=/pipelines
+
+# Directory that will contain the datasets.
+DATASETS_DIR=/datasets
diff --git a/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=0.parquet b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=0.parquet
new file mode 100644
index 0000000..90a0e15
Binary files /dev/null and b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=0.parquet differ
diff --git a/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=11.parquet b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=11.parquet
new file mode 100644
index 0000000..3e434eb
Binary files /dev/null and b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=11.parquet differ
diff --git a/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=4.parquet b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=4.parquet
new file mode 100644
index 0000000..f2c8e6a
Binary files /dev/null and b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=4.parquet differ
diff --git a/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=8.parquet b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=8.parquet
new file mode 100644
index 0000000..65207d6
Binary files /dev/null and b/orchestration/datasets/DatasetA/Norder=0/Dir=0/Npix=8.parquet differ
diff --git a/orchestration/datasets/DatasetA/_common_metadata b/orchestration/datasets/DatasetA/_common_metadata
new file mode 100644
index 0000000..787a029
Binary files /dev/null and b/orchestration/datasets/DatasetA/_common_metadata differ
diff --git a/orchestration/datasets/DatasetA/_metadata b/orchestration/datasets/DatasetA/_metadata
new file mode 100644
index 0000000..04bdc4b
Binary files /dev/null and b/orchestration/datasets/DatasetA/_metadata differ
diff --git a/orchestration/datasets/DatasetA/catalog_info.json b/orchestration/datasets/DatasetA/catalog_info.json
new file mode 100644
index 0000000..6debd90
--- /dev/null
+++ b/orchestration/datasets/DatasetA/catalog_info.json
@@ -0,0 +1,8 @@
+{
+    "catalog_name": "DatasetA",
+    "catalog_type": "object",
+    "total_rows": 100,
+    "epoch": "J2000",
+    "ra_column": "ra",
+    "dec_column": "dec"
+}
diff --git a/orchestration/datasets/DatasetA/point_map.fits b/orchestration/datasets/DatasetA/point_map.fits
new file mode 100644
index 0000000..ffa1298
Binary files /dev/null and b/orchestration/datasets/DatasetA/point_map.fits differ
diff --git a/orchestration/datasets/DatasetA/provenance_info.json b/orchestration/datasets/DatasetA/provenance_info.json
new file mode 100644
index 0000000..2303142
--- /dev/null
+++ b/orchestration/datasets/DatasetA/provenance_info.json
@@ -0,0 +1,47 @@
+{
+    "catalog_name": "DatasetA",
+    "catalog_type": "object",
+    "total_rows": 100,
+    "epoch": "J2000",
+    "ra_column": "ra",
+    "dec_column": "dec",
+    "version": "0.2.1",
+    "generation_date": "2024.02.20",
+    "tool_args": {
+        "tool_name": "hipscat_import",
+        "version": "0.2.1",
+        "runtime_args": {
+            "catalog_name": "DatasetA",
+            "output_path": "../data-sample/hipscat/",
+            "output_artifact_name": "DatasetA",
+            "tmp_dir": "",
+            "overwrite": true,
+            "dask_tmp": "",
+            "dask_n_workers": 4,
+            "dask_threads_per_worker": 1,
+            "catalog_path": "../data-sample/hipscat/DatasetA",
+            "tmp_path": "../data-sample/hipscat/DatasetA/intermediate",
+            "epoch": "J2000",
+            "catalog_type": "object",
+            "input_path": "../data-sample/raw/A",
+            "input_paths": [
+                "file:///home/singulani/projects/slurm_lsdb/import/../data-sample/raw/A/datasetA.parquet"
+            ],
+            "input_format": "parquet",
+            "input_file_list": [],
+            "ra_column": "ra",
+            "dec_column": "dec",
+            "use_hipscat_index": false,
+            "sort_columns": "id",
+            "constant_healpix_order": -1,
+            "highest_healpix_order": 7,
"pixel_threshold": 1000000, + "mapping_healpix_order": 7, + "debug_stats_only": false, + "file_reader_info": { + "input_reader_type": "ParquetReader", + "chunksize": 500000 + } + } + } +} diff --git a/orchestration/datasets/DatasetB/Norder=0/Dir=0/Npix=4.parquet b/orchestration/datasets/DatasetB/Norder=0/Dir=0/Npix=4.parquet new file mode 100644 index 0000000..2b589b5 Binary files /dev/null and b/orchestration/datasets/DatasetB/Norder=0/Dir=0/Npix=4.parquet differ diff --git a/orchestration/datasets/DatasetB/Norder=0/Dir=0/Npix=8.parquet b/orchestration/datasets/DatasetB/Norder=0/Dir=0/Npix=8.parquet new file mode 100644 index 0000000..372b90d Binary files /dev/null and b/orchestration/datasets/DatasetB/Norder=0/Dir=0/Npix=8.parquet differ diff --git a/orchestration/datasets/DatasetB/_common_metadata b/orchestration/datasets/DatasetB/_common_metadata new file mode 100644 index 0000000..fa44b6f Binary files /dev/null and b/orchestration/datasets/DatasetB/_common_metadata differ diff --git a/orchestration/datasets/DatasetB/_metadata b/orchestration/datasets/DatasetB/_metadata new file mode 100644 index 0000000..9036c27 Binary files /dev/null and b/orchestration/datasets/DatasetB/_metadata differ diff --git a/orchestration/datasets/DatasetB/catalog_info.json b/orchestration/datasets/DatasetB/catalog_info.json new file mode 100644 index 0000000..56f1340 --- /dev/null +++ b/orchestration/datasets/DatasetB/catalog_info.json @@ -0,0 +1,8 @@ +{ + "catalog_name": "DatasetB", + "catalog_type": "object", + "total_rows": 80, + "epoch": "J2000", + "ra_column": "ra", + "dec_column": "dec" +} diff --git a/orchestration/datasets/DatasetB/point_map.fits b/orchestration/datasets/DatasetB/point_map.fits new file mode 100644 index 0000000..0acdd0b Binary files /dev/null and b/orchestration/datasets/DatasetB/point_map.fits differ diff --git a/orchestration/datasets/DatasetB/provenance_info.json b/orchestration/datasets/DatasetB/provenance_info.json new file mode 100644 index 0000000..6f01c9d --- /dev/null +++ b/orchestration/datasets/DatasetB/provenance_info.json @@ -0,0 +1,47 @@ +{ + "catalog_name": "DatasetB", + "catalog_type": "object", + "total_rows": 80, + "epoch": "J2000", + "ra_column": "ra", + "dec_column": "dec", + "version": "0.2.1", + "generation_date": "2024.02.20", + "tool_args": { + "tool_name": "hipscat_import", + "version": "0.2.1", + "runtime_args": { + "catalog_name": "DatasetB", + "output_path": "../data-sample/hipscat/", + "output_artifact_name": "DatasetB", + "tmp_dir": "", + "overwrite": true, + "dask_tmp": "", + "dask_n_workers": 4, + "dask_threads_per_worker": 1, + "catalog_path": "../data-sample/hipscat/DatasetB", + "tmp_path": "../data-sample/hipscat/DatasetB/intermediate", + "epoch": "J2000", + "catalog_type": "object", + "input_path": "../data-sample/raw/B", + "input_paths": [ + "file:///home/singulani/projects/slurm_lsdb/import/../data-sample/raw/B/datasetB.parquet" + ], + "input_format": "parquet", + "input_file_list": [], + "ra_column": "ra", + "dec_column": "dec", + "use_hipscat_index": false, + "sort_columns": "z", + "constant_healpix_order": -1, + "highest_healpix_order": 7, + "pixel_threshold": 1000000, + "mapping_healpix_order": 7, + "debug_stats_only": false, + "file_reader_info": { + "input_reader_type": "ParquetReader", + "chunksize": 500000 + } + } + } +} diff --git a/orchestration/datasets/README b/orchestration/datasets/README new file mode 100644 index 0000000..a225db4 --- /dev/null +++ b/orchestration/datasets/README @@ -0,0 +1 @@ +# Directory that will contain the 
diff --git a/orchestration/datasets/README b/orchestration/datasets/README
new file mode 100644
index 0000000..a225db4
--- /dev/null
+++ b/orchestration/datasets/README
@@ -0,0 +1 @@
+# Directory that will contain the example datasets.
\ No newline at end of file
diff --git a/orchestration/pipelines/cross_lsdb_dev/VERSION b/orchestration/pipelines/cross_lsdb_dev/VERSION
new file mode 100644
index 0000000..8a9ecc2
--- /dev/null
+++ b/orchestration/pipelines/cross_lsdb_dev/VERSION
@@ -0,0 +1 @@
+0.0.1
\ No newline at end of file
diff --git a/orchestration/pipelines/cross_lsdb_dev/config.py b/orchestration/pipelines/cross_lsdb_dev/config.py
new file mode 100644
index 0000000..4961cd0
--- /dev/null
+++ b/orchestration/pipelines/cross_lsdb_dev/config.py
@@ -0,0 +1,51 @@
+from pydantic import BaseModel
+import os
+
+DATASETS_DIR = os.getenv("DATASETS_DIR", "/datasets")
+
+
+class Instance(BaseModel):
+    processes: int = 1
+    memory: str = "123GiB"
+    queue: str = "cpu"
+    job_extra_directives: list[str] = ["--propagate", "--time=2:00:00"]
+
+
+class Adapt(BaseModel):
+    maximum_jobs: int = 10
+
+
+class LIneASlurm(BaseModel):
+    instance: Instance = Instance()
+    adapt: Adapt = Adapt()
+
+
+class Local(BaseModel):
+    n_workers: int = 2
+    threads_per_worker: int = 2
+    memory_limit: str = "1GiB"
+
+
+class Inputs(BaseModel):
+    photo: str = f"{DATASETS_DIR}/DatasetA"
+    specz: str = f"{DATASETS_DIR}/DatasetB"
+
+
+class Executor(BaseModel):
+    local: Local = Local()
+    linea_slurm: LIneASlurm = LIneASlurm()
+
+
+class Config(BaseModel):
+    output_dir: str = "./output"
+    executor: Executor = Executor()
+    inputs: Inputs = Inputs()
+
+
+if __name__ == "__main__":
+    import yaml
+
+    cfg = Config()
+
+    with open('config.yml', 'w') as outfile:
+        yaml.dump(cfg.model_dump(), outfile)
diff --git a/orchestration/pipelines/cross_lsdb_dev/environment.yml b/orchestration/pipelines/cross_lsdb_dev/environment.yml
new file mode 100644
index 0000000..673503b
--- /dev/null
+++ b/orchestration/pipelines/cross_lsdb_dev/environment.yml
@@ -0,0 +1,14 @@
+name: pipe_cross_lsdb_dev
+channels:
+  - defaults
+dependencies:
+  - python=3.10
+  - pip:
+    - PyYaml
+    - dask==2024.1.0
+    - distributed==2024.1.0
+    - dask-jobqueue==0.8.2
+    - hipscat==0.2.1
+    - hipscat-import==0.2.1
+    - lsdb==0.1.0
+
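
`config.py` doubles as the pipeline's parameter schema and its default config generator: running it dumps a `config.yml` that `run-crossmatch` later reads back. A round-trip sketch, assuming pydantic v2 (which provides `model_dump`) and that it is run inside the pipeline directory:

``` python
# Round-trip sketch for config.py: generate the defaults, then read them
# back the way the pipeline does. Assumes pydantic v2 is installed.
import yaml
from config import Config

cfg = Config()
with open("config.yml", "w") as outfile:
    yaml.dump(cfg.model_dump(), outfile)

with open("config.yml", encoding="utf-8") as f:
    loaded = yaml.safe_load(f)

# Top-level keys consumed downstream: executor, inputs, output_dir.
print(sorted(loaded))             # ['executor', 'inputs', 'output_dir']
print(loaded["inputs"]["photo"])  # /datasets/DatasetA
```
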
diff --git a/orchestration/pipelines/cross_lsdb_dev/install.sh b/orchestration/pipelines/cross_lsdb_dev/install.sh
new file mode 100755
index 0000000..11d0265
--- /dev/null
+++ b/orchestration/pipelines/cross_lsdb_dev/install.sh
@@ -0,0 +1,34 @@
+#!/bin/bash --login
+
+source `dirname $CONDA_EXE`/activate || { echo "Failed to activate Conda environment"; exit 1; }
+
+if [ ! -d "$PIPELINES_DIR" ]; then
+    echo "Error: PIPELINES_DIR is not defined or is not a directory."
+    exit 1
+fi
+
+PIPE_BASE="$PIPELINES_DIR/cross_lsdb_dev"
+HASENV=`conda env list | grep 'pipe_cross_lsdb_dev '`
+
+if [ -z "$HASENV" ]; then
+    echo "Creating virtual environment..."
+    conda env create -f $PIPE_BASE/environment.yml
+    echo "Virtual environment created and packages installed."
+else
+    if [ "$CONDA_FORCE_UPDATE" == "yes" ]; then
+        echo "Virtual environment already exists. Updating..."
+        conda env update --file $PIPE_BASE/environment.yml --prune
+    fi
+fi
+
+conda activate pipe_cross_lsdb_dev
+
+export PATH=$PATH:"$PIPE_BASE/scripts/"
+
+if [ -z "$PYTHONPATH" ]; then
+    export PYTHONPATH="$PIPE_BASE/packages/"
+else
+    export PYTHONPATH=$PYTHONPATH:"$PIPE_BASE/packages/"
+fi
+
+echo "Conda Environment: $CONDA_DEFAULT_ENV"
diff --git a/orchestration/pipelines/cross_lsdb_dev/packages/__init__.py b/orchestration/pipelines/cross_lsdb_dev/packages/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/orchestration/pipelines/cross_lsdb_dev/packages/executor.py b/orchestration/pipelines/cross_lsdb_dev/packages/executor.py
new file mode 100755
index 0000000..fcb6746
--- /dev/null
+++ b/orchestration/pipelines/cross_lsdb_dev/packages/executor.py
@@ -0,0 +1,47 @@
+"""Dask executor selection for the cross_lsdb_dev pipeline."""
+
+from dask.distributed import LocalCluster
+from dask_jobqueue import SLURMCluster
+from utils import load_yml
+import logging
+from typing import Union
+
+
+def get_executor_config(
+    executor_key: str, config_file: str
+) -> Union[LocalCluster, SLURMCluster]:
+    """Returns the cluster on which the pipeline will run.
+
+    Args:
+        executor_key (str): executor key
+        config_file (str): config path
+
+    Returns:
+        Union[LocalCluster, SLURMCluster]: Executor object
+    """
+
+    logger = logging.getLogger()
+    logger.info("Getting executor config: %s", executor_key)
+
+    configs = load_yml(config_file)
+
+    try:
+        config = configs["executor"][executor_key]
+    except KeyError:
+        logger.warning("Executor key not found. Using minimal local config.")
+        executor_key = "minimal"
+
+    match executor_key:
+        case "local":
+            cluster = LocalCluster(**config)
+        case "linea_slurm":
+            icfg = config["instance"]
+            cluster = SLURMCluster(**icfg)
+            cluster.adapt(**config["adapt"])
+        case _:
+            cluster = LocalCluster(
+                n_workers=1,
+                threads_per_worker=1,
+            )
+
+    return cluster
diff --git a/orchestration/pipelines/cross_lsdb_dev/packages/utils.py b/orchestration/pipelines/cross_lsdb_dev/packages/utils.py
new file mode 100755
index 0000000..2ce2487
--- /dev/null
+++ b/orchestration/pipelines/cross_lsdb_dev/packages/utils.py
@@ -0,0 +1,46 @@
+"""Logging and YAML helpers for the cross_lsdb_dev pipeline."""
+
+import yaml
+import logging
+import os
+import pathlib
+from typing import Any
+
+
+def setup_logger(name="pipeline-logger"):
+    """
+    Configures the logger for recording events and messages.
+
+    Returns:
+        logging.Logger: Configured logger instance.
+    """
+
+    logger = logging.getLogger(name)
+    logger.setLevel(logging.DEBUG)
+
+    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+
+    logdir = os.getenv("LOG_DIR", ".")
+
+    file_handler = logging.FileHandler(pathlib.Path(logdir, f"{name}.log"))
+    file_handler.setLevel(logging.DEBUG)
+    file_handler.setFormatter(formatter)
+
+    logger.addHandler(file_handler)
+
+    return logger
+
+
+def load_yml(filepath: str) -> Any:
+    """Load yaml file
+
+    Args:
+        filepath (str): filepath
+
+    Returns:
+        Any: yaml file content
+    """
+    with open(filepath, encoding="utf-8") as _file:
+        content = yaml.safe_load(_file)
+
+    return content
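
Worth noting from `executor.py`: an executor key missing from `config.yml` does not abort the run; it degrades to a one-worker `LocalCluster`. A sketch of this fallback, assuming a `config.yml` generated by `config.py` as above (the `linea_slurm` branch additionally needs a SLURM environment, so it is not exercised here):

``` python
# Fallback sketch for get_executor_config(): an unknown key degrades to a
# minimal one-worker LocalCluster instead of raising.
from executor import get_executor_config

local = get_executor_config("local", "config.yml")    # LocalCluster(**executor.local)
other = get_executor_config("missing", "config.yml")  # 1 worker, 1 thread
print(type(local).__name__, type(other).__name__)     # LocalCluster LocalCluster
```
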
-d "$PIPELINES_DIR" ]; then + echo "Error: PIPELINES_DIR not defined." + exit 1 +fi + +INSTALL_PIPE="$PIPELINES_DIR/cross_lsdb_dev/install.sh" + +if [ ! -f "$INSTALL_PIPE" ]; then + echo "Error: Installation script not found." + exit 1 +fi + +# Installing pipeline +echo "Installing pipeline..." +. "$INSTALL_PIPE" + +set -xe + +# Run the Python code with the given argument +# run-crossmatch $ARGS || { echo "Failed to run-crossmatch"; exit 1; } +run-crossmatch $ARGS + +echo $? >> return.code + +echo "Done." \ No newline at end of file diff --git a/orchestration/pipelines/cross_lsdb_dev/scripts/run-crossmatch b/orchestration/pipelines/cross_lsdb_dev/scripts/run-crossmatch new file mode 100755 index 0000000..831ec92 --- /dev/null +++ b/orchestration/pipelines/cross_lsdb_dev/scripts/run-crossmatch @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 + +import argparse +import time +import os +from pathlib import Path +from dask.distributed import Client +import lsdb + +from utils import setup_logger, load_yml +from executor import get_executor_config + + +def run(config_file): + """Run lsdb crossmatch + + Args: + config_file (str): lsdb parameters + """ + + logger = setup_logger(name="cross-lsdb") + + start_time_full = time.time() + + # Loading configurations + pipe_config = load_yml(config_file) + param = pipe_config.get("inputs") + logger.info("Parameters: %s", param) + + executor_key = os.getenv("DASK_EXECUTOR_KEY", "local") + cluster = get_executor_config(executor_key, config_file) + + with Client(cluster): + phot_dp0 = lsdb.read_hipscat(param.get("photo")) + spec_dp0 = lsdb.read_hipscat(param.get("specz")) + + cross = spec_dp0.crossmatch(phot_dp0) + data = cross.compute() + + os.makedirs(pipe_config.get("output_dir"), exist_ok=True) + outputfile = Path(pipe_config.get("output_dir"), "cross-output.parquet") + data.to_parquet(outputfile) + + logger.info("--> Object Count: \n%s", str(data.count())) + + cluster.close() + + logger.info("Time elapsed: %s", str(time.time() - start_time_full)) + + +if __name__ == "__main__": + # Create the parser and add arguments + parser = argparse.ArgumentParser() + parser.add_argument(dest="config_path", help="yaml config path") + + args = parser.parse_args() + config_path = args.config_path + + # Run pipeline + run(config_path) diff --git a/orchestration/pipelines/load_pipelines.sh b/orchestration/pipelines/load_pipelines.sh new file mode 100755 index 0000000..1749f37 --- /dev/null +++ b/orchestration/pipelines/load_pipelines.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +cat << EOF > ${PIPELINES_DIR}/pipelines.yaml +cross_lsdb_dev: + display_name: 'LSDB Crossmatch (dev)' + path: '${PIPELINES_DIR}/cross_lsdb_dev' + executor: 'local' # only to orchestration + runner: 'bash' + executable: 'run.sh' + schema_config: '${PIPELINES_DIR}/cross_lsdb_dev/config.py' + version: '0.0.1' +EOF \ No newline at end of file diff --git a/orchestration/pipelines/pipelines.yaml b/orchestration/pipelines/pipelines.yaml new file mode 100644 index 0000000..18c911c --- /dev/null +++ b/orchestration/pipelines/pipelines.yaml @@ -0,0 +1,8 @@ +cross_lsdb_dev: + display_name: 'LSDB Crossmatch (dev)' + path: '/pipelines/cross_lsdb_dev' + executor: 'local' # only to orchestration + runner: 'bash' + executable: 'run.sh' + schema_config: '/pipelines/cross_lsdb_dev/config.py' + version: '0.0.1' diff --git a/orchestration/pipelines/pipelines.yaml.template b/orchestration/pipelines/pipelines.yaml.template new file mode 100644 index 0000000..18c911c --- /dev/null +++ b/orchestration/pipelines/pipelines.yaml.template @@ 
diff --git a/orchestration/pipelines/pipelines.yaml.template b/orchestration/pipelines/pipelines.yaml.template
new file mode 100644
index 0000000..18c911c
--- /dev/null
+++ b/orchestration/pipelines/pipelines.yaml.template
@@ -0,0 +1,8 @@
+cross_lsdb_dev:
+  display_name: 'LSDB Crossmatch (dev)'
+  path: '/pipelines/cross_lsdb_dev'
+  executor: 'local' # used only by the orchestration
+  runner: 'bash'
+  executable: 'run.sh'
+  schema_config: '/pipelines/cross_lsdb_dev/config.py'
+  version: '0.0.1'
diff --git a/orchestration/rabbitmq/enabled_plugins b/orchestration/rabbitmq/enabled_plugins
new file mode 100644
index 0000000..2843682
--- /dev/null
+++ b/orchestration/rabbitmq/enabled_plugins
@@ -0,0 +1 @@
+[rabbitmq_management, rabbitmq_management_visualiser].
\ No newline at end of file
diff --git a/pipeline.py b/pipeline.py
new file mode 100644
index 0000000..40b3872
--- /dev/null
+++ b/pipeline.py
@@ -0,0 +1,16 @@
+from core.models import ProductType
+from django.db import models
+
+
+class Pipeline(models.Model):
+
+    name = models.CharField(max_length=255, unique=True)
+    display_name = models.CharField(max_length=255, null=True, blank=True)
+    description = models.TextField(null=True, blank=True)
+    created_at = models.DateTimeField(auto_now_add=True)
+    use_release = models.BooleanField(default=False)
+    product_types = models.ManyToManyField(ProductType, related_name="pipelines")
+    default_config = models.JSONField(null=True, blank=True)
+
+    def __str__(self):
+        return f"{self.name}"
\ No newline at end of file
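
The root-level `pipeline.py` appears to be a reference copy of the model rather than the registered one: the `Pipeline` created by migration `0029_pipeline` carries `version` and `system_config`, while this file declares `use_release`, `product_types`, and `default_config`. Assuming the fields as declared above, a Django shell sketch of registering the dev pipeline (hypothetical data):

``` python
# Django shell sketch for the Pipeline model as declared above; `name` is
# unique, so get_or_create keeps this idempotent.
from core.models import Pipeline

pipe, created = Pipeline.objects.get_or_create(
    name="cross_lsdb_dev",
    defaults={"display_name": "LSDB Crossmatch (dev)"},
)
print(pipe, "created" if created else "already registered")
```
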