Skip to content

Commit

Permalink
Responses to reviewer comments (rebase before merging)
Browse files Browse the repository at this point in the history
  • Loading branch information
fred3m committed Oct 23, 2023
1 parent 2464942 commit 7c4409e
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 137 deletions.
2 changes: 1 addition & 1 deletion python/lsst/rubintv/analysis/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from . import command, database, query
from . import command, database, query, utils
90 changes: 49 additions & 41 deletions python/lsst/rubintv/analysis/service/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,56 +26,64 @@
from websocket import WebSocketApp

from .command import DatabaseConnection, execute_command
from .utils import printc, Colors

logger = logging.getLogger("lsst.rubintv.analysis.service.client")


def on_error(ws: WebSocketApp, error: str) -> None:
"""Error received from the server."""
print(f"\033[91mError: {error}\033[0m")
class Worker:
def __init__(self, address: str, port: int, connection_info: dict[str, dict]):
self._address = address
self._port = port
self._connection_info = connection_info

def on_error(self, ws: WebSocketApp, error: str) -> None:
"""Error received from the server."""
printc(f"Error: {error}", color=Colors.BRIGHT_RED)

def on_close(ws: WebSocketApp, close_status_code: str, close_msg: str) -> None:
"""Connection closed by the server."""
print("\033[93mConnection closed\033[0m")
def on_close(self, ws: WebSocketApp, close_status_code: str, close_msg: str) -> None:
"""Connection closed by the server."""
printc("Connection closed", Colors.BRIGHT_YELLOW)

def run(self) -> None:
"""Run the worker and connect to the rubinTV server.
def run_worker(address: str, port: int, connection_info: dict[str, dict]) -> None:
"""Run the worker and connect to the rubinTV server.
Parameters
----------
address :
Address of the rubinTV web app.
port :
Port of the rubinTV web app websockets.
connection_info :
Connections .
"""
# Load the database connection information
databases: dict[str, DatabaseConnection] = {}

Parameters
----------
address :
Address of the rubinTV web app.
port :
Port of the rubinTV web app websockets.
connection_info :
Connections .
"""
# Load the database connection information
databases: dict[str, DatabaseConnection] = {}
for name, info in self._connection_info["databases"].items():
with open(info["schema"], "r") as file:
engine = sqlalchemy.create_engine(info["url"])
schema = yaml.safe_load(file)
databases[name] = DatabaseConnection(schema=schema, engine=engine)

for name, info in connection_info["databases"].items():
with open(info["schema"], "r") as file:
engine = sqlalchemy.create_engine(info["url"])
schema = yaml.safe_load(file)
databases[name] = DatabaseConnection(schema=schema, engine=engine)
# Load the Butler (if one is available)
butler: Butler | None = None
if "butler" in self._connection_info:
repo = self._connection_info["butler"].pop("repo")
butler = Butler(repo, **self._connection_info["butler"])

# Load the Butler (if one is available)
butler: Butler | None = None
if "butler" in connection_info:
repo = connection_info["butler"].pop("repo")
butler = Butler(repo, **connection_info["butler"])
def on_message(ws: WebSocketApp, message: str) -> None:
"""Message received from the server."""
response = execute_command(message, databases, butler)
ws.send(response)

def on_message(ws: WebSocketApp, message: str) -> None:
"""Message received from the server."""
response = execute_command(message, databases, butler)
ws.send(response)

print(f"\033[92mConnecting to rubinTV at {address}:{port}\033[0m")
# Connect to the WebSocket server
ws = WebSocketApp(
f"ws://{address}:{port}/ws/worker", on_message=on_message, on_error=on_error, on_close=on_close
)
ws.run_forever()
ws.close()
printc(f"Connecting to rubinTV at {self._address}:{self._port}", Colors.BRIGHT_GREEN)
# Connect to the WebSocket server
ws = WebSocketApp(
f"ws://{self._address}:{self._port}/ws/worker",
on_message=on_message,
on_error=self.on_error,
on_close=self.on_close,
)
ws.run_forever()
ws.close()
21 changes: 7 additions & 14 deletions python/lsst/rubintv/analysis/service/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class BaseCommand(ABC):
This should be unique for each command.
"""

command_registry = {}
result: dict | None = None
response_type: str

Expand All @@ -153,7 +154,7 @@ def build_contents(self, databases: dict[str, DatabaseConnection], butler: Butle
databases :
The database connections.
butler :
A conencted Butler.
A connected Butler.
Returns
-------
Expand Down Expand Up @@ -187,11 +188,7 @@ def to_json(self):
@classmethod
def register(cls, name: str):
"""Register a command."""
command_registry[name] = cls


# Registry of all commands
command_registry = {}
BaseCommand.command_registry[name] = cls


def execute_command(command_str: str, databases: dict[str, DatabaseConnection], butler: Butler | None) -> str:
Expand All @@ -212,7 +209,7 @@ def execute_command(command_str: str, databases: dict[str, DatabaseConnection],
databases :
The database connections.
butler :
A conencted Butler.
A connected Butler.
"""
try:
command_dict = json.loads(command_str)
Expand All @@ -226,15 +223,11 @@ def execute_command(command_str: str, databases: dict[str, DatabaseConnection],
if "name" not in command_dict.keys():
raise CommandParsingError("No command 'name' given")

if command_dict["name"] not in command_registry.keys():
if command_dict["name"] not in BaseCommand.command_registry.keys():
raise CommandParsingError(f"Unrecognized command '{command_dict['name']}'")

if "parameters" in command_dict:
parameters = command_dict["parameters"]
else:
parameters = {}

command = command_registry[command_dict["name"]](**parameters)
parameters = command_dict.get("parameters", {})
command = BaseCommand.command_registry[command_dict["name"]](**parameters)

except Exception as err:
logging.exception("Error parsing command.")
Expand Down
2 changes: 1 addition & 1 deletion python/lsst/rubintv/analysis/service/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def build_contents(self, databases: dict[str, DatabaseConnection], butler: Butle
engine=database.engine,
)

if len(data) == 0:
if not data:
# There is no column data to return
content: dict = {
"columns": columns,
Expand Down
29 changes: 15 additions & 14 deletions python/lsst/rubintv/analysis/service/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,24 @@ class ParentQuery(Query):
"""

def __init__(self, children: list[Query], operator: str):
self.children = children
self.operator = operator
self._children = children
self._operator = operator

def __call__(self, table: sqlalchemy.Table) -> sqlalchemy.sql.elements.BooleanClauseList:
child_results = [child(table) for child in self.children]
child_results = [child(table) for child in self._children]
try:
if self.operator == "AND":
return sqlalchemy.and_(*child_results)
if self.operator == "OR":
return sqlalchemy.or_(*child_results)
if self.operator == "NOT":
return sqlalchemy.not_(*child_results)
if self.operator == "XOR":
return sqlalchemy.and_(
sqlalchemy.or_(*child_results),
sqlalchemy.not_(sqlalchemy.and_(*child_results)),
)
match self._operator:
case "AND":
return sqlalchemy.and_(*child_results)
case "OR":
return sqlalchemy.or_(*child_results)
case "NOT":
return sqlalchemy.not_(*child_results)
case "XOR":
return sqlalchemy.and_(
sqlalchemy.or_(*child_results),
sqlalchemy.not_(sqlalchemy.and_(*child_results)),
)
except Exception:
raise QueryError("Error applying a boolean query statement.")

Expand Down
40 changes: 40 additions & 0 deletions python/lsst/rubintv/analysis/service/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from enum import Enum


# ANSI color codes for printing to the terminal
class Colors(Enum):
RESET = 0
BLACK = 30
RED = 31
GREEN = 32
YELLOW = 33
BLUE = 34
MAGENTA = 35
CYAN = 36
WHITE = 37
DEFAULT = 39
BRIGHT_BLACK = 90
BRIGHT_RED = 91
BRIGHT_GREEN = 92
BRIGHT_YELLOW = 93
BRIGHT_BLUE = 94
BRIGHT_MAGENTA = 95
BRIGHT_CYAN = 96
BRIGHT_WHITE = 97


def printc(message: str, color: Colors, end_color: Colors = Colors.RESET):
"""Print a message to the terminal in color.
After printing reset the color by default.
Parameters
----------
message :
The message to print.
color :
The color to print the message in.
end :
The color future messages should be printed in.
"""
print(f"\033[{color.value}m{message}\033[{end_color.value}m")
Loading

0 comments on commit 7c4409e

Please sign in to comment.