diff --git a/docs/api.rst b/docs/api.rst index 5e67ad26..1d85ab5c 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -154,7 +154,7 @@ SFINCS workflows workflows.merge_dataarrays workflows.burn_river_rect workflows.snap_discharge - workflows.river_boundary_points + workflows.river_source_points workflows.river_centerline_from_hydrography workflows.landuse workflows.cn_to_s diff --git a/hydromt_sfincs/sfincs.py b/hydromt_sfincs/sfincs.py index 4d4e8d84..f71d5bfd 100644 --- a/hydromt_sfincs/sfincs.py +++ b/hydromt_sfincs/sfincs.py @@ -762,6 +762,8 @@ def setup_river_inflow( first_index: int = 1, keep_rivers_geom: bool = False, reverse_river_geom: bool = False, + src_type: str = "inflow", + mask_upstream_cells: bool = False, ): """Setup discharge (src) points where a river enters the model domain. @@ -808,13 +810,21 @@ def setup_river_inflow( reverse_river_geom: bool, optional If True, assume that segments in 'rivers' are drawn from downstream to upstream. Only used if 'rivers' is not None, By default False + src_type: {'inflow', 'headwater'}, optional + Source type, by default 'inflow' + If 'inflow', return points where the river flows into the model domain. + If 'headwater', return all headwater (including inflow) points within the model domain. + mask_upstream_cells: bool, optional + If True, mask upstream cells of the river source points, by default False. + Note that this requires hydrography data and is only used if `src_type='headwater'`. See Also -------- setup_discharge_forcing setup_discharge_forcing_from_grid """ - da_flwdir, da_uparea, gdf_riv = None, None, None + # get hydrography data + da_uparea = None if hydrography is not None: ds = self.data_catalog.get_rasterdataset( hydrography, @@ -822,9 +832,14 @@ def setup_river_inflow( variables=["uparea", "flwdir"], buffer=5, ) - da_flwdir = ds["flwdir"] - da_uparea = ds["uparea"] - elif ( + da_uparea = ds["uparea"] # reused in river_source_points + elif mask_upstream_cells: + raise ValueError( + "Masking of upstream cells requires hydrography data to be provided." + ) + + # get river centerlines + if ( isinstance(rivers, str) and rivers == "rivers_outflow" and rivers in self.geoms @@ -835,26 +850,54 @@ def setup_river_inflow( gdf_riv = self.data_catalog.get_geodataframe( rivers, geom=self.region ).to_crs(self.crs) - else: + elif hydrography is not None: + gdf_riv = workflows.river_centerline_from_hydrography( + da_flwdir = ds["flwdir"], + da_uparea = da_uparea, + river_upa=river_upa, + river_len=river_len, + gdf_mask=self.region + ) + elif hydrography is None: raise ValueError("Either hydrography or rivers must be provided.") - gdf_src, gdf_riv = workflows.river_boundary_points( - region=self.region, - res=self.reggrid.dx, + # estimate buffer based on model resolution + buffer = self.reggrid.dx + if self.crs.is_geographic: + buffer = buffer * 111111.0 + + # get river inflow / headwater source points + gdf_src = workflows.river_source_points( gdf_riv=gdf_riv, - da_flwdir=da_flwdir, - da_uparea=da_uparea, - river_len=river_len, + gdf_mask=self.region, + src_type=src_type, + buffer=buffer, river_upa=river_upa, - inflow=True, + river_len=river_len, + da_uparea=da_uparea, reverse_river_geom=reverse_river_geom, logger=self.logger, ) - n = len(gdf_src.index) - self.logger.info(f"Found {n} river inflow points.") - if n == 0: + if gdf_src.empty: return + # mask upstream cells + # FIXME basin mask should be based on shifted outlet cells + if mask_upstream_cells and src_type == "headwater": + gdf_bas, _ = workflows.basin_mask( + da_flwdir=ds["flwdir"], + gdf_outlet=gdf_src, + ) + self.setup_mask_active( + exclude_mask=gdf_bas, + reset_mask=False, + ) + # make sure that the river source points are not masked ! + self.setup_mask_active( + include_mask=gdf_src, + reset_mask=False, + ) + # set forcing src pnts gdf_src.index = gdf_src.index + first_index self.set_forcing_1d(gdf_locs=gdf_src.copy(), name="dis", merge=merge) @@ -943,7 +986,8 @@ def setup_river_outflow( -------- setup_mask_bounds """ - da_flwdir, da_uparea, gdf_riv = None, None, None + # get hydrography data + da_uparea = None if hydrography is not None: ds = self.data_catalog.get_rasterdataset( hydrography, @@ -951,9 +995,10 @@ def setup_river_outflow( variables=["uparea", "flwdir"], buffer=5, ) - da_flwdir = ds["flwdir"] - da_uparea = ds["uparea"] - elif ( + da_uparea = ds["uparea"] # reused in river_source_points + + # get river centerlines + if ( isinstance(rivers, str) and rivers == "rivers_inflow" and rivers in self.geoms @@ -964,22 +1009,36 @@ def setup_river_outflow( gdf_riv = self.data_catalog.get_geodataframe( rivers, geom=self.region ).to_crs(self.crs) + elif hydrography is not None: + gdf_riv = workflows.river_centerline_from_hydrography( + da_flwdir = ds["flwdir"], + da_uparea = da_uparea, + river_upa=river_upa, + river_len=river_len, + gdf_mask=self.region + ) else: raise ValueError("Either hydrography or rivers must be provided.") - # TODO reproject region and gdf_riv to utm zone if model crs is geographic - gdf_out, gdf_riv = workflows.river_boundary_points( - region=self.region, - res=self.reggrid.dx, + # estimate buffer based on model resolution + buffer = self.reggrid.dx + if self.crs.is_geographic: + buffer = buffer * 111111.0 + + # get river inflow / headwater source points + gdf_out = workflows.river_source_points( gdf_riv=gdf_riv, - da_flwdir=da_flwdir, - da_uparea=da_uparea, - river_len=river_len, + gdf_mask=self.region, + src_type="outflow", + buffer=buffer, river_upa=river_upa, - inflow=False, + river_len=river_len, + da_uparea=da_uparea, reverse_river_geom=reverse_river_geom, logger=self.logger, ) + if gdf_out.empty: + return if len(gdf_out) > 0: if "rivwth" in gdf_out.columns: diff --git a/hydromt_sfincs/workflows/flwdir.py b/hydromt_sfincs/workflows/flwdir.py index 0756d8f2..3b3a7b61 100644 --- a/hydromt_sfincs/workflows/flwdir.py +++ b/hydromt_sfincs/workflows/flwdir.py @@ -11,8 +11,9 @@ logger = logging.getLogger(__name__) __all__ = [ - "river_boundary_points", + "river_source_points", "river_centerline_from_hydrography", + "basin_mask", ] @@ -21,7 +22,7 @@ def river_centerline_from_hydrography( da_uparea: xr.DataArray, river_upa: float = 10, river_len: float = 1e3, - mask: gpd.GeoDataFrame = None, + gdf_mask: gpd.GeoDataFrame = None, ) -> gpd.GeoDataFrame: """Returns the centerline of rivers based on a flow direction raster data (`da_flwdir`). @@ -37,7 +38,7 @@ def river_centerline_from_hydrography( river_len: float, optional Mimimum river length [m] within the model domain to define river cells, by default 1000 m. - mask: geopandas.GeoDataFrame, optional + gdf_mask: geopandas.GeoDataFrame, optional Polygon to clip river center lines before calculating the river length, by default None. @@ -47,108 +48,183 @@ def river_centerline_from_hydrography( River line vector data. """ # get river network from hydrography based on upstream area mask - flwdir = hydromt.flw.flwdir_from_da(da_flwdir, mask=da_uparea >= river_upa) - gdf_riv = gpd.GeoDataFrame.from_features(flwdir.streams(), crs=da_flwdir.raster.crs) + riv_mask=da_uparea >= river_upa + if not riv_mask.any(): + return gpd.GeoDataFrame() + flwdir = hydromt.flw.flwdir_from_da(da_flwdir, mask=riv_mask) + feats = flwdir.streams(uparea=da_uparea.values) + gdf_riv = gpd.GeoDataFrame.from_features(feats, crs=da_flwdir.raster.crs) # clip to mask and remove empty geometries - if mask is not None: - gdf_riv = gdf_riv.to_crs(mask.crs).clip(mask.unary_union) + if gdf_mask is not None and isinstance(gdf_mask, gpd.GeoDataFrame): + gdf_riv = gdf_riv.to_crs(gdf_mask.crs).clip(gdf_mask.unary_union) gdf_riv = gdf_riv[~gdf_riv.is_empty] # create river network from gdf to get distance from outlet 'rivlen' - gdf_riv["rivlen"] = gdf_riv.geometry.length + # length of river segments + if gdf_riv.crs.is_geographic: + gdf_riv["seglen"] = gdf_riv.to_crs("epsg:3857").geometry.length + else: + gdf_riv["seglen"] = gdf_riv.geometry.length + gdf_riv = gdf_riv[gdf_riv["seglen"] > 0] + if gdf_riv.empty or river_len == 0: + return gdf_riv + # accumulate to get river length from outlet flwdir = pyflwdir.from_dataframe(gdf_riv.set_index("idx"), ds_col="idx_ds") - gdf_riv["rivlen"] = flwdir.accuflux(gdf_riv["rivlen"].values, direction="down") + gdf_riv["rivdst"] = flwdir.accuflux(gdf_riv["seglen"].values, direction="down") + # get maximum river length from outlet (at headwater segments) for each river segment + gdf_riv["rivlen"] = flwdir.fillnodata(np.where(flwdir.n_upstream == 0, gdf_riv["rivdst"], 0), 0) + # filter river network based on total length gdf_riv = gdf_riv[gdf_riv["rivlen"] >= river_len] return gdf_riv -def river_boundary_points( - region: gpd.GeoDataFrame, - res: float, - gdf_riv: gpd.GeoDataFrame = None, - da_flwdir: xr.DataArray = None, - da_uparea: xr.DataArray = None, +def river_source_points( + gdf_riv: gpd.GeoDataFrame, + gdf_mask: gpd.GeoDataFrame, + src_type: str = 'inflow', + buffer: float = 100, river_upa: float = 10, river_len: float = 1e3, - inflow: bool = True, + da_uparea: xr.DataArray = None, reverse_river_geom: bool = False, logger: logging.Logger = logger, ) -> Tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]: """Returns the locations where a river flows in (`inflow=True`) - or out (`inflow=False`) of the model region. + or out (`inflow=False`) of the model gdf_mask. Rivers are based on either a river network vector data (`gdf_riv`) or a flow direction raster data (`da_flwdir`). Parameters ---------- - region: geopandas.GeoDataFrame - Polygon of model region of interest. - res: float - Model resolution [m]. - gdf_riv: geopandas.GeoDataFrame, optional + gdf_riv: geopandas.GeoDataFrame River network vector data, by default None. - da_flwdir: xarray.DataArray, optional - D8 flow direction raster data, by default None. - da_uparea: xarray.DataArray, optional - River upstream area raster data, by default None. + Requires 'uparea' and 'rivlen' attributes to + check for river length and upstream area thresholds. + gdf_mask: geopandas.GeoDataFrame + Polygon of model gdf_mask of interest. + src_type: ['inflow', 'outflow', 'headwater'], optional + Type of river source points to return, by default 'inflow'. + If 'inflow', return points where the river flows into the model domain. + If 'outflow', return points where the river flows out of the model domain. + If 'headwater', return all headwater (including inflow) points within the model domain. + buffer: float, optional + Buffer around gdf_mask to select river source points, by default 100 m. river_upa : float, optional Minimum upstream area threshold for rivers [km2], by default 10.0 river_len: float, optional Mimimum river length [m] within the model domain to define river cells, by default 1000 m. - inflow: bool, optional - If True, return inflow otherwise outflow boundary points, by default True. + da_uparea: xarray.DataArray, optional + River upstream area raster data, by default None. reverse_river_geom: bool, optional If True, assume that segments in 'rivers' are drawn from downstream to upstream. Only used if 'rivers' is not None, By default False Returns ------- - gdf_src, gdf_riv: geopandas.GeoDataFrame - In-/outflow points and river line vector data. + gdf_pnt: geopandas.GeoDataFrame + Source points """ - if not isinstance(region, (gpd.GeoDataFrame, gpd.GeoSeries)) and np.all( - region.geometry.type == "Polygon" + # data checks + if not ( + isinstance(gdf_mask, (gpd.GeoDataFrame, gpd.GeoSeries)) + and np.all(np.isin(gdf_mask.geometry.type, ["Polygon", "MultiPolygon"])) ): - raise ValueError("Boundary must be a GeoDataFrame of LineStrings.") - if res > 0.01 and region.crs.is_geographic: - # provide warning - logger.warning( - "The region crs is geographic, while the resolution seems to be in meters." - ) - - if gdf_riv is None and (da_flwdir is None or da_uparea is None): - raise ValueError("Either gdf_riv or da_flwdir and da_uparea must be provided.") - elif gdf_riv is None: # get river network from hydrography - gdf_riv = river_centerline_from_hydrography( - da_flwdir, da_uparea, river_upa, river_len, mask=region - ) - else: - # clip river to model region - gdf_riv = gdf_riv.to_crs(region.crs).clip(region.unary_union) - # filter river network based on uparea and length - if "uparea" in gdf_riv.columns: - gdf_riv = gdf_riv[gdf_riv["uparea"] >= river_upa] - if "rivlen" in gdf_riv.columns: - gdf_riv = gdf_riv[gdf_riv["rivlen"] > river_len] + raise TypeError("gdf_mask must be a GeoDataFrame of Polygons.") + if not ( + isinstance(gdf_riv, (gpd.GeoDataFrame, gpd.GeoSeries)) + and np.all(np.isin(gdf_riv.geometry.type, ["LineString", "MultiLineString"])) + ): + raise TypeError("gdf_riv must be a GeoDataFrame of LineStrings.") + if src_type not in ['inflow', 'outflow', 'headwater']: + raise ValueError("src_type must be either 'inflow', 'outflow', or 'headwater'.") + if gdf_mask.crs.is_geographic: # to pseudo mercator + gdf_mask = gdf_mask.to_crs("epsg:3857") + + # clip river to model gdf_mask + gdf_riv = gdf_riv.to_crs(gdf_mask.crs).clip(gdf_mask.unary_union) + # filter river network based on uparea and length + if "uparea" in gdf_riv.columns: + gdf_riv = gdf_riv[gdf_riv["uparea"] >= river_upa] + if "rivlen" in gdf_riv.columns: + gdf_riv = gdf_riv[gdf_riv["rivlen"] > river_len] + if gdf_riv.empty: + logger.warning("No rivers matching the uparea and rivlen thresholds found in gdf_riv.") + return gpd.GeoDataFrame() + + # get source points 1m before the start/end of the river # a positive dx results in a point near the start of the line (inflow) # a negative dx results in a point near the end of the line (outflow) - dx = res / 5 if inflow else -res / 5 - if reverse_river_geom: - dx = -dx - - # move point a bit into the model domain - gdf_pnt = gdf_riv.interpolate(dx).to_frame("geometry") - # keep points on boundary cells - bnd = region.boundary.buffer(res).unary_union # NOTE should be single geom - gdf_pnt = gdf_pnt[gdf_pnt.within(bnd)].reset_index(drop=True) + dx = -1 if reverse_river_geom else 1 + gdf_up = gdf_riv.interpolate(dx).to_frame("geometry") + gdf_up['riv_idx'] = gdf_riv.index + gdf_ds = gdf_riv.interpolate(-dx).to_frame("geometry") + gdf_ds['riv_idx'] = gdf_riv.index + + # get points that do not intersect with up/downstream end of other river segments + # use a small buffer of 5m around these points to account for dx and avoid issues with inprecise river geometries + if src_type in ['inflow', 'headwater']: + pnts_ds = gdf_ds.buffer(5).unary_union + gdf_pnt = gdf_up[~gdf_up.intersects(pnts_ds)].reset_index(drop=True) + elif src_type == 'outflow': + pnts_up = gdf_up.buffer(5).unary_union + gdf_pnt = gdf_ds[~gdf_ds.intersects(pnts_up)].reset_index(drop=True) + + # get buffer around gdf_mask, in- and outflow points should be within this buffer + if src_type in ['inflow', 'outflow']: + bnd = gdf_mask.boundary.buffer(buffer).unary_union + gdf_pnt = gdf_pnt[gdf_pnt.intersects(bnd)].reset_index(drop=True) + + # log numer of source points + logger.info(f"Found {gdf_pnt.index.size} {src_type} points.") # add uparea attribute if da_uparea is provided if da_uparea is not None: gdf_pnt["uparea"] = da_uparea.raster.sample(gdf_pnt).values gdf_pnt = gdf_pnt.sort_values("uparea", ascending=False).reset_index(drop=True) - if "rivwth" in gdf_riv.columns: - gdf_pnt = hydromt.gis_utils.nearest_merge( - gdf_pnt, gdf_riv, columns=["rivwth"], max_dist=10 - ) - return gdf_pnt, gdf_riv + return gdf_pnt + +# NOTE: this function is should be available in hydromt +def basin_mask( + da_flwdir: xr.DataArray, + gdf_outlet: gpd.GeoDataFrame, + shift_nup: int = 0, +) -> tuple[gpd.GeoDataFrame, xr.DataArray]: + """Returns a basin mask of all cells upstream from the outlets. + + Parameters + ---------- + da_flwdir: xarray.DataArray + D8 flow direction raster data. + gdf_outlet: geopandas.GeoDataFrame + GeoDataFrame of outlet points. + + Returns + ------- + gdf_bas: geopandas.GeoDataFrame + GeoDataFrame of basin polygons. + da_bas: xarray.DataArray + Raster of basin polygons. + """ + if not ( + isinstance(gdf_outlet, (gpd.GeoDataFrame, gpd.GeoSeries)) + and np.all(gdf_outlet.geometry.type == "Point") + ): + raise TypeError("gdf_mask must be a GeoDataFrame of Points.") + flwdir = hydromt.flw.flwdir_from_da(da_flwdir) + if da_flwdir.raster.crs != gdf_outlet.crs: + gdf_outlet = gdf_outlet.to_crs(da_flwdir.raster.crs) + xy= gdf_outlet.geometry.x.values, gdf_outlet.geometry.y.values + # FIXME + # if shift_nup > 0: + # xy = flwdir.main_upstream(xy, n=shift_nup) + da_bas = xr.DataArray( + data=flwdir.basins(xy=xy).astype(np.int32), + dims=da_flwdir.raster.dims, + coords=da_flwdir.raster.coords, + ) + da_bas.raster.set_nodata(0) + da_bas.raster.set_crs(da_flwdir.raster.crs) + gdf_bas = da_bas.raster.vectorize() + gdf_bas['idx'] = gdf_bas.index + return gdf_bas, da_bas \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 247d693b..13d91a01 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ import pytest from hydromt_sfincs.sfincs import SfincsModel +from hydromt import DataCatalog TESTDATADIR = join(dirname(abspath(__file__)), "data") TESTMODELDIR = join(TESTDATADIR, "sfincs_test") @@ -37,3 +38,17 @@ def mod(tmpdir): mod.read() mod.set_root(str(tmpdir), mode="r+") return mod + +@pytest.fixture +def data_catalog(): + return DataCatalog('artifact_data') + +@pytest.fixture +def hydrography(data_catalog): + bbox = [12.64, 45.48, 12.82, 45.59] + ds_hydro = data_catalog.get_rasterdataset("merit_hydro", variables=['flwdir', 'uparea', 'basins'], bbox=bbox).load() + da_mask = (ds_hydro['basins'] == 210000039).astype(int) + da_mask.raster.set_crs(ds_hydro.raster.crs) + da_mask.raster.set_nodata(0) + gdf_mask = da_mask.raster.vectorize() + return ds_hydro['flwdir'], ds_hydro['uparea'], gdf_mask diff --git a/tests/test_flwdir.py b/tests/test_flwdir.py new file mode 100644 index 00000000..77255547 --- /dev/null +++ b/tests/test_flwdir.py @@ -0,0 +1,78 @@ +import pytest +from hydromt_sfincs.workflows.flwdir import basin_mask, river_centerline_from_hydrography, river_source_points +import geopandas as gpd +import numpy as np + +def test_river_centerline_from_hydrography(hydrography): + # get data + da_flwdir, da_uparea, gdf_mask = hydrography + # get river centerlines + gdf_riv = river_centerline_from_hydrography(da_flwdir, da_uparea, gdf_mask=gdf_mask) + # check + assert isinstance(gdf_riv, gpd.GeoDataFrame) + assert np.isin(['geometry', 'rivlen', 'uparea'], gdf_riv.columns).all() + assert np.isclose(gdf_riv['rivlen'].max(), 19665.50) + assert gdf_riv.index.size == 11 + # no rivers (uparea threshold too high) + gdf_riv = river_centerline_from_hydrography(da_flwdir, da_uparea, river_upa=1e6) + assert gdf_riv.empty + # no rivers (river length threshold too high) + gdf_riv = river_centerline_from_hydrography(da_flwdir, da_uparea, river_len=1e6) + assert gdf_riv.empty + +def test_river_source_points(hydrography, data_catalog): + # get data + da_flwdir, da_uparea, gdf_mask = hydrography + gdf_mask=gdf_mask.to_crs('EPSG:32633') + + # test with derived centerline + gdf_riv = river_centerline_from_hydrography(da_flwdir, da_uparea, gdf_mask=gdf_mask) + kwargs = dict(gdf_riv=gdf_riv, gdf_mask=gdf_mask) + gdf_src = river_source_points(src_type='inflow', **kwargs) + assert gdf_src.index.size == 3 + gdf_src = river_source_points(src_type='headwater', **kwargs) + assert gdf_src.index.size == 6 + gdf_src = river_source_points(src_type='outflow', da_uparea=da_uparea, **kwargs) + assert gdf_src.index.size == 1 + assert np.isin(['geometry', 'riv_idx', 'uparea'], gdf_src.columns).all() + + # test reverse oriented line + gdf_riv = data_catalog.get_geodataframe('rivers_lin2019_v1') + kwargs = dict(gdf_riv=gdf_riv, gdf_mask=gdf_mask, reverse_river_geom=True) + gdf_src = river_source_points(src_type='inflow', **kwargs) + assert gdf_src.index.size == 1 # this data only one river + assert gdf_src['riv_idx'].values[0] == 38 + gdf_src = river_source_points(src_type='outflow', **kwargs) + assert gdf_src.index.size == 1 # this data only one river + assert gdf_src['riv_idx'].values[0] == 34 + gdf_src = river_source_points(src_type='headwater', **kwargs) + assert gdf_src.index.size == 2 + assert np.isin(38, gdf_src['riv_idx'].values) + + # test errors + with pytest.raises(ValueError, match="src_type must be either"): + gdf_src = river_source_points(gdf_riv=gdf_riv, gdf_mask=gdf_mask, src_type='wrong') + with pytest.raises(TypeError, match="gdf_mask must be"): + gdf_src = river_source_points(gdf_riv=gdf_riv, gdf_mask=gdf_riv) + with pytest.raises(TypeError, match="gdf_riv must be"): + gdf_src = river_source_points(gdf_riv=gdf_mask, gdf_mask=gdf_mask) + + +def test_basin_mask(hydrography): + # get data and create outlet point + da_flwdir = hydrography[0] + points = gpd.points_from_xy([12.72375], [45.53625]) + gdf_outlet = gpd.GeoDataFrame(geometry=points, crs='EPSG:4326') + gdf_outlet = gdf_outlet.to_crs('EPSG:32633') + # test basin mask + gdf_bas, da_bas = basin_mask(da_flwdir, gdf_outlet) + assert gdf_bas.index.size == 1 + assert (gdf_bas['idx'].values == gdf_outlet.index.values).all() + assert gdf_bas.crs == da_flwdir.raster.crs + assert np.isin(np.unique(da_bas.values), [0, 1]).all() + # + gdf_outlet.to_file('outlet.geojson', driver='GeoJSON') + gdf_bas.to_file('basin.geojson', driver='GeoJSON') + # test errors + with pytest.raises(TypeError): + basin_mask(da_flwdir, gdf_outlet.buffer(1).to_frame("geometry")) \ No newline at end of file