diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 0055df1..b2f29ea 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -1,3 +1,4 @@
+---
 name: build_and_test
 
 on:
diff --git a/.github/workflows/build_docs.yaml b/.github/workflows/build_docs.yaml
index e2c05f7..353bdc4 100644
--- a/.github/workflows/build_docs.yaml
+++ b/.github/workflows/build_docs.yaml
@@ -1,3 +1,4 @@
+---
 name: docs
 
 on:
diff --git a/.github/workflows/formatting.yaml b/.github/workflows/formatting.yaml
index 27f34a6..363a03b 100644
--- a/.github/workflows/formatting.yaml
+++ b/.github/workflows/formatting.yaml
@@ -1,3 +1,4 @@
+---
 name: Check Python formatting
 
 on:
diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml
index 796ef92..45dc027 100644
--- a/.github/workflows/lint.yaml
+++ b/.github/workflows/lint.yaml
@@ -1,3 +1,4 @@
+---
 name: lint
 
 on:
diff --git a/.github/workflows/yamllint.yaml b/.github/workflows/yamllint.yaml
index 76ad875..aec968a 100644
--- a/.github/workflows/yamllint.yaml
+++ b/.github/workflows/yamllint.yaml
@@ -1,3 +1,4 @@
+---
 name: Lint YAML Files
 
 on:
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a07ff8a..a8166cf 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,3 +1,4 @@
+---
 repos:
 
   - repo: https://github.com/pre-commit/pre-commit-hooks
diff --git a/doc/manifest.yaml b/doc/manifest.yaml
index 222eb97..bebe911 100644
--- a/doc/manifest.yaml
+++ b/doc/manifest.yaml
@@ -1,3 +1,4 @@
+---
 # Documentation manifest.
 
 # List of names of Python modules in this package.
diff --git a/python/lsst/rubintv/analysis/service/database.py b/python/lsst/rubintv/analysis/service/database.py
index 93bcb0a..320aa27 100644
--- a/python/lsst/rubintv/analysis/service/database.py
+++ b/python/lsst/rubintv/analysis/service/database.py
@@ -228,7 +228,7 @@ def build_join(self, table_names: set[str]) -> sqlalchemy.Table | sqlalchemy.Joi
         return select_from
 
 
-def _removeSchemaTable(schema, table_name):
+def _remove_schema_table(schema, table_name):
     """Remove a table from the schema
 
     We do this because the ConsDbSchema contains the tables and columns
@@ -293,7 +293,7 @@ def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_template
                 # A new table was added to the schema and cannot be parsed
                 msg = f"Table {table['name']} has not been implemented in the RubinTV analysis service"
                 logger.warning(msg)
-                _removeSchemaTable(self.schema, table["name"])
+                _remove_schema_table(self.schema, table["name"])
             else:
                 try:
                     self.tables[table["name"]] = sqlalchemy.Table(
@@ -306,7 +306,7 @@ def __init__(self, engine: sqlalchemy.engine.Engine, schema: dict, join_template
                     # The table is in sdm_schemas but has not yet been added
                     # to the database.
logger.warning(f"Table {table['name']} from schema not found in database") - _removeSchemaTable(self.schema, table["name"]) + _remove_schema_table(self.schema, table["name"]) self.joins = JoinBuilder(self.tables, join_templates) diff --git a/python/lsst/rubintv/analysis/service/worker.py b/python/lsst/rubintv/analysis/service/worker.py index 8cfdf67..01b1e06 100644 --- a/python/lsst/rubintv/analysis/service/worker.py +++ b/python/lsst/rubintv/analysis/service/worker.py @@ -82,7 +82,9 @@ def run(self) -> None: def on_message(ws: WebSocketApp, message: str) -> None: """Message received from the server.""" + logger.info(f"Executing command: {message}") response = execute_command(message, self.data_center) + logger.info("Sending response") ws.send(response) logger.connection(f"Connecting to rubinTV at {self._address}:{self._port}") diff --git a/scripts/config.yaml b/scripts/config.yaml index c99b980..11eb2be 100644 --- a/scripts/config.yaml +++ b/scripts/config.yaml @@ -1,16 +1,27 @@ --- locations: - summit: "postgresdb01.cp.lsst.org" - usdf: "usdf-summitdb.slac.stanford.edu" + dev: + users_path: /sdf/home/f/fred3m/u/data/dev_users + consdb: usdf-summitdb.slac.stanford.edu + butlers: + - embargo + # efd_url: connection info here + summit: + users_path: /var/ddv-config + credentials_path: /etc/secrets/postgres-credentials.txt + consdb: postgresdb01.cp.lsst.org + butlers: + - embargo + # efd_url: connection info here + usdf: + users_path: /var/ddv-config + credentials_path: /etc/secrets/postgres-credentials.txt + consdb: usdf-summitdb.slac.stanford.edu + butlers: + - embargo + # efd_url: connection info here schemas: cdb_latiss: cdb_latiss.yaml cdb_lsstcomcam: cdb_lsstcomcam.yaml cdb_lsstcomcamsim: cdb_lsstcomcamsim.yaml -# cdb_lsstcam: cdb_lsstcam.yaml -repos: - - /repo/main - - embargo_or4 -butlers: - - s3://embargo@rubin-summit-users/butler.yaml -#efd: -# url: connection info here + # cdb_lsstcam: cdb_lsstcam.yaml diff --git a/scripts/joins.yaml b/scripts/joins.yaml index 0111071..0604817 100644 --- a/scripts/joins.yaml +++ b/scripts/joins.yaml @@ -54,7 +54,7 @@ joins: visit1: - visit_id ccdvisit1: - #- visit_id + # - visit_id - exposure_id # visit1 and visit1_quicklook diff --git a/scripts/rubintv_worker.py b/scripts/rubintv_worker.py index 2ed3139..2a8aad3 100644 --- a/scripts/rubintv_worker.py +++ b/scripts/rubintv_worker.py @@ -23,6 +23,7 @@ import logging import os import pathlib +from typing import Any import sqlalchemy import yaml @@ -37,11 +38,7 @@ default_joins = os.path.join(pathlib.Path(__file__).parent.absolute(), "joins.yaml") logger = logging.getLogger("lsst.rubintv.analysis.server.worker") sdm_schemas_path = os.path.join(os.path.expandvars("$SDM_SCHEMAS_DIR"), "yml") -prod_credentials_path = os.path.join("/etc/secrets", "postgres-credentials.txt") test_credentials_path = os.path.join(os.path.expanduser("~"), ".lsst", "postgres-credentials.txt") -summit_users_path = "/var/ddv-config" -usdf_users_path = "/var/ddv-config" -dev_users_path = "/sdf/home/f/fred3m/u/data/dev_users" class UniversalToVisit(DataMatch): @@ -49,6 +46,48 @@ def get_join(self): return +class LocationConfig: + """Location based configuration for the worker. + + Attributes + ---------- + users_path : str + Path to the user file. + credentials_path : str + Path to the credentials file. + consdb : str + The name of the consdb to connect to. + butlers : dict[str, dict[str, str]] + A dictionary of butler configurations. 
+ """ + + users_path: str + credentials_path: str + consdb: str + butlers: list[str] + schemas: dict[str, str] + efd_url: str | None + + def __init__(self, location: str, yaml_config: dict[str, Any]): + config = yaml_config["locations"][location] + + # Set location-specific attributes + self.users_path = config["users_path"] + try: + self.credentials_path = config["credentials_path"] + except KeyError: + self.credentials_path = test_credentials_path + self.consdb = config["consdb"] + self.butlers = config["butlers"] + try: + self.efd_url = config["efd_url"] + except KeyError: + self.efd_url = None + + # Set other attributes + self.schemas = yaml_config["schemas"] + + def main(): parser = argparse.ArgumentParser(description="Initialize a new RubinTV worker.") parser.add_argument( @@ -82,10 +121,14 @@ def main(): "--database", default="exposurelog", help="The name of the database to connect to." - "It will likely never need to be changed, but including it as an option just in case.", + "It will likely never need to be changed, but including it as an option just in case.", ) args = parser.parse_args() + # Ensure that the location is valid + if args.location.lower() not in ["summit", "usdf", "dev"]: + raise ValueError(f"Invalid location: {args.location}, must be either 'summit', 'usdf' or 'dev'") + # Configure logging for all modules log_level = getattr(logging, args.log_all.upper(), None) if not isinstance(log_level, int): @@ -112,67 +155,55 @@ def main(): # Load the configuration and join files logger.info("Loading config") with open(args.config, "r") as file: - config = yaml.safe_load(file) + config = LocationConfig(args.location.lower(), yaml.safe_load(file)) with open(args.joins, "r") as file: joins = yaml.safe_load(file)["joins"] # Set the database URL based on the location logger.info("Connecting to the database") - server = "" - if args.location.lower() == "summit": - server = config["locations"]["summit"] - credentials_path = prod_credentials_path - user_path = summit_users_path - elif args.location.lower() == "usdf": - server = config["locations"]["usdf"] - credentials_path = prod_credentials_path - user_path = usdf_users_path - elif args.location.lower() == "dev": - server = config["locations"]["usdf"] - credentials_path = test_credentials_path - user_path = dev_users_path - else: - raise ValueError(f"Invalid location: {args.location}, must be either 'summit' or 'usdf'") - with open(credentials_path, "r") as file: + with open(config.credentials_path, "r") as file: credentials = file.readlines() for credential in credentials: _server, _, database, user, password = credential.split(":") - if _server == server and database == args.database: + if _server == config.consdb and database == args.database: password = password.strip() break else: - raise ValueError(f"Could not find credentials for {server}") - database_url = f"postgresql://{user}:{password}@{server}/{database}" + raise ValueError(f"Could not find credentials for {config.consdb} and {args.database}") + database_url = f"postgresql://{user}:{password}@{config.consdb}/{database}" engine = sqlalchemy.create_engine(database_url) # Initialize the data center that provides access to various data sources schemas: dict[str, ConsDbSchema] = {} - for name, filename in config["schemas"].items(): + for name, filename in config.schemas.items(): full_path = os.path.join(sdm_schemas_path, filename) with open(full_path, "r") as file: schema = yaml.safe_load(file) schemas[name] = ConsDbSchema(schema=schema, engine=engine, join_templates=joins) 
     # Load the Butler (if one is available)
-    butlers: dict[str, Butler] | None = None
-    if "butlers" in config:
-        logger.info("Connecting to Butlers")
-        butlers = {}
-        for repo in config["butlers"]:
-            butlers[repo] = Butler(repo)  # type: ignore
+    logger.info("Connecting to Butlers")
+    butlers = {}
+    for repo in config.butlers:
+        try:
+            butlers[repo] = Butler(repo)
+        except Exception as e:
+            logger.error(f"Failed to connect to butler {repo}: {e}")
 
     # Load the EFD client (if one is available)
     efd_client: EfdClient | None = None
-    if "efd" in config:
+    if config.efd_url is not None:
         logger.info("Connecting to EFD")
         raise NotImplementedError("EFD client not yet implemented")
 
     # Create the DataCenter that keeps track of all data sources.
     # This will have to be updated every time we want to
     # change/add a new data source.
-    data_center = DataCenter(schemas=schemas, butlers=butlers, efd_client=efd_client, user_path=user_path)
+    data_center = DataCenter(
+        schemas=schemas, butlers=butlers, efd_client=efd_client, user_path=config.users_path
+    )
 
     # Run the client and connect to rubinTV via websockets
     logger.info("Initializing worker")
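
The new scripts/config.yaml keeps every deployment-specific setting (users_path, credentials_path, consdb, butlers, and an optional efd_url) under a single locations mapping, and LocationConfig in scripts/rubintv_worker.py selects one entry from that mapping at startup, falling back to the per-user test credentials file when credentials_path is omitted (as it is for dev). Below is a minimal, self-contained sketch of that lookup against an invented sample config; the resolve_location helper, the fallback path, and the sample values are illustrative only, not part of the patch.

from typing import Any

import yaml

# Illustrative fallback, standing in for test_credentials_path in rubintv_worker.py.
FALLBACK_CREDENTIALS = "~/.lsst/postgres-credentials.txt"

SAMPLE_CONFIG = """
locations:
  dev:
    users_path: /tmp/dev_users
    consdb: usdf-summitdb.slac.stanford.edu
    butlers:
      - embargo
schemas:
  cdb_latiss: cdb_latiss.yaml
"""


def resolve_location(location: str, yaml_config: dict[str, Any]) -> dict[str, Any]:
    """Flatten one entry of the locations mapping, mirroring LocationConfig.__init__."""
    config = yaml_config["locations"][location]
    return {
        "users_path": config["users_path"],
        # dev omits credentials_path, so fall back to the per-user test file.
        "credentials_path": config.get("credentials_path", FALLBACK_CREDENTIALS),
        "consdb": config["consdb"],
        "butlers": config["butlers"],
        # efd_url is optional everywhere; None means no EFD client is created.
        "efd_url": config.get("efd_url"),
        # The schema list is shared by all locations.
        "schemas": yaml_config["schemas"],
    }


if __name__ == "__main__":
    print(resolve_location("dev", yaml.safe_load(SAMPLE_CONFIG)))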
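
With the per-location if/elif chain removed, main() now finds the database credentials by matching config.consdb and the --database argument against a colon-separated credentials file, and raises if no line matches. The sketch below isolates that lookup, assuming five colon-separated fields per line with the second field ignored by the worker (the exact field meanings are an assumption here), and uses an obviously fake credential.

def find_credentials(lines: list[str], server: str, database: str) -> tuple[str, str]:
    """Return (user, password) for the first credentials line matching server and database."""
    for line in lines:
        # Assumed layout: server:ignored:database:user:password, possibly ending in a newline.
        _server, _, _database, user, password = line.split(":")
        if _server == server and _database == database:
            return user, password.strip()
    raise ValueError(f"Could not find credentials for {server} and {database}")


# Fake example, mirroring how the worker then builds the SQLAlchemy URL from the match.
lines = ["postgresdb01.cp.lsst.org:5432:exposurelog:rubin:not-a-real-password\n"]
user, password = find_credentials(lines, "postgresdb01.cp.lsst.org", "exposurelog")
database_url = f"postgresql://{user}:{password}@postgresdb01.cp.lsst.org/exposurelog"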