Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pdbufr to read DWD radar data in bufr format #482

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ if [ "${flavor}" = "testing" ]; then
--extras=radar \
--extras=radarplus \
--extras=restapi \
--extras=sql
--extras=sql \
--extras=bufr

elif [ "${flavor}" = "docs" ]; then
poetry install --verbose --no-interaction --with=docs --extras=interpolation
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ jobs:
brew install eccodes
export WD_ECCODES_DIR=$(brew --prefix eccodes)

- name: Install eccodes (Mac only)
run: |
if [ "$RUNNER_OS" == "macOS" ]; then
brew install eccodes && export WD_ECCODES_DIR=$(brew --prefix eccodes)
fi

- name: Install project
run: .github/workflows/install.sh testing

Expand Down
32 changes: 32 additions & 0 deletions tests/provider/dwd/radar/test_api_historic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
from dirty_equals import IsDatetime, IsDict, IsInt, IsList, IsNumeric, IsStr

from wetterdienst.eccodes import ensure_eccodes
from wetterdienst.provider.dwd.radar import (
DwdRadarDataFormat,
DwdRadarDataSubset,
Expand Down Expand Up @@ -516,6 +517,26 @@ def test_radar_request_site_historic_pe_bufr(default_settings):
decoder = pybufrkit.decoder.Decoder()
decoder.process(payload, info_only=True)

if ensure_eccodes():
df = results[0].df

assert not df.empty

print(df.dropna().query("value != 0"))

assert df.columns.tolist() == [
"station_id",
"latitude",
"longitude",
"height",
"projectionType",
"pictureType",
"date",
"echotops",
]

assert not df.dropna().empty


@pytest.mark.xfail(reason="month_year not matching start_date")
@pytest.mark.remote
Expand Down Expand Up @@ -569,6 +590,13 @@ def test_radar_request_site_historic_pe_timerange(fmt, default_settings):
)
assert re.match(bytes(header, encoding="ascii"), payload[:115])

first = results[0]

if fmt == DwdRadarDataFormat.BUFR:
assert not first.df.dropna().empty

assert first.df.columns == [""]


@pytest.mark.remote
def test_radar_request_site_historic_px250_bufr_yesterday(default_settings):
Expand Down Expand Up @@ -637,6 +665,10 @@ def test_radar_request_site_historic_px250_bufr_timerange(default_settings):

assert len(results) == 12

first = results[0]

assert not first.df.dropna().empty


@pytest.mark.remote
def test_radar_request_site_historic_sweep_vol_v_hdf5_yesterday(default_settings):
Expand Down
2 changes: 2 additions & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def test_default_settings(caplog):
default_settings = Settings.default()
assert not default_settings.cache_disable
assert re.match(WD_CACHE_DIR_PATTERN, default_settings.cache_dir)
assert default_settings.eccodes_dir is None
assert default_settings.fsspec_client_kwargs == {}
assert default_settings.ts_humanize
assert default_settings.ts_shape == "long"
Expand All @@ -44,6 +45,7 @@ def test_default_settings(caplog):
"precipitation_height": 20.0,
}
assert default_settings.ts_interpolation_use_nearby_station_distance == 1
assert not default_settings.read_bufr
log_message = caplog.messages[0]
assert re.match(WD_CACHE_ENABLED_PATTERN, log_message)

Expand Down
13 changes: 13 additions & 0 deletions wetterdienst/eccodes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018-2022, earthobservations developers.
# Distributed under the MIT License. See LICENSE for more info.
def ensure_eccodes() -> bool:
"""Function to ensure that eccodes is loaded"""
try:
import eccodes

eccodes.eccodes.codes_get_api_version()
except (ModuleNotFoundError, RuntimeError):
return False

return True
74 changes: 74 additions & 0 deletions wetterdienst/provider/dwd/radar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@

log = logging.getLogger(__name__)

BUFR_PARAMETER_MAPPING = {
DwdRadarParameter.PE_ECHO_TOP: ["echoTops"],
DwdRadarParameter.PG_REFLECTIVITY: ["horizontalReflectivity"],
DwdRadarParameter.LMAX_VOLUME_SCAN: ["horizontalReflectivity"],
DwdRadarParameter.PX250_REFLECTIVITY: ["horizontalReflectivity"],
}

ECCODES_FOUND = ensure_eccodes()


@dataclass
class RadarResult:
Expand All @@ -64,6 +73,8 @@ class RadarResult:
"""

data: BytesIO
# placeholder for bufr files, which are read into pandas.DataFrame if eccodes available
df: pl.DataFrame = field(default_factory=pl.DataFrame)
timestamp: dt.datetime = None
url: str = None
filename: str = None
Expand Down Expand Up @@ -415,6 +426,69 @@ def query(self) -> Iterator[RadarResult]:
verify_hdf5(result.data)
except Exception as e: # pragma: no cover
log.exception(f"Unable to read HDF5 file. {e}")

if self.format == DwdRadarDataFormat.BUFR:
if ECCODES_FOUND and self.settings.read_bufr:
buffer = result.data

# TODO: pdbufr currently doesn't seem to allow reading directly from BytesIO
tf = tempfile.NamedTemporaryFile("w+b")
tf.write(buffer.read())
tf.seek(0)

df = pdbufr.read_bufr(
tf.name,
columns="data",
flat=True
)

value_vars = []
parameters = BUFR_PARAMETER_MAPPING[self.parameter]
for par in parameters:
value_vars.extend([col for col in df.columns if par in col])
value_vars = set(value_vars)
id_vars = df.columns.difference(value_vars)
id_vars = [iv for iv in id_vars if iv.startswith("#1#")]

df = df.melt(id_vars=id_vars,var_name="parameter",value_vars=value_vars, value_name="value")
df.columns = [col[3:] if col.startswith("#1#") else col for col in df.columns]

df = df.rename(
columns={
"stationNumber": Columns.STATION_ID.value,
"latitude": Columns.LATITUDE.value,
"longitude": Columns.LONGITUDE.value,
"heightOfStation": Columns.HEIGHT.value,
}
)


# df[Columns.STATION_ID.value] = df[Columns.STATION_ID.value].astype(int).astype(str)

date_columns = ["year", "month", "day", "hour", "minute"]
dates = df.loc[:, date_columns].apply(
lambda x: datetime(
year=x.year, month=x.month, day=x.day, hour=x.hour, minute=x.minute
),
axis=1,
)
df.insert(len(df.columns) - 1, Columns.DATE.value, dates)
df = df.drop(columns=date_columns)

def split_index_parameter(text: str):
split_index = text.index("#", 1)
if split_index == -1:
return text, None
index = text[1:split_index]
parameter = text[split_index+1:]
return parameter, float(index)

df[["parameter", "index"]] = df.pop("parameter").map(split_index_parameter).tolist()

df = df.sort_values(["parameter", "index"])

result.df = df

yield result

@staticmethod
Expand Down
Loading