Skip to content

Commit

Permalink
Merge pull request #9 from lsst-ts/tickets/DM-47882
Browse files Browse the repository at this point in the history
DM-47882: Refactor config file and add command logging
  • Loading branch information
fred3m authored Dec 5, 2024
2 parents 97820eb + bfb02ab commit b585a90
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 48 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
name: build_and_test

on:

Check warning on line 4 in .github/workflows/build.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build_docs.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
name: docs

on:

Check warning on line 4 in .github/workflows/build_docs.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/formatting.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
name: Check Python formatting

on:

Check warning on line 4 in .github/workflows/formatting.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
name: lint

on:

Check warning on line 4 in .github/workflows/lint.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/yamllint.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
name: Lint YAML Files

on:

Check warning on line 4 in .github/workflows/yamllint.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
Expand Down
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
repos:

- repo: https://github.com/pre-commit/pre-commit-hooks
Expand Down
1 change: 1 addition & 0 deletions doc/manifest.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
# Documentation manifest.

# List of names of Python modules in this package.
Expand Down
6 changes: 3 additions & 3 deletions python/lsst/rubintv/analysis/service/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def build_join(self, table_names: set[str]) -> sqlalchemy.Table | sqlalchemy.Joi
return select_from


def _removeSchemaTable(schema, table_name):
def _remove_schema_table(schema, table_name):
"""Remove a table from the schema
We do this because the ConsDbSchema contains the tables and columns
Expand Down Expand Up @@ -293,7 +293,7 @@ def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_template
# A new table was added to the schema and cannot be parsed
msg = f"Table {table['name']} has not been implemented in the RubinTV analysis service"
logger.warning(msg)
_removeSchemaTable(self.schema, table["name"])
_remove_schema_table(self.schema, table["name"])
else:
try:
self.tables[table["name"]] = sqlalchemy.Table(
Expand All @@ -306,7 +306,7 @@ def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_template
# The table is in sdm_schemas but has not yet been added
# to the database.
logger.warning(f"Table {table['name']} from schema not found in database")
_removeSchemaTable(self.schema, table["name"])
_remove_schema_table(self.schema, table["name"])

self.joins = JoinBuilder(self.tables, join_templates)

Expand Down
2 changes: 2 additions & 0 deletions python/lsst/rubintv/analysis/service/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def run(self) -> None:

def on_message(ws: WebSocketApp, message: str) -> None:
"""Message received from the server."""
logger.info(f"Executing command: {message}")
response = execute_command(message, self.data_center)
logger.info("Sending response")
ws.send(response)

logger.connection(f"Connecting to rubinTV at {self._address}:{self._port}")
Expand Down
31 changes: 21 additions & 10 deletions scripts/config.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
---
locations:
summit: "postgresdb01.cp.lsst.org"
usdf: "usdf-summitdb.slac.stanford.edu"
dev:
users_path: /sdf/home/f/fred3m/u/data/dev_users
consdb: usdf-summitdb.slac.stanford.edu
butlers:
- embargo
# efd_url: connection info here

Check warning on line 8 in scripts/config.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

8:5 [comments-indentation] comment not indented like content
summit:
users_path: /var/ddv-config
credentials_path: /etc/secrets/postgres-credentials.txt
consdb: postgresdb01.cp.lsst.org
butlers:
- embargo
# efd_url: connection info here

Check warning on line 15 in scripts/config.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

15:5 [comments-indentation] comment not indented like content
usdf:
users_path: /var/ddv-config
credentials_path: /etc/secrets/postgres-credentials.txt
consdb: usdf-summitdb.slac.stanford.edu
butlers:
- embargo
# efd_url: connection info here

Check warning on line 22 in scripts/config.yaml

View workflow job for this annotation

GitHub Actions / call-workflow / yamllint

22:5 [comments-indentation] comment not indented like content
schemas:
cdb_latiss: cdb_latiss.yaml
cdb_lsstcomcam: cdb_lsstcomcam.yaml
cdb_lsstcomcamsim: cdb_lsstcomcamsim.yaml
# cdb_lsstcam: cdb_lsstcam.yaml
repos:
- /repo/main
- embargo_or4
butlers:
- s3://embargo@rubin-summit-users/butler.yaml
#efd:
# url: connection info here
# cdb_lsstcam: cdb_lsstcam.yaml
2 changes: 1 addition & 1 deletion scripts/joins.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ joins:
visit1:
- visit_id
ccdvisit1:
#- visit_id
# - visit_id
- exposure_id

# visit1 and visit1_quicklook
Expand Down
99 changes: 65 additions & 34 deletions scripts/rubintv_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import logging
import os
import pathlib
from typing import Any

import sqlalchemy
import yaml
Expand All @@ -37,18 +38,56 @@
default_joins = os.path.join(pathlib.Path(__file__).parent.absolute(), "joins.yaml")
logger = logging.getLogger("lsst.rubintv.analysis.server.worker")
sdm_schemas_path = os.path.join(os.path.expandvars("$SDM_SCHEMAS_DIR"), "yml")
prod_credentials_path = os.path.join("/etc/secrets", "postgres-credentials.txt")
test_credentials_path = os.path.join(os.path.expanduser("~"), ".lsst", "postgres-credentials.txt")
summit_users_path = "/var/ddv-config"
usdf_users_path = "/var/ddv-config"
dev_users_path = "/sdf/home/f/fred3m/u/data/dev_users"


class UniversalToVisit(DataMatch):
    """``DataMatch`` subclass whose join lookup yields nothing."""

    def get_join(self):
        """Return ``None``; this match does not supply a join."""
        return None


class LocationConfig:
"""Location based configuration for the worker.
Attributes
----------
users_path : str
Path to the user file.
credentials_path : str
Path to the credentials file.
consdb : str
The name of the consdb to connect to.
butlers : dict[str, dict[str, str]]
A dictionary of butler configurations.
"""

users_path: str
credentials_path: str
consdb: str
butlers: list[str]
schemas: dict[str, str]
efd_url: str | None

def __init__(self, location: str, yaml_config: dict[str, Any]):
config = yaml_config["locations"][location]

# Set location-specific attributes
self.users_path = config["users_path"]
try:
self.credentials_path = config["credentials_path"]
except KeyError:
self.credentials_path = test_credentials_path
self.consdb = config["consdb"]
self.butlers = config["butlers"]
try:
self.efd_url = config["efd_url"]
except KeyError:
self.efd_url = None

# Set other attributes
self.schemas = yaml_config["schemas"]


def main():
parser = argparse.ArgumentParser(description="Initialize a new RubinTV worker.")
parser.add_argument(
Expand Down Expand Up @@ -82,10 +121,14 @@ def main():
"--database",
default="exposurelog",
help="The name of the database to connect to."
"It will likely never need to be changed, but including it as an option just in case.",
"It will likely never need to be changed, but including it as an option just in case.",
)
args = parser.parse_args()

# Ensure that the location is valid
if args.location.lower() not in ["summit", "usdf", "dev"]:
raise ValueError(f"Invalid location: {args.location}, must be either 'summit', 'usdf' or 'dev'")

# Configure logging for all modules
log_level = getattr(logging, args.log_all.upper(), None)
if not isinstance(log_level, int):
Expand All @@ -112,67 +155,55 @@ def main():
# Load the configuration and join files
logger.info("Loading config")
with open(args.config, "r") as file:
config = yaml.safe_load(file)
config = LocationConfig(args.location.lower(), yaml.safe_load(file))
with open(args.joins, "r") as file:
joins = yaml.safe_load(file)["joins"]

# Set the database URL based on the location
logger.info("Connecting to the database")
server = ""
if args.location.lower() == "summit":
server = config["locations"]["summit"]
credentials_path = prod_credentials_path
user_path = summit_users_path
elif args.location.lower() == "usdf":
server = config["locations"]["usdf"]
credentials_path = prod_credentials_path
user_path = usdf_users_path
elif args.location.lower() == "dev":
server = config["locations"]["usdf"]
credentials_path = test_credentials_path
user_path = dev_users_path
else:
raise ValueError(f"Invalid location: {args.location}, must be either 'summit' or 'usdf'")

with open(credentials_path, "r") as file:
with open(config.credentials_path, "r") as file:
credentials = file.readlines()
for credential in credentials:
_server, _, database, user, password = credential.split(":")
if _server == server and database == args.database:
if _server == config.consdb and database == args.database:
password = password.strip()
break
else:
raise ValueError(f"Could not find credentials for {server}")
database_url = f"postgresql://{user}:{password}@{server}/{database}"
raise ValueError(f"Could not find credentials for {config.consdb} and {args.database}")
database_url = f"postgresql://{user}:{password}@{config.consdb}/{database}"
engine = sqlalchemy.create_engine(database_url)

# Initialize the data center that provides access to various data sources
schemas: dict[str, ConsDbSchema] = {}

for name, filename in config["schemas"].items():
for name, filename in config.schemas.items():
full_path = os.path.join(sdm_schemas_path, filename)
with open(full_path, "r") as file:
schema = yaml.safe_load(file)
schemas[name] = ConsDbSchema(schema=schema, engine=engine, join_templates=joins)

# Load the Butler (if one is available)
butlers: dict[str, Butler] | None = None
if "butlers" in config:
logger.info("Connecting to Butlers")
butlers = {}
for repo in config["butlers"]:
butlers[repo] = Butler(repo) # type: ignore
logger.info("Connecting to Butlers")
butlers = {}
for repo in config.butlers:
try:
butlers[repo] = Butler(repo)
except Exception as e:
logger.error(f"Failed to connect to butler {repo}: {e}")

# Load the EFD client (if one is available)
efd_client: EfdClient | None = None
if "efd" in config:
if config.efd_url is not None:
logger.info("Connecting to EFD")
raise NotImplementedError("EFD client not yet implemented")

# Create the DataCenter that keeps track of all data sources.
# This will have to be updated every time we want to
# change/add a new data source.
data_center = DataCenter(schemas=schemas, butlers=butlers, efd_client=efd_client, user_path=user_path)
data_center = DataCenter(
schemas=schemas, butlers=butlers, efd_client=efd_client, user_path=config.users_path
)

# Run the client and connect to rubinTV via websockets
logger.info("Initializing worker")
Expand Down

0 comments on commit b585a90

Please sign in to comment.