Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-47882: Refactor config file and add command logging #9

Merged
merged 6 commits into from
Dec 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: build_and_test

on:

Check warning on line 4 in .github/workflows/build.yaml

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
push:
branches:
- main
1 change: 1 addition & 0 deletions .github/workflows/build_docs.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: docs

on:

Check warning on line 4 in .github/workflows/build_docs.yaml

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
push:
branches:
- main
1 change: 1 addition & 0 deletions .github/workflows/formatting.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: Check Python formatting

on:

Check warning on line 4 in .github/workflows/formatting.yaml

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
push:
branches:
- main
1 change: 1 addition & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: lint

on:

Check warning on line 4 in .github/workflows/lint.yaml

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
push:
branches:
- main
1 change: 1 addition & 0 deletions .github/workflows/yamllint.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---
name: Lint YAML Files

on:

Check warning on line 4 in .github/workflows/yamllint.yaml

GitHub Actions / call-workflow / yamllint

4:1 [truthy] truthy value should be one of [false, true]
push:
branches:
- main
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
repos:

- repo: https://github.com/pre-commit/pre-commit-hooks
1 change: 1 addition & 0 deletions doc/manifest.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
# Documentation manifest.

# List of names of Python modules in this package.
6 changes: 3 additions & 3 deletions python/lsst/rubintv/analysis/service/database.py
Original file line number Diff line number Diff line change
@@ -228,7 +228,7 @@ def build_join(self, table_names: set[str]) -> sqlalchemy.Table | sqlalchemy.Joi
return select_from


def _removeSchemaTable(schema, table_name):
def _remove_schema_table(schema, table_name):
"""Remove a table from the schema

We do this because the ConsDbSchema contains the tables and columns
@@ -293,7 +293,7 @@ def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_template
# A new table was added to the schema and cannot be parsed
msg = f"Table {table['name']} has not been implemented in the RubinTV analysis service"
logger.warning(msg)
_removeSchemaTable(self.schema, table["name"])
_remove_schema_table(self.schema, table["name"])
else:
try:
self.tables[table["name"]] = sqlalchemy.Table(
@@ -306,7 +306,7 @@ def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_template
# The table is in sdm_schemas but has not yet been added
# to the database.
logger.warning(f"Table {table['name']} from schema not found in database")
_removeSchemaTable(self.schema, table["name"])
_remove_schema_table(self.schema, table["name"])

self.joins = JoinBuilder(self.tables, join_templates)

2 changes: 2 additions & 0 deletions python/lsst/rubintv/analysis/service/worker.py
Original file line number Diff line number Diff line change
@@ -82,7 +82,9 @@ def run(self) -> None:

def on_message(ws: WebSocketApp, message: str) -> None:
"""Message received from the server."""
logger.info(f"Executing command: {message}")
response = execute_command(message, self.data_center)
logger.info("Sending response")
ws.send(response)

logger.connection(f"Connecting to rubinTV at {self._address}:{self._port}")
31 changes: 21 additions & 10 deletions scripts/config.yaml
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
---
locations:
summit: "postgresdb01.cp.lsst.org"
usdf: "usdf-summitdb.slac.stanford.edu"
dev:
users_path: /sdf/home/f/fred3m/u/data/dev_users
consdb: usdf-summitdb.slac.stanford.edu
butlers:
- embargo
# efd_url: connection info here

Check warning on line 8 in scripts/config.yaml

GitHub Actions / call-workflow / yamllint

8:5 [comments-indentation] comment not indented like content
summit:
users_path: /var/ddv-config
credentials_path: /etc/secrets/postgres-credentials.txt
consdb: postgresdb01.cp.lsst.org
butlers:
- embargo
# efd_url: connection info here

Check warning on line 15 in scripts/config.yaml

GitHub Actions / call-workflow / yamllint

15:5 [comments-indentation] comment not indented like content
usdf:
users_path: /var/ddv-config
credentials_path: /etc/secrets/postgres-credentials.txt
consdb: usdf-summitdb.slac.stanford.edu
butlers:
- embargo
# efd_url: connection info here

Check warning on line 22 in scripts/config.yaml

GitHub Actions / call-workflow / yamllint

22:5 [comments-indentation] comment not indented like content
schemas:
cdb_latiss: cdb_latiss.yaml
cdb_lsstcomcam: cdb_lsstcomcam.yaml
cdb_lsstcomcamsim: cdb_lsstcomcamsim.yaml
# cdb_lsstcam: cdb_lsstcam.yaml
repos:
- /repo/main
- embargo_or4
butlers:
- s3://embargo@rubin-summit-users/butler.yaml
#efd:
# url: connection info here
# cdb_lsstcam: cdb_lsstcam.yaml
2 changes: 1 addition & 1 deletion scripts/joins.yaml
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ joins:
visit1:
- visit_id
ccdvisit1:
#- visit_id
# - visit_id
- exposure_id

# visit1 and visit1_quicklook
99 changes: 65 additions & 34 deletions scripts/rubintv_worker.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import logging
import os
import pathlib
from typing import Any

import sqlalchemy
import yaml
@@ -37,18 +38,56 @@
default_joins = os.path.join(pathlib.Path(__file__).parent.absolute(), "joins.yaml")
logger = logging.getLogger("lsst.rubintv.analysis.server.worker")
sdm_schemas_path = os.path.join(os.path.expandvars("$SDM_SCHEMAS_DIR"), "yml")
prod_credentials_path = os.path.join("/etc/secrets", "postgres-credentials.txt")
test_credentials_path = os.path.join(os.path.expanduser("~"), ".lsst", "postgres-credentials.txt")
summit_users_path = "/var/ddv-config"
usdf_users_path = "/var/ddv-config"
dev_users_path = "/sdf/home/f/fred3m/u/data/dev_users"


class UniversalToVisit(DataMatch):
    """Data match whose ``get_join`` produces no join."""

    def get_join(self):
        """Return ``None``; this match does not define a join."""
        return None


class LocationConfig:
"""Location based configuration for the worker.

Attributes
----------
users_path : str
Path to the user file.
credentials_path : str
Path to the credentials file.
consdb : str
The name of the consdb to connect to.
butlers : dict[str, dict[str, str]]
A dictionary of butler configurations.
"""

users_path: str
credentials_path: str
consdb: str
butlers: list[str]
schemas: dict[str, str]
efd_url: str | None

def __init__(self, location: str, yaml_config: dict[str, Any]):
config = yaml_config["locations"][location]

# Set location-specific attributes
self.users_path = config["users_path"]
try:
self.credentials_path = config["credentials_path"]
except KeyError:
self.credentials_path = test_credentials_path
self.consdb = config["consdb"]
self.butlers = config["butlers"]
try:
self.efd_url = config["efd_url"]
except KeyError:
self.efd_url = None

# Set other attributes
self.schemas = yaml_config["schemas"]


def main():
parser = argparse.ArgumentParser(description="Initialize a new RubinTV worker.")
parser.add_argument(
@@ -82,10 +121,14 @@ def main():
"--database",
default="exposurelog",
help="The name of the database to connect to."
"It will likely never need to be changed, but including it as an option just in case.",
"It will likely never need to be changed, but including it as an option just in case.",
)
args = parser.parse_args()

# Ensure that the location is valid
if args.location.lower() not in ["summit", "usdf", "dev"]:
raise ValueError(f"Invalid location: {args.location}, must be either 'summit', 'usdf' or 'dev'")

# Configure logging for all modules
log_level = getattr(logging, args.log_all.upper(), None)
if not isinstance(log_level, int):
@@ -112,67 +155,55 @@ def main():
# Load the configuration and join files
logger.info("Loading config")
with open(args.config, "r") as file:
config = yaml.safe_load(file)
config = LocationConfig(args.location.lower(), yaml.safe_load(file))
with open(args.joins, "r") as file:
joins = yaml.safe_load(file)["joins"]

# Set the database URL based on the location
logger.info("Connecting to the database")
server = ""
if args.location.lower() == "summit":
server = config["locations"]["summit"]
credentials_path = prod_credentials_path
user_path = summit_users_path
elif args.location.lower() == "usdf":
server = config["locations"]["usdf"]
credentials_path = prod_credentials_path
user_path = usdf_users_path
elif args.location.lower() == "dev":
server = config["locations"]["usdf"]
credentials_path = test_credentials_path
user_path = dev_users_path
else:
raise ValueError(f"Invalid location: {args.location}, must be either 'summit' or 'usdf'")

with open(credentials_path, "r") as file:
with open(config.credentials_path, "r") as file:
credentials = file.readlines()
for credential in credentials:
_server, _, database, user, password = credential.split(":")
if _server == server and database == args.database:
if _server == config.consdb and database == args.database:
password = password.strip()
break
else:
raise ValueError(f"Could not find credentials for {server}")
database_url = f"postgresql://{user}:{password}@{server}/{database}"
raise ValueError(f"Could not find credentials for {config.consdb} and {args.database}")
database_url = f"postgresql://{user}:{password}@{config.consdb}/{database}"
engine = sqlalchemy.create_engine(database_url)

# Initialize the data center that provides access to various data sources
schemas: dict[str, ConsDbSchema] = {}

for name, filename in config["schemas"].items():
for name, filename in config.schemas.items():
full_path = os.path.join(sdm_schemas_path, filename)
with open(full_path, "r") as file:
schema = yaml.safe_load(file)
schemas[name] = ConsDbSchema(schema=schema, engine=engine, join_templates=joins)

# Load the Butler (if one is available)
butlers: dict[str, Butler] | None = None
if "butlers" in config:
logger.info("Connecting to Butlers")
butlers = {}
for repo in config["butlers"]:
butlers[repo] = Butler(repo) # type: ignore
logger.info("Connecting to Butlers")
butlers = {}
for repo in config.butlers:
try:
butlers[repo] = Butler(repo)
except Exception as e:
logger.error(f"Failed to connect to butler {repo}: {e}")

# Load the EFD client (if one is available)
efd_client: EfdClient | None = None
if "efd" in config:
if config.efd_url is not None:
logger.info("Connecting to EFD")
raise NotImplementedError("EFD client not yet implemented")

# Create the DataCenter that keeps track of all data sources.
# This will have to be updated every time we want to
# change/add a new data source.
data_center = DataCenter(schemas=schemas, butlers=butlers, efd_client=efd_client, user_path=user_path)
data_center = DataCenter(
schemas=schemas, butlers=butlers, efd_client=efd_client, user_path=config.users_path
)

# Run the client and connect to rubinTV via websockets
logger.info("Initializing worker")
Loading