Skip to content

Commit

Permalink
Error in validating of executors configuration (#297)
Browse files Browse the repository at this point in the history
* Added Combine Specz pipeline

* changed submodule branch

* changed submodule branch

* changed submodule branch

* Fixed error in validating of executors configuration

* Fixed small error
  • Loading branch information
crisingulani authored Sep 4, 2024
1 parent 078cec6 commit d859c9c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
27 changes: 17 additions & 10 deletions backend/core/views/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ def create(self, request):
serializer.is_valid(raise_exception=True)

try:
logger.debug(f"Create DB process: {request.data}")
logger.debug("Create DB process: %s", request.data)
instance = self.perform_create(serializer)
process = Process.objects.get(pk=instance.pk)
process.save()
logger.debug(f"Process ID {instance.pk} inserted.")
logger.debug("Process ID %s inserted.", instance.pk)
except Exception as err:
content = {"error": str(err)}
logger.error(err)
return Response(content, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

try:
orch_url = settings.ORCHEST_URL
logger.debug(f"Instantiating maestro: {orch_url}")
logger.debug("Instantiating maestro: %s", orch_url)
maestro = Maestro(url=orch_url)

release_path = None
Expand All @@ -96,13 +96,14 @@ def create(self, request):
pathlib.Path(settings.DATASETS_DIR, process.release.name)
)
release_index_col = process.release.indexing_column
logger.debug(f"Release: {process.release}")
logger.debug("Release: %s", process.release)

used_config = {}

if process.used_config:
used_config = process.used_config
logger.debug(f"Config: {used_config}")

logger.debug("Config: %s", used_config)

_inputs = process.inputs.all()
inputfiles = []
Expand All @@ -117,22 +118,28 @@ def create(self, request):
dec = self.__get_mapped_column(_input, "Dec")
z = self.__get_mapped_column(_input, "z")

_file = {"path": str(filepath), "columns": {"ra": ra, "dec": dec, "z": z}}
_file = {
"path": str(filepath),
"columns": {"ra": ra, "dec": dec, "z": z},
}
inputfiles.append(_file)

used_config["inputs"] = {
"dataset": {"path": release_path, "columns": {"id": release_index_col}},
"specz": inputfiles,
}

logger.debug(f"Inputs: {used_config.get('inputs')}")
logger.debug("Inputs: %s", used_config.get("inputs"))

orch_process = maestro.start(
pipeline=process.pipeline.name, config=used_config
)
logger.debug(f"Process submitted: ORCH_ID {process.orchestration_process_id}")

process.orchestration_process_id = orch_process.get("id")
orch_process_id = orch_process.get("id")

logger.debug("Process submitted: ORCH_ID %s", orch_process_id)

process.orchestration_process_id = orch_process_id
process.used_config = json.loads(orch_process.get("used_config", None))
process.path = orch_process.get("path_str")
process.save()
Expand Down
4 changes: 4 additions & 0 deletions orchestration/datasets/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*
*/
!.gitignore
!mini_dataset
2 changes: 1 addition & 1 deletion orchestration/pipelines

0 comments on commit d859c9c

Please sign in to comment.