Skip to content

Commit

Permalink
Add pluvio processing
Browse files Browse the repository at this point in the history
  • Loading branch information
tukiains committed Jan 14, 2025
1 parent 5d1f84e commit 1543a12
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/processing/harmonizer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
from .halo_calibrated import harmonize_halo_calibrated_file
from .hatpro import harmonize_hatpro_file
from .parsivel import harmonize_parsivel_file
from .rain_gauge import harmonize_rain_gauge_file
from .rain_gauge import harmonize_pluvio_nc
from .ws import harmonize_ws_file
9 changes: 1 addition & 8 deletions src/processing/harmonizer/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import cloudnetpy.utils
import netCDF4
import numpy as np
import numpy.ma as ma

import processing.utils

Expand All @@ -18,11 +17,6 @@ def __init__(self, nc_raw: netCDF4.Dataset, nc: netCDF4.Dataset, data: dict):
self.nc = nc
self.data = data

def mask_bad_data_values(self):
# mask nan values etc
for _, variable in self.nc.variables.items():
variable[:] = ma.masked_invalid(variable[:])

def convert_time(self):
"""Converts time to decimal hour."""
time = self.nc.variables["time"]
Expand All @@ -48,7 +42,6 @@ def copy_file_contents(
Optionally copies only certain keys and / or uses certain time indices only.
"""
for key, dimension in self.nc_raw.dimensions.items():
print(key, dimension)
if key == "time" and time_ind is not None:
self.nc.createDimension(key, len(time_ind))
else:
Expand Down Expand Up @@ -207,7 +200,7 @@ def _screen_data(
if (
variable.ndim > 0
and time_ind is not None
and variable.dimensions[0] == "time"
and variable.dimensions[0] in ("time", "dim")
):
if variable.ndim == 1:
return variable[time_ind]
Expand Down
90 changes: 82 additions & 8 deletions src/processing/harmonizer/rain_gauge.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import logging
import shutil
from tempfile import NamedTemporaryFile

import netCDF4
from cloudnetpy.instruments import instruments
from numpy import ma

from processing.harmonizer import core


def harmonize_rain_gauge_file(data: dict) -> str:
def harmonize_pluvio_nc(data: dict) -> str:
if "output_path" not in data:
temp_file = NamedTemporaryFile()
with (
Expand All @@ -18,21 +20,93 @@ def harmonize_rain_gauge_file(data: dict) -> str:
format="NETCDF4_CLASSIC",
) as nc,
):
gauge = core.Level1Nc(nc_raw, nc, data)
gauge = RainGaugeNc(nc_raw, nc, data)
ind = gauge.get_valid_time_indices()
gauge.copy_file_contents(
keys=("time", "rain_rate", "r_accum_RT", "r_accum_NRT", "total_accum_NRT"),
time_ind=ind,
gauge.nc.createDimension("time", len(ind))
gauge.copy_data(
("time", "rain_rate", "r_accum_RT", "r_accum_NRT", "total_accum_NRT"), ind
)
gauge.mask_bad_data_values()
gauge.harmonize_attribute("units", ("latitude", "longitude", "altitude"))
gauge.fix_name({"rain_rate": "rainfall_rate"})
gauge.fix_variable_names()
gauge.convert_units()
for key in (
"latitude",
"longitude",
"altitude",
"rainfall_rate",
"rainfall_amount",
):
gauge.harmonize_standard_attributes(key)
gauge.fix_long_names()
uuid = gauge.add_uuid()
gauge.add_global_attributes("rain-gauge", instruments.PLUVIO)
gauge.add_global_attributes("rain-gauge", instruments.PLUVIO2)
gauge.add_date()
gauge.convert_time()
gauge.add_geolocation()
gauge.add_history("rain-gauge")
if "output_path" not in data:
shutil.copy(temp_file.name, data["full_path"])
return uuid


class RainGaugeNc(core.Level1Nc):
def mask_bad_data_values(self):
for _, variable in self.nc.variables.items():
variable[:] = ma.masked_invalid(variable[:])

def copy_data(
self,
keys: tuple,
time_ind: list,
):
for key in keys:
self._copy_variable(key, time_ind)
self._copy_global_attributes()

def _copy_variable(self, key: str, time_ind: list):
if key not in self.nc_raw.variables.keys():
logging.warning(f"Key {key} not found from the source file.")
return

variable = self.nc_raw.variables[key]
dtype = "f8" if key == "time" and "int64" in str(variable.dtype) else "f4"
fill_value = netCDF4.default_fillvals[dtype] if key != "time" else None
var_out = self.nc.createVariable(
key, dtype, "time", zlib=True, fill_value=fill_value
)
self._copy_variable_attributes(variable, var_out)
screened_data = self._screen_data(variable, time_ind)
var_out[:] = screened_data

@staticmethod
def _copy_variable_attributes(var_in: netCDF4.Variable, var_out: netCDF4.Variable):
skip = ("_FillValue", "_Fill_Value", "description")
for attr in var_in.ncattrs():
if attr not in skip:
setattr(var_out, attr, getattr(var_in, attr))

def fix_variable_names(self):
keymap = {
"rain_rate": "rainfall_rate",
"total_accum_NRT": "rainfall_amount",
}
self.fix_name(keymap)

def fix_long_names(self):
keymap = {
"r_accum_RT": "Real time accumulated rainfall",
"r_accum_NRT": "Near real time accumulated rainfall",
}
self.fix_attribute(keymap, "long_name")

def convert_units(self):
mm_to_m = 0.001
mmh_to_ms = mm_to_m / 3600
self.nc.variables["rainfall_rate"].units = "m s-1"
self.nc.variables["rainfall_rate"][:] *= mmh_to_ms
self.nc.variables["rainfall_amount"][:] -= self.nc.variables["rainfall_amount"][
0
]
for key in ("r_accum_RT", "r_accum_NRT", "rainfall_amount"):
self.nc.variables[key].units = "m"
self.nc.variables[key][:] *= mm_to_m
2 changes: 1 addition & 1 deletion src/processing/instrument_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ class ProcessRainGauge(ProcessInstrument):
def process_pluvio(self):
full_path, self.uuid.raw = self.download_instrument(largest_only=True)
data = self._get_payload_for_nc_file_augmenter(full_path)
self.uuid.product = harmonizer.harmonize_rain_gauge_file(data)
self.uuid.product = harmonizer.harmonize_pluvio_nc(data)


class ProcessWeatherStation(ProcessInstrument):
Expand Down
2 changes: 1 addition & 1 deletion src/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def _get_fields_for_plot(cloudnet_file_type: str) -> tuple[list, int]:
max_alt = 12
match cloudnet_file_type:
case "rain-gauge":
fields = ["rainfall_rate", ""]
fields = ["rainfall_rate", "r_accum_RT", "r_accum_NRT", "rainfall_amount"]
case "categorize-voodoo":
fields = ["v", "liquid_prob"]
case "categorize":
Expand Down

0 comments on commit 1543a12

Please sign in to comment.