From d2efc5d5f9a5eeb672d2f011c3236a273fffd9f1 Mon Sep 17 00:00:00 2001
From: jhalbauer
Date: Thu, 13 Jun 2024 11:59:57 +0200
Subject: [PATCH] r.dop.import: added SN worker

---
 r.dop.import.sn/r.dop.import.sn.html            |  10 +
 r.dop.import.sn/r.dop.import.sn.py              | 275 ++++++++------
 r.dop.import.worker.sn/Makefile                  |   8 +
 .../r.dop.import.worker.sn.html                  |  18 +
 .../r.dop.import.worker.sn.py                    | 347 ++++++++++++++++++
 testsuite/test_r_dop_import_SN.py                |   2 +-
 6 files changed, 548 insertions(+), 112 deletions(-)
 create mode 100644 r.dop.import.worker.sn/Makefile
 create mode 100644 r.dop.import.worker.sn/r.dop.import.worker.sn.html
 create mode 100644 r.dop.import.worker.sn/r.dop.import.worker.sn.py

diff --git a/r.dop.import.sn/r.dop.import.sn.html b/r.dop.import.sn/r.dop.import.sn.html
index 3c90437..13258ae 100644
--- a/r.dop.import.sn/r.dop.import.sn.html
+++ b/r.dop.import.sn/r.dop.import.sn.html
@@ -3,6 +3,16 @@

DESCRIPTION

r.dop.import.sn downloads and imports digital orthophotos (DOP) for Sachsen (SN) and area of interest.
+The data "Digitale Orthophotos" is downloaded from
+Geoportal Sachsen
+and can be used under the terms of the license below, referencing the source:
+
+
source: GeoSN + +
license: Datenlizenz Deutschland Namensnennung 2.0, + +
DOP +

EXAMPLES

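For illustration only (this example is not part of the patch; the AOI map name, output name, and process count are placeholders), a call of the parallelized module could look like this:

g.region vector=my_aoi res=0.2 -a -p
r.dop.import.sn aoi=my_aoi output=dop_sn nprocs=4 memory=2000

This produces one VRT raster map per band: dop_sn_red, dop_sn_green, dop_sn_blue and dop_sn_nir.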
diff --git a/r.dop.import.sn/r.dop.import.sn.py b/r.dop.import.sn/r.dop.import.sn.py
index 1fbb9c1..92558de 100644
--- a/r.dop.import.sn/r.dop.import.sn.py
+++ b/r.dop.import.sn/r.dop.import.sn.py
@@ -5,7 +5,7 @@
 # MODULE:      r.dop.import.sn
 # AUTHOR(S):   Johannes Halbauer, Anika Weinmann
 #
-# PURPOSE:     Downloads DOPs for Sachsen and AOI
+# PURPOSE:     Downloads DOPs for Sachsen and aoi
 # COPYRIGHT:   (C) 2024 by mundialis GmbH & Co. KG and the GRASS
 #              Development Team
 #
@@ -22,7 +22,7 @@
 ############################################################################

 # %module
-# % description: Downloads DOPs for Sachsen and AOI.
+# % description: Downloads DOPs for Sachsen and aoi.
 # % keyword: raster
 # % keyword: import
 # % keyword: DOP
@@ -37,8 +37,8 @@
 # %option
 # % key: download_dir
-# % label: Path of output folder
-# % description: Path of download folder
+# % label: Path to output folder
+# % description: Path to download folder
 # % required: no
 # % multiple: no
 # %end
@@ -47,6 +47,16 @@
 # % description: Name for output raster map
 # %end

+# %option
+# % key: nprocs
+# % type: integer
+# % required: no
+# % multiple: no
+# % label: Number of parallel processes
+# % description: Number of cores for multiprocessing, -2 is the number of available cores - 1
+# % answer: -2
+# %end
+
 # %option G_OPT_MEMORYMB
 # %end
@@ -66,43 +76,49 @@
 import atexit
 import os
-from osgeo import gdal
-from time import sleep
+import sys
+
 import grass.script as grass
+from grass.pygrass.modules import Module, ParallelModuleQueue
+from grass.pygrass.utils import get_lib_path
+from osgeo import gdal

 from grass_gis_helpers.cleanup import general_cleanup
-from grass_gis_helpers.general import test_memory
-from grass_gis_helpers.open_geodata_germany.download_data import (
-    check_download_dir,
-    download_data_using_threadpool,
-    extract_compressed_files,
-)
-from grass_gis_helpers.raster import adjust_raster_resolution, create_vrt
 from grass_gis_helpers.data_import import (
     download_and_import_tindex,
     get_list_of_tindex_locations,
 )
-
-# set global varibales
+from grass_gis_helpers.general import test_memory
+from grass_gis_helpers.open_geodata_germany.download_data import (
+    check_download_dir,
+)
+from grass_gis_helpers.raster import create_vrt
+
+# import module library
+path = get_lib_path(modname="r.dop.import")
+if path is None:
+    grass.fatal("Unable to find the dop library directory.")
+sys.path.append(path)
+try:
+    from r_dop_import_lib import setup_parallel_processing
+except Exception as imp_err:
+    grass.fatal(f"r.dop.import library could not be imported: {imp_err}")
+
+# set global variables
 TINDEX = (
     "https://github.com/mundialis/tile-indices/raw/main/DOP/SN/"
     "DOP20_tileindex_SN.gpkg.gz"
 )
 ID = grass.tempname(12)
-RETRIES = 10
 orig_region = None
-keep_data = False
 rm_rasters = []
 rm_vectors = []
 download_dir = None
+rm_dirs = []


 def cleanup():
-    rm_dirs = []
-    if not keep_data:
-        if download_dir:
-            rm_dirs.append(download_dir)
     general_cleanup(
         orig_region=orig_region,
         rm_rasters=rm_rasters,
@@ -112,121 +128,158 @@
 def main():
-    global ID, orig_region, rm_rasters, rm_vectors, keep_data, download_dir
+    global ID, orig_region, rm_rasters, rm_vectors
+    global temp_loc_path, download_dir, new_mapset
     aoi = options["aoi"]
     download_dir = check_download_dir(options["download_dir"])
+    nprocs = int(options["nprocs"])
+    nprocs = setup_parallel_processing(nprocs)
     output = options["output"]
-    keep_data = flags["k"]
-    native_res = flags["r"]
+    fs = "SN"

     # set memory to input if possible
     options["memory"] = test_memory(options["memory"])

+    # create list for each raster band for building entire raster
+    all_raster = {
+        "red": [],
+        "green": [],
+        "blue": [],
+        "nir": [],
+    }
+
     # save original region
     orig_region = f"original_region_{ID}"
     grass.run_command("g.region", save=orig_region, quiet=True)
-    ns_res = grass.region()["nsres"]
+
+    # get region resolution and check if the resolution is consistent
+    reg = grass.region()
+    if reg["nsres"] == reg["ewres"]:
+        ns_res = reg["nsres"]
+    else:
+        grass.fatal("N/S resolution is not the same as E/W resolution!")

     # set region if aoi is given
     if aoi:
         grass.run_command("g.region", vector=aoi, flags="a")
+    # if no aoi save region as aoi
+    else:
+        aoi = f"region_aoi_{ID}"
+        grass.run_command(
+            "v.in.region",
+            output=aoi,
+            quiet=True,
+        )

     # get tile index
     tindex_vect = f"dop_tindex_{ID}"
     rm_vectors.append(tindex_vect)
     download_and_import_tindex(TINDEX, tindex_vect, download_dir)

-    # get download urls which overlap with AOI
-    # or current region if no AOI is given
+    # get download urls which overlap with aoi
+    # or current region if no aoi is given
     url_tiles = get_list_of_tindex_locations(tindex_vect, aoi)

-    # if k flag is set download DOPs to download_dir
-    if keep_data:
-        download_list = [
-            os.path.dirname(url.replace("/vsizip/vsicurl/", ""))
-            for url in url_tiles
-        ]
-
-        # download with using nprocs=3
-        download_data_using_threadpool(download_list, download_dir, 3)
-        # extract downloaded files
-        file_names = [os.path.basename(url) for url in download_list]
-        extract_compressed_files(file_names, download_dir)
-
-    # import DOPs directly
-    grass.message(_("Importing DOPs..."))
-
-    # create list of lists for all DOPs caused by GRASS import structure
-    # (one raster map per band)
-    all_dops = [[], [], [], []]
-    res = None
-    for url in url_tiles:
-        # define name for imported DOP
-        dop_name = os.path.splitext(os.path.basename(url))[0].replace("-", "")
-
-        # define input parameter for import
-        if keep_data:
-            input = os.path.join(download_dir, os.path.basename(url))
-        else:
-            if "/vsicurl/" not in url:
-                input = f"/vsicurl/{url}"
+    url_tiles_count = 0
+    for i in range(len(url_tiles)):
+        url_tiles_count += 1
+        url_tiles[i] = (url_tiles_count, [url_tiles[i]])
+    number_tiles = len(url_tiles)
+
+    # set number of parallel processes to number of tiles
+    if number_tiles < nprocs:
+        nprocs = number_tiles
+    queue = ParallelModuleQueue(nprocs=nprocs)
+
+    # get GISDBASE and Location
+    gisenv = grass.gisenv()
+    gisdbase = gisenv["GISDBASE"]
+    location = gisenv["LOCATION_NAME"]
+
+    # set queue and variables for worker addon
+    try:
+        grass.message(
+            _(f"Importing {number_tiles} DOPs for SN in parallel...")
+        )
+        for tile in url_tiles:
+            key = tile[0]
+            new_mapset = f"tmp_mapset_rdop_import_tile_{key}_{os.getpid()}"
+            rm_dirs.append(os.path.join(gisdbase, location, new_mapset))
+            b_name = os.path.basename(tile[1][0])
+            raster_name = (
+                f"{b_name.split('.')[0].replace('-', '_')}" f"_{os.getpid()}"
+            )
+            for key_rast in all_raster:
+                all_raster[key_rast].append(
+                    f"{fs}_{raster_name}_{key_rast}@{new_mapset}"
+                )
+            param = {
+                "tile_key": key,
+                "tile_url": tile[1][0],
+                "raster_name": raster_name,
+                "orig_region": orig_region,
+                "memory": 1000,
+                "new_mapset": new_mapset,
+                "flags": "",
+            }
+            grass.message(_(f"raster name: {raster_name}"))
+
+            # modify params
+            if aoi:
+                param["aoi"] = aoi
+            if options["download_dir"]:
+                param["download_dir"] = download_dir
+            if flags["k"]:
+                param["flags"] += "k"
+            if flags["r"]:
+                dop_src = gdal.Open(param["tile_url"])
+                param["resolution_to_import"] = abs(
+                    dop_src.GetGeoTransform()[1]
+                )
             else:
-                input = url
-
-        if res is None:
-            ds = gdal.Open(url)
-            count = 0
-            while ds is None and count <= RETRIES:
-                count += 1
-                sleep(10)
-                ds = gdal.Open(url)
-            res = ds.GetGeoTransform()[1]
-
-        # import DOPs
-        count = 0
-        imported = False
-        while not imported and count <= RETRIES:
-            count += 1
-            try:
-                grass.run_command(
-                    "r.import",
-                    input=input,
-                    output=dop_name,
-                    extent="region",
-                    resolution="value",
-                    resolution_value=res,
-                    memory=1000,
-                    overwrite=True,
-                    quiet=True,
+                param["resolution_to_import"] = ns_res
+
+            # append raster bands to download to remove list
+            rm_red = f"{raster_name}_red"
+            rm_green = f"{raster_name}_green"
+            rm_blue = f"{raster_name}_blue"
+            rm_nir = f"{raster_name}_nir"
+            rm_rasters.append(rm_red)
+            rm_rasters.append(rm_green)
+            rm_rasters.append(rm_blue)
+            rm_rasters.append(rm_nir)
+
+            # run worker addon in parallel
+            r_dop_import_worker_SN = Module(
+                "r.dop.import.worker.sn",
+                **param,
+                run_=False,
+            )
+            # catch all GRASS output to stdout and stderr
+            r_dop_import_worker_SN.stdout = grass.PIPE
+            r_dop_import_worker_SN.stderr = grass.PIPE
+            queue.put(r_dop_import_worker_SN)
+        queue.wait()
+    except Exception:
+        for proc_num in range(queue.get_num_run_procs()):
+            proc = queue.get(proc_num)
+            if proc.returncode != 0:
+                # save all stderr to a variable and pass it to a GRASS
+                # exception
+                errmsg = proc.outputs["stderr"].value.strip()
+                grass.fatal(
+                    _(f"\nERROR while processing <{proc.get_bash()}>: {errmsg}")
                 )
-                imported = True
-            except Exception:
-                sleep(10)
-        # append DOP name with band suffix
-        all_dops[0].append(dop_name + ".1")
-        all_dops[1].append(dop_name + ".2")
-        all_dops[2].append(dop_name + ".3")
-        all_dops[3].append(dop_name + ".4")
-
-    # create VRT
-    vrt_outputs = []
-    for dops, band in zip(all_dops, ["red", "green", "blue", "nir"]):
-        vrt_output = f"{output}_{band}"
-        create_vrt(dops, vrt_output)
-        vrt_outputs.append(vrt_output)
-
-    # resample / interpolate whole VRT (because interpolating single files leads
-    # to empty rows and columns)
-    # check resolution and resample / interpolate data if needed
-    if not native_res:
-        grass.message(_("Resampling / interpolating data..."))
-        grass.run_command("g.region", raster=vrt_output, res=ns_res, flags="a")
-        for vrt_out in vrt_outputs:
-            adjust_raster_resolution(vrt_out, vrt_out, ns_res)
-
-    for result in vrt_outputs:
-        grass.message(_(f"DOP raster map <{result}> is created."))
+
+    # create one vrt per band of all imported DOPs
+    raster_out = []
+    for band, b_list in all_raster.items():
+        out = f"{output}_{band}"
+        create_vrt(b_list, out)
+        raster_out.append(out)
+
+    grass.message(_(f"Generated the following raster maps: {raster_out}"))


 if __name__ == "__main__":
diff --git a/r.dop.import.worker.sn/Makefile b/r.dop.import.worker.sn/Makefile
new file mode 100644
index 0000000..be9559b
--- /dev/null
+++ b/r.dop.import.worker.sn/Makefile
@@ -0,0 +1,8 @@
+MODULE_TOPDIR = ../..
+
+PGM = r.dop.import.worker.sn
+
+include $(MODULE_TOPDIR)/include/Make/Python.make
+include $(MODULE_TOPDIR)/include/Make/Script.make
+
+default: script
diff --git a/r.dop.import.worker.sn/r.dop.import.worker.sn.html b/r.dop.import.worker.sn/r.dop.import.worker.sn.html
new file mode 100644
index 0000000..0679a78
--- /dev/null
+++ b/r.dop.import.worker.sn/r.dop.import.worker.sn.html
@@ -0,0 +1,18 @@
+

DESCRIPTION

+
+r.dop.import.worker.sn is used within r.dop.import.sn to import the
+DOPs in parallel.
+
+
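The worker is normally launched by r.dop.import.sn through a ParallelModuleQueue, but for orientation a single call can be sketched as follows (all values are placeholders, not part of the patch; the URL is not a real tile):

r.dop.import.worker.sn tile_key=1 tile_url=/vsicurl/https://example.org/dop20_tile.tif raster_name=dop20_tile_1 orig_region=original_region_xyz new_mapset=tmp_mapset_rdop_import_tile_1 resolution_to_import=0.2 memory=1000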

SEE ALSO

+
+
+r.dop.import.bb.be,
+r.buildvrt,
+r.import,
+v.check.federal_state
+
+
+

AUTHORS

+
+Johannes Halbauer, mundialis GmbH & Co. KG
+Lina Krisztian, mundialis GmbH & Co. KG
\ No newline at end of file
diff --git a/r.dop.import.worker.sn/r.dop.import.worker.sn.py b/r.dop.import.worker.sn/r.dop.import.worker.sn.py
new file mode 100644
index 0000000..8e4d2b9
--- /dev/null
+++ b/r.dop.import.worker.sn/r.dop.import.worker.sn.py
@@ -0,0 +1,347 @@
+#!/usr/bin/env python3
+#
+############################################################################
+#
+# MODULE:      r.dop.import.worker.sn
+# AUTHOR(S):   Johannes Halbauer & Lina Krisztian
+#
+# PURPOSE:     Downloads Digital Orthophotos (DOPs) within a specified area in Sachsen
+# COPYRIGHT:   (C) 2024 by mundialis GmbH & Co. KG and the GRASS Development Team
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+#############################################################################
+
+# %Module
+# % description: Downloads and imports single Digital Orthophotos (DOPs) in Sachsen
+# % keyword: imagery
+# % keyword: download
+# % keyword: DOP
+# %end
+
+# %option G_OPT_V_INPUT
+# % key: aoi
+# % required: no
+# % description: Vector map to restrict DOP import to
+# %end
+
+# %option
+# % key: download_dir
+# % label: Path to output folder
+# % description: Path to download folder
+# % required: no
+# % multiple: no
+# %end
+
+# %option
+# % key: tile_key
+# % required: yes
+# % description: Key of tile-DOP to import
+# %end
+
+# %option
+# % key: tile_url
+# % required: yes
+# % description: URL of tile-DOP to import
+# %end
+
+# %option
+# % key: new_mapset
+# % type: string
+# % required: yes
+# % multiple: no
+# % key_desc: name
+# % description: Name for new mapset
+# %end
+
+# %option
+# % key: orig_region
+# % required: yes
+# % description: Original region
+# %end
+
+# %option
+# % key: resolution_to_import
+# % required: yes
+# % description: Resolution of the region for which the DOP will be imported
+# %end
+
+# %option G_OPT_R_OUTPUT
+# % key: raster_name
+# % description: Name of raster output
+# %end
+
+# %option G_OPT_MEMORYMB
+# % description: Memory which is used by all processes (it is divided by nprocs for each single parallel process)
+# %end
+
+# %flag
+# % key: r
+# % description: Use native DOP resolution
+# %end
+
+# %flag
+# % key: k
+# % label: Keep downloaded data in the download directory
+# %end
+
+# %rules
+# % requires_all: -k,download_dir
+# %end
+
+
+import atexit
+import os
+import sys
+from time import sleep
+
+import grass.script as grass
+from grass.pygrass.utils import get_lib_path
+
+from grass_gis_helpers.cleanup import general_cleanup, cleaning_tmp_location
+from grass_gis_helpers.general import test_memory
+from grass_gis_helpers.location import (
+    get_current_location,
+    create_tmp_location,
+    switch_back_original_location,
+)
+from grass_gis_helpers.mapset import switch_to_new_mapset
+from grass_gis_helpers.open_geodata_germany.download_data import (
+    download_data_using_threadpool,
+    extract_compressed_files,
+)
+from grass_gis_helpers.raster import adjust_raster_resolution
+
+# import module library
+path = get_lib_path(modname="r.dop.import")
+if path is None:
+    grass.fatal("Unable to find the dop library directory.")
+sys.path.append(path)
+try:
+    from r_dop_import_lib import rescale_to_1_256
+except Exception as imp_err:
+    grass.fatal(f"r.dop.import library could not be imported: {imp_err}")
+
+rm_rast = []
+rm_group = []
+
+gisdbase = None
+tmp_loc = None
+tmp_gisrc = None
+
+RETRIES = 30
+WAITING_TIME = 10
+
+
+def cleanup():
+    cleaning_tmp_location(
+        None, tmp_loc=tmp_loc, tmp_gisrc=tmp_gisrc, gisdbase=gisdbase
+    )
+    general_cleanup(
+        rm_rasters=rm_rast,
+        rm_groups=rm_group,
+    )
+
+
+def import_and_reproject(
+    url,
+    raster_name,
+    resolution_to_import,
+    aoi_map=None,
+    download_dir=None,
+    epsg=None,
+):
+    """Import DOPs and reproject them if needed.
+
+    Args:
+        url (str): The URL of the DOP to import
+        raster_name (str): The prefix name for the output rasters
+        aoi_map (str): Name of AOI vector map
+        download_dir (str): Path to local directory to download DOPs to
+        epsg (int): EPSG code which has to be set if the reprojection should be
+                    done manually and not by r.import
+    """
+    global gisdbase, tmp_loc, tmp_gisrc
+    aoi_map_to_set_region1 = aoi_map
+
+    # get actual location, mapset, ...
+    loc, mapset, gisdbase, gisrc = get_current_location()
+    if not aoi_map:
+        aoi_map = f"region_aoi_{grass.tempname(8)}"
+        aoi_map_to_set_region1 = aoi_map
+        grass.run_command("v.in.region", output=aoi_map, quiet=True)
+    else:
+        aoi_map_mapset = aoi_map.split("@")
+        aoi_map_to_set_region1 = aoi_map_mapset[0]
+        if len(aoi_map_mapset) == 2:
+            mapset = aoi_map_mapset[1]
+
+    # create temporary location with EPSG:25832
+    tmp_loc, tmp_gisrc = create_tmp_location(epsg)
+
+    # reproject aoi
+    if aoi_map:
+        grass.run_command(
+            "v.proj",
+            location=loc,
+            mapset=mapset,
+            input=aoi_map_to_set_region1,
+            output=aoi_map_to_set_region1,
+            quiet=True,
+        )
+        grass.run_command(
+            "g.region",
+            vector=aoi_map_to_set_region1,
+            res=resolution_to_import,
+            flags="a",
+        )
+
+    # import data
+    # set memory manually to 1000
+    # the process gets stuck when the memory value is too large (100000)
+    # GDAL_CACHEMAX is only interpreted as MB
+    kwargs = {
+        "input": url,
+        "output": raster_name,
+        "memory": 1000,
+        "quiet": True,
+        "extent": "region",
+    }
+    # TODO: move this function to the lib; however, the following
+    # if block differs e.g. from the one for BB/BE
+    #
+    # download and keep data in the download dir if the -k flag is set
+    # and change the input parameter in kwargs to the local path
+    if flags["k"]:
+        basename = os.path.basename(url)
+        print(basename)
+        url = os.path.dirname(url.replace("/vsizip/vsicurl/", ""))
+        download_data_using_threadpool([url], download_dir, None)
+        extract_compressed_files([os.path.basename(url)], download_dir)
+        kwargs["input"] = os.path.join(download_dir, basename)
+
+    import_success = False
+    tries = 0
+    while not import_success:
+        tries += 1
+        if tries > RETRIES:
+            grass.fatal(
+                _(
+                    f"Importing {kwargs['input']} failed after {RETRIES} "
+                    "retries."
+                )
+            )
+        try:
+            grass.run_command("r.import", **kwargs)
+            import_success = True
+        except Exception:
+            sleep(WAITING_TIME)
+    if not aoi_map:
+        grass.run_command("g.region", raster=f"{raster_name}.1")
+
+    # reproject data
+    res = float(
+        grass.parse_command("r.info", flags="g", map=f"{raster_name}.1")[
+            "nsres"
+        ]
+    )
+    # switch location
+    os.environ["GISRC"] = str(gisrc)
+    if aoi_map:
+        grass.run_command("g.region", vector=aoi_map, res=res, flags="a")
+    else:
+        grass.run_command("g.region", res=res, flags="a")
+    for i in range(1, 5):
+        name = f"{raster_name}.{i}"
+        # set memory manually to 1000
+        # the process gets stuck when the memory value is too large (100000)
+        # GDAL_CACHEMAX is only interpreted as MB if the value is < 100000
+        grass.run_command(
+            "r.proj",
+            location=tmp_loc,
+            mapset="PERMANENT",
+            input=name,
+            output=name,
+            resolution=res,
+            flags="n",
+            quiet=True,
+            memory=1000,
+        )
+
+
+def main():
+    # parser options
+    tile_key = options["tile_key"]
+    tile_url = options["tile_url"]
+    raster_name = options["raster_name"]
+    resolution_to_import = float(options["resolution_to_import"])
+    orig_region = options["orig_region"]
+    new_mapset = options["new_mapset"]
+    download_dir = options["download_dir"]
+
+    # set memory to input if possible
+    options["memory"] = test_memory(options["memory"])
+
+    # switch to new mapset for parallel processing
+    gisrc, newgisrc, old_mapset = switch_to_new_mapset(new_mapset)
+
+    # set region
+    grass.run_command("g.region", region=f"{orig_region}@{old_mapset}")
+    if options["aoi"]:
+        aoi_map = f"{options['aoi']}@{old_mapset}"
+    else:
+        aoi_map = None
+
+    # import DOP tile with original resolution
+    grass.message(
+        _(f"Started DOP import for key: {tile_key} and URL: {tile_url}")
+    )
+
+    # import and reproject DOP tiles based on tileindex
+    import_and_reproject(
+        tile_url,
+        raster_name,
+        resolution_to_import,
+        aoi_map,
+        download_dir,
+        epsg=25832,
+    )
+    # adjust resolution if required
+    for band in [1, 2, 3, 4]:
+        raster_name_band = f"{raster_name}.{band}"
+        grass.run_command(
+            "g.region",
+            raster=raster_name_band,
+            res=resolution_to_import,
+            flags="a",
+        )
+        adjust_raster_resolution(
+            raster_name_band, raster_name_band, resolution_to_import
+        )
+    rm_group.append(raster_name)
+    grass.message(_(f"Finishing raster import for {raster_name}..."))
+
+    # rescale imported DOPs
+    new_rm_rast = rescale_to_1_256("SN", raster_name)
+    rm_rast.extend(new_rm_rast)
+
+    # switch back to original location
+    switch_back_original_location(gisrc)
+    grass.utils.try_remove(newgisrc)
+    grass.message(
+        _(f"DOP import for key: {tile_key} and URL: {tile_url} done!")
+    )
+
+
+if __name__ == "__main__":
+    options, flags = grass.parser()
+    atexit.register(cleanup)
+    main()
diff --git a/testsuite/test_r_dop_import_SN.py b/testsuite/test_r_dop_import_SN.py
index 031a84b..cf1e668 100644
--- a/testsuite/test_r_dop_import_SN.py
+++ b/testsuite/test_r_dop_import_SN.py
@@ -28,7 +28,7 @@ class TestRDopImportSN(RDopImportTestBase):
     fs = "SN"
     ref_res = 0.2
-    aoi_cells = 2021300
+    aoi_cells = 2027082

     def test_default_settings(self):
        """