Added Combine Specz pipeline #295

Merged 5 commits on Aug 28, 2024
2 changes: 1 addition & 1 deletion .gitmodules
@@ -1,4 +1,4 @@
[submodule "orchestration/pipelines"]
path = orchestration/pipelines
url = https://github.com/linea-it/pzserver_pipelines
branch = training_set_maker
branch = main
60 changes: 58 additions & 2 deletions backend/core/fixtures/initial_data.yaml
@@ -5,6 +5,7 @@
display_name: Small Dataset
description: Small dataset for example runs
created_at: 2022-05-18 15:36:16.234786+00:00
indexing_column: id
- model: core.producttype
pk: 1
fields:
@@ -45,6 +46,61 @@
version: 0.0.1
description: Training Set Maker pipeline
created_at: 2022-05-18 15:36:59.830913+00:00
system_config: {"executor": {"local": {"n_workers": 2, "threads_per_worker": 2, "memory_limit": "1GiB"}, "linea-slurm": {"instance": {"cores": 54, "processes": 1, "memory": "123GiB", "queue": "cpu", "job_extra_directives": ["--propagate", "--time=2:00:00"]}, "adapt": {"maximum_jobs": 10}}}, "inputs": {"dataset": {"path": "/datasets/mini_dataset"}, "specz": [{"path": "/datasets/specz.parquet", "columns": {"ra": "ra", "dec": "dec"}}]}, "output_dir": "outputs", "param": {"suffixes": ["_specz", "_dataset"], "output_catalog_name": "tsm_cross_001", "radius_arcsec": 1, "n_neighbors": 1}}
system_config:
executor:
name: "local"
args:
n_workers: 2
threads_per_worker: 2
memory_limit: "1GiB"
inputs:
dataset:
path: "/datasets/data-example/mini_dataset"
columns:
id: "id"
specz:
- path: "/datasets/data-example/specz.parquet"
columns:
ra: "ra"
dec: "dec"
output_dir: "outputs"
param:
duplicate_criteria: "closest"
crossmatch:
output_catalog_name: "tsm_cross_001"
radius_arcsec: 1.0
n_neighbors: 1
product_types_accepted: [1]
output_product_type: 2
output_product_type: 2
- model: core.pipeline
pk: 2
fields:
name: combine_specz
display_name: Combine Specz Catalogs
version: 0.0.1
description: Combine Specz Catalogs pipeline
created_at: 2022-05-18 15:36:59.830913+00:00
system_config:
executor:
name: "local" # or "slurm"
args:
n_workers: 2
threads_per_worker: 2
memory_limit: "1GiB"
inputs:
specz:
- path: "/datasets/specz.parquet"
columns:
ra: "ra"
dec: "dec"
z: "z"
- path: "/datasets/specz.parquet"
columns:
ra: "ra"
dec: "dec"
z: "z"
output_dir: "outputs"
param:
debug: true
product_types_accepted: [1]
output_product_type: 1
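The fixture now stores each pipeline's system_config as structured YAML instead of a single inline JSON string, and registers the new combine_specz pipeline as a second core.pipeline entry (pk 2). A minimal sketch of loading the fixture and reading the executor settings back; the import path and the dict-style access to system_config are assumptions based on the layout above, not code from this PR:

    from django.core.management import call_command
    from core.models import Pipeline  # assumed import path

    # Load backend/core/fixtures/initial_data.yaml into the database.
    call_command("loaddata", "initial_data")

    combine = Pipeline.objects.get(name="combine_specz")
    executor_name = combine.system_config["executor"]["name"]           # "local" in the fixture
    n_workers = combine.system_config["executor"]["args"]["n_workers"]  # 2 in the fixture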
19 changes: 19 additions & 0 deletions backend/core/migrations/0039_release_indexing_column.py
@@ -0,0 +1,19 @@
# Generated by Django 5.0.6 on 2024-08-26 20:01

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('core', '0038_alter_process_path_alter_release_name'),
]

operations = [
migrations.AddField(
model_name='release',
name='indexing_column',
field=models.CharField(default='id', max_length=255),
preserve_default=False,
),
]
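This migration backfills existing Release rows with "id" and then, because preserve_default=False, drops that default from the field definition, so new rows are expected to set indexing_column explicitly. A minimal sketch of applying it programmatically, equivalent to running python manage.py migrate core:

    from django.core.management import call_command

    # Apply the pending core migrations, including 0039_release_indexing_column.
    call_command("migrate", "core")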
2 changes: 2 additions & 0 deletions backend/core/models/release.py
@@ -6,6 +6,8 @@ class Release(models.Model):
display_name = models.CharField(max_length=255)
description = models.TextField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
indexing_column = models.CharField(max_length=255)


def __str__(self):
return f"{self.display_name}"
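A hypothetical usage sketch of the new field, reading the indexing column off a release loaded from the fixture above; the core.models import path and the lookup by display_name are assumptions for illustration:

    from core.models import Release  # assumed import path

    release = Release.objects.get(display_name="Small Dataset")
    index_col = release.indexing_column  # "id", as set by the fixture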
67 changes: 51 additions & 16 deletions backend/core/tasks.py
@@ -11,21 +11,56 @@
from django.conf import settings
from django.utils import dateparse, timezone

logger = logging.getLogger('beat')
logger = logging.getLogger("beat")
maestro = Maestro(settings.ORCHEST_URL)


@shared_task()
def check_stopping():
logger.info("Checking processes stopping...")

procs_updated = []
monitoring_statuses = ["Stopping"]
procs_stopping = Process.objects.filter(status__in=monitoring_statuses)

for proc in procs_stopping:
logger.info(f"Consulting the {str(proc)} process status.")
proc_orches_id = proc.orchestration_process_id # type: ignore

if not proc_orches_id:
message = f"Process {str(proc.pk)} without Orchestration ID."
logger.error(message)
proc.status = "Failed"
proc = update_dates(proc, {})
proc.save()
continue

proc_orchest = maestro.status(proc_orches_id)
proc_orchest_status = proc_orchest.get("status") # type: ignore

logger.info(f"-> Process orchestration ID: {proc_orches_id}")
logger.info(f"-> Status: {proc_orchest_status}")

if not proc_orchest_status in monitoring_statuses:
proc.status = proc_orchest_status
proc.save()
logger.info(f"-> Process {str(proc)} updated.")
procs_updated.append(proc_orches_id)

return procs_updated


@shared_task()
def check_processes_finish():
logger.info("Checking running processes...")

procs_updated = []
active_statuses = ['Pending', 'Running']
active_statuses = ["Pending", "Running"]
procs_running = Process.objects.filter(status__in=active_statuses)

for proc in procs_running:
logger.info(f"Consulting the {str(proc)} process status.")
proc_orches_id = proc.orchestration_process_id # type: ignore
proc_orches_id = proc.orchestration_process_id # type: ignore

if not proc_orches_id:
message = f"Process {str(proc.pk)} without Orchestration ID."
@@ -36,13 +71,13 @@ def check_processes_finish():
continue

proc_orchest = maestro.status(proc_orches_id)
proc_orchest_status = proc_orchest.get('status') # type: ignore
proc_orchest_status = proc_orchest.get("status") # type: ignore

logger.info(f"-> Process orchestration ID: {proc_orches_id}")
logger.info(f"-> Status: {proc_orchest_status}")
if proc_orchest_status == 'Running' and not proc.status:
started_at = proc_orchest.get('started_at', str(proc.created_at))

if proc_orchest_status == "Running" and not proc.status:
started_at = proc_orchest.get("started_at", str(proc.created_at))
proc.started_at = dateparse.parse_datetime(started_at)
proc.save()

@@ -58,8 +93,8 @@


def update_dates(process, data):
started_at = data.get('started_at', str(process.created_at))
ended_at = data.get('ended_at', str(timezone.now()))
started_at = data.get("started_at", str(process.created_at))
ended_at = data.get("ended_at", str(timezone.now()))
process.started_at = dateparse.parse_datetime(started_at)
process.ended_at = dateparse.parse_datetime(ended_at)
return process
@@ -82,18 +117,18 @@ def register_outputs(process_id):
reg_product = RegistryProduct(process.upload.pk)

process_file_dict = load_yaml(process_file)
outputs = process_file_dict.get('outputs', None)
outputs = process_file_dict.get("outputs", None)

try:
for output in outputs:
logger.debug('-> output: %s', output)
filepath = output.get('path')
rolename = output.get('role')
role_id = file_roles.get(rolename, file_roles.get('description'))
logger.debug("-> output: %s", output)
filepath = output.get("path")
rolename = output.get("role")
role_id = file_roles.get(rolename, file_roles.get("description"))
upload_path = copy_upload(filepath, process.upload.path)
reg_product.create_product_file(upload_path, role_id)
process.upload.save()

reg_product.registry()
process.upload.status = 1 # Published status
process.upload.save()
@@ -108,7 +143,7 @@ def register_outputs(process_id):
def copy_upload(filepath, upload_dir):
filepath = pathlib.Path(filepath)
new_filepath = pathlib.Path(settings.MEDIA_ROOT, upload_dir, filepath.name)
logger.debug('new_filepath -> %s', str(new_filepath))
logger.debug("new_filepath -> %s", str(new_filepath))
shutil.copyfile(str(filepath), str(new_filepath))
return str(new_filepath)

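The new check_stopping task mirrors check_processes_finish but only watches processes whose status is "Stopping": a process with no orchestration ID is marked Failed, otherwise the orchestrator's status is copied back once it leaves the monitoring list. Being a @shared_task, it is meant to be triggered periodically; a hedged sketch of how such an entry could look in the Celery beat schedule (key name and interval are illustrative, not taken from this PR):

    # Illustrative beat schedule entry; the real project settings may differ.
    CELERY_BEAT_SCHEDULE = {
        "check-stopping-processes": {
            "task": "core.tasks.check_stopping",
            "schedule": 30.0,  # seconds between polls (assumed)
        },
    }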