From 3ed31fe3a9332ad0ed49b79f9eea4530be8f3440 Mon Sep 17 00:00:00 2001 From: Lisa Stillwell Date: Tue, 31 Oct 2023 16:46:55 -0400 Subject: [PATCH] pull observations from the new apsviz gauges db --- apsviz_db.py | 56 +++------------ apsviz_gauges_db.py | 82 ++++++++-------------- asgs_db.py | 147 +++------------------------------------ base_db.py | 60 ++++++++++++++++ load-geoserver-images.py | 45 ++++++++---- terria_catalogV8DB.py | 2 +- 6 files changed, 142 insertions(+), 250 deletions(-) create mode 100644 base_db.py diff --git a/apsviz_db.py b/apsviz_db.py index 7f8eb87..f8e5ae2 100644 --- a/apsviz_db.py +++ b/apsviz_db.py @@ -1,64 +1,24 @@ import os, sys -import logging +import base_db import psycopg2 import json from common.logging import LoggingUtil from urllib.parse import urlparse -class APSVIZ_DB: +class APSVIZ_DB(base_db): # dbname looks like this: 'asgs_dashboard' # instance_id looks like this: '2744-2021050618-namforecast' def __init__(self, logger): - self.conn = None - self.logger = logger - self.user = os.getenv('APSVIZ_DB_USERNAME', 'user').strip() - self.pswd = os.getenv('APSVIZ_DB_PASSWORD', 'password').strip() - self.db_name = os.getenv('APSVIZ_DB_DATABASE', 'database').strip() - self.host = os.getenv('ASGS_DB_HOST', 'host').strip() - self.port = os.getenv('ASGS_DB_PORT', '5432').strip() + user = os.getenv('APSVIZ_DB_USERNAME', 'user').strip() + pswd = os.getenv('APSVIZ_DB_PASSWORD', 'password').strip() + db_name = os.getenv('APSVIZ_DB_DATABASE', 'database').strip() + host = os.getenv('APSVIZ_DB_HOST', 'host').strip() + port = os.getenv('APSVIZ_DB_PORT', '5432').strip() - try: - # connect to asgs database - conn_str = f'host={self.host} port={self.port} dbname={self.db_name} user={self.user} password={self.pswd}' - - self.conn = psycopg2.connect(conn_str) - self.conn.set_session(autocommit=True) - self.cursor = self.conn.cursor() - except: - e = sys.exc_info()[0] - self.logger.error(f"FAILURE - Cannot connect to APSVIZ DB. error {e}") - - def __del__(self): - """ - close up the DB - :return: - """ - try: - if self.cursor is not None: - self.cursor.close() - if self.conn is not None: - self.conn.close() - except Exception as e: - self.logger.error(f'Error detected closing cursor or connection. {e}') - #sys.exc_info()[0] - - def get_user(self): - return self.user - - def get_password(self): - return self.pswd - - def get_host(self): - return self.host - - def get_port(self): - return self.port - - def get_dbname(self): - return self.db_name + super().__init__(logger, user, pswd, db_name, host, port) def find_cat_group(self, date_str): exists = False diff --git a/apsviz_gauges_db.py b/apsviz_gauges_db.py index 96427f8..bc28ea6 100644 --- a/apsviz_gauges_db.py +++ b/apsviz_gauges_db.py @@ -1,23 +1,20 @@ import os, sys -import psycopg2 +import base_db import csv -from common.logging import LoggingUtil -from urllib.parse import urlparse +class APSVIZ_GAUGES_DB(base_db): -class APSVIZ_GAUGES_DB: - - # dbname looks like this: 'asgs_dashboard' + # dbname looks like this: 'apsviz_gauges' # instance_id looks like this: '2744-2021050618-namforecast' def __init__(self, logger, instance_id): - self.conn = None - self.logger = logger - self.user = os.getenv('APSVIZ_GAUGES_DB_USERNAME', 'user').strip() - self.pswd = os.getenv('APSVIZ_GAUGES_DB_PASSWORD', 'password').strip() - self.host = os.getenv('APSVIZ_GAUGES_DB_HOST', 'host').strip() - self.port = os.getenv('APSVIZ_GAUGES_DB_PORT', '5432').strip() - self.db_name = os.getenv('APSVIZ_GAUGES_DB_DATABASE', '5432').strip() + user = os.getenv('APSVIZ_GAUGES_DB_USERNAME', 'user').strip() + pswd = os.getenv('APSVIZ_GAUGES_DB_PASSWORD', 'password').strip() + host = os.getenv('APSVIZ_GAUGES_DB_HOST', 'host').strip() + port = os.getenv('APSVIZ_GAUGES_DB_PORT', '5432').strip() + db_name = os.getenv('APSVIZ_GAUGES_DB_DATABASE', '5432').strip() + + super().__init__(logger, user, pswd, db_name, host, port) # save whole Id self.instanceId = instance_id @@ -30,45 +27,6 @@ def __init__(self, logger, instance_id): self.instance = parts[0] self.uid = parts[1] - try: - # connect to asgs database - conn_str = f'host={self.host} port={self.port} dbname={self.db_name} user={self.user} password={self.pswd}' - - self.conn = psycopg2.connect(conn_str) - self.conn.set_session(autocommit=True) - self.cursor = self.conn.cursor() - except: - e = sys.exc_info()[0] - self.logger.error(f"FAILURE - Cannot connect to APSVIZ-GAUGES DB. error {e}") - - def __del__(self): - """ - close up the DB - :return: - """ - try: - if self.cursor is not None: - self.cursor.close() - if self.conn is not None: - self.conn.close() - except Exception as e: - self.logger.error(f'Error detected closing cursor or connection. {e}') - #sys.exc_info()[0] - - def get_user(self): - return self.user - - def get_password(self): - return self.pswd - - def get_host(self): - return self.host - - def get_port(self): - return self.port - - def get_dbname(self): - return self.db_name # find the stationProps.csv file and insert the contents # into the adcirc_obs db of the ASGS postgres instance @@ -123,6 +81,26 @@ def insert_station_props(self, logger, geo, worksp, csv_file_path, geoserver_hos self.conn.commit() + def isObsRun(self): + + found = False + + try: + sql_stmt = 'SELECT model_run_id FROM drf_apsviz_station WHERE model_run_id=%s' + params = [self.instance] + self.logger.debug(f"sql statement is: {sql_stmt} params are: {params}") + self.cursor.execute(sql_stmt, params) + ret = self.cursor.fetchone() + if ret: + self.logger.debug(f"value returned is: {ret}") + found = True + + except: + e = sys.exc_info()[0] + self.logger.error(f"FAILURE - Cannot retrieve instance id from {self.dbname}. error {e}") + finally: + return found + @staticmethod def valid_csv_row(header: list, row: list, optional=None) -> str: """ diff --git a/asgs_db.py b/asgs_db.py index 3e2f94d..7a4f7b7 100644 --- a/asgs_db.py +++ b/asgs_db.py @@ -1,24 +1,20 @@ import os, sys -import logging -import psycopg2 -import csv +import base_db -from common.logging import LoggingUtil -from urllib.parse import urlparse - -class ASGS_DB: +class ASGS_DB(base_db): # dbname looks like this: 'asgs_dashboard' # instance_id looks like this: '2744-2021050618-namforecast' - def __init__(self, logger, dbname, instance_id): - self.conn = None - self.logger = logger + def __init__(self, logger, instance_id): + + user = os.getenv('ASGS_DB_USERNAME', 'user').strip() + pswd = os.getenv('ASGS_DB_PASSWORD', 'password').strip() + host = os.getenv('ASGS_DB_HOST', 'host').strip() + port = os.getenv('ASGS_DB_PORT', '5432').strip() + db_name = os.getenv('ASGS_DB_DATABASE', 'db').strip() + + super().__init__(logger, user, pswd, db_name, host, port) - self.user = os.getenv('ASGS_DB_USERNAME', 'user').strip() - self.pswd = os.getenv('ASGS_DB_PASSWORD', 'password').strip() - self.host = os.getenv('ASGS_DB_HOST', 'host').strip() - self.port = os.getenv('ASGS_DB_PORT', '5432').strip() - self.db_name = dbname # save whole Id self.instanceId = instance_id @@ -31,46 +27,6 @@ def __init__(self, logger, dbname, instance_id): self.instance = parts[0] self.uid = parts[1] - try: - # connect to asgs database - conn_str = f'host={self.host} port={self.port} dbname={self.db_name} user={self.user} password={self.pswd}' - - self.conn = psycopg2.connect(conn_str) - self.conn.set_session(autocommit=True) - self.cursor = self.conn.cursor() - except: - e = sys.exc_info()[0] - self.logger.error(f"FAILURE - Cannot connect to ASGS_DB. error {e}") - - def __del__(self): - """ - close up the DB - :return: - """ - try: - if self.cursor is not None: - self.cursor.close() - if self.conn is not None: - self.conn.close() - except Exception as e: - self.logger.error(f'Error detected closing cursor or connection. {e}') - #sys.exc_info()[0] - - def get_user(self): - return self.user - - def get_password(self): - return self.pswd - - def get_host(self): - return self.host - - def get_port(self): - return self.port - - def get_dbname(self): - return self.db_name - # given instance id - save geoserver url (to access this mbtiles layer) in the asgs database def saveImageURL(self, name, url): @@ -133,84 +89,3 @@ def getRunMetadata(self): metadata_dict['suite.project_code'] = 'asgs' return metadata_dict - # find the stationProps.csv file and insert the contents - # into the adcirc_obs db of the ASGS postgres instance - def insert_station_props(self, logger, geo, worksp, csv_file_path, geoserver_host): - - # where to find the stationProps.csv file - logger.info(f"Saving {csv_file_path} to DB") - logger.debug(f"DB name is: {self.get_dbname()}") - - # get the image server host name - host = os.environ.get('FILESERVER_HOST_URL', 'none').strip() - # need to remove the .edc from the geoserver_host for now - 7/18/22 - this no longer apllies for k8s runs - #if (host == 'none'): - #host = geoserver_host.replace('.edc', '') - - # open the stationProps.csv file and save in db - # must create the_geom from lat, lon provided in csv file - # also add to instance id column - # and finally, create an url where the obs chart for each station can be accessed - #try: catch this exception in calling program instead - # header of stationProps.csv looks like this: - # StationId,StationName,Source,State,Lat,Lon,Node,Filename,Type - with open(csv_file_path, 'r') as f: - reader = csv.reader(f) - header = next(reader) # Skip the header row. - for index, row in enumerate(reader): - try: - # check the row. columns that have missing data are returned - no_cols_data_msg: str = self.valid_csv_row(header, row, [5]) - - # if there was missing data log it - if no_cols_data_msg: - # log the failed columns - logger.error("Row %s had missing column data. Columns:", index+2, no_cols_data_msg) - - # no need to process this row - continue - - logger.debug(f"opened csv file - saving this row to db: {row}") - filename = os.path.basename(row[7]) - png_url = f"{host}/obs_pngs/{self.instanceId}/{filename}" - filename_list = os.path.splitext(filename) - json_url = f"{host}/obs_pngs/{self.instanceId}/{filename_list[0]}.json" - csv_url = f"{host}/obs_pngs/{self.instanceId}/{filename_list[0]}.csv" - sql_stmt = "INSERT INTO stations (stationid, stationname, source, state, lat, lon, node, filename, the_geom, instance_id, imageurl, type, jsonurl, csvurl) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s),4326), %s, %s, %s, %s, %s)" - params = [row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[5], row[4], self.instanceId, png_url, row[8], json_url, csv_url] - logger.debug(f"sql_stmt: {sql_stmt} params: {params}") - self.cursor.execute(sql_stmt, params) - except (Exception): - self.conn.commit() - raise IOError - - self.conn.commit() - - @staticmethod - def valid_csv_row(header: list, row: list, optional=None) -> str: - """ - Checks the data list to make sure there are values in each required element - and log the missing data entry. - - :param header: The CSV data header - :param row: The list of data - :param optional: The list of indexes that are optional - - :return: A comma delimited string of the errant columns - """ - # init the return - no_data_col: list = [] - - # if there are no optional values passed in just create an empty list - if optional is None: - optional = [] - - # for each element in the row - for index, value in enumerate(row): - # is this a required value and doesn't have data - if index not in optional and (value is None or len(value) == 0): - # append the column to the list - no_data_col.append(header[index]) - - # return the failed cols - return ','.join(no_data_col) diff --git a/base_db.py b/base_db.py new file mode 100644 index 0000000..6620c3c --- /dev/null +++ b/base_db.py @@ -0,0 +1,60 @@ +import os, sys +import psycopg2 + +class BASE_DB: + + def __init__(self, logger, user, pswd, db, host, port): + self.conn = None + self.logger = logger + + self.user = user.strip() + self.pswd = pswd.strip() + self.dbname = db.strip() + self.host = host.strip() + self.port = port.strip() + + try: + # connect to the database + conn_str = f'host={self.host} port={self.port} dbname={self.dbname} user={self.user} password={self.pswd}' + + self.conn = psycopg2.connect(conn_str) + self.conn.set_session(autocommit=True) + self.cursor = self.conn.cursor() + except: + e = sys.exc_info()[0] + self.logger.error(f"FAILURE - Cannot connect to {self.dbname} DB. error {e}") + + def __del__(self): + """ + close up the DB + :return: + """ + try: + if self.cursor is not None: + self.cursor.close() + if self.conn is not None: + self.conn.close() + except Exception as e: + self.logger.error(f'Error detected closing cursor or connection. {e}') + #sys.exc_info()[0] + + def get_user(self): + return self.user + + def get_password(self): + return self.pswd + + def get_host(self): + return self.host + + def get_port(self): + return self.port + + def get_dbname(self): + return self.dbname + + def get_conn(self): + return self.conn + + def get_cursor(self): + return self.cursor \ No newline at end of file diff --git a/load-geoserver-images.py b/load-geoserver-images.py index 88a058a..e026b2a 100644 --- a/load-geoserver-images.py +++ b/load-geoserver-images.py @@ -329,15 +329,14 @@ def add_mbtiles_coveragestores(logger, geo, url, instance_id, worksp, mbtiles_pa return layergrp -# add a datastore in geoserver for the stationProps.csv file -def add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_host, layergrp): +# add a db entries for the stationProps.csv file +def add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_host): logger.info(f"Adding the station properties datastore for instance id: {instance_id}") # set up paths and datastore name # TODO put these in ENVs stations_filename = "stationProps.csv" csv_file_path = f"{final_path}/insets/{stations_filename}" store_name = str(instance_id) + "_station_props" - style_name = "observations_style_v3" logger.debug(f"csv_file_path: {csv_file_path} store name: {store_name}") logger.debug(f"checking to see if {csv_file_path} exists") @@ -352,7 +351,27 @@ def add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_ except (IOError, OSError): e = sys.exc_info()[0] logger.warning(f"WARNING - Cannot save station data in APSVIZ_GAUGES DB. Error: {e}") - # TODO: Should it be returning here? return layergrp + + else: + logger.info(f"Observations config file:{csv_file_path} does not exist. Skipping creation of obs/mod layer") + utils = GeneralUtils(logger) + msg = f"Error encountered with instance id: {instance_id}. Did not create Observations layer because the station properties config file: {csv_file_path} was not found" + utils.send_slack_msg(msg, 'slack_issues_channel') + + return + + +def add_dbprops_datastore(logger, geo, instance_id, worksp, final_path, geoserver_host, layergrp): + logger.info(f"Adding the station properties datastore for instance id: {instance_id}") + # set up datastore name + store_name = str(instance_id) + "_station_props" + style_name = "observations_style_v3" + + # get apsviz_gauges db connection + asgs_obsdb = APSVIZ_GAUGES_DB(logger, instance_id) + + # check to see if observation for this model run exists, if so, create jndi feature layer + if asgs_obsdb.isObsRun(): # ... using pre-defined postgresql JNDI feature store in Geoserver ret = geo.create_jndi_featurestore(store_name, worksp, overwrite=False) @@ -360,13 +379,13 @@ def add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_ logger.info(f"Added JNDI featurestore: {store_name}") # now publish this layer with an SQL filter based on instance_id - sql = f"select * from stations where instance_id='{instance_id}'" + sql = f"select * from drf_apsviz_station where model_run_id='{instance_id}'" name = f"{instance_id}_station_properies_view" # TODO probably need to update this name - 5/21/21 - okay updated ... # but maybe need to make this a little less messy db_name = os.getenv('ASGS_DB_DATABASE', 'asgs').strip() - asgsdb = ASGS_DB(logger, db_name, instance_id) + asgsdb = ASGS_DB(logger, instance_id) meta_dict = asgsdb.getRunMetadata() raw_date = meta_dict['currentdate'] if raw_date: @@ -391,10 +410,7 @@ def add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_ layergrp["wfs"].append({"title": title, "layername": full_layername, "metclass": meta_dict['forcing.metclass'], "info": info_dict, "project_code": meta_dict['suite.project_code'], "product_type": "obs"}) else: - logger.info(f"Observations config file:{csv_file_path} does not exist. Skipping creation of obs/mod layer") - utils = GeneralUtils(logger) - msg = f"Error encountered with instance id: {instance_id}. Did not create Observations layer because the station properties config file: {csv_file_path} was not found" - utils.send_slack_msg(msg, 'slack_issues_channel') + logger.info(f"Observations for model run id {instance_id} do not exist. Skipping creation of obs/mod layer") return layergrp @@ -645,9 +661,12 @@ def main(args): logger.info("Error encountered while loading image mosaic into GeoServer - program exiting") raise SystemExit() - # now put NOAA OBS .csv file into geoserver - next_layergrp = add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_host, new_layergrp) - if (new_layergrp is None): + # now put NOAA OBS .csv file into db + add_props_datastore(logger, geo, instance_id, worksp, final_path, geoserver_host) + + # now create jndi datastore for observations from APSViz Gauges DB + next_layergrp = add_dbprops_datastore(logger, geo, instance_id, worksp, new_layergrp) + if (next_layergrp is None): logger.info("Error encountered while loading noaa observation layer into GeoServer - program exiting") raise SystemExit() diff --git a/terria_catalogV8DB.py b/terria_catalogV8DB.py index 8170a62..1c6da15 100644 --- a/terria_catalogV8DB.py +++ b/terria_catalogV8DB.py @@ -209,7 +209,7 @@ class TerriaCatalogDB: '],' \ '"url": "https://apsviz-geoserver.renci.org/geoserver/ADCIRC_2021/wms",' \ '"featureInfoTemplate": {' \ - '"template": "

{{stationname}}

"' \ + '"template": "

{{location_name}}

"' \ '},' \ '"info": [' \ '{' \