
Commit

WIP read data with separate read() call instead of starting to read during init
Jan Griesfeller committed Oct 18, 2024
1 parent d564d7c commit 511a749
Showing 2 changed files with 135 additions and 125 deletions.
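This commit moves the per-station data download (the xr.open_dataset calls on the thredds URLs) out of the reader's __init__ and into a separate read() method, mirroring the updated tests at the end of this diff. Below is a minimal usage sketch of the new call pattern, following the no-argument read() call used in those tests; the engine name "actrisebas", the empty filter list, and the variable name are illustrative assumptions, not values taken from this commit.

import pyaro

# open() still queries the EBAS metadata API to collect the download URLs,
# but (new in this commit) the per-station data files are no longer fetched here.
engine = pyaro.list_timeseries_engines()["actrisebas"]  # engine name assumed
reader = engine.open(
    filters=[],  # e.g. a station filter, as used in the tests
    vars_to_read=["aerosol_particle_number_concentration"],  # hypothetical variable name
)

# New: the (slow) thredds downloads happen only on an explicit read() call.
reader.read()

# Data access is unchanged.
for var in reader.variables():
    print(var, len(reader.data(var)), "data points")
print(len(reader.stations()), "stations read")
print(reader.metadata())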
219 changes: 116 additions & 103 deletions src/pyaro_readers/actrisebas/ActrisEbasReader.py
@@ -90,7 +90,7 @@ def __init__(
self._filename = None
self.vars_to_read = vars_to_read
self._stations = {}
self._urls_to_dl = {}
self.urls_to_dl = {}
self._data = {} # var -> {data-array}
self._set_filters(filters)
# self._header = []
@@ -99,7 +99,7 @@ def __init__(
# gives a mapping between the EBAS or pyaerocom variable name
# and the CF standard name found in the EBAS data files
# Due to standard_names aliases, the values are a list
self._standard_names = {}
self.standard_names = {}
# _laststatstr = ""
self._revision = datetime.datetime.now()
self._metadata["revision"] = datetime.datetime.strftime(
@@ -124,44 +124,44 @@ def __init__(
self.sites_to_exclude = []

# read config file
self._def_data = self._read_definitions(file=DEFINITION_FILE)
# Because the user might have given a pyaerocom name, build self._actris_vars_to_read with a list
self.def_data = self._read_definitions(file=DEFINITION_FILE)
# Because the user might have given a pyaerocom name, build self.actris_vars_to_read with a list
# of ACTRIS variables to read. values are a list
self._actris_vars_to_read = {}
self.actris_vars_to_read = {}
for var in self.vars_to_read:
self._metadata[var] = {}
# handle pyaerocom variables here:
# if a given variable name is in the list of pyaerocom variable names in definitions.toml
self._actris_vars_to_read[var] = []
if var in self._def_data["variables"]:
# use gave a pyaerocom variable name
self._actris_vars_to_read[var] = self._def_data["variables"][var][
self.actris_vars_to_read[var] = []
if var in self.def_data["variables"]:
# user gave a pyaerocom variable name
self.actris_vars_to_read[var] = self.def_data["variables"][var][
"actris_variable"
]
for _actris_var in self._actris_vars_to_read[var]:
for _actris_var in self.actris_vars_to_read[var]:
try:
self._standard_names[var].extend(
self.standard_names[var].extend(
self.get_actris_standard_name(_actris_var)
)
self._standard_names[_actris_var].extend(
self.standard_names[_actris_var].extend(
self.get_actris_standard_name(_actris_var)
)
except KeyError:
self._standard_names[var] = self.get_actris_standard_name(
self.standard_names[var] = self.get_actris_standard_name(
_actris_var
)
self._standard_names[
self.standard_names[
_actris_var
] = self.get_actris_standard_name(_actris_var)

else:
# user gave ACTRIS name
self._actris_vars_to_read[var].append(var)
self._standard_names[var] = self.get_actris_standard_name(var)
self.actris_vars_to_read[var].append(var)
self.standard_names[var] = self.get_actris_standard_name(var)

for _pyaro_var in self._actris_vars_to_read:
for _pyaro_var in self.actris_vars_to_read:
self._metadata[_pyaro_var] = {}
for _actris_var in self._actris_vars_to_read[_pyaro_var]:
for _actris_var in self.actris_vars_to_read[_pyaro_var]:
# for testing since the API is error-prone and slow at the time of this writing
test_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)),
@@ -176,7 +176,7 @@ def __init__(
json_resp = []
while len(json_resp_tmp) != 0:
# search for variable metadata
query_url = f"{VAR_QUERY_URL}{quote(self._actris_vars_to_read[_pyaro_var][0])}/page/{page_no}"
query_url = f"{VAR_QUERY_URL}{quote(self.actris_vars_to_read[_pyaro_var][0])}/page/{page_no}"
logger.info(query_url)
retries = Retry(connect=5, read=2, redirect=5)
http = PoolManager(retries=retries)
@@ -196,107 +196,120 @@ def __init__(
continue

self._metadata[_pyaro_var][_actris_var] = json_resp
self._urls_to_dl[_actris_var] = self.extract_urls(
self.urls_to_dl[_actris_var] = self.extract_urls(
json_resp,
sites_to_read=self.sites_to_read,
sites_to_exclude=self.sites_to_exclude,
)
# The following needs some refinement once we read pyaerocom variables that hold more than
# one EBAS variable
# we need to decide per station which EBAS variable to return at a certain station and potentially time
self.read_data(
actris_variable=_pyaro_var, urls_to_dl=self._urls_to_dl[_actris_var]
)
assert self._data[_pyaro_var]
# self.read_data(
# actris_variable=_pyaro_var, urls_to_dl=self.urls_to_dl[_actris_var]
# )
# assert self._data[_pyaro_var]
# return _pyaro_var, self.urls_to_dl

def metadata(self):
return self._metadata

def read_data(
def read(
self,
actris_variable: str,
urls_to_dl: dict,
tqdm_desc="reading stations",
):
"""
read the data from EBAS thredds server
"""
bar = tqdm(desc=tqdm_desc, total=len(urls_to_dl), disable=None)
for s_idx, site_name in enumerate(urls_to_dl):
for f_idx, url in enumerate(urls_to_dl[site_name]):
tmp_data = xr.open_dataset(url)

# put all data variables in the data struct for the moment
for d_idx, _data_var in enumerate(
self._get_ebas_data_vars(
tmp_data,
)
):
# look for a standard_name match and return only that variable
std_name = self.get_ebas_data_standard_name(tmp_data, _data_var)
if std_name not in self._standard_names[actris_variable]:
# logger.info(
# f"station {site_name}, file #{f_idx}: skipping variable {_data_var} due to wrong standard name"
# )
continue
else:
bla = f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}"
logger.info(
f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}"
)
# for actris vocabulary key and value of self.actris_vars_to_read are the same
# for pyaerocom vocabulary they are not (key is pyaerocom variable name there)!
for _var in self.actris_vars_to_read:
for actris_variable in self.actris_vars_to_read[_var]:
# actris_variable = self.actris_vars_to_read[_var][0]

urls_to_dl = self.urls_to_dl[actris_variable]
bar = tqdm(desc=tqdm_desc, total=len(urls_to_dl), disable=None)
for s_idx, site_name in enumerate(urls_to_dl):
for f_idx, url in enumerate(urls_to_dl[site_name]):
logger.info(f"reading file {url}")
tmp_data = xr.open_dataset(url)

# put all data variables in the data struct for the moment
for d_idx, _data_var in enumerate(
self._get_ebas_data_vars(
tmp_data,
)
):
# look for a standard_name match and return only that variable
std_name = self.get_ebas_data_standard_name(tmp_data, _data_var)
if std_name not in self.standard_names[actris_variable]:
# logger.info(
# f"station {site_name}, file #{f_idx}: skipping variable {_data_var} due to wrong standard name"
# )
continue
else:
log_str = f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}"
logger.info(log_str)

# assert f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}"
long_name = tmp_data.attrs["ebas_station_name"]
stat_code = tmp_data.attrs["ebas_station_code"]
# create variables valid for all measured variables...
start_time = np.asarray(tmp_data["time_bnds"][:, 0])
stop_time = np.asarray(tmp_data["time_bnds"][:, 1])
ts_no = len(start_time)
lat = np.full(ts_no, tmp_data.attrs["geospatial_lat_min"])
lon = np.full(ts_no, tmp_data.attrs["geospatial_lon_min"])
# station = np.full(ts_no, tmp_data.attrs["ebas_station_code"])
station = np.full(ts_no, long_name)
altitude = np.full(
ts_no, tmp_data.attrs["geospatial_vertical_min"]
)
standard_deviation = np.full(ts_no, np.nan)
vals = tmp_data[_data_var].values
# apply flags
ebas_qc_var = self.get_ebas_data_qc_variable(
tmp_data, _data_var
)

flags = np.full(ts_no, Flag.VALID)
if _var not in self._data:
self._data[_var] = NpStructuredData(
_var,
self.get_ebas_data_units(tmp_data, _data_var),
)

assert f"station {site_name}, file #{f_idx}: found matching standard_name {std_name}"
long_name = tmp_data.attrs["ebas_station_name"]
stat_code = tmp_data.attrs["ebas_station_code"]
# create variables valid for all measured variables...
start_time = np.asarray(tmp_data["time_bnds"][:, 0])
stop_time = np.asarray(tmp_data["time_bnds"][:, 1])
ts_no = len(start_time)
lat = np.full(ts_no, tmp_data.attrs["geospatial_lat_min"])
lon = np.full(ts_no, tmp_data.attrs["geospatial_lon_min"])
# station = np.full(ts_no, tmp_data.attrs["ebas_station_code"])
station = np.full(ts_no, long_name)
altitude = np.full(ts_no, tmp_data.attrs["geospatial_vertical_min"])
standard_deviation = np.full(ts_no, np.nan)
vals = tmp_data[_data_var].values
# apply flags
ebas_qc_var = self.get_ebas_data_qc_variable(tmp_data, _data_var)

flags = np.full(ts_no, Flag.VALID)
if actris_variable not in self._data:
self._data[actris_variable] = NpStructuredData(
actris_variable,
self.get_ebas_data_units(tmp_data, _data_var),
self._data[_var].append(
value=vals,
station=station,
latitude=lat,
longitude=lon,
altitude=altitude,
start_time=start_time,
end_time=stop_time,
# TODO: Currently assuming that all observations are valid.
flag=flags,
standard_deviation=standard_deviation,
)
# stop after the 1st matching variable
logger.info(
f"matching std_name found. Not searching for possible additional std_name matches at this point..."
)
break

if not site_name in self._stations:
self._stations[site_name] = Station(
{
"station": site_name,
"longitude": lon[0],
"latitude": lat[0],
"altitude": altitude[0],
"country": self.get_ebas_data_country_code(tmp_data),
"url": "",
"long_name": stat_code,
}
)

self._data[actris_variable].append(
value=vals,
station=station,
latitude=lat,
longitude=lon,
altitude=altitude,
start_time=start_time,
end_time=stop_time,
# TODO: Currently assuming that all observations are valid.
flag=flags,
standard_deviation=standard_deviation,
)
# stop after the 1st matching variable
break
if not site_name in self._stations:
self._stations[site_name] = Station(
{
"station": site_name,
"longitude": lon[0],
"latitude": lat[0],
"altitude": altitude[0],
"country": self.get_ebas_data_country_code(tmp_data),
"url": "",
"long_name": stat_code,
}
)
bar.update(1)
bar.close()
bar.update(1)
bar.close()

def get_ebas_flags(self, url: str = EBAS_FLAG_URL) -> dict:
"""small helper to download the bas flag file from NILU"""
@@ -362,7 +375,7 @@ def get_ebas_data_country_code(self, tmp_data)
def get_actris_standard_name(self, actris_var_name):
"""small helper method to get corresponding CF standard name for a given ACTRIS variable"""
try:
return self._def_data[STD_NAME_SECTION_NAME][actris_var_name]
return self.def_data[STD_NAME_SECTION_NAME][actris_var_name]
except KeyError:
raise ActrisEbasStdNameNotFoundException(
f"Error: no CF standard name for {actris_var_name} found!"
@@ -387,7 +400,7 @@ def _get_ebas_data_vars(self, tmp_data, actris_var: str = None, units: str = Non
# if defined, return only names that match
if (
tmp_data[data_var].attrs["units"]
== self._def_data["actris_std_units"][data_var]
== self.def_data["actris_std_units"][data_var]
):
data_vars.append(data_var)
except KeyError:
41 changes: 19 additions & 22 deletions tests/test_ActrisEbasReader.py
@@ -75,18 +75,16 @@ def test_api_reading_small_data_set(self):
# },
}
engine = pyaro.list_timeseries_engines()[self.engine]
#
with engine.open(
# filters=[],
filters=self.station_filter,
vars_to_read=self.vars_to_read,
) as ts:
self.assertGreaterEqual(len(ts.variables()), 1)
self.assertGreaterEqual(len(ts.stations()), 2)
self.assertGreaterEqual(len(ts._data[ts.variables()[0]]), 1000)
self.assertGreaterEqual(len(ts.data(ts.variables()[0])), 1000)

self.assertIn("revision", ts.metadata())
read_obj = engine.open(
filters=self.station_filter, vars_to_read=self.vars_to_read
)
read_obj.read()
self.assertGreaterEqual(len(read_obj.variables()), 1)
self.assertGreaterEqual(len(read_obj.stations()), 2)
self.assertGreaterEqual(len(read_obj._data[read_obj.variables()[0]]), 1000)
self.assertGreaterEqual(len(read_obj.data(read_obj.variables()[0])), 1000)
self.assertGreaterEqual(len(read_obj.variables()), 1)
self.assertIn("revision", read_obj.metadata())

def test_api_reading_pyaerocom_naming(self):
# test access to the EBAS API
@@ -99,16 +97,15 @@ def test_api_reading_pyaerocom_naming(self):
# test variable by variable
for _var in self.pyaerocom_vars_to_read:
engine = pyaro.list_timeseries_engines()[self.engine]
#
with engine.open(
filters=self.station_filter,
vars_to_read=[_var],
test_flag=False,
) as ts:
self.assertGreaterEqual(len(ts.variables()), 1)
self.assertGreaterEqual(len(ts.stations()), 2)
self.assertGreaterEqual(len(ts._data[ts.variables()[0]]), 1000)
self.assertGreaterEqual(len(ts.data(ts.variables()[0])), 1000)
read_obj = engine.open(filters=self.station_filter, vars_to_read=[_var])
read_obj.read()
self.assertGreaterEqual(len(read_obj.variables()), 1)
self.assertGreaterEqual(len(read_obj.stations()), 2)
self.assertGreaterEqual(len(read_obj._data[read_obj.variables()[0]]), 1000)
self.assertGreaterEqual(len(read_obj.data(read_obj.variables()[0])), 1000)
self.assertGreaterEqual(len(read_obj.variables()), 1)
self.assertIn("revision", read_obj.metadata())


#
# #
