Added Combine Specz pipeline (#295)
* Added Combine Specz pipeline

* changed submodule branch

* changed submodule branch

* changed submodule branch
crisingulani committed Aug 28, 2024
1 parent a950db3 commit adcbb49
Showing 9 changed files with 208 additions and 91 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
@@ -1,4 +1,4 @@
[submodule "orchestration/pipelines"]
path = orchestration/pipelines
url = https://github.com/linea-it/pzserver_pipelines
branch = training_set_maker
branch = main
60 changes: 58 additions & 2 deletions backend/core/fixtures/initial_data.yaml
@@ -5,6 +5,7 @@
    display_name: Small Dataset
    description: Small dataset for example runs
    created_at: 2022-05-18 15:36:16.234786+00:00
    indexing_column: id
- model: core.producttype
  pk: 1
  fields:
@@ -45,6 +46,61 @@
    version: 0.0.1
    description: Training Set Maker pipeline
    created_at: 2022-05-18 15:36:59.830913+00:00
    system_config: {"executor": {"local": {"n_workers": 2, "threads_per_worker": 2, "memory_limit": "1GiB"}, "linea-slurm": {"instance": {"cores": 54, "processes": 1, "memory": "123GiB", "queue": "cpu", "job_extra_directives": ["--propagate", "--time=2:00:00"]}, "adapt": {"maximum_jobs": 10}}}, "inputs": {"dataset": {"path": "/datasets/mini_dataset"}, "specz": [{"path": "/datasets/specz.parquet", "columns": {"ra": "ra", "dec": "dec"}}]}, "output_dir": "outputs", "param": {"suffixes": ["_specz", "_dataset"], "output_catalog_name": "tsm_cross_001", "radius_arcsec": 1, "n_neighbors": 1}}
    system_config:
      executor:
        name: "local"
        args:
          n_workers: 2
          threads_per_worker: 2
          memory_limit: "1GiB"
      inputs:
        dataset:
          path: "/datasets/data-example/mini_dataset"
          columns:
            id: "id"
        specz:
          - path: "/datasets/data-example/specz.parquet"
            columns:
              ra: "ra"
              dec: "dec"
      output_dir: "outputs"
      param:
        duplicate_criteria: "closest"
        crossmatch:
          output_catalog_name: "tsm_cross_001"
          radius_arcsec: 1.0
          n_neighbors: 1
    product_types_accepted: [1]
    output_product_type: 2
    output_product_type: 2
- model: core.pipeline
  pk: 2
  fields:
    name: combine_specz
    display_name: Combine Specz Catalogs
    version: 0.0.1
    description: Combine Specz Catalogs pipeline
    created_at: 2022-05-18 15:36:59.830913+00:00
    system_config:
      executor:
        name: "local" # or "slurm"
        args:
          n_workers: 2
          threads_per_worker: 2
          memory_limit: "1GiB"
      inputs:
        specz:
          - path: "/datasets/specz.parquet"
            columns:
              ra: "ra"
              dec: "dec"
              z: "z"
          - path: "/datasets/specz.parquet"
            columns:
              ra: "ra"
              dec: "dec"
              z: "z"
      output_dir: "outputs"
      param:
        debug: true
    product_types_accepted: [1]
    output_product_type: 1
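
The block above replaces the old single-line JSON system_config with nested YAML and adds a second core.pipeline fixture for Combine Specz. As a quick orientation aid, and not part of the commit, the sketch below reads the fixture with PyYAML and inspects the combine_specz entry; the file path and the direct yaml.safe_load call are assumptions, since the backend actually loads this fixture through Django's loaddata.

# Illustrative only: peek at the combine_specz pipeline entry in the fixture.
# Assumes PyYAML is installed and that the path below points at the fixture file.
import yaml

with open("backend/core/fixtures/initial_data.yaml") as fh:
    fixture = yaml.safe_load(fh)

pipelines = [entry for entry in fixture if entry.get("model") == "core.pipeline"]
combine = next(p for p in pipelines if p["fields"]["name"] == "combine_specz")

config = combine["fields"]["system_config"]
print(config["executor"]["name"])       # "local"
print(config["executor"]["args"])       # {'n_workers': 2, 'threads_per_worker': 2, ...}
print(len(config["inputs"]["specz"]))   # 2 spec-z catalogs to combine
print(config["param"]["debug"])         # True
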
19 changes: 19 additions & 0 deletions backend/core/migrations/0039_release_indexing_column.py
@@ -0,0 +1,19 @@
# Generated by Django 5.0.6 on 2024-08-26 20:01

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('core', '0038_alter_process_path_alter_release_name'),
    ]

    operations = [
        migrations.AddField(
            model_name='release',
            name='indexing_column',
            field=models.CharField(default='id', max_length=255),
            preserve_default=False,
        ),
    ]
2 changes: 2 additions & 0 deletions backend/core/models/release.py
@@ -6,6 +6,8 @@ class Release(models.Model):
    display_name = models.CharField(max_length=255)
    description = models.TextField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    indexing_column = models.CharField(max_length=255)


    def __str__(self):
        return f"{self.display_name}"
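
The new indexing_column field records which column of a release's dataset acts as its index; the fixture above sets it to id for the Small Dataset release. The consumer of this value is not part of this diff (it presumably lives in the pipelines submodule), so the snippet below is only a hedged sketch of the intended use; the parquet path, file name, and pandas usage are assumptions.

# Illustrative only: how a consumer might use the new Release.indexing_column.
# The real consumer is outside this diff; the dataset path is hypothetical.
import pandas as pd
from core.models import Release

release = Release.objects.get(display_name="Small Dataset")
df = pd.read_parquet("/datasets/mini_dataset/part-0.parquet")  # hypothetical file
df = df.set_index(release.indexing_column)  # e.g. "id", per the fixture above
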
67 changes: 51 additions & 16 deletions backend/core/tasks.py
@@ -11,21 +11,56 @@
from django.conf import settings
from django.utils import dateparse, timezone

logger = logging.getLogger('beat')
logger = logging.getLogger("beat")
maestro = Maestro(settings.ORCHEST_URL)


@shared_task()
def check_stopping():
    logger.info("Checking processes stopping...")

    procs_updated = []
    monitoring_statuses = ["Stopping"]
    procs_stopping = Process.objects.filter(status__in=monitoring_statuses)

    for proc in procs_stopping:
        logger.info(f"Consulting the {str(proc)} process status.")
        proc_orches_id = proc.orchestration_process_id  # type: ignore

        if not proc_orches_id:
            message = f"Process {str(proc.pk)} without Orchestration ID."
            logger.error(message)
            proc.status = "Failed"
            proc = update_dates(proc, {})
            proc.save()
            continue

        proc_orchest = maestro.status(proc_orches_id)
        proc_orchest_status = proc_orchest.get("status")  # type: ignore

        logger.info(f"-> Process orchestration ID: {proc_orches_id}")
        logger.info(f"-> Status: {proc_orchest_status}")

        if proc_orchest_status not in monitoring_statuses:
            proc.status = proc_orchest_status
            proc.save()
            logger.info(f"-> Process {str(proc)} updated.")
            procs_updated.append(proc_orches_id)

    return procs_updated


@shared_task()
def check_processes_finish():
    logger.info("Checking running processes...")

    procs_updated = []
    active_statuses = ['Pending', 'Running']
    active_statuses = ["Pending", "Running"]
    procs_running = Process.objects.filter(status__in=active_statuses)

    for proc in procs_running:
        logger.info(f"Consulting the {str(proc)} process status.")
        proc_orches_id = proc.orchestration_process_id # type: ignore
        proc_orches_id = proc.orchestration_process_id  # type: ignore

        if not proc_orches_id:
            message = f"Process {str(proc.pk)} without Orchestration ID."
@@ -36,13 +71,13 @@ def check_processes_finish():
            continue

        proc_orchest = maestro.status(proc_orches_id)
        proc_orchest_status = proc_orchest.get('status') # type: ignore
        proc_orchest_status = proc_orchest.get("status")  # type: ignore

        logger.info(f"-> Process orchestration ID: {proc_orches_id}")
        logger.info(f"-> Status: {proc_orchest_status}")
        if proc_orchest_status == 'Running' and not proc.status:
            started_at = proc_orchest.get('started_at', str(proc.created_at))

        if proc_orchest_status == "Running" and not proc.status:
            started_at = proc_orchest.get("started_at", str(proc.created_at))
            proc.started_at = dateparse.parse_datetime(started_at)
            proc.save()

@@ -58,8 +93,8 @@


def update_dates(process, data):
    started_at = data.get('started_at', str(process.created_at))
    ended_at = data.get('ended_at', str(timezone.now()))
    started_at = data.get("started_at", str(process.created_at))
    ended_at = data.get("ended_at", str(timezone.now()))
    process.started_at = dateparse.parse_datetime(started_at)
    process.ended_at = dateparse.parse_datetime(ended_at)
    return process
@@ -82,18 +117,18 @@ def register_outputs(process_id):
    reg_product = RegistryProduct(process.upload.pk)

    process_file_dict = load_yaml(process_file)
    outputs = process_file_dict.get('outputs', None)
    outputs = process_file_dict.get("outputs", None)

    try:
        for output in outputs:
            logger.debug('-> output: %s', output)
            filepath = output.get('path')
            rolename = output.get('role')
            role_id = file_roles.get(rolename, file_roles.get('description'))
            logger.debug("-> output: %s", output)
            filepath = output.get("path")
            rolename = output.get("role")
            role_id = file_roles.get(rolename, file_roles.get("description"))
            upload_path = copy_upload(filepath, process.upload.path)
            reg_product.create_product_file(upload_path, role_id)
            process.upload.save()

        reg_product.registry()
        process.upload.status = 1  # Published status
        process.upload.save()
@@ -108,7 +143,7 @@ def register_outputs(process_id):
def copy_upload(filepath, upload_dir):
    filepath = pathlib.Path(filepath)
    new_filepath = pathlib.Path(settings.MEDIA_ROOT, upload_dir, filepath.name)
    logger.debug('new_filepath -> %s', str(new_filepath))
    logger.debug("new_filepath -> %s", str(new_filepath))
    shutil.copyfile(str(filepath), str(new_filepath))
    return str(new_filepath)

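The new check_stopping task mirrors check_processes_finish: it polls every Process whose status is "Stopping", asks the orchestrator (Maestro) for the current state, writes the final status back, and marks processes without an orchestration ID as Failed. The beat schedule that triggers these tasks is not part of this diff, so the following is only a minimal sketch of how check_stopping might be registered with Celery beat; the app name and intervals are assumptions.

# Illustrative only: wiring the new check_stopping task into Celery beat.
# The project's real schedule lives in its Celery/settings module, which is
# not shown in this diff; the app name and intervals below are assumptions.
from celery import Celery

app = Celery("pzserver")

app.conf.beat_schedule = {
    "check-processes-finish": {
        "task": "core.tasks.check_processes_finish",
        "schedule": 60.0,  # poll Pending/Running processes every minute
    },
    "check-stopping": {
        "task": "core.tasks.check_stopping",
        "schedule": 30.0,  # poll processes the user asked to stop
    },
}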