diff --git a/monetio/obs/openaq.py b/monetio/obs/openaq.py index 53cd77c2..bf11b016 100644 --- a/monetio/obs/openaq.py +++ b/monetio/obs/openaq.py @@ -93,7 +93,7 @@ def add_data(self, dates, num_workers=1): dfs = [dask.delayed(self.read_json)(f) for f in urls] dff = dd.from_delayed(dfs) z = dff.compute(num_workers=num_workers) - z.coordinates.replace(to_replace=[None], value=pd.np.nan, inplace=True) + z.coordinates.replace(to_replace=[None], value=NaN, inplace=True) z = z.dropna().reset_index(drop=True) js = json.loads(z[["coordinates", "date"]].to_json(orient="records")) dff = pd.io.json.json_normalize(js) @@ -101,8 +101,12 @@ def add_data(self, dates, num_workers=1): dff.rename({"local": "time_local", "utc": "time"}, axis=1, inplace=True) dff["time"] = pd.to_datetime(dff.time) - dff["time_local"] = pd.to_datetime(dff.time_local) + dff["utcoffset"] = pd.to_datetime(dff.time_local).apply(lambda x: x.utcoffset()) zzz = z.join(dff).drop(columns=["coordinates", "date", "attribution", "averagingPeriod"]) + zzz = self._fix_units(zzz) + assert ( + zzz[~zzz.parameter.isin(["pm25", "pm4", "pm10", "bc"])].unit.dropna() == "ppm" + ).all() zp = self._pivot_table(zzz) zp["siteid"] = ( zp.country @@ -114,8 +118,8 @@ def add_data(self, dates, num_workers=1): ) zp["time"] = zp.time.dt.tz_localize(None) - tzinfo = zp.time_local.apply(lambda x: x.tzinfo.utcoffset(x)) - zp["time_local"] = zp["time"] + tzinfo + zp["time_local"] = zp["time"] + zp["utcoffset"] + return zp.loc[zp.time >= dates.min()] def read_json(self, url): @@ -161,10 +165,17 @@ def local(x): def _fix_units(self, df): df.loc[df.value <= 0] = NaN - df.loc[(df.parameter == "co") & (df.unit != "ppm"), "value"] /= 1145 - df.loc[(df.parameter == "o3") & (df.unit != "ppm"), "value"] /= 2000 - df.loc[(df.parameter == "so2") & (df.unit != "ppm"), "value"] /= 2620 - df.loc[(df.parameter == "no2") & (df.unit != "ppm"), "value"] /= 1880 + # For a certain parameter, different site-times may have different units. 
+ # https://docs.openaq.org/docs/parameters + # These conversion factors are based on + # - air average molecular weight: 29 g/mol + # - air density: 1.2 kg/m³ + # rounded to 3 significant figures. + fs = {"co": 1160, "o3": 1990, "so2": 2650, "no2": 1900, "ch4": 664, "no": 1240} + for vn, f in fs.items(): + is_ug = (df.parameter == vn) & (df.unit == "µg/m³") + df.loc[is_ug, "value"] /= f + df.loc[is_ug, "unit"] = "ppm" return df def _pivot_table(self, df): @@ -178,7 +189,7 @@ def _pivot_table(self, df): "sourceType", "city", "country", - "time_local", + "utcoffset", ], columns="parameter", ).reset_index() @@ -188,10 +199,13 @@ def _pivot_table(self, df): o3="o3_ppm", no2="no2_ppm", so2="so2_ppm", + ch4="ch4_ppm", + no="no_ppm", bc="bc_umg3", pm25="pm25_ugm3", pm10="pm10_ugm3", ), axis=1, + errors="ignore", ) return w diff --git a/tests/test_openaq.py b/tests/test_openaq.py new file mode 100644 index 00000000..a349544c --- /dev/null +++ b/tests/test_openaq.py @@ -0,0 +1,17 @@ +import sys + +import pandas as pd +import pytest + +from monetio import openaq + + +@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires Python 3.7+") +def test_openaq(): + # First date in the archive, just one file + # Browse the archive at https://openaq-fetches.s3.amazonaws.com/index.html + dates = pd.date_range(start="2013-11-26", end="2013-11-27", freq="H")[:-1] + df = openaq.add_data(dates) + assert not df.empty + assert df.siteid.nunique() == 1 + assert (df.country == "CN").all() and ((df.time_local - df.time) == pd.Timedelta(hours=8)).all()