Skip to content

Commit

Permalink
Simplify rain gauge processing
Browse files Browse the repository at this point in the history
  • Loading branch information
tukiains committed Jan 22, 2025
1 parent 2e893ce commit 0c32897
Showing 1 changed file with 57 additions and 49 deletions.
106 changes: 57 additions & 49 deletions src/processing/harmonizer/rain_gauge.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import logging
import shutil
from tempfile import NamedTemporaryFile
from typing import Final

import netCDF4
from cloudnetpy.instruments import instruments
from numpy import ma

from processing.harmonizer import core

RATE: Final = "rainfall_rate"
AMOUNT: Final = "rainfall_amount"


def harmonize_thies_pt_nc(data: dict) -> str:
if "output_path" not in data:
Expand All @@ -23,26 +27,21 @@ def harmonize_thies_pt_nc(data: dict) -> str:
gauge = RainGaugeNc(nc_raw, nc, data)
ind = gauge.get_valid_time_indices()
gauge.nc.createDimension("time", len(ind))
gauge.copy_data(("time", "int_m", "am_tot", "am_red"), ind)
gauge.copy_data(("time", "int_h", "am_tot"), ind)
gauge.mask_bad_data_values()
gauge.fix_variable_names()
gauge.fix_variable_attributes()
gauge.clean_global_attributes()
gauge.convert_units()
for key in (
"rainfall_rate",
"rainfall_amount",
):
gauge.harmonize_standard_attributes(key)
gauge.harmonize_attribute("comment", ("rainfall_amount",))
gauge.fix_long_names()
gauge.convert_rainfall_rate()
gauge.convert_rainfall_amount()
gauge.fix_jumps_in_pt()
gauge.normalize_rainfall_amount()
uuid = gauge.add_uuid()
gauge.add_global_attributes("rain-gauge", instruments.THIES_PT)
gauge.add_date()
gauge.convert_time()
gauge.add_geolocation()
gauge.add_history("rain-gauge")
gauge.fix_jumps_in_pt()
gauge.normalize_rainfall_amount()
if "output_path" not in data:
shutil.copy(temp_file.name, data["full_path"])
return uuid
Expand All @@ -62,26 +61,19 @@ def harmonize_pluvio_nc(data: dict) -> str:
gauge = RainGaugeNc(nc_raw, nc, data)
ind = gauge.get_valid_time_indices()
gauge.nc.createDimension("time", len(ind))
gauge.copy_data(
("time", "rain_rate", "r_accum_RT", "r_accum_NRT", "total_accum_NRT"), ind
)
gauge.copy_data(("time", "rain_rate", "total_accum_NRT"), ind)
gauge.mask_bad_data_values()
gauge.fix_variable_names()
gauge.convert_units()
for key in (
"rainfall_rate",
"rainfall_amount",
):
gauge.harmonize_standard_attributes(key)
gauge.harmonize_attribute("comment", ("rainfall_amount",))
gauge.fix_long_names()
gauge.fix_variable_attributes()
gauge.convert_rainfall_rate()
gauge.convert_rainfall_amount()
gauge.normalize_rainfall_amount()
uuid = gauge.add_uuid()
gauge.add_global_attributes("rain-gauge", instruments.PLUVIO2)
gauge.add_date()
gauge.convert_time()
gauge.add_geolocation()
gauge.add_history("rain-gauge")
gauge.normalize_rainfall_amount()
if "output_path" not in data:
shutil.copy(temp_file.name, data["full_path"])
return uuid
Expand All @@ -92,6 +84,11 @@ def mask_bad_data_values(self):
for _, variable in self.nc.variables.items():
variable[:] = ma.masked_invalid(variable[:])

def fix_variable_attributes(self):
for key in (RATE, AMOUNT):
for attr in ("long_name", "standard_name", "comment"):
self.harmonize_attribute(attr, (key,))

def copy_data(
self,
keys: tuple,
Expand All @@ -112,52 +109,63 @@ def _copy_variable(self, key: str, time_ind: list):
var_out = self.nc.createVariable(
key, dtype, "time", zlib=True, fill_value=fill_value
)
var_out.units = getattr(variable, "units", "1")
screened_data = self._screen_data(variable, time_ind)
var_out[:] = screened_data

def fix_variable_names(self):
keymap = {
"rain_rate": "rainfall_rate",
"int_m": "rainfall_rate",
"total_accum_NRT": "rainfall_amount",
"am_tot": "rainfall_amount",
"am_red": "r_accum_RT",
"rain_rate": RATE,
"int_h": RATE,
"int_m": RATE,
"total_accum_NRT": AMOUNT,
"am_tot": AMOUNT,
}
self.fix_name(keymap)

def fix_long_names(self):
keymap = {
"r_accum_RT": "Real time accumulated rainfall",
"r_accum_NRT": "Near real time accumulated rainfall",
}
self.fix_attribute(keymap, "long_name")

def convert_units(self):
mm_to_m = 0.001
mmh_to_ms = mm_to_m / 3600
self.nc.variables["rainfall_rate"].units = "m s-1"
self.nc.variables["rainfall_rate"][:] *= mmh_to_ms
for key in ("r_accum_RT", "r_accum_NRT", "rainfall_amount"):
if key not in self.nc.variables:
continue
self.nc.variables[key].units = "m"
self.nc.variables[key][:] *= mm_to_m
def convert_rainfall_rate(self):
data = self.nc.variables[RATE][:]
units = self.nc.variables[RATE].units.lower()
match units:
case "mm/s" | "mm s-1" | "mm / s":
pass
case "mm/h" | "mm/hour" | "mm h-1" | "mm / h" | "mm / hour":
data *= 1 / 3600 / 1000
logging.info(f"Converted {RATE} from {units} to m s-1.")
case "mm/min" | "mm min-1" | "mm / min":
data *= 1 / 60 / 1000
logging.info(f"Converted {RATE} from {units} to m s-1.")
case _:
raise ValueError(f"Unknown units: {units}")
self.nc.variables[RATE][:] = data
self.nc.variables[RATE].units = "m s-1"

def convert_rainfall_amount(self):
units = self.nc.variables[AMOUNT].units.lower()
match units:
case "m":
pass
case "mm":
self.nc.variables[AMOUNT][:] /= 1000
case _:
raise ValueError(f"Unknown units: {units}")
self.nc.variables[AMOUNT].units = "m"

def fix_jumps_in_pt(self):
"""Fixes suspicious jumps from a valid value to single 0-value and back in Thies PT data."""
data = self.nc.variables["rainfall_amount"][:]
data = self.nc.variables[AMOUNT][:]
for i in range(1, len(data) - 1):
if data[i] == 0 and data[i - 1] > 0 and data[i + 1] > 0:
data[i] = data[i + 1]
self.nc.variables["rainfall_amount"][:] = data
self.nc.variables[AMOUNT][:] = data

def normalize_rainfall_amount(self) -> None:
"""Copied from Cloudnetpy."""
data = self.nc.variables["rainfall_amount"][:]
data = self.nc.variables[AMOUNT][:]
offset = 0
for i in range(1, len(data)):
if data[i] + offset < data[i - 1]:
offset += data[i - 1]
data[i] += offset
data -= data[0]
self.nc.variables["rainfall_amount"][:] = data
self.nc.variables[AMOUNT][:] = data

0 comments on commit 0c32897

Please sign in to comment.