From 511a7498c536f3cc0787a3d6f3bf16872df15a71 Mon Sep 17 00:00:00 2001 From: Jan Griesfeller Date: Fri, 18 Oct 2024 14:14:28 +0200 Subject: [PATCH] WIP read data with separate read() call instead of starting to read during init --- .../actrisebas/ActrisEbasReader.py | 219 ++++++++++-------- tests/test_ActrisEbasReader.py | 41 ++-- 2 files changed, 135 insertions(+), 125 deletions(-) diff --git a/src/pyaro_readers/actrisebas/ActrisEbasReader.py b/src/pyaro_readers/actrisebas/ActrisEbasReader.py index 04bd3fe..ec1cc2b 100644 --- a/src/pyaro_readers/actrisebas/ActrisEbasReader.py +++ b/src/pyaro_readers/actrisebas/ActrisEbasReader.py @@ -90,7 +90,7 @@ def __init__( self._filename = None self.vars_to_read = vars_to_read self._stations = {} - self._urls_to_dl = {} + self.urls_to_dl = {} self._data = {} # var -> {data-array} self._set_filters(filters) # self._header = [] @@ -99,7 +99,7 @@ def __init__( # gives a mapping between the EBAS or pyaerocom variable name # and the CF standard name found in the EBAS data files # Due to standard_names aliases, the values are a list - self._standard_names = {} + self.standard_names = {} # _laststatstr = "" self._revision = datetime.datetime.now() self._metadata["revision"] = datetime.datetime.strftime( @@ -124,44 +124,44 @@ def __init__( self.sites_to_exclude = [] # read config file - self._def_data = self._read_definitions(file=DEFINITION_FILE) - # Because the user might have given a pyaerocom name, build self._actris_vars_to_read with a list + self.def_data = self._read_definitions(file=DEFINITION_FILE) + # Because the user might have given a pyaerocom name, build self.actris_vars_to_read with a list # of ACTRIS variables to read. 
values are a list - self._actris_vars_to_read = {} + self.actris_vars_to_read = {} for var in self.vars_to_read: self._metadata[var] = {} # handle pyaerocom variables here: # if a given variable name is in the list of pyaerocom variable names in definitions.toml - self._actris_vars_to_read[var] = [] - if var in self._def_data["variables"]: - # use gave a pyaerocom variable name - self._actris_vars_to_read[var] = self._def_data["variables"][var][ + self.actris_vars_to_read[var] = [] + if var in self.def_data["variables"]: + # user gave a pyaerocom variable name + self.actris_vars_to_read[var] = self.def_data["variables"][var][ "actris_variable" ] - for _actris_var in self._actris_vars_to_read[var]: + for _actris_var in self.actris_vars_to_read[var]: try: - self._standard_names[var].extend( + self.standard_names[var].extend( self.get_actris_standard_name(_actris_var) ) - self._standard_names[_actris_var].extend( + self.standard_names[_actris_var].extend( self.get_actris_standard_name(_actris_var) ) except KeyError: - self._standard_names[var] = self.get_actris_standard_name( + self.standard_names[var] = self.get_actris_standard_name( _actris_var ) - self._standard_names[ + self.standard_names[ _actris_var ] = self.get_actris_standard_name(_actris_var) else: # user gave ACTRIS name - self._actris_vars_to_read[var].append(var) - self._standard_names[var] = self.get_actris_standard_name(var) + self.actris_vars_to_read[var].append(var) + self.standard_names[var] = self.get_actris_standard_name(var) - for _pyaro_var in self._actris_vars_to_read: + for _pyaro_var in self.actris_vars_to_read: self._metadata[_pyaro_var] = {} - for _actris_var in self._actris_vars_to_read[_pyaro_var]: + for _actris_var in self.actris_vars_to_read[_pyaro_var]: # for testing since the API is error-prone and slow at the time of this writing test_file = os.path.join( os.path.dirname(os.path.realpath(__file__)), @@ -176,7 +176,7 @@ def __init__( json_resp = [] while len(json_resp_tmp) != 0: # 
search for variable metadata - query_url = f"{VAR_QUERY_URL}{quote(self._actris_vars_to_read[_pyaro_var][0])}/page/{page_no}" + query_url = f"{VAR_QUERY_URL}{quote(self.actris_vars_to_read[_pyaro_var][0])}/page/{page_no}" logger.info(query_url) retries = Retry(connect=5, read=2, redirect=5) http = PoolManager(retries=retries) @@ -196,7 +196,7 @@ def __init__( continue self._metadata[_pyaro_var][_actris_var] = json_resp - self._urls_to_dl[_actris_var] = self.extract_urls( + self.urls_to_dl[_actris_var] = self.extract_urls( json_resp, sites_to_read=self.sites_to_read, sites_to_exclude=self.sites_to_exclude, @@ -204,99 +204,112 @@ def __init__( # The following needs some refinement once we read pyaerocom variables that hold more than # one EBAS variable # we need to decide per station which EBAS variable to return at a certain station and potentially time - self.read_data( - actris_variable=_pyaro_var, urls_to_dl=self._urls_to_dl[_actris_var] - ) - assert self._data[_pyaro_var] + # self.read_data( + # actris_variable=_pyaro_var, urls_to_dl=self.urls_to_dl[_actris_var] + # ) + # assert self._data[_pyaro_var] + # return _pyaro_var, self.urls_to_dl def metadata(self): return self._metadata - def read_data( + def read( self, - actris_variable: str, - urls_to_dl: dict, tqdm_desc="reading stations", ): """ read the data from EBAS thredds server """ - bar = tqdm(desc=tqdm_desc, total=len(urls_to_dl), disable=None) - for s_idx, site_name in enumerate(urls_to_dl): - for f_idx, url in enumerate(urls_to_dl[site_name]): - tmp_data = xr.open_dataset(url) - - # put all data variables in the data struct for the moment - for d_idx, _data_var in enumerate( - self._get_ebas_data_vars( - tmp_data, - ) - ): - # look for a standard_name match and return only that variable - std_name = self.get_ebas_data_standard_name(tmp_data, _data_var) - if std_name not in self._standard_names[actris_variable]: - # logger.info( - # f"station {site_name}, file #{f_idx}: skipping variable {_data_var} due 
to wrong standard name" - # ) - continue - else: - bla = f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}" - logger.info( - f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}" - ) + # for actris vocabulary key and value of self.actris_vars_to_read are the same + # for pyaerocom vocabulary they are not (key is pyaerocom variable name there)! + for _var in self.actris_vars_to_read: + for actris_variable in self.actris_vars_to_read[_var]: + # actris_variable = self.actris_vars_to_read[_var][0] + + urls_to_dl = self.urls_to_dl[actris_variable] + bar = tqdm(desc=tqdm_desc, total=len(urls_to_dl), disable=None) + for s_idx, site_name in enumerate(urls_to_dl): + for f_idx, url in enumerate(urls_to_dl[site_name]): + logger.info(f"reading file {url}") + tmp_data = xr.open_dataset(url) + + # put all data variables in the data struct for the moment + for d_idx, _data_var in enumerate( + self._get_ebas_data_vars( + tmp_data, + ) + ): + # look for a standard_name match and return only that variable + std_name = self.get_ebas_data_standard_name(tmp_data, _data_var) + if std_name not in self.standard_names[actris_variable]: + # logger.info( + # f"station {site_name}, file #{f_idx}: skipping variable {_data_var} due to wrong standard name" + # ) + continue + else: + log_str = f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}" + logger.info(log_str) + + # assert f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}" + long_name = tmp_data.attrs["ebas_station_name"] + stat_code = tmp_data.attrs["ebas_station_code"] + # create variables valid for all measured variables... 
+ start_time = np.asarray(tmp_data["time_bnds"][:, 0]) + stop_time = np.asarray(tmp_data["time_bnds"][:, 1]) + ts_no = len(start_time) + lat = np.full(ts_no, tmp_data.attrs["geospatial_lat_min"]) + lon = np.full(ts_no, tmp_data.attrs["geospatial_lon_min"]) + # station = np.full(ts_no, tmp_data.attrs["ebas_station_code"]) + station = np.full(ts_no, long_name) + altitude = np.full( + ts_no, tmp_data.attrs["geospatial_vertical_min"] + ) + standard_deviation = np.full(ts_no, np.nan) + vals = tmp_data[_data_var].values + # apply flags + ebas_qc_var = self.get_ebas_data_qc_variable( + tmp_data, _data_var + ) + + flags = np.full(ts_no, Flag.VALID) + if _var not in self._data: + self._data[_var] = NpStructuredData( + _var, + self.get_ebas_data_units(tmp_data, _data_var), + ) - assert f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}" - long_name = tmp_data.attrs["ebas_station_name"] - stat_code = tmp_data.attrs["ebas_station_code"] - # create variables valid for all measured variables... 
- start_time = np.asarray(tmp_data["time_bnds"][:, 0]) - stop_time = np.asarray(tmp_data["time_bnds"][:, 1]) - ts_no = len(start_time) - lat = np.full(ts_no, tmp_data.attrs["geospatial_lat_min"]) - lon = np.full(ts_no, tmp_data.attrs["geospatial_lon_min"]) - # station = np.full(ts_no, tmp_data.attrs["ebas_station_code"]) - station = np.full(ts_no, long_name) - altitude = np.full(ts_no, tmp_data.attrs["geospatial_vertical_min"]) - standard_deviation = np.full(ts_no, np.nan) - vals = tmp_data[_data_var].values - # apply flags - ebas_qc_var = self.get_ebas_data_qc_variable(tmp_data, _data_var) - - flags = np.full(ts_no, Flag.VALID) - if actris_variable not in self._data: - self._data[actris_variable] = NpStructuredData( - actris_variable, - self.get_ebas_data_units(tmp_data, _data_var), + self._data[_var].append( + value=vals, + station=station, + latitude=lat, + longitude=lon, + altitude=altitude, + start_time=start_time, + end_time=stop_time, + # TODO: Currently assuming that all observations are valid. + flag=flags, + standard_deviation=standard_deviation, + ) + # stop after the 1st matching variable + logger.info( + f"matching std_name found. Not searching for possible additional std_name matches at this point..." + ) + break + + if not site_name in self._stations: + self._stations[site_name] = Station( + { + "station": site_name, + "longitude": lon[0], + "latitude": lat[0], + "altitude": altitude[0], + "country": self.get_ebas_data_country_code(tmp_data), + "url": "", + "long_name": stat_code, + } ) - - self._data[actris_variable].append( - value=vals, - station=station, - latitude=lat, - longitude=lon, - altitude=altitude, - start_time=start_time, - end_time=stop_time, - # TODO: Currently assuming that all observations are valid. 
- flag=flags, - standard_deviation=standard_deviation, - ) - # stop after the 1st matching variable - break - if not site_name in self._stations: - self._stations[site_name] = Station( - { - "station": site_name, - "longitude": lon[0], - "latitude": lat[0], - "altitude": altitude[0], - "country": self.get_ebas_data_country_code(tmp_data), - "url": "", - "long_name": stat_code, - } - ) - bar.update(1) - bar.close() + bar.update(1) + bar.close() def get_ebas_flags(self, url: str = EBAS_FLAG_URL) -> dict: """small helper to download the bas flag file from NILU""" @@ -362,7 +375,7 @@ def get_ebas_data_country_code(self, tmp_data): def get_actris_standard_name(self, actris_var_name): """small helper method to get corresponding CF standard name for a given ACTRIS variable""" try: - return self._def_data[STD_NAME_SECTION_NAME][actris_var_name] + return self.def_data[STD_NAME_SECTION_NAME][actris_var_name] except KeyError: raise ActrisEbasStdNameNotFoundException( f"Error: no CF standard name for {actris_var_name} found!" 
@@ -387,7 +400,7 @@ def _get_ebas_data_vars(self, tmp_data, actris_var: str = None, units: str = Non # if defined, return only names that match if ( tmp_data[data_var].attrs["units"] - == self._def_data["actris_std_units"][data_var] + == self.def_data["actris_std_units"][data_var] ): data_vars.append(data_var) except KeyError: diff --git a/tests/test_ActrisEbasReader.py b/tests/test_ActrisEbasReader.py index 3c1f92b..29ac0d7 100644 --- a/tests/test_ActrisEbasReader.py +++ b/tests/test_ActrisEbasReader.py @@ -75,18 +75,16 @@ def test_api_reading_small_data_set(self): # }, } engine = pyaro.list_timeseries_engines()[self.engine] - # - with engine.open( - # filters=[], - filters=self.station_filter, - vars_to_read=self.vars_to_read, - ) as ts: - self.assertGreaterEqual(len(ts.variables()), 1) - self.assertGreaterEqual(len(ts.stations()), 2) - self.assertGreaterEqual(len(ts._data[ts.variables()[0]]), 1000) - self.assertGreaterEqual(len(ts.data(ts.variables()[0])), 1000) - - self.assertIn("revision", ts.metadata()) + read_obj = engine.open( + filters=self.station_filter, vars_to_read=self.vars_to_read + ) + read_obj.read() + self.assertGreaterEqual(len(read_obj.variables()), 1) + self.assertGreaterEqual(len(read_obj.stations()), 2) + self.assertGreaterEqual(len(read_obj._data[read_obj.variables()[0]]), 1000) + self.assertGreaterEqual(len(read_obj.data(read_obj.variables()[0])), 1000) + self.assertGreaterEqual(len(read_obj.variables()), 1) + self.assertIn("revision", read_obj.metadata()) def test_api_reading_pyaerocom_naming(self): # test access to the EBAS API @@ -99,16 +97,15 @@ def test_api_reading_pyaerocom_naming(self): # test variable by variable for _var in self.pyaerocom_vars_to_read: engine = pyaro.list_timeseries_engines()[self.engine] - # - with engine.open( - filters=self.station_filter, - vars_to_read=[_var], - test_flag=False, - ) as ts: - self.assertGreaterEqual(len(ts.variables()), 1) - self.assertGreaterEqual(len(ts.stations()), 2) - 
self.assertGreaterEqual(len(ts._data[ts.variables()[0]]), 1000) - self.assertGreaterEqual(len(ts.data(ts.variables()[0])), 1000) + read_obj = engine.open(filters=self.station_filter, vars_to_read=[_var]) + read_obj.read() + self.assertGreaterEqual(len(read_obj.variables()), 1) + self.assertGreaterEqual(len(read_obj.stations()), 2) + self.assertGreaterEqual(len(read_obj._data[read_obj.variables()[0]]), 1000) + self.assertGreaterEqual(len(read_obj.data(read_obj.variables()[0])), 1000) + self.assertGreaterEqual(len(read_obj.variables()), 1) + self.assertIn("revision", read_obj.metadata()) + # # #