Skip to content

Commit

Permalink
feat: published mogpr processes
Browse files Browse the repository at this point in the history
  • Loading branch information
JanssenBrm committed Mar 18, 2024
1 parent 790476e commit 954b24c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/fusets/openeo/services/mogpr.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
"data": {
"from_parameter": "data"
},
"runtime": "Python",
"udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n # extra_bands = [Band(f\"{x}_STD\", None, None) for x in metadata.bands]\n # inspect(data=metadata, message=\"MOGPR metadata\")\n # for band in extra_bands:\n # metadata = metadata.append_band(band)\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n"
"runtime": "Python-Jep",
"udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import Band, CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n extra_bands = []\n\n if include_uncertainties:\n extra_bands += [Band(f\"{x.name}_STD\", None, None) for x in metadata.bands]\n if include_raw_inputs:\n extra_bands += [Band(f\"{x.name}_RAW\", None, None) for x in metadata.bands]\n for band in extra_bands:\n metadata = metadata.append_band(band)\n inspect(data=metadata, message=\"MOGPR metadata\")\n\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs,\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims).astype(\"float32\"))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n"
},
"result": true
}
Expand Down
46 changes: 42 additions & 4 deletions src/fusets/openeo/services/mogpr_s1_s2.json
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,35 @@
}
}
},
"applydimension1": {
"process_id": "apply_dimension",
"arguments": {
"data": {
"from_node": "if6"
},
"dimension": "t",
"process": {
"process_graph": {
"runudf1": {
"process_id": "run_udf",
"arguments": {
"context": {
"smoothing_lambda": {
"from_parameter": "s1_smoothing_lambda"
}
},
"data": {
"from_parameter": "x"
},
"runtime": "Python",
"udf": "import sys\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.udf import XarrayDataCube\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv_static\", \"tmp/venv\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply whittaker smoothing to a datacube\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n\n from fusets.whittaker import whittaker\n\n smoothing_lambda = context.get(\"smoothing_lambda\", None)\n return XarrayDataCube(whittaker(cube.get_array(), smoothing_lambda=smoothing_lambda))\n\n\ndef load_whittakker_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies whittaker smoothing.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n"
},
"result": true
}
}
}
}
},
"BIOPAR1": {
"process_id": "BIOPAR",
"arguments": {
Expand Down Expand Up @@ -1178,7 +1207,7 @@
"process_id": "merge_cubes",
"arguments": {
"cube1": {
"from_node": "if6"
"from_node": "applydimension1"
},
"cube2": {
"from_node": "if13"
Expand All @@ -1194,7 +1223,7 @@
"overlap": [],
"process": {
"process_graph": {
"runudf1": {
"runudf2": {
"process_id": "run_udf",
"arguments": {
"context": {
Expand All @@ -1208,8 +1237,8 @@
"data": {
"from_parameter": "data"
},
"runtime": "Python",
"udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n # extra_bands = [Band(f\"{x}_STD\", None, None) for x in metadata.bands]\n # inspect(data=metadata, message=\"MOGPR metadata\")\n # for band in extra_bands:\n # metadata = metadata.append_band(band)\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n"
"runtime": "Python-Jep",
"udf": "import os\nimport sys\nfrom configparser import ConfigParser\nfrom pathlib import Path\nfrom typing import Dict\n\nfrom openeo.metadata import Band, CollectionMetadata\nfrom openeo.udf import XarrayDataCube, inspect\n\n\ndef load_venv():\n \"\"\"\n Add the virtual environment to the system path if the folder `/tmp/venv_static` exists\n :return:\n \"\"\"\n for venv_path in [\"tmp/venv\", \"tmp/venv_static\"]:\n if Path(venv_path).exists():\n sys.path.insert(0, venv_path)\n\n\ndef set_home(home):\n os.environ[\"HOME\"] = home\n\n\ndef create_gpy_cfg():\n home = os.getenv(\"HOME\")\n set_home(\"/tmp\")\n user_file = Path.home() / \".config\" / \"GPy\" / \"user.cfg\"\n if not user_file.exists():\n user_file.parent.mkdir(parents=True, exist_ok=True)\n return user_file, home\n\n\ndef write_gpy_cfg():\n user_file, home = create_gpy_cfg()\n config = ConfigParser()\n config[\"plotting\"] = {\"library\": \"none\"}\n with open(user_file, \"w\") as cfg:\n config.write(cfg)\n cfg.close()\n return home\n\n\ndef apply_metadata(metadata: CollectionMetadata, context: dict) -> CollectionMetadata:\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n extra_bands = []\n\n if include_uncertainties:\n extra_bands += [Band(f\"{x.name}_STD\", None, None) for x in metadata.bands]\n if include_raw_inputs:\n extra_bands += [Band(f\"{x.name}_RAW\", None, None) for x in metadata.bands]\n for band in extra_bands:\n metadata = metadata.append_band(band)\n inspect(data=metadata, message=\"MOGPR metadata\")\n\n return metadata\n\n\ndef apply_datacube(cube: XarrayDataCube, context: Dict) -> XarrayDataCube:\n \"\"\"\n Apply mogpr integration to a datacube.\n MOGPR requires a full timeseries for multiple bands, so it needs to be invoked in the context of an apply_neighborhood process.\n @param cube:\n @param context:\n @return:\n \"\"\"\n load_venv()\n home = write_gpy_cfg()\n\n from fusets.mogpr import mogpr\n\n variables = context.get(\"variables\")\n time_dimension = context.get(\"time_dimension\", \"t\")\n prediction_period = context.get(\"prediction_period\", \"5D\")\n include_uncertainties = context.get(\"include_uncertainties\", False)\n include_raw_inputs = context.get(\"include_raw_inputs\", False)\n\n dims = cube.get_array().dims\n result = mogpr(\n cube.get_array().to_dataset(dim=\"bands\"),\n variables=variables,\n time_dimension=time_dimension,\n prediction_period=prediction_period,\n include_uncertainties=include_uncertainties,\n include_raw_inputs=include_raw_inputs,\n )\n result_dc = XarrayDataCube(result.to_array(dim=\"bands\").transpose(*dims).astype(\"float32\"))\n inspect(data=result_dc, message=\"MOGPR result\")\n set_home(home)\n return result_dc\n\n\ndef load_mogpr_udf() -> str:\n \"\"\"\n Loads an openEO udf that applies mogpr.\n @return:\n \"\"\"\n import os\n\n return Path(os.path.realpath(__file__)).read_text()\n"
},
"result": true
}
Expand Down Expand Up @@ -1304,6 +1333,15 @@
"optional": true,
"default": "RVI ASC"
},
{
"name": "s1_smoothing_lambda",
"description": "Smoothing factor (Whittaker) to smooth the S1 data (0 = no smoothing)",
"schema": {
"type": "number"
},
"optional": true,
"default": 10000
},
{
"name": "s2_collection",
"description": "S2 data collection to use for fusing the data",
Expand Down
5 changes: 3 additions & 2 deletions src/fusets/openeo/services/publish_mogpr_s1_s2.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from openeo.api.process import Parameter
from openeo.processes import eq, if_, merge_cubes, process

from fusets.openeo.services.dummies import DummyConnection
from fusets.openeo.services.helpers import DATE_SCHEMA, GEOJSON_SCHEMA, publish_service, read_description
from fusets.openeo.services.publish_mogpr import generate_mogpr_cube
from fusets.openeo.services.publish_whittaker import WHITTAKER_DEFAULT_SMOOTHING, generate_whittaker_cube
Expand Down Expand Up @@ -385,5 +386,5 @@ def generate_mogpr_s1_s2_udp(connection):
if __name__ == "__main__":
# Using the dummy connection as otherwise Datatype errors are generated when creating the input datacubes
# where bands are selected.
# generate_mogpr_s1_s2_udp(connection=DummyConnection())
execute_udf()
generate_mogpr_s1_s2_udp(connection=DummyConnection())
# execute_udf()

0 comments on commit 954b24c

Please sign in to comment.