From 954b24c6fdaa70edc009d5a78090727c86b8b67a Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Mon, 18 Mar 2024 13:28:38 +0100 Subject: [PATCH] feat: published mogpr processes --- src/fusets/openeo/services/mogpr.json | 4 +- src/fusets/openeo/services/mogpr_s1_s2.json | 46 +++++++++++++++++-- .../openeo/services/publish_mogpr_s1_s2.py | 5 +- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/fusets/openeo/services/mogpr.json b/src/fusets/openeo/services/mogpr.json index 54ecc1e..52b4233 100644 --- a/src/fusets/openeo/services/mogpr.json +++ b/src/fusets/openeo/services/mogpr.json @@ -23,8 +23,8 @@ "data": { "from_parameter": "data" }, - "runtime": "Python", - "udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n # extra_bands = [Band(f\"{x}_STD\", None, None) for x in metadata.bands]\n # inspect(data=metadata, message=\"MOGPR metadata\")\n # for band in extra_bands:\n # metadata = metadata.append_band(band)\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n" + "runtime": "Python-Jep", + "udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import Band, CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n extra_bands = []\n\n if include_uncertainties:\n extra_bands += [Band(f\"{x.name}_STD\", None, None) for x in metadata.bands]\n if include_raw_inputs:\n extra_bands += [Band(f\"{x.name}_RAW\", None, None) for x in metadata.bands]\n for band in extra_bands:\n metadata = metadata.append_band(band)\n inspect(data=metadata, message=\"MOGPR metadata\")\n\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs,\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims).astype(\"float32\"))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n" }, "result": true } diff --git a/src/fusets/openeo/services/mogpr_s1_s2.json b/src/fusets/openeo/services/mogpr_s1_s2.json index 283da12..9f4657d 100644 --- a/src/fusets/openeo/services/mogpr_s1_s2.json +++ b/src/fusets/openeo/services/mogpr_s1_s2.json @@ -659,6 +659,35 @@ } } }, + "applydimension1": { + "process_id": "apply_dimension", + "arguments": { + "data": { + "from_node": "if6" + }, + "dimension": "t", + "process": { + "process_graph": { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "context": { + "smoothing_lambda": { + "from_parameter": "s1_smoothing_lambda" + } + }, + "data": { + "from_parameter": "x" + }, + "runtime": "Python", + "udf": "import sys\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv_static\", \"tmp/venv\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply whittaker smoothing to a datacube\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n\n from fusets.whittaker import whittaker\n\n smoothing_lambda = context.get(\"smoothing_lambda\", None)\n return XarrayDataCube(whittaker(cube.get_array(), smoothing_lambda=smoothing_lambda))\n\n\ndef load_whittakker_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies whittaker smoothing.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n" + }, + "result": true + } + } + } + } + }, "BIOPAR1": { "process_id": "BIOPAR", "arguments": { @@ -1178,7 +1207,7 @@ "process_id": "merge_cubes", "arguments": { "cube1": { - "from_node": "if6" + "from_node": "applydimension1" }, "cube2": { "from_node": "if13" @@ -1194,7 +1223,7 @@ "overlap": [], "process": { "process_graph": { - "runudf1": { + "runudf2": { "process_id": "run_udf", "arguments": { "context": { @@ -1208,8 +1237,8 @@ "data": { "from_parameter": "data" }, - "runtime": "Python", - "udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n # extra_bands = [Band(f\"{x}_STD\", None, None) for x in metadata.bands]\n # inspect(data=metadata, message=\"MOGPR metadata\")\n # for band in extra_bands:\n # metadata = metadata.append_band(band)\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n" + "runtime": "Python-Jep", + "udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import Band, CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n extra_bands = []\n\n if include_uncertainties:\n extra_bands += [Band(f\"{x.name}_STD\", None, None) for x in metadata.bands]\n if include_raw_inputs:\n extra_bands += [Band(f\"{x.name}_RAW\", None, None) for x in metadata.bands]\n for band in extra_bands:\n metadata = metadata.append_band(band)\n inspect(data=metadata, message=\"MOGPR metadata\")\n\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs,\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims).astype(\"float32\"))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n" }, "result": true } @@ -1304,6 +1333,15 @@ "optional": true, "default": "RVI ASC" }, + { + "name": "s1_smoothing_lambda", + "description": "Smoothing factor (Whittaker) to smooth the S1 data (0 = no smoothing)", + "schema": { + "type": "number" + }, + "optional": true, + "default": 10000 + }, { "name": "s2_collection", "description": "S2 data collection to use for fusing the data", diff --git a/src/fusets/openeo/services/publish_mogpr_s1_s2.py b/src/fusets/openeo/services/publish_mogpr_s1_s2.py index 80187f2..3313b0d 100644 --- a/src/fusets/openeo/services/publish_mogpr_s1_s2.py +++ b/src/fusets/openeo/services/publish_mogpr_s1_s2.py @@ -3,6 +3,7 @@ from openeo.api.process import Parameter from openeo.processes import eq, if_, merge_cubes, process +from fusets.openeo.services.dummies import DummyConnection from fusets.openeo.services.helpers import DATE_SCHEMA, GEOJSON_SCHEMA, publish_service, read_description from fusets.openeo.services.publish_mogpr import generate_mogpr_cube from fusets.openeo.services.publish_whittaker import WHITTAKER_DEFAULT_SMOOTHING, generate_whittaker_cube @@ -385,5 +386,5 @@ def generate_mogpr_s1_s2_udp(connection): if __name__ == "__main__": # Using the dummy connection as otherwise Datatype errors are generated when creating the input datacubes # where bands are selected. - # generate_mogpr_s1_s2_udp(connection=DummyConnection()) - execute_udf() + generate_mogpr_s1_s2_udp(connection=DummyConnection()) + # execute_udf()