SITCOM-863: Implement core functionality #2

Merged
merged 1 commit into from
Jul 9, 2024
4 changes: 3 additions & 1 deletion .github/workflows/build_docs.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: docs

yamllint warning (line 1): 1:1 [document-start] missing document start "---"

on:

yamllint warning (line 3): 3:1 [truthy] truthy value should be one of [false, true]
push:
branches:
- main
Expand Down Expand Up @@ -34,7 +34,9 @@
run: ls python/lsst/rubintv/analysis/service

- name: Install documenteer
run: pip install 'documenteer[pipelines]<0.7'
run: |
pip install 'sphinx<7'
pip install 'documenteer[pipelines]==0.8.2'

- name: Build documentation
working-directory: ./doc
51 changes: 51 additions & 0 deletions doc/lsst.rubintv.analysis.service/design.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
.. _rubintv_analysis_service-design:

=====================================
Design of rubintv_analysis_service
=====================================

.. contents:: Table of Contents
:depth: 2

Overview
========

The ``rubintv_analysis_service`` is a backend Python service designed to support the Derived Data Visualization (DDV) tool within the Rubin Observatory's software ecosystem. It provides a set of libraries and scripts that facilitate the analysis and visualization of astronomical data.

Architecture
============

The service is structured around a series of commands and tasks, each responsible for a specific aspect of data processing and visualization. Key components include:

- **Worker Script**: A script that initializes and runs the service, handling configuration and database connections.

- [`rubintv_worker.py`](rubintv_analysis_service/scripts/rubintv_worker.py)

The script is designed to run on a worker pod in a Kubernetes cluster. It initializes the service, loads the configuration, and connects to the Butler and consDB. It then listens for incoming commands from the web application, executes them, and returns the results.

There is also a [`mock server`](rubintv_analysis_service/scripts/mock_server.py) that can be used to test the service before it is built on either the USDF or the summit.

- **Commands**: Modular operations that perform specific tasks, such as loading columns, detector images, and detector information. These are implemented in Python modules within the ``commands`` directory. For example, the [`db.py`](rubintv_analysis_service/python/lsst/rubintv/analysis/service/commands/db.py) module contains commands for loading information from the consolidated database (consDB), the [`image.py`](rubintv_analysis_service/python/lsst/rubintv/analysis/service/commands/image.py) module contains commands for loading detector images (not yet implemented), and the [`butler.py`](rubintv_analysis_service/python/lsst/rubintv/analysis/service/commands/butler.py) module contains commands for loading data from a Butler repository.

All commands derive from the `BaseCommand` class, which provides a common interface for command execution. Every subclass must accept its parameters as keyword arguments and implement the `BaseCommand.build_contents` method. This separates the steps in processing a command:

1. Reading the JSON command and converting it into a Python dictionary.
2. Parsing the dictionary and converting it into a `BaseCommand` instance.
3. Executing the command.
4. Packaging the results of the command into a JSON response and sending it to the rubintv web application.

The `BaseCommand.build_contents` method is called during execution, and must return the result as a `dict` that will be converted into JSON and returned to the user.

Configuration
=============

Configuration for the service is managed through the following YAML files, allowing for flexible deployment and customization of the service's behavior:

- **config.yaml**: Main configuration file specifying service parameters.
- **joins.yaml**: Configuration for database joins.

Configuration options can be overridden using command-line arguments, which are parsed using the `argparse` module.
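
One way such an override could work is sketched below. The option names (`--port`, `--location`) and the `DEFAULT_CONFIG` dictionary are hypothetical; the dictionary stands in for the parsed contents of `config.yaml` so that the example runs without a YAML library.

```python
import argparse

# Stand-in for the parsed contents of config.yaml; the keys are hypothetical.
DEFAULT_CONFIG = {"port": 2000, "location": "usdf"}


def build_config(defaults: dict, argv: list[str]) -> dict:
    """Merge command-line overrides into a copy of the file-based defaults."""
    parser = argparse.ArgumentParser(description="Illustrative worker options")
    # default=None distinguishes "option not given" from an explicit value.
    parser.add_argument("--port", type=int, default=None)
    parser.add_argument("--location", default=None)
    args = parser.parse_args(argv)

    config = dict(defaults)
    for key, value in vars(args).items():
        if value is not None:
            config[key] = value
    return config


print(build_config(DEFAULT_CONFIG, ["--port", "8080"]))
```

Leaving the parser defaults as `None` means the values from the file are kept unless the user explicitly passes an option.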

Dart/Flutter Frontend
=====================

The frontend of the DDV tool is implemented in the Dart programming language using the Flutter framework. It provides a web-based interface for users to interact with the service, submit commands, and visualize the results. The frontend is located at https://github.com/lsst-ts/rubintv_visualization and is built on top of [`rubin_chart`](https://github.com/lsst-sitcom/rubin_chart), an open source Flutter plotting library also written by the project.
5 changes: 4 additions & 1 deletion doc/lsst.rubintv.analysis.service/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
lsst.rubintv.analysis.service
#############################

.. Paragraph that describes what this Python module does and links to related modules and frameworks.
This is the backend Python service to run the Derived Data Visualization (DDV) tool.


.. _lsst.rubintv.analysis.service-using:

Expand All @@ -18,6 +19,8 @@ toctree linking to topics related to using the module's APIs.
.. toctree::
:maxdepth: 2

design

.. _lsst.rubintv.analysis.service-contributing:

Contributing
23 changes: 22 additions & 1 deletion python/lsst/rubintv/analysis/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,22 @@
from . import command, database, query, utils
# This file is part of lsst_rubintv_analysis_service.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from . import butler, command, commands, data, database, efd, query, utils, viewer, worker
2 changes: 2 additions & 0 deletions python/lsst/rubintv/analysis/service/butler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@

@dataclass
class ExampleButlerCommand(BaseCommand):
"""Placeholder for butler commands"""

pass
95 changes: 45 additions & 50 deletions python/lsst/rubintv/analysis/service/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

import json
import logging
import traceback
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from .data import DataCenter

import sqlalchemy
from lsst.daf.butler import Butler

logger = logging.getLogger("lsst.rubintv.analysis.service.command")


def construct_error_message(error_name: str, description: str) -> str:
def construct_error_message(error_name: str, description: str, traceback: str) -> str:
"""Use a standard format for all error messages.

Parameters
Expand All @@ -51,12 +56,13 @@ def construct_error_message(error_name: str, description: str) -> str:
"content": {
"error": error_name,
"description": description,
"traceback": traceback,
},
}
)


def error_msg(error: Exception) -> str:
def error_msg(error: Exception, traceback: str) -> str:
"""Handle errors received while parsing or executing a command.

Parameters
Expand All @@ -72,23 +78,23 @@ def error_msg(error: Exception) -> str:

"""
if isinstance(error, json.decoder.JSONDecodeError):
return construct_error_message("JSON decoder error", error.args[0])
return construct_error_message("JSON decoder error", error.args[0], traceback)

if isinstance(error, CommandParsingError):
return construct_error_message("parsing error", error.args[0])
return construct_error_message("parsing error", error.args[0], traceback)

if isinstance(error, CommandExecutionError):
return construct_error_message("execution error", error.args[0])
return construct_error_message("execution error", error.args[0], traceback)

if isinstance(error, CommandResponseError):
return construct_error_message("command response error", error.args[0])
return construct_error_message("command response error", error.args[0], traceback)

# We should always receive one of the above errors, so the code should
# never get to here. But we generate this response just in case something
# very unexpected happens, or (more likely) the code is altered in such a
# way that this line is hit.
msg = "An unknown error occurred, you should never reach this message."
return construct_error_message(error.__class__.__name__, msg)
return construct_error_message(error.__class__.__name__, msg, traceback)


class CommandParsingError(Exception):
Expand All @@ -111,22 +117,6 @@ class CommandResponseError(Exception):
pass


@dataclass
class DatabaseConnection:
"""A connection to a database.

Attributes
----------
engine :
The engine used to connect to the database.
schema :
The schema for the database.
"""

engine: sqlalchemy.engine.Engine
schema: dict


@dataclass(kw_only=True)
class BaseCommand(ABC):
"""Base class for commands.
Expand All @@ -146,15 +136,13 @@ class BaseCommand(ABC):
response_type: str

@abstractmethod
def build_contents(self, databases: dict[str, DatabaseConnection], butler: Butler | None) -> dict:
def build_contents(self, data_center: DataCenter) -> dict:
"""Build the contents of the command.

Parameters
----------
databases :
The database connections.
butler :
A connected Butler.
data_center :
Connections to databases, the Butler, and the EFD.

Returns
-------
Expand All @@ -163,26 +151,26 @@ def build_contents(self, databases: dict[str, DatabaseConnection], butler: Butle
"""
pass

def execute(self, databases: dict[str, DatabaseConnection], butler: Butler | None):
def execute(self, data_center: DataCenter):
"""Execute the command.

This method does not return anything, but sets the `result`,
the JSON formatted string that is sent to the user.

Parameters
----------
databases :
The database connections.
butler :
A conencted Butler.
data_center :
Connections to databases, the Butler, and the EFD.

"""
self.result = {"type": self.response_type, "content": self.build_contents(databases, butler)}
self.result = {"type": self.response_type, "content": self.build_contents(data_center)}

def to_json(self):
def to_json(self, request_id: str | None = None):
"""Convert the `result` into JSON."""
if self.result is None:
raise CommandExecutionError(f"Null result for command {self.__class__.__name__}")
if request_id is not None:
self.result["requestId"] = request_id
return json.dumps(self.result)

@classmethod
Expand All @@ -191,7 +179,7 @@ def register(cls, name: str):
BaseCommand.command_registry[name] = cls


def execute_command(command_str: str, databases: dict[str, DatabaseConnection], butler: Butler | None) -> str:
def execute_command(command_str: str, data_center: DataCenter) -> str:
"""Parse a JSON formatted string into a command and execute it.

Command format:
Expand All @@ -206,18 +194,17 @@ def execute_command(command_str: str, databases: dict[str, DatabaseConnection],
----------
command_str :
The JSON formatted command received from the user.
databases :
The database connections.
butler :
A connected Butler.
data_center :
Connections to databases, the Butler, and the EFD.
"""
try:
command_dict = json.loads(command_str)
if not isinstance(command_dict, dict):
raise CommandParsingError(f"Could not generate a valid command from {command_str}")
except Exception as err:
logging.exception("Error converting command to JSON.")
return error_msg(err)
traceback_string = traceback.format_exc()
return error_msg(err, traceback_string)

try:
if "name" not in command_dict.keys():
Expand All @@ -230,19 +217,27 @@ def execute_command(command_str: str, databases: dict[str, DatabaseConnection],
command = BaseCommand.command_registry[command_dict["name"]](**parameters)

except Exception as err:
logging.exception("Error parsing command.")
return error_msg(CommandParsingError(f"'{err}' error while parsing command"))
logging.exception(f"Error parsing command {command_dict}")
traceback_string = traceback.format_exc()
return error_msg(CommandParsingError(f"'{err}' error while parsing command"), traceback_string)

try:
command.execute(databases, butler)
command.execute(data_center)
except Exception as err:
logging.exception("Error executing command.")
return error_msg(CommandExecutionError(f"{err} error executing command."))
logging.exception(f"Error executing command {command_dict}")
traceback_string = traceback.format_exc()
return error_msg(CommandExecutionError(f"{err} error executing command."), traceback_string)

try:
result = command.to_json()
if "requestId" in command_dict:
result = command.to_json(command_dict["requestId"])
else:
result = command.to_json()
except Exception as err:
logging.exception("Error converting command response to JSON.")
return error_msg(CommandResponseError(f"{err} error converting command response to JSON."))
traceback_string = traceback.format_exc()
return error_msg(
CommandResponseError(f"{err} error converting command response to JSON."), traceback_string
)

return result
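
Note on the error path: the change above captures the traceback inside each `except` block and forwards it in the error reply. A minimal sketch of that pattern follows. It is not the code from this PR: the `"type": "error"` field and the `parse_command` helper are assumptions, and the parameter is named `traceback_str` here so that it does not shadow the imported `traceback` module.

```python
import json
import traceback


def construct_error_message(error_name: str, description: str, traceback_str: str) -> str:
    """Build an error reply; the "type" value is an assumption."""
    return json.dumps(
        {
            "type": "error",
            "content": {
                "error": error_name,
                "description": description,
                "traceback": traceback_str,
            },
        }
    )


def parse_command(command_str: str) -> str:
    """Hypothetical helper: parse a command or return a formatted error."""
    try:
        command = json.loads(command_str)
        if not isinstance(command, dict):
            raise ValueError(f"Could not generate a valid command from {command_str}")
    except Exception as err:
        # format_exc() must be called while the exception is being handled,
        # otherwise there is no active traceback to format.
        return construct_error_message(type(err).__name__, str(err), traceback.format_exc())
    return json.dumps({"type": "parsed", "content": command})


print(parse_command("{not json"))
```

Sending the traceback to the client helps with debugging from the frontend, at the cost of exposing internal file paths in the reply.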
24 changes: 24 additions & 0 deletions python/lsst/rubintv/analysis/service/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# This file is part of lsst_rubintv_analysis_service.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .butler import *
from .db import *
from .image import *